mirror of https://github.com/crytic/slither
commit
adb3eb1abf
@ -0,0 +1,136 @@ |
|||||||
|
import logging |
||||||
|
import uuid |
||||||
|
from typing import List, Union |
||||||
|
|
||||||
|
from slither.detectors.abstract_detector import AbstractDetector, DetectorClassification |
||||||
|
from slither.utils import codex |
||||||
|
from slither.utils.output import Output, SupportedOutput |
||||||
|
|
||||||
|
logger = logging.getLogger("Slither") |
||||||
|
|
||||||
|
VULN_FOUND = "VULN_FOUND" |
||||||
|
|
||||||
|
|
||||||
|
class Codex(AbstractDetector): |
||||||
|
""" |
||||||
|
Use codex to detect vulnerability |
||||||
|
""" |
||||||
|
|
||||||
|
ARGUMENT = "codex" |
||||||
|
HELP = "Use Codex to find vulnerabilities." |
||||||
|
IMPACT = DetectorClassification.HIGH |
||||||
|
CONFIDENCE = DetectorClassification.LOW |
||||||
|
|
||||||
|
WIKI = "https://github.com/crytic/slither/wiki/Detector-Documentation#codex" |
||||||
|
|
||||||
|
WIKI_TITLE = "Codex" |
||||||
|
WIKI_DESCRIPTION = "Use [codex](https://openai.com/blog/openai-codex/) to find vulnerabilities" |
||||||
|
|
||||||
|
# region wiki_exploit_scenario |
||||||
|
WIKI_EXPLOIT_SCENARIO = """N/A""" |
||||||
|
# endregion wiki_exploit_scenario |
||||||
|
|
||||||
|
WIKI_RECOMMENDATION = "Review codex's message." |
||||||
|
|
||||||
|
def _run_codex(self, logging_file: str, prompt: str) -> str: |
||||||
|
""" |
||||||
|
Handle the codex logic |
||||||
|
|
||||||
|
Args: |
||||||
|
logging_file (str): file where to log the queries |
||||||
|
prompt (str): prompt to send to codex |
||||||
|
|
||||||
|
Returns: |
||||||
|
codex answer (str) |
||||||
|
""" |
||||||
|
openai_module = codex.openai_module() # type: ignore |
||||||
|
if openai_module is None: |
||||||
|
return "" |
||||||
|
|
||||||
|
if self.slither.codex_log: |
||||||
|
codex.log_codex(logging_file, "Q: " + prompt) |
||||||
|
|
||||||
|
answer = "" |
||||||
|
res = {} |
||||||
|
try: |
||||||
|
res = openai_module.Completion.create( |
||||||
|
prompt=prompt, |
||||||
|
model=self.slither.codex_model, |
||||||
|
temperature=self.slither.codex_temperature, |
||||||
|
max_tokens=self.slither.codex_max_tokens, |
||||||
|
) |
||||||
|
except Exception as e: # pylint: disable=broad-except |
||||||
|
logger.info("OpenAI request failed: " + str(e)) |
||||||
|
|
||||||
|
# """ OpenAI completion response shape example: |
||||||
|
# { |
||||||
|
# "choices": [ |
||||||
|
# { |
||||||
|
# "finish_reason": "stop", |
||||||
|
# "index": 0, |
||||||
|
# "logprobs": null, |
||||||
|
# "text": "VULNERABILITIES:. The withdraw() function does not check..." |
||||||
|
# } |
||||||
|
# ], |
||||||
|
# "created": 1670357537, |
||||||
|
# "id": "cmpl-6KYaXdA6QIisHlTMM7RCJ1nR5wTKx", |
||||||
|
# "model": "text-davinci-003", |
||||||
|
# "object": "text_completion", |
||||||
|
# "usage": { |
||||||
|
# "completion_tokens": 80, |
||||||
|
# "prompt_tokens": 249, |
||||||
|
# "total_tokens": 329 |
||||||
|
# } |
||||||
|
# } """ |
||||||
|
|
||||||
|
if res: |
||||||
|
if self.slither.codex_log: |
||||||
|
codex.log_codex(logging_file, "A: " + str(res)) |
||||||
|
else: |
||||||
|
codex.log_codex(logging_file, "A: Codex failed") |
||||||
|
|
||||||
|
if res.get("choices", []) and VULN_FOUND in res["choices"][0].get("text", ""): |
||||||
|
# remove VULN_FOUND keyword and cleanup |
||||||
|
answer = ( |
||||||
|
res["choices"][0]["text"] |
||||||
|
.replace(VULN_FOUND, "") |
||||||
|
.replace("\n", "") |
||||||
|
.replace(": ", "") |
||||||
|
) |
||||||
|
return answer |
||||||
|
|
||||||
|
def _detect(self) -> List[Output]: |
||||||
|
results: List[Output] = [] |
||||||
|
|
||||||
|
if not self.slither.codex_enabled: |
||||||
|
return [] |
||||||
|
|
||||||
|
logging_file = str(uuid.uuid4()) |
||||||
|
|
||||||
|
for contract in self.compilation_unit.contracts: |
||||||
|
if ( |
||||||
|
self.slither.codex_contracts != "all" |
||||||
|
and contract.name not in self.slither.codex_contracts.split(",") |
||||||
|
): |
||||||
|
continue |
||||||
|
prompt = f"Analyze this Solidity contract and find the vulnerabilities. If you find any vulnerabilities, begin the response with {VULN_FOUND}\n" |
||||||
|
src_mapping = contract.source_mapping |
||||||
|
content = contract.compilation_unit.core.source_code[src_mapping.filename.absolute] |
||||||
|
start = src_mapping.start |
||||||
|
end = src_mapping.start + src_mapping.length |
||||||
|
prompt += content[start:end] |
||||||
|
|
||||||
|
answer = self._run_codex(logging_file, prompt) |
||||||
|
|
||||||
|
if answer: |
||||||
|
info: List[Union[str, SupportedOutput]] = [ |
||||||
|
"Codex detected a potential bug in ", |
||||||
|
contract, |
||||||
|
"\n", |
||||||
|
answer, |
||||||
|
"\n", |
||||||
|
] |
||||||
|
|
||||||
|
new_result = self.generate_result(info) |
||||||
|
results.append(new_result) |
||||||
|
return results |
@ -0,0 +1,53 @@ |
|||||||
|
import logging |
||||||
|
import os |
||||||
|
from pathlib import Path |
||||||
|
|
||||||
|
logger = logging.getLogger("Slither") |
||||||
|
|
||||||
|
|
||||||
|
# TODO: investigate how to set the correct return type |
||||||
|
# So that the other modules can work with openai |
||||||
|
def openai_module(): # type: ignore |
||||||
|
""" |
||||||
|
Return the openai module |
||||||
|
Consider checking the usage of open (slither.codex_enabled) before using this function |
||||||
|
|
||||||
|
Returns: |
||||||
|
Optional[the openai module] |
||||||
|
""" |
||||||
|
try: |
||||||
|
# pylint: disable=import-outside-toplevel |
||||||
|
import openai |
||||||
|
|
||||||
|
api_key = os.getenv("OPENAI_API_KEY") |
||||||
|
if api_key is None: |
||||||
|
logger.info( |
||||||
|
"Please provide an Open API Key in OPENAI_API_KEY (https://beta.openai.com/account/api-keys)" |
||||||
|
) |
||||||
|
return None |
||||||
|
openai.api_key = api_key |
||||||
|
except ImportError: |
||||||
|
logger.info("OpenAI was not installed") # type: ignore |
||||||
|
logger.info('run "pip install openai"') |
||||||
|
return None |
||||||
|
return openai |
||||||
|
|
||||||
|
|
||||||
|
def log_codex(filename: str, prompt: str) -> None: |
||||||
|
""" |
||||||
|
Log the prompt in crytic/export/codex/filename |
||||||
|
Append to the file |
||||||
|
|
||||||
|
Args: |
||||||
|
filename: filename to write to |
||||||
|
prompt: prompt to write |
||||||
|
|
||||||
|
Returns: |
||||||
|
None |
||||||
|
""" |
||||||
|
|
||||||
|
Path("crytic_export/codex").mkdir(parents=True, exist_ok=True) |
||||||
|
|
||||||
|
with open(Path("crytic_export/codex", filename), "a", encoding="utf8") as file: |
||||||
|
file.write(prompt) |
||||||
|
file.write("\n") |
Loading…
Reference in new issue