Revert improvements. (#4520)

pull/4500/head
Konstantin 1 year ago committed by GitHub
parent 370d122dbd
commit cf5dd8b5fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      api/service/stagedsync/sync_config.go
  2. 109
      cmd/harmony/main.go
  3. 7
      core_test/shardchain_test.go
  4. 38
      internal/registry/registry.go
  5. 18
      node/node.go
  6. 22
      node/node_handler_test.go
  7. 8
      node/node_newblock_test.go
  8. 8
      node/node_test.go

@ -25,7 +25,6 @@ const (
SyncingPortDifference = 3000
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now)
LastMileBlocksSize = 50
// after cutting off a number of connected peers, the result number of peers
@ -53,14 +52,6 @@ type SyncPeerConfig struct {
failedTimes uint64
}
// CreateTestSyncPeerConfig used for testing.
func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig {
return &SyncPeerConfig{
client: client,
blockHashes: blockHashes,
}
}
// GetClient returns client pointer of downloader.Client
func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client {
return peerConfig.client
@ -303,21 +294,21 @@ func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig {
// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
// Caller shall ensure mtx is locked for reading.
func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
func getHowManyMaxConsensus(peers []*SyncPeerConfig) (int, int) {
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
if len(sc.peers) == 0 {
if len(peers) == 0 {
return -1, 0
} else if len(sc.peers) == 1 {
} else if len(peers) == 1 {
return 0, 1
}
maxFirstID := len(sc.peers) - 1
maxFirstID := len(peers) - 1
for i := maxFirstID - 1; i >= 0; i-- {
if CompareSyncPeerConfigByblockHashes(sc.peers[maxFirstID], sc.peers[i]) != 0 {
if CompareSyncPeerConfigByblockHashes(peers[maxFirstID], peers[i]) != 0 {
break
}
maxFirstID = i
}
maxCount := len(sc.peers) - maxFirstID
maxCount := len(peers) - maxFirstID
return maxFirstID, maxCount
}
@ -386,7 +377,7 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp(bgMode bool) error {
sort.Slice(sc.peers, func(i, j int) bool {
return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
})
maxFirstID, maxCount := sc.getHowManyMaxConsensus()
maxFirstID, maxCount := getHowManyMaxConsensus(sc.peers)
if maxFirstID == -1 {
return errors.New("invalid peer index -1 for block hashes query")
}

@ -268,6 +268,29 @@ func setupNodeLog(config harmonyconfig.HarmonyConfig) {
}
}
func revert(chain core.BlockChain, hc harmonyconfig.HarmonyConfig) {
curNum := chain.CurrentBlock().NumberU64()
if curNum < uint64(hc.Revert.RevertBefore) && curNum >= uint64(hc.Revert.RevertTo) {
// Remove invalid blocks
for chain.CurrentBlock().NumberU64() >= uint64(hc.Revert.RevertTo) {
curBlock := chain.CurrentBlock()
rollbacks := []ethCommon.Hash{curBlock.Hash()}
if err := chain.Rollback(rollbacks); err != nil {
fmt.Printf("Revert failed: %v\n", err)
os.Exit(1)
}
lastSig := curBlock.Header().LastCommitSignature()
sigAndBitMap := append(lastSig[:], curBlock.Header().LastCommitBitmap()...)
chain.WriteCommitSig(curBlock.NumberU64()-1, sigAndBitMap)
}
fmt.Printf("Revert finished. Current block: %v\n", chain.CurrentBlock().NumberU64())
utils.Logger().Warn().
Uint64("Current Block", chain.CurrentBlock().NumberU64()).
Msg("Revert finished.")
os.Exit(1)
}
}
func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
var err error
@ -353,26 +376,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
if hc.Revert.RevertBeacon {
chain = currentNode.Beaconchain()
}
curNum := chain.CurrentBlock().NumberU64()
if curNum < uint64(hc.Revert.RevertBefore) && curNum >= uint64(hc.Revert.RevertTo) {
// Remove invalid blocks
for chain.CurrentBlock().NumberU64() >= uint64(hc.Revert.RevertTo) {
curBlock := chain.CurrentBlock()
rollbacks := []ethCommon.Hash{curBlock.Hash()}
if err := chain.Rollback(rollbacks); err != nil {
fmt.Printf("Revert failed: %v\n", err)
os.Exit(1)
}
lastSig := curBlock.Header().LastCommitSignature()
sigAndBitMap := append(lastSig[:], curBlock.Header().LastCommitBitmap()...)
chain.WriteCommitSig(curBlock.NumberU64()-1, sigAndBitMap)
}
fmt.Printf("Revert finished. Current block: %v\n", chain.CurrentBlock().NumberU64())
utils.Logger().Warn().
Uint64("Current Block", chain.CurrentBlock().NumberU64()).
Msg("Revert finished.")
os.Exit(1)
}
revert(chain, hc)
}
//// code to handle pre-image export, import and generation
@ -727,31 +731,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
return nodeConfig, nil
}
func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *node.Node {
// Parse minPeers from harmonyconfig.HarmonyConfig
var minPeers int
var aggregateSig bool
if hc.Consensus != nil {
minPeers = hc.Consensus.MinPeers
aggregateSig = hc.Consensus.AggregateSig
} else {
minPeers = defaultConsensusConfig.MinPeers
aggregateSig = defaultConsensusConfig.AggregateSig
}
blacklist, err := setupBlacklist(hc)
if err != nil {
utils.Logger().Warn().Msgf("Blacklist setup error: %s", err.Error())
}
allowedTxs, err := setupAllowedTxs(hc)
if err != nil {
utils.Logger().Warn().Msgf("AllowedTxs setup error: %s", err.Error())
}
localAccounts, err := setupLocalAccounts(hc, blacklist)
if err != nil {
utils.Logger().Warn().Msgf("local accounts setup error: %s", err.Error())
}
func setupChain(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *registry.Registry {
// Current node.
var chainDBFactory shardchain.DBFactory
@ -770,6 +750,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
}
engine := chain.NewEngine()
registry.SetEngine(engine)
chainConfig := nodeConfig.GetNetworkType().ChainConfig()
collection := shardchain.NewCollection(
@ -780,6 +761,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
collection.DisableCache(shardID)
}
}
registry.SetShardChainCollection(collection)
var blockchain core.BlockChain
@ -793,17 +775,48 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
registry.SetBeaconchain(beacon)
}
blockchain, err = collection.ShardChain(nodeConfig.ShardID)
blockchain, err := collection.ShardChain(nodeConfig.ShardID)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Error :%v \n", err)
os.Exit(1)
}
registry.SetBlockchain(blockchain)
registry.SetWebHooks(nodeConfig.WebHooks.Hooks)
if registry.GetBeaconchain() == nil {
registry.SetBeaconchain(registry.GetBlockchain())
}
return registry
}
func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *node.Node {
// Parse minPeers from harmonyconfig.HarmonyConfig
var minPeers int
var aggregateSig bool
if hc.Consensus != nil {
minPeers = hc.Consensus.MinPeers
aggregateSig = hc.Consensus.AggregateSig
} else {
minPeers = defaultConsensusConfig.MinPeers
aggregateSig = defaultConsensusConfig.AggregateSig
}
blacklist, err := setupBlacklist(hc)
if err != nil {
utils.Logger().Warn().Msgf("Blacklist setup error: %s", err.Error())
}
allowedTxs, err := setupAllowedTxs(hc)
if err != nil {
utils.Logger().Warn().Msgf("AllowedTxs setup error: %s", err.Error())
}
localAccounts, err := setupLocalAccounts(hc, blacklist)
if err != nil {
utils.Logger().Warn().Msgf("local accounts setup error: %s", err.Error())
}
registry = setupChain(hc, nodeConfig, registry)
if registry.GetShardChainCollection() == nil {
panic("shard chain collection is nil1111111")
}
registry.SetWebHooks(nodeConfig.WebHooks.Hooks)
cxPool := core.NewCxPool(core.CxPoolSize)
registry.SetCxPool(cxPool)
@ -818,7 +831,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
os.Exit(1)
}
currentNode := node.New(myHost, currentConsensus, engine, collection, blacklist, allowedTxs, localAccounts, nodeConfig.ArchiveModes(), &hc, registry)
currentNode := node.New(myHost, currentConsensus, blacklist, allowedTxs, localAccounts, &hc, registry)
if hc.Legacy != nil && hc.Legacy.TPBroadcastInvalidTxn != nil {
currentNode.BroadcastInvalidTx = *hc.Legacy.TPBroadcastInvalidTxn

@ -48,7 +48,10 @@ func TestAddNewBlock(t *testing.T) {
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
reg := registry.New().
SetBlockchain(blockchain).
SetEngine(engine).
SetShardChainCollection(collection)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
@ -57,7 +60,7 @@ func TestAddNewBlock(t *testing.T) {
}
nodeconfig.SetNetworkType(nodeconfig.Testnet)
var block *types.Block
node := node.New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg)
node := node.New(host, consensus, nil, nil, nil, nil, reg)
commitSigs := make(chan []byte, 1)
commitSigs <- []byte{}
block, err = node.Worker.FinalizeNewBlock(

@ -3,7 +3,9 @@ package registry
import (
"sync"
"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/webhooks"
)
@ -16,6 +18,8 @@ type Registry struct {
txPool *core.TxPool
cxPool *core.CxPool
isBackup bool
engine engine.Engine
collection *shardchain.CollectionImpl
}
// New creates a new registry.
@ -122,3 +126,37 @@ func (r *Registry) GetCxPool() *core.CxPool {
return r.cxPool
}
// SetEngine sets the engine to registry.
func (r *Registry) SetEngine(engine engine.Engine) *Registry {
r.mu.Lock()
defer r.mu.Unlock()
r.engine = engine
return r
}
// GetEngine gets the engine from registry.
func (r *Registry) GetEngine() engine.Engine {
r.mu.Lock()
defer r.mu.Unlock()
return r.engine
}
// SetShardChainCollection sets the shard chain collection to registry.
func (r *Registry) SetShardChainCollection(collection *shardchain.CollectionImpl) *Registry {
r.mu.Lock()
defer r.mu.Unlock()
r.collection = collection
return r
}
// GetShardChainCollection gets the shard chain collection from registry.
func (r *Registry) GetShardChainCollection() *shardchain.CollectionImpl {
r.mu.Lock()
defer r.mu.Unlock()
return r.collection
}

@ -11,7 +11,6 @@ import (
"sync"
"time"
"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
"github.com/harmony-one/harmony/internal/tikv"
@ -49,7 +48,6 @@ import (
common2 "github.com/harmony-one/harmony/internal/common"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
@ -104,8 +102,7 @@ type Node struct {
pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus
pendingCXMutex sync.Mutex
crosslinks *crosslinks.Crosslinks // Memory storage for crosslink processing.
// Shard databases
shardChains shardchain.Collection
SelfPeer p2p.Peer
stateMutex sync.Mutex // mutex for change node state
TxPool *core.TxPool
@ -193,7 +190,10 @@ func (node *Node) Beaconchain() core.BlockChain {
}
func (node *Node) chain(shardID uint32, options core.Options) core.BlockChain {
bc, err := node.shardChains.ShardChain(shardID, options)
if node.registry.GetShardChainCollection() == nil {
panic("shard chain collection is nil")
}
bc, err := node.registry.GetShardChainCollection().ShardChain(shardID, options)
if err != nil {
utils.Logger().Error().Err(err).Msg("cannot get beaconchain")
}
@ -1026,12 +1026,9 @@ func (node *Node) GetSyncID() [SyncIDLength]byte {
func New(
host p2p.Host,
consensusObj *consensus.Consensus,
engine engine.Engine,
collection *shardchain.CollectionImpl,
blacklist map[common.Address]struct{},
allowedTxs map[common.Address]core.AllowedTxData,
localAccounts []common.Address,
isArchival map[uint32]bool,
harmonyconfig *harmonyconfig.HarmonyConfig,
registry *registry.Registry,
) *Node {
@ -1058,7 +1055,6 @@ func New(
networkType := node.NodeConfig.GetNetworkType()
chainConfig := networkType.ChainConfig()
node.chainConfig = chainConfig
node.shardChains = collection
node.IsSynchronized = abool.NewBool(false)
if host != nil {
@ -1081,9 +1077,9 @@ func New(
if b2 {
shardID := node.NodeConfig.ShardID
// HACK get the real error reason
_, err = node.shardChains.ShardChain(shardID)
_, err = node.registry.GetShardChainCollection().ShardChain(shardID)
} else {
_, err = node.shardChains.ShardChain(shard.BeaconChainShardID)
_, err = node.registry.GetShardChainCollection().ShardChain(shard.BeaconChainShardID)
}
fmt.Fprintf(os.Stderr, "Cannot initialize node: %v\n", err)
os.Exit(-1)

@ -45,13 +45,16 @@ func TestAddNewBlock(t *testing.T) {
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
reg := registry.New().
SetBlockchain(blockchain).
SetEngine(engine).
SetShardChainCollection(collection)
consensus, err := consensus.New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
nodeconfig.SetNetworkType(nodeconfig.Testnet)
node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg)
node := New(host, consensus, nil, nil, nil, nil, reg)
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}
@ -100,7 +103,11 @@ func TestVerifyNewBlock(t *testing.T) {
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
reg := registry.New().
SetBlockchain(blockchain).
SetEngine(engine).
SetShardChainCollection(collection)
consensusObj, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
@ -110,7 +117,7 @@ func TestVerifyNewBlock(t *testing.T) {
archiveMode := make(map[uint32]bool)
archiveMode[0] = true
archiveMode[1] = false
node := New(host, consensusObj, engine, collection, nil, nil, nil, archiveMode, nil, reg)
node := New(host, consensusObj, nil, nil, nil, nil, reg)
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}
@ -156,7 +163,10 @@ func TestVerifyVRF(t *testing.T) {
decider := quorum.NewDecider(
quorum.SuperMajorityVote, shard.BeaconChainShardID,
)
reg := registry.New().SetBlockchain(blockchain)
reg := registry.New().
SetBlockchain(blockchain).
SetEngine(engine).
SetShardChainCollection(collection)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
@ -166,7 +176,7 @@ func TestVerifyVRF(t *testing.T) {
archiveMode := make(map[uint32]bool)
archiveMode[0] = true
archiveMode[1] = false
node := New(host, consensus, engine, collection, nil, nil, nil, archiveMode, nil, reg)
node := New(host, consensus, nil, nil, nil, nil, reg)
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}

@ -46,7 +46,11 @@ func TestFinalizeNewBlockAsync(t *testing.T) {
decider := quorum.NewDecider(
quorum.SuperMajorityVote, shard.BeaconChainShardID,
)
reg := registry.New().SetBlockchain(blockchain).SetBeaconchain(blockchain)
reg := registry.New().
SetBlockchain(blockchain).
SetBeaconchain(blockchain).
SetEngine(engine).
SetShardChainCollection(collection)
consensusObj, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
@ -54,7 +58,7 @@ func TestFinalizeNewBlockAsync(t *testing.T) {
t.Fatalf("Cannot craeate consensus: %v", err)
}
node := New(host, consensusObj, engine, collection, nil, nil, nil, nil, nil, registry.New().SetBlockchain(blockchain))
node := New(host, consensusObj, nil, nil, nil, nil, reg)
node.Worker.UpdateCurrent()

@ -46,7 +46,11 @@ func TestNewNode(t *testing.T) {
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
reg := registry.New().
SetBlockchain(blockchain).
SetEngine(engine).
SetShardChainCollection(collection)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
@ -54,7 +58,7 @@ func TestNewNode(t *testing.T) {
t.Fatalf("Cannot craeate consensus: %v", err)
}
node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg)
node := New(host, consensus, nil, nil, nil, nil, reg)
if node.Consensus == nil {
t.Error("Consensus is not initialized for the node")
}

Loading…
Cancel
Save