diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index 2e0d35a4f..34c287ecf 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -106,7 +106,7 @@ func (s *Service) contactP2pPeers() { } if a == p2p.ActionStart || a == p2p.ActionResume || a == p2p.ActionPause { - if g == p2p.GroupIDBeacon { + if g == p2p.GroupIDBeacon || g == p2p.GroupIDBeaconClient { if s.config.IsBeacon { // beacon chain node err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, regMsgBuf) diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 318fcb913..b40b2d725 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -133,7 +133,9 @@ func (s *Service) DoService() { } p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs} utils.GetLogInstance().Info("Notify peerChan", "peer", p) - s.peerChan <- p + if s.peerChan != nil { + s.peerChan <- p + } } case <-s.stopChan: return diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index 44be6f380..a784d0264 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -19,6 +19,7 @@ import ( "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" + p2p_host "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/p2pimpl" peerstore "github.com/libp2p/go-libp2p-peerstore" multiaddr "github.com/multiformats/go-multiaddr" @@ -70,40 +71,65 @@ func main() { // Add GOMAXPROCS to achieve max performance. runtime.GOMAXPROCS(1024) - var bcPeer *p2p.Peer - var shardIDLeaderMap map[uint32]p2p.Peer - priKey, _, err := utils.LoadKeyFromFile(*keyFile) - if err != nil { - panic(err) - } + // Logging setup + utils.SetPortAndIP(*port, *ip) - if *bcAddr != "" { - // Turn the destination into a multiaddr. - maddr, err := multiaddr.NewMultiaddr(*bcAddr) + if len(utils.BootNodes) == 0 { + bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings) if err != nil { panic(err) } + utils.BootNodes = bootNodeAddrs + } - // Extract the peer ID from the multiaddr. - info, err := peerstore.InfoFromP2pAddr(maddr) - if err != nil { - panic(err) - } + var shardIDLeaderMap map[uint32]p2p.Peer + nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile) + if err != nil { + panic(err) + } - bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID} - } else { - bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort} + peerPriKey, peerPubKey := utils.GenKey(*ip, *port) + if peerPriKey == nil || peerPubKey == nil { + panic(fmt.Errorf("generate key error")) } - candidateNode := newnode.New(*ip, *port, priKey) - candidateNode.AddPeer(bcPeer) - candidateNode.ContactBeaconChain(*bcPeer) - selfPeer := candidateNode.GetSelfPeer() - selfPeer.PubKey = candidateNode.PubK + selfPeer := p2p.Peer{IP: *ip, Port: *port, ValidatorID: -1, PubKey: peerPubKey} - shardIDLeaderMap = candidateNode.Leaders + if !*libp2pPD { + var bcPeer *p2p.Peer + if *bcAddr != "" { + // Turn the destination into a multiaddr. + maddr, err := multiaddr.NewMultiaddr(*bcAddr) + if err != nil { + panic(err) + } - debugPrintShardIDLeaderMap(shardIDLeaderMap) + // Extract the peer ID from the multiaddr. + info, err := peerstore.InfoFromP2pAddr(maddr) + if err != nil { + panic(err) + } + + bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID} + } else { + bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort} + } + + candidateNode := newnode.New(*ip, *port, nodePriKey) + candidateNode.AddPeer(bcPeer) + candidateNode.ContactBeaconChain(*bcPeer) + selfPeer := candidateNode.GetSelfPeer() + selfPeer.PubKey = candidateNode.PubK + + shardIDLeaderMap = candidateNode.Leaders + + debugPrintShardIDLeaderMap(shardIDLeaderMap) + } else { + // Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard + shardIDLeaderMap = make(map[uint32]p2p.Peer) + shardIDLeaderMap[0] = p2p.Peer{} + utils.UseLibP2P = true + } // Do cross shard tx if there are more than one shard setting := txgen.Settings{ @@ -124,7 +150,7 @@ func main() { // Nodes containing blockchain data to mirror the shards' data in the network nodes := []*node.Node{} - host, err := p2pimpl.NewHost(&selfPeer, priKey) + host, err := p2pimpl.NewHost(&selfPeer, nodePriKey) if err != nil { panic("unable to new host in txgen") } @@ -147,15 +173,15 @@ func main() { }() // This func is used to update the client's blockchain when new blocks are received from the leaders updateBlocksFunc := func(blocks []*types.Block) { - log.Info("[Txgen] Received new block", "block", blocks) + utils.GetLogInstance().Info("[Txgen] Received new block", "block", blocks) for _, block := range blocks { for _, node := range nodes { shardID := block.ShardID() if node.Consensus.ShardID == shardID { // Add it to blockchain - log.Info("Current Block", "hash", node.Blockchain().CurrentBlock().Hash().Hex()) - log.Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex()) + utils.GetLogInstance().Info("Current Block", "hash", node.Blockchain().CurrentBlock().Hash().Hex()) + utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex()) node.AddNewBlock(block) stateMutex.Lock() node.Worker.UpdateCurrent() @@ -169,24 +195,24 @@ func main() { } clientNode.Client.UpdateBlocks = updateBlocksFunc - // Start the client server to listen to leader's message - go clientNode.StartServer() - for _, leader := range shardIDLeaderMap { - log.Debug("Client Join Shard", "leader", leader) - clientNode.GetHost().AddPeer(&leader) if *libp2pPD { clientNode.Role = node.NewNode } else { + clientNode.GetHost().AddPeer(&leader) + utils.GetLogInstance().Debug("Client Join Shard", "leader", leader) go clientNode.JoinShard(leader) } clientNode.State = node.NodeReadyForConsensus } + if *libp2pPD { clientNode.ServiceManagerSetup() clientNode.RunServices() - clientNode.StartServer() + go clientNode.StartServer() } else { + // Start the client server to listen to leader's message + go clientNode.StartServer() // wait for 1 seconds for client to send ping message to leader time.Sleep(time.Second) clientNode.StopPing <- struct{}{} @@ -194,14 +220,14 @@ func main() { clientNode.State = node.NodeReadyForConsensus // Transaction generation process - time.Sleep(2 * time.Second) // wait for nodes to be ready + time.Sleep(5 * time.Second) // wait for nodes to be ready start := time.Now() totalTime := float64(*duration) for { t := time.Now() if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { - log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) + utils.GetLogInstance().Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) break } select { @@ -210,7 +236,7 @@ func main() { lock := sync.Mutex{} stateMutex.Lock() - log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) + utils.GetLogInstance().Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting) lock.Lock() @@ -226,26 +252,34 @@ func main() { }(shardID, txs) } lock.Unlock() - case <-time.After(2 * time.Second): - log.Warn("No new block is received so far") + case <-time.After(10 * time.Second): + utils.GetLogInstance().Warn("No new block is received so far") } } // Send a stop message to stop the nodes at the end msg := proto_node.ConstructStopMessage() - clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg) + if utils.UseLibP2P { + clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + } else { + clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg) + } time.Sleep(3000 * time.Millisecond) } // SendTxsToLeader sends txs to leader account. func SendTxsToLeader(clientNode *node.Node, leader p2p.Peer, txs types.Transactions) { - log.Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) + utils.GetLogInstance().Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessageAccount(txs) - clientNode.SendMessage(leader, msg) + if utils.UseLibP2P { + clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + } else { + clientNode.SendMessage(leader, msg) + } } func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) { for k, v := range leaderMap { - log.Debug("Leader", "ShardID", k, "Leader", v) + utils.GetLogInstance().Debug("Leader", "ShardID", k, "Leader", v) } } diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index da05f522c..478bae0ae 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -31,13 +31,24 @@ var ( // WaitForNewBlock waits for the next new block to run consensus on func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}, startChannel chan struct{}) { + // gensis block is the first block to be processed. + // But we shouldn't start consensus yet, as we need to wait for all validators + // received the leader's pub key which will be propogated via Pong message. + // After we started the first consensus, we will go back to normal case to wait + // for new blocks. + // The signal to start the first consensus right now is the sending of Pong message (SendPongMessage function in node/node_handler.go + // but it can be changed to other conditions later + first := true go func() { defer close(stoppedChan) for { select { default: - // got the signal to start consensus - _ = <-startChannel + if first && startChannel != nil { + // got the signal to start consensus + _ = <-startChannel + first = false + } utils.GetLogInstance().Debug("Waiting for block", "consensus", consensus) // keep waiting for new blocks diff --git a/drand/drand.go b/drand/drand.go index 492b09792..561d76a52 100644 --- a/drand/drand.go +++ b/drand/drand.go @@ -37,7 +37,7 @@ type DRand struct { validators sync.Map // key is uint16, value is p2p.Peer // Leader's address - leader p2p.Peer + Leader p2p.Peer // Public keys of the committee including leader and validators PublicKeys []*bls.PublicKey @@ -85,7 +85,7 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confi dRand.IsLeader = false } - dRand.leader = leader + dRand.Leader = leader for _, peer := range peers { dRand.validators.Store(utils.GetUniqueIDFromPeer(peer), peer) } @@ -101,7 +101,7 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confi dRand.PublicKeys = allPublicKeys - bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey) + bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.Leader.PubKey) dRand.bitmap = bitmap dRand.pRand = nil @@ -145,6 +145,7 @@ func (dRand *DRand) AddPeers(peers []*p2p.Peer) int { dRand.pubKeyLock.Lock() dRand.PublicKeys = append(dRand.PublicKeys, peer.PubKey) dRand.pubKeyLock.Unlock() + utils.GetLogInstance().Debug("[DRAND]", "AddPeers", *peer) } count++ } @@ -241,7 +242,7 @@ func (dRand *DRand) getValidatorPeerByID(validatorID uint32) *p2p.Peer { func (dRand *DRand) ResetState() { dRand.vrfs = &map[uint32][]byte{} - bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey) + bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.Leader.PubKey) dRand.bitmap = bitmap dRand.pRand = nil dRand.rand = nil diff --git a/drand/drand_leader.go b/drand/drand_leader.go index 43479a67b..0753152e9 100644 --- a/drand/drand_leader.go +++ b/drand/drand_leader.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/vrf/p256" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" ) @@ -45,7 +46,11 @@ func (dRand *DRand) init(epochBlock *types.Block) { (*dRand.vrfs)[dRand.nodeID] = append(rand[:], proof...) - host.BroadcastMessageFromLeader(dRand.host, dRand.GetValidatorPeers(), msgToSend, nil) + if utils.UseLibP2P { + dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) + } else { + host.BroadcastMessageFromLeader(dRand.host, dRand.GetValidatorPeers(), msgToSend, nil) + } } // ProcessMessageLeader dispatches messages for the leader to corresponding processors. @@ -86,7 +91,7 @@ func (dRand *DRand) processCommitMessage(message drand_proto.Message) { // Verify message signature err := verifyMessageSig(validatorPeer.PubKey, message) if err != nil { - utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) + utils.GetLogInstance().Warn("[DRAND] failed to verify the message signature", "Error", err, "PubKey", validatorPeer.PubKey) return } @@ -99,18 +104,18 @@ func (dRand *DRand) processCommitMessage(message drand_proto.Message) { expectedRand, err := pubKey.ProofToHash(dRand.blockHash[:], proof) if err != nil || !bytes.Equal(expectedRand[:], rand) { - utils.GetLogInstance().Error("Failed to verify the VRF", "error", err, "validatorID", validatorID, "expectedRand", expectedRand, "receivedRand", rand) + utils.GetLogInstance().Error("[DRAND] Failed to verify the VRF", "error", err, "validatorID", validatorID, "expectedRand", expectedRand, "receivedRand", rand) return } - utils.GetLogInstance().Debug("Received new commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys)) + utils.GetLogInstance().Debug("Received new VRF commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys)) (*vrfs)[validatorID] = message.Payload dRand.bitmap.SetKey(validatorPeer.PubKey, true) // Set the bitmap indicating that this validator signed. if len((*vrfs)) >= ((len(dRand.PublicKeys))/3 + 1) { // Construct pRand and initiate consensus on it - utils.GetLogInstance().Debug("Received enough randomness commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys)) + utils.GetLogInstance().Debug("[DRAND] {BINGO} Received enough randomness commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys)) pRnd := [32]byte{} // Bitwise XOR on all the submitted vrfs diff --git a/drand/drand_validator.go b/drand/drand_validator.go index 116533f69..d025ca288 100644 --- a/drand/drand_validator.go +++ b/drand/drand_validator.go @@ -4,6 +4,7 @@ import ( protobuf "github.com/golang/protobuf/proto" drand_proto "github.com/harmony-one/harmony/api/drand" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" ) @@ -34,9 +35,9 @@ func (dRand *DRand) processInitMessage(message drand_proto.Message) { blockHash := message.BlockHash // Verify message signature - err := verifyMessageSig(dRand.leader.PubKey, message) + err := verifyMessageSig(dRand.Leader.PubKey, message) if err != nil { - utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) + utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err, "Leader.PubKey", dRand.Leader.PubKey) return } @@ -48,5 +49,9 @@ func (dRand *DRand) processInitMessage(message drand_proto.Message) { msgToSend := dRand.constructCommitMessage(rand, proof) // Send the commit message back to leader - host.SendMessage(dRand.host, dRand.leader, msgToSend, nil) + if utils.UseLibP2P { + dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) + } else { + host.SendMessage(dRand.host, dRand.Leader, msgToSend, nil) + } } diff --git a/node/node.go b/node/node.go index b7c7f0ea9..571ec7129 100644 --- a/node/node.go +++ b/node/node.go @@ -35,7 +35,6 @@ import ( "github.com/harmony-one/harmony/api/service/networkinfo" randomness_service "github.com/harmony-one/harmony/api/service/randomness" - "github.com/harmony-one/harmony/api/service/staking" "github.com/harmony-one/harmony/api/service/syncing" "github.com/harmony-one/harmony/api/service/syncing/downloader" downloader_pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto" @@ -197,6 +196,10 @@ type Node struct { // Group Message Receiver groupReceiver p2p.GroupReceiver + // Client Message Receiver to handle light client messages + // Beacon leader needs to use this receiver to talk to new node + clientReceiver p2p.GroupReceiver + // Duplicated Ping Message Received duplicatedPing map[string]bool @@ -205,6 +208,9 @@ type Node struct { // My GroupID MyShardGroupID p2p.GroupID + + // My ShardClient GroupID + MyClientGroupID p2p.GroupID } // Blockchain returns the blockchain from node @@ -229,7 +235,7 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions { utils.GetLogInstance().Debug("Invalid transactions discarded", "number", len(invalid)) node.pendingTransactions = unselected - utils.GetLogInstance().Debug("Remaining pending transactions", "number", len(node.pendingTransactions)) + utils.GetLogInstance().Debug("Remaining pending transactions", "number", len(node.pendingTransactions), "selected", len(selected)) node.pendingTxMutex.Unlock() return selected } @@ -313,6 +319,7 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { if consensus != nil && consensus.IsLeader { node.State = NodeLeader + go node.ReceiveClientGroupMessage() } else { node.State = NodeInit } @@ -329,7 +336,11 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { node.duplicatedPing = make(map[string]bool) - node.startConsensus = make(chan struct{}) + if utils.UseLibP2P { + node.startConsensus = make(chan struct{}) + } else { + node.startConsensus = nil + } return &node } @@ -716,25 +727,45 @@ func decodeFuncSign(data []byte) string { return funcSign } -func (node *Node) setupForShardLeader() { +func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) { chanPeer := make(chan p2p.Peer) nodeConfig := service.NodeConfig{ IsBeacon: false, - IsClient: false, + IsClient: true, Beacon: p2p.GroupIDBeacon, Group: p2p.GroupIDUnknown, Actions: make(map[p2p.GroupID]p2p.ActionType), } - nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart + nodeConfig.Actions[p2p.GroupIDBeaconClient] = p2p.ActionStart var err error - node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) + node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient) if err != nil { utils.GetLogInstance().Error("create group receiver error", "msg", err) - return } + return nodeConfig, chanPeer +} + +func (node *Node) initBeaconNodeConfiguration() (service.NodeConfig, chan p2p.Peer) { + nodeConfig, chanPeer := node.initNodeConfiguration() + nodeConfig.IsBeacon = true + + var err error + // All beacon chain node will subscribe to BeaconClient topic + node.clientReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient) + if err != nil { + utils.GetLogInstance().Error("create client receiver error", "msg", err) + } + node.MyClientGroupID = p2p.GroupIDBeaconClient + + return nodeConfig, chanPeer +} + +func (node *Node) setupForShardLeader() { + nodeConfig, chanPeer := node.initNodeConfiguration() + // Register peer discovery service. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) // Register networkinfo service. "0" is the beacon shard ID @@ -753,47 +784,16 @@ func (node *Node) setupForShardLeader() { } func (node *Node) setupForShardValidator() { - chanPeer := make(chan p2p.Peer) - nodeConfig := service.NodeConfig{ - IsBeacon: false, - IsClient: false, - Beacon: p2p.GroupIDBeacon, - Group: p2p.GroupIDUnknown, - Actions: make(map[p2p.GroupID]p2p.ActionType), - } - nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart - - var err error - node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) - if err != nil { - utils.GetLogInstance().Error("create group receiver error", "msg", err) - return - } + nodeConfig, chanPeer := node.initNodeConfiguration() // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) - } func (node *Node) setupForBeaconLeader() { - chanPeer := make(chan p2p.Peer) - nodeConfig := service.NodeConfig{ - IsBeacon: true, - IsClient: false, - Beacon: p2p.GroupIDBeacon, - Group: p2p.GroupIDUnknown, - Actions: make(map[p2p.GroupID]p2p.ActionType), - } - nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart - - var err error - node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) - if err != nil { - utils.GetLogInstance().Error("create group receiver error", "msg", err) - return - } + nodeConfig, chanPeer := node.initBeaconNodeConfiguration() // Register peer discovery service. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) @@ -808,26 +808,10 @@ func (node *Node) setupForBeaconLeader() { node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) // Register randomness service node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand)) - } func (node *Node) setupForBeaconValidator() { - chanPeer := make(chan p2p.Peer) - nodeConfig := service.NodeConfig{ - IsBeacon: true, - IsClient: false, - Beacon: p2p.GroupIDBeacon, - Group: p2p.GroupIDUnknown, - Actions: make(map[p2p.GroupID]p2p.ActionType), - } - nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart - - var err error - node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) - if err != nil { - utils.GetLogInstance().Error("create group receiver error", "msg", err) - return - } + nodeConfig, chanPeer := node.initBeaconNodeConfiguration() // Register peer discovery service. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) @@ -838,28 +822,12 @@ func (node *Node) setupForBeaconValidator() { } func (node *Node) setupForNewNode() { - chanPeer := make(chan p2p.Peer) - stakingPeer := make(chan p2p.Peer) - - nodeConfig := service.NodeConfig{ - IsBeacon: false, - IsClient: false, - Beacon: p2p.GroupIDBeacon, - Group: p2p.GroupIDUnknown, - Actions: make(map[p2p.GroupID]p2p.ActionType), - } - nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart - - var err error - node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) - if err != nil { - utils.GetLogInstance().Error("create group receiver error", "msg", err) - return - } + nodeConfig, chanPeer := node.initNodeConfiguration() // Register staking service. - node.serviceManager.RegisterService(service_manager.Staking, staking.New(node.AccountKey, 0, stakingPeer)) - // Register peer discovery service. "0" is the beacon shard ID + // node.serviceManager.RegisterService(service_manager.Staking, staking.New(node.AccountKey, 0, stakingPeer)) + // TODO: (leo) no need to start discovery service for new node until we received the sharding info + // Register peer discovery service. node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) diff --git a/node/node.md b/node/node.md index 2a00f1273..34a71cf2d 100644 --- a/node/node.md +++ b/node/node.md @@ -43,3 +43,17 @@ type Action struct { ### Resharding Service Manager is very handy to transform a node role from validator to leader or anything else. All we need to do is to stop all current services and start all services of the new role. + +### LibP2P Integration + +We have enabled libp2p based gossiping using pubsub. Nodes no longer send messages to individual nodes. +All message communication is via SendMessageToGroups function. + +* Beacon chain nodes need to subscribe to TWO topics +** one is beacon chain topic itself: GroupIDBeacon +** another one is beacon client topic: GroupIDBeaconClient. Only Beacon Chain leader needs to send to this topic. + +* Every new node other than beacon chain nodes needs to subscribe to THREE topic. This also include txgen program. +** one is beacon chain client topic => It is used to send staking transaction, and receive beacon chain blocks to determine the sharding info and randomness +** one is shard consensus itself => It is used for within shard consensus, pingpong messages +** one is client of the shard => It is used to receive tx from client, and send block back to client like txgen. Only shard Leader needs to send to this topic. diff --git a/node/node_handler.go b/node/node_handler.go index 19a155a74..ae09f6134 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -69,6 +69,26 @@ func (node *Node) ReceiveGroupMessage() { } } +// ReceiveClientGroupMessage use libp2p pubsub mechanism to receive broadcast messages for client +func (node *Node) ReceiveClientGroupMessage() { + ctx := context.Background() + for { + if node.clientReceiver == nil { + // check less frequent on client messages + time.Sleep(1000 * time.Millisecond) + continue + } + msg, sender, err := node.clientReceiver.Receive(ctx) + if sender != node.host.GetID() { + utils.GetLogInstance().Info("[CLIENT]", "received group msg", len(msg), "sender", sender) + if err == nil { + // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size + node.messageHandler(msg[5:], string(sender)) + } + } + } +} + // messageHandler parses the message and dispatch the actions func (node *Node) messageHandler(content []byte, sender string) { // node.MaybeBroadcastAsValidator(content) @@ -231,7 +251,7 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) { if node.ClientPeer != nil { utils.GetLogInstance().Debug("Sending new block to client", "client", node.ClientPeer) if utils.UseLibP2P { - node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) + node.host.SendMessageToGroups([]p2p.GroupID{node.MyClientGroupID}, host.ConstructP2pMessage(byte(0), proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))) } else { node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) } @@ -389,8 +409,7 @@ func (node *Node) SendPongMessage() { if !sentMessage { pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.Leader.PubKey) buffer := pong.ConstructPongMessage() - content := host.ConstructP2pMessage(byte(0), buffer) - err := node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, content) + err := node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, host.ConstructP2pMessage(byte(0), buffer)) if err != nil { utils.GetLogInstance().Error("[PONG] failed to send pong message", "group", node.MyShardGroupID) continue @@ -441,7 +460,12 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { node.Consensus.Leader.PubKey = &bls.PublicKey{} err = node.Consensus.Leader.PubKey.Deserialize(pong.LeaderPubKey) if err != nil { - utils.GetLogInstance().Error("Unmarshal Leader PubKey Failed", "error", err) + utils.GetLogInstance().Error("Unmarshal Consensus Leader PubKey Failed", "error", err) + } + node.DRand.Leader.PubKey = &bls.PublicKey{} + err = node.DRand.Leader.PubKey.Deserialize(pong.LeaderPubKey) + if err != nil { + utils.GetLogInstance().Error("Unmarshal DRand Leader PubKey Failed", "error", err) } // Reset Validator PublicKeys every time we receive PONG message from Leader diff --git a/p2p/group.go b/p2p/group.go index 68304969d..43aa473dd 100644 --- a/p2p/group.go +++ b/p2p/group.go @@ -24,9 +24,10 @@ func (id GroupID) String() string { // Const of group ID const ( - GroupIDBeacon GroupID = "harmony/0.0.1/beacon" - GroupIDGlobal GroupID = "harmony/0.0.1/global" - GroupIDUnknown GroupID = "B1acKh0lE" + GroupIDBeacon GroupID = "harmony/0.0.1/beacon" + GroupIDBeaconClient GroupID = "harmony/0.0.1/beacon/client" + GroupIDGlobal GroupID = "harmony/0.0.1/global" + GroupIDUnknown GroupID = "B1acKh0lE" ) // ActionType lists action on group