Basic Python SDK (#1)

* [project] Update README with correct Python version requirement

* [request] Add rpc request lib
[exceptions] Add exceptions wrapper lib
[common] Define defaults for endpoint & timeout

* [blockchain] Added blockchain related RPCs
[staking] Added staking related RPCs

* [blockchain] Added more RPCs, organized file based on RPC type (network or block)

* [account] Add account related RPCs (balance, transaction history)

* [transaction] Added regular & staking transaction RPCs

* [rpc] Move RPCs to separate lib
[common] Remove common & add defaults to request

* [rpc] Add __init__ file for future use

* [util] Update utils to use rpc lib

* [rpc] Add return type annotations

* [make] Fix import errors

* [test] Add tests for rpc_request using local blockchain
[make] Update make tests to print reason to skip test

* [test] Add proper testing infra for RPC calls

* [numbers] Add utils for converting between atto and one

* [test] Use cross shard as the inital funding transaction & enable cross shard RPC tests

* [test] Refactor test setup to not wait if setup on chain is already done

* [pyhmy] Remove dev version & update major version

* [init] Update imports

* [account] Add get_balance_on_all_shards

* [rpc] Fix documentation errors

* [util] Fix is_active_shard error

* [rpc] Address PR comments
pull/2/head
Janet Liang 5 years ago committed by GitHub
parent b50b8214ae
commit 3cf778876e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      Makefile
  2. 10
      README.md
  3. 6
      pyhmy/__init__.py
  4. 2
      pyhmy/_version.py
  5. 42
      pyhmy/numbers.py
  6. 0
      pyhmy/rpc/__init__.py
  7. 207
      pyhmy/rpc/account.py
  8. 378
      pyhmy/rpc/blockchain.py
  9. 23
      pyhmy/rpc/exceptions.py
  10. 119
      pyhmy/rpc/request.py
  11. 207
      pyhmy/rpc/staking.py
  12. 379
      pyhmy/rpc/transaction.py
  13. 29
      pyhmy/util.py
  14. 36
      tests/numbers-pyhmy/test_numbers.py
  15. 105
      tests/request-pyhmy/test_request.py
  16. 154
      tests/rpc-pyhmy/conftest.py
  17. 56
      tests/rpc-pyhmy/test_account.py
  18. 98
      tests/rpc-pyhmy/test_blockchain.py
  19. 66
      tests/rpc-pyhmy/test_staking.py
  20. 125
      tests/rpc-pyhmy/test_transaction.py

@ -25,7 +25,7 @@ clean-py:
find . -name '*~' -exec rm -f {} +
test:
python3 -m py.test tests
python3 -m py.test -r s -s tests
install:
python3 -m pip install -e .

@ -1,11 +1,11 @@
# Pyhmy - Harmony's python utilities
**This library is for python 3 only.**
**This library only supports Python 3.6+**
A Python library for interacting and working the [Harmony blockchain](https://harmony.one/)
and [related codebases](https://github.com/harmony-one).
[Full documentation is located on Harmony's GitBook](https://harmony.one/) (in progress).
[Full documentation is located on Harmony's GitBook](https://docs.harmony.one/) (in progress).
## Installation
@ -18,8 +18,6 @@ Make sure you have Python3 installed, and use python3 to install pyhmy
sudo pip3 install pathlib
sudo pip3 install pyhmy
Requires Python3.5+
```
## Development
@ -31,6 +29,7 @@ make install
## Running tests
You need to run a local Harmony blockchain (instructions [here](https://github.com/harmony-one/harmony/README.md)) that has staking enabled.
You can run all of the tests with the following:
```
@ -53,3 +52,6 @@ make release
TODO: sample of how to use the library, reference Tezos.
TODO: start (and finish) some of the documentation.
TODO: add more blockchain rpcs
TODO: check None return types for rpcs
TODO: more detailed tests for rpcs

@ -3,6 +3,12 @@ import warnings
from ._version import __version__
from .rpc import (
account,
blockchain,
staking,
transaction
)
from .util import (
Typgpy,

@ -7,5 +7,5 @@ Provides pyhmy version information.
from incremental import Version
__version__ = Version('pyhmy', 20, 1, 30)
__version__ = Version('pyhmy', 20, 5, 1)
__all__ = ["__version__"]

@ -0,0 +1,42 @@
from decimal import Decimal
_conversion_unit = Decimal(1e18)
def convert_atto_to_one(atto):
"""
Convert ATTO to ONE
Parameters
----------
atto: str, int, float, decimal
Value in ATTO to convert to ONE
Float input will be truncated, since ATTO is the lowest possible denomination of ONE
Returns
-------
decimal
Converted value in ONE
"""
if isinstance(atto, float):
atto = int(atto)
return Decimal(atto) / _conversion_unit
def convert_one_to_atto(one):
"""
Convert ONE to ATTO
Parameters
----------
one: str, int, float, decimal
Value in ONE to convert to ATTO
Returns
-------
decimal
Converted value in ATTO
"""
if isinstance(one, float):
one = str(one)
return Decimal(one) * _conversion_unit

@ -0,0 +1,207 @@
from .request import (
rpc_request
)
from .blockchain import (
get_sharding_structure
)
_default_endpoint = 'http://localhost:9500'
_default_timeout = 30
def get_balance(address, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get current account balance
Parameters
----------
address: str
Address to get balance for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Account balance in ATTO
"""
params = [
address,
'latest'
]
return int(rpc_request('hmy_getBalance', params=params, endpoint=endpoint, timeout=timeout)['result'], 16)
def get_balance_by_block(address, block_num, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get account balance at time of given block
Parameters
----------
address: str
Address to get balance for
block_num: int
Block number to req
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Account balance in ATTO at given block
"""
params = [
address,
str(hex(block_num))
]
return int(rpc_request('hmy_getBalanceByBlockNumber', params=params, endpoint=endpoint, timeout=timeout)['result'], 16)
def get_transaction_count(address, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get number of transactions & staking transactions sent by an account
Parameters
----------
address: str
Address to get transaction count for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Number of transactions sent by the account (account nonce)
"""
params = [
address,
'latest'
]
return int(rpc_request('hmy_getTransactionCount', params=params, endpoint=endpoint, timeout=timeout)['result'], 16)
def get_transaction_history(address, page=0, page_size=1000, include_full_tx=False, tx_type='ALL',
order='ASC', endpoint=_default_endpoint, timeout=_default_timeout
) -> list:
"""
Get list of transactions sent and/or received by the account
Parameters
----------
address: str
Address to get transaction history for
page: :obj:`int`, optional
Page to request for pagination
page_size: :obj:`int`, optional
Size of page for pagination
include_full_tx: :obj:`bool`, optional
True to include full transaction data
False to just get the transaction hash
tx_type: :obj:`str`, optional
'ALL' to get all transactions send & received by the address
'SENT' to get all transactions sent by the address
'RECEIVED' to get all transactions received by the address
order: :obj:`str`, optional
'ASC' to sort transactions in ascending order based on timestamp
'DESC' to sort transactions in descending order based on timestamp
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
params = [
{
'address': address,
'pageIndex': page,
'pageSize': page_size,
'fullTx': include_full_tx,
'txType': tx_type,
'order': order
}
]
tx_history = rpc_request('hmy_getTransactionsHistory', params=params, endpoint=endpoint, timeout=timeout)
return tx_history['result']['transactions']
def get_staking_transaction_history(address, page=0, page_size=1000, include_full_tx=False, tx_type='ALL',
order='ASC', endpoint=_default_endpoint, timeout=_default_timeout
) -> list:
"""
Get list of staking transactions sent by the account
Parameters
----------
address: str
Address to get staking transaction history for
page: :obj:`int`, optional
Page to request for pagination
page-size: :obj:`int`, optional
Size of page for pagination
include_full_tx: :obj:`bool`, optional
True to include full staking transaction data
False to just get the staking transaction hash
tx_type: :obj:`str`, optional
'ALL' to get all staking transactions
order: :obj:`str`, optional
'ASC' to sort transactions in ascending order based on timestamp
'DESC' to sort transactions in descending order based on timestamp
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
params = [
{
'address': address,
'pageIndex': page,
'pageSize': page_size,
'fullTx': include_full_tx,
'txType': tx_type,
'order': order
}
]
# Using v2 API, because getStakingTransactionHistory not implemented in v1
stx_history = rpc_request('hmyv2_getStakingTransactionsHistory', params=params, endpoint=endpoint, timeout=timeout)
return stx_history['result']['staking_transactions']
def get_balance_on_all_shards(address, endpoint=_default_endpoint, timeout=_default_timeout):
"""
Get current account balance in all shards
Parameters
----------
address: str
Address to get balance for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds per request
Returns
-------
dict
Account balance per shard in ATTO
"""
balances = {}
sharding_structure = get_sharding_structure(endpoint=endpoint, timeout=timeout)
for shard in sharding_structure:
balances[shard['shardID']] = get_balance(address, endpoint=shard['http'], timeout=timeout)
return balances

@ -0,0 +1,378 @@
from .request import (
rpc_request
)
_default_endpoint = 'http://localhost:9500'
_default_timeout = 30
################
# Network RPCs #
################
def get_node_metadata(endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get config for the node
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getNodeMetadata', endpoint=endpoint, timeout=timeout)['result']
def get_sharding_structure(endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get network sharding structure
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getShardingStructure', endpoint=endpoint, timeout=timeout)['result']
def get_leader_address(endpoint=_default_endpoint, timeout=_default_timeout) -> str:
"""
Get current leader one address
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
str
One address of current leader
"""
return rpc_request('hmy_getLeader', endpoint=endpoint, timeout=timeout)['result']
def get_block_number(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get current block number
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Current block number
"""
return int(rpc_request('hmy_blockNumber', endpoint=endpoint, timeout=timeout)['result'], 16)
def get_current_epoch(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get current epoch number
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Current epoch number
"""
return int(rpc_request('hmy_getEpoch', endpoint=endpoint, timeout=timeout)['result'], 16)
def get_gas_price(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get network gas price
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Network gas price
"""
return int(rpc_request('hmy_gasPrice', endpoint=endpoint, timeout=timeout)['result'], 16)
def get_num_peers(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get number of peers connected to the node
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Number of connected peers
"""
return int(rpc_request('net_peerCount', endpoint=endpoint, timeout=timeout)['result'], 16)
##############
# Block RPCs #
##############
def get_latest_header(endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get block header of latest block
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_latestHeader', endpoint=endpoint, timeout=timeout)['result']
def get_latest_headers(endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get block header of latest block for beacon chain & shard chain
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getLatestChainHeaders', endpoint=endpoint, timeout=timeout)['result']
def get_block_by_number(block_num, endpoint=_default_endpoint, include_full_tx=False, timeout=_default_timeout) -> dict:
"""
Get block by number
Parameters
----------
block_num: int
Block number to fetch
endpoint: :obj:`str`, optional
Endpoint to send request to
include_full_tx: :obj:`bool`, optional
Include list of full transactions data for each block
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
str(hex(block_num)),
include_full_tx
]
return rpc_request('hmy_getBlockByNumber', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_block_by_hash(block_hash, endpoint=_default_endpoint, include_full_tx=False, timeout=_default_timeout) -> dict:
"""
Get block by hash
Parameters
----------
block_hash: str
Block hash to fetch
endpoint: :obj:`str`, optional
Endpoint to send request to
include_full_tx: :obj:`bool`, optional
Include list of full transactions data for each block
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
block_hash,
include_full_tx
]
return rpc_request('hmy_getBlockByHash', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_block_transaction_count_by_number(block_num, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get transaction count for specific block number
Parameters
----------
block_num: int
Block number to get transaction count for
endpoint: :obj:`str`, optional
Endpoint to send request to
include_full_tx: :obj:`bool`, optional
Include list of full transactions data for each block
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Number of transactions in the block
"""
params = [
str(hex(block_num))
]
return int(rpc_request('hmy_getBlockTransactionCountByNumber', params=params,
endpoint=endpoint, timeout=timeout)['result'], 16
)
def get_block_transaction_count_by_hash(block_hash, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get transaction count for specific block hash for
Parameters
----------
block_hash: str
Block hash to get transaction count
endpoint: :obj:`str`, optional
Endpoint to send request to
include_full_tx: :obj:`bool`, optional
Include list of full transactions data for each block
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Number of transactions in the block
"""
params = [
block_hash
]
return int(rpc_request('hmy_getBlockTransactionCountByHash', params=params,
endpoint=endpoint, timeout=timeout)['result'], 16
)
def get_blocks(start_block, end_block, endpoint=_default_endpoint, include_full_tx=False,
include_signers=False, timeout=_default_timeout
) -> list:
"""
Get list of blocks from a range
Parameters
----------
start_block: int
First block to fetch (inclusive)
end_block: int
Last block to fetch (inclusive)
endpoint: :obj:`str`, optional
Endpoint to send request to
include_full_tx: :obj:`bool`, optional
Include list of full transactions data for each block
include_signers: :obj:`bool`, optional
Include list of signers for each block
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
params = [
str(hex(start_block)),
str(hex(end_block)),
{
'withSigners': include_signers,
'fullTx': include_full_tx,
},
]
return rpc_request('hmy_getBlocks', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_block_signers(block_num, endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get list of block signers for specific block number
Parameters
----------
block_num: int
Block number to get signers for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
List of one addresses that signed the block
"""
params = [
str(hex(block_num))
]
return rpc_request('hmy_getBlockSigners', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_validators(epoch, endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get list of validators for specific epoch number
Parameters
----------
epoch: int
Epoch to get list of validators for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
epoch
]
return rpc_request('hmy_getValidators', params=params, endpoint=endpoint, timeout=timeout)['result']

@ -0,0 +1,23 @@
import json
import requests
class RPCError(RuntimeError):
"""
Exception raised when RPC call returns an error
"""
class JSONDecodeError(json.decoder.JSONDecodeError):
"""
Wrapper for json lib DecodeError exception
"""
class RequestsError(requests.exceptions.RequestException):
"""
Wrapper for requests lib exceptions
"""
class RequestsTimeoutError(requests.exceptions.Timeout):
"""
Wrapper for requests lib Timeout exceptions
"""

@ -0,0 +1,119 @@
import json
import requests
from .exceptions import (
JSONDecodeError,
RequestsError,
RequestsTimeoutError,
RPCError
)
_default_endpoint = 'http://localhost:9500'
_default_timeout = 30
def base_request(method, params=None, endpoint=_default_endpoint, timeout=_default_timeout) -> str:
"""
Basic RPC request
Parameters
---------
method: str
RPC Method to call
params: :obj:`list`, optional
Parameters for the RPC method
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
str
Raw output from the request
Raises
------
TypeError
If params is not a list or None
RequestsTimeoutError
If request timed out
RequestsError
If other request error occured
"""
if params is None:
params = []
elif not isinstance(params, list):
raise TypeError(f'invalid type {params.__class__}')
try:
payload = {
"id": "1",
"jsonrpc": "2.0",
"method": method,
"params": params
}
headers = {
'Content-Type': 'application/json'
}
resp = requests.request('POST', endpoint, headers=headers, data=json.dumps(payload),
timeout=timeout, allow_redirects=True)
return resp.content
except requests.exceptions.Timeout as err:
raise RequestsTimeoutError() from err
except requests.exceptions.RequestException as err:
raise RequestsError() from err
def rpc_request(method, params=None, endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
RPC request
Parameters
---------
method: str
RPC Method to call
params: :obj:`list`, optional
Parameters for the RPC method
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
Returns dictionary representation of RPC response
Example format:
{
"jsonrpc": "2.0",
"id": 1,
"result": ...
}
Raises
------
RPCError
If RPC response returned a blockchain error
JSONDecodeError
If RPC response format is not valid JSON object
See Also
--------
base_request
"""
raw_resp = base_request(method, params, endpoint, timeout)
try:
resp = json.loads(raw_resp)
if 'error' in resp:
raise RPCError(str(resp['error']))
return resp
except json.decoder.JSONDecodeError as err:
raise JSONDecodeError() from err
# TODO: Add GET requests

@ -0,0 +1,207 @@
from .request import (
rpc_request
)
_default_endpoint = 'http://localhost:9500'
_default_timeout = 30
##################
# Validator RPCs #
##################
def get_all_validator_addresses(endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get list of all created validator addresses on chain
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
List of one addresses for all validators on chain
"""
return rpc_request('hmy_getAllValidatorAddresses', endpoint=endpoint, timeout=timeout)['result']
def get_validator_information(validator_addr, endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get validator information for validator address
Parameters
----------
validator_addr: str
One address of the validator to get information for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
validator_addr
]
return rpc_request('hmy_getValidatorInformation', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_all_validator_information(page=-1, endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get validator information for all validators on chain
Parameters
----------
page: :obj:`int`, optional
Page to request (-1 for all validators)
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
params = [
page
]
return rpc_request('hmy_getAllValidatorInformation', params=params, endpoint=endpoint, timeout=timeout)['result']
###################
# Delegation RPCs #
###################
def get_delegations_by_delegator(delegator_addr, endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get list of delegations by a delegator
Parameters
----------
delegator_addr: str
Delegator address to get list of delegations for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
params = [
delegator_addr
]
return rpc_request('hmy_getDelegationsByDelegator', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_delegations_by_validator(validator_addr, endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get list of delegations to a validator
Parameters
----------
validator_addr: str
Validator address to get list of delegations for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
params = [
validator_addr
]
return rpc_request('hmy_getDelegationsByValidator', params=params, endpoint=endpoint, timeout=timeout)['result']
########################
# Staking Network RPCs #
########################
def get_current_utility_metrics(endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get current utility metrics of network
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getCurrentUtilityMetrics', endpoint=endpoint, timeout=timeout)['result']
def get_staking_network_info(endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get staking network information
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getStakingNetworkInfo', endpoint=endpoint, timeout=timeout)['result']
def get_super_committees(endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get voting committees for current & previous epoch
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getSuperCommittees', endpoint=endpoint, timeout=timeout)['result']
def get_raw_median_stake_snapshot(endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get median stake & additional committee data of the current epoch
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getMedianRawStakeSnapshot', endpoint=endpoint, timeout=timeout)['result']

@ -0,0 +1,379 @@
from .request import (
rpc_request
)
_default_endpoint = 'http://localhost:9500'
_default_timeout = 30
#########################
# Transaction Pool RPCs #
#########################
def get_pending_transactions(endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get list of pending transactions
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_pendingTransactions', endpoint=endpoint, timeout=timeout)['result']
####################
# Transaction RPCs #
####################
def get_transaction_by_hash(tx_hash, endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get transaction by hash
Parameters
----------
tx_hash: str
Transaction hash to fetch
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
tx_hash
]
return rpc_request('hmy_getTransactionByHash', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_transaction_by_block_hash_and_index(block_hash, tx_index,
endpoint=_default_endpoint, timeout=_default_timeout
) -> dict:
"""
Get transaction based on index in list of transactions in a block by block hash
Parameters
----------
block_hash: str
Block hash for transaction
tx_index: int
Transaction index to fetch
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
block_hash,
str(hex(tx_index))
]
return rpc_request('hmy_getTransactionByBlockHashAndIndex', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_transaction_by_block_number_and_index(block_num, tx_index,
endpoint=_default_endpoint, timeout=_default_timeout
) -> dict:
"""
Get transaction based on index in list of transactions in a block by block number
Parameters
----------
block_num: int
Block number for transaction
tx_index: int
Transaction index to fetch
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
str(hex(block_num)),
str(hex(tx_index))
]
return rpc_request('hmy_getTransactionByBlockNumberAndIndex', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_transaction_receipt(tx_receipt, endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get transaction receipt
Parameters
----------
tx_receipt: str
Transaction receipt to fetch
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
tx_receipt
]
return rpc_request('hmy_getTransactionReceipt', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_transaction_error_sink(endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get transaction error sink
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getCurrentTransactionErrorSink', endpoint=endpoint, timeout=timeout)['result']
def send_raw_transaction(raw_tx, endpoint=_default_endpoint, timeout=_default_timeout) -> str:
"""
Send signed transaction
Parameters
----------
raw_tx: str
Hex representation of signed transaction
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
str
Transaction hash
"""
params = [
raw_tx
]
return rpc_request('hmy_sendRawTransaction', params=params, endpoint=endpoint, timeout=timeout)['result']
###############################
# CrossShard Transaction RPCs #
###############################
def get_pending_cx_receipts(endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get list of pending cross shard transactions
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getPendingCXReceipts', endpoint=endpoint, timeout=timeout)['result']
def get_cx_receipt_by_hash(cx_hash, endpoint = _default_endpoint, timeout = _default_timeout) -> dict:
"""
Get cross shard receipt by hash
Parameters
----------
cx_hash: str
Hash of cross shard transaction receipt
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
cx_hash
]
return rpc_request('hmy_getCXReceiptByHash', params=params, endpoint=endpoint, timeout=timeout)['result']
def resend_cx_receipt(cx_receipt, endpoint=_default_endpoint, timeout=_default_timeout) -> bool:
"""
Send cross shard receipt
Parameters
----------
cx_hash: str
Hash of cross shard transaction receipt
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
bool
If the receipt transactions was succesfully resent
"""
params = [
cx_receipt
]
return rpc_request('hmy_resendCx', params=params, endpoint=endpoint, timeout=timeout)['result']
############################
# Staking Transaction RPCs #
############################
def get_staking_transaction_by_hash(tx_hash, endpoint=_default_endpoint, timeout=_default_timeout) -> dict:
"""
Get staking transaction by hash
Parameters
----------
tx_hash: str
Hash of staking transaction to fetch
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
tx_hash
]
return rpc_request('hmy_getStakingTransactionByHash', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_staking_transaction_by_block_hash_and_index(block_hash, tx_index,
endpoint=_default_endpoint, timeout=_default_timeout
) -> dict:
"""
Get staking transaction based on index in list of staking transactions for a block by block hash
Parameters
----------
block_hash: str
Block hash for staking transaction
tx_index: int
Staking transaction index to fetch
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
block_hash,
str(hex(tx_index))
]
return rpc_request('hmy_getStakingTransactionByBlockHashAndIndex', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_staking_transaction_by_block_number_and_index(block_num, tx_index,
endpoint=_default_endpoint, timeout=_default_timeout
) -> dict:
"""
Get staking transaction based on index in list of staking transactions for a block by block number
Parameters
----------
block_num: int
Block number for staking transaction
tx_index: int
Staking transaction index to fetch
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
dict
# TODO: Add link to reference RPC documentation
"""
params = [
str(hex(block_num)),
str(hex(tx_index))
]
return rpc_request('hmy_getStakingTransactionByBlockNumberAndIndex', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_staking_transaction_error_sink(endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get staking transaction error sink
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getCurrentStakingErrorSink', endpoint=endpoint, timeout=timeout)['result']
def send_raw_staking_transaction(raw_tx, endpoint=_default_endpoint, timeout=_default_timeout) -> str:
"""
Send signed staking transaction
Parameters
----------
raw_tx: str
Hex representation of signed staking transaction
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
str
Staking transaction hash
"""
params = [
raw_tx
]
return rpc_request('hmy_sendRawStakingTransaction', params=params, endpoint=endpoint, timeout=timeout)['result']

@ -6,6 +6,17 @@ import datetime
import requests
from .rpc.blockchain import (
get_latest_header
)
from .rpc.exceptions import (
RPCError,
JSONDecodeError,
RequestsError,
RequestsTimeoutError,
)
datetime_format = "%Y-%m-%d %H:%M:%S.%f"
@ -13,7 +24,7 @@ class Typgpy(str):
"""
Typography constants for pretty printing.
Note that an ENDC is needed to made the end of a 'highlighted' text segment.
Note that an ENDC is needed to mark the end of a 'highlighted' text segment.
"""
HEADER = '\033[95m'
OKBLUE = '\033[94m'
@ -45,24 +56,14 @@ def is_active_shard(endpoint, delay_tolerance=60):
:param delay_tolerance: The time (in seconds) that the shard timestamp can be behind
:return: If shard is active or not
"""
payload = """{
"jsonrpc": "2.0",
"method": "hmy_latestHeader",
"params": [ ],
"id": 1
}"""
headers = {
'Content-Type': 'application/json'
}
try:
curr_time = datetime.datetime.utcnow()
response = requests.request('POST', endpoint, headers=headers, data=payload, allow_redirects=False, timeout=3)
body = json.loads(response.content)
time_str = body["result"]["timestamp"][:19] + '.0' # Fit time format
latest_header = get_latest_header(endpoint=endpoint)
time_str = latest_header["timestamp"][:19] + '.0' # Fit time format
timestamp = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=None)
time_delta = curr_time - timestamp
return abs(time_delta.seconds) < delay_tolerance
except (requests.ConnectionError, json.decoder.JSONDecodeError, KeyError):
except (RPCError, JSONDecodeError, RequestsError, RequestsTimeoutError):
return False

@ -0,0 +1,36 @@
from decimal import Decimal
import pytest
from pyhmy import (
numbers
)
@pytest.mark.run(order=1)
def test_convert_atto_to_one():
a = numbers.convert_atto_to_one(1e18)
assert Decimal(1) == a
b = numbers.convert_atto_to_one(1e18 + 0.6)
assert Decimal(1) == b
c = numbers.convert_atto_to_one('1' + ('0' * 18))
assert Decimal(1) == c
d = numbers.convert_atto_to_one(Decimal(1e18))
assert Decimal(1) == d
@pytest.mark.run(order=2)
def test_convert_one_to_atto():
a = numbers.convert_one_to_atto(1e-18)
assert Decimal(1) == a
b = numbers.convert_one_to_atto(1.5)
assert Decimal(1.5e18) == b
c = numbers.convert_one_to_atto('1')
assert Decimal(1e18) == c
d = numbers.convert_one_to_atto(Decimal(1))
assert Decimal(1e18) == d

@ -0,0 +1,105 @@
import json
import socket
import pytest
import requests
from pyhmy.rpc import (
exceptions,
request
)
@pytest.fixture(scope="session", autouse=True)
def setup():
endpoint = 'http://localhost:9500'
timeout = 30
method = 'hmy_getNodeMetadata'
params = []
payload = {
"id": "1",
"jsonrpc": "2.0",
"method": method,
"params": params
}
headers = {
'Content-Type': 'application/json'
}
try:
response = requests.request('POST', endpoint, headers=headers,
data=json.dumps(payload), timeout=timeout, allow_redirects=True)
except Exception as e:
pytest.skip("can not connect to local blockchain", allow_module_level=True)
@pytest.mark.run(order=1)
def test_request_connection_error():
# Find available port
s = socket.socket()
s.bind(('localhost', 0))
port = s.getsockname()[1]
s.close()
if port == 0:
pytest.skip("could not find available port")
bad_endpoint = f'http://localhost:{port}'
bad_request = None
try:
bad_request = request.rpc_request('hmy_getNodeMetadata', endpoint=bad_endpoint)
except Exception as e:
assert isinstance(e, exceptions.RequestsError)
assert bad_request is None
@pytest.mark.run(order=2)
def test_request_rpc_error():
error_request = None
try:
error_request = request.rpc_request('hmy_getBalance')
except (exceptions.RequestsTimeoutError, exceptions.RequestsError) as err:
pytest.skip("can not connect to local blockchain", allow_module_level=True)
except Exception as e:
assert isinstance(e, exceptions.RPCError)
assert error_request is None
@pytest.mark.run(order=3)
def test_rpc_request():
endpoint = 'http://localhost:9500'
timeout = 30
method = 'hmy_getNodeMetadata'
params = []
payload = {
"id": "1",
"jsonrpc": "2.0",
"method": method,
"params": params
}
headers = {
'Content-Type': 'application/json'
}
response = None
try:
response = requests.request('POST', endpoint, headers=headers,
data=json.dumps(payload), timeout=timeout, allow_redirects=True)
except:
pytest.skip("can not connect to local blockchain")
assert response is not None
resp = None
try:
resp = json.loads(response.content)
except json.decoder.JSONDecodeError as err:
pytest.skip('unable to decode response')
assert resp is not None
rpc_response = None
try:
rpc_response = request.rpc_request(method, params, endpoint, timeout)
except exceptions.RPCError as e:
assert 'error' in resp
if rpc_response is not None:
assert rpc_response == resp

@ -0,0 +1,154 @@
import json
import time
import pytest
import requests
test_validator_address = 'one18tvf56zqjkjnak686lwutcp5mqfnvee35xjnhc'
transfer_raw_transaction = '0xf86f80843b9aca008252080180943ad89a684095a53edb47d7ddc5e034d8133667318a152d02c7e14af68000008027a0ec6c8ad0f70b3c826fa77574c6815a8f73936fafb7b2701a7082ad7d278c95a9a0429f9f166b1c1d385a4ec8f8b86604c26e427c2b0a1c85d9cf4ec6bbd0719508'
tx_hash = '0x1fa20537ea97f162279743139197ecf0eac863278ac1c8ada9a6be5d1e31e633'
create_validator_raw_transaction = '0xf9015680f90105943ad89a684095a53edb47d7ddc5e034d813366731d984746573748474657374847465737484746573748474657374ddc988016345785d8a0000c9880c7d713b49da0000c887b1a2bc2ec500008a022385a827e8155000008b084595161401484a000000f1b0282554f2478661b4844a05a9deb1837aac83931029cb282872f0dcd7239297c499c02ea8da8746d2f08ca2b037e89891f862b86003557e18435c201ecc10b1664d1aea5b4ec59dbfe237233b953dbd9021b86bc9770e116ed3c413fe0334d89562568a10e133d828611f29fee8cdab9719919bbcc1f1bf812c73b9ccd0f89b4f0b9ca7e27e66d58bbb06fcf51c295b1d076cfc878a0228f16f86157860000080843b9aca008351220027a018385211a150ca032c3526cef0aba6a75f99a18cb73f547f67bab746be0c7a64a028be921002c6eb949b3932afd010dfe1de2459ec7fe84403b9d9d8892394a78c'
staking_tx_hash = '0x57ec011aabdeb078a4816502224022f291fa8b07c82bbae8476f514a1d71c730'
endpoint = 'http://localhost:9500'
endpoint_shard_one = 'http://localhost:9501'
timeout = 30
headers = {
'Content-Type': 'application/json'
}
@pytest.fixture(scope="session", autouse=True)
def setup_blockchain():
metadata = _check_connection()
_check_staking_epoch(metadata)
tx_data = _check_funding_transaction()
if not tx_data['result']:
_send_funding_transaction()
time.sleep(20) # Sleep to let cross shard transaction finalize
tx_data = _check_funding_transaction()
if 'error' in tx_data:
pytest.skip(f"Error in hmy_getTransactionByHash reply: {tx_data['error']}", allow_module_level=True)
if not tx_data['result']:
pytest.skip(f"Funding transaction failed: {tx_hash}", allow_module_level=True)
stx_data = _check_staking_transaction()
if not stx_data['result']:
_send_staking_transaction()
time.sleep(30) # Sleep to let transaction finalize
stx_data = _check_staking_transaction()
if 'error' in stx_data:
pytest.skip(f"Error in hmy_getStakingTransactionByHash reply: {stx_data['error']}", allow_module_level=True)
if not stx_data['result']:
pytest.skip(f"Staking transaction failed: {staking_tx_hash}", allow_module_level=True)
def _check_connection():
try:
payload = {
"id": "1",
"jsonrpc": "2.0",
"method": 'hmy_getNodeMetadata',
"params": []
}
response = requests.request('POST', endpoint, headers=headers,
data=json.dumps(payload), timeout=timeout, allow_redirects=True)
metadata = json.loads(response.content)
if 'error' in metadata:
pytest.skip(f"Error in hmy_getNodeMetadata reply: {metadata['error']}", allow_module_level=True)
if 'chain-config' not in metadata['result']:
pytest.skip("Chain config not found in hmy_getNodeMetadata reply", allow_module_level=True)
return metadata
except Exception as e:
pytest.skip('Can not connect to local blockchain or bad hmy_getNodeMetadata reply', allow_module_level=True)
def _check_staking_epoch(metadata):
latest_header = None
try:
payload = {
"id": "1",
"jsonrpc": "2.0",
"method": 'hmy_latestHeader',
"params": []
}
response = requests.request('POST', endpoint, headers=headers,
data=json.dumps(payload), timeout=timeout, allow_redirects=True)
latest_header = json.loads(response.content)
if 'error' in latest_header:
pytest.skip(f"Error in hmy_latestHeader reply: {latest_header['error']}", allow_module_level=True)
except Exception as e:
pytest.skip('Failed to get hmy_latestHeader reply', allow_module_level=True)
if metadata and latest_header:
staking_epoch = metadata['result']['chain-config']['staking-epoch']
current_epoch = latest_header['result']['epoch']
if staking_epoch > current_epoch:
pytest.skip(f'Not staking epoch: current {current_epoch}, staking {staking_epoch}', allow_module_level=True)
def _send_funding_transaction():
try:
payload = {
"id": "1",
"jsonrpc": "2.0",
"method": 'hmy_sendRawTransaction',
"params": [transfer_raw_transaction]
}
response = requests.request('POST', endpoint_shard_one, headers=headers,
data=json.dumps(payload), timeout=timeout, allow_redirects=True)
tx = json.loads(response.content)
if 'error' in tx:
pytest.skip(f"Error in hmy_sendRawTransaction reply: {tx['error']}", allow_module_level=True)
except Exception as e:
pytest.skip('Failed to get hmy_sendRawTransaction reply', allow_module_level=True)
def _check_funding_transaction():
try:
payload = {
"id": "1",
"jsonrpc": "2.0",
"method": 'hmy_getTransactionByHash',
"params": [tx_hash]
}
response = requests.request('POST', endpoint_shard_one, headers=headers,
data=json.dumps(payload), timeout=timeout, allow_redirects=True)
tx_data = json.loads(response.content)
return tx_data
except Exception as e:
pytest.skip('Failed to get hmy_getTransactionByHash reply', allow_module_level=True)
def _send_staking_transaction():
try:
payload = {
"id": "1",
"jsonrpc": "2.0",
"method": 'hmy_sendRawStakingTransaction',
"params": [create_validator_raw_transaction]
}
response = requests.request('POST', endpoint, headers=headers,
data=json.dumps(payload), timeout=timeout, allow_redirects=True)
staking_tx = json.loads(response.content)
if 'error' in staking_tx:
pytest.skip(f"Error in hmy_sendRawStakingTransaction reply: {staking_tx['error']}", allow_module_level=True)
except Exception as e:
pytest.skip('Failed to get hmy_sendRawStakingTransaction reply', allow_module_level=True)
def _check_staking_transaction():
try:
payload = {
"id": "1",
"jsonrpc": "2.0",
"method": 'hmy_getStakingTransactionByHash',
"params": [staking_tx_hash]
}
response = requests.request('POST', endpoint, headers=headers,
data=json.dumps(payload), timeout=timeout, allow_redirects=True)
stx_data = json.loads(response.content)
return stx_data
except Exception as e:
pytest.skip('Failed to get hmy_getStakingTransactionByHash reply', allow_module_level=True)

@ -0,0 +1,56 @@
import pytest
import requests
from pyhmy import (
account
)
from pyhmy.rpc import (
exceptions
)
explorer_endpoint = 'http://localhost:9599'
endpoint_shard_one = 'http://localhost:9501'
local_test_address = 'one1zksj3evekayy90xt4psrz8h6j2v3hla4qwz4ur'
test_validator_address = 'one18tvf56zqjkjnak686lwutcp5mqfnvee35xjnhc'
genesis_block_number = 0
test_block_number = 1
def _test_account_rpc(fn, *args, **kwargs):
if not callable(fn):
pytest.fail(f'Invalid function: {fn}')
try:
response = fn(*args, **kwargs)
except Exception as e:
if isinstance(e, exceptions.RPCError) and 'does not exist/is not available' in str(e):
pytest.skip(f'{str(e)}')
pytest.fail(f'Unexpected error: {e.__class__} {e}')
return response
@pytest.mark.run(order=1)
def test_get_balance(setup_blockchain):
balance = _test_account_rpc(account.get_balance, local_test_address)
assert balance > 0
@pytest.mark.run(order=2)
def test_get_balance_by_block(setup_blockchain):
balance = _test_account_rpc(account.get_balance_by_block, local_test_address, genesis_block_number)
assert balance > 0
@pytest.mark.run(order=3)
def test_get_transaction_count(setup_blockchain):
transactions = _test_account_rpc(account.get_transaction_count, local_test_address, endpoint=endpoint_shard_one)
assert transactions > 0
@pytest.mark.run(order=4)
def test_get_transaction_history(setup_blockchain):
tx_history = _test_account_rpc(account.get_transaction_history, local_test_address, endpoint=explorer_endpoint)
assert len(tx_history) >= 0
@pytest.mark.run(order=5)
def test_get_staking_transaction_history(setup_blockchain):
staking_tx_history = _test_account_rpc(account.get_staking_transaction_history, test_validator_address, endpoint=explorer_endpoint)
assert len(staking_tx_history) > 0

@ -0,0 +1,98 @@
import pytest
import requests
from pyhmy import (
blockchain
)
from pyhmy.rpc import (
exceptions
)
test_epoch_number = 0
genesis_block_number = 0
test_block_number = 1
test_block_hash = None
def _test_blockchain_rpc(fn, *args, **kwargs):
if not callable(fn):
pytest.fail(f'Invalid function: {fn}')
try:
response = fn(*args, **kwargs)
except Exception as e:
if isinstance(e, exceptions.RPCError) and 'does not exist/is not available' in str(e):
pytest.skip(f'{str(e)}')
pytest.fail(f'Unexpected error: {e.__class__} {e}')
return response
@pytest.mark.run(order=1)
def test_get_node_metadata(setup_blockchain):
_test_blockchain_rpc(blockchain.get_node_metadata)
@pytest.mark.run(order=2)
def test_get_sharding_structure(setup_blockchain):
_test_blockchain_rpc(blockchain.get_sharding_structure)
@pytest.mark.run(order=3)
def test_get_leader_address(setup_blockchain):
_test_blockchain_rpc(blockchain.get_leader_address)
@pytest.mark.run(order=4)
def test_get_block_number(setup_blockchain):
_test_blockchain_rpc(blockchain.get_block_number)
@pytest.mark.run(order=5)
def test_get_current_epoch(setup_blockchain):
_test_blockchain_rpc(blockchain.get_current_epoch)
@pytest.mark.run(order=6)
def tset_get_gas_price(setup_blockchain):
_test_blockchain_rpc(blockchain.get_gas_price)
@pytest.mark.run(order=7)
def test_get_num_peers(setup_blockchain):
_test_blockchain_rpc(blockchain.get_num_peers)
@pytest.mark.run(order=8)
def test_get_latest_header(setup_blockchain):
_test_blockchain_rpc(blockchain.get_latest_header)
@pytest.mark.run(order=9)
def test_get_latest_headers(setup_blockchain):
_test_blockchain_rpc(blockchain.get_latest_headers)
@pytest.mark.run(order=10)
def test_get_block_by_number(setup_blockchain):
global test_block_hash
block = _test_blockchain_rpc(blockchain.get_block_by_number, test_block_number)
test_block_hash = block['hash']
@pytest.mark.run(order=11)
def test_get_block_by_hash(setup_blockchain):
if not test_block_hash:
pytest.skip('Failed to get reference block hash')
_test_blockchain_rpc(blockchain.get_block_by_hash, test_block_hash)
@pytest.mark.run(order=12)
def test_get_block_transaction_count_by_number(setup_blockchain):
_test_blockchain_rpc(blockchain.get_block_transaction_count_by_number, test_block_number)
@pytest.mark.run(order=13)
def test_get_block_transaction_count_by_hash(setup_blockchain):
if not test_block_hash:
pytest.skip('Failed to get reference block hash')
_test_blockchain_rpc(blockchain.get_block_transaction_count_by_hash, test_block_hash)
@pytest.mark.run(order=14)
def test_get_blocks(setup_blockchain):
_test_blockchain_rpc(blockchain.get_blocks, genesis_block_number, test_block_number)
@pytest.mark.run(order=15)
def test_get_block_signers(setup_blockchain):
_test_blockchain_rpc(blockchain.get_block_signers, test_block_number)
@pytest.mark.run(order=16)
def test_get_validators(setup_blockchain):
_test_blockchain_rpc(blockchain.get_validators, test_epoch_number)

@ -0,0 +1,66 @@
import pytest
import requests
from pyhmy import (
staking
)
from pyhmy.rpc import (
exceptions
)
test_validator_address = 'one18tvf56zqjkjnak686lwutcp5mqfnvee35xjnhc'
def _test_staking_rpc(fn, *args, **kwargs):
if not callable(fn):
pytest.fail(f'Invalid function: {fn}')
try:
response = fn(*args, **kwargs)
except Exception as e:
if isinstance(e, exceptions.RPCError) and 'does not exist/is not available' in str(e):
pytest.skip(f'{str(e)}')
pytest.fail(f'Unexpected error: {e.__class__} {e}')
return response
@pytest.mark.run(order=1)
def test_get_all_validator_addresses(setup_blockchain):
validator_addresses = _test_staking_rpc(staking.get_all_validator_addresses)
assert len(validator_addresses) > 0
assert test_validator_address in validator_addresses
@pytest.mark.run(order=2)
def test_get_validator_information(setup_blockchain):
_test_staking_rpc(staking.get_validator_information, test_validator_address)
@pytest.mark.run(order=3)
def test_get_all_validator_information(setup_blockchain):
all_validator_information = _test_staking_rpc(staking.get_all_validator_information)
assert len(all_validator_information) > 0
@pytest.mark.run(order=4)
def test_get_delegations_by_delegator(setup_blockchain):
delegations = _test_staking_rpc(staking.get_delegations_by_delegator, test_validator_address)
assert len(delegations) > 0
@pytest.mark.run(order=5)
def test_get_delegations_by_validator(setup_blockchain):
delegations = _test_staking_rpc(staking.get_delegations_by_validator, test_validator_address)
assert len(delegations) > 0
@pytest.mark.run(order=6)
def test_get_current_utility_metrics(setup_blockchain):
_test_staking_rpc(staking.get_current_utility_metrics)
@pytest.mark.run(order=7)
def test_get_staking_network_info(setup_blockchain):
_test_staking_rpc(staking.get_staking_network_info)
@pytest.mark.run(order=8)
def test_get_super_committees(setup_blockchain):
_test_staking_rpc(staking.get_super_committees)
@pytest.mark.run(order=9)
def test_get_raw_median_stake_snapshot(setup_blockchain):
_test_staking_rpc(staking.get_raw_median_stake_snapshot)

@ -0,0 +1,125 @@
import pytest
import requests
from pyhmy import (
transaction
)
from pyhmy.rpc import (
exceptions
)
localhost_shard_one = 'http://localhost:9501'
tx_hash = '0x1fa20537ea97f162279743139197ecf0eac863278ac1c8ada9a6be5d1e31e633'
tx_block_num = None
tx_block_hash = None
cx_hash = '0x1fa20537ea97f162279743139197ecf0eac863278ac1c8ada9a6be5d1e31e633'
stx_hash = '0x57ec011aabdeb078a4816502224022f291fa8b07c82bbae8476f514a1d71c730'
stx_block_num = None
stx_block_hash = None
test_index = 0
raw_tx = '0xf86f80843b9aca008252080180943ad89a684095a53edb47d7ddc5e034d8133667318a152d02c7e14af68000008027a0ec6c8ad0f70b3c826fa77574c6815a8f73936fafb7b2701a7082ad7d278c95a9a0429f9f166b1c1d385a4ec8f8b86604c26e427c2b0a1c85d9cf4ec6bbd0719508'
raw_stx = '0xf9015680f90105943ad89a684095a53edb47d7ddc5e034d813366731d984746573748474657374847465737484746573748474657374ddc988016345785d8a0000c9880c7d713b49da0000c887b1a2bc2ec500008a022385a827e8155000008b084595161401484a000000f1b0282554f2478661b4844a05a9deb1837aac83931029cb282872f0dcd7239297c499c02ea8da8746d2f08ca2b037e89891f862b86003557e18435c201ecc10b1664d1aea5b4ec59dbfe237233b953dbd9021b86bc9770e116ed3c413fe0334d89562568a10e133d828611f29fee8cdab9719919bbcc1f1bf812c73b9ccd0f89b4f0b9ca7e27e66d58bbb06fcf51c295b1d076cfc878a0228f16f86157860000080843b9aca008351220027a018385211a150ca032c3526cef0aba6a75f99a18cb73f547f67bab746be0c7a64a028be921002c6eb949b3932afd010dfe1de2459ec7fe84403b9d9d8892394a78c'
def _test_transaction_rpc(fn, *args, **kwargs):
if not callable(fn):
pytest.fail(f'Invalid function: {fn}')
try:
response = fn(*args, **kwargs)
except Exception as e:
if isinstance(e, exceptions.RPCError) and 'does not exist/is not available' in str(e):
pytest.skip(f'{str(e)}')
pytest.fail(f'Unexpected error: {e.__class__} {e}')
return response
@pytest.mark.run(order=1)
def test_get_pending_transactions(setup_blockchain):
_test_transaction_rpc(transaction.get_pending_transactions)
@pytest.mark.run(order=2)
def test_get_transaction_by_hash(setup_blockchain):
tx = _test_transaction_rpc(transaction.get_transaction_by_hash, tx_hash, endpoint=localhost_shard_one)
assert tx is not None
global tx_block_num
tx_block_num = int(tx['blockNumber'], 0)
global tx_block_hash
tx_block_hash = tx['blockHash']
@pytest.mark.run(order=3)
def test_get_transaction_by_block_hash_and_index(setup_blockchain):
if not tx_block_hash:
pytest.skip('Failed to get reference block hash')
tx = _test_transaction_rpc(transaction.get_transaction_by_block_hash_and_index,
tx_block_hash, test_index, endpoint=localhost_shard_one)
assert tx is not None
@pytest.mark.run(order=4)
def test_get_transaction_by_block_number_and_index(setup_blockchain):
if not tx_block_num:
pytest.skip('Failed to get reference block num')
tx = _test_transaction_rpc(transaction.get_transaction_by_block_number_and_index, tx_block_num, test_index,
endpoint=localhost_shard_one)
assert tx is not None
@pytest.mark.run(order=5)
def test_get_transaction_receipt(setup_blockchain):
tx_receipt = _test_transaction_rpc(transaction.get_transaction_receipt, tx_hash, endpoint=localhost_shard_one)
assert tx_receipt is not None
@pytest.mark.run(order=6)
def test_get_transaction_error_sink(setup_blockchain):
_test_transaction_rpc(transaction.get_transaction_error_sink)
@pytest.mark.run(order=7)
def test_send_raw_transaction(setup_blockchain):
test_tx_hash = _test_transaction_rpc(transaction.send_raw_transaction, raw_tx)
assert test_tx_hash == tx_hash
@pytest.mark.run(order=8)
def test_get_pending_cx_receipts(setup_blockchain):
_test_transaction_rpc(transaction.get_pending_cx_receipts)
@pytest.mark.run(order=9)
def test_get_cx_receipt_by_hash(setup_blockchain):
cx = _test_transaction_rpc(transaction.get_cx_receipt_by_hash, cx_hash)
assert cx is not None
@pytest.mark.run(order=10)
def test_resend_cx_receipt(setup_blockchain):
sent = _test_transaction_rpc(transaction.resend_cx_receipt, cx_hash)
assert not sent
@pytest.mark.run(order=11)
def test_get_staking_transaction_by_hash(setup_blockchain):
staking_tx = _test_transaction_rpc(transaction.get_staking_transaction_by_hash, stx_hash)
assert staking_tx is not None
global stx_block_num
stx_block_num = int(staking_tx['blockNumber'], 0)
global stx_block_hash
stx_block_hash = staking_tx['blockHash']
@pytest.mark.run(order=12)
def test_get_transaction_by_block_hash_and_index(setup_blockchain):
if not stx_block_hash:
pytest.skip('Failed to get reference block hash')
stx = _test_transaction_rpc(transaction.get_staking_transaction_by_block_hash_and_index, stx_block_hash, test_index)
assert stx is not None
@pytest.mark.run(order=13)
def test_get_transaction_by_block_number_and_index(setup_blockchain):
if not stx_block_num:
pytest.skip('Failed to get reference block num')
stx = _test_transaction_rpc(transaction.get_staking_transaction_by_block_number_and_index, stx_block_num, test_index)
assert stx is not None
@pytest.mark.run(order=14)
def test_get_staking_transaction_error_sink(setup_blockchain):
_test_transaction_rpc(transaction.get_staking_transaction_error_sink)
@pytest.mark.run(order=15)
def test_send_raw_staking_transaction(setup_blockchain):
test_stx_hash = _test_transaction_rpc(transaction.send_raw_staking_transaction, raw_stx)
assert test_stx_hash == stx_hash
Loading…
Cancel
Save