Merge pull request #3422 from JackyWYX/dynamic_dns_dig

[SYNC] Dynamic DNS peer discovery
pull/3439/head
Rongjian Lan 4 years ago committed by GitHub
commit f0d10c2acc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 62
      api/service/syncing/syncing.go
  2. 105
      api/service/syncing/syncing_test.go

@ -62,6 +62,11 @@ func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client {
return peerConfig.client return peerConfig.client
} }
// IsEqual checks the equality between two sync peers
func (peerConfig *SyncPeerConfig) IsEqual(pc2 *SyncPeerConfig) bool {
return peerConfig.ip == pc2.ip && peerConfig.port == pc2.port
}
// SyncBlockTask is the task struct to sync a specific block. // SyncBlockTask is the task struct to sync a specific block.
type SyncBlockTask struct { type SyncBlockTask struct {
index int index int
@ -110,6 +115,13 @@ type SyncConfig struct {
func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) { func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
sc.mtx.Lock() sc.mtx.Lock()
defer sc.mtx.Unlock() defer sc.mtx.Unlock()
// Ensure no duplicate peers
for _, p2 := range sc.peers {
if peer.IsEqual(p2) {
return
}
}
sc.peers = append(sc.peers, peer) sc.peers = append(sc.peers, peer)
} }
@ -125,6 +137,22 @@ func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) {
} }
} }
// RemovePeer removes a peer from SyncConfig
func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig) {
sc.mtx.Lock()
defer sc.mtx.Unlock()
peer.client.Close()
for i, p := range sc.peers {
if p == peer {
sc.peers = append(sc.peers[:i], sc.peers[i+1:]...)
break
}
}
utils.Logger().Info().Str("peerIP", peer.ip).Str("peerPortMsg", peer.port).
Msg("[SYNC] remove GRPC peer")
}
// CreateStateSync returns the implementation of StateSyncInterface interface. // CreateStateSync returns the implementation of StateSyncInterface interface.
func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync { func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync {
stateSync := &StateSync{} stateSync := &StateSync{}
@ -263,6 +291,10 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
// CreateSyncConfig creates SyncConfig for StateSync object. // CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, isBeacon bool) error { func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, isBeacon bool) error {
// sanity check to ensure no duplicate peers
if err := checkPeersDuplicity(peers); err != nil {
return err
}
// limit the number of dns peers to connect // limit the number of dns peers to connect
randSeed := time.Now().UnixNano() randSeed := time.Now().UnixNano()
peers = limitNumPeers(peers, randSeed) peers = limitNumPeers(peers, randSeed)
@ -306,6 +338,23 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, isBeacon bool) error {
return nil return nil
} }
// checkPeersDuplicity checks whether there are duplicates in p2p.Peer
func checkPeersDuplicity(ps []p2p.Peer) error {
type peerDupID struct {
ip string
port string
}
m := make(map[peerDupID]struct{})
for _, p := range ps {
dip := peerDupID{p.IP, p.Port}
if _, ok := m[dip]; ok {
return fmt.Errorf("duplicate peer [%v:%v]", p.IP, p.Port)
}
m[dip] = struct{}{}
}
return nil
}
// limitNumPeers limits number of peers to release some server end sources. // limitNumPeers limits number of peers to release some server end sources.
func limitNumPeers(ps []p2p.Peer, randSeed int64) []p2p.Peer { func limitNumPeers(ps []p2p.Peer, randSeed int64) []p2p.Peer {
targetSize := calcNumPeersWithBound(len(ps), NumPeersLowBound, numPeersHighBound) targetSize := calcNumPeersWithBound(len(ps), NumPeersLowBound, numPeersHighBound)
@ -427,6 +476,7 @@ func (ss *StateSync) getConsensusHashes(startHash []byte, size uint32) {
Str("peerIP", peerConfig.ip). Str("peerIP", peerConfig.ip).
Str("peerPort", peerConfig.port). Str("peerPort", peerConfig.port).
Msg("[SYNC] getConsensusHashes Nil Response") Msg("[SYNC] getConsensusHashes Nil Response")
ss.syncConfig.RemovePeer(peerConfig)
return return
} }
if len(response.Payload) > int(size+1) { if len(response.Payload) > int(size+1) {
@ -484,9 +534,18 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
break break
} }
payload, err := peerConfig.GetBlocks(tasks.blockHashes()) payload, err := peerConfig.GetBlocks(tasks.blockHashes())
if err != nil {
utils.Logger().Warn().Err(err).
Str("peerID", peerConfig.ip).
Str("port", peerConfig.port).
Msg("[SYNC] downloadBlocks: GetBlocks failed")
ss.syncConfig.RemovePeer(peerConfig)
return
}
if err != nil || len(payload) == 0 { if err != nil || len(payload) == 0 {
count++ count++
utils.Logger().Error().Err(err).Int("failNumber", count).Msg("[SYNC] downloadBlocks: GetBlocks failed") utils.Logger().Error().Int("failNumber", count).
Msg("[SYNC] downloadBlocks: no more retrievable blocks")
if count > downloadBlocksRetryLimit { if count > downloadBlocksRetryLimit {
break break
} }
@ -871,6 +930,7 @@ func (ss *StateSync) getMaxPeerHeight(isBeacon bool) uint64 {
response, err := peerConfig.client.GetBlockChainHeight() response, err := peerConfig.client.GetBlockChainHeight()
if err != nil { if err != nil {
utils.Logger().Warn().Err(err).Str("peerIP", peerConfig.ip).Str("peerPort", peerConfig.port).Msg("[Sync]GetBlockChainHeight failed") utils.Logger().Warn().Err(err).Str("peerIP", peerConfig.ip).Str("peerPort", peerConfig.port).Msg("[Sync]GetBlockChainHeight failed")
ss.syncConfig.RemovePeer(peerConfig)
return return
} }
ss.syncMux.Lock() ss.syncMux.Lock()

@ -1,8 +1,10 @@
package syncing package syncing
import ( import (
"errors"
"fmt" "fmt"
"reflect" "reflect"
"strings"
"testing" "testing"
"github.com/harmony-one/harmony/api/service/syncing/downloader" "github.com/harmony-one/harmony/api/service/syncing/downloader"
@ -10,6 +12,53 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestSyncPeerConfig_IsEqual(t *testing.T) {
tests := []struct {
p1, p2 *SyncPeerConfig
exp bool
}{
{
p1: &SyncPeerConfig{
ip: "0.0.0.1",
port: "1",
},
p2: &SyncPeerConfig{
ip: "0.0.0.1",
port: "2",
},
exp: false,
},
{
p1: &SyncPeerConfig{
ip: "0.0.0.1",
port: "1",
},
p2: &SyncPeerConfig{
ip: "0.0.0.2",
port: "1",
},
exp: false,
},
{
p1: &SyncPeerConfig{
ip: "0.0.0.1",
port: "1",
},
p2: &SyncPeerConfig{
ip: "0.0.0.1",
port: "1",
},
exp: true,
},
}
for i, test := range tests {
res := test.p1.IsEqual(test.p2)
if res != test.exp {
t.Errorf("Test %v: unexpected res %v / %v", i, res, test.exp)
}
}
}
// Simple test for IncorrectResponse // Simple test for IncorrectResponse
func TestCreateTestSyncPeerConfig(t *testing.T) { func TestCreateTestSyncPeerConfig(t *testing.T) {
client := &downloader.Client{} client := &downloader.Client{}
@ -53,6 +102,32 @@ func TestCreateStateSync(t *testing.T) {
} }
} }
func TestCheckPeersDuplicity(t *testing.T) {
tests := []struct {
peers []p2p.Peer
expErr error
}{
{
peers: makePeersForTest(100),
expErr: nil,
},
{
peers: append(makePeersForTest(100), p2p.Peer{
IP: makeTestPeerIP(0),
}),
expErr: errors.New("duplicate peer"),
},
}
for i, test := range tests {
err := checkPeersDuplicity(test.peers)
if assErr := assertTestError(err, test.expErr); assErr != nil {
t.Errorf("Test %v: %v", i, assErr)
}
}
}
func TestLimitPeersWithBound(t *testing.T) { func TestLimitPeersWithBound(t *testing.T) {
tests := []struct { tests := []struct {
size int size int
@ -76,7 +151,7 @@ func TestLimitPeersWithBound(t *testing.T) {
if len(res) != test.expSize { if len(res) != test.expSize {
t.Errorf("result size unexpected: %v / %v", len(res), test.expSize) t.Errorf("result size unexpected: %v / %v", len(res), test.expSize)
} }
if err := checkTestPeerDuplicity(res); err != nil { if err := checkPeersDuplicity(res); err != nil {
t.Error(err) t.Error(err)
} }
} }
@ -97,24 +172,30 @@ func TestLimitPeersWithBound_random(t *testing.T) {
func makePeersForTest(size int) []p2p.Peer { func makePeersForTest(size int) []p2p.Peer {
ps := make([]p2p.Peer, 0, size) ps := make([]p2p.Peer, 0, size)
for i := 0; i != size; i++ { for i := 0; i != size; i++ {
ps = append(ps, p2p.Peer{ ps = append(ps, makePeerForTest(i))
IP: makeTestPeerIP(i),
})
} }
return ps return ps
} }
func checkTestPeerDuplicity(ps []p2p.Peer) error { func makePeerForTest(i interface{}) p2p.Peer {
m := make(map[string]struct{}) return p2p.Peer{
for _, p := range ps { IP: makeTestPeerIP(i),
if _, ok := m[p.IP]; ok {
return fmt.Errorf("duplicate ip")
}
m[p.IP] = struct{}{}
} }
return nil
} }
func makeTestPeerIP(i interface{}) string { func makeTestPeerIP(i interface{}) string {
return fmt.Sprintf("%v", i) return fmt.Sprintf("%v", i)
} }
func assertTestError(got, expect error) error {
if (got == nil) && (expect == nil) {
return nil
}
if (got == nil) != (expect == nil) {
return fmt.Errorf("unexpected error: [%v] / [%v]", got, expect)
}
if !strings.Contains(got.Error(), expect.Error()) {
return fmt.Errorf("unexpected error: [%v] / [%v]", got, expect)
}
return nil
}

Loading…
Cancel
Save