Refactor external calls module

pull/757/head
Bernhard Mueller 6 years ago
parent f9519367a6
commit 66149578bc
  1. 43
      mythril/analysis/modules/delegatecall.py
  2. 215
      mythril/analysis/modules/external_calls.py

@ -37,9 +37,6 @@ class DelegateCallModule(DetectionModule):
if meminstart.type == VarType.CONCRETE:
issues += self._concrete_call(call, state, address, meminstart)
if call.to.type == VarType.SYMBOLIC:
issues += self._symbolic_call(call, state, address, statespace)
return issues
def _concrete_call(self, call, state, address, meminstart):
@ -68,45 +65,5 @@ class DelegateCallModule(DetectionModule):
return [issue]
def _symbolic_call(self, call, state, address, statespace):
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
bytecode=state.environment.code.bytecode,
title=call.type + " to a user-supplied address",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
if "calldata" in str(call.to):
issue.description = "This contract delegates execution to a contract address obtained from calldata."
else:
m = re.search(r"storage_([a-z0-9_&^]+)", str(call.to))
if m:
idx = m.group(1)
func = statespace.find_storage_write(
state.environment.active_account.address, idx
)
if func:
issue.description = (
"This contract delegates execution to a contract address in storage slot "
+ str(idx)
+ ". This storage slot can be written to by calling the function `"
+ func
+ "`."
)
else:
logging.debug(
"[DELEGATECALL] No storage writes to index " + str(idx)
)
issue.description += " Be aware that the called contract gets unrestricted access to this contract's state."
return [issue]
detector = DelegateCallModule()

@ -4,9 +4,8 @@ from mythril.analysis.report import Issue
from mythril.analysis import solver
from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule
import re
from mythril.exceptions import UnsatError
import logging
from mythril.laser.ethereum.cfg import JumpType
class ExternalCallModule(DetectionModule):
@ -20,199 +19,81 @@ class ExternalCallModule(DetectionModule):
self.max_search_depth = max_search_depth
self.calls_visited = []
def search_children(
self, statespace, node, transaction_id, start_index=0, depth=0, results=None
):
if results is None:
results = []
logging.debug("SEARCHING NODE %d", node.uid)
if depth < self.max_search_depth:
n_states = len(node.states)
if n_states > start_index:
for j in range(start_index, n_states):
if (
node.states[j].get_current_instruction()["opcode"] == "SSTORE"
and node.states[j].current_transaction.id == transaction_id
):
results.append(
node.states[j].get_current_instruction()["address"]
)
children = []
for edge in statespace.edges:
if edge.node_from == node.uid and edge.type != JumpType.Transaction:
children.append(statespace.nodes[edge.node_to])
if len(children):
for node in children:
results += self.search_children(
statespace,
node,
transaction_id,
depth=depth + 1,
results=results,
)
return results
def execute(self, statespace):
def execute(self, state_space):
logging.debug("Executing module: %s", self.name)
issues = []
for call in statespace.calls:
state = call.state
address = state.get_current_instruction()["address"]
if call.type == "CALL":
logging.debug(
"[EXTERNAL_CALLS] Call to: %s, value = %s, gas = %s"
% (str(call.to), str(call.value), str(call.gas))
)
for k in state_space.nodes:
node = state_space.nodes[k]
for state in node.states:
issues += self._analyze_state(state, node)
if (
call.to.type == VarType.SYMBOLIC
and (call.gas.type == VarType.CONCRETE and call.gas.val > 2300)
or (
call.gas.type == VarType.SYMBOLIC
and "2300" not in str(call.gas)
)
):
description = "This contract executes a message call to "
target = str(call.to)
user_supplied = False
if "calldata" in target or "caller" in target:
if "calldata" in target:
description += (
"an address provided as a function argument. "
)
else:
description += "the address of the transaction sender. "
return issues
user_supplied = True
else:
m = re.search(r"storage_([a-z0-9_&^]+)", str(call.to))
@staticmethod
def _analyze_state(state, node):
issues = []
instruction = state.get_current_instruction()
if m:
idx = m.group(1)
if instruction["opcode"] not in ("CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"):
return []
func = statespace.find_storage_write(
state.environment.active_account.address, idx
)
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
if func:
address = state.get_current_instruction()["address"]
description += (
"an address found at storage slot "
+ str(idx)
+ ". "
+ "This storage slot can be written to by calling the function `"
+ func
+ "`. "
)
user_supplied = True
try:
constraints = state.node.constraints + [gas > 2300]
transaction_sequence = solver.get_transaction_sequence(state, constraints)
if user_supplied:
try:
constraints += [to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF]
transaction_sequence = solver.get_transaction_sequence(state, constraints)
description += (
"Generally, it is not recommended to call user-supplied addresses using Solidity's call() construct. "
"Note that attackers might leverage reentrancy attacks to exploit race conditions or manipulate this contract's state."
)
except UnsatError:
debug = str(transaction_sequence)
description = "The contract executes a function call to an external address. " \
"Verify that the code at this address is trusted and immutable."
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
contract=node.contract_name,
function_name=state.node.function_name,
address=address,
title="Message call to external contract",
_type="Warning",
description=description,
bytecode=state.environment.code.bytecode,
swc_id=REENTRANCY,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
else:
description += "to another contract. Make sure that the called contract is trusted and does not execute user-supplied code."
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
title="Message call to external contract",
title="External call",
_type="Informational",
description=description,
bytecode=state.environment.code.bytecode,
swc_id=REENTRANCY,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
description=description,
debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
return issues
if address not in self.calls_visited:
self.calls_visited.append(address)
logging.debug(
"[EXTERNAL_CALLS] Checking for state changes starting from "
+ call.node.function_name
)
# Check for SSTORE in remaining instructions in current node & nodes down the CFG
state_change_addresses = self.search_children(
statespace,
call.node,
call.state.current_transaction.id,
call.state_index + 1,
depth=0,
results=[],
)
logging.debug(
"[EXTERNAL_CALLS] Detected state changes at addresses: "
+ str(state_change_addresses)
)
if len(state_change_addresses):
for address in state_change_addresses:
description = (
"The contract account state is changed after an external call. "
"Consider that the called contract could re-enter the function before this "
"state change takes place. This can lead to business logic vulnerabilities."
)
debug = str(transaction_sequence)
description = "The contract executes a function call with high gas to a user-supplied address. " \
"Note that the callee can contain arbitrary code and may re-enter any function in this contract. " \
"Review the business logic carefully to prevent unanticipated effects on the contract state."
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
contract=node.contract_name,
function_name=node.function_name,
address=address,
title="State change after external call",
swc_id=REENTRANCY,
title="External call to user-supplied address",
_type="Warning",
description=description,
bytecode=state.environment.code.bytecode,
swc_id=REENTRANCY,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
description=description,
debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
return issues
except UnsatError:
logging.debug("[EXTERNAL_CALLS] no model found for setting caller address.")
detector = ExternalCallModule()

Loading…
Cancel
Save