Merge pull request #906 from ConsenSys/refactor/896

Refactors mythril.py
pull/973/head
JoranHonig 6 years ago committed by GitHub
commit 7041f32ba7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .gitignore
  2. 4
      mythril/ethereum/interface/leveldb/state.py
  3. 147
      mythril/interfaces/cli.py
  4. 4
      mythril/mythril/__init__.py
  5. 173
      mythril/mythril/mythril_analyzer.py
  6. 226
      mythril/mythril/mythril_config.py
  7. 303
      mythril/mythril/mythril_disassembler.py
  8. 49
      mythril/mythril/mythril_leveldb.py
  9. 15
      tests/graph_test.py
  10. 6
      tests/laser/transaction/create_transaction_test.py
  11. 31
      tests/mythril/mythril_analyzer_test.py
  12. 58
      tests/mythril/mythril_config_test.py
  13. 70
      tests/mythril/mythril_disassembler_test.py
  14. 51
      tests/mythril/mythril_leveldb_test.py
  15. 5
      tests/native_test.py
  16. 8
      tests/solidity_contract_test.py
  17. 2
      tests/testdata/mythril_config_inputs/config.ini
  18. 2
      tox.ini

2
.gitignore vendored

@ -175,7 +175,7 @@ lol*
coverage_html_report/ coverage_html_report/
tests/testdata/outputs_current/ tests/testdata/outputs_current/
tests/testdata/outputs_current_laser_result/ tests/testdata/outputs_current_laser_result/
tests/mythril_dir/signatures.db tests/testdata/mythril_config_inputs/config.ini
# VSCode # VSCode
.vscode .vscode

@ -150,7 +150,7 @@ class State:
rlpdata = self.trie.get(addr) rlpdata = self.trie.get(addr)
if rlpdata != trie.BLANK_NODE: if rlpdata != trie.BLANK_NODE:
o = rlp.decode(rlpdata, Account, db=self.db, address=addr) o = rlp.decode(rlpdata, Account, db=self.db, addr=addr)
else: else:
o = Account.blank_account(self.db, addr, 0) o = Account.blank_account(self.db, addr, 0)
self.cache[addr] = o self.cache[addr] = o
@ -162,4 +162,4 @@ class State:
"""iterates through trie to and yields non-blank leafs as accounts.""" """iterates through trie to and yields non-blank leafs as accounts."""
for address_hash, rlpdata in self.secure_trie.trie.iter_branch(): for address_hash, rlpdata in self.secure_trie.trie.iter_branch():
if rlpdata != trie.BLANK_NODE: if rlpdata != trie.BLANK_NODE:
yield rlp.decode(rlpdata, Account, db=self.db, address=address_hash) yield rlp.decode(rlpdata, Account, db=self.db, addr=address_hash)

@ -10,13 +10,18 @@ import json
import logging import logging
import os import os
import sys import sys
import traceback
import coloredlogs import coloredlogs
import traceback
import mythril.support.signatures as sigs import mythril.support.signatures as sigs
from mythril.exceptions import AddressNotFoundError, CriticalError from mythril.exceptions import AddressNotFoundError, CriticalError
from mythril.mythril import Mythril from mythril.mythril import (
MythrilAnalyzer,
MythrilDisassembler,
MythrilConfig,
MythrilLevelDB,
)
from mythril.version import VERSION from mythril.version import VERSION
# logging.basicConfig(level=logging.DEBUG) # logging.basicConfig(level=logging.DEBUG)
@ -26,7 +31,6 @@ log = logging.getLogger(__name__)
def exit_with_error(format_, message): def exit_with_error(format_, message):
""" """
:param format_: :param format_:
:param message: :param message:
""" """
@ -64,10 +68,6 @@ def main() -> None:
parse_args(parser=parser, args=args) parse_args(parser=parser, args=args)
if __name__ == "__main__":
main()
def create_parser(parser: argparse.ArgumentParser) -> None: def create_parser(parser: argparse.ArgumentParser) -> None:
""" """
Creates the parser by setting all the possible arguments Creates the parser by setting all the possible arguments
@ -315,61 +315,56 @@ def validate_args(parser: argparse.ArgumentParser, args: argparse.Namespace):
def quick_commands(args: argparse.Namespace): def quick_commands(args: argparse.Namespace):
if args.hash: if args.hash:
print(Mythril.hash_for_function_signature(args.hash)) print(MythrilDisassembler.hash_for_function_signature(args.hash))
sys.exit() sys.exit()
def set_config(args: argparse.Namespace): def set_config(args: argparse.Namespace):
mythril = Mythril( config = MythrilConfig()
solv=args.solv,
dynld=args.dynld,
onchain_storage_access=(not args.no_onchain_storage_access),
solc_args=args.solc_args,
enable_online_lookup=args.query_signature,
)
if args.dynld or not args.no_onchain_storage_access and not (args.rpc or args.i): if args.dynld or not args.no_onchain_storage_access and not (args.rpc or args.i):
mythril.set_api_from_config_path() config.set_api_from_config_path()
if args.address: if args.address:
# Establish RPC connection if necessary # Establish RPC connection if necessary
mythril.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls) config.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls)
elif args.search or args.contract_hash_to_address: elif args.search or args.contract_hash_to_address:
# Open LevelDB if necessary # Open LevelDB if necessary
mythril.set_api_leveldb( config.set_api_leveldb(
mythril.leveldb_dir if not args.leveldb_dir else args.leveldb_dir config.leveldb_dir if not args.leveldb_dir else args.leveldb_dir
) )
return mythril return config
def leveldb_search(mythril: Mythril, args: argparse.Namespace): def leveldb_search(config: MythrilConfig, args: argparse.Namespace):
if args.search: if args.search or args.contract_hash_to_address:
# Database search ops leveldb_searcher = MythrilLevelDB(config.eth_db)
mythril.search_db(args.search) if args.search:
sys.exit() # Database search ops
leveldb_searcher.search_db(args.search)
if args.contract_hash_to_address: else:
# search corresponding address # search corresponding address
try: try:
mythril.contract_hash_to_address(args.contract_hash_to_address) leveldb_searcher.contract_hash_to_address(args.contract_hash_to_address)
except AddressNotFoundError: except AddressNotFoundError:
print("Address not found.") print("Address not found.")
sys.exit() sys.exit()
def get_code(mythril: Mythril, args: argparse.Namespace): def get_code(disassembler: MythrilDisassembler, args: argparse.Namespace):
address = None address = None
if args.code: if args.code:
# Load from bytecode # Load from bytecode
code = args.code[2:] if args.code.startswith("0x") else args.code code = args.code[2:] if args.code.startswith("0x") else args.code
address, _ = mythril.load_from_bytecode(code, args.bin_runtime) address, _ = disassembler.load_from_bytecode(code, args.bin_runtime)
elif args.codefile: elif args.codefile:
bytecode = "".join([l.strip() for l in args.codefile if len(l.strip()) > 0]) bytecode = "".join([l.strip() for l in args.codefile if len(l.strip()) > 0])
bytecode = bytecode[2:] if bytecode.startswith("0x") else bytecode bytecode = bytecode[2:] if bytecode.startswith("0x") else bytecode
address, _ = mythril.load_from_bytecode(bytecode, args.bin_runtime) address, _ = disassembler.load_from_bytecode(bytecode, args.bin_runtime)
elif args.address: elif args.address:
# Get bytecode from a contract address # Get bytecode from a contract address
address, _ = mythril.load_from_address(args.address) address, _ = disassembler.load_from_address(args.address)
elif args.solidity_file: elif args.solidity_file:
# Compile Solidity source file(s) # Compile Solidity source file(s)
if args.graph and len(args.solidity_file) > 1: if args.graph and len(args.solidity_file) > 1:
@ -377,7 +372,9 @@ def get_code(mythril: Mythril, args: argparse.Namespace):
args.outform, args.outform,
"Cannot generate call graphs from multiple input files. Please do it one at a time.", "Cannot generate call graphs from multiple input files. Please do it one at a time.",
) )
address, _ = mythril.load_from_solidity(args.solidity_file) # list of files address, _ = disassembler.load_from_solidity(
args.solidity_file
) # list of files
else: else:
exit_with_error( exit_with_error(
args.outform, args.outform,
@ -387,11 +384,12 @@ def get_code(mythril: Mythril, args: argparse.Namespace):
def execute_command( def execute_command(
mythril: Mythril, disassembler: MythrilDisassembler,
address: str, address: str,
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
args: argparse.Namespace, args: argparse.Namespace,
): ):
if args.storage: if args.storage:
if not args.address: if not args.address:
exit_with_error( exit_with_error(
@ -399,36 +397,42 @@ def execute_command(
"To read storage, provide the address of a deployed contract with the -a option.", "To read storage, provide the address of a deployed contract with the -a option.",
) )
storage = mythril.get_state_variable_from_storage( storage = disassembler.get_state_variable_from_storage(
address=address, params=[a.strip() for a in args.storage.strip().split(",")] address=address, params=[a.strip() for a in args.storage.strip().split(",")]
) )
print(storage) print(storage)
return
analyzer = MythrilAnalyzer(
strategy=args.strategy,
disassembler=disassembler,
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
onchain_storage_access=not args.no_onchain_storage_access,
)
elif args.disassemble: if args.disassemble:
# or mythril.disassemble(mythril.contracts[0]) # or mythril.disassemble(mythril.contracts[0])
if mythril.contracts[0].code: if disassembler.contracts[0].code:
print("Runtime Disassembly: \n" + mythril.contracts[0].get_easm()) print("Runtime Disassembly: \n" + disassembler.contracts[0].get_easm())
if mythril.contracts[0].creation_code: if disassembler.contracts[0].creation_code:
print("Disassembly: \n" + mythril.contracts[0].get_creation_easm()) print("Disassembly: \n" + disassembler.contracts[0].get_creation_easm())
elif args.graph or args.fire_lasers: elif args.graph or args.fire_lasers:
if not mythril.contracts: if not disassembler.contracts:
exit_with_error( exit_with_error(
args.outform, "input files do not contain any valid contracts" args.outform, "input files do not contain any valid contracts"
) )
if args.graph: if args.graph:
html = mythril.graph_html( html = analyzer.graph_html(
strategy=args.strategy, contract=analyzer.contracts[0],
contract=mythril.contracts[0],
address=address,
enable_physics=args.enable_physics, enable_physics=args.enable_physics,
phrackify=args.phrack, phrackify=args.phrack,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
) )
try: try:
@ -439,18 +443,12 @@ def execute_command(
else: else:
try: try:
report = mythril.fire_lasers( report = analyzer.fire_lasers(
strategy=args.strategy,
address=address,
modules=[m.strip() for m in args.modules.strip().split(",")] modules=[m.strip() for m in args.modules.strip().split(",")]
if args.modules if args.modules
else [], else [],
verbose_report=args.verbose_report, verbose_report=args.verbose_report,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
transaction_count=args.transaction_count, transaction_count=args.transaction_count,
enable_iprof=args.enable_iprof,
) )
outputs = { outputs = {
"json": report.as_json(), "json": report.as_json(),
@ -466,20 +464,12 @@ def execute_command(
elif args.statespace_json: elif args.statespace_json:
if not mythril.contracts: if not analyzer.contracts:
exit_with_error( exit_with_error(
args.outform, "input files do not contain any valid contracts" args.outform, "input files do not contain any valid contracts"
) )
statespace = mythril.dump_statespace( statespace = analyzer.dump_statespace(contract=analyzer.contracts[0])
strategy=args.strategy,
contract=mythril.contracts[0],
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
)
try: try:
with open(args.statespace_json, "w") as f: with open(args.statespace_json, "w") as f:
@ -515,20 +505,27 @@ def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Non
validate_args(parser, args) validate_args(parser, args)
try: try:
quick_commands(args) quick_commands(args)
mythril = set_config(args) config = set_config(args)
leveldb_search(mythril, args) leveldb_search(config, args)
dissasembler = MythrilDisassembler(
eth=config.eth,
solc_version=args.solv,
solc_args=args.solc_args,
enable_online_lookup=args.query_signature,
)
if args.truffle: if args.truffle:
try: try:
mythril.analyze_truffle_project(args) dissasembler.analyze_truffle_project(args)
except FileNotFoundError: except FileNotFoundError:
print( print(
"Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully." "Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully."
) )
sys.exit() sys.exit()
address = get_code(mythril, args) address = get_code(dissasembler, args)
execute_command(mythril=mythril, address=address, parser=parser, args=args) execute_command(
disassembler=dissasembler, address=address, parser=parser, args=args
)
except CriticalError as ce: except CriticalError as ce:
exit_with_error(args.outform, str(ce)) exit_with_error(args.outform, str(ce))
except Exception: except Exception:

@ -0,0 +1,4 @@
from .mythril_disassembler import MythrilDisassembler
from .mythril_analyzer import MythrilAnalyzer
from .mythril_config import MythrilConfig
from .mythril_leveldb import MythrilLevelDB

@ -0,0 +1,173 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import traceback
from typing import Optional, List
from . import MythrilDisassembler
from mythril.support.source_support import Source
from mythril.support.loader import DynLoader
from mythril.analysis.symbolic import SymExecWrapper
from mythril.analysis.callgraph import generate_graph
from mythril.analysis.traceexplore import get_serializable_statespace
from mythril.analysis.security import fire_lasers, retrieve_callback_issues
from mythril.analysis.report import Report, Issue
from mythril.ethereum.evmcontract import EVMContract
from mythril.laser.smt import SolverStatistics
from mythril.support.start_time import StartTime
log = logging.getLogger(__name__)
class MythrilAnalyzer:
"""
The Mythril Analyzer class
Responsible for the analysis of the smart contracts
"""
def __init__(
self,
disassembler: MythrilDisassembler,
requires_dynld: bool = False,
onchain_storage_access: bool = True,
strategy: str = "dfs",
address: Optional[str] = None,
max_depth: Optional[int] = None,
execution_timeout: Optional[int] = None,
create_timeout: Optional[int] = None,
enable_iprof: bool = False,
):
"""
:param disassembler: The MythrilDisassembler class
:param requires_dynld: whether dynamic loading should be done or not
:param onchain_storage_access: Whether onchain access should be done or not
"""
self.eth = disassembler.eth
self.contracts = disassembler.contracts or [] # type: List[EVMContract]
self.enable_online_lookup = disassembler.enable_online_lookup
self.dynld = requires_dynld
self.onchain_storage_access = onchain_storage_access
self.strategy = strategy
self.address = address
self.max_depth = max_depth
self.execution_timeout = execution_timeout
self.create_timeout = create_timeout
self.enable_iprof = enable_iprof
def dump_statespace(self, contract: EVMContract = None) -> str:
"""
Returns serializable statespace of the contract
:param contract: The Contract on which the analysis should be done
:return: The serialized state space
"""
sym = SymExecWrapper(
contract or self.contracts[0],
self.address,
self.strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=self.max_depth,
execution_timeout=self.execution_timeout,
create_timeout=self.create_timeout,
enable_iprof=self.enable_iprof,
)
return get_serializable_statespace(sym)
def graph_html(
self,
contract: EVMContract = None,
enable_physics: bool = False,
phrackify: bool = False,
transaction_count: Optional[int] = None,
) -> str:
"""
:param contract: The Contract on which the analysis should be done
:param enable_physics: If true then enables the graph physics simulation
:param phrackify: If true generates Phrack-style call graph
:param transaction_count: The amount of transactions to be executed
:return: The generated graph in html format
"""
sym = SymExecWrapper(
contract or self.contracts[0],
self.address,
self.strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=self.max_depth,
execution_timeout=self.execution_timeout,
transaction_count=transaction_count,
create_timeout=self.create_timeout,
enable_iprof=self.enable_iprof,
)
return generate_graph(sym, physics=enable_physics, phrackify=phrackify)
def fire_lasers(
self,
modules: Optional[List[str]] = None,
verbose_report: bool = False,
transaction_count: Optional[int] = None,
) -> Report:
"""
:param modules: The analysis modules which should be executed
:param verbose_report: Gives out the transaction sequence of the vulnerability
:param transaction_count: The amount of transactions to be executed
:return: The Report class which contains the all the issues/vulnerabilities
"""
all_issues = [] # type: List[Issue]
SolverStatistics().enabled = True
exceptions = []
for contract in self.contracts:
StartTime() # Reinitialize start time for new contracts
try:
sym = SymExecWrapper(
contract,
self.address,
self.strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=self.max_depth,
execution_timeout=self.execution_timeout,
create_timeout=self.create_timeout,
transaction_count=transaction_count,
modules=modules,
compulsory_statespace=False,
enable_iprof=self.enable_iprof,
)
issues = fire_lasers(sym, modules)
except KeyboardInterrupt:
log.critical("Keyboard Interrupt")
issues = retrieve_callback_issues(modules)
except Exception:
log.critical(
"Exception occurred, aborting analysis. Please report this issue to the Mythril GitHub page.\n"
+ traceback.format_exc()
)
issues = retrieve_callback_issues(modules)
exceptions.append(traceback.format_exc())
for issue in issues:
issue.add_code_info(contract)
all_issues += issues
log.info("Solver statistics: \n{}".format(str(SolverStatistics())))
source_data = Source()
source_data.get_source_from_contracts_list(self.contracts)
# Finally, output the results
report = Report(verbose_report, source_data, exceptions=exceptions)
for issue in all_issues:
report.append_issue(issue)
return report

@ -0,0 +1,226 @@
import codecs
import logging
import os
import platform
import re
from pathlib import Path
from shutil import copyfile
from configparser import ConfigParser
from typing import Optional
from mythril.exceptions import CriticalError
from mythril.ethereum.interface.rpc.client import EthJsonRpc
from mythril.ethereum.interface.leveldb.client import EthLevelDB
log = logging.getLogger(__name__)
class MythrilConfig:
"""
The Mythril Analyzer class
Responsible for setup of the mythril environment
"""
def __init__(self):
self.mythril_dir = self._init_mythril_dir()
self.config_path = os.path.join(self.mythril_dir, "config.ini")
self.leveldb_dir = None
self._init_config()
self.eth = None # type: Optional[EthJsonRpc]
self.eth_db = None # type: Optional[EthLevelDB]
@staticmethod
def _init_mythril_dir() -> str:
"""
Initializes the mythril dir and config.ini file
:return: The mythril dir's path
"""
try:
mythril_dir = os.environ["MYTHRIL_DIR"]
except KeyError:
mythril_dir = os.path.join(os.path.expanduser("~"), ".mythril")
if not os.path.exists(mythril_dir):
# Initialize data directory
log.info("Creating mythril data directory")
os.mkdir(mythril_dir)
db_path = str(Path(mythril_dir) / "signatures.db")
if not os.path.exists(db_path):
# if the default mythril dir doesn't contain a signature DB
# initialize it with the default one from the project root
asset_dir = Path(__file__).parent.parent / "support" / "assets"
copyfile(str(asset_dir / "signatures.db"), db_path)
return mythril_dir
def _init_config(self):
"""If no config file exists, create it and add default options.
Defaults:-
- Default LevelDB path is specified based on OS
- dynamic loading is set to infura by default in the file
This function also sets self.leveldb_dir path
"""
leveldb_default_path = self._get_default_leveldb_path()
if not os.path.exists(self.config_path):
log.info("No config file found. Creating default: " + self.config_path)
open(self.config_path, "a").close()
config = ConfigParser(allow_no_value=True)
config.optionxform = str
config.read(self.config_path, "utf-8")
if "defaults" not in config.sections():
self._add_default_options(config)
if not config.has_option("defaults", "leveldb_dir"):
self._add_leveldb_option(config, leveldb_default_path)
if not config.has_option("defaults", "dynamic_loading"):
self._add_dynamic_loading_option(config)
with codecs.open(self.config_path, "w", "utf-8") as fp:
config.write(fp)
leveldb_dir = config.get(
"defaults", "leveldb_dir", fallback=leveldb_default_path
)
self.leveldb_dir = os.path.expanduser(leveldb_dir)
@staticmethod
def _get_default_leveldb_path() -> str:
"""
Returns the LevelDB path
:return: The LevelDB path
"""
system = platform.system().lower()
leveldb_fallback_dir = os.path.expanduser("~")
if system.startswith("darwin"):
leveldb_fallback_dir = os.path.join(
leveldb_fallback_dir, "Library", "Ethereum"
)
elif system.startswith("windows"):
leveldb_fallback_dir = os.path.join(
leveldb_fallback_dir, "AppData", "Roaming", "Ethereum"
)
else:
leveldb_fallback_dir = os.path.join(leveldb_fallback_dir, ".ethereum")
return os.path.join(leveldb_fallback_dir, "geth", "chaindata")
@staticmethod
def _add_default_options(config: ConfigParser) -> None:
"""
Adds defaults option to config.ini
:param config: The config file object
:return: None
"""
config.add_section("defaults")
@staticmethod
def _add_leveldb_option(config: ConfigParser, leveldb_fallback_dir: str) -> None:
"""
Sets a default leveldb path in .mythril/config.ini file
:param config: The config file object
:param leveldb_fallback_dir: The leveldb dir to use by default for searches
:return: None
"""
config.set("defaults", "#Default chaindata locations:", "")
config.set("defaults", "#– Mac: ~/Library/Ethereum/geth/chaindata", "")
config.set("defaults", "#– Linux: ~/.ethereum/geth/chaindata", "")
config.set(
"defaults",
"#– Windows: %USERPROFILE%\\AppData\\Roaming\\Ethereum\\geth\\chaindata",
"",
)
config.set("defaults", "leveldb_dir", leveldb_fallback_dir)
@staticmethod
def _add_dynamic_loading_option(config: ConfigParser) -> None:
"""
Sets the dynamic loading config option in .mythril/config.ini file
:param config: The config file object
:return: None
"""
config.set(
"defaults", "#– To connect to Infura use dynamic_loading: infura", ""
)
config.set(
"defaults",
"#– To connect to Rpc use "
"dynamic_loading: HOST:PORT / ganache / infura-[network_name]",
"",
)
config.set(
"defaults", "#– To connect to local host use dynamic_loading: localhost", ""
)
config.set("defaults", "dynamic_loading", "infura")
def set_api_leveldb(self, leveldb_path: str) -> None:
"""
"""
self.eth_db = EthLevelDB(leveldb_path)
def set_api_rpc_infura(self) -> None:
"""Set the RPC mode to INFURA on Mainnet."""
log.info("Using INFURA Main Net for RPC queries")
self.eth = EthJsonRpc("mainnet.infura.io", 443, True)
def set_api_rpc(self, rpc: str = None, rpctls: bool = False) -> None:
"""
Sets the RPC mode to either of ganache or infura
:param rpc: either of the strings - ganache, infura-mainnet, infura-rinkeby, infura-kovan, infura-ropsten
"""
if rpc == "ganache":
rpcconfig = ("localhost", 8545, False)
else:
m = re.match(r"infura-(.*)", rpc)
if m and m.group(1) in ["mainnet", "rinkeby", "kovan", "ropsten"]:
rpcconfig = (m.group(1) + ".infura.io", 443, True)
else:
try:
host, port = rpc.split(":")
rpcconfig = (host, int(port), rpctls)
except ValueError:
raise CriticalError(
"Invalid RPC argument, use 'ganache', 'infura-[network]' or 'HOST:PORT'"
)
if rpcconfig:
log.info("Using RPC settings: %s" % str(rpcconfig))
self.eth = EthJsonRpc(rpcconfig[0], int(rpcconfig[1]), rpcconfig[2])
else:
raise CriticalError("Invalid RPC settings, check help for details.")
def set_api_rpc_localhost(self) -> None:
"""Set the RPC mode to a local instance."""
log.info("Using default RPC settings: http://localhost:8545")
self.eth = EthJsonRpc("localhost", 8545)
def set_api_from_config_path(self) -> None:
"""Set the RPC mode based on a given config file."""
config = ConfigParser(allow_no_value=False)
# TODO: Remove this after this issue https://github.com/python/mypy/issues/2427 is closed
config.optionxform = str # type:ignore
config.read(self.config_path, "utf-8")
if config.has_option("defaults", "dynamic_loading"):
dynamic_loading = config.get("defaults", "dynamic_loading")
else:
dynamic_loading = "infura"
self._set_rpc(dynamic_loading)
def _set_rpc(self, rpc_type: str) -> None:
"""
Sets rpc based on the type
:param rpc_type: The type of connection: like infura, ganache, localhost
:return:
"""
if rpc_type == "infura":
self.set_api_rpc_infura()
elif rpc_type == "localhost":
self.set_api_rpc_localhost()
else:
self.set_api_rpc(rpc_type)

@ -0,0 +1,303 @@
import logging
import re
import solc
import os
from ethereum import utils
from solc.exceptions import SolcError
from typing import List, Tuple, Optional
from mythril.ethereum import util
from mythril.ethereum.interface.rpc.client import EthJsonRpc
from mythril.exceptions import CriticalError, CompilerError, NoContractFoundError
from mythril.support import signatures
from mythril.support.truffle import analyze_truffle_project
from mythril.ethereum.evmcontract import EVMContract
from mythril.ethereum.interface.rpc.exceptions import ConnectionError
from mythril.solidity.soliditycontract import SolidityContract, get_contracts_from_file
log = logging.getLogger(__name__)
class MythrilDisassembler:
"""
The Mythril Disassembler class
Responsible for generating disassembly of smart contracts
- Compiles solc code from file/onchain
- Can also be used to access onchain storage data
"""
def __init__(
self,
eth: Optional[EthJsonRpc] = None,
solc_version: str = None,
solc_args: str = None,
enable_online_lookup: bool = False,
) -> None:
self.solc_binary = self._init_solc_binary(solc_version)
self.solc_args = solc_args
self.eth = eth
self.enable_online_lookup = enable_online_lookup
self.sigs = signatures.SignatureDB(enable_online_lookup=enable_online_lookup)
self.contracts = [] # type: List[EVMContract]
@staticmethod
def _init_solc_binary(version: str) -> str:
"""
Only proper versions are supported. No nightlies, commits etc (such as available in remix).
:param version: Version of the solc binary required
:return: The solc binary of the corresponding version
"""
if not version:
return os.environ.get("SOLC") or "solc"
# tried converting input to semver, seemed not necessary so just slicing for now
main_version = solc.main.get_solc_version_string()
main_version_number = re.match(r"\d+.\d+.\d+", main_version)
if main_version is None:
raise CriticalError(
"Could not extract solc version from string {}".format(main_version)
)
if version == main_version_number:
log.info("Given version matches installed version")
solc_binary = os.environ.get("SOLC") or "solc"
else:
solc_binary = util.solc_exists(version)
if solc_binary:
log.info("Given version is already installed")
else:
try:
solc.install_solc("v" + version)
solc_binary = util.solc_exists(version)
if not solc_binary:
raise SolcError()
except SolcError:
raise CriticalError(
"There was an error when trying to install the specified solc version"
)
log.info("Setting the compiler to %s", solc_binary)
return solc_binary
def load_from_bytecode(
self, code: str, bin_runtime: bool = False, address: Optional[str] = None
) -> Tuple[str, EVMContract]:
"""
Returns the address and the contract class for the given bytecode
:param code: Bytecode
:param bin_runtime: Whether the code is runtime code or creation code
:param address: address of contract
:return: tuple(address, Contract class)
"""
if address is None:
address = util.get_indexed_address(0)
if bin_runtime:
self.contracts.append(
EVMContract(
code=code,
name="MAIN",
enable_online_lookup=self.enable_online_lookup,
)
)
else:
self.contracts.append(
EVMContract(
creation_code=code,
name="MAIN",
enable_online_lookup=self.enable_online_lookup,
)
)
return address, self.contracts[-1] # return address and contract object
def load_from_address(self, address: str) -> Tuple[str, EVMContract]:
"""
Returns the contract given it's on chain address
:param address: The on chain address of a contract
:return: tuple(address, contract)
"""
if not re.match(r"0x[a-fA-F0-9]{40}", address):
raise CriticalError("Invalid contract address. Expected format is '0x...'.")
try:
code = self.eth.eth_getCode(address)
except FileNotFoundError as e:
raise CriticalError("IPC error: " + str(e))
except ConnectionError:
raise CriticalError(
"Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly."
)
except Exception as e:
raise CriticalError("IPC / RPC error: " + str(e))
if code == "0x" or code == "0x0":
raise CriticalError(
"Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain."
)
else:
self.contracts.append(
EVMContract(
code, name=address, enable_online_lookup=self.enable_online_lookup
)
)
return address, self.contracts[-1] # return address and contract object
def load_from_solidity(
self, solidity_files: List[str]
) -> Tuple[str, List[SolidityContract]]:
"""
:param solidity_files: List of solidity_files
:return: tuple of address, contract class list
"""
address = util.get_indexed_address(0)
contracts = []
for file in solidity_files:
if ":" in file:
file, contract_name = file.split(":")
else:
contract_name = None
file = os.path.expanduser(file)
try:
# import signatures from solidity source
self.sigs.import_solidity_file(
file, solc_binary=self.solc_binary, solc_args=self.solc_args
)
if contract_name is not None:
contract = SolidityContract(
input_file=file,
name=contract_name,
solc_args=self.solc_args,
solc_binary=self.solc_binary,
)
self.contracts.append(contract)
contracts.append(contract)
else:
for contract in get_contracts_from_file(
input_file=file,
solc_args=self.solc_args,
solc_binary=self.solc_binary,
):
self.contracts.append(contract)
contracts.append(contract)
except FileNotFoundError:
raise CriticalError("Input file not found: " + file)
except CompilerError as e:
raise CriticalError(e)
except NoContractFoundError:
log.error(
"The file " + file + " does not contain a compilable contract."
)
return address, contracts
def analyze_truffle_project(self, *args, **kwargs) -> None:
"""
:param args:
:param kwargs:
:return:
"""
analyze_truffle_project(
self.sigs, *args, **kwargs
) # just passthru by passing signatures for now
@staticmethod
def hash_for_function_signature(func: str) -> str:
"""
Return function names corresponding signature hash
:param func: function name
:return: Its hash signature
"""
return "0x%s" % utils.sha3(func)[:4].hex()
def get_state_variable_from_storage(
self, address: str, params: Optional[List[str]] = None
) -> str:
"""
Get variables from the storage
:param address: The contract address
:param params: The list of parameters
param types: [position, length] or ["mapping", position, key1, key2, ... ]
or [position, length, array]
:return: The corresponding storage slot and its value
"""
params = params or []
(position, length, mappings) = (0, 1, [])
try:
if params[0] == "mapping":
if len(params) < 3:
raise CriticalError("Invalid number of parameters.")
position = int(params[1])
position_formatted = utils.zpad(utils.int_to_big_endian(position), 32)
for i in range(2, len(params)):
key = bytes(params[i], "utf8")
key_formatted = utils.rzpad(key, 32)
mappings.append(
int.from_bytes(
utils.sha3(key_formatted + position_formatted),
byteorder="big",
)
)
length = len(mappings)
if length == 1:
position = mappings[0]
else:
if len(params) >= 4:
raise CriticalError("Invalid number of parameters.")
if len(params) >= 1:
position = int(params[0])
if len(params) >= 2:
length = int(params[1])
if len(params) == 3 and params[2] == "array":
position_formatted = utils.zpad(
utils.int_to_big_endian(position), 32
)
position = int.from_bytes(
utils.sha3(position_formatted), byteorder="big"
)
except ValueError:
raise CriticalError(
"Invalid storage index. Please provide a numeric value."
)
outtxt = []
try:
if length == 1:
outtxt.append(
"{}: {}".format(
position, self.eth.eth_getStorageAt(address, position)
)
)
else:
if len(mappings) > 0:
for i in range(0, len(mappings)):
position = mappings[i]
outtxt.append(
"{}: {}".format(
hex(position),
self.eth.eth_getStorageAt(address, position),
)
)
else:
for i in range(position, position + length):
outtxt.append(
"{}: {}".format(
hex(i), self.eth.eth_getStorageAt(address, i)
)
)
except FileNotFoundError as e:
raise CriticalError("IPC error: " + str(e))
except ConnectionError:
raise CriticalError(
"Could not connect to RPC server. "
"Make sure that your node is running and that RPC parameters are set correctly."
)
return "\n".join(outtxt)

@ -0,0 +1,49 @@
import re
from mythril.exceptions import CriticalError
class MythrilLevelDB:
"""
Class which does search operations on leveldb
There are two DBs
1) Key value pairs of hashes and it's corresponding address
2) The LevelDB Trie
"""
def __init__(self, leveldb):
"""
:param leveldb: Leveldb path
"""
self.leveldb = leveldb
def search_db(self, search):
"""
Searches the corresponding code
:param search: The code part to be searched
"""
def search_callback(_, address, balance):
"""
:param _:
:param address: The address of the contract with the code in search
:param balance: The balance of the corresponding contract
"""
print("Address: " + address + ", balance: " + str(balance))
try:
self.leveldb.search(search, search_callback)
except SyntaxError:
raise CriticalError("Syntax error in search expression.")
def contract_hash_to_address(self, contract_hash):
"""
Returns address of the corresponding hash by searching the leveldb
:param contract_hash: Hash to be searched
"""
if not re.match(r"0x[a-fA-F0-9]{64}", contract_hash):
raise CriticalError("Invalid address hash. Expected format is '0x...'.")
print(self.leveldb.contract_hash_to_address(contract_hash))

@ -1,5 +1,5 @@
from mythril.analysis.callgraph import generate_graph from mythril.analysis.callgraph import generate_graph
from mythril.analysis.symbolic import SymExecWrapper from mythril.mythril import MythrilAnalyzer, MythrilDisassembler
from mythril.ethereum import util from mythril.ethereum import util
from mythril.solidity.soliditycontract import EVMContract from mythril.solidity.soliditycontract import EVMContract
from tests import ( from tests import (
@ -22,16 +22,17 @@ class GraphTest(BaseTestCase):
) )
contract = EVMContract(input_file.read_text()) contract = EVMContract(input_file.read_text())
disassembler = MythrilDisassembler()
sym = SymExecWrapper( disassembler.contracts.append(contract)
contract, analyzer = MythrilAnalyzer(
address=(util.get_indexed_address(0)), disassembler=disassembler,
strategy="dfs", strategy="dfs",
transaction_count=1,
execution_timeout=5, execution_timeout=5,
max_depth=30,
address=(util.get_indexed_address(0)),
) )
html = generate_graph(sym) html = analyzer.graph_html(transaction_count=1)
output_current.write_text(html) output_current.write_text(html)
lines_expected = re.findall( lines_expected = re.findall(

@ -1,4 +1,4 @@
from mythril.mythril import Mythril from mythril.mythril import MythrilDisassembler
from mythril.laser.ethereum.transaction import execute_contract_creation from mythril.laser.ethereum.transaction import execute_contract_creation
from mythril.ethereum import util from mythril.ethereum import util
import mythril.laser.ethereum.svm as svm import mythril.laser.ethereum.svm as svm
@ -13,7 +13,7 @@ from mythril.analysis.symbolic import SymExecWrapper
def test_create(): def test_create():
contract = SolidityContract( contract = SolidityContract(
str(tests.TESTDATA_INPUTS_CONTRACTS / "calls.sol"), str(tests.TESTDATA_INPUTS_CONTRACTS / "calls.sol"),
solc_binary=Mythril._init_solc_binary("0.5.0"), solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
) )
laser_evm = svm.LaserEVM({}) laser_evm = svm.LaserEVM({})
@ -37,7 +37,7 @@ def test_create():
def test_sym_exec(): def test_sym_exec():
contract = SolidityContract( contract = SolidityContract(
str(tests.TESTDATA_INPUTS_CONTRACTS / "calls.sol"), str(tests.TESTDATA_INPUTS_CONTRACTS / "calls.sol"),
solc_binary=Mythril._init_solc_binary("0.5.0"), solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
) )
sym = SymExecWrapper( sym = SymExecWrapper(

@ -0,0 +1,31 @@
from pathlib import Path
from mythril.mythril import MythrilDisassembler, MythrilAnalyzer
from mythril.analysis.report import Issue
from mock import patch
@patch("mythril.analysis.report.Issue.add_code_info", return_value=None)
@patch(
"mythril.mythril.mythril_analyzer.fire_lasers",
return_value=[Issue("", "", "234", "101", "title", "0x02445")],
)
@patch("mythril.mythril.mythril_analyzer.SymExecWrapper", return_value=None)
def test_fire_lasers(mock_sym, mock_fire_lasers, mock_code_info):
disassembler = MythrilDisassembler(eth=None)
disassembler.load_from_solidity(
[
str(
(
Path(__file__).parent.parent / "testdata/input_contracts/origin.sol"
).absolute()
)
]
)
analyzer = MythrilAnalyzer(disassembler, strategy="dfs")
issues = analyzer.fire_lasers(modules=[]).sorted_issues()
mock_sym.assert_called()
mock_fire_lasers.assert_called()
mock_code_info.assert_called()
assert len(issues) == 1
assert issues[0]["swc-id"] == "101"

@ -0,0 +1,58 @@
import pytest
from configparser import ConfigParser
from pathlib import Path
from mythril.mythril import MythrilConfig
from mythril.exceptions import CriticalError
def test_config_path_dynloading():
config = MythrilConfig()
config.config_path = str(
Path(__file__).parent.parent / "testdata/mythril_config_inputs/config.ini"
)
config.set_api_from_config_path()
assert config.eth.host == "mainnet.infura.io"
assert config.eth.port == 443
rpc_types_tests = [
("infura", "mainnet.infura.io", 443, True),
("ganache", "localhost", 8545, True),
("infura-rinkeby", "rinkeby.infura.io", 443, True),
("infura-ropsten", "ropsten.infura.io", 443, True),
("infura-kovan", "kovan.infura.io", 443, True),
("localhost", "localhost", 8545, True),
("localhost:9022", "localhost", 9022, True),
("pinfura", None, None, False),
("infura-finkeby", None, None, False),
]
@pytest.mark.parametrize("rpc_type,host,port,success", rpc_types_tests)
def test_set_rpc(rpc_type, host, port, success):
config = MythrilConfig()
if success:
config._set_rpc(rpc_type)
assert config.eth.host == host
assert config.eth.port == port
else:
with pytest.raises(CriticalError):
config._set_rpc(rpc_type)
def test_leveldb_config_addition():
config = ConfigParser()
config.add_section("defaults")
MythrilConfig._add_leveldb_option(config, "test")
assert config.has_section("defaults")
assert config.get("defaults", "leveldb_dir") == "test"
def test_dynld_config_addition():
config = ConfigParser()
config.add_section("defaults")
MythrilConfig._add_dynamic_loading_option(config)
assert config.has_section("defaults")
assert config.get("defaults", "dynamic_loading") == "infura"

@ -0,0 +1,70 @@
import pytest
from mythril.mythril import MythrilConfig, MythrilDisassembler
from mythril.exceptions import CriticalError
storage_test = [
(
["438767356", "3"],
[
"0x1a270efc: 0x0000000000000000000000000000000000000000000000000000000000000000",
"0x1a270efd: 0x0000000000000000000000000000000000000000000000000000000000000000",
"0x1a270efe: 0x0000000000000000000000000000000000000000000000000000000000000000",
],
),
(
["mapping", "4588934759847", "1", "2"],
[
"0x7e523d5aeb10cdb378b0b1f76138c28063a2cb9ec8ff710f42a0972f4d53cf44: "
"0x0000000000000000000000000000000000000000000000000000000000000000",
"0xba36da34ceec88853a2ebdde88e023c6919b90348f41e8905b422dc9ce22301c: "
"0x0000000000000000000000000000000000000000000000000000000000000000",
],
),
(
["mapping", "4588934759847", "10"],
[
"45998575720532480608987132552042185415362901038635143236141343153058112000553: "
"0x0000000000000000000000000000000000000000000000000000000000000000"
],
),
(
["4588934759847", "1", "array"],
[
"30699902832541380821728647136767910246735388184559883985790189062258823875816: "
"0x0000000000000000000000000000000000000000000000000000000000000000"
],
),
]
@pytest.mark.parametrize("params,ans", storage_test)
def test_get_data_from_storage(params, ans):
config = MythrilConfig()
config.set_api_rpc_infura()
disassembler = MythrilDisassembler(eth=config.eth, solc_version="0.4.23")
outtext = disassembler.get_state_variable_from_storage(
"0x76799f77587738bfeef09452df215b63d2cfb08a", params
).split("\n")
assert outtext == ans
storage_test_incorrect_params = [
(["1", "2", "3", "4"]),
(["mapping", "1"]),
(["a", "b", "c"]),
]
@pytest.mark.parametrize("params", storage_test_incorrect_params)
def test_get_data_from_storage_incorrect_params(params):
config = MythrilConfig()
config.set_api_rpc_infura()
disassembler = MythrilDisassembler(eth=config.eth, solc_version="0.4.23")
with pytest.raises(CriticalError):
disassembler.get_state_variable_from_storage(
"0x76799f77587738bfeef09452df215b63d2cfb08a", params
)
def test_solc_install():
MythrilDisassembler(eth=None, solc_version="0.4.19")

@ -0,0 +1,51 @@
import io
import pytest
from contextlib import redirect_stdout
from mock import patch
from mythril.mythril import MythrilLevelDB, MythrilConfig
from mythril.exceptions import CriticalError
@patch("mythril.ethereum.interface.leveldb.client.EthLevelDB.search")
@patch("mythril.ethereum.interface.leveldb.client.ETH_DB", return_value=None)
@patch("mythril.ethereum.interface.leveldb.client.LevelDBReader", return_value=None)
@patch("mythril.ethereum.interface.leveldb.client.LevelDBWriter", return_value=None)
def test_leveldb_code_search(mock_leveldb, f1, f2, f3):
config = MythrilConfig()
config.set_api_leveldb("some path")
leveldb_search = MythrilLevelDB(leveldb=config.eth_db)
leveldb_search.search_db("code#PUSH#")
mock_leveldb.assert_called()
@patch("mythril.ethereum.interface.leveldb.client.ETH_DB", return_value=None)
@patch("mythril.ethereum.interface.leveldb.client.LevelDBReader", return_value=None)
@patch("mythril.ethereum.interface.leveldb.client.LevelDBWriter", return_value=None)
def test_leveldb_hash_search_incorrect_input(f1, f2, f3):
config = MythrilConfig()
config.set_api_leveldb("some path")
leveldb_search = MythrilLevelDB(leveldb=config.eth_db)
with pytest.raises(CriticalError):
leveldb_search.contract_hash_to_address("0x23")
@patch(
"mythril.ethereum.interface.leveldb.client.EthLevelDB.contract_hash_to_address",
return_value="0xddbb615cb2ffaff7233d8a6f3601621de94795e1",
)
@patch("mythril.ethereum.interface.leveldb.client.ETH_DB", return_value=None)
@patch("mythril.ethereum.interface.leveldb.client.LevelDBReader", return_value=None)
@patch("mythril.ethereum.interface.leveldb.client.LevelDBWriter", return_value=None)
def test_leveldb_hash_search_correct_input(mock_hash_to_address, f1, f2, f3):
config = MythrilConfig()
config.set_api_leveldb("some path")
leveldb_search = MythrilLevelDB(leveldb=config.eth_db)
f = io.StringIO()
with redirect_stdout(f):
leveldb_search.contract_hash_to_address(
"0x0464e651bcc40de28fc7fcde269218d16850bac9689da5f4a6bd640fd3cdf6aa"
)
out = f.getvalue()
mock_hash_to_address.assert_called()
assert out == "0xddbb615cb2ffaff7233d8a6f3601621de94795e1\n"

@ -1,5 +1,5 @@
from mythril.solidity.soliditycontract import SolidityContract from mythril.solidity.soliditycontract import SolidityContract
from mythril.mythril import Mythril from mythril.mythril import MythrilDisassembler
from mythril.laser.ethereum.state.account import Account from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.machine_state import MachineState from mythril.laser.ethereum.state.machine_state import MachineState
from mythril.laser.ethereum.state.global_state import GlobalState from mythril.laser.ethereum.state.global_state import GlobalState
@ -84,7 +84,8 @@ class NativeTests(BaseTestCase):
def runTest(): def runTest():
"""""" """"""
disassembly = SolidityContract( disassembly = SolidityContract(
"./tests/native_tests.sol", solc_binary=Mythril._init_solc_binary("0.5.0") "./tests/native_tests.sol",
solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
).disassembly ).disassembly
account = Account("0x0000000000000000000000000000000000000000", disassembly) account = Account("0x0000000000000000000000000000000000000000", disassembly)
accounts = {account.address: account} accounts = {account.address: account}

@ -1,6 +1,6 @@
from pathlib import Path from pathlib import Path
from mythril.mythril import Mythril from mythril.mythril import MythrilDisassembler
from mythril.solidity.soliditycontract import SolidityContract from mythril.solidity.soliditycontract import SolidityContract
from tests import BaseTestCase from tests import BaseTestCase
@ -11,7 +11,7 @@ class SolidityContractTest(BaseTestCase):
def test_get_source_info_without_name_gets_latest_contract_info(self): def test_get_source_info_without_name_gets_latest_contract_info(self):
input_file = TEST_FILES / "multi_contracts.sol" input_file = TEST_FILES / "multi_contracts.sol"
contract = SolidityContract( contract = SolidityContract(
str(input_file), solc_binary=Mythril._init_solc_binary("0.5.0") str(input_file), solc_binary=MythrilDisassembler._init_solc_binary("0.5.0")
) )
code_info = contract.get_source_info(142) code_info = contract.get_source_info(142)
@ -25,7 +25,7 @@ class SolidityContractTest(BaseTestCase):
contract = SolidityContract( contract = SolidityContract(
str(input_file), str(input_file),
name="Transfer1", name="Transfer1",
solc_binary=Mythril._init_solc_binary("0.5.0"), solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
) )
code_info = contract.get_source_info(142) code_info = contract.get_source_info(142)
@ -39,7 +39,7 @@ class SolidityContractTest(BaseTestCase):
contract = SolidityContract( contract = SolidityContract(
str(input_file), str(input_file),
name="AssertFail", name="AssertFail",
solc_binary=Mythril._init_solc_binary("0.5.0"), solc_binary=MythrilDisassembler._init_solc_binary("0.5.0"),
) )
code_info = contract.get_source_info(70, constructor=True) code_info = contract.get_source_info(70, constructor=True)

@ -0,0 +1,2 @@
[defaults]
dynamic_loading = infura

@ -12,6 +12,7 @@ commands =
mkdir -p {toxinidir}/tests/testdata/outputs_current_laser_result/ mkdir -p {toxinidir}/tests/testdata/outputs_current_laser_result/
py.test -v \ py.test -v \
--junitxml={toxworkdir}/output/{envname}/junit.xml \ --junitxml={toxworkdir}/output/{envname}/junit.xml \
--disable-pytest-warnings \
{posargs} {posargs}
[testenv:py36] [testenv:py36]
@ -35,6 +36,7 @@ commands =
--cov-report=xml:{toxworkdir}/output/{envname}/coverage.xml \ --cov-report=xml:{toxworkdir}/output/{envname}/coverage.xml \
--cov-report=html:{toxworkdir}/output/{envname}/covhtml \ --cov-report=html:{toxworkdir}/output/{envname}/covhtml \
--junitxml={toxworkdir}/output/{envname}/junit.xml \ --junitxml={toxworkdir}/output/{envname}/junit.xml \
--disable-pytest-warnings \
{posargs} {posargs}

Loading…
Cancel
Save