Merge branch 'develop'

pull/1146/head
Bernhard Mueller 5 years ago
commit b3ef04bb09
  1. 3
      mythril/analysis/modules/base.py
  2. 105
      mythril/analysis/modules/delegatecall.py
  3. 242
      mythril/analysis/modules/dependence_on_predictable_vars.py
  4. 116
      mythril/analysis/modules/deprecated_ops.py
  5. 19
      mythril/analysis/modules/dos.py
  6. 19
      mythril/analysis/modules/ether_thief.py
  7. 93
      mythril/analysis/modules/exceptions.py
  8. 180
      mythril/analysis/modules/external_calls.py
  9. 11
      mythril/analysis/modules/integer.py
  10. 102
      mythril/analysis/modules/multiple_sends.py
  11. 26
      mythril/analysis/modules/state_change_external_calls.py
  12. 18
      mythril/analysis/modules/suicide.py
  13. 115
      mythril/analysis/modules/unchecked_retval.py
  14. 17
      mythril/analysis/solver.py
  15. 4
      mythril/laser/ethereum/plugins/implementations/dependency_pruner.py
  16. 6
      mythril/laser/ethereum/state/constraints.py

@ -2,7 +2,7 @@
modules."""
import logging
from typing import List
from typing import List, Set
from mythril.analysis.report import Issue
log = logging.getLogger(__name__)
@ -35,6 +35,7 @@ class DetectionModule:
)
self.entrypoint = entrypoint
self._issues = [] # type: List[Issue]
self._cache = set() # type: Set[int]
@property
def issues(self):

@ -93,58 +93,63 @@ class DelegateCallModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_states(state))
def _analyze_states(state: GlobalState) -> List[Issue]:
"""
:param state: the current state
:return: returns the issues for that corresponding state
"""
issues = []
op_code = state.get_current_instruction()["opcode"]
annotations = cast(
List[DelegateCallAnnotation],
list(state.get_annotations(DelegateCallAnnotation)),
)
if len(annotations) == 0 and op_code in ("RETURN", "STOP"):
return []
if op_code == "DELEGATECALL":
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
constraints = [
to == ATTACKER_ADDRESS,
UGT(gas, symbol_factory.BitVecVal(2300, 256)),
]
for tx in state.world_state.transaction_sequence:
if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS)
state.annotate(DelegateCallAnnotation(state, constraints))
return []
else:
for annotation in annotations:
try:
transaction_sequence = solver.get_transaction_sequence(
state,
state.mstate.constraints
+ annotation.constraints
+ [annotation.return_value == 1],
)
issues.append(
annotation.get_issue(
state, transaction_sequence=transaction_sequence
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state: GlobalState) -> List[Issue]:
"""
:param state: the current state
:return: returns the issues for that corresponding state
"""
issues = []
op_code = state.get_current_instruction()["opcode"]
annotations = cast(
List[DelegateCallAnnotation],
list(state.get_annotations(DelegateCallAnnotation)),
)
if len(annotations) == 0 and op_code in ("RETURN", "STOP"):
return []
if op_code == "DELEGATECALL":
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
constraints = [
to == ATTACKER_ADDRESS,
UGT(gas, symbol_factory.BitVecVal(2300, 256)),
]
for tx in state.world_state.transaction_sequence:
if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS)
state.annotate(DelegateCallAnnotation(state, constraints))
return []
else:
for annotation in annotations:
try:
transaction_sequence = solver.get_transaction_sequence(
state,
state.mstate.constraints
+ annotation.constraints
+ [annotation.return_value == 1],
)
issues.append(
annotation.get_issue(
state, transaction_sequence=transaction_sequence
)
)
)
except UnsatError:
continue
except UnsatError:
continue
return issues
return issues
detector = DelegateCallModule()

@ -1,6 +1,7 @@
"""This module contains the detection code for predictable variable
dependence."""
import logging
from copy import copy
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue
@ -27,22 +28,25 @@ def is_prehook() -> bool:
class PredictableValueAnnotation:
"""Symbol annotation used if a variable is initialized from a predictable environment variable."""
def __init__(self, operation: str) -> None:
def __init__(self, operation: str, add_constraints=None) -> None:
self.operation = operation
self.add_constraints = add_constraints
class PredictablePathAnnotation(StateAnnotation):
"""State annotation used when a path is chosen based on a predictable variable."""
def __init__(self, operation: str, location: int) -> None:
def __init__(self, operation: str, location: int, add_constraints=None) -> None:
self.operation = operation
self.location = location
self.add_constraints = add_constraints
class OldBlockNumberUsedAnnotation(StateAnnotation):
"""State annotation set in blockhash prehook if the input value is lower than the current block number."""
def __init__(self) -> None:
def __init__(self, constraints) -> None:
self.block_constraints = constraints
pass
@ -70,135 +74,163 @@ class PredictableDependenceModule(DetectionModule):
:param state:
:return:
"""
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state: GlobalState) -> list:
"""
self._issues.extend(_analyze_states(state))
def _analyze_states(state: GlobalState) -> list:
"""
:param state:
:return:
"""
issues = []
if is_prehook():
opcode = state.get_current_instruction()["opcode"]
if opcode in final_ops:
:param state:
:return:
"""
for annotation in state.annotations:
issues = []
if is_prehook():
opcode = state.get_current_instruction()["opcode"]
if opcode in final_ops:
for annotation in state.annotations:
if isinstance(annotation, PredictablePathAnnotation):
if annotation.add_constraints:
constraints = (
state.mstate.constraints + annotation.add_constraints
)
else:
constraints = copy(state.mstate.constraints)
try:
transaction_sequence = solver.get_transaction_sequence(
state, constraints
)
except UnsatError:
continue
description = (
"The "
+ annotation.operation
+ " is used in to determine a control flow decision. "
)
description += (
"Note that the values of variables like coinbase, gaslimit, block number and timestamp "
"are predictable and can be manipulated by a malicious miner. Also keep in mind that attackers "
"know hashes of earlier blocks. Don't use any of those environment variables for random number "
"generation or to make critical control flow decisions."
)
if isinstance(annotation, PredictablePathAnnotation):
description = (
"The "
+ annotation.operation
+ " is used in to determine a control flow decision. "
)
description += (
"Note that the values of variables like coinbase, gaslimit, block number and timestamp "
"are predictable and can be manipulated by a malicious miner. Also keep in mind that attackers "
"know hashes of earlier blocks. Don't use any of those environment variables for random number "
"generation or to make critical control flow decisions."
)
"""
Usually report low severity except in cases where the hash of a previous block is used to
determine control flow.
"""
"""
Usually report low severity except in cases where the hash of a previous block is used to
determine control flow.
"""
severity = "Medium" if "hash" in annotation.operation else "Low"
severity = "Medium" if "hash" in annotation.operation else "Low"
"""
Note: We report the location of the JUMPI that lead to this path. Usually this maps to an if or
require statement.
"""
"""
Note: We report the location of the JUMPI that lead to this path. Usually this maps to an if or
require statement.
"""
swc_id = (
TIMESTAMP_DEPENDENCE
if "timestamp" in annotation.operation
else WEAK_RANDOMNESS
)
swc_id = (
TIMESTAMP_DEPENDENCE
if "timestamp" in annotation.operation
else WEAK_RANDOMNESS
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=annotation.location,
swc_id=swc_id,
bytecode=state.environment.code.bytecode,
title="Dependence on predictable environment variable",
severity=severity,
description_head="A control flow decision is made based on a predictable variable.",
description_tail=description,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
transaction_sequence=transaction_sequence,
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=annotation.location,
swc_id=swc_id,
bytecode=state.environment.code.bytecode,
title="Dependence on predictable environment variable",
severity=severity,
description_head="A control flow decision is made based on a predictable variable.",
description_tail=description,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
issues.append(issue)
elif opcode == "JUMPI":
elif opcode == "JUMPI":
# Look for predictable state variables in jump condition
# Look for predictable state variables in jump condition
for annotation in state.mstate.stack[-2].annotations:
for annotation in state.mstate.stack[-2].annotations:
if isinstance(annotation, PredictableValueAnnotation):
state.annotate(
PredictablePathAnnotation(
annotation.operation,
state.get_current_instruction()["address"],
if isinstance(annotation, PredictableValueAnnotation):
state.annotate(
PredictablePathAnnotation(
annotation.operation,
state.get_current_instruction()["address"],
add_constraints=annotation.add_constraints,
)
)
)
break
break
elif opcode == "BLOCKHASH":
elif opcode == "BLOCKHASH":
param = state.mstate.stack[-1]
param = state.mstate.stack[-1]
try:
constraint = [
ULT(param, state.environment.block_number),
ULT(
state.environment.block_number,
symbol_factory.BitVecVal(2 ** 255, 256),
),
]
try:
constraint = [
ULT(param, state.environment.block_number),
ULT(
state.environment.block_number,
symbol_factory.BitVecVal(2 ** 255, 256),
),
]
# Why the second constraint? Because without it Z3 returns a solution where param overflows.
# Why the second constraint? Because without it Z3 returns a solution where param overflows.
solver.get_model(constraint)
state.annotate(OldBlockNumberUsedAnnotation())
solver.get_model(state.mstate.constraints + constraint)
state.annotate(OldBlockNumberUsedAnnotation(constraint))
except UnsatError:
pass
except UnsatError:
pass
else:
# we're in post hook
opcode = state.environment.code.instruction_list[state.mstate.pc - 1]["opcode"]
else:
# we're in post hook
if opcode == "BLOCKHASH":
# if we're in the post hook of a BLOCKHASH op, check if an old block number was used to create it.
opcode = state.environment.code.instruction_list[state.mstate.pc - 1][
"opcode"
]
annotations = cast(
List[OldBlockNumberUsedAnnotation],
list(state.get_annotations(OldBlockNumberUsedAnnotation)),
)
if opcode == "BLOCKHASH":
# if we're in the post hook of a BLOCKHASH op, check if an old block number was used to create it.
if len(annotations):
state.mstate.stack[-1].annotate(
PredictableValueAnnotation("block hash of a previous block")
annotations = cast(
List[OldBlockNumberUsedAnnotation],
list(state.get_annotations(OldBlockNumberUsedAnnotation)),
)
else:
# Always create an annotation when COINBASE, GASLIMIT, TIMESTAMP or NUMBER is executed.
state.mstate.stack[-1].annotate(
PredictableValueAnnotation(
"block.{} environment variable".format(opcode.lower())
if len(annotations):
# We can append any block constraint here
state.mstate.stack[-1].annotate(
PredictableValueAnnotation(
"block hash of a previous block",
add_constraints=annotations[0].block_constraints,
)
)
else:
# Always create an annotation when COINBASE, GASLIMIT, TIMESTAMP or NUMBER is executed.
state.mstate.stack[-1].annotate(
PredictableValueAnnotation(
"block.{} environment variable".format(opcode.lower())
)
)
)
return issues
return issues
detector = PredictableDependenceModule()

@ -1,5 +1,6 @@
"""This module contains the detection code for deprecated op codes."""
from mythril.analysis.report import Issue
from mythril.analysis.solver import get_transaction_sequence, UnsatError
from mythril.analysis.swc_data import DEPRECATED_FUNCTIONS_USAGE
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.global_state import GlobalState
@ -12,57 +13,6 @@ Check for usage of deprecated opcodes
"""
def _analyze_state(state):
"""
:param state:
:return:
"""
node = state.node
instruction = state.get_current_instruction()
if instruction["opcode"] == "ORIGIN":
log.debug("ORIGIN in function " + node.function_name)
title = "Use of tx.origin"
description_head = "Use of tx.origin is deprecated."
description_tail = (
"The smart contract retrieves the transaction origin (tx.origin) using msg.origin. "
"Use of msg.origin is deprecated and the instruction may be removed in the future. "
"Use msg.sender instead.\nSee also: "
"https://solidity.readthedocs.io/en/develop/security-considerations.html#tx-origin".format(
state.environment.active_function_name
)
)
swc_id = DEPRECATED_FUNCTIONS_USAGE
elif instruction["opcode"] == "CALLCODE":
log.debug("CALLCODE in function " + state.environment.active_function_name)
title = "Use of callcode"
description_head = "Use of callcode is deprecated."
description_tail = (
"The callcode method executes code of another contract in the context of the caller account. "
"Due to a bug in the implementation it does not persist sender and value over the call. It was "
"therefore deprecated and may be removed in the future. Use the delegatecall method instead."
)
swc_id = DEPRECATED_FUNCTIONS_USAGE
else:
return
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=instruction["address"],
title=title,
bytecode=state.environment.code.bytecode,
swc_id=swc_id,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return [issue]
class DeprecatedOperationsModule(DetectionModule):
"""This module checks for the usage of deprecated op codes."""
@ -82,7 +32,69 @@ class DeprecatedOperationsModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state):
"""
:param state:
:return:
"""
node = state.node
instruction = state.get_current_instruction()
if instruction["opcode"] == "ORIGIN":
log.debug("ORIGIN in function " + node.function_name)
title = "Use of tx.origin"
description_head = "Use of tx.origin is deprecated."
description_tail = (
"The smart contract retrieves the transaction origin (tx.origin) using msg.origin. "
"Use of msg.origin is deprecated and the instruction may be removed in the future. "
"Use msg.sender instead.\nSee also: "
"https://solidity.readthedocs.io/en/develop/security-considerations.html#tx-origin".format(
state.environment.active_function_name
)
)
swc_id = DEPRECATED_FUNCTIONS_USAGE
elif instruction["opcode"] == "CALLCODE":
log.debug("CALLCODE in function " + state.environment.active_function_name)
title = "Use of callcode"
description_head = "Use of callcode is deprecated."
description_tail = (
"The callcode method executes code of another contract in the context of the caller account. "
"Due to a bug in the implementation it does not persist sender and value over the call. It was "
"therefore deprecated and may be removed in the future. Use the delegatecall method instead."
)
swc_id = DEPRECATED_FUNCTIONS_USAGE
else:
return
try:
transaction_sequence = get_transaction_sequence(
state, state.mstate.constraints
)
except UnsatError:
return
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=instruction["address"],
title=title,
bytecode=state.environment.code.bytecode,
swc_id=swc_id,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
transaction_sequence=transaction_sequence,
)
return [issue]
detector = DeprecatedOperationsModule()

@ -6,6 +6,7 @@ from typing import Dict, cast, List
from mythril.analysis.swc_data import DOS_WITH_BLOCK_GAS_LIMIT
from mythril.analysis.report import Issue
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.solver import get_transaction_sequence, UnsatError
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.laser.ethereum import util
@ -29,7 +30,7 @@ class VisitsAnnotation(StateAnnotation):
return result
class DOS(DetectionModule):
class DosModule(DetectionModule):
"""This module consists of a makeshift loop detector that annotates the state with
a list of byte ranges likely to be loops. If a CALL or SSTORE detection is found in
one of the ranges it creates a low-severity issue. This is not super precise but
@ -53,10 +54,10 @@ class DOS(DetectionModule):
:param state:
:return:
"""
issues = self._analyze_state(state)
self._issues.extend(issues)
self._issues.extend(self._analyze_states(state))
def _analyze_states(self, state: GlobalState) -> List[Issue]:
def _analyze_state(self, state: GlobalState) -> List[Issue]:
"""
:param state: the current state
:return: returns the issues for that corresponding state
@ -104,6 +105,13 @@ class DOS(DetectionModule):
operation
)
try:
transaction_sequence = get_transaction_sequence(
state, state.mstate.constraints
)
except UnsatError:
return []
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
@ -115,6 +123,7 @@ class DOS(DetectionModule):
description_head=description_head,
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
transaction_sequence=transaction_sequence,
)
return [issue]
@ -122,4 +131,4 @@ class DOS(DetectionModule):
return []
detector = DOS()
detector = DosModule()

@ -47,7 +47,6 @@ class EtherThief(DetectionModule):
entrypoint="callback",
pre_hooks=["CALL"],
)
self._cache_addresses = {}
def reset_module(self):
"""
@ -55,7 +54,6 @@ class EtherThief(DetectionModule):
:return:
"""
super().reset_module()
self._cache_addresses = {}
def _execute(self, state: GlobalState) -> None:
"""
@ -63,9 +61,15 @@ class EtherThief(DetectionModule):
:param state:
:return:
"""
self._issues.extend(self._analyze_state(state))
def _analyze_state(self, state):
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state):
"""
:param state:
@ -77,8 +81,7 @@ class EtherThief(DetectionModule):
return []
address = instruction["address"]
if self._cache_addresses.get(address, False):
return []
value = state.mstate.stack[-3]
target = state.mstate.stack[-2]
@ -141,8 +144,6 @@ class EtherThief(DetectionModule):
log.debug("[ETHER_THIEF] no model found")
return []
# self._cache_addresses[address] = True
return [issue]

@ -12,50 +12,6 @@ from mythril.laser.ethereum.state.global_state import GlobalState
log = logging.getLogger(__name__)
def _analyze_state(state) -> list:
"""
:param state:
:return:
"""
log.debug("ASSERT_FAIL in function " + state.environment.active_function_name)
try:
address = state.get_current_instruction()["address"]
description_tail = (
"It is possible to trigger an exception (opcode 0xfe). "
"Exceptions can be caused by type errors, division by zero, "
"out-of-bounds array access, or assert violations. "
"Note that explicit `assert()` should only be used to check invariants. "
"Use `require()` for regular input checking."
)
transaction_sequence = solver.get_transaction_sequence(
state, state.mstate.constraints
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=ASSERT_VIOLATION,
title="Exception State",
severity="Low",
description_head="A reachable exception has been detected.",
description_tail=description_tail,
bytecode=state.environment.code.bytecode,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return [issue]
except UnsatError:
log.debug("no model found")
return []
class ReachableExceptionsModule(DetectionModule):
""""""
@ -75,7 +31,54 @@ class ReachableExceptionsModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state) -> list:
"""
:param state:
:return:
"""
log.debug("ASSERT_FAIL in function " + state.environment.active_function_name)
try:
address = state.get_current_instruction()["address"]
description_tail = (
"It is possible to trigger an exception (opcode 0xfe). "
"Exceptions can be caused by type errors, division by zero, "
"out-of-bounds array access, or assert violations. "
"Note that explicit `assert()` should only be used to check invariants. "
"Use `require()` for regular input checking."
)
transaction_sequence = solver.get_transaction_sequence(
state, state.mstate.constraints
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=ASSERT_VIOLATION,
title="Exception State",
severity="Low",
description_head="A reachable exception has been detected.",
description_tail=description_tail,
bytecode=state.environment.code.bytecode,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return [issue]
except UnsatError:
log.debug("no model found")
return []
detector = ReachableExceptionsModule()

@ -27,93 +27,6 @@ an informational issue.
"""
def _analyze_state(state):
"""
:param state:
:return:
"""
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
address = state.get_current_instruction()["address"]
try:
constraints = copy(state.mstate.constraints)
transaction_sequence = solver.get_transaction_sequence(
state, constraints + [UGT(gas, symbol_factory.BitVecVal(2300, 256))]
)
# Check whether we can also set the callee address
try:
constraints += [to == ATTACKER_ADDRESS]
for tx in state.world_state.transaction_sequence:
if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS)
transaction_sequence = solver.get_transaction_sequence(state, constraints)
description_head = "A call to a user-supplied address is executed."
description_tail = (
"The callee address of an external message call can be set by "
"the caller. Note that the callee can contain arbitrary code and may re-enter any function "
"in this contract. Review the business logic carefully to prevent averse effects on the "
"contract state."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=REENTRANCY,
title="External Call To User-Supplied Address",
bytecode=state.environment.code.bytecode,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
if _is_precompile_call(state):
return []
log.debug(
"[EXTERNAL_CALLS] Callee address cannot be modified. Reporting informational issue."
)
debug = json.dumps(transaction_sequence, indent=4)
description_head = "The contract executes an external message call."
description_tail = (
"An external function call to a fixed contract address is executed. Make sure "
"that the callee contract has been reviewed carefully."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=REENTRANCY,
title="External Call To Fixed Address",
bytecode=state.environment.code.bytecode,
severity="Low",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
log.debug("[EXTERNAL_CALLS] No model found.")
return []
return [issue]
def _is_precompile_call(global_state: GlobalState):
to = global_state.mstate.stack[-2] # type: BitVec
constraints = copy(global_state.mstate.constraints)
@ -151,7 +64,98 @@ class ExternalCalls(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state):
"""
:param state:
:return:
"""
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
address = state.get_current_instruction()["address"]
try:
constraints = copy(state.mstate.constraints)
transaction_sequence = solver.get_transaction_sequence(
state, constraints + [UGT(gas, symbol_factory.BitVecVal(2300, 256))]
)
# Check whether we can also set the callee address
try:
constraints += [to == ATTACKER_ADDRESS]
for tx in state.world_state.transaction_sequence:
if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS)
transaction_sequence = solver.get_transaction_sequence(
state, constraints
)
description_head = "A call to a user-supplied address is executed."
description_tail = (
"The callee address of an external message call can be set by "
"the caller. Note that the callee can contain arbitrary code and may re-enter any function "
"in this contract. Review the business logic carefully to prevent averse effects on the "
"contract state."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=REENTRANCY,
title="External Call To User-Supplied Address",
bytecode=state.environment.code.bytecode,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
if _is_precompile_call(state):
return []
log.debug(
"[EXTERNAL_CALLS] Callee address cannot be modified. Reporting informational issue."
)
description_head = "The contract executes an external message call."
description_tail = (
"An external function call to a fixed contract address is executed. Make sure "
"that the callee contract has been reviewed carefully."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=REENTRANCY,
title="External Call To Fixed Address",
bytecode=state.environment.code.bytecode,
severity="Low",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
log.debug("[EXTERNAL_CALLS] No model found.")
return []
return [issue]
detector = ExternalCalls()

@ -88,12 +88,6 @@ class IntegerOverflowUnderflowModule(DetectionModule):
],
)
"""
Cache addresses for which overflows or underflows already have been detected.
"""
self._overflow_cache = {} # type: Dict[int, bool]
"""
Cache satisfiability of overflow constraints
"""
@ -107,7 +101,6 @@ class IntegerOverflowUnderflowModule(DetectionModule):
:return:
"""
super().reset_module()
self._overflow_cache = {}
self._ostates_satisfiable = set()
self._ostates_unsatisfiable = set()
@ -120,7 +113,7 @@ class IntegerOverflowUnderflowModule(DetectionModule):
address = _get_address_from_state(state)
if self._overflow_cache.get(address, False):
if address in self._cache:
return
opcode = state.get_current_instruction()["opcode"]
@ -341,7 +334,7 @@ class IntegerOverflowUnderflowModule(DetectionModule):
)
address = _get_address_from_state(ostate)
self._overflow_cache[address] = True
self._cache.add(address)
self._issues.append(issue)

@ -4,6 +4,7 @@ from copy import copy
from typing import cast, List
from mythril.analysis.report import Issue
from mythril.analysis.solver import get_transaction_sequence, UnsatError
from mythril.analysis.swc_data import MULTIPLE_SENDS
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.annotation import StateAnnotation
@ -44,57 +45,68 @@ class MultipleSendsModule(DetectionModule):
)
def _execute(self, state: GlobalState) -> None:
self._issues.extend(_analyze_state(state))
def _analyze_state(state: GlobalState):
"""
:param state: the current state
:return: returns the issues for that corresponding state
"""
instruction = state.get_current_instruction()
annotations = cast(
List[MultipleSendsAnnotation],
list(state.get_annotations(MultipleSendsAnnotation)),
)
if len(annotations) == 0:
state.annotate(MultipleSendsAnnotation())
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state: GlobalState):
"""
:param state: the current state
:return: returns the issues for that corresponding state
"""
instruction = state.get_current_instruction()
annotations = cast(
List[MultipleSendsAnnotation],
list(state.get_annotations(MultipleSendsAnnotation)),
)
call_offsets = annotations[0].call_offsets
if instruction["opcode"] in ["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"]:
call_offsets.append(state.get_current_instruction()["address"])
else: # RETURN or STOP
for offset in call_offsets[1:]:
description_tail = (
"This call is executed after a previous call in the same transaction. "
"Try to isolate each call, transfer or send into its own transaction."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=offset,
swc_id=MULTIPLE_SENDS,
bytecode=state.environment.code.bytecode,
title="Multiple Calls in a Single Transaction",
severity="Low",
description_head="Multiple calls are executed in the same transaction.",
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
if len(annotations) == 0:
state.annotate(MultipleSendsAnnotation())
annotations = cast(
List[MultipleSendsAnnotation],
list(state.get_annotations(MultipleSendsAnnotation)),
)
return [issue]
return []
call_offsets = annotations[0].call_offsets
if instruction["opcode"] in ["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"]:
call_offsets.append(state.get_current_instruction()["address"])
else: # RETURN or STOP
for offset in call_offsets[1:]:
try:
transaction_sequence = get_transaction_sequence(
state, state.mstate.constraints
)
except UnsatError:
continue
description_tail = (
"This call is executed after a previous call in the same transaction. "
"Try to isolate each call, transfer or send into its own transaction."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=offset,
swc_id=MULTIPLE_SENDS,
bytecode=state.environment.code.bytecode,
title="Multiple Calls in a Single Transaction",
severity="Low",
description_head="Multiple calls are executed in the same transaction.",
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
transaction_sequence=transaction_sequence,
)
return [issue]
return []
detector = MultipleSendsModule()

@ -35,7 +35,25 @@ class StateChangeCallsAnnotation(StateAnnotation):
def get_issue(self, global_state: GlobalState) -> Optional[Issue]:
if not self.state_change_states:
return None
constraints = copy(global_state.mstate.constraints)
gas = self.call_state.mstate.stack[-1]
to = self.call_state.mstate.stack[-2]
constraints += [
UGT(gas, symbol_factory.BitVecVal(2300, 256)),
Or(
to > symbol_factory.BitVecVal(16, 256),
to == symbol_factory.BitVecVal(0, 256),
),
]
if self.user_defined_address:
constraints += [to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF]
try:
transaction_sequence = solver.get_transaction_sequence(
global_state, constraints
)
except UnsatError:
return None
severity = "Medium" if self.user_defined_address else "Low"
address = global_state.get_current_instruction()["address"]
logging.debug(
@ -59,6 +77,7 @@ class StateChangeCallsAnnotation(StateAnnotation):
description_tail=description_tail,
swc_id=REENTRANCY,
bytecode=global_state.environment.code.bytecode,
transaction_sequence=transaction_sequence,
)
@ -85,7 +104,12 @@ class StateChange(DetectionModule):
)
def _execute(self, state: GlobalState) -> None:
self._issues.extend(self._analyze_state(state))
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _add_external_call(global_state: GlobalState) -> None:

@ -39,7 +39,6 @@ class SuicideModule(DetectionModule):
:return:
"""
super().reset_module()
self._cache_address = {}
def _execute(self, state: GlobalState) -> None:
"""
@ -47,13 +46,18 @@ class SuicideModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(self._analyze_state(state))
def _analyze_state(self, state):
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state):
log.info("Suicide module: Analyzing suicide instruction")
instruction = state.get_current_instruction()
if self._cache_address.get(instruction["address"], False):
return []
to = state.mstate.stack[-1]
log.debug(
@ -84,8 +88,6 @@ class SuicideModule(DetectionModule):
)
description_tail = "Arbitrary senders can kill this contract."
self._cache_address[instruction["address"]] = True
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,

@ -55,70 +55,77 @@ class UncheckedRetvalModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
def _analyze_state(self, state: GlobalState) -> list:
instruction = state.get_current_instruction()
def _analyze_state(state: GlobalState) -> list:
instruction = state.get_current_instruction()
annotations = cast(
List[UncheckedRetvalAnnotation],
[a for a in state.get_annotations(UncheckedRetvalAnnotation)],
)
if len(annotations) == 0:
state.annotate(UncheckedRetvalAnnotation())
annotations = cast(
List[UncheckedRetvalAnnotation],
[a for a in state.get_annotations(UncheckedRetvalAnnotation)],
)
retvals = annotations[0].retvals
if instruction["opcode"] in ("STOP", "RETURN"):
issues = []
for retval in retvals:
try:
solver.get_model(state.mstate.constraints + [retval["retval"] == 0])
except UnsatError:
continue
description_tail = (
"External calls return a boolean value. If the callee contract halts with an exception, 'false' is "
"returned and execution continues in the caller. It is usually recommended to wrap external calls "
"into a require statement to prevent unexpected states."
if len(annotations) == 0:
state.annotate(UncheckedRetvalAnnotation())
annotations = cast(
List[UncheckedRetvalAnnotation],
[a for a in state.get_annotations(UncheckedRetvalAnnotation)],
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=retval["address"],
bytecode=state.environment.code.bytecode,
title="Unchecked Call Return Value",
swc_id=UNCHECKED_RET_VAL,
severity="Low",
description_head="The return value of a message call is not checked.",
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
retvals = annotations[0].retvals
if instruction["opcode"] in ("STOP", "RETURN"):
issues = []
for retval in retvals:
try:
transaction_sequence = solver.get_transaction_sequence(
state, state.mstate.constraints + [retval["retval"] == 0]
)
except UnsatError:
continue
description_tail = (
"External calls return a boolean value. If the callee contract halts with an exception, 'false' is "
"returned and execution continues in the caller. It is usually recommended to wrap external calls "
"into a require statement to prevent unexpected states."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=retval["address"],
bytecode=state.environment.code.bytecode,
title="Unchecked Call Return Value",
swc_id=UNCHECKED_RET_VAL,
severity="Low",
description_head="The return value of a message call is not checked.",
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
transaction_sequence=transaction_sequence,
)
issues.append(issue)
return issues
else:
log.debug("End of call, extracting retval")
assert state.environment.code.instruction_list[state.mstate.pc - 1][
"opcode"
] in ["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"]
retval = state.mstate.stack[-1]
# Use Typed Dict after release of mypy 0.670 and remove type ignore
retvals.append(
{ # type: ignore
"address": state.instruction["address"] - 1,
"retval": retval,
}
)
issues.append(issue)
return issues
else:
log.debug("End of call, extracting retval")
assert state.environment.code.instruction_list[state.mstate.pc - 1][
"opcode"
] in ["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"]
retval = state.mstate.stack[-1]
# Use Typed Dict after release of mypy 0.670 and remove type ignore
retvals.append(
{ # type: ignore
"address": state.instruction["address"] - 1,
"retval": retval,
}
)
return []
return []
detector = UncheckedRetvalModule()

@ -1,10 +1,10 @@
"""This module contains analysis module helpers to solve path constraints."""
from typing import Dict, List, Union
from functools import lru_cache
from typing import Dict, Tuple, Union
from z3 import sat, unknown, FuncInterp
import z3
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.world_state import Account
from mythril.laser.ethereum.state.constraints import Constraints
from mythril.laser.ethereum.transaction import BaseTransaction
from mythril.laser.smt import UGE, Optimize, symbol_factory
@ -17,7 +17,8 @@ import logging
log = logging.getLogger(__name__)
# LRU cache works great when used in powers of 2
@lru_cache(maxsize=2 ** 23)
def get_model(constraints, minimize=(), maximize=(), enforce_execution_time=True):
"""
@ -95,7 +96,11 @@ def get_transaction_sequence(
tx_constraints, minimize = _set_minimisation_constraints(
transaction_sequence, constraints.copy(), [], 5000
)
model = get_model(tx_constraints, minimize=minimize)
try:
model = get_model(tx_constraints, minimize=minimize)
except UnsatError:
raise UnsatError
min_price_dict = {} # type: Dict[str, int]
for transaction in transaction_sequence:
@ -166,7 +171,7 @@ def _get_concrete_transaction(model: z3.Model, transaction: BaseTransaction):
def _set_minimisation_constraints(
transaction_sequence, constraints, minimize, max_size
):
) -> Tuple[Constraints, tuple]:
""" Set constraints that minimise key transaction values
Constraints generated:
@ -188,4 +193,4 @@ def _set_minimisation_constraints(
minimize.append(transaction.call_data.calldatasize)
minimize.append(transaction.call_value)
return constraints, minimize
return constraints, tuple(minimize)

@ -200,7 +200,7 @@ class DependencyPruner(LaserPlugin):
# Is there a known read operation along this path that matches a write in the previous transaction?
try:
solver.get_model([location == dependency])
solver.get_model((location == dependency,))
return True
except UnsatError:
@ -210,7 +210,7 @@ class DependencyPruner(LaserPlugin):
for dependency in annotation.storage_loaded:
try:
solver.get_model([location == dependency])
solver.get_model((location == dependency,))
return True
except UnsatError:
continue

@ -82,6 +82,9 @@ class Constraints(list):
constraint_list = super(Constraints, self).copy()
return Constraints(constraint_list, is_possible=self._is_possible)
def copy(self) -> "Constraints":
return self.__copy__()
def __deepcopy__(self, memodict=None) -> "Constraints":
"""
@ -117,3 +120,6 @@ class Constraints(list):
constraint if isinstance(constraint, Bool) else Bool(constraint)
for constraint in constraints
]
def __hash__(self):
return tuple(self[:]).__hash__()

Loading…
Cancel
Save