diff --git a/syncing/syncing.go b/syncing/syncing.go index dee3717c0..b56d40d10 100644 --- a/syncing/syncing.go +++ b/syncing/syncing.go @@ -3,6 +3,7 @@ package syncing import ( "bufio" "net" + "reflect" "sync" "time" @@ -40,9 +41,11 @@ func GetStateSync() *StateSync { // StateSync is the struct that implements StateSyncInterface. type StateSync struct { - peerNumber int - activePeerNumber int - syncConfig *SyncConfig + peerNumber int + activePeerNumber int + blockHeight int + syncConfig *SyncConfig + stateSyncTaskQueue *queue.Queue } // ProcessStateSyncFromPeers used to do state sync. @@ -94,16 +97,24 @@ func (ss *StateSync) makeConnectionToPeers() { } } -// StartStateSync starts state sync. -func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) { - // Create sync config. - ss.createSyncConfig(peers) - - // Make connections to peers. - ss.makeConnectionToPeers() +// areConsensusHashesEqual chesk if all consensus hashes are equal. +func (ss *StateSync) areConsensusHashesEqual() bool { + var fixedPeer *SyncPeerConfig + for _, configPeer := range ss.syncConfig.peers { + if configPeer.trusted { + if fixedPeer == nil { + fixedPeer = &configPeer + } + if !reflect.DeepEqual(configPeer.blockHashes, fixedPeer) { + return false + } + } + } + return true +} - // Looping to get an array of block hashes from honest nodes. -LOOP_HONEST_NODE: +// getConsensusHashes gets all hashes needed to download. +func (ss *StateSync) getConsensusHashes() { for { var wg sync.WaitGroup wg.Add(ss.activePeerNumber) @@ -132,37 +143,39 @@ LOOP_HONEST_NODE: peerConfig.blockHashes = blockchainSyncMessage.BlockHashes }(&configPeer) } - wg.Wait() - - if getConsensus(ss.syncConfig) { - break LOOP_HONEST_NODE + if ss.areConsensusHashesEqual() { + break } } +} - taskSyncQueue := queue.New(0) - blockSize := 0 -TASK_LOOP: +// getConsensusHashes gets all hashes needed to download. +func (ss *StateSync) generateStateSyncTaskQueue() { + ss.stateSyncTaskQueue = queue.New(0) for _, configPeer := range ss.syncConfig.peers { if configPeer.trusted { for id, blockHash := range configPeer.blockHashes { - taskSyncQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) + ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) } - blockSize = len(configPeer.blockHashes) - break TASK_LOOP + ss.blockHeight = len(configPeer.blockHashes) } } +} + +// downloadBlocks downloads blocks from state sync task queue. +func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) { // Initialize blockchain - bc.Blocks = make([]*blockchain.Block, blockSize) + bc.Blocks = make([]*blockchain.Block, ss.blockHeight) var wg sync.WaitGroup - wg.Add(ss.activePeerNumber) + wg.Add(int(ss.stateSyncTaskQueue.Len())) for _, configPeer := range ss.syncConfig.peers { if configPeer.err != nil { continue } - go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) { + go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *blockchain.Blockchain) { defer wg.Done() - for !taskSyncQueue.Empty() { - task, err := taskSyncQueue.Poll(1, time.Millisecond) + for !stateSyncTaskQueue.Empty() { + task, err := stateSyncTaskQueue.Poll(1, time.Millisecond) if err == queue.ErrTimeout { break } @@ -181,11 +194,28 @@ TASK_LOOP: bc.Blocks[syncTask.index] = block } } - }(&configPeer, taskSyncQueue, bc) + }(&configPeer, ss.stateSyncTaskQueue, bc) } wg.Wait() } +// StartStateSync starts state sync. +func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) { + // Creates sync config. + ss.createSyncConfig(peers) + + // Makes connections to peers. + ss.makeConnectionToPeers() + + // Gets consensus hashes. + ss.getConsensusHashes() + + // Generates state-sync task queue. + ss.generateStateSyncTaskQueue() + + ss.downloadBlocks(bc) +} + func getConsensus(syncConfig *SyncConfig) bool { return true }