From cc0f3d459d77bca19755a1ce10d6d0727acc242b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <“Gheis.Mohammadi@gmail.com”> Date: Wed, 28 Dec 2022 18:41:09 +0800 Subject: [PATCH] add comments and refactor verify sig --- api/service/stagedstreamsync/const.go | 4 +- api/service/stagedstreamsync/downloader.go | 56 +---------------- api/service/stagedstreamsync/helpers.go | 14 +++++ api/service/stagedstreamsync/sig_verify.go | 60 +++++++++++++++++++ .../stagedstreamsync/staged_stream_sync.go | 32 +++++----- 5 files changed, 95 insertions(+), 71 deletions(-) create mode 100644 api/service/stagedstreamsync/sig_verify.go diff --git a/api/service/stagedstreamsync/const.go b/api/service/stagedstreamsync/const.go index 721b44d9f..ed68d80b1 100644 --- a/api/service/stagedstreamsync/const.go +++ b/api/service/stagedstreamsync/const.go @@ -21,11 +21,11 @@ const ( SoftQueueCap int = 100 // DefaultConcurrency is the default settings for concurrency - DefaultConcurrency = 4 + DefaultConcurrency int = 4 // ShortRangeTimeout is the timeout for each short range sync, which allow short range sync // to restart automatically when stuck in `getBlockHashes` - ShortRangeTimeout = 1 * time.Minute + ShortRangeTimeout time.Duration = 1 * time.Minute ) type ( diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index 383447f98..9883c8e8f 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -6,13 +6,9 @@ import ( "time" "github.com/ethereum/go-ethereum/event" - "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/crypto/bls" - "github.com/harmony-one/harmony/internal/chain" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" @@ -128,7 +124,7 @@ func (d *Downloader) NumPeers() int { return d.syncProtocol.NumStreams() } -// IsSyncing returns the current sync status +// SyncStatus returns the current sync status func (d *Downloader) SyncStatus() (bool, uint64, uint64) { syncing, target := d.stagedSyncInstance.status.get() if !syncing { @@ -252,53 +248,3 @@ func (d *Downloader) loop() { } } } - -var emptySigVerifyErr *sigVerifyErr - -type sigVerifyErr struct { - err error -} - -func (e *sigVerifyErr) Error() string { - return fmt.Sprintf("[VerifyHeaderSignature] %v", e.err.Error()) -} - -func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks) (int, error) { - for i, block := range blocks { - if err := verifyAndInsertBlock(bc, block, blocks[i+1:]...); err != nil { - return i, err - } - } - return len(blocks), nil -} - -func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error { - var ( - sigBytes bls.SerializedSignature - bitmap []byte - err error - ) - if len(nextBlocks) > 0 { - // get commit sig from the next block - next := nextBlocks[0] - sigBytes = next.Header().LastCommitSignature() - bitmap = next.Header().LastCommitBitmap() - } else { - // get commit sig from current block - sigBytes, bitmap, err = chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig()) - if err != nil { - return errors.Wrap(err, "parse commitSigAndBitmap") - } - } - - if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sigBytes, bitmap); err != nil { - return &sigVerifyErr{err} - } - if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil { - return errors.Wrap(err, "[VerifyHeader]") - } - if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil { - return errors.Wrap(err, "[InsertChain]") - } - return nil -} diff --git a/api/service/stagedstreamsync/helpers.go b/api/service/stagedstreamsync/helpers.go index cd6fd8f6f..9e986a538 100644 --- a/api/service/stagedstreamsync/helpers.go +++ b/api/service/stagedstreamsync/helpers.go @@ -112,3 +112,17 @@ func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes. } return res, nextWl } + +func ByteCount(b uint64) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%dB", b) + } + div, exp := uint64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f%cB", + float64(b)/float64(div), "KMGTPE"[exp]) +} diff --git a/api/service/stagedstreamsync/sig_verify.go b/api/service/stagedstreamsync/sig_verify.go new file mode 100644 index 000000000..649c6eaec --- /dev/null +++ b/api/service/stagedstreamsync/sig_verify.go @@ -0,0 +1,60 @@ +package stagedstreamsync + +import ( + "fmt" + + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/chain" + "github.com/pkg/errors" +) + +var emptySigVerifyErr *sigVerifyErr + +type sigVerifyErr struct { + err error +} + +func (e *sigVerifyErr) Error() string { + return fmt.Sprintf("[VerifyHeaderSignature] %v", e.err.Error()) +} + +func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks) (int, error) { + for i, block := range blocks { + if err := verifyAndInsertBlock(bc, block, blocks[i+1:]...); err != nil { + return i, err + } + } + return len(blocks), nil +} + +func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error { + var ( + sigBytes bls.SerializedSignature + bitmap []byte + err error + ) + if len(nextBlocks) > 0 { + // get commit sig from the next block + next := nextBlocks[0] + sigBytes = next.Header().LastCommitSignature() + bitmap = next.Header().LastCommitBitmap() + } else { + // get commit sig from current block + sigBytes, bitmap, err = chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig()) + if err != nil { + return errors.Wrap(err, "parse commitSigAndBitmap") + } + } + + if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sigBytes, bitmap); err != nil { + return &sigVerifyErr{err} + } + if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil { + return errors.Wrap(err, "[VerifyHeader]") + } + if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil { + return errors.Wrap(err, "[InsertChain]") + } + return nil +} diff --git a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index a5fa011f4..9603f5b06 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -175,6 +175,7 @@ func (s *StagedStreamSync) IsAfter(stage1, stage2 SyncStageID) bool { return idx1 > idx2 } +// RevertTo sets the revert point func (s *StagedStreamSync) RevertTo(revertPoint uint64, invalidBlockNumber uint64, invalidBlockHash common.Hash, invalidBlockStreamID sttypes.StreamID) { utils.Logger().Info(). Uint64("invalidBlockNumber", invalidBlockNumber). @@ -195,10 +196,12 @@ func (s *StagedStreamSync) Done() { s.revertPoint = nil } +// IsDone returns true if last stage have been done func (s *StagedStreamSync) IsDone() bool { return s.currentStage >= uint(len(s.stages)) && s.revertPoint == nil } +// SetCurrentStage sets the current stage to a given stage id func (s *StagedStreamSync) SetCurrentStage(id SyncStageID) error { for i, stage := range s.stages { if stage.ID == id { @@ -210,6 +213,7 @@ func (s *StagedStreamSync) SetCurrentStage(id SyncStageID) error { return ErrStageNotFound } +// StageState retrieves the latest stage state from db func (s *StagedStreamSync) StageState(stage SyncStageID, tx kv.Tx, db kv.RwDB) (*StageState, error) { var blockNum uint64 var err error @@ -226,6 +230,7 @@ func (s *StagedStreamSync) StageState(stage SyncStageID, tx kv.Tx, db kv.RwDB) ( return &StageState{s, stage, blockNum}, nil } +// cleanUp cleans up the stage by calling pruneStage func (s *StagedStreamSync) cleanUp(fromStage int, db kv.RwDB, tx kv.RwTx, firstCycle bool) error { found := false for i := 0; i < len(s.pruningOrder); i++ { @@ -242,6 +247,7 @@ func (s *StagedStreamSync) cleanUp(fromStage int, db kv.RwDB, tx kv.RwTx, firstC return nil } +// New creates a new StagedStreamSync instance func New(ctx context.Context, bc core.BlockChain, db kv.RwDB, @@ -299,6 +305,7 @@ func New(ctx context.Context, } } +// doGetCurrentNumberRequest returns estimated current block number and corresponding stream func (s *StagedStreamSync) doGetCurrentNumberRequest() (uint64, sttypes.StreamID, error) { ctx, cancel := context.WithTimeout(s.ctx, 10*time.Second) defer cancel() @@ -310,11 +317,13 @@ func (s *StagedStreamSync) doGetCurrentNumberRequest() (uint64, sttypes.StreamID return bn, stid, nil } +// promLabels returns a prometheus labels for current shard id func (s *StagedStreamSync) promLabels() prometheus.Labels { sid := s.bc.ShardID() return prometheus.Labels{"ShardID": fmt.Sprintf("%d", sid)} } +// checkHaveEnoughStreams checks whether node is connected to certain number of streams func (s *StagedStreamSync) checkHaveEnoughStreams() error { numStreams := s.protocol.NumStreams() if numStreams < s.config.MinStreams { @@ -324,6 +333,7 @@ func (s *StagedStreamSync) checkHaveEnoughStreams() error { return nil } +// SetNewContext sets a new context for all stages func (s *StagedStreamSync) SetNewContext(ctx context.Context) error { for _, s := range s.stages { s.Handler.SetStageContext(ctx) @@ -331,6 +341,7 @@ func (s *StagedStreamSync) SetNewContext(ctx context.Context) error { return nil } +// Run runs a full cycle of stages func (s *StagedStreamSync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error { s.prevRevertPoint = nil s.timings = s.timings[:0] @@ -395,6 +406,7 @@ func (s *StagedStreamSync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error { return nil } +// CreateView creates a view for a given db func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) error) error { if tx != nil { return f(tx) @@ -404,20 +416,7 @@ func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) erro }) } -func ByteCount(b uint64) string { - const unit = 1024 - if b < unit { - return fmt.Sprintf("%dB", b) - } - div, exp := uint64(unit), 0 - for n := b / unit; n >= unit; n /= unit { - div *= unit - exp++ - } - return fmt.Sprintf("%.1f%cB", - float64(b)/float64(div), "KMGTPE"[exp]) -} - +// printLogs prints all timing logs func printLogs(tx kv.RwTx, timings []Timing) error { var logCtx []interface{} count := 0 @@ -463,6 +462,7 @@ func printLogs(tx kv.RwTx, timings []Timing) error { return nil } +// runStage executes stage func (s *StagedStreamSync) runStage(stage *Stage, db kv.RwDB, tx kv.RwTx, firstCycle bool, invalidBlockRevert bool) (err error) { start := time.Now() stageState, err := s.StageState(stage.ID, tx, db) @@ -489,6 +489,7 @@ func (s *StagedStreamSync) runStage(stage *Stage, db kv.RwDB, tx kv.RwTx, firstC return nil } +// revertStage reverts stage func (s *StagedStreamSync) revertStage(firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error { start := time.Now() stageState, err := s.StageState(stage.ID, tx, db) @@ -521,6 +522,7 @@ func (s *StagedStreamSync) revertStage(firstCycle bool, stage *Stage, db kv.RwDB return nil } +// pruneStage cleans up the stage and logs the timing func (s *StagedStreamSync) pruneStage(firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error { start := time.Now() @@ -566,6 +568,7 @@ func (s *StagedStreamSync) DisableAllStages() []SyncStageID { return backupEnabledIds } +// DisableStages disables stages by a set of given stage IDs func (s *StagedStreamSync) DisableStages(ids ...SyncStageID) { for i := range s.stages { for _, id := range ids { @@ -577,6 +580,7 @@ func (s *StagedStreamSync) DisableStages(ids ...SyncStageID) { } } +// EnableStages enables stages by a set of given stage IDs func (s *StagedStreamSync) EnableStages(ids ...SyncStageID) { for i := range s.stages { for _, id := range ids {