Merge pull request #3452 from rlan35/main

Fixes viewID and view change ID base
pull/3341/head
Rongjian Lan 4 years ago committed by GitHub
commit d7ba460163
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      consensus/checks.go
  2. 4
      consensus/consensus_service.go
  3. 10
      consensus/consensus_v2.go
  4. 2
      consensus/engine/consensus_engine.go
  5. 3
      consensus/quorum/one-node-one-vote.go
  6. 9
      consensus/validator.go
  7. 10
      consensus/view_change.go
  8. 2
      core/chain_makers.go
  9. 2
      core/state_processor.go
  10. 6
      internal/chain/engine.go
  11. 4
      node/node_handler_test.go
  12. 11
      node/node_newblock.go
  13. 4
      node/node_newblock_test.go
  14. 5
      node/worker/worker.go
  15. 4
      test/chain/main.go

@ -164,7 +164,8 @@ func (consensus *Consensus) onViewChangeSanityCheck(recvMsg *FBFTMessage) bool {
}
if consensus.IsViewChangingMode() &&
consensus.GetCurBlockViewID() > recvMsg.ViewID {
consensus.getLogger().Warn().
consensus.getLogger().Warn().Uint64("curBlockViewID", consensus.GetCurBlockViewID()).
Uint64("msgViewID", recvMsg.ViewID).
Msg("[onViewChangeSanityCheck] ViewChanging ID Is Low")
return false
}

@ -204,9 +204,9 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {
consensus.LeaderPubKey = msg.SenderPubkeys[0]
consensus.IgnoreViewIDCheck.UnSet()
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Debug().
consensus.getLogger().Info().
Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Msg("Start consensus timer")
Msg("[checkViewID] Start consensus timer")
return nil
} else if msg.ViewID > consensus.GetCurBlockViewID() {
return consensus_engine.ErrViewIDNotMatch

@ -176,6 +176,7 @@ func (consensus *Consensus) finalCommit() {
Uint64("blockNum", consensus.blockNum).
Msg("[finalCommit] Sent Committed Message")
}
consensus.getLogger().Info().Msg("[finalCommit] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
} else {
// delayed send
@ -199,9 +200,7 @@ func (consensus *Consensus) finalCommit() {
if consensus.consensusTimeout[timeoutBootstrap].IsActive() {
consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.getLogger().Info().Msg("[finalCommit] Start consensus timer; stop bootstrap timer only once")
} else {
consensus.getLogger().Info().Msg("[finalCommit] Start consensus timer")
consensus.getLogger().Info().Msg("[finalCommit] stop bootstrap timer only once")
}
consensus.getLogger().Info().
@ -281,7 +280,7 @@ func (consensus *Consensus) Start(
}
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started")
defer close(stoppedChan)
ticker := time.NewTicker(3 * time.Second)
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
consensus.consensusTimeout[timeoutBootstrap].Start()
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Start bootstrap timeout (only once)")
@ -295,7 +294,6 @@ func (consensus *Consensus) Start(
case <-toStart:
start = true
case <-ticker.C:
consensus.getLogger().Debug().Msg("[ConsensusMainLoop] Ticker")
if !start && isInitialLeader {
continue
}
@ -325,6 +323,7 @@ func (consensus *Consensus) Start(
consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1)
mode := consensus.UpdateConsensusInformation()
consensus.current.SetMode(mode)
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC")
}
@ -557,6 +556,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
Uint64("blockNum", consensus.blockNum).
Msg("[preCommitAndPropose] Sent Committed Message")
}
consensus.getLogger().Info().Msg("[preCommitAndPropose] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
// Send signal to Node to propose the new block for consensus

@ -117,6 +117,6 @@ type Engine interface {
state *state.DB, txs []*types.Transaction,
receipts []*types.Receipt, outcxs []*types.CXReceipt,
incxs []*types.CXReceiptsProof, stks staking.StakingTransactions,
doubleSigners slash.Records, sigsReady chan bool,
doubleSigners slash.Records, sigsReady chan bool, viewID func() uint64,
) (*types.Block, reward.Reader, error)
}

@ -52,6 +52,9 @@ func (v *uniformVoteWeight) IsQuorumAchieved(p Phase) bool {
// IsQuorumAchivedByMask ..
func (v *uniformVoteWeight) IsQuorumAchievedByMask(mask *bls_cosi.Mask) bool {
if mask == nil {
return false
}
threshold := v.TwoThirdsSignersCount()
currentTotalPower := utils.CountOneBits(mask.Bitmap)
if currentTotalPower < threshold {

@ -293,6 +293,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
}
}
initBn := consensus.blockNum
consensus.tryCatchup()
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC")
@ -306,12 +307,14 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
if consensus.consensusTimeout[timeoutBootstrap].IsActive() {
consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.getLogger().Debug().Msg("[OnCommitted] Start consensus timer; stop bootstrap timer only once")
} else {
consensus.getLogger().Debug().Msg("[OnCommitted] Start consensus timer")
consensus.getLogger().Debug().Msg("[OnCommitted] stop bootstrap timer only once")
}
if initBn < consensus.blockNum {
consensus.getLogger().Info().Msg("[OnCommitted] Start consensus timer (new block added)")
consensus.consensusTimeout[timeoutConsensus].Start()
}
}
// Collect private keys that are part of the current committee.
// TODO: cache valid private keys and only update when keys change.

@ -125,7 +125,7 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
return consensus.fallbackNextViewID()
}
blockTimestamp := curHeader.Time().Int64()
lastBlockViewID := curHeader.ViewID().Uint64()
stuckBlockViewID := curHeader.ViewID().Uint64() + 1
curTimestamp := time.Now().Unix()
// timestamp messed up in current validator node
@ -135,13 +135,13 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
// diff only increases, since view change timeout is shorter than
// view change slot now, we want to make sure diff is always greater than 0
diff := uint64((curTimestamp-blockTimestamp)/viewChangeSlot + 1)
nextViewID := diff + lastBlockViewID
nextViewID := diff + stuckBlockViewID
consensus.getLogger().Info().
Int64("curTimestamp", curTimestamp).
Int64("blockTimestamp", blockTimestamp).
Uint64("nextViewID", nextViewID).
Uint64("lastBlockViewID", lastBlockViewID).
Uint64("stuckBlockViewID", stuckBlockViewID).
Msg("[getNextViewID]")
// duration is always the fixed view change duration for synchronous view change
@ -170,8 +170,8 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrappe
consensus.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain")
lastLeaderPubKey = consensus.LeaderPubKey
} else {
lastBlockViewID := curHeader.ViewID().Uint64()
gap = int(viewID - lastBlockViewID)
stuckBlockViewID := curHeader.ViewID().Uint64() + 1
gap = int(viewID - stuckBlockViewID)
// this is the truth of the leader based on blockchain blocks
lastLeaderPubKey, err = consensus.getLeaderPubKeyFromCoinbase(curHeader)
if err != nil || lastLeaderPubKey == nil {

@ -196,7 +196,7 @@ func GenerateChain(
if b.engine != nil {
// Finalize and seal the block
block, _, err := b.engine.Finalize(
chainreader, b.header, statedb, b.txs, b.receipts, nil, nil, nil, nil, nil,
chainreader, b.header, statedb, b.txs, b.receipts, nil, nil, nil, nil, nil, func() uint64 { return 0 },
)
if err != nil {
panic(err)

@ -142,7 +142,7 @@ func (p *StateProcessor) Process(
}()
_, payout, err := p.engine.Finalize(
p.bc, header, statedb, block.Transactions(),
receipts, outcxs, incxs, block.StakingTransactions(), slashes, sigsReady,
receipts, outcxs, incxs, block.StakingTransactions(), slashes, sigsReady, func() uint64 { return header.ViewID().Uint64() },
)
if err != nil {
return nil, nil, nil, 0, nil, errors.New("[Process] Cannot finalize block")

@ -209,7 +209,7 @@ func (e *engineImpl) Finalize(
state *state.DB, txs []*types.Transaction,
receipts []*types.Receipt, outcxs []*types.CXReceipt,
incxs []*types.CXReceiptsProof, stks staking.StakingTransactions,
doubleSigners slash.Records, sigsReady chan bool,
doubleSigners slash.Records, sigsReady chan bool, viewID func() uint64,
) (*types.Block, reward.Reader, error) {
isBeaconChain := header.ShardID() == shard.BeaconChainShardID
@ -263,6 +263,10 @@ func (e *engineImpl) Finalize(
return nil, nil, errors.New("slashes proposed in non-beacon chain or non-staking epoch")
}
// ViewID setting needs to happen after commig sig reward logic for pipelining reason.
// TODO: make the viewID fetch from caller of the block proposal.
header.SetViewID(new(big.Int).SetUint64(viewID()))
// Finalize the state root
header.SetRoot(state.IntermediateRoot(chain.Config().IsS3(header.Epoch())))
return types.NewBlock(header, txs, receipts, outcxs, incxs, stks), payout, nil

@ -47,7 +47,7 @@ func TestAddNewBlock(t *testing.T) {
commitSigs <- []byte{}
}()
block, _ := node.Worker.FinalizeNewBlock(
commitSigs, 0, common.Address{}, nil, nil,
commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil,
)
_, err = node.Blockchain().InsertChain([]*types.Block{block}, true)
@ -90,7 +90,7 @@ func TestVerifyNewBlock(t *testing.T) {
commitSigs <- []byte{}
}()
block, _ := node.Worker.FinalizeNewBlock(
commitSigs, 0, common.Address{}, nil, nil,
commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil,
)
if err := node.VerifyNewBlock(block); err != nil {

@ -49,6 +49,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
time.Sleep(SleepPeriod)
utils.Logger().Info().
Uint64("blockNum", node.Blockchain().CurrentBlock().NumberU64()+1).
Bool("asyncProposal", proposalType == consensus.AsyncProposal).
Msg("PROPOSING NEW BLOCK ------------------------------------------------")
// Prepare last commit signatures
@ -123,15 +124,16 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
header := node.Worker.GetCurrentHeader()
// Update worker's current header and
// state data in preparation to propose/process new transactions
leaderKey := node.Consensus.LeaderPubKey
var (
coinbase = node.GetAddressForBLSKey(node.Consensus.LeaderPubKey.Object, header.Epoch())
coinbase = node.GetAddressForBLSKey(leaderKey.Object, header.Epoch())
beneficiary = coinbase
err error
)
// After staking, all coinbase will be the address of bls pub key
if node.Blockchain().Config().IsStaking(header.Epoch()) {
blsPubKeyBytes := node.Consensus.LeaderPubKey.Object.GetAddress()
blsPubKeyBytes := leaderKey.Object.GetAddress()
coinbase.SetBytes(blsPubKeyBytes[:])
}
@ -261,8 +263,11 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
return nil, err
}
viewIDFunc := func() uint64 {
return node.Consensus.GetCurBlockViewID()
}
finalizedBlock, err := node.Worker.FinalizeNewBlock(
commitSigs, node.Consensus.GetCurBlockViewID(),
commitSigs, viewIDFunc,
coinbase, crossLinksToPropose, shardState,
)
if err != nil {

@ -52,7 +52,7 @@ func TestFinalizeNewBlockAsync(t *testing.T) {
}()
block, _ := node.Worker.FinalizeNewBlock(
commitSigs, 0, common.Address{}, nil, nil,
commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil,
)
if err := node.VerifyNewBlock(block); err != nil {
@ -64,7 +64,7 @@ func TestFinalizeNewBlockAsync(t *testing.T) {
node.Worker.UpdateCurrent()
_, err = node.Worker.FinalizeNewBlock(
commitSigs, 0, common.Address{}, nil, nil,
commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil,
)
if !strings.Contains(err.Error(), "cannot finalize block") {

@ -427,11 +427,10 @@ func (w *Worker) verifySlashes(
// FinalizeNewBlock generate a new block for the next consensus round.
func (w *Worker) FinalizeNewBlock(
commitSigs chan []byte, viewID uint64, coinbase common.Address,
commitSigs chan []byte, viewID func() uint64, coinbase common.Address,
crossLinks types.CrossLinks, shardState *shard.State,
) (*types.Block, error) {
w.current.header.SetCoinbase(coinbase)
w.current.header.SetViewID(new(big.Int).SetUint64(viewID))
// Put crosslinks into header
if len(crossLinks) > 0 {
@ -515,7 +514,7 @@ func (w *Worker) FinalizeNewBlock(
block, _, err := w.engine.Finalize(
w.chain, copyHeader, state, w.current.txs, w.current.receipts,
w.current.outcxs, w.current.incxs, w.current.stakingTxs,
w.current.slashes, sigsReady,
w.current.slashes, sigsReady, viewID,
)
if err != nil {
return nil, errors.Wrapf(err, "cannot finalize block")

@ -133,7 +133,7 @@ func fundFaucetContract(chain *core.BlockChain) {
commitSigs <- []byte{}
}()
block, _ := contractworker.
FinalizeNewBlock(commitSigs, 0, common.Address{}, nil, nil)
FinalizeNewBlock(commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil)
_, err = chain.InsertChain(types.Blocks{block}, true /* verifyHeaders */)
if err != nil {
fmt.Println(err)
@ -182,7 +182,7 @@ func callFaucetContractToFundAnAddress(chain *core.BlockChain) {
commitSigs <- []byte{}
}()
block, _ := contractworker.FinalizeNewBlock(
commitSigs, 0, common.Address{}, nil, nil,
commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil,
)
_, err = chain.InsertChain(types.Blocks{block}, true /* verifyHeaders */)
if err != nil {

Loading…
Cancel
Save