Merge branch 'fix/constraints-after-issues' of github.com:ConsenSys/mythril-classic into fix/constraints-after-issues

fix/constraints-after-issues
Nathan 5 years ago
commit e6963632df
  1. 110
      mythril/analysis/modules/delegatecall.py
  2. 7
      mythril/analysis/solver.py
  3. 2
      mythril/analysis/templates/report_as_markdown.jinja2
  4. 2
      mythril/analysis/templates/report_as_text.jinja2
  5. 6
      mythril/laser/ethereum/evm_exceptions.py
  6. 255
      mythril/laser/ethereum/instructions.py
  7. 16
      mythril/laser/ethereum/plugins/implementations/mutation_pruner.py
  8. 2
      mythril/laser/ethereum/state/annotation.py
  9. 4
      mythril/laser/ethereum/state/environment.py
  10. 4
      mythril/laser/ethereum/svm.py
  11. 4
      mythril/laser/ethereum/transaction/symbolic.py
  12. 10
      mythril/laser/ethereum/transaction/transaction_models.py
  13. 124
      tests/instructions/static_call_test.py

@ -20,60 +20,6 @@ from mythril.laser.smt import symbol_factory, UGT
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class DelegateCallAnnotation(StateAnnotation):
def __init__(self, call_state: GlobalState, constraints: List) -> None:
"""
Initialize DelegateCall Annotation
:param call_state: Call state
"""
self.call_state = call_state
self.constraints = constraints
self.return_value = call_state.new_bitvec(
"retval_{}".format(call_state.get_current_instruction()["address"]), 256
)
def _copy__(self):
return DelegateCallAnnotation(self.call_state, copy(self.constraints))
def get_issue(self, global_state: GlobalState, transaction_sequence: Dict) -> Issue:
"""
Returns Issue for the annotation
:param global_state: Global State
:param transaction_sequence: Transaction sequence
:return: Issue
"""
address = self.call_state.get_current_instruction()["address"]
logging.debug(
"[DELEGATECALL] Detected delegatecall to a user-supplied address : {}".format(
address
)
)
description_head = "The contract delegates execution to another contract with a user-supplied address."
description_tail = (
"The smart contract delegates execution to a user-supplied address. Note that callers "
"can execute arbitrary contracts and that the callee contract "
"can access the storage of the calling contract. "
)
return Issue(
contract=self.call_state.environment.active_account.contract_name,
function_name=self.call_state.environment.active_function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
title="Delegatecall Proxy To User-Supplied Address",
bytecode=global_state.environment.code.bytecode,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(
global_state.mstate.min_gas_used,
global_state.mstate.max_gas_used,
),
)
class DelegateCallModule(DetectionModule): class DelegateCallModule(DetectionModule):
"""This module detects calldata being forwarded using DELEGATECALL.""" """This module detects calldata being forwarded using DELEGATECALL."""
@ -84,7 +30,7 @@ class DelegateCallModule(DetectionModule):
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT, swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
description="Check for invocations of delegatecall(msg.data) in the fallback function.", description="Check for invocations of delegatecall(msg.data) in the fallback function.",
entrypoint="callback", entrypoint="callback",
pre_hooks=["DELEGATECALL", "RETURN", "STOP"], pre_hooks=["DELEGATECALL"],
) )
def _execute(self, state: GlobalState) -> None: def _execute(self, state: GlobalState) -> None:
@ -106,17 +52,8 @@ class DelegateCallModule(DetectionModule):
:param state: the current state :param state: the current state
:return: returns the issues for that corresponding state :return: returns the issues for that corresponding state
""" """
issues = []
op_code = state.get_current_instruction()["opcode"] op_code = state.get_current_instruction()["opcode"]
annotations = cast(
List[DelegateCallAnnotation],
list(state.get_annotations(DelegateCallAnnotation)),
)
if len(annotations) == 0 and op_code in ("RETURN", "STOP"):
return []
if op_code == "DELEGATECALL":
gas = state.mstate.stack[-1] gas = state.mstate.stack[-1]
to = state.mstate.stack[-2] to = state.mstate.stack[-2]
@ -129,27 +66,42 @@ class DelegateCallModule(DetectionModule):
if not isinstance(tx, ContractCreationTransaction): if not isinstance(tx, ContractCreationTransaction):
constraints.append(tx.caller == ATTACKER_ADDRESS) constraints.append(tx.caller == ATTACKER_ADDRESS)
state.annotate(DelegateCallAnnotation(state, constraints))
return []
else:
for annotation in annotations:
try: try:
transaction_sequence = solver.get_transaction_sequence( transaction_sequence = solver.get_transaction_sequence(
state, state, state.mstate.constraints + constraints
state.mstate.constraints
+ annotation.constraints
+ [annotation.return_value == 1],
) )
issues.append(
annotation.get_issue( address = state.get_current_instruction()["address"]
state, transaction_sequence=transaction_sequence logging.debug(
"[DELEGATECALL] Detected delegatecall to a user-supplied address : {}".format(
address
) )
) )
except UnsatError: description_head = "The contract delegates execution to another contract with a user-supplied address."
continue description_tail = (
"The smart contract delegates execution to a user-supplied address. Note that callers "
"can execute arbitrary contracts and that the callee contract "
"can access the storage of the calling contract. "
)
return [
Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
bytecode=state.environment.code.bytecode,
title="Delegatecall Proxy To User-Supplied Address",
severity="Medium",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
]
return issues except UnsatError:
return []
detector = DelegateCallModule() detector = DelegateCallModule()

@ -152,11 +152,12 @@ def _get_concrete_transaction(model: z3.Model, transaction: BaseTransaction):
"%x" % model.eval(transaction.caller.raw, model_completion=True).as_long() "%x" % model.eval(transaction.caller.raw, model_completion=True).as_long()
).zfill(40) ).zfill(40)
input_ = ""
if isinstance(transaction, ContractCreationTransaction): if isinstance(transaction, ContractCreationTransaction):
address = "" address = ""
input_ = transaction.code.bytecode input_ += transaction.code.bytecode
else:
input_ = "".join( input_ += "".join(
[ [
hex(b)[2:] if len(hex(b)) % 2 == 0 else "0" + hex(b)[2:] hex(b)[2:] if len(hex(b)) % 2 == 0 else "0" + hex(b)[2:]
for b in transaction.call_data.concrete(model) for b in transaction.call_data.concrete(model)

@ -36,7 +36,7 @@ Account: {% if key == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %}[CREATOR]{%
{% for step in issue.tx_sequence.steps %} {% for step in issue.tx_sequence.steps %}
{% if step == issue.tx_sequence.steps[0] and step.input != "0x" and step.origin == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %} {% if step == issue.tx_sequence.steps[0] and step.input != "0x" and step.origin == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %}
Caller: [CREATOR], data: [CONTRACT CREATION], value: {{ step.value }} Caller: [CREATOR], data: {{ step.input }}, value: {{ step.value }}
{% else %} {% else %}
Caller: {% if step.origin == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %}[CREATOR]{% elif step.origin == "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" %}[ATTACKER]{% else %}[SOMEGUY]{% endif %}, function: {{ step.name }}, txdata: {{ step.input }}, value: {{ step.value }} Caller: {% if step.origin == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %}[CREATOR]{% elif step.origin == "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" %}[ATTACKER]{% else %}[SOMEGUY]{% endif %}, function: {{ step.name }}, txdata: {{ step.input }}, value: {{ step.value }}
{% endif %} {% endif %}

@ -29,7 +29,7 @@ Transaction Sequence:
{% for step in issue.tx_sequence.steps %} {% for step in issue.tx_sequence.steps %}
{% if step == issue.tx_sequence.steps[0] and step.input != "0x" and step.origin == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %} {% if step == issue.tx_sequence.steps[0] and step.input != "0x" and step.origin == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %}
Caller: [CREATOR], data: [CONTRACT CREATION], value: {{ step.value }} Caller: [CREATOR], data: {{ step.input }}, value: {{ step.value }}
{% else %} {% else %}
Caller: {% if step.origin == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %}[CREATOR]{% elif step.origin == "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" %}[ATTACKER]{% else %}[SOMEGUY]{% endif %}, function: {{ step.name }}, txdata: {{ step.input }}, value: {{ step.value }} Caller: {% if step.origin == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %}[CREATOR]{% elif step.origin == "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" %}[ATTACKER]{% else %}[SOMEGUY]{% endif %}, function: {{ step.name }}, txdata: {{ step.input }}, value: {{ step.value }}
{% endif %} {% endif %}

@ -37,6 +37,12 @@ class OutOfGasException(VmException):
pass pass
class WriteProtection(VmException):
"""A VM exception denoting that a write operation is executed on a write protected environment"""
pass
class ProgramCounterException(VmException): class ProgramCounterException(VmException):
"""A VM exception denoting an invalid PC value (No stop instruction is reached).""" """A VM exception denoting an invalid PC value (No stop instruction is reached)."""

@ -38,6 +38,7 @@ from mythril.laser.ethereum.evm_exceptions import (
InvalidJumpDestination, InvalidJumpDestination,
InvalidInstruction, InvalidInstruction,
OutOfGasException, OutOfGasException,
WriteProtection,
) )
from mythril.laser.ethereum.gas import OPCODE_GAS from mythril.laser.ethereum.gas import OPCODE_GAS
from mythril.laser.ethereum.state.global_state import GlobalState from mythril.laser.ethereum.state.global_state import GlobalState
@ -88,14 +89,18 @@ class StateTransition(object):
if `increment_pc=True`. if `increment_pc=True`.
""" """
def __init__(self, increment_pc=True, enable_gas=True): def __init__(
self, increment_pc=True, enable_gas=True, is_state_mutation_instruction=False
):
""" """
:param increment_pc: :param increment_pc:
:param enable_gas: :param enable_gas:
:param is_state_mutation_instruction: The function mutates state
""" """
self.increment_pc = increment_pc self.increment_pc = increment_pc
self.enable_gas = enable_gas self.enable_gas = enable_gas
self.is_state_mutation_instruction = is_state_mutation_instruction
@staticmethod @staticmethod
def call_on_state_copy(func: Callable, func_obj: "Instruction", state: GlobalState): def call_on_state_copy(func: Callable, func_obj: "Instruction", state: GlobalState):
@ -165,6 +170,13 @@ class StateTransition(object):
:param global_state: :param global_state:
:return: :return:
""" """
if self.is_state_mutation_instruction and global_state.environment.static:
raise WriteProtection(
"The function {} cannot be executed in a static call".format(
func.__name__[:-1]
)
)
new_global_states = self.call_on_state_copy(func, func_obj, global_state) new_global_states = self.call_on_state_copy(func, func_obj, global_state)
new_global_states = [ new_global_states = [
self.accumulate_gas(state) for state in new_global_states self.accumulate_gas(state) for state in new_global_states
@ -758,34 +770,33 @@ class Instruction:
""" """
state = global_state.mstate state = global_state.mstate
environment = global_state.environment environment = global_state.environment
if isinstance(global_state.current_transaction, ContractCreationTransaction):
log.debug("Attempt to use CALLDATASIZE in creation transaction")
state.stack.append(0)
else:
state.stack.append(environment.calldata.calldatasize) state.stack.append(environment.calldata.calldatasize)
return [global_state]
@StateTransition() return [global_state]
def calldatacopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state: @staticmethod
:return: def _calldata_copy_helper(global_state, state, mstart, dstart, size):
"""
state = global_state.mstate
environment = global_state.environment environment = global_state.environment
op0, op1, op2 = state.stack.pop(), state.stack.pop(), state.stack.pop()
try: try:
mstart = util.get_concrete_int(op0) mstart = util.get_concrete_int(mstart)
except TypeError: except TypeError:
log.debug("Unsupported symbolic memory offset in CALLDATACOPY") log.debug("Unsupported symbolic memory offset in CALLDATACOPY")
return [global_state] return [global_state]
try: try:
dstart = util.get_concrete_int(op1) # type: Union[int, BitVec] dstart = util.get_concrete_int(dstart) # type: Union[int, BitVec]
except TypeError: except TypeError:
log.debug("Unsupported symbolic calldata offset in CALLDATACOPY") log.debug("Unsupported symbolic calldata offset in CALLDATACOPY")
dstart = simplify(op1) dstart = simplify(dstart)
try: try:
size = util.get_concrete_int(op2) # type: Union[int, BitVec] size = util.get_concrete_int(size) # type: Union[int, BitVec]
except TypeError: except TypeError:
log.debug("Unsupported symbolic size in CALLDATACOPY") log.debug("Unsupported symbolic size in CALLDATACOPY")
size = 320 # The excess size will get overwritten size = 320 # The excess size will get overwritten
@ -839,6 +850,22 @@ class Instruction:
) )
return [global_state] return [global_state]
@StateTransition()
def calldatacopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
op0, op1, op2 = state.stack.pop(), state.stack.pop(), state.stack.pop()
if isinstance(global_state.current_transaction, ContractCreationTransaction):
log.debug("Attempt to use CALLDATACOPY in creation transaction")
return [global_state]
return self._calldata_copy_helper(global_state, state, op0, op1, op2)
# Environment # Environment
@StateTransition() @StateTransition()
def address_(self, global_state: GlobalState) -> List[GlobalState]: def address_(self, global_state: GlobalState) -> List[GlobalState]:
@ -902,7 +929,17 @@ class Instruction:
state = global_state.mstate state = global_state.mstate
environment = global_state.environment environment = global_state.environment
disassembly = environment.code disassembly = environment.code
state.stack.append(len(disassembly.bytecode) // 2) if isinstance(global_state.current_transaction, ContractCreationTransaction):
# Hacky way to ensure constructor arguments work - Pick some reasonably large size.
no_of_bytes = (
len(disassembly.bytecode) // 2 + 0x200
) # space for 16 32-byte arguments
global_state.mstate.constraints.append(
global_state.environment.calldata.size == no_of_bytes
)
else:
no_of_bytes = len(disassembly.bytecode) // 2
state.stack.append(no_of_bytes)
return [global_state] return [global_state]
@StateTransition(enable_gas=False) @StateTransition(enable_gas=False)
@ -1026,6 +1063,20 @@ class Instruction:
global_state.mstate.stack.pop(), global_state.mstate.stack.pop(),
global_state.mstate.stack.pop(), global_state.mstate.stack.pop(),
) )
if (
isinstance(global_state.current_transaction, ContractCreationTransaction)
and code_offset >= len(global_state.environment.code.bytecode) // 2
):
# Treat creation code after the expected disassembly as calldata.
# This is a slightly hacky way to ensure that symbolic constructor
# arguments work correctly.
offset = code_offset - len(global_state.environment.code.bytecode) // 2
log.warning("Doing hacky thing offset: {} size: {}".format(offset, size))
return self._calldata_copy_helper(
global_state, global_state.mstate, memory_offset, offset, size
)
else:
return self._code_copy_helper( return self._code_copy_helper(
code=global_state.environment.code.bytecode, code=global_state.environment.code.bytecode,
memory_offset=memory_offset, memory_offset=memory_offset,
@ -1396,7 +1447,7 @@ class Instruction:
state.stack.append(global_state.environment.active_account.storage[index]) state.stack.append(global_state.environment.active_account.storage[index])
return [global_state] return [global_state]
@StateTransition() @StateTransition(is_state_mutation_instruction=True)
def sstore_(self, global_state: GlobalState) -> List[GlobalState]: def sstore_(self, global_state: GlobalState) -> List[GlobalState]:
""" """
@ -1563,7 +1614,7 @@ class Instruction:
global_state.mstate.stack.append(global_state.new_bitvec("gas", 256)) global_state.mstate.stack.append(global_state.new_bitvec("gas", 256))
return [global_state] return [global_state]
@StateTransition() @StateTransition(is_state_mutation_instruction=True)
def log_(self, global_state: GlobalState) -> List[GlobalState]: def log_(self, global_state: GlobalState) -> List[GlobalState]:
""" """
@ -1578,7 +1629,7 @@ class Instruction:
# Not supported # Not supported
return [global_state] return [global_state]
@StateTransition() @StateTransition(is_state_mutation_instruction=True)
def create_(self, global_state: GlobalState) -> List[GlobalState]: def create_(self, global_state: GlobalState) -> List[GlobalState]:
""" """
@ -1586,13 +1637,14 @@ class Instruction:
:return: :return:
""" """
# TODO: implement me # TODO: implement me
state = global_state.mstate state = global_state.mstate
state.stack.pop(), state.stack.pop(), state.stack.pop() state.stack.pop(), state.stack.pop(), state.stack.pop()
# Not supported # Not supported
state.stack.append(0) state.stack.append(0)
return [global_state] return [global_state]
@StateTransition() @StateTransition(is_state_mutation_instruction=True)
def create2_(self, global_state: GlobalState) -> List[GlobalState]: def create2_(self, global_state: GlobalState) -> List[GlobalState]:
""" """
@ -1628,7 +1680,7 @@ class Instruction:
return_data = state.memory[offset : offset + length] return_data = state.memory[offset : offset + length]
global_state.current_transaction.end(global_state, return_data) global_state.current_transaction.end(global_state, return_data)
@StateTransition() @StateTransition(is_state_mutation_instruction=True)
def suicide_(self, global_state: GlobalState): def suicide_(self, global_state: GlobalState):
""" """
@ -1733,6 +1785,21 @@ class Instruction:
) )
return [global_state] return [global_state]
if environment.static:
if isinstance(value, int) and value > 0:
raise WriteProtection(
"Cannot call with non zero value in a static call"
)
if isinstance(value, BitVec):
if value.symbolic:
global_state.mstate.constraints.append(
value == symbol_factory.BitVecVal(0, 256)
)
elif value.value > 0:
raise WriteProtection(
"Cannot call with non zero value in a static call"
)
native_result = native_call( native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size global_state, callee_address, call_data, memory_out_offset, memory_out_size
) )
@ -1747,8 +1814,9 @@ class Instruction:
callee_account=callee_account, callee_account=callee_account,
call_data=call_data, call_data=call_data,
call_value=value, call_value=value,
static=environment.static,
) )
raise TransactionStartSignal(transaction, self.op_code) raise TransactionStartSignal(transaction, self.op_code, global_state)
@StateTransition() @StateTransition()
def call_post(self, global_state: GlobalState) -> List[GlobalState]: def call_post(self, global_state: GlobalState) -> List[GlobalState]:
@ -1757,65 +1825,8 @@ class Instruction:
:param global_state: :param global_state:
:return: :return:
""" """
instr = global_state.get_current_instruction()
try: return self.post_handler(global_state, function_name="call")
callee_address, callee_account, call_data, value, gas, memory_out_offset, memory_out_size = get_call_parameters(
global_state, self.dynamic_loader, True
)
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
if global_state.last_return_data is None:
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(instr["address"]), 256
)
global_state.mstate.stack.append(return_value)
global_state.mstate.constraints.append(return_value == 0)
return [global_state]
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
# Copy memory
global_state.mstate.mem_extend(
memory_out_offset, min(memory_out_size, len(global_state.last_return_data))
)
for i in range(min(memory_out_size, len(global_state.last_return_data))):
global_state.mstate.memory[
i + memory_out_offset
] = global_state.last_return_data[i]
# Put return value on stack
return_value = global_state.new_bitvec("retval_" + str(instr["address"]), 256)
global_state.mstate.stack.append(return_value)
global_state.mstate.constraints.append(return_value == 1)
return [global_state]
@StateTransition() @StateTransition()
def callcode_(self, global_state: GlobalState) -> List[GlobalState]: def callcode_(self, global_state: GlobalState) -> List[GlobalState]:
@ -1831,7 +1842,6 @@ class Instruction:
callee_address, callee_account, call_data, value, gas, _, _ = get_call_parameters( callee_address, callee_account, call_data, value, gas, _, _ = get_call_parameters(
global_state, self.dynamic_loader, True global_state, self.dynamic_loader, True
) )
if callee_account is not None and callee_account.code.bytecode == "": if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts") log.debug("The call is related to ether transfer between accounts")
sender = global_state.environment.active_account.address sender = global_state.environment.active_account.address
@ -1864,8 +1874,9 @@ class Instruction:
callee_account=environment.active_account, callee_account=environment.active_account,
call_data=call_data, call_data=call_data,
call_value=value, call_value=value,
static=environment.static,
) )
raise TransactionStartSignal(transaction, self.op_code) raise TransactionStartSignal(transaction, self.op_code, global_state)
@StateTransition() @StateTransition()
def callcode_post(self, global_state: GlobalState) -> List[GlobalState]: def callcode_post(self, global_state: GlobalState) -> List[GlobalState]:
@ -1949,7 +1960,7 @@ class Instruction:
if callee_account is not None and callee_account.code.bytecode == "": if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts") log.debug("The call is related to ether transfer between accounts")
sender = environment.active_account.address sender = global_state.environment.active_account.address
receiver = callee_account.address receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value) transfer_ether(global_state, sender, receiver, value)
@ -1979,8 +1990,9 @@ class Instruction:
callee_account=environment.active_account, callee_account=environment.active_account,
call_data=call_data, call_data=call_data,
call_value=environment.callvalue, call_value=environment.callvalue,
static=environment.static,
) )
raise TransactionStartSignal(transaction, self.op_code) raise TransactionStartSignal(transaction, self.op_code, global_state)
@StateTransition() @StateTransition()
def delegatecall_post(self, global_state: GlobalState) -> List[GlobalState]: def delegatecall_post(self, global_state: GlobalState) -> List[GlobalState]:
@ -2012,6 +2024,7 @@ class Instruction:
"retval_" + str(instr["address"]), 256 "retval_" + str(instr["address"]), 256
) )
global_state.mstate.stack.append(return_value) global_state.mstate.stack.append(return_value)
global_state.mstate.constraints.append(return_value == 0)
return [global_state] return [global_state]
try: try:
@ -2053,8 +2066,8 @@ class Instruction:
:param global_state: :param global_state:
:return: :return:
""" """
# TODO: implement me
instr = global_state.get_current_instruction() instr = global_state.get_current_instruction()
environment = global_state.environment
try: try:
callee_address, callee_account, call_data, value, gas, memory_out_offset, memory_out_size = get_call_parameters( callee_address, callee_account, call_data, value, gas, memory_out_offset, memory_out_size = get_call_parameters(
global_state, self.dynamic_loader global_state, self.dynamic_loader
@ -2062,7 +2075,7 @@ class Instruction:
if callee_account is not None and callee_account.code.bytecode == "": if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts") log.debug("The call is related to ether transfer between accounts")
sender = global_state.environment.active_account.address sender = environment.active_account.address
receiver = callee_account.address receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value) transfer_ether(global_state, sender, receiver, value)
@ -2085,10 +2098,82 @@ class Instruction:
native_result = native_call( native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size global_state, callee_address, call_data, memory_out_offset, memory_out_size
) )
if native_result: if native_result:
return native_result return native_result
transaction = MessageCallTransaction(
world_state=global_state.world_state,
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
code=callee_account.code,
caller=environment.address,
callee_account=environment.active_account,
call_data=call_data,
call_value=value,
static=True,
)
raise TransactionStartSignal(transaction, self.op_code, global_state)
def staticcall_post(self, global_state: GlobalState) -> List[GlobalState]:
return self.post_handler(global_state, function_name="staticcall")
def post_handler(self, global_state, function_name: str):
instr = global_state.get_current_instruction()
try:
callee_address, callee_account, call_data, value, gas, memory_out_offset, memory_out_size = get_call_parameters(
global_state, self.dynamic_loader, True
)
except ValueError as e:
log.debug(
"Could not determine required parameters for {}, putting fresh symbol on the stack. \n{}".format(
function_name, e
)
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
if global_state.last_return_data is None:
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(instr["address"]), 256
)
global_state.mstate.stack.append(return_value)
return [global_state]
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:
global_state.mstate.stack.append( global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256) global_state.new_bitvec("retval_" + str(instr["address"]), 256)
) )
return [global_state] return [global_state]
# Copy memory
global_state.mstate.mem_extend(
memory_out_offset, min(memory_out_size, len(global_state.last_return_data))
)
for i in range(min(memory_out_size, len(global_state.last_return_data))):
global_state.mstate.memory[
i + memory_out_offset
] = global_state.last_return_data[i]
# Put return value on stack
return_value = global_state.new_bitvec("retval_" + str(instr["address"]), 256)
global_state.mstate.stack.append(return_value)
global_state.mstate.constraints.append(return_value == 1)
return [global_state]

@ -15,7 +15,10 @@ class MutationAnnotation(StateAnnotation):
This is the annotation used by the MutationPruner plugin to record mutations This is the annotation used by the MutationPruner plugin to record mutations
""" """
pass @property
def persist_to_world_state(self):
# This should persist among calls, and be but as a world state annotation.
return True
class MutationPruner(LaserPlugin): class MutationPruner(LaserPlugin):
@ -44,10 +47,16 @@ class MutationPruner(LaserPlugin):
@symbolic_vm.pre_hook("SSTORE") @symbolic_vm.pre_hook("SSTORE")
def sstore_mutator_hook(global_state: GlobalState): def sstore_mutator_hook(global_state: GlobalState):
global_state.annotate(MutationAnnotation()) global_state.annotate(MutationAnnotation())
assert len(
list(global_state.world_state.get_annotations(MutationAnnotation))
)
@symbolic_vm.pre_hook("CALL") @symbolic_vm.pre_hook("CALL")
def call_mutator_hook(global_state: GlobalState): def call_mutator_hook(global_state: GlobalState):
global_state.annotate(MutationAnnotation()) global_state.annotate(MutationAnnotation())
assert len(
list(global_state.world_state.get_annotations(MutationAnnotation))
)
@symbolic_vm.laser_hook("add_world_state") @symbolic_vm.laser_hook("add_world_state")
def world_state_filter_hook(global_state: GlobalState): def world_state_filter_hook(global_state: GlobalState):
@ -63,5 +72,8 @@ class MutationPruner(LaserPlugin):
global_state.current_transaction, ContractCreationTransaction global_state.current_transaction, ContractCreationTransaction
): ):
return return
if len(list(global_state.get_annotations(MutationAnnotation))) == 0: if (
len(list(global_state.world_state.get_annotations(MutationAnnotation)))
== 0
):
raise PluginSkipWorldState raise PluginSkipWorldState

@ -12,6 +12,8 @@ class StateAnnotation:
traverse the state space themselves. traverse the state space themselves.
""" """
# TODO: Remove this? It seems to be used only in the MutationPruner, and
# we could simply use world state annotations if we want them to be persisted.
@property @property
def persist_to_world_state(self) -> bool: def persist_to_world_state(self) -> bool:
"""If this function returns true then laser will also annotate the """If this function returns true then laser will also annotate the

@ -22,6 +22,7 @@ class Environment:
callvalue: ExprRef, callvalue: ExprRef,
origin: ExprRef, origin: ExprRef,
code=None, code=None,
static=False,
) -> None: ) -> None:
""" """
@ -32,7 +33,7 @@ class Environment:
:param callvalue: :param callvalue:
:param origin: :param origin:
:param code: :param code:
:param calldata_type: :param static: Makes the environment static.
""" """
# Metadata # Metadata
@ -50,6 +51,7 @@ class Environment:
self.gasprice = gasprice self.gasprice = gasprice
self.origin = origin self.origin = origin
self.callvalue = callvalue self.callvalue = callvalue
self.static = static
def __str__(self) -> str: def __str__(self) -> str:
""" """

@ -325,7 +325,9 @@ class LaserEVM:
global_state.transaction_stack global_state.transaction_stack
) + [(start_signal.transaction, global_state)] ) + [(start_signal.transaction, global_state)]
new_global_state.node = global_state.node new_global_state.node = global_state.node
new_global_state.mstate.constraints = global_state.mstate.constraints new_global_state.mstate.constraints = (
start_signal.global_state.mstate.constraints
)
log.debug("Starting new transaction %s", start_signal.transaction) log.debug("Starting new transaction %s", start_signal.transaction)

@ -85,6 +85,8 @@ def execute_contract_creation(
new_account = None new_account = None
for open_world_state in open_states: for open_world_state in open_states:
next_transaction_id = get_next_transaction_id() next_transaction_id = get_next_transaction_id()
# call_data "should" be '[]', but it is easier to model the calldata symbolically
# and add logic in codecopy/codesize/calldatacopy/calldatasize than to model code "correctly"
transaction = ContractCreationTransaction( transaction = ContractCreationTransaction(
world_state=open_world_state, world_state=open_world_state,
identifier=next_transaction_id, identifier=next_transaction_id,
@ -98,7 +100,7 @@ def execute_contract_creation(
code=Disassembly(contract_initialization_code), code=Disassembly(contract_initialization_code),
caller=symbol_factory.BitVecVal(CREATOR_ADDRESS, 256), caller=symbol_factory.BitVecVal(CREATOR_ADDRESS, 256),
contract_name=contract_name, contract_name=contract_name,
call_data=[], call_data=None,
call_value=symbol_factory.BitVecSym( call_value=symbol_factory.BitVecSym(
"call_value{}".format(next_transaction_id), 256 "call_value{}".format(next_transaction_id), 256
), ),

@ -45,9 +45,11 @@ class TransactionStartSignal(Exception):
self, self,
transaction: Union["MessageCallTransaction", "ContractCreationTransaction"], transaction: Union["MessageCallTransaction", "ContractCreationTransaction"],
op_code: str, op_code: str,
global_state: GlobalState,
) -> None: ) -> None:
self.transaction = transaction self.transaction = transaction
self.op_code = op_code self.op_code = op_code
self.global_state = global_state
class BaseTransaction: class BaseTransaction:
@ -66,6 +68,7 @@ class BaseTransaction:
code=None, code=None,
call_value=None, call_value=None,
init_call_data=True, init_call_data=True,
static=False,
) -> None: ) -> None:
assert isinstance(world_state, WorldState) assert isinstance(world_state, WorldState)
self.world_state = world_state self.world_state = world_state
@ -101,7 +104,7 @@ class BaseTransaction:
if call_value is not None if call_value is not None
else symbol_factory.BitVecSym("callvalue{}".format(identifier), 256) else symbol_factory.BitVecSym("callvalue{}".format(identifier), 256)
) )
self.static = static
self.return_data = None # type: str self.return_data = None # type: str
def initial_global_state_from_environment(self, environment, active_function): def initial_global_state_from_environment(self, environment, active_function):
@ -159,6 +162,7 @@ class MessageCallTransaction(BaseTransaction):
self.call_value, self.call_value,
self.origin, self.origin,
code=self.code or self.callee_account.code, code=self.code or self.callee_account.code,
static=self.static,
) )
return super().initial_global_state_from_environment( return super().initial_global_state_from_environment(
environment, active_function="fallback" environment, active_function="fallback"
@ -197,6 +201,8 @@ class ContractCreationTransaction(BaseTransaction):
0, concrete_storage=True, creator=caller.value 0, concrete_storage=True, creator=caller.value
) )
callee_account.contract_name = contract_name callee_account.contract_name = contract_name
# init_call_data "should" be false, but it is easier to model the calldata symbolically
# and add logic in codecopy/codesize/calldatacopy/calldatasize than to model code "correctly"
super().__init__( super().__init__(
world_state=world_state, world_state=world_state,
callee_account=callee_account, callee_account=callee_account,
@ -208,7 +214,7 @@ class ContractCreationTransaction(BaseTransaction):
origin=origin, origin=origin,
code=code, code=code,
call_value=call_value, call_value=call_value,
init_call_data=False, init_call_data=True,
) )
def initial_global_state(self) -> GlobalState: def initial_global_state(self) -> GlobalState:

@ -0,0 +1,124 @@
import pytest
from mock import patch
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.smt import symbol_factory
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.machine_state import MachineState
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.laser.ethereum.instructions import Instruction
from mythril.laser.ethereum.transaction.transaction_models import MessageCallTransaction
from mythril.laser.ethereum.call import SymbolicCalldata
from mythril.laser.ethereum.transaction import TransactionStartSignal
from mythril.laser.ethereum.evm_exceptions import WriteProtection
def get_global_state():
active_account = Account("0x0", code=Disassembly("60606040"))
environment = Environment(
active_account, None, SymbolicCalldata("2"), None, None, None
)
world_state = WorldState()
world_state.put_account(active_account)
state = GlobalState(world_state, environment, None, MachineState(gas_limit=8000000))
state.transaction_stack.append(
(MessageCallTransaction(world_state=world_state, gas_limit=8000000), None)
)
return state
@patch(
"mythril.laser.ethereum.instructions.get_call_parameters",
return_value=(
"0",
Account(code=Disassembly(code="0x00"), address="0x19"),
0,
0,
0,
0,
0,
),
)
def test_staticcall(f1):
# Arrange
state = get_global_state()
state.mstate.stack = [10, 10, 10, 10, 10, 10, 10, 10, 0]
instruction = Instruction("staticcall", dynamic_loader=None)
# Act and Assert
with pytest.raises(TransactionStartSignal) as ts:
instruction.evaluate(state)
assert ts.value.transaction.static
assert ts.value.transaction.initial_global_state().environment.static
test_data = (
"suicide",
"create",
"create2",
"log0",
"log1",
"log2",
"log3",
"log4",
"sstore",
)
@pytest.mark.parametrize("input", test_data)
def test_staticness(input):
# Arrange
state = get_global_state()
state.environment.static = True
state.mstate.stack = []
instruction = Instruction(input, dynamic_loader=None)
# Act and Assert
with pytest.raises(WriteProtection):
instruction.evaluate(state)
test_data_call = ((0, True), (100, False))
@pytest.mark.parametrize("input, success", test_data_call)
@patch("mythril.laser.ethereum.instructions.get_call_parameters")
def test_staticness_call_concrete(f1, input, success):
# Arrange
state = get_global_state()
state.environment.static = True
state.mstate.stack = []
code = Disassembly(code="616263")
f1.return_value = ("0", Account(code=code, address="0x19"), 0, input, 0, 0, 0)
instruction = Instruction("call", dynamic_loader=None)
# Act and Assert
if success:
with pytest.raises(TransactionStartSignal) as ts:
instruction.evaluate(state)
assert ts.value.transaction.static
else:
with pytest.raises(WriteProtection):
instruction.evaluate(state)
@patch("mythril.laser.ethereum.instructions.get_call_parameters")
def test_staticness_call_symbolic(f1):
# Arrange
state = get_global_state()
state.environment.static = True
state.mstate.stack = []
call_value = symbol_factory.BitVecSym("x", 256)
code = Disassembly(code="616263")
f1.return_value = ("0", Account(code=code, address="0x19"), 0, call_value, 0, 0, 0)
instruction = Instruction("call", dynamic_loader=None)
# Act and Assert
with pytest.raises(TransactionStartSignal) as ts:
instruction.evaluate(state)
assert ts.value.transaction.static
assert ts.value.global_state.mstate.constraints[-1] == (call_value == 0)
Loading…
Cancel
Save