Merge pull request #3586 from JackyWYX/stream_consensus

[Stream] added stream downloader to consensus
pull/3594/head
Rongjian Lan 4 years ago committed by GitHub
commit 1699ff773d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      consensus/consensus.go
  2. 19
      consensus/consensus_v2.go
  3. 131
      consensus/downloader.go
  4. 11
      consensus/validator.go

@ -130,6 +130,8 @@ type Consensus struct {
finality int64 finality int64
// finalityCounter keep tracks of the finality time // finalityCounter keep tracks of the finality time
finalityCounter int64 finalityCounter int64
dHelper *downloadHelper
} }
// SetCommitDelay sets the commit message delay. If set to non-zero, // SetCommitDelay sets the commit message delay. If set to non-zero,

@ -334,6 +334,8 @@ func (consensus *Consensus) Start(
break break
} }
} }
// TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed
case <-consensus.syncReadyChan: case <-consensus.syncReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
consensus.mutex.Lock() consensus.mutex.Lock()
@ -352,6 +354,7 @@ func (consensus *Consensus) Start(
} }
consensus.mutex.Unlock() consensus.mutex.Unlock()
// TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed
case <-consensus.syncNotReadyChan: case <-consensus.syncNotReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan")
consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1) consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1)
@ -467,13 +470,26 @@ func (consensus *Consensus) Start(
} }
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Ended.") consensus.getLogger().Info().Msg("[ConsensusMainLoop] Ended.")
}() }()
if consensus.dHelper != nil {
consensus.dHelper.start()
}
} }
// Close close the consensus. If current is in normal commit phase, wait until the commit // Close close the consensus. If current is in normal commit phase, wait until the commit
// phase end. // phase end.
func (consensus *Consensus) Close() error { func (consensus *Consensus) Close() error {
if consensus.Mode() != Normal || consensus.phase != FBFTCommit { if consensus.dHelper != nil {
consensus.dHelper.close()
}
consensus.waitForCommit()
return nil return nil
}
// waitForCommit wait extra 2 seconds for commit phase to finish
func (consensus *Consensus) waitForCommit() {
if consensus.Mode() != Normal || consensus.phase != FBFTCommit {
return
} }
// We only need to wait consensus is in normal commit phase // We only need to wait consensus is in normal commit phase
utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait") utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait")
@ -483,7 +499,6 @@ func (consensus *Consensus) Close() error {
utils.Logger().Warn().Msg("[shutdown] wait for consensus finished") utils.Logger().Warn().Msg("[shutdown] wait for consensus finished")
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
} }
return nil
} }
// LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache. // LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache.

@ -0,0 +1,131 @@
package consensus
import (
"github.com/ethereum/go-ethereum/event"
"github.com/harmony-one/harmony/core/types"
"github.com/pkg/errors"
)
// downloader is the adapter interface for downloader.Downloader, which is used for
// 1. Subscribe download finished event to help syncing to the latest block.
// 2. Trigger the downloader to start working
type downloader interface {
SubscribeDownloadFinished(ch chan struct{}) event.Subscription
SubscribeDownloadStarted(ch chan struct{}) event.Subscription
DownloadAsync()
}
// Set downloader set the downloader of the shard to consensus
// TODO: It will be better to move this to consensus.New and register consensus as a service
func (consensus *Consensus) SetDownloader(d downloader) {
consensus.dHelper = newDownloadHelper(consensus, d)
}
type downloadHelper struct {
d downloader
c *Consensus
startedCh chan struct{}
finishedCh chan struct{}
startedSub event.Subscription
finishedSub event.Subscription
}
func newDownloadHelper(c *Consensus, d downloader) *downloadHelper {
startedCh := make(chan struct{}, 1)
startedSub := d.SubscribeDownloadStarted(startedCh)
finishedCh := make(chan struct{}, 1)
finishedSub := d.SubscribeDownloadFinished(finishedCh)
return &downloadHelper{
c: c,
d: d,
startedCh: startedCh,
finishedCh: finishedCh,
startedSub: startedSub,
finishedSub: finishedSub,
}
}
func (dh *downloadHelper) start() {
go dh.downloadStartedLoop()
go dh.downloadFinishedLoop()
}
func (dh *downloadHelper) close() {
dh.startedSub.Unsubscribe()
dh.finishedSub.Unsubscribe()
}
func (dh *downloadHelper) downloadStartedLoop() {
for {
select {
case <-dh.startedCh:
dh.c.BlocksNotSynchronized()
case err := <-dh.startedSub.Err():
dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed")
return
}
}
}
func (dh *downloadHelper) downloadFinishedLoop() {
for {
select {
case <-dh.finishedCh:
err := dh.c.addConsensusLastMile()
if err != nil {
dh.c.getLogger().Error().Err(err).Msg("add last mile failed")
}
dh.c.BlocksSynchronized()
case err := <-dh.finishedSub.Err():
dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed")
return
}
}
}
func (consensus *Consensus) addConsensusLastMile() error {
curBN := consensus.Blockchain.CurrentBlock().NumberU64()
blockIter, err := consensus.GetLastMileBlockIter(curBN + 1)
if err != nil {
return err
}
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := consensus.Blockchain.InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
}
return nil
}
func (consensus *Consensus) spinUpStateSync() {
if consensus.dHelper != nil {
consensus.dHelper.d.DownloadAsync()
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
} else {
consensus.spinLegacyStateSync()
}
}
func (consensus *Consensus) spinLegacyStateSync() {
select {
case consensus.BlockNumLowChan <- struct{}{}:
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
default:
}
}

@ -402,14 +402,3 @@ func (consensus *Consensus) broadcastConsensusP2pMessages(p2pMsgs []*NetworkMess
} }
return nil return nil
} }
func (consensus *Consensus) spinUpStateSync() {
select {
case consensus.BlockNumLowChan <- struct{}{}:
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
default:
}
}

Loading…
Cancel
Save