pull/69/head
Minh Doan 6 years ago
parent c097113ca5
commit 2d46c18170
  1. 60
      node/node.go

@ -212,41 +212,37 @@ TASK_LOOP:
bc := &blockchain.Blockchain{
Blocks: make([]*blockchain.Block, blockSize),
}
// loop to do syncing.
for {
var wg sync.WaitGroup
wg.Add(activePeerNumber)
for _, configPeer := range syncConfig.peers {
if configPeer.err != nil {
continue
}
go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) {
defer wg.Done()
for !taskSyncQueue.Empty() {
task, err := taskSyncQueue.Poll(1, time.Millisecond)
if err == queue.ErrTimeout {
break
}
syncTask := task[0].(SyncBlockTask)
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, syncTask.blockHash)
peerConfig.w.Write(msg)
peerConfig.w.Flush()
var content []byte
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
block, err := blockchain.DeserializeBlock(content)
if err == nil {
bc.Blocks[syncTask.index] = block
}
}
}(&configPeer, taskSyncQueue, bc)
wg.Add(activePeerNumber)
for _, configPeer := range syncConfig.peers {
if configPeer.err != nil {
continue
}
wg.Wait()
go func(peerConfig *SyncPeerConfig, taskSyncQueue *queue.Queue, bc *blockchain.Blockchain) {
defer wg.Done()
for !taskSyncQueue.Empty() {
task, err := taskSyncQueue.Poll(1, time.Millisecond)
if err == queue.ErrTimeout {
break
}
syncTask := task[0].(SyncBlockTask)
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GET_BLOCK, syncTask.blockHash)
peerConfig.w.Write(msg)
peerConfig.w.Flush()
var content []byte
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
block, err := blockchain.DeserializeBlock(content)
if err == nil {
bc.Blocks[syncTask.index] = block
}
}
}(&configPeer, taskSyncQueue, bc)
}
wg.Wait()
return bc
}

Loading…
Cancel
Save