Merge branch 'develop' into pending-opcodes

pending-opcodes
Nathan 5 years ago committed by GitHub
commit 612fc4df34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      docs/source/index.rst
  2. 63
      docs/source/mythx-analysis.rst
  3. 2
      docs/source/security-analysis.rst
  4. 14
      mythril/analysis/modules/base.py
  5. 140
      mythril/analysis/modules/delegatecall.py
  6. 6
      mythril/analysis/modules/dependence_on_predictable_vars.py
  7. 31
      mythril/analysis/modules/deprecated_ops.py
  8. 2
      mythril/analysis/modules/dos.py
  9. 58
      mythril/analysis/modules/ether_thief.py
  10. 4
      mythril/analysis/modules/exceptions.py
  11. 36
      mythril/analysis/modules/external_calls.py
  12. 6
      mythril/analysis/modules/integer.py
  13. 6
      mythril/analysis/modules/multiple_sends.py
  14. 33
      mythril/analysis/modules/state_change_external_calls.py
  15. 6
      mythril/analysis/modules/suicide.py
  16. 6
      mythril/analysis/modules/unchecked_retval.py
  17. 108
      mythril/analysis/potential_issues.py
  18. 6
      mythril/analysis/templates/report_as_markdown.jinja2
  19. 4
      mythril/analysis/templates/report_as_text.jinja2
  20. 64
      mythril/ethereum/util.py
  21. 103
      mythril/interfaces/cli.py
  22. 7
      mythril/interfaces/old_cli.py
  23. 3
      mythril/laser/ethereum/svm.py
  24. 2
      mythril/laser/ethereum/util.py
  25. 12
      mythril/mythril/mythril_disassembler.py
  26. 111
      mythril/mythx/__init__.py
  27. 79
      mythril/solidity/soliditycontract.py
  28. 47
      mythril/support/signatures.py
  29. 3
      requirements.txt
  30. 3
      setup.py
  31. 6
      tests/disassembler_test.py

@ -9,6 +9,7 @@ Welcome to Mythril's documentation!
installation installation
security-analysis security-analysis
analysis-modules analysis-modules
mythx-analysis
mythril mythril

@ -0,0 +1,63 @@
MythX Analysis
=================
Run :code:`myth pro` with one of the input options described below will run a `MythX analysis <https://mythx.io>`_ on the desired input. This includes a run of Mythril, the fuzzer Harvey, and the static analysis engine Maru and has some false-positive filtering only possible by combining the tool capabilities.
**************
Authentication
**************
In order to authenticate with the MythX API, set the environment variables ``MYTHX_PASSWORD`` and ``MYTHX_ETH_ADDRESS``.
.. code-block:: bash
$ export MYTHX_ETH_ADDRESS='0x0000000000000000000000000000000000000000'
$ export MYTHX_PASSWORD='password'
***********************
Analyzing Solidity Code
***********************
The input format is the same as a regular Mythril analysis.
.. code-block:: bash
$ myth pro ether_send.sol
==== Unprotected Ether Withdrawal ====
SWC ID: 105
Severity: High
Contract: Crowdfunding
Function name: withdrawfunds()
PC address: 730
Anyone can withdraw ETH from the contract account.
Arbitrary senders other than the contract creator can withdraw ETH from the contract account without previously having sent an equivalent amount of ETH to it. This is likely to be a vulnerability.
--------------------
In file: tests/testdata/input_contracts/ether_send.sol:21
msg.sender.transfer(address(this).balance)
--------------------
If an input file contains multiple contract definitions, Mythril analyzes the *last* bytecode output produced by solc. You can override this by specifying the contract name explicitly:
.. code-block:: bash
myth pro OmiseGo.sol:OMGToken
To specify a contract address, use :code:`-a <address>`
****************************
Analyzing On-Chain Contracts
****************************
Analyzing a mainnet contract via INFURA:
.. code-block:: bash
myth pro -a 0x5c436ff914c458983414019195e0f4ecbef9e6dd
Adding the :code:`-l` flag will cause mythril to automatically retrieve dependencies, such as dynamically linked library contracts:
.. code-block:: bash
myth -v4 pro -l -a 0xEbFD99838cb0c132016B9E117563CB41f2B02264

@ -1,7 +1,7 @@
Security Analysis Security Analysis
================= =================
Run :code:`myth -x` with one of the input options described below will run the analysis modules in the `/analysis/modules <https://github.com/ConsenSys/mythril/tree/master/mythril/analysis/modules>`_ directory. Run :code:`myth analyze` with one of the input options described below will run the analysis modules in the `/analysis/modules <https://github.com/ConsenSys/mythril/tree/master/mythril/analysis/modules>`_ directory.
*********************** ***********************
Analyzing Solidity Code Analyzing Solidity Code

@ -3,6 +3,7 @@ modules."""
import logging import logging
from typing import List, Set from typing import List, Set
from mythril.analysis.report import Issue from mythril.analysis.report import Issue
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -34,21 +35,14 @@ class DetectionModule:
self.name, self.name,
) )
self.entrypoint = entrypoint self.entrypoint = entrypoint
self._issues = [] # type: List[Issue] self.issues = [] # type: List[Issue]
self._cache = set() # type: Set[int] self.cache = set() # type: Set[int]
@property
def issues(self):
"""
Returns the issues
"""
return self._issues
def reset_module(self): def reset_module(self):
""" """
Resets issues Resets issues
""" """
self._issues = [] self.issues = []
def execute(self, statespace) -> None: def execute(self, statespace) -> None:
"""The entry point for execution, which is being called by Mythril. """The entry point for execution, which is being called by Mythril.

@ -20,60 +20,6 @@ from mythril.laser.smt import symbol_factory, UGT
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class DelegateCallAnnotation(StateAnnotation):
def __init__(self, call_state: GlobalState, constraints: List) -> None:
"""
Initialize DelegateCall Annotation
:param call_state: Call state
"""
self.call_state = call_state
self.constraints = constraints
self.return_value = call_state.new_bitvec(
"retval_{}".format(call_state.get_current_instruction()["address"]), 256
)
def _copy__(self):
return DelegateCallAnnotation(self.call_state, copy(self.constraints))
def get_issue(self, global_state: GlobalState, transaction_sequence: Dict) -> Issue:
"""
Returns Issue for the annotation
:param global_state: Global State
:param transaction_sequence: Transaction sequence
:return: Issue
"""
address = self.call_state.get_current_instruction()["address"]
logging.debug(
"[DELEGATECALL] Detected delegatecall to a user-supplied address : {}".format(
address
)
)
description_head = "The contract delegates execution to another contract with a user-supplied address."
description_tail = (
"The smart contract delegates execution to a user-supplied address. Note that callers "
"can execute arbitrary contracts and that the callee contract "
"can access the storage of the calling contract. "
)
return Issue(
contract=self.call_state.environment.active_account.contract_name,
function_name=self.call_state.environment.active_function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
title="Delegatecall Proxy To User-Supplied Address",
bytecode=global_state.environment.code.bytecode,
severity="Medium",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(
global_state.mstate.min_gas_used,
global_state.mstate.max_gas_used,
),
)
class DelegateCallModule(DetectionModule): class DelegateCallModule(DetectionModule):
"""This module detects calldata being forwarded using DELEGATECALL.""" """This module detects calldata being forwarded using DELEGATECALL."""
@ -84,7 +30,7 @@ class DelegateCallModule(DetectionModule):
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT, swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
description="Check for invocations of delegatecall(msg.data) in the fallback function.", description="Check for invocations of delegatecall(msg.data) in the fallback function.",
entrypoint="callback", entrypoint="callback",
pre_hooks=["DELEGATECALL", "RETURN", "STOP"], pre_hooks=["DELEGATECALL"],
) )
def _execute(self, state: GlobalState) -> None: def _execute(self, state: GlobalState) -> None:
@ -93,12 +39,12 @@ class DelegateCallModule(DetectionModule):
:param state: :param state:
:return: :return:
""" """
if state.get_current_instruction()["address"] in self._cache: if state.get_current_instruction()["address"] in self.cache:
return return
issues = self._analyze_state(state) issues = self._analyze_state(state)
for issue in issues: for issue in issues:
self._cache.add(issue.address) self.cache.add(issue.address)
self._issues.extend(issues) self.issues.extend(issues)
@staticmethod @staticmethod
def _analyze_state(state: GlobalState) -> List[Issue]: def _analyze_state(state: GlobalState) -> List[Issue]:
@ -106,50 +52,56 @@ class DelegateCallModule(DetectionModule):
:param state: the current state :param state: the current state
:return: returns the issues for that corresponding state :return: returns the issues for that corresponding state
""" """
issues = []
op_code = state.get_current_instruction()["opcode"] op_code = state.get_current_instruction()["opcode"]
annotations = cast(
List[DelegateCallAnnotation],
list(state.get_annotations(DelegateCallAnnotation)),
)
if len(annotations) == 0 and op_code in ("RETURN", "STOP"): gas = state.mstate.stack[-1]
return [] to = state.mstate.stack[-2]
if op_code == "DELEGATECALL": constraints = [
gas = state.mstate.stack[-1] to == ATTACKER_ADDRESS,
to = state.mstate.stack[-2] UGT(gas, symbol_factory.BitVecVal(2300, 256)),
]
constraints = [ for tx in state.world_state.transaction_sequence:
to == ATTACKER_ADDRESS, if not isinstance(tx, ContractCreationTransaction):
UGT(gas, symbol_factory.BitVecVal(2300, 256)), constraints.append(tx.caller == ATTACKER_ADDRESS)
]
try:
transaction_sequence = solver.get_transaction_sequence(
state, state.mstate.constraints + constraints
)
for tx in state.world_state.transaction_sequence: address = state.get_current_instruction()["address"]
if not isinstance(tx, ContractCreationTransaction): logging.debug(
constraints.append(tx.caller == ATTACKER_ADDRESS) "[DELEGATECALL] Detected delegatecall to a user-supplied address : {}".format(
address
)
)
description_head = "The contract delegates execution to another contract with a user-supplied address."
description_tail = (
"The smart contract delegates execution to a user-supplied address. Note that callers "
"can execute arbitrary contracts and that the callee contract "
"can access the storage of the calling contract. "
)
state.annotate(DelegateCallAnnotation(state, constraints)) return [
Issue(
contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
address=address,
swc_id=DELEGATECALL_TO_UNTRUSTED_CONTRACT,
bytecode=state.environment.code.bytecode,
title="Delegatecall Proxy To User-Supplied Address",
severity="Medium",
description_head=description_head,
description_tail=description_tail,
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
]
except UnsatError:
return [] return []
else:
for annotation in annotations:
try:
transaction_sequence = solver.get_transaction_sequence(
state,
state.mstate.constraints
+ annotation.constraints
+ [annotation.return_value == 1],
)
issues.append(
annotation.get_issue(
state, transaction_sequence=transaction_sequence
)
)
except UnsatError:
continue
return issues
detector = DelegateCallModule() detector = DelegateCallModule()

@ -74,12 +74,12 @@ class PredictableDependenceModule(DetectionModule):
:param state: :param state:
:return: :return:
""" """
if state.get_current_instruction()["address"] in self._cache: if state.get_current_instruction()["address"] in self.cache:
return return
issues = self._analyze_state(state) issues = self._analyze_state(state)
for issue in issues: for issue in issues:
self._cache.add(issue.address) self.cache.add(issue.address)
self._issues.extend(issues) self.issues.extend(issues)
@staticmethod @staticmethod
def _analyze_state(state: GlobalState) -> list: def _analyze_state(state: GlobalState) -> list:

@ -1,6 +1,8 @@
"""This module contains the detection code for deprecated op codes.""" """This module contains the detection code for deprecated op codes."""
from mythril.analysis.report import Issue from mythril.analysis.potential_issues import (
from mythril.analysis.solver import get_transaction_sequence, UnsatError PotentialIssue,
get_potential_issues_annotation,
)
from mythril.analysis.swc_data import DEPRECATED_FUNCTIONS_USAGE from mythril.analysis.swc_data import DEPRECATED_FUNCTIONS_USAGE
from mythril.analysis.modules.base import DetectionModule from mythril.analysis.modules.base import DetectionModule
from mythril.laser.ethereum.state.global_state import GlobalState from mythril.laser.ethereum.state.global_state import GlobalState
@ -32,16 +34,14 @@ class DeprecatedOperationsModule(DetectionModule):
:param state: :param state:
:return: :return:
""" """
if state.get_current_instruction()["address"] in self._cache: if state.get_current_instruction()["address"] in self.cache:
return return
issues = self._analyze_state(state) issues = self._analyze_state(state)
for issue in issues: annotation = get_potential_issues_annotation(state)
self._cache.add(issue.address) annotation.potential_issues.extend(issues)
self._issues.extend(issues)
@staticmethod def _analyze_state(self, state):
def _analyze_state(state):
""" """
:param state: :param state:
@ -76,26 +76,21 @@ class DeprecatedOperationsModule(DetectionModule):
swc_id = DEPRECATED_FUNCTIONS_USAGE swc_id = DEPRECATED_FUNCTIONS_USAGE
else: else:
return [] return []
try:
transaction_sequence = get_transaction_sequence( potential_issue = PotentialIssue(
state, state.mstate.constraints
)
except UnsatError:
return []
issue = Issue(
contract=state.environment.active_account.contract_name, contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name, function_name=state.environment.active_function_name,
address=instruction["address"], address=instruction["address"],
title=title, title=title,
bytecode=state.environment.code.bytecode, bytecode=state.environment.code.bytecode,
detector=self,
swc_id=swc_id, swc_id=swc_id,
severity="Medium", severity="Medium",
description_head=description_head, description_head=description_head,
description_tail=description_tail, description_tail=description_tail,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), constraints=[],
transaction_sequence=transaction_sequence,
) )
return [issue] return [potential_issue]
detector = DeprecatedOperationsModule() detector = DeprecatedOperationsModule()

@ -56,7 +56,7 @@ class DosModule(DetectionModule):
:return: :return:
""" """
issues = self._analyze_state(state) issues = self._analyze_state(state)
self._issues.extend(issues) self.issues.extend(issues)
def _analyze_state(self, state: GlobalState) -> List[Issue]: def _analyze_state(self, state: GlobalState) -> List[Issue]:
""" """

@ -3,15 +3,16 @@ withdrawal."""
import logging import logging
from copy import copy from copy import copy
from mythril.analysis import solver
from mythril.analysis.modules.base import DetectionModule from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue from mythril.analysis.potential_issues import (
get_potential_issues_annotation,
PotentialIssue,
)
from mythril.laser.ethereum.transaction.symbolic import ( from mythril.laser.ethereum.transaction.symbolic import (
ATTACKER_ADDRESS, ATTACKER_ADDRESS,
CREATOR_ADDRESS, CREATOR_ADDRESS,
) )
from mythril.analysis.swc_data import UNPROTECTED_ETHER_WITHDRAWAL from mythril.analysis.swc_data import UNPROTECTED_ETHER_WITHDRAWAL
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.state.global_state import GlobalState from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.transaction import ContractCreationTransaction from mythril.laser.ethereum.transaction import ContractCreationTransaction
@ -60,15 +61,14 @@ class EtherThief(DetectionModule):
:param state: :param state:
:return: :return:
""" """
if state.get_current_instruction()["address"] in self._cache: if state.get_current_instruction()["address"] in self.cache:
return return
issues = self._analyze_state(state) potential_issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address) annotation = get_potential_issues_annotation(state)
self._issues.extend(issues) annotation.potential_issues.extend(potential_issues)
@staticmethod def _analyze_state(self, state):
def _analyze_state(state):
""" """
:param state: :param state:
@ -115,29 +115,23 @@ class EtherThief(DetectionModule):
state.current_transaction.caller == ATTACKER_ADDRESS, state.current_transaction.caller == ATTACKER_ADDRESS,
] ]
try: potential_issue = PotentialIssue(
transaction_sequence = solver.get_transaction_sequence(state, constraints) contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name,
issue = Issue( address=instruction["address"],
contract=state.environment.active_account.contract_name, swc_id=UNPROTECTED_ETHER_WITHDRAWAL,
function_name=state.environment.active_function_name, title="Unprotected Ether Withdrawal",
address=instruction["address"], severity="High",
swc_id=UNPROTECTED_ETHER_WITHDRAWAL, bytecode=state.environment.code.bytecode,
title="Unprotected Ether Withdrawal", description_head="Anyone can withdraw ETH from the contract account.",
severity="High", description_tail="Arbitrary senders other than the contract creator can withdraw ETH from the contract"
bytecode=state.environment.code.bytecode, + " account without previously having sent an equivalent amount of ETH to it. This is likely to be"
description_head="Anyone can withdraw ETH from the contract account.", + " a vulnerability.",
description_tail="Arbitrary senders other than the contract creator can withdraw ETH from the contract" detector=self,
+ " account without previously having sent an equivalent amount of ETH to it. This is likely to be" constraints=constraints,
+ " a vulnerability.", )
transaction_sequence=transaction_sequence,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
)
except UnsatError:
log.debug("No model found")
return []
return [issue] return [potential_issue]
detector = EtherThief() detector = EtherThief()

@ -33,8 +33,8 @@ class ReachableExceptionsModule(DetectionModule):
""" """
issues = self._analyze_state(state) issues = self._analyze_state(state)
for issue in issues: for issue in issues:
self._cache.add(issue.address) self.cache.add(issue.address)
self._issues.extend(issues) self.issues.extend(issues)
@staticmethod @staticmethod
def _analyze_state(state) -> list: def _analyze_state(state) -> list:

@ -2,20 +2,23 @@
calls.""" calls."""
from mythril.analysis import solver from mythril.analysis import solver
from mythril.analysis.potential_issues import (
PotentialIssue,
get_potential_issues_annotation,
)
from mythril.analysis.swc_data import REENTRANCY from mythril.analysis.swc_data import REENTRANCY
from mythril.laser.ethereum.state.constraints import Constraints
from mythril.laser.ethereum.transaction.symbolic import ATTACKER_ADDRESS from mythril.laser.ethereum.transaction.symbolic import ATTACKER_ADDRESS
from mythril.laser.ethereum.transaction.transaction_models import ( from mythril.laser.ethereum.transaction.transaction_models import (
ContractCreationTransaction, ContractCreationTransaction,
) )
from mythril.analysis.modules.base import DetectionModule from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue
from mythril.laser.smt import UGT, symbol_factory, Or, BitVec from mythril.laser.smt import UGT, symbol_factory, Or, BitVec
from mythril.laser.ethereum.natives import PRECOMPILE_COUNT from mythril.laser.ethereum.natives import PRECOMPILE_COUNT
from mythril.laser.ethereum.state.global_state import GlobalState from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.exceptions import UnsatError from mythril.exceptions import UnsatError
from copy import copy from copy import copy
import logging import logging
import json
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -65,13 +68,12 @@ class ExternalCalls(DetectionModule):
:param state: :param state:
:return: :return:
""" """
issues = self._analyze_state(state) potential_issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address) annotation = get_potential_issues_annotation(state)
self._issues.extend(issues) annotation.potential_issues.extend(potential_issues)
@staticmethod def _analyze_state(self, state: GlobalState):
def _analyze_state(state):
""" """
:param state: :param state:
@ -83,10 +85,10 @@ class ExternalCalls(DetectionModule):
address = state.get_current_instruction()["address"] address = state.get_current_instruction()["address"]
try: try:
constraints = copy(state.mstate.constraints) constraints = Constraints([UGT(gas, symbol_factory.BitVecVal(2300, 256))])
transaction_sequence = solver.get_transaction_sequence( transaction_sequence = solver.get_transaction_sequence(
state, constraints + [UGT(gas, symbol_factory.BitVecVal(2300, 256))] state, constraints + state.mstate.constraints
) )
# Check whether we can also set the callee address # Check whether we can also set the callee address
@ -99,7 +101,7 @@ class ExternalCalls(DetectionModule):
constraints.append(tx.caller == ATTACKER_ADDRESS) constraints.append(tx.caller == ATTACKER_ADDRESS)
transaction_sequence = solver.get_transaction_sequence( transaction_sequence = solver.get_transaction_sequence(
state, constraints state, constraints + state.mstate.constraints
) )
description_head = "A call to a user-supplied address is executed." description_head = "A call to a user-supplied address is executed."
@ -110,7 +112,7 @@ class ExternalCalls(DetectionModule):
"contract state." "contract state."
) )
issue = Issue( issue = PotentialIssue(
contract=state.environment.active_account.contract_name, contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name, function_name=state.environment.active_function_name,
address=address, address=address,
@ -120,8 +122,8 @@ class ExternalCalls(DetectionModule):
severity="Medium", severity="Medium",
description_head=description_head, description_head=description_head,
description_tail=description_tail, description_tail=description_tail,
transaction_sequence=transaction_sequence, constraints=constraints,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), detector=self,
) )
except UnsatError: except UnsatError:
@ -138,7 +140,7 @@ class ExternalCalls(DetectionModule):
"that the callee contract has been reviewed carefully." "that the callee contract has been reviewed carefully."
) )
issue = Issue( issue = PotentialIssue(
contract=state.environment.active_account.contract_name, contract=state.environment.active_account.contract_name,
function_name=state.environment.active_function_name, function_name=state.environment.active_function_name,
address=address, address=address,
@ -148,8 +150,8 @@ class ExternalCalls(DetectionModule):
severity="Low", severity="Low",
description_head=description_head, description_head=description_head,
description_tail=description_tail, description_tail=description_tail,
transaction_sequence=transaction_sequence, constraints=constraints,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used), detector=self,
) )
except UnsatError: except UnsatError:

@ -113,7 +113,7 @@ class IntegerOverflowUnderflowModule(DetectionModule):
address = _get_address_from_state(state) address = _get_address_from_state(state)
if address in self._cache: if address in self.cache:
return return
opcode = state.get_current_instruction()["opcode"] opcode = state.get_current_instruction()["opcode"]
@ -331,8 +331,8 @@ class IntegerOverflowUnderflowModule(DetectionModule):
) )
address = _get_address_from_state(ostate) address = _get_address_from_state(ostate)
self._cache.add(address) self.cache.add(address)
self._issues.append(issue) self.issues.append(issue)
detector = IntegerOverflowUnderflowModule() detector = IntegerOverflowUnderflowModule()

@ -45,12 +45,12 @@ class MultipleSendsModule(DetectionModule):
) )
def _execute(self, state: GlobalState) -> None: def _execute(self, state: GlobalState) -> None:
if state.get_current_instruction()["address"] in self._cache: if state.get_current_instruction()["address"] in self.cache:
return return
issues = self._analyze_state(state) issues = self._analyze_state(state)
for issue in issues: for issue in issues:
self._cache.add(issue.address) self.cache.add(issue.address)
self._issues.extend(issues) self.issues.extend(issues)
@staticmethod @staticmethod
def _analyze_state(state: GlobalState): def _analyze_state(state: GlobalState):

@ -1,6 +1,10 @@
from mythril.analysis.potential_issues import (
PotentialIssue,
get_potential_issues_annotation,
)
from mythril.analysis.swc_data import REENTRANCY from mythril.analysis.swc_data import REENTRANCY
from mythril.analysis.modules.base import DetectionModule from mythril.analysis.modules.base import DetectionModule
from mythril.analysis.report import Issue from mythril.laser.ethereum.state.constraints import Constraints
from mythril.laser.smt import symbol_factory, UGT, BitVec, Or from mythril.laser.smt import symbol_factory, UGT, BitVec, Or
from mythril.laser.ethereum.state.global_state import GlobalState from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.annotation import StateAnnotation from mythril.laser.ethereum.state.annotation import StateAnnotation
@ -32,10 +36,12 @@ class StateChangeCallsAnnotation(StateAnnotation):
new_annotation.state_change_states = self.state_change_states[:] new_annotation.state_change_states = self.state_change_states[:]
return new_annotation return new_annotation
def get_issue(self, global_state: GlobalState) -> Optional[Issue]: def get_issue(
self, global_state: GlobalState, detector: DetectionModule
) -> Optional[PotentialIssue]:
if not self.state_change_states: if not self.state_change_states:
return None return None
constraints = copy(global_state.mstate.constraints) constraints = Constraints()
gas = self.call_state.mstate.stack[-1] gas = self.call_state.mstate.stack[-1]
to = self.call_state.mstate.stack[-2] to = self.call_state.mstate.stack[-2]
constraints += [ constraints += [
@ -50,10 +56,11 @@ class StateChangeCallsAnnotation(StateAnnotation):
try: try:
transaction_sequence = solver.get_transaction_sequence( transaction_sequence = solver.get_transaction_sequence(
global_state, constraints global_state, constraints + global_state.mstate.constraints
) )
except UnsatError: except UnsatError:
return None return None
severity = "Medium" if self.user_defined_address else "Low" severity = "Medium" if self.user_defined_address else "Low"
address = global_state.get_current_instruction()["address"] address = global_state.get_current_instruction()["address"]
logging.debug( logging.debug(
@ -67,7 +74,7 @@ class StateChangeCallsAnnotation(StateAnnotation):
"state change takes place. This can lead to business logic vulnerabilities." "state change takes place. This can lead to business logic vulnerabilities."
) )
return Issue( return PotentialIssue(
contract=global_state.environment.active_account.contract_name, contract=global_state.environment.active_account.contract_name,
function_name=global_state.environment.active_function_name, function_name=global_state.environment.active_function_name,
address=address, address=address,
@ -77,7 +84,8 @@ class StateChangeCallsAnnotation(StateAnnotation):
description_tail=description_tail, description_tail=description_tail,
swc_id=REENTRANCY, swc_id=REENTRANCY,
bytecode=global_state.environment.code.bytecode, bytecode=global_state.environment.code.bytecode,
transaction_sequence=transaction_sequence, constraints=constraints,
detector=detector,
) )
@ -104,12 +112,12 @@ class StateChange(DetectionModule):
) )
def _execute(self, state: GlobalState) -> None: def _execute(self, state: GlobalState) -> None:
if state.get_current_instruction()["address"] in self._cache: if state.get_current_instruction()["address"] in self.cache:
return return
issues = self._analyze_state(state) issues = self._analyze_state(state)
for issue in issues:
self._cache.add(issue.address) annotation = get_potential_issues_annotation(state)
self._issues.extend(issues) annotation.potential_issues.extend(issues)
@staticmethod @staticmethod
def _add_external_call(global_state: GlobalState) -> None: def _add_external_call(global_state: GlobalState) -> None:
@ -139,8 +147,7 @@ class StateChange(DetectionModule):
except UnsatError: except UnsatError:
pass pass
@staticmethod def _analyze_state(self, global_state: GlobalState) -> List[PotentialIssue]:
def _analyze_state(global_state: GlobalState) -> List[Issue]:
annotations = cast( annotations = cast(
List[StateChangeCallsAnnotation], List[StateChangeCallsAnnotation],
@ -171,7 +178,7 @@ class StateChange(DetectionModule):
for annotation in annotations: for annotation in annotations:
if not annotation.state_change_states: if not annotation.state_change_states:
continue continue
issue = annotation.get_issue(global_state) issue = annotation.get_issue(global_state, self)
if issue: if issue:
vulnerabilities.append(issue) vulnerabilities.append(issue)
return vulnerabilities return vulnerabilities

@ -46,12 +46,12 @@ class SuicideModule(DetectionModule):
:param state: :param state:
:return: :return:
""" """
if state.get_current_instruction()["address"] in self._cache: if state.get_current_instruction()["address"] in self.cache:
return return
issues = self._analyze_state(state) issues = self._analyze_state(state)
for issue in issues: for issue in issues:
self._cache.add(issue.address) self.cache.add(issue.address)
self._issues.extend(issues) self.issues.extend(issues)
@staticmethod @staticmethod
def _analyze_state(state): def _analyze_state(state):

@ -55,12 +55,12 @@ class UncheckedRetvalModule(DetectionModule):
:param state: :param state:
:return: :return:
""" """
if state.get_current_instruction()["address"] in self._cache: if state.get_current_instruction()["address"] in self.cache:
return return
issues = self._analyze_state(state) issues = self._analyze_state(state)
for issue in issues: for issue in issues:
self._cache.add(issue.address) self.cache.add(issue.address)
self._issues.extend(issues) self.issues.extend(issues)
def _analyze_state(self, state: GlobalState) -> list: def _analyze_state(self, state: GlobalState) -> list:
instruction = state.get_current_instruction() instruction = state.get_current_instruction()

@ -0,0 +1,108 @@
from mythril.analysis.report import Issue
from mythril.analysis.solver import get_transaction_sequence
from mythril.exceptions import UnsatError
from mythril.laser.ethereum.state.annotation import StateAnnotation
from mythril.laser.ethereum.state.global_state import GlobalState
class PotentialIssue:
"""Representation of a potential issue"""
def __init__(
self,
contract,
function_name,
address,
swc_id,
title,
bytecode,
detector,
severity=None,
description_head="",
description_tail="",
constraints=None,
):
"""
:param contract: The contract
:param function_name: Function name where the issue is detected
:param address: The address of the issue
:param swc_id: Issue's corresponding swc-id
:param title: Title
:param bytecode: bytecode of the issue
:param detector: The detector the potential issue belongs to
:param gas_used: amount of gas used
:param severity: The severity of the issue
:param description_head: The top part of description
:param description_tail: The bottom part of the description
:param constraints: The non-path related constraints for the potential issue
"""
self.title = title
self.contract = contract
self.function_name = function_name
self.address = address
self.description_head = description_head
self.description_tail = description_tail
self.severity = severity
self.swc_id = swc_id
self.bytecode = bytecode
self.constraints = constraints or []
self.detector = detector
class PotentialIssuesAnnotation(StateAnnotation):
def __init__(self):
self.potential_issues = []
def get_potential_issues_annotation(state: GlobalState) -> PotentialIssuesAnnotation:
"""
Returns the potential issues annotation of the given global state, and creates one if
one does not already exist.
:param state: The global state
:return:
"""
for annotation in state.annotations:
if isinstance(annotation, PotentialIssuesAnnotation):
return annotation
annotation = PotentialIssuesAnnotation()
state.annotate(annotation)
return annotation
def check_potential_issues(state: GlobalState) -> None:
"""
Called at the end of a transaction, checks potential issues, and
adds valid issues to the detector.
:param state: The final global state of a transaction
:return:
"""
annotation = get_potential_issues_annotation(state)
for potential_issue in annotation.potential_issues:
try:
transaction_sequence = get_transaction_sequence(
state, state.mstate.constraints + potential_issue.constraints
)
except UnsatError:
continue
annotation.potential_issues.remove(potential_issue)
potential_issue.detector.cache.add(potential_issue.address)
potential_issue.detector.issues.append(
Issue(
contract=potential_issue.contract,
function_name=potential_issue.function_name,
address=potential_issue.address,
title=potential_issue.title,
bytecode=potential_issue.bytecode,
swc_id=potential_issue.swc_id,
gas_used=(state.mstate.min_gas_used, state.mstate.max_gas_used),
severity=potential_issue.severity,
description_head=potential_issue.description_head,
description_tail=potential_issue.description_tail,
transaction_sequence=transaction_sequence,
)
)

@ -6,15 +6,21 @@
- SWC ID: {{ issue['swc-id'] }} - SWC ID: {{ issue['swc-id'] }}
- Severity: {{ issue.severity }} - Severity: {{ issue.severity }}
- Contract: {{ issue.contract | default("Unknown") }} - Contract: {{ issue.contract | default("Unknown") }}
{% if issue.function %}
- Function name: `{{ issue.function }}` - Function name: `{{ issue.function }}`
{% endif %}
- PC address: {{ issue.address }} - PC address: {{ issue.address }}
{% if issue.min_gas_used or issue.max_gas_used %}
- Estimated Gas Usage: {{ issue.min_gas_used }} - {{ issue.max_gas_used }} - Estimated Gas Usage: {{ issue.min_gas_used }} - {{ issue.max_gas_used }}
{% endif %}
### Description ### Description
{{ issue.description.rstrip() }} {{ issue.description.rstrip() }}
{% if issue.filename and issue.lineno %} {% if issue.filename and issue.lineno %}
In file: {{ issue.filename }}:{{ issue.lineno }} In file: {{ issue.filename }}:{{ issue.lineno }}
{% elif issue.filename %}
In file: {{ issue.filename }}
{% endif %} {% endif %}
{% if issue.code %} {% if issue.code %}

@ -4,9 +4,13 @@
SWC ID: {{ issue['swc-id'] }} SWC ID: {{ issue['swc-id'] }}
Severity: {{ issue.severity }} Severity: {{ issue.severity }}
Contract: {{ issue.contract | default("Unknown") }} Contract: {{ issue.contract | default("Unknown") }}
{% if issue.function %}
Function name: {{ issue.function }} Function name: {{ issue.function }}
{% endif %}
PC address: {{ issue.address }} PC address: {{ issue.address }}
{% if issue.min_gas_used or issue.max_gas_used %}
Estimated Gas Usage: {{ issue.min_gas_used }} - {{ issue.max_gas_used }} Estimated Gas Usage: {{ issue.min_gas_used }} - {{ issue.max_gas_used }}
{% endif %}
{{ issue.description }} {{ issue.description }}
-------------------- --------------------
{% if issue.filename and issue.lineno %} {% if issue.filename and issue.lineno %}

@ -24,39 +24,44 @@ def safe_decode(hex_encoded_string):
return bytes.fromhex(hex_encoded_string) return bytes.fromhex(hex_encoded_string)
def get_solc_json(file, solc_binary="solc", solc_args=None): def get_solc_json(file, solc_binary="solc", solc_settings_json=None):
""" """
:param file: :param file:
:param solc_binary: :param solc_binary:
:param solc_args: :param solc_settings_json:
:return: :return:
""" """
cmd = [solc_binary, "--standard-json", "--allow-paths", "."]
cmd = [solc_binary, "--combined-json", "bin,bin-runtime,srcmap,srcmap-runtime,ast"]
settings = json.loads(solc_settings_json) if solc_settings_json else {}
if solc_args: settings.update(
cmd.extend(solc_args.split()) {
if not "--allow-paths" in cmd: "outputSelection": {
cmd.extend(["--allow-paths", "."]) "*": {
else: "": ["ast"],
for i, arg in enumerate(cmd): "*": [
if arg == "--allow-paths": "metadata",
cmd[i + 1] += ",." "evm.bytecode",
"evm.deployedBytecode",
cmd.append(file) "evm.methodIdentifiers",
],
}
}
}
)
input_json = json.dumps(
{
"language": "Solidity",
"sources": {file: {"urls": [file]}},
"settings": settings,
}
)
try: try:
p = Popen(cmd, stdout=PIPE, stderr=PIPE) p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate(bytes(input_json, "utf8"))
stdout, stderr = p.communicate()
ret = p.returncode
if ret != 0:
raise CompilerError(
"Solc experienced a fatal error (code %d).\n\n%s"
% (ret, stderr.decode("UTF-8"))
)
except FileNotFoundError: except FileNotFoundError:
raise CompilerError( raise CompilerError(
"Compiler not found. Make sure that solc is installed and in PATH, or set the SOLC environment variable." "Compiler not found. Make sure that solc is installed and in PATH, or set the SOLC environment variable."
@ -64,10 +69,15 @@ def get_solc_json(file, solc_binary="solc", solc_args=None):
out = stdout.decode("UTF-8") out = stdout.decode("UTF-8")
if not len(out): result = json.loads(out)
raise CompilerError("Compilation failed.")
for error in result.get("errors", []):
if error["severity"] == "error":
raise CompilerError(
"Solc experienced a fatal error.\n\n%s" % error["formattedMessage"]
)
return json.loads(out) return result
def encode_calldata(func_name, arg_types, args): def encode_calldata(func_name, arg_types, args):

@ -16,6 +16,7 @@ import traceback
import mythril.support.signatures as sigs import mythril.support.signatures as sigs
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter from argparse import ArgumentParser, Namespace, RawTextHelpFormatter
from mythril import mythx
from mythril.exceptions import AddressNotFoundError, CriticalError from mythril.exceptions import AddressNotFoundError, CriticalError
from mythril.mythril import ( from mythril.mythril import (
MythrilAnalyzer, MythrilAnalyzer,
@ -27,12 +28,14 @@ from mythril.__version__ import __version__ as VERSION
ANALYZE_LIST = ("analyze", "a") ANALYZE_LIST = ("analyze", "a")
DISASSEMBLE_LIST = ("disassemble", "d") DISASSEMBLE_LIST = ("disassemble", "d")
PRO_LIST = ("pro", "p")
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
COMMAND_LIST = ( COMMAND_LIST = (
ANALYZE_LIST ANALYZE_LIST
+ DISASSEMBLE_LIST + DISASSEMBLE_LIST
+ PRO_LIST
+ ( + (
"read-storage", "read-storage",
"leveldb-search", "leveldb-search",
@ -41,6 +44,7 @@ COMMAND_LIST = (
"version", "version",
"truffle", "truffle",
"help", "help",
"pro",
) )
) )
@ -70,7 +74,27 @@ def exit_with_error(format_, message):
sys.exit() sys.exit()
def get_input_parser() -> ArgumentParser: def get_runtime_input_parser() -> ArgumentParser:
"""
Returns Parser which handles input
:return: Parser which handles input
"""
parser = ArgumentParser(add_help=False)
parser.add_argument(
"-a",
"--address",
help="pull contract from the blockchain",
metavar="CONTRACT_ADDRESS",
)
parser.add_argument(
"--bin-runtime",
action="store_true",
help="Only when -c or -f is used. Consider the input bytecode as binary runtime code, default being the contract creation bytecode.",
)
return parser
def get_creation_input_parser() -> ArgumentParser:
""" """
Returns Parser which handles input Returns Parser which handles input
:return: Parser which handles input :return: Parser which handles input
@ -89,17 +113,6 @@ def get_input_parser() -> ArgumentParser:
metavar="BYTECODEFILE", metavar="BYTECODEFILE",
type=argparse.FileType("r"), type=argparse.FileType("r"),
) )
parser.add_argument(
"-a",
"--address",
help="pull contract from the blockchain",
metavar="CONTRACT_ADDRESS",
)
parser.add_argument(
"--bin-runtime",
action="store_true",
help="Only when -c or -f is used. Consider the input bytecode as binary runtime code, default being the contract creation bytecode.",
)
return parser return parser
@ -144,7 +157,10 @@ def get_utilities_parser() -> ArgumentParser:
:return: Parser which handles utility flags :return: Parser which handles utility flags
""" """
parser = argparse.ArgumentParser(add_help=False) parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--solc-args", help="Extra arguments for solc") parser.add_argument(
"--solc-json",
help="Json for the optional 'settings' parameter of solc's standard-json input",
)
parser.add_argument( parser.add_argument(
"--solv", "--solv",
help="specify solidity compiler version. If not present, will try to install it (Experimental)", help="specify solidity compiler version. If not present, will try to install it (Experimental)",
@ -158,7 +174,8 @@ def main() -> None:
rpc_parser = get_rpc_parser() rpc_parser = get_rpc_parser()
utilities_parser = get_utilities_parser() utilities_parser = get_utilities_parser()
input_parser = get_input_parser() runtime_input_parser = get_runtime_input_parser()
creation_input_parser = get_creation_input_parser()
output_parser = get_output_parser() output_parser = get_output_parser()
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Security analysis of Ethereum smart contracts" description="Security analysis of Ethereum smart contracts"
@ -172,7 +189,13 @@ def main() -> None:
analyzer_parser = subparsers.add_parser( analyzer_parser = subparsers.add_parser(
ANALYZE_LIST[0], ANALYZE_LIST[0],
help="Triggers the analysis of the smart contract", help="Triggers the analysis of the smart contract",
parents=[rpc_parser, utilities_parser, input_parser, output_parser], parents=[
rpc_parser,
utilities_parser,
creation_input_parser,
runtime_input_parser,
output_parser,
],
aliases=ANALYZE_LIST[1:], aliases=ANALYZE_LIST[1:],
formatter_class=RawTextHelpFormatter, formatter_class=RawTextHelpFormatter,
) )
@ -182,11 +205,25 @@ def main() -> None:
DISASSEMBLE_LIST[0], DISASSEMBLE_LIST[0],
help="Disassembles the smart contract", help="Disassembles the smart contract",
aliases=DISASSEMBLE_LIST[1:], aliases=DISASSEMBLE_LIST[1:],
parents=[rpc_parser, utilities_parser, input_parser], parents=[
rpc_parser,
utilities_parser,
creation_input_parser,
runtime_input_parser,
],
formatter_class=RawTextHelpFormatter, formatter_class=RawTextHelpFormatter,
) )
create_disassemble_parser(disassemble_parser) create_disassemble_parser(disassemble_parser)
pro_parser = subparsers.add_parser(
PRO_LIST[0],
help="Analyzes input with the MythX API (https://mythx.io)",
aliases=PRO_LIST[1:],
parents=[utilities_parser, creation_input_parser, output_parser],
formatter_class=RawTextHelpFormatter,
)
create_pro_parser(pro_parser)
read_storage_parser = subparsers.add_parser( read_storage_parser = subparsers.add_parser(
"read-storage", "read-storage",
help="Retrieves storage slots from a given address through rpc", help="Retrieves storage slots from a given address through rpc",
@ -234,6 +271,25 @@ def create_disassemble_parser(parser: ArgumentParser):
) )
def create_pro_parser(parser: ArgumentParser):
"""
Modify parser to handle mythx analysis
:param parser:
:return:
"""
parser.add_argument(
"solidity_files",
nargs="*",
help="Inputs file name and contract name. \n"
"usage: file1.sol:OptionalContractName file2.sol file3.sol:OptionalContractName",
)
parser.add_argument(
"--full",
help="Run a full analysis. Default: quick analysis",
action="store_true",
)
def create_read_storage_parser(read_storage_parser: ArgumentParser): def create_read_storage_parser(read_storage_parser: ArgumentParser):
""" """
Modify parser to handle storage slots Modify parser to handle storage slots
@ -564,6 +620,17 @@ def execute_command(
) )
print(storage) print(storage)
elif args.command in PRO_LIST:
mode = "full" if args.full else "quick"
report = mythx.analyze(disassembler.contracts, mode)
outputs = {
"json": report.as_json(),
"jsonv2": report.as_swc_standard_format(),
"text": report.as_text(),
"markdown": report.as_markdown(),
}
print(outputs[args.outform])
elif args.command in DISASSEMBLE_LIST: elif args.command in DISASSEMBLE_LIST:
if disassembler.contracts[0].code: if disassembler.contracts[0].code:
print("Runtime Disassembly: \n" + disassembler.contracts[0].get_easm()) print("Runtime Disassembly: \n" + disassembler.contracts[0].get_easm())
@ -694,12 +761,12 @@ def parse_args_and_execute(parser: ArgumentParser, args: Namespace) -> None:
config = set_config(args) config = set_config(args)
leveldb_search(config, args) leveldb_search(config, args)
query_signature = args.__dict__.get("query_signature", None) query_signature = args.__dict__.get("query_signature", None)
solc_args = args.__dict__.get("solc_args", None) solc_json = args.__dict__.get("solc_json", None)
solv = args.__dict__.get("solv", None) solv = args.__dict__.get("solv", None)
disassembler = MythrilDisassembler( disassembler = MythrilDisassembler(
eth=config.eth, eth=config.eth,
solc_version=solv, solc_version=solv,
solc_args=solc_args, solc_settings_json=solc_json,
enable_online_lookup=query_signature, enable_online_lookup=query_signature,
) )
if args.command == "truffle": if args.command == "truffle":

@ -229,7 +229,10 @@ def create_parser(parser: argparse.ArgumentParser) -> None:
default=10, default=10,
help="The amount of seconds to spend on " "the initial contract creation", help="The amount of seconds to spend on " "the initial contract creation",
) )
options.add_argument("--solc-args", help="Extra arguments for solc") options.add_argument(
"--solc-json",
help="Json for the optional 'settings' parameter of solc's standard-json input",
)
options.add_argument( options.add_argument(
"--phrack", action="store_true", help="Phrack-style call graph" "--phrack", action="store_true", help="Phrack-style call graph"
) )
@ -522,7 +525,7 @@ def parse_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Non
disassembler = MythrilDisassembler( disassembler = MythrilDisassembler(
eth=config.eth, eth=config.eth,
solc_version=args.solv, solc_version=args.solv,
solc_args=args.solc_args, solc_settings_json=args.solc_json,
enable_online_lookup=args.query_signature, enable_online_lookup=args.query_signature,
) )
if args.truffle: if args.truffle:

@ -5,6 +5,7 @@ from copy import copy
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Callable, Dict, DefaultDict, List, Tuple, Optional from typing import Callable, Dict, DefaultDict, List, Tuple, Optional
from mythril.analysis.potential_issues import check_potential_issues
from mythril.laser.ethereum.cfg import NodeFlags, Node, Edge, JumpType from mythril.laser.ethereum.cfg import NodeFlags, Node, Edge, JumpType
from mythril.laser.ethereum.evm_exceptions import StackUnderflowException from mythril.laser.ethereum.evm_exceptions import StackUnderflowException
from mythril.laser.ethereum.evm_exceptions import VmException from mythril.laser.ethereum.evm_exceptions import VmException
@ -344,6 +345,8 @@ class LaserEVM:
not isinstance(transaction, ContractCreationTransaction) not isinstance(transaction, ContractCreationTransaction)
or transaction.return_data or transaction.return_data
) and not end_signal.revert: ) and not end_signal.revert:
check_potential_issues(global_state)
end_signal.global_state.world_state.node = global_state.node end_signal.global_state.world_state.node = global_state.node
self._add_world_state(end_signal.global_state) self._add_world_state(end_signal.global_state)
new_global_states = [] new_global_states = []

@ -45,7 +45,7 @@ def get_instruction_index(
""" """
index = 0 index = 0
for instr in instruction_list: for instr in instruction_list:
if instr["address"] == address: if instr["address"] >= address:
return index return index
index += 1 index += 1
return None return None

@ -30,11 +30,11 @@ class MythrilDisassembler:
self, self,
eth: Optional[EthJsonRpc] = None, eth: Optional[EthJsonRpc] = None,
solc_version: str = None, solc_version: str = None,
solc_args: str = None, solc_settings_json: str = None,
enable_online_lookup: bool = False, enable_online_lookup: bool = False,
) -> None: ) -> None:
self.solc_binary = self._init_solc_binary(solc_version) self.solc_binary = self._init_solc_binary(solc_version)
self.solc_args = solc_args self.solc_settings_json = solc_settings_json
self.eth = eth self.eth = eth
self.enable_online_lookup = enable_online_lookup self.enable_online_lookup = enable_online_lookup
self.sigs = signatures.SignatureDB(enable_online_lookup=enable_online_lookup) self.sigs = signatures.SignatureDB(enable_online_lookup=enable_online_lookup)
@ -163,13 +163,15 @@ class MythrilDisassembler:
try: try:
# import signatures from solidity source # import signatures from solidity source
self.sigs.import_solidity_file( self.sigs.import_solidity_file(
file, solc_binary=self.solc_binary, solc_args=self.solc_args file,
solc_binary=self.solc_binary,
solc_settings_json=self.solc_settings_json,
) )
if contract_name is not None: if contract_name is not None:
contract = SolidityContract( contract = SolidityContract(
input_file=file, input_file=file,
name=contract_name, name=contract_name,
solc_args=self.solc_args, solc_settings_json=self.solc_settings_json,
solc_binary=self.solc_binary, solc_binary=self.solc_binary,
) )
self.contracts.append(contract) self.contracts.append(contract)
@ -177,7 +179,7 @@ class MythrilDisassembler:
else: else:
for contract in get_contracts_from_file( for contract in get_contracts_from_file(
input_file=file, input_file=file,
solc_args=self.solc_args, solc_settings_json=self.solc_settings_json,
solc_binary=self.solc_binary, solc_binary=self.solc_binary,
): ):
self.contracts.append(contract) self.contracts.append(contract)

@ -0,0 +1,111 @@
import sys
import os
import time
from mythx_models.exceptions import MythXAPIError
from typing import List, Dict, Any
from mythril.analysis.report import Issue, Report
from mythril.solidity.soliditycontract import SolidityContract
from pythx import Client
import logging
log = logging.getLogger(__name__)
TRIAL_ETH_ADDRESS = "0x0000000000000000000000000000000000000000"
TRIAL_PASSWORD = "trial"
def analyze(contracts: List[SolidityContract], analysis_mode: str = "quick") -> Report:
"""
Analyze contracts via the MythX API.
:param contracts: List of solidity contracts to analyze
:param analysis_mode: The mode to submit the analysis request with. "quick" or "full" (default: "quick")
:return: Report with analyzed contracts
"""
assert analysis_mode in ("quick", "full"), "analysis_mode must be 'quick' or 'full'"
c = Client(
eth_address=os.environ.get("MYTHX_ETH_ADDRESS", TRIAL_ETH_ADDRESS),
password=os.environ.get("MYTHX_PASSWORD", TRIAL_PASSWORD),
)
if c.eth_address == TRIAL_ETH_ADDRESS:
print(
"You are currently running MythX in Trial mode. This mode reports only a partial analysis of your smart contracts, limited to three vulnerabilities. To get a more complete analysis, sign up for a free account at https://mythx.io."
)
issues = [] # type: List[Issue]
# TODO: Analyze multiple contracts asynchronously.
for contract in contracts:
source_codes = {}
source_list = []
sources = {} # type: Dict[str, Any]
main_source = None
try:
main_source = contract.input_file
for solidity_file in contract.solidity_files:
source_codes[solidity_file.filename] = solidity_file.data
for filename in contract.solc_json["sources"].keys():
sources[filename] = {}
if source_codes[filename]:
sources[filename]["source"] = source_codes[filename]
sources[filename]["ast"] = contract.solc_json["sources"][filename][
"ast"
]
source_list.append(filename)
source_list.sort(
key=lambda fname: contract.solc_json["sources"][fname]["id"]
)
except AttributeError:
# No solidity file
pass
assert contract.creation_code, "Creation bytecode must exist."
try:
resp = c.analyze(
contract_name=contract.name,
analysis_mode=analysis_mode,
bytecode=contract.creation_code or None,
deployed_bytecode=contract.code or None,
sources=sources or None,
main_source=main_source,
source_list=source_list or None,
)
except MythXAPIError as e:
log.critical(e)
while not c.analysis_ready(resp.uuid):
log.info(c.status(resp.uuid).analysis)
time.sleep(5)
for issue in c.report(resp.uuid):
issue = Issue(
contract=contract.name,
function_name=None,
address=issue.locations[0].source_map.components[0].offset
if issue.locations
else -1,
swc_id=issue.swc_id[4:] or "None", # remove 'SWC-' prefix
title=issue.swc_title,
bytecode=contract.creation_code,
severity=issue.severity.capitalize(),
description_head=issue.description_short,
description_tail=issue.description_long,
)
issue.add_code_info(contract)
issues.append(issue)
report = Report(contracts=contracts)
for issue in issues:
report.append_issue(issue)
return report

@ -44,23 +44,28 @@ class SourceCodeInfo:
self.solc_mapping = mapping self.solc_mapping = mapping
def get_contracts_from_file(input_file, solc_args=None, solc_binary="solc"): def get_contracts_from_file(input_file, solc_settings_json=None, solc_binary="solc"):
""" """
:param input_file: :param input_file:
:param solc_args: :param solc_settings_json:
:param solc_binary: :param solc_binary:
""" """
data = get_solc_json(input_file, solc_args=solc_args, solc_binary=solc_binary) data = get_solc_json(
input_file, solc_settings_json=solc_settings_json, solc_binary=solc_binary
)
try: try:
for key, contract in data["contracts"].items(): for contract_name in data["contracts"][input_file].keys():
filename, name = key.split(":") if len(
if filename == input_file and len(contract["bin-runtime"]): data["contracts"][input_file][contract_name]["evm"]["deployedBytecode"][
"object"
]
):
yield SolidityContract( yield SolidityContract(
input_file=input_file, input_file=input_file,
name=name, name=contract_name,
solc_args=solc_args, solc_settings_json=solc_settings_json,
solc_binary=solc_binary, solc_binary=solc_binary,
) )
except KeyError: except KeyError:
@ -70,16 +75,22 @@ def get_contracts_from_file(input_file, solc_args=None, solc_binary="solc"):
class SolidityContract(EVMContract): class SolidityContract(EVMContract):
"""Representation of a Solidity contract.""" """Representation of a Solidity contract."""
def __init__(self, input_file, name=None, solc_args=None, solc_binary="solc"): def __init__(
data = get_solc_json(input_file, solc_args=solc_args, solc_binary=solc_binary) self, input_file, name=None, solc_settings_json=None, solc_binary="solc"
):
data = get_solc_json(
input_file, solc_settings_json=solc_settings_json, solc_binary=solc_binary
)
self.solidity_files = [] self.solidity_files = []
self.solc_json = data
self.input_file = input_file
for filename in data["sourceList"]: for filename, contract in data["sources"].items():
with open(filename, "r", encoding="utf-8") as file: with open(filename, "r", encoding="utf-8") as file:
code = file.read() code = file.read()
full_contract_src_maps = self.get_full_contract_src_maps( full_contract_src_maps = self.get_full_contract_src_maps(
data["sources"][filename]["AST"] contract["ast"]
) )
self.solidity_files.append( self.solidity_files.append(
SolidityFile(filename, code, full_contract_src_maps) SolidityFile(filename, code, full_contract_src_maps)
@ -91,32 +102,28 @@ class SolidityContract(EVMContract):
srcmap_constructor = [] srcmap_constructor = []
srcmap = [] srcmap = []
if name: if name:
for key, contract in sorted(data["contracts"].items()): contract = data["contracts"][input_file][name]
filename, _name = key.split(":") if len(contract["evm"]["deployedBytecode"]["object"]):
code = contract["evm"]["deployedBytecode"]["object"]
if ( creation_code = contract["evm"]["bytecode"]["object"]
filename == input_file srcmap = contract["evm"]["deployedBytecode"]["sourceMap"].split(";")
and name == _name srcmap_constructor = contract["evm"]["bytecode"]["sourceMap"].split(";")
and len(contract["bin-runtime"]) has_contract = True
):
code = contract["bin-runtime"]
creation_code = contract["bin"]
srcmap = contract["srcmap-runtime"].split(";")
srcmap_constructor = contract["srcmap"].split(";")
has_contract = True
break
# If no contract name is specified, get the last bytecode entry for the input file # If no contract name is specified, get the last bytecode entry for the input file
else: else:
for key, contract in sorted(data["contracts"].items()): for contract_name, contract in sorted(
filename, name = key.split(":") data["contracts"][input_file].items()
):
if filename == input_file and len(contract["bin-runtime"]): if len(contract["evm"]["deployedBytecode"]["object"]):
code = contract["bin-runtime"] name = contract_name
creation_code = contract["bin"] code = contract["evm"]["deployedBytecode"]["object"]
srcmap = contract["srcmap-runtime"].split(";") creation_code = contract["evm"]["bytecode"]["object"]
srcmap_constructor = contract["srcmap"].split(";") srcmap = contract["evm"]["deployedBytecode"]["sourceMap"].split(";")
srcmap_constructor = contract["evm"]["bytecode"]["sourceMap"].split(
";"
)
has_contract = True has_contract = True
if not has_contract: if not has_contract:
@ -139,8 +146,8 @@ class SolidityContract(EVMContract):
:return: The source maps :return: The source maps
""" """
source_maps = set() source_maps = set()
for child in ast["children"]: for child in ast["nodes"]:
if "contractKind" in child["attributes"]: if child.get("contractKind"):
source_maps.add(child["src"]) source_maps.add(child["src"])
return source_maps return source_maps

@ -1,5 +1,6 @@
"""The Mythril function signature database.""" """The Mythril function signature database."""
import functools import functools
import json
import logging import logging
import multiprocessing import multiprocessing
import os import os
@ -9,6 +10,7 @@ from collections import defaultdict
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
from typing import List, Set, DefaultDict, Dict from typing import List, Set, DefaultDict, Dict
from mythril.ethereum.util import get_solc_json
from mythril.exceptions import CompilerError from mythril.exceptions import CompilerError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -231,53 +233,20 @@ class SignatureDB(object, metaclass=Singleton):
return [] return []
def import_solidity_file( def import_solidity_file(
self, file_path: str, solc_binary: str = "solc", solc_args: str = None self, file_path: str, solc_binary: str = "solc", solc_settings_json: str = None
): ):
"""Import Function Signatures from solidity source files. """Import Function Signatures from solidity source files.
:param solc_binary: :param solc_binary:
:param solc_args: :param solc_settings_json:
:param file_path: solidity source code file path :param file_path: solidity source code file path
:return: :return:
""" """
cmd = [solc_binary, "--hashes", file_path] solc_json = get_solc_json(file_path, solc_binary, solc_settings_json)
if solc_args:
cmd.extend(solc_args.split())
try: for contract in solc_json["contracts"][file_path].values():
p = Popen(cmd, stdout=PIPE, stderr=PIPE) for name, hash in contract["evm"]["methodIdentifiers"].items():
stdout, stderr = p.communicate() self.add("0x" + hash, name)
ret = p.returncode
if ret != 0:
raise CompilerError(
"Solc has experienced a fatal error (code {}).\n\n{}".format(
ret, stderr.decode("utf-8")
)
)
except FileNotFoundError:
raise CompilerError(
(
"Compiler not found. Make sure that solc is installed and in PATH, "
"or the SOLC environment variable is set."
)
)
stdout = stdout.decode("unicode_escape").split("\n")
for line in stdout:
# the ':' need not be checked but just to be sure
if all(map(lambda x: x in line, ["(", ")", ":"])):
solc_bytes = "0x" + line.split(":")[0]
solc_text = line.split(":")[1].strip()
self.solidity_sigs[solc_bytes].append(solc_text)
log.debug(
"Signatures: found %d signatures after parsing" % len(self.solidity_sigs)
)
# update DB with what we've found
for byte_sig, text_sigs in self.solidity_sigs.items():
for text_sig in text_sigs:
self.add(byte_sig, text_sig)
@staticmethod @staticmethod
def lookup_online(byte_sig: str, timeout: int, proxies=None) -> List[str]: def lookup_online(byte_sig: str, timeout: int, proxies=None) -> List[str]:

@ -21,9 +21,10 @@ py-solc
pytest>=3.6.0 pytest>=3.6.0
pytest-cov pytest-cov
pytest_mock pytest_mock
requests requests>=2.22.0
rlp>=1.0.1 rlp>=1.0.1
transaction>=2.2.1 transaction>=2.2.1
z3-solver>=4.8.5.0 z3-solver>=4.8.5.0
pysha3 pysha3
matplotlib matplotlib
pythx

@ -28,7 +28,7 @@ REQUIRED = [
"py_ecc==1.6.0", "py_ecc==1.6.0",
"ethereum>=2.3.2", "ethereum>=2.3.2",
"z3-solver>=4.8.5.0", "z3-solver>=4.8.5.0",
"requests", "requests>=2.22.0",
"py-solc", "py-solc",
"plyvel", "plyvel",
"eth_abi==1.3.0", "eth_abi==1.3.0",
@ -49,6 +49,7 @@ REQUIRED = [
"persistent>=4.2.0", "persistent>=4.2.0",
"ethereum-input-decoder>=0.2.2", "ethereum-input-decoder>=0.2.2",
"matplotlib", "matplotlib",
"pythx",
] ]
TESTS_REQUIRE = ["mypy", "pytest>=3.6.0", "pytest_mock", "pytest-cov"] TESTS_REQUIRE = ["mypy", "pytest>=3.6.0", "pytest_mock", "pytest-cov"]

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save