diff --git a/client/txgen/main.go b/client/txgen/main.go index 025035eb1..696bd06a1 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "github.com/harmony-one/harmony/utils" "os" "path" "runtime" @@ -77,15 +78,17 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} + clientPeer := config.GetClientPeer() for shardID := range shardIDLeaderMap { - node := node.New(&consensus.Consensus{ShardID: shardID}, nil, p2p.Peer{}) + _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) + clientPeer.PubKey = pubKey + node := node.New(&consensus.Consensus{ShardID: shardID}, nil, *clientPeer) // Assign many fake addresses so we have enough address to play with at first node.AddTestingAddresses(setting.NumOfAddress) nodes = append(nodes, node) } // Client/txgenerator server node setup - clientPeer := config.GetClientPeer() consensusObj := consensus.New(*clientPeer, "0", nil, p2p.Peer{}) clientNode := node.New(consensusObj, nil, *clientPeer) diff --git a/client/txgen/txgen/account_txs_generator.go b/client/txgen/txgen/account_txs_generator.go index 4ea92e05f..4521b8943 100644 --- a/client/txgen/txgen/account_txs_generator.go +++ b/client/txgen/txgen/account_txs_generator.go @@ -27,6 +27,7 @@ func GenerateSimulatedTransactionsAccount(shardID int, dataNodes []*node.Node, s for j := 0; j < 10; j++ { randomUserKey, _ := crypto.GenerateKey() randomUserAddress := crypto.PubkeyToAddress(randomUserKey.PublicKey) + tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, uint32(shardID), big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i]) txs[i*10+j] = tx } diff --git a/harmony/main.go b/harmony/main.go index 1ae4283b2..8fad3d413 100644 --- a/harmony/main.go +++ b/harmony/main.go @@ -84,7 +84,7 @@ func main() { } txs := make([]*types.Transaction, 100) - worker := worker.New(params.TestChainConfig, chain, consensus.NewFaker()) + worker := worker.New(params.TestChainConfig, chain, consensus.NewFaker(), crypto.PubkeyToAddress(testBankKey.PublicKey)) nonce := worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(testBankKey.PublicKey)) for i := range txs { randomUserKey, _ := crypto.GenerateKey() @@ -93,5 +93,5 @@ func main() { txs[i] = tx } - worker.CommitTransactions(txs, crypto.PubkeyToAddress(testBankKey.PublicKey)) + worker.CommitTransactions(txs) } diff --git a/node/node.go b/node/node.go index f2fccc09e..a82779b7b 100644 --- a/node/node.go +++ b/node/node.go @@ -139,14 +139,14 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Trans // Note the pending transaction list will then contain the rest of the txs func (node *Node) getTransactionsForNewBlockAccount(maxNumTxs int) (types.Transactions, []*blockchain.CrossShardTxAndProof) { node.pendingTxMutexAccount.Lock() - selected, unselected, invalid, crossShardTxs := node.pendingTransactionsAccount, types.Transactions{}, types.Transactions{}, []*blockchain.CrossShardTxAndProof{} + selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(node.pendingTransactionsAccount, maxNumTxs) _ = invalid // invalid txs are discard node.log.Debug("Invalid transactions discarded", "number", len(invalid)) node.pendingTransactionsAccount = unselected node.log.Debug("Remaining pending transactions", "number", len(node.pendingTransactionsAccount)) node.pendingTxMutexAccount.Unlock() - return selected, crossShardTxs //TODO: replace cross-shard proofs for account model + return selected, nil //TODO: replace cross-shard proofs for account model } // StartServer starts a server and process the request by a handler. @@ -308,7 +308,7 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node node.Chain = chain node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain) node.BlockChannelAccount = make(chan *types.Block) - node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus) + node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(selfPeer.PubKey)) } node.SelfPeer = selfPeer diff --git a/node/node_handler.go b/node/node_handler.go index 54626dab7..bef2bb5df 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -340,21 +340,13 @@ func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) { if len(node.pendingTransactionsAccount) >= 1000 { // Normal tx block consensus selectedTxs, _ := node.getTransactionsForNewBlockAccount(MaxNumberOfTransactionsPerBlock) - err := node.Worker.UpdateCurrent() + node.Worker.CommitTransactions(selectedTxs) + block, err := node.Worker.Commit() if err != nil { - node.log.Debug("Failed updating worker's state", "Error", err) - } - err = node.Worker.CommitTransactions(selectedTxs, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) - if err == nil { - block, err := node.Worker.Commit() - if err != nil { - node.log.Debug("Failed commiting new block", "Error", err) - } else { - newBlock = block - break - } + node.log.Debug("Failed commiting new block", "Error", err) } else { - node.log.Debug("Failed to create new block", "Error", err) + newBlock = block + break } } // If not enough transactions to run Consensus, diff --git a/node/node_test.go b/node/node_test.go index bb0458b4b..339563e77 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -2,6 +2,7 @@ package node import ( "fmt" + "github.com/harmony-one/harmony/utils" "os" "testing" "time" @@ -16,7 +17,8 @@ import ( ) func TestNewNewNode(test *testing.T) { - leader := p2p.Peer{IP: "1", Port: "2"} + _, pubKey := utils.GenKey("1", "2") + leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey} validator := p2p.Peer{IP: "3", Port: "5"} consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) @@ -43,7 +45,8 @@ func TestNewNewNode(test *testing.T) { } func TestCountNumTransactionsInBlockchain(test *testing.T) { - leader := p2p.Peer{IP: "1", Port: "2"} + _, pubKey := utils.GenKey("1", "2") + leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey} validator := p2p.Peer{IP: "3", Port: "5"} consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) @@ -77,7 +80,8 @@ func TestAddPeers(test *testing.T) { ValidatorID: 2, }, } - leader := p2p.Peer{IP: "1", Port: "2"} + _, pubKey := utils.GenKey("1", "2") + leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey} validator := p2p.Peer{IP: "3", Port: "5"} consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) diff --git a/node/worker/worker.go b/node/worker/worker.go index ad7a8fee7..f4dbc14b2 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -1,6 +1,7 @@ package worker import ( + "github.com/harmony-one/harmony/log" "math/big" "time" @@ -30,12 +31,41 @@ type Worker struct { chain *core.BlockChain current *environment // An environment for current running cycle. - engine consensus.Engine + coinbase common.Address + engine consensus.Engine gasFloor uint64 gasCeil uint64 } +func (w *Worker) SelectTransactionsForNewBlock(txs types.Transactions, maxNumTxs int) (types.Transactions, types.Transactions, types.Transactions) { + if w.current.gasPool == nil { + w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit) + } + selected := types.Transactions{} + unselected := types.Transactions{} + invalid := types.Transactions{} + for _, tx := range txs { + snap := w.current.state.Snapshot() + _, err := w.commitTransaction(tx, w.coinbase) + if len(selected) > maxNumTxs { + unselected = append(unselected, tx) + } else { + if err != nil { + w.current.state.RevertToSnapshot(snap) + invalid = append(invalid, tx) + } else { + selected = append(selected, tx) + } + } + } + err := w.UpdateCurrent() + if err != nil { + log.Debug("Failed updating worker's state", "Error", err) + } + return selected, unselected, invalid +} + func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) { snap := w.current.state.Snapshot() @@ -51,14 +81,13 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres } // CommitTransactions commits transactions. -func (w *Worker) CommitTransactions(txs []*types.Transaction, coinbase common.Address) error { - snap := w.current.state.Snapshot() - +func (w *Worker) CommitTransactions(txs types.Transactions) error { if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit) } for _, tx := range txs { - _, err := w.commitTransaction(tx, coinbase) + snap := w.current.state.Snapshot() + _, err := w.commitTransaction(tx, w.coinbase) if err != nil { w.current.state.RevertToSnapshot(snap) return err @@ -114,7 +143,7 @@ func (w *Worker) Commit() (*types.Block, error) { } // New ... -func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine) *Worker { +func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine, coinbase common.Address) *Worker { worker := &Worker{ config: config, chain: chain, @@ -122,6 +151,7 @@ func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.En } worker.gasFloor = 0 worker.gasCeil = 1000000000000000 + worker.coinbase = coinbase parent := worker.chain.CurrentBlock() num := parent.Number()