Fix out-of-range index
pull/692/head
Eugene Kim 6 years ago committed by GitHub
commit 5d19a8ebb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      api/service/syncing/errors.go
  2. 258
      api/service/syncing/syncing.go
  3. 124
      internal/ctxerror/ctxerror.go
  4. 363
      internal/ctxerror/ctxerror_test.go
  5. 125
      internal/ctxerror/mock/ctxerror.go
  6. 8
      node/node_syncing.go
  7. 2
      scripts/list_harmony_go_files.sh

@ -4,7 +4,6 @@ import "errors"
// Errors ... // Errors ...
var ( var (
ErrSyncPeerConfigClientNotReady = errors.New("[SYNC]: client is not ready")
ErrRegistrationFail = errors.New("[SYNC]: registration failed") ErrRegistrationFail = errors.New("[SYNC]: registration failed")
ErrGetBlock = errors.New("[SYNC]: get block failed") ErrGetBlock = errors.New("[SYNC]: get block failed")
ErrGetBlockHash = errors.New("[SYNC]: get blockhash failed") ErrGetBlockHash = errors.New("[SYNC]: get blockhash failed")

@ -16,6 +16,7 @@ import (
pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto" pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -55,9 +56,32 @@ type SyncBlockTask struct {
// SyncConfig contains an array of SyncPeerConfig. // SyncConfig contains an array of SyncPeerConfig.
type SyncConfig struct { type SyncConfig struct {
// mtx locks peers, and *SyncPeerConfig pointers in peers.
// SyncPeerConfig itself is guarded by its own mutex.
mtx sync.RWMutex
peers []*SyncPeerConfig peers []*SyncPeerConfig
} }
// AddPeer adds the given sync peer.
func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
sc.mtx.Lock()
defer sc.mtx.Unlock()
sc.peers = append(sc.peers, peer)
}
// ForEachPeer calls the given function with each peer.
// It breaks the iteration iff the function returns true.
func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
for _, peer := range sc.peers {
if f(peer) {
break
}
}
}
// CreateStateSync returns the implementation of StateSyncInterface interface. // CreateStateSync returns the implementation of StateSyncInterface interface.
func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync { func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync {
stateSync := &StateSync{} stateSync := &StateSync{}
@ -74,8 +98,6 @@ type StateSync struct {
selfip string selfip string
selfport string selfport string
selfPeerHash [20]byte // hash of ip and address combination selfPeerHash [20]byte // hash of ip and address combination
peerNumber int
activePeerNumber int
commonBlocks map[int]*types.Block commonBlocks map[int]*types.Block
lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus
syncConfig *SyncConfig syncConfig *SyncConfig
@ -91,25 +113,39 @@ func (ss *StateSync) AddLastMileBlock(block *types.Block) {
} }
// CloseConnections close grpc connections for state sync clients // CloseConnections close grpc connections for state sync clients
func (ss *StateSync) CloseConnections() { func (sc *SyncConfig) CloseConnections() {
for _, pc := range ss.syncConfig.peers { sc.mtx.RLock()
if pc.client != nil { defer sc.mtx.RUnlock()
for _, pc := range sc.peers {
pc.client.Close() pc.client.Close()
} }
}
// FindPeerByHash returns the peer with the given hash, or nil if not found.
func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
for _, pc := range sc.peers {
if bytes.Compare(pc.peerHash, peerHash) == 0 {
return pc
}
} }
return nil
} }
// AddNewBlock will add newly received block into state syncing queue // AddNewBlock will add newly received block into state syncing queue
func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) { func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) {
for i, pc := range ss.syncConfig.peers { pc := ss.syncConfig.FindPeerByHash(peerHash)
if bytes.Compare(pc.peerHash, peerHash) != 0 { if pc == nil {
continue // Received a block with no active peer; just ignore.
return
} }
// TODO ek – we shouldn't mess with SyncPeerConfig's mutex.
// Factor this into a method, like pc.AddNewBlock(block)
pc.mux.Lock() pc.mux.Lock()
defer pc.mux.Unlock()
pc.newBlocks = append(pc.newBlocks, block) pc.newBlocks = append(pc.newBlocks, block)
pc.mux.Unlock() utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(pc.newBlocks), "blockHeight", block.NumberU64())
utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(ss.syncConfig.peers[i].newBlocks), "blockHeight", block.NumberU64())
}
} }
// CreateTestSyncPeerConfig used for testing. // CreateTestSyncPeerConfig used for testing.
@ -138,9 +174,6 @@ func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) in
// GetBlocks gets blocks by calling grpc request to the corresponding peer. // GetBlocks gets blocks by calling grpc request to the corresponding peer.
func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) { func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
if peerConfig.client == nil {
return nil, ErrSyncPeerConfigClientNotReady
}
response := peerConfig.client.GetBlocks(hashes) response := peerConfig.client.GetBlocks(hashes)
if response == nil { if response == nil {
return nil, ErrGetBlock return nil, ErrGetBlock
@ -149,71 +182,55 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
} }
// CreateSyncConfig creates SyncConfig for StateSync object. // CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) bool { func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error {
utils.GetLogInstance().Debug("CreateSyncConfig: len of peers", "len", len(peers)) utils.GetLogInstance().Debug("CreateSyncConfig: len of peers", "len", len(peers))
if len(peers) == 0 { if len(peers) == 0 {
utils.GetLogInstance().Warn("[SYNC] Unable to get neighbor peers") return ctxerror.New("[SYNC] no peers to connect to")
return false
}
ss.peerNumber = len(peers)
ss.syncConfig = &SyncConfig{
peers: make([]*SyncPeerConfig, ss.peerNumber),
} }
for id := range ss.syncConfig.peers { ss.syncConfig = &SyncConfig{}
ss.syncConfig.peers[id] = &SyncPeerConfig{
ip: peers[id].IP,
port: peers[id].Port,
}
}
utils.GetLogInstance().Info("[SYNC] Finished creating SyncConfig")
return true
}
// MakeConnectionToPeers makes grpc connection to all peers.
func (ss *StateSync) MakeConnectionToPeers() {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(ss.peerNumber) for _, peer := range peers {
for id := range ss.syncConfig.peers { wg.Add(1)
go func(peerConfig *SyncPeerConfig) { go func(peer p2p.Peer) {
defer wg.Done() defer wg.Done()
peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port) client := downloader.ClientSetup(peer.IP, peer.Port)
}(ss.syncConfig.peers[id]) if client == nil {
return
}
peerConfig := &SyncPeerConfig{
ip: peer.IP,
port: peer.Port,
client: client,
}
ss.syncConfig.AddPeer(peerConfig)
}(peer)
} }
wg.Wait() wg.Wait()
ss.CleanUpNilPeers()
utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.") utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.")
return nil
} }
// GetActivePeerNumber returns the number of active peers // GetActivePeerNumber returns the number of active peers
func (ss *StateSync) GetActivePeerNumber() int { func (ss *StateSync) GetActivePeerNumber() int {
if ss.syncConfig == nil || len(ss.syncConfig.peers) == 0 { if ss.syncConfig == nil {
return 0 return 0
} }
ss.CleanUpNilPeers() // len() is atomic; no need to hold mutex.
return ss.activePeerNumber return len(ss.syncConfig.peers)
}
// CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber.
func (ss *StateSync) CleanUpNilPeers() {
ss.activePeerNumber = 0
for _, configPeer := range ss.syncConfig.peers {
if configPeer.client != nil {
ss.activePeerNumber++
}
}
utils.GetLogInstance().Info("[SYNC] clean up inactive peers", "activeNumber", ss.activePeerNumber)
} }
// GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group. // getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first. // Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) { // Caller shall ensure mtx is locked for reading.
func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively. // As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
curCount := 0 curCount := 0
curFirstID := -1 curFirstID := -1
maxCount := 0 maxCount := 0
maxFirstID := -1 maxFirstID := -1
for i := range syncConfig.peers { for i := range sc.peers {
if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 { if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
curCount = 1 curCount = 1
curFirstID = i curFirstID = i
} else { } else {
@ -228,40 +245,44 @@ func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) {
} }
// InitForTesting used for testing. // InitForTesting used for testing.
func (syncConfig *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) { func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
for i := range syncConfig.peers { sc.mtx.RLock()
syncConfig.peers[i].blockHashes = blockHashes defer sc.mtx.RUnlock()
syncConfig.peers[i].client = client for i := range sc.peers {
sc.peers[i].blockHashes = blockHashes
sc.peers[i].client = client
} }
} }
// CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes. // cleanUpPeers cleans up all peers whose blockHashes are not equal to
func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) { // consensus block hashes. Caller shall ensure mtx is locked for RW.
fixedPeer := syncConfig.peers[maxFirstID] func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
for i := 0; i < len(syncConfig.peers); i++ { fixedPeer := sc.peers[maxFirstID]
if CompareSyncPeerConfigByblockHashes(fixedPeer, syncConfig.peers[i]) != 0 { for i := 0; i < len(sc.peers); i++ {
if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 {
// TODO: move it into a util delete func. // TODO: move it into a util delete func.
// See tip https://github.com/golang/go/wiki/SliceTricks // See tip https://github.com/golang/go/wiki/SliceTricks
// Close the client and remove the peer out of the // Close the client and remove the peer out of the
syncConfig.peers[i].client.Close() sc.peers[i].client.Close()
copy(syncConfig.peers[i:], syncConfig.peers[i+1:]) copy(sc.peers[i:], sc.peers[i+1:])
syncConfig.peers[len(syncConfig.peers)-1] = nil sc.peers[len(sc.peers)-1] = nil
syncConfig.peers = syncConfig.peers[:len(syncConfig.peers)-1] sc.peers = sc.peers[:len(sc.peers)-1]
} }
} }
} }
// GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal. // GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal.
func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool { func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() bool {
sc.mtx.Lock()
defer sc.mtx.Unlock()
// Sort all peers by the blockHashes. // Sort all peers by the blockHashes.
sort.Slice(ss.syncConfig.peers, func(i, j int) bool { sort.Slice(sc.peers, func(i, j int) bool {
return CompareSyncPeerConfigByblockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1 return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
}) })
maxFirstID, maxCount := ss.syncConfig.GetHowManyMaxConsensus() maxFirstID, maxCount := sc.getHowManyMaxConsensus()
utils.GetLogInstance().Info("[SYNC] block consensus hashes", "maxFirstID", maxFirstID, "maxCount", maxCount) utils.GetLogInstance().Info("[SYNC] block consensus hashes", "maxFirstID", maxFirstID, "maxCount", maxCount)
if float64(maxCount) >= ConsensusRatio*float64(ss.activePeerNumber) { if float64(maxCount) >= ConsensusRatio*float64(len(sc.peers)) {
ss.syncConfig.CleanUpPeers(maxFirstID) sc.cleanUpPeers(maxFirstID)
ss.CleanUpNilPeers()
return true return true
} }
return false return false
@ -272,22 +293,20 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
count := 0 count := 0
for { for {
var wg sync.WaitGroup var wg sync.WaitGroup
for id := range ss.syncConfig.peers { ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
if ss.syncConfig.peers[id].client == nil {
continue
}
wg.Add(1) wg.Add(1)
go func(peerConfig *SyncPeerConfig) { go func() {
defer wg.Done() defer wg.Done()
response := peerConfig.client.GetBlockHashes(startHash) response := peerConfig.client.GetBlockHashes(startHash)
if response == nil { if response == nil {
return return
} }
peerConfig.blockHashes = response.Payload peerConfig.blockHashes = response.Payload
}(ss.syncConfig.peers[id]) }()
} return
})
wg.Wait() wg.Wait()
if ss.GetBlockHashesConsensusAndCleanUp() { if ss.syncConfig.GetBlockHashesConsensusAndCleanUp() {
break break
} }
if count > TimesToFail { if count > TimesToFail {
@ -303,14 +322,13 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) { func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
ss.stateSyncTaskQueue = queue.New(0) ss.stateSyncTaskQueue = queue.New(0)
for _, configPeer := range ss.syncConfig.peers { ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
if configPeer.client != nil {
for id, blockHash := range configPeer.blockHashes { for id, blockHash := range configPeer.blockHashes {
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
} }
break brk = true
} return
} })
utils.GetLogInstance().Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len()) utils.GetLogInstance().Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len())
} }
@ -318,13 +336,10 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
func (ss *StateSync) downloadBlocks(bc *core.BlockChain) { func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
// Initialize blockchain // Initialize blockchain
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(ss.activePeerNumber)
count := 0 count := 0
for i := range ss.syncConfig.peers { ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
if ss.syncConfig.peers[i].client == nil { wg.Add(1)
continue go func(stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
}
go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
defer wg.Done() defer wg.Done()
for !stateSyncTaskQueue.Empty() { for !stateSyncTaskQueue.Empty() {
task, err := ss.stateSyncTaskQueue.Poll(1, time.Millisecond) task, err := ss.stateSyncTaskQueue.Poll(1, time.Millisecond)
@ -361,8 +376,9 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
ss.commonBlocks[syncTask.index] = &blockObj ss.commonBlocks[syncTask.index] = &blockObj
ss.syncMux.Unlock() ss.syncMux.Unlock()
} }
}(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc) }(ss.stateSyncTaskQueue, bc)
} return
})
wg.Wait() wg.Wait()
utils.GetLogInstance().Info("[SYNC] Finished downloadBlocks.") utils.GetLogInstance().Info("[SYNC] Finished downloadBlocks.")
} }
@ -399,8 +415,7 @@ func GetHowManyMaxConsensus(blocks []*types.Block) (int, int) {
func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) *types.Block { func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) *types.Block {
candidateBlocks := []*types.Block{} candidateBlocks := []*types.Block{}
ss.syncMux.Lock() ss.syncMux.Lock()
for id := range ss.syncConfig.peers { ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
peerConfig := ss.syncConfig.peers[id]
for _, block := range peerConfig.newBlocks { for _, block := range peerConfig.newBlocks {
ph := block.ParentHash() ph := block.ParentHash()
if bytes.Compare(ph[:], parentHash[:]) == 0 { if bytes.Compare(ph[:], parentHash[:]) == 0 {
@ -408,7 +423,8 @@ func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash)
break break
} }
} }
} return
})
ss.syncMux.Unlock() ss.syncMux.Unlock()
if len(candidateBlocks) == 0 { if len(candidateBlocks) == 0 {
return nil return nil
@ -488,10 +504,13 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
} }
parentHash = block.Hash() parentHash = block.Hash()
} }
// TODO ek – Do we need to hold syncMux now that syncConfig has its onw
// mutex?
ss.syncMux.Lock() ss.syncMux.Lock()
for id := range ss.syncConfig.peers { ss.syncConfig.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) {
ss.syncConfig.peers[id].newBlocks = []*types.Block{} peer.newBlocks = []*types.Block{}
} return
})
ss.syncMux.Unlock() ss.syncMux.Unlock()
// update last mile blocks if any // update last mile blocks if any
@ -538,42 +557,40 @@ func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port
// RegisterNodeInfo will register node to peers to accept future new block broadcasting // RegisterNodeInfo will register node to peers to accept future new block broadcasting
// return number of successfull registration // return number of successfull registration
func (ss *StateSync) RegisterNodeInfo() int { func (ss *StateSync) RegisterNodeInfo() int {
ss.CleanUpNilPeers()
registrationNumber := RegistrationNumber registrationNumber := RegistrationNumber
utils.GetLogInstance().Debug("[SYNC] node registration to peers", "registrationNumber", registrationNumber, "activePeerNumber", ss.activePeerNumber) utils.GetLogInstance().Debug("[SYNC] node registration to peers",
"registrationNumber", registrationNumber,
"activePeerNumber", len(ss.syncConfig.peers))
count := 0 count := 0
for id := range ss.syncConfig.peers { ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
peerConfig := ss.syncConfig.peers[id]
if count >= registrationNumber { if count >= registrationNumber {
break brk = true
return
} }
if peerConfig.ip == ss.selfip && peerConfig.port == GetSyncingPort(ss.selfport) { if peerConfig.ip == ss.selfip && peerConfig.port == GetSyncingPort(ss.selfport) {
utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport)) utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport))
continue return
}
if peerConfig.client == nil {
continue
} }
err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport) err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport)
if err != nil { if err != nil {
utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash) utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash)
continue return
} }
utils.GetLogInstance().Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port) utils.GetLogInstance().Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port)
count++ count++
} return
})
return count return count
} }
// getMaxPeerHeight gets the maximum blockchain heights from peers // getMaxPeerHeight gets the maximum blockchain heights from peers
func (ss *StateSync) getMaxPeerHeight() uint64 { func (ss *StateSync) getMaxPeerHeight() uint64 {
ss.CleanUpNilPeers()
maxHeight := uint64(0) maxHeight := uint64(0)
var wg sync.WaitGroup var wg sync.WaitGroup
for id := range ss.syncConfig.peers { ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
wg.Add(1) wg.Add(1)
go func(peerConfig *SyncPeerConfig) { go func() {
defer wg.Done() defer wg.Done()
response := peerConfig.client.GetBlockChainHeight() response := peerConfig.client.GetBlockChainHeight()
ss.syncMux.Lock() ss.syncMux.Lock()
@ -581,8 +598,9 @@ func (ss *StateSync) getMaxPeerHeight() uint64 {
maxHeight = response.BlockHeight maxHeight = response.BlockHeight
} }
ss.syncMux.Unlock() ss.syncMux.Unlock()
}(ss.syncConfig.peers[id]) }()
} return
})
wg.Wait() wg.Wait()
return maxHeight return maxHeight
} }

@ -0,0 +1,124 @@
// Package ctxerror provides a context-aware error facility.
//
// Inspired by log15-style (semi-)structured logging,
// it also provides a log15 bridge.
package ctxerror
//go:generate mockgen -source ctxerror.go -destination mock/ctxerror.go
import (
"fmt"
)
// CtxError is a context-aware error container.
type CtxError interface {
// Error returns a fully formatted message, with context info.
Error() string
// Message returns the bare error message, without context info.
Message() string
// Contexts returns message contexts.
// Caller shall not modify the returned map.
Contexts() map[string]interface{}
// WithCause chains an error after the receiver.
// It returns the merged/chained instance,
// where the message is "<receiver.Message>: <c.Message>",
// and with contexts merged (ones in c takes precedence).
WithCause(c error) CtxError
}
type ctxError struct {
msg string
ctx map[string]interface{}
}
// New creates and returns a new context-aware error.
func New(msg string, ctx ...interface{}) CtxError {
e := &ctxError{msg: msg, ctx: make(map[string]interface{})}
e.updateCtx(ctx...)
return e
}
func (e *ctxError) updateCtx(ctx ...interface{}) {
var name string
if len(ctx)%2 == 1 {
ctx = append(ctx, nil)
}
for idx, value := range ctx {
if idx%2 == 0 {
name = value.(string)
} else {
e.ctx[name] = value
}
}
}
// Error returns a fully formatted message, with context info.
func (e *ctxError) Error() string {
s := e.msg
for k, v := range e.ctx {
s += fmt.Sprintf(", %s=%#v", k, v)
}
return s
}
// Message returns the bare error message, without context info.
func (e *ctxError) Message() string {
return e.msg
}
// Contexts returns message contexts.
// Caller shall not modify the returned map.
func (e *ctxError) Contexts() map[string]interface{} {
return e.ctx
}
// WithCause chains an error after the receiver.
// It returns the merged/chained instance,
// where the message is “<receiver.Message>: <c.Message>”,
// and with contexts merged (ones in c takes precedence).
func (e *ctxError) WithCause(c error) CtxError {
r := &ctxError{msg: e.msg + ": ", ctx: make(map[string]interface{})}
for k, v := range e.ctx {
r.ctx[k] = v
}
switch c := c.(type) {
case *ctxError:
r.msg += c.msg
for k, v := range c.ctx {
r.ctx[k] = v
}
default:
r.msg += c.Error()
}
return r
}
// Log15Func is a log15-compatible logging function.
type Log15Func func(msg string, ctx ...interface{})
// Log15Logger logs something with a log15-style logging function.
type Log15Logger interface {
Log15(f Log15Func)
}
// Log15 logs the receiver with a log15-style logging function.
func (e *ctxError) Log15(f Log15Func) {
var ctx []interface{}
for k, v := range e.ctx {
ctx = append(ctx, k, v)
}
f(e.msg, ctx...)
}
// Log15 logs an error with a log15-style logging function.
// It handles both regular errors and Log15Logger-compliant errors.
func Log15(f Log15Func, e error) {
if e15, ok := e.(Log15Logger); ok {
e15.Log15(f)
} else {
f(e.Error())
}
}

@ -0,0 +1,363 @@
package ctxerror
import (
"errors"
"reflect"
"testing"
)
func TestNew(t *testing.T) {
type args struct {
msg string
ctx []interface{}
}
tests := []struct {
name string
args args
want CtxError
}{
{
name: "Empty",
args: args{msg: "", ctx: []interface{}{}},
want: &ctxError{msg: "", ctx: map[string]interface{}{}},
},
{
name: "Regular",
args: args{msg: "omg", ctx: []interface{}{"wtf", 1, "bbq", 2}},
want: &ctxError{msg: "omg", ctx: map[string]interface{}{"wtf": 1, "bbq": 2}},
},
{
name: "Truncated",
args: args{
msg: "omg",
ctx: []interface{}{"wtf", 1, "bbq" /* missing value... */},
},
want: &ctxError{
msg: "omg",
ctx: map[string]interface{}{"wtf": 1, "bbq": /* becomes */ nil},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := New(tt.args.msg, tt.args.ctx...); !reflect.DeepEqual(got, tt.want) {
t.Errorf("New() = %#v, want %#v", got, tt.want)
}
})
}
}
func Test_ctxError_updateCtx(t *testing.T) {
tests := []struct {
name string
before, after map[string]interface{}
delta []interface{}
}{
{
name: "Empty",
before: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3},
delta: []interface{}{},
after: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3},
},
{
name: "Regular",
before: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3},
delta: []interface{}{"omg", 10, "wtf", 20},
after: map[string]interface{}{"omg": 10, "wtf": 20, "bbq": 3},
},
{
name: "Truncated",
before: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3},
delta: []interface{}{"omg", 10, "wtf" /* missing value... */},
after: map[string]interface{}{"omg": 10, "wtf": /* becomes */ nil, "bbq": 3},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &ctxError{msg: tt.name, ctx: tt.before}
e.updateCtx(tt.delta...)
if !reflect.DeepEqual(e.ctx, tt.after) {
t.Errorf("expected ctx %#v != %#v seen", tt.after, e.ctx)
}
})
}
}
func Test_ctxError_Error(t *testing.T) {
type fields struct {
msg string
ctx map[string]interface{}
}
tests := []struct {
name string
fields fields
want string
}{
{
name: "AllEmpty",
fields: fields{msg: "", ctx: map[string]interface{}{}},
want: "",
},
{
name: "CtxEmpty",
fields: fields{msg: "omg", ctx: map[string]interface{}{}},
want: "omg",
},
{
name: "MsgEmpty",
fields: fields{msg: "", ctx: map[string]interface{}{"wtf": "bbq"}},
want: ", wtf=\"bbq\"",
},
{
name: "Regular",
fields: fields{msg: "omg", ctx: map[string]interface{}{"wtf": "bbq"}},
want: "omg, wtf=\"bbq\"",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &ctxError{
msg: tt.fields.msg,
ctx: tt.fields.ctx,
}
if got := e.Error(); got != tt.want {
t.Errorf("Error() = %#v, want %#v", got, tt.want)
}
})
}
}
func Test_ctxError_Message(t *testing.T) {
type fields struct {
msg string
ctx map[string]interface{}
}
tests := []struct {
name string
fields fields
want string
}{
{
name: "AllEmpty",
fields: fields{msg: "", ctx: map[string]interface{}{}},
want: "",
},
{
name: "CtxEmpty",
fields: fields{msg: "omg", ctx: map[string]interface{}{}},
want: "omg",
},
{
name: "MsgEmpty",
fields: fields{msg: "", ctx: map[string]interface{}{"wtf": "bbq"}},
want: "",
},
{
name: "Regular",
fields: fields{msg: "omg", ctx: map[string]interface{}{"wtf": "bbq"}},
want: "omg",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &ctxError{
msg: tt.fields.msg,
ctx: tt.fields.ctx,
}
if got := e.Message(); got != tt.want {
t.Errorf("Message() = %#v, want %#v", got, tt.want)
}
})
}
}
func Test_ctxError_Contexts(t *testing.T) {
type fields struct {
msg string
ctx map[string]interface{}
}
tests := []struct {
name string
fields fields
want map[string]interface{}
}{
{
name: "Empty",
fields: fields{msg: "", ctx: map[string]interface{}{}},
want: map[string]interface{}{},
},
{
name: "Regular",
fields: fields{
msg: "",
ctx: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3},
},
want: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &ctxError{
msg: tt.fields.msg,
ctx: tt.fields.ctx,
}
if got := e.Contexts(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Contexts() = %#v, want %#v", got, tt.want)
}
})
}
}
func Test_ctxError_WithCause(t *testing.T) {
type fields struct {
msg string
ctx map[string]interface{}
}
type args struct {
c error
}
tests := []struct {
name string
fields fields
args args
want CtxError
}{
{
name: "CtxError",
fields: fields{
msg: "hello",
ctx: map[string]interface{}{"omg": 1, "wtf": 2},
},
args: args{c: &ctxError{
msg: "world",
ctx: map[string]interface{}{"wtf": 20, "bbq": 30},
}},
want: &ctxError{
msg: "hello: world",
ctx: map[string]interface{}{"omg": 1, "wtf": 20, "bbq": 30},
},
},
{
name: "RegularError",
fields: fields{
msg: "hello",
ctx: map[string]interface{}{"omg": 1, "wtf": 2},
},
args: args{c: errors.New("world")},
want: &ctxError{
msg: "hello: world",
ctx: map[string]interface{}{"omg": 1, "wtf": 2},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &ctxError{
msg: tt.fields.msg,
ctx: tt.fields.ctx,
}
if got := e.WithCause(tt.args.c); !reflect.DeepEqual(got, tt.want) {
t.Errorf("WithCause() = %#v, want %#v", got, tt.want)
}
})
}
}
func Test_ctxError_Log15(t *testing.T) {
type fields struct {
msg string
ctx map[string]interface{}
}
type want struct {
msg string
ctx []interface{}
}
tests := []struct {
name string
fields fields
want want
}{
{
name: "Empty",
fields: fields{msg: "", ctx: map[string]interface{}{}},
want: want{msg: "", ctx: nil},
},
{
name: "Regular",
fields: fields{msg: "hello", ctx: map[string]interface{}{"omg": 1}},
want: want{msg: "hello", ctx: []interface{}{"omg", 1}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
called := false
f := func(msg string, ctx ...interface{}) {
called = true
if msg != tt.want.msg {
t.Errorf("expected message %#v != %#v seen",
tt.want.msg, msg)
}
if !reflect.DeepEqual(ctx, tt.want.ctx) {
t.Errorf("expected ctx %#v != %#v seen", ctx, tt.want.ctx)
}
}
e := &ctxError{
msg: tt.fields.msg,
ctx: tt.fields.ctx,
}
e.Log15(f)
if !called {
t.Error("logging func not called")
}
})
}
}
func TestLog15(t *testing.T) {
type args struct {
e error
}
type want struct {
msg string
ctx []interface{}
}
tests := []struct {
name string
args args
want want
}{
{
name: "Regular",
args: args{e: errors.New("hello")},
want: want{msg: "hello", ctx: nil},
},
{
name: "CtxError",
args: args{e: &ctxError{
msg: "hello",
ctx: map[string]interface{}{"omg": 1},
}},
want: want{msg: "hello", ctx: []interface{}{"omg", 1}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
called := false
f := func(msg string, ctx ...interface{}) {
called = true
if msg != tt.want.msg {
t.Errorf("expected message %#v != %#v seen",
tt.want.msg, msg)
}
if !reflect.DeepEqual(ctx, tt.want.ctx) {
t.Errorf("expected ctx %#v != %#v seen",
tt.want.ctx, ctx)
}
}
Log15(f, tt.args.e)
if !called {
t.Errorf("logging func not called")
}
})
}
}

@ -0,0 +1,125 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: ctxerror.go
// Package mock_ctxerror is a generated GoMock package.
package mock_ctxerror
import (
gomock "github.com/golang/mock/gomock"
ctxerror "github.com/harmony-one/harmony/internal/ctxerror"
reflect "reflect"
)
// MockCtxError is a mock of CtxError interface
type MockCtxError struct {
ctrl *gomock.Controller
recorder *MockCtxErrorMockRecorder
}
// MockCtxErrorMockRecorder is the mock recorder for MockCtxError
type MockCtxErrorMockRecorder struct {
mock *MockCtxError
}
// NewMockCtxError creates a new mock instance
func NewMockCtxError(ctrl *gomock.Controller) *MockCtxError {
mock := &MockCtxError{ctrl: ctrl}
mock.recorder = &MockCtxErrorMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockCtxError) EXPECT() *MockCtxErrorMockRecorder {
return m.recorder
}
// Error mocks base method
func (m *MockCtxError) Error() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Error")
ret0, _ := ret[0].(string)
return ret0
}
// Error indicates an expected call of Error
func (mr *MockCtxErrorMockRecorder) Error() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockCtxError)(nil).Error))
}
// Message mocks base method
func (m *MockCtxError) Message() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Message")
ret0, _ := ret[0].(string)
return ret0
}
// Message indicates an expected call of Message
func (mr *MockCtxErrorMockRecorder) Message() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Message", reflect.TypeOf((*MockCtxError)(nil).Message))
}
// Contexts mocks base method
func (m *MockCtxError) Contexts() map[string]interface{} {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Contexts")
ret0, _ := ret[0].(map[string]interface{})
return ret0
}
// Contexts indicates an expected call of Contexts
func (mr *MockCtxErrorMockRecorder) Contexts() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Contexts", reflect.TypeOf((*MockCtxError)(nil).Contexts))
}
// WithCause mocks base method
func (m *MockCtxError) WithCause(c error) ctxerror.CtxError {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WithCause", c)
ret0, _ := ret[0].(ctxerror.CtxError)
return ret0
}
// WithCause indicates an expected call of WithCause
func (mr *MockCtxErrorMockRecorder) WithCause(c interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithCause", reflect.TypeOf((*MockCtxError)(nil).WithCause), c)
}
// MockLog15Logger is a mock of Log15Logger interface
type MockLog15Logger struct {
ctrl *gomock.Controller
recorder *MockLog15LoggerMockRecorder
}
// MockLog15LoggerMockRecorder is the mock recorder for MockLog15Logger
type MockLog15LoggerMockRecorder struct {
mock *MockLog15Logger
}
// NewMockLog15Logger creates a new mock instance
func NewMockLog15Logger(ctrl *gomock.Controller) *MockLog15Logger {
mock := &MockLog15Logger{ctrl: ctrl}
mock.recorder = &MockLog15LoggerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockLog15Logger) EXPECT() *MockLog15LoggerMockRecorder {
return m.recorder
}
// Log15 mocks base method
func (m *MockLog15Logger) Log15(f ctxerror.Log15Func) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Log15", f)
}
// Log15 indicates an expected call of Log15
func (mr *MockLog15LoggerMockRecorder) Log15(f interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Log15", reflect.TypeOf((*MockLog15Logger)(nil).Log15), f)
}

@ -13,6 +13,7 @@ import (
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -61,10 +62,9 @@ SyncingLoop:
node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
} }
if node.stateSync.GetActivePeerNumber() == 0 { if node.stateSync.GetActivePeerNumber() == 0 {
if node.stateSync.CreateSyncConfig(getPeers()) { peers := getPeers()
node.stateSync.MakeConnectionToPeers() if err := node.stateSync.CreateSyncConfig(peers); err != nil {
} else { ctxerror.Log15(utils.GetLogInstance().Debug, err)
utils.GetLogInstance().Debug("[SYNC] no active peers, continue SyncingLoop")
continue SyncingLoop continue SyncingLoop
} }
} }

@ -4,5 +4,5 @@ exec git ls-files '*.go' | grep -v \
-e '\.pb\.go$' \ -e '\.pb\.go$' \
-e '/mock_stream\.go' \ -e '/mock_stream\.go' \
-e '/host_mock\.go' \ -e '/host_mock\.go' \
-e '^p2p/host/hostv2/mock/' \ -e '/mock/[^/]*\.go' \
-e '/gen_[^/]*\.go' -e '/gen_[^/]*\.go'

Loading…
Cancel
Save