Merge pull request #994 from ConsenSys/enhance/w/886

Propose some changes to enhance/886
pull/919/head
Nikhil Parasaram 6 years ago committed by GitHub
commit f4893eea65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      mythril/analysis/modules/external_calls.py
  2. 209
      mythril/analysis/modules/state_change_external_calls.py

@ -5,7 +5,6 @@ from mythril.analysis import solver
from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue
from mythril.analysis.call_helpers import get_call_from_state
from mythril.laser.smt import UGT, symbol_factory
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.exceptions import UnsatError
@ -32,11 +31,8 @@ def _analyze_state(state):
"""
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
address = state.get_current_instruction()["address"]
call = get_call_from_state(state)
if call is None:
return []
address = state.get_current_instruction()["address"]
try:
constraints = copy(state.mstate.constraints)

@ -1,14 +1,12 @@
from mythril.analysis.ops import Call, Variable, VarType
from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue
from mythril.analysis.call_helpers import get_call_from_state
from mythril.laser.smt import symbol_factory, simplify, UGT
from mythril.laser.smt import symbol_factory, UGT, BitVec
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.analysis import solver
from mythril.exceptions import UnsatError
from typing import List, cast
from typing import List, cast, Optional
from copy import copy
import logging
@ -21,25 +19,47 @@ Check whether there is a state change of the contract after the execution of an
"""
class CallIssue:
""" This class is a struct of
call: the Call struct
user_defined_address: Whether the address can be defined by user or not
"""
def __init__(self, call: Call, user_defined_address: bool) -> None:
self.call = call
class StateChangeCallsAnnotation(StateAnnotation):
def __init__(self, call_state: GlobalState, user_defined_address: bool) -> None:
self.call_state = call_state
self.state_change_states = [] # type: List[GlobalState]
self.user_defined_address = user_defined_address
def __copy__(self):
new_annotation = StateChangeCallsAnnotation(
self.call_state, self.user_defined_address
)
new_annotation.state_change_states = self.state_change_states[:]
return new_annotation
class StateChangeCallsAnnotation(StateAnnotation):
def __init__(self) -> None:
self.calls = [] # type: List[CallIssue]
def get_issue(self) -> Optional[Issue]:
if not self.state_change_states:
return None
def __copy__(self):
result = StateChangeCallsAnnotation()
result.calls = copy(self.calls)
return result
severity = "Medium" if self.user_defined_address else "Low"
address = self.call_state.get_current_instruction()["address"]
logging.debug(
"[EXTERNAL_CALLS] Detected state changes at addresses: {}".format(address)
)
description_head = (
"The contract account state is changed after an external call. "
)
description_tail = (
"Consider that the called contract could re-enter the function before this "
"state change takes place. This can lead to business logic vulnerabilities."
)
return Issue(
contract=self.call_state.environment.active_account.contract_name,
function_name=self.call_state.environment.active_function_name,
address=address,
title="State change after external call",
severity=severity,
description_head=description_head,
description_tail=description_tail,
swc_id=REENTRANCY,
bytecode=self.call_state.environment.code.bytecode,
)
class StateChange(DetectionModule):
@ -64,137 +84,82 @@ class StateChange(DetectionModule):
)
def execute(self, state: GlobalState):
"""
:param state:
:return:
"""
self._issues.extend(self._analyze_state(state))
return self.issues
@staticmethod
def _add_external_call(
state: GlobalState, annotations: List[StateChangeCallsAnnotation]
) -> None:
call = get_call_from_state(state)
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
if call is None:
return
def _add_external_call(global_state: GlobalState) -> None:
gas = global_state.mstate.stack[-1]
to = global_state.mstate.stack[-2]
try:
constraints = copy(state.mstate.constraints)
constraints = copy(global_state.mstate.constraints)
solver.get_model(
constraints + [UGT(gas, symbol_factory.BitVecVal(2300, 256))]
constraints + [UGT(gas, symbol_factory.BitVecVal(2300, 256)), to > 16]
)
# Check whether we can also set the callee address
try:
constraints += [to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF]
solver.get_model(constraints)
annotations[0].calls.append(
CallIssue(call=call, user_defined_address=True)
)
except UnsatError:
annotations[0].calls.append(
CallIssue(call=call, user_defined_address=False)
)
global_state.annotate(StateChangeCallsAnnotation(global_state, True))
except UnsatError:
global_state.annotate(StateChangeCallsAnnotation(global_state, False))
except UnsatError:
pass
@staticmethod
def _analyze_state(state: GlobalState) -> List[Issue]:
"""
def _analyze_state(global_state: GlobalState) -> List[Issue]:
:param state:
:return:
"""
address = state.get_current_instruction()["address"]
annotations = cast(
List[StateChangeCallsAnnotation],
list(state.get_annotations(StateChangeCallsAnnotation)),
list(global_state.get_annotations(StateChangeCallsAnnotation)),
)
opcode = state.get_current_instruction()["opcode"]
op_code = global_state.get_current_instruction()["opcode"]
if len(annotations) == 0:
if opcode in ("SSTORE", "CREATE", "CREATE2"):
return []
log.debug("Creating annotation for state")
state.annotate(StateChangeCallsAnnotation())
annotations = cast(
List[StateChangeCallsAnnotation],
list(state.get_annotations(StateChangeCallsAnnotation)),
)
if opcode in ("SSTORE", "CREATE", "CREATE2"):
return StateChange._handle_state_change(
state, address=address, annotation=annotations[0]
)
call = get_call_from_state(state)
if call is None:
return []
if StateChange._balance_change(call.value):
return StateChange._handle_state_change(
state, address=address, annotation=annotations[0]
)
if opcode == "CALL":
StateChange._add_external_call(state, annotations=annotations)
if op_code in ("SSTORE", "CREATE", "CREATE2"):
return []
if op_code in ("SSTORE", "CREATE", "CREATE2"):
for annotation in annotations:
annotation.state_change_states.append(global_state)
# Record state changes following from a transfer of ether
if op_code in ("CALL", "DELEGATECALL", "CALLCODE"):
value = global_state.mstate.stack[-3] # type: BitVec
if StateChange._balance_change(value, global_state):
for annotation in annotations:
annotation.state_change_states.append(global_state)
# Record external calls
if op_code in ("CALL", "DELEGATECALL", "CALLCODE"):
StateChange._add_external_call(global_state)
# Check for vulnerabilities
vulnerabilities = []
for annotation in annotations:
if not annotation.state_change_states:
continue
vulnerabilities.append(annotation.get_issue())
global_state.annotations.remove(annotation)
return vulnerabilities
@staticmethod
def _get_state_change_issues(
callissues: List[CallIssue], state: GlobalState, address: int
) -> List[Issue]:
issues = []
for callissue in callissues:
severity = "Medium" if callissue.user_defined_address else "Low"
call = callissue.call
logging.debug(
"[EXTERNAL_CALLS] Detected state changes at addresses: {}".format(
address
)
)
description_head = (
"The contract account state is changed after an external call. "
)
description_tail = (
"Consider that the called contract could re-enter the function before this "
"state change takes place. This can lead to business logic vulnerabilities."
)
issue = Issue(
contract=call.state.environment.active_account.contract_name,
function_name=call.state.environment.active_function_name,
address=address,
title="State change after external call",
severity=severity,
description_head=description_head,
description_tail=description_tail,
swc_id=REENTRANCY,
bytecode=state.environment.code.bytecode,
)
issues.append(issue)
return issues
def _balance_change(value: BitVec, global_state: GlobalState) -> bool:
if not value.symbolic:
assert value.value is not None
return value.value > 0
@staticmethod
def _handle_state_change(
state: GlobalState, address: int, annotation: StateChangeCallsAnnotation
) -> List[Issue]:
calls = annotation.calls
issues = StateChange._get_state_change_issues(calls, state, address)
return issues
@staticmethod
def _balance_change(value: Variable) -> bool:
if value.type == VarType.CONCRETE:
return value.val > 0
else:
zero = symbol_factory.BitVecVal(0, 256)
return simplify(value.val > zero)
constraints = copy(global_state.mstate.constraints)
try:
solver.get_model(
constraints + [value > symbol_factory.BitVecVal(0, 256)]
)
return True
except UnsatError:
return False
detector = StateChange()

Loading…
Cancel
Save