add tx type check; improve several efficiency issues

pull/1577/head
chao 5 years ago
parent c78b8fa352
commit 00c2356b5e
  1. 9
      core/blockchain.go
  2. 13
      core/cx_pool.go
  3. 18
      hmy/api_backend.go
  4. 2
      hmy/backend.go
  5. 2
      internal/hmyapi/backend.go
  6. 10
      internal/hmyapi/blockchain.go
  7. 60
      node/node_cross_shard.go

@ -2238,8 +2238,9 @@ func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) {
} }
} }
// ReadTxLookupEntry returns the corresponding blockHash and blockNum where transaction locates // ReadTxLookupEntry returns where the given transaction resides in the chain,
func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64) { // as a (block hash, block number, index in transaction list) triple.
blockHash, blockNum, _ := rawdb.ReadTxLookupEntry(bc.db, txID) // returns 0, 0 if not found
return blockHash, blockNum func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64, uint64) {
return rawdb.ReadTxLookupEntry(bc.db, txID)
} }

@ -7,14 +7,23 @@ import (
) )
const ( const (
// CxPoolSize is the maximum size of the pool
CxPoolSize = 50 CxPoolSize = 50
) )
// CxEntry represents the egress receipt's blockHash and ToShardID
type CxEntry struct {
BlockHash common.Hash
ToShardID uint32
}
// CxPool is to hold a pool of block outgoing receipts to be resend in next round broadcast
type CxPool struct { type CxPool struct {
pool mapset.Set pool mapset.Set
maxSize int maxSize int
} }
// NewCxPool creates a new CxPool
func NewCxPool(limit int) *CxPool { func NewCxPool(limit int) *CxPool {
pool := mapset.NewSet() pool := mapset.NewSet()
cxPool := CxPool{pool: pool, maxSize: limit} cxPool := CxPool{pool: pool, maxSize: limit}
@ -32,11 +41,11 @@ func (cxPool *CxPool) Size() int {
} }
// Add add element into the pool if not exceed limit // Add add element into the pool if not exceed limit
func (cxPool *CxPool) Add(hash common.Hash) bool { func (cxPool *CxPool) Add(entry CxEntry) bool {
if cxPool.Size() > cxPool.maxSize { if cxPool.Size() > cxPool.maxSize {
return false return false
} }
cxPool.pool.Add(hash) cxPool.pool.Add(entry)
return true return true
} }

@ -233,13 +233,23 @@ func (b *APIBackend) GetShardID() uint32 {
return b.hmy.shardID return b.hmy.shardID
} }
// AddBlockHashToCxPool retrieve blockHash from txID and add blockHash to CxPool for resending // ResendCx retrieve blockHash from txID and add blockHash to CxPool for resending
func (b *APIBackend) AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) { func (b *APIBackend) ResendCx(ctx context.Context, txID common.Hash) (uint64, bool) {
blockHash, blockNum := b.hmy.BlockChain().ReadTxLookupEntry(txID) blockHash, blockNum, index := b.hmy.BlockChain().ReadTxLookupEntry(txID)
blk := b.hmy.BlockChain().GetBlockByHash(blockHash) blk := b.hmy.BlockChain().GetBlockByHash(blockHash)
if blk == nil { if blk == nil {
return 0, false return 0, false
} }
success := b.hmy.CxPool().Add(blockHash) txs := blk.Transactions()
// a valid index is from 0 to len-1
if int(index) > len(txs)-1 {
return 0, false
}
tx := txs[int(index)]
if tx.ShardID() == tx.ToShardID() || blk.Header().ShardID() != tx.ShardID() {
return 0, false
}
entry := core.CxEntry{blockHash, tx.ToShardID()}
success := b.hmy.CxPool().Add(entry)
return blockNum, success return blockNum, success
} }

@ -77,7 +77,7 @@ func New(nodeAPI NodeAPI, txPool *core.TxPool, cxPool *core.CxPool, eventMux *ev
// TxPool ... // TxPool ...
func (s *Harmony) TxPool() *core.TxPool { return s.txPool } func (s *Harmony) TxPool() *core.TxPool { return s.txPool }
// CxPool ... // CxPool is used to store the blockHashes, where the corresponding block contains the cross shard receipts to be sent
func (s *Harmony) CxPool() *core.CxPool { return s.cxPool } func (s *Harmony) CxPool() *core.CxPool { return s.cxPool }
// BlockChain ... // BlockChain ...

@ -66,7 +66,7 @@ type Backend interface {
GetShardID() uint32 GetShardID() uint32
// retrieve the blockHash using txID and add blockHash to CxPool for resending // retrieve the blockHash using txID and add blockHash to CxPool for resending
AddBlockHashToCxPool(ctx context.Context, txID common.Hash) (uint64, bool) ResendCx(ctx context.Context, txID common.Hash) (uint64, bool)
} }
// GetAPIs returns all the APIs. // GetAPIs returns all the APIs.

@ -112,9 +112,13 @@ func (s *PublicBlockChainAPI) BlockNumber() hexutil.Uint64 {
return hexutil.Uint64(header.Number().Uint64()) return hexutil.Uint64(header.Number().Uint64())
} }
// ResendTx retrieves blockHash by TxID and add blockHash to CxPool for resending // ResendCx requests that the egress receipt for the given cross-shard
func (s *PublicBlockChainAPI) ResendTx(ctx context.Context, txID common.Hash) (bool, error) { // transaction be sent to the destination shard for credit. This is used for
_, success := s.b.AddBlockHashToCxPool(ctx, txID) // unblocking a half-complete cross-shard transaction whose fund has been
// withdrawn already from the source shard but not credited yet in the
// destination account due to transient failures.
func (s *PublicBlockChainAPI) ResendCx(ctx context.Context, txID common.Hash) (bool, error) {
_, success := s.b.ResendCx(ctx, txID)
return success, nil return success, nil
} }

@ -64,30 +64,68 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte)
} }
} }
// BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID
func (node *Node) BroadcastCXReceiptsWithShardID(newBlock *types.Block, lastCommits []byte, toShardID uint32) {
//#### Read payload data from committed msg
if len(lastCommits) <= 96 {
utils.Logger().Debug().Int("lastCommitsLen", len(lastCommits)).Msg("[BroadcastCXReceipts] lastCommits Not Enough Length")
}
commitSig := make([]byte, 96)
commitBitmap := make([]byte, len(lastCommits)-96)
offset := 0
copy(commitSig[:], lastCommits[offset:offset+96])
offset += 96
copy(commitBitmap[:], lastCommits[offset:])
//#### END Read payload data from committed msg
epoch := newBlock.Header().Epoch()
shardingConfig := core.ShardingSchedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
myShardID := node.Consensus.ShardID
utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]")
cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, newBlock.NumberU64(), newBlock.Hash(), false)
if err != nil || len(cxReceipts) == 0 {
utils.Logger().Warn().Err(err).Uint32("ToShardID", toShardID).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No ReadCXReceipts found")
return
}
merkleProof, err := node.Blockchain().CXMerkleProof(toShardID, newBlock)
if err != nil {
utils.Logger().Warn().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceipts] Unable to get merkleProof")
return
}
utils.Logger().Info().Uint32("ToShardID", toShardID).Msg("[BroadcastCXReceipts] ReadCXReceipts and MerkleProof Found")
groupID := p2p.ShardID(toShardID)
go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, newBlock.Header(), commitSig, commitBitmap)))
}
// BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request // BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request
func (node *Node) BroadcastMissingCXReceipts() { func (node *Node) BroadcastMissingCXReceipts() {
sendNextTime := []common.Hash{} sendNextTime := []core.CxEntry{}
it := node.CxPool.Pool().Iterator() it := node.CxPool.Pool().Iterator()
for blockHash := range it.C { for entry := range it.C {
blk := node.Blockchain().GetBlockByHash(blockHash.(common.Hash)) cxEntry := entry.(core.CxEntry)
toShardID := cxEntry.ToShardID
blk := node.Blockchain().GetBlockByHash(cxEntry.BlockHash)
if blk == nil { if blk == nil {
continue continue
} }
blockNum := blk.NumberU64() blockNum := blk.NumberU64()
nextBlk := node.Blockchain().GetBlockByNumber(blockNum + 1) nextHeader := node.Blockchain().GetHeaderByNumber(blockNum + 1)
if nextBlk == nil { if nextHeader == nil {
sendNextTime = append(sendNextTime, blockHash.(common.Hash)) sendNextTime = append(sendNextTime, cxEntry)
continue continue
} }
sig := nextBlk.Header().LastCommitSignature() sig := nextHeader.LastCommitSignature()
bitmap := nextBlk.Header().LastCommitBitmap() bitmap := nextHeader.LastCommitBitmap()
lastCommits := append(sig[:], bitmap...) lastCommits := append(sig[:], bitmap...)
go node.BroadcastCXReceipts(blk, lastCommits) go node.BroadcastCXReceiptsWithShardID(blk, lastCommits, toShardID)
} }
node.CxPool.Clear() node.CxPool.Clear()
// this should not happen or maybe happen for impatient user // this should not happen or maybe happen for impatient user
for _, blockHash := range sendNextTime { for _, entry := range sendNextTime {
node.CxPool.Add(blockHash) node.CxPool.Add(entry)
} }
} }

Loading…
Cancel
Save