First draft of ether balance modeling

model-balances
Nathan 5 years ago
parent 01c7afd6a7
commit 584bdf13d6
  1. 62
      mythril/analysis/modules/ether_thief.py
  2. 37
      mythril/analysis/solver.py
  3. 2
      mythril/analysis/templates/report_as_text.jinja2
  4. 12
      mythril/laser/ethereum/call.py
  5. 91
      mythril/laser/ethereum/instructions.py
  6. 2
      mythril/laser/ethereum/state/account.py
  7. 20
      mythril/laser/ethereum/transaction/transaction_models.py
  8. 4
      tests/laser/evm_testsuite/evm_test.py

@ -15,7 +15,15 @@ from mythril.analysis.swc_data import UNPROTECTED_ETHER_WITHDRAWAL
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.transaction import ContractCreationTransaction
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.smt import UGT, Sum, symbol_factory, BVAddNoOverflow, If
from mythril.laser.smt import (
UGT,
Sum,
symbol_factory,
BVAddNoOverflow,
If,
simplify,
UGE,
)
log = logging.getLogger(__name__)
@ -74,55 +82,35 @@ class EtherThief(DetectionModule):
:param state:
:return:
"""
state = copy(state)
instruction = state.get_current_instruction()
if instruction["opcode"] != "CALL":
return []
address = instruction["address"]
value = state.mstate.stack[-3]
target = state.mstate.stack[-2]
eth_sent_by_attacker = symbol_factory.BitVecVal(0, 256)
constraints = copy(state.mstate.constraints)
for tx in state.world_state.transaction_sequence:
"""
Constraint: The call value must be greater than the sum of Ether sent by the attacker over all
transactions. This prevents false positives caused by legitimate refund functions.
Also constrain the addition from overflowing (otherwise the solver produces solutions with
ridiculously high call values).
"""
constraints += [BVAddNoOverflow(eth_sent_by_attacker, tx.call_value, False)]
eth_sent_by_attacker = Sum(
eth_sent_by_attacker,
tx.call_value * If(tx.caller == ATTACKER_ADDRESS, 1, 0),
)
"""
Constraint: All transactions must originate from regular users (not the creator/owner).
This prevents false positives where the owner willingly transfers ownership to another address.
"""
if not isinstance(tx, ContractCreationTransaction):
constraints += [tx.caller != CREATOR_ADDRESS]
"""
Require that the current transaction is sent by the attacker and
that the Ether is sent to the attacker's address.
that the Ether sent to the attacker's address is greater than the
amount of Ether the attacker sent.
"""
attacker_address_bitvec = symbol_factory.BitVecVal(ATTACKER_ADDRESS, 256)
eth_recieved_by_attacker = (
value
+ state.world_state.balances[attacker_address_bitvec]
- state.world_state.starting_balances[attacker_address_bitvec]
constraints += [
UGE(
state.world_state.balances[state.environment.active_account.address],
value,
)
eth_recieved_by_attacker.simplify()
print(eth_recieved_by_attacker)
]
state.world_state.balances[attacker_address_bitvec] += value
state.world_state.balances[state.environment.active_account.address] -= value
constraints += [
UGT(eth_recieved_by_attacker, eth_sent_by_attacker),
UGT(
state.world_state.balances[attacker_address_bitvec],
state.world_state.starting_balances[attacker_address_bitvec],
),
target == ATTACKER_ADDRESS,
state.current_transaction.caller == ATTACKER_ADDRESS,
]

@ -95,7 +95,7 @@ def get_transaction_sequence(
concrete_transactions = []
tx_constraints, minimize = _set_minimisation_constraints(
transaction_sequence, constraints.copy(), [], 5000
transaction_sequence, constraints.copy(), [], 5000, global_state.world_state
)
try:
@ -103,19 +103,23 @@ def get_transaction_sequence(
except UnsatError:
raise UnsatError
min_price_dict = {} # type: Dict[str, int]
# Include creation account in initial state
# Note: This contains the code, which should not exist until after the first tx
initial_world_state = transaction_sequence[0].world_state
initial_accounts = initial_world_state.accounts
for transaction in transaction_sequence:
concrete_transaction = _get_concrete_transaction(model, transaction)
concrete_transactions.append(concrete_transaction)
caller = concrete_transaction["origin"]
value = int(concrete_transaction["value"], 16)
min_price_dict[caller] = min_price_dict.get(caller, 0) + value
if isinstance(transaction_sequence[0], ContractCreationTransaction):
initial_accounts = transaction_sequence[0].prev_world_state.accounts
else:
initial_accounts = transaction_sequence[0].world_state.accounts
min_price_dict = {} # type: Dict[str, int]
for address in initial_accounts.keys():
min_price_dict[address] = model.eval(
initial_world_state.starting_balances[
symbol_factory.BitVecVal(address, 256)
].raw,
model_completion=True,
).as_long()
concrete_initial_state = _get_concrete_state(initial_accounts, min_price_dict)
@ -171,7 +175,7 @@ def _get_concrete_transaction(model: z3.Model, transaction: BaseTransaction):
def _set_minimisation_constraints(
transaction_sequence, constraints, minimize, max_size
transaction_sequence, constraints, minimize, max_size, world_state
) -> Tuple[Constraints, tuple]:
""" Set constraints that minimise key transaction values
@ -192,6 +196,15 @@ def _set_minimisation_constraints(
# Minimize
minimize.append(transaction.call_data.calldatasize)
minimize.append(transaction.call_value)
for account in world_state.accounts.values():
# Lazy way to prevent overflows and to ensure "reasonable" balances
# Each account starts with less than 100 ETH
constraints.append(
UGE(
symbol_factory.BitVecVal(100000000000000000000, 256),
world_state.starting_balances[account.address],
)
)
return constraints, tuple(minimize)

@ -21,8 +21,6 @@ In file: {{ issue.filename }}:{{ issue.lineno }}
{% if issue.tx_sequence %}
Transaction Sequence:
{{ issue.tx_sequence.initialState }}
{% for step in issue.tx_sequence.steps %}
{% if step == issue.tx_sequence.steps[0] and step.input != "0x" and step.origin == "0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe" %}
Caller: [CREATOR], data: [CONTRACT CREATION], value: {{ step.value }}

@ -88,13 +88,12 @@ def get_callee_address(
except TypeError:
log.debug("Symbolic call encountered")
match = re.search(r"storage_(\d+)", str(simplify(symbolic_to_address)))
match = re.search(r"Storage\[(\d+)\]", str(simplify(symbolic_to_address)))
log.debug("CALL to: " + str(simplify(symbolic_to_address)))
if match is None or dynamic_loader is None:
# TODO: Fix types
return symbolic_to_address
raise ValueError()
index = int(match.group(1))
log.debug("Dynamic contract address at storage index {}".format(index))
@ -106,8 +105,7 @@ def get_callee_address(
)
# TODO: verify whether this happens or not
except:
log.debug("Error accessing contract storage.")
raise ValueError
return symbolic_to_address
# testrpc simply returns the address, geth response is more elaborate.
if not re.match(r"^0x[0-9a-f]{40}$", callee_address):
@ -135,7 +133,7 @@ def get_callee_account(
if callee_address.symbolic:
return Account(callee_address, balances=global_state.world_state.balances)
else:
callee_address = callee_address.value
callee_address = hex(callee_address.value)[2:]
try:
return global_state.accounts[int(callee_address, 16)]
@ -223,12 +221,12 @@ def get_call_data(
def native_call(
global_state: GlobalState,
callee_address: str,
callee_address: Union[str, BitVec],
call_data: BaseCalldata,
memory_out_offset: Union[int, Expression],
memory_out_size: Union[int, Expression],
) -> Union[List[GlobalState], None]:
if not 0 < int(callee_address, 16) < 5:
if isinstance(callee_address, BitVec) or not 0 < int(callee_address, 16) < 5:
return None
log.debug("Native contract called: " + callee_address)

@ -25,7 +25,9 @@ from mythril.laser.smt import (
Bool,
Not,
LShR,
BVSubNoUnderflow, UGE)
BVSubNoUnderflow,
UGE,
)
from mythril.laser.smt import symbol_factory
import mythril.laser.ethereum.util as helper
@ -1619,7 +1621,7 @@ class Instruction:
transfer_amount = global_state.environment.active_account.balance()
# Often the target of the suicide instruction will be symbolic
# If it isn't then we'll transfer the balance to the indicated contract
global_state.world_state[target].add_balance(transfer_amount)
global_state.world_state.balances[target] += transfer_amount
global_state.environment.active_account = deepcopy(
global_state.environment.active_account
@ -1692,13 +1694,20 @@ class Instruction:
)
if callee_account is not None and callee_account.code.bytecode == "":
log.warning(
"The call is related to ether transfer between accounts"
) # TODO: was debug
log.debug("The call is related to ether transfer between accounts")
sender = environment.active_account.address
receiver = callee_account.address
value = (
value
if isinstance(value, BitVec)
else symbol_factory.BitVecVal(value, 256)
)
global_state.mstate.constraints.append(UGE(global_state.world_state.balances[environment.active_account.address], value))
#global_state.world_state.balances[environment.active_account.address] -= value
global_state.world_state.balances[callee_account.address] += value
global_state.mstate.constraints.append(
UGE(global_state.world_state.balances[sender], value)
)
global_state.world_state.balances[receiver] += value
global_state.world_state.balances[sender] -= value
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
@ -1815,6 +1824,28 @@ class Instruction:
callee_address, callee_account, call_data, value, gas, _, _ = get_call_parameters(
global_state, self.dynamic_loader, True
)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = global_state.environment.active_account.address
receiver = callee_account.address
value = (
value
if isinstance(value, BitVec)
else symbol_factory.BitVecVal(value, 256)
)
global_state.mstate.constraints.append(
UGE(global_state.world_state.balances[sender], value)
)
global_state.world_state.balances[receiver] += value
global_state.world_state.balances[sender] -= value
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
@ -1918,6 +1949,28 @@ class Instruction:
callee_address, callee_account, call_data, value, gas, _, _ = get_call_parameters(
global_state, self.dynamic_loader
)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = environment.active_account.address
receiver = callee_account.address
value = (
value
if isinstance(value, BitVec)
else symbol_factory.BitVecVal(value, 256)
)
global_state.mstate.constraints.append(
UGE(global_state.world_state.balances[sender], value)
)
global_state.world_state.balances[receiver] += value
global_state.world_state.balances[sender] -= value
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
@ -2019,6 +2072,28 @@ class Instruction:
callee_address, callee_account, call_data, value, gas, memory_out_offset, memory_out_size = get_call_parameters(
global_state, self.dynamic_loader
)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = global_state.environment.active_account.address
receiver = callee_account.address
value = (
value
if isinstance(value, BitVec)
else symbol_factory.BitVecVal(value, 256)
)
global_state.mstate.constraints.append(
UGE(global_state.world_state.balances[sender], value)
)
global_state.world_state.balances[receiver] += value
global_state.world_state.balances[sender] -= value
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(

@ -189,8 +189,6 @@ class Account:
self._balances = balances
self.balance = lambda: self._balances[self.address]
# TODO: Add initial balance
def __str__(self) -> str:
return str(self.as_dict)

@ -4,7 +4,7 @@ execution."""
import array
from copy import deepcopy
from z3 import ExprRef
from typing import Union, Optional, cast
from typing import Union, Optional
from mythril.laser.ethereum.state.calldata import ConcreteCalldata
from mythril.laser.ethereum.state.account import Account
@ -12,8 +12,10 @@ from mythril.laser.ethereum.state.calldata import BaseCalldata, SymbolicCalldata
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.smt import symbol_factory, BVSubNoUnderflow, UGE
from mythril.laser.smt import symbol_factory, UGE, BitVec
import logging
log = logging.getLogger(__name__)
_next_transaction_id = 0
@ -115,11 +117,17 @@ class BaseTransaction:
sender = environment.sender
receiver = environment.active_account.address
value = environment.callvalue
value = (
environment.callvalue
if isinstance(environment.callvalue, BitVec)
else symbol_factory.BitVecVal(environment.callvalue, 256)
)
global_state.mstate.constraints.append(UGE(global_state.world_state.balances[sender], value))
#global_state.world_state.balances[sender] -= value
global_state.mstate.constraints.append(
UGE(global_state.world_state.balances[sender], value)
)
global_state.world_state.balances[receiver] += value
global_state.world_state.balances[sender] -= value
return global_state

@ -165,8 +165,8 @@ def test_vmtest(
# no more work to do if error happens or out of gas
assert len(laser_evm.open_states) == 0
else:
assert len(laser_evm.open_states) == 1
world_state = laser_evm.open_states[0]
assert len(final_states) == 1
world_state = final_states[0].world_state
for address, details in post_condition.items():
account = world_state[symbol_factory.BitVecVal(int(address, 16), 256)]

Loading…
Cancel
Save