Process messages after node is fully initialized

This obviates polling logic for receiver being nil.
pull/1710/head
Eugene Kim 5 years ago
parent 2ddff3c3d4
commit d6b429dc9f
  1. 29
      node/node.go
  2. 13
      node/node_handler.go

@ -413,6 +413,21 @@ func (node *Node) getTransactionsForNewBlock(
// StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() {
for i := 0; i < RxWorkers; i++ {
go node.handleIncomingMessages()
}
// start the goroutine to receive client message
// client messages are sent by clients, like txgen, wallet
go node.ReceiveClientGroupMessage()
// start the goroutine to receive group message
go node.ReceiveGroupMessage()
// start the goroutine to receive global message, used for cross-shard TX
// FIXME (leo): we use beacon client topic as the global topic for now
go node.ReceiveGlobalMessage()
select {}
}
@ -528,20 +543,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
Msg("Genesis block hash")
node.incomingMessages = make(chan incomingMessage, RxQueueSize)
for i := 0; i < RxWorkers; i++ {
go node.handleIncomingMessages()
}
// start the goroutine to receive client message
// client messages are sent by clients, like txgen, wallet
go node.ReceiveClientGroupMessage()
// start the goroutine to receive group message
go node.ReceiveGroupMessage()
// start the goroutine to receive global message, used for cross-shard TX
// FIXME (leo): we use beacon client topic as the global topic for now
go node.ReceiveGlobalMessage()
// Setup initial state of syncing.
node.peerRegistrationRecord = make(map[string]*syncConfig)

@ -41,10 +41,6 @@ func (node *Node) ReceiveGlobalMessage() {
ctx := context.Background()
for {
// TODO ek – infinite loop; add shutdown/cleanup logic
if node.globalGroupReceiver == nil {
time.Sleep(100 * time.Millisecond)
continue
}
msg, sender, err := node.globalGroupReceiver.Receive(ctx)
if err != nil {
utils.Logger().Warn().Err(err).
@ -65,10 +61,6 @@ func (node *Node) ReceiveGroupMessage() {
ctx := context.Background()
// TODO ek – infinite loop; add shutdown/cleanup logic
for {
if node.shardGroupReceiver == nil {
time.Sleep(100 * time.Millisecond)
continue
}
msg, sender, err := node.shardGroupReceiver.Receive(ctx)
if err != nil {
utils.Logger().Warn().Err(err).
@ -89,11 +81,6 @@ func (node *Node) ReceiveClientGroupMessage() {
ctx := context.Background()
// TODO ek – infinite loop; add shutdown/cleanup logic
for {
if node.clientReceiver == nil {
// check less frequent on client messages
time.Sleep(100 * time.Millisecond)
continue
}
msg, sender, err := node.clientReceiver.Receive(ctx)
if err != nil {
utils.Logger().Warn().Err(err).

Loading…
Cancel
Save