Missed token transfer cataloger (#807)

* Rename token transfer params parser
* Rename token transfer parsing function
* Requeue blocks with incomplete token transfer indexing on startup
* Add additional test for worker
* Add missed token transfers to the front of the catchup queue
* Address code review concerns
* Remove unused alias
* Add process names for uncataloged token transfer processes
* Improve handling of the case when the sequence isn't available to queue
* Move Uncataloged.Supervisor name out of init and into the start_link call

  Name is not a valid option to init; it is a GenServer.option, which means it is passed in the 3rd argument to Supervisor.start_link.
parent b53d846e68
commit d7e38a223e
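The last bullet refers to the start_link/child_spec pattern used in Uncataloged.Supervisor below. A generic sketch of the idea, with a hypothetical module name:

defmodule Example.Uncataloged.Supervisor do
  use Supervisor

  # :name is a GenServer/Supervisor start option, so it belongs in the third
  # argument of Supervisor.start_link/3, not in the init argument (where
  # init/1 would simply ignore it).
  def start_link(init_arguments, gen_server_options \\ []) do
    Supervisor.start_link(__MODULE__, init_arguments, gen_server_options)
  end

  @impl Supervisor
  def init(_init_arguments) do
    Supervisor.init([], strategy: :one_for_one)
  end
end

# Callers (or child_spec/1) supply the name at start time:
# Example.Uncataloged.Supervisor.start_link([], name: Example.Uncataloged.Supervisor)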
@@ -0,0 +1,126 @@
defmodule Indexer.TokenTransfer.Parser do
  @moduledoc """
  Helper functions for transforming data for ERC-20 and ERC-721 token transfers.
  """

  require Logger

  alias ABI.TypeDecoder
  alias Explorer.Chain.TokenTransfer

  @doc """
  Returns a list of token transfers given a list of logs.
  """
  def parse(logs) do
    initial_acc = %{tokens: [], token_transfers: []}

    logs
    |> Enum.filter(&(&1.first_topic == unquote(TokenTransfer.constant())))
    |> Enum.reduce(initial_acc, &do_parse/2)
  end

  defp do_parse(log, %{tokens: tokens, token_transfers: token_transfers} = acc) do
    {token, token_transfer} = parse_params(log)

    %{
      tokens: [token | tokens],
      token_transfers: [token_transfer | token_transfers]
    }
  rescue
    _ in [FunctionClauseError, MatchError] ->
      Logger.error(fn -> "Unknown token transfer format: #{inspect(log)}" end)
      acc
  end

  # ERC-20 token transfer
  defp parse_params(%{second_topic: second_topic, third_topic: third_topic, fourth_topic: nil} = log)
       when not is_nil(second_topic) and not is_nil(third_topic) do
    [amount] = decode_data(log.data, [{:uint, 256}])

    token_transfer = %{
      amount: Decimal.new(amount || 0),
      block_number: log.block_number,
      log_index: log.index,
      from_address_hash: truncate_address_hash(log.second_topic),
      to_address_hash: truncate_address_hash(log.third_topic),
      token_contract_address_hash: log.address_hash,
      transaction_hash: log.transaction_hash,
      token_type: "ERC-20"
    }

    token = %{
      contract_address_hash: log.address_hash,
      type: "ERC-20"
    }

    {token, token_transfer}
  end

  # ERC-721 token transfer with topics as addresses
  defp parse_params(%{second_topic: second_topic, third_topic: third_topic, fourth_topic: fourth_topic} = log)
       when not is_nil(second_topic) and not is_nil(third_topic) and not is_nil(fourth_topic) do
    [token_id] = decode_data(fourth_topic, [{:uint, 256}])

    token_transfer = %{
      block_number: log.block_number,
      log_index: log.index,
      from_address_hash: truncate_address_hash(log.second_topic),
      to_address_hash: truncate_address_hash(log.third_topic),
      token_contract_address_hash: log.address_hash,
      token_id: token_id || 0,
      transaction_hash: log.transaction_hash,
      token_type: "ERC-721"
    }

    token = %{
      contract_address_hash: log.address_hash,
      type: "ERC-721"
    }

    {token, token_transfer}
  end

  # ERC-721 token transfer with info in data field instead of in log topics
  defp parse_params(%{second_topic: nil, third_topic: nil, fourth_topic: nil, data: data} = log)
       when not is_nil(data) do
    [from_address_hash, to_address_hash, token_id] = decode_data(data, [:address, :address, {:uint, 256}])

    token_transfer = %{
      block_number: log.block_number,
      log_index: log.index,
      from_address_hash: encode_address_hash(from_address_hash),
      to_address_hash: encode_address_hash(to_address_hash),
      token_contract_address_hash: log.address_hash,
      token_id: token_id,
      transaction_hash: log.transaction_hash,
      token_type: "ERC-721"
    }

    token = %{
      contract_address_hash: log.address_hash,
      type: "ERC-721"
    }

    {token, token_transfer}
  end

  defp truncate_address_hash(nil), do: "0x0000000000000000000000000000000000000000"

  defp truncate_address_hash("0x000000000000000000000000" <> truncated_hash) do
    "0x#{truncated_hash}"
  end

  defp encode_address_hash(binary) do
    "0x" <> Base.encode16(binary, case: :lower)
  end

  defp decode_data("0x", types) do
    for _ <- types, do: nil
  end

  defp decode_data("0x" <> encoded_data, types) do
    encoded_data
    |> Base.decode16!(case: :mixed)
    |> TypeDecoder.decode_raw(types)
  end
end
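For orientation, a hypothetical ERC-20 log run through the new parser. The field values below are illustrative, and the first topic is assumed to equal TokenTransfer.constant() (the keccak-256 hash of Transfer(address,address,uint256)):

# Illustrative log map shaped like the ones the parser pattern-matches on.
log = %{
  first_topic: "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
  second_topic: "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b",
  third_topic: "0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
  fourth_topic: nil,
  data: "0x0000000000000000000000000000000000000000000000000de0b6b3a7640000",
  address_hash: "0x8f8221afbb33998d8584a2b05749ba73c37a938a",
  block_number: 3_657_656,
  index: 0,
  transaction_hash: "0x..."
}

%{tokens: [token], token_transfers: [transfer]} = Indexer.TokenTransfer.Parser.parse([log])
# transfer.token_type == "ERC-20"
# transfer.amount == Decimal.new(1_000_000_000_000_000_000)
# transfer.from_address_hash == "0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"
# token == %{contract_address_hash: "0x8f8221afbb33998d8584a2b05749ba73c37a938a", type: "ERC-20"}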
@@ -0,0 +1,44 @@
defmodule Indexer.TokenTransfer.Uncataloged.Supervisor do
  @moduledoc """
  Supervises the processes that ensure uncataloged token transfers get queued for indexing.
  """

  use Supervisor

  alias Indexer.TokenTransfer.Uncataloged.Worker

  def child_spec([]) do
    child_spec([[]])
  end

  def child_spec([init_arguments]) do
    child_spec([init_arguments, [name: __MODULE__]])
  end

  def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
    spec = %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, start_link_arguments},
      restart: :transient,
      type: :supervisor
    }

    Supervisor.child_spec(spec, [])
  end

  def start_link(init_arguments, gen_server_options \\ []) do
    Supervisor.start_link(__MODULE__, init_arguments, gen_server_options)
  end

  @impl Supervisor
  def init(_) do
    children = [
      {Worker, [[supervisor: self()], [name: Worker]]},
      {Task.Supervisor, name: Indexer.TokenTransfer.Uncataloged.TaskSupervisor}
    ]

    opts = [strategy: :one_for_all]

    Supervisor.init(children, opts)
  end
end
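A sketch of how this supervisor might be attached to an indexer supervision tree; the parent below is hypothetical, as the actual attachment point is outside this changeset:

# Hypothetical parent; child_spec([]) registers the supervisor under its module
# name, and restart: :transient lets it exit normally once the backlog is drained.
children = [
  {Indexer.TokenTransfer.Uncataloged.Supervisor, []}
]

Supervisor.start_link(children, strategy: :one_for_one, name: Indexer.ExampleSupervisor)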
@@ -0,0 +1,90 @@
defmodule Indexer.TokenTransfer.Uncataloged.Worker do
  @moduledoc """
  Catalogs token transfer logs missing an accompanying token transfer record.

  Missed token transfers happen due to formats that weren't supported at the time
  they were parsed during main indexing. Updating the parser and rebooting will allow
  this process to properly catalog those missed token transfers.
  """

  use GenServer

  alias Explorer.Chain
  alias Indexer.Block.Catchup.Fetcher
  alias Indexer.TokenTransfer.Uncataloged

  def child_spec([init_arguments]) do
    child_spec([init_arguments, []])
  end

  def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
    spec = %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, start_link_arguments},
      restart: :transient,
      type: :worker
    }

    Supervisor.child_spec(spec, [])
  end

  def start_link(init_arguments, gen_server_options \\ []) do
    GenServer.start_link(__MODULE__, init_arguments, gen_server_options)
  end

  def init(opts) do
    sup_pid = Keyword.fetch!(opts, :supervisor)
    retry_interval = Keyword.get(opts, :retry_interval, 10_000)

    send(self(), :scan)

    state = %{
      block_numbers: [],
      retry_interval: retry_interval,
      sup_pid: sup_pid,
      task_ref: nil
    }

    {:ok, state}
  end

  def handle_info(:scan, state) do
    {:ok, block_numbers} = Chain.uncataloged_token_transfer_block_numbers()

    case block_numbers do
      [] ->
        Supervisor.stop(state.sup_pid, :normal)
        {:noreply, state}

      block_numbers ->
        Process.send_after(self(), :enqueue_blocks, state.retry_interval)
        {:noreply, %{state | block_numbers: block_numbers}}
    end
  end

  def handle_info(:enqueue_blocks, %{block_numbers: block_numbers} = state) do
    %Task{ref: ref} = async_enqueue(block_numbers)
    {:noreply, %{state | task_ref: ref}}
  end

  def handle_info({ref, :ok}, %{task_ref: ref, sup_pid: sup_pid} = state) do
    Process.demonitor(ref, [:flush])
    Supervisor.stop(sup_pid, :normal)
    {:stop, :shutdown, state}
  end

  def handle_info({ref, {:error, :queue_unavailable}}, %{task_ref: ref, retry_interval: millis} = state) do
    Process.demonitor(ref, [:flush])
    Process.send_after(self(), :enqueue_blocks, millis)
    {:noreply, %{state | task_ref: nil}}
  end

  def handle_info({:DOWN, ref, :process, _, _}, %{task_ref: ref, retry_interval: millis} = state) do
    Process.send_after(self(), :enqueue_blocks, millis)
    {:noreply, %{state | task_ref: nil}}
  end

  defp async_enqueue(block_numbers) do
    Task.Supervisor.async_nolink(Uncataloged.TaskSupervisor, Fetcher, :enqueue, [block_numbers])
  end
end
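A brief sketch of driving the worker by hand, as the tests further down do; the retry interval value is arbitrary:

# The worker sends itself :scan on init, loads block numbers via
# Chain.uncataloged_token_transfer_block_numbers/0, then pushes them to
# Indexer.Block.Catchup.Fetcher.enqueue/1 in a supervised task, retrying on
# {:error, :queue_unavailable} and stopping its supervisor when done.
{:ok, _pid} = Indexer.TokenTransfer.Uncataloged.Worker.start_link(supervisor: self(), retry_interval: 100)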
@@ -1,126 +1,9 @@
 defmodule Indexer.TokenTransfers do
   @moduledoc """
-  Helper functions for transforming data for ERC-20 and ERC-721 token transfers.
+  Context for working with token transfers.
   """
 
-  require Logger
+  alias Indexer.TokenTransfer.Parser
 
-  alias ABI.TypeDecoder
-  alias Explorer.Chain.TokenTransfer
-
-  @doc """
-  Returns a list of token transfers given a list of logs.
-  """
-  def from_log_params(logs) do
-    initial_acc = %{tokens: [], token_transfers: []}
-
-    logs
-    |> Enum.filter(&(&1.first_topic == unquote(TokenTransfer.constant())))
-    |> Enum.reduce(initial_acc, &do_from_log_params/2)
-  end
-
-  defp do_from_log_params(log, %{tokens: tokens, token_transfers: token_transfers} = acc) do
-    {token, token_transfer} = parse_params(log)
-
-    %{
-      tokens: [token | tokens],
-      token_transfers: [token_transfer | token_transfers]
-    }
-  rescue
-    _ in [FunctionClauseError, MatchError] ->
-      Logger.error(fn -> "Unknown token transfer format: #{inspect(log)}" end)
-      acc
-  end
-
-  # ERC-20 token transfer
-  defp parse_params(%{second_topic: second_topic, third_topic: third_topic, fourth_topic: nil} = log)
-       when not is_nil(second_topic) and not is_nil(third_topic) do
-    [amount] = decode_data(log.data, [{:uint, 256}])
-
-    token_transfer = %{
-      amount: Decimal.new(amount || 0),
-      block_number: log.block_number,
-      log_index: log.index,
-      from_address_hash: truncate_address_hash(log.second_topic),
-      to_address_hash: truncate_address_hash(log.third_topic),
-      token_contract_address_hash: log.address_hash,
-      transaction_hash: log.transaction_hash,
-      token_type: "ERC-20"
-    }
-
-    token = %{
-      contract_address_hash: log.address_hash,
-      type: "ERC-20"
-    }
-
-    {token, token_transfer}
-  end
-
-  # ERC-721 token transfer with topics as addresses
-  defp parse_params(%{second_topic: second_topic, third_topic: third_topic, fourth_topic: fourth_topic} = log)
-       when not is_nil(second_topic) and not is_nil(third_topic) and not is_nil(fourth_topic) do
-    [token_id] = decode_data(fourth_topic, [{:uint, 256}])
-
-    token_transfer = %{
-      block_number: log.block_number,
-      log_index: log.index,
-      from_address_hash: truncate_address_hash(log.second_topic),
-      to_address_hash: truncate_address_hash(log.third_topic),
-      token_contract_address_hash: log.address_hash,
-      token_id: token_id || 0,
-      transaction_hash: log.transaction_hash,
-      token_type: "ERC-721"
-    }
-
-    token = %{
-      contract_address_hash: log.address_hash,
-      type: "ERC-721"
-    }
-
-    {token, token_transfer}
-  end
-
-  # ERC-721 token transfer with info in data field instead of in log topics
-  defp parse_params(%{second_topic: nil, third_topic: nil, fourth_topic: nil, data: data} = log)
-       when not is_nil(data) do
-    [from_address_hash, to_address_hash, token_id] = decode_data(data, [:address, :address, {:uint, 256}])
-
-    token_transfer = %{
-      block_number: log.block_number,
-      log_index: log.index,
-      from_address_hash: encode_address_hash(from_address_hash),
-      to_address_hash: encode_address_hash(to_address_hash),
-      token_contract_address_hash: log.address_hash,
-      token_id: token_id,
-      transaction_hash: log.transaction_hash,
-      token_type: "ERC-721"
-    }
-
-    token = %{
-      contract_address_hash: log.address_hash,
-      type: "ERC-721"
-    }
-
-    {token, token_transfer}
-  end
-
-  defp truncate_address_hash(nil), do: "0x0000000000000000000000000000000000000000"
-
-  defp truncate_address_hash("0x000000000000000000000000" <> truncated_hash) do
-    "0x#{truncated_hash}"
-  end
-
-  defp encode_address_hash(binary) do
-    "0x" <> Base.encode16(binary, case: :lower)
-  end
-
-  defp decode_data("0x", types) do
-    for _ <- types, do: nil
-  end
-
-  defp decode_data("0x" <> encoded_data, types) do
-    encoded_data
-    |> Base.decode16!(case: :mixed)
-    |> TypeDecoder.decode_raw(types)
-  end
+  defdelegate parse(items), to: Parser
 end
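The net effect of this hunk: the parsing logic moves into Indexer.TokenTransfer.Parser, and Indexer.TokenTransfers keeps only a thin delegating facade, with the public function renamed from from_log_params/1 to parse/1 per the commit message. A hypothetical call site (logs as in the earlier parser sketch):

# Both calls return the same %{tokens: [...], token_transfers: [...]} map;
# defdelegate simply forwards to the new parser module.
result = Indexer.TokenTransfers.parse(logs)
^result = Indexer.TokenTransfer.Parser.parse(logs)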
@@ -0,0 +1,76 @@
defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do
  use Explorer.DataCase

  alias Indexer.TokenTransfer.Uncataloged.{Worker, TaskSupervisor}

  describe "start_link/1" do
    test "starts the worker" do
      assert {:ok, _pid} = Worker.start_link(supervisor: self())
    end
  end

  describe "init/1" do
    test "sends message to self" do
      pid = self()
      assert {:ok, %{task_ref: nil, block_numbers: [], sup_pid: ^pid}} = Worker.init(supervisor: self())
      assert_received :scan
    end
  end

  describe "handle_info with :scan" do
    test "sends shutdown to supervisor" do
      state = %{task_ref: nil, block_numbers: [], sup_pid: self()}
      Task.async(fn -> Worker.handle_info(:scan, state) end)
      assert_receive {_, _, {:terminate, :normal}}
    end

    test "sends message to self when uncataloged token transfers are found" do
      log = insert(:token_transfer_log)
      block_number = log.transaction.block_number

      expected_state = %{task_ref: nil, block_numbers: [block_number], retry_interval: 1}
      state = %{task_ref: nil, block_numbers: [], retry_interval: 1}

      assert {:noreply, ^expected_state} = Worker.handle_info(:scan, state)
      assert_receive :enqueue_blocks
    end
  end

  describe "handle_info with :enqueue_blocks" do
    test "starts a task" do
      task_sup_pid = start_supervised!({Task.Supervisor, name: TaskSupervisor})

      state = %{task_ref: nil, block_numbers: [1]}
      assert {:noreply, new_state} = Worker.handle_info(:enqueue_blocks, state)
      assert is_reference(new_state.task_ref)

      stop_supervised(task_sup_pid)
    end
  end

  describe "handle_info with task ref tuple" do
    test "sends shutdown to supervisor on success" do
      ref = Process.monitor(self())
      state = %{task_ref: ref, block_numbers: [], sup_pid: self()}
      Task.async(fn -> assert Worker.handle_info({ref, :ok}, state) end)
      assert_receive {_, _, {:terminate, :normal}}
    end

    test "sends message to self to try again on failure" do
      ref = Process.monitor(self())
      state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1}
      expected_state = %{state | task_ref: nil}
      assert {:noreply, ^expected_state} = Worker.handle_info({ref, {:error, :queue_unavailable}}, state)
      assert_receive :enqueue_blocks
    end
  end

  describe "handle_info with failed task" do
    test "sends message to self to try again" do
      ref = Process.monitor(self())
      state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1}
      assert {:noreply, %{task_ref: nil}} = Worker.handle_info({:DOWN, ref, :process, self(), :EXIT}, state)
      assert_receive :enqueue_blocks
    end
  end
end