|
|
|
@ -26,7 +26,6 @@ import ( |
|
|
|
|
|
|
|
|
|
// Constants for syncing.
|
|
|
|
|
const ( |
|
|
|
|
SleepTimeAfterNonConsensusBlockHashes = time.Second * 30 |
|
|
|
|
TimesToFail = 5 // Downloadblocks service retry limit
|
|
|
|
|
RegistrationNumber = 3 |
|
|
|
|
SyncingPortDifference = 3000 |
|
|
|
@ -316,7 +315,7 @@ func (sc *SyncConfig) cleanUpPeers(maxFirstID int) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal.
|
|
|
|
|
func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() bool { |
|
|
|
|
func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() { |
|
|
|
|
sc.mtx.Lock() |
|
|
|
|
defer sc.mtx.Unlock() |
|
|
|
|
// Sort all peers by the blockHashes.
|
|
|
|
@ -328,55 +327,39 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() bool { |
|
|
|
|
Int("maxFirstID", maxFirstID). |
|
|
|
|
Int("maxCount", maxCount). |
|
|
|
|
Msg("[SYNC] block consensus hashes") |
|
|
|
|
if float64(maxCount) >= core.ShardingSchedule.ConsensusRatio()*float64(len(sc.peers)) { |
|
|
|
|
sc.cleanUpPeers(maxFirstID) |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
return false |
|
|
|
|
sc.cleanUpPeers(maxFirstID) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetConsensusHashes gets all hashes needed to download.
|
|
|
|
|
func (ss *StateSync) GetConsensusHashes(startHash []byte, size uint32) bool { |
|
|
|
|
count := 0 |
|
|
|
|
for { |
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) { |
|
|
|
|
wg.Add(1) |
|
|
|
|
go func() { |
|
|
|
|
defer wg.Done() |
|
|
|
|
response := peerConfig.client.GetBlockHashes(startHash, size, ss.selfip, ss.selfport) |
|
|
|
|
if response == nil { |
|
|
|
|
utils.Logger().Warn(). |
|
|
|
|
Str("peerIP", peerConfig.ip). |
|
|
|
|
Str("peerPort", peerConfig.port). |
|
|
|
|
Msg("[SYNC] GetConsensusHashes Nil Response") |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if len(response.Payload) > int(size+1) { |
|
|
|
|
utils.Logger().Warn(). |
|
|
|
|
Uint32("requestSize", size). |
|
|
|
|
Int("respondSize", len(response.Payload)). |
|
|
|
|
Msg("[SYNC] GetConsensusHashes: receive more blockHahses than request!") |
|
|
|
|
peerConfig.blockHashes = response.Payload[:size+1] |
|
|
|
|
} else { |
|
|
|
|
peerConfig.blockHashes = response.Payload |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
return |
|
|
|
|
}) |
|
|
|
|
wg.Wait() |
|
|
|
|
if ss.syncConfig.GetBlockHashesConsensusAndCleanUp() { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
if count > TimesToFail { |
|
|
|
|
utils.Logger().Info().Msg("[SYNC] GetConsensusHashes: reached retry limit") |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
count++ |
|
|
|
|
time.Sleep(SleepTimeAfterNonConsensusBlockHashes) |
|
|
|
|
} |
|
|
|
|
func (ss *StateSync) GetConsensusHashes(startHash []byte, size uint32) { |
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) { |
|
|
|
|
wg.Add(1) |
|
|
|
|
go func() { |
|
|
|
|
defer wg.Done() |
|
|
|
|
response := peerConfig.client.GetBlockHashes(startHash, size, ss.selfip, ss.selfport) |
|
|
|
|
if response == nil { |
|
|
|
|
utils.Logger().Warn(). |
|
|
|
|
Str("peerIP", peerConfig.ip). |
|
|
|
|
Str("peerPort", peerConfig.port). |
|
|
|
|
Msg("[SYNC] GetConsensusHashes Nil Response") |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if len(response.Payload) > int(size+1) { |
|
|
|
|
utils.Logger().Warn(). |
|
|
|
|
Uint32("requestSize", size). |
|
|
|
|
Int("respondSize", len(response.Payload)). |
|
|
|
|
Msg("[SYNC] GetConsensusHashes: receive more blockHahses than request!") |
|
|
|
|
peerConfig.blockHashes = response.Payload[:size+1] |
|
|
|
|
} else { |
|
|
|
|
peerConfig.blockHashes = response.Payload |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
return |
|
|
|
|
}) |
|
|
|
|
wg.Wait() |
|
|
|
|
ss.syncConfig.GetBlockHashesConsensusAndCleanUp() |
|
|
|
|
utils.Logger().Info().Msg("[SYNC] Finished getting consensus block hashes") |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) { |
|
|
|
@ -388,13 +371,13 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) { |
|
|
|
|
Err(err). |
|
|
|
|
Int("taskIndex", id). |
|
|
|
|
Str("taskBlock", hex.EncodeToString(blockHash)). |
|
|
|
|
Msg("cannot add task") |
|
|
|
|
Msg("[SYNC] generateStateSyncTaskQueue: cannot add task") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
brk = true |
|
|
|
|
return |
|
|
|
|
}) |
|
|
|
|
utils.Logger().Info().Int64("length", ss.stateSyncTaskQueue.Len()).Msg("[SYNC] Finished generateStateSyncTaskQueue") |
|
|
|
|
utils.Logger().Info().Int64("length", ss.stateSyncTaskQueue.Len()).Msg("[SYNC] generateStateSyncTaskQueue: finished") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// downloadBlocks downloads blocks from state sync task queue.
|
|
|
|
@ -409,7 +392,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) { |
|
|
|
|
for !stateSyncTaskQueue.Empty() { |
|
|
|
|
task, err := ss.stateSyncTaskQueue.Poll(1, time.Millisecond) |
|
|
|
|
if err == queue.ErrTimeout || len(task) == 0 { |
|
|
|
|
utils.Logger().Error().Err(err).Msg("[SYNC] ss.stateSyncTaskQueue poll timeout") |
|
|
|
|
utils.Logger().Error().Err(err).Msg("[SYNC] downloadBlocks: ss.stateSyncTaskQueue poll timeout") |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
syncTask := task[0].(SyncBlockTask) |
|
|
|
@ -417,7 +400,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) { |
|
|
|
|
payload, err := peerConfig.GetBlocks([][]byte{syncTask.blockHash}) |
|
|
|
|
if err != nil || len(payload) == 0 { |
|
|
|
|
count++ |
|
|
|
|
utils.Logger().Error().Err(err).Int("failNumber", count).Msg("[SYNC] GetBlocks failed") |
|
|
|
|
utils.Logger().Error().Err(err).Int("failNumber", count).Msg("[SYNC] downloadBlocks: GetBlocks failed") |
|
|
|
|
if count > TimesToFail { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
@ -426,7 +409,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) { |
|
|
|
|
Err(err). |
|
|
|
|
Int("taskIndex", syncTask.index). |
|
|
|
|
Str("taskBlock", hex.EncodeToString(syncTask.blockHash)). |
|
|
|
|
Msg("cannot add task") |
|
|
|
|
Msg("downloadBlocks: cannot add task") |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
@ -458,7 +441,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) { |
|
|
|
|
return |
|
|
|
|
}) |
|
|
|
|
wg.Wait() |
|
|
|
|
utils.Logger().Info().Msg("[SYNC] Finished downloadBlocks") |
|
|
|
|
utils.Logger().Info().Msg("[SYNC] downloadBlocks: finished") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// CompareBlockByHash compares two block by hash, it will be used in sort the blocks
|
|
|
|
@ -635,10 +618,7 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker |
|
|
|
|
// TODO: return error
|
|
|
|
|
func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc *core.BlockChain, worker *worker.Worker) { |
|
|
|
|
// Gets consensus hashes.
|
|
|
|
|
if !ss.GetConsensusHashes(startHash, size) { |
|
|
|
|
utils.Logger().Debug().Msg("[SYNC] ProcessStateSync unable to reach consensus on ss.GetConsensusHashes") |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
ss.GetConsensusHashes(startHash, size) |
|
|
|
|
ss.generateStateSyncTaskQueue(bc) |
|
|
|
|
// Download blocks.
|
|
|
|
|
if ss.stateSyncTaskQueue.Len() > 0 { |
|
|
|
|