[stream] added downloader / consensus interface

pull/3586/head
Jacky Wang 4 years ago
parent 53bde710af
commit 312843e5a1
No known key found for this signature in database
GPG Key ID: 1085CE5F4FF5842C
  1. 2
      consensus/consensus.go
  2. 16
      consensus/consensus_v2.go
  3. 131
      consensus/downloader.go
  4. 11
      consensus/validator.go

@ -130,6 +130,8 @@ type Consensus struct {
finality int64
// finalityCounter keep tracks of the finality time
finalityCounter int64
dHelper *downloadHelper
}
// SetCommitDelay sets the commit message delay. If set to non-zero,

@ -467,13 +467,26 @@ func (consensus *Consensus) Start(
}
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Ended.")
}()
if consensus.dHelper != nil {
consensus.dHelper.start()
}
}
// Close close the consensus. If current is in normal commit phase, wait until the commit
// phase end.
func (consensus *Consensus) Close() error {
if consensus.dHelper != nil {
consensus.dHelper.close()
}
consensus.waitForCommit()
return nil
}
// waitForCommit wait extra 2 seconds for commit phase to finish
func (consensus *Consensus) waitForCommit() {
if consensus.Mode() != Normal || consensus.phase != FBFTCommit {
return nil
return
}
// We only need to wait consensus is in normal commit phase
utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait")
@ -483,7 +496,6 @@ func (consensus *Consensus) Close() error {
utils.Logger().Warn().Msg("[shutdown] wait for consensus finished")
time.Sleep(time.Millisecond * 100)
}
return nil
}
// LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache.

@ -0,0 +1,131 @@
package consensus
import (
"github.com/ethereum/go-ethereum/event"
"github.com/harmony-one/harmony/core/types"
"github.com/pkg/errors"
)
// downloader is the adapter interface for downloader.Downloader, which is used for
// 1. Subscribe download finished event to help syncing to the latest block.
// 2. Trigger the downloader to start working
type downloader interface {
SubscribeDownloadFinished(ch chan struct{}) event.Subscription
SubscribeDownloadStarted(ch chan struct{}) event.Subscription
DownloadAsync()
}
// Set downloader set the downloader of the shard to consensus
// TODO: It will be better to move this to consensus.New and register consensus as a service
func (consensus *Consensus) SetDownloader(d downloader) {
consensus.dHelper = newDownloadHelper(consensus, d)
}
type downloadHelper struct {
d downloader
c *Consensus
startedCh chan struct{}
finishedCh chan struct{}
startedSub event.Subscription
finishedSub event.Subscription
}
func newDownloadHelper(c *Consensus, d downloader) *downloadHelper {
startedCh := make(chan struct{}, 1)
startedSub := d.SubscribeDownloadStarted(startedCh)
finishedCh := make(chan struct{}, 1)
finishedSub := d.SubscribeDownloadFinished(finishedCh)
return &downloadHelper{
c: c,
d: d,
startedCh: startedCh,
finishedCh: finishedCh,
startedSub: startedSub,
finishedSub: finishedSub,
}
}
func (dh *downloadHelper) start() {
go dh.downloadStartedLoop()
go dh.downloadFinishedLoop()
}
func (dh *downloadHelper) close() {
dh.startedSub.Unsubscribe()
dh.finishedSub.Unsubscribe()
}
func (dh *downloadHelper) downloadStartedLoop() {
for {
select {
case <-dh.startedCh:
dh.c.BlocksNotSynchronized()
case err := <-dh.finishedSub.Err():
dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed")
return
}
}
}
func (dh *downloadHelper) downloadFinishedLoop() {
for {
select {
case <-dh.finishedCh:
err := dh.c.addConsensusLastMile()
if err != nil {
dh.c.getLogger().Error().Err(err).Msg("add last mile failed")
}
dh.c.BlocksSynchronized()
case err := <-dh.finishedSub.Err():
dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed")
return
}
}
}
func (consensus *Consensus) addConsensusLastMile() error {
curBN := consensus.Blockchain.CurrentBlock().NumberU64()
blockIter, err := consensus.GetLastMileBlockIter(curBN + 1)
if err != nil {
return err
}
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := consensus.Blockchain.InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
}
return nil
}
func (consensus *Consensus) spinUpStateSync() {
if consensus.dHelper != nil {
consensus.dHelper.d.DownloadAsync()
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
} else {
consensus.spinLegacyStateSync()
}
}
func (consensus *Consensus) spinLegacyStateSync() {
select {
case consensus.BlockNumLowChan <- struct{}{}:
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
default:
}
}

@ -402,14 +402,3 @@ func (consensus *Consensus) broadcastConsensusP2pMessages(p2pMsgs []*NetworkMess
}
return nil
}
func (consensus *Consensus) spinUpStateSync() {
select {
case consensus.BlockNumLowChan <- struct{}{}:
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
default:
}
}

Loading…
Cancel
Save