diff --git a/api/service/stagedsync/sync_config.go b/api/service/stagedsync/sync_config.go index 91b3a4d73..7d2067081 100644 --- a/api/service/stagedsync/sync_config.go +++ b/api/service/stagedsync/sync_config.go @@ -23,9 +23,8 @@ const ( downloadBlocksRetryLimit = 3 // downloadBlocks service retry limit RegistrationNumber = 3 SyncingPortDifference = 3000 - inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus - SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes - verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now) + inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus + SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes LastMileBlocksSize = 50 // after cutting off a number of connected peers, the result number of peers @@ -53,14 +52,6 @@ type SyncPeerConfig struct { failedTimes uint64 } -// CreateTestSyncPeerConfig used for testing. -func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig { - return &SyncPeerConfig{ - client: client, - blockHashes: blockHashes, - } -} - // GetClient returns client pointer of downloader.Client func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client { return peerConfig.client @@ -303,21 +294,21 @@ func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig { // getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group. // Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first. // Caller shall ensure mtx is locked for reading. -func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) { +func getHowManyMaxConsensus(peers []*SyncPeerConfig) (int, int) { // As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively. - if len(sc.peers) == 0 { + if len(peers) == 0 { return -1, 0 - } else if len(sc.peers) == 1 { + } else if len(peers) == 1 { return 0, 1 } - maxFirstID := len(sc.peers) - 1 + maxFirstID := len(peers) - 1 for i := maxFirstID - 1; i >= 0; i-- { - if CompareSyncPeerConfigByblockHashes(sc.peers[maxFirstID], sc.peers[i]) != 0 { + if CompareSyncPeerConfigByblockHashes(peers[maxFirstID], peers[i]) != 0 { break } maxFirstID = i } - maxCount := len(sc.peers) - maxFirstID + maxCount := len(peers) - maxFirstID return maxFirstID, maxCount } @@ -386,7 +377,7 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp(bgMode bool) error { sort.Slice(sc.peers, func(i, j int) bool { return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1 }) - maxFirstID, maxCount := sc.getHowManyMaxConsensus() + maxFirstID, maxCount := getHowManyMaxConsensus(sc.peers) if maxFirstID == -1 { return errors.New("invalid peer index -1 for block hashes query") } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index ea2d62f6e..9fc89d45d 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -268,6 +268,29 @@ func setupNodeLog(config harmonyconfig.HarmonyConfig) { } } +func revert(chain core.BlockChain, hc harmonyconfig.HarmonyConfig) { + curNum := chain.CurrentBlock().NumberU64() + if curNum < uint64(hc.Revert.RevertBefore) && curNum >= uint64(hc.Revert.RevertTo) { + // Remove invalid blocks + for chain.CurrentBlock().NumberU64() >= uint64(hc.Revert.RevertTo) { + curBlock := chain.CurrentBlock() + rollbacks := []ethCommon.Hash{curBlock.Hash()} + if err := chain.Rollback(rollbacks); err != nil { + fmt.Printf("Revert failed: %v\n", err) + os.Exit(1) + } + lastSig := curBlock.Header().LastCommitSignature() + sigAndBitMap := append(lastSig[:], curBlock.Header().LastCommitBitmap()...) + chain.WriteCommitSig(curBlock.NumberU64()-1, sigAndBitMap) + } + fmt.Printf("Revert finished. Current block: %v\n", chain.CurrentBlock().NumberU64()) + utils.Logger().Warn(). + Uint64("Current Block", chain.CurrentBlock().NumberU64()). + Msg("Revert finished.") + os.Exit(1) + } +} + func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { var err error @@ -353,26 +376,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { if hc.Revert.RevertBeacon { chain = currentNode.Beaconchain() } - curNum := chain.CurrentBlock().NumberU64() - if curNum < uint64(hc.Revert.RevertBefore) && curNum >= uint64(hc.Revert.RevertTo) { - // Remove invalid blocks - for chain.CurrentBlock().NumberU64() >= uint64(hc.Revert.RevertTo) { - curBlock := chain.CurrentBlock() - rollbacks := []ethCommon.Hash{curBlock.Hash()} - if err := chain.Rollback(rollbacks); err != nil { - fmt.Printf("Revert failed: %v\n", err) - os.Exit(1) - } - lastSig := curBlock.Header().LastCommitSignature() - sigAndBitMap := append(lastSig[:], curBlock.Header().LastCommitBitmap()...) - chain.WriteCommitSig(curBlock.NumberU64()-1, sigAndBitMap) - } - fmt.Printf("Revert finished. Current block: %v\n", chain.CurrentBlock().NumberU64()) - utils.Logger().Warn(). - Uint64("Current Block", chain.CurrentBlock().NumberU64()). - Msg("Revert finished.") - os.Exit(1) - } + revert(chain, hc) } //// code to handle pre-image export, import and generation @@ -727,31 +731,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, return nodeConfig, nil } -func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *node.Node { - // Parse minPeers from harmonyconfig.HarmonyConfig - var minPeers int - var aggregateSig bool - if hc.Consensus != nil { - minPeers = hc.Consensus.MinPeers - aggregateSig = hc.Consensus.AggregateSig - } else { - minPeers = defaultConsensusConfig.MinPeers - aggregateSig = defaultConsensusConfig.AggregateSig - } - - blacklist, err := setupBlacklist(hc) - if err != nil { - utils.Logger().Warn().Msgf("Blacklist setup error: %s", err.Error()) - } - allowedTxs, err := setupAllowedTxs(hc) - if err != nil { - utils.Logger().Warn().Msgf("AllowedTxs setup error: %s", err.Error()) - } - - localAccounts, err := setupLocalAccounts(hc, blacklist) - if err != nil { - utils.Logger().Warn().Msgf("local accounts setup error: %s", err.Error()) - } +func setupChain(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *registry.Registry { // Current node. var chainDBFactory shardchain.DBFactory @@ -770,6 +750,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi } engine := chain.NewEngine() + registry.SetEngine(engine) chainConfig := nodeConfig.GetNetworkType().ChainConfig() collection := shardchain.NewCollection( @@ -780,6 +761,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi collection.DisableCache(shardID) } } + registry.SetShardChainCollection(collection) var blockchain core.BlockChain @@ -793,17 +775,48 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi registry.SetBeaconchain(beacon) } - blockchain, err = collection.ShardChain(nodeConfig.ShardID) + blockchain, err := collection.ShardChain(nodeConfig.ShardID) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "Error :%v \n", err) os.Exit(1) } registry.SetBlockchain(blockchain) - registry.SetWebHooks(nodeConfig.WebHooks.Hooks) if registry.GetBeaconchain() == nil { registry.SetBeaconchain(registry.GetBlockchain()) } + return registry +} +func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *node.Node { + // Parse minPeers from harmonyconfig.HarmonyConfig + var minPeers int + var aggregateSig bool + if hc.Consensus != nil { + minPeers = hc.Consensus.MinPeers + aggregateSig = hc.Consensus.AggregateSig + } else { + minPeers = defaultConsensusConfig.MinPeers + aggregateSig = defaultConsensusConfig.AggregateSig + } + + blacklist, err := setupBlacklist(hc) + if err != nil { + utils.Logger().Warn().Msgf("Blacklist setup error: %s", err.Error()) + } + allowedTxs, err := setupAllowedTxs(hc) + if err != nil { + utils.Logger().Warn().Msgf("AllowedTxs setup error: %s", err.Error()) + } + localAccounts, err := setupLocalAccounts(hc, blacklist) + if err != nil { + utils.Logger().Warn().Msgf("local accounts setup error: %s", err.Error()) + } + + registry = setupChain(hc, nodeConfig, registry) + if registry.GetShardChainCollection() == nil { + panic("shard chain collection is nil1111111") + } + registry.SetWebHooks(nodeConfig.WebHooks.Hooks) cxPool := core.NewCxPool(core.CxPoolSize) registry.SetCxPool(cxPool) @@ -818,7 +831,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi os.Exit(1) } - currentNode := node.New(myHost, currentConsensus, engine, collection, blacklist, allowedTxs, localAccounts, nodeConfig.ArchiveModes(), &hc, registry) + currentNode := node.New(myHost, currentConsensus, blacklist, allowedTxs, localAccounts, &hc, registry) if hc.Legacy != nil && hc.Legacy.TPBroadcastInvalidTxn != nil { currentNode.BroadcastInvalidTx = *hc.Legacy.TPBroadcastInvalidTxn diff --git a/core_test/shardchain_test.go b/core_test/shardchain_test.go index 36a32f543..6d09df965 100644 --- a/core_test/shardchain_test.go +++ b/core_test/shardchain_test.go @@ -48,7 +48,10 @@ func TestAddNewBlock(t *testing.T) { if err != nil { t.Fatal("cannot get blockchain") } - reg := registry.New().SetBlockchain(blockchain) + reg := registry.New(). + SetBlockchain(blockchain). + SetEngine(engine). + SetShardChainCollection(collection) consensus, err := consensus.New( host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false, ) @@ -57,7 +60,7 @@ func TestAddNewBlock(t *testing.T) { } nodeconfig.SetNetworkType(nodeconfig.Testnet) var block *types.Block - node := node.New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg) + node := node.New(host, consensus, nil, nil, nil, nil, reg) commitSigs := make(chan []byte, 1) commitSigs <- []byte{} block, err = node.Worker.FinalizeNewBlock( diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 703260910..50398bc5c 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -3,7 +3,9 @@ package registry import ( "sync" + "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/webhooks" ) @@ -16,6 +18,8 @@ type Registry struct { txPool *core.TxPool cxPool *core.CxPool isBackup bool + engine engine.Engine + collection *shardchain.CollectionImpl } // New creates a new registry. @@ -122,3 +126,37 @@ func (r *Registry) GetCxPool() *core.CxPool { return r.cxPool } + +// SetEngine sets the engine to registry. +func (r *Registry) SetEngine(engine engine.Engine) *Registry { + r.mu.Lock() + defer r.mu.Unlock() + + r.engine = engine + return r +} + +// GetEngine gets the engine from registry. +func (r *Registry) GetEngine() engine.Engine { + r.mu.Lock() + defer r.mu.Unlock() + + return r.engine +} + +// SetShardChainCollection sets the shard chain collection to registry. +func (r *Registry) SetShardChainCollection(collection *shardchain.CollectionImpl) *Registry { + r.mu.Lock() + defer r.mu.Unlock() + + r.collection = collection + return r +} + +// GetShardChainCollection gets the shard chain collection from registry. +func (r *Registry) GetShardChainCollection() *shardchain.CollectionImpl { + r.mu.Lock() + defer r.mu.Unlock() + + return r.collection +} diff --git a/node/node.go b/node/node.go index 981338df5..e4d567066 100644 --- a/node/node.go +++ b/node/node.go @@ -11,7 +11,6 @@ import ( "sync" "time" - "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/internal/registry" "github.com/harmony-one/harmony/internal/shardchain/tikv_manage" "github.com/harmony-one/harmony/internal/tikv" @@ -49,7 +48,6 @@ import ( common2 "github.com/harmony-one/harmony/internal/common" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/params" - "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" @@ -104,8 +102,7 @@ type Node struct { pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus pendingCXMutex sync.Mutex crosslinks *crosslinks.Crosslinks // Memory storage for crosslink processing. - // Shard databases - shardChains shardchain.Collection + SelfPeer p2p.Peer stateMutex sync.Mutex // mutex for change node state TxPool *core.TxPool @@ -193,7 +190,10 @@ func (node *Node) Beaconchain() core.BlockChain { } func (node *Node) chain(shardID uint32, options core.Options) core.BlockChain { - bc, err := node.shardChains.ShardChain(shardID, options) + if node.registry.GetShardChainCollection() == nil { + panic("shard chain collection is nil") + } + bc, err := node.registry.GetShardChainCollection().ShardChain(shardID, options) if err != nil { utils.Logger().Error().Err(err).Msg("cannot get beaconchain") } @@ -1026,12 +1026,9 @@ func (node *Node) GetSyncID() [SyncIDLength]byte { func New( host p2p.Host, consensusObj *consensus.Consensus, - engine engine.Engine, - collection *shardchain.CollectionImpl, blacklist map[common.Address]struct{}, allowedTxs map[common.Address]core.AllowedTxData, localAccounts []common.Address, - isArchival map[uint32]bool, harmonyconfig *harmonyconfig.HarmonyConfig, registry *registry.Registry, ) *Node { @@ -1058,7 +1055,6 @@ func New( networkType := node.NodeConfig.GetNetworkType() chainConfig := networkType.ChainConfig() node.chainConfig = chainConfig - node.shardChains = collection node.IsSynchronized = abool.NewBool(false) if host != nil { @@ -1081,9 +1077,9 @@ func New( if b2 { shardID := node.NodeConfig.ShardID // HACK get the real error reason - _, err = node.shardChains.ShardChain(shardID) + _, err = node.registry.GetShardChainCollection().ShardChain(shardID) } else { - _, err = node.shardChains.ShardChain(shard.BeaconChainShardID) + _, err = node.registry.GetShardChainCollection().ShardChain(shard.BeaconChainShardID) } fmt.Fprintf(os.Stderr, "Cannot initialize node: %v\n", err) os.Exit(-1) diff --git a/node/node_handler_test.go b/node/node_handler_test.go index 307e21b11..a5085652b 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -45,13 +45,16 @@ func TestAddNewBlock(t *testing.T) { if err != nil { t.Fatal("cannot get blockchain") } - reg := registry.New().SetBlockchain(blockchain) + reg := registry.New(). + SetBlockchain(blockchain). + SetEngine(engine). + SetShardChainCollection(collection) consensus, err := consensus.New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } nodeconfig.SetNetworkType(nodeconfig.Testnet) - node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg) + node := New(host, consensus, nil, nil, nil, nil, reg) txs := make(map[common.Address]types.Transactions) stks := staking.StakingTransactions{} @@ -100,7 +103,11 @@ func TestVerifyNewBlock(t *testing.T) { if err != nil { t.Fatal("cannot get blockchain") } - reg := registry.New().SetBlockchain(blockchain) + reg := registry.New(). + SetBlockchain(blockchain). + SetEngine(engine). + SetShardChainCollection(collection) + consensusObj, err := consensus.New( host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false, ) @@ -110,7 +117,7 @@ func TestVerifyNewBlock(t *testing.T) { archiveMode := make(map[uint32]bool) archiveMode[0] = true archiveMode[1] = false - node := New(host, consensusObj, engine, collection, nil, nil, nil, archiveMode, nil, reg) + node := New(host, consensusObj, nil, nil, nil, nil, reg) txs := make(map[common.Address]types.Transactions) stks := staking.StakingTransactions{} @@ -156,7 +163,10 @@ func TestVerifyVRF(t *testing.T) { decider := quorum.NewDecider( quorum.SuperMajorityVote, shard.BeaconChainShardID, ) - reg := registry.New().SetBlockchain(blockchain) + reg := registry.New(). + SetBlockchain(blockchain). + SetEngine(engine). + SetShardChainCollection(collection) consensus, err := consensus.New( host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false, ) @@ -166,7 +176,7 @@ func TestVerifyVRF(t *testing.T) { archiveMode := make(map[uint32]bool) archiveMode[0] = true archiveMode[1] = false - node := New(host, consensus, engine, collection, nil, nil, nil, archiveMode, nil, reg) + node := New(host, consensus, nil, nil, nil, nil, reg) txs := make(map[common.Address]types.Transactions) stks := staking.StakingTransactions{} diff --git a/node/node_newblock_test.go b/node/node_newblock_test.go index 963af2f55..86dd1e6c7 100644 --- a/node/node_newblock_test.go +++ b/node/node_newblock_test.go @@ -46,7 +46,11 @@ func TestFinalizeNewBlockAsync(t *testing.T) { decider := quorum.NewDecider( quorum.SuperMajorityVote, shard.BeaconChainShardID, ) - reg := registry.New().SetBlockchain(blockchain).SetBeaconchain(blockchain) + reg := registry.New(). + SetBlockchain(blockchain). + SetBeaconchain(blockchain). + SetEngine(engine). + SetShardChainCollection(collection) consensusObj, err := consensus.New( host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false, ) @@ -54,7 +58,7 @@ func TestFinalizeNewBlockAsync(t *testing.T) { t.Fatalf("Cannot craeate consensus: %v", err) } - node := New(host, consensusObj, engine, collection, nil, nil, nil, nil, nil, registry.New().SetBlockchain(blockchain)) + node := New(host, consensusObj, nil, nil, nil, nil, reg) node.Worker.UpdateCurrent() diff --git a/node/node_test.go b/node/node_test.go index 49ba5d164..d96a26624 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -46,7 +46,11 @@ func TestNewNode(t *testing.T) { if err != nil { t.Fatal("cannot get blockchain") } - reg := registry.New().SetBlockchain(blockchain) + reg := registry.New(). + SetBlockchain(blockchain). + SetEngine(engine). + SetShardChainCollection(collection) + consensus, err := consensus.New( host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false, ) @@ -54,7 +58,7 @@ func TestNewNode(t *testing.T) { t.Fatalf("Cannot craeate consensus: %v", err) } - node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg) + node := New(host, consensus, nil, nil, nil, nil, reg) if node.Consensus == nil { t.Error("Consensus is not initialized for the node") }