From 0d5dcfcae301a3fb356f4f445ca6dcfe58e15450 Mon Sep 17 00:00:00 2001 From: chao Date: Thu, 21 Mar 2019 18:14:33 -0700 Subject: [PATCH] refactor state syncing; unify state sync and beacon sync (i.e. depends on whether node needs to join consensus after fully synced) --- api/service/discovery/service.go | 2 +- api/service/syncing/syncing.go | 22 +++++- cmd/harmony/main.go | 2 +- consensus/consensus.go | 55 +++++++------ consensus/consensus_validator.go | 29 ------- node/node.go | 5 -- node/node_handler.go | 1 - node/node_syncing.go | 131 +++++++++++-------------------- node/service_setup.go | 14 ++-- test/configs/oneshard2.txt | 1 + 10 files changed, 103 insertions(+), 159 deletions(-) diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index b4b1804bc..4247ab296 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -89,6 +89,7 @@ func (s *Service) contactP2pPeers() { // It is just a temporary hack. When we work on re-sharding to regular shard, this has to be changed. if !s.config.IsBeacon { if s.addBeaconPeerFunc != nil { + utils.GetLogInstance().Debug("hehe", "peer", peer) s.addBeaconPeerFunc(&peer) } } @@ -135,7 +136,6 @@ func (s *Service) sentPingMessage(g p2p.GroupID, msgBuf []byte) { if err != nil { utils.GetLogInstance().Error("Failed to send ping message", "group", g) } - } // Init is to initialize for discoveryService. diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index 20e5747bf..ad10ffb02 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -26,7 +26,7 @@ const ( TimesToFail = 5 // Downloadblocks service retry limit RegistrationNumber = 3 SyncingPortDifference = 3000 - inSyncThreshold = 1 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus + inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus ) // SyncPeerConfig is peer config to sync. @@ -183,6 +183,15 @@ func (ss *StateSync) MakeConnectionToPeers() { utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.") } +// GetActivePeerNumber returns the number of active peers +func (ss *StateSync) GetActivePeerNumber() int { + if ss.syncConfig == nil || len(ss.syncConfig.peers) == 0 { + return 0 + } + ss.CleanUpNilPeers() + return ss.activePeerNumber +} + // CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber. func (ss *StateSync) CleanUpNilPeers() { ss.activePeerNumber = 0 @@ -574,11 +583,16 @@ func (ss *StateSync) getMaxPeerHeight() uint64 { return maxHeight } +// IsOutOfSync checks whether the node is out of sync from other peers +func (ss *StateSync) IsOutOfSync() bool { + otherHeight := ss.getMaxPeerHeight() + return ss.currentHeight+inSyncThreshold < otherHeight +} + // SyncLoop will keep syncing with peers until catches up -func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker) { +func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) { for { - otherHeight := ss.getMaxPeerHeight() - if ss.currentHeight+inSyncThreshold >= otherHeight { + if !ss.IsOutOfSync() { utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") return } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index f3ccc762c..f8d60a6d1 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -264,7 +264,7 @@ func main() { if *isArchival { currentNode, nodeConfig = setupArchivalNode(nodeConfig) loggingInit(*logFolder, nodeConfig.StringRole, *ip, *port, *onlyLogTps) - go currentNode.DoBeaconSyncing() + go currentNode.SupportBeaconSyncing() } else { // Start Profiler for leader if profile argument is on if nodeConfig.StringRole == "leader" && (*profile || *metricsReportURL != "") { diff --git a/consensus/consensus.go b/consensus/consensus.go index 6b60c1070..848248b65 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -76,6 +76,8 @@ type Consensus struct { blockHashes [][32]byte // Shard Id which this node belongs to ShardID uint32 + // whether to ignore consensusID check + ignoreConsensusIDCheck bool // global consensus mutex mutex sync.Mutex @@ -92,11 +94,12 @@ type Consensus struct { // The verifier func passed from Node object BlockVerifier func(*types.Block) bool - // current consensus block to check if out of sync - ConsensusBlock chan *BFTBlockInfo // verified block to state sync broadcast VerifiedNewBlock chan *types.Block + // will trigger state syncing when consensus ID is low + ConsensusIDLowChan chan struct{} + // Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf randomnesses) to consensus. The first 32 bytes are randomness, the rest is for bitmap. PRndChannel chan []byte // Channel for DRG protocol to send the final randomness to consensus. The first 32 bytes are the randomness and the last 32 bytes are the hash of the block where the corresponding pRnd was generated @@ -115,13 +118,6 @@ type Consensus struct { OfflinePeerList []p2p.Peer } -// BFTBlockInfo send the latest block that was in BFT consensus process as well as its consensusID to state syncing -// consensusID is necessary to make sure the out of sync node can enter the correct view -type BFTBlockInfo struct { - Block *types.Block - ConsensusID uint32 -} - // BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far // This is mainly used in the case that this node is lagging behind and needs to catch up. // For example, the consensus moved to round N and this node received message(N). @@ -133,16 +129,6 @@ type BlockConsensusStatus struct { state State // the latest state of the consensus } -// UpdateConsensusID is used to update latest consensusID for nodes that out of sync -func (consensus *Consensus) UpdateConsensusID(consensusID uint32) { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - if consensus.consensusID < consensusID { - utils.GetLogInstance().Debug("update consensusID", "myConsensusID", consensus.consensusID, "newConsensusID", consensusID) - consensus.consensusID = consensusID - } -} - // WaitForNewRandomness listens to the RndChannel to receive new VDF randomness. func (consensus *Consensus) WaitForNewRandomness() { go func() { @@ -171,6 +157,7 @@ func (consensus *Consensus) GetNextRnd() ([32]byte, [32]byte, error) { func New(host p2p.Host, ShardID uint32, peers []p2p.Peer, leader p2p.Peer, blsPriKey *bls.SecretKey) *Consensus { consensus := Consensus{} consensus.host = host + consensus.ConsensusIDLowChan = make(chan struct{}) selfPeer := host.GetSelfPeer() if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP { @@ -258,20 +245,36 @@ func (consensus *Consensus) checkConsensusMessage(message *msg_pb.Message, publi utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) return consensus_engine.ErrInvalidConsensusMessage } - - // check consensus Id - if consensusID != consensus.consensusID { - utils.GetLogInstance().Warn("Wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus) - return consensus_engine.ErrConsensusIDNotMatch - } - if !bytes.Equal(blockHash, consensus.blockHash[:]) { utils.GetLogInstance().Warn("Wrong blockHash", "consensus", consensus) return consensus_engine.ErrInvalidConsensusMessage } + + // just ignore consensus check for the first time when node join + if consensus.ignoreConsensusIDCheck { + consensus.consensusID = consensusID + consensus.ToggleConsensusCheck() + return nil + } else if consensusID != consensus.consensusID { + utils.GetLogInstance().Warn("Wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus) + // notify state syncing to start + select { + case consensus.ConsensusIDLowChan <- struct{}{}: + default: + } + + return consensus_engine.ErrConsensusIDNotMatch + } return nil } +// ToggleConsensusCheck flip the flag of whether ignore consensusID check during consensus process +func (consensus *Consensus) ToggleConsensusCheck() { + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + consensus.ignoreConsensusIDCheck = !consensus.ignoreConsensusIDCheck +} + // GetPeerByAddress the validator peer based on validator Address. func (consensus *Consensus) GetPeerByAddress(validatorAddress string) *p2p.Peer { v, ok := consensus.validators.Load(validatorAddress) diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index c67dd3dbe..d84b06911 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -6,7 +6,6 @@ import ( protobuf "github.com/golang/protobuf/proto" "github.com/harmony-one/bls/ffi/go/bls" msg_pb "github.com/harmony-one/harmony/api/proto/message" - consensus_engine "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/attack" @@ -15,30 +14,6 @@ import ( "github.com/harmony-one/harmony/p2p/host" ) -// sendBFTBlockToStateSyncing will send the latest BFT consensus block to state syncing checkingjjkkkkkkkkkkkkkkkjnjk -func (consensus *Consensus) sendBFTBlockToStateSyncing(consensusID uint32) { - // validator send consensus block to state syncing - if val, ok := consensus.blocksReceived[consensusID]; ok { - consensus.mutex.Lock() - delete(consensus.blocksReceived, consensusID) - consensus.mutex.Unlock() - - var blockObj types.Block - err := rlp.DecodeBytes(val.block, &blockObj) - if err != nil { - utils.GetLogInstance().Debug("failed to construct the cached block") - return - } - blockInfo := &BFTBlockInfo{Block: &blockObj, ConsensusID: consensusID} - select { - case consensus.ConsensusBlock <- blockInfo: - default: - utils.GetLogInstance().Warn("consensus block unable to sent to state sync", "height", blockObj.NumberU64(), "blockHash", blockObj.Hash().Hex()) - } - } - return -} - // IsValidatorMessage checks if a message is to be sent to a validator. func (consensus *Consensus) IsValidatorMessage(message *msg_pb.Message) bool { return message.ReceiverType == msg_pb.ReceiverType_VALIDATOR && message.ServiceType == msg_pb.ServiceType_CONSENSUS @@ -94,10 +69,6 @@ func (consensus *Consensus) processAnnounceMessage(message *msg_pb.Message) { if err := consensus.checkConsensusMessage(message, consensus.leader.ConsensusPubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the leader message") - if err == consensus_engine.ErrConsensusIDNotMatch { - utils.GetLogInstance().Debug("sending bft block to state syncing") - consensus.sendBFTBlockToStateSyncing(consensusID) - } return } diff --git a/node/node.go b/node/node.go index 0030524af..e889c54a2 100644 --- a/node/node.go +++ b/node/node.go @@ -162,9 +162,6 @@ type Node struct { // Channel to notify consensus service to really start consensus startConsensus chan struct{} - // channel to notify the peers are ready - peerReadyChan chan struct{} - // node configuration, including group ID, shard ID, etc NodeConfig *nodeconfig.ConfigType @@ -265,7 +262,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain) node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.ConsensusPubKey), node.Consensus.ShardID) - node.Consensus.ConsensusBlock = make(chan *consensus.BFTBlockInfo) node.Consensus.VerifiedNewBlock = make(chan *types.Block) if isFirstTime { @@ -299,7 +295,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N go node.ReceiveGroupMessage() node.startConsensus = make(chan struct{}) - node.peerReadyChan = make(chan struct{}) // init the global and the only node config node.NodeConfig = nodeconfig.GetConfigs(nodeconfig.Global) diff --git a/node/node_handler.go b/node/node_handler.go index 0c0360642..3ff7a6c15 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -490,7 +490,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { if len(peers) > 0 { node.AddPeers(peers) - node.peerReadyChan <- struct{}{} } // Reset Validator PublicKeys every time we receive PONG message from Leader diff --git a/node/node_syncing.go b/node/node_syncing.go index e53b97c73..27bfde520 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -12,16 +12,19 @@ import ( "github.com/harmony-one/harmony/api/service/syncing" "github.com/harmony-one/harmony/api/service/syncing/downloader" downloader_pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto" - "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" ) // Constants related to doing syncing. const ( lastMileThreshold = 4 - inSyncThreshold = 1 + inSyncThreshold = 1 // unit in number of block + SyncFrequency = 10 // unit in second ) // GetSyncingPort returns the syncing port. @@ -61,106 +64,64 @@ func (node *Node) GetSyncingPeers() []p2p.Peer { return node.getNeighborPeers(&node.Neighbors) } -// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks -func (node *Node) DoBeaconSyncing() { - for { - select { - case beaconBlock := <-node.BeaconBlockChannel: - if node.beaconSync == nil { - node.beaconSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) - node.beaconSync.CreateSyncConfig(node.GetBeaconSyncingPeers()) - node.beaconSync.MakeConnectionToPeers() - } - startHash := node.beaconChain.CurrentBlock().Hash() - node.beaconSync.AddLastMileBlock(beaconBlock) - node.beaconSync.ProcessStateSync(startHash[:], node.beaconChain, node.BeaconWorker) - utils.GetLogInstance().Debug("[SYNC] STARTING BEACON SYNC") - } - } -} - -// IsOutOfSync checks whether the node is out of sync by comparing latest block with consensus block -func (node *Node) IsOutOfSync(consensusBlockInfo *consensus.BFTBlockInfo) bool { - consensusBlock := consensusBlockInfo.Block - consensusID := consensusBlockInfo.ConsensusID - - myHeight := node.blockchain.CurrentBlock().NumberU64() - newHeight := consensusBlock.NumberU64() - utils.GetLogInstance().Debug("[SYNC]", "myHeight", myHeight, "newHeight", newHeight) - if newHeight-myHeight <= inSyncThreshold { - node.stateSync.AddLastMileBlock(consensusBlock) - node.Consensus.UpdateConsensusID(consensusID + 1) - return false - } - // cache latest blocks for last mile catch up - if newHeight-myHeight <= lastMileThreshold && node.stateSync != nil { - node.stateSync.AddLastMileBlock(consensusBlock) - } - return true -} +// DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up +func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, getPeers func() []p2p.Peer, willJoinConsensus bool) { + ticker := time.NewTicker(SyncFrequency * time.Second) -// DoSync syncs with peers until catchup, this function is not coupled with consensus -func (node *Node) DoSync() { - <-node.peerReadyChan - ss := syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) - if ss.CreateSyncConfig(node.GetSyncingPeers()) { - ss.MakeConnectionToPeers() - ss.SyncLoop(node.blockchain, node.Worker) - } -} - -// DoSyncing wait for check status and starts syncing if out of sync -func (node *Node) DoSyncing() { +SyncingLoop: for { select { - // in current implementation logic, timeout means in sync - case <-time.After(5 * time.Second): - //myHeight := node.blockchain.CurrentBlock().NumberU64() - //utils.GetLogInstance().Debug("[SYNC]", "currentHeight", myHeight) - node.stateMutex.Lock() - node.State = NodeReadyForConsensus - node.stateMutex.Unlock() - continue - case consensusBlockInfo := <-node.Consensus.ConsensusBlock: - if !node.IsOutOfSync(consensusBlockInfo) { - startHash := node.blockchain.CurrentBlock().Hash() - node.stateSync.ProcessStateSync(startHash[:], node.blockchain, node.Worker) - if node.State == NodeNotInSync { - utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") + case <-ticker.C: + if willJoinConsensus { + <-node.Consensus.ConsensusIDLowChan + } + if node.stateSync == nil { + node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) + } + if node.stateSync.GetActivePeerNumber() == 0 { + if node.stateSync.CreateSyncConfig(getPeers()) { + node.stateSync.MakeConnectionToPeers() + } else { + utils.GetLogInstance().Debug("[SYNC] no active peers, continue SyncingLoop") + continue SyncingLoop } - node.stateMutex.Lock() - node.State = NodeReadyForConsensus - node.stateMutex.Unlock() - node.stateSync.CloseConnections() - node.stateSync = nil - continue - } else { - utils.GetLogInstance().Debug("[SYNC] node is out of sync") + } + if node.stateSync.IsOutOfSync() { + utils.GetLogInstance().Debug("[SYNC] out of sync, doing syncing") node.stateMutex.Lock() node.State = NodeNotInSync node.stateMutex.Unlock() + node.stateSync.SyncLoop(bc, worker, willJoinConsensus) + if willJoinConsensus { + node.stateMutex.Lock() + node.State = NodeReadyForConsensus + node.stateMutex.Unlock() + node.Consensus.ToggleConsensusCheck() + } } - - if node.stateSync == nil { - node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) - node.stateSync.CreateSyncConfig(node.GetSyncingPeers()) - node.stateSync.MakeConnectionToPeers() - } - startHash := node.blockchain.CurrentBlock().Hash() - node.stateSync.ProcessStateSync(startHash[:], node.blockchain, node.Worker) + node.stateMutex.Lock() + node.State = NodeReadyForConsensus + node.stateMutex.Unlock() } } } +// SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node +func (node *Node) SupportBeaconSyncing() { + node.InitSyncingServer() + node.StartSyncingServer() + go node.DoSyncing(node.beaconChain, node.BeaconWorker, node.GetBeaconSyncingPeers, false) +} + // SupportSyncing keeps sleeping until it's doing consensus or it's a leader. func (node *Node) SupportSyncing() { node.InitSyncingServer() node.StartSyncingServer() - - //go node.DoSyncing() - go node.DoSync() - go node.DoBeaconSyncing() go node.SendNewBlockToUnsync() + + if node.NodeConfig.Role() != nodeconfig.ShardLeader && node.NodeConfig.Role() != nodeconfig.BeaconLeader { + go node.DoSyncing(node.blockchain, node.Worker, node.GetSyncingPeers, true) + } } // InitSyncingServer starts downloader server. diff --git a/node/service_setup.go b/node/service_setup.go index 591bb3ba2..48022dcae 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -21,7 +21,7 @@ func (node *Node) setupForShardLeader() { nodeConfig, chanPeer := node.initNodeConfiguration(false, false) // Register peer discovery service. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer, node.beaconPeerReadyChan)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) @@ -43,7 +43,7 @@ func (node *Node) setupForShardValidator() { // Register client support service. node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer, node.beaconPeerReadyChan)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) } @@ -52,7 +52,7 @@ func (node *Node) setupForBeaconLeader() { nodeConfig, chanPeer := node.initNodeConfiguration(true, false) // Register peer discovery service. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil, nil)) // Register networkinfo service. node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) // Register consensus service. @@ -78,7 +78,7 @@ func (node *Node) setupForBeaconValidator() { // Register client support service. node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) // Register peer discovery service. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil, nil)) // Register networkinfo service. node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) } @@ -90,7 +90,7 @@ func (node *Node) setupForNewNode() { // Register staking service. node.serviceManager.RegisterService(service.Staking, staking.New(node.host, node.AccountKey, node.beaconChain, node.NodeConfig.ConsensusPubKey.GetAddress())) // Register peer discovery service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer, nil)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) @@ -101,7 +101,7 @@ func (node *Node) setupForClientNode() { nodeConfig, chanPeer := node.initNodeConfiguration(false, true) // Register peer discovery service. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer, node.beaconPeerReadyChan)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) } @@ -109,7 +109,7 @@ func (node *Node) setupForClientNode() { func (node *Node) setupForArchivalNode() { nodeConfig, chanPeer := node.initNodeConfiguration(false, false) // Register peer discovery service. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer, node.beaconPeerReadyChan)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) //TODO: Add Syncing as a service. diff --git a/test/configs/oneshard2.txt b/test/configs/oneshard2.txt index d5e64632b..e6ed12a9f 100644 --- a/test/configs/oneshard2.txt +++ b/test/configs/oneshard2.txt @@ -4,5 +4,6 @@ 127.0.0.1 9003 validator 0 127.0.0.1 9004 validator 0 127.0.0.1 9005 validator 0 +127.0.0.1 9009 archival 0 127.0.0.1 9010 newnode 0 127.0.0.1 19999 client 0