Refactor cli (#1033)

* Refactor cli

* enhance cli

* Re-Refactor the cli structure for more flexibility

* Add cli tests for error

* Move the cmd_line_test to the previous directory

* Add documentation

* Add more tests and change docs

* Add more tests for storage slots and execution

* support a for analyze and add more tests

* Improve cli interface

* Fix previous errors

* Refactor with black

* Add help command

* Add new tests, fix an edge case and improve code and help messages
pull/1099/head
Nikhil Parasaram 5 years ago committed by GitHub
parent 5252a05524
commit ae26b1462d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 570
      mythril/interfaces/cli.py
  2. 4
      tests/cli_tests/test_cli_opts.py
  3. 58
      tests/cmd_line_test.py

@ -15,6 +15,7 @@ import coloredlogs
import traceback import traceback
import mythril.support.signatures as sigs import mythril.support.signatures as sigs
from argparse import ArgumentParser, Namespace
from mythril.exceptions import AddressNotFoundError, CriticalError from mythril.exceptions import AddressNotFoundError, CriticalError
from mythril.mythril import ( from mythril.mythril import (
MythrilAnalyzer, MythrilAnalyzer,
@ -24,13 +25,31 @@ from mythril.mythril import (
) )
from mythril.__version__ import __version__ as VERSION from mythril.__version__ import __version__ as VERSION
ANALYZE_LIST = ("analyze", "a")
DISASSEMBLE_LIST = ("disassemble", "d")
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
COMMAND_LIST = (
ANALYZE_LIST
+ DISASSEMBLE_LIST
+ (
"read-storage",
"leveldb-search",
"function-to-hash",
"hash-to-address",
"version",
"truffle",
"help",
)
)
def exit_with_error(format_, message): def exit_with_error(format_, message):
""" """
:param format_: Exits with error
:param message: :param format_: The format of the message
:param message: message
""" """
if format_ == "text" or format_ == "markdown": if format_ == "text" or format_ == "markdown":
log.error(message) log.error(message)
@ -53,94 +72,46 @@ def exit_with_error(format_, message):
sys.exit() sys.exit()
def main() -> None: def get_input_parser() -> ArgumentParser:
"""The main CLI interface entry point."""
parser = argparse.ArgumentParser(
description="Security analysis of Ethereum smart contracts"
)
create_parser(parser)
# Get config values
args = parser.parse_args()
parse_args(parser=parser, args=args)
def create_parser(parser: argparse.ArgumentParser) -> None:
""" """
Creates the parser by setting all the possible arguments Returns Parser which handles input
:param parser: The parser :return: Parser which handles input
""" """
parser.add_argument("solidity_file", nargs="*") parser = ArgumentParser(add_help=False)
parser.add_argument(
commands = parser.add_argument_group("commands")
commands.add_argument("-g", "--graph", help="generate a control flow graph")
commands.add_argument(
"-V",
"--version",
action="store_true",
help="print the Mythril version number and exit",
)
commands.add_argument(
"-x",
"--fire-lasers",
action="store_true",
help="detect vulnerabilities, use with -c, -a or solidity file(s)",
)
commands.add_argument(
"--truffle",
action="store_true",
help="analyze a truffle project (run from project dir)",
)
commands.add_argument(
"-d", "--disassemble", action="store_true", help="print disassembly"
)
commands.add_argument(
"-j",
"--statespace-json",
help="dumps the statespace json",
metavar="OUTPUT_FILE",
)
inputs = parser.add_argument_group("input arguments")
inputs.add_argument(
"-c", "-c",
"--code", "--code",
help='hex-encoded bytecode string ("6060604052...")', help='hex-encoded bytecode string ("6060604052...")',
metavar="BYTECODE", metavar="BYTECODE",
) )
inputs.add_argument( parser.add_argument(
"-f", "-f",
"--codefile", "--codefile",
help="file containing hex-encoded bytecode string", help="file containing hex-encoded bytecode string",
metavar="BYTECODEFILE", metavar="BYTECODEFILE",
type=argparse.FileType("r"), type=argparse.FileType("r"),
) )
inputs.add_argument( parser.add_argument(
"-a", "-a",
"--address", "--address",
help="pull contract from the blockchain", help="pull contract from the blockchain",
metavar="CONTRACT_ADDRESS", metavar="CONTRACT_ADDRESS",
) )
inputs.add_argument( parser.add_argument(
"-l",
"--dynld",
action="store_true",
help="auto-load dependencies from the blockchain",
)
inputs.add_argument(
"--no-onchain-storage-access",
action="store_true",
help="turns off getting the data from onchain contracts",
)
inputs.add_argument(
"--bin-runtime", "--bin-runtime",
action="store_true", action="store_true",
help="Only when -c or -f is used. Consider the input bytecode as binary runtime code, default being the contract creation bytecode.", help="Only when -c or -f is used. Consider the input bytecode as binary runtime code, default being the contract creation bytecode.",
) )
return parser
outputs = parser.add_argument_group("output formats") def get_output_parser() -> ArgumentParser:
outputs.add_argument( """
Get parser which handles output
:return: Parser which handles output
"""
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
"-o", "-o",
"--outform", "--outform",
choices=["text", "markdown", "json", "jsonv2"], choices=["text", "markdown", "json", "jsonv2"],
@ -148,43 +119,199 @@ def create_parser(parser: argparse.ArgumentParser) -> None:
help="report output format", help="report output format",
metavar="<text/markdown/json/jsonv2>", metavar="<text/markdown/json/jsonv2>",
) )
outputs.add_argument( parser.add_argument(
"--verbose-report", "--verbose-report",
action="store_true", action="store_true",
help="Include debugging information in report", help="Include debugging information in report",
) )
return parser
def get_rpc_parser() -> ArgumentParser:
"""
Get parser which handles RPC flags
:return: Parser which handles rpc inputs
"""
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
"--rpc",
help="custom RPC settings",
metavar="HOST:PORT / ganache / infura-[network_name]",
default="infura-mainnet",
)
parser.add_argument(
"--rpctls", type=bool, default=False, help="RPC connection over TLS"
)
return parser
def get_utilities_parser() -> ArgumentParser:
"""
Get parser which handles utilities flags
:return: Parser which handles utility flags
"""
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--solc-args", help="Extra arguments for solc")
parser.add_argument(
"--solv",
help="specify solidity compiler version. If not present, will try to install it (Experimental)",
metavar="SOLV",
)
return parser
def main() -> None:
"""The main CLI interface entry point."""
rpc_parser = get_rpc_parser()
utilities_parser = get_utilities_parser()
input_parser = get_input_parser()
output_parser = get_output_parser()
parser = argparse.ArgumentParser(
description="Security analysis of Ethereum smart contracts"
)
parser.add_argument("--epic", action="store_true", help=argparse.SUPPRESS)
parser.add_argument(
"-v", type=int, help="log level (0-5)", metavar="LOG_LEVEL", default=2
)
subparsers = parser.add_subparsers(dest="command", help="Commands")
analyzer_parser = subparsers.add_parser(
ANALYZE_LIST[0],
help="Triggers the analysis of the smart contract",
parents=[rpc_parser, utilities_parser, input_parser, output_parser],
aliases=ANALYZE_LIST[1:],
)
create_analyzer_parser(analyzer_parser)
disassemble_parser = subparsers.add_parser(
DISASSEMBLE_LIST[0],
help="Disassembles the smart contract",
aliases=DISASSEMBLE_LIST[1:],
parents=[rpc_parser, utilities_parser, input_parser],
)
create_disassemble_parser(disassemble_parser)
read_storage_parser = subparsers.add_parser(
"read-storage",
help="Retrieves storage slots from a given address through rpc",
parents=[rpc_parser],
)
leveldb_search_parser = subparsers.add_parser(
"leveldb-search", help="Searches the code fragment in local leveldb"
)
contract_func_to_hash = subparsers.add_parser(
"function-to-hash", help="Returns the hash signature of the function"
)
contract_hash_to_addr = subparsers.add_parser(
"hash-to-address",
help="converts the hashes in the blockchain to ethereum address",
)
subparsers.add_parser(
"version", parents=[output_parser], help="Outputs the version"
)
create_read_storage_parser(read_storage_parser)
create_hash_to_addr_parser(contract_hash_to_addr)
create_func_to_hash_parser(contract_func_to_hash)
create_leveldb_parser(leveldb_search_parser)
subparsers.add_parser("truffle", parents=[analyzer_parser], add_help=False)
subparsers.add_parser("help", add_help=False)
# Get config values
args = parser.parse_args()
parse_args_and_execute(parser=parser, args=args)
def create_disassemble_parser(parser: ArgumentParser):
"""
Modify parser to handle disassembly
:param parser:
:return:
"""
parser.add_argument("solidity_file", nargs="*")
def create_read_storage_parser(read_storage_parser: ArgumentParser):
"""
Modify parser to handle storage slots
:param read_storage_parser:
:return:
"""
database = parser.add_argument_group("local contracts database") read_storage_parser.add_argument(
database.add_argument( "storage_slots",
"-s", "--search", help="search the contract database", metavar="EXPRESSION" help="read state variables from storage index",
metavar="INDEX,NUM_SLOTS,[array] / mapping,INDEX,[KEY1, KEY2...]",
)
read_storage_parser.add_argument(
"address", help="contract address", metavar="ADDRESS"
) )
database.add_argument(
def create_leveldb_parser(parser: ArgumentParser):
"""
Modify parser to handle leveldb-search
:param parser:
:return:
"""
parser.add_argument("search")
parser.add_argument(
"--leveldb-dir", "--leveldb-dir",
help="specify leveldb directory for search or direct access operations", help="specify leveldb directory for search or direct access operations",
metavar="LEVELDB_PATH", metavar="LEVELDB_PATH",
) )
utilities = parser.add_argument_group("utilities")
utilities.add_argument( def create_func_to_hash_parser(parser: ArgumentParser):
"--hash", help="calculate function signature hash", metavar="SIGNATURE" """
) Modify parser to handle func_to_hash command
utilities.add_argument( :param parser:
"--storage", :return:
help="read state variables from storage index, use with -a", """
metavar="INDEX,NUM_SLOTS,[array] / mapping,INDEX,[KEY1, KEY2...]", parser.add_argument(
"func_name", help="calculate function signature hash", metavar="SIGNATURE"
) )
utilities.add_argument(
"--solv",
help="specify solidity compiler version. If not present, will try to install it (Experimental)", def create_hash_to_addr_parser(hash_parser: ArgumentParser):
metavar="SOLV", """
Modify parser to handle hash_to_addr command
:param hash_parser:
:return:
"""
hash_parser.add_argument(
"hash", help="Find the address from hash", metavar="FUNCTION_NAME"
) )
utilities.add_argument( hash_parser.add_argument(
"--contract-hash-to-address", "--leveldb-dir",
help="returns corresponding address for a contract address hash", help="specify leveldb directory for search or direct access operations",
metavar="SHA3_TO_LOOK_FOR", metavar="LEVELDB_PATH",
) )
options = parser.add_argument_group("options")
def create_analyzer_parser(analyzer_parser: ArgumentParser):
"""
Modify parser to handle analyze command
:param analyzer_parser:
:return:
"""
analyzer_parser.add_argument("solidity_file", nargs="*")
commands = analyzer_parser.add_argument_group("commands")
commands.add_argument("-g", "--graph", help="generate a control flow graph")
commands.add_argument(
"-j",
"--statespace-json",
help="dumps the statespace json",
metavar="OUTPUT_FILE",
)
commands.add_argument(
"--truffle",
action="store_true",
help="analyze a truffle project (run from project dir)",
)
options = analyzer_parser.add_argument_group("options")
options.add_argument( options.add_argument(
"-m", "-m",
"--modules", "--modules",
@ -230,15 +357,23 @@ def create_parser(parser: argparse.ArgumentParser) -> None:
default=10, default=10,
help="The amount of seconds to spend on " "the initial contract creation", help="The amount of seconds to spend on " "the initial contract creation",
) )
options.add_argument("--solc-args", help="Extra arguments for solc")
options.add_argument( options.add_argument(
"--phrack", action="store_true", help="Phrack-style call graph" "-l",
"--dynld",
action="store_true",
help="auto-load dependencies from the blockchain",
) )
options.add_argument( options.add_argument(
"--enable-physics", action="store_true", help="enable graph physics simulation" "--no-onchain-storage-access",
action="store_true",
help="turns off getting the data from onchain contracts",
) )
options.add_argument( options.add_argument(
"-v", type=int, help="log level (0-5)", metavar="LOG_LEVEL", default=2 "--phrack", action="store_true", help="Phrack-style call graph"
)
options.add_argument(
"--enable-physics", action="store_true", help="enable graph physics simulation"
) )
options.add_argument( options.add_argument(
"-q", "-q",
@ -255,36 +390,14 @@ def create_parser(parser: argparse.ArgumentParser) -> None:
help="Deactivate dependency-based pruning", help="Deactivate dependency-based pruning",
) )
rpc = parser.add_argument_group("RPC options")
rpc.add_argument(
"--rpc",
help="custom RPC settings",
metavar="HOST:PORT / ganache / infura-[network_name]",
default="infura-mainnet",
)
rpc.add_argument(
"--rpctls", type=bool, default=False, help="RPC connection over TLS"
)
parser.add_argument("--epic", action="store_true", help=argparse.SUPPRESS)
def validate_args(parser: argparse.ArgumentParser, args: argparse.Namespace): def validate_args(args: Namespace):
if not ( """
args.search Validate cli args
or args.hash :param args:
or args.disassemble :return:
or args.graph """
or args.fire_lasers if args.__dict__.get("v", False):
or args.storage
or args.truffle
or args.statespace_json
or args.contract_hash_to_address
):
parser.print_help()
sys.exit()
if args.v:
if 0 <= args.v < 6: if 0 <= args.v < 6:
log_levels = [ log_levels = [
logging.NOTSET, logging.NOTSET,
@ -303,81 +416,92 @@ def validate_args(parser: argparse.ArgumentParser, args: argparse.Namespace):
args.outform, "Invalid -v value, you can find valid values in usage" args.outform, "Invalid -v value, you can find valid values in usage"
) )
if args.query_signature: if args.command in ANALYZE_LIST:
if sigs.ethereum_input_decoder is None: if args.query_signature and sigs.ethereum_input_decoder is None:
exit_with_error( exit_with_error(
args.outform, args.outform,
"The --query-signature function requires the python package ethereum-input-decoder", "The --query-signature function requires the python package ethereum-input-decoder",
) )
if args.enable_iprof: if args.enable_iprof and args.v < 4:
if args.v < 4:
exit_with_error( exit_with_error(
args.outform, args.outform,
"--enable-iprof must be used with -v LOG_LEVEL where LOG_LEVEL >= 4", "--enable-iprof must be used with -v LOG_LEVEL where LOG_LEVEL >= 4",
) )
elif not (args.graph or args.fire_lasers or args.statespace_json):
exit_with_error(
args.outform,
"--enable-iprof must be used with one of -g, --graph, -x, --fire-lasers, -j and --statespace-json",
)
def quick_commands(args: argparse.Namespace):
if args.hash:
print(MythrilDisassembler.hash_for_function_signature(args.hash))
sys.exit()
def set_config(args: argparse.Namespace): def set_config(args: Namespace):
"""
Set config based on args
:param args:
:return: modified config
"""
config = MythrilConfig() config = MythrilConfig()
if args.dynld or not args.no_onchain_storage_access and not (args.rpc or args.i): if (
args.command in ANALYZE_LIST
and (args.dynld or not args.no_onchain_storage_access)
) and not (args.rpc or args.i):
config.set_api_from_config_path() config.set_api_from_config_path()
if args.address: if args.__dict__.get("address", None):
# Establish RPC connection if necessary # Establish RPC connection if necessary
config.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls) config.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls)
elif args.search or args.contract_hash_to_address: if args.command in ("hash-to-address", "leveldb-search"):
# Open LevelDB if necessary # Open LevelDB if necessary
config.set_api_leveldb( if not args.__dict__.get("leveldb_dir", None):
config.leveldb_dir if not args.leveldb_dir else args.leveldb_dir leveldb_dir = config.leveldb_dir
) else:
leveldb_dir = args.leveldb_dir
config.set_api_leveldb(leveldb_dir)
return config return config
def leveldb_search(config: MythrilConfig, args: argparse.Namespace): def leveldb_search(config: MythrilConfig, args: Namespace):
if args.search or args.contract_hash_to_address: """
Handle leveldb search
:param config:
:param args:
:return:
"""
if args.command in ("hash-to-address", "leveldb-search"):
leveldb_searcher = MythrilLevelDB(config.eth_db) leveldb_searcher = MythrilLevelDB(config.eth_db)
if args.search: if args.command == "leveldb-search":
# Database search ops # Database search ops
leveldb_searcher.search_db(args.search) leveldb_searcher.search_db(args.search)
else: else:
# search corresponding address # search corresponding address
try: try:
leveldb_searcher.contract_hash_to_address(args.contract_hash_to_address) leveldb_searcher.contract_hash_to_address(args.hash)
except AddressNotFoundError: except AddressNotFoundError:
print("Address not found.") print("Address not found.")
sys.exit() sys.exit()
def get_code(disassembler: MythrilDisassembler, args: argparse.Namespace): def load_code(disassembler: MythrilDisassembler, args: Namespace):
"""
Loads code into disassembly and returns address
:param disassembler:
:param args:
:return: Address
"""
address = None address = None
if args.code: if args.__dict__.get("code", False):
# Load from bytecode # Load from bytecode
code = args.code[2:] if args.code.startswith("0x") else args.code code = args.code[2:] if args.code.startswith("0x") else args.code
address, _ = disassembler.load_from_bytecode(code, args.bin_runtime) address, _ = disassembler.load_from_bytecode(code, args.bin_runtime)
elif args.codefile: elif args.__dict__.get("codefile", False):
bytecode = "".join([l.strip() for l in args.codefile if len(l.strip()) > 0]) bytecode = "".join([l.strip() for l in args.codefile if len(l.strip()) > 0])
bytecode = bytecode[2:] if bytecode.startswith("0x") else bytecode bytecode = bytecode[2:] if bytecode.startswith("0x") else bytecode
address, _ = disassembler.load_from_bytecode(bytecode, args.bin_runtime) address, _ = disassembler.load_from_bytecode(bytecode, args.bin_runtime)
elif args.address: elif args.__dict__.get("address", False):
# Get bytecode from a contract address # Get bytecode from a contract address
address, _ = disassembler.load_from_address(args.address) address, _ = disassembler.load_from_address(args.address)
elif args.solidity_file: elif args.__dict__.get("solidity_file", False):
# Compile Solidity source file(s) # Compile Solidity source file(s)
if args.graph and len(args.solidity_file) > 1: if args.command in ANALYZE_LIST and args.graph and len(args.solidity_file) > 1:
exit_with_error( exit_with_error(
args.outform, args.outform,
"Cannot generate call graphs from multiple input files. Please do it one at a time.", "Cannot generate call graphs from multiple input files. Please do it one at a time.",
@ -387,8 +511,8 @@ def get_code(disassembler: MythrilDisassembler, args: argparse.Namespace):
) # list of files ) # list of files
else: else:
exit_with_error( exit_with_error(
args.outform, args.__dict__.get("outform", "text"),
"No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, or -i SOLIDITY_FILES", "No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, -f BYTECODE_FILE or <SOLIDITY_FILE>",
) )
return address return address
@ -396,45 +520,44 @@ def get_code(disassembler: MythrilDisassembler, args: argparse.Namespace):
def execute_command( def execute_command(
disassembler: MythrilDisassembler, disassembler: MythrilDisassembler,
address: str, address: str,
parser: argparse.ArgumentParser, parser: ArgumentParser,
args: argparse.Namespace, args: Namespace,
): ):
"""
Execute command
:param disassembler:
:param address:
:param parser:
:param args:
:return:
"""
if args.storage: if args.command == "read-storage":
if not args.address:
exit_with_error(
args.outform,
"To read storage, provide the address of a deployed contract with the -a option.",
)
storage = disassembler.get_state_variable_from_storage( storage = disassembler.get_state_variable_from_storage(
address=address, params=[a.strip() for a in args.storage.strip().split(",")] address=address,
params=[a.strip() for a in args.storage_slots.strip().split(",")],
) )
print(storage) print(storage)
return
analyzer = MythrilAnalyzer(
strategy=args.strategy,
disassembler=disassembler,
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
loop_bound=args.loop_bound,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
disable_dependency_pruning=args.disable_dependency_pruning,
onchain_storage_access=not args.no_onchain_storage_access,
)
if args.disassemble:
# or mythril.disassemble(mythril.contracts[0])
elif args.command in DISASSEMBLE_LIST:
if disassembler.contracts[0].code: if disassembler.contracts[0].code:
print("Runtime Disassembly: \n" + disassembler.contracts[0].get_easm()) print("Runtime Disassembly: \n" + disassembler.contracts[0].get_easm())
if disassembler.contracts[0].creation_code: if disassembler.contracts[0].creation_code:
print("Disassembly: \n" + disassembler.contracts[0].get_creation_easm()) print("Disassembly: \n" + disassembler.contracts[0].get_creation_easm())
elif args.graph or args.fire_lasers: elif args.command in ANALYZE_LIST:
analyzer = MythrilAnalyzer(
strategy=args.strategy,
disassembler=disassembler,
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
disable_dependency_pruning=args.disable_dependency_pruning,
onchain_storage_access=not args.no_onchain_storage_access,
)
if not disassembler.contracts: if not disassembler.contracts:
exit_with_error( exit_with_error(
args.outform, "input files do not contain any valid contracts" args.outform, "input files do not contain any valid contracts"
@ -454,6 +577,21 @@ def execute_command(
except Exception as e: except Exception as e:
exit_with_error(args.outform, "Error saving graph: " + str(e)) exit_with_error(args.outform, "Error saving graph: " + str(e))
elif args.statespace_json:
if not analyzer.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
statespace = analyzer.dump_statespace(contract=analyzer.contracts[0])
try:
with open(args.statespace_json, "w") as f:
json.dump(statespace, f)
except Exception as e:
exit_with_error(args.outform, "Error saving json: " + str(e))
else: else:
try: try:
report = analyzer.fire_lasers( report = analyzer.fire_lasers(
@ -472,29 +610,24 @@ def execute_command(
print(outputs[args.outform]) print(outputs[args.outform])
except ModuleNotFoundError as e: except ModuleNotFoundError as e:
exit_with_error( exit_with_error(
args.outform, "Error loading analyis modules: " + format(e) args.outform, "Error loading analysis modules: " + format(e)
) )
elif args.statespace_json:
if not analyzer.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
statespace = analyzer.dump_statespace(contract=analyzer.contracts[0])
try:
with open(args.statespace_json, "w") as f:
json.dump(statespace, f)
except Exception as e:
exit_with_error(args.outform, "Error saving json: " + str(e))
else: else:
parser.print_help() parser.print_help()
def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> None: def contract_hash_to_address(args: Namespace):
"""
prints the hash from function signature
:param args:
:return:
"""
print(MythrilDisassembler.hash_for_function_signature(args.func_name))
sys.exit()
def parse_args_and_execute(parser: ArgumentParser, args: Namespace) -> None:
""" """
Parses the arguments Parses the arguments
:param parser: The parser :param parser: The parser
@ -507,42 +640,55 @@ def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Non
os.system(" ".join(sys.argv) + " | python3 " + path + "/epic.py") os.system(" ".join(sys.argv) + " | python3 " + path + "/epic.py")
sys.exit() sys.exit()
if args.version: if args.command not in COMMAND_LIST or args.command is None:
parser.print_help()
sys.exit()
if args.command == "version":
if args.outform == "json": if args.outform == "json":
print(json.dumps({"version_str": VERSION})) print(json.dumps({"version_str": VERSION}))
else: else:
print("Mythril version {}".format(VERSION)) print("Mythril version {}".format(VERSION))
sys.exit() sys.exit()
if args.command == "help":
parser.print_help()
sys.exit()
# Parse cmdline args # Parse cmdline args
validate_args(parser, args) validate_args(args)
try: try:
quick_commands(args) if args.command == "function-to-hash":
contract_hash_to_address(args)
config = set_config(args) config = set_config(args)
leveldb_search(config, args) leveldb_search(config, args)
query_signature = args.__dict__.get("query_signature", None)
solc_args = args.__dict__.get("solc_args", None)
solv = args.__dict__.get("solv", None)
disassembler = MythrilDisassembler( disassembler = MythrilDisassembler(
eth=config.eth, eth=config.eth,
solc_version=args.solv, solc_version=solv,
solc_args=args.solc_args, solc_args=solc_args,
enable_online_lookup=args.query_signature, enable_online_lookup=query_signature,
) )
if args.truffle: if args.command == "truffle":
try: try:
disassembler.analyze_truffle_project(args) disassembler.analyze_truffle_project(args)
except FileNotFoundError: except FileNotFoundError:
print( print(
"Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully." "Build directory not found. Make sure that you start the analysis from the project root, "
"and that 'truffle compile' has executed successfully."
) )
sys.exit() sys.exit()
address = get_code(disassembler, args) address = load_code(disassembler, args)
execute_command( execute_command(
disassembler=disassembler, address=address, parser=parser, args=args disassembler=disassembler, address=address, parser=parser, args=args
) )
except CriticalError as ce: except CriticalError as ce:
exit_with_error(args.outform, str(ce)) exit_with_error(args.__dict__.get("outform", "text"), str(ce))
except Exception: except Exception:
exit_with_error(args.outform, traceback.format_exc()) exit_with_error(args.__dict__.get("outform", "text"), traceback.format_exc())
if __name__ == "__main__": if __name__ == "__main__":

@ -8,7 +8,7 @@ import sys
def test_version_opt(capsys): def test_version_opt(capsys):
# Check that "myth --version" returns a string with the word # Check that "myth --version" returns a string with the word
# "version" in it # "version" in it
sys.argv = ["mythril", "--version"] sys.argv = ["mythril", "version"]
with pytest.raises(SystemExit) as pytest_wrapped_e: with pytest.raises(SystemExit) as pytest_wrapped_e:
main() main()
assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.type == SystemExit
@ -16,7 +16,7 @@ def test_version_opt(capsys):
assert captured.out.find(" version ") >= 1 assert captured.out.find(" version ") >= 1
# Check that "myth --version -o json" returns a JSON object # Check that "myth --version -o json" returns a JSON object
sys.argv = ["mythril", "--version", "-o", "json"] sys.argv = ["mythril", "version", "-o", "json"]
with pytest.raises(SystemExit) as pytest_wrapped_e: with pytest.raises(SystemExit) as pytest_wrapped_e:
main() main()
assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.type == SystemExit

@ -1,5 +1,6 @@
from subprocess import check_output from subprocess import check_output
from tests import BaseTestCase, TESTDATA, PROJECT_DIR, TESTS_DIR from tests import BaseTestCase, TESTDATA, PROJECT_DIR, TESTS_DIR
from mock import patch
MYTH = str(PROJECT_DIR / "myth") MYTH = str(PROJECT_DIR / "myth")
@ -15,23 +16,64 @@ def output_of(command):
class CommandLineToolTestCase(BaseTestCase): class CommandLineToolTestCase(BaseTestCase):
def test_disassemble_code_correctly(self): def test_disassemble_code_correctly(self):
command = "python3 {} MYTH -d --bin-runtime -c 0x5050 --solv 0.5.0".format(MYTH) command = "python3 {} disassemble --bin-runtime -c 0x5050".format(MYTH)
self.assertIn("0 POP\n1 POP\n", output_of(command)) self.assertIn("0 POP\n1 POP\n", output_of(command))
def test_disassemble_solidity_file_correctly(self): def test_disassemble_solidity_file_correctly(self):
solidity_file = str(TESTDATA / "input_contracts" / "metacoin.sol") solidity_file = str(TESTDATA / "input_contracts" / "metacoin.sol")
command = "python3 {} -d {} --solv 0.5.0".format(MYTH, solidity_file) command = "python3 {} disassemble {}".format(MYTH, solidity_file)
self.assertIn("2 PUSH1 0x40\n4 MSTORE", output_of(command)) self.assertIn("2 PUSH1 0x40\n4 MSTORE", output_of(command))
def test_hash_a_function_correctly(self): def test_hash_a_function_correctly(self):
command = "python3 {} --solv 0.5.0 --hash 'setOwner(address)'".format(MYTH) command = "python3 {} function-to-hash 'setOwner(address)'".format(MYTH)
self.assertIn("0x13af4035\n", output_of(command)) self.assertIn("0x13af4035\n", output_of(command))
def test_failure_json(self):
command = "python3 {} analyze doesnt_exist.sol -o json".format(MYTH)
print(output_of(command))
self.assertIn(""""success": false""", output_of(command))
def test_failure_text(self):
command = "python3 {} analyze doesnt_exist.sol".format(MYTH)
assert output_of(command) == ""
def test_failure_jsonv2(self):
command = "python3 {} analyze doesnt_exist.sol -o jsonv2".format(MYTH)
self.assertIn(""""level": "error""" "", output_of(command))
def test_analyze(self):
solidity_file = str(TESTDATA / "input_contracts" / "origin.sol")
command = "python3 {} analyze {}".format(MYTH, solidity_file)
self.assertIn("111", output_of(command))
def test_analyze_bytecode(self):
solidity_file = str(TESTDATA / "inputs" / "origin.sol.o")
command = "python3 {} analyze --bin-runtime -f {}".format(MYTH, solidity_file)
self.assertIn("111", output_of(command))
def test_invalid_args_iprof(self):
solidity_file = str(TESTDATA / "input_contracts" / "origin.sol")
command = "python3 {} analyze {} --enable-iprof -o json".format(
MYTH, solidity_file
)
self.assertIn(""""success": false""", output_of(command))
def test_only_epic(self):
command = "python3 {}".format(MYTH)
self.assertIn("usage: ", output_of(command))
def test_storage(self):
solidity_file = str(TESTDATA / "input_contracts" / "origin.sol")
command = """python3 {} read-storage "438767356, 3" 0x76799f77587738bfeef09452df215b63d2cfb08a """.format(
MYTH
)
self.assertIn("0x1a270efc", output_of(command))
class TruffleTestCase(BaseTestCase): class TruffleTestCase(BaseTestCase):
def test_analysis_truffle_project(self): def test_analysis_truffle_project(self):
truffle_project_root = str(TESTS_DIR / "truffle_project") truffle_project_root = str(TESTS_DIR / "truffle_project")
command = "cd {}; truffle compile; python3 {} --truffle -t 2".format( command = "cd {}; truffle compile; python3 {} truffle -t 2".format(
truffle_project_root, MYTH truffle_project_root, MYTH
) )
self.assertIn("=== Unprotected Ether Withdrawal ====", output_of(command)) self.assertIn("=== Unprotected Ether Withdrawal ====", output_of(command))
@ -39,7 +81,7 @@ class TruffleTestCase(BaseTestCase):
class InfuraTestCase(BaseTestCase): class InfuraTestCase(BaseTestCase):
def test_infura_mainnet(self): def test_infura_mainnet(self):
command = "python3 {} --rpc infura-mainnet -d -a 0x2a0c0dbecc7e4d658f48e01e3fa353f44050c208".format( command = "python3 {} disassemble --rpc infura-mainnet -a 0x2a0c0dbecc7e4d658f48e01e3fa353f44050c208".format(
MYTH MYTH
) )
output = output_of(command) output = output_of(command)
@ -47,21 +89,21 @@ class InfuraTestCase(BaseTestCase):
self.assertIn("7278 POP\n7279 POP\n7280 JUMP\n7281 STOP", output) self.assertIn("7278 POP\n7279 POP\n7280 JUMP\n7281 STOP", output)
def test_infura_rinkeby(self): def test_infura_rinkeby(self):
command = "python3 {} --rpc infura-rinkeby -d -a 0xB6f2bFED892a662bBF26258ceDD443f50Fa307F5".format( command = "python3 {} disassemble --rpc infura-rinkeby -a 0xB6f2bFED892a662bBF26258ceDD443f50Fa307F5".format(
MYTH MYTH
) )
output = output_of(command) output = output_of(command)
self.assertIn("34 JUMPDEST\n35 CALLVALUE", output) self.assertIn("34 JUMPDEST\n35 CALLVALUE", output)
def test_infura_kovan(self): def test_infura_kovan(self):
command = "python3 {} --rpc infura-kovan -d -a 0xE6bBF9B5A3451242F82f8cd458675092617a1235".format( command = "python3 {} disassemble --rpc infura-kovan -a 0xE6bBF9B5A3451242F82f8cd458675092617a1235".format(
MYTH MYTH
) )
output = output_of(command) output = output_of(command)
self.assertIn("9999 PUSH1 0x00\n10001 NOT\n10002 AND\n10003 PUSH1 0x00", output) self.assertIn("9999 PUSH1 0x00\n10001 NOT\n10002 AND\n10003 PUSH1 0x00", output)
def test_infura_ropsten(self): def test_infura_ropsten(self):
command = "python3 {} --rpc infura-ropsten -d -a 0x6e0E0e02377Bc1d90E8a7c21f12BA385C2C35f78".format( command = "python3 {} disassemble --rpc infura-ropsten -a 0x6e0E0e02377Bc1d90E8a7c21f12BA385C2C35f78".format(
MYTH MYTH
) )
output = output_of(command) output = output_of(command)

Loading…
Cancel
Save