refactoring myth script

pull/88/head
freewind 7 years ago
parent e2c2652863
commit d8046bf569
  1. 315
      myth
  2. 6
      mythril/analysis/report.py

315
myth

@ -4,6 +4,18 @@
http://www.github.com/b-mueller/mythril
"""
from json.decoder import JSONDecodeError
import logging
import json
import sys
import argparse
import os
import re
from ethereum import utils
from solc.exceptions import SolcError
import solc
from mythril.ether import util
from mythril.ether.contractstorage import get_persistent_storage
from mythril.ether.ethcontract import ETHContract
@ -19,18 +31,6 @@ from mythril.analysis.symbolic import SymExecWrapper
from mythril.analysis.callgraph import generate_graph
from mythril.analysis.security import fire_lasers
from mythril.analysis.report import Report
from laser.ethereum import helper
from ethereum import utils
from pathlib import Path
from json.decoder import JSONDecodeError
from solc.exceptions import SolcError
import solc
import logging
import json
import sys
import argparse
import os
import re
def searchCallback(code_hash, code, addresses, balances):
@ -41,12 +41,11 @@ def searchCallback(code_hash, code, addresses, balances):
def exitWithError(format, message):
if (format == 'text' or format == 'markdown'):
if format == 'text' or format == 'markdown':
print(message)
else:
result = {'success': False, 'error': str(message), 'issues': []}
print(json.dumps(result))
sys.exit()
@ -107,36 +106,31 @@ except KeyError:
# Detect unsupported combinations of command line args
if (args.dynld) and not (args.address):
if args.dynld and not args.address:
exitWithError(args.outform, "Dynamic loader can be used in on-chain analysis mode only (-a).")
# Initialize data directry and signature database
# Initialize data directory and signature database
if not os.path.exists(mythril_dir):
logging.info("Creating mythril data directory")
os.mkdir(mythril_dir)
# If no function signature file exists, create it. Function signatures from Solidity source code are added automatically.
signatures_file = os.path.join(mythril_dir, 'signatures.json')
sigs = {}
if not os.path.exists(signatures_file):
logging.info("No signature database found. Creating empty database: " + signatures_file + "\n"
"Consider replacing it with the pre-initialized database at "
"https://raw.githubusercontent.com/ConsenSys/mythril/master/signatures.json")
sigs = {}
logging.info("No signature database found. Creating empty database: " + signatures_file + "\n" +
"Consider replacing it with the pre-initialized database at https://raw.githubusercontent.com/ConsenSys/mythril/master/signatures.json")
with open(signatures_file, 'a') as f:
json.dump({}, f)
else:
with open(signatures_file) as f:
try:
sigs = json.load(f)
except JSONDecodeError as e:
exitWithError(args.outform, "Invalid JSON in signatures file " + signatures_file + "\n" + str(e))
with open(signatures_file) as f:
try:
sigs = json.load(f)
except JSONDecodeError as e:
exitWithError(args.outform, "Invalid JSON in signatures file " + signatures_file + "\n" + str(e))
# Parse cmdline args
@ -144,22 +138,21 @@ if not (args.search or args.init_db or args.hash or args.disassemble or args.gra
parser.print_help()
sys.exit()
if (args.v):
if (0 <= args.v < 3):
if args.v:
if 0 <= args.v < 3:
logging.basicConfig(level=[logging.NOTSET, logging.INFO, logging.DEBUG][args.v])
else:
exitWithError(args.outform, "Invalid -v value, you can find valid values in usage")
elif (args.hash):
if args.hash:
print("0x" + utils.sha3(args.hash)[:4].hex())
sys.exit()
if args.truffle:
try:
analyze_truffle_project(args)
except FileNotFoundError:
print("Build directory not found. Make sure that you start the analysis from the project root, and that 'truffle compile' has executed successfully.")
sys.exit()
# Figure out solc binary and version
@ -177,147 +170,126 @@ if args.solv:
else:
if util.solc_exists(version):
logging.info('Given version is already installed')
solc_binary = os.path.join(os.environ['HOME'], ".py-solc/solc-v" + version, "bin/solc")
logging.info("Setting the compiler to " + str(solc_binary))
else:
try:
solc.install_solc('v' + version)
solc_binary = os.path.join(os.environ['HOME'], ".py-solc/solc-v" + version, "bin/solc")
logging.info("Setting the compiler to " + str(solc_binary))
except SolcError:
exitWithError(args.outform, "There was an error when trying to install the specified solc version")
solc_binary = os.path.join(os.environ['HOME'], ".py-solc/solc-v" + version, "bin/solc")
logging.info("Setting the compiler to " + str(solc_binary))
else:
if not args.solv:
try:
solc_binary = os.environ['SOLC']
except KeyError:
try:
solc_binary = 'solc'
except:
exitWithError('No solidity compiler found, please make sure it is installed or specify it manually.')
try:
solc_binary = os.environ['SOLC']
except KeyError:
solc_binary = 'solc'
# Establish RPC/IPC connection if necessary
if (args.address or args.init_db):
if args.infura_mainnet:
eth = EthJsonRpc('mainnet.infura.io', 443, True)
elif args.infura_rinkeby:
eth = EthJsonRpc('rinkeby.infura.io', 443, True)
elif args.infura_kovan:
eth = EthJsonRpc('kovan.infura.io', 443, True)
elif args.infura_ropsten:
eth = EthJsonRpc('ropsten.infura.io', 443, True)
elif args.ganache:
eth = EthJsonRpc('localhost', 7545, False)
elif args.rpc:
try:
host, port = args.rpc.split(":")
except ValueError:
exitWithError(args.outform, "Invalid RPC argument, use HOST:PORT")
tls = args.rpctls
eth = EthJsonRpc(host, int(port), tls)
eth = None
if args.address or args.init_db:
if args.infura_mainnet:
eth = EthJsonRpc('mainnet.infura.io', 443, True)
elif args.infura_rinkeby:
eth = EthJsonRpc('rinkeby.infura.io', 443, True)
elif args.infura_kovan:
eth = EthJsonRpc('kovan.infura.io', 443, True)
elif args.infura_ropsten:
eth = EthJsonRpc('ropsten.infura.io', 443, True)
elif args.ganache:
eth = EthJsonRpc('localhost', 7545, False)
elif args.rpc:
try:
host, port = args.rpc.split(":")
except ValueError:
exitWithError(args.outform, "Invalid RPC argument, use HOST:PORT")
else:
try:
eth = EthIpc()
except:
exitWithError(args.outform, "IPC initialization failed. Please verify that your local Ethereum node is running, or use the -i flag to connect to INFURA.")
tls = args.rpctls
eth = EthJsonRpc(host, int(port), tls)
else:
try:
eth = EthIpc()
except Exception as e:
exitWithError(args.outform, "IPC initialization failed. Please verify that your local Ethereum node is running, or use the -i flag to connect to INFURA. \n" + str(e))
# Database search ops
if args.search or args.init_db:
contract_storage = get_persistent_storage(mythril_dir)
if (args.search):
if args.search:
try:
contract_storage.search(args.search, searchCallback)
except SyntaxError:
exitWithError(args.outform, "Syntax error in search expression.")
elif (args.init_db):
try:
contract_storage.initialize(eth, args.sync_all)
except FileNotFoundError as e:
exitWithError(args.outform, "Error syncing database over IPC: " + str(e))
except ConnectionError as e:
exitWithError(args.outform, "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly.")
elif args.init_db:
try:
contract_storage.initialize(eth, args.sync_all)
except FileNotFoundError as e:
exitWithError(args.outform, "Error syncing database over IPC: " + str(e))
except ConnectionError as e:
exitWithError(args.outform, "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly.")
sys.exit()
# Load / compile input contracts
contracts = []
address = None
if (args.code):
if args.code:
address = util.get_indexed_address(0)
contracts.append(ETHContract(args.code, name="MAIN"))
# Get bytecode from a contract address
elif (args.address):
elif args.address:
address = args.address
if not re.match(r'0x[a-fA-F0-9]{40}', args.address):
exitWithError(args.outform, "Invalid contract address. Expected format is '0x...'.")
try:
code = eth.eth_getCode(args.address)
if (code == "0x"):
exitWithError(args.outform, "Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain.")
except FileNotFoundError as e:
exitWithError(args.outform, "IPC error: " + str(e))
except ConnectionError as e:
exitWithError(args.outform, "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly.")
except Exception as e:
exitWithError(args.outform, "IPC / RPC error: " + str(e))
contracts.append(ETHContract(code, name=args.address))
else:
if code == "0x":
exitWithError(args.outform, "Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain.")
else:
contracts.append(ETHContract(code, name=args.address))
# Compile Solidity source file(s)
elif (len(args.solidity_file)):
elif args.solidity_file:
address = util.get_indexed_address(0)
if(args.graph and len(args.solidity_file) > 1):
if args.graph and len(args.solidity_file) > 1:
exitWithError(args.outform, "Cannot generate call graphs from multiple input files. Please do it one at a time.")
for file in args.solidity_file:
if ":" in file:
file, contract_name = file.split(":")
else:
contract_name = None
file = file.replace("~", str(Path.home())) # Expand user path
file = os.path.expanduser(file)
try:
signatures.add_signatures_from_file(file, sigs) # Parse file for new function signatures
signatures.add_signatures_from_file(file, sigs)
contract = SolidityContract(file, contract_name, solc_args=args.solc_args)
logging.info("Analyzing contract %s:%s" % (file, contract.name))
contracts.append(contract)
except FileNotFoundError:
exitWithError(args.outform, "Input file not found: " + file)
except CompilerError as e:
exitWithError(args.outform, e)
except NoContractFoundError:
logging.info("The file " + file + " does not contain a compilable contract.")
except FileNotFoundError:
exitWithError(args.outform, "Input file not found: " + file)
else:
contracts.append(contract)
# Save updated function signatures
with open(signatures_file, 'w') as f:
json.dump(sigs, f)
@ -330,66 +302,51 @@ if args.storage:
if not args.address:
exitWithError(args.outform, "To read storage, provide the address of a deployed contract with the -a option.")
else:
position = 0
length = 1
array = 0
(position, length) = (0, 1)
try:
params = (args.storage).split(",")
if len(params) >= 1 and len(params) <= 3:
position = int(params[0])
if len(params) >= 2 and len(params) <= 3:
length = int(params[1])
if len(params) == 3:
if re.match("array", params[2]):
array = 1
params = args.storage.split(",")
if len(params) >= 4:
exitWithError(args.outform, "Invalid number of parameters.")
if len(params) >= 1:
position = int(params[0])
if len(params) >= 2:
length = int(params[1])
if len(params) == 3 and params[2] == "array":
position_formatted = str(position).zfill(64)
position = int(utils.sha3(position_formatted), 16)
except ValueError:
exitWithError(args.outform, "Invalid storage index. Please provide a numeric value.")
if array:
position_formated = str(position).zfill(64)
position = int(utils.sha3(position_formated), 16)
try:
if length == 1:
print("{}: ".format(position) + eth.eth_getStorageAt(args.address, position))
print("{}: {}".format(position, eth.eth_getStorageAt(args.address, position)))
else:
for i in range(position, position + length):
print("{}: ".format(hex(i)) + eth.eth_getStorageAt(args.address, i))
print("{}: {}".format(hex(i), eth.eth_getStorageAt(args.address, i)))
except FileNotFoundError as e:
exitWithError(args.outform, "IPC error: " + str(e))
except ConnectionError as e:
exitWithError(args.outform, "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly.")
elif (args.disassemble):
elif args.disassemble:
easm_text = contracts[0].get_easm()
sys.stdout.write(easm_text)
elif (args.xrefs):
elif args.xrefs:
print("\n".join(contracts[0].get_xrefs()))
elif (args.graph) or (args.fire_lasers):
if not len(contracts):
elif args.graph or args.fire_lasers:
if not contracts:
exitWithError(args.outform, "input files do not contain any valid contracts")
if (args.graph):
# try:
if (args.dynld):
if args.graph:
if args.dynld:
sym = SymExecWrapper(contracts[0], address, dynloader=DynLoader(eth), max_depth=args.max_depth)
else:
sym = SymExecWrapper(contracts[0], address, max_depth=args.max_depth)
# except:
# exitWithError(args.outform, "Symbolic execution error: " + str(e))
if args.enable_physics is not None:
physics = True
html = generate_graph(sym, args.enable_physics, args.phrack)
@ -400,67 +357,39 @@ elif (args.graph) or (args.fire_lasers):
exitWithError(args.outform, "Error saving graph: " + str(e))
else:
_issues = []
report = Report(args.verbose_report)
all_issues = []
for contract in contracts:
# try:
if (args.dynld):
if args.dynld:
sym = SymExecWrapper(contract, address, dynloader=DynLoader(eth), max_depth=args.max_depth)
else:
sym = SymExecWrapper(contract, address, max_depth=args.max_depth)
# except Exception as e:
# exitWithError(args.outform, "Symbolic exection error: " + str(e))
if args.modules:
issues = fire_lasers(sym, args.modules.split(","))
else:
issues = fire_lasers(sym)
if len(issues):
# Append source code info for SolidityContract
if (type(contract) == SolidityContract):
for issue in issues:
if (issue.pc):
codeinfo = contract.get_source_info(issue.pc)
issue.filename = codeinfo.filename
issue.code = codeinfo.code
issue.lineno = codeinfo.lineno
if type(contract) == SolidityContract:
for issue in issues:
report.append_issue(issue) # For text and markdown output
_issues.append(issue.as_dict()) # List of dicts for JSON output
# Finally, output the results
if (len(_issues)):
if (args.outform == 'json'):
if issue.pc:
codeinfo = contract.get_source_info(issue.pc)
issue.filename = codeinfo.filename
issue.code = codeinfo.code
issue.lineno = codeinfo.lineno
result = {'success': True, 'error': None, 'issues': _issues}
print(json.dumps(result))
all_issues += issues
else:
if (args.outform == 'text'):
print(report.as_text())
elif (args.outform == 'markdown'):
print(report.as_markdown())
else:
if (args.outform == 'text' or args.outform == 'markdown'):
print("The analysis was completed successfully. No issues were detected.")
else:
result = {'success': True, 'error': None, 'issues': []}
print(json.dumps(result))
# Finally, output the results
report = Report(args.verbose_report)
for issue in all_issues:
report.append_issue(issue)
outputs = {
'json': report.as_json(),
'text': report.as_text() or "The analysis was completed successfully. No issues were detected.",
'markdown': report.as_markdown() or "The analysis was completed successfully. No issues were detected."
}
print(outputs[args.outform])
else:
parser.print_help()

@ -76,12 +76,10 @@ class Report:
def as_json(self):
issues = []
for key, issue in self.issues.items():
issues.append(issue.as_dict())
return json.dumps(issues)
result = {'success': True, 'error': None, 'issues': issues}
return json.dumps(result)
def as_markdown(self):
text = "# Analysis Results\n"

Loading…
Cancel
Save