From d21ce08309c787ee6eaaff7f6af2cf8e1dd10cf1 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 5 Nov 2019 17:51:29 -0800 Subject: [PATCH 1/7] Use TxPool for worker's transaction proposal --- go.mod | 31 ++---- node/node.go | 83 ++-------------- node/node_handler_test.go | 13 ++- node/node_newblock.go | 18 +++- node/worker/worker.go | 190 +++++++++++++++---------------------- node/worker/worker_test.go | 6 +- test/chain/main.go | 149 ++--------------------------- 7 files changed, 129 insertions(+), 361 deletions(-) diff --git a/go.mod b/go.mod index 65efc47dd..016cfd98b 100644 --- a/go.mod +++ b/go.mod @@ -4,33 +4,29 @@ go 1.12 require ( github.com/Workiva/go-datastructures v1.0.50 - github.com/allegro/bigcache v1.2.1 // indirect - github.com/aristanetworks/goarista v0.0.0-20190607111240-52c2a7864a08 // indirect - github.com/beorn7/perks v1.0.1 // indirect + github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect + github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/cespare/cp v1.1.1 github.com/davecgh/go-spew v1.1.1 github.com/deckarep/golang-set v1.7.1 - github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/ethereum/go-ethereum v1.8.27 github.com/fatih/color v1.7.0 github.com/fjl/memsize v0.0.0-20180929194037-2a09253e352a - github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c // indirect github.com/golang/mock v1.3.1 - github.com/golang/protobuf v1.3.1 + github.com/golang/protobuf v1.3.2 github.com/golangci/golangci-lint v1.17.1 github.com/gorilla/handlers v1.4.0 github.com/gorilla/mux v1.7.2 github.com/harmony-ek/gencodec v0.0.0-20190215044613-e6740dbdd846 github.com/harmony-one/bls v0.0.5 + github.com/harmony-one/go-sdk v0.0.0-20191105223634-37bdcedd4a9d github.com/harmony-one/taggedrlp v0.1.2 - github.com/harmony-one/vdf v0.0.0-20190924175951-620379da8849 - github.com/hashicorp/golang-lru v0.5.1 + github.com/harmony-one/vdf v1.0.0 + github.com/hashicorp/golang-lru v0.5.3 github.com/iancoleman/strcase v0.0.0-20190422225806-e506e3ef7365 github.com/ipfs/go-ds-badger v0.0.5 github.com/ipfs/go-log v0.0.1 - github.com/karalabe/hid v1.0.0 // indirect - github.com/kylelemons/godebug v1.1.0 // indirect github.com/libp2p/go-libp2p v0.3.1 github.com/libp2p/go-libp2p-core v0.2.2 github.com/libp2p/go-libp2p-crypto v0.1.0 @@ -46,27 +42,18 @@ require ( github.com/natefinch/lumberjack v2.0.0+incompatible github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.8.1 - github.com/prometheus/client_golang v0.9.2 - github.com/prometheus/common v0.4.1 // indirect - github.com/prometheus/procfs v0.0.3 // indirect + github.com/prometheus/client_golang v1.1.0 github.com/rjeczalik/notify v0.9.2 - github.com/rs/cors v1.7.0 // indirect github.com/rs/zerolog v1.14.3 github.com/shirou/gopsutil v2.18.12+incompatible - github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect github.com/spf13/cobra v0.0.5 github.com/stretchr/testify v1.3.0 github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc - golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 + golang.org/x/crypto v0.0.0-20190909091759-094676da4a83 golang.org/x/lint v0.0.0-20190409202823-959b441ac422 - golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 // indirect golang.org/x/tools v0.0.0-20190924052046-3ac2a5bbd98a - google.golang.org/appengine v1.4.0 // indirect - google.golang.org/grpc v1.22.0 + google.golang.org/grpc v1.23.1 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 gopkg.in/ini.v1 v1.42.0 - gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect - gopkg.in/urfave/cli.v1 v1.20.0 // indirect ) diff --git a/node/node.go b/node/node.go index e068aa700..6a96588ba 100644 --- a/node/node.go +++ b/node/node.go @@ -277,18 +277,14 @@ func (node *Node) tryBroadcast(tx *types.Transaction) { // Add new transactions to the pending transaction list. func (node *Node) addPendingTransactions(newTxs types.Transactions) { - txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit() node.pendingTxMutex.Lock() - for _, tx := range newTxs { - if _, ok := node.pendingTransactions[tx.Hash()]; !ok { - node.pendingTransactions[tx.Hash()] = tx - } - if len(node.pendingTransactions) > txPoolLimit { - break - } - } + + node.TxPool.AddRemotes(newTxs) + node.pendingTxMutex.Unlock() - utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more transactions") + + pendingCount, queueCount := node.TxPool.Stats() + utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", pendingCount).Int("totalQueued", queueCount).Msg("Got more transactions") } // Add new staking transactions to the pending staking transaction list. @@ -304,7 +300,7 @@ func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTra } } node.pendingStakingTxMutex.Unlock() - utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more staking transactions") + utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingStakingTransactions)).Msg("Got more staking transactions") } // AddPendingStakingTransaction staking transactions @@ -322,7 +318,6 @@ func (node *Node) AddPendingTransaction(newTx *types.Transaction) { utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx") node.tryBroadcast(newTx) } - utils.Logger().Debug().Int("totalPending", len(node.pendingTransactions)).Msg("Got ONE more transaction") } // AddPendingReceipts adds one receipt message to pending list. @@ -347,70 +342,6 @@ func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) { utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Got ONE more receipt message") } -// Take out a subset of valid transactions from the pending transaction list -// Note the pending transaction list will then contain the rest of the txs -func (node *Node) getTransactionsForNewBlock(coinbase common.Address) (types.Transactions, staking.StakingTransactions) { - txsThrottleConfig := core.ShardingSchedule.TxsThrottleConfig() - - // the next block number to be added in consensus protocol, which is always one more than current chain header block - newBlockNum := node.Blockchain().CurrentBlock().NumberU64() + 1 - // remove old (> txsThrottleConfigRecentTxDuration) blockNum keys from recentTxsStats and initiailize for the new block - for blockNum := range node.recentTxsStats { - recentTxsBlockNumGap := uint64(txsThrottleConfig.RecentTxDuration / node.BlockPeriod) - if recentTxsBlockNumGap < newBlockNum-blockNum { - delete(node.recentTxsStats, blockNum) - } - } - node.recentTxsStats[newBlockNum] = make(types.BlockTxsCounts) - // Must update to the correct current state before processing potential txns - if err := node.Worker.UpdateCurrent(coinbase); err != nil { - utils.Logger().Error(). - Err(err). - Msg("Failed updating worker's state before txn selection") - return types.Transactions{}, staking.StakingTransactions{} - } - - node.pendingTxMutex.Lock() - defer node.pendingTxMutex.Unlock() - node.pendingStakingTxMutex.Lock() - defer node.pendingStakingTxMutex.Unlock() - pendingTransactions := types.Transactions{} - pendingStakingTransactions := staking.StakingTransactions{} - for _, tx := range node.pendingTransactions { - pendingTransactions = append(pendingTransactions, tx) - } - for _, tx := range node.pendingStakingTransactions { - pendingStakingTransactions = append(pendingStakingTransactions, tx) - } - - selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase) - - selectedStaking, unselectedStaking, invalidStaking := - node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, coinbase) - - node.pendingTransactions = make(map[common.Hash]*types.Transaction) - for _, unselectedTx := range unselected { - node.pendingTransactions[unselectedTx.Hash()] = unselectedTx - } - utils.Logger().Info(). - Int("remainPending", len(node.pendingTransactions)). - Int("selected", len(selected)). - Int("invalidDiscarded", len(invalid)). - Msg("Selecting Transactions") - - node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction) - for _, unselectedStakingTx := range unselectedStaking { - node.pendingStakingTransactions[unselectedStakingTx.Hash()] = unselectedStakingTx - } - utils.Logger().Info(). - Int("remainPending", len(node.pendingStakingTransactions)). - Int("selected", len(unselectedStaking)). - Int("invalidDiscarded", len(invalidStaking)). - Msg("Selecting Staking Transactions") - - return selected, selectedStaking -} - func (node *Node) startRxPipeline( receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int, ) { diff --git a/node/node_handler_test.go b/node/node_handler_test.go index 49a012270..e25d2df4f 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -3,6 +3,9 @@ package node import ( "testing" + "github.com/harmony-one/harmony/core/types" + types2 "github.com/harmony-one/harmony/staking/types" + "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus/quorum" @@ -33,8 +36,9 @@ func TestAddNewBlock(t *testing.T) { nodeconfig.SetNetworkType(nodeconfig.Devnet) node := New(host, consensus, testDBFactory, false) - selectedTxs, stks := node.getTransactionsForNewBlock(common.Address{}) - node.Worker.CommitTransactions(selectedTxs, stks, common.Address{}) + txs := make(map[common.Address]types.Transactions) + stks := types2.StakingTransactions{} + node.Worker.CommitTransactions(txs, stks, common.Address{}) block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil) err = node.AddNewBlock(block) @@ -65,8 +69,9 @@ func TestVerifyNewBlock(t *testing.T) { } node := New(host, consensus, testDBFactory, false) - selectedTxs, stks := node.getTransactionsForNewBlock(common.Address{}) - node.Worker.CommitTransactions(selectedTxs, stks, common.Address{}) + txs := make(map[common.Address]types.Transactions) + stks := types2.StakingTransactions{} + node.Worker.CommitTransactions(txs, stks, common.Address{}) block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil) if err := node.VerifyNewBlock(block); err != nil { diff --git a/node/node_newblock.go b/node/node_newblock.go index faeff3426..5991d80e3 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -5,6 +5,8 @@ import ( "sort" "time" + types2 "github.com/harmony-one/harmony/staking/types" + "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" @@ -80,10 +82,20 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { // Update worker's current header and state data in preparation to propose/process new transactions coinbase := node.Consensus.SelfAddress - // Prepare transactions including staking transactions - selectedTxs, selectedStakingTxs := node.getTransactionsForNewBlock(coinbase) + // Prepare transactions including staking transactions\ + pending, err := node.TxPool.Pending() + if err != nil { + utils.Logger().Err(err).Msg("Failed to fetch pending transactions") + return nil, err + } + + // TODO: integrate staking transaction into tx pool + pendingStakingTransactions := types2.StakingTransactions{} + for _, tx := range node.pendingStakingTransactions { + pendingStakingTransactions = append(pendingStakingTransactions, tx) + } - if err := node.Worker.CommitTransactions(selectedTxs, selectedStakingTxs, coinbase); err != nil { + if err := node.Worker.CommitTransactions(pending, pendingStakingTransactions, coinbase); err != nil { ctxerror.Log15(utils.GetLogger().Error, ctxerror.New("cannot commit transactions"). WithCause(err)) diff --git a/node/worker/worker.go b/node/worker/worker.go index d998d0e27..35a713bd9 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -5,6 +5,8 @@ import ( "math/big" "time" + "github.com/harmony-one/go-sdk/pkg/address" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/block" @@ -25,6 +27,8 @@ import ( // environment is the worker's current environment and holds all of the current state information. type environment struct { + signer types.Signer + state *state.DB // apply state changes here gasPool *core.GasPool // available gas used to pack transactions @@ -92,107 +96,96 @@ func (w *Worker) throttleTxs(selected types.Transactions, recentTxsStats types.R return sender, shardingconfig.TxSelect } -// SelectTransactionsForNewBlock selects transactions for new block. -func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions) { +// CommitTransactions commits transactions for new block. +func (w *Worker) CommitTransactions(pendingNormal map[common.Address]types.Transactions, pendingStaking staking.StakingTransactions, coinbase common.Address) error { if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) } - selected := types.Transactions{} - unselected := types.Transactions{} - invalid := types.Transactions{} - for _, tx := range txs { - if tx.ShardID() != w.chain.ShardID() { - invalid = append(invalid, tx) - continue - } + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pendingNormal) + + var coalescedLogs []*types.Log + // NORMAL + for { // If we don't have enough gas for any further transactions then we're done if w.current.gasPool.Gas() < params.TxGas { - utils.Logger().Info().Str("Not enough gas for further transactions, have", w.current.gasPool.String()).Uint64("want", params.TxGas) - unselected = append(unselected, tx) + utils.Logger().Info().Uint64("have", w.current.gasPool.Gas()).Uint64("want", params.TxGas).Msg("Not enough gas for further transactions") + break + } + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + // + // We use the eip155 signer regardless of the current hf. + from, _ := types.Sender(w.current.signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !w.config.IsEIP155(w.current.header.Number()) { + utils.Logger().Info().Str("hash", tx.Hash().Hex()).Str("eip155Epoch", w.config.EIP155Epoch.String()).Msg("Ignoring reply protected transaction") + + txs.Pop() continue } + // Start executing the transaction + w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs)) - sender, flag := w.throttleTxs(selected, recentTxsStats, txsThrottleConfig, tx) - switch flag { - case shardingconfig.TxUnselect: - unselected = append(unselected, tx) - - case shardingconfig.TxInvalid: - invalid = append(invalid, tx) - - case shardingconfig.TxSelect: - if tx.GasPrice().Uint64() == 0 { - invalid = append(invalid, tx) - } else { - snap := w.current.state.Snapshot() - _, err := w.commitTransaction(tx, coinbase) - if err != nil { - w.current.state.RevertToSnapshot(snap) - invalid = append(invalid, tx) - utils.Logger().Error().Err(err).Str("txId", tx.Hash().Hex()).Msg("Commit transaction error") - } else { - selected = append(selected, tx) - // handle the case when msg was not able to extracted from tx - if len(sender.String()) > 0 { - recentTxsStats[newBlockNum][sender]++ - } - } - } + if tx.ShardID() != w.chain.ShardID() { + txs.Shift() + continue } - // log invalid or unselected txs - if flag == shardingconfig.TxUnselect || flag == shardingconfig.TxInvalid { - utils.Logger().Info().Str("txId", tx.Hash().Hex()).Str("txThrottleFlag", flag.String()).Msg("Transaction Throttle flag") + logs, err := w.commitTransaction(tx, coinbase) + switch err { + case core.ErrGasLimitReached: + // Pop the current out-of-gas transaction without shifting in the next from the account + utils.Logger().Info().Str("sender", address.ToBech32(from)).Msg("Gas limit exceeded for current block") + txs.Pop() + + case core.ErrNonceTooLow: + // New head notification data race between the transaction pool and miner, shift + utils.Logger().Info().Str("sender", address.ToBech32(from)).Uint64("nonce", tx.Nonce()).Msg("Skipping transaction with low nonce") + txs.Shift() + + case core.ErrNonceTooHigh: + // Reorg notification data race between the transaction pool and miner, skip account = + utils.Logger().Info().Str("sender", address.ToBech32(from)).Uint64("nonce", tx.Nonce()).Msg("Skipping account with high nonce") + txs.Pop() + + case nil: + // Everything ok, collect the logs and shift in the next transaction from the same account + coalescedLogs = append(coalescedLogs, logs...) + txs.Shift() + + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + utils.Logger().Info().Str("hash", tx.Hash().Hex()).AnErr("err", err).Msg("Transaction failed, account skipped") + txs.Shift() } - - utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("Transaction gas limit info") - } - - utils.Logger().Info().Uint64("newBlockNum", newBlockNum).Int("newTxns", len(selected)).Uint64("blockGasLimit", w.current.header.GasLimit()).Uint64("blockGasUsed", w.current.header.GasUsed()).Msg("Block gas limit and usage info") - - return selected, unselected, invalid -} - -// SelectStakingTransactionsForNewBlock selects staking transactions for new block. -func (w *Worker) SelectStakingTransactionsForNewBlock( - newBlockNum uint64, txs staking.StakingTransactions, - coinbase common.Address) (staking.StakingTransactions, staking.StakingTransactions, staking.StakingTransactions) { - - // only beaconchain process staking transaction - if w.chain.ShardID() != values.BeaconChainShardID { - utils.Logger().Warn().Msgf("Invalid shardID: %v", w.chain.ShardID()) - return nil, nil, nil } - if w.current.gasPool == nil { - w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) - } - - selected := staking.StakingTransactions{} - // TODO: chao add total gas fee checking when needed - unselected := staking.StakingTransactions{} - invalid := staking.StakingTransactions{} - for _, tx := range txs { - snap := w.current.state.Snapshot() - _, err := w.commitStakingTransaction(tx, coinbase) - if err != nil { - w.current.state.RevertToSnapshot(snap) - invalid = append(invalid, tx) - utils.Logger().Error().Err(err).Str("stakingTxId", tx.Hash().Hex()).Msg("Commit staking transaction error") - } else { - selected = append(selected, tx) - utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("StakingTransaction gas limit info") + // STAKING - only beaconchain process staking transaction + if w.chain.ShardID() == values.BeaconChainShardID { + for _, tx := range pendingStaking { + logs, err := w.commitStakingTransaction(tx, coinbase) + if err != nil { + utils.Logger().Error().Err(err).Str("stakingTxId", tx.Hash().Hex()).Msg("Commit staking transaction error") + } else { + coalescedLogs = append(coalescedLogs, logs...) + utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("StakingTransaction gas limit info") + } } } - utils.Logger().Info().Uint64("newBlockNum", newBlockNum).Uint64("blockGasLimit", - w.current.header.GasLimit()).Uint64("blockGasUsed", - w.current.header.GasUsed()).Msg("[SelectStakingTransaction] Block gas limit and usage info") + utils.Logger().Info().Int("newTxns", len(w.current.txs)).Uint64("blockGasLimit", w.current.header.GasLimit()).Uint64("blockGasUsed", w.current.header.GasUsed()).Msg("Block gas limit and usage info") - return selected, unselected, invalid + return nil } func (w *Worker) commitStakingTransaction(tx *staking.StakingTransaction, coinbase common.Address) ([]*types.Log, error) { @@ -238,40 +231,6 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres return receipt.Logs, nil } -// CommitTransactions commits transactions. -func (w *Worker) CommitTransactions(txs types.Transactions, stakingTxns staking.StakingTransactions, coinbase common.Address) error { - // Must update to the correct current state before processing potential txns - if err := w.UpdateCurrent(coinbase); err != nil { - utils.Logger().Error(). - Err(err). - Msg("Failed updating worker's state before committing txns") - return err - } - - if w.current.gasPool == nil { - w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) - } - for _, tx := range txs { - snap := w.current.state.Snapshot() - _, err := w.commitTransaction(tx, coinbase) - if err != nil { - w.current.state.RevertToSnapshot(snap) - return err - - } - } - for _, tx := range stakingTxns { - snap := w.current.state.Snapshot() - _, err := w.commitStakingTransaction(tx, coinbase) - if err != nil { - w.current.state.RevertToSnapshot(snap) - return err - - } - } - return nil -} - // CommitReceipts commits a list of already verified incoming cross shard receipts func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error { if w.current.gasPool == nil { @@ -322,6 +281,7 @@ func (w *Worker) makeCurrent(parent *types.Block, header *block.Header) error { return err } env := &environment{ + signer: types.NewEIP155Signer(w.config.ChainID), state: state, header: header, } diff --git a/node/worker/worker_test.go b/node/worker/worker_test.go index c52ee4cb5..3a0ef5fd5 100644 --- a/node/worker/worker_test.go +++ b/node/worker/worker_test.go @@ -5,6 +5,8 @@ import ( "math/rand" "testing" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" blockfactory "github.com/harmony-one/harmony/block/factory" @@ -74,7 +76,9 @@ func TestCommitTransactions(t *testing.T) { tx, _ := types.SignTx(types.NewTransaction(baseNonce, testBankAddress, uint32(0), big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey) // Commit the tx to the worker - err := worker.CommitTransactions(types.Transactions{tx}, nil, testBankAddress) + txs := make(map[common.Address]types.Transactions) + txs[testBankAddress] = types.Transactions{tx} + err := worker.CommitTransactions(txs, nil, testBankAddress) if err != nil { t.Error(err) } diff --git a/test/chain/main.go b/test/chain/main.go index 8a565ea69..cfd61ac6f 100644 --- a/test/chain/main.go +++ b/test/chain/main.go @@ -125,7 +125,11 @@ func fundFaucetContract(chain *core.BlockChain) { amount := 720000 tx, _ := types.SignTx(types.NewTransaction(nonce+uint64(4), StakingAddress, 0, big.NewInt(int64(amount)), params.TxGas, nil, nil), types.HomesteadSigner{}, FaucetPriKey) txs = append(txs, tx) - err := contractworker.CommitTransactions(txs, nil, testUserAddress) + + txmap := make(map[common.Address]types.Transactions) + txmap[FaucetAddress] = txs + + err := contractworker.CommitTransactions(txmap, nil, testUserAddress) if err != nil { fmt.Println(err) } @@ -163,7 +167,10 @@ func callFaucetContractToFundAnAddress(chain *core.BlockChain) { callEnc = append(callEnc, paddedAddress...) callfaucettx, _ := types.SignTx(types.NewTransaction(nonce+uint64(5), faucetContractAddress, 0, big.NewInt(0), params.TxGasContractCreation*10, nil, callEnc), types.HomesteadSigner{}, FaucetPriKey) - err = contractworker.CommitTransactions(types.Transactions{callfaucettx}, nil, testUserAddress) + txmap := make(map[common.Address]types.Transactions) + txmap[FaucetAddress] = types.Transactions{callfaucettx} + + err = contractworker.CommitTransactions(txmap, nil, testUserAddress) if err != nil { fmt.Println(err) } @@ -192,143 +199,6 @@ func playFaucetContract(chain *core.BlockChain) { callFaucetContractToFundAnAddress(chain) } -func playSetupStakingContract(chain *core.BlockChain) { - fmt.Println() - fmt.Println("--------- ************************** ---------") - fmt.Println() - fmt.Println("--------- Now Setting up Staking Contract ---------") - fmt.Println() - //worker := pkgworker.New(params.TestChainConfig, chain, consensus.NewFaker(), crypto.PubkeyToAddress(StakingPriKey.PublicKey), 0) - - state = contractworker.GetCurrentState() - fmt.Println("Before Staking Balances") - fmt.Println("user address balance") - fmt.Println(state.GetBalance(allRandomUserAddress[0])) - fmt.Println("The balances for 2 more users:") - fmt.Println(state.GetBalance(allRandomUserAddress[1])) - fmt.Println(state.GetBalance(allRandomUserAddress[2])) - - nonce = contractworker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(StakingPriKey.PublicKey)) - dataEnc = common.FromHex(StakingContractBinary) - stx, _ := types.SignTx(types.NewContractCreation(nonce, 0, big.NewInt(0), params.TxGasContractCreation*10, nil, dataEnc), types.HomesteadSigner{}, StakingPriKey) - stakingtxns = append(stakingtxns, stx) - - stakeContractAddress = crypto.CreateAddress(StakingAddress, nonce+uint64(0)) - - state = contractworker.GetCurrentState() - fmt.Println("stake contract balance :") - fmt.Println(state.GetBalance(stakeContractAddress)) - fmt.Println("stake address balance :") - fmt.Println(state.GetBalance(StakingAddress)) -} - -func playStaking(chain *core.BlockChain) { - depositFnSignature := []byte("deposit()") - - fnHash := hash.Keccak256(depositFnSignature) - methodID := fnHash[:4] - - var callEncl []byte - callEncl = append(callEncl, methodID...) - stake := 100000 - fmt.Println() - fmt.Println("--------- Staking Contract added to txns ---------") - fmt.Println() - fmt.Printf("-- Now Staking with stake: %d --\n", stake) - fmt.Println() - for i := 0; i <= 2; i++ { - //Deposit Does not take a argument, stake is transferred via amount. - tx, _ := types.SignTx(types.NewTransaction(0, stakeContractAddress, 0, big.NewInt(int64(stake)), params.TxGas*5, nil, callEncl), types.HomesteadSigner{}, allRandomUserKey[i]) - stakingtxns = append(stakingtxns, tx) - } - err = contractworker.CommitTransactions(stakingtxns, nil, common.Address{}) - - if err != nil { - fmt.Println(err) - } - block, _ := contractworker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil) - _, err = chain.InsertChain(types.Blocks{block}, true /* verifyHeaders */) - if err != nil { - fmt.Println(err) - } - // receipts := contractworker.GetCurrentReceipts() - // fmt.Println(receipts[len(receipts)-4].ContractAddress) - state = contractworker.GetCurrentState() - - fmt.Printf("After Staking Balances (should be less by %d)\n", stake) - fmt.Println("user address balance") - fmt.Println(state.GetBalance(allRandomUserAddress[0])) - fmt.Println("The balances for 2 more users:") - fmt.Println(state.GetBalance(allRandomUserAddress[1])) - fmt.Println(state.GetBalance(allRandomUserAddress[2])) - fmt.Println("faucet contract balance (unchanged):") - fmt.Println(state.GetBalance(faucetContractAddress)) - fmt.Println("stake contract balance :") - fmt.Println(state.GetBalance(stakeContractAddress)) - fmt.Println("stake address balance :") - fmt.Println(state.GetBalance(StakingAddress)) -} - -func playWithdrawStaking(chain *core.BlockChain) { - fmt.Println() - fmt.Println("--------- Now Setting up Withdrawing Stakes ---------") - - withdrawFnSignature := []byte("withdraw(uint256)") - - fnHash := hash.Keccak256(withdrawFnSignature) - methodID := fnHash[:4] - - withdraw := "5000" - withdrawstake := new(big.Int) - withdrawstake.SetString(withdraw, 10) - paddedAmount := common.LeftPadBytes(withdrawstake.Bytes(), 32) - - var dataEncl []byte - dataEncl = append(dataEncl, methodID...) - dataEncl = append(dataEncl, paddedAmount...) - - var withdrawstakingtxns []*types.Transaction - - fmt.Println() - fmt.Printf("-- Withdrawing Stake by amount: %s --\n", withdraw) - fmt.Println() - - for i := 0; i <= 2; i++ { - cnonce := contractworker.GetCurrentState().GetNonce(allRandomUserAddress[i]) - tx, _ := types.SignTx(types.NewTransaction(cnonce, stakeContractAddress, 0, big.NewInt(0), params.TxGas*5, nil, dataEncl), types.HomesteadSigner{}, allRandomUserKey[i]) - withdrawstakingtxns = append(withdrawstakingtxns, tx) - } - - err = contractworker.CommitTransactions(withdrawstakingtxns, nil, common.Address{}) - if err != nil { - fmt.Println("error:") - fmt.Println(err) - } - - block, _ := contractworker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil) - _, err = chain.InsertChain(types.Blocks{block}, true /* verifyHeaders */) - if err != nil { - fmt.Println(err) - } - state = contractworker.GetCurrentState() - fmt.Printf("Withdraw Staking Balances (should be up by %s)\n", withdraw) - fmt.Println(state.GetBalance(allRandomUserAddress[0])) - fmt.Println(state.GetBalance(allRandomUserAddress[1])) - fmt.Println(state.GetBalance(allRandomUserAddress[2])) - fmt.Println("faucet contract balance (unchanged):") - fmt.Println(state.GetBalance(faucetContractAddress)) - fmt.Printf("stake contract balance (should downup by %s)\n", withdraw) - fmt.Println(state.GetBalance(stakeContractAddress)) - fmt.Println("stake address balance :") - fmt.Println(state.GetBalance(StakingAddress)) -} - -func playStakingContract(chain *core.BlockChain) { - playSetupStakingContract(chain) - playStaking(chain) - playWithdrawStaking(chain) -} - func main() { genesis := gspec.MustCommit(database) chain, _ := core.NewBlockChain(database, nil, gspec.Config, chain.Engine(), vm.Config{}, nil) @@ -356,5 +226,4 @@ func main() { } playFaucetContract(chain) - playStakingContract(chain) } From 2c9f2135cd173a7cc3a4da327cb61dbc4a2c2c19 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 5 Nov 2019 19:19:06 -0800 Subject: [PATCH 2/7] Reset tx pool when there is a new block --- core/blockchain.go | 2 +- core/tx_pool.go | 27 ++++++++++++++++++++------- node/node_newblock.go | 3 +++ 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 7d4af6818..c09e14785 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1155,7 +1155,6 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. // After insertion is done, all accumulated events will be fired. func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) { n, events, logs, err := bc.insertChain(chain, verifyHeaders) - bc.PostChainEvents(events, logs) if err == nil { for idx, block := range chain { header := block.Header() @@ -1197,6 +1196,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, } } + bc.PostChainEvents(events, logs) return n, err } diff --git a/core/tx_pool.go b/core/tx_pool.go index c1da5ca2e..57928aa6a 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -292,7 +292,7 @@ func (pool *TxPool) loop() { defer journal.Stop() // Track the previous head headers for transaction reorgs - //head := pool.chain.CurrentBlock() + head := pool.chain.CurrentBlock() // Keep waiting for and reacting to the various events for { @@ -301,12 +301,25 @@ func (pool *TxPool) loop() { case ev := <-pool.chainHeadCh: if ev.Block != nil { pool.mu.Lock() - //if pool.chainconfig.IsHomestead(ev.Block.Number()) { - // pool.homestead = true - //} - //pool.reset(head.Header(), ev.Block.Header()) - //head = ev.Block - + if pool.chainconfig.IsS3(ev.Block.Epoch()) { + pool.homestead = true + } + pool.reset(head.Header(), ev.Block.Header()) + head = ev.Block + + // DEBUG- + pending, queued := pool.stats() + stales := pool.priced.stales + + if pending != prevPending || queued != prevQueued || stales != prevStales { + utils.Logger().Debug(). + Int("executable", pending). + Int("queued", queued). + Int("stales", stales). + Msg("Transaction pool status report") + prevPending, prevQueued, prevStales = pending, queued, stales + } + // - pool.mu.Unlock() } // Be unsubscribed due to system stopped diff --git a/node/node_newblock.go b/node/node_newblock.go index 5991d80e3..4150ce208 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -82,6 +82,7 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { // Update worker's current header and state data in preparation to propose/process new transactions coinbase := node.Consensus.SelfAddress + utils.Logger().Info().Msg("11111111111") // Prepare transactions including staking transactions\ pending, err := node.TxPool.Pending() if err != nil { @@ -89,6 +90,7 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { return nil, err } + utils.Logger().Info().Msg("222222222222") // TODO: integrate staking transaction into tx pool pendingStakingTransactions := types2.StakingTransactions{} for _, tx := range node.pendingStakingTransactions { @@ -102,6 +104,7 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { return nil, err } + utils.Logger().Info().Msg("33333333333333") // Prepare cross shard transaction receipts receiptsList := node.proposeReceiptsProof() if len(receiptsList) != 0 { From 6dbfcf1d65e6a3c67be5e9552e3bad2c42919b10 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 5 Nov 2019 23:10:27 -0800 Subject: [PATCH 3/7] remove debug code --- core/tx_pool.go | 14 -------------- node/node_newblock.go | 4 +--- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 57928aa6a..acecac9cf 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -306,20 +306,6 @@ func (pool *TxPool) loop() { } pool.reset(head.Header(), ev.Block.Header()) head = ev.Block - - // DEBUG- - pending, queued := pool.stats() - stales := pool.priced.stales - - if pending != prevPending || queued != prevQueued || stales != prevStales { - utils.Logger().Debug(). - Int("executable", pending). - Int("queued", queued). - Int("stales", stales). - Msg("Transaction pool status report") - prevPending, prevQueued, prevStales = pending, queued, stales - } - // - pool.mu.Unlock() } // Be unsubscribed due to system stopped diff --git a/node/node_newblock.go b/node/node_newblock.go index 4150ce208..bccdef28a 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -82,7 +82,6 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { // Update worker's current header and state data in preparation to propose/process new transactions coinbase := node.Consensus.SelfAddress - utils.Logger().Info().Msg("11111111111") // Prepare transactions including staking transactions\ pending, err := node.TxPool.Pending() if err != nil { @@ -90,13 +89,13 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { return nil, err } - utils.Logger().Info().Msg("222222222222") // TODO: integrate staking transaction into tx pool pendingStakingTransactions := types2.StakingTransactions{} for _, tx := range node.pendingStakingTransactions { pendingStakingTransactions = append(pendingStakingTransactions, tx) } + node.Worker.UpdateCurrent(coinbase) if err := node.Worker.CommitTransactions(pending, pendingStakingTransactions, coinbase); err != nil { ctxerror.Log15(utils.GetLogger().Error, ctxerror.New("cannot commit transactions"). @@ -104,7 +103,6 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { return nil, err } - utils.Logger().Info().Msg("33333333333333") // Prepare cross shard transaction receipts receiptsList := node.proposeReceiptsProof() if len(receiptsList) != 0 { From a41a4a121665712d25c337be9effff5e5b1bf94d Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Wed, 6 Nov 2019 17:05:38 -0800 Subject: [PATCH 4/7] Fix build issue --- go.mod | 31 ++++++++++++++++++++++--------- node/worker/worker.go | 9 +++++---- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 016cfd98b..65efc47dd 100644 --- a/go.mod +++ b/go.mod @@ -4,29 +4,33 @@ go 1.12 require ( github.com/Workiva/go-datastructures v1.0.50 - github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect - github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect + github.com/allegro/bigcache v1.2.1 // indirect + github.com/aristanetworks/goarista v0.0.0-20190607111240-52c2a7864a08 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/cespare/cp v1.1.1 github.com/davecgh/go-spew v1.1.1 github.com/deckarep/golang-set v1.7.1 + github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/ethereum/go-ethereum v1.8.27 github.com/fatih/color v1.7.0 github.com/fjl/memsize v0.0.0-20180929194037-2a09253e352a + github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c // indirect github.com/golang/mock v1.3.1 - github.com/golang/protobuf v1.3.2 + github.com/golang/protobuf v1.3.1 github.com/golangci/golangci-lint v1.17.1 github.com/gorilla/handlers v1.4.0 github.com/gorilla/mux v1.7.2 github.com/harmony-ek/gencodec v0.0.0-20190215044613-e6740dbdd846 github.com/harmony-one/bls v0.0.5 - github.com/harmony-one/go-sdk v0.0.0-20191105223634-37bdcedd4a9d github.com/harmony-one/taggedrlp v0.1.2 - github.com/harmony-one/vdf v1.0.0 - github.com/hashicorp/golang-lru v0.5.3 + github.com/harmony-one/vdf v0.0.0-20190924175951-620379da8849 + github.com/hashicorp/golang-lru v0.5.1 github.com/iancoleman/strcase v0.0.0-20190422225806-e506e3ef7365 github.com/ipfs/go-ds-badger v0.0.5 github.com/ipfs/go-log v0.0.1 + github.com/karalabe/hid v1.0.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/libp2p/go-libp2p v0.3.1 github.com/libp2p/go-libp2p-core v0.2.2 github.com/libp2p/go-libp2p-crypto v0.1.0 @@ -42,18 +46,27 @@ require ( github.com/natefinch/lumberjack v2.0.0+incompatible github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.8.1 - github.com/prometheus/client_golang v1.1.0 + github.com/prometheus/client_golang v0.9.2 + github.com/prometheus/common v0.4.1 // indirect + github.com/prometheus/procfs v0.0.3 // indirect github.com/rjeczalik/notify v0.9.2 + github.com/rs/cors v1.7.0 // indirect github.com/rs/zerolog v1.14.3 github.com/shirou/gopsutil v2.18.12+incompatible + github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect github.com/spf13/cobra v0.0.5 github.com/stretchr/testify v1.3.0 github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc - golang.org/x/crypto v0.0.0-20190909091759-094676da4a83 + golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 golang.org/x/lint v0.0.0-20190409202823-959b441ac422 + golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 // indirect golang.org/x/tools v0.0.0-20190924052046-3ac2a5bbd98a - google.golang.org/grpc v1.23.1 + google.golang.org/appengine v1.4.0 // indirect + google.golang.org/grpc v1.22.0 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 gopkg.in/ini.v1 v1.42.0 + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect + gopkg.in/urfave/cli.v1 v1.20.0 // indirect ) diff --git a/node/worker/worker.go b/node/worker/worker.go index 35a713bd9..abc7b3a93 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -5,7 +5,7 @@ import ( "math/big" "time" - "github.com/harmony-one/go-sdk/pkg/address" + common2 "github.com/harmony-one/harmony/internal/common" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" @@ -141,20 +141,21 @@ func (w *Worker) CommitTransactions(pendingNormal map[common.Address]types.Trans } logs, err := w.commitTransaction(tx, coinbase) + sender, _ := common2.AddressToBech32(from) switch err { case core.ErrGasLimitReached: // Pop the current out-of-gas transaction without shifting in the next from the account - utils.Logger().Info().Str("sender", address.ToBech32(from)).Msg("Gas limit exceeded for current block") + utils.Logger().Info().Str("sender", sender).Msg("Gas limit exceeded for current block") txs.Pop() case core.ErrNonceTooLow: // New head notification data race between the transaction pool and miner, shift - utils.Logger().Info().Str("sender", address.ToBech32(from)).Uint64("nonce", tx.Nonce()).Msg("Skipping transaction with low nonce") + utils.Logger().Info().Str("sender", sender).Uint64("nonce", tx.Nonce()).Msg("Skipping transaction with low nonce") txs.Shift() case core.ErrNonceTooHigh: // Reorg notification data race between the transaction pool and miner, skip account = - utils.Logger().Info().Str("sender", address.ToBech32(from)).Uint64("nonce", tx.Nonce()).Msg("Skipping account with high nonce") + utils.Logger().Info().Str("sender", sender).Uint64("nonce", tx.Nonce()).Msg("Skipping account with high nonce") txs.Pop() case nil: From 41d2c4a6b2c946b33d832996b85e82494fe2349d Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Fri, 8 Nov 2019 09:52:59 -0800 Subject: [PATCH 5/7] Fix typo --- node/node_newblock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/node_newblock.go b/node/node_newblock.go index bccdef28a..cba6460df 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -82,7 +82,7 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { // Update worker's current header and state data in preparation to propose/process new transactions coinbase := node.Consensus.SelfAddress - // Prepare transactions including staking transactions\ + // Prepare transactions including staking transactions pending, err := node.TxPool.Pending() if err != nil { utils.Logger().Err(err).Msg("Failed to fetch pending transactions") From 77623dcaee5d33df50c318a7193da4669fec1ccd Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Fri, 8 Nov 2019 10:10:20 -0800 Subject: [PATCH 6/7] Remove Tx Throttling logic --- internal/configs/sharding/fixedschedule.go | 40 ------------ internal/configs/sharding/localnet.go | 46 -------------- internal/configs/sharding/mainnet.go | 46 -------------- internal/configs/sharding/pangaea.go | 42 ------------- internal/configs/sharding/shardingconfig.go | 67 --------------------- internal/configs/sharding/testnet.go | 46 -------------- node/node.go | 4 +- node/worker/worker.go | 43 ------------- 8 files changed, 1 insertion(+), 333 deletions(-) diff --git a/internal/configs/sharding/fixedschedule.go b/internal/configs/sharding/fixedschedule.go index 59fc1f92a..1fd9cd71e 100644 --- a/internal/configs/sharding/fixedschedule.go +++ b/internal/configs/sharding/fixedschedule.go @@ -2,9 +2,6 @@ package shardingconfig import ( "math/big" - "time" - - "github.com/harmony-one/harmony/common/denominations" ) const ( @@ -55,43 +52,6 @@ func (s fixedSchedule) RandomnessStartingEpoch() uint64 { return mainnetRandomnessStartingEpoch } -func (s fixedSchedule) MaxTxAmountLimit() *big.Int { - amountBigInt := big.NewInt(mainnetMaxTxAmountLimit) - amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One)) - return amountBigInt -} - -func (s fixedSchedule) MaxNumRecentTxsPerAccountLimit() uint64 { - return mainnetMaxNumRecentTxsPerAccountLimit -} - -func (s fixedSchedule) MaxTxPoolSizeLimit() int { - return mainnetMaxTxPoolSizeLimit -} - -func (s fixedSchedule) MaxNumTxsPerBlockLimit() int { - return mainnetMaxNumTxsPerBlockLimit -} - -func (s fixedSchedule) RecentTxDuration() time.Duration { - return mainnetRecentTxDuration -} - -func (s fixedSchedule) EnableTxnThrottling() bool { - return mainnetEnableTxnThrottling -} - -func (s fixedSchedule) TxsThrottleConfig() *TxsThrottleConfig { - return &TxsThrottleConfig{ - MaxTxAmountLimit: s.MaxTxAmountLimit(), - MaxNumRecentTxsPerAccountLimit: s.MaxNumRecentTxsPerAccountLimit(), - MaxTxPoolSizeLimit: s.MaxTxPoolSizeLimit(), - MaxNumTxsPerBlockLimit: s.MaxNumTxsPerBlockLimit(), - RecentTxDuration: s.RecentTxDuration(), - EnableTxnThrottling: s.EnableTxnThrottling(), - } -} - func (s fixedSchedule) GetNetworkID() NetworkID { return DevNet } diff --git a/internal/configs/sharding/localnet.go b/internal/configs/sharding/localnet.go index 2a0f0b302..47130a686 100644 --- a/internal/configs/sharding/localnet.go +++ b/internal/configs/sharding/localnet.go @@ -3,9 +3,7 @@ package shardingconfig import ( "fmt" "math/big" - "time" - "github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/internal/genesis" ) @@ -26,13 +24,6 @@ const ( localnetConsensusRatio = float64(0.1) localnetRandomnessStartingEpoch = 0 - - localnetMaxTxAmountLimit = 1e3 // unit is in One - localnetMaxNumRecentTxsPerAccountLimit = 1e2 - localnetMaxTxPoolSizeLimit = 8000 - localnetMaxNumTxsPerBlockLimit = 1000 - localnetRecentTxDuration = time.Hour - localnetEnableTxnThrottling = false ) func (localnetSchedule) InstanceForEpoch(epoch *big.Int) Instance { @@ -97,43 +88,6 @@ func (ls localnetSchedule) RandomnessStartingEpoch() uint64 { return localnetRandomnessStartingEpoch } -func (ls localnetSchedule) MaxTxAmountLimit() *big.Int { - amountBigInt := big.NewInt(localnetMaxTxAmountLimit) - amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One)) - return amountBigInt -} - -func (ls localnetSchedule) MaxNumRecentTxsPerAccountLimit() uint64 { - return localnetMaxNumRecentTxsPerAccountLimit -} - -func (ls localnetSchedule) MaxTxPoolSizeLimit() int { - return localnetMaxTxPoolSizeLimit -} - -func (ls localnetSchedule) MaxNumTxsPerBlockLimit() int { - return localnetMaxNumTxsPerBlockLimit -} - -func (ls localnetSchedule) RecentTxDuration() time.Duration { - return localnetRecentTxDuration -} - -func (ls localnetSchedule) EnableTxnThrottling() bool { - return localnetEnableTxnThrottling -} - -func (ls localnetSchedule) TxsThrottleConfig() *TxsThrottleConfig { - return &TxsThrottleConfig{ - MaxTxAmountLimit: ls.MaxTxAmountLimit(), - MaxNumRecentTxsPerAccountLimit: ls.MaxNumRecentTxsPerAccountLimit(), - MaxTxPoolSizeLimit: ls.MaxTxPoolSizeLimit(), - MaxNumTxsPerBlockLimit: ls.MaxNumTxsPerBlockLimit(), - RecentTxDuration: ls.RecentTxDuration(), - EnableTxnThrottling: ls.EnableTxnThrottling(), - } -} - func (ls localnetSchedule) GetNetworkID() NetworkID { return LocalNet } diff --git a/internal/configs/sharding/mainnet.go b/internal/configs/sharding/mainnet.go index efaeb963e..2b5c22a5e 100644 --- a/internal/configs/sharding/mainnet.go +++ b/internal/configs/sharding/mainnet.go @@ -2,9 +2,7 @@ package shardingconfig import ( "math/big" - "time" - "github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/internal/genesis" ) @@ -29,13 +27,6 @@ const ( mainnetV1_4Epoch = 46 mainnetV1_5Epoch = 54 - mainnetMaxTxAmountLimit = 1e3 // unit is interface{} One - mainnetMaxNumRecentTxsPerAccountLimit = 1e2 - mainnetMaxTxPoolSizeLimit = 8000 - mainnetMaxNumTxsPerBlockLimit = 1000 - mainnetRecentTxDuration = time.Hour - mainnetEnableTxnThrottling = false - // MainNetHTTPPattern is the http pattern for mainnet. MainNetHTTPPattern = "https://api.s%d.t.hmny.io" // MainNetWSPattern is the websocket pattern for mainnet. @@ -135,43 +126,6 @@ func (ms mainnetSchedule) RandomnessStartingEpoch() uint64 { return mainnetRandomnessStartingEpoch } -func (ms mainnetSchedule) MaxTxAmountLimit() *big.Int { - amountBigInt := big.NewInt(mainnetMaxTxAmountLimit) - amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One)) - return amountBigInt -} - -func (ms mainnetSchedule) MaxNumRecentTxsPerAccountLimit() uint64 { - return mainnetMaxNumRecentTxsPerAccountLimit -} - -func (ms mainnetSchedule) MaxTxPoolSizeLimit() int { - return mainnetMaxTxPoolSizeLimit -} - -func (ms mainnetSchedule) MaxNumTxsPerBlockLimit() int { - return mainnetMaxNumTxsPerBlockLimit -} - -func (ms mainnetSchedule) RecentTxDuration() time.Duration { - return mainnetRecentTxDuration -} - -func (ms mainnetSchedule) EnableTxnThrottling() bool { - return mainnetEnableTxnThrottling -} - -func (ms mainnetSchedule) TxsThrottleConfig() *TxsThrottleConfig { - return &TxsThrottleConfig{ - MaxTxAmountLimit: ms.MaxTxAmountLimit(), - MaxNumRecentTxsPerAccountLimit: ms.MaxNumRecentTxsPerAccountLimit(), - MaxTxPoolSizeLimit: ms.MaxTxPoolSizeLimit(), - MaxNumTxsPerBlockLimit: ms.MaxNumTxsPerBlockLimit(), - RecentTxDuration: ms.RecentTxDuration(), - EnableTxnThrottling: ms.EnableTxnThrottling(), - } -} - func (ms mainnetSchedule) GetNetworkID() NetworkID { return MainNet } diff --git a/internal/configs/sharding/pangaea.go b/internal/configs/sharding/pangaea.go index 3b14e9065..0cf66358a 100644 --- a/internal/configs/sharding/pangaea.go +++ b/internal/configs/sharding/pangaea.go @@ -2,11 +2,8 @@ package shardingconfig import ( "math/big" - "time" "github.com/ethereum/go-ethereum/common" - "github.com/harmony-one/harmony/common/denominations" - "github.com/harmony-one/harmony/internal/genesis" ) @@ -15,8 +12,6 @@ const ( PangaeaHTTPPattern = "https://api.s%d.pga.hmny.io" // PangaeaWSPattern is the websocket pattern for pangaea. PangaeaWSPattern = "wss://ws.s%d.pga.hmny.io" - // transaction throttling disabled on pangaea network - pangaeaEnableTxnThrottling = false ) // PangaeaSchedule is the Pangaea sharding configuration schedule. @@ -64,43 +59,6 @@ func (ps pangaeaSchedule) RandomnessStartingEpoch() uint64 { return mainnetRandomnessStartingEpoch } -func (ps pangaeaSchedule) MaxTxAmountLimit() *big.Int { - amountBigInt := big.NewInt(mainnetMaxTxAmountLimit) - amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One)) - return amountBigInt -} - -func (ps pangaeaSchedule) MaxNumRecentTxsPerAccountLimit() uint64 { - return mainnetMaxNumRecentTxsPerAccountLimit -} - -func (ps pangaeaSchedule) MaxTxPoolSizeLimit() int { - return mainnetMaxTxPoolSizeLimit -} - -func (ps pangaeaSchedule) MaxNumTxsPerBlockLimit() int { - return mainnetMaxNumTxsPerBlockLimit -} - -func (ps pangaeaSchedule) RecentTxDuration() time.Duration { - return mainnetRecentTxDuration -} - -func (ps pangaeaSchedule) EnableTxnThrottling() bool { - return pangaeaEnableTxnThrottling -} - -func (ps pangaeaSchedule) TxsThrottleConfig() *TxsThrottleConfig { - return &TxsThrottleConfig{ - MaxTxAmountLimit: ps.MaxTxAmountLimit(), - MaxNumRecentTxsPerAccountLimit: ps.MaxNumRecentTxsPerAccountLimit(), - MaxTxPoolSizeLimit: ps.MaxTxPoolSizeLimit(), - MaxNumTxsPerBlockLimit: ps.MaxNumTxsPerBlockLimit(), - RecentTxDuration: ps.RecentTxDuration(), - EnableTxnThrottling: ps.EnableTxnThrottling(), - } -} - func (pangaeaSchedule) GetNetworkID() NetworkID { return Pangaea } diff --git a/internal/configs/sharding/shardingconfig.go b/internal/configs/sharding/shardingconfig.go index e53d8ba2b..d46c9eaeb 100644 --- a/internal/configs/sharding/shardingconfig.go +++ b/internal/configs/sharding/shardingconfig.go @@ -5,7 +5,6 @@ package shardingconfig import ( "fmt" "math/big" - "time" "github.com/harmony-one/harmony/internal/genesis" ) @@ -37,27 +36,6 @@ type Schedule interface { //RandomnessStartingEpoch returns starting epoch of randonness generation RandomnessStartingEpoch() uint64 - // Max amount limit for a valid transaction - MaxTxAmountLimit() *big.Int - - // Max number of transactions of a particular account per block level - MaxNumRecentTxsPerAccountLimit() uint64 - - // Max total number of transactions allowed as pending transactions in transaction pool - MaxTxPoolSizeLimit() int - - // Max total number of transactions allowed to be processed per block - MaxNumTxsPerBlockLimit() int - - // How long "recent" means for transaction in time Duration unit - RecentTxDuration() time.Duration - - // EnableTxnThrottling is the switch for transaction throttling - EnableTxnThrottling() bool - - // configuration for throttling pending transactions - TxsThrottleConfig() *TxsThrottleConfig - // GetNetworkID() return networkID type. GetNetworkID() NetworkID @@ -90,51 +68,6 @@ type Instance interface { ReshardingEpoch() []*big.Int } -// TxThrottleFlag is the throttling flag for each transaction -// Refer below enum declaration for more context. -type TxThrottleFlag int - -// TxThrottleFlag is determined per transaction -// during the new block proposal and pending transactions throttling -const ( - TxSelect TxThrottleFlag = iota - TxUnselect - TxInvalid -) - -func (result TxThrottleFlag) String() string { - switch result { - case TxSelect: - return "TxSelect" - case TxUnselect: - return "TxUnselect" - case TxInvalid: - return "TxInvalid" - } - return "TxThrottleUnknown" -} - -// TxsThrottleConfig contains configuration for throttling pending transactions per node block -type TxsThrottleConfig struct { - // Max amount limit for a valid transaction - MaxTxAmountLimit *big.Int - - // Max number of transactions of a particular account for the past hour - RecentTxDuration time.Duration - - // Max number of transactions of a particular account for the past hour - MaxNumRecentTxsPerAccountLimit uint64 - - // Max total number of transactions allowed as pending transactions in transaction pool - MaxTxPoolSizeLimit int - - // Max total number of transactions allowed to be processed per block - MaxNumTxsPerBlockLimit int - - // EnableTxnThrottling is the switch for transaction throttling - EnableTxnThrottling bool -} - // genShardingStructure return sharding structure, given shard number and its patterns. func genShardingStructure(shardNum, shardID int, httpPattern, wsPattern string) []map[string]interface{} { res := []map[string]interface{}{} diff --git a/internal/configs/sharding/testnet.go b/internal/configs/sharding/testnet.go index 2778d8eea..9e069bb70 100644 --- a/internal/configs/sharding/testnet.go +++ b/internal/configs/sharding/testnet.go @@ -2,9 +2,7 @@ package shardingconfig import ( "math/big" - "time" - "github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/internal/genesis" ) @@ -20,13 +18,6 @@ const ( testnetVdfDifficulty = 10000 // This takes about 20s to finish the vdf - testnetMaxTxAmountLimit = 1e3 // unit is in One - testnetMaxNumRecentTxsPerAccountLimit = 1e2 - testnetMaxTxPoolSizeLimit = 8000 - testnetMaxNumTxsPerBlockLimit = 1000 - testnetRecentTxDuration = time.Hour - testnetEnableTxnThrottling = false - // TestNetHTTPPattern is the http pattern for testnet. TestNetHTTPPattern = "https://api.s%d.b.hmny.io" // TestNetWSPattern is the websocket pattern for testnet. @@ -72,43 +63,6 @@ func (ts testnetSchedule) RandomnessStartingEpoch() uint64 { return mainnetRandomnessStartingEpoch } -func (ts testnetSchedule) MaxTxAmountLimit() *big.Int { - amountBigInt := big.NewInt(testnetMaxTxAmountLimit) - amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One)) - return amountBigInt -} - -func (ts testnetSchedule) MaxNumRecentTxsPerAccountLimit() uint64 { - return testnetMaxNumRecentTxsPerAccountLimit -} - -func (ts testnetSchedule) MaxTxPoolSizeLimit() int { - return testnetMaxTxPoolSizeLimit -} - -func (ts testnetSchedule) MaxNumTxsPerBlockLimit() int { - return testnetMaxNumTxsPerBlockLimit -} - -func (ts testnetSchedule) RecentTxDuration() time.Duration { - return testnetRecentTxDuration -} - -func (ts testnetSchedule) EnableTxnThrottling() bool { - return testnetEnableTxnThrottling -} - -func (ts testnetSchedule) TxsThrottleConfig() *TxsThrottleConfig { - return &TxsThrottleConfig{ - MaxTxAmountLimit: ts.MaxTxAmountLimit(), - MaxNumRecentTxsPerAccountLimit: ts.MaxNumRecentTxsPerAccountLimit(), - MaxTxPoolSizeLimit: ts.MaxTxPoolSizeLimit(), - MaxNumTxsPerBlockLimit: ts.MaxNumTxsPerBlockLimit(), - RecentTxDuration: ts.RecentTxDuration(), - EnableTxnThrottling: ts.EnableTxnThrottling(), - } -} - func (ts testnetSchedule) GetNetworkID() NetworkID { return TestNet } diff --git a/node/node.go b/node/node.go index 6a96588ba..bc3326ec1 100644 --- a/node/node.go +++ b/node/node.go @@ -51,8 +51,6 @@ const ( ) const ( - // TxPoolLimit is the limit of transaction pool. - TxPoolLimit = 20000 // NumTryBroadCast is the number of times trying to broadcast NumTryBroadCast = 3 // ClientRxQueueSize is the number of client messages to queue before tail-dropping. @@ -289,7 +287,7 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) { // Add new staking transactions to the pending staking transaction list. func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) { - txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit() + txPoolLimit := 1000 // TODO: incorporate staking txn into TxPool node.pendingStakingTxMutex.Lock() for _, tx := range newStakingTxs { if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok { diff --git a/node/worker/worker.go b/node/worker/worker.go index abc7b3a93..05be5c2f0 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -17,7 +17,6 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/values" "github.com/harmony-one/harmony/core/vm" - shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/internal/utils" @@ -54,48 +53,6 @@ type Worker struct { gasCeil uint64 } -// Returns a tuple where the first value is the txs sender account address, -// the second is the throttling result enum for the transaction of interest. -// Throttling happens based on the amount, frequency, etc. -func (w *Worker) throttleTxs(selected types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, tx *types.Transaction) (common.Address, shardingconfig.TxThrottleFlag) { - var sender common.Address - msg, err := tx.AsMessage(types.MakeSigner(w.config, w.chain.CurrentBlock().Epoch())) - if err != nil { - utils.Logger().Error().Err(err).Str("txId", tx.Hash().Hex()).Msg("Error when parsing tx into message") - } else { - sender = msg.From() - } - - // do not throttle transactions if disabled - if !txsThrottleConfig.EnableTxnThrottling { - return sender, shardingconfig.TxSelect - } - - // already selected max num txs - if len(selected) > txsThrottleConfig.MaxNumTxsPerBlockLimit { - utils.Logger().Info().Str("txId", tx.Hash().Hex()).Int("MaxNumTxsPerBlockLimit", txsThrottleConfig.MaxNumTxsPerBlockLimit).Msg("Throttling tx with max num txs per block limit") - return sender, shardingconfig.TxUnselect - } - - // throttle a single sender sending too many transactions in one block - if tx.Value().Cmp(txsThrottleConfig.MaxTxAmountLimit) > 0 { - utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("MaxTxAmountLimit", txsThrottleConfig.MaxTxAmountLimit.Uint64()).Uint64("txAmount", tx.Value().Uint64()).Msg("Throttling tx with max amount limit") - return sender, shardingconfig.TxInvalid - } - - // throttle too large transaction - var numTxsPastHour uint64 - for _, blockTxsCounts := range recentTxsStats { - numTxsPastHour += blockTxsCounts[sender] - } - if numTxsPastHour >= txsThrottleConfig.MaxNumRecentTxsPerAccountLimit { - utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("MaxNumRecentTxsPerAccountLimit", txsThrottleConfig.MaxNumRecentTxsPerAccountLimit).Msg("Throttling tx with max txs per account in a single block limit") - return sender, shardingconfig.TxInvalid - } - - return sender, shardingconfig.TxSelect -} - // CommitTransactions commits transactions for new block. func (w *Worker) CommitTransactions(pendingNormal map[common.Address]types.Transactions, pendingStaking staking.StakingTransactions, coinbase common.Address) error { From 40f099d56b0e8630256e2db1a884efde3fbe3b95 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Fri, 8 Nov 2019 10:15:13 -0800 Subject: [PATCH 7/7] Add comment for PostChainEvents --- core/blockchain.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/blockchain.go b/core/blockchain.go index c09e14785..a53353c8e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1156,6 +1156,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) { n, events, logs, err := bc.insertChain(chain, verifyHeaders) if err == nil { + // TODO: incorporate these into insertChain for idx, block := range chain { header := block.Header() header.Logger(utils.Logger()).Info(). @@ -1196,6 +1197,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, } } + // This should be done after everything about adding a block is done. bc.PostChainEvents(events, logs) return n, err }