add comments and refactor verify sig

pull/4351/head
“GheisMohammadi” 2 years ago committed by Casey Gardiner
parent 28010ac083
commit cc0f3d459d
  1. 4
      api/service/stagedstreamsync/const.go
  2. 56
      api/service/stagedstreamsync/downloader.go
  3. 14
      api/service/stagedstreamsync/helpers.go
  4. 60
      api/service/stagedstreamsync/sig_verify.go
  5. 32
      api/service/stagedstreamsync/staged_stream_sync.go

@ -21,11 +21,11 @@ const (
SoftQueueCap int = 100 SoftQueueCap int = 100
// DefaultConcurrency is the default settings for concurrency // DefaultConcurrency is the default settings for concurrency
DefaultConcurrency = 4 DefaultConcurrency int = 4
// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync // ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
// to restart automatically when stuck in `getBlockHashes` // to restart automatically when stuck in `getBlockHashes`
ShortRangeTimeout = 1 * time.Minute ShortRangeTimeout time.Duration = 1 * time.Minute
) )
type ( type (

@ -6,13 +6,9 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/pkg/errors"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -128,7 +124,7 @@ func (d *Downloader) NumPeers() int {
return d.syncProtocol.NumStreams() return d.syncProtocol.NumStreams()
} }
// IsSyncing returns the current sync status // SyncStatus returns the current sync status
func (d *Downloader) SyncStatus() (bool, uint64, uint64) { func (d *Downloader) SyncStatus() (bool, uint64, uint64) {
syncing, target := d.stagedSyncInstance.status.get() syncing, target := d.stagedSyncInstance.status.get()
if !syncing { if !syncing {
@ -252,53 +248,3 @@ func (d *Downloader) loop() {
} }
} }
} }
var emptySigVerifyErr *sigVerifyErr
type sigVerifyErr struct {
err error
}
func (e *sigVerifyErr) Error() string {
return fmt.Sprintf("[VerifyHeaderSignature] %v", e.err.Error())
}
func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks) (int, error) {
for i, block := range blocks {
if err := verifyAndInsertBlock(bc, block, blocks[i+1:]...); err != nil {
return i, err
}
}
return len(blocks), nil
}
func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error {
var (
sigBytes bls.SerializedSignature
bitmap []byte
err error
)
if len(nextBlocks) > 0 {
// get commit sig from the next block
next := nextBlocks[0]
sigBytes = next.Header().LastCommitSignature()
bitmap = next.Header().LastCommitBitmap()
} else {
// get commit sig from current block
sigBytes, bitmap, err = chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig())
if err != nil {
return errors.Wrap(err, "parse commitSigAndBitmap")
}
}
if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sigBytes, bitmap); err != nil {
return &sigVerifyErr{err}
}
if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil {
return errors.Wrap(err, "[VerifyHeader]")
}
if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil {
return errors.Wrap(err, "[InsertChain]")
}
return nil
}

@ -112,3 +112,17 @@ func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes.
} }
return res, nextWl return res, nextWl
} }
func ByteCount(b uint64) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%dB", b)
}
div, exp := uint64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f%cB",
float64(b)/float64(div), "KMGTPE"[exp])
}

@ -0,0 +1,60 @@
package stagedstreamsync
import (
"fmt"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
"github.com/pkg/errors"
)
var emptySigVerifyErr *sigVerifyErr
type sigVerifyErr struct {
err error
}
func (e *sigVerifyErr) Error() string {
return fmt.Sprintf("[VerifyHeaderSignature] %v", e.err.Error())
}
func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks) (int, error) {
for i, block := range blocks {
if err := verifyAndInsertBlock(bc, block, blocks[i+1:]...); err != nil {
return i, err
}
}
return len(blocks), nil
}
func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error {
var (
sigBytes bls.SerializedSignature
bitmap []byte
err error
)
if len(nextBlocks) > 0 {
// get commit sig from the next block
next := nextBlocks[0]
sigBytes = next.Header().LastCommitSignature()
bitmap = next.Header().LastCommitBitmap()
} else {
// get commit sig from current block
sigBytes, bitmap, err = chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig())
if err != nil {
return errors.Wrap(err, "parse commitSigAndBitmap")
}
}
if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sigBytes, bitmap); err != nil {
return &sigVerifyErr{err}
}
if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil {
return errors.Wrap(err, "[VerifyHeader]")
}
if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil {
return errors.Wrap(err, "[InsertChain]")
}
return nil
}

@ -175,6 +175,7 @@ func (s *StagedStreamSync) IsAfter(stage1, stage2 SyncStageID) bool {
return idx1 > idx2 return idx1 > idx2
} }
// RevertTo sets the revert point
func (s *StagedStreamSync) RevertTo(revertPoint uint64, invalidBlockNumber uint64, invalidBlockHash common.Hash, invalidBlockStreamID sttypes.StreamID) { func (s *StagedStreamSync) RevertTo(revertPoint uint64, invalidBlockNumber uint64, invalidBlockHash common.Hash, invalidBlockStreamID sttypes.StreamID) {
utils.Logger().Info(). utils.Logger().Info().
Uint64("invalidBlockNumber", invalidBlockNumber). Uint64("invalidBlockNumber", invalidBlockNumber).
@ -195,10 +196,12 @@ func (s *StagedStreamSync) Done() {
s.revertPoint = nil s.revertPoint = nil
} }
// IsDone returns true if last stage have been done
func (s *StagedStreamSync) IsDone() bool { func (s *StagedStreamSync) IsDone() bool {
return s.currentStage >= uint(len(s.stages)) && s.revertPoint == nil return s.currentStage >= uint(len(s.stages)) && s.revertPoint == nil
} }
// SetCurrentStage sets the current stage to a given stage id
func (s *StagedStreamSync) SetCurrentStage(id SyncStageID) error { func (s *StagedStreamSync) SetCurrentStage(id SyncStageID) error {
for i, stage := range s.stages { for i, stage := range s.stages {
if stage.ID == id { if stage.ID == id {
@ -210,6 +213,7 @@ func (s *StagedStreamSync) SetCurrentStage(id SyncStageID) error {
return ErrStageNotFound return ErrStageNotFound
} }
// StageState retrieves the latest stage state from db
func (s *StagedStreamSync) StageState(stage SyncStageID, tx kv.Tx, db kv.RwDB) (*StageState, error) { func (s *StagedStreamSync) StageState(stage SyncStageID, tx kv.Tx, db kv.RwDB) (*StageState, error) {
var blockNum uint64 var blockNum uint64
var err error var err error
@ -226,6 +230,7 @@ func (s *StagedStreamSync) StageState(stage SyncStageID, tx kv.Tx, db kv.RwDB) (
return &StageState{s, stage, blockNum}, nil return &StageState{s, stage, blockNum}, nil
} }
// cleanUp cleans up the stage by calling pruneStage
func (s *StagedStreamSync) cleanUp(fromStage int, db kv.RwDB, tx kv.RwTx, firstCycle bool) error { func (s *StagedStreamSync) cleanUp(fromStage int, db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
found := false found := false
for i := 0; i < len(s.pruningOrder); i++ { for i := 0; i < len(s.pruningOrder); i++ {
@ -242,6 +247,7 @@ func (s *StagedStreamSync) cleanUp(fromStage int, db kv.RwDB, tx kv.RwTx, firstC
return nil return nil
} }
// New creates a new StagedStreamSync instance
func New(ctx context.Context, func New(ctx context.Context,
bc core.BlockChain, bc core.BlockChain,
db kv.RwDB, db kv.RwDB,
@ -299,6 +305,7 @@ func New(ctx context.Context,
} }
} }
// doGetCurrentNumberRequest returns estimated current block number and corresponding stream
func (s *StagedStreamSync) doGetCurrentNumberRequest() (uint64, sttypes.StreamID, error) { func (s *StagedStreamSync) doGetCurrentNumberRequest() (uint64, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(s.ctx, 10*time.Second) ctx, cancel := context.WithTimeout(s.ctx, 10*time.Second)
defer cancel() defer cancel()
@ -310,11 +317,13 @@ func (s *StagedStreamSync) doGetCurrentNumberRequest() (uint64, sttypes.StreamID
return bn, stid, nil return bn, stid, nil
} }
// promLabels returns a prometheus labels for current shard id
func (s *StagedStreamSync) promLabels() prometheus.Labels { func (s *StagedStreamSync) promLabels() prometheus.Labels {
sid := s.bc.ShardID() sid := s.bc.ShardID()
return prometheus.Labels{"ShardID": fmt.Sprintf("%d", sid)} return prometheus.Labels{"ShardID": fmt.Sprintf("%d", sid)}
} }
// checkHaveEnoughStreams checks whether node is connected to certain number of streams
func (s *StagedStreamSync) checkHaveEnoughStreams() error { func (s *StagedStreamSync) checkHaveEnoughStreams() error {
numStreams := s.protocol.NumStreams() numStreams := s.protocol.NumStreams()
if numStreams < s.config.MinStreams { if numStreams < s.config.MinStreams {
@ -324,6 +333,7 @@ func (s *StagedStreamSync) checkHaveEnoughStreams() error {
return nil return nil
} }
// SetNewContext sets a new context for all stages
func (s *StagedStreamSync) SetNewContext(ctx context.Context) error { func (s *StagedStreamSync) SetNewContext(ctx context.Context) error {
for _, s := range s.stages { for _, s := range s.stages {
s.Handler.SetStageContext(ctx) s.Handler.SetStageContext(ctx)
@ -331,6 +341,7 @@ func (s *StagedStreamSync) SetNewContext(ctx context.Context) error {
return nil return nil
} }
// Run runs a full cycle of stages
func (s *StagedStreamSync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error { func (s *StagedStreamSync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
s.prevRevertPoint = nil s.prevRevertPoint = nil
s.timings = s.timings[:0] s.timings = s.timings[:0]
@ -395,6 +406,7 @@ func (s *StagedStreamSync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
return nil return nil
} }
// CreateView creates a view for a given db
func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) error) error { func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) error) error {
if tx != nil { if tx != nil {
return f(tx) return f(tx)
@ -404,20 +416,7 @@ func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) erro
}) })
} }
func ByteCount(b uint64) string { // printLogs prints all timing logs
const unit = 1024
if b < unit {
return fmt.Sprintf("%dB", b)
}
div, exp := uint64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f%cB",
float64(b)/float64(div), "KMGTPE"[exp])
}
func printLogs(tx kv.RwTx, timings []Timing) error { func printLogs(tx kv.RwTx, timings []Timing) error {
var logCtx []interface{} var logCtx []interface{}
count := 0 count := 0
@ -463,6 +462,7 @@ func printLogs(tx kv.RwTx, timings []Timing) error {
return nil return nil
} }
// runStage executes stage
func (s *StagedStreamSync) runStage(stage *Stage, db kv.RwDB, tx kv.RwTx, firstCycle bool, invalidBlockRevert bool) (err error) { func (s *StagedStreamSync) runStage(stage *Stage, db kv.RwDB, tx kv.RwTx, firstCycle bool, invalidBlockRevert bool) (err error) {
start := time.Now() start := time.Now()
stageState, err := s.StageState(stage.ID, tx, db) stageState, err := s.StageState(stage.ID, tx, db)
@ -489,6 +489,7 @@ func (s *StagedStreamSync) runStage(stage *Stage, db kv.RwDB, tx kv.RwTx, firstC
return nil return nil
} }
// revertStage reverts stage
func (s *StagedStreamSync) revertStage(firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error { func (s *StagedStreamSync) revertStage(firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error {
start := time.Now() start := time.Now()
stageState, err := s.StageState(stage.ID, tx, db) stageState, err := s.StageState(stage.ID, tx, db)
@ -521,6 +522,7 @@ func (s *StagedStreamSync) revertStage(firstCycle bool, stage *Stage, db kv.RwDB
return nil return nil
} }
// pruneStage cleans up the stage and logs the timing
func (s *StagedStreamSync) pruneStage(firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error { func (s *StagedStreamSync) pruneStage(firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error {
start := time.Now() start := time.Now()
@ -566,6 +568,7 @@ func (s *StagedStreamSync) DisableAllStages() []SyncStageID {
return backupEnabledIds return backupEnabledIds
} }
// DisableStages disables stages by a set of given stage IDs
func (s *StagedStreamSync) DisableStages(ids ...SyncStageID) { func (s *StagedStreamSync) DisableStages(ids ...SyncStageID) {
for i := range s.stages { for i := range s.stages {
for _, id := range ids { for _, id := range ids {
@ -577,6 +580,7 @@ func (s *StagedStreamSync) DisableStages(ids ...SyncStageID) {
} }
} }
// EnableStages enables stages by a set of given stage IDs
func (s *StagedStreamSync) EnableStages(ids ...SyncStageID) { func (s *StagedStreamSync) EnableStages(ids ...SyncStageID) {
for i := range s.stages { for i := range s.stages {
for _, id := range ids { for _, id := range ids {

Loading…
Cancel
Save