Make the ProcessStateSync function return an error

pull/1789/head
Dennis Won 5 years ago
parent 6f0e40c454
commit e75b4a49e2
  1. 12
      api/service/syncing/errors.go
  2. 104
      api/service/syncing/syncing.go
  3. 2
      node/node_syncing.go

@ -4,7 +4,13 @@ import "errors"
// Sentinel errors of the syncing service. Every message starts with the
// "[SYNC]: " prefix and each sentinel has its own distinct text, so a logged
// error identifies the step that failed.
var (
	// ErrRegistrationFail reports that registration failed.
	ErrRegistrationFail = errors.New("[SYNC]: registration failed")
	// ErrGetBlock reports that getting a block failed.
	ErrGetBlock = errors.New("[SYNC]: get block failed")
	// ErrGetBlockHash reports that getting a block hash failed.
	ErrGetBlockHash = errors.New("[SYNC]: get blockhash failed")
	// ErrProcessStateSync reports that processing the state sync failed.
	// Its message used to duplicate ErrGetBlockHash's text.
	ErrProcessStateSync = errors.New("[SYNC]: process state sync failed")
	// ErrGetConsensusHashes reports that getting the consensus hashes failed.
	ErrGetConsensusHashes = errors.New("[SYNC]: get consensus hashes failed")
	// ErrGenStateSyncTaskQueue reports that generating the state sync task
	// queue failed.
	ErrGenStateSyncTaskQueue = errors.New("[SYNC]: generate state sync task queue failed")
	// ErrDownloadBlocks reports that downloading blocks failed.
	ErrDownloadBlocks = errors.New("[SYNC]: download blocks failed")
	// ErrUpdateBlockAndStatus reports that updating block and status failed.
	ErrUpdateBlockAndStatus = errors.New("[SYNC]: update block and status failed")
	// ErrGenerateNewState reports that generating the new state failed.
	ErrGenerateNewState = errors.New("[SYNC]: generate new state failed")
)

@ -26,13 +26,15 @@ import (
// Constants for syncing.
const (
	// downloadBlocksRetryLimit is the downloadBlocks service retry limit.
	downloadBlocksRetryLimit = 5
	// TimesToFail is the exported name of the same retry limit. It is defined
	// in terms of downloadBlocksRetryLimit so the two values cannot drift.
	TimesToFail = downloadBlocksRetryLimit
	// RegistrationNumber is a registration-related count.
	// NOTE(review): exact meaning is not visible here — confirm at the use site.
	RegistrationNumber = 3
	// SyncingPortDifference is the offset applied to derive the syncing port.
	// NOTE(review): presumably added to the node's base port — confirm.
	SyncingPortDifference = 3000
	// inSyncThreshold: when peerBlockHeight - myBlockHeight <= inSyncThreshold,
	// it's ready to join consensus.
	inSyncThreshold = 0
	// SyncLoopBatchSize is the maximum size for one query of block hashes.
	SyncLoopBatchSize uint32 = 1000
	// verifyHeaderBatchSize is the block chain header verification batch size.
	verifyHeaderBatchSize uint64 = 100
	// SyncLoopFrequency is the sync loop period; unit in second.
	SyncLoopFrequency = 1
	// LastMileBlocksSize is the size used for last mile blocks.
	// NOTE(review): presumably a cap on buffered last-mile blocks — confirm.
	LastMileBlocksSize = 10
)
// SyncPeerConfig is peer config to sync. // SyncPeerConfig is peer config to sync.
@ -333,26 +335,27 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() {
sc.cleanUpPeers(maxFirstID) sc.cleanUpPeers(maxFirstID)
} }
// GetConsensusHashes gets all hashes needed to download. // getConsensusHashes gets all hashes needed to download.
func (ss *StateSync) GetConsensusHashes(startHash []byte, size uint32) { func (ss *StateSync) getConsensusHashes(startHash []byte, size uint32) {
var wg sync.WaitGroup var wg sync.WaitGroup
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) { ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
response := peerConfig.client.GetBlockHashes(startHash, size, ss.selfip, ss.selfport) response := peerConfig.client.GetBlockHashes(startHash, size, ss.selfip, ss.selfport)
if response == nil { if response == nil {
utils.Logger().Warn(). utils.Logger().Warn().
Str("peerIP", peerConfig.ip). Str("peerIP", peerConfig.ip).
Str("peerPort", peerConfig.port). Str("peerPort", peerConfig.port).
Msg("[SYNC] GetConsensusHashes Nil Response") Msg("[SYNC] getConsensusHashes Nil Response")
return return
} }
if len(response.Payload) > int(size+1) { if len(response.Payload) > int(size+1) {
utils.Logger().Warn(). utils.Logger().Warn().
Uint32("requestSize", size). Uint32("requestSize", size).
Int("respondSize", len(response.Payload)). Int("respondSize", len(response.Payload)).
Msg("[SYNC] GetConsensusHashes: receive more blockHahses than request!") Msg("[SYNC] getConsensusHashes: receive more blockHahses than request!")
peerConfig.blockHashes = response.Payload[:size+1] peerConfig.blockHashes = response.Payload[:size+1]
} else { } else {
peerConfig.blockHashes = response.Payload peerConfig.blockHashes = response.Payload
@ -404,7 +407,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
if err != nil || len(payload) == 0 { if err != nil || len(payload) == 0 {
count++ count++
utils.Logger().Error().Err(err).Int("failNumber", count).Msg("[SYNC] downloadBlocks: GetBlocks failed") utils.Logger().Error().Err(err).Int("failNumber", count).Msg("[SYNC] downloadBlocks: GetBlocks failed")
if count > TimesToFail { if count > downloadBlocksRetryLimit {
break break
} }
if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil { if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
@ -424,7 +427,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
if err != nil { if err != nil {
count++ count++
utils.Logger().Error().Err(err).Msg("[SYNC] downloadBlocks: failed to DecodeBytes from received new block") utils.Logger().Error().Err(err).Msg("[SYNC] downloadBlocks: failed to DecodeBytes from received new block")
if count > TimesToFail { if count > downloadBlocksRetryLimit {
break break
} }
if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil { if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
@ -527,50 +530,55 @@ func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Ha
return nil return nil
} }
func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChain, worker *worker.Worker) bool { func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChain, worker *worker.Worker) error {
utils.Logger().Info().Str("blockHex", bc.CurrentBlock().Hash().Hex()).Msg("[SYNC] Current Block") utils.Logger().Info().Str("blockHex", bc.CurrentBlock().Hash().Hex()).Msg("[SYNC] updateBlockAndStatus: Current Block")
// Verify block signatures // Verify block signatures
if block.NumberU64() > 1 { if block.NumberU64() > 1 {
// Verify signature every 100 blocks // Verify signature every 100 blocks
verifySig := block.NumberU64()%100 == 0 verifySig := block.NumberU64()%verifyHeaderBatchSize == 0
err := bc.Engine().VerifyHeader(bc, block.Header(), verifySig) err := bc.Engine().VerifyHeader(bc, block.Header(), verifySig)
if err != nil { if err != nil {
utils.Logger().Error().Err(err).Msgf("[SYNC] failed verifying signatures for new block %d", block.NumberU64()) utils.Logger().Error().Err(err).Msgf("[SYNC] updateBlockAndStatus: failed verifying signatures for new block %d", block.NumberU64())
utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] Rolling back last 99 blocks!")
for i := 0; i < 99; i++ { utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] updateBlockAndStatus: Rolling back last 99 blocks!")
bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()}) var hashes []common.Hash
for i := uint64(0); i < verifyHeaderBatchSize-1; i++ {
hashes = append(hashes, bc.CurrentBlock().Hash())
} }
return false bc.Rollback(hashes)
return err
} }
} }
_, err := bc.InsertChain([]*types.Block{block}, false /* verifyHeaders */) _, err := bc.InsertChain([]*types.Block{block}, false /* verifyHeaders */)
if err != nil { if err != nil {
utils.Logger().Error().Err(err).Msgf("[SYNC] Error adding new block to blockchain %d %d", block.NumberU64(), block.ShardID()) utils.Logger().Error().Err(err).Msgf("[SYNC] updateBlockAndStatus: Error adding new block to blockchain %d %d", block.NumberU64(), block.ShardID())
utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] Rolling back current block!") utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] updateBlockAndStatus: Rolling back current block!")
bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()}) bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()})
return false return err
} }
utils.Logger().Info(). utils.Logger().Info().
Uint64("blockHeight", bc.CurrentBlock().NumberU64()). Uint64("blockHeight", bc.CurrentBlock().NumberU64()).
Str("blockHex", bc.CurrentBlock().Hash().Hex()). Str("blockHex", bc.CurrentBlock().Hash().Hex()).
Msg("[SYNC] new block added to blockchain") Msg("[SYNC] updateBlockAndStatus: new block added to blockchain")
return true return nil
} }
// generateNewState will construct most recent state from downloaded blocks // generateNewState will construct most recent state from downloaded blocks
func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker) { func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker) error {
// update blocks created before node start sync // update blocks created before node start sync
parentHash := bc.CurrentBlock().Hash() parentHash := bc.CurrentBlock().Hash()
var err error
for { for {
block := ss.getBlockFromOldBlocksByParentHash(parentHash) block := ss.getBlockFromOldBlocksByParentHash(parentHash)
if block == nil { if block == nil {
break break
} }
ok := ss.updateBlockAndStatus(block, bc, worker) err = ss.updateBlockAndStatus(block, bc, worker)
if !ok { if err != nil {
break break
} }
parentHash = block.Hash() parentHash = block.Hash()
@ -586,8 +594,8 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
if block == nil { if block == nil {
break break
} }
ok := ss.updateBlockAndStatus(block, bc, worker) err = ss.updateBlockAndStatus(block, bc, worker)
if !ok { if err != nil {
break break
} }
parentHash = block.Hash() parentHash = block.Hash()
@ -607,25 +615,26 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
if block == nil { if block == nil {
break break
} }
ok := ss.updateBlockAndStatus(block, bc, worker) err = ss.updateBlockAndStatus(block, bc, worker)
if !ok { if err != nil {
break break
} }
parentHash = block.Hash() parentHash = block.Hash()
} }
return err
} }
// ProcessStateSync processes state sync from the blocks received but not yet processed so far // ProcessStateSync processes state sync from the blocks received but not yet processed so far
// TODO: return error func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc *core.BlockChain, worker *worker.Worker) error {
func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc *core.BlockChain, worker *worker.Worker) {
// Gets consensus hashes. // Gets consensus hashes.
ss.GetConsensusHashes(startHash, size) ss.getConsensusHashes(startHash, size)
ss.generateStateSyncTaskQueue(bc) ss.generateStateSyncTaskQueue(bc)
// Download blocks. // Download blocks.
if ss.stateSyncTaskQueue.Len() > 0 { if ss.stateSyncTaskQueue.Len() > 0 {
ss.downloadBlocks(bc) ss.downloadBlocks(bc)
} }
ss.generateNewState(bc, worker) return ss.generateNewState(bc, worker)
} }
func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error { func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error {
@ -738,17 +747,28 @@ Loop:
currentHeight := bc.CurrentBlock().NumberU64() currentHeight := bc.CurrentBlock().NumberU64()
if currentHeight >= otherHeight { if currentHeight >= otherHeight {
utils.Logger().Info().Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight) utils.Logger().Info().
Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
break Loop break Loop
} else { } else {
utils.Logger().Debug().Msgf("[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight) utils.Logger().Debug().
Msgf("[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
} }
startHash := bc.CurrentBlock().Hash() startHash := bc.CurrentBlock().Hash()
size := uint32(otherHeight - currentHeight) size := uint32(otherHeight - currentHeight)
if size > BatchSize { if size > SyncLoopBatchSize {
size = BatchSize size = SyncLoopBatchSize
}
err := ss.ProcessStateSync(startHash[:], size, bc, worker)
if err != nil {
utils.Logger().Error().Err(err).
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
// should we still call UpdateConsensusInformation() upon state sync failure?
// how to handle error here?
} }
ss.ProcessStateSync(startHash[:], size, bc, worker)
ss.purgeOldBlocksFromCache() ss.purgeOldBlocksFromCache()
if consensus != nil { if consensus != nil {
consensus.UpdateConsensusInformation() consensus.UpdateConsensusInformation()

@ -332,7 +332,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
if request.BlockHash == nil { if request.BlockHash == nil {
return response, fmt.Errorf("[SYNC] GetBlockHashes Request BlockHash is NIL") return response, fmt.Errorf("[SYNC] GetBlockHashes Request BlockHash is NIL")
} }
if request.Size == 0 || request.Size > syncing.BatchSize { if request.Size == 0 || request.Size > syncing.SyncLoopBatchSize {
return response, fmt.Errorf("[SYNC] GetBlockHashes Request contains invalid Size %v", request.Size) return response, fmt.Errorf("[SYNC] GetBlockHashes Request contains invalid Size %v", request.Size)
} }
size := uint64(request.Size) size := uint64(request.Size)

Loading…
Cancel
Save