fix state sync response message flooding

pull/652/head
chao 6 years ago committed by chaosma
parent 496d4e4fde
commit 6c7ae4cb64
  1. 14
      api/service/syncing/syncing.go
  2. 16
      node/node_syncing.go

@ -2,8 +2,10 @@ package syncing
import ( import (
"bytes" "bytes"
"fmt"
"reflect" "reflect"
"sort" "sort"
"strconv"
"sync" "sync"
"time" "time"
@ -546,6 +548,10 @@ func (ss *StateSync) RegisterNodeInfo() int {
if count >= registrationNumber { if count >= registrationNumber {
break break
} }
if peerConfig.ip == ss.selfip && peerConfig.port == GetSyncingPort(ss.selfport) {
utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport))
continue
}
if peerConfig.client == nil { if peerConfig.client == nil {
continue continue
} }
@ -600,3 +606,11 @@ func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJo
ss.ProcessStateSync(startHash[:], bc, worker) ss.ProcessStateSync(startHash[:], bc, worker)
} }
} }
// GetSyncingPort returns the syncing port.
func GetSyncingPort(nodePort string) string {
if port, err := strconv.Atoi(nodePort); err == nil {
return fmt.Sprintf("%d", port-SyncingPortDifference)
}
return ""
}

@ -2,8 +2,6 @@ package node
import ( import (
"bytes" "bytes"
"fmt"
"strconv"
"sync" "sync"
"time" "time"
@ -27,14 +25,6 @@ const (
SyncFrequency = 10 // unit in second SyncFrequency = 10 // unit in second
) )
// GetSyncingPort returns the syncing port.
func GetSyncingPort(nodePort string) string {
if port, err := strconv.Atoi(nodePort); err == nil {
return fmt.Sprintf("%d", port-syncing.SyncingPortDifference)
}
return ""
}
// getNeighborPeers is a helper function to return list of peers // getNeighborPeers is a helper function to return list of peers
// based on different neightbor map // based on different neightbor map
func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer { func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer {
@ -42,7 +32,7 @@ func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer {
neighbor.Range(func(k, v interface{}) bool { neighbor.Range(func(k, v interface{}) bool {
p := v.(p2p.Peer) p := v.(p2p.Peer)
t := p.Port t := p.Port
p.Port = GetSyncingPort(t) p.Port = syncing.GetSyncingPort(t)
tmp = append(tmp, p) tmp = append(tmp, p)
return true return true
}) })
@ -127,7 +117,7 @@ func (node *Node) InitSyncingServer() {
// StartSyncingServer starts syncing server. // StartSyncingServer starts syncing server.
func (node *Node) StartSyncingServer() { func (node *Node) StartSyncingServer() {
utils.GetLogInstance().Info("support_sycning: StartSyncingServer") utils.GetLogInstance().Info("support_sycning: StartSyncingServer")
node.downloaderServer.Start(node.SelfPeer.IP, GetSyncingPort(node.SelfPeer.Port)) node.downloaderServer.Start(node.SelfPeer.IP, syncing.GetSyncingPort(node.SelfPeer.Port))
} }
// SendNewBlockToUnsync send latest verified block to unsync, registered nodes // SendNewBlockToUnsync send latest verified block to unsync, registered nodes
@ -231,7 +221,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*
return response, nil return response, nil
} else { } else {
response.Type = downloader_pb.DownloaderResponse_FAIL response.Type = downloader_pb.DownloaderResponse_FAIL
syncPort := GetSyncingPort(port) syncPort := syncing.GetSyncingPort(port)
client := downloader.ClientSetup(ip, syncPort) client := downloader.ClientSetup(ip, syncPort)
if client == nil { if client == nil {
utils.GetLogInstance().Warn("[SYNC] unable to setup client for peerID", "ip", ip, "port", port) utils.GetLogInstance().Warn("[SYNC] unable to setup client for peerID", "ip", ip, "port", port)

Loading…
Cancel
Save