mirror of https://github.com/ConsenSys/mythril
parent
6f1d9b24da
commit
dec47b102b
@ -0,0 +1,172 @@ |
||||
from mythril.analysis.swc_data import REENTRANCY |
||||
from mythril.analysis.modules.base import DetectionModule |
||||
from mythril.analysis.report import Issue |
||||
from mythril.laser.smt import symbol_factory, UGT, BitVec, Or |
||||
from mythril.laser.ethereum.state.global_state import GlobalState |
||||
from mythril.laser.ethereum.state.annotation import StateAnnotation |
||||
from mythril.analysis import solver |
||||
from mythril.exceptions import UnsatError |
||||
from typing import List, cast, Optional |
||||
from copy import copy |
||||
|
||||
import logging |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
DESCRIPTION = """ |
||||
|
||||
Check whether there is a state change of the contract after the execution of an external call |
||||
""" |
||||
|
||||
|
||||
class StateChangeCallsAnnotation(StateAnnotation): |
||||
def __init__(self, call_state: GlobalState, user_defined_address: bool) -> None: |
||||
self.call_state = call_state |
||||
self.state_change_states = [] # type: List[GlobalState] |
||||
self.user_defined_address = user_defined_address |
||||
|
||||
def __copy__(self): |
||||
new_annotation = StateChangeCallsAnnotation( |
||||
self.call_state, self.user_defined_address |
||||
) |
||||
new_annotation.state_change_states = self.state_change_states[:] |
||||
return new_annotation |
||||
|
||||
def get_issue(self, global_state: GlobalState) -> Optional[Issue]: |
||||
if not self.state_change_states: |
||||
return None |
||||
|
||||
severity = "Medium" if self.user_defined_address else "Low" |
||||
address = global_state.get_current_instruction()["address"] |
||||
logging.debug( |
||||
"[EXTERNAL_CALLS] Detected state changes at addresses: {}".format(address) |
||||
) |
||||
description_head = ( |
||||
"The contract account state is changed after an external call. " |
||||
) |
||||
description_tail = ( |
||||
"Consider that the called contract could re-enter the function before this " |
||||
"state change takes place. This can lead to business logic vulnerabilities." |
||||
) |
||||
|
||||
return Issue( |
||||
contract=global_state.environment.active_account.contract_name, |
||||
function_name=global_state.environment.active_function_name, |
||||
address=address, |
||||
title="State change after external call", |
||||
severity=severity, |
||||
description_head=description_head, |
||||
description_tail=description_tail, |
||||
swc_id=REENTRANCY, |
||||
bytecode=global_state.environment.code.bytecode, |
||||
) |
||||
|
||||
|
||||
class StateChange(DetectionModule): |
||||
"""This module searches for state change after low level calls (e.g. call.value()) that |
||||
forward gas to the callee.""" |
||||
|
||||
def __init__(self): |
||||
"""""" |
||||
super().__init__( |
||||
name="State Change After External calls", |
||||
swc_id=REENTRANCY, |
||||
description=DESCRIPTION, |
||||
entrypoint="callback", |
||||
pre_hooks=[ |
||||
"CALL", |
||||
"SSTORE", |
||||
"DELEGATECALL", |
||||
"STATICCALL", |
||||
"CREATE", |
||||
"CREATE2", |
||||
"CALLCODE", |
||||
], |
||||
) |
||||
|
||||
def execute(self, state: GlobalState): |
||||
self._issues.extend(self._analyze_state(state)) |
||||
return self.issues |
||||
|
||||
@staticmethod |
||||
def _add_external_call(global_state: GlobalState) -> None: |
||||
gas = global_state.mstate.stack[-1] |
||||
to = global_state.mstate.stack[-2] |
||||
try: |
||||
constraints = copy(global_state.mstate.constraints) |
||||
solver.get_model( |
||||
constraints |
||||
+ [ |
||||
UGT(gas, symbol_factory.BitVecVal(2300, 256)), |
||||
Or( |
||||
to > symbol_factory.BitVecVal(16, 256), |
||||
to == symbol_factory.BitVecVal(0, 256), |
||||
), |
||||
] |
||||
) |
||||
|
||||
# Check whether we can also set the callee address |
||||
try: |
||||
constraints += [to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF] |
||||
solver.get_model(constraints) |
||||
|
||||
global_state.annotate(StateChangeCallsAnnotation(global_state, True)) |
||||
except UnsatError: |
||||
global_state.annotate(StateChangeCallsAnnotation(global_state, False)) |
||||
except UnsatError: |
||||
pass |
||||
|
||||
@staticmethod |
||||
def _analyze_state(global_state: GlobalState) -> List[Issue]: |
||||
|
||||
annotations = cast( |
||||
List[StateChangeCallsAnnotation], |
||||
list(global_state.get_annotations(StateChangeCallsAnnotation)), |
||||
) |
||||
op_code = global_state.get_current_instruction()["opcode"] |
||||
|
||||
if len(annotations) == 0: |
||||
if op_code in ("SSTORE", "CREATE", "CREATE2"): |
||||
return [] |
||||
if op_code in ("SSTORE", "CREATE", "CREATE2"): |
||||
for annotation in annotations: |
||||
annotation.state_change_states.append(global_state) |
||||
|
||||
# Record state changes following from a transfer of ether |
||||
if op_code in ("CALL", "DELEGATECALL", "CALLCODE"): |
||||
value = global_state.mstate.stack[-3] # type: BitVec |
||||
if StateChange._balance_change(value, global_state): |
||||
for annotation in annotations: |
||||
annotation.state_change_states.append(global_state) |
||||
|
||||
# Record external calls |
||||
if op_code in ("CALL", "DELEGATECALL", "CALLCODE"): |
||||
StateChange._add_external_call(global_state) |
||||
|
||||
# Check for vulnerabilities |
||||
vulnerabilities = [] |
||||
for annotation in annotations: |
||||
if not annotation.state_change_states: |
||||
continue |
||||
vulnerabilities.append(annotation.get_issue(global_state)) |
||||
return vulnerabilities |
||||
|
||||
@staticmethod |
||||
def _balance_change(value: BitVec, global_state: GlobalState) -> bool: |
||||
if not value.symbolic: |
||||
assert value.value is not None |
||||
return value.value > 0 |
||||
|
||||
else: |
||||
constraints = copy(global_state.mstate.constraints) |
||||
|
||||
try: |
||||
solver.get_model( |
||||
constraints + [value > symbol_factory.BitVecVal(0, 256)] |
||||
) |
||||
return True |
||||
except UnsatError: |
||||
return False |
||||
|
||||
|
||||
detector = StateChange() |
@ -1,47 +0,0 @@ |
||||
from mythril.laser.ethereum.svm import LaserEVM |
||||
from mythril.laser.ethereum.plugins.plugin import LaserPlugin |
||||
from mythril.laser.ethereum.state.world_state import WorldState |
||||
|
||||
from typing import List |
||||
|
||||
import logging |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
|
||||
class SaveInitialWorldState(LaserPlugin): |
||||
"""SaveInitialWorldState |
||||
This plugin is used to save initial world state so it can be used for the output to display |
||||
|
||||
""" |
||||
|
||||
def __init__(self): |
||||
pass |
||||
|
||||
def initialize(self, symbolic_vm: LaserEVM): |
||||
""" |
||||
:param symbolic_vm: |
||||
:return: |
||||
""" |
||||
|
||||
@symbolic_vm.laser_hook("end_contract_creation") |
||||
def set_standard_initial_state(openstates: List[WorldState]): |
||||
""" |
||||
This function initializes the initial state to all the open states |
||||
:param openstates: |
||||
:return: |
||||
""" |
||||
accounts = openstates[0].accounts |
||||
initial_state = openstates[0].initial_state_account |
||||
initial_state[ |
||||
"accounts" |
||||
] = {} # This variable persists for all world states. |
||||
for address, account in accounts.items(): |
||||
if address == "0x" + "0" * 40: |
||||
continue |
||||
initial_state["accounts"][address] = { |
||||
"nounce": account.nonce, |
||||
"balance": "<ARBITRARY_BALANCE>", |
||||
"code": account.code.bytecode, |
||||
"storage": {}, |
||||
} |
@ -1 +1,11 @@ |
||||
[{"issues": [], "meta": {}, "sourceFormat": "evm-byzantium-bytecode", "sourceList": ["0x3746c7c2ae7b0d4c3f8b1905df9a7ea169b9f93bec68a10a00b4c9d27a18c6fb"], "sourceType": "raw-bytecode"}] |
||||
[ |
||||
{ |
||||
"issues": [], |
||||
"meta": {}, |
||||
"sourceFormat": "evm-byzantium-bytecode", |
||||
"sourceList": [ |
||||
"0x3746c7c2ae7b0d4c3f8b1905df9a7ea169b9f93bec68a10a00b4c9d27a18c6fb" |
||||
], |
||||
"sourceType": "raw-bytecode" |
||||
} |
||||
] |
@ -1 +1,11 @@ |
||||
[{"issues": [], "meta": {}, "sourceFormat": "evm-byzantium-bytecode", "sourceList": ["0x0e6f727bb3301e02d3be831bf34357522fd2f1d40e90dff8e2214553b06b5f6c"], "sourceType": "raw-bytecode"}] |
||||
[ |
||||
{ |
||||
"issues": [], |
||||
"meta": {}, |
||||
"sourceFormat": "evm-byzantium-bytecode", |
||||
"sourceList": [ |
||||
"0x0e6f727bb3301e02d3be831bf34357522fd2f1d40e90dff8e2214553b06b5f6c" |
||||
], |
||||
"sourceType": "raw-bytecode" |
||||
} |
||||
] |
@ -1 +1,11 @@ |
||||
[{"issues": [], "meta": {}, "sourceFormat": "evm-byzantium-bytecode", "sourceList": ["0x11a78eb09819f505ba4f10747e6d1f7a44480e602c67573b7abac2f733a85d93"], "sourceType": "raw-bytecode"}] |
||||
[ |
||||
{ |
||||
"issues": [], |
||||
"meta": {}, |
||||
"sourceFormat": "evm-byzantium-bytecode", |
||||
"sourceList": [ |
||||
"0x11a78eb09819f505ba4f10747e6d1f7a44480e602c67573b7abac2f733a85d93" |
||||
], |
||||
"sourceType": "raw-bytecode" |
||||
} |
||||
] |
Loading…
Reference in new issue