From 7378a3bc87009a302939bdd5b47997beb44e3296 Mon Sep 17 00:00:00 2001 From: Joran Honig Date: Thu, 28 Mar 2019 13:48:35 +0100 Subject: [PATCH 1/3] removes obsolete taint module --- mythril/laser/ethereum/taint_analysis.py | 454 ----------------------- 1 file changed, 454 deletions(-) delete mode 100644 mythril/laser/ethereum/taint_analysis.py diff --git a/mythril/laser/ethereum/taint_analysis.py b/mythril/laser/ethereum/taint_analysis.py deleted file mode 100644 index daf82c31..00000000 --- a/mythril/laser/ethereum/taint_analysis.py +++ /dev/null @@ -1,454 +0,0 @@ -"""This module implements classes needed to perform taint analysis.""" -import copy -import logging -from typing import List, Tuple, Union - -import mythril.laser.ethereum.util as helper -from mythril.analysis.symbolic import SymExecWrapper -from mythril.laser.ethereum.cfg import JumpType, Node -from mythril.laser.ethereum.state.environment import Environment -from mythril.laser.ethereum.state.global_state import GlobalState -from mythril.laser.smt import Expression - -log = logging.getLogger(__name__) - - -class TaintRecord: - """TaintRecord contains tainting information for a specific (state, node) - the information specifies the taint status before executing the operation - belonging to the state.""" - - def __init__(self): - """Builds a taint record.""" - self.stack = [] - self.memory = {} - self.storage = {} - self.states = [] - - def stack_tainted(self, index: int) -> Union[bool, None]: - """Returns taint value of stack element at index. - - :param index: - :return: - """ - if index < len(self.stack): - return self.stack[index] - return None - - def memory_tainted(self, index: int) -> bool: - """Returns taint value of memory element at index. - - :param index: - :return: - """ - if index in self.memory.keys(): - return self.memory[index] - return False - - def storage_tainted(self, index: int) -> bool: - """Returns taint value of storage element at index. - - :param index: - :return: - """ - if index in self.storage.keys(): - return self.storage[index] - return False - - def add_state(self, state: GlobalState) -> None: - """Adds state with this taint record. - - :param state: - """ - self.states.append(state) - - def clone(self) -> "TaintRecord": - """Clones this record. - - :return: - """ - clone = TaintRecord() - clone.stack = copy.deepcopy(self.stack) - clone.memory = copy.deepcopy(self.memory) - clone.storage = copy.deepcopy(self.storage) - return clone - - -class TaintResult: - """Taint analysis result obtained after having ran the taint runner.""" - - def __init__(self): - """Create a new tains result.""" - self.records = [] - - def check(self, state: GlobalState, stack_index: int) -> Union[bool, None]: - """Checks if stack variable is tainted, before executing the - instruction. - - :param state: state to check variable in - :param stack_index: index of stack variable - :return: tainted - """ - record = self._try_get_record(state) - if record is None: - return None - return record.stack_tainted(stack_index) - - def add_records(self, records: List[TaintRecord]) -> None: - """Adds records to this taint result. - - :param records: - """ - self.records += records - - def _try_get_record(self, state: GlobalState) -> Union[TaintRecord, None]: - """Finds record belonging to the state. - - :param state: - :return: - """ - for record in self.records: - if state in record.states: - return record - return None - - -class TaintRunner: - """Taint runner, is able to run taint analysis on symbolic execution - result.""" - - @staticmethod - def execute( - statespace: SymExecWrapper, node: Node, state: GlobalState, initial_stack=None - ) -> TaintResult: - """Runs taint analysis on the statespace. - - :param initial_stack: - :param statespace: symbolic statespace to run taint analysis on - :param node: taint introduction node - :param state: taint introduction state - :return: TaintResult object containing analysis results - """ - if initial_stack is None: - initial_stack = [] - result = TaintResult() - transaction_stack_length = len(node.states[0].transaction_stack) - # Build initial current_node - init_record = TaintRecord() - init_record.stack = initial_stack - - state_index = node.states.index(state) - - # List of (Node, TaintRecord, index) - current_nodes = [(node, init_record, state_index)] - environment = node.states[0].environment - - for node, record, index in current_nodes: - records = TaintRunner.execute_node(node, record, index) - - result.add_records(records) - if len(records) == 0: # continue if there is no record to work on - continue - children = TaintRunner.children( - node, statespace, environment, transaction_stack_length - ) - for child in children: - current_nodes.append((child, records[-1], 0)) - return result - - @staticmethod - def children( - node: Node, - statespace: SymExecWrapper, - environment: Environment, - transaction_stack_length: int, - ) -> List[Node]: - """ - - :param node: - :param statespace: - :param environment: - :param transaction_stack_length: - :return: - """ - direct_children = [ - statespace.nodes[edge.node_to] - for edge in statespace.edges - if edge.node_from == node.uid and edge.type != JumpType.Transaction - ] - children = [] - for child in direct_children: - if all( - len(state.transaction_stack) == transaction_stack_length - for state in child.states - ): - children.append(child) - elif all( - len(state.transaction_stack) > transaction_stack_length - for state in child.states - ): - children += TaintRunner.children( - child, statespace, environment, transaction_stack_length - ) - return children - - @staticmethod - def execute_node( - node: Node, last_record: TaintRecord, state_index=0 - ) -> List[TaintRecord]: - """Runs taint analysis on a given node. - - :param node: node to analyse - :param last_record: last taint record to work from - :param state_index: state index to start from - :return: List of taint records linked to the states in this node - """ - records = [last_record] - for index in range(state_index, len(node.states)): - current_state = node.states[index] - records.append(TaintRunner.execute_state(records[-1], current_state)) - return records[1:] - - @staticmethod - def execute_state(record: TaintRecord, state: GlobalState) -> TaintRecord: - """ - - :param record: - :param state: - :return: - """ - assert len(state.mstate.stack) == len(record.stack) - """ Runs taint analysis on a state """ - record.add_state(state) - new_record = record.clone() - - # Apply Change - op = state.get_current_instruction()["opcode"] - - if op in TaintRunner.stack_taint_table.keys(): - mutator = TaintRunner.stack_taint_table[op] - TaintRunner.mutate_stack(new_record, mutator) - elif op.startswith("PUSH"): - TaintRunner.mutate_push(op, new_record) - elif op.startswith("DUP"): - TaintRunner.mutate_dup(op, new_record) - elif op.startswith("SWAP"): - TaintRunner.mutate_swap(op, new_record) - elif op is "MLOAD": - TaintRunner.mutate_mload(new_record, state.mstate.stack[-1]) - elif op.startswith("MSTORE"): - TaintRunner.mutate_mstore(new_record, state.mstate.stack[-1]) - elif op is "SLOAD": - TaintRunner.mutate_sload(new_record, state.mstate.stack[-1]) - elif op is "SSTORE": - TaintRunner.mutate_sstore(new_record, state.mstate.stack[-1]) - elif op.startswith("LOG"): - TaintRunner.mutate_log(new_record, op) - elif op in ("CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"): - TaintRunner.mutate_call(new_record, op) - else: - log.debug("Unknown operation encountered: {}".format(op)) - - return new_record - - @staticmethod - def mutate_stack(record: TaintRecord, mutator: Tuple[int, int]) -> None: - """ - - :param record: - :param mutator: - """ - pop, push = mutator - - values = [] - for i in range(pop): - values.append(record.stack.pop()) - - taint = any(values) - - for i in range(push): - record.stack.append(taint) - - @staticmethod - def mutate_push(op: str, record: TaintRecord) -> None: - """ - - :param op: - :param record: - """ - TaintRunner.mutate_stack(record, (0, 1)) - - @staticmethod - def mutate_dup(op: str, record: TaintRecord) -> None: - """ - - :param op: - :param record: - """ - depth = int(op[3:]) - index = len(record.stack) - depth - record.stack.append(record.stack[index]) - - @staticmethod - def mutate_swap(op: str, record: TaintRecord) -> None: - """ - - :param op: - :param record: - """ - depth = int(op[4:]) - l = len(record.stack) - 1 - i = l - depth - record.stack[l], record.stack[i] = record.stack[i], record.stack[l] - - @staticmethod - def mutate_mload(record: TaintRecord, op0: Expression) -> None: - """ - - :param record: - :param op0: - :return: - """ - _ = record.stack.pop() - try: - index = helper.get_concrete_int(op0) - except TypeError: - log.debug("Can't MLOAD taint track symbolically") - record.stack.append(False) - return - - record.stack.append(record.memory_tainted(index)) - - @staticmethod - def mutate_mstore(record: TaintRecord, op0: Expression) -> None: - """ - - :param record: - :param op0: - :return: - """ - _, value_taint = record.stack.pop(), record.stack.pop() - try: - index = helper.get_concrete_int(op0) - except TypeError: - log.debug("Can't mstore taint track symbolically") - return - - record.memory[index] = value_taint - - @staticmethod - def mutate_sload(record: TaintRecord, op0: Expression) -> None: - """ - - :param record: - :param op0: - :return: - """ - _ = record.stack.pop() - try: - index = helper.get_concrete_int(op0) - except TypeError: - log.debug("Can't MLOAD taint track symbolically") - record.stack.append(False) - return - - record.stack.append(record.storage_tainted(index)) - - @staticmethod - def mutate_sstore(record: TaintRecord, op0: Expression) -> None: - """ - - :param record: - :param op0: - :return: - """ - _, value_taint = record.stack.pop(), record.stack.pop() - try: - index = helper.get_concrete_int(op0) - except TypeError: - log.debug("Can't mstore taint track symbolically") - return - - record.storage[index] = value_taint - - @staticmethod - def mutate_log(record: TaintRecord, op: str) -> None: - """ - - :param record: - :param op: - """ - depth = int(op[3:]) - for _ in range(depth + 2): - record.stack.pop() - - @staticmethod - def mutate_call(record: TaintRecord, op: str) -> None: - """ - - :param record: - :param op: - """ - pops = 6 - if op in ("CALL", "CALLCODE"): - pops += 1 - for _ in range(pops): - record.stack.pop() - - record.stack.append(False) - - stack_taint_table = { - # instruction: (taint source, taint target) - "POP": (1, 0), - "ADD": (2, 1), - "MUL": (2, 1), - "SUB": (2, 1), - "AND": (2, 1), - "OR": (2, 1), - "XOR": (2, 1), - "NOT": (1, 1), - "BYTE": (2, 1), - "DIV": (2, 1), - "MOD": (2, 1), - "SDIV": (2, 1), - "SMOD": (2, 1), - "ADDMOD": (3, 1), - "MULMOD": (3, 1), - "EXP": (2, 1), - "SIGNEXTEND": (2, 1), - "LT": (2, 1), - "GT": (2, 1), - "SLT": (2, 1), - "SGT": (2, 1), - "EQ": (2, 1), - "ISZERO": (1, 1), - "CALLVALUE": (0, 1), - "CALLDATALOAD": (1, 1), - "CALLDATACOPY": (3, 0), # todo - "CALLDATASIZE": (0, 1), - "ADDRESS": (0, 1), - "BALANCE": (1, 1), - "ORIGIN": (0, 1), - "CALLER": (0, 1), - "CODESIZE": (0, 1), - "SHA3": (2, 1), - "GASPRICE": (0, 1), - "CODECOPY": (3, 0), - "EXTCODESIZE": (1, 1), - "EXTCODECOPY": (4, 0), - "RETURNDATASIZE": (0, 1), - "BLOCKHASH": (1, 1), - "COINBASE": (0, 1), - "TIMESTAMP": (0, 1), - "NUMBER": (0, 1), - "DIFFICULTY": (0, 1), - "GASLIMIT": (0, 1), - "JUMP": (1, 0), - "JUMPI": (2, 0), - "PC": (0, 1), - "MSIZE": (0, 1), - "GAS": (0, 1), - "CREATE": (3, 1), - "CREATE2": (4, 1), - "RETURN": (2, 0), - } From 135a59a0881c194397e00eb4bc1e7dca810d3677 Mon Sep 17 00:00:00 2001 From: Joran Honig Date: Thu, 28 Mar 2019 13:48:49 +0100 Subject: [PATCH 2/3] remove tests related to obsolete taint module --- tests/taint_mutate_stack_test.py | 30 ---------- tests/taint_record_test.py | 36 ------------ tests/taint_result_test.py | 35 ----------- tests/taint_runner_test.py | 99 -------------------------------- 4 files changed, 200 deletions(-) delete mode 100644 tests/taint_mutate_stack_test.py delete mode 100644 tests/taint_record_test.py delete mode 100644 tests/taint_result_test.py delete mode 100644 tests/taint_runner_test.py diff --git a/tests/taint_mutate_stack_test.py b/tests/taint_mutate_stack_test.py deleted file mode 100644 index 6414340b..00000000 --- a/tests/taint_mutate_stack_test.py +++ /dev/null @@ -1,30 +0,0 @@ -from mythril.laser.ethereum.taint_analysis import * - - -def test_mutate_not_tainted(): - # Arrange - record = TaintRecord() - - record.stack = [True, False, False] - # Act - TaintRunner.mutate_stack(record, (2, 1)) - - # Assert - assert record.stack_tainted(0) - assert record.stack_tainted(1) is False - assert record.stack == [True, False] - - -def test_mutate_tainted(): - # Arrange - record = TaintRecord() - - record.stack = [True, False, True] - - # Act - TaintRunner.mutate_stack(record, (2, 1)) - - # Assert - assert record.stack_tainted(0) - assert record.stack_tainted(1) - assert record.stack == [True, True] diff --git a/tests/taint_record_test.py b/tests/taint_record_test.py deleted file mode 100644 index ba45f295..00000000 --- a/tests/taint_record_test.py +++ /dev/null @@ -1,36 +0,0 @@ -from mythril.laser.ethereum.taint_analysis import * - - -def test_record_tainted_check(): - # arrange - record = TaintRecord() - record.stack = [True, False, True] - - # act - tainted = record.stack_tainted(2) - - # assert - assert tainted is True - - -def test_record_untainted_check(): - # arrange - record = TaintRecord() - record.stack = [True, False, False] - - # act - tainted = record.stack_tainted(2) - - # assert - assert tainted is False - - -def test_record_untouched_check(): - # arrange - record = TaintRecord() - - # act - tainted = record.stack_tainted(3) - - # assert - assert tainted is None diff --git a/tests/taint_result_test.py b/tests/taint_result_test.py deleted file mode 100644 index 6670f228..00000000 --- a/tests/taint_result_test.py +++ /dev/null @@ -1,35 +0,0 @@ -from mythril.laser.ethereum.taint_analysis import * -from mythril.laser.ethereum.state.global_state import GlobalState - - -def test_result_state(): - # arrange - taint_result = TaintResult() - record = TaintRecord() - state = GlobalState(2, None, None) - state.mstate.stack = [1, 2, 3] - record.add_state(state) - record.stack = [False, False, False] - # act - taint_result.add_records([record]) - tainted = taint_result.check(state, 2) - - # assert - assert tainted is False - assert record in taint_result.records - - -def test_result_no_state(): - # arrange - taint_result = TaintResult() - record = TaintRecord() - state = GlobalState(2, None, None) - state.mstate.stack = [1, 2, 3] - - # act - taint_result.add_records([record]) - tainted = taint_result.check(state, 2) - - # assert - assert tainted is None - assert record in taint_result.records diff --git a/tests/taint_runner_test.py b/tests/taint_runner_test.py deleted file mode 100644 index a4f40bbc..00000000 --- a/tests/taint_runner_test.py +++ /dev/null @@ -1,99 +0,0 @@ -import mock -import pytest -from pytest_mock import mocker -from mythril.laser.ethereum.taint_analysis import * -from mythril.laser.ethereum.cfg import Node, Edge -from mythril.laser.ethereum.state.account import Account -from mythril.laser.ethereum.state.environment import Environment -from mythril.laser.ethereum.state.machine_state import MachineState -from mythril.laser.ethereum.state.global_state import GlobalState -from mythril.laser.ethereum.svm import LaserEVM - - -def test_execute_state(mocker): - record = TaintRecord() - record.stack = [True, False, True] - - state = GlobalState(None, None, None) - state.mstate.stack = [1, 2, 3] - mocker.patch.object(state, "get_current_instruction") - state.get_current_instruction.return_value = {"opcode": "ADD"} - - # Act - new_record = TaintRunner.execute_state(record, state) - - # Assert - assert new_record.stack == [True, True] - assert record.stack == [True, False, True] - - -def test_execute_node(mocker): - record = TaintRecord() - record.stack = [True, True, False, False] - - state_1 = GlobalState(None, None, None) - state_1.mstate.stack = [1, 2, 3, 1] - state_1.mstate.pc = 1 - mocker.patch.object(state_1, "get_current_instruction") - state_1.get_current_instruction.return_value = {"opcode": "SWAP1"} - - state_2 = GlobalState(None, 1, None) - state_2.mstate.stack = [1, 2, 4, 1] - mocker.patch.object(state_2, "get_current_instruction") - state_2.get_current_instruction.return_value = {"opcode": "ADD"} - - node = Node("Test contract") - node.states = [state_1, state_2] - - # Act - records = TaintRunner.execute_node(node, record) - - # Assert - assert len(records) == 2 - - assert records[0].stack == [True, True, False, False] - assert records[1].stack == [True, True, False] - - assert state_2 in records[0].states - assert state_1 in record.states - - -def test_execute(mocker): - active_account = Account("0x00") - environment = Environment(active_account, None, None, None, None, None) - state_1 = GlobalState(None, environment, None, MachineState(gas_limit=8000000)) - state_1.mstate.stack = [1, 2] - mocker.patch.object(state_1, "get_current_instruction") - state_1.get_current_instruction.return_value = {"opcode": "PUSH"} - - state_2 = GlobalState(None, environment, None, MachineState(gas_limit=8000000)) - state_2.mstate.stack = [1, 2, 3] - mocker.patch.object(state_2, "get_current_instruction") - state_2.get_current_instruction.return_value = {"opcode": "ADD"} - - node_1 = Node("Test contract") - node_1.states = [state_1, state_2] - - state_3 = GlobalState(None, environment, None, MachineState(gas_limit=8000000)) - state_3.mstate.stack = [1, 2] - mocker.patch.object(state_3, "get_current_instruction") - state_3.get_current_instruction.return_value = {"opcode": "ADD"} - - node_2 = Node("Test contract") - node_2.states = [state_3] - - edge = Edge(node_1.uid, node_2.uid) - - statespace = LaserEVM(None) - statespace.edges = [edge] - statespace.nodes[node_1.uid] = node_1 - statespace.nodes[node_2.uid] = node_2 - - # Act - result = TaintRunner.execute(statespace, node_1, state_1, [True, True]) - - # Assert - print(result) - assert len(result.records) == 3 - assert result.records[2].states == [] - assert state_3 in result.records[1].states From 17ca323535461484b10e6c254362ac31e513ff7a Mon Sep 17 00:00:00 2001 From: Joran Honig Date: Thu, 28 Mar 2019 14:14:22 +0100 Subject: [PATCH 3/3] remove old tod module --- .../modules/transaction_order_dependence.py | 198 ------------------ 1 file changed, 198 deletions(-) delete mode 100644 mythril/analysis/modules/transaction_order_dependence.py diff --git a/mythril/analysis/modules/transaction_order_dependence.py b/mythril/analysis/modules/transaction_order_dependence.py deleted file mode 100644 index 4759d751..00000000 --- a/mythril/analysis/modules/transaction_order_dependence.py +++ /dev/null @@ -1,198 +0,0 @@ -"""This module contains the detection code to find the existence of transaction -order dependence.""" -import copy -import logging -import re - -from mythril.analysis import solver -from mythril.analysis.modules.base import DetectionModule -from mythril.analysis.ops import * -from mythril.analysis.report import Issue -from mythril.analysis.swc_data import TX_ORDER_DEPENDENCE -from mythril.exceptions import UnsatError - -log = logging.getLogger(__name__) - - -class TxOrderDependenceModule(DetectionModule): - """This module finds the existence of transaction order dependence.""" - - def __init__(self): - super().__init__( - name="Transaction Order Dependence", - swc_id=TX_ORDER_DEPENDENCE, - description=( - "This module finds the existance of transaction order dependence " - "vulnerabilities. The following webpage contains an extensive description " - "of the vulnerability: " - "https://consensys.github.io/smart-contract-best-practices/known_attacks/#transaction-ordering-dependence-tod-front-running" - ), - ) - - def execute(self, statespace): - """Executes the analysis module. - - :param statespace: - :return: - """ - log.debug("Executing module: TOD") - - issues = [] - - for call in statespace.calls: - # Do analysis - interesting_storages = list(self._get_influencing_storages(call)) - changing_sstores = list( - self._get_influencing_sstores(statespace, interesting_storages) - ) - - description_tail = ( - "A transaction order dependence vulnerability may exist in this contract. The value or " - "target of the call statement is loaded from a writable storage location." - ) - - # Build issue if necessary - if len(changing_sstores) > 0: - node = call.node - instruction = call.state.get_current_instruction() - issue = Issue( - contract=node.contract_name, - function_name=node.function_name, - address=instruction["address"], - title="Transaction Order Dependence", - bytecode=call.state.environment.code.bytecode, - swc_id=TX_ORDER_DEPENDENCE, - severity="Medium", - description_head="The call outcome may depend on transaction order.", - description_tail=description_tail, - gas_used=( - call.state.mstate.min_gas_used, - call.state.mstate.max_gas_used, - ), - ) - - issues.append(issue) - - return issues - - # TODO: move to __init__ or util module - @staticmethod - def _get_states_with_opcode(statespace, opcode): - """Gets all (state, node) tuples in statespace with opcode. - - :param statespace: - :param opcode: - """ - for k in statespace.nodes: - node = statespace.nodes[k] - for state in node.states: - if state.get_current_instruction()["opcode"] == opcode: - yield state, node - - @staticmethod - def _dependent_on_storage(expression): - """Checks if expression is dependent on a storage symbol and returns - the influencing storages. - - :param expression: - :return: - """ - pattern = re.compile(r"storage_[a-z0-9_&^]*[0-9]+") - return pattern.findall(str(simplify(expression))) - - @staticmethod - def _get_storage_variable(storage, state): - """Get storage z3 object given storage name and the state. - - :param storage: storage name example: storage_0 - :param state: state to retrieve the variable from - :return: z3 object representing storage - """ - index = int(re.search("[0-9]+", storage).group()) - try: - return state.environment.active_account.storage[index] - except KeyError: - return None - - def _can_change(self, constraints, variable): - """Checks if the variable can change given some constraints. - - :param constraints: - :param variable: - :return: - """ - _constraints = copy.deepcopy(constraints) - try: - model = solver.get_model(_constraints) - except UnsatError: - return False - try: - initial_value = int(str(model.eval(variable, model_completion=True))) - return ( - self._try_constraints(constraints, [variable != initial_value]) - is not None - ) - except AttributeError: - return False - - def _get_influencing_storages(self, call): - """Examines a Call object and returns an iterator of all storages that - influence the call value or direction. - - :param call: - """ - state = call.state - node = call.node - - # Get relevant storages - to, value = call.to, call.value - storages = [] - if to.type == VarType.SYMBOLIC: - storages += self._dependent_on_storage(to.val) - if value.type == VarType.SYMBOLIC: - storages += self._dependent_on_storage(value.val) - - # See if they can change within the constraints of the node - for storage in storages: - variable = self._get_storage_variable(storage, state) - can_change = self._can_change(node.constraints, variable) - if can_change: - yield storage - - def _get_influencing_sstores(self, statespace, interesting_storages): - """Gets sstore (state, node) tuples that write to interesting_storages. - - :param statespace: - :param interesting_storages: - """ - for sstore_state, node in self._get_states_with_opcode(statespace, "SSTORE"): - index, value = sstore_state.mstate.stack[-1], sstore_state.mstate.stack[-2] - try: - index = util.get_concrete_int(index) - except TypeError: - index = str(index) - if "storage_{}".format(index) not in interesting_storages: - continue - - yield sstore_state, node - - # TODO: remove - @staticmethod - def _try_constraints(constraints, new_constraints): - """Tries new constraints. - - :param constraints: - :param new_constraints: - :return Model if satisfiable otherwise None - """ - _constraints = copy.deepcopy(constraints) - for constraint in new_constraints: - _constraints.append(copy.deepcopy(constraint)) - try: - model = solver.get_model(_constraints) - return model - except UnsatError: - return None - - -detector = TxOrderDependenceModule()