import json
from decimal import Decimal, InvalidOperation

from eth_account.datastructures import SignedTransaction

from .account import get_balance, is_valid_address
from .numbers import convert_one_to_atto
from .exceptions import (
    InvalidValidatorError,
    RPCError,
    RequestsError,
    RequestsTimeoutError,
)
from .staking import get_all_validator_addresses, get_validator_information
from .staking_structures import Directive
from .staking_signing import sign_staking_transaction

_default_endpoint = "http://localhost:9500"
_default_timeout = 30


# TODO: Add unit testing
class Validator:
    """
    In-memory description of a validator, with validated setters and helpers
    to sign create / edit validator staking transactions.

    Error codes used with InvalidValidatorError in this class:
    1 (bad address), 3 (bad field value), 4 (prerequisite field not set),
    5 (error loading from the blockchain).
    """

    name_char_limit = 140
    identity_char_limit = 140
    website_char_limit = 140
    security_contact_char_limit = 140
    details_char_limit = 280
    min_required_delegation = convert_one_to_atto(10000)  # in ATTO

    def __init__(self, address):
        """
        Parameters
        ----------
        address: str
            ONE address of the validator

        Raises
        ------
        InvalidValidatorError
            If address is not a string or is not a valid ONE address
        """
        if not isinstance(address, str):
            raise InvalidValidatorError(1, "given ONE address was not a string")
        if not is_valid_address(address):
            raise InvalidValidatorError(1, f"{address} is not valid ONE address")
        self._address = address
        self._bls_keys = []
        self._bls_key_sigs = []

        self._name = None
        self._identity = None
        self._website = None
        self._details = None
        self._security_contact = None

        self._min_self_delegation = None
        self._max_total_delegation = None
        # NOTE: attribute name keeps its historical spelling ("inital")
        self._inital_delegation = None

        self._rate = None
        self._max_change_rate = None
        self._max_rate = None

    def _sanitize_input(self, data, check_str=False) -> str:
        """
        If data is falsy (None, '', 0, ...), return '' else return str(data)

        Raises
        ------
        InvalidValidatorError
            if check_str is True and str is not passed
        """
        if check_str and not isinstance(data, str):
            raise InvalidValidatorError(
                3,
                f"Expected data to be string to avoid floating point precision issues but got {data}",
            )
        return str(data) if data else ""

    def _check_length(self, data, limit, label) -> str:
        """
        Sanitize a text field and ensure it is at most `limit` characters

        Raises
        ------
        InvalidValidatorError
            If the sanitized text is longer than limit
        """
        text = self._sanitize_input(data)
        if len(text) > limit:
            raise InvalidValidatorError(
                3, f"{label} must be less than {limit} characters"
            )
        return text

    def _to_decimal(self, data, label, check_str=False) -> Decimal:
        """
        Sanitize a numeric field and convert it to a finite Decimal

        Raises
        ------
        InvalidValidatorError
            If the input cannot be parsed as a finite number, or if check_str
            is True and the input is not a string
        """
        text = self._sanitize_input(data, check_str)
        try:
            value = Decimal(text)
        except (TypeError, InvalidOperation) as e:
            raise InvalidValidatorError(3, f"{label} must be a number") from e
        # NaN would raise a raw decimal.InvalidOperation on the range
        # comparisons done by the setters, and Infinity is not a usable amount
        if not value.is_finite():
            raise InvalidValidatorError(3, f"{label} must be a number")
        return value

    def __str__(self) -> str:
        """
        Returns JSON string representation of Validator fields
        """
        # Decimal is not JSON serializable, so stringify those values
        info = {
            key: str(value) if isinstance(value, Decimal) else value
            for key, value in self.export().items()
        }
        return json.dumps(info)

    def __repr__(self) -> str:
        return f"<Validator: {self._address}>"

    def get_address(self) -> str:
        """
        Get validator address

        Returns
        -------
        str
            Validator address
        """
        return self._address

    def add_bls_key(self, key) -> bool:
        """
        Add BLS public key to validator BLS keys if not already in list

        Returns
        -------
        bool
            If adding BLS key succeeded
        """
        key = self._sanitize_input(key)
        if key in self._bls_keys:
            return False
        self._bls_keys.append(key)
        return True

    def remove_bls_key(self, key) -> bool:
        """
        Remove BLS public key from validator BLS keys if exists

        Returns
        -------
        bool
            If removing BLS key succeeded
        """
        key = self._sanitize_input(key)
        if key not in self._bls_keys:
            return False
        self._bls_keys.remove(key)
        return True

    def get_bls_keys(self) -> list:
        """
        Get list of validator BLS keys

        Returns
        -------
        list
            List of validator BLS keys (strings)
        """
        return self._bls_keys

    def add_bls_key_sig(self, key) -> bool:
        """
        Add BLS key signature to validator BLS key signatures
        if not already in list

        Returns
        -------
        bool
            If adding BLS key signature succeeded
        """
        key = self._sanitize_input(key)
        if key in self._bls_key_sigs:
            return False
        self._bls_key_sigs.append(key)
        return True

    def remove_bls_key_sig(self, key) -> bool:
        """
        Remove BLS key signature from validator BLS key signatures if exists

        Returns
        -------
        bool
            If removing BLS key signature succeeded
        """
        key = self._sanitize_input(key)
        if key not in self._bls_key_sigs:
            return False
        self._bls_key_sigs.remove(key)
        return True

    def get_bls_key_sigs(self) -> list:
        """
        Get list of validator BLS key signatures

        Returns
        -------
        list
            List of validator BLS key signatures (strings)
        """
        return self._bls_key_sigs

    def set_name(self, name):
        """
        Set validator name

        Parameters
        ----------
        name: str
            Name of validator

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        self._name = self._check_length(name, self.name_char_limit, "Name")

    def get_name(self) -> str:
        """
        Get validator name

        Returns
        -------
        str
            Validator name
        """
        return self._name

    def set_identity(self, identity):
        """
        Set validator identity

        Parameters
        ----------
        identity: str
            Identity of validator

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        self._identity = self._check_length(
            identity, self.identity_char_limit, "Identity"
        )

    def get_identity(self) -> str:
        """
        Get validator identity

        Returns
        -------
        str
            Validator identity
        """
        return self._identity

    def set_website(self, website):
        """
        Set validator website

        Parameters
        ----------
        website: str
            Website of validator

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        self._website = self._check_length(
            website, self.website_char_limit, "Website"
        )

    def get_website(self) -> str:
        """
        Get validator website

        Returns
        -------
        str
            Validator website
        """
        return self._website

    def set_security_contact(self, contact):
        """
        Set validator security contact

        Parameters
        ----------
        contact: str
            Security contact of validator

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        self._security_contact = self._check_length(
            contact, self.security_contact_char_limit, "Security contact"
        )

    def get_security_contact(self) -> str:
        """
        Get validator security contact

        Returns
        -------
        str
            Validator security contact
        """
        return self._security_contact

    def set_details(self, details):
        """
        Set validator details

        Parameters
        ----------
        details: str
            Details of validator

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        self._details = self._check_length(
            details, self.details_char_limit, "Details"
        )

    def get_details(self) -> str:
        """
        Get validator details

        Returns
        -------
        str
            Validator details
        """
        return self._details

    def set_min_self_delegation(self, delegation):
        """
        Set validator min self delegation

        Parameters
        ----------
        delegation: int
            Minimum self delegation of validator in ATTO

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        delegation = self._to_decimal(delegation, "Min self delegation")
        if delegation < self.min_required_delegation:
            raise InvalidValidatorError(
                3,
                f"Min self delegation must be greater than {self.min_required_delegation} ATTO",
            )
        self._min_self_delegation = delegation

    def get_min_self_delegation(self) -> Decimal:
        """
        Get validator min self delegation

        Returns
        -------
        Decimal
            Validator min self delegation in ATTO
        """
        return self._min_self_delegation

    def set_max_total_delegation(self, max_delegation):
        """
        Set validator max total delegation

        Min self delegation must already be set.

        Parameters
        ----------
        max_delegation: int
            Maximum total delegation of validator in ATTO

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        max_delegation = self._to_decimal(max_delegation, "Max total delegation")
        if self._min_self_delegation is None:
            raise InvalidValidatorError(
                4, "Min self delegation must be set before max total delegation"
            )
        if max_delegation < self._min_self_delegation:
            raise InvalidValidatorError(
                3,
                "Max total delegation must be greater than min self delegation: "
                f"{self._min_self_delegation}",
            )
        self._max_total_delegation = max_delegation

    def get_max_total_delegation(self) -> Decimal:
        """
        Get validator max total delegation

        Returns
        -------
        Decimal
            Validator max total delegation in ATTO
        """
        return self._max_total_delegation

    def set_amount(self, amount):
        """
        Set validator initial delegation amount

        Min self delegation and max total delegation must already be set.

        Parameters
        ----------
        amount: str
            Initial delegation amount of validator in ATTO

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        amount = self._to_decimal(amount, "Amount")

        if self._min_self_delegation is None:
            raise InvalidValidatorError(
                4, "Min self delegation must be set before amount"
            )
        if amount < self._min_self_delegation:
            raise InvalidValidatorError(
                3,
                "Amount must be greater than min self delegation: "
                f"{self._min_self_delegation}",
            )

        if self._max_total_delegation is None:
            raise InvalidValidatorError(
                4, "Max total delegation must be set before amount"
            )
        if amount > self._max_total_delegation:
            raise InvalidValidatorError(
                3,
                "Amount must be less than max total delegation: "
                f"{self._max_total_delegation}",
            )

        self._inital_delegation = amount

    def get_amount(self) -> Decimal:
        """
        Get validator initial delegation amount

        Returns
        -------
        Decimal
            Intended initial delegation amount in ATTO
        """
        return self._inital_delegation

    def set_max_rate(self, rate):
        """
        Set validator max commission rate

        Parameters
        ----------
        rate: str (to avoid precision troubles)
            Max commission rate of validator

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        rate = self._to_decimal(rate, "Max rate", check_str=True)
        if rate < 0 or rate > 1:
            raise InvalidValidatorError(3, "Max rate must be between 0 and 1")
        self._max_rate = rate

    def get_max_rate(self) -> Decimal:
        """
        Get validator max commission rate

        Returns
        -------
        Decimal
            Validator max rate
        """
        return self._max_rate

    def set_max_change_rate(self, rate):
        """
        Set validator max commission change rate

        Max rate must already be set.

        Parameters
        ----------
        rate: str (to avoid precision troubles)
            Max commission change rate of validator

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        rate = self._to_decimal(rate, "Max change rate", check_str=True)
        if rate < 0:
            raise InvalidValidatorError(
                3, "Max change rate must be greater than or equal to 0"
            )
        # Compare against None: a max rate of 0 is valid but falsy
        if self._max_rate is None:
            raise InvalidValidatorError(
                4, "Max rate must be set before max change rate"
            )
        if rate > self._max_rate:
            raise InvalidValidatorError(
                3,
                f"Max change rate must be less than or equal to max rate: {self._max_rate}",
            )
        self._max_change_rate = rate

    def get_max_change_rate(self) -> Decimal:
        """
        Get validator max commission change rate

        Returns
        -------
        Decimal (to avoid precision troubles)
            Validator max change rate
        """
        return self._max_change_rate

    def set_rate(self, rate):
        """
        Set validator commission rate

        Max rate must already be set.

        Parameters
        ----------
        rate: str (to avoid precision troubles)
            Commission rate of validator

        Raises
        ------
        InvalidValidatorError
            If input is invalid
        """
        rate = self._to_decimal(rate, "Rate", check_str=True)
        if rate < 0:
            raise InvalidValidatorError(3, "Rate must be greater than or equal to 0")
        # Compare against None: a max rate of 0 is valid but falsy
        if self._max_rate is None:
            raise InvalidValidatorError(4, "Max rate must be set before rate")
        if rate > self._max_rate:
            raise InvalidValidatorError(
                3, f"Rate must be less than or equal to max rate: {self._max_rate}"
            )
        self._rate = rate

    def get_rate(self) -> Decimal:
        """
        Get validator commission rate

        Returns
        -------
        Decimal
            Validator rate
        """
        return self._rate

    def does_validator_exist(
        self, endpoint=_default_endpoint, timeout=_default_timeout
    ) -> bool:
        """
        Check if validator exists on blockchain

        Parameters
        ----------
        endpoint: :obj:`str`, optional
            Endpoint to send request to
        timeout: :obj:`int`, optional
            Timeout in seconds

        Returns
        -------
        bool
            Does validator exist on chain

        Raises
        ------
        RPCError, RequestsError, RequestsTimeoutError
            If unable to get list of validators on chain
        """
        all_validators = get_all_validator_addresses(endpoint, timeout)
        return self._address in all_validators

    def load(self, info):
        """
        Import validator information

        Fields are applied through the setters in dependency order; if one
        of them raises, the fields set before it keep their new values.

        Parameters
        ----------
        info: dict
            Validator information with dictionary
            Will ignore any extra fields in the input dictionary
            Example input:
                {
                    "name": "",
                    "website": "",
                    "security-contact": "",
                    "identity": "",
                    "details": "",
                    "amount": 0,
                    "min-self-delegation": 0,
                    "max-total-delegation": 0,
                    "rate": '0',
                    "max-rate": '0',
                    "max-change-rate": '0',
                    "bls-public-keys": [ "" ],
                    "bls-key-sigs": [ "" ]
                }

        Raises
        ------
        InvalidValidatorError
            If input value is invalid
        """
        try:
            self.set_name(info["name"])
            self.set_identity(info["identity"])
            self.set_website(info["website"])
            self.set_details(info["details"])
            self.set_security_contact(info["security-contact"])

            # Order matters: each setter below validates against the
            # previously set values
            self.set_min_self_delegation(info["min-self-delegation"])
            self.set_max_total_delegation(info["max-total-delegation"])
            self.set_amount(info["amount"])

            self.set_max_rate(info["max-rate"])
            self.set_max_change_rate(info["max-change-rate"])
            self.set_rate(info["rate"])

            self._bls_keys = []
            for key in info["bls-public-keys"]:
                self.add_bls_key(key)

            self._bls_key_sigs = []
            for key in info["bls-key-sigs"]:
                self.add_bls_key_sig(key)
        except KeyError as e:
            raise InvalidValidatorError(3, "Info has missing key") from e

    def load_from_blockchain(
        self, endpoint=_default_endpoint, timeout=_default_timeout
    ):
        """
        Import validator information from blockchain with given address
        At the moment, this is unable to fetch the BLS Signature, which is not
        implemented in the Node API

        Parameters
        ----------
        endpoint: :obj:`str`, optional
            Endpoint to send request to
        timeout: :obj:`int`, optional
            Timeout in seconds

        Raises
        ------
        InvalidValidatorError
            If any error occur getting & importing validator information
            from the blockchain
        """
        try:
            if not self.does_validator_exist(endpoint, timeout):
                raise InvalidValidatorError(
                    5, f"Validator does not exist on chain according to {endpoint}"
                )
        except (RPCError, RequestsError, RequestsTimeoutError) as e:
            raise InvalidValidatorError(
                5, "Error requesting validator information"
            ) from e
        try:
            validator_info = get_validator_information(self._address, endpoint, timeout)
        except (RPCError, RequestsError, RequestsTimeoutError) as e:
            raise InvalidValidatorError(
                5, "Error requesting validator information"
            ) from e

        # Skip additional sanity checks when importing from chain
        try:
            info = validator_info["validator"]
            self._name = info["name"]
            self._identity = info["identity"]
            self._website = info["website"]
            self._details = info["details"]
            self._security_contact = info["security-contact"]

            self._min_self_delegation = info["min-self-delegation"]
            self._max_total_delegation = info["max-total-delegation"]
            # Since validator exists, the initial delegation is not known
            # here; use the min self delegation as a stand-in
            self._inital_delegation = self._min_self_delegation

            self._max_rate = Decimal(info["max-rate"])
            self._max_change_rate = Decimal(info["max-change-rate"])
            self._rate = Decimal(info["rate"])
            self._bls_keys = info["bls-public-keys"]
        except (KeyError, TypeError, InvalidOperation) as e:
            # TypeError / InvalidOperation cover a malformed RPC result
            # (e.g. non-dict payload or unparsable rate)
            raise InvalidValidatorError(
                5, "Error importing validator information from RPC result"
            ) from e

    def export(self) -> dict:
        """
        Export validator information as dict

        Returns
        -------
        dict
            Dictionary representation of validator
            (a new dict; the BLS lists are the validator's own lists)
        """
        return {
            "validator-addr": self._address,
            "name": self._name,
            "website": self._website,
            "security-contact": self._security_contact,
            "identity": self._identity,
            "details": self._details,
            "amount": self._inital_delegation,
            "min-self-delegation": self._min_self_delegation,
            "max-total-delegation": self._max_total_delegation,
            "rate": self._rate,
            "max-rate": self._max_rate,
            "max-change-rate": self._max_change_rate,
            "bls-public-keys": self._bls_keys,
            "bls-key-sigs": self._bls_key_sigs,
        }

    def sign_create_validator_transaction(
        self, nonce, gas_price, gas_limit, private_key, chain_id=None
    ) -> SignedTransaction:
        """
        Create but not post a transaction to Create the Validator using private_key

        Returns
        -------
        SignedTransaction object, the hash of which can be used to send the transaction
            using transaction.send_raw_transaction

        Raises
        ------
        rlp.exceptions.ObjectSerializationError for malformed inputs

        API Reference
        -------------
        https://github.com/harmony-one/sdk/blob/99a827782fabcd5f91f025af0d8de228956d42b4/packages/harmony-staking/src/stakingTransaction.ts#L413
        """
        info = self.export().copy()
        info["directive"] = Directive.CreateValidator
        info["validatorAddress"] = info.pop("validator-addr")  # change the key
        info["nonce"] = nonce
        info["gasPrice"] = gas_price
        info["gasLimit"] = gas_limit
        if chain_id:
            info["chainId"] = chain_id
        return sign_staking_transaction(info, private_key)

    def sign_edit_validator_transaction(
        self,
        nonce,
        gas_price,
        gas_limit,
        rate,
        bls_key_to_remove,
        bls_key_to_add,
        bls_key_to_add_sig,
        private_key,
        chain_id=None,
    ) -> SignedTransaction:
        """
        Create but not post a transaction to Edit the Validator using private_key

        Note that this updates the validator object itself: the rate is set
        and the BLS key list is modified before signing.

        Returns
        -------
        SignedTransaction object, the hash of which can be used to send the transaction
            using transaction.send_raw_transaction

        Raises
        ------
        InvalidValidatorError
            If rate is invalid
        rlp.exceptions.ObjectSerializationError for malformed inputs

        API Reference
        -------------
        https://github.com/harmony-one/sdk/blob/99a827782fabcd5f91f025af0d8de228956d42b4/packages/harmony-staking/src/stakingTransaction.ts#L460
        """
        self.set_rate(rate)
        self.add_bls_key(bls_key_to_add)
        self.remove_bls_key(bls_key_to_remove)
        info = self.export().copy()
        info["directive"] = Directive.EditValidator
        info["validatorAddress"] = info.pop("validator-addr")  # change the key
        info["nonce"] = nonce
        info["gasPrice"] = gas_price
        info["gasLimit"] = gas_limit
        # Fields that are not part of an edit validator transaction
        for unused_key in ("max-rate", "max-change-rate", "bls-public-keys", "amount"):
            info.pop(unused_key)
        info["bls-key-to-remove"] = bls_key_to_remove
        info["bls-key-to-add"] = bls_key_to_add
        info["bls-key-to-add-sig"] = bls_key_to_add_sig
        if chain_id:
            info["chainId"] = chain_id
        return sign_staking_transaction(info, private_key)