add description for closing client and change randomize process to make sure only online nodes are added to sync config (#4276)

* add a description (close reason) when closing the client, and change the randomization process to make sure only online nodes are added to the sync config

* fix sync test

* fix legacy limitNumPeers test

* add WaitForEachPeerToConnect to node configs to make parallel peer connection optional

Co-authored-by: GheisMohammadi <Gheis.Mohammadi@gmail.com>
Gheis committed 2 years ago via GitHub
parent 54742e73e1
commit 0104b1d623
Changed files (changed-line counts in parentheses):

 1. api/service/legacysync/downloader/client.go (17)
 2. api/service/legacysync/epoch_syncing.go (8)
 3. api/service/legacysync/helpers.go (35)
 4. api/service/legacysync/syncing.go (22)
 5. api/service/legacysync/syncing_test.go (9)
 6. api/service/stagedsync/stagedsync.go (2)
 7. api/service/stagedsync/sync_config.go (12)
 8. cmd/harmony/config_migrations.go (8)
 9. cmd/harmony/default.go (3)
10. cmd/harmony/flags.go (9)
11. cmd/harmony/flags_test.go (6)
12. cmd/harmony/main.go (1)
13. internal/configs/harmony/harmony.go (1)
14. internal/configs/node/network.go (2)
15. node/node.go (2)
16. node/node_syncing.go (12)
17. p2p/host.go (1)
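
Before the per-file diffs, a minimal sketch of how the new option threads through the stack, assuming the field and method names this commit introduces (error handling and surrounding wiring elided):

	// hc is the parsed harmonyconfig.HarmonyConfig; peers come from DNS sync discovery.
	waitForEach := hc.P2P.WaitForEachPeerToConnect // set via harmony.conf or --p2p.wait-for-connections
	if err := stateSync.CreateSyncConfig(peers, shardID, waitForEach); err != nil {
		// the node's sync loop logs a warning and retries, as node_syncing.go shows below
	}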

api/service/legacysync/downloader/client.go:

@@ -16,6 +16,7 @@ type Client struct {
 	dlClient pb.DownloaderClient
 	opts     []grpc.DialOption
 	conn     *grpc.ClientConn
+	addr     string
 }

 // ClientSetup setups a Client given ip and port.
@@ -28,8 +29,9 @@ func ClientSetup(ip, port string, withBlock bool) *Client {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()

+	client.addr = fmt.Sprintf("%s:%s", ip, port)
 	var err error
-	client.conn, err = grpc.DialContext(ctx, fmt.Sprintf(ip+":"+port), client.opts...)
+	client.conn, err = grpc.DialContext(ctx, client.addr, client.opts...)
 	if err != nil {
 		utils.Logger().Error().Err(err).Str("ip", ip).Msg("[SYNC] client.go:ClientSetup fail to dial")
 		return nil
@@ -71,11 +73,18 @@ func (client *Client) WaitForConnection(t time.Duration) bool {
 }

 // Close closes the Client.
-func (client *Client) Close() {
+func (client *Client) Close(reason string) {
 	err := client.conn.Close()
 	if err != nil {
-		utils.Logger().Info().Msg("[SYNC] unable to close connection")
+		utils.Logger().Info().
+			Str("peerAddress", client.addr).
+			Msg("[SYNC] unable to close peer connection")
+		return
 	}
+	utils.Logger().Info().
+		Str("peerAddress", client.addr).
+		Str("reason", reason).
+		Msg("[SYNC] peer connection closed")
 }

 // GetBlockHashes gets block hashes from all the peers by calling grpc request.
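
Close now takes a human-readable reason that lands in the close log next to the stored peer address. A hedged usage sketch (the address is a placeholder):

	// ClientSetup and Close(reason) as changed above.
	client := downloader.ClientSetup("127.0.0.1", "6000", true) // placeholder ip/port
	if client != nil && client.IsReady() {
		defer client.Close("example shutdown") // emitted as reason=... together with peerAddress
	}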

api/service/legacysync/epoch_syncing.go:

@@ -94,7 +94,7 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co
 	for {
 		if maxHeight == 0 || maxHeight == math.MaxUint64 {
 			utils.Logger().Info().
-				Msgf("[EPOCHSYNC] No peers to sync (isBeacon: %t, ShardID: %d, peerscount: %d)",
+				Msgf("[EPOCHSYNC] No peers to sync (isBeacon: %t, ShardID: %d, peersCount: %d)",
 					isBeacon, bc.ShardID(), ss.syncConfig.PeersCount())
 			return 10
 		}
@@ -109,7 +109,7 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co
 	}
 	if otherEpoch < curEpoch {
 		for _, peerCfg := range ss.syncConfig.GetPeers() {
-			ss.syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: current height is higher that others, removve peers: %s", peerCfg.String()))
+			ss.syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: current height is higher than others, remove peers: %s", peerCfg.String()))
 		}
 		return 2
 	}
@@ -201,8 +201,8 @@ func (ss *EpochSync) processWithPayload(payload [][]byte, bc core.BlockChain) er
 }

 // CreateSyncConfig creates SyncConfig for StateSync object.
-func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32) error {
+func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error {
 	var err error
-	ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID)
+	ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect)
 	return err
 }

api/service/legacysync/helpers.go:

@@ -3,6 +3,7 @@ package legacysync
 import (
 	"fmt"
 	"sync"
+	"time"

 	"github.com/ethereum/go-ethereum/common/math"
 	"github.com/harmony-one/harmony/api/service/legacysync/downloader"
@@ -50,18 +51,22 @@ func getMaxPeerHeight(syncConfig *SyncConfig) uint64 {
 	return maxHeight
 }

-func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32) (*SyncConfig, error) {
+func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) (*SyncConfig, error) {
 	// sanity check to ensure no duplicate peers
 	if err := checkPeersDuplicity(peers); err != nil {
 		return syncConfig, err
 	}
+	// limit the number of dns peers to connect
+	randSeed := time.Now().UnixNano()
+	targetSize, peers := limitNumPeers(peers, randSeed)
 	utils.Logger().Debug().
-		Int("len", len(peers)).
+		Int("peers count", len(peers)).
+		Int("target size", targetSize).
 		Uint32("shardID", shardID).
 		Msg("[SYNC] CreateSyncConfig: len of peers")
-	if len(peers) == 0 {
+	if targetSize == 0 {
 		return syncConfig, errors.New("[SYNC] no peers to connect to")
 	}
 	if syncConfig != nil {
@@ -69,8 +74,10 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32)
 	}
 	syncConfig = NewSyncConfig(shardID, nil)

+	if !waitForEachPeerToConnect {
 		var wg sync.WaitGroup
-	for _, peer := range peers {
+		ps := peers[:targetSize]
+		for _, peer := range ps {
 			wg.Add(1)
 			go func(peer p2p.Peer) {
 				defer wg.Done()
@@ -87,6 +94,26 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32)
 			}(peer)
 		}
 		wg.Wait()
+	} else {
+		var connectedPeers int
+		for _, peer := range peers {
+			client := downloader.ClientSetup(peer.IP, peer.Port, true)
+			if client == nil || !client.IsReady() {
+				continue
+			}
+			peerConfig := &SyncPeerConfig{
+				ip:     peer.IP,
+				port:   peer.Port,
+				client: client,
+			}
+			syncConfig.AddPeer(peerConfig)
+			connectedPeers++
+			if connectedPeers >= targetSize {
+				break
+			}
+		}
+	}

 	utils.Logger().Info().
 		Int("len", len(syncConfig.peers)).
 		Uint32("shardID", shardID).

api/service/legacysync/syncing.go:

@@ -174,7 +174,8 @@ func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) {
 	sc.mtx.Lock()
 	defer sc.mtx.Unlock()

-	peer.client.Close()
+	closeReason := fmt.Sprintf("remove peer (reason: %s)", reason)
+	peer.client.Close(closeReason)
 	for i, p := range sc.peers {
 		if p == peer {
 			sc.peers = append(sc.peers[:i], sc.peers[i+1:]...)
@@ -288,7 +289,7 @@ func (sc *SyncConfig) CloseConnections() {
 	sc.mtx.RLock()
 	defer sc.mtx.RUnlock()
 	for _, pc := range sc.peers {
-		pc.client.Close()
+		pc.client.Close("close all connections")
 	}
 }
@@ -363,9 +364,9 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
 }

 // CreateSyncConfig creates SyncConfig for StateSync object.
-func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32) error {
+func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error {
 	var err error
-	ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID)
+	ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect)
 	return err
 }
@@ -387,16 +388,16 @@ func checkPeersDuplicity(ps []p2p.Peer) error {
 }

 // limitNumPeers limits number of peers to release some server end sources.
-func limitNumPeers(ps []p2p.Peer, randSeed int64) []p2p.Peer {
+func limitNumPeers(ps []p2p.Peer, randSeed int64) (int, []p2p.Peer) {
 	targetSize := calcNumPeersWithBound(len(ps), NumPeersLowBound, numPeersHighBound)
 	if len(ps) <= targetSize {
-		return ps
+		return len(ps), ps
 	}

 	r := rand.New(rand.NewSource(randSeed))
 	r.Shuffle(len(ps), func(i, j int) { ps[i], ps[j] = ps[j], ps[i] })
-	return ps[:targetSize]
+	return targetSize, ps
 }

 // Peers are expected to limited at half of the size, capped between lowBound and highBound.
@@ -462,19 +463,20 @@ func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]
 func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
 	fixedPeer := sc.peers[maxFirstID]
-	utils.Logger().Info().Int("peers", len(sc.peers)).Msg("[SYNC] before cleanUpPeers")
+	var removedPeers int
 	for i := 0; i < len(sc.peers); i++ {
 		if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 {
 			// TODO: move it into a util delete func.
 			// See tip https://github.com/golang/go/wiki/SliceTricks
 			// Close the client and remove the peer out of the
-			sc.peers[i].client.Close()
+			sc.peers[i].client.Close("cleanup peers")
 			copy(sc.peers[i:], sc.peers[i+1:])
 			sc.peers[len(sc.peers)-1] = nil
 			sc.peers = sc.peers[:len(sc.peers)-1]
+			removedPeers++
 		}
 	}
-	utils.Logger().Info().Int("peers", len(sc.peers)).Msg("[SYNC] post cleanUpPeers")
+	utils.Logger().Info().Int("removed peers", removedPeers).Msg("[SYNC] post cleanUpPeers")
 }

 // GetBlockHashesConsensusAndCleanUp selects the most common peer config based on their block hashes to download/sync.
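
Note the changed contract of limitNumPeers: it used to return the slice already truncated to targetSize; it now returns the target plus the full shuffled slice, so the sequential path in helpers.go can keep dialing past the target when early candidates are offline. Callers that want the old semantics slice explicitly, as the updated tests below do:

	// Sketch: recovering the pre-change behavior from the new two-value API.
	sz, shuffled := limitNumPeers(peers, time.Now().UnixNano())
	chosen := shuffled[:sz] // exactly what the old single-value version returned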

api/service/legacysync/syncing_test.go:

@@ -167,7 +167,8 @@ func TestLimitPeersWithBound(t *testing.T) {
 	for _, test := range tests {
 		ps := makePeersForTest(test.size)
-		res := limitNumPeers(ps, 1)
+		sz, res := limitNumPeers(ps, 1)
+		res = res[:sz]

 		if len(res) != test.expSize {
 			t.Errorf("result size unexpected: %v / %v", len(res), test.expSize)
@@ -183,8 +184,10 @@ func TestLimitPeersWithBound_random(t *testing.T) {
 	ps2 := makePeersForTest(100)
 	s1, s2 := int64(1), int64(2)

-	res1 := limitNumPeers(ps1, s1)
-	res2 := limitNumPeers(ps2, s2)
+	sz1, res1 := limitNumPeers(ps1, s1)
+	res1 = res1[:sz1]
+	sz2, res2 := limitNumPeers(ps2, s2)
+	res2 = res2[:sz2]

 	if reflect.DeepEqual(res1, res2) {
 		t.Fatal("not randomized limit peer")
 	}

api/service/stagedsync/stagedsync.go:

@@ -663,7 +663,7 @@ func (ss *StagedSync) AddNewBlock(peerHash []byte, block *types.Block) {
 }

 // CreateSyncConfig creates SyncConfig for StateSync object.
-func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32) error {
+func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error {
 	// sanity check to ensure no duplicate peers
 	if err := checkPeersDuplicity(peers); err != nil {
 		return err

api/service/stagedsync/sync_config.go:

@@ -226,7 +226,7 @@ func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) {
 	sc.mtx.Lock()
 	defer sc.mtx.Unlock()

-	peer.client.Close()
+	peer.client.Close(reason)
 	for i, p := range sc.peers {
 		if p == peer {
 			sc.peers = append(sc.peers[:i], sc.peers[i+1:]...)
@@ -245,7 +245,7 @@ func (sc *SyncConfig) ReplacePeerWithReserved(peer *SyncPeerConfig, reason strin
 	sc.mtx.Lock()
 	defer sc.mtx.Unlock()

-	peer.client.Close()
+	peer.client.Close(reason)
 	for i, p := range sc.peers {
 		if p == peer {
 			if len(sc.reservedPeers) > 0 {
@@ -277,7 +277,7 @@ func (sc *SyncConfig) CloseConnections() {
 	sc.mtx.RLock()
 	defer sc.mtx.RUnlock()
 	for _, pc := range sc.peers {
-		pc.client.Close()
+		pc.client.Close("close all connections")
 	}
 }
@@ -334,7 +334,7 @@ func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
 			// TODO: move it into a util delete func.
 			// See tip https://github.com/golang/go/wiki/SliceTricks
 			// Close the client and remove the peer out of the
-			sc.peers[i].client.Close()
+			sc.peers[i].client.Close("close by cleanup function, because blockHashes is not equal to consensus block hashes")
 			copy(sc.peers[i:], sc.peers[i+1:])
 			sc.peers[len(sc.peers)-1] = nil
 			sc.peers = sc.peers[:len(sc.peers)-1]
@@ -347,7 +347,7 @@ func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
 	}
 }

-// cleanUpInvalidPeers cleans up all peers whose missed any required block hash or sent any invalid block hash
+// cleanUpInvalidPeers cleans up all peers that missed a few required block hashes or sent an invalid block hash
 // Caller shall ensure mtx is locked for RW.
 func (sc *SyncConfig) cleanUpInvalidPeers(ipm map[string]bool) {
 	sc.mtx.Lock()
@@ -355,7 +355,7 @@ func (sc *SyncConfig) cleanUpInvalidPeers(ipm map[string]bool) {
 	countBeforeCleanUp := len(sc.peers)
 	for i := 0; i < len(sc.peers); i++ {
 		if ipm[string(sc.peers[i].peerHash)] == true {
-			sc.peers[i].client.Close()
+			sc.peers[i].client.Close("cleanup invalid peers; it may have missed a few required block hashes or sent an invalid block hash")
 			copy(sc.peers[i:], sc.peers[i+1:])
 			sc.peers[len(sc.peers)-1] = nil
 			sc.peers = sc.peers[:len(sc.peers)-1]

cmd/harmony/config_migrations.go:

@@ -308,6 +308,14 @@ func init() {
 		return confTree
 	}

+	migrations["2.5.9"] = func(confTree *toml.Tree) *toml.Tree {
+		if confTree.Get("P2P.WaitForEachPeerToConnect") == nil {
+			confTree.Set("P2P.WaitForEachPeerToConnect", defaultConfig.P2P.WaitForEachPeerToConnect)
+		}
+		confTree.Set("Version", "2.5.10")
+		return confTree
+	}
+
 	// check that the latest version here is the same as in default.go
 	largestKey := getNextVersion(migrations)
 	if largestKey != tomlConfigVersion {
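
For an existing 2.5.9 config file the migration only adds the new key with its default and bumps the version. A hedged sketch of the effect using the same pelletier/go-toml API as the migration above (sample TOML made up, errors ignored for brevity):

	tree, _ := toml.Load("Version = \"2.5.9\"\n[P2P]\nPort = 9000\n")
	if tree.Get("P2P.WaitForEachPeerToConnect") == nil {
		tree.Set("P2P.WaitForEachPeerToConnect", false) // default keeps the parallel dialing behavior
	}
	tree.Set("Version", "2.5.10")
	out, _ := tree.ToTomlString() // now contains WaitForEachPeerToConnect = false under [P2P]
	_ = out                       // inspect, or write back to disk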

cmd/harmony/default.go:

@@ -5,7 +5,7 @@ import (
 	nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
 )

-const tomlConfigVersion = "2.5.9"
+const tomlConfigVersion = "2.5.10"

 const (
 	defNetworkType = nodeconfig.Mainnet
@@ -32,6 +32,7 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
 		MaxConnsPerIP:            nodeconfig.DefaultMaxConnPerIP,
 		DisablePrivateIPScan:     false,
 		MaxPeers:                 nodeconfig.DefaultMaxPeers,
+		WaitForEachPeerToConnect: nodeconfig.DefaultWaitForEachPeerToConnect,
 	},
 	HTTP: harmonyconfig.HttpConfig{
 		Enabled: true,

cmd/harmony/flags.go:

@@ -579,6 +579,11 @@ var (
 		Usage:    "maximum number of peers allowed, 0 means no limit",
 		DefValue: defaultConfig.P2P.MaxConnsPerIP,
 	}
+	waitForEachPeerToConnectFlag = cli.BoolFlag{
+		Name:     "p2p.wait-for-connections",
+		Usage:    "node waits for each single peer to connect, and doesn't add them to the peers list after a timeout",
+		DefValue: defaultConfig.P2P.WaitForEachPeerToConnect,
+	}
 )

 func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
@@ -615,6 +620,10 @@ func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
 		config.P2P.MaxPeers = int64(cli.GetIntFlagValue(cmd, maxPeersFlag))
 	}

+	if cli.IsFlagChanged(cmd, waitForEachPeerToConnectFlag) {
+		config.P2P.WaitForEachPeerToConnect = cli.GetBoolFlagValue(cmd, waitForEachPeerToConnectFlag)
+	}
+
 	if cli.IsFlagChanged(cmd, p2pDisablePrivateIPScanFlag) {
 		config.P2P.DisablePrivateIPScan = cli.GetBoolFlagValue(cmd, p2pDisablePrivateIPScanFlag)
 	}
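
A usage note: because applyP2PFlags only copies the value when IsFlagChanged reports the flag was explicitly set, running `harmony --p2p.wait-for-connections` overrides the TOML file, while omitting the flag leaves whatever the config file (or the 2.5.10 migration default of false) specifies.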

cmd/harmony/flags_test.go:

@@ -65,6 +65,7 @@ func TestHarmonyFlags(t *testing.T) {
 				MaxConnsPerIP:            5,
 				DisablePrivateIPScan:     false,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				WaitForEachPeerToConnect: false,
 			},
 			HTTP: harmonyconfig.HttpConfig{
 				Enabled: true,
@@ -373,6 +374,7 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            10,
 				DisablePrivateIPScan:     false,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				WaitForEachPeerToConnect: false,
 			},
 		},
 		{
@@ -384,6 +386,7 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            10,
 				DisablePrivateIPScan:     false,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				WaitForEachPeerToConnect: false,
 			},
 		},
 		{
@@ -396,6 +399,7 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            5,
 				DisablePrivateIPScan:     false,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				WaitForEachPeerToConnect: false,
 			},
 		},
 		{
@@ -408,6 +412,7 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            nodeconfig.DefaultMaxConnPerIP,
 				DisablePrivateIPScan:     true,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				WaitForEachPeerToConnect: false,
 			},
 		},
 		{
@@ -420,6 +425,7 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            nodeconfig.DefaultMaxConnPerIP,
 				DisablePrivateIPScan:     defaultConfig.P2P.DisablePrivateIPScan,
 				MaxPeers:                 100,
+				WaitForEachPeerToConnect: false,
 			},
 		},
 	}

cmd/harmony/main.go:

@@ -632,6 +632,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
 		MaxConnPerIP:             hc.P2P.MaxConnsPerIP,
 		DisablePrivateIPScan:     hc.P2P.DisablePrivateIPScan,
 		MaxPeers:                 hc.P2P.MaxPeers,
+		WaitForEachPeerToConnect: hc.P2P.WaitForEachPeerToConnect,
 	})
 	if err != nil {
 		return nil, errors.Wrap(err, "cannot create P2P network host")

internal/configs/harmony/harmony.go:

@@ -54,6 +54,7 @@ type P2pConfig struct {
 	MaxConnsPerIP            int
 	DisablePrivateIPScan     bool
 	MaxPeers                 int64
+	WaitForEachPeerToConnect bool
 }

 type GeneralConfig struct {

internal/configs/node/network.go:

@@ -63,6 +63,8 @@ const (
 	DefaultMaxConnPerIP = 10
 	// DefaultMaxPeers is the maximum number of remote peers, with 0 representing no limit
 	DefaultMaxPeers = 0
+	// DefaultWaitForEachPeerToConnect sets whether the sync config connects to neighbor peers one by one, waiting for each peer to connect
+	DefaultWaitForEachPeerToConnect = false
 )

 const (

node/node.go:

@@ -86,7 +86,7 @@ type ISync interface {
 	UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error
 	AddLastMileBlock(block *types.Block)
 	GetActivePeerNumber() int
-	CreateSyncConfig(peers []p2p.Peer, shardID uint32) error
+	CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error
 	SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration)
 	IsSynchronized() bool
 	IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool)

node/node_syncing.go:

@@ -253,7 +253,7 @@ func (node *Node) doBeaconSyncing() {
 			continue
 		}

-		if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID); err != nil {
+		if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
 			utils.Logger().Warn().Err(err).Msg("[EPOCHSYNC] cannot create beacon sync config")
 			continue
 		}
@@ -296,7 +296,7 @@ func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinCons
 			Msg("cannot retrieve syncing peers")
 		return
 	}
-	if err := syncInstance.CreateSyncConfig(peers, shardID); err != nil {
+	if err := node.stateSync.CreateSyncConfig(peers, shardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
 		utils.Logger().Warn().
 			Err(err).
 			Interface("peers", peers).
@@ -416,7 +416,7 @@ func (node *Node) SendNewBlockToUnsync() {
 			elapseTime := time.Now().UnixNano() - config.timestamp
 			if elapseTime > broadcastTimeout {
 				utils.Logger().Warn().Str("peerID", peerID).Msg("[SYNC] SendNewBlockToUnsync to peer timeout")
-				node.peerRegistrationRecord[peerID].client.Close()
+				node.peerRegistrationRecord[peerID].client.Close("send new block to peer timeout")
 				delete(node.peerRegistrationRecord, peerID)
 				continue
 			}
@@ -425,13 +425,13 @@ func (node *Node) SendNewBlockToUnsync() {
 			sendBytes = blockWithSigBytes
 		}
 		response, err := config.client.PushNewBlock(node.GetSyncID(), sendBytes, false)
-		// close the connection if cannot push new block to unsync node
+		// close the connection if we cannot push the new block to a not-yet-synchronized node
 		if err != nil {
-			node.peerRegistrationRecord[peerID].client.Close()
+			node.peerRegistrationRecord[peerID].client.Close("cannot push new block to not synchronized node")
 			delete(node.peerRegistrationRecord, peerID)
 		}
 		if response != nil && response.Type == downloader_pb.DownloaderResponse_INSYNC {
-			node.peerRegistrationRecord[peerID].client.Close()
+			node.peerRegistrationRecord[peerID].client.Close("node is synchronized")
 			delete(node.peerRegistrationRecord, peerID)
 		}
 	}
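
With reasons threaded through every call site, the close log becomes actionable during sync debugging. For the timeout branch above, the node would now emit something like the following (illustrative output only; harmony's utils.Logger() is zerolog-based, and the exact fields depend on logger configuration):

	{"level":"info","peerAddress":"203.0.113.7:6000","reason":"send new block to peer timeout","message":"[SYNC] peer connection closed"}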

p2p/host.go:

@@ -88,6 +88,7 @@ type HostConfig struct {
 	MaxConnPerIP             int
 	DisablePrivateIPScan     bool
 	MaxPeers                 int64
+	WaitForEachPeerToConnect bool
 }

 func init() {
