From d6b429dc9facf3f5e2e04aac2f9fc2312511c1a9 Mon Sep 17 00:00:00 2001 From: Eugene Kim Date: Tue, 8 Oct 2019 13:49:00 -0700 Subject: [PATCH] Process messages after node is fully initialized This obviates polling logic for receiver being nil. --- node/node.go | 29 +++++++++++++++-------------- node/node_handler.go | 13 ------------- 2 files changed, 15 insertions(+), 27 deletions(-) diff --git a/node/node.go b/node/node.go index 90d86dbbc..b2666fc47 100644 --- a/node/node.go +++ b/node/node.go @@ -413,6 +413,21 @@ func (node *Node) getTransactionsForNewBlock( // StartServer starts a server and process the requests by a handler. func (node *Node) StartServer() { + for i := 0; i < RxWorkers; i++ { + go node.handleIncomingMessages() + } + + // start the goroutine to receive client message + // client messages are sent by clients, like txgen, wallet + go node.ReceiveClientGroupMessage() + + // start the goroutine to receive group message + go node.ReceiveGroupMessage() + + // start the goroutine to receive global message, used for cross-shard TX + // FIXME (leo): we use beacon client topic as the global topic for now + go node.ReceiveGlobalMessage() + select {} } @@ -528,20 +543,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc Msg("Genesis block hash") node.incomingMessages = make(chan incomingMessage, RxQueueSize) - for i := 0; i < RxWorkers; i++ { - go node.handleIncomingMessages() - } - - // start the goroutine to receive client message - // client messages are sent by clients, like txgen, wallet - go node.ReceiveClientGroupMessage() - - // start the goroutine to receive group message - go node.ReceiveGroupMessage() - - // start the goroutine to receive global message, used for cross-shard TX - // FIXME (leo): we use beacon client topic as the global topic for now - go node.ReceiveGlobalMessage() // Setup initial state of syncing. node.peerRegistrationRecord = make(map[string]*syncConfig) diff --git a/node/node_handler.go b/node/node_handler.go index 79834019a..956c6a045 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -41,10 +41,6 @@ func (node *Node) ReceiveGlobalMessage() { ctx := context.Background() for { // TODO ek – infinite loop; add shutdown/cleanup logic - if node.globalGroupReceiver == nil { - time.Sleep(100 * time.Millisecond) - continue - } msg, sender, err := node.globalGroupReceiver.Receive(ctx) if err != nil { utils.Logger().Warn().Err(err). @@ -65,10 +61,6 @@ func (node *Node) ReceiveGroupMessage() { ctx := context.Background() // TODO ek – infinite loop; add shutdown/cleanup logic for { - if node.shardGroupReceiver == nil { - time.Sleep(100 * time.Millisecond) - continue - } msg, sender, err := node.shardGroupReceiver.Receive(ctx) if err != nil { utils.Logger().Warn().Err(err). @@ -89,11 +81,6 @@ func (node *Node) ReceiveClientGroupMessage() { ctx := context.Background() // TODO ek – infinite loop; add shutdown/cleanup logic for { - if node.clientReceiver == nil { - // check less frequent on client messages - time.Sleep(100 * time.Millisecond) - continue - } msg, sender, err := node.clientReceiver.Receive(ctx) if err != nil { utils.Logger().Warn().Err(err).