Feature 4160 beacon crosslink signal. (#4169)

* Sending crosslinks from beacon to non-beacon chains.

* Sending crosslinks from beacon to non-beacon chains.

* Broadcast.

* Fix comment.

* Crosslink signal.

* Crosslink signal.

* Add comment to run tests.

* Fix comments.

* Fix comments.

* Fix review comments.

* Fix comments.

* Fix comments.

Co-authored-by: Konstantin <k.potapov@softpro.com>
pull/4193/head
Konstantin 2 years ago committed by GitHub
parent 9ac7866d05
commit 7a8e5d468c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      api/proto/node/node.go
  2. 61
      api/service/crosslink_sending/service.go
  3. 4
      api/service/legacysync/downloader/client.go
  4. 3
      api/service/manager.go
  5. 2
      cmd/harmony/main.go
  6. 4
      consensus/consensus.go
  7. 20
      core/types/crosslink.go
  8. 9
      core/types/crosslink_heartbeat.go
  9. 48
      core/types/crosslink_test.go
  10. 2
      node/node.go
  11. 12
      node/node_cross_link.go
  12. 141
      node/node_handler.go
  13. 4
      node/node_syncing.go

@ -42,30 +42,33 @@ type BlockMessageType int
// Block sync message subtype
const (
Sync BlockMessageType = iota
CrossLink // used for crosslink from beacon chain to shard chain
Receipt // cross-shard transaction receipts
SlashCandidate // A report of a double-signing event
Sync BlockMessageType = iota
CrossLink // used for crosslink from beacon chain to shard chain
Receipt // cross-shard transaction receipts
SlashCandidate // A report of a double-signing event
CrosslinkHeartbeat // Heart beat signal for crosslinks. Needed for epoch chain.
)
var (
// B suffix means Byte
nodeB = byte(proto.Node)
blockB = byte(Block)
slashB = byte(SlashCandidate)
txnB = byte(Transaction)
sendB = byte(Send)
stakingB = byte(Staking)
syncB = byte(Sync)
crossLinkB = byte(CrossLink)
receiptB = byte(Receipt)
nodeB = byte(proto.Node)
blockB = byte(Block)
slashB = byte(SlashCandidate)
txnB = byte(Transaction)
sendB = byte(Send)
stakingB = byte(Staking)
syncB = byte(Sync)
crossLinkB = byte(CrossLink)
crossLinkHeardBeatB = byte(CrosslinkHeartbeat)
receiptB = byte(Receipt)
// H suffix means header
slashH = []byte{nodeB, blockB, slashB}
transactionListH = []byte{nodeB, txnB, sendB}
stakingTxnListH = []byte{nodeB, stakingB, sendB}
syncH = []byte{nodeB, blockB, syncB}
crossLinkH = []byte{nodeB, blockB, crossLinkB}
cxReceiptH = []byte{nodeB, blockB, receiptB}
slashH = []byte{nodeB, blockB, slashB}
transactionListH = []byte{nodeB, txnB, sendB}
stakingTxnListH = []byte{nodeB, stakingB, sendB}
syncH = []byte{nodeB, blockB, syncB}
crossLinkH = []byte{nodeB, blockB, crossLinkB}
cxReceiptH = []byte{nodeB, blockB, receiptB}
crossLinkHeartBeatH = []byte{nodeB, blockB, crossLinkHeardBeatB}
)
// ConstructTransactionListMessageAccount constructs serialized transactions in account model
@ -110,6 +113,13 @@ func ConstructSlashMessage(witnesses slash.Records) []byte {
return byteBuffer.Bytes()
}
func ConstructCrossLinkHeartBeatMessage(hb types.CrosslinkHeartbeat) []byte {
byteBuffer := bytes.NewBuffer(crossLinkHeartBeatH)
data, _ := rlp.EncodeToBytes(hb)
byteBuffer.Write(data)
return byteBuffer.Bytes()
}
// ConstructCrossLinkMessage constructs cross link message to send to beacon chain
func ConstructCrossLinkMessage(bc engine.ChainReader, headers []*block.Header) []byte {
byteBuffer := bytes.NewBuffer(crossLinkH)

@ -0,0 +1,61 @@
package crosslink_sending
import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/shard"
)
type broadcast interface {
BroadcastCrosslinkHeartbeatSignalFromBeaconToShards()
BroadcastCrossLinkFromShardsToBeacon()
}
type Service struct {
node broadcast
bc *core.BlockChain
ch chan core.ChainEvent
closeCh chan struct{}
beacon bool
}
func New(node broadcast, bc *core.BlockChain) *Service {
return &Service{
node: node,
bc: bc,
ch: make(chan core.ChainEvent, 1),
closeCh: make(chan struct{}),
beacon: bc.ShardID() == shard.BeaconChainShardID,
}
}
// Start starts service.
func (s *Service) Start() error {
s.bc.SubscribeChainEvent(s.ch)
go s.run()
return nil
}
func (s *Service) run() {
for {
select {
case _, ok := <-s.ch:
if !ok {
return
}
if s.beacon {
go s.node.BroadcastCrosslinkHeartbeatSignalFromBeaconToShards()
} else {
// TODO: this should be uncommented for beacon sync, no need to have it now.
//go s.node.BroadcastCrossLinkFromShardsToBeacon()
}
case <-s.closeCh:
return
}
}
}
// Stop stops service.
func (s *Service) Stop() error {
close(s.closeCh)
return nil
}

@ -65,9 +65,9 @@ func (client *Client) GetBlocksByHeights(heights []uint64) *pb.DownloaderRespons
Type: pb.DownloaderRequest_BLOCKBYHEIGHT,
Heights: heights,
}
response, err := client.dlClient.Query(ctx, request)
response, err := client.dlClient.Query(ctx, request, grpc.MaxCallRecvMsgSize(32*1024*1024))
if err != nil {
utils.Logger().Error().Err(err).Str("target", client.conn.Target()).Msg("[SYNC] GetBlockHashes query failed")
utils.Logger().Error().Err(err).Str("target", client.conn.Target()).Msg("[SYNC] GetBlocksByHeights query failed")
}
return response
}

@ -22,6 +22,7 @@ const (
Pprof
Prometheus
Synchronize
CrosslinkSending
)
func (t Type) String() string {
@ -42,6 +43,8 @@ func (t Type) String() string {
return "Prometheus"
case Synchronize:
return "Synchronize"
case CrosslinkSending:
return "CrosslinkSending"
default:
return "Unknown"
}

@ -15,6 +15,7 @@ import (
"syscall"
"time"
"github.com/harmony-one/harmony/api/service/crosslink_sending"
rosetta_common "github.com/harmony-one/harmony/rosetta/common"
harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
@ -408,6 +409,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
} else if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode {
currentNode.RegisterExplorerServices()
}
currentNode.RegisterService(service.CrosslinkSending, crosslink_sending.New(currentNode, currentNode.Blockchain()))
if hc.Pprof.Enabled {
setupPprofService(currentNode, hc)
}

@ -168,6 +168,10 @@ func (consensus *Consensus) GetPublicKeys() multibls.PublicKeys {
return consensus.priKey.GetPublicKeys()
}
func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys {
return consensus.priKey
}
// GetLeaderPrivateKey returns leader private key if node is the leader
func (consensus *Consensus) GetLeaderPrivateKey(leaderKey *bls_core.PublicKey) (*bls.PrivateKeyWrapper, error) {
for i, key := range consensus.priKey {

@ -130,17 +130,25 @@ type CrossLinks []CrossLink
// Sort crosslinks by shardID and then tie break by blockNum then by viewID
func (cls CrossLinks) Sort() {
sort.Slice(cls, func(i, j int) bool {
return cls[i].ShardID() < cls[j].ShardID() ||
(cls[i].ShardID() == cls[j].ShardID() && cls[i].Number().Cmp(cls[j].Number()) < 0) ||
(cls[i].ShardID() == cls[j].ShardID() && cls[i].Number() == cls[j].Number() && cls[i].ViewID().Cmp(cls[j].ViewID()) < 0)
if s1, s2 := cls[i].ShardID(), cls[j].ShardID(); s1 != s2 {
return s1 < s2
}
if s1, s2 := cls[i].Number(), cls[j].Number(); s1.Cmp(s2) != 0 {
return s1.Cmp(s2) < 0
}
return cls[i].ViewID().Cmp(cls[j].ViewID()) < 0
})
}
// IsSorted checks whether the cross links are sorted
func (cls CrossLinks) IsSorted() bool {
return sort.SliceIsSorted(cls, func(i, j int) bool {
return cls[i].ShardID() < cls[j].ShardID() ||
(cls[i].ShardID() == cls[j].ShardID() && cls[i].Number().Cmp(cls[j].Number()) < 0) ||
(cls[i].ShardID() == cls[j].ShardID() && cls[i].Number() == cls[j].Number() && cls[i].ViewID().Cmp(cls[j].ViewID()) < 0)
if s1, s2 := cls[i].ShardID(), cls[j].ShardID(); s1 != s2 {
return s1 < s2
}
if s1, s2 := cls[i].Number(), cls[j].Number(); s1.Cmp(s2) != 0 {
return s1.Cmp(s2) < 0
}
return cls[i].ViewID().Cmp(cls[j].ViewID()) < 0
})
}

@ -0,0 +1,9 @@
package types
type CrosslinkHeartbeat struct {
ShardID uint32
LatestContinuousBlockNum uint64
Epoch uint64
PublicKey []byte
Signature []byte
}

@ -0,0 +1,48 @@
package types
import (
"math/big"
"testing"
"github.com/stretchr/testify/require"
)
func TestCrosslinks_Sorting(t *testing.T) {
bb := CrossLinks{
{
BlockNumberF: big.NewInt(1),
ViewIDF: big.NewInt(0),
},
{
BlockNumberF: big.NewInt(1),
ViewIDF: big.NewInt(1),
},
{
BlockNumberF: big.NewInt(1),
ViewIDF: big.NewInt(4),
},
{
BlockNumberF: big.NewInt(1),
ViewIDF: big.NewInt(3),
},
{
BlockNumberF: big.NewInt(1),
ViewIDF: big.NewInt(2),
},
}
bb.Sort()
for i, v := range bb {
require.EqualValues(t, i, v.ViewID().Uint64())
}
require.True(t, bb.IsSorted())
}
func TestBigNumberInequality(t *testing.T) {
type A struct {
X int
}
require.False(t, big.NewInt(1) == big.NewInt(1))
require.False(t, &A{} == &A{})
}

@ -465,6 +465,8 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) (
node.NodeConfig.Role() == nodeconfig.ExplorerNode {
return nil, 0, errIgnoreBeaconMsg
}
case proto_node.CrosslinkHeartbeat:
nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "crosslink_heartbeat"}).Inc()
default:
nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "invalid_block_type"}).Inc()
return nil, 0, errInvalidNodeMsg

@ -53,6 +53,14 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
return nil
}
// ProcessCrossLinkHeartbeatMessage process crosslink heart beat signal.
func (node *Node) ProcessCrossLinkHeartbeatMessage(msgPayload []byte) {
if node.IsRunningBeaconChain() {
return
}
// process in next pr.
}
// ProcessCrossLinkMessage verify and process Node/CrossLink message into crosslink when it's valid
func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
if node.IsRunningBeaconChain() {
@ -68,7 +76,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
existingCLs[pending.Hash()] = struct{}{}
}
crosslinks := []types.CrossLink{}
var crosslinks []types.CrossLink
if err := rlp.DecodeBytes(msgPayload, &crosslinks); err != nil {
utils.Logger().Error().
Err(err).
@ -76,7 +84,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
return
}
candidates := []types.CrossLink{}
var candidates []types.CrossLink
utils.Logger().Debug().
Msgf("[ProcessingCrossLink] Received crosslinks: %d", len(crosslinks))

@ -4,13 +4,17 @@ import (
"bytes"
"context"
"math/rand"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/api/proto"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
@ -39,6 +43,8 @@ func (node *Node) processSkippedMsgTypeByteValue(
node.ProcessReceiptMessage(content)
case proto_node.CrossLink:
node.ProcessCrossLinkMessage(content)
case proto_node.CrosslinkHeartbeat:
node.ProcessCrossLinkHeartbeatMessage(content)
default:
utils.Logger().Error().
Int("message-iota-value", int(cat)).
@ -47,9 +53,9 @@ func (node *Node) processSkippedMsgTypeByteValue(
}
var (
errInvalidPayloadSize = errors.New("invalid payload size")
errWrongBlockMsgSize = errors.New("invalid block message size")
latestSentCrosslinkNum uint64 = 0
errInvalidPayloadSize = errors.New("invalid payload size")
errWrongBlockMsgSize = errors.New("invalid block message size")
latestSentCrosslink uint64 = 0
)
// HandleNodeMessage parses the message and dispatch the actions.
@ -88,7 +94,8 @@ func (node *Node) HandleNodeMessage(
case
proto_node.SlashCandidate,
proto_node.Receipt,
proto_node.CrossLink:
proto_node.CrossLink,
proto_node.CrosslinkHeartbeat:
// skip first byte which is blockMsgType
node.processSkippedMsgTypeByteValue(blockMsgType, msgPayload[1:])
}
@ -164,17 +171,20 @@ func (node *Node) BroadcastSlash(witness *slash.Record) {
utils.Logger().Info().Msg("broadcast the double sign record")
}
// BroadcastCrossLink is called by consensus leader to
// BroadcastCrossLinkFromShardsToBeacon is called by consensus leader to
// send the new header as cross link to beacon chain.
func (node *Node) BroadcastCrossLink() {
func (node *Node) BroadcastCrossLinkFromShardsToBeacon() { // leader of 1-3 shards
if node.IsRunningBeaconChain() {
return
}
curBlock := node.Blockchain().CurrentBlock()
if curBlock == nil {
return
}
shardID := curBlock.ShardID()
if node.IsRunningBeaconChain() ||
!node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) {
// no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch
if !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) {
// no need to broadcast crosslink if it's beacon chain, or it's not crosslink epoch
return
}
@ -189,27 +199,110 @@ func (node *Node) BroadcastCrossLink() {
"Construct and Broadcasting new crosslink to beacon chain groupID %s",
nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID),
)
headers := []*block.Header{}
lastLink, err := node.Beaconchain().ReadShardLastCrossLink(curBlock.ShardID())
headers, err := getCrosslinkHeadersForShards(node.Beaconchain(), node.Blockchain(), curBlock, shardID, &latestSentCrosslink)
if err != nil {
utils.Logger().Error().Err(err).Msg("[BroadcastCrossLink] failed to get crosslinks")
return
}
if len(headers) == 0 {
utils.Logger().Info().Msg("[BroadcastCrossLink] no crosslinks to broadcast")
return
}
node.host.SendMessageToGroups(
[]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID)},
p2p.ConstructMessage(
proto_node.ConstructCrossLinkMessage(node.Consensus.Blockchain, headers)),
)
}
// BroadcastCrosslinkHeartbeatSignalFromBeaconToShards is called by consensus leader or 1% validators to
// send last cross link to shard chains.
func (node *Node) BroadcastCrosslinkHeartbeatSignalFromBeaconToShards() { // leader of 0 shard
if !node.IsRunningBeaconChain() {
return
}
if !(node.IsCurrentlyLeader() || rand.Intn(100) == 0) {
return
}
curBlock := node.Beaconchain().CurrentBlock()
if curBlock == nil {
return
}
if !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) {
// no need to broadcast crosslink if it's beacon chain, or it's not crosslink epoch
return
}
var privToSing *bls.PrivateKeyWrapper
for _, priv := range node.Consensus.GetPrivateKeys() {
if node.Consensus.IsValidatorInCommittee(priv.Pub.Bytes) {
privToSing = &priv
break
}
}
if privToSing == nil {
return
}
for _, shardID := range []uint32{1, 2, 3} {
lastLink, err := node.Blockchain().ReadShardLastCrossLink(shardID)
if err != nil {
utils.Logger().Error().Err(err).Msg("[BroadcastCrossLinkSignal] failed to get crosslinks")
continue
}
hb := types.CrosslinkHeartbeat{
ShardID: lastLink.ShardID(),
LatestContinuousBlockNum: lastLink.BlockNum(),
Epoch: lastLink.Epoch().Uint64(),
PublicKey: privToSing.Pub.Bytes[:],
Signature: nil,
}
rs, err := rlp.EncodeToBytes(hb)
if err != nil {
utils.Logger().Error().Err(err).Msg("[BroadcastCrossLinkSignal] failed to encode signal")
continue
}
hb.Signature = privToSing.Pri.SignHash(rs).Serialize()
bts := proto_node.ConstructCrossLinkHeartBeatMessage(hb)
node.host.SendMessageToGroups(
[]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(shardID))},
p2p.ConstructMessage(bts),
)
}
}
// getCrosslinkHeadersForShards get headers required for crosslink creation.
func getCrosslinkHeadersForShards(beacon *core.BlockChain, shardChain *core.BlockChain, curBlock *types.Block, shardID uint32, latestSentCrosslink *uint64) ([]*block.Header, error) {
var headers []*block.Header
lastLink, err := beacon.ReadShardLastCrossLink(shardID)
var latestBlockNum uint64
// TODO chao: record the missing crosslink in local database instead of using latest crosslink
// if cannot find latest crosslink, broadcast latest 3 block headers
if err != nil {
utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed")
header := node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 2)
if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) {
header := shardChain.GetHeaderByNumber(curBlock.NumberU64() - 2)
if header != nil && shardChain.Config().IsCrossLink(header.Epoch()) {
headers = append(headers, header)
}
header = node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 1)
if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) {
header = shardChain.GetHeaderByNumber(curBlock.NumberU64() - 1)
if header != nil && shardChain.Config().IsCrossLink(header.Epoch()) {
headers = append(headers, header)
}
headers = append(headers, curBlock.Header())
} else {
latestBlockNum = lastLink.BlockNum()
if latestSentCrosslinkNum > latestBlockNum && latestSentCrosslinkNum <= latestBlockNum+crossLinkBatchSize*6 {
latestBlockNum = latestSentCrosslinkNum
latest := atomic.LoadUint64(latestSentCrosslink)
if latest > latestBlockNum && latest <= latestBlockNum+crossLinkBatchSize*6 {
latestBlockNum = latest
}
batchSize := crossLinkBatchSize
@ -224,8 +317,8 @@ func (node *Node) BroadcastCrossLink() {
}
for blockNum := latestBlockNum + 1; blockNum <= curBlock.NumberU64(); blockNum++ {
header := node.Blockchain().GetHeaderByNumber(blockNum)
if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) {
header := shardChain.GetHeaderByNumber(blockNum)
if header != nil && shardChain.Config().IsCrossLink(header.Epoch()) {
headers = append(headers, header)
if len(headers) == batchSize {
break
@ -241,18 +334,14 @@ func (node *Node) BroadcastCrossLink() {
header.Number().Uint64(),
)
if i == len(headers)-1 {
latestSentCrosslinkNum = header.Number().Uint64()
atomic.StoreUint64(latestSentCrosslink, header.Number().Uint64())
}
}
node.host.SendMessageToGroups(
[]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID)},
p2p.ConstructMessage(
proto_node.ConstructCrossLinkMessage(node.Consensus.Blockchain, headers)),
)
return headers, nil
}
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are
// running consensus on
// running consensus on.
func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
if newBlock == nil || newBlock.Header() == nil {
return errors.New("nil header or block asked to verify")

@ -52,7 +52,7 @@ func (node *Node) BeaconSyncHook() {
if node.Consensus.IsLeader() || rand.Intn(100) == 0 {
// TODO: Instead of leader, it would better be validator do this broadcast since leader do
// not have much idle resources.
node.BroadcastCrossLink()
node.BroadcastCrossLinkFromShardsToBeacon()
}
}
@ -235,7 +235,7 @@ func (node *Node) doBeaconSyncing() {
} else if node.Consensus.IsLeader() || rand.Intn(100) == 0 {
// Only leader or 1% of validators broadcast crosslink to avoid spamming p2p
if beaconBlock.NumberU64() == node.Beaconchain().CurrentBlock().NumberU64() {
node.BroadcastCrossLink()
node.BroadcastCrossLinkFromShardsToBeacon()
}
}
}

Loading…
Cancel
Save