You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
658 lines
20 KiB
658 lines
20 KiB
package syncing
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"reflect"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Workiva/go-datastructures/queue"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
"github.com/harmony-one/harmony/api/service/syncing/downloader"
|
|
pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto"
|
|
"github.com/harmony-one/harmony/core"
|
|
"github.com/harmony-one/harmony/core/types"
|
|
"github.com/harmony-one/harmony/internal/ctxerror"
|
|
"github.com/harmony-one/harmony/internal/utils"
|
|
"github.com/harmony-one/harmony/node/worker"
|
|
"github.com/harmony-one/harmony/p2p"
|
|
)
|
|
|
|
// Constants for syncing.
const (
	// ConsensusRatio is the minimum fraction of peers that must report the
	// same block hashes before those hashes are accepted.
	ConsensusRatio float64 = 0.66
	// SleepTimeAfterNonConsensusBlockHashes is how long to wait before asking
	// the peers for block hashes again after a round without consensus.
	SleepTimeAfterNonConsensusBlockHashes = 30 * time.Second
	// TimesToFail is the retry limit used while fetching hashes and blocks.
	TimesToFail = 5 // Downloadblocks service retry limit
	// RegistrationNumber caps how many peers this node registers with.
	RegistrationNumber = 3
	// SyncingPortDifference is subtracted from a node port to obtain the
	// corresponding syncing port.
	SyncingPortDifference = 3000
	// inSyncThreshold: when peerBlockHeight - myBlockHeight <= inSyncThreshold,
	// the node is ready to join consensus.
	inSyncThreshold = 0
)
|
|
|
|
// SyncPeerConfig is peer config to sync.
|
|
type SyncPeerConfig struct {
|
|
ip string
|
|
port string
|
|
peerHash []byte
|
|
client *downloader.Client
|
|
blockHashes [][]byte // block hashes before node doing sync
|
|
newBlocks []*types.Block // blocks after node doing sync
|
|
mux sync.Mutex
|
|
}
|
|
|
|
// GetClient returns client pointer of downloader.Client
|
|
func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client {
|
|
return peerConfig.client
|
|
}
|
|
|
|
// SyncBlockTask is the task struct to sync a specific block.
type SyncBlockTask struct {
	// index is the position of blockHash in the peer's block hash list.
	index int
	// blockHash identifies the block to download.
	blockHash []byte
}
|
|
|
|
// SyncConfig contains an array of SyncPeerConfig.
|
|
type SyncConfig struct {
|
|
// mtx locks peers, and *SyncPeerConfig pointers in peers.
|
|
// SyncPeerConfig itself is guarded by its own mutex.
|
|
mtx sync.RWMutex
|
|
|
|
peers []*SyncPeerConfig
|
|
}
|
|
|
|
// AddPeer adds the given sync peer.
|
|
func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
|
|
sc.mtx.Lock()
|
|
defer sc.mtx.Unlock()
|
|
sc.peers = append(sc.peers, peer)
|
|
}
|
|
|
|
// ForEachPeer calls the given function with each peer.
|
|
// It breaks the iteration iff the function returns true.
|
|
func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) {
|
|
sc.mtx.RLock()
|
|
defer sc.mtx.RUnlock()
|
|
for _, peer := range sc.peers {
|
|
if f(peer) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// CreateStateSync returns the implementation of StateSyncInterface interface.
|
|
func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync {
|
|
stateSync := &StateSync{}
|
|
stateSync.selfip = ip
|
|
stateSync.selfport = port
|
|
stateSync.selfPeerHash = peerHash
|
|
stateSync.commonBlocks = make(map[int]*types.Block)
|
|
stateSync.lastMileBlocks = []*types.Block{}
|
|
return stateSync
|
|
}
|
|
|
|
// StateSync is the struct that implements StateSyncInterface.
|
|
type StateSync struct {
|
|
selfip string
|
|
selfport string
|
|
selfPeerHash [20]byte // hash of ip and address combination
|
|
commonBlocks map[int]*types.Block
|
|
lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus
|
|
syncConfig *SyncConfig
|
|
stateSyncTaskQueue *queue.Queue
|
|
syncMux sync.Mutex
|
|
}
|
|
|
|
// AddLastMileBlock add the lastest a few block into queue for syncing
|
|
func (ss *StateSync) AddLastMileBlock(block *types.Block) {
|
|
ss.syncMux.Lock()
|
|
defer ss.syncMux.Unlock()
|
|
ss.lastMileBlocks = append(ss.lastMileBlocks, block)
|
|
}
|
|
|
|
// CloseConnections close grpc connections for state sync clients
|
|
func (sc *SyncConfig) CloseConnections() {
|
|
sc.mtx.RLock()
|
|
defer sc.mtx.RUnlock()
|
|
for _, pc := range sc.peers {
|
|
pc.client.Close()
|
|
}
|
|
}
|
|
|
|
// FindPeerByHash returns the peer with the given hash, or nil if not found.
|
|
func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig {
|
|
sc.mtx.RLock()
|
|
defer sc.mtx.RUnlock()
|
|
for _, pc := range sc.peers {
|
|
if bytes.Compare(pc.peerHash, peerHash) == 0 {
|
|
return pc
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AddNewBlock will add newly received block into state syncing queue
|
|
func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) {
|
|
pc := ss.syncConfig.FindPeerByHash(peerHash)
|
|
if pc == nil {
|
|
// Received a block with no active peer; just ignore.
|
|
return
|
|
}
|
|
// TODO ek – we shouldn't mess with SyncPeerConfig's mutex.
|
|
// Factor this into a method, like pc.AddNewBlock(block)
|
|
pc.mux.Lock()
|
|
defer pc.mux.Unlock()
|
|
pc.newBlocks = append(pc.newBlocks, block)
|
|
utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(pc.newBlocks), "blockHeight", block.NumberU64())
|
|
}
|
|
|
|
// CreateTestSyncPeerConfig used for testing.
|
|
func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig {
|
|
return &SyncPeerConfig{
|
|
client: client,
|
|
blockHashes: blockHashes,
|
|
}
|
|
}
|
|
|
|
// CompareSyncPeerConfigByblockHashes compares two SyncPeerConfig by blockHashes.
|
|
func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int {
|
|
if len(a.blockHashes) != len(b.blockHashes) {
|
|
if len(a.blockHashes) < len(b.blockHashes) {
|
|
return -1
|
|
}
|
|
return 1
|
|
}
|
|
for id := range a.blockHashes {
|
|
if !reflect.DeepEqual(a.blockHashes[id], b.blockHashes[id]) {
|
|
return bytes.Compare(a.blockHashes[id], b.blockHashes[id])
|
|
}
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// GetBlocks gets blocks by calling grpc request to the corresponding peer.
|
|
func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
|
|
response := peerConfig.client.GetBlocks(hashes)
|
|
if response == nil {
|
|
return nil, ErrGetBlock
|
|
}
|
|
return response.Payload, nil
|
|
}
|
|
|
|
// CreateSyncConfig creates SyncConfig for StateSync object.
|
|
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, isBeacon bool) error {
|
|
utils.GetLogInstance().Debug("CreateSyncConfig: len of peers", "len", len(peers), "isBeacon", isBeacon)
|
|
if len(peers) == 0 {
|
|
return ctxerror.New("[SYNC] no peers to connect to")
|
|
}
|
|
ss.syncConfig = &SyncConfig{}
|
|
var wg sync.WaitGroup
|
|
for _, peer := range peers {
|
|
wg.Add(1)
|
|
go func(peer p2p.Peer) {
|
|
defer wg.Done()
|
|
client := downloader.ClientSetup(peer.IP, peer.Port)
|
|
if client == nil {
|
|
return
|
|
}
|
|
peerConfig := &SyncPeerConfig{
|
|
ip: peer.IP,
|
|
port: peer.Port,
|
|
client: client,
|
|
}
|
|
ss.syncConfig.AddPeer(peerConfig)
|
|
}(peer)
|
|
}
|
|
wg.Wait()
|
|
utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.", "len", len(ss.syncConfig.peers), "isBeacon", isBeacon)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetActivePeerNumber returns the number of active peers
|
|
func (ss *StateSync) GetActivePeerNumber() int {
|
|
if ss.syncConfig == nil {
|
|
return 0
|
|
}
|
|
// len() is atomic; no need to hold mutex.
|
|
return len(ss.syncConfig.peers)
|
|
}
|
|
|
|
// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
|
|
// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
|
|
// Caller shall ensure mtx is locked for reading.
|
|
func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
|
|
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
|
|
curCount := 0
|
|
curFirstID := -1
|
|
maxCount := 0
|
|
maxFirstID := -1
|
|
for i := range sc.peers {
|
|
if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
|
|
curCount = 1
|
|
curFirstID = i
|
|
} else {
|
|
curCount++
|
|
}
|
|
if curCount > maxCount {
|
|
maxCount = curCount
|
|
maxFirstID = curFirstID
|
|
}
|
|
}
|
|
return maxFirstID, maxCount
|
|
}
|
|
|
|
// InitForTesting used for testing.
|
|
func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
|
|
sc.mtx.RLock()
|
|
defer sc.mtx.RUnlock()
|
|
for i := range sc.peers {
|
|
sc.peers[i].blockHashes = blockHashes
|
|
sc.peers[i].client = client
|
|
}
|
|
}
|
|
|
|
// cleanUpPeers cleans up all peers whose blockHashes are not equal to
|
|
// consensus block hashes. Caller shall ensure mtx is locked for RW.
|
|
func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
|
|
fixedPeer := sc.peers[maxFirstID]
|
|
for i := 0; i < len(sc.peers); i++ {
|
|
if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 {
|
|
// TODO: move it into a util delete func.
|
|
// See tip https://github.com/golang/go/wiki/SliceTricks
|
|
// Close the client and remove the peer out of the
|
|
sc.peers[i].client.Close()
|
|
copy(sc.peers[i:], sc.peers[i+1:])
|
|
sc.peers[len(sc.peers)-1] = nil
|
|
sc.peers = sc.peers[:len(sc.peers)-1]
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal.
|
|
func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() bool {
|
|
sc.mtx.Lock()
|
|
defer sc.mtx.Unlock()
|
|
// Sort all peers by the blockHashes.
|
|
sort.Slice(sc.peers, func(i, j int) bool {
|
|
return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
|
|
})
|
|
maxFirstID, maxCount := sc.getHowManyMaxConsensus()
|
|
utils.GetLogInstance().Info("[SYNC] block consensus hashes", "maxFirstID", maxFirstID, "maxCount", maxCount)
|
|
if float64(maxCount) >= ConsensusRatio*float64(len(sc.peers)) {
|
|
sc.cleanUpPeers(maxFirstID)
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// GetConsensusHashes gets all hashes needed to download.
|
|
func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
|
|
count := 0
|
|
for {
|
|
var wg sync.WaitGroup
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
response := peerConfig.client.GetBlockHashes(startHash)
|
|
if response == nil {
|
|
return
|
|
}
|
|
peerConfig.blockHashes = response.Payload
|
|
}()
|
|
return
|
|
})
|
|
wg.Wait()
|
|
if ss.syncConfig.GetBlockHashesConsensusAndCleanUp() {
|
|
break
|
|
}
|
|
if count > TimesToFail {
|
|
utils.GetLogInstance().Info("GetConsensusHashes: reached retry limit")
|
|
return false
|
|
}
|
|
count++
|
|
time.Sleep(SleepTimeAfterNonConsensusBlockHashes)
|
|
}
|
|
utils.GetLogInstance().Info("syncing: Finished getting consensus block hashes.")
|
|
return true
|
|
}
|
|
|
|
func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
|
|
ss.stateSyncTaskQueue = queue.New(0)
|
|
ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
|
|
for id, blockHash := range configPeer.blockHashes {
|
|
if err := ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}); err != nil {
|
|
ctxerror.Warn(utils.GetLogger(), err, "cannot add task",
|
|
"taskIndex", id, "taskBlock", hex.EncodeToString(blockHash))
|
|
}
|
|
}
|
|
brk = true
|
|
return
|
|
})
|
|
utils.GetLogInstance().Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len())
|
|
}
|
|
|
|
// downloadBlocks downloads blocks from state sync task queue.
|
|
func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
|
|
// Initialize blockchain
|
|
var wg sync.WaitGroup
|
|
count := 0
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
|
|
wg.Add(1)
|
|
go func(stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
|
|
defer wg.Done()
|
|
for !stateSyncTaskQueue.Empty() {
|
|
task, err := ss.stateSyncTaskQueue.Poll(1, time.Millisecond)
|
|
if err == queue.ErrTimeout || len(task) == 0 {
|
|
utils.GetLogInstance().Debug("[SYNC] ss.stateSyncTaskQueue poll timeout", "error", err)
|
|
break
|
|
}
|
|
syncTask := task[0].(SyncBlockTask)
|
|
//id := syncTask.index
|
|
payload, err := peerConfig.GetBlocks([][]byte{syncTask.blockHash})
|
|
if err != nil || len(payload) == 0 {
|
|
count++
|
|
utils.GetLogInstance().Debug("[SYNC] GetBlocks failed", "failNumber", count)
|
|
if count > TimesToFail {
|
|
break
|
|
}
|
|
if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
|
|
ctxerror.Warn(utils.GetLogger(), err, "cannot add task",
|
|
"taskIndex", syncTask.index,
|
|
"taskBlock", hex.EncodeToString(syncTask.blockHash))
|
|
}
|
|
continue
|
|
}
|
|
|
|
var blockObj types.Block
|
|
// currently only send one block a time
|
|
err = rlp.DecodeBytes(payload[0], &blockObj)
|
|
|
|
if err != nil {
|
|
count++
|
|
utils.GetLogInstance().Debug("[SYNC] downloadBlocks: failed to DecodeBytes from received new block")
|
|
if count > TimesToFail {
|
|
break
|
|
}
|
|
if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
|
|
ctxerror.Warn(utils.GetLogger(), err, "cannot add task",
|
|
"taskIndex", syncTask.index,
|
|
"taskBlock", hex.EncodeToString(syncTask.blockHash))
|
|
}
|
|
continue
|
|
}
|
|
ss.syncMux.Lock()
|
|
ss.commonBlocks[syncTask.index] = &blockObj
|
|
ss.syncMux.Unlock()
|
|
}
|
|
}(ss.stateSyncTaskQueue, bc)
|
|
return
|
|
})
|
|
wg.Wait()
|
|
utils.GetLogInstance().Info("[SYNC] Finished downloadBlocks.")
|
|
}
|
|
|
|
// CompareBlockByHash compares two block by hash, it will be used in sort the blocks
|
|
func CompareBlockByHash(a *types.Block, b *types.Block) int {
|
|
ha := a.Hash()
|
|
hb := b.Hash()
|
|
return bytes.Compare(ha[:], hb[:])
|
|
}
|
|
|
|
// GetHowManyMaxConsensus will get the most common blocks and the first such blockID
|
|
func GetHowManyMaxConsensus(blocks []*types.Block) (int, int) {
|
|
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
|
|
curCount := 0
|
|
curFirstID := -1
|
|
maxCount := 0
|
|
maxFirstID := -1
|
|
for i := range blocks {
|
|
if curFirstID == -1 || CompareBlockByHash(blocks[curFirstID], blocks[i]) != 0 {
|
|
curCount = 1
|
|
curFirstID = i
|
|
} else {
|
|
curCount++
|
|
}
|
|
if curCount > maxCount {
|
|
maxCount = curCount
|
|
maxFirstID = curFirstID
|
|
}
|
|
}
|
|
return maxFirstID, maxCount
|
|
}
|
|
|
|
func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) *types.Block {
|
|
candidateBlocks := []*types.Block{}
|
|
ss.syncMux.Lock()
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
|
|
for _, block := range peerConfig.newBlocks {
|
|
ph := block.ParentHash()
|
|
if bytes.Compare(ph[:], parentHash[:]) == 0 {
|
|
candidateBlocks = append(candidateBlocks, block)
|
|
break
|
|
}
|
|
}
|
|
return
|
|
})
|
|
ss.syncMux.Unlock()
|
|
if len(candidateBlocks) == 0 {
|
|
return nil
|
|
}
|
|
// Sort by blockHashes.
|
|
sort.Slice(candidateBlocks, func(i, j int) bool {
|
|
return CompareBlockByHash(candidateBlocks[i], candidateBlocks[j]) == -1
|
|
})
|
|
maxFirstID, maxCount := GetHowManyMaxConsensus(candidateBlocks)
|
|
utils.GetLogInstance().Debug("[SYNC] Find block with matching parenthash", "parentHash", parentHash, "hash", candidateBlocks[maxFirstID].Hash(), "maxCount", maxCount)
|
|
return candidateBlocks[maxFirstID]
|
|
}
|
|
|
|
func (ss *StateSync) getBlockFromOldBlocksByParentHash(parentHash common.Hash) *types.Block {
|
|
for _, block := range ss.commonBlocks {
|
|
ph := block.ParentHash()
|
|
if bytes.Compare(ph[:], parentHash[:]) == 0 {
|
|
return block
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Hash) *types.Block {
|
|
for _, block := range ss.lastMileBlocks {
|
|
ph := block.ParentHash()
|
|
if bytes.Compare(ph[:], parentHash[:]) == 0 {
|
|
return block
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChain, worker *worker.Worker) bool {
|
|
utils.GetLogInstance().Info("[SYNC] Current Block", "blockHex", bc.CurrentBlock().Hash().Hex())
|
|
_, err := bc.InsertChain([]*types.Block{block})
|
|
if err != nil {
|
|
utils.GetLogInstance().Debug("Error adding new block to blockchain", "Error", err)
|
|
return false
|
|
}
|
|
ss.syncMux.Lock()
|
|
if err := worker.UpdateCurrent(); err != nil {
|
|
ctxerror.Warn(utils.GetLogger(), err, "(*Worker).UpdateCurrent failed")
|
|
}
|
|
ss.syncMux.Unlock()
|
|
utils.GetLogInstance().Info("[SYNC] new block added to blockchain", "blockHeight", bc.CurrentBlock().NumberU64(), "blockHex", bc.CurrentBlock().Hash().Hex())
|
|
return true
|
|
}
|
|
|
|
// generateNewState will construct most recent state from downloaded blocks
|
|
func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker) {
|
|
// update blocks created before node start sync
|
|
parentHash := bc.CurrentBlock().Hash()
|
|
for {
|
|
block := ss.getBlockFromOldBlocksByParentHash(parentHash)
|
|
if block == nil {
|
|
break
|
|
}
|
|
ok := ss.updateBlockAndStatus(block, bc, worker)
|
|
if !ok {
|
|
break
|
|
}
|
|
parentHash = block.Hash()
|
|
}
|
|
ss.syncMux.Lock()
|
|
ss.commonBlocks = make(map[int]*types.Block)
|
|
ss.syncMux.Unlock()
|
|
|
|
// update blocks after node start sync
|
|
parentHash = bc.CurrentBlock().Hash()
|
|
for {
|
|
block := ss.getMaxConsensusBlockFromParentHash(parentHash)
|
|
if block == nil {
|
|
break
|
|
}
|
|
ok := ss.updateBlockAndStatus(block, bc, worker)
|
|
if !ok {
|
|
break
|
|
}
|
|
parentHash = block.Hash()
|
|
}
|
|
// TODO ek – Do we need to hold syncMux now that syncConfig has its onw
|
|
// mutex?
|
|
ss.syncMux.Lock()
|
|
ss.syncConfig.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) {
|
|
peer.newBlocks = []*types.Block{}
|
|
return
|
|
})
|
|
ss.syncMux.Unlock()
|
|
|
|
// update last mile blocks if any
|
|
parentHash = bc.CurrentBlock().Hash()
|
|
for {
|
|
block := ss.getBlockFromLastMileBlocksByParentHash(parentHash)
|
|
if block == nil {
|
|
break
|
|
}
|
|
ok := ss.updateBlockAndStatus(block, bc, worker)
|
|
if !ok {
|
|
break
|
|
}
|
|
parentHash = block.Hash()
|
|
}
|
|
}
|
|
|
|
// ProcessStateSync processes state sync from the blocks received but not yet processed so far
|
|
func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker, isBeacon bool) {
|
|
if !isBeacon {
|
|
ss.RegisterNodeInfo()
|
|
}
|
|
// Gets consensus hashes.
|
|
if !ss.GetConsensusHashes(startHash) {
|
|
utils.GetLogInstance().Debug("[SYNC] ProcessStateSync unable to reach consensus on ss.GetConsensusHashes")
|
|
return
|
|
}
|
|
ss.generateStateSyncTaskQueue(bc)
|
|
// Download blocks.
|
|
if ss.stateSyncTaskQueue.Len() > 0 {
|
|
ss.downloadBlocks(bc)
|
|
}
|
|
ss.generateNewState(bc, worker)
|
|
}
|
|
|
|
func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error {
|
|
response := peerConfig.client.Register(peerHash, ip, port)
|
|
if response == nil || response.Type == pb.DownloaderResponse_FAIL {
|
|
return ErrRegistrationFail
|
|
} else if response.Type == pb.DownloaderResponse_SUCCESS {
|
|
return nil
|
|
}
|
|
return ErrRegistrationFail
|
|
}
|
|
|
|
// RegisterNodeInfo will register node to peers to accept future new block broadcasting
|
|
// return number of successful registration
|
|
func (ss *StateSync) RegisterNodeInfo() int {
|
|
registrationNumber := RegistrationNumber
|
|
utils.GetLogInstance().Debug("[SYNC] node registration to peers",
|
|
"registrationNumber", registrationNumber,
|
|
"activePeerNumber", len(ss.syncConfig.peers))
|
|
|
|
count := 0
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
|
|
if count >= registrationNumber {
|
|
brk = true
|
|
return
|
|
}
|
|
if peerConfig.ip == ss.selfip && peerConfig.port == GetSyncingPort(ss.selfport) {
|
|
utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport))
|
|
return
|
|
}
|
|
err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport)
|
|
if err != nil {
|
|
utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash)
|
|
return
|
|
}
|
|
utils.GetLogInstance().Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port)
|
|
count++
|
|
return
|
|
})
|
|
return count
|
|
}
|
|
|
|
// getMaxPeerHeight gets the maximum blockchain heights from peers
|
|
func (ss *StateSync) getMaxPeerHeight() uint64 {
|
|
maxHeight := uint64(0)
|
|
var wg sync.WaitGroup
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
response := peerConfig.client.GetBlockChainHeight()
|
|
ss.syncMux.Lock()
|
|
if response != nil && maxHeight < response.BlockHeight {
|
|
maxHeight = response.BlockHeight
|
|
}
|
|
ss.syncMux.Unlock()
|
|
}()
|
|
return
|
|
})
|
|
wg.Wait()
|
|
return maxHeight
|
|
}
|
|
|
|
// IsSameBlockchainHeight checks whether the node is out of sync from other peers
|
|
func (ss *StateSync) IsSameBlockchainHeight(bc *core.BlockChain) (uint64, bool) {
|
|
otherHeight := ss.getMaxPeerHeight()
|
|
currentHeight := bc.CurrentBlock().NumberU64()
|
|
return otherHeight, currentHeight == otherHeight
|
|
}
|
|
|
|
// IsOutOfSync checks whether the node is out of sync from other peers
|
|
func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool {
|
|
otherHeight := ss.getMaxPeerHeight()
|
|
currentHeight := bc.CurrentBlock().NumberU64()
|
|
utils.GetLogInstance().Debug("[SYNC] IsOutOfSync", "otherHeight", otherHeight, "myHeight", currentHeight)
|
|
return currentHeight+inSyncThreshold < otherHeight
|
|
}
|
|
|
|
// SyncLoop will keep syncing with peers until catches up
|
|
func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool, isBeacon bool) {
|
|
for {
|
|
if !ss.IsOutOfSync(bc) {
|
|
utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!")
|
|
return
|
|
}
|
|
startHash := bc.CurrentBlock().Hash()
|
|
ss.ProcessStateSync(startHash[:], bc, worker, isBeacon)
|
|
}
|
|
}
|
|
|
|
// GetSyncingPort returns the syncing port.
|
|
func GetSyncingPort(nodePort string) string {
|
|
if port, err := strconv.Atoi(nodePort); err == nil {
|
|
return fmt.Sprintf("%d", port-SyncingPortDifference)
|
|
}
|
|
return ""
|
|
}
|
|
|