Merge pull request #987 from mikedoan/gc

run gc every 10 minutes
Leo Chen 6 years ago committed by GitHub
commit 0087b1c327
6 changed files:
  1. cmd/harmony/main.go (8 lines changed)
  2. consensus/consensus.go (15 lines changed)
  3. consensus/consensus_service.go (3 lines changed)
  4. consensus/view_change.go (3 lines changed)
  5. internal/memprofiling/lib.go (34 lines changed)
  6. node/node.go (23 lines changed)

@@ -99,6 +99,8 @@ var (
accountIndex = flag.Int("account_index", 0, "the index of the staking account to use")
shardID = flag.Int("shard_id", -1, "the shard ID of this node")
enableMemProfiling = flag.Bool("enableMemProfiling", false, "Enable memsize logging.")
enableGC = flag.Bool("enableGC", true, "Enable calling garbage collector manually .")
// logConn logs incoming/outgoing connections
logConn = flag.Bool("log_conn", false, "log incoming/outgoing connections")
@@ -189,6 +191,11 @@ func initSetup() {
}
hmykey.SetHmyPass(myPass)
}
// Set up manual call for garbage collection.
if *enableGC {
memprofiling.MaybeCallGCPeriodically()
}
}
func createGlobalConfig() *nodeconfig.ConfigType {
@@ -387,7 +394,6 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// Watching currentNode and currentConsensus.
memprofiling.GetMemProfiling().Add("currentNode", currentNode)
memprofiling.GetMemProfiling().Add("currentConsensus", currentConsensus)
currentNode.WatchObservedObjects()
return currentNode
}

@@ -17,7 +17,6 @@ import (
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/memprofiling"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils/contract" "github.com/harmony-one/harmony/internal/utils/contract"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@@ -142,18 +141,6 @@ type Consensus struct {
disableViewChange bool
}
// WatchObservedObjects adds more objects from consensus object to watch for memory issues.
func (consensus *Consensus) WatchObservedObjects() {
memprofiling.GetMemProfiling().Add("consensus.prepareSigs", &consensus.prepareSigs)
memprofiling.GetMemProfiling().Add("consensus.commitSigs", &consensus.commitSigs)
memprofiling.GetMemProfiling().Add("consensus.prepareBitmap", &consensus.prepareBitmap)
memprofiling.GetMemProfiling().Add("consensus.commitBitmap", &consensus.commitBitmap)
memprofiling.GetMemProfiling().Add("consensus.bhpSigs", &consensus.bhpSigs)
memprofiling.GetMemProfiling().Add("consensus.nilSigs", &consensus.nilSigs)
memprofiling.GetMemProfiling().Add("consensus.bhpBitmap", &consensus.bhpBitmap)
memprofiling.GetMemProfiling().Add("consensus.nilBitmap", &consensus.nilBitmap)
}
// StakeInfoFinder returns the stake information finder instance this
// consensus uses, e.g. for block reward distribution.
func (consensus *Consensus) StakeInfoFinder() StakeInfoFinder {
@@ -260,8 +247,6 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe
consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance()
// Watch objects for the first time.
consensus.WatchObservedObjects()
return &consensus, nil
}

@@ -360,9 +360,6 @@ func (consensus *Consensus) ResetState() {
consensus.commitBitmap = commitBitmap
consensus.aggregatedPrepareSig = nil
consensus.aggregatedCommitSig = nil
// Because we created new map objects we need to overwrite the mapping of observed objects.
consensus.WatchObservedObjects()
}
// Returns a string representation of this consensus

@@ -150,9 +150,6 @@ func (consensus *Consensus) ResetViewChangeState() {
consensus.bhpSigs = map[common.Address]*bls.Sign{}
consensus.nilSigs = map[common.Address]*bls.Sign{}
consensus.viewIDSigs = map[common.Address]*bls.Sign{}
// Because we created new map objects we need to overwrite the mapping of observed objects.
consensus.WatchObservedObjects()
}
func createTimeout() map[TimeoutType]*utils.Timeout {

@@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"reflect"
"runtime"
"sync" "sync"
"time" "time"
@@ -18,9 +19,11 @@ const (
MemProfilingPortDiff = 1000
// Constants for scanning mem size.
memSizeScanTime = 30 * time.Second
// Run garbage collector every 10 minutes.
gcTime = 10 * time.Minute
)
// MemProfiling is the struct to watch objects for memprofiling.
type MemProfiling struct {
h *memsizeui.Handler
s *http.Server
@@ -88,3 +91,32 @@ func (m *MemProfiling) PeriodicallyScanMemSize() {
}
}()
}
// MaybeCallGCPeriodically runs GC manually every gcTime interval. This is one of the options to mitigate the OOM issue.
func MaybeCallGCPeriodically() {
go func() {
for {
select {
case <-time.After(gcTime):
PrintMemUsage("mem stats before GC")
runtime.GC()
PrintMemUsage("mem stats after GC")
}
}
}()
}
// PrintMemUsage prints memory usage.
func PrintMemUsage(msg string) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
utils.GetLogInstance().Info(msg,
"alloc", bToMb(m.Alloc),
"totalalloc", bToMb(m.TotalAlloc),
"sys", bToMb(m.Sys),
"numgc", m.NumGC)
}
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}
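
For context, here is a minimal standalone sketch of the same manual-GC pattern these helpers implement. It is illustrative only: the interval, logger, and function names below are assumptions for the sketch, not code from the repo.

package main

import (
	"log"
	"runtime"
	"time"
)

// logMemUsage mirrors what PrintMemUsage above does, but uses the standard
// library logger instead of the harmony utils logger.
func logMemUsage(msg string) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	log.Printf("%s: alloc=%vMB totalalloc=%vMB sys=%vMB numgc=%v",
		msg, m.Alloc/1024/1024, m.TotalAlloc/1024/1024, m.Sys/1024/1024, m.NumGC)
}

func main() {
	// Periodically force a GC cycle and log memory stats before and after,
	// the same loop shape as MaybeCallGCPeriodically (interval shortened
	// here so the effect is visible when run directly).
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		logMemUsage("mem stats before GC")
		runtime.GC()
		logMemUsage("mem stats after GC")
	}
}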

@@ -27,7 +27,6 @@ import (
"github.com/harmony-one/harmony/drand"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/memprofiling"
"github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/node/worker"
@@ -224,16 +223,19 @@ func (node *Node) Beaconchain() *core.BlockChain {
return bc
}
func (node *Node) reducePendingTransactions() {
// If the length of pendingTransactions exceeds twice TxPoolLimit, greedily keep only the most recent TxPoolLimit transactions.
if len(node.pendingTransactions) > TxPoolLimit+TxPoolLimit {
curLen := len(node.pendingTransactions)
node.pendingTransactions = append(types.Transactions(nil), node.pendingTransactions[curLen-TxPoolLimit:]...)
}
}
// Add new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) {
node.pendingTxMutex.Lock()
node.pendingTransactions = append(node.pendingTransactions, newTxs...)
// If length of pendingTransactions is greater than TxPoolLimit then by greedy take the TxPoolLimit recent transactions.
if len(node.pendingTransactions) > TxPoolLimit {
utils.GetLogInstance().Warn("Got more transactions than expected and this could caused OOM", "num", len(newTxs), "totalPending", len(node.pendingTransactions))
curLen := len(node.pendingTransactions)
node.pendingTransactions = node.pendingTransactions[curLen-TxPoolLimit:]
}
node.reducePendingTransactions()
node.pendingTxMutex.Unlock()
utils.GetLogInstance().Info("Got more transactions", "num", len(newTxs), "totalPending", len(node.pendingTransactions))
}
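
A note on the pruning idiom above (an editorial reading, not stated in the PR): the new reducePendingTransactions copies the kept tail into a fresh slice with append(types.Transactions(nil), ...), whereas the replaced code only re-sliced, which keeps the whole oversized backing array reachable. A small standalone sketch of the difference, using illustrative names only:

package main

import "fmt"

func main() {
	const poolLimit = 4 // stand-in for TxPoolLimit

	big := make([]int, 100) // pretend this is an oversized pending list
	for i := range big {
		big[i] = i
	}

	// Old approach: re-slice the tail. The result aliases big's backing
	// array, so the full 100-element array stays reachable while the tail
	// is referenced, and writes through big remain visible here.
	resliced := big[len(big)-poolLimit:]

	// New approach (the shape used by reducePendingTransactions): append
	// onto a nil slice, which allocates a fresh poolLimit-sized backing
	// array and copies the tail into it, letting the old array be
	// garbage collected once nothing else references it.
	copied := append([]int(nil), big[len(big)-poolLimit:]...)

	big[99] = -1 // mutate the original backing array

	fmt.Println(resliced[poolLimit-1]) // -1: shares memory with big
	fmt.Println(copied[poolLimit-1])   // 99: independent copy
}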
@@ -251,6 +253,7 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions {
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(node.pendingTransactions, maxNumTxs)
node.pendingTransactions = unselected
node.reducePendingTransactions()
utils.GetLogInstance().Debug("Selecting Transactions", "remainPending", len(node.pendingTransactions), "selected", len(selected), "invalidDiscarded", len(invalid))
node.pendingTxMutex.Unlock()
return selected
@@ -283,12 +286,6 @@ func (node *Node) GetSyncID() [SyncIDLength]byte {
return node.syncID
}
// WatchObservedObjects adds more objects to watch for memory issues.
func (node *Node) WatchObservedObjects() {
memprofiling.GetMemProfiling().Add("currentNode.pendingTransactions", &node.pendingTransactions)
memprofiling.GetMemProfiling().Add("currentNode.transactionInConsensus", &node.transactionInConsensus)
}
// New creates a new node.
func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardchain.DBFactory, isArchival bool) *Node {
var err error
