Merge pull request #709 from ConsenSys/feature/detection-module

Detection Module Refactoring
pull/713/head
Bernhard Mueller 6 years ago committed by GitHub
commit 0d736764f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      mythril/analysis/modules/base.py
  2. 194
      mythril/analysis/modules/delegatecall.py
  3. 286
      mythril/analysis/modules/dependence_on_predictable_vars.py
  4. 74
      mythril/analysis/modules/deprecated_ops.py
  5. 123
      mythril/analysis/modules/ether_thief.py
  6. 101
      mythril/analysis/modules/exceptions.py
  7. 328
      mythril/analysis/modules/external_calls.py
  8. 581
      mythril/analysis/modules/integer.py
  9. 144
      mythril/analysis/modules/multiple_sends.py
  10. 123
      mythril/analysis/modules/suicide.py
  11. 269
      mythril/analysis/modules/transaction_order_dependence.py
  12. 174
      mythril/analysis/modules/unchecked_retval.py
  13. 51
      mythril/analysis/security.py
  14. 4
      mythril/analysis/symbolic.py
  15. 22
      mythril/laser/ethereum/svm.py
  16. 34
      tests/analysis/test_delegatecall.py
  17. 2
      tests/native_test.py
  18. 4
      tests/svm_test.py

@ -0,0 +1,37 @@
import logging
from typing import List
class DetectionModule:
def __init__(
self,
name: str,
swc_id: str,
hooks: List[str],
description: str,
entrypoint: str = "post",
):
self.name = name
self.swc_id = swc_id
self.hooks = hooks
self.description = description
if entrypoint not in ("post", "callback"):
logging.error(
"Invalid entrypoint in module %s, must be one of {post, callback}",
self.name,
)
self.entrypoint = entrypoint
def execute(self, statespace):
raise NotImplementedError()
def __repr__(self):
return (
"<"
"DetectionModule "
"name={0.name} "
"swc_id={0.swc_id} "
"hooks={0.hooks} "
"description={0.description}"
">"
).format(self)

@ -2,105 +2,111 @@ import re
from mythril.analysis.swc_data import DELEGATECALL_TO_UNTRUSTED_CONTRACT
from mythril.analysis.ops import get_variable, VarType
from mythril.analysis.report import Issue
from mythril.analysis.modules.base import DetectionModule
import logging
"""
MODULE DESCRIPTION:
Check for invocations of delegatecall(msg.data) in the fallback function.
"""
def execute(statespace):
"""
Executes analysis module for delegate call analysis module
:param statespace: Statespace to analyse
:return: Found issues
"""
issues = []
for call in statespace.calls:
if call.type is not "DELEGATECALL":
continue
if call.node.function_name is not "fallback":
continue
state = call.state
address = state.get_current_instruction()["address"]
meminstart = get_variable(state.mstate.stack[-3])
if meminstart.type == VarType.CONCRETE:
issues += _concrete_call(call, state, address, meminstart)
if call.to.type == VarType.SYMBOLIC:
issues += _symbolic_call(call, state, address, statespace)
return issues
def _concrete_call(call, state, address, meminstart):
if not re.search(r"calldata.*_0", str(state.mstate.memory[meminstart.val])):
return []
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
bytecode=state.environment.code.bytecode,
title="Call data forwarded with delegatecall()",
_type="Informational",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issue.description = (
"This contract forwards its call data via DELEGATECALL in its fallback function. "
"This means that any function in the called contract can be executed. Note that the callee contract will have "
"access to the storage of the calling contract.\n "
)
target = hex(call.to.val) if call.to.type == VarType.CONCRETE else str(call.to)
issue.description += "DELEGATECALL target: {}".format(target)
return [issue]
def _symbolic_call(call, state, address, statespace):
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
bytecode=state.environment.code.bytecode,
title=call.type + " to a user-supplied address",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
if "calldata" in str(call.to):
issue.description = "This contract delegates execution to a contract address obtained from calldata. "
class DelegateCallModule(DetectionModule):
def __init__(self):
super().__init__(
name="DELEGATECALL Usage in Fallback Function",
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
hooks=["DELEGATECALL"],
description="Check for invocations of delegatecall(msg.data) in the fallback function.",
)
def execute(self, statespace):
"""
Executes analysis module for delegate call analysis module
:param statespace: Statespace to analyse
:return: Found issues
"""
issues = []
for call in statespace.calls:
if call.type is not "DELEGATECALL":
continue
if call.node.function_name is not "fallback":
continue
state = call.state
address = state.get_current_instruction()["address"]
meminstart = get_variable(state.mstate.stack[-3])
if meminstart.type == VarType.CONCRETE:
issues += self._concrete_call(call, state, address, meminstart)
if call.to.type == VarType.SYMBOLIC:
issues += self._symbolic_call(call, state, address, statespace)
return issues
def _concrete_call(self, call, state, address, meminstart):
if not re.search(r"calldata.*_0", str(state.mstate.memory[meminstart.val])):
return []
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
bytecode=state.environment.code.bytecode,
title="Call data forwarded with delegatecall()",
_type="Informational",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issue.description = (
"This contract forwards its call data via DELEGATECALL in its fallback function. "
"This means that any function in the called contract can be executed. Note that the callee contract will have "
"access to the storage of the calling contract.\n "
)
target = hex(call.to.val) if call.to.type == VarType.CONCRETE else str(call.to)
issue.description += "DELEGATECALL target: {}".format(target)
return [issue]
def _symbolic_call(self, call, state, address, statespace):
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
bytecode=state.environment.code.bytecode,
title=call.type + " to a user-supplied address",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
if "calldata" in str(call.to):
issue.description = "This contract delegates execution to a contract address obtained from calldata. "
else:
m = re.search(r"storage_([a-z0-9_&^]+)", str(call.to))
if m:
idx = m.group(1)
func = statespace.find_storage_write(
state.environment.active_account.address, idx
)
else:
m = re.search(r"storage_([a-z0-9_&^]+)", str(call.to))
if func:
issue.description = (
"This contract delegates execution to a contract address in storage slot "
+ str(idx)
+ ". This storage slot can be written to by calling the function `"
+ func
+ "`. "
)
if m:
idx = m.group(1)
func = statespace.find_storage_write(
state.environment.active_account.address, idx
)
else:
logging.debug(
"[DELEGATECALL] No storage writes to index " + str(idx)
)
if func:
issue.description = (
"This contract delegates execution to a contract address in storage slot "
+ str(idx)
+ ". This storage slot can be written to by calling the function `"
+ func
+ "`. "
)
issue.description += "Be aware that the called contract gets unrestricted access to this contract's state."
return [issue]
else:
logging.debug("[DELEGATECALL] No storage writes to index " + str(idx))
issue.description += "Be aware that the called contract gets unrestricted access to this contract's state."
return [issue]
detector = DelegateCallModule()

@ -4,157 +4,125 @@ from mythril.analysis.ops import VarType
from mythril.analysis import solver
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import TIMESTAMP_DEPENDENCE, PREDICTABLE_VARS_DEPENDENCE
from mythril.analysis.modules.base import DetectionModule
from mythril.exceptions import UnsatError
import logging
"""
MODULE DESCRIPTION:
Check for CALLs that send >0 Ether as a result of computation based on predictable variables such as
block.coinbase, block.gaslimit, block.timestamp, block.number
TODO:
- block.blockhash(block.number-1)
- block.blockhash(some_block_past_256_blocks_from_now)==0
- external source of random numbers (e.g. Oraclize)
"""
def execute(statespace):
logging.debug("Executing module: DEPENDENCE_ON_PREDICTABLE_VARS")
issues = []
for call in statespace.calls:
if "callvalue" in str(call.value):
logging.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] Skipping refund function")
continue
# We're only interested in calls that send Ether
if call.value.type == VarType.CONCRETE and call.value.val == 0:
continue
class PredictableDependenceModule(DetectionModule):
def __init__(self):
super().__init__(
name="Dependence of Predictable Variables",
swc_id="{} {}".format(TIMESTAMP_DEPENDENCE, PREDICTABLE_VARS_DEPENDENCE),
hooks=["COINBASE", "GASLIMIT", "TIMESTAMP", "NUMBER", "BLOCKHASH"],
description=(
"Check for CALLs that send >0 Ether as a result of computation "
"based on predictable variables such as block.coinbase, "
"block.gaslimit, block.timestamp, block.number"
),
)
address = call.state.get_current_instruction()["address"]
def execute(self, statespace):
description = "In the function `" + call.node.function_name + "` "
description += "the following predictable state variables are used to determine Ether recipient:\n"
logging.debug("Executing module: DEPENDENCE_ON_PREDICTABLE_VARS")
# First check: look for predictable state variables in node & call recipient constraints
issues = []
vars = ["coinbase", "gaslimit", "timestamp", "number"]
for call in statespace.calls:
found = []
for var in vars:
for constraint in call.node.constraints[:] + [call.to]:
if var in str(constraint):
found.append(var)
if len(found):
for item in found:
description += "- block.{}\n".format(item)
if solve(call):
swc_type = (
TIMESTAMP_DEPENDENCE
if item == "timestamp"
else PREDICTABLE_VARS_DEPENDENCE
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=swc_type,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable environment variable",
_type="Warning",
description=description,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
if "callvalue" in str(call.value):
logging.debug(
"[DEPENDENCE_ON_PREDICTABLE_VARS] Skipping refund function"
)
issues.append(issue)
continue
# We're only interested in calls that send Ether
if call.value.type == VarType.CONCRETE and call.value.val == 0:
continue
address = call.state.get_current_instruction()["address"]
description = "In the function `" + call.node.function_name + "` "
description += "the following predictable state variables are used to determine Ether recipient:\n"
# First check: look for predictable state variables in node & call recipient constraints
vars = ["coinbase", "gaslimit", "timestamp", "number"]
found = []
for var in vars:
for constraint in call.node.constraints[:] + [call.to]:
if var in str(constraint):
found.append(var)
if len(found):
for item in found:
description += "- block.{}\n".format(item)
if self.solve(call):
swc_type = (
TIMESTAMP_DEPENDENCE
if item == "timestamp"
else PREDICTABLE_VARS_DEPENDENCE
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=swc_type,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable environment variable",
_type="Warning",
description=description,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
# Second check: blockhash
# Second check: blockhash
for constraint in call.node.constraints[:] + [call.to]:
if "blockhash" in str(constraint):
description = "In the function `" + call.node.function_name + "` "
if "number" in str(constraint):
m = re.search(r"blockhash\w+(\s-\s(\d+))*", str(constraint))
if m and solve(call):
for constraint in call.node.constraints[:] + [call.to]:
if "blockhash" in str(constraint):
description = "In the function `" + call.node.function_name + "` "
if "number" in str(constraint):
m = re.search(r"blockhash\w+(\s-\s(\d+))*", str(constraint))
if m and self.solve(call):
found = m.group(1)
found = m.group(1)
if found: # block.blockhash(block.number - N)
description += (
"predictable expression 'block.blockhash(block.number - "
+ m.group(2)
+ ")' is used to determine Ether recipient"
)
if int(m.group(2)) > 255:
if found: # block.blockhash(block.number - N)
description += (
"predictable expression 'block.blockhash(block.number - "
+ m.group(2)
+ ")' is used to determine Ether recipient"
)
if int(m.group(2)) > 255:
description += ", this expression will always be equal to zero."
elif "storage" in str(
constraint
): # block.blockhash(block.number - storage_0)
description += (
"predictable expression 'block.blockhash(block.number - "
+ "some_storage_var)' is used to determine Ether recipient"
)
else: # block.blockhash(block.number)
description += (
"predictable expression 'block.blockhash(block.number)'"
+ " is used to determine Ether recipient"
)
description += (
", this expression will always be equal to zero."
)
elif "storage" in str(
constraint
): # block.blockhash(block.number - storage_0)
description += (
"predictable expression 'block.blockhash(block.number - "
+ "some_storage_var)' is used to determine Ether recipient"
)
else: # block.blockhash(block.number)
description += (
"predictable expression 'block.blockhash(block.number)'"
+ " is used to determine Ether recipient"
)
description += (
", this expression will always be equal to zero."
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable variable",
_type="Warning",
description=description,
swc_id=PREDICTABLE_VARS_DEPENDENCE,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
break
else:
r = re.search(r"storage_([a-z0-9_&^]+)", str(constraint))
if r: # block.blockhash(storage_0)
"""
We actually can do better here by adding a constraint blockhash_block_storage_0 == 0
and checking model satisfiability. When this is done, severity can be raised
from 'Informational' to 'Warning'.
Checking that storage at given index can be tainted is not necessary, since it usually contains
block.number of the 'commit' transaction in commit-reveal workflow.
"""
index = r.group(1)
if index and solve(call):
description += (
"block.blockhash() is calculated using a value from storage "
"at index {}".format(index)
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable variable",
_type="Informational",
_type="Warning",
description=description,
swc_id=PREDICTABLE_VARS_DEPENDENCE,
gas_used=(
@ -164,20 +132,56 @@ def execute(statespace):
)
issues.append(issue)
break
return issues
else:
r = re.search(r"storage_([a-z0-9_&^]+)", str(constraint))
if r: # block.blockhash(storage_0)
"""
We actually can do better here by adding a constraint blockhash_block_storage_0 == 0
and checking model satisfiability. When this is done, severity can be raised
from 'Informational' to 'Warning'.
Checking that storage at given index can be tainted is not necessary, since it usually contains
block.number of the 'commit' transaction in commit-reveal workflow.
"""
index = r.group(1)
if index and self.solve(call):
description += (
"block.blockhash() is calculated using a value from storage "
"at index {}".format(index)
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable variable",
_type="Informational",
description=description,
swc_id=PREDICTABLE_VARS_DEPENDENCE,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
break
return issues
def solve(self, call):
try:
model = solver.get_model(call.node.constraints)
logging.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] MODEL: " + str(model))
pretty_model = solver.pretty_print_model(model)
def solve(call):
try:
model = solver.get_model(call.node.constraints)
logging.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] MODEL: " + str(model))
pretty_model = solver.pretty_print_model(model)
logging.debug(
"[DEPENDENCE_ON_PREDICTABLE_VARS] main model: \n%s" % pretty_model
)
return True
except UnsatError:
logging.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] no model found")
return False
logging.debug(
"[DEPENDENCE_ON_PREDICTABLE_VARS] main model: \n%s" % pretty_model
)
return True
except UnsatError:
logging.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] no model found")
return False
detector = PredictableDependenceModule()

@ -1,5 +1,6 @@
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import TX_ORIGIN_USAGE
from mythril.analysis.modules.base import DetectionModule
import logging
@ -10,39 +11,54 @@ Check for constraints on tx.origin (i.e., access to some functionality is restri
"""
def execute(statespace):
class DeprecatedOperationsModule(DetectionModule):
def __init__(self):
super().__init__(
name="Deprecated Operations",
swc_id=TX_ORIGIN_USAGE,
hooks=["ORIGIN"],
description=(
"Check for constraints on tx.origin (i.e., access to some "
"functionality is restricted to a specific origin)."
),
)
logging.debug("Executing module: DEPRECATED OPCODES")
def execute(self, statespace):
issues = []
logging.debug("Executing module: DEPRECATED OPCODES")
for k in statespace.nodes:
node = statespace.nodes[k]
issues = []
for state in node.states:
for k in statespace.nodes:
node = statespace.nodes[k]
instruction = state.get_current_instruction()
for state in node.states:
if instruction["opcode"] == "ORIGIN":
description = (
"The function `{}` retrieves the transaction origin (tx.origin) using the ORIGIN opcode. "
"Use msg.sender instead.\nSee also: "
"https://solidity.readthedocs.io/en/develop/security-considerations.html#tx-origin".format(
node.function_name
instruction = state.get_current_instruction()
if instruction["opcode"] == "ORIGIN":
description = (
"The function `{}` retrieves the transaction origin (tx.origin) using the ORIGIN opcode. "
"Use msg.sender instead.\nSee also: "
"https://solidity.readthedocs.io/en/develop/security-considerations.html#tx-origin".format(
node.function_name
)
)
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
title="Use of tx.origin",
bytecode=state.environment.code.bytecode,
_type="Warning",
swc_id=TX_ORIGIN_USAGE,
description=description,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
)
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
title="Use of tx.origin",
bytecode=state.environment.code.bytecode,
_type="Warning",
swc_id=TX_ORIGIN_USAGE,
description=description,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
return issues
issues.append(issue)
return issues
detector = DeprecatedOperationsModule()

@ -3,6 +3,7 @@ from mythril.analysis import solver
from mythril.analysis.analysis_utils import get_non_creator_constraints
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import UNPROTECTED_ETHER_WITHDRAWAL
from mythril.analysis.modules.base import DetectionModule
from mythril.exceptions import UnsatError
import logging
@ -10,8 +11,7 @@ import logging
"""
MODULE DESCRIPTION:
Search for cases where Ether can be withdrawn to a user-specified address.
Search for cases where Ether can be withdrawn to a user-specified address.
An issue is reported ONLY IF:
- The transaction sender does not match contract creator;
@ -22,73 +22,88 @@ This is somewhat coarse and needs to be refined in the future.
"""
def execute(state_space):
logging.debug("Executing module: ETHER_THIEF")
class EtherThief(DetectionModule):
def __init__(self):
super().__init__(
name="Ether Thief",
swc_id=UNPROTECTED_ETHER_WITHDRAWAL,
hooks=["CALL"],
description=(
"Search for cases where Ether can be withdrawn to a user-specified address.\n"
"An issue is reported ONLY IF:\n\n"
"- The transaction sender does not match contract creator\n"
"- The sender has not previously sent any ETH to the contract account.\n\n"
"This is somewhat coarse and needs to be refined in the future."
),
)
issues = []
def execute(self, state_space):
logging.debug("Executing module: ETHER_THIEF")
issues = []
for k in state_space.nodes:
node = state_space.nodes[k]
for k in state_space.nodes:
node = state_space.nodes[k]
for state in node.states:
issues += self._analyze_state(state, node)
for state in node.states:
issues += _analyze_state(state, node)
return issues
return issues
@staticmethod
def _analyze_state(state, node):
issues = []
instruction = state.get_current_instruction()
if instruction["opcode"] != "CALL":
return []
def _analyze_state(state, node):
issues = []
instruction = state.get_current_instruction()
call_value = state.mstate.stack[-3]
target = state.mstate.stack[-2]
if instruction["opcode"] != "CALL":
return []
not_creator_constraints, constrained = get_non_creator_constraints(state)
if constrained:
return []
call_value = state.mstate.stack[-3]
target = state.mstate.stack[-2]
try:
not_creator_constraints, constrained = get_non_creator_constraints(state)
if constrained:
return []
"""
FIXME: Instead of solving for call_value > 0, check whether call value can be greater than
the total value of all transactions received by the caller
"""
try:
model = solver.get_model(
node.constraints + not_creator_constraints + [call_value > 0]
)
"""
FIXME: Instead of solving for call_value > 0, check whether call value can be greater than
the total value of all transactions received by the caller
"""
transaction_sequence = solver.get_transaction_sequence(
state, node.constraints + not_creator_constraints + [call_value > 0]
)
model = solver.get_model(
node.constraints + not_creator_constraints + [call_value > 0]
)
# For now we only report an issue if zero ETH has been sent to the contract account.
transaction_sequence = solver.get_transaction_sequence(
state, node.constraints + not_creator_constraints + [call_value > 0]
)
for key, value in transaction_sequence.items():
if int(value["call_value"], 16) > 0:
return []
# For now we only report an issue if zero ETH has been sent to the contract account.
debug = "Transaction Sequence: " + str(transaction_sequence)
for key, value in transaction_sequence.items():
if int(value["call_value"], 16) > 0:
return []
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=UNPROTECTED_ETHER_WITHDRAWAL,
title="Ether thief",
_type="Warning",
bytecode=state.environment.code.bytecode,
description="Users other than the contract creator can withdraw ETH from the contract account"
+ " without previously having sent any ETH to it. This is likely to be vulnerability.",
debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
except UnsatError:
logging.debug("[ETHER_THIEF] no model found")
debug = "Transaction Sequence: " + str(transaction_sequence)
return issues
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=UNPROTECTED_ETHER_WITHDRAWAL,
title="Ether thief",
_type="Warning",
bytecode=state.environment.code.bytecode,
description="Users other than the contract creator can withdraw ETH from the contract account"
+ " without previously having sent any ETH to it. This is likely to be vulnerability.",
debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
except UnsatError:
logging.debug("[ETHER_THIEF] no model found")
return issues
detector = EtherThief()

@ -2,67 +2,72 @@ from mythril.analysis.report import Issue
from mythril.analysis.swc_data import ASSERT_VIOLATION
from mythril.exceptions import UnsatError
from mythril.analysis import solver
from mythril.analysis.modules.base import DetectionModule
import logging
"""
MODULE DESCRIPTION:
class ReachableExceptionsModule(DetectionModule):
def __init__(self):
super().__init__(
name="Reachable Exceptions",
swc_id=ASSERT_VIOLATION,
hooks=["ASSERT_FAIL"],
description="Checks whether any exception states are reachable.",
)
Checks whether any exception states are reachable.
def execute(self, statespace):
"""
logging.debug("Executing module: EXCEPTIONS")
issues = []
def execute(statespace):
for k in statespace.nodes:
node = statespace.nodes[k]
logging.debug("Executing module: EXCEPTIONS")
for state in node.states:
issues = []
instruction = state.get_current_instruction()
if instruction["opcode"] == "ASSERT_FAIL":
try:
model = solver.get_model(node.constraints)
address = state.get_current_instruction()["address"]
for k in statespace.nodes:
node = statespace.nodes[k]
for state in node.states:
description = (
"A reachable exception (opcode 0xfe) has been detected. "
"This can be caused by type errors, division by zero, "
"out-of-bounds array access, or assert violations. "
)
description += (
"Note that explicit `assert()` should only be used to check invariants. "
"Use `require()` for regular input checking. "
)
instruction = state.get_current_instruction()
if instruction["opcode"] == "ASSERT_FAIL":
try:
model = solver.get_model(node.constraints)
address = state.get_current_instruction()["address"]
debug = "Transaction Sequence: " + str(
solver.get_transaction_sequence(state, node.constraints)
)
description = (
"A reachable exception (opcode 0xfe) has been detected. "
"This can be caused by type errors, division by zero, "
"out-of-bounds array access, or assert violations. "
)
description += (
"Note that explicit `assert()` should only be used to check invariants. "
"Use `require()` for regular input checking. "
)
issues.append(
Issue(
contract=node.contract_name,
function_name=node.function_name,
address=address,
swc_id=ASSERT_VIOLATION,
title="Exception state",
_type="Informational",
description=description,
bytecode=state.environment.code.bytecode,
debug=debug,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
)
debug = "Transaction Sequence: " + str(
solver.get_transaction_sequence(state, node.constraints)
)
except UnsatError:
logging.debug("[EXCEPTIONS] no model found")
issues.append(
Issue(
contract=node.contract_name,
function_name=node.function_name,
address=address,
swc_id=ASSERT_VIOLATION,
title="Exception state",
_type="Informational",
description=description,
bytecode=state.environment.code.bytecode,
debug=debug,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
)
return issues
except UnsatError:
logging.debug("[EXCEPTIONS] no model found")
return issues
detector = ReachableExceptionsModule()

@ -3,196 +3,216 @@ from mythril.analysis.ops import *
from mythril.analysis.report import Issue
from mythril.analysis import solver
from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule
import re
import logging
from mythril.laser.ethereum.cfg import JumpType
"""
MODULE DESCRIPTION:
Check for call.value()() to external addresses
"""
class ExternalCallModule(DetectionModule):
def __init__(self, max_search_depth=64):
super().__init__(
name="External Calls",
swc_id=REENTRANCY,
hooks=["CALL"],
description="Check for call.value()() to external addresses",
)
self.max_search_depth = max_search_depth
self.calls_visited = []
def search_children(
self, statespace, node, transaction_id, start_index=0, depth=0, results=None
):
if results is None:
results = []
logging.debug("SEARCHING NODE %d", node.uid)
if depth < self.max_search_depth:
n_states = len(node.states)
if n_states > start_index:
for j in range(start_index, n_states):
if (
node.states[j].get_current_instruction()["opcode"] == "SSTORE"
and node.states[j].current_transaction.id == transaction_id
):
results.append(
node.states[j].get_current_instruction()["address"]
)
children = []
MAX_SEARCH_DEPTH = 64
for edge in statespace.edges:
if edge.node_from == node.uid and edge.type != JumpType.Transaction:
children.append(statespace.nodes[edge.node_to])
if len(children):
for node in children:
results += self.search_children(
statespace,
node,
transaction_id,
depth=depth + 1,
results=results,
)
def search_children(
statespace, node, transaction_id, start_index=0, depth=0, results=None
):
if results is None:
results = []
logging.debug("SEARCHING NODE %d", node.uid)
return results
if depth < MAX_SEARCH_DEPTH:
def execute(self, statespace):
n_states = len(node.states)
issues = []
if n_states > start_index:
for call in statespace.calls:
for j in range(start_index, n_states):
if (
node.states[j].get_current_instruction()["opcode"] == "SSTORE"
and node.states[j].current_transaction.id == transaction_id
):
results.append(node.states[j].get_current_instruction()["address"])
children = []
state = call.state
address = state.get_current_instruction()["address"]
for edge in statespace.edges:
if edge.node_from == node.uid and edge.type != JumpType.Transaction:
children.append(statespace.nodes[edge.node_to])
if call.type == "CALL":
if len(children):
for node in children:
results += search_children(
statespace, node, transaction_id, depth=depth + 1, results=results
logging.debug(
"[EXTERNAL_CALLS] Call to: %s, value = %s, gas = %s"
% (str(call.to), str(call.value), str(call.gas))
)
return results
calls_visited = []
def execute(statespace):
issues = []
for call in statespace.calls:
state = call.state
address = state.get_current_instruction()["address"]
if call.type == "CALL":
logging.debug(
"[EXTERNAL_CALLS] Call to: %s, value = %s, gas = %s"
% (str(call.to), str(call.value), str(call.gas))
)
if (
call.to.type == VarType.SYMBOLIC
and (call.gas.type == VarType.CONCRETE and call.gas.val > 2300)
or (
call.gas.type == VarType.SYMBOLIC
and "2300" not in str(call.gas)
)
):
if (
call.to.type == VarType.SYMBOLIC
and (call.gas.type == VarType.CONCRETE and call.gas.val > 2300)
or (call.gas.type == VarType.SYMBOLIC and "2300" not in str(call.gas))
):
description = "This contract executes a message call to "
description = "This contract executes a message call to "
target = str(call.to)
user_supplied = False
target = str(call.to)
user_supplied = False
if "calldata" in target or "caller" in target:
if "calldata" in target or "caller" in target:
if "calldata" in target:
description += (
"an address provided as a function argument. "
)
else:
description += "the address of the transaction sender. "
if "calldata" in target:
description += "an address provided as a function argument. "
user_supplied = True
else:
description += "the address of the transaction sender. "
m = re.search(r"storage_([a-z0-9_&^]+)", str(call.to))
user_supplied = True
else:
m = re.search(r"storage_([a-z0-9_&^]+)", str(call.to))
if m:
idx = m.group(1)
if m:
idx = m.group(1)
func = statespace.find_storage_write(
state.environment.active_account.address, idx
)
if func:
description += (
"an address found at storage slot "
+ str(idx)
+ ". "
+ "This storage slot can be written to by calling the function `"
+ func
+ "`. "
func = statespace.find_storage_write(
state.environment.active_account.address, idx
)
user_supplied = True
if user_supplied:
description += (
"Generally, it is not recommended to call user-supplied addresses using Solidity's call() construct. "
"Note that attackers might leverage reentrancy attacks to exploit race conditions or manipulate this contract's state."
)
if func:
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
title="Message call to external contract",
_type="Warning",
description=description,
bytecode=state.environment.code.bytecode,
swc_id=REENTRANCY,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
description += (
"an address found at storage slot "
+ str(idx)
+ ". "
+ "This storage slot can be written to by calling the function `"
+ func
+ "`. "
)
user_supplied = True
else:
if user_supplied:
description += "to another contract. Make sure that the called contract is trusted and does not execute user-supplied code."
description += (
"Generally, it is not recommended to call user-supplied addresses using Solidity's call() construct. "
"Note that attackers might leverage reentrancy attacks to exploit race conditions or manipulate this contract's state."
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
title="Message call to external contract",
_type="Informational",
description=description,
bytecode=state.environment.code.bytecode,
swc_id=REENTRANCY,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
title="Message call to external contract",
_type="Warning",
description=description,
bytecode=state.environment.code.bytecode,
swc_id=REENTRANCY,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
issues.append(issue)
else:
if address not in calls_visited:
calls_visited.append(address)
description += "to another contract. Make sure that the called contract is trusted and does not execute user-supplied code."
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
title="Message call to external contract",
_type="Informational",
description=description,
bytecode=state.environment.code.bytecode,
swc_id=REENTRANCY,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
logging.debug(
"[EXTERNAL_CALLS] Checking for state changes starting from "
+ call.node.function_name
)
issues.append(issue)
# Check for SSTORE in remaining instructions in current node & nodes down the CFG
if address not in self.calls_visited:
self.calls_visited.append(address)
state_change_addresses = search_children(
statespace,
call.node,
call.state.current_transaction.id,
call.state_index + 1,
depth=0,
results=[],
)
logging.debug(
"[EXTERNAL_CALLS] Checking for state changes starting from "
+ call.node.function_name
)
logging.debug(
"[EXTERNAL_CALLS] Detected state changes at addresses: "
+ str(state_change_addresses)
)
# Check for SSTORE in remaining instructions in current node & nodes down the CFG
if len(state_change_addresses):
for address in state_change_addresses:
description = (
"The contract account state is changed after an external call. "
"Consider that the called contract could re-enter the function before this "
"state change takes place. This can lead to business logic vulnerabilities."
)
state_change_addresses = self.search_children(
statespace,
call.node,
call.state.current_transaction.id,
call.state_index + 1,
depth=0,
results=[],
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
title="State change after external call",
_type="Warning",
description=description,
bytecode=state.environment.code.bytecode,
swc_id=REENTRANCY,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
issues.append(issue)
logging.debug(
"[EXTERNAL_CALLS] Detected state changes at addresses: "
+ str(state_change_addresses)
)
return issues
if len(state_change_addresses):
for address in state_change_addresses:
description = (
"The contract account state is changed after an external call. "
"Consider that the called contract could re-enter the function before this "
"state change takes place. This can lead to business logic vulnerabilities."
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
title="State change after external call",
_type="Warning",
description=description,
bytecode=state.environment.code.bytecode,
swc_id=REENTRANCY,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
issues.append(issue)
return issues
detector = ExternalCallModule()

@ -5,324 +5,325 @@ from mythril.analysis.report import Issue
from mythril.analysis.swc_data import INTEGER_OVERFLOW_AND_UNDERFLOW
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.taint_analysis import TaintRunner
from mythril.analysis.modules.base import DetectionModule
import re
import copy
import logging
"""
MODULE DESCRIPTION:
Check for integer underflows.
For every SUB instruction, check if there's a possible state where op1 > op0.
For every ADD, MUL instruction, check if there's a possible state where op1 + op0 > 2^32 - 1
"""
def execute(statespace):
"""
Executes analysis module for integer underflow and integer overflow
:param statespace: Statespace to analyse
:return: Found issues
"""
logging.debug("Executing module: INTEGER")
class IntegerOverflowUnderflowModule(DetectionModule):
def __init__(self):
super().__init__(
name="Integer Overflow and Underflow",
swc_id=INTEGER_OVERFLOW_AND_UNDERFLOW,
hooks=["ADD", "MUL"],
description=(
"For every SUB instruction, check if there's a possible state "
"where op1 > op0. For every ADD, MUL instruction, check if "
"there's a possible state where op1 + op0 > 2^32 - 1"
),
)
issues = []
def execute(self, statespace):
"""
Executes analysis module for integer underflow and integer overflow
:param statespace: Statespace to analyse
:return: Found issues
"""
logging.debug("Executing module: INTEGER")
for k in statespace.nodes:
node = statespace.nodes[k]
issues = []
for state in node.states:
issues += _check_integer_underflow(statespace, state, node)
issues += _check_integer_overflow(statespace, state, node)
for k in statespace.nodes:
node = statespace.nodes[k]
return issues
for state in node.states:
issues += self._check_integer_underflow(statespace, state, node)
issues += self._check_integer_overflow(statespace, state, node)
return issues
def _check_integer_overflow(statespace, state, node):
"""
Checks for integer overflow
:param statespace: statespace that is being examined
:param state: state from node to examine
:param node: node to examine
:return: found issue
"""
issues = []
def _check_integer_overflow(self, statespace, state, node):
"""
Checks for integer overflow
:param statespace: statespace that is being examined
:param state: state from node to examine
:param node: node to examine
:return: found issue
"""
issues = []
# Check the instruction
instruction = state.get_current_instruction()
if instruction["opcode"] not in ("ADD", "MUL"):
return issues
# Formulate overflow constraints
stack = state.mstate.stack
op0, op1 = stack[-1], stack[-2]
# Check the instruction
instruction = state.get_current_instruction()
if instruction["opcode"] not in ("ADD", "MUL"):
return issues
# An integer overflow is possible if op0 + op1 or op0 * op1 > MAX_UINT
# Do a type check
allowed_types = [int, BitVecRef, BitVecNumRef]
if not (type(op0) in allowed_types and type(op1) in allowed_types):
return issues
# Change ints to BitVec
if type(op0) is int:
op0 = BitVecVal(op0, 256)
if type(op1) is int:
op1 = BitVecVal(op1, 256)
# Formulate expression
# FIXME: handle exponentiation
if instruction["opcode"] == "ADD":
operator = "add"
expr = op0 + op1
constraint = Not(BVAddNoOverflow(op0, op1, signed=False))
else:
operator = "multiply"
expr = op1 * op0
constraint = Not(BVMulNoOverflow(op0, op1, signed=False))
# Check satisfiable
model = self._try_constraints(node.constraints, [constraint])
if model is None:
logging.debug("[INTEGER_OVERFLOW] no model found")
return issues
# Build issue
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=INTEGER_OVERFLOW_AND_UNDERFLOW,
bytecode=state.environment.code.bytecode,
title="Integer Overflow",
_type="Warning",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
# Formulate overflow constraints
stack = state.mstate.stack
op0, op1 = stack[-1], stack[-2]
issue.description = "This binary {} operation can result in integer overflow.\n".format(
operator
)
try:
issue.debug = "Transaction Sequence: " + str(
solver.get_transaction_sequence(state, node.constraints)
)
except UnsatError:
return issues
# An integer overflow is possible if op0 + op1 or op0 * op1 > MAX_UINT
# Do a type check
allowed_types = [int, BitVecRef, BitVecNumRef]
if not (type(op0) in allowed_types and type(op1) in allowed_types):
return issues
issues.append(issue)
# Change ints to BitVec
if type(op0) is int:
op0 = BitVecVal(op0, 256)
if type(op1) is int:
op1 = BitVecVal(op1, 256)
# Formulate expression
# FIXME: handle exponentiation
if instruction["opcode"] == "ADD":
operator = "add"
expr = op0 + op1
constraint = Not(BVAddNoOverflow(op0, op1, signed=False))
else:
operator = "multiply"
expr = op1 * op0
constraint = Not(BVMulNoOverflow(op0, op1, signed=False))
# Check satisfiable
model = _try_constraints(node.constraints, [constraint])
if model is None:
logging.debug("[INTEGER_OVERFLOW] no model found")
return issues
if not _verify_integer_overflow(
statespace, node, expr, state, model, constraint, op0, op1
def _verify_integer_overflow(
self, statespace, node, expr, state, model, constraint, op0, op1
):
return issues
# Build issue
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=INTEGER_OVERFLOW_AND_UNDERFLOW,
bytecode=state.environment.code.bytecode,
title="Integer Overflow",
_type="Warning",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issue.description = "This binary {} operation can result in integer overflow.\n".format(
operator
)
try:
issue.debug = "Transaction Sequence: " + str(
solver.get_transaction_sequence(state, node.constraints)
""" Verifies existence of integer overflow """
# If we get to this point then there has been an integer overflow
# Find out if the overflowed value is actually used
interesting_usages = self._search_children(
statespace,
node,
expr,
constraint=[constraint],
index=node.states.index(state),
)
except UnsatError:
return issues
issues.append(issue)
return issues
def _verify_integer_overflow(
statespace, node, expr, state, model, constraint, op0, op1
):
""" Verifies existence of integer overflow """
# If we get to this point then there has been an integer overflow
# Find out if the overflowed value is actually used
interesting_usages = _search_children(
statespace, node, expr, constraint=[constraint], index=node.states.index(state)
)
# Stop if it isn't
if len(interesting_usages) == 0:
return False
return _try_constraints(node.constraints, [Not(constraint)]) is not None
def _try_constraints(constraints, new_constraints):
"""
Tries new constraints
:return Model if satisfiable otherwise None
"""
try:
model = solver.get_model(constraints + new_constraints)
return model
except UnsatError:
return None
def _check_integer_underflow(statespace, state, node):
"""
Checks for integer underflow
:param state: state from node to examine
:param node: node to examine
:return: found issue
"""
issues = []
instruction = state.get_current_instruction()
if instruction["opcode"] == "SUB":
# Stop if it isn't
if len(interesting_usages) == 0:
return False
return self._try_constraints(node.constraints, [Not(constraint)]) is not None
def _try_constraints(self, constraints, new_constraints):
"""
Tries new constraints
:return Model if satisfiable otherwise None
"""
try:
model = solver.get_model(constraints + new_constraints)
return model
except UnsatError:
return None
def _check_integer_underflow(self, statespace, state, node):
"""
Checks for integer underflow
:param state: state from node to examine
:param node: node to examine
:return: found issue
"""
issues = []
instruction = state.get_current_instruction()
if instruction["opcode"] == "SUB":
stack = state.mstate.stack
op0 = stack[-1]
op1 = stack[-2]
constraints = copy.deepcopy(node.constraints)
# Filter for patterns that indicate benign underflows
# Pattern 1: (96 + calldatasize_MAIN) - (96), where (96 + calldatasize_MAIN) would underflow if calldatasize is very large.
# Pattern 2: (256*If(1 & storage_0 == 0, 1, 0)) - 1, this would underlow if storage_0 = 0
if type(op0) == int and type(op1) == int:
return []
if re.search(r"calldatasize_", str(op0)):
return []
if re.search(r"256\*.*If\(1", str(op0), re.DOTALL) or re.search(
r"256\*.*If\(1", str(op1), re.DOTALL
):
return []
if re.search(r"32 \+.*calldata", str(op0), re.DOTALL) or re.search(
r"32 \+.*calldata", str(op1), re.DOTALL
):
return []
logging.debug(
"[INTEGER_UNDERFLOW] Checking SUB {0}, {1} at address {2}".format(
str(op0), str(op1), str(instruction["address"])
)
)
allowed_types = [int, BitVecRef, BitVecNumRef]
if type(op0) in allowed_types and type(op1) in allowed_types:
constraints.append(UGT(op1, op0))
try:
model = solver.get_model(constraints)
# If we get to this point then there has been an integer overflow
# Find out if the overflowed value is actually used
interesting_usages = self._search_children(
statespace, node, (op0 - op1), index=node.states.index(state)
)
# Stop if it isn't
if len(interesting_usages) == 0:
return issues
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=INTEGER_OVERFLOW_AND_UNDERFLOW,
bytecode=state.environment.code.bytecode,
title="Integer Underflow",
_type="Warning",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issue.description = (
"The subtraction can result in an integer underflow.\n"
)
issue.debug = "Transaction Sequence: " + str(
solver.get_transaction_sequence(state, node.constraints)
)
issues.append(issue)
except UnsatError:
logging.debug("[INTEGER_UNDERFLOW] no model found")
return issues
stack = state.mstate.stack
def _check_usage(self, state, taint_result):
"""Delegates checks to _check_{instruction_name}()"""
opcode = state.get_current_instruction()["opcode"]
op0 = stack[-1]
op1 = stack[-2]
if opcode == "JUMPI":
if self._check_jumpi(state, taint_result):
return [state]
elif opcode == "SSTORE":
if self._check_sstore(state, taint_result):
return [state]
return []
constraints = copy.deepcopy(node.constraints)
def _check_jumpi(self, state, taint_result):
""" Check if conditional jump is dependent on the result of expression"""
assert state.get_current_instruction()["opcode"] == "JUMPI"
return taint_result.check(state, -2)
def _check_sstore(self, state, taint_result):
""" Check if store operation is dependent on the result of expression"""
assert state.get_current_instruction()["opcode"] == "SSTORE"
return taint_result.check(state, -2)
def _search_children(
self,
statespace,
node,
expression,
taint_result=None,
constraint=None,
index=0,
depth=0,
max_depth=64,
):
"""
Checks the statespace for children states, with JUMPI or SSTORE instuctions,
for dependency on expression
:param statespace: The statespace to explore
:param node: Current node to explore from
:param expression: Expression to look for
:param taint_result: Result of taint analysis
:param index: Current state index node.states[index] == current_state
:param depth: Current depth level
:param max_depth: Max depth to explore
:return: List of states that match the opcodes and are dependent on expression
"""
if constraint is None:
constraint = []
logging.debug("SEARCHING NODE for usage of an overflowed variable %d", node.uid)
if taint_result is None:
state = node.states[index]
taint_stack = [False for _ in state.mstate.stack]
taint_stack[-1] = True
taint_result = TaintRunner.execute(
statespace, node, state, initial_stack=taint_stack
)
# Filter for patterns that indicate benign underflows
results = []
# Pattern 1: (96 + calldatasize_MAIN) - (96), where (96 + calldatasize_MAIN) would underflow if calldatasize is very large.
# Pattern 2: (256*If(1 & storage_0 == 0, 1, 0)) - 1, this would underlow if storage_0 = 0
if type(op0) == int and type(op1) == int:
return []
if re.search(r"calldatasize_", str(op0)):
return []
if re.search(r"256\*.*If\(1", str(op0), re.DOTALL) or re.search(
r"256\*.*If\(1", str(op1), re.DOTALL
):
return []
if re.search(r"32 \+.*calldata", str(op0), re.DOTALL) or re.search(
r"32 \+.*calldata", str(op1), re.DOTALL
):
if depth >= max_depth:
return []
logging.debug(
"[INTEGER_UNDERFLOW] Checking SUB {0}, {1} at address {2}".format(
str(op0), str(op1), str(instruction["address"])
# Explore current node from index
for j in range(index, len(node.states)):
current_state = node.states[j]
current_instruction = current_state.get_current_instruction()
if current_instruction["opcode"] in ("JUMPI", "SSTORE"):
element = self._check_usage(current_state, taint_result)
if len(element) < 1:
continue
results += element
# Recursively search children
children = [
statespace.nodes[edge.node_to]
for edge in statespace.edges
if edge.node_from == node.uid
# and _try_constraints(statespace.nodes[edge.node_to].constraints, constraint) is not None
]
for child in children:
results += self._search_children(
statespace,
child,
expression,
taint_result,
depth=depth + 1,
max_depth=max_depth,
)
)
allowed_types = [int, BitVecRef, BitVecNumRef]
if type(op0) in allowed_types and type(op1) in allowed_types:
constraints.append(UGT(op1, op0))
try:
model = solver.get_model(constraints)
# If we get to this point then there has been an integer overflow
# Find out if the overflowed value is actually used
interesting_usages = _search_children(
statespace, node, (op0 - op1), index=node.states.index(state)
)
# Stop if it isn't
if len(interesting_usages) == 0:
return issues
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=INTEGER_OVERFLOW_AND_UNDERFLOW,
bytecode=state.environment.code.bytecode,
title="Integer Underflow",
_type="Warning",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return results
issue.description = (
"The subtraction can result in an integer underflow.\n"
)
issue.debug = "Transaction Sequence: " + str(
solver.get_transaction_sequence(state, node.constraints)
)
issues.append(issue)
except UnsatError:
logging.debug("[INTEGER_UNDERFLOW] no model found")
return issues
def _check_usage(state, taint_result):
"""Delegates checks to _check_{instruction_name}()"""
opcode = state.get_current_instruction()["opcode"]
if opcode == "JUMPI":
if _check_jumpi(state, taint_result):
return [state]
elif opcode == "SSTORE":
if _check_sstore(state, taint_result):
return [state]
return []
def _check_jumpi(state, taint_result):
""" Check if conditional jump is dependent on the result of expression"""
assert state.get_current_instruction()["opcode"] == "JUMPI"
return taint_result.check(state, -2)
def _check_sstore(state, taint_result):
""" Check if store operation is dependent on the result of expression"""
assert state.get_current_instruction()["opcode"] == "SSTORE"
return taint_result.check(state, -2)
def _search_children(
statespace,
node,
expression,
taint_result=None,
constraint=None,
index=0,
depth=0,
max_depth=64,
):
"""
Checks the statespace for children states, with JUMPI or SSTORE instuctions,
for dependency on expression
:param statespace: The statespace to explore
:param node: Current node to explore from
:param expression: Expression to look for
:param taint_result: Result of taint analysis
:param index: Current state index node.states[index] == current_state
:param depth: Current depth level
:param max_depth: Max depth to explore
:return: List of states that match the opcodes and are dependent on expression
"""
if constraint is None:
constraint = []
logging.debug("SEARCHING NODE for usage of an overflowed variable %d", node.uid)
if taint_result is None:
state = node.states[index]
taint_stack = [False for _ in state.mstate.stack]
taint_stack[-1] = True
taint_result = TaintRunner.execute(
statespace, node, state, initial_stack=taint_stack
)
results = []
if depth >= max_depth:
return []
# Explore current node from index
for j in range(index, len(node.states)):
current_state = node.states[j]
current_instruction = current_state.get_current_instruction()
if current_instruction["opcode"] in ("JUMPI", "SSTORE"):
element = _check_usage(current_state, taint_result)
if len(element) < 1:
continue
results += element
# Recursively search children
children = [
statespace.nodes[edge.node_to]
for edge in statespace.edges
if edge.node_from == node.uid
# and _try_constraints(statespace.nodes[edge.node_to].constraints, constraint) is not None
]
for child in children:
results += _search_children(
statespace,
child,
expression,
taint_result,
depth=depth + 1,
max_depth=max_depth,
)
return results
detector = IntegerOverflowUnderflowModule()

@ -1,83 +1,87 @@
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import *
from mythril.analysis.swc_data import MULTIPLE_SENDS
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.cfg import JumpType
"""
MODULE DESCRIPTION:
Check for multiple sends in a single transaction
"""
def execute(statespace):
issues = []
for call in statespace.calls:
findings = []
# explore state
findings += _explore_states(call, statespace)
# explore nodes
findings += _explore_nodes(call, statespace)
if len(findings) > 0:
node = call.node
instruction = call.state.get_current_instruction()
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=MULTIPLE_SENDS,
bytecode=call.state.environment.code.bytecode,
title="Multiple Calls",
_type="Informational",
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issue.description = (
"Multiple sends are executed in a single transaction. "
"Try to isolate each external call into its own transaction,"
" as external calls can fail accidentally or deliberately.\nConsecutive calls: \n"
)
class MultipleSendsModule(DetectionModule):
def __init__(self):
super().__init__(
name="Multiple Sends",
swc_id=MULTIPLE_SENDS,
hooks=["CALL", "DELEGATECALL", "STATICCALL", "CALLCODE"],
description="Check for multiple sends in a single transaction",
)
for finding in findings:
issue.description += "Call at address: {}\n".format(
finding.state.get_current_instruction()["address"]
def execute(self, statespace):
issues = []
for call in statespace.calls:
findings = []
# explore state
findings += self._explore_states(call, statespace)
# explore nodes
findings += self._explore_nodes(call, statespace)
if len(findings) > 0:
node = call.node
instruction = call.state.get_current_instruction()
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=MULTIPLE_SENDS,
bytecode=call.state.environment.code.bytecode,
title="Multiple Calls",
_type="Informational",
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
return issues
def _explore_nodes(call, statespace):
children = _child_nodes(statespace, call.node)
sending_children = list(filter(lambda c: c.node in children, statespace.calls))
return sending_children
issue.description = (
"Multiple sends are executed in a single transaction. "
"Try to isolate each external call into its own transaction,"
" as external calls can fail accidentally or deliberately.\nConsecutive calls: \n"
)
def _explore_states(call, statespace):
other_calls = list(
filter(
lambda other: other.node == call.node
and other.state_index > call.state_index,
statespace.calls,
for finding in findings:
issue.description += "Call at address: {}\n".format(
finding.state.get_current_instruction()["address"]
)
issues.append(issue)
return issues
def _explore_nodes(self, call, statespace):
children = self._child_nodes(statespace, call.node)
sending_children = list(filter(lambda c: c.node in children, statespace.calls))
return sending_children
def _explore_states(self, call, statespace):
other_calls = list(
filter(
lambda other: other.node == call.node
and other.state_index > call.state_index,
statespace.calls,
)
)
)
return other_calls
return other_calls
def _child_nodes(self, statespace, node):
result = []
children = [
statespace.nodes[edge.node_to]
for edge in statespace.edges
if edge.node_from == node.uid and edge.type != JumpType.Transaction
]
for child in children:
result.append(child)
result += self._child_nodes(statespace, child)
def _child_nodes(statespace, node):
result = []
children = [
statespace.nodes[edge.node_to]
for edge in statespace.edges
if edge.node_from == node.uid and edge.type != JumpType.Transaction
]
return result
for child in children:
result.append(child)
result += _child_nodes(statespace, child)
return result
detector = MultipleSendsModule()

@ -4,84 +4,101 @@ from mythril.analysis.ops import *
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import UNPROTECTED_SELFDESTRUCT
from mythril.exceptions import UnsatError
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.transaction import ContractCreationTransaction
import re
import logging
"""
MODULE DESCRIPTION:
Check for SUICIDE instructions that either can be reached by anyone, or where msg.sender is checked against a tainted storage index
(i.e. there's a write to that index is unconstrained by msg.sender).
"""
def execute(state_space):
class SuicideModule(DetectionModule):
def __init__(self):
super().__init__(
name="Unprotected Suicide",
swc_id=UNPROTECTED_SELFDESTRUCT,
hooks=["SUICIDE"],
description=(
"Check for SUICIDE instructions that either can be reached by anyone, "
"or where msg.sender is checked against a tainted storage index (i.e. "
"there's a write to that index is unconstrained by msg.sender)."
),
)
logging.debug("Executing module: UNCHECKED_SUICIDE")
def execute(self, state_space):
issues = []
logging.debug("Executing module: UNCHECKED_SUICIDE")
for k in state_space.nodes:
node = state_space.nodes[k]
issues = []
for state in node.states:
issues += _analyze_state(state, node)
for k in state_space.nodes:
node = state_space.nodes[k]
return issues
for state in node.states:
issues += self._analyze_state(state, node)
return issues
def _analyze_state(state, node):
issues = []
instruction = state.get_current_instruction()
def _analyze_state(self, state, node):
issues = []
instruction = state.get_current_instruction()
if instruction["opcode"] != "SUICIDE":
return []
if instruction["opcode"] != "SUICIDE":
return []
to = state.mstate.stack[-1]
to = state.mstate.stack[-1]
logging.debug("[UNCHECKED_SUICIDE] suicide in function " + node.function_name)
logging.debug("[UNCHECKED_SUICIDE] suicide in function " + node.function_name)
description = "A reachable SUICIDE instruction was detected. "
description = "A reachable SUICIDE instruction was detected. "
if "caller" in str(to):
description += "The remaining Ether is sent to the caller's address.\n"
elif "storage" in str(to):
description += "The remaining Ether is sent to a stored address.\n"
elif "calldata" in str(to):
description += "The remaining Ether is sent to an address provided as a function argument.\n"
elif type(to) == BitVecNumRef:
description += "The remaining Ether is sent to: " + hex(to.as_long()) + "\n"
else:
description += "The remaining Ether is sent to: " + str(to) + "\n"
if "caller" in str(to):
description += "The remaining Ether is sent to the caller's address.\n"
elif "storage" in str(to):
description += "The remaining Ether is sent to a stored address.\n"
elif "calldata" in str(to):
description += "The remaining Ether is sent to an address provided as a function argument.\n"
elif type(to) == BitVecNumRef:
description += "The remaining Ether is sent to: " + hex(to.as_long()) + "\n"
else:
description += "The remaining Ether is sent to: " + str(to) + "\n"
not_creator_constraints, constrained = get_non_creator_constraints(state)
not_creator_constraints, constrained = get_non_creator_constraints(state)
if constrained:
return []
if constrained:
return []
try:
model = solver.get_model(node.constraints + not_creator_constraints)
try:
model = solver.get_model(node.constraints + not_creator_constraints)
debug = "Transaction Sequence: " + str(
solver.get_transaction_sequence(
state, node.constraints + not_creator_constraints
debug = "Transaction Sequence: " + str(
solver.get_transaction_sequence(
state, node.constraints + not_creator_constraints
)
)
)
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=UNPROTECTED_SELFDESTRUCT,
bytecode=state.environment.code.bytecode,
title="Unchecked SUICIDE",
_type="Warning",
description=description,
debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
except UnsatError:
logging.debug("[UNCHECKED_SUICIDE] no model found")
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
swc_id=UNPROTECTED_SELFDESTRUCT,
bytecode=state.environment.code.bytecode,
title="Unchecked SUICIDE",
_type="Warning",
description=description,
debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
except UnsatError:
logging.debug("[UNCHECKED_SUICIDE] no model found")
return issues
return issues
detector = SuicideModule()

@ -6,146 +6,157 @@ from mythril.analysis import solver
from mythril.analysis.ops import *
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import TX_ORDER_DEPENDENCE
from mythril.analysis.modules.base import DetectionModule
from mythril.exceptions import UnsatError
"""
MODULE DESCRIPTION:
This module finds the existance of transaction order dependence vulnerabilities.
The following webpage contains an extensive description of the vulnerability:
https://consensys.github.io/smart-contract-best-practices/known_attacks/#transaction-ordering-dependence-tod-front-running
"""
def execute(statespace):
""" Executes the analysis module"""
logging.debug("Executing module: TOD")
class TxOrderDependenceModule(DetectionModule):
def __init__(self):
super().__init__(
name="Transaction Order Dependence",
swc_id=TX_ORDER_DEPENDENCE,
hooks=[],
description=(
"This module finds the existance of transaction order dependence "
"vulnerabilities. The following webpage contains an extensive description "
"of the vulnerability: "
"https://consensys.github.io/smart-contract-best-practices/known_attacks/#transaction-ordering-dependence-tod-front-running"
),
)
issues = []
def execute(self, statespace):
""" Executes the analysis module"""
logging.debug("Executing module: TOD")
for call in statespace.calls:
# Do analysis
interesting_storages = list(_get_influencing_storages(call))
changing_sstores = list(
_get_influencing_sstores(statespace, interesting_storages)
)
issues = []
# Build issue if necessary
if len(changing_sstores) > 0:
node = call.node
instruction = call.state.get_current_instruction()
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
title="Transaction order dependence",
bytecode=call.state.environment.code.bytecode,
swc_id=TX_ORDER_DEPENDENCE,
_type="Warning",
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
for call in statespace.calls:
# Do analysis
interesting_storages = list(self._get_influencing_storages(call))
changing_sstores = list(
self._get_influencing_sstores(statespace, interesting_storages)
)
issue.description = (
"Possible transaction order dependence vulnerability: The value or "
"direction of the call statement is determined from a tainted storage location"
# Build issue if necessary
if len(changing_sstores) > 0:
node = call.node
instruction = call.state.get_current_instruction()
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
title="Transaction order dependence",
bytecode=call.state.environment.code.bytecode,
swc_id=TX_ORDER_DEPENDENCE,
_type="Warning",
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issue.description = (
"Possible transaction order dependence vulnerability: The value or "
"direction of the call statement is determined from a tainted storage location"
)
issues.append(issue)
return issues
# TODO: move to __init__ or util module
def _get_states_with_opcode(self, statespace, opcode):
""" Gets all (state, node) tuples in statespace with opcode"""
for k in statespace.nodes:
node = statespace.nodes[k]
for state in node.states:
if state.get_current_instruction()["opcode"] == opcode:
yield state, node
def _dependent_on_storage(self, expression):
""" Checks if expression is dependent on a storage symbol and returns the influencing storages"""
pattern = re.compile(r"storage_[a-z0-9_&^]*[0-9]+")
return pattern.findall(str(simplify(expression)))
def _get_storage_variable(self, storage, state):
"""
Get storage z3 object given storage name and the state
:param storage: storage name example: storage_0
:param state: state to retrieve the variable from
:return: z3 object representing storage
"""
index = int(re.search("[0-9]+", storage).group())
try:
return state.environment.active_account.storage[index]
except KeyError:
return None
def _can_change(self, constraints, variable):
""" Checks if the variable can change given some constraints """
_constraints = copy.deepcopy(constraints)
try:
model = solver.get_model(_constraints)
except UnsatError:
return False
try:
initial_value = int(str(model.eval(variable, model_completion=True)))
return (
self._try_constraints(constraints, [variable != initial_value])
is not None
)
issues.append(issue)
return issues
# TODO: move to __init__ or util module
def _get_states_with_opcode(statespace, opcode):
""" Gets all (state, node) tuples in statespace with opcode"""
for k in statespace.nodes:
node = statespace.nodes[k]
for state in node.states:
if state.get_current_instruction()["opcode"] == opcode:
yield state, node
def _dependent_on_storage(expression):
""" Checks if expression is dependent on a storage symbol and returns the influencing storages"""
pattern = re.compile(r"storage_[a-z0-9_&^]*[0-9]+")
return pattern.findall(str(simplify(expression)))
def _get_storage_variable(storage, state):
"""
Get storage z3 object given storage name and the state
:param storage: storage name example: storage_0
:param state: state to retrieve the variable from
:return: z3 object representing storage
"""
index = int(re.search("[0-9]+", storage).group())
try:
return state.environment.active_account.storage[index]
except KeyError:
return None
def _can_change(constraints, variable):
""" Checks if the variable can change given some constraints """
_constraints = copy.deepcopy(constraints)
try:
model = solver.get_model(_constraints)
except UnsatError:
return False
try:
initial_value = int(str(model.eval(variable, model_completion=True)))
return _try_constraints(constraints, [variable != initial_value]) is not None
except AttributeError:
return False
def _get_influencing_storages(call):
""" Examines a Call object and returns an iterator of all storages that influence the call value or direction"""
state = call.state
node = call.node
# Get relevant storages
to, value = call.to, call.value
storages = []
if to.type == VarType.SYMBOLIC:
storages += _dependent_on_storage(to.val)
if value.type == VarType.SYMBOLIC:
storages += _dependent_on_storage(value.val)
# See if they can change within the constraints of the node
for storage in storages:
variable = _get_storage_variable(storage, state)
can_change = _can_change(node.constraints, variable)
if can_change:
yield storage
def _get_influencing_sstores(statespace, interesting_storages):
""" Gets sstore (state, node) tuples that write to interesting_storages"""
for sstore_state, node in _get_states_with_opcode(statespace, "SSTORE"):
index, value = sstore_state.mstate.stack[-1], sstore_state.mstate.stack[-2]
except AttributeError:
return False
def _get_influencing_storages(self, call):
""" Examines a Call object and returns an iterator of all storages that influence the call value or direction"""
state = call.state
node = call.node
# Get relevant storages
to, value = call.to, call.value
storages = []
if to.type == VarType.SYMBOLIC:
storages += self._dependent_on_storage(to.val)
if value.type == VarType.SYMBOLIC:
storages += self._dependent_on_storage(value.val)
# See if they can change within the constraints of the node
for storage in storages:
variable = self._get_storage_variable(storage, state)
can_change = self._can_change(node.constraints, variable)
if can_change:
yield storage
def _get_influencing_sstores(self, statespace, interesting_storages):
""" Gets sstore (state, node) tuples that write to interesting_storages"""
for sstore_state, node in self._get_states_with_opcode(statespace, "SSTORE"):
index, value = sstore_state.mstate.stack[-1], sstore_state.mstate.stack[-2]
try:
index = util.get_concrete_int(index)
except TypeError:
index = str(index)
if "storage_{}".format(index) not in interesting_storages:
continue
yield sstore_state, node
# TODO: remove
def _try_constraints(self, constraints, new_constraints):
"""
Tries new constraints
:return Model if satisfiable otherwise None
"""
_constraints = copy.deepcopy(constraints)
for constraint in new_constraints:
_constraints.append(copy.deepcopy(constraint))
try:
index = util.get_concrete_int(index)
except TypeError:
index = str(index)
if "storage_{}".format(index) not in interesting_storages:
continue
yield sstore_state, node
# TODO: remove
def _try_constraints(constraints, new_constraints):
"""
Tries new constraints
:return Model if satisfiable otherwise None
"""
_constraints = copy.deepcopy(constraints)
for constraint in new_constraints:
_constraints.append(copy.deepcopy(constraint))
try:
model = solver.get_model(_constraints)
return model
except UnsatError:
return None
model = solver.get_model(_constraints)
return model
except UnsatError:
return None
detector = TxOrderDependenceModule()

@ -1,125 +1,127 @@
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import UNCHECKED_RET_VAL
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.svm import NodeFlags
import logging
import re
"""
MODULE DESCRIPTION:
class UncheckedRetvalModule(DetectionModule):
def __init__(self):
super().__init__(
name="Unchecked Return Value",
swc_id=UNCHECKED_RET_VAL,
hooks=[],
description=(
"Test whether CALL return value is checked. "
"For direct calls, the Solidity compiler auto-generates this check. E.g.:\n"
" Alice c = Alice(address);\n"
" c.ping(42);\n"
"Here the CALL will be followed by IZSERO(retval), if retval = ZERO then state is reverted. "
"For low-level-calls this check is omitted. E.g.:\n"
' c.call.value(0)(bytes4(sha3("ping(uint256)")),1);'
),
)
Test whether CALL return value is checked.
def execute(self, statespace):
For direct calls, the Solidity compiler auto-generates this check. E.g.:
logging.debug("Executing module: UNCHECKED_RETVAL")
Alice c = Alice(address);
c.ping(42);
issues = []
Here the CALL will be followed by IZSERO(retval), if retval = ZERO then state is reverted.
for k in statespace.nodes:
For low-level-calls this check is omitted. E.g.:
node = statespace.nodes[k]
c.call.value(0)(bytes4(sha3("ping(uint256)")),1);
if NodeFlags.CALL_RETURN in node.flags:
"""
retval_checked = False
for state in node.states:
def execute(statespace):
instr = state.get_current_instruction()
logging.debug("Executing module: UNCHECKED_RETVAL")
if instr["opcode"] == "ISZERO" and re.search(
r"retval", str(state.mstate.stack[-1])
):
retval_checked = True
break
issues = []
if not retval_checked:
for k in statespace.nodes:
address = state.get_current_instruction()["address"]
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=address,
bytecode=state.environment.code.bytecode,
title="Unchecked CALL return value",
swc_id=UNCHECKED_RET_VAL,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
node = statespace.nodes[k]
issue.description = (
"The return value of an external call is not checked. "
"Note that execution continue even if the called contract throws."
)
if NodeFlags.CALL_RETURN in node.flags:
issues.append(issue)
retval_checked = False
else:
for state in node.states:
n_states = len(node.states)
instr = state.get_current_instruction()
for idx in range(
0, n_states - 1
): # Ignore CALLs at last position in a node
if instr["opcode"] == "ISZERO" and re.search(
r"retval", str(state.mstate.stack[-1])
):
retval_checked = True
break
state = node.states[idx]
instr = state.get_current_instruction()
if not retval_checked:
if instr["opcode"] == "CALL":
address = state.get_current_instruction()["address"]
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=address,
bytecode=state.environment.code.bytecode,
title="Unchecked CALL return value",
swc_id=UNCHECKED_RET_VAL,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
retval_checked = False
issue.description = (
"The return value of an external call is not checked. "
"Note that execution continue even if the called contract throws."
)
for _idx in range(idx, idx + 10):
issues.append(issue)
try:
_state = node.states[_idx]
_instr = _state.get_current_instruction()
else:
if _instr["opcode"] == "ISZERO" and re.search(
r"retval", str(_state.mstate.stack[-1])
):
retval_checked = True
break
n_states = len(node.states)
for idx in range(
0, n_states - 1
): # Ignore CALLs at last position in a node
except IndexError:
break
state = node.states[idx]
instr = state.get_current_instruction()
if not retval_checked:
if instr["opcode"] == "CALL":
address = instr["address"]
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
bytecode=state.environment.code.bytecode,
address=address,
title="Unchecked CALL return value",
swc_id=UNCHECKED_RET_VAL,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
retval_checked = False
issue.description = (
"The return value of an external call is not checked. "
"Note that execution continue even if the called contract throws."
)
for _idx in range(idx, idx + 10):
issues.append(issue)
try:
_state = node.states[_idx]
_instr = _state.get_current_instruction()
return issues
if _instr["opcode"] == "ISZERO" and re.search(
r"retval", str(_state.mstate.stack[-1])
):
retval_checked = True
break
except IndexError:
break
if not retval_checked:
address = instr["address"]
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
bytecode=state.environment.code.bytecode,
address=address,
title="Unchecked CALL return value",
swc_id=UNCHECKED_RET_VAL,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
issue.description = (
"The return value of an external call is not checked. "
"Note that execution continue even if the called contract throws."
)
issues.append(issue)
return issues
detector = UncheckedRetvalModule()

@ -1,22 +1,55 @@
from mythril.analysis.report import Report
from collections import defaultdict
from ethereum.opcodes import opcodes
from mythril.analysis import modules
import pkgutil
import logging
def fire_lasers(statespace, module_names=None):
OPCODE_LIST = [c[0] for _, c in opcodes.items()]
issues = []
def get_detection_module_hooks():
hook_dict = defaultdict(list)
_modules = get_detection_modules(entrypoint="callback")
for module in _modules:
for op_code in map(lambda x: x.upper(), module.detector.hooks):
if op_code in OPCODE_LIST:
hook_dict[op_code].append(module.detector.execute)
elif op_code.endswith("*"):
to_register = filter(lambda x: x.startswith(op_code[:-1]), OPCODE_LIST)
for actual_hook in to_register:
hook_dict[actual_hook].append(module.detector.execute)
else:
logging.error(
"Encountered invalid hook opcode %s in module %s",
op_code,
module.detector.name,
)
return dict(hook_dict)
def get_detection_modules(entrypoint, except_modules=()):
except_modules = list(except_modules) + ["base"]
_modules = []
for loader, name, is_pkg in pkgutil.walk_packages(modules.__path__):
_modules.append(loader.find_module(name).load_module(name))
for loader, name, _ in pkgutil.walk_packages(modules.__path__):
module = loader.find_module(name).load_module(name)
if (
module.__name__ not in except_modules
and module.detector.entrypoint == entrypoint
):
_modules.append(module)
logging.info("Found %s detection modules", len(_modules))
return _modules
def fire_lasers(statespace, module_names=()):
logging.info("Starting analysis")
for module in _modules:
if not module_names or module.__name__ in module_names:
logging.info("Executing " + str(module))
issues += module.execute(statespace)
issues = []
for module in get_detection_modules(entrypoint="post", except_modules=module_names):
logging.info("Executing " + module.detector.name)
issues += module.detector.execute(statespace)
return issues

@ -1,3 +1,4 @@
from mythril.analysis.security import get_detection_module_hooks
from mythril.laser.ethereum import svm
from mythril.laser.ethereum.state.account import Account
from mythril.ether.soliditycontract import SolidityContract, ETHContract
@ -59,6 +60,9 @@ class SymExecWrapper:
create_timeout=create_timeout,
max_transaction_count=max_transaction_count,
)
self.laser.register_hooks(
hook_type="post", hook_dict=get_detection_module_hooks()
)
if isinstance(contract, SolidityContract):
self.laser.sym_exec(

@ -1,5 +1,8 @@
import logging
from collections import defaultdict
from ethereum.opcodes import opcodes
from typing import List, Tuple, Union, Callable, Dict
from mythril.analysis.security import get_detection_modules
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.world_state import WorldState
@ -70,13 +73,26 @@ class LaserEVM:
self.time = None
self.pre_hooks = {}
self.post_hooks = {}
self.pre_hooks = defaultdict(list)
self.post_hooks = defaultdict(list)
logging.info(
"LASER EVM initialized with dynamic loader: " + str(dynamic_loader)
)
def register_hooks(self, hook_type: str, hook_dict: Dict[str, List[Callable]]):
if hook_type == "pre":
entrypoint = self.pre_hooks
elif hook_type == "post":
entrypoint = self.post_hooks
else:
raise ValueError(
"Invalid hook type %s. Must be one of {pre, post}", hook_type
)
for op_code, funcs in hook_dict.items():
entrypoint[op_code].extend(funcs)
@property
def accounts(self) -> Dict[str, Account]:
return self.world_state.accounts
@ -399,7 +415,7 @@ class LaserEVM:
for global_state in global_states:
hook(global_state)
def hook(self, op_code: str) -> Callable:
def pre_hook(self, op_code: str) -> Callable:
def hook_decorator(func: Callable):
if op_code not in self.pre_hooks.keys():
self.pre_hooks[op_code] = []

@ -1,8 +1,4 @@
from mythril.analysis.modules.delegatecall import (
execute,
_concrete_call,
_symbolic_call,
)
from mythril.analysis.modules.delegatecall import detector
from mythril.analysis.ops import Call, Variable, VarType
from mythril.analysis.symbolic import SymExecWrapper
from mythril.laser.ethereum.cfg import Node
@ -34,7 +30,7 @@ def test_concrete_call():
call = Call(node, state, None, None, to, None)
# act
issues = _concrete_call(call, state, address, meminstart)
issues = detector._concrete_call(call, state, address, meminstart)
# assert
issue = issues[0]
@ -71,7 +67,7 @@ def test_concrete_call_symbolic_to():
call = Call(node, state, None, None, to, None)
# act
issues = _concrete_call(call, state, address, meminstart)
issues = detector._concrete_call(call, state, address, meminstart)
# assert
issue = issues[0]
@ -96,7 +92,7 @@ def test_concrete_call_not_calldata():
meminstart = Variable(1, VarType.CONCRETE)
# act
issues = _concrete_call(None, state, None, meminstart)
issues = detector._concrete_call(None, state, None, meminstart)
# assert
assert issues == []
@ -126,7 +122,7 @@ def test_symbolic_call_storage_to(mocker):
statespace.find_storage_write.return_value = "Function name"
# act
issues = _symbolic_call(call, state, address, statespace)
issues = detector._symbolic_call(call, state, address, statespace)
# assert
issue = issues[0]
@ -167,7 +163,7 @@ def test_symbolic_call_calldata_to(mocker):
statespace.find_storage_write.return_value = "Function name"
# act
issues = _symbolic_call(call, state, address, statespace)
issues = detector._symbolic_call(call, state, address, statespace)
# assert
issue = issues[0]
@ -184,8 +180,8 @@ def test_symbolic_call_calldata_to(mocker):
@patch("mythril.laser.ethereum.state.global_state.GlobalState.get_current_instruction")
@patch("mythril.analysis.modules.delegatecall._concrete_call")
@patch("mythril.analysis.modules.delegatecall._symbolic_call")
@patch("mythril.analysis.modules.delegatecall.detector._concrete_call")
@patch("mythril.analysis.modules.delegatecall.detector._symbolic_call")
def test_delegate_call(sym_mock, concrete_mock, curr_instruction):
# arrange
# sym_mock = mocker.patch.object(delegatecall, "_symbolic_call")
@ -214,15 +210,15 @@ def test_delegate_call(sym_mock, concrete_mock, curr_instruction):
statespace.calls = [call]
# act
execute(statespace)
detector.execute(statespace)
# assert
assert concrete_mock.call_count == 1
assert sym_mock.call_count == 1
@patch("mythril.analysis.modules.delegatecall._concrete_call")
@patch("mythril.analysis.modules.delegatecall._symbolic_call")
@patch("mythril.analysis.modules.delegatecall.detector._concrete_call")
@patch("mythril.analysis.modules.delegatecall.detector._symbolic_call")
def test_delegate_call_not_delegate(sym_mock, concrete_mock):
# arrange
# sym_mock = mocker.patch.object(delegatecall, "_symbolic_call")
@ -240,7 +236,7 @@ def test_delegate_call_not_delegate(sym_mock, concrete_mock):
statespace.calls = [call]
# act
issues = execute(statespace)
issues = detector.execute(statespace)
# assert
assert issues == []
@ -248,8 +244,8 @@ def test_delegate_call_not_delegate(sym_mock, concrete_mock):
assert sym_mock.call_count == 0
@patch("mythril.analysis.modules.delegatecall._concrete_call")
@patch("mythril.analysis.modules.delegatecall._symbolic_call")
@patch("mythril.analysis.modules.delegatecall.detector._concrete_call")
@patch("mythril.analysis.modules.delegatecall.detector._symbolic_call")
def test_delegate_call_not_fallback(sym_mock, concrete_mock):
# arrange
# sym_mock = mocker.patch.object(delegatecall, "_symbolic_call")
@ -267,7 +263,7 @@ def test_delegate_call_not_fallback(sym_mock, concrete_mock):
statespace.calls = [call]
# act
issues = execute(statespace)
issues = detector.execute(statespace)
# assert
assert issues == []

@ -1,6 +1,4 @@
import json
from mythril.ether.soliditycontract import SolidityContract
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.machine_state import MachineState
from mythril.laser.ethereum.state.global_state import GlobalState

@ -1,4 +1,5 @@
import json
from mythril.analysis.security import get_detection_module_hooks
from mythril.analysis.symbolic import SymExecWrapper
from mythril.analysis.callgraph import generate_graph
from mythril.ether.ethcontract import ETHContract
@ -85,6 +86,9 @@ class SVMTestCase(BaseTestCase):
accounts = {account.address: account}
laser = svm.LaserEVM(accounts, max_depth=22, max_transaction_count=1)
laser.register_hooks(
hook_type="post", hook_dict=get_detection_module_hooks()
)
laser.sym_exec(account.address)
laser_info = _all_info(laser)

Loading…
Cancel
Save