Merge branch 'dev' into feature/clear-stale-staking-data-31024

feature/clear-stale-staking-data
static 9 months ago
commit 48162c1e3a
  1. 3
      api/service/legacysync/epoch_syncing.go
  2. 15
      api/service/legacysync/syncing.go
  3. 20
      api/service/stagedstreamsync/stage_statesync_full.go
  4. 4
      api/service/stagedstreamsync/syncing.go
  5. 4
      api/service/stagedsync/stage_lastmile.go
  6. 3
      api/service/stagedsync/stagedsync.go
  7. 29
      cmd/harmony/config.go
  8. 13
      cmd/harmony/config_migrations.go
  9. 23
      cmd/harmony/default.go
  10. 91
      cmd/harmony/flags.go
  11. 150
      cmd/harmony/flags_test.go
  12. 11
      cmd/harmony/main.go
  13. 44
      cmd/harmony/main_test.go
  14. 2
      consensus/checks.go
  15. 14
      consensus/consensus.go
  16. 46
      consensus/consensus_service.go
  17. 3
      consensus/consensus_test.go
  18. 60
      consensus/consensus_v2.go
  19. 10
      consensus/construct.go
  20. 4
      consensus/construct_test.go
  21. 2
      consensus/double_sign.go
  22. 3
      consensus/enums.go
  23. 34
      consensus/leader.go
  24. 179
      consensus/quorum/thread_safe_decider.go
  25. 2
      consensus/threshold.go
  26. 10
      consensus/validator.go
  27. 38
      consensus/view_change.go
  28. 2
      consensus/view_change_construct.go
  29. 6
      consensus/view_change_test.go
  30. 2
      core/block_validator.go
  31. 73
      core/blockchain_impl.go
  32. 32
      core/blockchain_leader_rotation.go
  33. 43
      core/blockchain_leader_rotation_test.go
  34. 3
      core/state_processor.go
  35. 24
      core/tx_pool.go
  36. 13
      internal/configs/harmony/harmony.go
  37. 8
      internal/configs/sharding/partner.go
  38. 10
      internal/configs/sharding/testnet.go
  39. 20
      internal/params/config.go
  40. 22
      internal/shardchain/shardchains.go
  41. 2
      node/api.go
  42. 80
      node/node.go
  43. 2
      node/node_explorer.go
  44. 19
      node/node_newblock.go
  45. 18
      p2p/stream/protocols/sync/chain.go
  46. 2
      p2p/stream/protocols/sync/chain_test.go
  47. 2
      p2p/stream/protocols/sync/client.go
  48. 4
      p2p/stream/protocols/sync/protocol.go
  49. 7
      p2p/stream/protocols/sync/stream.go
  50. 2
      p2p/stream/protocols/sync/stream_test.go
  51. 2
      p2p/stream/protocols/sync/utils.go
  52. 2
      scripts/travis_rosetta_checker.sh
  53. 2
      scripts/travis_rpc_checker.sh
  54. 18
      test/chain/reward/main.go

@ -138,6 +138,9 @@ func syncLoop(bc core.BlockChain, syncConfig *SyncConfig) (timeout int) {
err := ProcessStateSync(syncConfig, heights, bc)
if err != nil {
if errors.Is(err, core.ErrKnownBlock) {
return 10
}
utils.Logger().Error().Err(err).
Msgf("[EPOCHSYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherEpoch: %d, currentEpoch: %d)",
isBeacon, bc.ShardID(), otherEpoch, curEpoch)

@ -860,11 +860,12 @@ func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Ha
}
// UpdateBlockAndStatus ...
func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error {
func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain) error {
if block.NumberU64() != bc.CurrentBlock().NumberU64()+1 {
utils.Logger().Debug().Uint64("curBlockNum", bc.CurrentBlock().NumberU64()).Uint64("receivedBlockNum", block.NumberU64()).Msg("[SYNC] Inappropriate block number, ignore!")
return nil
}
verifyAllSig := true
haveCurrentSig := len(block.GetCurrentCommitSig()) != 0
// Verify block signatures
@ -911,6 +912,7 @@ func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain
Uint64("blockEpoch", block.Epoch().Uint64()).
Str("blockHex", block.Hash().Hex()).
Uint32("ShardID", block.ShardID()).
Err(err).
Msg("[SYNC] UpdateBlockAndStatus: Block exists")
return nil
case err != nil:
@ -954,8 +956,8 @@ func (ss *StateSync) generateNewState(bc core.BlockChain) error {
break
}
// Enforce sig check for the last block in a batch
enforceSigCheck := !commonIter.HasNext()
err = ss.UpdateBlockAndStatus(block, bc, enforceSigCheck)
_ = !commonIter.HasNext()
err = ss.UpdateBlockAndStatus(block, bc)
if err != nil {
break
}
@ -972,7 +974,7 @@ func (ss *StateSync) generateNewState(bc core.BlockChain) error {
if block == nil {
break
}
err = ss.UpdateBlockAndStatus(block, bc, true)
err = ss.UpdateBlockAndStatus(block, bc)
if err != nil {
break
}
@ -993,7 +995,7 @@ func (ss *StateSync) generateNewState(bc core.BlockChain) error {
if block == nil {
break
}
err = ss.UpdateBlockAndStatus(block, bc, false)
err = ss.UpdateBlockAndStatus(block, bc)
if err != nil {
break
}
@ -1121,6 +1123,9 @@ func (ss *StateSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *cons
}
err := ss.ProcessStateSync(startHash[:], size, bc)
if err != nil {
if errors.Is(err, core.ErrKnownBlock) {
continue
}
utils.Logger().Error().Err(err).
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)

@ -190,7 +190,11 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
default:
}
accountTasks, codes, storages, healtask, codetask, nTasks, err := sdm.GetNextBatch()
if nTasks == 0 || err != nil {
if nTasks == 0 {
utils.Logger().Debug().Msg("the state worker loop received no more tasks")
return
}
if err != nil {
select {
case <-ctx.Done():
return
@ -199,7 +203,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
}
}
if len(accountTasks) > 0 {
if accountTasks != nil && len(accountTasks) > 0 {
task := accountTasks[0]
origin := task.Next
@ -222,8 +226,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
utils.Logger().Warn().
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetAccountRange failed, received empty accounts"))
err := errors.New("GetAccountRange received empty slots")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
//err := errors.New("GetAccountRange received empty slots")
//sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
if err := sdm.HandleAccountRequestResult(task, retAccounts, proof, origin[:], limit[:], loopID, stid); err != nil {
@ -236,7 +240,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
return
}
} else if len(codes) > 0 {
} else if codes != nil && len(codes) > 0 {
stid, err := sss.downloadByteCodes(ctx, sdm, codes, loopID)
if err != nil {
@ -252,7 +256,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
return
}
} else if len(storages.accounts) > 0 {
} else if storages != nil && len(storages.accounts) > 0 {
root := storages.root
roots := storages.roots
@ -295,7 +299,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
} else {
// assign trie node Heal Tasks
if len(healtask.hashes) > 0 {
if healtask != nil && len(healtask.hashes) > 0 {
root := healtask.root
task := healtask.task
hashes := healtask.hashes
@ -334,7 +338,7 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
}
}
if len(codetask.hashes) > 0 {
if codetask != nil && len(codetask.hashes) > 0 {
task := codetask.task
hashes := codetask.hashes
bytes := codetask.bytes

@ -465,6 +465,10 @@ func (s *StagedStreamSync) CurrentBlockNumber() uint64 {
return s.bc.CurrentBlock().NumberU64()
}
if s.status.pivotBlock != nil && s.bc.CurrentFastBlock().NumberU64() >= s.status.pivotBlock.NumberU64() {
return s.bc.CurrentFastBlock().NumberU64()
}
current := uint64(0)
switch s.config.SyncMode {
case FullSync:

@ -49,7 +49,7 @@ func (lm *StageLastMile) Exec(firstCycle bool, invalidBlockRevert bool, s *Stage
if block == nil {
break
}
err = s.state.UpdateBlockAndStatus(block, bc, true)
err = s.state.UpdateBlockAndStatus(block, bc)
if err != nil {
break
}
@ -70,7 +70,7 @@ func (lm *StageLastMile) Exec(firstCycle bool, invalidBlockRevert bool, s *Stage
if block == nil {
break
}
err = s.state.UpdateBlockAndStatus(block, bc, false)
err = s.state.UpdateBlockAndStatus(block, bc)
if err != nil {
break
}

@ -1035,7 +1035,7 @@ func (ss *StagedSync) getBlockFromLastMileBlocksByParentHash(parentHash common.H
}
// UpdateBlockAndStatus updates block and its status in db
func (ss *StagedSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error {
func (ss *StagedSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain) error {
if block.NumberU64() != bc.CurrentBlock().NumberU64()+1 {
utils.Logger().Debug().
Uint64("curBlockNum", bc.CurrentBlock().NumberU64()).
@ -1043,6 +1043,7 @@ func (ss *StagedSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChai
Msg("[STAGED_SYNC] Inappropriate block number, ignore!")
return nil
}
verifyAllSig := true
haveCurrentSig := len(block.GetCurrentCommitSig()) != 0
// Verify block signatures

@ -145,6 +145,35 @@ func getDefaultSyncConfig(nt nodeconfig.NetworkType) harmonyconfig.SyncConfig {
}
}
func getDefaultCacheConfig(nt nodeconfig.NetworkType) harmonyconfig.CacheConfig {
cacheConfig := harmonyconfig.CacheConfig{
Disabled: defaultCacheConfig.Disabled,
TrieNodeLimit: defaultCacheConfig.TrieNodeLimit,
TriesInMemory: defaultCacheConfig.TriesInMemory,
TrieTimeLimit: defaultCacheConfig.TrieTimeLimit,
SnapshotLimit: defaultCacheConfig.SnapshotLimit,
SnapshotWait: defaultCacheConfig.SnapshotWait,
Preimages: defaultCacheConfig.Preimages,
SnapshotNoBuild: defaultCacheConfig.SnapshotNoBuild,
}
switch nt {
case nodeconfig.Mainnet:
cacheConfig.Disabled = true
cacheConfig.Preimages = true
case nodeconfig.Testnet:
cacheConfig.Disabled = false
cacheConfig.Preimages = true
case nodeconfig.Localnet:
cacheConfig.Disabled = false
cacheConfig.Preimages = false
default:
cacheConfig.Disabled = false
cacheConfig.Preimages = true
}
return cacheConfig
}
var configCmd = &cobra.Command{
Use: "config",
Short: "dump or update config",

@ -334,7 +334,7 @@ func init() {
migrations["2.5.11"] = func(confTree *toml.Tree) *toml.Tree {
if confTree.Get("General.TriesInMemory") == nil {
confTree.Set("General.TriesInMemory", defaultConfig.General.TriesInMemory)
confTree.Set("General.TriesInMemory", defaultConfig.Cache.TriesInMemory)
}
confTree.Set("Version", "2.5.12")
return confTree
@ -405,6 +405,17 @@ func init() {
return confTree
}
migrations["2.6.0"] = func(confTree *toml.Tree) *toml.Tree {
confTree.Delete("General.TriesInMemory")
if confTree.Get("Cache") == nil {
confTree.Set("Cache", defaultConfig.Cache)
}
// upgrade minor version because of `Cache` section introduction
confTree.Set("Version", "2.6.1")
return confTree
}
// check that the latest version here is the same as in default.go
largestKey := getNextVersion(migrations)
if largestKey != tomlConfigVersion {

@ -1,13 +1,15 @@
package main
import (
"time"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/hmy"
harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
)
const tomlConfigVersion = "2.6.0"
const tomlConfigVersion = "2.6.1"
const (
defNetworkType = nodeconfig.Mainnet
@ -24,7 +26,6 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
IsOffline: false,
DataDir: "./",
TraceEnable: false,
TriesInMemory: 128,
},
Network: getDefaultNetworkConfig(defNetworkType),
P2P: harmonyconfig.P2pConfig{
@ -131,6 +132,7 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
LowUsageThreshold: hmy.DefaultGPOConfig.LowUsageThreshold,
BlockGasLimit: hmy.DefaultGPOConfig.BlockGasLimit,
},
Cache: getDefaultCacheConfig(defNetworkType),
}
var defaultSysConfig = harmonyconfig.SysConfig{
@ -271,6 +273,17 @@ var (
}
)
var defaultCacheConfig = harmonyconfig.CacheConfig{
Disabled: false,
TrieNodeLimit: 256,
TriesInMemory: 128,
TrieTimeLimit: 2 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
Preimages: true,
SnapshotNoBuild: false,
}
const (
defaultBroadcastInvalidTx = false
)
@ -285,6 +298,7 @@ func getDefaultHmyConfigCopy(nt nodeconfig.NetworkType) harmonyconfig.HarmonyCon
}
config.Sync = getDefaultSyncConfig(nt)
config.DNSSync = getDefaultDNSSyncConfig(nt)
config.Cache = getDefaultCacheConfig(nt)
return config
}
@ -324,6 +338,11 @@ func getDefaultPrometheusConfigCopy() harmonyconfig.PrometheusConfig {
return config
}
func getDefaultCacheConfigCopy() harmonyconfig.CacheConfig {
config := defaultCacheConfig
return config
}
const (
nodeTypeValidator = "validator"
nodeTypeExplorer = "explorer"

@ -32,7 +32,6 @@ var (
legacyDataDirFlag,
taraceFlag,
triesInMemoryFlag,
}
dnsSyncFlags = []cli.Flag{
@ -268,6 +267,16 @@ var (
gpoBlockGasLimitFlag,
}
cacheConfigFlags = []cli.Flag{
cacheDisabled,
cacheTrieNodeLimit,
cacheTriesInMemory,
cachePreimages,
cacheSnapshotLimit,
cacheSnapshotNoBuild,
cacheSnapshotWait,
}
metricsFlags = []cli.Flag{
metricsETHFlag,
metricsExpensiveETHFlag,
@ -352,11 +361,6 @@ var (
Usage: "indicates if full transaction tracing should be enabled",
DefValue: defaultConfig.General.TraceEnable,
}
triesInMemoryFlag = cli.IntFlag{
Name: "blockchain.tries_in_memory",
Usage: "number of blocks from header stored in disk before exiting",
DefValue: defaultConfig.General.TriesInMemory,
}
)
func getRootFlags() []cli.Flag {
@ -436,14 +440,6 @@ func applyGeneralFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig)
if cli.IsFlagChanged(cmd, isBackupFlag) {
config.General.IsBackup = cli.GetBoolFlagValue(cmd, isBackupFlag)
}
if cli.IsFlagChanged(cmd, triesInMemoryFlag) {
value := cli.GetIntFlagValue(cmd, triesInMemoryFlag)
if value <= 2 {
panic("Must provide number greater than 2 for General.TriesInMemory")
}
config.General.TriesInMemory = value
}
}
// network flags
@ -2115,3 +2111,70 @@ func applyGPOFlags(cmd *cobra.Command, cfg *harmonyconfig.HarmonyConfig) {
cfg.GPO.BlockGasLimit = cli.GetIntFlagValue(cmd, gpoBlockGasLimitFlag)
}
}
// cache config flags
var (
cacheDisabled = cli.BoolFlag{
Name: "cache.disabled",
Usage: "Whether to disable trie write caching (archive node)",
DefValue: defaultCacheConfig.Disabled,
}
cacheTrieNodeLimit = cli.IntFlag{
Name: "cache.trie_node_limit",
Usage: " Memory limit (MB) at which to flush the current in-memory trie to disk",
DefValue: defaultCacheConfig.TrieNodeLimit,
}
cacheTriesInMemory = cli.Uint64Flag{
Name: "cache.tries_in_memory",
Usage: "Block number from the head stored in disk before exiting",
DefValue: defaultCacheConfig.TriesInMemory,
}
cachePreimages = cli.BoolFlag{
Name: "cache.preimages",
Usage: "Whether to store preimage of trie key to the disk",
DefValue: defaultCacheConfig.Preimages,
}
cacheSnapshotLimit = cli.IntFlag{
Name: "cache.snapshot_limit",
Usage: "Memory allowance (MB) to use for caching snapshot entries in memory",
DefValue: defaultCacheConfig.SnapshotLimit,
}
cacheSnapshotNoBuild = cli.BoolFlag{
Name: "cache.snapshot_no_build",
Usage: "Whether the background generation is allowed",
DefValue: defaultCacheConfig.SnapshotNoBuild,
}
cacheSnapshotWait = cli.BoolFlag{
Name: "cache.snapshot_wait",
Usage: "Wait for snapshot construction on startup",
DefValue: defaultCacheConfig.SnapshotWait,
}
)
func applyCacheFlags(cmd *cobra.Command, cfg *harmonyconfig.HarmonyConfig) {
if cli.IsFlagChanged(cmd, cacheDisabled) {
cfg.Cache.Disabled = cli.GetBoolFlagValue(cmd, cacheDisabled)
}
if cli.IsFlagChanged(cmd, cacheTrieNodeLimit) {
cfg.Cache.TrieNodeLimit = cli.GetIntFlagValue(cmd, cacheTrieNodeLimit)
}
if cli.IsFlagChanged(cmd, cacheTriesInMemory) {
value := cli.GetUint64FlagValue(cmd, cacheTriesInMemory)
if value <= 2 {
panic("Must provide number greater than 2 for Cache.TriesInMemory")
}
cfg.Cache.TriesInMemory = value
}
if cli.IsFlagChanged(cmd, cachePreimages) {
cfg.Cache.Preimages = cli.GetBoolFlagValue(cmd, cachePreimages)
}
if cli.IsFlagChanged(cmd, cacheSnapshotLimit) {
cfg.Cache.SnapshotLimit = cli.GetIntFlagValue(cmd, cacheSnapshotLimit)
}
if cli.IsFlagChanged(cmd, cacheSnapshotNoBuild) {
cfg.Cache.SnapshotNoBuild = cli.GetBoolFlagValue(cmd, cacheSnapshotNoBuild)
}
if cli.IsFlagChanged(cmd, cacheSnapshotWait) {
cfg.Cache.SnapshotWait = cli.GetBoolFlagValue(cmd, cacheSnapshotWait)
}
}

@ -37,12 +37,11 @@ func TestHarmonyFlags(t *testing.T) {
expConfig: harmonyconfig.HarmonyConfig{
Version: tomlConfigVersion,
General: harmonyconfig.GeneralConfig{
NodeType: "validator",
NoStaking: false,
ShardID: -1,
IsArchival: false,
DataDir: "./",
TriesInMemory: 128,
NodeType: "validator",
NoStaking: false,
ShardID: -1,
IsArchival: false,
DataDir: "./",
},
Network: harmonyconfig.NetworkConfig{
NetworkType: "mainnet",
@ -183,6 +182,16 @@ func TestHarmonyFlags(t *testing.T) {
LowUsageThreshold: defaultConfig.GPO.LowUsageThreshold,
BlockGasLimit: defaultConfig.GPO.BlockGasLimit,
},
Cache: harmonyconfig.CacheConfig{
Disabled: defaultConfig.Cache.Disabled,
TrieNodeLimit: defaultCacheConfig.TrieNodeLimit,
TriesInMemory: defaultConfig.Cache.TriesInMemory,
TrieTimeLimit: defaultConfig.Cache.TrieTimeLimit,
SnapshotLimit: defaultConfig.Cache.SnapshotLimit,
SnapshotWait: defaultConfig.Cache.SnapshotWait,
Preimages: defaultConfig.Cache.Preimages,
SnapshotNoBuild: defaultConfig.Cache.SnapshotNoBuild,
},
},
},
}
@ -208,80 +217,63 @@ func TestGeneralFlags(t *testing.T) {
{
args: []string{},
expConfig: harmonyconfig.GeneralConfig{
NodeType: "validator",
NoStaking: false,
ShardID: -1,
IsArchival: false,
DataDir: "./",
TriesInMemory: 128,
NodeType: "validator",
NoStaking: false,
ShardID: -1,
IsArchival: false,
DataDir: "./",
},
},
{
args: []string{"--run", "explorer", "--run.legacy", "--run.shard=0",
"--run.archive=true", "--datadir=./.hmy"},
expConfig: harmonyconfig.GeneralConfig{
NodeType: "explorer",
NoStaking: true,
ShardID: 0,
IsArchival: true,
DataDir: "./.hmy",
TriesInMemory: 128,
NodeType: "explorer",
NoStaking: true,
ShardID: 0,
IsArchival: true,
DataDir: "./.hmy",
},
},
{
args: []string{"--node_type", "explorer", "--staking", "--shard_id", "0",
"--is_archival", "--db_dir", "./"},
expConfig: harmonyconfig.GeneralConfig{
NodeType: "explorer",
NoStaking: false,
ShardID: 0,
IsArchival: true,
DataDir: "./",
TriesInMemory: 128,
NodeType: "explorer",
NoStaking: false,
ShardID: 0,
IsArchival: true,
DataDir: "./",
},
},
{
args: []string{"--staking=false", "--is_archival=false"},
expConfig: harmonyconfig.GeneralConfig{
NodeType: "validator",
NoStaking: true,
ShardID: -1,
IsArchival: false,
DataDir: "./",
TriesInMemory: 128,
NodeType: "validator",
NoStaking: true,
ShardID: -1,
IsArchival: false,
DataDir: "./",
},
},
{
args: []string{"--run", "explorer", "--run.shard", "0"},
expConfig: harmonyconfig.GeneralConfig{
NodeType: "explorer",
NoStaking: false,
ShardID: 0,
IsArchival: false,
DataDir: "./",
TriesInMemory: 128,
NodeType: "explorer",
NoStaking: false,
ShardID: 0,
IsArchival: false,
DataDir: "./",
},
},
{
args: []string{"--run", "explorer", "--run.shard", "0", "--run.archive=false"},
expConfig: harmonyconfig.GeneralConfig{
NodeType: "explorer",
NoStaking: false,
ShardID: 0,
IsArchival: false,
DataDir: "./",
TriesInMemory: 128,
},
},
{
args: []string{"--blockchain.tries_in_memory", "64"},
expConfig: harmonyconfig.GeneralConfig{
NodeType: "validator",
NoStaking: false,
ShardID: -1,
IsArchival: false,
DataDir: "./",
TriesInMemory: 64,
NodeType: "explorer",
NoStaking: false,
ShardID: 0,
IsArchival: false,
DataDir: "./",
},
},
}
@ -1435,6 +1427,58 @@ func TestGPOFlags(t *testing.T) {
}
}
func TestCacheFlags(t *testing.T) {
tests := []struct {
args []string
expConfig harmonyconfig.CacheConfig
expErr error
}{
{
args: []string{},
expConfig: harmonyconfig.CacheConfig{
Disabled: true, // based on network type
TrieNodeLimit: defaultCacheConfig.TrieNodeLimit,
TriesInMemory: defaultCacheConfig.TriesInMemory,
TrieTimeLimit: defaultCacheConfig.TrieTimeLimit,
SnapshotLimit: defaultCacheConfig.SnapshotLimit,
SnapshotWait: defaultCacheConfig.SnapshotWait,
Preimages: defaultCacheConfig.Preimages, // based on network type
SnapshotNoBuild: defaultCacheConfig.SnapshotNoBuild,
},
},
{
args: []string{"--cache.disabled=true", "--cache.trie_node_limit", "512", "--cache.tries_in_memory", "256", "--cache.preimages=false", "--cache.snapshot_limit", "512", "--cache.snapshot_no_build=true", "--cache.snapshot_wait=false"},
expConfig: harmonyconfig.CacheConfig{
Disabled: true,
TrieNodeLimit: 512,
TriesInMemory: 256,
TrieTimeLimit: 2 * time.Minute,
SnapshotLimit: 512,
SnapshotWait: false,
Preimages: false,
SnapshotNoBuild: true,
},
},
}
for i, test := range tests {
ts := newFlagTestSuite(t, cacheConfigFlags, applyCacheFlags)
hc, err := ts.run(test.args)
if assErr := assertError(err, test.expErr); assErr != nil {
t.Fatalf("Test %v: %v", i, assErr)
}
if err != nil || test.expErr != nil {
continue
}
if !reflect.DeepEqual(hc.Cache, test.expConfig) {
t.Errorf("Test %v:\n\t%+v\n\t%+v", i, hc.Cache, test.expConfig)
}
ts.tearDown()
}
}
func TestDevnetFlags(t *testing.T) {
tests := []struct {
args []string

@ -245,6 +245,7 @@ func applyRootFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
applySyncFlags(cmd, config)
applyShardDataFlags(cmd, config)
applyGPOFlags(cmd, config)
applyCacheFlags(cmd, config)
}
func setupNodeLog(config harmonyconfig.HarmonyConfig) {
@ -1061,8 +1062,8 @@ func setupBlacklist(hc harmonyconfig.HarmonyConfig) (map[ethCommon.Address]struc
return addrMap, nil
}
func parseAllowedTxs(data []byte) (map[ethCommon.Address]core.AllowedTxData, error) {
allowedTxs := make(map[ethCommon.Address]core.AllowedTxData)
func parseAllowedTxs(data []byte) (map[ethCommon.Address][]core.AllowedTxData, error) {
allowedTxs := make(map[ethCommon.Address][]core.AllowedTxData)
for _, line := range strings.Split(string(data), "\n") {
line = strings.TrimSpace(line)
if len(line) != 0 { // AllowedTxs file may have trailing empty string line
@ -1083,16 +1084,16 @@ func parseAllowedTxs(data []byte) (map[ethCommon.Address]core.AllowedTxData, err
if err != nil {
return nil, err
}
allowedTxs[from] = core.AllowedTxData{
allowedTxs[from] = append(allowedTxs[from], core.AllowedTxData{
To: to,
Data: data,
}
})
}
}
return allowedTxs, nil
}
func setupAllowedTxs(hc harmonyconfig.HarmonyConfig) (map[ethCommon.Address]core.AllowedTxData, error) {
func setupAllowedTxs(hc harmonyconfig.HarmonyConfig) (map[ethCommon.Address][]core.AllowedTxData, error) {
utils.Logger().Debug().Msgf("Using AllowedTxs file at `%s`", hc.TxPool.AllowedTxsFile)
data, err := os.ReadFile(hc.TxPool.AllowedTxsFile)
if err != nil {

@ -16,22 +16,26 @@ func TestAllowedTxsParse(t *testing.T) {
one1s4dvv454dtmkzsulffz3epewsyhrjq9y0g3fqz->0x985458E523dB3d53125813eD68c274899e9DfAb4:0xa9059cbb
one1s4dvv454dtmkzsulffz3epewsyhrjq9y0g3fqz->one10fhdp2g9q5azrs2ukk608x6krd4rleg0ueskug:0x
`)
expected := map[ethCommon.Address]core.AllowedTxData{
common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"): core.AllowedTxData{
To: common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"),
Data: common.FromHex("0x"),
expected := map[ethCommon.Address][]core.AllowedTxData{
common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"): {
core.AllowedTxData{
To: common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"),
Data: common.FromHex("0x"),
},
core.AllowedTxData{
To: common.HexToAddress("0x985458E523dB3d53125813eD68c274899e9DfAb4"),
Data: common.FromHex("0xa9059cbb"),
},
},
common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"): core.AllowedTxData{
To: common.HexToAddress("0x985458E523dB3d53125813eD68c274899e9DfAb4"),
Data: common.FromHex("0xa9059cbb"),
},
common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"): core.AllowedTxData{
To: common.HexToAddress("0x985458E523dB3d53125813eD68c274899e9DfAb4"),
Data: common.FromHex("0xa9059cbb"),
},
common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"): core.AllowedTxData{
To: common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"),
Data: common.FromHex("0x"),
common.HexToAddress("0x855Ac656956AF761439f4a451c872E812E3900a4"): {
core.AllowedTxData{
To: common.HexToAddress("0x985458E523dB3d53125813eD68c274899e9DfAb4"),
Data: common.FromHex("0xa9059cbb"),
},
core.AllowedTxData{
To: common.HexToAddress("0x7A6Ed0a905053A21C15cB5b4F39b561B6A3FE50f"),
Data: common.FromHex("0x"),
},
},
}
got, err := parseAllowedTxs(testData)
@ -41,10 +45,12 @@ func TestAllowedTxsParse(t *testing.T) {
if len(got) != len(expected) {
t.Errorf("lenght of allowed transactions not equal, got: %d expected: %d", len(got), len(expected))
}
for from, txData := range got {
expectedTxData := expected[from]
if expectedTxData.To != txData.To || !bytes.Equal(expectedTxData.Data, txData.Data) {
t.Errorf("txData not equal: got: %v expected: %v", txData, expectedTxData)
for from, txsData := range got {
for i, txData := range txsData {
expectedTxData := expected[from][i]
if expectedTxData.To != txData.To || !bytes.Equal(expectedTxData.Data, txData.Data) {
t.Errorf("txData not equal: got: %v expected: %v", txData, expectedTxData)
}
}
}
}

@ -4,13 +4,13 @@ import (
"bytes"
"encoding/binary"
protobuf "github.com/golang/protobuf/proto"
libbls "github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash"
"github.com/pkg/errors"
protobuf "google.golang.org/protobuf/proto"
)
// MaxBlockNumDiff limits the received block number to only 100 further from the current block number

@ -46,7 +46,7 @@ type DownloadAsync interface {
// Consensus is the main struct with all states and data related to consensus process.
type Consensus struct {
Decider quorum.Decider
decider quorum.Decider
// FBFTLog stores the pbft messages and blocks during FBFT process
fBFTLog *FBFTLog
// phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc
@ -200,7 +200,9 @@ func (consensus *Consensus) BlocksNotSynchronized(reason string) {
// VdfSeedSize returns the number of VRFs for VDF computation
func (consensus *Consensus) VdfSeedSize() int {
return int(consensus.Decider.ParticipantsCount()) * 2 / 3
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return int(consensus.decider.ParticipantsCount()) * 2 / 3
}
// GetPublicKeys returns the public keys
@ -252,6 +254,8 @@ func (consensus *Consensus) getConsensusLeaderPrivateKey() (*bls.PrivateKeyWrapp
}
func (consensus *Consensus) IsBackup() bool {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.isBackup
}
@ -275,7 +279,7 @@ func New(
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: State{mode: Normal},
Decider: Decider,
decider: Decider,
registry: registry,
MinPeers: minPeers,
AggregateSig: aggregateSig,
@ -322,6 +326,10 @@ func (consensus *Consensus) Registry() *registry.Registry {
return consensus.registry
}
func (consensus *Consensus) Decider() quorum.Decider {
return quorum.NewThreadSafeDecider(consensus.decider, consensus.mutex)
}
// InitConsensusWithValidators initialize shard state
// from latest epoch and update committee pub
// keys for consensus

@ -5,27 +5,26 @@ import (
"sync/atomic"
"time"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/webhooks"
"github.com/ethereum/go-ethereum/common"
protobuf "github.com/golang/protobuf/proto"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/consensus/signature"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/shard/committee"
"github.com/harmony-one/harmony/webhooks"
"github.com/pkg/errors"
"github.com/rs/zerolog"
protobuf "google.golang.org/protobuf/proto"
)
// WaitForNewRandomness listens to the RndChannel to receive new VDF randomness.
@ -82,7 +81,7 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
}
func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 {
consensus.Decider.UpdateParticipants(pubKeys, allowlist)
consensus.decider.UpdateParticipants(pubKeys, allowlist)
consensus.getLogger().Info().Msg("My Committee updated")
for i := range pubKeys {
consensus.getLogger().Info().
@ -91,7 +90,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
Msg("Member")
}
allKeys := consensus.Decider.Participants()
allKeys := consensus.decider.Participants()
if len(allKeys) != 0 {
consensus.LeaderPubKey = &allKeys[0]
consensus.getLogger().Info().
@ -115,7 +114,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
if !consensus.isViewChangingMode() {
consensus.resetViewChangeState()
}
return consensus.Decider.ParticipantsCount()
return consensus.decider.ParticipantsCount()
}
// Sign on the hash of the message
@ -144,7 +143,7 @@ func (consensus *Consensus) updateBitmaps() {
consensus.getLogger().Debug().
Str("MessageType", consensus.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps")
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
prepareBitmap := bls_cosi.NewMask(members)
commitBitmap := bls_cosi.NewMask(members)
multiSigBitmap := bls_cosi.NewMask(members)
@ -160,7 +159,7 @@ func (consensus *Consensus) resetState() {
consensus.blockHash = [32]byte{}
consensus.block = []byte{}
consensus.Decider.ResetPrepareAndCommitVotes()
consensus.decider.ResetPrepareAndCommitVotes()
if consensus.prepareBitmap != nil {
consensus.prepareBitmap.Clear()
}
@ -179,7 +178,7 @@ func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKe
}
func (consensus *Consensus) isValidatorInCommittee(pubKey bls.SerializedPublicKey) bool {
return consensus.Decider.IndexOf(pubKey) != -1
return consensus.decider.IndexOf(pubKey) != -1
}
// SetMode sets the mode of consensus
@ -191,10 +190,6 @@ func (consensus *Consensus) SetMode(m Mode) {
// SetMode sets the mode of consensus
func (consensus *Consensus) setMode(m Mode) {
if m == Normal && consensus.isBackup {
m = NormalBackup
}
consensus.getLogger().Debug().
Str("Mode", m.String()).
Msg("[SetMode]")
@ -203,11 +198,12 @@ func (consensus *Consensus) setMode(m Mode) {
// SetIsBackup sets the mode of consensus
func (consensus *Consensus) SetIsBackup(isBackup bool) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.getLogger().Debug().
Bool("IsBackup", isBackup).
Msg("[SetIsBackup]")
consensus.isBackup = isBackup
consensus.current.SetIsBackup(isBackup)
}
// Mode returns the mode of consensus
@ -271,7 +267,7 @@ func (consensus *Consensus) setBlockNum(blockNum uint64) {
// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) {
consensus.mutex.RLock()
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
consensus.mutex.RUnlock()
return consensus.readSignatureBitmapPayload(recvPayload, offset, members)
}
@ -334,12 +330,12 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
isFirstTimeStaking := consensus.Blockchain().Config().IsStaking(nextEpoch) &&
curHeader.IsLastBlockInEpoch() && !consensus.Blockchain().Config().IsStaking(curEpoch)
haventUpdatedDecider := consensus.Blockchain().Config().IsStaking(curEpoch) &&
consensus.Decider.Policy() != quorum.SuperMajorityStake
consensus.decider.Policy() != quorum.SuperMajorityStake
// Only happens once, the flip-over to a new Decider policy
if isFirstTimeStaking || haventUpdatedDecider {
decider := quorum.NewDecider(quorum.SuperMajorityStake, consensus.ShardID)
consensus.Decider = decider
consensus.decider = decider
}
var committeeToSet *shard.Committee
@ -412,7 +408,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
consensus.updatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist())
// Update voters in the committee
if _, err := consensus.Decider.SetVoters(
if _, err := consensus.decider.SetVoters(
committeeToSet, epochToSet,
); err != nil {
consensus.getLogger().Error().
@ -582,7 +578,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
return errGetPreparedBlock
}
aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.Decider.Participants())
aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.decider.Participants())
if err != nil {
return errReadBitmapPayload
}
@ -606,7 +602,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
continue
}
if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.decider.AddNewVote(
quorum.Commit,
[]*bls_cosi.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload),
@ -628,7 +624,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0)
consensus.mutex.Lock()
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
pubKeys := consensus.getPublicKeys()
consensus.mutex.Unlock()

@ -18,7 +18,7 @@ import (
)
func TestConsensusInitialization(t *testing.T) {
host, multiBLSPrivateKey, consensus, decider, err := GenerateConsensusForTesting()
host, multiBLSPrivateKey, consensus, _, err := GenerateConsensusForTesting()
assert.NoError(t, err)
messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
@ -30,7 +30,6 @@ func TestConsensusInitialization(t *testing.T) {
expectedTimeouts[timeoutViewChange] = viewChangeDuration
expectedTimeouts[timeoutBootstrap] = bootstrapDuration
assert.Equal(t, decider, consensus.Decider)
assert.Equal(t, host, consensus.host)
assert.Equal(t, messageSender, consensus.msgSender)

@ -91,7 +91,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p
case t == msg_pb.MessageType_VIEWCHANGE:
fbftMsg, err = ParseViewChangeMessage(msg)
case t == msg_pb.MessageType_NEWVIEW:
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
fbftMsg, err = ParseNewViewMessage(msg, members)
default:
fbftMsg, err = consensus.parseFBFTMessage(msg)
@ -106,7 +106,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p
consensus.isLeader()
// if in backup normal mode, force ignore view change event and leader event.
if consensus.current.Mode() == NormalBackup {
if consensus.isBackup {
canHandleViewChange = false
intendedForLeader = false
}
@ -138,7 +138,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p
}
func (consensus *Consensus) finalCommit() {
numCommits := consensus.Decider.SignersCount(quorum.Commit)
numCommits := consensus.decider.SignersCount(quorum.Commit)
consensus.getLogger().Info().
Int64("NumCommits", numCommits).
@ -178,7 +178,10 @@ func (consensus *Consensus) finalCommit() {
return
}
consensus.getLogger().Info().Hex("new", commitSigAndBitmap).Msg("[finalCommit] Overriding commit signatures!!")
consensus.Blockchain().WriteCommitSig(block.NumberU64(), commitSigAndBitmap)
if err := consensus.Blockchain().WriteCommitSig(block.NumberU64(), commitSigAndBitmap); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[finalCommit] failed writting commit sig")
}
// Send committed message before block insertion.
// if leader successfully finalizes the block, send committed message to validators
@ -441,7 +444,7 @@ func (consensus *Consensus) BlockChannel(newBlock *types.Block) {
Int("numTxs", len(newBlock.Transactions())).
Int("numStakingTxs", len(newBlock.StakingTransactions())).
Time("startTime", startTime).
Int64("publicKeys", consensus.Decider.ParticipantsCount()).
Int64("publicKeys", consensus.decider.ParticipantsCount()).
Msg("[ConsensusMainLoop] STARTING CONSENSUS")
consensus.announce(newBlock)
})
@ -679,7 +682,7 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
// rotateLeader rotates the leader to the next leader in the committee.
// This function must be called with enabled leader rotation.
func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper {
func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicKeyWrapper) *bls.PublicKeyWrapper {
var (
bc = consensus.Blockchain()
leader = consensus.getLeaderPubKey()
@ -687,31 +690,32 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper {
curNumber = curBlock.NumberU64()
curEpoch = curBlock.Epoch().Uint64()
)
if epoch.Uint64() != curEpoch {
return defaultKey
}
const blocksCountAliveness = 4
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v", epoch.Uint64(), bc.Config().IsLeaderRotationInternalValidators(epoch), bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch))
ss, err := bc.ReadShardState(epoch)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to read shard state")
return nil
return defaultKey
}
committee, err := ss.FindCommitteeByID(consensus.ShardID)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to find committee")
return nil
return defaultKey
}
slotsCount := len(committee.Slots)
blocksPerEpoch := shard.Schedule.InstanceForEpoch(epoch).BlocksPerEpoch()
if blocksPerEpoch == 0 {
utils.Logger().Error().Msg("[Rotating leader] blocks per epoch is 0")
return nil
return defaultKey
}
if slotsCount == 0 {
utils.Logger().Error().Msg("[Rotating leader] slots count is 0")
return nil
return defaultKey
}
numBlocksProducedByLeader := blocksPerEpoch / uint64(slotsCount)
rest := blocksPerEpoch % uint64(slotsCount)
const minimumBlocksForLeaderInRow = blocksCountAliveness
if numBlocksProducedByLeader < minimumBlocksForLeaderInRow {
// mine no less than 3 blocks in a row
@ -720,15 +724,11 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper {
s := bc.LeaderRotationMeta()
if !bytes.Equal(leader.Bytes[:], s.Pub) {
// Another leader.
return nil
}
// If it is the first validator producing blocks, it should also produce the remaining 'rest' of the blocks.
if s.Shifts == 0 {
numBlocksProducedByLeader += rest
return defaultKey
}
if s.Count < numBlocksProducedByLeader {
// Not enough blocks produced by the leader, continue producing by the same leader.
return nil
return defaultKey
}
// Passed all checks, we can change leader.
// NthNext will move the leader to the next leader in the committee.
@ -741,23 +741,23 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper {
for i := 0; i < len(committee.Slots); i++ {
if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.Decider.NthNextValidator(committee.Slots, leader, offset)
wasFound, next = consensus.decider.NthNextValidator(committee.Slots, leader, offset)
} else {
wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset)
wasFound, next = consensus.decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset)
}
if !wasFound {
utils.Logger().Error().Msg("Failed to get next leader")
// Seems like nothing we can do here.
return nil
return defaultKey
}
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
mask := bls.NewMask(members)
skipped := 0
for i := 0; i < blocksCountAliveness; i++ {
header := bc.GetHeaderByNumber(curNumber - uint64(i))
if header == nil {
utils.Logger().Error().Msgf("Failed to get header by number %d", curNumber-uint64(i))
return nil
return defaultKey
}
// if epoch is different, we should not check this block.
if header.Epoch().Uint64() != curEpoch {
@ -767,12 +767,12 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper {
err = mask.SetMask(header.LastCommitBitmap())
if err != nil {
utils.Logger().Err(err).Msg("Failed to set mask")
return nil
return defaultKey
}
ok, err := mask.KeyEnabled(next.Bytes)
if err != nil {
utils.Logger().Err(err).Msg("Failed to get key enabled")
return nil
return defaultKey
}
if !ok {
skipped++
@ -787,14 +787,13 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper {
}
return next
}
return nil
return defaultKey
}
// SetupForNewConsensus sets the state for new consensus
func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1)
consensus.setCurBlockViewID(committedMsg.ViewID + 1)
consensus.LeaderPubKey = committedMsg.SenderPubkeys[0]
var epoch *big.Int
if blk.IsLastBlockInEpoch() {
epoch = new(big.Int).Add(blk.Epoch(), common.Big1)
@ -802,9 +801,14 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
epoch = blk.Epoch()
}
if consensus.Blockchain().Config().IsLeaderRotationInternalValidators(epoch) {
if next := consensus.rotateLeader(epoch); next != nil {
if next := consensus.rotateLeader(epoch, committedMsg.SenderPubkeys[0]); next != nil {
prev := consensus.getLeaderPubKey()
consensus.setLeaderPubKey(next)
if consensus.isLeader() {
utils.Logger().Info().Msgf("We are block %d, I am the new leader %s", blk.NumberU64(), next.Bytes.Hex())
} else {
utils.Logger().Info().Msgf("We are block %d, the leader is %s", blk.NumberU64(), next.Bytes.Hex())
}
if consensus.isLeader() && !consensus.getLeaderPubKey().Object.IsEqual(prev.Object) {
// leader changed
blockPeriod := consensus.BlockPeriod

@ -4,15 +4,13 @@ import (
"bytes"
"errors"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/crypto/bls"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
protobuf "google.golang.org/protobuf/proto"
)
// NetworkMessage is a message intended to be
@ -82,7 +80,7 @@ func (consensus *Consensus) construct(
)
} else {
// TODO: use a persistent bitmap to report bitmap
mask := bls.NewMask(consensus.Decider.Participants())
mask := bls.NewMask(consensus.decider.Participants())
for _, key := range priKeys {
mask.SetKey(key.Pub.Bytes, true)
}
@ -161,7 +159,7 @@ func (consensus *Consensus) construct(
func (consensus *Consensus) constructQuorumSigAndBitmap(p quorum.Phase) []byte {
buffer := bytes.Buffer{}
// 96 bytes aggregated signature
aggSig := consensus.Decider.AggregateVotes(p)
aggSig := consensus.decider.AggregateVotes(p)
buffer.Write(aggSig.Serialize())
// Bitmap
if p == quorum.Prepare {

@ -81,7 +81,7 @@ func TestConstructPreparedMessage(test *testing.T) {
validatorKey := bls.SerializedPublicKey{}
validatorKey.FromLibBLSPublicKey(validatorPubKey)
validatorKeyWrapper := bls.PublicKeyWrapper{Object: validatorPubKey, Bytes: validatorKey}
consensus.Decider.AddNewVote(
consensus.Decider().AddNewVote(
quorum.Prepare,
[]*bls.PublicKeyWrapper{&leaderKeyWrapper},
leaderPriKey.Sign(message),
@ -89,7 +89,7 @@ func TestConstructPreparedMessage(test *testing.T) {
consensus.BlockNum(),
consensus.GetCurBlockViewID(),
)
if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.Decider().AddNewVote(
quorum.Prepare,
[]*bls.PublicKeyWrapper{&validatorKeyWrapper},
validatorPriKey.Sign(message),

@ -17,7 +17,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
if consensus.couldThisBeADoubleSigner(recvMsg) {
addrSet := map[common.Address]struct{}{}
for _, pubKey2 := range recvMsg.SenderPubkeys {
if alreadyCastBallot := consensus.Decider.ReadBallot(
if alreadyCastBallot := consensus.decider.ReadBallot(
quorum.Commit, pubKey2.Bytes,
); alreadyCastBallot != nil {
for _, pubKey1 := range alreadyCastBallot.SignerPubKeys {

@ -14,8 +14,6 @@ const (
Syncing
// Listening ..
Listening
// NormalBackup Backup Node ..
NormalBackup
)
// FBFTPhase : different phases of consensus
@ -34,7 +32,6 @@ var (
ViewChanging: "ViewChanging",
Syncing: "Syncing",
Listening: "Listening",
NormalBackup: "NormalBackup",
}
phaseNames = map[FBFTPhase]string{
FBFTAnnounce: "Announce",

@ -62,7 +62,7 @@ func (consensus *Consensus) announce(block *types.Block) {
continue
}
if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.decider.AddNewVote(
quorum.Prepare,
[]*bls.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(consensus.blockHash[:]),
@ -112,7 +112,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
prepareBitmap := consensus.prepareBitmap
// proceed only when the message is not received before
for _, signer := range recvMsg.SenderPubkeys {
signed := consensus.Decider.ReadBallot(quorum.Prepare, signer.Bytes)
signed := consensus.decider.ReadBallot(quorum.Prepare, signer.Bytes)
if signed != nil {
consensus.getLogger().Debug().
Str("validatorPubKey", signer.Bytes.Hex()).
@ -121,16 +121,18 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
}
}
if consensus.Decider.IsQuorumAchieved(quorum.Prepare) {
if consensus.decider.IsQuorumAchieved(quorum.Prepare) {
// already have enough signatures
consensus.getLogger().Debug().
Interface("validatorPubKeys", recvMsg.SenderPubkeys).
Msg("[OnPrepare] Received Additional Prepare Message")
return
}
signerCount := consensus.Decider.SignersCount(quorum.Prepare)
signerCount := consensus.decider.SignersCount(quorum.Prepare)
//// Read - End
consensus.UpdateLeaderMetrics(float64(signerCount), float64(consensus.getBlockNum()))
// Check BLS signature for the multi-sig
prepareSig := recvMsg.Payload
var sign bls_core.Sign
@ -161,11 +163,11 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
consensus.getLogger().Debug().
Int64("NumReceivedSoFar", signerCount).
Int64("PublicKeys", consensus.Decider.ParticipantsCount()).
Int64("PublicKeys", consensus.decider.ParticipantsCount()).
Msg("[OnPrepare] Received New Prepare Signature")
//// Write - Start
if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.decider.AddNewVote(
quorum.Prepare, recvMsg.SenderPubkeys,
&sign, recvMsg.BlockHash,
recvMsg.BlockNum, recvMsg.ViewID,
@ -181,7 +183,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
//// Write - End
//// Read - Start
if consensus.Decider.IsQuorumAchieved(quorum.Prepare) {
if consensus.decider.IsQuorumAchieved(quorum.Prepare) {
// NOTE Let it handle its own logs
if err := consensus.didReachPrepareQuorum(); err != nil {
return
@ -199,7 +201,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
}
// proceed only when the message is not received before
for _, signer := range recvMsg.SenderPubkeys {
signed := consensus.Decider.ReadBallot(quorum.Commit, signer.Bytes)
signed := consensus.decider.ReadBallot(quorum.Commit, signer.Bytes)
if signed != nil {
consensus.getLogger().Debug().
Str("validatorPubKey", signer.Bytes.Hex()).
@ -211,9 +213,9 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
commitBitmap := consensus.commitBitmap
// has to be called before verifying signature
quorumWasMet := consensus.Decider.IsQuorumAchieved(quorum.Commit)
quorumWasMet := consensus.decider.IsQuorumAchieved(quorum.Commit)
signerCount := consensus.Decider.SignersCount(quorum.Commit)
signerCount := consensus.decider.SignersCount(quorum.Commit)
//// Read - End
// Verify the signature on commitPayload is correct
@ -267,7 +269,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
return
}
*/
if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.decider.AddNewVote(
quorum.Commit, recvMsg.SenderPubkeys,
&sign, recvMsg.BlockHash,
recvMsg.BlockNum, recvMsg.ViewID,
@ -285,15 +287,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
//// Read - Start
viewID := consensus.getCurBlockViewID()
if consensus.Decider.IsAllSigsCollected() {
logger.Info().Msg("[OnCommit] 100% Enough commits received")
consensus.finalCommit()
consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED)
return
}
quorumIsMet := consensus.Decider.IsQuorumAchieved(quorum.Commit)
quorumIsMet := consensus.decider.IsQuorumAchieved(quorum.Commit)
//// Read - End
if !quorumWasMet && quorumIsMet {

@ -0,0 +1,179 @@
package quorum
import (
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/votepower"
"github.com/harmony-one/harmony/crypto/bls"
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/numeric"
"github.com/harmony-one/harmony/shard"
)
var _ Decider = threadSafeDeciderImpl{}
type threadSafeDeciderImpl struct {
mu *sync.RWMutex
decider Decider
}
func NewThreadSafeDecider(decider Decider, mu *sync.RWMutex) Decider {
return threadSafeDeciderImpl{
mu: mu,
decider: decider,
}
}
func (a threadSafeDeciderImpl) String() string {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.String()
}
func (a threadSafeDeciderImpl) Participants() multibls.PublicKeys {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.Participants()
}
func (a threadSafeDeciderImpl) IndexOf(key bls.SerializedPublicKey) int {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.IndexOf(key)
}
func (a threadSafeDeciderImpl) ParticipantsCount() int64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.ParticipantsCount()
}
func (a threadSafeDeciderImpl) NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.NthNextValidator(slotList, pubKey, next)
}
func (a threadSafeDeciderImpl) NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.NthNextHmy(instance, pubkey, next)
}
func (a threadSafeDeciderImpl) NthNextHmyExt(instance shardingconfig.Instance, wrapper *bls.PublicKeyWrapper, i int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.NthNextHmyExt(instance, wrapper, i)
}
func (a threadSafeDeciderImpl) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.FirstParticipant(instance)
}
func (a threadSafeDeciderImpl) UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
a.decider.UpdateParticipants(pubKeys, allowlist)
}
func (a threadSafeDeciderImpl) submitVote(p Phase, pubkeys []bls.SerializedPublicKey, sig *bls_core.Sign, headerHash common.Hash, height, viewID uint64) (*votepower.Ballot, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.submitVote(p, pubkeys, sig, headerHash, height, viewID)
}
func (a threadSafeDeciderImpl) SignersCount(phase Phase) int64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.SignersCount(phase)
}
func (a threadSafeDeciderImpl) reset(phases []Phase) {
a.mu.Lock()
defer a.mu.Unlock()
a.decider.reset(phases)
}
func (a threadSafeDeciderImpl) ReadBallot(p Phase, pubkey bls.SerializedPublicKey) *votepower.Ballot {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.ReadBallot(p, pubkey)
}
func (a threadSafeDeciderImpl) TwoThirdsSignersCount() int64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.TwoThirdsSignersCount()
}
func (a threadSafeDeciderImpl) AggregateVotes(p Phase) *bls_core.Sign {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.AggregateVotes(p)
}
func (a threadSafeDeciderImpl) SetVoters(subCommittee *shard.Committee, epoch *big.Int) (*TallyResult, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.SetVoters(subCommittee, epoch)
}
func (a threadSafeDeciderImpl) Policy() Policy {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.Policy()
}
func (a threadSafeDeciderImpl) AddNewVote(p Phase, pubkeys []*bls.PublicKeyWrapper, sig *bls_core.Sign, headerHash common.Hash, height, viewID uint64) (*votepower.Ballot, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.AddNewVote(p, pubkeys, sig, headerHash, height, viewID)
}
func (a threadSafeDeciderImpl) IsQuorumAchievedByMask(mask *bls.Mask) bool {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.IsQuorumAchievedByMask(mask)
}
func (a threadSafeDeciderImpl) QuorumThreshold() numeric.Dec {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.QuorumThreshold()
}
func (a threadSafeDeciderImpl) IsAllSigsCollected() bool {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.IsAllSigsCollected()
}
func (a threadSafeDeciderImpl) ResetPrepareAndCommitVotes() {
a.mu.Lock()
defer a.mu.Unlock()
a.decider.ResetPrepareAndCommitVotes()
}
func (a threadSafeDeciderImpl) ResetViewChangeVotes() {
a.mu.Lock()
defer a.mu.Unlock()
a.decider.ResetViewChangeVotes()
}
func (a threadSafeDeciderImpl) CurrentTotalPower(p Phase) (*numeric.Dec, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.CurrentTotalPower(p)
}
func (a threadSafeDeciderImpl) IsQuorumAchieved(p Phase) bool {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.IsQuorumAchieved(p)
}

@ -57,7 +57,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error {
continue
}
if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.decider.AddNewVote(
quorum.Commit,
[]*bls.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload),

@ -133,7 +133,7 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block
}
func (consensus *Consensus) prepare() {
if consensus.IsBackup() {
if consensus.isBackup {
return
}
@ -152,7 +152,7 @@ func (consensus *Consensus) prepare() {
// sendCommitMessages send out commit messages to leader
func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) {
if consensus.IsBackup() || blockObj == nil {
if consensus.isBackup || blockObj == nil {
return
}
@ -199,12 +199,12 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) {
// check validity of prepared signature
blockHash := recvMsg.BlockHash
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.Decider.Participants())
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.decider.Participants())
if err != nil {
consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!")
return
}
if !consensus.Decider.IsQuorumAchievedByMask(mask) {
if !consensus.decider.IsQuorumAchievedByMask(mask) {
consensus.getLogger().Warn().Msgf("[OnPrepared] Quorum Not achieved.")
return
}
@ -335,7 +335,7 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) {
return
}
aggSig, mask, err := chain.DecodeSigBitmap(sigBytes, bitmap, consensus.Decider.Participants())
aggSig, mask, err := chain.DecodeSigBitmap(sigBytes, bitmap, consensus.decider.Participants())
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed")
return

@ -33,8 +33,6 @@ type State struct {
// view changing id is used during view change mode
// it is the next view id
viewChangingID uint64
isBackup bool
}
// Mode return the current node mode
@ -44,10 +42,6 @@ func (pm *State) Mode() Mode {
// SetMode set the node mode as required
func (pm *State) SetMode(s Mode) {
if s == Normal && pm.isBackup {
s = NormalBackup
}
pm.mode = s
}
@ -81,10 +75,6 @@ func (pm *State) GetViewChangeDuraion() time.Duration {
return time.Duration(diff * diff * int64(viewChangeDuration))
}
func (pm *State) SetIsBackup(isBackup bool) {
pm.isBackup = isBackup
}
// fallbackNextViewID return the next view ID and duration when there is an exception
// to calculate the time-based viewId
func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) {
@ -187,7 +177,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
// it can still sync with other validators.
if curHeader.IsLastBlockInEpoch() {
consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch")
lastLeaderPubKey = consensus.Decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch))
lastLeaderPubKey = consensus.decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch))
}
}
}
@ -204,18 +194,18 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
var next *bls.PublicKeyWrapper
if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) {
if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.Decider.NthNextValidator(
wasFound, next = consensus.decider.NthNextValidator(
committee.Slots,
lastLeaderPubKey,
gap)
} else {
wasFound, next = consensus.Decider.NthNextHmy(
wasFound, next = consensus.decider.NthNextHmy(
shard.Schedule.InstanceForEpoch(epoch),
lastLeaderPubKey,
gap)
}
} else {
wasFound, next = consensus.Decider.NthNextHmy(
wasFound, next = consensus.decider.NthNextHmy(
shard.Schedule.InstanceForEpoch(epoch),
lastLeaderPubKey,
gap)
@ -281,7 +271,7 @@ func (consensus *Consensus) startViewChange() {
defer consensus.consensusTimeout[timeoutViewChange].Start()
// update the dictionary key if the viewID is first time received
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
consensus.vc.AddViewIDKeyIfNotExist(nextViewID, members)
// init my own payload
@ -386,10 +376,10 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
return
}
if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) {
if consensus.decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) {
consensus.getLogger().Info().
Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)).
Int64("need", consensus.Decider.TwoThirdsSignersCount()).
Int64("have", consensus.decider.SignersCount(quorum.ViewChange)).
Int64("need", consensus.decider.TwoThirdsSignersCount()).
Interface("SenderPubkeys", recvMsg.SenderPubkeys).
Str("newLeaderKey", newLeaderKey.Bytes.Hex()).
Msg("[onViewChange] Received Enough View Change Messages")
@ -404,7 +394,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
senderKey := recvMsg.SenderPubkeys[0]
// update the dictionary key if the viewID is first time received
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
consensus.vc.AddViewIDKeyIfNotExist(recvMsg.ViewID, members)
// do it once only per viewID/Leader
@ -420,7 +410,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
return
}
err = consensus.vc.ProcessViewChangeMsg(consensus.fBFTLog, consensus.Decider, recvMsg, consensus.verifyBlock)
err = consensus.vc.ProcessViewChangeMsg(consensus.fBFTLog, consensus.decider, recvMsg, consensus.verifyBlock)
if err != nil {
consensus.getLogger().Error().Err(err).
Uint64("viewID", recvMsg.ViewID).
@ -431,7 +421,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
}
// received enough view change messages, change state to normal consensus
if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.isViewChangingMode() {
if consensus.decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.isViewChangingMode() {
// no previous prepared message, go straight to normal mode
// and start proposing new block
if consensus.vc.IsM1PayloadEmpty() {
@ -495,7 +485,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) {
}
m3Mask := recvMsg.M3Bitmap
if !consensus.Decider.IsQuorumAchievedByMask(m3Mask) {
if !consensus.decider.IsQuorumAchievedByMask(m3Mask) {
consensus.getLogger().Warn().
Msgf("[onNewView] Quorum Not achieved")
return
@ -507,7 +497,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) {
utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) {
// m1 is not empty, check it's valid
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.Decider.Participants())
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.decider.Participants())
if err != nil {
consensus.getLogger().Error().Err(err).
Msg("[onNewView] ReadSignatureBitmapPayload Failed")
@ -584,5 +574,5 @@ func (consensus *Consensus) resetViewChangeState() {
Msg("[ResetViewChangeState] Resetting view change state")
consensus.current.SetMode(Normal)
consensus.vc.Reset()
consensus.Decider.ResetViewChangeVotes()
consensus.decider.ResetViewChangeVotes()
}

@ -465,7 +465,7 @@ func (vc *viewChange) InitPayload(
if !inited {
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, viewID)
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M3 (ViewID) type messaage")
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M3 (ViewID) type message")
for _, key := range privKeys {
if _, ok := vc.viewIDBitmap[viewID]; !ok {
viewIDBitmap := bls_cosi.NewMask(members)

@ -94,7 +94,7 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
_, _, consensus, _, err := GenerateConsensusForTesting()
assert.NoError(t, err)
assert.Equal(t, int64(0), consensus.Decider.ParticipantsCount())
assert.Equal(t, int64(0), consensus.Decider().ParticipantsCount())
blsKeys := []*bls_core.PublicKey{}
wrappedBLSKeys := []bls.PublicKeyWrapper{}
@ -111,8 +111,8 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
wrappedBLSKeys = append(wrappedBLSKeys, wrapped)
}
consensus.Decider.UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{})
assert.Equal(t, keyCount, consensus.Decider.ParticipantsCount())
consensus.Decider().UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{})
assert.Equal(t, keyCount, consensus.Decider().ParticipantsCount())
consensus.LeaderPubKey = &wrappedBLSKeys[0]
nextKey := consensus.getNextLeaderKey(uint64(1), nil)

@ -56,7 +56,7 @@ func NewBlockValidator(blockchain BlockChain) *BlockValidator {
func (v *BlockValidator) ValidateBody(block *types.Block) error {
// Check whether the block's known, and if not, that it's linkable
if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) {
return ErrKnownBlock
return errors.WithMessage(ErrKnownBlock, "validate body: has block and state")
}
if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {

@ -31,6 +31,8 @@ import (
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
@ -66,7 +68,6 @@ import (
"github.com/harmony-one/harmony/staking/slash"
staking "github.com/harmony-one/harmony/staking/types"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
)
var (
@ -353,7 +354,11 @@ func newBlockChainWithOptions(
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Hash())
fmt.Println("loading/generating snapshot...")
utils.Logger().Info().
Str("Root", head.Root().Hex()).
Msg("loading/generating snapshot")
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root())
}
curHeader := bc.CurrentBlock().Header()
@ -781,6 +786,20 @@ func (bc *BlockChainImpl) resetWithGenesisBlock(genesis *types.Block) error {
return nil
}
func (bc *BlockChainImpl) repairRecreateStateTries(head **types.Block) error {
for {
blk := bc.GetBlockByNumber((*head).NumberU64() + 1)
if blk != nil {
_, _, _, err := bc.insertChain([]*types.Block{blk}, true)
if err != nil {
return err
}
*head = blk
continue
}
}
}
// repair tries to repair the current blockchain by rolling back the current block
// until one with associated state is found. This is needed to fix incomplete db
// writes caused either by crashes/power outages, or simply non-committed tries.
@ -788,6 +807,16 @@ func (bc *BlockChainImpl) resetWithGenesisBlock(genesis *types.Block) error {
// This method only rolls back the current block. The current header and current
// fast block are left intact.
func (bc *BlockChainImpl) repair(head **types.Block) error {
if err := bc.repairValidatorsAndCommitSigs(head); err != nil {
return errors.WithMessage(err, "failed to repair validators and commit sigs")
}
if err := bc.repairRecreateStateTries(head); err != nil {
return errors.WithMessage(err, "failed to recreate state tries")
}
return nil
}
func (bc *BlockChainImpl) repairValidatorsAndCommitSigs(head **types.Block) error {
valsToRemove := map[common.Address]struct{}{}
for {
// Abort if we've rewound to a head block that does have associated state
@ -796,6 +825,9 @@ func (bc *BlockChainImpl) repair(head **types.Block) error {
Str("number", (*head).Number().String()).
Str("hash", (*head).Hash().Hex()).
Msg("Rewound blockchain to past state")
if err := rawdb.WriteHeadBlockHash(bc.db, (*head).Hash()); err != nil {
return errors.WithMessagef(err, "failed to write head block hash number %d", (*head).NumberU64())
}
return bc.removeInValidatorList(valsToRemove)
}
// Repair last commit sigs
@ -803,6 +835,14 @@ func (bc *BlockChainImpl) repair(head **types.Block) error {
sigAndBitMap := append(lastSig[:], (*head).Header().LastCommitBitmap()...)
bc.WriteCommitSig((*head).NumberU64()-1, sigAndBitMap)
err := rawdb.DeleteBlock(bc.db, (*head).Hash(), (*head).NumberU64())
if err != nil {
return errors.WithMessagef(err, "failed to delete block %d", (*head).NumberU64())
}
if err := rawdb.WriteHeadBlockHash(bc.db, (*head).ParentHash()); err != nil {
return errors.WithMessagef(err, "failed to write head block hash number %d", (*head).NumberU64()-1)
}
// Otherwise rewind one block and recheck state availability there
for _, stkTxn := range (*head).StakingTransactions() {
if stkTxn.StakingType() == staking.DirectiveCreateValidator {
@ -899,6 +939,9 @@ func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
if err := rawdb.WriteHeadHeaderHash(batch, block.Hash()); err != nil {
return err
}
if err := rawdb.WriteHeaderNumber(batch, block.Hash(), block.NumberU64()); err != nil {
return err
}
isNewEpoch := block.IsLastBlockInEpoch()
if isNewEpoch {
@ -1731,21 +1774,16 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
err = NewBlockValidator(bc).ValidateBody(block)
}
switch {
case err == ErrKnownBlock:
// Block and state both already known. However if the current block is below
// this number we did a rollback and we should reimport it nonetheless.
if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
stats.ignored++
continue
}
case errors.Is(err, ErrKnownBlock):
return i, events, coalescedLogs, err
case err == consensus_engine.ErrFutureBlock:
return i, events, coalescedLogs, err
case err == consensus_engine.ErrUnknownAncestor:
case errors.Is(err, consensus_engine.ErrUnknownAncestor):
return i, events, coalescedLogs, err
case err == consensus_engine.ErrPrunedAncestor:
case errors.Is(err, consensus_engine.ErrPrunedAncestor):
// TODO: add fork choice mechanism
// Block competing with the canonical chain, store in the db, but don't process
// until the competitor TD goes above the canonical TD
@ -1772,9 +1810,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
// Prune in case non-empty winner chain
if len(winner) > 0 {
// Import all the pruned blocks to make the state available
bc.chainmu.Unlock()
_, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */)
bc.chainmu.Lock()
events, coalescedLogs = evs, logs
if err != nil {
@ -1909,10 +1945,10 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
// insertStats tracks and reports on block insertion.
type insertStats struct {
queued, processed, ignored int
usedGas uint64
lastIndex int
startTime mclock.AbsTime
queued, processed int
usedGas uint64
lastIndex int
startTime mclock.AbsTime
}
// statsReportLimit is the time limit during import and export after which we
@ -1951,9 +1987,6 @@ func (st *insertStats) report(chain []*types.Block, index int, cache common.Stor
if st.queued > 0 {
context = context.Int("queued", st.queued)
}
if st.ignored > 0 {
context = context.Int("ignored", st.ignored)
}
logger := context.Logger()
logger.Info().Msg("Imported new chain segment")

@ -14,10 +14,9 @@ import (
// LeaderRotationMeta contains information about leader rotation
type LeaderRotationMeta struct {
Pub []byte // bls public key of previous block miner
Epoch uint64 // epoch number of previously inserted block
Count uint64 // quantity of continuous blocks inserted by the same leader
Shifts uint64 // number of leader shifts, shift happens when leader changes
Pub []byte // bls public key of previous block miner
Epoch uint64 // epoch number of previously inserted block
Count uint64 // quantity of continuous blocks inserted by the same leader
}
// ShortString returns string representation of the struct
@ -28,8 +27,6 @@ func (a LeaderRotationMeta) ShortString() string {
s.WriteString(strconv.FormatUint(a.Epoch, 10))
s.WriteString(" ")
s.WriteString(strconv.FormatUint(a.Count, 10))
s.WriteString(" ")
s.WriteString(strconv.FormatUint(a.Shifts, 10))
return s.String()
}
@ -39,17 +36,15 @@ func (a LeaderRotationMeta) Hash() []byte {
c.Write(a.Pub)
c.Write([]byte(strconv.FormatUint(a.Epoch, 10)))
c.Write([]byte(strconv.FormatUint(a.Count, 10)))
c.Write([]byte(strconv.FormatUint(a.Shifts, 10)))
return c.Sum(nil)
}
// Clone returns a copy of the struct
func (a LeaderRotationMeta) Clone() LeaderRotationMeta {
return LeaderRotationMeta{
Pub: append([]byte{}, a.Pub...),
Epoch: a.Epoch,
Count: a.Count,
Shifts: a.Shifts,
Pub: append([]byte{}, a.Pub...),
Epoch: a.Epoch,
Count: a.Count,
}
}
@ -109,19 +104,10 @@ func processRotationMeta(epoch uint64, blockPubKey bls.SerializedPublicKey, s Le
} else {
s.Count = 1
}
// we should increase shifts if the leader has changed.
if !bytes.Equal(s.Pub, blockPubKey[:]) {
s.Shifts++
}
// but set to zero if new
if s.Epoch != epoch {
s.Shifts = 0
}
s.Epoch = epoch
return LeaderRotationMeta{
Pub: blockPubKey[:],
Epoch: s.Epoch,
Count: s.Count,
Shifts: s.Shifts,
Pub: blockPubKey[:],
Epoch: s.Epoch,
Count: s.Count,
}
}

@ -12,46 +12,27 @@ var k1 = bls.SerializedPublicKey{1, 2, 3}
func TestRotationMetaProcess(t *testing.T) {
t.Run("same_leader_increase_count", func(t *testing.T) {
rs := processRotationMeta(1, bls.SerializedPublicKey{}, LeaderRotationMeta{
Pub: bls.SerializedPublicKey{}.Bytes(),
Epoch: 1,
Count: 1,
Shifts: 1,
Pub: bls.SerializedPublicKey{}.Bytes(),
Epoch: 1,
Count: 1,
})
require.Equal(t, LeaderRotationMeta{
Pub: bls.SerializedPublicKey{}.Bytes(),
Epoch: 1,
Count: 2,
Shifts: 1,
}, rs)
})
t.Run("new_leader_increase_shifts", func(t *testing.T) {
rs := processRotationMeta(1, k1, LeaderRotationMeta{
Pub: bls.SerializedPublicKey{}.Bytes(),
Epoch: 1,
Count: 1,
Shifts: 1,
})
require.Equal(t, LeaderRotationMeta{
Pub: k1.Bytes(),
Epoch: 1,
Count: 1,
Shifts: 2,
Pub: bls.SerializedPublicKey{}.Bytes(),
Epoch: 1,
Count: 2,
}, rs)
})
t.Run("new_epoch_reset_count", func(t *testing.T) {
rs := processRotationMeta(2, k1, LeaderRotationMeta{
Pub: bls.SerializedPublicKey{}.Bytes(),
Epoch: 1,
Count: 1,
Shifts: 1,
Pub: bls.SerializedPublicKey{}.Bytes(),
Epoch: 1,
Count: 1,
})
require.Equal(t, LeaderRotationMeta{
Pub: k1.Bytes(),
Epoch: 2,
Count: 1,
Shifts: 0,
Pub: k1.Bytes(),
Epoch: 2,
Count: 1,
}, rs)
})
}

@ -22,8 +22,6 @@ import (
"math/big"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
@ -40,6 +38,7 @@ import (
"github.com/harmony-one/harmony/staking/effective"
"github.com/harmony-one/harmony/staking/slash"
staking "github.com/harmony-one/harmony/staking/types"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
)

@ -172,8 +172,8 @@ type TxPoolConfig struct {
AddEvent func(tx types.PoolTransaction, local bool) // Fire add event
Blacklist map[common.Address]struct{} // Set of accounts that cannot be a part of any transaction
AllowedTxs map[common.Address]AllowedTxData // Set of allowed transactions can break the blocklist
Blacklist map[common.Address]struct{} // Set of accounts that cannot be a part of any transaction
AllowedTxs map[common.Address][]AllowedTxData // Set of allowed transactions can break the blocklist
}
// DefaultTxPoolConfig contains the default configurations for the transaction
@ -193,7 +193,7 @@ var DefaultTxPoolConfig = TxPoolConfig{
Lifetime: 30 * time.Minute, // --txpool.lifetime
Blacklist: map[common.Address]struct{}{},
AllowedTxs: map[common.Address]AllowedTxData{},
AllowedTxs: map[common.Address][]AllowedTxData{},
}
// sanitize checks the provided user configurations and changes anything that's
@ -753,12 +753,20 @@ func (pool *TxPool) validateTx(tx types.PoolTransaction, local bool) error {
}
// do whitelist check first, if tx not in whitelist, do blacklist check
if allowedTx, exists := pool.config.AllowedTxs[from]; exists {
if to := tx.To(); to == nil || *to != allowedTx.To || !bytes.Equal(tx.Data(), allowedTx.Data) {
toAddr := common.Address{}
if to != nil {
toAddr = *to
if allowedTxs, exists := pool.config.AllowedTxs[from]; exists {
txIsAllowed := false
to := tx.To()
toAddr := common.Address{}
if to != nil {
toAddr = *to
for _, allowedTx := range allowedTxs {
if toAddr == allowedTx.To && bytes.Equal(tx.Data(), allowedTx.Data) {
txIsAllowed = true
break
}
}
}
if !txIsAllowed {
return errors.WithMessagef(ErrAllowedTxs, "transaction sender: %x, receiver: %x, input: %x", tx.From(), toAddr, tx.Data())
}
} else {

@ -37,6 +37,7 @@ type HarmonyConfig struct {
ShardData ShardDataConfig
GPO GasPriceOracleConfig
Preimage *PreimageConfig
Cache CacheConfig
}
func (hc HarmonyConfig) ToRPCServerConfig() nodeconfig.RPCServerConfig {
@ -138,7 +139,6 @@ type GeneralConfig struct {
TraceEnable bool
EnablePruneBeaconChain bool
RunElasticMode bool
TriesInMemory int
}
type TiKVConfig struct {
@ -306,6 +306,17 @@ type RevertConfig struct {
RevertBefore int
}
type CacheConfig struct {
Disabled bool // Whether to disable trie write caching (archive node)
TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TriesInMemory uint64 // Block number from the head stored in disk before exiting
Preimages bool // Whether to store preimage of trie key to the disk
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup
}
type PreimageConfig struct {
ImportFrom string
ExportTo string

@ -99,14 +99,14 @@ var partnerV0 = MustNewInstance(
partnerReshardingEpoch, PartnerSchedule.BlocksPerEpoch(),
)
var partnerV1 = MustNewInstance(
2, 5, 4, 0,
2, 15, 4, 0,
numeric.MustNewDecFromStr("0.9"), genesis.TNHarmonyAccounts,
genesis.TNFoundationalAccounts, emptyAllowlist, nil,
numeric.ZeroDec(), ethCommon.Address{},
partnerReshardingEpoch, PartnerSchedule.BlocksPerEpoch(),
)
var partnerV2 = MustNewInstance(
2, 5, 4, 0,
2, 20, 4, 0,
numeric.MustNewDecFromStr("0.9"), genesis.TNHarmonyAccounts,
genesis.TNFoundationalAccounts, emptyAllowlist,
feeCollectorsDevnet[1], numeric.MustNewDecFromStr("0.25"),
@ -114,8 +114,8 @@ var partnerV2 = MustNewInstance(
PartnerSchedule.BlocksPerEpoch(),
)
var partnerV3 = MustNewInstance(
2, 5, 1, 0,
numeric.MustNewDecFromStr("0.1"), genesis.TNHarmonyAccounts,
2, 20, 0, 0,
numeric.MustNewDecFromStr("0.0"), genesis.TNHarmonyAccounts,
genesis.TNFoundationalAccounts, emptyAllowlist,
feeCollectorsDevnet[1], numeric.MustNewDecFromStr("0.25"),
hip30CollectionAddressTestnet, partnerReshardingEpoch,

@ -42,6 +42,8 @@ const (
func (ts testnetSchedule) InstanceForEpoch(epoch *big.Int) Instance {
switch {
case params.TestnetChainConfig.IsTestnetExternalEpoch(epoch):
return testnetV6
case params.TestnetChainConfig.IsHIP30(epoch):
return testnetV5
case params.TestnetChainConfig.IsFeeCollectEpoch(epoch):
@ -169,4 +171,12 @@ var (
hip30CollectionAddressTestnet, testnetReshardingEpoch,
TestnetSchedule.BlocksPerEpoch(),
)
testnetV6 = MustNewInstance(
2, 30, 0, 0,
numeric.MustNewDecFromStr("0.0"), genesis.TNHarmonyAccountsV1,
genesis.TNFoundationalAccounts, emptyAllowlist,
feeCollectorsTestnet, numeric.MustNewDecFromStr("0.25"),
hip30CollectionAddressTestnet, testnetReshardingEpoch,
TestnetSchedule.BlocksPerEpoch(),
)
)

@ -78,6 +78,7 @@ var (
BlockGas30MEpoch: big.NewInt(1673), // 2023-11-02 17:30:00+00:00
MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
TestnetExternalEpoch: EpochTBD,
}
// TestnetChainConfig contains the chain parameters to run a node on the harmony test network.
@ -124,6 +125,7 @@ var (
BlockGas30MEpoch: big.NewInt(2176), // 2023-10-12 10:00:00+00:00
MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
TestnetExternalEpoch: EpochTBD,
}
// PangaeaChainConfig contains the chain parameters for the Pangaea network.
// All features except for CrossLink are enabled at launch.
@ -170,6 +172,7 @@ var (
BlockGas30MEpoch: big.NewInt(0),
MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
TestnetExternalEpoch: EpochTBD,
}
// PartnerChainConfig contains the chain parameters for the Partner network.
@ -208,15 +211,16 @@ var (
SlotsLimitedEpoch: EpochTBD, // epoch to enable HIP-16
CrossShardXferPrecompileEpoch: big.NewInt(5),
AllowlistEpoch: EpochTBD,
LeaderRotationInternalValidatorsEpoch: big.NewInt(2379),
LeaderRotationExternalValidatorsEpoch: big.NewInt(3173),
LeaderRotationInternalValidatorsEpoch: big.NewInt(144),
LeaderRotationExternalValidatorsEpoch: big.NewInt(144),
FeeCollectEpoch: big.NewInt(5),
ValidatorCodeFixEpoch: big.NewInt(5),
HIP30Epoch: big.NewInt(7),
BlockGas30MEpoch: big.NewInt(7),
NoNilDelegationsEpoch: EpochTBD,
MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
TestnetExternalEpoch: EpochTBD,
DevnetExternalEpoch: big.NewInt(144),
}
// StressnetChainConfig contains the chain parameters for the Stress test network.
@ -264,6 +268,7 @@ var (
BlockGas30MEpoch: big.NewInt(0),
MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
TestnetExternalEpoch: EpochTBD,
}
// LocalnetChainConfig contains the chain parameters to run for local development.
@ -310,6 +315,7 @@ var (
BlockGas30MEpoch: big.NewInt(0),
MaxRateEpoch: EpochTBD,
DevnetExternalEpoch: EpochTBD,
TestnetExternalEpoch: EpochTBD,
}
// AllProtocolChanges ...
@ -357,6 +363,7 @@ var (
big.NewInt(0), // HIP30Epoch
big.NewInt(0), // NoNilDelegationsEpoch
big.NewInt(0), // MaxRateEpoch
big.NewInt(0), // MaxRateEpoch
big.NewInt(0),
}
@ -405,6 +412,7 @@ var (
big.NewInt(0), // NoNilDelegationsEpoch
big.NewInt(0), // BlockGas30M
big.NewInt(0), // MaxRateEpoch
big.NewInt(0), // MaxRateEpoch
big.NewInt(0),
}
@ -575,6 +583,8 @@ type ChainConfig struct {
DevnetExternalEpoch *big.Int `json:"devnet-external-epoch,omitempty"`
TestnetExternalEpoch *big.Int `json:"testnet-external-epoch,omitempty"`
BlockGas30MEpoch *big.Int `json:"block-gas-30m-epoch,omitempty"`
// MaxRateEpoch will make sure the validator max-rate is at least equal to the minRate + the validator max-rate-increase
@ -861,6 +871,10 @@ func (c *ChainConfig) IsDevnetExternalEpoch(epoch *big.Int) bool {
return isForked(c.DevnetExternalEpoch, epoch)
}
func (c *ChainConfig) IsTestnetExternalEpoch(epoch *big.Int) bool {
return isForked(c.TestnetExternalEpoch, epoch)
}
func (c *ChainConfig) IsMaxRate(epoch *big.Int) bool {
return isForked(c.MaxRateEpoch, epoch)
}

@ -3,7 +3,6 @@ package shardchain
import (
"math/big"
"sync"
"time"
"github.com/harmony-one/harmony/core/state"
harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
@ -110,14 +109,19 @@ func (sc *CollectionImpl) ShardChain(shardID uint32, options ...core.Options) (c
Uint32("shardID", shardID).
Msg("disable cache, running in archival mode")
} else {
cacheConfig = &core.CacheConfig{
TrieNodeLimit: 256,
TrieTimeLimit: 2 * time.Minute,
TriesInMemory: 128,
Preimages: true,
}
if sc.harmonyconfig != nil {
cacheConfig.TriesInMemory = uint64(sc.harmonyconfig.General.TriesInMemory)
hc := sc.harmonyconfig
if hc != nil {
cacheConfig = &core.CacheConfig{
Disabled: hc.Cache.Disabled,
TrieNodeLimit: hc.Cache.TrieNodeLimit,
TrieTimeLimit: hc.Cache.TrieTimeLimit,
TriesInMemory: hc.Cache.TriesInMemory,
SnapshotLimit: hc.Cache.SnapshotLimit,
SnapshotWait: hc.Cache.SnapshotWait,
Preimages: hc.Cache.Preimages,
}
} else {
cacheConfig = nil
}
}

@ -177,7 +177,7 @@ func (node *Node) GetConfig() rpc_common.Config {
// GetLastSigningPower get last signed power
func (node *Node) GetLastSigningPower() (float64, error) {
power, err := node.Consensus.Decider.CurrentTotalPower(quorum.Commit)
power, err := node.Consensus.Decider().CurrentTotalPower(quorum.Commit)
if err != nil {
return 0, err
}

@ -11,28 +11,10 @@ import (
"sync"
"time"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/harmony-one/harmony/internal/tikv/redis_helper"
"github.com/harmony-one/harmony/internal/utils/lrucache"
"github.com/ethereum/go-ethereum/rlp"
harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
"github.com/harmony-one/harmony/internal/utils/crosslinks"
"github.com/ethereum/go-ethereum/common"
protobuf "github.com/golang/protobuf/proto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/abool"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
lru "github.com/hashicorp/golang-lru"
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/rcrowley/go-metrics"
"golang.org/x/sync/semaphore"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
proto_node "github.com/harmony-one/harmony/api/proto/node"
@ -46,9 +28,16 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
common2 "github.com/harmony-one/harmony/internal/common"
harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/harmony-one/harmony/internal/tikv/redis_helper"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils/crosslinks"
"github.com/harmony-one/harmony/internal/utils/lrucache"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
@ -56,6 +45,14 @@ import (
"github.com/harmony-one/harmony/staking/slash"
staking "github.com/harmony-one/harmony/staking/types"
"github.com/harmony-one/harmony/webhooks"
lru "github.com/hashicorp/golang-lru"
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/rcrowley/go-metrics"
"golang.org/x/sync/semaphore"
protobuf "google.golang.org/protobuf/proto"
)
const (
@ -81,7 +78,7 @@ type syncConfig struct {
}
type ISync interface {
UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error
UpdateBlockAndStatus(block *types.Block, bc core.BlockChain) error
AddLastMileBlock(block *types.Block)
GetActivePeerNumber() int
CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error
@ -655,7 +652,7 @@ func validateShardBoundMessage(consensus *consensus.Consensus, peer libp2p_peer.
return nil, nil, true, errors.WithStack(shard.ErrValidNotInCommittee)
}
} else {
count := consensus.Decider.ParticipantsCount()
count := consensus.Decider().ParticipantsCount()
if (count+7)>>3 != int64(len(senderBitmap)) {
nodeConsensusMessageCounterVec.With(prometheus.Labels{"type": "invalid_participant_count"}).Inc()
return nil, nil, true, errors.WithStack(errWrongSizeOfBitmap)
@ -1022,7 +1019,7 @@ func New(
host p2p.Host,
consensusObj *consensus.Consensus,
blacklist map[common.Address]struct{},
allowedTxs map[common.Address]core.AllowedTxData,
allowedTxs map[common.Address][]core.AllowedTxData,
localAccounts []common.Address,
harmonyconfig *harmonyconfig.HarmonyConfig,
registry *registry.Registry,
@ -1178,6 +1175,45 @@ func New(
node.serviceManager = service.NewManager()
// delete old pending crosslinks
if node.Blockchain().ShardID() == shard.BeaconChainShardID {
ten := big.NewInt(10)
crossLinkEpochThreshold := new(big.Int).Sub(node.Blockchain().CurrentHeader().Epoch(), ten)
invalidToDelete := make([]types.CrossLink, 0, 1000)
allPending, err := node.Blockchain().ReadPendingCrossLinks()
if err == nil {
for _, pending := range allPending {
// if pending crosslink is older than 10 epochs, delete it
if pending.EpochF.Cmp(crossLinkEpochThreshold) <= 0 {
invalidToDelete = append(invalidToDelete, pending)
utils.Logger().Info().
Uint32("shard", pending.ShardID()).
Int64("epoch", pending.Epoch().Int64()).
Uint64("blockNum", pending.BlockNum()).
Int64("viewID", pending.ViewID().Int64()).
Interface("hash", pending.Hash()).
Msg("[PendingCrossLinksOnInit] delete old pending cross links")
}
}
if n, err := node.Blockchain().DeleteFromPendingCrossLinks(invalidToDelete); err != nil {
utils.Logger().Error().
Err(err).
Msg("[PendingCrossLinksOnInit] deleting old pending cross links failed")
} else if len(invalidToDelete) > 0 {
utils.Logger().Info().
Int("not-deleted", n).
Int("deleted", len(invalidToDelete)).
Msg("[PendingCrossLinksOnInit] deleted old pending cross links")
}
} else {
utils.Logger().Error().
Err(err).
Msg("[PendingCrossLinksOnInit] read pending cross links failed")
}
}
return &node
}

@ -53,7 +53,7 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag
return err
}
if !node.Consensus.Decider.IsQuorumAchievedByMask(mask) {
if !node.Consensus.Decider().IsQuorumAchievedByMask(mask) {
utils.Logger().Error().Msg("[Explorer] not have enough signature power")
return nil
}

@ -1,6 +1,7 @@
package node
import (
"math/big"
"sort"
"strings"
"time"
@ -226,11 +227,18 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
utils.AnalysisStart("proposeNewBlockVerifyCrossLinks")
// Prepare cross links and slashing messages
var crossLinksToPropose types.CrossLinks
ten := big.NewInt(10)
crossLinkEpochThreshold := new(big.Int).Sub(currentHeader.Epoch(), ten)
if isBeaconchainInCrossLinkEra {
allPending, err := node.Blockchain().ReadPendingCrossLinks()
invalidToDelete := []types.CrossLink{}
if err == nil {
for _, pending := range allPending {
// if pending crosslink is older than 10 epochs, delete it and continue. this logic is also applied when the node starts
if pending.EpochF.Cmp(crossLinkEpochThreshold) <= 0 {
invalidToDelete = append(invalidToDelete, pending)
continue
}
// ReadCrossLink beacon chain usage.
exist, err := node.Blockchain().ReadCrossLink(pending.ShardID(), pending.BlockNum())
if err == nil || exist != nil {
@ -263,7 +271,16 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
len(allPending),
)
}
node.Blockchain().DeleteFromPendingCrossLinks(invalidToDelete)
if n, err := node.Blockchain().DeleteFromPendingCrossLinks(invalidToDelete); err != nil {
utils.Logger().Error().
Err(err).
Msg("[ProposeNewBlock] invalid pending cross links failed")
} else if len(invalidToDelete) > 0 {
utils.Logger().Info().
Int("not-deleted", n).
Int("deleted", len(invalidToDelete)).
Msg("[ProposeNewBlock] deleted invalid pending cross links")
}
}
utils.AnalysisEnd("proposeNewBlockVerifyCrossLinks")

@ -209,7 +209,11 @@ func (ch *chainHelperImpl) getAccountRange(root common.Hash, origin common.Hash,
if err != nil {
return nil, nil, err
}
it, err := ch.chain.Snapshots().AccountIterator(root, origin)
snapshots := ch.chain.Snapshots()
if snapshots == nil {
return nil, nil, errors.Errorf("failed to retrieve snapshots")
}
it, err := snapshots.AccountIterator(root, origin)
if err != nil {
return nil, nil, err
}
@ -275,6 +279,10 @@ func (ch *chainHelperImpl) getStorageRanges(root common.Hash, accounts []common.
proofs [][]byte
size uint64
)
snapshots := ch.chain.Snapshots()
if snapshots == nil {
return nil, nil, errors.Errorf("failed to retrieve snapshots")
}
for _, account := range accounts {
// If we've exceeded the requested data limit, abort without opening
// a new storage range (that we'd need to prove due to exceeded size)
@ -284,7 +292,7 @@ func (ch *chainHelperImpl) getStorageRanges(root common.Hash, accounts []common.
// The first account might start from a different origin and end sooner
// origin==nil or limit ==nil
// Retrieve the requested state and bail out if non existent
it, err := ch.chain.Snapshots().StorageIterator(root, account, origin)
it, err := snapshots.StorageIterator(root, account, origin)
if err != nil {
return nil, nil, err
}
@ -409,7 +417,11 @@ func (ch *chainHelperImpl) getTrieNodes(root common.Hash, paths []*message.TrieN
return nil, nil
}
// The 'snap' might be nil, in which case we cannot serve storage slots.
snap := ch.chain.Snapshots().Snapshot(root)
snapshots := ch.chain.Snapshots()
if snapshots == nil {
return nil, errors.Errorf("failed to retrieve snapshots")
}
snap := snapshots.Snapshot(root)
// Retrieve trie nodes until the packet size limit is reached
var (
nodes [][]byte

@ -11,11 +11,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
protobuf "google.golang.org/protobuf/proto"
)
type testChainHelper struct{}

@ -8,12 +8,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/pkg/errors"
protobuf "google.golang.org/protobuf/proto"
)
// GetBlocksByNumber do getBlocksByNumberRequest through sync stream protocol.

@ -271,8 +271,6 @@ func (p *Protocol) RemoveStream(stID sttypes.StreamID) {
if exist && st != nil {
//TODO: log this incident with reason
st.Close()
// stream manager removes this stream from the list and triggers discovery if number of streams are not enough
p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed
p.logger.Info().
Str("stream ID", string(stID)).
Msg("stream removed")
@ -290,8 +288,6 @@ func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) {
Msg("stream failed")
if st.FailedTimes() >= MaxStreamFailures {
st.Close()
// stream manager removes this stream from the list and triggers discovery if number of streams are not enough
p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed
p.logger.Warn().
Str("stream ID", string(st.ID())).
Msg("stream removed")

@ -5,17 +5,16 @@ import (
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
libp2p_network "github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
protobuf "google.golang.org/protobuf/proto"
)
// syncStream is the structure for a stream running sync protocol.
@ -84,7 +83,7 @@ func (st *syncStream) readMsgLoop() {
func (st *syncStream) deliverMsg(msg protobuf.Message) {
syncMsg := msg.(*syncpb.Message)
if syncMsg == nil {
st.logger.Info().Str("message", msg.String()).Msg("received unexpected sync message")
st.logger.Info().Interface("message", msg).Msg("received unexpected sync message")
return
}
if req := syncMsg.GetReq(); req != nil {

@ -7,7 +7,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
protobuf "github.com/golang/protobuf/proto"
syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
ic "github.com/libp2p/go-libp2p/core/crypto"
@ -15,6 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
protobuf "google.golang.org/protobuf/proto"
)
var _ sttypes.Protocol = &Protocol{}

@ -3,10 +3,10 @@ package sync
import (
"fmt"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/p2p/stream/common/requestmanager"
syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
"github.com/pkg/errors"
protobuf "google.golang.org/protobuf/proto"
)
var (

@ -1,12 +1,14 @@
#!/usr/bin/env bash
set -e
echo $TRAVIS_PULL_REQUEST_BRANCH
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
echo $DIR
echo $GOPATH
cd $GOPATH/src/github.com/harmony-one/harmony-test
git fetch
git pull
git checkout $TRAVIS_PULL_REQUEST_BRANCH || true
git branch --show-current
cd localnet
docker build -t harmonyone/localnet-test .

@ -1,11 +1,13 @@
#!/usr/bin/env bash
set -e
echo $TRAVIS_PULL_REQUEST_BRANCH
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
echo $DIR
echo $GOPATH
cd $GOPATH/src/github.com/harmony-one/harmony-test
git fetch
git checkout $TRAVIS_PULL_REQUEST_BRANCH || true
git pull
git branch --show-current
cd localnet

@ -6,28 +6,24 @@ import (
"math/rand"
"time"
"github.com/harmony-one/harmony/core/rawdb"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/crypto/bls"
blockfactory "github.com/harmony-one/harmony/block/factory"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
common2 "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
blockfactory "github.com/harmony-one/harmony/block/factory"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/common"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/numeric"
staking "github.com/harmony-one/harmony/staking/types"
protobuf "google.golang.org/protobuf/proto"
)
var (

Loading…
Cancel
Save