From 45e0eeb728f7af187cca8cc1af1895e6784ef9ab Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Tue, 27 Nov 2018 22:12:11 -0800 Subject: [PATCH 1/2] add some more State and set up syncing in each node --- benchmark.go | 5 ++- node/node.go | 60 +++++++++++++++++++++++++++--------- node/node_handler.go | 11 ++++--- syncing/downloader/server.go | 4 +++ 4 files changed, 60 insertions(+), 20 deletions(-) diff --git a/benchmark.go b/benchmark.go index af17637fa..b1c43e396 100644 --- a/benchmark.go +++ b/benchmark.go @@ -230,5 +230,8 @@ func main() { } } - currentNode.StartServer(*port) + go currentNode.StartServer(*port) + go currentNode.StartHelpSyncing() + // Keep waiting. + <-make(chan struct{}) } diff --git a/node/node.go b/node/node.go index 224eb53c8..555f786b0 100644 --- a/node/node.go +++ b/node/node.go @@ -31,17 +31,26 @@ import ( downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto" ) -type NodeState byte +// State is a state of a node. +type State byte +// All constants except the NodeLeader below are for validators only. const ( - NodeInit NodeState = iota // Node just started, before contacting BeaconChain - NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard - NodeJoinedShard // Node joined Shard, ready for consensus - NodeOffline // Node is offline - NodeReadyForConsensus // Node is ready to do consensus - NodeDoingConsensus // Node is already doing consensus + NodeInit State = iota // Node just started, before contacting BeaconChain + NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard + NodeJoinedShard // Node joined Shard, ready for consensus + NodeOffline // Node is offline + NodeReadyForConsensus // Node is ready to do consensus + NodeDoingConsensus // Node is already doing consensus + NodeLeader // Node is the leader of some shard. ) +const ( + // TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus + TimeToSleepForSyncing = time.Second * 30 +) + +// NetworkNode is TODO(leo) type NetworkNode struct { SelfPeer p2p.Peer IDCPeer p2p.Peer @@ -71,7 +80,7 @@ type Node struct { SyncNode bool // TODO(minhdoan): Remove it later. chain *core.BlockChain // Account Model Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer - State NodeState // State of the Node + State State // State of the Node // Account Model pendingTransactionsAccount types.Transactions // TODO: replace with txPool @@ -150,6 +159,7 @@ func (node *Node) StartServer(port string) { node.listenOnPort(port) } +// SetLog sets log for Node. func (node *Node) SetLog() *Node { node.log = log.New() return node @@ -205,7 +215,7 @@ func (node *Node) countNumTransactionsInBlockchainAccount() int { return count } -//ConnectIdentityChain connects to identity chain +//ConnectBeaconChain connects to identity chain func (node *Node) ConnectBeaconChain() { Nnode := &NetworkNode{SelfPeer: node.SelfPeer, IDCPeer: node.IDCPeer} msg := node.SerializeNode(Nnode) @@ -298,12 +308,16 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { } // Logger node.log = log.New() - node.State = NodeInit + if consensus.IsLeader { + node.State = NodeLeader + } else { + node.State = NodeInit + } return &node } -// Add neighbors nodes +// AddPeers adds neighbors nodes func (node *Node) AddPeers(peers []p2p.Peer) int { count := 0 for _, p := range peers { @@ -337,10 +351,28 @@ func (node *Node) JoinShard(leader p2p.Peer) { } } -// StartDownloaderServer starts downloader server. -func (node *Node) StartDownloaderServer() { +// StartHelpSyncing keeps sleeping until it's doing consensus or it's a leader. +func (node *Node) StartHelpSyncing() { + for { + time.Sleep(TimeToSleepForSyncing) + // + if node.State == NodeDoingConsensus || node.State == NodeLeader { + node.InitSyncingServer() + node.StartHelpSyncing() + break + } + } +} + +// InitSyncingServer starts downloader server. +func (node *Node) InitSyncingServer() { node.downloaderServer = downloader.NewServer(node) - // node.downloaderServer.Start(node.) +} + +// StartSyncingServer starts syncing server. +func (node *Node) StartSyncingServer() { + // Handles returned grpcServer?? + node.downloaderServer.Start("localhost", downloader.DefaultDownloadPort) } // CalculateResponse implements DownloadInterface on Node object. diff --git a/node/node_handler.go b/node/node_handler.go index e5a447b8b..b6d9eece7 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -5,14 +5,15 @@ import ( "bytes" "encoding/gob" "fmt" - "github.com/ethereum/go-ethereum/rlp" - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/crypto/pki" "net" "os" "strconv" "time" + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/crypto/pki" + "github.com/harmony-one/harmony/blockchain" hmy_crypto "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/p2p" @@ -371,7 +372,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) { } } -// WaitForConsensusReady ... +// WaitForConsensusReadyAccount ... func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) { node.log.Debug("Waiting for Consensus ready", "node", node) @@ -465,7 +466,7 @@ func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool { return node.UtxoPool.VerifyTransactions(newBlock.Transactions) } -// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on +// VerifyNewBlockAccount is called by consensus participants to verify the block (account model) they are running consensus on func (node *Node) VerifyNewBlockAccount(newBlock *types.Block) bool { err := node.Chain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) if err != nil { diff --git a/syncing/downloader/server.go b/syncing/downloader/server.go index 753e6572e..0b11fac8a 100644 --- a/syncing/downloader/server.go +++ b/syncing/downloader/server.go @@ -11,6 +11,10 @@ import ( pb "github.com/harmony-one/harmony/syncing/downloader/proto" ) +const ( + DefaultDownloadPort = "8888" +) + // Server is the Server struct for downloader package. type Server struct { downloadInterface DownloadInterface From c2263ed53dfc5a32c0394e49ebf64985b50ebcb1 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Wed, 28 Nov 2018 11:45:44 -0800 Subject: [PATCH 2/2] change the listened address --- benchmark.go | 6 ++---- node/node.go | 19 ++++++------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/benchmark.go b/benchmark.go index b1c43e396..5b698c651 100644 --- a/benchmark.go +++ b/benchmark.go @@ -230,8 +230,6 @@ func main() { } } - go currentNode.StartServer(*port) - go currentNode.StartHelpSyncing() - // Keep waiting. - <-make(chan struct{}) + go currentNode.SupportSyncing() + currentNode.StartServer(*port) } diff --git a/node/node.go b/node/node.go index 555f786b0..f420ab9a3 100644 --- a/node/node.go +++ b/node/node.go @@ -50,7 +50,7 @@ const ( TimeToSleepForSyncing = time.Second * 30 ) -// NetworkNode is TODO(leo) +// NetworkNode ... type NetworkNode struct { SelfPeer p2p.Peer IDCPeer p2p.Peer @@ -351,17 +351,10 @@ func (node *Node) JoinShard(leader p2p.Peer) { } } -// StartHelpSyncing keeps sleeping until it's doing consensus or it's a leader. -func (node *Node) StartHelpSyncing() { - for { - time.Sleep(TimeToSleepForSyncing) - // - if node.State == NodeDoingConsensus || node.State == NodeLeader { - node.InitSyncingServer() - node.StartHelpSyncing() - break - } - } +// SupportSyncing keeps sleeping until it's doing consensus or it's a leader. +func (node *Node) SupportSyncing() { + node.InitSyncingServer() + node.StartSyncingServer() } // InitSyncingServer starts downloader server. @@ -372,7 +365,7 @@ func (node *Node) InitSyncingServer() { // StartSyncingServer starts syncing server. func (node *Node) StartSyncingServer() { // Handles returned grpcServer?? - node.downloaderServer.Start("localhost", downloader.DefaultDownloadPort) + node.downloaderServer.Start(node.SelfPeer.Ip, downloader.DefaultDownloadPort) } // CalculateResponse implements DownloadInterface on Node object.