make process state sync func to return error

pull/1789/head
Dennis Won 5 years ago
parent 6f0e40c454
commit e75b4a49e2
  1. 12
      api/service/syncing/errors.go
  2. 104
      api/service/syncing/syncing.go
  3. 2
      node/node_syncing.go

@ -4,7 +4,13 @@ import "errors"
// Errors ...
var (
ErrRegistrationFail = errors.New("[SYNC]: registration failed")
ErrGetBlock = errors.New("[SYNC]: get block failed")
ErrGetBlockHash = errors.New("[SYNC]: get blockhash failed")
ErrRegistrationFail = errors.New("[SYNC]: registration failed")
ErrGetBlock = errors.New("[SYNC]: get block failed")
ErrGetBlockHash = errors.New("[SYNC]: get blockhash failed")
ErrProcessStateSync = errors.New("[SYNC]: get blockhash failed")
ErrGetConsensusHashes = errors.New("[SYNC]: get consensus hashes failed")
ErrGenStateSyncTaskQueue = errors.New("[SYNC]: generate state sync task queue failed")
ErrDownloadBlocks = errors.New("[SYNC]: get download blocks failed")
ErrUpdateBlockAndStatus = errors.New("[SYNC]: update block and status failed")
ErrGenerateNewState = errors.New("[SYNC]: get generate new state failed")
)

@ -26,13 +26,15 @@ import (
// Constants for syncing.
const (
TimesToFail = 5 // Downloadblocks service retry limit
RegistrationNumber = 3
SyncingPortDifference = 3000
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
BatchSize uint32 = 1000 //maximum size for one query of block hashes
SyncLoopFrequency = 1 // unit in second
LastMileBlocksSize = 10
downloadBlocksRetryLimit = 5 // downloadBlocks service retry limit
TimesToFail = 5 // downloadBlocks service retry limit
RegistrationNumber = 3
SyncingPortDifference = 3000
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncLoopBatchSize uint32 = 1000 // maximum size for one query of block hashes
verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size
SyncLoopFrequency = 1 // unit in second
LastMileBlocksSize = 10
)
// SyncPeerConfig is peer config to sync.
@ -333,26 +335,27 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() {
sc.cleanUpPeers(maxFirstID)
}
// GetConsensusHashes gets all hashes needed to download.
func (ss *StateSync) GetConsensusHashes(startHash []byte, size uint32) {
// getConsensusHashes gets all hashes needed to download.
func (ss *StateSync) getConsensusHashes(startHash []byte, size uint32) {
var wg sync.WaitGroup
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
wg.Add(1)
go func() {
defer wg.Done()
response := peerConfig.client.GetBlockHashes(startHash, size, ss.selfip, ss.selfport)
if response == nil {
utils.Logger().Warn().
Str("peerIP", peerConfig.ip).
Str("peerPort", peerConfig.port).
Msg("[SYNC] GetConsensusHashes Nil Response")
Msg("[SYNC] getConsensusHashes Nil Response")
return
}
if len(response.Payload) > int(size+1) {
utils.Logger().Warn().
Uint32("requestSize", size).
Int("respondSize", len(response.Payload)).
Msg("[SYNC] GetConsensusHashes: receive more blockHahses than request!")
Msg("[SYNC] getConsensusHashes: receive more blockHahses than request!")
peerConfig.blockHashes = response.Payload[:size+1]
} else {
peerConfig.blockHashes = response.Payload
@ -404,7 +407,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
if err != nil || len(payload) == 0 {
count++
utils.Logger().Error().Err(err).Int("failNumber", count).Msg("[SYNC] downloadBlocks: GetBlocks failed")
if count > TimesToFail {
if count > downloadBlocksRetryLimit {
break
}
if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
@ -424,7 +427,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
if err != nil {
count++
utils.Logger().Error().Err(err).Msg("[SYNC] downloadBlocks: failed to DecodeBytes from received new block")
if count > TimesToFail {
if count > downloadBlocksRetryLimit {
break
}
if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
@ -527,50 +530,55 @@ func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Ha
return nil
}
func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChain, worker *worker.Worker) bool {
utils.Logger().Info().Str("blockHex", bc.CurrentBlock().Hash().Hex()).Msg("[SYNC] Current Block")
func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChain, worker *worker.Worker) error {
utils.Logger().Info().Str("blockHex", bc.CurrentBlock().Hash().Hex()).Msg("[SYNC] updateBlockAndStatus: Current Block")
// Verify block signatures
if block.NumberU64() > 1 {
// Verify signature every 100 blocks
verifySig := block.NumberU64()%100 == 0
verifySig := block.NumberU64()%verifyHeaderBatchSize == 0
err := bc.Engine().VerifyHeader(bc, block.Header(), verifySig)
if err != nil {
utils.Logger().Error().Err(err).Msgf("[SYNC] failed verifying signatures for new block %d", block.NumberU64())
utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] Rolling back last 99 blocks!")
for i := 0; i < 99; i++ {
bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()})
utils.Logger().Error().Err(err).Msgf("[SYNC] updateBlockAndStatus: failed verifying signatures for new block %d", block.NumberU64())
utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] updateBlockAndStatus: Rolling back last 99 blocks!")
var hashes []common.Hash
for i := uint64(0); i < verifyHeaderBatchSize-1; i++ {
hashes = append(hashes, bc.CurrentBlock().Hash())
}
return false
bc.Rollback(hashes)
return err
}
}
_, err := bc.InsertChain([]*types.Block{block}, false /* verifyHeaders */)
if err != nil {
utils.Logger().Error().Err(err).Msgf("[SYNC] Error adding new block to blockchain %d %d", block.NumberU64(), block.ShardID())
utils.Logger().Error().Err(err).Msgf("[SYNC] updateBlockAndStatus: Error adding new block to blockchain %d %d", block.NumberU64(), block.ShardID())
utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] Rolling back current block!")
utils.Logger().Debug().Interface("block", bc.CurrentBlock()).Msg("[SYNC] updateBlockAndStatus: Rolling back current block!")
bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()})
return false
return err
}
utils.Logger().Info().
Uint64("blockHeight", bc.CurrentBlock().NumberU64()).
Str("blockHex", bc.CurrentBlock().Hash().Hex()).
Msg("[SYNC] new block added to blockchain")
return true
Msg("[SYNC] updateBlockAndStatus: new block added to blockchain")
return nil
}
// generateNewState will construct most recent state from downloaded blocks
func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker) {
func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker) error {
// update blocks created before node start sync
parentHash := bc.CurrentBlock().Hash()
var err error
for {
block := ss.getBlockFromOldBlocksByParentHash(parentHash)
if block == nil {
break
}
ok := ss.updateBlockAndStatus(block, bc, worker)
if !ok {
err = ss.updateBlockAndStatus(block, bc, worker)
if err != nil {
break
}
parentHash = block.Hash()
@ -586,8 +594,8 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
if block == nil {
break
}
ok := ss.updateBlockAndStatus(block, bc, worker)
if !ok {
err = ss.updateBlockAndStatus(block, bc, worker)
if err != nil {
break
}
parentHash = block.Hash()
@ -607,25 +615,26 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
if block == nil {
break
}
ok := ss.updateBlockAndStatus(block, bc, worker)
if !ok {
err = ss.updateBlockAndStatus(block, bc, worker)
if err != nil {
break
}
parentHash = block.Hash()
}
return err
}
// ProcessStateSync processes state sync from the blocks received but not yet processed so far
// TODO: return error
func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc *core.BlockChain, worker *worker.Worker) {
func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc *core.BlockChain, worker *worker.Worker) error {
// Gets consensus hashes.
ss.GetConsensusHashes(startHash, size)
ss.getConsensusHashes(startHash, size)
ss.generateStateSyncTaskQueue(bc)
// Download blocks.
if ss.stateSyncTaskQueue.Len() > 0 {
ss.downloadBlocks(bc)
}
ss.generateNewState(bc, worker)
return ss.generateNewState(bc, worker)
}
func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error {
@ -738,17 +747,28 @@ Loop:
currentHeight := bc.CurrentBlock().NumberU64()
if currentHeight >= otherHeight {
utils.Logger().Info().Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight)
utils.Logger().Info().
Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
break Loop
} else {
utils.Logger().Debug().Msgf("[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight)
utils.Logger().Debug().
Msgf("[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
}
startHash := bc.CurrentBlock().Hash()
size := uint32(otherHeight - currentHeight)
if size > BatchSize {
size = BatchSize
if size > SyncLoopBatchSize {
size = SyncLoopBatchSize
}
err := ss.ProcessStateSync(startHash[:], size, bc, worker)
if err != nil {
utils.Logger().Error().Err(err).
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
// should we still call UpdateConsensusInformation() upon state sync failure?
// how to handle error here?
}
ss.ProcessStateSync(startHash[:], size, bc, worker)
ss.purgeOldBlocksFromCache()
if consensus != nil {
consensus.UpdateConsensusInformation()

@ -332,7 +332,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
if request.BlockHash == nil {
return response, fmt.Errorf("[SYNC] GetBlockHashes Request BlockHash is NIL")
}
if request.Size == 0 || request.Size > syncing.BatchSize {
if request.Size == 0 || request.Size > syncing.SyncLoopBatchSize {
return response, fmt.Errorf("[SYNC] GetBlockHashes Request contains invalid Size %v", request.Size)
}
size := uint64(request.Size)

Loading…
Cancel
Save