You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
63 lines
1.6 KiB
63 lines
1.6 KiB
// Package msgq implements a simple, finite-sized message queue. It can be used
// as a building block for a message processor pool.
|
|
package msgq
|
|
|
|
import (
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
type message struct {
|
|
content []byte
|
|
sender peer.ID
|
|
}
|
|
|
|
// MessageAdder enqueues a received message for processing. It returns without
|
|
// blocking, and may return a queue overrun error.
|
|
type MessageAdder interface {
|
|
AddMessage(content []byte, sender peer.ID) error
|
|
}
|
|
|
|
// MessageHandler is a message handler.
|
|
type MessageHandler interface {
|
|
HandleMessage(content []byte, sender peer.ID)
|
|
}
|
|
|
|
// Queue is a finite-sized message queue.
|
|
type Queue struct {
|
|
ch chan message
|
|
}
|
|
|
|
// New returns a new message queue of the given size.
|
|
func New(size int) *Queue {
|
|
return &Queue{ch: make(chan message, size)}
|
|
}
|
|
|
|
// AddMessage enqueues a received message for processing. It returns without
|
|
// blocking, and may return a queue overrun error.
|
|
func (q *Queue) AddMessage(content []byte, sender peer.ID) error {
|
|
select {
|
|
case q.ch <- message{content, sender}:
|
|
default:
|
|
return ErrRxOverrun
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// HandleMessages dequeues and dispatches incoming messages using the given
|
|
// message handler, until the message queue is closed. This function can be
|
|
// spawned as a background goroutine, potentially multiple times for a pool.
|
|
func (q *Queue) HandleMessages(h MessageHandler) {
|
|
for msg := range q.ch {
|
|
h.HandleMessage(msg.content, msg.sender)
|
|
}
|
|
}
|
|
|
|
// Close closes the given queue.
|
|
func (q *Queue) Close() error {
|
|
close(q.ch)
|
|
return nil
|
|
}
|
|
|
|
// ErrRxOverrun is the error reported when a message cannot be enqueued
// because the receive queue has been overrun.
var ErrRxOverrun = errors.New("rx overrun")
|
|
|