Merge pull request #724 from chaosma/chao-wip

beacon chain syncing
pull/741/head
chaosma 6 years ago committed by GitHub
commit c977b07bb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      .gitignore
  2. 2
      api/service/syncing/downloader/client.go
  3. 16
      api/service/syncing/syncing.go
  4. 7
      cmd/harmony/main.go
  5. 4
      node/node.go
  6. 2
      node/node_handler.go
  7. 29
      node/node_syncing.go

3
.gitignore vendored

@ -64,3 +64,6 @@ node_modules/
# go mod summary file
go.sum
# bnkey
test/.bnkey

@ -27,7 +27,7 @@ func ClientSetup(ip, port string) *Client {
utils.GetLogInstance().Info("client.go:ClientSetup fail to dial: ", "error", err)
return nil
}
utils.GetLogInstance().Info("[SYNC] grpc connect successfully", "ip", ip, "port", port)
client.dlClient = pb.NewDownloaderClient(client.conn)
return &client
}

@ -182,8 +182,8 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
}
// CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error {
utils.GetLogInstance().Debug("CreateSyncConfig: len of peers", "len", len(peers))
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, isBeacon bool) error {
utils.GetLogInstance().Debug("CreateSyncConfig: len of peers", "len", len(peers), "isBeacon", isBeacon)
if len(peers) == 0 {
return ctxerror.New("[SYNC] no peers to connect to")
}
@ -206,7 +206,7 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error {
}(peer)
}
wg.Wait()
utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.")
utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.", "len", len(ss.syncConfig.peers), "isBeacon", isBeacon)
return nil
}
@ -529,8 +529,10 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
}
// ProcessStateSync processes state sync from the blocks received but not yet processed so far
func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker) {
ss.RegisterNodeInfo()
func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker, isBeacon bool) {
if !isBeacon {
ss.RegisterNodeInfo()
}
// Gets consensus hashes.
if !ss.GetConsensusHashes(startHash) {
utils.GetLogInstance().Debug("[SYNC] ProcessStateSync unable to reach consensus on ss.GetConsensusHashes")
@ -614,14 +616,14 @@ func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool {
}
// SyncLoop will keep syncing with peers until catches up
func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) {
func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool, isBeacon bool) {
for {
if !ss.IsOutOfSync(bc) {
utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!")
return
}
startHash := bc.CurrentBlock().Hash()
ss.ProcessStateSync(startHash[:], bc, worker)
ss.ProcessStateSync(startHash[:], bc, worker, isBeacon)
}
}

@ -302,10 +302,9 @@ func main() {
if consensus.IsLeader {
go currentNode.SendPongMessage()
}
// TODO: enable beacon chain sync
//if consensus.ShardID != 0 {
// go currentNode.SupportBeaconSyncing()
//}
if consensus.ShardID != 0 {
go currentNode.SupportBeaconSyncing()
}
go currentNode.SupportSyncing()
utils.GetLogInstance().Info("New Harmony Node ====", "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty()))
currentNode.ServiceManagerSetup()

@ -430,13 +430,13 @@ func (node *Node) AddBeaconChainDatabase(db ethdb.Database) {
database = ethdb.NewMemDatabase()
}
// TODO (chao) currently we use the same genesis block as normal shard
chain, err := node.GenesisBlockSetup(database, 0, false)
chain, err := node.GenesisBlockSetup(database, 0, true)
if err != nil {
utils.GetLogInstance().Error("Error when doing genesis setup")
os.Exit(1)
}
node.beaconChain = chain
node.BeaconWorker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.ConsensusPubKey), node.Consensus.ShardID)
node.BeaconWorker = worker.New(params.TestChainConfig, chain, &consensus.Consensus{}, pki.GetAddressFromPublicKey(node.SelfPeer.ConsensusPubKey), node.Consensus.ShardID)
}
// InitBlockChainFromDB retrieves the latest blockchain and state available from the local database

@ -165,7 +165,7 @@ func (node *Node) messageHandler(content []byte, sender string) {
// for non-beaconchain node, subscribe to beacon block broadcast
role := node.NodeConfig.Role()
if proto_node.BlockMessageType(msgPayload[0]) == proto_node.Sync && (role == nodeconfig.ShardValidator || role == nodeconfig.ShardLeader || role == nodeconfig.NewNode) {
utils.GetLogInstance().Info("Block being handled by block channel", "self peer", node.SelfPeer)
utils.GetLogInstance().Info("Block being handled by block channel", "self peer", node.SelfPeer, "block", blocks[0].NumberU64())
for _, block := range blocks {
node.BeaconBlockChannel <- block
}

@ -50,6 +50,27 @@ func (node *Node) GetSyncingPeers() []p2p.Peer {
return node.getNeighborPeers(&node.Neighbors)
}
// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks
func (node *Node) DoBeaconSyncing() {
for {
select {
case beaconBlock := <-node.BeaconBlockChannel:
if node.beaconSync == nil {
node.beaconSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
}
if node.beaconSync.GetActivePeerNumber() == 0 {
peers := node.GetBeaconSyncingPeers()
if err := node.beaconSync.CreateSyncConfig(peers, true); err != nil {
ctxerror.Log15(utils.GetLogInstance().Debug, err)
continue
}
}
node.beaconSync.AddLastMileBlock(beaconBlock)
node.beaconSync.SyncLoop(node.beaconChain, node.BeaconWorker, false, true)
}
}
}
// DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, getPeers func() []p2p.Peer, willJoinConsensus bool) {
ticker := time.NewTicker(SyncFrequency * time.Second)
@ -63,7 +84,7 @@ SyncingLoop:
}
if node.stateSync.GetActivePeerNumber() == 0 {
peers := getPeers()
if err := node.stateSync.CreateSyncConfig(peers); err != nil {
if err := node.stateSync.CreateSyncConfig(peers, false); err != nil {
ctxerror.Log15(utils.GetLogInstance().Debug, err)
continue SyncingLoop
}
@ -73,7 +94,7 @@ SyncingLoop:
node.stateMutex.Lock()
node.State = NodeNotInSync
node.stateMutex.Unlock()
node.stateSync.SyncLoop(bc, worker, willJoinConsensus)
node.stateSync.SyncLoop(bc, worker, willJoinConsensus, false)
if willJoinConsensus {
node.stateMutex.Lock()
node.State = NodeReadyForConsensus
@ -93,9 +114,7 @@ SyncingLoop:
// SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node
func (node *Node) SupportBeaconSyncing() {
node.InitSyncingServer()
node.StartSyncingServer()
go node.DoSyncing(node.beaconChain, node.BeaconWorker, node.GetBeaconSyncingPeers, false)
go node.DoBeaconSyncing()
}
// SupportSyncing keeps sleeping until it's doing consensus or it's a leader.

Loading…
Cancel
Save