mirror of https://github.com/ConsenSys/mythril
Concolic tool (#1445)
* Mythril v0.22.14 * Add trace plugin and find trace * Add finding trace * Add some changes * change help text * Add some comments * Add support for concolic strategy * Fix issues * Add concolic exec tool * Add a few fixes * Fix a few errors * Add tests * Fix test interface * Add muti flip test * Add multi contract test * Fix cases with multicontract calls * Add more tests and fixes * Fix tests * Fix mypy * Change tests and other small fixes * Add more documentation * Add some changes * Update mythril/concolic/find_trace.py Co-authored-by: JoranHonig <JoranHonig@users.noreply.github.com> * Remove space Co-authored-by: JoranHonig <JoranHonig@users.noreply.github.com> * Fix typing * Fix Review issues * Change test type * Remove deepcopy * Use 3.8 for typing * Use typing_extensions * Fix prev_state * Add changes to tests and fix some bugs * Fix address assignment * Fix issues and format * Remove set_option * Split plugins * Refactor Co-authored-by: JoranHonig <JoranHonig@users.noreply.github.com>pull/1595/head
parent
0ea0f4fbe2
commit
d8e2689c8e
@ -0,0 +1,2 @@ |
||||
from mythril.concolic.concolic_execution import concolic_execution |
||||
from mythril.concolic.find_trace import concrete_execution |
@ -0,0 +1,85 @@ |
||||
import json |
||||
import binascii |
||||
|
||||
from datetime import datetime, timedelta |
||||
from typing import Dict, List, Any |
||||
from copy import deepcopy |
||||
|
||||
from mythril.concolic.concrete_data import ConcreteData |
||||
from mythril.concolic.find_trace import concrete_execution |
||||
from mythril.disassembler.disassembly import Disassembly |
||||
from mythril.laser.ethereum.strategy.concolic import ConcolicStrategy |
||||
from mythril.laser.ethereum.svm import LaserEVM |
||||
from mythril.laser.ethereum.state.world_state import WorldState |
||||
from mythril.laser.ethereum.state.account import Account |
||||
from mythril.laser.ethereum.transaction.symbolic import execute_transaction |
||||
from mythril.laser.ethereum.transaction.transaction_models import tx_id_manager |
||||
from mythril.laser.smt import Expression, BitVec, symbol_factory |
||||
from mythril.laser.ethereum.time_handler import time_handler |
||||
from mythril.support.support_args import args |
||||
|
||||
|
||||
def flip_branches( |
||||
init_state: WorldState, |
||||
concrete_data: ConcreteData, |
||||
jump_addresses: List[str], |
||||
trace: List, |
||||
) -> List[Dict[str, Dict[str, Any]]]: |
||||
""" |
||||
Flips branches and prints the input required for branch flip |
||||
|
||||
:param concrete_data: Concrete data |
||||
:param jump_addresses: Jump addresses to flip |
||||
:param trace: trace to follow |
||||
""" |
||||
tx_id_manager.restart_counter() |
||||
output_list = [] |
||||
laser_evm = LaserEVM( |
||||
execution_timeout=600, use_reachability_check=False, transaction_count=10 |
||||
) |
||||
laser_evm.open_states = [deepcopy(init_state)] |
||||
laser_evm.strategy = ConcolicStrategy( |
||||
work_list=laser_evm.work_list, |
||||
max_depth=100, |
||||
trace=trace, |
||||
flip_branch_addresses=jump_addresses, |
||||
) |
||||
|
||||
time_handler.start_execution(laser_evm.execution_timeout) |
||||
laser_evm.time = datetime.now() |
||||
|
||||
for transaction in concrete_data["steps"]: |
||||
execute_transaction( |
||||
laser_evm, |
||||
callee_address=transaction["address"], |
||||
caller_address=symbol_factory.BitVecVal( |
||||
int(transaction["origin"], 16), 256 |
||||
), |
||||
data=transaction["input"][2:], |
||||
) |
||||
|
||||
if laser_evm.strategy.results: |
||||
for addr in jump_addresses: |
||||
output_list.append(laser_evm.strategy.results[addr]) |
||||
return output_list |
||||
|
||||
|
||||
def concolic_execution( |
||||
concrete_data: ConcreteData, jump_addresses: List, solver_timeout=100000 |
||||
) -> List[Dict[str, Dict[str, Any]]]: |
||||
""" |
||||
Executes codes and prints input required to cover the branch flips |
||||
:param input_file: Input file |
||||
:param jump_addresses: Jump addresses to flip |
||||
:param solver_timeout: Solver timeout |
||||
|
||||
""" |
||||
init_state, trace = concrete_execution(concrete_data) |
||||
args.solver_timeout = solver_timeout |
||||
output_list = flip_branches( |
||||
init_state=init_state, |
||||
concrete_data=concrete_data, |
||||
jump_addresses=jump_addresses, |
||||
trace=trace, |
||||
) |
||||
return output_list |
@ -0,0 +1,34 @@ |
||||
from typing import Dict, List |
||||
from typing_extensions import TypedDict |
||||
|
||||
|
||||
class AccountData(TypedDict): |
||||
balance: str |
||||
code: str |
||||
nonce: int |
||||
storage: dict |
||||
|
||||
|
||||
class InitialState(TypedDict): |
||||
accounts: Dict[str, AccountData] |
||||
|
||||
|
||||
class TransactionData(TypedDict): |
||||
address: str |
||||
blockCoinbase: str |
||||
blockDifficulty: str |
||||
blockGasLimit: str |
||||
blockNumber: str |
||||
blockTime: str |
||||
calldata: str |
||||
gasLimit: str |
||||
gasPrice: str |
||||
input: str |
||||
name: str |
||||
origin: str |
||||
value: str |
||||
|
||||
|
||||
class ConcreteData(TypedDict): |
||||
initialState: InitialState |
||||
steps: List[TransactionData] |
@ -0,0 +1,78 @@ |
||||
import json |
||||
import binascii |
||||
|
||||
from copy import deepcopy |
||||
from datetime import datetime |
||||
from typing import Dict, List, Tuple |
||||
|
||||
from mythril.concolic.concrete_data import ConcreteData |
||||
|
||||
from mythril.disassembler.disassembly import Disassembly |
||||
from mythril.laser.ethereum.svm import LaserEVM |
||||
from mythril.laser.ethereum.state.world_state import WorldState |
||||
from mythril.laser.ethereum.state.account import Account |
||||
from mythril.laser.ethereum.transaction.concolic import execute_transaction |
||||
from mythril.laser.plugin.loader import LaserPluginLoader |
||||
from mythril.laser.smt import Expression, BitVec, symbol_factory |
||||
from mythril.laser.ethereum.transaction.transaction_models import tx_id_manager |
||||
from mythril.plugin.discovery import PluginDiscovery |
||||
|
||||
|
||||
def setup_concrete_initial_state(concrete_data: ConcreteData) -> WorldState: |
||||
""" |
||||
Sets up concrete initial state |
||||
:param concrete_data: Concrete data |
||||
:return: initialised world state |
||||
""" |
||||
world_state = WorldState() |
||||
for address, details in concrete_data["initialState"]["accounts"].items(): |
||||
account = Account(address, concrete_storage=True) |
||||
account.code = Disassembly(details["code"][2:]) |
||||
account.nonce = details["nonce"] |
||||
if type(details["storage"]) == str: |
||||
details["storage"] = eval(details["storage"]) # type: ignore |
||||
for key, value in details["storage"].items(): |
||||
key_bitvec = symbol_factory.BitVecVal(int(key, 16), 256) |
||||
account.storage[key_bitvec] = symbol_factory.BitVecVal(int(value, 16), 256) |
||||
|
||||
world_state.put_account(account) |
||||
account.set_balance(int(details["balance"], 16)) |
||||
return world_state |
||||
|
||||
|
||||
def concrete_execution(concrete_data: ConcreteData) -> Tuple[WorldState, List]: |
||||
""" |
||||
Executes code concretely to find the path to be followed by concolic executor |
||||
:param concrete_data: Concrete data |
||||
:return: path trace |
||||
""" |
||||
tx_id_manager.restart_counter() |
||||
init_state = setup_concrete_initial_state(concrete_data) |
||||
laser_evm = LaserEVM(execution_timeout=1000) |
||||
laser_evm.open_states = [deepcopy(init_state)] |
||||
plugin_loader = LaserPluginLoader() |
||||
assert PluginDiscovery().is_installed("myth_concolic_execution") |
||||
trace_plugin = PluginDiscovery().installed_plugins["myth_concolic_execution"]() |
||||
|
||||
plugin_loader.load(trace_plugin) |
||||
laser_evm.time = datetime.now() |
||||
plugin_loader.instrument_virtual_machine(laser_evm, None) |
||||
for transaction in concrete_data["steps"]: |
||||
execute_transaction( |
||||
laser_evm, |
||||
callee_address=transaction["address"], |
||||
caller_address=symbol_factory.BitVecVal( |
||||
int(transaction["origin"], 16), 256 |
||||
), |
||||
origin_address=symbol_factory.BitVecVal( |
||||
int(transaction["origin"], 16), 256 |
||||
), |
||||
gas_limit=int(transaction.get("gasLimit", "0x9999999999999999999999"), 16), |
||||
data=binascii.a2b_hex(transaction["input"][2:]), |
||||
gas_price=int(transaction.get("gasPrice", "0x773594000"), 16), |
||||
value=int(transaction["value"], 16), |
||||
track_gas=False, |
||||
) |
||||
|
||||
tx_id_manager.restart_counter() |
||||
return init_state, plugin_loader.plugin_list["MythX Trace Finder"].tx_trace # type: ignore |
@ -0,0 +1,133 @@ |
||||
from mythril.laser.ethereum.state.global_state import GlobalState |
||||
from mythril.laser.ethereum.state.constraints import Constraints |
||||
from mythril.laser.ethereum.strategy.basic import BasicSearchStrategy |
||||
from mythril.laser.ethereum.state.annotation import StateAnnotation |
||||
from mythril.laser.ethereum.transaction import ContractCreationTransaction |
||||
from mythril.laser.ethereum.util import get_instruction_index |
||||
from mythril.analysis.solver import get_transaction_sequence |
||||
from mythril.laser.smt import Not |
||||
from mythril.exceptions import UnsatError |
||||
|
||||
from functools import reduce |
||||
from typing import Dict, cast, List, Any, Tuple |
||||
from copy import copy |
||||
from . import CriterionSearchStrategy |
||||
import logging |
||||
import operator |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
|
||||
class TraceAnnotation(StateAnnotation): |
||||
""" |
||||
This is the annotation used by the ConcolicStrategy to store concolic traces. |
||||
""" |
||||
|
||||
def __init__(self, trace=None): |
||||
self.trace = trace or [] |
||||
|
||||
@property |
||||
def persist_over_calls(self) -> bool: |
||||
return True |
||||
|
||||
def __copy__(self): |
||||
return TraceAnnotation(copy(self.trace)) |
||||
|
||||
|
||||
class ConcolicStrategy(CriterionSearchStrategy): |
||||
""" |
||||
Executes program concolically using the input trace till a specific branch |
||||
""" |
||||
|
||||
def __init__( |
||||
self, |
||||
work_list: List[GlobalState], |
||||
max_depth: int, |
||||
trace: List[List[Tuple[int, str]]], |
||||
flip_branch_addresses: List[str], |
||||
): |
||||
""" |
||||
|
||||
work_list: The work-list of states |
||||
max_depth: The maximum depth for the strategy to go through |
||||
trace: This is the trace to be followed, each element is of the type Tuple(program counter, tx_id) |
||||
flip_branch_addresses: Branch addresses to be flipped. |
||||
""" |
||||
super().__init__(work_list, max_depth) |
||||
self.trace: List[Tuple[int, str]] = reduce(operator.iconcat, trace, []) |
||||
self.last_tx_count: int = len(trace) |
||||
self.flip_branch_addresses: List[str] = flip_branch_addresses |
||||
self.results: Dict[str, Dict[str, Any]] = {} |
||||
|
||||
def check_completion_criterion(self): |
||||
if len(self.flip_branch_addresses) == len(self.results): |
||||
self.set_criterion_satisfied() |
||||
|
||||
def get_strategic_global_state(self) -> GlobalState: |
||||
""" |
||||
This function does the following:- |
||||
1) Choose the states by following the concolic trace. |
||||
2) In case we have an executed JUMPI that is in flip_branch_addresses, flip that branch. |
||||
:return: |
||||
""" |
||||
while len(self.work_list) > 0: |
||||
state = self.work_list.pop() |
||||
seq_id = len(state.world_state.transaction_sequence) |
||||
|
||||
trace_annotations = cast( |
||||
List[TraceAnnotation], |
||||
list(state.world_state.get_annotations(TraceAnnotation)), |
||||
) |
||||
|
||||
if len(trace_annotations) == 0: |
||||
annotation = TraceAnnotation() |
||||
state.world_state.annotate(annotation) |
||||
else: |
||||
annotation = trace_annotations[0] |
||||
|
||||
# Appends trace |
||||
annotation.trace.append((state.mstate.pc, state.current_transaction.id)) |
||||
|
||||
# If length of trace is 1 then it is not a JUMPI |
||||
if len(annotation.trace) < 2: |
||||
# If trace does not follow the specified path, ignore the state |
||||
if annotation.trace != self.trace[: len(annotation.trace)]: |
||||
continue |
||||
return state |
||||
|
||||
# Get the address of the previous pc |
||||
addr: str = str( |
||||
state.environment.code.instruction_list[annotation.trace[-2][0]][ |
||||
"address" |
||||
] |
||||
) |
||||
if ( |
||||
annotation.trace == self.trace[: len(annotation.trace)] |
||||
and seq_id == self.last_tx_count |
||||
and addr in self.flip_branch_addresses |
||||
and addr not in self.results |
||||
): |
||||
if ( |
||||
state.environment.code.instruction_list[annotation.trace[-2][0]][ |
||||
"opcode" |
||||
] |
||||
!= "JUMPI" |
||||
): |
||||
log.error( |
||||
f"The branch {addr} does not lead " |
||||
"to a jump address, skipping this branch" |
||||
) |
||||
continue |
||||
|
||||
constraints = Constraints(state.world_state.constraints[:-1]) |
||||
constraints.append(Not(state.world_state.constraints[-1])) |
||||
|
||||
try: |
||||
self.results[addr] = get_transaction_sequence(state, constraints) |
||||
except UnsatError: |
||||
self.results[addr] = None |
||||
elif annotation.trace != self.trace[: len(annotation.trace)]: |
||||
continue |
||||
self.check_completion_criterion() |
||||
return state |
||||
raise StopIteration |
Loading…
Reference in new issue