@ -1,14 +1,12 @@
from mythril . analysis . ops import Call , Variable , VarType
from mythril . analysis . swc_data import REENTRANCY
from mythril . analysis . swc_data import REENTRANCY
from mythril . analysis . modules . base import DetectionModule
from mythril . analysis . modules . base import DetectionModule
from mythril . analysis . report import Issue
from mythril . analysis . report import Issue
from mythril . analysis . call_helpers import get_call_from_state
from mythril . laser . smt import symbol_factory , UGT , BitVec
from mythril . laser . smt import symbol_factory , simplify , UGT
from mythril . laser . ethereum . state . global_state import GlobalState
from mythril . laser . ethereum . state . global_state import GlobalState
from mythril . laser . ethereum . state . annotation import StateAnnotation
from mythril . laser . ethereum . state . annotation import StateAnnotation
from mythril . analysis import solver
from mythril . analysis import solver
from mythril . exceptions import UnsatError
from mythril . exceptions import UnsatError
from typing import List , cast
from typing import List , cast , Optional
from copy import copy
from copy import copy
import logging
import logging
@ -21,25 +19,47 @@ Check whether there is a state change of the contract after the execution of an
"""
"""
class CallIssue :
class StateChangeCallsAnnotation ( StateAnnotation ) :
""" This class is a struct of
def __init__ ( self , call_state : GlobalState , user_defined_address : bool ) - > None :
call : the Call struct
self . call_state = call_state
user_defined_address : Whether the address can be defined by user or not
self . state_change_states = [ ] # type: List[GlobalState]
"""
def __init__ ( self , call : Call , user_defined_address : bool ) - > None :
self . call = call
self . user_defined_address = user_defined_address
self . user_defined_address = user_defined_address
def __copy__ ( self ) :
new_annotation = StateChangeCallsAnnotation (
self . call_state , self . user_defined_address
)
new_annotation . state_change_states = self . state_change_states [ : ]
return new_annotation
class StateChangeCallsAnnotation ( StateAnnotation ) :
def get_issue ( self ) - > Optional [ Issue ] :
def __init__ ( self ) - > None :
if not self . state_change_states :
self . calls = [ ] # type: List[CallIssue]
return None
def __copy__ ( self ) :
severity = " Medium " if self . user_defined_address else " Low "
result = StateChangeCallsAnnotation ( )
address = self . call_state . get_current_instruction ( ) [ " address " ]
result . calls = copy ( self . calls )
logging . debug (
return result
" [EXTERNAL_CALLS] Detected state changes at addresses: {} " . format ( address )
)
description_head = (
" The contract account state is changed after an external call. "
)
description_tail = (
" Consider that the called contract could re-enter the function before this "
" state change takes place. This can lead to business logic vulnerabilities. "
)
return Issue (
contract = self . call_state . environment . active_account . contract_name ,
function_name = self . call_state . environment . active_function_name ,
address = address ,
title = " State change after external call " ,
severity = severity ,
description_head = description_head ,
description_tail = description_tail ,
swc_id = REENTRANCY ,
bytecode = self . call_state . environment . code . bytecode ,
)
class StateChange ( DetectionModule ) :
class StateChange ( DetectionModule ) :
@ -64,25 +84,15 @@ class StateChange(DetectionModule):
)
)
def execute ( self , state : GlobalState ) :
def execute ( self , state : GlobalState ) :
"""
: param state :
: return :
"""
self . _issues . extend ( self . _analyze_state ( state ) )
self . _issues . extend ( self . _analyze_state ( state ) )
return self . issues
return self . issues
@staticmethod
@staticmethod
def _add_external_call (
def _add_external_call ( global_state : GlobalState ) - > None :
state : GlobalState , annotations : List [ StateChangeCallsAnnotation ]
gas = global_state . mstate . stack [ - 1 ]
) - > None :
to = global_state . mstate . stack [ - 2 ]
call = get_call_from_state ( state )
gas = state . mstate . stack [ - 1 ]
to = state . mstate . stack [ - 2 ]
if call is None :
return
try :
try :
constraints = copy ( state . mstate . constraints )
constraints = copy ( global_state . mstate . constraints )
solver . get_model (
solver . get_model (
constraints + [ UGT ( gas , symbol_factory . BitVecVal ( 2300 , 256 ) ) ]
constraints + [ UGT ( gas , symbol_factory . BitVecVal ( 2300 , 256 ) ) ]
)
)
@ -91,111 +101,65 @@ class StateChange(DetectionModule):
try :
try :
constraints + = [ to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF ]
constraints + = [ to == 0xDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF ]
solver . get_model ( constraints )
solver . get_model ( constraints )
annotations [ 0 ] . calls . append (
CallIssue ( call = call , user_defined_address = True )
)
except UnsatError :
annotations [ 0 ] . calls . append (
CallIssue ( call = call , user_defined_address = False )
)
global_state . annotate ( StateChangeCallsAnnotation ( global_state , True ) )
except UnsatError :
global_state . annotate ( StateChangeCallsAnnotation ( global_state , False ) )
except UnsatError :
except UnsatError :
pass
pass
@staticmethod
@staticmethod
def _analyze_state ( state : GlobalState ) - > List [ Issue ] :
def _analyze_state ( global_state : GlobalState ) - > List [ Issue ] :
"""
: param state :
: return :
"""
address = state . get_current_instruction ( ) [ " address " ]
annotations = cast (
annotations = cast (
List [ StateChangeCallsAnnotation ] ,
List [ StateChangeCallsAnnotation ] ,
list ( state . get_annotations ( StateChangeCallsAnnotation ) ) ,
list ( global_state . get_annotations ( StateChangeCallsAnnotation ) ) ,
)
)
opcode = state . get_current_instruction ( ) [ " opcode " ]
op_ code = global_ state. get_current_instruction ( ) [ " opcode " ]
if len ( annotations ) == 0 :
if len ( annotations ) == 0 :
if opcode in ( " SSTORE " , " CREATE " , " CREATE2 " ) :
if op_code in ( " SSTORE " , " CREATE " , " CREATE2 " ) :
return [ ]
log . debug ( " Creating annotation for state " )
state . annotate ( StateChangeCallsAnnotation ( ) )
annotations = cast (
List [ StateChangeCallsAnnotation ] ,
list ( state . get_annotations ( StateChangeCallsAnnotation ) ) ,
)
if opcode in ( " SSTORE " , " CREATE " , " CREATE2 " ) :
return StateChange . _handle_state_change (
state , address = address , annotation = annotations [ 0 ]
)
call = get_call_from_state ( state )
if call is None :
return [ ]
return [ ]
if op_code in ( " SSTORE " , " CREATE " , " CREATE2 " ) :
if StateChange . _balance_change ( call . value ) :
for annotation in annotations :
return StateChange . _handle_state_change (
annotation . state_change_states . append ( global_state )
state , address = address , annotation = annotations [ 0 ]
)
# Record state changes following from a transfer of ether
if op_code in ( " CALL " , " DELEGATECALL " , " CALLCODE " ) :
if opcode == " CALL " :
value : BitVec = global_state . mstate . stack [ - 3 ]
StateChange . _add_external_call ( state , annotations = annotations )
if StateChange . _balance_change ( value , global_state ) :
for annotation in annotations :
return [ ]
annotation . state_change_states . append ( global_state )
@staticmethod
# Record external calls
def _get_state_change_issues (
if op_code in ( " CALL " , " DELEGATECALL " , " CALLCODE " ) :
callissues : List [ CallIssue ] , state : GlobalState , address : int
StateChange . _add_external_call ( global_state )
) - > List [ Issue ] :
issues = [ ]
# Check for vulnerabilities
for callissue in callissues :
vulnerabilities = [ ]
severity = " Medium " if callissue . user_defined_address else " Low "
for annotation in annotations :
call = callissue . call
if not annotation . state_change_states :
logging . debug (
continue
" [EXTERNAL_CALLS] Detected state changes at addresses: {} " . format (
vulnerabilities . append ( annotation . get_issue ( ) )
address
global_state . annotations . remove ( annotation )
)
return vulnerabilities
)
description_head = (
" The contract account state is changed after an external call. "
)
description_tail = (
" Consider that the called contract could re-enter the function before this "
" state change takes place. This can lead to business logic vulnerabilities. "
)
issue = Issue (
contract = call . state . environment . active_account . contract_name ,
function_name = call . state . environment . active_function_name ,
address = address ,
title = " State change after external call " ,
severity = severity ,
description_head = description_head ,
description_tail = description_tail ,
swc_id = REENTRANCY ,
bytecode = state . environment . code . bytecode ,
)
issues . append ( issue )
return issues
@staticmethod
@staticmethod
def _handle_state_change (
def _balance_change ( value : BitVec , global_state : GlobalState ) - > bool :
state : GlobalState , address : int , annotation : StateChangeCallsAnnotation
) - > List [ Issue ] :
calls = annotation . calls
issues = StateChange . _get_state_change_issues ( calls , state , address )
return issues
@staticmethod
def _balance_change ( value : BitVec ) - > bool :
if not value . symbolic :
if not value . symbolic :
assert value . value is not None
assert value . value is not None
return value . value > 0
return value . value > 0
else :
else :
zero = symbol_factory . BitVecVal ( 0 , 256 )
constraints = copy ( global_state . mstate . constraints )
return simplify ( value > zero )
try :
solver . get_model (
constraints + [ value > symbol_factory . BitVecVal ( 0 , 256 ) ]
)
return True
except UnsatError :
return False
detector = StateChange ( )
detector = StateChange ( )