From abd670639bff2615c08c560a7d9cdea1001bba76 Mon Sep 17 00:00:00 2001 From: Dennis Won Date: Mon, 12 Aug 2019 22:45:12 -0700 Subject: [PATCH] wallet stress_test_script v.0 for stress testing token transfer --- cmd/client/wallet/main.go | 7 +- cmd/client/wallet_stress_test/README.md | 1 + .../generated_wallet.ini.go | 60 +++ cmd/client/wallet_stress_test/main.go | 437 ++++++++++++++++++ internal/configs/sharding/fixedschedule.go | 6 + internal/configs/sharding/localnet.go | 7 + internal/configs/sharding/mainnet.go | 9 +- internal/configs/sharding/shardingconfig.go | 7 + internal/configs/sharding/testnet.go | 9 +- node/node.go | 8 +- node/worker/worker.go | 1 + scripts/go_executable_build.sh | 1 + 12 files changed, 547 insertions(+), 6 deletions(-) create mode 100644 cmd/client/wallet_stress_test/README.md create mode 100644 cmd/client/wallet_stress_test/generated_wallet.ini.go create mode 100644 cmd/client/wallet_stress_test/main.go diff --git a/cmd/client/wallet/main.go b/cmd/client/wallet/main.go index 3c86ffd2f..c21e4abaf 100644 --- a/cmd/client/wallet/main.go +++ b/cmd/client/wallet/main.go @@ -91,6 +91,7 @@ var ( transferSenderPtr = transferCommand.String("from", "0", "Specify the sender account address or index") transferReceiverPtr = transferCommand.String("to", "", "Specify the receiver account") transferAmountPtr = transferCommand.Float64("amount", 0, "Specify the amount to transfer") + transferGasPricePtr = transferCommand.Uint64("gasPrice", 0, "Specify the gas price amount. Unit is Nano.") transferShardIDPtr = transferCommand.Int("shardID", 0, "Specify the shard ID for the transfer") transferInputDataPtr = transferCommand.String("inputData", "", "Base64-encoded input data to embed in the transaction") transferSenderPassPtr = transferCommand.String("pass", "", "Passphrase of the sender's private key") @@ -651,6 +652,7 @@ func processTransferCommand() { sender := *transferSenderPtr receiver := *transferReceiverPtr amount := *transferAmountPtr + gasPrice := *transferGasPricePtr shardID := *transferShardIDPtr base64InputData := *transferInputDataPtr senderPass := *transferSenderPassPtr @@ -708,9 +710,12 @@ func processTransferCommand() { return } + gasPriceBigInt := big.NewInt(int64(gasPrice)) + gasPriceBigInt = gasPriceBigInt.Mul(gasPriceBigInt, big.NewInt(denominations.Nano)) + tx := types.NewTransaction( state.nonce, receiverAddress, uint32(shardID), amountBigInt, - gas, nil, inputData) + gas, gasPriceBigInt, inputData) account, err := ks.Find(accounts.Account{Address: senderAddress}) if err != nil { diff --git a/cmd/client/wallet_stress_test/README.md b/cmd/client/wallet_stress_test/README.md new file mode 100644 index 000000000..159d16be7 --- /dev/null +++ b/cmd/client/wallet_stress_test/README.md @@ -0,0 +1 @@ +The wallet program is the demo wallet which talks to Harmony devnet for various kinds of operations. For detail, please compile and execute ./bin/wallet. \ No newline at end of file diff --git a/cmd/client/wallet_stress_test/generated_wallet.ini.go b/cmd/client/wallet_stress_test/generated_wallet.ini.go new file mode 100644 index 000000000..b1ed56eff --- /dev/null +++ b/cmd/client/wallet_stress_test/generated_wallet.ini.go @@ -0,0 +1,60 @@ +package main + +const ( + defaultWalletIni = `[default] +bootnode = /ip4/100.26.90.187/tcp/9874/p2p/Qmdfjtk6hPoyrH1zVD9PEH4zfWLo38dP2mDvvKXfh3tnEv +bootnode = /ip4/54.213.43.194/tcp/9874/p2p/QmZJJx6AdaoEkGLrYG4JeLCKeCKDjnFz2wfHNHxAqFSGA9 +shards = 4 + +[default.shard0.rpc] +rpc = l0.t.hmny.io:14555 +rpc = s0.t.hmny.io:14555 + +[default.shard1.rpc] +rpc = l1.t.hmny.io:14555 +rpc = s1.t.hmny.io:14555 + +[default.shard2.rpc] +rpc = l2.t.hmny.io:14555 +rpc = s2.t.hmny.io:14555 + +[default.shard3.rpc] +rpc = l3.t.hmny.io:14555 +rpc = s3.t.hmny.io:14555 + +[local] +bootnode = /ip4/127.0.0.1/tcp/19876/p2p/Qmc1V6W7BwX8Ugb42Ti8RnXF1rY5PF7nnZ6bKBryCgi6cv +shards = 2 + +[local.shard0.rpc] +rpc = 127.0.0.1:14555 +rpc = 127.0.0.1:14557 +rpc = 127.0.0.1:14559 + +[local.shard1.rpc] +rpc = 127.0.0.1:14556 +rpc = 127.0.0.1:14558 +rpc = 127.0.0.1:14560 + +[devnet] +bootnode = /ip4/100.26.90.187/tcp/9871/p2p/Qmdfjtk6hPoyrH1zVD9PEH4zfWLo38dP2mDvvKXfh3tnEv +bootnode = /ip4/54.213.43.194/tcp/9871/p2p/QmRVbTpEYup8dSaURZfF6ByrMTSKa4UyUzJhSjahFzRqNj +shards = 4 + +[devnet.shard0.rpc] +rpc = l0.t1.hmny.io:14555 +rpc = s0.t1.hmny.io:14555 + +[devnet.shard1.rpc] +rpc = l1.t1.hmny.io:14555 +rpc = s1.t1.hmny.io:14555 + +[devnet.shard2.rpc] +rpc = l2.t1.hmny.io:14555 +rpc = s2.t1.hmny.io:14555 + +[devnet.shard3.rpc] +rpc = l3.t1.hmny.io:14555 +rpc = s3.t1.hmny.io:14555 +` +) diff --git a/cmd/client/wallet_stress_test/main.go b/cmd/client/wallet_stress_test/main.go new file mode 100644 index 000000000..a03aa73de --- /dev/null +++ b/cmd/client/wallet_stress_test/main.go @@ -0,0 +1,437 @@ +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "math/big" + "math/rand" + "os" + "path" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/harmony-one/harmony/accounts" + "github.com/harmony-one/harmony/accounts/keystore" + "github.com/harmony-one/harmony/api/client" + clientService "github.com/harmony-one/harmony/api/client/service" + proto_node "github.com/harmony-one/harmony/api/proto/node" + "github.com/harmony-one/harmony/common/denominations" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" + common2 "github.com/harmony-one/harmony/internal/common" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/ctxerror" + "github.com/harmony-one/harmony/internal/shardchain" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/node" + "github.com/harmony-one/harmony/p2p" + p2p_host "github.com/harmony-one/harmony/p2p/host" + "github.com/harmony-one/harmony/p2p/p2pimpl" +) + +var ( + version string + builtBy string + builtAt string + commit string +) + +func printVersion(me string) { + fmt.Fprintf(os.Stderr, "Harmony (C) 2019. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt) + os.Exit(0) +} + +// AccountState includes the balance and nonce of an account +type AccountState struct { + balance *big.Int + nonce uint64 +} + +const ( + rpcRetry = 3 + defaultConfigFile = ".hmy/wallet.ini" + defaultProfile = "default" + keystoreDir = ".hmy/keystore" +) + +var ( + // Transfer subcommands + transferCommand = flag.NewFlagSet("transfer", flag.ExitOnError) + transferSenderPtr = transferCommand.String("from", "0", "Specify the sender account address or index") + transferReceiverPtr = transferCommand.String("to", "", "Specify the receiver account") + transferAmountPtr = transferCommand.Float64("amount", 0, "Specify the amount to transfer") + transferGasPricePtr = transferCommand.Uint64("gasPrice", 0, "Specify the gas price amount. Unit is Nano.") + transferShardIDPtr = transferCommand.Int("shardID", 0, "Specify the shard ID for the transfer") + transferInputDataPtr = transferCommand.String("inputData", "", "Base64-encoded input data to embed in the transaction") + transferSenderPassPtr = transferCommand.String("pass", "", "Passphrase of the sender's private key") +) + +var ( + walletProfile *utils.WalletProfile + ks *keystore.KeyStore +) + +// setupLog setup log for verbose output +func setupLog() { + // enable logging for wallet + h := log.StreamHandler(os.Stdout, log.TerminalFormat(true)) + log.Root().SetHandler(h) +} + +// The main wallet program entrance. Note the this wallet program is for demo-purpose only. It does not implement +// the secure storage of keys. +func main() { + rand.Seed(int64(time.Now().Nanosecond())) + + // Verify that a subcommand has been provided + // os.Arg[0] is the main command + // os.Arg[1] will be the subcommand + if len(os.Args) < 2 { + fmt.Println("Usage:") + fmt.Println(" wallet -p profile ") + fmt.Println(" -p profile - Specify the profile of the wallet, either testnet/devnet or others configured. Default is: testnet") + fmt.Println(" The profile is in file:", defaultConfigFile) + fmt.Println() + fmt.Println("Actions:") + fmt.Println(" 1. stressTest - Stress test transactions with corner cases.") + os.Exit(1) + } + +ARG: + for { + lastArg := os.Args[len(os.Args)-1] + switch lastArg { + case "--verbose": + setupLog() + os.Args = os.Args[:len(os.Args)-1] + default: + break ARG + } + } + + var profile string + if os.Args[1] == "-p" { + profile = os.Args[2] + os.Args = os.Args[2:] + } else { + profile = defaultProfile + } + if len(os.Args) == 1 { + fmt.Println("Missing action") + flag.PrintDefaults() + os.Exit(1) + } + + // create new keystore backend + scryptN := keystore.StandardScryptN + scryptP := keystore.StandardScryptP + ks = keystore.NewKeyStore(keystoreDir, scryptN, scryptP) + + // Switch on the subcommand + switch os.Args[1] { + case "-version": + printVersion(os.Args[0]) + case "stressTest": + readProfile(profile) + processStressTestCommand() + default: + fmt.Printf("Unknown action: %s\n", os.Args[1]) + flag.PrintDefaults() + os.Exit(1) + } +} + +//go:generate go run ../../../scripts/wallet_embed_ini_files.go + +func readProfile(profile string) { + fmt.Printf("Using %s profile for wallet\n", profile) + + // try to load .hmy/wallet.ini from filesystem + // use default_wallet_ini if .hmy/wallet.ini doesn't exist + var err error + var iniBytes []byte + + iniBytes, err = ioutil.ReadFile(defaultConfigFile) + if err != nil { + log.Debug(fmt.Sprintf("%s doesn't exist, using default ini\n", defaultConfigFile)) + iniBytes = []byte(defaultWalletIni) + } + + walletProfile, err = utils.ReadWalletProfile(iniBytes, profile) + if err != nil { + fmt.Printf("Read wallet profile error: %v\nExiting ...\n", err) + os.Exit(2) + } +} + +// createWalletNode creates wallet server node. +func createWalletNode() *node.Node { + bootNodeAddrs, err := utils.StringsToAddrs(walletProfile.Bootnodes) + if err != nil { + panic(err) + } + utils.BootNodes = bootNodeAddrs + shardID := 0 + // dummy host for wallet + // TODO: potentially, too many dummy IP may flush out good IP address from our bootnode DHT + // we need to understand the impact to bootnode DHT with this dummy host ip added + self := p2p.Peer{IP: "127.0.0.1", Port: "6999"} + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "6999") + host, err := p2pimpl.NewHost(&self, priKey) + if err != nil { + panic(err) + } + chainDBFactory := &shardchain.MemDBFactory{} + w := node.New(host, nil, chainDBFactory, false) + w.Client = client.NewClient(w.GetHost(), uint32(shardID)) + + w.NodeConfig.SetRole(nodeconfig.ClientNode) + w.ServiceManagerSetup() + w.RunServices() + return w +} + +// ./bin/wallet -p local transfer +// --from one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg +// --to one1spshr72utf6rwxseaz339j09ed8p6f8ke370zj +// --amount 1 --shardID 1 + +func processStressTestCommand() { + /* + + Account 17: + Address: one1spshr72utf6rwxseaz339j09ed8p6f8ke370zj + Balance in Shard 0: x.xxx, nonce: 0 + Balance in Shard 1: 0.0000, nonce: 0 + Account 18: + Address: one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg + Balance in Shard 0: 0.0000, nonce: 0 + Balance in Shard 1: x.xxx, nonce: 0 + + */ + + senderAddress := common2.ParseAddr("one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg") + receiverAddress := common2.ParseAddr("one1spshr72utf6rwxseaz339j09ed8p6f8ke370zj") + shardID := 1 + + walletNode := createWalletNode() + + shardIDToAccountState := FetchBalance(senderAddress) + state := shardIDToAccountState[shardID] + + balance := state.balance + // amount 1/10th of the balance + amountBigInt := balance.Div(balance, big.NewInt(10)) + + // default inputData + data := make([]byte, 0) + + gasLimit, _ := core.IntrinsicGas(data, false, true) + + gasPrice := 0 + gasPriceBigInt := big.NewInt(int64(gasPrice)) + gasPriceBigInt = gasPriceBigInt.Mul(gasPriceBigInt, big.NewInt(denominations.Nano)) + + senderPass := "" + + for i := 0; i < 4; i++ { + currNonce := state.nonce + tx := types.NewTransaction( + currNonce, receiverAddress, uint32(shardID), amountBigInt, + gasLimit, gasPriceBigInt, data) + + account, _ := ks.Find(accounts.Account{Address: senderAddress}) + + ks.Unlock(account, senderPass) + + tx, _ = ks.SignTx(account, tx, nil) + + if err := submitTransaction(tx, walletNode, uint32(shardID)); err != nil { + fmt.Println(ctxerror.New("submitTransaction failed", + "tx", tx, "shardID", shardID).WithCause(err)) + } + + for retry := 0; retry < 10; retry++ { + accountStates := FetchBalance(senderAddress) + state = accountStates[shardID] + fmt.Println("state.nonce", state.nonce) + if state.nonce == currNonce+1 { + break + } + time.Sleep(3 * time.Second) + } + } + + fmt.Printf("Sender Account: %s:\n", common2.MustAddressToBech32(senderAddress)) + for shardID, balanceNonce := range FetchBalance(senderAddress) { + fmt.Printf(" Balance in Shard %d: %s, nonce: %v \n", shardID, convertBalanceIntoReadableFormat(balanceNonce.balance), balanceNonce.nonce) + } +} + +func convertBalanceIntoReadableFormat(balance *big.Int) string { + balance = balance.Div(balance, big.NewInt(denominations.Nano)) + strBalance := fmt.Sprintf("%d", balance.Uint64()) + + bytes := []byte(strBalance) + hasDecimal := false + for i := 0; i < 11; i++ { + if len(bytes)-1-i < 0 { + bytes = append([]byte{'0'}, bytes...) + } + if bytes[len(bytes)-1-i] != '0' && i < 9 { + hasDecimal = true + } + if i == 9 { + newBytes := append([]byte{'.'}, bytes[len(bytes)-i:]...) + bytes = append(bytes[:len(bytes)-i], newBytes...) + } + } + zerosToRemove := 0 + for i := 0; i < len(bytes); i++ { + if hasDecimal { + if bytes[len(bytes)-1-i] == '0' { + bytes = bytes[:len(bytes)-1-i] + i-- + } else { + break + } + } else { + if zerosToRemove < 5 { + bytes = bytes[:len(bytes)-1-i] + i-- + zerosToRemove++ + } else { + break + } + } + } + return string(bytes) +} + +// FetchBalance fetches account balance of specified address from the Harmony network +func FetchBalance(address common.Address) []*AccountState { + result := []*AccountState{} + for shardID := 0; shardID < walletProfile.Shards; shardID++ { + // Fill in nil pointers for each shard; nil represent failed balance fetch. + result = append(result, nil) + } + + var wg sync.WaitGroup + wg.Add(walletProfile.Shards) + + for shardID := 0; shardID < walletProfile.Shards; shardID++ { + go func(shardID int) { + defer wg.Done() + balance := big.NewInt(0) + var nonce uint64 + result[uint32(shardID)] = &AccountState{balance, 0} + + var wgShard sync.WaitGroup + wgShard.Add(len(walletProfile.RPCServer[shardID])) + + var mutexAccountState = &sync.Mutex{} + + for rpcServerID := 0; rpcServerID < len(walletProfile.RPCServer[shardID]); rpcServerID++ { + go func(rpcServerID int) { + for retry := 0; retry < rpcRetry; retry++ { + + server := walletProfile.RPCServer[shardID][rpcServerID] + client, err := clientService.NewClient(server.IP, server.Port) + if err != nil { + continue + } + + log.Debug("FetchBalance", "server", server) + response, err := client.GetBalance(address) + if err != nil { + log.Info("failed to get balance, retrying ...") + time.Sleep(200 * time.Millisecond) + continue + } + log.Debug("FetchBalance", "response", response) + respBalance := big.NewInt(0) + respBalance.SetBytes(response.Balance) + + mutexAccountState.Lock() + if balance.Cmp(respBalance) < 0 { + balance.SetBytes(response.Balance) + nonce = response.Nonce + } + mutexAccountState.Unlock() + break + } + wgShard.Done() + }(rpcServerID) + } + wgShard.Wait() + + result[shardID] = &AccountState{balance, nonce} + }(shardID) + } + wg.Wait() + return result +} + +// GetFreeToken requests for token test token on each shard +func GetFreeToken(address common.Address) { + for i := 0; i < walletProfile.Shards; i++ { + // use the 1st server (leader) to make the getFreeToken call + server := walletProfile.RPCServer[i][0] + client, err := clientService.NewClient(server.IP, server.Port) + if err != nil { + continue + } + + log.Debug("GetFreeToken", "server", server) + + for retry := 0; retry < rpcRetry; retry++ { + response, err := client.GetFreeToken(address) + if err != nil { + log.Info("failed to get free token, retrying ...") + time.Sleep(200 * time.Millisecond) + continue + } + log.Debug("GetFreeToken", "response", response) + txID := common.Hash{} + txID.SetBytes(response.TxId) + fmt.Printf("Transaction Id requesting free token in shard %d: %s\n", i, txID.Hex()) + break + } + } +} + +// clearKeystore deletes all data in the local keystore +func clearKeystore() { + dir, err := ioutil.ReadDir(keystoreDir) + if err != nil { + panic("Failed to read keystore directory") + } + for _, d := range dir { + subdir := path.Join([]string{keystoreDir, d.Name()}...) + if err := os.RemoveAll(subdir); err != nil { + fmt.Println(ctxerror.New("cannot remove directory", + "path", subdir).WithCause(err)) + } + } + fmt.Println("All existing accounts deleted...") +} + +// submitTransaction submits the transaction to the Harmony network +func submitTransaction(tx *types.Transaction, walletNode *node.Node, shardID uint32) error { + msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx}) + clientGroup := p2p.NewClientGroupIDByShardID(p2p.ShardID(shardID)) + + err := walletNode.GetHost().SendMessageToGroups([]p2p.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg)) + if err != nil { + fmt.Printf("Error in SubmitTransaction: %v\n", err) + return err + } + fmt.Printf("Transaction Id for shard %d: %s\n", int(shardID), tx.Hash().Hex()) + // FIXME (leo): how to we know the tx was successful sent to the network + // this is a hacky way to wait for sometime + time.Sleep(3 * time.Second) + return nil +} diff --git a/internal/configs/sharding/fixedschedule.go b/internal/configs/sharding/fixedschedule.go index db7f4057f..5df96ae42 100644 --- a/internal/configs/sharding/fixedschedule.go +++ b/internal/configs/sharding/fixedschedule.go @@ -2,6 +2,7 @@ package shardingconfig import ( "math/big" + "time" "github.com/harmony-one/harmony/common/denominations" ) @@ -52,12 +53,17 @@ func (s fixedSchedule) MaxNumTxsPerBlockLimit() int { return mainnetMaxNumTxsPerBlockLimit } +func (s fixedSchedule) RecentTxDuration() time.Duration { + return mainnetRecentTxDuration +} + func (s fixedSchedule) TxsThrottleConfig() *TxsThrottleConfig { return &TxsThrottleConfig{ MaxTxAmountLimit: s.MaxTxAmountLimit(), MaxNumRecentTxsPerAccountLimit: s.MaxNumRecentTxsPerAccountLimit(), MaxTxPoolSizeLimit: s.MaxTxPoolSizeLimit(), MaxNumTxsPerBlockLimit: s.MaxNumTxsPerBlockLimit(), + RecentTxDuration: s.RecentTxDuration(), } } diff --git a/internal/configs/sharding/localnet.go b/internal/configs/sharding/localnet.go index 4416fa674..06d416e18 100644 --- a/internal/configs/sharding/localnet.go +++ b/internal/configs/sharding/localnet.go @@ -2,6 +2,7 @@ package shardingconfig import ( "math/big" + "time" "github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/internal/genesis" @@ -24,6 +25,7 @@ const ( localnetMaxNumRecentTxsPerAccountLimit = 2 localnetMaxTxPoolSizeLimit = 8000 localnetMaxNumTxsPerBlockLimit = 1000 + localnetRecentTxDuration = 100 * time.Second ) func (localnetSchedule) InstanceForEpoch(epoch *big.Int) Instance { @@ -81,12 +83,17 @@ func (ls localnetSchedule) MaxNumTxsPerBlockLimit() int { return localnetMaxNumTxsPerBlockLimit } +func (ls localnetSchedule) RecentTxDuration() time.Duration { + return localnetRecentTxDuration +} + func (ls localnetSchedule) TxsThrottleConfig() *TxsThrottleConfig { return &TxsThrottleConfig{ MaxTxAmountLimit: ls.MaxTxAmountLimit(), MaxNumRecentTxsPerAccountLimit: ls.MaxNumRecentTxsPerAccountLimit(), MaxTxPoolSizeLimit: ls.MaxTxPoolSizeLimit(), MaxNumTxsPerBlockLimit: ls.MaxNumTxsPerBlockLimit(), + RecentTxDuration: ls.RecentTxDuration(), } } diff --git a/internal/configs/sharding/mainnet.go b/internal/configs/sharding/mainnet.go index 5749a4914..865548575 100644 --- a/internal/configs/sharding/mainnet.go +++ b/internal/configs/sharding/mainnet.go @@ -2,6 +2,7 @@ package shardingconfig import ( "math/big" + "time" "github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/internal/genesis" @@ -15,9 +16,10 @@ const ( mainnetV3Epoch = 8 mainnetMaxTxAmountLimit = 1e3 // unit is in One - mainnetMaxNumRecentTxsPerAccountLimit = 10 + mainnetMaxNumRecentTxsPerAccountLimit = 1e2 mainnetMaxTxPoolSizeLimit = 8000 mainnetMaxNumTxsPerBlockLimit = 1000 + mainnetRecentTxDuration = time.Hour ) // MainnetSchedule is the mainnet sharding configuration schedule. @@ -85,12 +87,17 @@ func (ms mainnetSchedule) MaxNumTxsPerBlockLimit() int { return mainnetMaxNumTxsPerBlockLimit } +func (ms mainnetSchedule) RecentTxDuration() time.Duration { + return mainnetRecentTxDuration +} + func (ms mainnetSchedule) TxsThrottleConfig() *TxsThrottleConfig { return &TxsThrottleConfig{ MaxTxAmountLimit: ms.MaxTxAmountLimit(), MaxNumRecentTxsPerAccountLimit: ms.MaxNumRecentTxsPerAccountLimit(), MaxTxPoolSizeLimit: ms.MaxTxPoolSizeLimit(), MaxNumTxsPerBlockLimit: ms.MaxNumTxsPerBlockLimit(), + RecentTxDuration: ms.RecentTxDuration(), } } diff --git a/internal/configs/sharding/shardingconfig.go b/internal/configs/sharding/shardingconfig.go index 1b6feb2cf..8a2d6f820 100644 --- a/internal/configs/sharding/shardingconfig.go +++ b/internal/configs/sharding/shardingconfig.go @@ -4,6 +4,7 @@ package shardingconfig import ( "math/big" + "time" "github.com/harmony-one/harmony/internal/genesis" ) @@ -34,6 +35,9 @@ type Schedule interface { // Max total number of transactions allowed to be processed per block MaxNumTxsPerBlockLimit() int + // How long "recent" means for transaction in time Duration unit + RecentTxDuration() time.Duration + // configuration for throttling pending transactions TxsThrottleConfig() *TxsThrottleConfig } @@ -92,6 +96,9 @@ type TxsThrottleConfig struct { // Max amount limit for a valid transaction MaxTxAmountLimit *big.Int + // Max number of transactions of a particular account for the past hour + RecentTxDuration time.Duration + // Max number of transactions of a particular account for the past hour MaxNumRecentTxsPerAccountLimit uint64 diff --git a/internal/configs/sharding/testnet.go b/internal/configs/sharding/testnet.go index a472bb417..d3b740047 100644 --- a/internal/configs/sharding/testnet.go +++ b/internal/configs/sharding/testnet.go @@ -2,6 +2,7 @@ package shardingconfig import ( "math/big" + "time" "github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/internal/genesis" @@ -21,9 +22,10 @@ const ( threeOne = 111 testnetMaxTxAmountLimit = 1e3 // unit is in One - testnetMaxNumRecentTxsPerAccountLimit = 10 + testnetMaxNumRecentTxsPerAccountLimit = 1e2 testnetMaxTxPoolSizeLimit = 8000 testnetMaxNumTxsPerBlockLimit = 1000 + testnetRecentTxDuration = time.Hour ) func (testnetSchedule) InstanceForEpoch(epoch *big.Int) Instance { @@ -82,12 +84,17 @@ func (ts testnetSchedule) MaxNumTxsPerBlockLimit() int { return testnetMaxNumTxsPerBlockLimit } +func (ts testnetSchedule) RecentTxDuration() time.Duration { + return testnetRecentTxDuration +} + func (ts testnetSchedule) TxsThrottleConfig() *TxsThrottleConfig { return &TxsThrottleConfig{ MaxTxAmountLimit: ts.MaxTxAmountLimit(), MaxNumRecentTxsPerAccountLimit: ts.MaxNumRecentTxsPerAccountLimit(), MaxTxPoolSizeLimit: ts.MaxTxPoolSizeLimit(), MaxNumTxsPerBlockLimit: ts.MaxNumTxsPerBlockLimit(), + RecentTxDuration: ts.RecentTxDuration(), } } diff --git a/node/node.go b/node/node.go index 62d665b63..76e2d7d43 100644 --- a/node/node.go +++ b/node/node.go @@ -259,12 +259,14 @@ func (node *Node) AddPendingTransaction(newTx *types.Transaction) { func (node *Node) getTransactionsForNewBlock(coinbase common.Address) types.Transactions { node.pendingTxMutex.Lock() + txsThrottleConfig := core.ShardingSchedule.TxsThrottleConfig() + // the next block number to be added in consensus protocol, which is always one more than current chain header block newBlockNum := node.Blockchain().CurrentBlock().NumberU64() + 1 - // remove old (currently > 1 hr) blockNum keys from recentTxsStats and initiailize for the new block + // remove old (> txsThrottleConfigRecentTxDuration) blockNum keys from recentTxsStats and initiailize for the new block for blockNum := range node.recentTxsStats { - blockNumHourAgo := uint64(time.Hour / node.BlockPeriod) + blockNumHourAgo := uint64(txsThrottleConfig.RecentTxDuration / node.BlockPeriod) if blockNumHourAgo < newBlockNum-blockNum { delete(node.recentTxsStats, blockNum) @@ -272,7 +274,7 @@ func (node *Node) getTransactionsForNewBlock(coinbase common.Address) types.Tran } node.recentTxsStats[newBlockNum] = make(types.BlockTxsCounts) - selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, core.ShardingSchedule.TxsThrottleConfig(), coinbase) + selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase) node.pendingTransactions = unselected node.reducePendingTransactions() diff --git a/node/worker/worker.go b/node/worker/worker.go index 78468993f..32d9f977e 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -109,6 +109,7 @@ func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Tra for _, tx := range txs { if tx.ShardID() != w.shardID { invalid = append(invalid, tx) + continue } sender, flag := w.throttleTxs(selected, recentTxsStats, txsThrottleConfig, tx) diff --git a/scripts/go_executable_build.sh b/scripts/go_executable_build.sh index 0ea616a16..1a7cdf6df 100755 --- a/scripts/go_executable_build.sh +++ b/scripts/go_executable_build.sh @@ -7,6 +7,7 @@ SRC[harmony]=cmd/harmony/main.go SRC[txgen]=cmd/client/txgen/main.go SRC[bootnode]=cmd/bootnode/main.go SRC[wallet]="cmd/client/wallet/main.go cmd/client/wallet/generated_wallet.ini.go" +SRC[wallet_stress_test]="cmd/client/wallet_stress_test/main.go cmd/client/wallet_stress_test/generated_wallet.ini.go" BINDIR=bin BUCKET=unique-bucket-bin