diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index b8b840b61..d43d4f0a4 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -206,7 +206,7 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error { }(peer) } wg.Wait() - utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.") + utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.", "len", len(ss.syncConfig.peers)) return nil } @@ -529,8 +529,10 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker } // ProcessStateSync processes state sync from the blocks received but not yet processed so far -func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker) { - ss.RegisterNodeInfo() +func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker, isBeacon bool) { + if !isBeacon { + ss.RegisterNodeInfo() + } // Gets consensus hashes. if !ss.GetConsensusHashes(startHash) { utils.GetLogInstance().Debug("[SYNC] ProcessStateSync unable to reach consensus on ss.GetConsensusHashes") @@ -614,14 +616,14 @@ func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool { } // SyncLoop will keep syncing with peers until catches up -func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) { +func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool, isBeacon bool) { for { - if !ss.IsOutOfSync(bc) { + if !isBeacon && !ss.IsOutOfSync(bc) { utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") return } startHash := bc.CurrentBlock().Hash() - ss.ProcessStateSync(startHash[:], bc, worker) + ss.ProcessStateSync(startHash[:], bc, worker, isBeacon) } } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 9a7cef20e..462575d56 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -297,10 +297,9 @@ func main() { if consensus.IsLeader { go currentNode.SendPongMessage() } - // TODO: enable beacon chain sync - //if consensus.ShardID != 0 { - // go currentNode.SupportBeaconSyncing() - //} + if consensus.ShardID != 0 { + go currentNode.SupportBeaconSyncing() + } go currentNode.SupportSyncing() utils.GetLogInstance().Info("New Harmony Node ====", "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty())) currentNode.ServiceManagerSetup() diff --git a/core/resharding.go b/core/resharding.go index c671ec2af..d4c099c82 100644 --- a/core/resharding.go +++ b/core/resharding.go @@ -24,7 +24,7 @@ const ( // GenesisShardNum is the number of shard at genesis GenesisShardNum = 4 // GenesisShardSize is the size of each shard at genesis - GenesisShardSize = 50 + GenesisShardSize = 5 // CuckooRate is the percentage of nodes getting reshuffled in the second step of cuckoo resharding. CuckooRate = 0.1 ) diff --git a/node/node_handler.go b/node/node_handler.go index 7d83a9405..58f889ca9 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -146,7 +146,7 @@ func (node *Node) messageHandler(content []byte, sender string) { // for non-beaconchain node, subscribe to beacon block broadcast role := node.NodeConfig.Role() if proto_node.BlockMessageType(msgPayload[0]) == proto_node.Sync && (role == nodeconfig.ShardValidator || role == nodeconfig.ShardLeader || role == nodeconfig.NewNode) { - utils.GetLogInstance().Info("Block being handled by block channel", "self peer", node.SelfPeer) + utils.GetLogInstance().Info("Block being handled by block channel", "self peer", node.SelfPeer, "block", blocks[0].NumberU64()) for _, block := range blocks { node.BeaconBlockChannel <- block } diff --git a/node/node_syncing.go b/node/node_syncing.go index 3e723d395..4c9965e93 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -50,6 +50,26 @@ func (node *Node) GetSyncingPeers() []p2p.Peer { return node.getNeighborPeers(&node.Neighbors) } +// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks +func (node *Node) DoBeaconSyncing() { + for { + select { + case beaconBlock := <-node.BeaconBlockChannel: + if node.beaconSync == nil { + node.beaconSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) + } + if node.beaconSync.GetActivePeerNumber() == 0 { + peers := node.GetBeaconSyncingPeers() + if err := node.beaconSync.CreateSyncConfig(peers); err != nil { + ctxerror.Log15(utils.GetLogInstance().Debug, err) + } + } + node.beaconSync.AddLastMileBlock(beaconBlock) + node.beaconSync.SyncLoop(node.beaconChain, node.BeaconWorker, false, true) + } + } +} + // DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, getPeers func() []p2p.Peer, willJoinConsensus bool) { ticker := time.NewTicker(SyncFrequency * time.Second) @@ -73,7 +93,7 @@ SyncingLoop: node.stateMutex.Lock() node.State = NodeNotInSync node.stateMutex.Unlock() - node.stateSync.SyncLoop(bc, worker, willJoinConsensus) + node.stateSync.SyncLoop(bc, worker, willJoinConsensus, false) if willJoinConsensus { node.stateMutex.Lock() node.State = NodeReadyForConsensus @@ -93,9 +113,7 @@ SyncingLoop: // SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node func (node *Node) SupportBeaconSyncing() { - node.InitSyncingServer() - node.StartSyncingServer() - go node.DoSyncing(node.beaconChain, node.BeaconWorker, node.GetBeaconSyncingPeers, false) + node.DoBeaconSyncing() } // SupportSyncing keeps sleeping until it's doing consensus or it's a leader.