Feature registry (#4324)

* Registry for services.

* Test.

* Reverted comment.

* Fix.
pull/4328/head
Konstantin 2 years ago committed by GitHub
parent fde22107d1
commit 8ee11600ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      cmd/harmony/main.go
  2. 18
      consensus/consensus.go
  3. 18
      consensus/consensus_service.go
  4. 3
      consensus/consensus_test.go
  5. 28
      consensus/consensus_v2.go
  6. 4
      consensus/double_sign.go
  7. 4
      consensus/downloader.go
  8. 2
      consensus/leader.go
  9. 2
      consensus/threshold.go
  10. 10
      consensus/validator.go
  11. 13
      consensus/view_change.go
  12. 35
      internal/registry/registry.go
  13. 16
      internal/registry/registry_test.go
  14. 28
      node/node.go
  15. 2
      node/node_handler.go
  16. 24
      node/node_handler_test.go
  17. 3
      node/node_newblock_test.go
  18. 19
      node/node_test.go

@ -17,6 +17,7 @@ import (
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
"github.com/harmony-one/harmony/internal/tikv/redis_helper"
"github.com/harmony-one/harmony/internal/tikv/statedb_cache"
@ -310,7 +311,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
// Update ethereum compatible chain ids
params.UpdateEthChainIDByShard(nodeConfig.ShardID)
currentNode := setupConsensusAndNode(hc, nodeConfig)
currentNode := setupConsensusAndNode(hc, nodeConfig, registry.New())
nodeconfig.GetDefaultConfig().ShardID = nodeConfig.ShardID
nodeconfig.GetDefaultConfig().IsOffline = nodeConfig.IsOffline
nodeconfig.GetDefaultConfig().Downloader = nodeConfig.Downloader
@ -659,7 +660,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
return nodeConfig, nil
}
func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType) *node.Node {
func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *node.Node {
// Parse minPeers from harmonyconfig.HarmonyConfig
var minPeers int
var aggregateSig bool
@ -733,14 +734,14 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
// Consensus object.
decider := quorum.NewDecider(quorum.SuperMajorityVote, nodeConfig.ShardID)
currentConsensus, err := consensus.New(
myHost, nodeConfig.ShardID, nodeConfig.ConsensusPriKey, blockchain, decider, minPeers, aggregateSig)
myHost, nodeConfig.ShardID, nodeConfig.ConsensusPriKey, registry.SetBlockchain(blockchain), decider, minPeers, aggregateSig)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Error :%v \n", err)
os.Exit(1)
}
currentNode := node.New(myHost, currentConsensus, engine, collection, blacklist, allowedTxs, localAccounts, nodeConfig.ArchiveModes(), &hc)
currentNode := node.New(myHost, currentConsensus, engine, collection, blacklist, allowedTxs, localAccounts, nodeConfig.ArchiveModes(), &hc, registry)
if hc.Legacy != nil && hc.Legacy.TPBroadcastInvalidTxn != nil {
currentNode.BroadcastInvalidTx = *hc.Legacy.TPBroadcastInvalidTxn

@ -6,12 +6,13 @@ import (
"sync/atomic"
"time"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/abool"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
@ -62,8 +63,8 @@ type Consensus struct {
multiSigBitmap *bls_cosi.Mask // Bitmap for parsing multisig bitmap from validators
multiSigMutex sync.RWMutex
// The blockchain this consensus is working on
Blockchain core.BlockChain
// Registry for services.
registry *registry.Registry
// Minimal number of peers in the shard
// If the number of validators is less than minPeers, the consensus won't start
MinPeers int
@ -137,7 +138,12 @@ type Consensus struct {
dHelper *downloadHelper
}
// VerifyBlock is a function used to verify the block and keep trace of verified blocks
// Blockchain returns the blockchain.
func (consensus *Consensus) Blockchain() core.BlockChain {
return consensus.registry.GetBlockchain()
}
// VerifyBlock is a function used to verify the block and keep trace of verified blocks.
func (consensus *Consensus) VerifyBlock(block *types.Block) error {
if !consensus.FBFTLog.IsBlockVerified(block.Hash()) {
if err := consensus.BlockVerifier(block); err != nil {
@ -211,12 +217,12 @@ func (consensus *Consensus) BlockNum() uint64 {
// New create a new Consensus record
func New(
host p2p.Host, shard uint32, multiBLSPriKey multibls.PrivateKeys,
blockchain core.BlockChain,
registry *registry.Registry,
Decider quorum.Decider, minPeers int, aggregateSig bool,
) (*Consensus, error) {
consensus := Consensus{}
consensus.Decider = Decider
consensus.Blockchain = blockchain
consensus.registry = registry
consensus.MinPeers = minPeers
consensus.AggregateSig = aggregateSig
consensus.host = host

@ -264,7 +264,7 @@ func (consensus *Consensus) ReadSignatureBitmapPayload(
// (b) node in committed but has any err during processing: Syncing mode
// (c) node in committed and everything looks good: Normal mode
func (consensus *Consensus) UpdateConsensusInformation() Mode {
curHeader := consensus.Blockchain.CurrentHeader()
curHeader := consensus.Blockchain().CurrentHeader()
curEpoch := curHeader.Epoch()
nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1)
@ -286,13 +286,13 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.BlockPeriod = 5 * time.Second
// Enable 2s block time at the twoSecondsEpoch
if consensus.Blockchain.Config().IsTwoSeconds(nextEpoch) {
if consensus.Blockchain().Config().IsTwoSeconds(nextEpoch) {
consensus.BlockPeriod = 2 * time.Second
}
isFirstTimeStaking := consensus.Blockchain.Config().IsStaking(nextEpoch) &&
curHeader.IsLastBlockInEpoch() && !consensus.Blockchain.Config().IsStaking(curEpoch)
haventUpdatedDecider := consensus.Blockchain.Config().IsStaking(curEpoch) &&
isFirstTimeStaking := consensus.Blockchain().Config().IsStaking(nextEpoch) &&
curHeader.IsLastBlockInEpoch() && !consensus.Blockchain().Config().IsStaking(curEpoch)
haventUpdatedDecider := consensus.Blockchain().Config().IsStaking(curEpoch) &&
consensus.Decider.Policy() != quorum.SuperMajorityStake
// Only happens once, the flip-over to a new Decider policy
@ -305,7 +305,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
epochToSet := curEpoch
hasError := false
curShardState, err := committee.WithStakingEnabled.ReadFromDB(
curEpoch, consensus.Blockchain,
curEpoch, consensus.Blockchain(),
)
if err != nil {
consensus.getLogger().Error().
@ -321,7 +321,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
if curHeader.IsLastBlockInEpoch() && isNotGenesisBlock {
nextShardState, err := committee.WithStakingEnabled.ReadFromDB(
nextEpoch, consensus.Blockchain,
nextEpoch, consensus.Blockchain(),
)
if err != nil {
consensus.getLogger().Error().
@ -389,7 +389,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
// a solution to take care of this case because the coinbase of the latest block doesn't really represent the
// the real current leader in case of M1 view change.
if !curHeader.IsLastBlockInEpoch() && curHeader.Number().Uint64() != 0 {
leaderPubKey, err := chain.GetLeaderPubKeyFromCoinbase(consensus.Blockchain, curHeader)
leaderPubKey, err := chain.GetLeaderPubKeyFromCoinbase(consensus.Blockchain(), curHeader)
if err != nil || leaderPubKey == nil {
consensus.getLogger().Error().Err(err).
Msg("[UpdateConsensusInformation] Unable to get leaderPubKey from coinbase")
@ -527,7 +527,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
consensus.switchPhase("selfCommit", FBFTCommit)
consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain(),
block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64())
for i, key := range consensus.priKey {
if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil {

@ -7,6 +7,7 @@ import (
"github.com/harmony-one/abool"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/p2p"
@ -90,7 +91,7 @@ func GenerateConsensusForTesting() (p2p.Host, multibls.PrivateKeys, *Consensus,
decider := quorum.NewDecider(quorum.SuperMajorityVote, shard.BeaconChainShardID)
multiBLSPrivateKey := multibls.GetPrivateKeys(bls.RandPrivateKey())
consensus, err := New(host, shard.BeaconChainShardID, multiBLSPrivateKey, nil, decider, 3, false)
consensus, err := New(host, shard.BeaconChainShardID, multiBLSPrivateKey, registry.New(), decider, 3, false)
if err != nil {
return nil, nil, nil, nil, err
}

@ -169,7 +169,7 @@ func (consensus *Consensus) finalCommit() {
return
}
consensus.getLogger().Info().Hex("new", commitSigAndBitmap).Msg("[finalCommit] Overriding commit signatures!!")
consensus.Blockchain.WriteCommitSig(block.NumberU64(), commitSigAndBitmap)
consensus.Blockchain().WriteCommitSig(block.NumberU64(), commitSigAndBitmap)
// Send committed message before block insertion.
// if leader successfully finalizes the block, send committed message to validators
@ -267,7 +267,7 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
if consensus.BlockNum() <= 1 {
return nil, nil
}
lastCommits, err := consensus.Blockchain.ReadCommitSig(blockNum)
lastCommits, err := consensus.Blockchain().ReadCommitSig(blockNum)
if err != nil ||
len(lastCommits) < bls.BLSSignatureSizeInBytes {
msgs := consensus.FBFTLog.GetMessagesByTypeSeq(
@ -363,9 +363,9 @@ func (consensus *Consensus) Start(
case <-consensus.syncReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
consensus.mutex.Lock()
if consensus.BlockNum() < consensus.Blockchain.CurrentHeader().Number().Uint64()+1 {
consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1)
consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1)
if consensus.BlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 {
consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1)
consensus.SetViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1)
mode := consensus.UpdateConsensusInformation()
consensus.current.SetMode(mode)
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
@ -386,7 +386,7 @@ func (consensus *Consensus) Start(
// TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed
case <-consensus.syncNotReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan")
consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1)
consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1)
consensus.current.SetMode(Syncing)
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC")
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc()
@ -574,7 +574,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
Msg("[preCommitAndPropose] Sent Committed Message")
}
if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return
}
@ -606,7 +606,7 @@ func (consensus *Consensus) verifyLastCommitSig(lastCommitSig []byte, blk *types
}
aggPubKey := consensus.commitBitmap.AggregatePublic
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain(),
blk.Epoch(), blk.Hash(), blk.NumberU64(), blk.Header().ViewID().Uint64())
if !aggSig.VerifyHash(aggPubKey, commitPayload) {
@ -658,8 +658,8 @@ func (consensus *Consensus) tryCatchup() error {
}
func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error {
if consensus.Blockchain.CurrentBlock().NumberU64() < blk.NumberU64() {
if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
if consensus.Blockchain().CurrentBlock().NumberU64() < blk.NumberU64() {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain")
return err
}
@ -716,7 +716,7 @@ func (consensus *Consensus) GenerateVrfAndProof(newHeader *block.Header) error {
return errors.New("[GenerateVrfAndProof] no leader private key provided")
}
sk := vrf_bls.NewVRFSigner(key.Pri)
previousHeader := consensus.Blockchain.GetHeaderByNumber(
previousHeader := consensus.Blockchain().GetHeaderByNumber(
newHeader.Number().Uint64() - 1,
)
if previousHeader == nil {
@ -745,7 +745,7 @@ func (consensus *Consensus) GenerateVdfAndProof(newBlock *types.Block, vrfBlockN
//derive VDF seed from VRFs generated in the current epoch
seed := [32]byte{}
for i := 0; i < consensus.VdfSeedSize(); i++ {
previousVrf := consensus.Blockchain.GetVrfByNumber(vrfBlockNumbers[i])
previousVrf := consensus.Blockchain().GetVrfByNumber(vrfBlockNumbers[i])
for j := 0; j < len(seed); j++ {
seed[j] = seed[j] ^ previousVrf[j]
}
@ -779,7 +779,7 @@ func (consensus *Consensus) GenerateVdfAndProof(newBlock *types.Block, vrfBlockN
// ValidateVdfAndProof validates the VDF/proof in the current epoch
func (consensus *Consensus) ValidateVdfAndProof(headerObj *block.Header) bool {
vrfBlockNumbers, err := consensus.Blockchain.ReadEpochVrfBlockNums(headerObj.Epoch())
vrfBlockNumbers, err := consensus.Blockchain().ReadEpochVrfBlockNums(headerObj.Epoch())
if err != nil {
consensus.getLogger().Error().Err(err).
Str("MsgBlockNum", headerObj.Number().String()).
@ -794,7 +794,7 @@ func (consensus *Consensus) ValidateVdfAndProof(headerObj *block.Header) bool {
seed := [32]byte{}
for i := 0; i < consensus.VdfSeedSize(); i++ {
previousVrf := consensus.Blockchain.GetVrfByNumber(vrfBlockNumbers[i])
previousVrf := consensus.Blockchain().GetVrfByNumber(vrfBlockNumbers[i])
for j := 0; j < len(seed); j++ {
seed[j] = seed[j] ^ previousVrf[j]
}

@ -40,8 +40,8 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
return true
}
curHeader := consensus.Blockchain.CurrentHeader()
committee, err := consensus.Blockchain.ReadShardState(curHeader.Epoch())
curHeader := consensus.Blockchain().CurrentHeader()
committee, err := consensus.Blockchain().ReadShardState(curHeader.Epoch())
if err != nil {
consensus.getLogger().Err(err).
Uint32("shard", consensus.ShardID).

@ -90,7 +90,7 @@ func (dh *downloadHelper) downloadFinishedLoop() {
}
func (consensus *Consensus) addConsensusLastMile() error {
curBN := consensus.Blockchain.CurrentBlock().NumberU64()
curBN := consensus.Blockchain().CurrentBlock().NumberU64()
blockIter, err := consensus.GetLastMileBlockIter(curBN + 1)
if err != nil {
return err
@ -100,7 +100,7 @@ func (consensus *Consensus) addConsensusLastMile() error {
if block == nil {
break
}
if _, err := consensus.Blockchain.InsertChain(types.Blocks{block}, true); err != nil {
if _, err := consensus.Blockchain().InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
}

@ -247,7 +247,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
Msg("[OnCommit] Failed finding a matching block for committed message")
return
}
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain(),
blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64())
logger = logger.With().
Uint64("MsgViewID", recvMsg.ViewID).

@ -46,7 +46,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error {
Msg("[didReachPrepareQuorum] Unparseable block data")
return err
}
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain(),
blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64())
// so by this point, everyone has committed to the blockhash of this block

@ -166,7 +166,7 @@ func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) {
priKeys := consensus.getPriKeysInCommittee()
// Sign commit signature on the received block and construct the p2p messages
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain(),
blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64())
p2pMsgs := consensus.constructP2pMessages(msg_pb.MessageType_COMMIT, commitPayload, priKeys)
@ -336,7 +336,7 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) {
Msg("[OnCommitted] Failed to parse commit sigBytes and bitmap")
return
}
if err := consensus.Blockchain.Engine().VerifyHeaderSignature(consensus.Blockchain, blockObj.Header(),
if err := consensus.Blockchain().Engine().VerifyHeaderSignature(consensus.Blockchain(), blockObj.Header(),
sigBytes, bitmap); err != nil {
consensus.getLogger().Error().
Uint64("blockNum", recvMsg.BlockNum).
@ -358,17 +358,17 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) {
// If we already have a committed signature received before, check whether the new one
// has more signatures and if yes, override the old data.
// Otherwise, simply write the commit signature in db.
commitSigBitmap, err := consensus.Blockchain.ReadCommitSig(blockObj.NumberU64())
commitSigBitmap, err := consensus.Blockchain().ReadCommitSig(blockObj.NumberU64())
// Need to check whether this block actually was committed, because it could be another block
// with the same number that's committed and overriding its commit sigBytes is wrong.
blk := consensus.Blockchain.GetBlockByHash(blockObj.Hash())
blk := consensus.Blockchain().GetBlockByHash(blockObj.Hash())
if err == nil && len(commitSigBitmap) == len(recvMsg.Payload) && blk != nil {
new := mask.CountEnabled()
mask.SetMask(commitSigBitmap[bls.BLSSignatureSizeInBytes:])
cur := mask.CountEnabled()
if new > cur {
consensus.getLogger().Info().Hex("old", commitSigBitmap).Hex("new", recvMsg.Payload).Msg("[OnCommitted] Overriding commit signatures!!")
consensus.Blockchain.WriteCommitSig(blockObj.NumberU64(), recvMsg.Payload)
consensus.Blockchain().WriteCommitSig(blockObj.NumberU64(), recvMsg.Payload)
}
}

@ -129,10 +129,10 @@ func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) {
// viewID is only used as the fallback mechansim to determine the nextViewID
func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
// handle corner case at first
if consensus.Blockchain == nil {
if consensus.Blockchain() == nil {
return consensus.fallbackNextViewID()
}
curHeader := consensus.Blockchain.CurrentHeader()
curHeader := consensus.Blockchain().CurrentHeader()
if curHeader == nil {
return consensus.fallbackNextViewID()
}
@ -172,12 +172,13 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrappe
}
var lastLeaderPubKey *bls.PublicKeyWrapper
var err error
blockchain := consensus.Blockchain()
epoch := big.NewInt(0)
if consensus.Blockchain == nil {
if blockchain == nil {
consensus.getLogger().Error().Msg("[getNextLeaderKey] Blockchain is nil. Use consensus.LeaderPubKey")
lastLeaderPubKey = consensus.LeaderPubKey
} else {
curHeader := consensus.Blockchain.CurrentHeader()
curHeader := blockchain.CurrentHeader()
if curHeader == nil {
consensus.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain")
lastLeaderPubKey = consensus.LeaderPubKey
@ -185,7 +186,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrappe
stuckBlockViewID := curHeader.ViewID().Uint64() + 1
gap = int(viewID - stuckBlockViewID)
// this is the truth of the leader based on blockchain blocks
lastLeaderPubKey, err = chain.GetLeaderPubKeyFromCoinbase(consensus.Blockchain, curHeader)
lastLeaderPubKey, err = chain.GetLeaderPubKeyFromCoinbase(blockchain, curHeader)
if err != nil || lastLeaderPubKey == nil {
consensus.getLogger().Error().Err(err).
Msg("[getNextLeaderKey] Unable to get leaderPubKey from coinbase. Set it to consensus.LeaderPubKey")
@ -215,7 +216,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrappe
// FIXME: rotate leader on harmony nodes only before fully externalization
var wasFound bool
var next *bls.PublicKeyWrapper
if consensus.Blockchain != nil && consensus.Blockchain.Config().IsAllowlistEpoch(epoch) {
if blockchain != nil && blockchain.Config().IsAllowlistEpoch(epoch) {
wasFound, next = consensus.Decider.NthNextHmyExt(
shard.Schedule.InstanceForEpoch(epoch),
lastLeaderPubKey,

@ -0,0 +1,35 @@
package registry
import (
"sync"
"github.com/harmony-one/harmony/core"
)
// Registry consolidates services at one place.
type Registry struct {
mu sync.Mutex
blockchain core.BlockChain
}
// New creates a new registry.
func New() *Registry {
return &Registry{}
}
// SetBlockchain sets the blockchain to registry.
func (r *Registry) SetBlockchain(bc core.BlockChain) *Registry {
r.mu.Lock()
defer r.mu.Unlock()
r.blockchain = bc
return r
}
// GetBlockchain gets the blockchain from registry.
func (r *Registry) GetBlockchain() core.BlockChain {
r.mu.Lock()
defer r.mu.Unlock()
return r.blockchain
}

@ -0,0 +1,16 @@
package registry
import (
"testing"
"github.com/harmony-one/harmony/core"
"github.com/stretchr/testify/require"
)
func TestRegistry(t *testing.T) {
registry := New()
require.Nil(t, registry.GetBlockchain())
registry.SetBlockchain(core.Stub{})
require.NotNil(t, registry.GetBlockchain())
}

@ -13,6 +13,7 @@ import (
"time"
"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/harmony-one/harmony/internal/tikv/redis_helper"
@ -154,19 +155,12 @@ type Node struct {
// context control for pub-sub handling
psCtx context.Context
psCancel func()
registry *registry.Registry
}
// Blockchain returns the blockchain for the node's current shard.
func (node *Node) Blockchain() core.BlockChain {
shardID := node.NodeConfig.ShardID
bc, err := node.shardChains.ShardChain(shardID)
if err != nil {
utils.Logger().Error().
Uint32("shardID", shardID).
Err(err).
Msg("cannot get shard chain")
}
return bc
return node.registry.GetBlockchain()
}
func (node *Node) SyncInstance() ISync {
@ -1031,11 +1025,15 @@ func New(
localAccounts []common.Address,
isArchival map[uint32]bool,
harmonyconfig *harmonyconfig.HarmonyConfig,
registry *registry.Registry,
) *Node {
node := Node{}
node.unixTimeAtNodeStart = time.Now().Unix()
node.TransactionErrorSink = types.NewTransactionErrorSink()
node.crosslinks = crosslinks.New()
node := Node{
registry: registry,
unixTimeAtNodeStart: time.Now().Unix(),
TransactionErrorSink: types.NewTransactionErrorSink(),
crosslinks: crosslinks.New(),
}
// Get the node config that's created in the harmony.go program.
if consensusObj != nil {
node.NodeConfig = nodeconfig.GetShardConfig(consensusObj.ShardID)
@ -1214,7 +1212,7 @@ func (node *Node) InitConsensusWithValidators() (err error) {
Uint64("epoch", epoch.Uint64()).
Msg("[InitConsensusWithValidators] Try To Get PublicKeys")
shardState, err := committee.WithStakingEnabled.Compute(
epoch, node.Consensus.Blockchain,
epoch, node.Consensus.Blockchain(),
)
if err != nil {
utils.Logger().Err(err).
@ -1336,7 +1334,7 @@ func (node *Node) populateSelfAddresses(epoch *big.Int) {
node.keysToAddrsEpoch = epoch
shardID := node.Consensus.ShardID
shardState, err := node.Consensus.Blockchain.ReadShardState(epoch)
shardState, err := node.Consensus.Blockchain().ReadShardState(epoch)
if err != nil {
utils.Logger().Error().Err(err).
Int64("epoch", epoch.Int64()).

@ -209,7 +209,7 @@ func (node *Node) BroadcastCrossLinkFromShardsToBeacon() { // leader of 1-3 shar
err = node.host.SendMessageToGroups(
[]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID)},
p2p.ConstructMessage(
proto_node.ConstructCrossLinkMessage(node.Consensus.Blockchain, headers)),
proto_node.ConstructCrossLinkMessage(node.Consensus.Blockchain(), headers)),
)
if err != nil {
utils.Logger().Error().Err(err).Msgf("[BroadcastCrossLink] failed to broadcast message")

@ -12,6 +12,7 @@ import (
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
@ -40,14 +41,19 @@ func TestAddNewBlock(t *testing.T) {
decider := quorum.NewDecider(
quorum.SuperMajorityVote, shard.BeaconChainShardID,
)
blockchain, err := collection.ShardChain(shard.BeaconChainShardID)
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), nil, decider, 3, false,
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
nodeconfig.SetNetworkType(nodeconfig.Devnet)
node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil)
node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg)
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}
@ -92,8 +98,13 @@ func TestVerifyNewBlock(t *testing.T) {
decider := quorum.NewDecider(
quorum.SuperMajorityVote, shard.BeaconChainShardID,
)
blockchain, err := collection.ShardChain(shard.BeaconChainShardID)
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), nil, decider, 3, false,
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
@ -101,7 +112,7 @@ func TestVerifyNewBlock(t *testing.T) {
archiveMode := make(map[uint32]bool)
archiveMode[0] = true
archiveMode[1] = false
node := New(host, consensus, engine, collection, nil, nil, nil, archiveMode, nil)
node := New(host, consensus, engine, collection, nil, nil, nil, archiveMode, nil, reg)
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}
@ -147,8 +158,9 @@ func TestVerifyVRF(t *testing.T) {
decider := quorum.NewDecider(
quorum.SuperMajorityVote, shard.BeaconChainShardID,
)
reg := registry.New().SetBlockchain(blockchain)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), blockchain, decider, 3, false,
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
@ -156,7 +168,7 @@ func TestVerifyVRF(t *testing.T) {
archiveMode := make(map[uint32]bool)
archiveMode[0] = true
archiveMode[1] = false
node := New(host, consensus, engine, collection, nil, nil, nil, archiveMode, nil)
node := New(host, consensus, engine, collection, nil, nil, nil, archiveMode, nil, reg)
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}

@ -12,6 +12,7 @@ import (
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
@ -52,7 +53,7 @@ func TestFinalizeNewBlockAsync(t *testing.T) {
t.Fatalf("Cannot craeate consensus: %v", err)
}
node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil)
node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, registry.New().SetBlockchain(blockchain))
node.Worker.UpdateCurrent()

@ -10,6 +10,7 @@ import (
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
@ -36,17 +37,23 @@ func TestNewNode(t *testing.T) {
decider := quorum.NewDecider(
quorum.SuperMajorityVote, shard.BeaconChainShardID,
)
chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig()
collection := shardchain.NewCollection(
nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig,
)
blockchain, err := collection.ShardChain(shard.BeaconChainShardID)
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), nil, decider, 3, false,
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig()
collection := shardchain.NewCollection(
nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig,
)
node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil)
node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg)
if node.Consensus == nil {
t.Error("Consensus is not initialized for the node")
}

Loading…
Cancel
Save