fix syncing assumption that all peers in syncing are honest. hashes consensus are reached when 2/3 nodes agree on the hashes

pull/110/head
Minh Doan 6 years ago
parent f5e26cf624
commit ceeb019bac
  1. 102
      syncing/syncing.go

@ -1,7 +1,9 @@
package syncing package syncing
import ( import (
"bytes"
"reflect" "reflect"
"sort"
"sync" "sync"
"time" "time"
@ -11,6 +13,11 @@ import (
"github.com/harmony-one/harmony/syncing/downloader" "github.com/harmony-one/harmony/syncing/downloader"
) )
// Constants for syncing.
const (
ConsensusRatio = float64(0.66)
)
// SyncPeerConfig is peer config to sync. // SyncPeerConfig is peer config to sync.
type SyncPeerConfig struct { type SyncPeerConfig struct {
ip string ip string
@ -27,7 +34,7 @@ type SyncBlockTask struct {
// SyncConfig contains an array of SyncPeerConfig. // SyncConfig contains an array of SyncPeerConfig.
type SyncConfig struct { type SyncConfig struct {
peers []SyncPeerConfig peers []*SyncPeerConfig
} }
// GetStateSync returns the implementation of StateSyncInterface interface. // GetStateSync returns the implementation of StateSyncInterface interface.
@ -44,6 +51,21 @@ type StateSync struct {
stateSyncTaskQueue *queue.Queue stateSyncTaskQueue *queue.Queue
} }
func compareSyncPeerConfigByBlockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int {
if len(a.blockHashes) != len(b.blockHashes) {
if len(a.blockHashes) < len(b.blockHashes) {
return -1
}
return 1
}
for id := range a.blockHashes {
if !reflect.DeepEqual(a.blockHashes[id], b.blockHashes[id]) {
return bytes.Compare(a.blockHashes[id], b.blockHashes[id])
}
}
return 0
}
// GetBlockHashes gets block hashes by calling grpc request to the corresponding peer. // GetBlockHashes gets block hashes by calling grpc request to the corresponding peer.
func (peerConfig *SyncPeerConfig) GetBlockHashes() error { func (peerConfig *SyncPeerConfig) GetBlockHashes() error {
if peerConfig.client == nil { if peerConfig.client == nil {
@ -82,10 +104,10 @@ func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) { func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) {
ss.peerNumber = len(peers) ss.peerNumber = len(peers)
ss.syncConfig = &SyncConfig{ ss.syncConfig = &SyncConfig{
peers: make([]SyncPeerConfig, ss.peerNumber), peers: make([]*SyncPeerConfig, ss.peerNumber),
} }
for id := range ss.syncConfig.peers { for id := range ss.syncConfig.peers {
ss.syncConfig.peers[id] = SyncPeerConfig{ ss.syncConfig.peers[id] = &SyncPeerConfig{
ip: peers[id].Ip, ip: peers[id].Ip,
port: peers[id].Port, port: peers[id].Port,
} }
@ -101,9 +123,14 @@ func (ss *StateSync) makeConnectionToPeers() {
go func(peerConfig *SyncPeerConfig) { go func(peerConfig *SyncPeerConfig) {
defer wg.Done() defer wg.Done()
peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port) peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port)
}(&ss.syncConfig.peers[id]) }(ss.syncConfig.peers[id])
} }
wg.Wait() wg.Wait()
ss.CleanUpNilPeers()
}
// CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber.
func (ss *StateSync) CleanUpNilPeers() {
ss.activePeerNumber = 0 ss.activePeerNumber = 0
for _, configPeer := range ss.syncConfig.peers { for _, configPeer := range ss.syncConfig.peers {
if configPeer.client != nil { if configPeer.client != nil {
@ -112,20 +139,57 @@ func (ss *StateSync) makeConnectionToPeers() {
} }
} }
// areConsensusHashesEqual chesk if all consensus hashes are equal. // getHowMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
func (ss *StateSync) areConsensusHashesEqual() bool { // Assumption: all peers are sorted by compareSyncPeerConfigByBlockHashes first.
var firstPeer *SyncPeerConfig func (syncConfig *SyncConfig) getHowMaxConsensus() (int, int) {
for _, configPeer := range ss.syncConfig.peers { // As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
if configPeer.client != nil { curCount := 0
if firstPeer == nil { curFirstID := -1
firstPeer = &configPeer maxCount := 0
} maxFirstID := -1
if !reflect.DeepEqual(configPeer.blockHashes, firstPeer.blockHashes) { for i := range syncConfig.peers {
return false if curFirstID == -1 || compareSyncPeerConfigByBlockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 {
} curCount = 1
curFirstID = i
} else {
curCount++
} }
if curCount > maxCount {
maxCount = curCount
maxFirstID = curFirstID
}
}
return maxFirstID, maxCount
}
// CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes.
func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) {
for i := range syncConfig.peers {
if compareSyncPeerConfigByBlockHashes(syncConfig.peers[maxFirstID], syncConfig.peers[i]) != 0 {
// TODO: move it into a util delete func.
// See tip https://github.com/golang/go/wiki/SliceTricks
// Close the client and remove the peer out of the
syncConfig.peers[i].client.Close()
copy(syncConfig.peers[i:], syncConfig.peers[i+1:])
syncConfig.peers[len(syncConfig.peers)-1] = nil
syncConfig.peers = syncConfig.peers[:len(syncConfig.peers)-1]
}
}
}
// getBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal.
func (ss *StateSync) getBlockHashesConsensusAndCleanUp() bool {
// Sort all peers by the blockHashes.
sort.Slice(ss.syncConfig.peers, func(i, j int) bool {
return compareSyncPeerConfigByBlockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1
})
maxCount, maxFirstID := ss.syncConfig.getHowMaxConsensus()
if float64(maxCount) >= ConsensusRatio*float64(ss.activePeerNumber) {
ss.syncConfig.CleanUpPeers(maxFirstID)
ss.CleanUpNilPeers()
return true
} }
return true return false
} }
// getConsensusHashes gets all hashes needed to download. // getConsensusHashes gets all hashes needed to download.
@ -142,10 +206,10 @@ func (ss *StateSync) getConsensusHashes() {
defer wg.Done() defer wg.Done()
response := peerConfig.client.GetBlockHashes() response := peerConfig.client.GetBlockHashes()
peerConfig.blockHashes = response.Payload peerConfig.blockHashes = response.Payload
}(&ss.syncConfig.peers[id]) }(ss.syncConfig.peers[id])
} }
wg.Wait() wg.Wait()
if ss.areConsensusHashesEqual() { if ss.getBlockHashesConsensusAndCleanUp() {
break break
} }
} }
@ -198,7 +262,7 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
} }
} }
} }
}(&ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc) }(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc)
} }
wg.Wait() wg.Wait()
} }

Loading…
Cancel
Save