Hold SyncConfig mutex for accessing peers

Use read/write (shared/exclusive) lock to enable concurrent read access
for most of the times.  Also fold a few methods from StateSync to
SyncConfig, to avoid accessing SyncConfig mutex from outside.

Ensure that the methods that lock sc.mtx on its own aren't called
with sc.mtx already locked, because Go mutexes are not reentrant.
pull/686/head
Eugene Kim 6 years ago
parent 4011ede7ea
commit fc0ddebc1a
  1. 157
      api/service/syncing/syncing.go

@ -70,6 +70,18 @@ func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) {
sc.peers = append(sc.peers, peer)
}
// ForEachPeer calls the given function with each peer.
// It breaks the iteration iff the function returns true.
func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
for _, peer := range sc.peers {
if f(peer) {
break
}
}
}
// CreateStateSync returns the implementation of StateSyncInterface interface.
func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync {
stateSync := &StateSync{}
@ -101,23 +113,39 @@ func (ss *StateSync) AddLastMileBlock(block *types.Block) {
}
// CloseConnections close grpc connections for state sync clients
func (ss *StateSync) CloseConnections() {
for _, pc := range ss.syncConfig.peers {
func (sc *SyncConfig) CloseConnections() {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
for _, pc := range sc.peers {
pc.client.Close()
}
}
// FindPeerByHash returns the peer with the given hash, or nil if not found.
func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
for _, pc := range sc.peers {
if bytes.Compare(pc.peerHash, peerHash) == 0 {
return pc
}
}
return nil
}
// AddNewBlock will add newly received block into state syncing queue
func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) {
for i, pc := range ss.syncConfig.peers {
if bytes.Compare(pc.peerHash, peerHash) != 0 {
continue
}
pc.mux.Lock()
pc.newBlocks = append(pc.newBlocks, block)
pc.mux.Unlock()
utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(ss.syncConfig.peers[i].newBlocks), "blockHeight", block.NumberU64())
pc := ss.syncConfig.FindPeerByHash(peerHash)
if pc == nil {
// Received a block with no active peer; just ignore.
return
}
// TODO ek – we shouldn't mess with SyncPeerConfig's mutex.
// Factor this into a method, like pc.AddNewBlock(block)
pc.mux.Lock()
defer pc.mux.Unlock()
pc.newBlocks = append(pc.newBlocks, block)
utils.GetLogInstance().Debug("[SYNC] new block received", "total", len(pc.newBlocks), "blockHeight", block.NumberU64())
}
// CreateTestSyncPeerConfig used for testing.
@ -188,19 +216,21 @@ func (ss *StateSync) GetActivePeerNumber() int {
if ss.syncConfig == nil {
return 0
}
// len() is atomic; no need to hold mutex.
return len(ss.syncConfig.peers)
}
// GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) {
// Caller shall ensure mtx is locked for reading.
func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
curCount := 0
curFirstID := -1
maxCount := 0
maxFirstID := -1
for i := range syncConfig.peers {
if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 {
for i := range sc.peers {
if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
curCount = 1
curFirstID = i
} else {
@ -215,39 +245,44 @@ func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) {
}
// InitForTesting used for testing.
func (syncConfig *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
for i := range syncConfig.peers {
syncConfig.peers[i].blockHashes = blockHashes
syncConfig.peers[i].client = client
func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
for i := range sc.peers {
sc.peers[i].blockHashes = blockHashes
sc.peers[i].client = client
}
}
// CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes.
func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) {
fixedPeer := syncConfig.peers[maxFirstID]
for i := 0; i < len(syncConfig.peers); i++ {
if CompareSyncPeerConfigByblockHashes(fixedPeer, syncConfig.peers[i]) != 0 {
// cleanUpPeers cleans up all peers whose blockHashes are not equal to
// consensus block hashes. Caller shall ensure mtx is locked for RW.
func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
fixedPeer := sc.peers[maxFirstID]
for i := 0; i < len(sc.peers); i++ {
if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 {
// TODO: move it into a util delete func.
// See tip https://github.com/golang/go/wiki/SliceTricks
// Close the client and remove the peer out of the
syncConfig.peers[i].client.Close()
copy(syncConfig.peers[i:], syncConfig.peers[i+1:])
syncConfig.peers[len(syncConfig.peers)-1] = nil
syncConfig.peers = syncConfig.peers[:len(syncConfig.peers)-1]
sc.peers[i].client.Close()
copy(sc.peers[i:], sc.peers[i+1:])
sc.peers[len(sc.peers)-1] = nil
sc.peers = sc.peers[:len(sc.peers)-1]
}
}
}
// GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal.
func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() bool {
sc.mtx.Lock()
defer sc.mtx.Unlock()
// Sort all peers by the blockHashes.
sort.Slice(sc.peers, func(i, j int) bool {
return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
})
maxFirstID, maxCount := sc.GetHowManyMaxConsensus()
maxFirstID, maxCount := sc.getHowManyMaxConsensus()
utils.GetLogInstance().Info("[SYNC] block consensus hashes", "maxFirstID", maxFirstID, "maxCount", maxCount)
if float64(maxCount) >= ConsensusRatio*float64(len(sc.peers)) {
sc.CleanUpPeers(maxFirstID)
sc.cleanUpPeers(maxFirstID)
return true
}
return false
@ -258,17 +293,18 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
count := 0
for {
var wg sync.WaitGroup
for id := range ss.syncConfig.peers {
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
wg.Add(1)
go func(peerConfig *SyncPeerConfig) {
go func() {
defer wg.Done()
response := peerConfig.client.GetBlockHashes(startHash)
if response == nil {
return
}
peerConfig.blockHashes = response.Payload
}(ss.syncConfig.peers[id])
}
}()
return
})
wg.Wait()
if ss.syncConfig.GetBlockHashesConsensusAndCleanUp() {
break
@ -286,12 +322,13 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
ss.stateSyncTaskQueue = queue.New(0)
for _, configPeer := range ss.syncConfig.peers {
ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
for id, blockHash := range configPeer.blockHashes {
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
}
break
}
brk = true
return
})
utils.GetLogInstance().Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len())
}
@ -299,10 +336,10 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
// Initialize blockchain
var wg sync.WaitGroup
wg.Add(len(ss.syncConfig.peers))
count := 0
for i := range ss.syncConfig.peers {
go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
wg.Add(1)
go func(stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
defer wg.Done()
for !stateSyncTaskQueue.Empty() {
task, err := ss.stateSyncTaskQueue.Poll(1, time.Millisecond)
@ -339,8 +376,9 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
ss.commonBlocks[syncTask.index] = &blockObj
ss.syncMux.Unlock()
}
}(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc)
}
}(ss.stateSyncTaskQueue, bc)
return
})
wg.Wait()
utils.GetLogInstance().Info("[SYNC] Finished downloadBlocks.")
}
@ -377,8 +415,7 @@ func GetHowManyMaxConsensus(blocks []*types.Block) (int, int) {
func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) *types.Block {
candidateBlocks := []*types.Block{}
ss.syncMux.Lock()
for id := range ss.syncConfig.peers {
peerConfig := ss.syncConfig.peers[id]
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
for _, block := range peerConfig.newBlocks {
ph := block.ParentHash()
if bytes.Compare(ph[:], parentHash[:]) == 0 {
@ -386,7 +423,8 @@ func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash)
break
}
}
}
return
})
ss.syncMux.Unlock()
if len(candidateBlocks) == 0 {
return nil
@ -466,10 +504,13 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker
}
parentHash = block.Hash()
}
// TODO ek – Do we need to hold syncMux now that syncConfig has its onw
// mutex?
ss.syncMux.Lock()
for id := range ss.syncConfig.peers {
ss.syncConfig.peers[id].newBlocks = []*types.Block{}
}
ss.syncConfig.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) {
peer.newBlocks = []*types.Block{}
return
})
ss.syncMux.Unlock()
// update last mile blocks if any
@ -522,23 +563,24 @@ func (ss *StateSync) RegisterNodeInfo() int {
"activePeerNumber", len(ss.syncConfig.peers))
count := 0
for id := range ss.syncConfig.peers {
peerConfig := ss.syncConfig.peers[id]
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
if count >= registrationNumber {
break
brk = true
return
}
if peerConfig.ip == ss.selfip && peerConfig.port == GetSyncingPort(ss.selfport) {
utils.GetLogInstance().Debug("[SYNC] skip self", "peerport", peerConfig.port, "selfport", ss.selfport, "selfsyncport", GetSyncingPort(ss.selfport))
continue
return
}
err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport)
if err != nil {
utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash)
continue
return
}
utils.GetLogInstance().Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port)
count++
}
return
})
return count
}
@ -546,9 +588,9 @@ func (ss *StateSync) RegisterNodeInfo() int {
func (ss *StateSync) getMaxPeerHeight() uint64 {
maxHeight := uint64(0)
var wg sync.WaitGroup
for id := range ss.syncConfig.peers {
ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
wg.Add(1)
go func(peerConfig *SyncPeerConfig) {
go func() {
defer wg.Done()
response := peerConfig.client.GetBlockChainHeight()
ss.syncMux.Lock()
@ -556,8 +598,9 @@ func (ss *StateSync) getMaxPeerHeight() uint64 {
maxHeight = response.BlockHeight
}
ss.syncMux.Unlock()
}(ss.syncConfig.peers[id])
}
}()
return
})
wg.Wait()
return maxHeight
}

Loading…
Cancel
Save