From 86fca2070f141a989fcee4b3f6fa14cfd789c37e Mon Sep 17 00:00:00 2001 From: Konstantin <355847+Frozen@users.noreply.github.com> Date: Tue, 5 Dec 2023 11:34:03 -0400 Subject: [PATCH] Fixed infinity loop sync. (#4575) * Removed outdated check. * Fallback for old sync for BeaconBlockChannel. * Additional logs. * fix: max-rate bellow the era min-rate (#4552) * fix: max-rate bellow the era min-rate * fix comments * add localnet epoch config * update config * update config * update config * update config * Revert "fix: max-rate bellow the era min-rate (#4552)" (#4578) This reverts commit f9934683252e42d73e1cd6f71e82f687a456999b. --------- Co-authored-by: Diego Nava <8563843+diego1q2w@users.noreply.github.com> --- api/service/stagedstreamsync/staged_stream_sync.go | 3 ++- api/service/stagedstreamsync/syncing.go | 8 ++++++-- consensus/consensus.go | 12 +++++++----- consensus/consensus_v2.go | 7 ++++--- consensus/downloader.go | 2 +- consensus/validator.go | 2 +- core/blockchain_impl.go | 2 +- core/epochchain.go | 3 ++- core/rawdb/accessors_offchain.go | 2 +- node/node_handler.go | 7 +++++-- node/node_syncing.go | 2 +- 11 files changed, 31 insertions(+), 19 deletions(-) diff --git a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index 98922af28..0a14d0cb3 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -642,8 +642,9 @@ func (ss *StagedStreamSync) addConsensusLastMile(bc core.BlockChain, cs *consens case errors.Is(err, core.ErrNotLastBlockInEpoch): case err != nil: return errors.Wrap(err, "failed to InsertChain") + default: + hashes = append(hashes, block.Header().Hash()) } - hashes = append(hashes, block.Header().Hash()) } return nil }) diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index 738f2f920..9e8926468 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -219,6 +219,8 @@ func (s *StagedStreamSync) Debug(source string, msg interface{}) { // For each iteration, estimate the current block number, then fetch block & insert to blockchain func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bool) (uint64, int, error) { + startedNumber := s.bc.CurrentBlock().NumberU64() + var totalInserted int s.initSync = initSync @@ -249,7 +251,7 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo for { ctx, cancel := context.WithCancel(downloaderContext) - n, err := s.doSyncCycle(ctx, initSync) + n, err := s.doSyncCycle(ctx) if err != nil { utils.Logger().Error(). Err(err). @@ -281,6 +283,8 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo Bool("isBeacon", s.isBeacon). Uint32("shard", s.bc.ShardID()). Int("blocks", totalInserted). + Uint64("startedNumber", startedNumber). + Uint64("currentNumber", s.bc.CurrentBlock().NumberU64()). Msg(WrapStagedSyncMsg("sync cycle blocks inserted successfully")) } @@ -304,7 +308,7 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo return estimatedHeight, totalInserted, nil } -func (s *StagedStreamSync) doSyncCycle(ctx context.Context, initSync bool) (int, error) { +func (s *StagedStreamSync) doSyncCycle(ctx context.Context) (int, error) { // TODO: initSync=true means currentCycleNumber==0, so we can remove initSync diff --git a/consensus/consensus.go b/consensus/consensus.go index 019fd8542..18b53e682 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -40,6 +40,10 @@ const ( AsyncProposal ) +type DownloadAsync interface { + DownloadAsync() +} + // Consensus is the main struct with all states and data related to consensus process. type Consensus struct { Decider quorum.Decider @@ -122,9 +126,7 @@ type Consensus struct { // finalityCounter keep tracks of the finality time finalityCounter atomic.Value //int64 - dHelper interface { - DownloadAsync() - } + dHelper DownloadAsync // Both flags only for initialization state. start bool @@ -190,10 +192,10 @@ func (consensus *Consensus) BlocksSynchronized() { } // BlocksNotSynchronized lets the main loop know that block is not synchronized -func (consensus *Consensus) BlocksNotSynchronized() { +func (consensus *Consensus) BlocksNotSynchronized(reason string) { consensus.mutex.Lock() defer consensus.mutex.Unlock() - consensus.syncNotReadyChan() + consensus.syncNotReadyChan(reason) } // VdfSeedSize returns the number of VRFs for VDF computation diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 514feaf86..0eb6e338d 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -359,11 +359,12 @@ func (consensus *Consensus) syncReadyChan() { } } -func (consensus *Consensus) syncNotReadyChan() { - consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") +func (consensus *Consensus) syncNotReadyChan(reason string) { + mode := consensus.current.Mode() consensus.setBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) consensus.current.SetMode(Syncing) - consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC") + consensus.getLogger().Info().Msgf("[ConsensusMainLoop] syncNotReadyChan, prev %s, reason %s", mode.String(), reason) + consensus.getLogger().Info().Msgf("[ConsensusMainLoop] Node is OUT OF SYNC, reason: %s", reason) consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc() } diff --git a/consensus/downloader.go b/consensus/downloader.go index 804a25aab..595d07b01 100644 --- a/consensus/downloader.go +++ b/consensus/downloader.go @@ -61,7 +61,7 @@ func (dh *downloadHelper) downloadStartedLoop(c *Consensus) { for { select { case <-dh.startedCh: - c.BlocksNotSynchronized() + c.BlocksNotSynchronized("downloadStartedLoop") case err := <-dh.startedSub.Err(): c.GetLogger().Info().Err(err).Msg("consensus download finished loop closed") diff --git a/consensus/validator.go b/consensus/validator.go index c148a6189..891fe0c03 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -65,7 +65,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { _, err := consensus.ValidateNewBlock(recvMsg) if err == nil { consensus.GetLogger().Info(). - Msg("[Announce] Block verified") + Msgf("[Announce] Block verified %d", recvMsg.BlockNum) } }() } diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index f6084b4be..cc3031567 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -1608,7 +1608,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i switch status { case CanonStatTy: - logger.Info().Msg("Inserted new block") + logger.Info().Msgf("Inserted new block s: %d e: %d n:%d", block.ShardID(), block.Epoch().Uint64(), block.NumberU64()) coalescedLogs = append(coalescedLogs, logs...) blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) diff --git a/core/epochchain.go b/core/epochchain.go index 7d9aeae1a..2dab28471 100644 --- a/core/epochchain.go +++ b/core/epochchain.go @@ -166,7 +166,8 @@ func (bc *EpochChain) InsertChain(blocks types.Blocks, _ bool) (int, error) { se1() se2() utils.Logger().Info(). - Msgf("[EPOCHSYNC] Added block %d %s", block.NumberU64(), block.Hash().Hex()) + Msgf("[EPOCHSYNC] Added block %d, epoch %d, %s", block.NumberU64(), block.Epoch().Uint64(), block.Hash().Hex()) + } return 0, nil } diff --git a/core/rawdb/accessors_offchain.go b/core/rawdb/accessors_offchain.go index dd4329903..4808c8c23 100644 --- a/core/rawdb/accessors_offchain.go +++ b/core/rawdb/accessors_offchain.go @@ -43,7 +43,7 @@ func WriteShardStateBytes(db DatabaseWriter, epoch *big.Int, data []byte) error } utils.Logger().Info(). Str("epoch", epoch.String()). - Int("size", len(data)).Msg("wrote sharding state") + Int("size", len(data)).Msgf("wrote sharding state, epoch %d", epoch.Uint64()) return nil } diff --git a/node/node_handler.go b/node/node_handler.go index c5feeed07..b745ca713 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -337,7 +337,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { } BroadcastCXReceipts(newBlock, node.Consensus) } else { - if node.Consensus.Mode() != consensus.Listening { + if mode := node.Consensus.Mode(); mode != consensus.Listening { numSignatures := node.Consensus.NumSignaturesIncludedInBlock(newBlock) utils.Logger().Info(). Uint64("blockNum", newBlock.NumberU64()). @@ -347,9 +347,12 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { Int("numTxns", len(newBlock.Transactions())). Int("numStakingTxns", len(newBlock.StakingTransactions())). Uint32("numSignatures", numSignatures). + Str("mode", mode.String()). Msg("BINGO !!! Reached Consensus") if node.Consensus.Mode() == consensus.Syncing { - node.Consensus.SetMode(node.Consensus.UpdateConsensusInformation()) + mode = node.Consensus.UpdateConsensusInformation() + utils.Logger().Info().Msgf("Switching to mode %s", mode) + node.Consensus.SetMode(mode) } node.Consensus.UpdateValidatorMetrics(float64(numSignatures), float64(newBlock.NumberU64())) diff --git a/node/node_syncing.go b/node/node_syncing.go index 5319827ff..b1ee21ea7 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -316,7 +316,7 @@ func (node *Node) doSync(syncInstance ISync, syncingPeerProvider SyncingPeerProv if isSynchronized, _, _ := syncInstance.GetParsedSyncStatusDoubleChecked(); !isSynchronized { node.IsSynchronized.UnSet() if willJoinConsensus { - consensus.BlocksNotSynchronized() + consensus.BlocksNotSynchronized("node.doSync") } isBeacon := bc.ShardID() == shard.BeaconChainShardID syncInstance.SyncLoop(bc, isBeacon, consensus, legacysync.LoopMinTime)