diff --git a/api/service/config.go b/api/service/config.go index 95fedae15..cec7ef44e 100644 --- a/api/service/config.go +++ b/api/service/config.go @@ -2,7 +2,6 @@ package service import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - "github.com/harmony-one/harmony/p2p" ) // NodeConfig defines a structure of node configuration @@ -11,37 +10,37 @@ import ( // cyclic imports type NodeConfig struct { // The three groupID design, please refer to https://github.com/harmony-one/harmony/blob/master/node/node.md#libp2p-integration - Beacon p2p.GroupID // the beacon group ID - ShardGroupID p2p.GroupID // the group ID of the shard - Client p2p.GroupID // the client group ID of the shard - IsClient bool // whether this node is a client node, such as wallet/txgen - IsBeacon bool // whether this node is a beacon node or not - ShardID uint32 // shardID of this node - Actions map[p2p.GroupID]p2p.ActionType // actions on the groups - PushgatewayIP string // prometheus pushgateway ip - PushgatewayPort string // prometheus pushgateway port - MetricsFlag bool // flag to collect metrics or not + Beacon nodeconfig.GroupID // the beacon group ID + ShardGroupID nodeconfig.GroupID // the group ID of the shard + Client nodeconfig.GroupID // the client group ID of the shard + IsClient bool // whether this node is a client node, such as wallet/txgen + IsBeacon bool // whether this node is a beacon node or not + ShardID uint32 // shardID of this node + Actions map[nodeconfig.GroupID]nodeconfig.ActionType // actions on the groups + PushgatewayIP string // prometheus pushgateway ip + PushgatewayPort string // prometheus pushgateway port + MetricsFlag bool // flag to collect metrics or not } // GroupIDShards is a map of ShardGroupID ID // key is the shard ID // value is the corresponding group ID var ( - GroupIDShards map[p2p.ShardID]p2p.GroupID - GroupIDShardClients map[p2p.ShardID]p2p.GroupID + GroupIDShards map[nodeconfig.ShardID]nodeconfig.GroupID + GroupIDShardClients map[nodeconfig.ShardID]nodeconfig.GroupID ) func init() { - GroupIDShards = make(map[p2p.ShardID]p2p.GroupID) - GroupIDShardClients = make(map[p2p.ShardID]p2p.GroupID) + GroupIDShards = make(map[nodeconfig.ShardID]nodeconfig.GroupID) + GroupIDShardClients = make(map[nodeconfig.ShardID]nodeconfig.GroupID) // init beacon chain group IDs - GroupIDShards[0] = p2p.GroupIDBeacon - GroupIDShardClients[0] = p2p.GroupIDBeaconClient + GroupIDShards[0] = nodeconfig.NewGroupIDByShardID(0) + GroupIDShardClients[0] = nodeconfig.NewClientGroupIDByShardID(0) for i := 1; i < nodeconfig.MaxShards; i++ { - sid := p2p.ShardID(i) - GroupIDShards[sid] = p2p.NewGroupIDByShardID(sid) - GroupIDShardClients[sid] = p2p.NewClientGroupIDByShardID(sid) + sid := nodeconfig.ShardID(i) + GroupIDShards[sid] = nodeconfig.NewGroupIDByShardID(sid) + GroupIDShardClients[sid] = nodeconfig.NewClientGroupIDByShardID(sid) } } diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index f0038d22f..30868273b 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -18,9 +18,9 @@ type Service struct { host p2p.Host peerChan chan p2p.Peer stopChan chan struct{} - actionChan chan p2p.GroupAction + actionChan chan nodeconfig.GroupAction config service.NodeConfig - actions map[p2p.GroupID]p2p.ActionType + actions map[nodeconfig.GroupID]nodeconfig.ActionType messageChan chan *msg_pb.Message addBeaconPeerFunc func(*p2p.Peer) bool } @@ -34,9 +34,9 @@ func New(h p2p.Host, config service.NodeConfig, peerChan chan p2p.Peer, addPeer host: h, peerChan: peerChan, stopChan: make(chan struct{}), - actionChan: make(chan p2p.GroupAction), + actionChan: make(chan nodeconfig.GroupAction), config: config, - actions: make(map[p2p.GroupID]p2p.ActionType), + actions: make(map[nodeconfig.GroupID]nodeconfig.ActionType), addBeaconPeerFunc: addPeer, } } @@ -58,7 +58,7 @@ func (s *Service) StopService() { // NotifyService receives notification from service manager func (s *Service) NotifyService(params map[string]interface{}) { data := params["peer"] - action, ok := data.(p2p.GroupAction) + action, ok := data.(nodeconfig.GroupAction) if !ok { utils.Logger().Error().Msg("Wrong data type passed to NotifyService") return @@ -117,18 +117,14 @@ func (s *Service) contactP2pPeers() { } // sentPingMessage sends a ping message to a pubsub topic -func (s *Service) sentPingMessage(g p2p.GroupID, msgBuf []byte) { +func (s *Service) sentPingMessage(g nodeconfig.GroupID, msgBuf []byte) { var err error - if g == p2p.GroupIDBeacon || g == p2p.GroupIDBeaconClient { - err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, msgBuf) - } else { - // The following logical will be used for 2nd stage peer discovery process - // do nothing when the groupID is unknown - if s.config.ShardGroupID == p2p.GroupIDUnknown { - return - } - err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.ShardGroupID}, msgBuf) + // The following logical will be used for 2nd stage peer discovery process + // do nothing when the groupID is unknown + if s.config.ShardGroupID == nodeconfig.GroupIDUnknown { + return } + err = s.host.SendMessageToGroups([]nodeconfig.GroupID{s.config.ShardGroupID}, msgBuf) if err != nil { utils.Logger().Error().Err(err).Str("group", string(g)).Msg("Failed to send ping message") } diff --git a/api/service/manager_test.go b/api/service/manager_test.go index 0e147f78e..cfccdccef 100644 --- a/api/service/manager_test.go +++ b/api/service/manager_test.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - "github.com/harmony-one/harmony/p2p" ) type SupportSyncingTest struct { @@ -102,8 +101,8 @@ func TestStopServices(t *testing.T) { } func TestInit(t *testing.T) { - if GroupIDShards[p2p.ShardID(0)] != p2p.GroupIDBeacon { - t.Errorf("GroupIDShards[0]: %v != GroupIDBeacon: %v", GroupIDShards[p2p.ShardID(0)], p2p.GroupIDBeacon) + if GroupIDShards[nodeconfig.ShardID(0)] != nodeconfig.NewGroupIDByShardID(0) { + t.Errorf("GroupIDShards[0]: %v != GroupIDBeacon: %v", GroupIDShards[nodeconfig.ShardID(0)], nodeconfig.NewGroupIDByShardID(0)) } if len(GroupIDShards) != nodeconfig.MaxShards { t.Errorf("len(GroupIDShards): %v != TotalShards: %v", len(GroupIDShards), nodeconfig.MaxShards) diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index f23212da4..0d87c298f 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" badger "github.com/ipfs/go-ds-badger" @@ -25,7 +26,7 @@ import ( // Service is the network info service. type Service struct { Host p2p.Host - Rendezvous p2p.GroupID + Rendezvous nodeconfig.GroupID bootnodes utils.AddrList dht *libp2pdht.IpfsDHT cancel context.CancelFunc @@ -61,7 +62,7 @@ const ( // New returns role conversion service. If dataStorePath is not empty, it // points to a persistent database directory to use. func New( - h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer, + h p2p.Host, rendezvous nodeconfig.GroupID, peerChan chan p2p.Peer, bootnodes utils.AddrList, dataStorePath string, ) (*Service, error) { var cancel context.CancelFunc @@ -100,7 +101,7 @@ func New( // MustNew is a panic-on-error version of New. func MustNew( - h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer, + h p2p.Host, rendezvous nodeconfig.GroupID, peerChan chan p2p.Peer, bootnodes utils.AddrList, dataStorePath string, ) *Service { service, err := New(h, rendezvous, peerChan, bootnodes, dataStorePath) @@ -167,7 +168,10 @@ func (s *Service) Init() error { utils.Logger().Info().Str("Rendezvous", string(s.Rendezvous)).Msg("Announcing ourselves...") s.discovery = libp2pdis.NewRoutingDiscovery(s.dht) libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous)) - libp2pdis.Advertise(ctx, s.discovery, string(p2p.GroupIDBeaconClient)) + + // Everyone is beacon client, which means everyone is connected via beacon client topic + // 0 is beacon chain FIXME: use a constant + libp2pdis.Advertise(ctx, s.discovery, string(nodeconfig.NewClientGroupIDByShardID(0))) utils.Logger().Info().Msg("Successfully announced!") return nil @@ -193,7 +197,8 @@ func (s *Service) DoService() { return case <-tick.C: libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous)) - libp2pdis.Advertise(ctx, s.discovery, string(p2p.GroupIDBeaconClient)) + // 0 is beacon chain FIXME: use a constant + libp2pdis.Advertise(ctx, s.discovery, string(nodeconfig.NewClientGroupIDByShardID(0))) utils.Logger().Info().Str("Rendezvous", string(s.Rendezvous)).Msg("Successfully announced!") default: var err error diff --git a/api/service/networkinfo/service_test.go b/api/service/networkinfo/service_test.go index 27fa00bc0..b2eec2d2a 100644 --- a/api/service/networkinfo/service_test.go +++ b/api/service/networkinfo/service_test.go @@ -6,6 +6,7 @@ import ( "github.com/harmony-one/harmony/crypto/bls" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/p2pimpl" @@ -28,7 +29,7 @@ func TestService(t *testing.T) { t.Fatal("unable to new host in harmony") } - s, err := New(host, p2p.GroupIDBeaconClient, nil, nil, "") + s, err := New(host, nodeconfig.GroupIDBeaconClient, nil, nil, "") if err != nil { t.Fatalf("New() failed: %s", err) } diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index 820112f7c..beaf2d97a 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -119,9 +119,9 @@ func setUpTXGen() *node.Node { } txGen.NodeConfig.SetRole(nodeconfig.ClientNode) if shardID == 0 { - txGen.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon) + txGen.NodeConfig.SetShardGroupID(nodeconfig.GroupIDBeacon) } else { - txGen.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(shardID))) + txGen.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(shardID))) } txGen.NodeConfig.SetIsClient(true) @@ -289,10 +289,10 @@ func SendTxsToShard(clientNode *node.Node, txs types.Transactions, shardID uint3 msg := proto_node.ConstructTransactionListMessageAccount(txs) var err error if shardID == 0 { - err = clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) } else { - clientGroup := p2p.NewClientGroupIDByShardID(p2p.ShardID(shardID)) - err = clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg)) + clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID)) + err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg)) } if err != nil { utils.Logger().Debug(). diff --git a/cmd/client/wallet/main.go b/cmd/client/wallet/main.go index 30dd4194d..c5640b207 100644 --- a/cmd/client/wallet/main.go +++ b/cmd/client/wallet/main.go @@ -935,9 +935,9 @@ func clearKeystore() { // submitTransaction submits the transaction to the Harmony network func submitTransaction(tx *types.Transaction, walletNode *node.Node, shardID uint32) error { msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx}) - clientGroup := p2p.NewClientGroupIDByShardID(p2p.ShardID(shardID)) + clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID)) - err := walletNode.GetHost().SendMessageToGroups([]p2p.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg)) + err := walletNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg)) if err != nil { fmt.Printf("Error in SubmitTransaction: %v\n", err) return err diff --git a/cmd/client/wallet_stress_test/main.go b/cmd/client/wallet_stress_test/main.go index e92268417..6b1574db8 100644 --- a/cmd/client/wallet_stress_test/main.go +++ b/cmd/client/wallet_stress_test/main.go @@ -457,9 +457,9 @@ func clearKeystore() { // submitTransaction submits the transaction to the Harmony network func submitTransaction(tx *types.Transaction, walletNode *node.Node, shardID uint32) error { msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx}) - clientGroup := p2p.NewClientGroupIDByShardID(p2p.ShardID(shardID)) + clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID)) - err := walletNode.GetHost().SendMessageToGroups([]p2p.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg)) + err := walletNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg)) if err != nil { fmt.Printf("Error in SubmitTransaction: %v\n", err) return err diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index b85f0d870..32d924837 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -35,6 +35,7 @@ import ( "github.com/harmony-one/harmony/p2p/p2pimpl" ) +// Version string variables var ( version string builtBy string @@ -42,6 +43,11 @@ var ( commit string ) +// Host +var ( + myHost p2p.Host +) + // InitLDBDatabase initializes a LDBDatabase. isGenesis=true will return the beacon chain database for normal shard nodes func InitLDBDatabase(ip string, port string, freshDB bool, isBeacon bool) (*ethdb.LDBDatabase, error) { var dbFileName string @@ -64,6 +70,7 @@ func printVersion() { os.Exit(0) } +// Flags var ( ip = flag.String("ip", "127.0.0.1", "ip of the node") port = flag.String("port", "9000", "port of the node.") @@ -262,11 +269,12 @@ func createGlobalConfig() *nodeconfig.ConfigType { if err != nil { panic(err) } - nodeConfig.SelfPeer = p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey} - nodeConfig.Host, err = p2pimpl.NewHost(&nodeConfig.SelfPeer, nodeConfig.P2pPriKey) + selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey} + + myHost, err = p2pimpl.NewHost(&selfPeer, nodeConfig.P2pPriKey) if *logConn && nodeConfig.GetNetworkType() != nodeconfig.Mainnet { - nodeConfig.Host.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogInstance())) + myHost.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogInstance())) } if err != nil { panic("unable to new host in harmony") @@ -281,7 +289,7 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { // Consensus object. // TODO: consensus object shouldn't start here // TODO(minhdoan): During refactoring, found out that the peers list is actually empty. Need to clean up the logic of consensus later. - currentConsensus, err := consensus.New(nodeConfig.Host, nodeConfig.ShardID, nodeConfig.Leader, nodeConfig.ConsensusPriKey) + currentConsensus, err := consensus.New(myHost, nodeConfig.ShardID, p2p.Peer{}, nodeConfig.ConsensusPriKey) currentConsensus.SelfAddress = common.ParseAddr(initialAccount.Address) if err != nil { @@ -302,7 +310,7 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { // Current node. chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir} - currentNode := node.New(nodeConfig.Host, currentConsensus, chainDBFactory, *isArchival) + currentNode := node.New(myHost, currentConsensus, chainDBFactory, *isArchival) switch { case *networkType == nodeconfig.Localnet: @@ -338,21 +346,21 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { currentNode.NodeConfig.SetPushgatewayPort(nodeConfig.PushgatewayPort) currentNode.NodeConfig.SetMetricsFlag(nodeConfig.MetricsFlag) - currentNode.NodeConfig.SetBeaconGroupID(p2p.NewGroupIDByShardID(0)) + currentNode.NodeConfig.SetBeaconGroupID(nodeconfig.NewGroupIDByShardID(0)) switch *nodeType { case "explorer": currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode) - currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(*shardID))) - currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(*shardID))) + currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(*shardID))) + currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(*shardID))) case "validator": currentNode.NodeConfig.SetRole(nodeconfig.Validator) if nodeConfig.ShardID == 0 { - currentNode.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon) - currentNode.NodeConfig.SetClientGroupID(p2p.GroupIDBeaconClient) + currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(0)) + currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(0)) } else { - currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID))) - currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID))) + currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID))) + currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID))) } } currentNode.NodeConfig.ConsensusPubKey = nodeConfig.ConsensusPubKey @@ -479,7 +487,7 @@ func main() { Str("BeaconGroupID", nodeConfig.GetBeaconGroupID().String()). Str("ClientGroupID", nodeConfig.GetClientGroupID().String()). Str("Role", currentNode.NodeConfig.Role().String()). - Str("multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty())). + Str("multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, myHost.GetID().Pretty())). Msg(startMsg) if *enableMemProfiling { diff --git a/consensus/consensus_msg_sender.go b/consensus/consensus_msg_sender.go index 9995a96f0..538cf5fc2 100644 --- a/consensus/consensus_msg_sender.go +++ b/consensus/consensus_msg_sender.go @@ -5,6 +5,7 @@ import ( "time" msg_pb "github.com/harmony-one/harmony/api/proto/message" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" ) @@ -28,7 +29,7 @@ type MessageSender struct { // MessageRetry controls the message that can be retried type MessageRetry struct { blockNum uint64 // The block number this message is for - groups []p2p.GroupID + groups []nodeconfig.GroupID p2pMsg []byte msgType msg_pb.MessageType retryCount int @@ -58,7 +59,7 @@ func (sender *MessageSender) Reset(blockNum uint64) { } // SendWithRetry sends message with retry logic. -func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []p2p.GroupID, p2pMsg []byte) error { +func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) error { willRetry := sender.retryTimes != 0 msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0, isActive: willRetry} if willRetry { @@ -71,7 +72,7 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa } // SendWithoutRetry sends message without retry logic. -func (sender *MessageSender) SendWithoutRetry(groups []p2p.GroupID, p2pMsg []byte) error { +func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error { return sender.host.SendMessageToGroups(groups, p2pMsg) } diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 5ebb94736..4d23dd749 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -19,9 +19,9 @@ import ( "github.com/harmony-one/harmony/core/types" vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls" "github.com/harmony-one/harmony/internal/chain" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" ) @@ -134,9 +134,9 @@ func (consensus *Consensus) announce(block *types.Block) { // Construct broadcast p2p message - if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { utils.Logger().Warn(). - Str("groupID", string(p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))). + Str("groupID", string(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)))). Msg("[Announce] Cannot send announce message") } else { utils.Logger().Info(). @@ -284,7 +284,7 @@ func (consensus *Consensus) prepare() { msgToSend := consensus.constructPrepareMessage() // TODO: this will not return immediatey, may block - if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + if err := consensus.msgSender.SendWithoutRetry([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { utils.Logger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") } else { utils.Logger().Info(). @@ -408,7 +408,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { return } - if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_PREPARED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_PREPARED, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { utils.Logger().Warn().Msg("[OnPrepare] Cannot send prepared message") } else { utils.Logger().Debug(). @@ -592,7 +592,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { time.Sleep(consensus.delayCommit) } - if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + if err := consensus.msgSender.SendWithoutRetry([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { utils.Logger().Warn().Msg("[OnPrepared] Cannot send commit message!!") } else { utils.Logger().Info(). @@ -771,7 +771,7 @@ func (consensus *Consensus) finalizeCommits() { } // if leader success finalize the block, send committed message to validators - if err := consensus.msgSender.SendWithRetry(block.NumberU64(), msg_pb.MessageType_COMMITTED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + if err := consensus.msgSender.SendWithRetry(block.NumberU64(), msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { utils.Logger().Warn().Err(err).Msg("[Finalizing] Cannot send committed message") } else { utils.Logger().Info(). diff --git a/consensus/view_change.go b/consensus/view_change.go index 6f5bd533d..0136a6f9d 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -10,8 +10,8 @@ import ( "github.com/harmony-one/bls/ffi/go/bls" msg_pb "github.com/harmony-one/harmony/api/proto/message" bls_cosi "github.com/harmony-one/harmony/crypto/bls" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" ) @@ -187,7 +187,7 @@ func (consensus *Consensus) startViewChange(viewID uint64) { Msg("[startViewChange]") msgToSend := consensus.constructViewChangeMessage() - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.consensusTimeout[timeoutViewChange].SetDuration(duration) consensus.consensusTimeout[timeoutViewChange].Start() @@ -432,7 +432,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { Int("payloadSize", len(consensus.m1Payload)). Hex("M1Payload", consensus.m1Payload). Msg("[onViewChange] Sent NewView Message") - consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_NEWVIEW, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_NEWVIEW, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.viewID = recvMsg.ViewID consensus.ResetViewChangeState() @@ -562,7 +562,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) { msgToSend := consensus.constructCommitMessage(commitPayload) utils.Logger().Info().Msg("onNewView === commit") - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) utils.Logger().Debug(). Str("From", consensus.phase.String()). Str("To", Commit.String()). diff --git a/drand/drand_leader.go b/drand/drand_leader.go index 5cd5c21b9..3ec7c10f4 100644 --- a/drand/drand_leader.go +++ b/drand/drand_leader.go @@ -12,8 +12,8 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/vdf" "github.com/harmony-one/harmony/crypto/vrf/p256" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" ) @@ -81,7 +81,7 @@ func (dRand *DRand) init(epochBlock *types.Block) { Hex("msg", msgToSend). Str("leader.PubKey", dRand.leader.ConsensusPubKey.SerializeToHexStr()). Msg("[DRG] sent init") - dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) + dRand.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) } // ProcessMessageLeader dispatches messages for the leader to corresponding processors. diff --git a/drand/drand_validator.go b/drand/drand_validator.go index 8c8165df5..66bf1ac03 100644 --- a/drand/drand_validator.go +++ b/drand/drand_validator.go @@ -3,8 +3,8 @@ package drand import ( protobuf "github.com/golang/protobuf/proto" msg_pb "github.com/harmony-one/harmony/api/proto/message" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" ) @@ -59,5 +59,5 @@ func (dRand *DRand) processInitMessage(message *msg_pb.Message) { msgToSend := dRand.constructCommitMessage(rand, proof) // Send the commit message back to leader - dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) + dRand.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) } diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index d544e62b5..21f1ab3c1 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -11,8 +11,6 @@ import ( "github.com/harmony-one/bls/ffi/go/bls" p2p_crypto "github.com/libp2p/go-libp2p-crypto" - - "github.com/harmony-one/harmony/p2p" ) // Role defines a role of a node. @@ -67,21 +65,20 @@ var publicRPC bool // enable public RPC access // ConfigType is the structure of all node related configuration variables type ConfigType struct { // The three groupID design, please refer to https://github.com/harmony-one/harmony/blob/master/node/node.md#libp2p-integration - beacon p2p.GroupID // the beacon group ID - group p2p.GroupID // the group ID of the shard (note: for beacon chain node, the beacon and shard group are the same) - client p2p.GroupID // the client group ID of the shard - isClient bool // whether this node is a client node, such as wallet/txgen - isBeacon bool // whether this node is beacon node doing consensus or not - ShardID uint32 // ShardID of this node; TODO ek – reviisit when resharding - role Role // Role of the node - Port string // Port of the node. - IP string // IP of the node. + beacon GroupID // the beacon group ID + group GroupID // the group ID of the shard (note: for beacon chain node, the beacon and shard group are the same) + client GroupID // the client group ID of the shard + isClient bool // whether this node is a client node, such as wallet/txgen + isBeacon bool // whether this node is beacon node doing consensus or not + ShardID uint32 // ShardID of this node; TODO ek – reviisit when resharding + role Role // Role of the node + Port string // Port of the node. + IP string // IP of the node. MetricsFlag bool // collect and upload metrics flag PushgatewayIP string // metrics pushgateway prometheus ip PushgatewayPort string // metrics pushgateway prometheus port StringRole string - Host p2p.Host StakingPriKey *ecdsa.PrivateKey P2pPriKey p2p_crypto.PrivKey ConsensusPriKey *bls.SecretKey @@ -90,9 +87,6 @@ type ConfigType struct { // Database directory DBDir string - SelfPeer p2p.Peer - Leader p2p.Peer - networkType NetworkType } @@ -139,17 +133,17 @@ func (conf *ConfigType) String() string { } // SetBeaconGroupID set the groupID for beacon group -func (conf *ConfigType) SetBeaconGroupID(g p2p.GroupID) { +func (conf *ConfigType) SetBeaconGroupID(g GroupID) { conf.beacon = g } // SetShardGroupID set the groupID for shard group -func (conf *ConfigType) SetShardGroupID(g p2p.GroupID) { +func (conf *ConfigType) SetShardGroupID(g GroupID) { conf.group = g } // SetClientGroupID set the groupID for client group -func (conf *ConfigType) SetClientGroupID(g p2p.GroupID) { +func (conf *ConfigType) SetClientGroupID(g GroupID) { conf.client = g } @@ -199,12 +193,12 @@ func (conf *ConfigType) GetPushgatewayPort() string { } // GetBeaconGroupID returns the groupID for beacon group -func (conf *ConfigType) GetBeaconGroupID() p2p.GroupID { +func (conf *ConfigType) GetBeaconGroupID() GroupID { return conf.beacon } // GetShardGroupID returns the groupID for shard group -func (conf *ConfigType) GetShardGroupID() p2p.GroupID { +func (conf *ConfigType) GetShardGroupID() GroupID { return conf.group } @@ -214,7 +208,7 @@ func (conf *ConfigType) GetShardID() uint32 { } // GetClientGroupID returns the groupID for client group -func (conf *ConfigType) GetClientGroupID() p2p.GroupID { +func (conf *ConfigType) GetClientGroupID() GroupID { return conf.client } diff --git a/internal/configs/node/config_test.go b/internal/configs/node/config_test.go index cf4da0a5b..a3306bbb6 100644 --- a/internal/configs/node/config_test.go +++ b/internal/configs/node/config_test.go @@ -2,8 +2,6 @@ package nodeconfig import ( "testing" - - "github.com/harmony-one/harmony/p2p" ) func TestNodeConfigSingleton(t *testing.T) { @@ -13,14 +11,14 @@ func TestNodeConfigSingleton(t *testing.T) { // get the singleton variable c := GetShardConfig(Global) - c.SetBeaconGroupID(p2p.GroupIDBeacon) + c.SetBeaconGroupID(GroupIDBeacon) d := GetShardConfig(Global) g := d.GetBeaconGroupID() - if g != p2p.GroupIDBeacon { - t.Errorf("GetBeaconGroupID = %v, expected = %v", g, p2p.GroupIDBeacon) + if g != GroupIDBeacon { + t.Errorf("GetBeaconGroupID = %v, expected = %v", g, GroupIDBeacon) } } diff --git a/p2p/group.go b/internal/configs/node/group.go similarity index 55% rename from p2p/group.go rename to internal/configs/node/group.go index 0dd6cde6e..a94f2f7a8 100644 --- a/p2p/group.go +++ b/internal/configs/node/group.go @@ -1,12 +1,8 @@ -package p2p +package nodeconfig import ( - "context" "fmt" - "io" "strconv" - - libp2p_peer "github.com/libp2p/go-libp2p-peer" ) // GroupID is a multicast group ID. @@ -25,32 +21,50 @@ func (id GroupID) String() string { // Const of group ID const ( - GroupIDBeacon GroupID = "harmony/0.0.1/node/beacon" - GroupIDBeaconClient GroupID = "harmony/0.0.1/client/beacon" - GroupIDShardPrefix GroupID = "harmony/0.0.1/node/shard/%s" - GroupIDShardClientPrefix GroupID = "harmony/0.0.1/client/shard/%s" - GroupIDGlobal GroupID = "harmony/0.0.1/node/global" - GroupIDGlobalClient GroupID = "harmony/0.0.1/node/global" - GroupIDUnknown GroupID = "B1acKh0lE" + GroupIDBeacon GroupID = "%s/0.0.1/node/beacon" + GroupIDBeaconClient GroupID = "%s/0.0.1/client/beacon" + GroupIDShardPrefix GroupID = "%s/0.0.1/node/shard/%s" + GroupIDShardClientPrefix GroupID = "%s/0.0.1/client/shard/%s" + GroupIDGlobal GroupID = "%s/0.0.1/node/global" + GroupIDGlobalClient GroupID = "%s/0.0.1/node/global" + GroupIDUnknown GroupID = "%s/B1acKh0lE" ) // ShardID defines the ID of a shard type ShardID uint32 +func getNetworkPrefix(shardID ShardID) (netPre string) { + switch GetShardConfig(uint32(shardID)).GetNetworkType() { + case Mainnet: + netPre = "harmony" + case Testnet: + netPre = "testnet" + case Pangaea: + netPre = "pangaea" + case Devnet: + netPre = "devnet" + case Localnet: + netPre = "local" + default: + netPre = "misc" + } + return +} + // NewGroupIDByShardID returns a new groupID for a shard func NewGroupIDByShardID(shardID ShardID) GroupID { if shardID == 0 { - return GroupIDBeacon + return GroupID(fmt.Sprintf(GroupIDBeacon.String(), getNetworkPrefix(shardID))) } - return GroupID(fmt.Sprintf(GroupIDShardPrefix.String(), strconv.Itoa(int(shardID)))) + return GroupID(fmt.Sprintf(GroupIDShardPrefix.String(), getNetworkPrefix(shardID), strconv.Itoa(int(shardID)))) } // NewClientGroupIDByShardID returns a new groupID for a shard's client func NewClientGroupIDByShardID(shardID ShardID) GroupID { if shardID == 0 { - return GroupIDBeaconClient + return GroupID(fmt.Sprintf(GroupIDBeaconClient.String(), getNetworkPrefix(shardID))) } - return GroupID(fmt.Sprintf(GroupIDShardClientPrefix.String(), strconv.Itoa(int(shardID)))) + return GroupID(fmt.Sprintf(GroupIDShardClientPrefix.String(), getNetworkPrefix(shardID), strconv.Itoa(int(shardID)))) } // ActionType lists action on group @@ -88,12 +102,3 @@ type GroupAction struct { func (g GroupAction) String() string { return fmt.Sprintf("%s/%s", g.Name, g.Action) } - -// GroupReceiver is a multicast group message receiver interface. -type GroupReceiver interface { - // Close closes this receiver. - io.Closer - - // Receive a message. - Receive(ctx context.Context) (msg []byte, sender libp2p_peer.ID, err error) -} diff --git a/p2p/group_test.go b/internal/configs/node/group_test.go similarity index 99% rename from p2p/group_test.go rename to internal/configs/node/group_test.go index 93cb85454..0c975756b 100644 --- a/p2p/group_test.go +++ b/internal/configs/node/group_test.go @@ -1,4 +1,4 @@ -package p2p +package nodeconfig import "testing" diff --git a/node/node.go b/node/node.go index e21749b29..dfec23457 100644 --- a/node/node.go +++ b/node/node.go @@ -246,11 +246,11 @@ func (node *Node) Beaconchain() *core.BlockChain { func (node *Node) tryBroadcast(tx *types.Transaction) { msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx}) - shardGroupID := p2p.NewGroupIDByShardID(p2p.ShardID(tx.ShardID())) + shardGroupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(tx.ShardID())) utils.Logger().Info().Str("shardGroupID", string(shardGroupID)).Msg("tryBroadcast") for attempt := 0; attempt < NumTryBroadCast; attempt++ { - if err := node.host.SendMessageToGroups([]p2p.GroupID{shardGroupID}, p2p_host.ConstructP2pMessage(byte(0), msg)); err != nil && attempt < NumTryBroadCast { + if err := node.host.SendMessageToGroups([]nodeconfig.GroupID{shardGroupID}, p2p_host.ConstructP2pMessage(byte(0), msg)); err != nil && attempt < NumTryBroadCast { utils.Logger().Error().Int("attempt", attempt).Msg("Error when trying to broadcast tx") } else { break @@ -604,15 +604,15 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) { PushgatewayIP: node.NodeConfig.GetPushgatewayIP(), PushgatewayPort: node.NodeConfig.GetPushgatewayPort(), IsClient: node.NodeConfig.IsClient(), - Beacon: p2p.GroupIDBeacon, + Beacon: nodeconfig.NewGroupIDByShardID(0), ShardGroupID: node.NodeConfig.GetShardGroupID(), - Actions: make(map[p2p.GroupID]p2p.ActionType), + Actions: make(map[nodeconfig.GroupID]nodeconfig.ActionType), } if nodeConfig.IsClient { - nodeConfig.Actions[p2p.GroupIDBeaconClient] = p2p.ActionStart + nodeConfig.Actions[nodeconfig.NewClientGroupIDByShardID(0)] = nodeconfig.ActionStart } else { - nodeConfig.Actions[node.NodeConfig.GetShardGroupID()] = p2p.ActionStart + nodeConfig.Actions[node.NodeConfig.GetShardGroupID()] = nodeconfig.ActionStart } var err error @@ -621,7 +621,7 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) { utils.Logger().Error().Err(err).Msg("Failed to create shard receiver") } - node.globalGroupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient) + node.globalGroupReceiver, err = node.host.GroupReceiver(nodeconfig.NewClientGroupIDByShardID(0)) if err != nil { utils.Logger().Error().Err(err).Msg("Failed to create global receiver") } diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 3817f7ffe..d996ab7c7 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "errors" - "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" "github.com/ethereum/go-ethereum/common" @@ -17,6 +16,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" ) @@ -68,8 +68,8 @@ func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig [ } utils.Logger().Info().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof Found") - groupID := p2p.ShardID(toShardID) - go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, block.Header(), commitSig, commitBitmap))) + groupID := nodeconfig.ShardID(toShardID) + go node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, block.Header(), commitSig, commitBitmap))) } // BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request diff --git a/node/node_handler.go b/node/node_handler.go index 6c552b5f0..b87afd9ff 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -259,7 +259,7 @@ func (node *Node) stakingMessageHandler(msgPayload []byte) { // NOTE: For now, just send to the client (basically not broadcasting) // TODO (lc): broadcast the new blocks to new nodes doing state sync func (node *Node) BroadcastNewBlock(newBlock *types.Block) { - groups := []p2p.GroupID{node.NodeConfig.GetClientGroupID()} + groups := []nodeconfig.GroupID{node.NodeConfig.GetClientGroupID()} utils.Logger().Info().Msgf("broadcasting new block %d, group %s", newBlock.NumberU64(), groups[0]) msg := host.ConstructP2pMessage(byte(0), proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) if err := node.host.SendMessageToGroups(groups, msg); err != nil { @@ -303,7 +303,7 @@ func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) { for _, header := range headers { utils.Logger().Debug().Msgf("[BroadcastCrossLinkHeader] Broadcasting %d", header.Number().Uint64()) } - node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(headers))) + node.host.SendMessageToGroups([]nodeconfig.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(headers))) } // VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on diff --git a/node/node_resharding.go b/node/node_resharding.go index 7b0910255..8bab93dcf 100644 --- a/node/node_resharding.go +++ b/node/node_resharding.go @@ -20,9 +20,9 @@ import ( proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/shard" ) @@ -141,7 +141,7 @@ func (node *Node) broadcastEpochShardState(newBlock *types.Block) error { }, ) return node.host.SendMessageToGroups( - []p2p.GroupID{node.NodeConfig.GetClientGroupID()}, + []nodeconfig.GroupID{node.NodeConfig.GetClientGroupID()}, host.ConstructP2pMessage(byte(0), epochShardStateMessage)) } diff --git a/node/service_setup.go b/node/service_setup.go index dd96cd416..1687edaba 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -14,7 +14,6 @@ import ( "github.com/harmony-one/harmony/api/service/networkinfo" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" ) func (node *Node) setupForValidator() { @@ -59,7 +58,7 @@ func (node *Node) setupForNewNode() { func (node *Node) setupForClientNode() { // Register networkinfo service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.MustNew(node.host, p2p.GroupIDBeacon, nil, nil, node.networkInfoDHTPath())) + node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.MustNew(node.host, nodeconfig.NewGroupIDByShardID(0), nil, nil, node.networkInfoDHTPath())) } func (node *Node) setupForExplorerNode() { @@ -109,8 +108,8 @@ func (node *Node) StopServices() { func (node *Node) networkInfoDHTPath() string { return fmt.Sprintf(".dht-%s-%s-c%s", - node.NodeConfig.SelfPeer.IP, - node.NodeConfig.SelfPeer.Port, + node.SelfPeer.IP, + node.SelfPeer.Port, node.chainConfig.ChainID, ) } diff --git a/p2p/host.go b/p2p/host.go index 09aa0d5c8..2ffe7a0bd 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -1,6 +1,7 @@ package p2p import ( + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" libp2p_host "github.com/libp2p/go-libp2p-host" libp2p_peer "github.com/libp2p/go-libp2p-peer" ) @@ -21,11 +22,11 @@ type Host interface { ConnectHostPeer(Peer) // SendMessageToGroups sends a message to one or more multicast groups. - SendMessageToGroups(groups []GroupID, msg []byte) error + SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error // GroupReceiver returns a receiver of messages sent to a multicast group. // Each call creates a new receiver. // If multiple receivers are created for the same group, // a message sent to the group will be delivered to all of the receivers. - GroupReceiver(GroupID) (receiver GroupReceiver, err error) + GroupReceiver(nodeconfig.GroupID) (receiver GroupReceiver, err error) } diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index 9b67bf718..e8e05b7bf 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -9,6 +9,7 @@ import ( "github.com/rs/zerolog" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" @@ -54,7 +55,7 @@ type HostV2 struct { } // SendMessageToGroups sends a message to one or more multicast groups. -func (host *HostV2) SendMessageToGroups(groups []p2p.GroupID, msg []byte) error { +func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error { var error error for _, group := range groups { err := host.pubsub.Publish(string(group), msg) @@ -100,7 +101,7 @@ func (r *GroupReceiverImpl) Receive(ctx context.Context) ( // GroupReceiver returns a receiver of messages sent to a multicast group. // See the GroupReceiver interface for details. -func (host *HostV2) GroupReceiver(group p2p.GroupID) ( +func (host *HostV2) GroupReceiver(group nodeconfig.GroupID) ( receiver p2p.GroupReceiver, err error, ) { sub, err := host.pubsub.Subscribe(string(group)) diff --git a/p2p/host/hostv2/hostv2_test.go b/p2p/host/hostv2/hostv2_test.go index b98203dfa..9f63337f0 100644 --- a/p2p/host/hostv2/hostv2_test.go +++ b/p2p/host/hostv2/hostv2_test.go @@ -11,7 +11,7 @@ import ( libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub" libp2p_pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/harmony-one/harmony/p2p" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" mock "github.com/harmony-one/harmony/p2p/host/hostv2/mock" ) @@ -19,7 +19,7 @@ func TestHostV2_SendMessageToGroups(t *testing.T) { t.Run("Basic", func(t *testing.T) { mc := gomock.NewController(t) defer mc.Finish() - groups := []p2p.GroupID{"ABC", "DEF"} + groups := []nodeconfig.GroupID{"ABC", "DEF"} data := []byte{1, 2, 3} pubsub := mock.NewMockpubsub(mc) gomock.InOrder( @@ -34,7 +34,7 @@ func TestHostV2_SendMessageToGroups(t *testing.T) { t.Run("Error", func(t *testing.T) { mc := gomock.NewController(t) defer mc.Finish() - groups := []p2p.GroupID{"ABC", "DEF"} + groups := []nodeconfig.GroupID{"ABC", "DEF"} data := []byte{1, 2, 3} pubsub := mock.NewMockpubsub(mc) gomock.InOrder( diff --git a/p2p/host/mock/host_mock.go b/p2p/host/mock/host_mock.go index 815e3da82..adc0552a4 100644 --- a/p2p/host/mock/host_mock.go +++ b/p2p/host/mock/host_mock.go @@ -6,6 +6,7 @@ package mock_p2p import ( gomock "github.com/golang/mock/gomock" + node "github.com/harmony-one/harmony/internal/configs/node" p2p "github.com/harmony-one/harmony/p2p" go_libp2p_host "github.com/libp2p/go-libp2p-host" go_libp2p_peer "github.com/libp2p/go-libp2p-peer" @@ -132,7 +133,7 @@ func (mr *MockHostMockRecorder) ConnectHostPeer(arg0 interface{}) *gomock.Call { } // SendMessageToGroups mocks base method -func (m *MockHost) SendMessageToGroups(groups []p2p.GroupID, msg []byte) error { +func (m *MockHost) SendMessageToGroups(groups []node.GroupID, msg []byte) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendMessageToGroups", groups, msg) ret0, _ := ret[0].(error) @@ -146,7 +147,7 @@ func (mr *MockHostMockRecorder) SendMessageToGroups(groups, msg interface{}) *go } // GroupReceiver mocks base method -func (m *MockHost) GroupReceiver(arg0 p2p.GroupID) (p2p.GroupReceiver, error) { +func (m *MockHost) GroupReceiver(arg0 node.GroupID) (p2p.GroupReceiver, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GroupReceiver", arg0) ret0, _ := ret[0].(p2p.GroupReceiver) diff --git a/p2p/p2p.go b/p2p/p2p.go index 33cae41c9..2f362fb5d 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -1,7 +1,9 @@ package p2p import ( + "context" "fmt" + "io" "net" "github.com/harmony-one/bls/ffi/go/bls" @@ -28,3 +30,12 @@ func (p Peer) String() string { } return fmt.Sprintf("BlsPubKey:%s-%s/%s[%d]", BlsPubKey, net.JoinHostPort(p.IP, p.Port), p.PeerID, len(p.Addrs)) } + +// GroupReceiver is a multicast group message receiver interface. +type GroupReceiver interface { + // Close closes this receiver. + io.Closer + + // Receive a message. + Receive(ctx context.Context) (msg []byte, sender libp2p_peer.ID, err error) +}