You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1325 lines
39 KiB
1325 lines
39 KiB
package legacysync
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"math/rand"
|
|
"reflect"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Workiva/go-datastructures/queue"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
"github.com/harmony-one/harmony/api/service/legacysync/downloader"
|
|
pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto"
|
|
"github.com/harmony-one/harmony/consensus"
|
|
"github.com/harmony-one/harmony/consensus/engine"
|
|
"github.com/harmony-one/harmony/core"
|
|
"github.com/harmony-one/harmony/core/types"
|
|
"github.com/harmony-one/harmony/internal/chain"
|
|
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
|
|
"github.com/harmony-one/harmony/internal/utils"
|
|
"github.com/harmony-one/harmony/node/worker"
|
|
"github.com/harmony-one/harmony/p2p"
|
|
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// Constants for syncing.
|
|
const (
|
|
downloadBlocksRetryLimit = 10 // downloadBlocks service retry limit
|
|
RegistrationNumber = 3
|
|
SyncingPortDifference = 3000
|
|
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
|
|
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
|
|
verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now)
|
|
LastMileBlocksSize = 50
|
|
|
|
// after cutting off a number of connected peers, the result number of peers
|
|
// shall be between numPeersLowBound and numPeersHighBound
|
|
NumPeersLowBound = 3
|
|
numPeersHighBound = 5
|
|
|
|
downloadTaskBatch = 5
|
|
|
|
//LoopMinTime sync loop must take at least as this value, otherwise it waits for it
|
|
LoopMinTime = 0
|
|
)
|
|
|
|
// SyncPeerConfig is peer config to sync.
|
|
type SyncPeerConfig struct {
|
|
peer p2p.Peer
|
|
peerHash []byte
|
|
client *downloader.Client
|
|
blockHashes [][]byte // block hashes before node doing sync
|
|
newBlocks []*types.Block // blocks after node doing sync
|
|
mux sync.Mutex
|
|
}
|
|
|
|
// GetClient returns client pointer of downloader.Client
|
|
func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client {
|
|
return peerConfig.client
|
|
}
|
|
|
|
// IsEqual checks the equality between two sync peers
|
|
func (peerConfig *SyncPeerConfig) IsEqual(pc2 *SyncPeerConfig) bool {
|
|
return peerConfig.peer.IP == pc2.peer.IP && peerConfig.peer.Port == pc2.peer.Port
|
|
}
|
|
|
|
// SyncBlockTask is the task struct to sync a specific block.
|
|
type SyncBlockTask struct {
|
|
index int
|
|
blockHash []byte
|
|
}
|
|
|
|
type syncBlockTasks []SyncBlockTask
|
|
|
|
func (tasks syncBlockTasks) blockHashes() [][]byte {
|
|
hashes := make([][]byte, 0, len(tasks))
|
|
for _, task := range tasks {
|
|
hash := make([]byte, len(task.blockHash))
|
|
copy(hash, task.blockHash)
|
|
hashes = append(hashes, task.blockHash)
|
|
}
|
|
return hashes
|
|
}
|
|
|
|
func (tasks syncBlockTasks) blockHashesStr() []string {
|
|
hashes := make([]string, 0, len(tasks))
|
|
for _, task := range tasks {
|
|
hash := hex.EncodeToString(task.blockHash)
|
|
hashes = append(hashes, hash)
|
|
}
|
|
return hashes
|
|
}
|
|
|
|
func (tasks syncBlockTasks) indexes() []int {
|
|
indexes := make([]int, 0, len(tasks))
|
|
for _, task := range tasks {
|
|
indexes = append(indexes, task.index)
|
|
}
|
|
return indexes
|
|
}
|
|
|
|
// SyncConfig contains an array of SyncPeerConfig.
|
|
type SyncConfig struct {
|
|
// mtx locks peers, and *SyncPeerConfig pointers in peers.
|
|
// SyncPeerConfig itself is guarded by its own mutex.
|
|
mtx sync.RWMutex
|
|
|
|
peers []*SyncPeerConfig
|
|
shardID uint32
|
|
selfPeerID libp2p_peer.ID
|
|
}
|
|
|
|
func NewSyncConfig(shardID uint32, selfPeerID libp2p_peer.ID, peers []*SyncPeerConfig) *SyncConfig {
|
|
return &SyncConfig{
|
|
peers: peers,
|
|
shardID: shardID,
|
|
selfPeerID: selfPeerID,
|
|
}
|
|
}
|
|
|
|
func (sc *SyncConfig) ShardID() uint32 {
|
|
return sc.shardID
|
|
}
|
|
|
|
// AddPeer adds the given sync peer.
|
|
func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
|
|
sc.mtx.Lock()
|
|
defer sc.mtx.Unlock()
|
|
|
|
// Ensure no duplicate peers
|
|
for _, p2 := range sc.peers {
|
|
if peer.IsEqual(p2) {
|
|
return
|
|
}
|
|
if peer.peer.PeerID == sc.selfPeerID {
|
|
return
|
|
}
|
|
}
|
|
sc.peers = append(sc.peers, peer)
|
|
}
|
|
|
|
func (sc *SyncConfig) GetPeers() []*SyncPeerConfig {
|
|
sc.mtx.RLock()
|
|
defer sc.mtx.RUnlock()
|
|
out := make([]*SyncPeerConfig, len(sc.peers))
|
|
copy(out, sc.peers)
|
|
return out
|
|
}
|
|
|
|
// ForEachPeer calls the given function with each peer.
|
|
// It breaks the iteration iff the function returns true.
|
|
func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) {
|
|
sc.mtx.RLock()
|
|
peers := make([]*SyncPeerConfig, len(sc.peers))
|
|
copy(peers, sc.peers)
|
|
sc.mtx.RUnlock()
|
|
|
|
for _, peer := range peers {
|
|
if f(peer) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sc *SyncConfig) PeersCount() int {
|
|
if sc == nil {
|
|
return 0
|
|
}
|
|
sc.mtx.RLock()
|
|
defer sc.mtx.RUnlock()
|
|
return len(sc.peers)
|
|
}
|
|
|
|
// RemovePeer removes a peer from SyncConfig
|
|
func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) {
|
|
sc.mtx.Lock()
|
|
defer sc.mtx.Unlock()
|
|
|
|
closeReason := fmt.Sprintf("remove peer (reason: %s)", reason)
|
|
peer.client.Close(closeReason)
|
|
for i, p := range sc.peers {
|
|
if p == peer {
|
|
sc.peers = append(sc.peers[:i], sc.peers[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
utils.Logger().Info().
|
|
Str("peerIP", peer.peer.IP).
|
|
Str("peerPortMsg", peer.peer.Port).
|
|
Str("reason", reason).
|
|
Msg("[SYNC] remove GRPC peer")
|
|
}
|
|
|
|
// CreateStateSync returns the implementation of StateSyncInterface interface.
|
|
func CreateStateSync(bc blockChain, ip string, port string, peerHash [20]byte, peerID libp2p_peer.ID, isExplorer bool, role nodeconfig.Role) *StateSync {
|
|
stateSync := &StateSync{}
|
|
stateSync.blockChain = bc
|
|
stateSync.selfip = ip
|
|
stateSync.selfport = port
|
|
stateSync.selfPeerHash = peerHash
|
|
stateSync.commonBlocks = make(map[int]*types.Block)
|
|
stateSync.lastMileBlocks = []*types.Block{}
|
|
stateSync.isExplorer = isExplorer
|
|
stateSync.syncConfig = NewSyncConfig(bc.ShardID(), peerID, nil)
|
|
|
|
stateSync.syncStatus = newSyncStatus(role)
|
|
return stateSync
|
|
}
|
|
|
|
// Small subset from Blockchain struct.
|
|
type blockChain interface {
|
|
CurrentBlock() *types.Block
|
|
ShardID() uint32
|
|
}
|
|
|
|
// StateSync is the struct that implements StateSyncInterface.
|
|
type StateSync struct {
|
|
blockChain blockChain
|
|
selfip string
|
|
selfport string
|
|
selfPeerHash [20]byte // hash of ip and address combination
|
|
commonBlocks map[int]*types.Block
|
|
lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus
|
|
syncConfig *SyncConfig
|
|
isExplorer bool
|
|
stateSyncTaskQueue *queue.Queue
|
|
syncMux sync.Mutex
|
|
lastMileMux sync.Mutex
|
|
|
|
syncStatus syncStatus
|
|
}
|
|
|
|
func (ss *StateSync) IntoEpochSync() *EpochSync {
|
|
return &EpochSync{
|
|
beaconChain: ss.blockChain,
|
|
selfip: ss.selfip,
|
|
selfport: ss.selfport,
|
|
selfPeerHash: ss.selfPeerHash,
|
|
commonBlocks: ss.commonBlocks,
|
|
lastMileBlocks: ss.lastMileBlocks,
|
|
syncConfig: ss.syncConfig,
|
|
isExplorer: ss.isExplorer,
|
|
stateSyncTaskQueue: ss.stateSyncTaskQueue,
|
|
syncMux: sync.Mutex{},
|
|
lastMileMux: sync.Mutex{},
|
|
syncStatus: ss.syncStatus.Clone(),
|
|
}
|
|
}
|
|
|
|
func (ss *StateSync) purgeAllBlocksFromCache() {
|
|
ss.lastMileMux.Lock()
|
|
ss.lastMileBlocks = nil
|
|
ss.lastMileMux.Unlock()
|
|
|
|
ss.syncMux.Lock()
|
|
defer ss.syncMux.Unlock()
|
|
ss.commonBlocks = make(map[int]*types.Block)
|
|
|
|
ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
|
|
configPeer.blockHashes = nil
|
|
configPeer.newBlocks = nil
|
|
return
|
|
})
|
|
}
|
|
|
|
func (ss *StateSync) purgeOldBlocksFromCache() {
|
|
ss.syncMux.Lock()
|
|
defer ss.syncMux.Unlock()
|
|
ss.commonBlocks = make(map[int]*types.Block)
|
|
ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
|
|
configPeer.blockHashes = nil
|
|
return
|
|
})
|
|
}
|
|
|
|
// AddLastMileBlock add the latest a few block into queue for syncing
|
|
// only keep the latest blocks with size capped by LastMileBlocksSize
|
|
func (ss *StateSync) AddLastMileBlock(block *types.Block) {
|
|
ss.lastMileMux.Lock()
|
|
defer ss.lastMileMux.Unlock()
|
|
if ss.lastMileBlocks != nil {
|
|
if len(ss.lastMileBlocks) >= LastMileBlocksSize {
|
|
ss.lastMileBlocks = ss.lastMileBlocks[1:]
|
|
}
|
|
ss.lastMileBlocks = append(ss.lastMileBlocks, block)
|
|
}
|
|
}
|
|
|
|
// CloseConnections close grpc connections for state sync clients
|
|
func (sc *SyncConfig) CloseConnections() {
|
|
sc.mtx.RLock()
|
|
defer sc.mtx.RUnlock()
|
|
for _, pc := range sc.peers {
|
|
pc.client.Close("close all connections")
|
|
}
|
|
}
|
|
|
|
// FindPeerByHash returns the peer with the given hash, or nil if not found.
|
|
func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig {
|
|
sc.mtx.RLock()
|
|
defer sc.mtx.RUnlock()
|
|
for _, pc := range sc.peers {
|
|
if bytes.Equal(pc.peerHash, peerHash) {
|
|
return pc
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AddNewBlock will add newly received block into state syncing queue
|
|
func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) {
|
|
pc := ss.syncConfig.FindPeerByHash(peerHash)
|
|
if pc == nil {
|
|
// Received a block with no active peer; just ignore.
|
|
return
|
|
}
|
|
// TODO ek – we shouldn't mess with SyncPeerConfig's mutex.
|
|
// Factor this into a method, like pc.AddNewBlock(block)
|
|
pc.mux.Lock()
|
|
defer pc.mux.Unlock()
|
|
pc.newBlocks = append(pc.newBlocks, block)
|
|
utils.Logger().Debug().
|
|
Int("total", len(pc.newBlocks)).
|
|
Uint64("blockHeight", block.NumberU64()).
|
|
Msg("[SYNC] new block received")
|
|
}
|
|
|
|
// CreateTestSyncPeerConfig used for testing.
|
|
func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig {
|
|
return &SyncPeerConfig{
|
|
client: client,
|
|
blockHashes: blockHashes,
|
|
}
|
|
}
|
|
|
|
// CompareSyncPeerConfigByblockHashes compares two SyncPeerConfig by blockHashes.
|
|
func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int {
|
|
if len(a.blockHashes) != len(b.blockHashes) {
|
|
if len(a.blockHashes) < len(b.blockHashes) {
|
|
return -1
|
|
}
|
|
return 1
|
|
}
|
|
for id := range a.blockHashes {
|
|
if !reflect.DeepEqual(a.blockHashes[id], b.blockHashes[id]) {
|
|
return bytes.Compare(a.blockHashes[id], b.blockHashes[id])
|
|
}
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// BlockWithSig the serialization structure for request DownloaderRequest_BLOCKWITHSIG
|
|
// The block is encoded as block + commit signature
|
|
type BlockWithSig struct {
|
|
Block *types.Block
|
|
CommitSigAndBitmap []byte
|
|
}
|
|
|
|
// GetBlocks gets blocks by calling grpc request to the corresponding peer.
|
|
func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
|
|
response := peerConfig.client.GetBlocksAndSigs(hashes)
|
|
if response == nil {
|
|
return nil, ErrGetBlock
|
|
}
|
|
return response.Payload, nil
|
|
}
|
|
|
|
// CreateSyncConfig creates SyncConfig for StateSync object.
|
|
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error {
|
|
var err error
|
|
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, selfPeerID, waitForEachPeerToConnect)
|
|
return err
|
|
}
|
|
|
|
// checkPeersDuplicity checks whether there are duplicates in p2p.Peer
|
|
func checkPeersDuplicity(ps []p2p.Peer) error {
|
|
type peerDupID struct {
|
|
ip string
|
|
port string
|
|
}
|
|
m := make(map[peerDupID]struct{})
|
|
for _, p := range ps {
|
|
dip := peerDupID{p.IP, p.Port}
|
|
if _, ok := m[dip]; ok {
|
|
return fmt.Errorf("duplicate peer [%v:%v]", p.IP, p.Port)
|
|
}
|
|
m[dip] = struct{}{}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// limitNumPeers limits number of peers to release some server end sources.
|
|
func limitNumPeers(ps []p2p.Peer, randSeed int64) (int, []p2p.Peer) {
|
|
targetSize := calcNumPeersWithBound(len(ps), NumPeersLowBound, numPeersHighBound)
|
|
if len(ps) <= targetSize {
|
|
return len(ps), ps
|
|
}
|
|
|
|
r := rand.New(rand.NewSource(randSeed))
|
|
r.Shuffle(len(ps), func(i, j int) { ps[i], ps[j] = ps[j], ps[i] })
|
|
|
|
return targetSize, ps
|
|
}
|
|
|
|
// Peers are expected to limited at half of the size, capped between lowBound and highBound.
|
|
func calcNumPeersWithBound(size int, lowBound, highBound int) int {
|
|
if size < lowBound {
|
|
return size
|
|
}
|
|
expLen := size / 2
|
|
if expLen < lowBound {
|
|
expLen = lowBound
|
|
}
|
|
if expLen > highBound {
|
|
expLen = highBound
|
|
}
|
|
return expLen
|
|
}
|
|
|
|
// GetActivePeerNumber returns the number of active peers
|
|
func (ss *StateSync) GetActivePeerNumber() int {
|
|
if ss.syncConfig == nil {
|
|
return 0
|
|
}
|
|
// len() is atomic; no need to hold mutex.
|
|
return len(ss.syncConfig.peers)
|
|
}
|
|
|
|
// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
|
|
// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
|
|
// Caller shall ensure mtx is locked for reading.
|
|
func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
|
|
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
|
|
curCount := 0
|
|
curFirstID := -1
|
|
maxCount := 0
|
|
maxFirstID := -1
|
|
for i := range sc.peers {
|
|
if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
|
|
curCount = 1
|
|
curFirstID = i
|
|
} else {
|
|
curCount++
|
|
}
|
|
if curCount >= maxCount {
|
|
maxCount = curCount
|
|
maxFirstID = curFirstID
|
|
}
|
|
}
|
|
return maxFirstID, maxCount
|
|
}
|
|
|
|
// InitForTesting used for testing.
|
|
func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
|
|
sc.mtx.RLock()
|
|
defer sc.mtx.RUnlock()
|
|
for i := range sc.peers {
|
|
sc.peers[i].blockHashes = blockHashes
|
|
sc.peers[i].client = client
|
|
}
|
|
}
|
|
|
|
// cleanUpPeers cleans up all peers whose blockHashes are not equal to
|
|
// consensus block hashes. Caller shall ensure mtx is locked for RW.
|
|
func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
|
|
fixedPeer := sc.peers[maxFirstID]
|
|
|
|
var removedPeers int
|
|
for i := 0; i < len(sc.peers); i++ {
|
|
if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 {
|
|
// TODO: move it into a util delete func.
|
|
// See tip https://github.com/golang/go/wiki/SliceTricks
|
|
// Close the client and remove the peer out of the
|
|
sc.peers[i].client.Close("cleanup peers")
|
|
copy(sc.peers[i:], sc.peers[i+1:])
|
|
sc.peers[len(sc.peers)-1] = nil
|
|
sc.peers = sc.peers[:len(sc.peers)-1]
|
|
removedPeers++
|
|
}
|
|
}
|
|
utils.Logger().Info().Int("removed peers", removedPeers).Msg("[SYNC] post cleanUpPeers")
|
|
}
|
|
|
|
// GetBlockHashesConsensusAndCleanUp selects the most common peer config based on their block hashes to download/sync.
|
|
// Note that choosing the most common peer config does not guarantee that the blocks to be downloaded are the correct ones.
|
|
// The subsequent node syncing steps of verifying the block header chain will give such confirmation later.
|
|
// If later block header verification fails with the sync peer config chosen here, the entire sync loop gets retried with a new peer set.
|
|
func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() error {
|
|
sc.mtx.Lock()
|
|
defer sc.mtx.Unlock()
|
|
// Sort all peers by the blockHashes.
|
|
sort.Slice(sc.peers, func(i, j int) bool {
|
|
return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
|
|
})
|
|
maxFirstID, maxCount := sc.getHowManyMaxConsensus()
|
|
|
|
if maxFirstID == -1 {
|
|
return errors.New("invalid peer index -1 for block hashes query")
|
|
}
|
|
utils.Logger().Info().
|
|
Int("maxFirstID", maxFirstID).
|
|
Str("targetPeerIP", sc.peers[maxFirstID].peer.IP).
|
|
Int("maxCount", maxCount).
|
|
Int("hashSize", len(sc.peers[maxFirstID].blockHashes)).
|
|
Msg("[SYNC] block consensus hashes")
|
|
|
|
sc.cleanUpPeers(maxFirstID)
|
|
return nil
|
|
}
|
|
|
|
// getConsensusHashes gets all hashes needed to download.
|
|
func (ss *StateSync) getConsensusHashes(startHash []byte, size uint32) error {
|
|
var wg sync.WaitGroup
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
response := peerConfig.client.GetBlockHashes(startHash, size, ss.selfip, ss.selfport)
|
|
if response == nil {
|
|
utils.Logger().Warn().
|
|
Str("peerIP", peerConfig.peer.IP).
|
|
Str("peerPort", peerConfig.peer.Port).
|
|
Msg("[SYNC] getConsensusHashes Nil Response")
|
|
ss.syncConfig.RemovePeer(peerConfig, fmt.Sprintf("StateSync %d: nil response for GetBlockHashes", ss.blockChain.ShardID()))
|
|
return
|
|
}
|
|
utils.Logger().Info().Uint32("queried blockHash size", size).
|
|
Int("got blockHashSize", len(response.Payload)).
|
|
Str("PeerIP", peerConfig.peer.IP).
|
|
Msg("[SYNC] GetBlockHashes")
|
|
if len(response.Payload) > int(size+1) {
|
|
utils.Logger().Warn().
|
|
Uint32("requestSize", size).
|
|
Int("respondSize", len(response.Payload)).
|
|
Msg("[SYNC] getConsensusHashes: receive more blockHashes than requested!")
|
|
peerConfig.blockHashes = response.Payload[:size+1]
|
|
} else {
|
|
peerConfig.blockHashes = response.Payload
|
|
}
|
|
}()
|
|
return
|
|
})
|
|
wg.Wait()
|
|
if err := ss.syncConfig.GetBlockHashesConsensusAndCleanUp(); err != nil {
|
|
return err
|
|
}
|
|
utils.Logger().Info().Msg("[SYNC] Finished getting consensus block hashes")
|
|
return nil
|
|
}
|
|
|
|
func (ss *StateSync) generateStateSyncTaskQueue(bc core.BlockChain) {
|
|
ss.stateSyncTaskQueue = queue.New(0)
|
|
ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
|
|
for id, blockHash := range configPeer.blockHashes {
|
|
if err := ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}); err != nil {
|
|
utils.Logger().Warn().
|
|
Err(err).
|
|
Int("taskIndex", id).
|
|
Str("taskBlock", hex.EncodeToString(blockHash)).
|
|
Msg("[SYNC] generateStateSyncTaskQueue: cannot add task")
|
|
}
|
|
}
|
|
brk = true
|
|
return
|
|
})
|
|
utils.Logger().Info().Int64("length", ss.stateSyncTaskQueue.Len()).Msg("[SYNC] generateStateSyncTaskQueue: finished")
|
|
}
|
|
|
|
// downloadBlocks downloads blocks from state sync task queue.
|
|
func (ss *StateSync) downloadBlocks(bc core.BlockChain) {
|
|
// Initialize blockchain
|
|
var wg sync.WaitGroup
|
|
count := 0
|
|
taskQueue := downloadTaskQueue{ss.stateSyncTaskQueue}
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for !taskQueue.empty() {
|
|
tasks, err := taskQueue.poll(downloadTaskBatch, time.Millisecond)
|
|
if err != nil || len(tasks) == 0 {
|
|
if err == queue.ErrDisposed {
|
|
continue
|
|
}
|
|
utils.Logger().Error().Err(err).Msg("[SYNC] downloadBlocks: ss.stateSyncTaskQueue poll timeout")
|
|
break
|
|
}
|
|
payload, err := peerConfig.GetBlocks(tasks.blockHashes())
|
|
if err != nil {
|
|
utils.Logger().Warn().Err(err).
|
|
Str("peerID", peerConfig.peer.IP).
|
|
Str("port", peerConfig.peer.Port).
|
|
Msg("[SYNC] downloadBlocks: GetBlocks failed")
|
|
ss.syncConfig.RemovePeer(peerConfig, fmt.Sprintf("StateSync %d: error returned for GetBlocks: %s", ss.blockChain.ShardID(), err.Error()))
|
|
return
|
|
}
|
|
if len(payload) == 0 {
|
|
count++
|
|
utils.Logger().Error().Int("failNumber", count).
|
|
Msg("[SYNC] downloadBlocks: no more retrievable blocks")
|
|
if count > downloadBlocksRetryLimit {
|
|
break
|
|
}
|
|
if err := taskQueue.put(tasks); err != nil {
|
|
utils.Logger().Warn().
|
|
Err(err).
|
|
Interface("taskIndexes", tasks.indexes()).
|
|
Interface("taskBlockes", tasks.blockHashesStr()).
|
|
Msg("downloadBlocks: cannot add task")
|
|
}
|
|
continue
|
|
}
|
|
|
|
failedTasks := ss.handleBlockSyncResult(payload, tasks)
|
|
|
|
if len(failedTasks) != 0 {
|
|
count++
|
|
if count > downloadBlocksRetryLimit {
|
|
break
|
|
}
|
|
if err := taskQueue.put(failedTasks); err != nil {
|
|
utils.Logger().Warn().
|
|
Err(err).
|
|
Interface("taskIndexes", failedTasks.indexes()).
|
|
Interface("taskBlockes", tasks.blockHashesStr()).
|
|
Msg("cannot add task")
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
}()
|
|
return
|
|
})
|
|
wg.Wait()
|
|
utils.Logger().Info().Msg("[SYNC] downloadBlocks: finished")
|
|
}
|
|
|
|
func (ss *StateSync) handleBlockSyncResult(payload [][]byte, tasks syncBlockTasks) syncBlockTasks {
|
|
if len(payload) > len(tasks) {
|
|
utils.Logger().Warn().
|
|
Err(errors.New("unexpected number of block delivered")).
|
|
Int("expect", len(tasks)).
|
|
Int("got", len(payload))
|
|
return tasks
|
|
}
|
|
|
|
var failedTasks syncBlockTasks
|
|
if len(payload) < len(tasks) {
|
|
utils.Logger().Warn().
|
|
Err(errors.New("unexpected number of block delivered")).
|
|
Int("expect", len(tasks)).
|
|
Int("got", len(payload))
|
|
failedTasks = append(failedTasks, tasks[len(payload):]...)
|
|
}
|
|
|
|
for i, blockBytes := range payload {
|
|
// For forward compatibility at server side, it can be types.block or BlockWithSig
|
|
blockObj, err := RlpDecodeBlockOrBlockWithSig(blockBytes)
|
|
if err != nil {
|
|
utils.Logger().Warn().
|
|
Err(err).
|
|
Int("taskIndex", tasks[i].index).
|
|
Str("taskBlock", hex.EncodeToString(tasks[i].blockHash)).
|
|
Msg("download block")
|
|
failedTasks = append(failedTasks, tasks[i])
|
|
continue
|
|
}
|
|
gotHash := blockObj.Hash()
|
|
if !bytes.Equal(gotHash[:], tasks[i].blockHash) {
|
|
utils.Logger().Warn().
|
|
Err(errors.New("wrong block delivery")).
|
|
Str("expectHash", hex.EncodeToString(tasks[i].blockHash)).
|
|
Str("gotHash", hex.EncodeToString(gotHash[:]))
|
|
failedTasks = append(failedTasks, tasks[i])
|
|
continue
|
|
}
|
|
ss.syncMux.Lock()
|
|
ss.commonBlocks[tasks[i].index] = blockObj
|
|
ss.syncMux.Unlock()
|
|
}
|
|
return failedTasks
|
|
}
|
|
|
|
// RlpDecodeBlockOrBlockWithSig decode payload to types.Block or BlockWithSig.
|
|
// Return the block with commitSig if set.
|
|
func RlpDecodeBlockOrBlockWithSig(payload []byte) (*types.Block, error) {
|
|
var block *types.Block
|
|
if err := rlp.DecodeBytes(payload, &block); err == nil {
|
|
// received payload as *types.Block
|
|
return block, nil
|
|
}
|
|
|
|
var bws BlockWithSig
|
|
if err := rlp.DecodeBytes(payload, &bws); err == nil {
|
|
block := bws.Block
|
|
block.SetCurrentCommitSig(bws.CommitSigAndBitmap)
|
|
return block, nil
|
|
}
|
|
return nil, errors.New("failed to decode to either types.Block or BlockWithSig")
|
|
}
|
|
|
|
// downloadTaskQueue is wrapper around Queue with item to be SyncBlockTask
|
|
type downloadTaskQueue struct {
|
|
q *queue.Queue
|
|
}
|
|
|
|
func (queue downloadTaskQueue) poll(num int64, timeOut time.Duration) (syncBlockTasks, error) {
|
|
items, err := queue.q.Poll(num, timeOut)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tasks := make(syncBlockTasks, 0, len(items))
|
|
for _, item := range items {
|
|
task := item.(SyncBlockTask)
|
|
tasks = append(tasks, task)
|
|
}
|
|
return tasks, nil
|
|
}
|
|
|
|
func (queue downloadTaskQueue) put(tasks syncBlockTasks) error {
|
|
for _, task := range tasks {
|
|
if err := queue.q.Put(task); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (queue downloadTaskQueue) empty() bool {
|
|
return queue.q.Empty()
|
|
}
|
|
|
|
// CompareBlockByHash compares two block by hash, it will be used in sort the blocks
|
|
func CompareBlockByHash(a *types.Block, b *types.Block) int {
|
|
ha := a.Hash()
|
|
hb := b.Hash()
|
|
return bytes.Compare(ha[:], hb[:])
|
|
}
|
|
|
|
// GetHowManyMaxConsensus will get the most common blocks and the first such blockID
|
|
func GetHowManyMaxConsensus(blocks []*types.Block) (int, int) {
|
|
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
|
|
curCount := 0
|
|
curFirstID := -1
|
|
maxCount := 0
|
|
maxFirstID := -1
|
|
for i := range blocks {
|
|
if curFirstID == -1 || CompareBlockByHash(blocks[curFirstID], blocks[i]) != 0 {
|
|
curCount = 1
|
|
curFirstID = i
|
|
} else {
|
|
curCount++
|
|
}
|
|
if curCount > maxCount {
|
|
maxCount = curCount
|
|
maxFirstID = curFirstID
|
|
}
|
|
}
|
|
return maxFirstID, maxCount
|
|
}
|
|
|
|
func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) *types.Block {
|
|
var (
|
|
candidateBlocks []*types.Block
|
|
candidateLock sync.Mutex
|
|
)
|
|
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
|
|
peerConfig.mux.Lock()
|
|
defer peerConfig.mux.Unlock()
|
|
|
|
for _, block := range peerConfig.newBlocks {
|
|
ph := block.ParentHash()
|
|
if bytes.Equal(ph[:], parentHash[:]) {
|
|
candidateLock.Lock()
|
|
candidateBlocks = append(candidateBlocks, block)
|
|
candidateLock.Unlock()
|
|
break
|
|
}
|
|
}
|
|
return
|
|
})
|
|
if len(candidateBlocks) == 0 {
|
|
return nil
|
|
}
|
|
// Sort by blockHashes.
|
|
sort.Slice(candidateBlocks, func(i, j int) bool {
|
|
return CompareBlockByHash(candidateBlocks[i], candidateBlocks[j]) == -1
|
|
})
|
|
maxFirstID, maxCount := GetHowManyMaxConsensus(candidateBlocks)
|
|
hash := candidateBlocks[maxFirstID].Hash()
|
|
utils.Logger().Debug().
|
|
Hex("parentHash", parentHash[:]).
|
|
Hex("hash", hash[:]).
|
|
Int("maxCount", maxCount).
|
|
Msg("[SYNC] Find block with matching parenthash")
|
|
return candidateBlocks[maxFirstID]
|
|
}
|
|
|
|
func (ss *StateSync) getBlockFromOldBlocksByParentHash(parentHash common.Hash) *types.Block {
|
|
for _, block := range ss.commonBlocks {
|
|
ph := block.ParentHash()
|
|
if bytes.Equal(ph[:], parentHash[:]) {
|
|
return block
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ss *StateSync) getCommonBlockIter(parentHash common.Hash) *commonBlockIter {
|
|
return newCommonBlockIter(ss.commonBlocks, parentHash)
|
|
}
|
|
|
|
type commonBlockIter struct {
|
|
parentToChild map[common.Hash]*types.Block
|
|
curParentHash common.Hash
|
|
}
|
|
|
|
func newCommonBlockIter(blocks map[int]*types.Block, startHash common.Hash) *commonBlockIter {
|
|
m := make(map[common.Hash]*types.Block)
|
|
for _, block := range blocks {
|
|
m[block.ParentHash()] = block
|
|
}
|
|
return &commonBlockIter{
|
|
parentToChild: m,
|
|
curParentHash: startHash,
|
|
}
|
|
}
|
|
|
|
func (iter *commonBlockIter) Next() *types.Block {
|
|
curBlock, ok := iter.parentToChild[iter.curParentHash]
|
|
if !ok || curBlock == nil {
|
|
return nil
|
|
}
|
|
iter.curParentHash = curBlock.Hash()
|
|
return curBlock
|
|
}
|
|
|
|
func (iter *commonBlockIter) HasNext() bool {
|
|
_, ok := iter.parentToChild[iter.curParentHash]
|
|
return ok
|
|
}
|
|
|
|
func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Hash) *types.Block {
|
|
for _, block := range ss.lastMileBlocks {
|
|
ph := block.ParentHash()
|
|
if bytes.Equal(ph[:], parentHash[:]) {
|
|
return block
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateBlockAndStatus ...
|
|
func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error {
|
|
if block.NumberU64() != bc.CurrentBlock().NumberU64()+1 {
|
|
utils.Logger().Debug().Uint64("curBlockNum", bc.CurrentBlock().NumberU64()).Uint64("receivedBlockNum", block.NumberU64()).Msg("[SYNC] Inappropriate block number, ignore!")
|
|
return nil
|
|
}
|
|
|
|
haveCurrentSig := len(block.GetCurrentCommitSig()) != 0
|
|
// Verify block signatures
|
|
if block.NumberU64() > 1 {
|
|
// Verify signature every N blocks (which N is verifyHeaderBatchSize and can be adjusted in configs)
|
|
verifySeal := block.NumberU64()%verifyHeaderBatchSize == 0 || verifyAllSig
|
|
verifyCurrentSig := verifyAllSig && haveCurrentSig
|
|
if verifyCurrentSig {
|
|
sig, bitmap, err := chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig())
|
|
if err != nil {
|
|
return errors.Wrap(err, "parse commitSigAndBitmap")
|
|
}
|
|
|
|
startTime := time.Now()
|
|
if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sig, bitmap); err != nil {
|
|
return errors.Wrapf(err, "verify header signature %v", block.Hash().String())
|
|
}
|
|
utils.Logger().Debug().Int64("elapsed time", time.Now().Sub(startTime).Milliseconds()).Msg("[Sync] VerifyHeaderSignature")
|
|
}
|
|
err := bc.Engine().VerifyHeader(bc, block.Header(), verifySeal)
|
|
if err == engine.ErrUnknownAncestor {
|
|
return err
|
|
} else if err != nil {
|
|
utils.Logger().Error().Err(err).Msgf("[SYNC] UpdateBlockAndStatus: failed verifying signatures for new block %d", block.NumberU64())
|
|
|
|
if !verifyAllSig {
|
|
utils.Logger().Info().Interface("block", bc.CurrentBlock()).Msg("[SYNC] UpdateBlockAndStatus: Rolling back last 99 blocks!")
|
|
for i := uint64(0); i < verifyHeaderBatchSize-1; i++ {
|
|
if rbErr := bc.Rollback([]common.Hash{bc.CurrentBlock().Hash()}); rbErr != nil {
|
|
utils.Logger().Err(rbErr).Msg("[SYNC] UpdateBlockAndStatus: failed to rollback")
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
_, err := bc.InsertChain([]*types.Block{block}, false /* verifyHeaders */)
|
|
if err != nil {
|
|
utils.Logger().Error().
|
|
Err(err).
|
|
Msgf(
|
|
"[SYNC] UpdateBlockAndStatus: Error adding new block to blockchain %d %d",
|
|
block.NumberU64(),
|
|
block.ShardID(),
|
|
)
|
|
return err
|
|
}
|
|
utils.Logger().Info().
|
|
Uint64("blockHeight", block.NumberU64()).
|
|
Uint64("blockEpoch", block.Epoch().Uint64()).
|
|
Str("blockHex", block.Hash().Hex()).
|
|
Uint32("ShardID", block.ShardID()).
|
|
Msg("[SYNC] UpdateBlockAndStatus: New Block Added to Blockchain")
|
|
|
|
for i, tx := range block.StakingTransactions() {
|
|
utils.Logger().Info().
|
|
Msgf(
|
|
"StakingTxn %d: %s, %v", i, tx.StakingType().String(), tx.StakingMessage(),
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// generateNewState will construct most recent state from downloaded blocks
|
|
func (ss *StateSync) generateNewState(bc core.BlockChain, worker *worker.Worker) error {
|
|
// update blocks created before node start sync
|
|
parentHash := bc.CurrentBlock().Hash()
|
|
|
|
var err error
|
|
|
|
commonIter := ss.getCommonBlockIter(parentHash)
|
|
for {
|
|
block := commonIter.Next()
|
|
if block == nil {
|
|
break
|
|
}
|
|
// Enforce sig check for the last block in a batch
|
|
enforceSigCheck := !commonIter.HasNext()
|
|
err = ss.UpdateBlockAndStatus(block, bc, enforceSigCheck)
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
|
|
ss.syncMux.Lock()
|
|
ss.commonBlocks = make(map[int]*types.Block)
|
|
ss.syncMux.Unlock()
|
|
|
|
// update blocks after node start sync
|
|
parentHash = bc.CurrentBlock().Hash()
|
|
for {
|
|
block := ss.getMaxConsensusBlockFromParentHash(parentHash)
|
|
if block == nil {
|
|
break
|
|
}
|
|
err = ss.UpdateBlockAndStatus(block, bc, true)
|
|
if err != nil {
|
|
break
|
|
}
|
|
parentHash = block.Hash()
|
|
}
|
|
// TODO ek – Do we need to hold syncMux now that syncConfig has its own mutex?
|
|
ss.syncMux.Lock()
|
|
ss.syncConfig.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) {
|
|
peer.newBlocks = []*types.Block{}
|
|
return
|
|
})
|
|
ss.syncMux.Unlock()
|
|
|
|
// update last mile blocks if any
|
|
parentHash = bc.CurrentBlock().Hash()
|
|
for {
|
|
block := ss.getBlockFromLastMileBlocksByParentHash(parentHash)
|
|
if block == nil {
|
|
break
|
|
}
|
|
err = ss.UpdateBlockAndStatus(block, bc, false)
|
|
if err != nil {
|
|
break
|
|
}
|
|
parentHash = block.Hash()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// ProcessStateSync processes state sync from the blocks received but not yet processed so far
|
|
func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain, worker *worker.Worker) error {
|
|
// Gets consensus hashes.
|
|
if err := ss.getConsensusHashes(startHash, size); err != nil {
|
|
return errors.Wrap(err, "getConsensusHashes")
|
|
}
|
|
ss.generateStateSyncTaskQueue(bc)
|
|
// Download blocks.
|
|
if ss.stateSyncTaskQueue.Len() > 0 {
|
|
ss.downloadBlocks(bc)
|
|
}
|
|
return ss.generateNewState(bc, worker)
|
|
}
|
|
|
|
func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error {
|
|
response := peerConfig.client.Register(peerHash, ip, port)
|
|
if response == nil || response.Type == pb.DownloaderResponse_FAIL {
|
|
return ErrRegistrationFail
|
|
} else if response.Type == pb.DownloaderResponse_SUCCESS {
|
|
return nil
|
|
}
|
|
return ErrRegistrationFail
|
|
}
|
|
|
|
func (peerConfig *SyncPeerConfig) String() interface{} {
|
|
return fmt.Sprintf("peer: %s:%s ", peerConfig.peer.IP, peerConfig.peer.Port)
|
|
}
|
|
|
|
// RegisterNodeInfo will register node to peers to accept future new block broadcasting
|
|
// return number of successful registration
|
|
func (ss *StateSync) RegisterNodeInfo() int {
|
|
registrationNumber := RegistrationNumber
|
|
utils.Logger().Debug().
|
|
Int("registrationNumber", registrationNumber).
|
|
Int("activePeerNumber", len(ss.syncConfig.peers)).
|
|
Msg("[SYNC] node registration to peers")
|
|
|
|
count := 0
|
|
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
|
|
logger := utils.Logger().With().Str("peerPort", peerConfig.peer.Port).Str("peerIP", peerConfig.peer.IP).Logger()
|
|
if count >= registrationNumber {
|
|
brk = true
|
|
return
|
|
}
|
|
if peerConfig.peer.IP == ss.selfip && peerConfig.peer.Port == GetSyncingPort(ss.selfport) {
|
|
logger.Debug().
|
|
Str("selfport", ss.selfport).
|
|
Str("selfsyncport", GetSyncingPort(ss.selfport)).
|
|
Msg("[SYNC] skip self")
|
|
return
|
|
}
|
|
err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport)
|
|
if err != nil {
|
|
logger.Debug().
|
|
Hex("selfPeerHash", ss.selfPeerHash[:]).
|
|
Msg("[SYNC] register failed to peer")
|
|
return
|
|
}
|
|
|
|
logger.Debug().Msg("[SYNC] register success")
|
|
count++
|
|
return
|
|
})
|
|
return count
|
|
}
|
|
|
|
// IsSameBlockchainHeight checks whether the node is out of sync from other peers
|
|
func (ss *StateSync) IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool) {
|
|
otherHeight := getMaxPeerHeight(ss.syncConfig)
|
|
currentHeight := bc.CurrentBlock().NumberU64()
|
|
return otherHeight, currentHeight == otherHeight
|
|
}
|
|
|
|
// GetMaxPeerHeight ..
|
|
func (ss *StateSync) GetMaxPeerHeight() uint64 {
|
|
return getMaxPeerHeight(ss.syncConfig)
|
|
}
|
|
|
|
// SyncLoop will keep syncing with peers until catches up
|
|
func (ss *StateSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
|
|
utils.Logger().Info().Msgf("legacy sync is executing ...")
|
|
if !isBeacon {
|
|
ss.RegisterNodeInfo()
|
|
}
|
|
|
|
for {
|
|
start := time.Now()
|
|
otherHeight := getMaxPeerHeight(ss.syncConfig)
|
|
currentHeight := bc.CurrentBlock().NumberU64()
|
|
if currentHeight >= otherHeight {
|
|
utils.Logger().Info().
|
|
Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
|
|
isBeacon, bc.ShardID(), otherHeight, currentHeight)
|
|
break
|
|
}
|
|
utils.Logger().Info().
|
|
Msgf("[SYNC] Node is OUT OF SYNC (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
|
|
isBeacon, bc.ShardID(), otherHeight, currentHeight)
|
|
|
|
startHash := bc.CurrentBlock().Hash()
|
|
size := uint32(otherHeight - currentHeight)
|
|
if size > SyncLoopBatchSize {
|
|
size = SyncLoopBatchSize
|
|
}
|
|
err := ss.ProcessStateSync(startHash[:], size, bc, worker)
|
|
if err != nil {
|
|
utils.Logger().Error().Err(err).
|
|
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
|
|
isBeacon, bc.ShardID(), otherHeight, currentHeight)
|
|
ss.purgeOldBlocksFromCache()
|
|
break
|
|
}
|
|
ss.purgeOldBlocksFromCache()
|
|
|
|
if loopMinTime != 0 {
|
|
waitTime := loopMinTime - time.Since(start)
|
|
c := time.After(waitTime)
|
|
select {
|
|
case <-c:
|
|
}
|
|
}
|
|
}
|
|
if consensus != nil {
|
|
if err := ss.addConsensusLastMile(bc, consensus); err != nil {
|
|
utils.Logger().Error().Err(err).Msg("[SYNC] Add consensus last mile")
|
|
}
|
|
// TODO: move this to explorer handler code.
|
|
if ss.isExplorer {
|
|
consensus.UpdateConsensusInformation()
|
|
}
|
|
}
|
|
utils.Logger().Info().Msgf("legacy sync is executed")
|
|
ss.purgeAllBlocksFromCache()
|
|
}
|
|
|
|
func (ss *StateSync) addConsensusLastMile(bc core.BlockChain, consensus *consensus.Consensus) error {
|
|
curNumber := bc.CurrentBlock().NumberU64()
|
|
blockIter, err := consensus.GetLastMileBlockIter(curNumber + 1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for {
|
|
block := blockIter.Next()
|
|
if block == nil {
|
|
break
|
|
}
|
|
if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil {
|
|
return errors.Wrap(err, "failed to InsertChain")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetSyncingPort returns the syncing port.
|
|
func GetSyncingPort(nodePort string) string {
|
|
if port, err := strconv.Atoi(nodePort); err == nil {
|
|
return fmt.Sprintf("%d", port-SyncingPortDifference)
|
|
}
|
|
return ""
|
|
}
|
|
|
|
const (
|
|
// syncStatusExpiration is the expiration time out of a sync status.
|
|
// If last sync result in memory is before the expiration, the sync status
|
|
// will be updated.
|
|
syncStatusExpiration = 6 * time.Second
|
|
|
|
// syncStatusExpirationNonValidator is the expiration of sync cache for non-validators.
|
|
// Compared with non-validator, the sync check is not as strict as validator nodes.
|
|
// TODO: add this field to harmony config
|
|
syncStatusExpirationNonValidator = 12 * time.Second
|
|
)
|
|
|
|
type (
|
|
syncStatus struct {
|
|
lastResult SyncCheckResult
|
|
lastUpdateTime time.Time
|
|
lock sync.RWMutex
|
|
expiration time.Duration
|
|
}
|
|
|
|
SyncCheckResult struct {
|
|
IsSynchronized bool
|
|
OtherHeight uint64
|
|
HeightDiff uint64
|
|
}
|
|
)
|
|
|
|
func ParseResult(res SyncCheckResult) (IsSynchronized bool, OtherHeight uint64, HeightDiff uint64) {
|
|
IsSynchronized = res.IsSynchronized
|
|
OtherHeight = res.OtherHeight
|
|
HeightDiff = res.HeightDiff
|
|
return IsSynchronized, OtherHeight, HeightDiff
|
|
}
|
|
|
|
func newSyncStatus(role nodeconfig.Role) syncStatus {
|
|
expiration := getSyncStatusExpiration(role)
|
|
return syncStatus{
|
|
expiration: expiration,
|
|
}
|
|
}
|
|
|
|
func getSyncStatusExpiration(role nodeconfig.Role) time.Duration {
|
|
switch role {
|
|
case nodeconfig.Validator:
|
|
return syncStatusExpiration
|
|
case nodeconfig.ExplorerNode:
|
|
return syncStatusExpirationNonValidator
|
|
default:
|
|
return syncStatusExpirationNonValidator
|
|
}
|
|
}
|
|
|
|
func (status *syncStatus) Get(fallback func() SyncCheckResult) SyncCheckResult {
|
|
status.lock.RLock()
|
|
if !status.expired() {
|
|
result := status.lastResult
|
|
status.lock.RUnlock()
|
|
return result
|
|
}
|
|
status.lock.RUnlock()
|
|
|
|
status.lock.Lock()
|
|
defer status.lock.Unlock()
|
|
if status.expired() {
|
|
result := fallback()
|
|
status.update(result)
|
|
}
|
|
return status.lastResult
|
|
}
|
|
|
|
func (status *syncStatus) Clone() syncStatus {
|
|
return syncStatus{
|
|
lastResult: status.lastResult,
|
|
lastUpdateTime: status.lastUpdateTime,
|
|
lock: sync.RWMutex{},
|
|
expiration: status.expiration,
|
|
}
|
|
}
|
|
|
|
func (ss *StateSync) IsSynchronized() bool {
|
|
result := ss.GetSyncStatus()
|
|
return result.IsSynchronized
|
|
}
|
|
|
|
func (status *syncStatus) expired() bool {
|
|
return time.Since(status.lastUpdateTime) > status.expiration
|
|
}
|
|
|
|
func (status *syncStatus) update(result SyncCheckResult) {
|
|
status.lastUpdateTime = time.Now()
|
|
status.lastResult = result
|
|
}
|
|
|
|
// GetSyncStatus get the last sync status for other modules (E.g. RPC, explorer).
|
|
// If the last sync result is not expired, return the sync result immediately.
|
|
// If the last result is expired, ask the remote DNS nodes for latest height and return the result.
|
|
func (ss *StateSync) GetSyncStatus() SyncCheckResult {
|
|
return ss.syncStatus.Get(func() SyncCheckResult {
|
|
return ss.isSynchronized(false)
|
|
})
|
|
}
|
|
|
|
func (ss *StateSync) GetParsedSyncStatus() (IsSynchronized bool, OtherHeight uint64, HeightDiff uint64) {
|
|
res := ss.syncStatus.Get(func() SyncCheckResult {
|
|
return ss.isSynchronized(false)
|
|
})
|
|
return ParseResult(res)
|
|
}
|
|
|
|
// GetSyncStatusDoubleChecked return the sync status when enforcing a immediate query on DNS nodes
|
|
// with a double check to avoid false alarm.
|
|
func (ss *StateSync) GetSyncStatusDoubleChecked() SyncCheckResult {
|
|
result := ss.isSynchronized(true)
|
|
return result
|
|
}
|
|
|
|
func (ss *StateSync) GetParsedSyncStatusDoubleChecked() (IsSynchronized bool, OtherHeight uint64, HeightDiff uint64) {
|
|
result := ss.isSynchronized(true)
|
|
return ParseResult(result)
|
|
}
|
|
|
|
// isSynchronized query the remote DNS node for the latest height to check what is the current
|
|
// sync status
|
|
func (ss *StateSync) isSynchronized(doubleCheck bool) SyncCheckResult {
|
|
if ss.syncConfig == nil {
|
|
return SyncCheckResult{} // If syncConfig is not instantiated, return not in sync
|
|
}
|
|
otherHeight1 := getMaxPeerHeight(ss.syncConfig)
|
|
lastHeight := ss.blockChain.CurrentBlock().NumberU64()
|
|
wasOutOfSync := lastHeight+inSyncThreshold < otherHeight1
|
|
|
|
if !doubleCheck {
|
|
heightDiff := otherHeight1 - lastHeight
|
|
if otherHeight1 < lastHeight {
|
|
heightDiff = 0 //
|
|
}
|
|
utils.Logger().Info().
|
|
Uint64("OtherHeight", otherHeight1).
|
|
Uint64("lastHeight", lastHeight).
|
|
Msg("[SYNC] Checking sync status")
|
|
return SyncCheckResult{
|
|
IsSynchronized: !wasOutOfSync,
|
|
OtherHeight: otherHeight1,
|
|
HeightDiff: heightDiff,
|
|
}
|
|
}
|
|
// double check the sync status after 1 second to confirm (avoid false alarm)
|
|
time.Sleep(1 * time.Second)
|
|
|
|
otherHeight2 := getMaxPeerHeight(ss.syncConfig)
|
|
currentHeight := ss.blockChain.CurrentBlock().NumberU64()
|
|
|
|
isOutOfSync := currentHeight+inSyncThreshold < otherHeight2
|
|
utils.Logger().Info().
|
|
Uint64("OtherHeight1", otherHeight1).
|
|
Uint64("OtherHeight2", otherHeight2).
|
|
Uint64("lastHeight", lastHeight).
|
|
Uint64("currentHeight", currentHeight).
|
|
Msg("[SYNC] Checking sync status")
|
|
// Only confirm out of sync when the node has lower height and didn't move in heights for 2 consecutive checks
|
|
heightDiff := otherHeight2 - lastHeight
|
|
if otherHeight2 < lastHeight {
|
|
heightDiff = 0 // overflow
|
|
}
|
|
return SyncCheckResult{
|
|
IsSynchronized: !(wasOutOfSync && isOutOfSync && lastHeight == currentHeight),
|
|
OtherHeight: otherHeight2,
|
|
HeightDiff: heightDiff,
|
|
}
|
|
}
|
|
|