@@ -55,10 +55,18 @@ const (
 	TxPoolLimit = 20000
 	// NumTryBroadCast is the number of times trying to broadcast
 	NumTryBroadCast = 3
-	// RxQueueSize is the number of messages to queue before tail-dropping.
-	RxQueueSize = 16384
-	// RxWorkers is the number of concurrent message handlers.
-	RxWorkers = 32
+	// ClientRxQueueSize is the number of client messages to queue before tail-dropping.
+	ClientRxQueueSize = 16384
+	// ShardRxQueueSize is the number of shard messages to queue before tail-dropping.
+	ShardRxQueueSize = 16384
+	// GlobalRxQueueSize is the number of global messages to queue before tail-dropping.
+	GlobalRxQueueSize = 16384
+	// ClientRxWorkers is the number of concurrent client message handlers.
+	ClientRxWorkers = 8
+	// ShardRxWorkers is the number of concurrent shard message handlers.
+	ShardRxWorkers = 32
+	// GlobalRxWorkers is the number of concurrent global message handlers.
+	GlobalRxWorkers = 32
 )

 func (state State) String() string {
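This hunk replaces the single RxQueueSize/RxWorkers pair with per-topic settings. For intuition about what "queue before tail-dropping" configures, here is a minimal sketch of a bounded, tail-dropping queue in the spirit of msgq.Queue; the type, method names, and error are illustrative assumptions, not the package's confirmed API.

```go
package msgq

import "errors"

// ErrRxQueueFull is a hypothetical sentinel for a full queue (assumed name).
var ErrRxQueueFull = errors.New("rx queue full")

// message pairs a payload with its sender; the real sender type is likely
// a libp2p peer ID, simplified here to a string.
type message struct {
	content []byte
	sender  string
}

// Queue is a bounded FIFO backed by a buffered channel.
type Queue struct {
	ch chan message
}

// New creates a queue holding up to size messages, e.g. ShardRxQueueSize.
func New(size int) *Queue {
	return &Queue{ch: make(chan message, size)}
}

// AddMessage enqueues without blocking; when the buffer is full, the
// incoming message is rejected (tail-drop) and an error is returned.
func (q *Queue) AddMessage(content []byte, sender string) error {
	select {
	case q.ch <- message{content: content, sender: sender}:
		return nil
	default:
		return ErrRxQueueFull
	}
}
```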
@@ -152,7 +160,9 @@ type Node struct {
 	host p2p.Host

 	// Incoming messages to process.
-	rxQueue *msgq.Queue
+	clientRxQueue *msgq.Queue
+	shardRxQueue  *msgq.Queue
+	globalRxQueue *msgq.Queue

 	// Service manager.
 	serviceManager *service.Manager
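Splitting the single rxQueue into three per-topic queues means a flood on one topic (say, client traffic) can only fill its own buffer and stall its own workers, not the shard or global pipelines. The consumer side could look roughly like this, continuing the sketch above; the MessageHandler interface is an assumption, inferred from the diff passing *Node to HandleMessages.

```go
// MessageHandler is an assumed interface; in the diff, *Node is the
// argument to HandleMessages, so Node presumably implements something
// shaped like this.
type MessageHandler interface {
	HandleMessage(content []byte, sender string)
}

// HandleMessages drains the queue and dispatches each message; starting N
// goroutines running this loop yields N concurrent workers per topic.
func (q *Queue) HandleMessages(h MessageHandler) {
	for m := range q.ch {
		h.HandleMessage(m.content, m.sender)
	}
}
```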
@@ -407,22 +417,29 @@ func (node *Node) getTransactionsForNewBlock(
 	return selected, selectedStaking
 }

-// StartServer starts a server and process the requests by a handler.
-func (node *Node) StartServer() {
-	for i := 0; i < RxWorkers; i++ {
-		go node.rxQueue.HandleMessages(node)
-	}
-
+func (node *Node) startRxPipeline(
+	receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int,
+) {
+	// consumers
+	for i := 0; i < numWorkers; i++ {
+		go queue.HandleMessages(node)
+	}
+	// provider
+	go node.receiveGroupMessage(receiver, queue)
+}
+
+// StartServer starts a server and process the requests by a handler.
+func (node *Node) StartServer() {
 	// start the goroutine to receive client message
 	// client messages are sent by clients, like txgen, wallet
-	go node.receiveGroupMessage(node.clientReceiver)
+	node.startRxPipeline(node.clientReceiver, node.clientRxQueue, ClientRxWorkers)

 	// start the goroutine to receive group message
-	go node.receiveGroupMessage(node.shardGroupReceiver)
+	node.startRxPipeline(node.shardGroupReceiver, node.shardRxQueue, ShardRxWorkers)

 	// start the goroutine to receive global message, used for cross-shard TX
 	// FIXME (leo): we use beacon client topic as the global topic for now
-	go node.receiveGroupMessage(node.globalGroupReceiver)
+	node.startRxPipeline(node.globalGroupReceiver, node.globalRxQueue, GlobalRxWorkers)

 	select {}
 }
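startRxPipeline wires one producer to numWorkers consumers per topic. The producer half, receiveGroupMessage, is not shown in this diff; a plausible shape, continuing the earlier sketch and with the receiver reduced to an assumed local stand-in, is:

```go
import "context"

// groupReceiver stands in for p2p.GroupReceiver; the Receive signature is
// an assumption for illustration, not the library's confirmed API.
type groupReceiver interface {
	Receive(ctx context.Context) (content []byte, sender string, err error)
}

// pumpGroupMessages is a hypothetical stand-in for receiveGroupMessage:
// block on the network receiver, push into the queue from the earlier
// sketch, and rely on AddMessage's tail-drop when consumers fall behind.
func pumpGroupMessages(ctx context.Context, receiver groupReceiver, q *Queue) {
	for {
		content, sender, err := receiver.Receive(ctx)
		if err != nil {
			return // real code would log and decide whether to retry
		}
		_ = q.AddMessage(content, sender) // queue full => message dropped
	}
}
```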
@@ -525,7 +542,9 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
 		Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)).
 		Msg("Genesis block hash")

-	node.rxQueue = msgq.New(RxQueueSize)
+	node.clientRxQueue = msgq.New(ClientRxQueueSize)
+	node.shardRxQueue = msgq.New(ShardRxQueueSize)
+	node.globalRxQueue = msgq.New(GlobalRxQueueSize)

 	// Setup initial state of syncing.
 	node.peerRegistrationRecord = make(map[string]*syncConfig)
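Constructing the queues in New, before StartServer spawns any producer or consumer goroutine, guarantees nothing ever dereferences a nil queue. A toy exercise of the sketched queue's tail-drop behavior, under the same assumptions as the sketches above:

```go
package main

import "fmt"

func main() {
	q := New(2) // the sketched queue, not the real msgq package
	fmt.Println(q.AddMessage([]byte("a"), "p1")) // <nil>
	fmt.Println(q.AddMessage([]byte("b"), "p1")) // <nil>
	fmt.Println(q.AddMessage([]byte("c"), "p1")) // rx queue full: tail-dropped
}
```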