Merge pull request #1029 from ConsenSys/dependence

Refactor predictable vars module
analysis/TOD
Bernhard Mueller 6 years ago committed by GitHub
commit 90fc3a6bf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 269
      mythril/analysis/modules/dependence_on_predictable_vars.py
  2. 15
      mythril/laser/ethereum/instructions.py
  3. 1
      mythril/laser/ethereum/state/environment.py

@ -1,22 +1,53 @@
"""This module contains the detection code for predictable variable
dependence."""
import logging
import re
from mythril.analysis import solver
from mythril.analysis.call_helpers import get_call_from_state
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.ops import Call, VarType
from mythril.analysis.report import Issue
from mythril.analysis.swc_data import TIMESTAMP_DEPENDENCE, WEAK_RANDOMNESS
from mythril.exceptions import UnsatError
from mythril.analysis import solver
from mythril.laser.smt import ULT, symbol_factory
from mythril.analysis.swc_data import TIMESTAMP_DEPENDENCE, WEAK_RANDOMNESS
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.annotation import StateAnnotation
from typing import cast, List
import traceback
log = logging.getLogger(__name__)
predictable_ops = ["COINBASE", "GASLIMIT", "TIMESTAMP", "NUMBER"]
final_ops = ["CALL", "SUICIDE", "STOP", "RETURN"]
def is_prehook() -> bool:
"""Check if we are in prehook. One of Bernhard's trademark hacks!"""
return "pre_hook" in traceback.format_stack()[-4]
class PredictableValueAnnotation:
"""Symbol annotation used if a variable is initialized from a predictable environment variable."""
def __init__(self, operation: str) -> None:
self.operation = operation
class PredictablePathAnnotation(StateAnnotation):
"""State annotation used when a path is chosen based on a predictable variable."""
def __init__(self, operation: str, location: int) -> None:
self.operation = operation
self.location = location
class OldBlockNumberUsedAnnotation(StateAnnotation):
"""State annotation set in blockhash prehook if the input value is lower than the current block number."""
def __init__(self) -> None:
pass
class PredictableDependenceModule(DetectionModule):
"""This module detects whether Ether is sent using predictable
"""This module detects whether control flow decisions are made using predictable
parameters."""
def __init__(self) -> None:
@ -25,12 +56,12 @@ class PredictableDependenceModule(DetectionModule):
name="Dependence of Predictable Variables",
swc_id="{} {}".format(TIMESTAMP_DEPENDENCE, WEAK_RANDOMNESS),
description=(
"Check for CALLs that send >0 Ether as a result of computation "
"based on predictable variables such as block.coinbase, "
"block.gaslimit, block.timestamp, block.number"
"Check whether important control flow decisions are influenced by block.coinbase,"
"block.gaslimit, block.timestamp or block.number."
),
entrypoint="callback",
pre_hooks=["CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"],
pre_hooks=["BLOCKHASH", "JUMPI"] + final_ops,
post_hooks=["BLOCKHASH"] + predictable_ops,
)
def execute(self, state: GlobalState) -> list:
@ -39,186 +70,138 @@ class PredictableDependenceModule(DetectionModule):
:param state:
:return:
"""
log.debug("Executing module: DEPENDENCE_ON_PREDICTABLE_VARS")
self._issues.extend(_analyze_states(state))
return self.issues
detector = PredictableDependenceModule()
def _analyze_states(state: GlobalState) -> list:
"""
:param state:
:return:
"""
issues = []
call = get_call_from_state(state)
if call is None:
return []
if "callvalue" in str(call.value):
log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] Skipping refund function")
return []
# We're only interested in calls that send Ether
if call.value.type == VarType.CONCRETE and call.value.val == 0:
return []
if is_prehook():
opcode = state.get_current_instruction()["opcode"]
if opcode in final_ops:
address = call.state.get_current_instruction()["address"]
for annotation in state.annotations:
if isinstance(annotation, PredictablePathAnnotation):
description = (
"The contract sends Ether depending on the values of the following variables:\n"
"The "
+ annotation.operation
+ " is used in to determine a control flow decision. "
)
description += (
"Note that the values of variables like coinbase, gaslimit, block number and timestamp "
"are predictable and can be manipulated by a malicious miner. Also keep in mind that attackers "
"know hashes of earlier blocks. Don't use any of those environment variables for random number "
"generation or to make critical control flow decisions."
)
# First check: look for predictable state variables in state & call recipient constraints
vars = ["coinbase", "gaslimit", "timestamp", "number"]
found = []
"""
Usually report low severity except in cases where the hash of a previous block is used to
determine control flow.
"""
for var in vars:
for constraint in call.state.mstate.constraints[:] + [call.to]:
if var in str(constraint):
found.append(var)
severity = "Medium" if "hash" in annotation.operation else "Low"
if len(found):
for item in found:
description += "- block.{}\n".format(item)
if solve(call):
swc_id = TIMESTAMP_DEPENDENCE if item == "timestamp" else WEAK_RANDOMNESS
"""
Note: We report the location of the JUMPI that lead to this path. Usually this maps to an if or
require statement.
"""
description += (
"Note that the values of variables like coinbase, gaslimit, block number and timestamp "
"are predictable and/or can be manipulated by a malicious miner. "
"Don't use them for random number generation or to make critical decisions."
swc_id = (
TIMESTAMP_DEPENDENCE
if "timestamp" in annotation.operation
else WEAK_RANDOMNESS
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
address=annotation.location,
swc_id=swc_id,
bytecode=call.state.environment.code.bytecode,
bytecode=state.environment.code.bytecode,
title="Dependence on predictable environment variable",
severity="Low",
description_head="Sending of Ether depends on a predictable variable.",
severity=severity,
description_head="A control flow decision is made based on a predictable variable.",
description_tail=description,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
issues.append(issue)
# Second check: blockhash
elif opcode == "JUMPI":
for constraint in call.state.mstate.constraints[:] + [call.to]:
if "blockhash" in str(constraint):
if "number" in str(constraint):
m = re.search(r"blockhash\w+(\s-\s(\d+))*", str(constraint))
if m and solve(call):
# Look for predictable state variables in jump condition
found_item = m.group(1)
for annotation in state.mstate.stack[-2].annotations:
if found_item: # block.blockhash(block.number - N)
description = (
"The predictable expression 'block.blockhash(block.number - "
+ m.group(2)
+ ")' is used to determine Ether recipient"
)
if int(m.group(2)) > 255:
description += (
", this expression will always be equal to zero."
)
elif "storage" in str(
constraint
): # block.blockhash(block.number - storage_0)
description = (
"The predictable expression 'block.blockhash(block.number - "
+ "some_storage_var)' is used to determine Ether recipient"
if isinstance(annotation, PredictableValueAnnotation):
state.annotate(
PredictablePathAnnotation(
annotation.operation,
state.get_current_instruction()["address"],
)
else: # block.blockhash(block.number)
description = (
"The predictable expression 'block.blockhash(block.number)'"
+ " is used to determine Ether recipient"
)
description += ", this expression will always be equal to zero."
break
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
bytecode=call.state.environment.code.bytecode,
title="Dependence on Predictable Variable",
severity="Low",
description_head="Sending of Ether depends on the blockhash.",
description_tail=description,
swc_id=WEAK_RANDOMNESS,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
elif opcode == "BLOCKHASH":
param = state.mstate.stack[-1]
try:
constraint = [
ULT(param, state.environment.block_number),
ULT(
state.environment.block_number,
symbol_factory.BitVecVal(2 ** 255, 256),
),
)
issues.append(issue)
break
]
# Why the second constraint? Because without it Z3 returns a solution where param overflows.
solver.get_model(constraint)
state.annotate(OldBlockNumberUsedAnnotation())
except UnsatError:
pass
else:
# we're in post hook
r = re.search(r"storage_([a-z0-9_&^]+)", str(constraint))
if r: # block.blockhash(storage_0)
opcode = state.environment.code.instruction_list[state.mstate.pc - 1]["opcode"]
"""We actually can do better here by adding a constraint
blockhash_block_storage_0 == 0 and checking model
satisfiability.
if opcode == "BLOCKHASH":
# if we're in the post hook of a BLOCKHASH op, check if an old block number was used to create it.
When this is done, severity can be raised from
'Informational' to 'Warning'. Checking that storage
at given index can be tainted is not necessary,
since it usually contains block.number of the
'commit' transaction in commit-reveal workflow.
"""
annotations = cast(
List[OldBlockNumberUsedAnnotation],
list(state.get_annotations(OldBlockNumberUsedAnnotation)),
)
index = r.group(1)
if index and solve(call):
description = (
"A block hash is calculated using the block.blockhash(uint blockNumber) method. "
"The block number is obtained from storage index {}".format(
index
if len(annotations):
state.mstate.stack[-1].annotate(
PredictableValueAnnotation("block hash of a previous block")
)
else:
# Always create an annotation when COINBASE, GASLIMIT, TIMESTAMP or NUMBER is executed.
state.mstate.stack[-1].annotate(
PredictableValueAnnotation(
"block.{} environment variable".format(opcode.lower())
)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
bytecode=call.state.environment.code.bytecode,
title="Dependence on Predictable Variable",
severity="Low",
description_head="Sending of Ether depends on the blockhash.",
description_tail=description,
swc_id=WEAK_RANDOMNESS,
gas_used=(
call.state.mstate.min_gas_used,
call.state.mstate.max_gas_used,
),
)
issues.append(issue)
break
return issues
def solve(call: Call) -> bool:
"""
:param call:
:return:
"""
try:
model = solver.get_model(call.state.mstate.constraints)
log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] MODEL: " + str(model))
pretty_model = solver.pretty_print_model(model)
return issues
log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] main model: \n%s" % pretty_model)
return True
except UnsatError:
log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] no model found")
return False
detector = PredictableDependenceModule()

@ -939,9 +939,20 @@ class Instruction:
data = symbol_factory.BitVecVal(0, 1)
if data.symbolic:
annotations = []
for b in state.memory[index : index + length]:
if isinstance(b, BitVec):
annotations.append(b.annotations)
argument_str = str(state.memory[index]).replace(" ", "_")
result = symbol_factory.BitVecFuncSym(
"KECCAC[{}]".format(argument_str), "keccak256", 256, input_=data
"KECCAC[{}]".format(argument_str),
"keccak256",
256,
input_=data,
annotations=annotations,
)
log.debug("Created BitVecFunc hash.")
@ -1243,7 +1254,7 @@ class Instruction:
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.new_bitvec("block_number", 256))
global_state.mstate.stack.append(global_state.environment.block_number)
return [global_state]
@StateTransition()

@ -40,6 +40,7 @@ class Environment:
self.active_function_name = ""
self.address = active_account.address
self.block_number = symbol_factory.BitVecSym("block_number", 256)
# Ib
self.code = active_account.code if code is None else code

Loading…
Cancel
Save