Fix node sync issue which caused failure to sign and the node being kicked out (#2564)

* Only roll up last crosslink for beacon chain

* Fix node syncing issue

* Add more logging

* Fix sync loop

* Remove prepared check during syncing

* Update debug info
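
The core of the fix is easiest to see in the node/node_syncing.go hunks at the
bottom: instead of sleeping for syncFreq seconds at the end of every loop
iteration, the sync loop now selects on a periodic ticker and on the consensus
channel BlockNumLowChan, so a low-block-number signal from consensus triggers a
sync immediately. A minimal, self-contained sketch of that pattern (the channel
and timings here are stand-ins, not the real node wiring):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        ticker := time.NewTicker(2 * time.Second) // periodic sync, like syncFreq
        defer ticker.Stop()
        blockNumLow := make(chan struct{}) // stands in for consensus.BlockNumLowChan

        // Simulate consensus noticing that the local block number is behind.
        go func() {
            time.Sleep(300 * time.Millisecond)
            blockNumLow <- struct{}{}
        }()

        for i := 0; i < 2; i++ {
            select {
            case <-ticker.C:
                fmt.Println("periodic sync tick")
            case <-blockNumLow:
                fmt.Println("on-demand sync: consensus reports low block number")
            }
        }
    }

Because the wait now happens inside select rather than in a trailing Sleep, an
on-demand signal no longer has to wait out the remainder of the sync interval.
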
Rongjian Lan, committed via GitHub
commit 2cbb3fb2e1
parent 146e1c2f80
8 changed files (lines changed per file):

   2  api/service/syncing/syncing.go
  15  consensus/checks.go
   9  consensus/consensus.go
  18  consensus/consensus_v2.go
   6  consensus/validator.go
   5  core/offchain.go
   6  node/node_handler.go
  21  node/node_syncing.go

api/service/syncing/syncing.go
@@ -805,7 +805,7 @@ Loop:
 		}
 		ss.purgeOldBlocksFromCache()
 		if consensus != nil {
-			consensus.UpdateConsensusInformation()
+			consensus.SetMode(consensus.UpdateConsensusInformation())
 		}
 	}
 }
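
The same call-site change appears again in node/node_handler.go below:
UpdateConsensusInformation evidently now returns the mode the node should enter
(Normal, Syncing, Listening, ...), and callers must apply it with SetMode
instead of dropping it. A hedged sketch of the shape of the fix; the types and
method bodies are placeholders, not Harmony's implementation:

    package main

    import "fmt"

    type Mode int

    const (
        Normal Mode = iota
        Syncing
        Listening
    )

    type Consensus struct{ mode Mode }

    func (c *Consensus) SetMode(m Mode) { c.mode = m }

    // Placeholder: recompute committee membership and report the mode the
    // node should switch to (e.g. Listening when it left the committee).
    func (c *Consensus) UpdateConsensusInformation() Mode { return Listening }

    func main() {
        c := &Consensus{mode: Normal}
        // Before the fix the return value was silently dropped:
        //   c.UpdateConsensusInformation()
        // After the fix the returned mode is actually applied:
        c.SetMode(c.UpdateConsensusInformation())
        fmt.Println("mode now:", c.mode)
    }
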

consensus/checks.go
@@ -11,6 +11,11 @@ import (
 const MaxBlockNumDiff = 100
 
 func (consensus *Consensus) validatorSanityChecks(msg *msg_pb.Message) bool {
+	consensus.getLogger().Debug().
+		Uint64("blockNum", msg.GetConsensus().BlockNum).
+		Uint64("viewID", msg.GetConsensus().ViewId).
+		Str("msgType", msg.Type.String()).
+		Msg("[validatorSanityChecks] Checking new message")
 	senderKey, err := consensus.verifySenderKey(msg)
 	if err != nil {
 		if err == shard.ErrValidNotInCommittee {
@@ -42,6 +47,11 @@ func (consensus *Consensus) validatorSanityChecks(msg *msg_pb.Message) bool {
 }
 
 func (consensus *Consensus) leaderSanityChecks(msg *msg_pb.Message) bool {
+	consensus.getLogger().Debug().
+		Uint64("blockNum", msg.GetConsensus().BlockNum).
+		Uint64("viewID", msg.GetConsensus().ViewId).
+		Str("msgType", msg.Type.String()).
+		Msg("[leaderSanityChecks] Checking new message")
 	senderKey, err := consensus.verifySenderKey(msg)
 	if err != nil {
 		if err == shard.ErrValidNotInCommittee {
@@ -186,6 +196,11 @@ func (consensus *Consensus) onPreparedSanityChecks(
 }
 
 func (consensus *Consensus) viewChangeSanityCheck(msg *msg_pb.Message) bool {
+	consensus.getLogger().Debug().
+		Uint64("blockNum", msg.GetConsensus().BlockNum).
+		Uint64("viewID", msg.GetConsensus().ViewId).
+		Str("msgType", msg.Type.String()).
+		Msg("[viewChangeSanityCheck] Checking new message")
 	senderKey, err := consensus.verifyViewChangeSenderKey(msg)
 	if err != nil {
 		consensus.getLogger().Error().Err(err).Msgf(
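
All three sanity checks gain the same structured debug line, so every incoming
message can be traced by block number, view ID and type. The fluent API matches
zerolog, which Harmony's getLogger() appears to wrap; a standalone example of
the pattern, using zerolog directly and made-up field values:

    package main

    import (
        "os"

        "github.com/rs/zerolog"
    )

    func main() {
        logger := zerolog.New(os.Stderr).With().Timestamp().Logger()
        // One event, several typed fields: mirrors the lines added above.
        logger.Debug().
            Uint64("blockNum", 805).
            Uint64("viewID", 12).
            Str("msgType", "PREPARE").
            Msg("[validatorSanityChecks] Checking new message")
    }
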

consensus/consensus.go
@@ -112,7 +112,7 @@ type Consensus struct {
 	// verified block to state sync broadcast
 	VerifiedNewBlock chan *types.Block
 	// will trigger state syncing when blockNum is low
-	blockNumLowChan chan struct{}
+	BlockNumLowChan chan struct{}
 	// Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf
 	// randomnesses) to consensus. The first 32 bytes are randomness, the rest is for bitmap.
 	PRndChannel chan []byte
@@ -163,11 +163,6 @@ func (consensus *Consensus) BlocksNotSynchronized() {
 	consensus.syncNotReadyChan <- struct{}{}
 }
 
-// WaitForSyncing informs the node syncing service to start syncing
-func (consensus *Consensus) WaitForSyncing() {
-	<-consensus.blockNumLowChan
-}
-
 // VdfSeedSize returns the number of VRFs for VDF computation
 func (consensus *Consensus) VdfSeedSize() int {
 	return int(consensus.Decider.ParticipantsCount()) * 2 / 3
@@ -204,7 +199,7 @@ func New(
 	consensus.Decider = Decider
 	consensus.host = host
 	consensus.msgSender = NewMessageSender(host)
-	consensus.blockNumLowChan = make(chan struct{})
+	consensus.BlockNumLowChan = make(chan struct{})
 	// FBFT related
 	consensus.FBFTLog = NewFBFTLog()
 	consensus.phase = FBFTAnnounce

consensus/consensus_v2.go
@@ -32,12 +32,9 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
 	// when node is in ViewChanging mode, it still accepts normal messages into FBFTLog
 	// in order to avoid possible trap forever but drop PREPARE and COMMIT
 	// which are message types specifically for a node acting as leader
-	switch {
-	case (consensus.current.Mode() == ViewChanging) &&
+	if (consensus.current.Mode() == ViewChanging) &&
 		(msg.Type == msg_pb.MessageType_PREPARE ||
-			msg.Type == msg_pb.MessageType_COMMIT):
-		return
-	case consensus.current.Mode() == Listening:
+			msg.Type == msg_pb.MessageType_COMMIT) {
 		return
 	}
@@ -62,16 +59,8 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
 		}
 	}
 
-	notMemberButStillCatchup := !consensus.Decider.AmIMemberOfCommitee() &&
-		msg.Type == msg_pb.MessageType_COMMITTED
-	if notMemberButStillCatchup {
-		consensus.onCommitted(msg)
-		return
-	}
-
 	intendedForValidator, intendedForLeader :=
-		!(consensus.IsLeader() && consensus.current.Mode() == Normal),
+		!consensus.IsLeader(),
 		consensus.IsLeader()
 	switch t := msg.Type; true {
@@ -485,7 +474,6 @@ func (consensus *Consensus) Start(
 			consensus.announce(newBlock)
 		case msg := <-consensus.MsgChan:
-			consensus.getLogger().Debug().Msg("[ConsensusMainLoop] MsgChan")
 			consensus.handleMessageUpdate(msg)
 		case viewID := <-consensus.commitFinishChan:

consensus/validator.go
@@ -69,6 +69,7 @@ func (consensus *Consensus) prepare() {
 	}
 	// TODO: this will not return immediatey, may block
+	if consensus.current.Mode() != Listening {
 		if err := consensus.msgSender.SendWithoutRetry(
 			groupID,
 			host.ConstructP2pMessage(byte(17), networkMessage.Bytes),
@@ -80,6 +81,7 @@ func (consensus *Consensus) prepare() {
 			Msg("[OnAnnounce] Sent Prepare Message!!")
 		}
 	}
+	}
 	consensus.getLogger().Debug().
 		Str("From", consensus.phase.String()).
 		Str("To", FBFTPrepare.String()).
@@ -207,6 +209,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
 		time.Sleep(consensus.delayCommit)
 	}
+	if consensus.current.Mode() != Listening {
 		if err := consensus.msgSender.SendWithoutRetry(
 			groupID,
 			host.ConstructP2pMessage(byte(17), networkMessage.Bytes),
@@ -219,6 +222,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
 			Msg("[OnPrepared] Sent Commit Message!!")
 		}
 	}
+	}
 	consensus.getLogger().Debug().
 		Str("From", consensus.phase.String()).
 		Str("To", FBFTCommit.String()).
@@ -277,7 +281,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
 	consensus.getLogger().Debug().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] out of sync")
 	go func() {
 		select {
-		case consensus.blockNumLowChan <- struct{}{}:
+		case consensus.BlockNumLowChan <- struct{}{}:
 			consensus.current.SetMode(Syncing)
 			for _, v := range consensus.consensusTimeout {
 				v.Stop()

core/offchain.go
@@ -147,7 +147,7 @@ func (bc *BlockChain) CommitOffChainData(
 	}
 	// Writing beacon chain cross links
-	if header.ShardID() == shard.BeaconChainShardID &&
+	if isBeaconChain &&
 		bc.chainConfig.IsCrossLink(block.Epoch()) &&
 		len(header.CrossLinks()) > 0 {
 		crossLinks := &types.CrossLinks{}
@@ -192,6 +192,8 @@ func (bc *BlockChain) CommitOffChainData(
 				Msgf(msg, len(*crossLinks), num)
 		utils.Logger().Debug().Msgf(msg, len(*crossLinks), num)
 	}
+	if isBeaconChain {
+		// Roll up latest crosslinks
 		for i, c := uint32(0), shard.Schedule.InstanceForEpoch(
 			epoch,
@@ -201,6 +203,7 @@ func (bc *BlockChain) CommitOffChainData(
 				Err(err).Msg("could not batch process last continuous crosslink")
 		}
 	}
+	}
 	// Update block reward accumulator and slashes
 	if isBeaconChain {
// Update block reward accumulator and slashes
if isBeaconChain {

node/node_handler.go
@@ -7,6 +7,8 @@ import (
 	"math/rand"
 	"time"
 
+	"github.com/harmony-one/harmony/consensus"
+
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/harmony-one/bls/ffi/go/bls"
 	"github.com/harmony-one/harmony/api/proto"
@@ -456,6 +458,7 @@ func (node *Node) PostConsensusProcessing(
 		}
 		node.BroadcastCXReceipts(newBlock, commitSigAndBitmap)
 	} else {
+		if node.Consensus.Mode() != consensus.Listening {
 		utils.Logger().Info().
 			Uint64("blockNum", newBlock.NumberU64()).
 			Uint64("epochNum", newBlock.Epoch().Uint64()).
@@ -475,13 +478,14 @@ func (node *Node) PostConsensusProcessing(
 				node.BroadcastCXReceipts(newBlock, commitSigAndBitmap)
 			}
+		}
 	}
 	// Broadcast client requested missing cross shard receipts if there is any
 	node.BroadcastMissingCXReceipts()
 	// Update consensus keys at last so the change of leader status doesn't mess up normal flow
 	if len(newBlock.Header().ShardState()) > 0 {
-		node.Consensus.UpdateConsensusInformation()
+		node.Consensus.SetMode(node.Consensus.UpdateConsensusInformation())
 	}
 	if h := node.NodeConfig.WebHooks.Hooks; h != nil {
 		if h.Availability != nil {

node/node_syncing.go
@@ -203,10 +203,20 @@ func (node *Node) DoBeaconSyncing() {
 // DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
 func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) {
 	ticker := time.NewTicker(time.Duration(node.syncFreq) * time.Second)
 	// TODO ek – infinite loop; add shutdown/cleanup logic
-SyncingLoop:
 	for {
+		select {
+		case <-ticker.C:
+			node.doSync(bc, worker, willJoinConsensus)
+		case <-node.Consensus.BlockNumLowChan:
+			node.doSync(bc, worker, willJoinConsensus)
+		}
+	}
+}
+
+// doSync keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up
+func (node *Node) doSync(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) {
 	if node.stateSync == nil {
 		node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
 		utils.Logger().Debug().Msg("[SYNC] initialized state sync")
@@ -219,14 +229,14 @@
 				Err(err).
 				Uint32("shard_id", shardID).
 				Msg("cannot retrieve syncing peers")
-			continue SyncingLoop
+			return
 		}
 		if err := node.stateSync.CreateSyncConfig(peers, false); err != nil {
 			utils.Logger().Warn().
 				Err(err).
 				Interface("peers", peers).
 				Msg("[SYNC] create peers error")
-			continue SyncingLoop
+			return
 		}
 		utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers")
 	}
@@ -249,9 +259,6 @@
 		node.stateMutex.Lock()
 		node.State = NodeReadyForConsensus
 		node.stateMutex.Unlock()
-		// TODO on demand syncing
-		time.Sleep(time.Duration(node.syncFreq) * time.Second)
 	}
 }
 // SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node
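
With the loop body extracted into doSync (first node_syncing.go hunk above),
the labeled "continue SyncingLoop" statements become plain returns: bailing out
of doSync hands control back to the select loop in DoSyncing, which retries on
the next tick or the next BlockNumLowChan signal. A tiny sketch of the
refactoring pattern, with hypothetical stand-in functions:

    package main

    import "errors"

    // setupPeers is a hypothetical stand-in for peer discovery / sync config.
    func setupPeers() error { return errors.New("no peers yet") }

    // syncOnce plays the role of doSync: early exits that used to be
    // "continue SyncingLoop" become plain returns, and the caller's select
    // loop decides when to run the body again.
    func syncOnce() {
        if err := setupPeers(); err != nil {
            return // was: continue SyncingLoop
        }
        // ... do the actual sync work ...
    }

    func main() { syncOnce() }
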
