Removed unused functions and improved locks usage. (#4572)

pull/4574/head
Konstantin 1 year ago committed by GitHub
parent 1f974af163
commit 368fc9e07b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      core/blockchain.go
  2. 149
      core/blockchain_impl.go
  3. 51
      core/headerchain.go

@ -47,11 +47,6 @@ type Options struct {
type BlockChain interface { type BlockChain interface {
// ValidateNewBlock validates new block. // ValidateNewBlock validates new block.
ValidateNewBlock(block *types.Block, beaconChain BlockChain) error ValidateNewBlock(block *types.Block, beaconChain BlockChain) error
// SetHead rewinds the local chain to a new head. In the case of headers, everything
// above the new head will be deleted and the new one set. In the case of blocks
// though, the head may be further rewound if block bodies are missing (non-archive
// nodes after a fast sync).
SetHead(head uint64) error
// ShardID returns the shard Id of the blockchain. // ShardID returns the shard Id of the blockchain.
ShardID() uint32 ShardID() uint32
// CurrentBlock retrieves the current head block of the canonical chain. The // CurrentBlock retrieves the current head block of the canonical chain. The
@ -105,18 +100,6 @@ type BlockChain interface {
// Rollback is designed to remove a chain of links from the database that aren't // Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid. // certain enough to be valid.
Rollback(chain []common.Hash) error Rollback(chain []common.Hash) error
// WriteBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
WriteBlockWithoutState(block *types.Block, td *big.Int) (err error)
// WriteBlockWithState writes the block and all associated state to the database.
WriteBlockWithState(
block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt,
stakeMsgs []types2.StakeMsg,
paid reward.Reader,
state *state.DB,
) (status WriteStatus, err error)
// GetMaxGarbageCollectedBlockNumber .. // GetMaxGarbageCollectedBlockNumber ..
GetMaxGarbageCollectedBlockNumber() int64 GetMaxGarbageCollectedBlockNumber() int64
// InsertChain attempts to insert the given batch of blocks in to the canonical // InsertChain attempts to insert the given batch of blocks in to the canonical
@ -167,8 +150,6 @@ type BlockChain interface {
WriteShardStateBytes(db rawdb.DatabaseWriter, WriteShardStateBytes(db rawdb.DatabaseWriter,
epoch *big.Int, shardState []byte, epoch *big.Int, shardState []byte,
) (*shard.State, error) ) (*shard.State, error)
// WriteHeadBlock writes head block.
WriteHeadBlock(block *types.Block) error
// ReadCommitSig retrieves the commit signature on a block. // ReadCommitSig retrieves the commit signature on a block.
ReadCommitSig(blockNum uint64) ([]byte, error) ReadCommitSig(blockNum uint64) ([]byte, error)
// WriteCommitSig saves the commits signatures signed on a block. // WriteCommitSig saves the commits signatures signed on a block.
@ -179,20 +160,8 @@ type BlockChain interface {
GetVrfByNumber(number uint64) []byte GetVrfByNumber(number uint64) []byte
// ChainDb returns the database. // ChainDb returns the database.
ChainDb() ethdb.Database ChainDb() ethdb.Database
// GetEpochBlockNumber returns the first block number of the given epoch.
GetEpochBlockNumber(epoch *big.Int) (*big.Int, error)
// StoreEpochBlockNumber stores the given epoch-first block number.
StoreEpochBlockNumber(
epoch *big.Int, blockNum *big.Int,
) error
// ReadEpochVrfBlockNums retrieves block numbers with valid VRF for the specified epoch. // ReadEpochVrfBlockNums retrieves block numbers with valid VRF for the specified epoch.
ReadEpochVrfBlockNums(epoch *big.Int) ([]uint64, error) ReadEpochVrfBlockNums(epoch *big.Int) ([]uint64, error)
// WriteEpochVrfBlockNums saves block numbers with valid VRF for the specified epoch.
WriteEpochVrfBlockNums(epoch *big.Int, vrfNumbers []uint64) error
// ReadEpochVdfBlockNum retrieves block number with valid VDF for the specified epoch.
ReadEpochVdfBlockNum(epoch *big.Int) (*big.Int, error)
// WriteEpochVdfBlockNum saves block number with valid VDF for the specified epoch.
WriteEpochVdfBlockNum(epoch *big.Int, blockNum *big.Int) error
// WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key. // WriteCrossLinks saves the hashes of crosslinks by shardID and blockNum combination key.
WriteCrossLinks(batch rawdb.DatabaseWriter, cls []types.CrossLink) error WriteCrossLinks(batch rawdb.DatabaseWriter, cls []types.CrossLink) error
// DeleteCrossLinks removes the hashes of crosslinks by shardID and blockNum combination key. // DeleteCrossLinks removes the hashes of crosslinks by shardID and blockNum combination key.

@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
@ -181,9 +180,7 @@ type BlockChainImpl struct {
scope event.SubscriptionScope scope event.SubscriptionScope
genesisBlock *types.Block genesisBlock *types.Block
mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock chainmu sync.RWMutex // blockchain insertion lock
procmu sync.RWMutex // block processor lock
pendingCrossLinksMutex sync.RWMutex // pending crosslinks lock pendingCrossLinksMutex sync.RWMutex // pending crosslinks lock
pendingSlashingCandidatesMU sync.RWMutex // pending slashing candidates pendingSlashingCandidatesMU sync.RWMutex // pending slashing candidates
@ -576,14 +573,14 @@ func (bc *BlockChainImpl) loadLastState() error {
if head == (common.Hash{}) { if head == (common.Hash{}) {
// Corrupt or empty database, init from scratch // Corrupt or empty database, init from scratch
utils.Logger().Warn().Msg("Empty database, resetting chain") utils.Logger().Warn().Msg("Empty database, resetting chain")
return bc.Reset() return bc.reset()
} }
// Make sure the entire head block is available // Make sure the entire head block is available
currentBlock := bc.GetBlockByHash(head) currentBlock := bc.GetBlockByHash(head)
if currentBlock == nil { if currentBlock == nil {
// Corrupt or empty database, init from scratch // Corrupt or empty database, init from scratch
utils.Logger().Warn().Str("hash", head.Hex()).Msg("Head block missing, resetting chain") utils.Logger().Warn().Str("hash", head.Hex()).Msg("Head block missing, resetting chain")
return bc.Reset() return bc.reset()
} }
// Make sure the state associated with the block is available // Make sure the state associated with the block is available
if _, err := state.New(currentBlock.Root(), bc.stateCache, bc.snaps); err != nil { if _, err := state.New(currentBlock.Root(), bc.stateCache, bc.snaps); err != nil {
@ -633,12 +630,9 @@ func (bc *BlockChainImpl) loadLastState() error {
return nil return nil
} }
func (bc *BlockChainImpl) SetHead(head uint64) error { func (bc *BlockChainImpl) setHead(head uint64) error {
utils.Logger().Warn().Uint64("target", head).Msg("Rewinding blockchain") utils.Logger().Warn().Uint64("target", head).Msg("Rewinding blockchain")
bc.mu.Lock()
defer bc.mu.Unlock()
// Rewind the header chain, deleting all block bodies until then // Rewind the header chain, deleting all block bodies until then
delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) error { delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) error {
return rawdb.DeleteBody(db, hash, num) return rawdb.DeleteBody(db, hash, num)
@ -691,8 +685,6 @@ func (bc *BlockChainImpl) CurrentBlock() *types.Block {
} }
func (bc *BlockChainImpl) Processor() Processor { func (bc *BlockChainImpl) Processor() Processor {
bc.procmu.RLock()
defer bc.procmu.RUnlock()
return bc.processor return bc.processor
} }
@ -709,17 +701,15 @@ func (bc *BlockChainImpl) Snapshots() *snapshot.Tree {
return bc.snaps return bc.snaps
} }
func (bc *BlockChainImpl) Reset() error { func (bc *BlockChainImpl) reset() error {
return bc.ResetWithGenesisBlock(bc.genesisBlock) return bc.resetWithGenesisBlock(bc.genesisBlock)
} }
func (bc *BlockChainImpl) ResetWithGenesisBlock(genesis *types.Block) error { func (bc *BlockChainImpl) resetWithGenesisBlock(genesis *types.Block) error {
// Dump the entire block chain and purge the caches // Dump the entire block chain and purge the caches
if err := bc.SetHead(0); err != nil { if err := bc.setHead(0); err != nil {
return err return err
} }
bc.mu.Lock()
defer bc.mu.Unlock()
// Prepare the genesis block and reinitialise the chain // Prepare the genesis block and reinitialise the chain
if err := rawdb.WriteBlock(bc.db, genesis); err != nil { if err := rawdb.WriteBlock(bc.db, genesis); err != nil {
@ -808,8 +798,8 @@ func (bc *BlockChainImpl) Export(w io.Writer) error {
// ExportN writes a subset of the active chain to the given writer. // ExportN writes a subset of the active chain to the given writer.
func (bc *BlockChainImpl) ExportN(w io.Writer, first uint64, last uint64) error { func (bc *BlockChainImpl) ExportN(w io.Writer, first uint64, last uint64) error {
bc.mu.RLock() bc.chainmu.RLock()
defer bc.mu.RUnlock() defer bc.chainmu.RUnlock()
if first > last { if first > last {
return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last) return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
@ -837,10 +827,6 @@ func (bc *BlockChainImpl) ExportN(w io.Writer, first uint64, last uint64) error
return nil return nil
} }
func (bc *BlockChainImpl) WriteHeadBlock(block *types.Block) error {
return bc.writeHeadBlock(block)
}
// writeHeadBlock writes a new head block // writeHeadBlock writes a new head block
func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error { func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
// If the block is on a side chain or an unknown one, force other heads onto it too // If the block is on a side chain or an unknown one, force other heads onto it too
@ -1167,8 +1153,8 @@ const (
) )
func (bc *BlockChainImpl) Rollback(chain []common.Hash) error { func (bc *BlockChainImpl) Rollback(chain []common.Hash) error {
bc.mu.Lock() bc.chainmu.Lock()
defer bc.mu.Unlock() defer bc.chainmu.Unlock()
valsToRemove := map[common.Address]struct{}{} valsToRemove := map[common.Address]struct{}{}
for i := len(chain) - 1; i >= 0; i-- { for i := len(chain) - 1; i >= 0; i-- {
@ -1205,95 +1191,15 @@ func (bc *BlockChainImpl) Rollback(chain []common.Hash) error {
return bc.removeInValidatorList(valsToRemove) return bc.removeInValidatorList(valsToRemove)
} }
// SetReceiptsData computes all the non-consensus fields of the receipts
func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts types.Receipts) error {
signer := types.MakeSigner(config, block.Epoch())
ethSigner := types.NewEIP155Signer(config.EthCompatibleChainID)
transactions, stakingTransactions, logIndex := block.Transactions(), block.StakingTransactions(), uint(0)
if len(transactions)+len(stakingTransactions) != len(receipts) {
return errors.New("transaction+stakingTransactions and receipt count mismatch")
}
// The used gas can be calculated based on previous receipts
if len(receipts) > 0 && len(transactions) > 0 {
receipts[0].GasUsed = receipts[0].CumulativeGasUsed
}
for j := 1; j < len(transactions); j++ {
// The transaction hash can be retrieved from the transaction itself
receipts[j].TxHash = transactions[j].Hash()
receipts[j].GasUsed = receipts[j].CumulativeGasUsed - receipts[j-1].CumulativeGasUsed
// The contract address can be derived from the transaction itself
if transactions[j].To() == nil {
// Deriving the signer is expensive, only do if it's actually needed
var from common.Address
if transactions[j].IsEthCompatible() {
from, _ = types.Sender(ethSigner, transactions[j])
} else {
from, _ = types.Sender(signer, transactions[j])
}
receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce())
}
// The derived log fields can simply be set from the block and transaction
for k := 0; k < len(receipts[j].Logs); k++ {
receipts[j].Logs[k].BlockNumber = block.NumberU64()
receipts[j].Logs[k].BlockHash = block.Hash()
receipts[j].Logs[k].TxHash = receipts[j].TxHash
receipts[j].Logs[k].TxIndex = uint(j)
receipts[j].Logs[k].Index = logIndex
logIndex++
}
}
// The used gas can be calculated based on previous receipts
if len(receipts) > len(transactions) && len(stakingTransactions) > 0 {
receipts[len(transactions)].GasUsed = receipts[len(transactions)].CumulativeGasUsed
}
// in a block, txns are processed before staking txns
for j := len(transactions) + 1; j < len(transactions)+len(stakingTransactions); j++ {
// The transaction hash can be retrieved from the staking transaction itself
receipts[j].TxHash = stakingTransactions[j].Hash()
receipts[j].GasUsed = receipts[j].CumulativeGasUsed - receipts[j-1].CumulativeGasUsed
// The derived log fields can simply be set from the block and transaction
for k := 0; k < len(receipts[j].Logs); k++ {
receipts[j].Logs[k].BlockNumber = block.NumberU64()
receipts[j].Logs[k].BlockHash = block.Hash()
receipts[j].Logs[k].TxHash = receipts[j].TxHash
receipts[j].Logs[k].TxIndex = uint(j) + uint(len(transactions))
receipts[j].Logs[k].Index = logIndex
logIndex++
}
}
return nil
}
var lastWrite uint64 var lastWrite uint64
func (bc *BlockChainImpl) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) { func (bc *BlockChainImpl) writeBlockWithState(
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil {
return err
}
if err := rawdb.WriteBlock(bc.db, block); err != nil {
return err
}
return nil
}
func (bc *BlockChainImpl) WriteBlockWithState(
block *types.Block, receipts []*types.Receipt, block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt, cxReceipts []*types.CXReceipt,
stakeMsgs []staking.StakeMsg, stakeMsgs []staking.StakeMsg,
paid reward.Reader, paid reward.Reader,
state *state.DB, state *state.DB,
) (status WriteStatus, err error) { ) (status WriteStatus, err error) {
// Make sure no inconsistent state is leaked during insertion
bc.mu.Lock()
defer bc.mu.Unlock()
currentBlock := bc.CurrentBlock() currentBlock := bc.CurrentBlock()
if currentBlock == nil { if currentBlock == nil {
return NonStatTy, errors.New("Current block is nil") return NonStatTy, errors.New("Current block is nil")
@ -1676,7 +1582,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
// Write the block to the chain and get the status. // Write the block to the chain and get the status.
substart = time.Now() substart = time.Now()
status, err := bc.WriteBlockWithState( status, err := bc.writeBlockWithState(
block, receipts, cxReceipts, stakeMsgs, payout, state, block, receipts, cxReceipts, stakeMsgs, payout, state,
) )
if err != nil { if err != nil {
@ -1888,35 +1794,6 @@ Error: %v
} }
} }
// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
// of the header retrieval mechanisms already need to verify nonces, as well as
// because nonces can be verified sparsely, not needing to check each.
func (bc *BlockChainImpl) InsertHeaderChain(chain []*block.Header, checkFreq int) (int, error) {
start := time.Now()
if i, err := bc.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
return i, err
}
// Make sure only one thread manipulates the chain at once
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
whFunc := func(header *block.Header) error {
bc.mu.Lock()
defer bc.mu.Unlock()
_, err := bc.hc.WriteHeader(header)
return err
}
return bc.hc.InsertHeaderChain(chain, whFunc, start)
}
func (bc *BlockChainImpl) CurrentHeader() *block.Header { func (bc *BlockChainImpl) CurrentHeader() *block.Header {
return bc.hc.CurrentHeader() return bc.hc.CurrentHeader()
} }

@ -18,13 +18,11 @@ package core
import ( import (
crand "crypto/rand" crand "crypto/rand"
"errors"
"fmt" "fmt"
"math" "math"
"math/big" "math/big"
mrand "math/rand" mrand "math/rand"
"sync/atomic" "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
@ -260,55 +258,6 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*block.Header, checkFreq int)
return 0, nil return 0, nil
} }
// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
// of the header retrieval mechanisms already need to verfy nonces, as well as
// because nonces can be verified sparsely, not needing to check each.
func (hc *HeaderChain) InsertHeaderChain(chain []*block.Header, writeHeader WhCallback, start time.Time) (int, error) {
// Collect some import statistics to report on
stats := struct{ processed, ignored int }{}
// All headers passed verification, import them into the database
for i, header := range chain {
// Short circuit insertion if shutting down
if hc.procInterrupt() {
utils.Logger().Debug().Msg("Premature abort during headers import")
return i, errors.New("aborted")
}
// If the header's already known, skip it, otherwise store
if hc.HasHeader(header.Hash(), header.Number().Uint64()) {
stats.ignored++
continue
}
if err := writeHeader(header); err != nil {
return i, err
}
stats.processed++
}
// Report some public statistics so the user has a clue what's going on
last := chain[len(chain)-1]
context := utils.Logger().With().
Int("count", stats.processed).
Str("elapsed", common.PrettyDuration(time.Since(start)).String()).
Str("number", last.Number().String()).
Str("hash", last.Hash().Hex())
if timestamp := time.Unix(last.Time().Int64(), 0); time.Since(timestamp) > time.Minute {
context = context.Str("age", common.PrettyAge(timestamp).String())
}
if stats.ignored > 0 {
context = context.Int("ignored", stats.ignored)
}
logger := context.Logger()
logger.Info().Msg("Imported new block headers")
return 0, nil
}
// GetBlockHashesFromHash retrieves a number of block hashes starting at a given // GetBlockHashesFromHash retrieves a number of block hashes starting at a given
// hash, fetching towards the genesis block. // hash, fetching towards the genesis block.
func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash { func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash {

Loading…
Cancel
Save