Merge pull request #3415 from rlan35/pipeline_prep

Allow validator to update block commit sigs if it has more sigs
pull/3408/head
Rongjian Lan 4 years ago committed by GitHub
commit d8d684a305
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cmd/harmony/main.go
  2. 4
      consensus/consensus.go
  3. 24
      consensus/consensus_service.go
  4. 28
      consensus/consensus_v2.go
  5. 46
      consensus/construct.go
  6. 4
      consensus/double_sign.go
  7. 2
      consensus/leader.go
  8. 2
      consensus/threshold.go
  9. 24
      consensus/validator.go
  10. 10
      consensus/view_change.go
  11. 6
      internal/chain/sig.go
  12. 4
      node/node.go
  13. 2
      node/node_handler.go
  14. 2
      shard/committee/assignment.go

@ -615,7 +615,7 @@ func setupConsensusAndNode(hc harmonyConfig, nodeConfig *nodeconfig.ConfigType)
} }
// TODO: refactor the creation of blockchain out of node.New() // TODO: refactor the creation of blockchain out of node.New()
currentConsensus.ChainReader = currentNode.Blockchain() currentConsensus.Blockchain = currentNode.Blockchain()
currentNode.NodeConfig.DNSZone = hc.Network.DNSZone currentNode.NodeConfig.DNSZone = hc.Network.DNSZone
currentNode.NodeConfig.SetBeaconGroupID( currentNode.NodeConfig.SetBeaconGroupID(

@ -54,8 +54,8 @@ type Consensus struct {
multiSigBitmap *bls_cosi.Mask // Bitmap for parsing multisig bitmap from validators multiSigBitmap *bls_cosi.Mask // Bitmap for parsing multisig bitmap from validators
multiSigMutex sync.RWMutex multiSigMutex sync.RWMutex
// The chain reader for the blockchain this consensus is working on // The blockchain this consensus is working on
ChainReader *core.BlockChain Blockchain *core.BlockChain
// Minimal number of peers in the shard // Minimal number of peers in the shard
// If the number of validators is less than minPeers, the consensus won't start // If the number of validators is less than minPeers, the consensus won't start
MinPeers int MinPeers int

@ -246,7 +246,7 @@ func (consensus *Consensus) ReadSignatureBitmapPayload(
func (consensus *Consensus) getLeaderPubKeyFromCoinbase( func (consensus *Consensus) getLeaderPubKeyFromCoinbase(
header *block.Header, header *block.Header,
) (*bls.PublicKeyWrapper, error) { ) (*bls.PublicKeyWrapper, error) {
shardState, err := consensus.ChainReader.ReadShardState(header.Epoch()) shardState, err := consensus.Blockchain.ReadShardState(header.Epoch())
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "cannot read shard state %v %s", return nil, errors.Wrapf(err, "cannot read shard state %v %s",
header.Epoch(), header.Epoch(),
@ -260,7 +260,7 @@ func (consensus *Consensus) getLeaderPubKeyFromCoinbase(
} }
committerKey := new(bls_core.PublicKey) committerKey := new(bls_core.PublicKey)
isStaking := consensus.ChainReader.Config().IsStaking(header.Epoch()) isStaking := consensus.Blockchain.Config().IsStaking(header.Epoch())
for _, member := range committee.Slots { for _, member := range committee.Slots {
if isStaking { if isStaking {
// After staking the coinbase address will be the address of bls public key // After staking the coinbase address will be the address of bls public key
@ -296,7 +296,7 @@ func (consensus *Consensus) getLeaderPubKeyFromCoinbase(
// (b) node in committed but has any err during processing: Syncing mode // (b) node in committed but has any err during processing: Syncing mode
// (c) node in committed and everything looks good: Normal mode // (c) node in committed and everything looks good: Normal mode
func (consensus *Consensus) UpdateConsensusInformation() Mode { func (consensus *Consensus) UpdateConsensusInformation() Mode {
curHeader := consensus.ChainReader.CurrentHeader() curHeader := consensus.Blockchain.CurrentHeader()
curEpoch := curHeader.Epoch() curEpoch := curHeader.Epoch()
nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1) nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1)
@ -314,15 +314,15 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.BlockPeriod = 5 * time.Second consensus.BlockPeriod = 5 * time.Second
// Enable aggregate sig at epoch 1000 for mainnet, at epoch 53000 for testnet, and always for other nets. // Enable aggregate sig at epoch 1000 for mainnet, at epoch 53000 for testnet, and always for other nets.
if (consensus.ChainReader.Config().ChainID == params.MainnetChainID && curEpoch.Cmp(big.NewInt(1000)) > 0) || if (consensus.Blockchain.Config().ChainID == params.MainnetChainID && curEpoch.Cmp(big.NewInt(1000)) > 0) ||
(consensus.ChainReader.Config().ChainID == params.TestnetChainID && curEpoch.Cmp(big.NewInt(54500)) > 0) || (consensus.Blockchain.Config().ChainID == params.TestnetChainID && curEpoch.Cmp(big.NewInt(54500)) > 0) ||
(consensus.ChainReader.Config().ChainID != params.MainnetChainID && consensus.ChainReader.Config().ChainID != params.TestChainID) { (consensus.Blockchain.Config().ChainID != params.MainnetChainID && consensus.Blockchain.Config().ChainID != params.TestChainID) {
consensus.AggregateSig = true consensus.AggregateSig = true
} }
isFirstTimeStaking := consensus.ChainReader.Config().IsStaking(nextEpoch) && isFirstTimeStaking := consensus.Blockchain.Config().IsStaking(nextEpoch) &&
curHeader.IsLastBlockInEpoch() && !consensus.ChainReader.Config().IsStaking(curEpoch) curHeader.IsLastBlockInEpoch() && !consensus.Blockchain.Config().IsStaking(curEpoch)
haventUpdatedDecider := consensus.ChainReader.Config().IsStaking(curEpoch) && haventUpdatedDecider := consensus.Blockchain.Config().IsStaking(curEpoch) &&
consensus.Decider.Policy() != quorum.SuperMajorityStake consensus.Decider.Policy() != quorum.SuperMajorityStake
// Only happens once, the flip-over to a new Decider policy // Only happens once, the flip-over to a new Decider policy
@ -338,7 +338,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
epochToSet := curEpoch epochToSet := curEpoch
hasError := false hasError := false
curShardState, err := committee.WithStakingEnabled.ReadFromDB( curShardState, err := committee.WithStakingEnabled.ReadFromDB(
curEpoch, consensus.ChainReader, curEpoch, consensus.Blockchain,
) )
if err != nil { if err != nil {
utils.Logger().Error(). utils.Logger().Error().
@ -354,7 +354,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
if curHeader.IsLastBlockInEpoch() && isNotGenesisBlock { if curHeader.IsLastBlockInEpoch() && isNotGenesisBlock {
nextShardState, err := committee.WithStakingEnabled.ReadFromDB( nextShardState, err := committee.WithStakingEnabled.ReadFromDB(
nextEpoch, consensus.ChainReader, nextEpoch, consensus.Blockchain,
) )
if err != nil { if err != nil {
utils.Logger().Error(). utils.Logger().Error().
@ -561,7 +561,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
consensus.switchPhase("selfCommit", FBFTCommit) consensus.switchPhase("selfCommit", FBFTCommit)
consensus.aggregatedPrepareSig = aggSig consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask consensus.prepareBitmap = mask
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader, commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64()) block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64())
for i, key := range consensus.priKey { for i, key := range consensus.priKey {
if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil { if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil {

@ -196,7 +196,7 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
if consensus.blockNum <= 1 { if consensus.blockNum <= 1 {
return nil, nil return nil, nil
} }
lastCommits, err := consensus.ChainReader.ReadCommitSig(blockNum) lastCommits, err := consensus.Blockchain.ReadCommitSig(blockNum)
if err != nil || if err != nil ||
len(lastCommits) < bls.BLSSignatureSizeInBytes { len(lastCommits) < bls.BLSSignatureSizeInBytes {
msgs := consensus.FBFTLog.GetMessagesByTypeSeq( msgs := consensus.FBFTLog.GetMessagesByTypeSeq(
@ -274,15 +274,15 @@ func (consensus *Consensus) Start(
} }
case <-consensus.syncReadyChan: case <-consensus.syncReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
consensus.SetBlockNum(consensus.ChainReader.CurrentHeader().Number().Uint64() + 1) consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1)
consensus.SetViewIDs(consensus.ChainReader.CurrentHeader().ViewID().Uint64() + 1) consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1)
mode := consensus.UpdateConsensusInformation() mode := consensus.UpdateConsensusInformation()
consensus.current.SetMode(mode) consensus.current.SetMode(mode)
consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC")
case <-consensus.syncNotReadyChan: case <-consensus.syncNotReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan")
consensus.SetBlockNum(consensus.ChainReader.CurrentHeader().Number().Uint64() + 1) consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1)
consensus.current.SetMode(Syncing) consensus.current.SetMode(Syncing)
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC") consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC")
@ -294,8 +294,8 @@ func (consensus *Consensus) Start(
//VRF/VDF is only generated in the beacon chain //VRF/VDF is only generated in the beacon chain
if consensus.NeedsRandomNumberGeneration(newBlock.Header().Epoch()) { if consensus.NeedsRandomNumberGeneration(newBlock.Header().Epoch()) {
// generate VRF if the current block has a new leader // generate VRF if the current block has a new leader
if !consensus.ChainReader.IsSameLeaderAsPreviousBlock(newBlock) { if !consensus.Blockchain.IsSameLeaderAsPreviousBlock(newBlock) {
vrfBlockNumbers, err := consensus.ChainReader.ReadEpochVrfBlockNums(newBlock.Header().Epoch()) vrfBlockNumbers, err := consensus.Blockchain.ReadEpochVrfBlockNums(newBlock.Header().Epoch())
if err != nil { if err != nil {
consensus.getLogger().Info(). consensus.getLogger().Info().
Uint64("MsgBlockNum", newBlock.NumberU64()). Uint64("MsgBlockNum", newBlock.NumberU64()).
@ -326,7 +326,7 @@ func (consensus *Consensus) Start(
if (!vdfInProgress) && len(vrfBlockNumbers) >= consensus.VdfSeedSize() { if (!vdfInProgress) && len(vrfBlockNumbers) >= consensus.VdfSeedSize() {
//check local database to see if there's a VDF generated for this epoch //check local database to see if there's a VDF generated for this epoch
//generate a VDF if no blocknum is available //generate a VDF if no blocknum is available
_, err := consensus.ChainReader.ReadEpochVdfBlockNum(newBlock.Header().Epoch()) _, err := consensus.Blockchain.ReadEpochVdfBlockNum(newBlock.Header().Epoch())
if err != nil { if err != nil {
consensus.GenerateVdfAndProof(newBlock, vrfBlockNumbers) consensus.GenerateVdfAndProof(newBlock, vrfBlockNumbers)
vdfInProgress = true vdfInProgress = true
@ -347,7 +347,7 @@ func (consensus *Consensus) Start(
Msg("[ConsensusMainLoop] failed to verify the VDF output") Msg("[ConsensusMainLoop] failed to verify the VDF output")
} else { } else {
//write the VDF only if VDF has not been generated //write the VDF only if VDF has not been generated
_, err := consensus.ChainReader.ReadEpochVdfBlockNum(newBlock.Header().Epoch()) _, err := consensus.Blockchain.ReadEpochVdfBlockNum(newBlock.Header().Epoch())
if err == nil { if err == nil {
consensus.getLogger().Info(). consensus.getLogger().Info().
Uint64("MsgBlockNum", newBlock.NumberU64()). Uint64("MsgBlockNum", newBlock.NumberU64()).
@ -555,7 +555,7 @@ func (consensus *Consensus) GenerateVrfAndProof(newBlock *types.Block, vrfBlockN
} }
sk := vrf_bls.NewVRFSigner(key.Pri) sk := vrf_bls.NewVRFSigner(key.Pri)
blockHash := [32]byte{} blockHash := [32]byte{}
previousHeader := consensus.ChainReader.GetHeaderByNumber( previousHeader := consensus.Blockchain.GetHeaderByNumber(
newBlock.NumberU64() - 1, newBlock.NumberU64() - 1,
) )
if previousHeader == nil { if previousHeader == nil {
@ -580,7 +580,7 @@ func (consensus *Consensus) GenerateVrfAndProof(newBlock *types.Block, vrfBlockN
func (consensus *Consensus) ValidateVrfAndProof(headerObj *block.Header) bool { func (consensus *Consensus) ValidateVrfAndProof(headerObj *block.Header) bool {
vrfPk := vrf_bls.NewVRFVerifier(consensus.LeaderPubKey.Object) vrfPk := vrf_bls.NewVRFVerifier(consensus.LeaderPubKey.Object)
var blockHash [32]byte var blockHash [32]byte
previousHeader := consensus.ChainReader.GetHeaderByNumber( previousHeader := consensus.Blockchain.GetHeaderByNumber(
headerObj.Number().Uint64() - 1, headerObj.Number().Uint64() - 1,
) )
if previousHeader == nil { if previousHeader == nil {
@ -608,7 +608,7 @@ func (consensus *Consensus) ValidateVrfAndProof(headerObj *block.Header) bool {
return false return false
} }
vrfBlockNumbers, _ := consensus.ChainReader.ReadEpochVrfBlockNums( vrfBlockNumbers, _ := consensus.Blockchain.ReadEpochVrfBlockNums(
headerObj.Epoch(), headerObj.Epoch(),
) )
consensus.getLogger().Info(). consensus.getLogger().Info().
@ -624,7 +624,7 @@ func (consensus *Consensus) GenerateVdfAndProof(newBlock *types.Block, vrfBlockN
//derive VDF seed from VRFs generated in the current epoch //derive VDF seed from VRFs generated in the current epoch
seed := [32]byte{} seed := [32]byte{}
for i := 0; i < consensus.VdfSeedSize(); i++ { for i := 0; i < consensus.VdfSeedSize(); i++ {
previousVrf := consensus.ChainReader.GetVrfByNumber(vrfBlockNumbers[i]) previousVrf := consensus.Blockchain.GetVrfByNumber(vrfBlockNumbers[i])
for j := 0; j < len(seed); j++ { for j := 0; j < len(seed); j++ {
seed[j] = seed[j] ^ previousVrf[j] seed[j] = seed[j] ^ previousVrf[j]
} }
@ -658,7 +658,7 @@ func (consensus *Consensus) GenerateVdfAndProof(newBlock *types.Block, vrfBlockN
// ValidateVdfAndProof validates the VDF/proof in the current epoch // ValidateVdfAndProof validates the VDF/proof in the current epoch
func (consensus *Consensus) ValidateVdfAndProof(headerObj *block.Header) bool { func (consensus *Consensus) ValidateVdfAndProof(headerObj *block.Header) bool {
vrfBlockNumbers, err := consensus.ChainReader.ReadEpochVrfBlockNums(headerObj.Epoch()) vrfBlockNumbers, err := consensus.Blockchain.ReadEpochVrfBlockNums(headerObj.Epoch())
if err != nil { if err != nil {
consensus.getLogger().Error().Err(err). consensus.getLogger().Error().Err(err).
Str("MsgBlockNum", headerObj.Number().String()). Str("MsgBlockNum", headerObj.Number().String()).
@ -673,7 +673,7 @@ func (consensus *Consensus) ValidateVdfAndProof(headerObj *block.Header) bool {
seed := [32]byte{} seed := [32]byte{}
for i := 0; i < consensus.VdfSeedSize(); i++ { for i := 0; i < consensus.VdfSeedSize(); i++ {
previousVrf := consensus.ChainReader.GetVrfByNumber(vrfBlockNumbers[i]) previousVrf := consensus.Blockchain.GetVrfByNumber(vrfBlockNumbers[i])
for j := 0; j < len(seed); j++ { for j := 0; j < len(seed); j++ {
seed[j] = seed[j] ^ previousVrf[j] seed[j] = seed[j] ^ previousVrf[j]
} }

@ -98,16 +98,8 @@ func (consensus *Consensus) construct(
// Do the signing, 96 byte of bls signature // Do the signing, 96 byte of bls signature
needMsgSig := true needMsgSig := true
switch p { switch p {
case msg_pb.MessageType_PREPARED: case msg_pb.MessageType_ANNOUNCE:
consensusMsg.Block = consensus.block consensusMsg.Payload = consensus.blockHash[:]
// Payload
buffer := bytes.Buffer{}
// 96 bytes aggregated signature
aggSig = consensus.Decider.AggregateVotes(quorum.Prepare)
buffer.Write(aggSig.Serialize())
// Bitmap
buffer.Write(consensus.prepareBitmap.Bitmap)
consensusMsg.Payload = buffer.Bytes()
case msg_pb.MessageType_PREPARE: case msg_pb.MessageType_PREPARE:
needMsgSig = false needMsgSig = false
sig := bls_core.Sign{} sig := bls_core.Sign{}
@ -126,16 +118,11 @@ func (consensus *Consensus) construct(
} }
} }
consensusMsg.Payload = sig.Serialize() consensusMsg.Payload = sig.Serialize()
case msg_pb.MessageType_PREPARED:
consensusMsg.Block = consensus.block
consensusMsg.Payload = consensus.constructQuorumSigAndBitmap(quorum.Prepare)
case msg_pb.MessageType_COMMITTED: case msg_pb.MessageType_COMMITTED:
buffer := bytes.Buffer{} consensusMsg.Payload = consensus.constructQuorumSigAndBitmap(quorum.Commit)
// 96 bytes aggregated signature
aggSig = consensus.Decider.AggregateVotes(quorum.Commit)
buffer.Write(aggSig.Serialize())
// Bitmap
buffer.Write(consensus.commitBitmap.Bitmap)
consensusMsg.Payload = buffer.Bytes()
case msg_pb.MessageType_ANNOUNCE:
consensusMsg.Payload = consensus.blockHash[:]
} }
var marshaledMessage []byte var marshaledMessage []byte
@ -171,3 +158,24 @@ func (consensus *Consensus) construct(
OptionalAggregateSignature: aggSig, OptionalAggregateSignature: aggSig,
}, nil }, nil
} }
// constructQuorumSigAndBitmap constructs the aggregated sig and bitmap as
// a byte slice in format of: [[aggregated sig], [sig bitmap]]
func (consensus *Consensus) constructQuorumSigAndBitmap(p quorum.Phase) []byte {
buffer := bytes.Buffer{}
// 96 bytes aggregated signature
aggSig := consensus.Decider.AggregateVotes(p)
buffer.Write(aggSig.Serialize())
// Bitmap
if p == quorum.Prepare {
buffer.Write(consensus.prepareBitmap.Bitmap)
} else if p == quorum.Commit {
buffer.Write(consensus.commitBitmap.Bitmap)
} else {
utils.Logger().Error().
Str("phase", p.String()).
Msg("[constructQuorumSigAndBitmap] Invalid phase is supplied.")
return []byte{}
}
return buffer.Bytes()
}

@ -40,8 +40,8 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
return true return true
} }
curHeader := consensus.ChainReader.CurrentHeader() curHeader := consensus.Blockchain.CurrentHeader()
committee, err := consensus.ChainReader.ReadShardState(curHeader.Epoch()) committee, err := consensus.Blockchain.ReadShardState(curHeader.Epoch())
if err != nil { if err != nil {
consensus.getLogger().Err(err). consensus.getLogger().Err(err).
Uint32("shard", consensus.ShardID). Uint32("shard", consensus.ShardID).

@ -242,7 +242,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
Msg("[OnCommit] Failed finding a matching block for committed message") Msg("[OnCommit] Failed finding a matching block for committed message")
return return
} }
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader, commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64()) blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64())
logger = logger.With(). logger = logger.With().
Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgViewID", recvMsg.ViewID).

@ -46,7 +46,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error {
Msg("[didReachPrepareQuorum] Unparseable block data") Msg("[didReachPrepareQuorum] Unparseable block data")
return err return err
} }
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader, commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64()) blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64())
// so by this point, everyone has committed to the blockhash of this block // so by this point, everyone has committed to the blockhash of this block

@ -81,7 +81,7 @@ func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) {
priKeys := consensus.getPriKeysInCommittee() priKeys := consensus.getPriKeysInCommittee()
// Sign commit signature on the received block and construct the p2p messages // Sign commit signature on the received block and construct the p2p messages
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader, commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64()) blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64())
p2pMsgs := consensus.constructP2pMessages(msg_pb.MessageType_COMMIT, commitPayload, priKeys) p2pMsgs := consensus.constructP2pMessages(msg_pb.MessageType_COMMIT, commitPayload, priKeys)
@ -227,6 +227,11 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.getLogger().Warn().Msg("[OnCommitted] unable to parse msg") consensus.getLogger().Warn().Msg("[OnCommitted] unable to parse msg")
return return
} }
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message")
// NOTE let it handle its own logs // NOTE let it handle its own logs
if !consensus.isRightBlockNumCheck(recvMsg) { if !consensus.isRightBlockNumCheck(recvMsg) {
return return
@ -256,7 +261,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
Msg("[OnCommitted] Failed finding a matching block for committed message") Msg("[OnCommitted] Failed finding a matching block for committed message")
return return
} }
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader, commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64()) blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64())
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
consensus.getLogger().Error(). consensus.getLogger().Error().
@ -273,6 +278,21 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.aggregatedCommitSig = aggSig consensus.aggregatedCommitSig = aggSig
consensus.commitBitmap = mask consensus.commitBitmap = mask
// If we already have a committed signature received before, check whether the new one
// has more signatures and if yes, override the old data.
// Otherwise, simply write the commit signature in db.
commitSigBitmap, err := consensus.Blockchain.ReadCommitSig(blockObj.NumberU64())
if err == nil && len(commitSigBitmap) == len(recvMsg.Payload) {
new := mask.CountEnabled()
mask.SetMask(commitSigBitmap[bls.BLSSignatureSizeInBytes:])
cur := mask.CountEnabled()
if new > cur {
consensus.Blockchain.WriteCommitSig(blockObj.NumberU64(), recvMsg.Payload)
}
} else {
consensus.Blockchain.WriteCommitSig(blockObj.NumberU64(), recvMsg.Payload)
}
consensus.tryCatchup() consensus.tryCatchup()
if recvMsg.BlockNum > consensus.blockNum { if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC") consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC")

@ -117,10 +117,10 @@ func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) {
// viewID is only used as the fallback mechansim to determine the nextViewID // viewID is only used as the fallback mechansim to determine the nextViewID
func (consensus *Consensus) getNextViewID() (uint64, time.Duration) { func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
// handle corner case at first // handle corner case at first
if consensus.ChainReader == nil { if consensus.Blockchain == nil {
return consensus.fallbackNextViewID() return consensus.fallbackNextViewID()
} }
curHeader := consensus.ChainReader.CurrentHeader() curHeader := consensus.Blockchain.CurrentHeader()
if curHeader == nil { if curHeader == nil {
return consensus.fallbackNextViewID() return consensus.fallbackNextViewID()
} }
@ -160,11 +160,11 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrappe
var lastLeaderPubKey *bls.PublicKeyWrapper var lastLeaderPubKey *bls.PublicKeyWrapper
var err error var err error
epoch := big.NewInt(0) epoch := big.NewInt(0)
if consensus.ChainReader == nil { if consensus.Blockchain == nil {
consensus.getLogger().Error().Msg("[getNextLeaderKey] ChainReader is nil. Use consensus.LeaderPubKey") consensus.getLogger().Error().Msg("[getNextLeaderKey] Blockchain is nil. Use consensus.LeaderPubKey")
lastLeaderPubKey = consensus.LeaderPubKey lastLeaderPubKey = consensus.LeaderPubKey
} else { } else {
curHeader := consensus.ChainReader.CurrentHeader() curHeader := consensus.Blockchain.CurrentHeader()
if curHeader == nil { if curHeader == nil {
consensus.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain") consensus.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain")
lastLeaderPubKey = consensus.LeaderPubKey lastLeaderPubKey = consensus.LeaderPubKey

@ -11,15 +11,15 @@ import (
// ReadSignatureBitmapByPublicKeys read the payload of signature and bitmap based on public keys // ReadSignatureBitmapByPublicKeys read the payload of signature and bitmap based on public keys
func ReadSignatureBitmapByPublicKeys(recvPayload []byte, publicKeys []bls.PublicKeyWrapper) (*bls_core.Sign, *bls.Mask, error) { func ReadSignatureBitmapByPublicKeys(recvPayload []byte, publicKeys []bls.PublicKeyWrapper) (*bls_core.Sign, *bls.Mask, error) {
if len(recvPayload) < 96 { if len(recvPayload) < bls.BLSSignatureSizeInBytes {
return nil, nil, errors.New("payload not have enough length") return nil, nil, errors.New("payload not have enough length")
} }
payload := append(recvPayload[:0:0], recvPayload...) payload := append(recvPayload[:0:0], recvPayload...)
//#### Read payload data //#### Read payload data
// 96 byte of multi-sig // 96 byte of multi-sig
offset := 0 offset := 0
multiSig := payload[offset : offset+96] multiSig := payload[offset : offset+bls.BLSSignatureSizeInBytes]
offset += 96 offset += bls.BLSSignatureSizeInBytes
// bitmap // bitmap
bitmap := payload[offset:] bitmap := payload[offset:]
//#### END Read payload data //#### END Read payload data

@ -1053,7 +1053,7 @@ func (node *Node) InitConsensusWithValidators() (err error) {
Uint64("epoch", epoch.Uint64()). Uint64("epoch", epoch.Uint64()).
Msg("[InitConsensusWithValidators] Try To Get PublicKeys") Msg("[InitConsensusWithValidators] Try To Get PublicKeys")
shardState, err := committee.WithStakingEnabled.Compute( shardState, err := committee.WithStakingEnabled.Compute(
epoch, node.Consensus.ChainReader, epoch, node.Consensus.Blockchain,
) )
if err != nil { if err != nil {
utils.Logger().Err(err). utils.Logger().Err(err).
@ -1159,7 +1159,7 @@ func (node *Node) populateSelfAddresses(epoch *big.Int) {
node.keysToAddrsEpoch = epoch node.keysToAddrsEpoch = epoch
shardID := node.Consensus.ShardID shardID := node.Consensus.ShardID
shardState, err := node.Consensus.ChainReader.ReadShardState(epoch) shardState, err := node.Consensus.Blockchain.ReadShardState(epoch)
if err != nil { if err != nil {
utils.Logger().Error().Err(err). utils.Logger().Error().Err(err).
Int64("epoch", epoch.Int64()). Int64("epoch", epoch.Int64()).

@ -241,7 +241,7 @@ func (node *Node) BroadcastCrossLink() {
node.host.SendMessageToGroups( node.host.SendMessageToGroups(
[]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID)}, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID)},
p2p.ConstructMessage( p2p.ConstructMessage(
proto_node.ConstructCrossLinkMessage(node.Consensus.ChainReader, headers)), proto_node.ConstructCrossLinkMessage(node.Consensus.Blockchain, headers)),
) )
} }

@ -227,7 +227,7 @@ func IsEligibleForEPoSAuction(snapshot *staking.ValidatorSnapshot, validator *st
} }
} }
// ChainReader is a subset of Engine.ChainReader, just enough to do assignment // ChainReader is a subset of Engine.Blockchain, just enough to do assignment
type ChainReader interface { type ChainReader interface {
// ReadShardState retrieves sharding state given the epoch number. // ReadShardState retrieves sharding state given the epoch number.
// This api reads the shard state cached or saved on the chaindb. // This api reads the shard state cached or saved on the chaindb.

Loading…
Cancel
Save