change sync state logic and fix its test

pull/102/head
Minh Doan 6 years ago committed by Minh Doan
parent 8b84044730
commit 3b1bd7fdf3
  1. 1
      syncing/interface.go
  2. 43
      syncing/syncing.go
  3. 13
      syncing/syncing_test.go

@ -6,7 +6,6 @@ import (
) )
// StateSyncInterface is the interface to do state-sync. // StateSyncInterface is the interface to do state-sync.
// TODO(minhdoan): Nice to have, we should abstract the Blockchain type as generic type.
type StateSyncInterface interface { type StateSyncInterface interface {
// Syncing blockchain from other peers. // Syncing blockchain from other peers.
// The returned channel is the signal of syncing finish. // The returned channel is the signal of syncing finish.

@ -59,11 +59,11 @@ func (peerConfig *SyncPeerConfig) GetBlockHashes() error {
} }
// GetBlocks ... // GetBlocks ...
func (peerConfig *SyncPeerConfig) GetBlocks(heights []int32) ([][]byte, error) { func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
if peerConfig.client == nil { if peerConfig.client == nil {
return nil, ErrSyncPeerConfigClientNotReady return nil, ErrSyncPeerConfigClientNotReady
} }
response := peerConfig.client.GetBlocks(heights) response := peerConfig.client.GetBlocks(hashes)
return response.Payload, nil return response.Payload, nil
} }
@ -78,12 +78,7 @@ func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.
return done, nil return done, nil
} }
// ProcessStateSyncFromSinglePeer used to do state sync from a single peer. // CreateSyncConfig ...
func (ss *StateSync) ProcessStateSyncFromSinglePeer(peer *p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) {
// Later.
return nil, nil
}
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) { func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) {
ss.peerNumber = len(peers) ss.peerNumber = len(peers)
ss.syncConfig = &SyncConfig{ ss.syncConfig = &SyncConfig{
@ -101,11 +96,11 @@ func (ss *StateSync) makeConnectionToPeers() {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(ss.peerNumber) wg.Add(ss.peerNumber)
for _, synPeerConfig := range ss.syncConfig.peers { for id := range ss.syncConfig.peers {
go func(peerConfig *SyncPeerConfig) { go func(peerConfig *SyncPeerConfig) {
defer wg.Done() defer wg.Done()
peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port) peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port)
}(&synPeerConfig) }(&ss.syncConfig.peers[id])
} }
wg.Wait() wg.Wait()
ss.activePeerNumber = 0 ss.activePeerNumber = 0
@ -138,14 +133,14 @@ func (ss *StateSync) getConsensusHashes() {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(ss.activePeerNumber) wg.Add(ss.activePeerNumber)
for _, configPeer := range ss.syncConfig.peers { for id := range ss.syncConfig.peers {
if configPeer.client == nil { if ss.syncConfig.peers[id].client == nil {
continue continue
} }
go func(peerConfig *SyncPeerConfig) { go func(peerConfig *SyncPeerConfig) {
defer wg.Done() defer wg.Done()
peerConfig.client.GetBlockHashes() peerConfig.client.GetBlockHashes()
}(&configPeer) }(&ss.syncConfig.peers[id])
} }
wg.Wait() wg.Wait()
if ss.areConsensusHashesEqual() { if ss.areConsensusHashesEqual() {
@ -173,9 +168,9 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
// Initialize blockchain // Initialize blockchain
bc.Blocks = make([]*blockchain.Block, ss.blockHeight) bc.Blocks = make([]*blockchain.Block, ss.blockHeight)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(int(ss.stateSyncTaskQueue.Len())) wg.Add(ss.activePeerNumber)
for _, configPeer := range ss.syncConfig.peers { for i := range ss.syncConfig.peers {
if configPeer.client == nil { if ss.syncConfig.peers[i].client == nil {
continue continue
} }
go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *blockchain.Blockchain) { go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *blockchain.Blockchain) {
@ -188,23 +183,17 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
syncTask := task[0].(SyncBlockTask) syncTask := task[0].(SyncBlockTask)
for { for {
id := syncTask.index id := syncTask.index
heights := []int32{int32(id)} payload, err := peerConfig.GetBlocks([][]byte{syncTask.blockHash})
payload, err := peerConfig.GetBlocks(heights)
if err == nil { if err == nil {
stop := true // As of now, only send and ask for one block.
for i, id := range heights { bc.Blocks[id], err = blockchain.DeserializeBlock(payload[0])
bc.Blocks[id], err = blockchain.DeserializeBlock(payload[i]) if err == nil {
if err != nil {
stop = false
}
}
if stop {
break break
} }
} }
} }
} }
}(&configPeer, ss.stateSyncTaskQueue, bc) }(&ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc)
} }
wg.Wait() wg.Wait()
} }

@ -1,6 +1,7 @@
package syncing_test package syncing_test
import ( import (
"reflect"
"testing" "testing"
bc "github.com/harmony-one/harmony/blockchain" bc "github.com/harmony-one/harmony/blockchain"
@ -79,8 +80,9 @@ func (node *FakeNode) CalculateResponse(request *pb.DownloaderRequest) (*pb.Down
response.Payload = append(response.Payload, block.Hash[:]) response.Payload = append(response.Payload, block.Hash[:])
} }
} else { } else {
for _, id := range request.Height { for i := range request.Hashes {
response.Payload = append(response.Payload, node.bc.Blocks[id].Serialize()) block := node.bc.FindBlock(request.Hashes[i])
response.Payload = append(response.Payload, block.Serialize())
} }
} }
return response, nil return response, nil
@ -102,8 +104,15 @@ func TestSyncing(t *testing.T) {
peers[i].Ip = fakeNodes[i].ip peers[i].Ip = fakeNodes[i].ip
peers[i].Port = fakeNodes[i].port peers[i].Port = fakeNodes[i].port
} }
stateSync.StartStateSync(peers, bc) stateSync.StartStateSync(peers, bc)
for i := range bc.Blocks {
if !reflect.DeepEqual(bc.Blocks[i], fakeNodes[0].bc.Blocks[i]) {
t.Error("not equal")
}
}
for _, fakeNode := range fakeNodes { for _, fakeNode := range fakeNodes {
fakeNode.grpcServer.Stop() fakeNode.grpcServer.Stop()
} }

Loading…
Cancel
Save