mirror of https://github.com/ConsenSys/mythril
commit
ba564ce437
@ -1,30 +0,0 @@ |
||||
from mythril.support.support_utils import Singleton |
||||
|
||||
|
||||
class AnalysisArgs(object, metaclass=Singleton): |
||||
""" |
||||
This module helps in preventing args being sent through multiple of classes to reach analysis modules |
||||
""" |
||||
|
||||
def __init__(self): |
||||
self._loop_bound = 3 |
||||
self._solver_timeout = 10000 |
||||
|
||||
def set_loop_bound(self, loop_bound: int): |
||||
if loop_bound is not None: |
||||
self._loop_bound = loop_bound |
||||
|
||||
def set_solver_timeout(self, solver_timeout: int): |
||||
if solver_timeout is not None: |
||||
self._solver_timeout = solver_timeout |
||||
|
||||
@property |
||||
def loop_bound(self): |
||||
return self._loop_bound |
||||
|
||||
@property |
||||
def solver_timeout(self): |
||||
return self._solver_timeout |
||||
|
||||
|
||||
analysis_args = AnalysisArgs() |
@ -0,0 +1,112 @@ |
||||
"""This module contains the detection code for predictable variable |
||||
dependence.""" |
||||
import logging |
||||
from copy import copy |
||||
|
||||
from mythril.analysis.module.base import DetectionModule, EntryPoint |
||||
from mythril.analysis.report import Issue |
||||
from mythril.exceptions import UnsatError |
||||
from mythril.analysis import solver |
||||
from mythril.analysis.swc_data import TX_ORIGIN_USAGE |
||||
from mythril.laser.ethereum.state.global_state import GlobalState |
||||
from typing import List |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
|
||||
class TxOriginAnnotation: |
||||
"""Symbol annotation added to a variable that is initialized with a call to the ORIGIN instruction.""" |
||||
|
||||
def __init__(self) -> None: |
||||
pass |
||||
|
||||
|
||||
class TxOrigin(DetectionModule): |
||||
"""This module detects whether control flow decisions are made based on the transaction origin.""" |
||||
|
||||
name = "Control flow depends on tx.origin" |
||||
swc_id = TX_ORIGIN_USAGE |
||||
description = "Check whether control flow decisions are influenced by tx.origin" |
||||
entry_point = EntryPoint.CALLBACK |
||||
|
||||
pre_hooks = ["JUMPI"] |
||||
post_hooks = ["ORIGIN"] |
||||
|
||||
def _execute(self, state: GlobalState) -> None: |
||||
""" |
||||
|
||||
:param state: |
||||
:return: |
||||
""" |
||||
if state.get_current_instruction()["address"] in self.cache: |
||||
return |
||||
issues = self._analyze_state(state) |
||||
for issue in issues: |
||||
self.cache.add(issue.address) |
||||
self.issues.extend(issues) |
||||
|
||||
@staticmethod |
||||
def _analyze_state(state: GlobalState) -> list: |
||||
""" |
||||
|
||||
:param state: |
||||
:return: |
||||
""" |
||||
|
||||
issues = [] |
||||
|
||||
if state.get_current_instruction()["opcode"] == "JUMPI": |
||||
# We're in JUMPI prehook |
||||
|
||||
for annotation in state.mstate.stack[-2].annotations: |
||||
|
||||
if isinstance(annotation, TxOriginAnnotation): |
||||
constraints = copy(state.world_state.constraints) |
||||
|
||||
try: |
||||
transaction_sequence = solver.get_transaction_sequence( |
||||
state, constraints |
||||
) |
||||
except UnsatError: |
||||
continue |
||||
|
||||
description = ( |
||||
"The tx.origin environment variable has been found to influence a control flow decision. " |
||||
"Note that using tx.origin as a security control might cause a situation where a user " |
||||
"inadvertently authorizes a smart contract to perform an action on their behalf. It is " |
||||
"recommended to use msg.sender instead." |
||||
) |
||||
|
||||
severity = "Low" |
||||
|
||||
""" |
||||
Note: We report the location of the JUMPI instruction. Usually this maps to an if or |
||||
require statement. |
||||
""" |
||||
|
||||
issue = Issue( |
||||
contract=state.environment.active_account.contract_name, |
||||
function_name=state.environment.active_function_name, |
||||
address=state.get_current_instruction()["address"], |
||||
swc_id=TX_ORIGIN_USAGE, |
||||
bytecode=state.environment.code.bytecode, |
||||
title="Dependence on tx.origin", |
||||
severity=severity, |
||||
description_head="Use of tx.origin as a part of authorization control.", |
||||
description_tail=description, |
||||
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), |
||||
transaction_sequence=transaction_sequence, |
||||
) |
||||
|
||||
issues.append(issue) |
||||
|
||||
else: |
||||
|
||||
# In ORIGIN posthook |
||||
|
||||
state.mstate.stack[-1].annotate(TxOriginAnnotation()) |
||||
|
||||
return issues |
||||
|
||||
|
||||
detector = TxOrigin() |
@ -1,90 +0,0 @@ |
||||
"""This module contains the detection code for deprecated op codes.""" |
||||
from mythril.analysis.potential_issues import ( |
||||
PotentialIssue, |
||||
get_potential_issues_annotation, |
||||
) |
||||
from mythril.analysis.swc_data import DEPRECATED_FUNCTIONS_USAGE |
||||
from mythril.analysis.module.base import DetectionModule, EntryPoint |
||||
from mythril.laser.ethereum.state.global_state import GlobalState |
||||
import logging |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
DESCRIPTION = """ |
||||
Check for usage of deprecated opcodes |
||||
""" |
||||
|
||||
|
||||
class DeprecatedOperations(DetectionModule): |
||||
"""This module checks for the usage of deprecated op codes.""" |
||||
|
||||
name = "Usage of deprecated instructions" |
||||
swc_id = DEPRECATED_FUNCTIONS_USAGE |
||||
description = DESCRIPTION |
||||
entry_point = EntryPoint.CALLBACK |
||||
pre_hooks = ["ORIGIN", "CALLCODE"] |
||||
|
||||
def _execute(self, state: GlobalState) -> None: |
||||
""" |
||||
|
||||
:param state: |
||||
:return: |
||||
""" |
||||
if state.get_current_instruction()["address"] in self.cache: |
||||
return |
||||
issues = self._analyze_state(state) |
||||
|
||||
annotation = get_potential_issues_annotation(state) |
||||
annotation.potential_issues.extend(issues) |
||||
|
||||
def _analyze_state(self, state): |
||||
""" |
||||
|
||||
:param state: |
||||
:return: |
||||
""" |
||||
node = state.node |
||||
instruction = state.get_current_instruction() |
||||
|
||||
if instruction["opcode"] == "ORIGIN": |
||||
log.debug("ORIGIN in function " + node.function_name) |
||||
title = "Use of tx.origin" |
||||
description_head = "Use of tx.origin is deprecated." |
||||
description_tail = ( |
||||
"The smart contract retrieves the transaction origin (tx.origin) using msg.origin. " |
||||
"Use of msg.origin is deprecated and the instruction may be removed in the future. " |
||||
"Use msg.sender instead.\nSee also: " |
||||
"https://solidity.readthedocs.io/en/develop/security-considerations.html#tx-origin" |
||||
) |
||||
swc_id = DEPRECATED_FUNCTIONS_USAGE |
||||
|
||||
elif instruction["opcode"] == "CALLCODE": |
||||
log.debug("CALLCODE in function " + state.environment.active_function_name) |
||||
title = "Use of callcode" |
||||
description_head = "Use of callcode is deprecated." |
||||
description_tail = ( |
||||
"The callcode method executes code of another contract in the context of the caller account. " |
||||
"Due to a bug in the implementation it does not persist sender and value over the call. It was " |
||||
"therefore deprecated and may be removed in the future. Use the delegatecall method instead." |
||||
) |
||||
swc_id = DEPRECATED_FUNCTIONS_USAGE |
||||
else: |
||||
return [] |
||||
|
||||
potential_issue = PotentialIssue( |
||||
contract=state.environment.active_account.contract_name, |
||||
function_name=state.environment.active_function_name, |
||||
address=instruction["address"], |
||||
title=title, |
||||
bytecode=state.environment.code.bytecode, |
||||
detector=self, |
||||
swc_id=swc_id, |
||||
severity="Medium", |
||||
description_head=description_head, |
||||
description_tail=description_tail, |
||||
constraints=[], |
||||
) |
||||
return [potential_issue] |
||||
|
||||
|
||||
detector = DeprecatedOperations() |
@ -0,0 +1,49 @@ |
||||
from functools import lru_cache |
||||
from z3 import sat, unknown |
||||
|
||||
from mythril.support.support_args import args |
||||
from mythril.laser.smt import Optimize |
||||
from mythril.laser.ethereum.time_handler import time_handler |
||||
from mythril.exceptions import UnsatError |
||||
import logging |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
# LRU cache works great when used in powers of 2 |
||||
|
||||
|
||||
@lru_cache(maxsize=2 ** 23) |
||||
def get_model(constraints, minimize=(), maximize=(), enforce_execution_time=True): |
||||
""" |
||||
Returns a model based on given constraints as a tuple |
||||
:param constraints: Tuple of constraints |
||||
:param minimize: Tuple of minimization conditions |
||||
:param maximize: Tuple of maximization conditions |
||||
:param enforce_execution_time: Bool variable which enforces --execution-timeout's time |
||||
:return: |
||||
""" |
||||
s = Optimize() |
||||
timeout = args.solver_timeout |
||||
if enforce_execution_time: |
||||
timeout = min(timeout, time_handler.time_remaining() - 500) |
||||
if timeout <= 0: |
||||
raise UnsatError |
||||
s.set_timeout(timeout) |
||||
for constraint in constraints: |
||||
if type(constraint) == bool and not constraint: |
||||
raise UnsatError |
||||
|
||||
constraints = [constraint for constraint in constraints if type(constraint) != bool] |
||||
|
||||
for constraint in constraints: |
||||
s.add(constraint) |
||||
for e in minimize: |
||||
s.minimize(e) |
||||
for e in maximize: |
||||
s.maximize(e) |
||||
result = s.check() |
||||
if result == sat: |
||||
return s.model() |
||||
elif result == unknown: |
||||
log.debug("Timeout encountered while solving expression using z3") |
||||
raise UnsatError |
@ -0,0 +1,13 @@ |
||||
class Args: |
||||
""" |
||||
This module helps in preventing args being sent through multiple of classes to reach |
||||
any analysis/laser module |
||||
""" |
||||
|
||||
def __init__(self): |
||||
self.solver_timeout = 10000 |
||||
self.sparse_pruning = True |
||||
self.unconstrained_storage = False |
||||
|
||||
|
||||
args = Args() |
@ -0,0 +1,20 @@ |
||||
import pytest |
||||
from mythril.laser.ethereum.strategy.extensions.bounded_loops import ( |
||||
BoundedLoopsStrategy, |
||||
) |
||||
|
||||
|
||||
@pytest.mark.parametrize( |
||||
"trace, count", |
||||
[ |
||||
([6, 7, 7, 7], 3), |
||||
([6, 8, 6, 7, 6, 7, 6, 7, 6, 7], 4), |
||||
([6, 6, 6, 6], 4), |
||||
([6, 7, 8] * 10, 10), |
||||
([7, 9, 10] + list(range(1, 100)) * 100, 100), |
||||
([7, 10, 15], 0), |
||||
([7] * 100, 100), |
||||
], |
||||
) |
||||
def test_loop_count(trace, count): |
||||
assert count == BoundedLoopsStrategy.get_loop_count(trace) |
Loading…
Reference in new issue