diff --git a/core/block_validator.go b/core/block_validator.go index afb1ab3be..d24287eb1 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -187,7 +187,7 @@ func (v *BlockValidator) ValidateCXReceiptsProof(cxp *types.CXReceiptsProof) err merkleProof := cxp.MerkleProof shardRoot := common.Hash{} foundMatchingShardID := false - byteBuffer := bytes.NewBuffer([]byte{}) + byteBuffer := bytes.Buffer{} // prepare to calculate source shard outgoing cxreceipts root hash for j := 0; j < len(merkleProof.ShardIDs); j++ { diff --git a/core/types/cx_receipt.go b/core/types/cx_receipt.go index c77dac5a0..d819e8964 100644 --- a/core/types/cx_receipt.go +++ b/core/types/cx_receipt.go @@ -182,5 +182,10 @@ func (cxp *CXReceiptsProof) GetToShardID() (uint32, error) { // ContainsEmptyField checks whether the given CXReceiptsProof contains empty field func (cxp *CXReceiptsProof) ContainsEmptyField() bool { - return cxp == nil || cxp.Receipts == nil || cxp.MerkleProof == nil || cxp.Header == nil || len(cxp.CommitSig)+len(cxp.CommitBitmap) == 0 + anyNil := cxp == nil || + cxp.Receipts == nil || + cxp.MerkleProof == nil || + cxp.Header == nil + anyZero := len(cxp.CommitSig)+len(cxp.CommitBitmap) == 0 + return anyNil || anyZero } diff --git a/node/node.go b/node/node.go index 89312571e..ba94f25f6 100644 --- a/node/node.go +++ b/node/node.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "fmt" "os" + "strings" "sync" "time" @@ -20,6 +21,7 @@ import ( "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus/reward" "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/drand" "github.com/harmony-one/harmony/internal/chain" @@ -289,20 +291,27 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) { // Add new staking transactions to the pending staking transaction list. func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) { - txPoolLimit := 1000 // TODO: incorporate staking txn into TxPool - + // TODO: incorporate staking txn into TxPool if node.NodeConfig.ShardID == shard.BeaconChainShardID && node.Blockchain().Config().IsPreStaking(node.Blockchain().CurrentHeader().Epoch()) { node.pendingStakingTxMutex.Lock() for _, tx := range newStakingTxs { + const txPoolLimit = 1000 + if s := len(node.pendingStakingTransactions); s >= txPoolLimit { + utils.Logger().Info(). + Int("tx-pool-size", s). + Int("tx-pool-limit", txPoolLimit). + Msg("Current staking txn pool reached limit") + break + } if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok { node.pendingStakingTransactions[tx.Hash()] = tx } - if len(node.pendingStakingTransactions) > txPoolLimit { - break - } } - utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingStakingTransactions)).Msg("Got more staking transactions") + utils.Logger().Info(). + Int("length of newStakingTxs", len(newStakingTxs)). + Int("totalPending", len(node.pendingStakingTransactions)). + Msg("Got more staking transactions") node.pendingStakingTxMutex.Unlock() } } @@ -311,7 +320,8 @@ func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTra func (node *Node) AddPendingStakingTransaction( newStakingTx *staking.StakingTransaction) { // TODO: everyone should record staking txns, not just leader - if node.Consensus.IsLeader() && node.NodeConfig.ShardID == 0 { + if node.Consensus.IsLeader() && + node.NodeConfig.ShardID == shard.BeaconChainShardID { node.addPendingStakingTransactions(staking.StakingTransactions{newStakingTx}) } else { utils.Logger().Info().Str("Hash", newStakingTx.Hash().Hex()).Msg("Broadcasting Staking Tx") @@ -337,20 +347,63 @@ func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) { defer node.pendingCXMutex.Unlock() if receipts.ContainsEmptyField() { - utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("CXReceiptsProof contains empty field") + utils.Logger().Info(). + Int("totalPendingReceipts", len(node.pendingCXReceipts)). + Msg("CXReceiptsProof contains empty field") return } blockNum := receipts.Header.Number().Uint64() shardID := receipts.Header.ShardID() + + // Sanity checks + + if err := node.Blockchain().Validator().ValidateCXReceiptsProof(receipts); err != nil { + if !strings.Contains(err.Error(), rawdb.MsgNoShardStateFromDB) { + utils.Logger().Error().Err(err).Msg("[proposeReceiptsProof] Invalid CXReceiptsProof") + return + } + } + + // cross-shard receipt should not be coming from our shard + if s := node.Consensus.ShardID; s == shardID { + utils.Logger().Info(). + Uint32("my-shard", s). + Uint32("receipt-shard", shardID). + Msg("ShardID of incoming receipt was same as mine") + return + } + + if e := receipts.Header.Epoch(); blockNum == 0 || + !node.Blockchain().Config().IsCrossLink(e) { + utils.Logger().Info(). + Uint64("incoming-epoch", e.Uint64()). + Msg("Incoming receipt had meaningless epoch") + return + } + key := utils.GetPendingCXKey(shardID, blockNum) + // DDoS protection + const maxCrossTxnSize = 4096 + if s := len(node.pendingCXReceipts); s >= maxCrossTxnSize { + utils.Logger().Info(). + Int("pending-cx-receipts-size", s). + Int("pending-cx-receipts-limit", maxCrossTxnSize). + Msg("Current pending cx-receipts reached size limit") + return + } + if _, ok := node.pendingCXReceipts[key]; ok { - utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Already Got Same Receipt message") + utils.Logger().Info(). + Int("totalPendingReceipts", len(node.pendingCXReceipts)). + Msg("Already Got Same Receipt message") return } node.pendingCXReceipts[key] = receipts - utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Got ONE more receipt message") + utils.Logger().Info(). + Int("totalPendingReceipts", len(node.pendingCXReceipts)). + Msg("Got ONE more receipt message") } func (node *Node) startRxPipeline(