From 29bcc0c48b285aa8104a8a4e72de0878f40c00f8 Mon Sep 17 00:00:00 2001 From: Konstantin <355847+Frozen@users.noreply.github.com> Date: Thu, 24 Nov 2022 12:17:13 +0700 Subject: [PATCH] Consensus accepts blockchain as a parameter. (#4296) * Consensus doesn't require anymore `Node` as a circular dependency. * Proper blockchain initialization. * Removed double initialization. --- cmd/harmony/main.go | 45 ++++---- consensus/consensus.go | 6 +- consensus/consensus_service_test.go | 4 +- consensus/consensus_test.go | 2 +- consensus/construct_test.go | 12 +-- core/blockchain.go | 2 +- core/blockchain_impl.go | 162 +++++++++++++++++++++++++++- core/blockchain_stub.go | 2 +- node/node.go | 11 +- node/node_cross_link.go | 67 +----------- node/node_cross_shard.go | 46 -------- node/node_handler.go | 122 +++++---------------- node/node_handler_test.go | 37 +++++-- node/node_newblock_test.go | 24 +++-- node/node_test.go | 29 +++-- 15 files changed, 302 insertions(+), 269 deletions(-) diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 0c3056df4..e10994913 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -16,6 +16,7 @@ import ( "time" "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/internal/chain" "github.com/harmony-one/harmony/internal/shardchain/tikv_manage" "github.com/harmony-one/harmony/internal/tikv/redis_helper" "github.com/harmony-one/harmony/internal/tikv/statedb_cache" @@ -647,17 +648,6 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, } func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType) *node.Node { - // Consensus object. - // TODO: consensus object shouldn't start here - decider := quorum.NewDecider(quorum.SuperMajorityVote, uint32(hc.General.ShardID)) - currentConsensus, err := consensus.New( - myHost, nodeConfig.ShardID, nodeConfig.ConsensusPriKey, decider) - - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Error :%v \n", err) - os.Exit(1) - } - // Parse minPeers from harmonyconfig.HarmonyConfig var minPeers int var aggregateSig bool @@ -668,8 +658,6 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi minPeers = defaultConsensusConfig.MinPeers aggregateSig = defaultConsensusConfig.AggregateSig } - currentConsensus.MinPeers = minPeers - currentConsensus.AggregateSig = aggregateSig blacklist, err := setupBlacklist(hc) if err != nil { @@ -701,7 +689,30 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi chainDBFactory = &shardchain.LDBFactory{RootDir: nodeConfig.DBDir} } - currentNode := node.New(myHost, currentConsensus, chainDBFactory, blacklist, allowedTxs, localAccounts, nodeConfig.ArchiveModes(), &hc) + engine := chain.NewEngine() + + chainConfig := nodeConfig.GetNetworkType().ChainConfig() + collection := shardchain.NewCollection( + &hc, chainDBFactory, &core.GenesisInitializer{NetworkType: nodeConfig.GetNetworkType()}, engine, &chainConfig, + ) + + blockchain, err := collection.ShardChain(nodeConfig.ShardID) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Error :%v \n", err) + os.Exit(1) + } + + // Consensus object. + decider := quorum.NewDecider(quorum.SuperMajorityVote, nodeConfig.ShardID) + currentConsensus, err := consensus.New( + myHost, nodeConfig.ShardID, nodeConfig.ConsensusPriKey, blockchain, decider, minPeers, aggregateSig) + + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Error :%v \n", err) + os.Exit(1) + } + + currentNode := node.New(myHost, currentConsensus, engine, collection, blacklist, allowedTxs, localAccounts, nodeConfig.ArchiveModes(), &hc) if hc.Legacy != nil && hc.Legacy.TPBroadcastInvalidTxn != nil { currentNode.BroadcastInvalidTx = *hc.Legacy.TPBroadcastInvalidTxn @@ -724,9 +735,6 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi } else { currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(hc.DNSSync.Zone, strconv.Itoa(hc.DNSSync.Port)) } - - // TODO: refactor the creation of blockchain out of node.New() - currentConsensus.Blockchain = currentNode.Blockchain() currentNode.NodeConfig.DNSZone = hc.DNSSync.Zone currentNode.NodeConfig.SetBeaconGroupID( @@ -755,7 +763,8 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi Msg("Init Blockchain") // Assign closure functions to the consensus object - currentConsensus.SetBlockVerifier(currentNode.VerifyNewBlock) + currentConsensus.SetBlockVerifier( + node.VerifyNewBlock(currentNode.NodeConfig, currentNode.Blockchain(), currentNode.Beaconchain())) currentConsensus.PostConsensusJob = currentNode.PostConsensusProcessing // update consensus information based on the blockchain currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation()) diff --git a/consensus/consensus.go b/consensus/consensus.go index 8708df049..4e4ac882e 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -211,10 +211,14 @@ func (consensus *Consensus) BlockNum() uint64 { // New create a new Consensus record func New( host p2p.Host, shard uint32, multiBLSPriKey multibls.PrivateKeys, - Decider quorum.Decider, + blockchain core.BlockChain, + Decider quorum.Decider, minPeers int, aggregateSig bool, ) (*Consensus, error) { consensus := Consensus{} consensus.Decider = Decider + consensus.Blockchain = blockchain + consensus.MinPeers = minPeers + consensus.AggregateSig = aggregateSig consensus.host = host consensus.msgSender = NewMessageSender(host) consensus.BlockNumLowChan = make(chan struct{}, 1) diff --git a/consensus/consensus_service_test.go b/consensus/consensus_service_test.go index 4aa4c3473..dd2fca7ab 100644 --- a/consensus/consensus_service_test.go +++ b/consensus/consensus_service_test.go @@ -25,7 +25,7 @@ func TestSignAndMarshalConsensusMessage(t *testing.T) { } decider := quorum.NewDecider(quorum.SuperMajorityVote, shard.BeaconChainShardID) blsPriKey := bls.RandPrivateKey() - consensus, err := New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), decider) + consensus, err := New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), nil, decider, 3, false) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } @@ -58,7 +58,7 @@ func TestSetViewID(t *testing.T) { ) blsPriKey := bls.RandPrivateKey() consensus, err := New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), decider, + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), nil, decider, 3, false, ) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 1f559045f..ca2875f9c 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -90,7 +90,7 @@ func GenerateConsensusForTesting() (p2p.Host, multibls.PrivateKeys, *Consensus, decider := quorum.NewDecider(quorum.SuperMajorityVote, shard.BeaconChainShardID) multiBLSPrivateKey := multibls.GetPrivateKeys(bls.RandPrivateKey()) - consensus, err := New(host, shard.BeaconChainShardID, multiBLSPrivateKey, decider) + consensus, err := New(host, shard.BeaconChainShardID, multiBLSPrivateKey, nil, decider, 3, false) if err != nil { return nil, nil, nil, nil, err } diff --git a/consensus/construct_test.go b/consensus/construct_test.go index 6d78938a4..71da42e3f 100644 --- a/consensus/construct_test.go +++ b/consensus/construct_test.go @@ -31,7 +31,7 @@ func TestConstructAnnounceMessage(test *testing.T) { quorum.SuperMajorityVote, shard.BeaconChainShardID, ) blsPriKey := bls.RandPrivateKey() - consensus, err := New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), decider) + consensus, err := New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), nil, decider, 3, false) if err != nil { test.Fatalf("Cannot create consensus: %v", err) } @@ -63,9 +63,7 @@ func TestConstructPreparedMessage(test *testing.T) { quorum.SuperMajorityVote, shard.BeaconChainShardID, ) blsPriKey := bls.RandPrivateKey() - consensus, err := New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), decider, - ) + consensus, err := New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), nil, decider, 3, false) if err != nil { test.Fatalf("Cannot craeate consensus: %v", err) } @@ -145,7 +143,7 @@ func TestConstructPrepareMessage(test *testing.T) { ) consensus, err := New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey1), decider, + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey1), nil, decider, 3, false, ) if err != nil { test.Fatalf("Cannot create consensus: %v", err) @@ -236,7 +234,7 @@ func TestConstructCommitMessage(test *testing.T) { quorum.SuperMajorityStake, shard.BeaconChainShardID, ) - consensus, err := New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey1), decider) + consensus, err := New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey1), nil, decider, 3, false) if err != nil { test.Fatalf("Cannot create consensus: %v", err) } @@ -318,7 +316,7 @@ func TestPopulateMessageFields(t *testing.T) { quorum.SuperMajorityVote, shard.BeaconChainShardID, ) consensus, err := New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), decider, + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsPriKey), nil, decider, 3, false, ) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) diff --git a/core/blockchain.go b/core/blockchain.go index f205ffd2a..1c7ea43d3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -43,7 +43,7 @@ type Options struct { // canonical chain. type BlockChain interface { // ValidateNewBlock validates new block. - ValidateNewBlock(block *types.Block) error + ValidateNewBlock(block *types.Block, beaconChain BlockChain) error // SetHead rewinds the local chain to a new head. In the case of headers, everything // above the new head will be deleted and the new one set. In the case of blocks // though, the head may be further rewound if block bodies are missing (non-archive diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index 0e24dde84..783a2e98e 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -90,6 +90,8 @@ var ( // errExceedMaxPendingSlashes .. errExceedMaxPendingSlashes = errors.New("exceeed max pending slashes") errNilEpoch = errors.New("nil epoch for voting power computation") + errAlreadyExist = errors.New("crosslink already exist") + errDoubleSpent = errors.New("[verifyIncomingReceipts] Double Spent") ) const ( @@ -294,7 +296,165 @@ func newBlockChainWithOptions( return bc, nil } -func (bc *BlockChainImpl) ValidateNewBlock(block *types.Block) error { +// VerifyBlockCrossLinks verifies the crosslinks of the block. +// This function should be called from beacon chain. +func VerifyBlockCrossLinks(blockchain BlockChain, block *types.Block) error { + cxLinksData := block.Header().CrossLinks() + if len(cxLinksData) == 0 { + utils.Logger().Debug().Msgf("[CrossLinkVerification] Zero CrossLinks in the header") + return nil + } + + crossLinks := types.CrossLinks{} + err := rlp.DecodeBytes(cxLinksData, &crossLinks) + if err != nil { + return errors.Wrapf( + err, "[CrossLinkVerification] failed to decode cross links", + ) + } + + if !crossLinks.IsSorted() { + return errors.New("[CrossLinkVerification] cross links are not sorted") + } + + for _, crossLink := range crossLinks { + // ReadCrossLink beacon chain usage. + cl, err := blockchain.ReadCrossLink(crossLink.ShardID(), crossLink.BlockNum()) + if err == nil && cl != nil { + utils.Logger().Err(errAlreadyExist). + Uint64("beacon-block-number", block.NumberU64()). + Interface("remote", crossLink). + Interface("local", cl). + Msg("[CrossLinkVerification]") + // TODO Add slash for exist same blocknum but different crosslink + return errors.Wrapf( + errAlreadyExist, + "[CrossLinkVerification] shard: %d block: %d on beacon block %d", + crossLink.ShardID(), + crossLink.BlockNum(), + block.NumberU64(), + ) + } + if err := VerifyCrossLink(blockchain, crossLink); err != nil { + return errors.Wrapf(err, "cannot VerifyBlockCrossLinks") + } + } + return nil +} + +// VerifyCrossLink verifies the header is valid +func VerifyCrossLink(blockchain BlockChain, cl types.CrossLink) error { + if blockchain.ShardID() != shard.BeaconChainShardID { + return errors.New("[VerifyCrossLink] Shard chains should not verify cross links") + } + engine := blockchain.Engine() + + if err := engine.VerifyCrossLink(blockchain, cl); err != nil { + return errors.Wrap(err, "[VerifyCrossLink]") + } + return nil +} + +func VerifyIncomingReceipts(blockchain BlockChain, block *types.Block) error { + m := make(map[common.Hash]struct{}) + cxps := block.IncomingReceipts() + for _, cxp := range cxps { + // double spent + if blockchain.IsSpent(cxp) { + return errDoubleSpent + } + hash := cxp.MerkleProof.BlockHash + // duplicated receipts + if _, ok := m[hash]; ok { + return errDoubleSpent + } + m[hash] = struct{}{} + + for _, item := range cxp.Receipts { + if s := blockchain.ShardID(); item.ToShardID != s { + return errors.Errorf( + "[verifyIncomingReceipts] Invalid ToShardID %d expectShardID %d", + s, item.ToShardID, + ) + } + } + + if err := blockchain.Validator().ValidateCXReceiptsProof(cxp); err != nil { + return errors.Wrapf(err, "[verifyIncomingReceipts] verification failed") + } + } + + incomingReceiptHash := types.EmptyRootHash + if len(cxps) > 0 { + incomingReceiptHash = types.DeriveSha(cxps) + } + if incomingReceiptHash != block.Header().IncomingReceiptHash() { + return errors.New("[verifyIncomingReceipts] Invalid IncomingReceiptHash in block header") + } + + return nil +} + +func (bc *BlockChainImpl) ValidateNewBlock(block *types.Block, beaconChain BlockChain) error { + if block == nil || block.Header() == nil { + return errors.New("nil header or block asked to verify") + } + + if block.ShardID() != bc.ShardID() { + utils.Logger().Error(). + Uint32("my shard ID", bc.ShardID()). + Uint32("new block's shard ID", block.ShardID()). + Msg("[ValidateNewBlock] Wrong shard ID of the new block") + return errors.New("[ValidateNewBlock] Wrong shard ID of the new block") + } + + if block.NumberU64() <= bc.CurrentBlock().NumberU64() { + return errors.Errorf("block with the same block number is already committed: %d", block.NumberU64()) + } + if err := bc.Validator().ValidateHeader(block, true); err != nil { + utils.Logger().Error(). + Str("blockHash", block.Hash().Hex()). + Err(err). + Msg("[ValidateNewBlock] Cannot validate header for the new block") + return err + } + if err := bc.Engine().VerifyVRF( + bc, block.Header(), + ); err != nil { + utils.Logger().Error(). + Str("blockHash", block.Hash().Hex()). + Err(err). + Msg("[ValidateNewBlock] Cannot verify vrf for the new block") + return errors.Wrap(err, + "[ValidateNewBlock] Cannot verify vrf for the new block", + ) + } + err := bc.Engine().VerifyShardState(bc, beaconChain, block.Header()) + if err != nil { + utils.Logger().Error(). + Str("blockHash", block.Hash().Hex()). + Err(err). + Msg("[ValidateNewBlock] Cannot verify shard state for the new block") + return errors.Wrap(err, + "[ValidateNewBlock] Cannot verify shard state for the new block", + ) + } + err = bc.validateNewBlock(block) + if err != nil { + return err + } + if bc.shardID == shard.BeaconChainShardID { + err = VerifyBlockCrossLinks(bc, block) + if err != nil { + utils.Logger().Debug().Err(err).Msg("ops2 VerifyBlockCrossLinks Failed") + return err + } + } + + return VerifyIncomingReceipts(bc, block) +} + +func (bc *BlockChainImpl) validateNewBlock(block *types.Block) error { state, err := state.New(bc.CurrentBlock().Root(), bc.stateCache) if err != nil { return err diff --git a/core/blockchain_stub.go b/core/blockchain_stub.go index ad7e21b04..ccb1c9847 100644 --- a/core/blockchain_stub.go +++ b/core/blockchain_stub.go @@ -29,7 +29,7 @@ type Stub struct { Name string } -func (a Stub) ValidateNewBlock(block *types.Block) error { +func (a Stub) ValidateNewBlock(block *types.Block, beaconChain BlockChain) error { return errors.Errorf("method ValidateNewBlock not implemented for %s", a.Name) } diff --git a/node/node.go b/node/node.go index 4efd42b98..8b0c8a835 100644 --- a/node/node.go +++ b/node/node.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/internal/shardchain/tikv_manage" "github.com/harmony-one/harmony/internal/tikv" "github.com/harmony-one/harmony/internal/tikv/redis_helper" @@ -43,7 +44,6 @@ import ( "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/bls" - "github.com/harmony-one/harmony/internal/chain" common2 "github.com/harmony-one/harmony/internal/common" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/params" @@ -987,7 +987,8 @@ func (node *Node) GetSyncID() [SyncIDLength]byte { func New( host p2p.Host, consensusObj *consensus.Consensus, - chainDBFactory shardchain.DBFactory, + engine engine.Engine, + collection *shardchain.CollectionImpl, blacklist map[common.Address]struct{}, allowedTxs map[common.Address]core.AllowedTxData, localAccounts []common.Address, @@ -1016,12 +1017,6 @@ func New( chainConfig := networkType.ChainConfig() node.chainConfig = chainConfig - engine := chain.NewEngine() - - collection := shardchain.NewCollection( - harmonyconfig, chainDBFactory, &core.GenesisInitializer{NetworkType: node.NodeConfig.GetNetworkType()}, engine, &chainConfig, - ) - for shardID, archival := range isArchival { if archival { collection.DisableCache(shardID) diff --git a/node/node_cross_link.go b/node/node_cross_link.go index c855739b8..b6e1f10bf 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -6,6 +6,7 @@ import ( common2 "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" ffi_bls "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/shard" @@ -18,57 +19,6 @@ const ( crossLinkBatchSize = 3 ) -var ( - errAlreadyExist = errors.New("crosslink already exist") -) - -// VerifyBlockCrossLinks verifies the crosslinks of the block. -// This method should be called from beacon chain. -func (node *Node) VerifyBlockCrossLinks(block *types.Block) error { - cxLinksData := block.Header().CrossLinks() - if len(cxLinksData) == 0 { - utils.Logger().Debug().Msgf("[CrossLinkVerification] Zero CrossLinks in the header") - return nil - } - - crossLinks := types.CrossLinks{} - err := rlp.DecodeBytes(cxLinksData, &crossLinks) - if err != nil { - return errors.Wrapf( - err, "[CrossLinkVerification] failed to decode cross links", - ) - } - - if !crossLinks.IsSorted() { - return errors.New("[CrossLinkVerification] cross links are not sorted") - } - - for _, crossLink := range crossLinks { - // ReadCrossLink beacon chain usage. - cl, err := node.Blockchain().ReadCrossLink(crossLink.ShardID(), crossLink.BlockNum()) - if err == nil && cl != nil { - utils.Logger().Err(errAlreadyExist). - Uint64("beacon-block-number", block.NumberU64()). - Interface("remote", crossLink). - Interface("local", cl). - Msg("[CrossLinkVerification]") - // TODO Add slash for exist same blocknum but different crosslink - return errors.Wrapf( - errAlreadyExist, - "[CrossLinkVerification] shard: %d block: %d on beacon block %d", - crossLink.ShardID(), - crossLink.BlockNum(), - block.NumberU64(), - ) - } - if err := node.VerifyCrossLink(crossLink); err != nil { - return errors.Wrapf(err, "cannot VerifyBlockCrossLinks") - - } - } - return nil -} - // ProcessCrossLinkHeartbeatMessage process crosslink heart beat signal. // This function is only called on shards 1,2,3 when network message `CrosslinkHeartbeat` receiving. func (node *Node) ProcessCrossLinkHeartbeatMessage(msgPayload []byte) { @@ -198,7 +148,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { continue } - if err = node.VerifyCrossLink(cl); err != nil { + if err = core.VerifyCrossLink(node.Blockchain(), cl); err != nil { nodeCrossLinkMessageCounterVec.With(prometheus.Labels{"type": "invalid_crosslink"}).Inc() utils.Logger().Info(). Str("cross-link-issue", err.Error()). @@ -219,16 +169,3 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { Msgf("[ProcessingCrossLink] Add pending crosslinks, total pending: %d", Len) } } - -// VerifyCrossLink verifies the header is valid -func (node *Node) VerifyCrossLink(cl types.CrossLink) error { - if node.Blockchain().ShardID() != shard.BeaconChainShardID { - return errors.New("[VerifyCrossLink] Shard chains should not verify cross links") - } - engine := node.Blockchain().Engine() - - if err := engine.VerifyCrossLink(node.Blockchain(), cl); err != nil { - return errors.Wrap(err, "[VerifyCrossLink]") - } - return nil -} diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 3e7878a7c..014432c4e 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -1,7 +1,6 @@ package node import ( - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/core" @@ -10,7 +9,6 @@ import ( "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/shard" - "github.com/pkg/errors" ) // BroadcastCXReceipts broadcasts cross shard receipts to correspoding @@ -116,50 +114,6 @@ func (node *Node) BroadcastMissingCXReceipts() { } } -var ( - errDoubleSpent = errors.New("[verifyIncomingReceipts] Double Spent") -) - -func (node *Node) verifyIncomingReceipts(block *types.Block) error { - m := make(map[common.Hash]struct{}) - cxps := block.IncomingReceipts() - for _, cxp := range cxps { - // double spent - if node.Blockchain().IsSpent(cxp) { - return errDoubleSpent - } - hash := cxp.MerkleProof.BlockHash - // duplicated receipts - if _, ok := m[hash]; ok { - return errDoubleSpent - } - m[hash] = struct{}{} - - for _, item := range cxp.Receipts { - if s := node.Blockchain().ShardID(); item.ToShardID != s { - return errors.Errorf( - "[verifyIncomingReceipts] Invalid ToShardID %d expectShardID %d", - s, item.ToShardID, - ) - } - } - - if err := node.Blockchain().Validator().ValidateCXReceiptsProof(cxp); err != nil { - return errors.Wrapf(err, "[verifyIncomingReceipts] verification failed") - } - } - - incomingReceiptHash := types.EmptyRootHash - if len(cxps) > 0 { - incomingReceiptHash = types.DeriveSha(cxps) - } - if incomingReceiptHash != block.Header().IncomingReceiptHash() { - return errors.New("[verifyIncomingReceipts] Invalid IncomingReceiptHash in block header") - } - - return nil -} - // ProcessReceiptMessage store the receipts and merkle proof in local data store func (node *Node) ProcessReceiptMessage(msgPayload []byte) { cxp := types.CXReceiptsProof{} diff --git a/node/node_handler.go b/node/node_handler.go index cbf3b829d..f1b1115e3 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -52,11 +52,6 @@ func (node *Node) processSkippedMsgTypeByteValue( } } -var ( - errInvalidPayloadSize = errors.New("invalid payload size") - errWrongBlockMsgSize = errors.New("invalid block message size") -) - // HandleNodeMessage parses the message and dispatch the actions. func (node *Node) HandleNodeMessage( ctx context.Context, @@ -334,101 +329,34 @@ func getCrosslinkHeadersForShards(shardChain core.BlockChain, curBlock *types.Bl // VerifyNewBlock is called by consensus participants to verify the block (account model) they are // running consensus on. -func (node *Node) VerifyNewBlock(newBlock *types.Block) error { - if newBlock == nil || newBlock.Header() == nil { - return errors.New("nil header or block asked to verify") - } - - if newBlock.ShardID() != node.Blockchain().ShardID() { - utils.Logger().Error(). - Uint32("my shard ID", node.Blockchain().ShardID()). - Uint32("new block's shard ID", newBlock.ShardID()). - Msg("[VerifyNewBlock] Wrong shard ID of the new block") - return errors.New("[VerifyNewBlock] Wrong shard ID of the new block") - } - - if newBlock.NumberU64() <= node.Blockchain().CurrentBlock().NumberU64() { - return errors.Errorf("block with the same block number is already committed: %d", newBlock.NumberU64()) - } - if err := node.Blockchain().Validator().ValidateHeader(newBlock, true); err != nil { - utils.Logger().Error(). - Str("blockHash", newBlock.Hash().Hex()). - Err(err). - Msg("[VerifyNewBlock] Cannot validate header for the new block") - return err - } - - if err := node.Blockchain().Engine().VerifyVRF( - node.Blockchain(), newBlock.Header(), - ); err != nil { - utils.Logger().Error(). - Str("blockHash", newBlock.Hash().Hex()). - Err(err). - Msg("[VerifyNewBlock] Cannot verify vrf for the new block") - return errors.Wrap(err, - "[VerifyNewBlock] Cannot verify vrf for the new block", - ) - } - - if err := node.Blockchain().Engine().VerifyShardState( - node.Blockchain(), node.Beaconchain(), newBlock.Header(), - ); err != nil { - utils.Logger().Error(). - Str("blockHash", newBlock.Hash().Hex()). - Err(err). - Msg("[VerifyNewBlock] Cannot verify shard state for the new block") - return errors.Wrap(err, - "[VerifyNewBlock] Cannot verify shard state for the new block", - ) - } - - if err := node.Blockchain().ValidateNewBlock(newBlock); err != nil { - if hooks := node.NodeConfig.WebHooks.Hooks; hooks != nil { - if p := hooks.ProtocolIssues; p != nil { - url := p.OnCannotCommit - go func() { - webhooks.DoPost(url, map[string]interface{}{ - "bad-header": newBlock.Header(), - "reason": err.Error(), - }) - }() +func VerifyNewBlock(nodeConfig *nodeconfig.ConfigType, blockChain core.BlockChain, beaconChain core.BlockChain) func(*types.Block) error { + return func(newBlock *types.Block) error { + if err := blockChain.ValidateNewBlock(newBlock, beaconChain); err != nil { + if hooks := nodeConfig.WebHooks.Hooks; hooks != nil { + if p := hooks.ProtocolIssues; p != nil { + url := p.OnCannotCommit + go func() { + webhooks.DoPost(url, map[string]interface{}{ + "bad-header": newBlock.Header(), + "reason": err.Error(), + }) + }() + } } + utils.Logger().Error(). + Str("blockHash", newBlock.Hash().Hex()). + Int("numTx", len(newBlock.Transactions())). + Int("numStakingTx", len(newBlock.StakingTransactions())). + Err(err). + Msg("[VerifyNewBlock] Cannot Verify New Block!!!") + return errors.Errorf( + "[VerifyNewBlock] Cannot Verify New Block!!! block-hash %s txn-count %d", + newBlock.Hash().Hex(), + len(newBlock.Transactions()), + ) } - utils.Logger().Error(). - Str("blockHash", newBlock.Hash().Hex()). - Int("numTx", len(newBlock.Transactions())). - Int("numStakingTx", len(newBlock.StakingTransactions())). - Err(err). - Msg("[VerifyNewBlock] Cannot Verify New Block!!!") - return errors.Errorf( - "[VerifyNewBlock] Cannot Verify New Block!!! block-hash %s txn-count %d", - newBlock.Hash().Hex(), - len(newBlock.Transactions()), - ) - } - - // Verify cross links - // TODO: move into ValidateNewBlock - if node.IsRunningBeaconChain() { - err := node.VerifyBlockCrossLinks(newBlock) - if err != nil { - utils.Logger().Debug().Err(err).Msg("ops2 VerifyBlockCrossLinks Failed") - return err - } - } - - // TODO: move into ValidateNewBlock - if err := node.verifyIncomingReceipts(newBlock); err != nil { - utils.Logger().Error(). - Str("blockHash", newBlock.Hash().Hex()). - Int("numIncomingReceipts", len(newBlock.IncomingReceipts())). - Err(err). - Msg("[VerifyNewBlock] Cannot ValidateNewBlock") - return errors.Wrapf( - err, "[VerifyNewBlock] Cannot ValidateNewBlock", - ) + return nil } - return nil } // PostConsensusProcessing is called by consensus participants, after consensus is done, to: diff --git a/node/node_handler_test.go b/node/node_handler_test.go index 881bc9262..ccf214688 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -7,9 +7,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/chain" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/multibls" "github.com/harmony-one/harmony/p2p" @@ -29,17 +32,22 @@ func TestAddNewBlock(t *testing.T) { if err != nil { t.Fatalf("newhost failure: %v", err) } + engine := chain.NewEngine() + chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig() + collection := shardchain.NewCollection( + nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig, + ) decider := quorum.NewDecider( quorum.SuperMajorityVote, shard.BeaconChainShardID, ) consensus, err := consensus.New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), decider, + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), nil, decider, 3, false, ) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } nodeconfig.SetNetworkType(nodeconfig.Devnet) - node := New(host, consensus, testDBFactory, nil, nil, nil, nil, nil) + node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil) txs := make(map[common.Address]types.Transactions) stks := staking.StakingTransactions{} @@ -76,11 +84,16 @@ func TestVerifyNewBlock(t *testing.T) { if err != nil { t.Fatalf("newhost failure: %v", err) } + engine := chain.NewEngine() + chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig() + collection := shardchain.NewCollection( + nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig, + ) decider := quorum.NewDecider( quorum.SuperMajorityVote, shard.BeaconChainShardID, ) consensus, err := consensus.New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), decider, + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), nil, decider, 3, false, ) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) @@ -88,7 +101,7 @@ func TestVerifyNewBlock(t *testing.T) { archiveMode := make(map[uint32]bool) archiveMode[0] = true archiveMode[1] = false - node := New(host, consensus, testDBFactory, nil, nil, nil, archiveMode, nil) + node := New(host, consensus, engine, collection, nil, nil, nil, archiveMode, nil) txs := make(map[common.Address]types.Transactions) stks := staking.StakingTransactions{} @@ -105,7 +118,7 @@ func TestVerifyNewBlock(t *testing.T) { // work around vrf verification as it's tested in another test. node.Blockchain().Config().VRFEpoch = big.NewInt(2) - if err := node.VerifyNewBlock(block); err != nil { + if err := VerifyNewBlock(nil, node.Blockchain(), node.Beaconchain())(block); err != nil { t.Error("New block is not verified successfully:", err) } } @@ -122,11 +135,20 @@ func TestVerifyVRF(t *testing.T) { if err != nil { t.Fatalf("newhost failure: %v", err) } + engine := chain.NewEngine() + chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig() + collection := shardchain.NewCollection( + nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig, + ) + blockchain, err := collection.ShardChain(shard.BeaconChainShardID) + if err != nil { + t.Fatal("cannot get blockchain") + } decider := quorum.NewDecider( quorum.SuperMajorityVote, shard.BeaconChainShardID, ) consensus, err := consensus.New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), decider, + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), blockchain, decider, 3, false, ) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) @@ -134,9 +156,8 @@ func TestVerifyVRF(t *testing.T) { archiveMode := make(map[uint32]bool) archiveMode[0] = true archiveMode[1] = false - node := New(host, consensus, testDBFactory, nil, nil, nil, archiveMode, nil) + node := New(host, consensus, engine, collection, nil, nil, nil, archiveMode, nil) - consensus.Blockchain = node.Blockchain() txs := make(map[common.Address]types.Transactions) stks := staking.StakingTransactions{} node.Worker.CommitTransactions( diff --git a/node/node_newblock_test.go b/node/node_newblock_test.go index 59f2a8c88..11bee965d 100644 --- a/node/node_newblock_test.go +++ b/node/node_newblock_test.go @@ -4,18 +4,21 @@ import ( "strings" "testing" - "github.com/harmony-one/harmony/internal/shardchain" - "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/chain" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/multibls" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/shard" staking "github.com/harmony-one/harmony/staking/types" + "github.com/stretchr/testify/require" ) func TestFinalizeNewBlockAsync(t *testing.T) { @@ -30,17 +33,26 @@ func TestFinalizeNewBlockAsync(t *testing.T) { if err != nil { t.Fatalf("newhost failure: %v", err) } + var testDBFactory = &shardchain.MemDBFactory{} + engine := chain.NewEngine() + chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig() + collection := shardchain.NewCollection( + nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig, + ) + blockchain, err := collection.ShardChain(shard.BeaconChainShardID) + require.NoError(t, err) + decider := quorum.NewDecider( quorum.SuperMajorityVote, shard.BeaconChainShardID, ) consensus, err := consensus.New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), decider, + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), nil, decider, 3, false, ) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - var testDBFactory = &shardchain.MemDBFactory{} - node := New(host, consensus, testDBFactory, nil, nil, nil, nil, nil) + + node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil) node.Worker.UpdateCurrent() @@ -58,7 +70,7 @@ func TestFinalizeNewBlockAsync(t *testing.T) { commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil, ) - if err := node.VerifyNewBlock(block); err != nil { + if err := VerifyNewBlock(nil, blockchain, nil)(block); err != nil { t.Error("New block is not verified successfully:", err) } diff --git a/node/node_test.go b/node/node_test.go index 9c8e4c2b4..c2a972257 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -7,7 +7,10 @@ import ( "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/chain" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/multibls" @@ -30,16 +33,21 @@ func TestNewNode(t *testing.T) { if err != nil { t.Fatalf("newhost failure: %v", err) } + engine := chain.NewEngine() decider := quorum.NewDecider( quorum.SuperMajorityVote, shard.BeaconChainShardID, ) consensus, err := consensus.New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), decider, + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), nil, decider, 3, false, ) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - node := New(host, consensus, testDBFactory, nil, nil, nil, nil, nil) + chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig() + collection := shardchain.NewCollection( + nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig, + ) + node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil) if node.Consensus == nil { t.Error("Consensus is not initialized for the node") } @@ -203,20 +211,27 @@ func TestAddBeaconPeer(t *testing.T) { if err != nil { t.Fatalf("newhost failure: %v", err) } + engine := chain.NewEngine() + chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig() + collection := shardchain.NewCollection( + nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig, + ) + blockchain, err := collection.ShardChain(shard.BeaconChainShardID) + if err != nil { + t.Fatalf("Cannot craeate consensus: %v", err) + } decider := quorum.NewDecider( quorum.SuperMajorityVote, shard.BeaconChainShardID, ) + consensus, err := consensus.New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), decider, + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), blockchain, decider, 3, false, ) - if err != nil { - t.Fatalf("Cannot craeate consensus: %v", err) - } archiveMode := make(map[uint32]bool) archiveMode[0] = true archiveMode[1] = false - node := New(host, consensus, testDBFactory, nil, nil, nil, archiveMode, nil) + node := New(host, consensus, engine, collection, nil, nil, nil, archiveMode, nil) for _, p := range peers1 { ret := node.AddBeaconPeer(p) if ret {