diff --git a/Makefile b/Makefile index deb990bf5..6fc8f2607 100644 --- a/Makefile +++ b/Makefile @@ -179,4 +179,7 @@ debug_external: clean bash test/debug-external.sh build_localnet_validator: - bash test/build-localnet-validator.sh \ No newline at end of file + bash test/build-localnet-validator.sh + +tt: + go test -v -test.run OnDisconnectCheck ./p2p/security \ No newline at end of file diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index 2b112bbac..371104895 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -6,6 +6,8 @@ import ( "time" "github.com/ethereum/go-ethereum/event" + "github.com/rs/zerolog" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" @@ -14,7 +16,6 @@ import ( "github.com/harmony-one/harmony/p2p/stream/common/streammanager" "github.com/harmony-one/harmony/p2p/stream/protocols/sync" "github.com/harmony-one/harmony/shard" - "github.com/rs/zerolog" ) type ( @@ -37,7 +38,7 @@ type ( ) // NewDownloader creates a new downloader -func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config, c *consensus.Consensus) *Downloader { +func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader { config.fixValues() sp := sync.NewProtocol(sync.Config{ diff --git a/api/service/stagedstreamsync/downloaders.go b/api/service/stagedstreamsync/downloaders.go index 08a8e40de..583f3e152 100644 --- a/api/service/stagedstreamsync/downloaders.go +++ b/api/service/stagedstreamsync/downloaders.go @@ -16,7 +16,7 @@ type Downloaders struct { } // NewDownloaders creates Downloaders for sync of multiple blockchains -func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config, c *consensus.Consensus) *Downloaders { +func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders { ds := make(map[uint32]*Downloader) isBeaconNode := len(bcs) == 1 for _, bc := range bcs { @@ -26,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.C if _, ok := ds[bc.ShardID()]; ok { continue } - ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config, c) + ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config) } return &Downloaders{ ds: ds, diff --git a/api/service/stagedstreamsync/service.go b/api/service/stagedstreamsync/service.go index 90db7eada..f7ffd7f2d 100644 --- a/api/service/stagedstreamsync/service.go +++ b/api/service/stagedstreamsync/service.go @@ -12,9 +12,9 @@ type StagedStreamSyncService struct { } // NewService creates a new downloader service -func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string, c *consensus.Consensus) *StagedStreamSyncService { +func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService { return &StagedStreamSyncService{ - Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config, c), + Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config), } } diff --git a/api/service/stagedstreamsync/stage_heads.go b/api/service/stagedstreamsync/stage_heads.go index d05543c06..c917884a3 100644 --- a/api/service/stagedstreamsync/stage_heads.go +++ b/api/service/stagedstreamsync/stage_heads.go @@ -53,7 +53,7 @@ func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlock maxHeight := s.state.status.targetBN maxBlocksPerSyncCycle := uint64(1024) // TODO: should be in config -> s.state.MaxBlocksPerSyncCycle - currentHeight := heads.configs.bc.CurrentHeader().NumberU64() + currentHeight := heads.configs.bc.CurrentBlock().NumberU64() s.state.currentCycle.TargetHeight = maxHeight targetHeight := uint64(0) if errV := CreateView(ctx, heads.configs.db, tx, func(etx kv.Tx) (err error) { diff --git a/api/service/stagedstreamsync/stage_short_range.go b/api/service/stagedstreamsync/stage_short_range.go index f3037869a..ce6cdf36b 100644 --- a/api/service/stagedstreamsync/stage_short_range.go +++ b/api/service/stagedstreamsync/stage_short_range.go @@ -3,9 +3,7 @@ package stagedstreamsync import ( "context" - "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/harmony-one/harmony/shard" @@ -20,7 +18,6 @@ type StageShortRange struct { type StageShortRangeCfg struct { bc core.BlockChain db kv.RwDB - c *consensus.Consensus } func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange { @@ -29,11 +26,10 @@ func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange { } } -func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB, c *consensus.Consensus) StageShortRangeCfg { +func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB) StageShortRangeCfg { return StageShortRangeCfg{ bc: bc, db: db, - c: c, } } @@ -108,12 +104,9 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) return 0, errors.Wrap(err, "prerequisite") } } - var ( - bc = sr.configs.bc - curBN = bc.CurrentHeader().NumberU64() - blkNums = sh.prepareBlockHashNumbers(curBN) - hashChain, whitelist, err = sh.getHashChain(ctx, blkNums) - ) + curBN := sr.configs.bc.CurrentBlock().NumberU64() + blkNums := sh.prepareBlockHashNumbers(curBN) + hashChain, whitelist, err := sh.getHashChain(ctx, blkNums) if err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { return 0, nil @@ -163,25 +156,6 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) return 0, err } - numInserted := 0 - err = sr.configs.c.GetLastMileBlockIter(sr.configs.bc.CurrentHeader().NumberU64()+1, func(blockIter *consensus.LastMileBlockIter) error { - for { - block := blockIter.Next() - if block == nil { - break - } - if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil { - return errors.Wrap(err, "failed to InsertChain") - } - numInserted++ - } - return nil - }) - if err != nil { - return 0, errors.WithMessage(err, "failed to InsertChain for last mile blocks") - } - utils.Logger().Info().Int("last mile blocks inserted", numInserted).Msg("Insert last mile blocks success") - return n, nil } diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index 0be19902a..738f2f920 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -47,6 +47,7 @@ func CreateStagedSync(ctx context.Context, config Config, logger zerolog.Logger, ) (*StagedStreamSync, error) { + logger.Info(). Uint32("shard", bc.ShardID()). Bool("beaconNode", isBeaconNode). @@ -55,6 +56,7 @@ func CreateStagedSync(ctx context.Context, Bool("serverOnly", config.ServerOnly). Int("minStreams", config.MinStreams). Msg(WrapStagedSyncMsg("creating staged sync")) + var mainDB kv.RwDB dbs := make([]kv.RwDB, config.Concurrency) if config.UseMemDB { @@ -80,7 +82,7 @@ func CreateStagedSync(ctx context.Context, } stageHeadsCfg := NewStageHeadersCfg(bc, mainDB) - stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB, consensus) + stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB) stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB) stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress) @@ -225,9 +227,6 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo return 0, 0, err } - s.startSyncing() - defer s.finishSyncing() - var estimatedHeight uint64 if initSync { if h, err := s.estimateCurrentNumber(downloaderContext); err != nil { @@ -244,20 +243,13 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo } } - i := 0 + s.startSyncing() + defer s.finishSyncing() + for { - i++ ctx, cancel := context.WithCancel(downloaderContext) - started := s.bc.CurrentHeader().NumberU64() + n, err := s.doSyncCycle(ctx, initSync) - finished := s.bc.CurrentHeader().NumberU64() - utils.Logger().Info(). - Uint64("from", started). - Int("returned", n). - Uint64("to", finished). - Bool("initSync", initSync). - Int("cycle", i). - Msg(WrapStagedSyncMsg("synced blocks")) if err != nil { utils.Logger().Error(). Err(err). @@ -377,9 +369,6 @@ func (s *StagedStreamSync) finishSyncing() { if s.evtDownloadFinishedSubscribed { s.evtDownloadFinished.Send(struct{}{}) } - utils.Logger().Info(). - Bool("evtDownloadFinishedSubscribed", s.evtDownloadFinishedSubscribed). - Msg(WrapStagedSyncMsg("finished syncing")) } func (s *StagedStreamSync) checkPrerequisites() error { diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index b1afbe6bf..9fc89d45d 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -1,7 +1,6 @@ package main import ( - "context" "fmt" "math/big" "math/rand" @@ -522,7 +521,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { Msg("Start p2p host failed") } - if err := node.BootstrapConsensus(context.TODO(), currentNode.Consensus, currentNode.Host()); err != nil { + if err := currentNode.BootstrapConsensus(); err != nil { fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error()) if !currentNode.NodeConfig.IsOffline { os.Exit(-1) @@ -1033,7 +1032,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har } } //Setup stream sync service - s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir, node.Consensus) + s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir) node.RegisterService(service.StagedStreamSync, s) diff --git a/consensus/consensus.go b/consensus/consensus.go index 1e35773ef..b396f6ead 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -94,6 +94,8 @@ type Consensus struct { // The post-consensus job func passed from Node object // Called when consensus on a new block is done PostConsensusJob func(*types.Block) error + // The verifier func passed from Node object + BlockVerifier VerifyBlockFunc // verified block to state sync broadcast VerifiedNewBlock chan *types.Block // will trigger state syncing when blockNum is low @@ -169,12 +171,12 @@ func (consensus *Consensus) Beaconchain() core.BlockChain { } // VerifyBlock is a function used to verify the block and keep trace of verified blocks. -func (FBFTLog *FBFTLog) verifyBlock(block *types.Block) error { - if !FBFTLog.IsBlockVerified(block.Hash()) { - if err := FBFTLog.BlockVerify(block); err != nil { +func (consensus *Consensus) verifyBlock(block *types.Block) error { + if !consensus.fBFTLog.IsBlockVerified(block.Hash()) { + if err := consensus.BlockVerifier(block); err != nil { return errors.Errorf("Block verification failed: %s", err) } - FBFTLog.MarkBlockVerified(block) + consensus.fBFTLog.MarkBlockVerified(block) } return nil } @@ -268,7 +270,7 @@ func New( consensus := Consensus{ mutex: &sync.RWMutex{}, ShardID: shard, - fBFTLog: NewFBFTLog(VerifyNewBlock(registry.GetWebHooks(), registry.GetBlockchain(), registry.GetBeaconchain())), + fBFTLog: NewFBFTLog(), phase: FBFTAnnounce, current: State{mode: Normal}, Decider: Decider, @@ -302,7 +304,12 @@ func New( consensus.RndChannel = make(chan [vdfAndSeedSize]byte) consensus.IgnoreViewIDCheck = abool.NewBool(false) // Make Sure Verifier is not null - consensus.vc = newViewChange(consensus.fBFTLog.BlockVerify) + consensus.vc = newViewChange() + // TODO: reference to blockchain/beaconchain should be removed. + verifier := VerifyNewBlock(registry.GetWebHooks(), consensus.Blockchain(), consensus.Beaconchain()) + consensus.BlockVerifier = verifier + consensus.vc.verifyBlock = consensus.verifyBlock + // init prometheus metrics initMetrics() consensus.AddPubkeyMetrics() diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 725e70c3f..697ba4952 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -22,7 +22,6 @@ func TestConsensusInitialization(t *testing.T) { assert.NoError(t, err) messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec} - fbtLog := NewFBFTLog(nil) state := State{mode: Normal} timeouts := createTimeout() @@ -37,10 +36,6 @@ func TestConsensusInitialization(t *testing.T) { assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan) // FBFTLog - assert.Equal(t, fbtLog.blocks, consensus.fBFTLog.blocks) - assert.Equal(t, fbtLog.messages, consensus.fBFTLog.messages) - assert.Equal(t, len(fbtLog.verifiedBlocks), 0) - assert.Equal(t, fbtLog.verifiedBlocks, consensus.fBFTLog.verifiedBlocks) assert.NotNil(t, consensus.FBFTLog()) assert.Equal(t, FBFTAnnounce, consensus.phase) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 258d82a8f..6d3ef5b47 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -8,14 +8,13 @@ import ( "sync/atomic" "time" - libp2p_peer "github.com/libp2p/go-libp2p/core/peer" - "github.com/ethereum/go-ethereum/common" bls2 "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/consensus/signature" "github.com/harmony-one/harmony/core" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" + libp2p_peer "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog" msg_pb "github.com/harmony-one/harmony/api/proto/message" @@ -395,12 +394,11 @@ func (consensus *Consensus) tick() { // the bootstrap timer will be stopped once consensus is reached or view change // is succeeded if k != timeoutBootstrap { - if v.Stop() { // prevent useless logs - consensus.getLogger().Debug(). - Str("k", k.String()). - Str("Mode", consensus.current.Mode().String()). - Msg("[ConsensusMainLoop] consensusTimeout stopped!!!") - } + consensus.getLogger().Debug(). + Str("k", k.String()). + Str("Mode", consensus.current.Mode().String()). + Msg("[ConsensusMainLoop] consensusTimeout stopped!!!") + v.Stop() continue } } @@ -456,6 +454,7 @@ func (consensus *Consensus) BlockChannel(newBlock *types.Block) { type LastMileBlockIter struct { blockCandidates []*types.Block fbftLog *FBFTLog + verify func(*types.Block) error curIndex int logger *zerolog.Logger } @@ -470,6 +469,9 @@ func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64, cb func(iter *L // GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *LastMileBlockIter) error) error { + if consensus.BlockVerifier == nil { + return errors.New("consensus haven't initialized yet") + } blocks, _, err := consensus.getLastMileBlocksAndMsg(bnStart) if err != nil { return err @@ -477,6 +479,7 @@ func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *L return cb(&LastMileBlockIter{ blockCandidates: blocks, fbftLog: consensus.fBFTLog, + verify: consensus.BlockVerifier, curIndex: 0, logger: consensus.getLogger(), }) @@ -491,7 +494,7 @@ func (iter *LastMileBlockIter) Next() *types.Block { iter.curIndex++ if !iter.fbftLog.IsBlockVerified(block.Hash()) { - if err := iter.fbftLog.BlockVerify(block); err != nil { + if err := iter.verify(block); err != nil { iter.logger.Debug().Err(err).Msg("block verification failed in consensus last mile block") return nil } @@ -618,6 +621,9 @@ func (consensus *Consensus) verifyLastCommitSig(lastCommitSig []byte, blk *types // tryCatchup add the last mile block in PBFT log memory cache to blockchain. func (consensus *Consensus) tryCatchup() error { // TODO: change this to a more systematic symbol + if consensus.BlockVerifier == nil { + return errors.New("consensus haven't finished initialization") + } initBN := consensus.getBlockNum() defer consensus.postCatchup(initBN) @@ -632,7 +638,7 @@ func (consensus *Consensus) tryCatchup() error { } blk.SetCurrentCommitSig(msg.Payload) - if err := consensus.fBFTLog.verifyBlock(blk); err != nil { + if err := consensus.verifyBlock(blk); err != nil { consensus.getLogger().Err(err).Msg("[TryCatchup] failed block verifier") return err } diff --git a/consensus/fbft_log.go b/consensus/fbft_log.go index 7ffa2ff9e..982aecab7 100644 --- a/consensus/fbft_log.go +++ b/consensus/fbft_log.go @@ -113,16 +113,14 @@ type FBFTLog struct { blocks map[common.Hash]*types.Block // store blocks received in FBFT verifiedBlocks map[common.Hash]struct{} // store block hashes for blocks that has already been verified messages map[fbftMsgID]*FBFTMessage // store messages received in FBFT - verifyNewBlock func(*types.Block) error // block verification function } // NewFBFTLog returns new instance of FBFTLog -func NewFBFTLog(verifyNewBlock func(*types.Block) error) *FBFTLog { +func NewFBFTLog() *FBFTLog { pbftLog := FBFTLog{ blocks: make(map[common.Hash]*types.Block), messages: make(map[fbftMsgID]*FBFTMessage), verifiedBlocks: make(map[common.Hash]struct{}), - verifyNewBlock: verifyNewBlock, } return &pbftLog } @@ -132,10 +130,6 @@ func (log *FBFTLog) AddBlock(block *types.Block) { log.blocks[block.Hash()] = block } -func (log *FBFTLog) BlockVerify(block *types.Block) error { - return log.verifyNewBlock(block) -} - // MarkBlockVerified marks the block as verified func (log *FBFTLog) MarkBlockVerified(block *types.Block) { log.verifiedBlocks[block.Hash()] = struct{}{} diff --git a/consensus/fbft_log_test.go b/consensus/fbft_log_test.go index c22c70b3e..420effff4 100644 --- a/consensus/fbft_log_test.go +++ b/consensus/fbft_log_test.go @@ -65,7 +65,7 @@ func TestGetMessagesByTypeSeqViewHash(t *testing.T) { ViewID: 3, BlockHash: [32]byte{01, 02}, } - log := NewFBFTLog(nil) + log := NewFBFTLog() log.AddVerifiedMessage(&pbftMsg) found := log.GetMessagesByTypeSeqViewHash( @@ -90,7 +90,7 @@ func TestHasMatchingAnnounce(t *testing.T) { ViewID: 3, BlockHash: [32]byte{01, 02}, } - log := NewFBFTLog(nil) + log := NewFBFTLog() log.AddVerifiedMessage(&pbftMsg) found := log.HasMatchingViewAnnounce(2, 3, [32]byte{01, 02}) if !found { diff --git a/consensus/validator.go b/consensus/validator.go index 6a1f21e1a..0506f4359 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -63,11 +63,6 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { go func() { // Best effort check, no need to error out. _, err := consensus.ValidateNewBlock(recvMsg) - if err != nil { - // maybe ban sender - consensus.getLogger().Error(). - Err(err).Msgf("[Announce] Failed to validate block") - } if err == nil { consensus.GetLogger().Info(). Msg("[Announce] Block verified") @@ -81,7 +76,6 @@ func (consensus *Consensus) ValidateNewBlock(recvMsg *FBFTMessage) (*types.Block defer consensus.mutex.Unlock() return consensus.validateNewBlock(recvMsg) } - func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block, error) { if consensus.fBFTLog.IsBlockVerified(recvMsg.BlockHash) { var blockObj *types.Block @@ -131,7 +125,12 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block Hex("blockHash", recvMsg.BlockHash[:]). Msg("[validateNewBlock] Prepared message and block added") - if err := consensus.fBFTLog.verifyBlock(&blockObj); err != nil { + if consensus.BlockVerifier == nil { + consensus.getLogger().Debug().Msg("[validateNewBlock] consensus received message before init. Ignoring") + return nil, errors.New("nil block verifier") + } + + if err := consensus.verifyBlock(&blockObj); err != nil { consensus.getLogger().Error().Err(err).Msg("[validateNewBlock] Block verification failed") return nil, errors.Errorf("Block verification failed: %s", err.Error()) } diff --git a/consensus/view_change_construct.go b/consensus/view_change_construct.go index 0c3aa1e60..061d2a795 100644 --- a/consensus/view_change_construct.go +++ b/consensus/view_change_construct.go @@ -51,11 +51,9 @@ type viewChange struct { } // newViewChange returns a new viewChange object -func newViewChange(verifyBlock VerifyBlockFunc) *viewChange { +func newViewChange() *viewChange { vc := viewChange{} vc.Reset() - vc.verifyBlock = verifyBlock - return &vc } diff --git a/consensus/view_change_msg.go b/consensus/view_change_msg.go index c0a9863dd..6c4b08005 100644 --- a/consensus/view_change_msg.go +++ b/consensus/view_change_msg.go @@ -45,7 +45,7 @@ func (consensus *Consensus) constructViewChangeMessage(priKey *bls.PrivateKeyWra Interface("preparedMsg", preparedMsg). Msg("[constructViewChangeMessage] found prepared msg") if block != nil { - if err := consensus.fBFTLog.verifyBlock(block); err == nil { + if err := consensus.verifyBlock(block); err == nil { tmpEncoded, err := rlp.EncodeToBytes(block) if err != nil { consensus.getLogger().Err(err).Msg("[constructViewChangeMessage] Failed encoding block") diff --git a/core/blockchain.go b/core/blockchain.go index 40d33100a..0adc96925 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -109,6 +109,14 @@ type BlockChain interface { // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) + // WriteBlockWithState writes the block and all associated state to the database. + WriteBlockWithState( + block *types.Block, receipts []*types.Receipt, + cxReceipts []*types.CXReceipt, + stakeMsgs []types2.StakeMsg, + paid reward.Reader, + state *state.DB, + ) (status WriteStatus, err error) // GetMaxGarbageCollectedBlockNumber .. GetMaxGarbageCollectedBlockNumber() int64 // InsertChain attempts to insert the given batch of blocks in to the canonical diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index 9e7f1134b..b12de5637 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -1473,8 +1473,7 @@ func (bc *BlockChainImpl) WriteBlockWithoutState(block *types.Block, td *big.Int return nil } -// writeBlockWithState writes the block and all associated state to the database. -func (bc *BlockChainImpl) writeBlockWithState( +func (bc *BlockChainImpl) WriteBlockWithState( block *types.Block, receipts []*types.Receipt, cxReceipts []*types.CXReceipt, stakeMsgs []staking.StakeMsg, @@ -1683,6 +1682,8 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i if len(chain) == 0 { return 0, nil, nil, ErrEmptyChain } + first := chain[0] + fmt.Println("insertChain", utils.GetPort(), first.ShardID(), first.Epoch().Uint64(), first.NumberU64()) // Do a sanity check that the provided chain is actually ordered and linked for i := 1; i < len(chain); i++ { if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { @@ -1881,7 +1882,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i // Write the block to the chain and get the status. substart = time.Now() - status, err := bc.writeBlockWithState( + status, err := bc.WriteBlockWithState( block, receipts, cxReceipts, stakeMsgs, payout, state, ) if err != nil { diff --git a/core/blockchain_stub.go b/core/blockchain_stub.go index e42a12b10..e9ef10ce9 100644 --- a/core/blockchain_stub.go +++ b/core/blockchain_stub.go @@ -124,6 +124,10 @@ func (a Stub) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error return errors.Errorf("method WriteBlockWithoutState not implemented for %s", a.Name) } +func (a Stub) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, cxReceipts []*types.CXReceipt, stakeMsgs []staking.StakeMsg, paid reward.Reader, state *state.DB) (status WriteStatus, err error) { + return 0, errors.Errorf("method WriteBlockWithState not implemented for %s", a.Name) +} + func (a Stub) GetMaxGarbageCollectedBlockNumber() int64 { return 0 } diff --git a/internal/utils/timer.go b/internal/utils/timer.go index 176732fca..d355d5c71 100644 --- a/internal/utils/timer.go +++ b/internal/utils/timer.go @@ -34,11 +34,9 @@ func (timeout *Timeout) Start() { } // Stop stops the timeout clock -func (timeout *Timeout) Stop() (stopped bool) { - stopped = timeout.state != Inactive +func (timeout *Timeout) Stop() { timeout.state = Inactive timeout.start = time.Now() - return stopped } // Expired checks whether the timeout is reached/expired diff --git a/node/node.go b/node/node.go index 8d9665854..f035bf491 100644 --- a/node/node.go +++ b/node/node.go @@ -149,10 +149,6 @@ type Node struct { registry *registry.Registry } -func (node *Node) Host() p2p.Host { - return node.host -} - // Blockchain returns the blockchain for the node's current shard. func (node *Node) Blockchain() core.BlockChain { return node.registry.GetBlockchain() diff --git a/node/node_handler.go b/node/node_handler.go index 89464d3c0..eeaf90f2d 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -404,21 +404,16 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { } // BootstrapConsensus is a goroutine to check number of peers and start the consensus -func BootstrapConsensus(ctx context.Context, consensus *consensus.Consensus, host p2p.Host) error { - ctx, cancel := context.WithTimeout(ctx, time.Minute) +func (node *Node) BootstrapConsensus() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - min := consensus.MinPeers + min := node.Consensus.MinPeers enoughMinPeers := make(chan struct{}) const checkEvery = 3 * time.Second go func() { for { - select { - case <-ctx.Done(): - return - case <-time.After(checkEvery): - } - - numPeersNow := host.GetPeerCount() + <-time.After(checkEvery) + numPeersNow := node.host.GetPeerCount() if numPeersNow >= min { utils.Logger().Info().Msg("[bootstrap] StartConsensus") enoughMinPeers <- struct{}{} @@ -437,7 +432,9 @@ func BootstrapConsensus(ctx context.Context, consensus *consensus.Consensus, hos case <-ctx.Done(): return ctx.Err() case <-enoughMinPeers: - go consensus.StartChannel() + go func() { + node.Consensus.StartChannel() + }() return nil } } diff --git a/p2p/security/security.go b/p2p/security/security.go index f8d032253..db70c76d4 100644 --- a/p2p/security/security.go +++ b/p2p/security/security.go @@ -7,14 +7,14 @@ import ( "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils/blockedpeers" - libp2pnetwork "github.com/libp2p/go-libp2p/core/network" + libp2p_network "github.com/libp2p/go-libp2p/core/network" ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" ) type Security interface { - OnConnectCheck(net libp2pnetwork.Network, conn libp2pnetwork.Conn) error - OnDisconnectCheck(conn libp2pnetwork.Conn) error + OnConnectCheck(net libp2p_network.Network, conn libp2p_network.Conn) error + OnDisconnectCheck(conn libp2p_network.Conn) error } type peerMap struct { @@ -87,7 +87,7 @@ func (m *Manager) RangePeers(f func(key string, value []string) bool) { m.peers.Range(f) } -func (m *Manager) OnConnectCheck(net libp2pnetwork.Network, conn libp2pnetwork.Conn) error { +func (m *Manager) OnConnectCheck(net libp2p_network.Network, conn libp2p_network.Conn) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -133,7 +133,7 @@ func (m *Manager) OnConnectCheck(net libp2pnetwork.Network, conn libp2pnetwork.C return nil } -func (m *Manager) OnDisconnectCheck(conn libp2pnetwork.Conn) error { +func (m *Manager) OnDisconnectCheck(conn libp2p_network.Conn) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -171,7 +171,7 @@ func find(slice []string, val string) (int, bool) { return -1, false } -func getRemoteIP(conn libp2pnetwork.Conn) (string, error) { +func getRemoteIP(conn libp2p_network.Conn) (string, error) { for _, protocol := range conn.RemoteMultiaddr().Protocols() { switch protocol.Code { case ma.P_IP4: