@ -7,7 +7,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/api/service/syncing"
@ -78,7 +78,7 @@ func (node *Node) GetPeersFromDNS() []p2p.Peer {
dns := fmt . Sprintf ( "s%d.%s" , shardID , node . dnsZone )
addrs , err := net . LookupHost ( dns )
if err != nil {
utils . GetLogInstance ( ) . Debug ( "[SYNC] GetPeersFromDNS cannot find peers" , "error" , err )
utils . Logger ( ) . Debug ( ) . Msg ( "[SYNC] GetPeersFromDNS cannot find peers" )
return nil
}
port := syncing . GetSyncingPort ( node . SelfPeer . Port )
@ -114,24 +114,22 @@ func (node *Node) DoBeaconSyncing() {
func ( node * Node ) DoSyncing ( bc * core . BlockChain , worker * worker . Worker , getPeers func ( ) [ ] p2p . Peer , willJoinConsensus bool ) {
ticker := time . NewTicker ( SyncFrequency * time . Second )
logger := utils . GetLogInstance ( )
getLogger := func ( ) log . Logger { return utils . WithCallerSkip ( logger , 1 ) }
SyncingLoop :
for {
select {
case <- ticker . C :
if node . stateSync == nil {
node . stateSync = syncing . CreateStateSync ( node . SelfPeer . IP , node . SelfPeer . Port , node . GetSyncID ( ) )
logger = logger . New ( "syncID" , node . GetSyncID ( ) )
get Logger( ) . Debug ( "[SYNC] initialized state sync" )
utils . Logger ( ) . Debug ( ) . Ms g ( "[SYNC] initialized state sync" )
}
if node . stateSync . GetActivePeerNumber ( ) < MinConnectedPeers {
peers := getPeers ( )
if err := node . stateSync . CreateSyncConfig ( peers , false ) ; err != nil {
get Logger( ) . Debug ( "[SYNC] create peers error" , "error" , err )
utils . Logger ( ) . Debug ( ) . Msg ( "[SYNC] create peers error" )
continue SyncingLoop
}
get Logger( ) . Debug ( "[SYNC] Get Active Peers" , "len" , node . stateSync . GetActivePeerNumber ( ) )
utils . Logger ( ) . Debug ( ) . Int ( "len" , node . stateSync . GetActivePeerNumber ( ) ) . Msg ( "[SYNC] Get Active Peers" )
}
if node . stateSync . IsOutOfSync ( bc ) {
node . stateMutex . Lock ( )
@ -192,7 +190,7 @@ func (node *Node) InitSyncingServer() {
// StartSyncingServer starts syncing server.
func ( node * Node ) StartSyncingServer ( ) {
utils . GetLogInstance ( ) . Info ( "[SYNC] support_syncing: StartSyncingServer" )
utils . Logger ( ) . Info ( ) . Msg ( "[SYNC] support_syncing: StartSyncingServer" )
if node . downloaderServer . GrpcServer == nil {
node . downloaderServer . Start ( node . SelfPeer . IP , syncing . GetSyncingPort ( node . SelfPeer . Port ) )
}
@ -204,7 +202,7 @@ func (node *Node) SendNewBlockToUnsync() {
block := <- node . Consensus . VerifiedNewBlock
blockHash , err := rlp . EncodeToBytes ( block )
if err != nil {
utils . GetLogInstance ( ) . Warn ( "[SYNC] unable to encode block to hashes" )
utils . Logger ( ) . Warn ( ) . Msg ( "[SYNC] unable to encode block to hashes" )
continue
}
@ -212,7 +210,7 @@ func (node *Node) SendNewBlockToUnsync() {
for peerID , config := range node . peerRegistrationRecord {
elapseTime := time . Now ( ) . UnixNano ( ) - config . timestamp
if elapseTime > broadcastTimeout {
utils . GetLogInstance ( ) . Warn ( "[SYNC] SendNewBlockToUnsync to peer timeout" , "peerID" , peerID )
utils . Logger ( ) . Warn ( ) . Str ( "peerID" , peerID ) . Msg ( "[SYNC] SendNewBlockToUnsync to peer timeout" )
node . peerRegistrationRecord [ peerID ] . client . Close ( )
delete ( node . peerRegistrationRecord , peerID )
continue
@ -248,7 +246,14 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
startHeight := startBlock . NumberU64 ( )
endHeight := node . Blockchain ( ) . CurrentBlock ( ) . NumberU64 ( )
if startHeight >= endHeight {
utils . GetLogInstance ( ) . Debug ( "[SYNC] GetBlockHashes Request: I am not higher than requested node" , "myHeight" , endHeight , "requestHeight" , startHeight , "incomingIP" , request . Ip , "incomingPort" , request . Port , "incomingPeer" , incomingPeer )
utils . Logger ( ) .
Debug ( ) .
Uint64 ( "myHeight" , endHeight ) .
Uint64 ( "requestHeight" , startHeight ) .
Str ( "incomingIP" , request . Ip ) .
Str ( "incomingPort" , request . Port ) .
Str ( "incomingPeer" , incomingPeer ) .
Msg ( "[SYNC] GetBlockHashes Request: I am not higher than requested node" )
return response , nil
}
@ -281,14 +286,16 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
// this is the out of sync node acts as grpc server side
case downloader_pb . DownloaderRequest_NEWBLOCK :
if node . State != NodeNotInSync {
utils . GetLogInstance ( ) . Debug ( "[SYNC] new block received, but state is" , "state" , node . State . String ( ) )
utils . Logger ( ) . Debug ( ) .
Str ( "state" , node . State . String ( ) ) .
Msg ( "[SYNC] new block received, but state is" )
response . Type = downloader_pb . DownloaderResponse_INSYNC
return response , nil
}
var blockObj types . Block
err := rlp . DecodeBytes ( request . BlockHash , & blockObj )
if err != nil {
utils . GetLogInstance ( ) . Warn ( "[SYNC] unable to decode received new block" )
utils . Logger ( ) . Warn ( ) . Msg ( "[SYNC] unable to decode received new block" )
return response , err
}
node . stateSync . AddNewBlock ( request . PeerHash , & blockObj )
@ -301,7 +308,10 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
defer node . stateMutex . Unlock ( )
if _ , ok := node . peerRegistrationRecord [ peerID ] ; ok {
response . Type = downloader_pb . DownloaderResponse_FAIL
utils . GetLogInstance ( ) . Warn ( "[SYNC] peerRegistration record already exists" , "ip" , ip , "port" , port )
utils . Logger ( ) . Warn ( ) .
Interface ( "ip" , ip ) .
Interface ( "port" , port ) .
Msg ( "[SYNC] peerRegistration record already exists" )
return response , nil
} else if len ( node . peerRegistrationRecord ) >= maxBroadcastNodes {
response . Type = downloader_pb . DownloaderResponse_FAIL
@ -312,19 +322,27 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
syncPort := syncing . GetSyncingPort ( port )
client := downloader . ClientSetup ( ip , syncPort )
if client == nil {
utils . GetLogInstance ( ) . Warn ( "[SYNC] unable to setup client for peerID" , "ip" , ip , "port" , port )
utils . Logger ( ) . Warn ( ) .
Str ( "ip" , ip ) .
Str ( "port" , port ) .
Msg ( "[SYNC] unable to setup client for peerID" )
return response , nil
}
config := & syncConfig { timestamp : time . Now ( ) . UnixNano ( ) , client : client }
node . peerRegistrationRecord [ peerID ] = config
utils . GetLogInstance ( ) . Debug ( "[SYNC] register peerID success" , "ip" , ip , "port" , port )
utils . Logger ( ) . Debug ( ) .
Str ( "ip" , ip ) .
Str ( "port" , port ) .
Msg ( "[SYNC] register peerID success" )
response . Type = downloader_pb . DownloaderResponse_SUCCESS
}
case downloader_pb . DownloaderRequest_REGISTERTIMEOUT :
if node . State == NodeNotInSync {
count := node . stateSync . RegisterNodeInfo ( )
utils . GetLogInstance ( ) . Debug ( "[SYNC] extra node registered" , "number" , count )
utils . Logger ( ) . Debug ( ) .
Int ( "number" , count ) .
Msg ( "[SYNC] extra node registered" )
}
}
return response , nil