|
|
@ -103,9 +103,7 @@ func (ss *StateSync) AddLastMileBlock(block *types.Block) { |
|
|
|
// CloseConnections close grpc connections for state sync clients
|
|
|
|
// CloseConnections close grpc connections for state sync clients
|
|
|
|
func (ss *StateSync) CloseConnections() { |
|
|
|
func (ss *StateSync) CloseConnections() { |
|
|
|
for _, pc := range ss.syncConfig.peers { |
|
|
|
for _, pc := range ss.syncConfig.peers { |
|
|
|
if pc.client != nil { |
|
|
|
pc.client.Close() |
|
|
|
pc.client.Close() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -148,9 +146,6 @@ func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) in |
|
|
|
|
|
|
|
|
|
|
|
// GetBlocks gets blocks by calling grpc request to the corresponding peer.
|
|
|
|
// GetBlocks gets blocks by calling grpc request to the corresponding peer.
|
|
|
|
func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) { |
|
|
|
func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) { |
|
|
|
if peerConfig.client == nil { |
|
|
|
|
|
|
|
return nil, ErrSyncPeerConfigClientNotReady |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
response := peerConfig.client.GetBlocks(hashes) |
|
|
|
response := peerConfig.client.GetBlocks(hashes) |
|
|
|
if response == nil { |
|
|
|
if response == nil { |
|
|
|
return nil, ErrGetBlock |
|
|
|
return nil, ErrGetBlock |
|
|
@ -264,9 +259,6 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool { |
|
|
|
for { |
|
|
|
for { |
|
|
|
var wg sync.WaitGroup |
|
|
|
var wg sync.WaitGroup |
|
|
|
for id := range ss.syncConfig.peers { |
|
|
|
for id := range ss.syncConfig.peers { |
|
|
|
if ss.syncConfig.peers[id].client == nil { |
|
|
|
|
|
|
|
continue |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
wg.Add(1) |
|
|
|
wg.Add(1) |
|
|
|
go func(peerConfig *SyncPeerConfig) { |
|
|
|
go func(peerConfig *SyncPeerConfig) { |
|
|
|
defer wg.Done() |
|
|
|
defer wg.Done() |
|
|
@ -295,12 +287,10 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool { |
|
|
|
func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) { |
|
|
|
func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) { |
|
|
|
ss.stateSyncTaskQueue = queue.New(0) |
|
|
|
ss.stateSyncTaskQueue = queue.New(0) |
|
|
|
for _, configPeer := range ss.syncConfig.peers { |
|
|
|
for _, configPeer := range ss.syncConfig.peers { |
|
|
|
if configPeer.client != nil { |
|
|
|
for id, blockHash := range configPeer.blockHashes { |
|
|
|
for id, blockHash := range configPeer.blockHashes { |
|
|
|
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) |
|
|
|
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
break |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
break |
|
|
|
} |
|
|
|
} |
|
|
|
utils.GetLogInstance().Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len()) |
|
|
|
utils.GetLogInstance().Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len()) |
|
|
|
} |
|
|
|
} |
|
|
@ -312,9 +302,6 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) { |
|
|
|
wg.Add(len(ss.syncConfig.peers)) |
|
|
|
wg.Add(len(ss.syncConfig.peers)) |
|
|
|
count := 0 |
|
|
|
count := 0 |
|
|
|
for i := range ss.syncConfig.peers { |
|
|
|
for i := range ss.syncConfig.peers { |
|
|
|
if ss.syncConfig.peers[i].client == nil { |
|
|
|
|
|
|
|
continue |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) { |
|
|
|
go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) { |
|
|
|
defer wg.Done() |
|
|
|
defer wg.Done() |
|
|
|
for !stateSyncTaskQueue.Empty() { |
|
|
|
for !stateSyncTaskQueue.Empty() { |
|
|
@ -544,9 +531,6 @@ func (ss *StateSync) RegisterNodeInfo() int { |
|
|
|
utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport)) |
|
|
|
utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport)) |
|
|
|
continue |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
if peerConfig.client == nil { |
|
|
|
|
|
|
|
continue |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport) |
|
|
|
err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash) |
|
|
|
utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash) |
|
|
|