Use arguments from constructor in MythrilAnalyzer and fix typos

pull/906/head
Nikhil Parasaram 6 years ago
parent 231e3ae8fc
commit 636f7defaf
  1. 34
      mythril/interfaces/cli.py
  2. 95
      mythril/mythril/mythril_analyzer.py
  3. 6
      mythril/mythril/mythril_disassembler.py
  4. 8
      tests/graph_test.py
  5. 4
      tests/mythril/mythril_analyzer_test.py

@ -350,15 +350,15 @@ def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Non
)
if args.search or args.contract_hash_to_address:
leveldb_sercher = MythrilLevelDB(config.eth_db)
leveldb_searcher = MythrilLevelDB(config.eth_db)
if args.search:
# Database search ops
leveldb_sercher.search_db(args.search)
leveldb_searcher.search_db(args.search)
else:
# search corresponding address
try:
leveldb_sercher.contract_hash_to_address(
leveldb_searcher.contract_hash_to_address(
args.contract_hash_to_address
)
except AddressNotFoundError:
@ -413,7 +413,13 @@ def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Non
)
analyzer = MythrilAnalyzer(
strategy=args.strategy,
disassembler=dissasembler,
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
onchain_storage_access=not args.no_onchain_storage_access,
)
# Commands
@ -447,15 +453,9 @@ def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Non
if args.graph:
html = analyzer.graph_html(
strategy=args.strategy,
contract=analyzer.contracts[0],
address=address,
enable_physics=args.enable_physics,
phrackify=args.phrack,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
)
try:
@ -467,17 +467,11 @@ def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Non
else:
try:
report = analyzer.fire_lasers(
strategy=args.strategy,
address=address,
modules=[m.strip() for m in args.modules.strip().split(",")]
if args.modules
else [],
verbose_report=args.verbose_report,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
transaction_count=args.transaction_count,
enable_iprof=args.enable_iprof,
)
outputs = {
"json": report.as_json(),
@ -498,15 +492,7 @@ def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Non
args.outform, "input files do not contain any valid contracts"
)
statespace = analyzer.dump_statespace(
strategy=args.strategy,
contract=analyzer.contracts[0],
address=address,
max_depth=args.max_depth,
execution_timeout=args.execution_timeout,
create_timeout=args.create_timeout,
enable_iprof=args.enable_iprof,
)
statespace = analyzer.dump_statespace(contract=analyzer.contracts[0])
try:
with open(args.statespace_json, "w") as f:

@ -29,6 +29,12 @@ class MythrilAnalyzer:
disassembler: MythrilDisassembler,
requires_dynld: bool = False,
onchain_storage_access: bool = True,
strategy: str = "dfs",
address: Optional[str] = None,
max_depth: Optional[int] = None,
execution_timeout: Optional[int] = None,
create_timeout: Optional[int] = None,
enable_iprof: bool = False,
):
"""
@ -41,134 +47,99 @@ class MythrilAnalyzer:
self.enable_online_lookup = disassembler.enable_online_lookup
self.dynld = requires_dynld
self.onchain_storage_access = onchain_storage_access
def dump_statespace(
self,
strategy: str,
contract: EVMContract = None,
address: Optional[str] = None,
max_depth: Optional[int] = None,
execution_timeout: Optional[int] = None,
create_timeout: Optional[int] = None,
enable_iprof: bool = False,
) -> str:
self.strategy = strategy
self.address = address
self.max_depth = max_depth
self.execution_timeout = execution_timeout
self.create_timeout = create_timeout
self.enable_iprof = enable_iprof
def dump_statespace(self, contract: EVMContract = None) -> str:
"""
Returns serializable statespace of the contract
:param strategy: The search strategy to go through the CFG
:param contract: The Contract on which the analysis should be done
:param address: The Contract address
:param max_depth: The max depth till which the CFG should be constructed
:param execution_timeout: The total execution timeout of the contract
:param create_timeout: The total contract creation timeout
:param enable_iprof: Enables/disables instruction profiler
:return: The serialized state space
"""
sym = SymExecWrapper(
contract or self.contracts[0],
address,
strategy,
self.address,
self.strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=max_depth,
execution_timeout=execution_timeout,
create_timeout=create_timeout,
enable_iprof=enable_iprof,
max_depth=self.max_depth,
execution_timeout=self.execution_timeout,
create_timeout=self.create_timeout,
enable_iprof=self.enable_iprof,
)
return get_serializable_statespace(sym)
def graph_html(
self,
strategy: str,
address: str,
contract: EVMContract = None,
max_depth: Optional[int] = None,
enable_physics: bool = False,
phrackify: bool = False,
execution_timeout: Optional[int] = None,
create_timeout: Optional[int] = None,
transaction_count: Optional[int] = None,
enable_iprof: bool = False,
) -> str:
"""
:param strategy: The search strategy to go through the CFG
:param contract: The Contract on which the analysis should be done
:param address: The Contract address
:param max_depth: The max depth till which the CFG should be constructed
:param enable_physics: If true then enables the graph physics simulation
:param phrackify: If true generates Phrack-style call graph
:param execution_timeout: The total execution timeout of the contract
:param create_timeout: The total contract creation timeout
:param transaction_count: The amount of transactions to be executed
:param enable_iprof: Enables/disables instruction profiler
:return: The generated graph in html format
"""
sym = SymExecWrapper(
contract or self.contracts[0],
address,
strategy,
self.address,
self.strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=max_depth,
execution_timeout=execution_timeout,
max_depth=self.max_depth,
execution_timeout=self.execution_timeout,
transaction_count=transaction_count,
create_timeout=create_timeout,
enable_iprof=enable_iprof,
create_timeout=self.create_timeout,
enable_iprof=self.enable_iprof,
)
return generate_graph(sym, physics=enable_physics, phrackify=phrackify)
def fire_lasers(
self,
strategy: str,
contracts: Optional[List[EVMContract]] = None,
address: Optional[str] = None,
modules: Optional[List[str]] = None,
verbose_report: bool = False,
max_depth: Optional[int] = None,
execution_timeout: Optional[int] = None,
create_timeout: Optional[int] = None,
transaction_count: Optional[int] = None,
enable_iprof: bool = False,
) -> Report:
"""
:param strategy: The search strategy to go through the CFG
:param contracts: The Contracts list on which the analysis should be done
:param address: The Contract address
:param modules: The analysis modules which should be executed
:param verbose_report: Gives out the transaction sequence of the vulnerability
:param max_depth: The max depth till which the CFG should be constructed
:param execution_timeout: The total execution timeout of the contract
:param create_timeout: The total contract creation timeout
:param transaction_count: The amount of transactions to be executed
:param enable_iprof: Enables/disables instruction profiler
:return: The Report class which contains the all the issues/vulnerabilities
"""
all_issues = [] # type: List[Issue]
for contract in contracts or self.contracts:
for contract in self.contracts:
try:
sym = SymExecWrapper(
contract,
address,
strategy,
self.address,
self.strategy,
dynloader=DynLoader(
self.eth,
storage_loading=self.onchain_storage_access,
contract_loading=self.dynld,
),
max_depth=max_depth,
execution_timeout=execution_timeout,
create_timeout=create_timeout,
max_depth=self.max_depth,
execution_timeout=self.execution_timeout,
create_timeout=self.create_timeout,
transaction_count=transaction_count,
modules=modules,
compulsory_statespace=False,
enable_iprof=enable_iprof,
enable_iprof=self.enable_iprof,
)
issues = fire_lasers(sym, modules)
except KeyboardInterrupt:

@ -122,7 +122,7 @@ class MythrilDisassembler:
)
except Exception as e:
raise CriticalError("IPC / RPC error: " + str(e))
else:
if code == "0x" or code == "0x0":
raise CriticalError(
"Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain."
@ -130,9 +130,7 @@ class MythrilDisassembler:
else:
self.contracts.append(
EVMContract(
code,
name=address,
enable_online_lookup=self.enable_online_lookup,
code, name=address, enable_online_lookup=self.enable_online_lookup
)
)
return address, self.contracts[-1] # return address and contract object

@ -24,15 +24,15 @@ class GraphTest(BaseTestCase):
contract = EVMContract(input_file.read_text())
disassembler = MythrilDisassembler()
disassembler.contracts.append(contract)
analyzer = MythrilAnalyzer(disassembler)
html = analyzer.graph_html(
analyzer = MythrilAnalyzer(
disassembler=disassembler,
strategy="dfs",
transaction_count=1,
execution_timeout=5,
max_depth=30,
address=(util.get_indexed_address(0)),
)
html = analyzer.graph_html(transaction_count=1)
output_current.write_text(html)
lines_expected = re.findall(

@ -21,9 +21,9 @@ def test_fire_lasers(mock_sym, mock_fire_lasers, mock_code_info):
)
]
)
analyzer = MythrilAnalyzer(disassembler)
analyzer = MythrilAnalyzer(disassembler, strategy="dfs")
issues = analyzer.fire_lasers(strategy="dfs", modules=[]).sorted_issues()
issues = analyzer.fire_lasers(modules=[]).sorted_issues()
mock_sym.assert_called()
mock_fire_lasers.assert_called()
mock_code_info.assert_called()

Loading…
Cancel
Save