Merge pull request #467 from LeoHChen/libp2p

Txgen using Libp2p gossip to send TX to shard
pull/466/head
Leo Chen 6 years ago committed by GitHub
commit b443ed0463
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      api/service/discovery/service.go
  2. 2
      api/service/networkinfo/service.go
  3. 74
      cmd/client/txgen/main.go
  4. 11
      consensus/consensus_leader.go
  5. 9
      drand/drand.go
  6. 13
      drand/drand_leader.go
  7. 11
      drand/drand_validator.go
  8. 122
      node/node.go
  9. 14
      node/node.md
  10. 32
      node/node_handler.go
  11. 1
      p2p/group.go

@ -106,7 +106,7 @@ func (s *Service) contactP2pPeers() {
}
if a == p2p.ActionStart || a == p2p.ActionResume || a == p2p.ActionPause {
if g == p2p.GroupIDBeacon {
if g == p2p.GroupIDBeacon || g == p2p.GroupIDBeaconClient {
if s.config.IsBeacon {
// beacon chain node
err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, regMsgBuf)

@ -133,8 +133,10 @@ func (s *Service) DoService() {
}
p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs}
utils.GetLogInstance().Info("Notify peerChan", "peer", p)
if s.peerChan != nil {
s.peerChan <- p
}
}
case <-s.stopChan:
return
}

@ -19,6 +19,7 @@ import (
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl"
peerstore "github.com/libp2p/go-libp2p-peerstore"
multiaddr "github.com/multiformats/go-multiaddr"
@ -70,13 +71,32 @@ func main() {
// Add GOMAXPROCS to achieve max performance.
runtime.GOMAXPROCS(1024)
var bcPeer *p2p.Peer
// Logging setup
utils.SetPortAndIP(*port, *ip)
if len(utils.BootNodes) == 0 {
bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings)
if err != nil {
panic(err)
}
utils.BootNodes = bootNodeAddrs
}
var shardIDLeaderMap map[uint32]p2p.Peer
priKey, _, err := utils.LoadKeyFromFile(*keyFile)
nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile)
if err != nil {
panic(err)
}
peerPriKey, peerPubKey := utils.GenKey(*ip, *port)
if peerPriKey == nil || peerPubKey == nil {
panic(fmt.Errorf("generate key error"))
}
selfPeer := p2p.Peer{IP: *ip, Port: *port, ValidatorID: -1, PubKey: peerPubKey}
if !*libp2pPD {
var bcPeer *p2p.Peer
if *bcAddr != "" {
// Turn the destination into a multiaddr.
maddr, err := multiaddr.NewMultiaddr(*bcAddr)
@ -95,7 +115,7 @@ func main() {
bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort}
}
candidateNode := newnode.New(*ip, *port, priKey)
candidateNode := newnode.New(*ip, *port, nodePriKey)
candidateNode.AddPeer(bcPeer)
candidateNode.ContactBeaconChain(*bcPeer)
selfPeer := candidateNode.GetSelfPeer()
@ -104,6 +124,12 @@ func main() {
shardIDLeaderMap = candidateNode.Leaders
debugPrintShardIDLeaderMap(shardIDLeaderMap)
} else {
// Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard
shardIDLeaderMap = make(map[uint32]p2p.Peer)
shardIDLeaderMap[0] = p2p.Peer{}
utils.UseLibP2P = true
}
// Do cross shard tx if there are more than one shard
setting := txgen.Settings{
@ -124,7 +150,7 @@ func main() {
// Nodes containing blockchain data to mirror the shards' data in the network
nodes := []*node.Node{}
host, err := p2pimpl.NewHost(&selfPeer, priKey)
host, err := p2pimpl.NewHost(&selfPeer, nodePriKey)
if err != nil {
panic("unable to new host in txgen")
}
@ -147,15 +173,15 @@ func main() {
}()
// This func is used to update the client's blockchain when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*types.Block) {
log.Info("[Txgen] Received new block", "block", blocks)
utils.GetLogInstance().Info("[Txgen] Received new block", "block", blocks)
for _, block := range blocks {
for _, node := range nodes {
shardID := block.ShardID()
if node.Consensus.ShardID == shardID {
// Add it to blockchain
log.Info("Current Block", "hash", node.Blockchain().CurrentBlock().Hash().Hex())
log.Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex())
utils.GetLogInstance().Info("Current Block", "hash", node.Blockchain().CurrentBlock().Hash().Hex())
utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex())
node.AddNewBlock(block)
stateMutex.Lock()
node.Worker.UpdateCurrent()
@ -169,24 +195,24 @@ func main() {
}
clientNode.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go clientNode.StartServer()
for _, leader := range shardIDLeaderMap {
log.Debug("Client Join Shard", "leader", leader)
clientNode.GetHost().AddPeer(&leader)
if *libp2pPD {
clientNode.Role = node.NewNode
} else {
clientNode.GetHost().AddPeer(&leader)
utils.GetLogInstance().Debug("Client Join Shard", "leader", leader)
go clientNode.JoinShard(leader)
}
clientNode.State = node.NodeReadyForConsensus
}
if *libp2pPD {
clientNode.ServiceManagerSetup()
clientNode.RunServices()
clientNode.StartServer()
go clientNode.StartServer()
} else {
// Start the client server to listen to leader's message
go clientNode.StartServer()
// wait for 1 seconds for client to send ping message to leader
time.Sleep(time.Second)
clientNode.StopPing <- struct{}{}
@ -194,14 +220,14 @@ func main() {
clientNode.State = node.NodeReadyForConsensus
// Transaction generation process
time.Sleep(2 * time.Second) // wait for nodes to be ready
time.Sleep(5 * time.Second) // wait for nodes to be ready
start := time.Now()
totalTime := float64(*duration)
for {
t := time.Now()
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
utils.GetLogInstance().Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break
}
select {
@ -210,7 +236,7 @@ func main() {
lock := sync.Mutex{}
stateMutex.Lock()
log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0))
utils.GetLogInstance().Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0))
txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting)
lock.Lock()
@ -226,26 +252,34 @@ func main() {
}(shardID, txs)
}
lock.Unlock()
case <-time.After(2 * time.Second):
log.Warn("No new block is received so far")
case <-time.After(10 * time.Second):
utils.GetLogInstance().Warn("No new block is received so far")
}
}
// Send a stop message to stop the nodes at the end
msg := proto_node.ConstructStopMessage()
if utils.UseLibP2P {
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg)
}
time.Sleep(3000 * time.Millisecond)
}
// SendTxsToLeader sends txs to leader account.
func SendTxsToLeader(clientNode *node.Node, leader p2p.Peer, txs types.Transactions) {
log.Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs))
utils.GetLogInstance().Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs))
msg := proto_node.ConstructTransactionListMessageAccount(txs)
if utils.UseLibP2P {
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientNode.SendMessage(leader, msg)
}
}
func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) {
for k, v := range leaderMap {
log.Debug("Leader", "ShardID", k, "Leader", v)
utils.GetLogInstance().Debug("Leader", "ShardID", k, "Leader", v)
}
}

@ -31,13 +31,24 @@ var (
// WaitForNewBlock waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}, startChannel chan struct{}) {
// gensis block is the first block to be processed.
// But we shouldn't start consensus yet, as we need to wait for all validators
// received the leader's pub key which will be propogated via Pong message.
// After we started the first consensus, we will go back to normal case to wait
// for new blocks.
// The signal to start the first consensus right now is the sending of Pong message (SendPongMessage function in node/node_handler.go
// but it can be changed to other conditions later
first := true
go func() {
defer close(stoppedChan)
for {
select {
default:
if first && startChannel != nil {
// got the signal to start consensus
_ = <-startChannel
first = false
}
utils.GetLogInstance().Debug("Waiting for block", "consensus", consensus)
// keep waiting for new blocks

@ -37,7 +37,7 @@ type DRand struct {
validators sync.Map // key is uint16, value is p2p.Peer
// Leader's address
leader p2p.Peer
Leader p2p.Peer
// Public keys of the committee including leader and validators
PublicKeys []*bls.PublicKey
@ -85,7 +85,7 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confi
dRand.IsLeader = false
}
dRand.leader = leader
dRand.Leader = leader
for _, peer := range peers {
dRand.validators.Store(utils.GetUniqueIDFromPeer(peer), peer)
}
@ -101,7 +101,7 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confi
dRand.PublicKeys = allPublicKeys
bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey)
bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.Leader.PubKey)
dRand.bitmap = bitmap
dRand.pRand = nil
@ -145,6 +145,7 @@ func (dRand *DRand) AddPeers(peers []*p2p.Peer) int {
dRand.pubKeyLock.Lock()
dRand.PublicKeys = append(dRand.PublicKeys, peer.PubKey)
dRand.pubKeyLock.Unlock()
utils.GetLogInstance().Debug("[DRAND]", "AddPeers", *peer)
}
count++
}
@ -241,7 +242,7 @@ func (dRand *DRand) getValidatorPeerByID(validatorID uint32) *p2p.Peer {
func (dRand *DRand) ResetState() {
dRand.vrfs = &map[uint32][]byte{}
bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey)
bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.Leader.PubKey)
dRand.bitmap = bitmap
dRand.pRand = nil
dRand.rand = nil

@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/vrf/p256"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
@ -45,8 +46,12 @@ func (dRand *DRand) init(epochBlock *types.Block) {
(*dRand.vrfs)[dRand.nodeID] = append(rand[:], proof...)
if utils.UseLibP2P {
dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
host.BroadcastMessageFromLeader(dRand.host, dRand.GetValidatorPeers(), msgToSend, nil)
}
}
// ProcessMessageLeader dispatches messages for the leader to corresponding processors.
func (dRand *DRand) ProcessMessageLeader(payload []byte) {
@ -86,7 +91,7 @@ func (dRand *DRand) processCommitMessage(message drand_proto.Message) {
// Verify message signature
err := verifyMessageSig(validatorPeer.PubKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err)
utils.GetLogInstance().Warn("[DRAND] failed to verify the message signature", "Error", err, "PubKey", validatorPeer.PubKey)
return
}
@ -99,18 +104,18 @@ func (dRand *DRand) processCommitMessage(message drand_proto.Message) {
expectedRand, err := pubKey.ProofToHash(dRand.blockHash[:], proof)
if err != nil || !bytes.Equal(expectedRand[:], rand) {
utils.GetLogInstance().Error("Failed to verify the VRF", "error", err, "validatorID", validatorID, "expectedRand", expectedRand, "receivedRand", rand)
utils.GetLogInstance().Error("[DRAND] Failed to verify the VRF", "error", err, "validatorID", validatorID, "expectedRand", expectedRand, "receivedRand", rand)
return
}
utils.GetLogInstance().Debug("Received new commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys))
utils.GetLogInstance().Debug("Received new VRF commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys))
(*vrfs)[validatorID] = message.Payload
dRand.bitmap.SetKey(validatorPeer.PubKey, true) // Set the bitmap indicating that this validator signed.
if len((*vrfs)) >= ((len(dRand.PublicKeys))/3 + 1) {
// Construct pRand and initiate consensus on it
utils.GetLogInstance().Debug("Received enough randomness commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys))
utils.GetLogInstance().Debug("[DRAND] {BINGO} Received enough randomness commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys))
pRnd := [32]byte{}
// Bitwise XOR on all the submitted vrfs

@ -4,6 +4,7 @@ import (
protobuf "github.com/golang/protobuf/proto"
drand_proto "github.com/harmony-one/harmony/api/drand"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
@ -34,9 +35,9 @@ func (dRand *DRand) processInitMessage(message drand_proto.Message) {
blockHash := message.BlockHash
// Verify message signature
err := verifyMessageSig(dRand.leader.PubKey, message)
err := verifyMessageSig(dRand.Leader.PubKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err)
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err, "Leader.PubKey", dRand.Leader.PubKey)
return
}
@ -48,5 +49,9 @@ func (dRand *DRand) processInitMessage(message drand_proto.Message) {
msgToSend := dRand.constructCommitMessage(rand, proof)
// Send the commit message back to leader
host.SendMessage(dRand.host, dRand.leader, msgToSend, nil)
if utils.UseLibP2P {
dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
host.SendMessage(dRand.host, dRand.Leader, msgToSend, nil)
}
}

@ -35,7 +35,6 @@ import (
"github.com/harmony-one/harmony/api/service/networkinfo"
randomness_service "github.com/harmony-one/harmony/api/service/randomness"
"github.com/harmony-one/harmony/api/service/staking"
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/api/service/syncing/downloader"
downloader_pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto"
@ -197,6 +196,10 @@ type Node struct {
// Group Message Receiver
groupReceiver p2p.GroupReceiver
// Client Message Receiver to handle light client messages
// Beacon leader needs to use this receiver to talk to new node
clientReceiver p2p.GroupReceiver
// Duplicated Ping Message Received
duplicatedPing map[string]bool
@ -205,6 +208,9 @@ type Node struct {
// My GroupID
MyShardGroupID p2p.GroupID
// My ShardClient GroupID
MyClientGroupID p2p.GroupID
}
// Blockchain returns the blockchain from node
@ -229,7 +235,7 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions {
utils.GetLogInstance().Debug("Invalid transactions discarded", "number", len(invalid))
node.pendingTransactions = unselected
utils.GetLogInstance().Debug("Remaining pending transactions", "number", len(node.pendingTransactions))
utils.GetLogInstance().Debug("Remaining pending transactions", "number", len(node.pendingTransactions), "selected", len(selected))
node.pendingTxMutex.Unlock()
return selected
}
@ -313,6 +319,7 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
if consensus != nil && consensus.IsLeader {
node.State = NodeLeader
go node.ReceiveClientGroupMessage()
} else {
node.State = NodeInit
}
@ -329,7 +336,11 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
node.duplicatedPing = make(map[string]bool)
if utils.UseLibP2P {
node.startConsensus = make(chan struct{})
} else {
node.startConsensus = nil
}
return &node
}
@ -716,25 +727,45 @@ func decodeFuncSign(data []byte) string {
return funcSign
}
func (node *Node) setupForShardLeader() {
func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: false,
IsClient: false,
IsClient: true,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
nodeConfig.Actions[p2p.GroupIDBeaconClient] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
return nodeConfig, chanPeer
}
func (node *Node) initBeaconNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
nodeConfig, chanPeer := node.initNodeConfiguration()
nodeConfig.IsBeacon = true
var err error
// All beacon chain node will subscribe to BeaconClient topic
node.clientReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient)
if err != nil {
utils.GetLogInstance().Error("create client receiver error", "msg", err)
}
node.MyClientGroupID = p2p.GroupIDBeaconClient
return nodeConfig, chanPeer
}
func (node *Node) setupForShardLeader() {
nodeConfig, chanPeer := node.initNodeConfiguration()
// Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
// Register networkinfo service. "0" is the beacon shard ID
@ -753,47 +784,16 @@ func (node *Node) setupForShardLeader() {
}
func (node *Node) setupForShardValidator() {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: false,
IsClient: false,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
nodeConfig, chanPeer := node.initNodeConfiguration()
// Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
}
func (node *Node) setupForBeaconLeader() {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: true,
IsClient: false,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
nodeConfig, chanPeer := node.initBeaconNodeConfiguration()
// Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
@ -808,26 +808,10 @@ func (node *Node) setupForBeaconLeader() {
node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port))
// Register randomness service
node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand))
}
func (node *Node) setupForBeaconValidator() {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: true,
IsClient: false,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
nodeConfig, chanPeer := node.initBeaconNodeConfiguration()
// Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
@ -838,28 +822,12 @@ func (node *Node) setupForBeaconValidator() {
}
func (node *Node) setupForNewNode() {
chanPeer := make(chan p2p.Peer)
stakingPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: false,
IsClient: false,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
nodeConfig, chanPeer := node.initNodeConfiguration()
// Register staking service.
node.serviceManager.RegisterService(service_manager.Staking, staking.New(node.AccountKey, 0, stakingPeer))
// Register peer discovery service. "0" is the beacon shard ID
// node.serviceManager.RegisterService(service_manager.Staking, staking.New(node.AccountKey, 0, stakingPeer))
// TODO: (leo) no need to start discovery service for new node until we received the sharding info
// Register peer discovery service.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))

@ -43,3 +43,17 @@ type Action struct {
### Resharding
Service Manager is very handy to transform a node role from validator to leader or anything else. All we need to do is to stop all current services and start all services of the new role.
### LibP2P Integration
We have enabled libp2p based gossiping using pubsub. Nodes no longer send messages to individual nodes.
All message communication is via SendMessageToGroups function.
* Beacon chain nodes need to subscribe to TWO topics
** one is beacon chain topic itself: GroupIDBeacon
** another one is beacon client topic: GroupIDBeaconClient. Only Beacon Chain leader needs to send to this topic.
* Every new node other than beacon chain nodes needs to subscribe to THREE topic. This also include txgen program.
** one is beacon chain client topic => It is used to send staking transaction, and receive beacon chain blocks to determine the sharding info and randomness
** one is shard consensus itself => It is used for within shard consensus, pingpong messages
** one is client of the shard => It is used to receive tx from client, and send block back to client like txgen. Only shard Leader needs to send to this topic.

@ -69,6 +69,26 @@ func (node *Node) ReceiveGroupMessage() {
}
}
// ReceiveClientGroupMessage use libp2p pubsub mechanism to receive broadcast messages for client
func (node *Node) ReceiveClientGroupMessage() {
ctx := context.Background()
for {
if node.clientReceiver == nil {
// check less frequent on client messages
time.Sleep(1000 * time.Millisecond)
continue
}
msg, sender, err := node.clientReceiver.Receive(ctx)
if sender != node.host.GetID() {
utils.GetLogInstance().Info("[CLIENT]", "received group msg", len(msg), "sender", sender)
if err == nil {
// skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size
node.messageHandler(msg[5:], string(sender))
}
}
}
}
// messageHandler parses the message and dispatch the actions
func (node *Node) messageHandler(content []byte, sender string) {
// node.MaybeBroadcastAsValidator(content)
@ -231,7 +251,7 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
if node.ClientPeer != nil {
utils.GetLogInstance().Debug("Sending new block to client", "client", node.ClientPeer)
if utils.UseLibP2P {
node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
node.host.SendMessageToGroups([]p2p.GroupID{node.MyClientGroupID}, host.ConstructP2pMessage(byte(0), proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})))
} else {
node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
}
@ -389,8 +409,7 @@ func (node *Node) SendPongMessage() {
if !sentMessage {
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.Leader.PubKey)
buffer := pong.ConstructPongMessage()
content := host.ConstructP2pMessage(byte(0), buffer)
err := node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, content)
err := node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, host.ConstructP2pMessage(byte(0), buffer))
if err != nil {
utils.GetLogInstance().Error("[PONG] failed to send pong message", "group", node.MyShardGroupID)
continue
@ -441,7 +460,12 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
node.Consensus.Leader.PubKey = &bls.PublicKey{}
err = node.Consensus.Leader.PubKey.Deserialize(pong.LeaderPubKey)
if err != nil {
utils.GetLogInstance().Error("Unmarshal Leader PubKey Failed", "error", err)
utils.GetLogInstance().Error("Unmarshal Consensus Leader PubKey Failed", "error", err)
}
node.DRand.Leader.PubKey = &bls.PublicKey{}
err = node.DRand.Leader.PubKey.Deserialize(pong.LeaderPubKey)
if err != nil {
utils.GetLogInstance().Error("Unmarshal DRand Leader PubKey Failed", "error", err)
}
// Reset Validator PublicKeys every time we receive PONG message from Leader

@ -25,6 +25,7 @@ func (id GroupID) String() string {
// Const of group ID
const (
GroupIDBeacon GroupID = "harmony/0.0.1/beacon"
GroupIDBeaconClient GroupID = "harmony/0.0.1/beacon/client"
GroupIDGlobal GroupID = "harmony/0.0.1/global"
GroupIDUnknown GroupID = "B1acKh0lE"
)

Loading…
Cancel
Save