mirror of https://github.com/ConsenSys/mythril
parent
989d46eb52
commit
17ca323535
@ -1,198 +0,0 @@ |
||||
"""This module contains the detection code to find the existence of transaction |
||||
order dependence.""" |
||||
import copy |
||||
import logging |
||||
import re |
||||
|
||||
from mythril.analysis import solver |
||||
from mythril.analysis.modules.base import DetectionModule |
||||
from mythril.analysis.ops import * |
||||
from mythril.analysis.report import Issue |
||||
from mythril.analysis.swc_data import TX_ORDER_DEPENDENCE |
||||
from mythril.exceptions import UnsatError |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
|
||||
class TxOrderDependenceModule(DetectionModule): |
||||
"""This module finds the existence of transaction order dependence.""" |
||||
|
||||
def __init__(self): |
||||
super().__init__( |
||||
name="Transaction Order Dependence", |
||||
swc_id=TX_ORDER_DEPENDENCE, |
||||
description=( |
||||
"This module finds the existance of transaction order dependence " |
||||
"vulnerabilities. The following webpage contains an extensive description " |
||||
"of the vulnerability: " |
||||
"https://consensys.github.io/smart-contract-best-practices/known_attacks/#transaction-ordering-dependence-tod-front-running" |
||||
), |
||||
) |
||||
|
||||
def execute(self, statespace): |
||||
"""Executes the analysis module. |
||||
|
||||
:param statespace: |
||||
:return: |
||||
""" |
||||
log.debug("Executing module: TOD") |
||||
|
||||
issues = [] |
||||
|
||||
for call in statespace.calls: |
||||
# Do analysis |
||||
interesting_storages = list(self._get_influencing_storages(call)) |
||||
changing_sstores = list( |
||||
self._get_influencing_sstores(statespace, interesting_storages) |
||||
) |
||||
|
||||
description_tail = ( |
||||
"A transaction order dependence vulnerability may exist in this contract. The value or " |
||||
"target of the call statement is loaded from a writable storage location." |
||||
) |
||||
|
||||
# Build issue if necessary |
||||
if len(changing_sstores) > 0: |
||||
node = call.node |
||||
instruction = call.state.get_current_instruction() |
||||
issue = Issue( |
||||
contract=node.contract_name, |
||||
function_name=node.function_name, |
||||
address=instruction["address"], |
||||
title="Transaction Order Dependence", |
||||
bytecode=call.state.environment.code.bytecode, |
||||
swc_id=TX_ORDER_DEPENDENCE, |
||||
severity="Medium", |
||||
description_head="The call outcome may depend on transaction order.", |
||||
description_tail=description_tail, |
||||
gas_used=( |
||||
call.state.mstate.min_gas_used, |
||||
call.state.mstate.max_gas_used, |
||||
), |
||||
) |
||||
|
||||
issues.append(issue) |
||||
|
||||
return issues |
||||
|
||||
# TODO: move to __init__ or util module |
||||
@staticmethod |
||||
def _get_states_with_opcode(statespace, opcode): |
||||
"""Gets all (state, node) tuples in statespace with opcode. |
||||
|
||||
:param statespace: |
||||
:param opcode: |
||||
""" |
||||
for k in statespace.nodes: |
||||
node = statespace.nodes[k] |
||||
for state in node.states: |
||||
if state.get_current_instruction()["opcode"] == opcode: |
||||
yield state, node |
||||
|
||||
@staticmethod |
||||
def _dependent_on_storage(expression): |
||||
"""Checks if expression is dependent on a storage symbol and returns |
||||
the influencing storages. |
||||
|
||||
:param expression: |
||||
:return: |
||||
""" |
||||
pattern = re.compile(r"storage_[a-z0-9_&^]*[0-9]+") |
||||
return pattern.findall(str(simplify(expression))) |
||||
|
||||
@staticmethod |
||||
def _get_storage_variable(storage, state): |
||||
"""Get storage z3 object given storage name and the state. |
||||
|
||||
:param storage: storage name example: storage_0 |
||||
:param state: state to retrieve the variable from |
||||
:return: z3 object representing storage |
||||
""" |
||||
index = int(re.search("[0-9]+", storage).group()) |
||||
try: |
||||
return state.environment.active_account.storage[index] |
||||
except KeyError: |
||||
return None |
||||
|
||||
def _can_change(self, constraints, variable): |
||||
"""Checks if the variable can change given some constraints. |
||||
|
||||
:param constraints: |
||||
:param variable: |
||||
:return: |
||||
""" |
||||
_constraints = copy.deepcopy(constraints) |
||||
try: |
||||
model = solver.get_model(_constraints) |
||||
except UnsatError: |
||||
return False |
||||
try: |
||||
initial_value = int(str(model.eval(variable, model_completion=True))) |
||||
return ( |
||||
self._try_constraints(constraints, [variable != initial_value]) |
||||
is not None |
||||
) |
||||
except AttributeError: |
||||
return False |
||||
|
||||
def _get_influencing_storages(self, call): |
||||
"""Examines a Call object and returns an iterator of all storages that |
||||
influence the call value or direction. |
||||
|
||||
:param call: |
||||
""" |
||||
state = call.state |
||||
node = call.node |
||||
|
||||
# Get relevant storages |
||||
to, value = call.to, call.value |
||||
storages = [] |
||||
if to.type == VarType.SYMBOLIC: |
||||
storages += self._dependent_on_storage(to.val) |
||||
if value.type == VarType.SYMBOLIC: |
||||
storages += self._dependent_on_storage(value.val) |
||||
|
||||
# See if they can change within the constraints of the node |
||||
for storage in storages: |
||||
variable = self._get_storage_variable(storage, state) |
||||
can_change = self._can_change(node.constraints, variable) |
||||
if can_change: |
||||
yield storage |
||||
|
||||
def _get_influencing_sstores(self, statespace, interesting_storages): |
||||
"""Gets sstore (state, node) tuples that write to interesting_storages. |
||||
|
||||
:param statespace: |
||||
:param interesting_storages: |
||||
""" |
||||
for sstore_state, node in self._get_states_with_opcode(statespace, "SSTORE"): |
||||
index, value = sstore_state.mstate.stack[-1], sstore_state.mstate.stack[-2] |
||||
try: |
||||
index = util.get_concrete_int(index) |
||||
except TypeError: |
||||
index = str(index) |
||||
if "storage_{}".format(index) not in interesting_storages: |
||||
continue |
||||
|
||||
yield sstore_state, node |
||||
|
||||
# TODO: remove |
||||
@staticmethod |
||||
def _try_constraints(constraints, new_constraints): |
||||
"""Tries new constraints. |
||||
|
||||
:param constraints: |
||||
:param new_constraints: |
||||
:return Model if satisfiable otherwise None |
||||
""" |
||||
_constraints = copy.deepcopy(constraints) |
||||
for constraint in new_constraints: |
||||
_constraints.append(copy.deepcopy(constraint)) |
||||
try: |
||||
model = solver.get_model(_constraints) |
||||
return model |
||||
except UnsatError: |
||||
return None |
||||
|
||||
|
||||
detector = TxOrderDependenceModule() |
Loading…
Reference in new issue