Make pending txs a map

pull/1672/head
Rongjian Lan 5 years ago
parent 6141f8c7f3
commit de6a1df793
  1. 76
      node/node.go
  2. 2
      node/worker/worker.go
  3. 17
      staking/types/staking_transaction.go

@ -124,11 +124,11 @@ type Node struct {
CxPool *core.CxPool // pool for missing cross shard receipts resend
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
pendingTransactions map[common.Hash]*types.Transaction // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex
recentTxsStats types.RecentTxsStats
pendingStakingTransactions types2.StakingTransactions // All the staking transactions received but not yet processed for Consensus
pendingStakingTransactions map[common.Hash]*types2.StakingTransaction // All the staking transactions received but not yet processed for Consensus
pendingStakingTxMutex sync.Mutex
Worker *worker.Worker
@ -249,28 +249,6 @@ func (node *Node) Beaconchain() *core.BlockChain {
return bc
}
func (node *Node) reducePendingTransactions() {
txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
curLen := len(node.pendingTransactions)
// If length of pendingTransactions is greater than TxPoolLimit then by greedy take the TxPoolLimit recent transactions.
if curLen > txPoolLimit+txPoolLimit {
node.pendingTransactions = append(types.Transactions(nil), node.pendingTransactions[curLen-txPoolLimit:]...)
utils.Logger().Info().Msg("mem stat reduce pending transaction")
}
}
func (node *Node) reducePendingStakingTransactions() {
txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
curLen := len(node.pendingStakingTransactions)
// If length of pendingStakingTransactions is greater than TxPoolLimit then by greedy take the TxPoolLimit recent transactions.
if curLen > txPoolLimit+txPoolLimit {
node.pendingStakingTransactions = append(types2.StakingTransactions(nil), node.pendingStakingTransactions[curLen-txPoolLimit:]...)
utils.Logger().Info().Msg("mem stat reduce pending staking transaction")
}
}
func (node *Node) tryBroadcast(tx *types.Transaction) {
msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
@ -288,18 +266,32 @@ func (node *Node) tryBroadcast(tx *types.Transaction) {
// Add new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) {
txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
node.pendingTxMutex.Lock()
node.pendingTransactions = append(node.pendingTransactions, newTxs...)
node.reducePendingTransactions()
for _, tx := range newTxs {
if _, ok := node.pendingTransactions[tx.Hash()]; !ok {
node.pendingTransactions[tx.Hash()] = tx
}
if len(node.pendingTransactions) > txPoolLimit {
break
}
}
node.pendingTxMutex.Unlock()
utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more transactions")
}
// Add new staking transactions to the pending staking transaction list.
func (node *Node) addPendingStakingTransactions(newStakingTxs types2.StakingTransactions) {
txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
node.pendingStakingTxMutex.Lock()
node.pendingStakingTransactions = append(node.pendingStakingTransactions, newStakingTxs...)
node.reducePendingStakingTransactions()
for _, tx := range newStakingTxs {
if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok {
node.pendingStakingTransactions[tx.Hash()] = tx
}
if len(node.pendingStakingTransactions) > txPoolLimit {
break
}
}
node.pendingStakingTxMutex.Unlock()
utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more staking transactions")
}
@ -369,19 +361,33 @@ func (node *Node) getTransactionsForNewBlock(coinbase common.Address) (types.Tra
node.pendingStakingTxMutex.Lock()
defer node.pendingStakingTxMutex.Unlock()
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
selectedStaking, unselectedStaking, invalidStaking := node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
pendingTransactions := types.Transactions{}
pendingStakingTransactions := types2.StakingTransactions{}
node.pendingTransactions = unselected
node.reducePendingTransactions()
for _, tx := range node.pendingTransactions {
pendingTransactions = append(pendingTransactions, tx)
}
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
}
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
selectedStaking, unselectedStaking, invalidStaking := node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
node.pendingTransactions = make(map[common.Hash]*types.Transaction)
for _, unselectedTx := range unselected {
node.pendingTransactions[unselectedTx.Hash()] = unselectedTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingTransactions)).
Int("selected", len(selected)).
Int("invalidDiscarded", len(invalid)).
Msg("Selecting Transactions")
node.pendingStakingTransactions = unselectedStaking
node.reducePendingStakingTransactions()
node.pendingStakingTransactions = make(map[common.Hash]*types2.StakingTransaction)
for _, unselectedStakingTx := range unselectedStaking {
node.pendingStakingTransactions[unselectedStakingTx.Hash()] = unselectedStakingTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingStakingTransactions)).
Int("selected", len(unselectedStaking)).
@ -464,6 +470,8 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
}
node.pendingCXReceipts = make(map[string]*types.CXReceiptsProof)
node.pendingTransactions = make(map[common.Hash]*types.Transaction)
node.pendingStakingTransactions = make(map[common.Hash]*types2.StakingTransaction)
node.Consensus.VerifiedNewBlock = make(chan *types.Block)
// the sequence number is the next block number to be added in consensus protocol, which is always one more than current chain header block

@ -150,7 +150,7 @@ func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Tra
}
// SelectStakingTransactionsForNewBlock selects staking transactions for new block.
func (w *Worker) SelectStakingTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types2.StakingTransactions, types2.StakingTransactions, types2.StakingTransactions) {
func (w *Worker) SelectStakingTransactionsForNewBlock(newBlockNum uint64, txs types2.StakingTransactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types2.StakingTransactions, types2.StakingTransactions, types2.StakingTransactions) {
// TODO: implement staking transaction selection
return types2.StakingTransactions{}, types2.StakingTransactions{}, types2.StakingTransactions{}
}

@ -1,9 +1,11 @@
package types
import (
"bytes"
"math/big"
"github.com/harmony-one/harmony/internal/common"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/crypto/hash"
)
// StakingTransaction struct.
@ -19,8 +21,19 @@ type StakingTransaction struct {
S *big.Int `json:"s" gencodec:"required"`
// This is only used when marshaling to JSON.
Hash *common.Hash `json:"hash" rlp:"-"`
hash *common.Hash `json:"hash" rlp:"-"`
}
// StakingTransactions is a Transaction slice type for basic sorting.
type StakingTransactions []*StakingTransaction
// Hash hashes the RLP encoding of tx.
// It uniquely identifies the transaction.
func (tx *StakingTransaction) Hash() common.Hash {
emptyHash := common.Hash{}
if bytes.Compare(tx.hash[:], emptyHash[:]) == 0 {
h := hash.FromRLP(tx)
tx.hash = &h
}
return *tx.hash
}

Loading…
Cancel
Save