From e9f3a9e69eccb88f8830625cb439a9c53e2f4822 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Mon, 15 Mar 2021 14:01:07 -0700 Subject: [PATCH 1/6] [stream] requestmanager use refresh streams method instead of event based stream operation. --- .../common/requestmanager/requestmanager.go | 63 +++++++++++++------ p2p/stream/common/streammanager/interface.go | 18 ++++-- 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 4e0d33c14..75356de41 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -26,6 +26,7 @@ type requestManager struct { waitings requestQueue // double linked list of requests that are on the waiting list // Stream events + sm streammanager.Reader newStreamC <-chan streammanager.EvtStreamAdded rmStreamC <-chan streammanager.EvtStreamRemoved // Request events @@ -40,11 +41,11 @@ type requestManager struct { } // NewRequestManager creates a new request manager -func NewRequestManager(sm streammanager.Subscriber) RequestManager { +func NewRequestManager(sm streammanager.ReaderSubscriber) RequestManager { return newRequestManager(sm) } -func newRequestManager(sm streammanager.Subscriber) *requestManager { +func newRequestManager(sm streammanager.ReaderSubscriber) *requestManager { // subscribe at initialize to prevent misuse of upper function which might cause // the bootstrap peers are ignored newStreamC := make(chan streammanager.EvtStreamAdded) @@ -60,6 +61,7 @@ func newRequestManager(sm streammanager.Subscriber) *requestManager { pendings: make(map[uint64]*request), waitings: newRequestQueue(), + sm: sm, newStreamC: newStreamC, rmStreamC: rmStreamC, cancelReqC: make(chan cancelReqData, 16), @@ -182,13 +184,11 @@ func (rm *requestManager) loop() { case data := <-rm.cancelReqC: rm.handleCancelRequest(data) - case evt := <-rm.newStreamC: - 
rm.logger.Info().Str("streamID", string(evt.Stream.ID())).Msg("add new stream") - rm.addNewStream(evt.Stream) + case <-rm.newStreamC: + rm.refreshStreams() - case evt := <-rm.rmStreamC: - rm.logger.Info().Str("streamID", string(evt.ID)).Msg("remove stream") - rm.removeStream(evt.ID) + case <-rm.rmStreamC: + rm.refreshStreams() case <-rm.stopC: rm.logger.Info().Msg("request manager stopped") @@ -349,26 +349,49 @@ func (rm *requestManager) pickAvailableStream(req *request) (*stream, error) { return nil, errors.New("no more available streams") } -func (rm *requestManager) addNewStream(st sttypes.Stream) { +func (rm *requestManager) refreshStreams() { rm.lock.Lock() defer rm.lock.Unlock() - if _, ok := rm.streams[st.ID()]; !ok { - rm.streams[st.ID()] = &stream{Stream: st} - rm.available[st.ID()] = struct{}{} + added, removed := checkStreamUpdates(rm.streams, rm.sm.GetStreams()) + + for _, st := range added { + rm.logger.Info().Str("streamID", string(st.ID())).Msg("add new stream") + rm.addNewStream(st) + } + for _, st := range removed { + rm.logger.Info().Str("streamID", string(st.ID())).Msg("remove stream") + rm.removeStream(st) } } -// removeStream remove the stream from request manager, clear the pending request -// of the stream. 
Return whether a pending request is canceled in the stream, -func (rm *requestManager) removeStream(id sttypes.StreamID) { - rm.lock.Lock() - defer rm.lock.Unlock() +func checkStreamUpdates(exists map[sttypes.StreamID]*stream, targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) { + targetM := make(map[sttypes.StreamID]sttypes.Stream) - st, ok := rm.streams[id] - if !ok { - return + for _, target := range targets { + id := target.ID() + targetM[id] = target + if _, ok := exists[id]; !ok { + added = append(added, target) + } + } + for id, exist := range exists { + if _, ok := targetM[id]; !ok { + removed = append(removed, exist) + } } + return +} + +func (rm *requestManager) addNewStream(st sttypes.Stream) { + rm.streams[st.ID()] = &stream{Stream: st} + rm.available[st.ID()] = struct{}{} +} + +// removeStream remove the stream from request manager, clear the pending request +// of the stream. +func (rm *requestManager) removeStream(st *stream) { + id := st.ID() delete(rm.available, id) delete(rm.streams, id) diff --git a/p2p/stream/common/streammanager/interface.go b/p2p/stream/common/streammanager/interface.go index 5c34488d8..e6659cf65 100644 --- a/p2p/stream/common/streammanager/interface.go +++ b/p2p/stream/common/streammanager/interface.go @@ -14,13 +14,19 @@ import ( // StreamManager is the interface for streamManager type StreamManager interface { p2ptypes.LifeCycle - StreamOperator + Operator Subscriber - StreamReader + Reader } -// StreamOperator handles new stream or remove stream -type StreamOperator interface { +// ReaderSubscriber reads stream and subscribe stream events +type ReaderSubscriber interface { + Reader + Subscriber +} + +// Operator handles new stream or remove stream +type Operator interface { NewStream(stream sttypes.Stream) error RemoveStream(stID sttypes.StreamID) error } @@ -31,8 +37,8 @@ type Subscriber interface { SubscribeRemoveStreamEvent(ch chan<- EvtStreamRemoved) event.Subscription } -// StreamReader is the 
interface to read stream in stream manager -type StreamReader interface { +// Reader is the interface to read stream in stream manager +type Reader interface { GetStreams() []sttypes.Stream GetStreamByID(id sttypes.StreamID) (sttypes.Stream, bool) } From f12b9be3d84174adf55dcc92da1b0a8b3e313044 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Mon, 15 Mar 2021 14:39:55 -0700 Subject: [PATCH 2/6] [stream] fix request manager tests for refresh stream --- .../common/requestmanager/interface_test.go | 45 ++++++++- .../requestmanager/requestmanager_test.go | 93 ++++++++++++++++++- 2 files changed, 136 insertions(+), 2 deletions(-) diff --git a/p2p/stream/common/requestmanager/interface_test.go b/p2p/stream/common/requestmanager/interface_test.go index 01e8c595a..57ffe3355 100644 --- a/p2p/stream/common/requestmanager/interface_test.go +++ b/p2p/stream/common/requestmanager/interface_test.go @@ -14,19 +14,25 @@ import ( var testProtoID = sttypes.ProtoID("harmony/sync/unitest/0/1.0.0") type testStreamManager struct { + streams map[sttypes.StreamID]sttypes.Stream + newStreamFeed event.Feed rmStreamFeed event.Feed } func newTestStreamManager() *testStreamManager { - return &testStreamManager{} + return &testStreamManager{ + streams: make(map[sttypes.StreamID]sttypes.Stream), + } } func (sm *testStreamManager) addNewStream(st sttypes.Stream) { + sm.streams[st.ID()] = st sm.newStreamFeed.Send(streammanager.EvtStreamAdded{Stream: st}) } func (sm *testStreamManager) rmStream(stid sttypes.StreamID) { + delete(sm.streams, stid) sm.rmStreamFeed.Send(streammanager.EvtStreamRemoved{ID: stid}) } @@ -38,6 +44,20 @@ func (sm *testStreamManager) SubscribeRemoveStreamEvent(ch chan<- streammanager. 
return sm.rmStreamFeed.Subscribe(ch) } +func (sm *testStreamManager) GetStreams() []sttypes.Stream { + sts := make([]sttypes.Stream, 0, len(sm.streams)) + + for _, st := range sm.streams { + sts = append(sts, st) + } + return sts +} + +func (sm *testStreamManager) GetStreamByID(id sttypes.StreamID) (sttypes.Stream, bool) { + st, exist := sm.streams[id] + return st, exist +} + type testStream struct { id sttypes.StreamID rm *requestManager @@ -79,6 +99,29 @@ func (st *testStream) ResetOnClose() error { return nil } +func makeDummyTestStreams(indexes []int) []sttypes.Stream { + sts := make([]sttypes.Stream, 0, len(indexes)) + + for _, index := range indexes { + sts = append(sts, &testStream{ + id: makeStreamID(index), + }) + } + return sts +} + +func makeDummyStreamSets(indexes []int) map[sttypes.StreamID]*stream { + m := make(map[sttypes.StreamID]*stream) + + for _, index := range indexes { + st := &testStream{ + id: makeStreamID(index), + } + m[st.ID()] = &stream{Stream: st} + } + return m +} + func makeStreamID(index int) sttypes.StreamID { return sttypes.StreamID(strconv.Itoa(index)) } diff --git a/p2p/stream/common/requestmanager/requestmanager_test.go b/p2p/stream/common/requestmanager/requestmanager_test.go index a81bdf202..01638f6b9 100644 --- a/p2p/stream/common/requestmanager/requestmanager_test.go +++ b/p2p/stream/common/requestmanager/requestmanager_test.go @@ -2,6 +2,7 @@ package requestmanager import ( "context" + "fmt" "sync" "sync/atomic" "testing" @@ -303,6 +304,95 @@ func TestGenReqID(t *testing.T) { } } +func TestCheckStreamUpdates(t *testing.T) { + tests := []struct { + exists map[sttypes.StreamID]*stream + targets []sttypes.Stream + expAddedIndexes []int + expRemovedIndexes []int + }{ + { + exists: makeDummyStreamSets([]int{1, 2, 3, 4, 5}), + targets: makeDummyTestStreams([]int{2, 3, 4, 5}), + expAddedIndexes: []int{}, + expRemovedIndexes: []int{1}, + }, + { + exists: makeDummyStreamSets([]int{1, 2, 3, 4, 5}), + targets: 
makeDummyTestStreams([]int{1, 2, 3, 4, 5, 6}), + expAddedIndexes: []int{6}, + expRemovedIndexes: []int{}, + }, + { + exists: makeDummyStreamSets([]int{}), + targets: makeDummyTestStreams([]int{}), + expAddedIndexes: []int{}, + expRemovedIndexes: []int{}, + }, + { + exists: makeDummyStreamSets([]int{}), + targets: makeDummyTestStreams([]int{1, 2, 3, 4, 5}), + expAddedIndexes: []int{1, 2, 3, 4, 5}, + expRemovedIndexes: []int{}, + }, + { + exists: makeDummyStreamSets([]int{1, 2, 3, 4, 5}), + targets: makeDummyTestStreams([]int{}), + expAddedIndexes: []int{}, + expRemovedIndexes: []int{1, 2, 3, 4, 5}, + }, + { + exists: makeDummyStreamSets([]int{1, 2, 3, 4, 5}), + targets: makeDummyTestStreams([]int{6, 7, 8, 9, 10}), + expAddedIndexes: []int{6, 7, 8, 9, 10}, + expRemovedIndexes: []int{1, 2, 3, 4, 5}, + }, + } + + for i, test := range tests { + added, removed := checkStreamUpdates(test.exists, test.targets) + + if err := checkStreamIDsEqual(added, test.expAddedIndexes); err != nil { + t.Errorf("Test %v: check added: %v", i, err) + } + if err := checkStreamIDsEqual2(removed, test.expRemovedIndexes); err != nil { + t.Errorf("Test %v: check removed: %v", i, err) + } + } +} + +func checkStreamIDsEqual(sts []sttypes.Stream, expIndexes []int) error { + if len(sts) != len(expIndexes) { + return fmt.Errorf("size not equal") + } + expM := make(map[sttypes.StreamID]struct{}) + for _, index := range expIndexes { + expM[makeStreamID(index)] = struct{}{} + } + for _, st := range sts { + if _, ok := expM[st.ID()]; !ok { + return fmt.Errorf("stream not exist in exp: %v", st.ID()) + } + } + return nil +} + +func checkStreamIDsEqual2(sts []*stream, expIndexes []int) error { + if len(sts) != len(expIndexes) { + return fmt.Errorf("size not equal") + } + expM := make(map[sttypes.StreamID]struct{}) + for _, index := range expIndexes { + expM[makeStreamID(index)] = struct{}{} + } + for _, st := range sts { + if _, ok := expM[st.ID()]; !ok { + return fmt.Errorf("stream not exist in exp: %v", 
st.ID()) + } + } + return nil +} + type testSuite struct { rm *requestManager sm *testStreamManager @@ -330,7 +420,8 @@ func newTestSuite(delayF delayFunc, respF responseFunc, numStreams int) *testSui cancel: cancel, } for i := 0; i != numStreams; i++ { - ts.bootStreams = append(ts.bootStreams, ts.makeTestStream(i)) + st := ts.makeTestStream(i) + ts.bootStreams = append(ts.bootStreams, st) } return ts } From 72f74c08cb3df6b259a8b3125511e70b90dea6b8 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Mon, 15 Mar 2021 23:08:30 -0700 Subject: [PATCH 3/6] [stream] request can be canceled from waitings in request manager --- .../common/requestmanager/requestmanager.go | 23 ++-- .../requestmanager/requestmanager_test.go | 75 +++++++++++- p2p/stream/common/requestmanager/types.go | 107 ++++++++++++------ .../common/requestmanager/types_test.go | 18 +-- 4 files changed, 167 insertions(+), 56 deletions(-) diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 75356de41..2cac5c51d 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -23,7 +23,7 @@ type requestManager struct { streams map[sttypes.StreamID]*stream // All streams available map[sttypes.StreamID]struct{} // Streams that are available for request pendings map[uint64]*request // requests that are sent but not received response - waitings requestQueue // double linked list of requests that are on the waiting list + waitings requestQueues // double linked list of requests that are on the waiting list // Stream events sm streammanager.Reader @@ -59,7 +59,7 @@ func newRequestManager(sm streammanager.ReaderSubscriber) *requestManager { streams: make(map[sttypes.StreamID]*stream), available: make(map[sttypes.StreamID]struct{}), pendings: make(map[uint64]*request), - waitings: newRequestQueue(), + waitings: newRequestQueues(), sm: sm, newStreamC: newStreamC, @@ -104,8 +104,8 @@ func (rm 
*requestManager) doRequestAsync(ctx context.Context, raw sttypes.Reques select { case <-ctx.Done(): // canceled or timeout in upper function calls rm.cancelReqC <- cancelReqData{ - reqID: req.ReqID(), - err: ctx.Err(), + req: req, + err: ctx.Err(), } case <-req.doneC: } @@ -255,21 +255,20 @@ func (rm *requestManager) handleCancelRequest(data cancelReqData) { rm.lock.Lock() defer rm.lock.Unlock() - req, ok := rm.pendings[data.reqID] - if !ok { - return - } + var ( + req = data.req + err = data.err + ) + rm.waitings.Remove(req) rm.removePendingRequest(req) - var stid sttypes.StreamID if req.owner != nil { stid = req.owner.ID() } - req.doneWithResponse(responseData{ resp: nil, stID: stid, - err: data.err, + err: err, }) } @@ -417,7 +416,7 @@ func (rm *requestManager) close() { rm.pendings = make(map[uint64]*request) rm.available = make(map[sttypes.StreamID]struct{}) rm.streams = make(map[sttypes.StreamID]*stream) - rm.waitings = newRequestQueue() + rm.waitings = newRequestQueues() close(rm.stopC) } diff --git a/p2p/stream/common/requestmanager/requestmanager_test.go b/p2p/stream/common/requestmanager/requestmanager_test.go index 01638f6b9..63804d0d5 100644 --- a/p2p/stream/common/requestmanager/requestmanager_test.go +++ b/p2p/stream/common/requestmanager/requestmanager_test.go @@ -133,7 +133,7 @@ func TestRequestManager_UnknownDelivery(t *testing.T) { req := makeTestRequest(100) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) resC := ts.rm.doRequestAsync(ctx, req) - time.Sleep(6 * time.Second) + time.Sleep(2 * time.Second) cancel() // Since the reqID is not delivered, the result is not delivered to the request @@ -165,6 +165,79 @@ func TestRequestManager_StaleDelivery(t *testing.T) { } } +// TestRequestManager_cancelWaitings test the scenario of request being canceled +// while still in waitings. In order to do this, +// 1. Set number of streams to 1 +// 2. Occupy the stream with a request, and block +// 3. Do the second request. 
This request will be in waitings. +// 4. Cancel the second request. Request shall be removed from waitings. +// 5. Unblock the first request +// 6. Request 1 finished, request 2 canceled +func TestRequestManager_cancelWaitings(t *testing.T) { + req1 := makeTestRequest(1) + req2 := makeTestRequest(2) + + var req1Block sync.Mutex + req1Block.Lock() + unblockReq1 := func() { req1Block.Unlock() } + + delayF := makeDefaultDelayFunc(150 * time.Millisecond) + respF := func(req *testRequest) *testResponse { + if req.index == req1.index { + req1Block.Lock() + } + return makeDefaultResponseFunc()(req) + } + ts := newTestSuite(delayF, respF, 1) + ts.Start() + defer ts.Close() + + ctx1, _ := context.WithTimeout(context.Background(), 1*time.Second) + ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) + resC1 := ts.rm.doRequestAsync(ctx1, req1) + resC2 := ts.rm.doRequestAsync(ctx2, req2) + + cancel2() + unblockReq1() + + var ( + res1 responseData + res2 responseData + ) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + + select { + case res1 = <-resC1: + case <-time.After(1 * time.Second): + t.Errorf("req1 timed out") + } + }() + go func() { + defer wg.Done() + + select { + case res2 = <-resC2: + case <-time.After(1 * time.Second): + t.Errorf("req2 timed out") + } + }() + wg.Wait() + + if res1.err != nil { + t.Errorf("request 1 shall return nil error") + } + if res2.err != context.Canceled { + t.Errorf("request 2 shall be canceled") + } + if ts.rm.waitings.reqsPLow.len() != 0 || ts.rm.waitings.reqsPHigh.len() != 0 { + t.Errorf("waitings shall be clean") + } +} + // closing request manager will also close all func TestRequestManager_Close(t *testing.T) { delayF := makeDefaultDelayFunc(1 * time.Second) diff --git a/p2p/stream/common/requestmanager/types.go b/p2p/stream/common/requestmanager/types.go index 7cf29c821..c73488e1d 100644 --- a/p2p/stream/common/requestmanager/types.go +++ b/p2p/stream/common/requestmanager/types.go @@ -114,8 
+114,8 @@ func (st *stream) clearPendingRequest() *request { } type cancelReqData struct { - reqID uint64 - err error + req *request + err error } // responseData is the wrapped response for stream requests @@ -125,58 +125,97 @@ type responseData struct { err error } -// requestQueue is a wrapper of double linked list with Request as type -type requestQueue struct { - reqsPHigh *list.List // high priority, currently defined by upper function calls - reqsPLow *list.List // low priority, applied to all normal requests - lock sync.Mutex +// requestQueues groups the high priority and low priority request queues +type requestQueues struct { + reqsPHigh *requestQueue // high priority, currently defined by upper function calls + reqsPLow *requestQueue // low priority, applied to all normal requests } -func newRequestQueue() requestQueue { - return requestQueue{ - reqsPHigh: list.New(), - reqsPLow: list.New(), +func newRequestQueues() requestQueues { + return requestQueues{ + reqsPHigh: newRequestQueue(), + reqsPLow: newRequestQueue(), } } -// Push add a new request to requestQueue. -func (q *requestQueue) Push(req *request, priority reqPriority) error { - q.lock.Lock() - defer q.lock.Unlock() - +// Push adds a new request to requestQueues.
+func (q *requestQueues) Push(req *request, priority reqPriority) error { if priority == reqPriorityHigh || req.priority == reqPriorityHigh { - return pushRequestToList(q.reqsPHigh, req) + return q.reqsPHigh.push(req) } - if priority == reqPriorityLow { - return pushRequestToList(q.reqsPLow, req) - } - return nil + return q.reqsPLow.push(req) } // Pop will first pop the request from high priority, and then pop from low priority -func (q *requestQueue) Pop() *request { - q.lock.Lock() - defer q.lock.Unlock() - - if req := popRequestFromList(q.reqsPHigh); req != nil { +func (q *requestQueues) Pop() *request { + if req := q.reqsPHigh.pop(); req != nil { return req } - return popRequestFromList(q.reqsPLow) + return q.reqsPLow.pop() +} + +func (q *requestQueues) Remove(req *request) { + q.reqsPHigh.remove(req) + q.reqsPLow.remove(req) +} + +// requestQueue is a thread safe request double linked list +type requestQueue struct { + l *list.List + elemM map[*request]*list.Element // Yes, pointer as map key + lock sync.Mutex +} + +func newRequestQueue() *requestQueue { + return &requestQueue{ + l: list.New(), + elemM: make(map[*request]*list.Element), + } } -func pushRequestToList(l *list.List, req *request) error { - if l.Len() >= maxWaitingSize { +func (rl *requestQueue) push(req *request) error { + rl.lock.Lock() + defer rl.lock.Unlock() + + if rl.l.Len() >= maxWaitingSize { return ErrQueueFull } - l.PushBack(req) + elem := rl.l.PushBack(req) + rl.elemM[req] = elem return nil } -func popRequestFromList(l *list.List) *request { - elem := l.Front() +func (rl *requestQueue) pop() *request { + rl.lock.Lock() + defer rl.lock.Unlock() + + elem := rl.l.Front() if elem == nil { return nil } - l.Remove(elem) - return elem.Value.(*request) + rl.l.Remove(elem) + + req := elem.Value.(*request) + delete(rl.elemM, req) + return req +} + +func (rl *requestQueue) remove(req *request) { + rl.lock.Lock() + defer rl.lock.Unlock() + + elem := rl.elemM[req] + if elem == nil { + // Already 
removed + return + } + rl.l.Remove(elem) + delete(rl.elemM, req) +} + +func (rl *requestQueue) len() int { + rl.lock.Lock() + defer rl.lock.Unlock() + + return rl.l.Len() } diff --git a/p2p/stream/common/requestmanager/types_test.go b/p2p/stream/common/requestmanager/types_test.go index f98c93323..f19c5f0ca 100644 --- a/p2p/stream/common/requestmanager/types_test.go +++ b/p2p/stream/common/requestmanager/types_test.go @@ -102,19 +102,19 @@ func TestRequestQueue_Pop(t *testing.T) { } } -func makeTestRequestQueue(sizes []int) requestQueue { +func makeTestRequestQueue(sizes []int) requestQueues { if len(sizes) != 2 { panic("unexpected sizes") } - q := newRequestQueue() + q := newRequestQueues() index := 0 for i := 0; i != sizes[0]; i++ { - q.reqsPHigh.PushBack(wrapRequestFromRaw(makeTestRequest(uint64(index)))) + q.reqsPHigh.push(wrapRequestFromRaw(makeTestRequest(uint64(index)))) index++ } for i := 0; i != sizes[1]; i++ { - q.reqsPLow.PushBack(wrapRequestFromRaw(makeTestRequest(uint64(index)))) + q.reqsPLow.push(wrapRequestFromRaw(makeTestRequest(uint64(index)))) index++ } return q @@ -138,15 +138,15 @@ func getTestRequestFromElem(elem *list.Element) (*testRequest, error) { return raw, nil } -func (q *requestQueue) checkSizes(sizes []int) error { +func (q *requestQueues) checkSizes(sizes []int) error { if len(sizes) != 2 { panic("expect 2 sizes") } - if q.reqsPHigh.Len() != sizes[0] { - return fmt.Errorf("high priority %v / %v", q.reqsPHigh.Len(), sizes[0]) + if q.reqsPHigh.len() != sizes[0] { + return fmt.Errorf("high priority %v / %v", q.reqsPHigh.len(), sizes[0]) } - if q.reqsPLow.Len() != sizes[1] { - return fmt.Errorf("low priority %v / %v", q.reqsPLow.Len(), sizes[2]) + if q.reqsPLow.len() != sizes[1] { + return fmt.Errorf("low priority %v / %v", q.reqsPLow.len(), sizes[2]) } return nil } From 6ef5bc5faa22a78b2dfcac6fc3f9462ef94d1f76 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Mon, 15 Mar 2021 23:25:02 -0700 Subject: [PATCH 4/6] [stream] elaborated log 
message to pickAvailableStream --- p2p/stream/common/requestmanager/requestmanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 2cac5c51d..d1671cecf 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -289,7 +289,7 @@ func (rm *requestManager) getNextRequest() (*request, *stream) { st, err := rm.pickAvailableStream(req) if err != nil { - rm.logger.Debug().Msg("No available streams.") + rm.logger.Debug().Err(err).Str("request", req.String()).Msg("Pick available streams.") rm.addRequestToWaitings(req, reqPriorityHigh) return nil, nil } From 312843e5a15e9a323f0e0a3244069575c2f06e2b Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Fri, 12 Mar 2021 18:51:18 -0800 Subject: [PATCH 5/6] [stream] added downloader / consensus interface --- consensus/consensus.go | 2 + consensus/consensus_v2.go | 16 ++++- consensus/downloader.go | 131 ++++++++++++++++++++++++++++++++++++++ consensus/validator.go | 11 ---- 4 files changed, 147 insertions(+), 13 deletions(-) create mode 100644 consensus/downloader.go diff --git a/consensus/consensus.go b/consensus/consensus.go index cef85f0d8..a4676f397 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -130,6 +130,8 @@ type Consensus struct { finality int64 // finalityCounter keep tracks of the finality time finalityCounter int64 + + dHelper *downloadHelper } // SetCommitDelay sets the commit message delay. If set to non-zero, diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 31be69473..faa56c9bb 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -467,13 +467,26 @@ func (consensus *Consensus) Start( } consensus.getLogger().Info().Msg("[ConsensusMainLoop] Ended.") }() + + if consensus.dHelper != nil { + consensus.dHelper.start() + } } // Close close the consensus. 
If current is in normal commit phase, wait until the commit // phase end. func (consensus *Consensus) Close() error { + if consensus.dHelper != nil { + consensus.dHelper.close() + } + consensus.waitForCommit() + return nil +} + +// waitForCommit waits an extra 2 seconds for the commit phase to finish +func (consensus *Consensus) waitForCommit() { if consensus.Mode() != Normal || consensus.phase != FBFTCommit { - return nil + return } // We only need to wait consensus is in normal commit phase utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait") @@ -483,7 +496,6 @@ func (consensus *Consensus) Close() error { utils.Logger().Warn().Msg("[shutdown] wait for consensus finished") time.Sleep(time.Millisecond * 100) } - return nil } // LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache. diff --git a/consensus/downloader.go b/consensus/downloader.go new file mode 100644 index 000000000..fd7f8077f --- /dev/null +++ b/consensus/downloader.go @@ -0,0 +1,131 @@ +package consensus + +import ( + "github.com/ethereum/go-ethereum/event" + "github.com/harmony-one/harmony/core/types" + "github.com/pkg/errors" +) + +// downloader is the adapter interface for downloader.Downloader, which is used for +// 1. Subscribe download finished event to help syncing to the latest block. +// 2.
Trigger the downloader to start working +type downloader interface { + SubscribeDownloadFinished(ch chan struct{}) event.Subscription + SubscribeDownloadStarted(ch chan struct{}) event.Subscription + DownloadAsync() +} + +// SetDownloader sets the downloader of the shard to consensus +// TODO: It will be better to move this to consensus.New and register consensus as a service +func (consensus *Consensus) SetDownloader(d downloader) { + consensus.dHelper = newDownloadHelper(consensus, d) +} + +type downloadHelper struct { + d downloader + c *Consensus + + startedCh chan struct{} + finishedCh chan struct{} + + startedSub event.Subscription + finishedSub event.Subscription +} + +func newDownloadHelper(c *Consensus, d downloader) *downloadHelper { + startedCh := make(chan struct{}, 1) + startedSub := d.SubscribeDownloadStarted(startedCh) + + finishedCh := make(chan struct{}, 1) + finishedSub := d.SubscribeDownloadFinished(finishedCh) + + return &downloadHelper{ + c: c, + d: d, + startedCh: startedCh, + finishedCh: finishedCh, + startedSub: startedSub, + finishedSub: finishedSub, + } +} + +func (dh *downloadHelper) start() { + go dh.downloadStartedLoop() + go dh.downloadFinishedLoop() +} + +func (dh *downloadHelper) close() { + dh.startedSub.Unsubscribe() + dh.finishedSub.Unsubscribe() +} + +func (dh *downloadHelper) downloadStartedLoop() { + for { + select { + case <-dh.startedCh: + dh.c.BlocksNotSynchronized() + + case err := <-dh.finishedSub.Err(): + dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed") + return + } + } +} + +func (dh *downloadHelper) downloadFinishedLoop() { + for { + select { + case <-dh.finishedCh: + err := dh.c.addConsensusLastMile() + if err != nil { + dh.c.getLogger().Error().Err(err).Msg("add last mile failed") + } + dh.c.BlocksSynchronized() + + case err := <-dh.finishedSub.Err(): + dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed") + return + } + } +} + +func (consensus *Consensus) 
addConsensusLastMile() error { + curBN := consensus.Blockchain.CurrentBlock().NumberU64() + blockIter, err := consensus.GetLastMileBlockIter(curBN + 1) + if err != nil { + return err + } + for { + block := blockIter.Next() + if block == nil { + break + } + if _, err := consensus.Blockchain.InsertChain(types.Blocks{block}, true); err != nil { + return errors.Wrap(err, "failed to InsertChain") + } + } + return nil +} + +func (consensus *Consensus) spinUpStateSync() { + if consensus.dHelper != nil { + consensus.dHelper.d.DownloadAsync() + consensus.current.SetMode(Syncing) + for _, v := range consensus.consensusTimeout { + v.Stop() + } + } else { + consensus.spinLegacyStateSync() + } +} + +func (consensus *Consensus) spinLegacyStateSync() { + select { + case consensus.BlockNumLowChan <- struct{}{}: + consensus.current.SetMode(Syncing) + for _, v := range consensus.consensusTimeout { + v.Stop() + } + default: + } +} diff --git a/consensus/validator.go b/consensus/validator.go index 1d97471fd..df3e4dff0 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -402,14 +402,3 @@ func (consensus *Consensus) broadcastConsensusP2pMessages(p2pMsgs []*NetworkMess } return nil } - -func (consensus *Consensus) spinUpStateSync() { - select { - case consensus.BlockNumLowChan <- struct{}{}: - consensus.current.SetMode(Syncing) - for _, v := range consensus.consensusTimeout { - v.Stop() - } - default: - } -} From 72286f09ec08b4eb38204b3bf1b45b2e2adb4cb1 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Mon, 15 Mar 2021 23:57:43 -0700 Subject: [PATCH 6/6] [stream] resolve some code review comments --- consensus/consensus_v2.go | 3 +++ consensus/downloader.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index faa56c9bb..63c701816 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -334,6 +334,8 @@ func (consensus *Consensus) Start( break } } + + // TODO: Refactor this piece of 
code to consensus/downloader.go after DNS legacy sync is removed case <-consensus.syncReadyChan: consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") consensus.mutex.Lock() @@ -352,6 +354,7 @@ func (consensus *Consensus) Start( } consensus.mutex.Unlock() + // TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed case <-consensus.syncNotReadyChan: consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1) diff --git a/consensus/downloader.go b/consensus/downloader.go index fd7f8077f..3c53dae1f 100644 --- a/consensus/downloader.go +++ b/consensus/downloader.go @@ -65,7 +65,7 @@ func (dh *downloadHelper) downloadStartedLoop() { case <-dh.startedCh: dh.c.BlocksNotSynchronized() - case err := <-dh.finishedSub.Err(): + case err := <-dh.startedSub.Err(): dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed") return }