@@ -16,6 +16,7 @@ import (
 	pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto"
 	"github.com/harmony-one/harmony/core"
 	"github.com/harmony-one/harmony/core/types"
+	"github.com/harmony-one/harmony/internal/ctxerror"
 	"github.com/harmony-one/harmony/internal/utils"
 	"github.com/harmony-one/harmony/node/worker"
 	"github.com/harmony-one/harmony/p2p"
@@ -55,9 +56,32 @@ type SyncBlockTask struct {
 
 // SyncConfig contains an array of SyncPeerConfig.
 type SyncConfig struct {
+	// mtx locks peers, and *SyncPeerConfig pointers in peers.
+	// SyncPeerConfig itself is guarded by its own mutex.
+	mtx sync.RWMutex
+
 	peers []*SyncPeerConfig
 }
 
+// AddPeer adds the given sync peer.
+func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
+	sc.mtx.Lock()
+	defer sc.mtx.Unlock()
+	sc.peers = append(sc.peers, peer)
+}
+
+// ForEachPeer calls the given function with each peer.
+// It breaks the iteration iff the function returns true.
+func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) {
+	sc.mtx.RLock()
+	defer sc.mtx.RUnlock()
+	for _, peer := range sc.peers {
+		if f(peer) {
+			break
+		}
+	}
+}
+
 // CreateStateSync returns the implementation of StateSyncInterface interface.
 func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync {
 	stateSync := &StateSync{}
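A note on the new iterator (not part of the diff): the callback's boolean controls early exit. Two minimal sketches, assuming they live in the same syncing package; `countActivePeers` and `firstPeerOnPort` are hypothetical helpers:

```go
// countActivePeers visits every peer: the callback always returns false,
// so the iteration never breaks early.
func countActivePeers(sc *SyncConfig) int {
	n := 0
	sc.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) {
		if peer.client != nil {
			n++
		}
		return false
	})
	return n
}

// firstPeerOnPort stops at the first match: returning true breaks the loop.
func firstPeerOnPort(sc *SyncConfig, port string) (found *SyncPeerConfig) {
	sc.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) {
		if peer.port == port {
			found = peer
			return true
		}
		return false
	})
	return found
}
```

Because ForEachPeer holds mtx.RLock for the entire walk, a callback must not call AddPeer or anything else that takes the write lock, or the goroutine will deadlock.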
@@ -74,8 +98,6 @@ type StateSync struct {
 	selfip             string
 	selfport           string
 	selfPeerHash       [20]byte // hash of ip and address combination
-	peerNumber         int
-	activePeerNumber   int
 	commonBlocks       map[int]*types.Block
 	lastMileBlocks     []*types.Block // last mile blocks to catch up with the consensus
 	syncConfig         *SyncConfig
@@ -91,25 +113,39 @@ func (ss *StateSync) AddLastMileBlock(block *types.Block) {
 }
 
 // CloseConnections closes grpc connections for state sync clients
-func (ss *StateSync) CloseConnections() {
-	for _, pc := range ss.syncConfig.peers {
-		if pc.client != nil {
-			pc.client.Close()
-		}
+func (sc *SyncConfig) CloseConnections() {
+	sc.mtx.RLock()
+	defer sc.mtx.RUnlock()
+	for _, pc := range sc.peers {
+		pc.client.Close()
 	}
 }
 
+// FindPeerByHash returns the peer with the given hash, or nil if not found.
+func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig {
+	sc.mtx.RLock()
+	defer sc.mtx.RUnlock()
+	for _, pc := range sc.peers {
+		if bytes.Compare(pc.peerHash, peerHash) == 0 {
+			return pc
+		}
+	}
+	return nil
+}
+
 // AddNewBlock will add newly received block into state syncing queue
 func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) {
-	for i, pc := range ss.syncConfig.peers {
-		if bytes.Compare(pc.peerHash, peerHash) != 0 {
-			continue
-		}
-		pc.mux.Lock()
-		pc.newBlocks = append(pc.newBlocks, block)
-		pc.mux.Unlock()
-		utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(ss.syncConfig.peers[i].newBlocks), "blockHeight", block.NumberU64())
+	pc := ss.syncConfig.FindPeerByHash(peerHash)
+	if pc == nil {
+		// Received a block with no active peer; just ignore.
+		return
 	}
+	// TODO ek – we shouldn't mess with SyncPeerConfig's mutex.
+	// Factor this into a method, like pc.AddNewBlock(block)
+	pc.mux.Lock()
+	defer pc.mux.Unlock()
+	pc.newBlocks = append(pc.newBlocks, block)
+	utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(pc.newBlocks), "blockHeight", block.NumberU64())
 }
 
 // CreateTestSyncPeerConfig used for testing.
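The TODO above sketches its own fix: push the locking into SyncPeerConfig. A minimal version of the suggested pc.AddNewBlock method (hypothetical, not in this diff) could look like:

```go
// AddNewBlock appends a received block under the peer's own mutex and
// returns the new total, so callers never touch pc.mux directly.
func (peerConfig *SyncPeerConfig) AddNewBlock(block *types.Block) int {
	peerConfig.mux.Lock()
	defer peerConfig.mux.Unlock()
	peerConfig.newBlocks = append(peerConfig.newBlocks, block)
	return len(peerConfig.newBlocks)
}
```

StateSync.AddNewBlock would then reduce to a FindPeerByHash call plus one append and a log line.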
@@ -138,9 +174,6 @@ func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) in
 
 // GetBlocks gets blocks by calling grpc request to the corresponding peer.
 func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
-	if peerConfig.client == nil {
-		return nil, ErrSyncPeerConfigClientNotReady
-	}
 	response := peerConfig.client.GetBlocks(hashes)
 	if response == nil {
 		return nil, ErrGetBlock
@@ -149,71 +182,55 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
 }
 
 // CreateSyncConfig creates SyncConfig for StateSync object.
-func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) bool {
+func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error {
 	utils.GetLogInstance().Debug("CreateSyncConfig: len of peers", "len", len(peers))
 	if len(peers) == 0 {
-		utils.GetLogInstance().Warn("[SYNC] Unable to get neighbor peers")
-		return false
+		return ctxerror.New("[SYNC] no peers to connect to")
 	}
-	ss.peerNumber = len(peers)
-	ss.syncConfig = &SyncConfig{
-		peers: make([]*SyncPeerConfig, ss.peerNumber),
-	}
-	for id := range ss.syncConfig.peers {
-		ss.syncConfig.peers[id] = &SyncPeerConfig{
-			ip:   peers[id].IP,
-			port: peers[id].Port,
-		}
-	}
-	utils.GetLogInstance().Info("[SYNC] Finished creating SyncConfig")
-	return true
-}
-
-// MakeConnectionToPeers makes grpc connection to all peers.
-func (ss *StateSync) MakeConnectionToPeers() {
+	ss.syncConfig = &SyncConfig{}
+
 	var wg sync.WaitGroup
-	wg.Add(ss.peerNumber)
-	for id := range ss.syncConfig.peers {
-		go func(peerConfig *SyncPeerConfig) {
+	for _, peer := range peers {
+		wg.Add(1)
+		go func(peer p2p.Peer) {
 			defer wg.Done()
-			peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port)
-		}(ss.syncConfig.peers[id])
+			client := downloader.ClientSetup(peer.IP, peer.Port)
+			if client == nil {
+				return
+			}
+			peerConfig := &SyncPeerConfig{
+				ip:     peer.IP,
+				port:   peer.Port,
+				client: client,
+			}
+			ss.syncConfig.AddPeer(peerConfig)
+		}(peer)
 	}
 	wg.Wait()
-	ss.CleanUpNilPeers()
 	utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.")
+	return nil
 }
 
 // GetActivePeerNumber returns the number of active peers
 func (ss *StateSync) GetActivePeerNumber() int {
-	if ss.syncConfig == nil || len(ss.syncConfig.peers) == 0 {
+	if ss.syncConfig == nil {
 		return 0
 	}
-	ss.CleanUpNilPeers()
-	return ss.activePeerNumber
-}
-
-// CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber.
-func (ss *StateSync) CleanUpNilPeers() {
-	ss.activePeerNumber = 0
-	for _, configPeer := range ss.syncConfig.peers {
-		if configPeer.client != nil {
-			ss.activePeerNumber++
-		}
-	}
-	utils.GetLogInstance().Info("[SYNC] clean up inactive peers", "activeNumber", ss.activePeerNumber)
+	// len() is atomic; no need to hold mutex.
+	return len(ss.syncConfig.peers)
 }
 
-// GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
+// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
 // Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
-func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) {
+// Caller shall ensure mtx is locked for reading.
+func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
 	// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
 	curCount := 0
 	curFirstID := -1
 	maxCount := 0
 	maxFirstID := -1
-	for i := range syncConfig.peers {
-		if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 {
+	for i := range sc.peers {
+		if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
 			curCount = 1
 			curFirstID = i
 		} else {
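CreateSyncConfig now dials all peers concurrently and registers only those whose gRPC client comes up; that is why AddPeer takes the write lock, and why later code can drop its nil-client guards. Call sites switch from a bool check to error handling. A hedged sketch of the new call-site shape (startSync is a hypothetical wrapper; ctxerror.New is used exactly as in the hunk above):

```go
func startSync(ss *StateSync, peers []p2p.Peer) error {
	if err := ss.CreateSyncConfig(peers); err != nil {
		return err // already tagged "[SYNC] ..." via ctxerror.New
	}
	if ss.GetActivePeerNumber() == 0 {
		// Every concurrent dial failed, so no peer was added.
		return ctxerror.New("[SYNC] no active peers after dialing")
	}
	return nil
}
```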
@@ -228,40 +245,44 @@ func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) {
 }
 
 // InitForTesting used for testing.
-func (syncConfig *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
-	for i := range syncConfig.peers {
-		syncConfig.peers[i].blockHashes = blockHashes
-		syncConfig.peers[i].client = client
+func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
+	sc.mtx.RLock()
+	defer sc.mtx.RUnlock()
+	for i := range sc.peers {
+		sc.peers[i].blockHashes = blockHashes
+		sc.peers[i].client = client
 	}
 }
 
-// CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes.
-func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) {
-	fixedPeer := syncConfig.peers[maxFirstID]
-	for i := 0; i < len(syncConfig.peers); i++ {
-		if CompareSyncPeerConfigByblockHashes(fixedPeer, syncConfig.peers[i]) != 0 {
+// cleanUpPeers cleans up all peers whose blockHashes are not equal to
+// consensus block hashes. Caller shall ensure mtx is locked for RW.
+func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
+	fixedPeer := sc.peers[maxFirstID]
+	for i := 0; i < len(sc.peers); i++ {
+		if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 {
 			// TODO: move it into a util delete func.
 			// See tip https://github.com/golang/go/wiki/SliceTricks
 			// Close the client and remove the peer out of the
-			syncConfig.peers[i].client.Close()
-			copy(syncConfig.peers[i:], syncConfig.peers[i+1:])
-			syncConfig.peers[len(syncConfig.peers)-1] = nil
-			syncConfig.peers = syncConfig.peers[:len(syncConfig.peers)-1]
+			sc.peers[i].client.Close()
+			copy(sc.peers[i:], sc.peers[i+1:])
+			sc.peers[len(sc.peers)-1] = nil
+			sc.peers = sc.peers[:len(sc.peers)-1]
 		}
 	}
 }
 
 // GetBlockHashesConsensusAndCleanUp checks if all consensus hashes are equal.
-func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool {
+func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() bool {
+	sc.mtx.Lock()
+	defer sc.mtx.Unlock()
 	// Sort all peers by the blockHashes.
-	sort.Slice(ss.syncConfig.peers, func(i, j int) bool {
-		return CompareSyncPeerConfigByblockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1
+	sort.Slice(sc.peers, func(i, j int) bool {
+		return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
 	})
-	maxFirstID, maxCount := ss.syncConfig.GetHowManyMaxConsensus()
+	maxFirstID, maxCount := sc.getHowManyMaxConsensus()
 	utils.GetLogInstance().Info("[SYNC] block consensus hashes", "maxFirstID", maxFirstID, "maxCount", maxCount)
-	if float64(maxCount) >= ConsensusRatio*float64(ss.activePeerNumber) {
-		ss.syncConfig.CleanUpPeers(maxFirstID)
-		ss.CleanUpNilPeers()
+	if float64(maxCount) >= ConsensusRatio*float64(len(sc.peers)) {
+		sc.cleanUpPeers(maxFirstID)
 		return true
 	}
 	return false
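The TODO inside cleanUpPeers asks for a util delete func; this is the SliceTricks removal it links to, factored out as a hypothetical helper:

```go
// removePeerAt deletes peers[i] in place, nil-ing the vacated tail slot so
// the dropped *SyncPeerConfig can be garbage-collected.
func removePeerAt(peers []*SyncPeerConfig, i int) []*SyncPeerConfig {
	copy(peers[i:], peers[i+1:])
	peers[len(peers)-1] = nil
	return peers[:len(peers)-1]
}
```

One inherited subtlety: after a delete, the loop's i++ skips the element that was just shifted into slot i; a compensating i-- after the shrink would close that gap.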
@@ -272,22 +293,20 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
 	count := 0
 	for {
 		var wg sync.WaitGroup
-		for id := range ss.syncConfig.peers {
-			if ss.syncConfig.peers[id].client == nil {
-				continue
-			}
+		ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
 			wg.Add(1)
-			go func(peerConfig *SyncPeerConfig) {
+			go func() {
 				defer wg.Done()
 				response := peerConfig.client.GetBlockHashes(startHash)
 				if response == nil {
 					return
 				}
 				peerConfig.blockHashes = response.Payload
-			}(ss.syncConfig.peers[id])
-		}
+			}()
+			return
+		})
 		wg.Wait()
-		if ss.GetBlockHashesConsensusAndCleanUp() {
+		if ss.syncConfig.GetBlockHashesConsensusAndCleanUp() {
 			break
 		}
 		if count > TimesToFail {
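Two things happen in this hunk besides the iterator swap. The nil-client guard disappears because CreateSyncConfig now only adds connected peers. And the goroutine no longer needs to smuggle the loop variable through go func(peerConfig *SyncPeerConfig) arguments: the callback parameter is fresh on every invocation, so capturing it is safe. A condensed sketch of the resulting fan-out shape (doRequest is a hypothetical stand-in for the gRPC call):

```go
func fanOut(sc *SyncConfig, doRequest func(*SyncPeerConfig)) {
	var wg sync.WaitGroup
	sc.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			doRequest(peerConfig) // fresh per-callback variable: safe to capture
		}()
		return false
	})
	wg.Wait()
}
```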
@@ -303,14 +322,13 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
 
 func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
 	ss.stateSyncTaskQueue = queue.New(0)
-	for _, configPeer := range ss.syncConfig.peers {
-		if configPeer.client != nil {
-			for id, blockHash := range configPeer.blockHashes {
-				ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
-			}
-			break
-		}
-	}
+	ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
+		for id, blockHash := range configPeer.blockHashes {
+			ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
+		}
+		brk = true
+		return
+	})
 	utils.GetLogInstance().Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len())
 }
 
@@ -318,13 +336,10 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
 func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
 	// Initialize blockchain
 	var wg sync.WaitGroup
-	wg.Add(ss.activePeerNumber)
 	count := 0
-	for i := range ss.syncConfig.peers {
-		if ss.syncConfig.peers[i].client == nil {
-			continue
-		}
-		go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
+	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
+		wg.Add(1)
+		go func(stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
 			defer wg.Done()
 			for !stateSyncTaskQueue.Empty() {
 				task, err := ss.stateSyncTaskQueue.Poll(1, time.Millisecond)
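Each active peer now contributes one worker goroutine, and all workers drain the shared task queue. A self-contained sketch of that worker loop, under the assumption that the queue is the Workiva queue whose Poll(1, timeout) returns a one-element slice or a timeout error (drainQueue and handle are hypothetical):

```go
func drainQueue(tasks *queue.Queue, handle func(SyncBlockTask)) {
	for !tasks.Empty() {
		polled, err := tasks.Poll(1, time.Millisecond)
		if err != nil {
			continue // likely a poll timeout: another worker won the race
		}
		handle(polled[0].(SyncBlockTask))
	}
}
```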
@@ -361,8 +376,9 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
 				ss.commonBlocks[syncTask.index] = &blockObj
 				ss.syncMux.Unlock()
 			}
-		}(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc)
-	}
+		}(ss.stateSyncTaskQueue, bc)
+		return
+	})
 	wg.Wait()
 	utils.GetLogInstance().Info("[SYNC] Finished downloadBlocks.")
 }
@@ -399,8 +415,7 @@ func GetHowManyMaxConsensus(blocks []*types.Block) (int, int) {
 func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) *types.Block {
 	candidateBlocks := []*types.Block{}
 	ss.syncMux.Lock()
-	for id := range ss.syncConfig.peers {
-		peerConfig := ss.syncConfig.peers[id]
+	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
 		for _, block := range peerConfig.newBlocks {
 			ph := block.ParentHash()
 			if bytes.Compare(ph[:], parentHash[:]) == 0 {
@@ -408,7 +423,8 @@ func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash)
 				break
 			}
 		}
-	}
+		return
+	})
 	ss.syncMux.Unlock()
 	if len(candidateBlocks) == 0 {
 		return nil
@@ -488,10 +504,13 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
 		}
 		parentHash = block.Hash()
 	}
+	// TODO ek – Do we need to hold syncMux now that syncConfig has its own
+	// mutex?
 	ss.syncMux.Lock()
-	for id := range ss.syncConfig.peers {
-		ss.syncConfig.peers[id].newBlocks = []*types.Block{}
-	}
+	ss.syncConfig.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) {
+		peer.newBlocks = []*types.Block{}
+		return
+	})
 	ss.syncMux.Unlock()
 
 	// update last mile blocks if any
@@ -538,42 +557,40 @@ func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port
 
 // RegisterNodeInfo will register node to peers to accept future new block broadcasting
 // return number of successful registrations
 func (ss *StateSync) RegisterNodeInfo() int {
-	ss.CleanUpNilPeers()
 	registrationNumber := RegistrationNumber
-	utils.GetLogInstance().Debug("[SYNC] node registration to peers", "registrationNumber", registrationNumber, "activePeerNumber", ss.activePeerNumber)
+	utils.GetLogInstance().Debug("[SYNC] node registration to peers",
+		"registrationNumber", registrationNumber,
+		"activePeerNumber", len(ss.syncConfig.peers))
 	count := 0
-	for id := range ss.syncConfig.peers {
-		peerConfig := ss.syncConfig.peers[id]
+	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
 		if count >= registrationNumber {
-			break
+			brk = true
+			return
 		}
 		if peerConfig.ip == ss.selfip && peerConfig.port == GetSyncingPort(ss.selfport) {
 			utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport))
-			continue
-		}
-		if peerConfig.client == nil {
-			continue
+			return
 		}
 		err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport)
 		if err != nil {
 			utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash)
-			continue
+			return
 		}
 		utils.GetLogInstance().Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port)
 		count++
-	}
+		return
+	})
 	return count
 }
 
 // getMaxPeerHeight gets the maximum blockchain heights from peers
 func (ss *StateSync) getMaxPeerHeight() uint64 {
-	ss.CleanUpNilPeers()
 	maxHeight := uint64(0)
 	var wg sync.WaitGroup
-	for id := range ss.syncConfig.peers {
+	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
 		wg.Add(1)
-		go func(peerConfig *SyncPeerConfig) {
+		go func() {
 			defer wg.Done()
 			response := peerConfig.client.GetBlockChainHeight()
 			ss.syncMux.Lock()
@@ -581,8 +598,9 @@ func (ss *StateSync) getMaxPeerHeight() uint64 {
 				maxHeight = response.BlockHeight
 			}
 			ss.syncMux.Unlock()
-		}(ss.syncConfig.peers[id])
-	}
+		}()
+		return
+	})
 	wg.Wait()
 	return maxHeight
 }
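getMaxPeerHeight is the same fan-out pattern finished with a mutex-guarded max reduction. A condensed, self-contained sketch of that shape (maxPeerValue and queryHeight are hypothetical; the real code guards maxHeight with ss.syncMux):

```go
func maxPeerValue(sc *SyncConfig, queryHeight func(*SyncPeerConfig) uint64) uint64 {
	var (
		mu        sync.Mutex
		maxHeight uint64
		wg        sync.WaitGroup
	)
	sc.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h := queryHeight(peerConfig)
			mu.Lock()
			if h > maxHeight {
				maxHeight = h
			}
			mu.Unlock()
		}()
		return false
	})
	wg.Wait()
	return maxHeight
}
```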