@ -16,7 +16,6 @@ import (
"github.com/Workiva/go-datastructures/queue"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/api/service/syncing/downloader"
pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto"
@ -48,9 +47,6 @@ func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client {
return peerConfig . client
}
// Log is the temporary log for syncing.
var Log = log . New ( )
// SyncBlockTask is the task struct to sync a specific block.
type SyncBlockTask struct {
index int
@ -107,7 +103,7 @@ func GetServicePort(nodePort string) string {
if port , err := strconv . Atoi ( nodePort ) ; err == nil {
return fmt . Sprintf ( "%d" , port + SyncingPortDifference )
}
Log . Warn ( "unable to get service port" )
utils . Get LogInstance ( ) . Warn ( "unable to get service port" )
return ""
}
@ -123,7 +119,7 @@ func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) {
pc . mux . Lock ( )
pc . newBlocks = append ( pc . newBlocks , block )
pc . mux . Unlock ( )
Log . Debug ( "[SYNC] new block received" , "total" , len ( ss . syncConfig . peers [ i ] . newBlocks ) , "blockHeight" , block . NumberU64 ( ) )
utils . Get LogInstance ( ) . Debug ( "[SYNC] new block received" , "total" , len ( ss . syncConfig . peers [ i ] . newBlocks ) , "blockHeight" , block . NumberU64 ( ) )
}
}
@ -165,8 +161,8 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
// CreateSyncConfig creates SyncConfig for StateSync object.
func ( ss * StateSync ) CreateSyncConfig ( peers [ ] p2p . Peer ) {
Log . Debug ( "CreateSyncConfig: len of peers" , "len" , len ( peers ) )
Log . Debug ( "CreateSyncConfig: len of peers" , "peers" , peers )
utils . Get LogInstance ( ) . Debug ( "CreateSyncConfig: len of peers" , "len" , len ( peers ) )
utils . Get LogInstance ( ) . Debug ( "CreateSyncConfig: len of peers" , "peers" , peers )
ss . peerNumber = len ( peers )
ss . syncConfig = & SyncConfig {
peers : make ( [ ] * SyncPeerConfig , ss . peerNumber ) ,
@ -176,9 +172,9 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) {
ip : peers [ id ] . IP ,
port : peers [ id ] . Port ,
}
Log . Debug ( "[SYNC] CreateSyncConfig: peer port to connect" , "port" , peers [ id ] . Port )
utils . Get LogInstance ( ) . Debug ( "[SYNC] CreateSyncConfig: peer port to connect" , "port" , peers [ id ] . Port )
}
Log . Info ( "[SYNC] syncing: Finished creating SyncConfig." )
utils . Get LogInstance ( ) . Info ( "[SYNC] syncing: Finished creating SyncConfig." )
}
// MakeConnectionToPeers makes grpc connection to all peers.
@ -194,7 +190,7 @@ func (ss *StateSync) MakeConnectionToPeers() {
}
wg . Wait ( )
ss . CleanUpNilPeers ( )
Log . Info ( "syncing: Finished making connection to peers." )
utils . Get LogInstance ( ) . Info ( "syncing: Finished making connection to peers." )
}
// CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber.
@ -293,13 +289,13 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
break
}
if count > TimesToFail {
Log . Info ( "GetConsensusHashes: reached # of times to failed" )
utils . Get LogInstance ( ) . Info ( "GetConsensusHashes: reached # of times to failed" )
return false
}
count ++
time . Sleep ( SleepTimeAfterNonConsensusBlockHashes )
}
Log . Info ( "syncing: Finished getting consensus block hashes." )
utils . Get LogInstance ( ) . Info ( "syncing: Finished getting consensus block hashes." )
return true
}
@ -313,7 +309,7 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
break
}
}
Log . Info ( "syncing: Finished generateStateSyncTaskQueue" , "length" , ss . stateSyncTaskQueue . Len ( ) )
utils . Get LogInstance ( ) . Info ( "syncing: Finished generateStateSyncTaskQueue" , "length" , ss . stateSyncTaskQueue . Len ( ) )
}
// downloadBlocks downloads blocks from state sync task queue.
@ -331,7 +327,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
for ! stateSyncTaskQueue . Empty ( ) {
task , err := ss . stateSyncTaskQueue . Poll ( 1 , time . Millisecond )
if err == queue . ErrTimeout {
Log . Debug ( "[SYNC] ss.stateSyncTaskQueue poll timeout" , "error" , err )
utils . Get LogInstance ( ) . Debug ( "[SYNC] ss.stateSyncTaskQueue poll timeout" , "error" , err )
break
}
syncTask := task [ 0 ] . ( SyncBlockTask )
@ -339,7 +335,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
payload , err := peerConfig . GetBlocks ( [ ] [ ] byte { syncTask . blockHash } )
if err != nil {
count ++
Log . Debug ( "[SYNC] GetBlocks failed" , "failNumber" , count )
utils . Get LogInstance ( ) . Debug ( "[SYNC] GetBlocks failed" , "failNumber" , count )
if count > TimesToFail {
break
}
@ -352,7 +348,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
err = rlp . DecodeBytes ( payload [ 0 ] , & blockObj )
if err != nil {
count ++
Log . Debug ( "[SYNC] downloadBlocks: failed to DecodeBytes from received new block" )
utils . Get LogInstance ( ) . Debug ( "[SYNC] downloadBlocks: failed to DecodeBytes from received new block" )
if count > TimesToFail {
break
}
@ -366,7 +362,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
} ( ss . syncConfig . peers [ i ] , ss . stateSyncTaskQueue , bc )
}
wg . Wait ( )
Log . Info ( "[SYNC] Finished downloadBlocks." )
utils . Get LogInstance ( ) . Info ( "[SYNC] Finished downloadBlocks." )
}
// CompareBlockByHash compares two block by hash, it will be used in sort the blocks
@ -420,7 +416,7 @@ func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash)
return CompareBlockByHash ( candidateBlocks [ i ] , candidateBlocks [ j ] ) == - 1
} )
maxFirstID , maxCount := GetHowManyMaxConsensus ( candidateBlocks )
Log . Debug ( "[SYNC] Find block with matching parenthash" , "parentHash" , parentHash , "hash" , candidateBlocks [ maxFirstID ] . Hash ( ) , "maxCount" , maxCount )
utils . Get LogInstance ( ) . Debug ( "[SYNC] Find block with matching parenthash" , "parentHash" , parentHash , "hash" , candidateBlocks [ maxFirstID ] . Hash ( ) , "maxCount" , maxCount )
return candidateBlocks [ maxFirstID ]
}
@ -445,13 +441,13 @@ func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Ha
}
func ( ss * StateSync ) updateBlockAndStatus ( block * types . Block , bc * core . BlockChain , worker * worker . Worker ) bool {
Log . Info ( "[SYNC] Current Block" , "blockHex" , bc . CurrentBlock ( ) . Hash ( ) . Hex ( ) )
utils . Get LogInstance ( ) . Info ( "[SYNC] Current Block" , "blockHex" , bc . CurrentBlock ( ) . Hash ( ) . Hex ( ) )
_ , err := bc . InsertChain ( [ ] * types . Block { block } )
if err != nil {
Log . Debug ( "Error adding new block to blockchain" , "Error" , err )
utils . Get LogInstance ( ) . Debug ( "Error adding new block to blockchain" , "Error" , err )
return false
}
Log . Info ( "[SYNC] new block added to blockchain" , "blockHeight" , bc . CurrentBlock ( ) . NumberU64 ( ) , "blockHex" , bc . CurrentBlock ( ) . Hash ( ) . Hex ( ) , "parentHex" , bc . CurrentBlock ( ) . ParentHash ( ) . Hex ( ) )
utils . Get LogInstance ( ) . Info ( "[SYNC] new block added to blockchain" , "blockHeight" , bc . CurrentBlock ( ) . NumberU64 ( ) , "blockHex" , bc . CurrentBlock ( ) . Hash ( ) . Hex ( ) , "parentHex" , bc . CurrentBlock ( ) . ParentHash ( ) . Hex ( ) )
ss . syncMux . Lock ( )
worker . UpdateCurrent ( )
ss . syncMux . Unlock ( )
@ -516,10 +512,10 @@ func (ss *StateSync) StartStateSync(startHash []byte, bc *core.BlockChain, worke
ss . RegisterNodeInfo ( )
// Gets consensus hashes.
if ! ss . GetConsensusHashes ( startHash ) {
Log . Debug ( "[SYNC] StartStateSync unable to reach consensus on ss.GetConsensusHashes" )
utils . Get LogInstance ( ) . Debug ( "[SYNC] StartStateSync unable to reach consensus on ss.GetConsensusHashes" )
return
}
Log . Debug ( "[SYNC] StartStateSync reach consensus on ss.GetConsensusHashes" )
utils . Get LogInstance ( ) . Debug ( "[SYNC] StartStateSync reach consensus on ss.GetConsensusHashes" )
ss . generateStateSyncTaskQueue ( bc )
// Download blocks.
if ss . stateSyncTaskQueue . Len ( ) > 0 {
@ -543,7 +539,7 @@ func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte) error {
func ( ss * StateSync ) RegisterNodeInfo ( ) int {
ss . CleanUpNilPeers ( )
registrationNumber := RegistrationNumber
Log . Debug ( "[SYNC] node registration to peers" , "registrationNumber" , registrationNumber , "activePeerNumber" , ss . activePeerNumber )
utils . Get LogInstance ( ) . Debug ( "[SYNC] node registration to peers" , "registrationNumber" , registrationNumber , "activePeerNumber" , ss . activePeerNumber )
peerID := utils . GetUniqueIDFromIPPort ( ss . selfip , ss . selfport )
peerHash := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( peerHash [ : ] , peerID )
@ -559,10 +555,10 @@ func (ss *StateSync) RegisterNodeInfo() int {
}
err := peerConfig . registerToBroadcast ( peerHash )
if err != nil {
Log . Debug ( "[SYNC] register failed to peer" , "ip" , peerConfig . ip , "port" , peerConfig . port , "peerHash" , peerHash )
utils . Get LogInstance ( ) . Debug ( "[SYNC] register failed to peer" , "ip" , peerConfig . ip , "port" , peerConfig . port , "peerHash" , peerHash )
continue
}
Log . Debug ( "[SYNC] register success" , "ip" , peerConfig . ip , "port" , peerConfig . port )
utils . Get LogInstance ( ) . Debug ( "[SYNC] register success" , "ip" , peerConfig . ip , "port" , peerConfig . port )
count ++
}
return count