Merge branch 'develop' into truffle/warning

pull/951/head
JoranHonig 6 years ago committed by GitHub
commit 89bdd91725
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      README.md
  2. 16
      mythril/analysis/report.py
  3. 389
      mythril/interfaces/cli.py
  4. 5
      mythril/mythril.py

@ -12,9 +12,10 @@
[![Sonarcloud - Maintainability](https://sonarcloud.io/api/project_badges/measure?project=mythril&metric=sqale_rating)](https://sonarcloud.io/dashboard?id=mythril)
[![Downloads](https://pepy.tech/badge/mythril)](https://pepy.tech/project/mythril)
Mythril Classic is an open-source security analysis tool for Ethereum smart contracts. It uses symbolic analysis, taint analysis and control flow checking to detect a variety of security vulnerabilities.
Mythril Classic is an open-source security analysis tool for Ethereum smart contracts. It uses symbolic analysis, taint analysis and control flow checking to detect a variety of security vulnerabilities. It's also an experimental tool designed for security pros. If you a smart contract developer you might prefer smoother tools such as:
This is an experimental tool designed for security guys. If you a smart contract developer you might prefer [Truffle Security](https://github.com/ConsenSys/truffle-security) or other convenient tools built on the [MythX API](https://mythx.io).
- [Mythos](https://github.com/cleanunicorn/mythos)
- [Truffle Security](https://github.com/ConsenSys/truffle-security)
Whether you want to contribute, need support, or want to learn what we have cooking for the future, our [Discord server](https://discord.gg/E3YrVtG) will serve your needs.

@ -3,6 +3,7 @@ import logging
import json
import operator
from jinja2 import PackageLoader, Environment
from typing import Dict, List
import _pysha3 as sha3
import hashlib
@ -130,7 +131,7 @@ class Report:
loader=PackageLoader("mythril.analysis"), trim_blocks=True
)
def __init__(self, verbose=False, source=None):
def __init__(self, verbose=False, source=None, exceptions=None):
"""
:param verbose:
@ -140,6 +141,7 @@ class Report:
self.solc_version = ""
self.meta = {}
self.source = source or Source()
self.exceptions = exceptions or []
def sorted_issues(self):
"""
@ -177,6 +179,14 @@ class Report:
result = {"success": True, "error": None, "issues": self.sorted_issues()}
return json.dumps(result, sort_keys=True)
def _get_exception_data(self) -> dict:
if not self.exceptions:
return {}
logs = [] # type: List[Dict]
for exception in self.exceptions:
logs += [{"level": "error", "hidden": "true", "error": exception}]
return {"logs": logs}
def as_swc_standard_format(self):
"""Format defined for integration and correlation.
@ -211,14 +221,14 @@ class Report:
"extra": {},
}
)
meta_data = self._get_exception_data()
result = [
{
"issues": _issues,
"sourceType": "raw-bytecode",
"sourceFormat": "evm-byzantium-bytecode",
"sourceList": source_list,
"meta": {},
"meta": meta_data,
}
]

@ -10,6 +10,7 @@ import json
import logging
import os
import sys
import traceback
import coloredlogs
@ -31,17 +32,47 @@ def exit_with_error(format_, message):
"""
if format_ == "text" or format_ == "markdown":
log.error(message)
else:
elif format_ == "json":
result = {"success": False, "error": str(message), "issues": []}
print(json.dumps(result))
else:
result = [
{
"issues": [],
"sourceType": "",
"sourceFormat": "",
"sourceList": [],
"meta": {
"logs": [{"level": "error", "hidden": "true", "error": message}]
},
}
]
print(json.dumps(result))
sys.exit()
def main():
def main() -> None:
"""The main CLI interface entry point."""
parser = argparse.ArgumentParser(
description="Security analysis of Ethereum smart contracts"
)
create_parser(parser)
# Get config values
args = parser.parse_args()
parse_args(parser=parser, args=args)
if __name__ == "__main__":
main()
def create_parser(parser: argparse.ArgumentParser) -> None:
"""
Creates the parser by setting all the possible arguments
:param parser: The parser
"""
parser.add_argument("solidity_file", nargs="*")
commands = parser.add_argument_group("commands")
@ -227,25 +258,8 @@ def main():
)
parser.add_argument("--epic", action="store_true", help=argparse.SUPPRESS)
# Get config values
args = parser.parse_args()
if args.epic:
path = os.path.dirname(os.path.realpath(__file__))
sys.argv.remove("--epic")
os.system(" ".join(sys.argv) + " | python3 " + path + "/epic.py")
sys.exit()
if args.version:
if args.outform == "json":
print(json.dumps({"version_str": VERSION}))
else:
print("Mythril version {}".format(VERSION))
sys.exit()
# Parse cmdline args
def validate_args(parser: argparse.ArgumentParser, args: argparse.Namespace):
if not (
args.search
or args.hash
@ -298,177 +312,119 @@ def main():
"--enable-iprof must be used with one of -g, --graph, -x, --fire-lasers, -j and --statespace-json",
)
# -- commands --
def quick_commands(args: argparse.Namespace):
if args.hash:
print(Mythril.hash_for_function_signature(args.hash))
sys.exit()
try:
# the mythril object should be our main interface
# infura = None, rpc = None, rpctls = None
# solc_args = None, dynld = None, max_recursion_depth = 12):
mythril = Mythril(
solv=args.solv,
dynld=args.dynld,
onchain_storage_access=(not args.no_onchain_storage_access),
solc_args=args.solc_args,
enable_online_lookup=args.query_signature,
def set_config(args: argparse.Namespace):
mythril = Mythril(
solv=args.solv,
dynld=args.dynld,
onchain_storage_access=(not args.no_onchain_storage_access),
solc_args=args.solc_args,
enable_online_lookup=args.query_signature,
)
if args.dynld or not args.no_onchain_storage_access and not (args.rpc or args.i):
mythril.set_api_from_config_path()
if args.address:
# Establish RPC connection if necessary
mythril.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls)
elif args.search or args.contract_hash_to_address:
# Open LevelDB if necessary
mythril.set_api_leveldb(
mythril.leveldb_dir if not args.leveldb_dir else args.leveldb_dir
)
if (
args.dynld
or not args.no_onchain_storage_access
and not (args.rpc or args.i)
):
mythril.set_api_from_config_path()
if args.address:
# Establish RPC connection if necessary
mythril.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls)
elif args.search or args.contract_hash_to_address:
# Open LevelDB if necessary
mythril.set_api_leveldb(
mythril.leveldb_dir if not args.leveldb_dir else args.leveldb_dir
)
return mythril
if args.search:
# Database search ops
mythril.search_db(args.search)
sys.exit()
if args.contract_hash_to_address:
# search corresponding address
try:
mythril.contract_hash_to_address(args.contract_hash_to_address)
except AddressNotFoundError:
print("Address not found.")
def leveldb_search(mythril: Mythril, args: argparse.Namespace):
if args.search:
# Database search ops
mythril.search_db(args.search)
sys.exit()
sys.exit()
if args.contract_hash_to_address:
# search corresponding address
try:
mythril.contract_hash_to_address(args.contract_hash_to_address)
except AddressNotFoundError:
print("Address not found.")
if args.truffle:
try:
# not really pythonic atm. needs refactoring
mythril.analyze_truffle_project(args)
except FileNotFoundError:
print(
"Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully."
)
sys.exit()
sys.exit()
# Load / compile input contracts
address = None
if args.code:
# Load from bytecode
code = args.code[2:] if args.code.startswith("0x") else args.code
address, _ = mythril.load_from_bytecode(code, args.bin_runtime)
elif args.codefile:
bytecode = "".join([l.strip() for l in args.codefile if len(l.strip()) > 0])
bytecode = bytecode[2:] if bytecode.startswith("0x") else bytecode
address, _ = mythril.load_from_bytecode(bytecode, args.bin_runtime)
elif args.address:
# Get bytecode from a contract address
address, _ = mythril.load_from_address(args.address)
elif args.solidity_file:
# Compile Solidity source file(s)
if args.graph and len(args.solidity_file) > 1:
exit_with_error(
args.outform,
"Cannot generate call graphs from multiple input files. Please do it one at a time.",
)
address, _ = mythril.load_from_solidity(args.solidity_file) # list of files
else:
def get_code(mythril: Mythril, args: argparse.Namespace):
address = None
if args.code:
# Load from bytecode
code = args.code[2:] if args.code.startswith("0x") else args.code
address, _ = mythril.load_from_bytecode(code, args.bin_runtime)
elif args.codefile:
bytecode = "".join([l.strip() for l in args.codefile if len(l.strip()) > 0])
bytecode = bytecode[2:] if bytecode.startswith("0x") else bytecode
address, _ = mythril.load_from_bytecode(bytecode, args.bin_runtime)
elif args.address:
# Get bytecode from a contract address
address, _ = mythril.load_from_address(args.address)
elif args.solidity_file:
# Compile Solidity source file(s)
if args.graph and len(args.solidity_file) > 1:
exit_with_error(
args.outform,
"No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, or -i SOLIDITY_FILES",
"Cannot generate call graphs from multiple input files. Please do it one at a time.",
)
address, _ = mythril.load_from_solidity(args.solidity_file) # list of files
else:
exit_with_error(
args.outform,
"No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, or -i SOLIDITY_FILES",
)
return address
# Commands
if args.storage:
if not args.address:
exit_with_error(
args.outform,
"To read storage, provide the address of a deployed contract with the -a option.",
)
storage = mythril.get_state_variable_from_storage(
address=address,
params=[a.strip() for a in args.storage.strip().split(",")],
def execute_command(
mythril: Mythril,
address: str,
parser: argparse.ArgumentParser,
args: argparse.Namespace,
):
if args.storage:
if not args.address:
exit_with_error(
args.outform,
"To read storage, provide the address of a deployed contract with the -a option.",
)
print(storage)
elif args.disassemble:
# or mythril.disassemble(mythril.contracts[0])
if mythril.contracts[0].code:
print("Runtime Disassembly: \n" + mythril.contracts[0].get_easm())
if mythril.contracts[0].creation_code:
print("Disassembly: \n" + mythril.contracts[0].get_creation_easm())
storage = mythril.get_state_variable_from_storage(
address=address, params=[a.strip() for a in args.storage.strip().split(",")]
)
print(storage)
elif args.graph or args.fire_lasers:
if not mythril.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
elif args.disassemble:
# or mythril.disassemble(mythril.contracts[0])
if args.graph:
html = mythril.graph_html(
strategy=args.strategy,
contract=mythril.contracts[0],
address=address,
enable_physics=args.enable_physics,
phrackify=args.phrack,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
)
if mythril.contracts[0].code:
print("Runtime Disassembly: \n" + mythril.contracts[0].get_easm())
if mythril.contracts[0].creation_code:
print("Disassembly: \n" + mythril.contracts[0].get_creation_easm())
try:
with open(args.graph, "w") as f:
f.write(html)
except Exception as e:
exit_with_error(args.outform, "Error saving graph: " + str(e))
else:
try:
report = mythril.fire_lasers(
strategy=args.strategy,
address=address,
modules=[m.strip() for m in args.modules.strip().split(",")]
if args.modules
else [],
verbose_report=args.verbose_report,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
transaction_count=args.transaction_count,
enable_iprof=args.enable_iprof,
)
outputs = {
"json": report.as_json(),
"jsonv2": report.as_swc_standard_format(),
"text": report.as_text(),
"markdown": report.as_markdown(),
}
print(outputs[args.outform])
except ModuleNotFoundError as e:
exit_with_error(
args.outform, "Error loading analyis modules: " + format(e)
)
elif args.statespace_json:
if not mythril.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
elif args.graph or args.fire_lasers:
if not mythril.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
statespace = mythril.dump_statespace(
if args.graph:
html = mythril.graph_html(
strategy=args.strategy,
contract=mythril.contracts[0],
address=address,
enable_physics=args.enable_physics,
phrackify=args.phrack,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
@ -476,16 +432,107 @@ def main():
)
try:
with open(args.statespace_json, "w") as f:
json.dump(statespace, f)
with open(args.graph, "w") as f:
f.write(html)
except Exception as e:
exit_with_error(args.outform, "Error saving json: " + str(e))
exit_with_error(args.outform, "Error saving graph: " + str(e))
else:
parser.print_help()
try:
report = mythril.fire_lasers(
strategy=args.strategy,
address=address,
modules=[m.strip() for m in args.modules.strip().split(",")]
if args.modules
else [],
verbose_report=args.verbose_report,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
transaction_count=args.transaction_count,
enable_iprof=args.enable_iprof,
)
outputs = {
"json": report.as_json(),
"jsonv2": report.as_swc_standard_format(),
"text": report.as_text(),
"markdown": report.as_markdown(),
}
print(outputs[args.outform])
except ModuleNotFoundError as e:
exit_with_error(
args.outform, "Error loading analyis modules: " + format(e)
)
elif args.statespace_json:
if not mythril.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
statespace = mythril.dump_statespace(
strategy=args.strategy,
contract=mythril.contracts[0],
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
)
try:
with open(args.statespace_json, "w") as f:
json.dump(statespace, f)
except Exception as e:
exit_with_error(args.outform, "Error saving json: " + str(e))
else:
parser.print_help()
def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> None:
"""
Parses the arguments
:param parser: The parser
:param args: The args
"""
if args.epic:
path = os.path.dirname(os.path.realpath(__file__))
sys.argv.remove("--epic")
os.system(" ".join(sys.argv) + " | python3 " + path + "/epic.py")
sys.exit()
if args.version:
if args.outform == "json":
print(json.dumps({"version_str": VERSION}))
else:
print("Mythril version {}".format(VERSION))
sys.exit()
# Parse cmdline args
validate_args(parser, args)
try:
quick_commands(args)
mythril = set_config(args)
leveldb_search(mythril, args)
if args.truffle:
try:
mythril.analyze_truffle_project(args)
except FileNotFoundError:
print(
"Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully."
)
sys.exit()
address = get_code(mythril, args)
execute_command(mythril=mythril, address=address, parser=parser, args=args)
except CriticalError as ce:
exit_with_error(args.outform, str(ce))
except Exception:
exit_with_error(args.outform, traceback.format_exc())
if __name__ == "__main__":

@ -570,6 +570,7 @@ class Mythril(object):
"""
all_issues = []
SolverStatistics().enabled = True
exceptions = []
for contract in contracts or self.contracts:
StartTime() # Reinitialize start time for new contracts
try:
@ -600,7 +601,7 @@ class Mythril(object):
+ traceback.format_exc()
)
issues = retrieve_callback_issues(modules)
exceptions.append(traceback.format_exc())
for issue in issues:
issue.add_code_info(contract)
@ -610,7 +611,7 @@ class Mythril(object):
source_data = Source()
source_data.get_source_from_contracts_list(self.contracts)
# Finally, output the results
report = Report(verbose_report, source_data)
report = Report(verbose_report, source_data, exceptions=exceptions)
for issue in all_issues:
report.append_issue(issue)

Loading…
Cancel
Save