state sync improvement

pull/88/head
Minh Doan 6 years ago
parent 676785fa36
commit d35f12b85a
  1. 86
      syncing/syncing.go

@ -3,6 +3,7 @@ package syncing
import (
"bufio"
"net"
"reflect"
"sync"
"time"
@ -40,9 +41,11 @@ func GetStateSync() *StateSync {
// StateSync is the struct that implements StateSyncInterface.
type StateSync struct {
peerNumber int
activePeerNumber int
syncConfig *SyncConfig
peerNumber int
activePeerNumber int
blockHeight int
syncConfig *SyncConfig
stateSyncTaskQueue *queue.Queue
}
// ProcessStateSyncFromPeers used to do state sync.
@ -94,16 +97,24 @@ func (ss *StateSync) makeConnectionToPeers() {
}
}
// StartStateSync starts state sync.
func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) {
// Create sync config.
ss.createSyncConfig(peers)
// Make connections to peers.
ss.makeConnectionToPeers()
// areConsensusHashesEqual chesk if all consensus hashes are equal.
func (ss *StateSync) areConsensusHashesEqual() bool {
var fixedPeer *SyncPeerConfig
for _, configPeer := range ss.syncConfig.peers {
if configPeer.trusted {
if fixedPeer == nil {
fixedPeer = &configPeer
}
if !reflect.DeepEqual(configPeer.blockHashes, fixedPeer) {
return false
}
}
}
return true
}
// Looping to get an array of block hashes from honest nodes.
LOOP_HONEST_NODE:
// getConsensusHashes gets all hashes needed to download.
func (ss *StateSync) getConsensusHashes() {
for {
var wg sync.WaitGroup
wg.Add(ss.activePeerNumber)
@ -132,37 +143,39 @@ LOOP_HONEST_NODE:
peerConfig.blockHashes = blockchainSyncMessage.BlockHashes
}(&configPeer)
}
wg.Wait()
if getConsensus(ss.syncConfig) {
break LOOP_HONEST_NODE
if ss.areConsensusHashesEqual() {
break
}
}
}
taskSyncQueue := queue.New(0)
blockSize := 0
TASK_LOOP:
// getConsensusHashes gets all hashes needed to download.
func (ss *StateSync) generateStateSyncTaskQueue() {
ss.stateSyncTaskQueue = queue.New(0)
for _, configPeer := range ss.syncConfig.peers {
if configPeer.trusted {
for id, blockHash := range configPeer.blockHashes {
taskSyncQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
}
blockSize = len(configPeer.blockHashes)
break TASK_LOOP
ss.blockHeight = len(configPeer.blockHashes)
}
}
}
// downloadBlocks downloads blocks from state sync task queue.
func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
// Initialize blockchain
bc.Blocks = make([]*blockchain.Block, blockSize)
bc.Blocks = make([]*blockchain.Block, ss.blockHeight)
var wg sync.WaitGroup
wg.Add(ss.activePeerNumber)
wg.Add(int(ss.stateSyncTaskQueue.Len()))
for _, configPeer := range ss.syncConfig.peers {
if configPeer.err != nil {
continue
}
go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) {
go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *blockchain.Blockchain) {
defer wg.Done()
for !taskSyncQueue.Empty() {
task, err := taskSyncQueue.Poll(1, time.Millisecond)
for !stateSyncTaskQueue.Empty() {
task, err := stateSyncTaskQueue.Poll(1, time.Millisecond)
if err == queue.ErrTimeout {
break
}
@ -181,11 +194,28 @@ TASK_LOOP:
bc.Blocks[syncTask.index] = block
}
}
}(&configPeer, taskSyncQueue, bc)
}(&configPeer, ss.stateSyncTaskQueue, bc)
}
wg.Wait()
}
// StartStateSync starts state sync.
func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) {
// Creates sync config.
ss.createSyncConfig(peers)
// Makes connections to peers.
ss.makeConnectionToPeers()
// Gets consensus hashes.
ss.getConsensusHashes()
// Generates state-sync task queue.
ss.generateStateSyncTaskQueue()
ss.downloadBlocks(bc)
}
func getConsensus(syncConfig *SyncConfig) bool {
return true
}

Loading…
Cancel
Save