beacon syncing; use p2p to broadcast new blocks

pull/724/head
chao 6 years ago
parent fefcbd4ca3
commit 61acdc41cb
  1. 12
      api/service/syncing/syncing.go
  2. 7
      cmd/harmony/main.go
  3. 2
      core/resharding.go
  4. 2
      node/node_handler.go
  5. 26
      node/node_syncing.go

@ -206,7 +206,7 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error {
}(peer) }(peer)
} }
wg.Wait() wg.Wait()
utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.") utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.", "len", len(ss.syncConfig.peers))
return nil return nil
} }
@ -529,8 +529,10 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
} }
// ProcessStateSync processes state sync from the blocks received but not yet processed so far // ProcessStateSync processes state sync from the blocks received but not yet processed so far
func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker) { func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker, isBeacon bool) {
if !isBeacon {
ss.RegisterNodeInfo() ss.RegisterNodeInfo()
}
// Gets consensus hashes. // Gets consensus hashes.
if !ss.GetConsensusHashes(startHash) { if !ss.GetConsensusHashes(startHash) {
utils.GetLogInstance().Debug("[SYNC] ProcessStateSync unable to reach consensus on ss.GetConsensusHashes") utils.GetLogInstance().Debug("[SYNC] ProcessStateSync unable to reach consensus on ss.GetConsensusHashes")
@ -614,14 +616,14 @@ func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool {
} }
// SyncLoop will keep syncing with peers until catches up // SyncLoop will keep syncing with peers until catches up
func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) { func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool, isBeacon bool) {
for { for {
if !ss.IsOutOfSync(bc) { if !isBeacon && !ss.IsOutOfSync(bc) {
utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!")
return return
} }
startHash := bc.CurrentBlock().Hash() startHash := bc.CurrentBlock().Hash()
ss.ProcessStateSync(startHash[:], bc, worker) ss.ProcessStateSync(startHash[:], bc, worker, isBeacon)
} }
} }

@ -297,10 +297,9 @@ func main() {
if consensus.IsLeader { if consensus.IsLeader {
go currentNode.SendPongMessage() go currentNode.SendPongMessage()
} }
// TODO: enable beacon chain sync if consensus.ShardID != 0 {
//if consensus.ShardID != 0 { go currentNode.SupportBeaconSyncing()
// go currentNode.SupportBeaconSyncing() }
//}
go currentNode.SupportSyncing() go currentNode.SupportSyncing()
utils.GetLogInstance().Info("New Harmony Node ====", "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty())) utils.GetLogInstance().Info("New Harmony Node ====", "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty()))
currentNode.ServiceManagerSetup() currentNode.ServiceManagerSetup()

@ -24,7 +24,7 @@ const (
// GenesisShardNum is the number of shard at genesis // GenesisShardNum is the number of shard at genesis
GenesisShardNum = 4 GenesisShardNum = 4
// GenesisShardSize is the size of each shard at genesis // GenesisShardSize is the size of each shard at genesis
GenesisShardSize = 50 GenesisShardSize = 5
// CuckooRate is the percentage of nodes getting reshuffled in the second step of cuckoo resharding. // CuckooRate is the percentage of nodes getting reshuffled in the second step of cuckoo resharding.
CuckooRate = 0.1 CuckooRate = 0.1
) )

@ -146,7 +146,7 @@ func (node *Node) messageHandler(content []byte, sender string) {
// for non-beaconchain node, subscribe to beacon block broadcast // for non-beaconchain node, subscribe to beacon block broadcast
role := node.NodeConfig.Role() role := node.NodeConfig.Role()
if proto_node.BlockMessageType(msgPayload[0]) == proto_node.Sync && (role == nodeconfig.ShardValidator || role == nodeconfig.ShardLeader || role == nodeconfig.NewNode) { if proto_node.BlockMessageType(msgPayload[0]) == proto_node.Sync && (role == nodeconfig.ShardValidator || role == nodeconfig.ShardLeader || role == nodeconfig.NewNode) {
utils.GetLogInstance().Info("Block being handled by block channel", "self peer", node.SelfPeer) utils.GetLogInstance().Info("Block being handled by block channel", "self peer", node.SelfPeer, "block", blocks[0].NumberU64())
for _, block := range blocks { for _, block := range blocks {
node.BeaconBlockChannel <- block node.BeaconBlockChannel <- block
} }

@ -50,6 +50,26 @@ func (node *Node) GetSyncingPeers() []p2p.Peer {
return node.getNeighborPeers(&node.Neighbors) return node.getNeighborPeers(&node.Neighbors)
} }
// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks
func (node *Node) DoBeaconSyncing() {
for {
select {
case beaconBlock := <-node.BeaconBlockChannel:
if node.beaconSync == nil {
node.beaconSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
}
if node.beaconSync.GetActivePeerNumber() == 0 {
peers := node.GetBeaconSyncingPeers()
if err := node.beaconSync.CreateSyncConfig(peers); err != nil {
ctxerror.Log15(utils.GetLogInstance().Debug, err)
}
}
node.beaconSync.AddLastMileBlock(beaconBlock)
node.beaconSync.SyncLoop(node.beaconChain, node.BeaconWorker, false, true)
}
}
}
// DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up // DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, getPeers func() []p2p.Peer, willJoinConsensus bool) { func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, getPeers func() []p2p.Peer, willJoinConsensus bool) {
ticker := time.NewTicker(SyncFrequency * time.Second) ticker := time.NewTicker(SyncFrequency * time.Second)
@ -73,7 +93,7 @@ SyncingLoop:
node.stateMutex.Lock() node.stateMutex.Lock()
node.State = NodeNotInSync node.State = NodeNotInSync
node.stateMutex.Unlock() node.stateMutex.Unlock()
node.stateSync.SyncLoop(bc, worker, willJoinConsensus) node.stateSync.SyncLoop(bc, worker, willJoinConsensus, false)
if willJoinConsensus { if willJoinConsensus {
node.stateMutex.Lock() node.stateMutex.Lock()
node.State = NodeReadyForConsensus node.State = NodeReadyForConsensus
@ -93,9 +113,7 @@ SyncingLoop:
// SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node // SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node
func (node *Node) SupportBeaconSyncing() { func (node *Node) SupportBeaconSyncing() {
node.InitSyncingServer() node.DoBeaconSyncing()
node.StartSyncingServer()
go node.DoSyncing(node.beaconChain, node.BeaconWorker, node.GetBeaconSyncingPeers, false)
} }
// SupportSyncing keeps sleeping until it's doing consensus or it's a leader. // SupportSyncing keeps sleeping until it's doing consensus or it's a leader.

Loading…
Cancel
Save