@ -26,13 +26,15 @@ import (
// Constants for syncing.
// Constants for syncing.
const (
const (
TimesToFail = 5 // Downloadblocks service retry limit
downloadBlocksRetryLimit = 5 // downloadBlocks service retry limit
RegistrationNumber = 3
TimesToFail = 5 // downloadBlocks service retry limit
SyncingPortDifference = 3000
RegistrationNumber = 3
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncingPortDifference = 3000
BatchSize uint32 = 1000 //maximum size for one query of block hashes
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncLoopFrequency = 1 // unit in second
SyncLoopBatchSize uint32 = 1000 // maximum size for one query of block hashes
LastMileBlocksSize = 10
verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size
SyncLoopFrequency = 1 // unit in second
LastMileBlocksSize = 10
)
)
// SyncPeerConfig is peer config to sync.
// SyncPeerConfig is peer config to sync.
@ -333,26 +335,27 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() {
sc . cleanUpPeers ( maxFirstID )
sc . cleanUpPeers ( maxFirstID )
}
}
// G etConsensusHashes gets all hashes needed to download.
// g etConsensusHashes gets all hashes needed to download.
func ( ss * StateSync ) G etConsensusHashes( startHash [ ] byte , size uint32 ) {
func ( ss * StateSync ) g etConsensusHashes( startHash [ ] byte , size uint32 ) {
var wg sync . WaitGroup
var wg sync . WaitGroup
ss . syncConfig . ForEachPeer ( func ( peerConfig * SyncPeerConfig ) ( brk bool ) {
ss . syncConfig . ForEachPeer ( func ( peerConfig * SyncPeerConfig ) ( brk bool ) {
wg . Add ( 1 )
wg . Add ( 1 )
go func ( ) {
go func ( ) {
defer wg . Done ( )
defer wg . Done ( )
response := peerConfig . client . GetBlockHashes ( startHash , size , ss . selfip , ss . selfport )
response := peerConfig . client . GetBlockHashes ( startHash , size , ss . selfip , ss . selfport )
if response == nil {
if response == nil {
utils . Logger ( ) . Warn ( ) .
utils . Logger ( ) . Warn ( ) .
Str ( "peerIP" , peerConfig . ip ) .
Str ( "peerIP" , peerConfig . ip ) .
Str ( "peerPort" , peerConfig . port ) .
Str ( "peerPort" , peerConfig . port ) .
Msg ( "[SYNC] G etConsensusHashes Nil Response" )
Msg ( "[SYNC] g etConsensusHashes Nil Response" )
return
return
}
}
if len ( response . Payload ) > int ( size + 1 ) {
if len ( response . Payload ) > int ( size + 1 ) {
utils . Logger ( ) . Warn ( ) .
utils . Logger ( ) . Warn ( ) .
Uint32 ( "requestSize" , size ) .
Uint32 ( "requestSize" , size ) .
Int ( "respondSize" , len ( response . Payload ) ) .
Int ( "respondSize" , len ( response . Payload ) ) .
Msg ( "[SYNC] G etConsensusHashes: receive more blockHahses than request!" )
Msg ( "[SYNC] g etConsensusHashes: receive more blockHahses than request!" )
peerConfig . blockHashes = response . Payload [ : size + 1 ]
peerConfig . blockHashes = response . Payload [ : size + 1 ]
} else {
} else {
peerConfig . blockHashes = response . Payload
peerConfig . blockHashes = response . Payload
@ -404,7 +407,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
if err != nil || len ( payload ) == 0 {
if err != nil || len ( payload ) == 0 {
count ++
count ++
utils . Logger ( ) . Error ( ) . Err ( err ) . Int ( "failNumber" , count ) . Msg ( "[SYNC] downloadBlocks: GetBlocks failed" )
utils . Logger ( ) . Error ( ) . Err ( err ) . Int ( "failNumber" , count ) . Msg ( "[SYNC] downloadBlocks: GetBlocks failed" )
if count > TimesToFail {
if count > downloadBlocksRetryLimit {
break
break
}
}
if err := ss . stateSyncTaskQueue . Put ( syncTask ) ; err != nil {
if err := ss . stateSyncTaskQueue . Put ( syncTask ) ; err != nil {
@ -424,7 +427,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
if err != nil {
if err != nil {
count ++
count ++
utils . Logger ( ) . Error ( ) . Err ( err ) . Msg ( "[SYNC] downloadBlocks: failed to DecodeBytes from received new block" )
utils . Logger ( ) . Error ( ) . Err ( err ) . Msg ( "[SYNC] downloadBlocks: failed to DecodeBytes from received new block" )
if count > TimesToFail {
if count > downloadBlocksRetryLimit {
break
break
}
}
if err := ss . stateSyncTaskQueue . Put ( syncTask ) ; err != nil {
if err := ss . stateSyncTaskQueue . Put ( syncTask ) ; err != nil {
@ -527,50 +530,55 @@ func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Ha
return nil
return nil
}
}
func ( ss * StateSync ) updateBlockAndStatus ( block * types . Block , bc * core . BlockChain , worker * worker . Worker ) bool {
func ( ss * StateSync ) updateBlockAndStatus ( block * types . Block , bc * core . BlockChain , worker * worker . Worker ) error {
utils . Logger ( ) . Info ( ) . Str ( "blockHex" , bc . CurrentBlock ( ) . Hash ( ) . Hex ( ) ) . Msg ( "[SYNC] Current Block" )
utils . Logger ( ) . Info ( ) . Str ( "blockHex" , bc . CurrentBlock ( ) . Hash ( ) . Hex ( ) ) . Msg ( "[SYNC] updateBlockAndStatus: Current Block" )
// Verify block signatures
// Verify block signatures
if block . NumberU64 ( ) > 1 {
if block . NumberU64 ( ) > 1 {
// Verify signature every 100 blocks
// Verify signature every 100 blocks
verifySig := block . NumberU64 ( ) % 100 == 0
verifySig := block . NumberU64 ( ) % verifyHeaderBatchSize == 0
err := bc . Engine ( ) . VerifyHeader ( bc , block . Header ( ) , verifySig )
err := bc . Engine ( ) . VerifyHeader ( bc , block . Header ( ) , verifySig )
if err != nil {
if err != nil {
utils . Logger ( ) . Error ( ) . Err ( err ) . Msgf ( "[SYNC] failed verifying signatures for new block %d" , block . NumberU64 ( ) )
utils . Logger ( ) . Error ( ) . Err ( err ) . Msgf ( "[SYNC] updateBlockAndStatus: failed verifying signatures for new block %d" , block . NumberU64 ( ) )
utils . Logger ( ) . Debug ( ) . Interface ( "block" , bc . CurrentBlock ( ) ) . Msg ( "[SYNC] Rolling back last 99 blocks!" )
for i := 0 ; i < 99 ; i ++ {
utils . Logger ( ) . Debug ( ) . Interface ( "block" , bc . CurrentBlock ( ) ) . Msg ( "[SYNC] updateBlockAndStatus: Rolling back last 99 blocks!" )
bc . Rollback ( [ ] common . Hash { bc . CurrentBlock ( ) . Hash ( ) } )
var hashes [ ] common . Hash
for i := uint64 ( 0 ) ; i < verifyHeaderBatchSize - 1 ; i ++ {
hashes = append ( hashes , bc . CurrentBlock ( ) . Hash ( ) )
}
}
return false
bc . Rollback ( hashes )
return err
}
}
}
}
_ , err := bc . InsertChain ( [ ] * types . Block { block } , false /* verifyHeaders */ )
_ , err := bc . InsertChain ( [ ] * types . Block { block } , false /* verifyHeaders */ )
if err != nil {
if err != nil {
utils . Logger ( ) . Error ( ) . Err ( err ) . Msgf ( "[SYNC] Error adding new block to blockchain %d %d" , block . NumberU64 ( ) , block . ShardID ( ) )
utils . Logger ( ) . Error ( ) . Err ( err ) . Msgf ( "[SYNC] updateBlockAndStatus: Error adding new block to blockchain %d %d" , block . NumberU64 ( ) , block . ShardID ( ) )
utils . Logger ( ) . Debug ( ) . Interface ( "block" , bc . CurrentBlock ( ) ) . Msg ( "[SYNC] Rolling back current block!" )
utils . Logger ( ) . Debug ( ) . Interface ( "block" , bc . CurrentBlock ( ) ) . Msg ( "[SYNC] updateBlockAndStatus: Rolling back current block!" )
bc . Rollback ( [ ] common . Hash { bc . CurrentBlock ( ) . Hash ( ) } )
bc . Rollback ( [ ] common . Hash { bc . CurrentBlock ( ) . Hash ( ) } )
return false
return err
}
}
utils . Logger ( ) . Info ( ) .
utils . Logger ( ) . Info ( ) .
Uint64 ( "blockHeight" , bc . CurrentBlock ( ) . NumberU64 ( ) ) .
Uint64 ( "blockHeight" , bc . CurrentBlock ( ) . NumberU64 ( ) ) .
Str ( "blockHex" , bc . CurrentBlock ( ) . Hash ( ) . Hex ( ) ) .
Str ( "blockHex" , bc . CurrentBlock ( ) . Hash ( ) . Hex ( ) ) .
Msg ( "[SYNC] new block added to blockchain" )
Msg ( "[SYNC] updateBlockAndStatus: new block added to blockchain" )
return true
return nil
}
}
// generateNewState will construct most recent state from downloaded blocks
// generateNewState will construct most recent state from downloaded blocks
func ( ss * StateSync ) generateNewState ( bc * core . BlockChain , worker * worker . Worker ) {
func ( ss * StateSync ) generateNewState ( bc * core . BlockChain , worker * worker . Worker ) error {
// update blocks created before node start sync
// update blocks created before node start sync
parentHash := bc . CurrentBlock ( ) . Hash ( )
parentHash := bc . CurrentBlock ( ) . Hash ( )
var err error
for {
for {
block := ss . getBlockFromOldBlocksByParentHash ( parentHash )
block := ss . getBlockFromOldBlocksByParentHash ( parentHash )
if block == nil {
if block == nil {
break
break
}
}
ok : = ss . updateBlockAndStatus ( block , bc , worker )
err = ss . updateBlockAndStatus ( block , bc , worker )
if ! ok {
if err != nil {
break
break
}
}
parentHash = block . Hash ( )
parentHash = block . Hash ( )
@ -586,8 +594,8 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
if block == nil {
if block == nil {
break
break
}
}
ok : = ss . updateBlockAndStatus ( block , bc , worker )
err = ss . updateBlockAndStatus ( block , bc , worker )
if ! ok {
if err != nil {
break
break
}
}
parentHash = block . Hash ( )
parentHash = block . Hash ( )
@ -607,25 +615,26 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
if block == nil {
if block == nil {
break
break
}
}
ok : = ss . updateBlockAndStatus ( block , bc , worker )
err = ss . updateBlockAndStatus ( block , bc , worker )
if ! ok {
if err != nil {
break
break
}
}
parentHash = block . Hash ( )
parentHash = block . Hash ( )
}
}
return err
}
}
// ProcessStateSync processes state sync from the blocks received but not yet processed so far
// ProcessStateSync processes state sync from the blocks received but not yet processed so far
// TODO: return error
func ( ss * StateSync ) ProcessStateSync ( startHash [ ] byte , size uint32 , bc * core . BlockChain , worker * worker . Worker ) error {
func ( ss * StateSync ) ProcessStateSync ( startHash [ ] byte , size uint32 , bc * core . BlockChain , worker * worker . Worker ) {
// Gets consensus hashes.
// Gets consensus hashes.
ss . G etConsensusHashes( startHash , size )
ss . g etConsensusHashes( startHash , size )
ss . generateStateSyncTaskQueue ( bc )
ss . generateStateSyncTaskQueue ( bc )
// Download blocks.
// Download blocks.
if ss . stateSyncTaskQueue . Len ( ) > 0 {
if ss . stateSyncTaskQueue . Len ( ) > 0 {
ss . downloadBlocks ( bc )
ss . downloadBlocks ( bc )
}
}
ss . generateNewState ( bc , worker )
return ss . generateNewState ( bc , worker )
}
}
func ( peerConfig * SyncPeerConfig ) registerToBroadcast ( peerHash [ ] byte , ip , port string ) error {
func ( peerConfig * SyncPeerConfig ) registerToBroadcast ( peerHash [ ] byte , ip , port string ) error {
@ -738,17 +747,28 @@ Loop:
currentHeight := bc . CurrentBlock ( ) . NumberU64 ( )
currentHeight := bc . CurrentBlock ( ) . NumberU64 ( )
if currentHeight >= otherHeight {
if currentHeight >= otherHeight {
utils . Logger ( ) . Info ( ) . Msgf ( "[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)" , isBeacon , bc . ShardID ( ) , otherHeight , currentHeight )
utils . Logger ( ) . Info ( ) .
Msgf ( "[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)" ,
isBeacon , bc . ShardID ( ) , otherHeight , currentHeight )
break Loop
break Loop
} else {
} else {
utils . Logger ( ) . Debug ( ) . Msgf ( "[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)" , isBeacon , bc . ShardID ( ) , otherHeight , currentHeight )
utils . Logger ( ) . Debug ( ) .
Msgf ( "[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)" ,
isBeacon , bc . ShardID ( ) , otherHeight , currentHeight )
}
}
startHash := bc . CurrentBlock ( ) . Hash ( )
startHash := bc . CurrentBlock ( ) . Hash ( )
size := uint32 ( otherHeight - currentHeight )
size := uint32 ( otherHeight - currentHeight )
if size > BatchSize {
if size > SyncLoopBatchSize {
size = BatchSize
size = SyncLoopBatchSize
}
err := ss . ProcessStateSync ( startHash [ : ] , size , bc , worker )
if err != nil {
utils . Logger ( ) . Error ( ) . Err ( err ) .
Msgf ( "[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)" ,
isBeacon , bc . ShardID ( ) , otherHeight , currentHeight )
// should we still call UpdateConsensusInformation() upon state sync failure?
// how to handle error here?
}
}
ss . ProcessStateSync ( startHash [ : ] , size , bc , worker )
ss . purgeOldBlocksFromCache ( )
ss . purgeOldBlocksFromCache ( )
if consensus != nil {
if consensus != nil {
consensus . UpdateConsensusInformation ( )
consensus . UpdateConsensusInformation ( )