fix self query issue

pull/4352/head
“GheisMohammadi” 2 years ago committed by Casey Gardiner
parent 41d72b4c4e
commit 94e3c52bd7
  1. 6
      api/service/legacysync/epoch_syncing.go
  2. 6
      api/service/legacysync/helpers.go
  3. 16
      api/service/legacysync/syncing.go
  4. 4
      api/service/legacysync/syncing_test.go
  5. 10
      api/service/stagedsync/stagedsync.go
  6. 7
      api/service/stagedsync/sync_config.go
  7. 3
      cmd/harmony/main.go
  8. 2
      node/node.go
  9. 29
      node/node_syncing.go

@ -14,6 +14,8 @@ import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
"github.com/pkg/errors"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
)
type EpochSync struct {
@ -202,8 +204,8 @@ func processWithPayload(payload [][]byte, bc core.BlockChain) error {
}
// CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error {
func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error {
var err error
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect)
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, selfPeerID, waitForEachPeerToConnect)
return err
}

@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/api/service/legacysync/downloader"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
)
@ -51,7 +52,7 @@ func getMaxPeerHeight(syncConfig *SyncConfig) uint64 {
return maxHeight
}
func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) (*SyncConfig, error) {
func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) (*SyncConfig, error) {
// sanity check to ensure no duplicate peers
if err := checkPeersDuplicity(peers); err != nil {
return syncConfig, err
@ -61,6 +62,7 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32,
targetSize, peers := limitNumPeers(peers, randSeed)
utils.Logger().Debug().
Str("self peer ID", string(selfPeerID)).
Int("peers count", len(peers)).
Int("target size", targetSize).
Uint32("shardID", shardID).
@ -72,7 +74,7 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32,
if syncConfig != nil {
syncConfig.CloseConnections()
}
syncConfig = NewSyncConfig(shardID, nil)
syncConfig = NewSyncConfig(shardID, selfPeerID, nil)
if !waitForEachPeerToConnect {
var wg sync.WaitGroup

@ -25,6 +25,7 @@ import (
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
)
@ -112,12 +113,14 @@ type SyncConfig struct {
peers []*SyncPeerConfig
shardID uint32
selfPeerID libp2p_peer.ID
}
func NewSyncConfig(shardID uint32, peers []*SyncPeerConfig) *SyncConfig {
func NewSyncConfig(shardID uint32, selfPeerID libp2p_peer.ID, peers []*SyncPeerConfig) *SyncConfig {
return &SyncConfig{
peers: peers,
shardID: shardID,
selfPeerID: selfPeerID,
}
}
@ -135,6 +138,9 @@ func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
if peer.IsEqual(p2) {
return
}
if peer.peer.PeerID == sc.selfPeerID {
return
}
}
sc.peers = append(sc.peers, peer)
}
@ -192,7 +198,7 @@ func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) {
}
// CreateStateSync returns the implementation of StateSyncInterface interface.
func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, isExplorer bool, role nodeconfig.Role) *StateSync {
func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, peerID libp2p_peer.ID, isExplorer bool, role nodeconfig.Role) *StateSync {
stateSync := &StateSync{}
stateSync.blockChain = bc
stateSync.selfip = ip
@ -201,7 +207,7 @@ func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, i
stateSync.commonBlocks = make(map[int]*types.Block)
stateSync.lastMileBlocks = []*types.Block{}
stateSync.isExplorer = isExplorer
stateSync.syncConfig = NewSyncConfig(bc.ShardID(), nil)
stateSync.syncConfig = NewSyncConfig(bc.ShardID(), peerID, nil)
stateSync.syncStatus = newSyncStatus(role)
return stateSync
@ -366,9 +372,9 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
}
// CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error {
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error {
var err error
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect)
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, selfPeerID, waitForEachPeerToConnect)
return err
}

@ -12,6 +12,7 @@ import (
"time"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/api/service/legacysync/downloader"
@ -128,7 +129,8 @@ func (mockBlockchain) ShardID() uint32 {
}
func TestCreateStateSync(t *testing.T) {
stateSync := CreateStateSync(mockBlockchain{}, "127.0.0.1", "8000", [20]byte{}, false, nodeconfig.Validator)
pID, _ := peer.IDFromBytes([]byte{})
stateSync := CreateStateSync(mockBlockchain{}, "127.0.0.1", "8000", [20]byte{}, pID, false, nodeconfig.Validator)
if stateSync == nil {
t.Error("Unable to create stateSync")

@ -26,6 +26,8 @@ import (
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/ledgerwatch/erigon-lib/kv"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
)
type StagedSync struct {
@ -663,7 +665,7 @@ func (ss *StagedSync) AddNewBlock(peerHash []byte, block *types.Block) {
}
// CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error {
func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error {
// sanity check to ensure no duplicate peers
if err := checkPeersDuplicity(peers); err != nil {
return err
@ -678,6 +680,7 @@ func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitFor
}
utils.Logger().Debug().
Str("self peer ID", string(selfPeerID)).
Int("peers count", len(peers)).
Int("target size", targetSize).
Msg("[STAGED_SYNC] CreateSyncConfig: len of peers")
@ -685,7 +688,9 @@ func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitFor
if ss.syncConfig != nil {
ss.syncConfig.CloseConnections()
}
ss.syncConfig = &SyncConfig{}
ss.syncConfig = &SyncConfig{
selfPeerID: selfPeerID,
}
var connectedPeers int
for _, peer := range peers {
@ -694,6 +699,7 @@ func (ss *StagedSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitFor
continue
}
peerConfig := &SyncPeerConfig{
peer: peer,
ip: peer.IP,
port: peer.Port,
client: client,

@ -14,6 +14,8 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
)
// Constants for syncing.
@ -40,6 +42,7 @@ const (
// SyncPeerConfig is peer config to sync.
type SyncPeerConfig struct {
peer p2p.Peer
ip string
port string
peerHash []byte
@ -156,6 +159,7 @@ type SyncConfig struct {
mtx sync.RWMutex
reservedPeers []*SyncPeerConfig
peers []*SyncPeerConfig
selfPeerID libp2p_peer.ID
}
// AddPeer adds the given sync peer.
@ -168,6 +172,9 @@ func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
if peer.IsEqual(p2) {
return
}
if peer.peer.PeerID == sc.selfPeerID {
return
}
}
sc.peers = append(sc.peers, peer)
}

@ -773,7 +773,8 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
currentNode.SyncingPeerProvider = node.NewLocalSyncingPeerProvider(
6000, uint16(selfPort), epochConfig.NumShards(), uint32(epochConfig.NumNodesPerShard()))
} else {
currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(hc.DNSSync.Zone, strconv.Itoa(hc.DNSSync.Port))
addrs := myHost.GetP2PHost().Addrs()
currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(hc.DNSSync.Zone, strconv.Itoa(hc.DNSSync.Port), addrs)
}
currentNode.NodeConfig.DNSZone = hc.DNSSync.Zone

@ -87,7 +87,7 @@ type ISync interface {
UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error
AddLastMileBlock(block *types.Block)
GetActivePeerNumber() int
CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error
CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error
SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration)
IsSynchronized() bool
IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool)

@ -9,6 +9,7 @@ import (
"time"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/multiformats/go-multiaddr"
prom "github.com/harmony-one/harmony/api/service/prometheus"
"github.com/prometheus/client_golang/prometheus"
@ -105,7 +106,8 @@ func (node *Node) createStateSync(bc core.BlockChain) *legacysync.StateSync {
mutatedPort := strconv.Itoa(mySyncPort + legacysync.SyncingPortDifference)
role := node.NodeConfig.Role()
return legacysync.CreateStateSync(bc, node.SelfPeer.IP, mutatedPort,
node.GetSyncID(), node.NodeConfig.Role() == nodeconfig.ExplorerNode, role)
node.GetSyncID(), node.host.GetID(),
node.NodeConfig.Role() == nodeconfig.ExplorerNode, role)
}
func (node *Node) createStagedSync(bc core.BlockChain) *stagedsync.StagedSync {
@ -151,14 +153,16 @@ type SyncingPeerProvider interface {
// DNSSyncingPeerProvider uses the given DNS zone to resolve syncing peers.
type DNSSyncingPeerProvider struct {
selfAddrs []multiaddr.Multiaddr
zone, port string
lookupHost func(name string) (addrs []string, err error)
}
// NewDNSSyncingPeerProvider returns a provider that uses given DNS name and
// port number to resolve syncing peers.
func NewDNSSyncingPeerProvider(zone, port string) *DNSSyncingPeerProvider {
func NewDNSSyncingPeerProvider(zone, port string, addrs []multiaddr.Multiaddr) *DNSSyncingPeerProvider {
return &DNSSyncingPeerProvider{
selfAddrs: addrs,
zone: zone,
port: port,
lookupHost: net.LookupHost,
@ -174,11 +178,27 @@ func (p *DNSSyncingPeerProvider) SyncingPeers(shardID uint32) (peers []p2p.Peer,
"[SYNC] cannot find peers using DNS name %#v", dns)
}
for _, addr := range addrs {
// no need to have peer itself on the list of connected peers
if p.getSelfAddrIndex(addr, p.port) >= 0 {
continue
}
peers = append(peers, p2p.Peer{IP: addr, Port: p.port})
}
return peers, nil
}
// getSelfAddrIndex returns address index if it is one of self addresses
func (p *DNSSyncingPeerProvider) getSelfAddrIndex(IP string, Port string) int {
peerAddr4 := fmt.Sprintf("/ip4/%s/tcp/%s", IP, Port)
peerAddr6 := fmt.Sprintf("/ip6/%s/tcp/%s", IP, Port)
for addrIndex, addr := range p.selfAddrs {
if addr.String() == peerAddr4 || addr.String() == peerAddr6 {
return addrIndex
}
}
return -1
}
// LocalSyncingPeerProvider uses localnet deployment convention to synthesize
// syncing peers.
type LocalSyncingPeerProvider struct {
@ -253,7 +273,8 @@ func (node *Node) doBeaconSyncing() {
continue
}
if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
if err := node.epochSync.CreateSyncConfig(peers, shard.BeaconChainShardID, node.host.GetID(),
node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
utils.Logger().Warn().Err(err).Msg("[EPOCHSYNC] cannot create beacon sync config")
continue
}
@ -296,7 +317,7 @@ func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinCons
Msg("cannot retrieve syncing peers")
return
}
if err := syncInstance.CreateSyncConfig(peers, shardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
if err := syncInstance.CreateSyncConfig(peers, shardID, node.host.GetID(), node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
utils.Logger().Warn().
Err(err).
Interface("peers", peers).

Loading…
Cancel
Save