diff --git a/consensus/consensus_msg_sender.go b/consensus/consensus_msg_sender.go index bbd63a746..b43b69306 100644 --- a/consensus/consensus_msg_sender.go +++ b/consensus/consensus_msg_sender.go @@ -115,6 +115,7 @@ func (sender *MessageSender) StopRetry(msgType msg_pb.MessageType) { if ok { msgRetry := data.(*MessageRetry) atomic.StoreUint32(&msgRetry.isActive, 0) + utils.Logger().Info().Str("type", msgType.String()).Uint32("isActive", msgRetry.isActive).Msg("STOPPING RETRY") } } diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 4b9459277..9b02635e7 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -142,6 +142,7 @@ func (consensus *Consensus) finalCommit() { return } + // if leader success finalize the block, send committed message to validators // TODO: once leader rotation is implemented, leader who is about to be switched out // needs to send the committed message immediately so the next leader can @@ -484,6 +485,9 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { return err } + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey}) if err != nil { return errors.Wrap(err, "[preCommitAndPropose] Unable to construct Committed message") diff --git a/consensus/leader.go b/consensus/leader.go index d67301311..fa9da37c5 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -3,6 +3,8 @@ package consensus import ( "time" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/crypto/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" @@ -199,8 +201,9 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { } func (consensus *Consensus) onCommit(msg *msg_pb.Message) { - if consensus.GetViewChangingID() == 10 { - return + utils.Logger().Info().Msgf("ViewChanging %d %d", consensus.GetCurBlockViewID(), consensus.GetViewChangingID()) + if consensus.GetCurBlockViewID() == 10 { + //return } recvMsg, err := consensus.ParseFBFTMessage(msg) if err != nil { diff --git a/consensus/quorum/quorum.go b/consensus/quorum/quorum.go index 6f9c68678..ddca4bcd9 100644 --- a/consensus/quorum/quorum.go +++ b/consensus/quorum/quorum.go @@ -2,6 +2,7 @@ package quorum import ( "fmt" + "github.com/harmony-one/harmony/internal/configs/sharding" "math/big" "github.com/harmony-one/harmony/crypto/bls" @@ -73,6 +74,7 @@ type ParticipantTracker interface { IndexOf(bls.SerializedPublicKey) int ParticipantsCount() int64 NthNext(*bls.PublicKeyWrapper, int) (bool, *bls.PublicKeyWrapper) + NthNextHmy(shardingconfig.Instance, *bls.PublicKeyWrapper, int) (bool, *bls.PublicKeyWrapper) UpdateParticipants(pubKeys []bls.PublicKeyWrapper) } @@ -218,6 +220,23 @@ func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bl return found, &s.publicKeys[idx] } +// NthNextHmy return the Nth next pubkey of Harmony nodes, next can be negative number +func (s *cIdentities) NthNextHmy(instance shardingconfig.Instance, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) { + found := false + + idx := s.IndexOf(pubKey.Bytes) + if idx != -1 { + found = true + } + numNodes := instance.NumHarmonyOperatedNodesPerShard() + // sanity check to avoid out of bound access + if numNodes <= 0 || numNodes > len(s.publicKeys) { + numNodes = len(s.publicKeys) + } + idx = (idx + next) % numNodes + return found, &s.publicKeys[idx] +} + func (s *cIdentities) Participants() multibls.PublicKeys { return s.publicKeys } diff --git a/consensus/validator.go b/consensus/validator.go index 7bfc1ad1f..55f6c1f1d 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -227,6 +227,12 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.getLogger().Warn().Msg("[OnCommitted] unable to parse msg") return } + + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnCommitted] Received committed message") + // It's ok to receive committed message for last block due to pipelining. // The committed message for last block could include more signatures now. if recvMsg.BlockNum < consensus.blockNum-1 { diff --git a/consensus/view_change.go b/consensus/view_change.go index d9235adad..4c71d2197 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -1,6 +1,8 @@ package consensus import ( + "github.com/harmony-one/harmony/shard" + "math/big" "sync" "time" @@ -90,20 +92,122 @@ func (pm *State) GetViewChangeDuraion() time.Duration { return time.Duration(diff * diff * int64(viewChangeDuration)) } -// GetNextLeaderKey uniquely determine who is the leader for given viewID -func (consensus *Consensus) GetNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper { +// fallbackNextViewID return the next view ID and duration when there is an exception +// to calculate the time-based viewId +func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) { + diff := int64(consensus.GetViewChangingID() + 1 - consensus.GetCurBlockViewID()) + if diff <= 0 { + diff = int64(1) + } + consensus.getLogger().Error(). + Int64("diff", diff). + Msg("[fallbackNextViewID] use legacy viewID algorithm") + return consensus.GetViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration)) +} + +// getNextViewID return the next view ID based on the timestamp +// The next view ID is calculated based on the difference of validator's timestamp +// and the block's timestamp. So that it can be deterministic to return the next view ID +// only based on the blockchain block and the validator's current timestamp. +// The next view ID is the single factor used to determine +// the next leader, so it is mod the number of nodes per shard. +// It returns the next viewID and duration of the view change +// The view change duration is a fixed duration now to avoid stuck into offline nodes during +// the view change. +// viewID is only used as the fallback mechansim to determine the nextViewID +func (consensus *Consensus) getNextViewID() (uint64, time.Duration) { + // handle corner case at first + if consensus.Blockchain == nil { + return consensus.fallbackNextViewID() + } + curHeader := consensus.Blockchain.CurrentHeader() + if curHeader == nil { + return consensus.fallbackNextViewID() + } + blockTimestamp := curHeader.Time().Int64() + curTimestamp := time.Now().Unix() + + // timestamp messed up in current validator node + if curTimestamp <= blockTimestamp { + return consensus.fallbackNextViewID() + } + totalNode := consensus.Decider.ParticipantsCount() + // diff is at least 1, and it won't exceed the totalNode + diff := uint64(((curTimestamp - blockTimestamp) / viewChangeTimeout) % int64(totalNode)) + nextViewID := diff + consensus.GetCurBlockViewID() + + consensus.getLogger().Info(). + Int64("curTimestamp", curTimestamp). + Int64("blockTimestamp", blockTimestamp). + Uint64("nextViewID", nextViewID). + Uint64("curViewID", consensus.GetCurBlockViewID()). + Msg("[getNextViewID]") + + // duration is always the fixed view change duration for synchronous view change + return nextViewID, viewChangeDuration +} + +// getNextLeaderKey uniquely determine who is the leader for given viewID +// It reads the current leader's pubkey based on the blockchain data and returns +// the next leader based on the gap of the viewID of the view change and the last +// know view id of the block. +func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper { gap := 1 + + if viewID > consensus.GetCurBlockViewID() { + gap = int(viewID - consensus.GetCurBlockViewID()) + } + var lastLeaderPubKey *bls.PublicKeyWrapper + var err error + epoch := big.NewInt(0) + if consensus.Blockchain == nil { + consensus.getLogger().Error().Msg("[getNextLeaderKey] ChainReader is nil. Use consensus.LeaderPubKey") + lastLeaderPubKey = consensus.LeaderPubKey + } else { + curHeader := consensus.Blockchain.CurrentHeader() + if curHeader == nil { + consensus.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain") + lastLeaderPubKey = consensus.LeaderPubKey + } else { + // this is the truth of the leader based on blockchain blocks + lastLeaderPubKey, err = consensus.getLeaderPubKeyFromCoinbase(curHeader) + if err != nil || lastLeaderPubKey == nil { + consensus.getLogger().Error().Err(err). + Msg("[getNextLeaderKey] Unable to get leaderPubKey from coinbase. Set it to consensus.LeaderPubKey") + lastLeaderPubKey = consensus.LeaderPubKey + } + epoch = curHeader.Epoch() + // viewchange happened at the first block of new epoch + // use the LeaderPubKey as the base of the next leader + // as we shouldn't use lastLeader from coinbase as the base. + // The LeaderPubKey should be updated to the index 0 of the committee + if curHeader.IsLastBlockInEpoch() { + consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch") + lastLeaderPubKey = consensus.LeaderPubKey + } + } + } consensus.getLogger().Info(). + Str("lastLeaderPubKey", lastLeaderPubKey.Bytes.Hex()). Str("leaderPubKey", consensus.LeaderPubKey.Bytes.Hex()). + Int("gap", gap). Uint64("newViewID", viewID). Uint64("myCurBlockViewID", consensus.GetCurBlockViewID()). - Msg("[GetNextLeaderKey] got leaderPubKey from coinbase") - wasFound, next := consensus.Decider.NthNext(consensus.LeaderPubKey, gap) + Msg("[getNextLeaderKey] got leaderPubKey from coinbase") + // wasFound, next := consensus.Decider.NthNext(lastLeaderPubKey, gap) + // FIXME: rotate leader on harmony nodes only before fully externalization + wasFound, next := consensus.Decider.NthNextHmy( + shard.Schedule.InstanceForEpoch(epoch), + lastLeaderPubKey, + gap) if !wasFound { consensus.getLogger().Warn(). Str("key", consensus.LeaderPubKey.Bytes.Hex()). - Msg("GetNextLeaderKey: currentLeaderKey not found") + Msg("[getNextLeaderKey] currentLeaderKey not found") } + consensus.getLogger().Info(). + Str("nextLeader", next.Bytes.Hex()). + Msg("[getNextLeaderKey] next Leader") return next } @@ -125,7 +229,7 @@ func (consensus *Consensus) startViewChange(viewID uint64) { consensus.consensusTimeout[timeoutBootstrap].Stop() consensus.current.SetMode(ViewChanging) consensus.SetViewChangingID(viewID) - consensus.LeaderPubKey = consensus.GetNextLeaderKey(viewID) + consensus.LeaderPubKey = consensus.getNextLeaderKey(viewID) duration := consensus.current.GetViewChangeDuraion() consensus.getLogger().Warn(). diff --git a/node/node_newblock.go b/node/node_newblock.go index 26227d7fc..c9de062ea 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -57,7 +57,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsC if len(commitSigs) > bls.BLSSignatureSizeInBytes { newCommitSigsChan <- commitSigs } - case <-time.After(5 * time.Second): + case <-time.After(4 * time.Second): sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64()) if err != nil { @@ -65,7 +65,6 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsC } else { newCommitSigsChan <- sigs } - } }() node.Consensus.StartFinalityCount()