From 0d1b8479708dc2edb11bc5ca1d805a734edb2b8d Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Thu, 13 Sep 2018 14:20:29 -0700 Subject: [PATCH] add multi boardcast improvement --- node/node_handler.go | 8 ++++++++ p2p/peer.go | 43 +++++++++++++------------------------------ 2 files changed, 21 insertions(+), 30 deletions(-) diff --git a/node/node_handler.go b/node/node_handler.go index c717b3707..8982c0522 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -26,6 +26,12 @@ const ( NumBlocksBeforeStateBlock = 1000 ) +func (node *Node) MaybeBroadcastAsValidator(content []byte) { + if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID < p2p.MAX_BROADCAST { + go p2p.BroadcastMessageFromValidator(node.SelfPeer, node.Consensus.GetValidatorPeers(), content) + } +} + // NodeHandler handles a new incoming connection. func (node *Node) NodeHandler(conn net.Conn) { defer conn.Close() @@ -37,6 +43,8 @@ func (node *Node) NodeHandler(conn net.Conn) { node.log.Error("Read p2p data failed", "err", err, "node", node) return } + node.MaybeBroadcastAsValidator(content) + consensusObj := node.Consensus msgCategory, err := proto.GetMessageCategory(content) diff --git a/p2p/peer.go b/p2p/peer.go index 87741d971..5e81e1c50 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -35,11 +35,7 @@ func SendMessage(peer Peer, msg []byte) { send(peer.Ip, peer.Port, content) } -// BroadcastMessage sends the message to a list of peers -func BroadcastMessage(peers []Peer, msg []byte) { - // Construct broadcast p2p message - content := ConstructP2pMessage(byte(17), msg) - +func BoadcastToPeers(peers []Peer, content []byte) { var wg sync.WaitGroup wg.Add(len(peers)) @@ -56,6 +52,14 @@ func BroadcastMessage(peers []Peer, msg []byte) { log.Info("Broadcasting Down", "time spent", time.Now().Sub(start).Seconds()) } +// BroadcastMessage sends the message to a list of peers +func BroadcastMessage(peers []Peer, msg []byte) { + // Construct broadcast p2p message + content := ConstructP2pMessage(byte(17), msg) + + BoadcastToPeers(peers, content) +} + func SelectMyPeers(peers []Peer, min int, max int) []Peer { res := []Peer{} for _, peer := range peers { @@ -72,37 +76,16 @@ func BroadcastMessageFromLeader(peers []Peer, msg []byte) { content := ConstructP2pMessage(byte(17), msg) peers = SelectMyPeers(peers, 0, MAX_BROADCAST-1) - var wg sync.WaitGroup - wg.Add(len(peers)) - - for _, peer := range peers { - peerCopy := peer - go func() { - defer wg.Done() - send(peerCopy.Ip, peerCopy.Port, content) - }() - } - wg.Wait() + BoadcastToPeers(peers, content) } // BroadcastMessage sends the message to a list of peers from a leader. -func BroadcastMessageFromValidator(Self Peer, peers []Peer, msg []byte) { - +func BroadcastMessageFromValidator(selfPeer Peer, peers []Peer, msg []byte) { // Construct broadcast p2p message content := ConstructP2pMessage(byte(17), msg) - peers = SelectMyPeers(peers, 0, MAX_BROADCAST-1) - var wg sync.WaitGroup - wg.Add(len(peers)) - - for _, peer := range peers { - peerCopy := peer - go func() { - defer wg.Done() - send(peerCopy.Ip, peerCopy.Port, content) - }() - } - wg.Wait() + peers = SelectMyPeers(peers, (selfPeer.ValidatorID+1)*MAX_BROADCAST, (selfPeer.ValidatorID+2)*MAX_BROADCAST-1) + BoadcastToPeers(peers, content) } // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]