Add security analysis modules

pull/23/head
Bernhard Mueller 7 years ago
parent d746ab1043
commit 5496ad2f58
  1. 41
      mythril/analysis/modules/delegatecall_forward.py
  2. 44
      mythril/analysis/modules/ether_send.py
  3. 50
      mythril/analysis/modules/unchecked_suicide.py
  4. 8
      mythril/analysis/ops.py
  5. 30
      mythril/analysis/report.py
  6. 11
      mythril/analysis/security.py
  7. 2
      mythril/analysis/symbolic.py

@ -0,0 +1,41 @@
from z3 import *
import re
from mythril.analysis.ops import *
from mythril.analysis.report import Issue
'''
MODULE DESCRIPTION:
Check for invocations of delegatecall(msg.data) in the fallback function.
'''
def execute(statespace):
issues = []
for call in statespace.calls:
if (call.type == "DELEGATECALL") and (call.node.function_name == "main"):
stack = call.state.stack
meminstart = get_variable(stack[-3])
if meminstart.type == VarType.CONCRETE:
if (re.search(r'calldata.*_0', str(call.state.memory[meminstart.val]))):
issue = Issue("CALLDATA forwarded with delegatecall()", "INFORMATION")
issue.description = \
"The contract '" + str(call.node.module_name) + "' forwards its calldata via DELEGATECALL in its fallback function. " \
"This means that any function in the called contract can be executed. Make sure to scan with the -l option so dependencies are checked as well.\n"
if (call.to.type == VarType.CONCRETE):
issue.description += ("DELEGATECALL target: " + hex(call.to.val))
else:
issue.description += "DELEGATECALL target: " + str(call.to)
issues.append(issue)
return issues

@ -1,37 +1,47 @@
from z3 import * from z3 import *
from laser.ethereum import helper
import re import re
import logging import logging
from enum import Enum from enum import Enum
from mythril.analysis.ops import * from mythril.analysis.ops import *
class TransferType(Enum): class InputSource(Enum):
HARDCODED = 1 CALLDATA = 1
CALLDATA = 2 STORAGE = 2
CALLER = 3
OTHER = 3
def execute(statespace): def execute(statespace):
for call in statespace.calls: for call in statespace.calls:
# print(str(call.node.module_name) + ":" + str(call.node.function_name) + ": " + str(call.call_type) + " " + str(call.to) + " value " + str(call.call_value)) logging.info("CALL: " + str(call.node.module_name) + ":" + str(call.node.function_name) + ": " + str(call.type) + " " + str(call.to) + " value " + str(call.value))
# Checks on value
interesting = False interesting = False
if (call.call_value.type == VarType.CONCRETE and call.call_value.val > 0): if (call.value.type == VarType.CONCRETE and call.value.val > 0) or (call.value.type == VarType.SYMBOLIC):
interesting = True
logging.info("CALL with non-zero value: " + str(call.node.module_name) + ":" + str(call.node.function_name) + ": " + str(call.type) + " " + str(call.to) + " value " + str(call.value))
if (call.call_value.type == VarType.SYMBOLIC):
interesting = True interesting = True
if (interesting): if call.to.type == VarType.CONCRETE:
logging.info("Concrete 'to' address: " + call.to.hex())
else:
m = re.search(r"calldata", str(call.to))
if (m):
logging.info("To-address from calldata: " + str(call.to))
print(str(call.node.module_name) + ":" + str(call.node.function_name) + ": " + str(call.call_type) + " " + str(call.to) + " value " + str(call.call_value)) if call.value.type == VarType.CONCRETE:
logging.info("Sending concrete amount: " + call.value)
else:
m = re.search(r"calldata", str(call.to))
if (m):
logging.info("To-address from calldata: " + str(call.to))
if (interesting):
s = Solver() s = Solver()
s.set(timeout=10000) s.set(timeout=10000)
@ -39,14 +49,14 @@ def execute(statespace):
for constraint in call.node.constraints: for constraint in call.node.constraints:
s.add(constraint) s.add(constraint)
print("SOLUTION:") logging.info("SOLUTION:")
if (s.check() == sat): if (s.check() == sat):
m = s.model() m = s.model()
for d in m.decls(): for d in m.decls():
print("%s = 0x%x" % (d.name(), m[d].as_long())) logging.info("%s = 0x%x" % (d.name(), m[d].as_long()))
else: else:
print("unsat") logging.info("unsat")

@ -1,7 +1,19 @@
from z3 import * from z3 import *
from mythril.analysis.ops import *
from mythril.analysis.report import Issue
def execute(svm): '''
MODULE DESCRIPTION:
Check for SUICIDE instructions that are not constrained by caller.
'''
def execute(statespace):
issues = []
svm = statespace.svm
for k in svm.nodes: for k in svm.nodes:
node = svm.nodes[k] node = svm.nodes[k]
@ -9,23 +21,41 @@ def execute(svm):
for instruction in node.instruction_list: for instruction in node.instruction_list:
if(instruction['opcode'] == "SUICIDE"): if(instruction['opcode'] == "SUICIDE"):
state = node.states[instruction['address']] state = node.states[instruction['address']]
to = state.stack.pop() to = state.stack.pop()
print("SUICIDE to: " + str(to)) constraint_on_caller = False
print("function: " + str(node.function_name))
s = Solver()
for constraint in node.constraints: for constraint in node.constraints:
s.add(constraint) if "caller" in str(constraint):
constraint_on_caller = True
break
if not constraint_on_caller:
s = Solver()
if (s.check() == sat): if (s.check() == sat):
print("MODEL:") issue = Issue("Unchecked SUICIDE", "VULNERABILITY")
issue.description = "The function " + node.function_name + " calls the SUICIDE instruction. It appears as if the function does not verify the caller address.\n"
if ("caller" in str(to)):
issue.description += "The remaining Ether is sent to the caller's address.\n"
elif ("storage" in str(to)):
issue.description += "The remaining Ether is sent to a stored address\n"
else:
issue.description += "The remaining Ether is sent to: " + str(to) + "\n"
issue.description += "Solver output:\n"
m = s.model() m = s.model()
for d in m.decls(): for d in m.decls():
print("%s = 0x%x" % (d.name(), m[d].as_long())) issue.description += "%s = 0x%x\n" % (d.name(), m[d].as_long())
else:
print("unsat") issues.append(issue)
return issues

@ -23,15 +23,17 @@ class Op:
def __init__(self, node, addr): def __init__(self, node, addr):
self.node = node self.node = node
self.addr = addr self.addr = addr
self.state = node.states[addr]
class Call(Op): class Call(Op):
def __init__(self, node, addr, call_type, to, value = Variable(0, VarType.CONCRETE), data = None): def __init__(self, node, addr, _type, to, value = Variable(0, VarType.CONCRETE), data = None):
super().__init__(node, addr) super().__init__(node, addr)
self.to = to self.to = to
self.call_type = call_type self.type = _type
self.call_value = value self.value = value
self.data = data self.data = data
class Suicide(Op): class Suicide(Op):

@ -0,0 +1,30 @@
class Issue:
def __init__(self, title, _type = "INFORMATIONAL", description = ""):
self.title = title
self.description = description
self.type = _type
def as_dict(self):
return {'title': self.title, 'description':self.description, 'type': self.type}
class Report:
def __init__(self, issues = []):
self.issues = issues
def as_text(self):
text = ""
for issue in self.issues:
text += "=== " + issue.title + " ===\n"
text += "STATUS: " + issue.type + "\n"
text += issue.description + "\n"
return text

@ -1,9 +1,14 @@
from .modules import ether_send from mythril.analysis.report import Report
from .modules import delegatecall_forward, unchecked_suicide
def fire_lasers(statespace): def fire_lasers(statespace):
ether_send.execute(statespace) issues = []
pass issues += delegatecall_forward.execute(statespace)
issues += unchecked_suicide.execute(statespace)
report = Report(issues)
print(report.as_text())

@ -40,7 +40,7 @@ class StateSpace:
op = instruction['opcode'] op = instruction['opcode']
if op in ('CALL', 'CALLCODE', 'DELEGATECALL', 'STATICCALL'): if op in ('CALL', 'CALLCODE', 'DELEGATECALL', 'STATICCALL'):
stack = self.svm.nodes[key].states[instruction['address']].stack stack = copy.deepcopy(self.svm.nodes[key].states[instruction['address']].stack)
if op in ('CALL', 'CALLCODE'): if op in ('CALL', 'CALLCODE'):
gas, to, value, meminstart, meminsz, memoutstart, memoutsz = \ gas, to, value, meminstart, meminsz, memoutstart, memoutsz = \

Loading…
Cancel
Save