make state syncing compatible with new BLS consensus code

last mile issue need to be fixed with nonce too high err
pull/372/head
chaosma 6 years ago committed by GitHub
parent 5ddab11138
commit cd6475c7e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      api/service/syncing/syncing.go
  2. 30
      consensus/consensus.go
  3. 4
      consensus/consensus_leader.go
  4. 53
      consensus/consensus_validator.go
  5. 6
      consensus/errors.go
  6. 24
      node/node.go
  7. 9
      node/node_handler.go

@ -87,6 +87,8 @@ type StateSync struct {
// AddLastMileBlock add the lastest a few block into queue for syncing
func (ss *StateSync) AddLastMileBlock(block *types.Block) {
ss.syncMux.Lock()
defer ss.syncMux.Unlock()
ss.lastMileBlocks = append(ss.lastMileBlocks, block)
}

@ -92,7 +92,7 @@ type Consensus struct {
OnConsensusDone func(*types.Block)
// current consensus block to check if out of sync
ConsensusBlock chan *types.Block
ConsensusBlock chan *BFTBlockInfo
// verified block to state sync broadcast
VerifiedNewBlock chan *types.Block
@ -108,6 +108,13 @@ type Consensus struct {
OfflinePeerList []p2p.Peer
}
// BFTBlockInfo send the latest block that was in BFT consensus process as well as its consensusID to state syncing
// consensusID is necessary to make sure the out of sync node can enter the correct view
type BFTBlockInfo struct {
Block *types.Block
ConsensusID uint32
}
// BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far
// This is mainly used in the case that this node is lagging behind and needs to catch up.
// For example, the consensus moved to round N and this node received message(N).
@ -119,6 +126,16 @@ type BlockConsensusStatus struct {
state State // the latest state of the consensus
}
// UpdateConsensusID is used to update latest consensusID for nodes that out of sync
func (consensus *Consensus) UpdateConsensusID(consensusID uint32) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
if consensus.consensusID < consensusID {
utils.GetLogInstance().Debug("update consensusID", "myConsensusID", consensus.consensusID, "newConsensusID", consensusID)
consensus.consensusID = consensusID
}
}
// New creates a new Consensus object
func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus {
consensus := Consensus{}
@ -197,7 +214,8 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons
}
// Checks the basic meta of a consensus message.
func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Message, publicKey *bls.PublicKey) bool {
//
func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Message, publicKey *bls.PublicKey) error {
consensusID := message.ConsensusId
blockHash := message.BlockHash
@ -205,20 +223,20 @@ func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Messag
err := verifyMessageSig(publicKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err)
return false
return ErrInvalidConsensusMessage
}
// check consensus Id
if consensusID != consensus.consensusID {
utils.GetLogInstance().Warn("Wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
return false
return ErrConsensusIDNotMatch
}
if !bytes.Equal(blockHash, consensus.blockHash[:]) {
utils.GetLogInstance().Warn("Wrong blockHash", "consensus", consensus)
return false
return ErrInvalidConsensusMessage
}
return true
return nil
}
// Gets the validator peer based on validator ID.

@ -123,7 +123,7 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag
validatorPeer := consensus.getValidatorPeerByID(validatorID)
if !consensus.checkConsensusMessage(message, validatorPeer.PubKey) {
if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil {
utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID)
return
}
@ -185,7 +185,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
validatorPeer := consensus.getValidatorPeerByID(validatorID)
if !consensus.checkConsensusMessage(message, validatorPeer.PubKey) {
if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil {
utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID)
return
}

@ -12,6 +12,30 @@ import (
"github.com/harmony-one/harmony/internal/utils"
)
// sendBFTBlockToStateSyncing will send the latest BFT consensus block to state syncing checkingjjkkkkkkkkkkkkkkkjnjk
func (consensus *Consensus) sendBFTBlockToStateSyncing(consensusID uint32) {
// validator send consensus block to state syncing
if val, ok := consensus.blocksReceived[consensusID]; ok {
consensus.mutex.Lock()
delete(consensus.blocksReceived, consensusID)
consensus.mutex.Unlock()
var blockObj types.Block
err := rlp.DecodeBytes(val.block, &blockObj)
if err != nil {
utils.GetLogInstance().Debug("failed to construct the cached block")
return
}
blockInfo := &BFTBlockInfo{Block: &blockObj, ConsensusID: consensusID}
select {
case consensus.ConsensusBlock <- blockInfo:
default:
utils.GetLogInstance().Warn("consensus block unable to sent to state sync", "height", blockObj.NumberU64(), "blockHash", blockObj.Hash().Hex())
}
}
return
}
// ProcessMessageValidator dispatches validator's consensus message.
func (consensus *Consensus) ProcessMessageValidator(payload []byte) {
message := consensus_proto.Message{}
@ -40,11 +64,20 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa
blockHash := message.BlockHash
block := message.Payload
// Add block to received block cache
consensus.mutex.Lock()
consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state}
consensus.mutex.Unlock()
copy(consensus.blockHash[:], blockHash[:])
consensus.block = block
if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) {
if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil {
utils.GetLogInstance().Debug("Failed to check the leader message")
if err == ErrConsensusIDNotMatch {
utils.GetLogInstance().Debug("sending bft block to state syncing")
consensus.sendBFTBlockToStateSyncing(consensusID)
}
return
}
@ -56,11 +89,6 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa
return
}
// Add block to received block cache
consensus.mutex.Lock()
consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state}
consensus.mutex.Unlock()
// Add attack model of IncorrectResponse
if attack.GetInstance().IncorrectResponse() {
utils.GetLogInstance().Warn("IncorrectResponse attacked")
@ -102,8 +130,8 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa
// Update readyByConsensus for attack.
attack.GetInstance().UpdateConsensusReady(consensusID)
if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) {
utils.GetLogInstance().Debug("Failed to check the leader message")
if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil {
utils.GetLogInstance().Debug("processPreparedMessage error", "error", err)
return
}
@ -161,8 +189,8 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess
// Update readyByConsensus for attack.
attack.GetInstance().UpdateConsensusReady(consensusID)
if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) {
utils.GetLogInstance().Debug("Failed to check the leader message")
if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil {
utils.GetLogInstance().Debug("processCommittedMessage error", "error", err)
return
}
@ -195,6 +223,7 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess
consensus.state = CommittedDone
// TODO: the block catch up logic is a temporary workaround for full failure node catchup. Implement the full node catchup logic
// The logic is to roll up to the latest blocks one by one to try catching up with the leader.
// but because of checkConsensusMessage, the catchup logic will never be used here
for {
val, ok := consensus.blocksReceived[consensus.consensusID]
if ok {
@ -205,10 +234,6 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess
var blockObj types.Block
err := rlp.DecodeBytes(val.block, &blockObj)
if err != nil {
utils.GetLogInstance().Warn("Unparseable block header data", "error", err)
return
}
if err != nil {
utils.GetLogInstance().Debug("failed to construct the new block after consensus")
}

@ -34,4 +34,10 @@ var (
// ErrInvalidNumber is returned if a block's number doesn't equal it's parent's
// plus one.
ErrInvalidNumber = errors.New("invalid block number")
// ErrConsensusIDNotMatch is returned if the current consensusID is not equal message's consensusID
ErrConsensusIDNotMatch = errors.New("consensusID not match")
// ErrInvalidConsensusMessage is returned is the consensus message received is invalid
ErrInvalidConsensusMessage = errors.New("invalid consensus message")
)

@ -91,7 +91,7 @@ func (state State) String() string {
// Constants related to doing syncing.
const (
lastMileThreshold = 4
inSyncThreshold = 2
inSyncThreshold = 1
)
const (
@ -280,7 +280,7 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey), node.Consensus.ShardID)
node.AddSmartContractsToPendingTransactions()
node.Consensus.ConsensusBlock = make(chan *types.Block)
node.Consensus.ConsensusBlock = make(chan *bft.BFTBlockInfo)
node.Consensus.VerifiedNewBlock = make(chan *types.Block)
}
@ -301,11 +301,16 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
}
// IsOutOfSync checks whether the node is out of sync by comparing latest block with consensus block
func (node *Node) IsOutOfSync(consensusBlock *types.Block) bool {
func (node *Node) IsOutOfSync(consensusBlockInfo *bft.BFTBlockInfo) bool {
consensusBlock := consensusBlockInfo.Block
consensusID := consensusBlockInfo.ConsensusID
myHeight := node.blockchain.CurrentBlock().NumberU64()
newHeight := consensusBlock.NumberU64()
utils.GetLogInstance().Debug("[SYNC]", "myHeight", myHeight, "newHeight", newHeight)
if newHeight-myHeight <= inSyncThreshold {
node.stateSync.AddLastMileBlock(consensusBlock)
node.Consensus.UpdateConsensusID(consensusID + 1)
return false
}
// cache latest blocks for last mile catch up
@ -321,21 +326,24 @@ func (node *Node) DoSyncing() {
select {
// in current implementation logic, timeout means in sync
case <-time.After(5 * time.Second):
//myHeight := node.blockchain.CurrentBlock().NumberU64()
//utils.GetLogInstance().Debug("[SYNC]", "currentHeight", myHeight)
node.stateMutex.Lock()
node.State = NodeReadyForConsensus
node.stateMutex.Unlock()
continue
case consensusBlock := <-node.Consensus.ConsensusBlock:
// never reached from chao
if !node.IsOutOfSync(consensusBlock) {
case consensusBlockInfo := <-node.Consensus.ConsensusBlock:
if !node.IsOutOfSync(consensusBlockInfo) {
if node.State == NodeNotInSync {
utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!")
node.stateSync.CloseConnections()
node.stateSync = nil
}
node.stateMutex.Lock()
node.State = NodeReadyForConsensus
node.stateMutex.Unlock()
// wait for last mile block finish; think a better way
time.Sleep(200 * time.Millisecond)
node.stateSync.CloseConnections()
node.stateSync = nil
continue
} else {
utils.GetLogInstance().Debug("[SYNC] node is out of sync")

@ -200,17 +200,8 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) bool {
err := node.blockchain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey))
if err != nil {
utils.GetLogInstance().Debug("Failed verifying new block", "Error", err, "tx", newBlock.Transactions()[0])
// send consensus block to state syncing
select {
case node.Consensus.ConsensusBlock <- newBlock:
default:
utils.GetLogInstance().Warn("consensus block unable to sent to state sync", "height", newBlock.NumberU64(), "blockHash", newBlock.Hash().Hex())
}
return false
}
return true
}

Loading…
Cancel
Save