From e7d62cae18e54329b1e3c532671999f4b1bb5208 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 1 Jan 2019 14:05:49 -0800 Subject: [PATCH 1/3] Add more comments to worker.go --- node/worker/worker.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/node/worker/worker.go b/node/worker/worker.go index 942dd08dd..8741d6e0f 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -105,7 +105,7 @@ func (w *Worker) CommitTransactions(txs types.Transactions) error { return nil } -// UpdateCurrent updates ... +// UpdateCurrent updates the current environment with the current state and header. func (w *Worker) UpdateCurrent() error { parent := w.chain.CurrentBlock() num := parent.Number() @@ -135,17 +135,17 @@ func (w *Worker) makeCurrent(parent *types.Block, header *types.Header) error { return nil } -// GetCurrentState ... +// GetCurrentState gets the current state. func (w *Worker) GetCurrentState() *state.StateDB { return w.current.state } -// GetCurrentReceipts ... +// GetCurrentReceipts get the receipts generated starting from the last state. func (w *Worker) GetCurrentReceipts() []*types.Receipt { return w.current.receipts } -// Commit ... +// Commit generate a new block for the new txs. func (w *Worker) Commit() (*types.Block, error) { s := w.current.state.Copy() block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts) @@ -155,7 +155,7 @@ func (w *Worker) Commit() (*types.Block, error) { return block, nil } -// New ... +// New create a new worker object. func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine, coinbase common.Address, shardID uint32) *Worker { worker := &Worker{ config: config, From 334c47776fa2f213f64e8cd3718e4e160d05daf6 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 1 Jan 2019 15:27:13 -0800 Subject: [PATCH 2/3] Cleanup node.go --- node/{node_utils.go => address_faker.go} | 2 +- node/node.go | 45 ++++++++++-------------- test/deploy.sh | 2 +- 3 files changed, 20 insertions(+), 29 deletions(-) rename node/{node_utils.go => address_faker.go} (94%) diff --git a/node/node_utils.go b/node/address_faker.go similarity index 94% rename from node/node_utils.go rename to node/address_faker.go index 4e296b8d8..230f8a473 100644 --- a/node/node_utils.go +++ b/node/address_faker.go @@ -12,7 +12,7 @@ import ( // CreateGenesisAllocWithTestingAddresses create the genesis block allocation that contains deterministically // generated testing addressess with tokens. -// TODO: Consider to remove it later when moving to production.a +// TODO: Remove it later when moving to production. func (node *Node) CreateGenesisAllocWithTestingAddresses(numAddress int) core.GenesisAlloc { rand.Seed(0) len := 1000000 diff --git a/node/node.go b/node/node.go index fb61d903a..6fdf2760f 100644 --- a/node/node.go +++ b/node/node.go @@ -91,22 +91,23 @@ type NetworkNode struct { IDCPeer p2p.Peer } -// Node represents a program (machine) participating in the network -// TODO(minhdoan, rj): consider using BlockChannel *chan blockchain.Block for efficiency. +// Node represents a protocol-participating node in the network type Node struct { Consensus *bft.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits) BlockChannel chan *types.Block // The channel to receive new blocks from Node pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus transactionInConsensus []*types.Transaction // The transactions selected into the new block and under Consensus process - blockchain *core.BlockChain // The blockchain for the shard where this node belongs - db *hdb.LDBDatabase // LevelDB to store blockchain. - log log.Logger // Log utility pendingTxMutex sync.Mutex - crossTxToReturnMutex sync.Mutex - ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept - Client *client.Client // The presence of a client object means this node will also act as a client - SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. - IDCPeer p2p.Peer + + blockchain *core.BlockChain // The blockchain for the shard where this node belongs + db *hdb.LDBDatabase // LevelDB to store blockchain. + + log log.Logger // Log utility + + ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept + Client *client.Client // The presence of a client object means this node will also act as a client + SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. + IDCPeer p2p.Peer Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer State State // State of the Node @@ -122,10 +123,6 @@ type Node struct { stateSync *syncing.StateSync syncingState uint32 - // Test only - TestBankKeys []*ecdsa.PrivateKey - ContractKeys []*ecdsa.PrivateKey - ContractAddresses []common.Address // The p2p host used to send/receive p2p messages host host.Host @@ -134,6 +131,11 @@ type Node struct { // Signal channel for lost validators OfflinePeers chan p2p.Peer + + // For test only + TestBankKeys []*ecdsa.PrivateKey + ContractKeys []*ecdsa.PrivateKey + ContractAddresses []common.Address } // Blockchain returns the blockchain from node @@ -163,21 +165,11 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions { return selected } -// StartServer starts a server and process the request by a handler. +// StartServer starts a server and process the requests by a handler. func (node *Node) StartServer() { node.host.BindHandlerAndServe(node.StreamHandler) } -// SetLog sets log for Node. -func (node *Node) SetLog() *Node { - node.log = log.New() - return node -} - -func (node *Node) String() string { - return node.Consensus.String() -} - // Count the total number of transactions in the blockchain // Currently used for stats reporting purpose func (node *Node) countNumTransactionsInBlockchain() int { @@ -237,7 +229,7 @@ func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { // Initialize genesis block and blockchain genesisAlloc := node.CreateGenesisAllocWithTestingAddresses(100) - contractKey, _ := ecdsa.GenerateKey(crypto.S256(), strings.NewReader("Test contract key string blablablablablablablablablablablablablablablablabl")) + contractKey, _ := ecdsa.GenerateKey(crypto.S256(), strings.NewReader("Test contract key string stream that is fixed so that generated test key are deterministic every time")) contractAddress := crypto.PubkeyToAddress(contractKey.PublicKey) contractFunds := big.NewInt(9000000) contractFunds = contractFunds.Mul(contractFunds, big.NewInt(params.Ether)) @@ -332,7 +324,6 @@ func GetSyncingPort(nodePort string) string { // GetSyncingPeers returns list of peers. // Right now, the list length is only 1 for testing. -// TODO(mihdoan): fix it later. func (node *Node) GetSyncingPeers() []p2p.Peer { res := []p2p.Peer{} node.Neighbors.Range(func(k, v interface{}) bool { diff --git a/test/deploy.sh b/test/deploy.sh index 681fab544..46a576c25 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -89,7 +89,7 @@ pushd $ROOT echo "compiling ..." go build -o bin/benchmark go build -o bin/txgen client/txgen/main.go -go build -o bin/beacon beaconchain/main/main.go +go build -o bin/beacon cmd/beaconchain/main.go popd # Create a tmp folder for logs From f372ef742157802abd59e7b1afc364e6dc217dc7 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 1 Jan 2019 17:42:19 -0800 Subject: [PATCH 3/3] Cleanup node_handler.go --- node/node_handler.go | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/node/node_handler.go b/node/node_handler.go index 39914e481..c76125f8d 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -20,22 +20,19 @@ import ( ) const ( - // MinNumberOfTransactionsPerBlock is the min number of transaction per a block. - MinNumberOfTransactionsPerBlock = 6000 // MaxNumberOfTransactionsPerBlock is the max number of transaction per a block. MaxNumberOfTransactionsPerBlock = 8000 - // NumBlocksBeforeStateBlock is the number of blocks allowed before generating state block - NumBlocksBeforeStateBlock = 1000 ) // MaybeBroadcastAsValidator returns if the node is a validator node. func (node *Node) MaybeBroadcastAsValidator(content []byte) { + // TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p. if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID <= host.MaxBroadCast { go host.BroadcastMessageFromValidator(node.host, node.SelfPeer, node.Consensus.GetValidatorPeers(), content) } } -// StreamHandler handles a new incoming connection. +// StreamHandler handles a new incoming network message. func (node *Node) StreamHandler(s p2p.Stream) { defer s.Close() @@ -46,7 +43,6 @@ func (node *Node) StreamHandler(s p2p.Stream) { node.log.Error("Read p2p data failed", "err", err, "node", node) return } - // TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p. node.MaybeBroadcastAsValidator(content) consensusObj := node.Consensus @@ -88,10 +84,10 @@ func (node *Node) StreamHandler(s p2p.Stream) { case proto.Consensus: msgPayload, _ := proto.GetConsensusMessagePayload(content) if consensusObj.IsLeader { - node.log.Info("NET: received message: Consensus/Leader") + node.log.Info("NET: Leader received message:", "messageCategory", msgCategory, "messageType", msgType) consensusObj.ProcessMessageLeader(msgPayload) } else { - node.log.Info("NET: received message: Consensus/Validator") + node.log.Info("NET: Validator received message:", "messageCategory", msgCategory, "messageType", msgType) consensusObj.ProcessMessageValidator(msgPayload) // TODO(minhdoan): add logic to check if the current blockchain is not sync with other consensus // we should switch to other state rather than DoingConsensus. @@ -153,11 +149,6 @@ func (node *Node) StreamHandler(s p2p.Stream) { default: node.log.Error("Unknown", "MsgCategory", msgCategory) } - - // Post processing after receiving messsages. - // if node.State == NodeJoinedShard || node.State == NodeReadyForConsensus { - // go node.DoSyncing() - // } } func (node *Node) transactionMessageHandler(msgPayload []byte) { @@ -199,15 +190,16 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) { // WaitForConsensusReady listen for the readiness signal from consensus and generate new block for consensus. func (node *Node) WaitForConsensusReady(readySignal chan struct{}) { node.log.Debug("Waiting for Consensus ready", "node", node) - time.Sleep(15 * time.Second) + time.Sleep(15 * time.Second) // Wait for other nodes to be ready (test-only) firstTime := true var newBlock *types.Block timeoutCount := 0 - for { // keep waiting for Consensus ready + for { + // keep waiting for Consensus ready select { case <-readySignal: - time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up. + time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up (test-only). case <-time.After(200 * time.Second): node.Consensus.ResetState() timeoutCount++ @@ -215,12 +207,14 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) { } for { - node.log.Debug("STARTING BLOCK") + node.log.Debug("Start creating new block") + // threshold and firstTime are for the test-only built-in smart contract tx. TODO: remove in production threshold := 1 if firstTime { threshold = 2 firstTime = false } + if len(node.pendingTransactions) >= threshold { // Normal tx block consensus selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock) @@ -250,7 +244,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) { // NOTE: For now, just send to the client (basically not broadcasting) func (node *Node) BroadcastNewBlock(newBlock *types.Block) { if node.ClientPeer != nil { - node.log.Debug("NET: SENDING NEW BLOCK TO CLIENT", "client", node.ClientPeer) + node.log.Debug("Sending new block to client", "client", node.ClientPeer) node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) } } @@ -267,7 +261,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) bool { // PostConsensusProcessing is called by consensus participants, after consensus is done, to: // 1. add the new block to blockchain -// 2. [leader] move cross shard tx and proof to the list where they wait to be sent to the client +// 2. [leader] send new block to the client func (node *Node) PostConsensusProcessing(newBlock *types.Block) { if node.Consensus.IsLeader { node.BroadcastNewBlock(newBlock) @@ -278,9 +272,9 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) { // AddNewBlock is usedd to add new block into the blockchain. func (node *Node) AddNewBlock(newBlock *types.Block) { - num, err := node.blockchain.InsertChain([]*types.Block{newBlock}) + blockNum, err := node.blockchain.InsertChain([]*types.Block{newBlock}) if err != nil { - node.log.Debug("Error adding to chain", "numBlocks", num, "Error", err) + node.log.Debug("Error adding new block to blockchain", "blockNum", blockNum, "Error", err) } }