Merge pull request #1672 from rlan35/staking_specs

Add staking transaction struct and handler logic
pull/1674/head
chaosma 5 years ago committed by GitHub
commit f91c013ed2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      api/proto/node/node.go
  2. 95
      node/node.go
  3. 1
      node/node_cross_shard.go
  4. 18
      node/node_handler.go
  5. 8
      node/node_handler_test.go
  6. 6
      node/node_newblock.go
  7. 4
      node/staking_test.go
  8. 23
      node/worker/worker.go
  9. 4
      node/worker/worker_test.go
  10. 33
      staking/types/messages.go
  11. 39
      staking/types/staking_transaction.go
  12. 2
      staking/types/validator.go
  13. 10
      test/chain/main.go

@ -28,6 +28,7 @@ const (
_ // used to be Control
PING // node send ip/pki to register with leader
ShardState
Staking
)
// BlockchainSyncMessage is a struct for blockchain sync message.

@ -6,6 +6,8 @@ import (
"sync"
"time"
types2 "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/internal/params"
@ -122,10 +124,13 @@ type Node struct {
CxPool *core.CxPool // pool for missing cross shard receipts resend
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
pendingTransactions map[common.Hash]*types.Transaction // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex
recentTxsStats types.RecentTxsStats
pendingStakingTransactions map[common.Hash]*types2.StakingTransaction // All the staking transactions received but not yet processed for Consensus
pendingStakingTxMutex sync.Mutex
Worker *worker.Worker
BeaconWorker *worker.Worker // worker for beacon chain
@ -244,17 +249,6 @@ func (node *Node) Beaconchain() *core.BlockChain {
return bc
}
func (node *Node) reducePendingTransactions() {
txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
curLen := len(node.pendingTransactions)
// If length of pendingTransactions is greater than TxPoolLimit then by greedy take the TxPoolLimit recent transactions.
if curLen > txPoolLimit+txPoolLimit {
node.pendingTransactions = append(types.Transactions(nil), node.pendingTransactions[curLen-txPoolLimit:]...)
utils.Logger().Info().Msg("mem stat reduce pending transaction")
}
}
func (node *Node) tryBroadcast(tx *types.Transaction) {
msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
@ -272,13 +266,36 @@ func (node *Node) tryBroadcast(tx *types.Transaction) {
// Add new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) {
txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
node.pendingTxMutex.Lock()
node.pendingTransactions = append(node.pendingTransactions, newTxs...)
node.reducePendingTransactions()
for _, tx := range newTxs {
if _, ok := node.pendingTransactions[tx.Hash()]; !ok {
node.pendingTransactions[tx.Hash()] = tx
}
if len(node.pendingTransactions) > txPoolLimit {
break
}
}
node.pendingTxMutex.Unlock()
utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more transactions")
}
// Add new staking transactions to the pending staking transaction list.
func (node *Node) addPendingStakingTransactions(newStakingTxs types2.StakingTransactions) {
txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
node.pendingStakingTxMutex.Lock()
for _, tx := range newStakingTxs {
if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok {
node.pendingStakingTransactions[tx.Hash()] = tx
}
if len(node.pendingStakingTransactions) > txPoolLimit {
break
}
}
node.pendingStakingTxMutex.Unlock()
utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more staking transactions")
}
// AddPendingTransaction adds one new transaction to the pending transaction list.
// This is only called from SDK.
func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
@ -315,8 +332,7 @@ func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) {
// Take out a subset of valid transactions from the pending transaction list
// Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlock(coinbase common.Address) types.Transactions {
node.pendingTxMutex.Lock()
func (node *Node) getTransactionsForNewBlock(coinbase common.Address) (types.Transactions, types2.StakingTransactions) {
txsThrottleConfig := core.ShardingSchedule.TxsThrottleConfig()
@ -332,18 +348,53 @@ func (node *Node) getTransactionsForNewBlock(coinbase common.Address) types.Tran
}
node.recentTxsStats[newBlockNum] = make(types.BlockTxsCounts)
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
// Must update to the correct current state before processing potential txns
if err := node.Worker.UpdateCurrent(coinbase); err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed updating worker's state before txn selection")
return types.Transactions{}, types2.StakingTransactions{}
}
node.pendingTransactions = unselected
node.reducePendingTransactions()
node.pendingTxMutex.Lock()
defer node.pendingTxMutex.Unlock()
node.pendingStakingTxMutex.Lock()
defer node.pendingStakingTxMutex.Unlock()
pendingTransactions := types.Transactions{}
pendingStakingTransactions := types2.StakingTransactions{}
for _, tx := range node.pendingTransactions {
pendingTransactions = append(pendingTransactions, tx)
}
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
}
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
selectedStaking, unselectedStaking, invalidStaking := node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
node.pendingTransactions = make(map[common.Hash]*types.Transaction)
for _, unselectedTx := range unselected {
node.pendingTransactions[unselectedTx.Hash()] = unselectedTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingTransactions)).
Int("selected", len(selected)).
Int("invalidDiscarded", len(invalid)).
Msg("Selecting Transactions")
node.pendingTxMutex.Unlock()
return selected
node.pendingStakingTransactions = make(map[common.Hash]*types2.StakingTransaction)
for _, unselectedStakingTx := range unselectedStaking {
node.pendingStakingTransactions[unselectedStakingTx.Hash()] = unselectedStakingTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingStakingTransactions)).
Int("selected", len(unselectedStaking)).
Int("invalidDiscarded", len(invalidStaking)).
Msg("Selecting Transactions")
return selected, selectedStaking
}
// StartServer starts a server and process the requests by a handler.
@ -419,6 +470,8 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
}
node.pendingCXReceipts = make(map[string]*types.CXReceiptsProof)
node.pendingTransactions = make(map[common.Hash]*types.Transaction)
node.pendingStakingTransactions = make(map[common.Hash]*types2.StakingTransaction)
node.Consensus.VerifiedNewBlock = make(chan *types.Block)
// the sequence number is the next block number to be added in consensus protocol, which is always one more than current chain header block

@ -289,6 +289,7 @@ func (node *Node) VerifyCrosslinkHeader(prevHeader, header *block.Header) error
}
// Verify signature of the new cross link header
// TODO: check whether to recalculate shard state
shardState, err := node.Blockchain().ReadShardState(prevHeader.Epoch())
committee := shardState.FindCommitteeByID(prevHeader.ShardID())

@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"
types2 "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/common"
@ -151,6 +153,9 @@ func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) {
case proto_node.Transaction:
utils.Logger().Debug().Msg("NET: received message: Node/Transaction")
node.transactionMessageHandler(msgPayload)
case proto_node.Staking:
utils.Logger().Debug().Msg("NET: received message: Node/Staking")
node.stakingMessageHandler(msgPayload)
case proto_node.Block:
utils.Logger().Debug().Msg("NET: received message: Node/Block")
blockMsgType := proto_node.BlockMessageType(msgPayload[0])
@ -239,11 +244,24 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
utils.Logger().Error().
Err(err).
Msg("Failed to deserialize transaction list")
return
}
node.addPendingTransactions(txs)
}
}
func (node *Node) stakingMessageHandler(msgPayload []byte) {
txs := types2.StakingTransactions{}
err := rlp.Decode(bytes.NewReader(msgPayload[:]), &txs)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed to deserialize staking transaction list")
return
}
node.addPendingStakingTransactions(txs)
}
// BroadcastNewBlock is called by consensus leader to sync new blocks with other clients/nodes.
// NOTE: For now, just send to the client (basically not broadcasting)
// TODO (lc): broadcast the new blocks to new nodes doing state sync

@ -30,8 +30,8 @@ func TestAddNewBlock(t *testing.T) {
nodeconfig.GetShardConfig(0).SetNetworkType(nodeconfig.Devnet)
node := New(host, consensus, testDBFactory, false)
selectedTxs := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, common.Address{})
selectedTxs, selectedStakingTxs := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, selectedStakingTxs, common.Address{})
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
err = node.AddNewBlock(block)
@ -59,8 +59,8 @@ func TestVerifyNewBlock(t *testing.T) {
}
node := New(host, consensus, testDBFactory, false)
selectedTxs := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, common.Address{})
selectedTxs, selectedStakingTxs := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, selectedStakingTxs, common.Address{})
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
if err := node.VerifyNewBlock(block); err != nil {

@ -81,10 +81,10 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
// Update worker's current header and state data in preparation to propose/process new transactions
coinbase := node.Consensus.SelfAddress
// Prepare transactions
selectedTxs := node.getTransactionsForNewBlock(coinbase)
// Prepare transactions including staking transactions
selectedTxs, selectedStakingTxs := node.getTransactionsForNewBlock(coinbase)
if err := node.Worker.CommitTransactions(selectedTxs, coinbase); err != nil {
if err := node.Worker.CommitTransactions(selectedTxs, selectedStakingTxs, coinbase); err != nil {
ctxerror.Log15(utils.GetLogger().Error,
ctxerror.New("cannot commit transactions").
WithCause(err))

@ -41,8 +41,8 @@ func TestUpdateStakingList(t *testing.T) {
node.BlockPeriod = 8 * time.Second
for i := 0; i < 1; i++ {
selectedTxs := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, common.Address{})
selectedTxs, selectedStakingTxs := node.getTransactionsForNewBlock(common.Address{})
node.Worker.CommitTransactions(selectedTxs, selectedStakingTxs, common.Address{})
block, err := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
// The block must first be finalized before being added to the blockchain.

@ -5,6 +5,8 @@ import (
"math/big"
"time"
types2 "github.com/harmony-one/harmony/staking/types"
blockfactory "github.com/harmony-one/harmony/block/factory"
"github.com/harmony-one/harmony/shard"
@ -96,13 +98,6 @@ func (w *Worker) throttleTxs(selected types.Transactions, recentTxsStats types.R
// SelectTransactionsForNewBlock selects transactions for new block.
func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions) {
// Must update to the correct current state before processing potential txns
if err := w.UpdateCurrent(coinbase); err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed updating worker's state before txn selection")
return types.Transactions{}, txs, types.Transactions{}
}
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
@ -154,6 +149,12 @@ func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Tra
return selected, unselected, invalid
}
// SelectStakingTransactionsForNewBlock selects staking transactions for new block.
func (w *Worker) SelectStakingTransactionsForNewBlock(newBlockNum uint64, txs types2.StakingTransactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types2.StakingTransactions, types2.StakingTransactions, types2.StakingTransactions) {
// TODO: implement staking transaction selection
return types2.StakingTransactions{}, types2.StakingTransactions{}, types2.StakingTransactions{}
}
func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
snap := w.current.state.Snapshot()
@ -177,8 +178,8 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres
return receipt.Logs, nil
}
// CommitTransactions commits transactions.
func (w *Worker) CommitTransactions(txs types.Transactions, coinbase common.Address) error {
// CommitTransactions commits transactions including staking transactions.
func (w *Worker) CommitTransactions(txs types.Transactions, stakingTxns types2.StakingTransactions, coinbase common.Address) error {
// Must update to the correct current state before processing potential txns
if err := w.UpdateCurrent(coinbase); err != nil {
utils.Logger().Error().
@ -199,6 +200,10 @@ func (w *Worker) CommitTransactions(txs types.Transactions, coinbase common.Addr
}
}
for _, stakingTx := range stakingTxns {
_ = stakingTx
// TODO: add logic to commit staking txns
}
return nil
}

@ -5,6 +5,8 @@ import (
"math/rand"
"testing"
types2 "github.com/harmony-one/harmony/staking/types"
blockfactory "github.com/harmony-one/harmony/block/factory"
chain2 "github.com/harmony-one/harmony/internal/chain"
@ -75,7 +77,7 @@ func TestCommitTransactions(t *testing.T) {
tx, _ := types.SignTx(types.NewTransaction(baseNonce, testBankAddress, uint32(0), big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey)
// Commit the tx to the worker
err := worker.CommitTransactions(types.Transactions{tx}, testBankAddress)
err := worker.CommitTransactions(types.Transactions{tx}, types2.StakingTransactions{}, testBankAddress)
if err != nil {
t.Error(err)
}

@ -0,0 +1,33 @@
package types
import (
"math/big"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/internal/common"
)
// StakingMessage must fulfill these interfaces
type StakingMessage interface {
// Type returns a human-readable string for the type of the staking message
Type() string
// Signer returns the ECDSA address who must sign the outer transaction
Signer() common.Address
}
// MsgCreateValidator - struct for creating a new validator
type MsgCreateValidator struct {
Description Description `json:"description" yaml:"description"`
Commission CommissionRates `json:"commission" yaml:"commission"`
MinSelfDelegation big.Int `json:"min_self_delegation" yaml:"min_self_delegation"`
Address common.Address `json:"validator_address" yaml:"validator_address"`
ValidatingPubKey bls.PublicKey `json:"validating_pub_key" yaml:"validating_pub_key"`
Amount big.Int `json:"amount" yaml:"amount"`
}
// Type ...
func (msg MsgCreateValidator) Type() string { return "create_validator" }
// Signer ...
func (msg MsgCreateValidator) Signer() common.Address { return msg.Address }

@ -0,0 +1,39 @@
package types
import (
"bytes"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/crypto/hash"
)
// StakingTransaction struct.
type StakingTransaction struct {
AccountNonce uint64 `json:"nonce" gencodec:"required"`
Price *big.Int `json:"gasPrice" gencodec:"required"`
GasLimit uint64 `json:"gas" gencodec:"required"`
Msg StakingMessage `json:"msg" gencodec:"required"`
// Signature values
V *big.Int `json:"v" gencodec:"required"`
R *big.Int `json:"r" gencodec:"required"`
S *big.Int `json:"s" gencodec:"required"`
// This is only used when marshaling to JSON.
hash *common.Hash `json:"hash" rlp:"-"`
}
// StakingTransactions is a Transaction slice type for basic sorting.
type StakingTransactions []*StakingTransaction
// Hash hashes the RLP encoding of tx.
// It uniquely identifies the transaction.
func (tx *StakingTransaction) Hash() common.Hash {
emptyHash := common.Hash{}
if bytes.Compare(tx.hash[:], emptyHash[:]) == 0 {
h := hash.FromRLP(tx)
tx.hash = &h
}
return *tx.hash
}

@ -9,7 +9,7 @@ import (
// Validator - data fields for a validator
type Validator struct {
StakingAddress common.Address `json:"staking_address" yaml:"staking_address"` // ECDSA address of the validator
Address common.Address `json:"address" yaml:"address"` // ECDSA address of the validator
ValidatingPubKey bls.PublicKey `json:"validating_pub_key" yaml:"validating_pub_key"` // The BLS public key of the validator for consensus
Description Description `json:"description" yaml:"description"` // description for the validator
Active bool `json:"active" yaml:"active"` // Is the validator active in the validating process or not

@ -6,6 +6,8 @@ import (
"log"
"math/big"
types2 "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/common"
@ -127,7 +129,7 @@ func fundFaucetContract(chain *core.BlockChain) {
amount := 720000
tx, _ := types.SignTx(types.NewTransaction(nonce+uint64(4), StakingAddress, 0, big.NewInt(int64(amount)), params.TxGas, nil, nil), types.HomesteadSigner{}, FaucetPriKey)
txs = append(txs, tx)
err := contractworker.CommitTransactions(txs, testUserAddress)
err := contractworker.CommitTransactions(txs, types2.StakingTransactions{}, testUserAddress)
if err != nil {
fmt.Println(err)
}
@ -165,7 +167,7 @@ func callFaucetContractToFundAnAddress(chain *core.BlockChain) {
callEnc = append(callEnc, paddedAddress...)
callfaucettx, _ := types.SignTx(types.NewTransaction(nonce+uint64(5), faucetContractAddress, 0, big.NewInt(0), params.TxGasContractCreation*10, nil, callEnc), types.HomesteadSigner{}, FaucetPriKey)
err = contractworker.CommitTransactions(types.Transactions{callfaucettx}, testUserAddress)
err = contractworker.CommitTransactions(types.Transactions{callfaucettx}, types2.StakingTransactions{}, testUserAddress)
if err != nil {
fmt.Println(err)
}
@ -243,7 +245,7 @@ func playStaking(chain *core.BlockChain) {
tx, _ := types.SignTx(types.NewTransaction(0, stakeContractAddress, 0, big.NewInt(int64(stake)), params.TxGas*5, nil, callEncl), types.HomesteadSigner{}, allRandomUserKey[i])
stakingtxns = append(stakingtxns, tx)
}
err = contractworker.CommitTransactions(stakingtxns, common.Address{})
err = contractworker.CommitTransactions(stakingtxns, types2.StakingTransactions{}, common.Address{})
if err != nil {
fmt.Println(err)
@ -301,7 +303,7 @@ func playWithdrawStaking(chain *core.BlockChain) {
withdrawstakingtxns = append(withdrawstakingtxns, tx)
}
err = contractworker.CommitTransactions(withdrawstakingtxns, common.Address{})
err = contractworker.CommitTransactions(withdrawstakingtxns, types2.StakingTransactions{}, common.Address{})
if err != nil {
fmt.Println("error:")
fmt.Println(err)

Loading…
Cancel
Save