Merge pull request #1725 from harmony-ek/use_per_group_queues

Use per-group-receiver message queue
pull/1731/head
Eugene Kim 5 years ago committed by GitHub
commit 8b5a3235fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      msgq/msgq.go
  2. 45
      node/node.go
  3. 7
      node/node_handler.go

@ -12,6 +12,12 @@ type message struct {
sender peer.ID sender peer.ID
} }
// MessageAdder enqueues a received message for processing. It returns without
// blocking, and may return a queue overrun error.
type MessageAdder interface {
AddMessage(content []byte, sender peer.ID) error
}
// MessageHandler is a message handler. // MessageHandler is a message handler.
type MessageHandler interface { type MessageHandler interface {
HandleMessage(content []byte, sender peer.ID) HandleMessage(content []byte, sender peer.ID)

@ -55,10 +55,18 @@ const (
TxPoolLimit = 20000 TxPoolLimit = 20000
// NumTryBroadCast is the number of times trying to broadcast // NumTryBroadCast is the number of times trying to broadcast
NumTryBroadCast = 3 NumTryBroadCast = 3
// RxQueueSize is the number of messages to queue before tail-dropping. // ClientRxQueueSize is the number of client messages to queue before tail-dropping.
RxQueueSize = 16384 ClientRxQueueSize = 16384
// RxWorkers is the number of concurrent message handlers. // ShardRxQueueSize is the number of shard messages to queue before tail-dropping.
RxWorkers = 32 ShardRxQueueSize = 16384
// GlobalRxQueueSize is the number of global messages to queue before tail-dropping.
GlobalRxQueueSize = 16384
// ClientRxWorkers is the number of concurrent client message handlers.
ClientRxWorkers = 8
// ShardRxWorkers is the number of concurrent shard message handlers.
ShardRxWorkers = 32
// GlobalRxWorkers is the number of concurrent global message handlers.
GlobalRxWorkers = 32
) )
func (state State) String() string { func (state State) String() string {
@ -152,7 +160,9 @@ type Node struct {
host p2p.Host host p2p.Host
// Incoming messages to process. // Incoming messages to process.
rxQueue *msgq.Queue clientRxQueue *msgq.Queue
shardRxQueue *msgq.Queue
globalRxQueue *msgq.Queue
// Service manager. // Service manager.
serviceManager *service.Manager serviceManager *service.Manager
@ -407,22 +417,29 @@ func (node *Node) getTransactionsForNewBlock(
return selected, selectedStaking return selected, selectedStaking
} }
// StartServer starts a server and process the requests by a handler. func (node *Node) startRxPipeline(
func (node *Node) StartServer() { receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int,
for i := 0; i < RxWorkers; i++ { ) {
go node.rxQueue.HandleMessages(node) // consumers
for i := 0; i < numWorkers; i++ {
go queue.HandleMessages(node)
}
// provider
go node.receiveGroupMessage(receiver, queue)
} }
// StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() {
// start the goroutine to receive client message // start the goroutine to receive client message
// client messages are sent by clients, like txgen, wallet // client messages are sent by clients, like txgen, wallet
go node.receiveGroupMessage(node.clientReceiver) node.startRxPipeline(node.clientReceiver, node.clientRxQueue, ClientRxWorkers)
// start the goroutine to receive group message // start the goroutine to receive group message
go node.receiveGroupMessage(node.shardGroupReceiver) node.startRxPipeline(node.shardGroupReceiver, node.shardRxQueue, ShardRxWorkers)
// start the goroutine to receive global message, used for cross-shard TX // start the goroutine to receive global message, used for cross-shard TX
// FIXME (leo): we use beacon client topic as the global topic for now // FIXME (leo): we use beacon client topic as the global topic for now
go node.receiveGroupMessage(node.globalGroupReceiver) node.startRxPipeline(node.globalGroupReceiver, node.globalRxQueue, GlobalRxWorkers)
select {} select {}
} }
@ -525,7 +542,9 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)). Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)).
Msg("Genesis block hash") Msg("Genesis block hash")
node.rxQueue = msgq.New(RxQueueSize) node.clientRxQueue = msgq.New(ClientRxQueueSize)
node.shardRxQueue = msgq.New(ShardRxQueueSize)
node.globalRxQueue = msgq.New(GlobalRxQueueSize)
// Setup initial state of syncing. // Setup initial state of syncing.
node.peerRegistrationRecord = make(map[string]*syncConfig) node.peerRegistrationRecord = make(map[string]*syncConfig)

@ -26,6 +26,7 @@ import (
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/msgq"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/shard"
@ -38,7 +39,9 @@ const (
) )
// receiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages // receiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages
func (node *Node) receiveGroupMessage(receiver p2p.GroupReceiver) { func (node *Node) receiveGroupMessage(
receiver p2p.GroupReceiver, rxQueue msgq.MessageAdder,
) {
ctx := context.Background() ctx := context.Background()
// TODO ek – infinite loop; add shutdown/cleanup logic // TODO ek – infinite loop; add shutdown/cleanup logic
for { for {
@ -53,7 +56,7 @@ func (node *Node) receiveGroupMessage(receiver p2p.GroupReceiver) {
} }
//utils.Logger().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender) //utils.Logger().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender)
// skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size
if err := node.rxQueue.AddMessage(msg[5:], sender); err != nil { if err := rxQueue.AddMessage(msg[5:], sender); err != nil {
utils.Logger().Warn().Err(err). utils.Logger().Warn().Err(err).
Str("sender", sender.Pretty()). Str("sender", sender.Pretty()).
Msg("cannot enqueue incoming message for processing") Msg("cannot enqueue incoming message for processing")

Loading…
Cancel
Save