Merge remote-tracking branch 'origin/develop' into features/cleanup_cfg

pull/977/head
Joran Honig 6 years ago
commit d300f6e676
  1. 200
      mythril/analysis/modules/transaction_order_dependence.py
  2. 4
      mythril/analysis/symbolic.py
  3. 21
      mythril/laser/ethereum/plugins/__init__.py
  4. 7
      mythril/laser/ethereum/plugins/implementations/__init__.py
  5. 3
      mythril/laser/ethereum/plugins/implementations/benchmark.py
  6. 3
      mythril/laser/ethereum/plugins/implementations/mutation_pruner.py
  7. 23
      mythril/laser/ethereum/plugins/plugin.py
  8. 19
      mythril/laser/ethereum/plugins/plugin_factory.py
  9. 34
      mythril/laser/ethereum/plugins/plugin_loader.py
  10. 454
      mythril/laser/ethereum/taint_analysis.py
  11. 30
      tests/taint_mutate_stack_test.py
  12. 36
      tests/taint_record_test.py
  13. 35
      tests/taint_result_test.py
  14. 99
      tests/taint_runner_test.py

@ -1,200 +0,0 @@
"""This module contains the detection code to find the existence of transaction
order dependence."""
import copy
import logging
import re
from mythril.analysis import solver
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.ops import *
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import TX_ORDER_DEPENDENCE
from mythril.exceptions import UnsatError
log = logging.getLogger(__name__)
# TODO: make callback & remove dependency from cfg
class TxOrderDependenceModule(DetectionModule):
"""This module finds the existence of transaction order dependence."""
def __init__(self):
super().__init__(
name="Transaction Order Dependence",
swc_id=TX_ORDER_DEPENDENCE,
description=(
"This module finds the existance of transaction order dependence "
"vulnerabilities. The following webpage contains an extensive description "
"of the vulnerability: "
"https://consensys.github.io/smart-contract-best-practices/known_attacks/#transaction-ordering-dependence-tod-front-running"
),
)
def execute(self, statespace):
"""Executes the analysis module.
:param statespace:
:return:
"""
log.debug("Executing module: TOD")
issues = []
for call in statespace.calls:
# Do analysis
interesting_storages = list(self._get_influencing_storages(call))
changing_sstores = list(
self._get_influencing_sstores(statespace, interesting_storages)
)
description_tail = (
"A transaction order dependence vulnerability may exist in this contract. The value or "
"target of the call statement is loaded from a writable storage location."
)
# Build issue if necessary
if len(changing_sstores) > 0:
node = call.node
instruction = call.state.get_current_instruction()
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=instruction["address"],
title="Transaction Order Dependence",
bytecode=call.state.environment.code.bytecode,
swc_id=TX_ORDER_DEPENDENCE,
severity="Medium",
description_head="The call outcome may depend on transaction order.",
description_tail=description_tail,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
return issues
# TODO: move to __init__ or util module
@staticmethod
def _get_states_with_opcode(statespace, opcode):
"""Gets all (state, node) tuples in statespace with opcode.
:param statespace:
:param opcode:
"""
for k in statespace.nodes:
node = statespace.nodes[k]
for state in node.states:
if state.get_current_instruction()["opcode"] == opcode:
yield state, node
@staticmethod
def _dependent_on_storage(expression):
"""Checks if expression is dependent on a storage symbol and returns
the influencing storages.
:param expression:
:return:
"""
pattern = re.compile(r"storage_[a-z0-9_&^]*[0-9]+")
return pattern.findall(str(simplify(expression)))
@staticmethod
def _get_storage_variable(storage, state):
"""Get storage z3 object given storage name and the state.
:param storage: storage name example: storage_0
:param state: state to retrieve the variable from
:return: z3 object representing storage
"""
index = int(re.search("[0-9]+", storage).group())
try:
return state.environment.active_account.storage[index]
except KeyError:
return None
def _can_change(self, constraints, variable):
"""Checks if the variable can change given some constraints.
:param constraints:
:param variable:
:return:
"""
_constraints = copy.deepcopy(constraints)
try:
model = solver.get_model(_constraints)
except UnsatError:
return False
try:
initial_value = int(str(model.eval(variable, model_completion=True)))
return (
self._try_constraints(constraints, [variable != initial_value])
is not None
)
except AttributeError:
return False
def _get_influencing_storages(self, call):
"""Examines a Call object and returns an iterator of all storages that
influence the call value or direction.
:param call:
"""
state = call.state
node = call.node
# Get relevant storages
to, value = call.to, call.value
storages = []
if to.type == VarType.SYMBOLIC:
storages += self._dependent_on_storage(to.val)
if value.type == VarType.SYMBOLIC:
storages += self._dependent_on_storage(value.val)
# See if they can change within the constraints of the node
for storage in storages:
variable = self._get_storage_variable(storage, state)
can_change = self._can_change(node.constraints, variable)
if can_change:
yield storage
def _get_influencing_sstores(self, statespace, interesting_storages):
"""Gets sstore (state, node) tuples that write to interesting_storages.
:param statespace:
:param interesting_storages:
"""
for sstore_state, node in self._get_states_with_opcode(statespace, "SSTORE"):
index, value = sstore_state.mstate.stack[-1], sstore_state.mstate.stack[-2]
try:
index = util.get_concrete_int(index)
except TypeError:
index = str(index)
if "storage_{}".format(index) not in interesting_storages:
continue
yield sstore_state, node
# TODO: remove
@staticmethod
def _try_constraints(constraints, new_constraints):
"""Tries new constraints.
:param constraints:
:param new_constraints:
:return Model if satisfiable otherwise None
"""
_constraints = copy.deepcopy(constraints)
for constraint in new_constraints:
_constraints.append(copy.deepcopy(constraint))
try:
model = solver.get_model(_constraints)
return model
except UnsatError:
return None
detector = TxOrderDependenceModule()

@ -12,7 +12,9 @@ from mythril.laser.ethereum.strategy.basic import (
ReturnWeightedRandomStrategy,
)
from mythril.laser.ethereum.plugins.mutation_pruner import MutationPruner
from mythril.laser.ethereum.plugins.implementations.mutation_pruner import (
MutationPruner,
)
from mythril.solidity.soliditycontract import EVMContract, SolidityContract
from .ops import Call, SStore, VarType, get_variable

@ -0,0 +1,21 @@
""" Laser plugins
This module contains everything to do with laser plugins
Laser plugins are a way of extending laser's functionality without complicating the core business logic.
Different features that have been implemented in the form of plugins are:
- benchmarking
- path pruning
Plugins also provide a way to implement optimisations outside of the mythril code base and to inject them.
The api that laser currently provides is still unstable and will probably change to suit our needs
as more plugins get developed.
For the implementation of plugins the following modules are of interest:
- laser.plugins.plugin
- laser.plugins.signals
- laser.svm
Which show the basic interfaces with which plugins are able to interact
"""
from mythril.laser.ethereum.plugins.signals import PluginSignal

@ -0,0 +1,7 @@
""" Plugin implementations
This module contains the implementation of some features
- benchmarking
- pruning
"""

@ -1,4 +1,5 @@
from mythril.laser.ethereum.svm import LaserEVM
from mythril.laser.ethereum.plugins.plugin import LaserPlugin
from time import time
import matplotlib.pyplot as plt
import logging
@ -6,7 +7,7 @@ import logging
log = logging.getLogger(__name__)
class BenchmarkPlugin:
class BenchmarkPlugin(LaserPlugin):
"""Benchmark Plugin
This plugin aggregates the following information:

@ -1,6 +1,7 @@
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.laser.ethereum.svm import LaserEVM
from mythril.laser.ethereum.plugins.signals import PluginSkipWorldState
from mythril.laser.ethereum.plugins.plugin import LaserPlugin
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.transaction.transaction_models import (
ContractCreationTransaction,
@ -17,7 +18,7 @@ class MutationAnnotation(StateAnnotation):
pass
class MutationPruner:
class MutationPruner(LaserPlugin):
"""Mutation pruner plugin
Let S be a world state from which T is a symbolic transaction, and S' is the resulting world state.

@ -0,0 +1,23 @@
from mythril.laser.ethereum.svm import LaserEVM
class LaserPlugin:
""" Base class for laser plugins
Functionality in laser that the symbolic execution process does not need to depend on
can be implemented in the form of a laser plugin.
Laser plugins implement the function initialize(symbolic_vm) which is called with the laser virtual machine
when they are loaded.
Regularly a plugin will introduce several hooks into laser in this function
Plugins can direct actions by raising Signals defined in mythril.laser.ethereum.plugins.signals
For example, a pruning plugin might raise the PluginSkipWorldState signal.
"""
def initialize(self, symbolic_vm: LaserEVM) -> None:
""" Initializes this plugin on the symbolic virtual machine
:param symbolic_vm: symbolic virtual machine to initialize the laser plugin on
"""
raise NotImplementedError

@ -0,0 +1,19 @@
from mythril.laser.ethereum.plugins.plugin import LaserPlugin
from mythril.laser.ethereum.plugins.implementations.benchmark import BenchmarkPlugin
from mythril.laser.ethereum.plugins.implementations.mutation_pruner import (
MutationPruner,
)
class PluginFactory:
""" The plugin factory constructs the plugins provided with laser """
@staticmethod
def build_benchmark_plugin(name: str) -> LaserPlugin:
""" Creates an instance of the benchmark plugin with the given name """
return BenchmarkPlugin(name)
@staticmethod
def build_mutation_pruner_plugin() -> LaserPlugin:
""" Creates an instance of the mutation pruner plugin"""
return MutationPruner()

@ -0,0 +1,34 @@
from mythril.laser.ethereum.svm import LaserEVM
from mythril.laser.ethereum.plugins.plugin import LaserPlugin
from typing import List
class LaserPluginLoader:
"""
The LaserPluginLoader is used to abstract the logic relating to plugins.
Components outside of laser thus don't have to be aware of the interface that plugins provide
"""
def __init__(self, symbolic_vm: LaserEVM) -> None:
""" Initializes the plugin loader
:param symbolic_vm: symbolic virtual machine to load plugins for
"""
self.symbolic_vm = symbolic_vm
self.laser_plugins: List[LaserPlugin] = []
def load(self, laser_plugin: LaserPlugin) -> None:
""" Loads the plugin
:param laser_plugin: plugin that will be loaded in the symbolic virtual machine
"""
laser_plugin.initialize(self.symbolic_vm)
self.laser_plugins.append(laser_plugin)
def is_enabled(self, laser_plugin: LaserPlugin) -> bool:
""" Returns whether the plugin is loaded in the symbolic_vm
:param laser_plugin: plugin that will be checked
"""
return laser_plugin in self.laser_plugins

@ -1,454 +0,0 @@
"""This module implements classes needed to perform taint analysis."""
import copy
import logging
from typing import List, Tuple, Union
import mythril.laser.ethereum.util as helper
from mythril.analysis.symbolic import SymExecWrapper
from mythril.laser.ethereum.cfg import JumpType, Node
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.smt import Expression
log = logging.getLogger(__name__)
class TaintRecord:
"""TaintRecord contains tainting information for a specific (state, node)
the information specifies the taint status before executing the operation
belonging to the state."""
def __init__(self):
"""Builds a taint record."""
self.stack = []
self.memory = {}
self.storage = {}
self.states = []
def stack_tainted(self, index: int) -> Union[bool, None]:
"""Returns taint value of stack element at index.
:param index:
:return:
"""
if index < len(self.stack):
return self.stack[index]
return None
def memory_tainted(self, index: int) -> bool:
"""Returns taint value of memory element at index.
:param index:
:return:
"""
if index in self.memory.keys():
return self.memory[index]
return False
def storage_tainted(self, index: int) -> bool:
"""Returns taint value of storage element at index.
:param index:
:return:
"""
if index in self.storage.keys():
return self.storage[index]
return False
def add_state(self, state: GlobalState) -> None:
"""Adds state with this taint record.
:param state:
"""
self.states.append(state)
def clone(self) -> "TaintRecord":
"""Clones this record.
:return:
"""
clone = TaintRecord()
clone.stack = copy.deepcopy(self.stack)
clone.memory = copy.deepcopy(self.memory)
clone.storage = copy.deepcopy(self.storage)
return clone
class TaintResult:
"""Taint analysis result obtained after having ran the taint runner."""
def __init__(self):
"""Create a new tains result."""
self.records = []
def check(self, state: GlobalState, stack_index: int) -> Union[bool, None]:
"""Checks if stack variable is tainted, before executing the
instruction.
:param state: state to check variable in
:param stack_index: index of stack variable
:return: tainted
"""
record = self._try_get_record(state)
if record is None:
return None
return record.stack_tainted(stack_index)
def add_records(self, records: List[TaintRecord]) -> None:
"""Adds records to this taint result.
:param records:
"""
self.records += records
def _try_get_record(self, state: GlobalState) -> Union[TaintRecord, None]:
"""Finds record belonging to the state.
:param state:
:return:
"""
for record in self.records:
if state in record.states:
return record
return None
class TaintRunner:
"""Taint runner, is able to run taint analysis on symbolic execution
result."""
@staticmethod
def execute(
statespace: SymExecWrapper, node: Node, state: GlobalState, initial_stack=None
) -> TaintResult:
"""Runs taint analysis on the statespace.
:param initial_stack:
:param statespace: symbolic statespace to run taint analysis on
:param node: taint introduction node
:param state: taint introduction state
:return: TaintResult object containing analysis results
"""
if initial_stack is None:
initial_stack = []
result = TaintResult()
transaction_stack_length = len(node.states[0].transaction_stack)
# Build initial current_node
init_record = TaintRecord()
init_record.stack = initial_stack
state_index = node.states.index(state)
# List of (Node, TaintRecord, index)
current_nodes = [(node, init_record, state_index)]
environment = node.states[0].environment
for node, record, index in current_nodes:
records = TaintRunner.execute_node(node, record, index)
result.add_records(records)
if len(records) == 0: # continue if there is no record to work on
continue
children = TaintRunner.children(
node, statespace, environment, transaction_stack_length
)
for child in children:
current_nodes.append((child, records[-1], 0))
return result
@staticmethod
def children(
node: Node,
statespace: SymExecWrapper,
environment: Environment,
transaction_stack_length: int,
) -> List[Node]:
"""
:param node:
:param statespace:
:param environment:
:param transaction_stack_length:
:return:
"""
direct_children = [
statespace.nodes[edge.node_to]
for edge in statespace.edges
if edge.node_from == node.uid and edge.type != JumpType.Transaction
]
children = []
for child in direct_children:
if all(
len(state.transaction_stack) == transaction_stack_length
for state in child.states
):
children.append(child)
elif all(
len(state.transaction_stack) > transaction_stack_length
for state in child.states
):
children += TaintRunner.children(
child, statespace, environment, transaction_stack_length
)
return children
@staticmethod
def execute_node(
node: Node, last_record: TaintRecord, state_index=0
) -> List[TaintRecord]:
"""Runs taint analysis on a given node.
:param node: node to analyse
:param last_record: last taint record to work from
:param state_index: state index to start from
:return: List of taint records linked to the states in this node
"""
records = [last_record]
for index in range(state_index, len(node.states)):
current_state = node.states[index]
records.append(TaintRunner.execute_state(records[-1], current_state))
return records[1:]
@staticmethod
def execute_state(record: TaintRecord, state: GlobalState) -> TaintRecord:
"""
:param record:
:param state:
:return:
"""
assert len(state.mstate.stack) == len(record.stack)
""" Runs taint analysis on a state """
record.add_state(state)
new_record = record.clone()
# Apply Change
op = state.get_current_instruction()["opcode"]
if op in TaintRunner.stack_taint_table.keys():
mutator = TaintRunner.stack_taint_table[op]
TaintRunner.mutate_stack(new_record, mutator)
elif op.startswith("PUSH"):
TaintRunner.mutate_push(op, new_record)
elif op.startswith("DUP"):
TaintRunner.mutate_dup(op, new_record)
elif op.startswith("SWAP"):
TaintRunner.mutate_swap(op, new_record)
elif op is "MLOAD":
TaintRunner.mutate_mload(new_record, state.mstate.stack[-1])
elif op.startswith("MSTORE"):
TaintRunner.mutate_mstore(new_record, state.mstate.stack[-1])
elif op is "SLOAD":
TaintRunner.mutate_sload(new_record, state.mstate.stack[-1])
elif op is "SSTORE":
TaintRunner.mutate_sstore(new_record, state.mstate.stack[-1])
elif op.startswith("LOG"):
TaintRunner.mutate_log(new_record, op)
elif op in ("CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"):
TaintRunner.mutate_call(new_record, op)
else:
log.debug("Unknown operation encountered: {}".format(op))
return new_record
@staticmethod
def mutate_stack(record: TaintRecord, mutator: Tuple[int, int]) -> None:
"""
:param record:
:param mutator:
"""
pop, push = mutator
values = []
for i in range(pop):
values.append(record.stack.pop())
taint = any(values)
for i in range(push):
record.stack.append(taint)
@staticmethod
def mutate_push(op: str, record: TaintRecord) -> None:
"""
:param op:
:param record:
"""
TaintRunner.mutate_stack(record, (0, 1))
@staticmethod
def mutate_dup(op: str, record: TaintRecord) -> None:
"""
:param op:
:param record:
"""
depth = int(op[3:])
index = len(record.stack) - depth
record.stack.append(record.stack[index])
@staticmethod
def mutate_swap(op: str, record: TaintRecord) -> None:
"""
:param op:
:param record:
"""
depth = int(op[4:])
l = len(record.stack) - 1
i = l - depth
record.stack[l], record.stack[i] = record.stack[i], record.stack[l]
@staticmethod
def mutate_mload(record: TaintRecord, op0: Expression) -> None:
"""
:param record:
:param op0:
:return:
"""
_ = record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
log.debug("Can't MLOAD taint track symbolically")
record.stack.append(False)
return
record.stack.append(record.memory_tainted(index))
@staticmethod
def mutate_mstore(record: TaintRecord, op0: Expression) -> None:
"""
:param record:
:param op0:
:return:
"""
_, value_taint = record.stack.pop(), record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
log.debug("Can't mstore taint track symbolically")
return
record.memory[index] = value_taint
@staticmethod
def mutate_sload(record: TaintRecord, op0: Expression) -> None:
"""
:param record:
:param op0:
:return:
"""
_ = record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
log.debug("Can't MLOAD taint track symbolically")
record.stack.append(False)
return
record.stack.append(record.storage_tainted(index))
@staticmethod
def mutate_sstore(record: TaintRecord, op0: Expression) -> None:
"""
:param record:
:param op0:
:return:
"""
_, value_taint = record.stack.pop(), record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
log.debug("Can't mstore taint track symbolically")
return
record.storage[index] = value_taint
@staticmethod
def mutate_log(record: TaintRecord, op: str) -> None:
"""
:param record:
:param op:
"""
depth = int(op[3:])
for _ in range(depth + 2):
record.stack.pop()
@staticmethod
def mutate_call(record: TaintRecord, op: str) -> None:
"""
:param record:
:param op:
"""
pops = 6
if op in ("CALL", "CALLCODE"):
pops += 1
for _ in range(pops):
record.stack.pop()
record.stack.append(False)
stack_taint_table = {
# instruction: (taint source, taint target)
"POP": (1, 0),
"ADD": (2, 1),
"MUL": (2, 1),
"SUB": (2, 1),
"AND": (2, 1),
"OR": (2, 1),
"XOR": (2, 1),
"NOT": (1, 1),
"BYTE": (2, 1),
"DIV": (2, 1),
"MOD": (2, 1),
"SDIV": (2, 1),
"SMOD": (2, 1),
"ADDMOD": (3, 1),
"MULMOD": (3, 1),
"EXP": (2, 1),
"SIGNEXTEND": (2, 1),
"LT": (2, 1),
"GT": (2, 1),
"SLT": (2, 1),
"SGT": (2, 1),
"EQ": (2, 1),
"ISZERO": (1, 1),
"CALLVALUE": (0, 1),
"CALLDATALOAD": (1, 1),
"CALLDATACOPY": (3, 0), # todo
"CALLDATASIZE": (0, 1),
"ADDRESS": (0, 1),
"BALANCE": (1, 1),
"ORIGIN": (0, 1),
"CALLER": (0, 1),
"CODESIZE": (0, 1),
"SHA3": (2, 1),
"GASPRICE": (0, 1),
"CODECOPY": (3, 0),
"EXTCODESIZE": (1, 1),
"EXTCODECOPY": (4, 0),
"RETURNDATASIZE": (0, 1),
"BLOCKHASH": (1, 1),
"COINBASE": (0, 1),
"TIMESTAMP": (0, 1),
"NUMBER": (0, 1),
"DIFFICULTY": (0, 1),
"GASLIMIT": (0, 1),
"JUMP": (1, 0),
"JUMPI": (2, 0),
"PC": (0, 1),
"MSIZE": (0, 1),
"GAS": (0, 1),
"CREATE": (3, 1),
"CREATE2": (4, 1),
"RETURN": (2, 0),
}

@ -1,30 +0,0 @@
from mythril.laser.ethereum.taint_analysis import *
def test_mutate_not_tainted():
# Arrange
record = TaintRecord()
record.stack = [True, False, False]
# Act
TaintRunner.mutate_stack(record, (2, 1))
# Assert
assert record.stack_tainted(0)
assert record.stack_tainted(1) is False
assert record.stack == [True, False]
def test_mutate_tainted():
# Arrange
record = TaintRecord()
record.stack = [True, False, True]
# Act
TaintRunner.mutate_stack(record, (2, 1))
# Assert
assert record.stack_tainted(0)
assert record.stack_tainted(1)
assert record.stack == [True, True]

@ -1,36 +0,0 @@
from mythril.laser.ethereum.taint_analysis import *
def test_record_tainted_check():
# arrange
record = TaintRecord()
record.stack = [True, False, True]
# act
tainted = record.stack_tainted(2)
# assert
assert tainted is True
def test_record_untainted_check():
# arrange
record = TaintRecord()
record.stack = [True, False, False]
# act
tainted = record.stack_tainted(2)
# assert
assert tainted is False
def test_record_untouched_check():
# arrange
record = TaintRecord()
# act
tainted = record.stack_tainted(3)
# assert
assert tainted is None

@ -1,35 +0,0 @@
from mythril.laser.ethereum.taint_analysis import *
from mythril.laser.ethereum.state.global_state import GlobalState
def test_result_state():
# arrange
taint_result = TaintResult()
record = TaintRecord()
state = GlobalState(2, None, None)
state.mstate.stack = [1, 2, 3]
record.add_state(state)
record.stack = [False, False, False]
# act
taint_result.add_records([record])
tainted = taint_result.check(state, 2)
# assert
assert tainted is False
assert record in taint_result.records
def test_result_no_state():
# arrange
taint_result = TaintResult()
record = TaintRecord()
state = GlobalState(2, None, None)
state.mstate.stack = [1, 2, 3]
# act
taint_result.add_records([record])
tainted = taint_result.check(state, 2)
# assert
assert tainted is None
assert record in taint_result.records

@ -1,99 +0,0 @@
import mock
import pytest
from pytest_mock import mocker
from mythril.laser.ethereum.taint_analysis import *
from mythril.laser.ethereum.cfg import Node, Edge
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.machine_state import MachineState
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.svm import LaserEVM
def test_execute_state(mocker):
record = TaintRecord()
record.stack = [True, False, True]
state = GlobalState(None, None, None)
state.mstate.stack = [1, 2, 3]
mocker.patch.object(state, "get_current_instruction")
state.get_current_instruction.return_value = {"opcode": "ADD"}
# Act
new_record = TaintRunner.execute_state(record, state)
# Assert
assert new_record.stack == [True, True]
assert record.stack == [True, False, True]
def test_execute_node(mocker):
record = TaintRecord()
record.stack = [True, True, False, False]
state_1 = GlobalState(None, None, None)
state_1.mstate.stack = [1, 2, 3, 1]
state_1.mstate.pc = 1
mocker.patch.object(state_1, "get_current_instruction")
state_1.get_current_instruction.return_value = {"opcode": "SWAP1"}
state_2 = GlobalState(None, 1, None)
state_2.mstate.stack = [1, 2, 4, 1]
mocker.patch.object(state_2, "get_current_instruction")
state_2.get_current_instruction.return_value = {"opcode": "ADD"}
node = Node("Test contract")
node.states = [state_1, state_2]
# Act
records = TaintRunner.execute_node(node, record)
# Assert
assert len(records) == 2
assert records[0].stack == [True, True, False, False]
assert records[1].stack == [True, True, False]
assert state_2 in records[0].states
assert state_1 in record.states
def test_execute(mocker):
active_account = Account("0x00")
environment = Environment(active_account, None, None, None, None, None)
state_1 = GlobalState(None, environment, None, MachineState(gas_limit=8000000))
state_1.mstate.stack = [1, 2]
mocker.patch.object(state_1, "get_current_instruction")
state_1.get_current_instruction.return_value = {"opcode": "PUSH"}
state_2 = GlobalState(None, environment, None, MachineState(gas_limit=8000000))
state_2.mstate.stack = [1, 2, 3]
mocker.patch.object(state_2, "get_current_instruction")
state_2.get_current_instruction.return_value = {"opcode": "ADD"}
node_1 = Node("Test contract")
node_1.states = [state_1, state_2]
state_3 = GlobalState(None, environment, None, MachineState(gas_limit=8000000))
state_3.mstate.stack = [1, 2]
mocker.patch.object(state_3, "get_current_instruction")
state_3.get_current_instruction.return_value = {"opcode": "ADD"}
node_2 = Node("Test contract")
node_2.states = [state_3]
edge = Edge(node_1.uid, node_2.uid)
statespace = LaserEVM(None)
statespace.edges = [edge]
statespace.nodes[node_1.uid] = node_1
statespace.nodes[node_2.uid] = node_2
# Act
result = TaintRunner.execute(statespace, node_1, state_1, [True, True])
# Assert
print(result)
assert len(result.records) == 3
assert result.records[2].states == []
assert state_3 in result.records[1].states
Loading…
Cancel
Save