Merge pull request #4603 from harmony-one/dev-clear-stake-010924

Update feature with dev
feature/clear-stale-staking-data
Adam Androulidakis committed via GitHub
commit 2d521b6806
24 files changed (lines changed per file):

 1. api/service/stagedstreamsync/default_stages.go (87)
 2. api/service/stagedstreamsync/downloader.go (1)
 3. api/service/stagedstreamsync/sig_verify.go (12)
 4. api/service/stagedstreamsync/stage.go (23)
 5. api/service/stagedstreamsync/stage_finish.go (5)
 6. api/service/stagedstreamsync/stage_receipts.go (6)
 7. api/service/stagedstreamsync/stage_states.go (4)
 8. api/service/stagedstreamsync/stage_statesync.go (9)
 9. api/service/stagedstreamsync/stage_statesync_full.go (70)
10. api/service/stagedstreamsync/staged_stream_sync.go (55)
11. api/service/stagedstreamsync/stages.go (19)
12. api/service/stagedstreamsync/state_sync_full.go (147)
13. api/service/stagedstreamsync/syncing.go (2)
14. core/tx_pool.go (9)
15. hmy/staking.go (8)
16. internal/chain/engine.go (29)
17. internal/configs/sharding/partner.go (10)
18. internal/params/config.go (17)
19. rpc/blockchain.go (6)
20. rpc/eth/types.go (34)
21. rpc/pool.go (9)
22. rpc/transaction.go (23)
23. staking/types/delegation.go (16)
24. staking/types/delegation_test.go (134)

api/service/stagedstreamsync/default_stages.go:

@@ -64,7 +64,7 @@ func initFastSyncStagesOrder() {
 		ShortRange,
 		BlockBodies,
 		Receipts,
-		StateSync,
+		FullStateSync,
 		States,
 		LastMile,
 		Finish,
@@ -74,7 +74,7 @@ func initFastSyncStagesOrder() {
 		Finish,
 		LastMile,
 		States,
-		StateSync,
+		FullStateSync,
 		Receipts,
 		BlockBodies,
 		ShortRange,
@@ -86,7 +86,7 @@ func initFastSyncStagesOrder() {
 		Finish,
 		LastMile,
 		States,
-		StateSync,
+		FullStateSync,
 		Receipts,
 		BlockBodies,
 		ShortRange,
@@ -101,6 +101,7 @@ func DefaultStages(ctx context.Context,
 	srCfg StageShortRangeCfg,
 	bodiesCfg StageBodiesCfg,
 	stateSyncCfg StageStateSyncCfg,
+	fullStateSyncCfg StageFullStateSyncCfg,
 	statesCfg StageStatesCfg,
 	receiptsCfg StageReceiptsCfg,
 	lastMileCfg StageLastMileCfg,
@@ -113,55 +114,81 @@ func DefaultStages(ctx context.Context,
 	handlerStageBodies := NewStageBodies(bodiesCfg)
 	handlerStageStates := NewStageStates(statesCfg)
 	handlerStageStateSync := NewStageStateSync(stateSyncCfg)
+	handlerStageFullStateSync := NewStageFullStateSync(fullStateSyncCfg)
 	handlerStageReceipts := NewStageReceipts(receiptsCfg)
 	handlerStageLastMile := NewStageLastMile(lastMileCfg)
 	handlerStageFinish := NewStageFinish(finishCfg)

 	return []*Stage{
 		{
 			ID:          Heads,
 			Description: "Retrieve Chain Heads",
 			Handler:     handlerStageHeads,
+			RangeMode:          OnlyLongRange,
+			ChainExecutionMode: AllChains,
 		},
 		{
 			ID:          SyncEpoch,
 			Description: "Sync only Last Block of Epoch",
 			Handler:     handlerStageEpochSync,
+			RangeMode:          OnlyShortRange,
+			ChainExecutionMode: OnlyEpochChain,
 		},
 		{
 			ID:          ShortRange,
 			Description: "Short Range Sync",
 			Handler:     handlerStageShortRange,
+			RangeMode:          OnlyShortRange,
+			ChainExecutionMode: AllChainsExceptEpochChain,
 		},
 		{
 			ID:          BlockBodies,
 			Description: "Retrieve Block Bodies",
 			Handler:     handlerStageBodies,
+			RangeMode:          OnlyLongRange,
+			ChainExecutionMode: AllChainsExceptEpochChain,
 		},
 		{
 			ID:          States,
 			Description: "Update Blockchain State",
 			Handler:     handlerStageStates,
+			RangeMode:          OnlyLongRange,
+			ChainExecutionMode: AllChainsExceptEpochChain,
 		},
 		{
 			ID:          StateSync,
 			Description: "Retrieve States",
 			Handler:     handlerStageStateSync,
+			RangeMode:          OnlyLongRange,
+			ChainExecutionMode: AllChainsExceptEpochChain,
 		},
 		{
+			ID:                 FullStateSync,
+			Description:        "Retrieve Full States",
+			Handler:            handlerStageFullStateSync,
+			RangeMode:          OnlyLongRange,
+			ChainExecutionMode: AllChainsExceptEpochChain,
+		},
+		{
 			ID:          Receipts,
 			Description: "Retrieve Receipts",
 			Handler:     handlerStageReceipts,
+			RangeMode:          OnlyLongRange,
+			ChainExecutionMode: AllChainsExceptEpochChain,
 		},
 		{
 			ID:          LastMile,
 			Description: "update status for blocks after sync and update last mile blocks as well",
 			Handler:     handlerStageLastMile,
+			RangeMode:          LongRangeAndShortRange,
+			ChainExecutionMode: AllChainsExceptEpochChain,
 		},
 		{
 			ID:          Finish,
 			Description: "Finalize Changes",
 			Handler:     handlerStageFinish,
+			RangeMode:          LongRangeAndShortRange,
+			ChainExecutionMode: AllChains,
 		},
 	}
 }

api/service/stagedstreamsync/downloader.go:

@@ -285,4 +285,5 @@ func (d *Downloader) loop() {
 			return
 		}
 	}
 }

api/service/stagedstreamsync/sig_verify.go:

@@ -54,14 +54,7 @@ func verifyBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block)
 	if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil {
 		return errors.Wrap(err, "[VerifyHeader]")
 	}
-	_, err = bc.InsertChain(types.Blocks{block}, false)
-	switch {
-	case errors.Is(err, core.ErrKnownBlock):
-		return nil
-	case err != nil:
-		return errors.Wrap(err, "[InsertChain]")
-	default:
-	}
 	return nil
 }
@@ -72,6 +65,9 @@ func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error {
 	}
 	// insert block
 	if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil {
+		if errors.Is(err, core.ErrKnownBlock) {
+			return nil
+		}
 		return errors.Wrap(err, "[InsertChain]")
 	}
 	return nil
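Note on the refactor above: verifyBlock no longer inserts the block; insertion now lives solely in verifyAndInsertBlock, which tolerates re-inserting an already-known block. Since the error from InsertChain is checked with errors.Is before wrapping, and github.com/pkg/errors (v0.9+) supports Go 1.13-style unwrapping, the sentinel would also survive an extra Wrap. A standalone illustration with a stand-in sentinel (not harmony code):

    package main

    import (
    	"errors"
    	"fmt"

    	pkgerrors "github.com/pkg/errors"
    )

    // errKnownBlock stands in for core.ErrKnownBlock in this sketch.
    var errKnownBlock = errors.New("block already known")

    func main() {
    	wrapped := pkgerrors.Wrap(errKnownBlock, "[InsertChain]")
    	// the cause survives wrapping, so callers can still detect it:
    	fmt.Println(errors.Is(wrapped, errKnownBlock)) // true
    }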

api/service/stagedstreamsync/stage.go:

@@ -30,6 +30,25 @@ type StageHandler interface {
 	CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) error
 }

+type RangeExecution uint32
+
+const (
+	LongRangeAndShortRange RangeExecution = iota // Both short range and long range
+	OnlyShortRange                               // only short range
+	OnlyLongRange                                // only long range
+	//OnlyEpochSync                              // only epoch sync
+)
+
+type ChainExecution uint32
+
+const (
+	AllChains                 ChainExecution = iota // Can execute for any shard
+	AllChainsExceptEpochChain                       // Can execute for any shard except epoch chain
+	OnlyBeaconNode                                  // only for beacon node
+	OnlyEpochChain                                  // only for epoch chain
+	OnlyShardChain                                  // only for shard node (exclude beacon node and epoch chain)
+)
+
 // Stage is a single sync stage in staged sync.
 type Stage struct {
 	// ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`).
@@ -42,6 +61,10 @@ type Stage struct {
 	DisabledDescription string
 	// Disabled defines if the stage is disabled. It sets up when the stage is built by its `StageBuilder`.
 	Disabled bool
+	// RangeMode defines whether the stage has to be executed for long range, short range, or both
+	RangeMode RangeExecution
+	// ChainExecutionMode defines which chains the stage has to be executed for
+	ChainExecutionMode ChainExecution
 }

 // StageState is the state of the stage.

api/service/stagedstreamsync/stage_finish.go:

@@ -39,6 +39,11 @@ func (finish *StageFinish) Exec(ctx context.Context, firstCycle bool, invalidBlo
 	// TODO: prepare indices (useful for RPC) and finalize

+	// switch to Full Sync Mode if the states are synced
+	if s.state.status.statesSynced {
+		s.state.status.cycleSyncMode = FullSync
+	}
+
 	if useInternalTx {
 		if err := tx.Commit(); err != nil {
 			return err

api/service/stagedstreamsync/stage_receipts.go:

@@ -12,6 +12,7 @@ import (
 	"github.com/harmony-one/harmony/core/types"
 	"github.com/harmony-one/harmony/internal/utils"
 	sttypes "github.com/harmony-one/harmony/p2p/stream/types"
+	"github.com/harmony-one/harmony/shard"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/pkg/errors"
 )
@@ -56,6 +57,11 @@ func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockR
 		return nil
 	}

+	// shouldn't execute for epoch chain
+	if r.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
+		return nil
+	}
+
 	useInternalTx := tx == nil

 	if invalidBlockRevert {

api/service/stagedstreamsync/stage_states.go:

@@ -165,6 +165,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
 			return ErrInvalidBlockNumber
 		}

+		if stg.configs.bc.HasBlock(block.Hash(), block.NumberU64()) {
+			continue
+		}
+
 		if err := verifyAndInsertBlock(stg.configs.bc, block); err != nil {
 			stg.configs.logger.Warn().Err(err).Uint64("cycle target block", targetHeight).
 				Uint64("block number", block.NumberU64()).

api/service/stagedstreamsync/stage_statesync.go:

@@ -10,6 +10,7 @@ import (
 	"github.com/harmony-one/harmony/core"
 	"github.com/harmony-one/harmony/internal/utils"
 	sttypes "github.com/harmony-one/harmony/p2p/stream/types"
+	"github.com/harmony-one/harmony/shard"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -58,8 +59,14 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo
 	// for short range sync, skip this step
 	if !s.state.initSync {
 		return nil
-	} // only execute this stage in fast/snap sync mode and once we reach to pivot
+	}
+
+	// shouldn't execute for epoch chain
+	if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
+		return nil
+	}

+	// only execute this stage in fast/snap sync mode and once we reach the pivot
 	if s.state.status.pivotBlock == nil ||
 		s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() ||
 		s.state.status.statesSynced {

api/service/stagedstreamsync/stage_statesync_full.go:

@@ -9,6 +9,7 @@ import (
 	"github.com/harmony-one/harmony/core"
 	"github.com/harmony-one/harmony/internal/utils"
 	sttypes "github.com/harmony-one/harmony/p2p/stream/types"
+	"github.com/harmony-one/harmony/shard"
 	"github.com/pkg/errors"

 	//sttypes "github.com/harmony-one/harmony/p2p/stream/types"
@@ -59,8 +60,19 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever
 	// for short range sync, skip this step
 	if !s.state.initSync {
 		return nil
-	} // only execute this stage in fast/snap sync mode and once we reach to pivot
+	}
+
+	// shouldn't execute for epoch chain
+	if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
+		return nil
+	}
+
+	// if states are already synced, don't execute this stage
+	if s.state.status.statesSynced {
+		return
+	}

+	// only execute this stage in fast/snap sync mode and once we reach the pivot
 	if s.state.status.pivotBlock == nil ||
 		s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() ||
 		s.state.status.statesSynced {
@@ -72,21 +84,21 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever
 	// if currentHead >= maxHeight {
 	// 	return nil
 	// }
-	// currProgress := s.state.CurrentBlockNumber()
 	// targetHeight := s.state.currentCycle.TargetHeight
-	// if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error {
-	// 	if currProgress, err = s.CurrentStageProgress(etx); err != nil {
-	// 		return err
-	// 	}
-	// 	return nil
-	// }); errV != nil {
-	// 	return errV
-	// }
-	// if currProgress >= targetHeight {
-	// 	return nil
-	// }
+
+	currProgress := uint64(0)
+	if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error {
+		if currProgress, err = s.CurrentStageProgress(etx); err != nil {
+			return err
+		}
+		return nil
+	}); errV != nil {
+		return errV
+	}
+
+	if currProgress >= s.state.status.pivotBlock.NumberU64() {
+		return nil
+	}

 	useInternalTx := tx == nil
 	if useInternalTx {
 		var err error
@@ -109,6 +121,8 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever
 	scheme := sss.configs.bc.TrieDB().Scheme()
 	sdm := newFullStateDownloadManager(sss.configs.bc.ChainDb(), scheme, tx, sss.configs.bc, sss.configs.concurrency, s.state.logger)
 	sdm.setRootHash(currentBlockRootHash)
+	sdm.SyncStarted()
+
 	var wg sync.WaitGroup
 	for i := 0; i < s.state.config.Concurrency; i++ {
 		wg.Add(1)
@@ -128,6 +142,12 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever
 	// states should be fully synced in this stage
 	s.state.status.statesSynced = true

+	if err := sss.saveProgress(s, tx); err != nil {
+		sss.configs.logger.Warn().Err(err).
+			Uint64("pivot block number", s.state.status.pivotBlock.NumberU64()).
+			Msg(WrapStagedSyncMsg("save progress for statesync stage failed"))
+	}
+
 	/*
 		gbm := s.state.gbm
@@ -169,8 +189,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
 		return
 	default:
 	}
-	accountTasks, codes, storages, healtask, codetask, err := sdm.GetNextBatch()
-	if len(accountTasks)+len(codes)+len(storages.accounts)+len(healtask.hashes)+len(codetask.hashes) == 0 || err != nil {
+	accountTasks, codes, storages, healtask, codetask, nTasks, err := sdm.GetNextBatch()
+	if nTasks == 0 || err != nil {
 		select {
 		case <-ctx.Done():
 			return
@@ -184,8 +204,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
 		task := accountTasks[0]
 		origin := task.Next
 		limit := task.Last
-		root := sdm.root
-		cap := maxRequestSize
+		root := task.root
+		cap := task.cap
 		retAccounts, proof, stid, err := sss.configs.protocol.GetAccountRange(ctx, root, origin, limit, uint64(cap))
 		if err != nil {
 			if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
@@ -234,10 +254,10 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
 	} else if len(storages.accounts) > 0 {
-		root := sdm.root
+		root := storages.root
 		roots := storages.roots
 		accounts := storages.accounts
-		cap := maxRequestSize
+		cap := storages.cap
 		origin := storages.origin
 		limit := storages.limit
 		mainTask := storages.mainTask
@@ -276,13 +296,14 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
 	} else {
 		// assign trie node Heal Tasks
 		if len(healtask.hashes) > 0 {
-			root := sdm.root
+			root := healtask.root
 			task := healtask.task
 			hashes := healtask.hashes
 			pathsets := healtask.pathsets
 			paths := healtask.paths
+			bytes := healtask.bytes

-			nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, maxRequestSize)
+			nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, uint64(bytes))
 			if err != nil {
 				if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
 					sss.configs.protocol.StreamFailed(stid, "GetTrieNodes failed")
@@ -316,7 +337,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
 		if len(codetask.hashes) > 0 {
 			task := codetask.task
 			hashes := codetask.hashes
-			retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, maxRequestSize)
+			bytes := codetask.bytes
+			retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, uint64(bytes))
 			if err != nil {
 				if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
 					sss.configs.protocol.StreamFailed(stid, "GetByteCodes failed")
@@ -354,7 +376,7 @@ func (sss *StageFullStateSync) downloadByteCodes(ctx context.Context, sdm *FullS
 	for _, codeTask := range codeTasks {
 		// try to get byte codes from remote peer
 		// if any of them failed, the stid will be the id of the failed stream
-		retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, maxRequestSize)
+		retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, uint64(codeTask.cap))
 		if err != nil {
 			return stid, err
 		}
@@ -413,7 +435,7 @@ func (stg *StageFullStateSync) saveProgress(s *StageState, tx kv.RwTx) (err erro
 	}

 	// save progress
-	if err = s.Update(tx, s.state.CurrentBlockNumber()); err != nil {
+	if err = s.Update(tx, s.state.status.pivotBlock.NumberU64()); err != nil {
 		utils.Logger().Error().
 			Err(err).
 			Msgf("[STAGED_SYNC] saving progress for block States stage failed")

api/service/stagedstreamsync/staged_stream_sync.go:

@@ -16,6 +16,7 @@ import (
 	"github.com/harmony-one/harmony/internal/utils"
 	syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
 	sttypes "github.com/harmony-one/harmony/p2p/stream/types"
+	"github.com/harmony-one/harmony/shard"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -405,6 +406,11 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs
 			continue
 		}

+		// TODO: enable this part after making sure everything works well
+		// if !s.canExecute(stage) {
+		// 	continue
+		// }
+
 		if err := s.runStage(ctx, stage, db, tx, firstCycle, s.invalidBlock.Active); err != nil {
 			utils.Logger().Error().
 				Err(err).
@@ -431,6 +437,55 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs
 	return nil
 }

+func (s *StagedStreamSync) canExecute(stage *Stage) bool {
+	// check range mode
+	if stage.RangeMode != LongRangeAndShortRange {
+		isLongRange := s.initSync
+		switch stage.RangeMode {
+		case OnlyLongRange:
+			if !isLongRange {
+				return false
+			}
+		case OnlyShortRange:
+			if isLongRange {
+				return false
+			}
+		default:
+			return false
+		}
+	}
+
+	// check chain execution mode
+	if stage.ChainExecutionMode != AllChains {
+		shardID := s.bc.ShardID()
+		isBeaconNode := s.isBeaconNode
+		isShardChain := shardID != shard.BeaconChainShardID
+		isEpochChain := shardID == shard.BeaconChainShardID && !isBeaconNode
+		switch stage.ChainExecutionMode {
+		case AllChainsExceptEpochChain:
+			if isEpochChain {
+				return false
+			}
+		case OnlyBeaconNode:
+			if !isBeaconNode {
+				return false
+			}
+		case OnlyShardChain:
+			if !isShardChain {
+				return false
+			}
+		case OnlyEpochChain:
+			if !isEpochChain {
+				return false
+			}
+		default:
+			return false
+		}
+	}
+
+	return true
+}
+
 // CreateView creates a view for a given db
 func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) error) error {
 	if tx != nil {
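To make the new gating concrete: canExecute derives the node's role from its shard ID plus the isBeaconNode flag, then filters stages by RangeMode and ChainExecutionMode. A minimal sketch of the same classification predicates (mirrors the switch above; not part of the PR):

    // nodeKind mirrors the predicates used by canExecute.
    func nodeKind(shardID uint32, isBeaconNode bool) (isShardChain, isEpochChain bool) {
    	isShardChain = shardID != shard.BeaconChainShardID
    	// an "epoch chain" follows the beacon shard without being a beacon node
    	isEpochChain = shardID == shard.BeaconChainShardID && !isBeaconNode
    	return isShardChain, isEpochChain
    }

So a Receipts-style stage (OnlyLongRange, AllChainsExceptEpochChain) is skipped both during short-range cycles and on epoch-chain nodes, which matches the per-stage ShardID checks added in stage_receipts.go and the two statesync stages.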

api/service/stagedstreamsync/stages.go:

@@ -8,15 +8,16 @@ import (
 type SyncStageID string

 const (
-	Heads       SyncStageID = "Heads"       // Heads are downloaded
-	ShortRange  SyncStageID = "ShortRange"  // short range
-	SyncEpoch   SyncStageID = "SyncEpoch"   // epoch sync
-	BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified
-	States      SyncStageID = "States"      // will construct most recent state from downloaded blocks
-	StateSync   SyncStageID = "StateSync"   // State sync
-	Receipts    SyncStageID = "Receipts"    // Receipts
-	LastMile    SyncStageID = "LastMile"    // update blocks after sync and update last mile blocks as well
-	Finish      SyncStageID = "Finish"      // Nominal stage after all other stages
+	Heads         SyncStageID = "Heads"         // Heads are downloaded
+	ShortRange    SyncStageID = "ShortRange"    // short range
+	SyncEpoch     SyncStageID = "SyncEpoch"     // epoch sync
+	BlockBodies   SyncStageID = "BlockBodies"   // Block bodies are downloaded, TxHash and UncleHash are getting verified
+	States        SyncStageID = "States"        // will construct most recent state from downloaded blocks
+	StateSync     SyncStageID = "StateSync"     // State sync
+	FullStateSync SyncStageID = "FullStateSync" // Full State Sync
+	Receipts      SyncStageID = "Receipts"      // Receipts
+	LastMile      SyncStageID = "LastMile"      // update blocks after sync and update last mile blocks as well
+	Finish        SyncStageID = "Finish"        // Nominal stage after all other stages
 )

 // GetStageName returns the stage name in string

api/service/stagedstreamsync/state_sync_full.go:

@@ -108,6 +108,11 @@ var (
 type accountTask struct {
 	id uint64 //unique id for account task

+	root   common.Hash
+	origin common.Hash
+	limit  common.Hash
+	cap    int
+
 	// These fields get serialized to leveldb on shutdown
 	Next common.Hash // Next account to sync in this interval
 	Last common.Hash // Last account to sync in this interval
@@ -229,16 +234,19 @@ type byteCodeTasksBundle struct {
 	id     uint64 //unique id for bytecode task bundle
 	task   *accountTask
 	hashes []common.Hash
+	cap    int
 }

 type storageTaskBundle struct {
 	id       uint64 //unique id for storage task bundle
+	root     common.Hash
 	accounts []common.Hash
 	roots    []common.Hash
 	mainTask *accountTask
 	subtask  *storageTask
 	origin   common.Hash
 	limit    common.Hash
+	cap      int
 }

 // healTask represents the sync task for healing the snap-synced chunk boundaries.
@@ -251,6 +259,7 @@ type healTask struct {
 	pathsets    []*message.TrieNodePathSet
 	task        *healTask
 	root        common.Hash
+	bytes       int
 	byteCodeReq bool
 }
@@ -259,7 +268,6 @@ type tasks struct {
 	storageTasks map[uint64]*storageTaskBundle   // Set of trie node tasks currently queued for retrieval, indexed by path
 	codeTasks    map[uint64]*byteCodeTasksBundle // Set of byte code tasks currently queued for retrieval, indexed by hash
 	healer       map[uint64]*healTask
-	snapped      bool // Flag to signal that snap phase is done
 }

 func newTasks() *tasks {
@@ -268,7 +276,6 @@ func newTasks() *tasks {
 		storageTasks: make(map[uint64]*storageTaskBundle, 0),
 		codeTasks:    make(map[uint64]*byteCodeTasksBundle),
 		healer:       make(map[uint64]*healTask, 0),
-		snapped:      false,
 	}
 }
@@ -399,8 +406,6 @@ type FullStateDownloadManager struct {
 	storageSynced uint64             // Number of storage slots downloaded
 	storageBytes  common.StorageSize // Number of storage trie bytes persisted to disk

-	pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
-
 	stateWriter        ethdb.Batch        // Shared batch writer used for persisting raw states
 	accountHealed      uint64             // Number of accounts downloaded during the healing stage
 	accountHealedBytes common.StorageSize // Number of raw account bytes persisted to disk during the healing stage
@@ -420,6 +425,9 @@ type FullStateDownloadManager struct {
 	bytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk
 	bytecodeHealDups  uint64             // Number of bytecodes already processed
 	bytecodeHealNops  uint64             // Number of bytecodes not requested
+
+	startTime time.Time // Time instance when snapshot sync started
+	logTime   time.Time // Time instance when status was last reported
 }

 func newFullStateDownloadManager(db ethdb.KeyValueStore,
@@ -430,18 +438,19 @@ func newFullStateDownloadManager(db ethdb.KeyValueStore,
 	logger zerolog.Logger) *FullStateDownloadManager {

 	return &FullStateDownloadManager{
 		db:          db,
 		scheme:      scheme,
 		bc:          bc,
 		stateWriter: db.NewBatch(),
 		tx:          tx,
 		keccak:      sha3.NewLegacyKeccak256().(crypto.KeccakState),
 		concurrency: concurrency,
 		logger:      logger,
 		tasks:       newTasks(),
 		requesting:  newTasks(),
 		processing:  newTasks(),
 		retries:     newTasks(),
+		trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
 	}
 }
@@ -531,6 +540,12 @@ func (s *FullStateDownloadManager) commitHealer(force bool) {
 	utils.Logger().Debug().Str("type", "trienodes").Interface("bytes", common.StorageSize(batch.ValueSize())).Msg("Persisted set of healing data")
 }

+func (s *FullStateDownloadManager) SyncStarted() {
+	if s.startTime == (time.Time{}) {
+		s.startTime = time.Now()
+	}
+}
+
 func (s *FullStateDownloadManager) SyncCompleted() {
 	defer func() { // Persist any progress, independent of failure
 		for _, task := range s.tasks.accountTasks {
@@ -556,7 +571,8 @@ func (s *FullStateDownloadManager) SyncCompleted() {
 		utils.Logger().Debug().Interface("root", s.root).Msg("Terminating snapshot sync cycle")
 	}()

-	utils.Logger().Debug().Msg("Snapshot sync already completed")
+	elapsed := time.Since(s.startTime)
+	utils.Logger().Debug().Interface("elapsed", elapsed).Msg("Snapshot sync already completed")
 }

 // getNextBatch returns objects with a maximum of n state download
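Side note on the zero-value guard in SyncStarted above: comparing against time.Time{} works, but time.Time has an IsZero helper that reads more idiomatically; an equivalent sketch (not part of the PR):

    if s.startTime.IsZero() {
    	s.startTime = time.Now()
    }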
@@ -566,38 +582,30 @@ func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask,
 	codes []*byteCodeTasksBundle,
 	storages *storageTaskBundle,
 	healtask *healTask,
 	codetask *healTask,
+	nItems int,
 	err error) {

 	s.lock.Lock()
 	defer s.lock.Unlock()

-	accounts, codes, storages, healtask, codetask = s.getBatchFromRetries()
-	nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes)
+	accounts, codes, storages, healtask, codetask, nItems = s.getBatchFromRetries()
 	if nItems > 0 {
 		return
 	}

 	if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 {
-		if nItems == 0 {
-			s.SyncCompleted()
-		}
+		s.SyncCompleted()
 		return
 	}

 	// Refill available tasks from the scheduler.
-	withHealTasks := true
-	if healtask != nil || codetask != nil {
-		withHealTasks = false
-	}
-	newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask := s.getBatchFromUnprocessed(withHealTasks)
+	newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask, nItems := s.getBatchFromUnprocessed()
 	accounts = append(accounts, newAccounts...)
 	codes = append(codes, newCodes...)
 	storages = newStorageTaskBundle
-	if withHealTasks {
-		healtask = newHealTask
-		codetask = newCodeTask
-	}
+	healtask = newHealTask
+	codetask = newCodeTask

 	return
 }
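For reference, the reworked contract: nItems counts every work item returned across the five task kinds, so callers need a single emptiness check (this is how runStateWorkerLoop consumes it in the stage_statesync_full.go hunk above). A hedged caller sketch, not the PR's code:

    // runOnce pulls one batch and reports whether any work was handed out.
    func runOnce(sdm *FullStateDownloadManager) (bool, error) {
    	accountTasks, codes, storages, healtask, codetask, nItems, err := sdm.GetNextBatch()
    	if err != nil || nItems == 0 {
    		return false, err
    	}
    	// dispatch account, bytecode, storage, and heal requests here
    	_, _, _, _, _ = accountTasks, codes, storages, healtask, codetask
    	return true, nil
    }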
@@ -714,7 +722,7 @@ func (s *FullStateDownloadManager) loadSyncStatus() {
 	// Either we've failed to decode the previous state, or there was none.
 	// Start a fresh sync by chunking up the account range and scheduling
 	// them for retrieval.
-	s.tasks.accountTasks = nil
+	s.tasks = newTasks()
 	s.accountSynced, s.accountBytes = 0, 0
 	s.bytecodeSynced, s.bytecodeBytes = 0, 0
 	s.storageSynced, s.storageBytes = 0, 0
@@ -921,16 +929,18 @@ func (s *FullStateDownloadManager) updateStats(written, duplicate, unexpected in
 // getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download
 // tasks to send to the remote peer.
-func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
+func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 	accounts []*accountTask,
 	codes []*byteCodeTasksBundle,
 	storages *storageTaskBundle,
 	healtask *healTask,
-	codetask *healTask) {
+	codetask *healTask,
+	count int) {

 	// over trie nodes as those can be written to disk and forgotten about.
 	codes = make([]*byteCodeTasksBundle, 0)
 	accounts = make([]*accountTask, 0)
+	count = 0

 	for i, task := range s.tasks.accountTasks {
 		// Stop when we've gathered enough requests
@@ -956,12 +966,18 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 			break
 		}

+		task.root = s.root
+		task.origin = task.Next
+		task.limit = task.Last
+		task.cap = maxRequestSize
+		task.requested = true
 		s.tasks.accountTasks[i].requested = true
 		accounts = append(accounts, task)
 		s.requesting.addAccountTask(task.id, task)
 		s.tasks.addAccountTask(task.id, task)

 		// one account task is enough for a stream
+		count = len(accounts)
 		return
 	}
@@ -997,6 +1013,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 			id:     taskID,
 			hashes: hashes,
 			task:   task,
+			cap:    maxRequestSize,
 		}
 		codes = append(codes, bytecodeTask)
@@ -1005,12 +1022,14 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 		// Stop when we've gathered enough requests
 		if totalHashes >= maxCodeRequestCount {
+			count = totalHashes
 			return
 		}
 	}

 	// if we found some codes, can assign it to node
 	if totalHashes > 0 {
+		count = totalHashes
 		return
 	}
@@ -1020,14 +1039,8 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 			continue
 		}

-		// TODO: check cap calculations (shouldn't give us big chunk)
-		// if cap > maxRequestSize {
-		// 	cap = maxRequestSize
-		// }
-		// if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
-		// 	cap = minRequestSize
-		// }
-		storageSets := maxRequestSize / 1024
+		cap := maxRequestSize
+		storageSets := cap / 1024

 		storages = &storageTaskBundle{
 			accounts: make([]common.Hash, 0, storageSets),
@@ -1089,23 +1102,21 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 			storages.origin = storages.subtask.Next
 			storages.limit = storages.subtask.Last
 		}
+		storages.root = s.root
+		storages.cap = cap
 		s.tasks.addStorageTaskBundle(taskID, storages)
 		s.requesting.addStorageTaskBundle(taskID, storages)
+		count = len(storages.accounts)
 		return
 	}

 	if len(storages.accounts) > 0 {
-		return
-	}
-
-	if !withHealTasks {
+		count = len(storages.accounts)
 		return
 	}

 	// Sync phase done, run heal phase
-	// Iterate over pending tasks
+	// Iterate over pending tasks and try to find a peer to retrieve with
 	for (len(s.tasks.healer) > 0 && len(s.tasks.healer[0].hashes) > 0) || s.scheduler.Pending() > 0 {
 		// If there are not enough trie tasks queued to fully assign, fill the
 		// queue from the state sync scheduler. The trie synced schedules these
@@ -1129,7 +1140,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 		// If all the heal tasks are bytecodes or already downloading, bail
 		if len(s.tasks.healer[0].trieTasks) == 0 {
-			return
+			break
 		}
 		// Generate the network query and send it to the peer
 		// if cap > maxTrieRequestCount {
@@ -1177,6 +1188,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 			pathsets:    pathsets,
 			root:        s.root,
 			task:        s.tasks.healer[0],
+			bytes:       maxRequestSize,
 			byteCodeReq: false,
 		}
@@ -1184,6 +1196,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 		s.requesting.addHealerTask(taskID, healtask)

 		if len(hashes) > 0 {
+			count = len(hashes)
 			return
 		}
 	}
@@ -1205,7 +1218,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 		// If all the heal tasks are trienodes or already downloading, bail
 		if len(s.tasks.healer[0].codeTasks) == 0 {
-			return
+			break
 		}
 		// Task pending retrieval, try to find an idle peer. If no such peer
 		// exists, we probably assigned tasks for all (or they are stateless).
@@ -1243,9 +1256,10 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
 			id:          taskID,
 			hashes:      hashes,
 			task:        s.tasks.healer[0],
+			bytes:       maxRequestSize,
 			byteCodeReq: true,
 		}
-
+		count = len(hashes)
 		s.tasks.healer[taskID] = codetask
 		s.requesting.addHealerTask(taskID, healtask)
 	}
@@ -1272,7 +1286,8 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
 	codes []*byteCodeTasksBundle,
 	storages *storageTaskBundle,
 	healtask *healTask,
-	codetask *healTask) {
+	codetask *healTask,
+	count int) {

 	// over trie nodes as those can be written to disk and forgotten about.
 	accounts = make([]*accountTask, 0)
@@ -1290,6 +1305,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
 	}

 	if len(accounts) > 0 {
+		count = len(accounts)
 		return
 	}
@@ -1301,6 +1317,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
 	}

 	if len(codes) > 0 {
+		count = len(codes)
 		return
 	}
@@ -1316,10 +1333,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
 		}
 		s.requesting.addStorageTaskBundle(storages.id, storages)
 		s.retries.deleteStorageTaskBundle(storages.id)
-		return
-	}
-
-	if len(storages.accounts) > 0 {
+		count = len(storages.accounts)
 		return
 	}
@@ -1338,6 +1352,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
 			}
 			s.requesting.addHealerTask(id, task)
 			s.retries.deleteHealerTask(id)
+			count = len(task.hashes)
 			return
 		}
 		if task.byteCodeReq {
@@ -1352,11 +1367,13 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
 			}
 			s.requesting.addHealerTask(id, task)
 			s.retries.deleteHealerTask(id)
+			count = len(task.hashes)
 			return
 		}
 		}
 	}

+	count = 0
 	return
 }
@@ -1371,14 +1388,18 @@ func (s *FullStateDownloadManager) HandleRequestError(accounts []*accountTask,
 	s.lock.Lock()
 	defer s.lock.Unlock()

-	for _, task := range accounts {
-		s.requesting.deleteAccountTask(task.id)
-		s.retries.addAccountTask(task.id, task)
+	if accounts != nil && len(accounts) > 0 {
+		for _, task := range accounts {
+			s.requesting.deleteAccountTask(task.id)
+			s.retries.addAccountTask(task.id, task)
+		}
 	}

-	for _, code := range codes {
-		s.requesting.deleteCodeTask(code.id)
-		s.retries.addCodeTask(code.id, code)
+	if codes != nil && len(codes) > 0 {
+		for _, code := range codes {
+			s.requesting.deleteCodeTask(code.id)
+			s.retries.addCodeTask(code.id, code)
+		}
 	}

 	if storages != nil {
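One observation on HandleRequestError above: in Go, len of a nil slice is 0 and ranging over a nil slice is a no-op, so the added nil guards are redundant. The unguarded form behaves identically:

    // len(nil) == 0 and range over nil iterates zero times,
    // so this is equivalent to the nil-checked version above.
    if len(accounts) > 0 {
    	for _, task := range accounts {
    		s.requesting.deleteAccountTask(task.id)
    		s.retries.addAccountTask(task.id, task)
    	}
    }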

api/service/stagedstreamsync/syncing.go:

@@ -90,6 +90,7 @@ func CreateStagedSync(ctx context.Context,
 	stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, extractReceiptHashes, config.LogProgress)
 	stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress)
 	stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress)
+	stageFullStateSyncCfg := NewStageFullStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress)
 	stageReceiptsCfg := NewStageReceiptsCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
 	lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB)
 	stageFinishCfg := NewStageFinishCfg(mainDB)
@@ -103,6 +104,7 @@ func CreateStagedSync(ctx context.Context,
 		stageShortRangeCfg,
 		stageBodiesCfg,
 		stageStateSyncCfg,
+		stageFullStateSyncCfg,
 		stageStatesCfg,
 		stageReceiptsCfg,
 		lastMileCfg,

core/tx_pool.go:

@@ -850,15 +850,11 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error {
 		}
 		currentBlockNumber := pool.chain.CurrentBlock().Number()
 		pendingBlockNumber := new(big.Int).Add(currentBlockNumber, big.NewInt(1))
-		pendingEpoch := pool.chain.CurrentBlock().Epoch()
-		if shard.Schedule.IsLastBlock(currentBlockNumber.Uint64()) {
-			pendingEpoch = new(big.Int).Add(pendingEpoch, big.NewInt(1))
-		}
 		chainContext, ok := pool.chain.(ChainContext)
 		if !ok {
 			chainContext = nil // might use testing blockchain, set to nil for verifier to handle.
 		}
-		_, err = VerifyAndCreateValidatorFromMsg(pool.currentState, chainContext, pendingEpoch, pendingBlockNumber, stkMsg)
+		_, err = VerifyAndCreateValidatorFromMsg(pool.currentState, chainContext, pool.pendingEpoch(), pendingBlockNumber, stkMsg)
 		return err
 	case staking.DirectiveEditValidator:
 		msg, err := staking.RLPDecodeStakeMsg(tx.Data(), staking.DirectiveEditValidator)
@@ -964,11 +960,12 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error {
 	}
 }

+// pendingEpoch refers to the epoch of the pending block
 func (pool *TxPool) pendingEpoch() *big.Int {
 	currentBlock := pool.chain.CurrentBlock()
 	pendingEpoch := currentBlock.Epoch()
 	if shard.Schedule.IsLastBlock(currentBlock.Number().Uint64()) {
-		pendingEpoch.Add(pendingEpoch, big.NewInt(1))
+		pendingEpoch = new(big.Int).Add(pendingEpoch, common.Big1)
 	}
 	return pendingEpoch
 }
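Why the fix prefers new(big.Int).Add over calling Add on the value itself: big.Int methods mutate their receiver, so the old pendingEpoch.Add(pendingEpoch, ...) pattern is only safe when the epoch value is a private copy, which is easy to get wrong. A standalone illustration of the mutation semantics (not the PR's code):

    package main

    import (
    	"fmt"
    	"math/big"
    )

    func main() {
    	epoch := big.NewInt(10) // imagine a value shared with a cached block header

    	// In-place pattern: mutates the shared value through the alias.
    	pending := epoch
    	pending.Add(pending, big.NewInt(1))
    	fmt.Println(epoch) // 11 - the original was modified

    	// Pattern used by the fix: allocates a fresh big.Int, original intact.
    	epoch = big.NewInt(10)
    	pending = new(big.Int).Add(epoch, big.NewInt(1))
    	fmt.Println(epoch, pending) // 10 11
    }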

hmy/staking.go:

@@ -143,6 +143,11 @@ func (hmy *Harmony) IsNoEarlyUnlockEpoch(epoch *big.Int) bool {
 	return hmy.BlockChain.Config().IsNoEarlyUnlock(epoch)
 }

+// IsMaxRate ...
+func (hmy *Harmony) IsMaxRate(epoch *big.Int) bool {
+	return hmy.BlockChain.Config().IsMaxRate(epoch)
+}
+
 // IsCommitteeSelectionBlock checks if the given block is the committee selection block
 func (hmy *Harmony) IsCommitteeSelectionBlock(header *block.Header) bool {
 	return chain.IsCommitteeSelectionBlock(hmy.BlockChain, header)
@@ -592,6 +597,7 @@ func (hmy *Harmony) GetUndelegationPayouts(
 		return undelegationPayouts, nil
 	}

+	isMaxRate := hmy.IsMaxRate(epoch)
 	lockingPeriod := hmy.GetDelegationLockingPeriodInEpoch(undelegationPayoutBlock.Epoch())
 	for _, validator := range hmy.GetAllValidatorAddresses() {
 		wrapper, err := hmy.BlockChain.ReadValidatorInformationAtRoot(validator, undelegationPayoutBlock.Root())
@@ -600,7 +606,7 @@ func (hmy *Harmony) GetUndelegationPayouts(
 		}
 		noEarlyUnlock := hmy.IsNoEarlyUnlockEpoch(epoch)
 		for _, delegation := range wrapper.Delegations {
-			withdraw := delegation.RemoveUnlockedUndelegations(epoch, wrapper.LastEpochInCommittee, lockingPeriod, noEarlyUnlock)
+			withdraw := delegation.RemoveUnlockedUndelegations(epoch, wrapper.LastEpochInCommittee, lockingPeriod, noEarlyUnlock, isMaxRate)
 			if withdraw.Cmp(bigZero) == 1 {
 				undelegationPayouts.SetPayoutByDelegatorAddrAndValidatorAddr(validator, delegation.DelegatorAddress, withdraw)
 			}

internal/chain/engine.go:

@@ -397,9 +397,15 @@ func payoutUndelegations(
 		const msg = "[Finalize] failed to read all validators"
 		return errors.New(msg)
 	}
-	// Payout undelegated/unlocked tokens
+	// Payout undelegated/unlocked tokens at the end of each epoch
 	lockPeriod := GetLockPeriodInEpoch(chain, header.Epoch())
 	noEarlyUnlock := chain.Config().IsNoEarlyUnlock(header.Epoch())
+	newShardState, err := header.GetShardState()
+	if err != nil {
+		const msg = "[Finalize] failed to read shard state"
+		return errors.New(msg)
+	}
+	isMaxRate := chain.Config().IsMaxRate(newShardState.Epoch)
 	for _, validator := range validators {
 		wrapper, err := state.ValidatorWrapper(validator, true, false)
 		if err != nil {
@@ -410,7 +416,7 @@ func payoutUndelegations(
 		for i := range wrapper.Delegations {
 			delegation := &wrapper.Delegations[i]
 			totalWithdraw := delegation.RemoveUnlockedUndelegations(
-				header.Epoch(), wrapper.LastEpochInCommittee, lockPeriod, noEarlyUnlock,
+				header.Epoch(), wrapper.LastEpochInCommittee, lockPeriod, noEarlyUnlock, isMaxRate,
 			)
 			if totalWithdraw.Sign() != 0 {
 				state.AddBalance(delegation.DelegatorAddress, totalWithdraw)
@@ -533,6 +539,7 @@ func setElectionEpochAndMinFee(chain engine.ChainReader, header *block.Header, s
 		map[common.Address]struct{},
 		len(newShardState.StakedValidators().Addrs),
 	)
+	// this loop is for elected validators only
 	for _, addr := range newShardState.StakedValidators().Addrs {
 		wrapper, err := state.ValidatorWrapper(addr, true, false)
 		if err != nil {
@@ -566,8 +573,9 @@ func setElectionEpochAndMinFee(chain engine.ChainReader, header *block.Header, s
 	// due to a bug in the old implementation of the minimum fee,
 	// unelected validators did not have their fee updated even
 	// when the protocol required them to do so. here we fix it,
-	// but only after the HIP-30 hard fork is effective.
-	if config.IsHIP30(newShardState.Epoch) {
+	// but only after the HIP-30 hard fork is effective.
+	// this loop applies to all validators, but excludes the ones in isElected
+	if config.IsHIP30(newShardState.Epoch) && minRateNotZero {
 		for _, addr := range chain.ValidatorCandidates() {
 			// skip elected validator
 			if _, ok := isElected[addr]; ok {
@@ -581,6 +589,19 @@ func setElectionEpochAndMinFee(chain engine.ChainReader, header *block.Header, s
 			}
 		}
 	}
+
+	// for all validators whose MaxRate < minRate + maxChangeRate,
+	// set their MaxRate equal to minRate + maxChangeRate;
+	// this will allow wrapper.SanityCheck to pass if Rate is set to a value
+	// higher than the MaxRate by UpdateMinimumCommissionFee above
+	if config.IsMaxRate(newShardState.Epoch) && minRateNotZero {
+		for _, addr := range chain.ValidatorCandidates() {
+			if _, err := availability.UpdateMaxCommissionFee(state, addr, minRate); err != nil {
+				return err
+			}
+		}
+	}
+
 	return nil
 }
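A worked example of the MaxRate adjustment described in the comment above, using the same numeric decimal package seen in partner.go (illustrative values; the actual rule is assumed to live inside availability.UpdateMaxCommissionFee):

    // adjustedMaxRate sketches the rule: lift MaxRate to minRate + maxChangeRate
    // whenever it sits below that floor, so SanityCheck passes after Rate is
    // raised to minRate by UpdateMinimumCommissionFee.
    func adjustedMaxRate(maxRate, minRate, maxChangeRate numeric.Dec) numeric.Dec {
    	if floor := minRate.Add(maxChangeRate); maxRate.LT(floor) {
    		return floor // e.g. 0.07 + 0.01 = 0.08 for a stale MaxRate of 0.05
    	}
    	return maxRate
    }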

internal/configs/sharding/partner.go:

@@ -40,6 +40,8 @@ const (
 func (ps partnerSchedule) InstanceForEpoch(epoch *big.Int) Instance {
 	switch {
+	case params.PartnerChainConfig.IsDevnetExternalEpoch(epoch):
+		return partnerV3
 	case params.PartnerChainConfig.IsHIP30(epoch):
 		return partnerV2
 	case epoch.Cmp(params.PartnerChainConfig.StakingEpoch) >= 0:
@@ -111,3 +113,11 @@ var partnerV2 = MustNewInstance(
 	hip30CollectionAddressTestnet, partnerReshardingEpoch,
 	PartnerSchedule.BlocksPerEpoch(),
 )
+
+var partnerV3 = MustNewInstance(
+	2, 5, 1, 0,
+	numeric.MustNewDecFromStr("0.1"), genesis.TNHarmonyAccounts,
+	genesis.TNFoundationalAccounts, emptyAllowlist,
+	feeCollectorsDevnet[1], numeric.MustNewDecFromStr("0.25"),
+	hip30CollectionAddressTestnet, partnerReshardingEpoch,
+	PartnerSchedule.BlocksPerEpoch(),
+)

@ -77,6 +77,7 @@ var (
NoNilDelegationsEpoch: EpochTBD, NoNilDelegationsEpoch: EpochTBD,
BlockGas30MEpoch: big.NewInt(1673), // 2023-11-02 17:30:00+00:00 BlockGas30MEpoch: big.NewInt(1673), // 2023-11-02 17:30:00+00:00
MaxRateEpoch: EpochTBD, MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
} }
// TestnetChainConfig contains the chain parameters to run a node on the harmony test network. // TestnetChainConfig contains the chain parameters to run a node on the harmony test network.
@ -122,6 +123,7 @@ var (
NoNilDelegationsEpoch: EpochTBD, NoNilDelegationsEpoch: EpochTBD,
BlockGas30MEpoch: big.NewInt(2176), // 2023-10-12 10:00:00+00:00 BlockGas30MEpoch: big.NewInt(2176), // 2023-10-12 10:00:00+00:00
MaxRateEpoch: EpochTBD, MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
} }
// PangaeaChainConfig contains the chain parameters for the Pangaea network. // PangaeaChainConfig contains the chain parameters for the Pangaea network.
// All features except for CrossLink are enabled at launch. // All features except for CrossLink are enabled at launch.
@ -167,6 +169,7 @@ var (
NoNilDelegationsEpoch: EpochTBD, NoNilDelegationsEpoch: EpochTBD,
BlockGas30MEpoch: big.NewInt(0), BlockGas30MEpoch: big.NewInt(0),
MaxRateEpoch: EpochTBD, MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
} }
// PartnerChainConfig contains the chain parameters for the Partner network. // PartnerChainConfig contains the chain parameters for the Partner network.
@ -213,6 +216,7 @@ var (
BlockGas30MEpoch: big.NewInt(7), BlockGas30MEpoch: big.NewInt(7),
NoNilDelegationsEpoch: EpochTBD, NoNilDelegationsEpoch: EpochTBD,
MaxRateEpoch: EpochTBD, MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
} }
// StressnetChainConfig contains the chain parameters for the Stress test network. // StressnetChainConfig contains the chain parameters for the Stress test network.
@ -259,6 +263,7 @@ var (
NoNilDelegationsEpoch: big.NewInt(2), NoNilDelegationsEpoch: big.NewInt(2),
BlockGas30MEpoch: big.NewInt(0), BlockGas30MEpoch: big.NewInt(0),
MaxRateEpoch: EpochTBD, MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
} }
// LocalnetChainConfig contains the chain parameters to run for local development. // LocalnetChainConfig contains the chain parameters to run for local development.
@ -304,6 +309,7 @@ var (
NoNilDelegationsEpoch: big.NewInt(2), NoNilDelegationsEpoch: big.NewInt(2),
BlockGas30MEpoch: big.NewInt(0), BlockGas30MEpoch: big.NewInt(0),
MaxRateEpoch: EpochTBD, MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
} }
// AllProtocolChanges ... // AllProtocolChanges ...
@ -351,6 +357,7 @@ var (
big.NewInt(0), // HIP30Epoch big.NewInt(0), // HIP30Epoch
big.NewInt(0), // NoNilDelegationsEpoch big.NewInt(0), // NoNilDelegationsEpoch
big.NewInt(0), // MaxRateEpoch big.NewInt(0), // MaxRateEpoch
big.NewInt(0),
} }
// TestChainConfig ... // TestChainConfig ...
@ -398,6 +405,7 @@ var (
big.NewInt(0), // NoNilDelegationsEpoch big.NewInt(0), // NoNilDelegationsEpoch
big.NewInt(0), // BlockGas30M big.NewInt(0), // BlockGas30M
big.NewInt(0), // MaxRateEpoch big.NewInt(0), // MaxRateEpoch
big.NewInt(0),
} }
// TestRules ... // TestRules ...
@ -565,6 +573,8 @@ type ChainConfig struct {
// 4. Change the minimum validator commission from 5 to 7% (all nets) // 4. Change the minimum validator commission from 5 to 7% (all nets)
HIP30Epoch *big.Int `json:"hip30-epoch,omitempty"` HIP30Epoch *big.Int `json:"hip30-epoch,omitempty"`
DevnetExternalEpoch *big.Int `json:"devnet-external-epoch,omitempty"`
BlockGas30MEpoch *big.Int `json:"block-gas-30m-epoch,omitempty"` BlockGas30MEpoch *big.Int `json:"block-gas-30m-epoch,omitempty"`
// MaxRateEpoch will make sure the validator max-rate is at least equal to the minRate + the validator max-rate-increase // MaxRateEpoch will make sure the validator max-rate is at least equal to the minRate + the validator max-rate-increase
@@ -647,6 +657,9 @@ func (c *ChainConfig) mustValid() {
 	// capabilities required to transfer balance across shards
 	require(c.HIP30Epoch.Cmp(c.CrossTxEpoch) > 0,
 		"must satisfy: HIP30Epoch > CrossTxEpoch")
+	// the max-rate (7%) fix is applied on or after HIP-30
+	require(c.MaxRateEpoch.Cmp(c.HIP30Epoch) >= 0,
+		"must satisfy: MaxRateEpoch >= HIP30Epoch")
 }
// IsEIP155 returns whether epoch is either equal to the EIP155 fork epoch or greater.
@@ -844,6 +857,10 @@ func (c *ChainConfig) IsHIP30(epoch *big.Int) bool {
 	return isForked(c.HIP30Epoch, epoch)
 }
+
+func (c *ChainConfig) IsDevnetExternalEpoch(epoch *big.Int) bool {
+	return isForked(c.DevnetExternalEpoch, epoch)
+}

 func (c *ChainConfig) IsMaxRate(epoch *big.Int) bool {
 	return isForked(c.MaxRateEpoch, epoch)
 }
rpc/blockchain.go
@@ -464,7 +464,11 @@ func (s *PublicBlockchainService) GetBlockReceipts(
 			r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()])
 		case Eth:
 			if tx, ok := tx.(*types.Transaction); ok {
-				r, err = eth.NewReceipt(tx.ConvertToEth(), blockHash, block.NumberU64(), index, rmap[tx.Hash()])
+				from, err := tx.SenderAddress()
+				if err != nil {
+					return nil, err
+				}
+				r, err = eth.NewReceipt(from, tx.ConvertToEth(), blockHash, block.NumberU64(), index, rmap[tx.Hash()])
 			}
 		default:
 			return nil, ErrUnknownRPCVersion
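Editor's note: the ordering is the point of this hunk. Sender recovery now happens on the original Harmony transaction before ConvertToEth; the NewTransaction body removed in the next file even branched on IsEthCompatible for exactly this reason, since non-eth-compatible transactions cannot be recovered through the eth-format view. A toy sketch of the pattern, with every type invented for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

// hmyTx stands in for a Harmony transaction; in this sketch only the
// original encoding can recover its signer.
type hmyTx struct{ sender string }

// ethTx stands in for the converted eth-format transaction.
type ethTx struct{}

// SenderAddress models signature recovery, which can fail and must
// therefore be checked before any receipt is built.
func (t *hmyTx) SenderAddress() (string, error) {
	if t.sender == "" {
		return "", errors.New("signature recovery failed")
	}
	return t.sender, nil
}

// ConvertToEth models the conversion that leaves the Harmony-specific
// signing context behind.
func (t *hmyTx) ConvertToEth() *ethTx { return &ethTx{} }

func main() {
	tx := &hmyTx{sender: "one1..."}

	from, err := tx.SenderAddress() // 1. recover on the original encoding
	if err != nil {
		panic(err)
	}
	_ = tx.ConvertToEth() // 2. convert only afterwards
	fmt.Println("receipt sender:", from)
}
```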
rpc/eth/types.go
@@ -74,19 +74,9 @@ type Transaction struct {
 // representation, with the given location metadata set (if available).
 // Note that all txs on Harmony are replay protected (post EIP155 epoch).
 func NewTransaction(
-	tx *types.EthTransaction, blockHash common.Hash,
+	from common.Address, tx *types.EthTransaction, blockHash common.Hash,
 	blockNumber uint64, timestamp uint64, index uint64,
 ) (*Transaction, error) {
-	from := common.Address{}
-	var err error
-	if tx.IsEthCompatible() {
-		from, err = tx.SenderAddress()
-	} else {
-		from, err = tx.ConvertToHmy().SenderAddress()
-	}
-	if err != nil {
-		return nil, err
-	}
 	v, r, s := tx.RawSignatureValues()
 	result := &Transaction{
@@ -143,14 +133,9 @@ func NewTransactionFromTransaction(
 }

 // NewReceipt returns the RPC data for a new receipt
-func NewReceipt(tx *types.EthTransaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt) (map[string]interface{}, error) {
-	senderAddr, err := tx.SenderAddress()
-	if err != nil {
-		return nil, err
-	}
+func NewReceipt(senderAddr common.Address, tx *types.EthTransaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt) (map[string]interface{}, error) {
 	ethTxHash := tx.Hash()
-	for i, _ := range receipt.Logs {
+	for i := range receipt.Logs {
 		// Override log txHash with receipt's
 		receipt.Logs[i].TxHash = ethTxHash
 	}
@@ -240,7 +225,11 @@ func blockWithFullTxFromBlock(b *types.Block) (*BlockWithFullTx, error) {
 	}
 	for idx, tx := range b.Transactions() {
-		fmtTx, err := NewTransaction(tx.ConvertToEth(), b.Hash(), b.NumberU64(), b.Time().Uint64(), uint64(idx))
+		from, err := tx.SenderAddress()
+		if err != nil {
+			return nil, err
+		}
+		fmtTx, err := NewTransaction(from, tx.ConvertToEth(), b.Hash(), b.NumberU64(), b.Time().Uint64(), uint64(idx))
 		if err != nil {
 			return nil, err
 		}
@@ -257,5 +246,10 @@ func NewTransactionFromBlockIndex(b *types.Block, index uint64) (*Transaction, e
 			"tx index %v greater than or equal to number of transactions on block %v", index, b.Hash().String(),
 		)
 	}
-	return NewTransaction(txs[index].ConvertToEth(), b.Hash(), b.NumberU64(), b.Time().Uint64(), index)
+	tx := txs[index].ConvertToEth()
+	from, err := tx.SenderAddress()
+	if err != nil {
+		return nil, err
+	}
+	return NewTransaction(from, tx, b.Hash(), b.NumberU64(), b.Time().Uint64(), index)
 }
rpc/pool.go
@@ -253,7 +253,14 @@ func (s *PublicPoolService) PendingTransactions(
 				continue // Legacy behavior is to not return error here
 			}
 		case Eth:
-			tx, err = eth.NewTransaction(plainTx.ConvertToEth(), common.Hash{}, 0, 0, 0)
+			from, err := plainTx.SenderAddress()
+			if err != nil {
+				utils.Logger().Debug().
+					Err(err).
+					Msgf("%v error at %v", LogTag, "PendingTransactions")
+				continue // Legacy behavior is to not return error here
+			}
+			tx, err = eth.NewTransaction(from, plainTx.ConvertToEth(), common.Hash{}, 0, 0, 0)
 			if err != nil {
 				utils.Logger().Debug().
 					Err(err).
rpc/transaction.go
@@ -236,7 +236,13 @@ func (s *PublicTransactionService) newRPCTransaction(tx *types.Transaction, bloc
 		}
 		return NewStructuredResponse(tx)
 	case Eth:
-		tx, err := eth.NewTransactionFromTransaction(tx, blockHash, blockNumber, timestamp, index)
+		// calculate SenderAddress before ConvertToEth
+		senderAddr, err := tx.SenderAddress()
+		if err != nil {
+			DoMetricRPCQueryInfo(GetTransactionByHash, FailedNumber)
+			return nil, err
+		}
+		tx, err := eth.NewTransaction(senderAddr, tx.ConvertToEth(), blockHash, blockNumber, timestamp, index)
 		if err != nil {
 			DoMetricRPCQueryInfo(GetTransactionByHash, FailedNumber)
 			return nil, err
@@ -751,7 +757,7 @@ func (s *PublicTransactionService) GetTransactionReceipt(
 			return nil, err
 		}
 		return NewStructuredResponse(RPCReceipt)
-	case V2, Eth:
+	case V2:
 		if tx == nil {
 			RPCReceipt, err = v2.NewReceipt(stx, blockHash, blockNumber, index, receipt)
 		} else {
@@ -761,6 +767,19 @@ func (s *PublicTransactionService) GetTransactionReceipt(
 			return nil, err
 		}
 		return NewStructuredResponse(RPCReceipt)
+	case Eth:
+		if tx != nil {
+			// calculate SenderAddress before ConvertToEth
+			senderAddr, err := tx.SenderAddress()
+			if err != nil {
+				return nil, err
+			}
+			RPCReceipt, err = eth.NewReceipt(senderAddr, tx.ConvertToEth(), blockHash, blockNumber, index, receipt)
+		}
+		if err != nil {
+			return nil, err
+		}
+		return NewStructuredResponse(RPCReceipt)
 	default:
 		return nil, ErrUnknownRPCVersion
 	}
staking/types/delegation.go
@@ -193,15 +193,21 @@ func (d *Delegation) DeleteEntry(epoch *big.Int) {
 // RemoveUnlockedUndelegations removes all fully unlocked
 // undelegations and returns the total sum
 func (d *Delegation) RemoveUnlockedUndelegations(
-	curEpoch, lastEpochInCommittee *big.Int, lockPeriod int, noEarlyUnlock bool,
+	curEpoch, lastEpochInCommittee *big.Int, lockPeriod int, noEarlyUnlock bool, isMaxRate bool,
 ) *big.Int {
 	totalWithdraw := big.NewInt(0)
 	count := 0
 	for j := range d.Undelegations {
-		if big.NewInt(0).Sub(curEpoch, d.Undelegations[j].Epoch).Int64() >= int64(lockPeriod) ||
-			(!noEarlyUnlock && big.NewInt(0).Sub(curEpoch, lastEpochInCommittee).Int64() >= int64(lockPeriod)) {
-			// need to wait at least 7 epochs to withdraw; or the validator has been out of committee for 7 epochs
-			totalWithdraw.Add(totalWithdraw, d.Undelegations[j].Amount)
+		epochsSinceUndelegation := big.NewInt(0).Sub(curEpoch, d.Undelegations[j].Epoch).Int64()
+		// at least lockPeriod epochs have passed since the undelegation, or
+		lockPeriodApplies := epochsSinceUndelegation >= int64(lockPeriod)
+		// at least lockPeriod epochs have passed since unelection, and early unlock is not disabled
+		earlyUnlockPeriodApplies := big.NewInt(0).Sub(curEpoch, lastEpochInCommittee).Int64() >= int64(lockPeriod) && !noEarlyUnlock
+		maxRateApplies := isMaxRate && epochsSinceUndelegation > int64(lockPeriod)
+		if lockPeriodApplies || earlyUnlockPeriodApplies {
+			if !maxRateApplies {
+				totalWithdraw.Add(totalWithdraw, d.Undelegations[j].Amount)
+			}
 			count++
 		} else {
 			break
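Editor's note: to make the new branch concrete, with lockPeriod 7, an undelegation at epoch 21, and noEarlyUnlock true, the max-rate path pays out only exactly at the lock boundary; strictly past it, the stale entry is removed without being added to the total. A self-contained restatement of the per-entry predicate in plain ints (a sketch of the logic above, not the repo's code):

```go
package main

import "fmt"

// entryOutcome mirrors the per-undelegation branch in
// RemoveUnlockedUndelegations, minus the *big.Int bookkeeping.
func entryOutcome(curEpoch, undelegatedAt, lastEpochInCommittee, lockPeriod int64,
	noEarlyUnlock, isMaxRate bool) (withdraw, removeEntry bool) {

	epochsSince := curEpoch - undelegatedAt
	lockPeriodApplies := epochsSince >= lockPeriod
	earlyUnlockApplies := curEpoch-lastEpochInCommittee >= lockPeriod && !noEarlyUnlock
	// Max-rate mode: entries strictly past the lock period are cleared
	// as stale without contributing to the withdrawal total.
	maxRateApplies := isMaxRate && epochsSince > lockPeriod

	if lockPeriodApplies || earlyUnlockApplies {
		return !maxRateApplies, true
	}
	return false, false
}

func main() {
	for _, cur := range []int64{27, 28, 29} {
		w, r := entryOutcome(cur, 21, 1, 7, true, true)
		fmt.Printf("curEpoch=%d withdraw=%v removeEntry=%v\n", cur, w, r)
	}
	// Prints: 27 -> false/false, 28 -> true/true, 29 -> false/true,
	// matching the TestMaxRateAt{Less,Equal,Excess} expectations below.
}
```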
staking/types/delegation_test.go
@@ -75,7 +75,7 @@ func TestUnlockedLastEpochInCommittee(t *testing.T) {
 	amount4 := big.NewInt(4000)
 	delegation.Undelegate(epoch4, amount4, nil)
-	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false)
 	if result.Cmp(big.NewInt(8000)) != 0 {
 		t.Errorf("removing an unlocked undelegation fails")
 	}
@@ -90,7 +90,7 @@ func TestUnlockedLastEpochInCommitteeFail(t *testing.T) {
 	amount4 := big.NewInt(4000)
 	delegation.Undelegate(epoch4, amount4, nil)
-	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false)
 	if result.Cmp(big.NewInt(0)) != 0 {
 		t.Errorf("premature delegation shouldn't be unlocked")
 	}
@@ -104,7 +104,7 @@ func TestUnlockedFullPeriod(t *testing.T) {
 	amount5 := big.NewInt(4000)
 	delegation.Undelegate(epoch5, amount5, nil)
-	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false)
 	if result.Cmp(big.NewInt(4000)) != 0 {
 		t.Errorf("removing an unlocked undelegation fails")
 	}
@@ -118,7 +118,7 @@ func TestQuickUnlock(t *testing.T) {
 	amount7 := big.NewInt(4000)
 	delegation.Undelegate(epoch7, amount7, nil)
-	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 0, false)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 0, false, false)
 	if result.Cmp(big.NewInt(4000)) != 0 {
 		t.Errorf("removing an unlocked undelegation fails")
 	}
@@ -133,7 +133,7 @@ func TestUnlockedFullPeriodFail(t *testing.T) {
 	amount5 := big.NewInt(4000)
 	delegation.Undelegate(epoch5, amount5, nil)
-	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false)
 	if result.Cmp(big.NewInt(0)) != 0 {
 		t.Errorf("premature delegation shouldn't be unlocked")
 	}
@@ -147,7 +147,7 @@ func TestUnlockedPremature(t *testing.T) {
 	amount6 := big.NewInt(4000)
 	delegation.Undelegate(epoch6, amount6, nil)
-	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false)
 	if result.Cmp(big.NewInt(0)) != 0 {
 		t.Errorf("premature delegation shouldn't be unlocked")
 	}
@@ -161,8 +161,128 @@ func TestNoEarlyUnlock(t *testing.T) {
 	amount4 := big.NewInt(4000)
 	delegation.Undelegate(epoch4, amount4, nil)
-	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false)
 	if result.Cmp(big.NewInt(0)) != 0 {
 		t.Errorf("should not allow early unlock")
 	}
 }
+
+func TestMaxRateAtLess(t *testing.T) {
+	// recreate the delegation so that all tests can run
+	delegation := NewDelegation(delegatorAddr, delegationAmt)
+	lastEpochInCommittee := big.NewInt(1)
+	curEpoch := big.NewInt(27)
+	epoch := big.NewInt(21)
+	amount := big.NewInt(4000)
+	delegation.Undelegate(epoch, amount, nil)
+	initialLength := len(delegation.Undelegations)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, true)
+	if result.Cmp(big.NewInt(0)) != 0 {
+		t.Errorf("should not allow unlock before 7 epochs have passed")
+	}
+	finalLength := len(delegation.Undelegations)
+	if initialLength != finalLength {
+		t.Errorf("should not remove undelegations before 7 epochs have passed")
+	}
+}
+
+func TestMaxRateAtEqual(t *testing.T) {
+	// recreate the delegation so that all tests can run
+	delegation := NewDelegation(delegatorAddr, delegationAmt)
+	lastEpochInCommittee := big.NewInt(1)
+	curEpoch := big.NewInt(28)
+	epoch := big.NewInt(21)
+	amount := big.NewInt(4000)
+	delegation.Undelegate(epoch, amount, nil)
+	initialLength := len(delegation.Undelegations)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, true)
+	if result.Cmp(big.NewInt(4000)) != 0 {
+		t.Errorf("should withdraw at exactly 7 epochs")
+	}
+	finalLength := len(delegation.Undelegations)
+	if initialLength == finalLength {
+		t.Errorf("should remove undelegations at exactly 7 epochs")
+	}
+}
+
+func TestMaxRateAtExcess(t *testing.T) {
+	// recreate the delegation so that all tests can run
+	delegation := NewDelegation(delegatorAddr, delegationAmt)
+	lastEpochInCommittee := big.NewInt(1)
+	curEpoch := big.NewInt(29)
+	epoch := big.NewInt(21)
+	amount := big.NewInt(4000)
+	delegation.Undelegate(epoch, amount, nil)
+	initialLength := len(delegation.Undelegations)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, true)
+	if result.Cmp(big.NewInt(0)) != 0 {
+		t.Errorf("should not withdraw at 8 epochs")
+	}
+	finalLength := len(delegation.Undelegations)
+	if initialLength == finalLength {
+		t.Errorf("should remove undelegations at 8 epochs")
+	}
+}
+
+func TestNoMaxRateAtLess(t *testing.T) {
+	// recreate the delegation so that all tests can run
+	delegation := NewDelegation(delegatorAddr, delegationAmt)
+	lastEpochInCommittee := big.NewInt(1)
+	curEpoch := big.NewInt(27)
+	epoch := big.NewInt(21)
+	amount := big.NewInt(4000)
+	delegation.Undelegate(epoch, amount, nil)
+	initialLength := len(delegation.Undelegations)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false)
+	if result.Cmp(big.NewInt(0)) != 0 {
+		t.Errorf("should not allow unlock before 7 epochs have passed")
+	}
+	finalLength := len(delegation.Undelegations)
+	if initialLength != finalLength {
+		t.Errorf("should not remove undelegations before 7 epochs have passed")
+	}
+}
+
+func TestNoMaxRateAtEqual(t *testing.T) {
+	// recreate the delegation so that all tests can run
+	delegation := NewDelegation(delegatorAddr, delegationAmt)
+	lastEpochInCommittee := big.NewInt(1)
+	curEpoch := big.NewInt(28)
+	epoch := big.NewInt(21)
+	amount := big.NewInt(4000)
+	delegation.Undelegate(epoch, amount, nil)
+	initialLength := len(delegation.Undelegations)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false)
+	if result.Cmp(big.NewInt(4000)) != 0 {
+		t.Errorf("should withdraw at exactly 7 epochs")
+	}
+	finalLength := len(delegation.Undelegations)
+	if initialLength == finalLength {
+		t.Errorf("should remove undelegations at exactly 7 epochs")
+	}
+}
+
+func TestNoMaxRateAtExcess(t *testing.T) {
+	// recreate the delegation so that all tests can run
+	delegation := NewDelegation(delegatorAddr, delegationAmt)
+	lastEpochInCommittee := big.NewInt(1)
+	curEpoch := big.NewInt(29)
+	epoch := big.NewInt(21)
+	amount := big.NewInt(4000)
+	delegation.Undelegate(epoch, amount, nil)
+	initialLength := len(delegation.Undelegations)
+	result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false)
+	if result.Cmp(big.NewInt(4000)) != 0 {
+		t.Errorf("should withdraw at 8 epochs")
+	}
+	finalLength := len(delegation.Undelegations)
+	if initialLength == finalLength {
+		t.Errorf("should remove undelegations at 8 epochs")
+	}
+}
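Editor's note: read together, the six new tests pin down a symmetric behavior matrix (lock period 7, undelegation at epoch 21, noEarlyUnlock true in every case):

    curEpoch  epochs since  isMaxRate  withdrawn  entry removed
    27        6             yes        0          no
    28        7             yes        4000       yes
    29        8             yes        0          yes  (cleared as stale)
    27        6             no         0          no
    28        7             no         4000       yes
    29        8             no         4000       yes

The only behavioral difference max-rate mode introduces is the third row: strictly past the lock boundary, the undelegation entry is dropped without being paid out.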