Implement EXTCODECOPY (#928)

* Implement EXTCODECOPY

* Cache the code received from Dynld

* Reuse code_copy_helper return value

* Add tests to cover the extcodecopy cases

* Shift the addr function to WorldState

* Refactor with black

* Change based on recent PRs

* Add docstrings

* Shorten the code
pull/1099/head
Nikhil Parasaram 6 years ago committed by GitHub
parent 842f5370c5
commit 5252a05524
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 120
      mythril/laser/ethereum/instructions.py
  2. 22
      mythril/laser/ethereum/state/world_state.py
  3. 50
      tests/laser/state/world_state_account_exist_load_test.py

@ -1009,16 +1009,61 @@ class Instruction:
global_state.mstate.stack.pop(),
global_state.mstate.stack.pop(),
)
return self._code_copy_helper(
code=global_state.environment.code.bytecode,
memory_offset=memory_offset,
code_offset=code_offset,
size=size,
op="CODECOPY",
global_state=global_state,
)
@StateTransition()
def extcodesize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
addr = state.stack.pop()
try:
addr = hex(helper.get_concrete_int(addr))
except TypeError:
log.debug("unsupported symbolic address for EXTCODESIZE")
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
try:
code = global_state.world_state.accounts_exist_or_load(
addr, self.dynamic_loader
)
except (ValueError, AttributeError) as e:
log.debug("error accessing contract storage due to: " + str(e))
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
state.stack.append(len(code) // 2)
return [global_state]
@staticmethod
def _code_copy_helper(
code: str,
memory_offset: BitVec,
code_offset: BitVec,
size: BitVec,
op: str,
global_state: GlobalState,
) -> List[GlobalState]:
try:
concrete_memory_offset = helper.get_concrete_int(memory_offset)
except TypeError:
log.debug("Unsupported symbolic memory offset in CODECOPY")
log.debug("Unsupported symbolic memory offset in {}".format(op))
return [global_state]
try:
size = helper.get_concrete_int(size)
global_state.mstate.mem_extend(concrete_memory_offset, size)
concrete_size = helper.get_concrete_int(size)
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
except TypeError:
# except both attribute error and Exception
@ -1036,9 +1081,9 @@ class Instruction:
try:
concrete_code_offset = helper.get_concrete_int(code_offset)
except TypeError:
log.debug("Unsupported symbolic code offset in CODECOPY")
global_state.mstate.mem_extend(concrete_memory_offset, size)
for i in range(size):
log.debug("Unsupported symbolic code offset in {}".format(op))
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
for i in range(concrete_size):
global_state.mstate.memory[
concrete_memory_offset + i
] = global_state.new_bitvec(
@ -1049,21 +1094,20 @@ class Instruction:
)
return [global_state]
bytecode = global_state.environment.code.bytecode
if bytecode[0:2] == "0x":
bytecode = bytecode[2:]
if code[0:2] == "0x":
code = code[2:]
if size == 0 and isinstance(
if concrete_size == 0 and isinstance(
global_state.current_transaction, ContractCreationTransaction
):
if concrete_code_offset >= len(bytecode) // 2:
self._handle_symbolic_args(global_state, concrete_memory_offset)
if concrete_code_offset >= len(code) // 2:
Instruction._handle_symbolic_args(global_state, concrete_memory_offset)
return [global_state]
for i in range(size):
if 2 * (concrete_code_offset + i + 1) <= len(bytecode):
for i in range(concrete_size):
if 2 * (concrete_code_offset + i + 1) <= len(code):
global_state.mstate.memory[concrete_memory_offset + i] = int(
bytecode[
code[
2
* (concrete_code_offset + i) : 2
* (concrete_code_offset + i + 1)
@ -1083,35 +1127,41 @@ class Instruction:
return [global_state]
@StateTransition()
def extcodesize_(self, global_state: GlobalState) -> List[GlobalState]:
def extcodecopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
addr = state.stack.pop()
environment = global_state.environment
addr, memory_offset, code_offset, size = (
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
)
try:
addr = hex(helper.get_concrete_int(addr))
except TypeError:
log.debug("unsupported symbolic address for EXTCODESIZE")
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
log.debug("unsupported symbolic address for EXTCODECOPY")
return [global_state]
try:
code = self.dynamic_loader.dynld(addr)
code = global_state.world_state.accounts_exist_or_load(
addr, self.dynamic_loader
)
except (ValueError, AttributeError) as e:
log.debug("error accessing contract storage due to: " + str(e))
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
if code is None:
state.stack.append(0)
else:
state.stack.append(len(code.bytecode) // 2)
return [global_state]
return self._code_copy_helper(
code=code,
memory_offset=memory_offset,
code_offset=code_offset,
size=size,
op="EXTCODECOPY",
global_state=global_state,
)
@StateTransition
def extcodehash_(self, global_state: GlobalState) -> List[GlobalState]:
@ -1127,20 +1177,6 @@ class Instruction:
)
return [global_state]
@StateTransition()
def extcodecopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
# FIXME: not implemented
state = global_state.mstate
addr = state.stack.pop()
start, s2, size = state.stack.pop(), state.stack.pop(), state.stack.pop()
return [global_state]
@StateTransition()
def returndatacopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""

@ -3,6 +3,7 @@ from copy import copy
from random import randint
from typing import Dict, List, Iterator, Optional, TYPE_CHECKING
from mythril.support.loader import DynLoader
from mythril.laser.smt import symbol_factory, Array, BitVec
from ethereum.utils import mk_contract_address
from mythril.laser.ethereum.state.account import Account
@ -64,6 +65,27 @@ class WorldState:
new_world_state.node = self.node
return new_world_state
def accounts_exist_or_load(self, addr: str, dynamic_loader: DynLoader) -> str:
"""
returns account if it exists, else it loads from the dynamic loader
:param addr: address
:param dynamic_loader: Dynamic Loader
:return: The code
"""
addr_bitvec = symbol_factory.BitVecVal(int(addr, 16), 256)
if addr_bitvec.value in self.accounts:
code = self.accounts[addr_bitvec.value].code
else:
code = dynamic_loader.dynld(addr)
self.create_account(
balance=0, address=addr_bitvec.value, dynamic_loader=dynamic_loader
)
if code is None:
code = ""
else:
code = code.bytecode
return code
def create_account(
self,
balance=0,

@ -0,0 +1,50 @@
import pytest
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.state.environment import Environment
from mythril.laser.ethereum.state.account import Account
from mythril.laser.ethereum.state.machine_state import MachineState
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.world_state import WorldState
from mythril.support.loader import DynLoader
from mythril.ethereum.interface.rpc.client import EthJsonRpc
from mythril.laser.ethereum.instructions import Instruction
def _get_global_state():
active_account = Account("0x0", code=Disassembly("60606040"))
passive_account = Account(
"0x325345346564645654645", code=Disassembly("6060604061626364")
)
environment = Environment(active_account, None, None, None, None, None)
world_state = WorldState()
world_state.put_account(active_account)
world_state.put_account(passive_account)
return GlobalState(world_state, environment, None, MachineState(gas_limit=8000000))
@pytest.mark.parametrize(
"addr, eth, code_len",
[
(
"0xb09C477eCDAd49DD5Ac26c2C64914C3a6693843a",
EthJsonRpc("rinkeby.infura.io", 443, True),
1548,
),
(
"0x863DF6BFa4469f3ead0bE8f9F2AAE51c91A907b4",
EthJsonRpc("mainnet.infura.io", 443, True),
0,
),
(
"0x325345346564645654645",
EthJsonRpc("mainnet.infura.io", 443, True),
16,
), # This contract tests Address Cache
],
)
def test_extraction(addr, eth, code_len):
global_state = _get_global_state()
dynamic_loader = DynLoader(eth=eth)
code = global_state.world_state.accounts_exist_or_load(addr, dynamic_loader)
assert len(code) == code_len
Loading…
Cancel
Save