diff --git a/cmd/harmony.go b/cmd/harmony.go index ecf23ba68..0f4eb070d 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -199,6 +199,7 @@ func main() { // Attack determination. attack.GetInstance().SetAttackEnabled(attackDetermination(*attackedMode)) } + utils.UseLibP2P = false } else { if *isLeader { role = "leader" @@ -206,6 +207,7 @@ func main() { } else { role = "validator" } + utils.UseLibP2P = true } // Init logging. loggingInit(*logFolder, role, *ip, *port, *onlyLogTps) @@ -287,7 +289,6 @@ func main() { if consensus.IsLeader { go currentNode.SendPongMessage() } - currentNode.UseLibP2P = true } go currentNode.SupportSyncing() diff --git a/consensus/consensus.go b/consensus/consensus.go index 60d067955..b3ef03d1d 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -432,7 +432,11 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int { pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys) buffer := pong.ConstructPongMessage() - host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers) + if utils.UseLibP2P { + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, buffer) + } else { + host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers) + } } return count2 diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index e1a70a82d..48afe4a01 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -14,6 +14,7 @@ import ( bls_cosi "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/profiler" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" ) @@ -107,7 +108,11 @@ func (consensus *Consensus) startConsensus(newBlock *types.Block) { // Leader sign the block hash itself consensus.prepareSigs[consensus.nodeID] = consensus.priKey.SignHash(consensus.blockHash[:]) - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) + if utils.UseLibP2P { + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + } else { + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) + } } // processPrepareMessage processes the prepare message sent from validators @@ -164,7 +169,12 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag // Construct and broadcast prepared message msgToSend, aggSig := consensus.constructPreparedMessage() consensus.aggregatedPrepareSig = aggSig - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) + + if utils.UseLibP2P { + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + } else { + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) + } // Set state to targetState consensus.state = targetState @@ -230,7 +240,12 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message // Construct and broadcast committed message msgToSend, aggSig := consensus.constructCommittedMessage() consensus.aggregatedCommitSig = aggSig - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) + + if utils.UseLibP2P { + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + } else { + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) + } var blockObj types.Block err := rlp.DecodeBytes(consensus.block, &blockObj) diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 783095173..381bb8df1 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -3,6 +3,7 @@ package consensus import ( "github.com/harmony-one/bls/ffi/go/bls" bls_cosi "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/p2p" "github.com/ethereum/go-ethereum/rlp" protobuf "github.com/golang/protobuf/proto" @@ -103,7 +104,11 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa // Construct and send prepare message msgToSend := consensus.constructPrepareMessage() - consensus.SendMessage(consensus.leader, msgToSend) + if utils.UseLibP2P { + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + } else { + consensus.SendMessage(consensus.leader, msgToSend) + } consensus.state = PrepareDone } @@ -163,7 +168,11 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa // Construct and send the commit message multiSigAndBitmap := append(multiSig, bitmap...) msgToSend := consensus.constructCommitMessage(multiSigAndBitmap) - consensus.SendMessage(consensus.leader, msgToSend) + if utils.UseLibP2P { + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + } else { + consensus.SendMessage(consensus.leader, msgToSend) + } consensus.state = CommitDone } diff --git a/internal/utils/singleton.go b/internal/utils/singleton.go index 5735e7a7a..48a93a2fd 100644 --- a/internal/utils/singleton.go +++ b/internal/utils/singleton.go @@ -13,6 +13,10 @@ import ( var ( Port string IP string + // Global Variable to use libp2p for networking + // FIXME: this is a temporary hack, once we totally switch to libp2p + // this variable shouldn't be used + UseLibP2P bool ) // SetPortAndIP used to print out loggings of node with Port and IP. diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index d37519c9e..6eb097039 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -162,8 +162,8 @@ func New(self *p2p.Peer, priKey p2p_crypto.PrivKey, opts ...p2p_config.Option) * append(opts, libp2p.ListenAddrs(listenAddr), libp2p.Identity(priKey))..., ) catchError(err) - // pubsub, err := pubsub.NewGossipSub(ctx, p2pHost) - pubsub, err := pubsub.NewFloodSub(ctx, p2pHost) + pubsub, err := pubsub.NewGossipSub(ctx, p2pHost) + // pubsub, err := pubsub.NewFloodSub(ctx, p2pHost) catchError(err) self.PeerID = p2pHost.ID()