update consensus information based on blockchain status

pull/1276/head
chao 5 years ago
parent bcf2dcc92f
commit 49d9a6b421
  1. 7
      cmd/harmony/main.go
  2. 4
      consensus/consensus.go
  3. 77
      consensus/consensus_service.go
  4. 2
      consensus/consensus_v2.go
  5. 6
      core/blockchain.go

@ -301,10 +301,6 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
currentConsensus.SetCommitDelay(commitDelay)
currentConsensus.MinPeers = *minPeers
if *isNewNode {
currentConsensus.SetMode(consensus.Listening)
}
if *disableViewChange {
currentConsensus.DisableViewChangeForTestingOnly()
}
@ -399,6 +395,9 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
currentConsensus.OnConsensusDone = currentNode.PostConsensusProcessing
currentNode.State = node.NodeWaitToJoin
// update consensus information based on the blockchain
currentConsensus.UpdateConsensusInformation()
// Watching currentNode and currentConsensus.
memprofiling.GetMemProfiling().Add("currentNode", currentNode)
memprofiling.GetMemProfiling().Add("currentConsensus", currentConsensus)

@ -38,6 +38,10 @@ type Consensus struct {
phase PbftPhase
// mode: indicate a node is in normal or viewchanging mode
mode PbftMode
// epoch: current epoch number
epoch uint64
// blockNum: the next blockNumber that PBFT is going to agree on, should be equal to the blockNumber of next block
blockNum uint64
// channel to receive consensus message

@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"math/big"
"time"
"github.com/harmony-one/harmony/crypto/hash"
@ -19,6 +20,7 @@ import (
msg_pb "github.com/harmony-one/harmony/api/proto/message"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
@ -529,6 +531,13 @@ func (consensus *Consensus) SetBlockNum(blockNum uint64) {
consensus.blockNum = blockNum
}
// SetEpochNum sets the epoch in consensus object
func (consensus *Consensus) SetEpochNum(epoch uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.epoch = epoch
}
// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls.Sign, *bls_cosi.Mask, error) {
if offset+96 > len(recvPayload) {
@ -650,22 +659,68 @@ func (consensus *Consensus) getLeaderPubKeyFromCoinbase(header *types.Header) (*
return nil, ctxerror.New("cannot find corresponding BLS Public Key", "coinbaseAddr", header.Coinbase)
}
// update consensus information before join consensus after state syncing
func (consensus *Consensus) updateConsensusInformation() {
// UpdateConsensusInformation will update shard information (epoch, publicKeys, blockNum, viewID)
// based on the local blockchain. It is called in two cases for now:
// 1. consensus object initialization. because of current dependency where chainreader is only available
// after node is initialized; node is only available after consensus is initialized
// we need call this function separately after create consensus object
// 2. after state syncing is finished
func (consensus *Consensus) UpdateConsensusInformation() {
var pubKeys []*bls.PublicKey
consensus.mode.SetMode(Syncing)
header := consensus.ChainReader.CurrentHeader()
consensus.SetBlockNum(header.Number.Uint64() + 1)
consensus.SetViewID(header.ViewID.Uint64() + 1)
leaderPubKey, err := consensus.getLeaderPubKeyFromCoinbase(header)
if err != nil || leaderPubKey == nil {
consensus.getLogger().Debug().Err(err).Msg("[SYNC] Unable to get leaderPubKey from coinbase")
consensus.ignoreViewIDCheck = true
epoch := header.Epoch
curPubKeys := core.GetPublicKeys(epoch, header.ShardID)
consensus.numPrevPubKeys = len(curPubKeys)
consensus.getLogger().Info().Msg("[UpdateConsensusInformation] Updating.....")
if core.IsEpochLastBlockByHeader(header) {
// increase epoch by one if it's the last block
consensus.SetEpochNum(epoch.Uint64() + 1)
nextEpoch := new(big.Int).Add(epoch, common.Big1)
pubKeys = core.GetPublicKeys(nextEpoch, header.ShardID)
} else {
consensus.getLogger().Debug().
Str("leaderPubKey", leaderPubKey.SerializeToHexStr()).
Msg("[SYNC] Most Recent LeaderPubKey Updated Based on BlockChain")
consensus.LeaderPubKey = leaderPubKey
consensus.mode.SetMode(Normal)
consensus.SetEpochNum(epoch.Uint64())
pubKeys = curPubKeys
}
if len(pubKeys) == 0 {
consensus.getLogger().Warn().Msg("[UpdateConsensusInformation] PublicKeys is Nil")
return
}
for _, key := range pubKeys {
// only update publicKeys when node is in committee
if key.IsEqual(consensus.PubKey) {
consensus.mode.SetMode(Normal)
consensus.getLogger().Info().
Int("numPubKeys", len(pubKeys)).
Msg("[UpdateConsensusInformation] Successfully updated public keys")
consensus.UpdatePublicKeys(pubKeys)
// take care of possible leader change during the epoch
if !core.IsEpochLastBlockByHeader(header) {
leaderPubKey, err := consensus.getLeaderPubKeyFromCoinbase(header)
if err != nil || leaderPubKey == nil {
consensus.mode.SetMode(Syncing)
consensus.getLogger().Debug().Err(err).Msg("[SYNC] Unable to get leaderPubKey from coinbase")
consensus.ignoreViewIDCheck = true
} else {
consensus.getLogger().Debug().
Str("leaderPubKey", leaderPubKey.SerializeToHexStr()).
Msg("[SYNC] Most Recent LeaderPubKey Updated Based on BlockChain")
consensus.LeaderPubKey = leaderPubKey
}
}
return
}
}
consensus.mode.SetMode(Listening)
}
// IsLeader check if the node is a leader or not by comparing the public key of

@ -1069,7 +1069,7 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
}
}
case <-consensus.syncReadyChan:
consensus.updateConsensusInformation()
consensus.UpdateConsensusInformation()
consensus.getLogger().Info().Msg("Node is in sync")
case <-consensus.syncNotReadyChan:

@ -238,6 +238,12 @@ func IsEpochLastBlock(block *types.Block) bool {
return ShardingSchedule.IsLastBlock(block.NumberU64())
}
// IsEpochLastBlockByHeader returns whether this block is the last block of an epoch
// given block header
func IsEpochLastBlockByHeader(header *types.Header) bool {
return ShardingSchedule.IsLastBlock(header.Number.Uint64())
}
func (bc *BlockChain) getProcInterrupt() bool {
return atomic.LoadInt32(&bc.procInterrupt) == 1
}

Loading…
Cancel
Save