mirror of https://github.com/crytic/slither
commit
879157cfc1
@ -0,0 +1,50 @@ |
||||
--- |
||||
name: Test slither-read-storage |
||||
|
||||
defaults: |
||||
run: |
||||
# To load bashrc |
||||
shell: bash -ieo pipefail {0} |
||||
|
||||
on: |
||||
pull_request: |
||||
branches: [master, dev] |
||||
schedule: |
||||
# run CI every day even if no PRs/merges occur |
||||
- cron: '0 12 * * *' |
||||
|
||||
jobs: |
||||
build: |
||||
name: Test slither-read-storage |
||||
runs-on: ubuntu-latest |
||||
|
||||
steps: |
||||
- uses: actions/checkout@v2 |
||||
- name: Setup node |
||||
uses: actions/setup-node@v2 |
||||
with: |
||||
node-version: '14' |
||||
|
||||
- name: Install ganache |
||||
run: npm install --global ganache |
||||
|
||||
- name: Set up Python 3.8 |
||||
uses: actions/setup-python@v2 |
||||
with: |
||||
python-version: 3.8 |
||||
|
||||
- name: Install python dependencies |
||||
run: | |
||||
pip install ".[dev]" |
||||
pip install web3 |
||||
solc-select install 0.8.1 |
||||
solc-select install 0.8.10 |
||||
solc-select use 0.8.1 |
||||
|
||||
- name: Run slither-read-storage |
||||
run: | |
||||
pytest tests/test_read_storage.py |
||||
|
||||
- name: Run storage layout tests |
||||
run: | |
||||
pytest tests/test_storage_layout.py |
@ -0,0 +1,95 @@ |
||||
from typing import List |
||||
from slither.core.cfg.node import Node |
||||
from slither.core.declarations.solidity_variables import SolidityVariable |
||||
from slither.slithir.operations import HighLevelCall, LibraryCall |
||||
from slither.core.declarations import Contract, Function, SolidityVariableComposed |
||||
from slither.analyses.data_dependency.data_dependency import is_dependent |
||||
from slither.core.compilation_unit import SlitherCompilationUnit |
||||
|
||||
|
||||
class ArbitrarySendErc20: |
||||
"""Detects instances where ERC20 can be sent from an arbitrary from address.""" |
||||
|
||||
def __init__(self, compilation_unit: SlitherCompilationUnit): |
||||
self._compilation_unit = compilation_unit |
||||
self._no_permit_results: List[Node] = [] |
||||
self._permit_results: List[Node] = [] |
||||
|
||||
@property |
||||
def compilation_unit(self) -> SlitherCompilationUnit: |
||||
return self._compilation_unit |
||||
|
||||
@property |
||||
def no_permit_results(self) -> List[Node]: |
||||
return self._no_permit_results |
||||
|
||||
@property |
||||
def permit_results(self) -> List[Node]: |
||||
return self._permit_results |
||||
|
||||
def _detect_arbitrary_from(self, contract: Contract): |
||||
for f in contract.functions: |
||||
all_high_level_calls = [ |
||||
f_called[1].solidity_signature |
||||
for f_called in f.high_level_calls |
||||
if isinstance(f_called[1], Function) |
||||
] |
||||
all_library_calls = [f_called[1].solidity_signature for f_called in f.library_calls] |
||||
if ( |
||||
"transferFrom(address,address,uint256)" in all_high_level_calls |
||||
or "safeTransferFrom(address,address,address,uint256)" in all_library_calls |
||||
): |
||||
if ( |
||||
"permit(address,address,uint256,uint256,uint8,bytes32,bytes32)" |
||||
in all_high_level_calls |
||||
): |
||||
ArbitrarySendErc20._arbitrary_from(f.nodes, self._permit_results) |
||||
else: |
||||
ArbitrarySendErc20._arbitrary_from(f.nodes, self._no_permit_results) |
||||
|
||||
@staticmethod |
||||
def _arbitrary_from(nodes: List[Node], results: List[Node]): |
||||
"""Finds instances of (safe)transferFrom that do not use msg.sender or address(this) as from parameter.""" |
||||
for node in nodes: |
||||
for ir in node.irs: |
||||
if ( |
||||
isinstance(ir, HighLevelCall) |
||||
and isinstance(ir.function, Function) |
||||
and ir.function.solidity_signature == "transferFrom(address,address,uint256)" |
||||
and not ( |
||||
is_dependent( |
||||
ir.arguments[0], |
||||
SolidityVariableComposed("msg.sender"), |
||||
node.function.contract, |
||||
) |
||||
or is_dependent( |
||||
ir.arguments[0], |
||||
SolidityVariable("this"), |
||||
node.function.contract, |
||||
) |
||||
) |
||||
): |
||||
results.append(ir.node) |
||||
elif ( |
||||
isinstance(ir, LibraryCall) |
||||
and ir.function.solidity_signature |
||||
== "safeTransferFrom(address,address,address,uint256)" |
||||
and not ( |
||||
is_dependent( |
||||
ir.arguments[1], |
||||
SolidityVariableComposed("msg.sender"), |
||||
node.function.contract, |
||||
) |
||||
or is_dependent( |
||||
ir.arguments[1], |
||||
SolidityVariable("this"), |
||||
node.function.contract, |
||||
) |
||||
) |
||||
): |
||||
results.append(ir.node) |
||||
|
||||
def detect(self): |
||||
"""Detect transfers that use arbitrary `from` parameter.""" |
||||
for c in self.compilation_unit.contracts_derived: |
||||
self._detect_arbitrary_from(c) |
@ -0,0 +1,45 @@ |
||||
from typing import List |
||||
from slither.detectors.abstract_detector import AbstractDetector, DetectorClassification |
||||
from slither.utils.output import Output |
||||
from .arbitrary_send_erc20 import ArbitrarySendErc20 |
||||
|
||||
|
||||
class ArbitrarySendErc20NoPermit(AbstractDetector): |
||||
""" |
||||
Detect when `msg.sender` is not used as `from` in transferFrom |
||||
""" |
||||
|
||||
ARGUMENT = "arbitrary-send-erc20" |
||||
HELP = "transferFrom uses arbitrary `from`" |
||||
IMPACT = DetectorClassification.HIGH |
||||
CONFIDENCE = DetectorClassification.HIGH |
||||
|
||||
WIKI = "https://github.com/trailofbits/slither/wiki/Detector-Documentation#arbitrary-send-erc20" |
||||
|
||||
WIKI_TITLE = "Arbitrary `from` in transferFrom" |
||||
WIKI_DESCRIPTION = "Detect when `msg.sender` is not used as `from` in transferFrom." |
||||
WIKI_EXPLOIT_SCENARIO = """ |
||||
```solidity |
||||
function a(address from, address to, uint256 amount) public { |
||||
erc20.transferFrom(from, to, am); |
||||
} |
||||
``` |
||||
Alice approves this contract to spend her ERC20 tokens. Bob can call `a` and specify Alice's address as the `from` parameter in `transferFrom`, allowing him to transfer Alice's tokens to himself.""" |
||||
|
||||
WIKI_RECOMMENDATION = """ |
||||
Use `msg.sender` as `from` in transferFrom. |
||||
""" |
||||
|
||||
def _detect(self) -> List[Output]: |
||||
"""""" |
||||
results: List[Output] = [] |
||||
|
||||
arbitrary_sends = ArbitrarySendErc20(self.compilation_unit) |
||||
arbitrary_sends.detect() |
||||
for node in arbitrary_sends.no_permit_results: |
||||
func = node.function |
||||
info = [func, " uses arbitrary from in transferFrom: ", node, "\n"] |
||||
res = self.generate_result(info) |
||||
results.append(res) |
||||
|
||||
return results |
@ -0,0 +1,53 @@ |
||||
from typing import List |
||||
from slither.detectors.abstract_detector import AbstractDetector, DetectorClassification |
||||
from slither.utils.output import Output |
||||
from .arbitrary_send_erc20 import ArbitrarySendErc20 |
||||
|
||||
|
||||
class ArbitrarySendErc20Permit(AbstractDetector): |
||||
""" |
||||
Detect when `msg.sender` is not used as `from` in transferFrom along with the use of permit. |
||||
""" |
||||
|
||||
ARGUMENT = "arbitrary-send-erc20-permit" |
||||
HELP = "transferFrom uses arbitrary from with permit" |
||||
IMPACT = DetectorClassification.HIGH |
||||
CONFIDENCE = DetectorClassification.MEDIUM |
||||
|
||||
WIKI = "https://github.com/trailofbits/slither/wiki/Detector-Documentation#arbitrary-send-erc20-permit" |
||||
|
||||
WIKI_TITLE = "Arbitrary `from` in transferFrom used with permit" |
||||
WIKI_DESCRIPTION = ( |
||||
"Detect when `msg.sender` is not used as `from` in transferFrom and permit is used." |
||||
) |
||||
WIKI_EXPLOIT_SCENARIO = """ |
||||
```solidity |
||||
function bad(address from, uint256 value, uint256 deadline, uint8 v, bytes32 r, bytes32 s, address to) public { |
||||
erc20.permit(from, address(this), value, deadline, v, r, s); |
||||
erc20.transferFrom(from, to, value); |
||||
} |
||||
``` |
||||
If an ERC20 token does not implement permit and has a fallback function e.g. WETH, transferFrom allows an attacker to transfer all tokens approved for this contract.""" |
||||
|
||||
WIKI_RECOMMENDATION = """ |
||||
Ensure that the underlying ERC20 token correctly implements a permit function. |
||||
""" |
||||
|
||||
def _detect(self) -> List[Output]: |
||||
"""""" |
||||
results: List[Output] = [] |
||||
|
||||
arbitrary_sends = ArbitrarySendErc20(self.compilation_unit) |
||||
arbitrary_sends.detect() |
||||
for node in arbitrary_sends.permit_results: |
||||
func = node.function |
||||
info = [ |
||||
func, |
||||
" uses arbitrary from in transferFrom in combination with permit: ", |
||||
node, |
||||
"\n", |
||||
] |
||||
res = self.generate_result(info) |
||||
results.append(res) |
||||
|
||||
return results |
@ -0,0 +1,91 @@ |
||||
# Slither-read-storage |
||||
|
||||
Slither-read-storage is a tool to retrieve the storage slots and values of entire contracts or single variables. |
||||
|
||||
## Usage |
||||
|
||||
### CLI Interface |
||||
|
||||
```shell |
||||
positional arguments: |
||||
contract_source (DIR) ADDRESS The deployed contract address if verified on etherscan. Prepend project directory for unverified contracts. |
||||
|
||||
optional arguments: |
||||
--variable-name VARIABLE_NAME The name of the variable whose value will be returned. |
||||
--rpc-url RPC_URL An endpoint for web3 requests. |
||||
--key KEY The key/ index whose value will be returned from a mapping or array. |
||||
--deep-key DEEP_KEY The key/ index whose value will be returned from a deep mapping or multidimensional array. |
||||
--struct-var STRUCT_VAR The name of the variable whose value will be returned from a struct. |
||||
--storage-address STORAGE_ADDRESS The address of the storage contract (if a proxy pattern is used). |
||||
--contract-name CONTRACT_NAME The name of the logic contract. |
||||
--layout Toggle used to write a JSON file with the entire storage layout. |
||||
--value Toggle used to include values in output. |
||||
--max-depth MAX_DEPTH Max depth to search in data structure. |
||||
``` |
||||
|
||||
### Examples |
||||
|
||||
Retrieve the storage slots of a local contract: |
||||
|
||||
```shell |
||||
slither-read-storage file.sol 0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8 --layout |
||||
``` |
||||
|
||||
Retrieve the storage slots of a contract verified on an Etherscan-like platform: |
||||
|
||||
```shell |
||||
slither-read-storage 0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8 --layout |
||||
``` |
||||
|
||||
To retrieve the values as well, pass `--value` and `--rpc-url $RPC_URL`: |
||||
|
||||
```shell |
||||
slither-read-storage 0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8 --layout --rpc-url $RPC_URL --value |
||||
``` |
||||
|
||||
To view only the slot of the `slot0` structure variable, pass `--variable-name slot0`: |
||||
|
||||
```shell |
||||
slither-read-storage 0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8 --variable-name slot0 --rpc-url $RPC_URL --value |
||||
``` |
||||
|
||||
To view a member of the `slot0` struct, pass `--struct-var tick` |
||||
|
||||
```shell |
||||
slither-read-storage 0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8 --variable-name slot0 --rpc-url $RPC_URL --value --struct-var tick |
||||
``` |
||||
|
||||
Retrieve the ERC20 balance slot of an account: |
||||
|
||||
```shell |
||||
slither-read-storage 0xa2327a938Febf5FEC13baCFb16Ae10EcBc4cbDCF --variable-name balances --key 0xab5801a7d398351b8be11c439e05c5b3259aec9b |
||||
``` |
||||
|
||||
To retrieve the actual balance, pass `--variable-name balances` and `--key 0xab5801a7d398351b8be11c439e05c5b3259aec9b`. (`balances` is a `mapping(address => uint)`) |
||||
Since this contract uses the delegatecall-proxy pattern, the proxy address must be passed as the `--storage-address`. Otherwise, it is not required. |
||||
|
||||
```shell |
||||
slither-read-storage 0xa2327a938Febf5FEC13baCFb16Ae10EcBc4cbDCF --variable-name balances --key 0xab5801a7d398351b8be11c439e05c5b3259aec9b --storage-address 0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48 --rpc-url $RPC_URL --value |
||||
``` |
||||
|
||||
## Troubleshooting/FAQ |
||||
|
||||
- If the storage slots or values of a contract verified Etherscan are wrong, try passing `--contract $CONTRACT_NAME` explicitly. Otherwise, the storage may be retrieved from storage slots based off an unrelated contract (Etherscan includes these). (Also, make sure that the RPC is for the correct network.) |
||||
|
||||
- If Etherscan fails to return a source code, try passing `--etherscan-apikey $API_KEY` to avoid hitting a rate-limit. |
||||
|
||||
- How do I use this tool on other chains? |
||||
If an EVM chain has an Etherscan-like platform the crytic-compile supports, this tool supports it by making the following modifications. |
||||
Take Avalanche, for instance: |
||||
|
||||
```shell |
||||
slither-read-storage avax:0x0000000000000000000000000000000000000000 --layout --value --rpc-url $AVAX_RPC_URL |
||||
``` |
||||
|
||||
## Limitations |
||||
|
||||
- Requires source code. |
||||
- Only works on Solidity contracts. |
||||
- Cannot find variables with unstructured storage. |
||||
- Does not support all data types (please open an issue or PR). |
||||
- Mappings cannot be completely enumerated since all keys used historically are not immediately available. |
@ -0,0 +1 @@ |
||||
from .read_storage import SlitherReadStorage |
@ -0,0 +1,145 @@ |
||||
""" |
||||
Tool to read on-chain storage from EVM |
||||
""" |
||||
import json |
||||
import argparse |
||||
|
||||
from crytic_compile import cryticparser |
||||
|
||||
from slither import Slither |
||||
from slither.tools.read_storage.read_storage import SlitherReadStorage |
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace: |
||||
"""Parse the underlying arguments for the program. |
||||
Returns: |
||||
The arguments for the program. |
||||
""" |
||||
parser = argparse.ArgumentParser( |
||||
description="Read a variable's value from storage for a deployed contract", |
||||
usage=( |
||||
"\nTo retrieve a single variable's value:\n" |
||||
+ "\tslither-read-storage $TARGET address --variable-name $NAME\n" |
||||
+ "To retrieve a contract's storage layout:\n" |
||||
+ "\tslither-read-storage $TARGET address --contract-name $NAME --layout\n" |
||||
+ "To retrieve a contract's storage layout and values:\n" |
||||
+ "\tslither-read-storage $TARGET address --contract-name $NAME --layout --values\n" |
||||
+ "TARGET can be a contract address or project directory" |
||||
), |
||||
) |
||||
|
||||
parser.add_argument( |
||||
"contract_source", |
||||
help="The deployed contract address if verified on etherscan. Prepend project directory for unverified contracts.", |
||||
nargs="+", |
||||
) |
||||
|
||||
parser.add_argument( |
||||
"--variable-name", |
||||
help="The name of the variable whose value will be returned.", |
||||
default=None, |
||||
) |
||||
|
||||
parser.add_argument("--rpc-url", help="An endpoint for web3 requests.") |
||||
|
||||
parser.add_argument( |
||||
"--key", |
||||
help="The key/ index whose value will be returned from a mapping or array.", |
||||
default=None, |
||||
) |
||||
|
||||
parser.add_argument( |
||||
"--deep-key", |
||||
help="The key/ index whose value will be returned from a deep mapping or multidimensional array.", |
||||
default=None, |
||||
) |
||||
|
||||
parser.add_argument( |
||||
"--struct-var", |
||||
help="The name of the variable whose value will be returned from a struct.", |
||||
default=None, |
||||
) |
||||
|
||||
parser.add_argument( |
||||
"--storage-address", |
||||
help="The address of the storage contract (if a proxy pattern is used).", |
||||
default=None, |
||||
) |
||||
|
||||
parser.add_argument( |
||||
"--contract-name", |
||||
help="The name of the logic contract.", |
||||
default=None, |
||||
) |
||||
|
||||
parser.add_argument( |
||||
"--layout", |
||||
action="store_true", |
||||
help="Toggle used to write a JSON file with the entire storage layout.", |
||||
) |
||||
|
||||
parser.add_argument( |
||||
"--value", |
||||
action="store_true", |
||||
help="Toggle used to include values in output.", |
||||
) |
||||
|
||||
parser.add_argument("--max-depth", help="Max depth to search in data structure.", default=20) |
||||
|
||||
cryticparser.init(parser) |
||||
|
||||
return parser.parse_args() |
||||
|
||||
|
||||
def main() -> None: |
||||
args = parse_args() |
||||
|
||||
if len(args.contract_source) == 2: |
||||
# Source code is file.sol or project directory |
||||
source_code, target = args.contract_source |
||||
slither = Slither(source_code, **vars(args)) |
||||
else: |
||||
# Source code is published and retrieved via etherscan |
||||
target = args.contract_source[0] |
||||
slither = Slither(target, **vars(args)) |
||||
|
||||
if args.contract_name: |
||||
contracts = slither.get_contract_from_name(args.contract_name) |
||||
else: |
||||
contracts = slither.contracts |
||||
|
||||
srs = SlitherReadStorage(contracts, args.max_depth) |
||||
|
||||
if args.rpc_url: |
||||
# Remove target prefix e.g. rinkeby:0x0 -> 0x0. |
||||
address = target[target.find(":") + 1 :] |
||||
# Default to implementation address unless a storage address is given. |
||||
if not args.storage_address: |
||||
args.storage_address = address |
||||
srs.storage_address = args.storage_address |
||||
|
||||
srs.rpc = args.rpc_url |
||||
|
||||
if args.layout: |
||||
srs.get_all_storage_variables() |
||||
srs.get_storage_layout() |
||||
else: |
||||
assert args.variable_name |
||||
# Use a lambda func to only return variables that have same name as target. |
||||
# x is a tuple (`Contract`, `StateVariable`). |
||||
srs.get_all_storage_variables(lambda x: bool(x[1].name == args.variable_name)) |
||||
srs.get_target_variables(**vars(args)) |
||||
|
||||
# To retrieve slot values an rpc url is required. |
||||
if args.value: |
||||
assert args.rpc_url |
||||
srs.get_slot_values() |
||||
|
||||
# Only write file if storage layout is used. |
||||
if len(srs.slot_info) > 1: |
||||
with open("storage_layout.json", "w", encoding="utf-8") as file: |
||||
json.dump(srs.slot_info, file, indent=4) |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
main() |
@ -0,0 +1,551 @@ |
||||
import sys |
||||
import logging |
||||
from math import floor |
||||
|
||||
from typing import Callable, Optional, Tuple, Union, List, Dict |
||||
|
||||
try: |
||||
from typing import TypedDict |
||||
except ImportError: |
||||
# < Python 3.8 |
||||
from typing_extensions import TypedDict |
||||
|
||||
try: |
||||
from web3 import Web3 |
||||
from eth_typing.evm import ChecksumAddress |
||||
from eth_abi import decode_single, encode_abi |
||||
from eth_utils import keccak |
||||
from hexbytes import HexBytes |
||||
from .utils import ( |
||||
is_elementary, |
||||
is_array, |
||||
is_mapping, |
||||
is_struct, |
||||
is_user_defined_type, |
||||
get_offset_value, |
||||
get_storage_data, |
||||
coerce_type, |
||||
) |
||||
except ImportError: |
||||
print("ERROR: in order to use slither-read-storage, you need to install web3") |
||||
print("$ pip3 install web3 --user\n") |
||||
sys.exit(-1) |
||||
|
||||
from slither.core.solidity_types.type import Type |
||||
from slither.core.solidity_types import ArrayType |
||||
from slither.core.declarations import Contract, StructureContract |
||||
from slither.core.variables.state_variable import StateVariable |
||||
from slither.core.variables.structure_variable import StructureVariable |
||||
|
||||
|
||||
logging.basicConfig() |
||||
logger = logging.getLogger("Slither-read-storage") |
||||
logger.setLevel(logging.INFO) |
||||
|
||||
|
||||
class SlotInfo(TypedDict): |
||||
type_string: str |
||||
slot: int |
||||
size: int |
||||
offset: int |
||||
value: Optional[Union[int, bool, str, ChecksumAddress, hex]] |
||||
elems: Optional[TypedDict] # same types as SlotInfo |
||||
|
||||
|
||||
class SlitherReadStorageException(Exception): |
||||
pass |
||||
|
||||
|
||||
# pylint: disable=too-many-instance-attributes |
||||
class SlitherReadStorage: |
||||
def __init__(self, contracts, max_depth): |
||||
self._contracts: List[Contract] = contracts |
||||
self._max_depth: int = max_depth |
||||
self._log: str = "" |
||||
self._slot_info: SlotInfo = {} |
||||
self._target_variables = [] |
||||
self._web3: Optional[Web3] = None |
||||
self._checksum_address: Optional[ChecksumAddress] = None |
||||
self.storage_address: Optional[str] = None |
||||
self.rpc: Optional[str] = None |
||||
|
||||
@property |
||||
def contracts(self) -> List[Contract]: |
||||
return self._contracts |
||||
|
||||
@property |
||||
def max_depth(self) -> int: |
||||
return int(self._max_depth) |
||||
|
||||
@property |
||||
def log(self) -> str: |
||||
return self._log |
||||
|
||||
@log.setter |
||||
def log(self, log) -> str: |
||||
self._log = log |
||||
|
||||
@property |
||||
def web3(self) -> Web3: |
||||
if not self._web3: |
||||
self._web3 = Web3(Web3.HTTPProvider(self.rpc)) |
||||
return self._web3 |
||||
|
||||
@property |
||||
def checksum_address(self) -> ChecksumAddress: |
||||
if not self._checksum_address: |
||||
self._checksum_address = self.web3.toChecksumAddress(self.storage_address) |
||||
return self._checksum_address |
||||
|
||||
@property |
||||
def target_variables(self) -> List[Tuple[Contract, StateVariable]]: |
||||
"""Storage variables (not constant or immutable) and their associated contract.""" |
||||
return self._target_variables |
||||
|
||||
@property |
||||
def slot_info(self) -> SlotInfo: |
||||
"""Contains the location, type, size, offset, and value of contract slots.""" |
||||
return self._slot_info |
||||
|
||||
def get_storage_layout(self) -> None: |
||||
"""Retrieves the storage layout of entire contract.""" |
||||
tmp = {} |
||||
for contract, var in self.target_variables: |
||||
type_ = var.type |
||||
info = self.get_storage_slot(var, contract) |
||||
tmp[var.name] = info |
||||
|
||||
if is_user_defined_type(type_) and is_struct(type_.type): |
||||
tmp[var.name]["elems"] = self._all_struct_slots(var, contract) |
||||
continue |
||||
|
||||
if is_array(type_): |
||||
elems = self._all_array_slots(var, contract, type_, info["slot"]) |
||||
tmp[var.name]["elems"] = elems |
||||
|
||||
self._slot_info = tmp |
||||
|
||||
def get_storage_slot( |
||||
self, |
||||
target_variable: StateVariable, |
||||
contract: Contract, |
||||
**kwargs, |
||||
) -> Union[SlotInfo, None]: |
||||
"""Finds the storage slot of a variable in a given contract. |
||||
Args: |
||||
target_variable (`StateVariable`): The variable to retrieve the slot for. |
||||
contracts (`Contract`): The contract that contains the given state variable. |
||||
**kwargs: |
||||
key (str): Key of a mapping or index position if an array. |
||||
deep_key (str): Key of a mapping embedded within another mapping or secondary index if array. |
||||
struct_var (str): Structure variable name. |
||||
Returns: |
||||
(`SlotInfo`) | None : A dictionary of the slot information. |
||||
""" |
||||
|
||||
key = kwargs.get("key", None) |
||||
deep_key = kwargs.get("deep_key", None) |
||||
struct_var = kwargs.get("struct_var", None) |
||||
info = "" |
||||
var_log_name = target_variable.name |
||||
try: |
||||
int_slot, size, offset, type_to = self.get_variable_info(contract, target_variable) |
||||
except KeyError: |
||||
# Only the child contract of a parent contract will show up in the storage layout when inheritance is used |
||||
logger.info( |
||||
f"\nContract {contract} not found in storage layout. It is possibly a parent contract\n" |
||||
) |
||||
return None |
||||
|
||||
slot = int.to_bytes(int_slot, 32, byteorder="big") |
||||
|
||||
if is_elementary(target_variable.type): |
||||
type_to = target_variable.type.name |
||||
|
||||
elif is_array(target_variable.type) and key: |
||||
info, type_to, slot, size, offset = self._find_array_slot( |
||||
target_variable, slot, key, deep_key=deep_key, struct_var=struct_var |
||||
) |
||||
self.log += info |
||||
|
||||
elif is_user_defined_type(target_variable.type) and struct_var: |
||||
var_log_name = struct_var |
||||
elems = target_variable.type.type.elems_ordered |
||||
info, type_to, slot, size, offset = self._find_struct_var_slot(elems, slot, struct_var) |
||||
self.log += info |
||||
|
||||
elif is_mapping(target_variable.type) and key: |
||||
info, type_to, slot, size, offset = self._find_mapping_slot( |
||||
target_variable, slot, key, struct_var=struct_var, deep_key=deep_key |
||||
) |
||||
self.log += info |
||||
|
||||
int_slot = int.from_bytes(slot, byteorder="big") |
||||
self.log += f"\nName: {var_log_name}\nType: {type_to}\nSlot: {int_slot}\n" |
||||
logger.info(self.log) |
||||
self.log = "" |
||||
return { |
||||
"type_string": type_to, |
||||
"slot": int_slot, |
||||
"size": size, |
||||
"offset": offset, |
||||
} |
||||
|
||||
def get_target_variables(self, **kwargs) -> None: |
||||
""" |
||||
Retrieves every instance of a given variable in a list of contracts. |
||||
Should be called after setting `target_variables` with `get_all_storage_variables()`. |
||||
**kwargs: |
||||
key (str): Key of a mapping or index position if an array. |
||||
deep_key (str): Key of a mapping embedded within another mapping or secondary index if array. |
||||
struct_var (str): Structure variable name. |
||||
""" |
||||
for contract, var in self.target_variables: |
||||
self._slot_info[f"{contract.name}.{var.name}"] = self.get_storage_slot( |
||||
var, contract, **kwargs |
||||
) |
||||
|
||||
def get_slot_values(self) -> SlotInfo: |
||||
""" |
||||
Fetches the slot values and inserts them in slot info dictionary. |
||||
Returns: |
||||
(`SlotInfo`): The dictionary of slot info. |
||||
""" |
||||
stack = list(self.slot_info.items()) |
||||
while stack: |
||||
_, v = stack.pop() |
||||
if isinstance(v, dict): |
||||
stack.extend(v.items()) |
||||
if "slot" in v: |
||||
hex_bytes = get_storage_data(self.web3, self.checksum_address, v["slot"]) |
||||
v["value"] = self.convert_value_to_type( |
||||
hex_bytes, v["size"], v["offset"], v["type_string"] |
||||
) |
||||
logger.info(f"\nValue: {v['value']}\n") |
||||
return self.slot_info |
||||
|
||||
def get_all_storage_variables(self, func: Callable = None) -> None: |
||||
"""Fetches all storage variables from a list of contracts. |
||||
kwargs: |
||||
func (Callable, optional): A criteria to filter functions e.g. name. |
||||
""" |
||||
for contract in self.contracts: |
||||
self._target_variables.extend( |
||||
filter( |
||||
func, |
||||
[ |
||||
(contract, var) |
||||
for var in contract.variables |
||||
if not var.is_constant and not var.is_immutable |
||||
], |
||||
) |
||||
) |
||||
|
||||
@staticmethod |
||||
def _find_struct_var_slot( |
||||
elems: List[StructureVariable], slot: bytes, struct_var: str |
||||
) -> Tuple[str, str, bytes, int, int]: |
||||
"""Finds the slot of a structure variable. |
||||
Args: |
||||
elems (List[StructureVariable]): Ordered list of structure variables. |
||||
slot (bytes): The slot of the struct to begin searching at. |
||||
struct_var (str): The target structure variable. |
||||
Returns: |
||||
info (str): Info about the target variable to log. |
||||
type_to (str): The type of the target variable. |
||||
slot (bytes): The storage location of the target variable. |
||||
size (int): The size (in bits) of the target variable. |
||||
offset (int): The size of other variables that share the same slot. |
||||
""" |
||||
slot = int.from_bytes(slot, "big") |
||||
offset = 0 |
||||
for var in elems: |
||||
size = var.type.size |
||||
if offset >= 256: |
||||
slot += 1 |
||||
offset = 0 |
||||
if struct_var == var.name: |
||||
type_to = var.type.name |
||||
break # found struct var |
||||
offset += size |
||||
|
||||
slot = int.to_bytes(slot, 32, byteorder="big") |
||||
info = f"\nStruct Variable: {struct_var}" |
||||
return info, type_to, slot, size, offset |
||||
|
||||
# pylint: disable=too-many-branches |
||||
@staticmethod |
||||
def _find_array_slot( |
||||
target_variable: StateVariable, |
||||
slot: bytes, |
||||
key: int, |
||||
deep_key: int = None, |
||||
struct_var: str = None, |
||||
) -> Tuple[str, str, bytes]: |
||||
"""Finds the slot of array's index. |
||||
Args: |
||||
target_variable (`StateVariable`): The array that contains the target variable. |
||||
slot (bytes): The starting slot of the array. |
||||
key (int): The target variable's index position. |
||||
deep_key (int, optional): Secondary index if nested array. |
||||
struct_var (str, optional): Structure variable name. |
||||
Returns: |
||||
info (str): Info about the target variable to log. |
||||
type_to (str): The type of the target variable. |
||||
slot (bytes): The storage location of the target variable. |
||||
""" |
||||
info = f"\nKey: {key}" |
||||
offset = 0 |
||||
size = 256 |
||||
|
||||
if is_array( |
||||
target_variable.type.type |
||||
): # multidimensional array uint[i][], , uint[][i], or uint[][] |
||||
size = target_variable.type.type.type.size |
||||
type_to = target_variable.type.type.type.name |
||||
|
||||
if target_variable.type.is_fixed_array: # uint[][i] |
||||
slot_int = int.from_bytes(slot, "big") + int(key) |
||||
else: |
||||
slot = keccak(slot) |
||||
key = int(key) |
||||
if target_variable.type.type.is_fixed_array: # arr[i][] |
||||
key *= int(str(target_variable.type.type.length)) |
||||
slot_int = int.from_bytes(slot, "big") + key |
||||
|
||||
if not deep_key: |
||||
return info, type_to, int.to_bytes(slot_int, 32, "big"), size, offset |
||||
|
||||
info += f"\nDeep Key: {deep_key}" |
||||
if target_variable.type.type.is_dynamic_array: # uint[][] |
||||
# keccak256(keccak256(slot) + index) + floor(j / floor(256 / size)) |
||||
slot = keccak(int.to_bytes(slot_int, 32, "big")) |
||||
slot_int = int.from_bytes(slot, "big") |
||||
|
||||
# keccak256(slot) + index + floor(j / floor(256 / size)) |
||||
slot_int += floor(int(deep_key) / floor(256 / size)) # uint[i][] |
||||
|
||||
elif target_variable.type.is_fixed_array: |
||||
slot_int = int.from_bytes(slot, "big") + int(key) |
||||
if is_user_defined_type(target_variable.type.type): # struct[i] |
||||
type_to = target_variable.type.type.type.name |
||||
if not struct_var: |
||||
return info, type_to, int.to_bytes(slot_int, 32, "big"), size, offset |
||||
elems = target_variable.type.type.type.elems_ordered |
||||
slot = int.to_bytes(slot_int, 32, byteorder="big") |
||||
info_tmp, type_to, slot, size, offset = SlitherReadStorage._find_struct_var_slot( |
||||
elems, slot, struct_var |
||||
) |
||||
info += info_tmp |
||||
|
||||
else: |
||||
type_to = target_variable.type.type.name |
||||
size = target_variable.type.type.size # bits |
||||
|
||||
elif is_user_defined_type(target_variable.type.type): # struct[] |
||||
slot = keccak(slot) |
||||
slot_int = int.from_bytes(slot, "big") + int(key) |
||||
type_to = target_variable.type.type.type.name |
||||
if not struct_var: |
||||
return info, type_to, int.to_bytes(slot_int, 32, "big"), size, offset |
||||
elems = target_variable.type.type.type.elems_ordered |
||||
slot = int.to_bytes(slot_int, 32, byteorder="big") |
||||
info_tmp, type_to, slot, size, offset = SlitherReadStorage._find_struct_var_slot( |
||||
elems, slot, struct_var |
||||
) |
||||
info += info_tmp |
||||
|
||||
else: |
||||
slot = keccak(slot) |
||||
slot_int = int.from_bytes(slot, "big") + int(key) |
||||
type_to = target_variable.type.type.name |
||||
size = target_variable.type.type.size # bits |
||||
|
||||
slot = int.to_bytes(slot_int, 32, byteorder="big") |
||||
|
||||
return info, type_to, slot, size, offset |
||||
|
||||
@staticmethod |
||||
def _find_mapping_slot( |
||||
target_variable: StateVariable, |
||||
slot: bytes, |
||||
key: Union[int, str], |
||||
deep_key: Union[int, str] = None, |
||||
struct_var: str = None, |
||||
) -> Tuple[str, str, bytes, int, int]: |
||||
"""Finds the data slot of a target variable within a mapping. |
||||
target_variable (`StateVariable`): The mapping that contains the target variable. |
||||
slot (bytes): The starting slot of the mapping. |
||||
key (Union[int, str]): The key the variable is stored at. |
||||
deep_key (int, optional): Key of a mapping embedded within another mapping. |
||||
struct_var (str, optional): Structure variable name. |
||||
:returns: |
||||
log (str): Info about the target variable to log. |
||||
type_to (bytes): The type of the target variable. |
||||
slot (bytes): The storage location of the target variable. |
||||
size (int): The size (in bits) of the target variable. |
||||
offset (int): The size of other variables that share the same slot. |
||||
|
||||
""" |
||||
info = "" |
||||
offset = 0 |
||||
if key: |
||||
info += f"\nKey: {key}" |
||||
if deep_key: |
||||
info += f"\nDeep Key: {deep_key}" |
||||
|
||||
key_type = target_variable.type.type_from.name |
||||
assert key |
||||
if "int" in key_type: # without this eth_utils encoding fails |
||||
key = int(key) |
||||
key = coerce_type(key_type, key) |
||||
slot = keccak(encode_abi([key_type, "uint256"], [key, decode_single("uint256", slot)])) |
||||
|
||||
if is_user_defined_type(target_variable.type.type_to) and is_struct( |
||||
target_variable.type.type_to.type |
||||
): # mapping(elem => struct) |
||||
assert struct_var |
||||
elems = target_variable.type.type_to.type.elems_ordered |
||||
info_tmp, type_to, slot, size, offset = SlitherReadStorage._find_struct_var_slot( |
||||
elems, slot, struct_var |
||||
) |
||||
info += info_tmp |
||||
|
||||
elif is_mapping(target_variable.type.type_to): # mapping(elem => mapping(elem => ???)) |
||||
assert deep_key |
||||
key_type = target_variable.type.type_to.type_from.name |
||||
if "int" in key_type: # without this eth_utils encoding fails |
||||
deep_key = int(deep_key) |
||||
|
||||
# If deep map, will be keccak256(abi.encode(key1, keccak256(abi.encode(key0, uint(slot))))) |
||||
slot = keccak(encode_abi([key_type, "bytes32"], [deep_key, slot])) |
||||
|
||||
# mapping(elem => mapping(elem => elem)) |
||||
type_to = target_variable.type.type_to.type_to.type |
||||
byte_size, _ = target_variable.type.type_to.type_to.storage_size |
||||
size = byte_size * 8 # bits |
||||
offset = 0 |
||||
|
||||
if is_user_defined_type(target_variable.type.type_to.type_to) and is_struct( |
||||
target_variable.type.type_to.type_to.type |
||||
): # mapping(elem => mapping(elem => struct)) |
||||
assert struct_var |
||||
elems = target_variable.type.type_to.type_to.type.elems_ordered |
||||
# If map struct, will be bytes32(uint256(keccak256(abi.encode(key1, keccak256(abi.encode(key0, uint(slot)))))) + structFieldDepth); |
||||
info_tmp, type_to, slot, size, offset = SlitherReadStorage._find_struct_var_slot( |
||||
elems, slot, struct_var |
||||
) |
||||
info += info_tmp |
||||
|
||||
# TODO: suppory mapping with dynamic arrays |
||||
|
||||
else: # mapping(elem => elem) |
||||
type_to = target_variable.type.type_to.name # the value's elementary type |
||||
byte_size, _ = target_variable.type.type_to.storage_size |
||||
size = byte_size * 8 # bits |
||||
|
||||
return info, type_to, slot, size, offset |
||||
|
||||
@staticmethod |
||||
def get_variable_info( |
||||
contract: Contract, target_variable: StateVariable |
||||
) -> Tuple[int, int, int, str]: |
||||
"""Return slot, size, offset, and type.""" |
||||
type_to = str(target_variable.type) |
||||
byte_size, _ = target_variable.type.storage_size |
||||
size = byte_size * 8 # bits |
||||
(int_slot, offset) = contract.compilation_unit.storage_layout_of(contract, target_variable) |
||||
offset *= 8 # bits |
||||
logger.info( |
||||
f"\nContract '{contract.name}'\n{target_variable.canonical_name} with type {target_variable.type} is located at slot: {int_slot}\n" |
||||
) |
||||
|
||||
return int_slot, size, offset, type_to |
||||
|
||||
@staticmethod |
||||
def convert_value_to_type( |
||||
hex_bytes: HexBytes, size: int, offset: int, type_to: str |
||||
) -> Union[int, bool, str, ChecksumAddress, hex]: |
||||
"""Convert slot data to type representation.""" |
||||
# Account for storage packing |
||||
offset_hex_bytes = get_offset_value(hex_bytes, offset, size) |
||||
try: |
||||
value = coerce_type(type_to, offset_hex_bytes) |
||||
except ValueError: |
||||
return coerce_type("int", offset_hex_bytes) |
||||
|
||||
return value |
||||
|
||||
def _all_struct_slots( |
||||
self, var: Union[StructureVariable, StructureContract], contract: Contract, key=None |
||||
) -> Dict[str, SlotInfo]: |
||||
"""Retrieves all members of a struct.""" |
||||
if isinstance(var.type.type, StructureContract): |
||||
struct_elems = var.type.type.elems_ordered |
||||
else: |
||||
struct_elems = var.type.type.type.elems_ordered |
||||
data = {} |
||||
for elem in struct_elems: |
||||
info = self.get_storage_slot( |
||||
var, |
||||
contract, |
||||
key=key, |
||||
struct_var=elem.name, |
||||
) |
||||
data[elem.name] = info |
||||
|
||||
return data |
||||
|
||||
def _all_array_slots( |
||||
self, var: ArrayType, contract: Contract, type_: Type, slot: int |
||||
) -> Dict[int, SlotInfo]: |
||||
"""Retrieves all members of an array.""" |
||||
array_length = self._get_array_length(type_, slot) |
||||
elems = {} |
||||
if is_user_defined_type(type_.type): |
||||
for i in range(min(array_length, self.max_depth)): |
||||
elems[i] = self._all_struct_slots(var, contract, key=str(i)) |
||||
continue |
||||
|
||||
else: |
||||
for i in range(min(array_length, self.max_depth)): |
||||
info = self.get_storage_slot( |
||||
var, |
||||
contract, |
||||
key=str(i), |
||||
) |
||||
elems[i] = info |
||||
|
||||
if is_array(type_.type): # multidimensional array |
||||
array_length = self._get_array_length(type_.type, info["slot"]) |
||||
|
||||
elems[i]["elems"] = {} |
||||
for j in range(min(array_length, self.max_depth)): |
||||
info = self.get_storage_slot( |
||||
var, |
||||
contract, |
||||
key=str(i), |
||||
deep_key=str(j), |
||||
) |
||||
|
||||
elems[i]["elems"][j] = info |
||||
return elems |
||||
|
||||
def _get_array_length(self, type_: Type, slot: int = None) -> int: |
||||
"""Gets the length of dynamic and fixed arrays. |
||||
Args: |
||||
type_ (`Type`): The array type. |
||||
slot (int, optional): Slot a dynamic array's length is stored at. |
||||
Returns: |
||||
(int): The length of the array. |
||||
""" |
||||
val = 0 |
||||
if self.rpc: |
||||
# The length of dynamic arrays is stored at the starting slot. |
||||
# Convert from hexadecimal to decimal. |
||||
val = int(get_storage_data(self.web3, self.checksum_address, slot).hex(), 16) |
||||
if is_array(type_): |
||||
if type_.is_fixed_array: |
||||
val = int(str(type_.length)) |
||||
|
||||
return val |
@ -0,0 +1,11 @@ |
||||
from .utils import ( |
||||
is_elementary, |
||||
is_array, |
||||
is_enum, |
||||
is_mapping, |
||||
is_struct, |
||||
is_user_defined_type, |
||||
get_offset_value, |
||||
get_storage_data, |
||||
coerce_type, |
||||
) |
@ -0,0 +1,100 @@ |
||||
from typing import Union |
||||
from hexbytes import HexBytes |
||||
from eth_typing.evm import ChecksumAddress |
||||
from eth_utils import to_int, to_text, to_checksum_address |
||||
|
||||
from slither.core.declarations import Structure, Enum |
||||
from slither.core.solidity_types import ArrayType, MappingType, UserDefinedType, ElementaryType |
||||
from slither.core.variables.state_variable import StateVariable |
||||
|
||||
|
||||
def is_elementary(variable: StateVariable) -> bool: |
||||
"""Returns whether variable is an elementary type.""" |
||||
return isinstance(variable, ElementaryType) |
||||
|
||||
|
||||
def is_array(variable: StateVariable) -> bool: |
||||
"""Returns whether variable is an array.""" |
||||
return isinstance(variable, ArrayType) |
||||
|
||||
|
||||
def is_mapping(variable: StateVariable) -> bool: |
||||
"""Returns whether variable is a mapping.""" |
||||
return isinstance(variable, MappingType) |
||||
|
||||
|
||||
def is_struct(variable: StateVariable) -> bool: |
||||
"""Returns whether variable is a struct.""" |
||||
return isinstance(variable, Structure) |
||||
|
||||
|
||||
def is_enum(variable: StateVariable) -> bool: |
||||
"""Returns whether variable is an enum.""" |
||||
return isinstance(variable, Enum) |
||||
|
||||
|
||||
def is_user_defined_type(variable: StateVariable) -> bool: |
||||
"""Returns whether variable is a struct.""" |
||||
return isinstance(variable, UserDefinedType) |
||||
|
||||
|
||||
def get_offset_value(hex_bytes: HexBytes, offset: int, size: int) -> bytes: |
||||
""" |
||||
Trims slot data to only contain the target variable's. |
||||
Args: |
||||
hex_bytes (HexBytes): String representation of type |
||||
offset (int): The size (in bits) of other variables that share the same slot. |
||||
size (int): The size (in bits) of the target variable. |
||||
Returns: |
||||
(bytes): The target variable's trimmed data. |
||||
""" |
||||
size = int(size / 8) |
||||
offset = int(offset / 8) |
||||
if offset == 0: |
||||
value = hex_bytes[-size:] |
||||
else: |
||||
start = size + offset |
||||
value = hex_bytes[-start:-offset] |
||||
return value |
||||
|
||||
|
||||
def coerce_type(solidity_type: str, value: bytes) -> Union[int, bool, str, ChecksumAddress, hex]: |
||||
""" |
||||
Converts input to the indicated type. |
||||
Args: |
||||
solidity_type (str): String representation of type. |
||||
value (bytes): The value to be converted. |
||||
Returns: |
||||
(Union[int, bool, str, ChecksumAddress, hex]): The type representation of the value. |
||||
""" |
||||
if "int" in solidity_type: |
||||
converted_value = to_int(value) |
||||
elif "bool" in solidity_type: |
||||
converted_value = bool(to_int(value)) |
||||
elif "string" in solidity_type: |
||||
# length * 2 is stored in lower end bits |
||||
# TODO handle bytes and strings greater than 32 bytes |
||||
length = int(int.from_bytes(value[-2:], "big") / 2) |
||||
converted_value = to_text(value[:length]) |
||||
|
||||
elif "address" in solidity_type: |
||||
converted_value = to_checksum_address(value) |
||||
else: |
||||
converted_value = value.hex() |
||||
|
||||
return converted_value |
||||
|
||||
|
||||
def get_storage_data(web3, checksum_address: ChecksumAddress, slot: bytes) -> HexBytes: |
||||
""" |
||||
Retrieves the storage data from the blockchain at target address and slot. |
||||
Args: |
||||
web3: Web3 instance provider. |
||||
checksum_address (ChecksumAddress): The address to query. |
||||
slot (bytes): The slot to retrieve data from. |
||||
Returns: |
||||
(HexBytes): The slot's storage data. |
||||
""" |
||||
return bytes(web3.eth.get_storage_at(checksum_address, slot)).rjust( |
||||
32, bytes(1) |
||||
) # pad to 32 bytes |
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue