diff --git a/node/node.go b/node/node.go index 8eda30de1..2e94daeb1 100644 --- a/node/node.go +++ b/node/node.go @@ -124,11 +124,11 @@ type Node struct { CxPool *core.CxPool // pool for missing cross shard receipts resend - pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus + pendingTransactions map[common.Hash]*types.Transaction // All the transactions received but not yet processed for Consensus pendingTxMutex sync.Mutex recentTxsStats types.RecentTxsStats - pendingStakingTransactions types2.StakingTransactions // All the staking transactions received but not yet processed for Consensus + pendingStakingTransactions map[common.Hash]*types2.StakingTransaction // All the staking transactions received but not yet processed for Consensus pendingStakingTxMutex sync.Mutex Worker *worker.Worker @@ -249,28 +249,6 @@ func (node *Node) Beaconchain() *core.BlockChain { return bc } -func (node *Node) reducePendingTransactions() { - txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit() - curLen := len(node.pendingTransactions) - - // If length of pendingTransactions is greater than TxPoolLimit then by greedy take the TxPoolLimit recent transactions. - if curLen > txPoolLimit+txPoolLimit { - node.pendingTransactions = append(types.Transactions(nil), node.pendingTransactions[curLen-txPoolLimit:]...) - utils.Logger().Info().Msg("mem stat reduce pending transaction") - } -} - -func (node *Node) reducePendingStakingTransactions() { - txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit() - curLen := len(node.pendingStakingTransactions) - - // If length of pendingStakingTransactions is greater than TxPoolLimit then by greedy take the TxPoolLimit recent transactions. - if curLen > txPoolLimit+txPoolLimit { - node.pendingStakingTransactions = append(types2.StakingTransactions(nil), node.pendingStakingTransactions[curLen-txPoolLimit:]...) - utils.Logger().Info().Msg("mem stat reduce pending staking transaction") - } -} - func (node *Node) tryBroadcast(tx *types.Transaction) { msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx}) @@ -288,18 +266,32 @@ func (node *Node) tryBroadcast(tx *types.Transaction) { // Add new transactions to the pending transaction list. func (node *Node) addPendingTransactions(newTxs types.Transactions) { + txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit() node.pendingTxMutex.Lock() - node.pendingTransactions = append(node.pendingTransactions, newTxs...) - node.reducePendingTransactions() + for _, tx := range newTxs { + if _, ok := node.pendingTransactions[tx.Hash()]; !ok { + node.pendingTransactions[tx.Hash()] = tx + } + if len(node.pendingTransactions) > txPoolLimit { + break + } + } node.pendingTxMutex.Unlock() utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more transactions") } // Add new staking transactions to the pending staking transaction list. func (node *Node) addPendingStakingTransactions(newStakingTxs types2.StakingTransactions) { + txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit() node.pendingStakingTxMutex.Lock() - node.pendingStakingTransactions = append(node.pendingStakingTransactions, newStakingTxs...) - node.reducePendingStakingTransactions() + for _, tx := range newStakingTxs { + if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok { + node.pendingStakingTransactions[tx.Hash()] = tx + } + if len(node.pendingStakingTransactions) > txPoolLimit { + break + } + } node.pendingStakingTxMutex.Unlock() utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more staking transactions") } @@ -369,19 +361,33 @@ func (node *Node) getTransactionsForNewBlock(coinbase common.Address) (types.Tra node.pendingStakingTxMutex.Lock() defer node.pendingStakingTxMutex.Unlock() - selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase) - selectedStaking, unselectedStaking, invalidStaking := node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase) + pendingTransactions := types.Transactions{} + pendingStakingTransactions := types2.StakingTransactions{} - node.pendingTransactions = unselected - node.reducePendingTransactions() + for _, tx := range node.pendingTransactions { + pendingTransactions = append(pendingTransactions, tx) + } + for _, tx := range node.pendingStakingTransactions { + pendingStakingTransactions = append(pendingStakingTransactions, tx) + } + + selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase) + selectedStaking, unselectedStaking, invalidStaking := node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase) + + node.pendingTransactions = make(map[common.Hash]*types.Transaction) + for _, unselectedTx := range unselected { + node.pendingTransactions[unselectedTx.Hash()] = unselectedTx + } utils.Logger().Info(). Int("remainPending", len(node.pendingTransactions)). Int("selected", len(selected)). Int("invalidDiscarded", len(invalid)). Msg("Selecting Transactions") - node.pendingStakingTransactions = unselectedStaking - node.reducePendingStakingTransactions() + node.pendingStakingTransactions = make(map[common.Hash]*types2.StakingTransaction) + for _, unselectedStakingTx := range unselectedStaking { + node.pendingStakingTransactions[unselectedStakingTx.Hash()] = unselectedStakingTx + } utils.Logger().Info(). Int("remainPending", len(node.pendingStakingTransactions)). Int("selected", len(unselectedStaking)). @@ -464,6 +470,8 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc } node.pendingCXReceipts = make(map[string]*types.CXReceiptsProof) + node.pendingTransactions = make(map[common.Hash]*types.Transaction) + node.pendingStakingTransactions = make(map[common.Hash]*types2.StakingTransaction) node.Consensus.VerifiedNewBlock = make(chan *types.Block) // the sequence number is the next block number to be added in consensus protocol, which is always one more than current chain header block diff --git a/node/worker/worker.go b/node/worker/worker.go index e82347394..dc4d41993 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -150,7 +150,7 @@ func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Tra } // SelectStakingTransactionsForNewBlock selects staking transactions for new block. -func (w *Worker) SelectStakingTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types2.StakingTransactions, types2.StakingTransactions, types2.StakingTransactions) { +func (w *Worker) SelectStakingTransactionsForNewBlock(newBlockNum uint64, txs types2.StakingTransactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types2.StakingTransactions, types2.StakingTransactions, types2.StakingTransactions) { // TODO: implement staking transaction selection return types2.StakingTransactions{}, types2.StakingTransactions{}, types2.StakingTransactions{} } diff --git a/staking/types/staking_transaction.go b/staking/types/staking_transaction.go index 5d2ee5441..bc319123a 100644 --- a/staking/types/staking_transaction.go +++ b/staking/types/staking_transaction.go @@ -1,9 +1,11 @@ package types import ( + "bytes" "math/big" - "github.com/harmony-one/harmony/internal/common" + "github.com/ethereum/go-ethereum/common" + "github.com/harmony-one/harmony/crypto/hash" ) // StakingTransaction struct. @@ -19,8 +21,19 @@ type StakingTransaction struct { S *big.Int `json:"s" gencodec:"required"` // This is only used when marshaling to JSON. - Hash *common.Hash `json:"hash" rlp:"-"` + hash *common.Hash `json:"hash" rlp:"-"` } // StakingTransactions is a Transaction slice type for basic sorting. type StakingTransactions []*StakingTransaction + +// Hash hashes the RLP encoding of tx. +// It uniquely identifies the transaction. +func (tx *StakingTransaction) Hash() common.Hash { + emptyHash := common.Hash{} + if bytes.Compare(tx.hash[:], emptyHash[:]) == 0 { + h := hash.FromRLP(tx) + tx.hash = &h + } + return *tx.hash +}