Cherrypick crosslink related fixes from t3 to master. (#3045)

* Optimize crosslink verification logic and add more cache (#3032)

* make crosslink broadcast smarter and more efficient (#3036)

* adjust catch up speed to be a sane number

* Fix crosslink broadcast condition (#3041)
pull/3052/head
Rongjian Lan 5 years ago committed by GitHub
parent 70d4e68c0d
commit ea2347fa5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      consensus/quorum/one-node-staked-vote.go
  2. 11
      consensus/quorum/quorum.go
  3. 119
      node/node_cross_link.go
  4. 45
      node/node_handler.go
  5. 9
      node/node_newblock.go
  6. 3
      node/node_syncing.go
  7. 11
      staking/verify/verify.go

@ -2,6 +2,7 @@ package quorum
import (
"encoding/json"
errors2 "errors"
"math/big"
"github.com/harmony-one/harmony/consensus/votepower"
@ -78,8 +79,6 @@ func (v *stakedVoteWeight) IsQuorumAchievedByMask(mask *bls_cosi.Mask) bool {
return (*currentTotalPower).GT(threshold)
}
func (v *stakedVoteWeight) computeCurrentTotalPower(p Phase) (*numeric.Dec, error) {
w := shard.BLSPublicKey{}
members := v.Participants()
ballot := func() *voteBox {
switch p {
case Prepare:
@ -94,10 +93,14 @@ func (v *stakedVoteWeight) computeCurrentTotalPower(p Phase) (*numeric.Dec, erro
}
}()
for i := range members {
if err := w.FromLibBLSPublicKey(members[i]); err != nil {
return nil, err
members := v.Participants()
membersKeys := v.ParticipantsKeyBytes()
if len(members) != len(membersKeys) {
return nil, errors2.New("Participant keys are not matching")
}
for i := range members {
w := membersKeys[i]
if _, didVote := ballot.voters[w]; !didVote &&
v.ReadBallot(p, members[i]) != nil {
ballot.currentTotal = ballot.currentTotal.Add(

@ -68,6 +68,7 @@ func (p Policy) String() string {
// ParticipantTracker ..
type ParticipantTracker interface {
Participants() []*bls.PublicKey
ParticipantsKeyBytes() []shard.BLSPublicKey
IndexOf(*bls.PublicKey) int
ParticipantsCount() int64
NextAfter(*bls.PublicKey) (bool, *bls.PublicKey)
@ -146,6 +147,7 @@ type Transition struct {
type cIdentities struct {
// Public keys of the committee including leader and validators
publicKeys []*bls.PublicKey
publicKeysByte []shard.BLSPublicKey
prepare *votepower.Round
commit *votepower.Round
// viewIDSigs: every validator
@ -196,12 +198,20 @@ func (s *cIdentities) Participants() []*bls.PublicKey {
return s.publicKeys
}
func (s *cIdentities) ParticipantsKeyBytes() []shard.BLSPublicKey {
return s.publicKeysByte
}
func (s *cIdentities) UpdateParticipants(pubKeys []*bls.PublicKey) {
keyBytes := []shard.BLSPublicKey{}
for i := range pubKeys {
k := shard.BLSPublicKey{}
k.FromLibBLSPublicKey(pubKeys[i])
keyBytes = append(keyBytes, k)
}
s.publicKeys = append(pubKeys[:0:0], pubKeys...)
s.publicKeysByte = keyBytes
}
func (s *cIdentities) ParticipantsCount() int64 {
@ -307,6 +317,7 @@ func (s *cIdentities) ReadAllBallots(p Phase) []*votepower.Ballot {
func newBallotsBackedSignatureReader() *cIdentities {
return &cIdentities{
publicKeys: []*bls.PublicKey{},
publicKeysByte: []shard.BLSPublicKey{},
prepare: votepower.NewRound(),
commit: votepower.NewRound(),
viewChange: votepower.NewRound(),

@ -1,22 +1,32 @@
package node
import (
"fmt"
"math/big"
"time"
common2 "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/staking/verify"
"github.com/pkg/errors"
"golang.org/x/sync/singleflight"
)
const (
maxPendingCrossLinkSize = 1000
crossLinkBatchSize = 10
crossLinkBatchSize = 3
)
var (
errAlreadyExist = errors.New("crosslink already exist")
deciderCache singleflight.Group
committeeCache singleflight.Group
)
// VerifyBlockCrossLinks verifies the cross links of the block
@ -63,6 +73,11 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
return
}
existingCLs := map[common2.Hash]struct{}{}
for _, pending := range pendingCLs {
existingCLs[pending.Hash()] = struct{}{}
}
crosslinks := []types.CrossLink{}
if err := rlp.DecodeBytes(msgPayload, &crosslinks); err != nil {
utils.Logger().Error().
@ -76,9 +91,17 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
Msgf("[ProcessingCrossLink] Received crosslinks: %d", len(crosslinks))
for i, cl := range crosslinks {
if i > crossLinkBatchSize {
if i > crossLinkBatchSize*2 { // A sanity check to prevent spamming
break
}
if _, ok := existingCLs[cl.Hash()]; ok {
utils.Logger().Err(err).
Msgf("[ProcessingCrossLink] Cross Link already exists in pending queue, pass. Beacon Epoch: %d, Block num: %d, Epoch: %d, shardID %d",
node.Blockchain().CurrentHeader().Epoch(), cl.Number(), cl.Epoch(), cl.ShardID())
continue
}
exist, err := node.Blockchain().ReadCrossLink(cl.ShardID(), cl.Number().Uint64())
if err == nil && exist != nil {
utils.Logger().Err(err).
@ -122,29 +145,95 @@ func (node *Node) VerifyCrossLink(cl types.CrossLink) error {
)
}
// Verify signature of the new cross link header
// TODO: check whether to recalculate shard state
shardState, err := node.Blockchain().ReadShardState(cl.Epoch())
aggSig := &bls.Sign{}
sig := cl.Signature()
if err := aggSig.Deserialize(sig[:]); err != nil {
return errors.Wrapf(
err,
"[VerifyCrossLink] unable to deserialize multi-signature from payload",
)
}
committee, err := node.lookupCommittee(cl.Epoch(), cl.ShardID())
if err != nil {
return err
}
decider, err := node.lookupDecider(cl.Epoch(), cl.ShardID())
if err != nil {
return err
}
committee, err := shardState.FindCommitteeByID(cl.ShardID())
return verify.AggregateSigForCommittee(
node.Blockchain(), committee, decider, aggSig, cl.Hash(), cl.BlockNum(), cl.ViewID().Uint64(), cl.Epoch(), cl.Bitmap(),
)
}
func (node *Node) lookupDecider(
epoch *big.Int, shardID uint32,
) (quorum.Decider, error) {
key := fmt.Sprintf("decider-%d-%d", epoch.Uint64(), shardID)
result, err, _ := deciderCache.Do(
key, func() (interface{}, error) {
committee, err := node.lookupCommittee(epoch, shardID)
if err != nil {
return err
return nil, err
}
aggSig := &bls.Sign{}
sig := cl.Signature()
if err := aggSig.Deserialize(sig[:]); err != nil {
return errors.Wrapf(
err,
"[VerifyCrossLink] unable to deserialize multi-signature from payload",
decider := quorum.NewDecider(
quorum.SuperMajorityStake, committee.ShardID,
)
decider.SetMyPublicKeyProvider(func() (*multibls.PublicKey, error) {
return nil, nil
})
if _, err := decider.SetVoters(committee, epoch); err != nil {
return nil, err
}
return verify.AggregateSigForCommittee(
node.Blockchain(), committee, aggSig, cl.Hash(), cl.BlockNum(), cl.ViewID().Uint64(), cl.Epoch(), cl.Bitmap(),
go func() {
time.Sleep(120 * time.Minute)
deciderCache.Forget(key)
}()
return decider, nil
},
)
if err != nil {
return nil, err
}
return result.(quorum.Decider), nil
}
func (node *Node) lookupCommittee(
epoch *big.Int, shardID uint32,
) (*shard.Committee, error) {
key := fmt.Sprintf("committee-%d-%d", epoch.Uint64(), shardID)
result, err, _ := committeeCache.Do(
key, func() (interface{}, error) {
shardState, err := node.Blockchain().ReadShardState(epoch)
if err != nil {
return nil, err
}
committee, err := shardState.FindCommitteeByID(shardID)
if err != nil {
return nil, err
}
go func() {
time.Sleep(120 * time.Minute)
committeeCache.Forget(key)
}()
return committee, nil
},
)
if err != nil {
return nil, err
}
return result.(*shard.Committee), nil
}

@ -229,7 +229,18 @@ func (node *Node) BroadcastSlash(witness *slash.Record) {
// BroadcastCrossLink is called by consensus leader to
// send the new header as cross link to beacon chain.
func (node *Node) BroadcastCrossLink(newBlock *types.Block) {
func (node *Node) BroadcastCrossLink() {
curBlock := node.Blockchain().CurrentBlock()
if curBlock == nil {
return
}
if node.NodeConfig.ShardID == shard.BeaconChainShardID ||
!node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) {
// no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch
return
}
// no point to broadcast the crosslink if we aren't even in the right epoch yet
if !node.Blockchain().Config().IsCrossLink(
node.Blockchain().CurrentHeader().Epoch(),
@ -242,36 +253,50 @@ func (node *Node) BroadcastCrossLink(newBlock *types.Block) {
nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID),
)
headers := []*block.Header{}
lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID())
lastLink, err := node.Beaconchain().ReadShardLastCrossLink(curBlock.ShardID())
var latestBlockNum uint64
// TODO chao: record the missing crosslink in local database instead of using latest crosslink
// if cannot find latest crosslink, broadcast latest 3 block headers
if err != nil {
utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed")
header := node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 2)
header := node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 2)
if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) {
headers = append(headers, header)
}
header = node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 1)
header = node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 1)
if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) {
headers = append(headers, header)
}
headers = append(headers, newBlock.Header())
headers = append(headers, curBlock.Header())
} else {
latestBlockNum = lastLink.BlockNum()
for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ {
batchSize := crossLinkBatchSize
diff := curBlock.Number().Uint64() - latestBlockNum
if diff > 100 {
// Increase batch size by 1 for every 100 blocks beyond
batchSize += int(diff-100) / 100
}
// Cap at a sane size to avoid overload network
if batchSize > crossLinkBatchSize*2 {
batchSize = crossLinkBatchSize * 2
}
for blockNum := latestBlockNum + 1; blockNum <= curBlock.NumberU64(); blockNum++ {
header := node.Blockchain().GetHeaderByNumber(blockNum)
if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) {
headers = append(headers, header)
if len(headers) == crossLinkBatchSize {
if len(headers) == batchSize {
break
}
}
}
}
utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers))
utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, curBlock.NumberU64(), len(headers))
for _, header := range headers {
utils.Logger().Debug().Msgf(
"[BroadcastCrossLink] Broadcasting %d",
@ -415,10 +440,6 @@ func (node *Node) PostConsensusProcessing(
if node.NodeConfig.ShardID == shard.BeaconChainShardID {
node.BroadcastNewBlock(newBlock)
}
if node.NodeConfig.ShardID != shard.BeaconChainShardID &&
node.Blockchain().Config().IsCrossLink(newBlock.Epoch()) {
node.BroadcastCrossLink(newBlock)
}
node.BroadcastCXReceipts(newBlock)
} else {
if node.Consensus.Mode() != consensus.Listening {

@ -186,12 +186,15 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
AnErr("[proposeNewBlock] pending crosslink is already committed onchain", err)
continue
}
if err := node.VerifyCrossLink(pending); err != nil {
invalidToDelete = append(invalidToDelete, pending)
// Crosslink is already verified before it's accepted to pending,
// no need to verify again in proposal.
if !node.Blockchain().Config().IsCrossLink(pending.Epoch()) {
utils.Logger().Debug().
AnErr("[proposeNewBlock] pending crosslink verification failed", err)
AnErr("[proposeNewBlock] pending crosslink that's before crosslink epoch", err)
continue
}
crossLinksToPropose = append(crossLinksToPropose, pending)
}
utils.Logger().Debug().

@ -168,6 +168,9 @@ func (node *Node) DoBeaconSyncing() {
)
if err != nil {
node.beaconSync.AddLastMileBlock(beaconBlock)
} else if node.Consensus.IsLeader() {
// Only leader broadcast crosslink to avoid spamming p2p
node.BroadcastCrossLink()
}
}
}

@ -9,7 +9,6 @@ import (
"github.com/harmony-one/harmony/consensus/signature"
"github.com/harmony-one/harmony/core"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/shard"
"github.com/pkg/errors"
)
@ -23,6 +22,7 @@ var (
func AggregateSigForCommittee(
chain *core.BlockChain,
committee *shard.Committee,
decider quorum.Decider,
aggSignature *bls.Sign,
hash common.Hash,
blockNum, viewID uint64,
@ -41,15 +41,6 @@ func AggregateSigForCommittee(
return err
}
decider := quorum.NewDecider(
quorum.SuperMajorityStake, committee.ShardID,
)
decider.SetMyPublicKeyProvider(func() (*multibls.PublicKey, error) {
return nil, nil
})
if _, err := decider.SetVoters(committee, epoch); err != nil {
return err
}
if !decider.IsQuorumAchievedByMask(mask) {
return errQuorumVerifyAggSign
}

Loading…
Cancel
Save