[cleanup] remove is_genesis flag

Signed-off-by: Leo Chen <leo@harmony.one>

[nodetype] add nodetype to replace is_explorer

Signed-off-by: Leo Chen <leo@harmony.one>

fix beacon sync channel blocking issue

use lastMileMux to protect lastMileBlock queue to avoid potential blocking; use sleep instead of ticker

Fix the beacon committee check

[release] add release action to go_executable_build.sh

Signed-off-by: Leo Chen <leo@harmony.one>

[node.sh] add -d option

-d              download only

Signed-off-by: Leo Chen <leo@harmony.one>

[node.sh] add -T node_type option

-T node_type       support node type (validator/explorer)

Signed-off-by: Leo Chen <leo@harmony.one>

[node.sh] backward compatible with older harmony binary

Signed-off-by: Leo Chen <leo@harmony.one>

[node.sh] support -i shard_id option

-i shard_id             specify shard_id, this is applicable only to
explorer node

Signed-off-by: Leo Chen <leo@harmony.one>

Revisited api

Fix rpc integration

address some minor issues in comments and code

addressed comments on others' buckets

Add Global Access to OS Temp Directory Variable and Move DHT Files Into Temp Directory

Add flag to disable signers

Fix explorer handler

Fix explorer handler

fix explorer for loop

add shardID information in getTransactionByHash RPC CALL

Add CXReceipt RPC call; Add indexing for CXReceipt by TxHash

use only one index for CXEntryLookup; use shardID instead of fromShardID in RPC returned results
pull/1587/head
flicker-harmony 5 years ago
parent 49b94e7bbb
commit 809d77ebf7
  1. 1
      api/service/explorer/service.go
  2. 4
      cmd/harmony/main.go
  3. 3
      core/blockchain.go
  4. 73
      core/rawdb/accessors_indexes.go
  5. 10
      core/rawdb/schema.go
  6. 4
      core/types/block.go
  7. 7
      core/types/bodyv0.go
  8. 16
      core/types/bodyv1.go
  9. 8
      internal/hmyapi/transactionpool.go
  10. 56
      internal/hmyapi/types.go
  11. 60
      node/node_explorer.go

@ -187,6 +187,7 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
if withSignersParam == "true" {
withSigners = true
}
data := &Data{
Blocks: []*Block{},
}

@ -457,8 +457,8 @@ func main() {
nodeConfig := createGlobalConfig()
currentNode := setupConsensusAndNode(nodeConfig)
if nodeConfig.ShardID != 0 {
utils.GetLogInstance().Info("SupportBeaconSyncing", "shardID", currentNode.Blockchain().ShardID(), "shardID1", nodeConfig.ShardID)
if nodeConfig.ShardID != 0 && currentNode.NodeConfig.Role() != nodeconfig.ExplorerNode {
utils.GetLogInstance().Info("SupportBeaconSyncing", "shardID", currentNode.Blockchain().ShardID(), "shardID", nodeConfig.ShardID)
go currentNode.SupportBeaconSyncing()
}

@ -1109,6 +1109,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
rawdb.WriteTxLookupEntries(batch, block)
rawdb.WritePreimages(batch, block.NumberU64(), state.Preimages())
// write the positional metadata for CXReceipts lookups
rawdb.WriteCxLookupEntries(batch, block)
status = CanonStatTy
} else {
status = SideStatTy

@ -123,3 +123,76 @@ func WriteBloomBits(db DatabaseWriter, bit uint, section uint64, head common.Has
utils.Logger().Error().Err(err).Msg("Failed to store bloom bits")
}
}
// ReadCxLookupEntry retrieves the positional metadata associated with a transaction hash
// to allow retrieving cross shard receipt by hash in destination shard
// not the original transaction in source shard
// return nil if not found
func ReadCxLookupEntry(db DatabaseReader, hash common.Hash) (common.Hash, uint64, uint64) {
data, _ := db.Get(cxLookupKey(hash))
if len(data) == 0 {
return common.Hash{}, 0, 0
}
var entry TxLookupEntry
if err := rlp.DecodeBytes(data, &entry); err != nil {
utils.Logger().Error().Err(err).Str("hash", hash.Hex()).Msg("Invalid transaction lookup entry RLP")
return common.Hash{}, 0, 0
}
return entry.BlockHash, entry.BlockIndex, entry.Index
}
// WriteCxLookupEntries stores a positional metadata for every transaction from
// a block, enabling hash based transaction and receipt lookups.
func WriteCxLookupEntries(db DatabaseWriter, block *types.Block) {
previousSum := 0
for _, cxp := range block.IncomingReceipts() {
for j, cx := range cxp.Receipts {
entry := TxLookupEntry{
BlockHash: block.Hash(),
BlockIndex: block.NumberU64(),
Index: uint64(j + previousSum),
}
data, err := rlp.EncodeToBytes(entry)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to encode transaction lookup entry")
}
if err := db.Put(cxLookupKey(cx.TxHash), data); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to store transaction lookup entry")
}
}
previousSum += len(cxp.Receipts)
}
}
// DeleteCxLookupEntry removes all transaction data associated with a hash.
func DeleteCxLookupEntry(db DatabaseDeleter, hash common.Hash) {
db.Delete(cxLookupKey(hash))
}
// ReadCXReceipt retrieves a specific transaction from the database, along with
// its added positional metadata.
func ReadCXReceipt(db DatabaseReader, hash common.Hash) (*types.CXReceipt, common.Hash, uint64, uint64) {
blockHash, blockNumber, cxIndex := ReadCxLookupEntry(db, hash)
if blockHash == (common.Hash{}) {
return nil, common.Hash{}, 0, 0
}
body := ReadBody(db, blockHash, blockNumber)
if body == nil {
utils.Logger().Error().
Uint64("number", blockNumber).
Str("hash", blockHash.Hex()).
Uint64("index", cxIndex).
Msg("block Body referenced missing")
return nil, common.Hash{}, 0, 0
}
cx := body.CXReceiptAt(int(cxIndex))
if cx == nil {
utils.Logger().Error().
Uint64("number", blockNumber).
Str("hash", blockHash.Hex()).
Uint64("index", cxIndex).
Msg("CXReceipt referenced missing")
return nil, common.Hash{}, 0, 0
}
return cx, blockHash, blockNumber, cxIndex
}

@ -51,8 +51,9 @@ var (
blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body
blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata
bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits
txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata
cxLookupPrefix = []byte("cx") // cxLookupPrefix + hash -> cxReceipt lookup metadata
bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits
shardStatePrefix = []byte("ss") // shardStatePrefix + num (uint64 big endian) + hash -> shardState
lastCommitsKey = []byte("LastCommits")
@ -137,6 +138,11 @@ func txLookupKey(hash common.Hash) []byte {
return append(txLookupPrefix, hash.Bytes()...)
}
// cxLookupKey = cxLookupPrefix + hash
func cxLookupKey(hash common.Hash) []byte {
return append(cxLookupPrefix, hash.Bytes()...)
}
// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash
func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte {
key := append(append(bloomBitsPrefix, make([]byte, 10)...), hash.Bytes()...)

@ -86,6 +86,10 @@ type BodyInterface interface {
// It returns nil if index is out of bounds.
TransactionAt(index int) *Transaction
// CXReceiptAt returns the CXReceipt given index (calculated from IncomingReceipts)
// It returns nil if index is out of bounds
CXReceiptAt(index int) *CXReceipt
// SetTransactions sets the list of transactions with a deep copy of the
// given list.
SetTransactions(newTransactions []*Transaction)

@ -39,6 +39,13 @@ func (b *BodyV0) TransactionAt(index int) *Transaction {
return b.f.Transactions[index].Copy()
}
// CXReceiptAt returns the CXReceipt at given index in this block
// It returns nil if index is out of bounds
// V0 will just return nil because we don't support CXReceipt
func (b *BodyV0) CXReceiptAt(index int) *CXReceipt {
return nil
}
// SetTransactions sets the list of transactions with a deep copy of the given
// list.
func (b *BodyV0) SetTransactions(newTransactions []*Transaction) {

@ -39,6 +39,22 @@ func (b *BodyV1) TransactionAt(index int) *Transaction {
return b.f.Transactions[index].Copy()
}
// CXReceiptAt returns the CXReceipt at given index in this block
// It returns nil if index is out of bounds
func (b *BodyV1) CXReceiptAt(index int) *CXReceipt {
if index < 0 {
return nil
}
for _, cxp := range b.f.IncomingReceipts {
cxs := cxp.Receipts
if index < len(cxs) {
return cxs[index].Copy()
}
index -= len(cxs)
}
return nil
}
// SetTransactions sets the list of transactions with a deep copy of the given
// list.
func (b *BodyV1) SetTransactions(newTransactions []*Transaction) {

@ -212,3 +212,11 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, err
}
return transactions, nil
}
// GetCXReceiptByHash returns the transaction for the given hash
func (s *PublicTransactionPoolAPI) GetCXReceiptByHash(ctx context.Context, hash common.Hash) *RPCCXReceipt {
if cx, blockHash, blockNumber, _ := rawdb.ReadCXReceipt(s.b.ChainDb(), hash); cx != nil {
return newRPCCXReceipt(cx, blockHash, blockNumber)
}
return nil
}

@ -22,11 +22,43 @@ type RPCTransaction struct {
To *common.Address `json:"to"`
TransactionIndex hexutil.Uint `json:"transactionIndex"`
Value *hexutil.Big `json:"value"`
ShardID uint32 `json:"shardID"`
ToShardID uint32 `json:"toShardID"`
V *hexutil.Big `json:"v"`
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`
}
// RPCCXReceipt represents a CXReceipt that will serialize to the RPC representation of a CXReceipt
type RPCCXReceipt struct {
BlockHash common.Hash `json:"blockHash"`
BlockNumber *hexutil.Big `json:"blockNumber"`
TxHash common.Hash `json:"hash"`
From common.Address `json:"from"`
To *common.Address `json:"to"`
ShardID uint32 `json:"shardID"`
ToShardID uint32 `json:"toShardID"`
Amount *hexutil.Big `json:"value"`
}
// newRPCCXReceipt returns a CXReceipt that will serialize to the RPC representation
func newRPCCXReceipt(cx *types.CXReceipt, blockHash common.Hash, blockNumber uint64) *RPCCXReceipt {
result := &RPCCXReceipt{
BlockHash: blockHash,
TxHash: cx.TxHash,
From: cx.From,
To: cx.To,
Amount: (*hexutil.Big)(cx.Amount),
ShardID: cx.ShardID,
ToShardID: cx.ToShardID,
}
if blockHash != (common.Hash{}) {
result.BlockHash = blockHash
result.BlockNumber = (*hexutil.Big)(new(big.Int).SetUint64(blockNumber))
}
return result
}
// newRPCTransaction returns a transaction that will serialize to the RPC
// representation, with the given location metadata set (if available).
func newRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber uint64, index uint64) *RPCTransaction {
@ -38,17 +70,19 @@ func newRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber
v, r, s := tx.RawSignatureValues()
result := &RPCTransaction{
From: from,
Gas: hexutil.Uint64(tx.Gas()),
GasPrice: (*hexutil.Big)(tx.GasPrice()),
Hash: tx.Hash(),
Input: hexutil.Bytes(tx.Data()),
Nonce: hexutil.Uint64(tx.Nonce()),
To: tx.To(),
Value: (*hexutil.Big)(tx.Value()),
V: (*hexutil.Big)(v),
R: (*hexutil.Big)(r),
S: (*hexutil.Big)(s),
From: from,
Gas: hexutil.Uint64(tx.Gas()),
GasPrice: (*hexutil.Big)(tx.GasPrice()),
Hash: tx.Hash(),
Input: hexutil.Bytes(tx.Data()),
Nonce: hexutil.Uint64(tx.Nonce()),
To: tx.To(),
Value: (*hexutil.Big)(tx.Value()),
ShardID: tx.ShardID(),
ToShardID: tx.ToShardID(),
V: (*hexutil.Big)(v),
R: (*hexutil.Big)(r),
S: (*hexutil.Big)(s),
}
if blockHash != (common.Hash{}) {
result.BlockHash = blockHash

@ -72,7 +72,7 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
return
}
node.AddNewBlockForExplorer()
node.AddNewBlockForExplorer(block)
node.commitBlockForExplorer(block)
} else if msg.Type == msg_pb.MessageType_PREPARED {
@ -91,7 +91,7 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
msgs := node.Consensus.PbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_COMMITTED, blockObj.NumberU64(), blockObj.Hash())
// If found, then add the new block into blockchain db.
if len(msgs) > 0 {
node.AddNewBlockForExplorer()
node.AddNewBlockForExplorer(blockObj)
node.commitBlockForExplorer(blockObj)
}
}
@ -99,41 +99,29 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
}
// AddNewBlockForExplorer add new block for explorer.
func (node *Node) AddNewBlockForExplorer() {
utils.Logger().Debug().Msg("[Explorer] Add new block for explorer")
// Search for the next block in PbftLog and commit the block into blockchain for explorer node.
for {
blocks := node.Consensus.PbftLog.GetBlocksByNumber(node.Blockchain().CurrentBlock().NumberU64() + 1)
if len(blocks) == 0 {
break
} else {
if len(blocks) > 1 {
utils.Logger().Error().Msg("[Explorer] We should have not received more than one block with the same block height.")
}
utils.Logger().Debug().Uint64("blockHeight", blocks[0].NumberU64()).Msg("Adding new block for explorer node")
if err := node.AddNewBlock(blocks[0]); err == nil {
if core.IsEpochLastBlock(blocks[0]) {
node.Consensus.UpdateConsensusInformation()
}
// Clean up the blocks to avoid OOM.
node.Consensus.PbftLog.DeleteBlockByNumber(blocks[0].NumberU64())
// Do dump all blocks from state syncing for explorer one time
// TODO: some blocks can be dumped before state syncing finished.
// And they would be dumped again here. Please fix it.
once.Do(func() {
utils.Logger().Info().Int64("starting height", int64(blocks[0].NumberU64())-1).
Msg("[Explorer] Populating explorer data from state synced blocks")
go func() {
for blockHeight := int64(blocks[0].NumberU64()) - 1; blockHeight >= 0; blockHeight-- {
explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, true).Dump(
node.Blockchain().GetBlockByNumber(uint64(blockHeight)), uint64(blockHeight))
}
}()
})
} else {
utils.Logger().Error().Err(err).Msg("[Explorer] Error when adding new block for explorer node")
}
func (node *Node) AddNewBlockForExplorer(block *types.Block) {
utils.Logger().Debug().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node")
if err := node.AddNewBlock(block); err == nil {
if core.IsEpochLastBlock(block) {
node.Consensus.UpdateConsensusInformation()
}
// Clean up the blocks to avoid OOM.
node.Consensus.PbftLog.DeleteBlockByNumber(block.NumberU64())
// Do dump all blocks from state syncing for explorer one time
// TODO: some blocks can be dumped before state syncing finished.
// And they would be dumped again here. Please fix it.
once.Do(func() {
utils.Logger().Info().Int64("starting height", int64(block.NumberU64())-1).
Msg("[Explorer] Populating explorer data from state synced blocks")
go func() {
for blockHeight := int64(block.NumberU64()) - 1; blockHeight >= 0; blockHeight-- {
explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, true).Dump(
node.Blockchain().GetBlockByNumber(uint64(blockHeight)), uint64(blockHeight))
}
}()
})
} else {
utils.Logger().Error().Err(err).Msg("[Explorer] Error when adding new block for explorer node")
}
}

Loading…
Cancel
Save