From 4328d726524ab3ffe9b3a10fc030051c0d13c710 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Thu, 13 Sep 2018 14:07:26 -0700 Subject: [PATCH] add self peer to node --- benchmark.go | 3 +++ node/node.go | 1 + p2p/peer.go | 31 +++++++++++++++++++++++++++++++ utils/distribution_config.go | 11 +++++++++++ 4 files changed, 46 insertions(+) diff --git a/benchmark.go b/benchmark.go index 318cbc933..d179b3212 100644 --- a/benchmark.go +++ b/benchmark.go @@ -84,6 +84,7 @@ func main() { shardID := distributionConfig.GetShardID(*ip, *port) peers := distributionConfig.GetPeers(*ip, *port, shardID) leader := distributionConfig.GetLeader(shardID) + selfPeer := distributionConfig.GetSelfPeer(*ip, *port, shardID) var role string if leader.Ip == *ip && leader.Port == *port { @@ -123,6 +124,8 @@ func main() { attack.GetInstance().SetLogger(consensus.Log) // Current node. currentNode := node.New(consensus, ldb) + // Add self peer. + currentNode.SelfPeer = selfPeer // Add sync node configuration. currentNode.SyncNode = *syncNode // Create client peer. diff --git a/node/node.go b/node/node.go index 24be69ffe..d239bd496 100644 --- a/node/node.go +++ b/node/node.go @@ -37,6 +37,7 @@ type Node struct { ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept Client *client.Client // The presence of a client object means this node will also act as a client IsWaiting bool + SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. Self p2p.Peer IDCPeer p2p.Peer SyncNode bool // TODO(minhdoan): Remove it later. diff --git a/p2p/peer.go b/p2p/peer.go index 032a96fcb..87741d971 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -56,11 +56,42 @@ func BroadcastMessage(peers []Peer, msg []byte) { log.Info("Broadcasting Down", "time spent", time.Now().Sub(start).Seconds()) } +func SelectMyPeers(peers []Peer, min int, max int) []Peer { + res := []Peer{} + for _, peer := range peers { + if peer.ValidatorID >= min && peer.ValidatorID <= max { + res = append(res, peer) + } + } + return res +} + // BroadcastMessage sends the message to a list of peers from a leader. func BroadcastMessageFromLeader(peers []Peer, msg []byte) { // Construct broadcast p2p message content := ConstructP2pMessage(byte(17), msg) + peers = SelectMyPeers(peers, 0, MAX_BROADCAST-1) + var wg sync.WaitGroup + wg.Add(len(peers)) + + for _, peer := range peers { + peerCopy := peer + go func() { + defer wg.Done() + send(peerCopy.Ip, peerCopy.Port, content) + }() + } + wg.Wait() +} + +// BroadcastMessage sends the message to a list of peers from a leader. +func BroadcastMessageFromValidator(Self Peer, peers []Peer, msg []byte) { + + // Construct broadcast p2p message + content := ConstructP2pMessage(byte(17), msg) + + peers = SelectMyPeers(peers, 0, MAX_BROADCAST-1) var wg sync.WaitGroup wg.Add(len(peers)) diff --git a/utils/distribution_config.go b/utils/distribution_config.go index f9345f934..a9a97506b 100644 --- a/utils/distribution_config.go +++ b/utils/distribution_config.go @@ -123,6 +123,17 @@ func (config *DistributionConfig) GetPeers(ip, port, shardID string) []p2p.Peer return peerList } +// GetPeers Gets the validator list +func (config *DistributionConfig) GetSelfPeer(ip, port, shardID string) p2p.Peer { + for _, entry := range config.config { + if entry.IP == ip && entry.Port == port && entry.ShardID == shardID { + peer := p2p.Peer{Port: entry.Port, Ip: entry.IP, ValidatorID: entry.ValidatorID} + return peer + } + } + return p2p.Peer{} +} + // GetLeader Gets the leader of this shard id func (config *DistributionConfig) GetLeader(shardID string) p2p.Peer { var leaderPeer p2p.Peer