Test issue constraints at the end of each transaction

fix/constraints-after-issues
Nathan 5 years ago
parent 00dc912f2c
commit bb3aabdb09
  1. 2
      mythril/analysis/modules/base.py
  2. 29
      mythril/analysis/modules/deprecated_ops.py
  3. 56
      mythril/analysis/modules/ether_thief.py
  4. 1
      mythril/analysis/modules/external_calls.py
  5. 31
      mythril/analysis/modules/state_change_external_calls.py
  6. 108
      mythril/analysis/potential_issues.py
  7. 4
      mythril/laser/ethereum/svm.py

@ -3,6 +3,8 @@ modules."""
import logging
from typing import List, Set
from mythril.analysis.potential_issues import PotentialIssue
from mythril.analysis.report import Issue
log = logging.getLogger(__name__)

@ -1,6 +1,8 @@
"""This module contains the detection code for deprecated op codes."""
from mythril.analysis.report import Issue
from mythril.analysis.solver import get_transaction_sequence, UnsatError
from mythril.analysis.potential_issues import (
PotentialIssue,
get_potential_issues_annotation,
)
from mythril.analysis.swc_data import DEPRECATED_FUNCTIONS_USAGE
from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.global_state import GlobalState
@ -36,12 +38,10 @@ class DeprecatedOperationsModule(DetectionModule):
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
annotation = get_potential_issues_annotation(state)
annotation.potential_issues.extend(issues)
@staticmethod
def _analyze_state(state):
def _analyze_state(self, state):
"""
:param state:
@ -76,26 +76,21 @@ class DeprecatedOperationsModule(DetectionModule):
swc_id = DEPRECATED_FUNCTIONS_USAGE
else:
return []
try:
transaction_sequence = get_transaction_sequence(
state, state.mstate.constraints
)
except UnsatError:
return []
issue = Issue(
potential_issue = PotentialIssue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=instruction["address"],
title=title,
bytecode=state.environment.code.bytecode,
detector=self,
swc_id=swc_id,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
transaction_sequence=transaction_sequence,
constraints=[],
)
return [issue]
return [potential_issue]
detector = DeprecatedOperationsModule()

@ -3,15 +3,16 @@ withdrawal."""
import logging
from copy import copy
from mythril.analysis import solver
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue
from mythril.analysis.potential_issues import (
get_potential_issues_annotation,
PotentialIssue,
)
from mythril.laser.ethereum.transaction.symbolic import (
ATTACKER_ADDRESS,
CREATOR_ADDRESS,
)
from mythril.analysis.swc_data import UNPROTECTED_ETHER_WITHDRAWAL
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.transaction import ContractCreationTransaction
@ -62,13 +63,12 @@ class EtherThief(DetectionModule):
"""
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
potential_issues = self._analyze_state(state)
annotation = get_potential_issues_annotation(state)
annotation.potential_issues.extend(potential_issues)
@staticmethod
def _analyze_state(state):
def _analyze_state(self, state):
"""
:param state:
@ -115,29 +115,23 @@ class EtherThief(DetectionModule):
state.current_transaction.caller == ATTACKER_ADDRESS,
]
try:
transaction_sequence = solver.get_transaction_sequence(state, constraints)
issue = Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=instruction["address"],
swc_id=UNPROTECTED_ETHER_WITHDRAWAL,
title="Unprotected Ether Withdrawal",
severity="High",
bytecode=state.environment.code.bytecode,
description_head="Anyone can withdraw ETH from the contract account.",
description_tail="Arbitrary senders other than the contract creator can withdraw ETH from the contract"
+ " account without previously having sent an equivalent amount of ETH to it. This is likely to be"
+ " a vulnerability.",
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
log.debug("No model found")
return []
potential_issue = PotentialIssue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=instruction["address"],
swc_id=UNPROTECTED_ETHER_WITHDRAWAL,
title="Unprotected Ether Withdrawal",
severity="High",
bytecode=state.environment.code.bytecode,
description_head="Anyone can withdraw ETH from the contract account.",
description_tail="Arbitrary senders other than the contract creator can withdraw ETH from the contract"
+ " account without previously having sent an equivalent amount of ETH to it. This is likely to be"
+ " a vulnerability.",
detector=self,
constraints=constraints,
)
return [issue]
return [potential_issue]
detector = EtherThief()

@ -15,7 +15,6 @@ from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.exceptions import UnsatError
from copy import copy
import logging
import json
log = logging.getLogger(__name__)

@ -1,6 +1,10 @@
from mythril.analysis.potential_issues import (
PotentialIssue,
get_potential_issues_annotation,
)
from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue
from mythril.laser.ethereum.state.constraints import Constraints
from mythril.laser.smt import symbol_factory, UGT, BitVec, Or
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.annotation import StateAnnotation
@ -32,10 +36,12 @@ class StateChangeCallsAnnotation(StateAnnotation):
new_annotation.state_change_states = self.state_change_states[:]
return new_annotation
def get_issue(self, global_state: GlobalState) -> Optional[Issue]:
def get_issue(
self, global_state: GlobalState, detector: DetectionModule
) -> Optional[PotentialIssue]:
if not self.state_change_states:
return None
constraints = copy(global_state.mstate.constraints)
constraints = Constraints()
gas = self.call_state.mstate.stack[-1]
to = self.call_state.mstate.stack[-2]
constraints += [
@ -50,10 +56,11 @@ class StateChangeCallsAnnotation(StateAnnotation):
try:
transaction_sequence = solver.get_transaction_sequence(
global_state, constraints
global_state, constraints + global_state.mstate.constraints
)
except UnsatError:
return None
severity = "Medium" if self.user_defined_address else "Low"
address = global_state.get_current_instruction()["address"]
logging.debug(
@ -67,7 +74,7 @@ class StateChangeCallsAnnotation(StateAnnotation):
"state change takes place. This can lead to business logic vulnerabilities."
)
return Issue(
return PotentialIssue(
contract=global_state.environment.active_account.contract_name,
function_name=global_state.environment.active_function_name,
address=address,
@ -77,7 +84,8 @@ class StateChangeCallsAnnotation(StateAnnotation):
description_tail=description_tail,
swc_id=REENTRANCY,
bytecode=global_state.environment.code.bytecode,
transaction_sequence=transaction_sequence,
constraints=constraints,
detector=detector,
)
@ -107,9 +115,9 @@ class StateChange(DetectionModule):
if state.get_current_instruction()["address"] in self._cache:
return
issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address)
self._issues.extend(issues)
annotation = get_potential_issues_annotation(state)
annotation.potential_issues.extend(issues)
@staticmethod
def _add_external_call(global_state: GlobalState) -> None:
@ -139,8 +147,7 @@ class StateChange(DetectionModule):
except UnsatError:
pass
@staticmethod
def _analyze_state(global_state: GlobalState) -> List[Issue]:
def _analyze_state(self, global_state: GlobalState) -> List[PotentialIssue]:
annotations = cast(
List[StateChangeCallsAnnotation],
@ -171,7 +178,7 @@ class StateChange(DetectionModule):
for annotation in annotations:
if not annotation.state_change_states:
continue
issue = annotation.get_issue(global_state)
issue = annotation.get_issue(global_state, self)
if issue:
vulnerabilities.append(issue)
return vulnerabilities

@ -0,0 +1,108 @@
from mythril.analysis.report import Issue
from mythril.analysis.solver import get_transaction_sequence
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.laser.ethereum.state.global_state import GlobalState
class PotentialIssue:
"""Representation of a potential issue"""
def __init__(
self,
contract,
function_name,
address,
swc_id,
title,
bytecode,
detector,
severity=None,
description_head="",
description_tail="",
constraints=None,
):
"""
:param contract: The contract
:param function_name: Function name where the issue is detected
:param address: The address of the issue
:param swc_id: Issue's corresponding swc-id
:param title: Title
:param bytecode: bytecode of the issue
:param detector: The detector the potential issue belongs to
:param gas_used: amount of gas used
:param severity: The severity of the issue
:param description_head: The top part of description
:param description_tail: The bottom part of the description
:param constraints: The non-path related constraints for the potential issue
"""
self.title = title
self.contract = contract
self.function_name = function_name
self.address = address
self.description_head = description_head
self.description_tail = description_tail
self.severity = severity
self.swc_id = swc_id
self.bytecode = bytecode
self.constraints = constraints or []
self.detector = detector
class PotentialIssuesAnnotation(StateAnnotation):
def __init__(self):
self.potential_issues = []
def get_potential_issues_annotation(state: GlobalState) -> PotentialIssuesAnnotation:
"""
Returns the potential issues annotation of the given global state, and creates one if
one does not already exist.
:param state: The global state
:return:
"""
for annotation in state.annotations:
if isinstance(annotation, PotentialIssuesAnnotation):
return annotation
annotation = PotentialIssuesAnnotation()
state.annotate(annotation)
return annotation
def check_potential_issues(state: GlobalState) -> None:
"""
Called at the end of a transaction, checks potential issues, and
adds valid issues to the detector.
:param state: The final global state of a transaction
:return:
"""
annotation = get_potential_issues_annotation(state)
for potential_issue in annotation.potential_issues[:]:
try:
transaction_sequence = get_transaction_sequence(
state, state.mstate.constraints + potential_issue.constraints
)
except UnsatError:
continue
annotation.potential_issues.remove(potential_issue)
potential_issue.detector._cache.add(potential_issue.address)
potential_issue.detector._issues.append(
Issue(
contract=potential_issue.contract,
function_name=potential_issue.function_name,
address=potential_issue.address,
title=potential_issue.title,
bytecode=potential_issue.bytecode,
swc_id=potential_issue.swc_id,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
severity=potential_issue.severity,
description_head=potential_issue.description_head,
description_tail=potential_issue.description_tail,
transaction_sequence=transaction_sequence,
)
)

@ -5,6 +5,7 @@ from copy import copy
from datetime import datetime, timedelta
from typing import Callable, Dict, DefaultDict, List, Tuple, Optional
from mythril.analysis.potential_issues import check_potential_issues
from mythril.laser.ethereum.cfg import NodeFlags, Node, Edge, JumpType
from mythril.laser.ethereum.evm_exceptions import StackUnderflowException
from mythril.laser.ethereum.evm_exceptions import VmException
@ -342,6 +343,8 @@ class LaserEVM:
not isinstance(transaction, ContractCreationTransaction)
or transaction.return_data
) and not end_signal.revert:
check_potential_issues(global_state)
end_signal.global_state.world_state.node = global_state.node
self._add_world_state(end_signal.global_state)
new_global_states = []
@ -375,7 +378,6 @@ class LaserEVM:
:param return_data:
:return:
"""
return_global_state.mstate.constraints += global_state.mstate.constraints
# Resume execution of the transaction initializing instruction
op_code = return_global_state.environment.code.instruction_list[

Loading…
Cancel
Save