Last mile blocks insert.

pull/4455/head
frozen 1 year ago
parent b1389dacc4
commit 2f23d81879
No known key found for this signature in database
GPG Key ID: 5391C63E79B03EDE
  1. 3
      api/service/stagedstreamsync/downloader.go
  2. 4
      api/service/stagedstreamsync/downloaders.go
  3. 4
      api/service/stagedstreamsync/service.go
  4. 34
      api/service/stagedstreamsync/stage_short_range.go
  5. 6
      api/service/stagedstreamsync/syncing.go
  6. 5
      cmd/harmony/main.go
  7. 8
      core/blockchain.go
  8. 5
      core/blockchain_impl.go
  9. 4
      core/blockchain_stub.go
  10. 4
      node/node.go
  11. 19
      node/node_handler.go

@ -6,6 +6,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/harmony-one/harmony/consensus"
"github.com/rs/zerolog"
"github.com/harmony-one/harmony/consensus"
@ -38,7 +39,7 @@ type (
)
// NewDownloader creates a new downloader
func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config, c *consensus.Consensus) *Downloader {
config.fixValues()
sp := sync.NewProtocol(sync.Config{

@ -16,7 +16,7 @@ type Downloaders struct {
}
// NewDownloaders creates Downloaders for sync of multiple blockchains
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config, c *consensus.Consensus) *Downloaders {
ds := make(map[uint32]*Downloader)
isBeaconNode := len(bcs) == 1
for _, bc := range bcs {
@ -26,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.C
if _, ok := ds[bc.ShardID()]; ok {
continue
}
ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config)
ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config, c)
}
return &Downloaders{
ds: ds,

@ -12,9 +12,9 @@ type StagedStreamSyncService struct {
}
// NewService creates a new downloader service
func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string, c *consensus.Consensus) *StagedStreamSyncService {
return &StagedStreamSyncService{
Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config),
Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config, c),
}
}

@ -3,7 +3,9 @@ package stagedstreamsync
import (
"context"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
@ -18,6 +20,7 @@ type StageShortRange struct {
type StageShortRangeCfg struct {
bc core.BlockChain
db kv.RwDB
c *consensus.Consensus
}
func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange {
@ -26,10 +29,11 @@ func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange {
}
}
func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB) StageShortRangeCfg {
func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB, c *consensus.Consensus) StageShortRangeCfg {
return StageShortRangeCfg{
bc: bc,
db: db,
c: c,
}
}
@ -104,9 +108,12 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
return 0, errors.Wrap(err, "prerequisite")
}
}
curBN := sr.configs.bc.CurrentBlock().NumberU64()
blkNums := sh.prepareBlockHashNumbers(curBN)
hashChain, whitelist, err := sh.getHashChain(ctx, blkNums)
var (
bc = sr.configs.bc
curBN = bc.CurrentHeader().NumberU64()
blkNums = sh.prepareBlockHashNumbers(curBN)
hashChain, whitelist, err = sh.getHashChain(ctx, blkNums)
)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return 0, nil
@ -156,6 +163,25 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
return 0, err
}
numInserted := 0
err = sr.configs.c.GetLastMileBlockIter(sr.configs.bc.CurrentHeader().NumberU64()+1, func(blockIter *consensus.LastMileBlockIter) error {
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
numInserted++
}
return nil
})
if err != nil {
return 0, errors.WithMessage(err, "failed to InsertChain for last mile blocks")
}
utils.Logger().Info().Int("last mile blocks inserted", numInserted).Msg("Insert last mile blocks success")
return n, nil
}

@ -46,8 +46,9 @@ func CreateStagedSync(ctx context.Context,
protocol syncProtocol,
config Config,
logger zerolog.Logger,
logProgress bool,
c *consensus.Consensus,
) (*StagedStreamSync, error) {
logger.Info().
Uint32("shard", bc.ShardID()).
Bool("beaconNode", isBeaconNode).
@ -56,7 +57,6 @@ func CreateStagedSync(ctx context.Context,
Bool("serverOnly", config.ServerOnly).
Int("minStreams", config.MinStreams).
Msg(WrapStagedSyncMsg("creating staged sync"))
var mainDB kv.RwDB
dbs := make([]kv.RwDB, config.Concurrency)
if config.UseMemDB {
@ -82,7 +82,7 @@ func CreateStagedSync(ctx context.Context,
}
stageHeadsCfg := NewStageHeadersCfg(bc, mainDB)
stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB)
stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB, c)
stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress)

@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"math/big"
"math/rand"
@ -521,7 +522,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
Msg("Start p2p host failed")
}
if err := currentNode.BootstrapConsensus(); err != nil {
if err := node.BootstrapConsensus(context.TODO(), currentNode.Consensus, currentNode.Host()); err != nil {
fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error())
if !currentNode.NodeConfig.IsOffline {
os.Exit(-1)
@ -1032,7 +1033,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
}
}
//Setup stream sync service
s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir)
s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir, node.Consensus)
node.RegisterService(service.StagedStreamSync, s)

@ -109,14 +109,6 @@ type BlockChain interface {
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
WriteBlockWithoutState(block *types.Block, td *big.Int) (err error)
// WriteBlockWithState writes the block and all associated state to the database.
WriteBlockWithState(
block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt,
stakeMsgs []types2.StakeMsg,
paid reward.Reader,
state *state.DB,
) (status WriteStatus, err error)
// GetMaxGarbageCollectedBlockNumber ..
GetMaxGarbageCollectedBlockNumber() int64
// InsertChain attempts to insert the given batch of blocks in to the canonical

@ -1473,7 +1473,8 @@ func (bc *BlockChainImpl) WriteBlockWithoutState(block *types.Block, td *big.Int
return nil
}
func (bc *BlockChainImpl) WriteBlockWithState(
// writeBlockWithState writes the block and all associated state to the database.
func (bc *BlockChainImpl) writeBlockWithState(
block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt,
stakeMsgs []staking.StakeMsg,
@ -1880,7 +1881,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
// Write the block to the chain and get the status.
substart = time.Now()
status, err := bc.WriteBlockWithState(
status, err := bc.writeBlockWithState(
block, receipts, cxReceipts, stakeMsgs, payout, state,
)
if err != nil {

@ -124,10 +124,6 @@ func (a Stub) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error
return errors.Errorf("method WriteBlockWithoutState not implemented for %s", a.Name)
}
func (a Stub) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, cxReceipts []*types.CXReceipt, stakeMsgs []staking.StakeMsg, paid reward.Reader, state *state.DB) (status WriteStatus, err error) {
return 0, errors.Errorf("method WriteBlockWithState not implemented for %s", a.Name)
}
func (a Stub) GetMaxGarbageCollectedBlockNumber() int64 {
return 0
}

@ -149,6 +149,10 @@ type Node struct {
registry *registry.Registry
}
func (node *Node) Host() p2p.Host {
return node.host
}
// Blockchain returns the blockchain for the node's current shard.
func (node *Node) Blockchain() core.BlockChain {
return node.registry.GetBlockchain()

@ -404,16 +404,21 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
}
// BootstrapConsensus is a goroutine to check number of peers and start the consensus
func (node *Node) BootstrapConsensus() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
func BootstrapConsensus(ctx context.Context, consensus *consensus.Consensus, host p2p.Host) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
min := node.Consensus.MinPeers
min := consensus.MinPeers
enoughMinPeers := make(chan struct{})
const checkEvery = 3 * time.Second
go func() {
for {
<-time.After(checkEvery)
numPeersNow := node.host.GetPeerCount()
select {
case <-ctx.Done():
return
case <-time.After(checkEvery):
}
numPeersNow := host.GetPeerCount()
if numPeersNow >= min {
utils.Logger().Info().Msg("[bootstrap] StartConsensus")
enoughMinPeers <- struct{}{}
@ -432,9 +437,7 @@ func (node *Node) BootstrapConsensus() error {
case <-ctx.Done():
return ctx.Err()
case <-enoughMinPeers:
go func() {
node.Consensus.StartChannel()
}()
go consensus.StartChannel()
return nil
}
}

Loading…
Cancel
Save