Rewrite external_calls, refactor thief & extcalls to callbacks

pull/757/head
Bernhard Mueller 6 years ago
parent a0b1721989
commit e441907c38
  1. 50
      mythril/analysis/modules/ether_thief.py
  2. 93
      mythril/analysis/modules/external_calls.py
  3. 2
      mythril/laser/ethereum/instructions.py

@ -3,6 +3,7 @@ from mythril.analysis import solver
from mythril.analysis.report import Issue from mythril.analysis.report import Issue
from mythril.analysis.swc_data import UNPROTECTED_ETHER_WITHDRAWAL from mythril.analysis.swc_data import UNPROTECTED_ETHER_WITHDRAWAL
from mythril.analysis.modules.base import DetectionModule from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.exceptions import UnsatError from mythril.exceptions import UnsatError
from z3 import BitVecVal, UGT, Sum from z3 import BitVecVal, UGT, Sum
import logging import logging
@ -22,30 +23,9 @@ An issue is reported if:
""" """
class EtherThief(DetectionModule): def _analyze_state(state):
def __init__(self):
super().__init__(
name="Ether Thief",
swc_id=UNPROTECTED_ETHER_WITHDRAWAL,
hooks=["CALL"],
description=DESCRIPTION,
)
def execute(self, state_space):
logging.debug("Executing module: %s", self.name)
issues = []
for k in state_space.nodes:
node = state_space.nodes[k]
for state in node.states:
issues += self._analyze_state(state, node)
return issues
@staticmethod
def _analyze_state(state, node):
issues = []
instruction = state.get_current_instruction() instruction = state.get_current_instruction()
node = state.node
if instruction["opcode"] != "CALL": if instruction["opcode"] != "CALL":
return [] return []
@ -91,11 +71,31 @@ class EtherThief(DetectionModule):
debug=debug, debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
) )
issues.append(issue)
except UnsatError: except UnsatError:
logging.debug("[ETHER_THIEF] no model found") logging.debug("[ETHER_THIEF] no model found")
return []
return [issue]
class EtherThief(DetectionModule):
def __init__(self):
super().__init__(
name="Ether Thief",
swc_id=UNPROTECTED_ETHER_WITHDRAWAL,
hooks=["CALL"],
description=DESCRIPTION,
entrypoint="callback",
)
self._issues = []
def execute(self, state: GlobalState):
self._issues.extend(_analyze_state(state))
return self.issues
return issues @property
def issues(self):
return self._issues
detector = EtherThief() detector = EtherThief()

@ -1,13 +1,12 @@
from z3 import * from z3 import *
from mythril.analysis.ops import *
from mythril.analysis.report import Issue from mythril.analysis.report import Issue
from mythril.analysis import solver from mythril.analysis import solver
from mythril.analysis.swc_data import REENTRANCY from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.exceptions import UnsatError from mythril.exceptions import UnsatError
import logging import logging
DESCRIPTION = """ DESCRIPTION = """
Search for low level calls (e.g. call.value()) that forward all gas to the callee. Search for low level calls (e.g. call.value()) that forward all gas to the callee.
@ -17,90 +16,88 @@ an informational issue.
""" """
class ExternalCallModule(DetectionModule): def _analyze_state(state):
def __init__(self):
super().__init__(
name="External Calls",
swc_id=REENTRANCY,
hooks=["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"],
description="Check for call.value()() to external addresses",
)
def execute(self, state_space):
logging.debug("Executing module: %s", self.name)
issues = []
for k in state_space.nodes:
node = state_space.nodes[k]
for state in node.states:
issues += self._analyze_state(state, node)
return issues
@staticmethod
def _analyze_state(state, node):
issues = []
instruction = state.get_current_instruction()
if instruction["opcode"] not in ("CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"):
return []
node = state.node
gas = state.mstate.stack[-1] gas = state.mstate.stack[-1]
to = state.mstate.stack[-2] to = state.mstate.stack[-2]
address = state.get_current_instruction()["address"] address = state.get_current_instruction()["address"]
try: try:
constraints = state.node.constraints + [gas > 2300] constraints = node.constraints + [gas > 2300]
transaction_sequence = solver.get_transaction_sequence(state, constraints) transaction_sequence = solver.get_transaction_sequence(state, constraints)
# Check whether we can also set the callee address
try: try:
constraints += [to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF] constraints += [to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF]
transaction_sequence = solver.get_transaction_sequence(state, constraints) transaction_sequence = solver.get_transaction_sequence(state, constraints)
except UnsatError:
debug = str(transaction_sequence) debug = str(transaction_sequence)
description = "The contract executes a function call to an external address. " \ description = "The contract executes a function call with high gas to a user-supplied address. " \
"Verify that the code at this address is trusted and immutable." "Note that the callee can contain arbitrary code and may re-enter any function in this contract. " \
"Review the business logic carefully to prevent unanticipated effects on the contract state."
issue = Issue( issue = Issue(
contract=node.contract_name, contract=node.contract_name,
function_name=state.node.function_name, function_name=node.function_name,
address=address, address=address,
swc_id=REENTRANCY, swc_id=REENTRANCY,
title="External call", title="External call to user-supplied address",
_type="Informational", _type="Warning",
bytecode=state.environment.code.bytecode, bytecode=state.environment.code.bytecode,
description=description, description=description,
debug=debug, debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
) )
issues.append(issue) except UnsatError:
return issues
logging.debug("[EXTERNAL_CALLS] Callee address cannot be modified. Reporting informational issue.")
debug = str(transaction_sequence) debug = str(transaction_sequence)
description = "The contract executes a function call with high gas to a user-supplied address. " \ description = "The contract executes a function call to an external address. " \
"Note that the callee can contain arbitrary code and may re-enter any function in this contract. " \ "Verify that the code at this address is trusted and immutable."
"Review the business logic carefully to prevent unanticipated effects on the contract state."
issue = Issue( issue = Issue(
contract=node.contract_name, contract=node.contract_name,
function_name=node.function_name, function_name=state.node.function_name,
address=address, address=address,
swc_id=REENTRANCY, swc_id=REENTRANCY,
title="External call to user-supplied address", title="External call",
_type="Warning", _type="Informational",
bytecode=state.environment.code.bytecode, bytecode=state.environment.code.bytecode,
description=description, description=description,
debug=debug, debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
) )
issues.append(issue)
return issues
except UnsatError: except UnsatError:
logging.debug("[EXTERNAL_CALLS] no model found for setting caller address.") logging.debug("[EXTERNAL_CALLS] No model found.")
return []
return [issue]
class ExternalCalls(DetectionModule):
def __init__(self):
super().__init__(
name="External calls",
swc_id=REENTRANCY,
hooks=["CALL"],
description=(DESCRIPTION),
entrypoint="callback",
)
self._issues = []
def execute(self, state: GlobalState):
self._issues.extend(_analyze_state(state))
return self.issues
@property
def issues(self):
return self._issues
detector = ExternalCallModule() detector = ExternalCalls()

@ -131,7 +131,7 @@ class Instruction:
def evaluate(self, global_state: GlobalState, post=False) -> List[GlobalState]: def evaluate(self, global_state: GlobalState, post=False) -> List[GlobalState]:
""" Performs the mutation for this instruction """ """ Performs the mutation for this instruction """
# Generalize some ops # Generalize some ops
logging.debug("Evaluating {}".format(self.op_code)) # logging.debug("Evaluating {}".format(self.op_code))
op = self.op_code.lower() op = self.op_code.lower()
if self.op_code.startswith("PUSH"): if self.op_code.startswith("PUSH"):
op = "push" op = "push"

Loading…
Cancel
Save