Support TX constraints and refactor CLI (#1607)

* Support TX constraints and refactor CLI

* Fix typing

* Add additional typing

* Use dict over accessor

* Fix z3py requirements

* Init storage args

* Init storage args

* Init args in vm tests
pull/1608/head
Nikhil Parasaram 3 years ago committed by GitHub
parent 7a4946144a
commit bb1dd3e794
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 203
      mythril/interfaces/cli.py
  2. 10
      mythril/laser/ethereum/svm.py
  3. 55
      mythril/laser/ethereum/transaction/symbolic.py
  4. 55
      mythril/mythril/mythril_analyzer.py
  5. 1
      mythril/support/support_args.py
  6. 2
      requirements.txt
  7. 22
      tests/graph_test.py
  8. 3
      tests/laser/evm_testsuite/evm_test.py
  9. 2
      tests/laser/state/storage_test.py
  10. 2
      tests/laser/transaction/symbolic_test.py
  11. 20
      tests/mythril/mythril_analyzer_test.py
  12. 22
      tests/statespace_test.py

@ -13,6 +13,7 @@ import sys
import coloredlogs
import traceback
from ast import literal_eval
import mythril.support.signatures as sigs
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter
@ -411,122 +412,15 @@ def create_safe_functions_parser(parser: ArgumentParser):
)
options = parser.add_argument_group("options")
add_analysis_args(options)
options.add_argument(
"-m",
"--modules",
help="Comma-separated list of security analysis modules",
metavar="MODULES",
)
options.add_argument(
"--max-depth",
type=int,
default=128,
help="Maximum recursion depth for symbolic execution",
)
options.add_argument(
"--call-depth-limit",
type=int,
default=10,
help="Maximum call depth limit for symbolic execution",
)
options.add_argument(
"--strategy",
choices=["dfs", "bfs", "naive-random", "weighted-random"],
default="bfs",
help="Symbolic execution strategy",
)
options.add_argument(
"--beam-search",
type=int,
default=None,
help="Beam search with with",
)
options.add_argument(
"-b",
"--loop-bound",
type=int,
default=5,
help="Bound loops at n iterations",
metavar="N",
)
options.add_argument(
"--execution-timeout",
type=int,
default=7200,
help="The amount of seconds to spend on symbolic execution",
)
options.add_argument(
"--solver-timeout",
type=int,
default=100000,
help="The maximum amount of time(in milli seconds) the solver spends for queries from analysis modules",
)
options.add_argument(
"--parallel-solving",
action="store_true",
help="Enable solving z3 queries in parallel",
)
options.add_argument(
"--solver-log",
help="Path to the directory for solver log",
metavar="SOLVER_LOG",
)
options.add_argument(
"-q",
"--query-signature",
action="store_true",
help="Lookup function signatures through www.4byte.directory",
)
options.add_argument(
"--create-timeout",
type=int,
default=10,
help="The amount of seconds to spend on the initial contract creation",
)
options.add_argument(
"--enable-iprof", action="store_true", help="enable the instruction profiler"
)
options.add_argument(
"--enable-coverage-strategy",
action="store_true",
help="enable coverage based search strategy",
)
options.add_argument(
"--custom-modules-directory",
help="designates a separate directory to search for custom analysis modules",
metavar="CUSTOM_MODULES_DIRECTORY",
)
options.add_argument(
"--attacker-address",
help="Designates a specific attacker address to use during analysis",
metavar="ATTACKER_ADDRESS",
)
options.add_argument(
"--creator-address",
help="Designates a specific creator address to use during analysis",
metavar="CREATOR_ADDRESS",
)
def create_analyzer_parser(analyzer_parser: ArgumentParser):
"""
Modify parser to handle analyze command
:param analyzer_parser:
:return:
def add_analysis_args(options):
"""[summary]
Adds arguments for analysis
Args:
options ([type]): [description]
"""
analyzer_parser.add_argument(
"solidity_files",
nargs="*",
help="Inputs file name and contract name. \n"
"usage: file1.sol:OptionalContractName file2.sol file3.sol:OptionalContractName",
)
add_graph_commands(analyzer_parser)
options = analyzer_parser.add_argument_group("options")
options.add_argument(
"-m",
"--modules",
@ -552,6 +446,14 @@ def create_analyzer_parser(analyzer_parser: ArgumentParser):
default="bfs",
help="Symbolic execution strategy",
)
options.add_argument(
"--transaction-sequences",
type=str,
default=None,
help="The possible transaction sequences to be executed. "
"Like [[func_hash1, func_hash2], [func_hash2, func_hash3]] where for the first transaction is constrained "
"with func_hash1 and func_hash2, and the second tx is constrained with func_hash2 and func_hash3",
)
options.add_argument(
"--beam-search",
type=int,
@ -659,6 +561,23 @@ def create_analyzer_parser(analyzer_parser: ArgumentParser):
)
def create_analyzer_parser(analyzer_parser: ArgumentParser):
"""
Modify parser to handle analyze command
:param analyzer_parser:
:return:
"""
analyzer_parser.add_argument(
"solidity_files",
nargs="*",
help="Inputs file name and contract name. \n"
"usage: file1.sol:OptionalContractName file2.sol file3.sol:OptionalContractName",
)
add_graph_commands(analyzer_parser)
options = analyzer_parser.add_argument_group("options")
add_analysis_args(options)
def validate_args(args: Namespace):
"""
Validate cli args
@ -687,6 +606,20 @@ def validate_args(args: Namespace):
if args.command in DISASSEMBLE_LIST and len(args.solidity_files) > 1:
exit_with_error("text", "Only a single arg is supported for using disassemble")
if args.__dict__.get("transaction_sequences", None):
try:
args.transaction_sequences = literal_eval(str(args.transaction_sequences))
except ValueError:
exit_with_error(
args.outform,
"The transaction sequence is in incorrect format, It should be "
"[list of possible function hashes in 1st transaction, "
"list of possible func hashes in 2nd tx, ..]"
"If any list is empty then all possible functions are considered for that transaction",
)
if len(args.transaction_sequences) != args.transaction_count:
args.transaction_count = len(args.transaction_sequences)
if args.command in ANALYZE_LIST:
if args.query_signature and sigs.ethereum_input_decoder is None:
exit_with_error(
@ -816,26 +749,12 @@ def execute_command(
print("Disassembly: \n" + disassembler.contracts[0].get_creation_easm())
elif args.command == SAFE_FUNCTIONS_COMMAND:
args.unconstrained_storage = True
args.sparse_pruning = False
args.disable_dependency_pruning = True
args.no_onchain_data = True
function_analyzer = MythrilAnalyzer(
strategy=strategy,
disassembler=disassembler,
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
loop_bound=args.loop_bound,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
disable_dependency_pruning=True,
use_onchain_data=False,
solver_timeout=args.solver_timeout,
parallel_solving=args.parallel_solving,
custom_modules_directory=args.custom_modules_directory
if args.custom_modules_directory
else "",
call_depth_limit=args.call_depth_limit,
sparse_pruning=False,
unconstrained_storage=True,
solver_log=args.solver_log,
strategy=strategy, disassembler=disassembler, address=address, cmd_args=args
)
try:
report = function_analyzer.fire_lasers(
@ -852,25 +771,7 @@ def execute_command(
elif args.command in ANALYZE_LIST:
analyzer = MythrilAnalyzer(
strategy=strategy,
disassembler=disassembler,
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
loop_bound=args.loop_bound,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
disable_dependency_pruning=args.disable_dependency_pruning,
use_onchain_data=not args.no_onchain_data,
solver_timeout=args.solver_timeout,
parallel_solving=args.parallel_solving,
custom_modules_directory=args.custom_modules_directory
if args.custom_modules_directory
else "",
call_depth_limit=args.call_depth_limit,
sparse_pruning=args.sparse_pruning,
unconstrained_storage=args.unconstrained_storage,
solver_log=args.solver_log,
strategy=strategy, disassembler=disassembler, address=address, cmd_args=args
)
if not disassembler.contracts:

@ -229,11 +229,17 @@ class LaserEVM:
i, len(self.open_states)
)
)
func_hashes = (
args.transaction_sequences[i] if args.transaction_sequences else None
)
if func_hashes:
func_hashes = [
bytes.fromhex(hex(func_hash)[2:]) for func_hash in func_hashes
]
for hook in self._start_sym_trans_hooks:
hook()
execute_message_call(self, address)
execute_message_call(self, address, func_hashes=func_hashes)
for hook in self._stop_sym_trans_hooks:
hook()

@ -1,13 +1,15 @@
"""This module contains functions setting up and executing transactions with
symbolic values."""
import logging
from typing import Optional
from typing import Optional, List, Union
from copy import deepcopy
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.cfg import Node, Edge, JumpType
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.calldata import SymbolicCalldata
from mythril.laser.ethereum.state.constraints import Constraints
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.laser.ethereum.transaction.transaction_models import (
MessageCallTransaction,
@ -15,8 +17,11 @@ from mythril.laser.ethereum.transaction.transaction_models import (
tx_id_manager,
BaseTransaction,
)
from typing import List, Union
from mythril.laser.smt import symbol_factory, Or, BitVec
from mythril.laser.smt import symbol_factory, Or, Bool, BitVec
from mythril.support.support_args import args as cmd_args
FUNCTION_HASH_BYTE_LENGTH = 4
log = logging.getLogger(__name__)
@ -69,7 +74,31 @@ class Actors:
ACTORS = Actors()
def execute_message_call(laser_evm, callee_address: BitVec) -> None:
def generate_function_constraints(
calldata: SymbolicCalldata, func_hashes: List[List[int]]
) -> List[Bool]:
"""
This will generate constraints for fixing the function call part of calldata
:param calldata: Calldata
:param func_hashes: The list of function hashes allowed for this transaction
:return: Constraints List
"""
if len(func_hashes) == 0:
return []
constraints = []
for i in range(FUNCTION_HASH_BYTE_LENGTH):
constraint = Bool(False)
for func_hash in func_hashes:
constraint = Or(
constraint, calldata[i] == symbol_factory.BitVecVal(func_hash[i], 8)
)
constraints.append(constraint)
return constraints
def execute_message_call(
laser_evm, callee_address: BitVec, func_hashes: List[List[int]] = None
) -> None:
"""Executes a message call transaction from all open states.
:param laser_evm:
@ -89,7 +118,7 @@ def execute_message_call(laser_evm, callee_address: BitVec) -> None:
external_sender = symbol_factory.BitVecSym(
"sender_{}".format(next_transaction_id), 256
)
calldata = SymbolicCalldata(next_transaction_id)
transaction = MessageCallTransaction(
world_state=open_world_state,
identifier=next_transaction_id,
@ -100,12 +129,17 @@ def execute_message_call(laser_evm, callee_address: BitVec) -> None:
origin=external_sender,
caller=external_sender,
callee_account=open_world_state[callee_address],
call_data=SymbolicCalldata(next_transaction_id),
call_data=calldata,
call_value=symbol_factory.BitVecSym(
"call_value{}".format(next_transaction_id), 256
),
)
_setup_global_state_for_execution(laser_evm, transaction)
constraints = (
generate_function_constraints(calldata, func_hashes)
if func_hashes
else None
)
_setup_global_state_for_execution(laser_evm, transaction, constraints)
laser_evm.exec()
@ -158,7 +192,11 @@ def execute_contract_creation(
return new_account
def _setup_global_state_for_execution(laser_evm, transaction: BaseTransaction) -> None:
def _setup_global_state_for_execution(
laser_evm,
transaction: BaseTransaction,
initial_constraints: Optional[List[Bool]] = None,
) -> None:
"""Sets up global state and cfg for a transactions execution.
:param laser_evm:
@ -167,6 +205,7 @@ def _setup_global_state_for_execution(laser_evm, transaction: BaseTransaction) -
# TODO: Resolve circular import between .transaction and ..svm to import LaserEVM here
global_state = transaction.initial_global_state()
global_state.transaction_stack.append((transaction, None))
global_state.world_state.constraints += initial_constraints or []
global_state.world_state.constraints.append(
Or(*[transaction.caller == actor for actor in ACTORS.addresses.values()])

@ -4,6 +4,7 @@
import logging
import traceback
from typing import Optional, List
from argparse import Namespace
from . import MythrilDisassembler
from mythril.support.source_support import Source
@ -32,49 +33,41 @@ class MythrilAnalyzer:
def __init__(
self,
disassembler: MythrilDisassembler,
requires_dynld: bool = False,
use_onchain_data: bool = True,
cmd_args: Namespace,
strategy: str = "dfs",
address: Optional[str] = None,
max_depth: Optional[int] = None,
execution_timeout: Optional[int] = None,
loop_bound: Optional[int] = None,
create_timeout: Optional[int] = None,
enable_iprof: bool = False,
disable_dependency_pruning: bool = False,
solver_timeout: Optional[int] = None,
custom_modules_directory: str = "",
sparse_pruning: bool = False,
unconstrained_storage: bool = False,
parallel_solving: bool = False,
call_depth_limit: int = 3,
solver_log: Optional[str] = None,
):
"""
:param disassembler: The MythrilDisassembler class
:param requires_dynld: whether dynamic loading should be done or not
:param onchain_storage_access: Whether onchain access should be done or not
:param cmd_args: The command line args Namespace
:param strategy: Search strategy
:param address: Address of the contract
"""
self.eth = disassembler.eth
self.contracts = disassembler.contracts or [] # type: List[EVMContract]
self.enable_online_lookup = disassembler.enable_online_lookup
self.use_onchain_data = use_onchain_data
self.use_onchain_data = not cmd_args.no_onchain_data
self.strategy = strategy
self.address = address
self.max_depth = max_depth
self.execution_timeout = execution_timeout
self.loop_bound = loop_bound
self.create_timeout = create_timeout
self.disable_dependency_pruning = disable_dependency_pruning
self.custom_modules_directory = custom_modules_directory
args.sparse_pruning = sparse_pruning
args.solver_timeout = solver_timeout
args.parallel_solving = parallel_solving
args.unconstrained_storage = unconstrained_storage
args.call_depth_limit = call_depth_limit
args.iprof = enable_iprof
args.solver_log = solver_log
self.max_depth = cmd_args.max_depth
self.execution_timeout = cmd_args.execution_timeout
self.loop_bound = cmd_args.loop_bound
self.create_timeout = cmd_args.create_timeout
self.disable_dependency_pruning = cmd_args.disable_dependency_pruning
self.custom_modules_directory = (
cmd_args.custom_modules_directory
if cmd_args.custom_modules_directory
else ""
)
args.sparse_pruning = cmd_args.sparse_pruning
args.solver_timeout = cmd_args.solver_timeout
args.parallel_solving = cmd_args.parallel_solving
args.unconstrained_storage = cmd_args.unconstrained_storage
args.call_depth_limit = cmd_args.call_depth_limit
args.iprof = cmd_args.enable_iprof
args.solver_log = cmd_args.solver_log
args.transaction_sequences = cmd_args.transaction_sequences
def dump_statespace(self, contract: EVMContract = None) -> str:
"""

@ -12,6 +12,7 @@ class Args:
self.call_depth_limit = 3
self.iprof = True
self.solver_log = None
self.transaction_sequences: List[List[str]] = None
args = Args()

@ -32,7 +32,7 @@ rlp<3
semantic_version
transaction>=2.2.1
typing-extensions<4,>=3.7.4
z3-solver>=4.8.8.0
z3-solver<4.8.15.0,>=4.8.8.0
pysha3
matplotlib
pre-commit

@ -5,6 +5,7 @@ from mythril.mythril import MythrilAnalyzer, MythrilDisassembler
from mythril.ethereum import util
from mythril.solidity.soliditycontract import EVMContract
from tests import TESTDATA_INPUTS
from types import SimpleNamespace
def test_generate_graph():
@ -15,13 +16,28 @@ def test_generate_graph():
disassembler = MythrilDisassembler()
disassembler.contracts.append(contract)
args = SimpleNamespace(
execution_timeout=5,
max_depth=30,
solver_timeout=10000,
no_onchain_data=True,
loop_bound=None,
create_timeout=None,
disable_dependency_pruning=False,
custom_modules_directory=None,
sparse_pruning=True,
parallel_solving=True,
unconstrained_storage=True,
call_depth_limit=3,
enable_iprof=False,
solver_log=None,
transaction_sequences=None,
)
analyzer = MythrilAnalyzer(
disassembler=disassembler,
strategy="dfs",
execution_timeout=5,
max_depth=30,
address=(util.get_indexed_address(0)),
solver_timeout=10000,
cmd_args=args,
)
analyzer.graph_html(transaction_count=1)

@ -6,6 +6,7 @@ from mythril.laser.ethereum.function_managers import keccak_function_manager
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.transaction.concolic import execute_message_call
from mythril.laser.smt import Expression, BitVec, symbol_factory
from mythril.support.support_args import args
from datetime import datetime
import binascii
@ -119,7 +120,7 @@ def test_vmtest(
if test_name in ignored_test_names:
return
world_state = WorldState()
args.unconstrained_storage = False
for address, details in pre_condition.items():
account = Account(address, concrete_storage=True)
account.code = Disassembly(details["code"][2:])

@ -2,6 +2,7 @@ import pytest
from mythril.laser.smt import symbol_factory
from mythril.laser.ethereum.state.account import Storage
from mythril.laser.smt import Expression
from mythril.support.support_args import args
BVV = symbol_factory.BitVecVal
@ -11,6 +12,7 @@ storage_uninitialized_test_data = [({}, 1), ({1: 5}, 2), ({1: 5, 3: 10}, 2)]
@pytest.mark.parametrize("initial_storage,key", storage_uninitialized_test_data)
def test_concrete_storage_uninitialized_index(initial_storage, key):
# Arrange
args.unconstrained_storage = False
storage = Storage(concrete=True)
for k, val in initial_storage.items():
storage[BVV(k, 256)] = BVV(val, 256)

@ -15,7 +15,7 @@ import unittest.mock as mock
from unittest.mock import MagicMock
def _is_message_call(_, transaction):
def _is_message_call(_, transaction, transaction_sequences):
assert isinstance(transaction, MessageCallTransaction)

@ -2,6 +2,7 @@ from pathlib import Path
from mythril.mythril import MythrilDisassembler, MythrilAnalyzer
from mythril.analysis.report import Issue
from mock import patch, PropertyMock
from types import SimpleNamespace
@patch("mythril.analysis.report.Issue.add_code_info", return_value=None)
@ -22,7 +23,24 @@ def test_fire_lasers(mock_sym, mock_fire_lasers, mock_code_info):
)
]
)
analyzer = MythrilAnalyzer(disassembler, strategy="dfs")
args = SimpleNamespace(
execution_timeout=5,
max_depth=30,
solver_timeout=10000,
no_onchain_data=True,
loop_bound=None,
create_timeout=None,
disable_dependency_pruning=False,
custom_modules_directory=None,
sparse_pruning=True,
parallel_solving=True,
unconstrained_storage=True,
call_depth_limit=3,
enable_iprof=False,
solver_log=None,
transaction_sequences=None,
)
analyzer = MythrilAnalyzer(disassembler, cmd_args=args)
issues = analyzer.fire_lasers(modules=[]).sorted_issues()
mock_sym.assert_called()

@ -2,6 +2,7 @@ from mythril.mythril import MythrilAnalyzer, MythrilDisassembler
from mythril.ethereum import util
from mythril.solidity.soliditycontract import EVMContract
from tests import TESTDATA_INPUTS
from types import SimpleNamespace
def test_statespace_dump():
@ -12,13 +13,28 @@ def test_statespace_dump():
contract = EVMContract(input_file.read_text())
disassembler = MythrilDisassembler()
disassembler.contracts.append(contract)
args = SimpleNamespace(
execution_timeout=5,
max_depth=30,
solver_timeout=10000,
no_onchain_data=True,
loop_bound=None,
create_timeout=None,
disable_dependency_pruning=False,
custom_modules_directory=None,
sparse_pruning=True,
parallel_solving=True,
unconstrained_storage=True,
call_depth_limit=3,
enable_iprof=False,
solver_log=None,
transaction_sequences=None,
)
analyzer = MythrilAnalyzer(
disassembler=disassembler,
strategy="dfs",
execution_timeout=5,
max_depth=30,
address=(util.get_indexed_address(0)),
solver_timeout=10000,
cmd_args=args,
)
analyzer.dump_statespace(contract=contract)

Loading…
Cancel
Save