diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index c39cffae0..c84b035db 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -86,7 +86,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { consensus.blockHeader = byteBuffer.Bytes() msgToSend := consensus.constructAnnounceMessage() - p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend) + p2p.BroadcastMessageFromLeader(consensus.GetValidatorPeers(), msgToSend) // Set state to ANNOUNCE_DONE consensus.state = ANNOUNCE_DONE consensus.commitByLeader(true) @@ -203,7 +203,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con consensus.responseByLeader(challengeScalar, targetState == CHALLENGE_DONE) // Broadcast challenge message - p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend) + p2p.BroadcastMessageFromLeader(consensus.GetValidatorPeers(), msgToSend) // Set state to targetState (CHALLENGE_DONE or FINAL_CHALLENGE_DONE) consensus.state = targetState @@ -349,7 +349,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C // Start the second round of Cosi msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap) - p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend) + p2p.BroadcastMessageFromLeader(consensus.GetValidatorPeers(), msgToSend) consensus.commitByLeader(false) } else { consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses)) diff --git a/local_config4.txt b/local_config4.txt new file mode 100644 index 000000000..ac0809030 --- /dev/null +++ b/local_config4.txt @@ -0,0 +1,12 @@ +127.0.0.1 9001 validator 0 +127.0.0.1 9002 validator 0 +127.0.0.1 9003 validator 0 +127.0.0.1 9004 validator 0 +127.0.0.1 9005 validator 1 +127.0.0.1 9006 validator 1 +127.0.0.1 9007 validator 1 +127.0.0.1 9008 validator 1 +127.0.0.1 9009 validator 1 +127.0.0.1 9010 validator 1 +127.0.0.1 9000 leader 0 +127.0.0.1 9999 client 0 diff --git a/p2p/peer.go b/p2p/peer.go index 96c60d592..b0d81585f 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -15,13 +15,16 @@ import ( // Peer is the object for a p2p peer (node) type Peer struct { - Ip string // Ip address of the peer - Port string // Port number of the peer - PubKey kyber.Point // Public key of the peer - Ready bool // Ready is true if the peer is ready to join consensus. + Ip string // Ip address of the peer + Port string // Port number of the peer + PubKey kyber.Point // Public key of the peer + Ready bool // Ready is true if the peer is ready to join consensus. + ValidatorID int // TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available. } +const MAX_BROADCAST = 20 + // SendMessage sends the message to the peer func SendMessage(peer Peer, msg []byte) { // Construct normal p2p message @@ -48,6 +51,24 @@ func BroadcastMessage(peers []Peer, msg []byte) { wg.Wait() } +// BroadcastMessage sends the message to a list of peers from a leader. +func BroadcastMessageFromLeader(peers []Peer, msg []byte) { + // Construct broadcast p2p message + content := ConstructP2pMessage(byte(17), msg) + + var wg sync.WaitGroup + wg.Add(len(peers)) + + for _, peer := range peers { + peerCopy := peer + go func() { + defer wg.Done() + send(peerCopy.Ip, peerCopy.Port, content) + }() + } + wg.Wait() +} + // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] func ConstructP2pMessage(msgType byte, content []byte) []byte { diff --git a/utils/distribution_config.go b/utils/distribution_config.go index f5c6f72e8..f9345f934 100644 --- a/utils/distribution_config.go +++ b/utils/distribution_config.go @@ -13,10 +13,11 @@ import ( ) type ConfigEntry struct { - IP string - Port string - Role string - ShardID string + IP string + Port string + Role string + ShardID string + ValidatorID int // Validator ID in its shard. } type DistributionConfig struct { @@ -29,21 +30,6 @@ func NewDistributionConfig() *DistributionConfig { return &config } -// done -// Gets all the validator peers -func (config *DistributionConfig) GetValidators() []p2p.Peer { - var peerList []p2p.Peer - for _, entry := range config.config { - if entry.Role != "validator" { - continue - } - peer := p2p.Peer{Port: entry.Port, Ip: entry.IP} - peerList = append(peerList, peer) - } - return peerList -} - -// done // Gets all the leader peers and corresponding shard Ids func (config *DistributionConfig) GetLeadersAndShardIds() ([]p2p.Peer, []uint32) { var peerList []p2p.Peer @@ -96,9 +82,16 @@ func (config *DistributionConfig) ReadConfigFile(filename string) error { fscanner := bufio.NewScanner(file) result := []ConfigEntry{} + validatorMap := map[int]int{} for fscanner.Scan() { p := strings.Split(fscanner.Text(), " ") - entry := ConfigEntry{p[0], p[1], p[2], p[3]} + shardID, _ := strconv.Atoi(p[3]) + validatorID := -1 + if p[2] == "validator" { + validatorID = validatorMap[shardID] + validatorMap[shardID]++ + } + entry := ConfigEntry{p[0], p[1], p[2], p[3], validatorID} result = append(result, entry) } config.config = result @@ -123,7 +116,7 @@ func (config *DistributionConfig) GetPeers(ip, port, shardID string) []p2p.Peer continue } // Get public key deterministically based on ip and port - peer := p2p.Peer{Port: entry.Port, Ip: entry.IP} + peer := p2p.Peer{Port: entry.Port, Ip: entry.IP, ValidatorID: entry.ValidatorID} setKey(&peer) peerList = append(peerList, peer) }