Security analysis tool for EVM bytecode. Supports smart contracts built for Ethereum, Hedera, Quorum, Vechain, Roostock, Tron and other EVM-compatible blockchains.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mythril/myth

466 lines
17 KiB

#!/usr/bin/env python3
7 years ago
"""mythril.py: Bug hunting on the Ethereum blockchain
7 years ago
http://www.github.com/b-mueller/mythril
"""
from mythril.ether import util
from mythril.ether.contractstorage import get_persistent_storage
from mythril.ether.ethcontract import ETHContract
from mythril.ether.soliditycontract import SolidityContract
from mythril.rpc.client import EthJsonRpc
7 years ago
from mythril.ipc.client import EthIpc
from mythril.rpc.exceptions import ConnectionError
from mythril.support import signatures
from mythril.support.truffle import analyze_truffle_project
from mythril.support.loader import DynLoader
from mythril.exceptions import CompilerError, NoContractFoundError
7 years ago
from mythril.analysis.symbolic import SymExecWrapper
from mythril.analysis.callgraph import generate_graph
from mythril.analysis.security import fire_lasers
from mythril.analysis.report import Report
from laser.ethereum import helper
from ethereum import utils
7 years ago
from pathlib import Path
from json.decoder import JSONDecodeError
7 years ago
from solc.exceptions import SolcError
import solc
7 years ago
import logging
import json
7 years ago
import sys
import argparse
import os
import re
7 years ago
7 years ago
7 years ago
def searchCallback(code_hash, code, addresses, balances):
print("Matched contract with code hash " + code_hash)
7 years ago
for i in range(0, len(addresses)):
print("Address: " + addresses[i] + ", balance: " + str(balances[i]))
def exitWithError(format, message):
if (format == 'text' or format == 'markdown'):
print(message)
else:
result = {'success': False, 'error': str(message), 'issues': []}
print(json.dumps(result))
7 years ago
sys.exit()
parser = argparse.ArgumentParser(description='Security analysis of Ethereum smart contracts')
7 years ago
parser.add_argument("solidity_file", nargs='*')
commands = parser.add_argument_group('commands')
7 years ago
commands.add_argument('-g', '--graph', help='generate a control flow graph', metavar='OUTPUT_FILE')
commands.add_argument('-x', '--fire-lasers', action='store_true', help='detect vulnerabilities, use with -c, -a or solidity file(s)')
commands.add_argument('-t', '--truffle', action='store_true', help='analyze a truffle project (run from project dir)')
inputs = parser.add_argument_group('input arguments')
inputs.add_argument('-c', '--code', help='hex-encoded bytecode string ("6060604052...")', metavar='BYTECODE')
7 years ago
inputs.add_argument('-a', '--address', help='pull contract from the blockchain', metavar='CONTRACT_ADDRESS')
inputs.add_argument('-l', '--dynld', action='store_true', help='auto-load dependencies from the blockchain')
inputs = parser.add_argument_group('output formats')
inputs.add_argument('-o', '--outform', choices=['text', 'markdown', 'json'], default='text', help='report output format', metavar='<text/json>')
database = parser.add_argument_group('local contracts database')
database.add_argument('--init-db', action='store_true', help='initialize the contract database')
database.add_argument('-s', '--search', help='search the contract database', metavar='EXPRESSION')
utilities = parser.add_argument_group('utilities')
7 years ago
utilities.add_argument('-d', '--disassemble', action='store_true', help='print disassembly')
utilities.add_argument('--xrefs', action='store_true', help='get xrefs from a contract')
utilities.add_argument('--hash', help='calculate function signature hash', metavar='SIGNATURE')
utilities.add_argument('--storage', help='read state variables from storage index, use with -a', metavar='INDEX,NUM_SLOTS,[array]')
utilities.add_argument('--solv', help='specify solidity compiler version. If not present, will try to install it (Experimental)', metavar='SOLV')
options = parser.add_argument_group('options')
options.add_argument('-m', '--modules', help='Comma-separated list of security analysis modules', metavar='MODULES')
options.add_argument('--sync-all', action='store_true', help='Also sync contracts with zero balance')
options.add_argument('--max-depth', type=int, default=12, help='Maximum recursion depth for symbolic execution')
options.add_argument('--solc-args', help='Extra arguments for solc')
options.add_argument('--phrack', action='store_true', help='Phrack-style call graph')
options.add_argument('--enable-physics', action='store_true', help='enable graph physics simulation')
options.add_argument('-v', type=int, help='log level (0-2)', metavar='LOG_LEVEL')
rpc = parser.add_argument_group('RPC options')
rpc.add_argument('--rpc', help='connect via RPC', metavar='HOST:PORT')
rpc.add_argument('--rpctls', type=bool, default=False, help='RPC connection over TLS')
rpc.add_argument('--ganache', action='store_true', help='Preset: local Ganache')
rpc.add_argument('-i', '--infura-mainnet', action='store_true', help='Preset: Infura Node service (Mainnet)')
rpc.add_argument('--infura-rinkeby', action='store_true', help='Preset: Infura Node service (Rinkeby)')
rpc.add_argument('--infura-kovan', action='store_true', help='Preset: Infura Node service (Kovan)')
rpc.add_argument('--infura-ropsten', action='store_true', help='Preset: Infura Node service (Ropsten)')
# Get config values
7 years ago
args = parser.parse_args()
try:
mythril_dir = os.environ['MYTHRIL_DIR']
except KeyError:
mythril_dir = os.path.join(os.path.expanduser('~'), ".mythril")
7 years ago
# Detect unsupported combinations of command line args
if (args.dynld) and not (args.address):
exitWithError(args.outform, "Dynamic loader can be used in on-chain analysis mode only (-a).")
# Initialize data directry and signature database
if not os.path.exists(mythril_dir):
logging.info("Creating mythril data directory")
os.mkdir(mythril_dir)
# If no function signature file exists, create it. Function signatures from Solidity source code are added automatically.
signatures_file = os.path.join(mythril_dir, 'signatures.json')
if not os.path.exists(signatures_file):
7 years ago
logging.info("No signature database found. Creating empty database: " + signatures_file + "\n"
"Consider replacing it with the pre-initialized database at "
"https://raw.githubusercontent.com/ConsenSys/mythril/master/signatures.json")
sigs = {}
with open(signatures_file, 'a') as f:
7 years ago
json.dump({}, f)
else:
with open(signatures_file) as f:
try:
sigs = json.load(f)
except JSONDecodeError as e:
exitWithError(args.outform, "Invalid JSON in signatures file " + signatures_file + "\n" + str(e))
7 years ago
# Parse cmdline args
if not (args.search or args.init_db or args.hash or args.disassemble or args.graph or args.xrefs or args.fire_lasers or args.storage or args.truffle):
parser.print_help()
sys.exit()
7 years ago
if (args.v):
if (0 <= args.v < 3):
logging.basicConfig(level=[logging.NOTSET, logging.INFO, logging.DEBUG][args.v])
elif (args.hash):
print("0x" + utils.sha3(args.hash)[:4].hex())
sys.exit()
if args.truffle:
try:
analyze_truffle_project(args)
except FileNotFoundError:
print("Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully.")
sys.exit()
7 years ago
# Figure out solc binary and version
# Only proper versions are supported. No nightlies, commits etc (such as available in remix)
if args.solv:
version = args.solv
7 years ago
# tried converting input to semver, seemed not necessary so just slicing for now
7 years ago
if version == str(solc.main.get_solc_version())[:6]:
logging.info('Given version matches installed version')
7 years ago
try:
solc_binary = os.environ['SOLC']
except KeyError:
solc_binary = 'solc'
else:
if util.solc_exists(version):
logging.info('Given version is already installed')
solc_binary = os.path.join(os.environ['HOME'], ".py-solc/solc-v" + version, "bin/solc")
logging.info("Setting the compiler to " + str(solc_binary))
7 years ago
else:
try:
solc.install_solc('v' + version)
solc_binary = os.path.join(os.environ['HOME'], ".py-solc/solc-v" + version, "bin/solc")
logging.info("Setting the compiler to " + str(solc_binary))
7 years ago
except SolcError:
exitWithError(args.outform, "There was an error when trying to install the specified solc version")
else:
if not args.solv:
try:
solc_binary = os.environ['SOLC']
except KeyError:
try:
solc_binary = 'solc'
except:
exitWithError('No solidity compiler found, please make sure it is installed or specify it manually.')
# Establish RPC/IPC connection if necessary
if (args.address or args.init_db):
if args.infura_mainnet:
eth = EthJsonRpc('mainnet.infura.io', 443, True)
elif args.infura_rinkeby:
eth = EthJsonRpc('rinkeby.infura.io', 443, True)
elif args.infura_kovan:
eth = EthJsonRpc('kovan.infura.io', 443, True)
elif args.infura_ropsten:
eth = EthJsonRpc('ropsten.infura.io', 443, True)
elif args.ganache:
eth = EthJsonRpc('localhost', 7545, False)
elif args.rpc:
try:
host, port = args.rpc.split(":")
except ValueError:
exitWithError(args.outform, "Invalid RPC argument, use HOST:PORT")
tls = args.rpctls
eth = EthJsonRpc(host, int(port), tls)
else:
try:
eth = EthIpc()
except:
exitWithError(args.outform, "IPC initialization failed. Please verify that your local Ethereum node is running, or use the -i flag to connect to INFURA.")
# Database search ops
if args.search or args.init_db:
contract_storage = get_persistent_storage(mythril_dir)
if (args.search):
try:
contract_storage.search(args.search, searchCallback)
except SyntaxError:
exitWithError(args.outform, "Syntax error in search expression.")
elif (args.init_db):
try:
contract_storage.initialize(eth, args.sync_all)
except FileNotFoundError as e:
exitWithError(args.outform, "Error syncing database over IPC: " + str(e))
except ConnectionError as e:
exitWithError(args.outform, "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly.")
sys.exit()
7 years ago
7 years ago
7 years ago
# Load / compile input contracts
contracts = []
if (args.code):
address = util.get_indexed_address(0)
contracts.append(ETHContract(args.code, name="MAIN"))
# Get bytecode from a contract address
elif (args.address):
address = args.address
if not re.match(r'0x[a-fA-F0-9]{40}', args.address):
exitWithError(args.outform, "Invalid contract address. Expected format is '0x...'.")
try:
code = eth.eth_getCode(args.address)
if (code == "0x"):
exitWithError(args.outform, "Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain.")
except FileNotFoundError as e:
exitWithError(args.outform, "IPC error: " + str(e))
except ConnectionError as e:
exitWithError(args.outform, "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly.")
except Exception as e:
exitWithError(args.outform, "IPC / RPC error: " + str(e))
contracts.append(ETHContract(code, name=args.address))
# Compile Solidity source file(s)
7 years ago
elif (len(args.solidity_file)):
address = util.get_indexed_address(0)
if(args.graph and len(args.solidity_file) > 1):
7 years ago
exitWithError(args.outform, "Cannot generate call graphs from multiple input files. Please do it one at a time.")
7 years ago
for file in args.solidity_file:
if ":" in file:
file, contract_name = file.split(":")
else:
contract_name = None
7 years ago
file = file.replace("~", str(Path.home())) # Expand user path
try:
7 years ago
signatures.add_signatures_from_file(file, sigs) # Parse file for new function signatures
contract = SolidityContract(file, contract_name, solc_args=args.solc_args)
logging.info("Analyzing contract %s:%s" % (file, contract.name))
contracts.append(contract)
except CompilerError as e:
exitWithError(args.outform, e)
except NoContractFoundError:
logging.info("The file " + file + " does not contain a compilable contract.")
except FileNotFoundError:
exitWithError(args.outform, "Input file not found: " + file)
7 years ago
# Save updated function signatures
with open(signatures_file, 'w') as f:
json.dump(sigs, f)
else:
7 years ago
exitWithError(args.outform, "No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, or -i SOLIDITY_FILES")
# Commands
7 years ago
if args.storage:
if not args.address:
exitWithError(args.outform, "To read storage, provide the address of a deployed contract with the -a option.")
else:
position = 0
length = 1
array = 0
try:
params = (args.storage).split(",")
if len(params) >= 1 and len(params) <= 3:
position = int(params[0])
if len(params) >= 2 and len(params) <= 3:
7 years ago
length = int(params[1])
if len(params) == 3:
7 years ago
if re.match("array", params[2]):
array = 1
if len(params) >= 4:
exitWithError(args.outform, "Invalid number of parameters.")
except ValueError:
exitWithError(args.outform, "Invalid storage index. Please provide a numeric value.")
if array:
position_formated = str(position).zfill(64)
position = int(utils.sha3(position_formated), 16)
try:
if length == 1:
7 years ago
print("{}: ".format(position) + eth.eth_getStorageAt(args.address, position))
else:
for i in range(position, position + length):
7 years ago
print("{}: ".format(hex(i)) + eth.eth_getStorageAt(args.address, i))
except FileNotFoundError as e:
exitWithError(args.outform, "IPC error: " + str(e))
except ConnectionError as e:
exitWithError(args.outform, "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly.")
elif (args.disassemble):
7 years ago
easm_text = contracts[0].get_easm()
sys.stdout.write(easm_text)
elif (args.xrefs):
7 years ago
print("\n".join(contracts[0].get_xrefs()))
7 years ago
elif (args.graph) or (args.fire_lasers):
7 years ago
if not len(contracts):
exitWithError(args.outform, "input files do not contain any valid contracts")
7 years ago
if (args.graph):
7 years ago
# try:
if (args.dynld):
sym = SymExecWrapper(contracts[0], address, dynloader=DynLoader(eth), max_depth=args.max_depth)
else:
sym = SymExecWrapper(contracts[0], address, max_depth=args.max_depth)
# except:
# exitWithError(args.outform, "Symbolic execution error: " + str(e))
7 years ago
if args.enable_physics is not None:
physics = True
7 years ago
html = generate_graph(sym, args.enable_physics, args.phrack)
7 years ago
7 years ago
try:
with open(args.graph, "w") as f:
f.write(html)
except Exception as e:
exitWithError(args.outform, "Error saving graph: " + str(e))
7 years ago
else:
7 years ago
_issues = []
report = Report()
for contract in contracts:
# try:
if (args.dynld):
sym = SymExecWrapper(contract, address, dynloader=DynLoader(eth), max_depth=args.max_depth)
else:
sym = SymExecWrapper(contract, address, max_depth=args.max_depth)
# except Exception as e:
# exitWithError(args.outform, "Symbolic exection error: " + str(e))
if args.modules:
issues = fire_lasers(sym, args.modules.split(","))
else:
issues = fire_lasers(sym)
if len(issues):
# Append source code info for SolidityContract
if (type(contract) == SolidityContract):
for issue in issues:
if (issue.pc):
codeinfo = contract.get_source_info(issue.pc)
issue.filename = codeinfo.filename
issue.code = codeinfo.code
issue.lineno = codeinfo.lineno
for issue in issues:
7 years ago
report.append_issue(issue) # For text and markdown output
_issues.append(issue.as_dict()) # List of dicts for JSON output
# Finally, output the results
if (len(_issues)):
if (args.outform == 'json'):
result = {'success': True, 'error': None, 'issues': _issues}
print(json.dumps(result))
else:
if (args.outform == 'text'):
print(report.as_text())
elif (args.outform == 'markdown'):
print(report.as_markdown())
else:
if (args.outform == 'text' or args.outform == 'markdown'):
print("The analysis was completed successfully. No issues were detected.")
else:
result = {'success': True, 'error': None, 'issues': []}
print(json.dumps(result))
else:
parser.print_help()