Limit fetchers init tasks

limit-fetchers-init-tasks
Qwerty5Uiop 1 year ago
parent 81c7fed16a
commit fd78aaf3ea
  1. 1
      CHANGELOG.md
  2. 83
      apps/explorer/lib/explorer/chain.ex
  3. 21
      apps/indexer/lib/indexer/buffered_task.ex
  4. 10
      apps/indexer/lib/indexer/fetcher/block_reward.ex
  5. 14
      apps/indexer/lib/indexer/fetcher/coin_balance.ex
  6. 3
      apps/indexer/lib/indexer/fetcher/contract_code.ex
  7. 11
      apps/indexer/lib/indexer/fetcher/internal_transaction.ex
  8. 3
      apps/indexer/lib/indexer/fetcher/replaced_transaction.ex
  9. 10
      apps/indexer/lib/indexer/fetcher/token.ex
  10. 14
      apps/indexer/lib/indexer/fetcher/token_balance.ex
  11. 11
      apps/indexer/lib/indexer/fetcher/token_instance/retry.ex
  12. 2
      apps/indexer/lib/indexer/fetcher/token_updater.ex
  13. 14
      apps/indexer/lib/indexer/fetcher/uncle_block.ex
  14. 7
      apps/indexer/lib/indexer/temporary/blocks_transactions_mismatch.ex
  15. 4
      apps/indexer/lib/indexer/temporary/uncles_without_index.ex
  16. 3
      apps/indexer/test/indexer/buffered_task_test.exs
  17. 3
      apps/indexer/test/support/indexer/fetcher/block_reward_supervisor_case.ex
  18. 3
      config/runtime.exs
  19. 1
      docker-compose/envs/common-blockscout.env
  20. 3
      docker/Makefile

@ -11,6 +11,7 @@
- [#7532](https://github.com/blockscout/blockscout/pull/7532) - Handle empty id in json rpc responses
- [#7544](https://github.com/blockscout/blockscout/pull/7544) - Add ERC-1155 signatures to uncataloged_token_transfer_block_numbers
- [#7363](https://github.com/blockscout/blockscout/pull/7363) - CSV export filters
- [#7697](https://github.com/blockscout/blockscout/pull/7697) - Limit fetchers init tasks
### Fixes

@ -2616,8 +2616,9 @@ defmodule Explorer.Chain do
@doc """
Calls `reducer` on a stream of `t:Explorer.Chain.Block.t/0` without `t:Explorer.Chain.Block.Reward.t/0`.
"""
def stream_blocks_without_rewards(initial, reducer) when is_function(reducer, 2) do
def stream_blocks_without_rewards(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
Block.blocks_without_reward_query()
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end
@ -2827,10 +2828,11 @@ defmodule Explorer.Chain do
@spec stream_unfetched_balances(
initial :: accumulator,
reducer ::
(entry :: %{address_hash: Hash.Address.t(), block_number: Block.block_number()}, accumulator -> accumulator)
(entry :: %{address_hash: Hash.Address.t(), block_number: Block.block_number()}, accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_unfetched_balances(initial, reducer) when is_function(reducer, 2) do
def stream_unfetched_balances(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
query =
from(
balance in CoinBalance,
@ -2838,7 +2840,9 @@ defmodule Explorer.Chain do
select: %{address_hash: balance.address_hash, block_number: balance.block_number}
)
Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end
@doc """
@ -2846,11 +2850,13 @@ defmodule Explorer.Chain do
"""
@spec stream_unfetched_token_balances(
initial :: accumulator,
reducer :: (entry :: TokenBalance.t(), accumulator -> accumulator)
reducer :: (entry :: TokenBalance.t(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_unfetched_token_balances(initial, reducer) when is_function(reducer, 2) do
def stream_unfetched_token_balances(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
TokenBalance.unfetched_token_balances()
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end
@ -2872,10 +2878,12 @@ defmodule Explorer.Chain do
"""
@spec stream_blocks_with_unfetched_internal_transactions(
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
reducer :: (entry :: term(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_blocks_with_unfetched_internal_transactions(initial, reducer) when is_function(reducer, 2) do
def stream_blocks_with_unfetched_internal_transactions(initial, reducer, limited? \\ false)
when is_function(reducer, 2) do
query =
from(
po in PendingBlockOperation,
@ -2883,7 +2891,9 @@ defmodule Explorer.Chain do
select: po.block_number
)
Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end
def remove_nonconsensus_blocks_from_pending_ops(block_hashes) do
@ -2930,10 +2940,11 @@ defmodule Explorer.Chain do
| :value
],
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
reducer :: (entry :: term(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_transactions_with_unfetched_created_contract_codes(fields, initial, reducer)
def stream_transactions_with_unfetched_created_contract_codes(fields, initial, reducer, limited? \\ false)
when is_function(reducer, 2) do
query =
from(t in Transaction,
@ -2943,7 +2954,9 @@ defmodule Explorer.Chain do
select: ^fields
)
Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end
@spec stream_mined_transactions(
@ -2995,14 +3008,16 @@ defmodule Explorer.Chain do
| :value
],
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
reducer :: (entry :: term(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_pending_transactions(fields, initial, reducer) when is_function(reducer, 2) do
def stream_pending_transactions(fields, initial, reducer, limited? \\ false) when is_function(reducer, 2) do
query =
Transaction
|> pending_transactions_query()
|> select(^fields)
|> add_fetcher_limit(limited?)
Repo.stream_reduce(query, initial, reducer)
end
@ -3017,17 +3032,20 @@ defmodule Explorer.Chain do
"""
@spec stream_unfetched_uncles(
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
reducer :: (entry :: term(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_unfetched_uncles(initial, reducer) when is_function(reducer, 2) do
def stream_unfetched_uncles(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
query =
from(bsdr in Block.SecondDegreeRelation,
where: is_nil(bsdr.uncle_fetched_at) and not is_nil(bsdr.index),
select: [:nephew_hash, :index]
)
Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end
@doc """
@ -5017,10 +5035,12 @@ defmodule Explorer.Chain do
"""
@spec stream_uncataloged_token_contract_address_hashes(
initial :: accumulator,
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator)
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_uncataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do
def stream_uncataloged_token_contract_address_hashes(initial, reducer, limited? \\ false)
when is_function(reducer, 2) do
query =
from(
token in Token,
@ -5028,7 +5048,9 @@ defmodule Explorer.Chain do
select: token.contract_address_hash
)
Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end
@spec stream_unfetched_token_instances(
@ -5080,10 +5102,11 @@ defmodule Explorer.Chain do
@spec stream_token_instances_with_error(
initial :: accumulator,
reducer :: (entry :: map(), accumulator -> accumulator)
reducer :: (entry :: map(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_token_instances_with_error(initial, reducer) when is_function(reducer, 2) do
def stream_token_instances_with_error(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
Instance
|> where([instance], not is_nil(instance.error))
|> select([instance], %{
@ -5091,6 +5114,7 @@ defmodule Explorer.Chain do
token_id: instance.token_id,
updated_at: instance.updated_at
})
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end
@ -5099,13 +5123,16 @@ defmodule Explorer.Chain do
"""
@spec stream_cataloged_token_contract_address_hashes(
initial :: accumulator,
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator)
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator),
some_time_ago_updated :: integer(),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_cataloged_token_contract_address_hashes(initial, reducer, some_time_ago_updated \\ 2880)
def stream_cataloged_token_contract_address_hashes(initial, reducer, some_time_ago_updated \\ 2880, limited? \\ false)
when is_function(reducer, 2) do
some_time_ago_updated
|> Token.cataloged_tokens()
|> add_fetcher_limit(limited?)
|> order_by(asc: :updated_at)
|> Repo.stream_reduce(initial, reducer)
end
@ -6913,4 +6940,12 @@ defmodule Explorer.Chain do
def count_withdrawals_from_cache(options \\ []) do
"withdrawals_count" |> get_last_fetched_counter(options) |> Decimal.add(1)
end
def add_fetcher_limit(query, false), do: query
def add_fetcher_limit(query, true) do
fetcher_limit = Application.get_env(:indexer, :fetcher_init_limit)
limit(query, ^fetcher_limit)
end
end

@ -71,7 +71,7 @@ defmodule Indexer.BufferedTask do
flush_interval: nil,
max_batch_size: nil,
max_concurrency: nil,
poll: false,
poll: true,
metadata: [],
current_buffer: [],
bound_queue: %BoundQueue{},
@ -231,7 +231,7 @@ defmodule Indexer.BufferedTask do
state = %BufferedTask{
callback_module: callback_module,
callback_module_state: Keyword.fetch!(opts, :state),
poll: Keyword.get(opts, :poll, false),
poll: Keyword.get(opts, :poll, true),
task_supervisor: Keyword.fetch!(opts, :task_supervisor),
flush_interval: Keyword.fetch!(opts, :flush_interval),
max_batch_size: Keyword.fetch!(opts, :max_batch_size),
@ -442,21 +442,8 @@ defmodule Indexer.BufferedTask do
end
# get more work from `init/2`
defp schedule_next(%BufferedTask{poll: true, bound_queue: %BoundQueue{size: 0}} = state) do
do_initial_stream(state)
end
# was shrunk and was out of work, get more work from `init/2`
defp schedule_next(%BufferedTask{bound_queue: %BoundQueue{size: 0, maximum_size: maximum_size}} = state)
when maximum_size != nil do
Logger.info(fn ->
[
"BufferedTask ",
process(self()),
" ran out of work, but work queue was shrunk to save memory, so restoring lost work from `c:init/2`."
]
end)
defp schedule_next(%BufferedTask{poll: true, bound_queue: %BoundQueue{size: 0}, task_ref_to_batch: tasks} = state)
when tasks == %{} do
do_initial_stream(state)
end

@ -62,9 +62,13 @@ defmodule Indexer.Fetcher.BlockReward do
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_blocks_without_rewards(initial, fn %{number: number}, acc ->
reducer.(number, acc)
end)
Chain.stream_blocks_without_rewards(
initial,
fn %{number: number}, acc ->
reducer.(number, acc)
end,
true
)
final
end

@ -61,11 +61,15 @@ defmodule Indexer.Fetcher.CoinBalance do
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_unfetched_balances(initial, fn address_fields, acc ->
address_fields
|> entry()
|> reducer.(acc)
end)
Chain.stream_unfetched_balances(
initial,
fn address_fields, acc ->
address_fields
|> entry()
|> reducer.(acc)
end,
true
)
final
end

@ -64,7 +64,8 @@ defmodule Indexer.Fetcher.ContractCode do
transaction_fields
|> entry()
|> reducer.(acc)
end
end,
true
)
final

@ -71,9 +71,13 @@ defmodule Indexer.Fetcher.InternalTransaction do
@impl BufferedTask
def init(initial, reducer, _json_rpc_named_arguments) do
{:ok, final} =
Chain.stream_blocks_with_unfetched_internal_transactions(initial, fn block_number, acc ->
reducer.(block_number, acc)
end)
Chain.stream_blocks_with_unfetched_internal_transactions(
initial,
fn block_number, acc ->
reducer.(block_number, acc)
end,
true
)
final
end
@ -349,7 +353,6 @@ defmodule Indexer.Fetcher.InternalTransaction do
flush_interval: :timer.seconds(3),
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
poll: true,
task_supervisor: Indexer.Fetcher.InternalTransaction.TaskSupervisor,
metadata: [fetcher: :internal_transaction]
]

@ -61,7 +61,8 @@ defmodule Indexer.Fetcher.ReplacedTransaction do
transaction_fields
|> pending_entry()
|> reducer.(acc)
end
end,
true
)
final

@ -42,9 +42,13 @@ defmodule Indexer.Fetcher.Token do
@impl BufferedTask
def init(initial_acc, reducer, _) do
{:ok, acc} =
Chain.stream_uncataloged_token_contract_address_hashes(initial_acc, fn address, acc ->
reducer.(address, acc)
end)
Chain.stream_uncataloged_token_contract_address_hashes(
initial_acc,
fn address, acc ->
reducer.(address, acc)
end,
true
)
acc
end

@ -69,11 +69,15 @@ defmodule Indexer.Fetcher.TokenBalance do
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_unfetched_token_balances(initial, fn token_balance, acc ->
token_balance
|> entry()
|> reducer.(acc)
end)
Chain.stream_unfetched_token_balances(
initial,
fn token_balance, acc ->
token_balance
|> entry()
|> reducer.(acc)
end,
true
)
final
end

@ -29,9 +29,13 @@ defmodule Indexer.Fetcher.TokenInstance.Retry do
@impl BufferedTask
def init(initial_acc, reducer, _) do
{:ok, acc} =
Chain.stream_token_instances_with_error(initial_acc, fn data, acc ->
reducer.(data, acc)
end)
Chain.stream_token_instances_with_error(
initial_acc,
fn data, acc ->
reducer.(data, acc)
end,
true
)
acc
end
@ -54,7 +58,6 @@ defmodule Indexer.Fetcher.TokenInstance.Retry do
flush_interval: :timer.minutes(10),
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
max_batch_size: @default_max_batch_size,
poll: true,
task_supervisor: __MODULE__.TaskSupervisor
]
end

@ -52,7 +52,7 @@ defmodule Indexer.Fetcher.TokenUpdater do
|> Duration.to_minutes()
|> trunc()
{:ok, tokens} = Chain.stream_cataloged_token_contract_address_hashes(initial, reducer, interval_in_minutes)
{:ok, tokens} = Chain.stream_cataloged_token_contract_address_hashes(initial, reducer, interval_in_minutes, true)
tokens
end

@ -65,11 +65,15 @@ defmodule Indexer.Fetcher.UncleBlock do
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_unfetched_uncles(initial, fn uncle, acc ->
uncle
|> entry()
|> reducer.(acc)
end)
Chain.stream_unfetched_uncles(
initial,
fn uncle, acc ->
uncle
|> entry()
|> reducer.(acc)
end,
true
)
final
end

@ -14,8 +14,8 @@ defmodule Indexer.Temporary.BlocksTransactionsMismatch do
import Ecto.Query
alias EthereumJSONRPC.Blocks
alias Explorer.{Chain, Repo}
alias Explorer.Chain.Block
alias Explorer.Repo
alias Explorer.Utility.MissingRangesManipulator
alias Indexer.BufferedTask
@ -58,7 +58,10 @@ defmodule Indexer.Temporary.BlocksTransactionsMismatch do
select: {block.hash, count(transactions.hash)}
)
{:ok, final} = Repo.stream_reduce(query, initial, &reducer.(&1, &2))
{:ok, final} =
query
|> Chain.add_fetcher_limit(true)
|> Repo.stream_reduce(initial, &reducer.(&1, &2))
final
end

@ -58,7 +58,9 @@ defmodule Indexer.Temporary.UnclesWithoutIndex do
)
{:ok, final} =
Repo.stream_reduce(query, initial, fn nephew_hash, acc ->
query
|> Chain.add_fetcher_limit(true)
|> Repo.stream_reduce(initial, fn nephew_hash, acc ->
nephew_hash
|> to_string()
|> reducer.(acc)

@ -28,7 +28,8 @@ defmodule Indexer.BufferedTaskTest do
task_supervisor: BufferedTaskSup,
flush_interval: @flush_interval,
max_batch_size: max_batch_size,
max_concurrency: 2}
max_concurrency: 2,
poll: false}
]}
)
end

@ -7,7 +7,8 @@ defmodule Indexer.Fetcher.BlockReward.Supervisor.Case do
fetcher_arguments,
flush_interval: 50,
max_batch_size: 1,
max_concurrency: 1
max_concurrency: 1,
poll: false
)
[merged_fetcher_arguments]

@ -409,7 +409,8 @@ config :indexer,
memory_limit: ConfigHelper.indexer_memory_limit(),
receipts_batch_size: ConfigHelper.parse_integer_env_var("INDEXER_RECEIPTS_BATCH_SIZE", 250),
receipts_concurrency: ConfigHelper.parse_integer_env_var("INDEXER_RECEIPTS_CONCURRENCY", 10),
hide_indexing_progress_alert: ConfigHelper.parse_bool_env_var("INDEXER_HIDE_INDEXING_PROGRESS_ALERT")
hide_indexing_progress_alert: ConfigHelper.parse_bool_env_var("INDEXER_HIDE_INDEXING_PROGRESS_ALERT"),
fetcher_init_limit: ConfigHelper.parse_integer_env_var("INDEXER_FETCHER_INIT_QUERY_LIMIT", 100)
config :indexer, Indexer.Supervisor, enabled: !ConfigHelper.parse_bool_env_var("DISABLE_INDEXER")

@ -120,6 +120,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_TX_ACTIONS_REINDEX_PROTOCOLS=
# INDEXER_TX_ACTIONS_AAVE_V3_POOL_CONTRACT=
# INDEXER_REALTIME_FETCHER_MAX_GAP=
# INDEXER_FETCHER_INIT_QUERY_LIMIT=
# INDEXER_DISABLE_WITHDRAWALS_FETCHER=
# WITHDRAWALS_FIRST_BLOCK=
# TOKEN_ID_MIGRATION_FIRST_BLOCK=

@ -561,6 +561,9 @@ endif
ifdef INDEXER_REALTIME_FETCHER_MAX_GAP
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_REALTIME_FETCHER_MAX_GAP=$(INDEXER_REALTIME_FETCHER_MAX_GAP)'
endif
ifdef INDEXER_FETCHER_INIT_QUERY_LIMIT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_FETCHER_INIT_QUERY_LIMIT=$(INDEXER_FETCHER_INIT_QUERY_LIMIT)'
endif
ifdef INDEXER_INTERNAL_TRANSACTIONS_TRACER_TYPE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_INTERNAL_TRANSACTIONS_TRACER_TYPE=$(INDEXER_INTERNAL_TRANSACTIONS_TRACER_TYPE)'
endif

Loading…
Cancel
Save