on-fly fetching of token instances

Currently, we have to restart indexer to fetch metadata
for newly imported ERC-721 token instances.

This PR triggers fetching of new token instances after blocks
imports in Catchup and Realtime fetchers.
pull/2762/head
Ayrat Badykov 5 years ago
parent 766abfc82d
commit d34275d2fe
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 23
      apps/explorer/lib/explorer/chain.ex
  2. 2
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  3. 7
      apps/indexer/lib/indexer/block/fetcher.ex
  4. 2
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  5. 6
      apps/indexer/lib/indexer/fetcher/token_instance.ex

@ -2954,6 +2954,29 @@ defmodule Explorer.Chain do
Repo.stream_reduce(query, initial, reducer) Repo.stream_reduce(query, initial, reducer)
end end
def token_transfers_without_instances(token_transfers) do
token_contract_address_hashes =
token_transfers
|> Enum.map(fn token_transfer -> token_transfer.token_contract_address_hash end)
|> Enum.uniq()
query =
from(
token_transfer in TokenTransfer,
inner_join: token in Token,
on: token.contract_address_hash == token_transfer.token_contract_address_hash,
left_join: instance in Instance,
on: token_transfer.token_id == instance.token_id,
where:
token.type == ^"ERC-721" and is_nil(instance.token_id) and
token_transfer.token_contract_address_hash in ^token_contract_address_hashes,
distinct: [token_transfer.token_contract_address_hash, token_transfer.token_id],
select: %{contract_address_hash: token_transfer.token_contract_address_hash, token_id: token_transfer.token_id}
)
Repo.all(query)
end
@doc """ @doc """
Streams a list of token contract addresses that have been cataloged. Streams a list of token contract addresses that have been cataloged.
""" """

@ -16,6 +16,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
async_import_replaced_transactions: 1, async_import_replaced_transactions: 1,
async_import_tokens: 1, async_import_tokens: 1,
async_import_token_balances: 1, async_import_token_balances: 1,
async_import_token_instances: 1,
async_import_uncles: 1, async_import_uncles: 1,
fetch_and_import_range: 2 fetch_and_import_range: 2
] ]
@ -164,6 +165,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
async_import_token_balances(imported) async_import_token_balances(imported)
async_import_uncles(imported) async_import_uncles(imported)
async_import_replaced_transactions(imported) async_import_replaced_transactions(imported)
async_import_token_instances(imported)
end end
defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence) defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)

@ -25,6 +25,7 @@ defmodule Indexer.Block.Fetcher do
StakingPools, StakingPools,
Token, Token,
TokenBalance, TokenBalance,
TokenInstance,
UncleBlock UncleBlock
} }
@ -229,6 +230,12 @@ defmodule Indexer.Block.Fetcher do
callback_module.import(state, options_with_broadcast) callback_module.import(state, options_with_broadcast)
end end
def async_import_token_instances(%{token_transfers: token_transfers}) do
TokenInstance.async_fetch(token_transfers)
end
def async_import_token_instances(_), do: :ok
def async_import_block_rewards([]), do: :ok def async_import_block_rewards([]), do: :ok
def async_import_block_rewards(errors) when is_list(errors) do def async_import_block_rewards(errors) when is_list(errors) do

@ -19,6 +19,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
async_import_replaced_transactions: 1, async_import_replaced_transactions: 1,
async_import_tokens: 1, async_import_tokens: 1,
async_import_token_balances: 1, async_import_token_balances: 1,
async_import_token_instances: 1,
async_import_uncles: 1, async_import_uncles: 1,
fetch_and_import_range: 2, fetch_and_import_range: 2,
async_import_staking_pools: 0 async_import_staking_pools: 0
@ -358,6 +359,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
async_import_internal_transactions(imported, Keyword.get(json_rpc_named_arguments, :variant)) async_import_internal_transactions(imported, Keyword.get(json_rpc_named_arguments, :variant))
async_import_tokens(imported) async_import_tokens(imported)
async_import_token_balances(imported) async_import_token_balances(imported)
async_import_token_instances(imported)
async_import_uncles(imported) async_import_uncles(imported)
async_import_replaced_transactions(imported) async_import_replaced_transactions(imported)
async_import_staking_pools() async_import_staking_pools()

@ -69,6 +69,12 @@ defmodule Indexer.Fetcher.TokenInstance do
@doc """ @doc """
Fetches token instance data asynchronously. Fetches token instance data asynchronously.
""" """
def async_fetch(token_transfers) when is_list(token_transfers) do
data = Chain.token_transfers_without_instances(token_transfers)
BufferedTask.buffer(__MODULE__, data)
end
def async_fetch(data) do def async_fetch(data) do
BufferedTask.buffer(__MODULE__, data) BufferedTask.buffer(__MODULE__, data)
end end

Loading…
Cancel
Save