add http rpc call to resend missing cross shard receipt

pull/1577/head
chao 5 years ago
parent 4ff17bbc59
commit c78b8fa352
  1. 6
      core/blockchain.go
  2. 46
      core/cx_pool.go
  3. 11
      hmy/api_backend.go
  4. 7
      hmy/backend.go
  5. 3
      internal/hmyapi/backend.go
  6. 6
      internal/hmyapi/blockchain.go
  7. 3
      node/node.go
  8. 27
      node/node_cross_shard.go
  9. 2
      node/node_handler.go
  10. 2
      node/rpc.go

@ -2237,3 +2237,9 @@ func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) {
bc.updateCXReceiptsCheckpoints(k, v)
}
}
// ReadTxLookupEntry returns the corresponding blockHash and blockNum where transaction locates
func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64) {
blockHash, blockNum, _ := rawdb.ReadTxLookupEntry(bc.db, txID)
return blockHash, blockNum
}

@ -0,0 +1,46 @@
package core
import (
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
)
const (
CxPoolSize = 50
)
type CxPool struct {
pool mapset.Set
maxSize int
}
func NewCxPool(limit int) *CxPool {
pool := mapset.NewSet()
cxPool := CxPool{pool: pool, maxSize: limit}
return &cxPool
}
// Pool returns the pool of blockHashes of missing receipts
func (cxPool *CxPool) Pool() mapset.Set {
return cxPool.pool
}
// Size return size of the pool
func (cxPool *CxPool) Size() int {
return cxPool.pool.Cardinality()
}
// Add add element into the pool if not exceed limit
func (cxPool *CxPool) Add(hash common.Hash) bool {
if cxPool.Size() > cxPool.maxSize {
return false
}
cxPool.pool.Add(hash)
return true
}
// Clear empty the pool
func (cxPool *CxPool) Clear() {
cxPool.pool.Clear()
}

@ -232,3 +232,14 @@ func (b *APIBackend) RPCGasCap() *big.Int {
func (b *APIBackend) GetShardID() uint32 {
return b.hmy.shardID
}
// AddBlockHashToCxPool retrieve blockHash from txID and add blockHash to CxPool for resending
func (b *APIBackend) AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) {
blockHash, blockNum := b.hmy.BlockChain().ReadTxLookupEntry(txID)
blk := b.hmy.BlockChain().GetBlockByHash(blockHash)
if blk == nil {
return 0, false
}
success := b.hmy.CxPool().Add(blockHash)
return blockNum, success
}

@ -21,6 +21,7 @@ type Harmony struct {
blockchain *core.BlockChain
txPool *core.TxPool
cxPool *core.CxPool
accountManager *accounts.Manager
eventMux *event.TypeMux
// DB interfaces
@ -51,13 +52,14 @@ type NodeAPI interface {
// New creates a new Harmony object (including the
// initialisation of the common Harmony object)
func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux, shardID uint32) (*Harmony, error) {
func New(nodeAPI NodeAPI, txPool *core.TxPool, cxPool *core.CxPool, eventMux *event.TypeMux, shardID uint32) (*Harmony, error) {
chainDb := nodeAPI.Blockchain().ChainDB()
hmy := &Harmony{
shutdownChan: make(chan bool),
bloomRequests: make(chan chan *bloombits.Retrieval),
blockchain: nodeAPI.Blockchain(),
txPool: txPool,
cxPool: cxPool,
accountManager: nodeAPI.AccountManager(),
eventMux: eventMux,
chainDb: chainDb,
@ -75,6 +77,9 @@ func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux, shardID
// TxPool ...
func (s *Harmony) TxPool() *core.TxPool { return s.txPool }
// CxPool ...
func (s *Harmony) CxPool() *core.CxPool { return s.cxPool }
// BlockChain ...
func (s *Harmony) BlockChain() *core.BlockChain { return s.blockchain }

@ -64,6 +64,9 @@ type Backend interface {
// Get balance
GetBalance(address common.Address) (*hexutil.Big, error)
GetShardID() uint32
// retrieve the blockHash using txID and add blockHash to CxPool for resending
AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool)
}
// GetAPIs returns all the APIs.

@ -112,6 +112,12 @@ func (s *PublicBlockChainAPI) BlockNumber() hexutil.Uint64 {
return hexutil.Uint64(header.Number().Uint64())
}
// ResendTx retrieves blockHash by TxID and add blockHash to CxPool for resending
func (s *PublicBlockChainAPI) ResendTx(ctx context.Context, txID common.Hash) (bool, error) {
_, success := s.b.AddBlockHashToCxPool(ctx, txID)
return success, nil
}
// Call executes the given transaction on the state for the given block number.
// It doesn't make and changes in the state/blockchain and is useful to execute and retrieve values.
func (s *PublicBlockChainAPI) Call(ctx context.Context, args CallArgs, blockNr rpc.BlockNumber) (hexutil.Bytes, error) {

@ -120,6 +120,8 @@ type Node struct {
TxPool *core.TxPool // TODO migrate to TxPool from pendingTransactions list below
CxPool *core.CxPool // pool for missing cross shard receipts resend
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex
recentTxsStats types.RecentTxsStats
@ -391,6 +393,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
node.BeaconBlockChannel = make(chan *types.Block)
node.recentTxsStats = make(types.RecentTxsStats)
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain)
node.CxPool = core.NewCxPool(core.CxPoolSize)
node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine, node.Consensus.ShardID)
if node.Blockchain().ShardID() != 0 {
node.BeaconWorker = worker.New(node.Beaconchain().Config(), beaconChain, chain.Engine, node.Consensus.ShardID)

@ -64,6 +64,33 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte)
}
}
// BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request
func (node *Node) BroadcastMissingCXReceipts() {
sendNextTime := []common.Hash{}
it := node.CxPool.Pool().Iterator()
for blockHash := range it.C {
blk := node.Blockchain().GetBlockByHash(blockHash.(common.Hash))
if blk == nil {
continue
}
blockNum := blk.NumberU64()
nextBlk := node.Blockchain().GetBlockByNumber(blockNum + 1)
if nextBlk == nil {
sendNextTime = append(sendNextTime, blockHash.(common.Hash))
continue
}
sig := nextBlk.Header().LastCommitSignature()
bitmap := nextBlk.Header().LastCommitBitmap()
lastCommits := append(sig[:], bitmap...)
go node.BroadcastCXReceipts(blk, lastCommits)
}
node.CxPool.Clear()
// this should not happen or maybe happen for impatient user
for _, blockHash := range sendNextTime {
node.CxPool.Add(blockHash)
}
}
// VerifyBlockCrossLinks verifies the cross links of the block
func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
if len(block.Header().CrossLinks()) == 0 {

@ -392,6 +392,8 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
}
}
node.BroadcastMissingCXReceipts()
// TODO chao: uncomment this after beacon syncing is stable
// node.Blockchain().UpdateCXReceiptsCheckpointsByBlock(newBlock)

@ -46,7 +46,7 @@ var (
// StartRPC start RPC service
func (node *Node) StartRPC(nodePort string) error {
// Gather all the possible APIs to surface
harmony, _ = hmy.New(node, node.TxPool, new(event.TypeMux), node.Consensus.ShardID)
harmony, _ = hmy.New(node, node.TxPool, node.CxPool, new(event.TypeMux), node.Consensus.ShardID)
apis := node.APIs()

Loading…
Cancel
Save