From 40a73635195bbaf50db532531123cbcd3946eabc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Tue, 18 Apr 2023 19:51:43 +0800 Subject: [PATCH] remove old stream codes --- api/service/manager.go | 3 - api/service/synchronize/service.go | 31 -- cmd/harmony/main.go | 46 +-- hmy/downloader/adapter.go | 32 -- hmy/downloader/adapter_test.go | 366 ------------------ hmy/downloader/beaconhelper.go | 156 -------- hmy/downloader/const.go | 80 ---- hmy/downloader/downloader.go | 321 ---------------- hmy/downloader/downloader_test.go | 96 ----- hmy/downloader/downloaders.go | 97 ----- hmy/downloader/longrange.go | 522 ------------------------- hmy/downloader/longrange_test.go | 331 ---------------- hmy/downloader/metric.go | 98 ----- hmy/downloader/shortrange.go | 593 ----------------------------- hmy/downloader/shortrange_test.go | 437 --------------------- hmy/downloader/types.go | 292 -------------- hmy/downloader/types_test.go | 266 ------------- node/node_syncing.go | 29 +- 18 files changed, 11 insertions(+), 3785 deletions(-) delete mode 100644 api/service/synchronize/service.go delete mode 100644 hmy/downloader/adapter.go delete mode 100644 hmy/downloader/adapter_test.go delete mode 100644 hmy/downloader/beaconhelper.go delete mode 100644 hmy/downloader/const.go delete mode 100644 hmy/downloader/downloader.go delete mode 100644 hmy/downloader/downloader_test.go delete mode 100644 hmy/downloader/downloaders.go delete mode 100644 hmy/downloader/longrange.go delete mode 100644 hmy/downloader/longrange_test.go delete mode 100644 hmy/downloader/metric.go delete mode 100644 hmy/downloader/shortrange.go delete mode 100644 hmy/downloader/shortrange_test.go delete mode 100644 hmy/downloader/types.go delete mode 100644 hmy/downloader/types_test.go diff --git a/api/service/manager.go b/api/service/manager.go index d879b6a6c..57ca15d60 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -23,7 +23,6 @@ const ( Prometheus Synchronize CrosslinkSending - StagedStreamSync ) func (t Type) String() string { @@ -46,8 +45,6 @@ func (t Type) String() string { return "Synchronize" case CrosslinkSending: return "CrosslinkSending" - case StagedStreamSync: - return "StagedStreamSync" default: return "Unknown" } diff --git a/api/service/synchronize/service.go b/api/service/synchronize/service.go deleted file mode 100644 index 4e4eb8013..000000000 --- a/api/service/synchronize/service.go +++ /dev/null @@ -1,31 +0,0 @@ -package synchronize - -import ( - "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/hmy/downloader" - "github.com/harmony-one/harmony/p2p" -) - -// Service is simply a adapter of Downloaders, which support block synchronization -type Service struct { - Downloaders *downloader.Downloaders -} - -// NewService creates the a new downloader service -func NewService(host p2p.Host, bcs []core.BlockChain, config downloader.Config) *Service { - return &Service{ - Downloaders: downloader.NewDownloaders(host, bcs, config), - } -} - -// Start start the service -func (s *Service) Start() error { - s.Downloaders.Start() - return nil -} - -// Stop stop the service -func (s *Service) Stop() error { - s.Downloaders.Close() - return nil -} diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index e66d3c9c7..030a0d2d1 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -40,12 +40,10 @@ import ( "github.com/harmony-one/harmony/api/service/pprof" "github.com/harmony-one/harmony/api/service/prometheus" "github.com/harmony-one/harmony/api/service/stagedstreamsync" - "github.com/harmony-one/harmony/api/service/synchronize" "github.com/harmony-one/harmony/common/fdlimit" "github.com/harmony-one/harmony/common/ntp" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/hmy/downloader" "github.com/harmony-one/harmony/internal/cli" "github.com/harmony-one/harmony/internal/common" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" @@ -405,12 +403,9 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { // Setup services if hc.Sync.Enabled { - if hc.Sync.StagedSync { - setupStagedSyncService(currentNode, myHost, hc) - } else { - setupSyncService(currentNode, myHost, hc) - } + setupSyncService(currentNode, myHost, hc) } + if currentNode.NodeConfig.Role() == nodeconfig.Validator { currentNode.RegisterValidatorServices() } else if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode { @@ -896,41 +891,6 @@ func setupSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyCo blockchains = append(blockchains, node.EpochChain()) } - dConfig := downloader.Config{ - ServerOnly: !hc.Sync.Downloader, - Network: nodeconfig.NetworkType(hc.Network.NetworkType), - Concurrency: hc.Sync.Concurrency, - MinStreams: hc.Sync.MinPeers, - InitStreams: hc.Sync.InitStreams, - SmSoftLowCap: hc.Sync.DiscSoftLowCap, - SmHardLowCap: hc.Sync.DiscHardLowCap, - SmHiCap: hc.Sync.DiscHighCap, - SmDiscBatch: hc.Sync.DiscBatch, - } - // If we are running side chain, we will need to do some extra works for beacon - // sync. - if !node.IsRunningBeaconChain() { - dConfig.BHConfig = &downloader.BeaconHelperConfig{ - BlockC: node.BeaconBlockChannel, - InsertHook: node.BeaconSyncHook, - } - } - s := synchronize.NewService(host, blockchains, dConfig) - - node.RegisterService(service.Synchronize, s) - - d := s.Downloaders.GetShardDownloader(node.Blockchain().ShardID()) - if hc.Sync.Downloader && hc.General.NodeType != nodeTypeExplorer { - node.Consensus.SetDownloader(d) // Set downloader when stream client is active - } -} - -func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyConfig) { - blockchains := []core.BlockChain{node.Blockchain()} - if node.Blockchain().ShardID() != shard.BeaconChainShardID { - blockchains = append(blockchains, node.EpochChain()) - } - sConfig := stagedstreamsync.Config{ ServerOnly: !hc.Sync.Downloader, Network: nodeconfig.NetworkType(hc.Network.NetworkType), @@ -956,7 +916,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har //Setup stream sync service s := stagedstreamsync.NewService(host, blockchains, sConfig, hc.General.DataDir) - node.RegisterService(service.StagedStreamSync, s) + node.RegisterService(service.Synchronize, s) d := s.Downloaders.GetShardDownloader(node.Blockchain().ShardID()) if hc.Sync.Downloader && hc.General.NodeType != nodeTypeExplorer { diff --git a/hmy/downloader/adapter.go b/hmy/downloader/adapter.go deleted file mode 100644 index c8758b506..000000000 --- a/hmy/downloader/adapter.go +++ /dev/null @@ -1,32 +0,0 @@ -package downloader - -import ( - "context" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/event" - "github.com/harmony-one/harmony/consensus/engine" - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/p2p/stream/common/streammanager" - syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" - sttypes "github.com/harmony-one/harmony/p2p/stream/types" -) - -type syncProtocol interface { - GetCurrentBlockNumber(ctx context.Context, opts ...syncproto.Option) (uint64, sttypes.StreamID, error) - GetBlocksByNumber(ctx context.Context, bns []uint64, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error) - GetBlockHashes(ctx context.Context, bns []uint64, opts ...syncproto.Option) ([]common.Hash, sttypes.StreamID, error) - GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error) - - RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream - SubscribeAddStreamEvent(ch chan<- streammanager.EvtStreamAdded) event.Subscription - NumStreams() int -} - -type blockChain interface { - engine.ChainReader - Engine() engine.Engine - - InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) - WriteCommitSig(blockNum uint64, lastCommits []byte) error -} diff --git a/hmy/downloader/adapter_test.go b/hmy/downloader/adapter_test.go deleted file mode 100644 index 2d1f12e70..000000000 --- a/hmy/downloader/adapter_test.go +++ /dev/null @@ -1,366 +0,0 @@ -package downloader - -import ( - "context" - "fmt" - "math/big" - "sync" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/event" - - "github.com/harmony-one/harmony/block" - "github.com/harmony-one/harmony/consensus/engine" - "github.com/harmony-one/harmony/consensus/reward" - "github.com/harmony-one/harmony/core/state" - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/crypto/bls" - "github.com/harmony-one/harmony/internal/params" - "github.com/harmony-one/harmony/p2p/stream/common/streammanager" - syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" - sttypes "github.com/harmony-one/harmony/p2p/stream/types" - "github.com/harmony-one/harmony/shard" - "github.com/harmony-one/harmony/staking/slash" - staking "github.com/harmony-one/harmony/staking/types" -) - -type testBlockChain struct { - curBN uint64 - insertErrHook func(bn uint64) error - lock sync.Mutex -} - -func newTestBlockChain(curBN uint64, insertErrHook func(bn uint64) error) *testBlockChain { - return &testBlockChain{ - curBN: curBN, - insertErrHook: insertErrHook, - } -} - -func (bc *testBlockChain) CurrentBlock() *types.Block { - bc.lock.Lock() - defer bc.lock.Unlock() - - return makeTestBlock(bc.curBN) -} - -func (bc *testBlockChain) CurrentHeader() *block.Header { - bc.lock.Lock() - defer bc.lock.Unlock() - - return makeTestBlock(bc.curBN).Header() -} - -func (bc *testBlockChain) currentBlockNumber() uint64 { - bc.lock.Lock() - defer bc.lock.Unlock() - - return bc.curBN -} - -func (bc *testBlockChain) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) { - bc.lock.Lock() - defer bc.lock.Unlock() - - for i, block := range chain { - if bc.insertErrHook != nil { - if err := bc.insertErrHook(block.NumberU64()); err != nil { - return i, err - } - } - if block.NumberU64() <= bc.curBN { - continue - } - if block.NumberU64() != bc.curBN+1 { - return i, fmt.Errorf("not expected block number: %v / %v", block.NumberU64(), bc.curBN+1) - } - bc.curBN++ - } - return len(chain), nil -} - -func (bc *testBlockChain) changeBlockNumber(val uint64) { - bc.lock.Lock() - defer bc.lock.Unlock() - - bc.curBN = val -} - -func (bc *testBlockChain) ShardID() uint32 { return 0 } -func (bc *testBlockChain) ReadShardState(epoch *big.Int) (*shard.State, error) { return nil, nil } -func (bc *testBlockChain) Config() *params.ChainConfig { return nil } -func (bc *testBlockChain) WriteCommitSig(blockNum uint64, lastCommits []byte) error { return nil } -func (bc *testBlockChain) GetHeader(hash common.Hash, number uint64) *block.Header { return nil } -func (bc *testBlockChain) GetHeaderByNumber(number uint64) *block.Header { return nil } -func (bc *testBlockChain) GetHeaderByHash(hash common.Hash) *block.Header { return nil } -func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block { return nil } -func (bc *testBlockChain) ReadValidatorList() ([]common.Address, error) { return nil, nil } -func (bc *testBlockChain) ReadCommitSig(blockNum uint64) ([]byte, error) { return nil, nil } -func (bc *testBlockChain) ReadBlockRewardAccumulator(uint64) (*big.Int, error) { return nil, nil } -func (bc *testBlockChain) ValidatorCandidates() []common.Address { return nil } -func (bc *testBlockChain) Engine() engine.Engine { return &dummyEngine{} } -func (cr *testBlockChain) ReadValidatorInformationAtState( - addr common.Address, state *state.DB, -) (*staking.ValidatorWrapper, error) { - return nil, nil -} -func (cr *testBlockChain) StateAt(root common.Hash) (*state.DB, error) { - return nil, nil -} -func (bc *testBlockChain) ReadValidatorInformation(addr common.Address) (*staking.ValidatorWrapper, error) { - return nil, nil -} -func (bc *testBlockChain) ReadValidatorSnapshot(addr common.Address) (*staking.ValidatorSnapshot, error) { - return nil, nil -} -func (bc *testBlockChain) ReadValidatorSnapshotAtEpoch(epoch *big.Int, addr common.Address) (*staking.ValidatorSnapshot, error) { - return nil, nil -} -func (bc *testBlockChain) ReadValidatorStats(addr common.Address) (*staking.ValidatorStats, error) { - return nil, nil -} -func (bc *testBlockChain) SuperCommitteeForNextEpoch(beacon engine.ChainReader, header *block.Header, isVerify bool) (*shard.State, error) { - return nil, nil -} - -type dummyEngine struct{} - -func (e *dummyEngine) VerifyHeader(engine.ChainReader, *block.Header, bool) error { - return nil -} -func (e *dummyEngine) VerifyHeaderSignature(engine.ChainReader, *block.Header, bls.SerializedSignature, []byte) error { - return nil -} -func (e *dummyEngine) VerifyCrossLink(engine.ChainReader, types.CrossLink) error { - return nil -} -func (e *dummyEngine) VerifyVRF(chain engine.ChainReader, header *block.Header) error { - return nil -} -func (e *dummyEngine) VerifyHeaders(engine.ChainReader, []*block.Header, []bool) (chan<- struct{}, <-chan error) { - return nil, nil -} -func (e *dummyEngine) VerifySeal(engine.ChainReader, *block.Header) error { return nil } -func (e *dummyEngine) VerifyShardState(engine.ChainReader, engine.ChainReader, *block.Header) error { - return nil -} -func (e *dummyEngine) Beaconchain() engine.ChainReader { return nil } -func (e *dummyEngine) SetBeaconchain(engine.ChainReader) {} -func (e *dummyEngine) Finalize( - chain engine.ChainReader, beacon engine.ChainReader, header *block.Header, - state *state.DB, txs []*types.Transaction, - receipts []*types.Receipt, outcxs []*types.CXReceipt, - incxs []*types.CXReceiptsProof, stks staking.StakingTransactions, - doubleSigners slash.Records, sigsReady chan bool, viewID func() uint64, -) (*types.Block, reward.Reader, error) { - return nil, nil, nil -} - -type testInsertHelper struct { - bc *testBlockChain -} - -func (ch *testInsertHelper) verifyAndInsertBlock(block *types.Block) error { - _, err := ch.bc.InsertChain(types.Blocks{block}, true) - return err -} -func (ch *testInsertHelper) verifyAndInsertBlocks(blocks types.Blocks) (int, error) { - return ch.bc.InsertChain(blocks, true) -} - -const ( - initStreamNum = 32 - minStreamNum = 16 -) - -type testSyncProtocol struct { - streamIDs []sttypes.StreamID - remoteChain *testBlockChain - requestErrHook func(uint64) error - - curIndex int - numStreams int - lock sync.Mutex -} - -func newTestSyncProtocol(targetBN uint64, numStreams int, requestErrHook func(uint64) error) *testSyncProtocol { - return &testSyncProtocol{ - streamIDs: makeStreamIDs(numStreams), - remoteChain: newTestBlockChain(targetBN, nil), - requestErrHook: requestErrHook, - curIndex: 0, - numStreams: numStreams, - } -} - -func (sp *testSyncProtocol) GetCurrentBlockNumber(ctx context.Context, opts ...syncproto.Option) (uint64, sttypes.StreamID, error) { - sp.lock.Lock() - defer sp.lock.Unlock() - - bn := sp.remoteChain.currentBlockNumber() - - return bn, sp.nextStreamID(), nil -} - -func (sp *testSyncProtocol) GetBlocksByNumber(ctx context.Context, bns []uint64, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error) { - sp.lock.Lock() - defer sp.lock.Unlock() - - res := make([]*types.Block, 0, len(bns)) - for _, bn := range bns { - if sp.requestErrHook != nil { - if err := sp.requestErrHook(bn); err != nil { - return nil, sp.nextStreamID(), err - } - } - if bn > sp.remoteChain.currentBlockNumber() { - res = append(res, nil) - } else { - res = append(res, makeTestBlock(bn)) - } - } - return res, sp.nextStreamID(), nil -} - -func (sp *testSyncProtocol) GetBlockHashes(ctx context.Context, bns []uint64, opts ...syncproto.Option) ([]common.Hash, sttypes.StreamID, error) { - sp.lock.Lock() - defer sp.lock.Unlock() - - res := make([]common.Hash, 0, len(bns)) - for _, bn := range bns { - if sp.requestErrHook != nil { - if err := sp.requestErrHook(bn); err != nil { - return nil, sp.nextStreamID(), err - } - } - if bn > sp.remoteChain.currentBlockNumber() { - res = append(res, emptyHash) - } else { - res = append(res, makeTestBlockHash(bn)) - } - } - return res, sp.nextStreamID(), nil -} - -func (sp *testSyncProtocol) GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error) { - sp.lock.Lock() - defer sp.lock.Unlock() - - res := make([]*types.Block, 0, len(hs)) - for _, h := range hs { - bn := testHashToNumber(h) - if sp.requestErrHook != nil { - if err := sp.requestErrHook(bn); err != nil { - return nil, sp.nextStreamID(), err - } - } - if bn > sp.remoteChain.currentBlockNumber() { - res = append(res, nil) - } else { - res = append(res, makeTestBlock(bn)) - } - } - return res, sp.nextStreamID(), nil -} - -func (sp *testSyncProtocol) RemoveStream(target sttypes.StreamID) { - sp.lock.Lock() - defer sp.lock.Unlock() - - for i, stid := range sp.streamIDs { - if stid == target { - if i == len(sp.streamIDs)-1 { - sp.streamIDs = sp.streamIDs[:i] - } else { - sp.streamIDs = append(sp.streamIDs[:i], sp.streamIDs[i+1:]...) - } - // mock discovery - if len(sp.streamIDs) < minStreamNum { - sp.streamIDs = append(sp.streamIDs, makeStreamID(sp.numStreams)) - sp.numStreams++ - } - } - } -} - -func (sp *testSyncProtocol) NumStreams() int { - sp.lock.Lock() - defer sp.lock.Unlock() - - return len(sp.streamIDs) -} - -func (sp *testSyncProtocol) SubscribeAddStreamEvent(ch chan<- streammanager.EvtStreamAdded) event.Subscription { - var evtFeed event.Feed - go func() { - sp.lock.Lock() - num := len(sp.streamIDs) - sp.lock.Unlock() - for i := 0; i != num; i++ { - evtFeed.Send(streammanager.EvtStreamAdded{Stream: nil}) - } - }() - return evtFeed.Subscribe(ch) -} - -// TODO: add with whitelist stuff -func (sp *testSyncProtocol) nextStreamID() sttypes.StreamID { - if sp.curIndex >= len(sp.streamIDs) { - sp.curIndex = 0 - } - index := sp.curIndex - sp.curIndex++ - if sp.curIndex >= len(sp.streamIDs) { - sp.curIndex = 0 - } - return sp.streamIDs[index] -} - -func (sp *testSyncProtocol) changeBlockNumber(val uint64) { - sp.remoteChain.changeBlockNumber(val) -} - -func makeStreamIDs(size int) []sttypes.StreamID { - res := make([]sttypes.StreamID, 0, size) - for i := 0; i != size; i++ { - res = append(res, makeStreamID(i)) - } - return res -} - -func makeStreamID(index int) sttypes.StreamID { - return sttypes.StreamID(fmt.Sprintf("test stream %v", index)) -} - -var ( - hashNumberMap = map[common.Hash]uint64{} - computed uint64 - hashNumberLock sync.Mutex -) - -func testHashToNumber(h common.Hash) uint64 { - hashNumberLock.Lock() - defer hashNumberLock.Unlock() - - if h == emptyHash { - panic("not allowed") - } - if bn, ok := hashNumberMap[h]; ok { - return bn - } - for ; ; computed++ { - ch := makeTestBlockHash(computed) - hashNumberMap[ch] = computed - if ch == h { - return computed - } - } -} - -func testNumberToHashes(nums []uint64) []common.Hash { - hashes := make([]common.Hash, 0, len(nums)) - for _, num := range nums { - hashes = append(hashes, makeTestBlockHash(num)) - } - return hashes -} diff --git a/hmy/downloader/beaconhelper.go b/hmy/downloader/beaconhelper.go deleted file mode 100644 index 96d06ebf8..000000000 --- a/hmy/downloader/beaconhelper.go +++ /dev/null @@ -1,156 +0,0 @@ -package downloader - -import ( - "time" - - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/internal/utils" - "github.com/rs/zerolog" -) - -// lastMileCache keeps the last 50 number blocks in memory cache -const lastMileCap = 50 - -type ( - // beaconHelper is the helper for the beacon downloader. The beaconHelper is only started - // when node is running on side chain, listening to beacon client pub-sub message and - // insert the latest blocks to the beacon chain. - beaconHelper struct { - bc blockChain - blockC <-chan *types.Block - // TODO: refactor this hook to consensus module. We'd better put it in - // consensus module under a subscription. - insertHook func() - - lastMileCache *blocksByNumber - insertC chan insertTask - closeC chan struct{} - logger zerolog.Logger - } - - insertTask struct { - doneC chan struct{} - } -) - -func newBeaconHelper(bc blockChain, blockC <-chan *types.Block, insertHook func()) *beaconHelper { - return &beaconHelper{ - bc: bc, - blockC: blockC, - insertHook: insertHook, - lastMileCache: newBlocksByNumber(lastMileCap), - insertC: make(chan insertTask, 1), - closeC: make(chan struct{}), - logger: utils.Logger().With(). - Str("module", "downloader"). - Str("sub-module", "beacon helper"). - Logger(), - } -} - -func (bh *beaconHelper) start() { - go bh.loop() -} - -func (bh *beaconHelper) close() { - close(bh.closeC) -} - -func (bh *beaconHelper) loop() { - t := time.NewTicker(10 * time.Second) - defer t.Stop() - for { - select { - case <-t.C: - bh.insertAsync() - - case b, ok := <-bh.blockC: - if !ok { - return // blockC closed. Node exited - } - if b == nil { - continue - } - bh.lastMileCache.push(b) - bh.insertAsync() - - case it := <-bh.insertC: - inserted, bn, err := bh.insertLastMileBlocks() - numBlocksInsertedBeaconHelperCounter.Add(float64(inserted)) - if err != nil { - bh.logger.Warn().Err(err).Msg("insert last mile blocks error") - continue - } - bh.logger.Info().Int("inserted", inserted). - Uint64("end height", bn). - Uint32("shard", bh.bc.ShardID()). - Msg("insert last mile blocks") - - close(it.doneC) - - case <-bh.closeC: - return - } - } -} - -// insertSync triggers the insert last mile without blocking -func (bh *beaconHelper) insertAsync() { - select { - case bh.insertC <- insertTask{ - doneC: make(chan struct{}), - }: - default: - } -} - -// insertSync triggers the insert last mile while blocking -func (bh *beaconHelper) insertSync() { - task := insertTask{ - doneC: make(chan struct{}), - } - bh.insertC <- task - <-task.doneC -} - -func (bh *beaconHelper) insertLastMileBlocks() (inserted int, bn uint64, err error) { - bn = bh.bc.CurrentBlock().NumberU64() + 1 - for { - b := bh.getNextBlock(bn) - if b == nil { - bn-- - return - } - // TODO: Instruct the beacon helper to verify signatures. This may require some forks - // in pub-sub message (add commit sigs in node.block.sync messages) - if _, err = bh.bc.InsertChain(types.Blocks{b}, true); err != nil { - bn-- - return - } - bh.logger.Info().Uint64("number", b.NumberU64()).Msg("Inserted block from beacon pub-sub") - - if bh.insertHook != nil { - bh.insertHook() - } - inserted++ - bn++ - } -} - -func (bh *beaconHelper) getNextBlock(expBN uint64) *types.Block { - for bh.lastMileCache.len() > 0 { - b := bh.lastMileCache.pop() - if b == nil { - return nil - } - if b.NumberU64() < expBN { - continue - } - if b.NumberU64() > expBN { - bh.lastMileCache.push(b) - return nil - } - return b - } - return nil -} diff --git a/hmy/downloader/const.go b/hmy/downloader/const.go deleted file mode 100644 index a6cafb918..000000000 --- a/hmy/downloader/const.go +++ /dev/null @@ -1,80 +0,0 @@ -package downloader - -import ( - "time" - - "github.com/harmony-one/harmony/core/types" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" -) - -const ( - numBlocksByNumPerRequest int = 10 // number of blocks for each request - blocksPerInsert int = 50 // number of blocks for each insert batch - - numBlockHashesPerRequest int = 20 // number of get block hashes for short range sync - numBlocksByHashesUpperCap int = 10 // number of get blocks by hashes upper cap - numBlocksByHashesLowerCap int = 3 // number of get blocks by hashes lower cap - - lastMileThres int = 10 - - // soft cap of size in resultQueue. When the queue size is larger than this limit, - // no more request will be assigned to workers to wait for InsertChain to finish. - softQueueCap int = 100 - - // defaultConcurrency is the default settings for concurrency - defaultConcurrency = 4 - - // shortRangeTimeout is the timeout for each short range sync, which allow short range sync - // to restart automatically when stuck in `getBlockHashes` - shortRangeTimeout = 1 * time.Minute -) - -type ( - // Config is the downloader config - Config struct { - // Only run stream sync protocol as a server. - // TODO: remove this when stream sync is fully up. - ServerOnly bool - // use staged sync - Staged bool - // parameters - Network nodeconfig.NetworkType - Concurrency int // Number of concurrent sync requests - MinStreams int // Minimum number of streams to do sync - InitStreams int // Number of streams requirement for initial bootstrap - - // stream manager config - SmSoftLowCap int - SmHardLowCap int - SmHiCap int - SmDiscBatch int - - // config for beacon config - BHConfig *BeaconHelperConfig - } - - // BeaconHelperConfig is the extra config used for beaconHelper which uses - // pub-sub block message to do sync. - BeaconHelperConfig struct { - BlockC <-chan *types.Block - InsertHook func() - } -) - -func (c *Config) fixValues() { - if c.Concurrency == 0 { - c.Concurrency = defaultConcurrency - } - if c.Concurrency > c.MinStreams { - c.MinStreams = c.Concurrency - } - if c.MinStreams > c.InitStreams { - c.InitStreams = c.MinStreams - } - if c.MinStreams > c.SmSoftLowCap { - c.SmSoftLowCap = c.MinStreams - } - if c.MinStreams > c.SmHardLowCap { - c.SmHardLowCap = c.MinStreams - } -} diff --git a/hmy/downloader/downloader.go b/hmy/downloader/downloader.go deleted file mode 100644 index 01ec242ab..000000000 --- a/hmy/downloader/downloader.go +++ /dev/null @@ -1,321 +0,0 @@ -package downloader - -import ( - "context" - "fmt" - "time" - - "github.com/ethereum/go-ethereum/event" - "github.com/pkg/errors" - "github.com/rs/zerolog" - - "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/crypto/bls" - "github.com/harmony-one/harmony/internal/chain" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/p2p/stream/common/streammanager" - "github.com/harmony-one/harmony/p2p/stream/protocols/sync" -) - -type ( - // Downloader is responsible for sync task of one shard - Downloader struct { - bc blockChain - syncProtocol syncProtocol - bh *beaconHelper - - downloadC chan struct{} - closeC chan struct{} - ctx context.Context - cancel func() - - evtDownloadFinished event.Feed // channel for each download task finished - evtDownloadFinishedSubscribed bool - evtDownloadStarted event.Feed // channel for each download has started - evtDownloadStartedSubscribed bool - - status status - config Config - logger zerolog.Logger - } -) - -// NewDownloader creates a new downloader -func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config Config) *Downloader { - config.fixValues() - - sp := sync.NewProtocol(sync.Config{ - Chain: bc, - Host: host.GetP2PHost(), - Discovery: host.GetDiscovery(), - ShardID: nodeconfig.ShardID(bc.ShardID()), - Network: config.Network, - BeaconNode: isBeaconNode, - - SmSoftLowCap: config.SmSoftLowCap, - SmHardLowCap: config.SmHardLowCap, - SmHiCap: config.SmHiCap, - DiscBatch: config.SmDiscBatch, - }) - host.AddStreamProtocol(sp) - - var bh *beaconHelper - if config.BHConfig != nil && bc.ShardID() == 0 { - bh = newBeaconHelper(bc, config.BHConfig.BlockC, config.BHConfig.InsertHook) - } - - ctx, cancel := context.WithCancel(context.Background()) - - return &Downloader{ - bc: bc, - syncProtocol: sp, - bh: bh, - - downloadC: make(chan struct{}), - closeC: make(chan struct{}), - ctx: ctx, - cancel: cancel, - - status: newStatus(), - config: config, - logger: utils.Logger().With().Str("module", "downloader").Uint32("ShardID", bc.ShardID()).Logger(), - } -} - -// Start start the downloader -func (d *Downloader) Start() { - go d.run() - - if d.bh != nil { - d.bh.start() - } -} - -// Close close the downloader -func (d *Downloader) Close() { - close(d.closeC) - d.cancel() - - if d.bh != nil { - d.bh.close() - } -} - -// DownloadAsync triggers the download async. -func (d *Downloader) DownloadAsync() { - select { - case d.downloadC <- struct{}{}: - consensusTriggeredDownloadCounterVec.With(d.promLabels()).Inc() - - case <-time.After(100 * time.Millisecond): - } -} - -// NumPeers returns the number of peers connected of a specific shard. -func (d *Downloader) NumPeers() int { - return d.syncProtocol.NumStreams() -} - -// IsSyncing return the current sync status -func (d *Downloader) SyncStatus() (bool, uint64, uint64) { - current := d.bc.CurrentBlock().NumberU64() - syncing, target := d.status.get() - if !syncing { // means synced - target = current - } - // isSyncing, target, blocks to target - return syncing, target, target - current -} - -// SubscribeDownloadStarted subscribe download started -func (d *Downloader) SubscribeDownloadStarted(ch chan struct{}) event.Subscription { - d.evtDownloadStartedSubscribed = true - return d.evtDownloadStarted.Subscribe(ch) -} - -// SubscribeDownloadFinished subscribe the download finished -func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscription { - d.evtDownloadFinishedSubscribed = true - return d.evtDownloadFinished.Subscribe(ch) -} - -func (d *Downloader) run() { - d.waitForBootFinish() - d.loop() -} - -// waitForBootFinish wait for stream manager to finish the initial discovery and have -// enough peers to start downloader -func (d *Downloader) waitForBootFinish() { - evtCh := make(chan streammanager.EvtStreamAdded, 1) - sub := d.syncProtocol.SubscribeAddStreamEvent(evtCh) - defer sub.Unsubscribe() - - checkCh := make(chan struct{}, 1) - trigger := func() { - select { - case checkCh <- struct{}{}: - default: - } - } - trigger() - - t := time.NewTicker(10 * time.Second) - defer t.Stop() - for { - d.logger.Info().Msg("waiting for initial bootstrap discovery") - select { - case <-t.C: - trigger() - - case <-evtCh: - trigger() - - case <-checkCh: - if d.syncProtocol.NumStreams() >= d.config.InitStreams { - return - } - case <-d.closeC: - return - } - } -} - -func (d *Downloader) loop() { - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - initSync := true - trigger := func() { - select { - case d.downloadC <- struct{}{}: - case <-time.After(100 * time.Millisecond): - } - } - go trigger() - - for { - select { - case <-ticker.C: - go trigger() - - case <-d.downloadC: - addedBN, err := d.doDownload(initSync) - if err != nil { - // If error happens, sleep 5 seconds and retry - d.logger.Warn().Err(err).Bool("bootstrap", initSync).Msg("failed to download") - go func() { - time.Sleep(5 * time.Second) - trigger() - }() - time.Sleep(1 * time.Second) - continue - } - d.logger.Info().Int("block added", addedBN). - Uint64("current height", d.bc.CurrentBlock().NumberU64()). - Bool("initSync", initSync). - Uint32("shard", d.bc.ShardID()). - Msg("sync finished") - - if addedBN != 0 { - // If block number has been changed, trigger another sync - // and try to add last mile from pub-sub (blocking) - go trigger() - if d.bh != nil { - d.bh.insertSync() - } - } - initSync = false - - case <-d.closeC: - return - } - } -} - -func (d *Downloader) doDownload(initSync bool) (n int, err error) { - if initSync { - d.logger.Info().Uint64("current number", d.bc.CurrentBlock().NumberU64()). - Uint32("shard ID", d.bc.ShardID()).Msg("start long range sync") - - n, err = d.doLongRangeSync() - } else { - d.logger.Info().Uint64("current number", d.bc.CurrentBlock().NumberU64()). - Uint32("shard ID", d.bc.ShardID()).Msg("start short range sync") - - n, err = d.doShortRangeSync() - } - if err != nil { - pl := d.promLabels() - pl["error"] = err.Error() - numFailedDownloadCounterVec.With(pl).Inc() - return - } - return -} - -func (d *Downloader) startSyncing() { - d.status.startSyncing() - if d.evtDownloadStartedSubscribed { - d.evtDownloadStarted.Send(struct{}{}) - } -} - -func (d *Downloader) finishSyncing() { - d.status.finishSyncing() - if d.evtDownloadFinishedSubscribed { - d.evtDownloadFinished.Send(struct{}{}) - } -} - -var emptySigVerifyErr *sigVerifyErr - -type sigVerifyErr struct { - err error -} - -func (e *sigVerifyErr) Error() string { - return fmt.Sprintf("[VerifyHeaderSignature] %v", e.err.Error()) -} - -func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks) (int, error) { - for i, block := range blocks { - if err := verifyAndInsertBlock(bc, block, blocks[i+1:]...); err != nil { - return i, err - } - } - return len(blocks), nil -} - -func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error { - var ( - sigBytes bls.SerializedSignature - bitmap []byte - err error - ) - if len(nextBlocks) > 0 { - // get commit sig from the next block - next := nextBlocks[0] - sigBytes = next.Header().LastCommitSignature() - bitmap = next.Header().LastCommitBitmap() - } else { - // get commit sig from current block - sigBytes, bitmap, err = chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig()) - if err != nil { - return errors.Wrap(err, "parse commitSigAndBitmap") - } - } - - if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sigBytes, bitmap); err != nil { - return &sigVerifyErr{err} - } - if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil { - return errors.Wrap(err, "[VerifyHeader]") - } - if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil { - return errors.Wrap(err, "[InsertChain]") - } - return nil -} diff --git a/hmy/downloader/downloader_test.go b/hmy/downloader/downloader_test.go deleted file mode 100644 index b290b6eec..000000000 --- a/hmy/downloader/downloader_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package downloader - -import ( - "context" - "fmt" - "testing" - "time" -) - -func TestDownloader_Integration(t *testing.T) { - sp := newTestSyncProtocol(1000, 48, nil) - bc := newTestBlockChain(0, nil) - ctx, cancel := context.WithCancel(context.Background()) - c := Config{} - c.fixValues() // use default config values - - d := &Downloader{ - bc: bc, - syncProtocol: sp, - downloadC: make(chan struct{}), - closeC: make(chan struct{}), - ctx: ctx, - cancel: cancel, - config: c, - } - - // subscribe download event - finishedCh := make(chan struct{}, 1) - finishedSub := d.SubscribeDownloadFinished(finishedCh) - startedCh := make(chan struct{}, 1) - startedSub := d.SubscribeDownloadStarted(startedCh) - defer finishedSub.Unsubscribe() - defer startedSub.Unsubscribe() - - // Start the downloader - d.Start() - defer d.Close() - - // During bootstrap, trigger two download task: one long range, one short range. - // The second one will not trigger start / finish events. - if err := checkReceiveChanMulTimes(startedCh, 1, 10*time.Second); err != nil { - t.Fatal(err) - } - if err := checkReceiveChanMulTimes(finishedCh, 1, 10*time.Second); err != nil { - t.Fatal(err) - } - if curBN := d.bc.CurrentBlock().NumberU64(); curBN != 1000 { - t.Fatal("blockchain not synced to the latest") - } - - // Increase the remote block number, and trigger one download task manually - sp.changeBlockNumber(1010) - d.DownloadAsync() - // We shall do short range test twice - if err := checkReceiveChanMulTimes(startedCh, 1, 10*time.Second); err != nil { - t.Fatal(err) - } - if err := checkReceiveChanMulTimes(finishedCh, 1, 10*time.Second); err != nil { - t.Fatal(err) - } - if curBN := d.bc.CurrentBlock().NumberU64(); curBN != 1010 { - t.Fatal("blockchain not synced to the latest") - } - - // Remote block number unchanged, and trigger one download task manually - d.DownloadAsync() - if err := checkReceiveChanMulTimes(startedCh, 0, 10*time.Second); err != nil { - t.Fatal(err) - } - if err := checkReceiveChanMulTimes(finishedCh, 0, 10*time.Second); err != nil { - t.Fatal(err) - } - - // At last, check number of streams, should be exactly the same as the initial number - if sp.numStreams != 48 { - t.Errorf("unexpected number of streams at the end: %v / %v", sp.numStreams, 48) - } -} - -func checkReceiveChanMulTimes(ch chan struct{}, times int, timeout time.Duration) error { - t := time.Tick(timeout) - - for i := 0; i != times; i++ { - select { - case <-ch: - case <-t: - return fmt.Errorf("timed out %v", timeout) - } - } - select { - case <-ch: - return fmt.Errorf("received an extra event") - case <-time.After(100 * time.Millisecond): - } - return nil -} diff --git a/hmy/downloader/downloaders.go b/hmy/downloader/downloaders.go deleted file mode 100644 index 528e47e84..000000000 --- a/hmy/downloader/downloaders.go +++ /dev/null @@ -1,97 +0,0 @@ -package downloader - -import ( - "github.com/harmony-one/abool" - "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/p2p" -) - -// Downloaders is the set of downloaders -type Downloaders struct { - ds map[uint32]*Downloader - active *abool.AtomicBool - - config Config -} - -// NewDownloaders creates Downloaders for sync of multiple blockchains -func NewDownloaders(host p2p.Host, bcs []core.BlockChain, config Config) *Downloaders { - ds := make(map[uint32]*Downloader) - isBeaconNode := len(bcs) == 1 - for _, bc := range bcs { - if bc == nil { - continue - } - if _, ok := ds[bc.ShardID()]; ok { - continue - } - ds[bc.ShardID()] = NewDownloader(host, bc, isBeaconNode, config) - } - return &Downloaders{ - ds: ds, - active: abool.New(), - config: config, - } -} - -// Start start the downloaders -func (ds *Downloaders) Start() { - if ds.config.ServerOnly { - // Run in server only mode. Do not start downloaders. - return - } - ds.active.Set() - for _, d := range ds.ds { - d.Start() - } -} - -// Close close the downloaders -func (ds *Downloaders) Close() { - if ds.config.ServerOnly { - // Run in server only mode. Downloaders not started. - return - } - ds.active.UnSet() - for _, d := range ds.ds { - d.Close() - } -} - -// DownloadAsync triggers a download -func (ds *Downloaders) DownloadAsync(shardID uint32) { - d, ok := ds.ds[shardID] - if !ok && d != nil { - d.DownloadAsync() - } -} - -// GetShardDownloader get the downloader with the given shard ID -func (ds *Downloaders) GetShardDownloader(shardID uint32) *Downloader { - return ds.ds[shardID] -} - -// NumPeers returns the connected peers for each shard -func (ds *Downloaders) NumPeers() map[uint32]int { - res := make(map[uint32]int) - - for sid, d := range ds.ds { - res[sid] = d.NumPeers() - } - return res -} - -// SyncStatus returns whether the given shard is doing syncing task and the target block -// number. -func (ds *Downloaders) SyncStatus(shardID uint32) (bool, uint64, uint64) { - d, ok := ds.ds[shardID] - if !ok { - return false, 0, 0 - } - return d.SyncStatus() -} - -// IsActive returns whether the downloader is active -func (ds *Downloaders) IsActive() bool { - return ds.active.IsSet() -} diff --git a/hmy/downloader/longrange.go b/hmy/downloader/longrange.go deleted file mode 100644 index 4d4935b8f..000000000 --- a/hmy/downloader/longrange.go +++ /dev/null @@ -1,522 +0,0 @@ -package downloader - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/harmony-one/harmony/core/types" - syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" - sttypes "github.com/harmony-one/harmony/p2p/stream/types" - "github.com/pkg/errors" - "github.com/rs/zerolog" -) - -// doLongRangeSync does the long range sync. -// One LongRangeSync consists of several iterations. -// For each iteration, estimate the current block number, then fetch block & insert to blockchain -func (d *Downloader) doLongRangeSync() (int, error) { - var totalInserted int - - for { - ctx, cancel := context.WithCancel(d.ctx) - - iter := &lrSyncIter{ - bc: d.bc, - p: d.syncProtocol, - d: d, - ctx: ctx, - config: d.config, - logger: d.logger.With().Str("mode", "long range").Logger(), - } - if err := iter.doLongRangeSync(); err != nil { - cancel() - return totalInserted + iter.inserted, err - } - cancel() - - totalInserted += iter.inserted - - if iter.inserted < lastMileThres { - return totalInserted, nil - } - } -} - -// lrSyncIter run a single iteration of a full long range sync. -// First get a rough estimate of the current block height, and then sync to this -// block number -type lrSyncIter struct { - bc blockChain - p syncProtocol - d *Downloader - - gbm *getBlocksManager // initialized when finished get block number - inserted int - - config Config - ctx context.Context - logger zerolog.Logger -} - -func (lsi *lrSyncIter) doLongRangeSync() error { - if err := lsi.checkPrerequisites(); err != nil { - return err - } - bn, err := lsi.estimateCurrentNumber() - if err != nil { - return err - } - if curBN := lsi.bc.CurrentBlock().NumberU64(); bn <= curBN { - lsi.logger.Info().Uint64("current number", curBN).Uint64("target number", bn). - Msg("early return of long range sync") - return nil - } - - lsi.d.startSyncing() - defer lsi.d.finishSyncing() - - lsi.logger.Info().Uint64("target number", bn).Msg("estimated remote current number") - lsi.d.status.setTargetBN(bn) - - return lsi.fetchAndInsertBlocks(bn) -} - -func (lsi *lrSyncIter) checkPrerequisites() error { - return lsi.checkHaveEnoughStreams() -} - -// estimateCurrentNumber roughly estimate the current block number. -// The block number does not need to be exact, but just a temporary target of the iteration -func (lsi *lrSyncIter) estimateCurrentNumber() (uint64, error) { - var ( - cnResults = make(map[sttypes.StreamID]uint64) - lock sync.Mutex - wg sync.WaitGroup - ) - wg.Add(lsi.config.Concurrency) - for i := 0; i != lsi.config.Concurrency; i++ { - go func() { - defer wg.Done() - bn, stid, err := lsi.doGetCurrentNumberRequest() - if err != nil { - lsi.logger.Err(err).Str("streamID", string(stid)). - Msg("getCurrentNumber request failed. Removing stream") - if !errors.Is(err, context.Canceled) { - lsi.p.RemoveStream(stid) - } - return - } - lock.Lock() - cnResults[stid] = bn - lock.Unlock() - }() - } - wg.Wait() - - if len(cnResults) == 0 { - select { - case <-lsi.ctx.Done(): - return 0, lsi.ctx.Err() - default: - } - return 0, errors.New("zero block number response from remote nodes") - } - bn := computeBlockNumberByMaxVote(cnResults) - return bn, nil -} - -func (lsi *lrSyncIter) doGetCurrentNumberRequest() (uint64, sttypes.StreamID, error) { - ctx, cancel := context.WithTimeout(lsi.ctx, 10*time.Second) - defer cancel() - - bn, stid, err := lsi.p.GetCurrentBlockNumber(ctx, syncproto.WithHighPriority()) - if err != nil { - return 0, stid, err - } - return bn, stid, nil -} - -// fetchAndInsertBlocks use the pipeline pattern to boost the performance of inserting blocks. -// TODO: For resharding, use the pipeline to do fast sync (epoch loop, header loop, body loop) -func (lsi *lrSyncIter) fetchAndInsertBlocks(targetBN uint64) error { - gbm := newGetBlocksManager(lsi.bc, targetBN, lsi.logger) - lsi.gbm = gbm - - // Setup workers to fetch blocks from remote node - for i := 0; i != lsi.config.Concurrency; i++ { - worker := &getBlocksWorker{ - gbm: gbm, - protocol: lsi.p, - } - go worker.workLoop(lsi.ctx) - } - - // insert the blocks to chain. Return when the target block number is reached. - lsi.insertChainLoop(targetBN) - - select { - case <-lsi.ctx.Done(): - return lsi.ctx.Err() - default: - } - return nil -} - -func (lsi *lrSyncIter) insertChainLoop(targetBN uint64) { - var ( - gbm = lsi.gbm - t = time.NewTicker(100 * time.Millisecond) - resultC = make(chan struct{}, 1) - ) - defer t.Stop() - - trigger := func() { - select { - case resultC <- struct{}{}: - default: - } - } - - for { - select { - case <-lsi.ctx.Done(): - return - - case <-t.C: - // Redundancy, periodically check whether there is blocks that can be processed - trigger() - - case <-gbm.resultC: - // New block arrive in resultQueue - trigger() - - case <-resultC: - blockResults := gbm.PullContinuousBlocks(blocksPerInsert) - if len(blockResults) > 0 { - lsi.processBlocks(blockResults, targetBN) - // more blocks is expected being able to be pulled from queue - trigger() - } - if lsi.bc.CurrentBlock().NumberU64() >= targetBN { - return - } - } - } -} - -func (lsi *lrSyncIter) processBlocks(results []*blockResult, targetBN uint64) { - blocks := blockResultsToBlocks(results) - - for i, block := range blocks { - if err := verifyAndInsertBlock(lsi.bc, block); err != nil { - lsi.logger.Warn().Err(err).Uint64("target block", targetBN). - Uint64("block number", block.NumberU64()). - Msg("insert blocks failed in long range") - pl := lsi.d.promLabels() - pl["error"] = err.Error() - longRangeFailInsertedBlockCounterVec.With(pl).Inc() - - lsi.p.RemoveStream(results[i].stid) - lsi.gbm.HandleInsertError(results, i) - return - } - - lsi.inserted++ - longRangeSyncedBlockCounterVec.With(lsi.d.promLabels()).Inc() - } - lsi.gbm.HandleInsertResult(results) -} - -func (lsi *lrSyncIter) checkHaveEnoughStreams() error { - numStreams := lsi.p.NumStreams() - if numStreams < lsi.config.MinStreams { - return fmt.Errorf("number of streams smaller than minimum: %v < %v", - numStreams, lsi.config.MinStreams) - } - return nil -} - -// getBlocksWorker does the request job -type getBlocksWorker struct { - gbm *getBlocksManager - protocol syncProtocol -} - -func (w *getBlocksWorker) workLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - } - batch := w.gbm.GetNextBatch() - if len(batch) == 0 { - select { - case <-ctx.Done(): - return - case <-time.After(100 * time.Millisecond): - continue - } - } - - blocks, stid, err := w.doBatch(ctx, batch) - if err != nil { - if !errors.Is(err, context.Canceled) { - w.protocol.RemoveStream(stid) - } - err = errors.Wrap(err, "request error") - w.gbm.HandleRequestError(batch, err, stid) - } else { - w.gbm.HandleRequestResult(batch, blocks, stid) - } - } -} - -func (w *getBlocksWorker) doBatch(ctx context.Context, bns []uint64) ([]*types.Block, sttypes.StreamID, error) { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - blocks, stid, err := w.protocol.GetBlocksByNumber(ctx, bns) - if err != nil { - return nil, stid, err - } - if err := validateGetBlocksResult(bns, blocks); err != nil { - return nil, stid, err - } - return blocks, stid, nil -} - -// getBlocksManager is the helper structure for get blocks request management -type getBlocksManager struct { - chain blockChain - - targetBN uint64 - requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received - processing map[uint64]struct{} // block numbers received requests but not inserted - retries *prioritizedNumbers // requests where error happens - rq *resultQueue // result queue wait to be inserted into blockchain - - resultC chan struct{} - logger zerolog.Logger - lock sync.Mutex -} - -func newGetBlocksManager(chain blockChain, targetBN uint64, logger zerolog.Logger) *getBlocksManager { - return &getBlocksManager{ - chain: chain, - targetBN: targetBN, - requesting: make(map[uint64]struct{}), - processing: make(map[uint64]struct{}), - retries: newPrioritizedNumbers(), - rq: newResultQueue(), - resultC: make(chan struct{}, 1), - logger: logger, - } -} - -// GetNextBatch get the next block numbers batch -func (gbm *getBlocksManager) GetNextBatch() []uint64 { - gbm.lock.Lock() - defer gbm.lock.Unlock() - - cap := numBlocksByNumPerRequest - - bns := gbm.getBatchFromRetries(cap) - cap -= len(bns) - gbm.addBatchToRequesting(bns) - - if gbm.availableForMoreTasks() { - addBNs := gbm.getBatchFromUnprocessed(cap) - gbm.addBatchToRequesting(addBNs) - bns = append(bns, addBNs...) - } - - return bns -} - -// HandleRequestError handles the error result -func (gbm *getBlocksManager) HandleRequestError(bns []uint64, err error, stid sttypes.StreamID) { - gbm.lock.Lock() - defer gbm.lock.Unlock() - - gbm.logger.Warn().Err(err).Str("stream", string(stid)).Msg("get blocks error") - - // add requested block numbers to retries - for _, bn := range bns { - delete(gbm.requesting, bn) - gbm.retries.push(bn) - } - - // remove results from result queue by the stream and add back to retries - removed := gbm.rq.removeResultsByStreamID(stid) - for _, bn := range removed { - delete(gbm.processing, bn) - gbm.retries.push(bn) - } -} - -// HandleRequestResult handles get blocks result -func (gbm *getBlocksManager) HandleRequestResult(bns []uint64, blocks []*types.Block, stid sttypes.StreamID) { - gbm.lock.Lock() - defer gbm.lock.Unlock() - - for i, bn := range bns { - delete(gbm.requesting, bn) - if blocks[i] == nil { - gbm.retries.push(bn) - } else { - gbm.processing[bn] = struct{}{} - } - } - gbm.rq.addBlockResults(blocks, stid) - select { - case gbm.resultC <- struct{}{}: - default: - } -} - -// HandleInsertResult handle the insert result -func (gbm *getBlocksManager) HandleInsertResult(inserted []*blockResult) { - gbm.lock.Lock() - defer gbm.lock.Unlock() - - for _, block := range inserted { - delete(gbm.processing, block.getBlockNumber()) - } -} - -// HandleInsertError handles the error during InsertChain -func (gbm *getBlocksManager) HandleInsertError(results []*blockResult, n int) { - gbm.lock.Lock() - defer gbm.lock.Unlock() - - var ( - inserted []*blockResult - errResult *blockResult - abandoned []*blockResult - ) - inserted = results[:n] - errResult = results[n] - if n != len(results) { - abandoned = results[n+1:] - } - - for _, res := range inserted { - delete(gbm.processing, res.getBlockNumber()) - } - for _, res := range abandoned { - gbm.rq.addBlockResults([]*types.Block{res.block}, res.stid) - } - - delete(gbm.processing, errResult.getBlockNumber()) - gbm.retries.push(errResult.getBlockNumber()) - - removed := gbm.rq.removeResultsByStreamID(errResult.stid) - for _, bn := range removed { - delete(gbm.processing, bn) - gbm.retries.push(bn) - } -} - -// PullContinuousBlocks pull continuous blocks from request queue -func (gbm *getBlocksManager) PullContinuousBlocks(cap int) []*blockResult { - gbm.lock.Lock() - defer gbm.lock.Unlock() - - expHeight := gbm.chain.CurrentBlock().NumberU64() + 1 - results, stales := gbm.rq.popBlockResults(expHeight, cap) - // For stale blocks, we remove them from processing - for _, bn := range stales { - delete(gbm.processing, bn) - } - return results -} - -// getBatchFromRetries get the block number batch to be requested from retries. -func (gbm *getBlocksManager) getBatchFromRetries(cap int) []uint64 { - var ( - requestBNs []uint64 - curHeight = gbm.chain.CurrentBlock().NumberU64() - ) - for cnt := 0; cnt < cap; cnt++ { - bn := gbm.retries.pop() - if bn == 0 { - break // no more retries - } - if bn <= curHeight { - continue - } - requestBNs = append(requestBNs, bn) - } - return requestBNs -} - -// getBatchFromRetries get the block number batch to be requested from unprocessed. -func (gbm *getBlocksManager) getBatchFromUnprocessed(cap int) []uint64 { - var ( - requestBNs []uint64 - curHeight = gbm.chain.CurrentBlock().NumberU64() - ) - bn := curHeight + 1 - // TODO: this algorithm can be potentially optimized. - for cnt := 0; cnt < cap && bn <= gbm.targetBN; cnt++ { - for bn <= gbm.targetBN { - _, ok1 := gbm.requesting[bn] - _, ok2 := gbm.processing[bn] - if !ok1 && !ok2 { - requestBNs = append(requestBNs, bn) - bn++ - break - } - bn++ - } - } - return requestBNs -} - -func (gbm *getBlocksManager) availableForMoreTasks() bool { - return gbm.rq.results.Len() < softQueueCap -} - -func (gbm *getBlocksManager) addBatchToRequesting(bns []uint64) { - for _, bn := range bns { - gbm.requesting[bn] = struct{}{} - } -} - -func validateGetBlocksResult(requested []uint64, result []*types.Block) error { - if len(result) != len(requested) { - return fmt.Errorf("unexpected number of blocks delivered: %v / %v", len(result), len(requested)) - } - for i, block := range result { - if block != nil && block.NumberU64() != requested[i] { - return fmt.Errorf("block with unexpected number delivered: %v / %v", block.NumberU64(), requested[i]) - } - } - return nil -} - -// computeBlockNumberByMaxVote compute the target block number by max vote. -func computeBlockNumberByMaxVote(votes map[sttypes.StreamID]uint64) uint64 { - var ( - nm = make(map[uint64]int) - res uint64 - maxCnt int - ) - for _, bn := range votes { - _, ok := nm[bn] - if !ok { - nm[bn] = 0 - } - nm[bn]++ - cnt := nm[bn] - - if cnt > maxCnt || (cnt == maxCnt && bn > res) { - res = bn - maxCnt = cnt - } - } - return res -} diff --git a/hmy/downloader/longrange_test.go b/hmy/downloader/longrange_test.go deleted file mode 100644 index 105aaf064..000000000 --- a/hmy/downloader/longrange_test.go +++ /dev/null @@ -1,331 +0,0 @@ -package downloader - -import ( - "context" - "fmt" - "math/rand" - "sync" - "testing" - - sttypes "github.com/harmony-one/harmony/p2p/stream/types" - "github.com/pkg/errors" -) - -func TestDownloader_doLongRangeSync(t *testing.T) { - targetBN := uint64(1000) - bc := newTestBlockChain(1, nil) - - d := &Downloader{ - bc: bc, - syncProtocol: newTestSyncProtocol(targetBN, 32, nil), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - ctx: context.Background(), - } - synced, err := d.doLongRangeSync() - if err != nil { - t.Error(err) - } - if synced == 0 { - t.Errorf("synced false") - } - if curNum := d.bc.CurrentBlock().NumberU64(); curNum != targetBN { - t.Errorf("block number not expected: %v / %v", curNum, targetBN) - } -} - -func TestLrSyncIter_EstimateCurrentNumber(t *testing.T) { - lsi := &lrSyncIter{ - p: newTestSyncProtocol(100, 32, nil), - ctx: context.Background(), - config: Config{ - Concurrency: 16, - MinStreams: 10, - }, - } - bn, err := lsi.estimateCurrentNumber() - if err != nil { - t.Error(err) - } - if bn != 100 { - t.Errorf("unexpected block number: %v / %v", bn, 100) - } -} - -func TestGetBlocksManager_GetNextBatch(t *testing.T) { - tests := []struct { - gbm *getBlocksManager - expBNs []uint64 - }{ - { - gbm: makeGetBlocksManager( - 10, 100, []uint64{9, 11, 12, 13}, - []uint64{14, 15, 16}, []uint64{}, 0, - ), - expBNs: []uint64{17, 18, 19, 20, 21, 22, 23, 24, 25, 26}, - }, - { - gbm: makeGetBlocksManager( - 10, 100, []uint64{9, 13, 14, 15, 16}, - []uint64{}, []uint64{10, 11, 12}, 0, - ), - expBNs: []uint64{11, 12, 17, 18, 19, 20, 21, 22, 23, 24}, - }, - { - gbm: makeGetBlocksManager( - 10, 100, []uint64{9, 13, 14, 15, 16}, - []uint64{}, []uint64{10, 11, 12}, 120, - ), - expBNs: []uint64{11, 12}, - }, - { - gbm: makeGetBlocksManager( - 10, 100, []uint64{9, 13, 14, 15, 16}, - []uint64{}, []uint64{}, 120, - ), - expBNs: []uint64{}, - }, - { - gbm: makeGetBlocksManager( - 10, 20, []uint64{9, 13, 14, 15, 16}, - []uint64{}, []uint64{}, 0, - ), - expBNs: []uint64{11, 12, 17, 18, 19, 20}, - }, - { - gbm: makeGetBlocksManager( - 10, 100, []uint64{9, 13, 14, 15, 16}, - []uint64{}, []uint64{}, 0, - ), - expBNs: []uint64{11, 12, 17, 18, 19, 20, 21, 22, 23, 24}, - }, - } - - for i, test := range tests { - if i < 4 { - continue - } - batch := test.gbm.GetNextBatch() - if len(test.expBNs) != len(batch) { - t.Errorf("Test %v: unexpected size [%v] / [%v]", i, batch, test.expBNs) - } - for i := range test.expBNs { - if test.expBNs[i] != batch[i] { - t.Errorf("Test %v: [%v] / [%v]", i, batch, test.expBNs) - } - } - } -} - -func TestLrSyncIter_FetchAndInsertBlocks(t *testing.T) { - targetBN := uint64(1000) - chain := newTestBlockChain(0, nil) - protocol := newTestSyncProtocol(targetBN, 32, nil) - ctx := context.Background() - - lsi := &lrSyncIter{ - bc: chain, - d: &Downloader{bc: chain}, - p: protocol, - gbm: nil, - config: Config{ - Concurrency: 100, - }, - ctx: ctx, - } - lsi.fetchAndInsertBlocks(targetBN) - - if err := fetchAndInsertBlocksResultCheck(lsi, targetBN, initStreamNum); err != nil { - t.Error(err) - } -} - -// When FetchAndInsertBlocks, one request has an error -func TestLrSyncIter_FetchAndInsertBlocks_ErrRequest(t *testing.T) { - targetBN := uint64(1000) - var once sync.Once - errHook := func(bn uint64) error { - var err error - once.Do(func() { - err = errors.New("test error expected") - }) - return err - } - chain := newTestBlockChain(0, nil) - protocol := newTestSyncProtocol(targetBN, 32, errHook) - ctx := context.Background() - - lsi := &lrSyncIter{ - bc: chain, - d: &Downloader{bc: chain}, - p: protocol, - gbm: nil, - config: Config{ - Concurrency: 100, - }, - ctx: ctx, - } - lsi.fetchAndInsertBlocks(targetBN) - - if err := fetchAndInsertBlocksResultCheck(lsi, targetBN, initStreamNum-1); err != nil { - t.Error(err) - } -} - -// When FetchAndInsertBlocks, one insertion has an error -func TestLrSyncIter_FetchAndInsertBlocks_ErrInsert(t *testing.T) { - targetBN := uint64(1000) - var once sync.Once - errHook := func(bn uint64) error { - var err error - once.Do(func() { - err = errors.New("test error expected") - }) - return err - } - chain := newTestBlockChain(0, errHook) - protocol := newTestSyncProtocol(targetBN, 32, nil) - ctx := context.Background() - - lsi := &lrSyncIter{ - bc: chain, - d: &Downloader{bc: chain}, - p: protocol, - gbm: nil, - config: Config{ - Concurrency: 100, - }, - ctx: ctx, - } - lsi.fetchAndInsertBlocks(targetBN) - - if err := fetchAndInsertBlocksResultCheck(lsi, targetBN, initStreamNum-1); err != nil { - t.Error(err) - } -} - -// When FetchAndInsertBlocks, randomly error happens -func TestLrSyncIter_FetchAndInsertBlocks_RandomErr(t *testing.T) { - targetBN := uint64(10000) - rand.Seed(0) - errHook := func(bn uint64) error { - // 10% error happens - if rand.Intn(10)%10 == 0 { - return errors.New("error expected") - } - return nil - } - chain := newTestBlockChain(0, errHook) - protocol := newTestSyncProtocol(targetBN, 32, errHook) - ctx := context.Background() - - lsi := &lrSyncIter{ - bc: chain, - d: &Downloader{bc: chain}, - p: protocol, - gbm: nil, - config: Config{ - Concurrency: 100, - }, - ctx: ctx, - } - lsi.fetchAndInsertBlocks(targetBN) - - if err := fetchAndInsertBlocksResultCheck(lsi, targetBN, minStreamNum); err != nil { - t.Error(err) - } -} - -func fetchAndInsertBlocksResultCheck(lsi *lrSyncIter, targetBN uint64, expNumStreams int) error { - if bn := lsi.bc.CurrentBlock().NumberU64(); bn != targetBN { - return fmt.Errorf("did not reached targetBN: %v / %v", bn, targetBN) - } - lsi.gbm.lock.Lock() - defer lsi.gbm.lock.Unlock() - if len(lsi.gbm.processing) != 0 { - return fmt.Errorf("not empty processing: %v", lsi.gbm.processing) - } - if len(lsi.gbm.requesting) != 0 { - return fmt.Errorf("not empty requesting: %v", lsi.gbm.requesting) - } - if lsi.gbm.retries.length() != 0 { - return fmt.Errorf("not empty retries: %v", lsi.gbm.retries) - } - if lsi.gbm.rq.length() != 0 { - return fmt.Errorf("not empty result queue: %v", lsi.gbm.rq.results) - } - tsp := lsi.p.(*testSyncProtocol) - if len(tsp.streamIDs) != expNumStreams { - return fmt.Errorf("num streams not expected: %v / %v", len(tsp.streamIDs), expNumStreams) - } - return nil -} - -func TestComputeBNMaxVote(t *testing.T) { - tests := []struct { - votes map[sttypes.StreamID]uint64 - exp uint64 - }{ - { - votes: map[sttypes.StreamID]uint64{ - makeStreamID(0): 10, - makeStreamID(1): 10, - makeStreamID(2): 20, - }, - exp: 10, - }, - { - votes: map[sttypes.StreamID]uint64{ - makeStreamID(0): 10, - makeStreamID(1): 20, - }, - exp: 20, - }, - { - votes: map[sttypes.StreamID]uint64{ - makeStreamID(0): 20, - makeStreamID(1): 10, - makeStreamID(2): 20, - }, - exp: 20, - }, - } - - for i, test := range tests { - res := computeBlockNumberByMaxVote(test.votes) - if res != test.exp { - t.Errorf("Test %v: unexpected bn %v / %v", i, res, test.exp) - } - } -} - -func makeGetBlocksManager(curBN, targetBN uint64, requesting, processing, retries []uint64, sizeRQ int) *getBlocksManager { - chain := newTestBlockChain(curBN, nil) - requestingM := make(map[uint64]struct{}) - for _, bn := range requesting { - requestingM[bn] = struct{}{} - } - processingM := make(map[uint64]struct{}) - for _, bn := range processing { - processingM[bn] = struct{}{} - } - retriesPN := newPrioritizedNumbers() - for _, retry := range retries { - retriesPN.push(retry) - } - rq := newResultQueue() - for i := uint64(0); i != uint64(sizeRQ); i++ { - rq.addBlockResults(makeTestBlocks([]uint64{i + curBN}), "") - } - return &getBlocksManager{ - chain: chain, - targetBN: targetBN, - requesting: requestingM, - processing: processingM, - retries: retriesPN, - rq: rq, - resultC: make(chan struct{}, 1), - } -} diff --git a/hmy/downloader/metric.go b/hmy/downloader/metric.go deleted file mode 100644 index 2995db7b3..000000000 --- a/hmy/downloader/metric.go +++ /dev/null @@ -1,98 +0,0 @@ -package downloader - -import ( - "fmt" - - prom "github.com/harmony-one/harmony/api/service/prometheus" - "github.com/prometheus/client_golang/prometheus" -) - -func init() { - prom.PromRegistry().MustRegister( - consensusTriggeredDownloadCounterVec, - longRangeSyncedBlockCounterVec, - longRangeFailInsertedBlockCounterVec, - numShortRangeCounterVec, - numFailedDownloadCounterVec, - numBlocksInsertedShortRangeHistogramVec, - numBlocksInsertedBeaconHelperCounter, - ) -} - -var ( - consensusTriggeredDownloadCounterVec = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "hmy", - Subsystem: "downloader", - Name: "consensus_trigger", - Help: "number of times consensus triggered download task", - }, - []string{"ShardID"}, - ) - - longRangeSyncedBlockCounterVec = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "hmy", - Subsystem: "downloader", - Name: "num_blocks_synced_long_range", - Help: "number of blocks synced in long range sync", - }, - []string{"ShardID"}, - ) - - longRangeFailInsertedBlockCounterVec = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "hmy", - Subsystem: "downloader", - Name: "num_blocks_failed_long_range", - Help: "number of blocks failed to insert into change in long range sync", - }, - []string{"ShardID", "error"}, - ) - - numShortRangeCounterVec = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "hmy", - Subsystem: "downloader", - Name: "num_short_range", - Help: "number of short range sync is triggered", - }, - []string{"ShardID"}, - ) - - numFailedDownloadCounterVec = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "hmy", - Subsystem: "downloader", - Name: "failed_download", - Help: "number of downloading is failed", - }, - []string{"ShardID", "error"}, - ) - - numBlocksInsertedShortRangeHistogramVec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "hmy", - Subsystem: "downloader", - Name: "num_blocks_inserted_short_range", - Help: "number of blocks inserted for each short range sync", - // Buckets: 0, 1, 2, 4, +INF (capped at 10) - Buckets: prometheus.ExponentialBuckets(0.5, 2, 5), - }, - []string{"ShardID"}, - ) - - numBlocksInsertedBeaconHelperCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: "hmy", - Subsystem: "downloader", - Name: "num_blocks_inserted_beacon_helper", - Help: "number of blocks inserted from beacon helper", - }, - ) -) - -func (d *Downloader) promLabels() prometheus.Labels { - sid := d.bc.ShardID() - return prometheus.Labels{"ShardID": fmt.Sprintf("%d", sid)} -} diff --git a/hmy/downloader/shortrange.go b/hmy/downloader/shortrange.go deleted file mode 100644 index 8276911d4..000000000 --- a/hmy/downloader/shortrange.go +++ /dev/null @@ -1,593 +0,0 @@ -package downloader - -import ( - "context" - "fmt" - "math" - "sync" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/core/types" - syncProto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" - sttypes "github.com/harmony-one/harmony/p2p/stream/types" - "github.com/harmony-one/harmony/shard" - "github.com/pkg/errors" - "github.com/rs/zerolog" -) - -// doShortRangeSync does the short range sync. -// Compared with long range sync, short range sync is more focused on syncing to the latest block. -// It consist of 3 steps: -// 1. Obtain the block hashes and compute the longest hash chain.. -// 2. Get blocks by hashes from computed hash chain. -// 3. Insert the blocks to blockchain. -func (d *Downloader) doShortRangeSync() (int, error) { - if _, ok := d.bc.(*core.EpochChain); ok { - return d.doShortRangeSyncForEpochSync() - } - numShortRangeCounterVec.With(d.promLabels()).Inc() - - srCtx, cancel := context.WithTimeout(d.ctx, shortRangeTimeout) - defer cancel() - - sh := &srHelper{ - syncProtocol: d.syncProtocol, - ctx: srCtx, - config: d.config, - logger: d.logger.With().Str("mode", "short range").Logger(), - } - - if err := sh.checkPrerequisites(); err != nil { - return 0, errors.Wrap(err, "prerequisite") - } - curBN := d.bc.CurrentBlock().NumberU64() - hashChain, whitelist, err := sh.getHashChain(sh.prepareBlockHashNumbers(curBN)) - if err != nil { - return 0, errors.Wrap(err, "getHashChain") - } - if len(hashChain) == 0 { - // short circuit for no sync is needed - return 0, nil - } - - expEndBN := curBN + uint64(len(hashChain)) - d.logger.Info().Uint64("current number", curBN). - Uint64("target number", expEndBN). - Interface("hashChain", hashChain). - Msg("short range start syncing") - d.startSyncing() - d.status.setTargetBN(expEndBN) - defer func() { - d.logger.Info().Msg("short range finished syncing") - d.finishSyncing() - }() - - blocks, stids, err := sh.getBlocksByHashes(hashChain, whitelist) - if err != nil { - d.logger.Warn().Err(err).Msg("getBlocksByHashes failed") - if !errors.Is(err, context.Canceled) { - sh.removeStreams(whitelist) // Remote nodes cannot provide blocks with target hashes - } - return 0, errors.Wrap(err, "getBlocksByHashes") - } - d.logger.Info().Int("num blocks", len(blocks)).Msg("getBlockByHashes result") - - n, err := verifyAndInsertBlocks(d.bc, blocks) - numBlocksInsertedShortRangeHistogramVec.With(d.promLabels()).Observe(float64(n)) - if err != nil { - d.logger.Warn().Err(err).Int("blocks inserted", n).Msg("Insert block failed") - if sh.blameAllStreams(blocks, n, err) { - sh.removeStreams(whitelist) // Data provided by remote nodes is corrupted - } else { - // It is the last block gives a wrong commit sig. Blame the provider of the last block. - st2Blame := stids[len(stids)-1] - sh.removeStreams([]sttypes.StreamID{st2Blame}) - } - return n, err - } - d.logger.Info().Err(err).Int("blocks inserted", n).Msg("Insert block success") - - return len(blocks), nil -} - -func (d *Downloader) doShortRangeSyncForEpochSync() (int, error) { - numShortRangeCounterVec.With(d.promLabels()).Inc() - - srCtx, cancel := context.WithTimeout(d.ctx, shortRangeTimeout) - defer cancel() - - sh := &srHelper{ - syncProtocol: d.syncProtocol, - ctx: srCtx, - config: d.config, - logger: d.logger.With().Str("mode", "short range").Logger(), - } - - if err := sh.checkPrerequisites(); err != nil { - return 0, errors.Wrap(err, "prerequisite") - } - curBN := d.bc.CurrentBlock().NumberU64() - bns := make([]uint64, 0, numBlocksByNumPerRequest) - loopEpoch := d.bc.CurrentHeader().Epoch().Uint64() //+ 1 - for len(bns) < numBlocksByNumPerRequest { - blockNum := shard.Schedule.EpochLastBlock(loopEpoch) - if blockNum > curBN { - bns = append(bns, blockNum) - } - loopEpoch = loopEpoch + 1 - } - - if len(bns) == 0 { - return 0, nil - } - - blocks, streamID, err := sh.getBlocksChain(bns) - if err != nil { - return 0, errors.Wrap(err, "getHashChain") - } - if len(blocks) == 0 { - // short circuit for no sync is needed - return 0, nil - } - n, err := d.bc.InsertChain(blocks, true) - numBlocksInsertedShortRangeHistogramVec.With(d.promLabels()).Observe(float64(n)) - if err != nil { - sh.removeStreams([]sttypes.StreamID{streamID}) // Data provided by remote nodes is corrupted - return n, err - } - d.logger.Info().Err(err).Int("blocks inserted", n).Msg("Insert block success") - - return len(blocks), nil -} - -type srHelper struct { - syncProtocol syncProtocol - - ctx context.Context - config Config - logger zerolog.Logger -} - -func (sh *srHelper) getHashChain(bns []uint64) ([]common.Hash, []sttypes.StreamID, error) { - results := newBlockHashResults(bns) - - var wg sync.WaitGroup - wg.Add(sh.config.Concurrency) - - for i := 0; i != sh.config.Concurrency; i++ { - go func(index int) { - defer wg.Done() - - hashes, stid, err := sh.doGetBlockHashesRequest(bns) - if err != nil { - sh.logger.Warn().Err(err).Str("StreamID", string(stid)). - Msg("doGetBlockHashes return error") - return - } - sh.logger.Info(). - Str("StreamID", string(stid)). - Int("hashes", len(hashes)). - Interface("hashes", hashes).Int("index", index). - Msg("GetBlockHashesRequests response") - results.addResult(hashes, stid) - }(i) - } - wg.Wait() - - select { - case <-sh.ctx.Done(): - sh.logger.Info().Err(sh.ctx.Err()).Int("num blocks", results.numBlocksWithResults()). - Msg("short range sync get hashes timed out") - return nil, nil, sh.ctx.Err() - default: - } - - sh.logger.Info().Msg("compute longest hash chain") - hashChain, wl := results.computeLongestHashChain() - sh.logger.Info().Int("hashChain size", len(hashChain)).Int("whitelist", len(wl)). - Msg("computeLongestHashChain result") - return hashChain, wl, nil -} - -func (sh *srHelper) getBlocksChain(bns []uint64) ([]*types.Block, sttypes.StreamID, error) { - return sh.doGetBlocksByNumbersRequest(bns) -} - -func (sh *srHelper) getBlocksByHashes(hashes []common.Hash, whitelist []sttypes.StreamID) ([]*types.Block, []sttypes.StreamID, error) { - ctx, cancel := context.WithCancel(sh.ctx) - defer cancel() - m := newGetBlocksByHashManager(hashes, whitelist) - - var ( - wg sync.WaitGroup - gErr error - errLock sync.Mutex - ) - - concurrency := sh.config.Concurrency - if concurrency > m.numRequests() { - concurrency = m.numRequests() - } - - wg.Add(concurrency) - for i := 0; i != concurrency; i++ { - go func(index int) { - defer wg.Done() - defer cancel() // it's ok to cancel context more than once - - for { - if m.isDone() { - return - } - hashes, wl, err := m.getNextHashes() - if err != nil { - errLock.Lock() - gErr = err - errLock.Unlock() - return - } - if len(hashes) == 0 { - select { - case <-time.After(200 * time.Millisecond): - continue - case <-ctx.Done(): - return - } - } - blocks, stid, err := sh.doGetBlocksByHashesRequest(ctx, hashes, wl) - if err != nil { - sh.logger.Err(err).Str("StreamID", string(stid)).Msg("getBlocksByHashes worker failed") - m.handleResultError(hashes, stid) - } else { - sh.logger.Info().Str("StreamID", string(stid)).Int("blocks", len(blocks)). - Int("index", index).Msg("doGetBlocksByHashesRequest response") - m.addResult(hashes, blocks, stid) - } - } - }(i) - } - wg.Wait() - - if gErr != nil { - return nil, nil, gErr - } - select { - case <-sh.ctx.Done(): - res, _, _ := m.getResults() - sh.logger.Info().Err(sh.ctx.Err()).Int("num blocks", len(res)). - Msg("short range sync get blocks timed out") - return nil, nil, sh.ctx.Err() - default: - } - - return m.getResults() -} - -func (sh *srHelper) checkPrerequisites() error { - if sh.syncProtocol.NumStreams() < sh.config.Concurrency { - return errors.New("not enough streams") - } - return nil -} - -func (sh *srHelper) prepareBlockHashNumbers(curNumber uint64) []uint64 { - res := make([]uint64, 0, numBlockHashesPerRequest) - - for bn := curNumber + 1; bn <= curNumber+uint64(numBlockHashesPerRequest); bn++ { - res = append(res, bn) - } - return res -} - -func (sh *srHelper) doGetBlockHashesRequest(bns []uint64) ([]common.Hash, sttypes.StreamID, error) { - ctx, cancel := context.WithTimeout(sh.ctx, 1*time.Second) - defer cancel() - - hashes, stid, err := sh.syncProtocol.GetBlockHashes(ctx, bns) - if err != nil { - sh.logger.Warn().Err(err).Str("stream", string(stid)).Msg("failed to doGetBlockHashesRequest") - return nil, stid, err - } - if len(hashes) != len(bns) { - err := errors.New("unexpected get block hashes result delivered") - sh.logger.Warn().Err(err).Str("stream", string(stid)).Msg("failed to doGetBlockHashesRequest") - sh.syncProtocol.RemoveStream(stid) - return nil, stid, err - } - return hashes, stid, nil -} - -func (sh *srHelper) doGetBlocksByNumbersRequest(bns []uint64) ([]*types.Block, sttypes.StreamID, error) { - ctx, cancel := context.WithTimeout(sh.ctx, 2*time.Second) - defer cancel() - - blocks, stid, err := sh.syncProtocol.GetBlocksByNumber(ctx, bns) - if err != nil { - sh.logger.Warn().Err(err).Str("stream", string(stid)).Msg("failed to doGetBlockHashesRequest") - return nil, stid, err - } - return blocks, stid, nil -} - -func (sh *srHelper) doGetBlocksByHashesRequest(ctx context.Context, hashes []common.Hash, wl []sttypes.StreamID) ([]*types.Block, sttypes.StreamID, error) { - ctx, cancel := context.WithTimeout(sh.ctx, 10*time.Second) - defer cancel() - - blocks, stid, err := sh.syncProtocol.GetBlocksByHashes(ctx, hashes, - syncProto.WithWhitelist(wl)) - if err != nil { - sh.logger.Warn().Err(err).Str("stream", string(stid)).Msg("failed to getBlockByHashes") - return nil, stid, err - } - if err := checkGetBlockByHashesResult(blocks, hashes); err != nil { - sh.logger.Warn().Err(err).Str("stream", string(stid)).Msg("failed to getBlockByHashes") - sh.syncProtocol.RemoveStream(stid) - return nil, stid, err - } - return blocks, stid, nil -} - -func (sh *srHelper) removeStreams(sts []sttypes.StreamID) { - for _, st := range sts { - sh.syncProtocol.RemoveStream(st) - } -} - -// Only not to blame all whitelisted streams when the it's not the last block signature verification failed. -func (sh *srHelper) blameAllStreams(blocks types.Blocks, errIndex int, err error) bool { - if errors.As(err, &emptySigVerifyErr) && errIndex == len(blocks)-1 { - return false - } - return true -} - -func checkGetBlockByHashesResult(blocks []*types.Block, hashes []common.Hash) error { - if len(blocks) != len(hashes) { - return errors.New("unexpected number of getBlocksByHashes result") - } - for i, block := range blocks { - if block == nil { - return errors.New("nil block found") - } - if block.Hash() != hashes[i] { - return fmt.Errorf("unexpected block hash: %x / %x", block.Hash(), hashes[i]) - } - } - return nil -} - -type ( - blockHashResults struct { - bns []uint64 - results []map[sttypes.StreamID]common.Hash - - lock sync.Mutex - } -) - -func newBlockHashResults(bns []uint64) *blockHashResults { - results := make([]map[sttypes.StreamID]common.Hash, 0, len(bns)) - for range bns { - results = append(results, make(map[sttypes.StreamID]common.Hash)) - } - return &blockHashResults{ - bns: bns, - results: results, - } -} - -func (res *blockHashResults) addResult(hashes []common.Hash, stid sttypes.StreamID) { - res.lock.Lock() - defer res.lock.Unlock() - - for i, h := range hashes { - if h == emptyHash { - return // nil block hash reached - } - res.results[i][stid] = h - } - return -} - -func (res *blockHashResults) computeLongestHashChain() ([]common.Hash, []sttypes.StreamID) { - var ( - whitelist map[sttypes.StreamID]struct{} - hashChain []common.Hash - ) - for _, result := range res.results { - hash, nextWl := countHashMaxVote(result, whitelist) - if hash == emptyHash { - break - } - hashChain = append(hashChain, hash) - whitelist = nextWl - } - - sts := make([]sttypes.StreamID, 0, len(whitelist)) - for st := range whitelist { - sts = append(sts, st) - } - return hashChain, sts -} - -func (res *blockHashResults) numBlocksWithResults() int { - res.lock.Lock() - defer res.lock.Unlock() - - cnt := 0 - for _, result := range res.results { - if len(result) != 0 { - cnt++ - } - } - return cnt -} - -func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes.StreamID]struct{}) (common.Hash, map[sttypes.StreamID]struct{}) { - var ( - voteM = make(map[common.Hash]int) - res common.Hash - maxCnt = 0 - ) - - for st, h := range m { - if len(whitelist) != 0 { - if _, ok := whitelist[st]; !ok { - continue - } - } - if _, ok := voteM[h]; !ok { - voteM[h] = 0 - } - voteM[h]++ - if voteM[h] > maxCnt { - maxCnt = voteM[h] - res = h - } - } - - nextWl := make(map[sttypes.StreamID]struct{}) - for st, h := range m { - if h != res { - continue - } - if len(whitelist) != 0 { - if _, ok := whitelist[st]; ok { - nextWl[st] = struct{}{} - } - } else { - nextWl[st] = struct{}{} - } - } - return res, nextWl -} - -type getBlocksByHashManager struct { - hashes []common.Hash - pendings map[common.Hash]struct{} - results map[common.Hash]blockResult - whitelist []sttypes.StreamID - - lock sync.Mutex -} - -func newGetBlocksByHashManager(hashes []common.Hash, whitelist []sttypes.StreamID) *getBlocksByHashManager { - return &getBlocksByHashManager{ - hashes: hashes, - pendings: make(map[common.Hash]struct{}), - results: make(map[common.Hash]blockResult), - whitelist: whitelist, - } -} - -func (m *getBlocksByHashManager) getNextHashes() ([]common.Hash, []sttypes.StreamID, error) { - m.lock.Lock() - defer m.lock.Unlock() - - num := m.numBlocksPerRequest() - hashes := make([]common.Hash, 0, num) - if len(m.whitelist) == 0 { - return nil, nil, errors.New("empty white list") - } - - for _, hash := range m.hashes { - if len(hashes) == num { - break - } - _, ok1 := m.pendings[hash] - _, ok2 := m.results[hash] - if !ok1 && !ok2 { - hashes = append(hashes, hash) - } - } - sts := make([]sttypes.StreamID, len(m.whitelist)) - copy(sts, m.whitelist) - return hashes, sts, nil -} - -func (m *getBlocksByHashManager) numBlocksPerRequest() int { - val := divideCeil(len(m.hashes), len(m.whitelist)) - if val < numBlocksByHashesLowerCap { - val = numBlocksByHashesLowerCap - } - if val > numBlocksByHashesUpperCap { - val = numBlocksByHashesUpperCap - } - return val -} - -func (m *getBlocksByHashManager) numRequests() int { - return divideCeil(len(m.hashes), m.numBlocksPerRequest()) -} - -func (m *getBlocksByHashManager) addResult(hashes []common.Hash, blocks []*types.Block, stid sttypes.StreamID) { - m.lock.Lock() - defer m.lock.Unlock() - - for i, hash := range hashes { - block := blocks[i] - delete(m.pendings, hash) - m.results[hash] = blockResult{ - block: block, - stid: stid, - } - } -} - -func (m *getBlocksByHashManager) handleResultError(hashes []common.Hash, stid sttypes.StreamID) { - m.lock.Lock() - defer m.lock.Unlock() - - m.removeStreamID(stid) - - for _, hash := range hashes { - delete(m.pendings, hash) - } -} - -func (m *getBlocksByHashManager) getResults() ([]*types.Block, []sttypes.StreamID, error) { - m.lock.Lock() - defer m.lock.Unlock() - - blocks := make([]*types.Block, 0, len(m.hashes)) - stids := make([]sttypes.StreamID, 0, len(m.hashes)) - for _, hash := range m.hashes { - if m.results[hash].block == nil { - return nil, nil, errors.New("SANITY: nil block found") - } - blocks = append(blocks, m.results[hash].block) - stids = append(stids, m.results[hash].stid) - } - return blocks, stids, nil -} - -func (m *getBlocksByHashManager) isDone() bool { - m.lock.Lock() - defer m.lock.Unlock() - - return len(m.results) == len(m.hashes) -} - -func (m *getBlocksByHashManager) removeStreamID(target sttypes.StreamID) { - // O(n^2) complexity. But considering the whitelist size is small, should not - // have performance issue. -loop: - for i, stid := range m.whitelist { - if stid == target { - if i == len(m.whitelist) { - m.whitelist = m.whitelist[:i] - } else { - m.whitelist = append(m.whitelist[:i], m.whitelist[i+1:]...) - } - goto loop - } - } - return -} - -func divideCeil(x, y int) int { - fVal := float64(x) / float64(y) - return int(math.Ceil(fVal)) -} diff --git a/hmy/downloader/shortrange_test.go b/hmy/downloader/shortrange_test.go deleted file mode 100644 index 8f23f2507..000000000 --- a/hmy/downloader/shortrange_test.go +++ /dev/null @@ -1,437 +0,0 @@ -package downloader - -import ( - "context" - "errors" - "fmt" - "sync" - "testing" - - "github.com/ethereum/go-ethereum/common" - sttypes "github.com/harmony-one/harmony/p2p/stream/types" - "github.com/rs/zerolog" -) - -func TestDownloader_doShortRangeSync(t *testing.T) { - chain := newTestBlockChain(100, nil) - - d := &Downloader{ - bc: chain, - syncProtocol: newTestSyncProtocol(105, 32, nil), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - ctx: context.Background(), - logger: zerolog.Logger{}, - } - n, err := d.doShortRangeSync() - if err != nil { - t.Error(err) - } - if n == 0 { - t.Error("not synced") - } - if curNum := d.bc.CurrentBlock().NumberU64(); curNum != 105 { - t.Errorf("unexpected block number after sync: %v / %v", curNum, 105) - } -} - -func TestSrHelper_getHashChain(t *testing.T) { - tests := []struct { - curBN uint64 - syncProtocol syncProtocol - config Config - - expHashChainSize int - expStSize int - }{ - { - curBN: 100, - syncProtocol: newTestSyncProtocol(1000, 32, nil), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expHashChainSize: numBlockHashesPerRequest, - expStSize: 16, // Concurrency - }, - { - curBN: 100, - syncProtocol: newTestSyncProtocol(100, 32, nil), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expHashChainSize: 0, - expStSize: 0, - }, - { - curBN: 100, - syncProtocol: newTestSyncProtocol(110, 32, nil), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expHashChainSize: 10, - expStSize: 16, - }, - { - // stream size is smaller than concurrency - curBN: 100, - syncProtocol: newTestSyncProtocol(1000, 10, nil), - config: Config{ - Concurrency: 16, - MinStreams: 8, - }, - expHashChainSize: numBlockHashesPerRequest, - expStSize: 10, - }, - { - // one stream reports an error, else are fine - curBN: 100, - syncProtocol: newTestSyncProtocol(1000, 32, makeOnceErrorFunc()), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expHashChainSize: numBlockHashesPerRequest, - expStSize: 15, // Concurrency - }, - { - // error happens at one block number, all stream removed - curBN: 100, - syncProtocol: newTestSyncProtocol(1000, 32, func(bn uint64) error { - if bn == 110 { - return errors.New("test error") - } - return nil - }), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expHashChainSize: 0, - expStSize: 0, - }, - { - curBN: 100, - syncProtocol: newTestSyncProtocol(1000, 32, nil), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expHashChainSize: numBlockHashesPerRequest, - expStSize: 16, // Concurrency - }, - } - - for i, test := range tests { - sh := &srHelper{ - syncProtocol: test.syncProtocol, - ctx: context.Background(), - config: test.config, - } - hashChain, wl, err := sh.getHashChain(sh.prepareBlockHashNumbers(test.curBN)) - if err != nil { - t.Error(err) - } - if len(hashChain) != test.expHashChainSize { - t.Errorf("Test %v: hash chain size unexpected: %v / %v", i, len(hashChain), test.expHashChainSize) - } - if len(wl) != test.expStSize { - t.Errorf("Test %v: whitelist size unexpected: %v / %v", i, len(wl), test.expStSize) - } - } -} - -func TestSrHelper_GetBlocksByHashes(t *testing.T) { - tests := []struct { - hashes []common.Hash - syncProtocol syncProtocol - config Config - - expBlockNumbers []uint64 - expErr error - }{ - { - hashes: testNumberToHashes([]uint64{101, 102, 103, 104, 105, 106, 107, 108, 109, 110}), - syncProtocol: newTestSyncProtocol(1000, 32, nil), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expBlockNumbers: []uint64{101, 102, 103, 104, 105, 106, 107, 108, 109, 110}, - expErr: nil, - }, - { - // remote node cannot give the block with the given hash - hashes: testNumberToHashes([]uint64{101, 102, 103, 104, 105, 106, 107, 108, 109, 110}), - syncProtocol: newTestSyncProtocol(100, 32, nil), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expBlockNumbers: []uint64{}, - expErr: errors.New("all streams are bad"), - }, - { - // one request return an error, else are fine - hashes: testNumberToHashes([]uint64{101, 102, 103, 104, 105, 106, 107, 108, 109, 110}), - syncProtocol: newTestSyncProtocol(1000, 32, makeOnceErrorFunc()), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expBlockNumbers: []uint64{101, 102, 103, 104, 105, 106, 107, 108, 109, 110}, - expErr: nil, - }, - { - // All nodes encounter an error - hashes: testNumberToHashes([]uint64{101, 102, 103, 104, 105, 106, 107, 108, 109, 110}), - syncProtocol: newTestSyncProtocol(1000, 32, func(n uint64) error { - if n == 109 { - return errors.New("test error") - } - return nil - }), - config: Config{ - Concurrency: 16, - MinStreams: 16, - }, - expErr: errors.New("error expected"), - }, - } - for i, test := range tests { - sh := &srHelper{ - syncProtocol: test.syncProtocol, - ctx: context.Background(), - config: test.config, - } - blocks, _, err := sh.getBlocksByHashes(test.hashes, makeStreamIDs(5)) - if (err == nil) != (test.expErr == nil) { - t.Errorf("Test %v: unexpected error %v / %v", i, err, test.expErr) - } - if len(blocks) != len(test.expBlockNumbers) { - t.Errorf("Test %v: unepxected block number size: %v / %v", i, len(blocks), len(test.expBlockNumbers)) - } - for i, block := range blocks { - gotNum := testHashToNumber(block.Hash()) - if gotNum != test.expBlockNumbers[i] { - t.Errorf("Test %v: unexpected block number", i) - } - } - } -} - -func TestBlockHashResult_ComputeLongestHashChain(t *testing.T) { - tests := []struct { - bns []uint64 - results map[sttypes.StreamID][]int64 - expChain []uint64 - expWhitelist map[sttypes.StreamID]struct{} - expErr error - }{ - { - bns: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - results: map[sttypes.StreamID][]int64{ - makeStreamID(0): {1, 2, 3, 4, 5, 6, 7}, - makeStreamID(1): {1, 2, 3, 4, 5, 6, 7}, - makeStreamID(2): {1, 2, 3, 4, 5}, // left behind - }, - expChain: []uint64{1, 2, 3, 4, 5, 6, 7}, - expWhitelist: map[sttypes.StreamID]struct{}{ - makeStreamID(0): {}, - makeStreamID(1): {}, - }, - }, - { - // minority fork - bns: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - results: map[sttypes.StreamID][]int64{ - makeStreamID(0): {1, 2, 3, 4, 5, 6, 7}, - makeStreamID(1): {1, 2, 3, 4, 5, 6, 7}, - makeStreamID(2): {1, 2, 3, 4, 5, 7, 8, 9}, - }, - expChain: []uint64{1, 2, 3, 4, 5, 6, 7}, - expWhitelist: map[sttypes.StreamID]struct{}{ - makeStreamID(0): {}, - makeStreamID(1): {}, - }, - }, { - // nil block - bns: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - results: map[sttypes.StreamID][]int64{ - makeStreamID(0): {}, - makeStreamID(1): {}, - makeStreamID(2): {}, - }, - expChain: nil, - expWhitelist: nil, - }, { - // not continuous block - bns: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - results: map[sttypes.StreamID][]int64{ - makeStreamID(0): {1, 2, 3, 4, 5, 6, 7, -1, 9}, - makeStreamID(1): {1, 2, 3, 4, 5, 6, 7}, - makeStreamID(2): {1, 2, 3, 4, 5, 7, 8, 9}, - }, - expChain: []uint64{1, 2, 3, 4, 5, 6, 7}, - expWhitelist: map[sttypes.StreamID]struct{}{ - makeStreamID(0): {}, - makeStreamID(1): {}, - }, - }, - { - // not continuous block - bns: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - results: map[sttypes.StreamID][]int64{}, - expErr: errors.New("zero result"), - }, - } - - for i, test := range tests { - res := newBlockHashResults(test.bns) - for st, hs := range test.results { - res.addResult(makeTestBlockHashes(hs), st) - } - - chain, wl := res.computeLongestHashChain() - - if err := checkHashChainResult(chain, test.expChain); err != nil { - t.Errorf("Test %v: %v", i, err) - } - if err := checkStreamSetEqual(streamIDListToMap(wl), test.expWhitelist); err != nil { - t.Errorf("Test %v: %v", i, err) - } - } -} - -func checkHashChainResult(gots []common.Hash, exps []uint64) error { - if len(gots) != len(exps) { - return errors.New("unexpected size") - } - for i, got := range gots { - exp := exps[i] - if got != makeTestBlockHash(exp) { - return errors.New("unexpected block hash") - } - } - return nil -} - -func TestHashMaxVote(t *testing.T) { - tests := []struct { - m map[sttypes.StreamID]common.Hash - whitelist map[sttypes.StreamID]struct{} - expRes common.Hash - expWhitelist map[sttypes.StreamID]struct{} - }{ - { - m: map[sttypes.StreamID]common.Hash{ - makeStreamID(0): makeTestBlockHash(0), - makeStreamID(1): makeTestBlockHash(1), - makeStreamID(2): makeTestBlockHash(1), - }, - whitelist: map[sttypes.StreamID]struct{}{ - makeStreamID(0): {}, - makeStreamID(1): {}, - makeStreamID(2): {}, - }, - expRes: makeTestBlockHash(1), - expWhitelist: map[sttypes.StreamID]struct{}{ - makeStreamID(1): {}, - makeStreamID(2): {}, - }, - }, { - m: map[sttypes.StreamID]common.Hash{ - makeStreamID(0): makeTestBlockHash(0), - makeStreamID(1): makeTestBlockHash(1), - makeStreamID(2): makeTestBlockHash(1), - }, - whitelist: nil, - expRes: makeTestBlockHash(1), - expWhitelist: map[sttypes.StreamID]struct{}{ - makeStreamID(1): {}, - makeStreamID(2): {}, - }, - }, { - m: map[sttypes.StreamID]common.Hash{ - makeStreamID(0): makeTestBlockHash(0), - makeStreamID(1): makeTestBlockHash(1), - makeStreamID(2): makeTestBlockHash(1), - makeStreamID(3): makeTestBlockHash(0), - makeStreamID(4): makeTestBlockHash(0), - }, - whitelist: map[sttypes.StreamID]struct{}{ - makeStreamID(0): {}, - makeStreamID(1): {}, - makeStreamID(2): {}, - }, - expRes: makeTestBlockHash(1), - expWhitelist: map[sttypes.StreamID]struct{}{ - makeStreamID(1): {}, - makeStreamID(2): {}, - }, - }, - } - - for i, test := range tests { - h, wl := countHashMaxVote(test.m, test.whitelist) - - if h != test.expRes { - t.Errorf("Test %v: unexpected hash: %x / %x", i, h, test.expRes) - } - if err := checkStreamSetEqual(wl, test.expWhitelist); err != nil { - t.Errorf("Test %v: %v", i, err) - } - } -} - -func checkStreamSetEqual(m1, m2 map[sttypes.StreamID]struct{}) error { - if len(m1) != len(m2) { - return fmt.Errorf("unexpected size: %v / %v", len(m1), len(m2)) - } - for st := range m1 { - if _, ok := m2[st]; !ok { - return errors.New("not equal") - } - } - return nil -} - -func makeTestBlockHashes(bns []int64) []common.Hash { - hs := make([]common.Hash, 0, len(bns)) - for _, bn := range bns { - if bn < 0 { - hs = append(hs, emptyHash) - } else { - hs = append(hs, makeTestBlockHash(uint64(bn))) - } - } - return hs -} - -func streamIDListToMap(sts []sttypes.StreamID) map[sttypes.StreamID]struct{} { - res := make(map[sttypes.StreamID]struct{}) - - for _, st := range sts { - res[st] = struct{}{} - } - return res -} - -func makeTestBlockHash(bn uint64) common.Hash { - return makeTestBlock(bn).Hash() -} - -func makeOnceErrorFunc() func(num uint64) error { - var once sync.Once - return func(num uint64) error { - var err error - once.Do(func() { - err = errors.New("test error expected") - }) - return err - } -} diff --git a/hmy/downloader/types.go b/hmy/downloader/types.go deleted file mode 100644 index 56c6bcb8c..000000000 --- a/hmy/downloader/types.go +++ /dev/null @@ -1,292 +0,0 @@ -package downloader - -import ( - "container/heap" - "sync" - - "github.com/ethereum/go-ethereum/common" - "github.com/harmony-one/harmony/core/types" - sttypes "github.com/harmony-one/harmony/p2p/stream/types" -) - -var ( - emptyHash common.Hash -) - -type status struct { - isSyncing bool - targetBN uint64 - lock sync.Mutex -} - -func newStatus() status { - return status{} -} - -func (s *status) startSyncing() { - s.lock.Lock() - defer s.lock.Unlock() - - s.isSyncing = true -} - -func (s *status) setTargetBN(val uint64) { - s.lock.Lock() - defer s.lock.Unlock() - - s.targetBN = val -} - -func (s *status) finishSyncing() { - s.lock.Lock() - defer s.lock.Unlock() - - s.isSyncing = false - s.targetBN = 0 -} - -func (s *status) get() (bool, uint64) { - s.lock.Lock() - defer s.lock.Unlock() - - return s.isSyncing, s.targetBN -} - -type getBlocksResult struct { - bns []uint64 - blocks []*types.Block - stid sttypes.StreamID -} - -type resultQueue struct { - results *priorityQueue - lock sync.Mutex -} - -func newResultQueue() *resultQueue { - pq := make(priorityQueue, 0, 200) // 200 - rough estimate - heap.Init(&pq) - return &resultQueue{ - results: &pq, - } -} - -// addBlockResults adds the blocks to the result queue to be processed by insertChainLoop. -// If a nil block is detected in the block list, will not process further blocks. -func (rq *resultQueue) addBlockResults(blocks []*types.Block, stid sttypes.StreamID) { - rq.lock.Lock() - defer rq.lock.Unlock() - - for _, block := range blocks { - if block == nil { - continue - } - heap.Push(rq.results, &blockResult{ - block: block, - stid: stid, - }) - } - return -} - -// popBlockResults pop a continuous list of blocks starting at expStartBN with capped size. -// Return the stale block numbers as the second return value -func (rq *resultQueue) popBlockResults(expStartBN uint64, cap int) ([]*blockResult, []uint64) { - rq.lock.Lock() - defer rq.lock.Unlock() - - var ( - res = make([]*blockResult, 0, cap) - stales []uint64 - ) - - for cnt := 0; rq.results.Len() > 0 && cnt < cap; cnt++ { - br := heap.Pop(rq.results).(*blockResult) - // stale block number - if br.block.NumberU64() < expStartBN { - stales = append(stales, br.block.NumberU64()) - continue - } - if br.block.NumberU64() != expStartBN { - heap.Push(rq.results, br) - return res, stales - } - res = append(res, br) - expStartBN++ - } - return res, stales -} - -// removeResultsByStreamID remove the block results of the given stream, return the block -// number removed from the queue -func (rq *resultQueue) removeResultsByStreamID(stid sttypes.StreamID) []uint64 { - rq.lock.Lock() - defer rq.lock.Unlock() - - var removed []uint64 - -Loop: - for { - for i, res := range *rq.results { - blockRes := res.(*blockResult) - if blockRes.stid == stid { - rq.removeByIndex(i) - removed = append(removed, blockRes.block.NumberU64()) - goto Loop - } - } - break - } - return removed -} - -func (rq *resultQueue) length() int { - return len(*rq.results) -} - -func (rq *resultQueue) removeByIndex(index int) { - heap.Remove(rq.results, index) -} - -// bnPrioritizedItem is the item which uses block number to determine its priority -type bnPrioritizedItem interface { - getBlockNumber() uint64 -} - -type blockResult struct { - block *types.Block - stid sttypes.StreamID -} - -func (br *blockResult) getBlockNumber() uint64 { - return br.block.NumberU64() -} - -func blockResultsToBlocks(results []*blockResult) []*types.Block { - blocks := make([]*types.Block, 0, len(results)) - - for _, result := range results { - blocks = append(blocks, result.block) - } - return blocks -} - -type ( - prioritizedNumber uint64 - - prioritizedNumbers struct { - q *priorityQueue - } -) - -func (b prioritizedNumber) getBlockNumber() uint64 { - return uint64(b) -} - -func newPrioritizedNumbers() *prioritizedNumbers { - pqs := make(priorityQueue, 0) - heap.Init(&pqs) - return &prioritizedNumbers{ - q: &pqs, - } -} - -func (pbs *prioritizedNumbers) push(bn uint64) { - heap.Push(pbs.q, prioritizedNumber(bn)) -} - -func (pbs *prioritizedNumbers) pop() uint64 { - if pbs.q.Len() == 0 { - return 0 - } - item := heap.Pop(pbs.q) - return uint64(item.(prioritizedNumber)) -} - -func (pbs *prioritizedNumbers) length() int { - return len(*pbs.q) -} - -type ( - blockByNumber types.Block - - // blocksByNumber is the priority queue ordered by number - blocksByNumber struct { - q *priorityQueue - cap int - } -) - -func (b *blockByNumber) getBlockNumber() uint64 { - raw := (*types.Block)(b) - return raw.NumberU64() -} - -func newBlocksByNumber(cap int) *blocksByNumber { - pqs := make(priorityQueue, 0) - heap.Init(&pqs) - return &blocksByNumber{ - q: &pqs, - cap: cap, - } -} - -func (bs *blocksByNumber) push(b *types.Block) { - heap.Push(bs.q, (*blockByNumber)(b)) - for bs.q.Len() > bs.cap { - heap.Pop(bs.q) - } -} - -func (bs *blocksByNumber) pop() *types.Block { - if bs.q.Len() == 0 { - return nil - } - item := heap.Pop(bs.q) - return (*types.Block)(item.(*blockByNumber)) -} - -func (bs *blocksByNumber) len() int { - return bs.q.Len() -} - -// priorityQueue is a priorityQueue with lowest block number with highest priority -type priorityQueue []bnPrioritizedItem - -// resultQueue implements heap interface -func (q priorityQueue) Len() int { - return len(q) -} - -// resultQueue implements heap interface -func (q priorityQueue) Less(i, j int) bool { - bn1 := q[i].getBlockNumber() - bn2 := q[j].getBlockNumber() - return bn1 < bn2 // small block number has higher priority -} - -// resultQueue implements heap interface -func (q priorityQueue) Swap(i, j int) { - q[i], q[j] = q[j], q[i] -} - -// resultQueue implements heap interface -func (q *priorityQueue) Push(x interface{}) { - item, ok := x.(bnPrioritizedItem) - if !ok { - panic("wrong type of getBlockNumber interface") - } - *q = append(*q, item) -} - -// resultQueue implements heap interface -func (q *priorityQueue) Pop() interface{} { - prev := *q - n := len(prev) - if n == 0 { - return nil - } - res := prev[n-1] - *q = prev[0 : n-1] - return res -} diff --git a/hmy/downloader/types_test.go b/hmy/downloader/types_test.go deleted file mode 100644 index 3a924df3d..000000000 --- a/hmy/downloader/types_test.go +++ /dev/null @@ -1,266 +0,0 @@ -package downloader - -import ( - "container/heap" - "fmt" - "math/big" - "strings" - "testing" - - "github.com/harmony-one/harmony/block" - headerV3 "github.com/harmony-one/harmony/block/v3" - "github.com/harmony-one/harmony/core/types" - bls_cosi "github.com/harmony-one/harmony/crypto/bls" - sttypes "github.com/harmony-one/harmony/p2p/stream/types" -) - -func TestResultQueue_AddBlockResults(t *testing.T) { - tests := []struct { - initBNs []uint64 - addBNs []uint64 - expSize int - }{ - { - initBNs: []uint64{}, - addBNs: []uint64{1, 2, 3, 4}, - expSize: 4, - }, - { - initBNs: []uint64{1, 2, 3, 4}, - addBNs: []uint64{5, 6, 7, 8}, - expSize: 8, - }, - } - for i, test := range tests { - rq := makeTestResultQueue(test.initBNs) - rq.addBlockResults(makeTestBlocks(test.addBNs), "") - - if rq.results.Len() != test.expSize { - t.Errorf("Test %v: unexpected size: %v / %v", i, rq.results.Len(), test.expSize) - } - } -} - -func TestResultQueue_PopBlockResults(t *testing.T) { - tests := []struct { - initBNs []uint64 - cap int - expStart uint64 - expSize int - staleSize int - }{ - { - initBNs: []uint64{1, 2, 3, 4, 5}, - cap: 3, - expStart: 1, - expSize: 3, - staleSize: 0, - }, - { - initBNs: []uint64{1, 2, 3, 4, 5}, - cap: 10, - expStart: 1, - expSize: 5, - staleSize: 0, - }, - { - initBNs: []uint64{1, 3, 4, 5}, - cap: 10, - expStart: 1, - expSize: 1, - staleSize: 0, - }, - { - initBNs: []uint64{1, 2, 3, 4, 5}, - cap: 10, - expStart: 0, - expSize: 0, - staleSize: 0, - }, - { - initBNs: []uint64{1, 1, 1, 1, 2}, - cap: 10, - expStart: 1, - expSize: 2, - staleSize: 3, - }, - { - initBNs: []uint64{1, 2, 3, 4, 5}, - cap: 10, - expStart: 2, - expSize: 4, - staleSize: 1, - }, - } - for i, test := range tests { - rq := makeTestResultQueue(test.initBNs) - res, stales := rq.popBlockResults(test.expStart, test.cap) - if len(res) != test.expSize { - t.Errorf("Test %v: unexpect size %v / %v", i, len(res), test.expSize) - } - if len(stales) != test.staleSize { - t.Errorf("Test %v: unexpect stale size %v / %v", i, len(stales), test.staleSize) - } - } -} - -func TestResultQueue_RemoveResultsByStreamID(t *testing.T) { - tests := []struct { - rq *resultQueue - rmStreamID sttypes.StreamID - removed int - expSize int - }{ - { - rq: makeTestResultQueue([]uint64{1, 2, 3, 4}), - rmStreamID: "test stream id", - removed: 4, - expSize: 0, - }, - { - rq: func() *resultQueue { - rq := makeTestResultQueue([]uint64{2, 3, 4, 5}) - rq.addBlockResults([]*types.Block{ - makeTestBlock(1), - makeTestBlock(5), - makeTestBlock(6), - }, "another test stream id") - return rq - }(), - rmStreamID: "test stream id", - removed: 4, - expSize: 3, - }, - { - rq: func() *resultQueue { - rq := makeTestResultQueue([]uint64{2, 3, 4, 5}) - rq.addBlockResults([]*types.Block{ - makeTestBlock(1), - makeTestBlock(5), - makeTestBlock(6), - }, "another test stream id") - return rq - }(), - rmStreamID: "another test stream id", - removed: 3, - expSize: 4, - }, - } - for i, test := range tests { - res := test.rq.removeResultsByStreamID(test.rmStreamID) - if len(res) != test.removed { - t.Errorf("Test %v: unexpected number removed %v / %v", i, len(res), test.removed) - } - if gotSize := test.rq.results.Len(); gotSize != test.expSize { - t.Errorf("Test %v: unexpected number after removal %v / %v", i, gotSize, test.expSize) - } - } -} - -func makeTestResultQueue(bns []uint64) *resultQueue { - rq := newResultQueue() - for _, bn := range bns { - heap.Push(rq.results, &blockResult{ - block: makeTestBlock(bn), - stid: "test stream id", - }) - } - return rq -} - -func TestPrioritizedBlocks(t *testing.T) { - addBNs := []uint64{4, 7, 6, 9} - - bns := newPrioritizedNumbers() - for _, bn := range addBNs { - bns.push(bn) - } - prevBN := uint64(0) - for len(*bns.q) > 0 { - b := bns.pop() - if b < prevBN { - t.Errorf("number not incrementing") - } - prevBN = b - } - if last := bns.pop(); last != 0 { - t.Errorf("last elem is not 0") - } -} - -func TestBlocksByNumber(t *testing.T) { - addBNs := []uint64{4, 7, 6, 9} - - bns := newBlocksByNumber(10) - for _, bn := range addBNs { - bns.push(makeTestBlock(bn)) - } - if bns.len() != len(addBNs) { - t.Errorf("size unexpected: %v / %v", bns.len(), len(addBNs)) - } - prevBN := uint64(0) - for len(*bns.q) > 0 { - b := bns.pop() - if b.NumberU64() < prevBN { - t.Errorf("number not incrementing") - } - prevBN = b.NumberU64() - } - if lastBlock := bns.pop(); lastBlock != nil { - t.Errorf("last block is not nil") - } -} - -func TestPriorityQueue(t *testing.T) { - testBNs := []uint64{1, 9, 2, 4, 5, 12} - pq := make(priorityQueue, 0, 10) - heap.Init(&pq) - for _, bn := range testBNs { - heap.Push(&pq, &blockResult{ - block: makeTestBlock(bn), - stid: "", - }) - } - cmpBN := uint64(0) - for pq.Len() > 0 { - bn := heap.Pop(&pq).(*blockResult).block.NumberU64() - if bn < cmpBN { - t.Errorf("not incrementing") - } - cmpBN = bn - } - if pq.Len() != 0 { - t.Errorf("after poping, size not 0") - } -} - -func makeTestBlocks(bns []uint64) []*types.Block { - blocks := make([]*types.Block, 0, len(bns)) - for _, bn := range bns { - blocks = append(blocks, makeTestBlock(bn)) - } - return blocks -} - -func makeTestBlock(bn uint64) *types.Block { - testHeader := &block.Header{Header: headerV3.NewHeader()} - testHeader.SetNumber(big.NewInt(int64(bn))) - testHeader.SetLastCommitSignature(bls_cosi.SerializedSignature{}) - testHeader.SetLastCommitBitmap(make([]byte, 10)) - block := types.NewBlockWithHeader(testHeader) - block.SetCurrentCommitSig(make([]byte, 106)) - return block -} - -func assertError(got, expect error) error { - if (got == nil) != (expect == nil) { - return fmt.Errorf("unexpected error [%v] / [%v]", got, expect) - } - if (got == nil) || (expect == nil) { - return nil - } - if !strings.Contains(got.Error(), expect.Error()) { - return fmt.Errorf("unexpected error [%v] / [%v]", got, expect) - } - return nil -} diff --git a/node/node_syncing.go b/node/node_syncing.go index 84eb1256f..a0efaba44 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -24,7 +24,6 @@ import ( downloader_pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto" "github.com/harmony-one/harmony/api/service/stagedstreamsync" "github.com/harmony-one/harmony/api/service/stagedsync" - "github.com/harmony-one/harmony/api/service/synchronize" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" @@ -875,25 +874,13 @@ type Downloaders interface { } func (node *Node) getDownloaders() Downloaders { - if node.NodeConfig.StagedSync { - syncService := node.serviceManager.GetService(service.StagedStreamSync) - if syncService == nil { - return nil - } - dsService, ok := syncService.(*stagedstreamsync.StagedStreamSyncService) - if !ok { - return nil - } - return dsService.Downloaders - } else { - syncService := node.serviceManager.GetService(service.Synchronize) - if syncService == nil { - return nil - } - dsService, ok := syncService.(*synchronize.Service) - if !ok { - return nil - } - return dsService.Downloaders + syncService := node.serviceManager.GetService(service.Synchronize) + if syncService == nil { + return nil + } + dsService, ok := syncService.(*stagedstreamsync.StagedStreamSyncService) + if !ok { + return nil } + return dsService.Downloaders }