1st attempt to replace broadcast with pubsub

Signed-off-by: Leo Chen <leo@harmony.one>
pull/428/head
Leo Chen 6 years ago
parent 175e43fed7
commit f2a5e5c5c9
  1. 6
      node/node.go
  2. 10
      node/node_handler.go

@ -182,10 +182,6 @@ type Node struct {
// Group Message Receiver // Group Message Receiver
groupReceiver p2p.GroupReceiver groupReceiver p2p.GroupReceiver
// fully integrate with libp2p for networking
// FIXME: this is temporary hack until we can fully replace the old one
UseLibP2P bool
// Duplicated Ping Message Received // Duplicated Ping Message Received
duplicatedPing map[string]bool duplicatedPing map[string]bool
} }
@ -219,7 +215,7 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions {
// StartServer starts a server and process the requests by a handler. // StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() { func (node *Node) StartServer() {
if node.UseLibP2P { if utils.UseLibP2P {
select {} select {}
} else { } else {
node.host.BindHandlerAndServe(node.StreamHandler) node.host.BindHandlerAndServe(node.StreamHandler)

@ -59,7 +59,7 @@ func (node *Node) ReceiveGroupMessage() {
} }
msg, sender, err := node.groupReceiver.Receive(ctx) msg, sender, err := node.groupReceiver.Receive(ctx)
if sender != node.host.GetID() { if sender != node.host.GetID() {
utils.GetLogInstance().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender) // utils.GetLogInstance().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender)
if err == nil { if err == nil {
// skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size
node.messageHandler(msg[5:], string(sender)) node.messageHandler(msg[5:], string(sender))
@ -229,7 +229,11 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
func (node *Node) BroadcastNewBlock(newBlock *types.Block) { func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
if node.ClientPeer != nil { if node.ClientPeer != nil {
utils.GetLogInstance().Debug("Sending new block to client", "client", node.ClientPeer) utils.GetLogInstance().Debug("Sending new block to client", "client", node.ClientPeer)
node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) if utils.UseLibP2P {
node.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
} else {
node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
}
} }
} }
@ -325,7 +329,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int {
node.AddPeers([]*p2p.Peer{peer}) node.AddPeers([]*p2p.Peer{peer})
// This is the old way of broadcasting pong message // This is the old way of broadcasting pong message
if node.Consensus.IsLeader && !node.UseLibP2P { if node.Consensus.IsLeader && !utils.UseLibP2P {
peers := node.Consensus.GetValidatorPeers() peers := node.Consensus.GetValidatorPeers()
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys) pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys)
buffer := pong.ConstructPongMessage() buffer := pong.ConstructPongMessage()

Loading…
Cancel
Save