Merge pull request #1319 from denniswon/r3_txs_throttle

[r3 patch] throttle transactions with configuration per-network level
pull/1360/head r3-20190814.0
Leo Chen 5 years ago committed by GitHub
commit 060181b0ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      cmd/client/wallet/main.go
  2. 1
      cmd/client/wallet_stress_test/README.md
  3. 86
      cmd/client/wallet_stress_test/generated_wallet.ini.go
  4. 465
      cmd/client/wallet_stress_test/main.go
  5. 6
      cmd/harmony/main.go
  6. 29
      core/types/transaction.go
  7. 35
      internal/configs/sharding/fixedschedule.go
  8. 42
      internal/configs/sharding/localnet.go
  9. 41
      internal/configs/sharding/mainnet.go
  10. 61
      internal/configs/sharding/shardingconfig.go
  11. 40
      internal/configs/sharding/testnet.go
  12. 75
      node/node.go
  13. 4
      node/node_handler.go
  14. 4
      node/node_handler_test.go
  15. 17
      node/node_newblock.go
  16. 2
      node/staking_test.go
  17. 91
      node/worker/worker.go
  18. 1
      scripts/go_executable_build.sh

@ -91,6 +91,7 @@ var (
transferSenderPtr = transferCommand.String("from", "0", "Specify the sender account address or index") transferSenderPtr = transferCommand.String("from", "0", "Specify the sender account address or index")
transferReceiverPtr = transferCommand.String("to", "", "Specify the receiver account") transferReceiverPtr = transferCommand.String("to", "", "Specify the receiver account")
transferAmountPtr = transferCommand.Float64("amount", 0, "Specify the amount to transfer") transferAmountPtr = transferCommand.Float64("amount", 0, "Specify the amount to transfer")
transferGasPricePtr = transferCommand.Uint64("gasPrice", 0, "Specify the gas price amount. Unit is Nano.")
transferShardIDPtr = transferCommand.Int("shardID", 0, "Specify the shard ID for the transfer") transferShardIDPtr = transferCommand.Int("shardID", 0, "Specify the shard ID for the transfer")
transferInputDataPtr = transferCommand.String("inputData", "", "Base64-encoded input data to embed in the transaction") transferInputDataPtr = transferCommand.String("inputData", "", "Base64-encoded input data to embed in the transaction")
transferSenderPassPtr = transferCommand.String("pass", "", "Passphrase of the sender's private key") transferSenderPassPtr = transferCommand.String("pass", "", "Passphrase of the sender's private key")
@ -651,6 +652,7 @@ func processTransferCommand() {
sender := *transferSenderPtr sender := *transferSenderPtr
receiver := *transferReceiverPtr receiver := *transferReceiverPtr
amount := *transferAmountPtr amount := *transferAmountPtr
gasPrice := *transferGasPricePtr
shardID := *transferShardIDPtr shardID := *transferShardIDPtr
base64InputData := *transferInputDataPtr base64InputData := *transferInputDataPtr
senderPass := *transferSenderPassPtr senderPass := *transferSenderPassPtr
@ -708,9 +710,12 @@ func processTransferCommand() {
return return
} }
gasPriceBigInt := big.NewInt(int64(gasPrice))
gasPriceBigInt = gasPriceBigInt.Mul(gasPriceBigInt, big.NewInt(denominations.Nano))
tx := types.NewTransaction( tx := types.NewTransaction(
state.nonce, receiverAddress, uint32(shardID), amountBigInt, state.nonce, receiverAddress, uint32(shardID), amountBigInt,
gas, nil, inputData) gas, gasPriceBigInt, inputData)
account, err := ks.Find(accounts.Account{Address: senderAddress}) account, err := ks.Find(accounts.Account{Address: senderAddress})
if err != nil { if err != nil {

@ -0,0 +1 @@
The wallet program is the demo wallet which talks to Harmony devnet for various kinds of operations. For detail, please compile and execute ./bin/wallet.

@ -0,0 +1,86 @@
package main
const (
defaultWalletIni = `[default]
bootnode = /ip4/100.26.90.187/tcp/9874/p2p/Qmdfjtk6hPoyrH1zVD9PEH4zfWLo38dP2mDvvKXfh3tnEv
bootnode = /ip4/54.213.43.194/tcp/9874/p2p/QmZJJx6AdaoEkGLrYG4JeLCKeCKDjnFz2wfHNHxAqFSGA9
bootnode = /ip4/13.113.101.219/tcp/12019/p2p/QmQayinFSgMMw5cSpDUiD9pQ2WeP6WNmGxpZ6ou3mdVFJX
bootnode = /ip4/99.81.170.167/tcp/12019/p2p/QmRVbTpEYup8dSaURZfF6ByrMTSKa4UyUzJhSjahFzRqNj
shards = 4
[default.shard0.rpc]
rpc = l0.t.hmny.io:14555
rpc = s0.t.hmny.io:14555
[default.shard1.rpc]
rpc = l1.t.hmny.io:14555
rpc = s1.t.hmny.io:14555
[default.shard2.rpc]
rpc = l2.t.hmny.io:14555
rpc = s2.t.hmny.io:14555
[default.shard3.rpc]
rpc = l3.t.hmny.io:14555
rpc = s3.t.hmny.io:14555
[local]
bootnode = /ip4/127.0.0.1/tcp/19876/p2p/Qmc1V6W7BwX8Ugb42Ti8RnXF1rY5PF7nnZ6bKBryCgi6cv
shards = 2
[local.shard0.rpc]
rpc = 127.0.0.1:14555
rpc = 127.0.0.1:14557
rpc = 127.0.0.1:14559
[local.shard1.rpc]
rpc = 127.0.0.1:14556
rpc = 127.0.0.1:14558
rpc = 127.0.0.1:14560
[beta]
bootnode = /ip4/54.213.43.194/tcp/9868/p2p/QmZJJx6AdaoEkGLrYG4JeLCKeCKDjnFz2wfHNHxAqFSGA9
bootnode = /ip4/100.26.90.187/tcp/9868/p2p/Qmdfjtk6hPoyrH1zVD9PEH4zfWLo38dP2mDvvKXfh3tnEv
bootnode = /ip4/13.113.101.219/tcp/12018/p2p/QmQayinFSgMMw5cSpDUiD9pQ2WeP6WNmGxpZ6ou3mdVFJX
shards = 4
[beta.shard0.rpc]
rpc = l0.b.hmny.io:14555
rpc = s0.b.hmny.io:14555
[beta.shard1.rpc]
rpc = l1.b.hmny.io:14555
rpc = s1.b.hmny.io:14555
[beta.shard2.rpc]
rpc = l2.b.hmny.io:14555
rpc = s2.b.hmny.io:14555
[beta.shard3.rpc]
rpc = l3.b.hmny.io:14555
rpc = s3.b.hmny.io:14555
[pangaea]
bootnode = /ip4/100.26.90.187/tcp/9867/p2p/Qmdfjtk6hPoyrH1zVD9PEH4zfWLo38dP2mDvvKXfh3tnEv
bootnode = /ip4/54.213.43.194/tcp/9867/p2p/QmZJJx6AdaoEkGLrYG4JeLCKeCKDjnFz2wfHNHxAqFSGA9
bootnode = /ip4/13.113.101.219/tcp/9867/p2p/QmQayinFSgMMw5cSpDUiD9pQ2WeP6WNmGxpZ6ou3mdVFJX
bootnode = /ip4/99.81.170.167/tcp/9867/p2p/QmRVbTpEYup8dSaURZfF6ByrMTSKa4UyUzJhSjahFzRqNj
shards = 4
[pangaea.shard0.rpc]
rpc = l0.p.hmny.io:14555
rpc = s0.p.hmny.io:14555
[pangaea.shard1.rpc]
rpc = l1.p.hmny.io:14555
rpc = s1.p.hmny.io:14555
[pangaea.shard2.rpc]
rpc = l2.p.hmny.io:14555
rpc = s2.p.hmny.io:14555
[pangaea.shard3.rpc]
rpc = l3.p.hmny.io:14555
rpc = s3.p.hmny.io:14555
`
)

@ -0,0 +1,465 @@
package main
import (
"flag"
"fmt"
"io/ioutil"
"math/big"
"math/rand"
"os"
"path"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/accounts"
"github.com/harmony-one/harmony/accounts/keystore"
"github.com/harmony-one/harmony/api/client"
clientService "github.com/harmony-one/harmony/api/client/service"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
common2 "github.com/harmony-one/harmony/internal/common"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl"
)
var (
version string
builtBy string
builtAt string
commit string
nextNonce uint64
)
func printVersion(me string) {
fmt.Fprintf(os.Stderr, "Harmony (C) 2019. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt)
os.Exit(0)
}
// AccountState includes the balance and nonce of an account
type AccountState struct {
balance *big.Int
nonce uint64
}
const (
rpcRetry = 3
defaultConfigFile = ".hmy/wallet.ini"
defaultProfile = "default"
keystoreDir = ".hmy/keystore"
)
var (
// Transfer subcommands
transferCommand = flag.NewFlagSet("transfer", flag.ExitOnError)
transferSenderPtr = transferCommand.String("from", "0", "Specify the sender account address or index")
transferReceiverPtr = transferCommand.String("to", "", "Specify the receiver account")
transferAmountPtr = transferCommand.Float64("amount", 0, "Specify the amount to transfer")
transferGasPricePtr = transferCommand.Uint64("gasPrice", 0, "Specify the gas price amount. Unit is Nano.")
transferShardIDPtr = transferCommand.Int("shardID", 0, "Specify the shard ID for the transfer")
transferInputDataPtr = transferCommand.String("inputData", "", "Base64-encoded input data to embed in the transaction")
transferSenderPassPtr = transferCommand.String("pass", "", "Passphrase of the sender's private key")
)
var (
walletProfile *utils.WalletProfile
ks *keystore.KeyStore
)
// setupLog setup log for verbose output
func setupLog() {
// enable logging for wallet
h := log.StreamHandler(os.Stdout, log.TerminalFormat(true))
log.Root().SetHandler(h)
}
// The main wallet program entrance. Note the this wallet program is for demo-purpose only. It does not implement
// the secure storage of keys.
func main() {
rand.Seed(int64(time.Now().Nanosecond()))
// Verify that a subcommand has been provided
// os.Arg[0] is the main command
// os.Arg[1] will be the subcommand
if len(os.Args) < 2 {
fmt.Println("Usage:")
fmt.Println(" wallet -p profile <action> <params>")
fmt.Println(" -p profile - Specify the profile of the wallet, either testnet/devnet or others configured. Default is: testnet")
fmt.Println(" The profile is in file:", defaultConfigFile)
fmt.Println()
fmt.Println("Actions:")
fmt.Println(" 1. stressTest - Stress test transactions with corner cases.")
os.Exit(1)
}
ARG:
for {
lastArg := os.Args[len(os.Args)-1]
switch lastArg {
case "--verbose":
setupLog()
os.Args = os.Args[:len(os.Args)-1]
default:
break ARG
}
}
var profile string
if os.Args[1] == "-p" {
profile = os.Args[2]
os.Args = os.Args[2:]
} else {
profile = defaultProfile
}
if len(os.Args) == 1 {
fmt.Println("Missing action")
flag.PrintDefaults()
os.Exit(1)
}
// create new keystore backend
scryptN := keystore.StandardScryptN
scryptP := keystore.StandardScryptP
ks = keystore.NewKeyStore(keystoreDir, scryptN, scryptP)
// Switch on the subcommand
switch os.Args[1] {
case "-version":
printVersion(os.Args[0])
case "stressTest":
readProfile(profile)
processStressTestCommand()
default:
fmt.Printf("Unknown action: %s\n", os.Args[1])
flag.PrintDefaults()
os.Exit(1)
}
}
//go:generate go run ../../../scripts/wallet_embed_ini_files.go
func readProfile(profile string) {
fmt.Printf("Using %s profile for wallet\n", profile)
// try to load .hmy/wallet.ini from filesystem
// use default_wallet_ini if .hmy/wallet.ini doesn't exist
var err error
var iniBytes []byte
iniBytes, err = ioutil.ReadFile(defaultConfigFile)
if err != nil {
log.Debug(fmt.Sprintf("%s doesn't exist, using default ini\n", defaultConfigFile))
iniBytes = []byte(defaultWalletIni)
}
walletProfile, err = utils.ReadWalletProfile(iniBytes, profile)
if err != nil {
fmt.Printf("Read wallet profile error: %v\nExiting ...\n", err)
os.Exit(2)
}
}
// createWalletNode creates wallet server node.
func createWalletNode() *node.Node {
bootNodeAddrs, err := utils.StringsToAddrs(walletProfile.Bootnodes)
if err != nil {
panic(err)
}
utils.BootNodes = bootNodeAddrs
shardID := 0
// dummy host for wallet
// TODO: potentially, too many dummy IP may flush out good IP address from our bootnode DHT
// we need to understand the impact to bootnode DHT with this dummy host ip added
self := p2p.Peer{IP: "127.0.0.1", Port: "6999"}
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "6999")
host, err := p2pimpl.NewHost(&self, priKey)
if err != nil {
panic(err)
}
chainDBFactory := &shardchain.MemDBFactory{}
w := node.New(host, nil, chainDBFactory, false)
w.Client = client.NewClient(w.GetHost(), uint32(shardID))
w.NodeConfig.SetRole(nodeconfig.ClientNode)
w.ServiceManagerSetup()
w.RunServices()
return w
}
// ./bin/wallet -p local transfer
// --from one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg
// --to one1spshr72utf6rwxseaz339j09ed8p6f8ke370zj
// --amount 1 --shardID 1
func processStressTestCommand() {
/*
Account 17:
Address: one1spshr72utf6rwxseaz339j09ed8p6f8ke370zj
Balance in Shard 0: x.xxx, nonce: 0
Balance in Shard 1: 0.0000, nonce: 0
Account 18:
Address: one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg
Balance in Shard 0: 0.0000, nonce: 0
Balance in Shard 1: x.xxx, nonce: 0
*/
fmt.Println("Creating wallet node")
walletNode := createWalletNode()
senderAddress := common2.ParseAddr("one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg")
receiverAddress := common2.ParseAddr("one1spshr72utf6rwxseaz339j09ed8p6f8ke370zj")
shardID := 1
fmt.Printf("Sender account: %s:\n", common2.MustAddressToBech32(senderAddress))
// default inputData
data := make([]byte, 0)
gasLimit, _ := core.IntrinsicGas(data, false, true)
gasPrice := 0
gasPriceBigInt := big.NewInt(int64(gasPrice))
gasPriceBigInt = gasPriceBigInt.Mul(gasPriceBigInt, big.NewInt(denominations.Nano))
fmt.Printf("gas limit: %d, gas price: %d", gasLimit, gasPriceBigInt.Uint64())
senderPass := ""
var shardIDToAccountStateSender []*AccountState
var shardIDToAccountStateReceiver []*AccountState
var senderState *AccountState
var receiverState *AccountState
var retry uint32
for i := 0; ; i++ {
for retry = 0; retry < 10; retry++ {
shardIDToAccountStateSender = FetchBalance(senderAddress)
shardIDToAccountStateReceiver = FetchBalance(receiverAddress)
senderState = shardIDToAccountStateSender[shardID]
receiverState = shardIDToAccountStateReceiver[shardID]
if senderState.nonce == nextNonce {
break
}
time.Sleep(3 * time.Second)
fmt.Printf(".")
}
if retry == 10 {
fmt.Printf("\nRetry expired. Num txs made: %d\n", i)
break
}
nextNonce++
senderBalance := senderState.balance
receiverBalance := receiverState.balance
// amount 1/10th of the balance
amountBigInt := senderBalance.Div(senderBalance, big.NewInt(10))
fmt.Printf("\nsender: balance (shard %d: %s, nonce: %v)\n", shardID, convertBalanceIntoReadableFormat(senderBalance), senderState.nonce)
fmt.Printf("receiver balance (shard %d: %s, nonce: %v)\n", shardID, convertBalanceIntoReadableFormat(receiverBalance), receiverState.nonce)
// stop stress testing here after printing out the final balance
if i == 10 {
break
}
tx := types.NewTransaction(
senderState.nonce, receiverAddress, uint32(shardID), amountBigInt,
gasLimit, gasPriceBigInt, data)
account, _ := ks.Find(accounts.Account{Address: senderAddress})
ks.Unlock(account, senderPass)
tx, _ = ks.SignTx(account, tx, nil)
if err := submitTransaction(tx, walletNode, uint32(shardID)); err != nil {
fmt.Println(ctxerror.New("submitTransaction failed",
"tx", tx, "shardID", shardID).WithCause(err))
}
}
for shardID, balanceNonce := range FetchBalance(senderAddress) {
fmt.Printf(" Final: Balance in Shard %d: %s, nonce: %v \n", shardID, convertBalanceIntoReadableFormat(balanceNonce.balance), balanceNonce.nonce)
}
}
func convertBalanceIntoReadableFormat(balance *big.Int) string {
balance = balance.Div(balance, big.NewInt(denominations.Nano))
strBalance := fmt.Sprintf("%d", balance.Uint64())
bytes := []byte(strBalance)
hasDecimal := false
for i := 0; i < 11; i++ {
if len(bytes)-1-i < 0 {
bytes = append([]byte{'0'}, bytes...)
}
if bytes[len(bytes)-1-i] != '0' && i < 9 {
hasDecimal = true
}
if i == 9 {
newBytes := append([]byte{'.'}, bytes[len(bytes)-i:]...)
bytes = append(bytes[:len(bytes)-i], newBytes...)
}
}
zerosToRemove := 0
for i := 0; i < len(bytes); i++ {
if hasDecimal {
if bytes[len(bytes)-1-i] == '0' {
bytes = bytes[:len(bytes)-1-i]
i--
} else {
break
}
} else {
if zerosToRemove < 5 {
bytes = bytes[:len(bytes)-1-i]
i--
zerosToRemove++
} else {
break
}
}
}
return string(bytes)
}
// FetchBalance fetches account balance of specified address from the Harmony network
func FetchBalance(address common.Address) []*AccountState {
result := []*AccountState{}
for shardID := 0; shardID < walletProfile.Shards; shardID++ {
// Fill in nil pointers for each shard; nil represent failed balance fetch.
result = append(result, nil)
}
var wg sync.WaitGroup
wg.Add(walletProfile.Shards)
for shardID := 0; shardID < walletProfile.Shards; shardID++ {
go func(shardID int) {
defer wg.Done()
balance := big.NewInt(0)
var nonce uint64
result[uint32(shardID)] = &AccountState{balance, 0}
var wgShard sync.WaitGroup
wgShard.Add(len(walletProfile.RPCServer[shardID]))
var mutexAccountState = &sync.Mutex{}
for rpcServerID := 0; rpcServerID < len(walletProfile.RPCServer[shardID]); rpcServerID++ {
go func(rpcServerID int) {
for retry := 0; retry < rpcRetry; retry++ {
server := walletProfile.RPCServer[shardID][rpcServerID]
client, err := clientService.NewClient(server.IP, server.Port)
if err != nil {
continue
}
log.Debug("FetchBalance", "server", server)
response, err := client.GetBalance(address)
if err != nil {
log.Info("failed to get balance, retrying ...")
time.Sleep(200 * time.Millisecond)
continue
}
log.Debug("FetchBalance", "response", response)
respBalance := big.NewInt(0)
respBalance.SetBytes(response.Balance)
mutexAccountState.Lock()
if balance.Cmp(respBalance) < 0 {
balance.SetBytes(response.Balance)
nonce = response.Nonce
}
mutexAccountState.Unlock()
break
}
wgShard.Done()
}(rpcServerID)
}
wgShard.Wait()
result[shardID] = &AccountState{balance, nonce}
}(shardID)
}
wg.Wait()
return result
}
// GetFreeToken requests for token test token on each shard
func GetFreeToken(address common.Address) {
for i := 0; i < walletProfile.Shards; i++ {
// use the 1st server (leader) to make the getFreeToken call
server := walletProfile.RPCServer[i][0]
client, err := clientService.NewClient(server.IP, server.Port)
if err != nil {
continue
}
log.Debug("GetFreeToken", "server", server)
for retry := 0; retry < rpcRetry; retry++ {
response, err := client.GetFreeToken(address)
if err != nil {
log.Info("failed to get free token, retrying ...")
time.Sleep(200 * time.Millisecond)
continue
}
log.Debug("GetFreeToken", "response", response)
txID := common.Hash{}
txID.SetBytes(response.TxId)
fmt.Printf("Transaction Id requesting free token in shard %d: %s\n", i, txID.Hex())
break
}
}
}
// clearKeystore deletes all data in the local keystore
func clearKeystore() {
dir, err := ioutil.ReadDir(keystoreDir)
if err != nil {
panic("Failed to read keystore directory")
}
for _, d := range dir {
subdir := path.Join([]string{keystoreDir, d.Name()}...)
if err := os.RemoveAll(subdir); err != nil {
fmt.Println(ctxerror.New("cannot remove directory",
"path", subdir).WithCause(err))
}
}
fmt.Println("All existing accounts deleted...")
}
// submitTransaction submits the transaction to the Harmony network
func submitTransaction(tx *types.Transaction, walletNode *node.Node, shardID uint32) error {
msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
clientGroup := p2p.NewClientGroupIDByShardID(p2p.ShardID(shardID))
err := walletNode.GetHost().SendMessageToGroups([]p2p.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
if err != nil {
fmt.Printf("Error in SubmitTransaction: %v\n", err)
return err
}
fmt.Printf("Transaction Id for shard %d: %s\n", int(shardID), tx.Hash().Hex())
// FIXME (leo): how to we know the tx was successful sent to the network
// this is a hacky way to wait for sometime
time.Sleep(3 * time.Second)
return nil
}

@ -505,10 +505,8 @@ func main() {
currentNode.ServiceManagerSetup() currentNode.ServiceManagerSetup()
// RPC for SDK not supported for mainnet. // RPC for SDK not supported for mainnet.
if *networkType != nodeconfig.Mainnet { if err := currentNode.StartRPC(*port); err != nil {
if err := currentNode.StartRPC(*port); err != nil { ctxerror.Warn(utils.GetLogger(), err, "StartRPC failed")
ctxerror.Warn(utils.GetLogger(), err, "StartRPC failed")
}
} }
currentNode.RunServices() currentNode.RunServices()
currentNode.StartServer() currentNode.StartServer()

@ -19,6 +19,7 @@ package types
import ( import (
"container/heap" "container/heap"
"errors" "errors"
"fmt"
"io" "io"
"math/big" "math/big"
"sync/atomic" "sync/atomic"
@ -27,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
common2 "github.com/harmony-one/harmony/internal/common"
) )
// no go:generate gencodec -type txdata -field-override txdataMarshaling -out gen_tx_json.go // no go:generate gencodec -type txdata -field-override txdataMarshaling -out gen_tx_json.go
@ -485,3 +487,30 @@ func (m Message) Data() []byte {
func (m Message) CheckNonce() bool { func (m Message) CheckNonce() bool {
return m.checkNonce return m.checkNonce
} }
// RecentTxsStats is a recent transactions stats map tracking stats like BlockTxsCounts.
type RecentTxsStats map[uint64]BlockTxsCounts
// String returns the string formatted representation of RecentTxsStats
func (rts RecentTxsStats) String() string {
ret := "{ "
for blockNum, blockTxsCounts := range rts {
ret += fmt.Sprintf("blockNum:%d=%s", blockNum, blockTxsCounts.String())
}
ret += " }"
return ret
}
// BlockTxsCounts is a transactions counts map of
// the number of transactions made by each account in a block on this node.
type BlockTxsCounts map[common.Address]uint64
// String returns the string formatted representation of BlockTxsCounts
func (btc BlockTxsCounts) String() string {
ret := "{ "
for sender, numTxs := range btc {
ret += fmt.Sprintf("%s:%d,", common2.MustAddressToBech32(sender), numTxs)
}
ret += " }"
return ret
}

@ -2,6 +2,9 @@ package shardingconfig
import ( import (
"math/big" "math/big"
"time"
"github.com/harmony-one/harmony/common/denominations"
) )
const ( const (
@ -32,6 +35,38 @@ func (s fixedSchedule) IsLastBlock(blockNum uint64) bool {
return blockNum%blocks == blocks-1 return blockNum%blocks == blocks-1
} }
func (s fixedSchedule) MaxTxAmountLimit() *big.Int {
amountBigInt := big.NewInt(mainnetMaxTxAmountLimit)
amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One))
return amountBigInt
}
func (s fixedSchedule) MaxNumRecentTxsPerAccountLimit() uint64 {
return mainnetMaxNumRecentTxsPerAccountLimit
}
func (s fixedSchedule) MaxTxPoolSizeLimit() int {
return mainnetMaxTxPoolSizeLimit
}
func (s fixedSchedule) MaxNumTxsPerBlockLimit() int {
return mainnetMaxNumTxsPerBlockLimit
}
func (s fixedSchedule) RecentTxDuration() time.Duration {
return mainnetRecentTxDuration
}
func (s fixedSchedule) TxsThrottleConfig() *TxsThrottleConfig {
return &TxsThrottleConfig{
MaxTxAmountLimit: s.MaxTxAmountLimit(),
MaxNumRecentTxsPerAccountLimit: s.MaxNumRecentTxsPerAccountLimit(),
MaxTxPoolSizeLimit: s.MaxTxPoolSizeLimit(),
MaxNumTxsPerBlockLimit: s.MaxNumTxsPerBlockLimit(),
RecentTxDuration: s.RecentTxDuration(),
}
}
// NewFixedSchedule returns a sharding configuration schedule that uses the // NewFixedSchedule returns a sharding configuration schedule that uses the
// given config instance for all epochs. Useful for testing. // given config instance for all epochs. Useful for testing.
func NewFixedSchedule(instance Instance) Schedule { func NewFixedSchedule(instance Instance) Schedule {

@ -2,7 +2,9 @@ package shardingconfig
import ( import (
"math/big" "math/big"
"time"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/internal/genesis" "github.com/harmony-one/harmony/internal/genesis"
) )
@ -18,6 +20,12 @@ const (
localnetEpochBlock1 = 20 localnetEpochBlock1 = 20
twoOne = 5 twoOne = 5
localnetMaxTxAmountLimit = 1e2 // unit is in One
localnetMaxNumRecentTxsPerAccountLimit = 2
localnetMaxTxPoolSizeLimit = 8000
localnetMaxNumTxsPerBlockLimit = 1000
localnetRecentTxDuration = 10 * time.Second
) )
func (localnetSchedule) InstanceForEpoch(epoch *big.Int) Instance { func (localnetSchedule) InstanceForEpoch(epoch *big.Int) Instance {
@ -31,7 +39,7 @@ func (localnetSchedule) InstanceForEpoch(epoch *big.Int) Instance {
} }
} }
func (localnetSchedule) BlocksPerEpoch() uint64 { func (ls localnetSchedule) BlocksPerEpoch() uint64 {
return twoOne return twoOne
} }
@ -57,6 +65,38 @@ func (ls localnetSchedule) IsLastBlock(blockNum uint64) bool {
} }
} }
func (ls localnetSchedule) MaxTxAmountLimit() *big.Int {
amountBigInt := big.NewInt(localnetMaxTxAmountLimit)
amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One))
return amountBigInt
}
func (ls localnetSchedule) MaxNumRecentTxsPerAccountLimit() uint64 {
return localnetMaxNumRecentTxsPerAccountLimit
}
func (ls localnetSchedule) MaxTxPoolSizeLimit() int {
return localnetMaxTxPoolSizeLimit
}
func (ls localnetSchedule) MaxNumTxsPerBlockLimit() int {
return localnetMaxNumTxsPerBlockLimit
}
func (ls localnetSchedule) RecentTxDuration() time.Duration {
return localnetRecentTxDuration
}
func (ls localnetSchedule) TxsThrottleConfig() *TxsThrottleConfig {
return &TxsThrottleConfig{
MaxTxAmountLimit: ls.MaxTxAmountLimit(),
MaxNumRecentTxsPerAccountLimit: ls.MaxNumRecentTxsPerAccountLimit(),
MaxTxPoolSizeLimit: ls.MaxTxPoolSizeLimit(),
MaxNumTxsPerBlockLimit: ls.MaxNumTxsPerBlockLimit(),
RecentTxDuration: ls.RecentTxDuration(),
}
}
var localnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(localnetV1Epoch), big.NewInt(localnetV2Epoch)} var localnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(localnetV1Epoch), big.NewInt(localnetV2Epoch)}
var localnetV0 = MustNewInstance(2, 7, 5, genesis.LocalHarmonyAccounts, genesis.LocalFnAccounts, localnetReshardingEpoch) var localnetV0 = MustNewInstance(2, 7, 5, genesis.LocalHarmonyAccounts, genesis.LocalFnAccounts, localnetReshardingEpoch)

@ -2,7 +2,9 @@ package shardingconfig
import ( import (
"math/big" "math/big"
"time"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/internal/genesis" "github.com/harmony-one/harmony/internal/genesis"
) )
@ -13,6 +15,12 @@ const (
mainnetV0_2Epoch = 5 mainnetV0_2Epoch = 5
mainnetV0_3Epoch = 8 mainnetV0_3Epoch = 8
mainnetV0_4Epoch = 10 mainnetV0_4Epoch = 10
mainnetMaxTxAmountLimit = 1e3 // unit is interface{} One
mainnetMaxNumRecentTxsPerAccountLimit = 1e2
mainnetMaxTxPoolSizeLimit = 8000
mainnetMaxNumTxsPerBlockLimit = 1000
mainnetRecentTxDuration = time.Hour
) )
// MainnetSchedule is the mainnet sharding configuration schedule. // MainnetSchedule is the mainnet sharding configuration schedule.
@ -65,7 +73,40 @@ func (ms mainnetSchedule) IsLastBlock(blockNum uint64) bool {
} }
} }
func (ms mainnetSchedule) MaxTxAmountLimit() *big.Int {
amountBigInt := big.NewInt(mainnetMaxTxAmountLimit)
amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One))
return amountBigInt
}
func (ms mainnetSchedule) MaxNumRecentTxsPerAccountLimit() uint64 {
return mainnetMaxNumRecentTxsPerAccountLimit
}
func (ms mainnetSchedule) MaxTxPoolSizeLimit() int {
return mainnetMaxTxPoolSizeLimit
}
func (ms mainnetSchedule) MaxNumTxsPerBlockLimit() int {
return mainnetMaxNumTxsPerBlockLimit
}
func (ms mainnetSchedule) RecentTxDuration() time.Duration {
return mainnetRecentTxDuration
}
func (ms mainnetSchedule) TxsThrottleConfig() *TxsThrottleConfig {
return &TxsThrottleConfig{
MaxTxAmountLimit: ms.MaxTxAmountLimit(),
MaxNumRecentTxsPerAccountLimit: ms.MaxNumRecentTxsPerAccountLimit(),
MaxTxPoolSizeLimit: ms.MaxTxPoolSizeLimit(),
MaxNumTxsPerBlockLimit: ms.MaxNumTxsPerBlockLimit(),
RecentTxDuration: ms.RecentTxDuration(),
}
}
var mainnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(mainnetV0_1Epoch), big.NewInt(mainnetV0_2Epoch), big.NewInt(mainnetV0_3Epoch), big.NewInt(mainnetV0_4Epoch)} var mainnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(mainnetV0_1Epoch), big.NewInt(mainnetV0_2Epoch), big.NewInt(mainnetV0_3Epoch), big.NewInt(mainnetV0_4Epoch)}
var mainnetV0 = MustNewInstance(4, 150, 112, genesis.HarmonyAccounts, genesis.FoundationalNodeAccounts, mainnetReshardingEpoch) var mainnetV0 = MustNewInstance(4, 150, 112, genesis.HarmonyAccounts, genesis.FoundationalNodeAccounts, mainnetReshardingEpoch)
var mainnetV0_1 = MustNewInstance(4, 152, 112, genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV0_1, mainnetReshardingEpoch) var mainnetV0_1 = MustNewInstance(4, 152, 112, genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV0_1, mainnetReshardingEpoch)
var mainnetV0_2 = MustNewInstance(4, 200, 148, genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV0_2, mainnetReshardingEpoch) var mainnetV0_2 = MustNewInstance(4, 200, 148, genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV0_2, mainnetReshardingEpoch)

@ -4,6 +4,7 @@ package shardingconfig
import ( import (
"math/big" "math/big"
"time"
"github.com/harmony-one/harmony/internal/genesis" "github.com/harmony-one/harmony/internal/genesis"
) )
@ -21,6 +22,24 @@ type Schedule interface {
// IsLastBlock check if the block is the last block in the epoch // IsLastBlock check if the block is the last block in the epoch
IsLastBlock(blockNum uint64) bool IsLastBlock(blockNum uint64) bool
// Max amount limit for a valid transaction
MaxTxAmountLimit() *big.Int
// Max number of transactions of a particular account per block level
MaxNumRecentTxsPerAccountLimit() uint64
// Max total number of transactions allowed as pending transactions in transaction pool
MaxTxPoolSizeLimit() int
// Max total number of transactions allowed to be processed per block
MaxNumTxsPerBlockLimit() int
// How long "recent" means for transaction in time Duration unit
RecentTxDuration() time.Duration
// configuration for throttling pending transactions
TxsThrottleConfig() *TxsThrottleConfig
} }
// Instance is one sharding configuration instance. // Instance is one sharding configuration instance.
@ -47,3 +66,45 @@ type Instance interface {
// ReshardingEpoch returns a list of Epoch while off-chain resharding happens // ReshardingEpoch returns a list of Epoch while off-chain resharding happens
ReshardingEpoch() []*big.Int ReshardingEpoch() []*big.Int
} }
// TxThrottleFlag is the throttling flag for each transaction
// Refer below enum declaration for more context.
type TxThrottleFlag int
// TxThrottleFlag is determined per transaction
// during the new block proposal and pending transactions throttling
const (
TxSelect TxThrottleFlag = iota
TxUnselect
TxInvalid
)
func (result TxThrottleFlag) String() string {
switch result {
case TxSelect:
return "TxSelect"
case TxUnselect:
return "TxUnselect"
case TxInvalid:
return "TxInvalid"
}
return "TxThrottleUnknown"
}
// TxsThrottleConfig contains configuration for throttling pending transactions per node block
type TxsThrottleConfig struct {
// Max amount limit for a valid transaction
MaxTxAmountLimit *big.Int
// Max number of transactions of a particular account for the past hour
RecentTxDuration time.Duration
// Max number of transactions of a particular account for the past hour
MaxNumRecentTxsPerAccountLimit uint64
// Max total number of transactions allowed as pending transactions in transaction pool
MaxTxPoolSizeLimit int
// Max total number of transactions allowed to be processed per block
MaxNumTxsPerBlockLimit int
}

@ -2,7 +2,9 @@ package shardingconfig
import ( import (
"math/big" "math/big"
"time"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/internal/genesis" "github.com/harmony-one/harmony/internal/genesis"
) )
@ -18,6 +20,12 @@ const (
testnetEpochBlock1 = 78 testnetEpochBlock1 = 78
threeOne = 111 threeOne = 111
testnetMaxTxAmountLimit = 1e3 // unit is in One
testnetMaxNumRecentTxsPerAccountLimit = 1e2
testnetMaxTxPoolSizeLimit = 8000
testnetMaxNumTxsPerBlockLimit = 1000
testnetRecentTxDuration = time.Hour
) )
func (testnetSchedule) InstanceForEpoch(epoch *big.Int) Instance { func (testnetSchedule) InstanceForEpoch(epoch *big.Int) Instance {
@ -58,6 +66,38 @@ func (ts testnetSchedule) IsLastBlock(blockNum uint64) bool {
} }
} }
func (ts testnetSchedule) MaxTxAmountLimit() *big.Int {
amountBigInt := big.NewInt(testnetMaxTxAmountLimit)
amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One))
return amountBigInt
}
func (ts testnetSchedule) MaxNumRecentTxsPerAccountLimit() uint64 {
return testnetMaxNumRecentTxsPerAccountLimit
}
func (ts testnetSchedule) MaxTxPoolSizeLimit() int {
return testnetMaxTxPoolSizeLimit
}
func (ts testnetSchedule) MaxNumTxsPerBlockLimit() int {
return testnetMaxNumTxsPerBlockLimit
}
func (ts testnetSchedule) RecentTxDuration() time.Duration {
return testnetRecentTxDuration
}
func (ts testnetSchedule) TxsThrottleConfig() *TxsThrottleConfig {
return &TxsThrottleConfig{
MaxTxAmountLimit: ts.MaxTxAmountLimit(),
MaxNumRecentTxsPerAccountLimit: ts.MaxNumRecentTxsPerAccountLimit(),
MaxTxPoolSizeLimit: ts.MaxTxPoolSizeLimit(),
MaxNumTxsPerBlockLimit: ts.MaxNumTxsPerBlockLimit(),
RecentTxDuration: ts.RecentTxDuration(),
}
}
var testnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(testnetV1Epoch), big.NewInt(testnetV2Epoch)} var testnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(testnetV1Epoch), big.NewInt(testnetV2Epoch)}
var testnetV0 = MustNewInstance(2, 150, 150, genesis.TNHarmonyAccounts, genesis.TNFoundationalAccounts, testnetReshardingEpoch) var testnetV0 = MustNewInstance(2, 150, 150, genesis.TNHarmonyAccounts, genesis.TNFoundationalAccounts, testnetReshardingEpoch)

@ -90,9 +90,8 @@ type Node struct {
BlockChannel chan *types.Block // The channel to send newly proposed blocks BlockChannel chan *types.Block // The channel to send newly proposed blocks
ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks
BeaconBlockChannel chan *types.Block // The channel to send beacon blocks for non-beaconchain nodes BeaconBlockChannel chan *types.Block // The channel to send beacon blocks for non-beaconchain nodes
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex DRand *drand.DRand // The instance for distributed randomness protocol
DRand *drand.DRand // The instance for distributed randomness protocol
// Shard databases // Shard databases
shardChains shardchain.Collection shardChains shardchain.Collection
@ -111,7 +110,12 @@ type Node struct {
// BeaconNeighbors store only neighbor nodes in the beacon chain shard // BeaconNeighbors store only neighbor nodes in the beacon chain shard
BeaconNeighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer BeaconNeighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
TxPool *core.TxPool TxPool *core.TxPool // TODO migrate to TxPool from pendingTransactions list below
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex
recentTxsStats types.RecentTxsStats
Worker *worker.Worker Worker *worker.Worker
BeaconWorker *worker.Worker // worker for beacon chain BeaconWorker *worker.Worker // worker for beacon chain
@ -225,46 +229,61 @@ func (node *Node) Beaconchain() *core.BlockChain {
} }
func (node *Node) reducePendingTransactions() { func (node *Node) reducePendingTransactions() {
txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
curLen := len(node.pendingTransactions)
// If length of pendingTransactions is greater than TxPoolLimit then by greedy take the TxPoolLimit recent transactions. // If length of pendingTransactions is greater than TxPoolLimit then by greedy take the TxPoolLimit recent transactions.
if len(node.pendingTransactions) > TxPoolLimit+TxPoolLimit { if curLen > txPoolLimit+txPoolLimit {
curLen := len(node.pendingTransactions) node.pendingTransactions = append(types.Transactions(nil), node.pendingTransactions[curLen-txPoolLimit:]...)
node.pendingTransactions = append(types.Transactions(nil), node.pendingTransactions[curLen-TxPoolLimit:]...)
utils.GetLogger().Info("mem stat reduce pending transaction") utils.GetLogger().Info("mem stat reduce pending transaction")
} }
} }
// Add new transactions to the pending transaction list. // Add new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) { func (node *Node) addPendingTransactions(newTxs types.Transactions) {
if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet { node.pendingTxMutex.Lock()
node.pendingTxMutex.Lock() node.pendingTransactions = append(node.pendingTransactions, newTxs...)
node.pendingTransactions = append(node.pendingTransactions, newTxs...) node.reducePendingTransactions()
node.reducePendingTransactions() node.pendingTxMutex.Unlock()
node.pendingTxMutex.Unlock() utils.GetLogInstance().Info("Got more transactions", "num", len(newTxs), "totalPending", len(node.pendingTransactions))
utils.GetLogInstance().Info("Got more transactions", "num", len(newTxs), "totalPending", len(node.pendingTransactions))
}
} }
// AddPendingTransaction adds one new transaction to the pending transaction list. // AddPendingTransaction adds one new transaction to the pending transaction list.
func (node *Node) AddPendingTransaction(newTx *types.Transaction) { func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet { node.addPendingTransactions(types.Transactions{newTx})
node.addPendingTransactions(types.Transactions{newTx}) utils.GetLogInstance().Debug("Got ONE more transaction", "totalPending", len(node.pendingTransactions))
utils.GetLogInstance().Debug("Got ONE more transaction", "totalPending", len(node.pendingTransactions))
}
} }
// Take out a subset of valid transactions from the pending transaction list // Take out a subset of valid transactions from the pending transaction list
// Note the pending transaction list will then contain the rest of the txs // Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlock(maxNumTxs int, coinbase common.Address) types.Transactions { func (node *Node) getTransactionsForNewBlock(coinbase common.Address) types.Transactions {
if node.NodeConfig.GetNetworkType() == nodeconfig.Mainnet {
return types.Transactions{}
}
node.pendingTxMutex.Lock() node.pendingTxMutex.Lock()
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(node.pendingTransactions, maxNumTxs, coinbase)
txsThrottleConfig := core.ShardingSchedule.TxsThrottleConfig()
// the next block number to be added in consensus protocol, which is always one more than current chain header block
newBlockNum := node.Blockchain().CurrentBlock().NumberU64() + 1
// remove old (> txsThrottleConfigRecentTxDuration) blockNum keys from recentTxsStats and initiailize for the new block
for blockNum := range node.recentTxsStats {
recentTxsBlockNumGap := uint64(txsThrottleConfig.RecentTxDuration / node.BlockPeriod)
if recentTxsBlockNumGap < newBlockNum-blockNum {
delete(node.recentTxsStats, blockNum)
}
}
node.recentTxsStats[newBlockNum] = make(types.BlockTxsCounts)
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
node.pendingTransactions = unselected node.pendingTransactions = unselected
node.reducePendingTransactions() node.reducePendingTransactions()
utils.GetLogInstance().Debug("Selecting Transactions", "remainPending", len(node.pendingTransactions), "selected", len(selected), "invalidDiscarded", len(invalid))
utils.GetLogInstance().Info("Selecting Transactions",
"newBlockNum", newBlockNum,
"remainPending", len(node.pendingTransactions),
"invalidDiscarded", len(invalid))
node.pendingTxMutex.Unlock() node.pendingTxMutex.Unlock()
return selected return selected
} }
@ -330,6 +349,8 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
node.BlockChannel = make(chan *types.Block) node.BlockChannel = make(chan *types.Block)
node.ConfirmedBlockChannel = make(chan *types.Block) node.ConfirmedBlockChannel = make(chan *types.Block)
node.BeaconBlockChannel = make(chan *types.Block) node.BeaconBlockChannel = make(chan *types.Block)
node.recentTxsStats = make(types.RecentTxsStats)
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), chain) node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), chain)
node.Worker = worker.New(node.Blockchain().Config(), chain, node.Consensus, node.Consensus.ShardID) node.Worker = worker.New(node.Blockchain().Config(), chain, node.Consensus, node.Consensus.ShardID)
@ -491,17 +512,17 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
var err error var err error
node.shardGroupReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetShardGroupID()) node.shardGroupReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetShardGroupID())
if err != nil { if err != nil {
utils.GetLogInstance().Error("Failed to create shard receiver", "msg", err) utils.GetLogInstance().Error("Failed to create shard receiver", "err", err)
} }
node.globalGroupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient) node.globalGroupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient)
if err != nil { if err != nil {
utils.GetLogInstance().Error("Failed to create global receiver", "msg", err) utils.GetLogInstance().Error("Failed to create global receiver", "err", err)
} }
node.clientReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetClientGroupID()) node.clientReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetClientGroupID())
if err != nil { if err != nil {
utils.GetLogInstance().Error("Failed to create client receiver", "msg", err) utils.GetLogInstance().Error("Failed to create client receiver", "err", err)
} }
return nodeConfig, chanPeer return nodeConfig, chanPeer
} }

@ -40,9 +40,7 @@ import (
) )
const ( const (
// MaxNumberOfTransactionsPerBlock is the max number of transaction per a block. consensusTimeout = 30 * time.Second
MaxNumberOfTransactionsPerBlock = 8000
consensusTimeout = 30 * time.Second
) )
// ReceiveGlobalMessage use libp2p pubsub mechanism to receive global broadcast messages // ReceiveGlobalMessage use libp2p pubsub mechanism to receive global broadcast messages

@ -30,7 +30,7 @@ func TestAddNewBlock(t *testing.T) {
nodeconfig.GetShardConfig(0).SetNetworkType(nodeconfig.Devnet) nodeconfig.GetShardConfig(0).SetNetworkType(nodeconfig.Devnet)
node := New(host, consensus, testDBFactory, false) node := New(host, consensus, testDBFactory, false)
selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock, common.Address{}) selectedTxs := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, common.Address{}) node.Worker.CommitTransactions(selectedTxs, common.Address{})
block, _ := node.Worker.Commit([]byte{}, []byte{}, 0, common.Address{}) block, _ := node.Worker.Commit([]byte{}, []byte{}, 0, common.Address{})
@ -59,7 +59,7 @@ func TestVerifyNewBlock(t *testing.T) {
} }
node := New(host, consensus, testDBFactory, false) node := New(host, consensus, testDBFactory, false)
selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock, common.Address{}) selectedTxs := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, common.Address{}) node.Worker.CommitTransactions(selectedTxs, common.Address{})
block, _ := node.Worker.Commit([]byte{}, []byte{}, 0, common.Address{}) block, _ := node.Worker.Commit([]byte{}, []byte{}, 0, common.Address{})

@ -8,7 +8,6 @@ import (
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
@ -48,12 +47,9 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
coinbase := node.Consensus.SelfAddress coinbase := node.Consensus.SelfAddress
// Normal tx block consensus // Normal tx block consensus
selectedTxs := types.Transactions{} // Empty transaction list selectedTxs := node.getTransactionsForNewBlock(coinbase)
if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet { if err := node.Worker.UpdateCurrent(coinbase); err != nil {
selectedTxs = node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock, coinbase) utils.GetLogger().Error("Failed updating worker's state", "Error", err)
if err := node.Worker.UpdateCurrent(coinbase); err != nil {
utils.GetLogger().Error("Failed updating worker's state", "Error", err)
}
} }
utils.GetLogInstance().Info("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "selectedTxs", len(selectedTxs)) utils.GetLogInstance().Info("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "selectedTxs", len(selectedTxs))
if err := node.Worker.CommitTransactions(selectedTxs, coinbase); err != nil { if err := node.Worker.CommitTransactions(selectedTxs, coinbase); err != nil {
@ -71,13 +67,6 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
viewID := node.Consensus.GetViewID() viewID := node.Consensus.GetViewID()
// add aggregated commit signatures from last block, except for the first two blocks // add aggregated commit signatures from last block, except for the first two blocks
if node.NodeConfig.GetNetworkType() == nodeconfig.Mainnet {
if err = node.Worker.UpdateCurrent(coinbase); err != nil {
utils.GetLogger().Debug("Failed updating worker's state", "Error", err)
continue
}
}
newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase) newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase)
if err != nil { if err != nil {

@ -38,7 +38,7 @@ func TestUpdateStakingList(t *testing.T) {
node := New(host, consensus, testDBFactory, false) node := New(host, consensus, testDBFactory, false)
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock, common.Address{}) selectedTxs := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, common.Address{}) node.Worker.CommitTransactions(selectedTxs, common.Address{})
block, _ := node.Worker.Commit([]byte{}, []byte{}, 0, common.Address{}) block, _ := node.Worker.Commit([]byte{}, []byte{}, 0, common.Address{})

@ -12,6 +12,8 @@ import (
"github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm" "github.com/harmony-one/harmony/core/vm"
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
"github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
@ -41,32 +43,109 @@ type Worker struct {
shardID uint32 shardID uint32
} }
// Returns a tuple where the first value is the txs sender account address,
// the second is the throttling result enum for the transaction of interest.
// Throttling happens based on the amount, frequency, etc.
func (w *Worker) throttleTxs(selected types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, tx *types.Transaction) (common.Address, shardingconfig.TxThrottleFlag) {
var sender common.Address
msg, err := tx.AsMessage(types.MakeSigner(w.config, w.chain.CurrentBlock().Number()))
if err != nil {
utils.GetLogInstance().Error("Error when parsing tx into message",
"tx Id", tx.Hash().Hex(), "err", err)
} else {
sender = msg.From()
}
// already selected max num txs
if len(selected) > txsThrottleConfig.MaxNumTxsPerBlockLimit {
utils.GetLogInstance().Info("Throttling tx with max num txs per block limit",
"tx Id", tx.Hash().Hex(),
"MaxNumTxsPerBlockLimit", txsThrottleConfig.MaxNumTxsPerBlockLimit)
return sender, shardingconfig.TxUnselect
}
// throttle a single sender sending too many transactions in one block
if tx.Value().Cmp(txsThrottleConfig.MaxTxAmountLimit) > 0 {
utils.GetLogInstance().Info("Throttling tx with max amount limit",
"tx Id", tx.Hash().Hex(),
"MaxTxAmountLimit", txsThrottleConfig.MaxTxAmountLimit.Uint64(),
"Tx amount", tx.Value())
return sender, shardingconfig.TxInvalid
}
// throttle too large transaction
var numTxsPastHour uint64
for _, blockTxsCounts := range recentTxsStats {
numTxsPastHour += blockTxsCounts[sender]
}
if numTxsPastHour >= txsThrottleConfig.MaxNumRecentTxsPerAccountLimit {
utils.GetLogInstance().Info("Throttling tx with max txs per account in a single block limit",
"tx Id", tx.Hash().Hex(),
"MaxNumRecentTxsPerAccountLimit", txsThrottleConfig.MaxNumRecentTxsPerAccountLimit)
return sender, shardingconfig.TxInvalid
}
return sender, shardingconfig.TxSelect
}
// SelectTransactionsForNewBlock selects transactions for new block. // SelectTransactionsForNewBlock selects transactions for new block.
func (w *Worker) SelectTransactionsForNewBlock(txs types.Transactions, maxNumTxs int, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions) { func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions) {
if w.current.gasPool == nil { if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit) w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
} }
selected := types.Transactions{} selected := types.Transactions{}
unselected := types.Transactions{} unselected := types.Transactions{}
invalid := types.Transactions{} invalid := types.Transactions{}
for _, tx := range txs { for _, tx := range txs {
if tx.ShardID() != w.shardID { if tx.ShardID() != w.shardID {
invalid = append(invalid, tx) invalid = append(invalid, tx)
continue
} }
snap := w.current.state.Snapshot()
_, err := w.commitTransaction(tx, coinbase) sender, flag := w.throttleTxs(selected, recentTxsStats, txsThrottleConfig, tx)
if len(selected) > maxNumTxs { switch flag {
case shardingconfig.TxUnselect:
unselected = append(unselected, tx) unselected = append(unselected, tx)
} else {
case shardingconfig.TxInvalid:
invalid = append(invalid, tx)
case shardingconfig.TxSelect:
snap := w.current.state.Snapshot()
_, err := w.commitTransaction(tx, coinbase)
if err != nil { if err != nil {
w.current.state.RevertToSnapshot(snap) w.current.state.RevertToSnapshot(snap)
invalid = append(invalid, tx) invalid = append(invalid, tx)
utils.GetLogger().Debug("Invalid transaction", "Error", err) utils.GetLogInstance().Error("Commit transaction error",
"Transaction Id", tx.Hash().Hex(),
"err", err)
} else { } else {
selected = append(selected, tx) selected = append(selected, tx)
// handle the case when msg was not able to extracted from tx
if len(sender.String()) > 0 {
recentTxsStats[newBlockNum][sender]++
}
} }
} }
// log invalid or unselected txs
if flag == shardingconfig.TxUnselect || flag == shardingconfig.TxInvalid {
utils.GetLogInstance().Info("Transaction Throttle flag",
"Transaction Id", tx.Hash().Hex(),
"txThrottleFlag", flag.String())
}
utils.GetLogInstance().Info("Transaction gas limit info",
"Transaction Id", tx.Hash().Hex(),
"tx gas limit", tx.Gas())
} }
utils.GetLogInstance().Info("Block gas limit and usage info",
"newBlockNum", newBlockNum,
"block gas limit", w.current.header.GasLimit,
"block gas used", w.current.header.GasUsed)
return selected, unselected, invalid return selected, unselected, invalid
} }

@ -7,6 +7,7 @@ SRC[harmony]=cmd/harmony/main.go
SRC[txgen]=cmd/client/txgen/main.go SRC[txgen]=cmd/client/txgen/main.go
SRC[bootnode]=cmd/bootnode/main.go SRC[bootnode]=cmd/bootnode/main.go
SRC[wallet]="cmd/client/wallet/main.go cmd/client/wallet/generated_wallet.ini.go" SRC[wallet]="cmd/client/wallet/main.go cmd/client/wallet/generated_wallet.ini.go"
SRC[wallet_stress_test]="cmd/client/wallet_stress_test/main.go cmd/client/wallet_stress_test/generated_wallet.ini.go"
BINDIR=bin BINDIR=bin
BUCKET=unique-bucket-bin BUCKET=unique-bucket-bin

Loading…
Cancel
Save