set up validator id for peers

pull/69/merge
Minh Doan 6 years ago
parent 72db858956
commit 73996866dd
  1. 6
      consensus/consensus_leader.go
  2. 12
      local_config4.txt
  3. 29
      p2p/peer.go
  4. 35
      utils/distribution_config.go

@ -86,7 +86,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
consensus.blockHeader = byteBuffer.Bytes() consensus.blockHeader = byteBuffer.Bytes()
msgToSend := consensus.constructAnnounceMessage() msgToSend := consensus.constructAnnounceMessage()
p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend) p2p.BroadcastMessageFromLeader(consensus.GetValidatorPeers(), msgToSend)
// Set state to ANNOUNCE_DONE // Set state to ANNOUNCE_DONE
consensus.state = ANNOUNCE_DONE consensus.state = ANNOUNCE_DONE
consensus.commitByLeader(true) consensus.commitByLeader(true)
@ -203,7 +203,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con
consensus.responseByLeader(challengeScalar, targetState == CHALLENGE_DONE) consensus.responseByLeader(challengeScalar, targetState == CHALLENGE_DONE)
// Broadcast challenge message // Broadcast challenge message
p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend) p2p.BroadcastMessageFromLeader(consensus.GetValidatorPeers(), msgToSend)
// Set state to targetState (CHALLENGE_DONE or FINAL_CHALLENGE_DONE) // Set state to targetState (CHALLENGE_DONE or FINAL_CHALLENGE_DONE)
consensus.state = targetState consensus.state = targetState
@ -349,7 +349,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C
// Start the second round of Cosi // Start the second round of Cosi
msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap) msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap)
p2p.BroadcastMessage(consensus.GetValidatorPeers(), msgToSend) p2p.BroadcastMessageFromLeader(consensus.GetValidatorPeers(), msgToSend)
consensus.commitByLeader(false) consensus.commitByLeader(false)
} else { } else {
consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses)) consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses))

@ -0,0 +1,12 @@
127.0.0.1 9001 validator 0
127.0.0.1 9002 validator 0
127.0.0.1 9003 validator 0
127.0.0.1 9004 validator 0
127.0.0.1 9005 validator 1
127.0.0.1 9006 validator 1
127.0.0.1 9007 validator 1
127.0.0.1 9008 validator 1
127.0.0.1 9009 validator 1
127.0.0.1 9010 validator 1
127.0.0.1 9000 leader 0
127.0.0.1 9999 client 0

@ -15,13 +15,16 @@ import (
// Peer is the object for a p2p peer (node) // Peer is the object for a p2p peer (node)
type Peer struct { type Peer struct {
Ip string // Ip address of the peer Ip string // Ip address of the peer
Port string // Port number of the peer Port string // Port number of the peer
PubKey kyber.Point // Public key of the peer PubKey kyber.Point // Public key of the peer
Ready bool // Ready is true if the peer is ready to join consensus. Ready bool // Ready is true if the peer is ready to join consensus.
ValidatorID int
// TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available. // TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available.
} }
const MAX_BROADCAST = 20
// SendMessage sends the message to the peer // SendMessage sends the message to the peer
func SendMessage(peer Peer, msg []byte) { func SendMessage(peer Peer, msg []byte) {
// Construct normal p2p message // Construct normal p2p message
@ -48,6 +51,24 @@ func BroadcastMessage(peers []Peer, msg []byte) {
wg.Wait() wg.Wait()
} }
// BroadcastMessage sends the message to a list of peers from a leader.
func BroadcastMessageFromLeader(peers []Peer, msg []byte) {
// Construct broadcast p2p message
content := ConstructP2pMessage(byte(17), msg)
var wg sync.WaitGroup
wg.Add(len(peers))
for _, peer := range peers {
peerCopy := peer
go func() {
defer wg.Done()
send(peerCopy.Ip, peerCopy.Port, content)
}()
}
wg.Wait()
}
// ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]
func ConstructP2pMessage(msgType byte, content []byte) []byte { func ConstructP2pMessage(msgType byte, content []byte) []byte {

@ -13,10 +13,11 @@ import (
) )
type ConfigEntry struct { type ConfigEntry struct {
IP string IP string
Port string Port string
Role string Role string
ShardID string ShardID string
ValidatorID int // Validator ID in its shard.
} }
type DistributionConfig struct { type DistributionConfig struct {
@ -29,21 +30,6 @@ func NewDistributionConfig() *DistributionConfig {
return &config return &config
} }
// done
// Gets all the validator peers
func (config *DistributionConfig) GetValidators() []p2p.Peer {
var peerList []p2p.Peer
for _, entry := range config.config {
if entry.Role != "validator" {
continue
}
peer := p2p.Peer{Port: entry.Port, Ip: entry.IP}
peerList = append(peerList, peer)
}
return peerList
}
// done
// Gets all the leader peers and corresponding shard Ids // Gets all the leader peers and corresponding shard Ids
func (config *DistributionConfig) GetLeadersAndShardIds() ([]p2p.Peer, []uint32) { func (config *DistributionConfig) GetLeadersAndShardIds() ([]p2p.Peer, []uint32) {
var peerList []p2p.Peer var peerList []p2p.Peer
@ -96,9 +82,16 @@ func (config *DistributionConfig) ReadConfigFile(filename string) error {
fscanner := bufio.NewScanner(file) fscanner := bufio.NewScanner(file)
result := []ConfigEntry{} result := []ConfigEntry{}
validatorMap := map[int]int{}
for fscanner.Scan() { for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ") p := strings.Split(fscanner.Text(), " ")
entry := ConfigEntry{p[0], p[1], p[2], p[3]} shardID, _ := strconv.Atoi(p[3])
validatorID := -1
if p[2] == "validator" {
validatorID = validatorMap[shardID]
validatorMap[shardID]++
}
entry := ConfigEntry{p[0], p[1], p[2], p[3], validatorID}
result = append(result, entry) result = append(result, entry)
} }
config.config = result config.config = result
@ -123,7 +116,7 @@ func (config *DistributionConfig) GetPeers(ip, port, shardID string) []p2p.Peer
continue continue
} }
// Get public key deterministically based on ip and port // Get public key deterministically based on ip and port
peer := p2p.Peer{Port: entry.Port, Ip: entry.IP} peer := p2p.Peer{Port: entry.Port, Ip: entry.IP, ValidatorID: entry.ValidatorID}
setKey(&peer) setKey(&peer)
peerList = append(peerList, peer) peerList = append(peerList, peer)
} }

Loading…
Cancel
Save