Refactor shard state related func and avoid recomputing shard state unnecessarily

pull/1621/head
Rongjian Lan 5 years ago
parent 3de4056de3
commit 19b1c03888
  1. 4
      cmd/harmony/main.go
  2. 4
      consensus/consensus_service.go
  3. 2
      consensus/engine/consensus_engine.go
  4. 2
      core/block_validator.go
  5. 2
      core/blockchain.go
  6. 18
      core/resharding.go
  7. 50
      internal/chain/engine.go
  8. 14
      node/node.go
  9. 2
      node/node_genesis.go
  10. 4
      node/node_handler.go
  11. 2
      node/node_newblock.go
  12. 2
      node/worker/worker.go

@ -367,8 +367,8 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// currentNode.DRand = dRand
// This needs to be executed after consensus and drand are setup
if err := currentNode.GetInitShardState(); err != nil {
ctxerror.Crit(utils.GetLogger(), err, "GetInitShardState failed",
if err := currentNode.CalculateInitShardState(); err != nil {
ctxerror.Crit(utils.GetLogger(), err, "CalculateInitShardState failed",
"shardID", *shardID)
}

@ -505,7 +505,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
header := consensus.ChainReader.CurrentHeader()
epoch := header.Epoch()
curPubKeys := core.GetPublicKeys(epoch, header.ShardID())
curPubKeys := core.CalculatePublicKeys(epoch, header.ShardID())
consensus.numPrevPubKeys = len(curPubKeys)
consensus.getLogger().Info().Msg("[UpdateConsensusInformation] Updating.....")
@ -515,7 +515,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.SetEpochNum(epoch.Uint64() + 1)
consensus.getLogger().Info().Uint64("headerNum", header.Number().Uint64()).Msg("[UpdateConsensusInformation] Epoch updated for next epoch")
nextEpoch := new(big.Int).Add(epoch, common.Big1)
pubKeys = core.GetPublicKeys(nextEpoch, header.ShardID())
pubKeys = core.CalculatePublicKeys(nextEpoch, header.ShardID())
} else {
consensus.SetEpochNum(epoch.Uint64())
pubKeys = curPubKeys

@ -56,7 +56,7 @@ type Engine interface {
// is used for verifying "incoming" block header against commit signature and bitmap sent from the other chain cross-shard via libp2p.
// i.e. this header verification api is more flexible since the caller specifies which commit signature and bitmap to use
// for verifying the block header, which is necessary for cross-shard block header verification. Example of such is cross-shard transaction.
VerifyHeaderWithSignature(header *block.Header, commitSig []byte, commitBitmap []byte) error
VerifyHeaderWithSignature(chain ChainReader, header *block.Header, commitSig []byte, commitBitmap []byte) error
// VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers
// concurrently. The method returns a quit channel to abort the operations and

@ -225,5 +225,5 @@ func (v *BlockValidator) ValidateCXReceiptsProof(cxp *types.CXReceiptsProof) err
}
// (4) verify blockHeader with seal
return v.engine.VerifyHeaderWithSignature(cxp.Header, cxp.CommitSig, cxp.CommitBitmap)
return v.engine.VerifyHeaderWithSignature(v.bc, cxp.Header, cxp.CommitSig, cxp.CommitBitmap)
}

@ -1907,7 +1907,7 @@ func (bc *BlockChain) GetVrfByNumber(number uint64) []byte {
return header.Vrf()
}
// GetShardState returns the shard state for the given epoch,
// CalculateShardState returns the shard state for the given epoch,
// creating one if needed.
func (bc *BlockChain) GetShardState(
epoch *big.Int,

@ -163,7 +163,7 @@ func CalculateNewShardState(
stakeInfo *map[common.Address]*structs.StakeInfo,
) (shard.State, error) {
if epoch.Cmp(big.NewInt(GenesisEpoch)) == 0 {
return GetInitShardState(), nil
return CalculateInitShardState(), nil
}
prevEpoch := new(big.Int).Sub(epoch, common.Big1)
ss, err := GetShardingStateFromBlockChain(bc, prevEpoch)
@ -215,15 +215,15 @@ func (ss *ShardingState) UpdateShardingState(stakeInfo *map[common.Address]*stru
// Depends on the type of the network. Defaults to the mainnet schedule.
var ShardingSchedule shardingconfig.Schedule = shardingconfig.MainnetSchedule
// GetInitShardState returns the initial shard state at genesis.
func GetInitShardState() shard.State {
return GetShardState(big.NewInt(GenesisEpoch))
// CalculateInitShardState returns the initial shard state at genesis.
func CalculateInitShardState() shard.State {
return CalculateShardState(big.NewInt(GenesisEpoch))
}
// GetShardState returns the shard state based on epoch number
// CalculateShardState returns the shard state based on epoch number
// This api for getting shard state is what should be used to get shard state regardless of
// current chain dependency (ex. getting shard state from block header received during cross-shard transaction)
func GetShardState(epoch *big.Int) shard.State {
func CalculateShardState(epoch *big.Int) shard.State {
utils.Logger().Info().Int64("epoch", epoch.Int64()).Msg("Get Shard State of Epoch.")
shardingConfig := ShardingSchedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
@ -271,9 +271,9 @@ func GetShardState(epoch *big.Int) shard.State {
return shardState
}
// GetPublicKeys returns the publickeys given epoch and shardID
func GetPublicKeys(epoch *big.Int, shardID uint32) []*bls.PublicKey {
shardState := GetShardState(epoch)
// CalculatePublicKeys returns the publickeys given epoch and shardID
func CalculatePublicKeys(epoch *big.Int, shardID uint32) []*bls.PublicKey {
shardState := CalculateShardState(epoch)
// Update validator public keys
committee := shardState.FindCommitteeByID(shardID)

@ -11,7 +11,6 @@ import (
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
@ -104,29 +103,7 @@ func (e *engineImpl) VerifyHeaders(chain engine.ChainReader, headers []*block.He
// ReadPublicKeysFromLastBlock finds the public keys of last block's committee
func ReadPublicKeysFromLastBlock(bc engine.ChainReader, header *block.Header) ([]*bls.PublicKey, error) {
parentHeader := bc.GetHeaderByHash(header.ParentHash())
if parentHeader == nil {
return nil, ctxerror.New("cannot find parent block header in DB",
"parentHash", header.ParentHash())
}
parentShardState := core.GetShardState(parentHeader.Epoch())
parentCommittee := parentShardState.FindCommitteeByID(parentHeader.ShardID())
if parentCommittee == nil {
return nil, ctxerror.New("cannot find shard in the shard state",
"parentBlockNumber", parentHeader.Number(),
"shardID", parentHeader.ShardID(),
)
}
var committerKeys []*bls.PublicKey
for _, member := range parentCommittee.NodeList {
committerKey := new(bls.PublicKey)
err := member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
return nil, ctxerror.New("cannot convert BLS public key",
"blsPublicKey", member.BlsPublicKey).WithCause(err)
}
committerKeys = append(committerKeys, committerKey)
}
return committerKeys, nil
return GetPublicKeys(bc, parentHeader)
}
// VerifySeal implements Engine, checking whether the given block's parent block satisfies
@ -148,7 +125,7 @@ func (e *engineImpl) VerifySeal(chain engine.ChainReader, header *block.Header)
}
parentHash := header.ParentHash()
parentHeader := chain.GetHeader(parentHash, header.Number().Uint64()-1)
parentQuorum, err := QuorumForBlock(parentHeader)
parentQuorum, err := QuorumForBlock(chain, parentHeader)
if err != nil {
return errors.Wrapf(err,
"cannot calculate quorum for block %s", header.Number())
@ -181,8 +158,12 @@ func (e *engineImpl) Finalize(chain engine.ChainReader, header *block.Header, st
}
// QuorumForBlock returns the quorum for the given block header.
func QuorumForBlock(h *block.Header) (quorum int, err error) {
ss := core.GetShardState(h.Epoch())
func QuorumForBlock(chain engine.ChainReader, h *block.Header) (quorum int, err error) {
ss, err := chain.ReadShardState(h.Epoch())
if err != nil {
return 0, ctxerror.New("failed to read shard state of epoch",
"epoch", h.Epoch().Uint64())
}
c := ss.FindCommitteeByID(h.ShardID())
if c == nil {
return 0, errors.Errorf(
@ -195,8 +176,8 @@ func QuorumForBlock(h *block.Header) (quorum int, err error) {
// is used for verifying "incoming" block header against commit signature and bitmap sent from the other chain cross-shard via libp2p.
// i.e. this header verification api is more flexible since the caller specifies which commit signature and bitmap to use
// for verifying the block header, which is necessary for cross-shard block header verification. Example of such is cross-shard transaction.
func (e *engineImpl) VerifyHeaderWithSignature(header *block.Header, commitSig []byte, commitBitmap []byte) error {
publicKeys, err := GetPublicKeys(header)
func (e *engineImpl) VerifyHeaderWithSignature(chain engine.ChainReader, header *block.Header, commitSig []byte, commitBitmap []byte) error {
publicKeys, err := GetPublicKeys(chain, header)
if err != nil {
return ctxerror.New("[VerifyHeaderWithSignature] Cannot get publickeys for block header").WithCause(err)
}
@ -208,7 +189,7 @@ func (e *engineImpl) VerifyHeaderWithSignature(header *block.Header, commitSig [
}
hash := header.Hash()
quorum, err := QuorumForBlock(header)
quorum, err := QuorumForBlock(chain, header)
if err != nil {
return errors.Wrapf(err,
"cannot calculate quorum for block %s", header.Number())
@ -229,8 +210,13 @@ func (e *engineImpl) VerifyHeaderWithSignature(header *block.Header, commitSig [
}
// GetPublicKeys finds the public keys of the committee that signed the block header
func GetPublicKeys(header *block.Header) ([]*bls.PublicKey, error) {
shardState := core.GetShardState(header.Epoch())
func GetPublicKeys(chain engine.ChainReader, header *block.Header) ([]*bls.PublicKey, error) {
shardState, err := chain.ReadShardState(header.Epoch())
if err != nil {
return nil, ctxerror.New("failed to read shard state of epoch",
"epoch", header.Epoch().Uint64())
}
committee := shardState.FindCommitteeByID(header.ShardID())
if committee == nil {
return nil, ctxerror.New("cannot find shard in the shard state",

@ -458,10 +458,10 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
return &node
}
// GetInitShardState initialize shard state from latest epoch and update committee pub keys for consensus and drand
func (node *Node) GetInitShardState() (err error) {
// CalculateInitShardState initialize shard state from latest epoch and update committee pub keys for consensus and drand
func (node *Node) CalculateInitShardState() (err error) {
if node.Consensus == nil {
return ctxerror.New("[GetInitShardState] consenus is nil; Cannot figure out shardID")
return ctxerror.New("[CalculateInitShardState] consenus is nil; Cannot figure out shardID")
}
shardID := node.Consensus.ShardID
@ -473,11 +473,11 @@ func (node *Node) GetInitShardState() (err error) {
Uint64("blockNum", blockNum).
Uint32("shardID", shardID).
Uint64("epoch", epoch.Uint64()).
Msg("[GetInitShardState] Try To Get PublicKeys from database")
pubKeys := core.GetPublicKeys(epoch, shardID)
Msg("[CalculateInitShardState] Try To Get PublicKeys from database")
pubKeys := core.CalculatePublicKeys(epoch, shardID)
if len(pubKeys) == 0 {
return ctxerror.New(
"[GetInitShardState] PublicKeys is Empty, Cannot update public keys",
"[CalculateInitShardState] PublicKeys is Empty, Cannot update public keys",
"shardID", shardID,
"blockNum", blockNum)
}
@ -487,7 +487,7 @@ func (node *Node) GetInitShardState() (err error) {
utils.Logger().Info().
Uint64("blockNum", blockNum).
Int("numPubKeys", len(pubKeys)).
Msg("[GetInitShardState] Successfully updated public keys")
Msg("[CalculateInitShardState] Successfully updated public keys")
node.Consensus.UpdatePublicKeys(pubKeys)
node.Consensus.SetMode(consensus.Normal)
return nil

@ -41,7 +41,7 @@ type genesisInitializer struct {
// InitChainDB sets up a new genesis block in the database for the given shard.
func (gi *genesisInitializer) InitChainDB(db ethdb.Database, shardID uint32) error {
shardState := core.GetInitShardState()
shardState := core.CalculateInitShardState()
if shardID != 0 {
// store only the local shard for shard chains
c := shardState.FindCommitteeByID(shardID)

@ -441,7 +441,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
// ctxerror.Log15(utils.Logger().Error, e)
// }
// }
// shardState, err := newBlockHeader.GetShardState()
// shardState, err := newBlockHeader.CalculateShardState()
// if err != nil {
// e := ctxerror.New("cannot get shard state from header").WithCause(err)
// ctxerror.Log15(utils.Logger().Error, e)
@ -484,7 +484,7 @@ var (
)
func initGenesisCatalog() {
genesisShardState := core.GetInitShardState()
genesisShardState := core.CalculateInitShardState()
for _, committee := range genesisShardState {
for i, nodeID := range committee.NodeList {
genesisNode := &genesisNode{

@ -131,7 +131,7 @@ func (node *Node) proposeShardStateWithoutBeaconSync(block *types.Block) shard.S
}
nextEpoch := new(big.Int).Add(block.Header().Epoch(), common.Big1)
return core.GetShardState(nextEpoch)
return core.CalculateShardState(nextEpoch)
}
func (node *Node) proposeShardState(block *types.Block) error {

@ -299,7 +299,7 @@ func (w *Worker) ProposeShardStateWithoutBeaconSync() shard.State {
return nil
}
nextEpoch := new(big.Int).Add(w.current.header.Epoch(), common.Big1)
return core.GetShardState(nextEpoch)
return core.CalculateShardState(nextEpoch)
}
// FinalizeNewBlock generate a new block for the next consensus round.

Loading…
Cancel
Save