Merge pull request #1577 from chaosma/cx_rpc

add rpc call to resend missing cross shard receipt
pull/1581/head
chaosma 5 years ago committed by GitHub
commit e903486d27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      core/blockchain.go
  2. 58
      core/cx_pool.go
  3. 21
      hmy/api_backend.go
  4. 7
      hmy/backend.go
  5. 3
      internal/hmyapi/backend.go
  6. 10
      internal/hmyapi/blockchain.go
  7. 3
      node/node.go
  8. 54
      node/node_cross_shard.go
  9. 2
      node/node_handler.go
  10. 2
      node/rpc.go

@ -2238,3 +2238,10 @@ func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) {
bc.updateCXReceiptsCheckpoints(k, v)
}
}
// ReadTxLookupEntry returns where the given transaction resides in the chain,
// as a (block hash, block number, index in transaction list) triple.
// returns 0, 0 if not found
func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64, uint64) {
return rawdb.ReadTxLookupEntry(bc.db, txID)
}

@ -0,0 +1,58 @@
package core
import (
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
)
const (
// CxPoolSize is the maximum size of the pool
CxPoolSize = 50
)
// CxEntry represents the egress receipt's blockHash and ToShardID
type CxEntry struct {
BlockHash common.Hash
ToShardID uint32
}
// CxPool is to hold a pool of block outgoing receipts to be resend in next round broadcast
// When a user/client doesn't find the destination shard get the money from cross shard tx
// it can send RPC call along with txID to allow the any validator to
// add the corresponding block's receipts to be resent
type CxPool struct {
pool mapset.Set
maxSize int
}
// NewCxPool creates a new CxPool
func NewCxPool(limit int) *CxPool {
pool := mapset.NewSet()
cxPool := CxPool{pool: pool, maxSize: limit}
return &cxPool
}
// Pool returns the pool of blockHashes of missing receipts
func (cxPool *CxPool) Pool() mapset.Set {
return cxPool.pool
}
// Size return size of the pool
func (cxPool *CxPool) Size() int {
return cxPool.pool.Cardinality()
}
// Add add element into the pool if not exceed limit
func (cxPool *CxPool) Add(entry CxEntry) bool {
if cxPool.Size() > cxPool.maxSize {
return false
}
cxPool.pool.Add(entry)
return true
}
// Clear empty the pool
func (cxPool *CxPool) Clear() {
cxPool.pool.Clear()
}

@ -232,3 +232,24 @@ func (b *APIBackend) RPCGasCap() *big.Int {
func (b *APIBackend) GetShardID() uint32 {
return b.hmy.shardID
}
// ResendCx retrieve blockHash from txID and add blockHash to CxPool for resending
func (b *APIBackend) ResendCx(ctx context.Context, txID common.Hash) (uint64, bool) {
blockHash, blockNum, index := b.hmy.BlockChain().ReadTxLookupEntry(txID)
blk := b.hmy.BlockChain().GetBlockByHash(blockHash)
if blk == nil {
return 0, false
}
txs := blk.Transactions()
// a valid index is from 0 to len-1
if int(index) > len(txs)-1 {
return 0, false
}
tx := txs[int(index)]
if tx.ShardID() == tx.ToShardID() || blk.Header().ShardID() != tx.ShardID() {
return 0, false
}
entry := core.CxEntry{blockHash, tx.ToShardID()}
success := b.hmy.CxPool().Add(entry)
return blockNum, success
}

@ -21,6 +21,7 @@ type Harmony struct {
blockchain *core.BlockChain
txPool *core.TxPool
cxPool *core.CxPool
accountManager *accounts.Manager
eventMux *event.TypeMux
// DB interfaces
@ -51,13 +52,14 @@ type NodeAPI interface {
// New creates a new Harmony object (including the
// initialisation of the common Harmony object)
func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux, shardID uint32) (*Harmony, error) {
func New(nodeAPI NodeAPI, txPool *core.TxPool, cxPool *core.CxPool, eventMux *event.TypeMux, shardID uint32) (*Harmony, error) {
chainDb := nodeAPI.Blockchain().ChainDB()
hmy := &Harmony{
shutdownChan: make(chan bool),
bloomRequests: make(chan chan *bloombits.Retrieval),
blockchain: nodeAPI.Blockchain(),
txPool: txPool,
cxPool: cxPool,
accountManager: nodeAPI.AccountManager(),
eventMux: eventMux,
chainDb: chainDb,
@ -75,6 +77,9 @@ func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux, shardID
// TxPool ...
func (s *Harmony) TxPool() *core.TxPool { return s.txPool }
// CxPool is used to store the blockHashes, where the corresponding block contains the cross shard receipts to be sent
func (s *Harmony) CxPool() *core.CxPool { return s.cxPool }
// BlockChain ...
func (s *Harmony) BlockChain() *core.BlockChain { return s.blockchain }

@ -64,6 +64,9 @@ type Backend interface {
// Get balance
GetBalance(address common.Address) (*hexutil.Big, error)
GetShardID() uint32
// retrieve the blockHash using txID and add blockHash to CxPool for resending
ResendCx(ctx context.Context, txID common.Hash) (uint64, bool)
}
// GetAPIs returns all the APIs.

@ -112,6 +112,16 @@ func (s *PublicBlockChainAPI) BlockNumber() hexutil.Uint64 {
return hexutil.Uint64(header.Number().Uint64())
}
// ResendCx requests that the egress receipt for the given cross-shard
// transaction be sent to the destination shard for credit. This is used for
// unblocking a half-complete cross-shard transaction whose fund has been
// withdrawn already from the source shard but not credited yet in the
// destination account due to transient failures.
func (s *PublicBlockChainAPI) ResendCx(ctx context.Context, txID common.Hash) (bool, error) {
_, success := s.b.ResendCx(ctx, txID)
return success, nil
}
// Call executes the given transaction on the state for the given block number.
// It doesn't make and changes in the state/blockchain and is useful to execute and retrieve values.
func (s *PublicBlockChainAPI) Call(ctx context.Context, args CallArgs, blockNr rpc.BlockNumber) (hexutil.Bytes, error) {

@ -120,6 +120,8 @@ type Node struct {
TxPool *core.TxPool // TODO migrate to TxPool from pendingTransactions list below
CxPool *core.CxPool // pool for missing cross shard receipts resend
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex
recentTxsStats types.RecentTxsStats
@ -391,6 +393,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
node.BeaconBlockChannel = make(chan *types.Block)
node.recentTxsStats = make(types.RecentTxsStats)
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain)
node.CxPool = core.NewCxPool(core.CxPoolSize)
node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine, node.Consensus.ShardID)
if node.Blockchain().ShardID() != 0 {
node.BeaconWorker = worker.New(node.Beaconchain().Config(), beaconChain, chain.Engine, node.Consensus.ShardID)

@ -47,20 +47,56 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte)
if i == int(myShardID) {
continue
}
cxReceipts, err := node.Blockchain().ReadCXReceipts(uint32(i), newBlock.NumberU64(), newBlock.Hash(), false)
go node.BroadcastCXReceiptsWithShardID(newBlock, commitSig, commitBitmap, uint32(i))
}
}
// BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID
func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig []byte, commitBitmap []byte, toShardID uint32) {
myShardID := node.Consensus.ShardID
utils.Logger().Info().Uint32("toShardID", toShardID).Uint32("myShardID", myShardID).Uint64("blockNum", block.NumberU64()).Msg("[BroadcastCXReceiptsWithShardID]")
cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash(), false)
if err != nil || len(cxReceipts) == 0 {
utils.Logger().Warn().Err(err).Uint32("ToShardID", uint32(i)).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No ReadCXReceipts found")
continue
utils.Logger().Warn().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceiptsWithShardID] No ReadCXReceipts found")
return
}
merkleProof, err := node.Blockchain().CXMerkleProof(uint32(i), newBlock)
merkleProof, err := node.Blockchain().CXMerkleProof(toShardID, block)
if err != nil {
utils.Logger().Warn().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] Unable to get merkleProof")
continue
utils.Logger().Warn().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceiptsWithShardID] Unable to get merkleProof")
return
}
utils.Logger().Info().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof Found")
groupID := p2p.ShardID(toShardID)
go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, block.Header(), commitSig, commitBitmap)))
}
utils.Logger().Info().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] ReadCXReceipts and MerkleProof Found")
groupID := p2p.ShardID(i)
go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, newBlock.Header(), commitSig, commitBitmap)))
// BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request
func (node *Node) BroadcastMissingCXReceipts() {
sendNextTime := []core.CxEntry{}
it := node.CxPool.Pool().Iterator()
for entry := range it.C {
cxEntry := entry.(core.CxEntry)
toShardID := cxEntry.ToShardID
blk := node.Blockchain().GetBlockByHash(cxEntry.BlockHash)
if blk == nil {
continue
}
blockNum := blk.NumberU64()
nextHeader := node.Blockchain().GetHeaderByNumber(blockNum + 1)
if nextHeader == nil {
sendNextTime = append(sendNextTime, cxEntry)
continue
}
sig := nextHeader.LastCommitSignature()
bitmap := nextHeader.LastCommitBitmap()
go node.BroadcastCXReceiptsWithShardID(blk, sig[:], bitmap, toShardID)
}
node.CxPool.Clear()
// this should not happen or maybe happen for impatient user
for _, entry := range sendNextTime {
node.CxPool.Add(entry)
}
}

@ -392,6 +392,8 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
}
}
node.BroadcastMissingCXReceipts()
// TODO chao: uncomment this after beacon syncing is stable
// node.Blockchain().UpdateCXReceiptsCheckpointsByBlock(newBlock)

@ -46,7 +46,7 @@ var (
// StartRPC start RPC service
func (node *Node) StartRPC(nodePort string) error {
// Gather all the possible APIs to surface
harmony, _ = hmy.New(node, node.TxPool, new(event.TypeMux), node.Consensus.ShardID)
harmony, _ = hmy.New(node, node.TxPool, node.CxPool, new(event.TypeMux), node.Consensus.ShardID)
apis := node.APIs()

Loading…
Cancel
Save