From fc0ddebc1ad0ebc488e373d49db6380457bf3a9d Mon Sep 17 00:00:00 2001
From: Eugene Kim
Date: Fri, 5 Apr 2019 20:45:50 -0700
Subject: [PATCH] Hold SyncConfig mutex for accessing peers

Use a read/write (shared/exclusive) lock to enable concurrent read
access most of the time.

Also fold a few methods from StateSync into SyncConfig, to avoid
accessing the SyncConfig mutex from outside.

Ensure that the methods that lock sc.mtx on their own aren't called
with sc.mtx already locked, because Go mutexes are not reentrant.
---
 api/service/syncing/syncing.go | 157 +++++++++++++++++++++------------
 1 file changed, 100 insertions(+), 57 deletions(-)

diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go
index 8986f947c..ba9086fe3 100644
--- a/api/service/syncing/syncing.go
+++ b/api/service/syncing/syncing.go
@@ -70,6 +70,18 @@ func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
 	sc.peers = append(sc.peers, peer)
 }
 
+// ForEachPeer calls the given function with each peer.
+// It breaks the iteration iff the function returns true.
+func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) {
+	sc.mtx.RLock()
+	defer sc.mtx.RUnlock()
+	for _, peer := range sc.peers {
+		if f(peer) {
+			break
+		}
+	}
+}
+
 // CreateStateSync returns the implementation of StateSyncInterface interface.
 func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync {
 	stateSync := &StateSync{}
@@ -101,23 +113,39 @@ func (ss *StateSync) AddLastMileBlock(block *types.Block) {
 }
 
 // CloseConnections close grpc connections for state sync clients
-func (ss *StateSync) CloseConnections() {
-	for _, pc := range ss.syncConfig.peers {
+func (sc *SyncConfig) CloseConnections() {
+	sc.mtx.RLock()
+	defer sc.mtx.RUnlock()
+	for _, pc := range sc.peers {
 		pc.client.Close()
 	}
 }
 
+// FindPeerByHash returns the peer with the given hash, or nil if not found.
+func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig {
+	sc.mtx.RLock()
+	defer sc.mtx.RUnlock()
+	for _, pc := range sc.peers {
+		if bytes.Compare(pc.peerHash, peerHash) == 0 {
+			return pc
+		}
+	}
+	return nil
+}
+
 // AddNewBlock will add newly received block into state syncing queue
 func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) {
-	for i, pc := range ss.syncConfig.peers {
-		if bytes.Compare(pc.peerHash, peerHash) != 0 {
-			continue
-		}
-		pc.mux.Lock()
-		pc.newBlocks = append(pc.newBlocks, block)
-		pc.mux.Unlock()
-		utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(ss.syncConfig.peers[i].newBlocks), "blockHeight", block.NumberU64())
+	pc := ss.syncConfig.FindPeerByHash(peerHash)
+	if pc == nil {
+		// Received a block with no active peer; just ignore.
+		return
 	}
+	// TODO ek – we shouldn't mess with SyncPeerConfig's mutex.
+	// Factor this into a method, like pc.AddNewBlock(block)
+	pc.mux.Lock()
+	defer pc.mux.Unlock()
+	pc.newBlocks = append(pc.newBlocks, block)
+	utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(pc.newBlocks), "blockHeight", block.NumberU64())
 }
 
 // CreateTestSyncPeerConfig used for testing.
@@ -188,19 +216,21 @@ func (ss *StateSync) GetActivePeerNumber() int {
 	if ss.syncConfig == nil {
 		return 0
 	}
+	// len() is atomic; no need to hold mutex.
 	return len(ss.syncConfig.peers)
 }
 
-// GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
+// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
 // Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
-func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) {
+// Caller shall ensure mtx is locked for reading.
+func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
 	// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
 	curCount := 0
 	curFirstID := -1
 	maxCount := 0
 	maxFirstID := -1
-	for i := range syncConfig.peers {
-		if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 {
+	for i := range sc.peers {
+		if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
 			curCount = 1
 			curFirstID = i
 		} else {
@@ -215,39 +245,44 @@ func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) {
 }
 
 // InitForTesting used for testing.
-func (syncConfig *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
-	for i := range syncConfig.peers {
-		syncConfig.peers[i].blockHashes = blockHashes
-		syncConfig.peers[i].client = client
+func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
+	sc.mtx.RLock()
+	defer sc.mtx.RUnlock()
+	for i := range sc.peers {
+		sc.peers[i].blockHashes = blockHashes
+		sc.peers[i].client = client
 	}
 }
 
-// CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes.
-func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) {
-	fixedPeer := syncConfig.peers[maxFirstID]
-	for i := 0; i < len(syncConfig.peers); i++ {
-		if CompareSyncPeerConfigByblockHashes(fixedPeer, syncConfig.peers[i]) != 0 {
+// cleanUpPeers cleans up all peers whose blockHashes are not equal to
+// consensus block hashes. Caller shall ensure mtx is locked for RW.
+func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
+	fixedPeer := sc.peers[maxFirstID]
+	for i := 0; i < len(sc.peers); i++ {
+		if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 {
 			// TODO: move it into a util delete func.
 			// See tip https://github.com/golang/go/wiki/SliceTricks
 			// Close the client and remove the peer out of the
-			syncConfig.peers[i].client.Close()
-			copy(syncConfig.peers[i:], syncConfig.peers[i+1:])
-			syncConfig.peers[len(syncConfig.peers)-1] = nil
-			syncConfig.peers = syncConfig.peers[:len(syncConfig.peers)-1]
+			sc.peers[i].client.Close()
+			copy(sc.peers[i:], sc.peers[i+1:])
+			sc.peers[len(sc.peers)-1] = nil
+			sc.peers = sc.peers[:len(sc.peers)-1]
 		}
 	}
 }
 
 // GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal.
 func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() bool {
+	sc.mtx.Lock()
+	defer sc.mtx.Unlock()
 	// Sort all peers by the blockHashes.
 	sort.Slice(sc.peers, func(i, j int) bool {
 		return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
 	})
-	maxFirstID, maxCount := sc.GetHowManyMaxConsensus()
+	maxFirstID, maxCount := sc.getHowManyMaxConsensus()
 	utils.GetLogInstance().Info("[SYNC] block consensus hashes", "maxFirstID", maxFirstID, "maxCount", maxCount)
 	if float64(maxCount) >= ConsensusRatio*float64(len(sc.peers)) {
-		sc.CleanUpPeers(maxFirstID)
+		sc.cleanUpPeers(maxFirstID)
 		return true
 	}
 	return false
@@ -258,17 +293,18 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
 	count := 0
 	for {
 		var wg sync.WaitGroup
-		for id := range ss.syncConfig.peers {
+		ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
 			wg.Add(1)
-			go func(peerConfig *SyncPeerConfig) {
+			go func() {
 				defer wg.Done()
 				response := peerConfig.client.GetBlockHashes(startHash)
 				if response == nil {
 					return
 				}
 				peerConfig.blockHashes = response.Payload
-			}(ss.syncConfig.peers[id])
-		}
+			}()
+			return
+		})
 		wg.Wait()
 		if ss.syncConfig.GetBlockHashesConsensusAndCleanUp() {
 			break
@@ -286,12 +322,13 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
 	ss.stateSyncTaskQueue = queue.New(0)
-	for _, configPeer := range ss.syncConfig.peers {
+	ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
 		for id, blockHash := range configPeer.blockHashes {
 			ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
 		}
-		break
-	}
+		brk = true
+		return
+	})
 	utils.GetLogInstance().Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len())
 }
@@ -299,10 +336,10 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
 	// Initialize blockchain
 	var wg sync.WaitGroup
-	wg.Add(len(ss.syncConfig.peers))
 	count := 0
-	for i := range ss.syncConfig.peers {
-		go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
+	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
+		wg.Add(1)
+		go func(stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
 			defer wg.Done()
 			for !stateSyncTaskQueue.Empty() {
 				task, err := ss.stateSyncTaskQueue.Poll(1, time.Millisecond)
@@ -339,8 +376,9 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
 				ss.commonBlocks[syncTask.index] = &blockObj
 				ss.syncMux.Unlock()
 			}
-		}(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc)
-	}
+		}(ss.stateSyncTaskQueue, bc)
+		return
+	})
 	wg.Wait()
 	utils.GetLogInstance().Info("[SYNC] Finished downloadBlocks.")
 }
@@ -377,8 +415,7 @@ func GetHowManyMaxConsensus(blocks []*types.Block) (int, int) {
 func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) *types.Block {
 	candidateBlocks := []*types.Block{}
 	ss.syncMux.Lock()
-	for id := range ss.syncConfig.peers {
-		peerConfig := ss.syncConfig.peers[id]
+	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
 		for _, block := range peerConfig.newBlocks {
 			ph := block.ParentHash()
 			if bytes.Compare(ph[:], parentHash[:]) == 0 {
@@ -386,7 +423,8 @@ func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash)
 				break
 			}
 		}
-	}
+		return
+	})
 	ss.syncMux.Unlock()
 	if len(candidateBlocks) == 0 {
 		return nil
@@ -466,10 +504,13 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
 		}
 		parentHash = block.Hash()
 	}
+	// TODO ek – Do we need to hold syncMux now that syncConfig has its own
+	// mutex?
 	ss.syncMux.Lock()
-	for id := range ss.syncConfig.peers {
-		ss.syncConfig.peers[id].newBlocks = []*types.Block{}
-	}
+	ss.syncConfig.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) {
+		peer.newBlocks = []*types.Block{}
+		return
+	})
 	ss.syncMux.Unlock()
 
 	// update last mile blocks if any
@@ -522,23 +563,24 @@ func (ss *StateSync) RegisterNodeInfo() int {
 		"activePeerNumber", len(ss.syncConfig.peers))
 	count := 0
-	for id := range ss.syncConfig.peers {
-		peerConfig := ss.syncConfig.peers[id]
+	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
 		if count >= registrationNumber {
-			break
+			brk = true
+			return
 		}
 		if peerConfig.ip == ss.selfip && peerConfig.port == GetSyncingPort(ss.selfport) {
 			utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport))
-			continue
+			return
 		}
 		err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport)
 		if err != nil {
 			utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash)
-			continue
+			return
 		}
 		utils.GetLogInstance().Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port)
 		count++
-	}
+		return
+	})
 	return count
 }
@@ -546,9 +588,9 @@ func (ss *StateSync) getMaxPeerHeight() uint64 {
 	maxHeight := uint64(0)
 	var wg sync.WaitGroup
-	for id := range ss.syncConfig.peers {
+	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
 		wg.Add(1)
-		go func(peerConfig *SyncPeerConfig) {
+		go func() {
 			defer wg.Done()
 			response := peerConfig.client.GetBlockChainHeight()
 			ss.syncMux.Lock()
 			if response != nil && maxHeight < response.BlockHeight {
 				maxHeight = response.BlockHeight
 			}
 			ss.syncMux.Unlock()
-		}(ss.syncConfig.peers[id])
-	}
+		}()
+		return
+	})
 	wg.Wait()
 	return maxHeight
 }
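
Reviewer note (not part of the patch): below is a minimal, self-contained Go sketch of the locking pattern this change introduces. A sync.RWMutex guards the peer slice; writers take the exclusive lock, while ForEachPeer iterates under the shared lock and stops early when the callback returns true. The peerSet and peer types are hypothetical stand-ins for SyncConfig and SyncPeerConfig. Because sync.RWMutex is not reentrant, the callback must not call anything that takes the same lock (such as AddPeer below); that is the same reason the patch keeps helpers like getHowManyMaxConsensus and cleanUpPeers lock-free and documents that the caller must already hold the mutex.

package main

import (
	"fmt"
	"sync"
)

// peer and peerSet are illustrative stand-ins for SyncPeerConfig and SyncConfig.
type peer struct{ id int }

type peerSet struct {
	mtx   sync.RWMutex // guards peers
	peers []*peer
}

// AddPeer takes the exclusive lock because it mutates the slice header.
func (ps *peerSet) AddPeer(p *peer) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.peers = append(ps.peers, p)
}

// ForEachPeer iterates under the shared lock and stops when f returns true.
// The callback must not take ps.mtx again (directly or via AddPeer);
// the mutex is not reentrant, so that would deadlock.
func (ps *peerSet) ForEachPeer(f func(p *peer) (brk bool)) {
	ps.mtx.RLock()
	defer ps.mtx.RUnlock()
	for _, p := range ps.peers {
		if f(p) {
			break
		}
	}
}

func main() {
	ps := &peerSet{}
	for i := 0; i < 3; i++ {
		ps.AddPeer(&peer{id: i})
	}
	// Visit peers read-locked; stop early after peer 1, mirroring the brk convention.
	ps.ForEachPeer(func(p *peer) bool {
		fmt.Println("visiting peer", p.id)
		return p.id == 1
	})
}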