fix the bugs in multicaset

pull/69/merge
Minh Doan 6 years ago
parent de692464ed
commit 44c2c00378
  1. 4
      node/node_handler.go
  2. 8
      p2p/peer.go
  3. 2
      utils/distribution_config.go

@ -27,7 +27,7 @@ const (
) )
func (node *Node) MaybeBroadcastAsValidator(content []byte) { func (node *Node) MaybeBroadcastAsValidator(content []byte) {
if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID < p2p.MAX_BROADCAST { if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID <= p2p.MAX_BROADCAST {
go p2p.BroadcastMessageFromValidator(node.SelfPeer, node.Consensus.GetValidatorPeers(), content) go p2p.BroadcastMessageFromValidator(node.SelfPeer, node.Consensus.GetValidatorPeers(), content)
} }
} }
@ -43,7 +43,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
node.log.Error("Read p2p data failed", "err", err, "node", node) node.log.Error("Read p2p data failed", "err", err, "node", node)
return return
} }
// node.MaybeBroadcastAsValidator(content) node.MaybeBroadcastAsValidator(content)
consensusObj := node.Consensus consensusObj := node.Consensus

@ -53,7 +53,7 @@ func BroadcastMessage(peers []Peer, msg []byte) {
}() }()
} }
wg.Wait() wg.Wait()
log.Info("Broadcasting Down", "time spent", time.Now().Sub(start).Seconds()) log.Info("Broadcasting Done", "time spent", time.Now().Sub(start).Seconds())
} }
func SelectMyPeers(peers []Peer, min int, max int) []Peer { func SelectMyPeers(peers []Peer, min int, max int) []Peer {
@ -69,14 +69,16 @@ func SelectMyPeers(peers []Peer, min int, max int) []Peer {
// BroadcastMessage sends the message to a list of peers from a leader. // BroadcastMessage sends the message to a list of peers from a leader.
func BroadcastMessageFromLeader(peers []Peer, msg []byte) { func BroadcastMessageFromLeader(peers []Peer, msg []byte) {
// TODO(minhdoan): Enable back for multicast. // TODO(minhdoan): Enable back for multicast.
// peers = SelectMyPeers(peers, 0, MAX_BROADCAST-1) peers = SelectMyPeers(peers, 1, MAX_BROADCAST)
BroadcastMessage(peers, msg) BroadcastMessage(peers, msg)
log.Info("Done sending from leader")
} }
// BroadcastMessage sends the message to a list of peers from a validator. // BroadcastMessage sends the message to a list of peers from a validator.
func BroadcastMessageFromValidator(selfPeer Peer, peers []Peer, msg []byte) { func BroadcastMessageFromValidator(selfPeer Peer, peers []Peer, msg []byte) {
peers = SelectMyPeers(peers, (selfPeer.ValidatorID+1)*MAX_BROADCAST, (selfPeer.ValidatorID+2)*MAX_BROADCAST-1) peers = SelectMyPeers(peers, selfPeer.ValidatorID*MAX_BROADCAST+1, (selfPeer.ValidatorID+1)*MAX_BROADCAST)
BroadcastMessage(peers, msg) BroadcastMessage(peers, msg)
log.Info("Done sending from validator")
} }
// ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]

@ -88,8 +88,8 @@ func (config *DistributionConfig) ReadConfigFile(filename string) error {
shardID, _ := strconv.Atoi(p[3]) shardID, _ := strconv.Atoi(p[3])
validatorID := -1 validatorID := -1
if p[2] == "validator" { if p[2] == "validator" {
validatorID = validatorMap[shardID]
validatorMap[shardID]++ validatorMap[shardID]++
validatorID = validatorMap[shardID]
} }
entry := ConfigEntry{p[0], p[1], p[2], p[3], validatorID} entry := ConfigEntry{p[0], p[1], p[2], p[3], validatorID}
result = append(result, entry) result = append(result, entry)

Loading…
Cancel
Save