Support merge annotations (#1419)

* Support merge annotations

* Fix type hinting

* Update mythril/laser/ethereum/state/annotation.py

Co-authored-by: JoranHonig <JoranHonig@users.noreply.github.com>

* Update mythril/laser/plugin/plugins/plugin_annotations.py

Co-authored-by: JoranHonig <JoranHonig@users.noreply.github.com>

* Review fixes and a bugfix

* Make merge_annotation sideeffect free

* Fix black

* Change List to set

* use set() over {}

* Remove check merge from StateAnnotations

* Add missed out commit

* Add some review fixes

* Add some review fixes

* Merge if conditions

Co-authored-by: JoranHonig <JoranHonig@users.noreply.github.com>
pull/1438/head
Nikhil Parasaram 4 years ago committed by GitHub
parent dce7e868ad
commit 749cfcf673
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      mythril/laser/ethereum/state/annotation.py
  2. 4
      mythril/laser/plugin/plugins/dependency_pruner.py
  3. 2
      mythril/laser/plugin/plugins/instruction_profiler.py
  4. 84
      mythril/laser/plugin/plugins/plugin_annotations.py

@ -4,6 +4,8 @@ This includes the base StateAnnotation class, as well as an adaption,
which will not be copied on every new state.
"""
from abc import abstractmethod
class StateAnnotation:
"""The StateAnnotation class is used to persist information over traces.
@ -35,6 +37,20 @@ class StateAnnotation:
return False
class MergeableStateAnnotation(StateAnnotation):
"""This class allows a base annotation class for annotations that
can be merged.
"""
@abstractmethod
def check_merge_annotation(self, annotation) -> bool:
pass
@abstractmethod
def merge_annotation(self, annotation):
pass
class NoCopyAnnotation(StateAnnotation):
"""This class provides a base annotation class for annotations that
shouldn't be copied on every new state.

@ -242,7 +242,7 @@ class DependencyPruner(LaserPlugin):
location = state.mstate.stack[-1]
if location not in annotation.storage_loaded:
annotation.storage_loaded.append(location)
annotation.storage_loaded.add(location)
# We backwards-annotate the path here as sometimes execution never reaches a stop or return
# (and this may change in a future transaction).
@ -332,6 +332,6 @@ class DependencyPruner(LaserPlugin):
# the next transaction
annotation.path = [0]
annotation.storage_loaded = []
annotation.storage_loaded = set()
world_state_annotation.annotations_stack.append(annotation)

@ -32,7 +32,7 @@ log = logging.getLogger(__name__)
class InstructionProfilerBuilder(PluginBuilder):
plugin_name = "dependency-pruner"
plugin_name = "instruction-profiler"
def __call__(self, *args, **kwargs):
return InstructionProfiler()

@ -1,7 +1,13 @@
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.laser.ethereum.state.annotation import (
StateAnnotation,
MergeableStateAnnotation,
)
from copy import copy
from typing import Dict, List, Set
import logging
log = logging.getLogger(__name__)
class MutationAnnotation(StateAnnotation):
@ -18,15 +24,15 @@ class MutationAnnotation(StateAnnotation):
return True
class DependencyAnnotation(StateAnnotation):
class DependencyAnnotation(MergeableStateAnnotation):
"""Dependency Annotation
This annotation tracks read and write access to the state during each transaction.
"""
def __init__(self):
self.storage_loaded = [] # type: List
self.storage_written = {} # type: Dict[int, List]
self.storage_loaded = set() # type: Set
self.storage_written = {} # type: Dict[int, Set]
self.has_call = False # type: bool
self.path = [0] # type: List
self.blocks_seen = set() # type: Set[int]
@ -41,19 +47,38 @@ class DependencyAnnotation(StateAnnotation):
return result
def get_storage_write_cache(self, iteration: int):
if iteration not in self.storage_written:
self.storage_written[iteration] = []
return self.storage_written[iteration]
return self.storage_written.get(iteration, set())
def extend_storage_write_cache(self, iteration: int, value: object):
if iteration not in self.storage_written:
self.storage_written[iteration] = [value]
elif value not in self.storage_written[iteration]:
self.storage_written[iteration].append(value)
class WSDependencyAnnotation(StateAnnotation):
self.storage_written[iteration] = set()
self.storage_written[iteration].add(value)
def check_merge_annotation(self, other: "DependencyAnnotation"):
if not isinstance(other, DependencyAnnotation):
raise TypeError("Expected an instance of DependencyAnnotation")
return self.has_call == other.has_call and self.path == other.path
def merge_annotation(self, other: "DependencyAnnotation"):
merged_annotation = DependencyAnnotation()
merged_annotation.blocks_seen = self.blocks_seen.union(other.blocks_seen)
merged_annotation.has_call = self.has_call
merged_annotation.path = copy(self.path)
merged_annotation.storage_loaded = self.storage_loaded.union(
other.storage_loaded
)
keys = set(
list(other.storage_written.keys()) + list(self.storage_written.keys())
)
for key in keys:
other_set = other.storage_written.get(key, set())
merged_annotation.storage_written[key] = self.storage_written.get(
key, set()
).union(other_set)
return merged_annotation
class WSDependencyAnnotation(MergeableStateAnnotation):
"""Dependency Annotation for World state
This world state annotation maintains a stack of state annotations.
@ -61,9 +86,38 @@ class WSDependencyAnnotation(StateAnnotation):
"""
def __init__(self):
self.annotations_stack = []
self.annotations_stack: List[DependencyAnnotation] = []
def __copy__(self):
result = WSDependencyAnnotation()
result.annotations_stack = copy(self.annotations_stack)
return result
def check_merge_annotation(self, annotation: "WSDependencyAnnotation") -> bool:
if len(self.annotations_stack) != len(annotation.annotations_stack):
# We can only merge worldstate annotations that have seen an equal amount of transactions
# since the beginning of symbolic execution
return False
for a1, a2 in zip(self.annotations_stack, annotation.annotations_stack):
if a1 == a2:
continue
if (
isinstance(a1, MergeableStateAnnotation)
and isinstance(a2, MergeableStateAnnotation)
and a1.check_merge_annotation(a2) is True
):
continue
log.debug("Aborting merge between annotations {} and {}".format(a1, a2))
return False
return True
def merge_annotation(
self, annotation: "WSDependencyAnnotation"
) -> "WSDependencyAnnotation":
merged_annotation = WSDependencyAnnotation()
for a1, a2 in zip(self.annotations_stack, annotation.annotations_stack):
if a1 == a2:
merged_annotation.annotations_stack.append(copy(a1))
merged_annotation.annotations_stack.append(a1.merge_annotation(a2))
return merged_annotation

Loading…
Cancel
Save