Revert merge normal and staking txns (#2895)

* Revert merge normal and staking txns

* fix lint

* fix build
pull/2896/head
Rongjian Lan 5 years ago committed by GitHub
parent 918dec8cbe
commit 8fde77d88f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 107
      core/state_processor.go
  2. 85
      core/types/transaction.go
  3. 6
      core/types/transaction_test.go
  4. 108
      core/types/tx_pool.go
  5. 11
      node/node_handler_test.go
  6. 29
      node/node_newblock.go
  7. 75
      node/worker/worker.go
  8. 6
      node/worker/worker_test.go
  9. 14
      test/chain/main.go

@ -85,92 +85,33 @@ func (p *StateProcessor) Process(
return nil, nil, nil, 0, nil, err
}
// Signer specific for normal transactions
// For staking tx directly available from tx
signer := types.MakeSigner(p.config, header.Epoch())
poolTransactions := map[common.Address]types.PoolTransactions{}
// to account for processing transactions in the existing blocks before prestaking
if !p.config.IsPreStaking(header.Epoch()) {
// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
statedb.Prepare(tx.Hash(), block.Hash(), i)
receipt, cxReceipt, _, err := ApplyTransaction(
p.config, p.bc, &beneficiary, gp, statedb, header, tx, usedGas, cfg,
)
if err != nil {
return nil, nil, nil, 0, nil, err
}
receipts = append(receipts, receipt)
if cxReceipt != nil {
outcxs = append(outcxs, cxReceipt)
}
allLogs = append(allLogs, receipt.Logs...)
}
} else {
// Iterate over and add normal transactions to poolTransactions
for _, tx := range block.Transactions() {
from, err := types.Sender(signer, tx)
if err != nil {
return nil, nil, nil, 0, nil, err
}
poolTransactions[from] = append(poolTransactions[from], tx)
// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
statedb.Prepare(tx.Hash(), block.Hash(), i)
receipt, cxReceipt, _, err := ApplyTransaction(
p.config, p.bc, &beneficiary, gp, statedb, header, tx, usedGas, cfg,
)
if err != nil {
return nil, nil, nil, 0, nil, err
}
// Iterate over and add staking transactions to poolTransactions
for _, tx := range block.StakingTransactions() {
from, err := tx.SenderAddress()
if err != nil {
return nil, nil, nil, 0, nil, err
}
poolTransactions[from] = append(poolTransactions[from], tx)
receipts = append(receipts, receipt)
if cxReceipt != nil {
outcxs = append(outcxs, cxReceipt)
}
txs := types.NewPoolTransactionsByPriceAndNonce(signer, poolTransactions)
dbIndex := 0
// NORMAL
for {
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
if tx == nil {
break
}
statedb.Prepare(tx.Hash(), block.Hash(), dbIndex)
dbIndex = dbIndex + 1
var (
err error
receipt *types.Receipt
cxReceipt *types.CXReceipt
)
if plainTx, ok := tx.(*types.Transaction); ok {
if receipt, cxReceipt, _, err = ApplyTransaction(
p.config, p.bc, &beneficiary, gp, statedb, header, plainTx, usedGas, cfg,
); err != nil {
return nil, nil, nil, 0, nil, err
}
if cxReceipt != nil {
outcxs = append(outcxs, cxReceipt)
}
} else if stx, ok := tx.(*staking.StakingTransaction); ok {
if receipt, _, err = ApplyStakingTransaction(
p.config, p.bc, &beneficiary, gp, statedb, header, stx, usedGas, cfg,
); err != nil {
return nil, nil, nil, 0, nil, err
}
} else {
txs.Shift()
continue
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
// Everything ok
txs.Shift()
allLogs = append(allLogs, receipt.Logs...)
}
// Iterate over and process the staking transactions
L := len(block.Transactions())
for i, tx := range block.StakingTransactions() {
statedb.Prepare(tx.Hash(), block.Hash(), i+L)
receipt, _, err := ApplyStakingTransaction(
p.config, p.bc, &beneficiary, gp, statedb, header, tx, usedGas, cfg,
)
if err != nil {
return nil, nil, nil, 0, nil, err
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
}
// incomingReceipts should always be processed

@ -17,6 +17,7 @@
package types
import (
"container/heap"
"errors"
"fmt"
"io"
@ -483,6 +484,90 @@ func (s TxByNonce) Len() int { return len(s) }
func (s TxByNonce) Less(i, j int) bool { return s[i].data.AccountNonce < s[j].data.AccountNonce }
func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// TxByPrice implements both the sort and the heap interface, making it useful
// for all at once sorting as well as individually adding and removing elements.
type TxByPrice Transactions
func (s TxByPrice) Len() int { return len(s) }
func (s TxByPrice) Less(i, j int) bool { return s[i].data.Price.Cmp(s[j].data.Price) > 0 }
func (s TxByPrice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// Push pushes a transaction.
func (s *TxByPrice) Push(x interface{}) {
*s = append(*s, x.(*Transaction))
}
// Pop pops a transaction.
func (s *TxByPrice) Pop() interface{} {
old := *s
n := len(old)
x := old[n-1]
*s = old[0 : n-1]
return x
}
// TransactionsByPriceAndNonce represents a set of transactions that can return
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type TransactionsByPriceAndNonce struct {
txs map[common.Address]Transactions // Per account nonce-sorted list of transactions
heads TxByPrice // Next transaction for each unique account (price heap)
signer Signer // Signer for the set of transactions
}
// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve
// price sorted transactions in a nonce-honouring way.
//
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions) *TransactionsByPriceAndNonce {
// Initialize a price based heap with the head transactions
heads := make(TxByPrice, 0, len(txs))
for from, accTxs := range txs {
heads = append(heads, accTxs[0])
// Ensure the sender address is from the signer
acc, _ := Sender(signer, accTxs[0])
txs[acc] = accTxs[1:]
if from != acc {
delete(txs, from)
}
}
heap.Init(&heads)
// Assemble and return the transaction set
return &TransactionsByPriceAndNonce{
txs: txs,
heads: heads,
signer: signer,
}
}
// Peek returns the next transaction by price.
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
if len(t.heads) == 0 {
return nil
}
return t.heads[0]
}
// Shift replaces the current best head with the next one from the same account.
func (t *TransactionsByPriceAndNonce) Shift() {
acc, _ := Sender(t.signer, t.heads[0])
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
t.heads[0], t.txs[acc] = txs[0], txs[1:]
heap.Fix(&t.heads, 0)
} else {
heap.Pop(&t.heads)
}
}
// Pop removes the best transaction, *not* replacing it with the next one from
// the same account. This should be used when a transaction cannot be executed
// and hence all subsequent ones should be discarded from the same account.
func (t *TransactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}
// Message is a fully derived transaction and implements core.Message
// NOTE: In a future PR this will be removed.
type Message struct {

@ -44,7 +44,7 @@ func TestTransactionPriceNonceSort(t *testing.T) {
signer := HomesteadSigner{}
// Generate a batch of transactions with overlapping values, but shifted nonces
groups := map[common.Address]PoolTransactions{}
groups := map[common.Address]Transactions{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
for i := 0; i < 25; i++ {
@ -53,11 +53,11 @@ func TestTransactionPriceNonceSort(t *testing.T) {
}
}
// Sort the transactions and cross check the nonce ordering
txset := NewPoolTransactionsByPriceAndNonce(signer, groups)
txset := NewTransactionsByPriceAndNonce(signer, groups)
txs := Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx.(*Transaction))
txs = append(txs, tx)
txset.Shift()
}
if len(txs) != 25*25 {

@ -1,11 +1,8 @@
package types
import (
"bytes"
"container/heap"
"io"
"math/big"
"sort"
"github.com/pkg/errors"
@ -105,108 +102,3 @@ type PoolTxByNonce PoolTransactions
func (s PoolTxByNonce) Len() int { return len(s) }
func (s PoolTxByNonce) Less(i, j int) bool { return (s[i]).Nonce() < (s[j]).Nonce() }
func (s PoolTxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// PoolTxByPrice implements both the sort and the heap interface, making it useful
// for all at once sorting as well as individually adding and removing elements.
type PoolTxByPrice PoolTransactions
func (s PoolTxByPrice) Len() int { return len(s) }
func (s PoolTxByPrice) Less(i, j int) bool { return s[i].GasPrice().Cmp(s[j].GasPrice()) > 0 }
func (s PoolTxByPrice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// Push pushes a transaction.
func (s *PoolTxByPrice) Push(x interface{}) {
*s = append(*s, x.(*Transaction))
}
// Pop pops a transaction.
func (s *PoolTxByPrice) Pop() interface{} {
old := *s
n := len(old)
x := old[n-1]
*s = old[0 : n-1]
return x
}
// PoolTransactionsByPriceAndNonce represents a set of transactions that can return
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type PoolTransactionsByPriceAndNonce struct {
txs map[common.Address]PoolTransactions // Per account nonce-sorted list of transactions
heads PoolTxByPrice // Next transaction for each unique account (price heap)
signer Signer // Signer for the set of transactions
}
// NewPoolTransactionsByPriceAndNonce creates a transaction set that can retrieve
// price sorted transactions in a nonce-honouring way.
//
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
func NewPoolTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]PoolTransactions) *PoolTransactionsByPriceAndNonce {
// Initialize a price based heap with the head transactions
sortedAddrs := []common.Address{}
for from := range txs {
sortedAddrs = append(sortedAddrs, from)
}
sort.SliceStable(sortedAddrs, func(i, j int) bool {
return bytes.Compare(sortedAddrs[i].Bytes(), sortedAddrs[j].Bytes()) < 0
})
heads := make(PoolTxByPrice, 0, len(txs))
for _, from := range sortedAddrs {
accTxs := txs[from]
heads = append(heads, accTxs[0])
// Ensure the sender address is from the signer
var acc common.Address
if plainTx, ok := accTxs[0].(*Transaction); ok {
acc, _ = Sender(signer, plainTx)
} else if stx, ok := accTxs[0].(*staking.StakingTransaction); ok {
acc, _ = stx.SenderAddress()
}
txs[acc] = accTxs[1:]
if from != acc {
delete(txs, from)
}
}
heap.Init(&heads)
// Assemble and return the transaction set
return &PoolTransactionsByPriceAndNonce{
txs: txs,
heads: heads,
signer: signer,
}
}
// Peek returns the next transaction by price.
func (t *PoolTransactionsByPriceAndNonce) Peek() PoolTransaction {
if len(t.heads) == 0 {
return nil
}
return t.heads[0]
}
// Shift replaces the current best head with the next one from the same account.
func (t *PoolTransactionsByPriceAndNonce) Shift() {
var acc common.Address
if plainTx, ok := t.heads[0].(*Transaction); ok {
acc, _ = Sender(t.signer, plainTx)
} else if stx, ok := t.heads[0].(*staking.StakingTransaction); ok {
acc, _ = stx.SenderAddress()
}
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
t.heads[0], t.txs[acc] = txs[0], txs[1:]
heap.Fix(&t.heads, 0)
} else {
heap.Pop(&t.heads)
}
}
// Pop removes the best transaction, *not* replacing it with the next one from
// the same account. This should be used when a transaction cannot be executed
// and hence all subsequent ones should be discarded from the same account.
func (t *PoolTransactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}

@ -13,6 +13,7 @@ import (
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
staking "github.com/harmony-one/harmony/staking/types"
)
func TestAddNewBlock(t *testing.T) {
@ -36,9 +37,10 @@ func TestAddNewBlock(t *testing.T) {
nodeconfig.SetNetworkType(nodeconfig.Devnet)
node := New(host, consensus, testDBFactory, nil, false)
txs := make(map[common.Address]types.PoolTransactions)
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}
node.Worker.CommitTransactions(
txs, common.Address{},
txs, stks, common.Address{},
)
block, _ := node.Worker.FinalizeNewBlock(
[]byte{}, []byte{}, 0, common.Address{}, nil, nil,
@ -74,9 +76,10 @@ func TestVerifyNewBlock(t *testing.T) {
}
node := New(host, consensus, testDBFactory, nil, false)
txs := make(map[common.Address]types.PoolTransactions)
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}
node.Worker.CommitTransactions(
txs, common.Address{},
txs, stks, common.Address{},
)
block, _ := node.Worker.FinalizeNewBlock(
[]byte{}, []byte{}, 0, common.Address{}, nil, nil,

@ -6,6 +6,8 @@ import (
"strings"
"time"
staking "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/types"
@ -118,12 +120,35 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
utils.Logger().Err(err).Msg("Failed to fetch pending transactions")
return nil, err
}
pendingPlainTxs := map[common.Address]types.Transactions{}
pendingStakingTxs := staking.StakingTransactions{}
for addr, poolTxs := range pendingPoolTxs {
plainTxsPerAcc := types.Transactions{}
for _, tx := range poolTxs {
if plainTx, ok := tx.(*types.Transaction); ok {
plainTxsPerAcc = append(plainTxsPerAcc, plainTx)
} else if stakingTx, ok := tx.(*staking.StakingTransaction); ok {
// Only process staking transactions after pre-staking epoch happened.
if node.Blockchain().Config().IsPreStaking(node.Worker.GetCurrentHeader().Epoch()) {
pendingStakingTxs = append(pendingStakingTxs, stakingTx)
}
} else {
utils.Logger().Err(types.ErrUnknownPoolTxType).
Msg("Failed to parse pending transactions")
return nil, types.ErrUnknownPoolTxType
}
}
if plainTxsPerAcc.Len() > 0 {
pendingPlainTxs[addr] = plainTxsPerAcc
}
}
utils.AnalysisEnd("proposeNewBlockChooseFromTxnPool")
// Try commit normal and staking transactions based on the current state
// The successfully committed transactions will be put in the proposed block
if err := node.Worker.CommitTransactions(pendingPoolTxs, beneficiary); err != nil {
if err := node.Worker.CommitTransactions(
pendingPlainTxs, pendingStakingTxs, beneficiary,
); err != nil {
utils.Logger().Error().Err(err).Msg("cannot commit transactions")
return nil, err
}

@ -53,15 +53,15 @@ type Worker struct {
// CommitTransactions commits transactions for new block.
func (w *Worker) CommitTransactions(
poolTransactions map[common.Address]types.PoolTransactions, coinbase common.Address,
pendingNormal map[common.Address]types.Transactions,
pendingStaking staking.StakingTransactions, coinbase common.Address,
) error {
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
}
txs := types.NewPoolTransactionsByPriceAndNonce(w.current.signer, poolTransactions)
dbIndex := 0
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pendingNormal)
// NORMAL
for {
// If we don't have enough gas for any further transactions then we're done
@ -74,7 +74,10 @@ func (w *Worker) CommitTransactions(
if tx == nil {
break
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(w.current.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.config.IsEIP155(w.current.header.Epoch()) {
@ -82,37 +85,16 @@ func (w *Worker) CommitTransactions(
txs.Pop()
continue
}
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, dbIndex)
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs))
if tx.ShardID() != w.chain.ShardID() {
txs.Shift()
continue
}
var (
err error
from common.Address
)
_, err := w.commitTransaction(tx, coinbase)
if plainTx, ok := tx.(*types.Transaction); ok {
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
// We use the eip155 signer regardless of the current hf.
from, _ = types.Sender(w.current.signer, plainTx)
_, err = w.commitTransaction(plainTx, coinbase)
} else if stx, ok := tx.(*staking.StakingTransaction); ok {
// STAKING - only beaconchain process staking transaction
if w.chain.ShardID() != shard.BeaconChainShardID ||
!w.config.IsPreStaking(w.current.header.Epoch()) {
txs.Shift()
continue
}
from, _ = stx.SenderAddress()
_, err = w.commitStakingTransaction(stx, coinbase)
}
// TODO(rj): rollback for staking errors
sender, _ := common2.AddressToBech32(from)
switch err {
case core.ErrGasLimitReached:
@ -132,11 +114,7 @@ func (w *Worker) CommitTransactions(
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
utils.Logger().Info().Str("txHash", tx.Hash().Hex()).
Uint64("txGasLimit", tx.Gas()).
Msg("Successfully committed transaction")
txs.Shift()
dbIndex = dbIndex + 1
default:
// Strange error, discard the transaction and get the next in line (note, the
@ -146,6 +124,41 @@ func (w *Worker) CommitTransactions(
}
}
// STAKING - only beaconchain process staking transaction
if w.chain.ShardID() == shard.BeaconChainShardID {
for _, tx := range pendingStaking {
// TODO: merge staking transaction processing with normal transaction processing.
// <<THESE CODE ARE DUPLICATED AS ABOVE
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
utils.Logger().Info().Uint64("have", w.current.gasPool.Gas()).Uint64("want", params.TxGas).Msg("Not enough gas for further transactions")
break
}
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.config.IsEIP155(w.current.header.Epoch()) {
utils.Logger().Info().Str("hash", tx.Hash().Hex()).Str("eip155Epoch", w.config.EIP155Epoch.String()).Msg("Ignoring reply protected transaction")
txs.Pop()
continue
}
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs)+len(w.current.stakingTxs))
// THESE CODE ARE DUPLICATED AS ABOVE>>
if _, err := w.commitStakingTransaction(tx, coinbase); err != nil {
txID := tx.Hash().Hex()
utils.Logger().Error().Err(err).
Str("stakingTxID", txID).
Interface("stakingTx", tx).
Msg("Failed committing staking transaction")
} else {
utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).
Uint64("txGasLimit", tx.Gas()).
Msg("Successfully committed staking transaction")
}
}
}
utils.Logger().Info().
Int("newTxns", len(w.current.txs)).
Int("newStakingTxns", len(w.current.stakingTxs)).

@ -75,10 +75,10 @@ func TestCommitTransactions(t *testing.T) {
tx, _ := types.SignTx(types.NewTransaction(baseNonce, testBankAddress, uint32(0), big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey)
// Commit the tx to the worker
txs := make(map[common.Address]types.PoolTransactions)
txs[testBankAddress] = types.PoolTransactions{tx}
txs := make(map[common.Address]types.Transactions)
txs[testBankAddress] = types.Transactions{tx}
err := worker.CommitTransactions(
txs, testBankAddress,
txs, nil, testBankAddress,
)
if err != nil {
t.Error(err)

@ -118,13 +118,11 @@ func fundFaucetContract(chain *core.BlockChain) {
tx, _ := types.SignTx(types.NewTransaction(nonce+uint64(4), randomUserAddress, 0, big.NewInt(int64(amount)), params.TxGas, nil, nil), types.HomesteadSigner{}, FaucetPriKey)
txs = append(txs, tx)
txmap := make(map[common.Address]types.PoolTransactions)
for _, tx := range txs {
txmap[FaucetAddress] = append(txmap[FaucetAddress], tx)
}
txmap := make(map[common.Address]types.Transactions)
txmap[FaucetAddress] = txs
err := contractworker.CommitTransactions(
txmap, testUserAddress,
txmap, nil, testUserAddress,
)
if err != nil {
fmt.Println(err)
@ -162,11 +160,11 @@ func callFaucetContractToFundAnAddress(chain *core.BlockChain) {
callEnc = append(callEnc, paddedAddress...)
callfaucettx, _ := types.SignTx(types.NewTransaction(nonce+uint64(5), faucetContractAddress, 0, big.NewInt(0), params.TxGasContractCreation*10, nil, callEnc), types.HomesteadSigner{}, FaucetPriKey)
txmap := make(map[common.Address]types.PoolTransactions)
txmap[FaucetAddress] = types.PoolTransactions{callfaucettx}
txmap := make(map[common.Address]types.Transactions)
txmap[FaucetAddress] = types.Transactions{callfaucettx}
err = contractworker.CommitTransactions(
txmap, testUserAddress,
txmap, nil, testUserAddress,
)
if err != nil {
fmt.Println(err)

Loading…
Cancel
Save