|
|
|
@ -12,6 +12,30 @@ import ( |
|
|
|
|
"github.com/harmony-one/harmony/internal/utils" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// sendBFTBlockToStateSyncing will send the latest BFT consensus block to state syncing checkingjjkkkkkkkkkkkkkkkjnjk
|
|
|
|
|
func (consensus *Consensus) sendBFTBlockToStateSyncing(consensusID uint32) { |
|
|
|
|
// validator send consensus block to state syncing
|
|
|
|
|
if val, ok := consensus.blocksReceived[consensusID]; ok { |
|
|
|
|
consensus.mutex.Lock() |
|
|
|
|
delete(consensus.blocksReceived, consensusID) |
|
|
|
|
consensus.mutex.Unlock() |
|
|
|
|
|
|
|
|
|
var blockObj types.Block |
|
|
|
|
err := rlp.DecodeBytes(val.block, &blockObj) |
|
|
|
|
if err != nil { |
|
|
|
|
utils.GetLogInstance().Debug("failed to construct the cached block") |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
blockInfo := &BFTBlockInfo{Block: &blockObj, ConsensusID: consensusID} |
|
|
|
|
select { |
|
|
|
|
case consensus.ConsensusBlock <- blockInfo: |
|
|
|
|
default: |
|
|
|
|
utils.GetLogInstance().Warn("consensus block unable to sent to state sync", "height", blockObj.NumberU64(), "blockHash", blockObj.Hash().Hex()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ProcessMessageValidator dispatches validator's consensus message.
|
|
|
|
|
func (consensus *Consensus) ProcessMessageValidator(payload []byte) { |
|
|
|
|
message := consensus_proto.Message{} |
|
|
|
@ -40,11 +64,20 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa |
|
|
|
|
blockHash := message.BlockHash |
|
|
|
|
block := message.Payload |
|
|
|
|
|
|
|
|
|
// Add block to received block cache
|
|
|
|
|
consensus.mutex.Lock() |
|
|
|
|
consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state} |
|
|
|
|
consensus.mutex.Unlock() |
|
|
|
|
|
|
|
|
|
copy(consensus.blockHash[:], blockHash[:]) |
|
|
|
|
consensus.block = block |
|
|
|
|
|
|
|
|
|
if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) { |
|
|
|
|
if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { |
|
|
|
|
utils.GetLogInstance().Debug("Failed to check the leader message") |
|
|
|
|
if err == ErrConsensusIDNotMatch { |
|
|
|
|
utils.GetLogInstance().Debug("sending bft block to state syncing") |
|
|
|
|
consensus.sendBFTBlockToStateSyncing(consensusID) |
|
|
|
|
} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -56,11 +89,6 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Add block to received block cache
|
|
|
|
|
consensus.mutex.Lock() |
|
|
|
|
consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state} |
|
|
|
|
consensus.mutex.Unlock() |
|
|
|
|
|
|
|
|
|
// Add attack model of IncorrectResponse
|
|
|
|
|
if attack.GetInstance().IncorrectResponse() { |
|
|
|
|
utils.GetLogInstance().Warn("IncorrectResponse attacked") |
|
|
|
@ -102,8 +130,8 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa |
|
|
|
|
// Update readyByConsensus for attack.
|
|
|
|
|
attack.GetInstance().UpdateConsensusReady(consensusID) |
|
|
|
|
|
|
|
|
|
if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) { |
|
|
|
|
utils.GetLogInstance().Debug("Failed to check the leader message") |
|
|
|
|
if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { |
|
|
|
|
utils.GetLogInstance().Debug("processPreparedMessage error", "error", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -161,8 +189,8 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess |
|
|
|
|
// Update readyByConsensus for attack.
|
|
|
|
|
attack.GetInstance().UpdateConsensusReady(consensusID) |
|
|
|
|
|
|
|
|
|
if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) { |
|
|
|
|
utils.GetLogInstance().Debug("Failed to check the leader message") |
|
|
|
|
if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { |
|
|
|
|
utils.GetLogInstance().Debug("processCommittedMessage error", "error", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -195,6 +223,7 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess |
|
|
|
|
consensus.state = CommittedDone |
|
|
|
|
// TODO: the block catch up logic is a temporary workaround for full failure node catchup. Implement the full node catchup logic
|
|
|
|
|
// The logic is to roll up to the latest blocks one by one to try catching up with the leader.
|
|
|
|
|
// but because of checkConsensusMessage, the catchup logic will never be used here
|
|
|
|
|
for { |
|
|
|
|
val, ok := consensus.blocksReceived[consensus.consensusID] |
|
|
|
|
if ok { |
|
|
|
@ -205,10 +234,6 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess |
|
|
|
|
|
|
|
|
|
var blockObj types.Block |
|
|
|
|
err := rlp.DecodeBytes(val.block, &blockObj) |
|
|
|
|
if err != nil { |
|
|
|
|
utils.GetLogInstance().Warn("Unparseable block header data", "error", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if err != nil { |
|
|
|
|
utils.GetLogInstance().Debug("failed to construct the new block after consensus") |
|
|
|
|
} |
|
|
|
|