|
|
|
@ -4,13 +4,13 @@ import ( |
|
|
|
|
"bytes" |
|
|
|
|
"context" |
|
|
|
|
"encoding/hex" |
|
|
|
|
"math/big" |
|
|
|
|
"fmt" |
|
|
|
|
"sync/atomic" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/common" |
|
|
|
|
bls2 "github.com/harmony-one/bls/ffi/go/bls" |
|
|
|
|
"github.com/harmony-one/harmony/consensus/signature" |
|
|
|
|
|
|
|
|
|
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" |
|
|
|
|
"github.com/harmony-one/harmony/internal/utils" |
|
|
|
|
|
|
|
|
@ -46,24 +46,16 @@ const ( |
|
|
|
|
|
|
|
|
|
// IsViewChangingMode returns true if the current mode is ViewChanging.
// It is the exported wrapper that takes the read lock before delegating
// to the unexported, lock-free isViewChangingMode.
func (consensus *Consensus) IsViewChangingMode() bool {
	consensus.mutex.RLock()
	defer consensus.mutex.RUnlock()
	return consensus.isViewChangingMode()
}
|
|
|
|
|
|
|
|
|
func (consensus *Consensus) isViewChangingMode() bool { |
|
|
|
|
return consensus.current.Mode() == ViewChanging |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// HandleMessageUpdate will update the consensus state according to received message
|
|
|
|
|
func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb.Message, senderKey *bls.SerializedPublicKey) error { |
|
|
|
|
consensus.mutex.Lock() |
|
|
|
|
defer consensus.mutex.Unlock() |
|
|
|
|
// when node is in ViewChanging mode, it still accepts normal messages into FBFTLog
|
|
|
|
|
// in order to avoid possible trap forever but drop PREPARE and COMMIT
|
|
|
|
|
// which are message types specifically for a node acting as leader
|
|
|
|
|
// so we just ignore those messages
|
|
|
|
|
if consensus.isViewChangingMode() && |
|
|
|
|
if consensus.IsViewChangingMode() && |
|
|
|
|
(msg.Type == msg_pb.MessageType_PREPARE || |
|
|
|
|
msg.Type == msg_pb.MessageType_COMMIT) { |
|
|
|
|
return nil |
|
|
|
@ -95,7 +87,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb |
|
|
|
|
members := consensus.Decider.Participants() |
|
|
|
|
fbftMsg, err = ParseNewViewMessage(msg, members) |
|
|
|
|
default: |
|
|
|
|
fbftMsg, err = consensus.parseFBFTMessage(msg) |
|
|
|
|
fbftMsg, err = consensus.ParseFBFTMessage(msg) |
|
|
|
|
} |
|
|
|
|
if err != nil || fbftMsg == nil { |
|
|
|
|
return errors.Wrapf(err, "unable to parse consensus msg with type: %s", msg.Type) |
|
|
|
@ -103,8 +95,8 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb |
|
|
|
|
|
|
|
|
|
canHandleViewChange := true |
|
|
|
|
intendedForValidator, intendedForLeader := |
|
|
|
|
!consensus.isLeader(), |
|
|
|
|
consensus.isLeader() |
|
|
|
|
!consensus.IsLeader(), |
|
|
|
|
consensus.IsLeader() |
|
|
|
|
|
|
|
|
|
// if in backup normal mode, force ignore view change event and leader event.
|
|
|
|
|
if consensus.current.Mode() == NormalBackup { |
|
|
|
@ -139,14 +131,15 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (consensus *Consensus) finalCommit() { |
|
|
|
|
// THIS IS NOT GOOD PLACE FOR LEADER SWITCHING
|
|
|
|
|
numCommits := consensus.Decider.SignersCount(quorum.Commit) |
|
|
|
|
|
|
|
|
|
consensus.getLogger().Info(). |
|
|
|
|
Int64("NumCommits", numCommits). |
|
|
|
|
Msg("[finalCommit] Finalizing Consensus") |
|
|
|
|
beforeCatchupNum := consensus.getBlockNum() |
|
|
|
|
beforeCatchupNum := consensus.BlockNum() |
|
|
|
|
|
|
|
|
|
leaderPriKey, err := consensus.getConsensusLeaderPrivateKey() |
|
|
|
|
leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey() |
|
|
|
|
if err != nil { |
|
|
|
|
consensus.getLogger().Error().Err(err).Msg("[finalCommit] leader not found") |
|
|
|
|
return |
|
|
|
@ -185,7 +178,7 @@ func (consensus *Consensus) finalCommit() { |
|
|
|
|
// Note: leader already sent 67% commit in preCommit. The 100% commit won't be sent immediately
|
|
|
|
|
// to save network traffic. It will only be sent in retry if consensus doesn't move forward.
|
|
|
|
|
// Or if the leader is changed for next block, the 100% committed sig will be sent to the next leader immediately.
|
|
|
|
|
if !consensus.isLeader() || block.IsLastBlockInEpoch() { |
|
|
|
|
if !consensus.IsLeader() || block.IsLastBlockInEpoch() { |
|
|
|
|
// send immediately
|
|
|
|
|
if err := consensus.msgSender.SendWithRetry( |
|
|
|
|
block.NumberU64(), |
|
|
|
@ -250,7 +243,7 @@ func (consensus *Consensus) finalCommit() { |
|
|
|
|
|
|
|
|
|
// If still the leader, send commit sig/bitmap to finish the new block proposal,
|
|
|
|
|
// else, the block proposal will timeout by itself.
|
|
|
|
|
if consensus.isLeader() { |
|
|
|
|
if consensus.IsLeader() { |
|
|
|
|
if block.IsLastBlockInEpoch() { |
|
|
|
|
// No pipelining
|
|
|
|
|
go func() { |
|
|
|
@ -298,29 +291,143 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) { |
|
|
|
|
|
|
|
|
|
// Start waits for the next new block and run consensus
|
|
|
|
|
func (consensus *Consensus) Start( |
|
|
|
|
stopChan chan struct{}, |
|
|
|
|
blockChannel chan *types.Block, stopChan, stoppedChan, startChannel chan struct{}, |
|
|
|
|
) { |
|
|
|
|
go func() { |
|
|
|
|
toStart := make(chan struct{}, 1) |
|
|
|
|
isInitialLeader := consensus.IsLeader() |
|
|
|
|
if isInitialLeader { |
|
|
|
|
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Waiting for consensus start") |
|
|
|
|
// send a signal to indicate it's ready to run consensus
|
|
|
|
|
// this signal is consumed by node object to create a new block and in turn trigger a new consensus on it
|
|
|
|
|
go func() { |
|
|
|
|
<-startChannel |
|
|
|
|
toStart <- struct{}{} |
|
|
|
|
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") |
|
|
|
|
consensus.ReadySignal <- SyncProposal |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started") |
|
|
|
|
go func() { |
|
|
|
|
ticker := time.NewTicker(250 * time.Millisecond) |
|
|
|
|
defer ticker.Stop() |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-stopChan: |
|
|
|
|
return |
|
|
|
|
case <-ticker.C: |
|
|
|
|
consensus.Tick() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
consensus.mutex.Lock() |
|
|
|
|
defer close(stoppedChan) |
|
|
|
|
ticker := time.NewTicker(250 * time.Millisecond) |
|
|
|
|
defer ticker.Stop() |
|
|
|
|
consensus.consensusTimeout[timeoutBootstrap].Start() |
|
|
|
|
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Start bootstrap timeout (only once)") |
|
|
|
|
|
|
|
|
|
// Set up next block due time.
|
|
|
|
|
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) |
|
|
|
|
consensus.mutex.Unlock() |
|
|
|
|
start := false |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-toStart: |
|
|
|
|
start = true |
|
|
|
|
case <-ticker.C: |
|
|
|
|
if !start && isInitialLeader { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
for k, v := range consensus.consensusTimeout { |
|
|
|
|
// stop timer in listening mode
|
|
|
|
|
if consensus.current.Mode() == Listening { |
|
|
|
|
v.Stop() |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if consensus.current.Mode() == Syncing { |
|
|
|
|
// never stop bootstrap timer here in syncing mode as it only starts once
|
|
|
|
|
// if it is stopped, bootstrap will be stopped and nodes
|
|
|
|
|
// can't start view change or join consensus
|
|
|
|
|
// the bootstrap timer will be stopped once consensus is reached or view change
|
|
|
|
|
// is succeeded
|
|
|
|
|
if k != timeoutBootstrap { |
|
|
|
|
consensus.getLogger().Debug(). |
|
|
|
|
Str("k", k.String()). |
|
|
|
|
Str("Mode", consensus.current.Mode().String()). |
|
|
|
|
Msg("[ConsensusMainLoop] consensusTimeout stopped!!!") |
|
|
|
|
v.Stop() |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if !v.CheckExpire() { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if k != timeoutViewChange { |
|
|
|
|
consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops Consensus Timeout!!!") |
|
|
|
|
consensus.startViewChange() |
|
|
|
|
break |
|
|
|
|
} else { |
|
|
|
|
consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!") |
|
|
|
|
consensus.startViewChange() |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed
|
|
|
|
|
case <-consensus.syncReadyChan: |
|
|
|
|
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") |
|
|
|
|
consensus.mutex.Lock() |
|
|
|
|
if consensus.BlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 { |
|
|
|
|
consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) |
|
|
|
|
consensus.SetViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1) |
|
|
|
|
mode := consensus.UpdateConsensusInformation() |
|
|
|
|
consensus.current.SetMode(mode) |
|
|
|
|
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer") |
|
|
|
|
consensus.consensusTimeout[timeoutConsensus].Start() |
|
|
|
|
consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") |
|
|
|
|
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc() |
|
|
|
|
} else if consensus.Mode() == Syncing { |
|
|
|
|
// Corner case where sync is triggered before `onCommitted` and there is a race
|
|
|
|
|
// for block insertion between consensus and downloader.
|
|
|
|
|
mode := consensus.UpdateConsensusInformation() |
|
|
|
|
consensus.SetMode(mode) |
|
|
|
|
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer") |
|
|
|
|
consensus.consensusTimeout[timeoutConsensus].Start() |
|
|
|
|
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc() |
|
|
|
|
} |
|
|
|
|
consensus.mutex.Unlock() |
|
|
|
|
|
|
|
|
|
// TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed
|
|
|
|
|
case <-consensus.syncNotReadyChan: |
|
|
|
|
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") |
|
|
|
|
consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) |
|
|
|
|
consensus.current.SetMode(Syncing) |
|
|
|
|
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC") |
|
|
|
|
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc() |
|
|
|
|
|
|
|
|
|
case newBlock := <-blockChannel: |
|
|
|
|
//consensus.ReshardingNextLeader(newBlock)
|
|
|
|
|
consensus.getLogger().Info(). |
|
|
|
|
Uint64("MsgBlockNum", newBlock.NumberU64()). |
|
|
|
|
Msg("[ConsensusMainLoop] Received Proposed New Block!") |
|
|
|
|
|
|
|
|
|
if newBlock.NumberU64() < consensus.BlockNum() { |
|
|
|
|
consensus.getLogger().Warn().Uint64("newBlockNum", newBlock.NumberU64()). |
|
|
|
|
Msg("[ConsensusMainLoop] received old block, abort") |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// Sleep to wait for the full block time
|
|
|
|
|
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time") |
|
|
|
|
|
|
|
|
|
<-time.After(time.Until(consensus.NextBlockDue)) |
|
|
|
|
consensus.StartFinalityCount() |
|
|
|
|
|
|
|
|
|
// Update time due for next block
|
|
|
|
|
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) |
|
|
|
|
|
|
|
|
|
startTime = time.Now() |
|
|
|
|
consensus.msgSender.Reset(newBlock.NumberU64()) |
|
|
|
|
|
|
|
|
|
consensus.getLogger().Info(). |
|
|
|
|
Int("numTxs", len(newBlock.Transactions())). |
|
|
|
|
Int("numStakingTxs", len(newBlock.StakingTransactions())). |
|
|
|
|
Time("startTime", startTime). |
|
|
|
|
Int64("publicKeys", consensus.Decider.ParticipantsCount()). |
|
|
|
|
Msg("[ConsensusMainLoop] STARTING CONSENSUS") |
|
|
|
|
consensus.announce(newBlock) |
|
|
|
|
case <-stopChan: |
|
|
|
|
consensus.getLogger().Info().Msg("[ConsensusMainLoop] stopChan") |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
if consensus.dHelper != nil { |
|
|
|
@ -328,129 +435,28 @@ func (consensus *Consensus) Start( |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (consensus *Consensus) StartChannel() { |
|
|
|
|
consensus.mutex.Lock() |
|
|
|
|
consensus.isInitialLeader = consensus.isLeader() |
|
|
|
|
if consensus.isInitialLeader { |
|
|
|
|
consensus.start = true |
|
|
|
|
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") |
|
|
|
|
consensus.mutex.Unlock() |
|
|
|
|
consensus.ReadySignal <- SyncProposal |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
consensus.mutex.Unlock() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// syncReadyChan handles the "local sync caught up" event. Lowercase helper
// convention suggests consensus.mutex is expected to be held by the caller —
// TODO confirm against the main-loop call site.
//
// If the chain head has advanced past the consensus block number, it
// fast-forwards the block number and view IDs to one past the head, refreshes
// consensus information, and restarts the consensus timer. Otherwise, if the
// node is still in Syncing mode, it only refreshes consensus information and
// restarts the timer.
func (consensus *Consensus) syncReadyChan() {
	consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
	if consensus.getBlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 {
		// Jump to one past the current chain head and adopt its view ID
		// before recomputing the node's consensus role/mode.
		consensus.setBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1)
		consensus.setViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1)
		mode := consensus.updateConsensusInformation()
		consensus.current.SetMode(mode)
		consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
		consensus.consensusTimeout[timeoutConsensus].Start()
		consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC")
		consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc()
	} else if consensus.mode() == Syncing {
		// Corner case where sync is triggered before `onCommitted` and there is a race
		// for block insertion between consensus and downloader.
		mode := consensus.updateConsensusInformation()
		consensus.setMode(mode)
		consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
		consensus.consensusTimeout[timeoutConsensus].Start()
		consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc()
	}
}
|
|
|
|
|
|
|
|
|
func (consensus *Consensus) syncNotReadyChan() { |
|
|
|
|
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") |
|
|
|
|
consensus.setBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) |
|
|
|
|
consensus.current.SetMode(Syncing) |
|
|
|
|
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC") |
|
|
|
|
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Tick is the exported periodic entry point for the consensus timeout checks.
// It acquires the consensus mutex and delegates to the unexported tick.
func (consensus *Consensus) Tick() {
	consensus.mutex.Lock()
	defer consensus.mutex.Unlock()
	consensus.tick()
}
|
|
|
|
|
|
|
|
|
func (consensus *Consensus) tick() { |
|
|
|
|
if !consensus.start && consensus.isInitialLeader { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
for k, v := range consensus.consensusTimeout { |
|
|
|
|
// stop timer in listening mode
|
|
|
|
|
if consensus.current.Mode() == Listening { |
|
|
|
|
v.Stop() |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if consensus.current.Mode() == Syncing { |
|
|
|
|
// never stop bootstrap timer here in syncing mode as it only starts once
|
|
|
|
|
// if it is stopped, bootstrap will be stopped and nodes
|
|
|
|
|
// can't start view change or join consensus
|
|
|
|
|
// the bootstrap timer will be stopped once consensus is reached or view change
|
|
|
|
|
// is succeeded
|
|
|
|
|
if k != timeoutBootstrap { |
|
|
|
|
consensus.getLogger().Debug(). |
|
|
|
|
Str("k", k.String()). |
|
|
|
|
Str("Mode", consensus.current.Mode().String()). |
|
|
|
|
Msg("[ConsensusMainLoop] consensusTimeout stopped!!!") |
|
|
|
|
v.Stop() |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if !v.CheckExpire() { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if k != timeoutViewChange { |
|
|
|
|
consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops Consensus Timeout!!!") |
|
|
|
|
consensus.startViewChange() |
|
|
|
|
break |
|
|
|
|
} else { |
|
|
|
|
consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!") |
|
|
|
|
consensus.startViewChange() |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
// Close closes the consensus. If current is in normal commit phase, wait until the commit
|
|
|
|
|
// phase end.
|
|
|
|
|
func (consensus *Consensus) Close() error { |
|
|
|
|
if consensus.dHelper != nil { |
|
|
|
|
consensus.dHelper.close() |
|
|
|
|
} |
|
|
|
|
consensus.waitForCommit() |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (consensus *Consensus) BlockChannel(newBlock *types.Block) { |
|
|
|
|
consensus.GetLogger().Info(). |
|
|
|
|
Uint64("MsgBlockNum", newBlock.NumberU64()). |
|
|
|
|
Msg("[ConsensusMainLoop] Received Proposed New Block!") |
|
|
|
|
|
|
|
|
|
if newBlock.NumberU64() < consensus.BlockNum() { |
|
|
|
|
consensus.getLogger().Warn().Uint64("newBlockNum", newBlock.NumberU64()). |
|
|
|
|
Msg("[ConsensusMainLoop] received old block, abort") |
|
|
|
|
// waitForCommit wait extra 2 seconds for commit phase to finish
|
|
|
|
|
func (consensus *Consensus) waitForCommit() { |
|
|
|
|
if consensus.Mode() != Normal || consensus.phase.Get() != FBFTCommit { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
// Sleep to wait for the full block time
|
|
|
|
|
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time") |
|
|
|
|
time.AfterFunc(time.Until(consensus.NextBlockDue), func() { |
|
|
|
|
consensus.StartFinalityCount() |
|
|
|
|
consensus.mutex.Lock() |
|
|
|
|
defer consensus.mutex.Unlock() |
|
|
|
|
// Update time due for next block
|
|
|
|
|
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) |
|
|
|
|
|
|
|
|
|
startTime = time.Now() |
|
|
|
|
consensus.msgSender.Reset(newBlock.NumberU64()) |
|
|
|
|
// We only need to wait consensus is in normal commit phase
|
|
|
|
|
utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait") |
|
|
|
|
|
|
|
|
|
consensus.getLogger().Info(). |
|
|
|
|
Int("numTxs", len(newBlock.Transactions())). |
|
|
|
|
Int("numStakingTxs", len(newBlock.StakingTransactions())). |
|
|
|
|
Time("startTime", startTime). |
|
|
|
|
Int64("publicKeys", consensus.Decider.ParticipantsCount()). |
|
|
|
|
Msg("[ConsensusMainLoop] STARTING CONSENSUS") |
|
|
|
|
consensus.announce(newBlock) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
if consensus.dHelper != nil { |
|
|
|
|
consensus.dHelper.start() |
|
|
|
|
maxWait := time.Now().Add(2 * consensus.BlockPeriod) |
|
|
|
|
for time.Now().Before(maxWait) && consensus.GetConsensusPhase() == "Commit" { |
|
|
|
|
utils.Logger().Warn().Msg("[shutdown] wait for consensus finished") |
|
|
|
|
time.Sleep(time.Millisecond * 100) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -465,29 +471,24 @@ type LastMileBlockIter struct { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart
|
|
|
|
|
func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64, cb func(iter *LastMileBlockIter) error) error { |
|
|
|
|
func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64) (*LastMileBlockIter, error) { |
|
|
|
|
consensus.mutex.Lock() |
|
|
|
|
defer consensus.mutex.Unlock() |
|
|
|
|
|
|
|
|
|
return consensus.getLastMileBlockIter(bnStart, cb) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart
|
|
|
|
|
func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *LastMileBlockIter) error) error { |
|
|
|
|
if consensus.BlockVerifier == nil { |
|
|
|
|
return errors.New("consensus haven't initialized yet") |
|
|
|
|
return nil, errors.New("consensus haven't initialized yet") |
|
|
|
|
} |
|
|
|
|
blocks, _, err := consensus.getLastMileBlocksAndMsg(bnStart) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
return cb(&LastMileBlockIter{ |
|
|
|
|
return &LastMileBlockIter{ |
|
|
|
|
blockCandidates: blocks, |
|
|
|
|
fbftLog: consensus.FBFTLog, |
|
|
|
|
verify: consensus.BlockVerifier, |
|
|
|
|
curIndex: 0, |
|
|
|
|
logger: consensus.getLogger(), |
|
|
|
|
}) |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Next iterate to the next last mile block
|
|
|
|
@ -530,11 +531,12 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl |
|
|
|
|
// preCommitAndPropose commit the current block with 67% commit signatures and start
|
|
|
|
|
// proposing new block which will wait on the full commit signatures to finish
|
|
|
|
|
func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { |
|
|
|
|
//fmt.Println("preCommitAndPropose", utils.GetPort(), blk.NumberU64())
|
|
|
|
|
if blk == nil { |
|
|
|
|
return errors.New("block to pre-commit is nil") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
leaderPriKey, err := consensus.getConsensusLeaderPrivateKey() |
|
|
|
|
leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey() |
|
|
|
|
if err != nil { |
|
|
|
|
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] leader not found") |
|
|
|
|
return err |
|
|
|
@ -624,7 +626,7 @@ func (consensus *Consensus) tryCatchup() error { |
|
|
|
|
if consensus.BlockVerifier == nil { |
|
|
|
|
return errors.New("consensus haven't finished initialization") |
|
|
|
|
} |
|
|
|
|
initBN := consensus.getBlockNum() |
|
|
|
|
initBN := consensus.BlockNum() |
|
|
|
|
defer consensus.postCatchup(initBN) |
|
|
|
|
|
|
|
|
|
blks, msgs, err := consensus.getLastMileBlocksAndMsg(initBN) |
|
|
|
@ -638,7 +640,7 @@ func (consensus *Consensus) tryCatchup() error { |
|
|
|
|
} |
|
|
|
|
blk.SetCurrentCommitSig(msg.Payload) |
|
|
|
|
|
|
|
|
|
if err := consensus.verifyBlock(blk); err != nil { |
|
|
|
|
if err := consensus.VerifyBlock(blk); err != nil { |
|
|
|
|
consensus.getLogger().Err(err).Msg("[TryCatchup] failed block verifier") |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -647,6 +649,7 @@ func (consensus *Consensus) tryCatchup() error { |
|
|
|
|
consensus.getLogger().Error().Err(err).Msg("[TryCatchup] Failed to add block to chain") |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
//fmt.Println("tryCatchup ", utils.GetPort(), blk.NumberU64())
|
|
|
|
|
select { |
|
|
|
|
// TODO: Remove this when removing dns sync and stream sync is fully up
|
|
|
|
|
case consensus.VerifiedNewBlock <- blk: |
|
|
|
@ -661,6 +664,8 @@ func (consensus *Consensus) tryCatchup() error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { |
|
|
|
|
// this function evaluates for all, leader and validators.
|
|
|
|
|
|
|
|
|
|
if consensus.Blockchain().CurrentBlock().NumberU64() < blk.NumberU64() { |
|
|
|
|
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil { |
|
|
|
|
consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain") |
|
|
|
@ -674,102 +679,58 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
consensus.FinishFinalityCount() |
|
|
|
|
go func() { |
|
|
|
|
consensus.PostConsensusJob(blk) |
|
|
|
|
}() |
|
|
|
|
consensus.setupForNewConsensus(blk, committedMsg) |
|
|
|
|
consensus.PostConsensusJob(blk) |
|
|
|
|
consensus.SetupForNewConsensus(blk, committedMsg) |
|
|
|
|
utils.Logger().Info().Uint64("blockNum", blk.NumberU64()). |
|
|
|
|
Str("hash", blk.Header().Hash().Hex()). |
|
|
|
|
Msg("Added New Block to Blockchain!!!") |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// rotateLeader rotates the leader to the next leader in the committee.
|
|
|
|
|
// This function must be called with enabled leader rotation.
|
|
|
|
|
func (consensus *Consensus) rotateLeader(epoch *big.Int) { |
|
|
|
|
prev := consensus.getLeaderPubKey() |
|
|
|
|
bc := consensus.Blockchain() |
|
|
|
|
curNumber := bc.CurrentHeader().Number().Uint64() |
|
|
|
|
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v numblocks:%d", epoch.Uint64(), bc.Config().IsLeaderRotation(epoch), bc.Config().LeaderRotationBlocksCount) |
|
|
|
|
leader := consensus.getLeaderPubKey() |
|
|
|
|
for i := 0; i < bc.Config().LeaderRotationBlocksCount; i++ { |
|
|
|
|
header := bc.GetHeaderByNumber(curNumber - uint64(i)) |
|
|
|
|
if header == nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
// Previous epoch, we should not change leader.
|
|
|
|
|
if header.Epoch().Uint64() != epoch.Uint64() { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
// Check if the same leader.
|
|
|
|
|
pub, err := bc.GetLeaderPubKeyFromCoinbase(header) |
|
|
|
|
if err != nil { |
|
|
|
|
utils.Logger().Error().Err(err).Msg("Failed to get leader public key from coinbase") |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if !pub.Object.IsEqual(leader.Object) { |
|
|
|
|
// Another leader.
|
|
|
|
|
return |
|
|
|
|
// SetupForNewConsensus sets the state for new consensus
|
|
|
|
|
func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { |
|
|
|
|
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) |
|
|
|
|
curBlockViewID := consensus.SetCurBlockViewID(committedMsg.ViewID + 1) // first view id is going to be 2.
|
|
|
|
|
prev := consensus.GetLeaderPubKey() |
|
|
|
|
idx := consensus.SetLeaderIndex(func(i int) int { |
|
|
|
|
if curBlockViewID%3 == 0 { |
|
|
|
|
return i + 1 |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Passed all checks, we can change leader.
|
|
|
|
|
var ( |
|
|
|
|
wasFound bool |
|
|
|
|
next *bls.PublicKeyWrapper |
|
|
|
|
) |
|
|
|
|
if consensus.ShardID == shard.BeaconChainShardID { |
|
|
|
|
wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, 1) |
|
|
|
|
} else { |
|
|
|
|
wasFound, next = consensus.Decider.NthNext(leader, 1) |
|
|
|
|
} |
|
|
|
|
if !wasFound { |
|
|
|
|
utils.Logger().Error().Msg("Failed to get next leader") |
|
|
|
|
return |
|
|
|
|
} else { |
|
|
|
|
consensus.setLeaderPubKey(next) |
|
|
|
|
} |
|
|
|
|
if consensus.isLeader() && !consensus.getLeaderPubKey().Object.IsEqual(prev.Object) { |
|
|
|
|
return i |
|
|
|
|
}) |
|
|
|
|
pps := consensus.Decider.Participants() |
|
|
|
|
consensus.pubKeyLock.Lock() |
|
|
|
|
consensus.LeaderPubKey = &pps[idx%len(pps)] |
|
|
|
|
fmt.Printf("SetupForNewConsensus :%d idx: %d future v%d new: %s prev: %s %q\n", utils.GetPort(), idx, curBlockViewID, consensus.LeaderPubKey.Bytes.Hex(), prev.Bytes.Hex(), consensus.isLeader()) |
|
|
|
|
consensus.pubKeyLock.Unlock() |
|
|
|
|
if consensus.IsLeader() && !consensus.GetLeaderPubKey().Object.IsEqual(prev.Object) { |
|
|
|
|
// leader changed
|
|
|
|
|
go func() { |
|
|
|
|
fmt.Printf("ReadySignal :%d for leader %s\n", utils.GetPort(), consensus.GetLeaderPubKey().Bytes.Hex()) |
|
|
|
|
defer fmt.Printf("Defer ReadySignal :%d for leader %s\n", utils.GetPort(), consensus.GetLeaderPubKey().Bytes.Hex()) |
|
|
|
|
consensus.ReadySignal <- SyncProposal |
|
|
|
|
|
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SetupForNewConsensus sets the state for new consensus
|
|
|
|
|
func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { |
|
|
|
|
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) |
|
|
|
|
consensus.setCurBlockViewID(committedMsg.ViewID + 1) |
|
|
|
|
consensus.LeaderPubKey = committedMsg.SenderPubkeys[0] |
|
|
|
|
var epoch *big.Int |
|
|
|
|
if blk.IsLastBlockInEpoch() { |
|
|
|
|
epoch = new(big.Int).Add(blk.Epoch(), common.Big1) |
|
|
|
|
} else { |
|
|
|
|
epoch = blk.Epoch() |
|
|
|
|
} |
|
|
|
|
if consensus.Blockchain().Config().IsLeaderRotation(epoch) { |
|
|
|
|
consensus.rotateLeader(epoch) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Update consensus keys at last so the change of leader status doesn't mess up normal flow
|
|
|
|
|
if blk.IsLastBlockInEpoch() { |
|
|
|
|
consensus.setMode(consensus.updateConsensusInformation()) |
|
|
|
|
consensus.SetMode(consensus.UpdateConsensusInformation()) |
|
|
|
|
} |
|
|
|
|
consensus.FBFTLog.PruneCacheBeforeBlock(blk.NumberU64()) |
|
|
|
|
consensus.resetState() |
|
|
|
|
consensus.ResetState() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (consensus *Consensus) postCatchup(initBN uint64) { |
|
|
|
|
if initBN < consensus.getBlockNum() { |
|
|
|
|
if initBN < consensus.BlockNum() { |
|
|
|
|
consensus.getLogger().Info(). |
|
|
|
|
Uint64("From", initBN). |
|
|
|
|
Uint64("To", consensus.getBlockNum()). |
|
|
|
|
Uint64("To", consensus.BlockNum()). |
|
|
|
|
Msg("[TryCatchup] Caught up!") |
|
|
|
|
consensus.switchPhase("TryCatchup", FBFTAnnounce) |
|
|
|
|
} |
|
|
|
|
// catch up and skip from view change trap
|
|
|
|
|
if initBN < consensus.getBlockNum() && consensus.isViewChangingMode() { |
|
|
|
|
if initBN < consensus.BlockNum() && consensus.IsViewChangingMode() { |
|
|
|
|
consensus.current.SetMode(Normal) |
|
|
|
|
consensus.consensusTimeout[timeoutViewChange].Stop() |
|
|
|
|
} |
|
|
|
@ -777,7 +738,7 @@ func (consensus *Consensus) postCatchup(initBN uint64) { |
|
|
|
|
|
|
|
|
|
// GenerateVrfAndProof generates new VRF/Proof from hash of previous block
|
|
|
|
|
func (consensus *Consensus) GenerateVrfAndProof(newHeader *block.Header) error { |
|
|
|
|
key, err := consensus.getConsensusLeaderPrivateKey() |
|
|
|
|
key, err := consensus.GetConsensusLeaderPrivateKey() |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.New("[GenerateVrfAndProof] no leader private key provided") |
|
|
|
|
} |
|
|
|
@ -830,7 +791,7 @@ func (consensus *Consensus) GenerateVdfAndProof(newBlock *types.Block, vrfBlockN |
|
|
|
|
start := time.Now() |
|
|
|
|
vdf.Execute() |
|
|
|
|
duration := time.Since(start) |
|
|
|
|
consensus.GetLogger().Info(). |
|
|
|
|
consensus.getLogger().Info(). |
|
|
|
|
Dur("duration", duration). |
|
|
|
|
Msg("[ConsensusMainLoop] VDF computation finished") |
|
|
|
|
output := <-outputChannel |
|
|
|
|