Merge pull request #266 from crytic/dev-std-redirect

Opt-in JSON result types + JSON fields for console log/compilation
pull/303/head
Feist Josselin 5 years ago committed by GitHub
commit 0ddec85cc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CONTRIBUTING.md
  2. 186
      slither/__main__.py
  3. 7
      slither/core/slither_core.py
  4. 13
      slither/core/source_mapping/source_mapping.py
  5. 19
      slither/slither.py
  6. 30
      slither/utils/command_line.py
  7. 90
      slither/utils/output_capture.py

@ -1,4 +1,4 @@
# Contributing to Manticore
# Contributing to Slither
First, thanks for your interest in contributing to Slither! We welcome and appreciate all contributions, including bug reports, feature suggestions, tutorials/blog posts, and code improvements.
If you're unsure where to start, we recommend our [`good first issue`](https://github.com/crytic/slither/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) and [`help wanted`](https://github.com/crytic/slither/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22) issue labels.

@ -6,12 +6,12 @@ import inspect
import json
import logging
import os
import subprocess
import sys
import traceback
from pkg_resources import iter_entry_points, require
from crytic_compile import cryticparser
from crytic_compile.platform.standard import generate_standard_export
from slither.detectors import all_detectors
from slither.detectors.abstract_detector import (AbstractDetector,
@ -19,12 +19,13 @@ from slither.detectors.abstract_detector import (AbstractDetector,
from slither.printers import all_printers
from slither.printers.abstract_printer import AbstractPrinter
from slither.slither import Slither
from slither.utils.output_capture import StandardOutputCapture
from slither.utils.colors import red, yellow, set_colorization_enabled
from slither.utils.command_line import (output_detectors, output_results_to_markdown,
output_detectors_json, output_printers,
output_detectors_json, output_printers, output_printers_json,
output_to_markdown, output_wiki, defaults_flag_in_config,
read_config_file)
from crytic_compile import is_supported
read_config_file, JSON_OUTPUT_TYPES)
from crytic_compile import compile_all, is_supported
from slither.exceptions import SlitherException
logging.basicConfig()
@ -36,7 +37,8 @@ logger = logging.getLogger("Slither")
###################################################################################
###################################################################################
def process(filename, args, detector_classes, printer_classes):
def process_single(target, args, detector_classes, printer_classes):
"""
The core high-level code for running Slither static analysis.
@ -46,12 +48,26 @@ def process(filename, args, detector_classes, printer_classes):
ast = '--ast-compact-json'
if args.legacy_ast:
ast = '--ast-json'
slither = Slither(filename,
slither = Slither(target,
ast_format=ast,
**vars(args))
return _process(slither, detector_classes, printer_classes)
def process_all(target, args, detector_classes, printer_classes):
compilations = compile_all(target, **vars(args))
slither_instances = []
results = []
analyzed_contracts_count = 0
for compilation in compilations:
(slither, current_results, current_analyzed_count) = process_single(compilation, args, detector_classes, printer_classes)
results.extend(current_results)
slither_instances.append(slither)
analyzed_contracts_count += current_analyzed_count
return slither_instances, results, analyzed_contracts_count
def _process(slither, detector_classes, printer_classes):
for detector_cls in detector_classes:
slither.register_detector(detector_cls)
@ -72,10 +88,10 @@ def _process(slither, detector_classes, printer_classes):
slither.run_printers() # Currently printers does not return results
return results, analyzed_contracts_count
return slither, results, analyzed_contracts_count
def process_files(filenames, args, detector_classes, printer_classes):
def process_from_asts(filenames, args, detector_classes, printer_classes):
all_contracts = []
for filename in filenames:
@ -83,10 +99,9 @@ def process_files(filenames, args, detector_classes, printer_classes):
contract_loaded = json.load(f)
all_contracts.append(contract_loaded['ast'])
slither = Slither(all_contracts,
**vars(args))
return process_single(all_contracts, args, detector_classes, printer_classes)
return _process(slither, detector_classes, printer_classes)
# endregion
###################################################################################
@ -96,26 +111,15 @@ def process_files(filenames, args, detector_classes, printer_classes):
###################################################################################
def wrap_json_detectors_results(success, error_message, results=None):
"""
Wrap the detector results.
:param success:
:param error_message:
:param results:
:return:
"""
results_json = {}
if results:
results_json['detectors'] = results
return {
"success": success,
"error": error_message,
"results": results_json
def output_json(filename, error, results):
# Create our encapsulated JSON result.
json_result = {
"success": error is None,
"error": error,
"results": results
}
def output_json(results, filename):
json_result = wrap_json_detectors_results(True, None, results)
# Determine if we should output to stdout
if filename is None:
# Write json to console
print(json.dumps(json_result))
@ -269,7 +273,7 @@ def parse_args(detector_classes, printer_classes):
group_detector = parser.add_argument_group('Detectors')
group_printer = parser.add_argument_group('Printers')
group_misc = parser.add_argument_group('Additional option')
group_misc = parser.add_argument_group('Additional options')
group_detector.add_argument('--detect',
help='Comma-separated list of detectors, defaults to all, '
@ -335,12 +339,17 @@ def parse_args(detector_classes, printer_classes):
action='store_true',
default=defaults_flag_in_config['exclude_high'])
group_misc.add_argument('--json',
help='Export the results as a JSON file ("--json -" to export to stdout)',
action='store',
default=defaults_flag_in_config['json'])
group_misc.add_argument('--json-types',
help='Comma-separated list of result types to output to JSON, defaults to all, '
'available types: {}'.format(
', '.join(output_type for output_type in JSON_OUTPUT_TYPES)),
action='store',
default=defaults_flag_in_config['json-types'])
group_misc.add_argument('--disable-color',
help='Disable output colorization',
@ -381,7 +390,6 @@ def parse_args(detector_classes, printer_classes):
action=OutputMarkdown,
default=False)
group_misc.add_argument('--checklist',
help=argparse.SUPPRESS,
action='store_true',
@ -423,6 +431,12 @@ def parse_args(detector_classes, printer_classes):
args.filter_paths = parse_filter_paths(args)
# Verify our json-type output is valid
args.json_types = set(args.json_types.split(','))
for json_type in args.json_types:
if json_type not in JSON_OUTPUT_TYPES:
raise Exception(f"Error: \"{json_type}\" is not a valid JSON result output type.")
return args
class ListDetectors(argparse.Action):
@ -434,7 +448,8 @@ class ListDetectors(argparse.Action):
class ListDetectorsJson(argparse.Action):
def __call__(self, parser, *args, **kwargs):
detectors, _ = get_detectors_and_printers()
output_detectors_json(detectors)
detector_types_json = output_detectors_json(detectors)
print(json.dumps(detector_types_json))
parser.exit()
class ListPrinters(argparse.Action):
@ -501,10 +516,16 @@ def main_impl(all_detector_classes, all_printer_classes):
# Set colorization option
set_colorization_enabled(not args.disable_color)
# If we are outputting json to stdout, we'll want to disable any logging.
stdout_json = args.json == "-"
if stdout_json:
logging.disable(logging.CRITICAL)
# Define some variables for potential JSON output
json_results = {}
output_error = None
outputting_json = args.json is not None
outputting_json_stdout = args.json == '-'
# If we are outputting JSON, capture all standard output. If we are outputting to stdout, we block typical stdout
# output.
if outputting_json:
StandardOutputCapture.enable(outputting_json_stdout)
printer_classes = choose_printers(args, all_printer_classes)
detector_classes = choose_detectors(args, all_detector_classes)
@ -540,33 +561,56 @@ def main_impl(all_detector_classes, all_printer_classes):
try:
filename = args.filename
globbed_filenames = glob.glob(filename, recursive=True)
if os.path.isfile(filename) or is_supported(filename):
(results, number_contracts) = process(filename, args, detector_classes, printer_classes)
elif os.path.isdir(filename) or len(globbed_filenames) > 0:
extension = "*.sol" if not args.solc_ast else "*.json"
filenames = glob.glob(os.path.join(filename, extension))
# Determine if we are handling ast from solc
if args.solc_ast or (filename.endswith('.json') and not is_supported(filename)):
globbed_filenames = glob.glob(filename, recursive=True)
filenames = glob.glob(os.path.join(filename, "*.json"))
if not filenames:
filenames = globbed_filenames
number_contracts = 0
results = []
if args.splitted and args.solc_ast:
(results, number_contracts) = process_files(filenames, args, detector_classes, printer_classes)
slither_instances = []
if args.splitted:
(slither_instance, results, number_contracts) = process_from_asts(filenames, args, detector_classes, printer_classes)
slither_instances.append(slither_instance)
else:
for filename in filenames:
(results_tmp, number_contracts_tmp) = process(filename, args, detector_classes, printer_classes)
(slither_instance, results_tmp, number_contracts_tmp) = process_single(filename, args, detector_classes, printer_classes)
number_contracts += number_contracts_tmp
results += results_tmp
slither_instances.append(slither_instance)
# Rely on CryticCompile to discern the underlying type of compilations.
else:
raise Exception("Unrecognised file/dir path: '#{filename}'".format(filename=filename))
if args.json:
output_json(results, None if stdout_json else args.json)
(slither_instances, results, number_contracts) = process_all(filename, args, detector_classes, printer_classes)
# Determine if we are outputting JSON
if outputting_json:
# Add our compilation information to JSON
if 'compilations' in args.json_types:
compilation_results = []
for slither_instance in slither_instances:
compilation_results.append(generate_standard_export(slither_instance.crytic_compile))
json_results['compilations'] = compilation_results
# Add our detector results to JSON if desired.
if results and 'detectors' in args.json_types:
json_results['detectors'] = results
# Add our detector types to JSON
if 'list-detectors' in args.json_types:
detectors, _ = get_detectors_and_printers()
json_results['list-detectors'] = output_detectors_json(detectors)
# Add our detector types to JSON
if 'list-printers' in args.json_types:
_, printers = get_detectors_and_printers()
json_results['list-printers'] = output_printers_json(printers)
# Output our results to markdown if we wish to compile a checklist.
if args.checklist:
output_results_to_markdown(results)
# Dont print the number of result for printers
if number_contracts == 0:
logger.warn(red('No contract was analyzed'))
@ -576,27 +620,33 @@ def main_impl(all_detector_classes, all_printer_classes):
logger.info('%s analyzed (%d contracts), %d result(s) found', filename, number_contracts, len(results))
if args.ignore_return_value:
return
exit(results)
except SlitherException as se:
# Output our error accordingly, via JSON or logging.
if stdout_json:
print(json.dumps(wrap_json_detectors_results(False, str(se), [])))
else:
logging.error(red('Error:'))
logging.error(red(se))
logging.error('Please report an issue to https://github.com/crytic/slither/issues')
sys.exit(-1)
output_error = str(se)
logging.error(red('Error:'))
logging.error(red(output_error))
logging.error('Please report an issue to https://github.com/crytic/slither/issues')
except Exception:
# Output our error accordingly, via JSON or logging.
if stdout_json:
print(json.dumps(wrap_json_detectors_results(False, traceback.format_exc(), [])))
else:
logging.error('Error in %s' % args.filename)
logging.error(traceback.format_exc())
output_error = traceback.format_exc()
logging.error('Error in %s' % args.filename)
logging.error(output_error)
# If we are outputting JSON, capture the redirected output and disable the redirect to output the final JSON.
if outputting_json:
if 'console' in args.json_types:
json_results['console'] = {
'stdout': StandardOutputCapture.get_stdout_output(),
'stderr': StandardOutputCapture.get_stderr_output()
}
StandardOutputCapture.disable()
output_json(None if outputting_json_stdout else args.json, output_error, json_results)
# Exit with the appropriate status code
if output_error:
sys.exit(-1)
else:
exit(results)
if __name__ == '__main__':

@ -61,8 +61,11 @@ class Slither(Context):
:param path:
:return:
"""
with open(path, encoding='utf8', newline='') as f:
self.source_code[path] = f.read()
if path in self.crytic_compile.src_content:
self.source_code[path] = self.crytic_compile.src_content[path]
else:
with open(path, encoding='utf8', newline='') as f:
self.source_code[path] = f.read()
# endregion
###################################################################################

@ -90,18 +90,23 @@ class SourceMapping(Context):
is_dependency = slither.crytic_compile.is_dependency(filename_absolute)
if filename_absolute in slither.source_code:
if filename_absolute in slither.source_code or filename_absolute in slither.crytic_compile.src_content:
filename = filename_absolute
elif filename_relative in slither.source_code:
filename = filename_relative
elif filename_short in slither.source_code:
filename = filename_short
else:#
filename = filename_used.used
else:
filename = filename_used
else:
filename = filename_used
if filename in slither.source_code:
if slither.crytic_compile and filename in slither.crytic_compile.src_content:
source_code = slither.crytic_compile.src_content[filename]
(lines, starting_column, ending_column) = SourceMapping._compute_line(source_code,
s,
l)
elif filename in slither.source_code:
source_code = slither.source_code[filename]
(lines, starting_column, ending_column) = SourceMapping._compute_line(source_code,
s,

@ -22,14 +22,14 @@ logger_printer = logging.getLogger("Printers")
class Slither(SlitherSolc):
def __init__(self, contract, **kwargs):
def __init__(self, target, **kwargs):
'''
Args:
contract (str| list(json))
target (str | list(json) | CryticCompile)
Keyword Args:
solc (str): solc binary location (default 'solc')
disable_solc_warnings (bool): True to disable solc warnings (default false)
solc_argeuments (str): solc arguments (default '')
solc_arguments (str): solc arguments (default '')
ast_format (str): ast format (default '--ast-compact-json')
filter_paths (list(str)): list of path to filter (default [])
triage_mode (bool): if true, switch to triage mode (default false)
@ -46,14 +46,17 @@ class Slither(SlitherSolc):
'''
# list of files provided (see --splitted option)
if isinstance(contract, list):
self._init_from_list(contract)
elif contract.endswith('.json'):
self._init_from_raw_json(contract)
if isinstance(target, list):
self._init_from_list(target)
elif isinstance(target, str) and target.endswith('.json'):
self._init_from_raw_json(target)
else:
super(Slither, self).__init__('')
try:
crytic_compile = CryticCompile(contract, **kwargs)
if isinstance(target, CryticCompile):
crytic_compile = target
else:
crytic_compile = CryticCompile(target, **kwargs)
self._crytic_compile = crytic_compile
except InvalidCompilation as e:
raise SlitherError('Invalid compilation: \n'+str(e))

@ -10,6 +10,10 @@ from .colors import yellow, red
logger = logging.getLogger("Slither")
DEFAULT_JSON_OUTPUT_TYPES = ["detectors"]
JSON_OUTPUT_TYPES = ["compilations", "console", "detectors", "list-detectors", "list-printers"]
# Those are the flags shared by the command line and the config file
defaults_flag_in_config = {
'detectors_to_run': 'all',
@ -22,6 +26,7 @@ defaults_flag_in_config = {
'exclude_medium': False,
'exclude_high': False,
'json': None,
'json-types': ','.join(DEFAULT_JSON_OUTPUT_TYPES),
'disable_color': False,
'filter_paths': None,
# debug command
@ -196,6 +201,7 @@ def output_detectors(detector_classes):
idx = idx + 1
print(table)
def output_detectors_json(detector_classes):
detectors_list = []
for detector in detector_classes:
@ -234,7 +240,7 @@ def output_detectors_json(detector_classes):
'exploit_scenario':exploit,
'recommendation':recommendation})
idx = idx + 1
print(json.dumps(table))
return table
def output_printers(printer_classes):
printers_list = []
@ -253,3 +259,25 @@ def output_printers(printer_classes):
table.add_row([idx, argument, help_info])
idx = idx + 1
print(table)
def output_printers_json(printer_classes):
printers_list = []
for printer in printer_classes:
argument = printer.ARGUMENT
help_info = printer.HELP
printers_list.append((argument,
help_info))
# Sort by name
printers_list = sorted(printers_list, key=lambda element: (element[0]))
idx = 1
table = []
for (argument, help_info) in printers_list:
table.append({'index': idx,
'check': argument,
'title': help_info})
idx = idx + 1
return table

@ -0,0 +1,90 @@
import io
import logging
import sys
class CapturingStringIO(io.StringIO):
"""
I/O implementation which captures output, and optionally mirrors it to the original I/O stream it replaces.
"""
def __init__(self, original_io=None):
super(CapturingStringIO, self).__init__()
self.original_io = original_io
def write(self, s):
super().write(s)
if self.original_io:
self.original_io.write(s)
class StandardOutputCapture:
"""
Redirects and captures standard output/errors.
"""
original_stdout = None
original_stderr = None
original_logger_handlers = None
@staticmethod
def enable(block_original=True):
"""
Redirects stdout and stderr to a capturable StringIO.
:param block_original: If True, blocks all output to the original stream. If False, duplicates output.
:return: None
"""
# Redirect stdout
if StandardOutputCapture.original_stdout is None:
StandardOutputCapture.original_stdout = sys.stdout
sys.stdout = CapturingStringIO(None if block_original else StandardOutputCapture.original_stdout)
# Redirect stderr
if StandardOutputCapture.original_stderr is None:
StandardOutputCapture.original_stderr = sys.stderr
sys.stderr = CapturingStringIO(None if block_original else StandardOutputCapture.original_stderr)
# Backup and swap root logger handlers
root_logger = logging.getLogger()
StandardOutputCapture.original_logger_handlers = root_logger.handlers
root_logger.handlers = [logging.StreamHandler(sys.stderr)]
@staticmethod
def disable():
"""
Disables redirection of stdout/stderr, if previously enabled.
:return: None
"""
# If we have a stdout backup, restore it.
if StandardOutputCapture.original_stdout is not None:
sys.stdout.close()
sys.stdout = StandardOutputCapture.original_stdout
StandardOutputCapture.original_stdout = None
# If we have an stderr backup, restore it.
if StandardOutputCapture.original_stderr is not None:
sys.stderr.close()
sys.stderr = StandardOutputCapture.original_stderr
StandardOutputCapture.original_stderr = None
# Restore our logging handlers
if StandardOutputCapture.original_logger_handlers is not None:
root_logger = logging.getLogger()
root_logger.handlers = StandardOutputCapture.original_logger_handlers
StandardOutputCapture.original_logger_handlers = None
@staticmethod
def get_stdout_output():
"""
Obtains the output from the currently set stdout
:return: Returns stdout output as a string
"""
sys.stdout.seek(0)
return sys.stdout.read()
@staticmethod
def get_stderr_output():
"""
Obtains the output from the currently set stderr
:return: Returns stderr output as a string
"""
sys.stderr.seek(0)
return sys.stderr.read()
Loading…
Cancel
Save