From 00c2356b5ed951fc0f3246f7973b96cc58bbbe1e Mon Sep 17 00:00:00 2001 From: chao Date: Wed, 11 Sep 2019 19:54:16 -0700 Subject: [PATCH] add tx type check; improve several efficiency issues --- core/blockchain.go | 9 +++--- core/cx_pool.go | 13 ++++++-- hmy/api_backend.go | 18 ++++++++--- hmy/backend.go | 2 +- internal/hmyapi/backend.go | 2 +- internal/hmyapi/blockchain.go | 10 ++++-- node/node_cross_shard.go | 60 ++++++++++++++++++++++++++++------- 7 files changed, 88 insertions(+), 26 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index c63a6e6ac..abf279c50 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2238,8 +2238,9 @@ func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) { } } -// ReadTxLookupEntry returns the corresponding blockHash and blockNum where transaction locates -func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64) { - blockHash, blockNum, _ := rawdb.ReadTxLookupEntry(bc.db, txID) - return blockHash, blockNum +// ReadTxLookupEntry returns where the given transaction resides in the chain, +// as a (block hash, block number, index in transaction list) triple. +// returns 0, 0 if not found +func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64, uint64) { + return rawdb.ReadTxLookupEntry(bc.db, txID) } diff --git a/core/cx_pool.go b/core/cx_pool.go index 0efcdd2e6..76f83b75a 100644 --- a/core/cx_pool.go +++ b/core/cx_pool.go @@ -7,14 +7,23 @@ import ( ) const ( + // CxPoolSize is the maximum size of the pool CxPoolSize = 50 ) +// CxEntry represents the egress receipt's blockHash and ToShardID +type CxEntry struct { + BlockHash common.Hash + ToShardID uint32 +} + +// CxPool is to hold a pool of block outgoing receipts to be resend in next round broadcast type CxPool struct { pool mapset.Set maxSize int } +// NewCxPool creates a new CxPool func NewCxPool(limit int) *CxPool { pool := mapset.NewSet() cxPool := CxPool{pool: pool, maxSize: limit} @@ -32,11 +41,11 @@ func (cxPool *CxPool) Size() int { } // Add add element into the pool if not exceed limit -func (cxPool *CxPool) Add(hash common.Hash) bool { +func (cxPool *CxPool) Add(entry CxEntry) bool { if cxPool.Size() > cxPool.maxSize { return false } - cxPool.pool.Add(hash) + cxPool.pool.Add(entry) return true } diff --git a/hmy/api_backend.go b/hmy/api_backend.go index 5f448dc03..bc5a3bfa8 100644 --- a/hmy/api_backend.go +++ b/hmy/api_backend.go @@ -233,13 +233,23 @@ func (b *APIBackend) GetShardID() uint32 { return b.hmy.shardID } -// AddBlockHashToCxPool retrieve blockHash from txID and add blockHash to CxPool for resending -func (b *APIBackend) AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) { - blockHash, blockNum := b.hmy.BlockChain().ReadTxLookupEntry(txID) +// ResendCx retrieve blockHash from txID and add blockHash to CxPool for resending +func (b *APIBackend) ResendCx(ctx context.Context, txID common.Hash) (uint64, bool) { + blockHash, blockNum, index := b.hmy.BlockChain().ReadTxLookupEntry(txID) blk := b.hmy.BlockChain().GetBlockByHash(blockHash) if blk == nil { return 0, false } - success := b.hmy.CxPool().Add(blockHash) + txs := blk.Transactions() + // a valid index is from 0 to len-1 + if int(index) > len(txs)-1 { + return 0, false + } + tx := txs[int(index)] + if tx.ShardID() == tx.ToShardID() || blk.Header().ShardID() != tx.ShardID() { + return 0, false + } + entry := core.CxEntry{blockHash, tx.ToShardID()} + success := b.hmy.CxPool().Add(entry) return blockNum, success } diff --git a/hmy/backend.go b/hmy/backend.go index 6d94225f8..fab35c7aa 100644 --- a/hmy/backend.go +++ b/hmy/backend.go @@ -77,7 +77,7 @@ func New(nodeAPI NodeAPI, txPool *core.TxPool, cxPool *core.CxPool, eventMux *ev // TxPool ... func (s *Harmony) TxPool() *core.TxPool { return s.txPool } -// CxPool ... +// CxPool is used to store the blockHashes, where the corresponding block contains the cross shard receipts to be sent func (s *Harmony) CxPool() *core.CxPool { return s.cxPool } // BlockChain ... diff --git a/internal/hmyapi/backend.go b/internal/hmyapi/backend.go index 44093dd9d..360defc49 100644 --- a/internal/hmyapi/backend.go +++ b/internal/hmyapi/backend.go @@ -66,7 +66,7 @@ type Backend interface { GetShardID() uint32 // retrieve the blockHash using txID and add blockHash to CxPool for resending - AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) + ResendCx(ctx context.Context, txID common.Hash) (uint64, bool) } // GetAPIs returns all the APIs. diff --git a/internal/hmyapi/blockchain.go b/internal/hmyapi/blockchain.go index 5f420fd0f..6a0a23704 100644 --- a/internal/hmyapi/blockchain.go +++ b/internal/hmyapi/blockchain.go @@ -112,9 +112,13 @@ func (s *PublicBlockChainAPI) BlockNumber() hexutil.Uint64 { return hexutil.Uint64(header.Number().Uint64()) } -// ResendTx retrieves blockHash by TxID and add blockHash to CxPool for resending -func (s *PublicBlockChainAPI) ResendTx(ctx context.Context, txID common.Hash) (bool, error) { - _, success := s.b.AddBlockHashToCxPool(ctx, txID) +// ResendCx requests that the egress receipt for the given cross-shard +// transaction be sent to the destination shard for credit. This is used for +// unblocking a half-complete cross-shard transaction whose fund has been +// withdrawn already from the source shard but not credited yet in the +// destination account due to transient failures. +func (s *PublicBlockChainAPI) ResendCx(ctx context.Context, txID common.Hash) (bool, error) { + _, success := s.b.ResendCx(ctx, txID) return success, nil } diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 872111362..5a1eef548 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -64,30 +64,68 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte) } } +// BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID +func (node *Node) BroadcastCXReceiptsWithShardID(newBlock *types.Block, lastCommits []byte, toShardID uint32) { + //#### Read payload data from committed msg + if len(lastCommits) <= 96 { + utils.Logger().Debug().Int("lastCommitsLen", len(lastCommits)).Msg("[BroadcastCXReceipts] lastCommits Not Enough Length") + } + commitSig := make([]byte, 96) + commitBitmap := make([]byte, len(lastCommits)-96) + offset := 0 + copy(commitSig[:], lastCommits[offset:offset+96]) + offset += 96 + copy(commitBitmap[:], lastCommits[offset:]) + //#### END Read payload data from committed msg + + epoch := newBlock.Header().Epoch() + shardingConfig := core.ShardingSchedule.InstanceForEpoch(epoch) + shardNum := int(shardingConfig.NumShards()) + myShardID := node.Consensus.ShardID + utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]") + + cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, newBlock.NumberU64(), newBlock.Hash(), false) + if err != nil || len(cxReceipts) == 0 { + utils.Logger().Warn().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No ReadCXReceipts found") + return + } + merkleProof, err := node.Blockchain().CXMerkleProof(toShardID, newBlock) + if err != nil { + utils.Logger().Warn().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceipts] Unable to get merkleProof") + return + } + utils.Logger().Info().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceipts] ReadCXReceipts and MerkleProof Found") + + groupID := p2p.ShardID(toShardID) + go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, newBlock.Header(), commitSig, commitBitmap))) +} + // BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request func (node *Node) BroadcastMissingCXReceipts() { - sendNextTime := []common.Hash{} + sendNextTime := []core.CxEntry{} it := node.CxPool.Pool().Iterator() - for blockHash := range it.C { - blk := node.Blockchain().GetBlockByHash(blockHash.(common.Hash)) + for entry := range it.C { + cxEntry := entry.(core.CxEntry) + toShardID := cxEntry.ToShardID + blk := node.Blockchain().GetBlockByHash(cxEntry.BlockHash) if blk == nil { continue } blockNum := blk.NumberU64() - nextBlk := node.Blockchain().GetBlockByNumber(blockNum + 1) - if nextBlk == nil { - sendNextTime = append(sendNextTime, blockHash.(common.Hash)) + nextHeader := node.Blockchain().GetHeaderByNumber(blockNum + 1) + if nextHeader == nil { + sendNextTime = append(sendNextTime, cxEntry) continue } - sig := nextBlk.Header().LastCommitSignature() - bitmap := nextBlk.Header().LastCommitBitmap() + sig := nextHeader.LastCommitSignature() + bitmap := nextHeader.LastCommitBitmap() lastCommits := append(sig[:], bitmap...) - go node.BroadcastCXReceipts(blk, lastCommits) + go node.BroadcastCXReceiptsWithShardID(blk, lastCommits, toShardID) } node.CxPool.Clear() // this should not happen or maybe happen for impatient user - for _, blockHash := range sendNextTime { - node.CxPool.Add(blockHash) + for _, entry := range sendNextTime { + node.CxPool.Add(entry) } }