diff --git a/api/service/legacysync/downloader/client.go b/api/service/legacysync/downloader/client.go index cb2ea3f4d..de08c5a98 100644 --- a/api/service/legacysync/downloader/client.go +++ b/api/service/legacysync/downloader/client.go @@ -16,6 +16,7 @@ type Client struct { dlClient pb.DownloaderClient opts []grpc.DialOption conn *grpc.ClientConn + addr string } // ClientSetup setups a Client given ip and port. @@ -28,8 +29,9 @@ func ClientSetup(ip, port string, withBlock bool) *Client { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + client.addr = fmt.Sprintf("%s:%s", ip, port) var err error - client.conn, err = grpc.DialContext(ctx, fmt.Sprintf(ip+":"+port), client.opts...) + client.conn, err = grpc.DialContext(ctx, client.addr, client.opts...) if err != nil { utils.Logger().Error().Err(err).Str("ip", ip).Msg("[SYNC] client.go:ClientSetup fail to dial") return nil @@ -71,11 +73,18 @@ func (client *Client) WaitForConnection(t time.Duration) bool { } // Close closes the Client. -func (client *Client) Close() { +func (client *Client) Close(reason string) { err := client.conn.Close() if err != nil { - utils.Logger().Info().Msg("[SYNC] unable to close connection") - } + utils.Logger().Info(). + Str("peerAddress", client.addr). + Msg("[SYNC] unable to close peer connection") + return + } + utils.Logger().Info(). + Str("peerAddress", client.addr). + Str("reason", reason). + Msg("[SYNC] peer connection closed") } // GetBlockHashes gets block hashes from all the peers by calling grpc request. diff --git a/api/service/legacysync/epoch_syncing.go b/api/service/legacysync/epoch_syncing.go index f2d222b29..ddc69a8a4 100644 --- a/api/service/legacysync/epoch_syncing.go +++ b/api/service/legacysync/epoch_syncing.go @@ -94,7 +94,7 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co for { if maxHeight == 0 || maxHeight == math.MaxUint64 { utils.Logger().Info(). - Msgf("[EPOCHSYNC] No peers to sync (isBeacon: %t, ShardID: %d, peerscount: %d)", + Msgf("[EPOCHSYNC] No peers to sync (isBeacon: %t, ShardID: %d, peersCount: %d)", isBeacon, bc.ShardID(), ss.syncConfig.PeersCount()) return 10 } @@ -109,7 +109,7 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co } if otherEpoch < curEpoch { for _, peerCfg := range ss.syncConfig.GetPeers() { - ss.syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: current height is higher that others, removve peers: %s", peerCfg.String())) + ss.syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: current height is higher that others, remove peers: %s", peerCfg.String())) } return 2 } @@ -201,8 +201,8 @@ func (ss *EpochSync) processWithPayload(payload [][]byte, bc core.BlockChain) er } // CreateSyncConfig creates SyncConfig for StateSync object. -func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32) error { +func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error { var err error - ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID) + ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect) return err } diff --git a/api/service/legacysync/helpers.go b/api/service/legacysync/helpers.go index a54b3cd18..656eab26f 100644 --- a/api/service/legacysync/helpers.go +++ b/api/service/legacysync/helpers.go @@ -3,6 +3,7 @@ package legacysync import ( "fmt" "sync" + "time" "github.com/ethereum/go-ethereum/common/math" "github.com/harmony-one/harmony/api/service/legacysync/downloader" @@ -50,18 +51,22 @@ func getMaxPeerHeight(syncConfig *SyncConfig) uint64 { return maxHeight } -func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32) (*SyncConfig, error) { +func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) (*SyncConfig, error) { // sanity check to ensure no duplicate peers if err := checkPeersDuplicity(peers); err != nil { return syncConfig, err } + // limit the number of dns peers to connect + randSeed := time.Now().UnixNano() + targetSize, peers := limitNumPeers(peers, randSeed) utils.Logger().Debug(). - Int("len", len(peers)). + Int("peers count", len(peers)). + Int("target size", targetSize). Uint32("shardID", shardID). Msg("[SYNC] CreateSyncConfig: len of peers") - if len(peers) == 0 { + if targetSize == 0 { return syncConfig, errors.New("[SYNC] no peers to connect to") } if syncConfig != nil { @@ -69,14 +74,32 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32) } syncConfig = NewSyncConfig(shardID, nil) - var wg sync.WaitGroup - for _, peer := range peers { - wg.Add(1) - go func(peer p2p.Peer) { - defer wg.Done() - client := downloader.ClientSetup(peer.IP, peer.Port, false) - if client == nil { - return + if !waitForEachPeerToConnect { + var wg sync.WaitGroup + ps := peers[:targetSize] + for _, peer := range ps { + wg.Add(1) + go func(peer p2p.Peer) { + defer wg.Done() + client := downloader.ClientSetup(peer.IP, peer.Port, false) + if client == nil { + return + } + peerConfig := &SyncPeerConfig{ + ip: peer.IP, + port: peer.Port, + client: client, + } + syncConfig.AddPeer(peerConfig) + }(peer) + } + wg.Wait() + } else { + var connectedPeers int + for _, peer := range peers { + client := downloader.ClientSetup(peer.IP, peer.Port, true) + if client == nil || !client.IsReady() { + continue } peerConfig := &SyncPeerConfig{ ip: peer.IP, @@ -84,9 +107,13 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32) client: client, } syncConfig.AddPeer(peerConfig) - }(peer) + connectedPeers++ + if connectedPeers >= targetSize { + break + } + } } - wg.Wait() + utils.Logger().Info(). Int("len", len(syncConfig.peers)). Uint32("shardID", shardID). diff --git a/api/service/legacysync/syncing.go b/api/service/legacysync/syncing.go index 025ea5902..7fe2c894f 100644 --- a/api/service/legacysync/syncing.go +++ b/api/service/legacysync/syncing.go @@ -174,7 +174,8 @@ func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) { sc.mtx.Lock() defer sc.mtx.Unlock() - peer.client.Close() + closeReason := fmt.Sprintf("remove peer (reason: %s)", reason) + peer.client.Close(closeReason) for i, p := range sc.peers { if p == peer { sc.peers = append(sc.peers[:i], sc.peers[i+1:]...) @@ -288,7 +289,7 @@ func (sc *SyncConfig) CloseConnections() { sc.mtx.RLock() defer sc.mtx.RUnlock() for _, pc := range sc.peers { - pc.client.Close() + pc.client.Close("close all connections") } } @@ -363,9 +364,9 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) { } // CreateSyncConfig creates SyncConfig for StateSync object. -func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32) error { +func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error { var err error - ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID) + ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect) return err } @@ -387,16 +388,16 @@ func checkPeersDuplicity(ps []p2p.Peer) error { } // limitNumPeers limits number of peers to release some server end sources. -func limitNumPeers(ps []p2p.Peer, randSeed int64) []p2p.Peer { +func limitNumPeers(ps []p2p.Peer, randSeed int64) (int, []p2p.Peer) { targetSize := calcNumPeersWithBound(len(ps), NumPeersLowBound, numPeersHighBound) if len(ps) <= targetSize { - return ps + return len(ps), ps } r := rand.New(rand.NewSource(randSeed)) r.Shuffle(len(ps), func(i, j int) { ps[i], ps[j] = ps[j], ps[i] }) - return ps[:targetSize] + return targetSize, ps } // Peers are expected to limited at half of the size, capped between lowBound and highBound. @@ -462,19 +463,20 @@ func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][] func (sc *SyncConfig) cleanUpPeers(maxFirstID int) { fixedPeer := sc.peers[maxFirstID] - utils.Logger().Info().Int("peers", len(sc.peers)).Msg("[SYNC] before cleanUpPeers") + var removedPeers int for i := 0; i < len(sc.peers); i++ { if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 { // TODO: move it into a util delete func. // See tip https://github.com/golang/go/wiki/SliceTricks // Close the client and remove the peer out of the - sc.peers[i].client.Close() + sc.peers[i].client.Close("cleanup peers") copy(sc.peers[i:], sc.peers[i+1:]) sc.peers[len(sc.peers)-1] = nil sc.peers = sc.peers[:len(sc.peers)-1] + removedPeers++ } } - utils.Logger().Info().Int("peers", len(sc.peers)).Msg("[SYNC] post cleanUpPeers") + utils.Logger().Info().Int("removed peers", removedPeers).Msg("[SYNC] post cleanUpPeers") } // GetBlockHashesConsensusAndCleanUp selects the most common peer config based on their block hashes to download/sync. diff --git a/api/service/legacysync/syncing_test.go b/api/service/legacysync/syncing_test.go index 954c757d1..ed6f84503 100644 --- a/api/service/legacysync/syncing_test.go +++ b/api/service/legacysync/syncing_test.go @@ -167,7 +167,8 @@ func TestLimitPeersWithBound(t *testing.T) { for _, test := range tests { ps := makePeersForTest(test.size) - res := limitNumPeers(ps, 1) + sz, res := limitNumPeers(ps, 1) + res = res[:sz] if len(res) != test.expSize { t.Errorf("result size unexpected: %v / %v", len(res), test.expSize) @@ -183,8 +184,10 @@ func TestLimitPeersWithBound_random(t *testing.T) { ps2 := makePeersForTest(100) s1, s2 := int64(1), int64(2) - res1 := limitNumPeers(ps1, s1) - res2 := limitNumPeers(ps2, s2) + sz1, res1 := limitNumPeers(ps1, s1) + res1 = res1[:sz1] + sz2, res2 := limitNumPeers(ps2, s2) + res2 = res2[:sz2] if reflect.DeepEqual(res1, res2) { t.Fatal("not randomized limit peer") } diff --git a/api/service/stagedsync/stagedsync.go b/api/service/stagedsync/stagedsync.go index 67369c0fa..88df1a671 100644 --- a/api/service/stagedsync/stagedsync.go +++ b/api/service/stagedsync/stagedsync.go @@ -663,7 +663,7 @@ func (ss *StagedSync) AddNewBlock(peerHash []byte, block *types.Block) { } // CreateSyncConfig creates SyncConfig for StateSync object. -func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32) error { +func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error { // sanity check to ensure no duplicate peers if err := checkPeersDuplicity(peers); err != nil { return err diff --git a/api/service/stagedsync/sync_config.go b/api/service/stagedsync/sync_config.go index 55ea3a3ac..f42737cc1 100644 --- a/api/service/stagedsync/sync_config.go +++ b/api/service/stagedsync/sync_config.go @@ -226,7 +226,7 @@ func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) { sc.mtx.Lock() defer sc.mtx.Unlock() - peer.client.Close() + peer.client.Close(reason) for i, p := range sc.peers { if p == peer { sc.peers = append(sc.peers[:i], sc.peers[i+1:]...) @@ -245,7 +245,7 @@ func (sc *SyncConfig) ReplacePeerWithReserved(peer *SyncPeerConfig, reason strin sc.mtx.Lock() defer sc.mtx.Unlock() - peer.client.Close() + peer.client.Close(reason) for i, p := range sc.peers { if p == peer { if len(sc.reservedPeers) > 0 { @@ -277,7 +277,7 @@ func (sc *SyncConfig) CloseConnections() { sc.mtx.RLock() defer sc.mtx.RUnlock() for _, pc := range sc.peers { - pc.client.Close() + pc.client.Close("close all connections") } } @@ -334,7 +334,7 @@ func (sc *SyncConfig) cleanUpPeers(maxFirstID int) { // TODO: move it into a util delete func. // See tip https://github.com/golang/go/wiki/SliceTricks // Close the client and remove the peer out of the - sc.peers[i].client.Close() + sc.peers[i].client.Close("close by cleanup function, because blockHashes is not equal to consensus block hashes") copy(sc.peers[i:], sc.peers[i+1:]) sc.peers[len(sc.peers)-1] = nil sc.peers = sc.peers[:len(sc.peers)-1] @@ -347,7 +347,7 @@ func (sc *SyncConfig) cleanUpPeers(maxFirstID int) { } } -// cleanUpInvalidPeers cleans up all peers whose missed any required block hash or sent any invalid block hash +// cleanUpInvalidPeers cleans up all peers whose missed a few required block hash or sent an invalid block hash // Caller shall ensure mtx is locked for RW. func (sc *SyncConfig) cleanUpInvalidPeers(ipm map[string]bool) { sc.mtx.Lock() @@ -355,7 +355,7 @@ func (sc *SyncConfig) cleanUpInvalidPeers(ipm map[string]bool) { countBeforeCleanUp := len(sc.peers) for i := 0; i < len(sc.peers); i++ { if ipm[string(sc.peers[i].peerHash)] == true { - sc.peers[i].client.Close() + sc.peers[i].client.Close("cleanup invalid peers, it may missed a few required block hashes or sent an invalid block hash") copy(sc.peers[i:], sc.peers[i+1:]) sc.peers[len(sc.peers)-1] = nil sc.peers = sc.peers[:len(sc.peers)-1] diff --git a/cmd/harmony/config_migrations.go b/cmd/harmony/config_migrations.go index c5d86bdd0..70bc8fa71 100644 --- a/cmd/harmony/config_migrations.go +++ b/cmd/harmony/config_migrations.go @@ -308,6 +308,14 @@ func init() { return confTree } + migrations["2.5.9"] = func(confTree *toml.Tree) *toml.Tree { + if confTree.Get("P2P.WaitForEachPeerToConnect") == nil { + confTree.Set("P2P.WaitForEachPeerToConnect", defaultConfig.P2P.WaitForEachPeerToConnect) + } + confTree.Set("Version", "2.5.10") + return confTree + } + // check that the latest version here is the same as in default.go largestKey := getNextVersion(migrations) if largestKey != tomlConfigVersion { diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 84295eb87..7de12af9c 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -5,7 +5,7 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" ) -const tomlConfigVersion = "2.5.9" +const tomlConfigVersion = "2.5.10" const ( defNetworkType = nodeconfig.Mainnet @@ -25,13 +25,14 @@ var defaultConfig = harmonyconfig.HarmonyConfig{ }, Network: getDefaultNetworkConfig(defNetworkType), P2P: harmonyconfig.P2pConfig{ - Port: nodeconfig.DefaultP2PPort, - IP: nodeconfig.DefaultPublicListenIP, - KeyFile: "./.hmykey", - DiscConcurrency: nodeconfig.DefaultP2PConcurrency, - MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, - DisablePrivateIPScan: false, - MaxPeers: nodeconfig.DefaultMaxPeers, + Port: nodeconfig.DefaultP2PPort, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./.hmykey", + DiscConcurrency: nodeconfig.DefaultP2PConcurrency, + MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, + DisablePrivateIPScan: false, + MaxPeers: nodeconfig.DefaultMaxPeers, + WaitForEachPeerToConnect: nodeconfig.DefaultWaitForEachPeerToConnect, }, HTTP: harmonyconfig.HttpConfig{ Enabled: true, diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index d2c458149..26a250d95 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -579,6 +579,11 @@ var ( Usage: "maximum number of peers allowed, 0 means no limit", DefValue: defaultConfig.P2P.MaxConnsPerIP, } + waitForEachPeerToConnectFlag = cli.BoolFlag{ + Name: "p2p.wait-for-connections", + Usage: "node waits for each single peer to connect and it doesn't add them to peers list after timeout", + DefValue: defaultConfig.P2P.WaitForEachPeerToConnect, + } ) func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) { @@ -615,6 +620,10 @@ func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) { config.P2P.MaxPeers = int64(cli.GetIntFlagValue(cmd, maxPeersFlag)) } + if cli.IsFlagChanged(cmd, waitForEachPeerToConnectFlag) { + config.P2P.WaitForEachPeerToConnect = cli.GetBoolFlagValue(cmd, waitForEachPeerToConnectFlag) + } + if cli.IsFlagChanged(cmd, p2pDisablePrivateIPScanFlag) { config.P2P.DisablePrivateIPScan = cli.GetBoolFlagValue(cmd, p2pDisablePrivateIPScanFlag) } diff --git a/cmd/harmony/flags_test.go b/cmd/harmony/flags_test.go index d47a1712f..7bd3e7199 100644 --- a/cmd/harmony/flags_test.go +++ b/cmd/harmony/flags_test.go @@ -58,13 +58,14 @@ func TestHarmonyFlags(t *testing.T) { ServerPort: nodeconfig.DefaultDNSPort, }, P2P: harmonyconfig.P2pConfig{ - Port: 9000, - IP: defaultConfig.P2P.IP, - KeyFile: defaultConfig.P2P.KeyFile, - DiscConcurrency: 5, - MaxConnsPerIP: 5, - DisablePrivateIPScan: false, - MaxPeers: defaultConfig.P2P.MaxPeers, + Port: 9000, + IP: defaultConfig.P2P.IP, + KeyFile: defaultConfig.P2P.KeyFile, + DiscConcurrency: 5, + MaxConnsPerIP: 5, + DisablePrivateIPScan: false, + MaxPeers: defaultConfig.P2P.MaxPeers, + WaitForEachPeerToConnect: false, }, HTTP: harmonyconfig.HttpConfig{ Enabled: true, @@ -366,60 +367,65 @@ func TestP2PFlags(t *testing.T) { args: []string{"--p2p.port", "9001", "--p2p.keyfile", "./key.file", "--p2p.dht.datastore", defDataStore}, expConfig: harmonyconfig.P2pConfig{ - Port: 9001, - IP: nodeconfig.DefaultPublicListenIP, - KeyFile: "./key.file", - DHTDataStore: &defDataStore, - MaxConnsPerIP: 10, - DisablePrivateIPScan: false, - MaxPeers: defaultConfig.P2P.MaxPeers, + Port: 9001, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./key.file", + DHTDataStore: &defDataStore, + MaxConnsPerIP: 10, + DisablePrivateIPScan: false, + MaxPeers: defaultConfig.P2P.MaxPeers, + WaitForEachPeerToConnect: false, }, }, { args: []string{"--port", "9001", "--key", "./key.file"}, expConfig: harmonyconfig.P2pConfig{ - Port: 9001, - IP: nodeconfig.DefaultPublicListenIP, - KeyFile: "./key.file", - MaxConnsPerIP: 10, - DisablePrivateIPScan: false, - MaxPeers: defaultConfig.P2P.MaxPeers, + Port: 9001, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./key.file", + MaxConnsPerIP: 10, + DisablePrivateIPScan: false, + MaxPeers: defaultConfig.P2P.MaxPeers, + WaitForEachPeerToConnect: false, }, }, { args: []string{"--p2p.port", "9001", "--p2p.disc.concurrency", "5", "--p2p.security.max-conn-per-ip", "5"}, expConfig: harmonyconfig.P2pConfig{ - Port: 9001, - IP: nodeconfig.DefaultPublicListenIP, - KeyFile: "./.hmykey", - DiscConcurrency: 5, - MaxConnsPerIP: 5, - DisablePrivateIPScan: false, - MaxPeers: defaultConfig.P2P.MaxPeers, + Port: 9001, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./.hmykey", + DiscConcurrency: 5, + MaxConnsPerIP: 5, + DisablePrivateIPScan: false, + MaxPeers: defaultConfig.P2P.MaxPeers, + WaitForEachPeerToConnect: false, }, }, { args: []string{"--p2p.no-private-ip-scan"}, expConfig: harmonyconfig.P2pConfig{ - Port: nodeconfig.DefaultP2PPort, - IP: nodeconfig.DefaultPublicListenIP, - KeyFile: "./.hmykey", - DiscConcurrency: nodeconfig.DefaultP2PConcurrency, - MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, - DisablePrivateIPScan: true, - MaxPeers: defaultConfig.P2P.MaxPeers, + Port: nodeconfig.DefaultP2PPort, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./.hmykey", + DiscConcurrency: nodeconfig.DefaultP2PConcurrency, + MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, + DisablePrivateIPScan: true, + MaxPeers: defaultConfig.P2P.MaxPeers, + WaitForEachPeerToConnect: false, }, }, { args: []string{"--p2p.security.max-peers", "100"}, expConfig: harmonyconfig.P2pConfig{ - Port: nodeconfig.DefaultP2PPort, - IP: nodeconfig.DefaultPublicListenIP, - KeyFile: "./.hmykey", - DiscConcurrency: nodeconfig.DefaultP2PConcurrency, - MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, - DisablePrivateIPScan: defaultConfig.P2P.DisablePrivateIPScan, - MaxPeers: 100, + Port: nodeconfig.DefaultP2PPort, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./.hmykey", + DiscConcurrency: nodeconfig.DefaultP2PConcurrency, + MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, + DisablePrivateIPScan: defaultConfig.P2P.DisablePrivateIPScan, + MaxPeers: 100, + WaitForEachPeerToConnect: false, }, }, } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index bf5ebdcdf..276c09a2d 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -624,14 +624,15 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, ConsensusPubKey: nodeConfig.ConsensusPriKey[0].Pub.Object, } myHost, err = p2p.NewHost(p2p.HostConfig{ - Self: &selfPeer, - BLSKey: nodeConfig.P2PPriKey, - BootNodes: hc.Network.BootNodes, - DataStoreFile: hc.P2P.DHTDataStore, - DiscConcurrency: hc.P2P.DiscConcurrency, - MaxConnPerIP: hc.P2P.MaxConnsPerIP, - DisablePrivateIPScan: hc.P2P.DisablePrivateIPScan, - MaxPeers: hc.P2P.MaxPeers, + Self: &selfPeer, + BLSKey: nodeConfig.P2PPriKey, + BootNodes: hc.Network.BootNodes, + DataStoreFile: hc.P2P.DHTDataStore, + DiscConcurrency: hc.P2P.DiscConcurrency, + MaxConnPerIP: hc.P2P.MaxConnsPerIP, + DisablePrivateIPScan: hc.P2P.DisablePrivateIPScan, + MaxPeers: hc.P2P.MaxPeers, + WaitForEachPeerToConnect: hc.P2P.WaitForEachPeerToConnect, }) if err != nil { return nil, errors.Wrap(err, "cannot create P2P network host") diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go index 35a32825e..90569b96f 100644 --- a/internal/configs/harmony/harmony.go +++ b/internal/configs/harmony/harmony.go @@ -46,14 +46,15 @@ type NetworkConfig struct { } type P2pConfig struct { - Port int - IP string - KeyFile string - DHTDataStore *string `toml:",omitempty"` - DiscConcurrency int // Discovery Concurrency value - MaxConnsPerIP int - DisablePrivateIPScan bool - MaxPeers int64 + Port int + IP string + KeyFile string + DHTDataStore *string `toml:",omitempty"` + DiscConcurrency int // Discovery Concurrency value + MaxConnsPerIP int + DisablePrivateIPScan bool + MaxPeers int64 + WaitForEachPeerToConnect bool } type GeneralConfig struct { diff --git a/internal/configs/node/network.go b/internal/configs/node/network.go index 0749d43e7..332b5cce7 100644 --- a/internal/configs/node/network.go +++ b/internal/configs/node/network.go @@ -63,6 +63,8 @@ const ( DefaultMaxConnPerIP = 10 // DefaultMaxPeers is the maximum number of remote peers, with 0 representing no limit DefaultMaxPeers = 0 + // DefaultWaitForEachPeerToConnect sets the sync configs to connect to neighbor peers one by one and waits for each peer to connect + DefaultWaitForEachPeerToConnect = false ) const ( diff --git a/node/node.go b/node/node.go index c50e74e1f..d09a038df 100644 --- a/node/node.go +++ b/node/node.go @@ -86,7 +86,7 @@ type ISync interface { UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error AddLastMileBlock(block *types.Block) GetActivePeerNumber() int - CreateSyncConfig(peers []p2p.Peer, shardID uint32) error + CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) IsSynchronized() bool IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool) diff --git a/node/node_syncing.go b/node/node_syncing.go index 1b4a1e17c..ca3ca5bfd 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -253,7 +253,7 @@ func (node *Node) doBeaconSyncing() { continue } - if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID); err != nil { + if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil { utils.Logger().Warn().Err(err).Msg("[EPOCHSYNC] cannot create beacon sync config") continue } @@ -296,7 +296,7 @@ func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinCons Msg("cannot retrieve syncing peers") return } - if err := syncInstance.CreateSyncConfig(peers, shardID); err != nil { + if err := node.stateSync.CreateSyncConfig(peers, shardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil { utils.Logger().Warn(). Err(err). Interface("peers", peers). @@ -416,7 +416,7 @@ func (node *Node) SendNewBlockToUnsync() { elapseTime := time.Now().UnixNano() - config.timestamp if elapseTime > broadcastTimeout { utils.Logger().Warn().Str("peerID", peerID).Msg("[SYNC] SendNewBlockToUnsync to peer timeout") - node.peerRegistrationRecord[peerID].client.Close() + node.peerRegistrationRecord[peerID].client.Close("send new block to peer timeout") delete(node.peerRegistrationRecord, peerID) continue } @@ -425,13 +425,13 @@ func (node *Node) SendNewBlockToUnsync() { sendBytes = blockWithSigBytes } response, err := config.client.PushNewBlock(node.GetSyncID(), sendBytes, false) - // close the connection if cannot push new block to unsync node + // close the connection if cannot push new block to not synchronized node if err != nil { - node.peerRegistrationRecord[peerID].client.Close() + node.peerRegistrationRecord[peerID].client.Close("cannot push new block to not synchronized node") delete(node.peerRegistrationRecord, peerID) } if response != nil && response.Type == downloader_pb.DownloaderResponse_INSYNC { - node.peerRegistrationRecord[peerID].client.Close() + node.peerRegistrationRecord[peerID].client.Close("node is synchronized") delete(node.peerRegistrationRecord, peerID) } } diff --git a/p2p/host.go b/p2p/host.go index 66026f95c..a2326c812 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -80,14 +80,15 @@ const ( // HostConfig is the config structure to create a new host type HostConfig struct { - Self *Peer - BLSKey libp2p_crypto.PrivKey - BootNodes []string - DataStoreFile *string - DiscConcurrency int - MaxConnPerIP int - DisablePrivateIPScan bool - MaxPeers int64 + Self *Peer + BLSKey libp2p_crypto.PrivKey + BootNodes []string + DataStoreFile *string + DiscConcurrency int + MaxConnPerIP int + DisablePrivateIPScan bool + MaxPeers int64 + WaitForEachPeerToConnect bool } func init() {