feat: Push relevant entries to the front of bound queue (#10193)

* feat: Push relevant entries to the front of bound queue

* Refactor realtime? parameter in async_import_remaining_block_data
Qwerty5Uiop committed via GitHub
parent 3bea0e842e
commit 653f91248b
Changed files (lines changed):
  1. apps/indexer/lib/indexer/block/catchup/fetcher.ex (34)
  2. apps/indexer/lib/indexer/block/fetcher.ex (48)
  3. apps/indexer/lib/indexer/block/realtime/fetcher.ex (34)
  4. apps/indexer/lib/indexer/bound_queue.ex (18)
  5. apps/indexer/lib/indexer/buffered_task.ex (65)
  6. apps/indexer/lib/indexer/fetcher/beacon/blob.ex (4)
  7. apps/indexer/lib/indexer/fetcher/block_reward.ex (6)
  8. apps/indexer/lib/indexer/fetcher/coin_balance/catchup.ex (2)
  9. apps/indexer/lib/indexer/fetcher/coin_balance/realtime.ex (2)
  10. apps/indexer/lib/indexer/fetcher/contract_code.ex (7)
  11. apps/indexer/lib/indexer/fetcher/internal_transaction.ex (6)
  12. apps/indexer/lib/indexer/fetcher/pending_block_operations_sanitizer.ex (2)
  13. apps/indexer/lib/indexer/fetcher/polygon_zkevm/bridge_l1_tokens.ex (2)
  14. apps/indexer/lib/indexer/fetcher/replaced_transaction.ex (21)
  15. apps/indexer/lib/indexer/fetcher/token.ex (6)
  16. apps/indexer/lib/indexer/fetcher/token_balance.ex (25)
  17. apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex (6)
  18. apps/indexer/lib/indexer/fetcher/uncle_block.ex (9)
  19. apps/indexer/lib/indexer/token_balances.ex (2)
  20. apps/indexer/test/indexer/block/catchup/fetcher_test.exs (6)
  21. apps/indexer/test/indexer/buffered_task_test.exs (16)
  22. apps/indexer/test/indexer/fetcher/beacon/blob_test.exs (2)
  23. apps/indexer/test/indexer/fetcher/block_reward_test.exs (18)
  24. apps/indexer/test/indexer/fetcher/contract_code_test.exs (9)
  25. apps/indexer/test/indexer/fetcher/replaced_transaction_test.exs (17)

@@ -9,16 +9,16 @@ defmodule Indexer.Block.Catchup.Fetcher do
import Indexer.Block.Fetcher,
only: [
async_import_blobs: 1,
async_import_block_rewards: 1,
async_import_blobs: 2,
async_import_block_rewards: 2,
async_import_coin_balances: 2,
async_import_created_contract_codes: 1,
async_import_internal_transactions: 1,
async_import_replaced_transactions: 1,
async_import_tokens: 1,
async_import_token_balances: 1,
async_import_created_contract_codes: 2,
async_import_internal_transactions: 2,
async_import_replaced_transactions: 2,
async_import_tokens: 2,
async_import_token_balances: 2,
async_import_token_instances: 1,
async_import_uncles: 1,
async_import_uncles: 2,
fetch_and_import_range: 2
]
@@ -127,16 +127,18 @@ defmodule Indexer.Block.Catchup.Fetcher do
imported,
%{block_rewards: %{errors: block_reward_errors}} = options
) do
async_import_block_rewards(block_reward_errors)
realtime? = false
async_import_block_rewards(block_reward_errors, realtime?)
async_import_coin_balances(imported, options)
async_import_created_contract_codes(imported)
async_import_internal_transactions(imported)
async_import_tokens(imported)
async_import_token_balances(imported)
async_import_uncles(imported)
async_import_replaced_transactions(imported)
async_import_created_contract_codes(imported, realtime?)
async_import_internal_transactions(imported, realtime?)
async_import_tokens(imported, realtime?)
async_import_token_balances(imported, realtime?)
async_import_uncles(imported, realtime?)
async_import_replaced_transactions(imported, realtime?)
async_import_token_instances(imported)
async_import_blobs(imported)
async_import_blobs(imported, realtime?)
end
defp stream_fetch_and_import(state, ranges) do

@@ -364,25 +364,25 @@ defmodule Indexer.Block.Fetcher do
def async_import_token_instances(_), do: :ok
def async_import_blobs(%{blocks: blocks}) do
def async_import_blobs(%{blocks: blocks}, realtime?) do
timestamps =
blocks
|> Enum.filter(fn block -> block |> Map.get(:blob_gas_used, 0) > 0 end)
|> Enum.map(&Map.get(&1, :timestamp))
if not Enum.empty?(timestamps) do
Blob.async_fetch(timestamps)
Blob.async_fetch(timestamps, realtime?)
end
end
def async_import_blobs(_), do: :ok
def async_import_blobs(_, _), do: :ok
def async_import_block_rewards([]), do: :ok
def async_import_block_rewards([], _realtime?), do: :ok
def async_import_block_rewards(errors) when is_list(errors) do
def async_import_block_rewards(errors, realtime?) when is_list(errors) do
errors
|> block_reward_errors_to_block_numbers()
|> BlockReward.async_fetch()
|> BlockReward.async_fetch(realtime?)
end
def async_import_coin_balances(%{addresses: addresses}, %{
@@ -404,7 +404,7 @@ defmodule Indexer.Block.Fetcher do
def async_import_realtime_coin_balances(_), do: :ok
def async_import_created_contract_codes(%{transactions: transactions}) do
def async_import_created_contract_codes(%{transactions: transactions}, realtime?) do
transactions
|> Enum.flat_map(fn
%Transaction{
@@ -418,40 +418,40 @@ defmodule Indexer.Block.Fetcher do
%Transaction{created_contract_address_hash: nil} ->
[]
end)
|> ContractCode.async_fetch(10_000)
|> ContractCode.async_fetch(realtime?, 10_000)
end
def async_import_created_contract_codes(_), do: :ok
def async_import_created_contract_codes(_, _), do: :ok
def async_import_internal_transactions(%{blocks: blocks}) do
def async_import_internal_transactions(%{blocks: blocks}, realtime?) do
blocks
|> Enum.map(fn %Block{number: block_number} -> block_number end)
|> InternalTransaction.async_fetch(10_000)
|> InternalTransaction.async_fetch(realtime?, 10_000)
end
def async_import_internal_transactions(_), do: :ok
def async_import_internal_transactions(_, _), do: :ok
def async_import_tokens(%{tokens: tokens}) do
def async_import_tokens(%{tokens: tokens}, realtime?) do
tokens
|> Enum.map(& &1.contract_address_hash)
|> Token.async_fetch()
|> Token.async_fetch(realtime?)
end
def async_import_tokens(_), do: :ok
def async_import_tokens(_, _), do: :ok
def async_import_token_balances(%{address_token_balances: token_balances}) do
TokenBalance.async_fetch(token_balances)
def async_import_token_balances(%{address_token_balances: token_balances}, realtime?) do
TokenBalance.async_fetch(token_balances, realtime?)
end
def async_import_token_balances(_), do: :ok
def async_import_token_balances(_, _), do: :ok
def async_import_uncles(%{block_second_degree_relations: block_second_degree_relations}) do
UncleBlock.async_fetch_blocks(block_second_degree_relations)
def async_import_uncles(%{block_second_degree_relations: block_second_degree_relations}, realtime?) do
UncleBlock.async_fetch_blocks(block_second_degree_relations, realtime?)
end
def async_import_uncles(_), do: :ok
def async_import_uncles(_, _), do: :ok
def async_import_replaced_transactions(%{transactions: transactions}) do
def async_import_replaced_transactions(%{transactions: transactions}, realtime?) do
transactions
|> Enum.flat_map(fn
%Transaction{block_hash: %Hash{} = block_hash, nonce: nonce, from_address_hash: %Hash{} = from_address_hash} ->
@@ -460,10 +460,10 @@ defmodule Indexer.Block.Fetcher do
%Transaction{block_hash: nil} ->
[]
end)
|> ReplacedTransaction.async_fetch(10_000)
|> ReplacedTransaction.async_fetch(realtime?, 10_000)
end
def async_import_replaced_transactions(_), do: :ok
def async_import_replaced_transactions(_, _), do: :ok
@doc """
Fills a buffer of L1 token addresses to handle it asynchronously in

@@ -14,15 +14,15 @@ defmodule Indexer.Block.Realtime.Fetcher do
import Indexer.Block.Fetcher,
only: [
async_import_realtime_coin_balances: 1,
async_import_blobs: 1,
async_import_block_rewards: 1,
async_import_created_contract_codes: 1,
async_import_internal_transactions: 1,
async_import_replaced_transactions: 1,
async_import_tokens: 1,
async_import_token_balances: 1,
async_import_blobs: 2,
async_import_block_rewards: 2,
async_import_created_contract_codes: 2,
async_import_internal_transactions: 2,
async_import_replaced_transactions: 2,
async_import_tokens: 2,
async_import_token_balances: 2,
async_import_token_instances: 1,
async_import_uncles: 1,
async_import_uncles: 2,
async_import_polygon_zkevm_bridge_l1_tokens: 1,
fetch_and_import_range: 2
]
@@ -452,16 +452,18 @@ defmodule Indexer.Block.Realtime.Fetcher do
imported,
%{block_rewards: %{errors: block_reward_errors}}
) do
realtime? = true
async_import_realtime_coin_balances(imported)
async_import_block_rewards(block_reward_errors)
async_import_created_contract_codes(imported)
async_import_internal_transactions(imported)
async_import_tokens(imported)
async_import_token_balances(imported)
async_import_block_rewards(block_reward_errors, realtime?)
async_import_created_contract_codes(imported, realtime?)
async_import_internal_transactions(imported, realtime?)
async_import_tokens(imported, realtime?)
async_import_token_balances(imported, realtime?)
async_import_token_instances(imported)
async_import_uncles(imported)
async_import_replaced_transactions(imported)
async_import_blobs(imported)
async_import_uncles(imported, realtime?)
async_import_replaced_transactions(imported, realtime?)
async_import_blobs(imported, realtime?)
async_import_polygon_zkevm_bridge_l1_tokens(imported)
end
end

@@ -103,6 +103,24 @@ defmodule Indexer.BoundQueue do
end
end
@doc """
Pushes items from `items` onto the front of `bound_queue` with `push_front/2` until it is full.
"""
def push_front_until_maximum_size(
%__MODULE__{size: maximum_size, maximum_size: maximum_size} = bound_queue,
remaining
),
do: {bound_queue, remaining}
def push_front_until_maximum_size(%__MODULE__{} = bound_queue, [] = remaining), do: {bound_queue, remaining}
def push_front_until_maximum_size(%__MODULE__{} = bound_queue, [head | tail] = remaining) do
case push_front(bound_queue, head) do
{:ok, new_bound_queue} -> push_front_until_maximum_size(new_bound_queue, tail)
{:error, :maximum_size} -> {bound_queue, remaining}
end
end
@doc """
Shrinks the queue to half its current `size` and sets that as its new `max_size`.
"""

@@ -74,6 +74,7 @@ defmodule Indexer.BufferedTask do
poll: true,
metadata: [],
current_buffer: [],
current_front_buffer: [],
bound_queue: %BoundQueue{},
task_ref_to_batch: %{}
@@ -155,9 +156,9 @@ defmodule Indexer.BufferedTask do
@doc """
Buffers list of entries for future async execution.
"""
@spec buffer(GenServer.name(), entries(), timeout()) :: :ok
def buffer(server, entries, timeout \\ 5000) when is_list(entries) do
GenServer.call(server, {:buffer, entries}, timeout)
@spec buffer(GenServer.name(), entries(), boolean(), timeout()) :: :ok
def buffer(server, entries, front?, timeout \\ 5000) when is_list(entries) do
GenServer.call(server, {:buffer, entries, front?}, timeout)
end
def child_spec([init_arguments]) do
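For callers, the visible change is the extra `front?` flag on `buffer/4`. A hedged usage sketch; the fetcher module name and entry terms are hypothetical, since each real fetcher builds its own entries:

    # Illustrative entries; real fetchers map raw data through their own entry/1.
    entries = [:entry_a, :entry_b]

    # Realtime-derived work jumps to the front of the bound queue...
    :ok = BufferedTask.buffer(MyApp.SomeFetcher, entries, true)

    # ...while catchup-derived work keeps the old back-of-queue behaviour
    # (an explicit timeout can still be passed as the fourth argument).
    :ok = BufferedTask.buffer(MyApp.SomeFetcher, entries, false, 10_000)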
@@ -277,12 +278,12 @@ defmodule Indexer.BufferedTask do
{:noreply, drop_task_and_retry(state, ref)}
end
def handle_info({:buffer, entries}, state) do
{:noreply, buffer_entries(state, entries)}
def handle_info({:buffer, entries, front?}, state) do
{:noreply, buffer_entries(state, entries, front?)}
end
def handle_call({:buffer, entries}, _from, state) do
{:reply, :ok, buffer_entries(state, entries)}
def handle_call({:buffer, entries, front?}, _from, state) do
{:reply, :ok, buffer_entries(state, entries, front?)}
end
def handle_call(
@@ -290,12 +291,13 @@ defmodule Indexer.BufferedTask do
_from,
%BufferedTask{
current_buffer: current_buffer,
current_front_buffer: current_front_buffer,
bound_queue: bound_queue,
max_batch_size: max_batch_size,
task_ref_to_batch: task_ref_to_batch
} = state
) do
count = length(current_buffer) + Enum.count(bound_queue) * max_batch_size
count = length(current_buffer) + length(current_front_buffer) + Enum.count(bound_queue) * max_batch_size
{:reply, %{buffer: count, tasks: Enum.count(task_ref_to_batch)}, state}
end
@@ -317,6 +319,15 @@ defmodule Indexer.BufferedTask do
{:reply, :ok, new_state}
end
def handle_call({:push_front, entries}, _from, state) when is_list(entries) do
new_state =
state
|> push_front(entries)
|> spawn_next_batch()
{:reply, :ok, new_state}
end
def handle_call(:shrink, _from, %__MODULE__{bound_queue: bound_queue} = state) do
{reply, shrunk_state} =
case BoundQueue.shrink(bound_queue) do
@@ -350,9 +361,13 @@ defmodule Indexer.BufferedTask do
|> push_back(new_batch || batch)
end
defp buffer_entries(state, []), do: state
defp buffer_entries(state, [], _front?), do: state
defp buffer_entries(state, entries) do
defp buffer_entries(state, entries, true) do
%{state | current_front_buffer: [entries | state.current_front_buffer]}
end
defp buffer_entries(state, entries, false) do
%{state | current_buffer: [entries | state.current_buffer]}
end
@@ -408,9 +423,17 @@ defmodule Indexer.BufferedTask do
GenServer.call(pid, {:push_back, entries})
end
defp push_back(%BufferedTask{bound_queue: bound_queue} = state, entries) when is_list(entries) do
defp push_back(%BufferedTask{} = state, entries), do: push(state, entries, false)
defp push_front(pid, entries) when is_pid(pid) and is_list(entries) do
GenServer.call(pid, {:push_front, entries})
end
defp push_front(%BufferedTask{} = state, entries), do: push(state, entries, true)
defp push(%BufferedTask{bound_queue: bound_queue} = state, entries, front?) when is_list(entries) do
new_bound_queue =
case BoundQueue.push_back_until_maximum_size(bound_queue, entries) do
case push_until_maximum_size(bound_queue, entries, front?) do
{new_bound_queue, []} ->
new_bound_queue
@@ -433,6 +456,12 @@ defmodule Indexer.BufferedTask do
%BufferedTask{state | bound_queue: new_bound_queue}
end
defp push_until_maximum_size(bound_queue, entries, true),
do: BoundQueue.push_front_until_maximum_size(bound_queue, entries)
defp push_until_maximum_size(bound_queue, entries, false),
do: BoundQueue.push_back_until_maximum_size(bound_queue, entries)
defp take_batch(%BufferedTask{bound_queue: bound_queue, max_batch_size: max_batch_size} = state) do
{batch, new_bound_queue} = take_batch(bound_queue, max_batch_size)
{batch, %BufferedTask{state | bound_queue: new_bound_queue}}
@@ -525,17 +554,19 @@ defmodule Indexer.BufferedTask do
callback_module.run(batch, callback_module_state)
end
defp flush(%BufferedTask{current_buffer: []} = state) do
defp flush(%BufferedTask{current_buffer: [], current_front_buffer: []} = state) do
state
|> spawn_next_batch()
|> schedule_next()
end
defp flush(%BufferedTask{current_buffer: current} = state) do
entries = List.flatten(current)
defp flush(%BufferedTask{current_buffer: buffer, current_front_buffer: front_buffer} = state) do
back_entries = List.flatten(buffer)
front_entries = List.flatten(front_buffer)
%BufferedTask{state | current_buffer: []}
|> push_back(entries)
%BufferedTask{state | current_buffer: [], current_front_buffer: []}
|> push_back(back_entries)
|> push_front(front_entries)
|> flush()
end
end

@@ -25,11 +25,11 @@ defmodule Indexer.Fetcher.Beacon.Blob do
@doc """
Asynchronously fetches blobs for given `block_timestamp`.
"""
def async_fetch(block_timestamps) do
def async_fetch(block_timestamps, realtime?) do
if BlobSupervisor.disabled?() do
:ok
else
BufferedTask.buffer(__MODULE__, block_timestamps |> Enum.map(&entry/1))
BufferedTask.buffer(__MODULE__, Enum.map(block_timestamps, &entry/1), realtime?)
end
end

@@ -31,12 +31,12 @@ defmodule Indexer.Fetcher.BlockReward do
@doc """
Asynchronously fetches block rewards for each `t:Explorer.Chain.Block.block_number/0` in `block_numbers`.
"""
@spec async_fetch([Block.block_number()]) :: :ok
def async_fetch(block_numbers) when is_list(block_numbers) do
@spec async_fetch([Block.block_number()], boolean()) :: :ok
def async_fetch(block_numbers, realtime?) when is_list(block_numbers) do
if BlockRewardSupervisor.disabled?() do
:ok
else
BufferedTask.buffer(__MODULE__, block_numbers)
BufferedTask.buffer(__MODULE__, block_numbers, realtime?)
end
end

@@ -30,7 +30,7 @@ defmodule Indexer.Fetcher.CoinBalance.Catchup do
else
entries = Enum.map(balance_fields, &Helper.entry/1)
BufferedTask.buffer(__MODULE__, entries)
BufferedTask.buffer(__MODULE__, entries, false)
end
end

@@ -24,7 +24,7 @@ defmodule Indexer.Fetcher.CoinBalance.Realtime do
def async_fetch_balances(balance_fields) when is_list(balance_fields) do
entries = Enum.map(balance_fields, &Helper.entry/1)
BufferedTask.buffer(__MODULE__, entries)
BufferedTask.buffer(__MODULE__, entries, true)
end
def child_spec(params) do

@@ -29,11 +29,12 @@ defmodule Indexer.Fetcher.ContractCode do
metadata: [fetcher: :code]
]
@spec async_fetch([%{required(:block_number) => Block.block_number(), required(:hash) => Hash.Full.t()}]) :: :ok
def async_fetch(transactions_fields, timeout \\ 5000) when is_list(transactions_fields) do
@spec async_fetch([%{required(:block_number) => Block.block_number(), required(:hash) => Hash.Full.t()}], boolean()) ::
:ok
def async_fetch(transactions_fields, realtime?, timeout \\ 5000) when is_list(transactions_fields) do
entries = Enum.map(transactions_fields, &entry/1)
BufferedTask.buffer(__MODULE__, entries, timeout)
BufferedTask.buffer(__MODULE__, entries, realtime?, timeout)
end
@doc false

@@ -41,12 +41,12 @@ defmodule Indexer.Fetcher.InternalTransaction do
*Note*: The internal transactions for individual transactions cannot be paginated,
so the total number of internal transactions that could be produced is unknown.
"""
@spec async_fetch([Block.block_number()]) :: :ok
def async_fetch(block_numbers, timeout \\ 5000) when is_list(block_numbers) do
@spec async_fetch([Block.block_number()], boolean()) :: :ok
def async_fetch(block_numbers, realtime?, timeout \\ 5000) when is_list(block_numbers) do
if InternalTransactionSupervisor.disabled?() do
:ok
else
BufferedTask.buffer(__MODULE__, block_numbers, timeout)
BufferedTask.buffer(__MODULE__, block_numbers, realtime?, timeout)
end
end

@@ -60,7 +60,7 @@ defmodule Indexer.Fetcher.PendingBlockOperationsSanitizer do
|> update([pbo, po, b], set: [block_number: b.number])
|> Repo.update_all([], timeout: @timeout)
InternalTransaction.async_fetch(block_numbers)
InternalTransaction.async_fetch(block_numbers, false)
block_numbers
end

@@ -63,7 +63,7 @@ defmodule Indexer.Fetcher.PolygonZkevm.BridgeL1Tokens do
|> Enum.map(fn operation -> operation.l1_token_address end)
|> Enum.uniq()
BufferedTask.buffer(__MODULE__, l1_token_addresses)
BufferedTask.buffer(__MODULE__, l1_token_addresses, true)
end
defp defaults do

@@ -25,19 +25,22 @@ defmodule Indexer.Fetcher.ReplacedTransaction do
metadata: [fetcher: :replaced_transaction]
]
@spec async_fetch([
%{
required(:nonce) => non_neg_integer,
required(:from_address_hash) => Hash.Address.t(),
required(:block_hash) => Hash.Full.t()
}
]) :: :ok
def async_fetch(transactions_fields, timeout \\ 5000) when is_list(transactions_fields) do
@spec async_fetch(
[
%{
required(:nonce) => non_neg_integer,
required(:from_address_hash) => Hash.Address.t(),
required(:block_hash) => Hash.Full.t()
}
],
boolean()
) :: :ok
def async_fetch(transactions_fields, realtime?, timeout \\ 5000) when is_list(transactions_fields) do
if ReplacedTransactionSupervisor.disabled?() do
:ok
else
entries = Enum.map(transactions_fields, &entry/1)
BufferedTask.buffer(__MODULE__, entries, timeout)
BufferedTask.buffer(__MODULE__, entries, realtime?, timeout)
end
end

@@ -60,9 +60,9 @@ defmodule Indexer.Fetcher.Token do
@doc """
Fetches token data asynchronously given a list of `t:Explorer.Chain.Token.t/0`s.
"""
@spec async_fetch([Address.t()]) :: :ok
def async_fetch(token_contract_addresses) do
BufferedTask.buffer(__MODULE__, token_contract_addresses)
@spec async_fetch([Address.t()], boolean()) :: :ok
def async_fetch(token_contract_addresses, realtime?) do
BufferedTask.buffer(__MODULE__, token_contract_addresses, realtime?)
end
defp catalog_token(%Token{contract_address_hash: contract_address_hash} = token) do

@@ -34,22 +34,25 @@ defmodule Indexer.Fetcher.TokenBalance do
@max_retries 3
@spec async_fetch([
%{
token_contract_address_hash: Hash.Address.t(),
address_hash: Hash.Address.t(),
block_number: non_neg_integer(),
token_type: String.t(),
token_id: non_neg_integer()
}
]) :: :ok
def async_fetch(token_balances) do
@spec async_fetch(
[
%{
token_contract_address_hash: Hash.Address.t(),
address_hash: Hash.Address.t(),
block_number: non_neg_integer(),
token_type: String.t(),
token_id: non_neg_integer()
}
],
boolean()
) :: :ok
def async_fetch(token_balances, realtime?) do
if TokenBalanceSupervisor.disabled?() do
:ok
else
formatted_params = Enum.map(token_balances, &entry/1)
BufferedTask.buffer(__MODULE__, formatted_params, :infinity)
BufferedTask.buffer(__MODULE__, formatted_params, realtime?, :infinity)
end
end

@@ -74,11 +74,11 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do
|> List.flatten()
|> Enum.uniq()
BufferedTask.buffer(__MODULE__, data)
BufferedTask.buffer(__MODULE__, data, true)
end
def async_fetch(data, _disabled?) do
BufferedTask.buffer(__MODULE__, data)
BufferedTask.buffer(__MODULE__, data, true)
end
@spec retry_some_instances([map()], boolean(), map()) :: any()
@@ -105,7 +105,7 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do
if token_instances_to_refetch != [] do
timeout = Application.get_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime)[:retry_timeout]
Process.send_after(__MODULE__, {:buffer, token_instances_to_refetch}, timeout)
Process.send_after(__MODULE__, {:buffer, token_instances_to_refetch, false}, timeout)
end
end

@@ -34,13 +34,16 @@ defmodule Indexer.Fetcher.UncleBlock do
Asynchronously fetches `t:Explorer.Chain.Block.t/0` for the given `nephew_hash` and `index`
and updates `t:Explorer.Chain.Block.SecondDegreeRelation.t/0` `block_fetched_at`.
"""
@spec async_fetch_blocks([%{required(:nephew_hash) => Hash.Full.t(), required(:index) => non_neg_integer()}]) :: :ok
def async_fetch_blocks(relations) when is_list(relations) do
@spec async_fetch_blocks(
[%{required(:nephew_hash) => Hash.Full.t(), required(:index) => non_neg_integer()}],
boolean()
) :: :ok
def async_fetch_blocks(relations, realtime? \\ false) when is_list(relations) do
if UncleBlockSupervisor.disabled?() do
:ok
else
entries = Enum.map(relations, &entry/1)
BufferedTask.buffer(__MODULE__, entries)
BufferedTask.buffer(__MODULE__, entries, realtime?)
end
end

@@ -137,7 +137,7 @@ defmodule Indexer.TokenBalances do
block_number: token_balance.block_number
})
end)
|> TokenBalance.async_fetch()
|> TokenBalance.async_fetch(false)
end
defp ignore_request_with_errors(%{value: nil, value_fetched_at: nil, error: _error}), do: false

@@ -59,7 +59,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, uncles}} ->
{:"$gen_call", from, {:buffer, uncles, _front?}} ->
GenServer.reply(from, :ok)
send(parent, {:uncles, uncles})
end
@@ -434,7 +434,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, block_numbers}} ->
{:"$gen_call", from, {:buffer, block_numbers, _front?}} ->
GenServer.reply(from, :ok)
send(parent, {:block_numbers, block_numbers})
end
@@ -584,7 +584,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, block_numbers}} ->
{:"$gen_call", from, {:buffer, block_numbers, _front?}} ->
GenServer.reply(from, :ok)
send(parent, {:block_numbers, block_numbers})
end

@@ -74,13 +74,13 @@ defmodule Indexer.BufferedTaskTest do
refute_receive _
BufferedTask.buffer(buffer, ~w(12 13 14 15 16))
BufferedTask.buffer(buffer, ~w(12 13 14 15 16), false)
assert_receive {:run, ~w(12 13)}, @assert_receive_timeout
assert_receive {:run, ~w(14 15)}, @assert_receive_timeout
assert_receive {:run, ~w(16)}, @assert_receive_timeout
refute_receive _
BufferedTask.buffer(buffer, ~w(17))
BufferedTask.buffer(buffer, ~w(17), false)
assert_receive {:run, ~w(17)}, @assert_receive_timeout
refute_receive _
end
@@ -90,7 +90,7 @@ defmodule Indexer.BufferedTaskTest do
{:ok, buffer} = start_buffer(EmptyTask)
refute_receive _
BufferedTask.buffer(buffer, ~w(some more entries))
BufferedTask.buffer(buffer, ~w(some more entries), false)
assert_receive {:run, ~w(some more)}, @assert_receive_timeout
assert_receive {:run, ~w(entries)}, @assert_receive_timeout
@@ -113,7 +113,7 @@ defmodule Indexer.BufferedTaskTest do
Process.register(self(), RetryableTask)
{:ok, buffer} = start_buffer(RetryableTask)
BufferedTask.buffer(buffer, [:boom])
BufferedTask.buffer(buffer, [:boom], false)
assert_receive {:run, {0, [:boom]}}, @assert_receive_timeout
assert_receive {:run, {1, [:boom]}}, @assert_receive_timeout
refute_receive _
@@ -150,7 +150,7 @@ defmodule Indexer.BufferedTaskTest do
Process.register(self(), RetryableTask)
{:ok, buffer} = start_buffer(RetryableTask)
BufferedTask.buffer(buffer, [1, 2, 3])
BufferedTask.buffer(buffer, [1, 2, 3], false)
assert_receive {:run, {0, [1, 2]}}, @assert_receive_timeout
assert_receive {:run, {0, [3]}}, @assert_receive_timeout
assert_receive {:run, {1, [1, 2]}}, @assert_receive_timeout
@@ -172,9 +172,9 @@ defmodule Indexer.BufferedTaskTest do
assert %{buffer: 0, tasks: 0} = BufferedTask.debug_count(buffer)
BufferedTask.buffer(buffer, [{:sleep, 1_000}])
BufferedTask.buffer(buffer, [{:sleep, 1_000}])
BufferedTask.buffer(buffer, [{:sleep, 1_000}])
BufferedTask.buffer(buffer, [{:sleep, 1_000}], false)
BufferedTask.buffer(buffer, [{:sleep, 1_000}], false)
BufferedTask.buffer(buffer, [{:sleep, 1_000}], false)
Process.sleep(200)
assert %{buffer: buffer, tasks: tasks} = BufferedTask.debug_count(buffer)

@@ -148,7 +148,7 @@ defmodule Indexer.Fetcher.Beacon.BlobTest do
BlobSupervisor.Case.start_supervised!()
assert :ok = Indexer.Fetcher.Beacon.Blob.async_fetch([block_a.timestamp])
assert :ok = Indexer.Fetcher.Beacon.Blob.async_fetch([block_a.timestamp], false)
wait_for_results(fn ->
Repo.one!(from(blob in Blob, where: blob.hash == ^blob_hash_a))

@@ -126,7 +126,7 @@ defmodule Indexer.Fetcher.BlockRewardTest do
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
{:"$gen_call", from, {:buffer, balance_fields, _front?}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
@@ -134,7 +134,7 @@ defmodule Indexer.Fetcher.BlockRewardTest do
Process.register(pid, Indexer.Fetcher.CoinBalance.Catchup)
assert :ok = BlockReward.async_fetch([block_number])
assert :ok = BlockReward.async_fetch([block_number], false)
wait_for_tasks(BlockReward)
@@ -199,7 +199,7 @@ defmodule Indexer.Fetcher.BlockRewardTest do
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
{:"$gen_call", from, {:buffer, balance_fields, _front?}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
@@ -207,7 +207,7 @@ defmodule Indexer.Fetcher.BlockRewardTest do
Process.register(pid, Indexer.Fetcher.CoinBalance.Catchup)
assert :ok = BlockReward.async_fetch([block_number])
assert :ok = BlockReward.async_fetch([block_number], false)
wait_for_tasks(BlockReward)
@@ -260,7 +260,7 @@ defmodule Indexer.Fetcher.BlockRewardTest do
}
end)
assert :ok = BlockReward.async_fetch([block_number])
assert :ok = BlockReward.async_fetch([block_number], false)
wait_for_tasks(BlockReward)
@@ -334,7 +334,7 @@ defmodule Indexer.Fetcher.BlockRewardTest do
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
{:"$gen_call", from, {:buffer, balance_fields, _front?}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
@@ -424,7 +424,7 @@ defmodule Indexer.Fetcher.BlockRewardTest do
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
{:"$gen_call", from, {:buffer, balance_fields, _front?}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
@@ -508,7 +508,7 @@ defmodule Indexer.Fetcher.BlockRewardTest do
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
{:"$gen_call", from, {:buffer, balance_fields, _front?}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
@@ -645,7 +645,7 @@ defmodule Indexer.Fetcher.BlockRewardTest do
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
{:"$gen_call", from, {:buffer, balance_fields, _front?}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end

@@ -86,9 +86,12 @@ defmodule Indexer.Fetcher.ContractCodeTest do
ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
assert :ok =
ContractCode.async_fetch([
%{created_contract_address_hash: address, block_number: block_number, hash: hash}
])
ContractCode.async_fetch(
[
%{created_contract_address_hash: address, block_number: block_number, hash: hash}
],
false
)
fetched_address =
wait(fn ->

@@ -71,13 +71,16 @@ defmodule Indexer.Fetcher.ReplacedTransactionTest do
ReplacedTransaction.Supervisor.Case.start_supervised!()
assert :ok =
ReplacedTransaction.async_fetch([
%{
block_hash: mined_transaction.block_hash,
nonce: mined_transaction.nonce,
from_address_hash: mined_transaction.from_address_hash
}
])
ReplacedTransaction.async_fetch(
[
%{
block_hash: mined_transaction.block_hash,
nonce: mined_transaction.nonce,
from_address_hash: mined_transaction.from_address_hash
}
],
false
)
found_replaced_transaction =
wait(fn ->
