Added LastMile locks.

pull/4369/head
frozen 2 years ago committed by Casey Gardiner
parent b9af100719
commit 3b49a6b7fe
  1. 26
      api/service/legacysync/syncing.go
  2. 10
      consensus/consensus_v2.go
  3. 29
      consensus/downloader.go

@ -18,6 +18,7 @@ import (
"github.com/harmony-one/harmony/api/service/legacysync/downloader"
pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto"
"github.com/harmony-one/harmony/consensus"
consensus2 "github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
@ -1142,20 +1143,19 @@ func (ss *StateSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco
func (ss *StateSync) addConsensusLastMile(bc core.BlockChain, consensus *consensus.Consensus) error {
curNumber := bc.CurrentBlock().NumberU64()
blockIter, err := consensus.GetLastMileBlockIter(curNumber + 1)
if err != nil {
return err
}
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
err := consensus.GetLastMileBlockIter(curNumber+1, func(blockIter *consensus2.LastMileBlockIter) error {
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
}
}
return nil
return nil
})
return err
}
// GetSyncingPort returns the syncing port.

@ -496,24 +496,24 @@ type LastMileBlockIter struct {
}
// GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart
func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64) (*LastMileBlockIter, error) {
func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64, cb func(iter *LastMileBlockIter) error) error {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
if consensus.BlockVerifier == nil {
return nil, errors.New("consensus haven't initialized yet")
return errors.New("consensus haven't initialized yet")
}
blocks, _, err := consensus.getLastMileBlocksAndMsg(bnStart)
if err != nil {
return nil, err
return err
}
return &LastMileBlockIter{
return cb(&LastMileBlockIter{
blockCandidates: blocks,
fbftLog: consensus.FBFTLog,
verify: consensus.BlockVerifier,
curIndex: 0,
logger: consensus.getLogger(),
}, nil
})
}
// Next iterate to the next last mile block

@ -76,7 +76,7 @@ func (dh *downloadHelper) downloadFinishedLoop() {
for {
select {
case <-dh.finishedCh:
err := dh.c.addConsensusLastMile()
err := dh.c.AddConsensusLastMile()
if err != nil {
dh.c.getLogger().Error().Err(err).Msg("add last mile failed")
}
@ -89,22 +89,21 @@ func (dh *downloadHelper) downloadFinishedLoop() {
}
}
func (consensus *Consensus) addConsensusLastMile() error {
func (consensus *Consensus) AddConsensusLastMile() error {
curBN := consensus.Blockchain().CurrentBlock().NumberU64()
blockIter, err := consensus.GetLastMileBlockIter(curBN + 1)
if err != nil {
return err
}
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := consensus.Blockchain().InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
err := consensus.GetLastMileBlockIter(curBN+1, func(blockIter *LastMileBlockIter) error {
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := consensus.Blockchain().InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
}
}
return nil
return nil
})
return err
}
func (consensus *Consensus) spinUpStateSync() {

Loading…
Cancel
Save