package node import ( "github.com/ethereum/go-ethereum/rlp" proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/shard" ) // BroadcastCXReceipts broadcasts cross shard receipts to correspoding // destination shards func BroadcastCXReceipts(newBlock *types.Block, consensus *consensus.Consensus) { commitSigAndBitmap := newBlock.GetCurrentCommitSig() //#### Read payload data from committed msg if len(commitSigAndBitmap) <= 96 { utils.Logger().Debug().Int("commitSigAndBitmapLen", len(commitSigAndBitmap)).Msg("[BroadcastCXReceipts] commitSigAndBitmap Not Enough Length") return } commitSig := make([]byte, 96) commitBitmap := make([]byte, len(commitSigAndBitmap)-96) offset := 0 copy(commitSig[:], commitSigAndBitmap[offset:offset+96]) offset += 96 copy(commitBitmap[:], commitSigAndBitmap[offset:]) //#### END Read payload data from committed msg epoch := newBlock.Header().Epoch() shardingConfig := shard.Schedule.InstanceForEpoch(epoch) shardNum := int(shardingConfig.NumShards()) myShardID := consensus.ShardID utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]") for i := 0; i < shardNum; i++ { if i == int(myShardID) { continue } BroadcastCXReceiptsWithShardID(newBlock.Header(), commitSig, commitBitmap, uint32(i), consensus) } } // BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID func BroadcastCXReceiptsWithShardID(block *block.Header, commitSig []byte, commitBitmap []byte, toShardID uint32, consensus *consensus.Consensus) { myShardID := consensus.ShardID utils.Logger().Debug(). Uint32("toShardID", toShardID). Uint32("myShardID", myShardID). Uint64("blockNum", block.NumberU64()). Msg("[BroadcastCXReceiptsWithShardID]") cxReceipts, err := consensus.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash()) if err != nil || len(cxReceipts) == 0 { utils.Logger().Debug().Uint32("ToShardID", toShardID). Int("numCXReceipts", len(cxReceipts)). Msg("[CXMerkleProof] No receipts found for the destination shard") return } merkleProof, err := consensus.Blockchain().CXMerkleProof(toShardID, block) if err != nil { utils.Logger().Warn(). Uint32("ToShardID", toShardID). Msg("[BroadcastCXReceiptsWithShardID] Unable to get merkleProof") return } cxReceiptsProof := &types.CXReceiptsProof{ Receipts: cxReceipts, MerkleProof: merkleProof, Header: block, CommitSig: commitSig, CommitBitmap: commitBitmap, } groupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(toShardID)) utils.Logger().Info().Uint32("ToShardID", toShardID). Str("GroupID", string(groupID)). Interface("cxp", cxReceiptsProof). Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof ready. Sending CX receipts...") // TODO ek – limit concurrency go consensus.GetHost().SendMessageToGroups([]nodeconfig.GroupID{groupID}, p2p.ConstructMessage(proto_node.ConstructCXReceiptsProof(cxReceiptsProof)), ) } // BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request func BroadcastMissingCXReceipts(c *consensus.Consensus) { var ( sendNextTime = make([]core.CxEntry, 0) cxPool = c.Registry().GetCxPool() blockchain = c.Blockchain() ) it := cxPool.Pool().Iterator() for entry := range it.C { cxEntry := entry.(core.CxEntry) toShardID := cxEntry.ToShardID blk := blockchain.GetBlockByHash(cxEntry.BlockHash) if blk == nil { continue } blockNum := blk.NumberU64() nextHeader := blockchain.GetHeaderByNumber(blockNum + 1) if nextHeader == nil { sendNextTime = append(sendNextTime, cxEntry) continue } sig := nextHeader.LastCommitSignature() bitmap := nextHeader.LastCommitBitmap() BroadcastCXReceiptsWithShardID(blk.Header(), sig[:], bitmap, toShardID, c) } cxPool.Clear() // this should not happen or maybe happen for impatient user for _, entry := range sendNextTime { cxPool.Add(entry) } } // ProcessReceiptMessage store the receipts and merkle proof in local data store func (node *Node) ProcessReceiptMessage(msgPayload []byte) { cxp := types.CXReceiptsProof{} if err := rlp.DecodeBytes(msgPayload, &cxp); err != nil { utils.Logger().Error().Err(err). Msg("[ProcessReceiptMessage] Unable to Decode message Payload") return } utils.Logger().Debug().Interface("cxp", cxp). Msg("[ProcessReceiptMessage] Add CXReceiptsProof to pending Receipts") // TODO: integrate with txpool node.AddPendingReceipts(&cxp) }