Merge pull request #1638 from harmony-ek/include_network_id_in_dht_cache

Include chain ID in DHT cache database directory name
pull/1639/head
Rongjian Lan 5 years ago committed by GitHub
commit 46722716dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      api/service/networkinfo/service.go
  2. 5
      api/service/networkinfo/service_test.go
  3. 4
      node/node.go
  4. 18
      node/service_setup.go

@ -8,6 +8,7 @@ import (
"time"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
@ -17,6 +18,7 @@ import (
coredis "github.com/libp2p/go-libp2p-core/discovery"
libp2pdis "github.com/libp2p/go-libp2p-discovery"
libp2pdht "github.com/libp2p/go-libp2p-kad-dht"
libp2pdhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
peerstore "github.com/libp2p/go-libp2p-peerstore"
)
@ -56,16 +58,31 @@ const (
discoveryLimit = 32
)
// New returns role conversion service.
func New(h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer, bootnodes utils.AddrList) *Service {
// New returns role conversion service. If dataStorePath is not empty, it
// points to a persistent database directory to use.
func New(
h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer,
bootnodes utils.AddrList, dataStorePath string,
) (*Service, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), connectionTimeout)
dataStore, err := badger.NewDatastore(fmt.Sprintf(".dht-%s-%s", h.GetSelfPeer().IP, h.GetSelfPeer().Port), nil)
if err != nil {
panic(err)
var dhtOpts []libp2pdhtopts.Option
if dataStorePath != "" {
dataStore, err := badger.NewDatastore(dataStorePath, nil)
if err != nil {
return nil, errors.Wrapf(err,
"cannot open Badger datastore at %s", dataStorePath)
}
utils.Logger().Info().
Str("dataStorePath", dataStorePath).
Msg("backing DHT with Badger datastore")
dhtOpts = append(dhtOpts, libp2pdhtopts.Datastore(dataStore))
}
dht := libp2pdht.NewDHT(ctx, h.GetP2PHost(), dataStore)
dht, err := libp2pdht.New(ctx, h.GetP2PHost(), dhtOpts...)
if err != nil {
return nil, errors.Wrapf(err, "cannot create DHT")
}
return &Service{
Host: h,
@ -78,7 +95,19 @@ func New(h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer, bootnodes u
bootnodes: bootnodes,
discovery: nil,
started: false,
}, nil
}
// MustNew is a panic-on-error version of New.
func MustNew(
h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer,
bootnodes utils.AddrList, dataStorePath string,
) *Service {
service, err := New(h, rendezvous, peerChan, bootnodes, dataStorePath)
if err != nil {
panic(err)
}
return service
}
// StartService starts network info service.

@ -28,7 +28,10 @@ func TestService(t *testing.T) {
t.Fatal("unable to new host in harmony")
}
s := New(host, p2p.GroupIDBeaconClient, nil, nil)
s, err := New(host, p2p.GroupIDBeaconClient, nil, nil, "")
if err != nil {
t.Fatalf("New() failed: %s", err)
}
s.StartService()

@ -192,6 +192,9 @@ type Node struct {
// node configuration, including group ID, shard ID, etc
NodeConfig *nodeconfig.ConfigType
// Chain configuration.
chainConfig params.ChainConfig
// map of service type to its message channel.
serviceMessageChan map[service.Type]chan *msg_pb.Message
@ -373,6 +376,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
case nodeconfig.Pangaea:
chainConfig = *params.PangaeaChainConfig
}
node.chainConfig = chainConfig
collection := shardchain.NewCollection(
chainDBFactory, &genesisInitializer{&node}, chain.Engine, &chainConfig)

@ -1,6 +1,8 @@
package node
import (
"fmt"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service/blockproposal"
@ -22,7 +24,7 @@ func (node *Node) setupForValidator() {
// Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil))
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.MustNew(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil, node.networkInfoDHTPath()))
// Register consensus service.
node.serviceManager.RegisterService(service.Consensus, consensus.New(node.BlockChannel, node.Consensus, node.startConsensus))
// Register new block service.
@ -51,7 +53,7 @@ func (node *Node) setupForNewNode() {
// Register peer discovery service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetBeaconGroupID(), chanPeer, nil))
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.MustNew(node.host, node.NodeConfig.GetBeaconGroupID(), chanPeer, nil, node.networkInfoDHTPath()))
// Register new metrics service
if node.NodeConfig.GetMetricsFlag() {
node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort()))
@ -60,7 +62,7 @@ func (node *Node) setupForNewNode() {
func (node *Node) setupForClientNode() {
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, nil, nil))
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.MustNew(node.host, p2p.GroupIDBeacon, nil, nil, node.networkInfoDHTPath()))
}
func (node *Node) setupForExplorerNode() {
@ -69,7 +71,7 @@ func (node *Node) setupForExplorerNode() {
// Register peer discovery service.
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil))
// Register networkinfo service.
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil))
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.MustNew(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil, node.networkInfoDHTPath()))
// Register explorer service.
node.serviceManager.RegisterService(service.SupportExplorer, explorer.New(&node.SelfPeer, node.NodeConfig.GetShardID(), node.Consensus.GetNodeIDs, node.GetBalanceOfAddress))
// Register explorer service.
@ -107,3 +109,11 @@ func (node *Node) StopServices() {
}
node.serviceManager.StopServicesByRole([]service.Type{})
}
func (node *Node) networkInfoDHTPath() string {
return fmt.Sprintf(".dht-%s-%s-c%s",
node.NodeConfig.SelfPeer.IP,
node.NodeConfig.SelfPeer.Port,
node.chainConfig.ChainID,
)
}

Loading…
Cancel
Save