#!/usr/bin/env python """mythril.py: Bug hunting on the Ethereum blockchain http://www.github.com/b-mueller/mythril """ from mythril.ether import evm, util from mythril.ether.contractstorage import get_persistent_storage from mythril.ether.ethcontract import ETHContract from mythril.ether.util import compile_solidity from mythril.rpc.client import EthJsonRpc from mythril.ipc.client import EthIpc from mythril.rpc.exceptions import ConnectionError from mythril.support import signatures from mythril.support.loader import DynLoader from mythril.exceptions import CompilerError from mythril.analysis.symbolic import StateSpace from mythril.analysis.callgraph import generate_graph from mythril.analysis.security import fire_lasers from ethereum import utils from pathlib import Path from json.decoder import JSONDecodeError import logging import json import sys import argparse import os import re def searchCallback(code_hash, code, addresses, balances): print("Matched contract with code hash " + code_hash) for i in range(0, len(addresses)): print("Address: " + addresses[i] + ", balance: " + str(balances[i])) def exitWithError(message): print(message) sys.exit() parser = argparse.ArgumentParser(description='Bug hunting on the Ethereum blockchain') parser.add_argument("solidity_file", nargs='*') commands = parser.add_argument_group('commands') commands.add_argument('-d', '--disassemble', action='store_true', help='disassemble') commands.add_argument('-g', '--graph', help='generate a control flow graph', metavar='OUTPUT_FILE') commands.add_argument('-x', '--fire-lasers', action='store_true', help='detect vulnerabilities') commands.add_argument('-t', '--trace', action='store_true', help='trace contract, use with --data (optional)') commands.add_argument('-s', '--search', help='search the contract database', metavar='EXPRESSION') commands.add_argument('--xrefs', action='store_true', help='get xrefs from a contract') commands.add_argument('--hash', help='calculate function signature hash', metavar='SIGNATURE') commands.add_argument('--storage', help='read data from storage index, use with -a', metavar='INDEX') commands.add_argument('--init-db', action='store_true', help='initialize the contract database') inputs = parser.add_argument_group('input arguments') inputs.add_argument('-c', '--code', help='hex-encoded bytecode string ("6060604052...")', metavar='BYTECODE') inputs.add_argument('-a', '--address', help='pull contract from the blockchain', metavar='CONTRACT_ADDRESS') inputs.add_argument('-l', '--dynld', action='store_true', help='auto-load dependencies (experimental)') inputs.add_argument('--data', help='message call input data for tracing') options = parser.add_argument_group('options') options.add_argument('--sync-all', action='store_true', help='Also sync contracts with zero balance') options.add_argument('--infura-mainnet', action='store_true', help='Use Infura Node service, equivalent to: --rpchost=mainnet.infura.io --rpcport=443 --rpctls="True"') options.add_argument('--infura-rinkeby', action='store_true', help='Use Infura Node service, equivalent to: --rpchost=rinkeby.infura.io --rpcport=443 --rpctls="True"') options.add_argument('--infura-kovan', action='store_true', help='Use Infura Node service, equivalent to: --rpchost=kovan.infura.io --rpcport=443 --rpctls="True"') options.add_argument('--infura-ropsten', action='store_true', help='Use Infura Node service, equivalent to: --rpchost=ropsten.infura.io --rpcport=443 --rpctls="True"') options.add_argument('--rpchost', default='127.0.0.1', help='RPC host') options.add_argument('--rpcport', type=int, default=8545, help='RPC port') options.add_argument('--rpctls', type=bool, default=False, help='RPC port') options.add_argument('--ipc', help='use IPC interface instead of RPC', action='store_true') options.add_argument('--max-depth', type=int, default=12, help='Maximum recursion depth for symbolic execution') options.add_argument('--enable-physics', type=bool, default=False, help='enable graph physics simulation') options.add_argument('-v', type=int, help='log level (0-2)', metavar='LOG_LEVEL') # Get config values try: mythril_dir = os.environ['MYTHRIL_DIR'] except KeyError: mythril_dir = os.path.join(os.path.expanduser('~'), ".mythril") try: solc_binary = os.environ['SOLC'] except KeyError: solc_binary = 'solc' # Initialize data directry and singature database if not os.path.exists(mythril_dir): logging.info("Creating mythril data directory") os.mkdir(mythril_dir) # If no function signature file exists, create it. Function signatures from Solidity source code are added automatically. signatures_file = os.path.join(mythril_dir, 'signatures.json') if not os.path.exists(signatures_file): print("No signature database found. Creating empty database: " + signatures_file + "\n" \ "Consider replacing it with the pre-initialized database at " \ "https://raw.githubusercontent.com/ConsenSys/mythril/master/signatures.json") sigs = {} with open(signatures_file, 'a') as f: json.dump({},f) else: with open(signatures_file) as f: try: sigs = json.load(f) except JSONDecodeError as e: exitWithError("Invalid JSON in signatures file " + signatures_file + "\n" + str(e)) # Parse cmdline args args = parser.parse_args() if not (args.search or args.init_db or args.hash or args.disassemble or args.graph or args.xrefs or args.fire_lasers or args.trace or args.storage): parser.print_help() sys.exit() if (args.v): if (0 <= args.v < 3): logging.basicConfig(level=[logging.NOTSET, logging.INFO, logging.DEBUG][args.v]) elif (args.hash): print("0x" + utils.sha3(args.hash)[:4].hex()) sys.exit() # Establish RPC/IPC connection if necessary if (args.address or len(args.solidity_file) or args.init_db): if args.ipc: eth = EthIpc() else: if args.infura_mainnet: eth = EthJsonRpc('mainnet.infura.io', 443, True) elif args.infura_rinkeby: eth = EthJsonRpc('rinkeby.infura.io', 443, True) elif args.infura_kovan: eth = EthJsonRpc('kovan.infura.io', 443, True) elif args.infura_ropsten: eth = EthJsonRpc('ropsten.infura.io', 443, True) else: eth = EthJsonRpc(args.rpchost, args.rpcport, args.rpctls) # Database search ops if args.search or args.init_db: contract_storage = get_persistent_storage(mythril_dir) if (args.search): try: contract_storage.search(args.search, searchCallback) except SyntaxError: exitWithError("Syntax error in search expression.") elif (args.init_db): contract_storage.initialize(eth, args.sync_all) sys.exit() # Load / compile input contracts contracts = [] if (args.code): contracts.append(ETHContract(args.code, name="MAIN", address = util.get_indexed_address(0))) elif (args.address): if not re.match(r'0x[a-fA-F0-9]{40}', args.address): exitWithError("Invalid contract address. Expected format is '0x...'.") try: code = eth.eth_getCode(args.address) if (code == "0x"): exitWithError("Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain.") except ConnectionError: exitWithError("Error connecting to Ethereum node. Make sure that your node is up and that the provided RPC/IPC parameters are correct.") contracts.append(ETHContract(code, name=args.address, address = args.address)) elif (len(args.solidity_file)): index = 0 for file in args.solidity_file: file = file.replace("~", str(Path.home())) # Expand user path signatures.add_signatures_from_file(file, sigs) logging.debug("Adding function signatures from source code:\n" + str(sigs)) try: name, bytecode = compile_solidity(solc_binary, file) except CompilerError as e: exitWithError(e) # Max. 16 input files supported! contract = ETHContract(bytecode, name = name, address = util.get_indexed_address(index)) index += 1 contracts.append(contract) logging.info(contract.name + " at " + contract.address) # Save updated signature with open(signatures_file, 'w') as f: json.dump(sigs, f) else: exitWithError("No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, or -i SOLIDITY_FILES") # Commands if args.storage: if not args.address: exitWithError("To read storage, provide the address of a deployed contract with the -a option.") else: try: position = int(args.storage) except ValueError: exitWithError("Invalid storage index. Please provide a numeric value.") print(eth.eth_getStorageAt(args.address, position=position, block='latest')) elif (args.disassemble): easm_text = contracts[0].get_easm() sys.stdout.write(easm_text) elif (args.trace): if (args.data): trace = evm.trace(contracts[0].code, args.data) else: trace = evm.trace(contracts[0].code) for i in trace: if (re.match(r'^PUSH.*', i['op'])): print(str(i['pc']) + " " + i['op'] + " " + i['pushvalue'] + ";\tSTACK: " + i['stack']) else: print(str(i['pc']) + " " + i['op'] + ";\tSTACK: " + i['stack']) elif (args.xrefs): print("\n".join(contracts[0].get_xrefs())) elif (args.graph) or (args.fire_lasers): if (args.graph): if (args.dynld): states = StateSpace(contracts, dynloader=DynLoader(eth), max_depth=args.max_depth) else: states = StateSpace(contracts, max_depth=args.max_depth) if args.enable_physics is not None: physics = True html = generate_graph(states, args.enable_physics) try: with open(args.graph, "w") as f: f.write(html) except Exception as e: print("Error saving graph: " + str(e)) else: if (args.dynld): states = StateSpace(contracts, dynloader=DynLoader(eth), max_depth=args.max_depth) else: states = StateSpace(contracts, max_depth=args.max_depth) fire_lasers(states) else: parser.print_help()