diff --git a/api/service/stagedstreamsync/default_stages.go b/api/service/stagedstreamsync/default_stages.go index f869ee5fe..fe64e26d4 100644 --- a/api/service/stagedstreamsync/default_stages.go +++ b/api/service/stagedstreamsync/default_stages.go @@ -64,7 +64,7 @@ func initFastSyncStagesOrder() { ShortRange, BlockBodies, Receipts, - StateSync, + FullStateSync, States, LastMile, Finish, @@ -74,7 +74,7 @@ func initFastSyncStagesOrder() { Finish, LastMile, States, - StateSync, + FullStateSync, Receipts, BlockBodies, ShortRange, @@ -86,7 +86,7 @@ func initFastSyncStagesOrder() { Finish, LastMile, States, - StateSync, + FullStateSync, Receipts, BlockBodies, ShortRange, @@ -101,6 +101,7 @@ func DefaultStages(ctx context.Context, srCfg StageShortRangeCfg, bodiesCfg StageBodiesCfg, stateSyncCfg StageStateSyncCfg, + fullStateSyncCfg StageFullStateSyncCfg, statesCfg StageStatesCfg, receiptsCfg StageReceiptsCfg, lastMileCfg StageLastMileCfg, @@ -113,55 +114,81 @@ func DefaultStages(ctx context.Context, handlerStageBodies := NewStageBodies(bodiesCfg) handlerStageStates := NewStageStates(statesCfg) handlerStageStateSync := NewStageStateSync(stateSyncCfg) + handlerStageFullStateSync := NewStageFullStateSync(fullStateSyncCfg) handlerStageReceipts := NewStageReceipts(receiptsCfg) handlerStageLastMile := NewStageLastMile(lastMileCfg) handlerStageFinish := NewStageFinish(finishCfg) return []*Stage{ { - ID: Heads, - Description: "Retrieve Chain Heads", - Handler: handlerStageHeads, + ID: Heads, + Description: "Retrieve Chain Heads", + Handler: handlerStageHeads, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChains, }, { - ID: SyncEpoch, - Description: "Sync only Last Block of Epoch", - Handler: handlerStageEpochSync, + ID: SyncEpoch, + Description: "Sync only Last Block of Epoch", + Handler: handlerStageEpochSync, + RangeMode: OnlyShortRange, + ChainExecutionMode: OnlyEpochChain, }, { - ID: ShortRange, - Description: "Short Range Sync", - Handler: handlerStageShortRange, + ID: ShortRange, + Description: "Short Range Sync", + Handler: handlerStageShortRange, + RangeMode: OnlyShortRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: BlockBodies, - Description: "Retrieve Block Bodies", - Handler: handlerStageBodies, + ID: BlockBodies, + Description: "Retrieve Block Bodies", + Handler: handlerStageBodies, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: States, - Description: "Update Blockchain State", - Handler: handlerStageStates, + ID: States, + Description: "Update Blockchain State", + Handler: handlerStageStates, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: StateSync, - Description: "Retrieve States", - Handler: handlerStageStateSync, + ID: StateSync, + Description: "Retrieve States", + Handler: handlerStageStateSync, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: Receipts, - Description: "Retrieve Receipts", - Handler: handlerStageReceipts, + ID: FullStateSync, + Description: "Retrieve Full States", + Handler: handlerStageFullStateSync, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: LastMile, - Description: "update status for blocks after sync and update last mile blocks as well", - Handler: handlerStageLastMile, + ID: Receipts, + Description: "Retrieve Receipts", + Handler: handlerStageReceipts, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: Finish, - Description: "Finalize 
Changes", - Handler: handlerStageFinish, + ID: LastMile, + Description: "update status for blocks after sync and update last mile blocks as well", + Handler: handlerStageLastMile, + RangeMode: LongRangeAndShortRange, + ChainExecutionMode: AllChainsExceptEpochChain, + }, + { + ID: Finish, + Description: "Finalize Changes", + Handler: handlerStageFinish, + RangeMode: LongRangeAndShortRange, + ChainExecutionMode: AllChains, }, } } diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index 371104895..9d564b016 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -285,4 +285,5 @@ func (d *Downloader) loop() { return } } + } diff --git a/api/service/stagedstreamsync/sig_verify.go b/api/service/stagedstreamsync/sig_verify.go index bdf5a2107..cd7fc4f91 100644 --- a/api/service/stagedstreamsync/sig_verify.go +++ b/api/service/stagedstreamsync/sig_verify.go @@ -54,14 +54,7 @@ func verifyBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil { return errors.Wrap(err, "[VerifyHeader]") } - _, err = bc.InsertChain(types.Blocks{block}, false) - switch { - case errors.Is(err, core.ErrKnownBlock): - return nil - case err != nil: - return errors.Wrap(err, "[InsertChain]") - default: - } + return nil } @@ -72,6 +65,9 @@ func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*type } // insert block if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil { + if errors.Is(err, core.ErrKnownBlock) { + return nil + } return errors.Wrap(err, "[InsertChain]") } return nil diff --git a/api/service/stagedstreamsync/stage.go b/api/service/stagedstreamsync/stage.go index 48334a5e5..59602fe81 100644 --- a/api/service/stagedstreamsync/stage.go +++ b/api/service/stagedstreamsync/stage.go @@ -30,6 +30,25 @@ type StageHandler interface { CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) error } +type RangeExecution uint32 + +const ( + LongRangeAndShortRange RangeExecution = iota // Both short range and long range + OnlyShortRange // only short range + OnlyLongRange // only long range + //OnlyEpochSync // only epoch sync +) + +type ChainExecution uint32 + +const ( + AllChains ChainExecution = iota // Can execute for any shard + AllChainsExceptEpochChain // Can execute for any shard except epoch chain + OnlyBeaconNode // only for beacon node + OnlyEpochChain // only for epoch chain + OnlyShardChain // only for shard node (exclude beacon node and epoch chain) +) + // Stage is a single sync stage in staged sync. type Stage struct { // ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`). @@ -42,6 +61,10 @@ type Stage struct { DisabledDescription string // Disabled defines if the stage is disabled. It sets up when the stage is build by its `StageBuilder`. Disabled bool + // Range defines whether stage has to be executed for either long range or short range + RangeMode RangeExecution + // ShardExecution defines this stage has to be executed for which shards + ChainExecutionMode ChainExecution } // StageState is the state of the stage. 
diff --git a/api/service/stagedstreamsync/stage_finish.go b/api/service/stagedstreamsync/stage_finish.go index 0dfae53ae..c94aa692b 100644 --- a/api/service/stagedstreamsync/stage_finish.go +++ b/api/service/stagedstreamsync/stage_finish.go @@ -39,6 +39,11 @@ func (finish *StageFinish) Exec(ctx context.Context, firstCycle bool, invalidBlo // TODO: prepare indices (useful for RPC) and finalize + // switch to Full Sync Mode if the states are synced + if s.state.status.statesSynced { + s.state.status.cycleSyncMode = FullSync + } + if useInternalTx { if err := tx.Commit(); err != nil { return err diff --git a/api/service/stagedstreamsync/stage_receipts.go b/api/service/stagedstreamsync/stage_receipts.go index 4445eb6ba..78e8e089c 100644 --- a/api/service/stagedstreamsync/stage_receipts.go +++ b/api/service/stagedstreamsync/stage_receipts.go @@ -12,6 +12,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" ) @@ -56,6 +57,11 @@ func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockR return nil } + // shouldn't execute for epoch chain + if r.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + useInternalTx := tx == nil if invalidBlockRevert { diff --git a/api/service/stagedstreamsync/stage_state.go b/api/service/stagedstreamsync/stage_states.go similarity index 98% rename from api/service/stagedstreamsync/stage_state.go rename to api/service/stagedstreamsync/stage_states.go index df864d63f..1b668786c 100644 --- a/api/service/stagedstreamsync/stage_state.go +++ b/api/service/stagedstreamsync/stage_states.go @@ -165,6 +165,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR return ErrInvalidBlockNumber } + if stg.configs.bc.HasBlock(block.Hash(), block.NumberU64()) { + continue + } + if err := verifyAndInsertBlock(stg.configs.bc, block); err != nil { stg.configs.logger.Warn().Err(err).Uint64("cycle target block", targetHeight). Uint64("block number", block.NumberU64()). 
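The sig_verify.go and stage_states.go hunks above make block insertion idempotent: `verifyBlock` no longer inserts, the States stage skips blocks the chain already has, and `verifyAndInsertBlock` maps `core.ErrKnownBlock` to success. A minimal self-contained sketch of that pattern, using stand-in types rather than the package's real `blockChain` interface:

```go
package main

import (
	"errors"
	"fmt"
)

// errKnownBlock stands in for core.ErrKnownBlock.
var errKnownBlock = errors.New("block already known")

// chain is a stand-in for the blockChain interface used by the stage.
type chain interface {
	HasBlock(hash string, number uint64) bool
	InsertBlock(hash string, number uint64) error
}

// insertIdempotent shows the pattern: skip blocks already on chain, and treat
// a racing "known block" error from insertion as success rather than failure.
func insertIdempotent(bc chain, hash string, number uint64) error {
	if bc.HasBlock(hash, number) {
		return nil // imported in a previous cycle or by another worker
	}
	if err := bc.InsertBlock(hash, number); err != nil {
		if errors.Is(err, errKnownBlock) {
			return nil // not a failure: someone else inserted it first
		}
		return fmt.Errorf("insert block %d: %w", number, err)
	}
	return nil
}

// memChain is a toy in-memory chain for the usage example below.
type memChain map[string]uint64

func (m memChain) HasBlock(hash string, number uint64) bool { _, ok := m[hash]; return ok }

func (m memChain) InsertBlock(hash string, number uint64) error {
	if _, ok := m[hash]; ok {
		return errKnownBlock
	}
	m[hash] = number
	return nil
}

func main() {
	bc := memChain{}
	fmt.Println(insertIdempotent(bc, "0xabc", 1)) // <nil>
	fmt.Println(insertIdempotent(bc, "0xabc", 1)) // <nil> (already known)
}
```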
diff --git a/api/service/stagedstreamsync/stage_statesync.go b/api/service/stagedstreamsync/stage_statesync.go index c4e66e10e..3ce733f41 100644 --- a/api/service/stagedstreamsync/stage_statesync.go +++ b/api/service/stagedstreamsync/stage_statesync.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -58,8 +59,14 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo // for short range sync, skip this step if !s.state.initSync { return nil - } // only execute this stage in fast/snap sync mode and once we reach to pivot + } + + // shouldn't execute for epoch chain + if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + // only execute this stage in fast/snap sync mode and once we reach to pivot if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() || s.state.status.statesSynced { diff --git a/api/service/stagedstreamsync/stage_statesync_full.go b/api/service/stagedstreamsync/stage_statesync_full.go index d304ca1c3..c1579114b 100644 --- a/api/service/stagedstreamsync/stage_statesync_full.go +++ b/api/service/stagedstreamsync/stage_statesync_full.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/pkg/errors" //sttypes "github.com/harmony-one/harmony/p2p/stream/types" @@ -59,8 +60,19 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever // for short range sync, skip this step if !s.state.initSync { return nil - } // only execute this stage in fast/snap sync mode and once we reach to pivot + } + + // shouldn't execute for epoch chain + if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + + // if states are already synced, don't execute this stage + if s.state.status.statesSynced { + return + } + // only execute this stage in fast/snap sync mode and once we reach to pivot if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() || s.state.status.statesSynced { @@ -72,21 +84,21 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever // if currentHead >= maxHeight { // return nil // } - // currProgress := s.state.CurrentBlockNumber() // targetHeight := s.state.currentCycle.TargetHeight - // if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error { - // if currProgress, err = s.CurrentStageProgress(etx); err != nil { - // return err - // } - // return nil - // }); errV != nil { - // return errV - // } + currProgress := uint64(0) + if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error { + if currProgress, err = s.CurrentStageProgress(etx); err != nil { + return err + } + return nil + }); errV != nil { + return errV + } + if currProgress >= s.state.status.pivotBlock.NumberU64() { + return nil + } - // if currProgress >= targetHeight { - // return nil - // } useInternalTx := tx == nil if useInternalTx { var err error @@ -109,6 +121,8 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever scheme := 
sss.configs.bc.TrieDB().Scheme() sdm := newFullStateDownloadManager(sss.configs.bc.ChainDb(), scheme, tx, sss.configs.bc, sss.configs.concurrency, s.state.logger) sdm.setRootHash(currentBlockRootHash) + + sdm.SyncStarted() var wg sync.WaitGroup for i := 0; i < s.state.config.Concurrency; i++ { wg.Add(1) @@ -128,6 +142,12 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever // states should be fully synced in this stage s.state.status.statesSynced = true + if err := sss.saveProgress(s, tx); err != nil { + sss.configs.logger.Warn().Err(err). + Uint64("pivot block number", s.state.status.pivotBlock.NumberU64()). + Msg(WrapStagedSyncMsg("save progress for statesync stage failed")) + } + /* gbm := s.state.gbm @@ -169,8 +189,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full return default: } - accountTasks, codes, storages, healtask, codetask, err := sdm.GetNextBatch() - if len(accountTasks)+len(codes)+len(storages.accounts)+len(healtask.hashes)+len(codetask.hashes) == 0 || err != nil { + accountTasks, codes, storages, healtask, codetask, nTasks, err := sdm.GetNextBatch() + if nTasks == 0 || err != nil { select { case <-ctx.Done(): return @@ -184,8 +204,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full task := accountTasks[0] origin := task.Next limit := task.Last - root := sdm.root - cap := maxRequestSize + root := task.root + cap := task.cap retAccounts, proof, stid, err := sss.configs.protocol.GetAccountRange(ctx, root, origin, limit, uint64(cap)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { @@ -234,10 +254,10 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full } else if len(storages.accounts) > 0 { - root := sdm.root + root := storages.root roots := storages.roots accounts := storages.accounts - cap := maxRequestSize + cap := storages.cap origin := storages.origin limit := storages.limit mainTask := storages.mainTask @@ -276,13 +296,14 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full } else { // assign trie node Heal Tasks if len(healtask.hashes) > 0 { - root := sdm.root + root := healtask.root task := healtask.task hashes := healtask.hashes pathsets := healtask.pathsets paths := healtask.paths + bytes := healtask.bytes - nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, maxRequestSize) + nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, uint64(bytes)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { sss.configs.protocol.StreamFailed(stid, "GetTrieNodes failed") @@ -316,7 +337,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full if len(codetask.hashes) > 0 { task := codetask.task hashes := codetask.hashes - retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, maxRequestSize) + bytes := codetask.bytes + retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, uint64(bytes)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { sss.configs.protocol.StreamFailed(stid, "GetByteCodes failed") @@ -354,7 +376,7 @@ func (sss *StageFullStateSync) downloadByteCodes(ctx context.Context, sdm *FullS for _, codeTask := range codeTasks { // try to get byte codes from remote peer // if any of them failed, the stid will be the id of the failed stream - retCodes, stid, err := 
sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, maxRequestSize) + retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, uint64(codeTask.cap)) if err != nil { return stid, err } @@ -413,7 +435,7 @@ func (stg *StageFullStateSync) saveProgress(s *StageState, tx kv.RwTx) (err erro } // save progress - if err = s.Update(tx, s.state.CurrentBlockNumber()); err != nil { + if err = s.Update(tx, s.state.status.pivotBlock.NumberU64()); err != nil { utils.Logger().Error(). Err(err). Msgf("[STAGED_SYNC] saving progress for block States stage failed") diff --git a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index 03340eb15..1782068b2 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -16,6 +16,7 @@ import ( "github.com/harmony-one/harmony/internal/utils" syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -405,6 +406,11 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs continue } + // TODO: enable this part after make sure all works well + // if !s.canExecute(stage) { + // continue + // } + if err := s.runStage(ctx, stage, db, tx, firstCycle, s.invalidBlock.Active); err != nil { utils.Logger().Error(). Err(err). @@ -431,6 +437,55 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs return nil } +func (s *StagedStreamSync) canExecute(stage *Stage) bool { + // check range mode + if stage.RangeMode != LongRangeAndShortRange { + isLongRange := s.initSync + switch stage.RangeMode { + case OnlyLongRange: + if !isLongRange { + return false + } + case OnlyShortRange: + if isLongRange { + return false + } + default: + return false + } + } + + // check chain execution + if stage.ChainExecutionMode != AllChains { + shardID := s.bc.ShardID() + isBeaconNode := s.isBeaconNode + isShardChain := shardID != shard.BeaconChainShardID + isEpochChain := shardID == shard.BeaconChainShardID && !isBeaconNode + switch stage.ChainExecutionMode { + case AllChainsExceptEpochChain: + if isEpochChain { + return false + } + case OnlyBeaconNode: + if !isBeaconNode { + return false + } + case OnlyShardChain: + if !isShardChain { + return false + } + case OnlyEpochChain: + if !isEpochChain { + return false + } + default: + return false + } + } + + return true +} + // CreateView creates a view for a given db func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) error) error { if tx != nil { diff --git a/api/service/stagedstreamsync/stages.go b/api/service/stagedstreamsync/stages.go index 6ad9e4519..33f3b293b 100644 --- a/api/service/stagedstreamsync/stages.go +++ b/api/service/stagedstreamsync/stages.go @@ -8,15 +8,16 @@ import ( type SyncStageID string const ( - Heads SyncStageID = "Heads" // Heads are downloaded - ShortRange SyncStageID = "ShortRange" // short range - SyncEpoch SyncStageID = "SyncEpoch" // epoch sync - BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified - States SyncStageID = "States" // will construct most recent state from downloaded blocks - StateSync SyncStageID = "StateSync" // State sync - Receipts SyncStageID = "Receipts" // Receipts - LastMile SyncStageID = "LastMile" // 
update blocks after sync and update last mile blocks as well - Finish SyncStageID = "Finish" // Nominal stage after all other stages + Heads SyncStageID = "Heads" // Heads are downloaded + ShortRange SyncStageID = "ShortRange" // short range + SyncEpoch SyncStageID = "SyncEpoch" // epoch sync + BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified + States SyncStageID = "States" // will construct most recent state from downloaded blocks + StateSync SyncStageID = "StateSync" // State sync + FullStateSync SyncStageID = "FullStateSync" // Full State Sync + Receipts SyncStageID = "Receipts" // Receipts + LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well + Finish SyncStageID = "Finish" // Nominal stage after all other stages ) // GetStageName returns the stage name in string diff --git a/api/service/stagedstreamsync/state_sync_full.go b/api/service/stagedstreamsync/state_sync_full.go index c98dcbafd..14cdb1f59 100644 --- a/api/service/stagedstreamsync/state_sync_full.go +++ b/api/service/stagedstreamsync/state_sync_full.go @@ -108,6 +108,11 @@ var ( type accountTask struct { id uint64 //unique id for account task + root common.Hash + origin common.Hash + limit common.Hash + cap int + // These fields get serialized to leveldb on shutdown Next common.Hash // Next account to sync in this interval Last common.Hash // Last account to sync in this interval @@ -229,16 +234,19 @@ type byteCodeTasksBundle struct { id uint64 //unique id for bytecode task bundle task *accountTask hashes []common.Hash + cap int } type storageTaskBundle struct { id uint64 //unique id for storage task bundle + root common.Hash accounts []common.Hash roots []common.Hash mainTask *accountTask subtask *storageTask origin common.Hash limit common.Hash + cap int } // healTask represents the sync task for healing the snap-synced chunk boundaries. 
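The struct changes above attach a `root` and a per-task byte budget (`cap` / `bytes`) to each account, storage, bytecode, and heal task bundle, so worker loops read the budget from the task they were handed instead of the global `sdm.root` and `maxRequestSize`. A small illustrative sketch of that shape (`byteCodeTask`, `fetchCodes`, and `runCodeTask` are hypothetical names, not code from this PR):

```go
package main

import "fmt"

// byteCodeTask is a simplified stand-in for byteCodeTasksBundle: the bundle
// carries its own byte budget (cap), set by the download manager.
type byteCodeTask struct {
	hashes []string
	cap    int
}

// fetchCodes stands in for a protocol call such as GetByteCodes; it only
// reports the budget it was given.
func fetchCodes(hashes []string, maxBytes uint64) []string {
	fmt.Printf("requesting %d bytecodes within %d bytes\n", len(hashes), maxBytes)
	return hashes
}

// runCodeTask uses the budget attached to the task rather than a package-wide
// constant, which is the shape of the change above.
func runCodeTask(t byteCodeTask) []string {
	return fetchCodes(t.hashes, uint64(t.cap))
}

func main() {
	task := byteCodeTask{hashes: []string{"h1", "h2"}, cap: 512 * 1024}
	got := runCodeTask(task)
	fmt.Println("received", len(got), "bytecodes")
}
```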
@@ -251,6 +259,7 @@ type healTask struct { pathsets []*message.TrieNodePathSet task *healTask root common.Hash + bytes int byteCodeReq bool } @@ -259,7 +268,6 @@ type tasks struct { storageTasks map[uint64]*storageTaskBundle // Set of trie node tasks currently queued for retrieval, indexed by path codeTasks map[uint64]*byteCodeTasksBundle // Set of byte code tasks currently queued for retrieval, indexed by hash healer map[uint64]*healTask - snapped bool // Flag to signal that snap phase is done } func newTasks() *tasks { @@ -268,7 +276,6 @@ func newTasks() *tasks { storageTasks: make(map[uint64]*storageTaskBundle, 0), codeTasks: make(map[uint64]*byteCodeTasksBundle), healer: make(map[uint64]*healTask, 0), - snapped: false, } } @@ -399,8 +406,6 @@ type FullStateDownloadManager struct { storageSynced uint64 // Number of storage slots downloaded storageBytes common.StorageSize // Number of storage trie bytes persisted to disk - pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown - stateWriter ethdb.Batch // Shared batch writer used for persisting raw states accountHealed uint64 // Number of accounts downloaded during the healing stage accountHealedBytes common.StorageSize // Number of raw account bytes persisted to disk during the healing stage @@ -420,6 +425,9 @@ type FullStateDownloadManager struct { bytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk bytecodeHealDups uint64 // Number of bytecodes already processed bytecodeHealNops uint64 // Number of bytecodes not requested + + startTime time.Time // Time instance when snapshot sync started + logTime time.Time // Time instance when status was last reported } func newFullStateDownloadManager(db ethdb.KeyValueStore, @@ -430,18 +438,19 @@ func newFullStateDownloadManager(db ethdb.KeyValueStore, logger zerolog.Logger) *FullStateDownloadManager { return &FullStateDownloadManager{ - db: db, - scheme: scheme, - bc: bc, - stateWriter: db.NewBatch(), - tx: tx, - keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState), - concurrency: concurrency, - logger: logger, - tasks: newTasks(), - requesting: newTasks(), - processing: newTasks(), - retries: newTasks(), + db: db, + scheme: scheme, + bc: bc, + stateWriter: db.NewBatch(), + tx: tx, + keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState), + concurrency: concurrency, + logger: logger, + tasks: newTasks(), + requesting: newTasks(), + processing: newTasks(), + retries: newTasks(), + trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk } } @@ -531,6 +540,12 @@ func (s *FullStateDownloadManager) commitHealer(force bool) { utils.Logger().Debug().Str("type", "trienodes").Interface("bytes", common.StorageSize(batch.ValueSize())).Msg("Persisted set of healing data") } +func (s *FullStateDownloadManager) SyncStarted() { + if s.startTime == (time.Time{}) { + s.startTime = time.Now() + } +} + func (s *FullStateDownloadManager) SyncCompleted() { defer func() { // Persist any progress, independent of failure for _, task := range s.tasks.accountTasks { @@ -556,7 +571,8 @@ func (s *FullStateDownloadManager) SyncCompleted() { utils.Logger().Debug().Interface("root", s.root).Msg("Terminating snapshot sync cycle") }() - utils.Logger().Debug().Msg("Snapshot sync already completed") + elapsed := time.Since(s.startTime) + utils.Logger().Debug().Interface("elapsed", elapsed).Msg("Snapshot sync already completed") } // getNextBatch returns objects with a maximum of n state download @@ -566,38 +582,30 @@ func (s 
*FullStateDownloadManager) GetNextBatch() (accounts []*accountTask, storages *storageTaskBundle, healtask *healTask, codetask *healTask, + nItems int, err error) { s.lock.Lock() defer s.lock.Unlock() - accounts, codes, storages, healtask, codetask = s.getBatchFromRetries() - nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes) + accounts, codes, storages, healtask, codetask, nItems = s.getBatchFromRetries() if nItems > 0 { return } if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 { - if nItems == 0 { - s.SyncCompleted() - } + s.SyncCompleted() return } // Refill available tasks from the scheduler. - withHealTasks := true - if healtask != nil || codetask != nil { - withHealTasks = false - } - newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask := s.getBatchFromUnprocessed(withHealTasks) + newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask, nItems := s.getBatchFromUnprocessed() accounts = append(accounts, newAccounts...) codes = append(codes, newCodes...) storages = newStorageTaskBundle - if withHealTasks { - healtask = newHealTask - codetask = newCodeTask - } + healtask = newHealTask + codetask = newCodeTask return } @@ -714,7 +722,7 @@ func (s *FullStateDownloadManager) loadSyncStatus() { // Either we've failed to decode the previous state, or there was none. // Start a fresh sync by chunking up the account range and scheduling // them for retrieval. - s.tasks.accountTasks = nil + s.tasks = newTasks() s.accountSynced, s.accountBytes = 0, 0 s.bytecodeSynced, s.bytecodeBytes = 0, 0 s.storageSynced, s.storageBytes = 0, 0 @@ -921,16 +929,18 @@ func (s *FullStateDownloadManager) updateStats(written, duplicate, unexpected in // getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download // tasks to send to the remote peer. -func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( +func (s *FullStateDownloadManager) getBatchFromUnprocessed() ( accounts []*accountTask, codes []*byteCodeTasksBundle, storages *storageTaskBundle, healtask *healTask, - codetask *healTask) { + codetask *healTask, + count int) { // over trie nodes as those can be written to disk and forgotten about. 
codes = make([]*byteCodeTasksBundle, 0) accounts = make([]*accountTask, 0) + count = 0 for i, task := range s.tasks.accountTasks { // Stop when we've gathered enough requests @@ -956,12 +966,18 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( break } + task.root = s.root + task.origin = task.Next + task.limit = task.Last + task.cap = maxRequestSize + task.requested = true s.tasks.accountTasks[i].requested = true accounts = append(accounts, task) s.requesting.addAccountTask(task.id, task) s.tasks.addAccountTask(task.id, task) // one task account is enough for an stream + count = len(accounts) return } @@ -997,6 +1013,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( id: taskID, hashes: hashes, task: task, + cap: maxRequestSize, } codes = append(codes, bytecodeTask) @@ -1005,12 +1022,14 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( // Stop when we've gathered enough requests if totalHashes >= maxCodeRequestCount { + count = totalHashes return } } // if we found some codes, can assign it to node if totalHashes > 0 { + count = totalHashes return } @@ -1020,14 +1039,8 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( continue } - // TODO: check cap calculations (shouldn't give us big chunk) - // if cap > maxRequestSize { - // cap = maxRequestSize - // } - // if cap < minRequestSize { // Don't bother with peers below a bare minimum performance - // cap = minRequestSize - // } - storageSets := maxRequestSize / 1024 + cap := maxRequestSize + storageSets := cap / 1024 storages = &storageTaskBundle{ accounts: make([]common.Hash, 0, storageSets), @@ -1089,23 +1102,21 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( storages.origin = storages.subtask.Next storages.limit = storages.subtask.Last } + storages.root = s.root + storages.cap = cap s.tasks.addStorageTaskBundle(taskID, storages) s.requesting.addStorageTaskBundle(taskID, storages) - + count = len(storages.accounts) return } if len(storages.accounts) > 0 { - return - } - - if !withHealTasks { + count = len(storages.accounts) return } // Sync phase done, run heal phase - - // Iterate over pending tasks and try to find a peer to retrieve with + // Iterate over pending tasks for (len(s.tasks.healer) > 0 && len(s.tasks.healer[0].hashes) > 0) || s.scheduler.Pending() > 0 { // If there are not enough trie tasks queued to fully assign, fill the // queue from the state sync scheduler. 
The trie synced schedules these @@ -1129,7 +1140,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( // If all the heal tasks are bytecodes or already downloading, bail if len(s.tasks.healer[0].trieTasks) == 0 { - return + break } // Generate the network query and send it to the peer // if cap > maxTrieRequestCount { @@ -1177,6 +1188,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( pathsets: pathsets, root: s.root, task: s.tasks.healer[0], + bytes: maxRequestSize, byteCodeReq: false, } @@ -1184,6 +1196,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( s.requesting.addHealerTask(taskID, healtask) if len(hashes) > 0 { + count = len(hashes) return } } @@ -1205,7 +1218,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( // If all the heal tasks are trienodes or already downloading, bail if len(s.tasks.healer[0].codeTasks) == 0 { - return + break } // Task pending retrieval, try to find an idle peer. If no such peer // exists, we probably assigned tasks for all (or they are stateless). @@ -1243,9 +1256,10 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( id: taskID, hashes: hashes, task: s.tasks.healer[0], + bytes: maxRequestSize, byteCodeReq: true, } - + count = len(hashes) s.tasks.healer[taskID] = codetask s.requesting.addHealerTask(taskID, healtask) } @@ -1272,7 +1286,8 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( codes []*byteCodeTasksBundle, storages *storageTaskBundle, healtask *healTask, - codetask *healTask) { + codetask *healTask, + count int) { // over trie nodes as those can be written to disk and forgotten about. accounts = make([]*accountTask, 0) @@ -1290,6 +1305,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } if len(accounts) > 0 { + count = len(accounts) return } @@ -1301,6 +1317,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } if len(codes) > 0 { + count = len(codes) return } @@ -1316,10 +1333,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } s.requesting.addStorageTaskBundle(storages.id, storages) s.retries.deleteStorageTaskBundle(storages.id) - return - } - - if len(storages.accounts) > 0 { + count = len(storages.accounts) return } @@ -1338,6 +1352,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } s.requesting.addHealerTask(id, task) s.retries.deleteHealerTask(id) + count = len(task.hashes) return } if task.byteCodeReq { @@ -1352,11 +1367,13 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } s.requesting.addHealerTask(id, task) s.retries.deleteHealerTask(id) + count = len(task.hashes) return } } } + count = 0 return } @@ -1371,14 +1388,18 @@ func (s *FullStateDownloadManager) HandleRequestError(accounts []*accountTask, s.lock.Lock() defer s.lock.Unlock() - for _, task := range accounts { - s.requesting.deleteAccountTask(task.id) - s.retries.addAccountTask(task.id, task) + if accounts != nil && len(accounts) > 0 { + for _, task := range accounts { + s.requesting.deleteAccountTask(task.id) + s.retries.addAccountTask(task.id, task) + } } - for _, code := range codes { - s.requesting.deleteCodeTask(code.id) - s.retries.addCodeTask(code.id, code) + if codes != nil && len(codes) > 0 { + for _, code := range codes { + s.requesting.deleteCodeTask(code.id) + s.retries.addCodeTask(code.id, code) + } } if storages != nil { diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go 
index e6879a523..c3bc585f2 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -90,6 +90,7 @@ func CreateStagedSync(ctx context.Context, stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, extractReceiptHashes, config.LogProgress) stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress) stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress) + stageFullStateSyncCfg := NewStageFullStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress) stageReceiptsCfg := NewStageReceiptsCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB) stageFinishCfg := NewStageFinishCfg(mainDB) @@ -103,6 +104,7 @@ func CreateStagedSync(ctx context.Context, stageShortRangeCfg, stageBodiesCfg, stageStateSyncCfg, + stageFullStateSyncCfg, stageStatesCfg, stageReceiptsCfg, lastMileCfg, diff --git a/core/tx_pool.go b/core/tx_pool.go index 7cd3cd9b9..0f61b8d25 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -850,15 +850,11 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { } currentBlockNumber := pool.chain.CurrentBlock().Number() pendingBlockNumber := new(big.Int).Add(currentBlockNumber, big.NewInt(1)) - pendingEpoch := pool.chain.CurrentBlock().Epoch() - if shard.Schedule.IsLastBlock(currentBlockNumber.Uint64()) { - pendingEpoch = new(big.Int).Add(pendingEpoch, big.NewInt(1)) - } chainContext, ok := pool.chain.(ChainContext) if !ok { chainContext = nil // might use testing blockchain, set to nil for verifier to handle. } - _, err = VerifyAndCreateValidatorFromMsg(pool.currentState, chainContext, pendingEpoch, pendingBlockNumber, stkMsg) + _, err = VerifyAndCreateValidatorFromMsg(pool.currentState, chainContext, pool.pendingEpoch(), pendingBlockNumber, stkMsg) return err case staking.DirectiveEditValidator: msg, err := staking.RLPDecodeStakeMsg(tx.Data(), staking.DirectiveEditValidator) @@ -964,11 +960,12 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { } } +// pendingEpoch refers to the epoch of the pending block func (pool *TxPool) pendingEpoch() *big.Int { currentBlock := pool.chain.CurrentBlock() pendingEpoch := currentBlock.Epoch() if shard.Schedule.IsLastBlock(currentBlock.Number().Uint64()) { - pendingEpoch.Add(pendingEpoch, big.NewInt(1)) + pendingEpoch = new(big.Int).Add(pendingEpoch, common.Big1) } return pendingEpoch } diff --git a/hmy/staking.go b/hmy/staking.go index 9d99b7908..83e800544 100644 --- a/hmy/staking.go +++ b/hmy/staking.go @@ -143,6 +143,11 @@ func (hmy *Harmony) IsNoEarlyUnlockEpoch(epoch *big.Int) bool { return hmy.BlockChain.Config().IsNoEarlyUnlock(epoch) } +// IsMaxRate ... 
+func (hmy *Harmony) IsMaxRate(epoch *big.Int) bool { + return hmy.BlockChain.Config().IsMaxRate(epoch) +} + // IsCommitteeSelectionBlock checks if the given block is the committee selection block func (hmy *Harmony) IsCommitteeSelectionBlock(header *block.Header) bool { return chain.IsCommitteeSelectionBlock(hmy.BlockChain, header) @@ -592,6 +597,7 @@ func (hmy *Harmony) GetUndelegationPayouts( return undelegationPayouts, nil } + isMaxRate := hmy.IsMaxRate(epoch) lockingPeriod := hmy.GetDelegationLockingPeriodInEpoch(undelegationPayoutBlock.Epoch()) for _, validator := range hmy.GetAllValidatorAddresses() { wrapper, err := hmy.BlockChain.ReadValidatorInformationAtRoot(validator, undelegationPayoutBlock.Root()) @@ -600,7 +606,7 @@ func (hmy *Harmony) GetUndelegationPayouts( } noEarlyUnlock := hmy.IsNoEarlyUnlockEpoch(epoch) for _, delegation := range wrapper.Delegations { - withdraw := delegation.RemoveUnlockedUndelegations(epoch, wrapper.LastEpochInCommittee, lockingPeriod, noEarlyUnlock) + withdraw := delegation.RemoveUnlockedUndelegations(epoch, wrapper.LastEpochInCommittee, lockingPeriod, noEarlyUnlock, isMaxRate) if withdraw.Cmp(bigZero) == 1 { undelegationPayouts.SetPayoutByDelegatorAddrAndValidatorAddr(validator, delegation.DelegatorAddress, withdraw) } diff --git a/internal/chain/engine.go b/internal/chain/engine.go index ff303579c..4ae302e18 100644 --- a/internal/chain/engine.go +++ b/internal/chain/engine.go @@ -397,9 +397,15 @@ func payoutUndelegations( const msg = "[Finalize] failed to read all validators" return errors.New(msg) } - // Payout undelegated/unlocked tokens + // Payout undelegated/unlocked tokens at the end of each epoch lockPeriod := GetLockPeriodInEpoch(chain, header.Epoch()) noEarlyUnlock := chain.Config().IsNoEarlyUnlock(header.Epoch()) + newShardState, err := header.GetShardState() + if err != nil { + const msg = "[Finalize] failed to read shard state" + return errors.New(msg) + } + isMaxRate := chain.Config().IsMaxRate(newShardState.Epoch) for _, validator := range validators { wrapper, err := state.ValidatorWrapper(validator, true, false) if err != nil { @@ -410,7 +416,7 @@ func payoutUndelegations( for i := range wrapper.Delegations { delegation := &wrapper.Delegations[i] totalWithdraw := delegation.RemoveUnlockedUndelegations( - header.Epoch(), wrapper.LastEpochInCommittee, lockPeriod, noEarlyUnlock, + header.Epoch(), wrapper.LastEpochInCommittee, lockPeriod, noEarlyUnlock, isMaxRate, ) if totalWithdraw.Sign() != 0 { state.AddBalance(delegation.DelegatorAddress, totalWithdraw) @@ -533,6 +539,7 @@ func setElectionEpochAndMinFee(chain engine.ChainReader, header *block.Header, s map[common.Address]struct{}, len(newShardState.StakedValidators().Addrs), ) + // this loop is for elected validators only for _, addr := range newShardState.StakedValidators().Addrs { wrapper, err := state.ValidatorWrapper(addr, true, false) if err != nil { @@ -566,8 +573,9 @@ func setElectionEpochAndMinFee(chain engine.ChainReader, header *block.Header, s // due to a bug in the old implementation of the minimum fee, // unelected validators did not have their fee updated even // when the protocol required them to do so. here we fix it, - // but only after the HIP-30 hard fork is effective. 
- if config.IsHIP30(newShardState.Epoch) { + // but only after the HIP-30 hard fork is effective + // this loop applies to all validators, but excludes the ones in isElected + if config.IsHIP30(newShardState.Epoch) && minRateNotZero { for _, addr := range chain.ValidatorCandidates() { // skip elected validator if _, ok := isElected[addr]; ok { @@ -581,6 +589,19 @@ func setElectionEpochAndMinFee(chain engine.ChainReader, header *block.Header, s } } } + + // for all validators which have MaxRate < minRate + maxChangeRate + // set their MaxRate equal to the minRate + MaxChangeRate + // this will allow the wrapper.SanityCheck to pass if Rate is set to a value + // higher than the the MaxRate by UpdateMinimumCommissionFee above + if config.IsMaxRate(newShardState.Epoch) && minRateNotZero { + for _, addr := range chain.ValidatorCandidates() { + if _, err := availability.UpdateMaxCommissionFee(state, addr, minRate); err != nil { + return err + } + } + } + return nil } diff --git a/internal/configs/sharding/partner.go b/internal/configs/sharding/partner.go index 99ea96141..93cc4139d 100644 --- a/internal/configs/sharding/partner.go +++ b/internal/configs/sharding/partner.go @@ -40,6 +40,8 @@ const ( func (ps partnerSchedule) InstanceForEpoch(epoch *big.Int) Instance { switch { + case params.PartnerChainConfig.IsDevnetExternalEpoch(epoch): + return partnerV3 case params.PartnerChainConfig.IsHIP30(epoch): return partnerV2 case epoch.Cmp(params.PartnerChainConfig.StakingEpoch) >= 0: @@ -111,3 +113,11 @@ var partnerV2 = MustNewInstance( hip30CollectionAddressTestnet, partnerReshardingEpoch, PartnerSchedule.BlocksPerEpoch(), ) +var partnerV3 = MustNewInstance( + 2, 5, 1, 0, + numeric.MustNewDecFromStr("0.1"), genesis.TNHarmonyAccounts, + genesis.TNFoundationalAccounts, emptyAllowlist, + feeCollectorsDevnet[1], numeric.MustNewDecFromStr("0.25"), + hip30CollectionAddressTestnet, partnerReshardingEpoch, + PartnerSchedule.BlocksPerEpoch(), +) diff --git a/internal/params/config.go b/internal/params/config.go index d9bc1708a..332cd987b 100644 --- a/internal/params/config.go +++ b/internal/params/config.go @@ -77,6 +77,7 @@ var ( NoNilDelegationsEpoch: EpochTBD, BlockGas30MEpoch: big.NewInt(1673), // 2023-11-02 17:30:00+00:00 MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // TestnetChainConfig contains the chain parameters to run a node on the harmony test network. @@ -122,6 +123,7 @@ var ( NoNilDelegationsEpoch: EpochTBD, BlockGas30MEpoch: big.NewInt(2176), // 2023-10-12 10:00:00+00:00 MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // PangaeaChainConfig contains the chain parameters for the Pangaea network. // All features except for CrossLink are enabled at launch. @@ -167,6 +169,7 @@ var ( NoNilDelegationsEpoch: EpochTBD, BlockGas30MEpoch: big.NewInt(0), MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // PartnerChainConfig contains the chain parameters for the Partner network. @@ -213,6 +216,7 @@ var ( BlockGas30MEpoch: big.NewInt(7), NoNilDelegationsEpoch: EpochTBD, MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // StressnetChainConfig contains the chain parameters for the Stress test network. @@ -259,6 +263,7 @@ var ( NoNilDelegationsEpoch: big.NewInt(2), BlockGas30MEpoch: big.NewInt(0), MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // LocalnetChainConfig contains the chain parameters to run for local development. 
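The new `MaxRateEpoch` and `DevnetExternalEpoch` fields follow the same epoch-gating pattern as the other forks in params: a fork is active for any epoch at or after its activation epoch, and `mustValid` now enforces `MaxRateEpoch >= HIP30Epoch`. A hedged standalone sketch of that gating (the epoch values 1673 and 1800 are hypothetical, chosen only for illustration; `isForked` mirrors the comparison semantics used in params):

```go
package main

import (
	"fmt"
	"math/big"
)

// isForked reports whether the fork scheduled at forkEpoch is active at epoch.
func isForked(forkEpoch, epoch *big.Int) bool {
	if forkEpoch == nil || epoch == nil {
		return false
	}
	return forkEpoch.Cmp(epoch) <= 0
}

func main() {
	hip30 := big.NewInt(1673)   // hypothetical HIP30 activation epoch
	maxRate := big.NewInt(1800) // hypothetical MaxRateEpoch

	// mustValid-style invariant added in this PR: MaxRateEpoch >= HIP30Epoch.
	if maxRate.Cmp(hip30) < 0 {
		panic("must satisfy: MaxRateEpoch >= HIP30Epoch")
	}

	fmt.Println(isForked(maxRate, big.NewInt(1799))) // false: fix not active yet
	fmt.Println(isForked(maxRate, big.NewInt(1800))) // true: active from this epoch on
}
```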
@@ -304,6 +309,7 @@ var ( NoNilDelegationsEpoch: big.NewInt(2), BlockGas30MEpoch: big.NewInt(0), MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // AllProtocolChanges ... @@ -351,6 +357,7 @@ var ( big.NewInt(0), // HIP30Epoch big.NewInt(0), // NoNilDelegationsEpoch big.NewInt(0), // MaxRateEpoch + big.NewInt(0), } // TestChainConfig ... @@ -398,6 +405,7 @@ var ( big.NewInt(0), // NoNilDelegationsEpoch big.NewInt(0), // BlockGas30M big.NewInt(0), // MaxRateEpoch + big.NewInt(0), } // TestRules ... @@ -565,6 +573,8 @@ type ChainConfig struct { // 4. Change the minimum validator commission from 5 to 7% (all nets) HIP30Epoch *big.Int `json:"hip30-epoch,omitempty"` + DevnetExternalEpoch *big.Int `json:"devnet-external-epoch,omitempty"` + BlockGas30MEpoch *big.Int `json:"block-gas-30m-epoch,omitempty"` // MaxRateEpoch will make sure the validator max-rate is at least equal to the minRate + the validator max-rate-increase @@ -647,6 +657,9 @@ func (c *ChainConfig) mustValid() { // capabilities required to transfer balance across shards require(c.HIP30Epoch.Cmp(c.CrossTxEpoch) > 0, "must satisfy: HIP30Epoch > CrossTxEpoch") + // max rate (7%) fix is applied on or after hip30 + require(c.MaxRateEpoch.Cmp(c.HIP30Epoch) >= 0, + "must satisfy: MaxRateEpoch >= HIP30Epoch") } // IsEIP155 returns whether epoch is either equal to the EIP155 fork epoch or greater. @@ -844,6 +857,10 @@ func (c *ChainConfig) IsHIP30(epoch *big.Int) bool { return isForked(c.HIP30Epoch, epoch) } +func (c *ChainConfig) IsDevnetExternalEpoch(epoch *big.Int) bool { + return isForked(c.DevnetExternalEpoch, epoch) +} + func (c *ChainConfig) IsMaxRate(epoch *big.Int) bool { return isForked(c.MaxRateEpoch, epoch) } diff --git a/rpc/blockchain.go b/rpc/blockchain.go index ae588e750..c9a6a1313 100644 --- a/rpc/blockchain.go +++ b/rpc/blockchain.go @@ -464,7 +464,11 @@ func (s *PublicBlockchainService) GetBlockReceipts( r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()]) case Eth: if tx, ok := tx.(*types.Transaction); ok { - r, err = eth.NewReceipt(tx.ConvertToEth(), blockHash, block.NumberU64(), index, rmap[tx.Hash()]) + from, err := tx.SenderAddress() + if err != nil { + return nil, err + } + r, err = eth.NewReceipt(from, tx.ConvertToEth(), blockHash, block.NumberU64(), index, rmap[tx.Hash()]) } default: return nil, ErrUnknownRPCVersion diff --git a/rpc/eth/types.go b/rpc/eth/types.go index f1ad725eb..a319a8fc1 100644 --- a/rpc/eth/types.go +++ b/rpc/eth/types.go @@ -74,19 +74,9 @@ type Transaction struct { // representation, with the given location metadata set (if available). // Note that all txs on Harmony are replay protected (post EIP155 epoch). 
func NewTransaction( - tx *types.EthTransaction, blockHash common.Hash, + from common.Address, tx *types.EthTransaction, blockHash common.Hash, blockNumber uint64, timestamp uint64, index uint64, ) (*Transaction, error) { - from := common.Address{} - var err error - if tx.IsEthCompatible() { - from, err = tx.SenderAddress() - } else { - from, err = tx.ConvertToHmy().SenderAddress() - } - if err != nil { - return nil, err - } v, r, s := tx.RawSignatureValues() result := &Transaction{ @@ -143,14 +133,9 @@ func NewTransactionFromTransaction( } // NewReceipt returns the RPC data for a new receipt -func NewReceipt(tx *types.EthTransaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt) (map[string]interface{}, error) { - senderAddr, err := tx.SenderAddress() - if err != nil { - return nil, err - } - +func NewReceipt(senderAddr common.Address, tx *types.EthTransaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt) (map[string]interface{}, error) { ethTxHash := tx.Hash() - for i, _ := range receipt.Logs { + for i := range receipt.Logs { // Override log txHash with receipt's receipt.Logs[i].TxHash = ethTxHash } @@ -240,7 +225,11 @@ func blockWithFullTxFromBlock(b *types.Block) (*BlockWithFullTx, error) { } for idx, tx := range b.Transactions() { - fmtTx, err := NewTransaction(tx.ConvertToEth(), b.Hash(), b.NumberU64(), b.Time().Uint64(), uint64(idx)) + from, err := tx.SenderAddress() + if err != nil { + return nil, err + } + fmtTx, err := NewTransaction(from, tx.ConvertToEth(), b.Hash(), b.NumberU64(), b.Time().Uint64(), uint64(idx)) if err != nil { return nil, err } @@ -257,5 +246,10 @@ func NewTransactionFromBlockIndex(b *types.Block, index uint64) (*Transaction, e "tx index %v greater than or equal to number of transactions on block %v", index, b.Hash().String(), ) } - return NewTransaction(txs[index].ConvertToEth(), b.Hash(), b.NumberU64(), b.Time().Uint64(), index) + tx := txs[index].ConvertToEth() + from, err := tx.SenderAddress() + if err != nil { + return nil, err + } + return NewTransaction(from, tx, b.Hash(), b.NumberU64(), b.Time().Uint64(), index) } diff --git a/rpc/pool.go b/rpc/pool.go index c0d4858c2..ee4e34828 100644 --- a/rpc/pool.go +++ b/rpc/pool.go @@ -253,7 +253,14 @@ func (s *PublicPoolService) PendingTransactions( continue // Legacy behavior is to not return error here } case Eth: - tx, err = eth.NewTransaction(plainTx.ConvertToEth(), common.Hash{}, 0, 0, 0) + from, err := plainTx.SenderAddress() + if err != nil { + utils.Logger().Debug(). + Err(err). + Msgf("%v error at %v", LogTag, "PendingTransactions") + continue // Legacy behavior is to not return error here + } + tx, err = eth.NewTransaction(from, plainTx.ConvertToEth(), common.Hash{}, 0, 0, 0) if err != nil { utils.Logger().Debug(). Err(err). 
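The RPC changes above (blockchain.go, eth/types.go, pool.go) all converge on the same call shape: recover the sender from the original Harmony transaction before `ConvertToEth`, then pass the precomputed address into `eth.NewTransaction` / `eth.NewReceipt`. A simplified sketch of that pattern with stand-in types (not the real `types.Transaction` API):

```go
package main

import "fmt"

// hmyTx / ethTx are simplified stand-ins for types.Transaction and
// types.EthTransaction; sender recovery depends on which signing scheme the
// original transaction used, so it happens on the Harmony form first.
type hmyTx struct{ sender string }

type ethTx struct{ payload string }

func (t *hmyTx) SenderAddress() (string, error) { return t.sender, nil }

func (t *hmyTx) ConvertToEth() *ethTx { return &ethTx{payload: "eth-encoded"} }

// newRPCTx mirrors the call shape the RPC handlers converge on: derive the
// sender first, then hand both the address and the converted tx downstream.
func newRPCTx(t *hmyTx) (map[string]string, error) {
	from, err := t.SenderAddress() // compute before ConvertToEth
	if err != nil {
		return nil, fmt.Errorf("sender: %w", err)
	}
	e := t.ConvertToEth()
	return map[string]string{"from": from, "tx": e.payload}, nil
}

func main() {
	out, err := newRPCTx(&hmyTx{sender: "one1exampleaddress"})
	fmt.Println(out, err)
}
```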
diff --git a/rpc/transaction.go b/rpc/transaction.go index 4106425c2..4b4504585 100644 --- a/rpc/transaction.go +++ b/rpc/transaction.go @@ -236,7 +236,13 @@ func (s *PublicTransactionService) newRPCTransaction(tx *types.Transaction, bloc } return NewStructuredResponse(tx) case Eth: - tx, err := eth.NewTransactionFromTransaction(tx, blockHash, blockNumber, timestamp, index) + // calculate SenderAddress before ConvertToEth + senderAddr, err := tx.SenderAddress() + if err != nil { + DoMetricRPCQueryInfo(GetTransactionByHash, FailedNumber) + return nil, err + } + tx, err := eth.NewTransaction(senderAddr, tx.ConvertToEth(), blockHash, blockNumber, timestamp, index) if err != nil { DoMetricRPCQueryInfo(GetTransactionByHash, FailedNumber) return nil, err @@ -751,7 +757,7 @@ func (s *PublicTransactionService) GetTransactionReceipt( return nil, err } return NewStructuredResponse(RPCReceipt) - case V2, Eth: + case V2: if tx == nil { RPCReceipt, err = v2.NewReceipt(stx, blockHash, blockNumber, index, receipt) } else { @@ -761,6 +767,19 @@ func (s *PublicTransactionService) GetTransactionReceipt( return nil, err } return NewStructuredResponse(RPCReceipt) + case Eth: + if tx != nil { + // calculate SenderAddress before ConvertToEth + senderAddr, err := tx.SenderAddress() + if err != nil { + return nil, err + } + RPCReceipt, err = eth.NewReceipt(senderAddr, tx.ConvertToEth(), blockHash, blockNumber, index, receipt) + } + if err != nil { + return nil, err + } + return NewStructuredResponse(RPCReceipt) default: return nil, ErrUnknownRPCVersion } diff --git a/staking/types/delegation.go b/staking/types/delegation.go index 0ec742481..4b480b1d4 100644 --- a/staking/types/delegation.go +++ b/staking/types/delegation.go @@ -193,15 +193,21 @@ func (d *Delegation) DeleteEntry(epoch *big.Int) { // RemoveUnlockedUndelegations removes all fully unlocked // undelegations and returns the total sum func (d *Delegation) RemoveUnlockedUndelegations( - curEpoch, lastEpochInCommittee *big.Int, lockPeriod int, noEarlyUnlock bool, + curEpoch, lastEpochInCommittee *big.Int, lockPeriod int, noEarlyUnlock bool, isMaxRate bool, ) *big.Int { totalWithdraw := big.NewInt(0) count := 0 for j := range d.Undelegations { - if big.NewInt(0).Sub(curEpoch, d.Undelegations[j].Epoch).Int64() >= int64(lockPeriod) || - (!noEarlyUnlock && big.NewInt(0).Sub(curEpoch, lastEpochInCommittee).Int64() >= int64(lockPeriod)) { - // need to wait at least 7 epochs to withdraw; or the validator has been out of committee for 7 epochs - totalWithdraw.Add(totalWithdraw, d.Undelegations[j].Amount) + epochsSinceUndelegation := big.NewInt(0).Sub(curEpoch, d.Undelegations[j].Epoch).Int64() + // >=7 epochs have passed since undelegation, or + lockPeriodApplies := epochsSinceUndelegation >= int64(lockPeriod) + // >=7 epochs have passed since unelection during the noEarlyUnlock configuration + earlyUnlockPeriodApplies := big.NewInt(0).Sub(curEpoch, lastEpochInCommittee).Int64() >= int64(lockPeriod) && !noEarlyUnlock + maxRateApplies := isMaxRate && epochsSinceUndelegation > int64(lockPeriod) + if lockPeriodApplies || earlyUnlockPeriodApplies { + if !maxRateApplies { + totalWithdraw.Add(totalWithdraw, d.Undelegations[j].Amount) + } count++ } else { break diff --git a/staking/types/delegation_test.go b/staking/types/delegation_test.go index e8b8a68b1..d3450df8f 100644 --- a/staking/types/delegation_test.go +++ b/staking/types/delegation_test.go @@ -75,7 +75,7 @@ func TestUnlockedLastEpochInCommittee(t *testing.T) { amount4 := big.NewInt(4000) 
delegation.Undelegate(epoch4, amount4, nil) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(8000)) != 0 { t.Errorf("removing an unlocked undelegation fails") } @@ -90,7 +90,7 @@ func TestUnlockedLastEpochInCommitteeFail(t *testing.T) { amount4 := big.NewInt(4000) delegation.Undelegate(epoch4, amount4, nil) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(0)) != 0 { t.Errorf("premature delegation shouldn't be unlocked") } @@ -104,7 +104,7 @@ func TestUnlockedFullPeriod(t *testing.T) { amount5 := big.NewInt(4000) delegation.Undelegate(epoch5, amount5, nil) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(4000)) != 0 { t.Errorf("removing an unlocked undelegation fails") } @@ -118,7 +118,7 @@ func TestQuickUnlock(t *testing.T) { amount7 := big.NewInt(4000) delegation.Undelegate(epoch7, amount7, nil) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 0, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 0, false, false) if result.Cmp(big.NewInt(4000)) != 0 { t.Errorf("removing an unlocked undelegation fails") } @@ -133,7 +133,7 @@ func TestUnlockedFullPeriodFail(t *testing.T) { amount5 := big.NewInt(4000) delegation.Undelegate(epoch5, amount5, nil) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(0)) != 0 { t.Errorf("premature delegation shouldn't be unlocked") } @@ -147,7 +147,7 @@ func TestUnlockedPremature(t *testing.T) { amount6 := big.NewInt(4000) delegation.Undelegate(epoch6, amount6, nil) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(0)) != 0 { t.Errorf("premature delegation shouldn't be unlocked") } @@ -161,8 +161,128 @@ func TestNoEarlyUnlock(t *testing.T) { amount4 := big.NewInt(4000) delegation.Undelegate(epoch4, amount4, nil) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false) if result.Cmp(big.NewInt(0)) != 0 { t.Errorf("should not allow early unlock") } } + +func TestMaxRateAtLess(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(27) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, true) + if result.Cmp(big.NewInt(0)) != 0 { + t.Errorf("should not allow unlock before 7") + } + finalLength := len(delegation.Undelegations) + if initialLength != finalLength { + t.Errorf("should not remove undelegations before 7") + } +} + +func 
TestMaxRateAtEqual(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(28) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, true) + if result.Cmp(big.NewInt(4000)) != 0 { + t.Errorf("should withdraw at 7") + } + finalLength := len(delegation.Undelegations) + if initialLength == finalLength { + t.Errorf("should remove undelegations at 7") + } +} + +func TestMaxRateAtExcess(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(29) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, true) + if result.Cmp(big.NewInt(0)) != 0 { + t.Errorf("should not withdraw at 8") + } + finalLength := len(delegation.Undelegations) + if initialLength == finalLength { + t.Errorf("should remove undelegations at 8") + } +} + +func TestNoMaxRateAtLess(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(27) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false) + if result.Cmp(big.NewInt(0)) != 0 { + t.Errorf("should not allow unlock before 7") + } + finalLength := len(delegation.Undelegations) + if initialLength != finalLength { + t.Errorf("should not remove undelegations before 7") + } +} + +func TestNoMaxRateAtEqual(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(28) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false) + if result.Cmp(big.NewInt(4000)) != 0 { + t.Errorf("should withdraw at 7") + } + finalLength := len(delegation.Undelegations) + if initialLength == finalLength { + t.Errorf("should remove undelegations at 7") + } +} + +func TestNoMaxRateAtExcess(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(29) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false) + if result.Cmp(big.NewInt(4000)) != 0 { + t.Errorf("should withdraw at 8") + } + finalLength := len(delegation.Undelegations) + if initialLength == finalLength { + t.Errorf("should remove undelegations at 8") + } +}
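To summarize the delegation change the tests above exercise, here is a minimal sketch of the branch structure added to `RemoveUnlockedUndelegations`, with `big.Int` arithmetic reduced to `int64` (illustrative only; `withdrawable` is not a function in the codebase):

```go
package main

import "fmt"

// withdrawable reproduces the decision logic: it returns whether an
// undelegation amount is paid out and whether the entry is removed.
func withdrawable(curEpoch, undelegationEpoch, lastEpochInCommittee, lockPeriod int64, noEarlyUnlock, isMaxRate bool) (payout, removed bool) {
	since := curEpoch - undelegationEpoch
	lockPeriodApplies := since >= lockPeriod
	earlyUnlockApplies := curEpoch-lastEpochInCommittee >= lockPeriod && !noEarlyUnlock
	maxRateApplies := isMaxRate && since > lockPeriod
	if lockPeriodApplies || earlyUnlockApplies {
		return !maxRateApplies, true
	}
	return false, false
}

func main() {
	// Mirrors TestMaxRateAtEqual and TestMaxRateAtExcess: at exactly the
	// lockPeriod (7 epochs) the amount is paid out and the entry removed; one
	// epoch later, with isMaxRate set, the entry is removed without a payout.
	fmt.Println(withdrawable(28, 21, 1, 7, true, true)) // true true
	fmt.Println(withdrawable(29, 21, 1, 7, true, true)) // false true
	// Without isMaxRate the excess case still pays out (TestNoMaxRateAtExcess).
	fmt.Println(withdrawable(29, 21, 1, 7, true, false)) // true true
}
```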