diff --git a/api/service/legacysync/epoch_syncing.go b/api/service/legacysync/epoch_syncing.go index 5d9b4dab1..7719c8a81 100644 --- a/api/service/legacysync/epoch_syncing.go +++ b/api/service/legacysync/epoch_syncing.go @@ -138,6 +138,9 @@ func syncLoop(bc core.BlockChain, syncConfig *SyncConfig) (timeout int) { err := ProcessStateSync(syncConfig, heights, bc) if err != nil { + if errors.Is(err, core.ErrKnownBlock) { + return 10 + } utils.Logger().Error().Err(err). Msgf("[EPOCHSYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherEpoch: %d, currentEpoch: %d)", isBeacon, bc.ShardID(), otherEpoch, curEpoch) diff --git a/api/service/legacysync/syncing.go b/api/service/legacysync/syncing.go index 92c8a457f..a85a5e9d5 100644 --- a/api/service/legacysync/syncing.go +++ b/api/service/legacysync/syncing.go @@ -860,11 +860,12 @@ func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Ha } // UpdateBlockAndStatus ... -func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error { +func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain) error { if block.NumberU64() != bc.CurrentBlock().NumberU64()+1 { utils.Logger().Debug().Uint64("curBlockNum", bc.CurrentBlock().NumberU64()).Uint64("receivedBlockNum", block.NumberU64()).Msg("[SYNC] Inappropriate block number, ignore!") return nil } + verifyAllSig := true haveCurrentSig := len(block.GetCurrentCommitSig()) != 0 // Verify block signatures @@ -911,6 +912,7 @@ func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain Uint64("blockEpoch", block.Epoch().Uint64()). Str("blockHex", block.Hash().Hex()). Uint32("ShardID", block.ShardID()). + Err(err). Msg("[SYNC] UpdateBlockAndStatus: Block exists") return nil case err != nil: @@ -954,8 +956,8 @@ func (ss *StateSync) generateNewState(bc core.BlockChain) error { break } // Enforce sig check for the last block in a batch - enforceSigCheck := !commonIter.HasNext() - err = ss.UpdateBlockAndStatus(block, bc, enforceSigCheck) + _ = !commonIter.HasNext() + err = ss.UpdateBlockAndStatus(block, bc) if err != nil { break } @@ -972,7 +974,7 @@ func (ss *StateSync) generateNewState(bc core.BlockChain) error { if block == nil { break } - err = ss.UpdateBlockAndStatus(block, bc, true) + err = ss.UpdateBlockAndStatus(block, bc) if err != nil { break } @@ -993,7 +995,7 @@ func (ss *StateSync) generateNewState(bc core.BlockChain) error { if block == nil { break } - err = ss.UpdateBlockAndStatus(block, bc, false) + err = ss.UpdateBlockAndStatus(block, bc) if err != nil { break } @@ -1121,6 +1123,9 @@ func (ss *StateSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *cons } err := ss.ProcessStateSync(startHash[:], size, bc) if err != nil { + if errors.Is(err, core.ErrKnownBlock) { + continue + } utils.Logger().Error().Err(err). Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight) diff --git a/api/service/stagedstreamsync/stage_statesync_full.go b/api/service/stagedstreamsync/stage_statesync_full.go index c1579114b..f5bd213af 100644 --- a/api/service/stagedstreamsync/stage_statesync_full.go +++ b/api/service/stagedstreamsync/stage_statesync_full.go @@ -190,7 +190,11 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full default: } accountTasks, codes, storages, healtask, codetask, nTasks, err := sdm.GetNextBatch() - if nTasks == 0 || err != nil { + if nTasks == 0 { + utils.Logger().Debug().Msg("the state worker loop received no more tasks") + return + } + if err != nil { select { case <-ctx.Done(): return @@ -199,7 +203,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full } } - if len(accountTasks) > 0 { + if accountTasks != nil && len(accountTasks) > 0 { task := accountTasks[0] origin := task.Next @@ -222,8 +226,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full utils.Logger().Warn(). Str("stream", string(stid)). Msg(WrapStagedSyncMsg("GetAccountRange failed, received empty accounts")) - err := errors.New("GetAccountRange received empty slots") - sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err) + //err := errors.New("GetAccountRange received empty slots") + //sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err) return } if err := sdm.HandleAccountRequestResult(task, retAccounts, proof, origin[:], limit[:], loopID, stid); err != nil { @@ -236,7 +240,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full return } - } else if len(codes) > 0 { + } else if codes != nil && len(codes) > 0 { stid, err := sss.downloadByteCodes(ctx, sdm, codes, loopID) if err != nil { @@ -252,7 +256,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full return } - } else if len(storages.accounts) > 0 { + } else if storages != nil && len(storages.accounts) > 0 { root := storages.root roots := storages.roots @@ -295,7 +299,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full } else { // assign trie node Heal Tasks - if len(healtask.hashes) > 0 { + if healtask != nil && len(healtask.hashes) > 0 { root := healtask.root task := healtask.task hashes := healtask.hashes @@ -334,7 +338,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full } } - if len(codetask.hashes) > 0 { + if codetask != nil && len(codetask.hashes) > 0 { task := codetask.task hashes := codetask.hashes bytes := codetask.bytes diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index c3bc585f2..0db0dd4e2 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -465,6 +465,10 @@ func (s *StagedStreamSync) CurrentBlockNumber() uint64 { return s.bc.CurrentBlock().NumberU64() } + if s.status.pivotBlock != nil && s.bc.CurrentFastBlock().NumberU64() >= s.status.pivotBlock.NumberU64() { + return s.bc.CurrentFastBlock().NumberU64() + } + current := uint64(0) switch s.config.SyncMode { case FullSync: diff --git a/api/service/stagedsync/stage_lastmile.go b/api/service/stagedsync/stage_lastmile.go index df6079bd0..13fece8ee 100644 --- a/api/service/stagedsync/stage_lastmile.go +++ b/api/service/stagedsync/stage_lastmile.go @@ -49,7 +49,7 @@ func (lm *StageLastMile) Exec(firstCycle bool, invalidBlockRevert bool, s *Stage if block == nil { break } - err = s.state.UpdateBlockAndStatus(block, bc, true) + err = s.state.UpdateBlockAndStatus(block, bc) if err != nil { break } @@ -70,7 +70,7 @@ func (lm *StageLastMile) Exec(firstCycle bool, invalidBlockRevert bool, s *Stage if block == nil { break } - err = s.state.UpdateBlockAndStatus(block, bc, false) + err = s.state.UpdateBlockAndStatus(block, bc) if err != nil { break } diff --git a/api/service/stagedsync/stagedsync.go b/api/service/stagedsync/stagedsync.go index 83be4bae4..7959a05d2 100644 --- a/api/service/stagedsync/stagedsync.go +++ b/api/service/stagedsync/stagedsync.go @@ -1035,7 +1035,7 @@ func (ss *StagedSync) getBlockFromLastMileBlocksByParentHash(parentHash common.H } // UpdateBlockAndStatus updates block and its status in db -func (ss *StagedSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error { +func (ss *StagedSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain) error { if block.NumberU64() != bc.CurrentBlock().NumberU64()+1 { utils.Logger().Debug(). Uint64("curBlockNum", bc.CurrentBlock().NumberU64()). @@ -1043,6 +1043,7 @@ func (ss *StagedSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChai Msg("[STAGED_SYNC] Inappropriate block number, ignore!") return nil } + verifyAllSig := true haveCurrentSig := len(block.GetCurrentCommitSig()) != 0 // Verify block signatures diff --git a/cmd/harmony/config.go b/cmd/harmony/config.go index 5a41f22da..037221835 100644 --- a/cmd/harmony/config.go +++ b/cmd/harmony/config.go @@ -145,6 +145,35 @@ func getDefaultSyncConfig(nt nodeconfig.NetworkType) harmonyconfig.SyncConfig { } } +func getDefaultCacheConfig(nt nodeconfig.NetworkType) harmonyconfig.CacheConfig { + cacheConfig := harmonyconfig.CacheConfig{ + Disabled: defaultCacheConfig.Disabled, + TrieNodeLimit: defaultCacheConfig.TrieNodeLimit, + TriesInMemory: defaultCacheConfig.TriesInMemory, + TrieTimeLimit: defaultCacheConfig.TrieTimeLimit, + SnapshotLimit: defaultCacheConfig.SnapshotLimit, + SnapshotWait: defaultCacheConfig.SnapshotWait, + Preimages: defaultCacheConfig.Preimages, + SnapshotNoBuild: defaultCacheConfig.SnapshotNoBuild, + } + + switch nt { + case nodeconfig.Mainnet: + cacheConfig.Disabled = true + cacheConfig.Preimages = true + case nodeconfig.Testnet: + cacheConfig.Disabled = false + cacheConfig.Preimages = true + case nodeconfig.Localnet: + cacheConfig.Disabled = false + cacheConfig.Preimages = false + default: + cacheConfig.Disabled = false + cacheConfig.Preimages = true + } + return cacheConfig +} + var configCmd = &cobra.Command{ Use: "config", Short: "dump or update config", diff --git a/cmd/harmony/config_migrations.go b/cmd/harmony/config_migrations.go index 8f222b3d6..0db87d074 100644 --- a/cmd/harmony/config_migrations.go +++ b/cmd/harmony/config_migrations.go @@ -334,7 +334,7 @@ func init() { migrations["2.5.11"] = func(confTree *toml.Tree) *toml.Tree { if confTree.Get("General.TriesInMemory") == nil { - confTree.Set("General.TriesInMemory", defaultConfig.General.TriesInMemory) + confTree.Set("General.TriesInMemory", defaultConfig.Cache.TriesInMemory) } confTree.Set("Version", "2.5.12") return confTree @@ -405,6 +405,17 @@ func init() { return confTree } + migrations["2.6.0"] = func(confTree *toml.Tree) *toml.Tree { + confTree.Delete("General.TriesInMemory") + + if confTree.Get("Cache") == nil { + confTree.Set("Cache", defaultConfig.Cache) + } + // upgrade minor version because of `Cache` section introduction + confTree.Set("Version", "2.6.1") + return confTree + } + // check that the latest version here is the same as in default.go largestKey := getNextVersion(migrations) if largestKey != tomlConfigVersion { diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 86ed4226a..22b964b99 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -1,13 +1,15 @@ package main import ( + "time" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/hmy" harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" ) -const tomlConfigVersion = "2.6.0" +const tomlConfigVersion = "2.6.1" const ( defNetworkType = nodeconfig.Mainnet @@ -24,7 +26,6 @@ var defaultConfig = harmonyconfig.HarmonyConfig{ IsOffline: false, DataDir: "./", TraceEnable: false, - TriesInMemory: 128, }, Network: getDefaultNetworkConfig(defNetworkType), P2P: harmonyconfig.P2pConfig{ @@ -131,6 +132,7 @@ var defaultConfig = harmonyconfig.HarmonyConfig{ LowUsageThreshold: hmy.DefaultGPOConfig.LowUsageThreshold, BlockGasLimit: hmy.DefaultGPOConfig.BlockGasLimit, }, + Cache: getDefaultCacheConfig(defNetworkType), } var defaultSysConfig = harmonyconfig.SysConfig{ @@ -271,6 +273,17 @@ var ( } ) +var defaultCacheConfig = harmonyconfig.CacheConfig{ + Disabled: false, + TrieNodeLimit: 256, + TriesInMemory: 128, + TrieTimeLimit: 2 * time.Minute, + SnapshotLimit: 256, + SnapshotWait: true, + Preimages: true, + SnapshotNoBuild: false, +} + const ( defaultBroadcastInvalidTx = false ) @@ -285,6 +298,7 @@ func getDefaultHmyConfigCopy(nt nodeconfig.NetworkType) harmonyconfig.HarmonyCon } config.Sync = getDefaultSyncConfig(nt) config.DNSSync = getDefaultDNSSyncConfig(nt) + config.Cache = getDefaultCacheConfig(nt) return config } @@ -324,6 +338,11 @@ func getDefaultPrometheusConfigCopy() harmonyconfig.PrometheusConfig { return config } +func getDefaultCacheConfigCopy() harmonyconfig.CacheConfig { + config := defaultCacheConfig + return config +} + const ( nodeTypeValidator = "validator" nodeTypeExplorer = "explorer" diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index 2af21cb24..a52b7138f 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -32,7 +32,6 @@ var ( legacyDataDirFlag, taraceFlag, - triesInMemoryFlag, } dnsSyncFlags = []cli.Flag{ @@ -268,6 +267,16 @@ var ( gpoBlockGasLimitFlag, } + cacheConfigFlags = []cli.Flag{ + cacheDisabled, + cacheTrieNodeLimit, + cacheTriesInMemory, + cachePreimages, + cacheSnapshotLimit, + cacheSnapshotNoBuild, + cacheSnapshotWait, + } + metricsFlags = []cli.Flag{ metricsETHFlag, metricsExpensiveETHFlag, @@ -352,11 +361,6 @@ var ( Usage: "indicates if full transaction tracing should be enabled", DefValue: defaultConfig.General.TraceEnable, } - triesInMemoryFlag = cli.IntFlag{ - Name: "blockchain.tries_in_memory", - Usage: "number of blocks from header stored in disk before exiting", - DefValue: defaultConfig.General.TriesInMemory, - } ) func getRootFlags() []cli.Flag { @@ -436,14 +440,6 @@ func applyGeneralFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) if cli.IsFlagChanged(cmd, isBackupFlag) { config.General.IsBackup = cli.GetBoolFlagValue(cmd, isBackupFlag) } - - if cli.IsFlagChanged(cmd, triesInMemoryFlag) { - value := cli.GetIntFlagValue(cmd, triesInMemoryFlag) - if value <= 2 { - panic("Must provide number greater than 2 for General.TriesInMemory") - } - config.General.TriesInMemory = value - } } // network flags @@ -2115,3 +2111,70 @@ func applyGPOFlags(cmd *cobra.Command, cfg *harmonyconfig.HarmonyConfig) { cfg.GPO.BlockGasLimit = cli.GetIntFlagValue(cmd, gpoBlockGasLimitFlag) } } + +// cache config flags +var ( + cacheDisabled = cli.BoolFlag{ + Name: "cache.disabled", + Usage: "Whether to disable trie write caching (archive node)", + DefValue: defaultCacheConfig.Disabled, + } + cacheTrieNodeLimit = cli.IntFlag{ + Name: "cache.trie_node_limit", + Usage: " Memory limit (MB) at which to flush the current in-memory trie to disk", + DefValue: defaultCacheConfig.TrieNodeLimit, + } + cacheTriesInMemory = cli.Uint64Flag{ + Name: "cache.tries_in_memory", + Usage: "Block number from the head stored in disk before exiting", + DefValue: defaultCacheConfig.TriesInMemory, + } + cachePreimages = cli.BoolFlag{ + Name: "cache.preimages", + Usage: "Whether to store preimage of trie key to the disk", + DefValue: defaultCacheConfig.Preimages, + } + cacheSnapshotLimit = cli.IntFlag{ + Name: "cache.snapshot_limit", + Usage: "Memory allowance (MB) to use for caching snapshot entries in memory", + DefValue: defaultCacheConfig.SnapshotLimit, + } + cacheSnapshotNoBuild = cli.BoolFlag{ + Name: "cache.snapshot_no_build", + Usage: "Whether the background generation is allowed", + DefValue: defaultCacheConfig.SnapshotNoBuild, + } + cacheSnapshotWait = cli.BoolFlag{ + Name: "cache.snapshot_wait", + Usage: "Wait for snapshot construction on startup", + DefValue: defaultCacheConfig.SnapshotWait, + } +) + +func applyCacheFlags(cmd *cobra.Command, cfg *harmonyconfig.HarmonyConfig) { + if cli.IsFlagChanged(cmd, cacheDisabled) { + cfg.Cache.Disabled = cli.GetBoolFlagValue(cmd, cacheDisabled) + } + if cli.IsFlagChanged(cmd, cacheTrieNodeLimit) { + cfg.Cache.TrieNodeLimit = cli.GetIntFlagValue(cmd, cacheTrieNodeLimit) + } + if cli.IsFlagChanged(cmd, cacheTriesInMemory) { + value := cli.GetUint64FlagValue(cmd, cacheTriesInMemory) + if value <= 2 { + panic("Must provide number greater than 2 for Cache.TriesInMemory") + } + cfg.Cache.TriesInMemory = value + } + if cli.IsFlagChanged(cmd, cachePreimages) { + cfg.Cache.Preimages = cli.GetBoolFlagValue(cmd, cachePreimages) + } + if cli.IsFlagChanged(cmd, cacheSnapshotLimit) { + cfg.Cache.SnapshotLimit = cli.GetIntFlagValue(cmd, cacheSnapshotLimit) + } + if cli.IsFlagChanged(cmd, cacheSnapshotNoBuild) { + cfg.Cache.SnapshotNoBuild = cli.GetBoolFlagValue(cmd, cacheSnapshotNoBuild) + } + if cli.IsFlagChanged(cmd, cacheSnapshotWait) { + cfg.Cache.SnapshotWait = cli.GetBoolFlagValue(cmd, cacheSnapshotWait) + } +} diff --git a/cmd/harmony/flags_test.go b/cmd/harmony/flags_test.go index bea0e0eab..ffe261b39 100644 --- a/cmd/harmony/flags_test.go +++ b/cmd/harmony/flags_test.go @@ -37,12 +37,11 @@ func TestHarmonyFlags(t *testing.T) { expConfig: harmonyconfig.HarmonyConfig{ Version: tomlConfigVersion, General: harmonyconfig.GeneralConfig{ - NodeType: "validator", - NoStaking: false, - ShardID: -1, - IsArchival: false, - DataDir: "./", - TriesInMemory: 128, + NodeType: "validator", + NoStaking: false, + ShardID: -1, + IsArchival: false, + DataDir: "./", }, Network: harmonyconfig.NetworkConfig{ NetworkType: "mainnet", @@ -183,6 +182,16 @@ func TestHarmonyFlags(t *testing.T) { LowUsageThreshold: defaultConfig.GPO.LowUsageThreshold, BlockGasLimit: defaultConfig.GPO.BlockGasLimit, }, + Cache: harmonyconfig.CacheConfig{ + Disabled: defaultConfig.Cache.Disabled, + TrieNodeLimit: defaultCacheConfig.TrieNodeLimit, + TriesInMemory: defaultConfig.Cache.TriesInMemory, + TrieTimeLimit: defaultConfig.Cache.TrieTimeLimit, + SnapshotLimit: defaultConfig.Cache.SnapshotLimit, + SnapshotWait: defaultConfig.Cache.SnapshotWait, + Preimages: defaultConfig.Cache.Preimages, + SnapshotNoBuild: defaultConfig.Cache.SnapshotNoBuild, + }, }, }, } @@ -208,80 +217,63 @@ func TestGeneralFlags(t *testing.T) { { args: []string{}, expConfig: harmonyconfig.GeneralConfig{ - NodeType: "validator", - NoStaking: false, - ShardID: -1, - IsArchival: false, - DataDir: "./", - TriesInMemory: 128, + NodeType: "validator", + NoStaking: false, + ShardID: -1, + IsArchival: false, + DataDir: "./", }, }, { args: []string{"--run", "explorer", "--run.legacy", "--run.shard=0", "--run.archive=true", "--datadir=./.hmy"}, expConfig: harmonyconfig.GeneralConfig{ - NodeType: "explorer", - NoStaking: true, - ShardID: 0, - IsArchival: true, - DataDir: "./.hmy", - TriesInMemory: 128, + NodeType: "explorer", + NoStaking: true, + ShardID: 0, + IsArchival: true, + DataDir: "./.hmy", }, }, { args: []string{"--node_type", "explorer", "--staking", "--shard_id", "0", "--is_archival", "--db_dir", "./"}, expConfig: harmonyconfig.GeneralConfig{ - NodeType: "explorer", - NoStaking: false, - ShardID: 0, - IsArchival: true, - DataDir: "./", - TriesInMemory: 128, + NodeType: "explorer", + NoStaking: false, + ShardID: 0, + IsArchival: true, + DataDir: "./", }, }, { args: []string{"--staking=false", "--is_archival=false"}, expConfig: harmonyconfig.GeneralConfig{ - NodeType: "validator", - NoStaking: true, - ShardID: -1, - IsArchival: false, - DataDir: "./", - TriesInMemory: 128, + NodeType: "validator", + NoStaking: true, + ShardID: -1, + IsArchival: false, + DataDir: "./", }, }, { args: []string{"--run", "explorer", "--run.shard", "0"}, expConfig: harmonyconfig.GeneralConfig{ - NodeType: "explorer", - NoStaking: false, - ShardID: 0, - IsArchival: false, - DataDir: "./", - TriesInMemory: 128, + NodeType: "explorer", + NoStaking: false, + ShardID: 0, + IsArchival: false, + DataDir: "./", }, }, { args: []string{"--run", "explorer", "--run.shard", "0", "--run.archive=false"}, expConfig: harmonyconfig.GeneralConfig{ - NodeType: "explorer", - NoStaking: false, - ShardID: 0, - IsArchival: false, - DataDir: "./", - TriesInMemory: 128, - }, - }, - { - args: []string{"--blockchain.tries_in_memory", "64"}, - expConfig: harmonyconfig.GeneralConfig{ - NodeType: "validator", - NoStaking: false, - ShardID: -1, - IsArchival: false, - DataDir: "./", - TriesInMemory: 64, + NodeType: "explorer", + NoStaking: false, + ShardID: 0, + IsArchival: false, + DataDir: "./", }, }, } @@ -1435,6 +1427,58 @@ func TestGPOFlags(t *testing.T) { } } +func TestCacheFlags(t *testing.T) { + tests := []struct { + args []string + expConfig harmonyconfig.CacheConfig + expErr error + }{ + { + args: []string{}, + expConfig: harmonyconfig.CacheConfig{ + Disabled: true, // based on network type + TrieNodeLimit: defaultCacheConfig.TrieNodeLimit, + TriesInMemory: defaultCacheConfig.TriesInMemory, + TrieTimeLimit: defaultCacheConfig.TrieTimeLimit, + SnapshotLimit: defaultCacheConfig.SnapshotLimit, + SnapshotWait: defaultCacheConfig.SnapshotWait, + Preimages: defaultCacheConfig.Preimages, // based on network type + SnapshotNoBuild: defaultCacheConfig.SnapshotNoBuild, + }, + }, + { + args: []string{"--cache.disabled=true", "--cache.trie_node_limit", "512", "--cache.tries_in_memory", "256", "--cache.preimages=false", "--cache.snapshot_limit", "512", "--cache.snapshot_no_build=true", "--cache.snapshot_wait=false"}, + expConfig: harmonyconfig.CacheConfig{ + Disabled: true, + TrieNodeLimit: 512, + TriesInMemory: 256, + TrieTimeLimit: 2 * time.Minute, + SnapshotLimit: 512, + SnapshotWait: false, + Preimages: false, + SnapshotNoBuild: true, + }, + }, + } + + for i, test := range tests { + ts := newFlagTestSuite(t, cacheConfigFlags, applyCacheFlags) + hc, err := ts.run(test.args) + + if assErr := assertError(err, test.expErr); assErr != nil { + t.Fatalf("Test %v: %v", i, assErr) + } + if err != nil || test.expErr != nil { + continue + } + + if !reflect.DeepEqual(hc.Cache, test.expConfig) { + t.Errorf("Test %v:\n\t%+v\n\t%+v", i, hc.Cache, test.expConfig) + } + ts.tearDown() + } +} + func TestDevnetFlags(t *testing.T) { tests := []struct { args []string diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index ec05e2419..31332b2e6 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -245,6 +245,7 @@ func applyRootFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) { applySyncFlags(cmd, config) applyShardDataFlags(cmd, config) applyGPOFlags(cmd, config) + applyCacheFlags(cmd, config) } func setupNodeLog(config harmonyconfig.HarmonyConfig) { @@ -1061,8 +1062,8 @@ func setupBlacklist(hc harmonyconfig.HarmonyConfig) (map[ethCommon.Address]struc return addrMap, nil } -func parseAllowedTxs(data []byte) (map[ethCommon.Address]core.AllowedTxData, error) { - allowedTxs := make(map[ethCommon.Address]core.AllowedTxData) +func parseAllowedTxs(data []byte) (map[ethCommon.Address][]core.AllowedTxData, error) { + allowedTxs := make(map[ethCommon.Address][]core.AllowedTxData) for _, line := range strings.Split(string(data), "\n") { line = strings.TrimSpace(line) if len(line) != 0 { // AllowedTxs file may have trailing empty string line @@ -1083,16 +1084,16 @@ func parseAllowedTxs(data []byte) (map[ethCommon.Address]core.AllowedTxData, err if err != nil { return nil, err } - allowedTxs[from] = core.AllowedTxData{ + allowedTxs[from] = append(allowedTxs[from], core.AllowedTxData{ To: to, Data: data, - } + }) } } return allowedTxs, nil } -func setupAllowedTxs(hc harmonyconfig.HarmonyConfig) (map[ethCommon.Address]core.AllowedTxData, error) { +func setupAllowedTxs(hc harmonyconfig.HarmonyConfig) (map[ethCommon.Address][]core.AllowedTxData, error) { utils.Logger().Debug().Msgf("Using AllowedTxs file at `%s`", hc.TxPool.AllowedTxsFile) data, err := os.ReadFile(hc.TxPool.AllowedTxsFile) if err != nil { diff --git a/cmd/harmony/main_test.go b/cmd/harmony/main_test.go index 0ee836f33..c6b2db9c4 100644 --- a/cmd/harmony/main_test.go +++ b/cmd/harmony/main_test.go @@ -16,22 +16,26 @@ func TestAllowedTxsParse(t *testing.T) { one1s4dvv454dtmkzsulffz3epewsyhrjq9y0g3fqz->0x985458E523dB3d53125813eD68c274899e9DfAb4:0xa9059cbb one1s4dvv454dtmkzsulffz3epewsyhrjq9y0g3fqz->one10fhdp2g9q5azrs2ukk608x6krd4rleg0ueskug:0x `) - expected := map[ethCommon.Address]core.AllowedTxData{ - common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"): core.AllowedTxData{ - To: common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"), - Data: common.FromHex("0x"), + expected := map[ethCommon.Address][]core.AllowedTxData{ + common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"): { + core.AllowedTxData{ + To: common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"), + Data: common.FromHex("0x"), + }, + core.AllowedTxData{ + To: common.HexToAddress("0x985458E523dB3d53125813eD68c274899e9DfAb4"), + Data: common.FromHex("0xa9059cbb"), + }, }, - common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"): core.AllowedTxData{ - To: common.HexToAddress("0x985458E523dB3d53125813eD68c274899e9DfAb4"), - Data: common.FromHex("0xa9059cbb"), - }, - common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"): core.AllowedTxData{ - To: common.HexToAddress("0x985458E523dB3d53125813eD68c274899e9DfAb4"), - Data: common.FromHex("0xa9059cbb"), - }, - common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"): core.AllowedTxData{ - To: common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"), - Data: common.FromHex("0x"), + common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"): { + core.AllowedTxData{ + To: common.HexToAddress("0x985458E523dB3d53125813eD68c274899e9DfAb4"), + Data: common.FromHex("0xa9059cbb"), + }, + core.AllowedTxData{ + To: common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"), + Data: common.FromHex("0x"), + }, }, } got, err := parseAllowedTxs(testData) @@ -41,10 +45,12 @@ func TestAllowedTxsParse(t *testing.T) { if len(got) != len(expected) { t.Errorf("lenght of allowed transactions not equal, got: %d expected: %d", len(got), len(expected)) } - for from, txData := range got { - expectedTxData := expected[from] - if expectedTxData.To != txData.To || !bytes.Equal(expectedTxData.Data, txData.Data) { - t.Errorf("txData not equal: got: %v expected: %v", txData, expectedTxData) + for from, txsData := range got { + for i, txData := range txsData { + expectedTxData := expected[from][i] + if expectedTxData.To != txData.To || !bytes.Equal(expectedTxData.Data, txData.Data) { + t.Errorf("txData not equal: got: %v expected: %v", txData, expectedTxData) + } } } } diff --git a/consensus/checks.go b/consensus/checks.go index 32f59fb93..b4e1d1207 100644 --- a/consensus/checks.go +++ b/consensus/checks.go @@ -4,13 +4,13 @@ import ( "bytes" "encoding/binary" - protobuf "github.com/golang/protobuf/proto" libbls "github.com/harmony-one/bls/ffi/go/bls" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/hash" "github.com/pkg/errors" + protobuf "google.golang.org/protobuf/proto" ) // MaxBlockNumDiff limits the received block number to only 100 further from the current block number diff --git a/consensus/consensus.go b/consensus/consensus.go index 18b53e682..63f18a85e 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -46,7 +46,7 @@ type DownloadAsync interface { // Consensus is the main struct with all states and data related to consensus process. type Consensus struct { - Decider quorum.Decider + decider quorum.Decider // FBFTLog stores the pbft messages and blocks during FBFT process fBFTLog *FBFTLog // phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc @@ -200,7 +200,9 @@ func (consensus *Consensus) BlocksNotSynchronized(reason string) { // VdfSeedSize returns the number of VRFs for VDF computation func (consensus *Consensus) VdfSeedSize() int { - return int(consensus.Decider.ParticipantsCount()) * 2 / 3 + consensus.mutex.RLock() + defer consensus.mutex.RUnlock() + return int(consensus.decider.ParticipantsCount()) * 2 / 3 } // GetPublicKeys returns the public keys @@ -252,6 +254,8 @@ func (consensus *Consensus) getConsensusLeaderPrivateKey() (*bls.PrivateKeyWrapp } func (consensus *Consensus) IsBackup() bool { + consensus.mutex.RLock() + defer consensus.mutex.RUnlock() return consensus.isBackup } @@ -275,7 +279,7 @@ func New( fBFTLog: NewFBFTLog(), phase: FBFTAnnounce, current: State{mode: Normal}, - Decider: Decider, + decider: Decider, registry: registry, MinPeers: minPeers, AggregateSig: aggregateSig, @@ -322,6 +326,10 @@ func (consensus *Consensus) Registry() *registry.Registry { return consensus.registry } +func (consensus *Consensus) Decider() quorum.Decider { + return quorum.NewThreadSafeDecider(consensus.decider, consensus.mutex) +} + // InitConsensusWithValidators initialize shard state // from latest epoch and update committee pub // keys for consensus diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 40f0bc23d..d658fe83d 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -5,27 +5,26 @@ import ( "sync/atomic" "time" - "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/crypto/bls" - "github.com/harmony-one/harmony/multibls" - "github.com/harmony-one/harmony/webhooks" - "github.com/ethereum/go-ethereum/common" - protobuf "github.com/golang/protobuf/proto" bls_core "github.com/harmony-one/bls/ffi/go/bls" msg_pb "github.com/harmony-one/harmony/api/proto/message" consensus_engine "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/consensus/quorum" "github.com/harmony-one/harmony/consensus/signature" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/crypto/bls" bls_cosi "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/hash" "github.com/harmony-one/harmony/internal/chain" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/multibls" "github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/shard/committee" + "github.com/harmony-one/harmony/webhooks" "github.com/pkg/errors" "github.com/rs/zerolog" + protobuf "google.golang.org/protobuf/proto" ) // WaitForNewRandomness listens to the RndChannel to receive new VDF randomness. @@ -82,7 +81,7 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.Publi } func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 { - consensus.Decider.UpdateParticipants(pubKeys, allowlist) + consensus.decider.UpdateParticipants(pubKeys, allowlist) consensus.getLogger().Info().Msg("My Committee updated") for i := range pubKeys { consensus.getLogger().Info(). @@ -91,7 +90,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi Msg("Member") } - allKeys := consensus.Decider.Participants() + allKeys := consensus.decider.Participants() if len(allKeys) != 0 { consensus.LeaderPubKey = &allKeys[0] consensus.getLogger().Info(). @@ -115,7 +114,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi if !consensus.isViewChangingMode() { consensus.resetViewChangeState() } - return consensus.Decider.ParticipantsCount() + return consensus.decider.ParticipantsCount() } // Sign on the hash of the message @@ -144,7 +143,7 @@ func (consensus *Consensus) updateBitmaps() { consensus.getLogger().Debug(). Str("MessageType", consensus.phase.String()). Msg("[UpdateBitmaps] Updating consensus bitmaps") - members := consensus.Decider.Participants() + members := consensus.decider.Participants() prepareBitmap := bls_cosi.NewMask(members) commitBitmap := bls_cosi.NewMask(members) multiSigBitmap := bls_cosi.NewMask(members) @@ -160,7 +159,7 @@ func (consensus *Consensus) resetState() { consensus.blockHash = [32]byte{} consensus.block = []byte{} - consensus.Decider.ResetPrepareAndCommitVotes() + consensus.decider.ResetPrepareAndCommitVotes() if consensus.prepareBitmap != nil { consensus.prepareBitmap.Clear() } @@ -179,7 +178,7 @@ func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKe } func (consensus *Consensus) isValidatorInCommittee(pubKey bls.SerializedPublicKey) bool { - return consensus.Decider.IndexOf(pubKey) != -1 + return consensus.decider.IndexOf(pubKey) != -1 } // SetMode sets the mode of consensus @@ -191,10 +190,6 @@ func (consensus *Consensus) SetMode(m Mode) { // SetMode sets the mode of consensus func (consensus *Consensus) setMode(m Mode) { - if m == Normal && consensus.isBackup { - m = NormalBackup - } - consensus.getLogger().Debug(). Str("Mode", m.String()). Msg("[SetMode]") @@ -203,11 +198,12 @@ func (consensus *Consensus) setMode(m Mode) { // SetIsBackup sets the mode of consensus func (consensus *Consensus) SetIsBackup(isBackup bool) { + consensus.mutex.Lock() + defer consensus.mutex.Unlock() consensus.getLogger().Debug(). Bool("IsBackup", isBackup). Msg("[SetIsBackup]") consensus.isBackup = isBackup - consensus.current.SetIsBackup(isBackup) } // Mode returns the mode of consensus @@ -271,7 +267,7 @@ func (consensus *Consensus) setBlockNum(blockNum uint64) { // ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) { consensus.mutex.RLock() - members := consensus.Decider.Participants() + members := consensus.decider.Participants() consensus.mutex.RUnlock() return consensus.readSignatureBitmapPayload(recvPayload, offset, members) } @@ -334,12 +330,12 @@ func (consensus *Consensus) updateConsensusInformation() Mode { isFirstTimeStaking := consensus.Blockchain().Config().IsStaking(nextEpoch) && curHeader.IsLastBlockInEpoch() && !consensus.Blockchain().Config().IsStaking(curEpoch) haventUpdatedDecider := consensus.Blockchain().Config().IsStaking(curEpoch) && - consensus.Decider.Policy() != quorum.SuperMajorityStake + consensus.decider.Policy() != quorum.SuperMajorityStake // Only happens once, the flip-over to a new Decider policy if isFirstTimeStaking || haventUpdatedDecider { decider := quorum.NewDecider(quorum.SuperMajorityStake, consensus.ShardID) - consensus.Decider = decider + consensus.decider = decider } var committeeToSet *shard.Committee @@ -412,7 +408,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode { consensus.updatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist()) // Update voters in the committee - if _, err := consensus.Decider.SetVoters( + if _, err := consensus.decider.SetVoters( committeeToSet, epochToSet, ); err != nil { consensus.getLogger().Error(). @@ -582,7 +578,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { return errGetPreparedBlock } - aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.Decider.Participants()) + aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.decider.Participants()) if err != nil { return errReadBitmapPayload } @@ -606,7 +602,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { continue } - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Commit, []*bls_cosi.PublicKeyWrapper{key.Pub}, key.Pri.SignHash(commitPayload), @@ -628,7 +624,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 { count := uint32(0) consensus.mutex.Lock() - members := consensus.Decider.Participants() + members := consensus.decider.Participants() pubKeys := consensus.getPublicKeys() consensus.mutex.Unlock() diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 992e725e7..2fe524fdf 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -18,7 +18,7 @@ import ( ) func TestConsensusInitialization(t *testing.T) { - host, multiBLSPrivateKey, consensus, decider, err := GenerateConsensusForTesting() + host, multiBLSPrivateKey, consensus, _, err := GenerateConsensusForTesting() assert.NoError(t, err) messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec} @@ -30,7 +30,6 @@ func TestConsensusInitialization(t *testing.T) { expectedTimeouts[timeoutViewChange] = viewChangeDuration expectedTimeouts[timeoutBootstrap] = bootstrapDuration - assert.Equal(t, decider, consensus.Decider) assert.Equal(t, host, consensus.host) assert.Equal(t, messageSender, consensus.msgSender) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 0e1c40705..33ba54b1d 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -91,7 +91,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p case t == msg_pb.MessageType_VIEWCHANGE: fbftMsg, err = ParseViewChangeMessage(msg) case t == msg_pb.MessageType_NEWVIEW: - members := consensus.Decider.Participants() + members := consensus.decider.Participants() fbftMsg, err = ParseNewViewMessage(msg, members) default: fbftMsg, err = consensus.parseFBFTMessage(msg) @@ -106,7 +106,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p consensus.isLeader() // if in backup normal mode, force ignore view change event and leader event. - if consensus.current.Mode() == NormalBackup { + if consensus.isBackup { canHandleViewChange = false intendedForLeader = false } @@ -138,7 +138,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p } func (consensus *Consensus) finalCommit() { - numCommits := consensus.Decider.SignersCount(quorum.Commit) + numCommits := consensus.decider.SignersCount(quorum.Commit) consensus.getLogger().Info(). Int64("NumCommits", numCommits). @@ -178,7 +178,10 @@ func (consensus *Consensus) finalCommit() { return } consensus.getLogger().Info().Hex("new", commitSigAndBitmap).Msg("[finalCommit] Overriding commit signatures!!") - consensus.Blockchain().WriteCommitSig(block.NumberU64(), commitSigAndBitmap) + + if err := consensus.Blockchain().WriteCommitSig(block.NumberU64(), commitSigAndBitmap); err != nil { + consensus.getLogger().Warn().Err(err).Msg("[finalCommit] failed writting commit sig") + } // Send committed message before block insertion. // if leader successfully finalizes the block, send committed message to validators @@ -441,7 +444,7 @@ func (consensus *Consensus) BlockChannel(newBlock *types.Block) { Int("numTxs", len(newBlock.Transactions())). Int("numStakingTxs", len(newBlock.StakingTransactions())). Time("startTime", startTime). - Int64("publicKeys", consensus.Decider.ParticipantsCount()). + Int64("publicKeys", consensus.decider.ParticipantsCount()). Msg("[ConsensusMainLoop] STARTING CONSENSUS") consensus.announce(newBlock) }) @@ -679,7 +682,7 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess // rotateLeader rotates the leader to the next leader in the committee. // This function must be called with enabled leader rotation. -func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper { +func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicKeyWrapper) *bls.PublicKeyWrapper { var ( bc = consensus.Blockchain() leader = consensus.getLeaderPubKey() @@ -687,31 +690,32 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper { curNumber = curBlock.NumberU64() curEpoch = curBlock.Epoch().Uint64() ) + if epoch.Uint64() != curEpoch { + return defaultKey + } const blocksCountAliveness = 4 - utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v", epoch.Uint64(), bc.Config().IsLeaderRotationInternalValidators(epoch), bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch)) ss, err := bc.ReadShardState(epoch) if err != nil { utils.Logger().Error().Err(err).Msg("Failed to read shard state") - return nil + return defaultKey } committee, err := ss.FindCommitteeByID(consensus.ShardID) if err != nil { utils.Logger().Error().Err(err).Msg("Failed to find committee") - return nil + return defaultKey } slotsCount := len(committee.Slots) blocksPerEpoch := shard.Schedule.InstanceForEpoch(epoch).BlocksPerEpoch() if blocksPerEpoch == 0 { utils.Logger().Error().Msg("[Rotating leader] blocks per epoch is 0") - return nil + return defaultKey } if slotsCount == 0 { utils.Logger().Error().Msg("[Rotating leader] slots count is 0") - return nil + return defaultKey } numBlocksProducedByLeader := blocksPerEpoch / uint64(slotsCount) - rest := blocksPerEpoch % uint64(slotsCount) const minimumBlocksForLeaderInRow = blocksCountAliveness if numBlocksProducedByLeader < minimumBlocksForLeaderInRow { // mine no less than 3 blocks in a row @@ -720,15 +724,11 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper { s := bc.LeaderRotationMeta() if !bytes.Equal(leader.Bytes[:], s.Pub) { // Another leader. - return nil - } - // If it is the first validator producing blocks, it should also produce the remaining 'rest' of the blocks. - if s.Shifts == 0 { - numBlocksProducedByLeader += rest + return defaultKey } if s.Count < numBlocksProducedByLeader { // Not enough blocks produced by the leader, continue producing by the same leader. - return nil + return defaultKey } // Passed all checks, we can change leader. // NthNext will move the leader to the next leader in the committee. @@ -741,23 +741,23 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper { for i := 0; i < len(committee.Slots); i++ { if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) { - wasFound, next = consensus.Decider.NthNextValidator(committee.Slots, leader, offset) + wasFound, next = consensus.decider.NthNextValidator(committee.Slots, leader, offset) } else { - wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset) + wasFound, next = consensus.decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset) } if !wasFound { utils.Logger().Error().Msg("Failed to get next leader") // Seems like nothing we can do here. - return nil + return defaultKey } - members := consensus.Decider.Participants() + members := consensus.decider.Participants() mask := bls.NewMask(members) skipped := 0 for i := 0; i < blocksCountAliveness; i++ { header := bc.GetHeaderByNumber(curNumber - uint64(i)) if header == nil { utils.Logger().Error().Msgf("Failed to get header by number %d", curNumber-uint64(i)) - return nil + return defaultKey } // if epoch is different, we should not check this block. if header.Epoch().Uint64() != curEpoch { @@ -767,12 +767,12 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper { err = mask.SetMask(header.LastCommitBitmap()) if err != nil { utils.Logger().Err(err).Msg("Failed to set mask") - return nil + return defaultKey } ok, err := mask.KeyEnabled(next.Bytes) if err != nil { utils.Logger().Err(err).Msg("Failed to get key enabled") - return nil + return defaultKey } if !ok { skipped++ @@ -787,14 +787,13 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper { } return next } - return nil + return defaultKey } // SetupForNewConsensus sets the state for new consensus func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) consensus.setCurBlockViewID(committedMsg.ViewID + 1) - consensus.LeaderPubKey = committedMsg.SenderPubkeys[0] var epoch *big.Int if blk.IsLastBlockInEpoch() { epoch = new(big.Int).Add(blk.Epoch(), common.Big1) @@ -802,9 +801,14 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg epoch = blk.Epoch() } if consensus.Blockchain().Config().IsLeaderRotationInternalValidators(epoch) { - if next := consensus.rotateLeader(epoch); next != nil { + if next := consensus.rotateLeader(epoch, committedMsg.SenderPubkeys[0]); next != nil { prev := consensus.getLeaderPubKey() consensus.setLeaderPubKey(next) + if consensus.isLeader() { + utils.Logger().Info().Msgf("We are block %d, I am the new leader %s", blk.NumberU64(), next.Bytes.Hex()) + } else { + utils.Logger().Info().Msgf("We are block %d, the leader is %s", blk.NumberU64(), next.Bytes.Hex()) + } if consensus.isLeader() && !consensus.getLeaderPubKey().Object.IsEqual(prev.Object) { // leader changed blockPeriod := consensus.BlockPeriod diff --git a/consensus/construct.go b/consensus/construct.go index 10488816c..01d02a4d4 100644 --- a/consensus/construct.go +++ b/consensus/construct.go @@ -4,15 +4,13 @@ import ( "bytes" "errors" - protobuf "github.com/golang/protobuf/proto" - - "github.com/harmony-one/harmony/crypto/bls" - bls_core "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/api/proto" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/utils" + protobuf "google.golang.org/protobuf/proto" ) // NetworkMessage is a message intended to be @@ -82,7 +80,7 @@ func (consensus *Consensus) construct( ) } else { // TODO: use a persistent bitmap to report bitmap - mask := bls.NewMask(consensus.Decider.Participants()) + mask := bls.NewMask(consensus.decider.Participants()) for _, key := range priKeys { mask.SetKey(key.Pub.Bytes, true) } @@ -161,7 +159,7 @@ func (consensus *Consensus) construct( func (consensus *Consensus) constructQuorumSigAndBitmap(p quorum.Phase) []byte { buffer := bytes.Buffer{} // 96 bytes aggregated signature - aggSig := consensus.Decider.AggregateVotes(p) + aggSig := consensus.decider.AggregateVotes(p) buffer.Write(aggSig.Serialize()) // Bitmap if p == quorum.Prepare { diff --git a/consensus/construct_test.go b/consensus/construct_test.go index 7188ebea6..c836e7822 100644 --- a/consensus/construct_test.go +++ b/consensus/construct_test.go @@ -81,7 +81,7 @@ func TestConstructPreparedMessage(test *testing.T) { validatorKey := bls.SerializedPublicKey{} validatorKey.FromLibBLSPublicKey(validatorPubKey) validatorKeyWrapper := bls.PublicKeyWrapper{Object: validatorPubKey, Bytes: validatorKey} - consensus.Decider.AddNewVote( + consensus.Decider().AddNewVote( quorum.Prepare, []*bls.PublicKeyWrapper{&leaderKeyWrapper}, leaderPriKey.Sign(message), @@ -89,7 +89,7 @@ func TestConstructPreparedMessage(test *testing.T) { consensus.BlockNum(), consensus.GetCurBlockViewID(), ) - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.Decider().AddNewVote( quorum.Prepare, []*bls.PublicKeyWrapper{&validatorKeyWrapper}, validatorPriKey.Sign(message), diff --git a/consensus/double_sign.go b/consensus/double_sign.go index 3a8d559fd..144c67bff 100644 --- a/consensus/double_sign.go +++ b/consensus/double_sign.go @@ -17,7 +17,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool { if consensus.couldThisBeADoubleSigner(recvMsg) { addrSet := map[common.Address]struct{}{} for _, pubKey2 := range recvMsg.SenderPubkeys { - if alreadyCastBallot := consensus.Decider.ReadBallot( + if alreadyCastBallot := consensus.decider.ReadBallot( quorum.Commit, pubKey2.Bytes, ); alreadyCastBallot != nil { for _, pubKey1 := range alreadyCastBallot.SignerPubKeys { diff --git a/consensus/enums.go b/consensus/enums.go index a8c4357c2..41eafba86 100644 --- a/consensus/enums.go +++ b/consensus/enums.go @@ -14,8 +14,6 @@ const ( Syncing // Listening .. Listening - // NormalBackup Backup Node .. - NormalBackup ) // FBFTPhase : different phases of consensus @@ -34,7 +32,6 @@ var ( ViewChanging: "ViewChanging", Syncing: "Syncing", Listening: "Listening", - NormalBackup: "NormalBackup", } phaseNames = map[FBFTPhase]string{ FBFTAnnounce: "Announce", diff --git a/consensus/leader.go b/consensus/leader.go index 0bd934cb7..4a227fa62 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -62,7 +62,7 @@ func (consensus *Consensus) announce(block *types.Block) { continue } - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Prepare, []*bls.PublicKeyWrapper{key.Pub}, key.Pri.SignHash(consensus.blockHash[:]), @@ -112,7 +112,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { prepareBitmap := consensus.prepareBitmap // proceed only when the message is not received before for _, signer := range recvMsg.SenderPubkeys { - signed := consensus.Decider.ReadBallot(quorum.Prepare, signer.Bytes) + signed := consensus.decider.ReadBallot(quorum.Prepare, signer.Bytes) if signed != nil { consensus.getLogger().Debug(). Str("validatorPubKey", signer.Bytes.Hex()). @@ -121,16 +121,18 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { } } - if consensus.Decider.IsQuorumAchieved(quorum.Prepare) { + if consensus.decider.IsQuorumAchieved(quorum.Prepare) { // already have enough signatures consensus.getLogger().Debug(). Interface("validatorPubKeys", recvMsg.SenderPubkeys). Msg("[OnPrepare] Received Additional Prepare Message") return } - signerCount := consensus.Decider.SignersCount(quorum.Prepare) + signerCount := consensus.decider.SignersCount(quorum.Prepare) //// Read - End + consensus.UpdateLeaderMetrics(float64(signerCount), float64(consensus.getBlockNum())) + // Check BLS signature for the multi-sig prepareSig := recvMsg.Payload var sign bls_core.Sign @@ -161,11 +163,11 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { consensus.getLogger().Debug(). Int64("NumReceivedSoFar", signerCount). - Int64("PublicKeys", consensus.Decider.ParticipantsCount()). + Int64("PublicKeys", consensus.decider.ParticipantsCount()). Msg("[OnPrepare] Received New Prepare Signature") //// Write - Start - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Prepare, recvMsg.SenderPubkeys, &sign, recvMsg.BlockHash, recvMsg.BlockNum, recvMsg.ViewID, @@ -181,7 +183,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { //// Write - End //// Read - Start - if consensus.Decider.IsQuorumAchieved(quorum.Prepare) { + if consensus.decider.IsQuorumAchieved(quorum.Prepare) { // NOTE Let it handle its own logs if err := consensus.didReachPrepareQuorum(); err != nil { return @@ -199,7 +201,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { } // proceed only when the message is not received before for _, signer := range recvMsg.SenderPubkeys { - signed := consensus.Decider.ReadBallot(quorum.Commit, signer.Bytes) + signed := consensus.decider.ReadBallot(quorum.Commit, signer.Bytes) if signed != nil { consensus.getLogger().Debug(). Str("validatorPubKey", signer.Bytes.Hex()). @@ -211,9 +213,9 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { commitBitmap := consensus.commitBitmap // has to be called before verifying signature - quorumWasMet := consensus.Decider.IsQuorumAchieved(quorum.Commit) + quorumWasMet := consensus.decider.IsQuorumAchieved(quorum.Commit) - signerCount := consensus.Decider.SignersCount(quorum.Commit) + signerCount := consensus.decider.SignersCount(quorum.Commit) //// Read - End // Verify the signature on commitPayload is correct @@ -267,7 +269,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { return } */ - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Commit, recvMsg.SenderPubkeys, &sign, recvMsg.BlockHash, recvMsg.BlockNum, recvMsg.ViewID, @@ -285,15 +287,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { //// Read - Start viewID := consensus.getCurBlockViewID() - if consensus.Decider.IsAllSigsCollected() { - logger.Info().Msg("[OnCommit] 100% Enough commits received") - consensus.finalCommit() - - consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED) - return - } - - quorumIsMet := consensus.Decider.IsQuorumAchieved(quorum.Commit) + quorumIsMet := consensus.decider.IsQuorumAchieved(quorum.Commit) //// Read - End if !quorumWasMet && quorumIsMet { diff --git a/consensus/quorum/thread_safe_decider.go b/consensus/quorum/thread_safe_decider.go new file mode 100644 index 000000000..9999325f6 --- /dev/null +++ b/consensus/quorum/thread_safe_decider.go @@ -0,0 +1,179 @@ +package quorum + +import ( + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" + bls_core "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/consensus/votepower" + "github.com/harmony-one/harmony/crypto/bls" + shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" + "github.com/harmony-one/harmony/multibls" + "github.com/harmony-one/harmony/numeric" + "github.com/harmony-one/harmony/shard" +) + +var _ Decider = threadSafeDeciderImpl{} + +type threadSafeDeciderImpl struct { + mu *sync.RWMutex + decider Decider +} + +func NewThreadSafeDecider(decider Decider, mu *sync.RWMutex) Decider { + return threadSafeDeciderImpl{ + mu: mu, + decider: decider, + } +} + +func (a threadSafeDeciderImpl) String() string { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.String() +} + +func (a threadSafeDeciderImpl) Participants() multibls.PublicKeys { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.Participants() +} + +func (a threadSafeDeciderImpl) IndexOf(key bls.SerializedPublicKey) int { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.IndexOf(key) +} + +func (a threadSafeDeciderImpl) ParticipantsCount() int64 { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.ParticipantsCount() +} + +func (a threadSafeDeciderImpl) NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.NthNextValidator(slotList, pubKey, next) +} + +func (a threadSafeDeciderImpl) NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.NthNextHmy(instance, pubkey, next) +} + +func (a threadSafeDeciderImpl) NthNextHmyExt(instance shardingconfig.Instance, wrapper *bls.PublicKeyWrapper, i int) (bool, *bls.PublicKeyWrapper) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.NthNextHmyExt(instance, wrapper, i) +} + +func (a threadSafeDeciderImpl) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.FirstParticipant(instance) +} + +func (a threadSafeDeciderImpl) UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper) { + a.mu.Lock() + defer a.mu.Unlock() + a.decider.UpdateParticipants(pubKeys, allowlist) +} + +func (a threadSafeDeciderImpl) submitVote(p Phase, pubkeys []bls.SerializedPublicKey, sig *bls_core.Sign, headerHash common.Hash, height, viewID uint64) (*votepower.Ballot, error) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.submitVote(p, pubkeys, sig, headerHash, height, viewID) +} + +func (a threadSafeDeciderImpl) SignersCount(phase Phase) int64 { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.SignersCount(phase) +} + +func (a threadSafeDeciderImpl) reset(phases []Phase) { + a.mu.Lock() + defer a.mu.Unlock() + a.decider.reset(phases) +} + +func (a threadSafeDeciderImpl) ReadBallot(p Phase, pubkey bls.SerializedPublicKey) *votepower.Ballot { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.ReadBallot(p, pubkey) +} + +func (a threadSafeDeciderImpl) TwoThirdsSignersCount() int64 { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.TwoThirdsSignersCount() +} + +func (a threadSafeDeciderImpl) AggregateVotes(p Phase) *bls_core.Sign { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.AggregateVotes(p) +} + +func (a threadSafeDeciderImpl) SetVoters(subCommittee *shard.Committee, epoch *big.Int) (*TallyResult, error) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.SetVoters(subCommittee, epoch) +} + +func (a threadSafeDeciderImpl) Policy() Policy { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.Policy() +} + +func (a threadSafeDeciderImpl) AddNewVote(p Phase, pubkeys []*bls.PublicKeyWrapper, sig *bls_core.Sign, headerHash common.Hash, height, viewID uint64) (*votepower.Ballot, error) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.AddNewVote(p, pubkeys, sig, headerHash, height, viewID) +} + +func (a threadSafeDeciderImpl) IsQuorumAchievedByMask(mask *bls.Mask) bool { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.IsQuorumAchievedByMask(mask) +} + +func (a threadSafeDeciderImpl) QuorumThreshold() numeric.Dec { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.QuorumThreshold() +} + +func (a threadSafeDeciderImpl) IsAllSigsCollected() bool { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.IsAllSigsCollected() +} + +func (a threadSafeDeciderImpl) ResetPrepareAndCommitVotes() { + a.mu.Lock() + defer a.mu.Unlock() + a.decider.ResetPrepareAndCommitVotes() +} + +func (a threadSafeDeciderImpl) ResetViewChangeVotes() { + a.mu.Lock() + defer a.mu.Unlock() + a.decider.ResetViewChangeVotes() +} + +func (a threadSafeDeciderImpl) CurrentTotalPower(p Phase) (*numeric.Dec, error) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.CurrentTotalPower(p) +} + +func (a threadSafeDeciderImpl) IsQuorumAchieved(p Phase) bool { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.IsQuorumAchieved(p) +} diff --git a/consensus/threshold.go b/consensus/threshold.go index e611eaedc..339f6d2a7 100644 --- a/consensus/threshold.go +++ b/consensus/threshold.go @@ -57,7 +57,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error { continue } - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Commit, []*bls.PublicKeyWrapper{key.Pub}, key.Pri.SignHash(commitPayload), diff --git a/consensus/validator.go b/consensus/validator.go index 891fe0c03..2f14f76b4 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -133,7 +133,7 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block } func (consensus *Consensus) prepare() { - if consensus.IsBackup() { + if consensus.isBackup { return } @@ -152,7 +152,7 @@ func (consensus *Consensus) prepare() { // sendCommitMessages send out commit messages to leader func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) { - if consensus.IsBackup() || blockObj == nil { + if consensus.isBackup || blockObj == nil { return } @@ -199,12 +199,12 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { // check validity of prepared signature blockHash := recvMsg.BlockHash - aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.Decider.Participants()) + aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.decider.Participants()) if err != nil { consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!") return } - if !consensus.Decider.IsQuorumAchievedByMask(mask) { + if !consensus.decider.IsQuorumAchievedByMask(mask) { consensus.getLogger().Warn().Msgf("[OnPrepared] Quorum Not achieved.") return } @@ -335,7 +335,7 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { return } - aggSig, mask, err := chain.DecodeSigBitmap(sigBytes, bitmap, consensus.Decider.Participants()) + aggSig, mask, err := chain.DecodeSigBitmap(sigBytes, bitmap, consensus.decider.Participants()) if err != nil { consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed") return diff --git a/consensus/view_change.go b/consensus/view_change.go index d03ae5a13..f55c5400f 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -33,8 +33,6 @@ type State struct { // view changing id is used during view change mode // it is the next view id viewChangingID uint64 - - isBackup bool } // Mode return the current node mode @@ -44,10 +42,6 @@ func (pm *State) Mode() Mode { // SetMode set the node mode as required func (pm *State) SetMode(s Mode) { - if s == Normal && pm.isBackup { - s = NormalBackup - } - pm.mode = s } @@ -81,10 +75,6 @@ func (pm *State) GetViewChangeDuraion() time.Duration { return time.Duration(diff * diff * int64(viewChangeDuration)) } -func (pm *State) SetIsBackup(isBackup bool) { - pm.isBackup = isBackup -} - // fallbackNextViewID return the next view ID and duration when there is an exception // to calculate the time-based viewId func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) { @@ -187,7 +177,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com // it can still sync with other validators. if curHeader.IsLastBlockInEpoch() { consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch") - lastLeaderPubKey = consensus.Decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch)) + lastLeaderPubKey = consensus.decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch)) } } } @@ -204,18 +194,18 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com var next *bls.PublicKeyWrapper if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) { if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) { - wasFound, next = consensus.Decider.NthNextValidator( + wasFound, next = consensus.decider.NthNextValidator( committee.Slots, lastLeaderPubKey, gap) } else { - wasFound, next = consensus.Decider.NthNextHmy( + wasFound, next = consensus.decider.NthNextHmy( shard.Schedule.InstanceForEpoch(epoch), lastLeaderPubKey, gap) } } else { - wasFound, next = consensus.Decider.NthNextHmy( + wasFound, next = consensus.decider.NthNextHmy( shard.Schedule.InstanceForEpoch(epoch), lastLeaderPubKey, gap) @@ -281,7 +271,7 @@ func (consensus *Consensus) startViewChange() { defer consensus.consensusTimeout[timeoutViewChange].Start() // update the dictionary key if the viewID is first time received - members := consensus.Decider.Participants() + members := consensus.decider.Participants() consensus.vc.AddViewIDKeyIfNotExist(nextViewID, members) // init my own payload @@ -386,10 +376,10 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { return } - if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) { + if consensus.decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) { consensus.getLogger().Info(). - Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)). - Int64("need", consensus.Decider.TwoThirdsSignersCount()). + Int64("have", consensus.decider.SignersCount(quorum.ViewChange)). + Int64("need", consensus.decider.TwoThirdsSignersCount()). Interface("SenderPubkeys", recvMsg.SenderPubkeys). Str("newLeaderKey", newLeaderKey.Bytes.Hex()). Msg("[onViewChange] Received Enough View Change Messages") @@ -404,7 +394,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { senderKey := recvMsg.SenderPubkeys[0] // update the dictionary key if the viewID is first time received - members := consensus.Decider.Participants() + members := consensus.decider.Participants() consensus.vc.AddViewIDKeyIfNotExist(recvMsg.ViewID, members) // do it once only per viewID/Leader @@ -420,7 +410,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { return } - err = consensus.vc.ProcessViewChangeMsg(consensus.fBFTLog, consensus.Decider, recvMsg, consensus.verifyBlock) + err = consensus.vc.ProcessViewChangeMsg(consensus.fBFTLog, consensus.decider, recvMsg, consensus.verifyBlock) if err != nil { consensus.getLogger().Error().Err(err). Uint64("viewID", recvMsg.ViewID). @@ -431,7 +421,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { } // received enough view change messages, change state to normal consensus - if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.isViewChangingMode() { + if consensus.decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.isViewChangingMode() { // no previous prepared message, go straight to normal mode // and start proposing new block if consensus.vc.IsM1PayloadEmpty() { @@ -495,7 +485,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { } m3Mask := recvMsg.M3Bitmap - if !consensus.Decider.IsQuorumAchievedByMask(m3Mask) { + if !consensus.decider.IsQuorumAchievedByMask(m3Mask) { consensus.getLogger().Warn(). Msgf("[onNewView] Quorum Not achieved") return @@ -507,7 +497,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) { // m1 is not empty, check it's valid blockHash := recvMsg.Payload[:32] - aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.Decider.Participants()) + aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.decider.Participants()) if err != nil { consensus.getLogger().Error().Err(err). Msg("[onNewView] ReadSignatureBitmapPayload Failed") @@ -584,5 +574,5 @@ func (consensus *Consensus) resetViewChangeState() { Msg("[ResetViewChangeState] Resetting view change state") consensus.current.SetMode(Normal) consensus.vc.Reset() - consensus.Decider.ResetViewChangeVotes() + consensus.decider.ResetViewChangeVotes() } diff --git a/consensus/view_change_construct.go b/consensus/view_change_construct.go index fcf025e74..5d2553175 100644 --- a/consensus/view_change_construct.go +++ b/consensus/view_change_construct.go @@ -465,7 +465,7 @@ func (vc *viewChange) InitPayload( if !inited { viewIDBytes := make([]byte, 8) binary.LittleEndian.PutUint64(viewIDBytes, viewID) - vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M3 (ViewID) type messaage") + vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M3 (ViewID) type message") for _, key := range privKeys { if _, ok := vc.viewIDBitmap[viewID]; !ok { viewIDBitmap := bls_cosi.NewMask(members) diff --git a/consensus/view_change_test.go b/consensus/view_change_test.go index 96d8fbc86..bbc699944 100644 --- a/consensus/view_change_test.go +++ b/consensus/view_change_test.go @@ -94,7 +94,7 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) { _, _, consensus, _, err := GenerateConsensusForTesting() assert.NoError(t, err) - assert.Equal(t, int64(0), consensus.Decider.ParticipantsCount()) + assert.Equal(t, int64(0), consensus.Decider().ParticipantsCount()) blsKeys := []*bls_core.PublicKey{} wrappedBLSKeys := []bls.PublicKeyWrapper{} @@ -111,8 +111,8 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) { wrappedBLSKeys = append(wrappedBLSKeys, wrapped) } - consensus.Decider.UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{}) - assert.Equal(t, keyCount, consensus.Decider.ParticipantsCount()) + consensus.Decider().UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{}) + assert.Equal(t, keyCount, consensus.Decider().ParticipantsCount()) consensus.LeaderPubKey = &wrappedBLSKeys[0] nextKey := consensus.getNextLeaderKey(uint64(1), nil) diff --git a/core/block_validator.go b/core/block_validator.go index 4e097b94d..700606832 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -56,7 +56,7 @@ func NewBlockValidator(blockchain BlockChain) *BlockValidator { func (v *BlockValidator) ValidateBody(block *types.Block) error { // Check whether the block's known, and if not, that it's linkable if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) { - return ErrKnownBlock + return errors.WithMessage(ErrKnownBlock, "validate body: has block and state") } if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) { diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index ec3c5fc29..cfe85237f 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -31,6 +31,8 @@ import ( "sync/atomic" "time" + "github.com/pkg/errors" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" @@ -66,7 +68,6 @@ import ( "github.com/harmony-one/harmony/staking/slash" staking "github.com/harmony-one/harmony/staking/types" lru "github.com/hashicorp/golang-lru" - "github.com/pkg/errors" ) var ( @@ -353,7 +354,11 @@ func newBlockChainWithOptions( NoBuild: bc.cacheConfig.SnapshotNoBuild, AsyncBuild: !bc.cacheConfig.SnapshotWait, } - bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Hash()) + fmt.Println("loading/generating snapshot...") + utils.Logger().Info(). + Str("Root", head.Root().Hex()). + Msg("loading/generating snapshot") + bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root()) } curHeader := bc.CurrentBlock().Header() @@ -781,6 +786,20 @@ func (bc *BlockChainImpl) resetWithGenesisBlock(genesis *types.Block) error { return nil } +func (bc *BlockChainImpl) repairRecreateStateTries(head **types.Block) error { + for { + blk := bc.GetBlockByNumber((*head).NumberU64() + 1) + if blk != nil { + _, _, _, err := bc.insertChain([]*types.Block{blk}, true) + if err != nil { + return err + } + *head = blk + continue + } + } +} + // repair tries to repair the current blockchain by rolling back the current block // until one with associated state is found. This is needed to fix incomplete db // writes caused either by crashes/power outages, or simply non-committed tries. @@ -788,6 +807,16 @@ func (bc *BlockChainImpl) resetWithGenesisBlock(genesis *types.Block) error { // This method only rolls back the current block. The current header and current // fast block are left intact. func (bc *BlockChainImpl) repair(head **types.Block) error { + if err := bc.repairValidatorsAndCommitSigs(head); err != nil { + return errors.WithMessage(err, "failed to repair validators and commit sigs") + } + if err := bc.repairRecreateStateTries(head); err != nil { + return errors.WithMessage(err, "failed to recreate state tries") + } + return nil +} + +func (bc *BlockChainImpl) repairValidatorsAndCommitSigs(head **types.Block) error { valsToRemove := map[common.Address]struct{}{} for { // Abort if we've rewound to a head block that does have associated state @@ -796,6 +825,9 @@ func (bc *BlockChainImpl) repair(head **types.Block) error { Str("number", (*head).Number().String()). Str("hash", (*head).Hash().Hex()). Msg("Rewound blockchain to past state") + if err := rawdb.WriteHeadBlockHash(bc.db, (*head).Hash()); err != nil { + return errors.WithMessagef(err, "failed to write head block hash number %d", (*head).NumberU64()) + } return bc.removeInValidatorList(valsToRemove) } // Repair last commit sigs @@ -803,6 +835,14 @@ func (bc *BlockChainImpl) repair(head **types.Block) error { sigAndBitMap := append(lastSig[:], (*head).Header().LastCommitBitmap()...) bc.WriteCommitSig((*head).NumberU64()-1, sigAndBitMap) + err := rawdb.DeleteBlock(bc.db, (*head).Hash(), (*head).NumberU64()) + if err != nil { + return errors.WithMessagef(err, "failed to delete block %d", (*head).NumberU64()) + } + if err := rawdb.WriteHeadBlockHash(bc.db, (*head).ParentHash()); err != nil { + return errors.WithMessagef(err, "failed to write head block hash number %d", (*head).NumberU64()-1) + } + // Otherwise rewind one block and recheck state availability there for _, stkTxn := range (*head).StakingTransactions() { if stkTxn.StakingType() == staking.DirectiveCreateValidator { @@ -899,6 +939,9 @@ func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error { if err := rawdb.WriteHeadHeaderHash(batch, block.Hash()); err != nil { return err } + if err := rawdb.WriteHeaderNumber(batch, block.Hash(), block.NumberU64()); err != nil { + return err + } isNewEpoch := block.IsLastBlockInEpoch() if isNewEpoch { @@ -1731,21 +1774,16 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i err = NewBlockValidator(bc).ValidateBody(block) } switch { - case err == ErrKnownBlock: - // Block and state both already known. However if the current block is below - // this number we did a rollback and we should reimport it nonetheless. - if bc.CurrentBlock().NumberU64() >= block.NumberU64() { - stats.ignored++ - continue - } + case errors.Is(err, ErrKnownBlock): + return i, events, coalescedLogs, err case err == consensus_engine.ErrFutureBlock: return i, events, coalescedLogs, err - case err == consensus_engine.ErrUnknownAncestor: + case errors.Is(err, consensus_engine.ErrUnknownAncestor): return i, events, coalescedLogs, err - case err == consensus_engine.ErrPrunedAncestor: + case errors.Is(err, consensus_engine.ErrPrunedAncestor): // TODO: add fork choice mechanism // Block competing with the canonical chain, store in the db, but don't process // until the competitor TD goes above the canonical TD @@ -1772,9 +1810,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i // Prune in case non-empty winner chain if len(winner) > 0 { // Import all the pruned blocks to make the state available - bc.chainmu.Unlock() _, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */) - bc.chainmu.Lock() events, coalescedLogs = evs, logs if err != nil { @@ -1909,10 +1945,10 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i // insertStats tracks and reports on block insertion. type insertStats struct { - queued, processed, ignored int - usedGas uint64 - lastIndex int - startTime mclock.AbsTime + queued, processed int + usedGas uint64 + lastIndex int + startTime mclock.AbsTime } // statsReportLimit is the time limit during import and export after which we @@ -1951,9 +1987,6 @@ func (st *insertStats) report(chain []*types.Block, index int, cache common.Stor if st.queued > 0 { context = context.Int("queued", st.queued) } - if st.ignored > 0 { - context = context.Int("ignored", st.ignored) - } logger := context.Logger() logger.Info().Msg("Imported new chain segment") diff --git a/core/blockchain_leader_rotation.go b/core/blockchain_leader_rotation.go index b7cdef519..8b2683780 100644 --- a/core/blockchain_leader_rotation.go +++ b/core/blockchain_leader_rotation.go @@ -14,10 +14,9 @@ import ( // LeaderRotationMeta contains information about leader rotation type LeaderRotationMeta struct { - Pub []byte // bls public key of previous block miner - Epoch uint64 // epoch number of previously inserted block - Count uint64 // quantity of continuous blocks inserted by the same leader - Shifts uint64 // number of leader shifts, shift happens when leader changes + Pub []byte // bls public key of previous block miner + Epoch uint64 // epoch number of previously inserted block + Count uint64 // quantity of continuous blocks inserted by the same leader } // ShortString returns string representation of the struct @@ -28,8 +27,6 @@ func (a LeaderRotationMeta) ShortString() string { s.WriteString(strconv.FormatUint(a.Epoch, 10)) s.WriteString(" ") s.WriteString(strconv.FormatUint(a.Count, 10)) - s.WriteString(" ") - s.WriteString(strconv.FormatUint(a.Shifts, 10)) return s.String() } @@ -39,17 +36,15 @@ func (a LeaderRotationMeta) Hash() []byte { c.Write(a.Pub) c.Write([]byte(strconv.FormatUint(a.Epoch, 10))) c.Write([]byte(strconv.FormatUint(a.Count, 10))) - c.Write([]byte(strconv.FormatUint(a.Shifts, 10))) return c.Sum(nil) } // Clone returns a copy of the struct func (a LeaderRotationMeta) Clone() LeaderRotationMeta { return LeaderRotationMeta{ - Pub: append([]byte{}, a.Pub...), - Epoch: a.Epoch, - Count: a.Count, - Shifts: a.Shifts, + Pub: append([]byte{}, a.Pub...), + Epoch: a.Epoch, + Count: a.Count, } } @@ -109,19 +104,10 @@ func processRotationMeta(epoch uint64, blockPubKey bls.SerializedPublicKey, s Le } else { s.Count = 1 } - // we should increase shifts if the leader has changed. - if !bytes.Equal(s.Pub, blockPubKey[:]) { - s.Shifts++ - } - // but set to zero if new - if s.Epoch != epoch { - s.Shifts = 0 - } s.Epoch = epoch return LeaderRotationMeta{ - Pub: blockPubKey[:], - Epoch: s.Epoch, - Count: s.Count, - Shifts: s.Shifts, + Pub: blockPubKey[:], + Epoch: s.Epoch, + Count: s.Count, } } diff --git a/core/blockchain_leader_rotation_test.go b/core/blockchain_leader_rotation_test.go index 047dbdd63..e964d39d7 100644 --- a/core/blockchain_leader_rotation_test.go +++ b/core/blockchain_leader_rotation_test.go @@ -12,46 +12,27 @@ var k1 = bls.SerializedPublicKey{1, 2, 3} func TestRotationMetaProcess(t *testing.T) { t.Run("same_leader_increase_count", func(t *testing.T) { rs := processRotationMeta(1, bls.SerializedPublicKey{}, LeaderRotationMeta{ - Pub: bls.SerializedPublicKey{}.Bytes(), - Epoch: 1, - Count: 1, - Shifts: 1, + Pub: bls.SerializedPublicKey{}.Bytes(), + Epoch: 1, + Count: 1, }) require.Equal(t, LeaderRotationMeta{ - Pub: bls.SerializedPublicKey{}.Bytes(), - Epoch: 1, - Count: 2, - Shifts: 1, - }, rs) - }) - - t.Run("new_leader_increase_shifts", func(t *testing.T) { - rs := processRotationMeta(1, k1, LeaderRotationMeta{ - Pub: bls.SerializedPublicKey{}.Bytes(), - Epoch: 1, - Count: 1, - Shifts: 1, - }) - require.Equal(t, LeaderRotationMeta{ - Pub: k1.Bytes(), - Epoch: 1, - Count: 1, - Shifts: 2, + Pub: bls.SerializedPublicKey{}.Bytes(), + Epoch: 1, + Count: 2, }, rs) }) t.Run("new_epoch_reset_count", func(t *testing.T) { rs := processRotationMeta(2, k1, LeaderRotationMeta{ - Pub: bls.SerializedPublicKey{}.Bytes(), - Epoch: 1, - Count: 1, - Shifts: 1, + Pub: bls.SerializedPublicKey{}.Bytes(), + Epoch: 1, + Count: 1, }) require.Equal(t, LeaderRotationMeta{ - Pub: k1.Bytes(), - Epoch: 2, - Count: 1, - Shifts: 0, + Pub: k1.Bytes(), + Epoch: 2, + Count: 1, }, rs) }) } diff --git a/core/state_processor.go b/core/state_processor.go index 11505f038..3a69a52a3 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -22,8 +22,6 @@ import ( "math/big" "time" - lru "github.com/hashicorp/golang-lru" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" @@ -40,6 +38,7 @@ import ( "github.com/harmony-one/harmony/staking/effective" "github.com/harmony-one/harmony/staking/slash" staking "github.com/harmony-one/harmony/staking/types" + lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" ) diff --git a/core/tx_pool.go b/core/tx_pool.go index 0f61b8d25..a8158b621 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -172,8 +172,8 @@ type TxPoolConfig struct { AddEvent func(tx types.PoolTransaction, local bool) // Fire add event - Blacklist map[common.Address]struct{} // Set of accounts that cannot be a part of any transaction - AllowedTxs map[common.Address]AllowedTxData // Set of allowed transactions can break the blocklist + Blacklist map[common.Address]struct{} // Set of accounts that cannot be a part of any transaction + AllowedTxs map[common.Address][]AllowedTxData // Set of allowed transactions can break the blocklist } // DefaultTxPoolConfig contains the default configurations for the transaction @@ -193,7 +193,7 @@ var DefaultTxPoolConfig = TxPoolConfig{ Lifetime: 30 * time.Minute, // --txpool.lifetime Blacklist: map[common.Address]struct{}{}, - AllowedTxs: map[common.Address]AllowedTxData{}, + AllowedTxs: map[common.Address][]AllowedTxData{}, } // sanitize checks the provided user configurations and changes anything that's @@ -753,12 +753,20 @@ func (pool *TxPool) validateTx(tx types.PoolTransaction, local bool) error { } // do whitelist check first, if tx not in whitelist, do blacklist check - if allowedTx, exists := pool.config.AllowedTxs[from]; exists { - if to := tx.To(); to == nil || *to != allowedTx.To || !bytes.Equal(tx.Data(), allowedTx.Data) { - toAddr := common.Address{} - if to != nil { - toAddr = *to + if allowedTxs, exists := pool.config.AllowedTxs[from]; exists { + txIsAllowed := false + to := tx.To() + toAddr := common.Address{} + if to != nil { + toAddr = *to + for _, allowedTx := range allowedTxs { + if toAddr == allowedTx.To && bytes.Equal(tx.Data(), allowedTx.Data) { + txIsAllowed = true + break + } } + } + if !txIsAllowed { return errors.WithMessagef(ErrAllowedTxs, "transaction sender: %x, receiver: %x, input: %x", tx.From(), toAddr, tx.Data()) } } else { diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go index 7ff250148..276c90d05 100644 --- a/internal/configs/harmony/harmony.go +++ b/internal/configs/harmony/harmony.go @@ -37,6 +37,7 @@ type HarmonyConfig struct { ShardData ShardDataConfig GPO GasPriceOracleConfig Preimage *PreimageConfig + Cache CacheConfig } func (hc HarmonyConfig) ToRPCServerConfig() nodeconfig.RPCServerConfig { @@ -138,7 +139,6 @@ type GeneralConfig struct { TraceEnable bool EnablePruneBeaconChain bool RunElasticMode bool - TriesInMemory int } type TiKVConfig struct { @@ -306,6 +306,17 @@ type RevertConfig struct { RevertBefore int } +type CacheConfig struct { + Disabled bool // Whether to disable trie write caching (archive node) + TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk + TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + TriesInMemory uint64 // Block number from the head stored in disk before exiting + Preimages bool // Whether to store preimage of trie key to the disk + SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory + SnapshotNoBuild bool // Whether the background generation is allowed + SnapshotWait bool // Wait for snapshot construction on startup +} + type PreimageConfig struct { ImportFrom string ExportTo string diff --git a/internal/configs/sharding/partner.go b/internal/configs/sharding/partner.go index 93cc4139d..bbf5dccb4 100644 --- a/internal/configs/sharding/partner.go +++ b/internal/configs/sharding/partner.go @@ -99,14 +99,14 @@ var partnerV0 = MustNewInstance( partnerReshardingEpoch, PartnerSchedule.BlocksPerEpoch(), ) var partnerV1 = MustNewInstance( - 2, 5, 4, 0, + 2, 15, 4, 0, numeric.MustNewDecFromStr("0.9"), genesis.TNHarmonyAccounts, genesis.TNFoundationalAccounts, emptyAllowlist, nil, numeric.ZeroDec(), ethCommon.Address{}, partnerReshardingEpoch, PartnerSchedule.BlocksPerEpoch(), ) var partnerV2 = MustNewInstance( - 2, 5, 4, 0, + 2, 20, 4, 0, numeric.MustNewDecFromStr("0.9"), genesis.TNHarmonyAccounts, genesis.TNFoundationalAccounts, emptyAllowlist, feeCollectorsDevnet[1], numeric.MustNewDecFromStr("0.25"), @@ -114,8 +114,8 @@ var partnerV2 = MustNewInstance( PartnerSchedule.BlocksPerEpoch(), ) var partnerV3 = MustNewInstance( - 2, 5, 1, 0, - numeric.MustNewDecFromStr("0.1"), genesis.TNHarmonyAccounts, + 2, 20, 0, 0, + numeric.MustNewDecFromStr("0.0"), genesis.TNHarmonyAccounts, genesis.TNFoundationalAccounts, emptyAllowlist, feeCollectorsDevnet[1], numeric.MustNewDecFromStr("0.25"), hip30CollectionAddressTestnet, partnerReshardingEpoch, diff --git a/internal/configs/sharding/testnet.go b/internal/configs/sharding/testnet.go index 7c2994071..f64085753 100644 --- a/internal/configs/sharding/testnet.go +++ b/internal/configs/sharding/testnet.go @@ -42,6 +42,8 @@ const ( func (ts testnetSchedule) InstanceForEpoch(epoch *big.Int) Instance { switch { + case params.TestnetChainConfig.IsTestnetExternalEpoch(epoch): + return testnetV6 case params.TestnetChainConfig.IsHIP30(epoch): return testnetV5 case params.TestnetChainConfig.IsFeeCollectEpoch(epoch): @@ -169,4 +171,12 @@ var ( hip30CollectionAddressTestnet, testnetReshardingEpoch, TestnetSchedule.BlocksPerEpoch(), ) + testnetV6 = MustNewInstance( + 2, 30, 0, 0, + numeric.MustNewDecFromStr("0.0"), genesis.TNHarmonyAccountsV1, + genesis.TNFoundationalAccounts, emptyAllowlist, + feeCollectorsTestnet, numeric.MustNewDecFromStr("0.25"), + hip30CollectionAddressTestnet, testnetReshardingEpoch, + TestnetSchedule.BlocksPerEpoch(), + ) ) diff --git a/internal/params/config.go b/internal/params/config.go index 332cd987b..20b0d44be 100644 --- a/internal/params/config.go +++ b/internal/params/config.go @@ -78,6 +78,7 @@ var ( BlockGas30MEpoch: big.NewInt(1673), // 2023-11-02 17:30:00+00:00 MaxRateEpoch: EpochTBD, DevnetExternalEpoch: EpochTBD, + TestnetExternalEpoch: EpochTBD, } // TestnetChainConfig contains the chain parameters to run a node on the harmony test network. @@ -124,6 +125,7 @@ var ( BlockGas30MEpoch: big.NewInt(2176), // 2023-10-12 10:00:00+00:00 MaxRateEpoch: EpochTBD, DevnetExternalEpoch: EpochTBD, + TestnetExternalEpoch: EpochTBD, } // PangaeaChainConfig contains the chain parameters for the Pangaea network. // All features except for CrossLink are enabled at launch. @@ -170,6 +172,7 @@ var ( BlockGas30MEpoch: big.NewInt(0), MaxRateEpoch: EpochTBD, DevnetExternalEpoch: EpochTBD, + TestnetExternalEpoch: EpochTBD, } // PartnerChainConfig contains the chain parameters for the Partner network. @@ -208,15 +211,16 @@ var ( SlotsLimitedEpoch: EpochTBD, // epoch to enable HIP-16 CrossShardXferPrecompileEpoch: big.NewInt(5), AllowlistEpoch: EpochTBD, - LeaderRotationInternalValidatorsEpoch: big.NewInt(2379), - LeaderRotationExternalValidatorsEpoch: big.NewInt(3173), + LeaderRotationInternalValidatorsEpoch: big.NewInt(144), + LeaderRotationExternalValidatorsEpoch: big.NewInt(144), FeeCollectEpoch: big.NewInt(5), ValidatorCodeFixEpoch: big.NewInt(5), HIP30Epoch: big.NewInt(7), BlockGas30MEpoch: big.NewInt(7), NoNilDelegationsEpoch: EpochTBD, MaxRateEpoch: EpochTBD, - DevnetExternalEpoch: EpochTBD, + TestnetExternalEpoch: EpochTBD, + DevnetExternalEpoch: big.NewInt(144), } // StressnetChainConfig contains the chain parameters for the Stress test network. @@ -264,6 +268,7 @@ var ( BlockGas30MEpoch: big.NewInt(0), MaxRateEpoch: EpochTBD, DevnetExternalEpoch: EpochTBD, + TestnetExternalEpoch: EpochTBD, } // LocalnetChainConfig contains the chain parameters to run for local development. @@ -310,6 +315,7 @@ var ( BlockGas30MEpoch: big.NewInt(0), MaxRateEpoch: EpochTBD, DevnetExternalEpoch: EpochTBD, + TestnetExternalEpoch: EpochTBD, } // AllProtocolChanges ... @@ -357,6 +363,7 @@ var ( big.NewInt(0), // HIP30Epoch big.NewInt(0), // NoNilDelegationsEpoch big.NewInt(0), // MaxRateEpoch + big.NewInt(0), // MaxRateEpoch big.NewInt(0), } @@ -405,6 +412,7 @@ var ( big.NewInt(0), // NoNilDelegationsEpoch big.NewInt(0), // BlockGas30M big.NewInt(0), // MaxRateEpoch + big.NewInt(0), // MaxRateEpoch big.NewInt(0), } @@ -575,6 +583,8 @@ type ChainConfig struct { DevnetExternalEpoch *big.Int `json:"devnet-external-epoch,omitempty"` + TestnetExternalEpoch *big.Int `json:"testnet-external-epoch,omitempty"` + BlockGas30MEpoch *big.Int `json:"block-gas-30m-epoch,omitempty"` // MaxRateEpoch will make sure the validator max-rate is at least equal to the minRate + the validator max-rate-increase @@ -861,6 +871,10 @@ func (c *ChainConfig) IsDevnetExternalEpoch(epoch *big.Int) bool { return isForked(c.DevnetExternalEpoch, epoch) } +func (c *ChainConfig) IsTestnetExternalEpoch(epoch *big.Int) bool { + return isForked(c.TestnetExternalEpoch, epoch) +} + func (c *ChainConfig) IsMaxRate(epoch *big.Int) bool { return isForked(c.MaxRateEpoch, epoch) } diff --git a/internal/shardchain/shardchains.go b/internal/shardchain/shardchains.go index 5da1b9186..6a9e9230a 100644 --- a/internal/shardchain/shardchains.go +++ b/internal/shardchain/shardchains.go @@ -3,7 +3,6 @@ package shardchain import ( "math/big" "sync" - "time" "github.com/harmony-one/harmony/core/state" harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony" @@ -110,14 +109,19 @@ func (sc *CollectionImpl) ShardChain(shardID uint32, options ...core.Options) (c Uint32("shardID", shardID). Msg("disable cache, running in archival mode") } else { - cacheConfig = &core.CacheConfig{ - TrieNodeLimit: 256, - TrieTimeLimit: 2 * time.Minute, - TriesInMemory: 128, - Preimages: true, - } - if sc.harmonyconfig != nil { - cacheConfig.TriesInMemory = uint64(sc.harmonyconfig.General.TriesInMemory) + hc := sc.harmonyconfig + if hc != nil { + cacheConfig = &core.CacheConfig{ + Disabled: hc.Cache.Disabled, + TrieNodeLimit: hc.Cache.TrieNodeLimit, + TrieTimeLimit: hc.Cache.TrieTimeLimit, + TriesInMemory: hc.Cache.TriesInMemory, + SnapshotLimit: hc.Cache.SnapshotLimit, + SnapshotWait: hc.Cache.SnapshotWait, + Preimages: hc.Cache.Preimages, + } + } else { + cacheConfig = nil } } diff --git a/node/api.go b/node/api.go index ef76079f1..e3862f510 100644 --- a/node/api.go +++ b/node/api.go @@ -177,7 +177,7 @@ func (node *Node) GetConfig() rpc_common.Config { // GetLastSigningPower get last signed power func (node *Node) GetLastSigningPower() (float64, error) { - power, err := node.Consensus.Decider.CurrentTotalPower(quorum.Commit) + power, err := node.Consensus.Decider().CurrentTotalPower(quorum.Commit) if err != nil { return 0, err } diff --git a/node/node.go b/node/node.go index 573786c00..d8a8e8e6f 100644 --- a/node/node.go +++ b/node/node.go @@ -11,28 +11,10 @@ import ( "sync" "time" - "github.com/harmony-one/harmony/internal/registry" - "github.com/harmony-one/harmony/internal/shardchain/tikv_manage" - "github.com/harmony-one/harmony/internal/tikv" - "github.com/harmony-one/harmony/internal/tikv/redis_helper" - "github.com/harmony-one/harmony/internal/utils/lrucache" - - "github.com/ethereum/go-ethereum/rlp" - harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony" - "github.com/harmony-one/harmony/internal/utils/crosslinks" - "github.com/ethereum/go-ethereum/common" - protobuf "github.com/golang/protobuf/proto" + "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/abool" bls_core "github.com/harmony-one/bls/ffi/go/bls" - lru "github.com/hashicorp/golang-lru" - libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub" - libp2p_peer "github.com/libp2p/go-libp2p/core/peer" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/rcrowley/go-metrics" - "golang.org/x/sync/semaphore" - "github.com/harmony-one/harmony/api/proto" msg_pb "github.com/harmony-one/harmony/api/proto/message" proto_node "github.com/harmony-one/harmony/api/proto/node" @@ -46,9 +28,16 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/bls" common2 "github.com/harmony-one/harmony/internal/common" + harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/params" + "github.com/harmony-one/harmony/internal/registry" + "github.com/harmony-one/harmony/internal/shardchain/tikv_manage" + "github.com/harmony-one/harmony/internal/tikv" + "github.com/harmony-one/harmony/internal/tikv/redis_helper" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/internal/utils/crosslinks" + "github.com/harmony-one/harmony/internal/utils/lrucache" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/shard" @@ -56,6 +45,14 @@ import ( "github.com/harmony-one/harmony/staking/slash" staking "github.com/harmony-one/harmony/staking/types" "github.com/harmony-one/harmony/webhooks" + lru "github.com/hashicorp/golang-lru" + libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub" + libp2p_peer "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/rcrowley/go-metrics" + "golang.org/x/sync/semaphore" + protobuf "google.golang.org/protobuf/proto" ) const ( @@ -81,7 +78,7 @@ type syncConfig struct { } type ISync interface { - UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error + UpdateBlockAndStatus(block *types.Block, bc core.BlockChain) error AddLastMileBlock(block *types.Block) GetActivePeerNumber() int CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error @@ -655,7 +652,7 @@ func validateShardBoundMessage(consensus *consensus.Consensus, peer libp2p_peer. return nil, nil, true, errors.WithStack(shard.ErrValidNotInCommittee) } } else { - count := consensus.Decider.ParticipantsCount() + count := consensus.Decider().ParticipantsCount() if (count+7)>>3 != int64(len(senderBitmap)) { nodeConsensusMessageCounterVec.With(prometheus.Labels{"type": "invalid_participant_count"}).Inc() return nil, nil, true, errors.WithStack(errWrongSizeOfBitmap) @@ -1022,7 +1019,7 @@ func New( host p2p.Host, consensusObj *consensus.Consensus, blacklist map[common.Address]struct{}, - allowedTxs map[common.Address]core.AllowedTxData, + allowedTxs map[common.Address][]core.AllowedTxData, localAccounts []common.Address, harmonyconfig *harmonyconfig.HarmonyConfig, registry *registry.Registry, @@ -1178,6 +1175,45 @@ func New( node.serviceManager = service.NewManager() + // delete old pending crosslinks + if node.Blockchain().ShardID() == shard.BeaconChainShardID { + ten := big.NewInt(10) + crossLinkEpochThreshold := new(big.Int).Sub(node.Blockchain().CurrentHeader().Epoch(), ten) + + invalidToDelete := make([]types.CrossLink, 0, 1000) + allPending, err := node.Blockchain().ReadPendingCrossLinks() + if err == nil { + for _, pending := range allPending { + // if pending crosslink is older than 10 epochs, delete it + if pending.EpochF.Cmp(crossLinkEpochThreshold) <= 0 { + invalidToDelete = append(invalidToDelete, pending) + utils.Logger().Info(). + Uint32("shard", pending.ShardID()). + Int64("epoch", pending.Epoch().Int64()). + Uint64("blockNum", pending.BlockNum()). + Int64("viewID", pending.ViewID().Int64()). + Interface("hash", pending.Hash()). + Msg("[PendingCrossLinksOnInit] delete old pending cross links") + } + } + + if n, err := node.Blockchain().DeleteFromPendingCrossLinks(invalidToDelete); err != nil { + utils.Logger().Error(). + Err(err). + Msg("[PendingCrossLinksOnInit] deleting old pending cross links failed") + } else if len(invalidToDelete) > 0 { + utils.Logger().Info(). + Int("not-deleted", n). + Int("deleted", len(invalidToDelete)). + Msg("[PendingCrossLinksOnInit] deleted old pending cross links") + } + } else { + utils.Logger().Error(). + Err(err). + Msg("[PendingCrossLinksOnInit] read pending cross links failed") + } + } + return &node } diff --git a/node/node_explorer.go b/node/node_explorer.go index ce1b0a244..1e4a4010a 100644 --- a/node/node_explorer.go +++ b/node/node_explorer.go @@ -53,7 +53,7 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag return err } - if !node.Consensus.Decider.IsQuorumAchievedByMask(mask) { + if !node.Consensus.Decider().IsQuorumAchievedByMask(mask) { utils.Logger().Error().Msg("[Explorer] not have enough signature power") return nil } diff --git a/node/node_newblock.go b/node/node_newblock.go index fdca8b741..bafb340a8 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -1,6 +1,7 @@ package node import ( + "math/big" "sort" "strings" "time" @@ -226,11 +227,18 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error) utils.AnalysisStart("proposeNewBlockVerifyCrossLinks") // Prepare cross links and slashing messages var crossLinksToPropose types.CrossLinks + ten := big.NewInt(10) + crossLinkEpochThreshold := new(big.Int).Sub(currentHeader.Epoch(), ten) if isBeaconchainInCrossLinkEra { allPending, err := node.Blockchain().ReadPendingCrossLinks() invalidToDelete := []types.CrossLink{} if err == nil { for _, pending := range allPending { + // if pending crosslink is older than 10 epochs, delete it and continue. this logic is also applied when the node starts + if pending.EpochF.Cmp(crossLinkEpochThreshold) <= 0 { + invalidToDelete = append(invalidToDelete, pending) + continue + } // ReadCrossLink beacon chain usage. exist, err := node.Blockchain().ReadCrossLink(pending.ShardID(), pending.BlockNum()) if err == nil || exist != nil { @@ -263,7 +271,16 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error) len(allPending), ) } - node.Blockchain().DeleteFromPendingCrossLinks(invalidToDelete) + if n, err := node.Blockchain().DeleteFromPendingCrossLinks(invalidToDelete); err != nil { + utils.Logger().Error(). + Err(err). + Msg("[ProposeNewBlock] invalid pending cross links failed") + } else if len(invalidToDelete) > 0 { + utils.Logger().Info(). + Int("not-deleted", n). + Int("deleted", len(invalidToDelete)). + Msg("[ProposeNewBlock] deleted invalid pending cross links") + } } utils.AnalysisEnd("proposeNewBlockVerifyCrossLinks") diff --git a/p2p/stream/protocols/sync/chain.go b/p2p/stream/protocols/sync/chain.go index 451952bcc..009c7b0af 100644 --- a/p2p/stream/protocols/sync/chain.go +++ b/p2p/stream/protocols/sync/chain.go @@ -209,7 +209,11 @@ func (ch *chainHelperImpl) getAccountRange(root common.Hash, origin common.Hash, if err != nil { return nil, nil, err } - it, err := ch.chain.Snapshots().AccountIterator(root, origin) + snapshots := ch.chain.Snapshots() + if snapshots == nil { + return nil, nil, errors.Errorf("failed to retrieve snapshots") + } + it, err := snapshots.AccountIterator(root, origin) if err != nil { return nil, nil, err } @@ -275,6 +279,10 @@ func (ch *chainHelperImpl) getStorageRanges(root common.Hash, accounts []common. proofs [][]byte size uint64 ) + snapshots := ch.chain.Snapshots() + if snapshots == nil { + return nil, nil, errors.Errorf("failed to retrieve snapshots") + } for _, account := range accounts { // If we've exceeded the requested data limit, abort without opening // a new storage range (that we'd need to prove due to exceeded size) @@ -284,7 +292,7 @@ func (ch *chainHelperImpl) getStorageRanges(root common.Hash, accounts []common. // The first account might start from a different origin and end sooner // origin==nil or limit ==nil // Retrieve the requested state and bail out if non existent - it, err := ch.chain.Snapshots().StorageIterator(root, account, origin) + it, err := snapshots.StorageIterator(root, account, origin) if err != nil { return nil, nil, err } @@ -409,7 +417,11 @@ func (ch *chainHelperImpl) getTrieNodes(root common.Hash, paths []*message.TrieN return nil, nil } // The 'snap' might be nil, in which case we cannot serve storage slots. - snap := ch.chain.Snapshots().Snapshot(root) + snapshots := ch.chain.Snapshots() + if snapshots == nil { + return nil, errors.Errorf("failed to retrieve snapshots") + } + snap := snapshots.Snapshot(root) // Retrieve trie nodes until the packet size limit is reached var ( nodes [][]byte diff --git a/p2p/stream/protocols/sync/chain_test.go b/p2p/stream/protocols/sync/chain_test.go index 414492054..eb153023c 100644 --- a/p2p/stream/protocols/sync/chain_test.go +++ b/p2p/stream/protocols/sync/chain_test.go @@ -11,11 +11,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" - protobuf "github.com/golang/protobuf/proto" "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" + protobuf "google.golang.org/protobuf/proto" ) type testChainHelper struct{} diff --git a/p2p/stream/protocols/sync/client.go b/p2p/stream/protocols/sync/client.go index 45707e119..9ae9d5c8f 100644 --- a/p2p/stream/protocols/sync/client.go +++ b/p2p/stream/protocols/sync/client.go @@ -8,12 +8,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" - protobuf "github.com/golang/protobuf/proto" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/pkg/errors" + protobuf "google.golang.org/protobuf/proto" ) // GetBlocksByNumber do getBlocksByNumberRequest through sync stream protocol. diff --git a/p2p/stream/protocols/sync/protocol.go b/p2p/stream/protocols/sync/protocol.go index 0cb48bfff..b4e84592a 100644 --- a/p2p/stream/protocols/sync/protocol.go +++ b/p2p/stream/protocols/sync/protocol.go @@ -271,8 +271,6 @@ func (p *Protocol) RemoveStream(stID sttypes.StreamID) { if exist && st != nil { //TODO: log this incident with reason st.Close() - // stream manager removes this stream from the list and triggers discovery if number of streams are not enough - p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed p.logger.Info(). Str("stream ID", string(stID)). Msg("stream removed") @@ -290,8 +288,6 @@ func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) { Msg("stream failed") if st.FailedTimes() >= MaxStreamFailures { st.Close() - // stream manager removes this stream from the list and triggers discovery if number of streams are not enough - p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed p.logger.Warn(). Str("stream ID", string(st.ID())). Msg("stream removed") diff --git a/p2p/stream/protocols/sync/stream.go b/p2p/stream/protocols/sync/stream.go index 3077a8a13..2f2468404 100644 --- a/p2p/stream/protocols/sync/stream.go +++ b/p2p/stream/protocols/sync/stream.go @@ -5,17 +5,16 @@ import ( "sync/atomic" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" - protobuf "github.com/golang/protobuf/proto" "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" sttypes "github.com/harmony-one/harmony/p2p/stream/types" libp2p_network "github.com/libp2p/go-libp2p/core/network" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" + protobuf "google.golang.org/protobuf/proto" ) // syncStream is the structure for a stream running sync protocol. @@ -84,7 +83,7 @@ func (st *syncStream) readMsgLoop() { func (st *syncStream) deliverMsg(msg protobuf.Message) { syncMsg := msg.(*syncpb.Message) if syncMsg == nil { - st.logger.Info().Str("message", msg.String()).Msg("received unexpected sync message") + st.logger.Info().Interface("message", msg).Msg("received unexpected sync message") return } if req := syncMsg.GetReq(); req != nil { diff --git a/p2p/stream/protocols/sync/stream_test.go b/p2p/stream/protocols/sync/stream_test.go index 3b538c14b..f2e546289 100644 --- a/p2p/stream/protocols/sync/stream_test.go +++ b/p2p/stream/protocols/sync/stream_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - protobuf "github.com/golang/protobuf/proto" syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" sttypes "github.com/harmony-one/harmony/p2p/stream/types" ic "github.com/libp2p/go-libp2p/core/crypto" @@ -15,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" ma "github.com/multiformats/go-multiaddr" + protobuf "google.golang.org/protobuf/proto" ) var _ sttypes.Protocol = &Protocol{} diff --git a/p2p/stream/protocols/sync/utils.go b/p2p/stream/protocols/sync/utils.go index da781c23f..d4ca22429 100644 --- a/p2p/stream/protocols/sync/utils.go +++ b/p2p/stream/protocols/sync/utils.go @@ -3,10 +3,10 @@ package sync import ( "fmt" - protobuf "github.com/golang/protobuf/proto" "github.com/harmony-one/harmony/p2p/stream/common/requestmanager" syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" "github.com/pkg/errors" + protobuf "google.golang.org/protobuf/proto" ) var ( diff --git a/scripts/travis_rosetta_checker.sh b/scripts/travis_rosetta_checker.sh index b2e395fdb..d2f98569f 100644 --- a/scripts/travis_rosetta_checker.sh +++ b/scripts/travis_rosetta_checker.sh @@ -1,12 +1,14 @@ #!/usr/bin/env bash set -e +echo $TRAVIS_PULL_REQUEST_BRANCH DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)" echo $DIR echo $GOPATH cd $GOPATH/src/github.com/harmony-one/harmony-test git fetch git pull +git checkout $TRAVIS_PULL_REQUEST_BRANCH || true git branch --show-current cd localnet docker build -t harmonyone/localnet-test . diff --git a/scripts/travis_rpc_checker.sh b/scripts/travis_rpc_checker.sh index b057452f8..5de2ef93b 100755 --- a/scripts/travis_rpc_checker.sh +++ b/scripts/travis_rpc_checker.sh @@ -1,11 +1,13 @@ #!/usr/bin/env bash set -e +echo $TRAVIS_PULL_REQUEST_BRANCH DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)" echo $DIR echo $GOPATH cd $GOPATH/src/github.com/harmony-one/harmony-test git fetch +git checkout $TRAVIS_PULL_REQUEST_BRANCH || true git pull git branch --show-current cd localnet diff --git a/test/chain/reward/main.go b/test/chain/reward/main.go index 165b61e36..3c3ad3a88 100644 --- a/test/chain/reward/main.go +++ b/test/chain/reward/main.go @@ -6,28 +6,24 @@ import ( "math/rand" "time" - "github.com/harmony-one/harmony/core/rawdb" - - msg_pb "github.com/harmony-one/harmony/api/proto/message" - "github.com/harmony-one/harmony/crypto/bls" - - blockfactory "github.com/harmony-one/harmony/block/factory" - "github.com/harmony-one/harmony/internal/params" - "github.com/harmony-one/harmony/internal/utils" - common2 "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" bls_core "github.com/harmony-one/bls/ffi/go/bls" + msg_pb "github.com/harmony-one/harmony/api/proto/message" + blockfactory "github.com/harmony-one/harmony/block/factory" "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/vm" + "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/hash" "github.com/harmony-one/harmony/internal/chain" "github.com/harmony-one/harmony/internal/common" - - protobuf "github.com/golang/protobuf/proto" + "github.com/harmony-one/harmony/internal/params" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/numeric" staking "github.com/harmony-one/harmony/staking/types" + protobuf "google.golang.org/protobuf/proto" ) var (