From 312843e5a15e9a323f0e0a3244069575c2f06e2b Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Fri, 12 Mar 2021 18:51:18 -0800 Subject: [PATCH] [stream] added downloader / consensus interface --- consensus/consensus.go | 2 + consensus/consensus_v2.go | 16 ++++- consensus/downloader.go | 131 ++++++++++++++++++++++++++++++++++++++ consensus/validator.go | 11 ---- 4 files changed, 147 insertions(+), 13 deletions(-) create mode 100644 consensus/downloader.go diff --git a/consensus/consensus.go b/consensus/consensus.go index cef85f0d8..a4676f397 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -130,6 +130,8 @@ type Consensus struct { finality int64 // finalityCounter keep tracks of the finality time finalityCounter int64 + + dHelper *downloadHelper } // SetCommitDelay sets the commit message delay. If set to non-zero, diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 31be69473..faa56c9bb 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -467,13 +467,26 @@ func (consensus *Consensus) Start( } consensus.getLogger().Info().Msg("[ConsensusMainLoop] Ended.") }() + + if consensus.dHelper != nil { + consensus.dHelper.start() + } } // Close close the consensus. If current is in normal commit phase, wait until the commit // phase end. func (consensus *Consensus) Close() error { + if consensus.dHelper != nil { + consensus.dHelper.close() + } + consensus.waitForCommit() + return nil +} + +// waitForCommit wait extra 2 seconds for commit phase to finish +func (consensus *Consensus) waitForCommit() { if consensus.Mode() != Normal || consensus.phase != FBFTCommit { - return nil + return } // We only need to wait consensus is in normal commit phase utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait") @@ -483,7 +496,6 @@ func (consensus *Consensus) Close() error { utils.Logger().Warn().Msg("[shutdown] wait for consensus finished") time.Sleep(time.Millisecond * 100) } - return nil } // LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache. diff --git a/consensus/downloader.go b/consensus/downloader.go new file mode 100644 index 000000000..fd7f8077f --- /dev/null +++ b/consensus/downloader.go @@ -0,0 +1,131 @@ +package consensus + +import ( + "github.com/ethereum/go-ethereum/event" + "github.com/harmony-one/harmony/core/types" + "github.com/pkg/errors" +) + +// downloader is the adapter interface for downloader.Downloader, which is used for +// 1. Subscribe download finished event to help syncing to the latest block. +// 2. Trigger the downloader to start working +type downloader interface { + SubscribeDownloadFinished(ch chan struct{}) event.Subscription + SubscribeDownloadStarted(ch chan struct{}) event.Subscription + DownloadAsync() +} + +// Set downloader set the downloader of the shard to consensus +// TODO: It will be better to move this to consensus.New and register consensus as a service +func (consensus *Consensus) SetDownloader(d downloader) { + consensus.dHelper = newDownloadHelper(consensus, d) +} + +type downloadHelper struct { + d downloader + c *Consensus + + startedCh chan struct{} + finishedCh chan struct{} + + startedSub event.Subscription + finishedSub event.Subscription +} + +func newDownloadHelper(c *Consensus, d downloader) *downloadHelper { + startedCh := make(chan struct{}, 1) + startedSub := d.SubscribeDownloadStarted(startedCh) + + finishedCh := make(chan struct{}, 1) + finishedSub := d.SubscribeDownloadFinished(finishedCh) + + return &downloadHelper{ + c: c, + d: d, + startedCh: startedCh, + finishedCh: finishedCh, + startedSub: startedSub, + finishedSub: finishedSub, + } +} + +func (dh *downloadHelper) start() { + go dh.downloadStartedLoop() + go dh.downloadFinishedLoop() +} + +func (dh *downloadHelper) close() { + dh.startedSub.Unsubscribe() + dh.finishedSub.Unsubscribe() +} + +func (dh *downloadHelper) downloadStartedLoop() { + for { + select { + case <-dh.startedCh: + dh.c.BlocksNotSynchronized() + + case err := <-dh.finishedSub.Err(): + dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed") + return + } + } +} + +func (dh *downloadHelper) downloadFinishedLoop() { + for { + select { + case <-dh.finishedCh: + err := dh.c.addConsensusLastMile() + if err != nil { + dh.c.getLogger().Error().Err(err).Msg("add last mile failed") + } + dh.c.BlocksSynchronized() + + case err := <-dh.finishedSub.Err(): + dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed") + return + } + } +} + +func (consensus *Consensus) addConsensusLastMile() error { + curBN := consensus.Blockchain.CurrentBlock().NumberU64() + blockIter, err := consensus.GetLastMileBlockIter(curBN + 1) + if err != nil { + return err + } + for { + block := blockIter.Next() + if block == nil { + break + } + if _, err := consensus.Blockchain.InsertChain(types.Blocks{block}, true); err != nil { + return errors.Wrap(err, "failed to InsertChain") + } + } + return nil +} + +func (consensus *Consensus) spinUpStateSync() { + if consensus.dHelper != nil { + consensus.dHelper.d.DownloadAsync() + consensus.current.SetMode(Syncing) + for _, v := range consensus.consensusTimeout { + v.Stop() + } + } else { + consensus.spinLegacyStateSync() + } +} + +func (consensus *Consensus) spinLegacyStateSync() { + select { + case consensus.BlockNumLowChan <- struct{}{}: + consensus.current.SetMode(Syncing) + for _, v := range consensus.consensusTimeout { + v.Stop() + } + default: + } +} diff --git a/consensus/validator.go b/consensus/validator.go index 1d97471fd..df3e4dff0 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -402,14 +402,3 @@ func (consensus *Consensus) broadcastConsensusP2pMessages(p2pMsgs []*NetworkMess } return nil } - -func (consensus *Consensus) spinUpStateSync() { - select { - case consensus.BlockNumLowChan <- struct{}{}: - consensus.current.SetMode(Syncing) - for _, v := range consensus.consensusTimeout { - v.Stop() - } - default: - } -}