mirror of https://github.com/ConsenSys/mythril
Merge pull request #1335 from ConsenSys/refine_detection_architecture
Clean up detection modulespull/1343/head
commit
29e011dc3e
@ -0,0 +1 @@ |
||||
from mythril.analysis.symbolic import SymExecWrapper |
@ -0,0 +1,6 @@ |
||||
from mythril.analysis.module.base import EntryPoint, DetectionModule |
||||
from mythril.analysis.module.loader import ModuleLoader |
||||
from mythril.analysis.module.util import ( |
||||
get_detection_module_hooks, |
||||
reset_callback_modules, |
||||
) |
@ -0,0 +1,94 @@ |
||||
""" Mythril Detection Modules |
||||
|
||||
This module includes an definition of the DetectionModule interface. |
||||
DetectionModules implement different analysis rules to find weaknesses and vulnerabilities. |
||||
""" |
||||
import logging |
||||
from typing import List, Set, Optional |
||||
|
||||
from mythril.analysis.report import Issue |
||||
from mythril.laser.ethereum.state.global_state import GlobalState |
||||
|
||||
from abc import ABC, abstractmethod |
||||
from enum import Enum |
||||
|
||||
# Get logger instance |
||||
log = logging.getLogger(__name__) |
||||
|
||||
|
||||
class EntryPoint(Enum): |
||||
""" EntryPoint Enum |
||||
|
||||
This enum is used to signify the entry_point of detection modules. |
||||
See also the class documentation of DetectionModule |
||||
""" |
||||
|
||||
POST = 1 |
||||
CALLBACK = 2 |
||||
|
||||
|
||||
class DetectionModule(ABC): |
||||
"""The base detection module. |
||||
|
||||
All custom-built detection modules must inherit from this class. |
||||
|
||||
There are several class properties that expose information about the detection modules |
||||
- name: The name of the detection module |
||||
- swc_id: The SWC ID associated with the weakness that the module detects |
||||
- description: A description of the detection module, and what it detects |
||||
- entry_point: Mythril can run callback style detection modules, or modules that search the statespace. |
||||
[IMPORTANT] POST entry points severely slow down the analysis, try to always use callback style modules |
||||
- pre_hooks: A list of instructions to hook the laser vm for (pre execution of the instruction) |
||||
- post_hooks: A list of instructions to hook the laser vm for (post execution of the instruction) |
||||
""" |
||||
|
||||
name = "Detection Module Name" |
||||
swc_id = "SWC-000" |
||||
description = "Detection module description" |
||||
entry_point = EntryPoint.CALLBACK # type: EntryPoint |
||||
pre_hooks = [] # type: List[str] |
||||
post_hooks = [] # type: List[str] |
||||
|
||||
def __init__(self) -> None: |
||||
self.issues = [] # type: List[Issue] |
||||
self.cache = set() # type: Set[int] |
||||
|
||||
def reset_module(self): |
||||
""" Resets the storage of this module """ |
||||
self.issues = [] |
||||
|
||||
def execute(self, target: GlobalState) -> Optional[List[Issue]]: |
||||
"""The entry point for execution, which is being called by Mythril. |
||||
|
||||
:param target: The target of the analysis, either a global state (callback) or the entire statespace (post) |
||||
:return: List of encountered issues |
||||
""" |
||||
|
||||
log.debug("Entering analysis module: {}".format(self.__class__.__name__)) |
||||
|
||||
result = self._execute(target) |
||||
|
||||
log.debug("Exiting analysis module: {}".format(self.__class__.__name__)) |
||||
|
||||
return result |
||||
|
||||
@abstractmethod |
||||
def _execute(self, target) -> Optional[List[Issue]]: |
||||
"""Module main method (override this) |
||||
|
||||
:param target: The target of the analysis, either a global state (callback) or the entire statespace (post) |
||||
:return: List of encountered issues |
||||
""" |
||||
pass |
||||
|
||||
def __repr__(self) -> str: |
||||
return ( |
||||
"<" |
||||
"DetectionModule " |
||||
"name={0.name} " |
||||
"swc_id={0.swc_id} " |
||||
"pre_hooks={0.pre_hooks} " |
||||
"post_hooks={0.post_hooks} " |
||||
"description={0.description}" |
||||
">" |
||||
).format(self) |
@ -0,0 +1,81 @@ |
||||
from mythril.analysis.module.base import DetectionModule, EntryPoint |
||||
from mythril.support.support_utils import Singleton |
||||
|
||||
from mythril.analysis.module.modules.arbitrary_jump import ArbitraryJump |
||||
from mythril.analysis.module.modules.arbitrary_write import ArbitraryStorage |
||||
from mythril.analysis.module.modules.delegatecall import DelegateCallModule |
||||
from mythril.analysis.module.modules.dependence_on_predictable_vars import ( |
||||
PredictableDependenceModule, |
||||
) |
||||
from mythril.analysis.module.modules.deprecated_ops import DeprecatedOperationsModule |
||||
from mythril.analysis.module.modules.ether_thief import EtherThief |
||||
from mythril.analysis.module.modules.exceptions import ReachableExceptionsModule |
||||
from mythril.analysis.module.modules.external_calls import ExternalCalls |
||||
from mythril.analysis.module.modules.integer import IntegerOverflowUnderflowModule |
||||
from mythril.analysis.module.modules.multiple_sends import MultipleSendsModule |
||||
from mythril.analysis.module.modules.state_change_external_calls import StateChange |
||||
from mythril.analysis.module.modules.suicide import SuicideModule |
||||
from mythril.analysis.module.modules.unchecked_retval import UncheckedRetvalModule |
||||
from mythril.analysis.module.modules.user_assertions import UserAssertions |
||||
|
||||
from mythril.analysis.module.base import EntryPoint |
||||
|
||||
from typing import Optional, List |
||||
|
||||
|
||||
class ModuleLoader(object, metaclass=Singleton): |
||||
"""ModuleLoader |
||||
|
||||
The module loader class implements a singleton loader for detection modules. |
||||
|
||||
By default it will load the detection modules in the mythril package. |
||||
Additional detection modules can be loaded using the register_module function call implemented by the ModuleLoader |
||||
""" |
||||
|
||||
def __init__(self): |
||||
self._modules = [] |
||||
self._register_mythril_modules() |
||||
|
||||
def register_module(self, detection_module: DetectionModule): |
||||
"""Registers a detection module with the module loader""" |
||||
if not isinstance(detection_module, DetectionModule): |
||||
raise ValueError("The passed variable is not a valid detection module") |
||||
self._modules.append(detection_module) |
||||
|
||||
def get_detection_modules( |
||||
self, |
||||
entry_point: Optional[EntryPoint] = None, |
||||
white_list: Optional[List[str]] = None, |
||||
) -> List[DetectionModule]: |
||||
""" Gets registered detection modules |
||||
|
||||
:param entry_point: If specified: only return detection modules with this entry point |
||||
:param white_list: If specified: only return whitelisted detection modules |
||||
:return: The selected detection modules |
||||
""" |
||||
result = self._modules[:] |
||||
if entry_point: |
||||
result = [module for module in result if module.entry_point == entry_point] |
||||
if white_list: |
||||
result = [module for module in result if module.name in white_list] |
||||
return result |
||||
|
||||
def _register_mythril_modules(self): |
||||
self._modules.extend( |
||||
[ |
||||
ArbitraryJump(), |
||||
ArbitraryStorage(), |
||||
DelegateCallModule(), |
||||
PredictableDependenceModule(), |
||||
DeprecatedOperationsModule(), |
||||
EtherThief(), |
||||
ReachableExceptionsModule(), |
||||
ExternalCalls(), |
||||
IntegerOverflowUnderflowModule(), |
||||
MultipleSendsModule(), |
||||
StateChange(), |
||||
SuicideModule(), |
||||
UncheckedRetvalModule(), |
||||
UserAssertions(), |
||||
] |
||||
) |
@ -0,0 +1 @@ |
||||
|
@ -0,0 +1,50 @@ |
||||
from collections import defaultdict |
||||
from typing import List, Optional, Callable, Mapping, Dict |
||||
import logging |
||||
|
||||
from mythril.support.opcodes import opcodes |
||||
from mythril.analysis.module.base import DetectionModule, EntryPoint |
||||
from mythril.analysis.module.loader import ModuleLoader |
||||
|
||||
log = logging.getLogger(__name__) |
||||
OP_CODE_LIST = [c[0] for _, c in opcodes.items()] |
||||
|
||||
|
||||
def get_detection_module_hooks( |
||||
modules: List[DetectionModule], hook_type="pre" |
||||
) -> Dict[str, List[Callable]]: |
||||
""" Gets a dictionary with the hooks for the passed detection modules |
||||
|
||||
:param modules: The modules for which to retrieve hooks |
||||
:param hook_type: The type of hooks to retrieve (default: "pre") |
||||
:return: Dictionary with discovered hooks |
||||
""" |
||||
hook_dict = defaultdict(list) # type: Mapping[str, List[Callable]] |
||||
for module in modules: |
||||
|
||||
hooks = module.pre_hooks if hook_type == "pre" else module.post_hooks |
||||
|
||||
for op_code in map(lambda x: x.upper(), hooks): |
||||
# A hook can be either OP_CODE or START* |
||||
# When an entry like the second is encountered we hook all opcodes that start with START |
||||
if op_code in OP_CODE_LIST: |
||||
hook_dict[op_code].append(module.execute) |
||||
elif op_code.endswith("*"): |
||||
to_register = filter(lambda x: x.startswith(op_code[:-1]), OP_CODE_LIST) |
||||
for actual_hook in to_register: |
||||
hook_dict[actual_hook].append(module.execute) |
||||
else: |
||||
log.error( |
||||
"Encountered invalid hook opcode %s in module %s", |
||||
op_code, |
||||
module.name, |
||||
) |
||||
|
||||
return dict(hook_dict) |
||||
|
||||
|
||||
def reset_callback_modules(module_names: Optional[List[str]] = None): |
||||
"""Clean the issue records of every callback-based module.""" |
||||
modules = ModuleLoader().get_detection_modules(EntryPoint.CALLBACK, module_names) |
||||
for module in modules: |
||||
module.reset_module() |
@ -1,79 +0,0 @@ |
||||
"""This module contains the base class for all user-defined detection |
||||
modules.""" |
||||
|
||||
import logging |
||||
from typing import List, Set |
||||
|
||||
from mythril.analysis.report import Issue |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
|
||||
class DetectionModule: |
||||
"""The base detection module. |
||||
|
||||
All custom-built detection modules must inherit from this class. |
||||
""" |
||||
|
||||
def __init__( |
||||
self, |
||||
name: str, |
||||
swc_id: str, |
||||
description: str, |
||||
entrypoint: str = "post", |
||||
pre_hooks: List[str] = None, |
||||
post_hooks: List[str] = None, |
||||
) -> None: |
||||
self.name = name |
||||
self.swc_id = swc_id |
||||
self.pre_hooks = pre_hooks if pre_hooks else [] |
||||
self.post_hooks = post_hooks if post_hooks else [] |
||||
self.description = description |
||||
if entrypoint not in ("post", "callback"): |
||||
log.error( |
||||
"Invalid entrypoint in module %s, must be one of {post, callback}", |
||||
self.name, |
||||
) |
||||
self.entrypoint = entrypoint |
||||
self.issues = [] # type: List[Issue] |
||||
self.cache = set() # type: Set[int] |
||||
|
||||
def reset_module(self): |
||||
""" |
||||
Resets issues |
||||
""" |
||||
self.issues = [] |
||||
|
||||
def execute(self, statespace) -> None: |
||||
"""The entry point for execution, which is being called by Mythril. |
||||
|
||||
:param statespace: |
||||
:return: |
||||
""" |
||||
|
||||
log.debug("Entering analysis module: {}".format(self.__class__.__name__)) |
||||
|
||||
self._execute(statespace) |
||||
|
||||
log.debug("Exiting analysis module: {}".format(self.__class__.__name__)) |
||||
|
||||
def _execute(self, statespace): |
||||
"""Module main method (override this) |
||||
|
||||
:param statespace: |
||||
:return: |
||||
""" |
||||
|
||||
raise NotImplementedError() |
||||
|
||||
def __repr__(self) -> str: |
||||
return ( |
||||
"<" |
||||
"DetectionModule " |
||||
"name={0.name} " |
||||
"swc_id={0.swc_id} " |
||||
"pre_hooks={0.pre_hooks} " |
||||
"post_hooks={0.post_hooks} " |
||||
"description={0.description}" |
||||
">" |
||||
).format(self) |
@ -1,131 +1,46 @@ |
||||
"""This module contains functionality for hooking in detection modules and |
||||
executing them.""" |
||||
from collections import defaultdict |
||||
|
||||
from mythril.support.opcodes import opcodes |
||||
from mythril.analysis import modules |
||||
import pkgutil |
||||
import importlib.util |
||||
from mythril.analysis.module import ModuleLoader, reset_callback_modules |
||||
from mythril.analysis.module.base import EntryPoint |
||||
from mythril.analysis.report import Issue |
||||
|
||||
from typing import Optional, List |
||||
import logging |
||||
import os |
||||
import sys |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
OPCODE_LIST = [c[0] for _, c in opcodes.items()] |
||||
|
||||
|
||||
def reset_callback_modules(module_names=(), custom_modules_directory=""): |
||||
"""Clean the issue records of every callback-based module.""" |
||||
modules = get_detection_modules("callback", module_names, custom_modules_directory) |
||||
for module in modules: |
||||
module.detector.reset_module() |
||||
|
||||
|
||||
def get_detection_module_hooks(modules, hook_type="pre", custom_modules_directory=""): |
||||
hook_dict = defaultdict(list) |
||||
_modules = get_detection_modules( |
||||
entrypoint="callback", |
||||
include_modules=modules, |
||||
custom_modules_directory=custom_modules_directory, |
||||
) |
||||
for module in _modules: |
||||
hooks = ( |
||||
module.detector.pre_hooks |
||||
if hook_type == "pre" |
||||
else module.detector.post_hooks |
||||
) |
||||
|
||||
for op_code in map(lambda x: x.upper(), hooks): |
||||
if op_code in OPCODE_LIST: |
||||
hook_dict[op_code].append(module.detector.execute) |
||||
elif op_code.endswith("*"): |
||||
to_register = filter(lambda x: x.startswith(op_code[:-1]), OPCODE_LIST) |
||||
for actual_hook in to_register: |
||||
hook_dict[actual_hook].append(module.detector.execute) |
||||
else: |
||||
log.error( |
||||
"Encountered invalid hook opcode %s in module %s", |
||||
op_code, |
||||
module.detector.name, |
||||
) |
||||
return dict(hook_dict) |
||||
|
||||
|
||||
def get_detection_modules(entrypoint, include_modules=(), custom_modules_directory=""): |
||||
""" |
||||
|
||||
:param entrypoint: |
||||
:param include_modules: |
||||
:return: |
||||
""" |
||||
module = importlib.import_module("mythril.analysis.modules.base") |
||||
module.log.setLevel(log.level) |
||||
|
||||
include_modules = list(include_modules) |
||||
|
||||
_modules = [] |
||||
|
||||
for loader, module_name, _ in pkgutil.walk_packages(modules.__path__): |
||||
if include_modules and module_name not in include_modules: |
||||
continue |
||||
|
||||
if module_name != "base": |
||||
module = importlib.import_module("mythril.analysis.modules." + module_name) |
||||
module.log.setLevel(log.level) |
||||
if module.detector.entrypoint == entrypoint: |
||||
_modules.append(module) |
||||
if custom_modules_directory: |
||||
custom_modules_path = os.path.abspath(custom_modules_directory) |
||||
if custom_modules_path not in sys.path: |
||||
sys.path.append(custom_modules_path) |
||||
|
||||
for loader, module_name, _ in pkgutil.walk_packages([custom_modules_path]): |
||||
if include_modules and module_name not in include_modules: |
||||
continue |
||||
def retrieve_callback_issues(white_list: Optional[List[str]] = None) -> List[Issue]: |
||||
""" Get the issues discovered by callback type detection modules""" |
||||
issues = [] # type: List[Issue] |
||||
for module in ModuleLoader().get_detection_modules( |
||||
entry_point=EntryPoint.CALLBACK, white_list=white_list |
||||
): |
||||
log.debug("Retrieving results for " + module.name) |
||||
issues += module.issues |
||||
|
||||
if module_name != "base": |
||||
module = importlib.import_module(module_name, custom_modules_path) |
||||
module.log.setLevel(log.level) |
||||
if module.detector.entrypoint == entrypoint: |
||||
_modules.append(module) |
||||
reset_callback_modules(module_names=white_list) |
||||
|
||||
log.info("Found %s detection modules", len(_modules)) |
||||
return _modules |
||||
return issues |
||||
|
||||
|
||||
def fire_lasers(statespace, module_names=(), custom_modules_directory=""): |
||||
""" |
||||
def fire_lasers(statespace, white_list: Optional[List[str]] = None) -> List[Issue]: |
||||
""" Fire lasers at analysed statespace object |
||||
|
||||
:param statespace: |
||||
:param module_names: |
||||
:return: |
||||
:param statespace: Symbolic statespace to analyze |
||||
:param white_list: Optionally whitelist modules to use for the analysis |
||||
:return: Discovered issues |
||||
""" |
||||
log.info("Starting analysis") |
||||
|
||||
issues = [] |
||||
for module in get_detection_modules( |
||||
entrypoint="post", |
||||
include_modules=module_names, |
||||
custom_modules_directory=custom_modules_directory, |
||||
): |
||||
log.info("Executing " + module.detector.name) |
||||
issues += module.detector.execute(statespace) |
||||
|
||||
issues += retrieve_callback_issues(module_names, custom_modules_directory) |
||||
return issues |
||||
|
||||
|
||||
def retrieve_callback_issues(module_names=(), custom_modules_directory=""): |
||||
issues = [] |
||||
for module in get_detection_modules( |
||||
entrypoint="callback", |
||||
include_modules=module_names, |
||||
custom_modules_directory=custom_modules_directory, |
||||
issues = [] # type: List[Issue] |
||||
for module in ModuleLoader().get_detection_modules( |
||||
entry_point=EntryPoint.POST, white_list=white_list |
||||
): |
||||
log.debug("Retrieving results for " + module.detector.name) |
||||
issues += module.detector.issues |
||||
log.info("Executing " + module.name) |
||||
issues += module.execute(statespace) |
||||
|
||||
reset_callback_modules( |
||||
module_names=module_names, custom_modules_directory=custom_modules_directory |
||||
) |
||||
issues += retrieve_callback_issues(white_list) |
||||
return issues |
||||
|
Loading…
Reference in new issue