diff --git a/msgq/msgq.go b/msgq/msgq.go index e4c4f6d4b..a80ad1cf7 100644 --- a/msgq/msgq.go +++ b/msgq/msgq.go @@ -12,6 +12,12 @@ type message struct { sender peer.ID } +// MessageAdder enqueues a received message for processing. It returns without +// blocking, and may return a queue overrun error. +type MessageAdder interface { + AddMessage(content []byte, sender peer.ID) error +} + // MessageHandler is a message handler. type MessageHandler interface { HandleMessage(content []byte, sender peer.ID) diff --git a/node/node.go b/node/node.go index 3ed3bbaa1..d36170d61 100644 --- a/node/node.go +++ b/node/node.go @@ -55,10 +55,18 @@ const ( TxPoolLimit = 20000 // NumTryBroadCast is the number of times trying to broadcast NumTryBroadCast = 3 - // RxQueueSize is the number of messages to queue before tail-dropping. - RxQueueSize = 16384 - // RxWorkers is the number of concurrent message handlers. - RxWorkers = 32 + // ClientRxQueueSize is the number of client messages to queue before tail-dropping. + ClientRxQueueSize = 16384 + // ShardRxQueueSize is the number of shard messages to queue before tail-dropping. + ShardRxQueueSize = 16384 + // GlobalRxQueueSize is the number of global messages to queue before tail-dropping. + GlobalRxQueueSize = 16384 + // ClientRxWorkers is the number of concurrent client message handlers. + ClientRxWorkers = 8 + // ShardRxWorkers is the number of concurrent shard message handlers. + ShardRxWorkers = 32 + // GlobalRxWorkers is the number of concurrent global message handlers. + GlobalRxWorkers = 32 ) func (state State) String() string { @@ -152,7 +160,9 @@ type Node struct { host p2p.Host // Incoming messages to process. - rxQueue *msgq.Queue + clientRxQueue *msgq.Queue + shardRxQueue *msgq.Queue + globalRxQueue *msgq.Queue // Service manager. serviceManager *service.Manager @@ -407,22 +417,29 @@ func (node *Node) getTransactionsForNewBlock( return selected, selectedStaking } -// StartServer starts a server and process the requests by a handler. -func (node *Node) StartServer() { - for i := 0; i < RxWorkers; i++ { - go node.rxQueue.HandleMessages(node) +func (node *Node) startRxPipeline( + receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int, +) { + // consumers + for i := 0; i < numWorkers; i++ { + go queue.HandleMessages(node) } + // provider + go node.receiveGroupMessage(receiver, queue) +} +// StartServer starts a server and process the requests by a handler. +func (node *Node) StartServer() { // start the goroutine to receive client message // client messages are sent by clients, like txgen, wallet - go node.receiveGroupMessage(node.clientReceiver) + node.startRxPipeline(node.clientReceiver, node.clientRxQueue, ClientRxWorkers) // start the goroutine to receive group message - go node.receiveGroupMessage(node.shardGroupReceiver) + node.startRxPipeline(node.shardGroupReceiver, node.shardRxQueue, ShardRxWorkers) // start the goroutine to receive global message, used for cross-shard TX // FIXME (leo): we use beacon client topic as the global topic for now - go node.receiveGroupMessage(node.globalGroupReceiver) + node.startRxPipeline(node.globalGroupReceiver, node.globalRxQueue, GlobalRxWorkers) select {} } @@ -525,7 +542,9 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)). Msg("Genesis block hash") - node.rxQueue = msgq.New(RxQueueSize) + node.clientRxQueue = msgq.New(ClientRxQueueSize) + node.shardRxQueue = msgq.New(ShardRxQueueSize) + node.globalRxQueue = msgq.New(GlobalRxQueueSize) // Setup initial state of syncing. node.peerRegistrationRecord = make(map[string]*syncConfig) diff --git a/node/node_handler.go b/node/node_handler.go index 447d3e631..b7663a2ea 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -26,6 +26,7 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/msgq" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/shard" @@ -38,7 +39,9 @@ const ( ) // receiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages -func (node *Node) receiveGroupMessage(receiver p2p.GroupReceiver) { +func (node *Node) receiveGroupMessage( + receiver p2p.GroupReceiver, rxQueue msgq.MessageAdder, +) { ctx := context.Background() // TODO ek – infinite loop; add shutdown/cleanup logic for { @@ -53,7 +56,7 @@ func (node *Node) receiveGroupMessage(receiver p2p.GroupReceiver) { } //utils.Logger().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender) // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size - if err := node.rxQueue.AddMessage(msg[5:], sender); err != nil { + if err := rxQueue.AddMessage(msg[5:], sender); err != nil { utils.Logger().Warn().Err(err). Str("sender", sender.Pretty()). Msg("cannot enqueue incoming message for processing")