pull/3405/head
Rongjian Lan 4 years ago
parent 611448a0a1
commit ee3dd70676
  1. 1
      consensus/consensus_msg_sender.go
  2. 4
      consensus/consensus_v2.go
  3. 7
      consensus/leader.go
  4. 19
      consensus/quorum/quorum.go
  5. 6
      consensus/validator.go
  6. 116
      consensus/view_change.go
  7. 3
      node/node_newblock.go

@ -115,6 +115,7 @@ func (sender *MessageSender) StopRetry(msgType msg_pb.MessageType) {
if ok {
msgRetry := data.(*MessageRetry)
atomic.StoreUint32(&msgRetry.isActive, 0)
utils.Logger().Info().Str("type", msgType.String()).Uint32("isActive", msgRetry.isActive).Msg("STOPPING RETRY")
}
}

@ -142,6 +142,7 @@ func (consensus *Consensus) finalCommit() {
return
}
// if leader success finalize the block, send committed message to validators
// TODO: once leader rotation is implemented, leader who is about to be switched out
// needs to send the committed message immediately so the next leader can
@ -484,6 +485,9 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
return err
}
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey})
if err != nil {
return errors.Wrap(err, "[preCommitAndPropose] Unable to construct Committed message")

@ -3,6 +3,8 @@ package consensus
import (
"time"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -199,8 +201,9 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
}
func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
if consensus.GetViewChangingID() == 10 {
return
utils.Logger().Info().Msgf("ViewChanging %d %d", consensus.GetCurBlockViewID(), consensus.GetViewChangingID())
if consensus.GetCurBlockViewID() == 10 {
//return
}
recvMsg, err := consensus.ParseFBFTMessage(msg)
if err != nil {

@ -2,6 +2,7 @@ package quorum
import (
"fmt"
"github.com/harmony-one/harmony/internal/configs/sharding"
"math/big"
"github.com/harmony-one/harmony/crypto/bls"
@ -73,6 +74,7 @@ type ParticipantTracker interface {
IndexOf(bls.SerializedPublicKey) int
ParticipantsCount() int64
NthNext(*bls.PublicKeyWrapper, int) (bool, *bls.PublicKeyWrapper)
NthNextHmy(shardingconfig.Instance, *bls.PublicKeyWrapper, int) (bool, *bls.PublicKeyWrapper)
UpdateParticipants(pubKeys []bls.PublicKeyWrapper)
}
@ -218,6 +220,23 @@ func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bl
return found, &s.publicKeys[idx]
}
// NthNextHmy return the Nth next pubkey of Harmony nodes, next can be negative number
func (s *cIdentities) NthNextHmy(instance shardingconfig.Instance, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
found := false
idx := s.IndexOf(pubKey.Bytes)
if idx != -1 {
found = true
}
numNodes := instance.NumHarmonyOperatedNodesPerShard()
// sanity check to avoid out of bound access
if numNodes <= 0 || numNodes > len(s.publicKeys) {
numNodes = len(s.publicKeys)
}
idx = (idx + next) % numNodes
return found, &s.publicKeys[idx]
}
func (s *cIdentities) Participants() multibls.PublicKeys {
return s.publicKeys
}

@ -227,6 +227,12 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.getLogger().Warn().Msg("[OnCommitted] unable to parse msg")
return
}
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message")
// It's ok to receive committed message for last block due to pipelining.
// The committed message for last block could include more signatures now.
if recvMsg.BlockNum < consensus.blockNum-1 {

@ -1,6 +1,8 @@
package consensus
import (
"github.com/harmony-one/harmony/shard"
"math/big"
"sync"
"time"
@ -90,20 +92,122 @@ func (pm *State) GetViewChangeDuraion() time.Duration {
return time.Duration(diff * diff * int64(viewChangeDuration))
}
// GetNextLeaderKey uniquely determine who is the leader for given viewID
func (consensus *Consensus) GetNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper {
// fallbackNextViewID return the next view ID and duration when there is an exception
// to calculate the time-based viewId
func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) {
diff := int64(consensus.GetViewChangingID() + 1 - consensus.GetCurBlockViewID())
if diff <= 0 {
diff = int64(1)
}
consensus.getLogger().Error().
Int64("diff", diff).
Msg("[fallbackNextViewID] use legacy viewID algorithm")
return consensus.GetViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration))
}
// getNextViewID return the next view ID based on the timestamp
// The next view ID is calculated based on the difference of validator's timestamp
// and the block's timestamp. So that it can be deterministic to return the next view ID
// only based on the blockchain block and the validator's current timestamp.
// The next view ID is the single factor used to determine
// the next leader, so it is mod the number of nodes per shard.
// It returns the next viewID and duration of the view change
// The view change duration is a fixed duration now to avoid stuck into offline nodes during
// the view change.
// viewID is only used as the fallback mechansim to determine the nextViewID
func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
// handle corner case at first
if consensus.Blockchain == nil {
return consensus.fallbackNextViewID()
}
curHeader := consensus.Blockchain.CurrentHeader()
if curHeader == nil {
return consensus.fallbackNextViewID()
}
blockTimestamp := curHeader.Time().Int64()
curTimestamp := time.Now().Unix()
// timestamp messed up in current validator node
if curTimestamp <= blockTimestamp {
return consensus.fallbackNextViewID()
}
totalNode := consensus.Decider.ParticipantsCount()
// diff is at least 1, and it won't exceed the totalNode
diff := uint64(((curTimestamp - blockTimestamp) / viewChangeTimeout) % int64(totalNode))
nextViewID := diff + consensus.GetCurBlockViewID()
consensus.getLogger().Info().
Int64("curTimestamp", curTimestamp).
Int64("blockTimestamp", blockTimestamp).
Uint64("nextViewID", nextViewID).
Uint64("curViewID", consensus.GetCurBlockViewID()).
Msg("[getNextViewID]")
// duration is always the fixed view change duration for synchronous view change
return nextViewID, viewChangeDuration
}
// getNextLeaderKey uniquely determine who is the leader for given viewID
// It reads the current leader's pubkey based on the blockchain data and returns
// the next leader based on the gap of the viewID of the view change and the last
// know view id of the block.
func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper {
gap := 1
if viewID > consensus.GetCurBlockViewID() {
gap = int(viewID - consensus.GetCurBlockViewID())
}
var lastLeaderPubKey *bls.PublicKeyWrapper
var err error
epoch := big.NewInt(0)
if consensus.Blockchain == nil {
consensus.getLogger().Error().Msg("[getNextLeaderKey] ChainReader is nil. Use consensus.LeaderPubKey")
lastLeaderPubKey = consensus.LeaderPubKey
} else {
curHeader := consensus.Blockchain.CurrentHeader()
if curHeader == nil {
consensus.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain")
lastLeaderPubKey = consensus.LeaderPubKey
} else {
// this is the truth of the leader based on blockchain blocks
lastLeaderPubKey, err = consensus.getLeaderPubKeyFromCoinbase(curHeader)
if err != nil || lastLeaderPubKey == nil {
consensus.getLogger().Error().Err(err).
Msg("[getNextLeaderKey] Unable to get leaderPubKey from coinbase. Set it to consensus.LeaderPubKey")
lastLeaderPubKey = consensus.LeaderPubKey
}
epoch = curHeader.Epoch()
// viewchange happened at the first block of new epoch
// use the LeaderPubKey as the base of the next leader
// as we shouldn't use lastLeader from coinbase as the base.
// The LeaderPubKey should be updated to the index 0 of the committee
if curHeader.IsLastBlockInEpoch() {
consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch")
lastLeaderPubKey = consensus.LeaderPubKey
}
}
}
consensus.getLogger().Info().
Str("lastLeaderPubKey", lastLeaderPubKey.Bytes.Hex()).
Str("leaderPubKey", consensus.LeaderPubKey.Bytes.Hex()).
Int("gap", gap).
Uint64("newViewID", viewID).
Uint64("myCurBlockViewID", consensus.GetCurBlockViewID()).
Msg("[GetNextLeaderKey] got leaderPubKey from coinbase")
wasFound, next := consensus.Decider.NthNext(consensus.LeaderPubKey, gap)
Msg("[getNextLeaderKey] got leaderPubKey from coinbase")
// wasFound, next := consensus.Decider.NthNext(lastLeaderPubKey, gap)
// FIXME: rotate leader on harmony nodes only before fully externalization
wasFound, next := consensus.Decider.NthNextHmy(
shard.Schedule.InstanceForEpoch(epoch),
lastLeaderPubKey,
gap)
if !wasFound {
consensus.getLogger().Warn().
Str("key", consensus.LeaderPubKey.Bytes.Hex()).
Msg("GetNextLeaderKey: currentLeaderKey not found")
Msg("[getNextLeaderKey] currentLeaderKey not found")
}
consensus.getLogger().Info().
Str("nextLeader", next.Bytes.Hex()).
Msg("[getNextLeaderKey] next Leader")
return next
}
@ -125,7 +229,7 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.current.SetMode(ViewChanging)
consensus.SetViewChangingID(viewID)
consensus.LeaderPubKey = consensus.GetNextLeaderKey(viewID)
consensus.LeaderPubKey = consensus.getNextLeaderKey(viewID)
duration := consensus.current.GetViewChangeDuraion()
consensus.getLogger().Warn().

@ -57,7 +57,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsC
if len(commitSigs) > bls.BLSSignatureSizeInBytes {
newCommitSigsChan <- commitSigs
}
case <-time.After(5 * time.Second):
case <-time.After(4 * time.Second):
sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64())
if err != nil {
@ -65,7 +65,6 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsC
} else {
newCommitSigsChan <- sigs
}
}
}()
node.Consensus.StartFinalityCount()

Loading…
Cancel
Save