diff --git a/api/service/syncing/errors.go b/api/service/syncing/errors.go index 02221c12c..a70193583 100644 --- a/api/service/syncing/errors.go +++ b/api/service/syncing/errors.go @@ -4,8 +4,7 @@ import "errors" // Errors ... var ( - ErrSyncPeerConfigClientNotReady = errors.New("[SYNC]: client is not ready") - ErrRegistrationFail = errors.New("[SYNC]: registration failed") - ErrGetBlock = errors.New("[SYNC]: get block failed") - ErrGetBlockHash = errors.New("[SYNC]: get blockhash failed") + ErrRegistrationFail = errors.New("[SYNC]: registration failed") + ErrGetBlock = errors.New("[SYNC]: get block failed") + ErrGetBlockHash = errors.New("[SYNC]: get blockhash failed") ) diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index 566424a1a..ba9086fe3 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -16,6 +16,7 @@ import ( pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" @@ -55,9 +56,32 @@ type SyncBlockTask struct { // SyncConfig contains an array of SyncPeerConfig. type SyncConfig struct { + // mtx locks peers, and *SyncPeerConfig pointers in peers. + // SyncPeerConfig itself is guarded by its own mutex. + mtx sync.RWMutex + peers []*SyncPeerConfig } +// AddPeer adds the given sync peer. +func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) { + sc.mtx.Lock() + defer sc.mtx.Unlock() + sc.peers = append(sc.peers, peer) +} + +// ForEachPeer calls the given function with each peer. +// It breaks the iteration iff the function returns true. +func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) { + sc.mtx.RLock() + defer sc.mtx.RUnlock() + for _, peer := range sc.peers { + if f(peer) { + break + } + } +} + // CreateStateSync returns the implementation of StateSyncInterface interface. func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync { stateSync := &StateSync{} @@ -74,8 +98,6 @@ type StateSync struct { selfip string selfport string selfPeerHash [20]byte // hash of ip and address combination - peerNumber int - activePeerNumber int commonBlocks map[int]*types.Block lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus syncConfig *SyncConfig @@ -91,25 +113,39 @@ func (ss *StateSync) AddLastMileBlock(block *types.Block) { } // CloseConnections close grpc connections for state sync clients -func (ss *StateSync) CloseConnections() { - for _, pc := range ss.syncConfig.peers { - if pc.client != nil { - pc.client.Close() +func (sc *SyncConfig) CloseConnections() { + sc.mtx.RLock() + defer sc.mtx.RUnlock() + for _, pc := range sc.peers { + pc.client.Close() + } +} + +// FindPeerByHash returns the peer with the given hash, or nil if not found. +func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig { + sc.mtx.RLock() + defer sc.mtx.RUnlock() + for _, pc := range sc.peers { + if bytes.Compare(pc.peerHash, peerHash) == 0 { + return pc } } + return nil } // AddNewBlock will add newly received block into state syncing queue func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) { - for i, pc := range ss.syncConfig.peers { - if bytes.Compare(pc.peerHash, peerHash) != 0 { - continue - } - pc.mux.Lock() - pc.newBlocks = append(pc.newBlocks, block) - pc.mux.Unlock() - utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(ss.syncConfig.peers[i].newBlocks), "blockHeight", block.NumberU64()) + pc := ss.syncConfig.FindPeerByHash(peerHash) + if pc == nil { + // Received a block with no active peer; just ignore. + return } + // TODO ek – we shouldn't mess with SyncPeerConfig's mutex. + // Factor this into a method, like pc.AddNewBlock(block) + pc.mux.Lock() + defer pc.mux.Unlock() + pc.newBlocks = append(pc.newBlocks, block) + utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(pc.newBlocks), "blockHeight", block.NumberU64()) } // CreateTestSyncPeerConfig used for testing. @@ -138,9 +174,6 @@ func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) in // GetBlocks gets blocks by calling grpc request to the corresponding peer. func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) { - if peerConfig.client == nil { - return nil, ErrSyncPeerConfigClientNotReady - } response := peerConfig.client.GetBlocks(hashes) if response == nil { return nil, ErrGetBlock @@ -149,71 +182,55 @@ func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) { } // CreateSyncConfig creates SyncConfig for StateSync object. -func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) bool { +func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error { utils.GetLogInstance().Debug("CreateSyncConfig: len of peers", "len", len(peers)) if len(peers) == 0 { - utils.GetLogInstance().Warn("[SYNC] Unable to get neighbor peers") - return false + return ctxerror.New("[SYNC] no peers to connect to") } - ss.peerNumber = len(peers) - ss.syncConfig = &SyncConfig{ - peers: make([]*SyncPeerConfig, ss.peerNumber), - } - for id := range ss.syncConfig.peers { - ss.syncConfig.peers[id] = &SyncPeerConfig{ - ip: peers[id].IP, - port: peers[id].Port, - } - } - utils.GetLogInstance().Info("[SYNC] Finished creating SyncConfig") - return true -} - -// MakeConnectionToPeers makes grpc connection to all peers. -func (ss *StateSync) MakeConnectionToPeers() { + ss.syncConfig = &SyncConfig{} var wg sync.WaitGroup - wg.Add(ss.peerNumber) - for id := range ss.syncConfig.peers { - go func(peerConfig *SyncPeerConfig) { + for _, peer := range peers { + wg.Add(1) + go func(peer p2p.Peer) { defer wg.Done() - peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port) - }(ss.syncConfig.peers[id]) + client := downloader.ClientSetup(peer.IP, peer.Port) + if client == nil { + return + } + peerConfig := &SyncPeerConfig{ + ip: peer.IP, + port: peer.Port, + client: client, + } + ss.syncConfig.AddPeer(peerConfig) + }(peer) } wg.Wait() - ss.CleanUpNilPeers() utils.GetLogInstance().Info("[SYNC] Finished making connection to peers.") + + return nil } // GetActivePeerNumber returns the number of active peers func (ss *StateSync) GetActivePeerNumber() int { - if ss.syncConfig == nil || len(ss.syncConfig.peers) == 0 { + if ss.syncConfig == nil { return 0 } - ss.CleanUpNilPeers() - return ss.activePeerNumber -} - -// CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber. -func (ss *StateSync) CleanUpNilPeers() { - ss.activePeerNumber = 0 - for _, configPeer := range ss.syncConfig.peers { - if configPeer.client != nil { - ss.activePeerNumber++ - } - } - utils.GetLogInstance().Info("[SYNC] clean up inactive peers", "activeNumber", ss.activePeerNumber) + // len() is atomic; no need to hold mutex. + return len(ss.syncConfig.peers) } -// GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group. +// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group. // Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first. -func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) { +// Caller shall ensure mtx is locked for reading. +func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) { // As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively. curCount := 0 curFirstID := -1 maxCount := 0 maxFirstID := -1 - for i := range syncConfig.peers { - if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 { + for i := range sc.peers { + if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 { curCount = 1 curFirstID = i } else { @@ -228,40 +245,44 @@ func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) { } // InitForTesting used for testing. -func (syncConfig *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) { - for i := range syncConfig.peers { - syncConfig.peers[i].blockHashes = blockHashes - syncConfig.peers[i].client = client +func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) { + sc.mtx.RLock() + defer sc.mtx.RUnlock() + for i := range sc.peers { + sc.peers[i].blockHashes = blockHashes + sc.peers[i].client = client } } -// CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes. -func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) { - fixedPeer := syncConfig.peers[maxFirstID] - for i := 0; i < len(syncConfig.peers); i++ { - if CompareSyncPeerConfigByblockHashes(fixedPeer, syncConfig.peers[i]) != 0 { +// cleanUpPeers cleans up all peers whose blockHashes are not equal to +// consensus block hashes. Caller shall ensure mtx is locked for RW. +func (sc *SyncConfig) cleanUpPeers(maxFirstID int) { + fixedPeer := sc.peers[maxFirstID] + for i := 0; i < len(sc.peers); i++ { + if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 { // TODO: move it into a util delete func. // See tip https://github.com/golang/go/wiki/SliceTricks // Close the client and remove the peer out of the - syncConfig.peers[i].client.Close() - copy(syncConfig.peers[i:], syncConfig.peers[i+1:]) - syncConfig.peers[len(syncConfig.peers)-1] = nil - syncConfig.peers = syncConfig.peers[:len(syncConfig.peers)-1] + sc.peers[i].client.Close() + copy(sc.peers[i:], sc.peers[i+1:]) + sc.peers[len(sc.peers)-1] = nil + sc.peers = sc.peers[:len(sc.peers)-1] } } } // GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal. -func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool { +func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() bool { + sc.mtx.Lock() + defer sc.mtx.Unlock() // Sort all peers by the blockHashes. - sort.Slice(ss.syncConfig.peers, func(i, j int) bool { - return CompareSyncPeerConfigByblockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1 + sort.Slice(sc.peers, func(i, j int) bool { + return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1 }) - maxFirstID, maxCount := ss.syncConfig.GetHowManyMaxConsensus() + maxFirstID, maxCount := sc.getHowManyMaxConsensus() utils.GetLogInstance().Info("[SYNC] block consensus hashes", "maxFirstID", maxFirstID, "maxCount", maxCount) - if float64(maxCount) >= ConsensusRatio*float64(ss.activePeerNumber) { - ss.syncConfig.CleanUpPeers(maxFirstID) - ss.CleanUpNilPeers() + if float64(maxCount) >= ConsensusRatio*float64(len(sc.peers)) { + sc.cleanUpPeers(maxFirstID) return true } return false @@ -272,22 +293,20 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool { count := 0 for { var wg sync.WaitGroup - for id := range ss.syncConfig.peers { - if ss.syncConfig.peers[id].client == nil { - continue - } + ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) { wg.Add(1) - go func(peerConfig *SyncPeerConfig) { + go func() { defer wg.Done() response := peerConfig.client.GetBlockHashes(startHash) if response == nil { return } peerConfig.blockHashes = response.Payload - }(ss.syncConfig.peers[id]) - } + }() + return + }) wg.Wait() - if ss.GetBlockHashesConsensusAndCleanUp() { + if ss.syncConfig.GetBlockHashesConsensusAndCleanUp() { break } if count > TimesToFail { @@ -303,14 +322,13 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool { func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) { ss.stateSyncTaskQueue = queue.New(0) - for _, configPeer := range ss.syncConfig.peers { - if configPeer.client != nil { - for id, blockHash := range configPeer.blockHashes { - ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) - } - break + ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) { + for id, blockHash := range configPeer.blockHashes { + ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) } - } + brk = true + return + }) utils.GetLogInstance().Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len()) } @@ -318,13 +336,10 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) { func (ss *StateSync) downloadBlocks(bc *core.BlockChain) { // Initialize blockchain var wg sync.WaitGroup - wg.Add(ss.activePeerNumber) count := 0 - for i := range ss.syncConfig.peers { - if ss.syncConfig.peers[i].client == nil { - continue - } - go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) { + ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) { + wg.Add(1) + go func(stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) { defer wg.Done() for !stateSyncTaskQueue.Empty() { task, err := ss.stateSyncTaskQueue.Poll(1, time.Millisecond) @@ -361,8 +376,9 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) { ss.commonBlocks[syncTask.index] = &blockObj ss.syncMux.Unlock() } - }(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc) - } + }(ss.stateSyncTaskQueue, bc) + return + }) wg.Wait() utils.GetLogInstance().Info("[SYNC] Finished downloadBlocks.") } @@ -399,8 +415,7 @@ func GetHowManyMaxConsensus(blocks []*types.Block) (int, int) { func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) *types.Block { candidateBlocks := []*types.Block{} ss.syncMux.Lock() - for id := range ss.syncConfig.peers { - peerConfig := ss.syncConfig.peers[id] + ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) { for _, block := range peerConfig.newBlocks { ph := block.ParentHash() if bytes.Compare(ph[:], parentHash[:]) == 0 { @@ -408,7 +423,8 @@ func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) break } } - } + return + }) ss.syncMux.Unlock() if len(candidateBlocks) == 0 { return nil @@ -488,10 +504,13 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker } parentHash = block.Hash() } + // TODO ek – Do we need to hold syncMux now that syncConfig has its onw + // mutex? ss.syncMux.Lock() - for id := range ss.syncConfig.peers { - ss.syncConfig.peers[id].newBlocks = []*types.Block{} - } + ss.syncConfig.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) { + peer.newBlocks = []*types.Block{} + return + }) ss.syncMux.Unlock() // update last mile blocks if any @@ -538,42 +557,40 @@ func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port // RegisterNodeInfo will register node to peers to accept future new block broadcasting // return number of successfull registration func (ss *StateSync) RegisterNodeInfo() int { - ss.CleanUpNilPeers() registrationNumber := RegistrationNumber - utils.GetLogInstance().Debug("[SYNC] node registration to peers", "registrationNumber", registrationNumber, "activePeerNumber", ss.activePeerNumber) + utils.GetLogInstance().Debug("[SYNC] node registration to peers", + "registrationNumber", registrationNumber, + "activePeerNumber", len(ss.syncConfig.peers)) count := 0 - for id := range ss.syncConfig.peers { - peerConfig := ss.syncConfig.peers[id] + ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) { if count >= registrationNumber { - break + brk = true + return } if peerConfig.ip == ss.selfip && peerConfig.port == GetSyncingPort(ss.selfport) { utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport)) - continue - } - if peerConfig.client == nil { - continue + return } err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport) if err != nil { utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash) - continue + return } utils.GetLogInstance().Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port) count++ - } + return + }) return count } // getMaxPeerHeight gets the maximum blockchain heights from peers func (ss *StateSync) getMaxPeerHeight() uint64 { - ss.CleanUpNilPeers() maxHeight := uint64(0) var wg sync.WaitGroup - for id := range ss.syncConfig.peers { + ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) { wg.Add(1) - go func(peerConfig *SyncPeerConfig) { + go func() { defer wg.Done() response := peerConfig.client.GetBlockChainHeight() ss.syncMux.Lock() @@ -581,8 +598,9 @@ func (ss *StateSync) getMaxPeerHeight() uint64 { maxHeight = response.BlockHeight } ss.syncMux.Unlock() - }(ss.syncConfig.peers[id]) - } + }() + return + }) wg.Wait() return maxHeight } diff --git a/internal/ctxerror/ctxerror.go b/internal/ctxerror/ctxerror.go new file mode 100644 index 000000000..73dc57563 --- /dev/null +++ b/internal/ctxerror/ctxerror.go @@ -0,0 +1,124 @@ +// Package ctxerror provides a context-aware error facility. +// +// Inspired by log15-style (semi-)structured logging, +// it also provides a log15 bridge. +package ctxerror + +//go:generate mockgen -source ctxerror.go -destination mock/ctxerror.go + +import ( + "fmt" +) + +// CtxError is a context-aware error container. +type CtxError interface { + // Error returns a fully formatted message, with context info. + Error() string + + // Message returns the bare error message, without context info. + Message() string + + // Contexts returns message contexts. + // Caller shall not modify the returned map. + Contexts() map[string]interface{} + + // WithCause chains an error after the receiver. + // It returns the merged/chained instance, + // where the message is ": ", + // and with contexts merged (ones in c takes precedence). + WithCause(c error) CtxError +} + +type ctxError struct { + msg string + ctx map[string]interface{} +} + +// New creates and returns a new context-aware error. +func New(msg string, ctx ...interface{}) CtxError { + e := &ctxError{msg: msg, ctx: make(map[string]interface{})} + e.updateCtx(ctx...) + return e +} + +func (e *ctxError) updateCtx(ctx ...interface{}) { + var name string + if len(ctx)%2 == 1 { + ctx = append(ctx, nil) + } + for idx, value := range ctx { + if idx%2 == 0 { + name = value.(string) + } else { + e.ctx[name] = value + } + } +} + +// Error returns a fully formatted message, with context info. +func (e *ctxError) Error() string { + s := e.msg + for k, v := range e.ctx { + s += fmt.Sprintf(", %s=%#v", k, v) + } + return s +} + +// Message returns the bare error message, without context info. +func (e *ctxError) Message() string { + return e.msg +} + +// Contexts returns message contexts. +// Caller shall not modify the returned map. +func (e *ctxError) Contexts() map[string]interface{} { + return e.ctx +} + +// WithCause chains an error after the receiver. +// It returns the merged/chained instance, +// where the message is “: ”, +// and with contexts merged (ones in c takes precedence). +func (e *ctxError) WithCause(c error) CtxError { + r := &ctxError{msg: e.msg + ": ", ctx: make(map[string]interface{})} + for k, v := range e.ctx { + r.ctx[k] = v + } + switch c := c.(type) { + case *ctxError: + r.msg += c.msg + for k, v := range c.ctx { + r.ctx[k] = v + } + default: + r.msg += c.Error() + } + return r +} + +// Log15Func is a log15-compatible logging function. +type Log15Func func(msg string, ctx ...interface{}) + +// Log15Logger logs something with a log15-style logging function. +type Log15Logger interface { + Log15(f Log15Func) +} + +// Log15 logs the receiver with a log15-style logging function. +func (e *ctxError) Log15(f Log15Func) { + var ctx []interface{} + for k, v := range e.ctx { + ctx = append(ctx, k, v) + } + f(e.msg, ctx...) +} + +// Log15 logs an error with a log15-style logging function. +// It handles both regular errors and Log15Logger-compliant errors. +func Log15(f Log15Func, e error) { + if e15, ok := e.(Log15Logger); ok { + e15.Log15(f) + } else { + f(e.Error()) + } +} diff --git a/internal/ctxerror/ctxerror_test.go b/internal/ctxerror/ctxerror_test.go new file mode 100644 index 000000000..0b6a95a25 --- /dev/null +++ b/internal/ctxerror/ctxerror_test.go @@ -0,0 +1,363 @@ +package ctxerror + +import ( + "errors" + "reflect" + "testing" +) + +func TestNew(t *testing.T) { + type args struct { + msg string + ctx []interface{} + } + tests := []struct { + name string + args args + want CtxError + }{ + { + name: "Empty", + args: args{msg: "", ctx: []interface{}{}}, + want: &ctxError{msg: "", ctx: map[string]interface{}{}}, + }, + { + name: "Regular", + args: args{msg: "omg", ctx: []interface{}{"wtf", 1, "bbq", 2}}, + want: &ctxError{msg: "omg", ctx: map[string]interface{}{"wtf": 1, "bbq": 2}}, + }, + { + name: "Truncated", + args: args{ + msg: "omg", + ctx: []interface{}{"wtf", 1, "bbq" /* missing value... */}, + }, + want: &ctxError{ + msg: "omg", + ctx: map[string]interface{}{"wtf": 1, "bbq": /* becomes */ nil}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := New(tt.args.msg, tt.args.ctx...); !reflect.DeepEqual(got, tt.want) { + t.Errorf("New() = %#v, want %#v", got, tt.want) + } + }) + } +} + +func Test_ctxError_updateCtx(t *testing.T) { + tests := []struct { + name string + before, after map[string]interface{} + delta []interface{} + }{ + { + name: "Empty", + before: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3}, + delta: []interface{}{}, + after: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3}, + }, + { + name: "Regular", + before: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3}, + delta: []interface{}{"omg", 10, "wtf", 20}, + after: map[string]interface{}{"omg": 10, "wtf": 20, "bbq": 3}, + }, + { + name: "Truncated", + before: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3}, + delta: []interface{}{"omg", 10, "wtf" /* missing value... */}, + after: map[string]interface{}{"omg": 10, "wtf": /* becomes */ nil, "bbq": 3}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &ctxError{msg: tt.name, ctx: tt.before} + e.updateCtx(tt.delta...) + if !reflect.DeepEqual(e.ctx, tt.after) { + t.Errorf("expected ctx %#v != %#v seen", tt.after, e.ctx) + } + }) + } +} + +func Test_ctxError_Error(t *testing.T) { + type fields struct { + msg string + ctx map[string]interface{} + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "AllEmpty", + fields: fields{msg: "", ctx: map[string]interface{}{}}, + want: "", + }, + { + name: "CtxEmpty", + fields: fields{msg: "omg", ctx: map[string]interface{}{}}, + want: "omg", + }, + { + name: "MsgEmpty", + fields: fields{msg: "", ctx: map[string]interface{}{"wtf": "bbq"}}, + want: ", wtf=\"bbq\"", + }, + { + name: "Regular", + fields: fields{msg: "omg", ctx: map[string]interface{}{"wtf": "bbq"}}, + want: "omg, wtf=\"bbq\"", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &ctxError{ + msg: tt.fields.msg, + ctx: tt.fields.ctx, + } + if got := e.Error(); got != tt.want { + t.Errorf("Error() = %#v, want %#v", got, tt.want) + } + }) + } +} + +func Test_ctxError_Message(t *testing.T) { + type fields struct { + msg string + ctx map[string]interface{} + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "AllEmpty", + fields: fields{msg: "", ctx: map[string]interface{}{}}, + want: "", + }, + { + name: "CtxEmpty", + fields: fields{msg: "omg", ctx: map[string]interface{}{}}, + want: "omg", + }, + { + name: "MsgEmpty", + fields: fields{msg: "", ctx: map[string]interface{}{"wtf": "bbq"}}, + want: "", + }, + { + name: "Regular", + fields: fields{msg: "omg", ctx: map[string]interface{}{"wtf": "bbq"}}, + want: "omg", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &ctxError{ + msg: tt.fields.msg, + ctx: tt.fields.ctx, + } + if got := e.Message(); got != tt.want { + t.Errorf("Message() = %#v, want %#v", got, tt.want) + } + }) + } +} + +func Test_ctxError_Contexts(t *testing.T) { + type fields struct { + msg string + ctx map[string]interface{} + } + tests := []struct { + name string + fields fields + want map[string]interface{} + }{ + { + name: "Empty", + fields: fields{msg: "", ctx: map[string]interface{}{}}, + want: map[string]interface{}{}, + }, + { + name: "Regular", + fields: fields{ + msg: "", + ctx: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3}, + }, + want: map[string]interface{}{"omg": 1, "wtf": 2, "bbq": 3}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &ctxError{ + msg: tt.fields.msg, + ctx: tt.fields.ctx, + } + if got := e.Contexts(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Contexts() = %#v, want %#v", got, tt.want) + } + }) + } +} + +func Test_ctxError_WithCause(t *testing.T) { + type fields struct { + msg string + ctx map[string]interface{} + } + type args struct { + c error + } + tests := []struct { + name string + fields fields + args args + want CtxError + }{ + { + name: "CtxError", + fields: fields{ + msg: "hello", + ctx: map[string]interface{}{"omg": 1, "wtf": 2}, + }, + args: args{c: &ctxError{ + msg: "world", + ctx: map[string]interface{}{"wtf": 20, "bbq": 30}, + }}, + want: &ctxError{ + msg: "hello: world", + ctx: map[string]interface{}{"omg": 1, "wtf": 20, "bbq": 30}, + }, + }, + { + name: "RegularError", + fields: fields{ + msg: "hello", + ctx: map[string]interface{}{"omg": 1, "wtf": 2}, + }, + args: args{c: errors.New("world")}, + want: &ctxError{ + msg: "hello: world", + ctx: map[string]interface{}{"omg": 1, "wtf": 2}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &ctxError{ + msg: tt.fields.msg, + ctx: tt.fields.ctx, + } + if got := e.WithCause(tt.args.c); !reflect.DeepEqual(got, tt.want) { + t.Errorf("WithCause() = %#v, want %#v", got, tt.want) + } + }) + } +} + +func Test_ctxError_Log15(t *testing.T) { + type fields struct { + msg string + ctx map[string]interface{} + } + type want struct { + msg string + ctx []interface{} + } + tests := []struct { + name string + fields fields + want want + }{ + { + name: "Empty", + fields: fields{msg: "", ctx: map[string]interface{}{}}, + want: want{msg: "", ctx: nil}, + }, + { + name: "Regular", + fields: fields{msg: "hello", ctx: map[string]interface{}{"omg": 1}}, + want: want{msg: "hello", ctx: []interface{}{"omg", 1}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + called := false + f := func(msg string, ctx ...interface{}) { + called = true + if msg != tt.want.msg { + t.Errorf("expected message %#v != %#v seen", + tt.want.msg, msg) + } + if !reflect.DeepEqual(ctx, tt.want.ctx) { + t.Errorf("expected ctx %#v != %#v seen", ctx, tt.want.ctx) + } + } + e := &ctxError{ + msg: tt.fields.msg, + ctx: tt.fields.ctx, + } + e.Log15(f) + if !called { + t.Error("logging func not called") + } + }) + } +} + +func TestLog15(t *testing.T) { + type args struct { + e error + } + type want struct { + msg string + ctx []interface{} + } + tests := []struct { + name string + args args + want want + }{ + { + name: "Regular", + args: args{e: errors.New("hello")}, + want: want{msg: "hello", ctx: nil}, + }, + { + name: "CtxError", + args: args{e: &ctxError{ + msg: "hello", + ctx: map[string]interface{}{"omg": 1}, + }}, + want: want{msg: "hello", ctx: []interface{}{"omg", 1}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + called := false + f := func(msg string, ctx ...interface{}) { + called = true + if msg != tt.want.msg { + t.Errorf("expected message %#v != %#v seen", + tt.want.msg, msg) + } + if !reflect.DeepEqual(ctx, tt.want.ctx) { + t.Errorf("expected ctx %#v != %#v seen", + tt.want.ctx, ctx) + } + } + Log15(f, tt.args.e) + if !called { + t.Errorf("logging func not called") + } + }) + } +} diff --git a/internal/ctxerror/mock/ctxerror.go b/internal/ctxerror/mock/ctxerror.go new file mode 100644 index 000000000..5354b2634 --- /dev/null +++ b/internal/ctxerror/mock/ctxerror.go @@ -0,0 +1,125 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ctxerror.go + +// Package mock_ctxerror is a generated GoMock package. +package mock_ctxerror + +import ( + gomock "github.com/golang/mock/gomock" + ctxerror "github.com/harmony-one/harmony/internal/ctxerror" + reflect "reflect" +) + +// MockCtxError is a mock of CtxError interface +type MockCtxError struct { + ctrl *gomock.Controller + recorder *MockCtxErrorMockRecorder +} + +// MockCtxErrorMockRecorder is the mock recorder for MockCtxError +type MockCtxErrorMockRecorder struct { + mock *MockCtxError +} + +// NewMockCtxError creates a new mock instance +func NewMockCtxError(ctrl *gomock.Controller) *MockCtxError { + mock := &MockCtxError{ctrl: ctrl} + mock.recorder = &MockCtxErrorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockCtxError) EXPECT() *MockCtxErrorMockRecorder { + return m.recorder +} + +// Error mocks base method +func (m *MockCtxError) Error() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Error") + ret0, _ := ret[0].(string) + return ret0 +} + +// Error indicates an expected call of Error +func (mr *MockCtxErrorMockRecorder) Error() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockCtxError)(nil).Error)) +} + +// Message mocks base method +func (m *MockCtxError) Message() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Message") + ret0, _ := ret[0].(string) + return ret0 +} + +// Message indicates an expected call of Message +func (mr *MockCtxErrorMockRecorder) Message() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Message", reflect.TypeOf((*MockCtxError)(nil).Message)) +} + +// Contexts mocks base method +func (m *MockCtxError) Contexts() map[string]interface{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Contexts") + ret0, _ := ret[0].(map[string]interface{}) + return ret0 +} + +// Contexts indicates an expected call of Contexts +func (mr *MockCtxErrorMockRecorder) Contexts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Contexts", reflect.TypeOf((*MockCtxError)(nil).Contexts)) +} + +// WithCause mocks base method +func (m *MockCtxError) WithCause(c error) ctxerror.CtxError { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WithCause", c) + ret0, _ := ret[0].(ctxerror.CtxError) + return ret0 +} + +// WithCause indicates an expected call of WithCause +func (mr *MockCtxErrorMockRecorder) WithCause(c interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithCause", reflect.TypeOf((*MockCtxError)(nil).WithCause), c) +} + +// MockLog15Logger is a mock of Log15Logger interface +type MockLog15Logger struct { + ctrl *gomock.Controller + recorder *MockLog15LoggerMockRecorder +} + +// MockLog15LoggerMockRecorder is the mock recorder for MockLog15Logger +type MockLog15LoggerMockRecorder struct { + mock *MockLog15Logger +} + +// NewMockLog15Logger creates a new mock instance +func NewMockLog15Logger(ctrl *gomock.Controller) *MockLog15Logger { + mock := &MockLog15Logger{ctrl: ctrl} + mock.recorder = &MockLog15LoggerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockLog15Logger) EXPECT() *MockLog15LoggerMockRecorder { + return m.recorder +} + +// Log15 mocks base method +func (m *MockLog15Logger) Log15(f ctxerror.Log15Func) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Log15", f) +} + +// Log15 indicates an expected call of Log15 +func (mr *MockLog15LoggerMockRecorder) Log15(f interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Log15", reflect.TypeOf((*MockLog15Logger)(nil).Log15), f) +} diff --git a/node/node_syncing.go b/node/node_syncing.go index 80b69f65a..6934f4bff 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -13,6 +13,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" @@ -61,10 +62,9 @@ SyncingLoop: node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) } if node.stateSync.GetActivePeerNumber() == 0 { - if node.stateSync.CreateSyncConfig(getPeers()) { - node.stateSync.MakeConnectionToPeers() - } else { - utils.GetLogInstance().Debug("[SYNC] no active peers, continue SyncingLoop") + peers := getPeers() + if err := node.stateSync.CreateSyncConfig(peers); err != nil { + ctxerror.Log15(utils.GetLogInstance().Debug, err) continue SyncingLoop } } diff --git a/scripts/list_harmony_go_files.sh b/scripts/list_harmony_go_files.sh index a8d8c74e2..80474ab8f 100755 --- a/scripts/list_harmony_go_files.sh +++ b/scripts/list_harmony_go_files.sh @@ -4,5 +4,5 @@ exec git ls-files '*.go' | grep -v \ -e '\.pb\.go$' \ -e '/mock_stream\.go' \ -e '/host_mock\.go' \ - -e '^p2p/host/hostv2/mock/' \ + -e '/mock/[^/]*\.go' \ -e '/gen_[^/]*\.go'