update syncing implementation with new downloader api

pull/97/head
Minh Doan 6 years ago committed by Minh Doan
parent f6c514f802
commit 4bff45145f
  1. 1
      syncing/downloader/client.go
  2. 8
      syncing/errors.go
  3. 109
      syncing/syncing.go

@ -37,6 +37,7 @@ func ClientSetup(ip, port string) *Client {
client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
return nil
}
client.dlClient = pb.NewDownloaderClient(client.conn)

@ -0,0 +1,8 @@
package syncing
import "errors"
// Errors ...
var (
ErrSyncPeerConfigClientNotReady = errors.New("client is not ready")
)

@ -1,8 +1,6 @@
package syncing
import (
"bufio"
"net"
"reflect"
"sync"
"time"
@ -10,23 +8,21 @@ import (
"github.com/Workiva/go-datastructures/queue"
"github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/p2p"
proto_node "github.com/harmony-one/harmony/proto/node"
"github.com/harmony-one/harmony/syncing/downloader"
)
// SyncPeerConfig is peer config to sync.
type SyncPeerConfig struct {
peer p2p.Peer
conn net.Conn
w *bufio.Writer
err error
trusted bool
blockHashes [][32]byte
ip string
port string
client *downloader.Client
blockHashes [][]byte
}
// SyncBlockTask is the task struct to sync a specific block.
type SyncBlockTask struct {
index int
blockHash [32]byte
blockHash []byte
}
// SyncConfig contains an array of SyncPeerConfig.
@ -48,6 +44,29 @@ type StateSync struct {
stateSyncTaskQueue *queue.Queue
}
// GetBlockHashes ...
func (peerConfig *SyncPeerConfig) GetBlockHashes() error {
if peerConfig.client == nil {
return ErrSyncPeerConfigClientNotReady
}
response := peerConfig.client.GetBlockHashes()
peerConfig.blockHashes = make([][]byte, len(response.Payload))
for i := range response.Payload {
peerConfig.blockHashes[i] = make([]byte, len(response.Payload[i]))
copy(peerConfig.blockHashes[i], response.Payload[i])
}
return nil
}
// GetBlocks ...
func (peerConfig *SyncPeerConfig) GetBlocks(heights []int32) ([][]byte, error) {
if peerConfig.client == nil {
return nil, ErrSyncPeerConfigClientNotReady
}
response := peerConfig.client.GetBlocks(heights)
return response.Payload, nil
}
// ProcessStateSyncFromPeers used to do state sync.
func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) {
// TODO: Validate peers.
@ -71,8 +90,10 @@ func (ss *StateSync) createSyncConfig(peers []p2p.Peer) {
peers: make([]SyncPeerConfig, ss.peerNumber),
}
for id := range ss.syncConfig.peers {
ss.syncConfig.peers[id].peer = peers[id]
ss.syncConfig.peers[id].trusted = false
ss.syncConfig.peers[id] = SyncPeerConfig{
ip: peers[id].Ip,
port: peers[id].Port,
}
}
}
@ -83,29 +104,27 @@ func (ss *StateSync) makeConnectionToPeers() {
for _, synPeerConfig := range ss.syncConfig.peers {
go func(peerConfig *SyncPeerConfig) {
defer wg.Done()
peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port)
peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port)
}(&synPeerConfig)
}
wg.Wait()
ss.activePeerNumber = 0
for _, configPeer := range ss.syncConfig.peers {
if configPeer.err == nil {
if configPeer.client != nil {
ss.activePeerNumber++
configPeer.w = bufio.NewWriter(configPeer.conn)
configPeer.trusted = true
}
}
}
// areConsensusHashesEqual chesk if all consensus hashes are equal.
func (ss *StateSync) areConsensusHashesEqual() bool {
var fixedPeer *SyncPeerConfig
var firstPeer *SyncPeerConfig
for _, configPeer := range ss.syncConfig.peers {
if configPeer.trusted {
if fixedPeer == nil {
fixedPeer = &configPeer
if configPeer.client != nil {
if firstPeer == nil {
firstPeer = &configPeer
}
if !reflect.DeepEqual(configPeer.blockHashes, fixedPeer) {
if !reflect.DeepEqual(configPeer.blockHashes, firstPeer.blockHashes) {
return false
}
}
@ -120,29 +139,15 @@ func (ss *StateSync) getConsensusHashes() {
wg.Add(ss.activePeerNumber)
for _, configPeer := range ss.syncConfig.peers {
if configPeer.err != nil {
if configPeer.client == nil {
continue
}
go func(peerConfig *SyncPeerConfig) {
defer wg.Done()
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GetLastBlockHashes, [32]byte{})
peerConfig.w.Write(msg)
peerConfig.w.Flush()
var content []byte
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
var blockchainSyncMessage *proto_node.BlockchainSyncMessage
blockchainSyncMessage, peerConfig.err = proto_node.DeserializeBlockchainSyncMessage(content)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
peerConfig.blockHashes = blockchainSyncMessage.BlockHashes
peerConfig.client.GetBlockHashes()
}(&configPeer)
}
wg.Wait()
if ss.areConsensusHashesEqual() {
break
}
@ -153,11 +158,12 @@ func (ss *StateSync) getConsensusHashes() {
func (ss *StateSync) generateStateSyncTaskQueue() {
ss.stateSyncTaskQueue = queue.New(0)
for _, configPeer := range ss.syncConfig.peers {
if configPeer.trusted {
if configPeer.client != nil {
for id, blockHash := range configPeer.blockHashes {
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
}
ss.blockHeight = len(configPeer.blockHashes)
break
}
}
}
@ -169,7 +175,7 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
var wg sync.WaitGroup
wg.Add(int(ss.stateSyncTaskQueue.Len()))
for _, configPeer := range ss.syncConfig.peers {
if configPeer.err != nil {
if configPeer.client == nil {
continue
}
go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *blockchain.Blockchain) {
@ -180,18 +186,15 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
break
}
syncTask := task[0].(SyncBlockTask)
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GetBlock, syncTask.blockHash)
peerConfig.w.Write(msg)
peerConfig.w.Flush()
var content []byte
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err != nil {
peerConfig.trusted = false
return
for {
id := syncTask.index
heights := []int32{int32(id)}
payload, err := peerConfig.GetBlocks(heights)
if err != nil {
// Write log
} else {
bc.Blocks[id], err = blockchain.DeserializeBlock(payload[0])
}
block, err := blockchain.DeserializeBlock(content)
if err == nil {
bc.Blocks[syncTask.index] = block
}
}
}(&configPeer, ss.stateSyncTaskQueue, bc)
@ -215,7 +218,3 @@ func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain)
ss.downloadBlocks(bc)
}
func getConsensus(syncConfig *SyncConfig) bool {
return true
}

Loading…
Cancel
Save