Fix the onsite storage access

pull/1102/head
Nikhil Parasaram 5 years ago
commit ec402a9330
  1. 8
      .circleci/config.yml
  2. 2
      mythril/__version__.py
  3. 45
      mythril/analysis/modules/ether_thief.py
  4. 8
      mythril/analysis/symbolic.py
  5. 573
      mythril/interfaces/cli.py
  6. 2
      mythril/laser/ethereum/call.py
  7. 120
      mythril/laser/ethereum/instructions.py
  8. 330
      mythril/laser/ethereum/plugins/implementations/dependency_pruner.py
  9. 9
      mythril/laser/ethereum/plugins/plugin_factory.py
  10. 10
      mythril/laser/ethereum/plugins/signals.py
  11. 23
      mythril/laser/ethereum/state/account.py
  12. 22
      mythril/laser/ethereum/state/world_state.py
  13. 20
      mythril/laser/ethereum/svm.py
  14. 2
      mythril/laser/ethereum/transaction/concolic.py
  15. 5
      mythril/mythril/mythril_analyzer.py
  16. 4
      setup.py
  17. 4
      tests/cli_tests/test_cli_opts.py
  18. 58
      tests/cmd_line_test.py
  19. 50
      tests/laser/state/world_state_account_exist_load_test.py

@ -107,12 +107,13 @@ jobs:
-e CIRCLE_BUILD_URL=$CIRCLE_BUILD_URL \
-e CIRCLE_WEBHOOK_URL=$CIRCLE_WEBHOOK_URL \
--rm edelweiss-mythril:latest \
--timeout 90 \
--timeout 30 \
--output-dir /opt/edelweiss \
--plugin-dir /opt/mythril \
--s3 \
--circle-ci CircleCI/mythril.csv\
--ignore-false-positives guess_the_random_number_fixed.sol simple_dao.sol old_blockhash.sol
--circle-ci CircleCI/mythril.csv \
--ignore-false-positives $IGNORE_FALSE_POSITIVES \
--ignore-regressions $IGNORE_REGRESSIONS
pypi_release:
<<: *defaults
@ -169,7 +170,6 @@ workflows:
only:
- develop
- master
- feature/ignore-regressions
tags:
only: /v[0-9]+(\.[0-9]+)*/
requires:

@ -4,4 +4,4 @@ This file is suitable for sourcing inside POSIX shell, e.g. bash as well
as for importing into Python.
"""
__version__ = "v0.20.9"
__version__ = "v0.21.3"

@ -7,14 +7,16 @@ from copy import copy
from mythril.analysis import solver
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue
from mythril.laser.ethereum.transaction.symbolic import ATTACKER_ADDRESS
from mythril.laser.ethereum.transaction.transaction_models import (
ContractCreationTransaction,
from mythril.laser.ethereum.transaction.symbolic import (
ATTACKER_ADDRESS,
CREATOR_ADDRESS,
)
from mythril.analysis.swc_data import UNPROTECTED_ETHER_WITHDRAWAL
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.transaction import ContractCreationTransaction
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.smt import UGT, BVAddNoOverflow, Sum, symbol_factory
from mythril.laser.smt import UGT, Sum, symbol_factory, BVAddNoOverflow
from mythril.laser.smt.bitvec import If
log = logging.getLogger(__name__)
@ -77,21 +79,44 @@ class EtherThief(DetectionModule):
address = instruction["address"]
if self._cache_addresses.get(address, False):
return []
call_value = state.mstate.stack[-3]
value = state.mstate.stack[-3]
target = state.mstate.stack[-2]
eth_sent_total = symbol_factory.BitVecVal(0, 256)
eth_sent_by_attacker = symbol_factory.BitVecVal(0, 256)
constraints = copy(state.mstate.constraints)
for tx in state.world_state.transaction_sequence:
constraints += [BVAddNoOverflow(eth_sent_total, tx.call_value, False)]
eth_sent_total = Sum(eth_sent_total, tx.call_value)
"""
Constraint: The call value must be greater than the sum of Ether sent by the attacker over all
transactions. This prevents false positives caused by legitimate refund functions.
Also constrain the addition from overflowing (otherwise the solver produces solutions with
ridiculously high call values).
"""
constraints += [BVAddNoOverflow(eth_sent_by_attacker, tx.call_value, False)]
eth_sent_by_attacker = Sum(
eth_sent_by_attacker,
tx.call_value * If(tx.caller == ATTACKER_ADDRESS, 1, 0),
)
"""
Constraint: All transactions must originate from regular users (not the creator/owner).
This prevents false positives where the owner willingly transfers ownership to another address.
"""
if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS)
constraints += [tx.caller != CREATOR_ADDRESS]
"""
Require that the current transaction is sent by the attacker and
that the Ether is sent to the attacker's address.
"""
constraints += [UGT(call_value, eth_sent_total), target == ATTACKER_ADDRESS]
constraints += [
UGT(value, eth_sent_by_attacker),
target == ATTACKER_ADDRESS,
state.current_transaction.caller == ATTACKER_ADDRESS,
]
try:

@ -56,6 +56,7 @@ class SymExecWrapper:
modules=(),
compulsory_statespace=True,
enable_iprof=False,
disable_dependency_pruning=False,
run_analysis_modules=True,
):
"""
@ -122,6 +123,9 @@ class SymExecWrapper:
plugin_loader.load(PluginFactory.build_mutation_pruner_plugin())
plugin_loader.load(PluginFactory.build_instruction_coverage_plugin())
if not disable_dependency_pruning:
plugin_loader.load(PluginFactory.build_dependency_pruner_plugin())
world_state = WorldState()
for account in self.accounts.values():
world_state.put_account(account)
@ -154,7 +158,9 @@ class SymExecWrapper:
contract.disassembly,
dynamic_loader=dynloader,
contract_name=contract.name,
concrete_storage=False,
concrete_storage=True
if (dynloader is not None and dynloader.storage_loading)
else False,
)
world_state.put_account(account)
self.laser.sym_exec(world_state=world_state, target_address=address.value)

@ -15,6 +15,7 @@ import coloredlogs
import traceback
import mythril.support.signatures as sigs
from argparse import ArgumentParser, Namespace
from mythril.exceptions import AddressNotFoundError, CriticalError
from mythril.mythril import (
MythrilAnalyzer,
@ -24,13 +25,31 @@ from mythril.mythril import (
)
from mythril.__version__ import __version__ as VERSION
ANALYZE_LIST = ("analyze", "a")
DISASSEMBLE_LIST = ("disassemble", "d")
log = logging.getLogger(__name__)
COMMAND_LIST = (
ANALYZE_LIST
+ DISASSEMBLE_LIST
+ (
"read-storage",
"leveldb-search",
"function-to-hash",
"hash-to-address",
"version",
"truffle",
"help",
)
)
def exit_with_error(format_, message):
"""
:param format_:
:param message:
Exits with error
:param format_: The format of the message
:param message: message
"""
if format_ == "text" or format_ == "markdown":
log.error(message)
@ -53,94 +72,46 @@ def exit_with_error(format_, message):
sys.exit()
def main() -> None:
"""The main CLI interface entry point."""
parser = argparse.ArgumentParser(
description="Security analysis of Ethereum smart contracts"
)
create_parser(parser)
# Get config values
args = parser.parse_args()
parse_args(parser=parser, args=args)
def create_parser(parser: argparse.ArgumentParser) -> None:
def get_input_parser() -> ArgumentParser:
"""
Creates the parser by setting all the possible arguments
:param parser: The parser
Returns Parser which handles input
:return: Parser which handles input
"""
parser.add_argument("solidity_file", nargs="*")
commands = parser.add_argument_group("commands")
commands.add_argument("-g", "--graph", help="generate a control flow graph")
commands.add_argument(
"-V",
"--version",
action="store_true",
help="print the Mythril version number and exit",
)
commands.add_argument(
"-x",
"--fire-lasers",
action="store_true",
help="detect vulnerabilities, use with -c, -a or solidity file(s)",
)
commands.add_argument(
"--truffle",
action="store_true",
help="analyze a truffle project (run from project dir)",
)
commands.add_argument(
"-d", "--disassemble", action="store_true", help="print disassembly"
)
commands.add_argument(
"-j",
"--statespace-json",
help="dumps the statespace json",
metavar="OUTPUT_FILE",
)
inputs = parser.add_argument_group("input arguments")
inputs.add_argument(
parser = ArgumentParser(add_help=False)
parser.add_argument(
"-c",
"--code",
help='hex-encoded bytecode string ("6060604052...")',
metavar="BYTECODE",
)
inputs.add_argument(
parser.add_argument(
"-f",
"--codefile",
help="file containing hex-encoded bytecode string",
metavar="BYTECODEFILE",
type=argparse.FileType("r"),
)
inputs.add_argument(
parser.add_argument(
"-a",
"--address",
help="pull contract from the blockchain",
metavar="CONTRACT_ADDRESS",
)
inputs.add_argument(
"-l",
"--dynld",
action="store_true",
help="auto-load dependencies from the blockchain",
)
inputs.add_argument(
"--no-onchain-storage-access",
action="store_true",
help="turns off getting the data from onchain contracts",
)
inputs.add_argument(
parser.add_argument(
"--bin-runtime",
action="store_true",
help="Only when -c or -f is used. Consider the input bytecode as binary runtime code, default being the contract creation bytecode.",
)
return parser
outputs = parser.add_argument_group("output formats")
outputs.add_argument(
def get_output_parser() -> ArgumentParser:
"""
Get parser which handles output
:return: Parser which handles output
"""
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
"-o",
"--outform",
choices=["text", "markdown", "json", "jsonv2"],
@ -148,43 +119,199 @@ def create_parser(parser: argparse.ArgumentParser) -> None:
help="report output format",
metavar="<text/markdown/json/jsonv2>",
)
outputs.add_argument(
parser.add_argument(
"--verbose-report",
action="store_true",
help="Include debugging information in report",
)
return parser
def get_rpc_parser() -> ArgumentParser:
"""
Get parser which handles RPC flags
:return: Parser which handles rpc inputs
"""
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
"--rpc",
help="custom RPC settings",
metavar="HOST:PORT / ganache / infura-[network_name]",
default="infura-mainnet",
)
parser.add_argument(
"--rpctls", type=bool, default=False, help="RPC connection over TLS"
)
return parser
def get_utilities_parser() -> ArgumentParser:
"""
Get parser which handles utilities flags
:return: Parser which handles utility flags
"""
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--solc-args", help="Extra arguments for solc")
parser.add_argument(
"--solv",
help="specify solidity compiler version. If not present, will try to install it (Experimental)",
metavar="SOLV",
)
return parser
def main() -> None:
"""The main CLI interface entry point."""
rpc_parser = get_rpc_parser()
utilities_parser = get_utilities_parser()
input_parser = get_input_parser()
output_parser = get_output_parser()
parser = argparse.ArgumentParser(
description="Security analysis of Ethereum smart contracts"
)
parser.add_argument("--epic", action="store_true", help=argparse.SUPPRESS)
parser.add_argument(
"-v", type=int, help="log level (0-5)", metavar="LOG_LEVEL", default=2
)
subparsers = parser.add_subparsers(dest="command", help="Commands")
analyzer_parser = subparsers.add_parser(
ANALYZE_LIST[0],
help="Triggers the analysis of the smart contract",
parents=[rpc_parser, utilities_parser, input_parser, output_parser],
aliases=ANALYZE_LIST[1:],
)
create_analyzer_parser(analyzer_parser)
disassemble_parser = subparsers.add_parser(
DISASSEMBLE_LIST[0],
help="Disassembles the smart contract",
aliases=DISASSEMBLE_LIST[1:],
parents=[rpc_parser, utilities_parser, input_parser],
)
create_disassemble_parser(disassemble_parser)
read_storage_parser = subparsers.add_parser(
"read-storage",
help="Retrieves storage slots from a given address through rpc",
parents=[rpc_parser],
)
leveldb_search_parser = subparsers.add_parser(
"leveldb-search", help="Searches the code fragment in local leveldb"
)
contract_func_to_hash = subparsers.add_parser(
"function-to-hash", help="Returns the hash signature of the function"
)
contract_hash_to_addr = subparsers.add_parser(
"hash-to-address",
help="converts the hashes in the blockchain to ethereum address",
)
subparsers.add_parser(
"version", parents=[output_parser], help="Outputs the version"
)
create_read_storage_parser(read_storage_parser)
create_hash_to_addr_parser(contract_hash_to_addr)
create_func_to_hash_parser(contract_func_to_hash)
create_leveldb_parser(leveldb_search_parser)
subparsers.add_parser("truffle", parents=[analyzer_parser], add_help=False)
subparsers.add_parser("help", add_help=False)
# Get config values
args = parser.parse_args()
parse_args_and_execute(parser=parser, args=args)
def create_disassemble_parser(parser: ArgumentParser):
"""
Modify parser to handle disassembly
:param parser:
:return:
"""
parser.add_argument("solidity_file", nargs="*")
def create_read_storage_parser(read_storage_parser: ArgumentParser):
"""
Modify parser to handle storage slots
:param read_storage_parser:
:return:
"""
database = parser.add_argument_group("local contracts database")
database.add_argument(
"-s", "--search", help="search the contract database", metavar="EXPRESSION"
read_storage_parser.add_argument(
"storage_slots",
help="read state variables from storage index",
metavar="INDEX,NUM_SLOTS,[array] / mapping,INDEX,[KEY1, KEY2...]",
)
read_storage_parser.add_argument(
"address", help="contract address", metavar="ADDRESS"
)
database.add_argument(
def create_leveldb_parser(parser: ArgumentParser):
"""
Modify parser to handle leveldb-search
:param parser:
:return:
"""
parser.add_argument("search")
parser.add_argument(
"--leveldb-dir",
help="specify leveldb directory for search or direct access operations",
metavar="LEVELDB_PATH",
)
utilities = parser.add_argument_group("utilities")
utilities.add_argument(
"--hash", help="calculate function signature hash", metavar="SIGNATURE"
)
utilities.add_argument(
"--storage",
help="read state variables from storage index, use with -a",
metavar="INDEX,NUM_SLOTS,[array] / mapping,INDEX,[KEY1, KEY2...]",
def create_func_to_hash_parser(parser: ArgumentParser):
"""
Modify parser to handle func_to_hash command
:param parser:
:return:
"""
parser.add_argument(
"func_name", help="calculate function signature hash", metavar="SIGNATURE"
)
utilities.add_argument(
"--solv",
help="specify solidity compiler version. If not present, will try to install it (Experimental)",
metavar="SOLV",
def create_hash_to_addr_parser(hash_parser: ArgumentParser):
"""
Modify parser to handle hash_to_addr command
:param hash_parser:
:return:
"""
hash_parser.add_argument(
"hash", help="Find the address from hash", metavar="FUNCTION_NAME"
)
utilities.add_argument(
"--contract-hash-to-address",
help="returns corresponding address for a contract address hash",
metavar="SHA3_TO_LOOK_FOR",
hash_parser.add_argument(
"--leveldb-dir",
help="specify leveldb directory for search or direct access operations",
metavar="LEVELDB_PATH",
)
options = parser.add_argument_group("options")
def create_analyzer_parser(analyzer_parser: ArgumentParser):
"""
Modify parser to handle analyze command
:param analyzer_parser:
:return:
"""
analyzer_parser.add_argument("solidity_file", nargs="*")
commands = analyzer_parser.add_argument_group("commands")
commands.add_argument("-g", "--graph", help="generate a control flow graph")
commands.add_argument(
"-j",
"--statespace-json",
help="dumps the statespace json",
metavar="OUTPUT_FILE",
)
commands.add_argument(
"--truffle",
action="store_true",
help="analyze a truffle project (run from project dir)",
)
options = analyzer_parser.add_argument_group("options")
options.add_argument(
"-m",
"--modules",
@ -230,15 +357,23 @@ def create_parser(parser: argparse.ArgumentParser) -> None:
default=10,
help="The amount of seconds to spend on " "the initial contract creation",
)
options.add_argument("--solc-args", help="Extra arguments for solc")
options.add_argument(
"--phrack", action="store_true", help="Phrack-style call graph"
"-l",
"--dynld",
action="store_true",
help="auto-load dependencies from the blockchain",
)
options.add_argument(
"--enable-physics", action="store_true", help="enable graph physics simulation"
"--no-onchain-storage-access",
action="store_true",
help="turns off getting the data from onchain contracts",
)
options.add_argument(
"-v", type=int, help="log level (0-5)", metavar="LOG_LEVEL", default=2
"--phrack", action="store_true", help="Phrack-style call graph"
)
options.add_argument(
"--enable-physics", action="store_true", help="enable graph physics simulation"
)
options.add_argument(
"-q",
@ -249,37 +384,20 @@ def create_parser(parser: argparse.ArgumentParser) -> None:
options.add_argument(
"--enable-iprof", action="store_true", help="enable the instruction profiler"
)
rpc = parser.add_argument_group("RPC options")
rpc.add_argument(
"--rpc",
help="custom RPC settings",
metavar="HOST:PORT / ganache / infura-[network_name]",
default="infura-mainnet",
)
rpc.add_argument(
"--rpctls", type=bool, default=False, help="RPC connection over TLS"
options.add_argument(
"--disable-dependency-pruning",
action="store_true",
help="Deactivate dependency-based pruning",
)
parser.add_argument("--epic", action="store_true", help=argparse.SUPPRESS)
def validate_args(parser: argparse.ArgumentParser, args: argparse.Namespace):
if not (
args.search
or args.hash
or args.disassemble
or args.graph
or args.fire_lasers
or args.storage
or args.truffle
or args.statespace_json
or args.contract_hash_to_address
):
parser.print_help()
sys.exit()
if args.v:
def validate_args(args: Namespace):
"""
Validate cli args
:param args:
:return:
"""
if args.__dict__.get("v", False):
if 0 <= args.v < 6:
log_levels = [
logging.NOTSET,
@ -298,81 +416,92 @@ def validate_args(parser: argparse.ArgumentParser, args: argparse.Namespace):
args.outform, "Invalid -v value, you can find valid values in usage"
)
if args.query_signature:
if sigs.ethereum_input_decoder is None:
if args.command in ANALYZE_LIST:
if args.query_signature and sigs.ethereum_input_decoder is None:
exit_with_error(
args.outform,
"The --query-signature function requires the python package ethereum-input-decoder",
)
if args.enable_iprof:
if args.v < 4:
if args.enable_iprof and args.v < 4:
exit_with_error(
args.outform,
"--enable-iprof must be used with -v LOG_LEVEL where LOG_LEVEL >= 4",
)
elif not (args.graph or args.fire_lasers or args.statespace_json):
exit_with_error(
args.outform,
"--enable-iprof must be used with one of -g, --graph, -x, --fire-lasers, -j and --statespace-json",
)
def quick_commands(args: argparse.Namespace):
if args.hash:
print(MythrilDisassembler.hash_for_function_signature(args.hash))
sys.exit()
def set_config(args: argparse.Namespace):
def set_config(args: Namespace):
"""
Set config based on args
:param args:
:return: modified config
"""
config = MythrilConfig()
if args.dynld or not args.no_onchain_storage_access and not (args.rpc or args.i):
if (
args.command in ANALYZE_LIST
and (args.dynld or not args.no_onchain_storage_access)
) and not (args.rpc or args.i):
config.set_api_from_config_path()
if args.address:
if args.__dict__.get("address", None):
# Establish RPC connection if necessary
config.set_api_rpc(rpc=args.rpc, rpctls=args.rpctls)
elif args.search or args.contract_hash_to_address:
if args.command in ("hash-to-address", "leveldb-search"):
# Open LevelDB if necessary
config.set_api_leveldb(
config.leveldb_dir if not args.leveldb_dir else args.leveldb_dir
)
if not args.__dict__.get("leveldb_dir", None):
leveldb_dir = config.leveldb_dir
else:
leveldb_dir = args.leveldb_dir
config.set_api_leveldb(leveldb_dir)
return config
def leveldb_search(config: MythrilConfig, args: argparse.Namespace):
if args.search or args.contract_hash_to_address:
def leveldb_search(config: MythrilConfig, args: Namespace):
"""
Handle leveldb search
:param config:
:param args:
:return:
"""
if args.command in ("hash-to-address", "leveldb-search"):
leveldb_searcher = MythrilLevelDB(config.eth_db)
if args.search:
if args.command == "leveldb-search":
# Database search ops
leveldb_searcher.search_db(args.search)
else:
# search corresponding address
try:
leveldb_searcher.contract_hash_to_address(args.contract_hash_to_address)
leveldb_searcher.contract_hash_to_address(args.hash)
except AddressNotFoundError:
print("Address not found.")
sys.exit()
def get_code(disassembler: MythrilDisassembler, args: argparse.Namespace):
def load_code(disassembler: MythrilDisassembler, args: Namespace):
"""
Loads code into disassembly and returns address
:param disassembler:
:param args:
:return: Address
"""
address = None
if args.code:
if args.__dict__.get("code", False):
# Load from bytecode
code = args.code[2:] if args.code.startswith("0x") else args.code
address, _ = disassembler.load_from_bytecode(code, args.bin_runtime)
elif args.codefile:
elif args.__dict__.get("codefile", False):
bytecode = "".join([l.strip() for l in args.codefile if len(l.strip()) > 0])
bytecode = bytecode[2:] if bytecode.startswith("0x") else bytecode
address, _ = disassembler.load_from_bytecode(bytecode, args.bin_runtime)
elif args.address:
elif args.__dict__.get("address", False):
# Get bytecode from a contract address
address, _ = disassembler.load_from_address(args.address)
elif args.solidity_file:
elif args.__dict__.get("solidity_file", False):
# Compile Solidity source file(s)
if args.graph and len(args.solidity_file) > 1:
if args.command in ANALYZE_LIST and args.graph and len(args.solidity_file) > 1:
exit_with_error(
args.outform,
"Cannot generate call graphs from multiple input files. Please do it one at a time.",
@ -382,8 +511,8 @@ def get_code(disassembler: MythrilDisassembler, args: argparse.Namespace):
) # list of files
else:
exit_with_error(
args.outform,
"No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, or -i SOLIDITY_FILES",
args.__dict__.get("outform", "text"),
"No input bytecode. Please provide EVM code via -c BYTECODE, -a ADDRESS, -f BYTECODE_FILE or <SOLIDITY_FILE>",
)
return address
@ -391,44 +520,45 @@ def get_code(disassembler: MythrilDisassembler, args: argparse.Namespace):
def execute_command(
disassembler: MythrilDisassembler,
address: str,
parser: argparse.ArgumentParser,
args: argparse.Namespace,
parser: ArgumentParser,
args: Namespace,
):
"""
Execute command
:param disassembler:
:param address:
:param parser:
:param args:
:return:
"""
if args.storage:
if not args.address:
exit_with_error(
args.outform,
"To read storage, provide the address of a deployed contract with the -a option.",
)
if args.command == "read-storage":
storage = disassembler.get_state_variable_from_storage(
address=address, params=[a.strip() for a in args.storage.strip().split(",")]
address=address,
params=[a.strip() for a in args.storage_slots.strip().split(",")],
)
print(storage)
return
analyzer = MythrilAnalyzer(
strategy=args.strategy,
disassembler=disassembler,
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
loop_bound=args.loop_bound,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
onchain_storage_access=not args.no_onchain_storage_access,
)
if args.disassemble:
# or mythril.disassemble(mythril.contracts[0])
elif args.command in DISASSEMBLE_LIST:
if disassembler.contracts[0].code:
print("Runtime Disassembly: \n" + disassembler.contracts[0].get_easm())
if disassembler.contracts[0].creation_code:
print("Disassembly: \n" + disassembler.contracts[0].get_creation_easm())
elif args.graph or args.fire_lasers:
elif args.command in ANALYZE_LIST:
analyzer = MythrilAnalyzer(
strategy=args.strategy,
disassembler=disassembler,
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
loop_bound=args.loop_bound,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
disable_dependency_pruning=args.disable_dependency_pruning,
onchain_storage_access=not args.no_onchain_storage_access,
)
if not disassembler.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
@ -448,6 +578,21 @@ def execute_command(
except Exception as e:
exit_with_error(args.outform, "Error saving graph: " + str(e))
elif args.statespace_json:
if not analyzer.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
statespace = analyzer.dump_statespace(contract=analyzer.contracts[0])
try:
with open(args.statespace_json, "w") as f:
json.dump(statespace, f)
except Exception as e:
exit_with_error(args.outform, "Error saving json: " + str(e))
else:
try:
report = analyzer.fire_lasers(
@ -466,29 +611,24 @@ def execute_command(
print(outputs[args.outform])
except ModuleNotFoundError as e:
exit_with_error(
args.outform, "Error loading analyis modules: " + format(e)
args.outform, "Error loading analysis modules: " + format(e)
)
elif args.statespace_json:
if not analyzer.contracts:
exit_with_error(
args.outform, "input files do not contain any valid contracts"
)
statespace = analyzer.dump_statespace(contract=analyzer.contracts[0])
try:
with open(args.statespace_json, "w") as f:
json.dump(statespace, f)
except Exception as e:
exit_with_error(args.outform, "Error saving json: " + str(e))
else:
parser.print_help()
def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> None:
def contract_hash_to_address(args: Namespace):
"""
prints the hash from function signature
:param args:
:return:
"""
print(MythrilDisassembler.hash_for_function_signature(args.func_name))
sys.exit()
def parse_args_and_execute(parser: ArgumentParser, args: Namespace) -> None:
"""
Parses the arguments
:param parser: The parser
@ -501,42 +641,55 @@ def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Non
os.system(" ".join(sys.argv) + " | python3 " + path + "/epic.py")
sys.exit()
if args.version:
if args.command not in COMMAND_LIST or args.command is None:
parser.print_help()
sys.exit()
if args.command == "version":
if args.outform == "json":
print(json.dumps({"version_str": VERSION}))
else:
print("Mythril version {}".format(VERSION))
sys.exit()
if args.command == "help":
parser.print_help()
sys.exit()
# Parse cmdline args
validate_args(parser, args)
validate_args(args)
try:
quick_commands(args)
if args.command == "function-to-hash":
contract_hash_to_address(args)
config = set_config(args)
leveldb_search(config, args)
query_signature = args.__dict__.get("query_signature", None)
solc_args = args.__dict__.get("solc_args", None)
solv = args.__dict__.get("solv", None)
disassembler = MythrilDisassembler(
eth=config.eth,
solc_version=args.solv,
solc_args=args.solc_args,
enable_online_lookup=args.query_signature,
solc_version=solv,
solc_args=solc_args,
enable_online_lookup=query_signature,
)
if args.truffle:
if args.command == "truffle":
try:
disassembler.analyze_truffle_project(args)
except FileNotFoundError:
print(
"Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully."
"Build directory not found. Make sure that you start the analysis from the project root, "
"and that 'truffle compile' has executed successfully."
)
sys.exit()
address = get_code(disassembler, args)
address = load_code(disassembler, args)
execute_command(
disassembler=disassembler, address=address, parser=parser, args=args
)
except CriticalError as ce:
exit_with_error(args.outform, str(ce))
exit_with_error(args.__dict__.get("outform", "text"), str(ce))
except Exception:
exit_with_error(args.outform, traceback.format_exc())
exit_with_error(args.__dict__.get("outform", "text"), traceback.format_exc())
if __name__ == "__main__":

@ -96,7 +96,7 @@ def get_callee_address(
# attempt to read the contract address from instance storage
try:
callee_address = dynamic_loader.read_storage(
str(hex(environment.active_account.address.value)), index
hex(environment.active_account.address.value), index
)
# TODO: verify whether this happens or not
except:

@ -1002,16 +1002,61 @@ class Instruction:
global_state.mstate.stack.pop(),
global_state.mstate.stack.pop(),
)
return self._code_copy_helper(
code=global_state.environment.code.bytecode,
memory_offset=memory_offset,
code_offset=code_offset,
size=size,
op="CODECOPY",
global_state=global_state,
)
@StateTransition()
def extcodesize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
addr = state.stack.pop()
try:
addr = hex(helper.get_concrete_int(addr))
except TypeError:
log.debug("unsupported symbolic address for EXTCODESIZE")
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
try:
code = global_state.world_state.accounts_exist_or_load(
addr, self.dynamic_loader
)
except (ValueError, AttributeError) as e:
log.debug("error accessing contract storage due to: " + str(e))
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
state.stack.append(len(code) // 2)
return [global_state]
@staticmethod
def _code_copy_helper(
code: str,
memory_offset: BitVec,
code_offset: BitVec,
size: BitVec,
op: str,
global_state: GlobalState,
) -> List[GlobalState]:
try:
concrete_memory_offset = helper.get_concrete_int(memory_offset)
except TypeError:
log.debug("Unsupported symbolic memory offset in CODECOPY")
log.debug("Unsupported symbolic memory offset in {}".format(op))
return [global_state]
try:
size = helper.get_concrete_int(size)
global_state.mstate.mem_extend(concrete_memory_offset, size)
concrete_size = helper.get_concrete_int(size)
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
except TypeError:
# except both attribute error and Exception
@ -1029,9 +1074,9 @@ class Instruction:
try:
concrete_code_offset = helper.get_concrete_int(code_offset)
except TypeError:
log.debug("Unsupported symbolic code offset in CODECOPY")
global_state.mstate.mem_extend(concrete_memory_offset, size)
for i in range(size):
log.debug("Unsupported symbolic code offset in {}".format(op))
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
for i in range(concrete_size):
global_state.mstate.memory[
concrete_memory_offset + i
] = global_state.new_bitvec(
@ -1042,21 +1087,20 @@ class Instruction:
)
return [global_state]
bytecode = global_state.environment.code.bytecode
if bytecode[0:2] == "0x":
bytecode = bytecode[2:]
if code[0:2] == "0x":
code = code[2:]
if size == 0 and isinstance(
if concrete_size == 0 and isinstance(
global_state.current_transaction, ContractCreationTransaction
):
if concrete_code_offset >= len(bytecode) // 2:
self._handle_symbolic_args(global_state, concrete_memory_offset)
if concrete_code_offset >= len(code) // 2:
Instruction._handle_symbolic_args(global_state, concrete_memory_offset)
return [global_state]
for i in range(size):
if 2 * (concrete_code_offset + i + 1) <= len(bytecode):
for i in range(concrete_size):
if 2 * (concrete_code_offset + i + 1) <= len(code):
global_state.mstate.memory[concrete_memory_offset + i] = int(
bytecode[
code[
2
* (concrete_code_offset + i) : 2
* (concrete_code_offset + i + 1)
@ -1076,35 +1120,41 @@ class Instruction:
return [global_state]
@StateTransition()
def extcodesize_(self, global_state: GlobalState) -> List[GlobalState]:
def extcodecopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
addr = state.stack.pop()
environment = global_state.environment
addr, memory_offset, code_offset, size = (
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
)
try:
addr = hex(helper.get_concrete_int(addr))
except TypeError:
log.debug("unsupported symbolic address for EXTCODESIZE")
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
log.debug("unsupported symbolic address for EXTCODECOPY")
return [global_state]
try:
code = self.dynamic_loader.dynld(addr)
code = global_state.world_state.accounts_exist_or_load(
addr, self.dynamic_loader
)
except (ValueError, AttributeError) as e:
log.debug("error accessing contract storage due to: " + str(e))
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
if code is None:
state.stack.append(0)
else:
state.stack.append(len(code.bytecode) // 2)
return [global_state]
return self._code_copy_helper(
code=code,
memory_offset=memory_offset,
code_offset=code_offset,
size=size,
op="EXTCODECOPY",
global_state=global_state,
)
@StateTransition
def extcodehash_(self, global_state: GlobalState) -> List[GlobalState]:
@ -1120,20 +1170,6 @@ class Instruction:
)
return [global_state]
@StateTransition()
def extcodecopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
# FIXME: not implemented
state = global_state.mstate
addr = state.stack.pop()
start, s2, size = state.stack.pop(), state.stack.pop(), state.stack.pop()
return [global_state]
@StateTransition()
def returndatacopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""

@ -0,0 +1,330 @@
from mythril.laser.ethereum.svm import LaserEVM
from mythril.laser.ethereum.plugins.plugin import LaserPlugin
from mythril.laser.ethereum.plugins.signals import PluginSkipState
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.transaction.transaction_models import (
ContractCreationTransaction,
)
from mythril.exceptions import UnsatError
from z3.z3types import Z3Exception
from mythril.analysis import solver
from typing import cast, List, Dict, Set
from copy import copy
import logging
log = logging.getLogger(__name__)
class DependencyAnnotation(StateAnnotation):
"""Dependency Annotation
This annotation tracks read and write access to the state during each transaction.
"""
def __init__(self):
self.storage_loaded = [] # type: List
self.storage_written = {} # type: Dict[int, List]
self.has_call = False
self.path = [0] # type: List
def __copy__(self):
result = DependencyAnnotation()
result.storage_loaded = copy(self.storage_loaded)
result.storage_written = copy(self.storage_written)
result.path = copy(self.path)
result.has_call = self.has_call
return result
def get_storage_write_cache(self, iteration: int):
if iteration not in self.storage_written:
self.storage_written[iteration] = []
return self.storage_written[iteration]
def extend_storage_write_cache(self, iteration: int, value: object):
if iteration not in self.storage_written:
self.storage_written[iteration] = [value]
else:
if value not in self.storage_written[iteration]:
self.storage_written[iteration].append(value)
class WSDependencyAnnotation(StateAnnotation):
"""Dependency Annotation for World state
This world state annotation maintains a stack of state annotations.
It is used to transfer individual state annotations from one transaction to the next.
"""
def __init__(self):
self.annotations_stack = []
def __copy__(self):
result = WSDependencyAnnotation()
result.annotations_stack = copy(self.annotations_stack)
return result
def get_dependency_annotation(state: GlobalState) -> DependencyAnnotation:
""" Returns a dependency annotation
:param state: A global state object
"""
annotations = cast(
List[DependencyAnnotation], list(state.get_annotations(DependencyAnnotation))
)
if len(annotations) == 0:
"""FIXME: Hack for carrying over state annotations from the STOP and RETURN states of
the previous states. The states are pushed on a stack in the world state annotation
and popped off the stack in the subsequent iteration. This might break if any
other strategy than bfs is used (?).
"""
try:
world_state_annotation = get_ws_dependency_annotation(state)
annotation = world_state_annotation.annotations_stack.pop()
except IndexError:
annotation = DependencyAnnotation()
state.annotate(annotation)
else:
annotation = annotations[0]
return annotation
def get_ws_dependency_annotation(state: GlobalState) -> WSDependencyAnnotation:
""" Returns the world state annotation
:param state: A global state object
"""
annotations = cast(
List[WSDependencyAnnotation],
list(state.world_state.get_annotations(WSDependencyAnnotation)),
)
if len(annotations) == 0:
annotation = WSDependencyAnnotation()
state.world_state.annotate(annotation)
else:
annotation = annotations[0]
return annotation
class DependencyPruner(LaserPlugin):
"""Dependency Pruner Plugin
For every basic block, this plugin keeps a list of storage locations that
are accessed (read) in the execution path containing that block. This map
is built up over the whole symbolic execution run.
After the initial build up of the map in the first transaction, blocks are
executed only if any of the storage locations written to in the previous
transaction can have an effect on that block or any of its successors.
"""
def __init__(self):
"""Creates DependencyPruner"""
self._reset()
def _reset(self):
self.iteration = 0
self.dependency_map = {} # type: Dict[int, List[object]]
self.protected_addresses = set() # type: Set[int]
def update_dependency_map(self, path: List[int], target_location: object) -> None:
"""Update the dependency map for the block offsets on the given path.
:param path
:param target_location
"""
try:
for address in path:
if address in self.dependency_map:
if target_location not in self.dependency_map[address]:
self.dependency_map[address].append(target_location)
else:
self.dependency_map[address] = [target_location]
except Z3Exception as e:
# This should not happen unless there's a bug in laser, such as an invalid type being generated.
log.debug("Error updating dependency map: {}".format(e))
def protect_path(self, path: List[int]) -> None:
"""Prevent an execution path of being pruned.
:param path
"""
for address in path:
self.protected_addresses.add(address)
def wanna_execute(self, address: int, storage_write_cache) -> bool:
"""Decide whether the basic block starting at 'address' should be executed.
:param address
:param storage_write_cache
"""
if address in self.protected_addresses or address not in self.dependency_map:
return True
dependencies = self.dependency_map[address]
# Return if *any* dependency is found
for location in storage_write_cache:
for dependency in dependencies:
try:
solver.get_model([location == dependency])
return True
except UnsatError:
continue
return False
def initialize(self, symbolic_vm: LaserEVM) -> None:
"""Initializes the DependencyPruner
:param symbolic_vm
"""
self._reset()
@symbolic_vm.laser_hook("start_sym_trans")
def start_sym_trans_hook():
self.iteration += 1
@symbolic_vm.post_hook("CALL")
def call_hook(state: GlobalState):
annotation = get_dependency_annotation(state)
annotation.has_call = True
self.protect_path(annotation.path)
@symbolic_vm.post_hook("JUMP")
def jump_hook(state: GlobalState):
address = state.get_current_instruction()["address"]
annotation = get_dependency_annotation(state)
_check_basic_block(address, annotation)
@symbolic_vm.pre_hook("JUMPDEST")
def jumpdest_hook(state: GlobalState):
address = state.get_current_instruction()["address"]
annotation = get_dependency_annotation(state)
_check_basic_block(address, annotation)
@symbolic_vm.post_hook("JUMPI")
def jumpi_hook(state: GlobalState):
address = state.get_current_instruction()["address"]
annotation = get_dependency_annotation(state)
_check_basic_block(address, annotation)
@symbolic_vm.pre_hook("SSTORE")
def sstore_hook(state: GlobalState):
annotation = get_dependency_annotation(state)
annotation.extend_storage_write_cache(
self.iteration, state.mstate.stack[-1]
)
@symbolic_vm.pre_hook("SLOAD")
def sload_hook(state: GlobalState):
annotation = get_dependency_annotation(state)
location = state.mstate.stack[-1]
if location not in annotation.storage_loaded:
annotation.storage_loaded.append(location)
# We backwards-annotate the path here as sometimes execution never reaches a stop or return
# (and this may change in a future transaction).
self.update_dependency_map(annotation.path, location)
@symbolic_vm.pre_hook("STOP")
def stop_hook(state: GlobalState):
_transaction_end(state)
@symbolic_vm.pre_hook("RETURN")
def return_hook(state: GlobalState):
_transaction_end(state)
def _transaction_end(state: GlobalState) -> None:
"""When a stop or return is reached, the storage locations read along the path are entered into
the dependency map for all nodes encountered in this path.
:param state:
"""
annotation = get_dependency_annotation(state)
if annotation.has_call:
self.protect_path(annotation.path)
for index in annotation.storage_loaded:
self.update_dependency_map(annotation.path, index)
def _check_basic_block(address: int, annotation: DependencyAnnotation):
"""This method is where the actual pruning happens.
:param address: Start address (bytecode offset) of the block
:param annotation
"""
# Don't skip any blocks in the contract creation transaction
if self.iteration < 2:
return
annotation.path.append(address)
if self.wanna_execute(
address, annotation.get_storage_write_cache(self.iteration - 1)
):
return
else:
log.debug(
"Skipping state: Storage slots {} not read in block at address {}".format(
annotation.get_storage_write_cache(self.iteration - 1), address
)
)
raise PluginSkipState
@symbolic_vm.laser_hook("add_world_state")
def world_state_filter_hook(state: GlobalState):
if isinstance(state.current_transaction, ContractCreationTransaction):
# Reset iteration variable
self.iteration = 0
return
world_state_annotation = get_ws_dependency_annotation(state)
annotation = get_dependency_annotation(state)
# Reset the state annotation except for storage written which is carried on to
# the next transaction
annotation.path = [0]
annotation.storage_loaded = []
annotation.has_call = False
world_state_annotation.annotations_stack.append(annotation)
log.debug(
"Iteration {}: Adding world state at address {}, end of function {}.\nDependency map: {}\nStorage written: {}".format(
self.iteration,
state.get_current_instruction()["address"],
state.node.function_name,
self.dependency_map,
annotation.storage_written[self.iteration],
)
)

@ -30,3 +30,12 @@ class PluginFactory:
)
return InstructionCoveragePlugin()
@staticmethod
def build_dependency_pruner_plugin() -> LaserPlugin:
""" Creates an instance of the mutation pruner plugin"""
from mythril.laser.ethereum.plugins.implementations.dependency_pruner import (
DependencyPruner,
)
return DependencyPruner()

@ -15,3 +15,13 @@ class PluginSkipWorldState(PluginSignal):
"""
pass
class PluginSkipState(PluginSignal):
""" Plugin to skip world state
Plugins that raise this signal while the add_world_state hook is being executed
will force laser to abandon that world state.
"""
pass

@ -31,6 +31,29 @@ class Storage:
def __getitem__(self, item: BitVec) -> Any:
storage = self._get_corresponding_storage(item)
value = storage[item]
if (
value.value == 0
and self.address
and item.symbolic is False
and self.address.value != 0
and (self.dynld and self.dynld.storage_loading)
):
try:
storage[item] = symbol_factory.BitVecVal(
int(
self.dynld.read_storage(
contract_address=hex(self.address.value),
index=int(item.value),
),
16,
),
256,
)
return storage[item]
except ValueError:
pass
return simplify(storage[item])
@staticmethod

@ -3,6 +3,7 @@ from copy import copy
from random import randint
from typing import Dict, List, Iterator, Optional, TYPE_CHECKING
from mythril.support.loader import DynLoader
from mythril.laser.smt import symbol_factory, Array, BitVec
from ethereum.utils import mk_contract_address
from mythril.laser.ethereum.state.account import Account
@ -64,6 +65,27 @@ class WorldState:
new_world_state.node = self.node
return new_world_state
def accounts_exist_or_load(self, addr: str, dynamic_loader: DynLoader) -> str:
"""
returns account if it exists, else it loads from the dynamic loader
:param addr: address
:param dynamic_loader: Dynamic Loader
:return: The code
"""
addr_bitvec = symbol_factory.BitVecVal(int(addr, 16), 256)
if addr_bitvec.value in self.accounts:
code = self.accounts[addr_bitvec.value].code
else:
code = dynamic_loader.dynld(addr)
self.create_account(
balance=0, address=addr_bitvec.value, dynamic_loader=dynamic_loader
)
if code is None:
code = ""
else:
code = code.bytecode
return code
def create_account(
self,
balance=0,

@ -10,7 +10,7 @@ from mythril.laser.ethereum.evm_exceptions import StackUnderflowException
from mythril.laser.ethereum.evm_exceptions import VmException
from mythril.laser.ethereum.instructions import Instruction
from mythril.laser.ethereum.iprof import InstructionProfiler
from mythril.laser.ethereum.plugins.signals import PluginSkipWorldState
from mythril.laser.ethereum.plugins.signals import PluginSkipWorldState, PluginSkipState
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.laser.ethereum.strategy.basic import DepthFirstSearchStrategy
@ -274,7 +274,12 @@ class LaserEVM:
self._add_world_state(global_state)
return [], None
self._execute_pre_hook(op_code, global_state)
try:
self._execute_pre_hook(op_code, global_state)
except PluginSkipState:
self._add_world_state(global_state)
return [], None
try:
new_global_states = Instruction(
op_code, self.dynamic_loader, self.iprof
@ -443,7 +448,11 @@ class LaserEVM:
environment = state.environment
disassembly = environment.code
if address in disassembly.address_to_function_name:
if isinstance(
state.world_state.transaction_sequence[-1], ContractCreationTransaction
):
environment.active_function_name = "constructor"
elif address in disassembly.address_to_function_name:
# Enter a new function
environment.active_function_name = disassembly.address_to_function_name[
address
@ -541,7 +550,10 @@ class LaserEVM:
for hook in self.post_hooks[op_code]:
for global_state in global_states:
hook(global_state)
try:
hook(global_state)
except PluginSkipState:
global_states.remove(global_state)
def pre_hook(self, op_code: str) -> Callable:
"""

@ -88,7 +88,7 @@ def _setup_global_state_for_execution(laser_evm, transaction) -> None:
condition=None,
)
)
global_state.world_state.transaction_sequence.append(transaction)
global_state.node = new_node
new_node.states.append(global_state)
laser_evm.work_list.append(global_state)

@ -38,6 +38,7 @@ class MythrilAnalyzer:
loop_bound: Optional[int] = None,
create_timeout: Optional[int] = None,
enable_iprof: bool = False,
disable_dependency_pruning: bool = False,
):
"""
@ -57,6 +58,7 @@ class MythrilAnalyzer:
self.loop_bound = loop_bound
self.create_timeout = create_timeout
self.enable_iprof = enable_iprof
self.disable_dependency_pruning = disable_dependency_pruning
def dump_statespace(self, contract: EVMContract = None) -> str:
"""
@ -77,6 +79,7 @@ class MythrilAnalyzer:
execution_timeout=self.execution_timeout,
create_timeout=self.create_timeout,
enable_iprof=self.enable_iprof,
disable_dependency_pruning=self.disable_dependency_pruning,
run_analysis_modules=False,
)
@ -111,6 +114,7 @@ class MythrilAnalyzer:
transaction_count=transaction_count,
create_timeout=self.create_timeout,
enable_iprof=self.enable_iprof,
disable_dependency_pruning=self.disable_dependency_pruning,
run_analysis_modules=False,
)
return generate_graph(sym, physics=enable_physics, phrackify=phrackify)
@ -150,6 +154,7 @@ class MythrilAnalyzer:
modules=modules,
compulsory_statespace=False,
enable_iprof=self.enable_iprof,
disable_dependency_pruning=self.disable_dependency_pruning,
)
issues = fire_lasers(sym, modules)

@ -94,9 +94,9 @@ class VerifyVersionCommand(install):
""""""
tag = os.getenv("CIRCLE_TAG")
if tag != VERSION:
if tag != about["__version__"]:
info = "Git tag: {0} does not match the version of this app: {1}".format(
tag, VERSION
tag, about["__version__"]
)
sys.exit(info)

@ -8,7 +8,7 @@ import sys
def test_version_opt(capsys):
# Check that "myth --version" returns a string with the word
# "version" in it
sys.argv = ["mythril", "--version"]
sys.argv = ["mythril", "version"]
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
@ -16,7 +16,7 @@ def test_version_opt(capsys):
assert captured.out.find(" version ") >= 1
# Check that "myth --version -o json" returns a JSON object
sys.argv = ["mythril", "--version", "-o", "json"]
sys.argv = ["mythril", "version", "-o", "json"]
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit

@ -1,5 +1,6 @@
from subprocess import check_output
from tests import BaseTestCase, TESTDATA, PROJECT_DIR, TESTS_DIR
from mock import patch
MYTH = str(PROJECT_DIR / "myth")
@ -15,23 +16,64 @@ def output_of(command):
class CommandLineToolTestCase(BaseTestCase):
def test_disassemble_code_correctly(self):
command = "python3 {} MYTH -d --bin-runtime -c 0x5050 --solv 0.5.0".format(MYTH)
command = "python3 {} disassemble --bin-runtime -c 0x5050".format(MYTH)
self.assertIn("0 POP\n1 POP\n", output_of(command))
def test_disassemble_solidity_file_correctly(self):
solidity_file = str(TESTDATA / "input_contracts" / "metacoin.sol")
command = "python3 {} -d {} --solv 0.5.0".format(MYTH, solidity_file)
command = "python3 {} disassemble {}".format(MYTH, solidity_file)
self.assertIn("2 PUSH1 0x40\n4 MSTORE", output_of(command))
def test_hash_a_function_correctly(self):
command = "python3 {} --solv 0.5.0 --hash 'setOwner(address)'".format(MYTH)
command = "python3 {} function-to-hash 'setOwner(address)'".format(MYTH)
self.assertIn("0x13af4035\n", output_of(command))
def test_failure_json(self):
command = "python3 {} analyze doesnt_exist.sol -o json".format(MYTH)
print(output_of(command))
self.assertIn(""""success": false""", output_of(command))
def test_failure_text(self):
command = "python3 {} analyze doesnt_exist.sol".format(MYTH)
assert output_of(command) == ""
def test_failure_jsonv2(self):
command = "python3 {} analyze doesnt_exist.sol -o jsonv2".format(MYTH)
self.assertIn(""""level": "error""" "", output_of(command))
def test_analyze(self):
solidity_file = str(TESTDATA / "input_contracts" / "origin.sol")
command = "python3 {} analyze {}".format(MYTH, solidity_file)
self.assertIn("111", output_of(command))
def test_analyze_bytecode(self):
solidity_file = str(TESTDATA / "inputs" / "origin.sol.o")
command = "python3 {} analyze --bin-runtime -f {}".format(MYTH, solidity_file)
self.assertIn("111", output_of(command))
def test_invalid_args_iprof(self):
solidity_file = str(TESTDATA / "input_contracts" / "origin.sol")
command = "python3 {} analyze {} --enable-iprof -o json".format(
MYTH, solidity_file
)
self.assertIn(""""success": false""", output_of(command))
def test_only_epic(self):
command = "python3 {}".format(MYTH)
self.assertIn("usage: ", output_of(command))
def test_storage(self):
solidity_file = str(TESTDATA / "input_contracts" / "origin.sol")
command = """python3 {} read-storage "438767356, 3" 0x76799f77587738bfeef09452df215b63d2cfb08a """.format(
MYTH
)
self.assertIn("0x1a270efc", output_of(command))
class TruffleTestCase(BaseTestCase):
def test_analysis_truffle_project(self):
truffle_project_root = str(TESTS_DIR / "truffle_project")
command = "cd {}; truffle compile; python3 {} --truffle -t 2".format(
command = "cd {}; truffle compile; python3 {} truffle -t 2".format(
truffle_project_root, MYTH
)
self.assertIn("=== Unprotected Ether Withdrawal ====", output_of(command))
@ -39,7 +81,7 @@ class TruffleTestCase(BaseTestCase):
class InfuraTestCase(BaseTestCase):
def test_infura_mainnet(self):
command = "python3 {} --rpc infura-mainnet -d -a 0x2a0c0dbecc7e4d658f48e01e3fa353f44050c208".format(
command = "python3 {} disassemble --rpc infura-mainnet -a 0x2a0c0dbecc7e4d658f48e01e3fa353f44050c208".format(
MYTH
)
output = output_of(command)
@ -47,21 +89,21 @@ class InfuraTestCase(BaseTestCase):
self.assertIn("7278 POP\n7279 POP\n7280 JUMP\n7281 STOP", output)
def test_infura_rinkeby(self):
command = "python3 {} --rpc infura-rinkeby -d -a 0xB6f2bFED892a662bBF26258ceDD443f50Fa307F5".format(
command = "python3 {} disassemble --rpc infura-rinkeby -a 0xB6f2bFED892a662bBF26258ceDD443f50Fa307F5".format(
MYTH
)
output = output_of(command)
self.assertIn("34 JUMPDEST\n35 CALLVALUE", output)
def test_infura_kovan(self):
command = "python3 {} --rpc infura-kovan -d -a 0xE6bBF9B5A3451242F82f8cd458675092617a1235".format(
command = "python3 {} disassemble --rpc infura-kovan -a 0xE6bBF9B5A3451242F82f8cd458675092617a1235".format(
MYTH
)
output = output_of(command)
self.assertIn("9999 PUSH1 0x00\n10001 NOT\n10002 AND\n10003 PUSH1 0x00", output)
def test_infura_ropsten(self):
command = "python3 {} --rpc infura-ropsten -d -a 0x6e0E0e02377Bc1d90E8a7c21f12BA385C2C35f78".format(
command = "python3 {} disassemble --rpc infura-ropsten -a 0x6e0E0e02377Bc1d90E8a7c21f12BA385C2C35f78".format(
MYTH
)
output = output_of(command)

@ -0,0 +1,50 @@
import pytest
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.machine_state import MachineState
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.support.loader import DynLoader
from mythril.ethereum.interface.rpc.client import EthJsonRpc
from mythril.laser.ethereum.instructions import Instruction
def _get_global_state():
active_account = Account("0x0", code=Disassembly("60606040"))
passive_account = Account(
"0x325345346564645654645", code=Disassembly("6060604061626364")
)
environment = Environment(active_account, None, None, None, None, None)
world_state = WorldState()
world_state.put_account(active_account)
world_state.put_account(passive_account)
return GlobalState(world_state, environment, None, MachineState(gas_limit=8000000))
@pytest.mark.parametrize(
"addr, eth, code_len",
[
(
"0xb09C477eCDAd49DD5Ac26c2C64914C3a6693843a",
EthJsonRpc("rinkeby.infura.io", 443, True),
1548,
),
(
"0x863DF6BFa4469f3ead0bE8f9F2AAE51c91A907b4",
EthJsonRpc("mainnet.infura.io", 443, True),
0,
),
(
"0x325345346564645654645",
EthJsonRpc("mainnet.infura.io", 443, True),
16,
), # This contract tests Address Cache
],
)
def test_extraction(addr, eth, code_len):
global_state = _get_global_state()
dynamic_loader = DynLoader(eth=eth)
code = global_state.world_state.accounts_exist_or_load(addr, dynamic_loader)
assert len(code) == code_len
Loading…
Cancel
Save