From fd4cbda500816e78bda973102b1412ae78b568ad Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Wed, 21 Nov 2018 11:27:45 -0800 Subject: [PATCH] state_sync incremental improvement --- syncing/interface.go | 13 +++++--- syncing/syncing.go | 73 +++++++++++++++++++++++++++-------------- syncing/syncing_test.go | 1 + 3 files changed, 59 insertions(+), 28 deletions(-) create mode 100644 syncing/syncing_test.go diff --git a/syncing/interface.go b/syncing/interface.go index 609f4413c..71e7f81ab 100644 --- a/syncing/interface.go +++ b/syncing/interface.go @@ -5,9 +5,14 @@ import ( "github.com/harmony-one/harmony/p2p" ) -// StateSync is the interface to do state-sync. +// StateSyncInterface is the interface to do state-sync. +// TODO(minhdoan): Nice to have, we should abstract the Blockchain type as generic type. type StateSyncInterface interface { - // Syncing Block in blockchain from other peers. - // The return channel is the signal of syncing finish. - ProcessStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) + // Syncing blockchain from other peers. + // The returned channel is the signal of syncing finish. + ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) + + // Syncing blockchain from a single peer. + // The returned channel is the signal of syncing finish. + ProcessStateSyncFromSinglePeer(peer *p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) } diff --git a/syncing/syncing.go b/syncing/syncing.go index 61a078114..dee3717c0 100644 --- a/syncing/syncing.go +++ b/syncing/syncing.go @@ -33,12 +33,21 @@ type SyncConfig struct { peers []SyncPeerConfig } -// StateSync is the main object used to do state sync. +// GetStateSync returns the implementation of StateSyncInterface interface. +func GetStateSync() *StateSync { + return &StateSync{} +} + +// StateSync is the struct that implements StateSyncInterface. type StateSync struct { + peerNumber int + activePeerNumber int + syncConfig *SyncConfig } -// ProcessStateSync used to do state sync. -func (ss *StateSync) ProcessStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) { +// ProcessStateSyncFromPeers used to do state sync. +func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) { + // TODO: Validate peers. done := make(chan struct{}) go func() { ss.StartStateSync(peers, bc) @@ -47,44 +56,59 @@ func (ss *StateSync) ProcessStateSync(peers []p2p.Peer, bc *blockchain.Blockchai return done, nil } -// StartStateSync starts state sync. -func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) { - peerNumber := len(peers) - syncConfig := SyncConfig{ - peers: make([]SyncPeerConfig, peerNumber), +// ProcessStateSyncFromSinglePeer used to do state sync from a single peer. +func (ss *StateSync) ProcessStateSyncFromSinglePeer(peer *p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) { + // Later. + return nil, nil +} + +func (ss *StateSync) createSyncConfig(peers []p2p.Peer) { + ss.peerNumber = len(peers) + ss.syncConfig = &SyncConfig{ + peers: make([]SyncPeerConfig, ss.peerNumber), } - for id := range syncConfig.peers { - syncConfig.peers[id].peer = peers[id] - syncConfig.peers[id].trusted = false + for id := range ss.syncConfig.peers { + ss.syncConfig.peers[id].peer = peers[id] + ss.syncConfig.peers[id].trusted = false } +} +func (ss *StateSync) makeConnectionToPeers() { var wg sync.WaitGroup - wg.Add(peerNumber) + wg.Add(ss.peerNumber) - for id := range syncConfig.peers { + for _, synPeerConfig := range ss.syncConfig.peers { go func(peerConfig *SyncPeerConfig) { defer wg.Done() peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port) - }(&syncConfig.peers[id]) + }(&synPeerConfig) } wg.Wait() - - activePeerNumber := 0 - for _, configPeer := range syncConfig.peers { + ss.activePeerNumber = 0 + for _, configPeer := range ss.syncConfig.peers { if configPeer.err == nil { - activePeerNumber++ + ss.activePeerNumber++ configPeer.w = bufio.NewWriter(configPeer.conn) configPeer.trusted = true } } +} + +// StartStateSync starts state sync. +func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) { + // Create sync config. + ss.createSyncConfig(peers) + + // Make connections to peers. + ss.makeConnectionToPeers() // Looping to get an array of block hashes from honest nodes. LOOP_HONEST_NODE: for { var wg sync.WaitGroup - wg.Add(activePeerNumber) + wg.Add(ss.activePeerNumber) - for _, configPeer := range syncConfig.peers { + for _, configPeer := range ss.syncConfig.peers { if configPeer.err != nil { continue } @@ -110,7 +134,7 @@ LOOP_HONEST_NODE: } wg.Wait() - if getConsensus(&syncConfig) { + if getConsensus(ss.syncConfig) { break LOOP_HONEST_NODE } } @@ -118,7 +142,7 @@ LOOP_HONEST_NODE: taskSyncQueue := queue.New(0) blockSize := 0 TASK_LOOP: - for _, configPeer := range syncConfig.peers { + for _, configPeer := range ss.syncConfig.peers { if configPeer.trusted { for id, blockHash := range configPeer.blockHashes { taskSyncQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) @@ -129,8 +153,9 @@ TASK_LOOP: } // Initialize blockchain bc.Blocks = make([]*blockchain.Block, blockSize) - wg.Add(activePeerNumber) - for _, configPeer := range syncConfig.peers { + var wg sync.WaitGroup + wg.Add(ss.activePeerNumber) + for _, configPeer := range ss.syncConfig.peers { if configPeer.err != nil { continue } diff --git a/syncing/syncing_test.go b/syncing/syncing_test.go new file mode 100644 index 000000000..3f8305535 --- /dev/null +++ b/syncing/syncing_test.go @@ -0,0 +1 @@ +package syncing