From c78b8fa352229e202d0c334f03587b4491cc267e Mon Sep 17 00:00:00 2001 From: chao Date: Wed, 11 Sep 2019 12:48:31 -0700 Subject: [PATCH 1/4] add http rpc call to resend missing cross shard receipt --- core/blockchain.go | 6 +++++ core/cx_pool.go | 46 +++++++++++++++++++++++++++++++++++ hmy/api_backend.go | 11 +++++++++ hmy/backend.go | 7 +++++- internal/hmyapi/backend.go | 3 +++ internal/hmyapi/blockchain.go | 6 +++++ node/node.go | 3 +++ node/node_cross_shard.go | 27 ++++++++++++++++++++ node/node_handler.go | 2 ++ node/rpc.go | 2 +- 10 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 core/cx_pool.go diff --git a/core/blockchain.go b/core/blockchain.go index 81c5a0761..c63a6e6ac 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2237,3 +2237,9 @@ func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) { bc.updateCXReceiptsCheckpoints(k, v) } } + +// ReadTxLookupEntry returns the corresponding blockHash and blockNum where transaction locates +func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64) { + blockHash, blockNum, _ := rawdb.ReadTxLookupEntry(bc.db, txID) + return blockHash, blockNum +} diff --git a/core/cx_pool.go b/core/cx_pool.go new file mode 100644 index 000000000..0efcdd2e6 --- /dev/null +++ b/core/cx_pool.go @@ -0,0 +1,46 @@ +package core + +import ( + mapset "github.com/deckarep/golang-set" + + "github.com/ethereum/go-ethereum/common" +) + +const ( + CxPoolSize = 50 +) + +type CxPool struct { + pool mapset.Set + maxSize int +} + +func NewCxPool(limit int) *CxPool { + pool := mapset.NewSet() + cxPool := CxPool{pool: pool, maxSize: limit} + return &cxPool +} + +// Pool returns the pool of blockHashes of missing receipts +func (cxPool *CxPool) Pool() mapset.Set { + return cxPool.pool +} + +// Size return size of the pool +func (cxPool *CxPool) Size() int { + return cxPool.pool.Cardinality() +} + +// Add add element into the pool if not exceed limit +func (cxPool *CxPool) Add(hash common.Hash) bool { + if cxPool.Size() > cxPool.maxSize { + return false + } + cxPool.pool.Add(hash) + return true +} + +// Clear empty the pool +func (cxPool *CxPool) Clear() { + cxPool.pool.Clear() +} diff --git a/hmy/api_backend.go b/hmy/api_backend.go index a03901235..5f448dc03 100644 --- a/hmy/api_backend.go +++ b/hmy/api_backend.go @@ -232,3 +232,14 @@ func (b *APIBackend) RPCGasCap() *big.Int { func (b *APIBackend) GetShardID() uint32 { return b.hmy.shardID } + +// AddBlockHashToCxPool retrieve blockHash from txID and add blockHash to CxPool for resending +func (b *APIBackend) AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) { + blockHash, blockNum := b.hmy.BlockChain().ReadTxLookupEntry(txID) + blk := b.hmy.BlockChain().GetBlockByHash(blockHash) + if blk == nil { + return 0, false + } + success := b.hmy.CxPool().Add(blockHash) + return blockNum, success +} diff --git a/hmy/backend.go b/hmy/backend.go index c6a933b61..6d94225f8 100644 --- a/hmy/backend.go +++ b/hmy/backend.go @@ -21,6 +21,7 @@ type Harmony struct { blockchain *core.BlockChain txPool *core.TxPool + cxPool *core.CxPool accountManager *accounts.Manager eventMux *event.TypeMux // DB interfaces @@ -51,13 +52,14 @@ type NodeAPI interface { // New creates a new Harmony object (including the // initialisation of the common Harmony object) -func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux, shardID uint32) (*Harmony, error) { +func New(nodeAPI NodeAPI, txPool *core.TxPool, cxPool *core.CxPool, eventMux *event.TypeMux, shardID uint32) (*Harmony, error) { chainDb := nodeAPI.Blockchain().ChainDB() hmy := &Harmony{ shutdownChan: make(chan bool), bloomRequests: make(chan chan *bloombits.Retrieval), blockchain: nodeAPI.Blockchain(), txPool: txPool, + cxPool: cxPool, accountManager: nodeAPI.AccountManager(), eventMux: eventMux, chainDb: chainDb, @@ -75,6 +77,9 @@ func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux, shardID // TxPool ... func (s *Harmony) TxPool() *core.TxPool { return s.txPool } +// CxPool ... +func (s *Harmony) CxPool() *core.CxPool { return s.cxPool } + // BlockChain ... func (s *Harmony) BlockChain() *core.BlockChain { return s.blockchain } diff --git a/internal/hmyapi/backend.go b/internal/hmyapi/backend.go index d6adb727d..44093dd9d 100644 --- a/internal/hmyapi/backend.go +++ b/internal/hmyapi/backend.go @@ -64,6 +64,9 @@ type Backend interface { // Get balance GetBalance(address common.Address) (*hexutil.Big, error) GetShardID() uint32 + + // retrieve the blockHash using txID and add blockHash to CxPool for resending + AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) } // GetAPIs returns all the APIs. diff --git a/internal/hmyapi/blockchain.go b/internal/hmyapi/blockchain.go index f123dfa32..5f420fd0f 100644 --- a/internal/hmyapi/blockchain.go +++ b/internal/hmyapi/blockchain.go @@ -112,6 +112,12 @@ func (s *PublicBlockChainAPI) BlockNumber() hexutil.Uint64 { return hexutil.Uint64(header.Number().Uint64()) } +// ResendTx retrieves blockHash by TxID and add blockHash to CxPool for resending +func (s *PublicBlockChainAPI) ResendTx(ctx context.Context, txID common.Hash) (bool, error) { + _, success := s.b.AddBlockHashToCxPool(ctx, txID) + return success, nil +} + // Call executes the given transaction on the state for the given block number. // It doesn't make and changes in the state/blockchain and is useful to execute and retrieve values. func (s *PublicBlockChainAPI) Call(ctx context.Context, args CallArgs, blockNr rpc.BlockNumber) (hexutil.Bytes, error) { diff --git a/node/node.go b/node/node.go index 51281f4e0..b90d2bcf3 100644 --- a/node/node.go +++ b/node/node.go @@ -120,6 +120,8 @@ type Node struct { TxPool *core.TxPool // TODO migrate to TxPool from pendingTransactions list below + CxPool *core.CxPool // pool for missing cross shard receipts resend + pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus pendingTxMutex sync.Mutex recentTxsStats types.RecentTxsStats @@ -391,6 +393,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc node.BeaconBlockChannel = make(chan *types.Block) node.recentTxsStats = make(types.RecentTxsStats) node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain) + node.CxPool = core.NewCxPool(core.CxPoolSize) node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine, node.Consensus.ShardID) if node.Blockchain().ShardID() != 0 { node.BeaconWorker = worker.New(node.Beaconchain().Config(), beaconChain, chain.Engine, node.Consensus.ShardID) diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index f8076d258..872111362 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -64,6 +64,33 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte) } } +// BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request +func (node *Node) BroadcastMissingCXReceipts() { + sendNextTime := []common.Hash{} + it := node.CxPool.Pool().Iterator() + for blockHash := range it.C { + blk := node.Blockchain().GetBlockByHash(blockHash.(common.Hash)) + if blk == nil { + continue + } + blockNum := blk.NumberU64() + nextBlk := node.Blockchain().GetBlockByNumber(blockNum + 1) + if nextBlk == nil { + sendNextTime = append(sendNextTime, blockHash.(common.Hash)) + continue + } + sig := nextBlk.Header().LastCommitSignature() + bitmap := nextBlk.Header().LastCommitBitmap() + lastCommits := append(sig[:], bitmap...) + go node.BroadcastCXReceipts(blk, lastCommits) + } + node.CxPool.Clear() + // this should not happen or maybe happen for impatient user + for _, blockHash := range sendNextTime { + node.CxPool.Add(blockHash) + } +} + // VerifyBlockCrossLinks verifies the cross links of the block func (node *Node) VerifyBlockCrossLinks(block *types.Block) error { if len(block.Header().CrossLinks()) == 0 { diff --git a/node/node_handler.go b/node/node_handler.go index 556478e46..d53a9428a 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -392,6 +392,8 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit } } + node.BroadcastMissingCXReceipts() + // TODO chao: uncomment this after beacon syncing is stable // node.Blockchain().UpdateCXReceiptsCheckpointsByBlock(newBlock) diff --git a/node/rpc.go b/node/rpc.go index c5c593386..2a7875e1a 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -46,7 +46,7 @@ var ( // StartRPC start RPC service func (node *Node) StartRPC(nodePort string) error { // Gather all the possible APIs to surface - harmony, _ = hmy.New(node, node.TxPool, new(event.TypeMux), node.Consensus.ShardID) + harmony, _ = hmy.New(node, node.TxPool, node.CxPool, new(event.TypeMux), node.Consensus.ShardID) apis := node.APIs() From 00c2356b5ed951fc0f3246f7973b96cc58bbbe1e Mon Sep 17 00:00:00 2001 From: chao Date: Wed, 11 Sep 2019 19:54:16 -0700 Subject: [PATCH 2/4] add tx type check; improve several efficiency issues --- core/blockchain.go | 9 +++--- core/cx_pool.go | 13 ++++++-- hmy/api_backend.go | 18 ++++++++--- hmy/backend.go | 2 +- internal/hmyapi/backend.go | 2 +- internal/hmyapi/blockchain.go | 10 ++++-- node/node_cross_shard.go | 60 ++++++++++++++++++++++++++++------- 7 files changed, 88 insertions(+), 26 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index c63a6e6ac..abf279c50 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2238,8 +2238,9 @@ func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) { } } -// ReadTxLookupEntry returns the corresponding blockHash and blockNum where transaction locates -func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64) { - blockHash, blockNum, _ := rawdb.ReadTxLookupEntry(bc.db, txID) - return blockHash, blockNum +// ReadTxLookupEntry returns where the given transaction resides in the chain, +// as a (block hash, block number, index in transaction list) triple. +// returns 0, 0 if not found +func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64, uint64) { + return rawdb.ReadTxLookupEntry(bc.db, txID) } diff --git a/core/cx_pool.go b/core/cx_pool.go index 0efcdd2e6..76f83b75a 100644 --- a/core/cx_pool.go +++ b/core/cx_pool.go @@ -7,14 +7,23 @@ import ( ) const ( + // CxPoolSize is the maximum size of the pool CxPoolSize = 50 ) +// CxEntry represents the egress receipt's blockHash and ToShardID +type CxEntry struct { + BlockHash common.Hash + ToShardID uint32 +} + +// CxPool is to hold a pool of block outgoing receipts to be resend in next round broadcast type CxPool struct { pool mapset.Set maxSize int } +// NewCxPool creates a new CxPool func NewCxPool(limit int) *CxPool { pool := mapset.NewSet() cxPool := CxPool{pool: pool, maxSize: limit} @@ -32,11 +41,11 @@ func (cxPool *CxPool) Size() int { } // Add add element into the pool if not exceed limit -func (cxPool *CxPool) Add(hash common.Hash) bool { +func (cxPool *CxPool) Add(entry CxEntry) bool { if cxPool.Size() > cxPool.maxSize { return false } - cxPool.pool.Add(hash) + cxPool.pool.Add(entry) return true } diff --git a/hmy/api_backend.go b/hmy/api_backend.go index 5f448dc03..bc5a3bfa8 100644 --- a/hmy/api_backend.go +++ b/hmy/api_backend.go @@ -233,13 +233,23 @@ func (b *APIBackend) GetShardID() uint32 { return b.hmy.shardID } -// AddBlockHashToCxPool retrieve blockHash from txID and add blockHash to CxPool for resending -func (b *APIBackend) AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) { - blockHash, blockNum := b.hmy.BlockChain().ReadTxLookupEntry(txID) +// ResendCx retrieve blockHash from txID and add blockHash to CxPool for resending +func (b *APIBackend) ResendCx(ctx context.Context, txID common.Hash) (uint64, bool) { + blockHash, blockNum, index := b.hmy.BlockChain().ReadTxLookupEntry(txID) blk := b.hmy.BlockChain().GetBlockByHash(blockHash) if blk == nil { return 0, false } - success := b.hmy.CxPool().Add(blockHash) + txs := blk.Transactions() + // a valid index is from 0 to len-1 + if int(index) > len(txs)-1 { + return 0, false + } + tx := txs[int(index)] + if tx.ShardID() == tx.ToShardID() || blk.Header().ShardID() != tx.ShardID() { + return 0, false + } + entry := core.CxEntry{blockHash, tx.ToShardID()} + success := b.hmy.CxPool().Add(entry) return blockNum, success } diff --git a/hmy/backend.go b/hmy/backend.go index 6d94225f8..fab35c7aa 100644 --- a/hmy/backend.go +++ b/hmy/backend.go @@ -77,7 +77,7 @@ func New(nodeAPI NodeAPI, txPool *core.TxPool, cxPool *core.CxPool, eventMux *ev // TxPool ... func (s *Harmony) TxPool() *core.TxPool { return s.txPool } -// CxPool ... +// CxPool is used to store the blockHashes, where the corresponding block contains the cross shard receipts to be sent func (s *Harmony) CxPool() *core.CxPool { return s.cxPool } // BlockChain ... diff --git a/internal/hmyapi/backend.go b/internal/hmyapi/backend.go index 44093dd9d..360defc49 100644 --- a/internal/hmyapi/backend.go +++ b/internal/hmyapi/backend.go @@ -66,7 +66,7 @@ type Backend interface { GetShardID() uint32 // retrieve the blockHash using txID and add blockHash to CxPool for resending - AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) + ResendCx(ctx context.Context, txID common.Hash) (uint64, bool) } // GetAPIs returns all the APIs. diff --git a/internal/hmyapi/blockchain.go b/internal/hmyapi/blockchain.go index 5f420fd0f..6a0a23704 100644 --- a/internal/hmyapi/blockchain.go +++ b/internal/hmyapi/blockchain.go @@ -112,9 +112,13 @@ func (s *PublicBlockChainAPI) BlockNumber() hexutil.Uint64 { return hexutil.Uint64(header.Number().Uint64()) } -// ResendTx retrieves blockHash by TxID and add blockHash to CxPool for resending -func (s *PublicBlockChainAPI) ResendTx(ctx context.Context, txID common.Hash) (bool, error) { - _, success := s.b.AddBlockHashToCxPool(ctx, txID) +// ResendCx requests that the egress receipt for the given cross-shard +// transaction be sent to the destination shard for credit. This is used for +// unblocking a half-complete cross-shard transaction whose fund has been +// withdrawn already from the source shard but not credited yet in the +// destination account due to transient failures. +func (s *PublicBlockChainAPI) ResendCx(ctx context.Context, txID common.Hash) (bool, error) { + _, success := s.b.ResendCx(ctx, txID) return success, nil } diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 872111362..5a1eef548 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -64,30 +64,68 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte) } } +// BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID +func (node *Node) BroadcastCXReceiptsWithShardID(newBlock *types.Block, lastCommits []byte, toShardID uint32) { + //#### Read payload data from committed msg + if len(lastCommits) <= 96 { + utils.Logger().Debug().Int("lastCommitsLen", len(lastCommits)).Msg("[BroadcastCXReceipts] lastCommits Not Enough Length") + } + commitSig := make([]byte, 96) + commitBitmap := make([]byte, len(lastCommits)-96) + offset := 0 + copy(commitSig[:], lastCommits[offset:offset+96]) + offset += 96 + copy(commitBitmap[:], lastCommits[offset:]) + //#### END Read payload data from committed msg + + epoch := newBlock.Header().Epoch() + shardingConfig := core.ShardingSchedule.InstanceForEpoch(epoch) + shardNum := int(shardingConfig.NumShards()) + myShardID := node.Consensus.ShardID + utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]") + + cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, newBlock.NumberU64(), newBlock.Hash(), false) + if err != nil || len(cxReceipts) == 0 { + utils.Logger().Warn().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No ReadCXReceipts found") + return + } + merkleProof, err := node.Blockchain().CXMerkleProof(toShardID, newBlock) + if err != nil { + utils.Logger().Warn().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceipts] Unable to get merkleProof") + return + } + utils.Logger().Info().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceipts] ReadCXReceipts and MerkleProof Found") + + groupID := p2p.ShardID(toShardID) + go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, newBlock.Header(), commitSig, commitBitmap))) +} + // BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request func (node *Node) BroadcastMissingCXReceipts() { - sendNextTime := []common.Hash{} + sendNextTime := []core.CxEntry{} it := node.CxPool.Pool().Iterator() - for blockHash := range it.C { - blk := node.Blockchain().GetBlockByHash(blockHash.(common.Hash)) + for entry := range it.C { + cxEntry := entry.(core.CxEntry) + toShardID := cxEntry.ToShardID + blk := node.Blockchain().GetBlockByHash(cxEntry.BlockHash) if blk == nil { continue } blockNum := blk.NumberU64() - nextBlk := node.Blockchain().GetBlockByNumber(blockNum + 1) - if nextBlk == nil { - sendNextTime = append(sendNextTime, blockHash.(common.Hash)) + nextHeader := node.Blockchain().GetHeaderByNumber(blockNum + 1) + if nextHeader == nil { + sendNextTime = append(sendNextTime, cxEntry) continue } - sig := nextBlk.Header().LastCommitSignature() - bitmap := nextBlk.Header().LastCommitBitmap() + sig := nextHeader.LastCommitSignature() + bitmap := nextHeader.LastCommitBitmap() lastCommits := append(sig[:], bitmap...) - go node.BroadcastCXReceipts(blk, lastCommits) + go node.BroadcastCXReceiptsWithShardID(blk, lastCommits, toShardID) } node.CxPool.Clear() // this should not happen or maybe happen for impatient user - for _, blockHash := range sendNextTime { - node.CxPool.Add(blockHash) + for _, entry := range sendNextTime { + node.CxPool.Add(entry) } } From a81544f3ff8c18eda20649f9abe16aa7501058e9 Mon Sep 17 00:00:00 2001 From: chao Date: Wed, 11 Sep 2019 20:18:34 -0700 Subject: [PATCH 3/4] add more comments to CxPool --- core/cx_pool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/cx_pool.go b/core/cx_pool.go index 76f83b75a..60abea67d 100644 --- a/core/cx_pool.go +++ b/core/cx_pool.go @@ -18,6 +18,9 @@ type CxEntry struct { } // CxPool is to hold a pool of block outgoing receipts to be resend in next round broadcast +// When a user/client doesn't find the destination shard get the money from cross shard tx +// it can send RPC call along with txID to allow the any validator to +// add the corresponding block's receipts to be resent type CxPool struct { pool mapset.Set maxSize int From ad10f192356ca6593b8084d346a39b91b8c1aa04 Mon Sep 17 00:00:00 2001 From: Chao Ma Date: Wed, 11 Sep 2019 21:24:51 -0700 Subject: [PATCH 4/4] refactor BroadcastCXReceiptsWithShardID and clean up corresponding logs --- node/node_cross_shard.go | 49 ++++++++-------------------------------- 1 file changed, 10 insertions(+), 39 deletions(-) diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 5a1eef548..bdc7c744d 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -47,57 +47,29 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte) if i == int(myShardID) { continue } - cxReceipts, err := node.Blockchain().ReadCXReceipts(uint32(i), newBlock.NumberU64(), newBlock.Hash(), false) - if err != nil || len(cxReceipts) == 0 { - utils.Logger().Warn().Err(err).Uint32("ToShardID", uint32(i)).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No ReadCXReceipts found") - continue - } - merkleProof, err := node.Blockchain().CXMerkleProof(uint32(i), newBlock) - if err != nil { - utils.Logger().Warn().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] Unable to get merkleProof") - continue - } - utils.Logger().Info().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] ReadCXReceipts and MerkleProof Found") - - groupID := p2p.ShardID(i) - go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, newBlock.Header(), commitSig, commitBitmap))) + go node.BroadcastCXReceiptsWithShardID(newBlock, commitSig, commitBitmap, uint32(i)) } } // BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID -func (node *Node) BroadcastCXReceiptsWithShardID(newBlock *types.Block, lastCommits []byte, toShardID uint32) { - //#### Read payload data from committed msg - if len(lastCommits) <= 96 { - utils.Logger().Debug().Int("lastCommitsLen", len(lastCommits)).Msg("[BroadcastCXReceipts] lastCommits Not Enough Length") - } - commitSig := make([]byte, 96) - commitBitmap := make([]byte, len(lastCommits)-96) - offset := 0 - copy(commitSig[:], lastCommits[offset:offset+96]) - offset += 96 - copy(commitBitmap[:], lastCommits[offset:]) - //#### END Read payload data from committed msg - - epoch := newBlock.Header().Epoch() - shardingConfig := core.ShardingSchedule.InstanceForEpoch(epoch) - shardNum := int(shardingConfig.NumShards()) +func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig []byte, commitBitmap []byte, toShardID uint32) { myShardID := node.Consensus.ShardID - utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]") + utils.Logger().Info().Uint32("toShardID", toShardID).Uint32("myShardID", myShardID).Uint64("blockNum", block.NumberU64()).Msg("[BroadcastCXReceiptsWithShardID]") - cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, newBlock.NumberU64(), newBlock.Hash(), false) + cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash(), false) if err != nil || len(cxReceipts) == 0 { - utils.Logger().Warn().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No ReadCXReceipts found") + utils.Logger().Warn().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceiptsWithShardID] No ReadCXReceipts found") return } - merkleProof, err := node.Blockchain().CXMerkleProof(toShardID, newBlock) + merkleProof, err := node.Blockchain().CXMerkleProof(toShardID, block) if err != nil { - utils.Logger().Warn().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceipts] Unable to get merkleProof") + utils.Logger().Warn().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceiptsWithShardID] Unable to get merkleProof") return } - utils.Logger().Info().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceipts] ReadCXReceipts and MerkleProof Found") + utils.Logger().Info().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof Found") groupID := p2p.ShardID(toShardID) - go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, newBlock.Header(), commitSig, commitBitmap))) + go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, block.Header(), commitSig, commitBitmap))) } // BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request @@ -119,8 +91,7 @@ func (node *Node) BroadcastMissingCXReceipts() { } sig := nextHeader.LastCommitSignature() bitmap := nextHeader.LastCommitBitmap() - lastCommits := append(sig[:], bitmap...) - go node.BroadcastCXReceiptsWithShardID(blk, lastCommits, toShardID) + go node.BroadcastCXReceiptsWithShardID(blk, sig[:], bitmap, toShardID) } node.CxPool.Clear() // this should not happen or maybe happen for impatient user