From 056e105beef3a1bd37b0db57bcbacbac908a20c2 Mon Sep 17 00:00:00 2001 From: Joran Honig Date: Fri, 5 Apr 2019 11:19:38 +0200 Subject: [PATCH] refactor state change after external call module removed call issue class as this didn't add any extra features over annotations removed dependency on get_call_from_state removed dependency on ops moved issue building logic to the annotation consider different call opcodes for state changes and external calls use bitvecs and smt solving for ether transfer check --- .../modules/state_change_external_calls.py | 204 ++++++++---------- 1 file changed, 84 insertions(+), 120 deletions(-) diff --git a/mythril/analysis/modules/state_change_external_calls.py b/mythril/analysis/modules/state_change_external_calls.py index 53849587..67b120ca 100644 --- a/mythril/analysis/modules/state_change_external_calls.py +++ b/mythril/analysis/modules/state_change_external_calls.py @@ -1,14 +1,12 @@ -from mythril.analysis.ops import Call, Variable, VarType from mythril.analysis.swc_data import REENTRANCY from mythril.analysis.modules.base import DetectionModule from mythril.analysis.report import Issue -from mythril.analysis.call_helpers import get_call_from_state -from mythril.laser.smt import symbol_factory, simplify, UGT +from mythril.laser.smt import symbol_factory, UGT, BitVec from mythril.laser.ethereum.state.global_state import GlobalState from mythril.laser.ethereum.state.annotation import StateAnnotation from mythril.analysis import solver from mythril.exceptions import UnsatError -from typing import List, cast +from typing import List, cast, Optional from copy import copy import logging @@ -21,25 +19,47 @@ Check whether there is a state change of the contract after the execution of an """ -class CallIssue: - """ This class is a struct of - call: the Call struct - user_defined_address: Whether the address can be defined by user or not - """ - - def __init__(self, call: Call, user_defined_address: bool) -> None: - self.call = call +class StateChangeCallsAnnotation(StateAnnotation): + def __init__(self, call_state: GlobalState, user_defined_address: bool) -> None: + self.call_state = call_state + self.state_change_states = [] # type: List[GlobalState] self.user_defined_address = user_defined_address + def __copy__(self): + new_annotation = StateChangeCallsAnnotation( + self.call_state, self.user_defined_address + ) + new_annotation.state_change_states = self.state_change_states[:] + return new_annotation -class StateChangeCallsAnnotation(StateAnnotation): - def __init__(self) -> None: - self.calls = [] # type: List[CallIssue] + def get_issue(self) -> Optional[Issue]: + if not self.state_change_states: + return None - def __copy__(self): - result = StateChangeCallsAnnotation() - result.calls = copy(self.calls) - return result + severity = "Medium" if self.user_defined_address else "Low" + address = self.call_state.get_current_instruction()["address"] + logging.debug( + "[EXTERNAL_CALLS] Detected state changes at addresses: {}".format(address) + ) + description_head = ( + "The contract account state is changed after an external call. " + ) + description_tail = ( + "Consider that the called contract could re-enter the function before this " + "state change takes place. This can lead to business logic vulnerabilities." + ) + + return Issue( + contract=self.call_state.environment.active_account.contract_name, + function_name=self.call_state.environment.active_function_name, + address=address, + title="State change after external call", + severity=severity, + description_head=description_head, + description_tail=description_tail, + swc_id=REENTRANCY, + bytecode=self.call_state.environment.code.bytecode, + ) class StateChange(DetectionModule): @@ -64,25 +84,15 @@ class StateChange(DetectionModule): ) def execute(self, state: GlobalState): - """ - - :param state: - :return: - """ self._issues.extend(self._analyze_state(state)) return self.issues @staticmethod - def _add_external_call( - state: GlobalState, annotations: List[StateChangeCallsAnnotation] - ) -> None: - call = get_call_from_state(state) - gas = state.mstate.stack[-1] - to = state.mstate.stack[-2] - if call is None: - return + def _add_external_call(global_state: GlobalState) -> None: + gas = global_state.mstate.stack[-1] + to = global_state.mstate.stack[-2] try: - constraints = copy(state.mstate.constraints) + constraints = copy(global_state.mstate.constraints) solver.get_model( constraints + [UGT(gas, symbol_factory.BitVecVal(2300, 256))] ) @@ -91,111 +101,65 @@ class StateChange(DetectionModule): try: constraints += [to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF] solver.get_model(constraints) - annotations[0].calls.append( - CallIssue(call=call, user_defined_address=True) - ) - except UnsatError: - annotations[0].calls.append( - CallIssue(call=call, user_defined_address=False) - ) + global_state.annotate(StateChangeCallsAnnotation(global_state, True)) + except UnsatError: + global_state.annotate(StateChangeCallsAnnotation(global_state, False)) except UnsatError: pass @staticmethod - def _analyze_state(state: GlobalState) -> List[Issue]: - """ + def _analyze_state(global_state: GlobalState) -> List[Issue]: - :param state: - :return: - """ - address = state.get_current_instruction()["address"] annotations = cast( List[StateChangeCallsAnnotation], - list(state.get_annotations(StateChangeCallsAnnotation)), + list(global_state.get_annotations(StateChangeCallsAnnotation)), ) - opcode = state.get_current_instruction()["opcode"] + op_code = global_state.get_current_instruction()["opcode"] if len(annotations) == 0: - if opcode in ("SSTORE", "CREATE", "CREATE2"): + if op_code in ("SSTORE", "CREATE", "CREATE2"): return [] - log.debug("Creating annotation for state") - state.annotate(StateChangeCallsAnnotation()) - annotations = cast( - List[StateChangeCallsAnnotation], - list(state.get_annotations(StateChangeCallsAnnotation)), - ) - - if opcode in ("SSTORE", "CREATE", "CREATE2"): - return StateChange._handle_state_change( - state, address=address, annotation=annotations[0] - ) - call = get_call_from_state(state) - - if call is None: - return [] - - if StateChange._balance_change(call.value): - return StateChange._handle_state_change( - state, address=address, annotation=annotations[0] - ) - - if opcode == "CALL": - StateChange._add_external_call(state, annotations=annotations) - - return [] - - @staticmethod - def _get_state_change_issues( - callissues: List[CallIssue], state: GlobalState, address: int - ) -> List[Issue]: - issues = [] - for callissue in callissues: - severity = "Medium" if callissue.user_defined_address else "Low" - call = callissue.call - logging.debug( - "[EXTERNAL_CALLS] Detected state changes at addresses: {}".format( - address - ) - ) - description_head = ( - "The contract account state is changed after an external call. " - ) - description_tail = ( - "Consider that the called contract could re-enter the function before this " - "state change takes place. This can lead to business logic vulnerabilities." - ) - - issue = Issue( - contract=call.state.environment.active_account.contract_name, - function_name=call.state.environment.active_function_name, - address=address, - title="State change after external call", - severity=severity, - description_head=description_head, - description_tail=description_tail, - swc_id=REENTRANCY, - bytecode=state.environment.code.bytecode, - ) - issues.append(issue) - return issues - - @staticmethod - def _handle_state_change( - state: GlobalState, address: int, annotation: StateChangeCallsAnnotation - ) -> List[Issue]: - calls = annotation.calls - issues = StateChange._get_state_change_issues(calls, state, address) - return issues + if op_code in ("SSTORE", "CREATE", "CREATE2"): + for annotation in annotations: + annotation.state_change_states.append(global_state) + + # Record state changes following from a transfer of ether + if op_code in ("CALL", "DELEGATECALL", "CALLCODE"): + value: BitVec = global_state.mstate.stack[-3] + if StateChange._balance_change(value, global_state): + for annotation in annotations: + annotation.state_change_states.append(global_state) + + # Record external calls + if op_code in ("CALL", "DELEGATECALL", "CALLCODE"): + StateChange._add_external_call(global_state) + + # Check for vulnerabilities + vulnerabilities = [] + for annotation in annotations: + if not annotation.state_change_states: + continue + vulnerabilities.append(annotation.get_issue()) + global_state.annotations.remove(annotation) + return vulnerabilities @staticmethod - def _balance_change(value: BitVec) -> bool: + def _balance_change(value: BitVec, global_state: GlobalState) -> bool: if not value.symbolic: assert value.value is not None return value.value > 0 + else: - zero = symbol_factory.BitVecVal(0, 256) - return simplify(value > zero) + constraints = copy(global_state.mstate.constraints) + + try: + solver.get_model( + constraints + [value > symbol_factory.BitVecVal(0, 256)] + ) + return True + except UnsatError: + return False detector = StateChange()