Merge pull request #4503 from harmony-one/hip30/testing

HIP-30 Shard Reduction Process
Soph committed 1 year ago (via GitHub)
commit 2378b2d880
Changed files (lines changed in parentheses):

  1. Makefile (4)
  2. cmd/harmony/default.go (13)
  3. cmd/harmony/flags.go (64)
  4. cmd/harmony/flags_test.go (89)
  5. cmd/harmony/main.go (57)
  6. consensus/consensus_service.go (2)
  7. consensus/metrics.go (70)
  8. core/blockchain.go (4)
  9. core/blockchain_impl.go (25)
  10. core/blockchain_stub.go (14)
  11. core/epochchain.go (6)
  12. core/genesis.go (11)
  13. core/preimages.go (374)
  14. core/rawdb/accessors_state.go (62)
  15. core/rawdb/schema.go (4)
  16. core/state_processor.go (268)
  17. internal/chain/reward.go (7)
  18. internal/cli/flag.go (21)
  19. internal/cli/parse.go (15)
  20. internal/configs/harmony/harmony.go (10)
  21. internal/configs/node/config.go (1)
  22. internal/configs/sharding/testnet.go (1)
  23. internal/params/config.go (4)
  24. internal/params/config_test.go (17)
  25. internal/params/protocol_params.go (2)
  26. internal/shardchain/shardchains.go (7)
  27. node/node.go (38)
  28. node/node_newblock.go (17)
  29. node/worker/worker.go (48)
  30. rpc/preimages.go (35)
  31. rpc/rpc.go (5)
  32. staking/reward/values.go (7)

@ -56,6 +56,7 @@ trace-pointer:
bash ./scripts/go_executable_build.sh -t
debug:
rm -rf .dht-127.0.0.1*
bash ./test/debug.sh
debug-kill:
@ -167,3 +168,6 @@ docker:
travis_go_checker:
bash ./scripts/travis_go_checker.sh
travis_rpc_checker:
bash ./scripts/travis_rpc_checker.sh

@ -65,6 +65,7 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
RateLimterEnabled: true,
RequestsPerSecond: nodeconfig.DefaultRPCRateLimit,
EvmCallTimeout: nodeconfig.DefaultEvmCallTimeout,
PreimagesEnabled: false,
},
BLSKeys: harmonyconfig.BlsConfig{
KeyDir: "./.hmy/blskeys",
@ -149,6 +150,13 @@ var defaultRevertConfig = harmonyconfig.RevertConfig{
RevertTo: 0,
}
var defaultPreimageConfig = harmonyconfig.PreimageConfig{
ImportFrom: "",
ExportTo: "",
GenerateStart: 0,
GenerateEnd: 0,
}
var defaultLogContext = harmonyconfig.LogContext{
IP: "127.0.0.1",
Port: 9000,
@ -291,6 +299,11 @@ func getDefaultRevertConfigCopy() harmonyconfig.RevertConfig {
return config
}
func getDefaultPreimageConfigCopy() harmonyconfig.PreimageConfig {
config := defaultPreimageConfig
return config
}
func getDefaultLogContextCopy() harmonyconfig.LogContext {
config := defaultLogContext
return config

@ -90,6 +90,7 @@ var (
rpcOptFlags = []cli.Flag{
rpcDebugEnabledFlag,
rpcPreimagesEnabledFlag,
rpcEthRPCsEnabledFlag,
rpcStakingRPCsEnabledFlag,
rpcLegacyRPCsEnabledFlag,
@ -205,6 +206,13 @@ var (
revertBeforeFlag,
}
preimageFlags = []cli.Flag{
preimageImportFlag,
preimageExportFlag,
preimageGenerateStartFlag,
preimageGenerateEndFlag,
}
legacyRevertFlags = []cli.Flag{
legacyRevertBeaconFlag,
legacyRevertBeforeFlag,
@ -370,6 +378,7 @@ func getRootFlags() []cli.Flag {
flags = append(flags, sysFlags...)
flags = append(flags, devnetFlags...)
flags = append(flags, revertFlags...)
flags = append(flags, preimageFlags...)
flags = append(flags, legacyMiscFlags...)
flags = append(flags, prometheusFlags...)
flags = append(flags, syncFlags...)
@ -827,6 +836,12 @@ var (
DefValue: defaultConfig.RPCOpt.DebugEnabled,
Hidden: true,
}
rpcPreimagesEnabledFlag = cli.BoolFlag{
Name: "rpc.preimages",
Usage: "enable preimages export api",
DefValue: defaultConfig.RPCOpt.PreimagesEnabled,
Hidden: true, // not for end users
}
rpcEthRPCsEnabledFlag = cli.BoolFlag{
Name: "rpc.eth",
@ -879,6 +894,9 @@ func applyRPCOptFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
if cli.IsFlagChanged(cmd, rpcDebugEnabledFlag) {
config.RPCOpt.DebugEnabled = cli.GetBoolFlagValue(cmd, rpcDebugEnabledFlag)
}
if cli.IsFlagChanged(cmd, rpcPreimagesEnabledFlag) {
config.RPCOpt.PreimagesEnabled = cli.GetBoolFlagValue(cmd, rpcPreimagesEnabledFlag)
}
if cli.IsFlagChanged(cmd, rpcEthRPCsEnabledFlag) {
config.RPCOpt.EthRPCsEnabled = cli.GetBoolFlagValue(cmd, rpcEthRPCsEnabledFlag)
}
@ -1656,6 +1674,52 @@ func applyRevertFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
}
}
var (
preimageImportFlag = cli.StringFlag{
Name: "preimage.import",
Usage: "Import pre-images from CSV file",
Hidden: true,
DefValue: defaultPreimageConfig.ImportFrom,
}
preimageExportFlag = cli.StringFlag{
Name: "preimage.export",
Usage: "Export pre-images to CSV file",
Hidden: true,
DefValue: defaultPreimageConfig.ExportTo,
}
preimageGenerateStartFlag = cli.Uint64Flag{
Name: "preimage.start",
Usage: "The block number from which pre-images are to be generated",
Hidden: true,
DefValue: defaultPreimageConfig.GenerateStart,
}
preimageGenerateEndFlag = cli.Uint64Flag{
Name: "preimage.end",
Usage: "The block number upto and including which pre-images are to be generated",
Hidden: true,
DefValue: defaultPreimageConfig.GenerateEnd,
}
)
func applyPreimageFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
if cli.HasFlagsChanged(cmd, preimageFlags) {
cfg := getDefaultPreimageConfigCopy()
config.Preimage = &cfg
}
if cli.IsFlagChanged(cmd, preimageImportFlag) {
config.Preimage.ImportFrom = cli.GetStringFlagValue(cmd, preimageImportFlag)
}
if cli.IsFlagChanged(cmd, preimageExportFlag) {
config.Preimage.ExportTo = cli.GetStringFlagValue(cmd, preimageExportFlag)
}
if cli.IsFlagChanged(cmd, preimageGenerateStartFlag) {
config.Preimage.GenerateStart = cli.GetUint64FlagValue(cmd, preimageGenerateStartFlag)
}
if cli.IsFlagChanged(cmd, preimageGenerateEndFlag) {
config.Preimage.GenerateEnd = cli.GetUint64FlagValue(cmd, preimageGenerateEndFlag)
}
}
var (
legacyPortFlag = cli.IntFlag{
Name: "port",

@ -92,6 +92,7 @@ func TestHarmonyFlags(t *testing.T) {
RateLimterEnabled: true,
RequestsPerSecond: 1000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
WS: harmonyconfig.WsConfig{
Enabled: true,
@ -752,6 +753,7 @@ func TestRPCOptFlags(t *testing.T) {
RateLimterEnabled: true,
RequestsPerSecond: 1000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
},
@ -766,6 +768,7 @@ func TestRPCOptFlags(t *testing.T) {
RateLimterEnabled: true,
RequestsPerSecond: 1000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
},
@ -780,6 +783,7 @@ func TestRPCOptFlags(t *testing.T) {
RateLimterEnabled: true,
RequestsPerSecond: 1000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
},
@ -794,6 +798,7 @@ func TestRPCOptFlags(t *testing.T) {
RateLimterEnabled: true,
RequestsPerSecond: 1000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
},
@ -808,6 +813,7 @@ func TestRPCOptFlags(t *testing.T) {
RateLimterEnabled: true,
RequestsPerSecond: 1000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
},
@ -822,6 +828,7 @@ func TestRPCOptFlags(t *testing.T) {
RateLimterEnabled: true,
RequestsPerSecond: 1000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
},
@ -836,6 +843,7 @@ func TestRPCOptFlags(t *testing.T) {
RateLimterEnabled: true,
RequestsPerSecond: 2000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
},
@ -850,6 +858,7 @@ func TestRPCOptFlags(t *testing.T) {
RateLimterEnabled: false,
RequestsPerSecond: 2000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
},
@ -864,6 +873,22 @@ func TestRPCOptFlags(t *testing.T) {
RateLimterEnabled: true,
RequestsPerSecond: 1000,
EvmCallTimeout: "10s",
PreimagesEnabled: defaultConfig.RPCOpt.PreimagesEnabled,
},
},
{
args: []string{"--rpc.preimages"},
expConfig: harmonyconfig.RpcOptConfig{
DebugEnabled: false,
EthRPCsEnabled: true,
StakingRPCsEnabled: true,
LegacyRPCsEnabled: true,
RpcFilterFile: "./.hmy/rpc_filter.txt",
RateLimterEnabled: true,
RequestsPerSecond: 1000,
EvmCallTimeout: defaultConfig.RPCOpt.EvmCallTimeout,
PreimagesEnabled: true,
},
},
}
@ -1509,6 +1534,70 @@ func TestRevertFlags(t *testing.T) {
}
}
func TestPreimageFlags(t *testing.T) {
tests := []struct {
args []string
expConfig *harmonyconfig.PreimageConfig
expErr error
}{
{
args: []string{},
expConfig: nil,
},
{
args: []string{"--preimage.import", "/path/to/source.csv"},
expConfig: &harmonyconfig.PreimageConfig{
ImportFrom: "/path/to/source.csv",
ExportTo: defaultPreimageConfig.ExportTo,
GenerateStart: defaultPreimageConfig.GenerateStart,
GenerateEnd: defaultPreimageConfig.GenerateEnd,
},
},
{
args: []string{"--preimage.export", "/path/to/destination.csv"},
expConfig: &harmonyconfig.PreimageConfig{
ImportFrom: defaultPreimageConfig.ImportFrom,
ExportTo: "/path/to/destination.csv",
GenerateStart: defaultPreimageConfig.GenerateStart,
GenerateEnd: defaultPreimageConfig.GenerateEnd,
},
},
{
args: []string{"--preimage.start", "1"},
expConfig: &harmonyconfig.PreimageConfig{
ImportFrom: defaultPreimageConfig.ImportFrom,
ExportTo: defaultPreimageConfig.ExportTo,
GenerateStart: 1,
GenerateEnd: defaultPreimageConfig.GenerateEnd,
},
},
{
args: []string{"--preimage.end", "2"},
expConfig: &harmonyconfig.PreimageConfig{
ImportFrom: defaultPreimageConfig.ImportFrom,
ExportTo: defaultPreimageConfig.ExportTo,
GenerateStart: defaultPreimageConfig.GenerateStart,
GenerateEnd: 2,
},
},
}
for i, test := range tests {
ts := newFlagTestSuite(t, preimageFlags, applyPreimageFlags)
hc, err := ts.run(test.args)
if assErr := assertError(err, test.expErr); assErr != nil {
t.Fatalf("Test %v: %v", i, assErr)
}
if err != nil || test.expErr != nil {
continue
}
if !reflect.DeepEqual(hc.Preimage, test.expConfig) {
t.Errorf("Test %v:\n\t%+v\n\t%+v", i, hc.Preimage, test.expConfig)
}
ts.tearDown()
}
}
func TestDNSSyncFlags(t *testing.T) {
tests := []struct {
args []string

@ -246,6 +246,7 @@ func applyRootFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
applySysFlags(cmd, config)
applyDevnetFlags(cmd, config)
applyRevertFlags(cmd, config)
applyPreimageFlags(cmd, config)
applyPrometheusFlags(cmd, config)
applySyncFlags(cmd, config)
applyShardDataFlags(cmd, config)
@ -375,6 +376,57 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
}
}
//// code to handle pre-image export, import and generation
if hc.Preimage != nil {
if hc.Preimage.ImportFrom != "" {
if err := core.ImportPreimages(
currentNode.Blockchain(),
hc.Preimage.ImportFrom,
); err != nil {
fmt.Println("Error importing", err)
os.Exit(1)
}
os.Exit(0)
} else if exportPath := hc.Preimage.ExportTo; exportPath != "" {
if err := core.ExportPreimages(
currentNode.Blockchain(),
exportPath,
); err != nil {
fmt.Println("Error exporting", err)
os.Exit(1)
}
os.Exit(0)
// generation only requires a start block; end defaults to the current block
} else if hc.Preimage.GenerateStart > 0 {
chain := currentNode.Blockchain()
end := hc.Preimage.GenerateEnd
current := chain.CurrentBlock().NumberU64()
if end > current {
fmt.Printf(
"Cropping generate endpoint from %d to %d\n",
end, current,
)
end = current
}
if end == 0 {
end = current
}
fmt.Println("Starting generation")
if err := core.GeneratePreimages(
chain,
hc.Preimage.GenerateStart, end,
); err != nil {
fmt.Println("Error generating", err)
os.Exit(1)
}
fmt.Println("Generation successful")
os.Exit(0)
}
os.Exit(0)
}
startMsg := "==== New Harmony Node ===="
if hc.General.NodeType == nodeTypeExplorer {
startMsg = "==== New Explorer Node ===="
@ -452,6 +504,11 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
Msg("Start Rosetta failed")
}
go core.WritePreimagesMetricsIntoPrometheus(
currentNode.Blockchain(),
currentNode.Consensus.UpdatePreimageGenerationMetrics,
)
go listenOSSigAndShutDown(currentNode)
if !hc.General.IsOffline {
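The preimage block above has three mutually exclusive one-shot modes: the node imports, exports, or generates pre-images and then exits. A minimal sketch of the configs that would drive each branch (paths and numbers are hypothetical):

```go
package main

import (
	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
)

func main() {
	// Hypothetical values; each mode runs once and the process exits.
	_ = harmonyconfig.PreimageConfig{ImportFrom: "/data/preimages.csv"} // import mode
	_ = harmonyconfig.PreimageConfig{ExportTo: "/data/preimages.csv"}   // export mode
	// generate mode: start must be at least 2 (GeneratePreimages rejects
	// lower values); a GenerateEnd of 0 defaults to the current block.
	_ = harmonyconfig.PreimageConfig{GenerateStart: 2, GenerateEnd: 0}
}
```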

@ -95,7 +95,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
if len(allKeys) != 0 {
consensus.LeaderPubKey = &allKeys[0]
consensus.getLogger().Info().
Str("info", consensus.LeaderPubKey.Bytes.Hex()).Msg("My Leader")
Str("info", consensus.LeaderPubKey.Bytes.Hex()).Msg("Setting leader as first validator, because provided new keys")
} else {
consensus.getLogger().Error().
Msg("[UpdatePublicKeys] Participants is empty")

@ -9,6 +9,52 @@ import (
)
var (
preimageStartGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "blockchain",
Name: "preimage_start",
Help: "the first block for which pre-image generation ran locally",
ConstLabels: map[string]string{},
},
[]string{
"shard",
},
)
preimageEndGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "blockchain",
Name: "preimage_end",
Help: "the last block for which pre-image generation ran locally",
},
[]string{
"shard",
},
)
verifiedPreimagesGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "blockchain",
Name: "verified_preimages",
Help: "the number of verified preimages",
},
[]string{
"shard",
},
)
lastPreimageImportGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "blockchain",
Name: "last_preimage_import",
Help: "the last known block for which preimages were imported",
},
[]string{
"shard",
},
)
// consensusCounterVec is used to keep track of consensus reached
consensusCounterVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
@ -103,6 +149,26 @@ func (consensus *Consensus) UpdateLeaderMetrics(numCommits float64, blockNum flo
consensusCounterVec.With(prometheus.Labels{"consensus": "num_commits"}).Add(numCommits)
consensusGaugeVec.With(prometheus.Labels{"consensus": "num_commits"}).Set(numCommits)
}
func (consensus *Consensus) UpdatePreimageGenerationMetrics(
preimageStart uint64,
preimageEnd uint64,
lastPreimageImport uint64,
verifiedAddresses uint64,
shard uint32,
) {
if lastPreimageImport > 0 {
lastPreimageImportGauge.With(prometheus.Labels{"shard": fmt.Sprintf("%d", shard)}).Set(float64(lastPreimageImport))
}
if preimageStart > 0 {
preimageStartGauge.With(prometheus.Labels{"shard": fmt.Sprintf("%d", shard)}).Set(float64(preimageStart))
}
if preimageEnd > 0 {
preimageEndGauge.With(prometheus.Labels{"shard": fmt.Sprintf("%d", shard)}).Set(float64(preimageEnd))
}
if verifiedAddresses > 0 {
verifiedPreimagesGauge.With(prometheus.Labels{"shard": fmt.Sprintf("%d", shard)}).Set(float64(verifiedAddresses))
}
}
// AddPubkeyMetrics add the list of blskeys to prometheus metrics
func (consensus *Consensus) AddPubkeyMetrics() {
@ -122,6 +188,10 @@ func initMetrics() {
consensusGaugeVec,
consensusPubkeyVec,
consensusFinalityHistogram,
lastPreimageImportGauge,
preimageEndGauge,
preimageStartGauge,
verifiedPreimagesGauge,
)
})
}

@ -11,6 +11,7 @@ import (
"github.com/harmony-one/harmony/consensus/reward"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/state/snapshot"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/crypto/bls"
@ -344,6 +345,9 @@ type BlockChain interface {
) (status WriteStatus, err error)
GetLeaderPubKeyFromCoinbase(h *block.Header) (*bls.PublicKeyWrapper, error)
CommitPreimages() error
GetStateCache() state.Database
GetSnapshotTrie() *snapshot.Tree
// ========== Only For Tikv Start ==========

@ -149,6 +149,7 @@ var defaultCacheConfig = &CacheConfig{
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
Preimages: true,
}
type BlockChainImpl struct {
@ -236,7 +237,7 @@ func NewBlockChainWithOptions(
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialises the default Ethereum validator and
// Processor.
// Processor. As of Aug-23, this is only used by tests
func NewBlockChain(
db ethdb.Database, stateCache state.Database, beaconChain BlockChain, cacheConfig *CacheConfig, chainConfig *params.ChainConfig,
engine consensus_engine.Engine, vmConfig vm.Config,
@ -366,6 +367,12 @@ func newBlockChainWithOptions(
return nil, errors.WithMessage(err, "failed to build leader rotation meta")
}
if cacheConfig.Preimages {
if _, _, err := rawdb.WritePreImageStartEndBlock(bc.ChainDb(), curHeader.NumberU64()+1, 0); err != nil {
return nil, errors.WithMessage(err, "failed to write pre-image start end blocks")
}
}
// Take ownership of this particular state
go bc.update()
return bc, nil
@ -1190,6 +1197,10 @@ func (bc *BlockChainImpl) Stop() {
// Flush the collected preimages to disk
if err := bc.stateCache.TrieDB().CommitPreimages(); err != nil {
utils.Logger().Error().Interface("err", err).Msg("Failed to commit trie preimages")
} else {
if _, _, err := rawdb.WritePreImageStartEndBlock(bc.ChainDb(), 0, bc.CurrentBlock().NumberU64()); err != nil {
utils.Logger().Error().Interface("err", err).Msg("Failed to mark preimages end block")
}
}
// Ensure all live cached entries be saved into disk, so that we can skip
// cache warmup when node restarts.
@ -3676,6 +3687,18 @@ func (bc *BlockChainImpl) InitTiKV(conf *harmonyconfig.TiKVConfig) {
go bc.tikvCleanCache()
}
func (bc *BlockChainImpl) CommitPreimages() error {
return bc.stateCache.TrieDB().CommitPreimages()
}
func (bc *BlockChainImpl) GetStateCache() state.Database {
return bc.stateCache
}
func (bc *BlockChainImpl) GetSnapshotTrie() *snapshot.Tree {
return bc.snaps
}
var (
leveldbErrSpec = "leveldb"
tooManyOpenFilesErrStr = "Too many open files"

@ -3,6 +3,8 @@ package core
import (
"math/big"
"github.com/harmony-one/harmony/core/state/snapshot"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -439,3 +441,15 @@ func (a Stub) InitTiKV(conf *harmonyconfig.TiKVConfig) {
func (a Stub) LeaderRotationMeta() (publicKeyBytes []byte, epoch, count, shifts uint64, err error) {
return nil, 0, 0, 0, errors.Errorf("method LeaderRotationMeta not implemented for %s", a.Name)
}
func (a Stub) CommitPreimages() error {
return errors.Errorf("method CommitPreimages not implemented for %s", a.Name)
}
func (a Stub) GetStateCache() state.Database {
return nil
}
func (a Stub) GetSnapshotTrie() *snapshot.Tree {
return nil
}

@ -323,3 +323,9 @@ func (bc *EpochChain) IsSameLeaderAsPreviousBlock(block *types.Block) bool {
func (bc *EpochChain) GetVMConfig() *vm.Config {
return bc.vmConfig
}
func (bc *EpochChain) CommitPreimages() error {
// epoch chain just has last block, which does not have any txs
// so no pre-images here
return nil
}

@ -28,6 +28,7 @@ import (
"strings"
"github.com/ethereum/go-ethereum/common"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/crypto"
@ -252,11 +253,19 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block {
for key, value := range account.Storage {
statedb.SetState(addr, key, value)
}
if err := rawdb.WritePreimages(
statedb.Database().DiskDB(), map[ethCommon.Hash][]byte{
crypto.Keccak256Hash(addr.Bytes()): addr.Bytes(),
},
); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to store preimage")
os.Exit(1)
}
}
root := statedb.IntermediateRoot(false)
shardStateBytes, err := shard.EncodeWrapper(g.ShardState, false)
if err != nil {
utils.Logger().Error().Msg("failed to rlp-serialize genesis shard state")
utils.Logger().Error().Err(err).Msg("failed to rlp-serialize genesis shard state")
os.Exit(1)
}
head := g.Factory.NewHeader(common.Big0).With().
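For context, a "pre-image" here is just the raw address behind a hashed state-trie key; the genesis write above stores exactly that keccak256(address) -> address mapping. A tiny self-contained illustration (the address is made up):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// State-trie leaf keys are keccak256 hashes of addresses; the preimage
	// store is what lets us walk the trie back to concrete addresses.
	addr := common.HexToAddress("0x00000000000000000000000000000000deadbeef")
	key := crypto.Keccak256Hash(addr.Bytes())
	fmt.Printf("trie key %x -> preimage (address) %x\n", key, addr)
}
```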

@ -0,0 +1,374 @@
package core
import (
"encoding/csv"
"fmt"
"io"
"os"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/pkg/errors"
)
// ImportPreimages is public so `main.go` can call it directly
func ImportPreimages(chain BlockChain, path string) error {
reader, err := os.Open(path)
if err != nil {
return fmt.Errorf("could not open file for reading: %s", err)
}
csvReader := csv.NewReader(reader)
dbReader := chain.ChainDb()
imported := uint64(0)
for {
record, err := csvReader.Read()
if errors.Is(err, io.EOF) {
return fmt.Errorf("MyBlockNumber field missing, cannot proceed")
}
if err != nil {
return fmt.Errorf("could not read from reader: %s", err)
}
// this means the address is a number
if blockNumber, err := strconv.ParseUint(record[1], 10, 64); err == nil {
if record[0] == "MyBlockNumber" {
// set this value in database, and prometheus, if needed
prev, err := rawdb.ReadPreimageImportBlock(dbReader)
if err != nil {
return fmt.Errorf("no prior value found, overwriting: %s", err)
}
if blockNumber > prev {
if err := rawdb.WritePreimageImportBlock(dbReader, blockNumber); err != nil {
return fmt.Errorf("error saving last import block: %s", err)
}
}
// this is the last record
imported = blockNumber
break
}
}
key := ethCommon.HexToHash(record[0])
value := ethCommon.Hex2Bytes(record[1])
// validate
if crypto.Keccak256Hash(value) != key {
fmt.Println("Data mismatch: skipping", record)
continue
}
// add to database
_ = rawdb.WritePreimages(
dbReader, map[ethCommon.Hash][]byte{
key: value,
},
)
}
// now, at this point, we will have to generate missing pre-images
if imported != 0 {
genStart, _ := rawdb.ReadPreImageStartBlock(dbReader)
genEnd, _ := rawdb.ReadPreImageEndBlock(dbReader)
current := chain.CurrentBlock().NumberU64()
toGenStart, toGenEnd := FindMissingRange(imported, genStart, genEnd, current)
if toGenStart != 0 && toGenEnd != 0 {
if err := GeneratePreimages(
chain, toGenStart, toGenEnd,
); err != nil {
return fmt.Errorf("error generating: %s", err)
}
}
}
return nil
}
// ExportPreimages is public so `main.go` can call it directly
func ExportPreimages(chain BlockChain, path string) error {
if err := chain.CommitPreimages(); err != nil {
return fmt.Errorf("unable to commit preimages: %w", err)
}
// set up csv
writer, err := os.Create(path)
if err != nil {
utils.Logger().Error().
Msgf("unable to create file at %s due to %s", path, err)
return fmt.Errorf(
"unable to create file at %s due to %s",
path, err,
)
}
csvWriter := csv.NewWriter(writer)
// open trie
block := chain.CurrentBlock()
statedb, err := chain.StateAt(block.Root())
if err != nil {
utils.Logger().Error().
Msgf(
"unable to open statedb at %s due to %s",
block.Root(), err,
)
return fmt.Errorf(
"unable to open statedb at %x due to %s",
block.Root(), err,
)
}
trie, err := statedb.Database().OpenTrie(
block.Root(),
)
if err != nil {
utils.Logger().Error().
Msgf(
"unable to open trie at %x due to %s",
block.Root(), err,
)
return fmt.Errorf(
"unable to open trie at %x due to %s",
block.Root(), err,
)
}
accountIterator := trie.NodeIterator(nil)
dbReader := chain.ChainDb()
for accountIterator.Next(true) {
// the leaf nodes of the MPT represent accounts
if !accountIterator.Leaf() {
continue
}
// the leaf key is the hashed address
hashed := accountIterator.LeafKey()
asHash := ethCommon.BytesToHash(hashed)
// obtain the corresponding address
preimage := rawdb.ReadPreimage(
dbReader, asHash,
)
if len(preimage) == 0 {
utils.Logger().Warn().
Msgf("Address not found for %x", asHash)
continue
}
address := ethCommon.BytesToAddress(preimage)
// key value format, so hash of value is first
csvWriter.Write([]string{
fmt.Sprintf("%x", asHash.Bytes()),
fmt.Sprintf("%x", address.Bytes()),
})
}
// lastly, write the block number
csvWriter.Write(
[]string{
"MyBlockNumber",
block.Number().String(),
},
)
// to disk
csvWriter.Flush()
if err := csvWriter.Error(); err != nil {
utils.Logger().Error().
Msgf("unable to write csv due to %s", err)
return fmt.Errorf("unable to write csv due to %s", err)
}
writer.Close()
return nil
}
func GeneratePreimages(chain BlockChain, start, end uint64) error {
if start < 2 {
return fmt.Errorf("too low starting point %d", start)
}
fmt.Println("generating from", start, "to", end)
// fetch all the blocks, from start and end both inclusive
// then execute them - the execution will write the pre-images
// to disk and we are good to go
// attempt to find a block number for which we have block and state
// with number < start
var startingState *state.DB
var startingBlock *types.Block
for i := start - 1; i > 0; i-- {
fmt.Println("finding block number", i)
startingBlock = chain.GetBlockByNumber(i)
if startingBlock == nil {
fmt.Println("not found block number", i)
// rewound too much in snapdb, so exit loop
// although this is only designed for s2/s3 nodes in mind
// which do not have such a snapdb
break
}
fmt.Println("found block number", startingBlock.NumberU64(), startingBlock.Root().Hex())
stateAt, err := chain.StateAt(startingBlock.Root())
if err != nil {
continue
}
startingState = stateAt
break
}
if startingBlock == nil || startingState == nil {
return fmt.Errorf("no eligible starting block with state found")
}
var endingState *state.DB
var errProcess error
// now execute block T+1 based on starting state
for i := startingBlock.NumberU64() + 1; i <= end; i++ {
if i%10000 == 0 {
fmt.Println("processing block", i)
}
block := chain.GetBlockByNumber(i)
if block == nil {
// because we have startingBlock we must have all following
return fmt.Errorf("block %d not found", i)
}
stateAt, _ := chain.StateAt(block.Root())
_, _, _, _, _, _, endingState, errProcess = chain.Processor().Process(block, startingState, *chain.GetVMConfig(), false)
if errProcess != nil {
return fmt.Errorf("error executing block #%d: %s", i, errProcess)
}
if stateAt != nil {
if root, err := endingState.Commit(false); err != nil {
return fmt.Errorf("unabe to commit state for block '%d': %w", i, err)
} else if root.Hex() != block.Root().Hex() {
return fmt.Errorf("block root hashes different after commit commitRoot='%s' blockRoot='%s'", root.Hex(), block.Root().Hex())
}
if err := chain.CommitPreimages(); err != nil {
return fmt.Errorf("error committing preimages for block '%d': %w", i, err)
}
startingState = stateAt
} else {
startingState = endingState
}
}
// force any pre-images in memory so far to go to disk, if they haven't already
fmt.Println("committing images")
if _, err := endingState.Commit(false); err != nil {
return fmt.Errorf("unabe to commit state for block: %w", err)
}
if err := chain.CommitPreimages(); err != nil {
return fmt.Errorf("error committing preimages %s", err)
}
if _, _, err := rawdb.WritePreImageStartEndBlock(chain.ChainDb(), startingBlock.NumberU64(), end); err != nil {
return fmt.Errorf("error writing pre-image gen blocks %s", err)
}
return nil
}
func FindMissingRange(
imported, start, end, current uint64,
) (uint64, uint64) {
// both are unset
if start == 0 && end == 0 {
if imported < current {
return imported + 1, current
} else {
return 0, 0
}
}
// constraints: start <= end <= current
// in regular usage, we should have end == current
// however, with the GenerateFlag usage, we can have end < current
check1 := start <= end
if !check1 {
panic("Start > End")
}
check2 := end <= current
if !check2 {
panic("End > Current")
}
// imported can sit in any of the 4 ranges
if imported < start {
// both inclusive
return imported + 1, start - 1
}
if imported < end {
return end + 1, current
}
if imported < current {
return imported + 1, current
}
// future data imported
if current < imported {
return 0, 0
}
return 0, 0
}
func VerifyPreimages(header *block.Header, chain BlockChain) (uint64, error) {
var existingPreimages uint64
parentRoot := chain.GetBlockByHash(
header.ParentHash(),
).Root() // for examining MPT at this root, should exist
db, err := chain.StateAt(parentRoot)
if err != nil {
return 0, err
}
trie, err := db.Database().OpenTrie(parentRoot)
if err != nil {
return 0, err
}
if err := chain.CommitPreimages(); err != nil {
return 0, fmt.Errorf("unable to commit preimages: %w", err)
}
diskDB := db.Database().DiskDB()
// start the iteration
accountIterator := trie.NodeIterator(nil)
for accountIterator.Next(true) {
// leaf means leaf node of the MPT, which is an account
// the leaf key is the address
if accountIterator.Leaf() {
key := accountIterator.LeafKey()
preimage := rawdb.ReadPreimage(diskDB, common.BytesToHash(key))
if len(preimage) == 0 {
err := errors.New(
fmt.Sprintf(
"cannot find preimage for %x after '%d' accounts", key, existingPreimages,
),
)
utils.Logger().Warn().Msg(err.Error())
return existingPreimages, err
}
address := common.BytesToAddress(preimage)
// skip blank address
if address == (common.Address{}) {
continue
}
existingPreimages++
}
}
return existingPreimages, nil
}
func WritePreimagesMetricsIntoPrometheus(chain BlockChain, sendMetrics func(preimageStart, preimageEnd, lastPreimageImport, verifiedAddresses uint64, shard uint32)) {
shardID := chain.ShardID()
if shardID < 2 {
return
}
ticker := time.NewTicker(time.Minute * 5)
dbReader := chain.ChainDb()
for {
select {
case <-ticker.C:
lastImport, _ := rawdb.ReadPreimageImportBlock(dbReader)
startBlock, _ := rawdb.ReadPreImageStartBlock(dbReader)
endBlock, _ := rawdb.ReadPreImageEndBlock(dbReader)
verify, _ := VerifyPreimages(chain.CurrentBlock().Header(), chain)
sendMetrics(startBlock, endBlock, lastImport, verify, shardID)
}
}
}
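FindMissingRange above decides what still needs generating after an import; a quick worked sketch of its behaviour (all numbers are illustrative):

```go
package main

import (
	"fmt"

	"github.com/harmony-one/harmony/core"
)

func main() {
	// imported=100, nothing generated yet, chain at 500: generate 101..500
	fmt.Println(core.FindMissingRange(100, 0, 0, 500)) // 101 500
	// imported ends before the generated range [200,300]: fill the gap first
	fmt.Println(core.FindMissingRange(100, 200, 300, 500)) // 101 199
	// imported falls inside the generated range: resume after it
	fmt.Println(core.FindMissingRange(250, 200, 300, 500)) // 301 500
	// imported beyond the generated range: continue from the import point
	fmt.Println(core.FindMissingRange(400, 200, 300, 500)) // 401 500
	// fully caught up: nothing left to do
	fmt.Println(core.FindMissingRange(500, 200, 300, 500)) // 0 0
}
```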

@ -147,3 +147,65 @@ func DeleteValidatorCode(db ethdb.KeyValueWriter, hash common.Hash) {
utils.Logger().Error().Err(err).Msg("Failed to delete validator code")
}
}
func WritePreimageImportBlock(db ethdb.KeyValueWriter, number uint64) error {
return db.Put(preImageImportKey, encodeBlockNumber(number))
}
func ReadPreimageImportBlock(db ethdb.KeyValueReader) (uint64, error) {
val, err := db.Get(preImageImportKey)
if err != nil {
return 0, err
}
return decodeBlockNumber(val), nil
}
func WritePreImageStartEndBlock(
db ethdb.KeyValueStore,
start uint64,
end uint64,
) (
uint64,
uint64,
error,
) {
returnStart := start
returnEnd := end
if start != 0 {
existingStart, err := ReadPreImageStartBlock(db)
if err != nil || existingStart > start {
if err := db.Put(preImageGenStartKey, encodeBlockNumber(start)); err != nil {
return 0, 0, err
} else {
returnStart = existingStart
}
}
}
if end != 0 {
existingEnd, err := ReadPreImageEndBlock(db)
if err != nil || existingEnd < end {
if err := db.Put(preImageGenEndKey, encodeBlockNumber(end)); err != nil {
return 0, 0, err
} else {
returnEnd = existingEnd
}
}
}
return returnStart, returnEnd, nil
}
func ReadPreImageStartBlock(db ethdb.KeyValueReader) (uint64, error) {
val, err := db.Get(preImageGenStartKey)
if err != nil {
return 0, err
}
return decodeBlockNumber(val), nil
}
func ReadPreImageEndBlock(db ethdb.KeyValueReader) (uint64, error) {
val, err := db.Get(preImageGenEndKey)
if err != nil {
return 0, err
}
return decodeBlockNumber(val), nil
}
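The start/end markers above only ever widen: WritePreImageStartEndBlock moves the recorded start down and the end up, never the reverse. A minimal sketch against an in-memory database (assuming go-ethereum's memory database wrapper):

```go
package main

import (
	"fmt"

	ethRawdb "github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/harmony-one/harmony/core/rawdb"
)

func main() {
	db := ethRawdb.NewMemoryDatabase() // throwaway in-memory ethdb for the sketch

	rawdb.WritePreImageStartEndBlock(db, 100, 200) // records [100, 200]
	rawdb.WritePreImageStartEndBlock(db, 150, 180) // ignored: narrower on both ends
	rawdb.WritePreImageStartEndBlock(db, 50, 250)  // widens both markers

	start, _ := rawdb.ReadPreImageStartBlock(db)
	end, _ := rawdb.ReadPreImageEndBlock(db)
	fmt.Println(start, end) // 50 250
}
```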

@ -148,6 +148,10 @@ var (
BloomTrieIndexPrefix = []byte("bltIndex-")
CliqueSnapshotPrefix = []byte("clique-")
preImageImportKey = []byte("preimage-import")
preImageGenStartKey = []byte("preimage-gen-start")
preImageGenEndKey = []byte("preimage-gen-end")
)
// LegacyTxLookupEntry is the legacy TxLookupEntry definition with some unnecessary

@ -17,6 +17,8 @@
package core
import (
"encoding/binary"
"fmt"
"math/big"
"time"
@ -27,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus/reward"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
@ -40,6 +43,11 @@ import (
"github.com/pkg/errors"
)
var (
ErrNoMigrationRequired = errors.New("No balance migration required")
ErrNoMigrationPossible = errors.New("No balance migration possible")
)
const (
resultCacheLimit = 64 // The number of cached results from processing blocks
)
@ -125,43 +133,63 @@ func (p *StateProcessor) Process(
return nil, nil, nil, nil, 0, nil, statedb, err
}
processTxsAndStxs := true
cxReceipt, err := MayBalanceMigration(gp, header, statedb, p.bc)
if err != nil {
if errors.Is(err, ErrNoMigrationPossible) {
// ran out of accounts
processTxsAndStxs = false
}
if !errors.Is(err, ErrNoMigrationRequired) && !errors.Is(err, ErrNoMigrationPossible) {
return nil, nil, nil, nil, 0, nil, statedb, err
}
} else {
if cxReceipt != nil {
outcxs = append(outcxs, cxReceipt)
// only 1 cx per block
processTxsAndStxs = false
}
}
if processTxsAndStxs {
startTime := time.Now()
// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
statedb.Prepare(tx.Hash(), block.Hash(), i)
receipt, cxReceipt, stakeMsgs, _, err := ApplyTransaction(
p.bc, &beneficiary, gp, statedb, header, tx, usedGas, cfg,
)
if err != nil {
return nil, nil, nil, nil, 0, nil, statedb, err
}
receipts = append(receipts, receipt)
if cxReceipt != nil {
outcxs = append(outcxs, cxReceipt)
}
if len(stakeMsgs) > 0 {
blockStakeMsgs = append(blockStakeMsgs, stakeMsgs...)
}
allLogs = append(allLogs, receipt.Logs...)
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
utils.Logger().Debug().Int64("elapsed time", time.Now().Sub(startTime).Milliseconds()).Msg("Process Normal Txns")
startTime = time.Now()
// Iterate over and process the staking transactions
L := len(block.Transactions())
for i, tx := range block.StakingTransactions() {
statedb.Prepare(tx.Hash(), block.Hash(), i+L)
receipt, _, err := ApplyStakingTransaction(
p.bc, &beneficiary, gp, statedb, header, tx, usedGas, cfg,
)
if err != nil {
return nil, nil, nil, nil, 0, nil, statedb, err
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
}
utils.Logger().Debug().Int64("elapsed time", time.Now().Sub(startTime).Milliseconds()).Msg("Process Staking Txns")
}
// incomingReceipts should always be processed
// after transactions (to be consistent with the block proposal)
for _, cx := range block.IncomingReceipts() {
@ -182,7 +210,7 @@ func (p *StateProcessor) Process(
}
}
if err := MayTestnetShardReduction(p.bc, statedb, header); err != nil {
if err := MayShardReduction(p.bc, statedb, header); err != nil {
return nil, nil, nil, nil, 0, nil, statedb, err
}
@ -282,7 +310,7 @@ func ApplyTransaction(bc ChainContext, author *common.Address, gp *GasPool, stat
// Apply the transaction to the current state (included in the env)
result, err := ApplyMessage(vmenv, msg, gp)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, 0, errors.Wrapf(err, "apply failed from='%s' to='%s' balance='%s'", msg.From().Hex(), msg.To().Hex(), statedb.GetBalance(msg.From()).String())
}
// Update the state with pending changes
var root []byte
@ -440,13 +468,16 @@ func StakingToMessage(
return msg, nil
}
// MayTestnetShardReduction handles the change in the number of Shards. It will mark the affected validator as inactive.
// MayShardReduction handles the change in the number of Shards. It will mark the affected validator as inactive.
// This function does not handle all cases; it only handles reducing ShardNum from 4 to 2.
func MayTestnetShardReduction(bc ChainContext, statedb *state.DB, header *block.Header) error {
func MayShardReduction(bc ChainContext, statedb *state.DB, header *block.Header) error {
isBeaconChain := header.ShardID() == shard.BeaconChainShardID
isLastBlock := shard.Schedule.IsLastBlock(header.Number().Uint64())
isTestnet := nodeconfig.GetDefaultConfig().GetNetworkType() == nodeconfig.Testnet
if !(isTestnet && isBeaconChain && isLastBlock) {
networkType := nodeconfig.GetDefaultConfig().GetNetworkType()
isTestnet := networkType == nodeconfig.Testnet
isMainnet := networkType == nodeconfig.Mainnet
isReducenet := isMainnet || isTestnet
if !(isReducenet && isBeaconChain && isLastBlock) {
return nil
}
curInstance := shard.Schedule.InstanceForEpoch(header.Epoch())
@ -477,6 +508,15 @@ func MayTestnetShardReduction(bc ChainContext, statedb *state.DB, header *block.
for _, pubKey := range validator.SlotPubKeys {
curShard := new(big.Int).Mod(pubKey.Big(), big.NewInt(int64(curNumShards))).Uint64()
nextShard := new(big.Int).Mod(pubKey.Big(), big.NewInt(int64(nextNumShards))).Uint64()
// background: any editValidator transactions take effect at next epoch.
// assumption: shard reduction happens at epoch X.
// validators who wish to continue validating after the shard reduction occurs
// must have a different node running with a key from shard 0 or 1.
// this key must be added to the validator during epoch X - 1
// and keys belonging to shards 2 and 3 removed at that point in time.
// the different node running will be unelected, but continue syncing in X - 1.
// if elected, it will start validating in epoch X.
// once epoch X begins, they can terminate servers from shards 2 and 3.
if curShard >= uint64(nextNumShards) || curShard != nextShard {
validator.Status = effective.Inactive
break
@ -486,3 +526,163 @@ func MayTestnetShardReduction(bc ChainContext, statedb *state.DB, header *block.
statedb.IntermediateRoot(bc.Config().IsS3(header.Epoch()))
return nil
}
func MayBalanceMigration(
gasPool *GasPool,
header *block.Header,
db *state.DB,
chain BlockChain,
) (*types.CXReceipt, error) {
config := chain.Config()
isMainnet := nodeconfig.GetDefaultConfig().GetNetworkType() == nodeconfig.Mainnet
if isMainnet {
if config.IsOneEpochBeforeHIP30(header.Epoch()) {
nxtShards := shard.Schedule.InstanceForEpoch(
new(big.Int).Add(header.Epoch(), common.Big1),
).NumShards()
if myShard := chain.ShardID(); myShard >= nxtShards {
// i need to send my balances to the destination shard
// however, i do not know when the next epoch will begin
// because only shard 0 can govern that
// so i will just generate one cross shard transaction
// in each block of the epoch. this epoch is defined by
// nxtShards = 2 and curShards = 4
parentRoot := chain.GetBlockByHash(
header.ParentHash(),
).Root() // for examining MPT at this root, should exist
cx, err := generateOneMigrationMessage(
db, parentRoot,
header.NumberU64(),
myShard, uint32(1), // dstShard is always 1
)
if err != nil {
return nil, err
}
if cx != nil {
gasPool.SubGas(params.TxGasXShard)
return cx, nil
}
// both err and cx are nil, which means we
// ran out of eligible accounts in MPT
return nil, ErrNoMigrationPossible
}
}
}
// for testing balance migration on devnet
isDevnet := nodeconfig.GetDefaultConfig().GetNetworkType() == nodeconfig.Devnet
isLocalnet := nodeconfig.GetDefaultConfig().GetNetworkType() == nodeconfig.Localnet
if isDevnet || isLocalnet {
if config.IsOneEpochBeforeHIP30(header.Epoch()) {
if myShard := chain.ShardID(); myShard != shard.BeaconChainShardID {
parentRoot := chain.GetBlockByHash(
header.ParentHash(),
).Root() // for examining MPT at this root, should exist
cx, err := generateOneMigrationMessage(
db, parentRoot,
header.NumberU64(),
myShard, shard.BeaconChainShardID, // dstShard
)
if err != nil {
return nil, errors.Wrap(err, "generateOneMigrationMessage")
}
if cx != nil {
gasPool.SubGas(params.TxGasXShard)
return cx, nil
}
//return nil, errors.Wrap(ErrNoMigrationPossible, "MayBalanceMigration: cx is nil")
return nil, nil
}
}
}
return nil, ErrNoMigrationRequired
}
func generateOneMigrationMessage(
statedb *state.DB,
parentRoot common.Hash,
number uint64,
myShard uint32,
dstShard uint32,
) (*types.CXReceipt, error) {
// set up txHash prefix
txHash := make([]byte,
// 8 for uint64 block number
// 4 for uint32 shard id
8+4,
)
binary.LittleEndian.PutUint64(txHash[:8], number)
binary.LittleEndian.PutUint32(txHash[8:], myShard)
// open the trie, as of previous block.
// in this block we aren't processing transactions anyway.
trie, err := statedb.Database().OpenTrie(
parentRoot,
)
if err != nil {
return nil, err
}
// disk db, for use by rawdb
// this is same as blockchain.ChainDb
db := statedb.Database().DiskDB()
// start the iteration
accountIterator := trie.NodeIterator(nil)
// TODO: cache this iteration?
for accountIterator.Next(true) {
// leaf means leaf node of the MPT, which is an account
// the leaf key is the address
if accountIterator.Leaf() {
key := accountIterator.LeafKey()
preimage := rawdb.ReadPreimage(db, common.BytesToHash(key))
if len(preimage) == 0 {
return nil, errors.New(
fmt.Sprintf(
"cannot find preimage for %x", key,
),
)
}
address := common.BytesToAddress(preimage)
// skip blank address
if address == (common.Address{}) {
continue
}
// deserialize
var account state.Account
if err = rlp.DecodeBytes(accountIterator.LeafBlob(), &account); err != nil {
return nil, err
}
// skip contracts
if common.BytesToHash(account.CodeHash) != state.EmptyCodeHash {
continue
}
// skip anything with storage
if account.Root != state.EmptyRootHash {
continue
}
// skip no (or negative?) balance
if account.Balance.Cmp(common.Big0) <= 0 {
continue
}
// for safety, fetch the latest balance (again)
balance := statedb.GetBalance(address)
if balance.Cmp(common.Big0) <= 0 {
continue
}
// adds a journal entry (dirtied)
statedb.SubBalance(address, balance)
// create the receipt
res := &types.CXReceipt{
From: address,
To: &address,
ShardID: myShard,
ToShardID: dstShard,
Amount: balance,
TxHash: common.BytesToHash(txHash),
}
// move from dirty to pending, same as b/w 2 txs
statedb.Finalise(true)
return res, nil
}
}
return nil, nil
}
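Earlier in this file, MayShardReduction decides a slot's fate by reducing its BLS key modulo the old and new shard counts. A self-contained illustration of that check for the 4 -> 2 reduction (small integers stand in for pubKey.Big()):

```go
package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Mirrors the slot-key check in MayShardReduction for 4 -> 2 shards.
	for _, k := range []int64{4, 5, 6, 7} {
		key := big.NewInt(k)
		curShard := new(big.Int).Mod(key, big.NewInt(4)).Uint64()  // shard under 4 shards
		nextShard := new(big.Int).Mod(key, big.NewInt(2)).Uint64() // shard under 2 shards
		inactive := curShard >= 2 || curShard != nextShard
		fmt.Printf("key=%d cur=%d next=%d markInactive=%v\n", k, curShard, nextShard, inactive)
	}
	// Output: keys landing on shards 0/1 keep their assignment; keys on
	// shards 2/3 trip the condition and the validator is marked Inactive.
}
```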

@ -224,7 +224,9 @@ func getDefaultStakingReward(bc engine.ChainReader, epoch *big.Int, blockNum uin
}
} else {
// Mainnet (other nets):
if bc.Config().IsTwoSeconds(epoch) {
if bc.Config().IsHIP30(epoch) {
defaultReward = stakingReward.HIP30StakedBlocks
} else if bc.Config().IsTwoSeconds(epoch) {
defaultReward = stakingReward.TwoSecStakedBlocks
} else if bc.Config().IsFiveSeconds(epoch) {
defaultReward = stakingReward.FiveSecStakedBlocks
@ -410,7 +412,8 @@ func distributeRewardAfterAggregateEpoch(bc engine.ChainReader, state *state.DB,
utils.Logger().Debug().Int64("elapsed time", time.Now().Sub(startTimeLocal).Milliseconds()).Msg("After Chain Reward (AddReward)")
utils.Logger().Debug().Int64("elapsed time", time.Now().Sub(startTime).Milliseconds()).Msg("After Chain Reward")
return remainingReward, network.NewStakingEraRewardForRound(
// remainingReward needs to be multiplied with the number of crosslinks across all shards
return remainingReward.MulInt(big.NewInt(int64(len(allCrossLinks)))), network.NewStakingEraRewardForRound(
newRewards, payouts,
), nil
}

@ -71,8 +71,7 @@ type Int64Flag struct {
Usage string
Deprecated string
Hidden bool
DefValue int64
}
// RegisterTo register the int flag to FlagSet
@ -81,6 +80,22 @@ func (f Int64Flag) RegisterTo(fs *pflag.FlagSet) error {
return markHiddenOrDeprecated(fs, f.Name, f.Deprecated, f.Hidden)
}
// Uint64Flag is the flag with uint64 value, used for block number configurations
type Uint64Flag struct {
Name string
Shorthand string
Usage string
Deprecated string
Hidden bool
DefValue uint64
}
// RegisterTo registers the uint64 flag to FlagSet
func (f Uint64Flag) RegisterTo(fs *pflag.FlagSet) error {
fs.Uint64P(f.Name, f.Shorthand, f.DefValue, f.Usage)
return markHiddenOrDeprecated(fs, f.Name, f.Deprecated, f.Hidden)
}
// StringSliceFlag is the flag with string slice value
type StringSliceFlag struct {
Name string
@ -143,6 +158,8 @@ func getFlagName(flag Flag) string {
return f.Name
case Int64Flag:
return f.Name
case Uint64Flag:
return f.Name
}
return ""
}
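A minimal end-to-end sketch of the new Uint64Flag, using only the helpers shown in this diff (the flag name is hypothetical):

```go
package main

import (
	"fmt"

	"github.com/harmony-one/harmony/internal/cli"
	"github.com/spf13/cobra"
)

// heightFlag is a hypothetical flag used only to exercise Uint64Flag.
var heightFlag = cli.Uint64Flag{
	Name:     "example.height",
	Usage:    "a block number",
	DefValue: 0,
}

func main() {
	cmd := &cobra.Command{
		Run: func(cmd *cobra.Command, args []string) {
			if cli.IsFlagChanged(cmd, heightFlag) {
				fmt.Println(cli.GetUint64FlagValue(cmd, heightFlag)) // 42
			}
		},
	}
	if err := heightFlag.RegisterTo(cmd.Flags()); err != nil {
		panic(err)
	}
	cmd.SetArgs([]string{"--example.height", "42"})
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
}
```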

@ -76,6 +76,12 @@ func GetInt64FlagValue(cmd *cobra.Command, flag Int64Flag) int64 {
return getInt64FlagValue(cmd.Flags(), flag)
}
// GetUint64FlagValue gets the uint64 value for the given Uint64Flag from the local flags of the
// cobra command.
func GetUint64FlagValue(cmd *cobra.Command, flag Uint64Flag) uint64 {
return getUint64FlagValue(cmd.Flags(), flag)
}
// GetIntPersistentFlagValue get the int value for the given IntFlag from the persistent
// flags of the cobra command.
func GetIntPersistentFlagValue(cmd *cobra.Command, flag IntFlag) int {
@ -100,6 +106,15 @@ func getInt64FlagValue(fs *pflag.FlagSet, flag Int64Flag) int64 {
return val
}
func getUint64FlagValue(fs *pflag.FlagSet, flag Uint64Flag) uint64 {
val, err := fs.GetUint64(flag.Name)
if err != nil {
handleParseError(err)
return 0
}
return val
}
// GetStringSliceFlagValue get the string slice value for the given StringSliceFlag from
// the local flags of the cobra command.
func GetStringSliceFlagValue(cmd *cobra.Command, flag StringSliceFlag) []string {

@ -36,6 +36,7 @@ type HarmonyConfig struct {
DNSSync DnsSync
ShardData ShardDataConfig
GPO GasPriceOracleConfig
Preimage *PreimageConfig
}
func (hc HarmonyConfig) ToRPCServerConfig() nodeconfig.RPCServerConfig {
@ -84,6 +85,7 @@ func (hc HarmonyConfig) ToRPCServerConfig() nodeconfig.RPCServerConfig {
WSPort: hc.WS.Port,
WSAuthPort: hc.WS.AuthPort,
DebugEnabled: hc.RPCOpt.DebugEnabled,
PreimagesEnabled: hc.RPCOpt.PreimagesEnabled,
EthRPCsEnabled: hc.RPCOpt.EthRPCsEnabled,
StakingRPCsEnabled: hc.RPCOpt.StakingRPCsEnabled,
LegacyRPCsEnabled: hc.RPCOpt.LegacyRPCsEnabled,
@ -287,6 +289,7 @@ type RpcOptConfig struct {
RateLimterEnabled bool // Enable Rate limiter for RPC
RequestsPerSecond int // for RPC rate limiter
EvmCallTimeout string // Timeout for eth_call
PreimagesEnabled bool // Expose preimage API
}
type DevnetConfig struct {
@ -303,6 +306,13 @@ type RevertConfig struct {
RevertBefore int
}
type PreimageConfig struct {
ImportFrom string
ExportTo string
GenerateStart uint64
GenerateEnd uint64
}
type LegacyConfig struct {
WebHookConfig *string `toml:",omitempty"`
TPBroadcastInvalidTxn *bool `toml:",omitempty"`

@ -153,6 +153,7 @@ type RPCServerConfig struct {
DebugEnabled bool
PreimagesEnabled bool
EthRPCsEnabled bool
StakingRPCsEnabled bool
LegacyRPCsEnabled bool

@ -161,7 +161,6 @@ var (
feeCollectorsTestnet, numeric.ZeroDec(), ethCommon.Address{},
testnetReshardingEpoch, TestnetSchedule.BlocksPerEpoch(),
)
testnetV5 = MustNewInstance(
2, 30, 8, 0.15,
numeric.MustNewDecFromStr("0.90"), genesis.TNHarmonyAccountsV1,

@ -811,8 +811,8 @@ func (c *ChainConfig) IsHIP30(epoch *big.Int) bool {
// During this epoch, shards 2 and 3 will start sending
// their balances over to shard 0 or 1.
func (c *ChainConfig) IsEpochBeforeHIP30(epoch *big.Int) bool {
return isForked(new(big.Int).Sub(c.HIP30Epoch, common.Big1), epoch)
func (c *ChainConfig) IsOneEpochBeforeHIP30(epoch *big.Int) bool {
return new(big.Int).Sub(c.HIP30Epoch, epoch).Cmp(common.Big1) == 0
}
// UpdateEthChainIDByShard update the ethChainID based on shard ID.

@ -0,0 +1,17 @@
package params
import (
"math/big"
"testing"
"github.com/stretchr/testify/require"
)
func TestIsOneEpochBeforeHIP30(t *testing.T) {
c := ChainConfig{
HIP30Epoch: big.NewInt(3),
}
require.True(t, c.IsOneEpochBeforeHIP30(big.NewInt(2)))
require.False(t, c.IsOneEpochBeforeHIP30(big.NewInt(3)))
}

@ -24,6 +24,8 @@ const (
CallNewAccountGas uint64 = 25000 // Paid for CALL when the destination address didn't exist prior.
// TxGas ...
TxGas uint64 = 21000 // Per transaction not creating a contract. NOTE: Not payable on data of calls between transactions.
// TxGasXShard
TxGasXShard uint64 = 23000 // Approximate cost for transferring native tokens across shards. Used in balance migration
// TxGasContractCreation ...
TxGasContractCreation uint64 = 53000 // Per transaction that creates a contract. NOTE: Not payable on data of calls between transactions.
// TxGasValidatorCreation ...

@ -100,8 +100,12 @@ func (sc *CollectionImpl) ShardChain(shardID uint32, options ...core.Options) (c
}
}
var cacheConfig *core.CacheConfig
// archival node
if sc.disableCache[shardID] {
cacheConfig = &core.CacheConfig{Disabled: true}
cacheConfig = &core.CacheConfig{
Disabled: true,
Preimages: true,
}
utils.Logger().Info().
Uint32("shardID", shardID).
Msg("disable cache, running in archival mode")
@ -110,6 +114,7 @@ func (sc *CollectionImpl) ShardChain(shardID uint32, options ...core.Options) (c
TrieNodeLimit: 256,
TrieTimeLimit: 2 * time.Minute,
TriesInMemory: 128,
Preimages: true,
}
if sc.harmonyconfig != nil {
cacheConfig.TriesInMemory = uint64(sc.harmonyconfig.General.TriesInMemory)

@ -255,21 +255,38 @@ func (node *Node) tryBroadcastStaking(stakingTx *staking.StakingTransaction) {
// Add new transactions to the pending transaction list.
func addPendingTransactions(registry *registry.Registry, newTxs types.Transactions) []error {
var (
errs []error
bc = registry.GetBlockchain()
txPool = registry.GetTxPool()
poolTxs = types.PoolTransactions{}
acceptCx = bc.Config().AcceptsCrossTx(bc.CurrentHeader().Epoch())
errs []error
bc = registry.GetBlockchain()
txPool = registry.GetTxPool()
poolTxs = types.PoolTransactions{}
epoch = bc.CurrentHeader().Epoch()
acceptCx = bc.Config().AcceptsCrossTx(epoch)
isBeforeHIP30 = bc.Config().IsOneEpochBeforeHIP30(epoch)
nxtShards = shard.Schedule.InstanceForEpoch(new(big.Int).Add(epoch, common.Big1)).NumShards()
)
for _, tx := range newTxs {
if tx.ShardID() != tx.ToShardID() && !acceptCx {
errs = append(errs, errors.WithMessage(errInvalidEpoch, "cross-shard tx not accepted yet"))
continue
if tx.ShardID() != tx.ToShardID() {
if !acceptCx {
errs = append(errs, errors.WithMessage(errInvalidEpoch, "cross-shard tx not accepted yet"))
continue
}
if isBeforeHIP30 {
if tx.ToShardID() >= nxtShards {
errs = append(errs, errors.New("shards 2 and 3 are shutting down in the next epoch"))
continue
}
}
}
if tx.IsEthCompatible() && !bc.Config().IsEthCompatible(bc.CurrentBlock().Epoch()) {
errs = append(errs, errors.WithMessage(errInvalidEpoch, "ethereum tx not accepted yet"))
continue
}
if isBeforeHIP30 {
if bc.ShardID() >= nxtShards {
errs = append(errs, errors.New("shards 2 and 3 are shutting down in the next epoch"))
continue
}
}
poolTxs = append(poolTxs, tx)
}
errs = append(errs, registry.GetTxPool().AddRemotes(poolTxs)...)
@ -480,7 +497,8 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) (
if err := rlp.DecodeBytes(blocksPayload, &blocks); err != nil {
return nil, 0, errors.Wrap(err, "block decode error")
}
curBeaconHeight := node.Beaconchain().CurrentBlock().NumberU64()
curBeaconBlock := node.EpochChain().CurrentBlock()
curBeaconHeight := curBeaconBlock.NumberU64()
for _, block := range blocks {
// Ban blocks number that is smaller than tolerance
if block.NumberU64()+beaconBlockHeightTolerance <= curBeaconHeight {
@ -490,7 +508,7 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) (
} else if block.NumberU64()-beaconBlockHeightTolerance > curBeaconHeight {
utils.Logger().Debug().Uint64("receivedNum", block.NumberU64()).
Uint64("currentNum", curBeaconHeight).Msg("beacon block sync message rejected")
return nil, 0, errors.New("beacon block height too much higher than current height beyond tolerance")
return nil, 0, errors.Errorf("beacon block height too much higher than current height beyond tolerance, block %d, current %d, epoch %d , current %d", block.NumberU64(), curBeaconHeight, block.Epoch().Uint64(), curBeaconBlock.Epoch().Uint64())
} else if block.NumberU64() <= curBeaconHeight {
utils.Logger().Debug().Uint64("receivedNum", block.NumberU64()).
Uint64("currentNum", curBeaconHeight).Msg("beacon block sync message ignored")

@ -159,7 +159,8 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
}
}
if !shard.Schedule.IsLastBlock(header.Number().Uint64()) {
// Executed on every block except the last block of the epoch on shard 0
if !shard.Schedule.IsLastBlock(header.Number().Uint64()) || node.Consensus.ShardID != 0 {
// Prepare normal and staking transactions retrieved from transaction pool
utils.AnalysisStart("proposeNewBlockChooseFromTxnPool")
@ -202,7 +203,13 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
utils.AnalysisEnd("proposeNewBlockChooseFromTxnPool")
}
// Prepare cross shard transaction receipts
// Prepare incoming cross shard transaction receipts
// These are accepted even during the epoch before hip-30
// because the destination shard only receives them after
// balance is deducted on source shard. to prevent this from
// being a significant problem, the source shards will stop
// accepting txs destined to the shards which are shutting down
// one epoch prior to the shutdown
receiptsList := node.proposeReceiptsProof()
if len(receiptsList) != 0 {
if err := node.Worker.CommitReceipts(receiptsList); err != nil {
@ -251,7 +258,7 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
len(crossLinksToPropose), len(allPending),
)
} else {
utils.Logger().Error().Err(err).Msgf(
utils.Logger().Warn().Err(err).Msgf(
"[ProposeNewBlock] Unable to Read PendingCrossLinks, number of crosslinks: %d",
len(allPending),
)
@ -267,7 +274,7 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
}
}
node.Worker.ApplyTestnetShardReduction()
node.Worker.ApplyShardReduction()
// Prepare shard state
var shardState *shard.State
if shardState, err = node.Blockchain().SuperCommitteeForNextEpoch(
@ -287,7 +294,9 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Failed finalizing the new block")
return nil, err
}
utils.Logger().Info().Msg("[ProposeNewBlock] verifying the new block header")
// err = node.Blockchain().Validator().ValidateHeader(finalizedBlock, true)
err = core.NewBlockValidator(node.Blockchain()).ValidateHeader(finalizedBlock, true)
if err != nil {

@ -178,6 +178,38 @@ func (w *Worker) CommitTransactions(
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
}
// if this is epoch for balance migration, no txs (or stxs)
// will be included in the block
// it is technically feasible for some to end up in the pool
// say, from the last epoch, but those will not be executed
// and no balance will be lost
// any cross-shard transfers destined to a shard being shut down
// will execute (since they are already spent on the source shard)
// but the balance will immediately be returned to shard 1
cx, err := core.MayBalanceMigration(
w.current.gasPool,
w.GetCurrentHeader(),
w.current.state,
w.chain,
)
if err != nil {
if errors.Is(err, core.ErrNoMigrationPossible) {
// means we do not accept transactions from the network
return nil
}
if !errors.Is(err, core.ErrNoMigrationRequired) {
// this shard not migrating => ErrNoMigrationRequired
// any other error means exit this block
return err
}
} else {
if cx != nil {
w.current.outcxs = append(w.current.outcxs, cx)
return nil
}
}
// HARMONY TXNS
normalTxns := types.NewTransactionsByPriceAndNonce(w.current.signer, w.current.ethSigner, pendingNormal)
@ -252,9 +284,9 @@ func (w *Worker) commitStakingTransaction(
return nil
}
// ApplyTestnetShardReduction only used to reduce shards of Testnet
func (w *Worker) ApplyTestnetShardReduction() {
core.MayTestnetShardReduction(w.chain, w.current.state, w.current.header)
// ApplyShardReduction is only used to reduce the number of shards
func (w *Worker) ApplyShardReduction() {
core.MayShardReduction(w.chain, w.current.state, w.current.header)
}
var (
@ -416,16 +448,6 @@ func (w *Worker) GetCurrentReceipts() []*types.Receipt {
return w.current.receipts
}
// OutgoingReceipts get the receipts generated starting from the last state.
func (w *Worker) OutgoingReceipts() []*types.CXReceipt {
return w.current.outcxs
}
// IncomingReceipts get incoming receipts in destination shard that is received from source shard
func (w *Worker) IncomingReceipts() []*types.CXReceiptsProof {
return w.current.incxs
}
// CollectVerifiedSlashes sets w.current.slashes only to those that
// past verification
func (w *Worker) CollectVerifiedSlashes() error {

@ -0,0 +1,35 @@
package rpc
import (
"context"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/eth/rpc"
"github.com/harmony-one/harmony/hmy"
)
type PreimagesService struct {
hmy *hmy.Harmony
}
// NewPreimagesAPI creates a new API for the RPC interface
func NewPreimagesAPI(hmy *hmy.Harmony, version string) rpc.API {
var service interface{} = &PreimagesService{hmy}
return rpc.API{
Namespace: version,
Version: APIVersion,
Service: service,
Public: true,
}
}
func (s *PreimagesService) Export(ctx context.Context, path string) error {
// these are by default not blocking
return core.ExportPreimages(s.hmy.BlockChain, path)
}
func (s *PreimagesService) Verify(ctx context.Context) (uint64, error) {
currentBlock := s.hmy.CurrentBlock()
// these are by default not blocking
return core.VerifyPreimages(currentBlock.Header(), s.hmy.BlockChain)
}

@ -42,7 +42,7 @@ const (
var (
// HTTPModules ..
HTTPModules = []string{"hmy", "hmyv2", "eth", "debug", "trace", netNamespace, netV1Namespace, netV2Namespace, web3Namespace, "explorer"}
HTTPModules = []string{"hmy", "hmyv2", "eth", "debug", "trace", netNamespace, netV1Namespace, netV2Namespace, web3Namespace, "explorer", "preimages"}
// WSModules ..
WSModules = []string{"hmy", "hmyv2", "eth", "debug", "trace", netNamespace, netV1Namespace, netV2Namespace, web3Namespace, "web3"}
@ -71,6 +71,9 @@ func (n Version) Namespace() string {
func StartServers(hmy *hmy.Harmony, apis []rpc.API, config nodeconfig.RPCServerConfig, rpcOpt harmony.RpcOptConfig) error {
apis = append(apis, getAPIs(hmy, config)...)
authApis := append(apis, getAuthAPIs(hmy, config.DebugEnabled, config.RateLimiterEnabled, config.RequestsPerSecond)...)
if rpcOpt.PreimagesEnabled {
authApis = append(authApis, NewPreimagesAPI(hmy, "preimages"))
}
// load method filter from file (if exist)
var rmf rpc.RpcMethodFilter
rpcFilterFilePath := strings.TrimSpace(rpcOpt.RpcFilterFile)
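Since the service is appended to the auth APIs only, it is reachable through the auth RPC endpoint when --rpc.preimages is set. A hedged client sketch (the port and file path are assumptions; go-ethereum's rpc client stands in for any JSON-RPC client):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Assumes a node started with --rpc.preimages and the auth RPC
	// listening on localhost:9501 (hypothetical port).
	client, err := rpc.Dial("http://localhost:9501")
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// preimages_verify -> PreimagesService.Verify
	var verified uint64
	if err := client.Call(&verified, "preimages_verify"); err != nil {
		panic(err)
	}
	fmt.Println("verified preimages:", verified)

	// preimages_export -> PreimagesService.Export
	if err := client.Call(nil, "preimages_export", "/tmp/preimages.csv"); err != nil {
		panic(err)
	}
}
```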

@ -31,6 +31,13 @@ var (
TwoSecStakedBlocks = numeric.NewDecFromBigInt(new(big.Int).Mul(
big.NewInt(7*denominations.Nano), big.NewInt(denominations.Nano),
))
// HIP30StakedBlocks is the reward received after HIP-30 goes into
// effect. It is simply double the TwoSecStakedBlocks reward, since
// the number of shards is being halved and we keep emission
// constant.
HIP30StakedBlocks = numeric.NewDecFromBigInt(new(big.Int).Mul(
big.NewInt(14*denominations.Nano), big.NewInt(denominations.Nano),
))
// TotalInitialTokens is the total amount of tokens (in ONE) at block 0 of the network.
// This should be set/change on the node's init according to the core.GenesisSpec.
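A quick sanity check of the constant-emission claim in the comment above: four shards at 7 ONE per block and two shards at 14 ONE per block issue the same aggregate per block height.

```go
package main

import "fmt"

func main() {
	before := 4 * 7 // shards x ONE per block, pre-HIP-30
	after := 2 * 14 // shards x ONE per block, post-HIP-30
	fmt.Println(before == after, before) // true 28
}
```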
