State prune (#2244)

* update testnet epochs

* Fix the one-off error fully

* Add const for the bad block num

* Graceful shutdown of node

* Add syscall.SIGTERM for shutdown
pull/2251/head
Rongjian Lan 5 years ago committed by GitHub
parent 61e7330701
commit 0aeb3c9e34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      cmd/harmony/main.go
  2. 19
      core/blockchain.go
  3. 4
      core/rawdb/accessors_chain.go
  4. 10
      node/node.go
  5. 2
      node/node_handler.go

@ -8,10 +8,12 @@ import (
"math/big" "math/big"
"math/rand" "math/rand"
"os" "os"
"os/signal"
"path" "path"
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"syscall"
"time" "time"
ethCommon "github.com/ethereum/go-ethereum/common" ethCommon "github.com/ethereum/go-ethereum/common"
@ -78,7 +80,7 @@ var (
// isGenesis indicates this node is a genesis node // isGenesis indicates this node is a genesis node
isGenesis = flag.Bool("is_genesis", true, "true means this node is a genesis node") isGenesis = flag.Bool("is_genesis", true, "true means this node is a genesis node")
// isArchival indicates this node is an archival node that will save and archive current blockchain // isArchival indicates this node is an archival node that will save and archive current blockchain
isArchival = flag.Bool("is_archival", true, "false makes node faster by turning caching off") isArchival = flag.Bool("is_archival", false, "false will enable cached state pruning")
// delayCommit is the commit-delay timer, used by Harmony nodes // delayCommit is the commit-delay timer, used by Harmony nodes
delayCommit = flag.String("delay_commit", "0ms", "how long to delay sending commit messages in consensus, ex: 500ms, 1s") delayCommit = flag.String("delay_commit", "0ms", "how long to delay sending commit messages in consensus, ex: 500ms, 1s")
// nodeType indicates the type of the node: validator, explorer // nodeType indicates the type of the node: validator, explorer
@ -636,5 +638,25 @@ func main() {
go currentNode.CollectMetrics() go currentNode.CollectMetrics()
} }
// Prepare for graceful shutdown from os signals
osSignal := make(chan os.Signal)
signal.Notify(osSignal, os.Interrupt, os.Kill, syscall.SIGTERM)
go func() {
for {
select {
case sig := <-osSignal:
if sig == os.Kill || sig == syscall.SIGTERM {
fmt.Printf("Got %s signal. Gracefully shutting down...\n", sig)
currentNode.ShutDown()
}
if sig == os.Interrupt {
fmt.Printf("Got %s signal. Dumping state to DB...\n", sig)
currentNode.Blockchain().Stop()
currentNode.Beaconchain().Stop()
}
}
}
}()
currentNode.StartServer() currentNode.StartServer()
} }

@ -180,7 +180,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
if cacheConfig == nil { if cacheConfig == nil {
cacheConfig = &CacheConfig{ cacheConfig = &CacheConfig{
TrieNodeLimit: 256 * 1024 * 1024, TrieNodeLimit: 256 * 1024 * 1024,
TrieTimeLimit: 5 * time.Minute, TrieTimeLimit: 2 * time.Minute,
} }
} }
bodyCache, _ := lru.New(bodyCacheLimit) bodyCache, _ := lru.New(bodyCacheLimit)
@ -326,13 +326,15 @@ func (bc *BlockChain) loadLastState() error {
// Everything seems to be fine, set as the head block // Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock) bc.currentBlock.Store(currentBlock)
// We don't need the following as we want the current header and block to be consistent
// Restore the last known head header // Restore the last known head header
//currentHeader := currentBlock.Header()
//if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) {
// if header := bc.GetHeaderByHash(head); header != nil {
// currentHeader = header
// }
//}
currentHeader := currentBlock.Header() currentHeader := currentBlock.Header()
if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) {
if header := bc.GetHeaderByHash(head); header != nil {
currentHeader = header
}
}
bc.hc.SetCurrentHeader(currentHeader) bc.hc.SetCurrentHeader(currentHeader)
// Restore the last known head fast block // Restore the last known head fast block
@ -555,6 +557,11 @@ func (bc *BlockChain) repair(head **types.Block) error {
Msg("Rewound blockchain to past state") Msg("Rewound blockchain to past state")
return nil return nil
} }
// Repair last commit sigs
lastSig := (*head).Header().LastCommitSignature()
sigAndBitMap := append(lastSig[:], (*head).Header().LastCommitBitmap()...)
bc.WriteLastCommits(sigAndBitMap)
// Otherwise rewind one block and recheck state availability there // Otherwise rewind one block and recheck state availability there
(*head) = bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1) (*head) = bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1)
} }

@ -439,7 +439,7 @@ func WriteShardStateBytes(db DatabaseWriter, epoch *big.Int, data []byte) (err e
"epoch", epoch, "epoch", epoch,
).WithCause(err) ).WithCause(err)
} }
utils.Logger().Info().Str("epoch", epoch.String()).Int("numShards", len(data)).Msg("wrote sharding state") utils.Logger().Info().Str("epoch", epoch.String()).Int("size", len(data)).Msg("wrote sharding state")
return nil return nil
} }
@ -461,7 +461,7 @@ func WriteLastCommits(
return ctxerror.New("cannot write last commits").WithCause(err) return ctxerror.New("cannot write last commits").WithCause(err)
} }
utils.Logger().Info(). utils.Logger().Info().
Int("numShards", len(data)). Int("size", len(data)).
Msg("wrote last commits") Msg("wrote last commits")
return nil return nil
} }

@ -711,3 +711,13 @@ func (node *Node) SetSyncFreq(syncFreq int) {
func (node *Node) SetBeaconSyncFreq(syncFreq int) { func (node *Node) SetBeaconSyncFreq(syncFreq int) {
node.beaconSyncFreq = syncFreq node.beaconSyncFreq = syncFreq
} }
// ShutDown gracefully shut down the node server and dump the in-memory blockchain state into DB.
func (node *Node) ShutDown() {
node.Blockchain().Stop()
node.Beaconchain().Stop()
node.StopServices()
node.stopHTTP()
fmt.Printf("Exiting node program...")
os.Exit(0)
}

@ -154,7 +154,7 @@ func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) {
Msg("block sync") Msg("block sync")
} else { } else {
// for non-beaconchain node, subscribe to beacon block broadcast // for non-beaconchain node, subscribe to beacon block broadcast
if node.Blockchain().ShardID() != 0 { if node.Blockchain().ShardID() != 0 && node.NodeConfig.Role() != nodeconfig.ExplorerNode {
for _, block := range blocks { for _, block := range blocks {
if block.ShardID() == 0 { if block.ShardID() == 0 {
utils.Logger().Info(). utils.Logger().Info().

Loading…
Cancel
Save