From 9084066a50575397d6ef0a3ead861b873373c891 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 26 Jan 2023 11:17:45 +0800 Subject: [PATCH] fix self query issue --- api/service/legacysync/epoch_syncing.go | 6 +++-- api/service/legacysync/helpers.go | 6 +++-- api/service/legacysync/syncing.go | 24 ++++++++++++-------- api/service/legacysync/syncing_test.go | 4 +++- api/service/stagedsync/stagedsync.go | 10 +++++++-- api/service/stagedsync/sync_config.go | 7 ++++++ cmd/harmony/main.go | 3 ++- node/node.go | 2 +- node/node_syncing.go | 29 +++++++++++++++++++++---- 9 files changed, 69 insertions(+), 22 deletions(-) diff --git a/api/service/legacysync/epoch_syncing.go b/api/service/legacysync/epoch_syncing.go index 8bf7ae145..641fb5889 100644 --- a/api/service/legacysync/epoch_syncing.go +++ b/api/service/legacysync/epoch_syncing.go @@ -14,6 +14,8 @@ import ( "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/shard" "github.com/pkg/errors" + + libp2p_peer "github.com/libp2p/go-libp2p/core/peer" ) type EpochSync struct { @@ -202,8 +204,8 @@ func processWithPayload(payload [][]byte, bc core.BlockChain) error { } // CreateSyncConfig creates SyncConfig for StateSync object. -func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error { +func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error { var err error - ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect) + ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, selfPeerID, waitForEachPeerToConnect) return err } diff --git a/api/service/legacysync/helpers.go b/api/service/legacysync/helpers.go index ca66e0ddc..4699257f8 100644 --- a/api/service/legacysync/helpers.go +++ b/api/service/legacysync/helpers.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/api/service/legacysync/downloader" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" + libp2p_peer "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" ) @@ -51,7 +52,7 @@ func getMaxPeerHeight(syncConfig *SyncConfig) uint64 { return maxHeight } -func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) (*SyncConfig, error) { +func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) (*SyncConfig, error) { // sanity check to ensure no duplicate peers if err := checkPeersDuplicity(peers); err != nil { return syncConfig, err @@ -61,6 +62,7 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, targetSize, peers := limitNumPeers(peers, randSeed) utils.Logger().Debug(). + Str("self peer ID", string(selfPeerID)). Int("peers count", len(peers)). Int("target size", targetSize). Uint32("shardID", shardID). @@ -72,7 +74,7 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, if syncConfig != nil { syncConfig.CloseConnections() } - syncConfig = NewSyncConfig(shardID, nil) + syncConfig = NewSyncConfig(shardID, selfPeerID, nil) if !waitForEachPeerToConnect { var wg sync.WaitGroup diff --git a/api/service/legacysync/syncing.go b/api/service/legacysync/syncing.go index 8afaa3a95..0f004bfb4 100644 --- a/api/service/legacysync/syncing.go +++ b/api/service/legacysync/syncing.go @@ -25,6 +25,7 @@ import ( "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" + libp2p_peer "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" ) @@ -110,14 +111,16 @@ type SyncConfig struct { // SyncPeerConfig itself is guarded by its own mutex. mtx sync.RWMutex - peers []*SyncPeerConfig - shardID uint32 + peers []*SyncPeerConfig + shardID uint32 + selfPeerID libp2p_peer.ID } -func NewSyncConfig(shardID uint32, peers []*SyncPeerConfig) *SyncConfig { +func NewSyncConfig(shardID uint32, selfPeerID libp2p_peer.ID, peers []*SyncPeerConfig) *SyncConfig { return &SyncConfig{ - peers: peers, - shardID: shardID, + peers: peers, + shardID: shardID, + selfPeerID: selfPeerID, } } @@ -135,6 +138,9 @@ func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) { if peer.IsEqual(p2) { return } + if peer.peer.PeerID == sc.selfPeerID { + return + } } sc.peers = append(sc.peers, peer) } @@ -192,7 +198,7 @@ func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) { } // CreateStateSync returns the implementation of StateSyncInterface interface. -func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, isExplorer bool, role nodeconfig.Role) *StateSync { +func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, peerID libp2p_peer.ID, isExplorer bool, role nodeconfig.Role) *StateSync { stateSync := &StateSync{} stateSync.blockChain = bc stateSync.selfip = ip @@ -201,7 +207,7 @@ func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, i stateSync.commonBlocks = make(map[int]*types.Block) stateSync.lastMileBlocks = []*types.Block{} stateSync.isExplorer = isExplorer - stateSync.syncConfig = NewSyncConfig(bc.ShardID(), nil) + stateSync.syncConfig = NewSyncConfig(bc.ShardID(), peerID, nil) stateSync.syncStatus = newSyncStatus(role) return stateSync @@ -366,9 +372,9 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) { } // CreateSyncConfig creates SyncConfig for StateSync object. -func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error { +func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error { var err error - ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect) + ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, selfPeerID, waitForEachPeerToConnect) return err } diff --git a/api/service/legacysync/syncing_test.go b/api/service/legacysync/syncing_test.go index bcab494a1..bc17aeaec 100644 --- a/api/service/legacysync/syncing_test.go +++ b/api/service/legacysync/syncing_test.go @@ -12,6 +12,7 @@ import ( "time" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + peer "github.com/libp2p/go-libp2p-core/peer" "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/api/service/legacysync/downloader" @@ -128,7 +129,8 @@ func (mockBlockchain) ShardID() uint32 { } func TestCreateStateSync(t *testing.T) { - stateSync := CreateStateSync(mockBlockchain{}, "127.0.0.1", "8000", [20]byte{}, false, nodeconfig.Validator) + pID, _ := peer.IDFromBytes([]byte{}) + stateSync := CreateStateSync(mockBlockchain{}, "127.0.0.1", "8000", [20]byte{}, pID, false, nodeconfig.Validator) if stateSync == nil { t.Error("Unable to create stateSync") diff --git a/api/service/stagedsync/stagedsync.go b/api/service/stagedsync/stagedsync.go index 88df1a671..c81f0b5b1 100644 --- a/api/service/stagedsync/stagedsync.go +++ b/api/service/stagedsync/stagedsync.go @@ -26,6 +26,8 @@ import ( "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/ledgerwatch/erigon-lib/kv" + + libp2p_peer "github.com/libp2p/go-libp2p/core/peer" ) type StagedSync struct { @@ -663,7 +665,7 @@ func (ss *StagedSync) AddNewBlock(peerHash []byte, block *types.Block) { } // CreateSyncConfig creates SyncConfig for StateSync object. -func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error { +func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error { // sanity check to ensure no duplicate peers if err := checkPeersDuplicity(peers); err != nil { return err @@ -678,6 +680,7 @@ func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitFor } utils.Logger().Debug(). + Str("self peer ID", string(selfPeerID)). Int("peers count", len(peers)). Int("target size", targetSize). Msg("[STAGED_SYNC] CreateSyncConfig: len of peers") @@ -685,7 +688,9 @@ func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitFor if ss.syncConfig != nil { ss.syncConfig.CloseConnections() } - ss.syncConfig = &SyncConfig{} + ss.syncConfig = &SyncConfig{ + selfPeerID: selfPeerID, + } var connectedPeers int for _, peer := range peers { @@ -694,6 +699,7 @@ func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitFor continue } peerConfig := &SyncPeerConfig{ + peer: peer, ip: peer.IP, port: peer.Port, client: client, diff --git a/api/service/stagedsync/sync_config.go b/api/service/stagedsync/sync_config.go index f42737cc1..91b3a4d73 100644 --- a/api/service/stagedsync/sync_config.go +++ b/api/service/stagedsync/sync_config.go @@ -14,6 +14,8 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" + + libp2p_peer "github.com/libp2p/go-libp2p/core/peer" ) // Constants for syncing. @@ -40,6 +42,7 @@ const ( // SyncPeerConfig is peer config to sync. type SyncPeerConfig struct { + peer p2p.Peer ip string port string peerHash []byte @@ -156,6 +159,7 @@ type SyncConfig struct { mtx sync.RWMutex reservedPeers []*SyncPeerConfig peers []*SyncPeerConfig + selfPeerID libp2p_peer.ID } // AddPeer adds the given sync peer. @@ -168,6 +172,9 @@ func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) { if peer.IsEqual(p2) { return } + if peer.peer.PeerID == sc.selfPeerID { + return + } } sc.peers = append(sc.peers, peer) } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 8543158b3..b68e75742 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -773,7 +773,8 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi currentNode.SyncingPeerProvider = node.NewLocalSyncingPeerProvider( 6000, uint16(selfPort), epochConfig.NumShards(), uint32(epochConfig.NumNodesPerShard())) } else { - currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(hc.DNSSync.Zone, strconv.Itoa(hc.DNSSync.Port)) + addrs := myHost.GetP2PHost().Addrs() + currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(hc.DNSSync.Zone, strconv.Itoa(hc.DNSSync.Port), addrs) } currentNode.NodeConfig.DNSZone = hc.DNSSync.Zone diff --git a/node/node.go b/node/node.go index bc80ef81a..6c2ebcd5f 100644 --- a/node/node.go +++ b/node/node.go @@ -87,7 +87,7 @@ type ISync interface { UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error AddLastMileBlock(block *types.Block) GetActivePeerNumber() int - CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error + CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) IsSynchronized() bool IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool) diff --git a/node/node_syncing.go b/node/node_syncing.go index 2219be96d..a0a8e6c84 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -9,6 +9,7 @@ import ( "time" "github.com/harmony-one/harmony/internal/tikv" + "github.com/multiformats/go-multiaddr" prom "github.com/harmony-one/harmony/api/service/prometheus" "github.com/prometheus/client_golang/prometheus" @@ -105,7 +106,8 @@ func (node *Node) createStateSync(bc core.BlockChain) *legacysync.StateSync { mutatedPort := strconv.Itoa(mySyncPort + legacysync.SyncingPortDifference) role := node.NodeConfig.Role() return legacysync.CreateStateSync(bc, node.SelfPeer.IP, mutatedPort, - node.GetSyncID(), node.NodeConfig.Role() == nodeconfig.ExplorerNode, role) + node.GetSyncID(), node.host.GetID(), + node.NodeConfig.Role() == nodeconfig.ExplorerNode, role) } func (node *Node) createStagedSync(bc core.BlockChain) *stagedsync.StagedSync { @@ -151,14 +153,16 @@ type SyncingPeerProvider interface { // DNSSyncingPeerProvider uses the given DNS zone to resolve syncing peers. type DNSSyncingPeerProvider struct { + selfAddrs []multiaddr.Multiaddr zone, port string lookupHost func(name string) (addrs []string, err error) } // NewDNSSyncingPeerProvider returns a provider that uses given DNS name and // port number to resolve syncing peers. -func NewDNSSyncingPeerProvider(zone, port string) *DNSSyncingPeerProvider { +func NewDNSSyncingPeerProvider(zone, port string, addrs []multiaddr.Multiaddr) *DNSSyncingPeerProvider { return &DNSSyncingPeerProvider{ + selfAddrs: addrs, zone: zone, port: port, lookupHost: net.LookupHost, @@ -174,11 +178,27 @@ func (p *DNSSyncingPeerProvider) SyncingPeers(shardID uint32) (peers []p2p.Peer, "[SYNC] cannot find peers using DNS name %#v", dns) } for _, addr := range addrs { + // no need to have peer itself on the list of connected peers + if p.getSelfAddrIndex(addr, p.port) >= 0 { + continue + } peers = append(peers, p2p.Peer{IP: addr, Port: p.port}) } return peers, nil } +// getSelfAddrIndex returns address index if it is one of self addresses +func (p *DNSSyncingPeerProvider) getSelfAddrIndex(IP string, Port string) int { + peerAddr4 := fmt.Sprintf("/ip4/%s/tcp/%s", IP, Port) + peerAddr6 := fmt.Sprintf("/ip6/%s/tcp/%s", IP, Port) + for addrIndex, addr := range p.selfAddrs { + if addr.String() == peerAddr4 || addr.String() == peerAddr6 { + return addrIndex + } + } + return -1 +} + // LocalSyncingPeerProvider uses localnet deployment convention to synthesize // syncing peers. type LocalSyncingPeerProvider struct { @@ -253,7 +273,8 @@ func (node *Node) doBeaconSyncing() { continue } - if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil { + if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID, node.host.GetID(), + node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil { utils.Logger().Warn().Err(err).Msg("[EPOCHSYNC] cannot create beacon sync config") continue } @@ -296,7 +317,7 @@ func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinCons Msg("cannot retrieve syncing peers") return } - if err := syncInstance.CreateSyncConfig(peers, shardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil { + if err := syncInstance.CreateSyncConfig(peers, shardID, node.host.GetID(), node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil { utils.Logger().Warn(). Err(err). Interface("peers", peers).