add multi boardcast improvement

pull/69/merge
Minh Doan 6 years ago
parent 4328d72652
commit 0d1b847970
  1. 8
      node/node_handler.go
  2. 43
      p2p/peer.go

@ -26,6 +26,12 @@ const (
NumBlocksBeforeStateBlock = 1000 NumBlocksBeforeStateBlock = 1000
) )
func (node *Node) MaybeBroadcastAsValidator(content []byte) {
if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID < p2p.MAX_BROADCAST {
go p2p.BroadcastMessageFromValidator(node.SelfPeer, node.Consensus.GetValidatorPeers(), content)
}
}
// NodeHandler handles a new incoming connection. // NodeHandler handles a new incoming connection.
func (node *Node) NodeHandler(conn net.Conn) { func (node *Node) NodeHandler(conn net.Conn) {
defer conn.Close() defer conn.Close()
@ -37,6 +43,8 @@ func (node *Node) NodeHandler(conn net.Conn) {
node.log.Error("Read p2p data failed", "err", err, "node", node) node.log.Error("Read p2p data failed", "err", err, "node", node)
return return
} }
node.MaybeBroadcastAsValidator(content)
consensusObj := node.Consensus consensusObj := node.Consensus
msgCategory, err := proto.GetMessageCategory(content) msgCategory, err := proto.GetMessageCategory(content)

@ -35,11 +35,7 @@ func SendMessage(peer Peer, msg []byte) {
send(peer.Ip, peer.Port, content) send(peer.Ip, peer.Port, content)
} }
// BroadcastMessage sends the message to a list of peers func BoadcastToPeers(peers []Peer, content []byte) {
func BroadcastMessage(peers []Peer, msg []byte) {
// Construct broadcast p2p message
content := ConstructP2pMessage(byte(17), msg)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(peers)) wg.Add(len(peers))
@ -56,6 +52,14 @@ func BroadcastMessage(peers []Peer, msg []byte) {
log.Info("Broadcasting Down", "time spent", time.Now().Sub(start).Seconds()) log.Info("Broadcasting Down", "time spent", time.Now().Sub(start).Seconds())
} }
// BroadcastMessage sends the message to a list of peers
func BroadcastMessage(peers []Peer, msg []byte) {
// Construct broadcast p2p message
content := ConstructP2pMessage(byte(17), msg)
BoadcastToPeers(peers, content)
}
func SelectMyPeers(peers []Peer, min int, max int) []Peer { func SelectMyPeers(peers []Peer, min int, max int) []Peer {
res := []Peer{} res := []Peer{}
for _, peer := range peers { for _, peer := range peers {
@ -72,37 +76,16 @@ func BroadcastMessageFromLeader(peers []Peer, msg []byte) {
content := ConstructP2pMessage(byte(17), msg) content := ConstructP2pMessage(byte(17), msg)
peers = SelectMyPeers(peers, 0, MAX_BROADCAST-1) peers = SelectMyPeers(peers, 0, MAX_BROADCAST-1)
var wg sync.WaitGroup BoadcastToPeers(peers, content)
wg.Add(len(peers))
for _, peer := range peers {
peerCopy := peer
go func() {
defer wg.Done()
send(peerCopy.Ip, peerCopy.Port, content)
}()
}
wg.Wait()
} }
// BroadcastMessage sends the message to a list of peers from a leader. // BroadcastMessage sends the message to a list of peers from a leader.
func BroadcastMessageFromValidator(Self Peer, peers []Peer, msg []byte) { func BroadcastMessageFromValidator(selfPeer Peer, peers []Peer, msg []byte) {
// Construct broadcast p2p message // Construct broadcast p2p message
content := ConstructP2pMessage(byte(17), msg) content := ConstructP2pMessage(byte(17), msg)
peers = SelectMyPeers(peers, 0, MAX_BROADCAST-1) peers = SelectMyPeers(peers, (selfPeer.ValidatorID+1)*MAX_BROADCAST, (selfPeer.ValidatorID+2)*MAX_BROADCAST-1)
var wg sync.WaitGroup BoadcastToPeers(peers, content)
wg.Add(len(peers))
for _, peer := range peers {
peerCopy := peer
go func() {
defer wg.Done()
send(peerCopy.Ip, peerCopy.Port, content)
}()
}
wg.Wait()
} }
// ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]

Loading…
Cancel
Save