Refactor mythril/mythril.py

pull/906/head
Nikhil Parasaram 6 years ago
parent ef98d051c2
commit 1a63fb20ef
  1. 119
      mythril/interfaces/cli.py
  2. 714
      mythril/mythril.py
  3. 4
      mythril/mythril/__init__.py
  4. 197
      mythril/mythril/mythril_analyzer.py
  5. 183
      mythril/mythril/mythril_config.py
  6. 293
      mythril/mythril/mythril_disassembler.py
  7. 42
      mythril/mythril/mythril_leveldb.py
  8. 6
      tests/laser/transaction/create_transaction_test.py
  9. 5
      tests/native_test.py
  10. 8
      tests/solidity_contract_test.py

@ -15,7 +15,12 @@ import coloredlogs
import mythril.support.signatures as sigs
from mythril.exceptions import AddressNotFoundError, CriticalError
from mythril.mythril import Mythril
from mythril.mythril import (
MythrilAnalyzer,
MythrilDisassembler,
MythrilConfig,
MythrilLevelDB,
)
from mythril.version import VERSION
# logging.basicConfig(level=logging.DEBUG)
@ -42,6 +47,19 @@ def main():
parser = argparse.ArgumentParser(
description="Security analysis of Ethereum smart contracts"
)
create_parser(parser)
# Get config values
args = parser.parse_args()
parse_args(parser=parser, args=args)
if __name__ == "__main__":
main()
def create_parser(parser):
parser.add_argument("solidity_file", nargs="*")
commands = parser.add_argument_group("commands")
@ -227,10 +245,8 @@ def main():
)
parser.add_argument("--epic", action="store_true", help=argparse.SUPPRESS)
# Get config values
args = parser.parse_args()
def parse_args(parser, args):
if args.epic:
path = os.path.dirname(os.path.realpath(__file__))
sys.argv.remove("--epic")
@ -280,7 +296,7 @@ def main():
)
if args.query_signature:
if sigs.ethereum_input_decoder == None:
if sigs.ethereum_input_decoder is None:
exit_with_error(
args.outform,
"The --query-signature function requires the python package ethereum-input-decoder",
@ -300,55 +316,57 @@ def main():
# -- commands --
if args.hash:
print(Mythril.hash_for_function_signature(args.hash))
print(MythrilDisassembler.hash_for_function_signature(args.hash))
sys.exit()
try:
# the mythril object should be our main interface
# infura = None, rpc = None, rpctls = None
# solc_args = None, dynld = None, max_recursion_depth = 12):
mythril = Mythril(
solv=args.solv,
dynld=args.dynld,
onchain_storage_access=(not args.no_onchain_storage_access),
solc_args=args.solc_args,
enable_online_lookup=args.query_signature,
)
config = MythrilConfig()
if (
args.dynld
or not args.no_onchain_storage_access
and not (args.rpc or args.i)
):
mythril.set_api_from_config_path()
config.set_api_from_config_path()
if args.address:
# Establish RPC connection if necessary
mythril.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls)
config.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls)
elif args.search or args.contract_hash_to_address:
# Open LevelDB if necessary
mythril.set_api_leveldb(
mythril.leveldb_dir if not args.leveldb_dir else args.leveldb_dir
config.set_api_leveldb(
config.leveldb_dir if not args.leveldb_dir else args.leveldb_dir
)
if args.search:
# Database search ops
mythril.search_db(args.search)
sys.exit()
if args.search or args.contract_hash_to_address:
leveldb_sercher = MythrilLevelDB(config.leveldb_dir)
if args.search:
# Database search ops
leveldb_sercher.search_db(args.search)
if args.contract_hash_to_address:
# search corresponding address
try:
mythril.contract_hash_to_address(args.contract_hash_to_address)
except AddressNotFoundError:
print("Address not found.")
else:
# search corresponding address
try:
leveldb_sercher.contract_hash_to_address(
args.contract_hash_to_address
)
except AddressNotFoundError:
print("Address not found.")
sys.exit()
dissasembler = MythrilDisassembler(
eth=config.eth,
solc_version=args.solv,
solc_args=args.solc_args,
enable_online_lookup=args.query_signature,
)
if args.truffle:
try:
# not really pythonic atm. needs refactoring
mythril.analyze_truffle_project(args)
dissasembler.analyze_truffle_project(args)
except FileNotFoundError:
print(
"Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully."
@ -361,14 +379,14 @@ def main():
if args.code:
# Load from bytecode
code = args.code[2:] if args.code.startswith("0x") else args.code
address, _ = mythril.load_from_bytecode(code, args.bin_runtime)
address, _ = dissasembler.load_from_bytecode(code, args.bin_runtime)
elif args.codefile:
bytecode = "".join([l.strip() for l in args.codefile if len(l.strip()) > 0])
bytecode = bytecode[2:] if bytecode.startswith("0x") else bytecode
address, _ = mythril.load_from_bytecode(bytecode, args.bin_runtime)
address, _ = dissasembler.load_from_bytecode(bytecode, args.bin_runtime)
elif args.address:
# Get bytecode from a contract address
address, _ = mythril.load_from_address(args.address)
address, _ = dissasembler.load_from_address(args.address)
elif args.solidity_file:
# Compile Solidity source file(s)
if args.graph and len(args.solidity_file) > 1:
@ -376,13 +394,20 @@ def main():
args.outform,
"Cannot generate call graphs from multiple input files. Please do it one at a time.",
)
address, _ = mythril.load_from_solidity(args.solidity_file) # list of files
address, _ = dissasembler.load_from_solidity(
args.solidity_file
) # list of files
else:
exit_with_error(
args.outform,
"No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, or -i SOLIDITY_FILES",
)
analyzer = MythrilAnalyzer(
disassembler=dissasembler,
dynld=args.dynld,
onchain_storage_access=not args.no_onchain_storage_access,
)
# Commands
if args.storage:
@ -392,7 +417,7 @@ def main():
"To read storage, provide the address of a deployed contract with the -a option.",
)
storage = mythril.get_state_variable_from_storage(
storage = dissasembler.get_state_variable_from_storage(
address=address,
params=[a.strip() for a in args.storage.strip().split(",")],
)
@ -401,21 +426,21 @@ def main():
elif args.disassemble:
# or mythril.disassemble(mythril.contracts[0])
if mythril.contracts[0].code:
print("Runtime Disassembly: \n" + mythril.contracts[0].get_easm())
if mythril.contracts[0].creation_code:
print("Disassembly: \n" + mythril.contracts[0].get_creation_easm())
if dissasembler.contracts[0].code:
print("Runtime Disassembly: \n" + dissasembler.contracts[0].get_easm())
if dissasembler.contracts[0].creation_code:
print("Disassembly: \n" + dissasembler.contracts[0].get_creation_easm())
elif args.graph or args.fire_lasers:
if not mythril.contracts:
if not dissasembler.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
if args.graph:
html = mythril.graph_html(
html = analyzer.graph_html(
strategy=args.strategy,
contract=mythril.contracts[0],
contract=analyzer.contracts[0],
address=address,
enable_physics=args.enable_physics,
phrackify=args.phrack,
@ -433,7 +458,7 @@ def main():
else:
try:
report = mythril.fire_lasers(
report = analyzer.fire_lasers(
strategy=args.strategy,
address=address,
modules=[m.strip() for m in args.modules.strip().split(",")]
@ -460,14 +485,14 @@ def main():
elif args.statespace_json:
if not mythril.contracts:
if not analyzer.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
statespace = mythril.dump_statespace(
statespace = analyzer.dump_statespace(
strategy=args.strategy,
contract=mythril.contracts[0],
contract=analyzer.contracts[0],
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
@ -486,7 +511,3 @@ def main():
except CriticalError as ce:
exit_with_error(args.outform, str(ce))
if __name__ == "__main__":
main()

@ -1,714 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""mythril.py: Bug hunting on the Ethereum blockchain
http://www.github.com/b-mueller/mythril
"""
import codecs
import logging
import os
import platform
import re
import traceback
from pathlib import Path
from shutil import copyfile
from configparser import ConfigParser
import solc
from ethereum import utils
from solc.exceptions import SolcError
from mythril.ethereum import util
from mythril.ethereum.evmcontract import EVMContract
from mythril.ethereum.interface.rpc.client import EthJsonRpc
from mythril.ethereum.interface.rpc.exceptions import ConnectionError
from mythril.solidity.soliditycontract import SolidityContract, get_contracts_from_file
from mythril.support import signatures
from mythril.support.source_support import Source
from mythril.support.loader import DynLoader
from mythril.exceptions import CompilerError, NoContractFoundError, CriticalError
from mythril.analysis.symbolic import SymExecWrapper
from mythril.analysis.callgraph import generate_graph
from mythril.analysis.traceexplore import get_serializable_statespace
from mythril.analysis.security import fire_lasers, retrieve_callback_issues
from mythril.analysis.report import Report
from mythril.support.truffle import analyze_truffle_project
from mythril.ethereum.interface.leveldb.client import EthLevelDB
log = logging.getLogger(__name__)
class Mythril(object):
"""Mythril main interface class.
1. create mythril object
2. set rpc or leveldb interface if needed
3. load contracts (from solidity, bytecode, address)
4. fire_lasers
.. code-block:: python
mythril = Mythril()
mythril.set_api_rpc_infura()
# (optional) other API adapters
mythril.set_api_rpc(args)
mythril.set_api_rpc_localhost()
mythril.set_api_leveldb(path)
# (optional) other func
mythril.analyze_truffle_project(args)
mythril.search_db(args)
# load contract
mythril.load_from_bytecode(bytecode)
mythril.load_from_address(address)
mythril.load_from_solidity(solidity_file)
# analyze
print(mythril.fire_lasers(args).as_text())
# (optional) graph
for contract in mythril.contracts:
# prints html or save it to file
print(mythril.graph_html(args))
# (optional) other funcs
mythril.dump_statespaces(args)
mythril.disassemble(contract)
mythril.get_state_variable_from_storage(args)
"""
def __init__(
self,
solv=None,
solc_args=None,
dynld=False,
enable_online_lookup=False,
onchain_storage_access=True,
):
self.solv = solv
self.solc_args = solc_args
self.dynld = dynld
self.onchain_storage_access = onchain_storage_access
self.enable_online_lookup = enable_online_lookup
self.mythril_dir = self._init_mythril_dir()
# tries mythril_dir/signatures.db by default (provide path= arg to make this configurable)
self.sigs = signatures.SignatureDB(
enable_online_lookup=self.enable_online_lookup
)
self.solc_binary = self._init_solc_binary(solv)
self.config_path = os.path.join(self.mythril_dir, "config.ini")
self.leveldb_dir = self._init_config()
self.eth = None # ethereum API client
self.eth_db = None # ethereum LevelDB client
self.contracts = [] # loaded contracts
@staticmethod
def _init_mythril_dir():
try:
mythril_dir = os.environ["MYTHRIL_DIR"]
except KeyError:
mythril_dir = os.path.join(os.path.expanduser("~"), ".mythril")
if not os.path.exists(mythril_dir):
# Initialize data directory
log.info("Creating mythril data directory")
os.mkdir(mythril_dir)
db_path = str(Path(mythril_dir) / "signatures.db")
if not os.path.exists(db_path):
# if the default mythril dir doesn't contain a signature DB
# initialize it with the default one from the project root
asset_dir = Path(__file__).parent / "support" / "assets"
copyfile(str(asset_dir / "signatures.db"), db_path)
return mythril_dir
def _init_config(self):
"""If no config file exists, create it and add default options.
Default LevelDB path is specified based on OS
dynamic loading is set to infura by default in the file
Returns: leveldb directory
"""
system = platform.system().lower()
leveldb_fallback_dir = os.path.expanduser("~")
if system.startswith("darwin"):
leveldb_fallback_dir = os.path.join(
leveldb_fallback_dir, "Library", "Ethereum"
)
elif system.startswith("windows"):
leveldb_fallback_dir = os.path.join(
leveldb_fallback_dir, "AppData", "Roaming", "Ethereum"
)
else:
leveldb_fallback_dir = os.path.join(leveldb_fallback_dir, ".ethereum")
leveldb_fallback_dir = os.path.join(leveldb_fallback_dir, "geth", "chaindata")
if not os.path.exists(self.config_path):
log.info("No config file found. Creating default: " + self.config_path)
open(self.config_path, "a").close()
config = ConfigParser(allow_no_value=True)
config.optionxform = str
config.read(self.config_path, "utf-8")
if "defaults" not in config.sections():
self._add_default_options(config)
if not config.has_option("defaults", "leveldb_dir"):
self._add_leveldb_option(config, leveldb_fallback_dir)
if not config.has_option("defaults", "dynamic_loading"):
self._add_dynamic_loading_option(config)
with codecs.open(self.config_path, "w", "utf-8") as fp:
config.write(fp)
leveldb_dir = config.get(
"defaults", "leveldb_dir", fallback=leveldb_fallback_dir
)
return os.path.expanduser(leveldb_dir)
@staticmethod
def _add_default_options(config):
config.add_section("defaults")
@staticmethod
def _add_leveldb_option(config, leveldb_fallback_dir):
config.set("defaults", "#Default chaindata locations:")
config.set("defaults", "#– Mac: ~/Library/Ethereum/geth/chaindata")
config.set("defaults", "#– Linux: ~/.ethereum/geth/chaindata")
config.set(
"defaults",
"#– Windows: %USERPROFILE%\\AppData\\Roaming\\Ethereum\\geth\\chaindata",
)
config.set("defaults", "leveldb_dir", leveldb_fallback_dir)
@staticmethod
def _add_dynamic_loading_option(config):
config.set("defaults", "#– To connect to Infura use dynamic_loading: infura")
config.set(
"defaults",
"#– To connect to Rpc use "
"dynamic_loading: HOST:PORT / ganache / infura-[network_name]",
)
config.set(
"defaults", "#– To connect to local host use dynamic_loading: localhost"
)
config.set("defaults", "dynamic_loading", "infura")
def analyze_truffle_project(self, *args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
return analyze_truffle_project(
self.sigs, *args, **kwargs
) # just passthru by passing signatures for now
@staticmethod
def _init_solc_binary(version):
"""Figure out solc binary and version.
Only proper versions are supported. No nightlies, commits etc (such as available in remix).
"""
if not version:
return os.environ.get("SOLC") or "solc"
# tried converting input to semver, seemed not necessary so just slicing for now
main_version = solc.main.get_solc_version_string()
main_version_number = re.match(r"\d+.\d+.\d+", main_version)
if main_version is None:
raise CriticalError(
"Could not extract solc version from string {}".format(main_version)
)
if version == main_version_number:
log.info("Given version matches installed version")
solc_binary = os.environ.get("SOLC") or "solc"
else:
solc_binary = util.solc_exists(version)
if solc_binary:
log.info("Given version is already installed")
else:
try:
solc.install_solc("v" + version)
solc_binary = util.solc_exists(version)
if not solc_binary:
raise SolcError()
except SolcError:
raise CriticalError(
"There was an error when trying to install the specified solc version"
)
log.info("Setting the compiler to %s", solc_binary)
return solc_binary
def set_api_leveldb(self, leveldb):
"""
:param leveldb:
:return:
"""
self.eth_db = EthLevelDB(leveldb)
self.eth = self.eth_db
return self.eth
def set_api_rpc_infura(self):
"""Set the RPC mode to INFURA on mainnet."""
self.eth = EthJsonRpc("mainnet.infura.io", 443, True)
log.info("Using INFURA for RPC queries")
def set_api_rpc(self, rpc=None, rpctls=False):
"""
:param rpc:
:param rpctls:
"""
if rpc == "ganache":
rpcconfig = ("localhost", 8545, False)
else:
m = re.match(r"infura-(.*)", rpc)
if m and m.group(1) in ["mainnet", "rinkeby", "kovan", "ropsten"]:
rpcconfig = (m.group(1) + ".infura.io", 443, True)
else:
try:
host, port = rpc.split(":")
rpcconfig = (host, int(port), rpctls)
except ValueError:
raise CriticalError(
"Invalid RPC argument, use 'ganache', 'infura-[network]' or 'HOST:PORT'"
)
if rpcconfig:
self.eth = EthJsonRpc(rpcconfig[0], int(rpcconfig[1]), rpcconfig[2])
log.info("Using RPC settings: %s" % str(rpcconfig))
else:
raise CriticalError("Invalid RPC settings, check help for details.")
def set_api_rpc_localhost(self):
"""Set the RPC mode to a local instance."""
self.eth = EthJsonRpc("localhost", 8545)
log.info("Using default RPC settings: http://localhost:8545")
def set_api_from_config_path(self):
"""Set the RPC mode based on a given config file."""
config = ConfigParser(allow_no_value=False)
config.optionxform = str
config.read(self.config_path, "utf-8")
if config.has_option("defaults", "dynamic_loading"):
dynamic_loading = config.get("defaults", "dynamic_loading")
else:
dynamic_loading = "infura"
if dynamic_loading == "infura":
self.set_api_rpc_infura()
elif dynamic_loading == "localhost":
self.set_api_rpc_localhost()
else:
self.set_api_rpc(dynamic_loading)
def search_db(self, search):
"""
:param search:
"""
def search_callback(_, address, balance):
"""
:param _:
:param address:
:param balance:
"""
print("Address: " + address + ", balance: " + str(balance))
try:
self.eth_db.search(search, search_callback)
except SyntaxError:
raise CriticalError("Syntax error in search expression.")
def contract_hash_to_address(self, hash):
"""
:param hash:
"""
if not re.match(r"0x[a-fA-F0-9]{64}", hash):
raise CriticalError("Invalid address hash. Expected format is '0x...'.")
print(self.eth_db.contract_hash_to_address(hash))
def load_from_bytecode(self, code, bin_runtime=False, address=None):
"""
:param code:
:param bin_runtime:
:param address:
:return:
"""
if address is None:
address = util.get_indexed_address(0)
if bin_runtime:
self.contracts.append(
EVMContract(
code=code,
name="MAIN",
enable_online_lookup=self.enable_online_lookup,
)
)
else:
self.contracts.append(
EVMContract(
creation_code=code,
name="MAIN",
enable_online_lookup=self.enable_online_lookup,
)
)
return address, self.contracts[-1] # return address and contract object
def load_from_address(self, address):
"""
:param address:
:return:
"""
if not re.match(r"0x[a-fA-F0-9]{40}", address):
raise CriticalError("Invalid contract address. Expected format is '0x...'.")
try:
code = self.eth.eth_getCode(address)
except FileNotFoundError as e:
raise CriticalError("IPC error: " + str(e))
except ConnectionError:
raise CriticalError(
"Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly."
)
except Exception as e:
raise CriticalError("IPC / RPC error: " + str(e))
else:
if code == "0x" or code == "0x0":
raise CriticalError(
"Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain."
)
else:
self.contracts.append(
EVMContract(
code,
name=address,
enable_online_lookup=self.enable_online_lookup,
)
)
return address, self.contracts[-1] # return address and contract object
def load_from_solidity(self, solidity_files):
"""
:param solidity_files:
:return:
"""
address = util.get_indexed_address(0)
contracts = []
for file in solidity_files:
if ":" in file:
file, contract_name = file.split(":")
else:
contract_name = None
file = os.path.expanduser(file)
try:
# import signatures from solidity source
self.sigs.import_solidity_file(
file, solc_binary=self.solc_binary, solc_args=self.solc_args
)
if contract_name is not None:
contract = SolidityContract(
input_file=file,
name=contract_name,
solc_args=self.solc_args,
solc_binary=self.solc_binary,
)
self.contracts.append(contract)
contracts.append(contract)
else:
for contract in get_contracts_from_file(
input_file=file,
solc_args=self.solc_args,
solc_binary=self.solc_binary,
):
self.contracts.append(contract)
contracts.append(contract)
except FileNotFoundError:
raise CriticalError("Input file not found: " + file)
except CompilerError as e:
raise CriticalError(e)
except NoContractFoundError:
log.error(
"The file " + file + " does not contain a compilable contract."
)
return address, contracts
def dump_statespace(
self,
strategy,
contract,
address=None,
max_depth=None,
execution_timeout=None,
create_timeout=None,
enable_iprof=False,
):
"""
:param strategy:
:param contract:
:param address:
:param max_depth:
:param execution_timeout:
:param create_timeout:
:return:
"""
sym = SymExecWrapper(
contract,
address,
strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=max_depth,
execution_timeout=execution_timeout,
create_timeout=create_timeout,
enable_iprof=enable_iprof,
)
return get_serializable_statespace(sym)
def graph_html(
self,
strategy,
contract,
address,
max_depth=None,
enable_physics=False,
phrackify=False,
execution_timeout=None,
create_timeout=None,
enable_iprof=False,
):
"""
:param strategy:
:param contract:
:param address:
:param max_depth:
:param enable_physics:
:param phrackify:
:param execution_timeout:
:param create_timeout:
:return:
"""
sym = SymExecWrapper(
contract,
address,
strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=max_depth,
execution_timeout=execution_timeout,
create_timeout=create_timeout,
enable_iprof=enable_iprof,
)
return generate_graph(sym, physics=enable_physics, phrackify=phrackify)
def fire_lasers(
self,
strategy,
contracts=None,
address=None,
modules=None,
verbose_report=False,
max_depth=None,
execution_timeout=None,
create_timeout=None,
transaction_count=None,
enable_iprof=False,
):
"""
:param strategy:
:param contracts:
:param address:
:param modules:
:param verbose_report:
:param max_depth:
:param execution_timeout:
:param create_timeout:
:param transaction_count:
:return:
"""
all_issues = []
for contract in contracts or self.contracts:
try:
sym = SymExecWrapper(
contract,
address,
strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=max_depth,
execution_timeout=execution_timeout,
create_timeout=create_timeout,
transaction_count=transaction_count,
modules=modules,
compulsory_statespace=False,
enable_iprof=enable_iprof,
)
issues = fire_lasers(sym, modules)
except KeyboardInterrupt:
log.critical("Keyboard Interrupt")
issues = retrieve_callback_issues(modules)
except Exception:
log.critical(
"Exception occurred, aborting analysis. Please report this issue to the Mythril GitHub page.\n"
+ traceback.format_exc()
)
issues = retrieve_callback_issues(modules)
for issue in issues:
issue.add_code_info(contract)
all_issues += issues
source_data = Source()
source_data.get_source_from_contracts_list(self.contracts)
# Finally, output the results
report = Report(verbose_report, source_data)
for issue in all_issues:
report.append_issue(issue)
return report
def get_state_variable_from_storage(self, address, params=None):
"""
:param address:
:param params:
:return:
"""
if params is None:
params = []
(position, length, mappings) = (0, 1, [])
try:
if params[0] == "mapping":
if len(params) < 3:
raise CriticalError("Invalid number of parameters.")
position = int(params[1])
position_formatted = utils.zpad(utils.int_to_big_endian(position), 32)
for i in range(2, len(params)):
key = bytes(params[i], "utf8")
key_formatted = utils.rzpad(key, 32)
mappings.append(
int.from_bytes(
utils.sha3(key_formatted + position_formatted),
byteorder="big",
)
)
length = len(mappings)
if length == 1:
position = mappings[0]
else:
if len(params) >= 4:
raise CriticalError("Invalid number of parameters.")
if len(params) >= 1:
position = int(params[0])
if len(params) >= 2:
length = int(params[1])
if len(params) == 3 and params[2] == "array":
position_formatted = utils.zpad(
utils.int_to_big_endian(position), 32
)
position = int.from_bytes(
utils.sha3(position_formatted), byteorder="big"
)
except ValueError:
raise CriticalError(
"Invalid storage index. Please provide a numeric value."
)
outtxt = []
try:
if length == 1:
outtxt.append(
"{}: {}".format(
position, self.eth.eth_getStorageAt(address, position)
)
)
else:
if len(mappings) > 0:
for i in range(0, len(mappings)):
position = mappings[i]
outtxt.append(
"{}: {}".format(
hex(position),
self.eth.eth_getStorageAt(address, position),
)
)
else:
for i in range(position, position + length):
outtxt.append(
"{}: {}".format(
hex(i), self.eth.eth_getStorageAt(address, i)
)
)
except FileNotFoundError as e:
raise CriticalError("IPC error: " + str(e))
except ConnectionError:
raise CriticalError(
"Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly."
)
return "\n".join(outtxt)
@staticmethod
def disassemble(contract):
"""
:param contract:
:return:
"""
return contract.get_easm()
@staticmethod
def hash_for_function_signature(sig):
"""
:param sig:
:return:
"""
return "0x%s" % utils.sha3(sig)[:4].hex()

@ -0,0 +1,4 @@
from .mythril_disassembler import MythrilDisassembler
from .mythril_analyzer import MythrilAnalyzer
from .mythril_config import MythrilConfig
from .mythril_leveldb import MythrilLevelDB

@ -0,0 +1,197 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""mythril.py: Bug hunting on the Ethereum blockchain
http://www.github.com/b-mueller/mythril
"""
import logging
import traceback
from mythril.support.source_support import Source
from mythril.support.loader import DynLoader
from mythril.analysis.symbolic import SymExecWrapper
from mythril.analysis.callgraph import generate_graph
from mythril.analysis.traceexplore import get_serializable_statespace
from mythril.analysis.security import fire_lasers, retrieve_callback_issues
from mythril.analysis.report import Report
log = logging.getLogger(__name__)
class MythrilAnalyzer(object):
"""Mythril main interface class.
.. code-block:: python
mythril = Mythril()
# analyze
print(mythril.fire_lasers(args).as_text())
# (optional) graph
for contract in mythril.contracts:
# prints html or save it to file
print(mythril.graph_html(args))
# (optional) other funcs
mythril.dump_statespaces(args)
mythril.disassemble(contract)
mythril.get_state_variable_from_storage(args)
"""
def __init__(self, disassembler, dynld=False, onchain_storage_access=True):
self.eth = disassembler.eth
self.contracts = disassembler.contracts or []
self.enable_online_lookup = disassembler.enable_online_lookup
self.dynld = dynld
self.onchain_storage_access = onchain_storage_access
def dump_statespace(
self,
strategy,
contract,
address=None,
max_depth=None,
execution_timeout=None,
create_timeout=None,
enable_iprof=False,
):
"""
:param strategy:
:param contract:
:param address:
:param max_depth:
:param execution_timeout:
:param create_timeout:
:param enable_iprof:
:return:
"""
sym = SymExecWrapper(
contract,
address,
strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=max_depth,
execution_timeout=execution_timeout,
create_timeout=create_timeout,
enable_iprof=enable_iprof,
)
return get_serializable_statespace(sym)
def graph_html(
self,
strategy,
contract,
address,
max_depth=None,
enable_physics=False,
phrackify=False,
execution_timeout=None,
create_timeout=None,
enable_iprof=False,
):
"""
:param strategy:
:param contract:
:param address:
:param max_depth:
:param enable_physics:
:param phrackify:
:param execution_timeout:
:param create_timeout:
:param enable_iprof:
:return:
"""
sym = SymExecWrapper(
contract,
address,
strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=max_depth,
execution_timeout=execution_timeout,
create_timeout=create_timeout,
enable_iprof=enable_iprof,
)
return generate_graph(sym, physics=enable_physics, phrackify=phrackify)
def fire_lasers(
self,
strategy,
contracts=None,
address=None,
modules=None,
verbose_report=False,
max_depth=None,
execution_timeout=None,
create_timeout=None,
transaction_count=None,
enable_iprof=False,
):
"""
:param strategy:
:param contracts:
:param address:
:param modules:
:param verbose_report:
:param max_depth:
:param execution_timeout:
:param create_timeout:
:param transaction_count:
:param enable_iprof:
:return:
"""
all_issues = []
for contract in contracts or self.contracts:
try:
sym = SymExecWrapper(
contract,
address,
strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=max_depth,
execution_timeout=execution_timeout,
create_timeout=create_timeout,
transaction_count=transaction_count,
modules=modules,
compulsory_statespace=False,
enable_iprof=enable_iprof,
)
issues = fire_lasers(sym, modules)
except KeyboardInterrupt:
log.critical("Keyboard Interrupt")
issues = retrieve_callback_issues(modules)
except Exception:
log.critical(
"Exception occurred, aborting analysis. Please report this issue to the Mythril GitHub page.\n"
+ traceback.format_exc()
)
issues = retrieve_callback_issues(modules)
for issue in issues:
issue.add_code_info(contract)
all_issues += issues
source_data = Source()
source_data.get_source_from_contracts_list(self.contracts)
# Finally, output the results
report = Report(verbose_report, source_data)
for issue in all_issues:
report.append_issue(issue)
return report

@ -0,0 +1,183 @@
import codecs
import logging
import os
import platform
import re
from pathlib import Path
from shutil import copyfile
from configparser import ConfigParser
from mythril.exceptions import CriticalError
from mythril.ethereum.interface.rpc.client import EthJsonRpc
from mythril.ethereum.interface.leveldb.client import EthLevelDB
log = logging.getLogger(__name__)
class MythrilConfig(object):
def __init__(self):
self.mythril_dir = self._init_mythril_dir()
self.config_path = os.path.join(self.mythril_dir, "config.ini")
self.leveldb_dir = self._init_config()
self.eth = None
@staticmethod
def _init_mythril_dir():
try:
mythril_dir = os.environ["MYTHRIL_DIR"]
except KeyError:
mythril_dir = os.path.join(os.path.expanduser("~"), ".mythril")
if not os.path.exists(mythril_dir):
# Initialize data directory
log.info("Creating mythril data directory")
os.mkdir(mythril_dir)
db_path = str(Path(mythril_dir) / "signatures.db")
if not os.path.exists(db_path):
# if the default mythril dir doesn't contain a signature DB
# initialize it with the default one from the project root
asset_dir = Path(__file__).parent / "support" / "assets"
copyfile(str(asset_dir / "signatures.db"), db_path)
return mythril_dir
def _init_config(self):
"""If no config file exists, create it and add default options.
Default LevelDB path is specified based on OS
dynamic loading is set to infura by default in the file
Returns: leveldb directory
"""
leveldb_fallback_dir = self._get_fallback_dir()
if not os.path.exists(self.config_path):
log.info("No config file found. Creating default: " + self.config_path)
open(self.config_path, "a").close()
config = ConfigParser(allow_no_value=True)
config.optionxform = str
config.read(self.config_path, "utf-8")
if "defaults" not in config.sections():
self._add_default_options(config)
if not config.has_option("defaults", "leveldb_dir"):
self._add_leveldb_option(config, leveldb_fallback_dir)
if not config.has_option("defaults", "dynamic_loading"):
self._add_dynamic_loading_option(config)
with codecs.open(self.config_path, "w", "utf-8") as fp:
config.write(fp)
leveldb_dir = config.get(
"defaults", "leveldb_dir", fallback=leveldb_fallback_dir
)
return os.path.expanduser(leveldb_dir)
@staticmethod
def _get_fallback_dir():
system = platform.system().lower()
leveldb_fallback_dir = os.path.expanduser("~")
if system.startswith("darwin"):
leveldb_fallback_dir = os.path.join(
leveldb_fallback_dir, "Library", "Ethereum"
)
elif system.startswith("windows"):
leveldb_fallback_dir = os.path.join(
leveldb_fallback_dir, "AppData", "Roaming", "Ethereum"
)
else:
leveldb_fallback_dir = os.path.join(leveldb_fallback_dir, ".ethereum")
return os.path.join(leveldb_fallback_dir, "geth", "chaindata")
@staticmethod
def _add_default_options(config):
config.add_section("defaults")
@staticmethod
def _add_leveldb_option(config, leveldb_fallback_dir):
config.set("defaults", "#Default chaindata locations:")
config.set("defaults", "#– Mac: ~/Library/Ethereum/geth/chaindata")
config.set("defaults", "#– Linux: ~/.ethereum/geth/chaindata")
config.set(
"defaults",
"#– Windows: %USERPROFILE%\\AppData\\Roaming\\Ethereum\\geth\\chaindata",
)
config.set("defaults", "leveldb_dir", leveldb_fallback_dir)
@staticmethod
def _add_dynamic_loading_option(config):
config.set("defaults", "#– To connect to Infura use dynamic_loading: infura")
config.set(
"defaults",
"#– To connect to Rpc use "
"dynamic_loading: HOST:PORT / ganache / infura-[network_name]",
)
config.set(
"defaults", "#– To connect to local host use dynamic_loading: localhost"
)
config.set("defaults", "dynamic_loading", "infura")
def set_api_leveldb(self, leveldb_path):
"""
:param leveldb:
:return:
"""
self.eth = EthLevelDB(leveldb_path)
def set_api_rpc_infura(self):
"""Set the RPC mode to INFURA on mainnet."""
log.info("Using INFURA for RPC queries")
self.eth = EthJsonRpc("mainnet.infura.io", 443, True)
def set_api_rpc(self, rpc=None, rpctls=False):
"""
:param rpc:
:param rpctls:
"""
if rpc == "ganache":
rpcconfig = ("localhost", 8545, False)
else:
m = re.match(r"infura-(.*)", rpc)
if m and m.group(1) in ["mainnet", "rinkeby", "kovan", "ropsten"]:
rpcconfig = (m.group(1) + ".infura.io", 443, True)
else:
try:
host, port = rpc.split(":")
rpcconfig = (host, int(port), rpctls)
except ValueError:
raise CriticalError(
"Invalid RPC argument, use 'ganache', 'infura-[network]' or 'HOST:PORT'"
)
if rpcconfig:
log.info("Using RPC settings: %s" % str(rpcconfig))
self.eth = EthJsonRpc(rpcconfig[0], int(rpcconfig[1]), rpcconfig[2])
else:
raise CriticalError("Invalid RPC settings, check help for details.")
def set_api_rpc_localhost():
"""Set the RPC mode to a local instance."""
log.info("Using default RPC settings: http://localhost:8545")
self.eth = EthJsonRpc("localhost", 8545)
def set_api_from_config_path(self):
"""Set the RPC mode based on a given config file."""
config = ConfigParser(allow_no_value=False)
config.optionxform = str
config.read(self.config_path, "utf-8")
if config.has_option("defaults", "dynamic_loading"):
dynamic_loading = config.get("defaults", "dynamic_loading")
else:
dynamic_loading = "infura"
if dynamic_loading == "infura":
return self.set_api_rpc_infura()
elif dynamic_loading == "localhost":
return self.set_api_rpc_localhost()
else:
return self.set_api_rpc(dynamic_loading)

@ -0,0 +1,293 @@
import logging
import re
import solc
import os
from ethereum import utils
from solc.exceptions import SolcError
from mythril.ethereum import util
from mythril.exceptions import CriticalError, CompilerError, NoContractFoundError
from mythril.support import signatures
from mythril.support.truffle import analyze_truffle_project
from mythril.ethereum.evmcontract import EVMContract
from mythril.ethereum.interface.rpc.exceptions import ConnectionError
from mythril.solidity.soliditycontract import SolidityContract, get_contracts_from_file
log = logging.getLogger(__name__)
class MythrilDisassembler:
def __init__(
self, eth, solc_version=None, solc_args=None, enable_online_lookup=False
):
self.solc_binary = self._init_solc_binary(solc_version)
self.solc_args = solc_args
self.eth = eth
self.enable_online_lookup = enable_online_lookup
self.sigs = signatures.SignatureDB(enable_online_lookup=enable_online_lookup)
self.contracts = []
@staticmethod
def _init_solc_binary(version):
"""Figure out solc binary and version.
Only proper versions are supported. No nightlies, commits etc (such as available in remix).
"""
if not version:
return os.environ.get("SOLC") or "solc"
# tried converting input to semver, seemed not necessary so just slicing for now
main_version = solc.main.get_solc_version_string()
main_version_number = re.match(r"\d+.\d+.\d+", main_version)
if main_version is None:
raise CriticalError(
"Could not extract solc version from string {}".format(main_version)
)
if version == main_version_number:
log.info("Given version matches installed version")
solc_binary = os.environ.get("SOLC") or "solc"
else:
solc_binary = util.solc_exists(version)
if solc_binary:
log.info("Given version is already installed")
else:
try:
solc.install_solc("v" + version)
solc_binary = util.solc_exists(version)
if not solc_binary:
raise SolcError()
except SolcError:
raise CriticalError(
"There was an error when trying to install the specified solc version"
)
log.info("Setting the compiler to %s", solc_binary)
return solc_binary
def load_from_bytecode(self, code, bin_runtime=False, address=None):
"""
:param code:
:param bin_runtime:
:param address:
:return:
"""
if address is None:
address = util.get_indexed_address(0)
if bin_runtime:
self.contracts.append(
EVMContract(
code=code,
name="MAIN",
enable_online_lookup=self.enable_online_lookup,
)
)
else:
self.contracts.append(
EVMContract(
creation_code=code,
name="MAIN",
enable_online_lookup=self.enable_online_lookup,
)
)
return address, self.contracts[-1] # return address and contract object
def load_from_address(self, address):
"""
:param address:
:return:
"""
if not re.match(r"0x[a-fA-F0-9]{40}", address):
raise CriticalError("Invalid contract address. Expected format is '0x...'.")
try:
code = self.eth.eth_getCode(address)
except FileNotFoundError as e:
raise CriticalError("IPC error: " + str(e))
except ConnectionError:
raise CriticalError(
"Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly."
)
except Exception as e:
raise CriticalError("IPC / RPC error: " + str(e))
else:
if code == "0x" or code == "0x0":
raise CriticalError(
"Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain."
)
else:
self.contracts.append(
EVMContract(
code,
name=address,
enable_online_lookup=self.enable_online_lookup,
)
)
return address, self.contracts[-1] # return address and contract object
def load_from_solidity(self, solidity_files):
"""
:param solidity_files:
:return:
"""
address = util.get_indexed_address(0)
contracts = []
for file in solidity_files:
if ":" in file:
file, contract_name = file.split(":")
else:
contract_name = None
file = os.path.expanduser(file)
try:
# import signatures from solidity source
self.sigs.import_solidity_file(
file, solc_binary=self.solc_binary, solc_args=self.solc_args
)
if contract_name is not None:
contract = SolidityContract(
input_file=file,
name=contract_name,
solc_args=self.solc_args,
solc_binary=self.solc_binary,
)
self.contracts.append(contract)
contracts.append(contract)
else:
for contract in get_contracts_from_file(
input_file=file,
solc_args=self.solc_args,
solc_binary=self.solc_binary,
):
self.contracts.append(contract)
contracts.append(contract)
except FileNotFoundError:
raise CriticalError("Input file not found: " + file)
except CompilerError as e:
raise CriticalError(e)
except NoContractFoundError:
log.error(
"The file " + file + " does not contain a compilable contract."
)
return address, contracts
def analyze_truffle_project(self, *args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
return analyze_truffle_project(
self.sigs, *args, **kwargs
) # just passthru by passing signatures for now
@staticmethod
def disassemble(contract):
"""
:param contract:
:return:
"""
return contract.get_easm()
@staticmethod
def hash_for_function_signature(sig):
"""
:param sig:
:return:
"""
return "0x%s" % utils.sha3(sig)[:4].hex()
def get_state_variable_from_storage(self, address, params=None):
"""
:param address:
:param params:
:return:
"""
if params is None:
params = []
(position, length, mappings) = (0, 1, [])
try:
if params[0] == "mapping":
if len(params) < 3:
raise CriticalError("Invalid number of parameters.")
position = int(params[1])
position_formatted = utils.zpad(utils.int_to_big_endian(position), 32)
for i in range(2, len(params)):
key = bytes(params[i], "utf8")
key_formatted = utils.rzpad(key, 32)
mappings.append(
int.from_bytes(
utils.sha3(key_formatted + position_formatted),
byteorder="big",
)
)
length = len(mappings)
if length == 1:
position = mappings[0]
else:
if len(params) >= 4:
raise CriticalError("Invalid number of parameters.")
if len(params) >= 1:
position = int(params[0])
if len(params) >= 2:
length = int(params[1])
if len(params) == 3 and params[2] == "array":
position_formatted = utils.zpad(
utils.int_to_big_endian(position), 32
)
position = int.from_bytes(
utils.sha3(position_formatted), byteorder="big"
)
except ValueError:
raise CriticalError(
"Invalid storage index. Please provide a numeric value."
)
outtxt = []
try:
if length == 1:
outtxt.append(
"{}: {}".format(
position, self.eth.eth_getStorageAt(address, position)
)
)
else:
if len(mappings) > 0:
for i in range(0, len(mappings)):
position = mappings[i]
outtxt.append(
"{}: {}".format(
hex(position),
self.eth.eth_getStorageAt(address, position),
)
)
else:
for i in range(position, position + length):
outtxt.append(
"{}: {}".format(
hex(i), self.eth.eth_getStorageAt(address, i)
)
)
except FileNotFoundError as e:
raise CriticalError("IPC error: " + str(e))
except ConnectionError:
raise CriticalError(
"Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly."
)
return "\n".join(outtxt)

@ -0,0 +1,42 @@
import re
from mythril.exceptions import CriticalError
class MythrilLevelDB(object):
"""
Class which does operations on leveldb
"""
def __init__(self, leveldb):
self.level_db = leveldb
def search_db(self, search):
"""
Searches
:param search:
"""
def search_callback(_, address, balance):
"""
:param _:
:param address:
:param balance:
"""
print("Address: " + address + ", balance: " + str(balance))
try:
self.level_db.search(search, search_callback)
except SyntaxError:
raise CriticalError("Syntax error in search expression.")
def contract_hash_to_address(self, hash):
"""
Returns address of the corresponding hash by searching the leveldb
:param hash: Hash to be searched
"""
if not re.match(r"0x[a-fA-F0-9]{64}", hash):
raise CriticalError("Invalid address hash. Expected format is '0x...'.")
print(self.level_db.contract_hash_to_address(hash))

@ -1,4 +1,4 @@
from mythril.mythril import Mythril
from mythril.mythril import MythrilDisassembler
from mythril.laser.ethereum.transaction import execute_contract_creation
from mythril.ethereum import util
import mythril.laser.ethereum.svm as svm
@ -13,7 +13,7 @@ from mythril.analysis.symbolic import SymExecWrapper
def test_create():
contract = SolidityContract(
str(tests.TESTDATA_INPUTS_CONTRACTS / "calls.sol"),
solc_binary=Mythril._init_solc_binary("0.5.0"),
solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
)
laser_evm = svm.LaserEVM({})
@ -37,7 +37,7 @@ def test_create():
def test_sym_exec():
contract = SolidityContract(
str(tests.TESTDATA_INPUTS_CONTRACTS / "calls.sol"),
solc_binary=Mythril._init_solc_binary("0.5.0"),
solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
)
sym = SymExecWrapper(

@ -1,5 +1,5 @@
from mythril.solidity.soliditycontract import SolidityContract
from mythril.mythril import Mythril
from mythril.mythril import MythrilDisassembler
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.machine_state import MachineState
from mythril.laser.ethereum.state.global_state import GlobalState
@ -84,7 +84,8 @@ class NativeTests(BaseTestCase):
def runTest():
""""""
disassembly = SolidityContract(
"./tests/native_tests.sol", solc_binary=Mythril._init_solc_binary("0.5.0")
"./tests/native_tests.sol",
solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
).disassembly
account = Account("0x0000000000000000000000000000000000000000", disassembly)
accounts = {account.address: account}

@ -1,6 +1,6 @@
from pathlib import Path
from mythril.mythril import Mythril
from mythril.mythril import MythrilDisassembler
from mythril.solidity.soliditycontract import SolidityContract
from tests import BaseTestCase
@ -11,7 +11,7 @@ class SolidityContractTest(BaseTestCase):
def test_get_source_info_without_name_gets_latest_contract_info(self):
input_file = TEST_FILES / "multi_contracts.sol"
contract = SolidityContract(
str(input_file), solc_binary=Mythril._init_solc_binary("0.5.0")
str(input_file), solc_binary=MythrilDisassembler._init_solc_binary("0.5.0")
)
code_info = contract.get_source_info(142)
@ -25,7 +25,7 @@ class SolidityContractTest(BaseTestCase):
contract = SolidityContract(
str(input_file),
name="Transfer1",
solc_binary=Mythril._init_solc_binary("0.5.0"),
solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
)
code_info = contract.get_source_info(142)
@ -39,7 +39,7 @@ class SolidityContractTest(BaseTestCase):
contract = SolidityContract(
str(input_file),
name="AssertFail",
solc_binary=Mythril._init_solc_binary("0.5.0"),
solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
)
code_info = contract.get_source_info(70, constructor=True)

Loading…
Cancel
Save