Add constantinople reentrancy check modules

constantinope-reentrancy-check
Bernhard Mueller 6 years ago
parent a175d268cb
commit 571c8708ba
  1. 93
      mythril/analysis/modules/constantinople_bug_1.py
  2. 96
      mythril/analysis/modules/constantinople_bug_2.py
  3. 10
      mythril/laser/ethereum/gas.py
  4. 8
      mythril/laser/ethereum/plugins/mutation_pruner.py

@ -0,0 +1,93 @@
"""This module checks for the Constantinople reentrancy vulnerability reported by ChainSecurity."""
from mythril.analysis import solver
from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.plugins.mutation_pruner import MutationAnnotation
from mythril.analysis.report import Issue
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.exceptions import UnsatError
import logging
log = logging.getLogger(__name__)
DESCRIPTION = """
This module reports an issue when:
1. The STOP or RETURN instruction is reached AND
2. A state variable has been modified during the transaction AND
2. The minimum gas used by the transaction is less than or equal to 1600.
"""
def _analyze_state(state):
"""
:param state:
:return:
"""
node = state.node
if len(list(state.get_annotations(MutationAnnotation))) > 0:
if state.mstate.min_gas_used <= 1600:
# Verify that this state is reachable
try:
constraints = node.constraints
transaction_sequence = solver.get_transaction_sequence(
state, constraints
)
except UnsatError:
log.debug("State is unreachable.")
return []
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=state.instruction["address"],
swc_id=REENTRANCY,
title="State write for 1600 gas or less",
severity="Medium",
bytecode=state.environment.code.bytecode,
description_head="Caller can modify state for 1600 gas or less.",
description_tail="The planned Constantinople hard fork reduces the gas cost for writing to state "
+"variables that have been written to in the same transaction. In some cases "
+"this may cause re-rentrancy vulnerabilities in previously safe contracts.",
debug=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return [issue]
return []
class ConstantinopleReentrancy(DetectionModule):
def __init__(self):
""""""
super().__init__(
name="Constantinople reentrancy vulnerability",
swc_id=REENTRANCY,
description=DESCRIPTION,
entrypoint="callback",
pre_hooks=["STOP", "RETURN"],
)
def execute(self, state: GlobalState):
"""
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
return self.issues
detector = ConstantinopleReentrancy()

@ -0,0 +1,96 @@
"""This module contains the detection code for potentially insecure low-level
calls."""
from mythril.analysis import solver
from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue
from mythril.laser.smt import UGT, symbol_factory
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.exceptions import UnsatError
import logging
import json
log = logging.getLogger(__name__)
DESCRIPTION = """
Search for external calls followed by a state change (CALL / SSTORE).
"""
class PostCallAnnotation(StateAnnotation):
pass
def _analyze_state(state):
"""
:param state:
:return:
"""
node = state.node
if len(list(state.get_annotations(PostCallAnnotation))) > 0:
# Verify that this state is reachable
try:
constraints = node.constraints
transaction_sequence = solver.get_transaction_sequence(
state, constraints
)
except UnsatError:
log.debug("State is unreachable.")
return []
issue = Issue(
contract=node.contract_name,
function_name=node.function_name,
address=state.instruction["address"],
swc_id=REENTRANCY,
title="State change after external call",
severity="Medium",
bytecode=state.environment.code.bytecode,
description_head="State change after external call.",
description_tail="The contract account state is changed after an external call. Consider that the "
"called contract could re-enter the function before this state change takes place. This can lead to "
"business logic vulnerabilities.",
debug=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
return [issue]
elif state.instruction["opcode"] == "CALL":
state.annotate(PostCallAnnotation())
return []
class ExternalCalls(DetectionModule):
"""This module searches for low level calls (e.g. call.value()) that
forward all gas to the callee."""
def __init__(self):
""""""
super().__init__(
name="External calls",
swc_id=REENTRANCY,
description=DESCRIPTION,
entrypoint="callback",
pre_hooks=["CALL", "SSTORE"]
)
def execute(self, state: GlobalState):
"""
:param state:
:return:
"""
self._issues.extend(_analyze_state(state))
return self.issues
detector = ExternalCalls()

@ -94,7 +94,7 @@ OPCODE_GAS = {
"MSTORE8": (3, 98), "MSTORE8": (3, 98),
# assume 64 byte r/w cost as upper bound # assume 64 byte r/w cost as upper bound
"SLOAD": (400, 400), "SLOAD": (400, 400),
"SSTORE": (5000, 25000), "SSTORE": (200, 25000),
"JUMP": (8, 8), "JUMP": (8, 8),
"JUMPI": (10, 10), "JUMPI": (10, 10),
"PC": (2, 2), "PC": (2, 2),
@ -170,10 +170,10 @@ OPCODE_GAS = {
# allows, but let's stick to the reasonable standard here. # allows, but let's stick to the reasonable standard here.
# https://ethereum.stackexchange.com/a/1691 # https://ethereum.stackexchange.com/a/1691
"LOG0": (375, 375 + 8 * 32), "LOG0": (375, 375 + 8 * 32),
"LOG1": (2 * 375, 2 * 375 + 8 * 32), "LOG1": (375, 2 * 375 + 8 * 32),
"LOG2": (3 * 375, 3 * 375 + 8 * 32), "LOG2": (375, 3 * 375 + 8 * 32),
"LOG3": (4 * 375, 4 * 375 + 8 * 32), "LOG3": (375, 4 * 375 + 8 * 32),
"LOG4": (5 * 375, 5 * 375 + 8 * 32), "LOG4": (375, 5 * 375 + 8 * 32),
"CREATE": (32000, 32000), "CREATE": (32000, 32000),
"CALL": (700, 700 + 9000 + 25000), "CALL": (700, 700 + 9000 + 25000),
"NATIVE_COST": calculate_native_gas, "NATIVE_COST": calculate_native_gas,

@ -3,7 +3,7 @@ from mythril.laser.ethereum.svm import LaserEVM
from mythril.laser.ethereum.plugins.signals import PluginSkipWorldState from mythril.laser.ethereum.plugins.signals import PluginSkipWorldState
from mythril.laser.ethereum.state.global_state import GlobalState from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.transaction.transaction_models import ( from mythril.laser.ethereum.transaction.transaction_models import (
ContractCreationTransaction, MessageCallTransaction,
) )
@ -48,9 +48,5 @@ class MutationPruner:
@symbolic_vm.laser_hook("add_world_state") @symbolic_vm.laser_hook("add_world_state")
def world_state_filter_hook(global_state: GlobalState): def world_state_filter_hook(global_state: GlobalState):
if isinstance( if isinstance(global_state.current_transaction, MessageCallTransaction) and len(list(global_state.get_annotations(MutationAnnotation))) == 0:
global_state.current_transaction, ContractCreationTransaction
):
return
if len(list(global_state.get_annotations(MutationAnnotation))) == 0:
raise PluginSkipWorldState raise PluginSkipWorldState

Loading…
Cancel
Save