@ -5,6 +5,7 @@ import (
"crypto/ecdsa"
"crypto/ecdsa"
"fmt"
"fmt"
"os"
"os"
"strings"
"sync"
"sync"
"time"
"time"
@ -20,6 +21,7 @@ import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/reward"
"github.com/harmony-one/harmony/consensus/reward"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/chain"
@ -289,20 +291,27 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) {
// Add new staking transactions to the pending staking transaction list.
// Add new staking transactions to the pending staking transaction list.
func ( node * Node ) addPendingStakingTransactions ( newStakingTxs staking . StakingTransactions ) {
func ( node * Node ) addPendingStakingTransactions ( newStakingTxs staking . StakingTransactions ) {
txPoolLimit := 1000 // TODO: incorporate staking txn into TxPool
// TODO: incorporate staking txn into TxPool
if node . NodeConfig . ShardID == shard . BeaconChainShardID &&
if node . NodeConfig . ShardID == shard . BeaconChainShardID &&
node . Blockchain ( ) . Config ( ) . IsPreStaking ( node . Blockchain ( ) . CurrentHeader ( ) . Epoch ( ) ) {
node . Blockchain ( ) . Config ( ) . IsPreStaking ( node . Blockchain ( ) . CurrentHeader ( ) . Epoch ( ) ) {
node . pendingStakingTxMutex . Lock ( )
node . pendingStakingTxMutex . Lock ( )
for _ , tx := range newStakingTxs {
for _ , tx := range newStakingTxs {
const txPoolLimit = 1000
if s := len ( node . pendingStakingTransactions ) ; s >= txPoolLimit {
utils . Logger ( ) . Info ( ) .
Int ( "tx-pool-size" , s ) .
Int ( "tx-pool-limit" , txPoolLimit ) .
Msg ( "Current staking txn pool reached limit" )
break
}
if _ , ok := node . pendingStakingTransactions [ tx . Hash ( ) ] ; ! ok {
if _ , ok := node . pendingStakingTransactions [ tx . Hash ( ) ] ; ! ok {
node . pendingStakingTransactions [ tx . Hash ( ) ] = tx
node . pendingStakingTransactions [ tx . Hash ( ) ] = tx
}
}
if len ( node . pendingStakingTransactions ) > txPoolLimit {
break
}
}
}
utils . Logger ( ) . Info ( ) . Int ( "length of newStakingTxs" , len ( newStakingTxs ) ) . Int ( "totalPending" , len ( node . pendingStakingTransactions ) ) . Msg ( "Got more staking transactions" )
utils . Logger ( ) . Info ( ) .
Int ( "length of newStakingTxs" , len ( newStakingTxs ) ) .
Int ( "totalPending" , len ( node . pendingStakingTransactions ) ) .
Msg ( "Got more staking transactions" )
node . pendingStakingTxMutex . Unlock ( )
node . pendingStakingTxMutex . Unlock ( )
}
}
}
}
@ -311,7 +320,8 @@ func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTra
func ( node * Node ) AddPendingStakingTransaction (
func ( node * Node ) AddPendingStakingTransaction (
newStakingTx * staking . StakingTransaction ) {
newStakingTx * staking . StakingTransaction ) {
// TODO: everyone should record staking txns, not just leader
// TODO: everyone should record staking txns, not just leader
if node . Consensus . IsLeader ( ) && node . NodeConfig . ShardID == 0 {
if node . Consensus . IsLeader ( ) &&
node . NodeConfig . ShardID == shard . BeaconChainShardID {
node . addPendingStakingTransactions ( staking . StakingTransactions { newStakingTx } )
node . addPendingStakingTransactions ( staking . StakingTransactions { newStakingTx } )
} else {
} else {
utils . Logger ( ) . Info ( ) . Str ( "Hash" , newStakingTx . Hash ( ) . Hex ( ) ) . Msg ( "Broadcasting Staking Tx" )
utils . Logger ( ) . Info ( ) . Str ( "Hash" , newStakingTx . Hash ( ) . Hex ( ) ) . Msg ( "Broadcasting Staking Tx" )
@ -337,20 +347,63 @@ func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) {
defer node . pendingCXMutex . Unlock ( )
defer node . pendingCXMutex . Unlock ( )
if receipts . ContainsEmptyField ( ) {
if receipts . ContainsEmptyField ( ) {
utils . Logger ( ) . Info ( ) . Int ( "totalPendingReceipts" , len ( node . pendingCXReceipts ) ) . Msg ( "CXReceiptsProof contains empty field" )
utils . Logger ( ) . Info ( ) .
Int ( "totalPendingReceipts" , len ( node . pendingCXReceipts ) ) .
Msg ( "CXReceiptsProof contains empty field" )
return
return
}
}
blockNum := receipts . Header . Number ( ) . Uint64 ( )
blockNum := receipts . Header . Number ( ) . Uint64 ( )
shardID := receipts . Header . ShardID ( )
shardID := receipts . Header . ShardID ( )
// Sanity checks
if err := node . Blockchain ( ) . Validator ( ) . ValidateCXReceiptsProof ( receipts ) ; err != nil {
if ! strings . Contains ( err . Error ( ) , rawdb . MsgNoShardStateFromDB ) {
utils . Logger ( ) . Error ( ) . Err ( err ) . Msg ( "[proposeReceiptsProof] Invalid CXReceiptsProof" )
return
}
}
// cross-shard receipt should not be coming from our shard
if s := node . Consensus . ShardID ; s == shardID {
utils . Logger ( ) . Info ( ) .
Uint32 ( "my-shard" , s ) .
Uint32 ( "receipt-shard" , shardID ) .
Msg ( "ShardID of incoming receipt was same as mine" )
return
}
if e := receipts . Header . Epoch ( ) ; blockNum == 0 ||
! node . Blockchain ( ) . Config ( ) . IsCrossLink ( e ) {
utils . Logger ( ) . Info ( ) .
Uint64 ( "incoming-epoch" , e . Uint64 ( ) ) .
Msg ( "Incoming receipt had meaningless epoch" )
return
}
key := utils . GetPendingCXKey ( shardID , blockNum )
key := utils . GetPendingCXKey ( shardID , blockNum )
// DDoS protection
const maxCrossTxnSize = 4096
if s := len ( node . pendingCXReceipts ) ; s >= maxCrossTxnSize {
utils . Logger ( ) . Info ( ) .
Int ( "pending-cx-receipts-size" , s ) .
Int ( "pending-cx-receipts-limit" , maxCrossTxnSize ) .
Msg ( "Current pending cx-receipts reached size limit" )
return
}
if _ , ok := node . pendingCXReceipts [ key ] ; ok {
if _ , ok := node . pendingCXReceipts [ key ] ; ok {
utils . Logger ( ) . Info ( ) . Int ( "totalPendingReceipts" , len ( node . pendingCXReceipts ) ) . Msg ( "Already Got Same Receipt message" )
utils . Logger ( ) . Info ( ) .
Int ( "totalPendingReceipts" , len ( node . pendingCXReceipts ) ) .
Msg ( "Already Got Same Receipt message" )
return
return
}
}
node . pendingCXReceipts [ key ] = receipts
node . pendingCXReceipts [ key ] = receipts
utils . Logger ( ) . Info ( ) . Int ( "totalPendingReceipts" , len ( node . pendingCXReceipts ) ) . Msg ( "Got ONE more receipt message" )
utils . Logger ( ) . Info ( ) .
Int ( "totalPendingReceipts" , len ( node . pendingCXReceipts ) ) .
Msg ( "Got ONE more receipt message" )
}
}
func ( node * Node ) startRxPipeline (
func ( node * Node ) startRxPipeline (