[p2p] add network type prefix to group id

this is solve the problem of validators in different network connected
with each others.

* mainet is still using the original harmony prefix to keep backward
compatibility

* pangaea uses "pangaea" as network prefix
* testnet uses "testnet" as network prefix

All nodes in Pangaea and Testnet need to restart to re-connect with each
other.  Mainnet nodes have no changes.

Signed-off-by: Leo Chen <leo@harmony.one>
pull/1688/head
Leo Chen 5 years ago
parent b4ffb3f7e7
commit 2240b8d48f
  1. 39
      api/service/config.go
  2. 26
      api/service/discovery/service.go
  3. 5
      api/service/manager_test.go
  4. 15
      api/service/networkinfo/service.go
  5. 3
      api/service/networkinfo/service_test.go
  6. 10
      cmd/client/txgen/main.go
  7. 4
      cmd/client/wallet/main.go
  8. 4
      cmd/client/wallet_stress_test/main.go
  9. 34
      cmd/harmony/main.go
  10. 7
      consensus/consensus_msg_sender.go
  11. 14
      consensus/consensus_v2.go
  12. 8
      consensus/view_change.go
  13. 4
      drand/drand_leader.go
  14. 4
      drand/drand_validator.go
  15. 36
      internal/configs/node/config.go
  16. 8
      internal/configs/node/config_test.go
  17. 55
      internal/configs/node/group.go
  18. 2
      internal/configs/node/group_test.go
  19. 14
      node/node.go
  20. 6
      node/node_cross_shard.go
  21. 4
      node/node_handler.go
  22. 4
      node/node_resharding.go
  23. 7
      node/service_setup.go
  24. 5
      p2p/host.go
  25. 5
      p2p/host/hostv2/hostv2.go
  26. 6
      p2p/host/hostv2/hostv2_test.go
  27. 5
      p2p/host/mock/host_mock.go
  28. 11
      p2p/p2p.go

@ -2,7 +2,6 @@ package service
import (
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/p2p"
)
// NodeConfig defines a structure of node configuration
@ -11,37 +10,37 @@ import (
// cyclic imports
type NodeConfig struct {
// The three groupID design, please refer to https://github.com/harmony-one/harmony/blob/master/node/node.md#libp2p-integration
Beacon p2p.GroupID // the beacon group ID
ShardGroupID p2p.GroupID // the group ID of the shard
Client p2p.GroupID // the client group ID of the shard
IsClient bool // whether this node is a client node, such as wallet/txgen
IsBeacon bool // whether this node is a beacon node or not
ShardID uint32 // shardID of this node
Actions map[p2p.GroupID]p2p.ActionType // actions on the groups
PushgatewayIP string // prometheus pushgateway ip
PushgatewayPort string // prometheus pushgateway port
MetricsFlag bool // flag to collect metrics or not
Beacon nodeconfig.GroupID // the beacon group ID
ShardGroupID nodeconfig.GroupID // the group ID of the shard
Client nodeconfig.GroupID // the client group ID of the shard
IsClient bool // whether this node is a client node, such as wallet/txgen
IsBeacon bool // whether this node is a beacon node or not
ShardID uint32 // shardID of this node
Actions map[nodeconfig.GroupID]nodeconfig.ActionType // actions on the groups
PushgatewayIP string // prometheus pushgateway ip
PushgatewayPort string // prometheus pushgateway port
MetricsFlag bool // flag to collect metrics or not
}
// GroupIDShards is a map of ShardGroupID ID
// key is the shard ID
// value is the corresponding group ID
var (
GroupIDShards map[p2p.ShardID]p2p.GroupID
GroupIDShardClients map[p2p.ShardID]p2p.GroupID
GroupIDShards map[nodeconfig.ShardID]nodeconfig.GroupID
GroupIDShardClients map[nodeconfig.ShardID]nodeconfig.GroupID
)
func init() {
GroupIDShards = make(map[p2p.ShardID]p2p.GroupID)
GroupIDShardClients = make(map[p2p.ShardID]p2p.GroupID)
GroupIDShards = make(map[nodeconfig.ShardID]nodeconfig.GroupID)
GroupIDShardClients = make(map[nodeconfig.ShardID]nodeconfig.GroupID)
// init beacon chain group IDs
GroupIDShards[0] = p2p.GroupIDBeacon
GroupIDShardClients[0] = p2p.GroupIDBeaconClient
GroupIDShards[0] = nodeconfig.NewGroupIDByShardID(0)
GroupIDShardClients[0] = nodeconfig.NewClientGroupIDByShardID(0)
for i := 1; i < nodeconfig.MaxShards; i++ {
sid := p2p.ShardID(i)
GroupIDShards[sid] = p2p.NewGroupIDByShardID(sid)
GroupIDShardClients[sid] = p2p.NewClientGroupIDByShardID(sid)
sid := nodeconfig.ShardID(i)
GroupIDShards[sid] = nodeconfig.NewGroupIDByShardID(sid)
GroupIDShardClients[sid] = nodeconfig.NewClientGroupIDByShardID(sid)
}
}

@ -18,9 +18,9 @@ type Service struct {
host p2p.Host
peerChan chan p2p.Peer
stopChan chan struct{}
actionChan chan p2p.GroupAction
actionChan chan nodeconfig.GroupAction
config service.NodeConfig
actions map[p2p.GroupID]p2p.ActionType
actions map[nodeconfig.GroupID]nodeconfig.ActionType
messageChan chan *msg_pb.Message
addBeaconPeerFunc func(*p2p.Peer) bool
}
@ -34,9 +34,9 @@ func New(h p2p.Host, config service.NodeConfig, peerChan chan p2p.Peer, addPeer
host: h,
peerChan: peerChan,
stopChan: make(chan struct{}),
actionChan: make(chan p2p.GroupAction),
actionChan: make(chan nodeconfig.GroupAction),
config: config,
actions: make(map[p2p.GroupID]p2p.ActionType),
actions: make(map[nodeconfig.GroupID]nodeconfig.ActionType),
addBeaconPeerFunc: addPeer,
}
}
@ -58,7 +58,7 @@ func (s *Service) StopService() {
// NotifyService receives notification from service manager
func (s *Service) NotifyService(params map[string]interface{}) {
data := params["peer"]
action, ok := data.(p2p.GroupAction)
action, ok := data.(nodeconfig.GroupAction)
if !ok {
utils.Logger().Error().Msg("Wrong data type passed to NotifyService")
return
@ -117,18 +117,14 @@ func (s *Service) contactP2pPeers() {
}
// sentPingMessage sends a ping message to a pubsub topic
func (s *Service) sentPingMessage(g p2p.GroupID, msgBuf []byte) {
func (s *Service) sentPingMessage(g nodeconfig.GroupID, msgBuf []byte) {
var err error
if g == p2p.GroupIDBeacon || g == p2p.GroupIDBeaconClient {
err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, msgBuf)
} else {
// The following logical will be used for 2nd stage peer discovery process
// do nothing when the groupID is unknown
if s.config.ShardGroupID == p2p.GroupIDUnknown {
return
}
err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.ShardGroupID}, msgBuf)
// The following logical will be used for 2nd stage peer discovery process
// do nothing when the groupID is unknown
if s.config.ShardGroupID == nodeconfig.GroupIDUnknown {
return
}
err = s.host.SendMessageToGroups([]nodeconfig.GroupID{s.config.ShardGroupID}, msgBuf)
if err != nil {
utils.Logger().Error().Err(err).Str("group", string(g)).Msg("Failed to send ping message")
}

@ -8,7 +8,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/p2p"
)
type SupportSyncingTest struct {
@ -102,8 +101,8 @@ func TestStopServices(t *testing.T) {
}
func TestInit(t *testing.T) {
if GroupIDShards[p2p.ShardID(0)] != p2p.GroupIDBeacon {
t.Errorf("GroupIDShards[0]: %v != GroupIDBeacon: %v", GroupIDShards[p2p.ShardID(0)], p2p.GroupIDBeacon)
if GroupIDShards[nodeconfig.ShardID(0)] != nodeconfig.NewGroupIDByShardID(0) {
t.Errorf("GroupIDShards[0]: %v != GroupIDBeacon: %v", GroupIDShards[nodeconfig.ShardID(0)], nodeconfig.NewGroupIDByShardID(0))
}
if len(GroupIDShards) != nodeconfig.MaxShards {
t.Errorf("len(GroupIDShards): %v != TotalShards: %v", len(GroupIDShards), nodeconfig.MaxShards)

@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
badger "github.com/ipfs/go-ds-badger"
@ -25,7 +26,7 @@ import (
// Service is the network info service.
type Service struct {
Host p2p.Host
Rendezvous p2p.GroupID
Rendezvous nodeconfig.GroupID
bootnodes utils.AddrList
dht *libp2pdht.IpfsDHT
cancel context.CancelFunc
@ -61,7 +62,7 @@ const (
// New returns role conversion service. If dataStorePath is not empty, it
// points to a persistent database directory to use.
func New(
h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer,
h p2p.Host, rendezvous nodeconfig.GroupID, peerChan chan p2p.Peer,
bootnodes utils.AddrList, dataStorePath string,
) (*Service, error) {
var cancel context.CancelFunc
@ -100,7 +101,7 @@ func New(
// MustNew is a panic-on-error version of New.
func MustNew(
h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer,
h p2p.Host, rendezvous nodeconfig.GroupID, peerChan chan p2p.Peer,
bootnodes utils.AddrList, dataStorePath string,
) *Service {
service, err := New(h, rendezvous, peerChan, bootnodes, dataStorePath)
@ -167,7 +168,10 @@ func (s *Service) Init() error {
utils.Logger().Info().Str("Rendezvous", string(s.Rendezvous)).Msg("Announcing ourselves...")
s.discovery = libp2pdis.NewRoutingDiscovery(s.dht)
libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous))
libp2pdis.Advertise(ctx, s.discovery, string(p2p.GroupIDBeaconClient))
// Everyone is beacon client, which means everyone is connected via beacon client topic
// 0 is beacon chain FIXME: use a constant
libp2pdis.Advertise(ctx, s.discovery, string(nodeconfig.NewClientGroupIDByShardID(0)))
utils.Logger().Info().Msg("Successfully announced!")
return nil
@ -193,7 +197,8 @@ func (s *Service) DoService() {
return
case <-tick.C:
libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous))
libp2pdis.Advertise(ctx, s.discovery, string(p2p.GroupIDBeaconClient))
// 0 is beacon chain FIXME: use a constant
libp2pdis.Advertise(ctx, s.discovery, string(nodeconfig.NewClientGroupIDByShardID(0)))
utils.Logger().Info().Str("Rendezvous", string(s.Rendezvous)).Msg("Successfully announced!")
default:
var err error

@ -6,6 +6,7 @@ import (
"github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
@ -28,7 +29,7 @@ func TestService(t *testing.T) {
t.Fatal("unable to new host in harmony")
}
s, err := New(host, p2p.GroupIDBeaconClient, nil, nil, "")
s, err := New(host, nodeconfig.GroupIDBeaconClient, nil, nil, "")
if err != nil {
t.Fatalf("New() failed: %s", err)
}

@ -119,9 +119,9 @@ func setUpTXGen() *node.Node {
}
txGen.NodeConfig.SetRole(nodeconfig.ClientNode)
if shardID == 0 {
txGen.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon)
txGen.NodeConfig.SetShardGroupID(nodeconfig.GroupIDBeacon)
} else {
txGen.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(shardID)))
txGen.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(shardID)))
}
txGen.NodeConfig.SetIsClient(true)
@ -289,10 +289,10 @@ func SendTxsToShard(clientNode *node.Node, txs types.Transactions, shardID uint3
msg := proto_node.ConstructTransactionListMessageAccount(txs)
var err error
if shardID == 0 {
err = clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientGroup := p2p.NewClientGroupIDByShardID(p2p.ShardID(shardID))
err = clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID))
err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
}
if err != nil {
utils.Logger().Debug().

@ -935,9 +935,9 @@ func clearKeystore() {
// submitTransaction submits the transaction to the Harmony network
func submitTransaction(tx *types.Transaction, walletNode *node.Node, shardID uint32) error {
msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
clientGroup := p2p.NewClientGroupIDByShardID(p2p.ShardID(shardID))
clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID))
err := walletNode.GetHost().SendMessageToGroups([]p2p.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
err := walletNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
if err != nil {
fmt.Printf("Error in SubmitTransaction: %v\n", err)
return err

@ -457,9 +457,9 @@ func clearKeystore() {
// submitTransaction submits the transaction to the Harmony network
func submitTransaction(tx *types.Transaction, walletNode *node.Node, shardID uint32) error {
msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
clientGroup := p2p.NewClientGroupIDByShardID(p2p.ShardID(shardID))
clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID))
err := walletNode.GetHost().SendMessageToGroups([]p2p.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
err := walletNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
if err != nil {
fmt.Printf("Error in SubmitTransaction: %v\n", err)
return err

@ -35,6 +35,7 @@ import (
"github.com/harmony-one/harmony/p2p/p2pimpl"
)
// Version string variables
var (
version string
builtBy string
@ -42,6 +43,11 @@ var (
commit string
)
// Host
var (
myHost p2p.Host
)
// InitLDBDatabase initializes a LDBDatabase. isGenesis=true will return the beacon chain database for normal shard nodes
func InitLDBDatabase(ip string, port string, freshDB bool, isBeacon bool) (*ethdb.LDBDatabase, error) {
var dbFileName string
@ -64,6 +70,7 @@ func printVersion() {
os.Exit(0)
}
// Flags
var (
ip = flag.String("ip", "127.0.0.1", "ip of the node")
port = flag.String("port", "9000", "port of the node.")
@ -262,11 +269,12 @@ func createGlobalConfig() *nodeconfig.ConfigType {
if err != nil {
panic(err)
}
nodeConfig.SelfPeer = p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey}
nodeConfig.Host, err = p2pimpl.NewHost(&nodeConfig.SelfPeer, nodeConfig.P2pPriKey)
selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey}
myHost, err = p2pimpl.NewHost(&selfPeer, nodeConfig.P2pPriKey)
if *logConn && nodeConfig.GetNetworkType() != nodeconfig.Mainnet {
nodeConfig.Host.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogInstance()))
myHost.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogInstance()))
}
if err != nil {
panic("unable to new host in harmony")
@ -281,7 +289,7 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// Consensus object.
// TODO: consensus object shouldn't start here
// TODO(minhdoan): During refactoring, found out that the peers list is actually empty. Need to clean up the logic of consensus later.
currentConsensus, err := consensus.New(nodeConfig.Host, nodeConfig.ShardID, nodeConfig.Leader, nodeConfig.ConsensusPriKey)
currentConsensus, err := consensus.New(myHost, nodeConfig.ShardID, p2p.Peer{}, nodeConfig.ConsensusPriKey)
currentConsensus.SelfAddress = common.ParseAddr(initialAccount.Address)
if err != nil {
@ -302,7 +310,7 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// Current node.
chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
currentNode := node.New(nodeConfig.Host, currentConsensus, chainDBFactory, *isArchival)
currentNode := node.New(myHost, currentConsensus, chainDBFactory, *isArchival)
switch {
case *networkType == nodeconfig.Localnet:
@ -338,21 +346,21 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
currentNode.NodeConfig.SetPushgatewayPort(nodeConfig.PushgatewayPort)
currentNode.NodeConfig.SetMetricsFlag(nodeConfig.MetricsFlag)
currentNode.NodeConfig.SetBeaconGroupID(p2p.NewGroupIDByShardID(0))
currentNode.NodeConfig.SetBeaconGroupID(nodeconfig.NewGroupIDByShardID(0))
switch *nodeType {
case "explorer":
currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)
currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(*shardID)))
currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(*shardID)))
currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(*shardID)))
currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(*shardID)))
case "validator":
currentNode.NodeConfig.SetRole(nodeconfig.Validator)
if nodeConfig.ShardID == 0 {
currentNode.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon)
currentNode.NodeConfig.SetClientGroupID(p2p.GroupIDBeaconClient)
currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(0))
currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(0))
} else {
currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID)))
currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID)))
currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID)))
currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID)))
}
}
currentNode.NodeConfig.ConsensusPubKey = nodeConfig.ConsensusPubKey
@ -479,7 +487,7 @@ func main() {
Str("BeaconGroupID", nodeConfig.GetBeaconGroupID().String()).
Str("ClientGroupID", nodeConfig.GetClientGroupID().String()).
Str("Role", currentNode.NodeConfig.Role().String()).
Str("multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty())).
Str("multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, myHost.GetID().Pretty())).
Msg(startMsg)
if *enableMemProfiling {

@ -5,6 +5,7 @@ import (
"time"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
)
@ -28,7 +29,7 @@ type MessageSender struct {
// MessageRetry controls the message that can be retried
type MessageRetry struct {
blockNum uint64 // The block number this message is for
groups []p2p.GroupID
groups []nodeconfig.GroupID
p2pMsg []byte
msgType msg_pb.MessageType
retryCount int
@ -58,7 +59,7 @@ func (sender *MessageSender) Reset(blockNum uint64) {
}
// SendWithRetry sends message with retry logic.
func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []p2p.GroupID, p2pMsg []byte) error {
func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) error {
willRetry := sender.retryTimes != 0
msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0, isActive: willRetry}
if willRetry {
@ -71,7 +72,7 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa
}
// SendWithoutRetry sends message without retry logic.
func (sender *MessageSender) SendWithoutRetry(groups []p2p.GroupID, p2pMsg []byte) error {
func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error {
return sender.host.SendMessageToGroups(groups, p2pMsg)
}

@ -19,9 +19,9 @@ import (
"github.com/harmony-one/harmony/core/types"
vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
@ -134,9 +134,9 @@ func (consensus *Consensus) announce(block *types.Block) {
// Construct broadcast p2p message
if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
utils.Logger().Warn().
Str("groupID", string(p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))).
Str("groupID", string(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)))).
Msg("[Announce] Cannot send announce message")
} else {
utils.Logger().Info().
@ -284,7 +284,7 @@ func (consensus *Consensus) prepare() {
msgToSend := consensus.constructPrepareMessage()
// TODO: this will not return immediatey, may block
if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithoutRetry([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
utils.Logger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message")
} else {
utils.Logger().Info().
@ -408,7 +408,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
return
}
if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_PREPARED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_PREPARED, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
utils.Logger().Warn().Msg("[OnPrepare] Cannot send prepared message")
} else {
utils.Logger().Debug().
@ -592,7 +592,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
time.Sleep(consensus.delayCommit)
}
if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithoutRetry([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
utils.Logger().Warn().Msg("[OnPrepared] Cannot send commit message!!")
} else {
utils.Logger().Info().
@ -771,7 +771,7 @@ func (consensus *Consensus) finalizeCommits() {
}
// if leader success finalize the block, send committed message to validators
if err := consensus.msgSender.SendWithRetry(block.NumberU64(), msg_pb.MessageType_COMMITTED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithRetry(block.NumberU64(), msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
utils.Logger().Warn().Err(err).Msg("[Finalizing] Cannot send committed message")
} else {
utils.Logger().Info().

@ -10,8 +10,8 @@ import (
"github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
@ -187,7 +187,7 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
Msg("[startViewChange]")
msgToSend := consensus.constructViewChangeMessage()
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.consensusTimeout[timeoutViewChange].SetDuration(duration)
consensus.consensusTimeout[timeoutViewChange].Start()
@ -432,7 +432,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
Int("payloadSize", len(consensus.m1Payload)).
Hex("M1Payload", consensus.m1Payload).
Msg("[onViewChange] Sent NewView Message")
consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_NEWVIEW, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_NEWVIEW, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.viewID = recvMsg.ViewID
consensus.ResetViewChangeState()
@ -562,7 +562,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
msgToSend := consensus.constructCommitMessage(commitPayload)
utils.Logger().Info().Msg("onNewView === commit")
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
utils.Logger().Debug().
Str("From", consensus.phase.String()).
Str("To", Commit.String()).

@ -12,8 +12,8 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/vdf"
"github.com/harmony-one/harmony/crypto/vrf/p256"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
@ -81,7 +81,7 @@ func (dRand *DRand) init(epochBlock *types.Block) {
Hex("msg", msgToSend).
Str("leader.PubKey", dRand.leader.ConsensusPubKey.SerializeToHexStr()).
Msg("[DRG] sent init")
dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
dRand.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
}
// ProcessMessageLeader dispatches messages for the leader to corresponding processors.

@ -3,8 +3,8 @@ package drand
import (
protobuf "github.com/golang/protobuf/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
@ -59,5 +59,5 @@ func (dRand *DRand) processInitMessage(message *msg_pb.Message) {
msgToSend := dRand.constructCommitMessage(rand, proof)
// Send the commit message back to leader
dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
dRand.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
}

@ -11,8 +11,6 @@ import (
"github.com/harmony-one/bls/ffi/go/bls"
p2p_crypto "github.com/libp2p/go-libp2p-crypto"
"github.com/harmony-one/harmony/p2p"
)
// Role defines a role of a node.
@ -67,21 +65,20 @@ var publicRPC bool // enable public RPC access
// ConfigType is the structure of all node related configuration variables
type ConfigType struct {
// The three groupID design, please refer to https://github.com/harmony-one/harmony/blob/master/node/node.md#libp2p-integration
beacon p2p.GroupID // the beacon group ID
group p2p.GroupID // the group ID of the shard (note: for beacon chain node, the beacon and shard group are the same)
client p2p.GroupID // the client group ID of the shard
isClient bool // whether this node is a client node, such as wallet/txgen
isBeacon bool // whether this node is beacon node doing consensus or not
ShardID uint32 // ShardID of this node; TODO ek – reviisit when resharding
role Role // Role of the node
Port string // Port of the node.
IP string // IP of the node.
beacon GroupID // the beacon group ID
group GroupID // the group ID of the shard (note: for beacon chain node, the beacon and shard group are the same)
client GroupID // the client group ID of the shard
isClient bool // whether this node is a client node, such as wallet/txgen
isBeacon bool // whether this node is beacon node doing consensus or not
ShardID uint32 // ShardID of this node; TODO ek – reviisit when resharding
role Role // Role of the node
Port string // Port of the node.
IP string // IP of the node.
MetricsFlag bool // collect and upload metrics flag
PushgatewayIP string // metrics pushgateway prometheus ip
PushgatewayPort string // metrics pushgateway prometheus port
StringRole string
Host p2p.Host
StakingPriKey *ecdsa.PrivateKey
P2pPriKey p2p_crypto.PrivKey
ConsensusPriKey *bls.SecretKey
@ -90,9 +87,6 @@ type ConfigType struct {
// Database directory
DBDir string
SelfPeer p2p.Peer
Leader p2p.Peer
networkType NetworkType
}
@ -139,17 +133,17 @@ func (conf *ConfigType) String() string {
}
// SetBeaconGroupID set the groupID for beacon group
func (conf *ConfigType) SetBeaconGroupID(g p2p.GroupID) {
func (conf *ConfigType) SetBeaconGroupID(g GroupID) {
conf.beacon = g
}
// SetShardGroupID set the groupID for shard group
func (conf *ConfigType) SetShardGroupID(g p2p.GroupID) {
func (conf *ConfigType) SetShardGroupID(g GroupID) {
conf.group = g
}
// SetClientGroupID set the groupID for client group
func (conf *ConfigType) SetClientGroupID(g p2p.GroupID) {
func (conf *ConfigType) SetClientGroupID(g GroupID) {
conf.client = g
}
@ -199,12 +193,12 @@ func (conf *ConfigType) GetPushgatewayPort() string {
}
// GetBeaconGroupID returns the groupID for beacon group
func (conf *ConfigType) GetBeaconGroupID() p2p.GroupID {
func (conf *ConfigType) GetBeaconGroupID() GroupID {
return conf.beacon
}
// GetShardGroupID returns the groupID for shard group
func (conf *ConfigType) GetShardGroupID() p2p.GroupID {
func (conf *ConfigType) GetShardGroupID() GroupID {
return conf.group
}
@ -214,7 +208,7 @@ func (conf *ConfigType) GetShardID() uint32 {
}
// GetClientGroupID returns the groupID for client group
func (conf *ConfigType) GetClientGroupID() p2p.GroupID {
func (conf *ConfigType) GetClientGroupID() GroupID {
return conf.client
}

@ -2,8 +2,6 @@ package nodeconfig
import (
"testing"
"github.com/harmony-one/harmony/p2p"
)
func TestNodeConfigSingleton(t *testing.T) {
@ -13,14 +11,14 @@ func TestNodeConfigSingleton(t *testing.T) {
// get the singleton variable
c := GetShardConfig(Global)
c.SetBeaconGroupID(p2p.GroupIDBeacon)
c.SetBeaconGroupID(GroupIDBeacon)
d := GetShardConfig(Global)
g := d.GetBeaconGroupID()
if g != p2p.GroupIDBeacon {
t.Errorf("GetBeaconGroupID = %v, expected = %v", g, p2p.GroupIDBeacon)
if g != GroupIDBeacon {
t.Errorf("GetBeaconGroupID = %v, expected = %v", g, GroupIDBeacon)
}
}

@ -1,12 +1,8 @@
package p2p
package nodeconfig
import (
"context"
"fmt"
"io"
"strconv"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
)
// GroupID is a multicast group ID.
@ -25,32 +21,50 @@ func (id GroupID) String() string {
// Const of group ID
const (
GroupIDBeacon GroupID = "harmony/0.0.1/node/beacon"
GroupIDBeaconClient GroupID = "harmony/0.0.1/client/beacon"
GroupIDShardPrefix GroupID = "harmony/0.0.1/node/shard/%s"
GroupIDShardClientPrefix GroupID = "harmony/0.0.1/client/shard/%s"
GroupIDGlobal GroupID = "harmony/0.0.1/node/global"
GroupIDGlobalClient GroupID = "harmony/0.0.1/node/global"
GroupIDUnknown GroupID = "B1acKh0lE"
GroupIDBeacon GroupID = "%s/0.0.1/node/beacon"
GroupIDBeaconClient GroupID = "%s/0.0.1/client/beacon"
GroupIDShardPrefix GroupID = "%s/0.0.1/node/shard/%s"
GroupIDShardClientPrefix GroupID = "%s/0.0.1/client/shard/%s"
GroupIDGlobal GroupID = "%s/0.0.1/node/global"
GroupIDGlobalClient GroupID = "%s/0.0.1/node/global"
GroupIDUnknown GroupID = "%s/B1acKh0lE"
)
// ShardID defines the ID of a shard
type ShardID uint32
func getNetworkPrefix(shardID ShardID) (netPre string) {
switch GetShardConfig(uint32(shardID)).GetNetworkType() {
case Mainnet:
netPre = "harmony"
case Testnet:
netPre = "testnet"
case Pangaea:
netPre = "pangaea"
case Devnet:
netPre = "devnet"
case Localnet:
netPre = "local"
default:
netPre = "misc"
}
return
}
// NewGroupIDByShardID returns a new groupID for a shard
func NewGroupIDByShardID(shardID ShardID) GroupID {
if shardID == 0 {
return GroupIDBeacon
return GroupID(fmt.Sprintf(GroupIDBeacon.String(), getNetworkPrefix(shardID)))
}
return GroupID(fmt.Sprintf(GroupIDShardPrefix.String(), strconv.Itoa(int(shardID))))
return GroupID(fmt.Sprintf(GroupIDShardPrefix.String(), getNetworkPrefix(shardID), strconv.Itoa(int(shardID))))
}
// NewClientGroupIDByShardID returns a new groupID for a shard's client
func NewClientGroupIDByShardID(shardID ShardID) GroupID {
if shardID == 0 {
return GroupIDBeaconClient
return GroupID(fmt.Sprintf(GroupIDBeaconClient.String(), getNetworkPrefix(shardID)))
}
return GroupID(fmt.Sprintf(GroupIDShardClientPrefix.String(), strconv.Itoa(int(shardID))))
return GroupID(fmt.Sprintf(GroupIDShardClientPrefix.String(), getNetworkPrefix(shardID), strconv.Itoa(int(shardID))))
}
// ActionType lists action on group
@ -88,12 +102,3 @@ type GroupAction struct {
func (g GroupAction) String() string {
return fmt.Sprintf("%s/%s", g.Name, g.Action)
}
// GroupReceiver is a multicast group message receiver interface.
type GroupReceiver interface {
// Close closes this receiver.
io.Closer
// Receive a message.
Receive(ctx context.Context) (msg []byte, sender libp2p_peer.ID, err error)
}

@ -1,4 +1,4 @@
package p2p
package nodeconfig
import "testing"

@ -246,11 +246,11 @@ func (node *Node) Beaconchain() *core.BlockChain {
func (node *Node) tryBroadcast(tx *types.Transaction) {
msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
shardGroupID := p2p.NewGroupIDByShardID(p2p.ShardID(tx.ShardID()))
shardGroupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(tx.ShardID()))
utils.Logger().Info().Str("shardGroupID", string(shardGroupID)).Msg("tryBroadcast")
for attempt := 0; attempt < NumTryBroadCast; attempt++ {
if err := node.host.SendMessageToGroups([]p2p.GroupID{shardGroupID}, p2p_host.ConstructP2pMessage(byte(0), msg)); err != nil && attempt < NumTryBroadCast {
if err := node.host.SendMessageToGroups([]nodeconfig.GroupID{shardGroupID}, p2p_host.ConstructP2pMessage(byte(0), msg)); err != nil && attempt < NumTryBroadCast {
utils.Logger().Error().Int("attempt", attempt).Msg("Error when trying to broadcast tx")
} else {
break
@ -604,15 +604,15 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
PushgatewayIP: node.NodeConfig.GetPushgatewayIP(),
PushgatewayPort: node.NodeConfig.GetPushgatewayPort(),
IsClient: node.NodeConfig.IsClient(),
Beacon: p2p.GroupIDBeacon,
Beacon: nodeconfig.NewGroupIDByShardID(0),
ShardGroupID: node.NodeConfig.GetShardGroupID(),
Actions: make(map[p2p.GroupID]p2p.ActionType),
Actions: make(map[nodeconfig.GroupID]nodeconfig.ActionType),
}
if nodeConfig.IsClient {
nodeConfig.Actions[p2p.GroupIDBeaconClient] = p2p.ActionStart
nodeConfig.Actions[nodeconfig.NewClientGroupIDByShardID(0)] = nodeconfig.ActionStart
} else {
nodeConfig.Actions[node.NodeConfig.GetShardGroupID()] = p2p.ActionStart
nodeConfig.Actions[node.NodeConfig.GetShardGroupID()] = nodeconfig.ActionStart
}
var err error
@ -621,7 +621,7 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
utils.Logger().Error().Err(err).Msg("Failed to create shard receiver")
}
node.globalGroupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient)
node.globalGroupReceiver, err = node.host.GroupReceiver(nodeconfig.NewClientGroupIDByShardID(0))
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to create global receiver")
}

@ -4,7 +4,6 @@ import (
"encoding/binary"
"errors"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/ethereum/go-ethereum/common"
@ -17,6 +16,7 @@ import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
)
@ -68,8 +68,8 @@ func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig [
}
utils.Logger().Info().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof Found")
groupID := p2p.ShardID(toShardID)
go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, block.Header(), commitSig, commitBitmap)))
groupID := nodeconfig.ShardID(toShardID)
go node.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, block.Header(), commitSig, commitBitmap)))
}
// BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request

@ -259,7 +259,7 @@ func (node *Node) stakingMessageHandler(msgPayload []byte) {
// NOTE: For now, just send to the client (basically not broadcasting)
// TODO (lc): broadcast the new blocks to new nodes doing state sync
func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
groups := []p2p.GroupID{node.NodeConfig.GetClientGroupID()}
groups := []nodeconfig.GroupID{node.NodeConfig.GetClientGroupID()}
utils.Logger().Info().Msgf("broadcasting new block %d, group %s", newBlock.NumberU64(), groups[0])
msg := host.ConstructP2pMessage(byte(0), proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
if err := node.host.SendMessageToGroups(groups, msg); err != nil {
@ -303,7 +303,7 @@ func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) {
for _, header := range headers {
utils.Logger().Debug().Msgf("[BroadcastCrossLinkHeader] Broadcasting %d", header.Number().Uint64())
}
node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(headers)))
node.host.SendMessageToGroups([]nodeconfig.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(headers)))
}
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on

@ -20,9 +20,9 @@ import (
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/shard"
)
@ -141,7 +141,7 @@ func (node *Node) broadcastEpochShardState(newBlock *types.Block) error {
},
)
return node.host.SendMessageToGroups(
[]p2p.GroupID{node.NodeConfig.GetClientGroupID()},
[]nodeconfig.GroupID{node.NodeConfig.GetClientGroupID()},
host.ConstructP2pMessage(byte(0), epochShardStateMessage))
}

@ -14,7 +14,6 @@ import (
"github.com/harmony-one/harmony/api/service/networkinfo"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
)
func (node *Node) setupForValidator() {
@ -59,7 +58,7 @@ func (node *Node) setupForNewNode() {
func (node *Node) setupForClientNode() {
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.MustNew(node.host, p2p.GroupIDBeacon, nil, nil, node.networkInfoDHTPath()))
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.MustNew(node.host, nodeconfig.NewGroupIDByShardID(0), nil, nil, node.networkInfoDHTPath()))
}
func (node *Node) setupForExplorerNode() {
@ -109,8 +108,8 @@ func (node *Node) StopServices() {
func (node *Node) networkInfoDHTPath() string {
return fmt.Sprintf(".dht-%s-%s-c%s",
node.NodeConfig.SelfPeer.IP,
node.NodeConfig.SelfPeer.Port,
node.SelfPeer.IP,
node.SelfPeer.Port,
node.chainConfig.ChainID,
)
}

@ -1,6 +1,7 @@
package p2p
import (
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
libp2p_host "github.com/libp2p/go-libp2p-host"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
)
@ -21,11 +22,11 @@ type Host interface {
ConnectHostPeer(Peer)
// SendMessageToGroups sends a message to one or more multicast groups.
SendMessageToGroups(groups []GroupID, msg []byte) error
SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error
// GroupReceiver returns a receiver of messages sent to a multicast group.
// Each call creates a new receiver.
// If multiple receivers are created for the same group,
// a message sent to the group will be delivered to all of the receivers.
GroupReceiver(GroupID) (receiver GroupReceiver, err error)
GroupReceiver(nodeconfig.GroupID) (receiver GroupReceiver, err error)
}

@ -9,6 +9,7 @@ import (
"github.com/rs/zerolog"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
@ -54,7 +55,7 @@ type HostV2 struct {
}
// SendMessageToGroups sends a message to one or more multicast groups.
func (host *HostV2) SendMessageToGroups(groups []p2p.GroupID, msg []byte) error {
func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error {
var error error
for _, group := range groups {
err := host.pubsub.Publish(string(group), msg)
@ -100,7 +101,7 @@ func (r *GroupReceiverImpl) Receive(ctx context.Context) (
// GroupReceiver returns a receiver of messages sent to a multicast group.
// See the GroupReceiver interface for details.
func (host *HostV2) GroupReceiver(group p2p.GroupID) (
func (host *HostV2) GroupReceiver(group nodeconfig.GroupID) (
receiver p2p.GroupReceiver, err error,
) {
sub, err := host.pubsub.Subscribe(string(group))

@ -11,7 +11,7 @@ import (
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2p_pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/harmony-one/harmony/p2p"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
mock "github.com/harmony-one/harmony/p2p/host/hostv2/mock"
)
@ -19,7 +19,7 @@ func TestHostV2_SendMessageToGroups(t *testing.T) {
t.Run("Basic", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
groups := []p2p.GroupID{"ABC", "DEF"}
groups := []nodeconfig.GroupID{"ABC", "DEF"}
data := []byte{1, 2, 3}
pubsub := mock.NewMockpubsub(mc)
gomock.InOrder(
@ -34,7 +34,7 @@ func TestHostV2_SendMessageToGroups(t *testing.T) {
t.Run("Error", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
groups := []p2p.GroupID{"ABC", "DEF"}
groups := []nodeconfig.GroupID{"ABC", "DEF"}
data := []byte{1, 2, 3}
pubsub := mock.NewMockpubsub(mc)
gomock.InOrder(

@ -6,6 +6,7 @@ package mock_p2p
import (
gomock "github.com/golang/mock/gomock"
node "github.com/harmony-one/harmony/internal/configs/node"
p2p "github.com/harmony-one/harmony/p2p"
go_libp2p_host "github.com/libp2p/go-libp2p-host"
go_libp2p_peer "github.com/libp2p/go-libp2p-peer"
@ -132,7 +133,7 @@ func (mr *MockHostMockRecorder) ConnectHostPeer(arg0 interface{}) *gomock.Call {
}
// SendMessageToGroups mocks base method
func (m *MockHost) SendMessageToGroups(groups []p2p.GroupID, msg []byte) error {
func (m *MockHost) SendMessageToGroups(groups []node.GroupID, msg []byte) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendMessageToGroups", groups, msg)
ret0, _ := ret[0].(error)
@ -146,7 +147,7 @@ func (mr *MockHostMockRecorder) SendMessageToGroups(groups, msg interface{}) *go
}
// GroupReceiver mocks base method
func (m *MockHost) GroupReceiver(arg0 p2p.GroupID) (p2p.GroupReceiver, error) {
func (m *MockHost) GroupReceiver(arg0 node.GroupID) (p2p.GroupReceiver, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GroupReceiver", arg0)
ret0, _ := ret[0].(p2p.GroupReceiver)

@ -1,7 +1,9 @@
package p2p
import (
"context"
"fmt"
"io"
"net"
"github.com/harmony-one/bls/ffi/go/bls"
@ -28,3 +30,12 @@ func (p Peer) String() string {
}
return fmt.Sprintf("BlsPubKey:%s-%s/%s[%d]", BlsPubKey, net.JoinHostPort(p.IP, p.Port), p.PeerID, len(p.Addrs))
}
// GroupReceiver is a multicast group message receiver interface.
type GroupReceiver interface {
// Close closes this receiver.
io.Closer
// Receive a message.
Receive(ctx context.Context) (msg []byte, sender libp2p_peer.ID, err error)
}

Loading…
Cancel
Save