From e75b4a49e290ee851ee617b3d5e886f6ebee575c Mon Sep 17 00:00:00 2001
From: Dennis Won
Date: Wed, 30 Oct 2019 21:54:51 +0000
Subject: [PATCH] make process state sync func to return error

updateBlockAndStatus, generateNewState and ProcessStateSync now return an
error (updateBlockAndStatus used to return a bool, the other two nothing),
and SyncLoop logs a ProcessStateSync failure before continuing.

Also in this patch:
- declare sentinel errors for the individual state sync steps (none of them
  is returned by the code touched here yet)
- rename BatchSize to SyncLoopBatchSize and GetConsensusHashes to
  getConsensusHashes
- add downloadBlocksRetryLimit (used by downloadBlocks in place of
  TimesToFail, which is kept) and verifyHeaderBatchSize

---
Review notes (this section is ignored by git am):
- ErrProcessStateSync no longer reuses the ErrGetBlockHash message, and the
  stray "get" is gone from the ErrDownloadBlocks / ErrGenerateNewState
  messages.
- updateBlockAndStatus rolls back one block per loop iteration again. The
  previous revision collected bc.CurrentBlock().Hash() in a loop before a
  single Rollback call, which yields the same hash every time.
- Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy of the patch; the index blob hashes are those of the original patch.

 api/service/syncing/errors.go  |  12 +++-
 api/service/syncing/syncing.go | 102 ++++++++++++++++++++-------------
 node/node_syncing.go           |   2 +-
 3 files changed, 71 insertions(+), 45 deletions(-)

diff --git a/api/service/syncing/errors.go b/api/service/syncing/errors.go
index a70193583..ae8c7dae9 100644
--- a/api/service/syncing/errors.go
+++ b/api/service/syncing/errors.go
@@ -4,7 +4,13 @@ import "errors"
 
 // Errors ...
 var (
-	ErrRegistrationFail = errors.New("[SYNC]: registration failed")
-	ErrGetBlock         = errors.New("[SYNC]: get block failed")
-	ErrGetBlockHash     = errors.New("[SYNC]: get blockhash failed")
+	ErrRegistrationFail      = errors.New("[SYNC]: registration failed")
+	ErrGetBlock              = errors.New("[SYNC]: get block failed")
+	ErrGetBlockHash          = errors.New("[SYNC]: get blockhash failed")
+	ErrProcessStateSync      = errors.New("[SYNC]: process state sync failed")
+	ErrGetConsensusHashes    = errors.New("[SYNC]: get consensus hashes failed")
+	ErrGenStateSyncTaskQueue = errors.New("[SYNC]: generate state sync task queue failed")
+	ErrDownloadBlocks        = errors.New("[SYNC]: download blocks failed")
+	ErrUpdateBlockAndStatus  = errors.New("[SYNC]: update block and status failed")
+	ErrGenerateNewState      = errors.New("[SYNC]: generate new state failed")
 )
diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go
index 45f73dca7..6c1a849a7 100644
--- a/api/service/syncing/syncing.go
+++ b/api/service/syncing/syncing.go
@@ -26,13 +26,15 @@ import (
 
 // Constants for syncing.
 const (
-	TimesToFail                  = 5 // Downloadblocks service retry limit
-	RegistrationNumber           = 3
-	SyncingPortDifference        = 3000
-	inSyncThreshold              = 0    // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
-	BatchSize             uint32 = 1000 //maximum size for one query of block hashes
-	SyncLoopFrequency            = 1    // unit in second
-	LastMileBlocksSize           = 10
+	downloadBlocksRetryLimit        = 5 // downloadBlocks service retry limit
+	TimesToFail                     = 5 // downloadBlocks service retry limit
+	RegistrationNumber              = 3
+	SyncingPortDifference           = 3000
+	inSyncThreshold                 = 0    // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
+	SyncLoopBatchSize        uint32 = 1000 // maximum size for one query of block hashes
+	verifyHeaderBatchSize    uint64 = 100  // block chain header verification batch size
+	SyncLoopFrequency               = 1    // unit in second
+	LastMileBlocksSize              = 10
 )
 
 // SyncPeerConfig is peer config to sync.
@@ -333,26 +335,27 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() {
 	sc.cleanUpPeers(maxFirstID)
 }
 
-// GetConsensusHashes gets all hashes needed to download.
-func (ss *StateSync) GetConsensusHashes(startHash []byte, size uint32) {
+// getConsensusHashes gets all hashes needed to download.
+func (ss *StateSync) getConsensusHashes(startHash []byte, size uint32) {
 	var wg sync.WaitGroup
 	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
+
 			response := peerConfig.client.GetBlockHashes(startHash, size, ss.selfip, ss.selfport)
 			if response == nil {
 				utils.Logger().Warn().
 					Str("peerIP", peerConfig.ip).
 					Str("peerPort", peerConfig.port).
-					Msg("[SYNC] GetConsensusHashes Nil Response")
+					Msg("[SYNC] getConsensusHashes Nil Response")
 				return
 			}
 			if len(response.Payload) > int(size+1) {
 				utils.Logger().Warn().
 					Uint32("requestSize", size).
 					Int("respondSize", len(response.Payload)).
-					Msg("[SYNC] GetConsensusHashes: receive more blockHahses than request!")
+					Msg("[SYNC] getConsensusHashes: receive more blockHahses than request!")
 				peerConfig.blockHashes = response.Payload[:size+1]
 			} else {
 				peerConfig.blockHashes = response.Payload
@@ -404,7 +407,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
 				if err != nil || len(payload) == 0 {
 					count++
 					utils.Logger().Error().Err(err).Int("failNumber", count).Msg("[SYNC] downloadBlocks: GetBlocks failed")
-					if count > TimesToFail {
+					if count > downloadBlocksRetryLimit {
 						break
 					}
 					if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
@@ -424,7 +427,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
 				if err != nil {
 					count++
 					utils.Logger().Error().Err(err).Msg("[SYNC] downloadBlocks: failed to DecodeBytes from received new block")
-					if count > TimesToFail {
+					if count > downloadBlocksRetryLimit {
 						break
 					}
 					if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
@@ -527,50 +530,55 @@ func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Ha
 	return nil
 }
 
-func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChain, worker *worker.Worker) bool {
-	utils.Logger().Info().Str("blockHex", bc.CurrentBlock().Hash().Hex()).Msg("[SYNC] Current Block")
+func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChain, worker *worker.Worker) error {
+	utils.Logger().Info().Str("blockHex", bc.CurrentBlock().Hash().Hex()).Msg("[SYNC] updateBlockAndStatus: Current Block")
 
 	// Verify block signatures
 	if block.NumberU64() > 1 {
 		// Verify signature every 100 blocks
-		verifySig := block.NumberU64()%100 == 0
+		verifySig := block.NumberU64()%verifyHeaderBatchSize == 0
 		err := bc.Engine().VerifyHeader(bc, block.Header(), verifySig)
 		if err != nil {
-			utils.Logger().Error().Err(err).Msgf("[SYNC] failed verifying signatures for new block %d", block.NumberU64())
-			utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] Rolling back last 99 blocks!")
-			for i := 0; i < 99; i++ {
+			utils.Logger().Error().Err(err).Msgf("[SYNC] updateBlockAndStatus: failed verifying signatures for new block %d", block.NumberU64())
+
+			utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] updateBlockAndStatus: Rolling back last 99 blocks!")
+			// Roll back one block per call: CurrentBlock() only moves to the parent
+			// after Rollback runs, so collecting its hash in a loop repeats one hash.
+			for i := uint64(0); i < verifyHeaderBatchSize-1; i++ {
 				bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()})
 			}
-			return false
+			return err
 		}
 	}
 
 	_, err := bc.InsertChain([]*types.Block{block}, false /* verifyHeaders */)
 	if err != nil {
-		utils.Logger().Error().Err(err).Msgf("[SYNC] Error adding new block to blockchain %d %d", block.NumberU64(), block.ShardID())
+		utils.Logger().Error().Err(err).Msgf("[SYNC] updateBlockAndStatus: Error adding new block to blockchain %d %d", block.NumberU64(), block.ShardID())
 
-		utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] Rolling back current block!")
+		utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] updateBlockAndStatus: Rolling back current block!")
 		bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()})
-		return false
+		return err
 	}
 	utils.Logger().Info().
 		Uint64("blockHeight", bc.CurrentBlock().NumberU64()).
 		Str("blockHex", bc.CurrentBlock().Hash().Hex()).
-		Msg("[SYNC] new block added to blockchain")
-	return true
+		Msg("[SYNC] updateBlockAndStatus: new block added to blockchain")
+	return nil
 }
 
 // generateNewState will construct most recent state from downloaded blocks
-func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker) {
+func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker) error {
 	// update blocks created before node start sync
 	parentHash := bc.CurrentBlock().Hash()
+
+	var err error
 	for {
 		block := ss.getBlockFromOldBlocksByParentHash(parentHash)
 		if block == nil {
 			break
 		}
-		ok := ss.updateBlockAndStatus(block, bc, worker)
-		if !ok {
+		err = ss.updateBlockAndStatus(block, bc, worker)
+		if err != nil {
 			break
 		}
 		parentHash = block.Hash()
@@ -586,8 +594,8 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
 		if block == nil {
 			break
 		}
-		ok := ss.updateBlockAndStatus(block, bc, worker)
-		if !ok {
+		err = ss.updateBlockAndStatus(block, bc, worker)
+		if err != nil {
 			break
 		}
 		parentHash = block.Hash()
@@ -607,25 +615,26 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
 		if block == nil {
 			break
 		}
-		ok := ss.updateBlockAndStatus(block, bc, worker)
-		if !ok {
+		err = ss.updateBlockAndStatus(block, bc, worker)
+		if err != nil {
 			break
 		}
 		parentHash = block.Hash()
 	}
+
+	return err
 }
 
 // ProcessStateSync processes state sync from the blocks received but not yet processed so far
-// TODO: return error
-func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc *core.BlockChain, worker *worker.Worker) {
+func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc *core.BlockChain, worker *worker.Worker) error {
 	// Gets consensus hashes.
-	ss.GetConsensusHashes(startHash, size)
+	ss.getConsensusHashes(startHash, size)
 	ss.generateStateSyncTaskQueue(bc)
 	// Download blocks.
 	if ss.stateSyncTaskQueue.Len() > 0 {
 		ss.downloadBlocks(bc)
 	}
-	ss.generateNewState(bc, worker)
+	return ss.generateNewState(bc, worker)
 }
 
 func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error {
@@ -738,17 +747,28 @@ Loop:
 			currentHeight := bc.CurrentBlock().NumberU64()
 
 			if currentHeight >= otherHeight {
-				utils.Logger().Info().Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight)
+				utils.Logger().Info().
+					Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
+						isBeacon, bc.ShardID(), otherHeight, currentHeight)
 				break Loop
 			} else {
-				utils.Logger().Debug().Msgf("[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight)
+				utils.Logger().Debug().
+					Msgf("[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
+						isBeacon, bc.ShardID(), otherHeight, currentHeight)
 			}
 			startHash := bc.CurrentBlock().Hash()
 			size := uint32(otherHeight - currentHeight)
-			if size > BatchSize {
-				size = BatchSize
+			if size > SyncLoopBatchSize {
+				size = SyncLoopBatchSize
+			}
+			err := ss.ProcessStateSync(startHash[:], size, bc, worker)
+			if err != nil {
+				utils.Logger().Error().Err(err).
+					Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
+						isBeacon, bc.ShardID(), otherHeight, currentHeight)
+				// should we still call UpdateConsensusInformation() upon state sync failure?
+				// how to handle error here?
 			}
-			ss.ProcessStateSync(startHash[:], size, bc, worker)
 			ss.purgeOldBlocksFromCache()
 			if consensus != nil {
 				consensus.UpdateConsensusInformation()
diff --git a/node/node_syncing.go b/node/node_syncing.go
index a1fdf67c4..359496716 100644
--- a/node/node_syncing.go
+++ b/node/node_syncing.go
@@ -332,7 +332,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
 		if request.BlockHash == nil {
 			return response, fmt.Errorf("[SYNC] GetBlockHashes Request BlockHash is NIL")
 		}
-		if request.Size == 0 || request.Size > syncing.BatchSize {
+		if request.Size == 0 || request.Size > syncing.SyncLoopBatchSize {
 			return response, fmt.Errorf("[SYNC] GetBlockHashes Request contains invalid Size %v", request.Size)
 		}
 		size := uint64(request.Size)