state_sync incremental improvement

pull/88/head
Minh Doan 6 years ago committed by Minh Doan
parent dd6b504079
commit fd4cbda500
  1. 13
      syncing/interface.go
  2. 73
      syncing/syncing.go
  3. 1
      syncing/syncing_test.go

@ -5,9 +5,14 @@ import (
"github.com/harmony-one/harmony/p2p"
)
// StateSync is the interface to do state-sync.
// StateSyncInterface is the interface to do state-sync.
// TODO(minhdoan): Nice to have, we should abstract the Blockchain type as generic type.
type StateSyncInterface interface {
// Syncing Block in blockchain from other peers.
// The return channel is the signal of syncing finish.
ProcessStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error)
// Syncing blockchain from other peers.
// The returned channel is the signal of syncing finish.
ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error)
// Syncing blockchain from a single peer.
// The returned channel is the signal of syncing finish.
ProcessStateSyncFromSinglePeer(peer *p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error)
}

@ -33,12 +33,21 @@ type SyncConfig struct {
peers []SyncPeerConfig
}
// StateSync is the main object used to do state sync.
// GetStateSync returns the implementation of StateSyncInterface interface.
func GetStateSync() *StateSync {
return &StateSync{}
}
// StateSync is the struct that implements StateSyncInterface.
type StateSync struct {
peerNumber int
activePeerNumber int
syncConfig *SyncConfig
}
// ProcessStateSync used to do state sync.
func (ss *StateSync) ProcessStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) {
// ProcessStateSyncFromPeers used to do state sync.
func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) {
// TODO: Validate peers.
done := make(chan struct{})
go func() {
ss.StartStateSync(peers, bc)
@ -47,44 +56,59 @@ func (ss *StateSync) ProcessStateSync(peers []p2p.Peer, bc *blockchain.Blockchai
return done, nil
}
// StartStateSync starts state sync.
func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) {
peerNumber := len(peers)
syncConfig := SyncConfig{
peers: make([]SyncPeerConfig, peerNumber),
// ProcessStateSyncFromSinglePeer used to do state sync from a single peer.
func (ss *StateSync) ProcessStateSyncFromSinglePeer(peer *p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) {
// Later.
return nil, nil
}
func (ss *StateSync) createSyncConfig(peers []p2p.Peer) {
ss.peerNumber = len(peers)
ss.syncConfig = &SyncConfig{
peers: make([]SyncPeerConfig, ss.peerNumber),
}
for id := range ss.syncConfig.peers {
ss.syncConfig.peers[id].peer = peers[id]
ss.syncConfig.peers[id].trusted = false
}
for id := range syncConfig.peers {
syncConfig.peers[id].peer = peers[id]
syncConfig.peers[id].trusted = false
}
func (ss *StateSync) makeConnectionToPeers() {
var wg sync.WaitGroup
wg.Add(peerNumber)
wg.Add(ss.peerNumber)
for id := range syncConfig.peers {
for _, synPeerConfig := range ss.syncConfig.peers {
go func(peerConfig *SyncPeerConfig) {
defer wg.Done()
peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port)
}(&syncConfig.peers[id])
}(&synPeerConfig)
}
wg.Wait()
activePeerNumber := 0
for _, configPeer := range syncConfig.peers {
ss.activePeerNumber = 0
for _, configPeer := range ss.syncConfig.peers {
if configPeer.err == nil {
activePeerNumber++
ss.activePeerNumber++
configPeer.w = bufio.NewWriter(configPeer.conn)
configPeer.trusted = true
}
}
}
// StartStateSync starts state sync.
func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) {
// Create sync config.
ss.createSyncConfig(peers)
// Make connections to peers.
ss.makeConnectionToPeers()
// Looping to get an array of block hashes from honest nodes.
LOOP_HONEST_NODE:
for {
var wg sync.WaitGroup
wg.Add(activePeerNumber)
wg.Add(ss.activePeerNumber)
for _, configPeer := range syncConfig.peers {
for _, configPeer := range ss.syncConfig.peers {
if configPeer.err != nil {
continue
}
@ -110,7 +134,7 @@ LOOP_HONEST_NODE:
}
wg.Wait()
if getConsensus(&syncConfig) {
if getConsensus(ss.syncConfig) {
break LOOP_HONEST_NODE
}
}
@ -118,7 +142,7 @@ LOOP_HONEST_NODE:
taskSyncQueue := queue.New(0)
blockSize := 0
TASK_LOOP:
for _, configPeer := range syncConfig.peers {
for _, configPeer := range ss.syncConfig.peers {
if configPeer.trusted {
for id, blockHash := range configPeer.blockHashes {
taskSyncQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
@ -129,8 +153,9 @@ TASK_LOOP:
}
// Initialize blockchain
bc.Blocks = make([]*blockchain.Block, blockSize)
wg.Add(activePeerNumber)
for _, configPeer := range syncConfig.peers {
var wg sync.WaitGroup
wg.Add(ss.activePeerNumber)
for _, configPeer := range ss.syncConfig.peers {
if configPeer.err != nil {
continue
}

@ -0,0 +1 @@
package syncing
Loading…
Cancel
Save