Add pending constraints strategy and make it the default search strategy (#1767)

* Check for 'unchecked' before turning off Integer module

* Use threading during solving constraints

* Hide segfaults

* Misc fixes

* Misc fixes

* Misc fixes

* reformat with black

* Mythril v0.23.21 (#1763)

* Misc bug fixes (#1764)

* Mythril v0.23.22 (#1765)

* rebase

* rebase

* save changes

* Save changes

* Pop

* Reformat
pull/1768/head
Nikhil Parasaram 2 years ago committed by GitHub
parent 80f7db0c4c
commit b2c409f53b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      mythril/analysis/symbolic.py
  2. 2
      mythril/interfaces/cli.py
  3. 16
      mythril/laser/ethereum/strategy/constraint_strategy.py
  4. 26
      mythril/laser/ethereum/svm.py
  5. 2
      mythril/support/model.py
  6. 4
      tests/integration_tests/analysis_tests.py

@ -95,7 +95,7 @@ class SymExecWrapper:
elif "beam-search: " in strategy:
beam_width = int(strategy.split("beam-search: ")[1])
s_strategy = BeamSearch
elif "delayed" in strategy:
elif "pending" in strategy:
s_strategy = DelayConstraintStrategy
else:
raise ValueError("Invalid strategy argument supplied")

@ -464,7 +464,7 @@ def add_analysis_args(options):
options.add_argument(
"--strategy",
choices=["dfs", "bfs", "naive-random", "weighted-random", "delayed"],
choices=["dfs", "bfs", "naive-random", "weighted-random", "pending"],
default="bfs",
help="Symbolic execution strategy",
)

@ -29,19 +29,11 @@ class DelayConstraintStrategy(BasicSearchStrategy):
:return: Global state
"""
while True:
if len(self.work_list) == 0:
while len(self.work_list) == 0:
state = self.pending_worklist.pop(0)
model = state.world_state.constraints.get_model()
if model is not None:
self.model_cache.put(model, 1)
return state
else:
state = self.work_list[0]
c_val = self.model_cache.check_quick_sat(
simplify(And(*state.world_state.constraints)).raw
)
if c_val == False:
self.pending_worklist.append(state)
self.work_list.pop(0)
else:
return self.work_list.pop(0)
self.work_list.append(state)
state = self.work_list.pop(0)
return state

@ -28,7 +28,7 @@ from mythril.laser.ethereum.transaction import (
execute_contract_creation,
execute_message_call,
)
from mythril.laser.smt import symbol_factory
from mythril.laser.smt import And, symbol_factory, simplify
from mythril.support.support_args import args
log = logging.getLogger(__name__)
@ -242,12 +242,24 @@ class LaserEVM:
old_states_count = len(self.open_states)
if self.use_reachability_check:
self.open_states = [
state
for state in self.open_states
if state.constraints.is_possible()
]
prune_count = old_states_count - len(self.open_states)
if isinstance(self.strategy, DelayConstraintStrategy):
open_states = []
for state in self.open_states:
c_val = self.strategy.model_cache.check_quick_sat(
simplify(And(*state.world_state.constraints)).raw
)
if c_val:
open_states.append(self.open_states)
else:
self.strategy.pending_worklist.append(state)
else:
self.open_states = [
state
for state in self.open_states
if state.constraints.is_possible()
]
prune_count = old_states_count - len(self.open_states)
if prune_count:
log.info("Pruned {} unreachable states".format(prune_count))
log.info(

@ -8,6 +8,7 @@ import logging
import os
import signal
import sys
import threading
from collections import OrderedDict
from copy import deepcopy
@ -96,7 +97,6 @@ def get_model(
ret_model = model_cache.check_quick_sat(simplify(And(*constraints)).raw)
if ret_model:
return ret_model
pool = ThreadPool(1)
try:
thread_result = pool.apply_async(

@ -69,9 +69,9 @@ def test_analysis(file_name, tx_data, calldata):
@pytest.mark.parametrize("file_name, tx_data, calldata", test_data)
def test_analysis_delayed(file_name, tx_data, calldata):
def test_analysis_pending(file_name, tx_data, calldata):
bytecode_file = str(TESTDATA / "inputs" / file_name)
command = f"""python3 {MYTH} analyze -f {bytecode_file} -t {tx_data["TX_COUNT"]} -o jsonv2 -m {tx_data["MODULE"]} --solver-timeout 60000 --strategy delayed --no-onchain-data"""
command = f"""python3 {MYTH} analyze -f {bytecode_file} -t {tx_data["TX_COUNT"]} -o jsonv2 -m {tx_data["MODULE"]} --solver-timeout 60000 --strategy pending --no-onchain-data"""
output = json.loads(output_of(command))
assert len(output[0]["issues"]) == tx_data["ISSUE_COUNT"]

Loading…
Cancel
Save