@ -12,16 +12,19 @@ import (
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/api/service/syncing/downloader"
downloader_pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto"
"github.com/harmony-one/harmony/consensus "
"github.com/harmony-one/harmony/core "
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
)
// Constants related to doing syncing.
const (
lastMileThreshold = 4
inSyncThreshold = 1
inSyncThreshold = 1 // unit in number of block
SyncFrequency = 10 // unit in second
)
// GetSyncingPort returns the syncing port.
@ -61,106 +64,64 @@ func (node *Node) GetSyncingPeers() []p2p.Peer {
return node . getNeighborPeers ( & node . Neighbors )
}
// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks
func ( node * Node ) DoBeaconSyncing ( ) {
for {
select {
case beaconBlock := <- node . BeaconBlockChannel :
if node . beaconSync == nil {
node . beaconSync = syncing . CreateStateSync ( node . SelfPeer . IP , node . SelfPeer . Port , node . GetSyncID ( ) )
node . beaconSync . CreateSyncConfig ( node . GetBeaconSyncingPeers ( ) )
node . beaconSync . MakeConnectionToPeers ( )
}
startHash := node . beaconChain . CurrentBlock ( ) . Hash ( )
node . beaconSync . AddLastMileBlock ( beaconBlock )
node . beaconSync . ProcessStateSync ( startHash [ : ] , node . beaconChain , node . BeaconWorker )
utils . GetLogInstance ( ) . Debug ( "[SYNC] STARTING BEACON SYNC" )
}
}
}
// IsOutOfSync checks whether the node is out of sync by comparing latest block with consensus block
func ( node * Node ) IsOutOfSync ( consensusBlockInfo * consensus . BFTBlockInfo ) bool {
consensusBlock := consensusBlockInfo . Block
consensusID := consensusBlockInfo . ConsensusID
myHeight := node . blockchain . CurrentBlock ( ) . NumberU64 ( )
newHeight := consensusBlock . NumberU64 ( )
utils . GetLogInstance ( ) . Debug ( "[SYNC]" , "myHeight" , myHeight , "newHeight" , newHeight )
if newHeight - myHeight <= inSyncThreshold {
node . stateSync . AddLastMileBlock ( consensusBlock )
node . Consensus . UpdateConsensusID ( consensusID + 1 )
return false
}
// cache latest blocks for last mile catch up
if newHeight - myHeight <= lastMileThreshold && node . stateSync != nil {
node . stateSync . AddLastMileBlock ( consensusBlock )
}
return true
}
// DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
func ( node * Node ) DoSyncing ( bc * core . BlockChain , worker * worker . Worker , getPeers func ( ) [ ] p2p . Peer , willJoinConsensus bool ) {
ticker := time . NewTicker ( SyncFrequency * time . Second )
// DoSync syncs with peers until catchup, this function is not coupled with consensus
func ( node * Node ) DoSync ( ) {
<- node . peerReadyChan
ss := syncing . CreateStateSync ( node . SelfPeer . IP , node . SelfPeer . Port , node . GetSyncID ( ) )
if ss . CreateSyncConfig ( node . GetSyncingPeers ( ) ) {
ss . MakeConnectionToPeers ( )
ss . SyncLoop ( node . blockchain , node . Worker )
}
}
// DoSyncing wait for check status and starts syncing if out of sync
func ( node * Node ) DoSyncing ( ) {
SyncingLoop :
for {
select {
// in current implementation logic, timeout means in sync
case <- time . After ( 5 * time . Second ) :
//myHeight := node.blockchain.CurrentBlock().NumberU64()
//utils.GetLogInstance().Debug("[SYNC]", "currentHeight", myHeight)
node . stateMutex . Lock ( )
node . State = NodeReadyForConsensus
node . stateMutex . Unlock ( )
continue
case consensusBlockInfo := <- node . Consensus . ConsensusBlock :
if ! node . IsOutOfSync ( consensusBlockInfo ) {
startHash := node . blockchain . CurrentBlock ( ) . Hash ( )
node . stateSync . ProcessStateSync ( startHash [ : ] , node . blockchain , node . Worker )
if node . State == NodeNotInSync {
utils . GetLogInstance ( ) . Info ( "[SYNC] Node is now IN SYNC!" )
case <- ticker . C :
if willJoinConsensus {
<- node . Consensus . ConsensusIDLowChan
}
if node . stateSync == nil {
node . stateSync = syncing . CreateStateSync ( node . SelfPeer . IP , node . SelfPeer . Port , node . GetSyncID ( ) )
}
if node . stateSync . GetActivePeerNumber ( ) == 0 {
if node . stateSync . CreateSyncConfig ( getPeers ( ) ) {
node . stateSync . MakeConnectionToPeers ( )
} else {
utils . GetLogInstance ( ) . Debug ( "[SYNC] no active peers, continue SyncingLoop" )
continue SyncingLoop
}
node . stateMutex . Lock ( )
node . State = NodeReadyForConsensus
node . stateMutex . Unlock ( )
node . stateSync . CloseConnections ( )
node . stateSync = nil
continue
} else {
utils . GetLogInstance ( ) . Debug ( "[SYNC] node is out of sync" )
}
if node . stateSync . IsOutOfSync ( ) {
utils . GetLogInstance ( ) . Debug ( "[SYNC] out of sync, doing syncing" )
node . stateMutex . Lock ( )
node . State = NodeNotInSync
node . stateMutex . Unlock ( )
node . stateSync . SyncLoop ( bc , worker , willJoinConsensus )
if willJoinConsensus {
node . stateMutex . Lock ( )
node . State = NodeReadyForConsensus
node . stateMutex . Unlock ( )
node . Consensus . ToggleConsensusCheck ( )
}
}
if node . stateSync == nil {
node . stateSync = syncing . CreateStateSync ( node . SelfPeer . IP , node . SelfPeer . Port , node . GetSyncID ( ) )
node . stateSync . CreateSyncConfig ( node . GetSyncingPeers ( ) )
node . stateSync . MakeConnectionToPeers ( )
}
startHash := node . blockchain . CurrentBlock ( ) . Hash ( )
node . stateSync . ProcessStateSync ( startHash [ : ] , node . blockchain , node . Worker )
node . stateMutex . Lock ( )
node . State = NodeReadyForConsensus
node . stateMutex . Unlock ( )
}
}
}
// SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node
func ( node * Node ) SupportBeaconSyncing ( ) {
node . InitSyncingServer ( )
node . StartSyncingServer ( )
go node . DoSyncing ( node . beaconChain , node . BeaconWorker , node . GetBeaconSyncingPeers , false )
}
// SupportSyncing keeps sleeping until it's doing consensus or it's a leader.
func ( node * Node ) SupportSyncing ( ) {
node . InitSyncingServer ( )
node . StartSyncingServer ( )
//go node.DoSyncing()
go node . DoSync ( )
go node . DoBeaconSyncing ( )
go node . SendNewBlockToUnsync ( )
if node . NodeConfig . Role ( ) != nodeconfig . ShardLeader && node . NodeConfig . Role ( ) != nodeconfig . BeaconLeader {
go node . DoSyncing ( node . blockchain , node . Worker , node . GetSyncingPeers , true )
}
}
// InitSyncingServer starts downloader server.