Merge branch 'develop' into feature/docs

pull/845/head
Dominik Muhs 6 years ago
commit 9fcc6c37c6
  1. 2
      MANIFEST.in
  2. 52
      mythril/analysis/call_helpers.py
  3. 7
      mythril/analysis/callgraph.py
  4. 4
      mythril/analysis/modules/base.py
  5. 109
      mythril/analysis/modules/delegatecall.py
  6. 308
      mythril/analysis/modules/dependence_on_predictable_vars.py
  7. 5
      mythril/analysis/modules/deprecated_ops.py
  8. 8
      mythril/analysis/modules/ether_thief.py
  9. 110
      mythril/analysis/modules/exceptions.py
  10. 10
      mythril/analysis/modules/external_calls.py
  11. 34
      mythril/analysis/modules/integer.py
  12. 9
      mythril/analysis/modules/suicide.py
  13. 5
      mythril/analysis/modules/transaction_order_dependence.py
  14. 9
      mythril/analysis/modules/unchecked_retval.py
  15. 2
      mythril/analysis/ops.py
  16. 4
      mythril/analysis/report.py
  17. 31
      mythril/analysis/security.py
  18. 25
      mythril/analysis/solver.py
  19. 3
      mythril/analysis/traceexplore.py
  20. 1
      mythril/disassembler/disassembly.py
  21. 6
      mythril/ethereum/interface/leveldb/accountindexing.py
  22. 4
      mythril/ethereum/interface/leveldb/client.py
  23. 6
      mythril/ethereum/interface/rpc/client.py
  24. 22
      mythril/interfaces/cli.py
  25. 53
      mythril/laser/ethereum/call.py
  26. 218
      mythril/laser/ethereum/instructions.py
  27. 26
      mythril/laser/ethereum/keccak.py
  28. 6
      mythril/laser/ethereum/natives.py
  29. 11
      mythril/laser/ethereum/state/account.py
  30. 91
      mythril/laser/ethereum/state/calldata.py
  31. 3
      mythril/laser/ethereum/state/environment.py
  32. 14
      mythril/laser/ethereum/state/global_state.py
  33. 36
      mythril/laser/ethereum/state/memory.py
  34. 11
      mythril/laser/ethereum/state/world_state.py
  35. 107
      mythril/laser/ethereum/svm.py
  36. 92
      mythril/laser/ethereum/taint_analysis.py
  37. 14
      mythril/laser/ethereum/transaction/concolic.py
  38. 36
      mythril/laser/ethereum/transaction/symbolic.py
  39. 15
      mythril/laser/ethereum/transaction/transaction_models.py
  40. 100
      mythril/laser/ethereum/util.py
  41. 28
      mythril/laser/smt/__init__.py
  42. 46
      mythril/laser/smt/array.py
  43. 106
      mythril/laser/smt/bitvec.py
  44. 32
      mythril/laser/smt/bool.py
  45. 4
      mythril/laser/smt/expression.py
  46. 66
      mythril/laser/smt/solver.py
  47. 20
      mythril/mythril.py
  48. 8
      mythril/support/loader.py
  49. 9
      mythril/support/signatures.py
  50. 4
      mythril/support/truffle.py
  51. 2
      mythril/version.py
  52. 1
      requirements.txt
  53. 3
      setup.py
  54. 9
      tests/laser/evm_testsuite/evm_test.py
  55. 9
      tests/laser/state/calldata_test.py
  56. 6
      tests/laser/state/mstate_test.py
  57. 4
      tests/laser/state/storage_test.py
  58. 2
      tests/testdata/input_contracts/suicide.sol

@ -1,2 +1,2 @@
include mythril/disassembler/signatures.db
include signatures.db
include mythril/analysis/templates/*

@ -0,0 +1,52 @@
from typing import Union
from mythril.analysis.ops import VarType, Call, get_variable
from mythril.laser.ethereum.state.global_state import GlobalState
def get_call_from_state(state: GlobalState) -> Union[Call, None]:
instruction = state.get_current_instruction()
op = instruction["opcode"]
stack = state.mstate.stack
if op in ("CALL", "CALLCODE"):
gas, to, value, meminstart, meminsz, memoutstart, memoutsz = (
get_variable(stack[-1]),
get_variable(stack[-2]),
get_variable(stack[-3]),
get_variable(stack[-4]),
get_variable(stack[-5]),
get_variable(stack[-6]),
get_variable(stack[-7]),
)
if to.type == VarType.CONCRETE and to.val < 5:
return None
if meminstart.type == VarType.CONCRETE and meminsz.type == VarType.CONCRETE:
return Call(
state.node,
state,
None,
op,
to,
gas,
value,
state.mstate.memory[meminstart.val : meminsz.val * 4],
)
else:
return Call(state.node, state, None, op, to, gas, value)
else:
gas, to, meminstart, meminsz, memoutstart, memoutsz = (
get_variable(stack[-1]),
get_variable(stack[-2]),
get_variable(stack[-3]),
get_variable(stack[-4]),
get_variable(stack[-5]),
get_variable(stack[-6]),
)
return Call(state.node, state, None, op, to, gas)

@ -2,7 +2,8 @@ import re
from jinja2 import Environment, PackageLoader, select_autoescape
from mythril.laser.ethereum.svm import NodeFlags
import z3
from z3 import Z3Exception
from mythril.laser.smt import simplify
default_opts = {
"autoResize": True,
@ -185,8 +186,8 @@ def extract_edges(statespace):
label = ""
else:
try:
label = str(z3.simplify(edge.condition)).replace("\n", "")
except z3.Z3Exception:
label = str(simplify(edge.condition)).replace("\n", "")
except Z3Exception:
label = str(edge.condition).replace("\n", "")
label = re.sub(

@ -1,6 +1,8 @@
import logging
from typing import List
log = logging.getLogger(__name__)
class DetectionModule:
"""The base detection module.
@ -28,7 +30,7 @@ class DetectionModule:
self.hooks = hooks
self.description = description
if entrypoint not in ("post", "callback"):
logging.error(
log.error(
"Invalid entrypoint in module %s, must be one of {post, callback}",
self.name,
)

@ -1,9 +1,16 @@
import re
import logging
from typing import List
from mythril.analysis.swc_data import DELEGATECALL_TO_UNTRUSTED_CONTRACT
from mythril.analysis.ops import get_variable, VarType
from mythril.analysis.ops import get_variable, VarType, Call, Variable
from mythril.analysis.report import Issue
from mythril.analysis.call_helpers import get_call_from_state
from mythril.analysis.modules.base import DetectionModule
import logging
from mythril.laser.ethereum.state.global_state import GlobalState
log = logging.getLogger(__name__)
class DelegateCallModule(DetectionModule):
@ -14,57 +21,77 @@ class DelegateCallModule(DetectionModule):
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
hooks=["DELEGATECALL"],
description="Check for invocations of delegatecall(msg.data) in the fallback function.",
entrypoint="callback",
)
self._issues = []
def execute(self, statespace):
"""
Executes analysis module for delegate call analysis module
:param statespace: Statespace to analyse
:return: Found issues
"""
issues = []
@property
def issues(self) -> list:
return self._issues
for call in statespace.calls:
def execute(self, state: GlobalState) -> list:
log.debug("Executing module: DELEGATE_CALL")
self._issues.extend(_analyze_states(state))
return self.issues
if call.type is not "DELEGATECALL":
continue
if call.node.function_name is not "fallback":
continue
state = call.state
address = state.get_current_instruction()["address"]
meminstart = get_variable(state.mstate.stack[-3])
def _analyze_states(state: GlobalState) -> List[Issue]:
"""
:param state: the current state
:return: returns the issues for that corresponding state
"""
call = get_call_from_state(state)
issues = []
if meminstart.type == VarType.CONCRETE:
issues += self._concrete_call(call, state, address, meminstart)
if call.type is not "DELEGATECALL":
return []
if call.node.function_name is not "fallback":
return []
return issues
state = call.state
address = state.get_current_instruction()["address"]
meminstart = get_variable(state.mstate.stack[-3])
def _concrete_call(self, call, state, address, meminstart):
if not re.search(r"calldata.*_0", str(state.mstate.memory[meminstart.val])):
return []
if meminstart.type == VarType.CONCRETE:
issues += _concrete_call(call, state, address, meminstart)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
bytecode=state.environment.code.bytecode,
title="Call data forwarded with delegatecall()",
_type="Informational",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return issues
issue.description = (
"This contract forwards its call data via DELEGATECALL in its fallback function. "
"This means that any function in the called contract can be executed. Note that the callee contract will have "
"access to the storage of the calling contract.\n"
)
target = hex(call.to.val) if call.to.type == VarType.CONCRETE else str(call.to)
issue.description += "DELEGATECALL target: {}".format(target)
def _concrete_call(
call: Call, state: GlobalState, address: int, meminstart: Variable
) -> List[Issue]:
"""
:param call: The current call's information
:param state: The current state
:param address: The PC address
:param meminstart: memory starting position
:return: issues
"""
if not re.search(r"calldata.*\[0", str(state.mstate.memory[meminstart.val])):
return []
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
bytecode=state.environment.code.bytecode,
title="Call data forwarded with delegatecall()",
_type="Informational",
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issue.description = (
"This contract forwards its call data via DELEGATECALL in its fallback function. "
"This means that any function in the called contract can be executed. Note that the callee contract will have "
"access to the storage of the calling contract.\n"
)
target = hex(call.to.val) if call.to.type == VarType.CONCRETE else str(call.to)
issue.description += "DELEGATECALL target: {}".format(target)
return [issue]
return [issue]
detector = DelegateCallModule()

@ -1,13 +1,16 @@
import re
from z3 import *
from mythril.analysis.ops import VarType
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.analysis.ops import Call, VarType
from mythril.analysis import solver
from mythril.analysis.report import Issue
from mythril.analysis.call_helpers import get_call_from_state
from mythril.analysis.swc_data import TIMESTAMP_DEPENDENCE, PREDICTABLE_VARS_DEPENDENCE
from mythril.analysis.modules.base import DetectionModule
from mythril.exceptions import UnsatError
import logging
log = logging.getLogger(__name__)
class PredictableDependenceModule(DetectionModule):
"""This module detects whether Ether is sent using predictable parameters."""
@ -15,183 +18,190 @@ class PredictableDependenceModule(DetectionModule):
super().__init__(
name="Dependence of Predictable Variables",
swc_id="{} {}".format(TIMESTAMP_DEPENDENCE, PREDICTABLE_VARS_DEPENDENCE),
hooks=["COINBASE", "GASLIMIT", "TIMESTAMP", "NUMBER", "BLOCKHASH"],
hooks=["CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"],
description=(
"Check for CALLs that send >0 Ether as a result of computation "
"based on predictable variables such as block.coinbase, "
"block.gaslimit, block.timestamp, block.number"
),
entrypoint="callback",
)
self._issues = []
@property
def issues(self) -> list:
return self._issues
def execute(self, statespace):
def execute(self, state: GlobalState) -> list:
"""
:param statespace:
:return:
"""
logging.debug("Executing module: DEPENDENCE_ON_PREDICTABLE_VARS")
log.debug("Executing module: DEPENDENCE_ON_PREDICTABLE_VARS")
self._issues.extend(_analyze_states(state))
return self.issues
detector = PredictableDependenceModule()
def _analyze_states(state: GlobalState) -> list:
issues = []
call = get_call_from_state(state)
if call is None:
return []
if "callvalue" in str(call.value):
log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] Skipping refund function")
return []
issues = []
# We're only interested in calls that send Ether
if call.value.type == VarType.CONCRETE and call.value.val == 0:
return []
for call in statespace.calls:
address = call.state.get_current_instruction()["address"]
if "callvalue" in str(call.value):
logging.debug(
"[DEPENDENCE_ON_PREDICTABLE_VARS] Skipping refund function"
)
continue
description = "In the function `" + call.node.function_name + "` "
description += "the following predictable state variables are used to determine Ether recipient:\n"
# We're only interested in calls that send Ether
# First check: look for predictable state variables in node & call recipient constraints
if call.value.type == VarType.CONCRETE and call.value.val == 0:
continue
vars = ["coinbase", "gaslimit", "timestamp", "number"]
found = []
address = call.state.get_current_instruction()["address"]
for var in vars:
for constraint in call.node.constraints[:] + [call.to]:
if var in str(constraint):
found.append(var)
if len(found):
for item in found:
description += "- block.{}\n".format(item)
if solve(call):
swc_type = (
TIMESTAMP_DEPENDENCE
if item == "timestamp"
else PREDICTABLE_VARS_DEPENDENCE
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=swc_type,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable environment variable",
_type="Warning",
description=description,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
# Second check: blockhash
for constraint in call.node.constraints[:] + [call.to]:
if "blockhash" in str(constraint):
description = "In the function `" + call.node.function_name + "` "
description += "the following predictable state variables are used to determine Ether recipient:\n"
# First check: look for predictable state variables in node & call recipient constraints
vars = ["coinbase", "gaslimit", "timestamp", "number"]
found = []
for var in vars:
for constraint in call.node.constraints[:] + [call.to]:
if var in str(constraint):
found.append(var)
if len(found):
for item in found:
description += "- block.{}\n".format(item)
if self.solve(call):
swc_type = (
TIMESTAMP_DEPENDENCE
if item == "timestamp"
else PREDICTABLE_VARS_DEPENDENCE
)
if "number" in str(constraint):
m = re.search(r"blockhash\w+(\s-\s(\d+))*", str(constraint))
if m and solve(call):
found = m.group(1)
if found: # block.blockhash(block.number - N)
description += (
"predictable expression 'block.blockhash(block.number - "
+ m.group(2)
+ ")' is used to determine Ether recipient"
)
if int(m.group(2)) > 255:
description += (
", this expression will always be equal to zero."
)
elif "storage" in str(
constraint
): # block.blockhash(block.number - storage_0)
description += (
"predictable expression 'block.blockhash(block.number - "
+ "some_storage_var)' is used to determine Ether recipient"
)
else: # block.blockhash(block.number)
description += (
"predictable expression 'block.blockhash(block.number)'"
+ " is used to determine Ether recipient"
)
description += ", this expression will always be equal to zero."
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
swc_id=swc_type,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable environment variable",
title="Dependence on predictable variable",
_type="Warning",
description=description,
swc_id=PREDICTABLE_VARS_DEPENDENCE,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
# Second check: blockhash
for constraint in call.node.constraints[:] + [call.to]:
if "blockhash" in str(constraint):
description = "In the function `" + call.node.function_name + "` "
if "number" in str(constraint):
m = re.search(r"blockhash\w+(\s-\s(\d+))*", str(constraint))
if m and self.solve(call):
found = m.group(1)
if found: # block.blockhash(block.number - N)
description += (
"predictable expression 'block.blockhash(block.number - "
+ m.group(2)
+ ")' is used to determine Ether recipient"
)
if int(m.group(2)) > 255:
description += ", this expression will always be equal to zero."
elif "storage" in str(
constraint
): # block.blockhash(block.number - storage_0)
description += (
"predictable expression 'block.blockhash(block.number - "
+ "some_storage_var)' is used to determine Ether recipient"
)
else: # block.blockhash(block.number)
description += (
"predictable expression 'block.blockhash(block.number)'"
+ " is used to determine Ether recipient"
)
description += (
", this expression will always be equal to zero."
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable variable",
_type="Warning",
description=description,
swc_id=PREDICTABLE_VARS_DEPENDENCE,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
break
else:
r = re.search(r"storage_([a-z0-9_&^]+)", str(constraint))
if r: # block.blockhash(storage_0)
"""
We actually can do better here by adding a constraint blockhash_block_storage_0 == 0
and checking model satisfiability. When this is done, severity can be raised
from 'Informational' to 'Warning'.
Checking that storage at given index can be tainted is not necessary, since it usually contains
block.number of the 'commit' transaction in commit-reveal workflow.
"""
index = r.group(1)
if index and self.solve(call):
description += (
"block.blockhash() is calculated using a value from storage "
"at index {}".format(index)
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable variable",
_type="Informational",
description=description,
swc_id=PREDICTABLE_VARS_DEPENDENCE,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
break
return issues
def solve(self, call):
"""
:param call:
:return:
"""
try:
model = solver.get_model(call.node.constraints)
logging.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] MODEL: " + str(model))
pretty_model = solver.pretty_print_model(model)
logging.debug(
"[DEPENDENCE_ON_PREDICTABLE_VARS] main model: \n%s" % pretty_model
)
return True
except UnsatError:
logging.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] no model found")
return False
detector = PredictableDependenceModule()
break
else:
r = re.search(r"storage_([a-z0-9_&^]+)", str(constraint))
if r: # block.blockhash(storage_0)
"""
We actually can do better here by adding a constraint blockhash_block_storage_0 == 0
and checking model satisfiability. When this is done, severity can be raised
from 'Informational' to 'Warning'.
Checking that storage at given index can be tainted is not necessary, since it usually contains
block.number of the 'commit' transaction in commit-reveal workflow.
"""
index = r.group(1)
if index and solve(call):
description += (
"block.blockhash() is calculated using a value from storage "
"at index {}".format(index)
)
issue = Issue(
contract=call.node.contract_name,
function_name=call.node.function_name,
address=address,
bytecode=call.state.environment.code.bytecode,
title="Dependence on predictable variable",
_type="Informational",
description=description,
swc_id=PREDICTABLE_VARS_DEPENDENCE,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
break
return issues
def solve(call: Call) -> bool:
"""
:param call:
:return:
"""
try:
model = solver.get_model(call.node.constraints)
log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] MODEL: " + str(model))
pretty_model = solver.pretty_print_model(model)
log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] main model: \n%s" % pretty_model)
return True
except UnsatError:
log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] no model found")
return False

@ -4,6 +4,7 @@ from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.global_state import GlobalState
import logging
log = logging.getLogger(__name__)
DESCRIPTION = """
Check for usage of deprecated opcodes
@ -15,7 +16,7 @@ def _analyze_state(state):
instruction = state.get_current_instruction()
if instruction["opcode"] == "ORIGIN":
logging.debug("ORIGIN in function " + node.function_name)
log.debug("ORIGIN in function " + node.function_name)
title = "Use of tx.origin"
description = (
"The function `{}` retrieves the transaction origin (tx.origin) using the ORIGIN opcode. "
@ -27,7 +28,7 @@ def _analyze_state(state):
swc_id = DEPRICATED_FUNCTIONS_USAGE
elif instruction["opcode"] == "CALLCODE":
logging.debug("CALLCODE in function " + node.function_name)
log.debug("CALLCODE in function " + node.function_name)
title = "Use of callcode"
description = (
"The function `{}` uses callcode. Callcode does not persist sender or value over the call. "

@ -5,10 +5,12 @@ from mythril.analysis.swc_data import UNPROTECTED_ETHER_WITHDRAWAL
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.exceptions import UnsatError
from z3 import BitVecVal, UGT, Sum
from mythril.laser.smt import symbol_factory, UGT, Sum, BVAddNoOverflow
import logging
from copy import copy
log = logging.getLogger(__name__)
DESCRIPTION = """
Search for cases where Ether can be withdrawn to a user-specified address.
@ -33,7 +35,7 @@ def _analyze_state(state):
call_value = state.mstate.stack[-3]
target = state.mstate.stack[-2]
eth_sent_total = BitVecVal(0, 256)
eth_sent_total = symbol_factory.BitVecVal(0, 256)
constraints = copy(node.constraints)
@ -68,7 +70,7 @@ def _analyze_state(state):
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
logging.debug("[ETHER_THIEF] no model found")
log.debug("[ETHER_THIEF] no model found")
return []
return [issue]

@ -5,74 +5,68 @@ from mythril.analysis import solver
from mythril.analysis.modules.base import DetectionModule
import logging
from mythril.laser.ethereum.state.global_state import GlobalState
log = logging.getLogger(__name__)
def _analyze_state(state) -> list:
log.info("Exceptions module: found ASSERT_FAIL instruction")
node = state.node
log.debug("ASSERT_FAIL in function " + node.function_name)
try:
address = state.get_current_instruction()["address"]
description = (
"A reachable exception (opcode 0xfe) has been detected. "
"This can be caused by type errors, division by zero, "
"out-of-bounds array access, or assert violations. "
"Note that explicit `assert()` should only be used to check invariants. "
"Use `require()` for regular input checking."
)
debug = str(solver.get_transaction_sequence(state, node.constraints))
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=address,
swc_id=ASSERT_VIOLATION,
title="Exception state",
_type="Informational",
description=description,
bytecode=state.environment.code.bytecode,
debug=debug,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return [issue]
except UnsatError:
log.debug("no model found")
return []
class ReachableExceptionsModule(DetectionModule):
"""This module checks whether any exception states are reachable."""
def __init__(self):
super().__init__(
name="Reachable Exceptions",
swc_id=ASSERT_VIOLATION,
hooks=["ASSERT_FAIL"],
description="Checks whether any exception states are reachable.",
entrypoint="callback",
)
self._issues = []
def execute(self, state: GlobalState) -> list:
self._issues.extend(_analyze_state(state))
return self.issues
def execute(self, statespace):
"""
:param statespace:
:return:
"""
logging.debug("Executing module: EXCEPTIONS")
issues = []
for k in statespace.nodes:
node = statespace.nodes[k]
for state in node.states:
instruction = state.get_current_instruction()
if instruction["opcode"] == "ASSERT_FAIL":
try:
model = solver.get_model(node.constraints)
address = state.get_current_instruction()["address"]
description = (
"A reachable exception (opcode 0xfe) has been detected. "
"This can be caused by type errors, division by zero, "
"out-of-bounds array access, or assert violations. "
)
description += (
"Note that explicit `assert()` should only be used to check invariants. "
"Use `require()` for regular input checking."
)
debug = str(
solver.get_transaction_sequence(state, node.constraints)
)
issues.append(
Issue(
contract=node.contract_name,
function_name=node.function_name,
address=address,
swc_id=ASSERT_VIOLATION,
title="Exception state",
_type="Informational",
description=description,
bytecode=state.environment.code.bytecode,
debug=debug,
gas_used=(
state.mstate.min_gas_used,
state.mstate.max_gas_used,
),
)
)
except UnsatError:
logging.debug("[EXCEPTIONS] no model found")
return issues
@property
def issues(self) -> list:
return self._issues
detector = ReachableExceptionsModule()

@ -1,12 +1,14 @@
from z3 import *
from mythril.analysis.report import Issue
from mythril.analysis import solver
from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.smt import UGT, symbol_factory
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.exceptions import UnsatError
import logging
log = logging.getLogger(__name__)
DESCRIPTION = """
Search for low level calls (e.g. call.value()) that forward all gas to the callee.
@ -27,7 +29,7 @@ def _analyze_state(state):
try:
constraints = node.constraints
transaction_sequence = solver.get_transaction_sequence(
state, constraints + [UGT(gas, BitVecVal(2300, 256))]
state, constraints + [UGT(gas, symbol_factory.BitVecVal(2300, 256))]
)
# Check whether we can also set the callee address
@ -58,7 +60,7 @@ def _analyze_state(state):
except UnsatError:
logging.debug(
log.debug(
"[EXTERNAL_CALLS] Callee address cannot be modified. Reporting informational issue."
)
@ -82,7 +84,7 @@ def _analyze_state(state):
)
except UnsatError:
logging.debug("[EXTERNAL_CALLS] No model found.")
log.debug("[EXTERNAL_CALLS] No model found.")
return []
return [issue]

@ -1,16 +1,26 @@
from z3 import *
from mythril.analysis import solver
from mythril.analysis.ops import *
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import INTEGER_OVERFLOW_AND_UNDERFLOW
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.taint_analysis import TaintRunner
from mythril.analysis.modules.base import DetectionModule
import re
from mythril.laser.smt import (
BVAddNoOverflow,
BVSubNoUnderflow,
BVMulNoOverflow,
BitVec,
symbol_factory,
Not,
)
import copy
import logging
log = logging.getLogger(__name__)
class IntegerOverflowUnderflowModule(DetectionModule):
"""This module searches for integer over- and underflows."""
def __init__(self):
@ -31,7 +41,7 @@ class IntegerOverflowUnderflowModule(DetectionModule):
:param statespace: Statespace to analyse
:return: Found issues
"""
logging.debug("Executing module: INTEGER")
log.debug("Executing module: INTEGER")
issues = []
@ -65,15 +75,15 @@ class IntegerOverflowUnderflowModule(DetectionModule):
# An integer overflow is possible if op0 + op1 or op0 * op1 > MAX_UINT
# Do a type check
allowed_types = [int, BitVecRef, BitVecNumRef]
allowed_types = [int, BitVec]
if not (type(op0) in allowed_types and type(op1) in allowed_types):
return issues
# Change ints to BitVec
if type(op0) is int:
op0 = BitVecVal(op0, 256)
op0 = symbol_factory.BitVecVal(op0, 256)
if type(op1) is int:
op1 = BitVecVal(op1, 256)
op1 = symbol_factory.BitVecVal(op1, 256)
# Formulate expression
# FIXME: handle exponentiation
@ -90,7 +100,7 @@ class IntegerOverflowUnderflowModule(DetectionModule):
model = self._try_constraints(node.constraints, [constraint])
if model is None:
logging.debug("[INTEGER_OVERFLOW] no model found")
log.debug("[INTEGER_OVERFLOW] no model found")
return issues
# Build issue
@ -171,13 +181,13 @@ class IntegerOverflowUnderflowModule(DetectionModule):
if type(op0) == int and type(op1) == int:
return []
logging.debug(
log.debug(
"[INTEGER_UNDERFLOW] Checking SUB {0}, {1} at address {2}".format(
str(op0), str(op1), str(instruction["address"])
)
)
allowed_types = [int, BitVecRef, BitVecNumRef]
allowed_types = [int, BitVec]
if type(op0) in allowed_types and type(op1) in allowed_types:
constraints.append(Not(BVSubNoUnderflow(op0, op1, signed=False)))
@ -216,7 +226,7 @@ class IntegerOverflowUnderflowModule(DetectionModule):
issues.append(issue)
except UnsatError:
logging.debug("[INTEGER_UNDERFLOW] no model found")
log.debug("[INTEGER_UNDERFLOW] no model found")
return issues
def _check_usage(self, state, taint_result):
@ -267,7 +277,7 @@ class IntegerOverflowUnderflowModule(DetectionModule):
if constraint is None:
constraint = []
logging.debug("SEARCHING NODE for usage of an overflowed variable %d", node.uid)
log.debug("SEARCHING NODE for usage of an overflowed variable %d", node.uid)
if taint_result is None:
state = node.states[index]

@ -1,5 +1,4 @@
from mythril.analysis import solver
from mythril.analysis.ops import *
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import UNPROTECTED_SELFDESTRUCT
from mythril.exceptions import UnsatError
@ -7,6 +6,8 @@ from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.global_state import GlobalState
import logging
log = logging.getLogger(__name__)
DESCRIPTION = """
Check if the contact can be 'accidentally' killed by anyone.
For kill-able contracts, also check whether it is possible to direct the contract balance to the attacker.
@ -14,12 +15,12 @@ For kill-able contracts, also check whether it is possible to direct the contrac
def _analyze_state(state):
logging.info("Suicide module: Analyzing suicide instruction")
log.info("Suicide module: Analyzing suicide instruction")
node = state.node
instruction = state.get_current_instruction()
to = state.mstate.stack[-1]
logging.debug("[SUICIDE] SUICIDE in function " + node.function_name)
log.debug("[SUICIDE] SUICIDE in function " + node.function_name)
try:
try:
@ -52,7 +53,7 @@ def _analyze_state(state):
)
return [issue]
except UnsatError:
logging.info("[UNCHECKED_SUICIDE] no model found")
log.info("[UNCHECKED_SUICIDE] no model found")
return []

@ -10,6 +10,9 @@ from mythril.analysis.modules.base import DetectionModule
from mythril.exceptions import UnsatError
log = logging.getLogger(__name__)
class TxOrderDependenceModule(DetectionModule):
"""This module finds the existance of transaction order dependence."""
def __init__(self):
@ -27,7 +30,7 @@ class TxOrderDependenceModule(DetectionModule):
def execute(self, statespace):
""" Executes the analysis module"""
logging.debug("Executing module: TOD")
log.debug("Executing module: TOD")
issues = []

@ -6,9 +6,10 @@ from mythril.laser.ethereum.svm import NodeFlags
import logging
import re
log = logging.getLogger(__name__)
class UncheckedRetvalModule(DetectionModule):
"""This module checks whether CALL return value is checked."""
def __init__(self):
super().__init__(
name="Unchecked Return Value",
@ -26,12 +27,8 @@ class UncheckedRetvalModule(DetectionModule):
)
def execute(self, statespace):
"""
:param statespace:
:return:
"""
logging.debug("Executing module: UNCHECKED_RETVAL")
log.debug("Executing module: UNCHECKED_RETVAL")
issues = []

@ -1,4 +1,4 @@
from z3 import *
from mythril.laser.smt import simplify
from enum import Enum
from mythril.laser.ethereum import util

@ -5,6 +5,8 @@ from jinja2 import PackageLoader, Environment
import _pysha3 as sha3
import hashlib
log = logging.getLogger(__name__)
class Issue:
"""
@ -42,7 +44,7 @@ class Issue:
keccak.update(bytes.fromhex(bytecode))
self.bytecode_hash = "0x" + keccak.hexdigest()
except ValueError:
logging.debug(
log.debug(
"Unable to change the bytecode to bytes. Bytecode: {}".format(bytecode)
)
self.bytecode_hash = ""

@ -5,25 +5,18 @@ import pkgutil
import importlib.util
import logging
log = logging.getLogger(__name__)
OPCODE_LIST = [c[0] for _, c in opcodes.items()]
def reset_callback_modules():
"""
"""
modules = get_detection_modules("callback")
for module in modules:
module.detector._issues = []
def get_detection_module_hooks(modules):
"""
:param modules:
:return:
"""
hook_dict = defaultdict(list)
_modules = get_detection_modules(entrypoint="callback", include_modules=modules)
for module in _modules:
@ -35,7 +28,7 @@ def get_detection_module_hooks(modules):
for actual_hook in to_register:
hook_dict[actual_hook].append(module.detector.execute)
else:
logging.error(
log.error(
"Encountered invalid hook opcode %s in module %s",
op_code,
module.detector.name,
@ -44,12 +37,6 @@ def get_detection_module_hooks(modules):
def get_detection_modules(entrypoint, include_modules=()):
"""
:param entrypoint:
:param include_modules:
:return:
"""
include_modules = list(include_modules)
_modules = []
@ -68,30 +55,24 @@ def get_detection_modules(entrypoint, include_modules=()):
if module.__name__ != "base" and module.detector.entrypoint == entrypoint:
_modules.append(module)
logging.info("Found %s detection modules", len(_modules))
log.info("Found %s detection modules", len(_modules))
return _modules
def fire_lasers(statespace, module_names=()):
"""
:param statespace:
:param module_names:
:return:
"""
logging.info("Starting analysis")
log.info("Starting analysis")
issues = []
for module in get_detection_modules(
entrypoint="post", include_modules=module_names
):
logging.info("Executing " + module.detector.name)
log.info("Executing " + module.detector.name)
issues += module.detector.execute(statespace)
for module in get_detection_modules(
entrypoint="callback", include_modules=module_names
):
logging.debug("Retrieving results for " + module.detector.name)
log.debug("Retrieving results for " + module.detector.name)
issues += module.detector.issues
reset_callback_modules()

@ -1,10 +1,13 @@
from z3 import Solver, simplify, sat, unknown, FuncInterp, UGE, Optimize
from z3 import sat, unknown, FuncInterp
from mythril.laser.smt import simplify, UGE, Optimize, symbol_factory
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.transaction.transaction_models import (
ContractCreationTransaction,
)
import logging
log = logging.getLogger(__name__)
def get_model(constraints, minimize=(), maximize=()):
"""
@ -15,7 +18,7 @@ def get_model(constraints, minimize=(), maximize=()):
:return:
"""
s = Optimize()
s.set("timeout", 100000)
s.set_timeout(100000)
for constraint in constraints:
if type(constraint) == bool and not constraint:
@ -34,7 +37,7 @@ def get_model(constraints, minimize=(), maximize=()):
if result == sat:
return s.model()
elif result == unknown:
logging.debug("Timeout encountered while solving expression using z3")
log.debug("Timeout encountered while solving expression using z3")
raise UnsatError
@ -86,13 +89,12 @@ def get_transaction_sequence(global_state, constraints):
minimize = []
transactions = []
model = None
for transaction in transaction_sequence:
tx_id = str(transaction.id)
if not isinstance(transaction, ContractCreationTransaction):
transactions.append(transaction)
# Constrain calldatasize
max_calldatasize = 5000
max_calldatasize = symbol_factory.BitVecVal(5000, 256)
tx_constraints.append(
UGE(max_calldatasize, transaction.call_data.calldatasize)
)
@ -102,16 +104,10 @@ def get_transaction_sequence(global_state, constraints):
concrete_transactions[tx_id] = tx_template.copy()
try:
model = get_model(tx_constraints, minimize=minimize)
break
except UnsatError:
continue
else:
creation_tx_ids.append(tx_id)
if model is None:
model = get_model(tx_constraints, minimize=minimize)
model = get_model(tx_constraints, minimize=minimize)
for transaction in transactions:
tx_id = str(transaction.id)
@ -124,10 +120,11 @@ def get_transaction_sequence(global_state, constraints):
)
concrete_transactions[tx_id]["call_value"] = (
"0x%x" % model.eval(transaction.call_value, model_completion=True).as_long()
"0x%x"
% model.eval(transaction.call_value.raw, model_completion=True).as_long()
)
concrete_transactions[tx_id]["caller"] = "0x" + (
"%x" % model.eval(transaction.caller, model_completion=True).as_long()
"%x" % model.eval(transaction.caller.raw, model_completion=True).as_long()
).zfill(40)
return concrete_transactions

@ -1,4 +1,5 @@
from z3 import Z3Exception, simplify
from z3 import Z3Exception
from mythril.laser.smt import simplify
from mythril.laser.ethereum.svm import NodeFlags
import re

@ -1,7 +1,6 @@
from mythril.ethereum import util
from mythril.disassembler import asm
from mythril.support.signatures import SignatureDB
import logging
class Disassembly(object):

@ -8,6 +8,8 @@ from ethereum import utils
from ethereum.utils import hash32, address, int256
from mythril.exceptions import AddressNotFoundError
log = logging.getLogger(__name__)
BATCH_SIZE = 8 * 4096
@ -85,9 +87,7 @@ class AccountIndexer(object):
"""
Processesing method
"""
logging.debug(
"Processing blocks %d to %d" % (startblock, startblock + BATCH_SIZE)
)
log.debug("Processing blocks %d to %d" % (startblock, startblock + BATCH_SIZE))
addresses = []

@ -13,6 +13,8 @@ from mythril.ethereum.interface.leveldb.eth_db import ETH_DB
from mythril.ethereum.evmcontract import EVMContract
from mythril.exceptions import AddressNotFoundError
log = logging.getLogger(__name__)
# Per https://github.com/ethereum/go-ethereum/blob/master/core/rawdb/schema.go
# prefixes and suffixes for keys in geth
header_prefix = b"h" # header_prefix + num (uint64 big endian) + hash -> header
@ -213,7 +215,7 @@ class EthLevelDB(object):
cnt += 1
if not cnt % 1000:
logging.info("Searched %d contracts" % cnt)
log.info("Searched %d contracts" % cnt)
def contract_hash_to_address(self, contract_hash):
"""Tries to find corresponding account address"""

@ -11,6 +11,8 @@ from .exceptions import (
)
from .base_client import BaseClient
log = logging.getLogger(__name__)
GETH_DEFAULT_RPC_PORT = 8545
ETH_DEFAULT_RPC_PORT = 8545
PARITY_DEFAULT_RPC_PORT = 8545
@ -44,7 +46,7 @@ class EthJsonRpc(BaseClient):
scheme += "s"
url = "{}://{}:{}".format(scheme, self.host, self.port)
headers = {"Content-Type": JSON_MEDIA_TYPE}
logging.debug("rpc send: %s" % json.dumps(data))
log.debug("rpc send: %s" % json.dumps(data))
try:
r = self.session.post(url, headers=headers, data=json.dumps(data))
except RequestsConnectionError:
@ -53,7 +55,7 @@ class EthJsonRpc(BaseClient):
raise BadStatusCodeError(r.status_code)
try:
response = r.json()
logging.debug("rpc response: %s" % response)
log.debug("rpc response: %s" % response)
except ValueError:
raise BadJsonError(r.text)
try:

@ -18,6 +18,8 @@ from mythril.mythril import Mythril
from mythril.version import VERSION
import mythril.support.signatures as sigs
log = logging.getLogger(__name__)
def exit_with_error(format_, message):
"""
@ -26,7 +28,7 @@ def exit_with_error(format_, message):
:param message:
"""
if format_ == "text" or format_ == "markdown":
logging.error(message)
log.error(message)
else:
result = {"success": False, "error": str(message), "issues": []}
print(json.dumps(result))
@ -199,7 +201,9 @@ def main():
options.add_argument(
"--enable-physics", action="store_true", help="enable graph physics simulation"
)
options.add_argument("-v", type=int, help="log level (0-2)", metavar="LOG_LEVEL")
options.add_argument(
"-v", type=int, help="log level (0-5)", metavar="LOG_LEVEL", default=2
)
options.add_argument(
"-q",
"--query-signature",
@ -254,11 +258,19 @@ def main():
sys.exit()
if args.v:
if 0 <= args.v < 3:
if 0 <= args.v < 6:
log_levels = [
logging.NOTSET,
logging.CRITICAL,
logging.ERROR,
logging.WARNING,
logging.INFO,
logging.DEBUG,
]
coloredlogs.install(
fmt="%(name)s[%(process)d] %(levelname)s %(message)s",
level=[logging.NOTSET, logging.INFO, logging.DEBUG][args.v],
fmt="%(name)s [%(levelname)s]: %(message)s", level=log_levels[args.v]
)
logging.getLogger("mythril").setLevel(log_levels[args.v])
else:
exit_with_error(
args.outform, "Invalid -v value, you can find valid values in usage"

@ -5,7 +5,7 @@ to get the necessary elements from the stack and determine the parameters for th
import logging
from typing import Union
from z3 import simplify, ExprRef, Extract
from mythril.laser.smt import simplify, Expression, symbol_factory
import mythril.laser.ethereum.util as util
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.calldata import (
@ -17,6 +17,13 @@ from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.support.loader import DynLoader
import re
"""
This module contains the business logic used by Instruction in instructions.py
to get the necessary elements from the stack and determine the parameters for the new global state.
"""
log = logging.getLogger(__name__)
def get_call_parameters(
global_state: GlobalState, dynamic_loader: DynLoader, with_value=False
@ -60,7 +67,9 @@ def get_call_parameters(
def get_callee_address(
global_state: GlobalState, dynamic_loader: DynLoader, symbolic_to_address: ExprRef
global_state: GlobalState,
dynamic_loader: DynLoader,
symbolic_to_address: Expression,
):
"""
Gets the address of the callee
@ -74,16 +83,16 @@ def get_callee_address(
try:
callee_address = hex(util.get_concrete_int(symbolic_to_address))
except TypeError:
logging.debug("Symbolic call encountered")
log.debug("Symbolic call encountered")
match = re.search(r"storage_(\d+)", str(simplify(symbolic_to_address)))
logging.debug("CALL to: " + str(simplify(symbolic_to_address)))
log.debug("CALL to: " + str(simplify(symbolic_to_address)))
if match is None or dynamic_loader is None:
raise ValueError()
index = int(match.group(1))
logging.debug("Dynamic contract address at storage index {}".format(index))
log.debug("Dynamic contract address at storage index {}".format(index))
# attempt to read the contract address from instance storage
try:
@ -92,7 +101,7 @@ def get_callee_address(
)
# TODO: verify whether this happens or not
except:
logging.debug("Error accessing contract storage.")
log.debug("Error accessing contract storage.")
raise ValueError
# testrpc simply returns the address, geth response is more elaborate.
@ -119,22 +128,22 @@ def get_callee_account(
return global_state.accounts[callee_address]
except KeyError:
# We have a valid call address, but contract is not in the modules list
logging.debug("Module with address " + callee_address + " not loaded.")
log.debug("Module with address " + callee_address + " not loaded.")
if dynamic_loader is None:
raise ValueError()
logging.debug("Attempting to load dependency")
log.debug("Attempting to load dependency")
try:
code = dynamic_loader.dynld(environment.active_account.address, callee_address)
except ValueError as error:
logging.debug("Unable to execute dynamic loader because: {}".format(str(error)))
log.debug("Unable to execute dynamic loader because: {}".format(str(error)))
raise error
if code is None:
logging.debug("No code returned, not a contract account?")
log.debug("No code returned, not a contract account?")
raise ValueError()
logging.debug("Dependency loaded: " + callee_address)
log.debug("Dependency loaded: " + callee_address)
callee_account = Account(
callee_address, code, callee_address, dynamic_loader=dynamic_loader
@ -146,8 +155,8 @@ def get_callee_account(
def get_call_data(
global_state: GlobalState,
memory_start: Union[int, ExprRef],
memory_size: Union[int, ExprRef],
memory_start: Union[int, Expression],
memory_size: Union[int, Expression],
):
"""
Gets call_data from the global_state
@ -158,19 +167,29 @@ def get_call_data(
"""
state = global_state.mstate
transaction_id = "{}_internalcall".format(global_state.current_transaction.id)
memory_start = (
symbol_factory.BitVecVal(memory_start, 256)
if isinstance(memory_start, int)
else memory_start
)
memory_size = (
symbol_factory.BitVecVal(memory_size, 256)
if isinstance(memory_size, int)
else memory_size
)
try:
calldata_from_mem = state.memory[
util.get_concrete_int(memory_start) : util.get_concrete_int(
memory_start + memory_size
)
]
i = 0
call_data = ConcreteCalldata(transaction_id, calldata_from_mem)
call_data_type = CalldataType.CONCRETE
logging.debug("Calldata: " + str(call_data))
log.debug("Calldata: " + str(call_data))
except TypeError:
logging.debug("Unsupported symbolic calldata offset")
log.debug("Unsupported symbolic calldata offset")
call_data_type = CalldataType.SYMBOLIC
call_data = SymbolicCalldata("{}_internalcall".format(transaction_id))

@ -6,28 +6,26 @@ from typing import Callable, List, Union
from functools import reduce
from ethereum import utils
from z3 import (
from mythril.laser.smt import (
Extract,
Expression,
UDiv,
simplify,
Concat,
ULT,
UGT,
BitVecRef,
BitVecNumRef,
Not,
BitVec,
is_true,
is_false,
is_expr,
ExprRef,
URem,
SRem,
BitVec,
is_true,
BitVecVal,
If,
BoolRef,
Bool,
Or,
Not,
)
from mythril.laser.smt import symbol_factory
import mythril.laser.ethereum.natives as natives
import mythril.laser.ethereum.util as helper
@ -53,6 +51,8 @@ from mythril.laser.ethereum.transaction import (
from mythril.support.loader import DynLoader
from mythril.analysis.solver import get_model
log = logging.getLogger(__name__)
TT256 = 2 ** 256
TT256M1 = 2 ** 256 - 1
@ -102,13 +102,11 @@ class StateTransition(object):
:return:
"""
global_state.mstate.check_gas()
if isinstance(global_state.current_transaction.gas_limit, BitVecRef):
try:
global_state.current_transaction.gas_limit = (
global_state.current_transaction.gas_limit.as_long()
)
except AttributeError:
if isinstance(global_state.current_transaction.gas_limit, BitVec):
value = global_state.current_transaction.gas_limit.value
if value is None:
return
global_state.current_transaction.gas_limit = value
if (
global_state.mstate.min_gas_used
>= global_state.current_transaction.gas_limit
@ -160,7 +158,7 @@ class Instruction:
def evaluate(self, global_state: GlobalState, post=False) -> List[GlobalState]:
""" Performs the mutation for this instruction """
# Generalize some ops
logging.debug("Evaluating {}".format(self.op_code))
log.debug("Evaluating {}".format(self.op_code))
op = self.op_code.lower()
if self.op_code.startswith("PUSH"):
op = "push"
@ -207,7 +205,9 @@ class Instruction:
raise VmException("Invalid Push instruction")
push_value += "0" * max(length_of_value - len(push_value), 0)
global_state.mstate.stack.append(BitVecVal(int(push_value, 16), 256))
global_state.mstate.stack.append(
symbol_factory.BitVecVal(int(push_value, 16), 256)
)
return [global_state]
@StateTransition()
@ -252,10 +252,18 @@ class Instruction:
"""
stack = global_state.mstate.stack
op1, op2 = stack.pop(), stack.pop()
if type(op1) == BoolRef:
op1 = If(op1, BitVecVal(1, 256), BitVecVal(0, 256))
if type(op2) == BoolRef:
op2 = If(op2, BitVecVal(1, 256), BitVecVal(0, 256))
if isinstance(op1, Bool):
op1 = If(
op1, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if isinstance(op2, Bool):
op2 = If(
op2, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if not isinstance(op1, Expression):
op1 = symbol_factory.BitVecVal(op1, 256)
if not isinstance(op2, Expression):
op2 = symbol_factory.BitVecVal(op2, 256)
stack.append(op1 & op2)
return [global_state]
@ -270,11 +278,15 @@ class Instruction:
stack = global_state.mstate.stack
op1, op2 = stack.pop(), stack.pop()
if type(op1) == BoolRef:
op1 = If(op1, BitVecVal(1, 256), BitVecVal(0, 256))
if isinstance(op1, Bool):
op1 = If(
op1, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if type(op2) == BoolRef:
op2 = If(op2, BitVecVal(1, 256), BitVecVal(0, 256))
if isinstance(op2, Bool):
op2 = If(
op2, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
stack.append(op1 | op2)
@ -299,7 +311,7 @@ class Instruction:
:return:
"""
mstate = global_state.mstate
mstate.stack.append(TT256M1 - mstate.stack.pop())
mstate.stack.append(symbol_factory.BitVecVal(TT256M1, 256) - mstate.stack.pop())
return [global_state]
@StateTransition()
@ -311,19 +323,22 @@ class Instruction:
"""
mstate = global_state.mstate
op0, op1 = mstate.stack.pop(), mstate.stack.pop()
if not isinstance(op1, ExprRef):
op1 = BitVecVal(op1, 256)
if not isinstance(op1, Expression):
op1 = symbol_factory.BitVecVal(op1, 256)
try:
index = util.get_concrete_int(op0)
offset = (31 - index) * 8
if offset >= 0:
result = simplify(
Concat(BitVecVal(0, 248), Extract(offset + 7, offset, op1))
Concat(
symbol_factory.BitVecVal(0, 248),
Extract(offset + 7, offset, op1),
)
)
else:
result = 0
except TypeError:
logging.debug("BYTE: Unsupported symbolic byte offset")
log.debug("BYTE: Unsupported symbolic byte offset")
result = global_state.new_bitvec(
str(simplify(op1)) + "[" + str(simplify(op0)) + "]", 256
)
@ -389,7 +404,7 @@ class Instruction:
util.pop_bitvec(global_state.mstate),
)
if op1 == 0:
global_state.mstate.stack.append(BitVecVal(0, 256))
global_state.mstate.stack.append(symbol_factory.BitVecVal(0, 256))
else:
global_state.mstate.stack.append(UDiv(op0, op1))
return [global_state]
@ -406,7 +421,7 @@ class Instruction:
util.pop_bitvec(global_state.mstate),
)
if s1 == 0:
global_state.mstate.stack.append(BitVecVal(0, 256))
global_state.mstate.stack.append(symbol_factory.BitVecVal(0, 256))
else:
global_state.mstate.stack.append(s0 / s1)
return [global_state]
@ -479,7 +494,7 @@ class Instruction:
state = global_state.mstate
base, exponent = util.pop_bitvec(state), util.pop_bitvec(state)
if (type(base) != BitVecNumRef) or (type(exponent) != BitVecNumRef):
if base.symbolic or exponent.symbolic:
state.stack.append(
global_state.new_bitvec(
"(" + str(simplify(base)) + ")**(" + str(simplify(exponent)) + ")",
@ -488,7 +503,7 @@ class Instruction:
)
else:
state.stack.append(pow(base.as_long(), exponent.as_long(), 2 ** 256))
state.stack.append(pow(base.value, exponent.value, 2 ** 256))
return [global_state]
@ -540,7 +555,8 @@ class Instruction:
:return:
"""
state = global_state.mstate
exp = UGT(util.pop_bitvec(state), util.pop_bitvec(state))
op1, op2 = util.pop_bitvec(state), util.pop_bitvec(state)
exp = UGT(op1, op2)
state.stack.append(exp)
return [global_state]
@ -581,11 +597,15 @@ class Instruction:
op1 = state.stack.pop()
op2 = state.stack.pop()
if type(op1) == BoolRef:
op1 = If(op1, BitVecVal(1, 256), BitVecVal(0, 256))
if isinstance(op1, Bool):
op1 = If(
op1, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if type(op2) == BoolRef:
op2 = If(op2, BitVecVal(1, 256), BitVecVal(0, 256))
if isinstance(op2, Bool):
op2 = If(
op2, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
exp = op1 == op2
@ -602,7 +622,7 @@ class Instruction:
state = global_state.mstate
val = state.stack.pop()
exp = val == False if type(val) == BoolRef else val == 0
exp = (val == False) if isinstance(val, Bool) else val == 0
state.stack.append(exp)
return [global_state]
@ -664,14 +684,14 @@ class Instruction:
try:
mstart = util.get_concrete_int(op0)
except TypeError:
logging.debug("Unsupported symbolic memory offset in CALLDATACOPY")
log.debug("Unsupported symbolic memory offset in CALLDATACOPY")
return [global_state]
dstart_sym = False
try:
dstart = util.get_concrete_int(op1)
except TypeError:
logging.debug("Unsupported symbolic calldata offset in CALLDATACOPY")
log.debug("Unsupported symbolic calldata offset in CALLDATACOPY")
dstart = simplify(op1)
dstart_sym = True
@ -679,7 +699,7 @@ class Instruction:
try:
size = util.get_concrete_int(op2)
except TypeError:
logging.debug("Unsupported symbolic size in CALLDATACOPY")
log.debug("Unsupported symbolic size in CALLDATACOPY")
size = simplify(op2)
size_sym = True
@ -701,7 +721,7 @@ class Instruction:
try:
state.mem_extend(mstart, size)
except TypeError:
logging.debug(
log.debug(
"Memory allocation error: mstart = "
+ str(mstart)
+ ", size = "
@ -734,7 +754,7 @@ class Instruction:
state.memory[i + mstart] = new_memory[i]
except IndexError:
logging.debug("Exception copying calldata to memory")
log.debug("Exception copying calldata to memory")
state.memory[mstart] = global_state.new_bitvec(
"calldata_"
@ -826,9 +846,11 @@ class Instruction:
index, length = util.get_concrete_int(op0), util.get_concrete_int(op1)
except TypeError:
# Can't access symbolic memory offsets
if is_expr(op0):
if isinstance(op0, Expression):
op0 = simplify(op0)
state.stack.append(BitVec("KECCAC_mem[" + str(op0) + "]", 256))
state.stack.append(
symbol_factory.BitVecSym("KECCAC_mem[" + str(op0) + "]", 256)
)
state.min_gas_used += OPCODE_GAS["SHA3"][0]
state.max_gas_used += OPCODE_GAS["SHA3"][1]
return [global_state]
@ -850,15 +872,17 @@ class Instruction:
except TypeError:
argument = str(state.memory[index]).replace(" ", "_")
result = BitVec("KECCAC[{}]".format(argument), 256)
result = symbol_factory.BitVecSym("KECCAC[{}]".format(argument), 256)
keccak_function_manager.add_keccak(result, state.memory[index])
state.stack.append(result)
return [global_state]
keccak = utils.sha3(utils.bytearray_to_bytestr(data))
logging.debug("Computed SHA3 Hash: " + str(binascii.hexlify(keccak)))
log.debug("Computed SHA3 Hash: " + str(binascii.hexlify(keccak)))
state.stack.append(BitVecVal(util.concrete_int_from_bytes(keccak, 0), 256))
state.stack.append(
symbol_factory.BitVecVal(util.concrete_int_from_bytes(keccak, 0), 256)
)
return [global_state]
@StateTransition()
@ -887,7 +911,7 @@ class Instruction:
try:
concrete_memory_offset = helper.get_concrete_int(memory_offset)
except TypeError:
logging.debug("Unsupported symbolic memory offset in CODECOPY")
log.debug("Unsupported symbolic memory offset in CODECOPY")
return [global_state]
try:
@ -910,7 +934,7 @@ class Instruction:
try:
concrete_code_offset = helper.get_concrete_int(code_offset)
except TypeError:
logging.debug("Unsupported symbolic code offset in CODECOPY")
log.debug("Unsupported symbolic code offset in CODECOPY")
global_state.mstate.mem_extend(concrete_memory_offset, size)
for i in range(size):
global_state.mstate.memory[
@ -975,14 +999,14 @@ class Instruction:
try:
addr = hex(helper.get_concrete_int(addr))
except TypeError:
logging.debug("unsupported symbolic address for EXTCODESIZE")
log.debug("unsupported symbolic address for EXTCODESIZE")
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
try:
code = self.dynamic_loader.dynld(environment.active_account.address, addr)
except (ValueError, AttributeError) as e:
logging.debug("error accessing contract storage due to: " + str(e))
log.debug("error accessing contract storage due to: " + str(e))
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
@ -1107,12 +1131,12 @@ class Instruction:
state = global_state.mstate
op0 = state.stack.pop()
logging.debug("MLOAD[" + str(op0) + "]")
log.debug("MLOAD[" + str(op0) + "]")
try:
offset = util.get_concrete_int(op0)
except TypeError:
logging.debug("Can't MLOAD from symbolic index")
log.debug("Can't MLOAD from symbolic index")
data = global_state.new_bitvec("mem[" + str(simplify(op0)) + "]", 256)
state.stack.append(data)
return [global_state]
@ -1120,7 +1144,7 @@ class Instruction:
state.mem_extend(offset, 32)
data = state.memory.get_word_at(offset)
logging.debug("Load from memory[" + str(offset) + "]: " + str(data))
log.debug("Load from memory[" + str(offset) + "]: " + str(data))
state.stack.append(data)
return [global_state]
@ -1138,17 +1162,15 @@ class Instruction:
try:
mstart = util.get_concrete_int(op0)
except TypeError:
logging.debug("MSTORE to symbolic index. Not supported")
log.debug("MSTORE to symbolic index. Not supported")
return [global_state]
try:
state.mem_extend(mstart, 32)
except Exception:
logging.debug(
"Error extending memory, mstart = " + str(mstart) + ", size = 32"
)
log.debug("Error extending memory, mstart = " + str(mstart) + ", size = 32")
logging.debug("MSTORE to mem[" + str(mstart) + "]: " + str(value))
log.debug("MSTORE to mem[" + str(mstart) + "]: " + str(value))
state.memory.write_word_at(mstart, value)
@ -1167,7 +1189,7 @@ class Instruction:
try:
offset = util.get_concrete_int(op0)
except TypeError:
logging.debug("MSTORE to symbolic index. Not supported")
log.debug("MSTORE to symbolic index. Not supported")
return [global_state]
state.mem_extend(offset, 1)
@ -1176,7 +1198,7 @@ class Instruction:
value_to_write = util.get_concrete_int(value) ^ 0xFF
except TypeError: # BitVec
value_to_write = Extract(7, 0, value)
logging.debug("MSTORE8 to mem[" + str(offset) + "]: " + str(value_to_write))
log.debug("MSTORE8 to mem[" + str(offset) + "]: " + str(value_to_write))
state.memory[offset] = value_to_write
@ -1193,7 +1215,7 @@ class Instruction:
state = global_state.mstate
index = state.stack.pop()
logging.debug("Storage access at index " + str(index))
log.debug("Storage access at index " + str(index))
try:
index = util.get_concrete_int(index)
@ -1233,7 +1255,7 @@ class Instruction:
@staticmethod
def _sload_helper(
global_state: GlobalState, index: Union[int, ExprRef], constraints=None
global_state: GlobalState, index: Union[int, Expression], constraints=None
):
try:
data = global_state.environment.active_account.storage[index]
@ -1266,7 +1288,7 @@ class Instruction:
global keccak_function_manager
state = global_state.mstate
index, value = state.stack.pop(), state.stack.pop()
logging.debug("Write to storage[" + str(index) + "]")
log.debug("Write to storage[" + str(index) + "]")
try:
index = util.get_concrete_int(index)
@ -1327,10 +1349,10 @@ class Instruction:
] = global_state.environment.active_account
global_state.environment.active_account.storage[index] = (
value if not isinstance(value, ExprRef) else simplify(value)
value if not isinstance(value, Expression) else simplify(value)
)
except KeyError:
logging.debug("Error writing to storage: Invalid index")
log.debug("Error writing to storage: Invalid index")
if constraint is not None:
global_state.mstate.constraints.append(constraint)
@ -1393,7 +1415,7 @@ class Instruction:
try:
jump_addr = util.get_concrete_int(op0)
except TypeError:
logging.debug("Skipping JUMPI to invalid destination.")
log.debug("Skipping JUMPI to invalid destination.")
global_state.mstate.pc += 1
global_state.mstate.min_gas_used += min_gas
global_state.mstate.max_gas_used += max_gas
@ -1401,11 +1423,11 @@ class Instruction:
# False case
negated = (
simplify(Not(condition)) if type(condition) == BoolRef else condition == 0
simplify(Not(condition)) if isinstance(condition, Bool) else condition == 0
)
if (type(negated) == bool and negated) or (
type(negated) == BoolRef and not is_false(negated)
isinstance(condition, Bool) and not is_false(negated)
):
new_state = copy(global_state)
# add JUMPI gas cost
@ -1418,22 +1440,22 @@ class Instruction:
new_state.mstate.constraints.append(negated)
states.append(new_state)
else:
logging.debug("Pruned unreachable states.")
log.debug("Pruned unreachable states.")
# True case
# Get jump destination
index = util.get_instruction_index(disassembly.instruction_list, jump_addr)
if not index:
logging.debug("Invalid jump destination: " + str(jump_addr))
log.debug("Invalid jump destination: " + str(jump_addr))
return states
instr = disassembly.instruction_list[index]
condi = simplify(condition) if type(condition) == BoolRef else condition != 0
condi = simplify(condition) if isinstance(condition, Bool) else condition != 0
if instr["opcode"] == "JUMPDEST":
if (type(condi) == bool and condi) or (
type(condi) == BoolRef and not is_false(condi)
isinstance(condi, Bool) and not is_false(condi)
):
new_state = copy(global_state)
# add JUMPI gas cost
@ -1446,7 +1468,7 @@ class Instruction:
new_state.mstate.constraints.append(condi)
states.append(new_state)
else:
logging.debug("Pruned unreachable states.")
log.debug("Pruned unreachable states.")
del global_state
return states
@ -1524,7 +1546,7 @@ class Instruction:
util.get_concrete_int(offset) : util.get_concrete_int(offset + length)
]
except TypeError:
logging.debug("Return with symbolic length or offset. Not supported")
log.debug("Return with symbolic length or offset. Not supported")
global_state.current_transaction.end(global_state, return_data)
@StateTransition()
@ -1537,8 +1559,8 @@ class Instruction:
account_created = False
# Often the target of the suicide instruction will be symbolic
# If it isn't then well transfer the balance to the indicated contract
if isinstance(target, BitVecNumRef):
target = "0x" + hex(target.as_long())[-40:]
if isinstance(target, BitVec) and not target.symbolic:
target = "0x" + hex(target.value)[-40:]
if isinstance(target, str):
try:
global_state.world_state[
@ -1576,7 +1598,7 @@ class Instruction:
util.get_concrete_int(offset) : util.get_concrete_int(offset + length)
]
except TypeError:
logging.debug("Return with symbolic length or offset. Not supported")
log.debug("Return with symbolic length or offset. Not supported")
global_state.current_transaction.end(
global_state, return_data=return_data, revert=True
)
@ -1621,7 +1643,7 @@ class Instruction:
global_state, self.dynamic_loader, True
)
except ValueError as e:
logging.debug(
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
@ -1636,16 +1658,16 @@ class Instruction:
)
if 0 < int(callee_address, 16) < 5:
logging.debug("Native contract called: " + callee_address)
log.debug("Native contract called: " + callee_address)
if call_data == [] and call_data_type == CalldataType.SYMBOLIC:
logging.debug("CALL with symbolic data not supported")
log.debug("CALL with symbolic data not supported")
return [global_state]
try:
mem_out_start = helper.get_concrete_int(memory_out_offset)
mem_out_sz = memory_out_size.as_long()
mem_out_sz = helper.get_concrete_int(memory_out_size)
except TypeError:
logging.debug("CALL with symbolic start or offset not supported")
log.debug("CALL with symbolic start or offset not supported")
return [global_state]
contract_list = ["ecrecover", "sha256", "ripemd160", "identity"]
@ -1684,7 +1706,9 @@ class Instruction:
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
caller=BitVecVal(int(environment.active_account.address, 16), 256),
caller=symbol_factory.BitVecVal(
int(environment.active_account.address, 16), 256
),
callee_account=callee_account,
call_data=call_data,
call_data_type=call_data_type,
@ -1706,7 +1730,7 @@ class Instruction:
global_state, self.dynamic_loader, True
)
except ValueError as e:
logging.debug(
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
@ -1729,12 +1753,12 @@ class Instruction:
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, ExprRef)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, ExprRef)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:
@ -1774,7 +1798,7 @@ class Instruction:
global_state, self.dynamic_loader, True
)
except ValueError as e:
logging.debug(
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
@ -1812,7 +1836,7 @@ class Instruction:
global_state, self.dynamic_loader, True
)
except ValueError as e:
logging.debug(
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
@ -1878,7 +1902,7 @@ class Instruction:
global_state, self.dynamic_loader
)
except ValueError as e:
logging.debug(
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
@ -1916,7 +1940,7 @@ class Instruction:
global_state, self.dynamic_loader
)
except ValueError as e:
logging.debug(
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
@ -1938,12 +1962,12 @@ class Instruction:
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, ExprRef)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, ExprRef)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:

@ -1,36 +1,18 @@
from z3 import ExprRef
from mythril.laser.smt import Expression
class KeccakFunctionManager:
"""
"""
def __init__(self):
self.keccak_expression_mapping = {}
def is_keccak(self, expression: ExprRef) -> bool:
"""
:param expression:
:return:
"""
def is_keccak(self, expression: Expression) -> bool:
return str(expression) in self.keccak_expression_mapping.keys()
def get_argument(self, expression: str) -> ExprRef:
"""
:param expression:
:return:
"""
def get_argument(self, expression: str) -> Expression:
if not self.is_keccak(expression):
raise ValueError("Expression is not a recognized keccac result")
return self.keccak_expression_mapping[str(expression)][1]
def add_keccak(self, expression: ExprRef, argument: ExprRef) -> None:
"""
:param expression:
:param argument:
"""
def add_keccak(self, expression: Expression, argument: Expression) -> None:
index = str(expression)
self.keccak_expression_mapping[index] = (expression, argument)

@ -10,7 +10,9 @@ from rlp.utils import ALL_BYTES
from mythril.laser.ethereum.state.calldata import BaseCalldata, ConcreteCalldata
from mythril.laser.ethereum.util import bytearray_to_int, sha3, get_concrete_int
from z3 import Concat, simplify
from mythril.laser.smt import Concat, simplify
log = logging.getLogger(__name__)
class NativeContractException(Exception):
@ -67,7 +69,7 @@ def ecrecover(data: Union[bytes, str, List[int]]) -> bytes:
try:
pub = ecrecover_to_pub(message, v, r, s)
except Exception as e:
logging.debug("An error has occured while extracting public key: " + e)
log.debug("An error has occured while extracting public key: " + e)
return []
o = [0] * 12 + [x for x in sha3(pub)[-20:]]
return o

@ -1,7 +1,8 @@
from typing import Dict, Union, Any, KeysView
from z3 import BitVec, BitVecVal, ExprRef
from z3 import ExprRef
from mythril.laser.smt import symbol_factory
from mythril.disassembler.disassembly import Disassembly
@ -41,7 +42,7 @@ class Storage:
pass
if self.concrete:
return 0
self._storage[item] = BitVecVal(0, 256)
self._storage[item] = symbol_factory.BitVecVal(0, 256)
return self._storage[item]
def __setitem__(self, key: str, value: ExprRef) -> None:
@ -79,7 +80,11 @@ class Account:
"""
self.nonce = 0
self.code = code or Disassembly("")
self.balance = balance if balance else BitVec("{}_balance".format(address), 256)
self.balance = (
balance
if balance
else symbol_factory.BitVecSym("{}_balance".format(address), 256)
)
self.storage = Storage(
concrete_storage, address=address, dynamic_loader=dynamic_loader
)

@ -1,23 +1,14 @@
from enum import Enum
from typing import Union, Any
from z3 import (
BitVecVal,
BitVecRef,
BitVec,
simplify,
Concat,
If,
ExprRef,
K,
Array,
BitVecSort,
Store,
is_bv,
)
from z3.z3types import Z3Exception, Model
from mythril.laser.smt import K, Array, If, simplify, Concat, Expression, BitVec
from mythril.laser.smt import symbol_factory
from mythril.laser.ethereum.util import get_concrete_int
from z3 import Model
from z3.z3types import Z3Exception
class CalldataType(Enum):
CONCRETE = 1
@ -34,22 +25,22 @@ class BaseCalldata:
self.tx_id = tx_id
@property
def calldatasize(self) -> ExprRef:
def calldatasize(self) -> Expression:
"""
:return: Calldata size for this calldata object
"""
result = self.size
if isinstance(result, int):
return BitVecVal(result, 256)
return symbol_factory.BitVecVal(result, 256)
return result
def get_word_at(self, offset: int) -> ExprRef:
def get_word_at(self, offset: int) -> Expression:
""" Gets word at offset"""
parts = self[offset : offset + 32]
return simplify(Concat(parts))
def __getitem__(self, item: Union[int, slice]) -> Any:
if isinstance(item, int) or isinstance(item, ExprRef):
if isinstance(item, int) or isinstance(item, Expression):
return self._load(item)
if isinstance(item, slice):
@ -59,13 +50,15 @@ class BaseCalldata:
try:
current_index = (
start if isinstance(start, BitVecRef) else BitVecVal(start, 256)
start
if isinstance(start, Expression)
else symbol_factory.BitVecVal(start, 256)
)
parts = []
while simplify(current_index != stop):
element = self._load(current_index)
if not isinstance(element, ExprRef):
element = BitVecVal(element, 8)
if not isinstance(element, Expression):
element = symbol_factory.BitVecVal(element, 8)
parts.append(element)
current_index = simplify(current_index + step)
@ -76,11 +69,11 @@ class BaseCalldata:
raise ValueError
def _load(self, item: Union[int, ExprRef]) -> Any:
def _load(self, item: Union[int, Expression]) -> Any:
raise NotImplementedError()
@property
def size(self) -> Union[ExprRef, int]:
def size(self) -> Union[Expression, int]:
""" Returns the exact size of this calldata, this is not normalized"""
raise NotImplementedError()
@ -100,13 +93,19 @@ class ConcreteCalldata(BaseCalldata):
:param calldata: The concrete calldata content
"""
self._concrete_calldata = calldata
self._calldata = K(BitVecSort(256), BitVecVal(0, 8))
self._calldata = K(256, 8, 0)
for i, element in enumerate(calldata, 0):
self._calldata = Store(self._calldata, i, element)
element = (
symbol_factory.BitVecVal(element, 8)
if isinstance(element, int)
else element
)
self._calldata[symbol_factory.BitVecVal(i, 256)] = element
super().__init__(tx_id)
def _load(self, item: Union[int, ExprRef]) -> BitVecSort(8):
item = BitVecVal(item, 256) if isinstance(item, int) else item
def _load(self, item: Union[int, Expression]) -> BitVec:
item = symbol_factory.BitVecVal(item, 256) if isinstance(item, int) else item
return simplify(self._calldata[item])
def concrete(self, model: Model) -> list:
@ -140,14 +139,14 @@ class BasicConcreteCalldata(BaseCalldata):
self._calldata = calldata
super().__init__(tx_id)
def _load(self, item: Union[int, ExprRef]) -> Any:
def _load(self, item: Union[int, Expression]) -> Any:
if isinstance(item, int):
try:
return self._calldata[item]
except IndexError:
return 0
value = BitVecVal(0x0, 8)
value = symbol_factory.BitVecVal(0x0, 8)
for i in range(self.size):
value = If(item == i, self._calldata[i], value)
return value
@ -178,15 +177,19 @@ class SymbolicCalldata(BaseCalldata):
Initializes the SymbolicCalldata object
:param tx_id: Id of the transaction that the calldata is for.
"""
self._size = BitVec(str(tx_id) + "_calldatasize", 256)
self._calldata = Array(
"{}_calldata".format(tx_id), BitVecSort(256), BitVecSort(8)
)
self._size = symbol_factory.BitVecSym(str(tx_id) + "_calldatasize", 256)
self._calldata = Array("{}_calldata".format(tx_id), 256, 8)
super().__init__(tx_id)
def _load(self, item: Union[int, ExprRef]) -> Any:
item = BitVecVal(item, 256) if isinstance(item, int) else item
return simplify(If(item < self._size, simplify(self._calldata[item]), 0))
def _load(self, item: Union[int, Expression]) -> Any:
item = symbol_factory.BitVecVal(item, 256) if isinstance(item, int) else item
return simplify(
If(
item < self._size,
simplify(self._calldata[item]),
symbol_factory.BitVecVal(0, 8),
)
)
def concrete(self, model: Model) -> list:
"""
@ -194,17 +197,17 @@ class SymbolicCalldata(BaseCalldata):
:param model:
:return:
"""
concrete_length = get_concrete_int(model.eval(self.size, model_completion=True))
concrete_length = model.eval(self.size.raw, model_completion=True).as_long()
result = []
for i in range(concrete_length):
value = self._load(i)
c_value = get_concrete_int(model.eval(value, model_completion=True))
c_value = model.eval(value.raw, model_completion=True).as_long()
result.append(c_value)
return result
@property
def size(self) -> ExprRef:
def size(self) -> Expression:
"""
:return:
@ -225,12 +228,12 @@ class BasicSymbolicCalldata(BaseCalldata):
self._size = BitVec(str(tx_id) + "_calldatasize", 256)
super().__init__(tx_id)
def _load(self, item: Union[int, ExprRef], clean=False) -> Any:
x = BitVecVal(item, 256) if isinstance(item, int) else item
def _load(self, item: Union[int, Expression], clean=False) -> Any:
x = symbol_factory.BitVecVal(item, 256) if isinstance(item, int) else item
symbolic_base_value = If(
x >= self._size,
BitVecVal(0, 8),
symbol_factory.BitVecVal(0, 8),
BitVec("{}_calldata_{}".format(self.tx_id, str(item)), 8),
)
return_value = symbolic_base_value
@ -257,7 +260,7 @@ class BasicSymbolicCalldata(BaseCalldata):
return result
@property
def size(self) -> ExprRef:
def size(self) -> Expression:
"""
:return:

@ -2,6 +2,7 @@ from typing import Dict
from z3 import ExprRef, BitVecVal
from mythril.laser.smt import symbol_factory
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.calldata import CalldataType, BaseCalldata
@ -27,7 +28,7 @@ class Environment:
self.active_account = active_account
self.active_function_name = ""
self.address = BitVecVal(int(active_account.address, 16), 256)
self.address = symbol_factory.BitVecVal(int(active_account.address, 16), 256)
# Ib
self.code = active_account.code if code is None else code

@ -1,8 +1,9 @@
from typing import Dict, Union, List
from typing import Dict, Union, List, Iterable
from copy import copy, deepcopy
from z3 import BitVec
from mythril.laser.smt import symbol_factory
from mythril.laser.ethereum.cfg import Node
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.machine_state import MachineState
@ -96,7 +97,7 @@ class GlobalState:
:return:
"""
transaction_id = self.current_transaction.id
return BitVec("{}_{}".format(transaction_id, name), size)
return symbol_factory.BitVecSym("{}_{}".format(transaction_id, name), size)
def annotate(self, annotation: StateAnnotation) -> None:
"""
@ -115,3 +116,12 @@ class GlobalState:
:return:
"""
return self._annotations
def get_annotations(self, annotation_type: type) -> Iterable[StateAnnotation]:
"""
Filters annotations for the queried annotation type. Designed particularly for
modules with annotations: globalstate.get_annotations(MySpecificModuleAnnotation)
:param annotation_type: The type to filter annotations for
:return: filter of matching annotations
"""
return filter(lambda x: isinstance(x, annotation_type), self.annotations)

@ -1,6 +1,14 @@
from typing import Union
from z3 import BitVecRef, Extract, BitVecVal, If, BoolRef, Z3Exception, simplify, Concat
from z3 import Z3Exception
from mythril.laser.smt import (
BitVec,
symbol_factory,
If,
Concat,
simplify,
Bool,
Extract,
)
from mythril.laser.ethereum import util
@ -21,7 +29,7 @@ class Memory:
"""
self._memory.extend(bytearray(size))
def get_word_at(self, index: int) -> Union[int, BitVecRef]:
def get_word_at(self, index: int) -> Union[int, BitVec]:
"""
Access a word from a specified memory index
:param index: integer representing the index to access
@ -31,11 +39,11 @@ class Memory:
return util.concrete_int_from_bytes(
bytes([util.get_concrete_int(b) for b in self[index : index + 32]]), 0
)
except TypeError:
except:
result = simplify(
Concat(
[
b if isinstance(b, BitVecRef) else BitVecVal(b, 8)
b if isinstance(b, BitVec) else symbol_factory.BitVecVal(b, 8)
for b in self[index : index + 32]
]
)
@ -43,9 +51,7 @@ class Memory:
assert result.size() == 256
return result
def write_word_at(
self, index: int, value: Union[int, BitVecRef, bool, BoolRef]
) -> None:
def write_word_at(self, index: int, value: Union[int, BitVec, bool, Bool]) -> None:
"""
Writes a 32 byte word to memory at the specified index`
:param index: index to write to
@ -64,8 +70,12 @@ class Memory:
assert len(_bytes) == 32
self[index : index + 32] = _bytes
except (Z3Exception, AttributeError): # BitVector or BoolRef
if isinstance(value, BoolRef):
value_to_write = If(value, BitVecVal(1, 256), BitVecVal(0, 256))
if isinstance(value, Bool):
value_to_write = If(
value,
symbol_factory.BitVecVal(1, 256),
symbol_factory.BitVecVal(0, 256),
)
else:
value_to_write = value
assert value_to_write.size() == 256
@ -73,7 +83,7 @@ class Memory:
for i in range(0, value_to_write.size(), 8):
self[index + 31 - (i // 8)] = Extract(i + 7, i, value_to_write)
def __getitem__(self, item: Union[int, slice]) -> Union[BitVecRef, int, list]:
def __getitem__(self, item: Union[int, slice]) -> Union[BitVec, int, list]:
if isinstance(item, slice):
start, step, stop = item.start, item.step, item.stop
if start is None:
@ -89,7 +99,7 @@ class Memory:
except IndexError:
return 0
def __setitem__(self, key: Union[int, slice], value: Union[BitVecRef, int, list]):
def __setitem__(self, key: Union[int, slice], value: Union[BitVec, int, list]):
if isinstance(key, slice):
start, step, stop = key.start, key.step, key.stop
@ -106,6 +116,6 @@ class Memory:
else:
if isinstance(value, int):
assert 0 <= value <= 0xFF
if isinstance(value, BitVecRef):
if isinstance(value, BitVec):
assert value.size() == 8
self._memory[key] = value

@ -1,6 +1,6 @@
from copy import copy
from random import randint
from typing import List
from typing import List, Iterator
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.annotation import StateAnnotation
@ -91,6 +91,15 @@ class WorldState:
"""
return self._annotations
def get_annotations(self, annotation_type: type) -> Iterator[StateAnnotation]:
"""
Filters annotations for the queried annotation type. Designed particularly for
modules with annotations: worldstate.get_annotations(MySpecificModuleAnnotation)
:param annotation_type: The type to filter annotations for
:return: filter of matching annotations
"""
return filter(lambda x: isinstance(x, annotation_type), self.annotations)
def _generate_new_address(self) -> str:
""" Generates a new address for the global state"""
while True:

@ -1,29 +1,29 @@
import logging
from collections import defaultdict
from ethereum.opcodes import opcodes
from copy import copy
from datetime import datetime, timedelta
from functools import reduce
from typing import List, Tuple, Union, Callable, Dict
from mythril.analysis.security import get_detection_modules
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.cfg import NodeFlags, Node, Edge, JumpType
from mythril.laser.ethereum.evm_exceptions import StackUnderflowException
from mythril.laser.ethereum.evm_exceptions import VmException
from mythril.laser.ethereum.instructions import Instruction
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.laser.ethereum.strategy.basic import DepthFirstSearchStrategy
from mythril.laser.ethereum.transaction import (
TransactionStartSignal,
TransactionEndSignal,
ContractCreationTransaction,
)
from mythril.laser.ethereum.evm_exceptions import StackUnderflowException
from mythril.laser.ethereum.instructions import Instruction
from mythril.laser.ethereum.cfg import NodeFlags, Node, Edge, JumpType
from mythril.laser.ethereum.strategy.basic import DepthFirstSearchStrategy
from datetime import datetime, timedelta
from copy import copy
from mythril.laser.ethereum.transaction import (
execute_contract_creation,
execute_message_call,
)
from functools import reduce
from mythril.laser.ethereum.evm_exceptions import VmException
log = logging.getLogger(__name__)
class SVMError(Exception):
@ -76,16 +76,9 @@ class LaserEVM:
self.pre_hooks = defaultdict(list)
self.post_hooks = defaultdict(list)
logging.info(
"LASER EVM initialized with dynamic loader: " + str(dynamic_loader)
)
log.info("LASER EVM initialized with dynamic loader: " + str(dynamic_loader))
def register_hooks(self, hook_type: str, hook_dict: Dict[str, List[Callable]]):
"""
:param hook_type:
:param hook_dict:
"""
if hook_type == "pre":
entrypoint = self.pre_hooks
elif hook_type == "post":
@ -100,48 +93,38 @@ class LaserEVM:
@property
def accounts(self) -> Dict[str, Account]:
"""
:return:
"""
return self.world_state.accounts
def sym_exec(
self, main_address=None, creation_code=None, contract_name=None
) -> None:
"""
:param main_address:
:param creation_code:
:param contract_name:
"""
logging.debug("Starting LASER execution")
log.debug("Starting LASER execution")
self.time = datetime.now()
if main_address:
logging.info("Starting message call transaction to {}".format(main_address))
log.info("Starting message call transaction to {}".format(main_address))
self._execute_transactions(main_address)
elif creation_code:
logging.info("Starting contract creation transaction")
log.info("Starting contract creation transaction")
created_account = execute_contract_creation(
self, creation_code, contract_name
)
logging.info(
log.info(
"Finished contract creation, found {} open states".format(
len(self.open_states)
)
)
if len(self.open_states) == 0:
logging.warning(
log.warning(
"No contract was created during the execution of contract creation "
"Increase the resources for creation execution (--max-depth or --create-timeout)"
)
self._execute_transactions(created_account.address)
logging.info("Finished symbolic execution")
logging.info(
log.info("Finished symbolic execution")
log.info(
"%d nodes, %d edges, %d total states",
len(self.nodes),
len(self.edges),
@ -153,7 +136,7 @@ class LaserEVM:
/ float(coverage[0])
* 100
)
logging.info("Achieved {:.2f}% coverage for code: {}".format(cov, code))
log.info("Achieved {:.2f}% coverage for code: {}".format(cov, code))
def _execute_transactions(self, address):
"""
@ -166,13 +149,13 @@ class LaserEVM:
initial_coverage = self._get_covered_instructions()
self.time = datetime.now()
logging.info("Starting message call transaction, iteration: {}".format(i))
log.info("Starting message call transaction, iteration: {}".format(i))
execute_message_call(self, address)
end_coverage = self._get_covered_instructions()
logging.info(
log.info(
"Number of new instructions covered in tx %d: %d"
% (i, end_coverage - initial_coverage)
)
@ -187,12 +170,6 @@ class LaserEVM:
return total_covered_instructions
def exec(self, create=False, track_gas=False) -> Union[List[GlobalState], None]:
"""
:param create:
:param track_gas:
:return:
"""
final_states = []
for global_state in self.strategy:
if self.execution_timeout and not create:
@ -208,7 +185,7 @@ class LaserEVM:
try:
new_states, op_code = self.execute_state(global_state)
except NotImplementedError:
logging.debug("Encountered unimplemented instruction")
log.debug("Encountered unimplemented instruction")
continue
self.manage_cfg(op_code, new_states)
@ -222,11 +199,6 @@ class LaserEVM:
def execute_state(
self, global_state: GlobalState
) -> Tuple[List[GlobalState], Union[str, None]]:
"""
:param global_state:
:return:
"""
instructions = global_state.environment.code.instruction_list
try:
op_code = instructions[global_state.mstate.pc]["opcode"]
@ -248,9 +220,7 @@ class LaserEVM:
# In this case we don't put an unmodified world state in the open_states list Since in the case of an
# exceptional halt all changes should be discarded, and this world state would not provide us with a
# previously unseen world state
logging.debug(
"Encountered a VmException, ending path: `{}`".format(str(e))
)
log.debug("Encountered a VmException, ending path: `{}`".format(str(e)))
new_global_states = []
else:
# First execute the post hook for the transaction ending instruction
@ -347,11 +317,6 @@ class LaserEVM:
self.coverage[code][1][instruction_index] = True
def manage_cfg(self, opcode: str, new_states: List[GlobalState]) -> None:
"""
:param opcode:
:param new_states:
"""
if opcode == "JUMP":
assert len(new_states) <= 1
for state in new_states:
@ -419,7 +384,7 @@ class LaserEVM:
]
new_node.flags |= NodeFlags.FUNC_ENTRY
logging.debug(
log.debug(
"- Entering function "
+ environment.active_account.contract_name
+ ":"
@ -447,17 +412,7 @@ class LaserEVM:
hook(global_state)
def pre_hook(self, op_code: str) -> Callable:
"""
:param op_code:
:return:
"""
def hook_decorator(func: Callable):
"""
:param func:
:return:
"""
if op_code not in self.pre_hooks.keys():
self.pre_hooks[op_code] = []
self.pre_hooks[op_code].append(func)
@ -466,17 +421,7 @@ class LaserEVM:
return hook_decorator
def post_hook(self, op_code: str) -> Callable:
"""
:param op_code:
:return:
"""
def hook_decorator(func: Callable):
"""
:param func:
:return:
"""
if op_code not in self.post_hooks.keys():
self.post_hooks[op_code] = []
self.post_hooks[op_code].append(func)

@ -1,12 +1,14 @@
import logging
import copy
from typing import Union, List, Tuple
from z3 import ExprRef
import mythril.laser.ethereum.util as helper
from mythril.laser.ethereum.cfg import JumpType, Node
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.analysis.symbolic import SymExecWrapper
from mythril.laser.smt import Expression
log = logging.getLogger(__name__)
class TaintRecord:
@ -94,10 +96,10 @@ class TaintRunner:
) -> TaintResult:
"""
Runs taint analysis on the statespace
:param initial_stack:
:param statespace: symbolic statespace to run taint analysis on
:param node: taint introduction node
:param state: taint introduction state
:param stack_indexes: stack indexes to introduce taint
:return: TaintResult object containing analysis results
"""
if initial_stack is None:
@ -134,14 +136,6 @@ class TaintRunner:
environment: Environment,
transaction_stack_length: int,
) -> List[Node]:
"""
:param node:
:param statespace:
:param environment:
:param transaction_stack_length:
:return:
"""
direct_children = [
statespace.nodes[edge.node_to]
for edge in statespace.edges
@ -182,12 +176,6 @@ class TaintRunner:
@staticmethod
def execute_state(record: TaintRecord, state: GlobalState) -> TaintRecord:
"""
:param record:
:param state:
:return:
"""
assert len(state.mstate.stack) == len(record.stack)
""" Runs taint analysis on a state """
record.add_state(state)
@ -218,17 +206,12 @@ class TaintRunner:
elif op in ("CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"):
TaintRunner.mutate_call(new_record, op)
else:
logging.debug("Unknown operation encountered: {}".format(op))
log.debug("Unknown operation encountered: {}".format(op))
return new_record
@staticmethod
def mutate_stack(record: TaintRecord, mutator: Tuple[int, int]) -> None:
"""
:param record:
:param mutator:
"""
pop, push = mutator
values = []
@ -242,124 +225,75 @@ class TaintRunner:
@staticmethod
def mutate_push(op: str, record: TaintRecord) -> None:
"""
:param op:
:param record:
"""
TaintRunner.mutate_stack(record, (0, 1))
@staticmethod
def mutate_dup(op: str, record: TaintRecord) -> None:
"""
:param op:
:param record:
"""
depth = int(op[3:])
index = len(record.stack) - depth
record.stack.append(record.stack[index])
@staticmethod
def mutate_swap(op: str, record: TaintRecord) -> None:
"""
:param op:
:param record:
"""
depth = int(op[4:])
l = len(record.stack) - 1
i = l - depth
record.stack[l], record.stack[i] = record.stack[i], record.stack[l]
@staticmethod
def mutate_mload(record: TaintRecord, op0: ExprRef) -> None:
"""
:param record:
:param op0:
:return:
"""
def mutate_mload(record: TaintRecord, op0: Expression) -> None:
_ = record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
logging.debug("Can't MLOAD taint track symbolically")
log.debug("Can't MLOAD taint track symbolically")
record.stack.append(False)
return
record.stack.append(record.memory_tainted(index))
@staticmethod
def mutate_mstore(record: TaintRecord, op0: ExprRef) -> None:
"""
:param record:
:param op0:
:return:
"""
def mutate_mstore(record: TaintRecord, op0: Expression) -> None:
_, value_taint = record.stack.pop(), record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
logging.debug("Can't mstore taint track symbolically")
log.debug("Can't mstore taint track symbolically")
return
record.memory[index] = value_taint
@staticmethod
def mutate_sload(record: TaintRecord, op0: ExprRef) -> None:
"""
:param record:
:param op0:
:return:
"""
def mutate_sload(record: TaintRecord, op0: Expression) -> None:
_ = record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
logging.debug("Can't MLOAD taint track symbolically")
log.debug("Can't MLOAD taint track symbolically")
record.stack.append(False)
return
record.stack.append(record.storage_tainted(index))
@staticmethod
def mutate_sstore(record: TaintRecord, op0: ExprRef) -> None:
"""
:param record:
:param op0:
:return:
"""
def mutate_sstore(record: TaintRecord, op0: Expression) -> None:
_, value_taint = record.stack.pop(), record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
logging.debug("Can't mstore taint track symbolically")
log.debug("Can't mstore taint track symbolically")
return
record.storage[index] = value_taint
@staticmethod
def mutate_log(record: TaintRecord, op: str) -> None:
"""
:param record:
:param op:
"""
depth = int(op[3:])
for _ in range(depth + 2):
record.stack.pop()
@staticmethod
def mutate_call(record: TaintRecord, op: str) -> None:
"""
:param record:
:param op:
"""
pops = 6
if op in ("CALL", "CALLCODE"):
pops += 1

@ -1,17 +1,13 @@
from typing import List, Union
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.cfg import Node, Edge, JumpType
from mythril.laser.ethereum.state.calldata import CalldataType, ConcreteCalldata
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.transaction.transaction_models import (
MessageCallTransaction,
ContractCreationTransaction,
get_next_transaction_id,
)
from z3 import BitVec
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.calldata import CalldataType, ConcreteCalldata
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.cfg import Node, Edge, JumpType
def execute_message_call(

@ -1,6 +1,7 @@
from z3 import BitVec, BitVecVal
from logging import debug
import logging
from mythril.laser.smt import symbol_factory
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.cfg import Node, Edge, JumpType
from mythril.laser.ethereum.state.calldata import (
@ -15,6 +16,7 @@ from mythril.laser.ethereum.transaction.transaction_models import (
get_next_transaction_id,
)
log = logging.getLogger(__name__)
CREATOR_ADDRESS = 0xAFFEAFFEAFFEAFFEAFFEAFFEAFFEAFFEAFFEAFFE
ATTACKER_ADDRESS = 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF
@ -28,21 +30,27 @@ def execute_message_call(laser_evm, callee_address: str) -> None:
for open_world_state in open_states:
if open_world_state[callee_address].deleted:
debug("Can not execute dead contract, skipping.")
log.debug("Can not execute dead contract, skipping.")
continue
next_transaction_id = get_next_transaction_id()
transaction = MessageCallTransaction(
world_state=open_world_state,
identifier=next_transaction_id,
gas_price=BitVec("gas_price{}".format(next_transaction_id), 256),
gas_price=symbol_factory.BitVecSym(
"gas_price{}".format(next_transaction_id), 256
),
gas_limit=8000000, # block gas limit
origin=BitVec("origin{}".format(next_transaction_id), 256),
caller=BitVecVal(ATTACKER_ADDRESS, 256),
origin=symbol_factory.BitVecSym(
"origin{}".format(next_transaction_id), 256
),
caller=symbol_factory.BitVecVal(ATTACKER_ADDRESS, 256),
callee_account=open_world_state[callee_address],
call_data=SymbolicCalldata(next_transaction_id),
call_data_type=CalldataType.SYMBOLIC,
call_value=BitVec("call_value{}".format(next_transaction_id), 256),
call_value=symbol_factory.BitVecSym(
"call_value{}".format(next_transaction_id), 256
),
)
_setup_global_state_for_execution(laser_evm, transaction)
@ -68,15 +76,21 @@ def execute_contract_creation(
transaction = ContractCreationTransaction(
world_state=open_world_state,
identifier=next_transaction_id,
gas_price=BitVec("gas_price{}".format(next_transaction_id), 256),
gas_price=symbol_factory.BitVecSym(
"gas_price{}".format(next_transaction_id), 256
),
gas_limit=8000000, # block gas limit
origin=BitVec("origin{}".format(next_transaction_id), 256),
origin=symbol_factory.BitVecSym(
"origin{}".format(next_transaction_id), 256
),
code=Disassembly(contract_initialization_code),
caller=BitVecVal(CREATOR_ADDRESS, 256),
caller=symbol_factory.BitVecVal(CREATOR_ADDRESS, 256),
callee_account=new_account,
call_data=[],
call_data_type=CalldataType.SYMBOLIC,
call_value=BitVec("call_value{}".format(next_transaction_id), 256),
call_value=symbol_factory.BitVecSym(
"call_value{}".format(next_transaction_id), 256
),
)
_setup_global_state_for_execution(laser_evm, transaction)
laser_evm.exec(True)

@ -1,6 +1,7 @@
import logging
from typing import Union
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.smt import symbol_factory
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.calldata import (
BaseCalldata,
@ -10,7 +11,7 @@ from mythril.laser.ethereum.state.calldata import (
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.laser.ethereum.state.global_state import GlobalState
from z3 import BitVec, ExprRef
from z3 import ExprRef
import array
_next_transaction_id = 0
@ -71,12 +72,14 @@ class BaseTransaction:
self.gas_price = (
gas_price
if gas_price is not None
else BitVec("gasprice{}".format(identifier), 256)
else symbol_factory.BitVecSym("gasprice{}".format(identifier), 256)
)
self.gas_limit = gas_limit
self.origin = (
origin if origin is not None else BitVec("origin{}".format(identifier), 256)
origin
if origin is not None
else symbol_factory.BitVecSym("origin{}".format(identifier), 256)
)
self.code = code
@ -90,12 +93,12 @@ class BaseTransaction:
self.call_data_type = (
call_data_type
if call_data_type is not None
else BitVec("call_data_type{}".format(identifier), 256)
else symbol_factory.BitVecSym("call_data_type{}".format(identifier), 256)
)
self.call_value = (
call_value
if call_value is not None
else BitVec("callvalue{}".format(identifier), 256)
else symbol_factory.BitVecSym("callvalue{}".format(identifier), 256)
)
self.return_data = None

@ -1,5 +1,8 @@
import re
from z3 import *
from mythril.laser.smt import is_false, is_true, simplify, If, BitVec, Bool, Expression
from mythril.laser.smt import symbol_factory
import logging
from typing import Union, List, Dict
@ -12,20 +15,10 @@ TT255 = 2 ** 255
def sha3(seed: str) -> bytes:
"""
:param seed:
:return:
"""
return _sha3.keccak_256(bytes(seed)).digest()
def safe_decode(hex_encoded_string: str) -> bytes:
"""
:param hex_encoded_string:
:return:
"""
if hex_encoded_string.startswith("0x"):
return bytes.fromhex(hex_encoded_string[2:])
else:
@ -33,23 +26,12 @@ def safe_decode(hex_encoded_string: str) -> bytes:
def to_signed(i: int) -> int:
"""
:param i:
:return:
"""
return i if i < TT255 else i - TT256
def get_instruction_index(
instruction_list: List[Dict], address: int
) -> Union[int, None]:
"""
:param instruction_list:
:param address:
:return:
"""
index = 0
for instr in instruction_list:
if instr["address"] == address:
@ -59,76 +41,50 @@ def get_instruction_index(
def get_trace_line(instr: Dict, state: "MachineState") -> str:
"""
:param instr:
:param state:
:return:
"""
stack = str(state.stack[::-1])
# stack = re.sub("(\d+)", lambda m: hex(int(m.group(1))), stack)
stack = re.sub("\n", "", stack)
return str(instr["address"]) + " " + instr["opcode"] + "\tSTACK: " + stack
def pop_bitvec(state: "MachineState") -> BitVecVal:
"""
:param state:
:return:
"""
def pop_bitvec(state: "MachineState") -> BitVec:
# pop one element from stack, converting boolean expressions and
# concrete Python variables to BitVecVal
item = state.stack.pop()
if type(item) == BoolRef:
return If(item, BitVecVal(1, 256), BitVecVal(0, 256))
if type(item) == Bool:
return If(
item, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
elif type(item) == bool:
if item:
return BitVecVal(1, 256)
return symbol_factory.BitVecVal(1, 256)
else:
return BitVecVal(0, 256)
return symbol_factory.BitVecVal(0, 256)
elif type(item) == int:
return BitVecVal(item, 256)
return symbol_factory.BitVecVal(item, 256)
else:
return simplify(item)
def get_concrete_int(item: Union[int, ExprRef]) -> int:
"""
:param item:
:return:
"""
def get_concrete_int(item: Union[int, Expression]) -> int:
if isinstance(item, int):
return item
elif isinstance(item, BitVecNumRef):
return item.as_long()
elif isinstance(item, BoolRef):
simplified = simplify(item)
if is_false(simplified):
return 0
elif is_true(simplified):
return 1
else:
elif isinstance(item, BitVec):
if item.symbolic:
raise TypeError("Got a symbolic BitVecRef")
return item.value
elif isinstance(item, Bool):
value = item.value
if value is None:
raise TypeError("Symbolic boolref encountered")
try:
return simplify(item).as_long()
except AttributeError:
raise TypeError("Got a symbolic BitVecRef")
return value
def concrete_int_from_bytes(concrete_bytes: bytes, start_index: int) -> int:
"""
:param concrete_bytes:
:param start_index:
:return:
"""
concrete_bytes = [
byte.as_long() if type(byte) == BitVecNumRef else byte
byte.value if isinstance(byte, BitVec) and not byte.symbolic else byte
for byte in concrete_bytes
]
integer_bytes = concrete_bytes[start_index : start_index + 32]
@ -137,23 +93,13 @@ def concrete_int_from_bytes(concrete_bytes: bytes, start_index: int) -> int:
def concrete_int_to_bytes(val):
"""
:param val:
:return:
"""
# logging.debug("concrete_int_to_bytes " + str(val))
if type(val) == int:
return val.to_bytes(32, byteorder="big")
return (simplify(val).as_long()).to_bytes(32, byteorder="big")
return (simplify(val).value).to_bytes(32, byteorder="big")
def bytearray_to_int(arr):
"""
:param arr:
:return:
"""
o = 0
for a in arr:
o = (o << 8) + a

@ -1,6 +1,23 @@
from mythril.laser.smt.bitvec import BitVec
from mythril.laser.smt.expression import Expression
from mythril.laser.smt.bool import Bool
from mythril.laser.smt.bitvec import (
BitVec,
If,
UGT,
ULT,
Concat,
Extract,
URem,
SRem,
UDiv,
UGE,
Sum,
BVAddNoOverflow,
BVMulNoOverflow,
BVSubNoUnderflow,
)
from mythril.laser.smt.expression import Expression, simplify
from mythril.laser.smt.bool import Bool, is_true, is_false, Or, Not
from mythril.laser.smt.array import K, Array, BaseArray
from mythril.laser.smt.solver import Solver, Optimize
import z3
@ -70,4 +87,7 @@ class _Z3SymbolFactory(SymbolFactory):
# This is the instance that other parts of mythril should use
symbol_factory = _Z3SymbolFactory()
# Type hints are not allowed here in 3.5
# symbol_factory: SymbolFactory = _SmtSymbolFactory()
symbol_factory = _SmtSymbolFactory()

@ -0,0 +1,46 @@
from mythril.laser.smt.bitvec import BitVec
import z3
class BaseArray:
"""
Base array type, implements basic store and set operations
"""
def __getitem__(self, item: BitVec):
""" Gets item from the array, item can be symbolic"""
if isinstance(item, slice):
raise ValueError(
"Instance of BaseArray, does not support getitem with slices"
)
return BitVec(z3.Select(self.raw, item.raw))
def __setitem__(self, key: BitVec, value: BitVec):
""" Sets an item in the array, key can be symbolic"""
self.raw = z3.Store(self.raw, key.raw, value.raw)
class Array(BaseArray):
def __init__(self, name: str, domain: int, value_range: int):
"""
Initializes a symbolic array
:param name: Name of the array
:param domain: The domain for the array (10 -> all the values that a bv of size 10 could take)
:param value_range: The range for the values in the array (10 -> all the values that a bv of size 10 could take)
"""
self.domain = z3.BitVecSort(domain)
self.range = z3.BitVecSort(value_range)
self.raw = z3.Array(name, self.domain, self.range)
class K(BaseArray):
def __init__(self, domain: int, value_range: int, value: int):
"""
Initializes an array with a default value
:param domain: The domain for the array (10 -> all the values that a bv of size 10 could take)
:param value_range: The range for the values in the array (10 -> all the values that a bv of size 10 could take)
:param value: The default value to use for this array
"""
self.domain = z3.BitVecSort(domain)
self.value = z3.BitVecVal(value, value_range)
self.raw = z3.K(self.domain, self.value)

@ -3,6 +3,8 @@ import z3
from mythril.laser.smt.expression import Expression
from mythril.laser.smt.bool import Bool
from typing import Union
# fmt: off
@ -13,6 +15,9 @@ class BitVec(Expression):
def __init__(self, raw, annotations=None):
super().__init__(raw, annotations)
def size(self):
return self.raw.size()
@property
def symbolic(self):
""" Returns whether this symbol doesn't have a concrete value """
@ -27,77 +32,113 @@ class BitVec(Expression):
assert isinstance(self.raw, z3.BitVecNumRef)
return self.raw.as_long()
def __add__(self, other: "BV") -> "BV":
def __add__(self, other) -> "BitVec":
""" Create an addition expression """
if isinstance(other, int):
return BitVec(self.raw + other, annotations=self.annotations)
union = self.annotations + other.annotations
return BitVec(self.raw + other.raw, annotations=union)
def __sub__(self, other: "BV") -> "BV":
def __sub__(self, other) -> "BitVec":
""" Create a subtraction expression """
if isinstance(other, int):
return BitVec(self.raw - other, annotations=self.annotations)
union = self.annotations + other.annotations
return BitVec(self.raw - other.raw, annotations=union)
def __mul__(self, other: "BV") -> "BV":
def __mul__(self, other) -> "BitVec":
""" Create a multiplication expression """
union = self.annotations + other.annotations
return BitVec(self.raw * other.raw, annotations=union)
def __truediv__(self, other: "BV") -> "BV":
def __truediv__(self, other) -> "BitVec":
""" Create a signed division expression """
union = self.annotations + other.annotations
return BitVec(self.raw / other.raw, annotations=union)
def __and__(self, other: "BV") -> "BV":
def __and__(self, other) -> "BitVec":
""" Create an and expression """
if not isinstance(other, BitVec):
other = BitVec(z3.BitVecVal(other, 256))
union = self.annotations + other.annotations
return BitVec(self.raw & other.raw, annotations=union)
def __or__(self, other: "BV") -> "BV":
def __or__(self, other) -> "BitVec":
""" Create an or expression """
union = self.annotations + other.annotations
return BitVec(self.raw | other.raw, annotations=union)
def __xor__(self, other: "BV") -> "BV":
def __xor__(self, other) -> "BitVec":
""" Create a xor expression """
union = self.annotations + other.annotations
return BitVec(self.raw ^ other.raw, annotations=union)
def __lt__(self, other: "BV") -> Bool:
def __lt__(self, other) -> Bool:
""" Create a signed less than expression """
union = self.annotations + other.annotations
return Bool(self.raw < other.raw, annotations=union)
def __gt__(self, other: "BV") -> Bool:
def __gt__(self, other) -> Bool:
""" Create a signed greater than expression """
union = self.annotations + other.annotations
return Bool(self.raw < other.raw, annotations=union)
return Bool(self.raw > other.raw, annotations=union)
def __eq__(self, other: "BV") -> Bool:
def __eq__(self, other) -> Bool:
""" Create an equality expression """
if not isinstance(other, BitVec):
return Bool(self.raw == other, annotations=self.annotations)
union = self.annotations + other.annotations
return Bool(self.raw == other.raw, annotations=union)
def __ne__(self, other) -> Bool:
""" Create an inequality expression """
if not isinstance(other, BitVec):
return Bool(self.raw != other, annotations=self.annotations)
union = self.annotations + other.annotations
return Bool(self.raw != other.raw, annotations=union)
def If(a: Bool, b: BitVec, c: BitVec):
""" Create an if-then-else expression """
if not isinstance(a, Expression):
a = Bool(z3.BoolVal(a))
if not isinstance(b, Expression):
b = BitVec(z3.BitVecVal(b, 256))
if not isinstance(c, Expression):
c = BitVec(z3.BitVecVal(c, 256))
union = a.annotations + b.annotations + c.annotations
return BitVec(z3.If(a, b, c), union)
return BitVec(z3.If(a.raw, b.raw, c.raw), union)
def UGT(a: BitVec, b: BitVec) -> Bool:
""" Create an unsigned greater than expression """
annotations = a.annotations + b.annotations
return Bool(z3.UGT(a, b), annotations)
return Bool(z3.UGT(a.raw, b.raw), annotations)
def UGE(a: BitVec, b:BitVec) -> Bool:
annotations = a.annotations + b.annotations
return Bool(z3.UGE(a.raw, b.raw), annotations)
def ULT(a: BitVec, b: BitVec) -> Bool:
""" Create an unsigned less than expression """
annotations = a.annotations + b.annotations
return Bool(z3.ULT(a, b), annotations)
return Bool(z3.ULT(a.raw, b.raw), annotations)
def Concat(*args) -> BitVec:
""" Create a concatenation expression """
# The following statement is used if a list is provided as an argument to concat
if len(args) == 1 and isinstance(args[0], list):
args = args[0]
nraw = z3.Concat([a.raw for a in args])
annotations = []
for bv in args:
@ -126,3 +167,40 @@ def UDiv(a: BitVec, b: BitVec) -> BitVec:
""" Create an unsigned division expression """
union = a.annotations + b.annotations
return BitVec(z3.UDiv(a.raw, b.raw), annotations=union)
def Sum(*args) -> BitVec:
""" Create sum expression"""
nraw = z3.Sum([a.raw for a in args])
annotations = []
for bv in args:
annotations += bv.annotations
return BitVec(nraw, annotations)
def BVAddNoOverflow(a: Union[BitVec, int], b: Union[BitVec, int], signed: bool) -> Bool:
"""Creates predicate that verifies that the addition doesn't overflow"""
if not isinstance(a, Expression):
a = BitVec(z3.BitVecVal(a, 256))
if not isinstance(b, Expression):
b = BitVec(z3.BitVecVal(b, 256))
return Bool(z3.BVAddNoOverflow(a.raw, b.raw, signed))
def BVMulNoOverflow(a: Union[BitVec, int], b: Union[BitVec, int], signed: bool) -> Bool:
"""Creates predicate that verifies that the multiplication doesn't overflow"""
if not isinstance(a, Expression):
a = BitVec(z3.BitVecVal(a, 256))
if not isinstance(b, Expression):
b = BitVec(z3.BitVecVal(b, 256))
return Bool(z3.BVMulNoOverflow(a.raw, b.raw, signed))
def BVSubNoUnderflow(a: Union[BitVec, int], b: Union[BitVec, int], signed: bool) -> Bool:
"""Creates predicate that verifies that the subtraction doesn't overflow"""
if not isinstance(a, Expression):
a = BitVec(z3.BitVecVal(a, 256))
if not isinstance(b, Expression):
b = BitVec(z3.BitVecVal(b, 256))
return Bool(z3.BVSubNoUnderflow(a.raw, b.raw, signed))

@ -26,6 +26,7 @@ class Bool(Expression):
@property
def value(self) -> Union[bool, None]:
""" Returns the concrete value of this bool if concrete, otherwise None"""
self.simplify()
if self.is_true:
return True
elif self.is_false:
@ -33,12 +34,39 @@ class Bool(Expression):
else:
return None
def __eq__(self, other):
if isinstance(other, Expression):
return Bool(self.raw == other.raw, self.annotations + other.annotations)
return Bool(self.raw == other, self.annotations)
def __ne__(self, other):
if isinstance(other, Expression):
return Bool(self.raw != other.raw, self.annotations + other.annotations)
return Bool(self.raw != other, self.annotations)
def __bool__(self):
if self.value is not None:
return self.value
else:
return False
def Or(a: Bool, b: Bool):
""" Create an or expression"""
union = a.annotations + b.annotations
return Bool(z3.Or(a.raw, b.raw), annotations=union)
def Not(a: Bool):
""" Create a Not expression"""
return Bool(z3.Not(a.raw), a.annotations)
def is_false(a: Bool) -> bool:
""" Returns whether the provided bool can be simplified to false"""
return is_false(a)
return z3.is_false(a.raw)
def is_true(a: Bool) -> bool:
""" Returns whether the provided bool can be simplified to true"""
return is_true(a)
return z3.is_true(a.raw)

@ -26,7 +26,11 @@ class Expression:
""" Simplifies this expression """
self.raw = z3.simplify(self.raw)
def __repr__(self):
return repr(self.raw)
def simplify(expression: Expression):
""" Simplifies the expression """
expression.simplify()
return expression

@ -0,0 +1,66 @@
import z3
from mythril.laser.smt.bool import Bool
from mythril.laser.smt.expression import Expression
class Solver:
"""
An smt solver object
"""
def __init__(self):
self.raw = z3.Solver()
def set_timeout(self, timeout: int) -> None:
""" Sets the timeout that will be used by this solver, timeout is in milliseconds"""
self.raw.set(timeout=timeout)
def add(self, constraints: list) -> None:
""" Adds the constraints to this solver """
if not isinstance(constraints, list):
self.raw.add(constraints.raw)
return
constraints = [c.raw for c in constraints]
self.raw.add(constraints)
def append(self, constraints: list) -> None:
""" Adds the constraints to this solver """
if not isinstance(constraints, list):
self.raw.append(constraints.raw)
return
constraints = [c.raw for c in constraints]
self.raw.add(constraints)
def check(self):
""" Returns z3 smt check result"""
return self.raw.check()
def model(self):
""" Returns z3 model for a solution"""
return self.raw.model()
def reset(self) -> None:
""" Resets this solver """
self.raw.reset()
def pop(self, num) -> None:
""" Pop num constraints from this solver """
self.raw.pop(num)
class Optimize(Solver):
"""
An optimizing smt solver
"""
def __init__(self):
super().__init__()
self.raw = z3.Optimize()
def minimize(self, element: Expression):
""" In solving this solver will try to minimize the passed expression """
self.raw.minimize(element.raw)
def maximize(self, element: Expression):
""" In solving this solver will try to maximize the passed expression """
self.raw.maximize(element.raw)

@ -35,7 +35,7 @@ from mythril.analysis.security import fire_lasers
from mythril.analysis.report import Report
from mythril.ethereum.interface.leveldb.client import EthLevelDB
# logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
class Mythril(object):
@ -120,7 +120,7 @@ class Mythril(object):
if not os.path.exists(mythril_dir):
# Initialize data directory
logging.info("Creating mythril data directory")
log.info("Creating mythril data directory")
os.mkdir(mythril_dir)
db_path = str(Path(mythril_dir) / "signatures.db")
@ -155,7 +155,7 @@ class Mythril(object):
leveldb_fallback_dir = os.path.join(leveldb_fallback_dir, "geth", "chaindata")
if not os.path.exists(self.config_path):
logging.info("No config file found. Creating default: " + self.config_path)
log.info("No config file found. Creating default: " + self.config_path)
open(self.config_path, "a").close()
config = ConfigParser(allow_no_value=True)
@ -233,12 +233,12 @@ class Mythril(object):
"Could not extract solc version from string {}".format(main_version)
)
if version == main_version_number:
logging.info("Given version matches installed version")
log.info("Given version matches installed version")
solc_binary = os.environ.get("SOLC") or "solc"
else:
solc_binary = util.solc_exists(version)
if solc_binary:
logging.info("Given version is already installed")
log.info("Given version is already installed")
else:
try:
solc.install_solc("v" + version)
@ -250,7 +250,7 @@ class Mythril(object):
"There was an error when trying to install the specified solc version"
)
logging.info("Setting the compiler to %s", solc_binary)
log.info("Setting the compiler to %s", solc_binary)
return solc_binary
@ -269,7 +269,7 @@ class Mythril(object):
"""
self.eth = EthJsonRpc("mainnet.infura.io", 443, True)
logging.info("Using INFURA for RPC queries")
log.info("Using INFURA for RPC queries")
def set_api_rpc(self, rpc=None, rpctls=False):
"""
@ -294,7 +294,7 @@ class Mythril(object):
if rpcconfig:
self.eth = EthJsonRpc(rpcconfig[0], int(rpcconfig[1]), rpcconfig[2])
logging.info("Using RPC settings: %s" % str(rpcconfig))
log.info("Using RPC settings: %s" % str(rpcconfig))
else:
raise CriticalError("Invalid RPC settings, check help for details.")
@ -303,7 +303,7 @@ class Mythril(object):
"""
self.eth = EthJsonRpc("localhost", 8545)
logging.info("Using default RPC settings: http://localhost:8545")
log.info("Using default RPC settings: http://localhost:8545")
def set_api_from_config_path(self):
"""
@ -459,7 +459,7 @@ class Mythril(object):
except CompilerError as e:
raise CriticalError(e)
except NoContractFoundError:
logging.error(
log.error(
"The file " + file + " does not contain a compilable contract."
)

@ -2,6 +2,8 @@ from mythril.disassembler.disassembly import Disassembly
import logging
import re
log = logging.getLogger(__name__)
class DynLoader:
"""
@ -59,9 +61,7 @@ class DynLoader:
if not self.contract_loading:
raise ValueError("Cannot load contract when contract_loading flag is false")
logging.debug(
"Dynld at contract " + contract_address + ": " + dependency_address
)
log.debug("Dynld at contract " + contract_address + ": " + dependency_address)
m = re.match(r"^(0x[0-9a-fA-F]{40})$", dependency_address)
@ -71,7 +71,7 @@ class DynLoader:
else:
return None
logging.debug("Dependency address: " + dependency_address)
log.debug("Dependency address: " + dependency_address)
code = self.eth.eth_getCode(dependency_address)

@ -15,6 +15,8 @@ from subprocess import Popen, PIPE
from mythril.exceptions import CompilerError
log = logging.getLogger(__name__)
lock = multiprocessing.Lock()
@ -108,7 +110,7 @@ class SignatureDB(object, metaclass=Singleton):
)
self.path = os.path.join(self.path, "signatures.db")
logging.info("Using signature database at %s", self.path)
log.info("Using signature database at %s", self.path)
# NOTE: Creates a new DB file if it doesn't exist already
with SQLiteDB(self.path) as cur:
cur.execute(
@ -203,7 +205,7 @@ class SignatureDB(object, metaclass=Singleton):
except FourByteDirectoryOnlineLookupError as fbdole:
# wait at least 2 mins to try again
self.online_lookup_timeout = time.time() + 2 * 60
logging.warning("Online lookup failed, not retrying for 2min: %s", fbdole)
log.warning("Online lookup failed, not retrying for 2min: %s", fbdole)
return []
def import_solidity_file(
@ -246,8 +248,7 @@ class SignatureDB(object, metaclass=Singleton):
solc_bytes = "0x" + line.split(":")[0]
solc_text = line.split(":")[1].strip()
self.solidity_sigs[solc_bytes].append(solc_text)
logging.debug(
log.debug(
"Signatures: found %d signatures after parsing" % len(self.solidity_sigs)
)

@ -14,6 +14,8 @@ from mythril.analysis.report import Report
from mythril.ethereum import util
from mythril.laser.ethereum.util import get_instruction_index
log = logging.getLogger(__name__)
def analyze_truffle_project(sigs, args):
"""
@ -109,7 +111,7 @@ def analyze_truffle_project(sigs, args):
].decode("utf-8")
issue.lineno = mappings[index].lineno
except IndexError:
logging.debug("No code mapping at index %d", index)
log.debug("No code mapping at index %d", index)
report.append_issue(issue)

@ -1,3 +1,3 @@
# This file is suitable for sourcing inside POSIX shell, e.g. bash as
# well as for importing into Python
VERSION = "v0.19.8" # NOQA
VERSION = "v0.19.11" # NOQA

@ -1,6 +1,7 @@
coloredlogs>=10.0
configparser>=3.5.0
coverage
py_ecc==1.4.2
eth_abi==1.3.0
eth-account>=0.1.0a2
ethereum>=2.3.2

@ -77,6 +77,7 @@ setup(
packages=find_packages(exclude=["contrib", "docs", "tests"]),
install_requires=[
"coloredlogs>=10.0",
"py_ecc==1.4.2",
"ethereum>=2.3.2",
"z3-solver>=4.8.0.0",
"requests",
@ -104,7 +105,7 @@ setup(
tests_require=["pytest>=3.6.0", "pytest_mock", "pytest-cov"],
python_requires=">=3.5",
extras_require={},
package_data={"mythril.analysis.templates": ["*"]},
package_data={"mythril.analysis.templates": ["*"], "": ["signatures.db"]},
include_package_data=True,
entry_points={"console_scripts": ["myth=mythril.interfaces.cli:main"]},
cmdclass={"verify": VerifyVersionCommand},

@ -2,6 +2,7 @@ from mythril.laser.ethereum.svm import LaserEVM
from mythril.laser.ethereum.state.account import Account
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.transaction.concolic import execute_message_call
from mythril.laser.smt import Expression, BitVec
from mythril.analysis.solver import get_model
from datetime import datetime
@ -138,11 +139,9 @@ def test_vmtest(
for index, value in details["storage"].items():
expected = int(value, 16)
actual = account.storage[int(index, 16)]
if isinstance(actual, ExprRef):
actual = model.eval(actual)
actual = (
1 if actual == True else 0 if actual == False else actual
) # Comparisons should be done with == than 'is' here as actual can be a BoolRef
if isinstance(actual, Expression):
actual = actual.value
actual = 1 if actual is True else 0 if actual is False else actual
else:
if type(actual) == bytes:
actual = int(binascii.b2a_hex(actual), 16)

@ -1,6 +1,7 @@
import pytest
from mythril.laser.ethereum.state.calldata import ConcreteCalldata, SymbolicCalldata
from z3 import Solver, simplify, BitVec, sat, unsat
from mythril.laser.smt import Solver, symbol_factory
from z3 import sat, unsat
from z3.z3types import Z3Exception
from mock import MagicMock
@ -32,7 +33,7 @@ def test_concrete_calldata_calldatasize():
# Act
solver.check()
model = solver.model()
result = model.eval(calldata.calldatasize)
result = model.eval(calldata.calldatasize.raw)
# Assert
assert result == 7
@ -75,8 +76,8 @@ def test_symbolic_calldata_constrain_index():
def test_symbolic_calldata_equal_indices():
calldata = SymbolicCalldata(0)
index_a = BitVec("index_a", 256)
index_b = BitVec("index_b", 256)
index_a = symbol_factory.BitVecSym("index_a", 256)
index_b = symbol_factory.BitVecSym("index_b", 256)
# Act
a = calldata[index_a]

@ -1,5 +1,5 @@
import pytest
from z3 import BitVec, simplify
from mythril.laser.smt import simplify, symbol_factory
from mythril.laser.ethereum.state.machine_state import MachineState
from mythril.laser.ethereum.evm_exceptions import StackUnderflowException
@ -105,8 +105,8 @@ def test_memory_write():
mem = Memory()
mem.extend(200 + 32)
a = BitVec("a", 256)
b = BitVec("b", 8)
a = symbol_factory.BitVecSym("a", 256)
b = symbol_factory.BitVecSym("b", 8)
# Act
mem[11] = 10

@ -1,6 +1,6 @@
import pytest
from mythril.laser.ethereum.state.account import Storage
from z3 import ExprRef
from mythril.laser.smt import Expression
storage_uninitialized_test_data = [({}, 1), ({1: 5}, 2), ({1: 5, 3: 10}, 2)]
@ -28,7 +28,7 @@ def test_symbolic_storage_uninitialized_index(initial_storage, key):
value = storage[key]
# Assert
assert isinstance(value, ExprRef)
assert isinstance(value, Expression)
def test_storage_set_item():

@ -7,4 +7,4 @@ contract Suicide {
selfdestruct(addr);
}
}
}

Loading…
Cancel
Save