From fa00cd143da8a86db8e649893804c722a27380be Mon Sep 17 00:00:00 2001 From: Konstantin <355847+Frozen@users.noreply.github.com> Date: Fri, 11 Aug 2023 14:26:38 -0400 Subject: [PATCH] Keep rotation meta in memory. (#4459) * Keep rotation meta in memory. * Proper name for struct. * Clean up. --- consensus/consensus_v2.go | 9 ++-- core/blockchain_impl.go | 70 +++++++++++++++++++------ core/blockchain_leader_rotation.go | 8 +++ core/state_processor.go | 6 +-- core_test/shardchain_test.go | 83 ++++++++++++++++++++++++++++++ internal/configs/node/config.go | 43 +++++++++------- node/node_handler_test.go | 6 +-- node/node_newblock_test.go | 6 +-- 8 files changed, 183 insertions(+), 48 deletions(-) create mode 100644 core/blockchain_leader_rotation.go create mode 100644 core_test/shardchain_test.go diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 38bb06ff4..3627f0b65 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -157,10 +157,11 @@ func (consensus *Consensus) finalCommit() { Msg("[finalCommit] Unable to construct Committed message") return } - msgToSend, FBFTMsg := - network.Bytes, - network.FBFTMsg - commitSigAndBitmap := FBFTMsg.Payload + var ( + msgToSend = network.Bytes + FBFTMsg = network.FBFTMsg + commitSigAndBitmap = FBFTMsg.Payload + ) consensus.fBFTLog.AddVerifiedMessage(FBFTMsg) // find correct block content curBlockHash := consensus.blockHash diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index 698eaeeba..965dccd9a 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -221,6 +221,7 @@ type BlockChainImpl struct { badBlocks *lru.Cache // Bad block cache pendingSlashes slash.Records maxGarbCollectedBlkNum int64 + leaderRotationMeta leaderRotationMeta options Options } @@ -359,6 +360,12 @@ func newBlockChainWithOptions( bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Hash()) } + curHeader := bc.CurrentBlock().Header() + err = bc.buildLeaderRotationMeta(curHeader) + if err != nil { + return nil, errors.WithMessage(err, "failed to build leader rotation meta") + } + // Take ownership of this particular state go bc.update() return bc, nil @@ -1479,8 +1486,11 @@ func (bc *BlockChainImpl) WriteBlockWithState( defer bc.mu.Unlock() currentBlock := bc.CurrentBlock() - if currentBlock == nil || block.ParentHash() != currentBlock.Hash() { - return NonStatTy, errors.New("Hash of parent block doesn't match the current block hash") + if currentBlock == nil { + return NonStatTy, errors.New("Current block is nil") + } + if block.ParentHash() != currentBlock.Hash() { + return NonStatTy, errors.Errorf("Hash of parent block %s doesn't match the current block hash %s", currentBlock.Hash().Hex(), block.ParentHash().Hex()) } // Commit state object changes to in-memory trie @@ -1650,20 +1660,52 @@ func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (i return n, err } +// buildLeaderRotationMeta builds leader rotation meta if feature is activated. +func (bc *BlockChainImpl) buildLeaderRotationMeta(curHeader *block.Header) error { + if !bc.chainConfig.IsLeaderRotation(curHeader.Epoch()) { + return nil + } + if curHeader.NumberU64() == 0 { + return errors.New("current header is genesis") + } + curPubKey, err := bc.getLeaderPubKeyFromCoinbase(curHeader) + if err != nil { + return err + } + for i := curHeader.NumberU64() - 1; i >= 0; i-- { + header := bc.GetHeaderByNumber(i) + if header == nil { + return errors.New("header is nil") + } + blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(header) + if err != nil { + return err + } + if curPubKey.Bytes != blockPubKey.Bytes || curHeader.Epoch().Uint64() != header.Epoch().Uint64() { + for j := i; i <= curHeader.NumberU64(); j++ { + header := bc.GetHeaderByNumber(i) + if header == nil { + return errors.New("header is nil") + } + err := bc.saveLeaderRotationMeta(header) + if err != nil { + utils.Logger().Error().Err(err).Msg("save leader continuous blocks count error") + return err + } + } + return nil + } + } + return errors.New("no leader rotation meta to save") +} + func (bc *BlockChainImpl) saveLeaderRotationMeta(h *block.Header) error { blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(h) if err != nil { return err } - type stored struct { - pub []byte - epoch uint64 - count uint64 - shifts uint64 - } - var s stored - // error is possible here only on the first iteration, so we can ignore it - s.pub, s.epoch, s.count, s.shifts, _ = rawdb.ReadLeaderRotationMeta(bc.db) + + var s = bc.leaderRotationMeta // increase counter only if the same leader and epoch if bytes.Equal(s.pub, blockPubKey.Bytes[:]) && s.epoch == h.Epoch().Uint64() { @@ -1679,11 +1721,9 @@ func (bc *BlockChainImpl) saveLeaderRotationMeta(h *block.Header) error { if s.epoch != h.Epoch().Uint64() { s.shifts = 0 } + s.epoch = h.Epoch().Uint64() + bc.leaderRotationMeta = s - err = rawdb.WriteLeaderRotationMeta(bc.db, blockPubKey.Bytes[:], h.Epoch().Uint64(), s.count, s.shifts) - if err != nil { - return err - } return nil } diff --git a/core/blockchain_leader_rotation.go b/core/blockchain_leader_rotation.go new file mode 100644 index 000000000..6b9c91a92 --- /dev/null +++ b/core/blockchain_leader_rotation.go @@ -0,0 +1,8 @@ +package core + +type leaderRotationMeta struct { + pub []byte + epoch uint64 + count uint64 + shifts uint64 +} diff --git a/core/state_processor.go b/core/state_processor.go index fe7eeffd1..7a2f2a5d4 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -119,8 +119,8 @@ func (p *StateProcessor) Process( usedGas = new(uint64) header = block.Header() allLogs []*types.Log - gp = new(GasPool).AddGas(block.GasLimit()) - blockStakeMsgs []staking.StakeMsg = make([]staking.StakeMsg, 0) + gp = new(GasPool).AddGas(block.GasLimit()) + blockStakeMsgs = make([]staking.StakeMsg, 0) ) beneficiary, err := p.bc.GetECDSAFromCoinbase(header) @@ -202,7 +202,7 @@ func (p *StateProcessor) Process( receipts, outcxs, incxs, block.StakingTransactions(), slashes, sigsReady, func() uint64 { return header.ViewID().Uint64() }, ) if err != nil { - return nil, nil, nil, nil, 0, nil, statedb, errors.New("[Process] Cannot finalize block") + return nil, nil, nil, nil, 0, nil, statedb, errors.WithMessage(err, "[Process] Cannot finalize block") } result := &ProcessorResult{ diff --git a/core_test/shardchain_test.go b/core_test/shardchain_test.go new file mode 100644 index 000000000..36a32f543 --- /dev/null +++ b/core_test/shardchain_test.go @@ -0,0 +1,83 @@ +package core_test + +import ( + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/chain" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/registry" + "github.com/harmony-one/harmony/internal/shardchain" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/multibls" + "github.com/harmony-one/harmony/node" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/shard" + "github.com/stretchr/testify/require" +) + +var testDBFactory = &shardchain.MemDBFactory{} + +func TestAddNewBlock(t *testing.T) { + blsKey := bls.RandPrivateKey() + pubKey := blsKey.GetPublicKey() + leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", ConsensusPubKey: pubKey} + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } + engine := chain.NewEngine() + chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig() + collection := shardchain.NewCollection( + nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig, + ) + decider := quorum.NewDecider( + quorum.SuperMajorityVote, shard.BeaconChainShardID, + ) + blockchain, err := collection.ShardChain(shard.BeaconChainShardID) + if err != nil { + t.Fatal("cannot get blockchain") + } + reg := registry.New().SetBlockchain(blockchain) + consensus, err := consensus.New( + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false, + ) + if err != nil { + t.Fatalf("Cannot craeate consensus: %v", err) + } + nodeconfig.SetNetworkType(nodeconfig.Testnet) + var block *types.Block + node := node.New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg) + commitSigs := make(chan []byte, 1) + commitSigs <- []byte{} + block, err = node.Worker.FinalizeNewBlock( + commitSigs, func() uint64 { return uint64(0) }, common.Address{}, nil, nil, + ) + if err != nil { + t.Fatal("cannot finalize new block") + } + + nn := node.Blockchain().CurrentBlock() + t.Log("[*]", nn.NumberU64(), nn.Hash().Hex(), nn.ParentHash()) + + _, err = blockchain.InsertChain([]*types.Block{block}, false) + require.NoError(t, err, "error when adding new block") + + pk, epoch, count, shifts, err := blockchain.LeaderRotationMeta() + fmt.Println("pk", pk, "epoch", epoch, "count", count, "shifts", shifts, "err", err) + + t.Log("#", block.Header().NumberU64(), node.Blockchain().CurrentBlock().NumberU64(), block.Hash().Hex(), block.ParentHash()) + + err = blockchain.Rollback([]common.Hash{block.Hash()}) + require.NoError(t, err, "error when rolling back") +} diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index 20e001e20..5370a2e52 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -58,6 +58,31 @@ const ( Localnet = "localnet" ) +// ChainConfig returns the chain configuration for the network type. +func (t NetworkType) ChainConfig() params.ChainConfig { + switch t { + case Mainnet: + return *params.MainnetChainConfig + case Pangaea: + return *params.PangaeaChainConfig + case Partner: + return *params.PartnerChainConfig + case Stressnet: + return *params.StressnetChainConfig + case Localnet: + return *params.LocalnetChainConfig + default: + return *params.TestnetChainConfig + } +} + +func (n NetworkType) String() string { + if n == "" { + return Testnet // default to testnet + } + return string(n) +} + // Global is the index of the global node configuration const ( Global = 0 @@ -352,21 +377,3 @@ func (conf *ConfigType) ValidateConsensusKeysForSameShard(pubkeys multibls.Publi } return nil } - -// ChainConfig returns the chain configuration for the network type. -func (t NetworkType) ChainConfig() params.ChainConfig { - switch t { - case Mainnet: - return *params.MainnetChainConfig - case Pangaea: - return *params.PangaeaChainConfig - case Partner: - return *params.PartnerChainConfig - case Stressnet: - return *params.StressnetChainConfig - case Localnet: - return *params.LocalnetChainConfig - default: - return *params.TestnetChainConfig - } -} diff --git a/node/node_handler_test.go b/node/node_handler_test.go index 3de291839..307e21b11 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -46,13 +46,11 @@ func TestAddNewBlock(t *testing.T) { t.Fatal("cannot get blockchain") } reg := registry.New().SetBlockchain(blockchain) - consensus, err := consensus.New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false, - ) + consensus, err := consensus.New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - nodeconfig.SetNetworkType(nodeconfig.Devnet) + nodeconfig.SetNetworkType(nodeconfig.Testnet) node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg) txs := make(map[common.Address]types.Transactions) diff --git a/node/node_newblock_test.go b/node/node_newblock_test.go index 39affacab..963af2f55 100644 --- a/node/node_newblock_test.go +++ b/node/node_newblock_test.go @@ -63,10 +63,8 @@ func TestFinalizeNewBlockAsync(t *testing.T) { node.Worker.CommitTransactions( txs, stks, common.Address{}, ) - commitSigs := make(chan []byte) - go func() { - commitSigs <- []byte{} - }() + commitSigs := make(chan []byte, 1) + commitSigs <- []byte{} block, _ := node.Worker.FinalizeNewBlock( commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil,