diff --git a/core/blockchain.go b/core/blockchain.go index 81c5a0761..c63a6e6ac 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2237,3 +2237,9 @@ func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) { bc.updateCXReceiptsCheckpoints(k, v) } } + +// ReadTxLookupEntry returns the corresponding blockHash and blockNum where transaction locates +func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64) { + blockHash, blockNum, _ := rawdb.ReadTxLookupEntry(bc.db, txID) + return blockHash, blockNum +} diff --git a/core/cx_pool.go b/core/cx_pool.go new file mode 100644 index 000000000..0efcdd2e6 --- /dev/null +++ b/core/cx_pool.go @@ -0,0 +1,46 @@ +package core + +import ( + mapset "github.com/deckarep/golang-set" + + "github.com/ethereum/go-ethereum/common" +) + +const ( + CxPoolSize = 50 +) + +type CxPool struct { + pool mapset.Set + maxSize int +} + +func NewCxPool(limit int) *CxPool { + pool := mapset.NewSet() + cxPool := CxPool{pool: pool, maxSize: limit} + return &cxPool +} + +// Pool returns the pool of blockHashes of missing receipts +func (cxPool *CxPool) Pool() mapset.Set { + return cxPool.pool +} + +// Size return size of the pool +func (cxPool *CxPool) Size() int { + return cxPool.pool.Cardinality() +} + +// Add add element into the pool if not exceed limit +func (cxPool *CxPool) Add(hash common.Hash) bool { + if cxPool.Size() > cxPool.maxSize { + return false + } + cxPool.pool.Add(hash) + return true +} + +// Clear empty the pool +func (cxPool *CxPool) Clear() { + cxPool.pool.Clear() +} diff --git a/hmy/api_backend.go b/hmy/api_backend.go index a03901235..5f448dc03 100644 --- a/hmy/api_backend.go +++ b/hmy/api_backend.go @@ -232,3 +232,14 @@ func (b *APIBackend) RPCGasCap() *big.Int { func (b *APIBackend) GetShardID() uint32 { return b.hmy.shardID } + +// AddBlockHashToCxPool retrieve blockHash from txID and add blockHash to CxPool for resending +func (b *APIBackend) AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) { + blockHash, blockNum := b.hmy.BlockChain().ReadTxLookupEntry(txID) + blk := b.hmy.BlockChain().GetBlockByHash(blockHash) + if blk == nil { + return 0, false + } + success := b.hmy.CxPool().Add(blockHash) + return blockNum, success +} diff --git a/hmy/backend.go b/hmy/backend.go index c6a933b61..6d94225f8 100644 --- a/hmy/backend.go +++ b/hmy/backend.go @@ -21,6 +21,7 @@ type Harmony struct { blockchain *core.BlockChain txPool *core.TxPool + cxPool *core.CxPool accountManager *accounts.Manager eventMux *event.TypeMux // DB interfaces @@ -51,13 +52,14 @@ type NodeAPI interface { // New creates a new Harmony object (including the // initialisation of the common Harmony object) -func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux, shardID uint32) (*Harmony, error) { +func New(nodeAPI NodeAPI, txPool *core.TxPool, cxPool *core.CxPool, eventMux *event.TypeMux, shardID uint32) (*Harmony, error) { chainDb := nodeAPI.Blockchain().ChainDB() hmy := &Harmony{ shutdownChan: make(chan bool), bloomRequests: make(chan chan *bloombits.Retrieval), blockchain: nodeAPI.Blockchain(), txPool: txPool, + cxPool: cxPool, accountManager: nodeAPI.AccountManager(), eventMux: eventMux, chainDb: chainDb, @@ -75,6 +77,9 @@ func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux, shardID // TxPool ... func (s *Harmony) TxPool() *core.TxPool { return s.txPool } +// CxPool ... +func (s *Harmony) CxPool() *core.CxPool { return s.cxPool } + // BlockChain ... func (s *Harmony) BlockChain() *core.BlockChain { return s.blockchain } diff --git a/internal/hmyapi/backend.go b/internal/hmyapi/backend.go index d6adb727d..44093dd9d 100644 --- a/internal/hmyapi/backend.go +++ b/internal/hmyapi/backend.go @@ -64,6 +64,9 @@ type Backend interface { // Get balance GetBalance(address common.Address) (*hexutil.Big, error) GetShardID() uint32 + + // retrieve the blockHash using txID and add blockHash to CxPool for resending + AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) } // GetAPIs returns all the APIs. diff --git a/internal/hmyapi/blockchain.go b/internal/hmyapi/blockchain.go index f123dfa32..5f420fd0f 100644 --- a/internal/hmyapi/blockchain.go +++ b/internal/hmyapi/blockchain.go @@ -112,6 +112,12 @@ func (s *PublicBlockChainAPI) BlockNumber() hexutil.Uint64 { return hexutil.Uint64(header.Number().Uint64()) } +// ResendTx retrieves blockHash by TxID and add blockHash to CxPool for resending +func (s *PublicBlockChainAPI) ResendTx(ctx context.Context, txID common.Hash) (bool, error) { + _, success := s.b.AddBlockHashToCxPool(ctx, txID) + return success, nil +} + // Call executes the given transaction on the state for the given block number. // It doesn't make and changes in the state/blockchain and is useful to execute and retrieve values. func (s *PublicBlockChainAPI) Call(ctx context.Context, args CallArgs, blockNr rpc.BlockNumber) (hexutil.Bytes, error) { diff --git a/node/node.go b/node/node.go index 51281f4e0..b90d2bcf3 100644 --- a/node/node.go +++ b/node/node.go @@ -120,6 +120,8 @@ type Node struct { TxPool *core.TxPool // TODO migrate to TxPool from pendingTransactions list below + CxPool *core.CxPool // pool for missing cross shard receipts resend + pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus pendingTxMutex sync.Mutex recentTxsStats types.RecentTxsStats @@ -391,6 +393,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc node.BeaconBlockChannel = make(chan *types.Block) node.recentTxsStats = make(types.RecentTxsStats) node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain) + node.CxPool = core.NewCxPool(core.CxPoolSize) node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine, node.Consensus.ShardID) if node.Blockchain().ShardID() != 0 { node.BeaconWorker = worker.New(node.Beaconchain().Config(), beaconChain, chain.Engine, node.Consensus.ShardID) diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index f8076d258..872111362 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -64,6 +64,33 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte) } } +// BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request +func (node *Node) BroadcastMissingCXReceipts() { + sendNextTime := []common.Hash{} + it := node.CxPool.Pool().Iterator() + for blockHash := range it.C { + blk := node.Blockchain().GetBlockByHash(blockHash.(common.Hash)) + if blk == nil { + continue + } + blockNum := blk.NumberU64() + nextBlk := node.Blockchain().GetBlockByNumber(blockNum + 1) + if nextBlk == nil { + sendNextTime = append(sendNextTime, blockHash.(common.Hash)) + continue + } + sig := nextBlk.Header().LastCommitSignature() + bitmap := nextBlk.Header().LastCommitBitmap() + lastCommits := append(sig[:], bitmap...) + go node.BroadcastCXReceipts(blk, lastCommits) + } + node.CxPool.Clear() + // this should not happen or maybe happen for impatient user + for _, blockHash := range sendNextTime { + node.CxPool.Add(blockHash) + } +} + // VerifyBlockCrossLinks verifies the cross links of the block func (node *Node) VerifyBlockCrossLinks(block *types.Block) error { if len(block.Header().CrossLinks()) == 0 { diff --git a/node/node_handler.go b/node/node_handler.go index 556478e46..d53a9428a 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -392,6 +392,8 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit } } + node.BroadcastMissingCXReceipts() + // TODO chao: uncomment this after beacon syncing is stable // node.Blockchain().UpdateCXReceiptsCheckpointsByBlock(newBlock) diff --git a/node/rpc.go b/node/rpc.go index c5c593386..2a7875e1a 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -46,7 +46,7 @@ var ( // StartRPC start RPC service func (node *Node) StartRPC(nodePort string) error { // Gather all the possible APIs to surface - harmony, _ = hmy.New(node, node.TxPool, new(event.TypeMux), node.Consensus.ShardID) + harmony, _ = hmy.New(node, node.TxPool, node.CxPool, new(event.TypeMux), node.Consensus.ShardID) apis := node.APIs()