pull/4455/head
frozen 1 year ago
parent 96e03868ae
commit 64d7392c3b
No known key found for this signature in database
GPG Key ID: 5391C63E79B03EDE
  1. 5
      Makefile
  2. 5
      api/service/stagedstreamsync/downloader.go
  3. 4
      api/service/stagedstreamsync/downloaders.go
  4. 4
      api/service/stagedstreamsync/service.go
  5. 2
      api/service/stagedstreamsync/stage_heads.go
  6. 34
      api/service/stagedstreamsync/stage_short_range.go
  7. 25
      api/service/stagedstreamsync/syncing.go
  8. 5
      cmd/harmony/main.go
  9. 19
      consensus/consensus.go
  10. 5
      consensus/consensus_test.go
  11. 26
      consensus/consensus_v2.go
  12. 8
      consensus/fbft_log.go
  13. 4
      consensus/fbft_log_test.go
  14. 13
      consensus/validator.go
  15. 4
      consensus/view_change_construct.go
  16. 2
      consensus/view_change_msg.go
  17. 8
      core/blockchain.go
  18. 7
      core/blockchain_impl.go
  19. 4
      core/blockchain_stub.go
  20. 4
      internal/utils/timer.go
  21. 4
      node/node.go
  22. 19
      node/node_handler.go
  23. 12
      p2p/security/security.go

@ -179,4 +179,7 @@ debug_external: clean
bash test/debug-external.sh bash test/debug-external.sh
build_localnet_validator: build_localnet_validator:
bash test/build-localnet-validator.sh bash test/build-localnet-validator.sh
tt:
go test -v -test.run OnDisconnectCheck ./p2p/security

@ -6,6 +6,8 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/rs/zerolog"
"github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -14,7 +16,6 @@ import (
"github.com/harmony-one/harmony/p2p/stream/common/streammanager" "github.com/harmony-one/harmony/p2p/stream/common/streammanager"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync" "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
"github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/shard"
"github.com/rs/zerolog"
) )
type ( type (
@ -37,7 +38,7 @@ type (
) )
// NewDownloader creates a new downloader // NewDownloader creates a new downloader
func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config, c *consensus.Consensus) *Downloader { func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
config.fixValues() config.fixValues()
sp := sync.NewProtocol(sync.Config{ sp := sync.NewProtocol(sync.Config{

@ -16,7 +16,7 @@ type Downloaders struct {
} }
// NewDownloaders creates Downloaders for sync of multiple blockchains // NewDownloaders creates Downloaders for sync of multiple blockchains
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config, c *consensus.Consensus) *Downloaders { func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
ds := make(map[uint32]*Downloader) ds := make(map[uint32]*Downloader)
isBeaconNode := len(bcs) == 1 isBeaconNode := len(bcs) == 1
for _, bc := range bcs { for _, bc := range bcs {
@ -26,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.C
if _, ok := ds[bc.ShardID()]; ok { if _, ok := ds[bc.ShardID()]; ok {
continue continue
} }
ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config, c) ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config)
} }
return &Downloaders{ return &Downloaders{
ds: ds, ds: ds,

@ -12,9 +12,9 @@ type StagedStreamSyncService struct {
} }
// NewService creates a new downloader service // NewService creates a new downloader service
func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string, c *consensus.Consensus) *StagedStreamSyncService { func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
return &StagedStreamSyncService{ return &StagedStreamSyncService{
Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config, c), Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config),
} }
} }

@ -53,7 +53,7 @@ func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlock
maxHeight := s.state.status.targetBN maxHeight := s.state.status.targetBN
maxBlocksPerSyncCycle := uint64(1024) // TODO: should be in config -> s.state.MaxBlocksPerSyncCycle maxBlocksPerSyncCycle := uint64(1024) // TODO: should be in config -> s.state.MaxBlocksPerSyncCycle
currentHeight := heads.configs.bc.CurrentHeader().NumberU64() currentHeight := heads.configs.bc.CurrentBlock().NumberU64()
s.state.currentCycle.TargetHeight = maxHeight s.state.currentCycle.TargetHeight = maxHeight
targetHeight := uint64(0) targetHeight := uint64(0)
if errV := CreateView(ctx, heads.configs.db, tx, func(etx kv.Tx) (err error) { if errV := CreateView(ctx, heads.configs.db, tx, func(etx kv.Tx) (err error) {

@ -3,9 +3,7 @@ package stagedstreamsync
import ( import (
"context" "context"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/shard"
@ -20,7 +18,6 @@ type StageShortRange struct {
type StageShortRangeCfg struct { type StageShortRangeCfg struct {
bc core.BlockChain bc core.BlockChain
db kv.RwDB db kv.RwDB
c *consensus.Consensus
} }
func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange { func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange {
@ -29,11 +26,10 @@ func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange {
} }
} }
func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB, c *consensus.Consensus) StageShortRangeCfg { func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB) StageShortRangeCfg {
return StageShortRangeCfg{ return StageShortRangeCfg{
bc: bc, bc: bc,
db: db, db: db,
c: c,
} }
} }
@ -108,12 +104,9 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
return 0, errors.Wrap(err, "prerequisite") return 0, errors.Wrap(err, "prerequisite")
} }
} }
var ( curBN := sr.configs.bc.CurrentBlock().NumberU64()
bc = sr.configs.bc blkNums := sh.prepareBlockHashNumbers(curBN)
curBN = bc.CurrentHeader().NumberU64() hashChain, whitelist, err := sh.getHashChain(ctx, blkNums)
blkNums = sh.prepareBlockHashNumbers(curBN)
hashChain, whitelist, err = sh.getHashChain(ctx, blkNums)
)
if err != nil { if err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return 0, nil return 0, nil
@ -163,25 +156,6 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
return 0, err return 0, err
} }
numInserted := 0
err = sr.configs.c.GetLastMileBlockIter(sr.configs.bc.CurrentHeader().NumberU64()+1, func(blockIter *consensus.LastMileBlockIter) error {
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
numInserted++
}
return nil
})
if err != nil {
return 0, errors.WithMessage(err, "failed to InsertChain for last mile blocks")
}
utils.Logger().Info().Int("last mile blocks inserted", numInserted).Msg("Insert last mile blocks success")
return n, nil return n, nil
} }

@ -47,6 +47,7 @@ func CreateStagedSync(ctx context.Context,
config Config, config Config,
logger zerolog.Logger, logger zerolog.Logger,
) (*StagedStreamSync, error) { ) (*StagedStreamSync, error) {
logger.Info(). logger.Info().
Uint32("shard", bc.ShardID()). Uint32("shard", bc.ShardID()).
Bool("beaconNode", isBeaconNode). Bool("beaconNode", isBeaconNode).
@ -55,6 +56,7 @@ func CreateStagedSync(ctx context.Context,
Bool("serverOnly", config.ServerOnly). Bool("serverOnly", config.ServerOnly).
Int("minStreams", config.MinStreams). Int("minStreams", config.MinStreams).
Msg(WrapStagedSyncMsg("creating staged sync")) Msg(WrapStagedSyncMsg("creating staged sync"))
var mainDB kv.RwDB var mainDB kv.RwDB
dbs := make([]kv.RwDB, config.Concurrency) dbs := make([]kv.RwDB, config.Concurrency)
if config.UseMemDB { if config.UseMemDB {
@ -80,7 +82,7 @@ func CreateStagedSync(ctx context.Context,
} }
stageHeadsCfg := NewStageHeadersCfg(bc, mainDB) stageHeadsCfg := NewStageHeadersCfg(bc, mainDB)
stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB, consensus) stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB)
stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB) stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress) stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress)
@ -225,9 +227,6 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
return 0, 0, err return 0, 0, err
} }
s.startSyncing()
defer s.finishSyncing()
var estimatedHeight uint64 var estimatedHeight uint64
if initSync { if initSync {
if h, err := s.estimateCurrentNumber(downloaderContext); err != nil { if h, err := s.estimateCurrentNumber(downloaderContext); err != nil {
@ -244,20 +243,13 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
} }
} }
i := 0 s.startSyncing()
defer s.finishSyncing()
for { for {
i++
ctx, cancel := context.WithCancel(downloaderContext) ctx, cancel := context.WithCancel(downloaderContext)
started := s.bc.CurrentHeader().NumberU64()
n, err := s.doSyncCycle(ctx, initSync) n, err := s.doSyncCycle(ctx, initSync)
finished := s.bc.CurrentHeader().NumberU64()
utils.Logger().Info().
Uint64("from", started).
Int("returned", n).
Uint64("to", finished).
Bool("initSync", initSync).
Int("cycle", i).
Msg(WrapStagedSyncMsg("synced blocks"))
if err != nil { if err != nil {
utils.Logger().Error(). utils.Logger().Error().
Err(err). Err(err).
@ -377,9 +369,6 @@ func (s *StagedStreamSync) finishSyncing() {
if s.evtDownloadFinishedSubscribed { if s.evtDownloadFinishedSubscribed {
s.evtDownloadFinished.Send(struct{}{}) s.evtDownloadFinished.Send(struct{}{})
} }
utils.Logger().Info().
Bool("evtDownloadFinishedSubscribed", s.evtDownloadFinishedSubscribed).
Msg(WrapStagedSyncMsg("finished syncing"))
} }
func (s *StagedStreamSync) checkPrerequisites() error { func (s *StagedStreamSync) checkPrerequisites() error {

@ -1,7 +1,6 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"math/big" "math/big"
"math/rand" "math/rand"
@ -522,7 +521,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
Msg("Start p2p host failed") Msg("Start p2p host failed")
} }
if err := node.BootstrapConsensus(context.TODO(), currentNode.Consensus, currentNode.Host()); err != nil { if err := currentNode.BootstrapConsensus(); err != nil {
fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error()) fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error())
if !currentNode.NodeConfig.IsOffline { if !currentNode.NodeConfig.IsOffline {
os.Exit(-1) os.Exit(-1)
@ -1033,7 +1032,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
} }
} }
//Setup stream sync service //Setup stream sync service
s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir, node.Consensus) s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir)
node.RegisterService(service.StagedStreamSync, s) node.RegisterService(service.StagedStreamSync, s)

@ -94,6 +94,8 @@ type Consensus struct {
// The post-consensus job func passed from Node object // The post-consensus job func passed from Node object
// Called when consensus on a new block is done // Called when consensus on a new block is done
PostConsensusJob func(*types.Block) error PostConsensusJob func(*types.Block) error
// The verifier func passed from Node object
BlockVerifier VerifyBlockFunc
// verified block to state sync broadcast // verified block to state sync broadcast
VerifiedNewBlock chan *types.Block VerifiedNewBlock chan *types.Block
// will trigger state syncing when blockNum is low // will trigger state syncing when blockNum is low
@ -169,12 +171,12 @@ func (consensus *Consensus) Beaconchain() core.BlockChain {
} }
// VerifyBlock is a function used to verify the block and keep trace of verified blocks. // VerifyBlock is a function used to verify the block and keep trace of verified blocks.
func (FBFTLog *FBFTLog) verifyBlock(block *types.Block) error { func (consensus *Consensus) verifyBlock(block *types.Block) error {
if !FBFTLog.IsBlockVerified(block.Hash()) { if !consensus.fBFTLog.IsBlockVerified(block.Hash()) {
if err := FBFTLog.BlockVerify(block); err != nil { if err := consensus.BlockVerifier(block); err != nil {
return errors.Errorf("Block verification failed: %s", err) return errors.Errorf("Block verification failed: %s", err)
} }
FBFTLog.MarkBlockVerified(block) consensus.fBFTLog.MarkBlockVerified(block)
} }
return nil return nil
} }
@ -268,7 +270,7 @@ func New(
consensus := Consensus{ consensus := Consensus{
mutex: &sync.RWMutex{}, mutex: &sync.RWMutex{},
ShardID: shard, ShardID: shard,
fBFTLog: NewFBFTLog(VerifyNewBlock(registry.GetWebHooks(), registry.GetBlockchain(), registry.GetBeaconchain())), fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce, phase: FBFTAnnounce,
current: State{mode: Normal}, current: State{mode: Normal},
Decider: Decider, Decider: Decider,
@ -302,7 +304,12 @@ func New(
consensus.RndChannel = make(chan [vdfAndSeedSize]byte) consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
consensus.IgnoreViewIDCheck = abool.NewBool(false) consensus.IgnoreViewIDCheck = abool.NewBool(false)
// Make Sure Verifier is not null // Make Sure Verifier is not null
consensus.vc = newViewChange(consensus.fBFTLog.BlockVerify) consensus.vc = newViewChange()
// TODO: reference to blockchain/beaconchain should be removed.
verifier := VerifyNewBlock(registry.GetWebHooks(), consensus.Blockchain(), consensus.Beaconchain())
consensus.BlockVerifier = verifier
consensus.vc.verifyBlock = consensus.verifyBlock
// init prometheus metrics // init prometheus metrics
initMetrics() initMetrics()
consensus.AddPubkeyMetrics() consensus.AddPubkeyMetrics()

@ -22,7 +22,6 @@ func TestConsensusInitialization(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec} messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
fbtLog := NewFBFTLog(nil)
state := State{mode: Normal} state := State{mode: Normal}
timeouts := createTimeout() timeouts := createTimeout()
@ -37,10 +36,6 @@ func TestConsensusInitialization(t *testing.T) {
assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan) assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan)
// FBFTLog // FBFTLog
assert.Equal(t, fbtLog.blocks, consensus.fBFTLog.blocks)
assert.Equal(t, fbtLog.messages, consensus.fBFTLog.messages)
assert.Equal(t, len(fbtLog.verifiedBlocks), 0)
assert.Equal(t, fbtLog.verifiedBlocks, consensus.fBFTLog.verifiedBlocks)
assert.NotNil(t, consensus.FBFTLog()) assert.NotNil(t, consensus.FBFTLog())
assert.Equal(t, FBFTAnnounce, consensus.phase) assert.Equal(t, FBFTAnnounce, consensus.phase)

@ -8,14 +8,13 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
bls2 "github.com/harmony-one/bls/ffi/go/bls" bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/signature" "github.com/harmony-one/harmony/consensus/signature"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog" "github.com/rs/zerolog"
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
@ -395,12 +394,11 @@ func (consensus *Consensus) tick() {
// the bootstrap timer will be stopped once consensus is reached or view change // the bootstrap timer will be stopped once consensus is reached or view change
// is succeeded // is succeeded
if k != timeoutBootstrap { if k != timeoutBootstrap {
if v.Stop() { // prevent useless logs consensus.getLogger().Debug().
consensus.getLogger().Debug(). Str("k", k.String()).
Str("k", k.String()). Str("Mode", consensus.current.Mode().String()).
Str("Mode", consensus.current.Mode().String()). Msg("[ConsensusMainLoop] consensusTimeout stopped!!!")
Msg("[ConsensusMainLoop] consensusTimeout stopped!!!") v.Stop()
}
continue continue
} }
} }
@ -456,6 +454,7 @@ func (consensus *Consensus) BlockChannel(newBlock *types.Block) {
type LastMileBlockIter struct { type LastMileBlockIter struct {
blockCandidates []*types.Block blockCandidates []*types.Block
fbftLog *FBFTLog fbftLog *FBFTLog
verify func(*types.Block) error
curIndex int curIndex int
logger *zerolog.Logger logger *zerolog.Logger
} }
@ -470,6 +469,9 @@ func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64, cb func(iter *L
// GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart // GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart
func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *LastMileBlockIter) error) error { func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *LastMileBlockIter) error) error {
if consensus.BlockVerifier == nil {
return errors.New("consensus haven't initialized yet")
}
blocks, _, err := consensus.getLastMileBlocksAndMsg(bnStart) blocks, _, err := consensus.getLastMileBlocksAndMsg(bnStart)
if err != nil { if err != nil {
return err return err
@ -477,6 +479,7 @@ func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *L
return cb(&LastMileBlockIter{ return cb(&LastMileBlockIter{
blockCandidates: blocks, blockCandidates: blocks,
fbftLog: consensus.fBFTLog, fbftLog: consensus.fBFTLog,
verify: consensus.BlockVerifier,
curIndex: 0, curIndex: 0,
logger: consensus.getLogger(), logger: consensus.getLogger(),
}) })
@ -491,7 +494,7 @@ func (iter *LastMileBlockIter) Next() *types.Block {
iter.curIndex++ iter.curIndex++
if !iter.fbftLog.IsBlockVerified(block.Hash()) { if !iter.fbftLog.IsBlockVerified(block.Hash()) {
if err := iter.fbftLog.BlockVerify(block); err != nil { if err := iter.verify(block); err != nil {
iter.logger.Debug().Err(err).Msg("block verification failed in consensus last mile block") iter.logger.Debug().Err(err).Msg("block verification failed in consensus last mile block")
return nil return nil
} }
@ -618,6 +621,9 @@ func (consensus *Consensus) verifyLastCommitSig(lastCommitSig []byte, blk *types
// tryCatchup add the last mile block in PBFT log memory cache to blockchain. // tryCatchup add the last mile block in PBFT log memory cache to blockchain.
func (consensus *Consensus) tryCatchup() error { func (consensus *Consensus) tryCatchup() error {
// TODO: change this to a more systematic symbol // TODO: change this to a more systematic symbol
if consensus.BlockVerifier == nil {
return errors.New("consensus haven't finished initialization")
}
initBN := consensus.getBlockNum() initBN := consensus.getBlockNum()
defer consensus.postCatchup(initBN) defer consensus.postCatchup(initBN)
@ -632,7 +638,7 @@ func (consensus *Consensus) tryCatchup() error {
} }
blk.SetCurrentCommitSig(msg.Payload) blk.SetCurrentCommitSig(msg.Payload)
if err := consensus.fBFTLog.verifyBlock(blk); err != nil { if err := consensus.verifyBlock(blk); err != nil {
consensus.getLogger().Err(err).Msg("[TryCatchup] failed block verifier") consensus.getLogger().Err(err).Msg("[TryCatchup] failed block verifier")
return err return err
} }

@ -113,16 +113,14 @@ type FBFTLog struct {
blocks map[common.Hash]*types.Block // store blocks received in FBFT blocks map[common.Hash]*types.Block // store blocks received in FBFT
verifiedBlocks map[common.Hash]struct{} // store block hashes for blocks that has already been verified verifiedBlocks map[common.Hash]struct{} // store block hashes for blocks that has already been verified
messages map[fbftMsgID]*FBFTMessage // store messages received in FBFT messages map[fbftMsgID]*FBFTMessage // store messages received in FBFT
verifyNewBlock func(*types.Block) error // block verification function
} }
// NewFBFTLog returns new instance of FBFTLog // NewFBFTLog returns new instance of FBFTLog
func NewFBFTLog(verifyNewBlock func(*types.Block) error) *FBFTLog { func NewFBFTLog() *FBFTLog {
pbftLog := FBFTLog{ pbftLog := FBFTLog{
blocks: make(map[common.Hash]*types.Block), blocks: make(map[common.Hash]*types.Block),
messages: make(map[fbftMsgID]*FBFTMessage), messages: make(map[fbftMsgID]*FBFTMessage),
verifiedBlocks: make(map[common.Hash]struct{}), verifiedBlocks: make(map[common.Hash]struct{}),
verifyNewBlock: verifyNewBlock,
} }
return &pbftLog return &pbftLog
} }
@ -132,10 +130,6 @@ func (log *FBFTLog) AddBlock(block *types.Block) {
log.blocks[block.Hash()] = block log.blocks[block.Hash()] = block
} }
func (log *FBFTLog) BlockVerify(block *types.Block) error {
return log.verifyNewBlock(block)
}
// MarkBlockVerified marks the block as verified // MarkBlockVerified marks the block as verified
func (log *FBFTLog) MarkBlockVerified(block *types.Block) { func (log *FBFTLog) MarkBlockVerified(block *types.Block) {
log.verifiedBlocks[block.Hash()] = struct{}{} log.verifiedBlocks[block.Hash()] = struct{}{}

@ -65,7 +65,7 @@ func TestGetMessagesByTypeSeqViewHash(t *testing.T) {
ViewID: 3, ViewID: 3,
BlockHash: [32]byte{01, 02}, BlockHash: [32]byte{01, 02},
} }
log := NewFBFTLog(nil) log := NewFBFTLog()
log.AddVerifiedMessage(&pbftMsg) log.AddVerifiedMessage(&pbftMsg)
found := log.GetMessagesByTypeSeqViewHash( found := log.GetMessagesByTypeSeqViewHash(
@ -90,7 +90,7 @@ func TestHasMatchingAnnounce(t *testing.T) {
ViewID: 3, ViewID: 3,
BlockHash: [32]byte{01, 02}, BlockHash: [32]byte{01, 02},
} }
log := NewFBFTLog(nil) log := NewFBFTLog()
log.AddVerifiedMessage(&pbftMsg) log.AddVerifiedMessage(&pbftMsg)
found := log.HasMatchingViewAnnounce(2, 3, [32]byte{01, 02}) found := log.HasMatchingViewAnnounce(2, 3, [32]byte{01, 02})
if !found { if !found {

@ -63,11 +63,6 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
go func() { go func() {
// Best effort check, no need to error out. // Best effort check, no need to error out.
_, err := consensus.ValidateNewBlock(recvMsg) _, err := consensus.ValidateNewBlock(recvMsg)
if err != nil {
// maybe ban sender
consensus.getLogger().Error().
Err(err).Msgf("[Announce] Failed to validate block")
}
if err == nil { if err == nil {
consensus.GetLogger().Info(). consensus.GetLogger().Info().
Msg("[Announce] Block verified") Msg("[Announce] Block verified")
@ -81,7 +76,6 @@ func (consensus *Consensus) ValidateNewBlock(recvMsg *FBFTMessage) (*types.Block
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
return consensus.validateNewBlock(recvMsg) return consensus.validateNewBlock(recvMsg)
} }
func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block, error) { func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block, error) {
if consensus.fBFTLog.IsBlockVerified(recvMsg.BlockHash) { if consensus.fBFTLog.IsBlockVerified(recvMsg.BlockHash) {
var blockObj *types.Block var blockObj *types.Block
@ -131,7 +125,12 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block
Hex("blockHash", recvMsg.BlockHash[:]). Hex("blockHash", recvMsg.BlockHash[:]).
Msg("[validateNewBlock] Prepared message and block added") Msg("[validateNewBlock] Prepared message and block added")
if err := consensus.fBFTLog.verifyBlock(&blockObj); err != nil { if consensus.BlockVerifier == nil {
consensus.getLogger().Debug().Msg("[validateNewBlock] consensus received message before init. Ignoring")
return nil, errors.New("nil block verifier")
}
if err := consensus.verifyBlock(&blockObj); err != nil {
consensus.getLogger().Error().Err(err).Msg("[validateNewBlock] Block verification failed") consensus.getLogger().Error().Err(err).Msg("[validateNewBlock] Block verification failed")
return nil, errors.Errorf("Block verification failed: %s", err.Error()) return nil, errors.Errorf("Block verification failed: %s", err.Error())
} }

@ -51,11 +51,9 @@ type viewChange struct {
} }
// newViewChange returns a new viewChange object // newViewChange returns a new viewChange object
func newViewChange(verifyBlock VerifyBlockFunc) *viewChange { func newViewChange() *viewChange {
vc := viewChange{} vc := viewChange{}
vc.Reset() vc.Reset()
vc.verifyBlock = verifyBlock
return &vc return &vc
} }

@ -45,7 +45,7 @@ func (consensus *Consensus) constructViewChangeMessage(priKey *bls.PrivateKeyWra
Interface("preparedMsg", preparedMsg). Interface("preparedMsg", preparedMsg).
Msg("[constructViewChangeMessage] found prepared msg") Msg("[constructViewChangeMessage] found prepared msg")
if block != nil { if block != nil {
if err := consensus.fBFTLog.verifyBlock(block); err == nil { if err := consensus.verifyBlock(block); err == nil {
tmpEncoded, err := rlp.EncodeToBytes(block) tmpEncoded, err := rlp.EncodeToBytes(block)
if err != nil { if err != nil {
consensus.getLogger().Err(err).Msg("[constructViewChangeMessage] Failed encoding block") consensus.getLogger().Err(err).Msg("[constructViewChangeMessage] Failed encoding block")

@ -109,6 +109,14 @@ type BlockChain interface {
// but does not write any state. This is used to construct competing side forks // but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty. // up to the point where they exceed the canonical total difficulty.
WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error)
// WriteBlockWithState writes the block and all associated state to the database.
WriteBlockWithState(
block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt,
stakeMsgs []types2.StakeMsg,
paid reward.Reader,
state *state.DB,
) (status WriteStatus, err error)
// GetMaxGarbageCollectedBlockNumber .. // GetMaxGarbageCollectedBlockNumber ..
GetMaxGarbageCollectedBlockNumber() int64 GetMaxGarbageCollectedBlockNumber() int64
// InsertChain attempts to insert the given batch of blocks in to the canonical // InsertChain attempts to insert the given batch of blocks in to the canonical

@ -1473,8 +1473,7 @@ func (bc *BlockChainImpl) WriteBlockWithoutState(block *types.Block, td *big.Int
return nil return nil
} }
// writeBlockWithState writes the block and all associated state to the database. func (bc *BlockChainImpl) WriteBlockWithState(
func (bc *BlockChainImpl) writeBlockWithState(
block *types.Block, receipts []*types.Receipt, block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt, cxReceipts []*types.CXReceipt,
stakeMsgs []staking.StakeMsg, stakeMsgs []staking.StakeMsg,
@ -1683,6 +1682,8 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
if len(chain) == 0 { if len(chain) == 0 {
return 0, nil, nil, ErrEmptyChain return 0, nil, nil, ErrEmptyChain
} }
first := chain[0]
fmt.Println("insertChain", utils.GetPort(), first.ShardID(), first.Epoch().Uint64(), first.NumberU64())
// Do a sanity check that the provided chain is actually ordered and linked // Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ { for i := 1; i < len(chain); i++ {
if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
@ -1881,7 +1882,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
// Write the block to the chain and get the status. // Write the block to the chain and get the status.
substart = time.Now() substart = time.Now()
status, err := bc.writeBlockWithState( status, err := bc.WriteBlockWithState(
block, receipts, cxReceipts, stakeMsgs, payout, state, block, receipts, cxReceipts, stakeMsgs, payout, state,
) )
if err != nil { if err != nil {

@ -124,6 +124,10 @@ func (a Stub) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error
return errors.Errorf("method WriteBlockWithoutState not implemented for %s", a.Name) return errors.Errorf("method WriteBlockWithoutState not implemented for %s", a.Name)
} }
func (a Stub) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, cxReceipts []*types.CXReceipt, stakeMsgs []staking.StakeMsg, paid reward.Reader, state *state.DB) (status WriteStatus, err error) {
return 0, errors.Errorf("method WriteBlockWithState not implemented for %s", a.Name)
}
func (a Stub) GetMaxGarbageCollectedBlockNumber() int64 { func (a Stub) GetMaxGarbageCollectedBlockNumber() int64 {
return 0 return 0
} }

@ -34,11 +34,9 @@ func (timeout *Timeout) Start() {
} }
// Stop stops the timeout clock // Stop stops the timeout clock
func (timeout *Timeout) Stop() (stopped bool) { func (timeout *Timeout) Stop() {
stopped = timeout.state != Inactive
timeout.state = Inactive timeout.state = Inactive
timeout.start = time.Now() timeout.start = time.Now()
return stopped
} }
// Expired checks whether the timeout is reached/expired // Expired checks whether the timeout is reached/expired

@ -149,10 +149,6 @@ type Node struct {
registry *registry.Registry registry *registry.Registry
} }
func (node *Node) Host() p2p.Host {
return node.host
}
// Blockchain returns the blockchain for the node's current shard. // Blockchain returns the blockchain for the node's current shard.
func (node *Node) Blockchain() core.BlockChain { func (node *Node) Blockchain() core.BlockChain {
return node.registry.GetBlockchain() return node.registry.GetBlockchain()

@ -404,21 +404,16 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
} }
// BootstrapConsensus is a goroutine to check number of peers and start the consensus // BootstrapConsensus is a goroutine to check number of peers and start the consensus
func BootstrapConsensus(ctx context.Context, consensus *consensus.Consensus, host p2p.Host) error { func (node *Node) BootstrapConsensus() error {
ctx, cancel := context.WithTimeout(ctx, time.Minute) ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel() defer cancel()
min := consensus.MinPeers min := node.Consensus.MinPeers
enoughMinPeers := make(chan struct{}) enoughMinPeers := make(chan struct{})
const checkEvery = 3 * time.Second const checkEvery = 3 * time.Second
go func() { go func() {
for { for {
select { <-time.After(checkEvery)
case <-ctx.Done(): numPeersNow := node.host.GetPeerCount()
return
case <-time.After(checkEvery):
}
numPeersNow := host.GetPeerCount()
if numPeersNow >= min { if numPeersNow >= min {
utils.Logger().Info().Msg("[bootstrap] StartConsensus") utils.Logger().Info().Msg("[bootstrap] StartConsensus")
enoughMinPeers <- struct{}{} enoughMinPeers <- struct{}{}
@ -437,7 +432,9 @@ func BootstrapConsensus(ctx context.Context, consensus *consensus.Consensus, hos
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-enoughMinPeers: case <-enoughMinPeers:
go consensus.StartChannel() go func() {
node.Consensus.StartChannel()
}()
return nil return nil
} }
} }

@ -7,14 +7,14 @@ import (
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils/blockedpeers" "github.com/harmony-one/harmony/internal/utils/blockedpeers"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network" libp2p_network "github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
type Security interface { type Security interface {
OnConnectCheck(net libp2pnetwork.Network, conn libp2pnetwork.Conn) error OnConnectCheck(net libp2p_network.Network, conn libp2p_network.Conn) error
OnDisconnectCheck(conn libp2pnetwork.Conn) error OnDisconnectCheck(conn libp2p_network.Conn) error
} }
type peerMap struct { type peerMap struct {
@ -87,7 +87,7 @@ func (m *Manager) RangePeers(f func(key string, value []string) bool) {
m.peers.Range(f) m.peers.Range(f)
} }
func (m *Manager) OnConnectCheck(net libp2pnetwork.Network, conn libp2pnetwork.Conn) error { func (m *Manager) OnConnectCheck(net libp2p_network.Network, conn libp2p_network.Conn) error {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
@ -133,7 +133,7 @@ func (m *Manager) OnConnectCheck(net libp2pnetwork.Network, conn libp2pnetwork.C
return nil return nil
} }
func (m *Manager) OnDisconnectCheck(conn libp2pnetwork.Conn) error { func (m *Manager) OnDisconnectCheck(conn libp2p_network.Conn) error {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
@ -171,7 +171,7 @@ func find(slice []string, val string) (int, bool) {
return -1, false return -1, false
} }
func getRemoteIP(conn libp2pnetwork.Conn) (string, error) { func getRemoteIP(conn libp2p_network.Conn) (string, error) {
for _, protocol := range conn.RemoteMultiaddr().Protocols() { for _, protocol := range conn.RemoteMultiaddr().Protocols() {
switch protocol.Code { switch protocol.Code {
case ma.P_IP4: case ma.P_IP4:

Loading…
Cancel
Save