Merge branch 'develop' into analyze-with-mythx

analyze-with-mythx
Nathan 5 years ago committed by GitHub
commit f37c62f4dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .circleci/config.yml
  2. 2
      mythril/__version__.py
  3. 3
      mythril/analysis/modules/base.py
  4. 105
      mythril/analysis/modules/delegatecall.py
  5. 242
      mythril/analysis/modules/dependence_on_predictable_vars.py
  6. 116
      mythril/analysis/modules/deprecated_ops.py
  7. 39
      mythril/analysis/modules/dos.py
  8. 19
      mythril/analysis/modules/ether_thief.py
  9. 93
      mythril/analysis/modules/exceptions.py
  10. 180
      mythril/analysis/modules/external_calls.py
  11. 55
      mythril/analysis/modules/integer.py
  12. 102
      mythril/analysis/modules/multiple_sends.py
  13. 26
      mythril/analysis/modules/state_change_external_calls.py
  14. 18
      mythril/analysis/modules/suicide.py
  15. 115
      mythril/analysis/modules/unchecked_retval.py
  16. 17
      mythril/analysis/solver.py
  17. 2
      mythril/interfaces/cli.py
  18. 2
      mythril/laser/ethereum/call.py
  19. 14
      mythril/laser/ethereum/instructions.py
  20. 60
      mythril/laser/ethereum/plugins/implementations/dependency_pruner.py
  21. 2
      mythril/laser/ethereum/state/account.py
  22. 6
      mythril/laser/ethereum/state/constraints.py
  23. 12
      mythril/laser/ethereum/strategy/extensions/bounded_loops.py
  24. 4
      mythril/laser/smt/__init__.py
  25. 46
      mythril/laser/smt/bitvec.py
  26. 4
      mythril/laser/smt/bitvecfunc.py
  27. 20
      mythril/laser/smt/bool.py
  28. 17
      mythril/laser/smt/expression.py

@ -107,7 +107,7 @@ jobs:
-e CIRCLE_BUILD_URL=$CIRCLE_BUILD_URL \
-e CIRCLE_WEBHOOK_URL=$CIRCLE_WEBHOOK_URL \
--rm edelweiss-mythril:latest \
--timeout 30 \
--timeout 45 \
--output-dir /opt/edelweiss \
--s3 \
--circle-ci CircleCI/mythril.csv \

@ -4,4 +4,4 @@ This file is suitable for sourcing inside POSIX shell, e.g. bash as well
as for importing into Python.
"""
__version__ = "v0.21.4"
__version__ = "v0.21.6"

@ -2,7 +2,7 @@
modules."""
import logging
from typing import List
from typing import List, Set
from mythril.analysis.report import Issue
log = logging.getLogger(__name__)
@ -35,6 +35,7 @@ class DetectionModule:
)
self.entrypoint = entrypoint
self._issues = [] # type: List[Issue]
self._cache = set() # type: Set[int]
@property
def issues(self):

@ -93,58 +93,63 @@ class DelegateCallModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_states(state))
def _analyze_states(state: GlobalState) -> List[Issue]:
"""
:param state: the current state
:return: returns the issues for that corresponding state
"""
issues = []
op_code = state.get_current_instruction()["opcode"]
annotations = cast(
List[DelegateCallAnnotation],
list(state.get_annotations(DelegateCallAnnotation)),
)
if len(annotations) == 0 and op_code in ("RETURN", "STOP"):
return []
if op_code == "DELEGATECALL":
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
constraints = [
to == ATTACKER_ADDRESS,
UGT(gas, symbol_factory.BitVecVal(2300, 256)),
]
for tx in state.world_state.transaction_sequence:
if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS)
state.annotate(DelegateCallAnnotation(state, constraints))
return []
else:
for annotation in annotations:
try:
transaction_sequence = solver.get_transaction_sequence(
state,
state.mstate.constraints
+ annotation.constraints
+ [annotation.return_value == 1],
)
issues.append(
annotation.get_issue(
state, transaction_sequence=transaction_sequence
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state: GlobalState) -> List[Issue]:
"""
:param state: the current state
:return: returns the issues for that corresponding state
"""
issues = []
op_code = state.get_current_instruction()["opcode"]
annotations = cast(
List[DelegateCallAnnotation],
list(state.get_annotations(DelegateCallAnnotation)),
)
if len(annotations) == 0 and op_code in ("RETURN", "STOP"):
return []
if op_code == "DELEGATECALL":
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
constraints = [
to == ATTACKER_ADDRESS,
UGT(gas, symbol_factory.BitVecVal(2300, 256)),
]
for tx in state.world_state.transaction_sequence:
if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS)
state.annotate(DelegateCallAnnotation(state, constraints))
return []
else:
for annotation in annotations:
try:
transaction_sequence = solver.get_transaction_sequence(
state,
state.mstate.constraints
+ annotation.constraints
+ [annotation.return_value == 1],
)
issues.append(
annotation.get_issue(
state, transaction_sequence=transaction_sequence
)
)
)
except UnsatError:
continue
except UnsatError:
continue
return issues
return issues
detector = DelegateCallModule()

@ -1,6 +1,7 @@
"""This module contains the detection code for predictable variable
dependence."""
import logging
from copy import copy
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue
@ -27,22 +28,25 @@ def is_prehook() -> bool:
class PredictableValueAnnotation:
"""Symbol annotation used if a variable is initialized from a predictable environment variable."""
def __init__(self, operation: str) -> None:
def __init__(self, operation: str, add_constraints=None) -> None:
self.operation = operation
self.add_constraints = add_constraints
class PredictablePathAnnotation(StateAnnotation):
"""State annotation used when a path is chosen based on a predictable variable."""
def __init__(self, operation: str, location: int) -> None:
def __init__(self, operation: str, location: int, add_constraints=None) -> None:
self.operation = operation
self.location = location
self.add_constraints = add_constraints
class OldBlockNumberUsedAnnotation(StateAnnotation):
"""State annotation set in blockhash prehook if the input value is lower than the current block number."""
def __init__(self) -> None:
def __init__(self, constraints) -> None:
self.block_constraints = constraints
pass
@ -70,135 +74,163 @@ class PredictableDependenceModule(DetectionModule):
:param state:
:return:
"""
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state: GlobalState) -> list:
"""
self._issues.extend(_analyze_states(state))
def _analyze_states(state: GlobalState) -> list:
"""
:param state:
:return:
"""
issues = []
if is_prehook():
opcode = state.get_current_instruction()["opcode"]
if opcode in final_ops:
:param state:
:return:
"""
for annotation in state.annotations:
issues = []
if is_prehook():
opcode = state.get_current_instruction()["opcode"]
if opcode in final_ops:
for annotation in state.annotations:
if isinstance(annotation, PredictablePathAnnotation):
if annotation.add_constraints:
constraints = (
state.mstate.constraints + annotation.add_constraints
)
else:
constraints = copy(state.mstate.constraints)
try:
transaction_sequence = solver.get_transaction_sequence(
state, constraints
)
except UnsatError:
continue
description = (
"The "
+ annotation.operation
+ " is used in to determine a control flow decision. "
)
description += (
"Note that the values of variables like coinbase, gaslimit, block number and timestamp "
"are predictable and can be manipulated by a malicious miner. Also keep in mind that attackers "
"know hashes of earlier blocks. Don't use any of those environment variables for random number "
"generation or to make critical control flow decisions."
)
if isinstance(annotation, PredictablePathAnnotation):
description = (
"The "
+ annotation.operation
+ " is used in to determine a control flow decision. "
)
description += (
"Note that the values of variables like coinbase, gaslimit, block number and timestamp "
"are predictable and can be manipulated by a malicious miner. Also keep in mind that attackers "
"know hashes of earlier blocks. Don't use any of those environment variables for random number "
"generation or to make critical control flow decisions."
)
"""
Usually report low severity except in cases where the hash of a previous block is used to
determine control flow.
"""
"""
Usually report low severity except in cases where the hash of a previous block is used to
determine control flow.
"""
severity = "Medium" if "hash" in annotation.operation else "Low"
severity = "Medium" if "hash" in annotation.operation else "Low"
"""
Note: We report the location of the JUMPI that lead to this path. Usually this maps to an if or
require statement.
"""
"""
Note: We report the location of the JUMPI that lead to this path. Usually this maps to an if or
require statement.
"""
swc_id = (
TIMESTAMP_DEPENDENCE
if "timestamp" in annotation.operation
else WEAK_RANDOMNESS
)
swc_id = (
TIMESTAMP_DEPENDENCE
if "timestamp" in annotation.operation
else WEAK_RANDOMNESS
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=annotation.location,
swc_id=swc_id,
bytecode=state.environment.code.bytecode,
title="Dependence on predictable environment variable",
severity=severity,
description_head="A control flow decision is made based on a predictable variable.",
description_tail=description,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
transaction_sequence=transaction_sequence,
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=annotation.location,
swc_id=swc_id,
bytecode=state.environment.code.bytecode,
title="Dependence on predictable environment variable",
severity=severity,
description_head="A control flow decision is made based on a predictable variable.",
description_tail=description,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
issues.append(issue)
elif opcode == "JUMPI":
elif opcode == "JUMPI":
# Look for predictable state variables in jump condition
# Look for predictable state variables in jump condition
for annotation in state.mstate.stack[-2].annotations:
for annotation in state.mstate.stack[-2].annotations:
if isinstance(annotation, PredictableValueAnnotation):
state.annotate(
PredictablePathAnnotation(
annotation.operation,
state.get_current_instruction()["address"],
if isinstance(annotation, PredictableValueAnnotation):
state.annotate(
PredictablePathAnnotation(
annotation.operation,
state.get_current_instruction()["address"],
add_constraints=annotation.add_constraints,
)
)
)
break
break
elif opcode == "BLOCKHASH":
elif opcode == "BLOCKHASH":
param = state.mstate.stack[-1]
param = state.mstate.stack[-1]
try:
constraint = [
ULT(param, state.environment.block_number),
ULT(
state.environment.block_number,
symbol_factory.BitVecVal(2 ** 255, 256),
),
]
try:
constraint = [
ULT(param, state.environment.block_number),
ULT(
state.environment.block_number,
symbol_factory.BitVecVal(2 ** 255, 256),
),
]
# Why the second constraint? Because without it Z3 returns a solution where param overflows.
# Why the second constraint? Because without it Z3 returns a solution where param overflows.
solver.get_model(constraint)
state.annotate(OldBlockNumberUsedAnnotation())
solver.get_model(state.mstate.constraints + constraint)
state.annotate(OldBlockNumberUsedAnnotation(constraint))
except UnsatError:
pass
except UnsatError:
pass
else:
# we're in post hook
opcode = state.environment.code.instruction_list[state.mstate.pc - 1]["opcode"]
else:
# we're in post hook
if opcode == "BLOCKHASH":
# if we're in the post hook of a BLOCKHASH op, check if an old block number was used to create it.
opcode = state.environment.code.instruction_list[state.mstate.pc - 1][
"opcode"
]
annotations = cast(
List[OldBlockNumberUsedAnnotation],
list(state.get_annotations(OldBlockNumberUsedAnnotation)),
)
if opcode == "BLOCKHASH":
# if we're in the post hook of a BLOCKHASH op, check if an old block number was used to create it.
if len(annotations):
state.mstate.stack[-1].annotate(
PredictableValueAnnotation("block hash of a previous block")
annotations = cast(
List[OldBlockNumberUsedAnnotation],
list(state.get_annotations(OldBlockNumberUsedAnnotation)),
)
else:
# Always create an annotation when COINBASE, GASLIMIT, TIMESTAMP or NUMBER is executed.
state.mstate.stack[-1].annotate(
PredictableValueAnnotation(
"block.{} environment variable".format(opcode.lower())
if len(annotations):
# We can append any block constraint here
state.mstate.stack[-1].annotate(
PredictableValueAnnotation(
"block hash of a previous block",
add_constraints=annotations[0].block_constraints,
)
)
else:
# Always create an annotation when COINBASE, GASLIMIT, TIMESTAMP or NUMBER is executed.
state.mstate.stack[-1].annotate(
PredictableValueAnnotation(
"block.{} environment variable".format(opcode.lower())
)
)
)
return issues
return issues
detector = PredictableDependenceModule()

@ -1,5 +1,6 @@
"""This module contains the detection code for deprecated op codes."""
from mythril.analysis.report import Issue
from mythril.analysis.solver import get_transaction_sequence, UnsatError
from mythril.analysis.swc_data import DEPRECATED_FUNCTIONS_USAGE
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.global_state import GlobalState
@ -12,57 +13,6 @@ Check for usage of deprecated opcodes
"""
def _analyze_state(state):
"""
:param state:
:return:
"""
node = state.node
instruction = state.get_current_instruction()
if instruction["opcode"] == "ORIGIN":
log.debug("ORIGIN in function " + node.function_name)
title = "Use of tx.origin"
description_head = "Use of tx.origin is deprecated."
description_tail = (
"The smart contract retrieves the transaction origin (tx.origin) using msg.origin. "
"Use of msg.origin is deprecated and the instruction may be removed in the future. "
"Use msg.sender instead.\nSee also: "
"https://solidity.readthedocs.io/en/develop/security-considerations.html#tx-origin".format(
state.environment.active_function_name
)
)
swc_id = DEPRECATED_FUNCTIONS_USAGE
elif instruction["opcode"] == "CALLCODE":
log.debug("CALLCODE in function " + state.environment.active_function_name)
title = "Use of callcode"
description_head = "Use of callcode is deprecated."
description_tail = (
"The callcode method executes code of another contract in the context of the caller account. "
"Due to a bug in the implementation it does not persist sender and value over the call. It was "
"therefore deprecated and may be removed in the future. Use the delegatecall method instead."
)
swc_id = DEPRECATED_FUNCTIONS_USAGE
else:
return
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=instruction["address"],
title=title,
bytecode=state.environment.code.bytecode,
swc_id=swc_id,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return [issue]
class DeprecatedOperationsModule(DetectionModule):
"""This module checks for the usage of deprecated op codes."""
@ -82,7 +32,69 @@ class DeprecatedOperationsModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state):
"""
:param state:
:return:
"""
node = state.node
instruction = state.get_current_instruction()
if instruction["opcode"] == "ORIGIN":
log.debug("ORIGIN in function " + node.function_name)
title = "Use of tx.origin"
description_head = "Use of tx.origin is deprecated."
description_tail = (
"The smart contract retrieves the transaction origin (tx.origin) using msg.origin. "
"Use of msg.origin is deprecated and the instruction may be removed in the future. "
"Use msg.sender instead.\nSee also: "
"https://solidity.readthedocs.io/en/develop/security-considerations.html#tx-origin".format(
state.environment.active_function_name
)
)
swc_id = DEPRECATED_FUNCTIONS_USAGE
elif instruction["opcode"] == "CALLCODE":
log.debug("CALLCODE in function " + state.environment.active_function_name)
title = "Use of callcode"
description_head = "Use of callcode is deprecated."
description_tail = (
"The callcode method executes code of another contract in the context of the caller account. "
"Due to a bug in the implementation it does not persist sender and value over the call. It was "
"therefore deprecated and may be removed in the future. Use the delegatecall method instead."
)
swc_id = DEPRECATED_FUNCTIONS_USAGE
else:
return
try:
transaction_sequence = get_transaction_sequence(
state, state.mstate.constraints
)
except UnsatError:
return
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=instruction["address"],
title=title,
bytecode=state.environment.code.bytecode,
swc_id=swc_id,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
transaction_sequence=transaction_sequence,
)
return [issue]
detector = DeprecatedOperationsModule()

@ -6,6 +6,7 @@ from typing import Dict, cast, List
from mythril.analysis.swc_data import DOS_WITH_BLOCK_GAS_LIMIT
from mythril.analysis.report import Issue
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.solver import get_transaction_sequence, UnsatError
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.laser.ethereum import util
@ -18,8 +19,8 @@ class VisitsAnnotation(StateAnnotation):
"""State annotation that stores the addresses of state-modifying operations"""
def __init__(self) -> None:
self.loop_start = None
self.jump_targets = [] # type: List[int]
self.loop_start = None # type: int
self.jump_targets = {} # type: Dict[int, int]
def __copy__(self):
result = VisitsAnnotation()
@ -29,7 +30,7 @@ class VisitsAnnotation(StateAnnotation):
return result
class DOS(DetectionModule):
class DosModule(DetectionModule):
"""This module consists of a makeshift loop detector that annotates the state with
a list of byte ranges likely to be loops. If a CALL or SSTORE detection is found in
one of the ranges it creates a low-severity issue. This is not super precise but
@ -53,10 +54,10 @@ class DOS(DetectionModule):
:param state:
:return:
"""
issues = self._analyze_state(state)
self._issues.extend(issues)
self._issues.extend(self._analyze_states(state))
def _analyze_states(self, state: GlobalState) -> List[Issue]:
def _analyze_state(self, state: GlobalState) -> List[Issue]:
"""
:param state: the current state
:return: returns the issues for that corresponding state
@ -76,13 +77,19 @@ class DOS(DetectionModule):
annotation = annotations[0]
if opcode in ["JUMP", "JUMPI"]:
if annotation.loop_start is not None:
return []
target = util.get_concrete_int(state.mstate.stack[-1])
if annotation.loop_start is None:
if target in annotation.jump_targets:
annotation.loop_start = address
else:
annotation.jump_targets.append(target)
if target in annotation.jump_targets:
annotation.jump_targets[target] += 1
else:
annotation.jump_targets[target] = 1
if annotation.jump_targets[target] > 2:
annotation.loop_start = address
elif annotation.loop_start is not None:
@ -98,6 +105,13 @@ class DOS(DetectionModule):
operation
)
try:
transaction_sequence = get_transaction_sequence(
state, state.mstate.constraints
)
except UnsatError:
return []
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
@ -109,6 +123,7 @@ class DOS(DetectionModule):
description_head=description_head,
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
transaction_sequence=transaction_sequence,
)
return [issue]
@ -116,4 +131,4 @@ class DOS(DetectionModule):
return []
detector = DOS()
detector = DosModule()

@ -47,7 +47,6 @@ class EtherThief(DetectionModule):
entrypoint="callback",
pre_hooks=["CALL"],
)
self._cache_addresses = {}
def reset_module(self):
"""
@ -55,7 +54,6 @@ class EtherThief(DetectionModule):
:return:
"""
super().reset_module()
self._cache_addresses = {}
def _execute(self, state: GlobalState) -> None:
"""
@ -63,9 +61,15 @@ class EtherThief(DetectionModule):
:param state:
:return:
"""
self._issues.extend(self._analyze_state(state))
def _analyze_state(self, state):
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state):
"""
:param state:
@ -77,8 +81,7 @@ class EtherThief(DetectionModule):
return []
address = instruction["address"]
if self._cache_addresses.get(address, False):
return []
value = state.mstate.stack[-3]
target = state.mstate.stack[-2]
@ -141,8 +144,6 @@ class EtherThief(DetectionModule):
log.debug("[ETHER_THIEF] no model found")
return []
# self._cache_addresses[address] = True
return [issue]

@ -12,50 +12,6 @@ from mythril.laser.ethereum.state.global_state import GlobalState
log = logging.getLogger(__name__)
def _analyze_state(state) -> list:
"""
:param state:
:return:
"""
log.debug("ASSERT_FAIL in function " + state.environment.active_function_name)
try:
address = state.get_current_instruction()["address"]
description_tail = (
"It is possible to trigger an exception (opcode 0xfe). "
"Exceptions can be caused by type errors, division by zero, "
"out-of-bounds array access, or assert violations. "
"Note that explicit `assert()` should only be used to check invariants. "
"Use `require()` for regular input checking."
)
transaction_sequence = solver.get_transaction_sequence(
state, state.mstate.constraints
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=ASSERT_VIOLATION,
title="Exception State",
severity="Low",
description_head="A reachable exception has been detected.",
description_tail=description_tail,
bytecode=state.environment.code.bytecode,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return [issue]
except UnsatError:
log.debug("no model found")
return []
class ReachableExceptionsModule(DetectionModule):
""""""
@ -75,7 +31,54 @@ class ReachableExceptionsModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state) -> list:
"""
:param state:
:return:
"""
log.debug("ASSERT_FAIL in function " + state.environment.active_function_name)
try:
address = state.get_current_instruction()["address"]
description_tail = (
"It is possible to trigger an exception (opcode 0xfe). "
"Exceptions can be caused by type errors, division by zero, "
"out-of-bounds array access, or assert violations. "
"Note that explicit `assert()` should only be used to check invariants. "
"Use `require()` for regular input checking."
)
transaction_sequence = solver.get_transaction_sequence(
state, state.mstate.constraints
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=ASSERT_VIOLATION,
title="Exception State",
severity="Low",
description_head="A reachable exception has been detected.",
description_tail=description_tail,
bytecode=state.environment.code.bytecode,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return [issue]
except UnsatError:
log.debug("no model found")
return []
detector = ReachableExceptionsModule()

@ -27,93 +27,6 @@ an informational issue.
"""
def _analyze_state(state):
"""
:param state:
:return:
"""
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
address = state.get_current_instruction()["address"]
try:
constraints = copy(state.mstate.constraints)
transaction_sequence = solver.get_transaction_sequence(
state, constraints + [UGT(gas, symbol_factory.BitVecVal(2300, 256))]
)
# Check whether we can also set the callee address
try:
constraints += [to == ATTACKER_ADDRESS]
for tx in state.world_state.transaction_sequence:
if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS)
transaction_sequence = solver.get_transaction_sequence(state, constraints)
description_head = "A call to a user-supplied address is executed."
description_tail = (
"The callee address of an external message call can be set by "
"the caller. Note that the callee can contain arbitrary code and may re-enter any function "
"in this contract. Review the business logic carefully to prevent averse effects on the "
"contract state."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=REENTRANCY,
title="External Call To User-Supplied Address",
bytecode=state.environment.code.bytecode,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
if _is_precompile_call(state):
return []
log.debug(
"[EXTERNAL_CALLS] Callee address cannot be modified. Reporting informational issue."
)
debug = json.dumps(transaction_sequence, indent=4)
description_head = "The contract executes an external message call."
description_tail = (
"An external function call to a fixed contract address is executed. Make sure "
"that the callee contract has been reviewed carefully."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=REENTRANCY,
title="External Call To Fixed Address",
bytecode=state.environment.code.bytecode,
severity="Low",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
log.debug("[EXTERNAL_CALLS] No model found.")
return []
return [issue]
def _is_precompile_call(global_state: GlobalState):
to = global_state.mstate.stack[-2] # type: BitVec
constraints = copy(global_state.mstate.constraints)
@ -151,7 +64,98 @@ class ExternalCalls(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state):
"""
:param state:
:return:
"""
gas = state.mstate.stack[-1]
to = state.mstate.stack[-2]
address = state.get_current_instruction()["address"]
try:
constraints = copy(state.mstate.constraints)
transaction_sequence = solver.get_transaction_sequence(
state, constraints + [UGT(gas, symbol_factory.BitVecVal(2300, 256))]
)
# Check whether we can also set the callee address
try:
constraints += [to == ATTACKER_ADDRESS]
for tx in state.world_state.transaction_sequence:
if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS)
transaction_sequence = solver.get_transaction_sequence(
state, constraints
)
description_head = "A call to a user-supplied address is executed."
description_tail = (
"The callee address of an external message call can be set by "
"the caller. Note that the callee can contain arbitrary code and may re-enter any function "
"in this contract. Review the business logic carefully to prevent averse effects on the "
"contract state."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=REENTRANCY,
title="External Call To User-Supplied Address",
bytecode=state.environment.code.bytecode,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
if _is_precompile_call(state):
return []
log.debug(
"[EXTERNAL_CALLS] Callee address cannot be modified. Reporting informational issue."
)
description_head = "The contract executes an external message call."
description_tail = (
"An external function call to a fixed contract address is executed. Make sure "
"that the callee contract has been reviewed carefully."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=REENTRANCY,
title="External Call To Fixed Address",
bytecode=state.environment.code.bytecode,
severity="Low",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
log.debug("[EXTERNAL_CALLS] No model found.")
return []
return [issue]
detector = ExternalCalls()

@ -1,8 +1,6 @@
"""This module contains the detection code for integer overflows and
underflows."""
import json
from math import log2, ceil
from typing import cast, List, Dict, Set
from mythril.analysis import solver
@ -42,13 +40,16 @@ class OverUnderflowAnnotation:
self.operator = operator
self.constraint = constraint
def __deepcopy__(self, memodict={}):
new_annotation = copy(self)
return new_annotation
class OverUnderflowStateAnnotation(StateAnnotation):
""" State Annotation used if an overflow is both possible and used in the annotated path"""
def __init__(self) -> None:
self.overflowing_state_annotations = [] # type: List[OverUnderflowAnnotation]
self.ostates_seen = set() # type: Set[GlobalState]
self.overflowing_state_annotations = set() # type: Set[OverUnderflowAnnotation]
def __copy__(self):
new_annotation = OverUnderflowStateAnnotation()
@ -56,7 +57,6 @@ class OverUnderflowStateAnnotation(StateAnnotation):
new_annotation.overflowing_state_annotations = copy(
self.overflowing_state_annotations
)
new_annotation.ostates_seen = copy(self.ostates_seen)
return new_annotation
@ -88,12 +88,6 @@ class IntegerOverflowUnderflowModule(DetectionModule):
],
)
"""
Cache addresses for which overflows or underflows already have been detected.
"""
self._overflow_cache = {} # type: Dict[int, bool]
"""
Cache satisfiability of overflow constraints
"""
@ -107,7 +101,6 @@ class IntegerOverflowUnderflowModule(DetectionModule):
:return:
"""
super().reset_module()
self._overflow_cache = {}
self._ostates_satisfiable = set()
self._ostates_unsatisfiable = set()
@ -120,7 +113,7 @@ class IntegerOverflowUnderflowModule(DetectionModule):
address = _get_address_from_state(state)
if self._overflow_cache.get(address, False):
if address in self._cache:
return
opcode = state.get_current_instruction()["opcode"]
@ -236,13 +229,8 @@ class IntegerOverflowUnderflowModule(DetectionModule):
state_annotation = _get_overflowunderflow_state_annotation(state)
for annotation in value.annotations:
if (
not isinstance(annotation, OverUnderflowAnnotation)
or annotation.overflowing_state in state_annotation.ostates_seen
):
continue
state_annotation.overflowing_state_annotations.append(annotation)
state_annotation.ostates_seen.add(annotation.overflowing_state)
if isinstance(annotation, OverUnderflowAnnotation):
state_annotation.overflowing_state_annotations.add(annotation)
@staticmethod
def _handle_jumpi(state):
@ -253,13 +241,8 @@ class IntegerOverflowUnderflowModule(DetectionModule):
state_annotation = _get_overflowunderflow_state_annotation(state)
for annotation in value.annotations:
if (
not isinstance(annotation, OverUnderflowAnnotation)
or annotation.overflowing_state in state_annotation.ostates_seen
):
continue
state_annotation.overflowing_state_annotations.append(annotation)
state_annotation.ostates_seen.add(annotation.overflowing_state)
if isinstance(annotation, OverUnderflowAnnotation):
state_annotation.overflowing_state_annotations.add(annotation)
@staticmethod
def _handle_call(state):
@ -270,13 +253,8 @@ class IntegerOverflowUnderflowModule(DetectionModule):
state_annotation = _get_overflowunderflow_state_annotation(state)
for annotation in value.annotations:
if (
not isinstance(annotation, OverUnderflowAnnotation)
or annotation.overflowing_state in state_annotation.ostates_seen
):
continue
state_annotation.overflowing_state_annotations.append(annotation)
state_annotation.ostates_seen.add(annotation.overflowing_state)
if isinstance(annotation, OverUnderflowAnnotation):
state_annotation.overflowing_state_annotations.add(annotation)
@staticmethod
def _handle_return(state: GlobalState) -> None:
@ -300,11 +278,8 @@ class IntegerOverflowUnderflowModule(DetectionModule):
continue
for annotation in element.annotations:
if (
isinstance(annotation, OverUnderflowAnnotation)
and annotation not in state_annotation.overflowing_state_annotations
):
state_annotation.overflowing_state_annotations.append(annotation)
if isinstance(annotation, OverUnderflowAnnotation):
state_annotation.overflowing_state_annotations.add(annotation)
def _handle_transaction_end(self, state: GlobalState) -> None:
@ -359,7 +334,7 @@ class IntegerOverflowUnderflowModule(DetectionModule):
)
address = _get_address_from_state(ostate)
self._overflow_cache[address] = True
self._cache.add(address)
self._issues.append(issue)

@ -4,6 +4,7 @@ from copy import copy
from typing import cast, List
from mythril.analysis.report import Issue
from mythril.analysis.solver import get_transaction_sequence, UnsatError
from mythril.analysis.swc_data import MULTIPLE_SENDS
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.annotation import StateAnnotation
@ -44,57 +45,68 @@ class MultipleSendsModule(DetectionModule):
)
def _execute(self, state: GlobalState) -> None:
self._issues.extend(_analyze_state(state))
def _analyze_state(state: GlobalState):
"""
:param state: the current state
:return: returns the issues for that corresponding state
"""
instruction = state.get_current_instruction()
annotations = cast(
List[MultipleSendsAnnotation],
list(state.get_annotations(MultipleSendsAnnotation)),
)
if len(annotations) == 0:
state.annotate(MultipleSendsAnnotation())
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state: GlobalState):
"""
:param state: the current state
:return: returns the issues for that corresponding state
"""
instruction = state.get_current_instruction()
annotations = cast(
List[MultipleSendsAnnotation],
list(state.get_annotations(MultipleSendsAnnotation)),
)
call_offsets = annotations[0].call_offsets
if instruction["opcode"] in ["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"]:
call_offsets.append(state.get_current_instruction()["address"])
else: # RETURN or STOP
for offset in call_offsets[1:]:
description_tail = (
"This call is executed after a previous call in the same transaction. "
"Try to isolate each call, transfer or send into its own transaction."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=offset,
swc_id=MULTIPLE_SENDS,
bytecode=state.environment.code.bytecode,
title="Multiple Calls in a Single Transaction",
severity="Low",
description_head="Multiple calls are executed in the same transaction.",
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
if len(annotations) == 0:
state.annotate(MultipleSendsAnnotation())
annotations = cast(
List[MultipleSendsAnnotation],
list(state.get_annotations(MultipleSendsAnnotation)),
)
return [issue]
return []
call_offsets = annotations[0].call_offsets
if instruction["opcode"] in ["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"]:
call_offsets.append(state.get_current_instruction()["address"])
else: # RETURN or STOP
for offset in call_offsets[1:]:
try:
transaction_sequence = get_transaction_sequence(
state, state.mstate.constraints
)
except UnsatError:
continue
description_tail = (
"This call is executed after a previous call in the same transaction. "
"Try to isolate each call, transfer or send into its own transaction."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=offset,
swc_id=MULTIPLE_SENDS,
bytecode=state.environment.code.bytecode,
title="Multiple Calls in a Single Transaction",
severity="Low",
description_head="Multiple calls are executed in the same transaction.",
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
transaction_sequence=transaction_sequence,
)
return [issue]
return []
detector = MultipleSendsModule()

@ -35,7 +35,25 @@ class StateChangeCallsAnnotation(StateAnnotation):
def get_issue(self, global_state: GlobalState) -> Optional[Issue]:
if not self.state_change_states:
return None
constraints = copy(global_state.mstate.constraints)
gas = self.call_state.mstate.stack[-1]
to = self.call_state.mstate.stack[-2]
constraints += [
UGT(gas, symbol_factory.BitVecVal(2300, 256)),
Or(
to > symbol_factory.BitVecVal(16, 256),
to == symbol_factory.BitVecVal(0, 256),
),
]
if self.user_defined_address:
constraints += [to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF]
try:
transaction_sequence = solver.get_transaction_sequence(
global_state, constraints
)
except UnsatError:
return None
severity = "Medium" if self.user_defined_address else "Low"
address = global_state.get_current_instruction()["address"]
logging.debug(
@ -59,6 +77,7 @@ class StateChangeCallsAnnotation(StateAnnotation):
description_tail=description_tail,
swc_id=REENTRANCY,
bytecode=global_state.environment.code.bytecode,
transaction_sequence=transaction_sequence,
)
@ -85,7 +104,12 @@ class StateChange(DetectionModule):
)
def _execute(self, state: GlobalState) -> None:
self._issues.extend(self._analyze_state(state))
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _add_external_call(global_state: GlobalState) -> None:

@ -39,7 +39,6 @@ class SuicideModule(DetectionModule):
:return:
"""
super().reset_module()
self._cache_address = {}
def _execute(self, state: GlobalState) -> None:
"""
@ -47,13 +46,18 @@ class SuicideModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(self._analyze_state(state))
def _analyze_state(self, state):
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
@staticmethod
def _analyze_state(state):
log.info("Suicide module: Analyzing suicide instruction")
instruction = state.get_current_instruction()
if self._cache_address.get(instruction["address"], False):
return []
to = state.mstate.stack[-1]
log.debug(
@ -84,8 +88,6 @@ class SuicideModule(DetectionModule):
)
description_tail = "Arbitrary senders can kill this contract."
self._cache_address[instruction["address"]] = True
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,

@ -55,70 +55,77 @@ class UncheckedRetvalModule(DetectionModule):
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
def _analyze_state(self, state: GlobalState) -> list:
instruction = state.get_current_instruction()
def _analyze_state(state: GlobalState) -> list:
instruction = state.get_current_instruction()
annotations = cast(
List[UncheckedRetvalAnnotation],
[a for a in state.get_annotations(UncheckedRetvalAnnotation)],
)
if len(annotations) == 0:
state.annotate(UncheckedRetvalAnnotation())
annotations = cast(
List[UncheckedRetvalAnnotation],
[a for a in state.get_annotations(UncheckedRetvalAnnotation)],
)
retvals = annotations[0].retvals
if instruction["opcode"] in ("STOP", "RETURN"):
issues = []
for retval in retvals:
try:
solver.get_model(state.mstate.constraints + [retval["retval"] == 0])
except UnsatError:
continue
description_tail = (
"External calls return a boolean value. If the callee contract halts with an exception, 'false' is "
"returned and execution continues in the caller. It is usually recommended to wrap external calls "
"into a require statement to prevent unexpected states."
if len(annotations) == 0:
state.annotate(UncheckedRetvalAnnotation())
annotations = cast(
List[UncheckedRetvalAnnotation],
[a for a in state.get_annotations(UncheckedRetvalAnnotation)],
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=retval["address"],
bytecode=state.environment.code.bytecode,
title="Unchecked Call Return Value",
swc_id=UNCHECKED_RET_VAL,
severity="Low",
description_head="The return value of a message call is not checked.",
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
retvals = annotations[0].retvals
if instruction["opcode"] in ("STOP", "RETURN"):
issues = []
for retval in retvals:
try:
transaction_sequence = solver.get_transaction_sequence(
state, state.mstate.constraints + [retval["retval"] == 0]
)
except UnsatError:
continue
description_tail = (
"External calls return a boolean value. If the callee contract halts with an exception, 'false' is "
"returned and execution continues in the caller. It is usually recommended to wrap external calls "
"into a require statement to prevent unexpected states."
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=retval["address"],
bytecode=state.environment.code.bytecode,
title="Unchecked Call Return Value",
swc_id=UNCHECKED_RET_VAL,
severity="Low",
description_head="The return value of a message call is not checked.",
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
transaction_sequence=transaction_sequence,
)
issues.append(issue)
return issues
else:
log.debug("End of call, extracting retval")
assert state.environment.code.instruction_list[state.mstate.pc - 1][
"opcode"
] in ["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"]
retval = state.mstate.stack[-1]
# Use Typed Dict after release of mypy 0.670 and remove type ignore
retvals.append(
{ # type: ignore
"address": state.instruction["address"] - 1,
"retval": retval,
}
)
issues.append(issue)
return issues
else:
log.debug("End of call, extracting retval")
assert state.environment.code.instruction_list[state.mstate.pc - 1][
"opcode"
] in ["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"]
retval = state.mstate.stack[-1]
# Use Typed Dict after release of mypy 0.670 and remove type ignore
retvals.append(
{ # type: ignore
"address": state.instruction["address"] - 1,
"retval": retval,
}
)
return []
return []
detector = UncheckedRetvalModule()

@ -1,10 +1,10 @@
"""This module contains analysis module helpers to solve path constraints."""
from typing import Dict, List, Union
from functools import lru_cache
from typing import Dict, Tuple, Union
from z3 import sat, unknown, FuncInterp
import z3
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.world_state import Account
from mythril.laser.ethereum.state.constraints import Constraints
from mythril.laser.ethereum.transaction import BaseTransaction
from mythril.laser.smt import UGE, Optimize, symbol_factory
@ -17,7 +17,8 @@ import logging
log = logging.getLogger(__name__)
# LRU cache works great when used in powers of 2
@lru_cache(maxsize=2 ** 23)
def get_model(constraints, minimize=(), maximize=(), enforce_execution_time=True):
"""
@ -95,7 +96,11 @@ def get_transaction_sequence(
tx_constraints, minimize = _set_minimisation_constraints(
transaction_sequence, constraints.copy(), [], 5000
)
model = get_model(tx_constraints, minimize=minimize)
try:
model = get_model(tx_constraints, minimize=minimize)
except UnsatError:
raise UnsatError
min_price_dict = {} # type: Dict[str, int]
for transaction in transaction_sequence:
@ -166,7 +171,7 @@ def _get_concrete_transaction(model: z3.Model, transaction: BaseTransaction):
def _set_minimisation_constraints(
transaction_sequence, constraints, minimize, max_size
):
) -> Tuple[Constraints, tuple]:
""" Set constraints that minimise key transaction values
Constraints generated:
@ -188,4 +193,4 @@ def _set_minimisation_constraints(
minimize.append(transaction.call_data.calldatasize)
minimize.append(transaction.call_value)
return constraints, minimize
return constraints, tuple(minimize)

@ -380,7 +380,7 @@ def create_analyzer_parser(analyzer_parser: ArgumentParser):
"-b",
"--loop-bound",
type=int,
default=2,
default=3,
help="Bound loops at n iterations",
metavar="N",
)

@ -96,7 +96,7 @@ def get_callee_address(
# attempt to read the contract address from instance storage
try:
callee_address = dynamic_loader.read_storage(
hex(environment.active_account.address.value), index
"0x{:040X}".format(environment.active_account.address.value), index
)
# TODO: verify whether this happens or not
except:

@ -4,7 +4,7 @@ import binascii
import logging
from copy import copy, deepcopy
from typing import cast, Callable, List, Union, Tuple
from typing import cast, Callable, List, Set, Union, Tuple, Any
from datetime import datetime
from math import ceil
from ethereum import utils
@ -553,7 +553,7 @@ class Instruction:
+ str(hash(simplify(exponent)))
+ ")",
256,
base.annotations + exponent.annotations,
base.annotations.union(exponent.annotations),
)
) # Hash is used because str(symbol) takes a long time to be converted to a string
else:
@ -562,7 +562,7 @@ class Instruction:
symbol_factory.BitVecVal(
pow(base.value, exponent.value, 2 ** 256),
256,
annotations=base.annotations + exponent.annotations,
annotations=base.annotations.union(exponent.annotations),
)
)
@ -925,15 +925,15 @@ class Instruction:
if data.symbolic:
annotations = []
annotations = set() # type: Set[Any]
for b in state.memory[index : index + length]:
if isinstance(b, BitVec):
annotations += b.annotations
annotations = annotations.union(b.annotations)
argument_str = str(state.memory[index]).replace(" ", "_")
argument_hash = hash(state.memory[index])
result = symbol_factory.BitVecFuncSym(
"KECCAC[{}]".format(argument_str),
"KECCAC[invhash({})]".format(hash(argument_hash)),
"keccak256",
256,
input_=data,

@ -44,11 +44,15 @@ class DependencyAnnotation(StateAnnotation):
return self.storage_written[iteration]
def extend_storage_write_cache(self, iteration: int, value: object):
if iteration not in self.storage_written:
self.storage_written[iteration] = [value]
else:
if value not in self.storage_written[iteration]:
self.storage_written[iteration].append(value)
try:
if iteration not in self.storage_written:
self.storage_written[iteration] = [value]
else:
if value not in self.storage_written[iteration]:
self.storage_written[iteration].append(value)
except Z3Exception as e:
# FIXME: This should not happen unless there's a bug in laser such as a BitVec512 being generated
log.debug("Error updating storage write cache: {}".format(e))
class WSDependencyAnnotation(StateAnnotation):
@ -146,6 +150,12 @@ class DependencyPruner(LaserPlugin):
:param target_location
"""
log.debug(
"Updating dependency map for path: {} with location: {}".format(
path, target_location
)
)
try:
for address in path:
if address in self.dependency_map:
@ -154,7 +164,7 @@ class DependencyPruner(LaserPlugin):
else:
self.dependency_map[address] = [target_location]
except Z3Exception as e:
# This should not happen unless there's a bug in laser, such as an invalid type being generated.
# FIXME: This should not happen unless there's a bug in laser such as a BitVec512 being generated
log.debug("Error updating dependency map: {}".format(e))
def protect_path(self, path: List[int]) -> None:
@ -166,13 +176,17 @@ class DependencyPruner(LaserPlugin):
for address in path:
self.protected_addresses.add(address)
def wanna_execute(self, address: int, storage_write_cache) -> bool:
def wanna_execute(self, address: int, annotation: DependencyAnnotation) -> bool:
"""Decide whether the basic block starting at 'address' should be executed.
:param address
:param storage_write_cache
"""
storage_write_cache = annotation.get_storage_write_cache(self.iteration - 1)
# Execute the block if it's marked as "protected" or doesn't yet have an entry in the dependency map.
if address in self.protected_addresses or address not in self.dependency_map:
return True
@ -183,8 +197,20 @@ class DependencyPruner(LaserPlugin):
for location in storage_write_cache:
for dependency in dependencies:
# Is there a known read operation along this path that matches a write in the previous transaction?
try:
solver.get_model((location == dependency,))
return True
except UnsatError:
continue
# Has the *currently executed* path been influenced by a write operation in the previous transaction?
for dependency in annotation.storage_loaded:
try:
solver.get_model([location == dependency])
solver.get_model((location == dependency,))
return True
except UnsatError:
continue
@ -212,27 +238,25 @@ class DependencyPruner(LaserPlugin):
@symbolic_vm.post_hook("JUMP")
def jump_hook(state: GlobalState):
address = state.get_current_instruction()["address"]
annotation = get_dependency_annotation(state)
_check_basic_block(address, annotation)
@symbolic_vm.pre_hook("JUMPDEST")
def jumpdest_hook(state: GlobalState):
address = state.get_current_instruction()["address"]
annotation = get_dependency_annotation(state)
annotation.path.append(address)
_check_basic_block(address, annotation)
@symbolic_vm.post_hook("JUMPI")
def jumpi_hook(state: GlobalState):
address = state.get_current_instruction()["address"]
annotation = get_dependency_annotation(state)
annotation.path.append(address)
_check_basic_block(address, annotation)
@symbolic_vm.pre_hook("SSTORE")
def sstore_hook(state: GlobalState):
annotation = get_dependency_annotation(state)
annotation.extend_storage_write_cache(
self.iteration, state.mstate.stack[-1]
)
@ -284,15 +308,11 @@ class DependencyPruner(LaserPlugin):
if self.iteration < 2:
return
annotation.path.append(address)
if self.wanna_execute(
address, annotation.get_storage_write_cache(self.iteration - 1)
):
if self.wanna_execute(address, annotation):
return
else:
log.debug(
"Skipping state: Storage slots {} not read in block at address {}".format(
"Skipping state: Storage slots {} not read in block at address {}, function".format(
annotation.get_storage_write_cache(self.iteration - 1), address
)
)

@ -64,7 +64,7 @@ class Storage:
storage[item] = symbol_factory.BitVecVal(
int(
self.dynld.read_storage(
contract_address=hex(self.address.value),
contract_address="0x{:040X}".format(self.address.value),
index=int(item.value),
),
16,

@ -82,6 +82,9 @@ class Constraints(list):
constraint_list = super(Constraints, self).copy()
return Constraints(constraint_list, is_possible=self._is_possible)
def copy(self) -> "Constraints":
return self.__copy__()
def __deepcopy__(self, memodict=None) -> "Constraints":
"""
@ -117,3 +120,6 @@ class Constraints(list):
constraint if isinstance(constraint, Bool) else Bool(constraint)
for constraint in constraints
]
def __hash__(self):
return tuple(self[:]).__hash__()

@ -1,7 +1,7 @@
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.strategy.basic import BasicSearchStrategy
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.laser.ethereum import util
from mythril.laser.ethereum.transaction import ContractCreationTransaction
from typing import Dict, cast, List
from copy import copy
import logging
@ -71,7 +71,15 @@ class BoundedLoopsStrategy(BasicSearchStrategy):
else:
annotation._reached_count[address] = 1
if annotation._reached_count[address] > self.bound:
# The creation transaction gets a higher loop bound to give it a better chance of success.
# TODO: There's probably a nicer way to do this
if isinstance(
state.current_transaction, ContractCreationTransaction
) and annotation._reached_count[address] < max(8, self.bound):
return state
elif annotation._reached_count[address] > self.bound:
log.debug("Loop bound reached, skipping state")
continue

@ -22,11 +22,11 @@ from mythril.laser.smt.array import K, Array, BaseArray
from mythril.laser.smt.solver import Solver, Optimize, SolverStatistics
from mythril.laser.smt.model import Model
from typing import Union, Any, Optional, List, TypeVar, Generic
from typing import Union, Any, Optional, Set, TypeVar, Generic
import z3
Annotations = Optional[List[Any]]
Annotations = Optional[Set[Any]]
T = TypeVar("T", bound=Union[bool.Bool, z3.BoolRef])
U = TypeVar("U", bound=Union[BitVec, z3.BitVecRef])

@ -1,13 +1,13 @@
"""This module provides classes for an SMT abstraction of bit vectors."""
from typing import Union, overload, List, cast, Any, Optional, Callable
from typing import Union, overload, List, Set, cast, Any, Optional, Callable
from operator import lshift, rshift
import z3
from mythril.laser.smt.bool import Bool, And, Or
from mythril.laser.smt.expression import Expression
Annotations = List[Any]
Annotations = Set[Any]
# fmt: off
@ -61,7 +61,7 @@ class BitVec(Expression[z3.BitVecRef]):
if isinstance(other, int):
return BitVec(self.raw + other, annotations=self.annotations)
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return BitVec(self.raw + other.raw, annotations=union)
def __sub__(self, other: Union[int, "BitVec"]) -> "BitVec":
@ -75,7 +75,7 @@ class BitVec(Expression[z3.BitVecRef]):
if isinstance(other, int):
return BitVec(self.raw - other, annotations=self.annotations)
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return BitVec(self.raw - other.raw, annotations=union)
def __mul__(self, other: "BitVec") -> "BitVec":
@ -86,7 +86,7 @@ class BitVec(Expression[z3.BitVecRef]):
"""
if isinstance(other, BitVecFunc):
return other * self
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return BitVec(self.raw * other.raw, annotations=union)
def __truediv__(self, other: "BitVec") -> "BitVec":
@ -97,7 +97,7 @@ class BitVec(Expression[z3.BitVecRef]):
"""
if isinstance(other, BitVecFunc):
return other / self
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return BitVec(self.raw / other.raw, annotations=union)
def __and__(self, other: Union[int, "BitVec"]) -> "BitVec":
@ -110,7 +110,7 @@ class BitVec(Expression[z3.BitVecRef]):
return other & self
if not isinstance(other, BitVec):
other = BitVec(z3.BitVecVal(other, self.size()))
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return BitVec(self.raw & other.raw, annotations=union)
def __or__(self, other: "BitVec") -> "BitVec":
@ -121,7 +121,7 @@ class BitVec(Expression[z3.BitVecRef]):
"""
if isinstance(other, BitVecFunc):
return other | self
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return BitVec(self.raw | other.raw, annotations=union)
def __xor__(self, other: "BitVec") -> "BitVec":
@ -132,7 +132,7 @@ class BitVec(Expression[z3.BitVecRef]):
"""
if isinstance(other, BitVecFunc):
return other ^ self
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return BitVec(self.raw ^ other.raw, annotations=union)
def __lt__(self, other: "BitVec") -> Bool:
@ -143,7 +143,7 @@ class BitVec(Expression[z3.BitVecRef]):
"""
if isinstance(other, BitVecFunc):
return other > self
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return Bool(self.raw < other.raw, annotations=union)
def __gt__(self, other: "BitVec") -> Bool:
@ -154,7 +154,7 @@ class BitVec(Expression[z3.BitVecRef]):
"""
if isinstance(other, BitVecFunc):
return other < self
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return Bool(self.raw > other.raw, annotations=union)
def __le__(self, other: "BitVec") -> Bool:
@ -163,7 +163,7 @@ class BitVec(Expression[z3.BitVecRef]):
:param other:
:return:
"""
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return Bool(self.raw <= other.raw, annotations=union)
def __ge__(self, other: "BitVec") -> Bool:
@ -172,7 +172,7 @@ class BitVec(Expression[z3.BitVecRef]):
:param other:
:return:
"""
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return Bool(self.raw >= other.raw, annotations=union)
# MYPY: fix complains about overriding __eq__
@ -189,7 +189,7 @@ class BitVec(Expression[z3.BitVecRef]):
cast(z3.BoolRef, self.raw == other), annotations=self.annotations
)
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
# MYPY: fix complaints due to z3 overriding __eq__
return Bool(cast(z3.BoolRef, self.raw == other.raw), annotations=union)
@ -207,7 +207,7 @@ class BitVec(Expression[z3.BitVecRef]):
cast(z3.BoolRef, self.raw != other), annotations=self.annotations
)
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
# MYPY: fix complaints due to z3 overriding __eq__
return Bool(cast(z3.BoolRef, self.raw != other.raw), annotations=union)
@ -224,7 +224,7 @@ class BitVec(Expression[z3.BitVecRef]):
return BitVec(
operator(self.raw, other), annotations=self.annotations
)
union = self.annotations + other.annotations
union = self.annotations.union(other.annotations)
return BitVec(operator(self.raw, other.raw), annotations=union)
def __lshift__(self, other: Union[int, "BitVec"]) -> "BitVec":
@ -254,7 +254,7 @@ class BitVec(Expression[z3.BitVecRef]):
def _comparison_helper(
a: BitVec, b: BitVec, operation: Callable, default_value: bool, inputs_equal: bool
) -> Bool:
annotations = a.annotations + b.annotations
annotations = a.annotations.union(b.annotations)
if isinstance(a, BitVecFunc):
if not a.symbolic and not b.symbolic:
return Bool(operation(a.raw, b.raw), annotations=annotations)
@ -277,7 +277,7 @@ def _comparison_helper(
def _arithmetic_helper(a: BitVec, b: BitVec, operation: Callable) -> BitVec:
raw = operation(a.raw, b.raw)
union = a.annotations + b.annotations
union = a.annotations.union(b.annotations)
if isinstance(a, BitVecFunc) and isinstance(b, BitVecFunc):
return BitVecFunc(raw=raw, func_name=None, input_=None, annotations=union)
@ -313,7 +313,7 @@ def If(a: Union[Bool, bool], b: Union[BitVec, int], c: Union[BitVec, int]) -> Bi
b = BitVec(z3.BitVecVal(b, 256))
if not isinstance(c, BitVec):
c = BitVec(z3.BitVecVal(c, 256))
union = a.annotations + b.annotations + c.annotations
union = a.annotations.union(b.annotations).union(c.annotations)
return BitVec(z3.If(a.raw, b.raw, c.raw), union)
@ -378,10 +378,10 @@ def Concat(*args: Union[BitVec, List[BitVec]]) -> BitVec:
bvs = cast(List[BitVec], args)
nraw = z3.Concat([a.raw for a in bvs])
annotations = [] # type: Annotations
annotations = set() # type: Annotations
bitvecfunc = False
for bv in bvs:
annotations += bv.annotations
annotations = annotations.union(bv.annotations)
if isinstance(bv, BitVecFunc):
bitvecfunc = True
@ -448,11 +448,11 @@ def Sum(*args: BitVec) -> BitVec:
:return:
"""
raw = z3.Sum([a.raw for a in args])
annotations = [] # type: Annotations
annotations = set() # type: Annotations
bitvecfuncs = []
for bv in args:
annotations += bv.annotations
annotations = annotations.union(bv.annotations)
if isinstance(bv, BitVecFunc):
bitvecfuncs.append(bv)

@ -23,7 +23,7 @@ def _arithmetic_helper(
b = BitVec(z3.BitVecVal(b, a.size()))
raw = operation(a.raw, b.raw)
union = a.annotations + b.annotations
union = a.annotations.union(b.annotations)
if isinstance(b, BitVecFunc):
# TODO: Find better value to set input and name to in this case?
@ -53,7 +53,7 @@ def _comparison_helper(
if isinstance(b, int):
b = BitVec(z3.BitVecVal(b, a.size()))
union = a.annotations + b.annotations
union = a.annotations.union(b.annotations)
if not a.symbolic and not b.symbolic:
return Bool(z3.BoolVal(operation(a.value, b.value)), annotations=union)

@ -1,7 +1,7 @@
"""This module provides classes for an SMT abstraction of boolean
expressions."""
from typing import Union, cast, List
from typing import Union, cast, List, Set
import z3
@ -55,7 +55,7 @@ class Bool(Expression[z3.BoolRef]):
"""
if isinstance(other, Expression):
return Bool(cast(z3.BoolRef, self.raw == other.raw),
self.annotations + other.annotations)
self.annotations.union(other.annotations))
return Bool(cast(z3.BoolRef, self.raw == other), self.annotations)
# MYPY: complains about overloading __ne__ # noqa
@ -67,7 +67,7 @@ class Bool(Expression[z3.BoolRef]):
"""
if isinstance(other, Expression):
return Bool(cast(z3.BoolRef, self.raw != other.raw),
self.annotations + other.annotations)
self.annotations.union(other.annotations))
return Bool(cast(z3.BoolRef, self.raw != other), self.annotations)
def __bool__(self) -> bool:
@ -86,17 +86,17 @@ class Bool(Expression[z3.BoolRef]):
def And(*args: Union[Bool, bool]) -> Bool:
"""Create an And expression."""
union = [] # type: List
annotations = set() # type: Set
args_list = [arg if isinstance(arg, Bool) else Bool(arg) for arg in args]
for arg in args_list:
union += arg.annotations
return Bool(z3.And([a.raw for a in args_list]), union)
annotations = annotations.union(arg.annotations)
return Bool(z3.And([a.raw for a in args_list]), annotations)
def Xor(a: Bool, b: Bool) -> Bool:
"""Create an And expression."""
union = a.annotations + b.annotations
union = a.annotations.union(b.annotations)
return Bool(z3.Xor(a.raw, b.raw), union)
@ -108,10 +108,10 @@ def Or(*args: Union[Bool, bool]) -> Bool:
:return:
"""
args_list = [arg if isinstance(arg, Bool) else Bool(arg) for arg in args]
union = [] # type: List
annotations = set() # type: Set
for arg in args_list:
union += arg.annotations
return Bool(z3.Or([a.raw for a in args_list]), annotations=union)
annotations = annotations.union(arg.annotations)
return Bool(z3.Or([a.raw for a in args_list]), annotations=annotations)
def Not(a: Bool) -> Bool:

@ -1,9 +1,9 @@
"""This module contains the SMT abstraction for a basic symbol expression."""
from typing import Optional, List, Any, TypeVar, Generic, cast
from typing import Optional, Set, Any, TypeVar, Generic, cast
import z3
Annotations = List[Any]
Annotations = Set[Any]
T = TypeVar("T", bound=z3.ExprRef)
@ -18,7 +18,11 @@ class Expression(Generic[T]):
:param annotations:
"""
self.raw = raw
self._annotations = annotations or []
if annotations:
assert isinstance(annotations, set)
self._annotations = annotations or set()
@property
def annotations(self) -> Annotations:
@ -26,6 +30,7 @@ class Expression(Generic[T]):
:return:
"""
return self._annotations
def annotate(self, annotation: Any) -> None:
@ -33,10 +38,8 @@ class Expression(Generic[T]):
:param annotation:
"""
if isinstance(annotation, list):
self._annotations += annotation
else:
self._annotations.append(annotation)
self._annotations.add(annotation)
def simplify(self) -> None:
"""Simplify this expression."""

Loading…
Cancel
Save