Merge pull request #4009 from LeoHChen/merge_v4_to_main

Merge v4 to main
pull/4015/head
Leo Chen 3 years ago committed by GitHub
commit 170c4aa3df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      consensus/checks.go
  2. 19
      consensus/consensus_v2.go
  3. 4
      consensus/validator.go
  4. 4
      consensus/view_change.go
  5. 16
      core/tx_pool.go
  6. 2
      node/node.go
  7. 71
      node/node_newblock.go

@ -145,7 +145,7 @@ func (consensus *Consensus) onViewChangeSanityCheck(recvMsg *FBFTMessage) bool {
// TODO: if difference is only one, new leader can still propose the same committed block to avoid another view change
// TODO: new leader catchup without ignore view change message
consensus.getLogger().Info().
consensus.getLogger().Debug().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MyViewChangingID", consensus.GetViewChangingID()).
Uint64("MsgViewChangingID", recvMsg.ViewID).
@ -164,7 +164,7 @@ func (consensus *Consensus) onViewChangeSanityCheck(recvMsg *FBFTMessage) bool {
}
if consensus.IsViewChangingMode() &&
consensus.GetCurBlockViewID() > recvMsg.ViewID {
consensus.getLogger().Warn().Uint64("curBlockViewID", consensus.GetCurBlockViewID()).
consensus.getLogger().Debug().Uint64("curBlockViewID", consensus.GetCurBlockViewID()).
Uint64("msgViewID", recvMsg.ViewID).
Msg("[onViewChangeSanityCheck] ViewChanging ID Is Low")
return false

@ -171,15 +171,7 @@ func (consensus *Consensus) finalCommit() {
consensus.getLogger().Info().Hex("new", commitSigAndBitmap).Msg("[finalCommit] Overriding commit signatures!!")
consensus.Blockchain.WriteCommitSig(block.NumberU64(), commitSigAndBitmap)
block.SetCurrentCommitSig(commitSigAndBitmap)
err = consensus.commitBlock(block, FBFTMsg)
if err != nil || consensus.blockNum-beforeCatchupNum != 1 {
consensus.getLogger().Err(err).
Uint64("beforeCatchupBlockNum", beforeCatchupNum).
Msg("[finalCommit] Leader failed to commit the confirmed block")
}
// Send committed message before block insertion.
// if leader successfully finalizes the block, send committed message to validators
// Note: leader already sent 67% commit in preCommit. The 100% commit won't be sent immediately
// to save network traffic. It will only be sent in retry if consensus doesn't move forward.
@ -216,6 +208,15 @@ func (consensus *Consensus) finalCommit() {
Msg("[finalCommit] Queued Committed Message")
}
block.SetCurrentCommitSig(commitSigAndBitmap)
err = consensus.commitBlock(block, FBFTMsg)
if err != nil || consensus.blockNum-beforeCatchupNum != 1 {
consensus.getLogger().Err(err).
Uint64("beforeCatchupBlockNum", beforeCatchupNum).
Msg("[finalCommit] Leader failed to commit the confirmed block")
}
// Dump new block into level db
// In current code, we add signatures in block in tryCatchup, the block dump to explorer does not contains signatures
// but since explorer doesn't need signatures, it should be fine

@ -84,13 +84,15 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block
blockObj = consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash)
if blockObj == nil {
if err := rlp.DecodeBytes(recvMsg.Block, blockObj); err != nil {
var blockObj2 types.Block
if err := rlp.DecodeBytes(recvMsg.Block, &blockObj2); err != nil {
consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[validateNewBlock] Unparseable block header data")
return nil, errors.New("Failed parsing new block")
}
blockObj = &blockObj2
}
consensus.getLogger().Info().
Msg("[validateNewBlock] Block Already verified")

@ -356,7 +356,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.getLogger().Info().
consensus.getLogger().Debug().
Uint64("viewID", recvMsg.ViewID).
Uint64("blockNum", recvMsg.BlockNum).
Interface("SenderPubkeys", recvMsg.SenderPubkeys).
@ -366,7 +366,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
newLeaderKey := recvMsg.LeaderPubkey
newLeaderPriKey, err := consensus.GetLeaderPrivateKey(newLeaderKey.Object)
if err != nil {
consensus.getLogger().Info().
consensus.getLogger().Debug().
Err(err).
Interface("SenderPubkeys", recvMsg.SenderPubkeys).
Str("NextLeader", recvMsg.LeaderPubkey.Bytes.Hex()).

@ -925,12 +925,12 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
logger.Info().Str("hash", hash.Hex()).Msg("Discarding already known transaction")
logger.Debug().Str("hash", hash.Hex()).Msg("Discarding already known transaction")
return false, errors.WithMessagef(ErrKnownTransaction, "transaction hash %x", hash)
}
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
logger.Warn().Err(err).Str("hash", hash.Hex()).Msg("Discarding invalid transaction")
logger.Debug().Err(err).Str("hash", hash.Hex()).Msg("Discarding invalid transaction")
invalidTxCounter.Inc(1)
return false, err
}
@ -940,7 +940,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
if !local && pool.priced.Underpriced(tx, pool.locals) {
gasPrice := new(big.Float).SetInt64(tx.GasPrice().Int64())
gasPrice = gasPrice.Mul(gasPrice, new(big.Float).SetFloat64(1e-9)) // Gas-price is in Nano
logger.Warn().
logger.Debug().
Str("hash", hash.Hex()).
Str("price", tx.GasPrice().String()).
Msg("Discarding underpriced transaction")
@ -956,7 +956,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
underpricedTxCounter.Inc(1)
pool.txErrorSink.Add(tx,
errors.WithMessagef(ErrUnderpriced, "transaction gas-price is %.18f ONE in full transaction pool", gasPrice))
logger.Warn().
logger.Debug().
Str("hash", tx.Hash().Hex()).
Str("price", tx.GasPrice().String()).
Msg("Discarding freshly underpriced transaction")
@ -978,7 +978,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
pendingReplaceCounter.Inc(1)
pool.txErrorSink.Add(old,
fmt.Errorf("replaced transaction, new transaction %v has same nonce & higher price", tx.Hash().String()))
logger.Info().
logger.Debug().
Str("hash", old.Hash().String()).
Str("new-tx-hash", tx.Hash().String()).
Str("price", old.GasPrice().String()).
@ -991,7 +991,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
// Set or refresh beat for account timeout eviction
pool.beats[from] = time.Now()
logger.Info().
logger.Debug().
Str("hash", tx.Hash().Hex()).
Interface("from", from).
Interface("to", tx.To()).
@ -1020,7 +1020,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
// Set or refresh beat for account timeout eviction
pool.beats[from] = time.Now()
logger.Info().
logger.Debug().
Str("hash", hash.Hex()).
Interface("from", from).
Interface("to", tx.To()).
@ -1332,7 +1332,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
if pool.promoteTx(addr, tx) {
logger.Info().Str("hash", hash.Hex()).Msg("Promoting queued transaction")
logger.Debug().Str("hash", hash.Hex()).Msg("Promoting queued transaction")
promoted = append(promoted, tx)
}
}

@ -218,7 +218,7 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) []error {
errs = append(errs, node.TxPool.AddRemotes(poolTxs)...)
pendingCount, queueCount := node.TxPool.Stats()
utils.Logger().Info().
utils.Logger().Debug().
Interface("err", errs).
Int("length of newTxs", len(newTxs)).
Int("totalPending", pendingCount).

@ -91,7 +91,6 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
if blk, ok := node.proposedBlock[newBlock.NumberU64()]; ok {
utils.Logger().Info().Uint64("blockNum", newBlock.NumberU64()).Str("blockHash", blk.Hash().Hex()).
Msg("Block with the same number was already proposed, abort.")
break
}
utils.Logger().Info().
Uint64("blockNum", newBlock.NumberU64()).
@ -172,45 +171,47 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
}
}
// Prepare normal and staking transactions retrieved from transaction pool
utils.AnalysisStart("proposeNewBlockChooseFromTxnPool")
if !shard.Schedule.IsLastBlock(header.Number().Uint64()) {
// Prepare normal and staking transactions retrieved from transaction pool
utils.AnalysisStart("proposeNewBlockChooseFromTxnPool")
pendingPoolTxs, err := node.TxPool.Pending()
if err != nil {
utils.Logger().Err(err).Msg("Failed to fetch pending transactions")
return nil, err
}
pendingPlainTxs := map[common.Address]types.Transactions{}
pendingStakingTxs := staking.StakingTransactions{}
for addr, poolTxs := range pendingPoolTxs {
plainTxsPerAcc := types.Transactions{}
for _, tx := range poolTxs {
if plainTx, ok := tx.(*types.Transaction); ok {
plainTxsPerAcc = append(plainTxsPerAcc, plainTx)
} else if stakingTx, ok := tx.(*staking.StakingTransaction); ok {
// Only process staking transactions after pre-staking epoch happened.
if node.Blockchain().Config().IsPreStaking(node.Worker.GetCurrentHeader().Epoch()) {
pendingStakingTxs = append(pendingStakingTxs, stakingTx)
pendingPoolTxs, err := node.TxPool.Pending()
if err != nil {
utils.Logger().Err(err).Msg("Failed to fetch pending transactions")
return nil, err
}
pendingPlainTxs := map[common.Address]types.Transactions{}
pendingStakingTxs := staking.StakingTransactions{}
for addr, poolTxs := range pendingPoolTxs {
plainTxsPerAcc := types.Transactions{}
for _, tx := range poolTxs {
if plainTx, ok := tx.(*types.Transaction); ok {
plainTxsPerAcc = append(plainTxsPerAcc, plainTx)
} else if stakingTx, ok := tx.(*staking.StakingTransaction); ok {
// Only process staking transactions after pre-staking epoch happened.
if node.Blockchain().Config().IsPreStaking(node.Worker.GetCurrentHeader().Epoch()) {
pendingStakingTxs = append(pendingStakingTxs, stakingTx)
}
} else {
utils.Logger().Err(types.ErrUnknownPoolTxType).
Msg("Failed to parse pending transactions")
return nil, types.ErrUnknownPoolTxType
}
} else {
utils.Logger().Err(types.ErrUnknownPoolTxType).
Msg("Failed to parse pending transactions")
return nil, types.ErrUnknownPoolTxType
}
if plainTxsPerAcc.Len() > 0 {
pendingPlainTxs[addr] = plainTxsPerAcc
}
}
if plainTxsPerAcc.Len() > 0 {
pendingPlainTxs[addr] = plainTxsPerAcc
}
}
utils.AnalysisEnd("proposeNewBlockChooseFromTxnPool")
// Try commit normal and staking transactions based on the current state
// The successfully committed transactions will be put in the proposed block
if err := node.Worker.CommitTransactions(
pendingPlainTxs, pendingStakingTxs, beneficiary,
); err != nil {
utils.Logger().Error().Err(err).Msg("cannot commit transactions")
return nil, err
// Try commit normal and staking transactions based on the current state
// The successfully committed transactions will be put in the proposed block
if err := node.Worker.CommitTransactions(
pendingPlainTxs, pendingStakingTxs, beneficiary,
); err != nil {
utils.Logger().Error().Err(err).Msg("cannot commit transactions")
return nil, err
}
utils.AnalysisEnd("proposeNewBlockChooseFromTxnPool")
}
// Prepare cross shard transaction receipts

Loading…
Cancel
Save