Factor message queue and friends into ./msgq

pull/1710/head
Eugene Kim 5 years ago
parent 4e479e253a
commit 5c6fb65e6f
  1. 51
      msgq/msgq.go
  2. 13
      node/node.go
  3. 37
      node/node_handler.go

@ -0,0 +1,51 @@
// Package msgq implements a simple, finite-sized message queue. It can be used
// as a building block for a message processor pool.
package msgq
import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
)
type message struct {
content []byte
sender peer.ID
}
// MessageHandler is a message handler.
type MessageHandler interface {
HandleMessage(content []byte, sender peer.ID)
}
// Queue is a finite-sized message queue.
type Queue struct {
ch chan message
}
// New returns a new message queue of the given size.
func New(size int) *Queue {
return &Queue{ch: make(chan message, size)}
}
// AddMessage enqueues a received message for processing. It returns without
// blocking, and may return a queue overrun error.
func (q *Queue) AddMessage(content []byte, sender peer.ID) error {
select {
case q.ch <- message{content, sender}:
default:
return ErrRxOverrun
}
return nil
}
// HandleMessages dequeues and dispatches incoming messages using the given
// message handler, until the message queue is closed. This function can be
// spawned as a background goroutine, potentially multiple times for a pool.
func (q *Queue) HandleMessages(h MessageHandler) {
for msg := range q.ch {
h.HandleMessage(msg.content, msg.sender)
}
}
// ErrRxOverrun signals that a receive queue has been overrun.
var ErrRxOverrun = errors.New("rx overrun")

@ -7,7 +7,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/harmony-one/harmony/accounts"
"github.com/harmony-one/harmony/api/client"
@ -29,6 +28,7 @@ import (
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/msgq"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
@ -94,11 +94,6 @@ type syncConfig struct {
client *downloader.Client
}
type incomingMessage struct {
content []byte
sender libp2p_peer.ID
}
// Node represents a protocol-participating node in the network
type Node struct {
Consensus *consensus.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits)
@ -157,7 +152,7 @@ type Node struct {
host p2p.Host
// Incoming messages to process.
incomingMessages chan incomingMessage
rxQueue *msgq.Queue
// Service manager.
serviceManager *service.Manager
@ -414,7 +409,7 @@ func (node *Node) getTransactionsForNewBlock(
// StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() {
for i := 0; i < RxWorkers; i++ {
go node.handleIncomingMessages()
go node.rxQueue.HandleMessages(node)
}
// start the goroutine to receive client message
@ -542,7 +537,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)).
Msg("Genesis block hash")
node.incomingMessages = make(chan incomingMessage, RxQueueSize)
node.rxQueue = msgq.New(RxQueueSize)
// Setup initial state of syncing.
node.peerRegistrationRecord = make(map[string]*syncConfig)

@ -14,6 +14,8 @@ import (
"github.com/ethereum/go-ethereum/rlp"
pb "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/harmony-one/harmony/api/proto"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
"github.com/harmony-one/harmony/api/proto/message"
@ -28,8 +30,6 @@ import (
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/shard"
staking "github.com/harmony-one/harmony/staking/types"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
)
const (
@ -53,7 +53,7 @@ func (node *Node) receiveGroupMessage(receiver p2p.GroupReceiver) {
}
//utils.Logger().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender)
// skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size
if err := node.enqueueIncomingMessage(msg[5:], sender); err != nil {
if err := node.rxQueue.AddMessage(msg[5:], sender); err != nil {
utils.Logger().Warn().Err(err).
Str("sender", sender.Pretty()).
Msg("cannot enqueue incoming message for processing")
@ -61,34 +61,13 @@ func (node *Node) receiveGroupMessage(receiver p2p.GroupReceiver) {
}
}
var errRxOverrun = errors.New("rx overrun")
func (node *Node) enqueueIncomingMessage(
content []byte, sender libp2p_peer.ID,
) error {
select {
case node.incomingMessages <- incomingMessage{content, sender}:
default:
return errRxOverrun
}
return nil
}
func (node *Node) handleIncomingMessages() {
for {
// TODO ek – infinite loop; add shutdown/cleanup logic
msg := <-node.incomingMessages
node.handleMessage(msg.content, msg.sender)
}
}
// handleMessage parses the message and dispatch the actions
func (node *Node) handleMessage(content []byte, sender libp2p_peer.ID) {
// HandleMessage parses the message and dispatch the actions.
func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) {
msgCategory, err := proto.GetMessageCategory(content)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("handleMessage get message category failed")
Msg("HandleMessage get message category failed")
return
}
@ -96,7 +75,7 @@ func (node *Node) handleMessage(content []byte, sender libp2p_peer.ID) {
if err != nil {
utils.Logger().Error().
Err(err).
Msg("handleMessage get message type failed")
Msg("HandleMessage get message type failed")
return
}
@ -104,7 +83,7 @@ func (node *Node) handleMessage(content []byte, sender libp2p_peer.ID) {
if err != nil {
utils.Logger().Error().
Err(err).
Msg("handleMessage get message payload failed")
Msg("HandleMessage get message payload failed")
return
}

Loading…
Cancel
Save