Merge pull request #191 from harmony-one/rj_branch

Cleanup node.go
pull/183/head
Rongjian Lan 6 years ago committed by GitHub
commit 367589232d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      node/address_faker.go
  2. 45
      node/node.go
  3. 36
      node/node_handler.go
  4. 10
      node/worker/worker.go
  5. 2
      test/deploy.sh

@ -12,7 +12,7 @@ import (
// CreateGenesisAllocWithTestingAddresses create the genesis block allocation that contains deterministically // CreateGenesisAllocWithTestingAddresses create the genesis block allocation that contains deterministically
// generated testing addressess with tokens. // generated testing addressess with tokens.
// TODO: Consider to remove it later when moving to production.a // TODO: Remove it later when moving to production.
func (node *Node) CreateGenesisAllocWithTestingAddresses(numAddress int) core.GenesisAlloc { func (node *Node) CreateGenesisAllocWithTestingAddresses(numAddress int) core.GenesisAlloc {
rand.Seed(0) rand.Seed(0)
len := 1000000 len := 1000000

@ -91,22 +91,23 @@ type NetworkNode struct {
IDCPeer p2p.Peer IDCPeer p2p.Peer
} }
// Node represents a program (machine) participating in the network // Node represents a protocol-participating node in the network
// TODO(minhdoan, rj): consider using BlockChannel *chan blockchain.Block for efficiency.
type Node struct { type Node struct {
Consensus *bft.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits) Consensus *bft.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits)
BlockChannel chan *types.Block // The channel to receive new blocks from Node BlockChannel chan *types.Block // The channel to receive new blocks from Node
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
transactionInConsensus []*types.Transaction // The transactions selected into the new block and under Consensus process transactionInConsensus []*types.Transaction // The transactions selected into the new block and under Consensus process
blockchain *core.BlockChain // The blockchain for the shard where this node belongs
db *hdb.LDBDatabase // LevelDB to store blockchain.
log log.Logger // Log utility
pendingTxMutex sync.Mutex pendingTxMutex sync.Mutex
crossTxToReturnMutex sync.Mutex
ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept blockchain *core.BlockChain // The blockchain for the shard where this node belongs
Client *client.Client // The presence of a client object means this node will also act as a client db *hdb.LDBDatabase // LevelDB to store blockchain.
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
IDCPeer p2p.Peer log log.Logger // Log utility
ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept
Client *client.Client // The presence of a client object means this node will also act as a client
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
IDCPeer p2p.Peer
Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
State State // State of the Node State State // State of the Node
@ -122,10 +123,6 @@ type Node struct {
stateSync *syncing.StateSync stateSync *syncing.StateSync
syncingState uint32 syncingState uint32
// Test only
TestBankKeys []*ecdsa.PrivateKey
ContractKeys []*ecdsa.PrivateKey
ContractAddresses []common.Address
// The p2p host used to send/receive p2p messages // The p2p host used to send/receive p2p messages
host host.Host host host.Host
@ -134,6 +131,11 @@ type Node struct {
// Signal channel for lost validators // Signal channel for lost validators
OfflinePeers chan p2p.Peer OfflinePeers chan p2p.Peer
// For test only
TestBankKeys []*ecdsa.PrivateKey
ContractKeys []*ecdsa.PrivateKey
ContractAddresses []common.Address
} }
// Blockchain returns the blockchain from node // Blockchain returns the blockchain from node
@ -163,21 +165,11 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions {
return selected return selected
} }
// StartServer starts a server and process the request by a handler. // StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() { func (node *Node) StartServer() {
node.host.BindHandlerAndServe(node.StreamHandler) node.host.BindHandlerAndServe(node.StreamHandler)
} }
// SetLog sets log for Node.
func (node *Node) SetLog() *Node {
node.log = log.New()
return node
}
func (node *Node) String() string {
return node.Consensus.String()
}
// Count the total number of transactions in the blockchain // Count the total number of transactions in the blockchain
// Currently used for stats reporting purpose // Currently used for stats reporting purpose
func (node *Node) countNumTransactionsInBlockchain() int { func (node *Node) countNumTransactionsInBlockchain() int {
@ -237,7 +229,7 @@ func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
// Initialize genesis block and blockchain // Initialize genesis block and blockchain
genesisAlloc := node.CreateGenesisAllocWithTestingAddresses(100) genesisAlloc := node.CreateGenesisAllocWithTestingAddresses(100)
contractKey, _ := ecdsa.GenerateKey(crypto.S256(), strings.NewReader("Test contract key string blablablablablablablablablablablablablablablablabl")) contractKey, _ := ecdsa.GenerateKey(crypto.S256(), strings.NewReader("Test contract key string stream that is fixed so that generated test key are deterministic every time"))
contractAddress := crypto.PubkeyToAddress(contractKey.PublicKey) contractAddress := crypto.PubkeyToAddress(contractKey.PublicKey)
contractFunds := big.NewInt(9000000) contractFunds := big.NewInt(9000000)
contractFunds = contractFunds.Mul(contractFunds, big.NewInt(params.Ether)) contractFunds = contractFunds.Mul(contractFunds, big.NewInt(params.Ether))
@ -332,7 +324,6 @@ func GetSyncingPort(nodePort string) string {
// GetSyncingPeers returns list of peers. // GetSyncingPeers returns list of peers.
// Right now, the list length is only 1 for testing. // Right now, the list length is only 1 for testing.
// TODO(mihdoan): fix it later.
func (node *Node) GetSyncingPeers() []p2p.Peer { func (node *Node) GetSyncingPeers() []p2p.Peer {
res := []p2p.Peer{} res := []p2p.Peer{}
node.Neighbors.Range(func(k, v interface{}) bool { node.Neighbors.Range(func(k, v interface{}) bool {

@ -21,22 +21,19 @@ import (
) )
const ( const (
// MinNumberOfTransactionsPerBlock is the min number of transaction per a block.
MinNumberOfTransactionsPerBlock = 6000
// MaxNumberOfTransactionsPerBlock is the max number of transaction per a block. // MaxNumberOfTransactionsPerBlock is the max number of transaction per a block.
MaxNumberOfTransactionsPerBlock = 8000 MaxNumberOfTransactionsPerBlock = 8000
// NumBlocksBeforeStateBlock is the number of blocks allowed before generating state block
NumBlocksBeforeStateBlock = 1000
) )
// MaybeBroadcastAsValidator returns if the node is a validator node. // MaybeBroadcastAsValidator returns if the node is a validator node.
func (node *Node) MaybeBroadcastAsValidator(content []byte) { func (node *Node) MaybeBroadcastAsValidator(content []byte) {
// TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p.
if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID <= host.MaxBroadCast { if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID <= host.MaxBroadCast {
go host.BroadcastMessageFromValidator(node.host, node.SelfPeer, node.Consensus.GetValidatorPeers(), content) go host.BroadcastMessageFromValidator(node.host, node.SelfPeer, node.Consensus.GetValidatorPeers(), content)
} }
} }
// StreamHandler handles a new incoming connection. // StreamHandler handles a new incoming network message.
func (node *Node) StreamHandler(s p2p.Stream) { func (node *Node) StreamHandler(s p2p.Stream) {
defer s.Close() defer s.Close()
@ -47,7 +44,6 @@ func (node *Node) StreamHandler(s p2p.Stream) {
node.log.Error("Read p2p data failed", "err", err, "node", node) node.log.Error("Read p2p data failed", "err", err, "node", node)
return return
} }
// TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p.
node.MaybeBroadcastAsValidator(content) node.MaybeBroadcastAsValidator(content)
consensusObj := node.Consensus consensusObj := node.Consensus
@ -89,10 +85,10 @@ func (node *Node) StreamHandler(s p2p.Stream) {
case proto.Consensus: case proto.Consensus:
msgPayload, _ := proto.GetConsensusMessagePayload(content) msgPayload, _ := proto.GetConsensusMessagePayload(content)
if consensusObj.IsLeader { if consensusObj.IsLeader {
node.log.Info("NET: received message: Consensus/Leader") node.log.Info("NET: Leader received message:", "messageCategory", msgCategory, "messageType", msgType)
consensusObj.ProcessMessageLeader(msgPayload) consensusObj.ProcessMessageLeader(msgPayload)
} else { } else {
node.log.Info("NET: received message: Consensus/Validator") node.log.Info("NET: Validator received message:", "messageCategory", msgCategory, "messageType", msgType)
consensusObj.ProcessMessageValidator(msgPayload) consensusObj.ProcessMessageValidator(msgPayload)
// TODO(minhdoan): add logic to check if the current blockchain is not sync with other consensus // TODO(minhdoan): add logic to check if the current blockchain is not sync with other consensus
// we should switch to other state rather than DoingConsensus. // we should switch to other state rather than DoingConsensus.
@ -154,11 +150,6 @@ func (node *Node) StreamHandler(s p2p.Stream) {
default: default:
node.log.Error("Unknown", "MsgCategory", msgCategory) node.log.Error("Unknown", "MsgCategory", msgCategory)
} }
// Post processing after receiving messsages.
// if node.State == NodeJoinedShard || node.State == NodeReadyForConsensus {
// go node.DoSyncing()
// }
} }
func (node *Node) transactionMessageHandler(msgPayload []byte) { func (node *Node) transactionMessageHandler(msgPayload []byte) {
@ -200,15 +191,16 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
// WaitForConsensusReady listen for the readiness signal from consensus and generate new block for consensus. // WaitForConsensusReady listen for the readiness signal from consensus and generate new block for consensus.
func (node *Node) WaitForConsensusReady(readySignal chan struct{}) { func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
node.log.Debug("Waiting for Consensus ready", "node", node) node.log.Debug("Waiting for Consensus ready", "node", node)
time.Sleep(15 * time.Second) time.Sleep(15 * time.Second) // Wait for other nodes to be ready (test-only)
firstTime := true firstTime := true
var newBlock *types.Block var newBlock *types.Block
timeoutCount := 0 timeoutCount := 0
for { // keep waiting for Consensus ready for {
// keep waiting for Consensus ready
select { select {
case <-readySignal: case <-readySignal:
time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up. time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up (test-only).
case <-time.After(200 * time.Second): case <-time.After(200 * time.Second):
node.Consensus.ResetState() node.Consensus.ResetState()
timeoutCount++ timeoutCount++
@ -216,12 +208,14 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
} }
for { for {
node.log.Debug("STARTING BLOCK") node.log.Debug("Start creating new block")
// threshold and firstTime are for the test-only built-in smart contract tx. TODO: remove in production
threshold := 1 threshold := 1
if firstTime { if firstTime {
threshold = 2 threshold = 2
firstTime = false firstTime = false
} }
if len(node.pendingTransactions) >= threshold { if len(node.pendingTransactions) >= threshold {
// Normal tx block consensus // Normal tx block consensus
selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock) selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock)
@ -251,7 +245,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
// NOTE: For now, just send to the client (basically not broadcasting) // NOTE: For now, just send to the client (basically not broadcasting)
func (node *Node) BroadcastNewBlock(newBlock *types.Block) { func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
if node.ClientPeer != nil { if node.ClientPeer != nil {
node.log.Debug("NET: SENDING NEW BLOCK TO CLIENT", "client", node.ClientPeer) node.log.Debug("Sending new block to client", "client", node.ClientPeer)
node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
} }
} }
@ -268,7 +262,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) bool {
// PostConsensusProcessing is called by consensus participants, after consensus is done, to: // PostConsensusProcessing is called by consensus participants, after consensus is done, to:
// 1. add the new block to blockchain // 1. add the new block to blockchain
// 2. [leader] move cross shard tx and proof to the list where they wait to be sent to the client // 2. [leader] send new block to the client
func (node *Node) PostConsensusProcessing(newBlock *types.Block) { func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
if node.Consensus.IsLeader { if node.Consensus.IsLeader {
node.BroadcastNewBlock(newBlock) node.BroadcastNewBlock(newBlock)
@ -279,9 +273,9 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
// AddNewBlock is usedd to add new block into the blockchain. // AddNewBlock is usedd to add new block into the blockchain.
func (node *Node) AddNewBlock(newBlock *types.Block) { func (node *Node) AddNewBlock(newBlock *types.Block) {
num, err := node.blockchain.InsertChain([]*types.Block{newBlock}) blockNum, err := node.blockchain.InsertChain([]*types.Block{newBlock})
if err != nil { if err != nil {
node.log.Debug("Error adding to chain", "numBlocks", num, "Error", err) node.log.Debug("Error adding new block to blockchain", "blockNum", blockNum, "Error", err)
} }
} }

@ -105,7 +105,7 @@ func (w *Worker) CommitTransactions(txs types.Transactions) error {
return nil return nil
} }
// UpdateCurrent updates ... // UpdateCurrent updates the current environment with the current state and header.
func (w *Worker) UpdateCurrent() error { func (w *Worker) UpdateCurrent() error {
parent := w.chain.CurrentBlock() parent := w.chain.CurrentBlock()
num := parent.Number() num := parent.Number()
@ -135,17 +135,17 @@ func (w *Worker) makeCurrent(parent *types.Block, header *types.Header) error {
return nil return nil
} }
// GetCurrentState ... // GetCurrentState gets the current state.
func (w *Worker) GetCurrentState() *state.StateDB { func (w *Worker) GetCurrentState() *state.StateDB {
return w.current.state return w.current.state
} }
// GetCurrentReceipts ... // GetCurrentReceipts get the receipts generated starting from the last state.
func (w *Worker) GetCurrentReceipts() []*types.Receipt { func (w *Worker) GetCurrentReceipts() []*types.Receipt {
return w.current.receipts return w.current.receipts
} }
// Commit ... // Commit generate a new block for the new txs.
func (w *Worker) Commit() (*types.Block, error) { func (w *Worker) Commit() (*types.Block, error) {
s := w.current.state.Copy() s := w.current.state.Copy()
block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts) block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts)
@ -155,7 +155,7 @@ func (w *Worker) Commit() (*types.Block, error) {
return block, nil return block, nil
} }
// New ... // New create a new worker object.
func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine, coinbase common.Address, shardID uint32) *Worker { func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine, coinbase common.Address, shardID uint32) *Worker {
worker := &Worker{ worker := &Worker{
config: config, config: config,

@ -89,7 +89,7 @@ pushd $ROOT
echo "compiling ..." echo "compiling ..."
go build -o bin/benchmark go build -o bin/benchmark
go build -o bin/txgen client/txgen/main.go go build -o bin/txgen client/txgen/main.go
go build -o bin/beacon beaconchain/main/main.go go build -o bin/beacon cmd/beaconchain/main.go
popd popd
# Create a tmp folder for logs # Create a tmp folder for logs

Loading…
Cancel
Save