return back deleted codes, fix rebase issues, goimports

pull/4465/head
“GheisMohammadi” 12 months ago
parent 0901e92bf8
commit f3ce9f3ac9
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 2
      api/service/stagedstreamsync/range.go
  2. 4
      api/service/stagedstreamsync/stage_statesync.go
  3. 2
      core/blockchain.go
  4. 185
      core/blockchain_impl.go

@ -81,4 +81,4 @@ func incHash(h common.Hash) common.Hash {
a.SetBytes32(h[:])
a.AddUint64(&a, 1)
return common.Hash(a.Bytes32())
}
}

@ -58,8 +58,8 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo
// for short range sync, skip this step
if !s.state.initSync {
return nil
} // only execute this stage in fast/snap sync mode and once we reach to pivot
} // only execute this stage in fast/snap sync mode and once we reach to pivot
if s.state.status.pivotBlock == nil ||
s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() ||
s.state.status.statesSynced {

@ -105,6 +105,8 @@ type BlockChain interface {
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
Rollback(chain []common.Hash) error
// writeHeadBlock writes a new head block
WriteHeadBlock(block *types.Block) error
// WriteBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/metrics"
@ -69,8 +70,9 @@ import (
)
var (
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
@ -185,7 +187,8 @@ type BlockChainImpl struct {
pendingCrossLinksMutex sync.RWMutex // pending crosslinks lock
pendingSlashingCandidatesMU sync.RWMutex // pending slashing candidates
currentBlock atomic.Value // Current head of the block chain
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
@ -319,6 +322,7 @@ func newBlockChainWithOptions(
}
var nilBlock *types.Block
bc.currentBlock.Store(nilBlock)
bc.currentFastBlock.Store(nilBlock)
if err := bc.loadLastState(); err != nil {
return nil, err
}
@ -612,8 +616,22 @@ func (bc *BlockChainImpl) loadLastState() error {
return errors.Wrap(err, "headerChain SetCurrentHeader")
}
// Restore the last known head fast block
bc.currentFastBlock.Store(currentBlock)
headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
}
// Issue a status log for the user
currentFastBlock := bc.CurrentFastBlock()
headerTd := bc.GetTd(currentHeader.Hash(), currentHeader.Number().Uint64())
blockTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
fastTd := bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64())
utils.Logger().Info().
Str("number", currentHeader.Number().String()).
@ -627,6 +645,12 @@ func (bc *BlockChainImpl) loadLastState() error {
Str("td", blockTd.String()).
Str("age", common.PrettyAge(time.Unix(currentBlock.Time().Int64(), 0)).String()).
Msg("Loaded most recent local full block")
utils.Logger().Info().
Str("number", currentFastBlock.Number().String()).
Str("hash", currentFastBlock.Hash().Hex()).
Str("td", fastTd.String()).
Str("age", common.PrettyAge(time.Unix(currentFastBlock.Time().Int64(), 0)).String()).
Msg("Loaded most recent local fast block")
return nil
}
@ -663,16 +687,30 @@ func (bc *BlockChainImpl) setHead(head uint64) error {
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
}
// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number().Uint64() < currentFastBlock.NumberU64() {
newHeadFastBlock := bc.GetBlock(currentHeader.Hash(), currentHeader.Number().Uint64())
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
// If either blocks reached nil, reset to the genesis state
if currentBlock := bc.CurrentBlock(); currentBlock == nil {
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil {
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
currentBlock := bc.CurrentBlock()
currentFastBlock := bc.CurrentFastBlock()
if err := rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()); err != nil {
return err
}
if err := rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil {
return err
}
return bc.loadLastState()
}
@ -738,6 +776,8 @@ func (bc *BlockChainImpl) resetWithGenesisBlock(genesis *types.Block) error {
}
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
return nil
}
@ -839,6 +879,10 @@ func (bc *BlockChainImpl) ExportN(w io.Writer, first uint64, last uint64) error
return nil
}
func (bc *BlockChainImpl) WriteHeadBlock(block *types.Block) error {
return bc.writeHeadBlock(block)
}
// writeHeadBlock writes a new head block
func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
// If the block is on a side chain or an unknown one, force other heads onto it too
@ -881,6 +925,9 @@ func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
if err := rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()); err != nil {
return err
}
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
return nil
}
@ -894,6 +941,9 @@ func (bc *BlockChainImpl) tikvFastForward(block *types.Block, logs []*types.Log)
return errors.Wrap(err, "HeaderChain SetCurrentHeader")
}
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
var events []interface{}
events = append(events, ChainEvent{block, block.Hash(), logs})
events = append(events, ChainHeadEvent{block})
@ -1195,6 +1245,14 @@ func (bc *BlockChainImpl) Rollback(chain []common.Hash) error {
}
}
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
if newFastBlock != nil {
bc.currentFastBlock.Store(newFastBlock)
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
}
}
if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
if newBlock != nil {
@ -1792,7 +1850,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
// Write the block to the chain and get the status.
substart = time.Now()
status, err := bc.writeBlockWithState(
status, err := bc.WriteBlockWithState(
block, receipts, cxReceipts, stakeMsgs, payout, state,
)
if err != nil {
@ -1848,125 +1906,6 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
return 0, events, coalescedLogs, nil
}
// insertChainWithoutBlockExecution adds a set of blocks to blockchain without adding states
func (bc *BlockChainImpl) insertChainWithoutBlockExecution(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
}
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
// Chain broke ancestry, log a message (programming error) and skip insertion
utils.Logger().Error().
Str("number", chain[i].Number().String()).
Str("hash", chain[i].Hash().Hex()).
Str("parent", chain[i].ParentHash().Hex()).
Str("prevnumber", chain[i-1].Number().String()).
Str("prevhash", chain[i-1].Hash().Hex()).
Msg("insertChain: non contiguous block insert")
return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
}
}
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
var verifyHeadersResults <-chan error
// If the block header chain has not been verified, conduct header verification here.
if verifyHeaders {
headers := make([]*block.Header, len(chain))
seals := make([]bool, len(chain))
for i, block := range chain {
headers[i] = block.Header()
seals[i] = true
}
// Note that VerifyHeaders verifies headers in the chain in parallel
abort, results := bc.Engine().VerifyHeaders(bc, headers, seals)
verifyHeadersResults = results
defer close(abort)
}
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
//senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
// Iterate over the blocks and insert when the verifier permits
for i, block := range chain {
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
utils.Logger().Debug().Msg("Premature abort during blocks processing")
break
}
var err error
if verifyHeaders {
err = <-verifyHeadersResults
}
if err == nil {
err = bc.Validator().ValidateBody(block)
}
switch {
case err == ErrKnownBlock:
// Block and state both already known. However if the current block is below
// this number we did a rollback and we should reimport it nonetheless.
if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
continue
}
case err == consensus_engine.ErrFutureBlock:
// Allow up to MaxFuture second in the future blocks. If this limit is exceeded
// the chain is discarded and processed at a later time if given.
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time().Cmp(max) > 0 {
return i, nil, nil, fmt.Errorf("future block: %v > %v", block.Time(), max)
}
bc.futureBlocks.Add(block.Hash(), block)
continue
case err == consensus_engine.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
bc.futureBlocks.Add(block.Hash(), block)
continue
case err == consensus_engine.ErrPrunedAncestor:
var winner []*types.Block
parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
for parent != nil && !bc.HasState(parent.Root()) {
winner = append(winner, parent)
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
}
for j := 0; j < len(winner)/2; j++ {
winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
}
// Prune in case non-empty winner chain
if len(winner) > 0 {
// Import all the pruned blocks to make the state available
bc.chainmu.Unlock()
_, _, _, err := bc.insertChainWithoutBlockExecution(winner, true /* verifyHeaders */)
bc.chainmu.Lock()
if err != nil {
return i, nil, nil, err
}
}
case err != nil:
bc.reportBlock(block, nil, err)
return i, nil, nil, err
}
// Create a new statedb using the parent block and report an
// error if it fails.
if err = bc.WriteBlockWithoutState(block); err != nil {
return i, nil, nil, err
}
}
return 0, nil, nil, nil
}
// insertStats tracks and reports on block insertion.
type insertStats struct {
queued, processed, ignored int

Loading…
Cancel
Save