Small fixes and code cleanup for network stack. (#4320)

* staged dns sync v1.0

* enabled stream downloader for localnet

* fix code review issues

* remove extra lock

* staged dns sync v1.0

* Fixed, code clean up and other.

* Fixed, code clean up and other.

* Fixed, code clean up and other.

* Fix config.

Co-authored-by: “GheisMohammadi” <“Gheis.Mohammadi@gmail.com”>
pull/4323/head
Konstantin 2 years ago committed by GitHub
parent 0104b1d623
commit 6ade0bfa34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      api/service/legacysync/epoch_syncing.go
  2. 11
      api/service/legacysync/helpers.go
  3. 30
      api/service/legacysync/syncing.go
  4. 36
      api/service/legacysync/syncing_test.go
  5. 4
      node/node_syncing.go

@ -85,17 +85,18 @@ func (ss *EpochSync) GetActivePeerNumber() int {
}
// SyncLoop will keep syncing with peers until catches up
func (ss *EpochSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus) time.Duration {
return time.Duration(ss.syncLoop(bc, isBeacon, consensus)) * time.Second
func (ss *EpochSync) SyncLoop(bc core.BlockChain, consensus *consensus.Consensus) time.Duration {
return time.Duration(syncLoop(bc, ss.syncConfig)) * time.Second
}
func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Consensus) (timeout int) {
maxHeight := getMaxPeerHeight(ss.syncConfig)
func syncLoop(bc core.BlockChain, syncConfig *SyncConfig) (timeout int) {
isBeacon := bc.ShardID() == 0
maxHeight := getMaxPeerHeight(syncConfig)
for {
if maxHeight == 0 || maxHeight == math.MaxUint64 {
utils.Logger().Info().
Msgf("[EPOCHSYNC] No peers to sync (isBeacon: %t, ShardID: %d, peersCount: %d)",
isBeacon, bc.ShardID(), ss.syncConfig.PeersCount())
isBeacon, bc.ShardID(), syncConfig.PeersCount())
return 10
}
@ -104,19 +105,19 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co
if otherEpoch == curEpoch+1 {
utils.Logger().Info().
Msgf("[EPOCHSYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherEpoch: %d, currentEpoch: %d, peersCount: %d)",
isBeacon, bc.ShardID(), otherEpoch, curEpoch, ss.syncConfig.PeersCount())
isBeacon, bc.ShardID(), otherEpoch, curEpoch, syncConfig.PeersCount())
return 60
}
if otherEpoch < curEpoch {
for _, peerCfg := range ss.syncConfig.GetPeers() {
ss.syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: current height is higher that others, remove peers: %s", peerCfg.String()))
for _, peerCfg := range syncConfig.GetPeers() {
syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: current height is higher that others, remove peers: %s", peerCfg.String()))
}
return 2
}
utils.Logger().Info().
Msgf("[EPOCHSYNC] Node is OUT OF SYNC (isBeacon: %t, ShardID: %d, otherEpoch: %d, currentEpoch: %d, peers count %d)",
isBeacon, bc.ShardID(), otherEpoch, curEpoch, ss.syncConfig.PeersCount())
isBeacon, bc.ShardID(), otherEpoch, curEpoch, syncConfig.PeersCount())
var heights []uint64
loopEpoch := curEpoch + 1
@ -133,7 +134,7 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co
return 10
}
err := ss.ProcessStateSync(heights, bc)
err := ProcessStateSync(syncConfig, heights, bc)
if err != nil {
utils.Logger().Error().Err(err).
Msgf("[EPOCHSYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherEpoch: %d, currentEpoch: %d)",
@ -144,11 +145,11 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co
}
// ProcessStateSync processes state sync from the blocks received but not yet processed so far
func (ss *EpochSync) ProcessStateSync(heights []uint64, bc core.BlockChain) error {
func ProcessStateSync(syncConfig *SyncConfig, heights []uint64, bc core.BlockChain) error {
var payload [][]byte
var peerCfg *SyncPeerConfig
peers := ss.syncConfig.GetPeers()
peers := syncConfig.GetPeers()
if len(peers) == 0 {
return errors.New("no peers to sync")
}
@ -156,11 +157,11 @@ func (ss *EpochSync) ProcessStateSync(heights []uint64, bc core.BlockChain) erro
for index, peerConfig := range peers {
resp := peerConfig.GetClient().GetBlocksByHeights(heights)
if resp == nil {
ss.syncConfig.RemovePeer(peerConfig, fmt.Sprintf("[EPOCHSYNC]: no response from peer: #%d %s, count %d", index, peerConfig.String(), len(peers)))
syncConfig.RemovePeer(peerConfig, fmt.Sprintf("[EPOCHSYNC]: no response from peer: #%d %s, count %d", index, peerConfig.String(), len(peers)))
continue
}
if len(resp.Payload) == 0 {
ss.syncConfig.RemovePeer(peerConfig, fmt.Sprintf("[EPOCHSYNC]: empty payload response from peer: #%d %s, count %d", index, peerConfig.String(), len(peers)))
syncConfig.RemovePeer(peerConfig, fmt.Sprintf("[EPOCHSYNC]: empty payload response from peer: #%d %s, count %d", index, peerConfig.String(), len(peers)))
continue
}
payload = resp.Payload
@ -168,12 +169,12 @@ func (ss *EpochSync) ProcessStateSync(heights []uint64, bc core.BlockChain) erro
break
}
if len(payload) == 0 {
return errors.Errorf("empty payload: no blocks were returned by GetBlocksByHeights for all peers, currentPeersCount %d", ss.syncConfig.PeersCount())
return errors.Errorf("empty payload: no blocks were returned by GetBlocksByHeights for all peers, currentPeersCount %d", syncConfig.PeersCount())
}
err := ss.processWithPayload(payload, bc)
err := processWithPayload(payload, bc)
if err != nil {
// Assume that node sent us invalid data.
ss.syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: failed to process with payload from peer: %s", err.Error()))
syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: failed to process with payload from peer: %s", err.Error()))
utils.Logger().Error().Err(err).
Msgf("[EPOCHSYNC] Removing peer %s for invalid data", peerCfg.String())
return err
@ -181,7 +182,7 @@ func (ss *EpochSync) ProcessStateSync(heights []uint64, bc core.BlockChain) erro
return nil
}
func (ss *EpochSync) processWithPayload(payload [][]byte, bc core.BlockChain) error {
func processWithPayload(payload [][]byte, bc core.BlockChain) error {
decoded := make([]*types.Block, 0, len(payload))
for idx, blockBytes := range payload {
block, err := RlpDecodeBlockOrBlockWithSig(blockBytes)

@ -28,11 +28,11 @@ func getMaxPeerHeight(syncConfig *SyncConfig) uint64 {
// utils.Logger().Debug().Bool("isBeacon", isBeacon).Str("peerIP", peerConfig.ip).Str("peerPort", peerConfig.port).Msg("[Sync]getMaxPeerHeight")
response, err := peerConfig.client.GetBlockChainHeight()
if err != nil {
utils.Logger().Warn().Err(err).Str("peerIP", peerConfig.ip).Str("peerPort", peerConfig.port).Msg("[Sync]GetBlockChainHeight failed")
utils.Logger().Warn().Err(err).Str("peerIP", peerConfig.peer.IP).Str("peerPort", peerConfig.peer.Port).Msg("[Sync]GetBlockChainHeight failed")
syncConfig.RemovePeer(peerConfig, fmt.Sprintf("failed getMaxPeerHeight for shard %d with message: %s", syncConfig.ShardID(), err.Error()))
return
}
utils.Logger().Info().Str("peerIP", peerConfig.ip).Uint64("blockHeight", response.BlockHeight).
utils.Logger().Info().Str("peerIP", peerConfig.peer.IP).Uint64("blockHeight", response.BlockHeight).
Msg("[SYNC] getMaxPeerHeight")
lock.Lock()
@ -86,8 +86,7 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32,
return
}
peerConfig := &SyncPeerConfig{
ip: peer.IP,
port: peer.Port,
peer: peer,
client: client,
}
syncConfig.AddPeer(peerConfig)
@ -102,8 +101,7 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32,
continue
}
peerConfig := &SyncPeerConfig{
ip: peer.IP,
port: peer.Port,
peer: peer,
client: client,
}
syncConfig.AddPeer(peerConfig)
@ -113,7 +111,6 @@ func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32,
}
}
}
utils.Logger().Info().
Int("len", len(syncConfig.peers)).
Uint32("shardID", shardID).

@ -51,8 +51,7 @@ const (
// SyncPeerConfig is peer config to sync.
type SyncPeerConfig struct {
ip string
port string
peer p2p.Peer
peerHash []byte
client *downloader.Client
blockHashes [][]byte // block hashes before node doing sync
@ -67,7 +66,7 @@ func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client {
// IsEqual checks the equality between two sync peers
func (peerConfig *SyncPeerConfig) IsEqual(pc2 *SyncPeerConfig) bool {
return peerConfig.ip == pc2.ip && peerConfig.port == pc2.port
return peerConfig.peer.IP == pc2.peer.IP && peerConfig.peer.Port == pc2.peer.Port
}
// SyncBlockTask is the task struct to sync a specific block.
@ -164,6 +163,9 @@ func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) {
}
func (sc *SyncConfig) PeersCount() int {
if sc == nil {
return 0
}
sc.mtx.RLock()
defer sc.mtx.RUnlock()
return len(sc.peers)
@ -183,8 +185,8 @@ func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) {
}
}
utils.Logger().Info().
Str("peerIP", peer.ip).
Str("peerPortMsg", peer.port).
Str("peerIP", peer.peer.IP).
Str("peerPortMsg", peer.peer.Port).
Str("reason", reason).
Msg("[SYNC] remove GRPC peer")
}
@ -497,7 +499,7 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() error {
}
utils.Logger().Info().
Int("maxFirstID", maxFirstID).
Str("targetPeerIP", sc.peers[maxFirstID].ip).
Str("targetPeerIP", sc.peers[maxFirstID].peer.IP).
Int("maxCount", maxCount).
Int("hashSize", len(sc.peers[maxFirstID].blockHashes)).
Msg("[SYNC] block consensus hashes")
@ -517,15 +519,15 @@ func (ss *StateSync) getConsensusHashes(startHash []byte, size uint32) error {
response := peerConfig.client.GetBlockHashes(startHash, size, ss.selfip, ss.selfport)
if response == nil {
utils.Logger().Warn().
Str("peerIP", peerConfig.ip).
Str("peerPort", peerConfig.port).
Str("peerIP", peerConfig.peer.IP).
Str("peerPort", peerConfig.peer.Port).
Msg("[SYNC] getConsensusHashes Nil Response")
ss.syncConfig.RemovePeer(peerConfig, fmt.Sprintf("StateSync %d: nil response for GetBlockHashes", ss.blockChain.ShardID()))
return
}
utils.Logger().Info().Uint32("queried blockHash size", size).
Int("got blockHashSize", len(response.Payload)).
Str("PeerIP", peerConfig.ip).
Str("PeerIP", peerConfig.peer.IP).
Msg("[SYNC] GetBlockHashes")
if len(response.Payload) > int(size+1) {
utils.Logger().Warn().
@ -587,8 +589,8 @@ func (ss *StateSync) downloadBlocks(bc core.BlockChain) {
payload, err := peerConfig.GetBlocks(tasks.blockHashes())
if err != nil {
utils.Logger().Warn().Err(err).
Str("peerID", peerConfig.ip).
Str("port", peerConfig.port).
Str("peerID", peerConfig.peer.IP).
Str("port", peerConfig.peer.Port).
Msg("[SYNC] downloadBlocks: GetBlocks failed")
ss.syncConfig.RemovePeer(peerConfig, fmt.Sprintf("StateSync %d: error returned for GetBlocks: %s", ss.blockChain.ShardID(), err.Error()))
return
@ -1009,7 +1011,7 @@ func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port
}
func (peerConfig *SyncPeerConfig) String() interface{} {
return fmt.Sprintf("peer: %s:%s ", peerConfig.ip, peerConfig.port)
return fmt.Sprintf("peer: %s:%s ", peerConfig.peer.IP, peerConfig.peer.Port)
}
// RegisterNodeInfo will register node to peers to accept future new block broadcasting
@ -1023,12 +1025,12 @@ func (ss *StateSync) RegisterNodeInfo() int {
count := 0
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
logger := utils.Logger().With().Str("peerPort", peerConfig.port).Str("peerIP", peerConfig.ip).Logger()
logger := utils.Logger().With().Str("peerPort", peerConfig.peer.Port).Str("peerIP", peerConfig.peer.IP).Logger()
if count >= registrationNumber {
brk = true
return
}
if peerConfig.ip == ss.selfip && peerConfig.port == GetSyncingPort(ss.selfport) {
if peerConfig.peer.IP == ss.selfip && peerConfig.peer.Port == GetSyncingPort(ss.selfport) {
logger.Debug().
Str("selfport", ss.selfport).
Str("selfsyncport", GetSyncingPort(ss.selfport)).

@ -29,34 +29,46 @@ func TestSyncPeerConfig_IsEqual(t *testing.T) {
}{
{
p1: &SyncPeerConfig{
ip: "0.0.0.1",
port: "1",
peer: p2p.Peer{
IP: "0.0.0.1",
Port: "1",
},
},
p2: &SyncPeerConfig{
ip: "0.0.0.1",
port: "2",
peer: p2p.Peer{
IP: "0.0.0.1",
Port: "2",
},
},
exp: false,
},
{
p1: &SyncPeerConfig{
ip: "0.0.0.1",
port: "1",
peer: p2p.Peer{
IP: "0.0.0.1",
Port: "1",
},
},
p2: &SyncPeerConfig{
ip: "0.0.0.2",
port: "1",
peer: p2p.Peer{
IP: "0.0.0.2",
Port: "1",
},
},
exp: false,
},
{
p1: &SyncPeerConfig{
ip: "0.0.0.1",
port: "1",
peer: p2p.Peer{
IP: "0.0.0.1",
Port: "1",
},
},
p2: &SyncPeerConfig{
ip: "0.0.0.1",
port: "1",
peer: p2p.Peer{
IP: "0.0.0.1",
Port: "1",
},
},
exp: true,
},

@ -259,7 +259,7 @@ func (node *Node) doBeaconSyncing() {
}
}
<-time.After(node.epochSync.SyncLoop(node.EpochChain(), true, nil))
<-time.After(node.epochSync.SyncLoop(node.EpochChain(), nil))
}
}
@ -296,7 +296,7 @@ func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinCons
Msg("cannot retrieve syncing peers")
return
}
if err := node.stateSync.CreateSyncConfig(peers, shardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
if err := syncInstance.CreateSyncConfig(peers, shardID, node.HarmonyConfig.P2P.WaitForEachPeerToConnect); err != nil {
utils.Logger().Warn().
Err(err).
Interface("peers", peers).

Loading…
Cancel
Save