From 7d04cc049d9c0c64a66ce1cba51cd26e266620c1 Mon Sep 17 00:00:00 2001 From: Josselin Date: Mon, 28 Jan 2019 16:01:34 +0100 Subject: [PATCH] Refactor reentrancy detector: - Graph traversal is performed one time for all the variants (speedup) - Add better support for internal calls --- slither/detectors/reentrancy/reentrancy.py | 189 +++++++++++++++++ .../detectors/reentrancy/reentrancy_benign.py | 187 +++-------------- .../detectors/reentrancy/reentrancy_eth.py | 192 +++--------------- .../reentrancy_read_before_write.py | 192 +++--------------- 4 files changed, 273 insertions(+), 487 deletions(-) create mode 100644 slither/detectors/reentrancy/reentrancy.py diff --git a/slither/detectors/reentrancy/reentrancy.py b/slither/detectors/reentrancy/reentrancy.py new file mode 100644 index 000000000..c06c427bd --- /dev/null +++ b/slither/detectors/reentrancy/reentrancy.py @@ -0,0 +1,189 @@ +"""" + Re-entrancy detection + + Based on heuristics, it may lead to FP and FN + Iterate over all the nodes of the graph until reaching a fixpoint +""" + +from slither.core.cfg.node import NodeType +from slither.core.declarations import Function, SolidityFunction +from slither.core.expressions import UnaryOperation, UnaryOperationType +from slither.detectors.abstract_detector import (AbstractDetector, + DetectorClassification) +from slither.slithir.operations import (HighLevelCall, LowLevelCall, + LibraryCall, + Send, Transfer) + +def union_dict(d1, d2): + d3 = {k: d1.get(k, []) + d2.get(k, []) for k in set(list(d1.keys()) + list(d2.keys()))} + return d3 + +def dict_are_equal(d1, d2): + if set(list(d1.keys())) != set(list(d2.keys())): + return False + return all(set(d1[k]) == set(d2[k]) for k in d1.keys()) + +class Reentrancy(AbstractDetector): +# This detector is not meant to be registered +# It is inherited by reentrancy variantsœ +# ARGUMENT = 'reentrancy' +# HELP = 'Reentrancy vulnerabilities' +# IMPACT = DetectorClassification.HIGH +# CONFIDENCE = DetectorClassification.HIGH + + KEY = 'REENTRANCY' + + @staticmethod + def _can_callback(irs): + """ + Detect if the node contains a call that can + be used to re-entrance + + Consider as valid target: + - low level call + - high level call + + Do not consider Send/Transfer as there is not enough gas + """ + for ir in irs: + if isinstance(ir, LowLevelCall): + return True + if isinstance(ir, HighLevelCall) and not isinstance(ir, LibraryCall): + return True + return False + + @staticmethod + def _can_send_eth(irs): + """ + Detect if the node can send eth + """ + for ir in irs: + if isinstance(ir, (HighLevelCall, LowLevelCall, Transfer, Send)): + if ir.call_value: + return True + return False + + def _filter_if(self, node): + """ + Check if the node is a condtional node where + there is an external call checked + Heuristic: + - The call is a IF node + - It contains a, external call + - The condition is the negation (!) + + This will work only on naive implementation + """ + return isinstance(node.expression, UnaryOperation)\ + and node.expression.type == UnaryOperationType.BANG + + def _explore(self, node, visited, skip_father=None): + """ + Explore the CFG and look for re-entrancy + Heuristic: There is a re-entrancy if a state variable is written + after an external call + + node.context will contains the external calls executed + It contains the calls executed in father nodes + + if node.context is not empty, and variables are written, a re-entrancy is possible + """ + if node in visited: + return + + visited = visited + [node] + + # First we add the external calls executed in previous nodes + # send_eth returns the list of calls sending value + # calls returns the list of calls that can callback + # read returns the variable read + # read_prior_calls returns the variable read prior a call + fathers_context = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':{}} + + for father in node.fathers: + if self.KEY in father.context: + fathers_context['send_eth'] += [s for s in father.context[self.KEY]['send_eth'] if s!=skip_father] + fathers_context['calls'] += [c for c in father.context[self.KEY]['calls'] if c!=skip_father] + fathers_context['read'] += father.context[self.KEY]['read'] + fathers_context['read_prior_calls'] = union_dict(fathers_context['read_prior_calls'], father.context[self.KEY]['read_prior_calls']) + + # Exclude path that dont bring further information + if node in self.visited_all_paths: + if all(call in self.visited_all_paths[node]['calls'] for call in fathers_context['calls']): + if all(send in self.visited_all_paths[node]['send_eth'] for send in fathers_context['send_eth']): + if all(read in self.visited_all_paths[node]['read'] for read in fathers_context['read']): + if dict_are_equal(self.visited_all_paths[node]['read_prior_calls'], fathers_context['read_prior_calls']): + return + else: + self.visited_all_paths[node] = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':{}} + + self.visited_all_paths[node]['send_eth'] = list(set(self.visited_all_paths[node]['send_eth'] + fathers_context['send_eth'])) + self.visited_all_paths[node]['calls'] = list(set(self.visited_all_paths[node]['calls'] + fathers_context['calls'])) + self.visited_all_paths[node]['read'] = list(set(self.visited_all_paths[node]['read'] + fathers_context['read'])) + self.visited_all_paths[node]['read_prior_calls'] = union_dict(self.visited_all_paths[node]['read_prior_calls'], fathers_context['read_prior_calls']) + + node.context[self.KEY] = fathers_context + + state_vars_read = node.state_variables_read + + # All the state variables written + state_vars_written = node.state_variables_written + slithir_operations = [] + # Add the state variables written in internal calls + for internal_call in node.internal_calls: + # Filter to Function, as internal_call can be a solidity call + if isinstance(internal_call, Function): + state_vars_written += internal_call.all_state_variables_written() + state_vars_read += internal_call.all_state_variables_read() + slithir_operations += internal_call.all_slithir_operations() + + contains_call = False + node.context[self.KEY]['written'] = state_vars_written + if self._can_callback(node.irs + slithir_operations): + node.context[self.KEY]['calls'] = list(set(node.context[self.KEY]['calls'] + [node])) + node.context[self.KEY]['read_prior_calls'][node] = list(set(node.context[self.KEY]['read_prior_calls'].get(node, []) + node.context[self.KEY]['read']+ state_vars_read)) + contains_call = True + if self._can_send_eth(node.irs + slithir_operations): + node.context[self.KEY]['send_eth'] = list(set(node.context[self.KEY]['send_eth'] + [node])) + + node.context[self.KEY]['read'] = list(set(node.context[self.KEY]['read'] + state_vars_read)) + + sons = node.sons + if contains_call and node.type in [NodeType.IF, NodeType.IFLOOP]: + if self._filter_if(node): + son = sons[0] + self._explore(son, visited, node) + sons = sons[1:] + else: + son = sons[1] + self._explore(son, visited, node) + sons = [sons[0]] + + + for son in sons: + self._explore(son, visited) + + def detect_reentrancy(self, contract): + """ + """ + for function in contract.functions_and_modifiers_not_inherited: + if function.is_implemented: + if self.KEY in function.context: + continue + self._explore(function.entry_point, []) + function.context[self.KEY] = True + + def detect(self): + """ + """ + # if a node was already visited by another path + # we will only explore it if the traversal brings + # new variables written + # This speedup the exploration through a light fixpoint + # Its particular useful on 'complex' functions with several loops and conditions + self.visited_all_paths = {} + + for c in self.contracts: + self.detect_reentrancy(c) + + return [] diff --git a/slither/detectors/reentrancy/reentrancy_benign.py b/slither/detectors/reentrancy/reentrancy_benign.py index bfccf540c..0bed8b379 100644 --- a/slither/detectors/reentrancy/reentrancy_benign.py +++ b/slither/detectors/reentrancy/reentrancy_benign.py @@ -8,14 +8,15 @@ from slither.core.cfg.node import NodeType from slither.core.declarations import Function, SolidityFunction from slither.core.expressions import UnaryOperation, UnaryOperationType -from slither.detectors.abstract_detector import (AbstractDetector, - DetectorClassification) +from slither.detectors.abstract_detector import DetectorClassification from slither.visitors.expression.export_values import ExportValues from slither.slithir.operations import (HighLevelCall, LowLevelCall, LibraryCall, Send, Transfer) -class ReentrancyBenign(AbstractDetector): +from .reentrancy import Reentrancy + +class ReentrancyBenign(Reentrancy): ARGUMENT = 'reentrancy-benign' HELP = 'Benign reentrancy vulnerabilities' IMPACT = DetectorClassification.LOW @@ -23,170 +24,36 @@ class ReentrancyBenign(AbstractDetector): WIKI = 'https://github.com/trailofbits/slither/wiki/Vulnerabilities-Description#reentrancy-vulnerabilities-2' - key = 'REENTRANCY-BENIGN' - - @staticmethod - def _can_callback(node): - """ - Detect if the node contains a call that can - be used to re-entrance - - Consider as valid target: - - low level call - - high level call - - Do not consider Send/Transfer as there is not enough gas - """ - for ir in node.irs: - if isinstance(ir, LowLevelCall): - return True - if isinstance(ir, HighLevelCall) and not isinstance(ir, LibraryCall): - return True - return False - - @staticmethod - def _can_send_eth(node): - """ - Detect if the node can send eth - """ - for ir in node.irs: - if isinstance(ir, (HighLevelCall, LowLevelCall, Transfer, Send)): - if ir.call_value: - return True - return False - - def _filter_if(self, node): - """ - Check if the node is a condtional node where - there is an external call checked - Heuristic: - - The call is a IF node - - It contains a, external call - - The condition is the negation (!) - - This will work only on naive implementation - """ - return isinstance(node.expression, UnaryOperation)\ - and node.expression.type == UnaryOperationType.BANG - - def _explore(self, node, visited, skip_father=None): - """ - Explore the CFG and look for re-entrancy - Heuristic: There is a re-entrancy if a state variable is written - after an external call - - node.context will contains the external calls executed - It contains the calls executed in father nodes - - if node.context is not empty, and variables are written, a re-entrancy is possible - """ - if node in visited: - return - - visited = visited + [node] - - # First we add the external calls executed in previous nodes - # send_eth returns the list of calls sending value - # calls returns the list of calls that can callback - # read returns the variable read - fathers_context = {'send_eth':[], 'calls':[], 'read':[]} - - for father in node.fathers: - if self.key in father.context: - fathers_context['send_eth'] += [s for s in father.context[self.key]['send_eth'] if s!=skip_father] - fathers_context['calls'] += [c for c in father.context[self.key]['calls'] if c!=skip_father] - fathers_context['read'] += father.context[self.key]['read'] - - # Exclude path that dont bring further information - if node in self.visited_all_paths: - if all(call in self.visited_all_paths[node]['calls'] for call in fathers_context['calls']): - if all(send in self.visited_all_paths[node]['send_eth'] for send in fathers_context['send_eth']): - if all(read in self.visited_all_paths[node]['read'] for read in fathers_context['read']): - return - else: - self.visited_all_paths[node] = {'send_eth':[], 'calls':[], 'read':[]} - - self.visited_all_paths[node]['send_eth'] = list(set(self.visited_all_paths[node]['send_eth'] + fathers_context['send_eth'])) - self.visited_all_paths[node]['calls'] = list(set(self.visited_all_paths[node]['calls'] + fathers_context['calls'])) - self.visited_all_paths[node]['read'] = list(set(self.visited_all_paths[node]['read'] + fathers_context['read'])) - - node.context[self.key] = fathers_context - - state_vars_read = node.state_variables_read - - # All the state variables written - state_vars_written = node.state_variables_written - # Add the state variables written in internal calls - for internal_call in node.internal_calls: - # Filter to Function, as internal_call can be a solidity call - if isinstance(internal_call, Function): - state_vars_written += internal_call.all_state_variables_written() - state_vars_read += internal_call.all_state_variables_read() - - contains_call = False - if self._can_callback(node): - node.context[self.key]['calls'] = list(set(node.context[self.key]['calls'] + [node])) - contains_call = True - if self._can_send_eth(node): - node.context[self.key]['send_eth'] = list(set(node.context[self.key]['send_eth'] + [node])) - - not_read_then_written = [(v, node) for v in state_vars_written if v not in node.context[self.key]['read']] - node.context[self.key]['read'] = list(set(node.context[self.key]['read'] + state_vars_read)) - - # If a state variables was read and is then written, there is a dangerous call and - # ether were sent - # We found a potential re-entrancy bug - if (not_read_then_written and - node.context[self.key]['calls']): - # calls are ordered - finding_key = (node.function, - tuple(set(node.context[self.key]['calls'])), - tuple(set(node.context[self.key]['send_eth']))) - finding_vars = not_read_then_written - if finding_key not in self.result: - self.result[finding_key] = [] - self.result[finding_key] = list(set(self.result[finding_key] + finding_vars)) - - sons = node.sons - if contains_call and node.type in [NodeType.IF, NodeType.IFLOOP]: - if self._filter_if(node): - son = sons[0] - self._explore(son, visited, node) - sons = sons[1:] - else: - son = sons[1] - self._explore(son, visited, node) - sons = [sons[0]] - - - for son in sons: - self._explore(son, visited) - - def detect_reentrancy(self, contract): - """ - """ - for function in contract.functions: - if function.is_implemented: - self._explore(function.entry_point, []) + def find_reentrancies(self): + result = {} + for contract in self.contracts: + for f in contract.functions_and_modifiers_not_inherited: + for node in f.nodes: + if node.context[self.KEY]['calls']: + not_read_then_written = [(v, node) for v in node.context[self.KEY]['written'] + if v not in node.context[self.KEY]['read']] + if not_read_then_written: + + # calls are ordered + finding_key = (node.function, + tuple(set(node.context[self.KEY]['calls'])), + tuple(set(node.context[self.KEY]['send_eth']))) + finding_vars = not_read_then_written + if finding_key not in result: + result[finding_key] = [] + result[finding_key] = list(set(result[finding_key] + finding_vars)) + return result def detect(self): """ """ - self.result = {} - # if a node was already visited by another path - # we will only explore it if the traversal brings - # new variables written - # This speedup the exploration through a light fixpoint - # Its particular useful on 'complex' functions with several loops and conditions - self.visited_all_paths = {} - - for c in self.contracts: - self.detect_reentrancy(c) + super().detect() + reentrancies = self.find_reentrancies() results = [] - result_sorted = sorted(list(self.result.items()), key=lambda x:x[0][0].name) + result_sorted = sorted(list(reentrancies.items()), key=lambda x:x[0][0].name) for (func, calls, send_eth), varsWritten in result_sorted: calls = list(set(calls)) send_eth = list(set(send_eth)) @@ -195,7 +62,7 @@ class ReentrancyBenign(AbstractDetector): info += '\tExternal calls:\n' for call_info in calls: info += '\t- {} ({})\n'.format(call_info.expression, call_info.source_mapping_str) - if calls != send_eth: + if calls != send_eth and send_eth: info += '\tExternal calls sending eth:\n' for call_info in send_eth: info += '\t- {} ({})\n'.format(call_info.expression, call_info.source_mapping_str) diff --git a/slither/detectors/reentrancy/reentrancy_eth.py b/slither/detectors/reentrancy/reentrancy_eth.py index baf872e4c..baa0ae73d 100644 --- a/slither/detectors/reentrancy/reentrancy_eth.py +++ b/slither/detectors/reentrancy/reentrancy_eth.py @@ -8,13 +8,14 @@ from slither.core.cfg.node import NodeType from slither.core.declarations import Function, SolidityFunction from slither.core.expressions import UnaryOperation, UnaryOperationType -from slither.detectors.abstract_detector import (AbstractDetector, - DetectorClassification) +from slither.detectors.abstract_detector import DetectorClassification from slither.slithir.operations import (HighLevelCall, LowLevelCall, LibraryCall, Send, Transfer) -class ReentrancyEth(AbstractDetector): + +from .reentrancy import Reentrancy +class ReentrancyEth(Reentrancy): ARGUMENT = 'reentrancy-eth' HELP = 'Reentrancy vulnerabilities (theft of ethers)' IMPACT = DetectorClassification.HIGH @@ -24,175 +25,38 @@ class ReentrancyEth(AbstractDetector): key = 'REENTRANCY-ETHERS' - @staticmethod - def _can_callback(node): - """ - Detect if the node contains a call that can - be used to re-entrance - - Consider as valid target: - - low level call - - high level call - - Do not consider Send/Transfer as there is not enough gas - """ - for ir in node.irs: - if isinstance(ir, LowLevelCall): - return True - if isinstance(ir, HighLevelCall) and not isinstance(ir, LibraryCall): - return True - return False - - @staticmethod - def _can_send_eth(node): - """ - Detect if the node can send eth - """ - for ir in node.irs: - if isinstance(ir, (HighLevelCall, LowLevelCall, Transfer, Send)): - if ir.call_value: - return True - return False - - def _filter_if(self, node): - """ - Check if the node is a condtional node where - there is an external call checked - Heuristic: - - The call is a IF node - - It contains a, external call - - The condition is the negation (!) - - This will work only on naive implementation - """ - return isinstance(node.expression, UnaryOperation)\ - and node.expression.type == UnaryOperationType.BANG - - def _explore(self, node, visited, skip_father=None): - """ - Explore the CFG and look for re-entrancy - Heuristic: There is a re-entrancy if a state variable is written - after an external call - - node.context will contains the external calls executed - It contains the calls executed in father nodes - - if node.context is not empty, and variables are written, a re-entrancy is possible - """ - if node in visited: - return - - visited = visited + [node] - - # First we add the external calls executed in previous nodes - # send_eth returns the list of calls sending value - # calls returns the list of calls that can callback - # read returns the variable read - fathers_context = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':[]} - - for father in node.fathers: - if self.key in father.context: - fathers_context['send_eth'] += [s for s in father.context[self.key]['send_eth'] if s!=skip_father] - fathers_context['calls'] += [c for c in father.context[self.key]['calls'] if c!=skip_father] - fathers_context['read'] += father.context[self.key]['read'] - fathers_context['read_prior_calls'] += father.context[self.key]['read_prior_calls'] - - # Exclude path that dont bring further information - if node in self.visited_all_paths: - if all(call in self.visited_all_paths[node]['calls'] for call in fathers_context['calls']): - if all(send in self.visited_all_paths[node]['send_eth'] for send in fathers_context['send_eth']): - if all(read in self.visited_all_paths[node]['read'] for read in fathers_context['read']): - if all(read in self.visited_all_paths[node]['read_prior_calls'] for read in fathers_context['read_prior_calls']): - return - else: - self.visited_all_paths[node] = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':[]} - - self.visited_all_paths[node]['send_eth'] = list(set(self.visited_all_paths[node]['send_eth'] + fathers_context['send_eth'])) - self.visited_all_paths[node]['calls'] = list(set(self.visited_all_paths[node]['calls'] + fathers_context['calls'])) - self.visited_all_paths[node]['read'] = list(set(self.visited_all_paths[node]['read'] + fathers_context['read'])) - self.visited_all_paths[node]['read_prior_calls'] = list(set(self.visited_all_paths[node]['read_prior_calls'] + fathers_context['read_prior_calls'])) - - node.context[self.key] = fathers_context - - state_vars_read = node.state_variables_read - - # All the state variables written - state_vars_written = node.state_variables_written - # Add the state variables written in internal calls - for internal_call in node.internal_calls: - # Filter to Function, as internal_call can be a solidity call - if isinstance(internal_call, Function): - state_vars_written += internal_call.all_state_variables_written() - state_vars_read += internal_call.all_state_variables_read() - - contains_call = False - if self._can_callback(node): - node.context[self.key]['calls'] = list(set(node.context[self.key]['calls'] + [node])) - node.context[self.key]['read_prior_calls'] = list(set(node.context[self.key]['read_prior_calls'] + node.context[self.key]['read']+ state_vars_read)) - node.context[self.key]['read'] = [] - contains_call = True - if self._can_send_eth(node): - node.context[self.key]['send_eth'] = list(set(node.context[self.key]['send_eth'] + [node])) - - - read_then_written = [(v, node) for v in state_vars_written if v in node.context[self.key]['read_prior_calls']] - - node.context[self.key]['read'] = list(set(node.context[self.key]['read'] + state_vars_read)) - # If a state variables was read and is then written, there is a dangerous call and - # ether were sent - # We found a potential re-entrancy bug - if (read_then_written and - node.context[self.key]['calls'] and - node.context[self.key]['send_eth']): - # calls are ordered - finding_key = (node.function, - tuple(set(node.context[self.key]['calls'])), - tuple(set(node.context[self.key]['send_eth']))) - finding_vars = read_then_written - if finding_key not in self.result: - self.result[finding_key] = [] - self.result[finding_key] = list(set(self.result[finding_key] + finding_vars)) - - sons = node.sons - if contains_call and node.type in [NodeType.IF, NodeType.IFLOOP]: - if self._filter_if(node): - son = sons[0] - self._explore(son, visited, node) - sons = sons[1:] - else: - son = sons[1] - self._explore(son, visited, node) - sons = [sons[0]] - - - for son in sons: - self._explore(son, visited) - - def detect_reentrancy(self, contract): - """ - """ - for function in contract.functions: - if function.is_implemented: - self._explore(function.entry_point, []) + def find_reentrancies(self): + result = {} + for contract in self.contracts: + for f in contract.functions_and_modifiers_not_inherited: + for node in f.nodes: + if node.context[self.KEY]['calls'] and node.context[self.KEY]['send_eth']: + read_then_written = [] + for c in node.context[self.KEY]['calls']: + read_then_written += [(v, node) for v in node.context[self.KEY]['written'] + if v in node.context[self.KEY]['read_prior_calls'][c]] + + if read_then_written: + # calls are ordered + finding_key = (node.function, + tuple(set(node.context[self.KEY]['calls'])), + tuple(set(node.context[self.KEY]['send_eth']))) + finding_vars = read_then_written + if finding_key not in result: + result[finding_key] = [] + result[finding_key] = list(set(result[finding_key] + finding_vars)) + return result def detect(self): """ """ - self.result = {} - - # if a node was already visited by another path - # we will only explore it if the traversal brings - # new variables written - # This speedup the exploration through a light fixpoint - # Its particular useful on 'complex' functions with several loops and conditions - self.visited_all_paths = {} + super().detect() - for c in self.contracts: - self.detect_reentrancy(c) + reentrancies = self.find_reentrancies() results = [] - result_sorted = sorted(list(self.result.items()), key=lambda x:x[0][0].name) + result_sorted = sorted(list(reentrancies.items()), key=lambda x:x[0][0].name) for (func, calls, send_eth), varsWritten in result_sorted: calls = list(set(calls)) send_eth = list(set(send_eth)) diff --git a/slither/detectors/reentrancy/reentrancy_read_before_write.py b/slither/detectors/reentrancy/reentrancy_read_before_write.py index 16b482448..a72943149 100644 --- a/slither/detectors/reentrancy/reentrancy_read_before_write.py +++ b/slither/detectors/reentrancy/reentrancy_read_before_write.py @@ -8,14 +8,16 @@ from slither.core.cfg.node import NodeType from slither.core.declarations import Function, SolidityFunction from slither.core.expressions import UnaryOperation, UnaryOperationType -from slither.detectors.abstract_detector import (AbstractDetector, - DetectorClassification) +from slither.detectors.abstract_detector import DetectorClassification from slither.visitors.expression.export_values import ExportValues from slither.slithir.operations import (HighLevelCall, LowLevelCall, LibraryCall, Send, Transfer) -class ReentrancyReadBeforeWritten(AbstractDetector): + +from .reentrancy import Reentrancy + +class ReentrancyReadBeforeWritten(Reentrancy): ARGUMENT = 'reentrancy-no-eth' HELP = 'Reentrancy vulnerabilities (no theft of ethers)' IMPACT = DetectorClassification.MEDIUM @@ -23,174 +25,38 @@ class ReentrancyReadBeforeWritten(AbstractDetector): WIKI = 'https://github.com/trailofbits/slither/wiki/Vulnerabilities-Description#reentrancy-vulnerabilities-1' - key = 'REENTRANCY-NO-ETHER' - - @staticmethod - def _can_callback(node): - """ - Detect if the node contains a call that can - be used to re-entrance - - Consider as valid target: - - low level call - - high level call - - Do not consider Send/Transfer as there is not enough gas - """ - for ir in node.irs: - if isinstance(ir, LowLevelCall): - return True - if isinstance(ir, HighLevelCall) and not isinstance(ir, LibraryCall): - return True - return False - - @staticmethod - def _can_send_eth(node): - """ - Detect if the node can send eth - """ - for ir in node.irs: - if isinstance(ir, (HighLevelCall, LowLevelCall, Transfer, Send)): - if ir.call_value: - return True - return False - - def _filter_if(self, node): - """ - Check if the node is a condtional node where - there is an external call checked - Heuristic: - - The call is a IF node - - It contains a, external call - - The condition is the negation (!) - - This will work only on naive implementation - """ - return isinstance(node.expression, UnaryOperation)\ - and node.expression.type == UnaryOperationType.BANG - - def _explore(self, node, visited, skip_father=None): - """ - Explore the CFG and look for re-entrancy - Heuristic: There is a re-entrancy if a state variable is written - after an external call - - node.context will contains the external calls executed - It contains the calls executed in father nodes - - if node.context is not empty, and variables are written, a re-entrancy is possible - """ - if node in visited: - return - - visited = visited + [node] - - # First we add the external calls executed in previous nodes - # send_eth returns the list of calls sending value - # calls returns the list of calls that can callback - # read returns the variable read - fathers_context = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':[]} - - for father in node.fathers: - if self.key in father.context: - fathers_context['send_eth'] += [s for s in father.context[self.key]['send_eth'] if s!=skip_father] - fathers_context['calls'] += [c for c in father.context[self.key]['calls'] if c!=skip_father] - fathers_context['read'] += father.context[self.key]['read'] - fathers_context['read_prior_calls'] += father.context[self.key]['read_prior_calls'] - - # Exclude path that dont bring further information - if node in self.visited_all_paths: - if all(call in self.visited_all_paths[node]['calls'] for call in fathers_context['calls']): - if all(send in self.visited_all_paths[node]['send_eth'] for send in fathers_context['send_eth']): - if all(read in self.visited_all_paths[node]['read'] for read in fathers_context['read']): - if all(read in self.visited_all_paths[node]['read_prior_calls'] for read in fathers_context['read_prior_calls']): - return - else: - self.visited_all_paths[node] = {'send_eth':[], 'calls':[], 'read':[], 'read_prior_calls':[]} - - self.visited_all_paths[node]['send_eth'] = list(set(self.visited_all_paths[node]['send_eth'] + fathers_context['send_eth'])) - self.visited_all_paths[node]['calls'] = list(set(self.visited_all_paths[node]['calls'] + fathers_context['calls'])) - self.visited_all_paths[node]['read'] = list(set(self.visited_all_paths[node]['read'] + fathers_context['read'])) - self.visited_all_paths[node]['read_prior_calls'] = list(set(self.visited_all_paths[node]['read_prior_calls'] + fathers_context['read_prior_calls'])) - - node.context[self.key] = fathers_context - - state_vars_read = node.state_variables_read - # All the state variables written - state_vars_written = node.state_variables_written - # Add the state variables written in internal calls - for internal_call in node.internal_calls: - # Filter to Function, as internal_call can be a solidity call - if isinstance(internal_call, Function): - state_vars_written += internal_call.all_state_variables_written() - state_vars_read += internal_call.all_state_variables_read() - - contains_call = False - if self._can_callback(node): - node.context[self.key]['calls'] = list(set(node.context[self.key]['calls'] + [node])) - node.context[self.key]['read_prior_calls'] = list(set(node.context[self.key]['read_prior_calls'] + node.context[self.key]['read'] + state_vars_read)) - node.context[self.key]['read'] = [] - contains_call = True - if self._can_send_eth(node): - node.context[self.key]['send_eth'] = list(set(node.context[self.key]['send_eth'] + [node])) - - read_then_written = [(v, node) for v in state_vars_written if v in node.context[self.key]['read_prior_calls']] - - node.context[self.key]['read'] = list(set(node.context[self.key]['read'] + state_vars_read)) - # If a state variables was read and is then written, there is a dangerous call and - # ether were sent - # We found a potential re-entrancy bug - if (read_then_written and - node.context[self.key]['calls'] and - not node.context[self.key]['send_eth']): - # calls are ordered - finding_key = (node.function, - tuple(set(node.context[self.key]['calls']))) - finding_vars = read_then_written - if finding_key not in self.result: - self.result[finding_key] = [] - self.result[finding_key] = list(set(self.result[finding_key] + finding_vars)) - - sons = node.sons - if contains_call and node.type in [NodeType.IF, NodeType.IFLOOP]: - if self._filter_if(node): - son = sons[0] - self._explore(son, visited, node) - sons = sons[1:] - else: - son = sons[1] - self._explore(son, visited, node) - sons = [sons[0]] - - - for son in sons: - self._explore(son, visited) - - def detect_reentrancy(self, contract): - """ - """ - for function in contract.functions: - if function.is_implemented: - self._explore(function.entry_point, []) + def find_reentrancies(self): + result = {} + for contract in self.contracts: + for f in contract.functions_and_modifiers_not_inherited: + for node in f.nodes: + if node.context[self.KEY]['calls'] and not node.context[self.KEY]['send_eth']: + read_then_written = [] + for c in node.context[self.KEY]['calls']: + read_then_written += [(v, node) for v in node.context[self.KEY]['written'] + if v in node.context[self.KEY]['read_prior_calls'][c]] + + # We found a potential re-entrancy bug + if read_then_written: + # calls are ordered + finding_key = (node.function, + tuple(set(node.context[self.KEY]['calls']))) + finding_vars = read_then_written + if finding_key not in self.result: + result[finding_key] = [] + result[finding_key] = list(set(result[finding_key] + finding_vars)) + return result def detect(self): """ """ - self.result = {} - - # if a node was already visited by another path - # we will only explore it if the traversal brings - # new variables written - # This speedup the exploration through a light fixpoint - # Its particular useful on 'complex' functions with several loops and conditions - self.visited_all_paths = {} - for c in self.contracts: - self.detect_reentrancy(c) + super().detect() + reentrancies = self.find_reentrancies() results = [] - result_sorted = sorted(list(self.result.items()), key=lambda x:x[0][0].name) + result_sorted = sorted(list(reentrancies.items()), key=lambda x:x[0][0].name) for (func, calls), varsWritten in result_sorted: calls = list(set(calls)) info = 'Reentrancy in {}.{} ({}):\n'