diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index 371104895..156722a14 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ethereum/go-ethereum/event" + "github.com/harmony-one/harmony/consensus" "github.com/rs/zerolog" "github.com/harmony-one/harmony/consensus" @@ -38,7 +39,7 @@ type ( ) // NewDownloader creates a new downloader -func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader { +func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config, c *consensus.Consensus) *Downloader { config.fixValues() sp := sync.NewProtocol(sync.Config{ diff --git a/api/service/stagedstreamsync/downloaders.go b/api/service/stagedstreamsync/downloaders.go index 583f3e152..08a8e40de 100644 --- a/api/service/stagedstreamsync/downloaders.go +++ b/api/service/stagedstreamsync/downloaders.go @@ -16,7 +16,7 @@ type Downloaders struct { } // NewDownloaders creates Downloaders for sync of multiple blockchains -func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders { +func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config, c *consensus.Consensus) *Downloaders { ds := make(map[uint32]*Downloader) isBeaconNode := len(bcs) == 1 for _, bc := range bcs { @@ -26,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.C if _, ok := ds[bc.ShardID()]; ok { continue } - ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config) + ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config, c) } return &Downloaders{ ds: ds, diff --git a/api/service/stagedstreamsync/service.go b/api/service/stagedstreamsync/service.go index f7ffd7f2d..90db7eada 100644 --- a/api/service/stagedstreamsync/service.go +++ b/api/service/stagedstreamsync/service.go @@ -12,9 +12,9 @@ type StagedStreamSyncService struct { } // NewService creates a new downloader service -func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService { +func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string, c *consensus.Consensus) *StagedStreamSyncService { return &StagedStreamSyncService{ - Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config), + Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config, c), } } diff --git a/api/service/stagedstreamsync/stage_short_range.go b/api/service/stagedstreamsync/stage_short_range.go index ce6cdf36b..f3037869a 100644 --- a/api/service/stagedstreamsync/stage_short_range.go +++ b/api/service/stagedstreamsync/stage_short_range.go @@ -3,7 +3,9 @@ package stagedstreamsync import ( "context" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/harmony-one/harmony/shard" @@ -18,6 +20,7 @@ type StageShortRange struct { type StageShortRangeCfg struct { bc core.BlockChain db kv.RwDB + c *consensus.Consensus } func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange { @@ -26,10 +29,11 @@ func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange { } } -func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB) StageShortRangeCfg { +func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB, c *consensus.Consensus) StageShortRangeCfg { return StageShortRangeCfg{ bc: bc, db: db, + c: c, } } @@ -104,9 +108,12 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) return 0, errors.Wrap(err, "prerequisite") } } - curBN := sr.configs.bc.CurrentBlock().NumberU64() - blkNums := sh.prepareBlockHashNumbers(curBN) - hashChain, whitelist, err := sh.getHashChain(ctx, blkNums) + var ( + bc = sr.configs.bc + curBN = bc.CurrentHeader().NumberU64() + blkNums = sh.prepareBlockHashNumbers(curBN) + hashChain, whitelist, err = sh.getHashChain(ctx, blkNums) + ) if err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { return 0, nil @@ -156,6 +163,25 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) return 0, err } + numInserted := 0 + err = sr.configs.c.GetLastMileBlockIter(sr.configs.bc.CurrentHeader().NumberU64()+1, func(blockIter *consensus.LastMileBlockIter) error { + for { + block := blockIter.Next() + if block == nil { + break + } + if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil { + return errors.Wrap(err, "failed to InsertChain") + } + numInserted++ + } + return nil + }) + if err != nil { + return 0, errors.WithMessage(err, "failed to InsertChain for last mile blocks") + } + utils.Logger().Info().Int("last mile blocks inserted", numInserted).Msg("Insert last mile blocks success") + return n, nil } diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index 738f2f920..adf52ae9f 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -46,8 +46,9 @@ func CreateStagedSync(ctx context.Context, protocol syncProtocol, config Config, logger zerolog.Logger, + logProgress bool, + c *consensus.Consensus, ) (*StagedStreamSync, error) { - logger.Info(). Uint32("shard", bc.ShardID()). Bool("beaconNode", isBeaconNode). @@ -56,7 +57,6 @@ func CreateStagedSync(ctx context.Context, Bool("serverOnly", config.ServerOnly). Int("minStreams", config.MinStreams). Msg(WrapStagedSyncMsg("creating staged sync")) - var mainDB kv.RwDB dbs := make([]kv.RwDB, config.Concurrency) if config.UseMemDB { @@ -82,7 +82,7 @@ func CreateStagedSync(ctx context.Context, } stageHeadsCfg := NewStageHeadersCfg(bc, mainDB) - stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB) + stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB, c) stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB) stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress) diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 9fc89d45d..b1afbe6bf 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "math/big" "math/rand" @@ -521,7 +522,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { Msg("Start p2p host failed") } - if err := currentNode.BootstrapConsensus(); err != nil { + if err := node.BootstrapConsensus(context.TODO(), currentNode.Consensus, currentNode.Host()); err != nil { fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error()) if !currentNode.NodeConfig.IsOffline { os.Exit(-1) @@ -1032,7 +1033,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har } } //Setup stream sync service - s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir) + s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir, node.Consensus) node.RegisterService(service.StagedStreamSync, s) diff --git a/core/blockchain.go b/core/blockchain.go index 0adc96925..40d33100a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -109,14 +109,6 @@ type BlockChain interface { // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) - // WriteBlockWithState writes the block and all associated state to the database. - WriteBlockWithState( - block *types.Block, receipts []*types.Receipt, - cxReceipts []*types.CXReceipt, - stakeMsgs []types2.StakeMsg, - paid reward.Reader, - state *state.DB, - ) (status WriteStatus, err error) // GetMaxGarbageCollectedBlockNumber .. GetMaxGarbageCollectedBlockNumber() int64 // InsertChain attempts to insert the given batch of blocks in to the canonical diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index e9eca1f4c..9e7f1134b 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -1473,7 +1473,8 @@ func (bc *BlockChainImpl) WriteBlockWithoutState(block *types.Block, td *big.Int return nil } -func (bc *BlockChainImpl) WriteBlockWithState( +// writeBlockWithState writes the block and all associated state to the database. +func (bc *BlockChainImpl) writeBlockWithState( block *types.Block, receipts []*types.Receipt, cxReceipts []*types.CXReceipt, stakeMsgs []staking.StakeMsg, @@ -1880,7 +1881,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i // Write the block to the chain and get the status. substart = time.Now() - status, err := bc.WriteBlockWithState( + status, err := bc.writeBlockWithState( block, receipts, cxReceipts, stakeMsgs, payout, state, ) if err != nil { diff --git a/core/blockchain_stub.go b/core/blockchain_stub.go index e9ef10ce9..e42a12b10 100644 --- a/core/blockchain_stub.go +++ b/core/blockchain_stub.go @@ -124,10 +124,6 @@ func (a Stub) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error return errors.Errorf("method WriteBlockWithoutState not implemented for %s", a.Name) } -func (a Stub) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, cxReceipts []*types.CXReceipt, stakeMsgs []staking.StakeMsg, paid reward.Reader, state *state.DB) (status WriteStatus, err error) { - return 0, errors.Errorf("method WriteBlockWithState not implemented for %s", a.Name) -} - func (a Stub) GetMaxGarbageCollectedBlockNumber() int64 { return 0 } diff --git a/node/node.go b/node/node.go index e4d567066..41373e1b5 100644 --- a/node/node.go +++ b/node/node.go @@ -149,6 +149,10 @@ type Node struct { registry *registry.Registry } +func (node *Node) Host() p2p.Host { + return node.host +} + // Blockchain returns the blockchain for the node's current shard. func (node *Node) Blockchain() core.BlockChain { return node.registry.GetBlockchain() diff --git a/node/node_handler.go b/node/node_handler.go index eeaf90f2d..89464d3c0 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -404,16 +404,21 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { } // BootstrapConsensus is a goroutine to check number of peers and start the consensus -func (node *Node) BootstrapConsensus() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) +func BootstrapConsensus(ctx context.Context, consensus *consensus.Consensus, host p2p.Host) error { + ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() - min := node.Consensus.MinPeers + min := consensus.MinPeers enoughMinPeers := make(chan struct{}) const checkEvery = 3 * time.Second go func() { for { - <-time.After(checkEvery) - numPeersNow := node.host.GetPeerCount() + select { + case <-ctx.Done(): + return + case <-time.After(checkEvery): + } + + numPeersNow := host.GetPeerCount() if numPeersNow >= min { utils.Logger().Info().Msg("[bootstrap] StartConsensus") enoughMinPeers <- struct{}{} @@ -432,9 +437,7 @@ func (node *Node) BootstrapConsensus() error { case <-ctx.Done(): return ctx.Err() case <-enoughMinPeers: - go func() { - node.Consensus.StartChannel() - }() + go consensus.StartChannel() return nil } }