#!/usr/bin/env python
"""mythril.py: Bug hunting on the Ethereum blockchain
http://www.github.com/b-mueller/mythril
"""
from mythril.ether import evm,util
from mythril.disassembler.disassembly import Disassembly
from mythril.disassembler.callgraph import generate_callgraph
from mythril.ether.contractstorage import get_persistent_storage
from mythril.rpc.client import EthJsonRpc
from mythril.ipc.client import EthIpc
from ethereum import utils
from laser.ethereum import laserfree
import logging
import binascii
import sys
import argparse
import os
import re
def searchCallback(code_hash, code, addresses, balances):
    """Print one search hit: the code hash, then every address with its balance.

    Used as the callback handed to ``contract_storage.search`` for ``--search``.

    Args:
        code_hash: String identifying the matched code; concatenated directly
            into the first output line.
        code: Not used by this callback; accepted to keep the signature.
        addresses: Sequence of address strings.
        balances: Sequence of balances paired with ``addresses`` by position;
            each one is rendered with ``str()``.
    """
    print("Matched contract with code hash " + code_hash)

    # Walk the two parallel sequences in lockstep rather than indexing them
    # through range(len(...)).
    for address, balance in zip(addresses, balances):
        print("Address: " + address + ", balance: " + str(balance))
def exitWithError(message):
    """Print ``message`` and terminate the process with a failure status.

    Args:
        message: Text to show the user; printed to standard output.

    Raises:
        SystemExit: Always, with exit code 1 so that shells and scripts can
            tell a failed run from a successful one.
    """
    print(message)
    # A bare sys.exit() reports status 0 (success); errors must be non-zero.
    sys.exit(1)
# Command-line interface. Arguments are registered in three help sections:
# "commands" (what to do), "input arguments" (what to operate on) and
# "options" (connection settings and verbosity).
parser = argparse.ArgumentParser(
    description='Bug hunting on the Ethereum blockchain',
)

# --- commands ---------------------------------------------------------------
commands = parser.add_argument_group('commands')
commands.add_argument(
    '-d', '--disassemble',
    action='store_true',
    help='disassemble, specify input with -c or -a',
)
commands.add_argument(
    '-t', '--trace',
    action='store_true',
    help='trace, use with -c or -a and --data (optional)',
)
commands.add_argument(
    '-g', '--graph',
    metavar='OUTPUT_FILE',
    help='generate a call graph',
)
commands.add_argument(
    '-l', '--fire-lasers',
    action='store_true',
    help='detect vulnerabilities, use with -c or -a',
)
commands.add_argument(
    '-s', '--search',
    help='search the contract database',
)
commands.add_argument(
    '--xrefs',
    metavar='CONTRACT_HASH',
    help='get xrefs from contract in database',
)
commands.add_argument(
    '--hash',
    metavar='SIGNATURE',
    help='calculate function signature hash',
)
commands.add_argument(
    '--init-db',
    action='store_true',
    help='initialize the contract database',
)

# --- input arguments --------------------------------------------------------
inputs = parser.add_argument_group('input arguments')
inputs.add_argument(
    '-c', '--code',
    metavar='BYTECODE',
    help='hex-encoded bytecode string ("6060604052...")',
)
inputs.add_argument(
    '-a', '--address',
    help='contract address',
)
inputs.add_argument(
    '--data',
    help='message call input data for tracing',
)

# --- options ----------------------------------------------------------------
options = parser.add_argument_group('options')
options.add_argument(
    '--sync-all',
    action='store_true',
    help='Also sync contracts with zero balance',
)
options.add_argument(
    '--rpchost',
    default='127.0.0.1',
    help='RPC host',
)
options.add_argument(
    '--rpcport',
    type=int,
    default=8545,
    help='RPC port',
)
options.add_argument(
    '--ipc',
    action='store_true',
    help='use IPC interface instead of RPC',
)
options.add_argument(
    '--disable-physics',
    action='store_true',
    help='disable graph physics simulation',
)
options.add_argument(
    '-v',
    type=int,
    metavar='LOG_LEVEL',
    help='log level (0-2)',
)
# Optional location of the contract database, read from the DB_DIR
# environment variable; None when the variable is not set.
db_dir = os.environ.get('DB_DIR')
# Parse the process command line; the dispatch code below reads this namespace.
args = parser.parse_args()

# -v 1 selects INFO and -v 2 selects DEBUG. A missing, zero or out-of-range
# value leaves the logging configuration untouched.
if args.v in (1, 2):
    logging.basicConfig(level=logging.INFO if args.v == 1 else logging.DEBUG)
def _load_bytecode(cli_args):
    """Return the hex-encoded bytecode selected by ``-c`` or ``-a``.

    ``-c`` takes precedence when both are given. With ``-a`` the code is
    fetched through ``eth_getCode`` over IPC (``--ipc``) or JSON-RPC
    (``--rpchost`` / ``--rpcport``).

    Args:
        cli_args: The parsed argparse namespace.

    Returns:
        Whatever ``-c`` held, or the value returned by ``eth_getCode``.

    Raises:
        SystemExit: Via ``exitWithError`` when neither input was supplied or
            when the IPC/RPC client raises.
    """
    if cli_args.code:
        return cli_args.code

    if not cli_args.address:
        exitWithError("No input bytecode. Please provide the code via -c BYTECODE or -a address")

    transport = "IPC" if cli_args.ipc else "RPC"
    try:
        if cli_args.ipc:
            eth = EthIpc()
        else:
            eth = EthJsonRpc(cli_args.rpchost, cli_args.rpcport)
        return eth.eth_getCode(cli_args.address)
    except Exception as e:
        # Top-level boundary: report any client failure as a readable message
        # instead of a traceback. SystemExit is not an Exception, so an exit
        # requested above is never swallowed here.
        exitWithError("Exception loading bytecode via " + transport + ": " + str(e))


# Command dispatch. Exactly one branch runs, chosen in this priority order:
# disassembly-based commands (-d, then -g, then -l), tracing (-t), and finally
# the commands that are served from the contract database branch.
if args.disassemble or args.graph or args.fire_lasers:
    encoded_bytecode = _load_bytecode(args)

    # Guarded because concatenating None to a string would raise.
    if encoded_bytecode is not None:
        logging.debug("Input bytecode: " + encoded_bytecode)

    try:
        disassembly = Disassembly(encoded_bytecode)
    except binascii.Error:
        exitWithError("Disassembler: Invalid code string.")

    if args.disassemble:
        sys.stdout.write(disassembly.get_easm())
    elif args.graph:
        physics = not args.disable_physics
        html = generate_callgraph(disassembly, physics)
        try:
            with open(args.graph, "w") as f:
                f.write(html)
        except Exception as e:
            # Best effort: a failed write is reported but does not abort.
            print("Error saving graph: " + str(e))
    elif args.fire_lasers:
        laserfree.fire(disassembly)

elif args.trace:
    # Same loader as above, so a missing input or an unreachable node yields
    # the same error messages as the disassembly commands.
    encoded_bytecode = _load_bytecode(args)

    if args.data:
        trace = evm.trace(encoded_bytecode, args.data)
    else:
        trace = evm.trace(encoded_bytecode)

    for step in trace:
        # PUSH* entries additionally print the 'pushvalue' field.
        if step['op'].startswith('PUSH'):
            print(str(step['pc']) + " " + step['op'] + " " + step['pushvalue'] + ";\tSTACK: " + step['stack'])
        else:
            print(str(step['pc']) + " " + step['op'] + ";\tSTACK: " + step['stack'])

else:
    contract_storage = get_persistent_storage(db_dir)

    if args.search:
        try:
            contract_storage.search(args.search, searchCallback)
        except SyntaxError:
            exitWithError("Syntax error in search expression.")

    elif args.xrefs:
        try:
            contract_hash = util.safe_decode(args.xrefs)
        except binascii.Error:
            exitWithError("Invalid contract hash.")

        try:
            contract = contract_storage.get_contract_by_hash(contract_hash)
            print("\n".join(contract.get_xrefs()))
        except KeyError:
            exitWithError("Contract not found in the database.")

    elif args.init_db:
        # The --ipc flag is forwarded as the last argument, so no branching on
        # it is needed here.
        contract_storage.initialize(args.rpchost, args.rpcport, args.sync_all, args.ipc)

    elif args.hash:
        # First four bytes of the sha3 digest of the signature, shown as hex.
        print("0x" + utils.sha3(args.hash)[:4].hex())

    else:
        parser.print_help()