package node import ( "encoding/binary" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/bls/ffi/go/bls" proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/shard" ) // BroadcastCXReceipts broadcasts cross shard receipts to correspoding // destination shards func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte) { //#### Read payload data from committed msg if len(lastCommits) <= 96 { utils.Logger().Debug().Int("lastCommitsLen", len(lastCommits)).Msg("[BroadcastCXReceipts] lastCommits Not Enough Length") } commitSig := make([]byte, 96) commitBitmap := make([]byte, len(lastCommits)-96) offset := 0 copy(commitSig[:], lastCommits[offset:offset+96]) offset += 96 copy(commitBitmap[:], lastCommits[offset:]) //#### END Read payload data from committed msg epoch := newBlock.Header().Epoch() shardingConfig := shard.Schedule.InstanceForEpoch(epoch) shardNum := int(shardingConfig.NumShards()) myShardID := node.Consensus.ShardID utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]") for i := 0; i < shardNum; i++ { if i == int(myShardID) { continue } go node.BroadcastCXReceiptsWithShardID(newBlock, commitSig, commitBitmap, uint32(i)) } } // BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig []byte, commitBitmap []byte, toShardID uint32) { myShardID := node.Consensus.ShardID utils.Logger().Info().Uint32("toShardID", toShardID).Uint32("myShardID", myShardID).Uint64("blockNum", block.NumberU64()).Msg("[BroadcastCXReceiptsWithShardID]") cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash()) if err != nil || len(cxReceipts) == 0 { utils.Logger().Info().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceiptsWithShardID] No ReadCXReceipts found") return } merkleProof, err := node.Blockchain().CXMerkleProof(toShardID, block) if err != nil { utils.Logger().Warn().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceiptsWithShardID] Unable to get merkleProof") return } groupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(toShardID)) utils.Logger().Info().Uint32("ToShardID", toShardID).Str("GroupID", string(groupID)).Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof Found") // TODO ek – limit concurrency go node.host.SendMessageToGroups([]nodeconfig.GroupID{groupID}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, block.Header(), commitSig, commitBitmap))) } // BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request func (node *Node) BroadcastMissingCXReceipts() { sendNextTime := []core.CxEntry{} it := node.CxPool.Pool().Iterator() for entry := range it.C { cxEntry := entry.(core.CxEntry) toShardID := cxEntry.ToShardID blk := node.Blockchain().GetBlockByHash(cxEntry.BlockHash) if blk == nil { continue } blockNum := blk.NumberU64() nextHeader := node.Blockchain().GetHeaderByNumber(blockNum + 1) if nextHeader == nil { sendNextTime = append(sendNextTime, cxEntry) continue } sig := nextHeader.LastCommitSignature() bitmap := nextHeader.LastCommitBitmap() node.BroadcastCXReceiptsWithShardID(blk, sig[:], bitmap, toShardID) } node.CxPool.Clear() // this should not happen or maybe happen for impatient user for _, entry := range sendNextTime { node.CxPool.Add(entry) } } // VerifyBlockCrossLinks verifies the cross links of the block func (node *Node) VerifyBlockCrossLinks(block *types.Block) error { if len(block.Header().CrossLinks()) == 0 { utils.Logger().Debug().Msgf("[CrossLinkVerification] Zero CrossLinks in the header") return nil } crossLinks := &types.CrossLinks{} err := rlp.DecodeBytes(block.Header().CrossLinks(), crossLinks) if err != nil { return ctxerror.New("[CrossLinkVerification] failed to decode cross links", "blockHash", block.Hash(), "crossLinks", len(block.Header().CrossLinks()), ).WithCause(err) } if !crossLinks.IsSorted() { return ctxerror.New("[CrossLinkVerification] cross links are not sorted", "blockHash", block.Hash(), "crossLinks", len(block.Header().CrossLinks()), ) } for _, crossLink := range *crossLinks { cl, err := node.Blockchain().ReadCrossLink(crossLink.ShardID(), crossLink.BlockNum()) if err == nil && cl != nil { // Add slash for exist same blocknum but different crosslink return ctxerror.New("crosslink already exist!") } if err = node.VerifyCrossLink(crossLink); err != nil { return ctxerror.New("cannot VerifyBlockCrossLinks", "blockHash", block.Hash(), "blockNum", block.Number(), "crossLinkShard", crossLink.ShardID(), "crossLinkBlock", crossLink.BlockNum(), "numTx", len(block.Transactions()), ).WithCause(err) } } return nil } // ProcessCrossLinkMessage verify and process Node/CrossLink message into crosslink when it's valid func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { // TODO: non-leader in beaconchain doesn't need process crosslink message, but still need to monitor leader's behavior if node.NodeConfig.ShardID == 0 { utils.Logger().Debug().Msgf("[ProcessingCrossLink] leader is processing...") var crosslinks []types.CrossLink err := rlp.DecodeBytes(msgPayload, &crosslinks) if err != nil { utils.Logger().Error(). Err(err). Msg("[ProcessingCrossLink] Crosslink Message Broadcast Unable to Decode") return } candidates := []types.CrossLink{} utils.Logger().Debug(). Msgf("[ProcessingCrossLink] Crosslink going to propose: %d", len(crosslinks)) for _, cl := range crosslinks { exist, err := node.Blockchain().ReadCrossLink(cl.ShardID(), cl.Number().Uint64()) if err == nil && exist != nil { // TODO: leader add double sign checking utils.Logger().Err(err). Msgf("[ProcessingCrossLink] Cross Link already exists, pass. Block num: %d, shardID %d", cl.Number(), cl.ShardID()) continue } if err = node.VerifyCrossLink(cl); err != nil { utils.Logger().Err(err). Msgf("[ProcessingCrossLink] Failed to verify new cross link for blockNum %d epochNum %d shard %d skipped: %v", cl.BlockNum(), cl.Epoch().Uint64(), cl.ShardID(), cl) continue } candidates = append(candidates, cl) utils.Logger().Debug(). Msgf("[ProcessingCrossLink] Committing for shardID %d, blockNum %d", cl.ShardID(), cl.Number().Uint64()) } node.pendingCLMutex.Lock() Len, _ := node.Blockchain().AddPendingCrossLinks(candidates) node.pendingCLMutex.Unlock() utils.Logger().Debug(). Msgf("[ProcessingCrossLink] Add pending crosslinks, total pending: %d", Len) } } func (node *Node) verifyIncomingReceipts(block *types.Block) error { m := make(map[common.Hash]struct{}) cxps := block.IncomingReceipts() for _, cxp := range cxps { // double spent if node.Blockchain().IsSpent(cxp) { return ctxerror.New("[verifyIncomingReceipts] Double Spent!") } hash := cxp.MerkleProof.BlockHash // duplicated receipts if _, ok := m[hash]; ok { return ctxerror.New("[verifyIncomingReceipts] Double Spent!") } m[hash] = struct{}{} for _, item := range cxp.Receipts { if item.ToShardID != node.Blockchain().ShardID() { return ctxerror.New("[verifyIncomingReceipts] Invalid ToShardID", "myShardID", node.Blockchain().ShardID(), "expectShardID", item.ToShardID) } } if err := node.Blockchain().Validator().ValidateCXReceiptsProof(cxp); err != nil { return ctxerror.New("[verifyIncomingReceipts] verification failed").WithCause(err) } } incomingReceiptHash := types.EmptyRootHash if len(cxps) > 0 { incomingReceiptHash = types.DeriveSha(cxps) } if incomingReceiptHash != block.Header().IncomingReceiptHash() { return ctxerror.New("[verifyIncomingReceipts] Invalid IncomingReceiptHash in block header") } return nil } // VerifyCrossLink verifies the header is valid func (node *Node) VerifyCrossLink(cl types.CrossLink) error { if node.Blockchain().ShardID() != shard.BeaconChainShardID { return ctxerror.New("Shard chains should not verify cross links") } if cl.BlockNum() <= 1 { return ctxerror.New("CrossLink BlockNumber should greater than 1") } if !node.Blockchain().Config().IsCrossLink(cl.Epoch()) { return ctxerror.New("CrossLink Epoch should >= cross link starting epoch", "crossLinkEpoch", cl.Epoch(), "cross_link_starting_eoch", node.Blockchain().Config().CrossLinkEpoch) } // Verify signature of the new cross link header // TODO: check whether to recalculate shard state shardState, err := node.Blockchain().ReadShardState(cl.Epoch()) committee := shardState.FindCommitteeByID(cl.ShardID()) if err != nil || committee == nil { return ctxerror.New("[CrossLink] Failed to read shard state for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) } var committerKeys []*bls.PublicKey parseKeysSuccess := true for _, member := range committee.Slots { committerKey := new(bls.PublicKey) err = member.BlsPublicKey.ToLibBLSPublicKey(committerKey) if err != nil { parseKeysSuccess = false break } committerKeys = append(committerKeys, committerKey) } if !parseKeysSuccess { return ctxerror.New("[CrossLink] cannot convert BLS public key", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) } mask, err := bls_cosi.NewMask(committerKeys, nil) if err != nil { return ctxerror.New("cannot create group sig mask", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) } if err := mask.SetMask(cl.Bitmap()); err != nil { return ctxerror.New("cannot set group sig mask bits", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()).WithCause(err) } aggSig := bls.Sign{} sig := cl.Signature() err = aggSig.Deserialize(sig[:]) if err != nil { return ctxerror.New("unable to deserialize multi-signature from payload").WithCause(err) } hash := cl.Hash() blockNumBytes := make([]byte, 8) binary.LittleEndian.PutUint64(blockNumBytes, cl.BlockNum()) commitPayload := append(blockNumBytes, hash[:]...) if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { return ctxerror.New("Failed to verify the signature for cross link", "shardID", cl.ShardID(), "blockNum", cl.BlockNum()) } return nil } // ProcessReceiptMessage store the receipts and merkle proof in local data store func (node *Node) ProcessReceiptMessage(msgPayload []byte) { cxp := types.CXReceiptsProof{} if err := rlp.DecodeBytes(msgPayload, &cxp); err != nil { utils.Logger().Error().Err(err).Msg("[ProcessReceiptMessage] Unable to Decode message Payload") return } utils.Logger().Debug().Interface("cxp", cxp).Msg("[ProcessReceiptMessage] Add CXReceiptsProof to pending Receipts") // TODO: integrate with txpool node.AddPendingReceipts(&cxp) }