diff --git a/api/service/stagedstreamsync/range.go b/api/service/stagedstreamsync/range.go index de18b02ab..d05a92ed4 100644 --- a/api/service/stagedstreamsync/range.go +++ b/api/service/stagedstreamsync/range.go @@ -81,4 +81,4 @@ func incHash(h common.Hash) common.Hash { a.SetBytes32(h[:]) a.AddUint64(&a, 1) return common.Hash(a.Bytes32()) -} \ No newline at end of file +} diff --git a/api/service/stagedstreamsync/stage_statesync.go b/api/service/stagedstreamsync/stage_statesync.go index 086d0fb41..4928b71b0 100644 --- a/api/service/stagedstreamsync/stage_statesync.go +++ b/api/service/stagedstreamsync/stage_statesync.go @@ -58,8 +58,8 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo // for short range sync, skip this step if !s.state.initSync { return nil - } // only execute this stage in fast/snap sync mode and once we reach to pivot - + } // only execute this stage in fast/snap sync mode and once we reach to pivot + if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() || s.state.status.statesSynced { diff --git a/core/blockchain.go b/core/blockchain.go index 1f7233f42..f47133bad 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -105,6 +105,8 @@ type BlockChain interface { // Rollback is designed to remove a chain of links from the database that aren't // certain enough to be valid. Rollback(chain []common.Hash) error + // writeHeadBlock writes a new head block + WriteHeadBlock(block *types.Block) error // WriteBlockWithoutState writes only the block and its metadata to the database, // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index 15527c3fe..c7f01d413 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/metrics" @@ -69,8 +70,9 @@ import ( ) var ( - headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil) - headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil) + headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil) + headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil) + headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil) accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil) accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil) @@ -185,7 +187,8 @@ type BlockChainImpl struct { pendingCrossLinksMutex sync.RWMutex // pending crosslinks lock pendingSlashingCandidatesMU sync.RWMutex // pending slashing candidates - currentBlock atomic.Value // Current head of the block chain + currentBlock atomic.Value // Current head of the block chain + currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) stateCache state.Database // State database to reuse between imports (contains state cache) bodyCache *lru.Cache // Cache for the most recent block bodies @@ -319,6 +322,7 @@ func newBlockChainWithOptions( } var nilBlock *types.Block bc.currentBlock.Store(nilBlock) + bc.currentFastBlock.Store(nilBlock) if err := bc.loadLastState(); err != nil { return nil, err } @@ -612,8 +616,22 @@ func (bc *BlockChainImpl) loadLastState() error { return errors.Wrap(err, "headerChain SetCurrentHeader") } + // Restore the last known head fast block + bc.currentFastBlock.Store(currentBlock) + headFastBlockGauge.Update(int64(currentBlock.NumberU64())) + if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) { + if block := bc.GetBlockByHash(head); block != nil { + bc.currentFastBlock.Store(block) + headFastBlockGauge.Update(int64(block.NumberU64())) + } + } + + // Issue a status log for the user + currentFastBlock := bc.CurrentFastBlock() + headerTd := bc.GetTd(currentHeader.Hash(), currentHeader.Number().Uint64()) blockTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) + fastTd := bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()) utils.Logger().Info(). Str("number", currentHeader.Number().String()). @@ -627,6 +645,12 @@ func (bc *BlockChainImpl) loadLastState() error { Str("td", blockTd.String()). Str("age", common.PrettyAge(time.Unix(currentBlock.Time().Int64(), 0)).String()). Msg("Loaded most recent local full block") + utils.Logger().Info(). + Str("number", currentFastBlock.Number().String()). + Str("hash", currentFastBlock.Hash().Hex()). + Str("td", fastTd.String()). + Str("age", common.PrettyAge(time.Unix(currentFastBlock.Time().Int64(), 0)).String()). + Msg("Loaded most recent local fast block") return nil } @@ -663,16 +687,30 @@ func (bc *BlockChainImpl) setHead(head uint64) error { headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) } } + // Rewind the fast block in a simpleton way to the target head + if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number().Uint64() < currentFastBlock.NumberU64() { + newHeadFastBlock := bc.GetBlock(currentHeader.Hash(), currentHeader.Number().Uint64()) + bc.currentFastBlock.Store(newHeadFastBlock) + headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64())) + } // If either blocks reached nil, reset to the genesis state if currentBlock := bc.CurrentBlock(); currentBlock == nil { bc.currentBlock.Store(bc.genesisBlock) headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) } + if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil { + bc.currentFastBlock.Store(bc.genesisBlock) + headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) + } currentBlock := bc.CurrentBlock() + currentFastBlock := bc.CurrentFastBlock() if err := rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()); err != nil { return err } + if err := rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil { + return err + } return bc.loadLastState() } @@ -738,6 +776,8 @@ func (bc *BlockChainImpl) resetWithGenesisBlock(genesis *types.Block) error { } bc.currentBlock.Store(bc.genesisBlock) headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) + bc.currentFastBlock.Store(bc.genesisBlock) + headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) return nil } @@ -839,6 +879,10 @@ func (bc *BlockChainImpl) ExportN(w io.Writer, first uint64, last uint64) error return nil } +func (bc *BlockChainImpl) WriteHeadBlock(block *types.Block) error { + return bc.writeHeadBlock(block) +} + // writeHeadBlock writes a new head block func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error { // If the block is on a side chain or an unknown one, force other heads onto it too @@ -881,6 +925,9 @@ func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error { if err := rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()); err != nil { return err } + + bc.currentFastBlock.Store(block) + headFastBlockGauge.Update(int64(block.NumberU64())) } return nil } @@ -894,6 +941,9 @@ func (bc *BlockChainImpl) tikvFastForward(block *types.Block, logs []*types.Log) return errors.Wrap(err, "HeaderChain SetCurrentHeader") } + bc.currentFastBlock.Store(block) + headFastBlockGauge.Update(int64(block.NumberU64())) + var events []interface{} events = append(events, ChainEvent{block, block.Hash(), logs}) events = append(events, ChainHeadEvent{block}) @@ -1195,6 +1245,14 @@ func (bc *BlockChainImpl) Rollback(chain []common.Hash) error { } } } + if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentFastBlock.Hash() == hash { + newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1) + if newFastBlock != nil { + bc.currentFastBlock.Store(newFastBlock) + headFastBlockGauge.Update(int64(newFastBlock.NumberU64())) + rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash()) + } + } if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentBlock.Hash() == hash { newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1) if newBlock != nil { @@ -1792,7 +1850,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i // Write the block to the chain and get the status. substart = time.Now() - status, err := bc.writeBlockWithState( + status, err := bc.WriteBlockWithState( block, receipts, cxReceipts, stakeMsgs, payout, state, ) if err != nil { @@ -1848,125 +1906,6 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i return 0, events, coalescedLogs, nil } -// insertChainWithoutBlockExecution adds a set of blocks to blockchain without adding states -func (bc *BlockChainImpl) insertChainWithoutBlockExecution(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) { - // Sanity check that we have something meaningful to import - if len(chain) == 0 { - return 0, nil, nil, nil - } - // Do a sanity check that the provided chain is actually ordered and linked - for i := 1; i < len(chain); i++ { - if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { - // Chain broke ancestry, log a message (programming error) and skip insertion - utils.Logger().Error(). - Str("number", chain[i].Number().String()). - Str("hash", chain[i].Hash().Hex()). - Str("parent", chain[i].ParentHash().Hex()). - Str("prevnumber", chain[i-1].Number().String()). - Str("prevhash", chain[i-1].Hash().Hex()). - Msg("insertChain: non contiguous block insert") - - return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), - chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) - } - } - - bc.chainmu.Lock() - defer bc.chainmu.Unlock() - - var verifyHeadersResults <-chan error - - // If the block header chain has not been verified, conduct header verification here. - if verifyHeaders { - headers := make([]*block.Header, len(chain)) - seals := make([]bool, len(chain)) - - for i, block := range chain { - headers[i] = block.Header() - seals[i] = true - } - // Note that VerifyHeaders verifies headers in the chain in parallel - abort, results := bc.Engine().VerifyHeaders(bc, headers, seals) - verifyHeadersResults = results - defer close(abort) - } - - // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) - //senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) - - // Iterate over the blocks and insert when the verifier permits - for i, block := range chain { - // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - utils.Logger().Debug().Msg("Premature abort during blocks processing") - break - } - - var err error - if verifyHeaders { - err = <-verifyHeadersResults - } - if err == nil { - err = bc.Validator().ValidateBody(block) - } - switch { - case err == ErrKnownBlock: - // Block and state both already known. However if the current block is below - // this number we did a rollback and we should reimport it nonetheless. - if bc.CurrentBlock().NumberU64() >= block.NumberU64() { - continue - } - - case err == consensus_engine.ErrFutureBlock: - // Allow up to MaxFuture second in the future blocks. If this limit is exceeded - // the chain is discarded and processed at a later time if given. - max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) - if block.Time().Cmp(max) > 0 { - return i, nil, nil, fmt.Errorf("future block: %v > %v", block.Time(), max) - } - bc.futureBlocks.Add(block.Hash(), block) - continue - - case err == consensus_engine.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()): - bc.futureBlocks.Add(block.Hash(), block) - continue - - case err == consensus_engine.ErrPrunedAncestor: - var winner []*types.Block - parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1) - for parent != nil && !bc.HasState(parent.Root()) { - winner = append(winner, parent) - parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) - } - for j := 0; j < len(winner)/2; j++ { - winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j] - } - // Prune in case non-empty winner chain - if len(winner) > 0 { - // Import all the pruned blocks to make the state available - bc.chainmu.Unlock() - _, _, _, err := bc.insertChainWithoutBlockExecution(winner, true /* verifyHeaders */) - bc.chainmu.Lock() - if err != nil { - return i, nil, nil, err - } - } - - case err != nil: - bc.reportBlock(block, nil, err) - return i, nil, nil, err - } - - // Create a new statedb using the parent block and report an - // error if it fails. - if err = bc.WriteBlockWithoutState(block); err != nil { - return i, nil, nil, err - } - } - - return 0, nil, nil, nil -} - // insertStats tracks and reports on block insertion. type insertStats struct { queued, processed, ignored int