diff --git a/api/proto/node/node.go b/api/proto/node/node.go index 67402193f..aff7ff382 100644 --- a/api/proto/node/node.go +++ b/api/proto/node/node.go @@ -42,30 +42,33 @@ type BlockMessageType int // Block sync message subtype const ( - Sync BlockMessageType = iota - CrossLink // used for crosslink from beacon chain to shard chain - Receipt // cross-shard transaction receipts - SlashCandidate // A report of a double-signing event + Sync BlockMessageType = iota + CrossLink // used for crosslink from beacon chain to shard chain + Receipt // cross-shard transaction receipts + SlashCandidate // A report of a double-signing event + CrosslinkHeartbeat // Heart beat signal for crosslinks. Needed for epoch chain. ) var ( // B suffix means Byte - nodeB = byte(proto.Node) - blockB = byte(Block) - slashB = byte(SlashCandidate) - txnB = byte(Transaction) - sendB = byte(Send) - stakingB = byte(Staking) - syncB = byte(Sync) - crossLinkB = byte(CrossLink) - receiptB = byte(Receipt) + nodeB = byte(proto.Node) + blockB = byte(Block) + slashB = byte(SlashCandidate) + txnB = byte(Transaction) + sendB = byte(Send) + stakingB = byte(Staking) + syncB = byte(Sync) + crossLinkB = byte(CrossLink) + crossLinkHeardBeatB = byte(CrosslinkHeartbeat) + receiptB = byte(Receipt) // H suffix means header - slashH = []byte{nodeB, blockB, slashB} - transactionListH = []byte{nodeB, txnB, sendB} - stakingTxnListH = []byte{nodeB, stakingB, sendB} - syncH = []byte{nodeB, blockB, syncB} - crossLinkH = []byte{nodeB, blockB, crossLinkB} - cxReceiptH = []byte{nodeB, blockB, receiptB} + slashH = []byte{nodeB, blockB, slashB} + transactionListH = []byte{nodeB, txnB, sendB} + stakingTxnListH = []byte{nodeB, stakingB, sendB} + syncH = []byte{nodeB, blockB, syncB} + crossLinkH = []byte{nodeB, blockB, crossLinkB} + cxReceiptH = []byte{nodeB, blockB, receiptB} + crossLinkHeartBeatH = []byte{nodeB, blockB, crossLinkHeardBeatB} ) // ConstructTransactionListMessageAccount constructs serialized transactions in account model @@ -110,6 +113,13 @@ func ConstructSlashMessage(witnesses slash.Records) []byte { return byteBuffer.Bytes() } +func ConstructCrossLinkHeartBeatMessage(hb types.CrosslinkHeartbeat) []byte { + byteBuffer := bytes.NewBuffer(crossLinkHeartBeatH) + data, _ := rlp.EncodeToBytes(hb) + byteBuffer.Write(data) + return byteBuffer.Bytes() +} + // ConstructCrossLinkMessage constructs cross link message to send to beacon chain func ConstructCrossLinkMessage(bc engine.ChainReader, headers []*block.Header) []byte { byteBuffer := bytes.NewBuffer(crossLinkH) diff --git a/api/service/crosslink_sending/service.go b/api/service/crosslink_sending/service.go new file mode 100644 index 000000000..b9618e7ef --- /dev/null +++ b/api/service/crosslink_sending/service.go @@ -0,0 +1,61 @@ +package crosslink_sending + +import ( + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/shard" +) + +type broadcast interface { + BroadcastCrosslinkHeartbeatSignalFromBeaconToShards() + BroadcastCrossLinkFromShardsToBeacon() +} + +type Service struct { + node broadcast + bc *core.BlockChain + ch chan core.ChainEvent + closeCh chan struct{} + beacon bool +} + +func New(node broadcast, bc *core.BlockChain) *Service { + return &Service{ + node: node, + bc: bc, + ch: make(chan core.ChainEvent, 1), + closeCh: make(chan struct{}), + beacon: bc.ShardID() == shard.BeaconChainShardID, + } +} + +// Start starts service. +func (s *Service) Start() error { + s.bc.SubscribeChainEvent(s.ch) + go s.run() + return nil +} + +func (s *Service) run() { + for { + select { + case _, ok := <-s.ch: + if !ok { + return + } + if s.beacon { + go s.node.BroadcastCrosslinkHeartbeatSignalFromBeaconToShards() + } else { + // TODO: this should be uncommented for beacon sync, no need to have it now. + //go s.node.BroadcastCrossLinkFromShardsToBeacon() + } + case <-s.closeCh: + return + } + } +} + +// Stop stops service. +func (s *Service) Stop() error { + close(s.closeCh) + return nil +} diff --git a/api/service/legacysync/downloader/client.go b/api/service/legacysync/downloader/client.go index ca9d54fc9..42d5954b8 100644 --- a/api/service/legacysync/downloader/client.go +++ b/api/service/legacysync/downloader/client.go @@ -65,9 +65,9 @@ func (client *Client) GetBlocksByHeights(heights []uint64) *pb.DownloaderRespons Type: pb.DownloaderRequest_BLOCKBYHEIGHT, Heights: heights, } - response, err := client.dlClient.Query(ctx, request) + response, err := client.dlClient.Query(ctx, request, grpc.MaxCallRecvMsgSize(32*1024*1024)) if err != nil { - utils.Logger().Error().Err(err).Str("target", client.conn.Target()).Msg("[SYNC] GetBlockHashes query failed") + utils.Logger().Error().Err(err).Str("target", client.conn.Target()).Msg("[SYNC] GetBlocksByHeights query failed") } return response } diff --git a/api/service/manager.go b/api/service/manager.go index e12d3f78f..57ca15d60 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -22,6 +22,7 @@ const ( Pprof Prometheus Synchronize + CrosslinkSending ) func (t Type) String() string { @@ -42,6 +43,8 @@ func (t Type) String() string { return "Prometheus" case Synchronize: return "Synchronize" + case CrosslinkSending: + return "CrosslinkSending" default: return "Unknown" } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 377e7e40d..f28a19630 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -15,6 +15,7 @@ import ( "syscall" "time" + "github.com/harmony-one/harmony/api/service/crosslink_sending" rosetta_common "github.com/harmony-one/harmony/rosetta/common" harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony" @@ -408,6 +409,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { } else if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode { currentNode.RegisterExplorerServices() } + currentNode.RegisterService(service.CrosslinkSending, crosslink_sending.New(currentNode, currentNode.Blockchain())) if hc.Pprof.Enabled { setupPprofService(currentNode, hc) } diff --git a/consensus/consensus.go b/consensus/consensus.go index 246c93852..2ee60ccbc 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -168,6 +168,10 @@ func (consensus *Consensus) GetPublicKeys() multibls.PublicKeys { return consensus.priKey.GetPublicKeys() } +func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys { + return consensus.priKey +} + // GetLeaderPrivateKey returns leader private key if node is the leader func (consensus *Consensus) GetLeaderPrivateKey(leaderKey *bls_core.PublicKey) (*bls.PrivateKeyWrapper, error) { for i, key := range consensus.priKey { diff --git a/core/types/crosslink.go b/core/types/crosslink.go index b0cd567f3..36464484f 100644 --- a/core/types/crosslink.go +++ b/core/types/crosslink.go @@ -130,17 +130,25 @@ type CrossLinks []CrossLink // Sort crosslinks by shardID and then tie break by blockNum then by viewID func (cls CrossLinks) Sort() { sort.Slice(cls, func(i, j int) bool { - return cls[i].ShardID() < cls[j].ShardID() || - (cls[i].ShardID() == cls[j].ShardID() && cls[i].Number().Cmp(cls[j].Number()) < 0) || - (cls[i].ShardID() == cls[j].ShardID() && cls[i].Number() == cls[j].Number() && cls[i].ViewID().Cmp(cls[j].ViewID()) < 0) + if s1, s2 := cls[i].ShardID(), cls[j].ShardID(); s1 != s2 { + return s1 < s2 + } + if s1, s2 := cls[i].Number(), cls[j].Number(); s1.Cmp(s2) != 0 { + return s1.Cmp(s2) < 0 + } + return cls[i].ViewID().Cmp(cls[j].ViewID()) < 0 }) } // IsSorted checks whether the cross links are sorted func (cls CrossLinks) IsSorted() bool { return sort.SliceIsSorted(cls, func(i, j int) bool { - return cls[i].ShardID() < cls[j].ShardID() || - (cls[i].ShardID() == cls[j].ShardID() && cls[i].Number().Cmp(cls[j].Number()) < 0) || - (cls[i].ShardID() == cls[j].ShardID() && cls[i].Number() == cls[j].Number() && cls[i].ViewID().Cmp(cls[j].ViewID()) < 0) + if s1, s2 := cls[i].ShardID(), cls[j].ShardID(); s1 != s2 { + return s1 < s2 + } + if s1, s2 := cls[i].Number(), cls[j].Number(); s1.Cmp(s2) != 0 { + return s1.Cmp(s2) < 0 + } + return cls[i].ViewID().Cmp(cls[j].ViewID()) < 0 }) } diff --git a/core/types/crosslink_heartbeat.go b/core/types/crosslink_heartbeat.go new file mode 100644 index 000000000..2ef9dbf66 --- /dev/null +++ b/core/types/crosslink_heartbeat.go @@ -0,0 +1,9 @@ +package types + +type CrosslinkHeartbeat struct { + ShardID uint32 + LatestContinuousBlockNum uint64 + Epoch uint64 + PublicKey []byte + Signature []byte +} diff --git a/core/types/crosslink_test.go b/core/types/crosslink_test.go new file mode 100644 index 000000000..a9f6bc746 --- /dev/null +++ b/core/types/crosslink_test.go @@ -0,0 +1,48 @@ +package types + +import ( + "math/big" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCrosslinks_Sorting(t *testing.T) { + bb := CrossLinks{ + { + BlockNumberF: big.NewInt(1), + ViewIDF: big.NewInt(0), + }, + { + BlockNumberF: big.NewInt(1), + ViewIDF: big.NewInt(1), + }, + { + BlockNumberF: big.NewInt(1), + ViewIDF: big.NewInt(4), + }, + { + BlockNumberF: big.NewInt(1), + ViewIDF: big.NewInt(3), + }, + { + BlockNumberF: big.NewInt(1), + ViewIDF: big.NewInt(2), + }, + } + bb.Sort() + + for i, v := range bb { + require.EqualValues(t, i, v.ViewID().Uint64()) + } + + require.True(t, bb.IsSorted()) +} + +func TestBigNumberInequality(t *testing.T) { + type A struct { + X int + } + require.False(t, big.NewInt(1) == big.NewInt(1)) + require.False(t, &A{} == &A{}) +} diff --git a/node/node.go b/node/node.go index 99a7ce318..13f6e879b 100644 --- a/node/node.go +++ b/node/node.go @@ -465,6 +465,8 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) ( node.NodeConfig.Role() == nodeconfig.ExplorerNode { return nil, 0, errIgnoreBeaconMsg } + case proto_node.CrosslinkHeartbeat: + nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "crosslink_heartbeat"}).Inc() default: nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "invalid_block_type"}).Inc() return nil, 0, errInvalidNodeMsg diff --git a/node/node_cross_link.go b/node/node_cross_link.go index fce630dfa..3811d033f 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -53,6 +53,14 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error { return nil } +// ProcessCrossLinkHeartbeatMessage process crosslink heart beat signal. +func (node *Node) ProcessCrossLinkHeartbeatMessage(msgPayload []byte) { + if node.IsRunningBeaconChain() { + return + } + // process in next pr. +} + // ProcessCrossLinkMessage verify and process Node/CrossLink message into crosslink when it's valid func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { if node.IsRunningBeaconChain() { @@ -68,7 +76,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { existingCLs[pending.Hash()] = struct{}{} } - crosslinks := []types.CrossLink{} + var crosslinks []types.CrossLink if err := rlp.DecodeBytes(msgPayload, &crosslinks); err != nil { utils.Logger().Error(). Err(err). @@ -76,7 +84,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { return } - candidates := []types.CrossLink{} + var candidates []types.CrossLink utils.Logger().Debug(). Msgf("[ProcessingCrossLink] Received crosslinks: %d", len(crosslinks)) diff --git a/node/node_handler.go b/node/node_handler.go index ec027c0df..2372b6536 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -4,13 +4,17 @@ import ( "bytes" "context" "math/rand" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/api/proto" proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" @@ -39,6 +43,8 @@ func (node *Node) processSkippedMsgTypeByteValue( node.ProcessReceiptMessage(content) case proto_node.CrossLink: node.ProcessCrossLinkMessage(content) + case proto_node.CrosslinkHeartbeat: + node.ProcessCrossLinkHeartbeatMessage(content) default: utils.Logger().Error(). Int("message-iota-value", int(cat)). @@ -47,9 +53,9 @@ func (node *Node) processSkippedMsgTypeByteValue( } var ( - errInvalidPayloadSize = errors.New("invalid payload size") - errWrongBlockMsgSize = errors.New("invalid block message size") - latestSentCrosslinkNum uint64 = 0 + errInvalidPayloadSize = errors.New("invalid payload size") + errWrongBlockMsgSize = errors.New("invalid block message size") + latestSentCrosslink uint64 = 0 ) // HandleNodeMessage parses the message and dispatch the actions. @@ -88,7 +94,8 @@ func (node *Node) HandleNodeMessage( case proto_node.SlashCandidate, proto_node.Receipt, - proto_node.CrossLink: + proto_node.CrossLink, + proto_node.CrosslinkHeartbeat: // skip first byte which is blockMsgType node.processSkippedMsgTypeByteValue(blockMsgType, msgPayload[1:]) } @@ -164,17 +171,20 @@ func (node *Node) BroadcastSlash(witness *slash.Record) { utils.Logger().Info().Msg("broadcast the double sign record") } -// BroadcastCrossLink is called by consensus leader to +// BroadcastCrossLinkFromShardsToBeacon is called by consensus leader to // send the new header as cross link to beacon chain. -func (node *Node) BroadcastCrossLink() { +func (node *Node) BroadcastCrossLinkFromShardsToBeacon() { // leader of 1-3 shards + if node.IsRunningBeaconChain() { + return + } curBlock := node.Blockchain().CurrentBlock() if curBlock == nil { return } + shardID := curBlock.ShardID() - if node.IsRunningBeaconChain() || - !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) { - // no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch + if !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) { + // no need to broadcast crosslink if it's beacon chain, or it's not crosslink epoch return } @@ -189,27 +199,110 @@ func (node *Node) BroadcastCrossLink() { "Construct and Broadcasting new crosslink to beacon chain groupID %s", nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID), ) - headers := []*block.Header{} - lastLink, err := node.Beaconchain().ReadShardLastCrossLink(curBlock.ShardID()) + + headers, err := getCrosslinkHeadersForShards(node.Beaconchain(), node.Blockchain(), curBlock, shardID, &latestSentCrosslink) + if err != nil { + utils.Logger().Error().Err(err).Msg("[BroadcastCrossLink] failed to get crosslinks") + return + } + + if len(headers) == 0 { + utils.Logger().Info().Msg("[BroadcastCrossLink] no crosslinks to broadcast") + return + } + + node.host.SendMessageToGroups( + []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID)}, + p2p.ConstructMessage( + proto_node.ConstructCrossLinkMessage(node.Consensus.Blockchain, headers)), + ) +} + +// BroadcastCrosslinkHeartbeatSignalFromBeaconToShards is called by consensus leader or 1% validators to +// send last cross link to shard chains. +func (node *Node) BroadcastCrosslinkHeartbeatSignalFromBeaconToShards() { // leader of 0 shard + if !node.IsRunningBeaconChain() { + return + } + if !(node.IsCurrentlyLeader() || rand.Intn(100) == 0) { + return + } + + curBlock := node.Beaconchain().CurrentBlock() + if curBlock == nil { + return + } + + if !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) { + // no need to broadcast crosslink if it's beacon chain, or it's not crosslink epoch + return + } + + var privToSing *bls.PrivateKeyWrapper + for _, priv := range node.Consensus.GetPrivateKeys() { + if node.Consensus.IsValidatorInCommittee(priv.Pub.Bytes) { + privToSing = &priv + break + } + } + + if privToSing == nil { + return + } + + for _, shardID := range []uint32{1, 2, 3} { + lastLink, err := node.Blockchain().ReadShardLastCrossLink(shardID) + if err != nil { + utils.Logger().Error().Err(err).Msg("[BroadcastCrossLinkSignal] failed to get crosslinks") + continue + } + + hb := types.CrosslinkHeartbeat{ + ShardID: lastLink.ShardID(), + LatestContinuousBlockNum: lastLink.BlockNum(), + Epoch: lastLink.Epoch().Uint64(), + PublicKey: privToSing.Pub.Bytes[:], + Signature: nil, + } + + rs, err := rlp.EncodeToBytes(hb) + if err != nil { + utils.Logger().Error().Err(err).Msg("[BroadcastCrossLinkSignal] failed to encode signal") + continue + } + hb.Signature = privToSing.Pri.SignHash(rs).Serialize() + bts := proto_node.ConstructCrossLinkHeartBeatMessage(hb) + node.host.SendMessageToGroups( + []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(shardID))}, + p2p.ConstructMessage(bts), + ) + } +} + +// getCrosslinkHeadersForShards get headers required for crosslink creation. +func getCrosslinkHeadersForShards(beacon *core.BlockChain, shardChain *core.BlockChain, curBlock *types.Block, shardID uint32, latestSentCrosslink *uint64) ([]*block.Header, error) { + var headers []*block.Header + lastLink, err := beacon.ReadShardLastCrossLink(shardID) var latestBlockNum uint64 // TODO chao: record the missing crosslink in local database instead of using latest crosslink // if cannot find latest crosslink, broadcast latest 3 block headers if err != nil { utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed") - header := node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 2) - if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { + header := shardChain.GetHeaderByNumber(curBlock.NumberU64() - 2) + if header != nil && shardChain.Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } - header = node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 1) - if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { + header = shardChain.GetHeaderByNumber(curBlock.NumberU64() - 1) + if header != nil && shardChain.Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } headers = append(headers, curBlock.Header()) } else { latestBlockNum = lastLink.BlockNum() - if latestSentCrosslinkNum > latestBlockNum && latestSentCrosslinkNum <= latestBlockNum+crossLinkBatchSize*6 { - latestBlockNum = latestSentCrosslinkNum + latest := atomic.LoadUint64(latestSentCrosslink) + if latest > latestBlockNum && latest <= latestBlockNum+crossLinkBatchSize*6 { + latestBlockNum = latest } batchSize := crossLinkBatchSize @@ -224,8 +317,8 @@ func (node *Node) BroadcastCrossLink() { } for blockNum := latestBlockNum + 1; blockNum <= curBlock.NumberU64(); blockNum++ { - header := node.Blockchain().GetHeaderByNumber(blockNum) - if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { + header := shardChain.GetHeaderByNumber(blockNum) + if header != nil && shardChain.Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) if len(headers) == batchSize { break @@ -241,18 +334,14 @@ func (node *Node) BroadcastCrossLink() { header.Number().Uint64(), ) if i == len(headers)-1 { - latestSentCrosslinkNum = header.Number().Uint64() + atomic.StoreUint64(latestSentCrosslink, header.Number().Uint64()) } } - node.host.SendMessageToGroups( - []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID)}, - p2p.ConstructMessage( - proto_node.ConstructCrossLinkMessage(node.Consensus.Blockchain, headers)), - ) + return headers, nil } // VerifyNewBlock is called by consensus participants to verify the block (account model) they are -// running consensus on +// running consensus on. func (node *Node) VerifyNewBlock(newBlock *types.Block) error { if newBlock == nil || newBlock.Header() == nil { return errors.New("nil header or block asked to verify") diff --git a/node/node_syncing.go b/node/node_syncing.go index bc0b3e71f..3ebb0863d 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -52,7 +52,7 @@ func (node *Node) BeaconSyncHook() { if node.Consensus.IsLeader() || rand.Intn(100) == 0 { // TODO: Instead of leader, it would better be validator do this broadcast since leader do // not have much idle resources. - node.BroadcastCrossLink() + node.BroadcastCrossLinkFromShardsToBeacon() } } @@ -235,7 +235,7 @@ func (node *Node) doBeaconSyncing() { } else if node.Consensus.IsLeader() || rand.Intn(100) == 0 { // Only leader or 1% of validators broadcast crosslink to avoid spamming p2p if beaconBlock.NumberU64() == node.Beaconchain().CurrentBlock().NumberU64() { - node.BroadcastCrossLink() + node.BroadcastCrossLinkFromShardsToBeacon() } } }