remove old tainting logic

remove_deprecated_taint
Joran Honig 6 years ago
parent 40d02c662a
commit 999e1a22c8
  1. 453
      mythril/laser/ethereum/taint_analysis.py
  2. 30
      tests/taint_mutate_stack_test.py
  3. 36
      tests/taint_record_test.py
  4. 35
      tests/taint_result_test.py
  5. 99
      tests/taint_runner_test.py

@ -1,453 +0,0 @@
"""This module implements classes needed to perform taint analysis."""
import copy
import logging
from typing import List, Tuple, Union
import mythril.laser.ethereum.util as helper
from mythril.analysis.symbolic import SymExecWrapper
from mythril.laser.ethereum.cfg import JumpType, Node
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.smt import Expression
log = logging.getLogger(__name__)
class TaintRecord:
"""TaintRecord contains tainting information for a specific (state, node)
the information specifies the taint status before executing the operation
belonging to the state."""
def __init__(self):
"""Builds a taint record."""
self.stack = []
self.memory = {}
self.storage = {}
self.states = []
def stack_tainted(self, index: int) -> Union[bool, None]:
"""Returns taint value of stack element at index.
:param index:
:return:
"""
if index < len(self.stack):
return self.stack[index]
return None
def memory_tainted(self, index: int) -> bool:
"""Returns taint value of memory element at index.
:param index:
:return:
"""
if index in self.memory.keys():
return self.memory[index]
return False
def storage_tainted(self, index: int) -> bool:
"""Returns taint value of storage element at index.
:param index:
:return:
"""
if index in self.storage.keys():
return self.storage[index]
return False
def add_state(self, state: GlobalState) -> None:
"""Adds state with this taint record.
:param state:
"""
self.states.append(state)
def clone(self) -> "TaintRecord":
"""Clones this record.
:return:
"""
clone = TaintRecord()
clone.stack = copy.deepcopy(self.stack)
clone.memory = copy.deepcopy(self.memory)
clone.storage = copy.deepcopy(self.storage)
return clone
class TaintResult:
"""Taint analysis result obtained after having ran the taint runner."""
def __init__(self):
"""Create a new tains result."""
self.records = []
def check(self, state: GlobalState, stack_index: int) -> Union[bool, None]:
"""Checks if stack variable is tainted, before executing the
instruction.
:param state: state to check variable in
:param stack_index: index of stack variable
:return: tainted
"""
record = self._try_get_record(state)
if record is None:
return None
return record.stack_tainted(stack_index)
def add_records(self, records: List[TaintRecord]) -> None:
"""Adds records to this taint result.
:param records:
"""
self.records += records
def _try_get_record(self, state: GlobalState) -> Union[TaintRecord, None]:
"""Finds record belonging to the state.
:param state:
:return:
"""
for record in self.records:
if state in record.states:
return record
return None
class TaintRunner:
"""Taint runner, is able to run taint analysis on symbolic execution
result."""
@staticmethod
def execute(
statespace: SymExecWrapper, node: Node, state: GlobalState, initial_stack=None
) -> TaintResult:
"""Runs taint analysis on the statespace.
:param initial_stack:
:param statespace: symbolic statespace to run taint analysis on
:param node: taint introduction node
:param state: taint introduction state
:return: TaintResult object containing analysis results
"""
if initial_stack is None:
initial_stack = []
result = TaintResult()
transaction_stack_length = len(node.states[0].transaction_stack)
# Build initial current_node
init_record = TaintRecord()
init_record.stack = initial_stack
state_index = node.states.index(state)
# List of (Node, TaintRecord, index)
current_nodes = [(node, init_record, state_index)]
environment = node.states[0].environment
for node, record, index in current_nodes:
records = TaintRunner.execute_node(node, record, index)
result.add_records(records)
if len(records) == 0: # continue if there is no record to work on
continue
children = TaintRunner.children(
node, statespace, environment, transaction_stack_length
)
for child in children:
current_nodes.append((child, records[-1], 0))
return result
@staticmethod
def children(
node: Node,
statespace: SymExecWrapper,
environment: Environment,
transaction_stack_length: int,
) -> List[Node]:
"""
:param node:
:param statespace:
:param environment:
:param transaction_stack_length:
:return:
"""
direct_children = [
statespace.nodes[edge.node_to]
for edge in statespace.edges
if edge.node_from == node.uid and edge.type != JumpType.Transaction
]
children = []
for child in direct_children:
if all(
len(state.transaction_stack) == transaction_stack_length
for state in child.states
):
children.append(child)
elif all(
len(state.transaction_stack) > transaction_stack_length
for state in child.states
):
children += TaintRunner.children(
child, statespace, environment, transaction_stack_length
)
return children
@staticmethod
def execute_node(
node: Node, last_record: TaintRecord, state_index=0
) -> List[TaintRecord]:
"""Runs taint analysis on a given node.
:param node: node to analyse
:param last_record: last taint record to work from
:param state_index: state index to start from
:return: List of taint records linked to the states in this node
"""
records = [last_record]
for index in range(state_index, len(node.states)):
current_state = node.states[index]
records.append(TaintRunner.execute_state(records[-1], current_state))
return records[1:]
@staticmethod
def execute_state(record: TaintRecord, state: GlobalState) -> TaintRecord:
"""
:param record:
:param state:
:return:
"""
assert len(state.mstate.stack) == len(record.stack)
""" Runs taint analysis on a state """
record.add_state(state)
new_record = record.clone()
# Apply Change
op = state.get_current_instruction()["opcode"]
if op in TaintRunner.stack_taint_table.keys():
mutator = TaintRunner.stack_taint_table[op]
TaintRunner.mutate_stack(new_record, mutator)
elif op.startswith("PUSH"):
TaintRunner.mutate_push(op, new_record)
elif op.startswith("DUP"):
TaintRunner.mutate_dup(op, new_record)
elif op.startswith("SWAP"):
TaintRunner.mutate_swap(op, new_record)
elif op is "MLOAD":
TaintRunner.mutate_mload(new_record, state.mstate.stack[-1])
elif op.startswith("MSTORE"):
TaintRunner.mutate_mstore(new_record, state.mstate.stack[-1])
elif op is "SLOAD":
TaintRunner.mutate_sload(new_record, state.mstate.stack[-1])
elif op is "SSTORE":
TaintRunner.mutate_sstore(new_record, state.mstate.stack[-1])
elif op.startswith("LOG"):
TaintRunner.mutate_log(new_record, op)
elif op in ("CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"):
TaintRunner.mutate_call(new_record, op)
else:
log.debug("Unknown operation encountered: {}".format(op))
return new_record
@staticmethod
def mutate_stack(record: TaintRecord, mutator: Tuple[int, int]) -> None:
"""
:param record:
:param mutator:
"""
pop, push = mutator
values = []
for i in range(pop):
values.append(record.stack.pop())
taint = any(values)
for i in range(push):
record.stack.append(taint)
@staticmethod
def mutate_push(op: str, record: TaintRecord) -> None:
"""
:param op:
:param record:
"""
TaintRunner.mutate_stack(record, (0, 1))
@staticmethod
def mutate_dup(op: str, record: TaintRecord) -> None:
"""
:param op:
:param record:
"""
depth = int(op[3:])
index = len(record.stack) - depth
record.stack.append(record.stack[index])
@staticmethod
def mutate_swap(op: str, record: TaintRecord) -> None:
"""
:param op:
:param record:
"""
depth = int(op[4:])
l = len(record.stack) - 1
i = l - depth
record.stack[l], record.stack[i] = record.stack[i], record.stack[l]
@staticmethod
def mutate_mload(record: TaintRecord, op0: Expression) -> None:
"""
:param record:
:param op0:
:return:
"""
_ = record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
log.debug("Can't MLOAD taint track symbolically")
record.stack.append(False)
return
record.stack.append(record.memory_tainted(index))
@staticmethod
def mutate_mstore(record: TaintRecord, op0: Expression) -> None:
"""
:param record:
:param op0:
:return:
"""
_, value_taint = record.stack.pop(), record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
log.debug("Can't mstore taint track symbolically")
return
record.memory[index] = value_taint
@staticmethod
def mutate_sload(record: TaintRecord, op0: Expression) -> None:
"""
:param record:
:param op0:
:return:
"""
_ = record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
log.debug("Can't MLOAD taint track symbolically")
record.stack.append(False)
return
record.stack.append(record.storage_tainted(index))
@staticmethod
def mutate_sstore(record: TaintRecord, op0: Expression) -> None:
"""
:param record:
:param op0:
:return:
"""
_, value_taint = record.stack.pop(), record.stack.pop()
try:
index = helper.get_concrete_int(op0)
except TypeError:
log.debug("Can't mstore taint track symbolically")
return
record.storage[index] = value_taint
@staticmethod
def mutate_log(record: TaintRecord, op: str) -> None:
"""
:param record:
:param op:
"""
depth = int(op[3:])
for _ in range(depth + 2):
record.stack.pop()
@staticmethod
def mutate_call(record: TaintRecord, op: str) -> None:
"""
:param record:
:param op:
"""
pops = 6
if op in ("CALL", "CALLCODE"):
pops += 1
for _ in range(pops):
record.stack.pop()
record.stack.append(False)
stack_taint_table = {
# instruction: (taint source, taint target)
"POP": (1, 0),
"ADD": (2, 1),
"MUL": (2, 1),
"SUB": (2, 1),
"AND": (2, 1),
"OR": (2, 1),
"XOR": (2, 1),
"NOT": (1, 1),
"BYTE": (2, 1),
"DIV": (2, 1),
"MOD": (2, 1),
"SDIV": (2, 1),
"SMOD": (2, 1),
"ADDMOD": (3, 1),
"MULMOD": (3, 1),
"EXP": (2, 1),
"SIGNEXTEND": (2, 1),
"LT": (2, 1),
"GT": (2, 1),
"SLT": (2, 1),
"SGT": (2, 1),
"EQ": (2, 1),
"ISZERO": (1, 1),
"CALLVALUE": (0, 1),
"CALLDATALOAD": (1, 1),
"CALLDATACOPY": (3, 0), # todo
"CALLDATASIZE": (0, 1),
"ADDRESS": (0, 1),
"BALANCE": (1, 1),
"ORIGIN": (0, 1),
"CALLER": (0, 1),
"CODESIZE": (0, 1),
"SHA3": (2, 1),
"GASPRICE": (0, 1),
"CODECOPY": (3, 0),
"EXTCODESIZE": (1, 1),
"EXTCODECOPY": (4, 0),
"RETURNDATASIZE": (0, 1),
"BLOCKHASH": (1, 1),
"COINBASE": (0, 1),
"TIMESTAMP": (0, 1),
"NUMBER": (0, 1),
"DIFFICULTY": (0, 1),
"GASLIMIT": (0, 1),
"JUMP": (1, 0),
"JUMPI": (2, 0),
"PC": (0, 1),
"MSIZE": (0, 1),
"GAS": (0, 1),
"CREATE": (3, 1),
"RETURN": (2, 0),
}

@ -1,30 +0,0 @@
from mythril.laser.ethereum.taint_analysis import *
def test_mutate_not_tainted():
# Arrange
record = TaintRecord()
record.stack = [True, False, False]
# Act
TaintRunner.mutate_stack(record, (2, 1))
# Assert
assert record.stack_tainted(0)
assert record.stack_tainted(1) is False
assert record.stack == [True, False]
def test_mutate_tainted():
# Arrange
record = TaintRecord()
record.stack = [True, False, True]
# Act
TaintRunner.mutate_stack(record, (2, 1))
# Assert
assert record.stack_tainted(0)
assert record.stack_tainted(1)
assert record.stack == [True, True]

@ -1,36 +0,0 @@
from mythril.laser.ethereum.taint_analysis import *
def test_record_tainted_check():
# arrange
record = TaintRecord()
record.stack = [True, False, True]
# act
tainted = record.stack_tainted(2)
# assert
assert tainted is True
def test_record_untainted_check():
# arrange
record = TaintRecord()
record.stack = [True, False, False]
# act
tainted = record.stack_tainted(2)
# assert
assert tainted is False
def test_record_untouched_check():
# arrange
record = TaintRecord()
# act
tainted = record.stack_tainted(3)
# assert
assert tainted is None

@ -1,35 +0,0 @@
from mythril.laser.ethereum.taint_analysis import *
from mythril.laser.ethereum.state.global_state import GlobalState
def test_result_state():
# arrange
taint_result = TaintResult()
record = TaintRecord()
state = GlobalState(2, None, None)
state.mstate.stack = [1, 2, 3]
record.add_state(state)
record.stack = [False, False, False]
# act
taint_result.add_records([record])
tainted = taint_result.check(state, 2)
# assert
assert tainted is False
assert record in taint_result.records
def test_result_no_state():
# arrange
taint_result = TaintResult()
record = TaintRecord()
state = GlobalState(2, None, None)
state.mstate.stack = [1, 2, 3]
# act
taint_result.add_records([record])
tainted = taint_result.check(state, 2)
# assert
assert tainted is None
assert record in taint_result.records

@ -1,99 +0,0 @@
import mock
import pytest
from pytest_mock import mocker
from mythril.laser.ethereum.taint_analysis import *
from mythril.laser.ethereum.cfg import Node, Edge
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.machine_state import MachineState
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.svm import LaserEVM
def test_execute_state(mocker):
record = TaintRecord()
record.stack = [True, False, True]
state = GlobalState(None, None, None)
state.mstate.stack = [1, 2, 3]
mocker.patch.object(state, "get_current_instruction")
state.get_current_instruction.return_value = {"opcode": "ADD"}
# Act
new_record = TaintRunner.execute_state(record, state)
# Assert
assert new_record.stack == [True, True]
assert record.stack == [True, False, True]
def test_execute_node(mocker):
record = TaintRecord()
record.stack = [True, True, False, False]
state_1 = GlobalState(None, None, None)
state_1.mstate.stack = [1, 2, 3, 1]
state_1.mstate.pc = 1
mocker.patch.object(state_1, "get_current_instruction")
state_1.get_current_instruction.return_value = {"opcode": "SWAP1"}
state_2 = GlobalState(None, 1, None)
state_2.mstate.stack = [1, 2, 4, 1]
mocker.patch.object(state_2, "get_current_instruction")
state_2.get_current_instruction.return_value = {"opcode": "ADD"}
node = Node("Test contract")
node.states = [state_1, state_2]
# Act
records = TaintRunner.execute_node(node, record)
# Assert
assert len(records) == 2
assert records[0].stack == [True, True, False, False]
assert records[1].stack == [True, True, False]
assert state_2 in records[0].states
assert state_1 in record.states
def test_execute(mocker):
active_account = Account("0x00")
environment = Environment(active_account, None, None, None, None, None)
state_1 = GlobalState(None, environment, None, MachineState(gas_limit=8000000))
state_1.mstate.stack = [1, 2]
mocker.patch.object(state_1, "get_current_instruction")
state_1.get_current_instruction.return_value = {"opcode": "PUSH"}
state_2 = GlobalState(None, environment, None, MachineState(gas_limit=8000000))
state_2.mstate.stack = [1, 2, 3]
mocker.patch.object(state_2, "get_current_instruction")
state_2.get_current_instruction.return_value = {"opcode": "ADD"}
node_1 = Node("Test contract")
node_1.states = [state_1, state_2]
state_3 = GlobalState(None, environment, None, MachineState(gas_limit=8000000))
state_3.mstate.stack = [1, 2]
mocker.patch.object(state_3, "get_current_instruction")
state_3.get_current_instruction.return_value = {"opcode": "ADD"}
node_2 = Node("Test contract")
node_2.states = [state_3]
edge = Edge(node_1.uid, node_2.uid)
statespace = LaserEVM(None)
statespace.edges = [edge]
statespace.nodes[node_1.uid] = node_1
statespace.nodes[node_2.uid] = node_2
# Act
result = TaintRunner.execute(statespace, node_1, state_1, [True, True])
# Assert
print(result)
assert len(result.records) == 3
assert result.records[2].states == []
assert state_3 in result.records[1].states
Loading…
Cancel
Save