Keep rotation meta in memory. (#4459)

* Keep rotation meta in memory.

* Proper name for struct.

* Clean up.
pull/4487/head
Konstantin 1 year ago committed by GitHub
parent b093fea169
commit fa00cd143d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      consensus/consensus_v2.go
  2. 70
      core/blockchain_impl.go
  3. 8
      core/blockchain_leader_rotation.go
  4. 4
      core/state_processor.go
  5. 83
      core_test/shardchain_test.go
  6. 43
      internal/configs/node/config.go
  7. 6
      node/node_handler_test.go
  8. 4
      node/node_newblock_test.go

@ -157,10 +157,11 @@ func (consensus *Consensus) finalCommit() {
Msg("[finalCommit] Unable to construct Committed message")
return
}
msgToSend, FBFTMsg :=
network.Bytes,
network.FBFTMsg
commitSigAndBitmap := FBFTMsg.Payload
var (
msgToSend = network.Bytes
FBFTMsg = network.FBFTMsg
commitSigAndBitmap = FBFTMsg.Payload
)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)
// find correct block content
curBlockHash := consensus.blockHash

@ -221,6 +221,7 @@ type BlockChainImpl struct {
badBlocks *lru.Cache // Bad block cache
pendingSlashes slash.Records
maxGarbCollectedBlkNum int64
leaderRotationMeta leaderRotationMeta
options Options
}
@ -359,6 +360,12 @@ func newBlockChainWithOptions(
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Hash())
}
curHeader := bc.CurrentBlock().Header()
err = bc.buildLeaderRotationMeta(curHeader)
if err != nil {
return nil, errors.WithMessage(err, "failed to build leader rotation meta")
}
// Take ownership of this particular state
go bc.update()
return bc, nil
@ -1479,8 +1486,11 @@ func (bc *BlockChainImpl) WriteBlockWithState(
defer bc.mu.Unlock()
currentBlock := bc.CurrentBlock()
if currentBlock == nil || block.ParentHash() != currentBlock.Hash() {
return NonStatTy, errors.New("Hash of parent block doesn't match the current block hash")
if currentBlock == nil {
return NonStatTy, errors.New("Current block is nil")
}
if block.ParentHash() != currentBlock.Hash() {
return NonStatTy, errors.Errorf("Hash of parent block %s doesn't match the current block hash %s", currentBlock.Hash().Hex(), block.ParentHash().Hex())
}
// Commit state object changes to in-memory trie
@ -1650,20 +1660,52 @@ func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (i
return n, err
}
// buildLeaderRotationMeta builds leader rotation meta if feature is activated.
func (bc *BlockChainImpl) buildLeaderRotationMeta(curHeader *block.Header) error {
if !bc.chainConfig.IsLeaderRotation(curHeader.Epoch()) {
return nil
}
if curHeader.NumberU64() == 0 {
return errors.New("current header is genesis")
}
curPubKey, err := bc.getLeaderPubKeyFromCoinbase(curHeader)
if err != nil {
return err
}
for i := curHeader.NumberU64() - 1; i >= 0; i-- {
header := bc.GetHeaderByNumber(i)
if header == nil {
return errors.New("header is nil")
}
blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(header)
if err != nil {
return err
}
if curPubKey.Bytes != blockPubKey.Bytes || curHeader.Epoch().Uint64() != header.Epoch().Uint64() {
for j := i; i <= curHeader.NumberU64(); j++ {
header := bc.GetHeaderByNumber(i)
if header == nil {
return errors.New("header is nil")
}
err := bc.saveLeaderRotationMeta(header)
if err != nil {
utils.Logger().Error().Err(err).Msg("save leader continuous blocks count error")
return err
}
}
return nil
}
}
return errors.New("no leader rotation meta to save")
}
func (bc *BlockChainImpl) saveLeaderRotationMeta(h *block.Header) error {
blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(h)
if err != nil {
return err
}
type stored struct {
pub []byte
epoch uint64
count uint64
shifts uint64
}
var s stored
// error is possible here only on the first iteration, so we can ignore it
s.pub, s.epoch, s.count, s.shifts, _ = rawdb.ReadLeaderRotationMeta(bc.db)
var s = bc.leaderRotationMeta
// increase counter only if the same leader and epoch
if bytes.Equal(s.pub, blockPubKey.Bytes[:]) && s.epoch == h.Epoch().Uint64() {
@ -1679,11 +1721,9 @@ func (bc *BlockChainImpl) saveLeaderRotationMeta(h *block.Header) error {
if s.epoch != h.Epoch().Uint64() {
s.shifts = 0
}
s.epoch = h.Epoch().Uint64()
bc.leaderRotationMeta = s
err = rawdb.WriteLeaderRotationMeta(bc.db, blockPubKey.Bytes[:], h.Epoch().Uint64(), s.count, s.shifts)
if err != nil {
return err
}
return nil
}

@ -0,0 +1,8 @@
package core
type leaderRotationMeta struct {
pub []byte
epoch uint64
count uint64
shifts uint64
}

@ -120,7 +120,7 @@ func (p *StateProcessor) Process(
header = block.Header()
allLogs []*types.Log
gp = new(GasPool).AddGas(block.GasLimit())
blockStakeMsgs []staking.StakeMsg = make([]staking.StakeMsg, 0)
blockStakeMsgs = make([]staking.StakeMsg, 0)
)
beneficiary, err := p.bc.GetECDSAFromCoinbase(header)
@ -202,7 +202,7 @@ func (p *StateProcessor) Process(
receipts, outcxs, incxs, block.StakingTransactions(), slashes, sigsReady, func() uint64 { return header.ViewID().Uint64() },
)
if err != nil {
return nil, nil, nil, nil, 0, nil, statedb, errors.New("[Process] Cannot finalize block")
return nil, nil, nil, nil, 0, nil, statedb, errors.WithMessage(err, "[Process] Cannot finalize block")
}
result := &ProcessorResult{

@ -0,0 +1,83 @@
package core_test
import (
"fmt"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
"github.com/stretchr/testify/require"
)
var testDBFactory = &shardchain.MemDBFactory{}
func TestAddNewBlock(t *testing.T) {
blsKey := bls.RandPrivateKey()
pubKey := blsKey.GetPublicKey()
leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", ConsensusPubKey: pubKey}
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
host, err := p2p.NewHost(p2p.HostConfig{
Self: &leader,
BLSKey: priKey,
})
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
engine := chain.NewEngine()
chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig()
collection := shardchain.NewCollection(
nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig,
)
decider := quorum.NewDecider(
quorum.SuperMajorityVote, shard.BeaconChainShardID,
)
blockchain, err := collection.ShardChain(shard.BeaconChainShardID)
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
nodeconfig.SetNetworkType(nodeconfig.Testnet)
var block *types.Block
node := node.New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg)
commitSigs := make(chan []byte, 1)
commitSigs <- []byte{}
block, err = node.Worker.FinalizeNewBlock(
commitSigs, func() uint64 { return uint64(0) }, common.Address{}, nil, nil,
)
if err != nil {
t.Fatal("cannot finalize new block")
}
nn := node.Blockchain().CurrentBlock()
t.Log("[*]", nn.NumberU64(), nn.Hash().Hex(), nn.ParentHash())
_, err = blockchain.InsertChain([]*types.Block{block}, false)
require.NoError(t, err, "error when adding new block")
pk, epoch, count, shifts, err := blockchain.LeaderRotationMeta()
fmt.Println("pk", pk, "epoch", epoch, "count", count, "shifts", shifts, "err", err)
t.Log("#", block.Header().NumberU64(), node.Blockchain().CurrentBlock().NumberU64(), block.Hash().Hex(), block.ParentHash())
err = blockchain.Rollback([]common.Hash{block.Hash()})
require.NoError(t, err, "error when rolling back")
}

@ -58,6 +58,31 @@ const (
Localnet = "localnet"
)
// ChainConfig returns the chain configuration for the network type.
func (t NetworkType) ChainConfig() params.ChainConfig {
switch t {
case Mainnet:
return *params.MainnetChainConfig
case Pangaea:
return *params.PangaeaChainConfig
case Partner:
return *params.PartnerChainConfig
case Stressnet:
return *params.StressnetChainConfig
case Localnet:
return *params.LocalnetChainConfig
default:
return *params.TestnetChainConfig
}
}
func (n NetworkType) String() string {
if n == "" {
return Testnet // default to testnet
}
return string(n)
}
// Global is the index of the global node configuration
const (
Global = 0
@ -352,21 +377,3 @@ func (conf *ConfigType) ValidateConsensusKeysForSameShard(pubkeys multibls.Publi
}
return nil
}
// ChainConfig returns the chain configuration for the network type.
func (t NetworkType) ChainConfig() params.ChainConfig {
switch t {
case Mainnet:
return *params.MainnetChainConfig
case Pangaea:
return *params.PangaeaChainConfig
case Partner:
return *params.PartnerChainConfig
case Stressnet:
return *params.StressnetChainConfig
case Localnet:
return *params.LocalnetChainConfig
default:
return *params.TestnetChainConfig
}
}

@ -46,13 +46,11 @@ func TestAddNewBlock(t *testing.T) {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
consensus, err := consensus.New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
nodeconfig.SetNetworkType(nodeconfig.Devnet)
nodeconfig.SetNetworkType(nodeconfig.Testnet)
node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg)
txs := make(map[common.Address]types.Transactions)

@ -63,10 +63,8 @@ func TestFinalizeNewBlockAsync(t *testing.T) {
node.Worker.CommitTransactions(
txs, stks, common.Address{},
)
commitSigs := make(chan []byte)
go func() {
commitSigs := make(chan []byte, 1)
commitSigs <- []byte{}
}()
block, _ := node.Worker.FinalizeNewBlock(
commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil,

Loading…
Cancel
Save