Merge pull request #1806 from rlan35/staking_tx

Use TxPool for worker's transaction proposal
pull/1851/head
Rongjian Lan 5 years ago committed by GitHub
commit 47890d1656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      core/blockchain.go
  2. 13
      core/tx_pool.go
  3. 40
      internal/configs/sharding/fixedschedule.go
  4. 46
      internal/configs/sharding/localnet.go
  5. 46
      internal/configs/sharding/mainnet.go
  6. 42
      internal/configs/sharding/pangaea.go
  7. 67
      internal/configs/sharding/shardingconfig.go
  8. 46
      internal/configs/sharding/testnet.go
  9. 87
      node/node.go
  10. 13
      node/node_handler_test.go
  11. 17
      node/node_newblock.go
  12. 234
      node/worker/worker.go
  13. 6
      node/worker/worker_test.go
  14. 149
      test/chain/main.go

@ -1145,8 +1145,8 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
// After insertion is done, all accumulated events will be fired.
func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) {
n, events, logs, err := bc.insertChain(chain, verifyHeaders)
bc.PostChainEvents(events, logs)
if err == nil {
// TODO: incorporate these into insertChain
for idx, block := range chain {
header := block.Header()
header.Logger(utils.Logger()).Info().
@ -1187,6 +1187,8 @@ func (bc *BlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int,
}
}
// This should be done after everything about adding a block is done.
bc.PostChainEvents(events, logs)
return n, err
}

@ -292,7 +292,7 @@ func (pool *TxPool) loop() {
defer journal.Stop()
// Track the previous head headers for transaction reorgs
//head := pool.chain.CurrentBlock()
head := pool.chain.CurrentBlock()
// Keep waiting for and reacting to the various events
for {
@ -301,12 +301,11 @@ func (pool *TxPool) loop() {
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
pool.mu.Lock()
//if pool.chainconfig.IsHomestead(ev.Block.Number()) {
// pool.homestead = true
//}
//pool.reset(head.Header(), ev.Block.Header())
//head = ev.Block
if pool.chainconfig.IsS3(ev.Block.Epoch()) {
pool.homestead = true
}
pool.reset(head.Header(), ev.Block.Header())
head = ev.Block
pool.mu.Unlock()
}
// Be unsubscribed due to system stopped

@ -2,9 +2,6 @@ package shardingconfig
import (
"math/big"
"time"
"github.com/harmony-one/harmony/common/denominations"
)
const (
@ -55,43 +52,6 @@ func (s fixedSchedule) RandomnessStartingEpoch() uint64 {
return mainnetRandomnessStartingEpoch
}
func (s fixedSchedule) MaxTxAmountLimit() *big.Int {
amountBigInt := big.NewInt(mainnetMaxTxAmountLimit)
amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One))
return amountBigInt
}
func (s fixedSchedule) MaxNumRecentTxsPerAccountLimit() uint64 {
return mainnetMaxNumRecentTxsPerAccountLimit
}
func (s fixedSchedule) MaxTxPoolSizeLimit() int {
return mainnetMaxTxPoolSizeLimit
}
func (s fixedSchedule) MaxNumTxsPerBlockLimit() int {
return mainnetMaxNumTxsPerBlockLimit
}
func (s fixedSchedule) RecentTxDuration() time.Duration {
return mainnetRecentTxDuration
}
func (s fixedSchedule) EnableTxnThrottling() bool {
return mainnetEnableTxnThrottling
}
func (s fixedSchedule) TxsThrottleConfig() *TxsThrottleConfig {
return &TxsThrottleConfig{
MaxTxAmountLimit: s.MaxTxAmountLimit(),
MaxNumRecentTxsPerAccountLimit: s.MaxNumRecentTxsPerAccountLimit(),
MaxTxPoolSizeLimit: s.MaxTxPoolSizeLimit(),
MaxNumTxsPerBlockLimit: s.MaxNumTxsPerBlockLimit(),
RecentTxDuration: s.RecentTxDuration(),
EnableTxnThrottling: s.EnableTxnThrottling(),
}
}
func (s fixedSchedule) GetNetworkID() NetworkID {
return DevNet
}

@ -3,9 +3,7 @@ package shardingconfig
import (
"fmt"
"math/big"
"time"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/internal/genesis"
)
@ -26,13 +24,6 @@ const (
localnetConsensusRatio = float64(0.1)
localnetRandomnessStartingEpoch = 0
localnetMaxTxAmountLimit = 1e3 // unit is in One
localnetMaxNumRecentTxsPerAccountLimit = 1e2
localnetMaxTxPoolSizeLimit = 8000
localnetMaxNumTxsPerBlockLimit = 1000
localnetRecentTxDuration = time.Hour
localnetEnableTxnThrottling = false
)
func (localnetSchedule) InstanceForEpoch(epoch *big.Int) Instance {
@ -97,43 +88,6 @@ func (ls localnetSchedule) RandomnessStartingEpoch() uint64 {
return localnetRandomnessStartingEpoch
}
func (ls localnetSchedule) MaxTxAmountLimit() *big.Int {
amountBigInt := big.NewInt(localnetMaxTxAmountLimit)
amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One))
return amountBigInt
}
func (ls localnetSchedule) MaxNumRecentTxsPerAccountLimit() uint64 {
return localnetMaxNumRecentTxsPerAccountLimit
}
func (ls localnetSchedule) MaxTxPoolSizeLimit() int {
return localnetMaxTxPoolSizeLimit
}
func (ls localnetSchedule) MaxNumTxsPerBlockLimit() int {
return localnetMaxNumTxsPerBlockLimit
}
func (ls localnetSchedule) RecentTxDuration() time.Duration {
return localnetRecentTxDuration
}
func (ls localnetSchedule) EnableTxnThrottling() bool {
return localnetEnableTxnThrottling
}
func (ls localnetSchedule) TxsThrottleConfig() *TxsThrottleConfig {
return &TxsThrottleConfig{
MaxTxAmountLimit: ls.MaxTxAmountLimit(),
MaxNumRecentTxsPerAccountLimit: ls.MaxNumRecentTxsPerAccountLimit(),
MaxTxPoolSizeLimit: ls.MaxTxPoolSizeLimit(),
MaxNumTxsPerBlockLimit: ls.MaxNumTxsPerBlockLimit(),
RecentTxDuration: ls.RecentTxDuration(),
EnableTxnThrottling: ls.EnableTxnThrottling(),
}
}
func (ls localnetSchedule) GetNetworkID() NetworkID {
return LocalNet
}

@ -2,9 +2,7 @@ package shardingconfig
import (
"math/big"
"time"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/internal/genesis"
)
@ -29,13 +27,6 @@ const (
mainnetV1_4Epoch = 46
mainnetV1_5Epoch = 54
mainnetMaxTxAmountLimit = 1e3 // unit is interface{} One
mainnetMaxNumRecentTxsPerAccountLimit = 1e2
mainnetMaxTxPoolSizeLimit = 8000
mainnetMaxNumTxsPerBlockLimit = 1000
mainnetRecentTxDuration = time.Hour
mainnetEnableTxnThrottling = false
// MainNetHTTPPattern is the http pattern for mainnet.
MainNetHTTPPattern = "https://api.s%d.t.hmny.io"
// MainNetWSPattern is the websocket pattern for mainnet.
@ -135,43 +126,6 @@ func (ms mainnetSchedule) RandomnessStartingEpoch() uint64 {
return mainnetRandomnessStartingEpoch
}
func (ms mainnetSchedule) MaxTxAmountLimit() *big.Int {
amountBigInt := big.NewInt(mainnetMaxTxAmountLimit)
amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One))
return amountBigInt
}
func (ms mainnetSchedule) MaxNumRecentTxsPerAccountLimit() uint64 {
return mainnetMaxNumRecentTxsPerAccountLimit
}
func (ms mainnetSchedule) MaxTxPoolSizeLimit() int {
return mainnetMaxTxPoolSizeLimit
}
func (ms mainnetSchedule) MaxNumTxsPerBlockLimit() int {
return mainnetMaxNumTxsPerBlockLimit
}
func (ms mainnetSchedule) RecentTxDuration() time.Duration {
return mainnetRecentTxDuration
}
func (ms mainnetSchedule) EnableTxnThrottling() bool {
return mainnetEnableTxnThrottling
}
func (ms mainnetSchedule) TxsThrottleConfig() *TxsThrottleConfig {
return &TxsThrottleConfig{
MaxTxAmountLimit: ms.MaxTxAmountLimit(),
MaxNumRecentTxsPerAccountLimit: ms.MaxNumRecentTxsPerAccountLimit(),
MaxTxPoolSizeLimit: ms.MaxTxPoolSizeLimit(),
MaxNumTxsPerBlockLimit: ms.MaxNumTxsPerBlockLimit(),
RecentTxDuration: ms.RecentTxDuration(),
EnableTxnThrottling: ms.EnableTxnThrottling(),
}
}
func (ms mainnetSchedule) GetNetworkID() NetworkID {
return MainNet
}

@ -2,11 +2,8 @@ package shardingconfig
import (
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/internal/genesis"
)
@ -15,8 +12,6 @@ const (
PangaeaHTTPPattern = "https://api.s%d.pga.hmny.io"
// PangaeaWSPattern is the websocket pattern for pangaea.
PangaeaWSPattern = "wss://ws.s%d.pga.hmny.io"
// transaction throttling disabled on pangaea network
pangaeaEnableTxnThrottling = false
)
// PangaeaSchedule is the Pangaea sharding configuration schedule.
@ -64,43 +59,6 @@ func (ps pangaeaSchedule) RandomnessStartingEpoch() uint64 {
return mainnetRandomnessStartingEpoch
}
func (ps pangaeaSchedule) MaxTxAmountLimit() *big.Int {
amountBigInt := big.NewInt(mainnetMaxTxAmountLimit)
amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One))
return amountBigInt
}
func (ps pangaeaSchedule) MaxNumRecentTxsPerAccountLimit() uint64 {
return mainnetMaxNumRecentTxsPerAccountLimit
}
func (ps pangaeaSchedule) MaxTxPoolSizeLimit() int {
return mainnetMaxTxPoolSizeLimit
}
func (ps pangaeaSchedule) MaxNumTxsPerBlockLimit() int {
return mainnetMaxNumTxsPerBlockLimit
}
func (ps pangaeaSchedule) RecentTxDuration() time.Duration {
return mainnetRecentTxDuration
}
func (ps pangaeaSchedule) EnableTxnThrottling() bool {
return pangaeaEnableTxnThrottling
}
func (ps pangaeaSchedule) TxsThrottleConfig() *TxsThrottleConfig {
return &TxsThrottleConfig{
MaxTxAmountLimit: ps.MaxTxAmountLimit(),
MaxNumRecentTxsPerAccountLimit: ps.MaxNumRecentTxsPerAccountLimit(),
MaxTxPoolSizeLimit: ps.MaxTxPoolSizeLimit(),
MaxNumTxsPerBlockLimit: ps.MaxNumTxsPerBlockLimit(),
RecentTxDuration: ps.RecentTxDuration(),
EnableTxnThrottling: ps.EnableTxnThrottling(),
}
}
func (pangaeaSchedule) GetNetworkID() NetworkID {
return Pangaea
}

@ -5,7 +5,6 @@ package shardingconfig
import (
"fmt"
"math/big"
"time"
"github.com/harmony-one/harmony/internal/genesis"
)
@ -37,27 +36,6 @@ type Schedule interface {
//RandomnessStartingEpoch returns starting epoch of randonness generation
RandomnessStartingEpoch() uint64
// Max amount limit for a valid transaction
MaxTxAmountLimit() *big.Int
// Max number of transactions of a particular account per block level
MaxNumRecentTxsPerAccountLimit() uint64
// Max total number of transactions allowed as pending transactions in transaction pool
MaxTxPoolSizeLimit() int
// Max total number of transactions allowed to be processed per block
MaxNumTxsPerBlockLimit() int
// How long "recent" means for transaction in time Duration unit
RecentTxDuration() time.Duration
// EnableTxnThrottling is the switch for transaction throttling
EnableTxnThrottling() bool
// configuration for throttling pending transactions
TxsThrottleConfig() *TxsThrottleConfig
// GetNetworkID() return networkID type.
GetNetworkID() NetworkID
@ -90,51 +68,6 @@ type Instance interface {
ReshardingEpoch() []*big.Int
}
// TxThrottleFlag is the throttling flag for each transaction
// Refer below enum declaration for more context.
type TxThrottleFlag int
// TxThrottleFlag is determined per transaction
// during the new block proposal and pending transactions throttling
const (
TxSelect TxThrottleFlag = iota
TxUnselect
TxInvalid
)
func (result TxThrottleFlag) String() string {
switch result {
case TxSelect:
return "TxSelect"
case TxUnselect:
return "TxUnselect"
case TxInvalid:
return "TxInvalid"
}
return "TxThrottleUnknown"
}
// TxsThrottleConfig contains configuration for throttling pending transactions per node block
type TxsThrottleConfig struct {
// Max amount limit for a valid transaction
MaxTxAmountLimit *big.Int
// Max number of transactions of a particular account for the past hour
RecentTxDuration time.Duration
// Max number of transactions of a particular account for the past hour
MaxNumRecentTxsPerAccountLimit uint64
// Max total number of transactions allowed as pending transactions in transaction pool
MaxTxPoolSizeLimit int
// Max total number of transactions allowed to be processed per block
MaxNumTxsPerBlockLimit int
// EnableTxnThrottling is the switch for transaction throttling
EnableTxnThrottling bool
}
// genShardingStructure return sharding structure, given shard number and its patterns.
func genShardingStructure(shardNum, shardID int, httpPattern, wsPattern string) []map[string]interface{} {
res := []map[string]interface{}{}

@ -2,9 +2,7 @@ package shardingconfig
import (
"math/big"
"time"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/internal/genesis"
)
@ -20,13 +18,6 @@ const (
testnetVdfDifficulty = 10000 // This takes about 20s to finish the vdf
testnetMaxTxAmountLimit = 1e3 // unit is in One
testnetMaxNumRecentTxsPerAccountLimit = 1e2
testnetMaxTxPoolSizeLimit = 8000
testnetMaxNumTxsPerBlockLimit = 1000
testnetRecentTxDuration = time.Hour
testnetEnableTxnThrottling = false
// TestNetHTTPPattern is the http pattern for testnet.
TestNetHTTPPattern = "https://api.s%d.b.hmny.io"
// TestNetWSPattern is the websocket pattern for testnet.
@ -72,43 +63,6 @@ func (ts testnetSchedule) RandomnessStartingEpoch() uint64 {
return mainnetRandomnessStartingEpoch
}
func (ts testnetSchedule) MaxTxAmountLimit() *big.Int {
amountBigInt := big.NewInt(testnetMaxTxAmountLimit)
amountBigInt = amountBigInt.Mul(amountBigInt, big.NewInt(denominations.One))
return amountBigInt
}
func (ts testnetSchedule) MaxNumRecentTxsPerAccountLimit() uint64 {
return testnetMaxNumRecentTxsPerAccountLimit
}
func (ts testnetSchedule) MaxTxPoolSizeLimit() int {
return testnetMaxTxPoolSizeLimit
}
func (ts testnetSchedule) MaxNumTxsPerBlockLimit() int {
return testnetMaxNumTxsPerBlockLimit
}
func (ts testnetSchedule) RecentTxDuration() time.Duration {
return testnetRecentTxDuration
}
func (ts testnetSchedule) EnableTxnThrottling() bool {
return testnetEnableTxnThrottling
}
func (ts testnetSchedule) TxsThrottleConfig() *TxsThrottleConfig {
return &TxsThrottleConfig{
MaxTxAmountLimit: ts.MaxTxAmountLimit(),
MaxNumRecentTxsPerAccountLimit: ts.MaxNumRecentTxsPerAccountLimit(),
MaxTxPoolSizeLimit: ts.MaxTxPoolSizeLimit(),
MaxNumTxsPerBlockLimit: ts.MaxNumTxsPerBlockLimit(),
RecentTxDuration: ts.RecentTxDuration(),
EnableTxnThrottling: ts.EnableTxnThrottling(),
}
}
func (ts testnetSchedule) GetNetworkID() NetworkID {
return TestNet
}

@ -52,8 +52,6 @@ const (
)
const (
// TxPoolLimit is the limit of transaction pool.
TxPoolLimit = 20000
// NumTryBroadCast is the number of times trying to broadcast
NumTryBroadCast = 3
// ClientRxQueueSize is the number of client messages to queue before tail-dropping.
@ -278,23 +276,19 @@ func (node *Node) tryBroadcast(tx *types.Transaction) {
// Add new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) {
txPoolLimit := shard.Schedule.MaxTxPoolSizeLimit()
node.pendingTxMutex.Lock()
for _, tx := range newTxs {
if _, ok := node.pendingTransactions[tx.Hash()]; !ok {
node.pendingTransactions[tx.Hash()] = tx
}
if len(node.pendingTransactions) > txPoolLimit {
break
}
}
node.TxPool.AddRemotes(newTxs)
node.pendingTxMutex.Unlock()
utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more transactions")
pendingCount, queueCount := node.TxPool.Stats()
utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", pendingCount).Int("totalQueued", queueCount).Msg("Got more transactions")
}
// Add new staking transactions to the pending staking transaction list.
func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) {
txPoolLimit := shard.Schedule.MaxTxPoolSizeLimit()
txPoolLimit := 1000 // TODO: incorporate staking txn into TxPool
node.pendingStakingTxMutex.Lock()
for _, tx := range newStakingTxs {
if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok {
@ -305,7 +299,7 @@ func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTra
}
}
node.pendingStakingTxMutex.Unlock()
utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more staking transactions")
utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingStakingTransactions)).Msg("Got more staking transactions")
}
// AddPendingStakingTransaction staking transactions
@ -323,7 +317,6 @@ func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx")
node.tryBroadcast(newTx)
}
utils.Logger().Debug().Int("totalPending", len(node.pendingTransactions)).Msg("Got ONE more transaction")
}
// AddPendingReceipts adds one receipt message to pending list.
@ -348,70 +341,6 @@ func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) {
utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Got ONE more receipt message")
}
// Take out a subset of valid transactions from the pending transaction list
// Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlock(coinbase common.Address) (types.Transactions, staking.StakingTransactions) {
txsThrottleConfig := shard.Schedule.TxsThrottleConfig()
// the next block number to be added in consensus protocol, which is always one more than current chain header block
newBlockNum := node.Blockchain().CurrentBlock().NumberU64() + 1
// remove old (> txsThrottleConfigRecentTxDuration) blockNum keys from recentTxsStats and initiailize for the new block
for blockNum := range node.recentTxsStats {
recentTxsBlockNumGap := uint64(txsThrottleConfig.RecentTxDuration / node.BlockPeriod)
if recentTxsBlockNumGap < newBlockNum-blockNum {
delete(node.recentTxsStats, blockNum)
}
}
node.recentTxsStats[newBlockNum] = make(types.BlockTxsCounts)
// Must update to the correct current state before processing potential txns
if err := node.Worker.UpdateCurrent(coinbase); err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed updating worker's state before txn selection")
return types.Transactions{}, staking.StakingTransactions{}
}
node.pendingTxMutex.Lock()
defer node.pendingTxMutex.Unlock()
node.pendingStakingTxMutex.Lock()
defer node.pendingStakingTxMutex.Unlock()
pendingTransactions := types.Transactions{}
pendingStakingTransactions := staking.StakingTransactions{}
for _, tx := range node.pendingTransactions {
pendingTransactions = append(pendingTransactions, tx)
}
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
}
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
selectedStaking, unselectedStaking, invalidStaking :=
node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, coinbase)
node.pendingTransactions = make(map[common.Hash]*types.Transaction)
for _, unselectedTx := range unselected {
node.pendingTransactions[unselectedTx.Hash()] = unselectedTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingTransactions)).
Int("selected", len(selected)).
Int("invalidDiscarded", len(invalid)).
Msg("Selecting Transactions")
node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction)
for _, unselectedStakingTx := range unselectedStaking {
node.pendingStakingTransactions[unselectedStakingTx.Hash()] = unselectedStakingTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingStakingTransactions)).
Int("selected", len(unselectedStaking)).
Int("invalidDiscarded", len(invalidStaking)).
Msg("Selecting Staking Transactions")
return selected, selectedStaking
}
func (node *Node) startRxPipeline(
receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int,
) {

@ -3,6 +3,9 @@ package node
import (
"testing"
"github.com/harmony-one/harmony/core/types"
types2 "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
@ -33,8 +36,9 @@ func TestAddNewBlock(t *testing.T) {
nodeconfig.SetNetworkType(nodeconfig.Devnet)
node := New(host, consensus, testDBFactory, false)
selectedTxs, stks := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, stks, common.Address{})
txs := make(map[common.Address]types.Transactions)
stks := types2.StakingTransactions{}
node.Worker.CommitTransactions(txs, stks, common.Address{})
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
err = node.AddNewBlock(block)
@ -65,8 +69,9 @@ func TestVerifyNewBlock(t *testing.T) {
}
node := New(host, consensus, testDBFactory, false)
selectedTxs, stks := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, stks, common.Address{})
txs := make(map[common.Address]types.Transactions)
stks := types2.StakingTransactions{}
node.Worker.CommitTransactions(txs, stks, common.Address{})
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
if err := node.VerifyNewBlock(block); err != nil {

@ -5,6 +5,8 @@ import (
"sort"
"time"
types2 "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
@ -80,9 +82,20 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
coinbase := node.Consensus.SelfAddress
// Prepare transactions including staking transactions
selectedTxs, selectedStakingTxs := node.getTransactionsForNewBlock(coinbase)
pending, err := node.TxPool.Pending()
if err != nil {
utils.Logger().Err(err).Msg("Failed to fetch pending transactions")
return nil, err
}
// TODO: integrate staking transaction into tx pool
pendingStakingTransactions := types2.StakingTransactions{}
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
}
if err := node.Worker.CommitTransactions(selectedTxs, selectedStakingTxs, coinbase); err != nil {
node.Worker.UpdateCurrent(coinbase)
if err := node.Worker.CommitTransactions(pending, pendingStakingTransactions, coinbase); err != nil {
utils.Logger().Error().Err(err).Msg("cannot commit transactions")
return nil, err
}

@ -5,6 +5,8 @@ import (
"math/big"
"time"
common2 "github.com/harmony-one/harmony/internal/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block"
@ -14,7 +16,6 @@ import (
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
@ -25,6 +26,8 @@ import (
// environment is the worker's current environment and holds all of the current state information.
type environment struct {
signer types.Signer
state *state.DB // apply state changes here
gasPool *core.GasPool // available gas used to pack transactions
@ -50,149 +53,97 @@ type Worker struct {
gasCeil uint64
}
// Returns a tuple where the first value is the txs sender account address,
// the second is the throttling result enum for the transaction of interest.
// Throttling happens based on the amount, frequency, etc.
func (w *Worker) throttleTxs(selected types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, tx *types.Transaction) (common.Address, shardingconfig.TxThrottleFlag) {
var sender common.Address
msg, err := tx.AsMessage(types.MakeSigner(w.config, w.chain.CurrentBlock().Epoch()))
if err != nil {
utils.Logger().Error().Err(err).Str("txId", tx.Hash().Hex()).Msg("Error when parsing tx into message")
} else {
sender = msg.From()
}
// do not throttle transactions if disabled
if !txsThrottleConfig.EnableTxnThrottling {
return sender, shardingconfig.TxSelect
}
// already selected max num txs
if len(selected) > txsThrottleConfig.MaxNumTxsPerBlockLimit {
utils.Logger().Info().Str("txId", tx.Hash().Hex()).Int("MaxNumTxsPerBlockLimit", txsThrottleConfig.MaxNumTxsPerBlockLimit).Msg("Throttling tx with max num txs per block limit")
return sender, shardingconfig.TxUnselect
}
// throttle a single sender sending too many transactions in one block
if tx.Value().Cmp(txsThrottleConfig.MaxTxAmountLimit) > 0 {
utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("MaxTxAmountLimit", txsThrottleConfig.MaxTxAmountLimit.Uint64()).Uint64("txAmount", tx.Value().Uint64()).Msg("Throttling tx with max amount limit")
return sender, shardingconfig.TxInvalid
}
// throttle too large transaction
var numTxsPastHour uint64
for _, blockTxsCounts := range recentTxsStats {
numTxsPastHour += blockTxsCounts[sender]
}
if numTxsPastHour >= txsThrottleConfig.MaxNumRecentTxsPerAccountLimit {
utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("MaxNumRecentTxsPerAccountLimit", txsThrottleConfig.MaxNumRecentTxsPerAccountLimit).Msg("Throttling tx with max txs per account in a single block limit")
return sender, shardingconfig.TxInvalid
}
return sender, shardingconfig.TxSelect
}
// SelectTransactionsForNewBlock selects transactions for new block.
func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions) {
// CommitTransactions commits transactions for new block.
func (w *Worker) CommitTransactions(pendingNormal map[common.Address]types.Transactions, pendingStaking staking.StakingTransactions, coinbase common.Address) error {
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
}
selected := types.Transactions{}
unselected := types.Transactions{}
invalid := types.Transactions{}
for _, tx := range txs {
if tx.ShardID() != w.chain.ShardID() {
invalid = append(invalid, tx)
continue
}
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pendingNormal)
var coalescedLogs []*types.Log
// NORMAL
for {
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
utils.Logger().Info().Str("Not enough gas for further transactions, have", w.current.gasPool.String()).Uint64("want", params.TxGas)
unselected = append(unselected, tx)
utils.Logger().Info().Uint64("have", w.current.gasPool.Gas()).Uint64("want", params.TxGas).Msg("Not enough gas for further transactions")
break
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
if tx == nil {
break
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(w.current.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.config.IsEIP155(w.current.header.Number()) {
utils.Logger().Info().Str("hash", tx.Hash().Hex()).Str("eip155Epoch", w.config.EIP155Epoch.String()).Msg("Ignoring reply protected transaction")
txs.Pop()
continue
}
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs))
sender, flag := w.throttleTxs(selected, recentTxsStats, txsThrottleConfig, tx)
switch flag {
case shardingconfig.TxUnselect:
unselected = append(unselected, tx)
case shardingconfig.TxInvalid:
invalid = append(invalid, tx)
case shardingconfig.TxSelect:
if tx.GasPrice().Uint64() == 0 {
invalid = append(invalid, tx)
} else {
snap := w.current.state.Snapshot()
_, err := w.commitTransaction(tx, coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
invalid = append(invalid, tx)
utils.Logger().Error().Err(err).Str("txId", tx.Hash().Hex()).Msg("Commit transaction error")
} else {
selected = append(selected, tx)
// handle the case when msg was not able to extracted from tx
if len(sender.String()) > 0 {
recentTxsStats[newBlockNum][sender]++
}
}
}
if tx.ShardID() != w.chain.ShardID() {
txs.Shift()
continue
}
// log invalid or unselected txs
if flag == shardingconfig.TxUnselect || flag == shardingconfig.TxInvalid {
utils.Logger().Info().Str("txId", tx.Hash().Hex()).Str("txThrottleFlag", flag.String()).Msg("Transaction Throttle flag")
logs, err := w.commitTransaction(tx, coinbase)
sender, _ := common2.AddressToBech32(from)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
utils.Logger().Info().Str("sender", sender).Msg("Gas limit exceeded for current block")
txs.Pop()
case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
utils.Logger().Info().Str("sender", sender).Uint64("nonce", tx.Nonce()).Msg("Skipping transaction with low nonce")
txs.Shift()
case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
utils.Logger().Info().Str("sender", sender).Uint64("nonce", tx.Nonce()).Msg("Skipping account with high nonce")
txs.Pop()
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
txs.Shift()
default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
utils.Logger().Info().Str("hash", tx.Hash().Hex()).AnErr("err", err).Msg("Transaction failed, account skipped")
txs.Shift()
}
utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("Transaction gas limit info")
}
utils.Logger().Info().Uint64("newBlockNum", newBlockNum).Int("newTxns", len(selected)).Uint64("blockGasLimit", w.current.header.GasLimit()).Uint64("blockGasUsed", w.current.header.GasUsed()).Msg("Block gas limit and usage info")
return selected, unselected, invalid
}
// SelectStakingTransactionsForNewBlock selects staking transactions for new block.
func (w *Worker) SelectStakingTransactionsForNewBlock(
newBlockNum uint64, txs staking.StakingTransactions,
coinbase common.Address) (staking.StakingTransactions, staking.StakingTransactions, staking.StakingTransactions) {
// only beaconchain process staking transaction
if w.chain.ShardID() != shard.BeaconChainShardID {
utils.Logger().Warn().Msgf("Invalid shardID: %v", w.chain.ShardID())
return nil, nil, nil
}
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
}
selected := staking.StakingTransactions{}
// TODO: chao add total gas fee checking when needed
unselected := staking.StakingTransactions{}
invalid := staking.StakingTransactions{}
for _, tx := range txs {
snap := w.current.state.Snapshot()
_, err := w.commitStakingTransaction(tx, coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
invalid = append(invalid, tx)
utils.Logger().Error().Err(err).Str("stakingTxId", tx.Hash().Hex()).Msg("Commit staking transaction error")
} else {
selected = append(selected, tx)
utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("StakingTransaction gas limit info")
// STAKING - only beaconchain process staking transaction
if w.chain.ShardID() == shard.BeaconChainShardID {
for _, tx := range pendingStaking {
logs, err := w.commitStakingTransaction(tx, coinbase)
if err != nil {
utils.Logger().Error().Err(err).Str("stakingTxId", tx.Hash().Hex()).Msg("Commit staking transaction error")
} else {
coalescedLogs = append(coalescedLogs, logs...)
utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("StakingTransaction gas limit info")
}
}
}
utils.Logger().Info().Uint64("newBlockNum", newBlockNum).Uint64("blockGasLimit",
w.current.header.GasLimit()).Uint64("blockGasUsed",
w.current.header.GasUsed()).Msg("[SelectStakingTransaction] Block gas limit and usage info")
utils.Logger().Info().Int("newTxns", len(w.current.txs)).Uint64("blockGasLimit", w.current.header.GasLimit()).Uint64("blockGasUsed", w.current.header.GasUsed()).Msg("Block gas limit and usage info")
return selected, unselected, invalid
return nil
}
func (w *Worker) commitStakingTransaction(tx *staking.StakingTransaction, coinbase common.Address) ([]*types.Log, error) {
@ -238,40 +189,6 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres
return receipt.Logs, nil
}
// CommitTransactions commits transactions.
func (w *Worker) CommitTransactions(txs types.Transactions, stakingTxns staking.StakingTransactions, coinbase common.Address) error {
// Must update to the correct current state before processing potential txns
if err := w.UpdateCurrent(coinbase); err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed updating worker's state before committing txns")
return err
}
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
}
for _, tx := range txs {
snap := w.current.state.Snapshot()
_, err := w.commitTransaction(tx, coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
return err
}
}
for _, tx := range stakingTxns {
snap := w.current.state.Snapshot()
_, err := w.commitStakingTransaction(tx, coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
return err
}
}
return nil
}
// CommitReceipts commits a list of already verified incoming cross shard receipts
func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error {
if w.current.gasPool == nil {
@ -322,6 +239,7 @@ func (w *Worker) makeCurrent(parent *types.Block, header *block.Header) error {
return err
}
env := &environment{
signer: types.NewEIP155Signer(w.config.ChainID),
state: state,
header: header,
}

@ -5,6 +5,8 @@ import (
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
blockfactory "github.com/harmony-one/harmony/block/factory"
@ -74,7 +76,9 @@ func TestCommitTransactions(t *testing.T) {
tx, _ := types.SignTx(types.NewTransaction(baseNonce, testBankAddress, uint32(0), big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey)
// Commit the tx to the worker
err := worker.CommitTransactions(types.Transactions{tx}, nil, testBankAddress)
txs := make(map[common.Address]types.Transactions)
txs[testBankAddress] = types.Transactions{tx}
err := worker.CommitTransactions(txs, nil, testBankAddress)
if err != nil {
t.Error(err)
}

@ -125,7 +125,11 @@ func fundFaucetContract(chain *core.BlockChain) {
amount := 720000
tx, _ := types.SignTx(types.NewTransaction(nonce+uint64(4), StakingAddress, 0, big.NewInt(int64(amount)), params.TxGas, nil, nil), types.HomesteadSigner{}, FaucetPriKey)
txs = append(txs, tx)
err := contractworker.CommitTransactions(txs, nil, testUserAddress)
txmap := make(map[common.Address]types.Transactions)
txmap[FaucetAddress] = txs
err := contractworker.CommitTransactions(txmap, nil, testUserAddress)
if err != nil {
fmt.Println(err)
}
@ -163,7 +167,10 @@ func callFaucetContractToFundAnAddress(chain *core.BlockChain) {
callEnc = append(callEnc, paddedAddress...)
callfaucettx, _ := types.SignTx(types.NewTransaction(nonce+uint64(5), faucetContractAddress, 0, big.NewInt(0), params.TxGasContractCreation*10, nil, callEnc), types.HomesteadSigner{}, FaucetPriKey)
err = contractworker.CommitTransactions(types.Transactions{callfaucettx}, nil, testUserAddress)
txmap := make(map[common.Address]types.Transactions)
txmap[FaucetAddress] = types.Transactions{callfaucettx}
err = contractworker.CommitTransactions(txmap, nil, testUserAddress)
if err != nil {
fmt.Println(err)
}
@ -192,143 +199,6 @@ func playFaucetContract(chain *core.BlockChain) {
callFaucetContractToFundAnAddress(chain)
}
func playSetupStakingContract(chain *core.BlockChain) {
fmt.Println()
fmt.Println("--------- ************************** ---------")
fmt.Println()
fmt.Println("--------- Now Setting up Staking Contract ---------")
fmt.Println()
//worker := pkgworker.New(params.TestChainConfig, chain, consensus.NewFaker(), crypto.PubkeyToAddress(StakingPriKey.PublicKey), 0)
state = contractworker.GetCurrentState()
fmt.Println("Before Staking Balances")
fmt.Println("user address balance")
fmt.Println(state.GetBalance(allRandomUserAddress[0]))
fmt.Println("The balances for 2 more users:")
fmt.Println(state.GetBalance(allRandomUserAddress[1]))
fmt.Println(state.GetBalance(allRandomUserAddress[2]))
nonce = contractworker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(StakingPriKey.PublicKey))
dataEnc = common.FromHex(StakingContractBinary)
stx, _ := types.SignTx(types.NewContractCreation(nonce, 0, big.NewInt(0), params.TxGasContractCreation*10, nil, dataEnc), types.HomesteadSigner{}, StakingPriKey)
stakingtxns = append(stakingtxns, stx)
stakeContractAddress = crypto.CreateAddress(StakingAddress, nonce+uint64(0))
state = contractworker.GetCurrentState()
fmt.Println("stake contract balance :")
fmt.Println(state.GetBalance(stakeContractAddress))
fmt.Println("stake address balance :")
fmt.Println(state.GetBalance(StakingAddress))
}
func playStaking(chain *core.BlockChain) {
depositFnSignature := []byte("deposit()")
fnHash := hash.Keccak256(depositFnSignature)
methodID := fnHash[:4]
var callEncl []byte
callEncl = append(callEncl, methodID...)
stake := 100000
fmt.Println()
fmt.Println("--------- Staking Contract added to txns ---------")
fmt.Println()
fmt.Printf("-- Now Staking with stake: %d --\n", stake)
fmt.Println()
for i := 0; i <= 2; i++ {
//Deposit Does not take a argument, stake is transferred via amount.
tx, _ := types.SignTx(types.NewTransaction(0, stakeContractAddress, 0, big.NewInt(int64(stake)), params.TxGas*5, nil, callEncl), types.HomesteadSigner{}, allRandomUserKey[i])
stakingtxns = append(stakingtxns, tx)
}
err = contractworker.CommitTransactions(stakingtxns, nil, common.Address{})
if err != nil {
fmt.Println(err)
}
block, _ := contractworker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
_, err = chain.InsertChain(types.Blocks{block}, true /* verifyHeaders */)
if err != nil {
fmt.Println(err)
}
// receipts := contractworker.GetCurrentReceipts()
// fmt.Println(receipts[len(receipts)-4].ContractAddress)
state = contractworker.GetCurrentState()
fmt.Printf("After Staking Balances (should be less by %d)\n", stake)
fmt.Println("user address balance")
fmt.Println(state.GetBalance(allRandomUserAddress[0]))
fmt.Println("The balances for 2 more users:")
fmt.Println(state.GetBalance(allRandomUserAddress[1]))
fmt.Println(state.GetBalance(allRandomUserAddress[2]))
fmt.Println("faucet contract balance (unchanged):")
fmt.Println(state.GetBalance(faucetContractAddress))
fmt.Println("stake contract balance :")
fmt.Println(state.GetBalance(stakeContractAddress))
fmt.Println("stake address balance :")
fmt.Println(state.GetBalance(StakingAddress))
}
func playWithdrawStaking(chain *core.BlockChain) {
fmt.Println()
fmt.Println("--------- Now Setting up Withdrawing Stakes ---------")
withdrawFnSignature := []byte("withdraw(uint256)")
fnHash := hash.Keccak256(withdrawFnSignature)
methodID := fnHash[:4]
withdraw := "5000"
withdrawstake := new(big.Int)
withdrawstake.SetString(withdraw, 10)
paddedAmount := common.LeftPadBytes(withdrawstake.Bytes(), 32)
var dataEncl []byte
dataEncl = append(dataEncl, methodID...)
dataEncl = append(dataEncl, paddedAmount...)
var withdrawstakingtxns []*types.Transaction
fmt.Println()
fmt.Printf("-- Withdrawing Stake by amount: %s --\n", withdraw)
fmt.Println()
for i := 0; i <= 2; i++ {
cnonce := contractworker.GetCurrentState().GetNonce(allRandomUserAddress[i])
tx, _ := types.SignTx(types.NewTransaction(cnonce, stakeContractAddress, 0, big.NewInt(0), params.TxGas*5, nil, dataEncl), types.HomesteadSigner{}, allRandomUserKey[i])
withdrawstakingtxns = append(withdrawstakingtxns, tx)
}
err = contractworker.CommitTransactions(withdrawstakingtxns, nil, common.Address{})
if err != nil {
fmt.Println("error:")
fmt.Println(err)
}
block, _ := contractworker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
_, err = chain.InsertChain(types.Blocks{block}, true /* verifyHeaders */)
if err != nil {
fmt.Println(err)
}
state = contractworker.GetCurrentState()
fmt.Printf("Withdraw Staking Balances (should be up by %s)\n", withdraw)
fmt.Println(state.GetBalance(allRandomUserAddress[0]))
fmt.Println(state.GetBalance(allRandomUserAddress[1]))
fmt.Println(state.GetBalance(allRandomUserAddress[2]))
fmt.Println("faucet contract balance (unchanged):")
fmt.Println(state.GetBalance(faucetContractAddress))
fmt.Printf("stake contract balance (should downup by %s)\n", withdraw)
fmt.Println(state.GetBalance(stakeContractAddress))
fmt.Println("stake address balance :")
fmt.Println(state.GetBalance(StakingAddress))
}
func playStakingContract(chain *core.BlockChain) {
playSetupStakingContract(chain)
playStaking(chain)
playWithdrawStaking(chain)
}
func main() {
genesis := gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, nil, gspec.Config, chain.Engine(), vm.Config{}, nil)
@ -356,5 +226,4 @@ func main() {
}
playFaucetContract(chain)
playStakingContract(chain)
}

Loading…
Cancel
Save