Merge pull request #2302 from crytic/mutator/fit-and-finish

slither-mutate: fit and finish
pull/2394/head
alpharush 8 months ago committed by GitHub
commit 28402aecd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 258
      slither/tools/mutator/__main__.py
  2. 2
      slither/tools/mutator/mutators/RR.py
  3. 95
      slither/tools/mutator/mutators/abstract_mutator.py
  4. 109
      slither/tools/mutator/utils/file_handling.py
  5. 120
      slither/tools/mutator/utils/testing_generated_mutant.py

@ -1,14 +1,17 @@
import argparse
import inspect
import logging
import sys
import os
import shutil
import sys
import time
from pathlib import Path
from typing import Type, List, Any, Optional
from crytic_compile import cryticparser
from slither import Slither
from slither.tools.mutator.utils.testing_generated_mutant import run_test_cmd
from slither.tools.mutator.mutators import all_mutators
from slither.utils.colors import yellow, magenta
from slither.utils.colors import blue, green, magenta, red
from .mutators.abstract_mutator import AbstractMutator
from .utils.command_line import output_mutators
from .utils.file_handling import (
@ -67,8 +70,18 @@ def parse_args() -> argparse.Namespace:
# to print just all the mutants
parser.add_argument(
"-v",
"--verbose",
help="output all mutants generated",
help="log mutants that are caught as well as those that are uncaught",
action="store_true",
default=False,
)
# to print just all the mutants
parser.add_argument(
"-vv",
"--very-verbose",
help="log mutants that are caught, uncaught, and fail to compile. And more!",
action="store_true",
default=False,
)
@ -87,8 +100,8 @@ def parse_args() -> argparse.Namespace:
# flag to run full mutation based revert mutator output
parser.add_argument(
"--quick",
help="to stop full mutation if revert mutator passes",
"--comprehensive",
help="continue testing minor mutations if severe mutants are uncaught",
action="store_true",
default=False,
)
@ -135,7 +148,7 @@ class ListMutators(argparse.Action): # pylint: disable=too-few-public-methods
###################################################################################
def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,too-many-locals
def main() -> None: # pylint: disable=too-many-statements,too-many-branches,too-many-locals
args = parse_args()
# arguments
@ -146,31 +159,32 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t
timeout: Optional[int] = args.timeout
solc_remappings: Optional[str] = args.solc_remaps
verbose: Optional[bool] = args.verbose
very_verbose: Optional[bool] = args.very_verbose
mutators_to_run: Optional[List[str]] = args.mutators_to_run
contract_names: Optional[List[str]] = args.contract_names
quick_flag: Optional[bool] = args.quick
comprehensive_flag: Optional[bool] = args.comprehensive
logger.info(magenta(f"Starting Mutation Campaign in '{args.codebase} \n"))
logger.info(blue(f"Starting mutation campaign in {args.codebase}"))
if paths_to_ignore:
paths_to_ignore_list = paths_to_ignore.strip("][").split(",")
logger.info(magenta(f"Ignored paths - {', '.join(paths_to_ignore_list)} \n"))
logger.info(blue(f"Ignored paths - {', '.join(paths_to_ignore_list)}"))
else:
paths_to_ignore_list = []
contract_names: List[str] = []
if args.contract_names:
contract_names = args.contract_names.split(",")
# get all the contracts as a list from given codebase
sol_file_list: List[str] = get_sol_file_list(args.codebase, paths_to_ignore_list)
sol_file_list: List[str] = get_sol_file_list(Path(args.codebase), paths_to_ignore_list)
# folder where backup files and valid mutants created
# folder where backup files and uncaught mutants are saved
if output_dir is None:
output_dir = "/mutation_campaign"
output_folder = os.getcwd() + output_dir
if os.path.exists(output_folder):
shutil.rmtree(output_folder)
output_dir = "./mutation_campaign"
# set default timeout
if timeout is None:
timeout = 30
output_folder = Path(output_dir).resolve()
if output_folder.is_dir():
shutil.rmtree(output_folder)
# setting RR mutator as first mutator
mutators_list = _get_mutators(mutators_to_run)
@ -187,51 +201,138 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t
CR_RR_list.insert(1, M)
mutators_list = CR_RR_list + mutators_list
logger.info(blue("Timing tests.."))
# run and time tests, abort if they're broken
start_time = time.time()
# no timeout or target_file during the first run, but be verbose on failure
if not run_test_cmd(test_command, None, None, True):
logger.error(red("Test suite fails with mutations, aborting"))
return
elapsed_time = round(time.time() - start_time)
# set default timeout
# default to twice as long as it usually takes to run the test suite
if timeout is None:
timeout = int(elapsed_time * 2)
else:
timeout = int(timeout)
if timeout < elapsed_time:
logger.info(
red(
f"Provided timeout {timeout} is too short for tests that run in {elapsed_time} seconds"
)
)
return
logger.info(
green(
f"Test suite passes in {elapsed_time} seconds, commencing mutation campaign with a timeout of {timeout} seconds\n"
)
)
# Keep a list of all already mutated contracts so we don't mutate them twice
mutated_contracts: List[str] = []
for filename in sol_file_list: # pylint: disable=too-many-nested-blocks
contract_name = os.path.split(filename)[1].split(".sol")[0]
file_name = os.path.split(filename)[1].split(".sol")[0]
# slither object
sl = Slither(filename, **vars(args))
# create a backup files
files_dict = backup_source_file(sl.source_code, output_folder)
# total count of mutants
total_count = 0
# count of valid mutants
v_count = 0
# total revert/comment/tweak mutants that were generated and compiled
total_mutant_counts = [0, 0, 0]
# total uncaught revert/comment/tweak mutants
uncaught_mutant_counts = [0, 0, 0]
# lines those need not be mutated (taken from RR and CR)
dont_mutate_lines = []
# mutation
target_contract = "SLITHER_SKIP_MUTATIONS" if contract_names else ""
try:
for compilation_unit_of_main_file in sl.compilation_units:
contract_instance = ""
for contract in compilation_unit_of_main_file.contracts:
if contract_names is not None and contract.name in contract_names:
contract_instance = contract
elif str(contract.name).lower() == contract_name.lower():
contract_instance = contract
if contract_instance == "":
logger.error("Can't find the contract")
else:
for M in mutators_list:
m = M(
compilation_unit_of_main_file,
int(timeout),
test_command,
test_directory,
contract_instance,
solc_remappings,
verbose,
output_folder,
dont_mutate_lines,
)
(count_valid, count_invalid, lines_list) = m.mutate()
v_count += count_valid
total_count += count_valid + count_invalid
dont_mutate_lines = lines_list
if not quick_flag:
dont_mutate_lines = []
if contract.name in contract_names and contract.name not in mutated_contracts:
target_contract = contract
break
if not contract_names and contract.name.lower() == file_name.lower():
target_contract = contract
break
if target_contract == "":
logger.info(
f"Cannot find contracts in file {filename}, try specifying them with --contract-names"
)
continue
if target_contract == "SLITHER_SKIP_MUTATIONS":
logger.debug(f"Skipping mutations in {filename}")
continue
# TODO: find a more specific way to omit interfaces
# Ideally, we wouldn't depend on naming conventions
if target_contract.name.startswith("I"):
logger.debug(f"Skipping mutations on interface {filename}")
continue
# Add our target to the mutation list
mutated_contracts.append(target_contract.name)
logger.info(blue(f"Mutating contract {target_contract}"))
for M in mutators_list:
m = M(
compilation_unit_of_main_file,
int(timeout),
test_command,
test_directory,
target_contract,
solc_remappings,
verbose,
very_verbose,
output_folder,
dont_mutate_lines,
)
(total_counts, uncaught_counts, lines_list) = m.mutate()
if m.NAME == "RR":
total_mutant_counts[0] += total_counts[0]
uncaught_mutant_counts[0] += uncaught_counts[0]
if verbose:
logger.info(
magenta(
f"Mutator {m.NAME} found {uncaught_counts[0]} uncaught revert mutants (out of {total_counts[0]} that compile)"
)
)
elif m.NAME == "CR":
total_mutant_counts[1] += total_counts[1]
uncaught_mutant_counts[1] += uncaught_counts[1]
if verbose:
logger.info(
magenta(
f"Mutator {m.NAME} found {uncaught_counts[1]} uncaught comment mutants (out of {total_counts[1]} that compile)"
)
)
else:
total_mutant_counts[2] += total_counts[2]
uncaught_mutant_counts[2] += uncaught_counts[2]
if verbose:
logger.info(
magenta(
f"Mutator {m.NAME} found {uncaught_counts[2]} uncaught tweak mutants (out of {total_counts[2]} that compile)"
)
)
logger.info(
magenta(
f"Running total: found {uncaught_mutant_counts[2]} uncaught tweak mutants (out of {total_mutant_counts[2]} that compile)"
)
)
dont_mutate_lines = lines_list
if comprehensive_flag:
dont_mutate_lines = []
except Exception as e: # pylint: disable=broad-except
logger.error(e)
transfer_and_delete(files_dict)
except KeyboardInterrupt:
# transfer and delete the backup files if interrupted
@ -241,14 +342,59 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t
# transfer and delete the backup files
transfer_and_delete(files_dict)
# output
logger.info(
yellow(
f"Done mutating, '{filename}'. Valid mutant count: '{v_count}' and Total mutant count '{total_count}'.\n"
# log results for this file
logger.info(blue(f"Done mutating {target_contract}."))
if total_mutant_counts[0] > 0:
logger.info(
magenta(
f"Revert mutants: {uncaught_mutant_counts[0]} uncaught of {total_mutant_counts[0]} ({100 * uncaught_mutant_counts[0]/total_mutant_counts[0]}%)"
)
)
)
else:
logger.info(magenta("Zero Revert mutants analyzed"))
if total_mutant_counts[1] > 0:
logger.info(
magenta(
f"Comment mutants: {uncaught_mutant_counts[1]} uncaught of {total_mutant_counts[1]} ({100 * uncaught_mutant_counts[1]/total_mutant_counts[1]}%)"
)
)
else:
logger.info(magenta("Zero Comment mutants analyzed"))
if total_mutant_counts[2] > 0:
logger.info(
magenta(
f"Tweak mutants: {uncaught_mutant_counts[2]} uncaught of {total_mutant_counts[2]} ({100 * uncaught_mutant_counts[2]/total_mutant_counts[2]}%)\n"
)
)
else:
logger.info(magenta("Zero Tweak mutants analyzed\n"))
# Reset mutant counts before moving on to the next file
if very_verbose:
logger.info(blue("Reseting mutant counts to zero"))
total_mutant_counts[0] = 0
total_mutant_counts[1] = 0
total_mutant_counts[2] = 0
uncaught_mutant_counts[0] = 0
uncaught_mutant_counts[1] = 0
uncaught_mutant_counts[2] = 0
# Print the total time elapsed in a human-readable time format
elapsed_time = round(time.time() - start_time)
hours, remainder = divmod(elapsed_time, 3600)
minutes, seconds = divmod(remainder, 60)
if hours > 0:
elapsed_string = f"{hours} {'hour' if hours == 1 else 'hours'}"
elif minutes > 0:
elapsed_string = f"{minutes} {'minute' if minutes == 1 else 'minutes'}"
else:
elapsed_string = f"{seconds} {'second' if seconds == 1 else 'seconds'}"
logger.info(magenta(f"Finished Mutation Campaign in '{args.codebase}' \n"))
logger.info(
blue(f"Finished mutation testing assessment of '{args.codebase}' in {elapsed_string}\n")
)
# endregion

@ -24,7 +24,7 @@ class RR(AbstractMutator): # pylint: disable=too-few-public-methods
old_str = self.in_file_str[start:stop]
line_no = node.source_mapping.lines
if not line_no[0] in self.dont_mutate_line:
if old_str != "revert()":
if not old_str.lstrip().startswith("revert"):
new_str = "revert()"
create_patch_with_line(
result,

@ -1,10 +1,10 @@
import abc
import logging
from pathlib import Path
from typing import Optional, Dict, Tuple, List
from slither.core.compilation_unit import SlitherCompilationUnit
from slither.formatters.utils.patches import apply_patch, create_diff
from slither.tools.mutator.utils.testing_generated_mutant import test_patch
from slither.utils.colors import yellow
from slither.core.declarations import Contract
logger = logging.getLogger("Slither-Mutate")
@ -19,8 +19,6 @@ class AbstractMutator(
): # pylint: disable=too-few-public-methods,too-many-instance-attributes
NAME = ""
HELP = ""
VALID_MUTANTS_COUNT = 0
INVALID_MUTANTS_COUNT = 0
def __init__( # pylint: disable=too-many-arguments
self,
@ -31,7 +29,8 @@ class AbstractMutator(
contract_instance: Contract,
solc_remappings: str | None,
verbose: bool,
output_folder: str,
very_verbose: bool,
output_folder: Path,
dont_mutate_line: List[int],
rate: int = 10,
seed: Optional[int] = None,
@ -45,11 +44,16 @@ class AbstractMutator(
self.timeout = timeout
self.solc_remappings = solc_remappings
self.verbose = verbose
self.very_verbose = very_verbose
self.output_folder = output_folder
self.contract = contract_instance
self.in_file = self.contract.source_mapping.filename.absolute
self.in_file_str = self.contract.compilation_unit.core.source_code[self.in_file]
self.dont_mutate_line = dont_mutate_line
# total revert/comment/tweak mutants that were generated and compiled
self.total_mutant_counts = [0, 0, 0]
# total uncaught revert/comment/tweak mutants
self.uncaught_mutant_counts = [0, 0, 0]
if not self.NAME:
raise IncorrectMutatorInitialization(
@ -71,50 +75,75 @@ class AbstractMutator(
"""TODO Documentation"""
return {}
def mutate(self) -> Tuple[int, int, List[int]]:
# pylint: disable=too-many-branches
def mutate(self) -> Tuple[List[int], List[int], List[int]]:
# call _mutate function from different mutators
(all_patches) = self._mutate()
if "patches" not in all_patches:
logger.debug("No patches found by %s", self.NAME)
return (0, 0, self.dont_mutate_line)
return ([0, 0, 0], [0, 0, 0], self.dont_mutate_line)
for file in all_patches["patches"]:
for file in all_patches["patches"]: # Note: This should only loop over a single file
original_txt = self.slither.source_code[file].encode("utf8")
patches = all_patches["patches"][file]
patches.sort(key=lambda x: x["start"])
logger.info(yellow(f"Mutating {file} with {self.NAME} \n"))
for patch in patches:
# test the patch
flag = test_patch(
patchWasCaught = test_patch(
self.output_folder,
file,
patch,
self.test_command,
self.VALID_MUTANTS_COUNT,
self.NAME,
self.timeout,
self.solc_remappings,
self.verbose,
self.very_verbose,
)
# if RR or CR and valid mutant, add line no.
if self.NAME in ("RR", "CR") and flag:
self.dont_mutate_line.append(patch["line_number"])
# count the valid and invalid mutants
if not flag:
self.INVALID_MUTANTS_COUNT += 1
continue
self.VALID_MUTANTS_COUNT += 1
patched_txt, _ = apply_patch(original_txt, patch, 0)
diff = create_diff(self.compilation_unit, original_txt, patched_txt, file)
if not diff:
logger.info(f"Impossible to generate patch; empty {patches}")
# add valid mutant patches to a output file
with open(
self.output_folder + "/patches_file.txt", "a", encoding="utf8"
) as patches_file:
patches_file.write(diff + "\n")
return (
self.VALID_MUTANTS_COUNT,
self.INVALID_MUTANTS_COUNT,
self.dont_mutate_line,
)
# count the uncaught mutants, flag RR/CR mutants to skip further mutations
if patchWasCaught == 0:
if self.NAME == "RR":
self.uncaught_mutant_counts[0] += 1
self.dont_mutate_line.append(patch["line_number"])
elif self.NAME == "CR":
self.uncaught_mutant_counts[1] += 1
self.dont_mutate_line.append(patch["line_number"])
else:
self.uncaught_mutant_counts[2] += 1
patched_txt, _ = apply_patch(original_txt, patch, 0)
diff = create_diff(self.compilation_unit, original_txt, patched_txt, file)
if not diff:
logger.info(f"Impossible to generate patch; empty {patches}")
# add uncaught mutant patches to a output file
with (self.output_folder / "patches_files.txt").open(
"a", encoding="utf8"
) as patches_file:
patches_file.write(diff + "\n")
# count the total number of mutants that we were able to compile
if patchWasCaught != 2:
if self.NAME == "RR":
self.total_mutant_counts[0] += 1
elif self.NAME == "CR":
self.total_mutant_counts[1] += 1
else:
self.total_mutant_counts[2] += 1
if self.very_verbose:
if self.NAME == "RR":
logger.info(
f"Found {self.uncaught_mutant_counts[0]} uncaught revert mutants so far (out of {self.total_mutant_counts[0]} that compile)"
)
elif self.NAME == "CR":
logger.info(
f"Found {self.uncaught_mutant_counts[1]} uncaught comment mutants so far (out of {self.total_mutant_counts[1]} that compile)"
)
else:
logger.info(
f"Found {self.uncaught_mutant_counts[2]} uncaught tweak mutants so far (out of {self.total_mutant_counts[2]} that compile)"
)
return (self.total_mutant_counts, self.uncaught_mutant_counts, self.dont_mutate_line)

@ -1,106 +1,101 @@
import os
from typing import Dict, List
import traceback
from typing import Dict, List, Union
import logging
from pathlib import Path
import hashlib
logger = logging.getLogger("Slither-Mutate")
duplicated_files = {}
HashedPath = str
backuped_files: Dict[str, HashedPath] = {}
def backup_source_file(source_code: Dict, output_folder: str) -> Dict:
def backup_source_file(source_code: Dict, output_folder: Path) -> Dict[str, HashedPath]:
"""
function to backup the source file
returns: dictionary of duplicated files
"""
os.makedirs(output_folder, exist_ok=True)
output_folder.mkdir(exist_ok=True, parents=True)
for file_path, content in source_code.items():
directory, filename = os.path.split(file_path)
new_filename = f"{output_folder}/backup_{filename}"
new_file_path = os.path.join(directory, new_filename)
path_hash = hashlib.md5(bytes(file_path, "utf8")).hexdigest()
(output_folder / path_hash).write_text(content, encoding="utf8")
with open(new_file_path, "w", encoding="utf8") as new_file:
new_file.write(content)
duplicated_files[file_path] = new_file_path
backuped_files[file_path] = (output_folder / path_hash).as_posix()
return duplicated_files
return backuped_files
def transfer_and_delete(files_dict: Dict) -> None:
def transfer_and_delete(files_dict: Dict[str, HashedPath]) -> None:
"""function to transfer the original content to the sol file after campaign"""
try:
files_dict_copy = files_dict.copy()
for item, value in files_dict_copy.items():
with open(value, "r", encoding="utf8") as duplicated_file:
for original_path, hashed_path in files_dict_copy.items():
with open(hashed_path, "r", encoding="utf8") as duplicated_file:
content = duplicated_file.read()
with open(item, "w", encoding="utf8") as original_file:
with open(original_path, "w", encoding="utf8") as original_file:
original_file.write(content)
os.remove(value)
Path(hashed_path).unlink()
# delete elements from the global dict
del duplicated_files[item]
del backuped_files[original_path]
except Exception as e: # pylint: disable=broad-except
logger.error(f"Error transferring content: {e}")
except FileNotFoundError as e: # pylint: disable=broad-except
logger.error("Error transferring content: %s", e)
global_counter = {}
def create_mutant_file(file: str, count: int, rule: str) -> None:
def create_mutant_file(output_folder: Path, file: str, rule: str) -> None:
"""function to create new mutant file"""
try:
_, filename = os.path.split(file)
if rule not in global_counter:
global_counter[rule] = 0
file_path = Path(file)
# Read content from the duplicated file
with open(file, "r", encoding="utf8") as source_file:
content = source_file.read()
content = file_path.read_text(encoding="utf8")
# Write content to the original file
mutant_name = filename.split(".")[0]
mutant_name = file_path.stem
# create folder for each contract
os.makedirs("mutation_campaign/" + mutant_name, exist_ok=True)
with open(
"mutation_campaign/"
+ mutant_name
+ "/"
+ mutant_name
+ "_"
+ rule
+ "_"
+ str(count)
+ ".sol",
"w",
encoding="utf8",
) as mutant_file:
mutation_dir = output_folder / mutant_name
mutation_dir.mkdir(parents=True, exist_ok=True)
mutation_filename = f"{mutant_name}_{rule}_{global_counter[rule]}.sol"
with (mutation_dir / mutation_filename).open("w", encoding="utf8") as mutant_file:
mutant_file.write(content)
global_counter[rule] += 1
# reset the file
with open(duplicated_files[file], "r", encoding="utf8") as duplicated_file:
duplicate_content = duplicated_file.read()
duplicate_content = Path(backuped_files[file]).read_text("utf8")
with open(file, "w", encoding="utf8") as source_file:
source_file.write(duplicate_content)
except Exception as e: # pylint: disable=broad-except
logger.error(f"Error creating mutant: {e}")
traceback_str = traceback.format_exc()
logger.error(traceback_str) # Log the stack trace
def reset_file(file: str) -> None:
"""function to reset the file"""
try:
# directory, filename = os.path.split(file)
# reset the file
with open(duplicated_files[file], "r", encoding="utf8") as duplicated_file:
with open(backuped_files[file], "r", encoding="utf8") as duplicated_file:
duplicate_content = duplicated_file.read()
with open(file, "w", encoding="utf8") as source_file:
source_file.write(duplicate_content)
except Exception as e: # pylint: disable=broad-except
logger.error(f"Error resetting file: {e}")
logger.error("Error resetting file: %s", e)
def get_sol_file_list(codebase: str, ignore_paths: List[str] | None) -> List[str]:
def get_sol_file_list(codebase: Path, ignore_paths: Union[List[str], None]) -> List[str]:
"""
function to get the contracts list
returns: list of .sol files
@ -110,21 +105,13 @@ def get_sol_file_list(codebase: str, ignore_paths: List[str] | None) -> List[str
ignore_paths = []
# if input is contract file
if os.path.isfile(codebase):
return [codebase]
if codebase.is_file():
return [codebase.as_posix()]
# if input is folder
if os.path.isdir(codebase):
directory = os.path.abspath(codebase)
for file in os.listdir(directory):
filename = os.path.join(directory, file)
if os.path.isfile(filename):
sol_file_list.append(filename)
elif os.path.isdir(filename):
_, dirname = os.path.split(filename)
if dirname in ignore_paths:
continue
for i in get_sol_file_list(filename, ignore_paths):
sol_file_list.append(i)
if codebase.is_dir():
for file_name in codebase.rglob("*.sol"):
if not any(part in ignore_paths for part in file_name.parts):
sol_file_list.append(file_name.as_posix())
return sol_file_list

@ -1,12 +1,11 @@
import subprocess
import os
import logging
import time
import signal
from typing import Dict
import sys
import subprocess
from pathlib import Path
from typing import Dict, Union
import crytic_compile
from slither.tools.mutator.utils.file_handling import create_mutant_file, reset_file
from slither.utils.colors import green, red
from slither.utils.colors import green, red, yellow
logger = logging.getLogger("Slither-Mutate")
@ -23,13 +22,12 @@ def compile_generated_mutant(file_path: str, mappings: str) -> bool:
return False
def run_test_cmd(cmd: str, test_dir: str, timeout: int) -> bool:
def run_test_cmd(cmd: str, timeout: int | None, target_file: str | None, verbose: bool) -> bool:
"""
function to run codebase tests
returns: boolean whether the tests passed or not
"""
# future purpose
_ = test_dir
# add --fail-fast for foundry tests, to exit after first failure
if "forge test" in cmd and "--fail-fast" not in cmd:
cmd += " --fail-fast"
@ -37,41 +35,62 @@ def run_test_cmd(cmd: str, test_dir: str, timeout: int) -> bool:
elif "hardhat test" in cmd or "truffle test" in cmd and "--bail" not in cmd:
cmd += " --bail"
start = time.time()
if timeout is None and "hardhat" not in cmd: # hardhat doesn't support --force flag on tests
# if no timeout, ensure all contracts are recompiled w/out using any cache
cmd += " --force"
try:
result = subprocess.run(
cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
check=False, # True: Raises a CalledProcessError if the return code is non-zero
)
except subprocess.TimeoutExpired:
# Timeout, treat this as a test failure
logger.info("Tests took too long, consider increasing the timeout")
result = None # or set result to a default value
except KeyboardInterrupt:
logger.info(yellow("Ctrl-C received"))
if target_file is not None:
logger.info("Restoring original files")
reset_file(target_file)
logger.info("Exiting")
sys.exit(1)
# if result is 0 then it is an uncaught mutant because tests didn't fail
if result:
code = result.returncode
if code == 0:
return True
# starting new process
with subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as P:
try:
# checking whether the process is completed or not within 30 seconds(default)
while P.poll() is None and (time.time() - start) < timeout:
time.sleep(0.05)
finally:
if P.poll() is None:
logger.error("HAD TO TERMINATE ANALYSIS (TIMEOUT OR EXCEPTION)")
# sends a SIGTERM signal to process group - bascially killing the process
os.killpg(os.getpgid(P.pid), signal.SIGTERM)
# Avoid any weird race conditions from grabbing the return code
time.sleep(0.05)
# indicates whether the command executed sucessfully or not
r = P.returncode
# If tests fail in verbose-mode, print both stdout and stderr for easier debugging
if verbose:
logger.info(yellow(result.stdout.decode("utf-8")))
logger.info(red(result.stderr.decode("utf-8")))
# if r is 0 then it is valid mutant because tests didn't fail
return r == 0
return False
# return 0 if uncaught, 1 if caught, and 2 if compilation fails
def test_patch( # pylint: disable=too-many-arguments
output_folder: Path,
file: str,
patch: Dict,
command: str,
index: int,
generator_name: str,
timeout: int,
mappings: str | None,
mappings: Union[str, None],
verbose: bool,
) -> bool:
very_verbose: bool,
) -> int:
"""
function to verify the validity of each patch
returns: valid or invalid patch
function to verify whether each patch is caught by tests
returns: 0 (uncaught), 1 (caught), or 2 (compilation failure)
"""
with open(file, "r", encoding="utf-8") as filepath:
content = filepath.read()
@ -80,21 +99,36 @@ def test_patch( # pylint: disable=too-many-arguments
# Write the modified content back to the file
with open(file, "w", encoding="utf-8") as filepath:
filepath.write(replaced_content)
if compile_generated_mutant(file, mappings):
if run_test_cmd(command, file, timeout):
create_mutant_file(file, index, generator_name)
print(
green(
f"String '{patch['old_string']}' replaced with '{patch['new_string']}' at line no. '{patch['line_number']}' ---> VALID\n"
if run_test_cmd(command, timeout, file, False):
create_mutant_file(output_folder, file, generator_name)
logger.info(
red(
f"[{generator_name}] Line {patch['line_number']}: '{patch['old_string']}' ==> '{patch['new_string']}' --> UNCAUGHT"
)
)
return True
reset_file(file)
return 0 # uncaught
else:
if very_verbose:
logger.info(
yellow(
f"[{generator_name}] Line {patch['line_number']}: '{patch['old_string']}' ==> '{patch['new_string']}' --> COMPILATION FAILURE"
)
)
reset_file(file)
return 2 # compile failure
reset_file(file)
if verbose:
print(
red(
f"String '{patch['old_string']}' replaced with '{patch['new_string']}' at line no. '{patch['line_number']}' ---> INVALID\n"
logger.info(
green(
f"[{generator_name}] Line {patch['line_number']}: '{patch['old_string']}' ==> '{patch['new_string']}' --> CAUGHT"
)
)
return False
reset_file(file)
return 1 # caught

Loading…
Cancel
Save