diff --git a/.gitignore b/.gitignore index 35e76428b..5ab1b3342 100644 --- a/.gitignore +++ b/.gitignore @@ -64,3 +64,6 @@ node_modules/ # go mod summary file go.sum + +# bnkey +test/.bnkey diff --git a/api/service/syncing/downloader/client.go b/api/service/syncing/downloader/client.go index d17c5e24c..dfb5d5cac 100644 --- a/api/service/syncing/downloader/client.go +++ b/api/service/syncing/downloader/client.go @@ -27,7 +27,7 @@ func ClientSetup(ip, port string) *Client { utils.GetLogInstance().Info("client.go:ClientSetup fail to dial: ", "error", err) return nil } - + utils.GetLogInstance().Info("[SYNC] grpc connect successfully", "ip", ip, "port", port) client.dlClient = pb.NewDownloaderClient(client.conn) return &client } diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index b8b840b61..f823eac2f 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -182,8 +182,8 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) { } // CreateSyncConfig creates SyncConfig for StateSync object. -func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error { - utils.GetLogInstance().Debug("CreateSyncConfig: len of peers", "len", len(peers)) +func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, isBeacon bool) error { + utils.GetLogInstance().Debug("CreateSyncConfig: len of peers", "len", len(peers), "isBeacon", isBeacon) if len(peers) == 0 { return ctxerror.New("[SYNC] no peers to connect to") } @@ -206,7 +206,7 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error { }(peer) } wg.Wait() - utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.") + utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.", "len", len(ss.syncConfig.peers), "isBeacon", isBeacon) return nil } @@ -529,8 +529,10 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker } // ProcessStateSync processes state sync from the blocks received but not yet processed so far -func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker) { - ss.RegisterNodeInfo() +func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker, isBeacon bool) { + if !isBeacon { + ss.RegisterNodeInfo() + } // Gets consensus hashes. if !ss.GetConsensusHashes(startHash) { utils.GetLogInstance().Debug("[SYNC] ProcessStateSync unable to reach consensus on ss.GetConsensusHashes") @@ -614,14 +616,14 @@ func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool { } // SyncLoop will keep syncing with peers until catches up -func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) { +func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool, isBeacon bool) { for { if !ss.IsOutOfSync(bc) { utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") return } startHash := bc.CurrentBlock().Hash() - ss.ProcessStateSync(startHash[:], bc, worker) + ss.ProcessStateSync(startHash[:], bc, worker, isBeacon) } } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 76a6d0dc6..3c502c422 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -302,10 +302,9 @@ func main() { if consensus.IsLeader { go currentNode.SendPongMessage() } - // TODO: enable beacon chain sync - //if consensus.ShardID != 0 { - // go currentNode.SupportBeaconSyncing() - //} + if consensus.ShardID != 0 { + go currentNode.SupportBeaconSyncing() + } go currentNode.SupportSyncing() utils.GetLogInstance().Info("New Harmony Node ====", "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty())) currentNode.ServiceManagerSetup() diff --git a/node/node.go b/node/node.go index 9fe36fbc5..57ddb77c2 100644 --- a/node/node.go +++ b/node/node.go @@ -430,13 +430,13 @@ func (node *Node) AddBeaconChainDatabase(db ethdb.Database) { database = ethdb.NewMemDatabase() } // TODO (chao) currently we use the same genesis block as normal shard - chain, err := node.GenesisBlockSetup(database, 0, false) + chain, err := node.GenesisBlockSetup(database, 0, true) if err != nil { utils.GetLogInstance().Error("Error when doing genesis setup") os.Exit(1) } node.beaconChain = chain - node.BeaconWorker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.ConsensusPubKey), node.Consensus.ShardID) + node.BeaconWorker = worker.New(params.TestChainConfig, chain, &consensus.Consensus{}, pki.GetAddressFromPublicKey(node.SelfPeer.ConsensusPubKey), node.Consensus.ShardID) } // InitBlockChainFromDB retrieves the latest blockchain and state available from the local database diff --git a/node/node_handler.go b/node/node_handler.go index c8bc3a8f4..80d668068 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -165,7 +165,7 @@ func (node *Node) messageHandler(content []byte, sender string) { // for non-beaconchain node, subscribe to beacon block broadcast role := node.NodeConfig.Role() if proto_node.BlockMessageType(msgPayload[0]) == proto_node.Sync && (role == nodeconfig.ShardValidator || role == nodeconfig.ShardLeader || role == nodeconfig.NewNode) { - utils.GetLogInstance().Info("Block being handled by block channel", "self peer", node.SelfPeer) + utils.GetLogInstance().Info("Block being handled by block channel", "self peer", node.SelfPeer, "block", blocks[0].NumberU64()) for _, block := range blocks { node.BeaconBlockChannel <- block } diff --git a/node/node_syncing.go b/node/node_syncing.go index 3e723d395..14fe113d0 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -50,6 +50,27 @@ func (node *Node) GetSyncingPeers() []p2p.Peer { return node.getNeighborPeers(&node.Neighbors) } +// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks +func (node *Node) DoBeaconSyncing() { + for { + select { + case beaconBlock := <-node.BeaconBlockChannel: + if node.beaconSync == nil { + node.beaconSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) + } + if node.beaconSync.GetActivePeerNumber() == 0 { + peers := node.GetBeaconSyncingPeers() + if err := node.beaconSync.CreateSyncConfig(peers, true); err != nil { + ctxerror.Log15(utils.GetLogInstance().Debug, err) + continue + } + } + node.beaconSync.AddLastMileBlock(beaconBlock) + node.beaconSync.SyncLoop(node.beaconChain, node.BeaconWorker, false, true) + } + } +} + // DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, getPeers func() []p2p.Peer, willJoinConsensus bool) { ticker := time.NewTicker(SyncFrequency * time.Second) @@ -63,7 +84,7 @@ SyncingLoop: } if node.stateSync.GetActivePeerNumber() == 0 { peers := getPeers() - if err := node.stateSync.CreateSyncConfig(peers); err != nil { + if err := node.stateSync.CreateSyncConfig(peers, false); err != nil { ctxerror.Log15(utils.GetLogInstance().Debug, err) continue SyncingLoop } @@ -73,7 +94,7 @@ SyncingLoop: node.stateMutex.Lock() node.State = NodeNotInSync node.stateMutex.Unlock() - node.stateSync.SyncLoop(bc, worker, willJoinConsensus) + node.stateSync.SyncLoop(bc, worker, willJoinConsensus, false) if willJoinConsensus { node.stateMutex.Lock() node.State = NodeReadyForConsensus @@ -93,9 +114,7 @@ SyncingLoop: // SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node func (node *Node) SupportBeaconSyncing() { - node.InitSyncingServer() - node.StartSyncingServer() - go node.DoSyncing(node.beaconChain, node.BeaconWorker, node.GetBeaconSyncingPeers, false) + go node.DoBeaconSyncing() } // SupportSyncing keeps sleeping until it's doing consensus or it's a leader.