From 81effe40b5048cbb5d2b8f0a21c8111e4cebffad Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sat, 8 Jun 2019 15:04:10 -0700 Subject: [PATCH 1/3] run gc every 30 minutes --- cmd/harmony/main.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 92b89a09c..27731b7d6 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -98,6 +98,8 @@ var ( accountIndex = flag.Int("account_index", 0, "the index of the staking account to use") shardID = flag.Int("shard_id", -1, "the shard ID of this node") enableMemProfiling = flag.Bool("enableMemProfiling", false, "Enable memsize logging.") + enableGC = flag.Bool("enableGC", true, "Enable calling garbage collector manually .") + // logConn logs incoming/outgoing connections logConn = flag.Bool("log_conn", false, "log incoming/outgoing connections") @@ -120,6 +122,12 @@ var ( "Do not propose view change (testing only)") ) +// Constants for GC. +const ( + // Run garbage collector every 30 minutes. + gcTime = 30 * time.Minute +) + func initSetup() { if *versionFlag { printVersion(os.Args[0]) @@ -188,6 +196,23 @@ func initSetup() { } hmykey.SetHmyPass(myPass) } + + // Set up manual call for garbage collection. + if *enableGC { + maybeCallGCPeriodically() + } +} + +// Run GC manually every 30 minutes. This is one of the options to mitigate the OOM issue. 
+func maybeCallGCPeriodically() { + go func() { + for { + select { + case <-time.After(gcTime): + runtime.GC() + } + } + }() } func createGlobalConfig() *nodeconfig.ConfigType { From ab145ea9a4a65ea9907e8a712ef81f7c7143bfa4 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sat, 8 Jun 2019 16:32:39 -0700 Subject: [PATCH 2/3] add mem stats --- cmd/harmony/main.go | 20 +------------------- internal/memprofiling/lib.go | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 27731b7d6..3d50b8201 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -122,12 +122,6 @@ var ( "Do not propose view change (testing only)") ) -// Constants for GC. -const ( - // Run garbage collector every 30 minutes. - gcTime = 30 * time.Minute -) - func initSetup() { if *versionFlag { printVersion(os.Args[0]) @@ -199,22 +193,10 @@ func initSetup() { // Set up manual call for garbage collection. if *enableGC { - maybeCallGCPeriodically() + memprofiling.MaybeCallGCPeriodically() } } -// Run GC manually every 30 minutes. This is one of the options to mitigate the OOM issue. -func maybeCallGCPeriodically() { - go func() { - for { - select { - case <-time.After(gcTime): - runtime.GC() - } - } - }() -} - func createGlobalConfig() *nodeconfig.ConfigType { var err error var myShardID uint32 diff --git a/internal/memprofiling/lib.go b/internal/memprofiling/lib.go index 73e913a99..3d15786c0 100644 --- a/internal/memprofiling/lib.go +++ b/internal/memprofiling/lib.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "reflect" + "runtime" "sync" "time" @@ -18,9 +19,11 @@ const ( MemProfilingPortDiff = 1000 // Constants of for scanning mem size. memSizeScanTime = 30 * time.Second + // Run garbage collector every 10 minutes. + gcTime = 10 * time.Minute ) -// MemProfiling is the struct of MemProfiling. +// MemProfiling is the struct to watch objects for memprofiling. 
type MemProfiling struct { h *memsizeui.Handler s *http.Server @@ -88,3 +91,32 @@ func (m *MemProfiling) PeriodicallyScanMemSize() { } }() } + +// MaybeCallGCPeriodically runs GC manually once every gcTime interval. This is one of the options to mitigate the OOM issue. +func MaybeCallGCPeriodically() { + go func() { + for { + select { + case <-time.After(gcTime): + PrintMemUsage("mem stats before GC") + runtime.GC() + PrintMemUsage("mem stats after GC") + } + } + }() +} + +// PrintMemUsage logs the current alloc, totalalloc and sys sizes (in MiB) and the GC count under the given message. +func PrintMemUsage(msg string) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + utils.GetLogInstance().Info(msg, + "alloc", bToMb(m.Alloc), + "totalalloc", bToMb(m.TotalAlloc), + "sys", bToMb(m.Sys), + "numgc", m.NumGC) +} + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} From 9009c39398c25fdc3c33f3d9e360eb2444c45d7f Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sat, 8 Jun 2019 18:55:17 -0700 Subject: [PATCH 3/3] disable watching some objects and fix potential memory leak --- cmd/harmony/main.go | 1 - consensus/consensus.go | 15 --------------- consensus/consensus_service.go | 3 --- consensus/view_change.go | 3 --- node/node.go | 23 ++++++++++------------- 5 files changed, 10 insertions(+), 35 deletions(-) diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 3d50b8201..656d1948f 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -386,7 +386,6 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { // Watching currentNode and currentConsensus. 
memprofiling.GetMemProfiling().Add("currentNode", currentNode) memprofiling.GetMemProfiling().Add("currentConsensus", currentConsensus) - currentNode.WatchObservedObjects() return currentNode } diff --git a/consensus/consensus.go b/consensus/consensus.go index 52e8077b0..d41ee685b 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -17,7 +17,6 @@ import ( bls_cosi "github.com/harmony-one/harmony/crypto/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" - "github.com/harmony-one/harmony/internal/memprofiling" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils/contract" "github.com/harmony-one/harmony/p2p" @@ -142,18 +141,6 @@ type Consensus struct { disableViewChange bool } -// WatchObservedObjects adds more objects from consensus object to watch for memory issues. -func (consensus *Consensus) WatchObservedObjects() { - memprofiling.GetMemProfiling().Add("consensus.prepareSigs", &consensus.prepareSigs) - memprofiling.GetMemProfiling().Add("consensus.commitSigs", &consensus.commitSigs) - memprofiling.GetMemProfiling().Add("consensus.prepareBitmap", &consensus.prepareBitmap) - memprofiling.GetMemProfiling().Add("consensus.commitBitmap", &consensus.commitBitmap) - memprofiling.GetMemProfiling().Add("consensus.bhpSigs", &consensus.bhpSigs) - memprofiling.GetMemProfiling().Add("consensus.nilSigs", &consensus.nilSigs) - memprofiling.GetMemProfiling().Add("consensus.bhpBitmap", &consensus.bhpBitmap) - memprofiling.GetMemProfiling().Add("consensus.nilBitmap", &consensus.nilBitmap) -} - // StakeInfoFinder returns the stake information finder instance this // consensus uses, e.g. for block reward distribution. 
func (consensus *Consensus) StakeInfoFinder() StakeInfoFinder { @@ -260,8 +247,6 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance() - // Watch objects for the first time. - consensus.WatchObservedObjects() return &consensus, nil } diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 7d6b898f5..1045aa045 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -359,9 +359,6 @@ func (consensus *Consensus) ResetState() { consensus.commitBitmap = commitBitmap consensus.aggregatedPrepareSig = nil consensus.aggregatedCommitSig = nil - - // Because we created new map objects we need to overwrite the mapping of observed objects. - consensus.WatchObservedObjects() } // Returns a string representation of this consensus diff --git a/consensus/view_change.go b/consensus/view_change.go index 1ff29040d..40b8689e2 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -150,9 +150,6 @@ func (consensus *Consensus) ResetViewChangeState() { consensus.bhpSigs = map[common.Address]*bls.Sign{} consensus.nilSigs = map[common.Address]*bls.Sign{} consensus.viewIDSigs = map[common.Address]*bls.Sign{} - - // Because we created new map objects we need to overwrite the mapping of observed objects. 
- consensus.WatchObservedObjects() } func createTimeout() map[TimeoutType]*utils.Timeout { diff --git a/node/node.go b/node/node.go index 46e4407eb..6e7d7e0f5 100644 --- a/node/node.go +++ b/node/node.go @@ -27,7 +27,6 @@ import ( "github.com/harmony-one/harmony/drand" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" - "github.com/harmony-one/harmony/internal/memprofiling" "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node/worker" @@ -224,16 +223,19 @@ func (node *Node) Beaconchain() *core.BlockChain { return bc } +func (node *Node) reducePendingTransactions() { + // Once pendingTransactions grows beyond twice TxPoolLimit, keep only the TxPoolLimit most recent transactions, copied into a fresh slice so the old backing array is no longer referenced. + if len(node.pendingTransactions) > TxPoolLimit+TxPoolLimit { + curLen := len(node.pendingTransactions) + node.pendingTransactions = append(types.Transactions(nil), node.pendingTransactions[curLen-TxPoolLimit:]...) + } +} + // Add new transactions to the pending transaction list. func (node *Node) addPendingTransactions(newTxs types.Transactions) { node.pendingTxMutex.Lock() node.pendingTransactions = append(node.pendingTransactions, newTxs...) - // If length of pendingTransactions is greater than TxPoolLimit then by greedy take the TxPoolLimit recent transactions. 
- if len(node.pendingTransactions) > TxPoolLimit { - utils.GetLogInstance().Warn("Got more transactions than expected and this could caused OOM", "num", len(newTxs), "totalPending", len(node.pendingTransactions)) - curLen := len(node.pendingTransactions) - node.pendingTransactions = node.pendingTransactions[curLen-TxPoolLimit:] - } + node.reducePendingTransactions() node.pendingTxMutex.Unlock() utils.GetLogInstance().Info("Got more transactions", "num", len(newTxs), "totalPending", len(node.pendingTransactions)) } @@ -251,6 +253,7 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions { selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(node.pendingTransactions, maxNumTxs) node.pendingTransactions = unselected + node.reducePendingTransactions() utils.GetLogInstance().Debug("Selecting Transactions", "remainPending", len(node.pendingTransactions), "selected", len(selected), "invalidDiscarded", len(invalid)) node.pendingTxMutex.Unlock() return selected @@ -283,12 +286,6 @@ func (node *Node) GetSyncID() [SyncIDLength]byte { return node.syncID } -// WatchObservedObjects adds more objects to watch for memory issues. -func (node *Node) WatchObservedObjects() { - memprofiling.GetMemProfiling().Add("currentNode.pendingTransactions", &node.pendingTransactions) - memprofiling.GetMemProfiling().Add("currentNode.transactionInConsensus", &node.transactionInConsensus) -} - // New creates a new node. func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardchain.DBFactory, isArchival bool) *Node { var err error