From ceeb019bac25941d11067bd009ae6a1291a87232 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Wed, 28 Nov 2018 15:35:50 -0800 Subject: [PATCH] fix syncing assumption that all peers in syncing are honest. hashes consensus are reached when 2/3 nodes agree on the hashes --- syncing/syncing.go | 102 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 19 deletions(-) diff --git a/syncing/syncing.go b/syncing/syncing.go index 7f8de5ba3..b5a12a9b6 100644 --- a/syncing/syncing.go +++ b/syncing/syncing.go @@ -1,7 +1,9 @@ package syncing import ( + "bytes" "reflect" + "sort" "sync" "time" @@ -11,6 +13,11 @@ import ( "github.com/harmony-one/harmony/syncing/downloader" ) +// Constants for syncing. +const ( + ConsensusRatio = float64(0.66) +) + // SyncPeerConfig is peer config to sync. type SyncPeerConfig struct { ip string @@ -27,7 +34,7 @@ type SyncBlockTask struct { // SyncConfig contains an array of SyncPeerConfig. type SyncConfig struct { - peers []SyncPeerConfig + peers []*SyncPeerConfig } // GetStateSync returns the implementation of StateSyncInterface interface. @@ -44,6 +51,21 @@ type StateSync struct { stateSyncTaskQueue *queue.Queue } +func compareSyncPeerConfigByBlockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int { + if len(a.blockHashes) != len(b.blockHashes) { + if len(a.blockHashes) < len(b.blockHashes) { + return -1 + } + return 1 + } + for id := range a.blockHashes { + if !reflect.DeepEqual(a.blockHashes[id], b.blockHashes[id]) { + return bytes.Compare(a.blockHashes[id], b.blockHashes[id]) + } + } + return 0 +} + // GetBlockHashes gets block hashes by calling grpc request to the corresponding peer. func (peerConfig *SyncPeerConfig) GetBlockHashes() error { if peerConfig.client == nil { @@ -82,10 +104,10 @@ func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain. func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) { ss.peerNumber = len(peers) ss.syncConfig = &SyncConfig{ - peers: make([]SyncPeerConfig, ss.peerNumber), + peers: make([]*SyncPeerConfig, ss.peerNumber), } for id := range ss.syncConfig.peers { - ss.syncConfig.peers[id] = SyncPeerConfig{ + ss.syncConfig.peers[id] = &SyncPeerConfig{ ip: peers[id].Ip, port: peers[id].Port, } @@ -101,9 +123,14 @@ func (ss *StateSync) makeConnectionToPeers() { go func(peerConfig *SyncPeerConfig) { defer wg.Done() peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port) - }(&ss.syncConfig.peers[id]) + }(ss.syncConfig.peers[id]) } wg.Wait() + ss.CleanUpNilPeers() +} + +// CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber. +func (ss *StateSync) CleanUpNilPeers() { ss.activePeerNumber = 0 for _, configPeer := range ss.syncConfig.peers { if configPeer.client != nil { @@ -112,20 +139,57 @@ func (ss *StateSync) makeConnectionToPeers() { } } -// areConsensusHashesEqual chesk if all consensus hashes are equal. -func (ss *StateSync) areConsensusHashesEqual() bool { - var firstPeer *SyncPeerConfig - for _, configPeer := range ss.syncConfig.peers { - if configPeer.client != nil { - if firstPeer == nil { - firstPeer = &configPeer - } - if !reflect.DeepEqual(configPeer.blockHashes, firstPeer.blockHashes) { - return false - } +// getHowMaxConsensus returns max number of consensus nodes and the first ID of consensus group. +// Assumption: all peers are sorted by compareSyncPeerConfigByBlockHashes first. +func (syncConfig *SyncConfig) getHowMaxConsensus() (int, int) { + // As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively. + curCount := 0 + curFirstID := -1 + maxCount := 0 + maxFirstID := -1 + for i := range syncConfig.peers { + if curFirstID == -1 || compareSyncPeerConfigByBlockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 { + curCount = 1 + curFirstID = i + } else { + curCount++ } + if curCount > maxCount { + maxCount = curCount + maxFirstID = curFirstID + } + } + return maxFirstID, maxCount +} + +// CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes. +func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) { + for i := range syncConfig.peers { + if compareSyncPeerConfigByBlockHashes(syncConfig.peers[maxFirstID], syncConfig.peers[i]) != 0 { + // TODO: move it into a util delete func. + // See tip https://github.com/golang/go/wiki/SliceTricks + // Close the client and remove the peer out of the + syncConfig.peers[i].client.Close() + copy(syncConfig.peers[i:], syncConfig.peers[i+1:]) + syncConfig.peers[len(syncConfig.peers)-1] = nil + syncConfig.peers = syncConfig.peers[:len(syncConfig.peers)-1] + } + } +} + +// getBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal. +func (ss *StateSync) getBlockHashesConsensusAndCleanUp() bool { + // Sort all peers by the blockHashes. + sort.Slice(ss.syncConfig.peers, func(i, j int) bool { + return compareSyncPeerConfigByBlockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1 + }) + maxCount, maxFirstID := ss.syncConfig.getHowMaxConsensus() + if float64(maxCount) >= ConsensusRatio*float64(ss.activePeerNumber) { + ss.syncConfig.CleanUpPeers(maxFirstID) + ss.CleanUpNilPeers() + return true } - return true + return false } // getConsensusHashes gets all hashes needed to download. @@ -142,10 +206,10 @@ func (ss *StateSync) getConsensusHashes() { defer wg.Done() response := peerConfig.client.GetBlockHashes() peerConfig.blockHashes = response.Payload - }(&ss.syncConfig.peers[id]) + }(ss.syncConfig.peers[id]) } wg.Wait() - if ss.areConsensusHashesEqual() { + if ss.getBlockHashesConsensusAndCleanUp() { break } } @@ -198,7 +262,7 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) { } } } - }(&ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc) + }(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc) } wg.Wait() }