From 66149578bc8767d35f6228b2cf5f5102a0f9eafa Mon Sep 17 00:00:00 2001 From: Bernhard Mueller Date: Wed, 28 Nov 2018 11:14:51 +0700 Subject: [PATCH] Refactor external calls module --- mythril/analysis/modules/delegatecall.py | 43 ---- mythril/analysis/modules/external_calls.py | 259 ++++++--------------- 2 files changed, 70 insertions(+), 232 deletions(-) diff --git a/mythril/analysis/modules/delegatecall.py b/mythril/analysis/modules/delegatecall.py index 5098b9c2..7d70e735 100644 --- a/mythril/analysis/modules/delegatecall.py +++ b/mythril/analysis/modules/delegatecall.py @@ -37,9 +37,6 @@ class DelegateCallModule(DetectionModule): if meminstart.type == VarType.CONCRETE: issues += self._concrete_call(call, state, address, meminstart) - if call.to.type == VarType.SYMBOLIC: - issues += self._symbolic_call(call, state, address, statespace) - return issues def _concrete_call(self, call, state, address, meminstart): @@ -68,45 +65,5 @@ class DelegateCallModule(DetectionModule): return [issue] - def _symbolic_call(self, call, state, address, statespace): - issue = Issue( - contract=call.node.contract_name, - function_name=call.node.function_name, - address=address, - swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT, - bytecode=state.environment.code.bytecode, - title=call.type + " to a user-supplied address", - gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), - ) - - if "calldata" in str(call.to): - issue.description = "This contract delegates execution to a contract address obtained from calldata." - - else: - m = re.search(r"storage_([a-z0-9_&^]+)", str(call.to)) - - if m: - idx = m.group(1) - func = statespace.find_storage_write( - state.environment.active_account.address, idx - ) - - if func: - issue.description = ( - "This contract delegates execution to a contract address in storage slot " - + str(idx) - + ". This storage slot can be written to by calling the function `" - + func - + "`." - ) - - else: - logging.debug( - "[DELEGATECALL] No storage writes to index " + str(idx) - ) - - issue.description += " Be aware that the called contract gets unrestricted access to this contract's state." - return [issue] - detector = DelegateCallModule() diff --git a/mythril/analysis/modules/external_calls.py b/mythril/analysis/modules/external_calls.py index 3a2a089b..4e073626 100644 --- a/mythril/analysis/modules/external_calls.py +++ b/mythril/analysis/modules/external_calls.py @@ -4,9 +4,8 @@ from mythril.analysis.report import Issue from mythril.analysis import solver from mythril.analysis.swc_data import REENTRANCY from mythril.analysis.modules.base import DetectionModule -import re +from mythril.exceptions import UnsatError import logging -from mythril.laser.ethereum.cfg import JumpType class ExternalCallModule(DetectionModule): @@ -20,199 +19,81 @@ class ExternalCallModule(DetectionModule): self.max_search_depth = max_search_depth self.calls_visited = [] - def search_children( - self, statespace, node, transaction_id, start_index=0, depth=0, results=None - ): - if results is None: - results = [] - logging.debug("SEARCHING NODE %d", node.uid) - - if depth < self.max_search_depth: - - n_states = len(node.states) - - if n_states > start_index: - - for j in range(start_index, n_states): - if ( - node.states[j].get_current_instruction()["opcode"] == "SSTORE" - and node.states[j].current_transaction.id == transaction_id - ): - results.append( - node.states[j].get_current_instruction()["address"] - ) - children = [] - - for edge in statespace.edges: - if edge.node_from == node.uid and edge.type != JumpType.Transaction: - children.append(statespace.nodes[edge.node_to]) - - if len(children): - for node in children: - results += self.search_children( - statespace, - node, - transaction_id, - depth=depth + 1, - results=results, - ) - - return results - - def execute(self, statespace): - + def execute(self, state_space): + logging.debug("Executing module: %s", self.name) issues = [] - for call in statespace.calls: + for k in state_space.nodes: + node = state_space.nodes[k] + for state in node.states: + issues += self._analyze_state(state, node) - state = call.state - address = state.get_current_instruction()["address"] - - if call.type == "CALL": + return issues - logging.debug( - "[EXTERNAL_CALLS] Call to: %s, value = %s, gas = %s" - % (str(call.to), str(call.value), str(call.gas)) + @staticmethod + def _analyze_state(state, node): + issues = [] + instruction = state.get_current_instruction() + + if instruction["opcode"] not in ("CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"): + return [] + + gas = state.mstate.stack[-1] + to = state.mstate.stack[-2] + + address = state.get_current_instruction()["address"] + + try: + constraints = state.node.constraints + [gas > 2300] + transaction_sequence = solver.get_transaction_sequence(state, constraints) + + try: + constraints += [to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF] + transaction_sequence = solver.get_transaction_sequence(state, constraints) + + except UnsatError: + debug = str(transaction_sequence) + description = "The contract executes a function call to an external address. " \ + "Verify that the code at this address is trusted and immutable." + + issue = Issue( + contract=node.contract_name, + function_name=state.node.function_name, + address=address, + swc_id=REENTRANCY, + title="External call", + _type="Informational", + bytecode=state.environment.code.bytecode, + description=description, + debug=debug, + gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), ) - if ( - call.to.type == VarType.SYMBOLIC - and (call.gas.type == VarType.CONCRETE and call.gas.val > 2300) - or ( - call.gas.type == VarType.SYMBOLIC - and "2300" not in str(call.gas) - ) - ): - - description = "This contract executes a message call to " - - target = str(call.to) - user_supplied = False - - if "calldata" in target or "caller" in target: - - if "calldata" in target: - description += ( - "an address provided as a function argument. " - ) - else: - description += "the address of the transaction sender. " - - user_supplied = True - else: - m = re.search(r"storage_([a-z0-9_&^]+)", str(call.to)) - - if m: - idx = m.group(1) - - func = statespace.find_storage_write( - state.environment.active_account.address, idx - ) - - if func: - - description += ( - "an address found at storage slot " - + str(idx) - + ". " - + "This storage slot can be written to by calling the function `" - + func - + "`. " - ) - user_supplied = True - - if user_supplied: - - description += ( - "Generally, it is not recommended to call user-supplied addresses using Solidity's call() construct. " - "Note that attackers might leverage reentrancy attacks to exploit race conditions or manipulate this contract's state." - ) - - issue = Issue( - contract=call.node.contract_name, - function_name=call.node.function_name, - address=address, - title="Message call to external contract", - _type="Warning", - description=description, - bytecode=state.environment.code.bytecode, - swc_id=REENTRANCY, - gas_used=( - state.mstate.min_gas_used, - state.mstate.max_gas_used, - ), - ) - - else: - - description += "to another contract. Make sure that the called contract is trusted and does not execute user-supplied code." - - issue = Issue( - contract=call.node.contract_name, - function_name=call.node.function_name, - address=address, - title="Message call to external contract", - _type="Informational", - description=description, - bytecode=state.environment.code.bytecode, - swc_id=REENTRANCY, - gas_used=( - state.mstate.min_gas_used, - state.mstate.max_gas_used, - ), - ) - - issues.append(issue) - - if address not in self.calls_visited: - self.calls_visited.append(address) - - logging.debug( - "[EXTERNAL_CALLS] Checking for state changes starting from " - + call.node.function_name - ) - - # Check for SSTORE in remaining instructions in current node & nodes down the CFG - - state_change_addresses = self.search_children( - statespace, - call.node, - call.state.current_transaction.id, - call.state_index + 1, - depth=0, - results=[], - ) - - logging.debug( - "[EXTERNAL_CALLS] Detected state changes at addresses: " - + str(state_change_addresses) - ) - - if len(state_change_addresses): - for address in state_change_addresses: - description = ( - "The contract account state is changed after an external call. " - "Consider that the called contract could re-enter the function before this " - "state change takes place. This can lead to business logic vulnerabilities." - ) - - issue = Issue( - contract=call.node.contract_name, - function_name=call.node.function_name, - address=address, - title="State change after external call", - _type="Warning", - description=description, - bytecode=state.environment.code.bytecode, - swc_id=REENTRANCY, - gas_used=( - state.mstate.min_gas_used, - state.mstate.max_gas_used, - ), - ) - issues.append(issue) - - return issues + issues.append(issue) + return issues + + debug = str(transaction_sequence) + description = "The contract executes a function call with high gas to a user-supplied address. " \ + "Note that the callee can contain arbitrary code and may re-enter any function in this contract. " \ + "Review the business logic carefully to prevent unanticipated effects on the contract state." + + issue = Issue( + contract=node.contract_name, + function_name=node.function_name, + address=address, + swc_id=REENTRANCY, + title="External call to user-supplied address", + _type="Warning", + bytecode=state.environment.code.bytecode, + description=description, + debug=debug, + gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), + ) + issues.append(issue) + return issues + + except UnsatError: + logging.debug("[EXTERNAL_CALLS] no model found for setting caller address.") detector = ExternalCallModule()