diff --git a/mythril/interfaces/cli.py b/mythril/interfaces/cli.py index 5334eb4b..9b0166f1 100644 --- a/mythril/interfaces/cli.py +++ b/mythril/interfaces/cli.py @@ -15,7 +15,12 @@ import coloredlogs import mythril.support.signatures as sigs from mythril.exceptions import AddressNotFoundError, CriticalError -from mythril.mythril import Mythril +from mythril.mythril import ( + MythrilAnalyzer, + MythrilDisassembler, + MythrilConfig, + MythrilLevelDB, +) from mythril.version import VERSION # logging.basicConfig(level=logging.DEBUG) @@ -42,6 +47,19 @@ def main(): parser = argparse.ArgumentParser( description="Security analysis of Ethereum smart contracts" ) + create_parser(parser) + + # Get config values + + args = parser.parse_args() + parse_args(parser=parser, args=args) + + +if __name__ == "__main__": + main() + + +def create_parser(parser): parser.add_argument("solidity_file", nargs="*") commands = parser.add_argument_group("commands") @@ -227,10 +245,8 @@ def main(): ) parser.add_argument("--epic", action="store_true", help=argparse.SUPPRESS) - # Get config values - - args = parser.parse_args() +def parse_args(parser, args): if args.epic: path = os.path.dirname(os.path.realpath(__file__)) sys.argv.remove("--epic") @@ -280,7 +296,7 @@ def main(): ) if args.query_signature: - if sigs.ethereum_input_decoder == None: + if sigs.ethereum_input_decoder is None: exit_with_error( args.outform, "The --query-signature function requires the python package ethereum-input-decoder", @@ -300,55 +316,57 @@ def main(): # -- commands -- if args.hash: - print(Mythril.hash_for_function_signature(args.hash)) + print(MythrilDisassembler.hash_for_function_signature(args.hash)) sys.exit() try: # the mythril object should be our main interface # infura = None, rpc = None, rpctls = None # solc_args = None, dynld = None, max_recursion_depth = 12): - - mythril = Mythril( - solv=args.solv, - dynld=args.dynld, - onchain_storage_access=(not args.no_onchain_storage_access), - solc_args=args.solc_args, - enable_online_lookup=args.query_signature, - ) + config = MythrilConfig() if ( args.dynld or not args.no_onchain_storage_access and not (args.rpc or args.i) ): - mythril.set_api_from_config_path() + config.set_api_from_config_path() if args.address: # Establish RPC connection if necessary - mythril.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls) + config.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls) elif args.search or args.contract_hash_to_address: # Open LevelDB if necessary - mythril.set_api_leveldb( - mythril.leveldb_dir if not args.leveldb_dir else args.leveldb_dir + config.set_api_leveldb( + config.leveldb_dir if not args.leveldb_dir else args.leveldb_dir ) - if args.search: - # Database search ops - mythril.search_db(args.search) - sys.exit() + if args.search or args.contract_hash_to_address: + leveldb_sercher = MythrilLevelDB(config.leveldb_dir) + if args.search: + # Database search ops + leveldb_sercher.search_db(args.search) - if args.contract_hash_to_address: - # search corresponding address - try: - mythril.contract_hash_to_address(args.contract_hash_to_address) - except AddressNotFoundError: - print("Address not found.") + else: + # search corresponding address + try: + leveldb_sercher.contract_hash_to_address( + args.contract_hash_to_address + ) + except AddressNotFoundError: + print("Address not found.") sys.exit() + dissasembler = MythrilDisassembler( + eth=config.eth, + solc_version=args.solv, + solc_args=args.solc_args, + enable_online_lookup=args.query_signature, + ) if args.truffle: try: # not really pythonic atm. needs refactoring - mythril.analyze_truffle_project(args) + dissasembler.analyze_truffle_project(args) except FileNotFoundError: print( "Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully." @@ -361,14 +379,14 @@ def main(): if args.code: # Load from bytecode code = args.code[2:] if args.code.startswith("0x") else args.code - address, _ = mythril.load_from_bytecode(code, args.bin_runtime) + address, _ = dissasembler.load_from_bytecode(code, args.bin_runtime) elif args.codefile: bytecode = "".join([l.strip() for l in args.codefile if len(l.strip()) > 0]) bytecode = bytecode[2:] if bytecode.startswith("0x") else bytecode - address, _ = mythril.load_from_bytecode(bytecode, args.bin_runtime) + address, _ = dissasembler.load_from_bytecode(bytecode, args.bin_runtime) elif args.address: # Get bytecode from a contract address - address, _ = mythril.load_from_address(args.address) + address, _ = dissasembler.load_from_address(args.address) elif args.solidity_file: # Compile Solidity source file(s) if args.graph and len(args.solidity_file) > 1: @@ -376,13 +394,20 @@ def main(): args.outform, "Cannot generate call graphs from multiple input files. Please do it one at a time.", ) - address, _ = mythril.load_from_solidity(args.solidity_file) # list of files + address, _ = dissasembler.load_from_solidity( + args.solidity_file + ) # list of files else: exit_with_error( args.outform, "No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, or -i SOLIDITY_FILES", ) + analyzer = MythrilAnalyzer( + disassembler=dissasembler, + dynld=args.dynld, + onchain_storage_access=not args.no_onchain_storage_access, + ) # Commands if args.storage: @@ -392,7 +417,7 @@ def main(): "To read storage, provide the address of a deployed contract with the -a option.", ) - storage = mythril.get_state_variable_from_storage( + storage = dissasembler.get_state_variable_from_storage( address=address, params=[a.strip() for a in args.storage.strip().split(",")], ) @@ -401,21 +426,21 @@ def main(): elif args.disassemble: # or mythril.disassemble(mythril.contracts[0]) - if mythril.contracts[0].code: - print("Runtime Disassembly: \n" + mythril.contracts[0].get_easm()) - if mythril.contracts[0].creation_code: - print("Disassembly: \n" + mythril.contracts[0].get_creation_easm()) + if dissasembler.contracts[0].code: + print("Runtime Disassembly: \n" + dissasembler.contracts[0].get_easm()) + if dissasembler.contracts[0].creation_code: + print("Disassembly: \n" + dissasembler.contracts[0].get_creation_easm()) elif args.graph or args.fire_lasers: - if not mythril.contracts: + if not dissasembler.contracts: exit_with_error( args.outform, "input files do not contain any valid contracts" ) if args.graph: - html = mythril.graph_html( + html = analyzer.graph_html( strategy=args.strategy, - contract=mythril.contracts[0], + contract=analyzer.contracts[0], address=address, enable_physics=args.enable_physics, phrackify=args.phrack, @@ -433,7 +458,7 @@ def main(): else: try: - report = mythril.fire_lasers( + report = analyzer.fire_lasers( strategy=args.strategy, address=address, modules=[m.strip() for m in args.modules.strip().split(",")] @@ -460,14 +485,14 @@ def main(): elif args.statespace_json: - if not mythril.contracts: + if not analyzer.contracts: exit_with_error( args.outform, "input files do not contain any valid contracts" ) - statespace = mythril.dump_statespace( + statespace = analyzer.dump_statespace( strategy=args.strategy, - contract=mythril.contracts[0], + contract=analyzer.contracts[0], address=address, max_depth=args.max_depth, execution_timeout=args.execution_timeout, @@ -486,7 +511,3 @@ def main(): except CriticalError as ce: exit_with_error(args.outform, str(ce)) - - -if __name__ == "__main__": - main() diff --git a/mythril/mythril.py b/mythril/mythril.py deleted file mode 100644 index 5c08ab8a..00000000 --- a/mythril/mythril.py +++ /dev/null @@ -1,714 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -"""mythril.py: Bug hunting on the Ethereum blockchain - - http://www.github.com/b-mueller/mythril -""" - -import codecs -import logging -import os -import platform -import re -import traceback -from pathlib import Path -from shutil import copyfile -from configparser import ConfigParser - -import solc -from ethereum import utils -from solc.exceptions import SolcError - -from mythril.ethereum import util -from mythril.ethereum.evmcontract import EVMContract -from mythril.ethereum.interface.rpc.client import EthJsonRpc -from mythril.ethereum.interface.rpc.exceptions import ConnectionError -from mythril.solidity.soliditycontract import SolidityContract, get_contracts_from_file -from mythril.support import signatures -from mythril.support.source_support import Source -from mythril.support.loader import DynLoader -from mythril.exceptions import CompilerError, NoContractFoundError, CriticalError -from mythril.analysis.symbolic import SymExecWrapper -from mythril.analysis.callgraph import generate_graph -from mythril.analysis.traceexplore import get_serializable_statespace -from mythril.analysis.security import fire_lasers, retrieve_callback_issues -from mythril.analysis.report import Report -from mythril.support.truffle import analyze_truffle_project -from mythril.ethereum.interface.leveldb.client import EthLevelDB - -log = logging.getLogger(__name__) - - -class Mythril(object): - """Mythril main interface class. - - 1. create mythril object - 2. set rpc or leveldb interface if needed - 3. load contracts (from solidity, bytecode, address) - 4. fire_lasers - - .. code-block:: python - - mythril = Mythril() - mythril.set_api_rpc_infura() - - # (optional) other API adapters - mythril.set_api_rpc(args) - mythril.set_api_rpc_localhost() - mythril.set_api_leveldb(path) - - # (optional) other func - mythril.analyze_truffle_project(args) - mythril.search_db(args) - - # load contract - mythril.load_from_bytecode(bytecode) - mythril.load_from_address(address) - mythril.load_from_solidity(solidity_file) - - # analyze - print(mythril.fire_lasers(args).as_text()) - - # (optional) graph - for contract in mythril.contracts: - # prints html or save it to file - print(mythril.graph_html(args)) - - # (optional) other funcs - mythril.dump_statespaces(args) - mythril.disassemble(contract) - mythril.get_state_variable_from_storage(args) - """ - - def __init__( - self, - solv=None, - solc_args=None, - dynld=False, - enable_online_lookup=False, - onchain_storage_access=True, - ): - - self.solv = solv - self.solc_args = solc_args - self.dynld = dynld - self.onchain_storage_access = onchain_storage_access - self.enable_online_lookup = enable_online_lookup - - self.mythril_dir = self._init_mythril_dir() - - # tries mythril_dir/signatures.db by default (provide path= arg to make this configurable) - self.sigs = signatures.SignatureDB( - enable_online_lookup=self.enable_online_lookup - ) - - self.solc_binary = self._init_solc_binary(solv) - self.config_path = os.path.join(self.mythril_dir, "config.ini") - self.leveldb_dir = self._init_config() - - self.eth = None # ethereum API client - self.eth_db = None # ethereum LevelDB client - - self.contracts = [] # loaded contracts - - @staticmethod - def _init_mythril_dir(): - try: - mythril_dir = os.environ["MYTHRIL_DIR"] - except KeyError: - mythril_dir = os.path.join(os.path.expanduser("~"), ".mythril") - - if not os.path.exists(mythril_dir): - # Initialize data directory - log.info("Creating mythril data directory") - os.mkdir(mythril_dir) - - db_path = str(Path(mythril_dir) / "signatures.db") - if not os.path.exists(db_path): - # if the default mythril dir doesn't contain a signature DB - # initialize it with the default one from the project root - asset_dir = Path(__file__).parent / "support" / "assets" - copyfile(str(asset_dir / "signatures.db"), db_path) - - return mythril_dir - - def _init_config(self): - """If no config file exists, create it and add default options. - - Default LevelDB path is specified based on OS - dynamic loading is set to infura by default in the file - Returns: leveldb directory - """ - - system = platform.system().lower() - leveldb_fallback_dir = os.path.expanduser("~") - if system.startswith("darwin"): - leveldb_fallback_dir = os.path.join( - leveldb_fallback_dir, "Library", "Ethereum" - ) - elif system.startswith("windows"): - leveldb_fallback_dir = os.path.join( - leveldb_fallback_dir, "AppData", "Roaming", "Ethereum" - ) - else: - leveldb_fallback_dir = os.path.join(leveldb_fallback_dir, ".ethereum") - leveldb_fallback_dir = os.path.join(leveldb_fallback_dir, "geth", "chaindata") - - if not os.path.exists(self.config_path): - log.info("No config file found. Creating default: " + self.config_path) - open(self.config_path, "a").close() - - config = ConfigParser(allow_no_value=True) - config.optionxform = str - config.read(self.config_path, "utf-8") - if "defaults" not in config.sections(): - self._add_default_options(config) - - if not config.has_option("defaults", "leveldb_dir"): - self._add_leveldb_option(config, leveldb_fallback_dir) - - if not config.has_option("defaults", "dynamic_loading"): - self._add_dynamic_loading_option(config) - - with codecs.open(self.config_path, "w", "utf-8") as fp: - config.write(fp) - - leveldb_dir = config.get( - "defaults", "leveldb_dir", fallback=leveldb_fallback_dir - ) - return os.path.expanduser(leveldb_dir) - - @staticmethod - def _add_default_options(config): - config.add_section("defaults") - - @staticmethod - def _add_leveldb_option(config, leveldb_fallback_dir): - config.set("defaults", "#Default chaindata locations:") - config.set("defaults", "#– Mac: ~/Library/Ethereum/geth/chaindata") - config.set("defaults", "#– Linux: ~/.ethereum/geth/chaindata") - config.set( - "defaults", - "#– Windows: %USERPROFILE%\\AppData\\Roaming\\Ethereum\\geth\\chaindata", - ) - config.set("defaults", "leveldb_dir", leveldb_fallback_dir) - - @staticmethod - def _add_dynamic_loading_option(config): - config.set("defaults", "#– To connect to Infura use dynamic_loading: infura") - config.set( - "defaults", - "#– To connect to Rpc use " - "dynamic_loading: HOST:PORT / ganache / infura-[network_name]", - ) - config.set( - "defaults", "#– To connect to local host use dynamic_loading: localhost" - ) - config.set("defaults", "dynamic_loading", "infura") - - def analyze_truffle_project(self, *args, **kwargs): - """ - - :param args: - :param kwargs: - :return: - """ - return analyze_truffle_project( - self.sigs, *args, **kwargs - ) # just passthru by passing signatures for now - - @staticmethod - def _init_solc_binary(version): - """Figure out solc binary and version. - - Only proper versions are supported. No nightlies, commits etc (such as available in remix). - """ - - if not version: - return os.environ.get("SOLC") or "solc" - - # tried converting input to semver, seemed not necessary so just slicing for now - main_version = solc.main.get_solc_version_string() - main_version_number = re.match(r"\d+.\d+.\d+", main_version) - if main_version is None: - raise CriticalError( - "Could not extract solc version from string {}".format(main_version) - ) - if version == main_version_number: - log.info("Given version matches installed version") - solc_binary = os.environ.get("SOLC") or "solc" - else: - solc_binary = util.solc_exists(version) - if solc_binary: - log.info("Given version is already installed") - else: - try: - solc.install_solc("v" + version) - solc_binary = util.solc_exists(version) - if not solc_binary: - raise SolcError() - except SolcError: - raise CriticalError( - "There was an error when trying to install the specified solc version" - ) - - log.info("Setting the compiler to %s", solc_binary) - - return solc_binary - - def set_api_leveldb(self, leveldb): - """ - - :param leveldb: - :return: - """ - self.eth_db = EthLevelDB(leveldb) - self.eth = self.eth_db - return self.eth - - def set_api_rpc_infura(self): - """Set the RPC mode to INFURA on mainnet.""" - self.eth = EthJsonRpc("mainnet.infura.io", 443, True) - log.info("Using INFURA for RPC queries") - - def set_api_rpc(self, rpc=None, rpctls=False): - """ - - :param rpc: - :param rpctls: - """ - if rpc == "ganache": - rpcconfig = ("localhost", 8545, False) - else: - m = re.match(r"infura-(.*)", rpc) - if m and m.group(1) in ["mainnet", "rinkeby", "kovan", "ropsten"]: - rpcconfig = (m.group(1) + ".infura.io", 443, True) - else: - try: - host, port = rpc.split(":") - rpcconfig = (host, int(port), rpctls) - except ValueError: - raise CriticalError( - "Invalid RPC argument, use 'ganache', 'infura-[network]' or 'HOST:PORT'" - ) - - if rpcconfig: - self.eth = EthJsonRpc(rpcconfig[0], int(rpcconfig[1]), rpcconfig[2]) - log.info("Using RPC settings: %s" % str(rpcconfig)) - else: - raise CriticalError("Invalid RPC settings, check help for details.") - - def set_api_rpc_localhost(self): - """Set the RPC mode to a local instance.""" - self.eth = EthJsonRpc("localhost", 8545) - log.info("Using default RPC settings: http://localhost:8545") - - def set_api_from_config_path(self): - """Set the RPC mode based on a given config file.""" - config = ConfigParser(allow_no_value=False) - config.optionxform = str - config.read(self.config_path, "utf-8") - if config.has_option("defaults", "dynamic_loading"): - dynamic_loading = config.get("defaults", "dynamic_loading") - else: - dynamic_loading = "infura" - if dynamic_loading == "infura": - self.set_api_rpc_infura() - elif dynamic_loading == "localhost": - self.set_api_rpc_localhost() - else: - self.set_api_rpc(dynamic_loading) - - def search_db(self, search): - """ - - :param search: - """ - - def search_callback(_, address, balance): - """ - - :param _: - :param address: - :param balance: - """ - print("Address: " + address + ", balance: " + str(balance)) - - try: - self.eth_db.search(search, search_callback) - - except SyntaxError: - raise CriticalError("Syntax error in search expression.") - - def contract_hash_to_address(self, hash): - """ - - :param hash: - """ - if not re.match(r"0x[a-fA-F0-9]{64}", hash): - raise CriticalError("Invalid address hash. Expected format is '0x...'.") - - print(self.eth_db.contract_hash_to_address(hash)) - - def load_from_bytecode(self, code, bin_runtime=False, address=None): - """ - - :param code: - :param bin_runtime: - :param address: - :return: - """ - if address is None: - address = util.get_indexed_address(0) - if bin_runtime: - self.contracts.append( - EVMContract( - code=code, - name="MAIN", - enable_online_lookup=self.enable_online_lookup, - ) - ) - else: - self.contracts.append( - EVMContract( - creation_code=code, - name="MAIN", - enable_online_lookup=self.enable_online_lookup, - ) - ) - return address, self.contracts[-1] # return address and contract object - - def load_from_address(self, address): - """ - - :param address: - :return: - """ - if not re.match(r"0x[a-fA-F0-9]{40}", address): - raise CriticalError("Invalid contract address. Expected format is '0x...'.") - - try: - code = self.eth.eth_getCode(address) - except FileNotFoundError as e: - raise CriticalError("IPC error: " + str(e)) - except ConnectionError: - raise CriticalError( - "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly." - ) - except Exception as e: - raise CriticalError("IPC / RPC error: " + str(e)) - else: - if code == "0x" or code == "0x0": - raise CriticalError( - "Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain." - ) - else: - self.contracts.append( - EVMContract( - code, - name=address, - enable_online_lookup=self.enable_online_lookup, - ) - ) - return address, self.contracts[-1] # return address and contract object - - def load_from_solidity(self, solidity_files): - """ - - :param solidity_files: - :return: - """ - address = util.get_indexed_address(0) - contracts = [] - for file in solidity_files: - if ":" in file: - file, contract_name = file.split(":") - else: - contract_name = None - - file = os.path.expanduser(file) - - try: - # import signatures from solidity source - self.sigs.import_solidity_file( - file, solc_binary=self.solc_binary, solc_args=self.solc_args - ) - if contract_name is not None: - contract = SolidityContract( - input_file=file, - name=contract_name, - solc_args=self.solc_args, - solc_binary=self.solc_binary, - ) - self.contracts.append(contract) - contracts.append(contract) - else: - for contract in get_contracts_from_file( - input_file=file, - solc_args=self.solc_args, - solc_binary=self.solc_binary, - ): - self.contracts.append(contract) - contracts.append(contract) - - except FileNotFoundError: - raise CriticalError("Input file not found: " + file) - except CompilerError as e: - raise CriticalError(e) - except NoContractFoundError: - log.error( - "The file " + file + " does not contain a compilable contract." - ) - - return address, contracts - - def dump_statespace( - self, - strategy, - contract, - address=None, - max_depth=None, - execution_timeout=None, - create_timeout=None, - enable_iprof=False, - ): - """ - - :param strategy: - :param contract: - :param address: - :param max_depth: - :param execution_timeout: - :param create_timeout: - :return: - """ - sym = SymExecWrapper( - contract, - address, - strategy, - dynloader=DynLoader( - self.eth, - storage_loading=self.onchain_storage_access, - contract_loading=self.dynld, - ), - max_depth=max_depth, - execution_timeout=execution_timeout, - create_timeout=create_timeout, - enable_iprof=enable_iprof, - ) - - return get_serializable_statespace(sym) - - def graph_html( - self, - strategy, - contract, - address, - max_depth=None, - enable_physics=False, - phrackify=False, - execution_timeout=None, - create_timeout=None, - enable_iprof=False, - ): - """ - - :param strategy: - :param contract: - :param address: - :param max_depth: - :param enable_physics: - :param phrackify: - :param execution_timeout: - :param create_timeout: - :return: - """ - sym = SymExecWrapper( - contract, - address, - strategy, - dynloader=DynLoader( - self.eth, - storage_loading=self.onchain_storage_access, - contract_loading=self.dynld, - ), - max_depth=max_depth, - execution_timeout=execution_timeout, - create_timeout=create_timeout, - enable_iprof=enable_iprof, - ) - return generate_graph(sym, physics=enable_physics, phrackify=phrackify) - - def fire_lasers( - self, - strategy, - contracts=None, - address=None, - modules=None, - verbose_report=False, - max_depth=None, - execution_timeout=None, - create_timeout=None, - transaction_count=None, - enable_iprof=False, - ): - """ - - :param strategy: - :param contracts: - :param address: - :param modules: - :param verbose_report: - :param max_depth: - :param execution_timeout: - :param create_timeout: - :param transaction_count: - :return: - """ - all_issues = [] - for contract in contracts or self.contracts: - try: - sym = SymExecWrapper( - contract, - address, - strategy, - dynloader=DynLoader( - self.eth, - storage_loading=self.onchain_storage_access, - contract_loading=self.dynld, - ), - max_depth=max_depth, - execution_timeout=execution_timeout, - create_timeout=create_timeout, - transaction_count=transaction_count, - modules=modules, - compulsory_statespace=False, - enable_iprof=enable_iprof, - ) - issues = fire_lasers(sym, modules) - except KeyboardInterrupt: - log.critical("Keyboard Interrupt") - issues = retrieve_callback_issues(modules) - except Exception: - log.critical( - "Exception occurred, aborting analysis. Please report this issue to the Mythril GitHub page.\n" - + traceback.format_exc() - ) - issues = retrieve_callback_issues(modules) - - for issue in issues: - issue.add_code_info(contract) - - all_issues += issues - - source_data = Source() - source_data.get_source_from_contracts_list(self.contracts) - # Finally, output the results - report = Report(verbose_report, source_data) - for issue in all_issues: - report.append_issue(issue) - - return report - - def get_state_variable_from_storage(self, address, params=None): - """ - - :param address: - :param params: - :return: - """ - if params is None: - params = [] - (position, length, mappings) = (0, 1, []) - try: - if params[0] == "mapping": - if len(params) < 3: - raise CriticalError("Invalid number of parameters.") - position = int(params[1]) - position_formatted = utils.zpad(utils.int_to_big_endian(position), 32) - for i in range(2, len(params)): - key = bytes(params[i], "utf8") - key_formatted = utils.rzpad(key, 32) - mappings.append( - int.from_bytes( - utils.sha3(key_formatted + position_formatted), - byteorder="big", - ) - ) - - length = len(mappings) - if length == 1: - position = mappings[0] - - else: - if len(params) >= 4: - raise CriticalError("Invalid number of parameters.") - - if len(params) >= 1: - position = int(params[0]) - if len(params) >= 2: - length = int(params[1]) - if len(params) == 3 and params[2] == "array": - position_formatted = utils.zpad( - utils.int_to_big_endian(position), 32 - ) - position = int.from_bytes( - utils.sha3(position_formatted), byteorder="big" - ) - - except ValueError: - raise CriticalError( - "Invalid storage index. Please provide a numeric value." - ) - - outtxt = [] - - try: - if length == 1: - outtxt.append( - "{}: {}".format( - position, self.eth.eth_getStorageAt(address, position) - ) - ) - else: - if len(mappings) > 0: - for i in range(0, len(mappings)): - position = mappings[i] - outtxt.append( - "{}: {}".format( - hex(position), - self.eth.eth_getStorageAt(address, position), - ) - ) - else: - for i in range(position, position + length): - outtxt.append( - "{}: {}".format( - hex(i), self.eth.eth_getStorageAt(address, i) - ) - ) - except FileNotFoundError as e: - raise CriticalError("IPC error: " + str(e)) - except ConnectionError: - raise CriticalError( - "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly." - ) - return "\n".join(outtxt) - - @staticmethod - def disassemble(contract): - """ - - :param contract: - :return: - """ - return contract.get_easm() - - @staticmethod - def hash_for_function_signature(sig): - """ - - :param sig: - :return: - """ - return "0x%s" % utils.sha3(sig)[:4].hex() diff --git a/mythril/mythril/__init__.py b/mythril/mythril/__init__.py new file mode 100644 index 00000000..189a7812 --- /dev/null +++ b/mythril/mythril/__init__.py @@ -0,0 +1,4 @@ +from .mythril_disassembler import MythrilDisassembler +from .mythril_analyzer import MythrilAnalyzer +from .mythril_config import MythrilConfig +from .mythril_leveldb import MythrilLevelDB diff --git a/mythril/mythril/mythril_analyzer.py b/mythril/mythril/mythril_analyzer.py new file mode 100644 index 00000000..e862e090 --- /dev/null +++ b/mythril/mythril/mythril_analyzer.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""mythril.py: Bug hunting on the Ethereum blockchain + + http://www.github.com/b-mueller/mythril +""" + +import logging +import traceback +from mythril.support.source_support import Source +from mythril.support.loader import DynLoader +from mythril.analysis.symbolic import SymExecWrapper +from mythril.analysis.callgraph import generate_graph +from mythril.analysis.traceexplore import get_serializable_statespace +from mythril.analysis.security import fire_lasers, retrieve_callback_issues +from mythril.analysis.report import Report + +log = logging.getLogger(__name__) + + +class MythrilAnalyzer(object): + """Mythril main interface class. + .. code-block:: python + + mythril = Mythril() + + # analyze + print(mythril.fire_lasers(args).as_text()) + + # (optional) graph + for contract in mythril.contracts: + # prints html or save it to file + print(mythril.graph_html(args)) + + # (optional) other funcs + mythril.dump_statespaces(args) + mythril.disassemble(contract) + mythril.get_state_variable_from_storage(args) + """ + + def __init__(self, disassembler, dynld=False, onchain_storage_access=True): + self.eth = disassembler.eth + self.contracts = disassembler.contracts or [] + self.enable_online_lookup = disassembler.enable_online_lookup + self.dynld = dynld + self.onchain_storage_access = onchain_storage_access + + def dump_statespace( + self, + strategy, + contract, + address=None, + max_depth=None, + execution_timeout=None, + create_timeout=None, + enable_iprof=False, + ): + """ + + :param strategy: + :param contract: + :param address: + :param max_depth: + :param execution_timeout: + :param create_timeout: + :param enable_iprof: + :return: + """ + sym = SymExecWrapper( + contract, + address, + strategy, + dynloader=DynLoader( + self.eth, + storage_loading=self.onchain_storage_access, + contract_loading=self.dynld, + ), + max_depth=max_depth, + execution_timeout=execution_timeout, + create_timeout=create_timeout, + enable_iprof=enable_iprof, + ) + + return get_serializable_statespace(sym) + + def graph_html( + self, + strategy, + contract, + address, + max_depth=None, + enable_physics=False, + phrackify=False, + execution_timeout=None, + create_timeout=None, + enable_iprof=False, + ): + """ + + :param strategy: + :param contract: + :param address: + :param max_depth: + :param enable_physics: + :param phrackify: + :param execution_timeout: + :param create_timeout: + :param enable_iprof: + :return: + """ + sym = SymExecWrapper( + contract, + address, + strategy, + dynloader=DynLoader( + self.eth, + storage_loading=self.onchain_storage_access, + contract_loading=self.dynld, + ), + max_depth=max_depth, + execution_timeout=execution_timeout, + create_timeout=create_timeout, + enable_iprof=enable_iprof, + ) + return generate_graph(sym, physics=enable_physics, phrackify=phrackify) + + def fire_lasers( + self, + strategy, + contracts=None, + address=None, + modules=None, + verbose_report=False, + max_depth=None, + execution_timeout=None, + create_timeout=None, + transaction_count=None, + enable_iprof=False, + ): + """ + + :param strategy: + :param contracts: + :param address: + :param modules: + :param verbose_report: + :param max_depth: + :param execution_timeout: + :param create_timeout: + :param transaction_count: + :param enable_iprof: + :return: + """ + all_issues = [] + for contract in contracts or self.contracts: + try: + sym = SymExecWrapper( + contract, + address, + strategy, + dynloader=DynLoader( + self.eth, + storage_loading=self.onchain_storage_access, + contract_loading=self.dynld, + ), + max_depth=max_depth, + execution_timeout=execution_timeout, + create_timeout=create_timeout, + transaction_count=transaction_count, + modules=modules, + compulsory_statespace=False, + enable_iprof=enable_iprof, + ) + issues = fire_lasers(sym, modules) + except KeyboardInterrupt: + log.critical("Keyboard Interrupt") + issues = retrieve_callback_issues(modules) + except Exception: + log.critical( + "Exception occurred, aborting analysis. Please report this issue to the Mythril GitHub page.\n" + + traceback.format_exc() + ) + issues = retrieve_callback_issues(modules) + + for issue in issues: + issue.add_code_info(contract) + + all_issues += issues + + source_data = Source() + source_data.get_source_from_contracts_list(self.contracts) + # Finally, output the results + report = Report(verbose_report, source_data) + for issue in all_issues: + report.append_issue(issue) + + return report diff --git a/mythril/mythril/mythril_config.py b/mythril/mythril/mythril_config.py new file mode 100644 index 00000000..e52a079c --- /dev/null +++ b/mythril/mythril/mythril_config.py @@ -0,0 +1,183 @@ +import codecs +import logging +import os +import platform +import re + +from pathlib import Path +from shutil import copyfile +from configparser import ConfigParser + +from mythril.exceptions import CriticalError +from mythril.ethereum.interface.rpc.client import EthJsonRpc +from mythril.ethereum.interface.leveldb.client import EthLevelDB + +log = logging.getLogger(__name__) + + +class MythrilConfig(object): + def __init__(self): + self.mythril_dir = self._init_mythril_dir() + self.config_path = os.path.join(self.mythril_dir, "config.ini") + self.leveldb_dir = self._init_config() + self.eth = None + + @staticmethod + def _init_mythril_dir(): + try: + mythril_dir = os.environ["MYTHRIL_DIR"] + except KeyError: + mythril_dir = os.path.join(os.path.expanduser("~"), ".mythril") + + if not os.path.exists(mythril_dir): + # Initialize data directory + log.info("Creating mythril data directory") + os.mkdir(mythril_dir) + + db_path = str(Path(mythril_dir) / "signatures.db") + if not os.path.exists(db_path): + # if the default mythril dir doesn't contain a signature DB + # initialize it with the default one from the project root + asset_dir = Path(__file__).parent / "support" / "assets" + copyfile(str(asset_dir / "signatures.db"), db_path) + + return mythril_dir + + def _init_config(self): + """If no config file exists, create it and add default options. + + Default LevelDB path is specified based on OS + dynamic loading is set to infura by default in the file + Returns: leveldb directory + """ + + leveldb_fallback_dir = self._get_fallback_dir() + + if not os.path.exists(self.config_path): + log.info("No config file found. Creating default: " + self.config_path) + open(self.config_path, "a").close() + + config = ConfigParser(allow_no_value=True) + config.optionxform = str + config.read(self.config_path, "utf-8") + if "defaults" not in config.sections(): + self._add_default_options(config) + + if not config.has_option("defaults", "leveldb_dir"): + self._add_leveldb_option(config, leveldb_fallback_dir) + + if not config.has_option("defaults", "dynamic_loading"): + self._add_dynamic_loading_option(config) + + with codecs.open(self.config_path, "w", "utf-8") as fp: + config.write(fp) + + leveldb_dir = config.get( + "defaults", "leveldb_dir", fallback=leveldb_fallback_dir + ) + return os.path.expanduser(leveldb_dir) + + @staticmethod + def _get_fallback_dir(): + system = platform.system().lower() + leveldb_fallback_dir = os.path.expanduser("~") + if system.startswith("darwin"): + leveldb_fallback_dir = os.path.join( + leveldb_fallback_dir, "Library", "Ethereum" + ) + elif system.startswith("windows"): + leveldb_fallback_dir = os.path.join( + leveldb_fallback_dir, "AppData", "Roaming", "Ethereum" + ) + else: + leveldb_fallback_dir = os.path.join(leveldb_fallback_dir, ".ethereum") + return os.path.join(leveldb_fallback_dir, "geth", "chaindata") + + @staticmethod + def _add_default_options(config): + config.add_section("defaults") + + @staticmethod + def _add_leveldb_option(config, leveldb_fallback_dir): + config.set("defaults", "#Default chaindata locations:") + config.set("defaults", "#– Mac: ~/Library/Ethereum/geth/chaindata") + config.set("defaults", "#– Linux: ~/.ethereum/geth/chaindata") + config.set( + "defaults", + "#– Windows: %USERPROFILE%\\AppData\\Roaming\\Ethereum\\geth\\chaindata", + ) + config.set("defaults", "leveldb_dir", leveldb_fallback_dir) + + @staticmethod + def _add_dynamic_loading_option(config): + config.set("defaults", "#– To connect to Infura use dynamic_loading: infura") + config.set( + "defaults", + "#– To connect to Rpc use " + "dynamic_loading: HOST:PORT / ganache / infura-[network_name]", + ) + config.set( + "defaults", "#– To connect to local host use dynamic_loading: localhost" + ) + config.set("defaults", "dynamic_loading", "infura") + + def set_api_leveldb(self, leveldb_path): + """ + + :param leveldb: + :return: + """ + self.eth = EthLevelDB(leveldb_path) + + def set_api_rpc_infura(self): + """Set the RPC mode to INFURA on mainnet.""" + log.info("Using INFURA for RPC queries") + self.eth = EthJsonRpc("mainnet.infura.io", 443, True) + + def set_api_rpc(self, rpc=None, rpctls=False): + """ + + :param rpc: + :param rpctls: + """ + if rpc == "ganache": + rpcconfig = ("localhost", 8545, False) + else: + m = re.match(r"infura-(.*)", rpc) + if m and m.group(1) in ["mainnet", "rinkeby", "kovan", "ropsten"]: + rpcconfig = (m.group(1) + ".infura.io", 443, True) + else: + try: + host, port = rpc.split(":") + rpcconfig = (host, int(port), rpctls) + except ValueError: + raise CriticalError( + "Invalid RPC argument, use 'ganache', 'infura-[network]' or 'HOST:PORT'" + ) + + if rpcconfig: + log.info("Using RPC settings: %s" % str(rpcconfig)) + self.eth = EthJsonRpc(rpcconfig[0], int(rpcconfig[1]), rpcconfig[2]) + else: + raise CriticalError("Invalid RPC settings, check help for details.") + + def set_api_rpc_localhost(): + """Set the RPC mode to a local instance.""" + log.info("Using default RPC settings: http://localhost:8545") + self.eth = EthJsonRpc("localhost", 8545) + + def set_api_from_config_path(self): + """Set the RPC mode based on a given config file.""" + config = ConfigParser(allow_no_value=False) + config.optionxform = str + config.read(self.config_path, "utf-8") + if config.has_option("defaults", "dynamic_loading"): + dynamic_loading = config.get("defaults", "dynamic_loading") + else: + dynamic_loading = "infura" + if dynamic_loading == "infura": + return self.set_api_rpc_infura() + elif dynamic_loading == "localhost": + return self.set_api_rpc_localhost() + else: + return self.set_api_rpc(dynamic_loading) diff --git a/mythril/mythril/mythril_disassembler.py b/mythril/mythril/mythril_disassembler.py new file mode 100644 index 00000000..1fdd9f0d --- /dev/null +++ b/mythril/mythril/mythril_disassembler.py @@ -0,0 +1,293 @@ +import logging +import re +import solc +import os + +from ethereum import utils +from solc.exceptions import SolcError + +from mythril.ethereum import util +from mythril.exceptions import CriticalError, CompilerError, NoContractFoundError +from mythril.support import signatures +from mythril.support.truffle import analyze_truffle_project +from mythril.ethereum.evmcontract import EVMContract +from mythril.ethereum.interface.rpc.exceptions import ConnectionError +from mythril.solidity.soliditycontract import SolidityContract, get_contracts_from_file + +log = logging.getLogger(__name__) + + +class MythrilDisassembler: + def __init__( + self, eth, solc_version=None, solc_args=None, enable_online_lookup=False + ): + self.solc_binary = self._init_solc_binary(solc_version) + self.solc_args = solc_args + self.eth = eth + self.enable_online_lookup = enable_online_lookup + self.sigs = signatures.SignatureDB(enable_online_lookup=enable_online_lookup) + self.contracts = [] + + @staticmethod + def _init_solc_binary(version): + """Figure out solc binary and version. + + Only proper versions are supported. No nightlies, commits etc (such as available in remix). + """ + + if not version: + return os.environ.get("SOLC") or "solc" + + # tried converting input to semver, seemed not necessary so just slicing for now + main_version = solc.main.get_solc_version_string() + main_version_number = re.match(r"\d+.\d+.\d+", main_version) + if main_version is None: + raise CriticalError( + "Could not extract solc version from string {}".format(main_version) + ) + if version == main_version_number: + log.info("Given version matches installed version") + solc_binary = os.environ.get("SOLC") or "solc" + else: + solc_binary = util.solc_exists(version) + if solc_binary: + log.info("Given version is already installed") + else: + try: + solc.install_solc("v" + version) + solc_binary = util.solc_exists(version) + if not solc_binary: + raise SolcError() + except SolcError: + raise CriticalError( + "There was an error when trying to install the specified solc version" + ) + + log.info("Setting the compiler to %s", solc_binary) + + return solc_binary + + def load_from_bytecode(self, code, bin_runtime=False, address=None): + """ + + :param code: + :param bin_runtime: + :param address: + :return: + """ + if address is None: + address = util.get_indexed_address(0) + if bin_runtime: + self.contracts.append( + EVMContract( + code=code, + name="MAIN", + enable_online_lookup=self.enable_online_lookup, + ) + ) + else: + self.contracts.append( + EVMContract( + creation_code=code, + name="MAIN", + enable_online_lookup=self.enable_online_lookup, + ) + ) + return address, self.contracts[-1] # return address and contract object + + def load_from_address(self, address): + """ + + :param address: + :return: + """ + if not re.match(r"0x[a-fA-F0-9]{40}", address): + raise CriticalError("Invalid contract address. Expected format is '0x...'.") + + try: + code = self.eth.eth_getCode(address) + except FileNotFoundError as e: + raise CriticalError("IPC error: " + str(e)) + except ConnectionError: + raise CriticalError( + "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly." + ) + except Exception as e: + raise CriticalError("IPC / RPC error: " + str(e)) + else: + if code == "0x" or code == "0x0": + raise CriticalError( + "Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain." + ) + else: + self.contracts.append( + EVMContract( + code, + name=address, + enable_online_lookup=self.enable_online_lookup, + ) + ) + return address, self.contracts[-1] # return address and contract object + + def load_from_solidity(self, solidity_files): + """ + + :param solidity_files: + :return: + """ + address = util.get_indexed_address(0) + contracts = [] + for file in solidity_files: + if ":" in file: + file, contract_name = file.split(":") + else: + contract_name = None + + file = os.path.expanduser(file) + + try: + # import signatures from solidity source + self.sigs.import_solidity_file( + file, solc_binary=self.solc_binary, solc_args=self.solc_args + ) + if contract_name is not None: + contract = SolidityContract( + input_file=file, + name=contract_name, + solc_args=self.solc_args, + solc_binary=self.solc_binary, + ) + self.contracts.append(contract) + contracts.append(contract) + else: + for contract in get_contracts_from_file( + input_file=file, + solc_args=self.solc_args, + solc_binary=self.solc_binary, + ): + self.contracts.append(contract) + contracts.append(contract) + + except FileNotFoundError: + raise CriticalError("Input file not found: " + file) + except CompilerError as e: + raise CriticalError(e) + except NoContractFoundError: + log.error( + "The file " + file + " does not contain a compilable contract." + ) + + return address, contracts + + def analyze_truffle_project(self, *args, **kwargs): + """ + :param args: + :param kwargs: + :return: + """ + return analyze_truffle_project( + self.sigs, *args, **kwargs + ) # just passthru by passing signatures for now + + @staticmethod + def disassemble(contract): + """ + + :param contract: + :return: + """ + return contract.get_easm() + + @staticmethod + def hash_for_function_signature(sig): + """ + + :param sig: + :return: + """ + return "0x%s" % utils.sha3(sig)[:4].hex() + + def get_state_variable_from_storage(self, address, params=None): + """ + + :param address: + :param params: + :return: + """ + if params is None: + params = [] + (position, length, mappings) = (0, 1, []) + try: + if params[0] == "mapping": + if len(params) < 3: + raise CriticalError("Invalid number of parameters.") + position = int(params[1]) + position_formatted = utils.zpad(utils.int_to_big_endian(position), 32) + for i in range(2, len(params)): + key = bytes(params[i], "utf8") + key_formatted = utils.rzpad(key, 32) + mappings.append( + int.from_bytes( + utils.sha3(key_formatted + position_formatted), + byteorder="big", + ) + ) + + length = len(mappings) + if length == 1: + position = mappings[0] + + else: + if len(params) >= 4: + raise CriticalError("Invalid number of parameters.") + + if len(params) >= 1: + position = int(params[0]) + if len(params) >= 2: + length = int(params[1]) + if len(params) == 3 and params[2] == "array": + position_formatted = utils.zpad( + utils.int_to_big_endian(position), 32 + ) + position = int.from_bytes( + utils.sha3(position_formatted), byteorder="big" + ) + + except ValueError: + raise CriticalError( + "Invalid storage index. Please provide a numeric value." + ) + + outtxt = [] + + try: + if length == 1: + outtxt.append( + "{}: {}".format( + position, self.eth.eth_getStorageAt(address, position) + ) + ) + else: + if len(mappings) > 0: + for i in range(0, len(mappings)): + position = mappings[i] + outtxt.append( + "{}: {}".format( + hex(position), + self.eth.eth_getStorageAt(address, position), + ) + ) + else: + for i in range(position, position + length): + outtxt.append( + "{}: {}".format( + hex(i), self.eth.eth_getStorageAt(address, i) + ) + ) + except FileNotFoundError as e: + raise CriticalError("IPC error: " + str(e)) + except ConnectionError: + raise CriticalError( + "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly." + ) + return "\n".join(outtxt) diff --git a/mythril/mythril/mythril_leveldb.py b/mythril/mythril/mythril_leveldb.py new file mode 100644 index 00000000..84db9469 --- /dev/null +++ b/mythril/mythril/mythril_leveldb.py @@ -0,0 +1,42 @@ +import re +from mythril.exceptions import CriticalError + + +class MythrilLevelDB(object): + """ + Class which does operations on leveldb + """ + + def __init__(self, leveldb): + self.level_db = leveldb + + def search_db(self, search): + """ + Searches + :param search: + """ + + def search_callback(_, address, balance): + """ + + :param _: + :param address: + :param balance: + """ + print("Address: " + address + ", balance: " + str(balance)) + + try: + self.level_db.search(search, search_callback) + + except SyntaxError: + raise CriticalError("Syntax error in search expression.") + + def contract_hash_to_address(self, hash): + """ + Returns address of the corresponding hash by searching the leveldb + :param hash: Hash to be searched + """ + if not re.match(r"0x[a-fA-F0-9]{64}", hash): + raise CriticalError("Invalid address hash. Expected format is '0x...'.") + + print(self.level_db.contract_hash_to_address(hash)) diff --git a/tests/laser/transaction/create_transaction_test.py b/tests/laser/transaction/create_transaction_test.py index 10a1f837..5c4e93ad 100644 --- a/tests/laser/transaction/create_transaction_test.py +++ b/tests/laser/transaction/create_transaction_test.py @@ -1,4 +1,4 @@ -from mythril.mythril import Mythril +from mythril.mythril import MythrilDisassembler from mythril.laser.ethereum.transaction import execute_contract_creation from mythril.ethereum import util import mythril.laser.ethereum.svm as svm @@ -13,7 +13,7 @@ from mythril.analysis.symbolic import SymExecWrapper def test_create(): contract = SolidityContract( str(tests.TESTDATA_INPUTS_CONTRACTS / "calls.sol"), - solc_binary=Mythril._init_solc_binary("0.5.0"), + solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"), ) laser_evm = svm.LaserEVM({}) @@ -37,7 +37,7 @@ def test_create(): def test_sym_exec(): contract = SolidityContract( str(tests.TESTDATA_INPUTS_CONTRACTS / "calls.sol"), - solc_binary=Mythril._init_solc_binary("0.5.0"), + solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"), ) sym = SymExecWrapper( diff --git a/tests/native_test.py b/tests/native_test.py index 3d0438ff..c58a25e3 100644 --- a/tests/native_test.py +++ b/tests/native_test.py @@ -1,5 +1,5 @@ from mythril.solidity.soliditycontract import SolidityContract -from mythril.mythril import Mythril +from mythril.mythril import MythrilDisassembler from mythril.laser.ethereum.state.account import Account from mythril.laser.ethereum.state.machine_state import MachineState from mythril.laser.ethereum.state.global_state import GlobalState @@ -84,7 +84,8 @@ class NativeTests(BaseTestCase): def runTest(): """""" disassembly = SolidityContract( - "./tests/native_tests.sol", solc_binary=Mythril._init_solc_binary("0.5.0") + "./tests/native_tests.sol", + solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"), ).disassembly account = Account("0x0000000000000000000000000000000000000000", disassembly) accounts = {account.address: account} diff --git a/tests/solidity_contract_test.py b/tests/solidity_contract_test.py index 6b55aad1..4edb8e6e 100644 --- a/tests/solidity_contract_test.py +++ b/tests/solidity_contract_test.py @@ -1,6 +1,6 @@ from pathlib import Path -from mythril.mythril import Mythril +from mythril.mythril import MythrilDisassembler from mythril.solidity.soliditycontract import SolidityContract from tests import BaseTestCase @@ -11,7 +11,7 @@ class SolidityContractTest(BaseTestCase): def test_get_source_info_without_name_gets_latest_contract_info(self): input_file = TEST_FILES / "multi_contracts.sol" contract = SolidityContract( - str(input_file), solc_binary=Mythril._init_solc_binary("0.5.0") + str(input_file), solc_binary=MythrilDisassembler._init_solc_binary("0.5.0") ) code_info = contract.get_source_info(142) @@ -25,7 +25,7 @@ class SolidityContractTest(BaseTestCase): contract = SolidityContract( str(input_file), name="Transfer1", - solc_binary=Mythril._init_solc_binary("0.5.0"), + solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"), ) code_info = contract.get_source_info(142) @@ -39,7 +39,7 @@ class SolidityContractTest(BaseTestCase): contract = SolidityContract( str(input_file), name="AssertFail", - solc_binary=Mythril._init_solc_binary("0.5.0"), + solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"), ) code_info = contract.get_source_info(70, constructor=True)