Refactor reentrancy detector:

- Graph traversal is performed one time for all the variants (speedup)
        - Add better support for internal calls
pull/162/head
Josselin 6 years ago
parent bec190e877
commit 7d04cc049d
  1. 189
      slither/detectors/reentrancy/reentrancy.py
  2. 179
      slither/detectors/reentrancy/reentrancy_benign.py
  3. 186
      slither/detectors/reentrancy/reentrancy_eth.py
  4. 180
      slither/detectors/reentrancy/reentrancy_read_before_write.py

@ -0,0 +1,189 @@
""""
Re-entrancy detection
Based on heuristics, it may lead to FP and FN
Iterate over all the nodes of the graph until reaching a fixpoint
"""
from slither.core.cfg.node import NodeType
from slither.core.declarations import Function, SolidityFunction
from slither.core.expressions import UnaryOperation, UnaryOperationType
from slither.detectors.abstract_detector import (AbstractDetector,
DetectorClassification)
from slither.slithir.operations import (HighLevelCall, LowLevelCall,
LibraryCall,
Send, Transfer)
def union_dict(d1, d2):
d3 = {k: d1.get(k, []) + d2.get(k, []) for k in set(list(d1.keys()) + list(d2.keys()))}
return d3
def dict_are_equal(d1, d2):
if set(list(d1.keys())) != set(list(d2.keys())):
return False
return all(set(d1[k]) == set(d2[k]) for k in d1.keys())
class Reentrancy(AbstractDetector):
# This detector is not meant to be registered
# It is inherited by reentrancy variantsœ
# ARGUMENT = 'reentrancy'
# HELP = 'Reentrancy vulnerabilities'
# IMPACT = DetectorClassification.HIGH
# CONFIDENCE = DetectorClassification.HIGH
KEY = 'REENTRANCY'
@staticmethod
def _can_callback(irs):
"""
Detect if the node contains a call that can
be used to re-entrance
Consider as valid target:
- low level call
- high level call
Do not consider Send/Transfer as there is not enough gas
"""
for ir in irs:
if isinstance(ir, LowLevelCall):
return True
if isinstance(ir, HighLevelCall) and not isinstance(ir, LibraryCall):
return True
return False
@staticmethod
def _can_send_eth(irs):
"""
Detect if the node can send eth
"""
for ir in irs:
if isinstance(ir, (HighLevelCall, LowLevelCall, Transfer, Send)):
if ir.call_value:
return True
return False
def _filter_if(self, node):
"""
Check if the node is a condtional node where
there is an external call checked
Heuristic:
- The call is a IF node
- It contains a, external call
- The condition is the negation (!)
This will work only on naive implementation
"""
return isinstance(node.expression, UnaryOperation)\
and node.expression.type == UnaryOperationType.BANG
def _explore(self, node, visited, skip_father=None):
"""
Explore the CFG and look for re-entrancy
Heuristic: There is a re-entrancy if a state variable is written
after an external call
node.context will contains the external calls executed
It contains the calls executed in father nodes
if node.context is not empty, and variables are written, a re-entrancy is possible
"""
if node in visited:
return
visited = visited + [node]
# First we add the external calls executed in previous nodes
# send_eth returns the list of calls sending value
# calls returns the list of calls that can callback
# read returns the variable read
# read_prior_calls returns the variable read prior a call
fathers_context = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':{}}
for father in node.fathers:
if self.KEY in father.context:
fathers_context['send_eth'] += [s for s in father.context[self.KEY]['send_eth'] if s!=skip_father]
fathers_context['calls'] += [c for c in father.context[self.KEY]['calls'] if c!=skip_father]
fathers_context['read'] += father.context[self.KEY]['read']
fathers_context['read_prior_calls'] = union_dict(fathers_context['read_prior_calls'], father.context[self.KEY]['read_prior_calls'])
# Exclude path that dont bring further information
if node in self.visited_all_paths:
if all(call in self.visited_all_paths[node]['calls'] for call in fathers_context['calls']):
if all(send in self.visited_all_paths[node]['send_eth'] for send in fathers_context['send_eth']):
if all(read in self.visited_all_paths[node]['read'] for read in fathers_context['read']):
if dict_are_equal(self.visited_all_paths[node]['read_prior_calls'], fathers_context['read_prior_calls']):
return
else:
self.visited_all_paths[node] = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':{}}
self.visited_all_paths[node]['send_eth'] = list(set(self.visited_all_paths[node]['send_eth'] + fathers_context['send_eth']))
self.visited_all_paths[node]['calls'] = list(set(self.visited_all_paths[node]['calls'] + fathers_context['calls']))
self.visited_all_paths[node]['read'] = list(set(self.visited_all_paths[node]['read'] + fathers_context['read']))
self.visited_all_paths[node]['read_prior_calls'] = union_dict(self.visited_all_paths[node]['read_prior_calls'], fathers_context['read_prior_calls'])
node.context[self.KEY] = fathers_context
state_vars_read = node.state_variables_read
# All the state variables written
state_vars_written = node.state_variables_written
slithir_operations = []
# Add the state variables written in internal calls
for internal_call in node.internal_calls:
# Filter to Function, as internal_call can be a solidity call
if isinstance(internal_call, Function):
state_vars_written += internal_call.all_state_variables_written()
state_vars_read += internal_call.all_state_variables_read()
slithir_operations += internal_call.all_slithir_operations()
contains_call = False
node.context[self.KEY]['written'] = state_vars_written
if self._can_callback(node.irs + slithir_operations):
node.context[self.KEY]['calls'] = list(set(node.context[self.KEY]['calls'] + [node]))
node.context[self.KEY]['read_prior_calls'][node] = list(set(node.context[self.KEY]['read_prior_calls'].get(node, []) + node.context[self.KEY]['read']+ state_vars_read))
contains_call = True
if self._can_send_eth(node.irs + slithir_operations):
node.context[self.KEY]['send_eth'] = list(set(node.context[self.KEY]['send_eth'] + [node]))
node.context[self.KEY]['read'] = list(set(node.context[self.KEY]['read'] + state_vars_read))
sons = node.sons
if contains_call and node.type in [NodeType.IF, NodeType.IFLOOP]:
if self._filter_if(node):
son = sons[0]
self._explore(son, visited, node)
sons = sons[1:]
else:
son = sons[1]
self._explore(son, visited, node)
sons = [sons[0]]
for son in sons:
self._explore(son, visited)
def detect_reentrancy(self, contract):
"""
"""
for function in contract.functions_and_modifiers_not_inherited:
if function.is_implemented:
if self.KEY in function.context:
continue
self._explore(function.entry_point, [])
function.context[self.KEY] = True
def detect(self):
"""
"""
# if a node was already visited by another path
# we will only explore it if the traversal brings
# new variables written
# This speedup the exploration through a light fixpoint
# Its particular useful on 'complex' functions with several loops and conditions
self.visited_all_paths = {}
for c in self.contracts:
self.detect_reentrancy(c)
return []

@ -8,14 +8,15 @@
from slither.core.cfg.node import NodeType from slither.core.cfg.node import NodeType
from slither.core.declarations import Function, SolidityFunction from slither.core.declarations import Function, SolidityFunction
from slither.core.expressions import UnaryOperation, UnaryOperationType from slither.core.expressions import UnaryOperation, UnaryOperationType
from slither.detectors.abstract_detector import (AbstractDetector, from slither.detectors.abstract_detector import DetectorClassification
DetectorClassification)
from slither.visitors.expression.export_values import ExportValues from slither.visitors.expression.export_values import ExportValues
from slither.slithir.operations import (HighLevelCall, LowLevelCall, from slither.slithir.operations import (HighLevelCall, LowLevelCall,
LibraryCall, LibraryCall,
Send, Transfer) Send, Transfer)
class ReentrancyBenign(AbstractDetector): from .reentrancy import Reentrancy
class ReentrancyBenign(Reentrancy):
ARGUMENT = 'reentrancy-benign' ARGUMENT = 'reentrancy-benign'
HELP = 'Benign reentrancy vulnerabilities' HELP = 'Benign reentrancy vulnerabilities'
IMPACT = DetectorClassification.LOW IMPACT = DetectorClassification.LOW
@ -23,170 +24,36 @@ class ReentrancyBenign(AbstractDetector):
WIKI = 'https://github.com/trailofbits/slither/wiki/Vulnerabilities-Description#reentrancy-vulnerabilities-2' WIKI = 'https://github.com/trailofbits/slither/wiki/Vulnerabilities-Description#reentrancy-vulnerabilities-2'
key = 'REENTRANCY-BENIGN' def find_reentrancies(self):
result = {}
@staticmethod for contract in self.contracts:
def _can_callback(node): for f in contract.functions_and_modifiers_not_inherited:
""" for node in f.nodes:
Detect if the node contains a call that can if node.context[self.KEY]['calls']:
be used to re-entrance not_read_then_written = [(v, node) for v in node.context[self.KEY]['written']
if v not in node.context[self.KEY]['read']]
Consider as valid target: if not_read_then_written:
- low level call
- high level call
Do not consider Send/Transfer as there is not enough gas
"""
for ir in node.irs:
if isinstance(ir, LowLevelCall):
return True
if isinstance(ir, HighLevelCall) and not isinstance(ir, LibraryCall):
return True
return False
@staticmethod
def _can_send_eth(node):
"""
Detect if the node can send eth
"""
for ir in node.irs:
if isinstance(ir, (HighLevelCall, LowLevelCall, Transfer, Send)):
if ir.call_value:
return True
return False
def _filter_if(self, node):
"""
Check if the node is a condtional node where
there is an external call checked
Heuristic:
- The call is a IF node
- It contains a, external call
- The condition is the negation (!)
This will work only on naive implementation
"""
return isinstance(node.expression, UnaryOperation)\
and node.expression.type == UnaryOperationType.BANG
def _explore(self, node, visited, skip_father=None):
"""
Explore the CFG and look for re-entrancy
Heuristic: There is a re-entrancy if a state variable is written
after an external call
node.context will contains the external calls executed
It contains the calls executed in father nodes
if node.context is not empty, and variables are written, a re-entrancy is possible
"""
if node in visited:
return
visited = visited + [node]
# First we add the external calls executed in previous nodes
# send_eth returns the list of calls sending value
# calls returns the list of calls that can callback
# read returns the variable read
fathers_context = {'send_eth':[], 'calls':[], 'read':[]}
for father in node.fathers:
if self.key in father.context:
fathers_context['send_eth'] += [s for s in father.context[self.key]['send_eth'] if s!=skip_father]
fathers_context['calls'] += [c for c in father.context[self.key]['calls'] if c!=skip_father]
fathers_context['read'] += father.context[self.key]['read']
# Exclude path that dont bring further information
if node in self.visited_all_paths:
if all(call in self.visited_all_paths[node]['calls'] for call in fathers_context['calls']):
if all(send in self.visited_all_paths[node]['send_eth'] for send in fathers_context['send_eth']):
if all(read in self.visited_all_paths[node]['read'] for read in fathers_context['read']):
return
else:
self.visited_all_paths[node] = {'send_eth':[], 'calls':[], 'read':[]}
self.visited_all_paths[node]['send_eth'] = list(set(self.visited_all_paths[node]['send_eth'] + fathers_context['send_eth']))
self.visited_all_paths[node]['calls'] = list(set(self.visited_all_paths[node]['calls'] + fathers_context['calls']))
self.visited_all_paths[node]['read'] = list(set(self.visited_all_paths[node]['read'] + fathers_context['read']))
node.context[self.key] = fathers_context
state_vars_read = node.state_variables_read
# All the state variables written
state_vars_written = node.state_variables_written
# Add the state variables written in internal calls
for internal_call in node.internal_calls:
# Filter to Function, as internal_call can be a solidity call
if isinstance(internal_call, Function):
state_vars_written += internal_call.all_state_variables_written()
state_vars_read += internal_call.all_state_variables_read()
contains_call = False
if self._can_callback(node):
node.context[self.key]['calls'] = list(set(node.context[self.key]['calls'] + [node]))
contains_call = True
if self._can_send_eth(node):
node.context[self.key]['send_eth'] = list(set(node.context[self.key]['send_eth'] + [node]))
not_read_then_written = [(v, node) for v in state_vars_written if v not in node.context[self.key]['read']]
node.context[self.key]['read'] = list(set(node.context[self.key]['read'] + state_vars_read))
# If a state variables was read and is then written, there is a dangerous call and
# ether were sent
# We found a potential re-entrancy bug
if (not_read_then_written and
node.context[self.key]['calls']):
# calls are ordered # calls are ordered
finding_key = (node.function, finding_key = (node.function,
tuple(set(node.context[self.key]['calls'])), tuple(set(node.context[self.KEY]['calls'])),
tuple(set(node.context[self.key]['send_eth']))) tuple(set(node.context[self.KEY]['send_eth'])))
finding_vars = not_read_then_written finding_vars = not_read_then_written
if finding_key not in self.result: if finding_key not in result:
self.result[finding_key] = [] result[finding_key] = []
self.result[finding_key] = list(set(self.result[finding_key] + finding_vars)) result[finding_key] = list(set(result[finding_key] + finding_vars))
return result
sons = node.sons
if contains_call and node.type in [NodeType.IF, NodeType.IFLOOP]:
if self._filter_if(node):
son = sons[0]
self._explore(son, visited, node)
sons = sons[1:]
else:
son = sons[1]
self._explore(son, visited, node)
sons = [sons[0]]
for son in sons:
self._explore(son, visited)
def detect_reentrancy(self, contract):
"""
"""
for function in contract.functions:
if function.is_implemented:
self._explore(function.entry_point, [])
def detect(self): def detect(self):
""" """
""" """
self.result = {}
# if a node was already visited by another path super().detect()
# we will only explore it if the traversal brings reentrancies = self.find_reentrancies()
# new variables written
# This speedup the exploration through a light fixpoint
# Its particular useful on 'complex' functions with several loops and conditions
self.visited_all_paths = {}
for c in self.contracts:
self.detect_reentrancy(c)
results = [] results = []
result_sorted = sorted(list(self.result.items()), key=lambda x:x[0][0].name) result_sorted = sorted(list(reentrancies.items()), key=lambda x:x[0][0].name)
for (func, calls, send_eth), varsWritten in result_sorted: for (func, calls, send_eth), varsWritten in result_sorted:
calls = list(set(calls)) calls = list(set(calls))
send_eth = list(set(send_eth)) send_eth = list(set(send_eth))
@ -195,7 +62,7 @@ class ReentrancyBenign(AbstractDetector):
info += '\tExternal calls:\n' info += '\tExternal calls:\n'
for call_info in calls: for call_info in calls:
info += '\t- {} ({})\n'.format(call_info.expression, call_info.source_mapping_str) info += '\t- {} ({})\n'.format(call_info.expression, call_info.source_mapping_str)
if calls != send_eth: if calls != send_eth and send_eth:
info += '\tExternal calls sending eth:\n' info += '\tExternal calls sending eth:\n'
for call_info in send_eth: for call_info in send_eth:
info += '\t- {} ({})\n'.format(call_info.expression, call_info.source_mapping_str) info += '\t- {} ({})\n'.format(call_info.expression, call_info.source_mapping_str)

@ -8,13 +8,14 @@
from slither.core.cfg.node import NodeType from slither.core.cfg.node import NodeType
from slither.core.declarations import Function, SolidityFunction from slither.core.declarations import Function, SolidityFunction
from slither.core.expressions import UnaryOperation, UnaryOperationType from slither.core.expressions import UnaryOperation, UnaryOperationType
from slither.detectors.abstract_detector import (AbstractDetector, from slither.detectors.abstract_detector import DetectorClassification
DetectorClassification)
from slither.slithir.operations import (HighLevelCall, LowLevelCall, from slither.slithir.operations import (HighLevelCall, LowLevelCall,
LibraryCall, LibraryCall,
Send, Transfer) Send, Transfer)
class ReentrancyEth(AbstractDetector):
from .reentrancy import Reentrancy
class ReentrancyEth(Reentrancy):
ARGUMENT = 'reentrancy-eth' ARGUMENT = 'reentrancy-eth'
HELP = 'Reentrancy vulnerabilities (theft of ethers)' HELP = 'Reentrancy vulnerabilities (theft of ethers)'
IMPACT = DetectorClassification.HIGH IMPACT = DetectorClassification.HIGH
@ -24,175 +25,38 @@ class ReentrancyEth(AbstractDetector):
key = 'REENTRANCY-ETHERS' key = 'REENTRANCY-ETHERS'
@staticmethod def find_reentrancies(self):
def _can_callback(node): result = {}
""" for contract in self.contracts:
Detect if the node contains a call that can for f in contract.functions_and_modifiers_not_inherited:
be used to re-entrance for node in f.nodes:
if node.context[self.KEY]['calls'] and node.context[self.KEY]['send_eth']:
Consider as valid target: read_then_written = []
- low level call for c in node.context[self.KEY]['calls']:
- high level call read_then_written += [(v, node) for v in node.context[self.KEY]['written']
if v in node.context[self.KEY]['read_prior_calls'][c]]
Do not consider Send/Transfer as there is not enough gas
""" if read_then_written:
for ir in node.irs:
if isinstance(ir, LowLevelCall):
return True
if isinstance(ir, HighLevelCall) and not isinstance(ir, LibraryCall):
return True
return False
@staticmethod
def _can_send_eth(node):
"""
Detect if the node can send eth
"""
for ir in node.irs:
if isinstance(ir, (HighLevelCall, LowLevelCall, Transfer, Send)):
if ir.call_value:
return True
return False
def _filter_if(self, node):
"""
Check if the node is a condtional node where
there is an external call checked
Heuristic:
- The call is a IF node
- It contains a, external call
- The condition is the negation (!)
This will work only on naive implementation
"""
return isinstance(node.expression, UnaryOperation)\
and node.expression.type == UnaryOperationType.BANG
def _explore(self, node, visited, skip_father=None):
"""
Explore the CFG and look for re-entrancy
Heuristic: There is a re-entrancy if a state variable is written
after an external call
node.context will contains the external calls executed
It contains the calls executed in father nodes
if node.context is not empty, and variables are written, a re-entrancy is possible
"""
if node in visited:
return
visited = visited + [node]
# First we add the external calls executed in previous nodes
# send_eth returns the list of calls sending value
# calls returns the list of calls that can callback
# read returns the variable read
fathers_context = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':[]}
for father in node.fathers:
if self.key in father.context:
fathers_context['send_eth'] += [s for s in father.context[self.key]['send_eth'] if s!=skip_father]
fathers_context['calls'] += [c for c in father.context[self.key]['calls'] if c!=skip_father]
fathers_context['read'] += father.context[self.key]['read']
fathers_context['read_prior_calls'] += father.context[self.key]['read_prior_calls']
# Exclude path that dont bring further information
if node in self.visited_all_paths:
if all(call in self.visited_all_paths[node]['calls'] for call in fathers_context['calls']):
if all(send in self.visited_all_paths[node]['send_eth'] for send in fathers_context['send_eth']):
if all(read in self.visited_all_paths[node]['read'] for read in fathers_context['read']):
if all(read in self.visited_all_paths[node]['read_prior_calls'] for read in fathers_context['read_prior_calls']):
return
else:
self.visited_all_paths[node] = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':[]}
self.visited_all_paths[node]['send_eth'] = list(set(self.visited_all_paths[node]['send_eth'] + fathers_context['send_eth']))
self.visited_all_paths[node]['calls'] = list(set(self.visited_all_paths[node]['calls'] + fathers_context['calls']))
self.visited_all_paths[node]['read'] = list(set(self.visited_all_paths[node]['read'] + fathers_context['read']))
self.visited_all_paths[node]['read_prior_calls'] = list(set(self.visited_all_paths[node]['read_prior_calls'] + fathers_context['read_prior_calls']))
node.context[self.key] = fathers_context
state_vars_read = node.state_variables_read
# All the state variables written
state_vars_written = node.state_variables_written
# Add the state variables written in internal calls
for internal_call in node.internal_calls:
# Filter to Function, as internal_call can be a solidity call
if isinstance(internal_call, Function):
state_vars_written += internal_call.all_state_variables_written()
state_vars_read += internal_call.all_state_variables_read()
contains_call = False
if self._can_callback(node):
node.context[self.key]['calls'] = list(set(node.context[self.key]['calls'] + [node]))
node.context[self.key]['read_prior_calls'] = list(set(node.context[self.key]['read_prior_calls'] + node.context[self.key]['read']+ state_vars_read))
node.context[self.key]['read'] = []
contains_call = True
if self._can_send_eth(node):
node.context[self.key]['send_eth'] = list(set(node.context[self.key]['send_eth'] + [node]))
read_then_written = [(v, node) for v in state_vars_written if v in node.context[self.key]['read_prior_calls']]
node.context[self.key]['read'] = list(set(node.context[self.key]['read'] + state_vars_read))
# If a state variables was read and is then written, there is a dangerous call and
# ether were sent
# We found a potential re-entrancy bug
if (read_then_written and
node.context[self.key]['calls'] and
node.context[self.key]['send_eth']):
# calls are ordered # calls are ordered
finding_key = (node.function, finding_key = (node.function,
tuple(set(node.context[self.key]['calls'])), tuple(set(node.context[self.KEY]['calls'])),
tuple(set(node.context[self.key]['send_eth']))) tuple(set(node.context[self.KEY]['send_eth'])))
finding_vars = read_then_written finding_vars = read_then_written
if finding_key not in self.result: if finding_key not in result:
self.result[finding_key] = [] result[finding_key] = []
self.result[finding_key] = list(set(self.result[finding_key] + finding_vars)) result[finding_key] = list(set(result[finding_key] + finding_vars))
return result
sons = node.sons
if contains_call and node.type in [NodeType.IF, NodeType.IFLOOP]:
if self._filter_if(node):
son = sons[0]
self._explore(son, visited, node)
sons = sons[1:]
else:
son = sons[1]
self._explore(son, visited, node)
sons = [sons[0]]
for son in sons:
self._explore(son, visited)
def detect_reentrancy(self, contract):
"""
"""
for function in contract.functions:
if function.is_implemented:
self._explore(function.entry_point, [])
def detect(self): def detect(self):
""" """
""" """
self.result = {} super().detect()
# if a node was already visited by another path
# we will only explore it if the traversal brings
# new variables written
# This speedup the exploration through a light fixpoint
# Its particular useful on 'complex' functions with several loops and conditions
self.visited_all_paths = {}
for c in self.contracts: reentrancies = self.find_reentrancies()
self.detect_reentrancy(c)
results = [] results = []
result_sorted = sorted(list(self.result.items()), key=lambda x:x[0][0].name) result_sorted = sorted(list(reentrancies.items()), key=lambda x:x[0][0].name)
for (func, calls, send_eth), varsWritten in result_sorted: for (func, calls, send_eth), varsWritten in result_sorted:
calls = list(set(calls)) calls = list(set(calls))
send_eth = list(set(send_eth)) send_eth = list(set(send_eth))

@ -8,14 +8,16 @@
from slither.core.cfg.node import NodeType from slither.core.cfg.node import NodeType
from slither.core.declarations import Function, SolidityFunction from slither.core.declarations import Function, SolidityFunction
from slither.core.expressions import UnaryOperation, UnaryOperationType from slither.core.expressions import UnaryOperation, UnaryOperationType
from slither.detectors.abstract_detector import (AbstractDetector, from slither.detectors.abstract_detector import DetectorClassification
DetectorClassification)
from slither.visitors.expression.export_values import ExportValues from slither.visitors.expression.export_values import ExportValues
from slither.slithir.operations import (HighLevelCall, LowLevelCall, from slither.slithir.operations import (HighLevelCall, LowLevelCall,
LibraryCall, LibraryCall,
Send, Transfer) Send, Transfer)
class ReentrancyReadBeforeWritten(AbstractDetector):
from .reentrancy import Reentrancy
class ReentrancyReadBeforeWritten(Reentrancy):
ARGUMENT = 'reentrancy-no-eth' ARGUMENT = 'reentrancy-no-eth'
HELP = 'Reentrancy vulnerabilities (no theft of ethers)' HELP = 'Reentrancy vulnerabilities (no theft of ethers)'
IMPACT = DetectorClassification.MEDIUM IMPACT = DetectorClassification.MEDIUM
@ -23,174 +25,38 @@ class ReentrancyReadBeforeWritten(AbstractDetector):
WIKI = 'https://github.com/trailofbits/slither/wiki/Vulnerabilities-Description#reentrancy-vulnerabilities-1' WIKI = 'https://github.com/trailofbits/slither/wiki/Vulnerabilities-Description#reentrancy-vulnerabilities-1'
key = 'REENTRANCY-NO-ETHER' def find_reentrancies(self):
result = {}
@staticmethod for contract in self.contracts:
def _can_callback(node): for f in contract.functions_and_modifiers_not_inherited:
""" for node in f.nodes:
Detect if the node contains a call that can if node.context[self.KEY]['calls'] and not node.context[self.KEY]['send_eth']:
be used to re-entrance read_then_written = []
for c in node.context[self.KEY]['calls']:
Consider as valid target: read_then_written += [(v, node) for v in node.context[self.KEY]['written']
- low level call if v in node.context[self.KEY]['read_prior_calls'][c]]
- high level call
Do not consider Send/Transfer as there is not enough gas
"""
for ir in node.irs:
if isinstance(ir, LowLevelCall):
return True
if isinstance(ir, HighLevelCall) and not isinstance(ir, LibraryCall):
return True
return False
@staticmethod
def _can_send_eth(node):
"""
Detect if the node can send eth
"""
for ir in node.irs:
if isinstance(ir, (HighLevelCall, LowLevelCall, Transfer, Send)):
if ir.call_value:
return True
return False
def _filter_if(self, node):
"""
Check if the node is a condtional node where
there is an external call checked
Heuristic:
- The call is a IF node
- It contains a, external call
- The condition is the negation (!)
This will work only on naive implementation
"""
return isinstance(node.expression, UnaryOperation)\
and node.expression.type == UnaryOperationType.BANG
def _explore(self, node, visited, skip_father=None):
"""
Explore the CFG and look for re-entrancy
Heuristic: There is a re-entrancy if a state variable is written
after an external call
node.context will contains the external calls executed
It contains the calls executed in father nodes
if node.context is not empty, and variables are written, a re-entrancy is possible
"""
if node in visited:
return
visited = visited + [node]
# First we add the external calls executed in previous nodes
# send_eth returns the list of calls sending value
# calls returns the list of calls that can callback
# read returns the variable read
fathers_context = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':[]}
for father in node.fathers:
if self.key in father.context:
fathers_context['send_eth'] += [s for s in father.context[self.key]['send_eth'] if s!=skip_father]
fathers_context['calls'] += [c for c in father.context[self.key]['calls'] if c!=skip_father]
fathers_context['read'] += father.context[self.key]['read']
fathers_context['read_prior_calls'] += father.context[self.key]['read_prior_calls']
# Exclude path that dont bring further information
if node in self.visited_all_paths:
if all(call in self.visited_all_paths[node]['calls'] for call in fathers_context['calls']):
if all(send in self.visited_all_paths[node]['send_eth'] for send in fathers_context['send_eth']):
if all(read in self.visited_all_paths[node]['read'] for read in fathers_context['read']):
if all(read in self.visited_all_paths[node]['read_prior_calls'] for read in fathers_context['read_prior_calls']):
return
else:
self.visited_all_paths[node] = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':[]}
self.visited_all_paths[node]['send_eth'] = list(set(self.visited_all_paths[node]['send_eth'] + fathers_context['send_eth']))
self.visited_all_paths[node]['calls'] = list(set(self.visited_all_paths[node]['calls'] + fathers_context['calls']))
self.visited_all_paths[node]['read'] = list(set(self.visited_all_paths[node]['read'] + fathers_context['read']))
self.visited_all_paths[node]['read_prior_calls'] = list(set(self.visited_all_paths[node]['read_prior_calls'] + fathers_context['read_prior_calls']))
node.context[self.key] = fathers_context
state_vars_read = node.state_variables_read
# All the state variables written
state_vars_written = node.state_variables_written
# Add the state variables written in internal calls
for internal_call in node.internal_calls:
# Filter to Function, as internal_call can be a solidity call
if isinstance(internal_call, Function):
state_vars_written += internal_call.all_state_variables_written()
state_vars_read += internal_call.all_state_variables_read()
contains_call = False
if self._can_callback(node):
node.context[self.key]['calls'] = list(set(node.context[self.key]['calls'] + [node]))
node.context[self.key]['read_prior_calls'] = list(set(node.context[self.key]['read_prior_calls'] + node.context[self.key]['read'] + state_vars_read))
node.context[self.key]['read'] = []
contains_call = True
if self._can_send_eth(node):
node.context[self.key]['send_eth'] = list(set(node.context[self.key]['send_eth'] + [node]))
read_then_written = [(v, node) for v in state_vars_written if v in node.context[self.key]['read_prior_calls']]
node.context[self.key]['read'] = list(set(node.context[self.key]['read'] + state_vars_read))
# If a state variables was read and is then written, there is a dangerous call and
# ether were sent
# We found a potential re-entrancy bug # We found a potential re-entrancy bug
if (read_then_written and if read_then_written:
node.context[self.key]['calls'] and
not node.context[self.key]['send_eth']):
# calls are ordered # calls are ordered
finding_key = (node.function, finding_key = (node.function,
tuple(set(node.context[self.key]['calls']))) tuple(set(node.context[self.KEY]['calls'])))
finding_vars = read_then_written finding_vars = read_then_written
if finding_key not in self.result: if finding_key not in self.result:
self.result[finding_key] = [] result[finding_key] = []
self.result[finding_key] = list(set(self.result[finding_key] + finding_vars)) result[finding_key] = list(set(result[finding_key] + finding_vars))
return result
sons = node.sons
if contains_call and node.type in [NodeType.IF, NodeType.IFLOOP]:
if self._filter_if(node):
son = sons[0]
self._explore(son, visited, node)
sons = sons[1:]
else:
son = sons[1]
self._explore(son, visited, node)
sons = [sons[0]]
for son in sons:
self._explore(son, visited)
def detect_reentrancy(self, contract):
"""
"""
for function in contract.functions:
if function.is_implemented:
self._explore(function.entry_point, [])
def detect(self): def detect(self):
""" """
""" """
self.result = {}
# if a node was already visited by another path
# we will only explore it if the traversal brings
# new variables written
# This speedup the exploration through a light fixpoint
# Its particular useful on 'complex' functions with several loops and conditions
self.visited_all_paths = {}
for c in self.contracts: super().detect()
self.detect_reentrancy(c) reentrancies = self.find_reentrancies()
results = [] results = []
result_sorted = sorted(list(self.result.items()), key=lambda x:x[0][0].name) result_sorted = sorted(list(reentrancies.items()), key=lambda x:x[0][0].name)
for (func, calls), varsWritten in result_sorted: for (func, calls), varsWritten in result_sorted:
calls = list(set(calls)) calls = list(set(calls))
info = 'Reentrancy in {}.{} ({}):\n' info = 'Reentrancy in {}.{} ({}):\n'

Loading…
Cancel
Save