[StateSync] add retries to process state sync where failed peer config is excluded on the next try (#1835)

* add retries to process state sync where failed peer config is excluded on the next try

* Updated per @fxfactorial 's review comments.

* use anonymous function as a callback for iteration
pull/2161/head
Jong Hyuck Won 5 years ago committed by Edgar Aroutiounian
parent b42b3d3d91
commit 973ac15c8b
  1. 90
      api/service/syncing/syncing.go
  2. 10
      api/service/syncing/syncing_test.go

@ -29,7 +29,7 @@ import (
// Constants for syncing.
const (
downloadBlocksRetryLimit = 5 // downloadBlocks service retry limit
TimesToFail = 5 // downloadBlocks service retry limit
stateSyncRetryLimit = 5 // ProcessStateSync retry limit
RegistrationNumber = 3
SyncingPortDifference = 3000
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
@ -66,8 +66,10 @@ type SyncConfig struct {
// mtx locks peers, and *SyncPeerConfig pointers in peers.
// SyncPeerConfig itself is guarded by its own mutex.
mtx sync.RWMutex
peers []*SyncPeerConfig
// failedPeers contains the sync peer config that had been chosen
// as the max consensus sync peer config but failed during chain insertion
failedPeers []*SyncPeerConfig
peers []*SyncPeerConfig
}
// AddPeer adds the given sync peer.
@ -199,8 +201,8 @@ func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *
}
}
// CompareSyncPeerConfigByblockHashes compares two SyncPeerConfig by blockHashes.
func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int {
// CompareSyncPeerConfigByBlockHashes compares two SyncPeerConfig by blockHashes.
func CompareSyncPeerConfigByBlockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int {
if len(a.blockHashes) != len(b.blockHashes) {
if len(a.blockHashes) < len(b.blockHashes) {
return -1
@ -283,7 +285,19 @@ func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
maxCount := 0
maxFirstID := -1
for i := range sc.peers {
if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
// skip if among the previously failed peer config
skip := false
for j := range sc.failedPeers {
if CompareSyncPeerConfigByBlockHashes(sc.peers[i], sc.failedPeers[j]) == 0 {
skip = true
break
}
}
if skip {
continue
}
if curFirstID == -1 || CompareSyncPeerConfigByBlockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
curCount = 1
curFirstID = i
} else {
@ -312,10 +326,10 @@ func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]
func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
fixedPeer := sc.peers[maxFirstID]
for i := 0; i < len(sc.peers); i++ {
if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 {
if CompareSyncPeerConfigByBlockHashes(fixedPeer, sc.peers[i]) != 0 {
// TODO: move it into a util delete func.
// See tip https://github.com/golang/go/wiki/SliceTricks
// Close the client and remove the peer out of the
// Close the client and remove the peer out of the peers set
sc.peers[i].client.Close()
copy(sc.peers[i:], sc.peers[i+1:])
sc.peers[len(sc.peers)-1] = nil
@ -333,7 +347,7 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() {
defer sc.mtx.Unlock()
// Sort all peers by the blockHashes.
sort.Slice(sc.peers, func(i, j int) bool {
return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
return CompareSyncPeerConfigByBlockHashes(sc.peers[i], sc.peers[j]) == -1
})
maxFirstID, maxCount := sc.getHowManyMaxConsensus()
utils.Logger().Info().
@ -770,28 +784,62 @@ Loop:
currentHeight := bc.CurrentBlock().NumberU64()
if currentHeight >= otherHeight {
utils.Logger().Info().
Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
utils.Logger().Debug().
Bool("isBeacon", isBeacon).
Uint32("ShardID", bc.ShardID()).
Uint64("otherHeight", otherHeight).
Uint64("currentHeight", currentHeight).
Msg("[SYNC] Node is now IN SYNC!")
break Loop
} else {
utils.Logger().Debug().
Msgf("[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
Bool("isBeacon", isBeacon).
Uint32("ShardID", bc.ShardID()).
Uint64("otherHeight", otherHeight).
Uint64("currentHeight", currentHeight).
Msg("[SYNC] Node is Not in Sync.")
}
startHash := bc.CurrentBlock().Hash()
size := uint32(otherHeight - currentHeight)
if size > SyncLoopBatchSize {
size = SyncLoopBatchSize
}
err := ss.ProcessStateSync(startHash[:], size, bc, worker)
if err != nil {
utils.Logger().Error().Err(err).
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
// should we still call UpdateConsensusInformation() upon state sync failure?
// how to handle error here?
retryCount := 0
var err error
for {
err = ss.ProcessStateSync(startHash[:], size, bc, worker)
if err == nil {
break
}
if retryCount >= stateSyncRetryLimit {
utils.Logger().Warn().Err(err).
Bool("isBeacon", isBeacon).
Uint32("ShardID", bc.ShardID()).
Uint64("otherHeight", otherHeight).
Uint64("currentHeight", currentHeight).
Msg("[SYNC] ProcessStateSync failed after retries. Node failed to sync.")
break
}
utils.Logger().Warn().Err(err).
Bool("isBeacon", isBeacon).
Uint32("ShardID", bc.ShardID()).
Uint64("otherHeight", otherHeight).
Uint64("currentHeight", currentHeight).
Int("retryCount", retryCount).
Msg("[SYNC] ProcessStateSync failed. Retrying ...")
// Track the peer sync config(s) that failed node sync here to exclude it on successive retries
trackFailedPeer := func(configPeer *SyncPeerConfig) (brk bool) {
ss.syncConfig.failedPeers = append(ss.syncConfig.failedPeers, configPeer)
brk = true
return
}
ss.syncConfig.ForEachPeer(trackFailedPeer)
retryCount++
}
ss.purgeOldBlocksFromCache()
if consensus != nil {
consensus.UpdateConsensusInformation()

@ -16,7 +16,7 @@ func TestCreateTestSyncPeerConfig(t *testing.T) {
}
// Simple test for IncorrectResponse
func TestCompareSyncPeerConfigByblockHashes(t *testing.T) {
func TestCompareSyncPeerConfigByBlockHashes(t *testing.T) {
client := &downloader.Client{}
blockHashes1 := [][]byte{{1, 2, 3}}
syncPeerConfig1 := CreateTestSyncPeerConfig(client, blockHashes1)
@ -24,19 +24,19 @@ func TestCompareSyncPeerConfigByblockHashes(t *testing.T) {
syncPeerConfig2 := CreateTestSyncPeerConfig(client, blockHashes2)
// syncPeerConfig1 is less than syncPeerConfig2
assert.Equal(t, CompareSyncPeerConfigByblockHashes(syncPeerConfig1, syncPeerConfig2), -1, "syncPeerConfig1 is less than syncPeerConfig2")
assert.Equal(t, CompareSyncPeerConfigByBlockHashes(syncPeerConfig1, syncPeerConfig2), -1, "syncPeerConfig1 is less than syncPeerConfig2")
// syncPeerConfig1 is greater than syncPeerConfig2
blockHashes1[0][2] = 5
assert.Equal(t, CompareSyncPeerConfigByblockHashes(syncPeerConfig1, syncPeerConfig2), 1, "syncPeerConfig1 is greater than syncPeerConfig2")
assert.Equal(t, CompareSyncPeerConfigByBlockHashes(syncPeerConfig1, syncPeerConfig2), 1, "syncPeerConfig1 is greater than syncPeerConfig2")
// syncPeerConfig1 is equal to syncPeerConfig2
blockHashes1[0][2] = 4
assert.Equal(t, CompareSyncPeerConfigByblockHashes(syncPeerConfig1, syncPeerConfig2), 0, "syncPeerConfig1 is equal to syncPeerConfig2")
assert.Equal(t, CompareSyncPeerConfigByBlockHashes(syncPeerConfig1, syncPeerConfig2), 0, "syncPeerConfig1 is equal to syncPeerConfig2")
// syncPeerConfig1 is less than syncPeerConfig2
blockHashes1 = blockHashes1[:1]
assert.Equal(t, CompareSyncPeerConfigByblockHashes(syncPeerConfig1, syncPeerConfig2), 0, "syncPeerConfig1 is less than syncPeerConfig2")
assert.Equal(t, CompareSyncPeerConfigByBlockHashes(syncPeerConfig1, syncPeerConfig2), 0, "syncPeerConfig1 is less than syncPeerConfig2")
}
func TestCreateStateSync(t *testing.T) {

Loading…
Cancel
Save