Use TxPool for worker's transaction proposal

pull/1806/head
Rongjian Lan 5 years ago
parent 5c23794b88
commit d21ce08309
  1. 31
      go.mod
  2. 83
      node/node.go
  3. 13
      node/node_handler_test.go
  4. 18
      node/node_newblock.go
  5. 190
      node/worker/worker.go
  6. 6
      node/worker/worker_test.go
  7. 149
      test/chain/main.go

@ -4,33 +4,29 @@ go 1.12
require ( require (
github.com/Workiva/go-datastructures v1.0.50 github.com/Workiva/go-datastructures v1.0.50
github.com/allegro/bigcache v1.2.1 // indirect github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/aristanetworks/goarista v0.0.0-20190607111240-52c2a7864a08 // indirect github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
github.com/cespare/cp v1.1.1 github.com/cespare/cp v1.1.1
github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew v1.1.1
github.com/deckarep/golang-set v1.7.1 github.com/deckarep/golang-set v1.7.1
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/ethereum/go-ethereum v1.8.27 github.com/ethereum/go-ethereum v1.8.27
github.com/fatih/color v1.7.0 github.com/fatih/color v1.7.0
github.com/fjl/memsize v0.0.0-20180929194037-2a09253e352a github.com/fjl/memsize v0.0.0-20180929194037-2a09253e352a
github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c // indirect
github.com/golang/mock v1.3.1 github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.1 github.com/golang/protobuf v1.3.2
github.com/golangci/golangci-lint v1.17.1 github.com/golangci/golangci-lint v1.17.1
github.com/gorilla/handlers v1.4.0 github.com/gorilla/handlers v1.4.0
github.com/gorilla/mux v1.7.2 github.com/gorilla/mux v1.7.2
github.com/harmony-ek/gencodec v0.0.0-20190215044613-e6740dbdd846 github.com/harmony-ek/gencodec v0.0.0-20190215044613-e6740dbdd846
github.com/harmony-one/bls v0.0.5 github.com/harmony-one/bls v0.0.5
github.com/harmony-one/go-sdk v0.0.0-20191105223634-37bdcedd4a9d
github.com/harmony-one/taggedrlp v0.1.2 github.com/harmony-one/taggedrlp v0.1.2
github.com/harmony-one/vdf v0.0.0-20190924175951-620379da8849 github.com/harmony-one/vdf v1.0.0
github.com/hashicorp/golang-lru v0.5.1 github.com/hashicorp/golang-lru v0.5.3
github.com/iancoleman/strcase v0.0.0-20190422225806-e506e3ef7365 github.com/iancoleman/strcase v0.0.0-20190422225806-e506e3ef7365
github.com/ipfs/go-ds-badger v0.0.5 github.com/ipfs/go-ds-badger v0.0.5
github.com/ipfs/go-log v0.0.1 github.com/ipfs/go-log v0.0.1
github.com/karalabe/hid v1.0.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/libp2p/go-libp2p v0.3.1 github.com/libp2p/go-libp2p v0.3.1
github.com/libp2p/go-libp2p-core v0.2.2 github.com/libp2p/go-libp2p-core v0.2.2
github.com/libp2p/go-libp2p-crypto v0.1.0 github.com/libp2p/go-libp2p-crypto v0.1.0
@ -46,27 +42,18 @@ require (
github.com/natefinch/lumberjack v2.0.0+incompatible github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pborman/uuid v1.2.0 github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.2 github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.4.1 // indirect
github.com/prometheus/procfs v0.0.3 // indirect
github.com/rjeczalik/notify v0.9.2 github.com/rjeczalik/notify v0.9.2
github.com/rs/cors v1.7.0 // indirect
github.com/rs/zerolog v1.14.3 github.com/rs/zerolog v1.14.3
github.com/shirou/gopsutil v2.18.12+incompatible github.com/shirou/gopsutil v2.18.12+incompatible
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect
github.com/spf13/cobra v0.0.5 github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.3.0 github.com/stretchr/testify v1.3.0
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 golang.org/x/crypto v0.0.0-20190909091759-094676da4a83
golang.org/x/lint v0.0.0-20190409202823-959b441ac422 golang.org/x/lint v0.0.0-20190409202823-959b441ac422
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 // indirect
golang.org/x/tools v0.0.0-20190924052046-3ac2a5bbd98a golang.org/x/tools v0.0.0-20190924052046-3ac2a5bbd98a
google.golang.org/appengine v1.4.0 // indirect google.golang.org/grpc v1.23.1
google.golang.org/grpc v1.22.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
gopkg.in/ini.v1 v1.42.0 gopkg.in/ini.v1 v1.42.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/urfave/cli.v1 v1.20.0 // indirect
) )

@ -277,18 +277,14 @@ func (node *Node) tryBroadcast(tx *types.Transaction) {
// Add new transactions to the pending transaction list. // Add new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) { func (node *Node) addPendingTransactions(newTxs types.Transactions) {
txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
node.pendingTxMutex.Lock() node.pendingTxMutex.Lock()
for _, tx := range newTxs {
if _, ok := node.pendingTransactions[tx.Hash()]; !ok { node.TxPool.AddRemotes(newTxs)
node.pendingTransactions[tx.Hash()] = tx
}
if len(node.pendingTransactions) > txPoolLimit {
break
}
}
node.pendingTxMutex.Unlock() node.pendingTxMutex.Unlock()
utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more transactions")
pendingCount, queueCount := node.TxPool.Stats()
utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", pendingCount).Int("totalQueued", queueCount).Msg("Got more transactions")
} }
// Add new staking transactions to the pending staking transaction list. // Add new staking transactions to the pending staking transaction list.
@ -304,7 +300,7 @@ func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTra
} }
} }
node.pendingStakingTxMutex.Unlock() node.pendingStakingTxMutex.Unlock()
utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more staking transactions") utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingStakingTransactions)).Msg("Got more staking transactions")
} }
// AddPendingStakingTransaction staking transactions // AddPendingStakingTransaction staking transactions
@ -322,7 +318,6 @@ func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx") utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx")
node.tryBroadcast(newTx) node.tryBroadcast(newTx)
} }
utils.Logger().Debug().Int("totalPending", len(node.pendingTransactions)).Msg("Got ONE more transaction")
} }
// AddPendingReceipts adds one receipt message to pending list. // AddPendingReceipts adds one receipt message to pending list.
@ -347,70 +342,6 @@ func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) {
utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Got ONE more receipt message") utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Got ONE more receipt message")
} }
// Take out a subset of valid transactions from the pending transaction list
// Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlock(coinbase common.Address) (types.Transactions, staking.StakingTransactions) {
txsThrottleConfig := core.ShardingSchedule.TxsThrottleConfig()
// the next block number to be added in consensus protocol, which is always one more than current chain header block
newBlockNum := node.Blockchain().CurrentBlock().NumberU64() + 1
// remove old (> txsThrottleConfigRecentTxDuration) blockNum keys from recentTxsStats and initiailize for the new block
for blockNum := range node.recentTxsStats {
recentTxsBlockNumGap := uint64(txsThrottleConfig.RecentTxDuration / node.BlockPeriod)
if recentTxsBlockNumGap < newBlockNum-blockNum {
delete(node.recentTxsStats, blockNum)
}
}
node.recentTxsStats[newBlockNum] = make(types.BlockTxsCounts)
// Must update to the correct current state before processing potential txns
if err := node.Worker.UpdateCurrent(coinbase); err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed updating worker's state before txn selection")
return types.Transactions{}, staking.StakingTransactions{}
}
node.pendingTxMutex.Lock()
defer node.pendingTxMutex.Unlock()
node.pendingStakingTxMutex.Lock()
defer node.pendingStakingTxMutex.Unlock()
pendingTransactions := types.Transactions{}
pendingStakingTransactions := staking.StakingTransactions{}
for _, tx := range node.pendingTransactions {
pendingTransactions = append(pendingTransactions, tx)
}
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
}
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
selectedStaking, unselectedStaking, invalidStaking :=
node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, coinbase)
node.pendingTransactions = make(map[common.Hash]*types.Transaction)
for _, unselectedTx := range unselected {
node.pendingTransactions[unselectedTx.Hash()] = unselectedTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingTransactions)).
Int("selected", len(selected)).
Int("invalidDiscarded", len(invalid)).
Msg("Selecting Transactions")
node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction)
for _, unselectedStakingTx := range unselectedStaking {
node.pendingStakingTransactions[unselectedStakingTx.Hash()] = unselectedStakingTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingStakingTransactions)).
Int("selected", len(unselectedStaking)).
Int("invalidDiscarded", len(invalidStaking)).
Msg("Selecting Staking Transactions")
return selected, selectedStaking
}
func (node *Node) startRxPipeline( func (node *Node) startRxPipeline(
receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int, receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int,
) { ) {

@ -3,6 +3,9 @@ package node
import ( import (
"testing" "testing"
"github.com/harmony-one/harmony/core/types"
types2 "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum" "github.com/harmony-one/harmony/consensus/quorum"
@ -33,8 +36,9 @@ func TestAddNewBlock(t *testing.T) {
nodeconfig.SetNetworkType(nodeconfig.Devnet) nodeconfig.SetNetworkType(nodeconfig.Devnet)
node := New(host, consensus, testDBFactory, false) node := New(host, consensus, testDBFactory, false)
selectedTxs, stks := node.getTransactionsForNewBlock(common.Address{}) txs := make(map[common.Address]types.Transactions)
node.Worker.CommitTransactions(selectedTxs, stks, common.Address{}) stks := types2.StakingTransactions{}
node.Worker.CommitTransactions(txs, stks, common.Address{})
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil) block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
err = node.AddNewBlock(block) err = node.AddNewBlock(block)
@ -65,8 +69,9 @@ func TestVerifyNewBlock(t *testing.T) {
} }
node := New(host, consensus, testDBFactory, false) node := New(host, consensus, testDBFactory, false)
selectedTxs, stks := node.getTransactionsForNewBlock(common.Address{}) txs := make(map[common.Address]types.Transactions)
node.Worker.CommitTransactions(selectedTxs, stks, common.Address{}) stks := types2.StakingTransactions{}
node.Worker.CommitTransactions(txs, stks, common.Address{})
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil) block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
if err := node.VerifyNewBlock(block); err != nil { if err := node.VerifyNewBlock(block); err != nil {

@ -5,6 +5,8 @@ import (
"sort" "sort"
"time" "time"
types2 "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
@ -80,10 +82,20 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
// Update worker's current header and state data in preparation to propose/process new transactions // Update worker's current header and state data in preparation to propose/process new transactions
coinbase := node.Consensus.SelfAddress coinbase := node.Consensus.SelfAddress
// Prepare transactions including staking transactions // Prepare transactions including staking transactions\
selectedTxs, selectedStakingTxs := node.getTransactionsForNewBlock(coinbase) pending, err := node.TxPool.Pending()
if err != nil {
utils.Logger().Err(err).Msg("Failed to fetch pending transactions")
return nil, err
}
// TODO: integrate staking transaction into tx pool
pendingStakingTransactions := types2.StakingTransactions{}
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
}
if err := node.Worker.CommitTransactions(selectedTxs, selectedStakingTxs, coinbase); err != nil { if err := node.Worker.CommitTransactions(pending, pendingStakingTransactions, coinbase); err != nil {
ctxerror.Log15(utils.GetLogger().Error, ctxerror.Log15(utils.GetLogger().Error,
ctxerror.New("cannot commit transactions"). ctxerror.New("cannot commit transactions").
WithCause(err)) WithCause(err))

@ -5,6 +5,8 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/harmony-one/go-sdk/pkg/address"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/block"
@ -25,6 +27,8 @@ import (
// environment is the worker's current environment and holds all of the current state information. // environment is the worker's current environment and holds all of the current state information.
type environment struct { type environment struct {
signer types.Signer
state *state.DB // apply state changes here state *state.DB // apply state changes here
gasPool *core.GasPool // available gas used to pack transactions gasPool *core.GasPool // available gas used to pack transactions
@ -92,107 +96,96 @@ func (w *Worker) throttleTxs(selected types.Transactions, recentTxsStats types.R
return sender, shardingconfig.TxSelect return sender, shardingconfig.TxSelect
} }
// SelectTransactionsForNewBlock selects transactions for new block. // CommitTransactions commits transactions for new block.
func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions) { func (w *Worker) CommitTransactions(pendingNormal map[common.Address]types.Transactions, pendingStaking staking.StakingTransactions, coinbase common.Address) error {
if w.current.gasPool == nil { if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
} }
selected := types.Transactions{} txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pendingNormal)
unselected := types.Transactions{}
invalid := types.Transactions{} var coalescedLogs []*types.Log
for _, tx := range txs {
if tx.ShardID() != w.chain.ShardID() {
invalid = append(invalid, tx)
continue
}
// NORMAL
for {
// If we don't have enough gas for any further transactions then we're done // If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas { if w.current.gasPool.Gas() < params.TxGas {
utils.Logger().Info().Str("Not enough gas for further transactions, have", w.current.gasPool.String()).Uint64("want", params.TxGas) utils.Logger().Info().Uint64("have", w.current.gasPool.Gas()).Uint64("want", params.TxGas).Msg("Not enough gas for further transactions")
unselected = append(unselected, tx) break
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
if tx == nil {
break
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(w.current.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.config.IsEIP155(w.current.header.Number()) {
utils.Logger().Info().Str("hash", tx.Hash().Hex()).Str("eip155Epoch", w.config.EIP155Epoch.String()).Msg("Ignoring reply protected transaction")
txs.Pop()
continue continue
} }
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs))
sender, flag := w.throttleTxs(selected, recentTxsStats, txsThrottleConfig, tx) if tx.ShardID() != w.chain.ShardID() {
switch flag { txs.Shift()
case shardingconfig.TxUnselect: continue
unselected = append(unselected, tx)
case shardingconfig.TxInvalid:
invalid = append(invalid, tx)
case shardingconfig.TxSelect:
if tx.GasPrice().Uint64() == 0 {
invalid = append(invalid, tx)
} else {
snap := w.current.state.Snapshot()
_, err := w.commitTransaction(tx, coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
invalid = append(invalid, tx)
utils.Logger().Error().Err(err).Str("txId", tx.Hash().Hex()).Msg("Commit transaction error")
} else {
selected = append(selected, tx)
// handle the case when msg was not able to extracted from tx
if len(sender.String()) > 0 {
recentTxsStats[newBlockNum][sender]++
}
}
}
} }
// log invalid or unselected txs logs, err := w.commitTransaction(tx, coinbase)
if flag == shardingconfig.TxUnselect || flag == shardingconfig.TxInvalid { switch err {
utils.Logger().Info().Str("txId", tx.Hash().Hex()).Str("txThrottleFlag", flag.String()).Msg("Transaction Throttle flag") case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
utils.Logger().Info().Str("sender", address.ToBech32(from)).Msg("Gas limit exceeded for current block")
txs.Pop()
case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
utils.Logger().Info().Str("sender", address.ToBech32(from)).Uint64("nonce", tx.Nonce()).Msg("Skipping transaction with low nonce")
txs.Shift()
case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
utils.Logger().Info().Str("sender", address.ToBech32(from)).Uint64("nonce", tx.Nonce()).Msg("Skipping account with high nonce")
txs.Pop()
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
txs.Shift()
default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
utils.Logger().Info().Str("hash", tx.Hash().Hex()).AnErr("err", err).Msg("Transaction failed, account skipped")
txs.Shift()
} }
utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("Transaction gas limit info")
}
utils.Logger().Info().Uint64("newBlockNum", newBlockNum).Int("newTxns", len(selected)).Uint64("blockGasLimit", w.current.header.GasLimit()).Uint64("blockGasUsed", w.current.header.GasUsed()).Msg("Block gas limit and usage info")
return selected, unselected, invalid
}
// SelectStakingTransactionsForNewBlock selects staking transactions for new block.
func (w *Worker) SelectStakingTransactionsForNewBlock(
newBlockNum uint64, txs staking.StakingTransactions,
coinbase common.Address) (staking.StakingTransactions, staking.StakingTransactions, staking.StakingTransactions) {
// only beaconchain process staking transaction
if w.chain.ShardID() != values.BeaconChainShardID {
utils.Logger().Warn().Msgf("Invalid shardID: %v", w.chain.ShardID())
return nil, nil, nil
} }
if w.current.gasPool == nil { // STAKING - only beaconchain process staking transaction
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) if w.chain.ShardID() == values.BeaconChainShardID {
} for _, tx := range pendingStaking {
logs, err := w.commitStakingTransaction(tx, coinbase)
selected := staking.StakingTransactions{} if err != nil {
// TODO: chao add total gas fee checking when needed utils.Logger().Error().Err(err).Str("stakingTxId", tx.Hash().Hex()).Msg("Commit staking transaction error")
unselected := staking.StakingTransactions{} } else {
invalid := staking.StakingTransactions{} coalescedLogs = append(coalescedLogs, logs...)
for _, tx := range txs { utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("StakingTransaction gas limit info")
snap := w.current.state.Snapshot() }
_, err := w.commitStakingTransaction(tx, coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
invalid = append(invalid, tx)
utils.Logger().Error().Err(err).Str("stakingTxId", tx.Hash().Hex()).Msg("Commit staking transaction error")
} else {
selected = append(selected, tx)
utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("StakingTransaction gas limit info")
} }
} }
utils.Logger().Info().Uint64("newBlockNum", newBlockNum).Uint64("blockGasLimit", utils.Logger().Info().Int("newTxns", len(w.current.txs)).Uint64("blockGasLimit", w.current.header.GasLimit()).Uint64("blockGasUsed", w.current.header.GasUsed()).Msg("Block gas limit and usage info")
w.current.header.GasLimit()).Uint64("blockGasUsed",
w.current.header.GasUsed()).Msg("[SelectStakingTransaction] Block gas limit and usage info")
return selected, unselected, invalid return nil
} }
func (w *Worker) commitStakingTransaction(tx *staking.StakingTransaction, coinbase common.Address) ([]*types.Log, error) { func (w *Worker) commitStakingTransaction(tx *staking.StakingTransaction, coinbase common.Address) ([]*types.Log, error) {
@ -238,40 +231,6 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres
return receipt.Logs, nil return receipt.Logs, nil
} }
// CommitTransactions commits transactions.
func (w *Worker) CommitTransactions(txs types.Transactions, stakingTxns staking.StakingTransactions, coinbase common.Address) error {
// Must update to the correct current state before processing potential txns
if err := w.UpdateCurrent(coinbase); err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed updating worker's state before committing txns")
return err
}
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
}
for _, tx := range txs {
snap := w.current.state.Snapshot()
_, err := w.commitTransaction(tx, coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
return err
}
}
for _, tx := range stakingTxns {
snap := w.current.state.Snapshot()
_, err := w.commitStakingTransaction(tx, coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
return err
}
}
return nil
}
// CommitReceipts commits a list of already verified incoming cross shard receipts // CommitReceipts commits a list of already verified incoming cross shard receipts
func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error { func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error {
if w.current.gasPool == nil { if w.current.gasPool == nil {
@ -322,6 +281,7 @@ func (w *Worker) makeCurrent(parent *types.Block, header *block.Header) error {
return err return err
} }
env := &environment{ env := &environment{
signer: types.NewEIP155Signer(w.config.ChainID),
state: state, state: state,
header: header, header: header,
} }

@ -5,6 +5,8 @@ import (
"math/rand" "math/rand"
"testing" "testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
blockfactory "github.com/harmony-one/harmony/block/factory" blockfactory "github.com/harmony-one/harmony/block/factory"
@ -74,7 +76,9 @@ func TestCommitTransactions(t *testing.T) {
tx, _ := types.SignTx(types.NewTransaction(baseNonce, testBankAddress, uint32(0), big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey) tx, _ := types.SignTx(types.NewTransaction(baseNonce, testBankAddress, uint32(0), big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey)
// Commit the tx to the worker // Commit the tx to the worker
err := worker.CommitTransactions(types.Transactions{tx}, nil, testBankAddress) txs := make(map[common.Address]types.Transactions)
txs[testBankAddress] = types.Transactions{tx}
err := worker.CommitTransactions(txs, nil, testBankAddress)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

@ -125,7 +125,11 @@ func fundFaucetContract(chain *core.BlockChain) {
amount := 720000 amount := 720000
tx, _ := types.SignTx(types.NewTransaction(nonce+uint64(4), StakingAddress, 0, big.NewInt(int64(amount)), params.TxGas, nil, nil), types.HomesteadSigner{}, FaucetPriKey) tx, _ := types.SignTx(types.NewTransaction(nonce+uint64(4), StakingAddress, 0, big.NewInt(int64(amount)), params.TxGas, nil, nil), types.HomesteadSigner{}, FaucetPriKey)
txs = append(txs, tx) txs = append(txs, tx)
err := contractworker.CommitTransactions(txs, nil, testUserAddress)
txmap := make(map[common.Address]types.Transactions)
txmap[FaucetAddress] = txs
err := contractworker.CommitTransactions(txmap, nil, testUserAddress)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
@ -163,7 +167,10 @@ func callFaucetContractToFundAnAddress(chain *core.BlockChain) {
callEnc = append(callEnc, paddedAddress...) callEnc = append(callEnc, paddedAddress...)
callfaucettx, _ := types.SignTx(types.NewTransaction(nonce+uint64(5), faucetContractAddress, 0, big.NewInt(0), params.TxGasContractCreation*10, nil, callEnc), types.HomesteadSigner{}, FaucetPriKey) callfaucettx, _ := types.SignTx(types.NewTransaction(nonce+uint64(5), faucetContractAddress, 0, big.NewInt(0), params.TxGasContractCreation*10, nil, callEnc), types.HomesteadSigner{}, FaucetPriKey)
err = contractworker.CommitTransactions(types.Transactions{callfaucettx}, nil, testUserAddress) txmap := make(map[common.Address]types.Transactions)
txmap[FaucetAddress] = types.Transactions{callfaucettx}
err = contractworker.CommitTransactions(txmap, nil, testUserAddress)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
@ -192,143 +199,6 @@ func playFaucetContract(chain *core.BlockChain) {
callFaucetContractToFundAnAddress(chain) callFaucetContractToFundAnAddress(chain)
} }
func playSetupStakingContract(chain *core.BlockChain) {
fmt.Println()
fmt.Println("--------- ************************** ---------")
fmt.Println()
fmt.Println("--------- Now Setting up Staking Contract ---------")
fmt.Println()
//worker := pkgworker.New(params.TestChainConfig, chain, consensus.NewFaker(), crypto.PubkeyToAddress(StakingPriKey.PublicKey), 0)
state = contractworker.GetCurrentState()
fmt.Println("Before Staking Balances")
fmt.Println("user address balance")
fmt.Println(state.GetBalance(allRandomUserAddress[0]))
fmt.Println("The balances for 2 more users:")
fmt.Println(state.GetBalance(allRandomUserAddress[1]))
fmt.Println(state.GetBalance(allRandomUserAddress[2]))
nonce = contractworker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(StakingPriKey.PublicKey))
dataEnc = common.FromHex(StakingContractBinary)
stx, _ := types.SignTx(types.NewContractCreation(nonce, 0, big.NewInt(0), params.TxGasContractCreation*10, nil, dataEnc), types.HomesteadSigner{}, StakingPriKey)
stakingtxns = append(stakingtxns, stx)
stakeContractAddress = crypto.CreateAddress(StakingAddress, nonce+uint64(0))
state = contractworker.GetCurrentState()
fmt.Println("stake contract balance :")
fmt.Println(state.GetBalance(stakeContractAddress))
fmt.Println("stake address balance :")
fmt.Println(state.GetBalance(StakingAddress))
}
func playStaking(chain *core.BlockChain) {
depositFnSignature := []byte("deposit()")
fnHash := hash.Keccak256(depositFnSignature)
methodID := fnHash[:4]
var callEncl []byte
callEncl = append(callEncl, methodID...)
stake := 100000
fmt.Println()
fmt.Println("--------- Staking Contract added to txns ---------")
fmt.Println()
fmt.Printf("-- Now Staking with stake: %d --\n", stake)
fmt.Println()
for i := 0; i <= 2; i++ {
//Deposit Does not take a argument, stake is transferred via amount.
tx, _ := types.SignTx(types.NewTransaction(0, stakeContractAddress, 0, big.NewInt(int64(stake)), params.TxGas*5, nil, callEncl), types.HomesteadSigner{}, allRandomUserKey[i])
stakingtxns = append(stakingtxns, tx)
}
err = contractworker.CommitTransactions(stakingtxns, nil, common.Address{})
if err != nil {
fmt.Println(err)
}
block, _ := contractworker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
_, err = chain.InsertChain(types.Blocks{block}, true /* verifyHeaders */)
if err != nil {
fmt.Println(err)
}
// receipts := contractworker.GetCurrentReceipts()
// fmt.Println(receipts[len(receipts)-4].ContractAddress)
state = contractworker.GetCurrentState()
fmt.Printf("After Staking Balances (should be less by %d)\n", stake)
fmt.Println("user address balance")
fmt.Println(state.GetBalance(allRandomUserAddress[0]))
fmt.Println("The balances for 2 more users:")
fmt.Println(state.GetBalance(allRandomUserAddress[1]))
fmt.Println(state.GetBalance(allRandomUserAddress[2]))
fmt.Println("faucet contract balance (unchanged):")
fmt.Println(state.GetBalance(faucetContractAddress))
fmt.Println("stake contract balance :")
fmt.Println(state.GetBalance(stakeContractAddress))
fmt.Println("stake address balance :")
fmt.Println(state.GetBalance(StakingAddress))
}
func playWithdrawStaking(chain *core.BlockChain) {
fmt.Println()
fmt.Println("--------- Now Setting up Withdrawing Stakes ---------")
withdrawFnSignature := []byte("withdraw(uint256)")
fnHash := hash.Keccak256(withdrawFnSignature)
methodID := fnHash[:4]
withdraw := "5000"
withdrawstake := new(big.Int)
withdrawstake.SetString(withdraw, 10)
paddedAmount := common.LeftPadBytes(withdrawstake.Bytes(), 32)
var dataEncl []byte
dataEncl = append(dataEncl, methodID...)
dataEncl = append(dataEncl, paddedAmount...)
var withdrawstakingtxns []*types.Transaction
fmt.Println()
fmt.Printf("-- Withdrawing Stake by amount: %s --\n", withdraw)
fmt.Println()
for i := 0; i <= 2; i++ {
cnonce := contractworker.GetCurrentState().GetNonce(allRandomUserAddress[i])
tx, _ := types.SignTx(types.NewTransaction(cnonce, stakeContractAddress, 0, big.NewInt(0), params.TxGas*5, nil, dataEncl), types.HomesteadSigner{}, allRandomUserKey[i])
withdrawstakingtxns = append(withdrawstakingtxns, tx)
}
err = contractworker.CommitTransactions(withdrawstakingtxns, nil, common.Address{})
if err != nil {
fmt.Println("error:")
fmt.Println(err)
}
block, _ := contractworker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
_, err = chain.InsertChain(types.Blocks{block}, true /* verifyHeaders */)
if err != nil {
fmt.Println(err)
}
state = contractworker.GetCurrentState()
fmt.Printf("Withdraw Staking Balances (should be up by %s)\n", withdraw)
fmt.Println(state.GetBalance(allRandomUserAddress[0]))
fmt.Println(state.GetBalance(allRandomUserAddress[1]))
fmt.Println(state.GetBalance(allRandomUserAddress[2]))
fmt.Println("faucet contract balance (unchanged):")
fmt.Println(state.GetBalance(faucetContractAddress))
fmt.Printf("stake contract balance (should downup by %s)\n", withdraw)
fmt.Println(state.GetBalance(stakeContractAddress))
fmt.Println("stake address balance :")
fmt.Println(state.GetBalance(StakingAddress))
}
func playStakingContract(chain *core.BlockChain) {
playSetupStakingContract(chain)
playStaking(chain)
playWithdrawStaking(chain)
}
func main() { func main() {
genesis := gspec.MustCommit(database) genesis := gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, nil, gspec.Config, chain.Engine(), vm.Config{}, nil) chain, _ := core.NewBlockChain(database, nil, gspec.Config, chain.Engine(), vm.Config{}, nil)
@ -356,5 +226,4 @@ func main() {
} }
playFaucetContract(chain) playFaucetContract(chain)
playStakingContract(chain)
} }

Loading…
Cancel
Save