diff --git a/mythril/analysis/symbolic.py b/mythril/analysis/symbolic.py index e5990a77..55ebf39d 100644 --- a/mythril/analysis/symbolic.py +++ b/mythril/analysis/symbolic.py @@ -16,7 +16,9 @@ from mythril.laser.ethereum.strategy.basic import ( ReturnWeightedRandomStrategy, BasicSearchStrategy, ) -from mythril.laser.ethereum.strategy.custom import BFSBoundedLoopsStrategy +from mythril.laser.ethereum.strategy.extensions.bounded_loops import ( + BoundedLoopsStrategy, +) from mythril.laser.smt import symbol_factory, BitVec from typing import Union, List, Dict, Type from mythril.solidity.soliditycontract import EVMContract, SolidityContract @@ -38,6 +40,7 @@ class SymExecWrapper: dynloader=None, max_depth=22, execution_timeout=None, + loop_bound=4, create_timeout=None, transaction_count=2, modules=(), @@ -70,8 +73,6 @@ class SymExecWrapper: s_strategy = ReturnRandomNaivelyStrategy elif strategy == "weighted-random": s_strategy = ReturnWeightedRandomStrategy - elif strategy == "bfs-bounded": - s_strategy = BFSBoundedLoopsStrategy else: raise ValueError("Invalid strategy argument supplied") @@ -90,6 +91,9 @@ class SymExecWrapper: enable_iprof=enable_iprof, ) + if loop_bound is not None: + self.laser.extend_strategy(BoundedLoopsStrategy, loop_bound) + plugin_loader = LaserPluginLoader(self.laser) plugin_loader.load(PluginFactory.build_mutation_pruner_plugin()) plugin_loader.load(PluginFactory.build_instruction_coverage_plugin()) diff --git a/mythril/interfaces/cli.py b/mythril/interfaces/cli.py index 4b0ed347..2d3a63e1 100644 --- a/mythril/interfaces/cli.py +++ b/mythril/interfaces/cli.py @@ -197,13 +197,20 @@ def create_parser(parser: argparse.ArgumentParser) -> None: default=50, help="Maximum recursion depth for symbolic execution", ) - options.add_argument( "--strategy", - choices=["dfs", "bfs", "naive-random", "weighted-random", "bfs-bounded"], - default="bfs-bounded", + choices=["dfs", "bfs", "naive-random", "weighted-random"], + default="bfs", help="Symbolic execution strategy", ) + options.add_argument( + "-b", + "--loop-bound", + type=int, + default=4, + help="Bound loops at n iterations", + metavar="N", + ) options.add_argument( "-t", "--transaction-count", @@ -407,6 +414,7 @@ def execute_command( address=address, max_depth=args.max_depth, execution_timeout=args.execution_timeout, + loop_bound=args.loop_bound, create_timeout=args.create_timeout, enable_iprof=args.enable_iprof, onchain_storage_access=not args.no_onchain_storage_access, diff --git a/mythril/laser/ethereum/strategy/custom.py b/mythril/laser/ethereum/strategy/custom.py deleted file mode 100644 index 9e89a552..00000000 --- a/mythril/laser/ethereum/strategy/custom.py +++ /dev/null @@ -1,69 +0,0 @@ -from mythril.laser.ethereum.state.global_state import GlobalState -from mythril.laser.ethereum.strategy.basic import BreadthFirstSearchStrategy -from mythril.laser.ethereum.state.annotation import StateAnnotation -from mythril.laser.ethereum import util -from typing import Dict, cast, List -from copy import copy -import logging - - -JUMPDEST_LIMIT = 4 -log = logging.getLogger(__name__) - - -class JumpdestCountAnnotation(StateAnnotation): - """State annotation used when a path is chosen based on a predictable variable.""" - - def __init__(self) -> None: - self._jumpdest_count = {} # type: Dict[int, int] - - def __copy__(self): - result = JumpdestCountAnnotation() - result._jumpdest_count = copy(self._jumpdest_count) - return result - - -class BFSBoundedLoopsStrategy(BreadthFirstSearchStrategy): - """Implements a breadth first search strategy that prunes loops. - JUMPI instructions are skipped after a jump destination has been - targeted JUMPDEST_LIMIT times. - """ - - def __init__(self, work_list, max_depth) -> None: - super().__init__(work_list, max_depth) - - def get_strategic_global_state(self) -> GlobalState: - """ - :return: - """ - - state = self.work_list.pop(0) - - opcode = state.get_current_instruction()["opcode"] - - if opcode != "JUMPI": - return state - - annotations = cast( - List[JumpdestCountAnnotation], - list(state.get_annotations(JumpdestCountAnnotation)), - ) - - if len(annotations) == 0: - annotation = JumpdestCountAnnotation() - state.annotate(annotation) - else: - annotation = annotations[0] - - target = int(util.get_concrete_int(state.mstate.stack[-1])) - - try: - annotation._jumpdest_count[target] += 1 - except KeyError: - annotation._jumpdest_count[target] = 1 - - if annotation._jumpdest_count[target] > JUMPDEST_LIMIT: - log.debug("JUMPDEST limit reached, skipping JUMPI") - return self.work_list.pop(0) - - return state diff --git a/mythril/laser/ethereum/strategy/extensions/bounded_loops.py b/mythril/laser/ethereum/strategy/extensions/bounded_loops.py new file mode 100644 index 00000000..10525ffa --- /dev/null +++ b/mythril/laser/ethereum/strategy/extensions/bounded_loops.py @@ -0,0 +1,81 @@ +from mythril.laser.ethereum.state.global_state import GlobalState +from mythril.laser.ethereum.strategy.basic import BasicSearchStrategy +from mythril.laser.ethereum.state.annotation import StateAnnotation +from mythril.laser.ethereum import util +from typing import Dict, cast, List +from copy import copy +import logging + + +log = logging.getLogger(__name__) + + +class JumpdestCountAnnotation(StateAnnotation): + """State annotation that counts the number of jumps per destination.""" + + def __init__(self) -> None: + self._jumpdest_count = {} # type: Dict[int, int] + + def __copy__(self): + result = JumpdestCountAnnotation() + result._jumpdest_count = copy(self._jumpdest_count) + return result + + +class BoundedLoopsStrategy(BasicSearchStrategy): + """Adds loop pruning to the search strategy. + Ignores JUMPI instruction if the destination was targeted >JUMPDEST_LIMIT times. + """ + + def __init__(self, super_strategy: BasicSearchStrategy, *args) -> None: + """""" + + self.super_strategy = super_strategy + self.jumpdest_limit = args[0][0] + + log.info( + "Loaded search strategy extension: Loop bounds (limit = {})".format( + self.jumpdest_limit + ) + ) + + BasicSearchStrategy.__init__( + self, super_strategy.work_list, super_strategy.max_depth + ) + + def get_strategic_global_state(self) -> GlobalState: + """ + :return: + """ + + while 1: + + state = self.super_strategy.get_strategic_global_state() + opcode = state.get_current_instruction()["opcode"] + + if opcode != "JUMPI": + return state + + annotations = cast( + List[JumpdestCountAnnotation], + list(state.get_annotations(JumpdestCountAnnotation)), + ) + + if len(annotations) == 0: + annotation = JumpdestCountAnnotation() + state.annotate(annotation) + else: + annotation = annotations[0] + + target = int(util.get_concrete_int(state.mstate.stack[-1])) + + try: + annotation._jumpdest_count[target] += 1 + except KeyError: + annotation._jumpdest_count[target] = 1 + + if annotation._jumpdest_count[target] > self.jumpdest_limit: + log.debug("JUMPDEST limit reached, skipping JUMPI") + continue + + return state diff --git a/mythril/laser/ethereum/svm.py b/mythril/laser/ethereum/svm.py index 8f74410a..a13cf5b7 100644 --- a/mythril/laser/ethereum/svm.py +++ b/mythril/laser/ethereum/svm.py @@ -14,6 +14,7 @@ from mythril.laser.ethereum.plugins.signals import PluginSkipWorldState from mythril.laser.ethereum.state.global_state import GlobalState from mythril.laser.ethereum.state.world_state import WorldState from mythril.laser.ethereum.strategy.basic import DepthFirstSearchStrategy +from abc import ABCMeta from mythril.laser.ethereum.time_handler import time_handler from mythril.laser.ethereum.transaction import ( ContractCreationTransaction, @@ -102,6 +103,9 @@ class LaserEVM: log.info("LASER EVM initialized with dynamic loader: " + str(dynamic_loader)) + def extend_strategy(self, extension: ABCMeta, *args) -> None: + self.strategy = extension(self.strategy, args) + def sym_exec( self, world_state: WorldState = None, diff --git a/mythril/mythril/mythril_analyzer.py b/mythril/mythril/mythril_analyzer.py index d4d4cd5b..9433278e 100644 --- a/mythril/mythril/mythril_analyzer.py +++ b/mythril/mythril/mythril_analyzer.py @@ -35,6 +35,7 @@ class MythrilAnalyzer: address: Optional[str] = None, max_depth: Optional[int] = None, execution_timeout: Optional[int] = None, + loop_bound: Optional[int] = None, create_timeout: Optional[int] = None, enable_iprof: bool = False, ): @@ -53,6 +54,7 @@ class MythrilAnalyzer: self.address = address self.max_depth = max_depth self.execution_timeout = execution_timeout + self.loop_bound = loop_bound self.create_timeout = create_timeout self.enable_iprof = enable_iprof @@ -142,12 +144,14 @@ class MythrilAnalyzer: ), max_depth=self.max_depth, execution_timeout=self.execution_timeout, + loop_bound=self.loop_bound, create_timeout=self.create_timeout, transaction_count=transaction_count, modules=modules, compulsory_statespace=False, enable_iprof=self.enable_iprof, ) + issues = fire_lasers(sym, modules) except KeyboardInterrupt: log.critical("Keyboard Interrupt")