Add summaries and additional bugfixes (#1830)

* Add summaries and additional bugfixes

* Update test w.r.t bugfixes

* Update namespace

* Disable sonar
z3
Nikhil Parasaram 10 months ago committed by GitHub
parent f61c2d4515
commit 205d50a50f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      .circleci/config.yml
  2. 20
      mythril/analysis/report.py
  3. 3
      mythril/analysis/symbolic.py
  4. 5
      mythril/interfaces/cli.py
  5. 1
      mythril/laser/plugin/plugins/__init__.py
  6. 1
      mythril/laser/plugin/plugins/summary/__init__.py
  7. 44
      mythril/laser/plugin/plugins/summary/annotations.py
  8. 439
      mythril/laser/plugin/plugins/summary/core.py
  9. 151
      mythril/laser/plugin/plugins/summary/summary.py
  10. 1
      mythril/mythril/mythril_analyzer.py
  11. 1
      mythril/support/support_args.py
  12. 10
      tests/analysis/abi_decode_test.py
  13. 1
      tests/graph_test.py
  14. 114
      tests/integration_tests/summary_test.py
  15. 1
      tests/mythril/mythril_analyzer_test.py
  16. 1
      tests/statespace_test.py
  17. 21
      tests/testdata/input_contracts/base_case.sol
  18. 88
      tests/testdata/input_contracts/complex.sol
  19. 88
      tests/testdata/input_contracts/destruct.sol
  20. 15
      tests/testdata/input_contracts/hash_test.sol
  21. 14
      tests/testdata/input_contracts/large.sol
  22. 8
      tests/testdata/input_contracts/simple_theft.sol
  23. 25
      tests/testdata/input_contracts/theft.sol

@ -72,15 +72,15 @@ jobs:
command: python3 -m build
working_directory: /home/mythril
- run:
name: Sonar analysis
command: if [ -z "$CIRCLE_PR_NUMBER" ]; then if [ -z "$CIRCLE_TAG" ]; then
sonar-scanner -Dsonar.projectKey=$SONAR_PROJECT_KEY
-Dsonar.organization=$SONAR_ORGANIZATION
-Dsonar.branch.name=$CIRCLE_BRANCH
-Dsonar.projectBaseDir=/home/mythril -Dsonar.sources=mythril
-Dsonar.host.url=$SONAR_HOST_URL -Dsonar.tests=/home/mythril/tests
-Dsonar.login=$SONAR_LOGIN; fi; fi
#- run:
# name: Sonar analysis
# command: if [ -z "$CIRCLE_PR_NUMBER" ]; then if [ -z "$CIRCLE_TAG" ]; then
# sonar-scanner -Dsonar.projectKey=$SONAR_PROJECT_KEY
# -Dsonar.organization=$SONAR_ORGANIZATION
# -Dsonar.branch.name=$CIRCLE_BRANCH
# -Dsonar.projectBaseDir=/home/mythril -Dsonar.sources=mythril
# -Dsonar.host.url=$SONAR_HOST_URL -Dsonar.tests=/home/mythril/tests
# -Dsonar.login=$SONAR_LOGIN; fi; fi
# - run:
# name: Integration tests
# command: if [ -z "$CIRCLE_PR_NUMBER" ]; then ./run-integration-tests.sh; fi

@ -1,4 +1,5 @@
"""This module provides classes that make up an issue report."""
import base64
import logging
import re
import json
@ -10,7 +11,7 @@ except ImportError:
from eth_abi import decode_abi as decode
from jinja2 import PackageLoader, Environment
from typing import Dict, List, Any, Optional
from typing import Dict, Iterable, List, Any, Optional
import hashlib
from mythril.laser.execution_info import ExecutionInfo
@ -236,11 +237,28 @@ class Issue:
data += "0" * (64 - len(data) % 64)
try:
decoded_output = decode(type_info, bytes.fromhex(data))
decoded_output = tuple(
convert_bytes(item) if isinstance(item, (bytes, Iterable)) else item
for item in decoded_output
)
return decoded_output
except Exception as e:
return None
def convert_bytes(item):
"""
Converts bytes to a serializable format. Handles nested iterables.
"""
if isinstance(item, bytes):
return item.hex()
elif isinstance(item, Iterable) and not isinstance(item, (str, bytes)):
# Recursively apply convert_bytes to each item in the iterable
return type(item)(convert_bytes(subitem) for subitem in item)
else:
return item
class Report:
"""A report containing the content of multiple issues."""

@ -26,6 +26,7 @@ from mythril.laser.plugin.plugins import (
CoveragePluginBuilder,
CallDepthLimitBuilder,
InstructionProfilerBuilder,
SymbolicSummaryPluginBuilder,
)
from mythril.laser.ethereum.strategy.extensions.bounded_loops import (
BoundedLoopsStrategy,
@ -148,6 +149,8 @@ class SymExecWrapper:
plugin_loader.load(MutationPrunerBuilder())
if not args.disable_iprof:
plugin_loader.load(InstructionProfilerBuilder())
if args.enable_summaries:
plugin_loader.load(SymbolicSummaryPluginBuilder())
plugin_loader.load(CallDepthLimitBuilder())
plugin_loader.add_args(

@ -572,6 +572,11 @@ def add_analysis_args(options):
action="store_true",
help="Disable mutation pruner",
)
options.add_argument(
"--enable-summaries",
action="store_true",
help="Enable using symbolic summaries",
)
options.add_argument(
"--custom-modules-directory",
help="Designates a separate directory to search for custom analysis modules",

@ -11,3 +11,4 @@ from mythril.laser.plugin.plugins.dependency_pruner import DependencyPrunerBuild
from mythril.laser.plugin.plugins.mutation_pruner import MutationPrunerBuilder
from mythril.laser.plugin.plugins.call_depth_limiter import CallDepthLimitBuilder
from mythril.laser.plugin.plugins.instruction_profiler import InstructionProfilerBuilder
from mythril.laser.plugin.plugins.summary import SymbolicSummaryPluginBuilder

@ -0,0 +1 @@
from .core import SymbolicSummaryPluginBuilder

@ -0,0 +1,44 @@
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.smt import Bool, BaseArray
from typing import List, Tuple
from copy import deepcopy, copy
class SummaryTrackingAnnotation(StateAnnotation):
"""SummaryTrackingAnnotation
This annotation is used by the symbolic summary plugin to keep track of data related to a summary that
will be computed during the future exploration of the annotated world state.
"""
def __init__(
self,
entry: GlobalState,
storage_pairs: List[Tuple[BaseArray, BaseArray]],
storage_constraints: List[Bool],
environment_pair: Tuple[Environment, Environment],
balance_pair: Tuple[BaseArray, BaseArray],
code: str,
):
self.entry = entry
self.trace = []
self.storage_pairs = storage_pairs
self.storage_constraints = storage_constraints
self.environment_pair = environment_pair
self.balance_pair = balance_pair
self.code = code
def __copy__(self):
annotation = SummaryTrackingAnnotation(
entry=self.entry,
storage_pairs=deepcopy(self.storage_pairs),
storage_constraints=deepcopy(self.storage_constraints),
environment_pair=deepcopy(self.environment_pair),
balance_pair=deepcopy(self.balance_pair),
code=self.code,
)
annotation.trace = self.trace
return annotation

@ -0,0 +1,439 @@
from .summary import SymbolicSummary, substitute_exprs
from .annotations import SummaryTrackingAnnotation
from mythril.analysis.issue_annotation import IssueAnnotation
from mythril.analysis.potential_issues import check_potential_issues
from mythril.analysis import solver
from mythril.analysis.solver import get_transaction_sequence
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.svm import LaserEVM
from mythril.laser.plugin.builder import PluginBuilder
from mythril.laser.plugin.interface import LaserPlugin
from mythril.laser.plugin.signals import PluginSkipState
from mythril.laser.plugin.plugins.plugin_annotations import MutationAnnotation
from mythril.laser.ethereum.transaction.transaction_models import (
ContractCreationTransaction,
BaseTransaction,
)
from mythril.support.support_utils import get_code_hash
from mythril.laser.ethereum.function_managers import (
keccak_function_manager,
KeccakFunctionManager,
)
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.constraints import Constraints
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.calldata import SymbolicCalldata
from mythril.laser.ethereum.state.account import Account
from mythril.laser.smt import (
K,
Array,
BaseArray,
Bool,
simplify,
Solver,
Not,
Or,
symbol_factory,
Expression,
)
from mythril.support.support_args import args
import z3
from typing import Dict, Tuple, List, Optional, Set
from copy import copy, deepcopy
from mythril.support.model import get_model
import logging
log = logging.getLogger(__name__)
class SymbolicSummaryPluginBuilder(PluginBuilder):
name = "Symbolic Summaries"
def __call__(self, *args, **kwargs):
return SymbolicSummaryPlugin()
class SymbolicSummaryPlugin(LaserPlugin):
def __init__(self):
self.summaries = []
args.use_issue_annotations = True
self.issue_cache: Set[Tuple[str, str, int]] = set()
self.init_save_states = []
self.save_init_balance = None
def initialize(self, symbolic_vm: LaserEVM):
"""Initializes the symbolic summary generating plugin
Introduces hooks for each instruction
:param symbolic_vm: the symbolic virtual machine to initialize this plugin for
"""
"""
@symbolic_vm.laser_hook("start_execute_transactions"):
def start_exec_txs_hook():
log.info(f"Started executing transactions")
symbolic_vm.executed_transactions = True
"""
@symbolic_vm.laser_hook("stop_sym_exec")
def stop_sym_exec_hook():
# Print results
log.info(f"Generated {len(self.summaries)} summaries")
@symbolic_vm.laser_hook("execute_state")
def execute_start_sym_trans_hook(global_state: GlobalState):
if global_state.mstate.pc == 0:
if len(global_state.world_state.transaction_sequence) == 2:
self.init_save_states.append(deepcopy(global_state))
self._apply_summaries(laser_evm=symbolic_vm, global_state=global_state)
self.save_init_balance = deepcopy(global_state.world_state.balances)
self._summary_entry(global_state)
@symbolic_vm.post_hook("JUMPI")
@symbolic_vm.post_hook("JUMP")
def call_mutator_hook(global_state: GlobalState):
for annotation in global_state.get_annotations(SummaryTrackingAnnotation):
annotation.trace.append(global_state.instruction["address"])
@symbolic_vm.laser_hook("transaction_end")
def transaction_end_hook(
global_state: GlobalState,
transaction: BaseTransaction,
return_global_state: Optional[GlobalState],
revert: bool = True,
):
if return_global_state is not None:
return
if (
not isinstance(transaction, ContractCreationTransaction)
or transaction.return_data
) and (not revert or list(global_state.get_annotations(IssueAnnotation))):
check_potential_issues(global_state)
self._summary_exit(global_state, transaction, revert)
def _summary_entry(self, global_state: GlobalState):
"""Handles logic for when the analysis reaches an entry point of a to-be recorded symbolic summary
:param global_state: The state at the entry of the symbolic summary
"""
summary_constraints = []
storage_pairs = []
# Rewrite storage
for index, account in global_state.world_state.accounts.items():
actual_storage = deepcopy(account.storage._standard_storage)
symbolic_storage = Array(f"{index}_symbolic_storage", 256, 256)
account.storage._standard_storage = symbolic_storage
storage_pairs.append((actual_storage, symbolic_storage))
account.storage.keys_get = set()
account.storage.keys_set = set()
# Rewrite balances
previous_balances, summary_balances = (
global_state.world_state.balances,
Array("summary_balance", 256, 256),
)
global_state.world_state.balances = summary_balances
balances_pair = (previous_balances, summary_balances)
# Rewrite environment
previous_environment = global_state.environment
summary_environment = self._create_summary_environment(previous_environment)
environment_pair = (previous_environment, summary_environment)
# Construct the summary tracking annotation
summary_annotation = SummaryTrackingAnnotation(
global_state,
storage_pairs,
summary_constraints,
environment_pair,
balances_pair,
global_state.environment.code.bytecode,
)
# Introduce annotation and constraints to the global state
for constraint in summary_constraints:
global_state.world_state.constraints.append(constraint)
global_state.annotate(summary_annotation)
def _create_summary_environment(self, base_environment: Environment) -> Environment:
return Environment(
# No need to rewrite, accounts are handled in other procedure
active_account=base_environment.active_account,
# Need to rewrite, different symbol for each transaction
sender=symbol_factory.BitVecSym("summary_sender", 256),
# Need to rewrite, different symbol for each transaction
origin=symbol_factory.BitVecSym("summary_origin", 256),
# Need to rewrite, different symbol for each transaction
calldata=SymbolicCalldata("summary"),
# Need to rewrite, different symbol for each transaction
gasprice=symbol_factory.BitVecSym("summary_origin", 256),
# Need to rewrite, different symbol for each transaction
callvalue=symbol_factory.BitVecSym("summary_callvalue", 256),
# No need to rewrite, this can be inherited from the original environment
static=base_environment.static,
# No need to rewrite, this can be inherited from the original environment
code=base_environment.code,
basefee=base_environment.basefee,
)
@classmethod
def _restore_environment(
cls,
summary_tracking_annotation: SummaryTrackingAnnotation,
global_state: GlobalState,
):
global_state.environment = summary_tracking_annotation.environment_pair[0]
original, summary = summary_tracking_annotation.environment_pair
# Rewrite sender
cls._rewrite(global_state, summary.sender, original.sender)
# Rewrite origin
cls._rewrite(global_state, summary.origin, original.origin)
# Rewrite calldata
cls._rewrite(
global_state, summary.calldata.calldatasize, original.calldata.calldatasize
)
cls._rewrite(
global_state, summary.calldata._calldata, original.calldata._calldata
)
# Rewrite gasprice
cls._rewrite(global_state, summary.gasprice, original.gasprice)
# Rewrite callvalue
cls._rewrite(global_state, summary.callvalue, original.callvalue)
def check_for_issues(self, global_state):
for summary in self.summaries:
for issue in summary.issues:
self._check_issue(global_state, issue)
def storage_dependent(self, summary, global_state: GlobalState) -> bool:
"""
Checks if storage of summary depends on global state's previous storage stores
"""
total_key_set = set()
for index in global_state.accounts:
total_key_set = total_key_set.union(
global_state.accounts[index].storage.keys_set
)
if len(global_state.world_state.transaction_sequence) <= 3:
return True
for index, storage_get in summary.get_map.items():
for key in storage_get:
if key.symbolic is False:
if key in global_state.accounts[index].storage.keys_set:
return True
else:
for state_key in global_state.accounts[index].storage.keys_set:
s = Solver()
s.set_timeout(3000)
s.add(state_key == key)
s.add(keccak_function_manager.create_conditions())
sat = s.check() == z3.sat
if sat:
return True
return False
def _apply_summaries(self, laser_evm: LaserEVM, global_state: GlobalState):
"""
Applies summaries on the EVM
"""
pc = global_state.instruction["address"]
self.check_for_issues(global_state)
summaries = [
summary
for summary in self.summaries
if summary.entry == pc
and summary.code == global_state.environment.code.bytecode
and not summary.revert
and summary.storage_effect
]
for summary in summaries:
resulting_state = summary.apply_summary(global_state)
if resulting_state:
laser_evm._add_world_state(resulting_state[0])
if summaries:
raise PluginSkipState
def issue_in_cache(
self, global_state: GlobalState, issue_annotation: IssueAnnotation
) -> bool:
address = (
issue_annotation.issue.source_location or issue_annotation.issue.address
)
return (
issue_annotation.detector.swc_id,
address,
get_code_hash(global_state.environment.code.bytecode),
) in self.issue_cache
def _check_issue(
self, global_state: GlobalState, issue_annotation: IssueAnnotation
):
if self.issue_in_cache(global_state, issue_annotation):
return
success = 0
tx_seq = []
for constraint in issue_annotation.conditions:
condition = self._translate_condition(
global_state,
[constraint, deepcopy(keccak_function_manager.create_conditions())],
)
condition = [
expr
for expr in global_state.world_state.constraints.as_list + condition
]
try:
tx_seq = get_transaction_sequence(global_state, Constraints(condition))
success += 1
except UnsatError:
break
if success == len(issue_annotation.conditions):
log.info("Found an issue")
new_issue = copy(issue_annotation.issue)
new_issue.transaction_sequence = tx_seq
issue_annotation.detector.issues += [new_issue]
addresss = (
issue_annotation.issue.source_location or issue_annotation.issue.address
)
self.issue_cache.add(
(
issue_annotation.detector.swc_id,
addresss,
get_code_hash(global_state.environment.code.bytecode),
)
)
def _translate_condition(self, global_state: GlobalState, condition: List[Bool]):
condition = deepcopy(condition)
for account_id, account in global_state.world_state.accounts.items():
for expression in condition:
substitute_exprs(expression, account_id, account, global_state)
return condition
def _summary_exit(
self, global_state: GlobalState, transaction: BaseTransaction, revert: bool
):
"""Handles logic for when the analysis reaches the summary exit
This function populates self.summaries with the discovered symbolic summaries
:param global_state: The state at the exit of the discovered symbolic summary
"""
summary_annotation = self._get_and_remove_summary_tracking_annotation(
global_state
)
if not summary_annotation:
log.error("Missing Annotation")
return
self._record_symbolic_summary(
global_state, summary_annotation, transaction, revert
)
self._restore_previous_state(global_state, summary_annotation)
@staticmethod
def _get_and_remove_summary_tracking_annotation(
global_state: GlobalState,
) -> Optional[SummaryTrackingAnnotation]:
"""Retrieves symbolic summary from the global state"""
summary_annotation: List[SummaryTrackingAnnotation] = list(
global_state.get_annotations(SummaryTrackingAnnotation)
)
if len(summary_annotation) != 1:
logging.warning(
f"Unexpected number of summary tracking annotations found: {len(summary_annotation)}\nSkipping..."
)
summary_annotation: SummaryTrackingAnnotation = summary_annotation[0]
global_state.annotations.remove(summary_annotation)
return summary_annotation
def _record_symbolic_summary(
self,
global_state: GlobalState,
tracking_annotation: SummaryTrackingAnnotation,
transaction: BaseTransaction,
revert,
):
"""Records a summary between tracking_annotation.entry and global_state"""
if (
len(list(global_state.get_annotations(MutationAnnotation))) == 0
and list(global_state.get_annotations(IssueAnnotation)) == 0
):
return
storage_mutations = []
return_value = transaction.return_data
set_map = {}
get_map = {}
for index, account in global_state.world_state.accounts.items():
if account.storage._standard_storage not in [
p[1] for p in tracking_annotation.storage_pairs
]:
get_map[account.address] = account.storage.keys_get
set_map[account.address] = account.storage.keys_set
storage_mutations.append(
(index, copy(account.storage._standard_storage))
)
condition = global_state.world_state.constraints.get_all_constraints()
for constraint in tracking_annotation.entry.world_state.constraints:
condition.remove(constraint)
annotations = list(global_state.get_annotations(IssueAnnotation))
summary = SymbolicSummary(
storage_effect=deepcopy(storage_mutations),
balance_effect=copy(global_state.world_state.balances),
condition=deepcopy(condition),
return_value=return_value,
entry=tracking_annotation.entry.mstate.pc,
exit=global_state.mstate.pc,
trace=tracking_annotation.trace,
code=tracking_annotation.code,
issues=list(global_state.get_annotations(IssueAnnotation)),
revert=revert,
get_map=get_map,
set_map=set_map,
)
log.debug(list(global_state.get_annotations(IssueAnnotation)))
# Calculate issues for the first transaction
if len(global_state.world_state.transaction_sequence) == 2:
for state in self.init_save_states:
for issue in summary.issues:
self._check_issue(state, issue)
self.summaries.append(summary)
@classmethod
def _restore_previous_state(
cls, global_state: GlobalState, tracking_annotation: SummaryTrackingAnnotation
):
"""Restores the previous persistent variables to the global state"""
for og_storage, sym_storage in tracking_annotation.storage_pairs:
cls._rewrite(global_state, sym_storage, og_storage)
cls._rewrite(
global_state,
tracking_annotation.balance_pair[1],
tracking_annotation.balance_pair[0],
)
cls._restore_environment(tracking_annotation, global_state)
@staticmethod
def _rewrite(global_state: GlobalState, original: Expression, new: Expression):
for account in global_state.world_state.accounts.values():
account.storage._standard_storage.substitute(original, new)
global_state.world_state.balances.substitute(original, new)
for constraint in global_state.world_state.constraints:
constraint.substitute(original, new)

@ -0,0 +1,151 @@
from mythril.laser.smt import BaseArray, Array, Solver, symbol_factory
from mythril.support.support_args import args
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.plugin.plugins.plugin_annotations import MutationAnnotation
from copy import deepcopy
import logging
import z3
log = logging.getLogger(__name__)
class SymbolicSummary:
"""Symbolic Summary
A symbolic summary is an awesome construct that allows mythril to record and re-use partial analysis results
"""
def __init__(
self,
storage_effect,
balance_effect,
condition,
return_value,
entry,
exit,
trace,
code,
issues,
revert,
set_map=None,
get_map=None,
):
self.storage_effect = storage_effect
self.balance_effect = balance_effect
self.condition = condition
self.return_value = return_value
self.entry = entry
self.exit = exit
self.trace = trace
self.code = code
self.issues = issues
self.revert = revert
self.set_map = set_map
self.get_map = get_map
@property
def as_csv(self, delimiter=",", sub_array_delimiter=";", tuple_delimiter=":"):
condition = sub_array_delimiter.join(map(str, self.condition))
storage_effect = sub_array_delimiter.join(
[f"{ap[0]}{tuple_delimiter}{ap[1]}" for ap in self.storage_effect]
)
return_value = None
trace = sub_array_delimiter.join(map(str, self.trace))
return (
delimiter.join(
map(
str,
[
self.entry,
condition,
self.exit,
storage_effect,
return_value,
trace,
],
)
)
.replace("\n", "")
.replace(" ", "")
)
@property
def as_dict(self):
return dict(
entry=self.entry,
condition=list(map(str, self.condition)),
exit=self.exit,
storage_effect=list(map(str, self.storage_effect)),
balance_effect=str(self.balance_effect),
return_value=self.return_value,
trace=self.trace[:],
code=self.code,
issues=len(self.issues),
revert=self.revert,
)
def apply_summary(self, global_state: GlobalState):
# Copy and apply summary
global_state = deepcopy(global_state)
conditions = deepcopy(self.condition)
for account_id, account in global_state.world_state.accounts.items():
for expression in conditions:
substitute_exprs(expression, account_id, account, global_state)
for account_id, effect in self.storage_effect:
account = global_state.world_state.accounts[account_id]
new_storage = deepcopy(effect)
substitute_exprs(new_storage, account_id, account, global_state)
account.storage._standard_storage = new_storage
new_balances = deepcopy(self.balance_effect)
new_balances.substitute(
Array("summary_balance", 256, 256), global_state.world_state.balances
)
global_state.world_state.balances = new_balances
# Set constraints
global_state.world_state.constraints += [c for c in conditions]
# Check Condition
solver = Solver()
solver.set_timeout(args.solver_timeout)
solver.add(*(global_state.world_state.constraints.as_list))
sat = solver.check() == z3.sat
if not sat:
return []
global_state.node.constraints = global_state.world_state.constraints
global_state.world_state.node = global_state.node
global_state.annotate(MutationAnnotation())
return [global_state]
def substitute_exprs(expression, account_id, account, global_state):
a = Array("2_calldata", 256, 8)
b = Array(f"{global_state.current_transaction.id}_calldata", 256, 8)
expression.substitute(a, b)
a = symbol_factory.BitVecSym("2_calldatasize", 256)
b = symbol_factory.BitVecSym(
f"{global_state.current_transaction.id}_calldatasize", 256
)
expression.substitute(a, b)
a = symbol_factory.BitVecSym("sender_2", 256)
b = symbol_factory.BitVecSym(f"sender_{global_state.current_transaction.id}", 256)
expression.substitute(a, b)
a = symbol_factory.BitVecSym("call_value2", 256)
b = symbol_factory.BitVecSym(
f"call_value{global_state.current_transaction.id}", 256
)
expression.substitute(a, b)
a = Array(f"{account_id}_symbolic_storage", 256, 256)
b = account.storage._standard_storage
expression.substitute(a, b)
a = Array("summary_balance", 256, 256)
b = global_state.world_state.balances
expression.substitute(a, b)

@ -72,6 +72,7 @@ class MythrilAnalyzer:
args.transaction_sequences = cmd_args.transaction_sequences
args.disable_coverage_strategy = cmd_args.disable_coverage_strategy
args.disable_mutation_pruner = cmd_args.disable_mutation_pruner
args.enable_summaries = cmd_args.enable_summaries
if args.pruning_factor is None:
if self.execution_timeout > LARGE_TIME:

@ -23,6 +23,7 @@ class Args(object, metaclass=Singleton):
self.disable_coverage_strategy = False
self.disable_mutation_pruner = False
self.incremental_txs = True
self.enable_summaries = False
args = Args()

@ -30,17 +30,17 @@ test_data = (
"0xa0cce1b3000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000090000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000",
"func(bytes32,(bytes32,bytes32,uint8,uint8)[],(address[],uint32))",
(
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02",
"0000000000000000000000000000000000000000000000000000000000000002",
(
(
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x90",
"0000000000000000000000000000000000000000000000000000000000000000",
"0000000000000000000000000000000000000000000000000000000000000090",
0,
0,
),
(
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00P\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
"0000000000000000000000000000005000000000000000000000000000000000",
"0000000000000000000000000000000000000000000000000000000000000000",
1,
0,
),

@ -34,6 +34,7 @@ def test_generate_graph():
transaction_sequences=None,
disable_coverage_strategy=False,
disable_mutation_pruner=False,
enable_summaries=False,
)
analyzer = MythrilAnalyzer(
disassembler=disassembler,

@ -0,0 +1,114 @@
import pytest
import json
import sys
import os
from tests import PROJECT_DIR, TESTDATA
from subprocess import check_output, CalledProcessError
MYTH = str(PROJECT_DIR / "myth")
def output_of(command):
"""
:param command:
:return:
"""
try:
return check_output(command, shell=True).decode("UTF-8")
except CalledProcessError as exc:
return exc.output.decode("UTF-8")
test_data = (
# TODO: The commented tests should be sped up!
# (
# "destruct.sol",
# {
# "TX_COUNT": 5,
# "MODULE": "AccidentallyKillable",
# "ISSUE_COUNT": 1,
# "VERSION": "v0.5.0",
# },
# ),
# (
# "destruct.sol",
# {
# "TX_COUNT": 4,
# "MODULE": "AccidentallyKillable",
# "ISSUE_COUNT": 0,
# "VERSION": "v0.5.0",
# },
# ),
(
"theft.sol",
{"TX_COUNT": 4, "MODULE": "EtherThief", "ISSUE_COUNT": 1, "VERSION": "v0.5.0"},
),
(
"theft.sol",
{"TX_COUNT": 3, "MODULE": "EtherThief", "ISSUE_COUNT": 0, "VERSION": "v0.5.0"},
),
(
"large.sol",
{
"TX_COUNT": 11,
"MODULE": "AccidentallyKillable",
"ISSUE_COUNT": 1,
"VERSION": "v0.5.0",
},
),
(
"large.sol",
{
"TX_COUNT": 10,
"MODULE": "AccidentallyKillable",
"ISSUE_COUNT": 0,
"VERSION": "v0.5.0",
},
),
(
"hash_test.sol",
{
"TX_COUNT": 2,
"MODULE": "AccidentallyKillable",
"ISSUE_COUNT": 1,
"VERSION": "v0.4.24",
},
),
(
"complex.sol",
{
"TX_COUNT": 2,
"MODULE": "AccidentallyKillable",
"ISSUE_COUNT": 1,
"VERSION": "v0.5.0",
},
),
(
"base_case.sol",
{
"TX_COUNT": 1,
"MODULE": "AccidentallyKillable",
"ISSUE_COUNT": 1,
"VERSION": "v0.5.0",
},
),
(
"simple_theft.sol",
{
"TX_COUNT": 1,
"MODULE": "EtherThief",
"ISSUE_COUNT": 0,
"VERSION": "v0.5.0",
},
),
)
@pytest.mark.parametrize("file_name, tx_data", test_data)
def test_analysis(file_name, tx_data):
file_path = str(TESTDATA / "input_contracts" / file_name)
command = f"""python3 {MYTH} analyze {file_path} -t {tx_data["TX_COUNT"]} -o jsonv2 -m {tx_data["MODULE"]} --solver-timeout 60000 --solv {tx_data["VERSION"]} --execution-timeout 300 --enable-summaries"""
output = json.loads(output_of(command))
assert len(output[0]["issues"]) == tx_data["ISSUE_COUNT"]

@ -41,6 +41,7 @@ def test_fire_lasers(mock_sym, mock_fire_lasers, mock_code_info):
transaction_sequences=None,
disable_coverage_strategy=False,
disable_mutation_pruner=False,
enable_summaries=False,
)
analyzer = MythrilAnalyzer(disassembler, cmd_args=args)

@ -31,6 +31,7 @@ def test_statespace_dump():
transaction_sequences=None,
disable_coverage_strategy=False,
disable_mutation_pruner=False,
enable_summaries=False,
)
analyzer = MythrilAnalyzer(
disassembler=disassembler,

@ -0,0 +1,21 @@
contract B{
uint x=0;
function incr() public returns(uint){
require(x==0);
x += 1;
}
function incr2() public payable returns(uint){
require(x==1);
x += 1;
}
function continous_incr() public payable returns(uint){
require(x>=2);
x += 1;
}
function destroy() public returns(uint){
selfdestruct(msg.sender);
}
}

@ -0,0 +1,88 @@
pragma solidity 0.5.0;
contract WalletLibrary {
struct PendingState {
uint yetNeeded;
uint ownersDone;
uint index;
}
struct Transaction {
address to;
uint value;
bytes data;
}
modifier onlymanyowners(bytes32 _operation) {
if (confirmAndCheck(_operation))
_;
}
function initMultiowned(address[] memory _owners, uint _required) public only_uninitialized {
m_numOwners = _owners.length + 1;
m_owners[1] = uint(msg.sender);
m_ownerIndex[uint(msg.sender)] = 1;
for (uint i = 0; i < _owners.length; ++i)
{
m_owners[2 + i] = uint(_owners[i]);
m_ownerIndex[uint(_owners[i])] = 2 + i;
}
m_required = _required;
}
modifier only_uninitialized { require(m_numOwners == 0); _; }
function kill(address payable _to) onlymanyowners(keccak256(msg.data)) external {
selfdestruct(_to);
}
function confirmAndCheck(bytes32 _operation) internal returns (bool) {
uint ownerIndex = m_ownerIndex[uint(msg.sender)];
if (ownerIndex == 0) return false;
PendingState memory pending = m_pending[_operation];
if (pending.yetNeeded == 0) {
pending.yetNeeded = m_required;
pending.ownersDone = 0;
pending.index = m_pendingIndex.length++;
m_pendingIndex[pending.index] = _operation;
}
uint ownerIndexBit = 2**ownerIndex;
if (pending.ownersDone & ownerIndexBit == 0) {
if (pending.yetNeeded <= 1) {
delete m_pendingIndex[m_pending[_operation].index];
delete m_pending[_operation];
return true;
}
else
{
// not enough: record that this owner in particular confirmed.
pending.yetNeeded--;
pending.ownersDone |= ownerIndexBit;
}
}
}
uint public m_required;
uint public m_numOwners;
// list of owners
uint[256] m_owners;
mapping(uint => uint) m_ownerIndex;
mapping(bytes32 => PendingState) m_pending;
bytes32[] m_pendingIndex;
}

@ -0,0 +1,88 @@
pragma solidity 0.5.0;
contract WalletLibrary {
struct PendingState {
uint yetNeeded;
uint ownersDone;
uint index;
}
struct Transaction {
address to;
uint value;
bytes data;
}
modifier onlymanyowners(bytes32 _operation) {
if (confirmAndCheck(_operation))
_;
}
function initMultiowned(address[] memory _owners, uint _required) public only_uninitialized {
m_numOwners = _owners.length + 1;
m_owners[1] = uint(msg.sender);
m_ownerIndex[uint(msg.sender)] = 1;
for (uint i = 0; i < _owners.length; ++i)
{
m_owners[2 + i] = uint(_owners[i]);
m_ownerIndex[uint(_owners[i])] = 2 + i;
}
m_required = _required;
}
modifier only_uninitialized { require(m_numOwners == 0); _; }
function kill(address payable _to) onlymanyowners(keccak256(msg.data)) external {
selfdestruct(_to);
}
function confirmAndCheck(bytes32 _operation) internal returns (bool) {
uint ownerIndex = m_ownerIndex[uint(msg.sender)];
if (ownerIndex == 0) return false;
PendingState memory pending = m_pending[_operation];
if (pending.yetNeeded == 0) {
pending.yetNeeded = m_required;
pending.ownersDone = 0;
pending.index = m_pendingIndex.length++;
m_pendingIndex[pending.index] = _operation;
}
uint ownerIndexBit = 2**ownerIndex;
if (pending.ownersDone & ownerIndexBit == 0) {
if (pending.yetNeeded <= 1) {
delete m_pendingIndex[m_pending[_operation].index];
delete m_pending[_operation];
return true;
}
else
{
// not enough: record that this owner in particular confirmed.
pending.yetNeeded--;
pending.ownersDone |= ownerIndexBit;
}
}
}
uint public m_required;
uint public m_numOwners;
// list of owners
uint[256] m_owners;
mapping(uint => uint) m_ownerIndex;
mapping(bytes32 => PendingState) m_pending;
bytes32[] m_pendingIndex;
}

@ -0,0 +1,15 @@
contract StorageTest {
mapping(bytes32 => address) data;
function confirmAndCheck(uint256 x) public{
data[keccak256(abi.encodePacked(x))] = msg.sender;
}
function destruct(bytes32 x) public{
require(data[x] == msg.sender);
selfdestruct(data[x]);
}
}

@ -0,0 +1,14 @@
contract B{
uint x=0;
uint total=0;
function incr() public returns(uint){
x += 1;
}
function foo() public returns(uint){
require(x==10);
selfdestruct(msg.sender);
}
}

@ -0,0 +1,8 @@
pragma solidity ^0.8.0;
contract Fallback {
function withdraw() public { payable(msg.sender).transfer(address(this).balance); }
}

@ -0,0 +1,25 @@
contract B{
uint x=0;
uint total = 0;
function incr() public returns(uint){
require(x==0);
x += 1;
}
function incr2() public payable returns(uint){
require(x==1);
x += 1;
total += msg.value;
}
function continous_incr(uint val) public payable returns(uint){
require(x>=2);
x += val;
total += msg.value;
}
function foo() public returns(uint){
require(x==4);
x += 1;
msg.sender.transfer(total);
}
}
Loading…
Cancel
Save