diff --git a/mythril/analysis/modules/dependence_on_predictable_vars.py b/mythril/analysis/modules/dependence_on_predictable_vars.py index 7a931997..a8c7ac75 100644 --- a/mythril/analysis/modules/dependence_on_predictable_vars.py +++ b/mythril/analysis/modules/dependence_on_predictable_vars.py @@ -1,22 +1,53 @@ """This module contains the detection code for predictable variable dependence.""" import logging -import re -from mythril.analysis import solver -from mythril.analysis.call_helpers import get_call_from_state from mythril.analysis.modules.base import DetectionModule -from mythril.analysis.ops import Call, VarType from mythril.analysis.report import Issue -from mythril.analysis.swc_data import TIMESTAMP_DEPENDENCE, WEAK_RANDOMNESS from mythril.exceptions import UnsatError +from mythril.analysis import solver +from mythril.laser.smt import ULT, symbol_factory +from mythril.analysis.swc_data import TIMESTAMP_DEPENDENCE, WEAK_RANDOMNESS from mythril.laser.ethereum.state.global_state import GlobalState +from mythril.laser.ethereum.state.annotation import StateAnnotation +from typing import cast, List +import traceback log = logging.getLogger(__name__) +predictable_ops = ["COINBASE", "GASLIMIT", "TIMESTAMP", "NUMBER"] +final_ops = ["CALL", "SUICIDE", "STOP", "RETURN"] + + +def is_prehook() -> bool: + """Check if we are in prehook. One of Bernhard's trademark hacks!""" + return "pre_hook" in traceback.format_stack()[-4] + + +class PredictableValueAnnotation: + """Symbol annotation used if a variable is initialized from a predictable environment variable.""" + + def __init__(self, operation: str) -> None: + self.operation = operation + + +class PredictablePathAnnotation(StateAnnotation): + """State annotation used when a path is chosen based on a predictable variable.""" + + def __init__(self, operation: str, location: int) -> None: + self.operation = operation + self.location = location + + +class OldBlockNumberUsedAnnotation(StateAnnotation): + """State annotation set in blockhash prehook if the input value is lower than the current block number.""" + + def __init__(self) -> None: + pass + class PredictableDependenceModule(DetectionModule): - """This module detects whether Ether is sent using predictable + """This module detects whether control flow decisions are made using predictable parameters.""" def __init__(self) -> None: @@ -25,12 +56,12 @@ class PredictableDependenceModule(DetectionModule): name="Dependence of Predictable Variables", swc_id="{} {}".format(TIMESTAMP_DEPENDENCE, WEAK_RANDOMNESS), description=( - "Check for CALLs that send >0 Ether as a result of computation " - "based on predictable variables such as block.coinbase, " - "block.gaslimit, block.timestamp, block.number" + "Check whether important control flow decisions are influenced by block.coinbase," + "block.gaslimit, block.timestamp or block.number." ), entrypoint="callback", - pre_hooks=["CALL", "CALLCODE", "DELEGATECALL", "STATICCALL"], + pre_hooks=["BLOCKHASH", "JUMPI"] + final_ops, + post_hooks=["BLOCKHASH"] + predictable_ops, ) def execute(self, state: GlobalState) -> list: @@ -39,186 +70,138 @@ class PredictableDependenceModule(DetectionModule): :param state: :return: """ + log.debug("Executing module: DEPENDENCE_ON_PREDICTABLE_VARS") + self._issues.extend(_analyze_states(state)) return self.issues -detector = PredictableDependenceModule() - - def _analyze_states(state: GlobalState) -> list: """ :param state: :return: """ + issues = [] - call = get_call_from_state(state) - if call is None: - return [] - if "callvalue" in str(call.value): - log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] Skipping refund function") - return [] - - # We're only interested in calls that send Ether - if call.value.type == VarType.CONCRETE and call.value.val == 0: - return [] - - address = call.state.get_current_instruction()["address"] - - description = ( - "The contract sends Ether depending on the values of the following variables:\n" - ) - - # First check: look for predictable state variables in state & call recipient constraints - - vars = ["coinbase", "gaslimit", "timestamp", "number"] - found = [] - - for var in vars: - for constraint in call.state.mstate.constraints[:] + [call.to]: - if var in str(constraint): - found.append(var) - - if len(found): - for item in found: - description += "- block.{}\n".format(item) - if solve(call): - swc_id = TIMESTAMP_DEPENDENCE if item == "timestamp" else WEAK_RANDOMNESS - - description += ( - "Note that the values of variables like coinbase, gaslimit, block number and timestamp " - "are predictable and/or can be manipulated by a malicious miner. " - "Don't use them for random number generation or to make critical decisions." - ) - issue = Issue( - contract=state.environment.active_account.contract_name, - function_name=state.environment.active_function_name, - address=address, - swc_id=swc_id, - bytecode=call.state.environment.code.bytecode, - title="Dependence on predictable environment variable", - severity="Low", - description_head="Sending of Ether depends on a predictable variable.", - description_tail=description, - gas_used=( - call.state.mstate.min_gas_used, - call.state.mstate.max_gas_used, - ), - ) - issues.append(issue) + if is_prehook(): - # Second check: blockhash + opcode = state.get_current_instruction()["opcode"] - for constraint in call.state.mstate.constraints[:] + [call.to]: - if "blockhash" in str(constraint): - if "number" in str(constraint): - m = re.search(r"blockhash\w+(\s-\s(\d+))*", str(constraint)) - if m and solve(call): + if opcode in final_ops: - found_item = m.group(1) + for annotation in state.annotations: - if found_item: # block.blockhash(block.number - N) - description = ( - "The predictable expression 'block.blockhash(block.number - " - + m.group(2) - + ")' is used to determine Ether recipient" - ) - if int(m.group(2)) > 255: - description += ( - ", this expression will always be equal to zero." - ) - elif "storage" in str( - constraint - ): # block.blockhash(block.number - storage_0) - description = ( - "The predictable expression 'block.blockhash(block.number - " - + "some_storage_var)' is used to determine Ether recipient" - ) - else: # block.blockhash(block.number) - description = ( - "The predictable expression 'block.blockhash(block.number)'" - + " is used to determine Ether recipient" - ) - description += ", this expression will always be equal to zero." + if isinstance(annotation, PredictablePathAnnotation): + description = ( + "The " + + annotation.operation + + " is used in to determine a control flow decision. " + ) + description += ( + "Note that the values of variables like coinbase, gaslimit, block number and timestamp " + "are predictable and can be manipulated by a malicious miner. Also keep in mind that attackers " + "know hashes of earlier blocks. Don't use any of those environment variables for random number " + "generation or to make critical control flow decisions." + ) + + """ + Usually report low severity except in cases where the hash of a previous block is used to + determine control flow. + """ + + severity = "Medium" if "hash" in annotation.operation else "Low" + + """ + Note: We report the location of the JUMPI that lead to this path. Usually this maps to an if or + require statement. + """ + + swc_id = ( + TIMESTAMP_DEPENDENCE + if "timestamp" in annotation.operation + else WEAK_RANDOMNESS + ) issue = Issue( contract=state.environment.active_account.contract_name, function_name=state.environment.active_function_name, - address=address, - bytecode=call.state.environment.code.bytecode, - title="Dependence on Predictable Variable", - severity="Low", - description_head="Sending of Ether depends on the blockhash.", + address=annotation.location, + swc_id=swc_id, + bytecode=state.environment.code.bytecode, + title="Dependence on predictable environment variable", + severity=severity, + description_head="A control flow decision is made based on a predictable variable.", description_tail=description, - swc_id=WEAK_RANDOMNESS, - gas_used=( - call.state.mstate.min_gas_used, - call.state.mstate.max_gas_used, - ), + gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), ) issues.append(issue) - break - else: - r = re.search(r"storage_([a-z0-9_&^]+)", str(constraint)) - if r: # block.blockhash(storage_0) + elif opcode == "JUMPI": - """We actually can do better here by adding a constraint - blockhash_block_storage_0 == 0 and checking model - satisfiability. + # Look for predictable state variables in jump condition - When this is done, severity can be raised from - 'Informational' to 'Warning'. Checking that storage - at given index can be tainted is not necessary, - since it usually contains block.number of the - 'commit' transaction in commit-reveal workflow. - """ + for annotation in state.mstate.stack[-2].annotations: - index = r.group(1) - if index and solve(call): - description = ( - "A block hash is calculated using the block.blockhash(uint blockNumber) method. " - "The block number is obtained from storage index {}".format( - index - ) + if isinstance(annotation, PredictableValueAnnotation): + state.annotate( + PredictablePathAnnotation( + annotation.operation, + state.get_current_instruction()["address"], ) - issue = Issue( - contract=state.environment.active_account.contract_name, - function_name=state.environment.active_function_name, - address=address, - bytecode=call.state.environment.code.bytecode, - title="Dependence on Predictable Variable", - severity="Low", - description_head="Sending of Ether depends on the blockhash.", - description_tail=description, - swc_id=WEAK_RANDOMNESS, - gas_used=( - call.state.mstate.min_gas_used, - call.state.mstate.max_gas_used, - ), - ) - issues.append(issue) - break - return issues + ) + break + elif opcode == "BLOCKHASH": -def solve(call: Call) -> bool: - """ + param = state.mstate.stack[-1] - :param call: - :return: - """ - try: - model = solver.get_model(call.state.mstate.constraints) - log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] MODEL: " + str(model)) - pretty_model = solver.pretty_print_model(model) + try: + constraint = [ + ULT(param, state.environment.block_number), + ULT( + state.environment.block_number, + symbol_factory.BitVecVal(2 ** 255, 256), + ), + ] + + # Why the second constraint? Because without it Z3 returns a solution where param overflows. + + solver.get_model(constraint) + state.annotate(OldBlockNumberUsedAnnotation()) - log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] main model: \n%s" % pretty_model) - return True + except UnsatError: + pass - except UnsatError: - log.debug("[DEPENDENCE_ON_PREDICTABLE_VARS] no model found") - return False + else: + # we're in post hook + + opcode = state.environment.code.instruction_list[state.mstate.pc - 1]["opcode"] + + if opcode == "BLOCKHASH": + # if we're in the post hook of a BLOCKHASH op, check if an old block number was used to create it. + + annotations = cast( + List[OldBlockNumberUsedAnnotation], + list(state.get_annotations(OldBlockNumberUsedAnnotation)), + ) + + if len(annotations): + state.mstate.stack[-1].annotate( + PredictableValueAnnotation("block hash of a previous block") + ) + else: + # Always create an annotation when COINBASE, GASLIMIT, TIMESTAMP or NUMBER is executed. + + state.mstate.stack[-1].annotate( + PredictableValueAnnotation( + "block.{} environment variable".format(opcode.lower()) + ) + ) + + return issues + + +detector = PredictableDependenceModule() diff --git a/mythril/laser/ethereum/instructions.py b/mythril/laser/ethereum/instructions.py index a353fbc1..f17d4401 100644 --- a/mythril/laser/ethereum/instructions.py +++ b/mythril/laser/ethereum/instructions.py @@ -939,9 +939,20 @@ class Instruction: data = symbol_factory.BitVecVal(0, 1) if data.symbolic: + + annotations = [] + + for b in state.memory[index : index + length]: + if isinstance(b, BitVec): + annotations.append(b.annotations) + argument_str = str(state.memory[index]).replace(" ", "_") result = symbol_factory.BitVecFuncSym( - "KECCAC[{}]".format(argument_str), "keccak256", 256, input_=data + "KECCAC[{}]".format(argument_str), + "keccak256", + 256, + input_=data, + annotations=annotations, ) log.debug("Created BitVecFunc hash.") @@ -1243,7 +1254,7 @@ class Instruction: :param global_state: :return: """ - global_state.mstate.stack.append(global_state.new_bitvec("block_number", 256)) + global_state.mstate.stack.append(global_state.environment.block_number) return [global_state] @StateTransition() diff --git a/mythril/laser/ethereum/state/environment.py b/mythril/laser/ethereum/state/environment.py index 0f467413..a3300f9a 100644 --- a/mythril/laser/ethereum/state/environment.py +++ b/mythril/laser/ethereum/state/environment.py @@ -40,6 +40,7 @@ class Environment: self.active_function_name = "" self.address = active_account.address + self.block_number = symbol_factory.BitVecSym("block_number", 256) # Ib self.code = active_account.code if code is None else code