From 3e7ff3839fa564cb1c07ffc04b1b6baac7cc1d72 Mon Sep 17 00:00:00 2001 From: Max <82761650+MaxMustermann2@users.noreply.github.com> Date: Fri, 15 Dec 2023 23:25:30 +0530 Subject: [PATCH 1/5] do not return bech32 in receipt, fix 7% fee (#4588) * Fix: max rate issue (#4580) * fix: max-rate bellow the era min-rate * fix comments * add localnet epoch config * update config * update config * update config * update config * add log * remove hip30 from localnet * disable localnet config * engine: actually fix the 7% fee implementation * rpc: fix transaction receipt format for eth use the same receipt as `hmyv2_`. using a boolean variable, decide if the addresses need to be converted to bech32. do not return a contract address unless a contract was actually deployed in the transaction by using a pointer address type. * rpc: add comment indicating function is unused with the switch to `v2.NewReceipt` for even `eth_` queries, the `eth.NewReceipt` function is no longer used * build: fix delegation tests * update comment blocks was referring to `blocks of code` and not blocks in a chain. removed the confusing word * rpc: remove ConvertToEth in GetBlockReceipts * internal: max rate hard fork schedule * internal: testnet max rate schedule --------- Co-authored-by: Diego Nava <8563843+diego1q2w@users.noreply.github.com> --- core/state/statedb.go | 4 ++- core/tx_pool.go | 10 ++---- hmy/staking.go | 8 ++++- internal/chain/engine.go | 30 ++++++++++++++--- internal/params/config.go | 20 +++++++++++- rpc/blockchain.go | 4 +-- rpc/eth/types.go | 2 +- rpc/transaction.go | 14 ++------ rpc/v2/types.go | 55 +++++++++++++++++--------------- staking/availability/measure.go | 24 ++++++++++++++ staking/types/delegation.go | 16 +++++++--- staking/types/delegation_test.go | 14 ++++---- test/build-localnet-validator.sh | 4 +-- 13 files changed, 138 insertions(+), 67 deletions(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index 96bd4d26e..fce6f750b 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -894,7 +894,9 @@ func (db *DB) Finalise(deleteEmptyObjects bool) { // Commit validator changes in cache to stateObjects // TODO: remove validator cache after commit for addr, wrapper := range db.stateValidators { - db.UpdateValidatorWrapper(addr, wrapper) + if err := db.UpdateValidatorWrapper(addr, wrapper); err != nil { + utils.Logger().Warn().Err(err).Msg("Unable to update the validator wrapper on the finalize") + } } addressesToPrefetch := make([][]byte, 0, len(db.journal.dirties)) for addr := range db.journal.dirties { diff --git a/core/tx_pool.go b/core/tx_pool.go index d91849022..2457da385 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -850,15 +850,11 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { } currentBlockNumber := pool.chain.CurrentBlock().Number() pendingBlockNumber := new(big.Int).Add(currentBlockNumber, big.NewInt(1)) - pendingEpoch := pool.chain.CurrentBlock().Epoch() - if shard.Schedule.IsLastBlock(currentBlockNumber.Uint64()) { - pendingEpoch = new(big.Int).Add(pendingEpoch, big.NewInt(1)) - } chainContext, ok := pool.chain.(ChainContext) if !ok { chainContext = nil // might use testing blockchain, set to nil for verifier to handle. 
} - _, err = VerifyAndCreateValidatorFromMsg(pool.currentState, chainContext, pendingEpoch, pendingBlockNumber, stkMsg) + _, err = VerifyAndCreateValidatorFromMsg(pool.currentState, chainContext, pool.pendingEpoch(), pendingBlockNumber, stkMsg) return err case staking.DirectiveEditValidator: msg, err := staking.RLPDecodeStakeMsg(tx.Data(), staking.DirectiveEditValidator) @@ -932,7 +928,6 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { if from != stkMsg.DelegatorAddress { return errors.WithMessagef(ErrInvalidSender, "staking transaction sender is %s", b32) } - _, err = VerifyAndUndelegateFromMsg(pool.currentState, pool.pendingEpoch(), stkMsg) return err case staking.DirectiveCollectRewards: @@ -964,11 +959,12 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { } } +// pendingEpoch refers to the epoch of the pending block func (pool *TxPool) pendingEpoch() *big.Int { currentBlock := pool.chain.CurrentBlock() pendingEpoch := currentBlock.Epoch() if shard.Schedule.IsLastBlock(currentBlock.Number().Uint64()) { - pendingEpoch.Add(pendingEpoch, big.NewInt(1)) + pendingEpoch = new(big.Int).Add(pendingEpoch, common.Big1) } return pendingEpoch } diff --git a/hmy/staking.go b/hmy/staking.go index 9d99b7908..83e800544 100644 --- a/hmy/staking.go +++ b/hmy/staking.go @@ -143,6 +143,11 @@ func (hmy *Harmony) IsNoEarlyUnlockEpoch(epoch *big.Int) bool { return hmy.BlockChain.Config().IsNoEarlyUnlock(epoch) } +// IsMaxRate ... +func (hmy *Harmony) IsMaxRate(epoch *big.Int) bool { + return hmy.BlockChain.Config().IsMaxRate(epoch) +} + // IsCommitteeSelectionBlock checks if the given block is the committee selection block func (hmy *Harmony) IsCommitteeSelectionBlock(header *block.Header) bool { return chain.IsCommitteeSelectionBlock(hmy.BlockChain, header) @@ -592,6 +597,7 @@ func (hmy *Harmony) GetUndelegationPayouts( return undelegationPayouts, nil } + isMaxRate := hmy.IsMaxRate(epoch) lockingPeriod := hmy.GetDelegationLockingPeriodInEpoch(undelegationPayoutBlock.Epoch()) for _, validator := range hmy.GetAllValidatorAddresses() { wrapper, err := hmy.BlockChain.ReadValidatorInformationAtRoot(validator, undelegationPayoutBlock.Root()) @@ -600,7 +606,7 @@ func (hmy *Harmony) GetUndelegationPayouts( } noEarlyUnlock := hmy.IsNoEarlyUnlockEpoch(epoch) for _, delegation := range wrapper.Delegations { - withdraw := delegation.RemoveUnlockedUndelegations(epoch, wrapper.LastEpochInCommittee, lockingPeriod, noEarlyUnlock) + withdraw := delegation.RemoveUnlockedUndelegations(epoch, wrapper.LastEpochInCommittee, lockingPeriod, noEarlyUnlock, isMaxRate) if withdraw.Cmp(bigZero) == 1 { undelegationPayouts.SetPayoutByDelegatorAddrAndValidatorAddr(validator, delegation.DelegatorAddress, withdraw) } diff --git a/internal/chain/engine.go b/internal/chain/engine.go index 4f3aac9ff..8e07f7a1e 100644 --- a/internal/chain/engine.go +++ b/internal/chain/engine.go @@ -370,9 +370,15 @@ func payoutUndelegations( const msg = "[Finalize] failed to read all validators" return errors.New(msg) } - // Payout undelegated/unlocked tokens + // Payout undelegated/unlocked tokens at the end of each epoch lockPeriod := GetLockPeriodInEpoch(chain, header.Epoch()) noEarlyUnlock := chain.Config().IsNoEarlyUnlock(header.Epoch()) + newShardState, err := header.GetShardState() + if err != nil { + const msg = "[Finalize] failed to read shard state" + return errors.New(msg) + } + isMaxRate := chain.Config().IsMaxRate(newShardState.Epoch) for _, validator := range validators { wrapper, err 
:= state.ValidatorWrapper(validator, true, false) if err != nil { @@ -383,7 +389,7 @@ func payoutUndelegations( for i := range wrapper.Delegations { delegation := &wrapper.Delegations[i] totalWithdraw := delegation.RemoveUnlockedUndelegations( - header.Epoch(), wrapper.LastEpochInCommittee, lockPeriod, noEarlyUnlock, + header.Epoch(), wrapper.LastEpochInCommittee, lockPeriod, noEarlyUnlock, isMaxRate, ) if totalWithdraw.Sign() != 0 { state.AddBalance(delegation.DelegatorAddress, totalWithdraw) @@ -426,6 +432,7 @@ func setElectionEpochAndMinFee(chain engine.ChainReader, header *block.Header, s map[common.Address]struct{}, len(newShardState.StakedValidators().Addrs), ) + // this loop is for elected validators only for _, addr := range newShardState.StakedValidators().Addrs { wrapper, err := state.ValidatorWrapper(addr, true, false) if err != nil { @@ -448,11 +455,13 @@ func setElectionEpochAndMinFee(chain engine.ChainReader, header *block.Header, s } isElected[addr] = struct{}{} } + // due to a bug in the old implementation of the minimum fee, // unelected validators did not have their fee updated even // when the protocol required them to do so. here we fix it, - // but only after the HIP-30 hard fork is effective. - if config.IsHIP30(newShardState.Epoch) { + // but only after the HIP-30 hard fork is effective + // this loop applies to all validators, but excludes the ones in isElected + if config.IsHIP30(newShardState.Epoch) && minRateNotZero { for _, addr := range chain.ValidatorCandidates() { // skip elected validator if _, ok := isElected[addr]; ok { @@ -466,6 +475,19 @@ func setElectionEpochAndMinFee(chain engine.ChainReader, header *block.Header, s } } } + + // for all validators which have MaxRate < minRate + maxChangeRate + // set their MaxRate equal to the minRate + MaxChangeRate + // this will allow the wrapper.SanityCheck to pass if Rate is set to a value + // higher than the the MaxRate by UpdateMinimumCommissionFee above + if config.IsMaxRate(newShardState.Epoch) && minRateNotZero { + for _, addr := range chain.ValidatorCandidates() { + if _, err := availability.UpdateMaxCommissionFee(state, addr, minRate); err != nil { + return err + } + } + } + return nil } diff --git a/internal/params/config.go b/internal/params/config.go index 86695ba40..c96bf8ab9 100644 --- a/internal/params/config.go +++ b/internal/params/config.go @@ -75,6 +75,7 @@ var ( ValidatorCodeFixEpoch: big.NewInt(1535), // 2023-07-20 05:51:07+00:00 HIP30Epoch: big.NewInt(1673), // 2023-11-02 17:30:00+00:00 BlockGas30MEpoch: big.NewInt(1673), // 2023-11-02 17:30:00+00:00 + MaxRateEpoch: big.NewInt(1733), // 2023-12-17 12:20:15+00:00 } // TestnetChainConfig contains the chain parameters to run a node on the harmony test network. @@ -118,6 +119,7 @@ var ( ValidatorCodeFixEpoch: big.NewInt(1296), // 2023-04-28 07:14:20+00:00 HIP30Epoch: big.NewInt(2176), // 2023-10-12 10:00:00+00:00 BlockGas30MEpoch: big.NewInt(2176), // 2023-10-12 10:00:00+00:00 + MaxRateEpoch: big.NewInt(2520), // 2023-12-16 12:17:14+00:00 } // PangaeaChainConfig contains the chain parameters for the Pangaea network. // All features except for CrossLink are enabled at launch. @@ -161,6 +163,7 @@ var ( ValidatorCodeFixEpoch: EpochTBD, HIP30Epoch: EpochTBD, BlockGas30MEpoch: big.NewInt(0), + MaxRateEpoch: EpochTBD, } // PartnerChainConfig contains the chain parameters for the Partner network. 
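A worked example of the max-rate pass added to setElectionEpochAndMinFee above, with illustrative numbers: take the HIP-30 minimum fee minRate = 0.07 and a validator sitting at Rate = 0.05, MaxRate = 0.05, MaxChangeRate = 0.01. UpdateMinimumCommissionFee raises Rate to 0.07, which on its own would violate SanityCheck's Rate <= MaxRate; the UpdateMaxCommissionFee pass then lifts MaxRate to minRate + MaxChangeRate = 0.08, so the updated wrapper stays valid.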
@@ -205,6 +208,7 @@ var ( ValidatorCodeFixEpoch: big.NewInt(5), HIP30Epoch: big.NewInt(7), BlockGas30MEpoch: big.NewInt(7), + MaxRateEpoch: EpochTBD, } // StressnetChainConfig contains the chain parameters for the Stress test network. @@ -249,6 +253,7 @@ var ( ValidatorCodeFixEpoch: EpochTBD, HIP30Epoch: EpochTBD, BlockGas30MEpoch: big.NewInt(0), + MaxRateEpoch: EpochTBD, } // LocalnetChainConfig contains the chain parameters to run for local development. @@ -292,6 +297,7 @@ var ( ValidatorCodeFixEpoch: big.NewInt(2), HIP30Epoch: EpochTBD, BlockGas30MEpoch: big.NewInt(0), + MaxRateEpoch: EpochTBD, } // AllProtocolChanges ... @@ -336,7 +342,8 @@ var ( big.NewInt(0), // FeeCollectEpoch big.NewInt(0), // ValidatorCodeFixEpoch big.NewInt(0), // BlockGas30M - big.NewInt(0), // HIP30Epoch + big.NewInt(0), // BlockGas30M + big.NewInt(0), // MaxRateEpoch } // TestChainConfig ... @@ -382,6 +389,7 @@ var ( big.NewInt(0), // ValidatorCodeFixEpoch big.NewInt(0), // HIP30Epoch big.NewInt(0), // BlockGas30M + big.NewInt(0), // MaxRateEpoch } // TestRules ... @@ -547,6 +555,9 @@ type ChainConfig struct { HIP30Epoch *big.Int `json:"hip30-epoch,omitempty"` BlockGas30MEpoch *big.Int `json:"block-gas-30m-epoch,omitempty"` + + // MaxRateEpoch will make sure the validator max-rate is at least equal to the minRate + the validator max-rate-increase + MaxRateEpoch *big.Int `json:"max-rate-epoch,omitempty"` } // String implements the fmt.Stringer interface. @@ -612,6 +623,9 @@ func (c *ChainConfig) mustValid() { // capabilities required to transfer balance across shards require(c.HIP30Epoch.Cmp(c.CrossTxEpoch) > 0, "must satisfy: HIP30Epoch > CrossTxEpoch") + // max rate (7%) fix is applied on or after hip30 + require(c.MaxRateEpoch.Cmp(c.HIP30Epoch) >= 0, + "must satisfy: MaxRateEpoch >= HIP30Epoch") } // IsEIP155 returns whether epoch is either equal to the EIP155 fork epoch or greater. @@ -803,6 +817,10 @@ func (c *ChainConfig) IsHIP30(epoch *big.Int) bool { return isForked(c.HIP30Epoch, epoch) } +func (c *ChainConfig) IsMaxRate(epoch *big.Int) bool { + return isForked(c.MaxRateEpoch, epoch) +} + // During this epoch, shards 2 and 3 will start sending // their balances over to shard 0 or 1. func (c *ChainConfig) IsOneEpochBeforeHIP30(epoch *big.Int) bool { diff --git a/rpc/blockchain.go b/rpc/blockchain.go index ae588e750..e830b30f0 100644 --- a/rpc/blockchain.go +++ b/rpc/blockchain.go @@ -461,10 +461,10 @@ func (s *PublicBlockchainService) GetBlockReceipts( case V1: r, err = v1.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()]) case V2: - r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()]) + r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()], false) case Eth: if tx, ok := tx.(*types.Transaction); ok { - r, err = eth.NewReceipt(tx.ConvertToEth(), blockHash, block.NumberU64(), index, rmap[tx.Hash()]) + r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()], true) } default: return nil, ErrUnknownRPCVersion diff --git a/rpc/eth/types.go b/rpc/eth/types.go index f76aa4442..e3a64eb7f 100644 --- a/rpc/eth/types.go +++ b/rpc/eth/types.go @@ -111,7 +111,7 @@ func NewTransaction( return result, nil } -// NewReceipt returns the RPC data for a new receipt +// NewReceipt returns the RPC data for a new receipt. It is unused at the moment. 
func NewReceipt(tx *types.EthTransaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt) (map[string]interface{}, error) { senderAddr, err := tx.SenderAddress() if err != nil { diff --git a/rpc/transaction.go b/rpc/transaction.go index 8ea211d6a..ef30092b2 100644 --- a/rpc/transaction.go +++ b/rpc/transaction.go @@ -751,19 +751,11 @@ func (s *PublicTransactionService) GetTransactionReceipt( return nil, err } return NewStructuredResponse(RPCReceipt) - case V2: + case V2, Eth: if tx == nil { - RPCReceipt, err = v2.NewReceipt(stx, blockHash, blockNumber, index, receipt) + RPCReceipt, err = v2.NewReceipt(stx, blockHash, blockNumber, index, receipt, false) } else { - RPCReceipt, err = v2.NewReceipt(tx, blockHash, blockNumber, index, receipt) - } - if err != nil { - return nil, err - } - return NewStructuredResponse(RPCReceipt) - case Eth: - if tx != nil { - RPCReceipt, err = eth.NewReceipt(tx.ConvertToEth(), blockHash, blockNumber, index, receipt) + RPCReceipt, err = v2.NewReceipt(tx, blockHash, blockNumber, index, receipt, s.version == Eth) } if err != nil { return nil, err diff --git a/rpc/v2/types.go b/rpc/v2/types.go index c9abbead0..d2b21627c 100644 --- a/rpc/v2/types.go +++ b/rpc/v2/types.go @@ -204,20 +204,20 @@ type UndelegateMsg struct { // TxReceipt represents a transaction receipt that will serialize to the RPC representation. type TxReceipt struct { - BlockHash common.Hash `json:"blockHash"` - TransactionHash common.Hash `json:"transactionHash"` - BlockNumber uint64 `json:"blockNumber"` - TransactionIndex uint64 `json:"transactionIndex"` - GasUsed uint64 `json:"gasUsed"` - CumulativeGasUsed uint64 `json:"cumulativeGasUsed"` - ContractAddress common.Address `json:"contractAddress"` - Logs []*types.Log `json:"logs"` - LogsBloom ethtypes.Bloom `json:"logsBloom"` - ShardID uint32 `json:"shardID"` - From string `json:"from"` - To string `json:"to"` - Root hexutil.Bytes `json:"root"` - Status uint `json:"status"` + BlockHash common.Hash `json:"blockHash"` + TransactionHash common.Hash `json:"transactionHash"` + BlockNumber uint64 `json:"blockNumber"` + TransactionIndex uint64 `json:"transactionIndex"` + GasUsed uint64 `json:"gasUsed"` + CumulativeGasUsed uint64 `json:"cumulativeGasUsed"` + ContractAddress *common.Address `json:"contractAddress"` + Logs []*types.Log `json:"logs"` + LogsBloom ethtypes.Bloom `json:"logsBloom"` + ShardID uint32 `json:"shardID"` + From string `json:"from"` + To string `json:"to"` + Root hexutil.Bytes `json:"root"` + Status uint `json:"status"` } // StakingTxReceipt represents a staking transaction receipt that will serialize to the RPC representation. 
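Switching ContractAddress to a *common.Address above is what lets receipts for plain transfers report "contractAddress": null instead of the all-zero address. A minimal standalone sketch of that JSON behaviour (the struct here is illustrative, not part of the patch):

    package main

    import (
    	"encoding/json"
    	"fmt"

    	"github.com/ethereum/go-ethereum/common"
    )

    type receiptOut struct {
    	ContractAddress *common.Address `json:"contractAddress"`
    }

    func main() {
    	b, _ := json.Marshal(receiptOut{}) // no contract deployed: pointer is nil
    	fmt.Println(string(b))             // {"contractAddress":null}

    	addr := common.HexToAddress("0x1")
    	b, _ = json.Marshal(receiptOut{ContractAddress: &addr}) // contract creation
    	fmt.Println(string(b)) // prints the full 0x-prefixed contract address
    }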
@@ -334,11 +334,11 @@ func NewTransaction( // NewReceipt returns a transaction OR staking transaction that will serialize to the RPC representation func NewReceipt( - tx interface{}, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt, + tx interface{}, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt, eth bool, ) (interface{}, error) { plainTx, ok := tx.(*types.Transaction) if ok { - return NewTxReceipt(plainTx, blockHash, blockNumber, blockIndex, receipt) + return NewTxReceipt(plainTx, blockHash, blockNumber, blockIndex, receipt, eth) } stakingTx, ok := tx.(*staking.StakingTransaction) if ok { @@ -349,7 +349,7 @@ func NewReceipt( // NewTxReceipt returns a plain transaction receipt that will serialize to the RPC representation func NewTxReceipt( - tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt, + tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt, eth bool, ) (*TxReceipt, error) { // Set correct to & from address senderAddr, err := tx.SenderAddress() @@ -363,13 +363,18 @@ func NewTxReceipt( receiver = "" } else { // Handle response type for regular transaction - sender, err = internal_common.AddressToBech32(senderAddr) - if err != nil { - return nil, err - } - receiver, err = internal_common.AddressToBech32(*tx.To()) - if err != nil { - return nil, err + if eth { + sender = senderAddr.String() + receiver = tx.To().String() + } else { + sender, err = internal_common.AddressToBech32(senderAddr) + if err != nil { + return nil, err + } + receiver, err = internal_common.AddressToBech32(*tx.To()) + if err != nil { + return nil, err + } } } @@ -404,7 +409,7 @@ func NewTxReceipt( // If the ContractAddress is 20 0x0 bytes, assume it is not a contract creation if receipt.ContractAddress != (common.Address{}) { - txReceipt.ContractAddress = receipt.ContractAddress + txReceipt.ContractAddress = &receipt.ContractAddress } return txReceipt, nil } diff --git a/staking/availability/measure.go b/staking/availability/measure.go index 881baa855..6bf36bfb0 100644 --- a/staking/availability/measure.go +++ b/staking/availability/measure.go @@ -267,3 +267,27 @@ func UpdateMinimumCommissionFee( } return false, nil } + +// UpdateMaxCommissionFee makes sure the max-rate is at least higher than the rate + max-rate-change. +func UpdateMaxCommissionFee(state *state.DB, addr common.Address, minRate numeric.Dec) (bool, error) { + utils.Logger().Info().Msg("begin update max commission fee") + + wrapper, err := state.ValidatorWrapper(addr, true, false) + if err != nil { + return false, err + } + + minMaxRate := minRate.Add(wrapper.MaxChangeRate) + + if wrapper.MaxRate.LT(minMaxRate) { + utils.Logger().Info(). + Str("addr", addr.Hex()). + Str("old max-rate", wrapper.MaxRate.String()). + Str("new max-rate", minMaxRate.String()). 
+ Msg("updating max commission rate") + wrapper.MaxRate.SetBytes(minMaxRate.Bytes()) + return true, nil + } + + return false, nil +} diff --git a/staking/types/delegation.go b/staking/types/delegation.go index 9f0a0c622..c222048e4 100644 --- a/staking/types/delegation.go +++ b/staking/types/delegation.go @@ -178,15 +178,21 @@ func (d *Delegation) DeleteEntry(epoch *big.Int) { // RemoveUnlockedUndelegations removes all fully unlocked // undelegations and returns the total sum func (d *Delegation) RemoveUnlockedUndelegations( - curEpoch, lastEpochInCommittee *big.Int, lockPeriod int, noEarlyUnlock bool, + curEpoch, lastEpochInCommittee *big.Int, lockPeriod int, noEarlyUnlock bool, isMaxRate bool, ) *big.Int { totalWithdraw := big.NewInt(0) count := 0 for j := range d.Undelegations { - if big.NewInt(0).Sub(curEpoch, d.Undelegations[j].Epoch).Int64() >= int64(lockPeriod) || - (!noEarlyUnlock && big.NewInt(0).Sub(curEpoch, lastEpochInCommittee).Int64() >= int64(lockPeriod)) { - // need to wait at least 7 epochs to withdraw; or the validator has been out of committee for 7 epochs - totalWithdraw.Add(totalWithdraw, d.Undelegations[j].Amount) + epochsSinceUndelegation := big.NewInt(0).Sub(curEpoch, d.Undelegations[j].Epoch).Int64() + // >=7 epochs have passed since undelegation, or + lockPeriodApplies := epochsSinceUndelegation >= int64(lockPeriod) + // >=7 epochs have passed since unelection during the noEarlyUnlock configuration + earlyUnlockPeriodApplies := big.NewInt(0).Sub(curEpoch, lastEpochInCommittee).Int64() >= int64(lockPeriod) && !noEarlyUnlock + maxRateApplies := isMaxRate && epochsSinceUndelegation > int64(lockPeriod) + if lockPeriodApplies || earlyUnlockPeriodApplies { + if !maxRateApplies { + totalWithdraw.Add(totalWithdraw, d.Undelegations[j].Amount) + } count++ } else { break diff --git a/staking/types/delegation_test.go b/staking/types/delegation_test.go index c40750e4c..feadda3cc 100644 --- a/staking/types/delegation_test.go +++ b/staking/types/delegation_test.go @@ -75,7 +75,7 @@ func TestUnlockedLastEpochInCommittee(t *testing.T) { amount4 := big.NewInt(4000) delegation.Undelegate(epoch4, amount4) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(8000)) != 0 { t.Errorf("removing an unlocked undelegation fails") } @@ -90,7 +90,7 @@ func TestUnlockedLastEpochInCommitteeFail(t *testing.T) { amount4 := big.NewInt(4000) delegation.Undelegate(epoch4, amount4) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(0)) != 0 { t.Errorf("premature delegation shouldn't be unlocked") } @@ -104,7 +104,7 @@ func TestUnlockedFullPeriod(t *testing.T) { amount5 := big.NewInt(4000) delegation.Undelegate(epoch5, amount5) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(4000)) != 0 { t.Errorf("removing an unlocked undelegation fails") } @@ -118,7 +118,7 @@ func TestQuickUnlock(t *testing.T) { amount7 := big.NewInt(4000) delegation.Undelegate(epoch7, amount7) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 0, false) + result := 
delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 0, false, false) if result.Cmp(big.NewInt(4000)) != 0 { t.Errorf("removing an unlocked undelegation fails") } @@ -133,7 +133,7 @@ func TestUnlockedFullPeriodFail(t *testing.T) { amount5 := big.NewInt(4000) delegation.Undelegate(epoch5, amount5) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(0)) != 0 { t.Errorf("premature delegation shouldn't be unlocked") } @@ -147,7 +147,7 @@ func TestUnlockedPremature(t *testing.T) { amount6 := big.NewInt(4000) delegation.Undelegate(epoch6, amount6) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, false, false) if result.Cmp(big.NewInt(0)) != 0 { t.Errorf("premature delegation shouldn't be unlocked") } @@ -161,7 +161,7 @@ func TestNoEarlyUnlock(t *testing.T) { amount4 := big.NewInt(4000) delegation.Undelegate(epoch4, amount4) - result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true) + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false) if result.Cmp(big.NewInt(0)) != 0 { t.Errorf("should not allow early unlock") } diff --git a/test/build-localnet-validator.sh b/test/build-localnet-validator.sh index 08d987777..70501c8d6 100644 --- a/test/build-localnet-validator.sh +++ b/test/build-localnet-validator.sh @@ -32,7 +32,7 @@ hmy --node="http://localhost:9500" staking create-validator \ --bls-pubkeys 4f41a37a3a8d0695dd6edcc58142c6b7d98e74da5c90e79b587b3b960b6a4f5e048e6d8b8a000d77a478d44cd640270c,7dcc035a943e29e17959dabe636efad7303d2c6f273ace457ba9dcc2fd19d3f37e70ba1cd8d082cf8ff7be2f861db48c \ --name "s0-localnet-validator1" --identity "validator1" --details "validator1" \ --security-contact "localnet" --website "localnet.one" \ - --max-change-rate 0.1 --max-rate 0.1 --rate 0.1 \ + --max-change-rate 0.01 --max-rate 0.01 --rate 0.01 \ --max-total-delegation 100000000 --min-self-delegation 10000 --bls-pubkeys-dir .hmy/extbls/ hmy --node="http://localhost:9500" staking create-validator \ @@ -40,7 +40,7 @@ hmy --node="http://localhost:9500" staking create-validator \ --bls-pubkeys b0917378b179a519a5055259c4f8980cce37d58af300b00dd98b07076d3d9a3b16c4a55f84522f553872225a7b1efc0c \ --name "s0-localnet-validator2" --identity "validator2" --details "validator2" \ --security-contact "localnet" --website "localnet.one" \ - --max-change-rate 0.1 --max-rate 0.1 --rate 0.1 \ + --max-change-rate 0.1 --max-rate 0.1 --rate 0.05 \ --max-total-delegation 100000000 --min-self-delegation 10000 --bls-pubkeys-dir .hmy/extbls/ hmy --node="http://localhost:9500" staking create-validator \ From dd65484d186f8c86aa7560f7bd22ea6ccd39b7f7 Mon Sep 17 00:00:00 2001 From: Diego Nava <8563843+diego1q2w@users.noreply.github.com> Date: Mon, 18 Dec 2023 20:42:19 +0100 Subject: [PATCH 2/5] fix getTransactionReceipt response (#4590) --- rpc/blockchain.go | 4 ++-- rpc/eth/types.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ rpc/transaction.go | 14 +++++++++++--- rpc/v2/types.go | 26 ++++++++++---------------- 4 files changed, 67 insertions(+), 21 deletions(-) diff --git a/rpc/blockchain.go b/rpc/blockchain.go index e830b30f0..284bff5af 100644 --- a/rpc/blockchain.go +++ b/rpc/blockchain.go @@ -461,10 +461,10 @@ func (s *PublicBlockchainService) 
GetBlockReceipts( case V1: r, err = v1.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()]) case V2: - r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()], false) + r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()]) case Eth: if tx, ok := tx.(*types.Transaction); ok { - r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()], true) + r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()]) } default: return nil, ErrUnknownRPCVersion diff --git a/rpc/eth/types.go b/rpc/eth/types.go index e3a64eb7f..2767984cd 100644 --- a/rpc/eth/types.go +++ b/rpc/eth/types.go @@ -155,6 +155,50 @@ func NewReceipt(tx *types.EthTransaction, blockHash common.Hash, blockNumber, bl return fields, nil } +// NewReceiptFromTransaction returns the RPC data for a new receipt. It is unused at the moment. +func NewReceiptFromTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt) (map[string]interface{}, error) { + senderAddr, err := tx.SenderAddress() + if err != nil { + return nil, err + } + + ethTxHash := tx.Hash() + for i, _ := range receipt.Logs { + // Override log txHash with receipt's + receipt.Logs[i].TxHash = ethTxHash + } + + fields := map[string]interface{}{ + "blockHash": blockHash, + "blockNumber": hexutil.Uint64(blockNumber), + "transactionHash": ethTxHash, + "transactionIndex": hexutil.Uint64(blockIndex), + "from": senderAddr, + "to": tx.To(), + "gasUsed": hexutil.Uint64(receipt.GasUsed), + "cumulativeGasUsed": hexutil.Uint64(receipt.CumulativeGasUsed), + "contractAddress": nil, + "logs": receipt.Logs, + "logsBloom": receipt.Bloom, + } + + // Assign receipt status or post state. + if len(receipt.PostState) > 0 { + fields["root"] = hexutil.Bytes(receipt.PostState) + } else { + fields["status"] = hexutil.Uint(receipt.Status) + } + if receipt.Logs == nil { + fields["logs"] = [][]*types.Log{} + } + // If the ContractAddress is 20 0x0 bytes, assume it is not a contract creation + if receipt.ContractAddress != (common.Address{}) { + fields["contractAddress"] = receipt.ContractAddress + } + + return fields, nil +} + func newBlock(b *types.Block) *Block { head := b.Header() diff --git a/rpc/transaction.go b/rpc/transaction.go index ef30092b2..53f8f4767 100644 --- a/rpc/transaction.go +++ b/rpc/transaction.go @@ -751,11 +751,19 @@ func (s *PublicTransactionService) GetTransactionReceipt( return nil, err } return NewStructuredResponse(RPCReceipt) - case V2, Eth: + case V2: if tx == nil { - RPCReceipt, err = v2.NewReceipt(stx, blockHash, blockNumber, index, receipt, false) + RPCReceipt, err = v2.NewReceipt(stx, blockHash, blockNumber, index, receipt) } else { - RPCReceipt, err = v2.NewReceipt(tx, blockHash, blockNumber, index, receipt, s.version == Eth) + RPCReceipt, err = v2.NewReceipt(tx, blockHash, blockNumber, index, receipt) + } + if err != nil { + return nil, err + } + return NewStructuredResponse(RPCReceipt) + case Eth: + if tx != nil { + RPCReceipt, err = eth.NewReceiptFromTransaction(tx, blockHash, blockNumber, index, receipt) } if err != nil { return nil, err diff --git a/rpc/v2/types.go b/rpc/v2/types.go index d2b21627c..76f36ad40 100644 --- a/rpc/v2/types.go +++ b/rpc/v2/types.go @@ -334,11 +334,11 @@ func NewTransaction( // NewReceipt returns a transaction OR staking transaction that will serialize to the RPC representation func NewReceipt( - tx interface{}, blockHash common.Hash, blockNumber, blockIndex uint64, receipt 
*types.Receipt, eth bool, + tx interface{}, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt, ) (interface{}, error) { plainTx, ok := tx.(*types.Transaction) if ok { - return NewTxReceipt(plainTx, blockHash, blockNumber, blockIndex, receipt, eth) + return NewTxReceipt(plainTx, blockHash, blockNumber, blockIndex, receipt) } stakingTx, ok := tx.(*staking.StakingTransaction) if ok { @@ -349,7 +349,7 @@ func NewReceipt( // NewTxReceipt returns a plain transaction receipt that will serialize to the RPC representation func NewTxReceipt( - tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt, eth bool, + tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt, ) (*TxReceipt, error) { // Set correct to & from address senderAddr, err := tx.SenderAddress() @@ -362,19 +362,13 @@ func NewTxReceipt( sender = senderAddr.String() receiver = "" } else { - // Handle response type for regular transaction - if eth { - sender = senderAddr.String() - receiver = tx.To().String() - } else { - sender, err = internal_common.AddressToBech32(senderAddr) - if err != nil { - return nil, err - } - receiver, err = internal_common.AddressToBech32(*tx.To()) - if err != nil { - return nil, err - } + sender, err = internal_common.AddressToBech32(senderAddr) + if err != nil { + return nil, err + } + receiver, err = internal_common.AddressToBech32(*tx.To()) + if err != nil { + return nil, err } } From 8717ccf61d2a61497d399ce1958c5d84411099d8 Mon Sep 17 00:00:00 2001 From: Max <82761650+MaxMustermann2@users.noreply.github.com> Date: Tue, 19 Dec 2023 15:28:04 +0530 Subject: [PATCH 3/5] rpc: fix the `from` address calculation (#4593) * test: add delegation type tests These were skipped intentionally in the previous pull request * rpc: undo all changes to 2023.3.0 * rpc: calculate SenderAddress before `ConvertToEth` `tx.ConvertToEth` silently drops the `ShardID` and `ToShardID` fields. This results in the hash of the transaction changing (either via removal of these fields in the hash calculation or by automatic filling of the values by the node's shard). The different hash calculation results in incorrect sender address calculation. There have been a couple of attempts to fix this issue, which created troubles elsewhere. This pull request reverts to the behaviour seen in 2023.3.0 with a simple edit that calculates the `SenderAddress` before dropping the fields in the call to `ConvertToEth`. 
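The ordering the fix depends on can be captured in a short sketch (the helper name is illustrative and not part of this change; SenderAddress and ConvertToEth are the existing methods used below):

    package rpc

    import (
    	"github.com/ethereum/go-ethereum/common"
    	"github.com/harmony-one/harmony/core/types"
    )

    // Recover the sender while ShardID/ToShardID still contribute to the
    // transaction hash, and only then convert to the eth_ representation.
    func senderThenEthTx(tx *types.Transaction) (common.Address, *types.EthTransaction, error) {
    	from, err := tx.SenderAddress() // signature recovery over the Harmony hash
    	if err != nil {
    		return common.Address{}, nil, err
    	}
    	return from, tx.ConvertToEth(), nil // ConvertToEth drops the shard fields
    }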
--- rpc/blockchain.go | 6 +- rpc/eth/types.go | 80 ++++----------------- rpc/pool.go | 9 ++- rpc/transaction.go | 15 +++- rpc/v2/types.go | 31 ++++---- staking/types/delegation_test.go | 120 +++++++++++++++++++++++++++++++ 6 files changed, 177 insertions(+), 84 deletions(-) diff --git a/rpc/blockchain.go b/rpc/blockchain.go index 284bff5af..c9a6a1313 100644 --- a/rpc/blockchain.go +++ b/rpc/blockchain.go @@ -464,7 +464,11 @@ func (s *PublicBlockchainService) GetBlockReceipts( r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()]) case Eth: if tx, ok := tx.(*types.Transaction); ok { - r, err = v2.NewReceipt(tx, blockHash, block.NumberU64(), index, rmap[tx.Hash()]) + from, err := tx.SenderAddress() + if err != nil { + return nil, err + } + r, err = eth.NewReceipt(from, tx.ConvertToEth(), blockHash, block.NumberU64(), index, rmap[tx.Hash()]) } default: return nil, ErrUnknownRPCVersion diff --git a/rpc/eth/types.go b/rpc/eth/types.go index 2767984cd..c51d604b2 100644 --- a/rpc/eth/types.go +++ b/rpc/eth/types.go @@ -74,19 +74,9 @@ type Transaction struct { // representation, with the given location metadata set (if available). // Note that all txs on Harmony are replay protected (post EIP155 epoch). func NewTransaction( - tx *types.EthTransaction, blockHash common.Hash, + from common.Address, tx *types.EthTransaction, blockHash common.Hash, blockNumber uint64, timestamp uint64, index uint64, ) (*Transaction, error) { - from := common.Address{} - var err error - if tx.IsEthCompatible() { - from, err = tx.SenderAddress() - } else { - from, err = tx.ConvertToHmy().SenderAddress() - } - if err != nil { - return nil, err - } v, r, s := tx.RawSignatureValues() result := &Transaction{ @@ -111,59 +101,10 @@ func NewTransaction( return result, nil } -// NewReceipt returns the RPC data for a new receipt. It is unused at the moment. -func NewReceipt(tx *types.EthTransaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt) (map[string]interface{}, error) { - senderAddr, err := tx.SenderAddress() - if err != nil { - return nil, err - } - +// NewReceipt returns the RPC data for a new receipt +func NewReceipt(senderAddr common.Address, tx *types.EthTransaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt) (map[string]interface{}, error) { ethTxHash := tx.Hash() - for i, _ := range receipt.Logs { - // Override log txHash with receipt's - receipt.Logs[i].TxHash = ethTxHash - } - - fields := map[string]interface{}{ - "blockHash": blockHash, - "blockNumber": hexutil.Uint64(blockNumber), - "transactionHash": ethTxHash, - "transactionIndex": hexutil.Uint64(blockIndex), - "from": senderAddr, - "to": tx.To(), - "gasUsed": hexutil.Uint64(receipt.GasUsed), - "cumulativeGasUsed": hexutil.Uint64(receipt.CumulativeGasUsed), - "contractAddress": nil, - "logs": receipt.Logs, - "logsBloom": receipt.Bloom, - } - - // Assign receipt status or post state. - if len(receipt.PostState) > 0 { - fields["root"] = hexutil.Bytes(receipt.PostState) - } else { - fields["status"] = hexutil.Uint(receipt.Status) - } - if receipt.Logs == nil { - fields["logs"] = [][]*types.Log{} - } - // If the ContractAddress is 20 0x0 bytes, assume it is not a contract creation - if receipt.ContractAddress != (common.Address{}) { - fields["contractAddress"] = receipt.ContractAddress - } - - return fields, nil -} - -// NewReceiptFromTransaction returns the RPC data for a new receipt. It is unused at the moment. 
-func NewReceiptFromTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64, receipt *types.Receipt) (map[string]interface{}, error) { - senderAddr, err := tx.SenderAddress() - if err != nil { - return nil, err - } - - ethTxHash := tx.Hash() - for i, _ := range receipt.Logs { + for i := range receipt.Logs { // Override log txHash with receipt's receipt.Logs[i].TxHash = ethTxHash } @@ -253,7 +194,11 @@ func blockWithFullTxFromBlock(b *types.Block) (*BlockWithFullTx, error) { } for idx, tx := range b.Transactions() { - fmtTx, err := NewTransaction(tx.ConvertToEth(), b.Hash(), b.NumberU64(), b.Time().Uint64(), uint64(idx)) + from, err := tx.SenderAddress() + if err != nil { + return nil, err + } + fmtTx, err := NewTransaction(from, tx.ConvertToEth(), b.Hash(), b.NumberU64(), b.Time().Uint64(), uint64(idx)) if err != nil { return nil, err } @@ -270,5 +215,10 @@ func NewTransactionFromBlockIndex(b *types.Block, index uint64) (*Transaction, e "tx index %v greater than or equal to number of transactions on block %v", index, b.Hash().String(), ) } - return NewTransaction(txs[index].ConvertToEth(), b.Hash(), b.NumberU64(), b.Time().Uint64(), index) + tx := txs[index].ConvertToEth() + from, err := tx.SenderAddress() + if err != nil { + return nil, err + } + return NewTransaction(from, tx, b.Hash(), b.NumberU64(), b.Time().Uint64(), index) } diff --git a/rpc/pool.go b/rpc/pool.go index c0d4858c2..ee4e34828 100644 --- a/rpc/pool.go +++ b/rpc/pool.go @@ -253,7 +253,14 @@ func (s *PublicPoolService) PendingTransactions( continue // Legacy behavior is to not return error here } case Eth: - tx, err = eth.NewTransaction(plainTx.ConvertToEth(), common.Hash{}, 0, 0, 0) + from, err := plainTx.SenderAddress() + if err != nil { + utils.Logger().Debug(). + Err(err). + Msgf("%v error at %v", LogTag, "PendingTransactions") + continue // Legacy behavior is to not return error here + } + tx, err = eth.NewTransaction(from, plainTx.ConvertToEth(), common.Hash{}, 0, 0, 0) if err != nil { utils.Logger().Debug(). Err(err). 
diff --git a/rpc/transaction.go b/rpc/transaction.go index 53f8f4767..4b4504585 100644 --- a/rpc/transaction.go +++ b/rpc/transaction.go @@ -236,7 +236,13 @@ func (s *PublicTransactionService) newRPCTransaction(tx *types.Transaction, bloc } return NewStructuredResponse(tx) case Eth: - tx, err := eth.NewTransaction(tx.ConvertToEth(), blockHash, blockNumber, timestamp, index) + // calculate SenderAddress before ConvertToEth + senderAddr, err := tx.SenderAddress() + if err != nil { + DoMetricRPCQueryInfo(GetTransactionByHash, FailedNumber) + return nil, err + } + tx, err := eth.NewTransaction(senderAddr, tx.ConvertToEth(), blockHash, blockNumber, timestamp, index) if err != nil { DoMetricRPCQueryInfo(GetTransactionByHash, FailedNumber) return nil, err @@ -763,7 +769,12 @@ func (s *PublicTransactionService) GetTransactionReceipt( return NewStructuredResponse(RPCReceipt) case Eth: if tx != nil { - RPCReceipt, err = eth.NewReceiptFromTransaction(tx, blockHash, blockNumber, index, receipt) + // calculate SenderAddress before ConvertToEth + senderAddr, err := tx.SenderAddress() + if err != nil { + return nil, err + } + RPCReceipt, err = eth.NewReceipt(senderAddr, tx.ConvertToEth(), blockHash, blockNumber, index, receipt) } if err != nil { return nil, err diff --git a/rpc/v2/types.go b/rpc/v2/types.go index 76f36ad40..c9abbead0 100644 --- a/rpc/v2/types.go +++ b/rpc/v2/types.go @@ -204,20 +204,20 @@ type UndelegateMsg struct { // TxReceipt represents a transaction receipt that will serialize to the RPC representation. type TxReceipt struct { - BlockHash common.Hash `json:"blockHash"` - TransactionHash common.Hash `json:"transactionHash"` - BlockNumber uint64 `json:"blockNumber"` - TransactionIndex uint64 `json:"transactionIndex"` - GasUsed uint64 `json:"gasUsed"` - CumulativeGasUsed uint64 `json:"cumulativeGasUsed"` - ContractAddress *common.Address `json:"contractAddress"` - Logs []*types.Log `json:"logs"` - LogsBloom ethtypes.Bloom `json:"logsBloom"` - ShardID uint32 `json:"shardID"` - From string `json:"from"` - To string `json:"to"` - Root hexutil.Bytes `json:"root"` - Status uint `json:"status"` + BlockHash common.Hash `json:"blockHash"` + TransactionHash common.Hash `json:"transactionHash"` + BlockNumber uint64 `json:"blockNumber"` + TransactionIndex uint64 `json:"transactionIndex"` + GasUsed uint64 `json:"gasUsed"` + CumulativeGasUsed uint64 `json:"cumulativeGasUsed"` + ContractAddress common.Address `json:"contractAddress"` + Logs []*types.Log `json:"logs"` + LogsBloom ethtypes.Bloom `json:"logsBloom"` + ShardID uint32 `json:"shardID"` + From string `json:"from"` + To string `json:"to"` + Root hexutil.Bytes `json:"root"` + Status uint `json:"status"` } // StakingTxReceipt represents a staking transaction receipt that will serialize to the RPC representation. 
@@ -362,6 +362,7 @@ func NewTxReceipt( sender = senderAddr.String() receiver = "" } else { + // Handle response type for regular transaction sender, err = internal_common.AddressToBech32(senderAddr) if err != nil { return nil, err @@ -403,7 +404,7 @@ func NewTxReceipt( // If the ContractAddress is 20 0x0 bytes, assume it is not a contract creation if receipt.ContractAddress != (common.Address{}) { - txReceipt.ContractAddress = &receipt.ContractAddress + txReceipt.ContractAddress = receipt.ContractAddress } return txReceipt, nil } diff --git a/staking/types/delegation_test.go b/staking/types/delegation_test.go index feadda3cc..ff0c25c02 100644 --- a/staking/types/delegation_test.go +++ b/staking/types/delegation_test.go @@ -166,3 +166,123 @@ func TestNoEarlyUnlock(t *testing.T) { t.Errorf("should not allow early unlock") } } + +func TestMaxRateAtLess(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(27) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, true) + if result.Cmp(big.NewInt(0)) != 0 { + t.Errorf("should not allow unlock before 7") + } + finalLength := len(delegation.Undelegations) + if initialLength != finalLength { + t.Errorf("should not remove undelegations before 7") + } +} + +func TestMaxRateAtEqual(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(28) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, true) + if result.Cmp(big.NewInt(4000)) != 0 { + t.Errorf("should withdraw at 7") + } + finalLength := len(delegation.Undelegations) + if initialLength == finalLength { + t.Errorf("should remove undelegations at 7") + } +} + +func TestMaxRateAtExcess(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(29) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, true) + if result.Cmp(big.NewInt(0)) != 0 { + t.Errorf("should not withdraw at 8") + } + finalLength := len(delegation.Undelegations) + if initialLength == finalLength { + t.Errorf("should remove undelegations at 8") + } +} + +func TestNoMaxRateAtLess(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(27) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false) + if result.Cmp(big.NewInt(0)) != 0 { + t.Errorf("should not allow unlock before 7") + } + finalLength := len(delegation.Undelegations) + if initialLength != finalLength { + t.Errorf("should not remove undelegations 
before 7") + } +} + +func TestNoMaxRateAtEqual(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(28) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false) + if result.Cmp(big.NewInt(4000)) != 0 { + t.Errorf("should withdraw at 7") + } + finalLength := len(delegation.Undelegations) + if initialLength == finalLength { + t.Errorf("should remove undelegations at 7") + } +} + +func TestNoMaxRateAtExcess(t *testing.T) { + // recreate it so that all tests can run + delegation := NewDelegation(delegatorAddr, delegationAmt) + lastEpochInCommittee := big.NewInt(1) + curEpoch := big.NewInt(29) + epoch := big.NewInt(21) + amount := big.NewInt(4000) + delegation.Undelegate(epoch, amount) + initialLength := len(delegation.Undelegations) + + result := delegation.RemoveUnlockedUndelegations(curEpoch, lastEpochInCommittee, 7, true, false) + if result.Cmp(big.NewInt(4000)) != 0 { + t.Errorf("should withdraw at 8") + } + finalLength := len(delegation.Undelegations) + if initialLength == finalLength { + t.Errorf("should remove undelegations at 8") + } +} From e68b44fd9814af389cc36617e261dd53ea2d2cc9 Mon Sep 17 00:00:00 2001 From: Gheis Mohammadi Date: Tue, 9 Jan 2024 10:49:53 +0800 Subject: [PATCH 4/5] Complete Fast Sync codes (#4594) * adjust full state sync request parameters, rename stage_state * add full state stage to the list of stages in fast sync * add RangeMode and ChainExecutionMode to handle execution of the stream sync stage * fix block exists issue on stage_states in stream sync * fix double insertion in stage states * add count for state downloader to return number of tasks * fix travis build issue by goimports * switch to Full Sync on pivot block, fix checking nil length in HandleRequestError --- .../stagedstreamsync/default_stages.go | 87 +++++++---- api/service/stagedstreamsync/downloader.go | 1 + api/service/stagedstreamsync/sig_verify.go | 12 +- api/service/stagedstreamsync/stage.go | 23 +++ api/service/stagedstreamsync/stage_finish.go | 5 + .../stagedstreamsync/stage_receipts.go | 6 + .../{stage_state.go => stage_states.go} | 4 + .../stagedstreamsync/stage_statesync.go | 9 +- .../stagedstreamsync/stage_statesync_full.go | 70 ++++++--- .../stagedstreamsync/staged_stream_sync.go | 55 +++++++ api/service/stagedstreamsync/stages.go | 19 +-- .../stagedstreamsync/state_sync_full.go | 147 ++++++++++-------- api/service/stagedstreamsync/syncing.go | 2 + 13 files changed, 305 insertions(+), 135 deletions(-) rename api/service/stagedstreamsync/{stage_state.go => stage_states.go} (98%) diff --git a/api/service/stagedstreamsync/default_stages.go b/api/service/stagedstreamsync/default_stages.go index f869ee5fe..fe64e26d4 100644 --- a/api/service/stagedstreamsync/default_stages.go +++ b/api/service/stagedstreamsync/default_stages.go @@ -64,7 +64,7 @@ func initFastSyncStagesOrder() { ShortRange, BlockBodies, Receipts, - StateSync, + FullStateSync, States, LastMile, Finish, @@ -74,7 +74,7 @@ func initFastSyncStagesOrder() { Finish, LastMile, States, - StateSync, + FullStateSync, Receipts, BlockBodies, ShortRange, @@ -86,7 +86,7 @@ func initFastSyncStagesOrder() { Finish, LastMile, States, - StateSync, + FullStateSync, Receipts, BlockBodies, ShortRange, @@ -101,6 +101,7 @@ func 
DefaultStages(ctx context.Context, srCfg StageShortRangeCfg, bodiesCfg StageBodiesCfg, stateSyncCfg StageStateSyncCfg, + fullStateSyncCfg StageFullStateSyncCfg, statesCfg StageStatesCfg, receiptsCfg StageReceiptsCfg, lastMileCfg StageLastMileCfg, @@ -113,55 +114,81 @@ func DefaultStages(ctx context.Context, handlerStageBodies := NewStageBodies(bodiesCfg) handlerStageStates := NewStageStates(statesCfg) handlerStageStateSync := NewStageStateSync(stateSyncCfg) + handlerStageFullStateSync := NewStageFullStateSync(fullStateSyncCfg) handlerStageReceipts := NewStageReceipts(receiptsCfg) handlerStageLastMile := NewStageLastMile(lastMileCfg) handlerStageFinish := NewStageFinish(finishCfg) return []*Stage{ { - ID: Heads, - Description: "Retrieve Chain Heads", - Handler: handlerStageHeads, + ID: Heads, + Description: "Retrieve Chain Heads", + Handler: handlerStageHeads, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChains, }, { - ID: SyncEpoch, - Description: "Sync only Last Block of Epoch", - Handler: handlerStageEpochSync, + ID: SyncEpoch, + Description: "Sync only Last Block of Epoch", + Handler: handlerStageEpochSync, + RangeMode: OnlyShortRange, + ChainExecutionMode: OnlyEpochChain, }, { - ID: ShortRange, - Description: "Short Range Sync", - Handler: handlerStageShortRange, + ID: ShortRange, + Description: "Short Range Sync", + Handler: handlerStageShortRange, + RangeMode: OnlyShortRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: BlockBodies, - Description: "Retrieve Block Bodies", - Handler: handlerStageBodies, + ID: BlockBodies, + Description: "Retrieve Block Bodies", + Handler: handlerStageBodies, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: States, - Description: "Update Blockchain State", - Handler: handlerStageStates, + ID: States, + Description: "Update Blockchain State", + Handler: handlerStageStates, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: StateSync, - Description: "Retrieve States", - Handler: handlerStageStateSync, + ID: StateSync, + Description: "Retrieve States", + Handler: handlerStageStateSync, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: Receipts, - Description: "Retrieve Receipts", - Handler: handlerStageReceipts, + ID: FullStateSync, + Description: "Retrieve Full States", + Handler: handlerStageFullStateSync, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: LastMile, - Description: "update status for blocks after sync and update last mile blocks as well", - Handler: handlerStageLastMile, + ID: Receipts, + Description: "Retrieve Receipts", + Handler: handlerStageReceipts, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: Finish, - Description: "Finalize Changes", - Handler: handlerStageFinish, + ID: LastMile, + Description: "update status for blocks after sync and update last mile blocks as well", + Handler: handlerStageLastMile, + RangeMode: LongRangeAndShortRange, + ChainExecutionMode: AllChainsExceptEpochChain, + }, + { + ID: Finish, + Description: "Finalize Changes", + Handler: handlerStageFinish, + RangeMode: LongRangeAndShortRange, + ChainExecutionMode: AllChains, }, } } diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index 371104895..9d564b016 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -285,4 +285,5 @@ func (d *Downloader) 
loop() { return } } + } diff --git a/api/service/stagedstreamsync/sig_verify.go b/api/service/stagedstreamsync/sig_verify.go index bdf5a2107..cd7fc4f91 100644 --- a/api/service/stagedstreamsync/sig_verify.go +++ b/api/service/stagedstreamsync/sig_verify.go @@ -54,14 +54,7 @@ func verifyBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil { return errors.Wrap(err, "[VerifyHeader]") } - _, err = bc.InsertChain(types.Blocks{block}, false) - switch { - case errors.Is(err, core.ErrKnownBlock): - return nil - case err != nil: - return errors.Wrap(err, "[InsertChain]") - default: - } + return nil } @@ -72,6 +65,9 @@ func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*type } // insert block if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil { + if errors.Is(err, core.ErrKnownBlock) { + return nil + } return errors.Wrap(err, "[InsertChain]") } return nil diff --git a/api/service/stagedstreamsync/stage.go b/api/service/stagedstreamsync/stage.go index 48334a5e5..59602fe81 100644 --- a/api/service/stagedstreamsync/stage.go +++ b/api/service/stagedstreamsync/stage.go @@ -30,6 +30,25 @@ type StageHandler interface { CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) error } +type RangeExecution uint32 + +const ( + LongRangeAndShortRange RangeExecution = iota // Both short range and long range + OnlyShortRange // only short range + OnlyLongRange // only long range + //OnlyEpochSync // only epoch sync +) + +type ChainExecution uint32 + +const ( + AllChains ChainExecution = iota // Can execute for any shard + AllChainsExceptEpochChain // Can execute for any shard except epoch chain + OnlyBeaconNode // only for beacon node + OnlyEpochChain // only for epoch chain + OnlyShardChain // only for shard node (exclude beacon node and epoch chain) +) + // Stage is a single sync stage in staged sync. type Stage struct { // ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`). @@ -42,6 +61,10 @@ type Stage struct { DisabledDescription string // Disabled defines if the stage is disabled. It sets up when the stage is build by its `StageBuilder`. Disabled bool + // Range defines whether stage has to be executed for either long range or short range + RangeMode RangeExecution + // ShardExecution defines this stage has to be executed for which shards + ChainExecutionMode ChainExecution } // StageState is the state of the stage. 
diff --git a/api/service/stagedstreamsync/stage_finish.go b/api/service/stagedstreamsync/stage_finish.go index 0dfae53ae..c94aa692b 100644 --- a/api/service/stagedstreamsync/stage_finish.go +++ b/api/service/stagedstreamsync/stage_finish.go @@ -39,6 +39,11 @@ func (finish *StageFinish) Exec(ctx context.Context, firstCycle bool, invalidBlo // TODO: prepare indices (useful for RPC) and finalize + // switch to Full Sync Mode if the states are synced + if s.state.status.statesSynced { + s.state.status.cycleSyncMode = FullSync + } + if useInternalTx { if err := tx.Commit(); err != nil { return err diff --git a/api/service/stagedstreamsync/stage_receipts.go b/api/service/stagedstreamsync/stage_receipts.go index 4445eb6ba..78e8e089c 100644 --- a/api/service/stagedstreamsync/stage_receipts.go +++ b/api/service/stagedstreamsync/stage_receipts.go @@ -12,6 +12,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" ) @@ -56,6 +57,11 @@ func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockR return nil } + // shouldn't execute for epoch chain + if r.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + useInternalTx := tx == nil if invalidBlockRevert { diff --git a/api/service/stagedstreamsync/stage_state.go b/api/service/stagedstreamsync/stage_states.go similarity index 98% rename from api/service/stagedstreamsync/stage_state.go rename to api/service/stagedstreamsync/stage_states.go index df864d63f..1b668786c 100644 --- a/api/service/stagedstreamsync/stage_state.go +++ b/api/service/stagedstreamsync/stage_states.go @@ -165,6 +165,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR return ErrInvalidBlockNumber } + if stg.configs.bc.HasBlock(block.Hash(), block.NumberU64()) { + continue + } + if err := verifyAndInsertBlock(stg.configs.bc, block); err != nil { stg.configs.logger.Warn().Err(err).Uint64("cycle target block", targetHeight). Uint64("block number", block.NumberU64()). 
diff --git a/api/service/stagedstreamsync/stage_statesync.go b/api/service/stagedstreamsync/stage_statesync.go index c4e66e10e..3ce733f41 100644 --- a/api/service/stagedstreamsync/stage_statesync.go +++ b/api/service/stagedstreamsync/stage_statesync.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -58,8 +59,14 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo // for short range sync, skip this step if !s.state.initSync { return nil - } // only execute this stage in fast/snap sync mode and once we reach to pivot + } + + // shouldn't execute for epoch chain + if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + // only execute this stage in fast/snap sync mode and once we reach to pivot if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() || s.state.status.statesSynced { diff --git a/api/service/stagedstreamsync/stage_statesync_full.go b/api/service/stagedstreamsync/stage_statesync_full.go index d304ca1c3..c1579114b 100644 --- a/api/service/stagedstreamsync/stage_statesync_full.go +++ b/api/service/stagedstreamsync/stage_statesync_full.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/pkg/errors" //sttypes "github.com/harmony-one/harmony/p2p/stream/types" @@ -59,8 +60,19 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever // for short range sync, skip this step if !s.state.initSync { return nil - } // only execute this stage in fast/snap sync mode and once we reach to pivot + } + + // shouldn't execute for epoch chain + if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + + // if states are already synced, don't execute this stage + if s.state.status.statesSynced { + return + } + // only execute this stage in fast/snap sync mode and once we reach to pivot if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() || s.state.status.statesSynced { @@ -72,21 +84,21 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever // if currentHead >= maxHeight { // return nil // } - // currProgress := s.state.CurrentBlockNumber() // targetHeight := s.state.currentCycle.TargetHeight - // if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error { - // if currProgress, err = s.CurrentStageProgress(etx); err != nil { - // return err - // } - // return nil - // }); errV != nil { - // return errV - // } + currProgress := uint64(0) + if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error { + if currProgress, err = s.CurrentStageProgress(etx); err != nil { + return err + } + return nil + }); errV != nil { + return errV + } + if currProgress >= s.state.status.pivotBlock.NumberU64() { + return nil + } - // if currProgress >= targetHeight { - // return nil - // } useInternalTx := tx == nil if useInternalTx { var err error @@ -109,6 +121,8 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever scheme := 
sss.configs.bc.TrieDB().Scheme() sdm := newFullStateDownloadManager(sss.configs.bc.ChainDb(), scheme, tx, sss.configs.bc, sss.configs.concurrency, s.state.logger) sdm.setRootHash(currentBlockRootHash) + + sdm.SyncStarted() var wg sync.WaitGroup for i := 0; i < s.state.config.Concurrency; i++ { wg.Add(1) @@ -128,6 +142,12 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever // states should be fully synced in this stage s.state.status.statesSynced = true + if err := sss.saveProgress(s, tx); err != nil { + sss.configs.logger.Warn().Err(err). + Uint64("pivot block number", s.state.status.pivotBlock.NumberU64()). + Msg(WrapStagedSyncMsg("save progress for statesync stage failed")) + } + /* gbm := s.state.gbm @@ -169,8 +189,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full return default: } - accountTasks, codes, storages, healtask, codetask, err := sdm.GetNextBatch() - if len(accountTasks)+len(codes)+len(storages.accounts)+len(healtask.hashes)+len(codetask.hashes) == 0 || err != nil { + accountTasks, codes, storages, healtask, codetask, nTasks, err := sdm.GetNextBatch() + if nTasks == 0 || err != nil { select { case <-ctx.Done(): return @@ -184,8 +204,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full task := accountTasks[0] origin := task.Next limit := task.Last - root := sdm.root - cap := maxRequestSize + root := task.root + cap := task.cap retAccounts, proof, stid, err := sss.configs.protocol.GetAccountRange(ctx, root, origin, limit, uint64(cap)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { @@ -234,10 +254,10 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full } else if len(storages.accounts) > 0 { - root := sdm.root + root := storages.root roots := storages.roots accounts := storages.accounts - cap := maxRequestSize + cap := storages.cap origin := storages.origin limit := storages.limit mainTask := storages.mainTask @@ -276,13 +296,14 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full } else { // assign trie node Heal Tasks if len(healtask.hashes) > 0 { - root := sdm.root + root := healtask.root task := healtask.task hashes := healtask.hashes pathsets := healtask.pathsets paths := healtask.paths + bytes := healtask.bytes - nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, maxRequestSize) + nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, uint64(bytes)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { sss.configs.protocol.StreamFailed(stid, "GetTrieNodes failed") @@ -316,7 +337,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full if len(codetask.hashes) > 0 { task := codetask.task hashes := codetask.hashes - retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, maxRequestSize) + bytes := codetask.bytes + retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, uint64(bytes)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { sss.configs.protocol.StreamFailed(stid, "GetByteCodes failed") @@ -354,7 +376,7 @@ func (sss *StageFullStateSync) downloadByteCodes(ctx context.Context, sdm *FullS for _, codeTask := range codeTasks { // try to get byte codes from remote peer // if any of them failed, the stid will be the id of the failed stream - retCodes, stid, err := 
sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, maxRequestSize) + retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, uint64(codeTask.cap)) if err != nil { return stid, err } @@ -413,7 +435,7 @@ func (stg *StageFullStateSync) saveProgress(s *StageState, tx kv.RwTx) (err erro } // save progress - if err = s.Update(tx, s.state.CurrentBlockNumber()); err != nil { + if err = s.Update(tx, s.state.status.pivotBlock.NumberU64()); err != nil { utils.Logger().Error(). Err(err). Msgf("[STAGED_SYNC] saving progress for block States stage failed") diff --git a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index 03340eb15..1782068b2 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -16,6 +16,7 @@ import ( "github.com/harmony-one/harmony/internal/utils" syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -405,6 +406,11 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs continue } + // TODO: enable this part after make sure all works well + // if !s.canExecute(stage) { + // continue + // } + if err := s.runStage(ctx, stage, db, tx, firstCycle, s.invalidBlock.Active); err != nil { utils.Logger().Error(). Err(err). @@ -431,6 +437,55 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs return nil } +func (s *StagedStreamSync) canExecute(stage *Stage) bool { + // check range mode + if stage.RangeMode != LongRangeAndShortRange { + isLongRange := s.initSync + switch stage.RangeMode { + case OnlyLongRange: + if !isLongRange { + return false + } + case OnlyShortRange: + if isLongRange { + return false + } + default: + return false + } + } + + // check chain execution + if stage.ChainExecutionMode != AllChains { + shardID := s.bc.ShardID() + isBeaconNode := s.isBeaconNode + isShardChain := shardID != shard.BeaconChainShardID + isEpochChain := shardID == shard.BeaconChainShardID && !isBeaconNode + switch stage.ChainExecutionMode { + case AllChainsExceptEpochChain: + if isEpochChain { + return false + } + case OnlyBeaconNode: + if !isBeaconNode { + return false + } + case OnlyShardChain: + if !isShardChain { + return false + } + case OnlyEpochChain: + if !isEpochChain { + return false + } + default: + return false + } + } + + return true +} + // CreateView creates a view for a given db func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) error) error { if tx != nil { diff --git a/api/service/stagedstreamsync/stages.go b/api/service/stagedstreamsync/stages.go index 6ad9e4519..33f3b293b 100644 --- a/api/service/stagedstreamsync/stages.go +++ b/api/service/stagedstreamsync/stages.go @@ -8,15 +8,16 @@ import ( type SyncStageID string const ( - Heads SyncStageID = "Heads" // Heads are downloaded - ShortRange SyncStageID = "ShortRange" // short range - SyncEpoch SyncStageID = "SyncEpoch" // epoch sync - BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified - States SyncStageID = "States" // will construct most recent state from downloaded blocks - StateSync SyncStageID = "StateSync" // State sync - Receipts SyncStageID = "Receipts" // Receipts - LastMile SyncStageID = "LastMile" // 
update blocks after sync and update last mile blocks as well - Finish SyncStageID = "Finish" // Nominal stage after all other stages + Heads SyncStageID = "Heads" // Heads are downloaded + ShortRange SyncStageID = "ShortRange" // short range + SyncEpoch SyncStageID = "SyncEpoch" // epoch sync + BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified + States SyncStageID = "States" // will construct most recent state from downloaded blocks + StateSync SyncStageID = "StateSync" // State sync + FullStateSync SyncStageID = "FullStateSync" // Full State Sync + Receipts SyncStageID = "Receipts" // Receipts + LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well + Finish SyncStageID = "Finish" // Nominal stage after all other stages ) // GetStageName returns the stage name in string diff --git a/api/service/stagedstreamsync/state_sync_full.go b/api/service/stagedstreamsync/state_sync_full.go index c98dcbafd..14cdb1f59 100644 --- a/api/service/stagedstreamsync/state_sync_full.go +++ b/api/service/stagedstreamsync/state_sync_full.go @@ -108,6 +108,11 @@ var ( type accountTask struct { id uint64 //unique id for account task + root common.Hash + origin common.Hash + limit common.Hash + cap int + // These fields get serialized to leveldb on shutdown Next common.Hash // Next account to sync in this interval Last common.Hash // Last account to sync in this interval @@ -229,16 +234,19 @@ type byteCodeTasksBundle struct { id uint64 //unique id for bytecode task bundle task *accountTask hashes []common.Hash + cap int } type storageTaskBundle struct { id uint64 //unique id for storage task bundle + root common.Hash accounts []common.Hash roots []common.Hash mainTask *accountTask subtask *storageTask origin common.Hash limit common.Hash + cap int } // healTask represents the sync task for healing the snap-synced chunk boundaries. 
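The new `root`, `origin`, `limit`, `cap` and `bytes` fields make every task bundle carry its own request parameters, so the worker loop can stop reaching back into the download manager for `sdm.root` and the global `maxRequestSize`. A self-contained sketch of that design, using simplified stand-in types rather than the structs above:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// maxRequestSize is assumed here for illustration; the real constant
// lives in the stagedstreamsync package.
const maxRequestSize = 512 * 1024

// accountRangeRequest mirrors the idea behind the extended accountTask:
// everything a worker needs to issue an account-range request travels
// with the task itself.
type accountRangeRequest struct {
	Root   common.Hash // state root the returned proof must verify against
	Origin common.Hash // first account hash in the requested range
	Limit  common.Hash // last account hash in the requested range
	Cap    int         // per-request response size cap in bytes
}

func newAccountRangeRequest(root, next, last common.Hash) accountRangeRequest {
	return accountRangeRequest{Root: root, Origin: next, Limit: last, Cap: maxRequestSize}
}

func main() {
	req := newAccountRangeRequest(
		common.HexToHash("0x01"),
		common.Hash{}, // start of the account space
		common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
	)
	fmt.Printf("account range %x..%x, cap %d bytes\n", req.Origin[:2], req.Limit[:2], req.Cap)
}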
@@ -251,6 +259,7 @@ type healTask struct { pathsets []*message.TrieNodePathSet task *healTask root common.Hash + bytes int byteCodeReq bool } @@ -259,7 +268,6 @@ type tasks struct { storageTasks map[uint64]*storageTaskBundle // Set of trie node tasks currently queued for retrieval, indexed by path codeTasks map[uint64]*byteCodeTasksBundle // Set of byte code tasks currently queued for retrieval, indexed by hash healer map[uint64]*healTask - snapped bool // Flag to signal that snap phase is done } func newTasks() *tasks { @@ -268,7 +276,6 @@ func newTasks() *tasks { storageTasks: make(map[uint64]*storageTaskBundle, 0), codeTasks: make(map[uint64]*byteCodeTasksBundle), healer: make(map[uint64]*healTask, 0), - snapped: false, } } @@ -399,8 +406,6 @@ type FullStateDownloadManager struct { storageSynced uint64 // Number of storage slots downloaded storageBytes common.StorageSize // Number of storage trie bytes persisted to disk - pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown - stateWriter ethdb.Batch // Shared batch writer used for persisting raw states accountHealed uint64 // Number of accounts downloaded during the healing stage accountHealedBytes common.StorageSize // Number of raw account bytes persisted to disk during the healing stage @@ -420,6 +425,9 @@ type FullStateDownloadManager struct { bytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk bytecodeHealDups uint64 // Number of bytecodes already processed bytecodeHealNops uint64 // Number of bytecodes not requested + + startTime time.Time // Time instance when snapshot sync started + logTime time.Time // Time instance when status was last reported } func newFullStateDownloadManager(db ethdb.KeyValueStore, @@ -430,18 +438,19 @@ func newFullStateDownloadManager(db ethdb.KeyValueStore, logger zerolog.Logger) *FullStateDownloadManager { return &FullStateDownloadManager{ - db: db, - scheme: scheme, - bc: bc, - stateWriter: db.NewBatch(), - tx: tx, - keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState), - concurrency: concurrency, - logger: logger, - tasks: newTasks(), - requesting: newTasks(), - processing: newTasks(), - retries: newTasks(), + db: db, + scheme: scheme, + bc: bc, + stateWriter: db.NewBatch(), + tx: tx, + keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState), + concurrency: concurrency, + logger: logger, + tasks: newTasks(), + requesting: newTasks(), + processing: newTasks(), + retries: newTasks(), + trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk } } @@ -531,6 +540,12 @@ func (s *FullStateDownloadManager) commitHealer(force bool) { utils.Logger().Debug().Str("type", "trienodes").Interface("bytes", common.StorageSize(batch.ValueSize())).Msg("Persisted set of healing data") } +func (s *FullStateDownloadManager) SyncStarted() { + if s.startTime == (time.Time{}) { + s.startTime = time.Now() + } +} + func (s *FullStateDownloadManager) SyncCompleted() { defer func() { // Persist any progress, independent of failure for _, task := range s.tasks.accountTasks { @@ -556,7 +571,8 @@ func (s *FullStateDownloadManager) SyncCompleted() { utils.Logger().Debug().Interface("root", s.root).Msg("Terminating snapshot sync cycle") }() - utils.Logger().Debug().Msg("Snapshot sync already completed") + elapsed := time.Since(s.startTime) + utils.Logger().Debug().Interface("elapsed", elapsed).Msg("Snapshot sync already completed") } // getNextBatch returns objects with a maximum of n state download @@ -566,38 +582,30 @@ func (s 
*FullStateDownloadManager) GetNextBatch() (accounts []*accountTask, storages *storageTaskBundle, healtask *healTask, codetask *healTask, + nItems int, err error) { s.lock.Lock() defer s.lock.Unlock() - accounts, codes, storages, healtask, codetask = s.getBatchFromRetries() - nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes) + accounts, codes, storages, healtask, codetask, nItems = s.getBatchFromRetries() if nItems > 0 { return } if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 { - if nItems == 0 { - s.SyncCompleted() - } + s.SyncCompleted() return } // Refill available tasks from the scheduler. - withHealTasks := true - if healtask != nil || codetask != nil { - withHealTasks = false - } - newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask := s.getBatchFromUnprocessed(withHealTasks) + newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask, nItems := s.getBatchFromUnprocessed() accounts = append(accounts, newAccounts...) codes = append(codes, newCodes...) storages = newStorageTaskBundle - if withHealTasks { - healtask = newHealTask - codetask = newCodeTask - } + healtask = newHealTask + codetask = newCodeTask return } @@ -714,7 +722,7 @@ func (s *FullStateDownloadManager) loadSyncStatus() { // Either we've failed to decode the previous state, or there was none. // Start a fresh sync by chunking up the account range and scheduling // them for retrieval. - s.tasks.accountTasks = nil + s.tasks = newTasks() s.accountSynced, s.accountBytes = 0, 0 s.bytecodeSynced, s.bytecodeBytes = 0, 0 s.storageSynced, s.storageBytes = 0, 0 @@ -921,16 +929,18 @@ func (s *FullStateDownloadManager) updateStats(written, duplicate, unexpected in // getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download // tasks to send to the remote peer. -func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( +func (s *FullStateDownloadManager) getBatchFromUnprocessed() ( accounts []*accountTask, codes []*byteCodeTasksBundle, storages *storageTaskBundle, healtask *healTask, - codetask *healTask) { + codetask *healTask, + count int) { // over trie nodes as those can be written to disk and forgotten about. 
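The batching rework above boils down to one rule: hand out retries first, refill from unprocessed tasks otherwise, and report an explicit task count so `runStateWorkerLoop` can tell an empty batch from a filled one. A rough control-flow sketch with placeholder types (not the real download manager):

package main

import "fmt"

// bundle stands in for the account/code/storage/heal task bundles.
type bundle struct{ tasks []string }

func (b *bundle) count() int { return len(b.tasks) }

// nextBatch mirrors the GetNextBatch flow: serve retries first, signal
// completion when nothing is queued or pending, otherwise refill from
// the unprocessed set.
func nextBatch(retries, unprocessed *bundle, pending int, onComplete func()) ([]string, int) {
	if retries.count() > 0 {
		out := retries.tasks
		retries.tasks = nil
		return out, len(out)
	}
	if unprocessed.count() == 0 && pending == 0 {
		onComplete() // corresponds to SyncCompleted()
		return nil, 0
	}
	out := unprocessed.tasks
	unprocessed.tasks = nil
	return out, len(out)
}

func main() {
	retries := &bundle{}
	todo := &bundle{tasks: []string{"account range 0x00..0xff"}}
	batch, n := nextBatch(retries, todo, 0, func() { fmt.Println("sync completed") })
	fmt.Printf("dispatching %d task(s): %v\n", n, batch)
	// a second call with nothing left queued reports completion instead
	nextBatch(retries, todo, 0, func() { fmt.Println("sync completed") })
}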
codes = make([]*byteCodeTasksBundle, 0) accounts = make([]*accountTask, 0) + count = 0 for i, task := range s.tasks.accountTasks { // Stop when we've gathered enough requests @@ -956,12 +966,18 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( break } + task.root = s.root + task.origin = task.Next + task.limit = task.Last + task.cap = maxRequestSize + task.requested = true s.tasks.accountTasks[i].requested = true accounts = append(accounts, task) s.requesting.addAccountTask(task.id, task) s.tasks.addAccountTask(task.id, task) // one task account is enough for an stream + count = len(accounts) return } @@ -997,6 +1013,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( id: taskID, hashes: hashes, task: task, + cap: maxRequestSize, } codes = append(codes, bytecodeTask) @@ -1005,12 +1022,14 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( // Stop when we've gathered enough requests if totalHashes >= maxCodeRequestCount { + count = totalHashes return } } // if we found some codes, can assign it to node if totalHashes > 0 { + count = totalHashes return } @@ -1020,14 +1039,8 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( continue } - // TODO: check cap calculations (shouldn't give us big chunk) - // if cap > maxRequestSize { - // cap = maxRequestSize - // } - // if cap < minRequestSize { // Don't bother with peers below a bare minimum performance - // cap = minRequestSize - // } - storageSets := maxRequestSize / 1024 + cap := maxRequestSize + storageSets := cap / 1024 storages = &storageTaskBundle{ accounts: make([]common.Hash, 0, storageSets), @@ -1089,23 +1102,21 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( storages.origin = storages.subtask.Next storages.limit = storages.subtask.Last } + storages.root = s.root + storages.cap = cap s.tasks.addStorageTaskBundle(taskID, storages) s.requesting.addStorageTaskBundle(taskID, storages) - + count = len(storages.accounts) return } if len(storages.accounts) > 0 { - return - } - - if !withHealTasks { + count = len(storages.accounts) return } // Sync phase done, run heal phase - - // Iterate over pending tasks and try to find a peer to retrieve with + // Iterate over pending tasks for (len(s.tasks.healer) > 0 && len(s.tasks.healer[0].hashes) > 0) || s.scheduler.Pending() > 0 { // If there are not enough trie tasks queued to fully assign, fill the // queue from the state sync scheduler. 
The trie synced schedules these @@ -1129,7 +1140,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( // If all the heal tasks are bytecodes or already downloading, bail if len(s.tasks.healer[0].trieTasks) == 0 { - return + break } // Generate the network query and send it to the peer // if cap > maxTrieRequestCount { @@ -1177,6 +1188,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( pathsets: pathsets, root: s.root, task: s.tasks.healer[0], + bytes: maxRequestSize, byteCodeReq: false, } @@ -1184,6 +1196,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( s.requesting.addHealerTask(taskID, healtask) if len(hashes) > 0 { + count = len(hashes) return } } @@ -1205,7 +1218,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( // If all the heal tasks are trienodes or already downloading, bail if len(s.tasks.healer[0].codeTasks) == 0 { - return + break } // Task pending retrieval, try to find an idle peer. If no such peer // exists, we probably assigned tasks for all (or they are stateless). @@ -1243,9 +1256,10 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( id: taskID, hashes: hashes, task: s.tasks.healer[0], + bytes: maxRequestSize, byteCodeReq: true, } - + count = len(hashes) s.tasks.healer[taskID] = codetask s.requesting.addHealerTask(taskID, healtask) } @@ -1272,7 +1286,8 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( codes []*byteCodeTasksBundle, storages *storageTaskBundle, healtask *healTask, - codetask *healTask) { + codetask *healTask, + count int) { // over trie nodes as those can be written to disk and forgotten about. accounts = make([]*accountTask, 0) @@ -1290,6 +1305,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } if len(accounts) > 0 { + count = len(accounts) return } @@ -1301,6 +1317,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } if len(codes) > 0 { + count = len(codes) return } @@ -1316,10 +1333,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } s.requesting.addStorageTaskBundle(storages.id, storages) s.retries.deleteStorageTaskBundle(storages.id) - return - } - - if len(storages.accounts) > 0 { + count = len(storages.accounts) return } @@ -1338,6 +1352,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } s.requesting.addHealerTask(id, task) s.retries.deleteHealerTask(id) + count = len(task.hashes) return } if task.byteCodeReq { @@ -1352,11 +1367,13 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } s.requesting.addHealerTask(id, task) s.retries.deleteHealerTask(id) + count = len(task.hashes) return } } } + count = 0 return } @@ -1371,14 +1388,18 @@ func (s *FullStateDownloadManager) HandleRequestError(accounts []*accountTask, s.lock.Lock() defer s.lock.Unlock() - for _, task := range accounts { - s.requesting.deleteAccountTask(task.id) - s.retries.addAccountTask(task.id, task) + if accounts != nil && len(accounts) > 0 { + for _, task := range accounts { + s.requesting.deleteAccountTask(task.id) + s.retries.addAccountTask(task.id, task) + } } - for _, code := range codes { - s.requesting.deleteCodeTask(code.id) - s.retries.addCodeTask(code.id, code) + if codes != nil && len(codes) > 0 { + for _, code := range codes { + s.requesting.deleteCodeTask(code.id) + s.retries.addCodeTask(code.id, code) + } } if storages != nil { diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go 
index e6879a523..c3bc585f2 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -90,6 +90,7 @@ func CreateStagedSync(ctx context.Context, stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, extractReceiptHashes, config.LogProgress) stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress) stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress) + stageFullStateSyncCfg := NewStageFullStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress) stageReceiptsCfg := NewStageReceiptsCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB) stageFinishCfg := NewStageFinishCfg(mainDB) @@ -103,6 +104,7 @@ func CreateStagedSync(ctx context.Context, stageShortRangeCfg, stageBodiesCfg, stageStateSyncCfg, + stageFullStateSyncCfg, stageStatesCfg, stageReceiptsCfg, lastMileCfg, From 5443bf7d94ed92efd69a39c999d0f9a88033d35a Mon Sep 17 00:00:00 2001 From: Diego Nava <8563843+diego1q2w@users.noreply.github.com> Date: Mon, 8 Jan 2024 20:50:35 -0600 Subject: [PATCH 5/5] reduce internal voting power to 10% devnet (#4599) --- internal/configs/sharding/partner.go | 10 ++++++++++ internal/params/config.go | 14 ++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/internal/configs/sharding/partner.go b/internal/configs/sharding/partner.go index 99ea96141..93cc4139d 100644 --- a/internal/configs/sharding/partner.go +++ b/internal/configs/sharding/partner.go @@ -40,6 +40,8 @@ const ( func (ps partnerSchedule) InstanceForEpoch(epoch *big.Int) Instance { switch { + case params.PartnerChainConfig.IsDevnetExternalEpoch(epoch): + return partnerV3 case params.PartnerChainConfig.IsHIP30(epoch): return partnerV2 case epoch.Cmp(params.PartnerChainConfig.StakingEpoch) >= 0: @@ -111,3 +113,11 @@ var partnerV2 = MustNewInstance( hip30CollectionAddressTestnet, partnerReshardingEpoch, PartnerSchedule.BlocksPerEpoch(), ) +var partnerV3 = MustNewInstance( + 2, 5, 1, 0, + numeric.MustNewDecFromStr("0.1"), genesis.TNHarmonyAccounts, + genesis.TNFoundationalAccounts, emptyAllowlist, + feeCollectorsDevnet[1], numeric.MustNewDecFromStr("0.25"), + hip30CollectionAddressTestnet, partnerReshardingEpoch, + PartnerSchedule.BlocksPerEpoch(), +) diff --git a/internal/params/config.go b/internal/params/config.go index 15ca7d287..703dea062 100644 --- a/internal/params/config.go +++ b/internal/params/config.go @@ -76,6 +76,7 @@ var ( HIP30Epoch: big.NewInt(1673), // 2023-11-02 17:30:00+00:00 BlockGas30MEpoch: big.NewInt(1673), // 2023-11-02 17:30:00+00:00 MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // TestnetChainConfig contains the chain parameters to run a node on the harmony test network. @@ -120,6 +121,7 @@ var ( HIP30Epoch: big.NewInt(2176), // 2023-10-12 10:00:00+00:00 BlockGas30MEpoch: big.NewInt(2176), // 2023-10-12 10:00:00+00:00 MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // PangaeaChainConfig contains the chain parameters for the Pangaea network. // All features except for CrossLink are enabled at launch. @@ -164,6 +166,7 @@ var ( HIP30Epoch: EpochTBD, BlockGas30MEpoch: big.NewInt(0), MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // PartnerChainConfig contains the chain parameters for the Partner network. 
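In `InstanceForEpoch` the newest fork has to be matched first, otherwise an earlier `case` keeps winning once both epochs have passed; that is why the `IsDevnetExternalEpoch` check sits above the HIP-30 one. A minimal sketch of that ordering rule, with made-up epoch numbers (the real schedule is network configuration, not part of this patch):

package main

import (
	"fmt"
	"math/big"
)

// isForked reports whether epoch is at or past the fork epoch; a nil
// fork epoch means the fork is not scheduled.
func isForked(fork, epoch *big.Int) bool {
	return fork != nil && epoch != nil && fork.Cmp(epoch) <= 0
}

// instanceForEpoch shows why newer forks are listed first: with the two
// cases swapped, epoch 12 would still resolve to "partnerV2".
func instanceForEpoch(epoch *big.Int) string {
	devnetExternalEpoch := big.NewInt(10) // hypothetical
	hip30Epoch := big.NewInt(7)           // hypothetical
	switch {
	case isForked(devnetExternalEpoch, epoch):
		return "partnerV3 (internal voting power reduced to 10%)"
	case isForked(hip30Epoch, epoch):
		return "partnerV2"
	default:
		return "partnerV1"
	}
}

func main() {
	for _, e := range []int64{5, 8, 12} {
		fmt.Printf("epoch %d -> %s\n", e, instanceForEpoch(big.NewInt(e)))
	}
}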
@@ -209,6 +212,7 @@ var ( HIP30Epoch: big.NewInt(7), BlockGas30MEpoch: big.NewInt(7), MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // StressnetChainConfig contains the chain parameters for the Stress test network. @@ -254,6 +258,7 @@ var ( HIP30Epoch: EpochTBD, BlockGas30MEpoch: big.NewInt(0), MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // LocalnetChainConfig contains the chain parameters to run for local development. @@ -298,6 +303,7 @@ var ( HIP30Epoch: EpochTBD, BlockGas30MEpoch: big.NewInt(0), MaxRateEpoch: EpochTBD, + DevnetExternalEpoch: EpochTBD, } // AllProtocolChanges ... @@ -344,6 +350,7 @@ var ( big.NewInt(0), // BlockGas30M big.NewInt(0), // BlockGas30M big.NewInt(0), // MaxRateEpoch + big.NewInt(0), } // TestChainConfig ... @@ -390,6 +397,7 @@ var ( big.NewInt(0), // HIP30Epoch big.NewInt(0), // BlockGas30M big.NewInt(0), // MaxRateEpoch + big.NewInt(0), } // TestRules ... @@ -554,6 +562,8 @@ type ChainConfig struct { // 4. Change the minimum validator commission from 5 to 7% (all nets) HIP30Epoch *big.Int `json:"hip30-epoch,omitempty"` + DevnetExternalEpoch *big.Int `json:"devnet-external-epoch,omitempty"` + BlockGas30MEpoch *big.Int `json:"block-gas-30m-epoch,omitempty"` // MaxRateEpoch will make sure the validator max-rate is at least equal to the minRate + the validator max-rate-increase @@ -814,6 +824,10 @@ func (c *ChainConfig) IsHIP30(epoch *big.Int) bool { return isForked(c.HIP30Epoch, epoch) } +func (c *ChainConfig) IsDevnetExternalEpoch(epoch *big.Int) bool { + return isForked(c.DevnetExternalEpoch, epoch) +} + func (c *ChainConfig) IsMaxRate(epoch *big.Int) bool { return isForked(c.MaxRateEpoch, epoch) }
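A quick sanity check for the new gate is a table-driven test against a throwaway config. This is only a sketch, written under the assumption that `isForked` treats the activation epoch itself as already forked; it is not part of the patch:

package params

import (
	"math/big"
	"testing"
)

func TestIsDevnetExternalEpochSketch(t *testing.T) {
	cfg := &ChainConfig{DevnetExternalEpoch: big.NewInt(7)}
	cases := []struct {
		epoch *big.Int
		want  bool
	}{
		{big.NewInt(6), false}, // one epoch before activation
		{big.NewInt(7), true},  // activation epoch
		{big.NewInt(9), true},  // after activation
	}
	for _, c := range cases {
		if got := cfg.IsDevnetExternalEpoch(c.epoch); got != c.want {
			t.Errorf("IsDevnetExternalEpoch(%v) = %v, want %v", c.epoch, got, c.want)
		}
	}
}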