feat: support for filecoin native addresses (#10468)
* feat: implement `Ecto` type for filecoin address * fix: use proper hashing algorithm for checksum * refactor: avoid hardcoding * feat: add `NativeAddress.ID` type * chore: add `alias Blake2.Blake2b` to fix credo * feat: implement `Ecto` type for filecoin address * chore: rename id address module * feat: fix formatting * feat: add a table for pending address operations * feat: add filecoin fields to addresses relation * feat: create pending operation when new address is imported * feat: implement filecoin native address fetcher * chore: remove merge artifacts * fix: cspell * fix: alias in `native_address_test.exs` * fix: cspell * fix: lock address and corresponding operation for update * feat: trigger async fetch of address info from block fetcher * fix: compilation deadlock * fix: add fetcher supervisor case * feat: add migrator * fix: create pending address operation even if the address exists * feat: render filecoin address info in API v2 views * fix: user controller test * feat: add gauge metric for pending address operations * feat: save http error code for failed fetches * chore: rename fetcher * fix: rebase artifacts * chore: list migrator envs in `common-blockscout.env` * chore: process review comments by @vbaranov * chore: migrate from `blake2_elixir` to `blake2` package * chore: reduce log level to `debug` * chore: set infinity timeout for gauge metric query * refactor: remove redundant `Multi` in filling migrationpull/10745/head
parent
3132cc793a
commit
e12b010a0e
@ -0,0 +1,35 @@ |
||||
defmodule BlockScoutWeb.API.V2.FilecoinView do |
||||
@moduledoc """ |
||||
View functions for rendering Filecoin-related data in JSON format. |
||||
""" |
||||
|
||||
alias Explorer.Chain.Address |
||||
|
||||
@doc """ |
||||
Extends the json output with a sub-map containing information related to |
||||
Filecoin native addressing. |
||||
""" |
||||
@spec extend_address_json_response(map(), Address.t()) :: map() |
||||
def extend_address_json_response(result, %Address{} = address) do |
||||
filecoin_id = Map.get(address, :filecoin_id) |
||||
filecoin_robust = Map.get(address, :filecoin_robust) |
||||
filecoin_actor_type = Map.get(address, :filecoin_actor_type) |
||||
|
||||
is_fetched = |
||||
Enum.all?( |
||||
[ |
||||
filecoin_id, |
||||
filecoin_robust, |
||||
filecoin_actor_type |
||||
], |
||||
&(not is_nil(&1)) |
||||
) |
||||
|
||||
Map.put(result, :filecoin, %{ |
||||
is_fetched: is_fetched, |
||||
id: filecoin_id, |
||||
robust: filecoin_robust, |
||||
actor_type: filecoin_actor_type |
||||
}) |
||||
end |
||||
end |
@ -0,0 +1,157 @@ |
||||
defmodule Explorer.Chain.Filecoin.IDAddress do |
||||
@moduledoc """ |
||||
Handles Filecoin ID addresses, wrapping the `NativeAddress` type. |
||||
""" |
||||
|
||||
alias Explorer.Chain.Filecoin.NativeAddress |
||||
alias Poison.Encoder.BitString |
||||
|
||||
require Integer |
||||
|
||||
defstruct ~w(value)a |
||||
|
||||
@protocol_indicator 0 |
||||
|
||||
use Ecto.Type |
||||
|
||||
@type t :: %__MODULE__{value: binary()} |
||||
|
||||
@impl Ecto.Type |
||||
@spec type() :: :binary |
||||
def type, do: :binary |
||||
|
||||
defp to_native_address(%__MODULE__{value: value}) do |
||||
%NativeAddress{ |
||||
protocol_indicator: @protocol_indicator, |
||||
payload: value |
||||
} |
||||
end |
||||
|
||||
@doc """ |
||||
Casts a binary string to a `Explorer.Chain.Filecoin.IDAddress`. |
||||
|
||||
## Examples |
||||
|
||||
iex> Explorer.Chain.Filecoin.IDAddress.cast("f01729") |
||||
{:ok, %Explorer.Chain.Filecoin.IDAddress{value: <<193, 13>>}} |
||||
|
||||
iex> Explorer.Chain.Filecoin.IDAddress.cast(%Explorer.Chain.Filecoin.IDAddress{value: <<193, 13>>}) |
||||
{:ok, %Explorer.Chain.Filecoin.IDAddress{value: <<193, 13>>}} |
||||
|
||||
iex> Explorer.Chain.Filecoin.IDAddress.cast("invalid") |
||||
:error |
||||
""" |
||||
@impl Ecto.Type |
||||
def cast(address_string) when is_binary(address_string) do |
||||
address_string |
||||
|> NativeAddress.cast() |
||||
|> case do |
||||
{:ok, |
||||
%NativeAddress{ |
||||
protocol_indicator: @protocol_indicator, |
||||
payload: value |
||||
}} -> |
||||
{:ok, %__MODULE__{value: value}} |
||||
|
||||
:error -> |
||||
:error |
||||
end |
||||
end |
||||
|
||||
@impl Ecto.Type |
||||
def cast(%__MODULE__{} = address), do: {:ok, address} |
||||
|
||||
@impl Ecto.Type |
||||
def cast(_), do: :error |
||||
|
||||
@doc """ |
||||
Dumps an `Explorer.Chain.Filecoin.IDAddress` to its binary representation. |
||||
|
||||
## Examples |
||||
|
||||
iex> address = %Explorer.Chain.Filecoin.IDAddress{value: <<193, 13>>} |
||||
iex> Explorer.Chain.Filecoin.IDAddress.dump(address) |
||||
{:ok, <<0, 193, 13>>} |
||||
|
||||
iex> Explorer.Chain.Filecoin.IDAddress.dump("invalid") |
||||
:error |
||||
""" |
||||
@impl Ecto.Type |
||||
def dump(%__MODULE__{} = address) do |
||||
address |
||||
|> to_native_address() |
||||
|> NativeAddress.dump() |
||||
end |
||||
|
||||
def dump(_), do: :error |
||||
|
||||
@doc """ |
||||
Loads a binary representation of an `Explorer.Chain.Filecoin.IDAddress`. |
||||
|
||||
## Examples |
||||
|
||||
iex> Explorer.Chain.Filecoin.IDAddress.load(<<0, 193, 13>>) |
||||
{:ok, %Explorer.Chain.Filecoin.IDAddress{value: <<193, 13>>}} |
||||
|
||||
iex> Explorer.Chain.Filecoin.IDAddress.load("invalid") |
||||
:error |
||||
""" |
||||
@impl Ecto.Type |
||||
def load(bytes) when is_binary(bytes) do |
||||
bytes |
||||
|> NativeAddress.load() |
||||
|> case do |
||||
{:ok, |
||||
%NativeAddress{ |
||||
protocol_indicator: @protocol_indicator, |
||||
payload: value |
||||
}} -> |
||||
{:ok, %__MODULE__{value: value}} |
||||
|
||||
_ -> |
||||
:error |
||||
end |
||||
end |
||||
|
||||
def load(_), do: :error |
||||
|
||||
@doc """ |
||||
Converts an `Explorer.Chain.Filecoin.IDAddress` to its string representation. |
||||
|
||||
## Examples |
||||
|
||||
iex> address = %Explorer.Chain.Filecoin.IDAddress{value: <<193, 13>>} |
||||
iex> Explorer.Chain.Filecoin.IDAddress.to_string(address) |
||||
"f01729" |
||||
""" |
||||
@spec to_string(t()) :: String.t() |
||||
def to_string(%__MODULE__{} = address) do |
||||
address |
||||
|> to_native_address() |
||||
|> NativeAddress.to_string() |
||||
end |
||||
|
||||
defimpl String.Chars do |
||||
def to_string(address) do |
||||
@for.to_string(address) |
||||
end |
||||
end |
||||
|
||||
defimpl Poison.Encoder do |
||||
def encode(address, options) do |
||||
address |
||||
|> to_string() |
||||
|> BitString.encode(options) |
||||
end |
||||
end |
||||
|
||||
defimpl Jason.Encoder do |
||||
alias Jason.Encode |
||||
|
||||
def encode(address, opts) do |
||||
address |
||||
|> to_string() |
||||
|> Encode.string(opts) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,408 @@ |
||||
defmodule Explorer.Chain.Filecoin.NativeAddress do |
||||
@moduledoc """ |
||||
Handles Filecoin addresses by parsing, validating, and converting them to and |
||||
from their binary representations. |
||||
|
||||
Addresses are encoded to binary according to the [Filecoin Address |
||||
spec](https://spec.filecoin.io/appendix/address/#section-appendix.address.validatechecksum). |
||||
Details about f4 addresses are provided in |
||||
[FIP-0048](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0048.md). |
||||
|
||||
Internally, f0/f1/f2/f3 addresses are stored as a binary with the following structure: |
||||
|
||||
|--------------------|---------| |
||||
| protocol indicator | payload | |
||||
|--------------------|---------| |
||||
| 1 byte | n bytes | |
||||
|--------------------|---------| |
||||
|
||||
1. The first byte is the protocol indicator. The values are: |
||||
- `0` for f0 addresses |
||||
- `1` for f1 addresses |
||||
- `2` for f2 addresses |
||||
- `3` for f3 addresses |
||||
|
||||
2. The remaining bytes are the payload. |
||||
|
||||
f4 addresses are stored as a binary with the following structure: |
||||
|
||||
|--------------------|----------|---------| |
||||
| protocol indicator | actor id | payload | |
||||
|--------------------|----------|---------| |
||||
| 1 byte | 1 byte | n bytes | |
||||
|--------------------|----------|---------| |
||||
|
||||
1. The first byte is the protocol indicator. The value is `4`. |
||||
2. The second byte is the actor id. |
||||
3. The remaining bytes are the payload. |
||||
""" |
||||
|
||||
alias Explorer.Chain.Hash |
||||
alias Poison.Encoder.BitString |
||||
alias Varint.LEB128 |
||||
|
||||
use Ecto.Type |
||||
|
||||
defstruct ~w(protocol_indicator actor_id payload checksum)a |
||||
|
||||
@checksum_bytes_count 4 |
||||
|
||||
@protocol_indicator_bytes_count 1 |
||||
@max_protocol_indicator 2 ** (@protocol_indicator_bytes_count * Hash.bits_per_byte()) - 1 |
||||
|
||||
@min_address_string_length 3 |
||||
|
||||
# Payload sizes: |
||||
# f1 -- 20 bytes |
||||
# f2 -- 20 bytes |
||||
# f3 -- 48 bytes |
||||
@protocol_indicator_to_payload_byte_count %{ |
||||
1 => 20, |
||||
# For some reason, specs tell that payload for f2 is a SHA256 hash, which is |
||||
# 32 bytes long. However, in practice, it is 20 bytes long... |
||||
# |
||||
# https://spec.filecoin.io/appendix/address/#section-appendix.address.protocol-2-actor |
||||
2 => 20, |
||||
3 => 48 |
||||
} |
||||
@standard_protocol_indicators Map.keys(@protocol_indicator_to_payload_byte_count) |
||||
|
||||
@type t :: %__MODULE__{ |
||||
protocol_indicator: non_neg_integer(), |
||||
actor_id: non_neg_integer() | nil, |
||||
payload: binary(), |
||||
checksum: binary() | nil |
||||
} |
||||
|
||||
@impl Ecto.Type |
||||
@spec type() :: :binary |
||||
def type, do: :binary |
||||
|
||||
defp network_prefix do |
||||
Atom.to_string(Application.get_env(:explorer, __MODULE__)[:network_prefix]) |
||||
end |
||||
|
||||
@doc """ |
||||
Casts `term` to `t:t/0`. |
||||
|
||||
If the term is already in `t:t/0`, then it is returned |
||||
|
||||
iex> Explorer.Chain.Filecoin.NativeAddress.cast( |
||||
...> %Explorer.Chain.Filecoin.NativeAddress{ |
||||
...> protocol_indicator: 0, |
||||
...> actor_id: nil, |
||||
...> payload: <<193, 13>>, |
||||
...> checksum: nil |
||||
...> } |
||||
...> ) |
||||
{ |
||||
:ok, |
||||
%Explorer.Chain.Filecoin.NativeAddress{ |
||||
protocol_indicator: 0, |
||||
actor_id: nil, |
||||
payload: <<193, 13>>, |
||||
checksum: nil |
||||
} |
||||
} |
||||
|
||||
If the term is a binary, then it is parsed to `t:t/0` |
||||
|
||||
iex> Explorer.Chain.Filecoin.NativeAddress.cast("f01729") |
||||
{ |
||||
:ok, |
||||
%Explorer.Chain.Filecoin.NativeAddress{ |
||||
protocol_indicator: 0, |
||||
actor_id: nil, |
||||
payload: <<193, 13>>, |
||||
checksum: nil |
||||
} |
||||
} |
||||
|
||||
iex> Explorer.Chain.Filecoin.NativeAddress.cast("f01729") |
||||
{ |
||||
:ok, |
||||
%Explorer.Chain.Filecoin.NativeAddress{ |
||||
protocol_indicator: 0, |
||||
actor_id: nil, |
||||
payload: <<193, 13>>, |
||||
checksum: nil |
||||
} |
||||
} |
||||
|
||||
iex> NativeAddress.cast("f410fabpafjfjgqkc3douo3yzfug5tq4bwfvuhsewxji") |
||||
{ |
||||
:ok, |
||||
%Explorer.Chain.Filecoin.NativeAddress{ |
||||
protocol_indicator: 4, |
||||
actor_id: 10, |
||||
payload: <<0, 94, 2, 164, 169, 52, 20, 45, 141, 212, 118, 241, 146, 208, 221, 156, 56, 27, 22, 180>>, |
||||
checksum: <<60, 137, 107, 165>> |
||||
} |
||||
} |
||||
""" |
||||
@impl Ecto.Type |
||||
@spec cast(t() | String.t()) :: {:ok, t()} | :error |
||||
def cast(%__MODULE__{} = address), do: {:ok, address} |
||||
|
||||
def cast(address_string) when is_binary(address_string) do |
||||
network = network_prefix() |
||||
|
||||
with true <- String.length(address_string) >= @min_address_string_length, |
||||
^network <> protocol_indicator_and_payload <- address_string, |
||||
{:ok, address} <- cast_protocol_indicator_and_payload(protocol_indicator_and_payload), |
||||
:ok <- verify_checksum(address) do |
||||
{:ok, address} |
||||
else |
||||
_ -> |
||||
:error |
||||
end |
||||
end |
||||
|
||||
defp cast_protocol_indicator_and_payload("0" <> id_string) do |
||||
id_string |
||||
|> Integer.parse() |
||||
|> case do |
||||
{id, ""} when is_integer(id) and id >= 0 -> |
||||
payload = LEB128.encode(id) |
||||
|
||||
{:ok, |
||||
%__MODULE__{ |
||||
protocol_indicator: 0, |
||||
actor_id: nil, |
||||
payload: payload, |
||||
checksum: nil |
||||
}} |
||||
|
||||
_ -> |
||||
:error |
||||
end |
||||
end |
||||
|
||||
defp cast_protocol_indicator_and_payload("4" <> rest) do |
||||
with [actor_id_string, base32_digits] <- String.split(rest, "f", parts: 2), |
||||
{actor_id, ""} when is_integer(actor_id) <- Integer.parse(actor_id_string), |
||||
{:ok, {payload, checksum}} <- cast_base32_digits(base32_digits) do |
||||
{:ok, |
||||
%__MODULE__{ |
||||
protocol_indicator: 4, |
||||
actor_id: actor_id, |
||||
payload: payload, |
||||
checksum: checksum |
||||
}} |
||||
else |
||||
_ -> :error |
||||
end |
||||
end |
||||
|
||||
defp cast_protocol_indicator_and_payload(protocol_indicator_and_payload) do |
||||
with {protocol_indicator_string, base32_digits} <- |
||||
String.split_at( |
||||
protocol_indicator_and_payload, |
||||
1 |
||||
), |
||||
{protocol_indicator, ""} when protocol_indicator in @standard_protocol_indicators <- |
||||
Integer.parse(protocol_indicator_string), |
||||
{:ok, byte_count} <- |
||||
Map.fetch( |
||||
@protocol_indicator_to_payload_byte_count, |
||||
protocol_indicator |
||||
), |
||||
{:ok, {payload, checksum}} <- cast_base32_digits(base32_digits, byte_count) do |
||||
{:ok, |
||||
%__MODULE__{ |
||||
protocol_indicator: protocol_indicator, |
||||
actor_id: nil, |
||||
payload: payload, |
||||
checksum: checksum |
||||
}} |
||||
else |
||||
_ -> :error |
||||
end |
||||
end |
||||
|
||||
defp cast_base32_digits(digits) do |
||||
with {:ok, bytes} <- Base.decode32(digits, case: :lower, padding: false), |
||||
<< |
||||
payload::binary-size(byte_size(bytes) - @checksum_bytes_count), |
||||
checksum::binary-size(@checksum_bytes_count) |
||||
>> <- bytes do |
||||
{:ok, {payload, checksum}} |
||||
else |
||||
_ -> :error |
||||
end |
||||
end |
||||
|
||||
defp cast_base32_digits(digits, expected_bytes_count) do |
||||
with {:ok, {payload, checksum}} <- cast_base32_digits(digits), |
||||
true <- byte_size(payload) == expected_bytes_count do |
||||
{:ok, {payload, checksum}} |
||||
else |
||||
_ -> :error |
||||
end |
||||
end |
||||
|
||||
@doc """ |
||||
Dumps the address to `:binary` (`bytea`) representation format used in |
||||
database. |
||||
""" |
||||
@impl Ecto.Type |
||||
@spec dump(t()) :: {:ok, binary()} | :error |
||||
def dump(%__MODULE__{protocol_indicator: 4, actor_id: actor_id, payload: payload}) |
||||
when is_integer(actor_id) and |
||||
is_binary(payload) and |
||||
actor_id >= 0 and |
||||
actor_id <= @max_protocol_indicator do |
||||
{:ok, <<4, actor_id, payload::binary>>} |
||||
end |
||||
|
||||
def dump(%__MODULE__{protocol_indicator: protocol_indicator, payload: payload}) |
||||
when is_integer(protocol_indicator) and |
||||
is_binary(payload) and |
||||
protocol_indicator >= 0 and |
||||
protocol_indicator <= @max_protocol_indicator do |
||||
{:ok, <<protocol_indicator, payload::binary>>} |
||||
end |
||||
|
||||
def dump(_), do: :error |
||||
|
||||
@doc """ |
||||
Loads the address from `:binary` representation used in database. |
||||
""" |
||||
@impl Ecto.Type |
||||
@spec load(binary()) :: {:ok, t()} | :error |
||||
def load(<<protocol_indicator, rest::binary>> = bytes) do |
||||
case protocol_indicator do |
||||
0 -> |
||||
{:ok, |
||||
%__MODULE__{ |
||||
protocol_indicator: 0, |
||||
actor_id: nil, |
||||
payload: rest, |
||||
checksum: nil |
||||
}} |
||||
|
||||
4 -> |
||||
checksum = to_checksum(bytes) |
||||
<<actor_id, payload::binary>> = rest |
||||
|
||||
{:ok, |
||||
%__MODULE__{ |
||||
protocol_indicator: 4, |
||||
actor_id: actor_id, |
||||
payload: payload, |
||||
checksum: checksum |
||||
}} |
||||
|
||||
protocol_indicator when protocol_indicator in @standard_protocol_indicators -> |
||||
checksum = to_checksum(bytes) |
||||
|
||||
{:ok, |
||||
%__MODULE__{ |
||||
protocol_indicator: protocol_indicator, |
||||
actor_id: nil, |
||||
payload: rest, |
||||
checksum: checksum |
||||
}} |
||||
|
||||
_ -> |
||||
:error |
||||
end |
||||
end |
||||
|
||||
def load(_), do: :error |
||||
|
||||
@doc """ |
||||
Converts the address to a string representation. |
||||
|
||||
iex> Explorer.Chain.Filecoin.NativeAddress.to_string( |
||||
...> %Explorer.Chain.Filecoin.NativeAddress{ |
||||
...> protocol_indicator: 0, |
||||
...> actor_id: nil, |
||||
...> payload: <<193, 13>>, |
||||
...> checksum: nil |
||||
...> } |
||||
...> ) |
||||
"f01729" |
||||
|
||||
iex> Explorer.Chain.Filecoin.NativeAddress.to_string( |
||||
...> %Explorer.Chain.Filecoin.NativeAddress{ |
||||
...> protocol_indicator: 4, |
||||
...> actor_id: 10, |
||||
...> payload: <<0, 94, 2, 164, 169, 52, 20, 45, 141, 212, 118, 241, 146, 208, 221, 156, 56, 27, 22, 180>>, |
||||
...> checksum: <<60, 137, 107, 165>> |
||||
...> } |
||||
...> ) |
||||
"f410fabpafjfjgqkc3douo3yzfug5tq4bwfvuhsewxji" |
||||
""" |
||||
@spec to_string(t) :: String.t() |
||||
def to_string(%__MODULE__{protocol_indicator: 0, payload: payload}) do |
||||
{id, <<>>} = LEB128.decode(payload) |
||||
network_prefix() <> "0" <> Integer.to_string(id) |
||||
end |
||||
|
||||
@spec to_string(t) :: String.t() |
||||
def to_string(%__MODULE__{ |
||||
protocol_indicator: protocol_indicator, |
||||
payload: payload, |
||||
actor_id: actor_id, |
||||
checksum: checksum |
||||
}) do |
||||
payload_with_checksum = |
||||
Base.encode32( |
||||
payload <> checksum, |
||||
case: :lower, |
||||
padding: false |
||||
) |
||||
|
||||
protocol_indicator_part = |
||||
protocol_indicator |
||||
|> case do |
||||
indicator when indicator in @standard_protocol_indicators -> |
||||
Integer.to_string(indicator) |
||||
|
||||
4 -> |
||||
"4" <> Integer.to_string(actor_id) <> "f" |
||||
end |
||||
|
||||
network_prefix() <> protocol_indicator_part <> payload_with_checksum |
||||
end |
||||
|
||||
defp verify_checksum(%__MODULE__{protocol_indicator: 0, checksum: nil}), do: :ok |
||||
|
||||
defp verify_checksum(%__MODULE__{checksum: checksum} = address) |
||||
when not is_nil(checksum) do |
||||
with {:ok, bytes} <- dump(address), |
||||
^checksum <- to_checksum(bytes) do |
||||
:ok |
||||
else |
||||
_ -> :error |
||||
end |
||||
end |
||||
|
||||
defp to_checksum(bytes), |
||||
do: Blake2.hash2b(bytes, @checksum_bytes_count) |
||||
|
||||
defimpl String.Chars do |
||||
def to_string(hash) do |
||||
@for.to_string(hash) |
||||
end |
||||
end |
||||
|
||||
defimpl Poison.Encoder do |
||||
def encode(hash, options) do |
||||
hash |
||||
|> to_string() |
||||
|> BitString.encode(options) |
||||
end |
||||
end |
||||
|
||||
defimpl Jason.Encoder do |
||||
alias Jason.Encode |
||||
|
||||
def encode(hash, opts) do |
||||
hash |
||||
|> to_string() |
||||
|> Encode.string(opts) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,73 @@ |
||||
defmodule Explorer.Chain.Filecoin.PendingAddressOperation do |
||||
@moduledoc """ |
||||
Tracks an address that is pending for fetching of filecoin address info. |
||||
""" |
||||
|
||||
use Explorer.Schema |
||||
|
||||
import Explorer.Chain, only: [add_fetcher_limit: 2] |
||||
alias Explorer.Chain.{Address, Hash} |
||||
alias Explorer.Repo |
||||
|
||||
@http_error_codes 400..526 |
||||
|
||||
@optional_attrs ~w(http_status_code)a |
||||
@required_attrs ~w(address_hash)a |
||||
|
||||
@attrs @optional_attrs ++ @required_attrs |
||||
|
||||
@typedoc """ |
||||
* `address_hash` - the hash of the address that is pending to be fetched. |
||||
* `http_status_code` - the unsuccessful (non-200) http code returned by Beryx |
||||
API if the fetcher failed to fetch the address. |
||||
""" |
||||
@primary_key false |
||||
typed_schema "filecoin_pending_address_operations" do |
||||
belongs_to(:address, Address, |
||||
foreign_key: :address_hash, |
||||
references: :hash, |
||||
type: Hash.Address, |
||||
primary_key: true |
||||
) |
||||
|
||||
field(:http_status_code, :integer) |
||||
|
||||
timestamps() |
||||
end |
||||
|
||||
@spec changeset( |
||||
Explorer.Chain.Filecoin.PendingAddressOperation.t(), |
||||
:invalid | %{optional(:__struct__) => none(), optional(atom() | binary()) => any()} |
||||
) :: Ecto.Changeset.t() |
||||
def changeset(%__MODULE__{} = pending_ops, attrs) do |
||||
pending_ops |
||||
|> cast(attrs, @attrs) |
||||
|> validate_required(@required_attrs) |
||||
|> foreign_key_constraint(:address_hash, name: :filecoin_pending_address_operations_address_hash_fkey) |
||||
|> unique_constraint(:address_hash, name: :filecoin_pending_address_operations_pkey) |
||||
|> validate_inclusion(:http_status_code, @http_error_codes) |
||||
end |
||||
|
||||
@doc """ |
||||
Returns a stream of pending operations. |
||||
""" |
||||
@spec stream( |
||||
initial :: accumulator, |
||||
reducer :: (entry :: term(), accumulator -> accumulator), |
||||
limited? :: boolean() |
||||
) :: {:ok, accumulator} |
||||
when accumulator: term() |
||||
def stream(initial, reducer, limited? \\ false) |
||||
when is_function(reducer, 2) do |
||||
query = |
||||
from( |
||||
op in __MODULE__, |
||||
select: op, |
||||
order_by: [desc: op.address_hash] |
||||
) |
||||
|
||||
query |
||||
|> add_fetcher_limit(limited?) |
||||
|> Repo.stream_reduce(initial, reducer) |
||||
end |
||||
end |
@ -0,0 +1,22 @@ |
||||
defmodule Explorer.Chain.Import.Runner.Helper do |
||||
@moduledoc """ |
||||
Provides utility functions for the chain import runners. |
||||
""" |
||||
|
||||
@doc """ |
||||
Executes the import function if the configured chain type matches the |
||||
specified `chain_type`. |
||||
""" |
||||
@spec chain_type_dependent_import( |
||||
Ecto.Multi.t(), |
||||
chain_type :: atom(), |
||||
(Ecto.Multi.t() -> Ecto.Multi.t()) |
||||
) :: Ecto.Multi.t() |
||||
def chain_type_dependent_import(multi, chain_type, multi_run) do |
||||
if Application.get_env(:explorer, :chain_type) == chain_type do |
||||
multi_run.(multi) |
||||
else |
||||
multi |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,71 @@ |
||||
defmodule Explorer.Migrator.FilecoinPendingAddressOperations do |
||||
@moduledoc """ |
||||
Creates a pending address operation for each address missing Filecoin address |
||||
information, specifically when `filecoin_id`, `filecoin_robust`, and |
||||
`filecoin_actor_type` are `nil`. |
||||
""" |
||||
|
||||
use Explorer.Migrator.FillingMigration |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.Chain.{Address, Filecoin.PendingAddressOperation, Import} |
||||
alias Explorer.Migrator.FillingMigration |
||||
alias Explorer.Repo |
||||
|
||||
@migration_name "filecoin_pending_address_operations" |
||||
|
||||
@impl FillingMigration |
||||
def migration_name, do: @migration_name |
||||
|
||||
@impl FillingMigration |
||||
def last_unprocessed_identifiers(state) do |
||||
limit = batch_size() * concurrency() |
||||
|
||||
ids = |
||||
unprocessed_data_query() |
||||
|> select([address], address.hash) |
||||
|> limit(^limit) |
||||
|> Repo.all(timeout: :infinity) |
||||
|
||||
{ids, state} |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def unprocessed_data_query do |
||||
from( |
||||
address in Address, |
||||
left_join: op in PendingAddressOperation, |
||||
on: address.hash == op.address_hash, |
||||
where: |
||||
is_nil(address.filecoin_id) and |
||||
is_nil(address.filecoin_robust) and |
||||
is_nil(address.filecoin_actor_type) and |
||||
is_nil(op.address_hash), |
||||
order_by: [asc: address.hash] |
||||
) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def update_batch(ordered_address_hashes) do |
||||
ordered_pending_operations = |
||||
Enum.map( |
||||
ordered_address_hashes, |
||||
&%{address_hash: &1} |
||||
) |
||||
|
||||
Import.insert_changes_list( |
||||
Repo, |
||||
ordered_pending_operations, |
||||
conflict_target: :address_hash, |
||||
on_conflict: :nothing, |
||||
for: PendingAddressOperation, |
||||
returning: true, |
||||
timeout: :infinity, |
||||
timestamps: Import.timestamps() |
||||
) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def update_cache, do: :ok |
||||
end |
@ -0,0 +1,23 @@ |
||||
defmodule Explorer.Repo.Filecoin.Migrations.CreatePendingAddressOperations do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
create table(:filecoin_pending_address_operations, primary_key: false) do |
||||
add( |
||||
:address_hash, |
||||
references( |
||||
:addresses, |
||||
column: :hash, |
||||
type: :bytea, |
||||
on_delete: :delete_all |
||||
), |
||||
null: false, |
||||
primary_key: true |
||||
) |
||||
|
||||
add(:http_status_code, :smallint) |
||||
|
||||
timestamps() |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,11 @@ |
||||
defmodule Explorer.Repo.Filecoin.Migrations.AddChainTypeFieldsToAddress do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
alter table(:addresses) do |
||||
add(:filecoin_id, :bytea) |
||||
add(:filecoin_robust, :bytea) |
||||
add(:filecoin_actor_type, :smallint) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,175 @@ |
||||
defmodule Explorer.Chain.Filecoin.NativeAddressTest do |
||||
use ExUnit.Case, async: true |
||||
|
||||
alias Explorer.Chain.Hash |
||||
alias Explorer.Chain.Hash.Address |
||||
alias Explorer.Chain.Filecoin.{NativeAddress, IDAddress} |
||||
|
||||
doctest NativeAddress |
||||
doctest IDAddress |
||||
|
||||
@doc """ |
||||
The following test cases are taken from the filecoin spec: |
||||
https://spec.filecoin.io/appendix/address/#section-appendix.address.test-vectors |
||||
|
||||
The key is the address and the value is the hex-encoded binary representation |
||||
of the address in the database. |
||||
""" |
||||
# cspell:disable |
||||
@test_cases %{ |
||||
"f00" => "0000", |
||||
"f0150" => "009601", |
||||
"f01024" => "008008", |
||||
"f01729" => "00c10d", |
||||
"f018446744073709551615" => "00ffffffffffffffffff01", |
||||
"f17uoq6tp427uzv7fztkbsnn64iwotfrristwpryy" => "01fd1d0f4dfcd7e99afcb99a8326b7dc459d32c628", |
||||
"f1xcbgdhkgkwht3hrrnui3jdopeejsoatkzmoltqy" => "01b882619d46558f3d9e316d11b48dcf211327026a", |
||||
"f1xtwapqc6nh4si2hcwpr3656iotzmlwumogqbuaa" => "01bcec07c05e69f92468e2b3e3bf77c874f2c5da8c", |
||||
"f1wbxhu3ypkuo6eyp6hjx6davuelxaxrvwb2kuwva" => "01b06e7a6f0f551de261fe3a6fe182b422ee0bc6b6", |
||||
"f12fiakbhe2gwd5cnmrenekasyn6v5tnaxaqizq6a" => "01d1500504e4d1ac3e89ac891a4502586fabd9b417", |
||||
"f24vg6ut43yw2h2jqydgbg2xq7x6f4kub3bg6as6i" => "02e54dea4f9bc5b47d261819826d5e1fbf8bc5503b", |
||||
"f25nml2cfbljvn4goqtclhifepvfnicv6g7mfmmvq" => "02eb58bd08a15a6ade19d0989674148fa95a8157c6", |
||||
"f2nuqrg7vuysaue2pistjjnt3fadsdzvyuatqtfei" => "026d21137eb4c4814269e894d296cf6500e43cd714", |
||||
"f24dd4ox4c2vpf5vk5wkadgyyn6qtuvgcpxxon64a" => "02e0c7c75f82d55e5ed55db28033630df4274a984f", |
||||
"f2gfvuyh7v2sx3patm5k23wdzmhyhtmqctasbr23y" => "02316b4c1ff5d4afb7826ceab5bb0f2c3e0f364053", |
||||
"f3vvmn62lofvhjd2ugzca6sof2j2ubwok6cj4xxbfzz4yuxfkgobpihhd2thlanmsh3w2ptld2gqkn2jvlss4a" => |
||||
"03ad58df696e2d4e91ea86c881e938ba4ea81b395e12797b84b9cf314b9546705e839c7a99d606b247ddb4f9ac7a3414dd", |
||||
"f3wmuu6crofhqmm3v4enos73okk2l366ck6yc4owxwbdtkmpk42ohkqxfitcpa57pjdcftql4tojda2poeruwa" => |
||||
"03b3294f0a2e29e0c66ebc235d2fedca5697bf784af605c75af608e6a63d5cd38ea85ca8989e0efde9188b382f9372460d", |
||||
"f3s2q2hzhkpiknjgmf4zq3ejab2rh62qbndueslmsdzervrhapxr7dftie4kpnpdiv2n6tvkr743ndhrsw6d3a" => |
||||
"0396a1a3e4ea7a14d49985e661b22401d44fed402d1d0925b243c923589c0fbc7e32cd04e29ed78d15d37d3aaa3fe6da33", |
||||
"f3q22fijmmlckhl56rn5nkyamkph3mcfu5ed6dheq53c244hfmnq2i7efdma3cj5voxenwiummf2ajlsbxc65a" => |
||||
"0386b454258c589475f7d16f5aac018a79f6c1169d20fc33921dd8b5ce1cac6c348f90a3603624f6aeb91b64518c2e8095", |
||||
"f3u5zgwa4ael3vuocgc5mfgygo4yuqocrntuuhcklf4xzg5tcaqwbyfabxetwtj4tsam3pbhnwghyhijr5mixa" => |
||||
"03a7726b038022f75a384617585360cee629070a2d9d28712965e5f26ecc40858382803724ed34f2720336f09db631f074" |
||||
} |
||||
|
||||
# cspell:enable |
||||
|
||||
describe "cast/1" do |
||||
test "parses f0, f1, f2, f3 addresses from spec test vectors" do |
||||
for {address, hex_string} <- @test_cases do |
||||
{protocol_indicator_hex, payload} = String.split_at(hex_string, 2) |
||||
protocol_indicator = String.to_integer(protocol_indicator_hex, 16) |
||||
payload = Base.decode16!(payload, case: :lower) |
||||
|
||||
assert {:ok, |
||||
%NativeAddress{ |
||||
protocol_indicator: ^protocol_indicator, |
||||
actor_id: nil, |
||||
payload: ^payload |
||||
}} = NativeAddress.cast(address) |
||||
end |
||||
end |
||||
|
||||
test "parses f4 addresses" do |
||||
address = "f410fabpafjfjgqkc3douo3yzfug5tq4bwfvuhsewxji" |
||||
{:ok, evm_address} = Address.cast("0x005E02A4A934142D8DD476F192D0DD9C381B16B4") |
||||
evm_address_bytes = evm_address.bytes |
||||
|
||||
assert {:ok, |
||||
%NativeAddress{ |
||||
protocol_indicator: 4, |
||||
actor_id: 10, |
||||
payload: ^evm_address_bytes |
||||
}} = NativeAddress.cast(address) |
||||
end |
||||
end |
||||
|
||||
describe "dump/1" do |
||||
test "encodes f0, f1, f2, f3 addresses to bytes" do |
||||
for {address, hex_string} <- @test_cases do |
||||
bytes = Base.decode16!(hex_string, case: :lower) |
||||
|
||||
assert {:ok, ^bytes} = |
||||
address |
||||
|> NativeAddress.cast() |
||||
|> elem(1) |
||||
|> NativeAddress.dump() |
||||
end |
||||
end |
||||
|
||||
test "converts f4 addresses" do |
||||
address = "f410fabpafjfjgqkc3douo3yzfug5tq4bwfvuhsewxji" |
||||
{:ok, evm_address} = Address.cast("0x005E02A4A934142D8DD476F192D0DD9C381B16B4") |
||||
bytes = <<4, 10, evm_address.bytes::binary>> |
||||
|
||||
assert {:ok, ^bytes} = |
||||
address |
||||
|> NativeAddress.cast() |
||||
|> elem(1) |
||||
|> NativeAddress.dump() |
||||
end |
||||
end |
||||
|
||||
describe "load/1" do |
||||
test "decodes f0, f1, f2, f3 addresses from bytes" do |
||||
for {address, hex_string} <- Map.values(@test_cases) do |
||||
{protocol_indicator_hex, payload_hex} = String.split_at(hex_string, 2) |
||||
protocol_indicator = String.to_integer(protocol_indicator_hex, 16) |
||||
payload = Base.decode16!(payload_hex, case: :lower) |
||||
|
||||
assert {:ok, |
||||
%NativeAddress{ |
||||
protocol_indicator: ^protocol_indicator, |
||||
actor_id: nil, |
||||
payload: ^payload |
||||
}} = |
||||
address |
||||
|> NativeAddress.cast() |
||||
|> elem(1) |
||||
|> NativeAddress.dump() |
||||
|> elem(1) |
||||
|> NativeAddress.load() |
||||
end |
||||
end |
||||
|
||||
test "decodes f4 addresses" do |
||||
address = "f410fabpafjfjgqkc3douo3yzfug5tq4bwfvuhsewxji" |
||||
{:ok, %Hash{bytes: payload}} = Address.cast("0x005E02A4A934142D8DD476F192D0DD9C381B16B4") |
||||
|
||||
assert {:ok, |
||||
%NativeAddress{ |
||||
protocol_indicator: 4, |
||||
actor_id: 10, |
||||
payload: ^payload |
||||
}} = |
||||
address |
||||
|> NativeAddress.cast() |
||||
|> elem(1) |
||||
|> NativeAddress.dump() |
||||
|> elem(1) |
||||
|> NativeAddress.load() |
||||
end |
||||
end |
||||
|
||||
describe "to_string/1" do |
||||
test "converts f0, f1, f2, f3 addresses to string" do |
||||
for {address, _} <- @test_cases do |
||||
assert ^address = |
||||
address |
||||
|> NativeAddress.cast() |
||||
|> elem(1) |
||||
|> NativeAddress.dump() |
||||
|> elem(1) |
||||
|> NativeAddress.load() |
||||
|> elem(1) |
||||
|> NativeAddress.to_string() |
||||
end |
||||
end |
||||
|
||||
test "converts f4 addresses to string" do |
||||
address = "f410fabpafjfjgqkc3douo3yzfug5tq4bwfvuhsewxji" |
||||
|
||||
assert ^address = |
||||
address |
||||
|> NativeAddress.cast() |
||||
|> elem(1) |
||||
|> NativeAddress.dump() |
||||
|> elem(1) |
||||
|> NativeAddress.load() |
||||
|> elem(1) |
||||
|> NativeAddress.to_string() |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,215 @@ |
||||
defmodule Indexer.Fetcher.Filecoin.AddressInfo do |
||||
@moduledoc """ |
||||
A task for fetching Filecoin addresses info in the Address table using the |
||||
Beryx API. |
||||
|
||||
Due to the lack of batch support in the API, addresses are fetched |
||||
individually, making this fetching an expensive operation. |
||||
""" |
||||
use Indexer.Fetcher, restart: :permanent |
||||
use Spandex.Decorators |
||||
|
||||
alias Ecto.Multi |
||||
alias Explorer.Chain.{Address, Filecoin.PendingAddressOperation} |
||||
alias Explorer.Repo |
||||
alias Indexer.Fetcher.Filecoin.AddressInfo.Supervisor, as: FilecoinAddressInfoSupervisor |
||||
alias Indexer.Fetcher.Filecoin.BeryxAPI |
||||
alias Indexer.{BufferedTask, Tracer} |
||||
|
||||
@http_error_codes 400..526 |
||||
|
||||
@batch_size 1 |
||||
|
||||
@behaviour BufferedTask |
||||
|
||||
require Logger |
||||
|
||||
@doc """ |
||||
Asynchronously fetches filecoin addresses info |
||||
""" |
||||
@spec async_fetch([PendingAddressOperation.t()], boolean(), integer()) :: :ok |
||||
def async_fetch(pending_operations, realtime?, timeout \\ 5000) |
||||
when is_list(pending_operations) do |
||||
if FilecoinAddressInfoSupervisor.disabled?() do |
||||
:ok |
||||
else |
||||
unique_operations = |
||||
Enum.uniq_by( |
||||
pending_operations, |
||||
&to_string(&1.address_hash) |
||||
) |
||||
|
||||
BufferedTask.buffer(__MODULE__, unique_operations, realtime?, timeout) |
||||
end |
||||
end |
||||
|
||||
@doc false |
||||
@spec child_spec([...]) :: Supervisor.child_spec() |
||||
def child_spec([init_options, gen_server_options]) do |
||||
merged_init_opts = |
||||
defaults() |
||||
|> Keyword.merge(init_options) |
||||
|> Keyword.put(:state, nil) |
||||
|
||||
Supervisor.child_spec( |
||||
{BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, |
||||
id: __MODULE__ |
||||
) |
||||
end |
||||
|
||||
@doc false |
||||
@impl BufferedTask |
||||
def init(initial, reducer, _) do |
||||
{:ok, final} = |
||||
PendingAddressOperation.stream( |
||||
initial, |
||||
fn op, acc -> reducer.(op, acc) end |
||||
) |
||||
|
||||
final |
||||
end |
||||
|
||||
@doc false |
||||
@spec defaults() :: Keyword.t() |
||||
def defaults do |
||||
env = Application.get_env(:indexer, __MODULE__) |
||||
|
||||
[ |
||||
poll: false, |
||||
flush_interval: :timer.seconds(30), |
||||
max_concurrency: env[:concurrency], |
||||
max_batch_size: @batch_size, |
||||
task_supervisor: __MODULE__.TaskSupervisor, |
||||
metadata: [fetcher: :filecoin_address_info] |
||||
] |
||||
end |
||||
|
||||
@doc """ |
||||
Fetches the Filecoin address info for the given pending operation. |
||||
""" |
||||
@impl BufferedTask |
||||
@decorate trace( |
||||
name: "fetch", |
||||
resource: "Indexer.Fetcher.InternalTransaction.run/2", |
||||
service: :indexer, |
||||
tracer: Tracer |
||||
) |
||||
@spec run([Explorer.Chain.Filecoin.PendingAddressOperation.t(), ...], any()) :: :ok | :retry |
||||
def run([pending_operation], _state) do |
||||
fetch_and_update(pending_operation) |
||||
end |
||||
|
||||
@spec fetch_and_update(PendingAddressOperation.t()) :: :ok | :retry |
||||
defp fetch_and_update(%PendingAddressOperation{address_hash: address_hash} = operation) do |
||||
with {:ok, new_params} <- fetch_address_info_using_beryx_api(operation), |
||||
{:ok, _} <- update_address_and_remove_pending_operation(operation, new_params) do |
||||
Logger.debug("Fetched Filecoin address info for: #{to_string(address_hash)}") |
||||
:ok |
||||
else |
||||
_ -> |
||||
Logger.error("Could not fetch Filecoin address info: #{to_string(address_hash)}") |
||||
:retry |
||||
end |
||||
end |
||||
|
||||
@spec update_address_and_remove_pending_operation( |
||||
PendingAddressOperation.t(), |
||||
%{ |
||||
filecoin_id: String.t(), |
||||
filecoin_robust: String.t(), |
||||
filecoin_actor_type: String.t() |
||||
} |
||||
) :: |
||||
{:ok, PendingAddressOperation.t()} |
||||
| {:error, Ecto.Changeset.t()} |
||||
| Ecto.Multi.failure() |
||||
defp update_address_and_remove_pending_operation( |
||||
%PendingAddressOperation{} = operation, |
||||
new_address_params |
||||
) do |
||||
Multi.new() |
||||
|> Multi.run( |
||||
:acquire_address, |
||||
fn repo, _ -> |
||||
case repo.get_by( |
||||
Address, |
||||
[hash: operation.address_hash], |
||||
lock: "FOR UPDATE" |
||||
) do |
||||
nil -> {:error, :not_found} |
||||
address -> {:ok, address} |
||||
end |
||||
end |
||||
) |
||||
|> Multi.run( |
||||
:acquire_pending_address_operation, |
||||
fn repo, _ -> |
||||
case repo.get_by( |
||||
PendingAddressOperation, |
||||
[address_hash: operation.address_hash], |
||||
lock: "FOR UPDATE" |
||||
) do |
||||
nil -> {:error, :not_found} |
||||
pending_operation -> {:ok, pending_operation} |
||||
end |
||||
end |
||||
) |
||||
|> Multi.run( |
||||
:update_address, |
||||
fn repo, %{acquire_address: address} -> |
||||
address |
||||
|> Address.changeset(new_address_params) |
||||
|> repo.update() |
||||
end |
||||
) |
||||
|> Multi.run( |
||||
:delete_pending_operation, |
||||
fn repo, %{acquire_pending_address_operation: operation} -> |
||||
repo.delete(operation) |
||||
end |
||||
) |
||||
|> Repo.transaction() |
||||
end |
||||
|
||||
@spec fetch_address_info_using_beryx_api(PendingAddressOperation.t()) :: |
||||
{:ok, |
||||
%{ |
||||
filecoin_id: String.t(), |
||||
filecoin_robust: String.t(), |
||||
filecoin_actor_type: String.t() |
||||
}} |
||||
| :error |
||||
defp fetch_address_info_using_beryx_api(%PendingAddressOperation{} = operation) do |
||||
with {:ok, body_json} <- operation.address_hash |> to_string() |> BeryxAPI.fetch_account_info(), |
||||
{:ok, id_address_string} <- Map.fetch(body_json, "short"), |
||||
{:ok, robust_address_string} <- Map.fetch(body_json, "robust"), |
||||
{:ok, actor_type_string} <- Map.fetch(body_json, "actor_type") do |
||||
{:ok, |
||||
%{ |
||||
filecoin_id: id_address_string, |
||||
filecoin_robust: robust_address_string, |
||||
filecoin_actor_type: actor_type_string |
||||
}} |
||||
else |
||||
{:error, status_code, %{"error" => reason}} when status_code in @http_error_codes -> |
||||
Logger.error("Beryx API returned error code #{status_code} with reason: #{reason}") |
||||
|
||||
operation |
||||
|> PendingAddressOperation.changeset(%{http_status_code: status_code}) |
||||
|> Repo.update() |
||||
|> case do |
||||
{:ok, _} -> |
||||
Logger.info("Updated pending operation with error status code") |
||||
|
||||
{:error, changeset} -> |
||||
Logger.error("Could not update pending operation with error status code: #{inspect(changeset)}") |
||||
end |
||||
|
||||
:error |
||||
|
||||
error -> |
||||
Logger.error("Error processing Beryx API response: #{inspect(error)}") |
||||
:error |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,50 @@ |
||||
defmodule Indexer.Fetcher.Filecoin.BeryxAPI do |
||||
@moduledoc """ |
||||
Interacts with the Beryx API to fetch account information based on an Ethereum |
||||
address hash |
||||
""" |
||||
|
||||
alias Explorer.Helper |
||||
alias HTTPoison.Response |
||||
|
||||
@doc """ |
||||
Fetches account information for a given Ethereum address hash from the Beryx API. |
||||
|
||||
## Parameters |
||||
- `eth_address_hash` - The Ethereum address hash to fetch information for. |
||||
|
||||
## Returns |
||||
- `{:ok, map()}`: On success, returns the account information as a map. |
||||
- `{:error, integer(), map()}`: On failure, returns the HTTP status code and the error message as a map. |
||||
- `{:error, HTTPoison.Error.t()}`: On network or other HTTP errors, returns the error structure. |
||||
""" |
||||
@spec fetch_account_info(EthereumJSONRPC.address()) :: |
||||
{:ok, map()} |
||||
| {:error, integer(), map()} |
||||
| {:error, HTTPoison.Error.t()} |
||||
def fetch_account_info(eth_address_hash) do |
||||
config = Application.get_env(:indexer, __MODULE__) |
||||
base_url = config |> Keyword.get(:base_url) |> String.trim_trailing("/") |
||||
api_token = config[:api_token] |
||||
|
||||
url = "#{base_url}/mainnet/account/info/#{eth_address_hash}" |
||||
|
||||
headers = [ |
||||
{"Authorization", "Bearer #{api_token}"}, |
||||
{"Content-Type", "application/json"} |
||||
] |
||||
|
||||
case HTTPoison.get(url, headers) do |
||||
{:ok, %Response{body: body, status_code: 200}} -> |
||||
json = Helper.decode_json(body) |
||||
{:ok, json} |
||||
|
||||
{:ok, %Response{body: body, status_code: status_code}} -> |
||||
json = Helper.decode_json(body) |
||||
{:error, status_code, json} |
||||
|
||||
{:error, %HTTPoison.Error{}} = error -> |
||||
error |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,29 @@ |
||||
defmodule Indexer.Prometheus.Collector.FilecoinPendingAddressOperations do |
||||
@moduledoc """ |
||||
Custom collector to count number of records in filecoin_pending_address_operations table. |
||||
""" |
||||
|
||||
use Prometheus.Collector |
||||
|
||||
alias Explorer.Chain.Filecoin.PendingAddressOperation |
||||
alias Explorer.Repo |
||||
alias Prometheus.Model |
||||
|
||||
def collect_mf(_registry, callback) do |
||||
callback.( |
||||
create_gauge( |
||||
:filecoin_pending_address_operations, |
||||
"Number of records in filecoin_pending_address_operations table", |
||||
Repo.aggregate(PendingAddressOperation, :count, timeout: :infinity) |
||||
) |
||||
) |
||||
end |
||||
|
||||
def collect_metrics(:filecoin_pending_address_operations, count) do |
||||
Model.gauge_metrics([{count}]) |
||||
end |
||||
|
||||
defp create_gauge(name, help, data) do |
||||
Model.create_mf(name, help, :gauge, __MODULE__, data) |
||||
end |
||||
end |
@ -1,4 +1,4 @@ |
||||
defmodule Indexer.Prometheus.PendingBlockOperationsCollector do |
||||
defmodule Indexer.Prometheus.Collector.PendingBlockOperations do |
||||
@moduledoc """ |
||||
Custom collector to count number of records in pending_block_operations table. |
||||
""" |
@ -0,0 +1,17 @@ |
||||
defmodule Indexer.Fetcher.Filecoin.AddressInfo.Supervisor.Case do |
||||
alias Indexer.Fetcher.Filecoin.AddressInfo |
||||
|
||||
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do |
||||
merged_fetcher_arguments = |
||||
Keyword.merge( |
||||
fetcher_arguments, |
||||
flush_interval: 50, |
||||
max_batch_size: 1, |
||||
max_concurrency: 1 |
||||
) |
||||
|
||||
[merged_fetcher_arguments] |
||||
|> AddressInfo.Supervisor.child_spec() |
||||
|> ExUnit.Callbacks.start_supervised!() |
||||
end |
||||
end |
Loading…
Reference in new issue