add block insertion without execution to blockchain implementation

pull/4465/head
“GheisMohammadi” 1 year ago
parent e11b6ef122
commit 91034682c7
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 18
      core/blockchain.go
  2. 147
      core/blockchain_impl.go
  3. 4
      core/blockchain_stub.go
  4. 2
      core/epochchain.go
  5. 2
      hmy/downloader/adapter.go
  6. 2
      hmy/downloader/beaconhelper.go
  7. 8
      hmy/downloader/downloader.go
  8. 2
      hmy/downloader/longrange.go
  9. 4
      hmy/downloader/shortrange.go
  10. 2
      node/node_handler_test.go

@ -100,6 +100,18 @@ type BlockChain interface {
// Rollback is designed to remove a chain of links from the database that aren't // Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid. // certain enough to be valid.
Rollback(chain []common.Hash) error Rollback(chain []common.Hash) error
// WriteBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
WriteBlockWithoutState(block *types.Block) (err error)
// WriteBlockWithState writes the block and all associated state to the database.
WriteBlockWithState(
block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt,
stakeMsgs []types2.StakeMsg,
paid reward.Reader,
state *state.DB,
) (status WriteStatus, err error)
// GetMaxGarbageCollectedBlockNumber .. // GetMaxGarbageCollectedBlockNumber ..
GetMaxGarbageCollectedBlockNumber() int64 GetMaxGarbageCollectedBlockNumber() int64
// InsertChain attempts to insert the given batch of blocks in to the canonical // InsertChain attempts to insert the given batch of blocks in to the canonical
@ -108,9 +120,9 @@ type BlockChain interface {
// wrong. // wrong.
// //
// After insertion is done, all accumulated events will be fired. // After insertion is done, all accumulated events will be fired.
InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) InsertChain(chain types.Blocks, verifyHeaders bool, blockExecution bool) (int, error)
// LeaderRotationMeta returns info about leader rotation. // LeaderRotationMeta returns the number of continuous blocks by the leader.
LeaderRotationMeta() LeaderRotationMeta LeaderRotationMeta() (publicKeyBytes []byte, epoch, count, shifts uint64, err error)
// BadBlocks returns a list of the last 'bad blocks' that // BadBlocks returns a list of the last 'bad blocks' that
// the client has seen on the network. // the client has seen on the network.
BadBlocks() []BadBlock BadBlocks() []BadBlock

@ -1194,7 +1194,18 @@ func (bc *BlockChainImpl) Rollback(chain []common.Hash) error {
var lastWrite uint64 var lastWrite uint64
func (bc *BlockChainImpl) writeBlockWithState( func (bc *BlockChainImpl) WriteBlockWithoutState(block *types.Block) (err error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
if err := rawdb.WriteBlock(bc.db, block); err != nil {
return err
}
return nil
}
func (bc *BlockChainImpl) WriteBlockWithState(
block *types.Block, receipts []*types.Receipt, block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt, cxReceipts []*types.CXReceipt,
stakeMsgs []staking.StakeMsg, stakeMsgs []staking.StakeMsg,
@ -1348,7 +1359,7 @@ func (bc *BlockChainImpl) GetMaxGarbageCollectedBlockNumber() int64 {
return bc.maxGarbCollectedBlkNum return bc.maxGarbCollectedBlkNum
} }
func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) { func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool, blockExecution bool) (int, error) {
// if in tikv mode, writer node need preempt master or come be a follower // if in tikv mode, writer node need preempt master or come be a follower
if bc.isInitTiKV() && !bc.tikvPreemptMaster(bc.rangeBlock(chain)) { if bc.isInitTiKV() && !bc.tikvPreemptMaster(bc.rangeBlock(chain)) {
return len(chain), nil return len(chain), nil
@ -1392,10 +1403,17 @@ func (bc *BlockChainImpl) LeaderRotationMeta() LeaderRotationMeta {
return bc.leaderRotationMeta.Clone() return bc.leaderRotationMeta.Clone()
} }
func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool, blockExecution bool) (int, []interface{}, []*types.Log, error) {
if blockExecution {
return bc.insertChainWithBlockExecution(chain, verifyHeaders)
}
return bc.insertChainWithoutBlockExecution(chain, verifyHeaders)
}
// insertChain will execute the actual chain insertion and event aggregation. The // insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner // only reason this method exists as a separate one is to make locking cleaner
// with deferred statements. // with deferred statements.
func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) { func (bc *BlockChainImpl) insertChainWithBlockExecution(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import // Sanity check that we have something meaningful to import
if len(chain) == 0 { if len(chain) == 0 {
return 0, nil, nil, ErrEmptyChain return 0, nil, nil, ErrEmptyChain
@ -1506,7 +1524,9 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
// Prune in case non-empty winner chain // Prune in case non-empty winner chain
if len(winner) > 0 { if len(winner) > 0 {
// Import all the pruned blocks to make the state available // Import all the pruned blocks to make the state available
_, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */) bc.chainmu.Unlock()
_, evs, logs, err := bc.insertChainWithBlockExecution(winner, true /* verifyHeaders */)
bc.chainmu.Lock()
events, coalescedLogs = evs, logs events, coalescedLogs = evs, logs
if err != nil { if err != nil {
@ -1639,6 +1659,125 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
return 0, events, coalescedLogs, nil return 0, events, coalescedLogs, nil
} }
//receiptChain []types.Receipts,
func (bc *BlockChainImpl) insertChainWithoutBlockExecution(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
}
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
// Chain broke ancestry, log a message (programming error) and skip insertion
utils.Logger().Error().
Str("number", chain[i].Number().String()).
Str("hash", chain[i].Hash().Hex()).
Str("parent", chain[i].ParentHash().Hex()).
Str("prevnumber", chain[i-1].Number().String()).
Str("prevhash", chain[i-1].Hash().Hex()).
Msg("insertChain: non contiguous block insert")
return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
}
}
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
var verifyHeadersResults <-chan error
// If the block header chain has not been verified, conduct header verification here.
if verifyHeaders {
headers := make([]*block.Header, len(chain))
seals := make([]bool, len(chain))
for i, block := range chain {
headers[i] = block.Header()
seals[i] = true
}
// Note that VerifyHeaders verifies headers in the chain in parallel
abort, results := bc.Engine().VerifyHeaders(bc, headers, seals)
verifyHeadersResults = results
defer close(abort)
}
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
//senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
// Iterate over the blocks and insert when the verifier permits
for i, block := range chain {
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
utils.Logger().Debug().Msg("Premature abort during blocks processing")
break
}
var err error
if verifyHeaders {
err = <-verifyHeadersResults
}
if err == nil {
err = bc.Validator().ValidateBody(block)
}
switch {
case err == ErrKnownBlock:
// Block and state both already known. However if the current block is below
// this number we did a rollback and we should reimport it nonetheless.
if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
continue
}
case err == consensus_engine.ErrFutureBlock:
// Allow up to MaxFuture second in the future blocks. If this limit is exceeded
// the chain is discarded and processed at a later time if given.
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time().Cmp(max) > 0 {
return i, nil, nil, fmt.Errorf("future block: %v > %v", block.Time(), max)
}
bc.futureBlocks.Add(block.Hash(), block)
continue
case err == consensus_engine.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
bc.futureBlocks.Add(block.Hash(), block)
continue
case err == consensus_engine.ErrPrunedAncestor:
var winner []*types.Block
parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
for parent != nil && !bc.HasState(parent.Root()) {
winner = append(winner, parent)
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
}
for j := 0; j < len(winner)/2; j++ {
winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
}
// Prune in case non-empty winner chain
if len(winner) > 0 {
// Import all the pruned blocks to make the state available
bc.chainmu.Unlock()
_, _, _, err := bc.insertChainWithoutBlockExecution(winner, true /* verifyHeaders */)
bc.chainmu.Lock()
if err != nil {
return i, nil, nil, err
}
}
case err != nil:
bc.reportBlock(block, nil, err)
return i, nil, nil, err
}
// Create a new statedb using the parent block and report an
// error if it fails.
if err = bc.WriteBlockWithoutState(block); err != nil {
return i, nil, nil, err
}
}
return 0, nil, nil, nil
}
// insertStats tracks and reports on block insertion. // insertStats tracks and reports on block insertion.
type insertStats struct { type insertStats struct {
queued, processed, ignored int queued, processed, ignored int

@ -120,7 +120,7 @@ func (a Stub) Rollback(chain []common.Hash) error {
return errors.Errorf("method Rollback not implemented for %s", a.Name) return errors.Errorf("method Rollback not implemented for %s", a.Name)
} }
func (a Stub) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) { func (a Stub) WriteBlockWithoutState(block *types.Block) (err error) {
return errors.Errorf("method WriteBlockWithoutState not implemented for %s", a.Name) return errors.Errorf("method WriteBlockWithoutState not implemented for %s", a.Name)
} }
@ -132,7 +132,7 @@ func (a Stub) GetMaxGarbageCollectedBlockNumber() int64 {
return 0 return 0
} }
func (a Stub) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) { func (a Stub) InsertChain(chain types.Blocks, verifyHeaders bool, blockExecution bool) (int, error) {
return 0, errors.Errorf("method InsertChain not implemented for %s", a.Name) return 0, errors.Errorf("method InsertChain not implemented for %s", a.Name)
} }

@ -114,7 +114,7 @@ func (bc *EpochChain) Stop() {
}) })
} }
func (bc *EpochChain) InsertChain(blocks types.Blocks, _ bool) (int, error) { func (bc *EpochChain) InsertChain(blocks types.Blocks, _ bool, _ bool) (int, error) {
if len(blocks) == 0 { if len(blocks) == 0 {
return 0, nil return 0, nil
} }

@ -27,6 +27,6 @@ type blockChain interface {
engine.ChainReader engine.ChainReader
Engine() engine.Engine Engine() engine.Engine
InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) InsertChain(chain types.Blocks, verifyHeaders bool, blockExecution bool) (int, error)
WriteCommitSig(blockNum uint64, lastCommits []byte) error WriteCommitSig(blockNum uint64, lastCommits []byte) error
} }

@ -123,7 +123,7 @@ func (bh *beaconHelper) insertLastMileBlocks() (inserted int, bn uint64, err err
} }
// TODO: Instruct the beacon helper to verify signatures. This may require some forks // TODO: Instruct the beacon helper to verify signatures. This may require some forks
// in pub-sub message (add commit sigs in node.block.sync messages) // in pub-sub message (add commit sigs in node.block.sync messages)
if _, err = bh.bc.InsertChain(types.Blocks{b}, true); err != nil { if _, err = bh.bc.InsertChain(types.Blocks{b}, true, true); err != nil {
bn-- bn--
return return
} }

@ -280,16 +280,16 @@ func (e *sigVerifyErr) Error() string {
return fmt.Sprintf("[VerifyHeaderSignature] %v", e.err.Error()) return fmt.Sprintf("[VerifyHeaderSignature] %v", e.err.Error())
} }
func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks) (int, error) { func verifyAndInsertBlocks(bc blockChain, blockExecution bool, blocks types.Blocks) (int, error) {
for i, block := range blocks { for i, block := range blocks {
if err := verifyAndInsertBlock(bc, block, blocks[i+1:]...); err != nil { if err := verifyAndInsertBlock(bc, block, blockExecution, blocks[i+1:]...); err != nil {
return i, err return i, err
} }
} }
return len(blocks), nil return len(blocks), nil
} }
func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error { func verifyAndInsertBlock(bc blockChain, block *types.Block, blockExecution bool, nextBlocks ...*types.Block) error {
var ( var (
sigBytes bls.SerializedSignature sigBytes bls.SerializedSignature
bitmap []byte bitmap []byte
@ -314,7 +314,7 @@ func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*type
if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil { if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil {
return errors.Wrap(err, "[VerifyHeader]") return errors.Wrap(err, "[VerifyHeader]")
} }
if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil { if _, err := bc.InsertChain(types.Blocks{block}, false, blockExecution); err != nil {
return errors.Wrap(err, "[InsertChain]") return errors.Wrap(err, "[InsertChain]")
} }
return nil return nil

@ -210,7 +210,7 @@ func (lsi *lrSyncIter) processBlocks(results []*blockResult, targetBN uint64) {
blocks := blockResultsToBlocks(results) blocks := blockResultsToBlocks(results)
for i, block := range blocks { for i, block := range blocks {
if err := verifyAndInsertBlock(lsi.bc, block); err != nil { if err := verifyAndInsertBlock(lsi.bc, block, true); err != nil {
lsi.logger.Warn().Err(err).Uint64("target block", targetBN). lsi.logger.Warn().Err(err).Uint64("target block", targetBN).
Uint64("block number", block.NumberU64()). Uint64("block number", block.NumberU64()).
Msg("insert blocks failed in long range") Msg("insert blocks failed in long range")

@ -74,7 +74,7 @@ func (d *Downloader) doShortRangeSync() (int, error) {
} }
d.logger.Info().Int("num blocks", len(blocks)).Msg("getBlockByHashes result") d.logger.Info().Int("num blocks", len(blocks)).Msg("getBlockByHashes result")
n, err := verifyAndInsertBlocks(d.bc, blocks) n, err := verifyAndInsertBlocks(d.bc, true, blocks)
numBlocksInsertedShortRangeHistogramVec.With(d.promLabels()).Observe(float64(n)) numBlocksInsertedShortRangeHistogramVec.With(d.promLabels()).Observe(float64(n))
if err != nil { if err != nil {
d.logger.Warn().Err(err).Int("blocks inserted", n).Msg("Insert block failed") d.logger.Warn().Err(err).Int("blocks inserted", n).Msg("Insert block failed")
@ -131,7 +131,7 @@ func (d *Downloader) doShortRangeSyncForEpochSync() (int, error) {
// short circuit for no sync is needed // short circuit for no sync is needed
return 0, nil return 0, nil
} }
n, err := d.bc.InsertChain(blocks, true) n, err := d.bc.InsertChain(blocks, true, true)
numBlocksInsertedShortRangeHistogramVec.With(d.promLabels()).Observe(float64(n)) numBlocksInsertedShortRangeHistogramVec.With(d.promLabels()).Observe(float64(n))
if err != nil { if err != nil {
sh.removeStreams([]sttypes.StreamID{streamID}) // Data provided by remote nodes is corrupted sh.removeStreams([]sttypes.StreamID{streamID}) // Data provided by remote nodes is corrupted

@ -69,7 +69,7 @@ func TestAddNewBlock(t *testing.T) {
commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil, commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil,
) )
_, err = node.Blockchain().InsertChain([]*types.Block{block}, true) _, err = node.Blockchain().InsertChain([]*types.Block{block}, true, true)
if err != nil { if err != nil {
t.Errorf("error when adding new block %v", err) t.Errorf("error when adding new block %v", err)
} }

Loading…
Cancel
Save