fix self query issue

pull/4377/head
“GheisMohammadi” 2 years ago committed by Casey Gardiner
parent 5625f6ed94
commit 9084066a50
  1. 6
      api/service/legacysync/epoch_syncing.go
  2. 6
      api/service/legacysync/helpers.go
  3. 24
      api/service/legacysync/syncing.go
  4. 4
      api/service/legacysync/syncing_test.go
  5. 10
      api/service/stagedsync/stagedsync.go
  6. 7
      api/service/stagedsync/sync_config.go
  7. 3
      cmd/harmony/main.go
  8. 2
      node/node.go
  9. 29
      node/node_syncing.go

@ -14,6 +14,8 @@ import (
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/shard"
"github.com/pkg/errors" "github.com/pkg/errors"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
) )
type EpochSync struct { type EpochSync struct {
@ -202,8 +204,8 @@ func processWithPayload(payload [][]byte, bc core.BlockChain) error {
} }
// CreateSyncConfig creates SyncConfig for StateSync object. // CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error { func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error {
var err error var err error
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect) ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, selfPeerID, waitForEachPeerToConnect)
return err return err
} }

@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/api/service/legacysync/downloader" "github.com/harmony-one/harmony/api/service/legacysync/downloader"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -51,7 +52,7 @@ func getMaxPeerHeight(syncConfig *SyncConfig) uint64 {
return maxHeight return maxHeight
} }
func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) (*SyncConfig, error) { func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) (*SyncConfig, error) {
// sanity check to ensure no duplicate peers // sanity check to ensure no duplicate peers
if err := checkPeersDuplicity(peers); err != nil { if err := checkPeersDuplicity(peers); err != nil {
return syncConfig, err return syncConfig, err
@ -61,6 +62,7 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32,
targetSize, peers := limitNumPeers(peers, randSeed) targetSize, peers := limitNumPeers(peers, randSeed)
utils.Logger().Debug(). utils.Logger().Debug().
Str("self peer ID", string(selfPeerID)).
Int("peers count", len(peers)). Int("peers count", len(peers)).
Int("target size", targetSize). Int("target size", targetSize).
Uint32("shardID", shardID). Uint32("shardID", shardID).
@ -72,7 +74,7 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32,
if syncConfig != nil { if syncConfig != nil {
syncConfig.CloseConnections() syncConfig.CloseConnections()
} }
syncConfig = NewSyncConfig(shardID, nil) syncConfig = NewSyncConfig(shardID, selfPeerID, nil)
if !waitForEachPeerToConnect { if !waitForEachPeerToConnect {
var wg sync.WaitGroup var wg sync.WaitGroup

@ -25,6 +25,7 @@ import (
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -110,14 +111,16 @@ type SyncConfig struct {
// SyncPeerConfig itself is guarded by its own mutex. // SyncPeerConfig itself is guarded by its own mutex.
mtx sync.RWMutex mtx sync.RWMutex
peers []*SyncPeerConfig peers []*SyncPeerConfig
shardID uint32 shardID uint32
selfPeerID libp2p_peer.ID
} }
func NewSyncConfig(shardID uint32, peers []*SyncPeerConfig) *SyncConfig { func NewSyncConfig(shardID uint32, selfPeerID libp2p_peer.ID, peers []*SyncPeerConfig) *SyncConfig {
return &SyncConfig{ return &SyncConfig{
peers: peers, peers: peers,
shardID: shardID, shardID: shardID,
selfPeerID: selfPeerID,
} }
} }
@ -135,6 +138,9 @@ func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
if peer.IsEqual(p2) { if peer.IsEqual(p2) {
return return
} }
if peer.peer.PeerID == sc.selfPeerID {
return
}
} }
sc.peers = append(sc.peers, peer) sc.peers = append(sc.peers, peer)
} }
@ -192,7 +198,7 @@ func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) {
} }
// CreateStateSync returns the implementation of StateSyncInterface interface. // CreateStateSync returns the implementation of StateSyncInterface interface.
func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, isExplorer bool, role nodeconfig.Role) *StateSync { func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, peerID libp2p_peer.ID, isExplorer bool, role nodeconfig.Role) *StateSync {
stateSync := &StateSync{} stateSync := &StateSync{}
stateSync.blockChain = bc stateSync.blockChain = bc
stateSync.selfip = ip stateSync.selfip = ip
@ -201,7 +207,7 @@ func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, i
stateSync.commonBlocks = make(map[int]*types.Block) stateSync.commonBlocks = make(map[int]*types.Block)
stateSync.lastMileBlocks = []*types.Block{} stateSync.lastMileBlocks = []*types.Block{}
stateSync.isExplorer = isExplorer stateSync.isExplorer = isExplorer
stateSync.syncConfig = NewSyncConfig(bc.ShardID(), nil) stateSync.syncConfig = NewSyncConfig(bc.ShardID(), peerID, nil)
stateSync.syncStatus = newSyncStatus(role) stateSync.syncStatus = newSyncStatus(role)
return stateSync return stateSync
@ -366,9 +372,9 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
} }
// CreateSyncConfig creates SyncConfig for StateSync object. // CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error { func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error {
var err error var err error
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect) ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, selfPeerID, waitForEachPeerToConnect)
return err return err
} }

@ -12,6 +12,7 @@ import (
"time" "time"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/api/service/legacysync/downloader" "github.com/harmony-one/harmony/api/service/legacysync/downloader"
@ -128,7 +129,8 @@ func (mockBlockchain) ShardID() uint32 {
} }
func TestCreateStateSync(t *testing.T) { func TestCreateStateSync(t *testing.T) {
stateSync := CreateStateSync(mockBlockchain{}, "127.0.0.1", "8000", [20]byte{}, false, nodeconfig.Validator) pID, _ := peer.IDFromBytes([]byte{})
stateSync := CreateStateSync(mockBlockchain{}, "127.0.0.1", "8000", [20]byte{}, pID, false, nodeconfig.Validator)
if stateSync == nil { if stateSync == nil {
t.Error("Unable to create stateSync") t.Error("Unable to create stateSync")

@ -26,6 +26,8 @@ import (
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
) )
type StagedSync struct { type StagedSync struct {
@ -663,7 +665,7 @@ func (ss *StagedSync) AddNewBlock(peerHash []byte, block *types.Block) {
} }
// CreateSyncConfig creates SyncConfig for StateSync object. // CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error { func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error {
// sanity check to ensure no duplicate peers // sanity check to ensure no duplicate peers
if err := checkPeersDuplicity(peers); err != nil { if err := checkPeersDuplicity(peers); err != nil {
return err return err
@ -678,6 +680,7 @@ func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitFor
} }
utils.Logger().Debug(). utils.Logger().Debug().
Str("self peer ID", string(selfPeerID)).
Int("peers count", len(peers)). Int("peers count", len(peers)).
Int("target size", targetSize). Int("target size", targetSize).
Msg("[STAGED_SYNC] CreateSyncConfig: len of peers") Msg("[STAGED_SYNC] CreateSyncConfig: len of peers")
@ -685,7 +688,9 @@ func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitFor
if ss.syncConfig != nil { if ss.syncConfig != nil {
ss.syncConfig.CloseConnections() ss.syncConfig.CloseConnections()
} }
ss.syncConfig = &SyncConfig{} ss.syncConfig = &SyncConfig{
selfPeerID: selfPeerID,
}
var connectedPeers int var connectedPeers int
for _, peer := range peers { for _, peer := range peers {
@ -694,6 +699,7 @@ func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitFor
continue continue
} }
peerConfig := &SyncPeerConfig{ peerConfig := &SyncPeerConfig{
peer: peer,
ip: peer.IP, ip: peer.IP,
port: peer.Port, port: peer.Port,
client: client, client: client,

@ -14,6 +14,8 @@ import (
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
) )
// Constants for syncing. // Constants for syncing.
@ -40,6 +42,7 @@ const (
// SyncPeerConfig is peer config to sync. // SyncPeerConfig is peer config to sync.
type SyncPeerConfig struct { type SyncPeerConfig struct {
peer p2p.Peer
ip string ip string
port string port string
peerHash []byte peerHash []byte
@ -156,6 +159,7 @@ type SyncConfig struct {
mtx sync.RWMutex mtx sync.RWMutex
reservedPeers []*SyncPeerConfig reservedPeers []*SyncPeerConfig
peers []*SyncPeerConfig peers []*SyncPeerConfig
selfPeerID libp2p_peer.ID
} }
// AddPeer adds the given sync peer. // AddPeer adds the given sync peer.
@ -168,6 +172,9 @@ func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
if peer.IsEqual(p2) { if peer.IsEqual(p2) {
return return
} }
if peer.peer.PeerID == sc.selfPeerID {
return
}
} }
sc.peers = append(sc.peers, peer) sc.peers = append(sc.peers, peer)
} }

@ -773,7 +773,8 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
currentNode.SyncingPeerProvider = node.NewLocalSyncingPeerProvider( currentNode.SyncingPeerProvider = node.NewLocalSyncingPeerProvider(
6000, uint16(selfPort), epochConfig.NumShards(), uint32(epochConfig.NumNodesPerShard())) 6000, uint16(selfPort), epochConfig.NumShards(), uint32(epochConfig.NumNodesPerShard()))
} else { } else {
currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(hc.DNSSync.Zone, strconv.Itoa(hc.DNSSync.Port)) addrs := myHost.GetP2PHost().Addrs()
currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(hc.DNSSync.Zone, strconv.Itoa(hc.DNSSync.Port), addrs)
} }
currentNode.NodeConfig.DNSZone = hc.DNSSync.Zone currentNode.NodeConfig.DNSZone = hc.DNSSync.Zone

@ -87,7 +87,7 @@ type ISync interface {
UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error
AddLastMileBlock(block *types.Block) AddLastMileBlock(block *types.Block)
GetActivePeerNumber() int GetActivePeerNumber() int
CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error
SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration)
IsSynchronized() bool IsSynchronized() bool
IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool) IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool)

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/harmony-one/harmony/internal/tikv" "github.com/harmony-one/harmony/internal/tikv"
"github.com/multiformats/go-multiaddr"
prom "github.com/harmony-one/harmony/api/service/prometheus" prom "github.com/harmony-one/harmony/api/service/prometheus"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -105,7 +106,8 @@ func (node *Node) createStateSync(bc core.BlockChain) *legacysync.StateSync {
mutatedPort := strconv.Itoa(mySyncPort + legacysync.SyncingPortDifference) mutatedPort := strconv.Itoa(mySyncPort + legacysync.SyncingPortDifference)
role := node.NodeConfig.Role() role := node.NodeConfig.Role()
return legacysync.CreateStateSync(bc, node.SelfPeer.IP, mutatedPort, return legacysync.CreateStateSync(bc, node.SelfPeer.IP, mutatedPort,
node.GetSyncID(), node.NodeConfig.Role() == nodeconfig.ExplorerNode, role) node.GetSyncID(), node.host.GetID(),
node.NodeConfig.Role() == nodeconfig.ExplorerNode, role)
} }
func (node *Node) createStagedSync(bc core.BlockChain) *stagedsync.StagedSync { func (node *Node) createStagedSync(bc core.BlockChain) *stagedsync.StagedSync {
@ -151,14 +153,16 @@ type SyncingPeerProvider interface {
// DNSSyncingPeerProvider uses the given DNS zone to resolve syncing peers. // DNSSyncingPeerProvider uses the given DNS zone to resolve syncing peers.
type DNSSyncingPeerProvider struct { type DNSSyncingPeerProvider struct {
selfAddrs []multiaddr.Multiaddr
zone, port string zone, port string
lookupHost func(name string) (addrs []string, err error) lookupHost func(name string) (addrs []string, err error)
} }
// NewDNSSyncingPeerProvider returns a provider that uses given DNS name and // NewDNSSyncingPeerProvider returns a provider that uses given DNS name and
// port number to resolve syncing peers. // port number to resolve syncing peers.
func NewDNSSyncingPeerProvider(zone, port string) *DNSSyncingPeerProvider { func NewDNSSyncingPeerProvider(zone, port string, addrs []multiaddr.Multiaddr) *DNSSyncingPeerProvider {
return &DNSSyncingPeerProvider{ return &DNSSyncingPeerProvider{
selfAddrs: addrs,
zone: zone, zone: zone,
port: port, port: port,
lookupHost: net.LookupHost, lookupHost: net.LookupHost,
@ -174,11 +178,27 @@ func (p *DNSSyncingPeerProvider) SyncingPeers(shardID uint32) (peers []p2p.Peer,
"[SYNC] cannot find peers using DNS name %#v", dns) "[SYNC] cannot find peers using DNS name %#v", dns)
} }
for _, addr := range addrs { for _, addr := range addrs {
// no need to have peer itself on the list of connected peers
if p.getSelfAddrIndex(addr, p.port) >= 0 {
continue
}
peers = append(peers, p2p.Peer{IP: addr, Port: p.port}) peers = append(peers, p2p.Peer{IP: addr, Port: p.port})
} }
return peers, nil return peers, nil
} }
// getSelfAddrIndex returns address index if it is one of self addresses
func (p *DNSSyncingPeerProvider) getSelfAddrIndex(IP string, Port string) int {
peerAddr4 := fmt.Sprintf("/ip4/%s/tcp/%s", IP, Port)
peerAddr6 := fmt.Sprintf("/ip6/%s/tcp/%s", IP, Port)
for addrIndex, addr := range p.selfAddrs {
if addr.String() == peerAddr4 || addr.String() == peerAddr6 {
return addrIndex
}
}
return -1
}
// LocalSyncingPeerProvider uses localnet deployment convention to synthesize // LocalSyncingPeerProvider uses localnet deployment convention to synthesize
// syncing peers. // syncing peers.
type LocalSyncingPeerProvider struct { type LocalSyncingPeerProvider struct {
@ -253,7 +273,8 @@ func (node *Node) doBeaconSyncing() {
continue continue
} }
if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil { if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID, node.host.GetID(),
node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
utils.Logger().Warn().Err(err).Msg("[EPOCHSYNC] cannot create beacon sync config") utils.Logger().Warn().Err(err).Msg("[EPOCHSYNC] cannot create beacon sync config")
continue continue
} }
@ -296,7 +317,7 @@ func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinCons
Msg("cannot retrieve syncing peers") Msg("cannot retrieve syncing peers")
return return
} }
if err := syncInstance.CreateSyncConfig(peers, shardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil { if err := syncInstance.CreateSyncConfig(peers, shardID, node.host.GetID(), node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
utils.Logger().Warn(). utils.Logger().Warn().
Err(err). Err(err).
Interface("peers", peers). Interface("peers", peers).

Loading…
Cancel
Save