diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index 9bcc6e8af..e24ab9189 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -87,6 +87,8 @@ type StateSync struct { // AddLastMileBlock add the lastest a few block into queue for syncing func (ss *StateSync) AddLastMileBlock(block *types.Block) { + ss.syncMux.Lock() + defer ss.syncMux.Unlock() ss.lastMileBlocks = append(ss.lastMileBlocks, block) } diff --git a/consensus/consensus.go b/consensus/consensus.go index 743911f6e..06a41d0d9 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -92,7 +92,7 @@ type Consensus struct { OnConsensusDone func(*types.Block) // current consensus block to check if out of sync - ConsensusBlock chan *types.Block + ConsensusBlock chan *BFTBlockInfo // verified block to state sync broadcast VerifiedNewBlock chan *types.Block @@ -108,6 +108,13 @@ type Consensus struct { OfflinePeerList []p2p.Peer } +// BFTBlockInfo send the latest block that was in BFT consensus process as well as its consensusID to state syncing +// consensusID is necessary to make sure the out of sync node can enter the correct view +type BFTBlockInfo struct { + Block *types.Block + ConsensusID uint32 +} + // BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far // This is mainly used in the case that this node is lagging behind and needs to catch up. // For example, the consensus moved to round N and this node received message(N). @@ -119,6 +126,16 @@ type BlockConsensusStatus struct { state State // the latest state of the consensus } +// UpdateConsensusID is used to update latest consensusID for nodes that out of sync +func (consensus *Consensus) UpdateConsensusID(consensusID uint32) { + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + if consensus.consensusID < consensusID { + utils.GetLogInstance().Debug("update consensusID", "myConsensusID", consensus.consensusID, "newConsensusID", consensusID) + consensus.consensusID = consensusID + } +} + // New creates a new Consensus object func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus { consensus := Consensus{} @@ -197,7 +214,8 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons } // Checks the basic meta of a consensus message. -func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Message, publicKey *bls.PublicKey) bool { +// +func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Message, publicKey *bls.PublicKey) error { consensusID := message.ConsensusId blockHash := message.BlockHash @@ -205,20 +223,20 @@ func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Messag err := verifyMessageSig(publicKey, message) if err != nil { utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) - return false + return ErrInvalidConsensusMessage } // check consensus Id if consensusID != consensus.consensusID { utils.GetLogInstance().Warn("Wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus) - return false + return ErrConsensusIDNotMatch } if !bytes.Equal(blockHash, consensus.blockHash[:]) { utils.GetLogInstance().Warn("Wrong blockHash", "consensus", consensus) - return false + return ErrInvalidConsensusMessage } - return true + return nil } // Gets the validator peer based on validator ID. diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index b3b918cef..e1fcb241b 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -123,7 +123,7 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag validatorPeer := consensus.getValidatorPeerByID(validatorID) - if !consensus.checkConsensusMessage(message, validatorPeer.PubKey) { + if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID) return } @@ -185,7 +185,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message validatorPeer := consensus.getValidatorPeerByID(validatorID) - if !consensus.checkConsensusMessage(message, validatorPeer.PubKey) { + if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID) return } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 8374d6efa..783095173 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -12,6 +12,30 @@ import ( "github.com/harmony-one/harmony/internal/utils" ) +// sendBFTBlockToStateSyncing will send the latest BFT consensus block to state syncing checkingjjkkkkkkkkkkkkkkkjnjk +func (consensus *Consensus) sendBFTBlockToStateSyncing(consensusID uint32) { + // validator send consensus block to state syncing + if val, ok := consensus.blocksReceived[consensusID]; ok { + consensus.mutex.Lock() + delete(consensus.blocksReceived, consensusID) + consensus.mutex.Unlock() + + var blockObj types.Block + err := rlp.DecodeBytes(val.block, &blockObj) + if err != nil { + utils.GetLogInstance().Debug("failed to construct the cached block") + return + } + blockInfo := &BFTBlockInfo{Block: &blockObj, ConsensusID: consensusID} + select { + case consensus.ConsensusBlock <- blockInfo: + default: + utils.GetLogInstance().Warn("consensus block unable to sent to state sync", "height", blockObj.NumberU64(), "blockHash", blockObj.Hash().Hex()) + } + } + return +} + // ProcessMessageValidator dispatches validator's consensus message. func (consensus *Consensus) ProcessMessageValidator(payload []byte) { message := consensus_proto.Message{} @@ -40,11 +64,20 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa blockHash := message.BlockHash block := message.Payload + // Add block to received block cache + consensus.mutex.Lock() + consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state} + consensus.mutex.Unlock() + copy(consensus.blockHash[:], blockHash[:]) consensus.block = block - if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) { + if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the leader message") + if err == ErrConsensusIDNotMatch { + utils.GetLogInstance().Debug("sending bft block to state syncing") + consensus.sendBFTBlockToStateSyncing(consensusID) + } return } @@ -56,11 +89,6 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa return } - // Add block to received block cache - consensus.mutex.Lock() - consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state} - consensus.mutex.Unlock() - // Add attack model of IncorrectResponse if attack.GetInstance().IncorrectResponse() { utils.GetLogInstance().Warn("IncorrectResponse attacked") @@ -102,8 +130,8 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa // Update readyByConsensus for attack. attack.GetInstance().UpdateConsensusReady(consensusID) - if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) { - utils.GetLogInstance().Debug("Failed to check the leader message") + if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { + utils.GetLogInstance().Debug("processPreparedMessage error", "error", err) return } @@ -161,8 +189,8 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess // Update readyByConsensus for attack. attack.GetInstance().UpdateConsensusReady(consensusID) - if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) { - utils.GetLogInstance().Debug("Failed to check the leader message") + if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { + utils.GetLogInstance().Debug("processCommittedMessage error", "error", err) return } @@ -195,6 +223,7 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess consensus.state = CommittedDone // TODO: the block catch up logic is a temporary workaround for full failure node catchup. Implement the full node catchup logic // The logic is to roll up to the latest blocks one by one to try catching up with the leader. + // but because of checkConsensusMessage, the catchup logic will never be used here for { val, ok := consensus.blocksReceived[consensus.consensusID] if ok { @@ -205,10 +234,6 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess var blockObj types.Block err := rlp.DecodeBytes(val.block, &blockObj) - if err != nil { - utils.GetLogInstance().Warn("Unparseable block header data", "error", err) - return - } if err != nil { utils.GetLogInstance().Debug("failed to construct the new block after consensus") } diff --git a/consensus/errors.go b/consensus/errors.go index a005c5f63..6156c4aed 100644 --- a/consensus/errors.go +++ b/consensus/errors.go @@ -34,4 +34,10 @@ var ( // ErrInvalidNumber is returned if a block's number doesn't equal it's parent's // plus one. ErrInvalidNumber = errors.New("invalid block number") + + // ErrConsensusIDNotMatch is returned if the current consensusID is not equal message's consensusID + ErrConsensusIDNotMatch = errors.New("consensusID not match") + + // ErrInvalidConsensusMessage is returned is the consensus message received is invalid + ErrInvalidConsensusMessage = errors.New("invalid consensus message") ) diff --git a/node/node.go b/node/node.go index 584696976..375662208 100644 --- a/node/node.go +++ b/node/node.go @@ -91,7 +91,7 @@ func (state State) String() string { // Constants related to doing syncing. const ( lastMileThreshold = 4 - inSyncThreshold = 2 + inSyncThreshold = 1 ) const ( @@ -280,7 +280,7 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey), node.Consensus.ShardID) node.AddSmartContractsToPendingTransactions() - node.Consensus.ConsensusBlock = make(chan *types.Block) + node.Consensus.ConsensusBlock = make(chan *bft.BFTBlockInfo) node.Consensus.VerifiedNewBlock = make(chan *types.Block) } @@ -301,11 +301,16 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { } // IsOutOfSync checks whether the node is out of sync by comparing latest block with consensus block -func (node *Node) IsOutOfSync(consensusBlock *types.Block) bool { +func (node *Node) IsOutOfSync(consensusBlockInfo *bft.BFTBlockInfo) bool { + consensusBlock := consensusBlockInfo.Block + consensusID := consensusBlockInfo.ConsensusID + myHeight := node.blockchain.CurrentBlock().NumberU64() newHeight := consensusBlock.NumberU64() utils.GetLogInstance().Debug("[SYNC]", "myHeight", myHeight, "newHeight", newHeight) if newHeight-myHeight <= inSyncThreshold { + node.stateSync.AddLastMileBlock(consensusBlock) + node.Consensus.UpdateConsensusID(consensusID + 1) return false } // cache latest blocks for last mile catch up @@ -321,21 +326,24 @@ func (node *Node) DoSyncing() { select { // in current implementation logic, timeout means in sync case <-time.After(5 * time.Second): + //myHeight := node.blockchain.CurrentBlock().NumberU64() + //utils.GetLogInstance().Debug("[SYNC]", "currentHeight", myHeight) node.stateMutex.Lock() node.State = NodeReadyForConsensus node.stateMutex.Unlock() continue - case consensusBlock := <-node.Consensus.ConsensusBlock: - // never reached from chao - if !node.IsOutOfSync(consensusBlock) { + case consensusBlockInfo := <-node.Consensus.ConsensusBlock: + if !node.IsOutOfSync(consensusBlockInfo) { if node.State == NodeNotInSync { utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") - node.stateSync.CloseConnections() - node.stateSync = nil } node.stateMutex.Lock() node.State = NodeReadyForConsensus node.stateMutex.Unlock() + // wait for last mile block finish; think a better way + time.Sleep(200 * time.Millisecond) + node.stateSync.CloseConnections() + node.stateSync = nil continue } else { utils.GetLogInstance().Debug("[SYNC] node is out of sync") diff --git a/node/node_handler.go b/node/node_handler.go index 674b19b2e..881935775 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -200,17 +200,8 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) bool { err := node.blockchain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) if err != nil { utils.GetLogInstance().Debug("Failed verifying new block", "Error", err, "tx", newBlock.Transactions()[0]) - - // send consensus block to state syncing - select { - case node.Consensus.ConsensusBlock <- newBlock: - default: - utils.GetLogInstance().Warn("consensus block unable to sent to state sync", "height", newBlock.NumberU64(), "blockHash", newBlock.Hash().Hex()) - } - return false } - return true }