Fixed race errors in downloader & consensus (#4442)

* Fix race.

* Fix race.

* Added log.

* Fix panic.
pull/4446/head
Konstantin 1 year ago committed by GitHub
parent 78d802428c
commit 10119bd8d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      consensus/consensus.go
  2. 2
      consensus/debug.go
  3. 10
      p2p/stream/common/streammanager/streammanager.go
  4. 2
      p2p/stream/protocols/sync/protocol.go
  5. 2
      p2p/stream/protocols/sync/stream.go

@ -188,6 +188,8 @@ func (consensus *Consensus) BlocksSynchronized() {
// BlocksNotSynchronized lets the main loop know that block is not synchronized
func (consensus *Consensus) BlocksNotSynchronized() {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.syncNotReadyChan()
}

@ -14,6 +14,8 @@ func (consensus *Consensus) getConsensusPhase() string {
// GetConsensusMode returns the current mode of the consensus
func (consensus *Consensus) GetConsensusMode() string {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.current.mode.String()
}

@ -299,7 +299,7 @@ func (sm *streamManager) removeAllStreamOnClose() {
wg.Wait()
// Be nice. after close, the field is still accessible to prevent potential panics
sm.streams = newStreamSet()
sm.streams.Erase()
}
func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, error) {
@ -403,6 +403,14 @@ func newStreamSet() *streamSet {
}
}
func (ss *streamSet) Erase() {
ss.lock.Lock()
defer ss.lock.Unlock()
ss.streams = make(map[sttypes.StreamID]sttypes.Stream)
ss.numByProto = make(map[sttypes.ProtoSpec]int)
}
func (ss *streamSet) size() int {
ss.lock.RLock()
defer ss.lock.RUnlock()

@ -180,7 +180,7 @@ func (p *Protocol) HandleStream(raw libp2p_network.Stream) {
Msg("failed to add new stream")
return
}
fmt.Println("Connected to", raw.Conn().RemotePeer().String(), "(", st.ProtoID(), ")")
fmt.Println("Connected to", raw.Conn().RemotePeer().String(), "(", st.ProtoID(), ")", "my ID: ", raw.Conn().LocalPeer().String())
st.run()
}

@ -298,7 +298,7 @@ func (st *syncStream) computeBlockNumberResp(rid uint64) *syncpb.Message {
return syncpb.MakeGetBlockNumberResponseMessage(rid, bn)
}
func (st syncStream) computeGetBlockHashesResp(rid uint64, bns []uint64) (*syncpb.Message, error) {
func (st *syncStream) computeGetBlockHashesResp(rid uint64, bns []uint64) (*syncpb.Message, error) {
if len(bns) > GetBlockHashesAmountCap {
err := fmt.Errorf("GetBlockHashes amount exceed cap: %v>%v", len(bns), GetBlockHashesAmountCap)
return nil, err

Loading…
Cancel
Save