|
|
|
@ -6,12 +6,42 @@ import os |
|
|
|
|
import time |
|
|
|
|
import logging |
|
|
|
|
import sqlite3 |
|
|
|
|
import multiprocessing |
|
|
|
|
import functools |
|
|
|
|
from typing import List |
|
|
|
|
from collections import defaultdict |
|
|
|
|
|
|
|
|
|
from subprocess import Popen, PIPE |
|
|
|
|
from mythril.exceptions import CompilerError |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
lock = multiprocessing.Lock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def synchronized(sync_lock): |
|
|
|
|
""" Synchronization decorator """ |
|
|
|
|
|
|
|
|
|
def wrapper(f): |
|
|
|
|
@functools.wraps(f) |
|
|
|
|
def inner_wrapper(*args, **kw): |
|
|
|
|
with sync_lock: |
|
|
|
|
return f(*args, **kw) |
|
|
|
|
|
|
|
|
|
return inner_wrapper |
|
|
|
|
|
|
|
|
|
return wrapper |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Singleton(type): |
|
|
|
|
_instances = {} |
|
|
|
|
|
|
|
|
|
@synchronized(lock) |
|
|
|
|
def __call__(cls, *args, **kwargs): |
|
|
|
|
if cls not in cls._instances: |
|
|
|
|
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) |
|
|
|
|
return cls._instances[cls] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
# load if available but do not fail |
|
|
|
|
import ethereum_input_decoder |
|
|
|
@ -24,7 +54,7 @@ except ImportError: |
|
|
|
|
|
|
|
|
|
class SQLiteDB(object): |
|
|
|
|
""" |
|
|
|
|
Simple CM for sqlite3 databases. Commits everything at exit. |
|
|
|
|
Simple context manager for sqlite3 databases. Commits everything at exit. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
def __init__(self, path): |
|
|
|
@ -45,11 +75,15 @@ class SQLiteDB(object): |
|
|
|
|
return "<SQLiteDB path={}>".format(self.path) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SignatureDB(object): |
|
|
|
|
class SignatureDB(object, metaclass=Singleton): |
|
|
|
|
def __init__(self, enable_online_lookup: bool = False, path: str = None) -> None: |
|
|
|
|
self.enable_online_lookup = enable_online_lookup |
|
|
|
|
self.online_lookup_miss = set() |
|
|
|
|
self.online_lookup_timeout = 0 |
|
|
|
|
|
|
|
|
|
# if we're analysing a Solidity file, store its hashes |
|
|
|
|
# here to prevent unnecessary lookups |
|
|
|
|
self.solidity_sigs = defaultdict(list) |
|
|
|
|
if path is None: |
|
|
|
|
self.path = os.environ.get("MYTHRIL_DIR") or os.path.join( |
|
|
|
|
os.path.expanduser("~"), ".mythril" |
|
|
|
@ -116,6 +150,12 @@ class SignatureDB(object): |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
byte_sig = self._normalize_byte_sig(byte_sig) |
|
|
|
|
|
|
|
|
|
# check if we have any Solidity signatures to look up |
|
|
|
|
text_sigs = self.solidity_sigs.get(byte_sig) |
|
|
|
|
if text_sigs is not None: |
|
|
|
|
return text_sigs |
|
|
|
|
|
|
|
|
|
# try lookup in the local DB |
|
|
|
|
with SQLiteDB(self.path) as cur: |
|
|
|
|
cur.execute("SELECT text_sig FROM signatures WHERE byte_sig=?", (byte_sig,)) |
|
|
|
@ -156,7 +196,6 @@ class SignatureDB(object): |
|
|
|
|
:param file_path: solidity source code file path |
|
|
|
|
:return: |
|
|
|
|
""" |
|
|
|
|
sigs = {} |
|
|
|
|
cmd = [solc_binary, "--hashes", file_path] |
|
|
|
|
if solc_args: |
|
|
|
|
cmd.extend(solc_args.split()) |
|
|
|
@ -184,12 +223,16 @@ class SignatureDB(object): |
|
|
|
|
for line in stdout: |
|
|
|
|
# the ':' need not be checked but just to be sure |
|
|
|
|
if all(map(lambda x: x in line, ["(", ")", ":"])): |
|
|
|
|
sigs["0x" + line.split(":")[0]] = [line.split(":")[1].strip()] |
|
|
|
|
solc_bytes = "0x" + line.split(":")[0] |
|
|
|
|
solc_text = line.split(":")[1].strip() |
|
|
|
|
self.solidity_sigs[solc_bytes].append(solc_text) |
|
|
|
|
|
|
|
|
|
logging.debug("Signatures: found %d signatures after parsing" % len(sigs)) |
|
|
|
|
logging.debug( |
|
|
|
|
"Signatures: found %d signatures after parsing" % len(self.solidity_sigs) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# update DB with what we've found |
|
|
|
|
for byte_sig, text_sigs in sigs.items(): |
|
|
|
|
for byte_sig, text_sigs in self.solidity_sigs.items(): |
|
|
|
|
for text_sig in text_sigs: |
|
|
|
|
self.add(byte_sig, text_sig) |
|
|
|
|
|
|
|
|
|