Merge pull request #3 from harmony-one/validator_by_block_rpcs

[SDK] Update with more methods & RPCs
pull/4/head
Janet Liang 5 years ago committed by GitHub
commit c137b125d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      Makefile
  2. 173
      pyhmy/rpc/account.py
  3. 153
      pyhmy/rpc/blockchain.py
  4. 9
      pyhmy/rpc/exceptions.py
  5. 81
      pyhmy/rpc/staking.py
  6. 4
      pyhmy/rpc/transaction.py
  7. 2
      setup.py
  8. 6
      tests/rpc-pyhmy/conftest.py
  9. 31
      tests/rpc-pyhmy/test_account.py
  10. 76
      tests/rpc-pyhmy/test_blockchain.py
  11. 37
      tests/rpc-pyhmy/test_staking.py
  12. 43
      tests/rpc-pyhmy/test_transaction.py

@ -24,6 +24,10 @@ clean-py:
find . -name '*.pyo' -exec rm -f {} +
find . -name '*~' -exec rm -f {} +
dev:
python3 -m pip install pytest
python3 -m pip install pytest-ordering
test:
python3 -m py.test -r s -s tests

@ -6,6 +6,14 @@ from .blockchain import (
get_sharding_structure
)
from .exceptions import (
RPCError,
InvalidRPCReplyError,
JSONDecodeError,
RequestsError,
RequestsTimeoutError
)
_default_endpoint = 'http://localhost:9500'
_default_timeout = 30
@ -27,24 +35,34 @@ def get_balance(address, endpoint=_default_endpoint, timeout=_default_timeout) -
-------
int
Account balance in ATTO
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
method = 'hmy_getBalance'
params = [
address,
'latest'
]
return int(rpc_request('hmy_getBalance', params=params, endpoint=endpoint, timeout=timeout)['result'], 16)
balance = rpc_request(method, params=params, endpoint=endpoint, timeout=timeout)['result']
try:
return int(balance, 16)
except TypeError as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_balance_by_block(address, block_num, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get account balance at time of given block
Get account balance for address at a given block number
Parameters
----------
address: str
Address to get balance for
block_num: int
Block number to req
Block to get balance at
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
@ -53,23 +71,36 @@ def get_balance_by_block(address, block_num, endpoint=_default_endpoint, timeout
Returns
-------
int
Account balance in ATTO at given block
Account balance in ATTO
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
method = 'hmy_getBalanceByBlockNumber'
params = [
address,
str(hex(block_num))
]
return int(rpc_request('hmy_getBalanceByBlockNumber', params=params, endpoint=endpoint, timeout=timeout)['result'], 16)
balance = rpc_request(method, params=params, endpoint=endpoint, timeout=timeout)['result']
try:
return int(balance, 16)
except TypeError as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_transaction_count(address, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
def get_account_nonce(address, true_nonce=False, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get number of transactions & staking transactions sent by an account
Get the account nonce
Parameters
----------
address: str
Address to get transaction count for
true_nonce: :obj:`bool`, optional
True to get on-chain nonce
False to get nonce based on pending transaction pool
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
@ -78,13 +109,48 @@ def get_transaction_count(address, endpoint=_default_endpoint, timeout=_default_
Returns
-------
int
Number of transactions sent by the account (account nonce)
Account nonce
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
method = 'hmy_getTransactionCount'
params = [
address,
'latest'
'latest' if true_nonce else 'pending'
]
return int(rpc_request('hmy_getTransactionCount', params=params, endpoint=endpoint, timeout=timeout)['result'], 16)
nonce = rpc_request(method, params=params, endpoint=endpoint, timeout=timeout)['result']
try:
return int(nonce, 16)
except TypeError as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_transaction_count(address, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get number of transactions & staking transactions sent by an account
Parameters
----------
address: str
Address to get transaction count for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Number of transactions sent by the account
See also
--------
get_account_nonce
"""
return get_account_nonce(address, true_nonce=True, endpoint=endpoint, timeout=timeout)
def get_transaction_history(address, page=0, page_size=1000, include_full_tx=False, tx_type='ALL',
@ -120,6 +186,11 @@ def get_transaction_history(address, page=0, page_size=1000, include_full_tx=Fal
-------
list
# TODO: Add link to reference RPC documentation
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
params = [
{
@ -131,8 +202,12 @@ def get_transaction_history(address, page=0, page_size=1000, include_full_tx=Fal
'order': order
}
]
tx_history = rpc_request('hmy_getTransactionsHistory', params=params, endpoint=endpoint, timeout=timeout)
return tx_history['result']['transactions']
method = 'hmy_getTransactionsHistory'
tx_history = rpc_request(method, params=params, endpoint=endpoint, timeout=timeout)
try:
return tx_history['result']['transactions']
except KeyError as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_staking_transaction_history(address, page=0, page_size=1000, include_full_tx=False, tx_type='ALL',
@ -166,6 +241,11 @@ def get_staking_transaction_history(address, page=0, page_size=1000, include_ful
-------
list
# TODO: Add link to reference RPC documentation
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
params = [
{
@ -178,18 +258,25 @@ def get_staking_transaction_history(address, page=0, page_size=1000, include_ful
}
]
# Using v2 API, because getStakingTransactionHistory not implemented in v1
stx_history = rpc_request('hmyv2_getStakingTransactionsHistory', params=params, endpoint=endpoint, timeout=timeout)
return stx_history['result']['staking_transactions']
method = 'hmyv2_getStakingTransactionsHistory'
stx_history = rpc_request(method, params=params, endpoint=endpoint, timeout=timeout)['result']
try:
return stx_history['staking_transactions']
except KeyError as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_balance_on_all_shards(address, endpoint=_default_endpoint, timeout=_default_timeout):
def get_balance_on_all_shards(address, skip_error=True, endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get current account balance in all shards
Get current account balance in all shards & optionally report errors getting account balance for a shard
Parameters
----------
address: str
Address to get balance for
skip_error: :obj:`bool`, optional
True to ignore errors getting balance for shard
False to include errors when getting balance for shard
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
@ -197,11 +284,59 @@ def get_balance_on_all_shards(address, endpoint=_default_endpoint, timeout=_defa
Returns
-------
dict
list
Account balance per shard in ATTO
Example reply:
[
{
'shard': 0,
'balance': 0,
},
...
]
"""
balances = {}
balances = []
sharding_structure = get_sharding_structure(endpoint=endpoint, timeout=timeout)
for shard in sharding_structure:
balances[shard['shardID']] = get_balance(address, endpoint=shard['http'], timeout=timeout)
try:
balances.append({
'shard': shard['shardID'],
'balance': get_balance(address, endpoint=shard['http'], timeout=timeout)
})
except (KeyError, RPCError, RequestsError, RequestsTimeoutError, JSONDecodeError):
if not skip_error:
balances.append({
'shard': shard['shardID'],
'balance': None
})
return balances
def get_total_balance(address, endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get total account balance on all shards
Parameters
----------
address: str
Address to get balance for
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds per request
Returns
-------
int
Total account balance in ATTO
Raises
------
RuntimeError
If error occurred getting account balance for a shard
"""
try:
balances = get_balance_on_all_shards(address, skip_error=False, endpoint=endpoint, timeout=timeout)
return sum(b['balance'] for b in balances)
except TypeError as e:
raise RuntimeError from e

@ -2,6 +2,10 @@ from .request import (
rpc_request
)
from .exceptions import (
InvalidRPCReplyError
)
_default_endpoint = 'http://localhost:9500'
_default_timeout = 30
@ -28,6 +32,92 @@ def get_node_metadata(endpoint=_default_endpoint, timeout=_default_timeout) -> d
return rpc_request('hmy_getNodeMetadata', endpoint=endpoint, timeout=timeout)['result']
def get_shard(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get config for the node
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Shard ID of node
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
method = 'hmy_getNodeMetadata'
try:
return rpc_request(method, endpoint=endpoint, timeout=timeout)['result']['shard-id']
except KeyError as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_staking_epoch(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get epoch number when blockchain switches to EPoS election
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Epoch at which blockchain switches to EPoS election
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
method = 'hmy_getNodeMetadata'
data = rpc_request(method, endpoint=endpoint, timeout=timeout)['result']
try:
return int(data['chain-config']['staking-epoch'])
except (KeyError, TypeError) as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_prestaking_epoch(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
"""
Get epoch number when blockchain switches to allow staking features without election
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
int
Epoch at which blockchain switches to allow staking features without election
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
method = 'hmy_getNodeMetadata'
data = rpc_request(method, endpoint=endpoint, timeout=timeout)['result']
try:
return int(data['chain-config']['prestaking-epoch'])
except (KeyError, TypeError) as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_sharding_structure(endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get network sharding structure
@ -81,8 +171,17 @@ def get_block_number(endpoint=_default_endpoint, timeout=_default_timeout) -> in
-------
int
Current block number
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
return int(rpc_request('hmy_blockNumber', endpoint=endpoint, timeout=timeout)['result'], 16)
method = 'hmy_blockNumber'
try:
return int(rpc_request(method, endpoint=endpoint, timeout=timeout)['result'], 16)
except TypeError as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_current_epoch(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
@ -100,8 +199,17 @@ def get_current_epoch(endpoint=_default_endpoint, timeout=_default_timeout) -> i
-------
int
Current epoch number
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
return int(rpc_request('hmy_getEpoch', endpoint=endpoint, timeout=timeout)['result'], 16)
method = 'hmy_getEpoch'
try:
return int(rpc_request(method, endpoint=endpoint, timeout=timeout)['result'], 16)
except TypeError as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_gas_price(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
@ -119,8 +227,17 @@ def get_gas_price(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
-------
int
Network gas price
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
return int(rpc_request('hmy_gasPrice', endpoint=endpoint, timeout=timeout)['result'], 16)
method = 'hmy_gasPrice'
try:
return int(rpc_request(method, endpoint=endpoint, timeout=timeout)['result'], 16)
except TypeError as e:
raise InvalidRPCReplyError(method, endpoint) from e
def get_num_peers(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
@ -138,8 +255,16 @@ def get_num_peers(endpoint=_default_endpoint, timeout=_default_timeout) -> int:
-------
int
Number of connected peers
Raises
------
InvalidRPCReplyError
If received unknown result from endpoint
"""
return int(rpc_request('net_peerCount', endpoint=endpoint, timeout=timeout)['result'], 16)
method = 'net_peerCount'
try:
return int(rpc_request(method, endpoint=endpoint, timeout=timeout)['result'], 16)
except TypeError as e:
raise InvalidRPCReplyError(method, endpoint) from e
##############
@ -229,6 +354,7 @@ def get_block_by_hash(block_hash, endpoint=_default_endpoint, include_full_tx=Fa
-------
dict
# TODO: Add link to reference RPC documentation
None if block hash is not found
"""
params = [
block_hash,
@ -376,3 +502,22 @@ def get_validators(epoch, endpoint=_default_endpoint, timeout=_default_timeout)
epoch
]
return rpc_request('hmy_getValidators', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_bad_blocks(endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get list of bad blocks in memory of specific node
Parameters
----------
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
return rpc_request('hmy_getCurrentBadBlocks', endpoint=endpoint, timeout=timeout)['result']

@ -7,6 +7,15 @@ class RPCError(RuntimeError):
Exception raised when RPC call returns an error
"""
class InvalidRPCReplyError(RuntimeError):
"""
Exception raised when RPC call returns unexpected result
Generally indicates Harmony API has been updated & pyhmy library needs to be updated as well
"""
def __init__(self, method, endpoint):
self.message = f'Unexpected reply for {method} from {endpoint}'
class JSONDecodeError(json.decoder.JSONDecodeError):
"""
Wrapper for json lib DecodeError exception

@ -53,6 +53,33 @@ def get_validator_information(validator_addr, endpoint=_default_endpoint, timeou
return rpc_request('hmy_getValidatorInformation', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_validator_information_by_block(validator_addr, block_num, endpoint=_default_endpoint, timeout=_default_timeout):
"""
Get validator information for validator address at a block
Parameters
----------
validator_addr: str
One address of the validator to get information for
block_num: int
Block number to query validator information at
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
params = [
validator_addr,
str(hex(block_num))
]
return rpc_request('hmy_getValidatorInformationByBlockNumber', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_all_validator_information(page=-1, endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get validator information for all validators on chain
@ -77,6 +104,33 @@ def get_all_validator_information(page=-1, endpoint=_default_endpoint, timeout=_
return rpc_request('hmy_getAllValidatorInformation', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_all_validator_information_by_block(block_num, page=-1, endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get validator information at block number for all validators on chain
Parameters
----------
block_num: int
Block number to get validator information for
page: :obj:`int`, optional
Page to request (-1 for all validators)
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
params = [
page,
str(hex(block_num))
]
return rpc_request('hmy_getAllValidatorInformationByBlockNumber', params=params, endpoint=endpoint, timeout=timeout)['result']
###################
# Delegation RPCs #
###################
@ -104,6 +158,33 @@ def get_delegations_by_delegator(delegator_addr, endpoint=_default_endpoint, tim
return rpc_request('hmy_getDelegationsByDelegator', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_delegations_by_delegator_by_block(delegator_addr, block_num, endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get list of delegations by a delegator at a specific block
Parameters
----------
delegator_addr: str
Delegator address to get list of delegations for
block_num: int
Block number to query delgator information at
endpoint: :obj:`str`, optional
Endpoint to send request to
timeout: :obj:`int`, optional
Timeout in seconds
Returns
-------
list
# TODO: Add link to reference RPC documentation
"""
params = [
delegator_addr,
str(hex(block_num))
]
return rpc_request('hmy_getDelegationsByDelegatorByBlockNumber', params=params, endpoint=endpoint, timeout=timeout)['result']
def get_delegations_by_validator(validator_addr, endpoint=_default_endpoint, timeout=_default_timeout) -> list:
"""
Get list of delegations to a validator

@ -49,6 +49,7 @@ def get_transaction_by_hash(tx_hash, endpoint=_default_endpoint, timeout=_defaul
-------
dict
# TODO: Add link to reference RPC documentation
None if transaction hash not found
"""
params = [
tx_hash
@ -131,6 +132,7 @@ def get_transaction_receipt(tx_receipt, endpoint=_default_endpoint, timeout=_def
-------
dict
# TODO: Add link to reference RPC documentation
None if transcation receipt hash not found
"""
params = [
tx_receipt
@ -220,6 +222,7 @@ def get_cx_receipt_by_hash(cx_hash, endpoint = _default_endpoint, timeout = _def
-------
dict
# TODO: Add link to reference RPC documentation
None if cx receipt hash not found
"""
params = [
cx_hash
@ -271,6 +274,7 @@ def get_staking_transaction_by_hash(tx_hash, endpoint=_default_endpoint, timeout
-------
dict
# TODO: Add link to reference RPC documentation
None if staking transaction hash not found
"""
params = [
tx_hash

@ -19,6 +19,7 @@ setup(
install_requires=[
'pexpect',
'requests',
'incremental',
],
setup_requires=[
'incremental',
@ -37,4 +38,3 @@ setup(
'Programming Language :: Python :: 3.8',
]
)

@ -48,6 +48,12 @@ def setup_blockchain():
if not stx_data['result']:
pytest.skip(f"Staking transaction failed: {staking_tx_hash}", allow_module_level=True)
# TODO: Build data object to return data instead of hard coded values in the test files
try:
return int(stx_data['result']['blockNumber'], 16)
except (TypeError, KeyError) as e:
pytest.skip(f"Unexpected reply for hmy_getStakingTransactionByHash: {stx_data['result']}", allow_module_level=True)
def _check_connection():
try:

@ -33,24 +33,47 @@ def _test_account_rpc(fn, *args, **kwargs):
@pytest.mark.run(order=1)
def test_get_balance(setup_blockchain):
balance = _test_account_rpc(account.get_balance, local_test_address)
assert isinstance(balance, int)
assert balance > 0
@pytest.mark.run(order=2)
def test_get_balance_by_block(setup_blockchain):
balance = _test_account_rpc(account.get_balance_by_block, local_test_address, genesis_block_number)
assert isinstance(balance, int)
assert balance > 0
@pytest.mark.run(order=3)
def test_get_transaction_count(setup_blockchain):
transactions = _test_account_rpc(account.get_transaction_count, local_test_address, endpoint=endpoint_shard_one)
assert transactions > 0
def test_get_true_nonce(setup_blockchain):
true_nonce = _test_account_rpc(account.get_account_nonce, local_test_address, true_nonce=True, endpoint=endpoint_shard_one)
assert isinstance(true_nonce, int)
assert true_nonce > 0
@pytest.mark.run(order=4)
def test_get_pending_nonce(setup_blockchain):
pending_nonce = _test_account_rpc(account.get_account_nonce, local_test_address, endpoint=endpoint_shard_one)
assert isinstance(pending_nonce, int)
assert pending_nonce > 0
@pytest.mark.run(order=5)
def test_get_transaction_history(setup_blockchain):
tx_history = _test_account_rpc(account.get_transaction_history, local_test_address, endpoint=explorer_endpoint)
assert isinstance(tx_history, list)
assert len(tx_history) >= 0
@pytest.mark.run(order=5)
@pytest.mark.run(order=6)
def test_get_staking_transaction_history(setup_blockchain):
staking_tx_history = _test_account_rpc(account.get_staking_transaction_history, test_validator_address, endpoint=explorer_endpoint)
assert isinstance(staking_tx_history, list)
assert len(staking_tx_history) > 0
@pytest.mark.run(order=7)
def test_get_balance_on_all_shards(setup_blockchain):
balances = _test_account_rpc(account.get_balance_on_all_shards, local_test_address)
assert isinstance(balances, list)
assert len(balances) == 2
@pytest.mark.run(order=8)
def test_get_total_balance(setup_blockchain):
total_balance = _test_account_rpc(account.get_total_balance, local_test_address)
assert isinstance(total_balance, int)
assert total_balance > 0

@ -29,70 +29,116 @@ def _test_blockchain_rpc(fn, *args, **kwargs):
@pytest.mark.run(order=1)
def test_get_node_metadata(setup_blockchain):
_test_blockchain_rpc(blockchain.get_node_metadata)
metadata = _test_blockchain_rpc(blockchain.get_node_metadata)
assert isinstance(metadata, dict)
@pytest.mark.run(order=2)
def test_get_sharding_structure(setup_blockchain):
_test_blockchain_rpc(blockchain.get_sharding_structure)
sharding_structure = _test_blockchain_rpc(blockchain.get_sharding_structure)
assert isinstance(sharding_structure, list)
assert len(sharding_structure) > 0
@pytest.mark.run(order=3)
def test_get_leader_address(setup_blockchain):
_test_blockchain_rpc(blockchain.get_leader_address)
leader = _test_blockchain_rpc(blockchain.get_leader_address)
assert isinstance(leader, str)
assert 'one1' in leader
@pytest.mark.run(order=4)
def test_get_block_number(setup_blockchain):
_test_blockchain_rpc(blockchain.get_block_number)
current_block_number = _test_blockchain_rpc(blockchain.get_block_number)
assert isinstance(current_block_number, int)
@pytest.mark.run(order=5)
def test_get_current_epoch(setup_blockchain):
_test_blockchain_rpc(blockchain.get_current_epoch)
current_epoch = _test_blockchain_rpc(blockchain.get_current_epoch)
assert isinstance(current_epoch, int)
@pytest.mark.run(order=6)
def tset_get_gas_price(setup_blockchain):
_test_blockchain_rpc(blockchain.get_gas_price)
gas = _test_blockchain_rpc(blockchain.get_gas_price)
assert isinstance(gas, int)
@pytest.mark.run(order=7)
def test_get_num_peers(setup_blockchain):
_test_blockchain_rpc(blockchain.get_num_peers)
peers = _test_blockchain_rpc(blockchain.get_num_peers)
assert isinstance(peers, int)
@pytest.mark.run(order=8)
def test_get_latest_header(setup_blockchain):
_test_blockchain_rpc(blockchain.get_latest_header)
header = _test_blockchain_rpc(blockchain.get_latest_header)
assert isinstance(header, dict)
@pytest.mark.run(order=9)
def test_get_latest_headers(setup_blockchain):
_test_blockchain_rpc(blockchain.get_latest_headers)
header_pair = _test_blockchain_rpc(blockchain.get_latest_headers)
assert isinstance(header_pair, dict)
@pytest.mark.run(order=10)
def test_get_block_by_number(setup_blockchain):
global test_block_hash
block = _test_blockchain_rpc(blockchain.get_block_by_number, test_block_number)
assert isinstance(block, dict)
assert 'hash' in block.keys()
test_block_hash = block['hash']
@pytest.mark.run(order=11)
def test_get_block_by_hash(setup_blockchain):
if not test_block_hash:
pytest.skip('Failed to get reference block hash')
_test_blockchain_rpc(blockchain.get_block_by_hash, test_block_hash)
block = _test_blockchain_rpc(blockchain.get_block_by_hash, test_block_hash)
assert isinstance(block, dict)
@pytest.mark.run(order=12)
def test_get_block_transaction_count_by_number(setup_blockchain):
_test_blockchain_rpc(blockchain.get_block_transaction_count_by_number, test_block_number)
tx_count = _test_blockchain_rpc(blockchain.get_block_transaction_count_by_number, test_block_number)
assert isinstance(tx_count, int)
@pytest.mark.run(order=13)
def test_get_block_transaction_count_by_hash(setup_blockchain):
if not test_block_hash:
pytest.skip('Failed to get reference block hash')
_test_blockchain_rpc(blockchain.get_block_transaction_count_by_hash, test_block_hash)
tx_count = _test_blockchain_rpc(blockchain.get_block_transaction_count_by_hash, test_block_hash)
assert isinstance(tx_count, int)
@pytest.mark.run(order=14)
def test_get_blocks(setup_blockchain):
_test_blockchain_rpc(blockchain.get_blocks, genesis_block_number, test_block_number)
blocks = _test_blockchain_rpc(blockchain.get_blocks, genesis_block_number, test_block_number)
assert isinstance(blocks, list)
assert len(blocks) == (test_block_number - genesis_block_number + 1)
@pytest.mark.run(order=15)
def test_get_block_signers(setup_blockchain):
_test_blockchain_rpc(blockchain.get_block_signers, test_block_number)
block_signers = _test_blockchain_rpc(blockchain.get_block_signers, test_block_number)
assert isinstance(block_signers, list)
assert len(block_signers) > 0
@pytest.mark.run(order=16)
def test_get_validators(setup_blockchain):
_test_blockchain_rpc(blockchain.get_validators, test_epoch_number)
validators = _test_blockchain_rpc(blockchain.get_validators, test_epoch_number)
assert isinstance(validators, dict)
assert 'validators' in validators.keys()
assert len(validators['validators']) > 0
@pytest.mark.run(order=17)
def test_get_shard(setup_blockchain):
shard = _test_blockchain_rpc(blockchain.get_shard)
assert isinstance(shard, int)
assert shard == 0
@pytest.mark.run(order=18)
def test_get_staking_epoch(setup_blockchain):
staking_epoch = _test_blockchain_rpc(blockchain.get_staking_epoch)
assert isinstance(staking_epoch, int)
@pytest.mark.run(order=19)
def test_get_prestaking_epoch(setup_blockchain):
prestaking_epoch = _test_blockchain_rpc(blockchain.get_prestaking_epoch)
assert isinstance(prestaking_epoch, int)
@pytest.mark.run(order=20)
def test_get_bad_blocks(setup_blockchain):
# TODO: Remove skip when RPC is fixed
pytest.skip("Known error with hmy_getCurrentBadBlocks")
bad_blocks = _test_blockchain_rpc(blockchain.get_bad_blocks)
assert isinstance(bad_blocks, list)

@ -10,6 +10,7 @@ from pyhmy.rpc import (
)
explorer_endpoint = 'http://localhost:9599'
test_validator_address = 'one18tvf56zqjkjnak686lwutcp5mqfnvee35xjnhc'
def _test_staking_rpc(fn, *args, **kwargs):
@ -27,40 +28,66 @@ def _test_staking_rpc(fn, *args, **kwargs):
@pytest.mark.run(order=1)
def test_get_all_validator_addresses(setup_blockchain):
validator_addresses = _test_staking_rpc(staking.get_all_validator_addresses)
assert isinstance(validator_addresses, list)
assert len(validator_addresses) > 0
assert test_validator_address in validator_addresses
@pytest.mark.run(order=2)
def test_get_validator_information(setup_blockchain):
_test_staking_rpc(staking.get_validator_information, test_validator_address)
info = _test_staking_rpc(staking.get_validator_information, test_validator_address)
assert isinstance(info, dict)
@pytest.mark.run(order=3)
def test_get_all_validator_information(setup_blockchain):
all_validator_information = _test_staking_rpc(staking.get_all_validator_information)
assert isinstance(all_validator_information, list)
assert len(all_validator_information) > 0
@pytest.mark.run(order=4)
def test_get_delegations_by_delegator(setup_blockchain):
delegations = _test_staking_rpc(staking.get_delegations_by_delegator, test_validator_address)
assert isinstance(delegations, list)
assert len(delegations) > 0
@pytest.mark.run(order=5)
def test_get_delegations_by_validator(setup_blockchain):
delegations = _test_staking_rpc(staking.get_delegations_by_validator, test_validator_address)
assert isinstance(delegations, list)
assert len(delegations) > 0
@pytest.mark.run(order=6)
def test_get_current_utility_metrics(setup_blockchain):
_test_staking_rpc(staking.get_current_utility_metrics)
metrics = _test_staking_rpc(staking.get_current_utility_metrics)
assert isinstance(metrics, dict)
@pytest.mark.run(order=7)
def test_get_staking_network_info(setup_blockchain):
_test_staking_rpc(staking.get_staking_network_info)
info = _test_staking_rpc(staking.get_staking_network_info)
assert isinstance(info, dict)
@pytest.mark.run(order=8)
def test_get_super_committees(setup_blockchain):
_test_staking_rpc(staking.get_super_committees)
committee = _test_staking_rpc(staking.get_super_committees)
assert isinstance(committee, dict)
@pytest.mark.run(order=9)
def test_get_raw_median_stake_snapshot(setup_blockchain):
_test_staking_rpc(staking.get_raw_median_stake_snapshot)
median_stake = _test_staking_rpc(staking.get_raw_median_stake_snapshot)
assert isinstance(median_stake, dict)
@pytest.mark.run(order=10)
def test_get_validator_information_by_block(setup_blockchain):
# Apparently validator information not created until block after create-validator transaction is accepted, so +1 block
info = _test_staking_rpc(staking.get_validator_information_by_block, test_validator_address, setup_blockchain + 1, endpoint=explorer_endpoint)
assert isinstance(info, dict)
@pytest.mark.run(order=11)
def test_get_validator_information_by_block(setup_blockchain):
# Apparently validator information not created until block after create-validator transaction is accepted, so +1 block
info = _test_staking_rpc(staking.get_all_validator_information_by_block, setup_blockchain + 1, endpoint=explorer_endpoint)
assert isinstance(info, list)
@pytest.mark.run(order=12)
def test_get_delegations_by_delegator_by_block(setup_blockchain):
delegations = _test_staking_rpc(staking.get_delegations_by_delegator_by_block, test_validator_address, setup_blockchain + 1, endpoint=explorer_endpoint)
assert isinstance(delegations, list)

@ -37,12 +37,16 @@ def _test_transaction_rpc(fn, *args, **kwargs):
@pytest.mark.run(order=1)
def test_get_pending_transactions(setup_blockchain):
_test_transaction_rpc(transaction.get_pending_transactions)
pool = _test_transaction_rpc(transaction.get_pending_transactions)
assert isinstance(pool, list)
@pytest.mark.run(order=2)
def test_get_transaction_by_hash(setup_blockchain):
tx = _test_transaction_rpc(transaction.get_transaction_by_hash, tx_hash, endpoint=localhost_shard_one)
assert tx is not None
assert tx
assert isinstance(tx, dict)
assert 'blockNumber' in tx.keys()
assert 'blockHash' in tx.keys()
global tx_block_num
tx_block_num = int(tx['blockNumber'], 0)
global tx_block_hash
@ -54,7 +58,8 @@ def test_get_transaction_by_block_hash_and_index(setup_blockchain):
pytest.skip('Failed to get reference block hash')
tx = _test_transaction_rpc(transaction.get_transaction_by_block_hash_and_index,
tx_block_hash, test_index, endpoint=localhost_shard_one)
assert tx is not None
assert tx
assert isinstance(tx, dict)
@pytest.mark.run(order=4)
def test_get_transaction_by_block_number_and_index(setup_blockchain):
@ -62,40 +67,50 @@ def test_get_transaction_by_block_number_and_index(setup_blockchain):
pytest.skip('Failed to get reference block num')
tx = _test_transaction_rpc(transaction.get_transaction_by_block_number_and_index, tx_block_num, test_index,
endpoint=localhost_shard_one)
assert tx is not None
assert tx
assert isinstance(tx, dict)
@pytest.mark.run(order=5)
def test_get_transaction_receipt(setup_blockchain):
tx_receipt = _test_transaction_rpc(transaction.get_transaction_receipt, tx_hash, endpoint=localhost_shard_one)
assert tx_receipt is not None
assert tx_receipt
assert isinstance(tx_receipt, dict)
@pytest.mark.run(order=6)
def test_get_transaction_error_sink(setup_blockchain):
_test_transaction_rpc(transaction.get_transaction_error_sink)
errors = _test_transaction_rpc(transaction.get_transaction_error_sink)
assert isinstance(errors, list)
@pytest.mark.run(order=7)
def test_send_raw_transaction(setup_blockchain):
test_tx_hash = _test_transaction_rpc(transaction.send_raw_transaction, raw_tx)
assert isinstance(test_tx_hash, str)
assert test_tx_hash == tx_hash
@pytest.mark.run(order=8)
def test_get_pending_cx_receipts(setup_blockchain):
_test_transaction_rpc(transaction.get_pending_cx_receipts)
pending = _test_transaction_rpc(transaction.get_pending_cx_receipts)
assert isinstance(pending, list)
@pytest.mark.run(order=9)
def test_get_cx_receipt_by_hash(setup_blockchain):
cx = _test_transaction_rpc(transaction.get_cx_receipt_by_hash, cx_hash)
assert cx is not None
assert cx
assert isinstance(cx, dict)
@pytest.mark.run(order=10)
def test_resend_cx_receipt(setup_blockchain):
sent = _test_transaction_rpc(transaction.resend_cx_receipt, cx_hash)
assert isinstance(sent, bool)
assert not sent
@pytest.mark.run(order=11)
def test_get_staking_transaction_by_hash(setup_blockchain):
staking_tx = _test_transaction_rpc(transaction.get_staking_transaction_by_hash, stx_hash)
assert staking_tx is not None
assert staking_tx
assert isinstance(staking_tx, dict)
assert 'blockNumber' in staking_tx.keys()
assert 'blockHash' in staking_tx.keys()
global stx_block_num
stx_block_num = int(staking_tx['blockNumber'], 0)
global stx_block_hash
@ -106,20 +121,24 @@ def test_get_transaction_by_block_hash_and_index(setup_blockchain):
if not stx_block_hash:
pytest.skip('Failed to get reference block hash')
stx = _test_transaction_rpc(transaction.get_staking_transaction_by_block_hash_and_index, stx_block_hash, test_index)
assert stx is not None
assert stx
assert isinstance(stx, dict)
@pytest.mark.run(order=13)
def test_get_transaction_by_block_number_and_index(setup_blockchain):
if not stx_block_num:
pytest.skip('Failed to get reference block num')
stx = _test_transaction_rpc(transaction.get_staking_transaction_by_block_number_and_index, stx_block_num, test_index)
assert stx is not None
assert stx
assert isinstance(stx, dict)
@pytest.mark.run(order=14)
def test_get_staking_transaction_error_sink(setup_blockchain):
_test_transaction_rpc(transaction.get_staking_transaction_error_sink)
errors = _test_transaction_rpc(transaction.get_staking_transaction_error_sink)
assert isinstance(errors, list)
@pytest.mark.run(order=15)
def test_send_raw_staking_transaction(setup_blockchain):
test_stx_hash = _test_transaction_rpc(transaction.send_raw_staking_transaction, raw_stx)
assert isinstance(test_stx_hash, str)
assert test_stx_hash == stx_hash

Loading…
Cancel
Save