diff --git a/CHANGELOG.md b/CHANGELOG.md index 76acc264cc..9951737b07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features +- [#6196](https://github.com/blockscout/blockscout/pull/6196) - INDEXER_CATCHUP_BLOCKS_BATCH_SIZE and INDEXER_CATCHUP_BLOCKS_CONCURRENCY env varaibles - [#6187](https://github.com/blockscout/blockscout/pull/6187) - Filter by created time of verified contracts in listcontracts API endpoint - [#6092](https://github.com/blockscout/blockscout/pull/6092) - Blockscout Account functionality - [#6073](https://github.com/blockscout/blockscout/pull/6073) - Add vyper support for rust verifier microservice integration diff --git a/apps/indexer/lib/indexer/block/catchup/fetcher.ex b/apps/indexer/lib/indexer/block/catchup/fetcher.ex index 7bd9c83cb3..fd8df89d23 100644 --- a/apps/indexer/lib/indexer/block/catchup/fetcher.ex +++ b/apps/indexer/lib/indexer/block/catchup/fetcher.ex @@ -30,43 +30,19 @@ defmodule Indexer.Block.Catchup.Fetcher do @behaviour Block.Fetcher - # These are all the *default* values for options. - # DO NOT use them directly in the code. Get options from `state`. - - @blocks_batch_size 10 - @blocks_concurrency 10 @sequence_name :block_catchup_sequencer - defstruct blocks_batch_size: @blocks_batch_size, - blocks_concurrency: @blocks_concurrency, - block_fetcher: nil, + defstruct block_fetcher: nil, memory_monitor: nil - @doc false - def default_blocks_batch_size, do: @blocks_batch_size - @doc """ Required named arguments * `:json_rpc_named_arguments` - `t:EthereumJSONRPC.json_rpc_named_arguments/0` passed to `EthereumJSONRPC.json_rpc/2`. - - The follow options can be overridden: - - * `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to - `#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions - are not paginated.* - * `:blocks_concurrency` - The number of concurrent requests of `:blocks_batch_size` to allow against the JSONRPC. - Defaults to #{@blocks_concurrency}. So, up to `blocks_concurrency * block_batch_size` (defaults to - `#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all - connections. Up to `block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to - `#{@blocks_concurrency * Block.Fetcher.default_receipts_batch_size() * Block.Fetcher.default_receipts_batch_size()}` - ) receipts can be requested from the JSONRPC at once over all connections. - """ def task( %__MODULE__{ - blocks_batch_size: blocks_batch_size, block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} } = state ) do @@ -111,7 +87,7 @@ defmodule Indexer.Block.Catchup.Fetcher do false _ -> - step = step(first, last, blocks_batch_size) + step = step(first, last, blocks_batch_size()) sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state) gen_server_opts = [name: @sequence_name] {:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts) @@ -132,6 +108,27 @@ defmodule Indexer.Block.Catchup.Fetcher do end end + @doc """ + The number of blocks to request in one call to the JSONRPC. Defaults to + 10. Block requests also include the transactions for those blocks. *These transactions + are not paginated. + """ + def blocks_batch_size do + Application.get_env(:indexer, __MODULE__)[:batch_size] + end + + @doc """ + The number of concurrent requests of `blocks_batch_size` to allow against the JSONRPC. + Defaults to 10. So, up to `blocks_concurrency * block_batch_size` (defaults to + `10 * 10`) blocks can be requested from the JSONRPC at once over all + connections. Up to `block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to + `#{10 * Block.Fetcher.default_receipts_batch_size() * Block.Fetcher.default_receipts_concurrency()}` + ) receipts can be requested from the JSONRPC at once over all connections. + """ + def blocks_concurrency do + Application.get_env(:indexer, __MODULE__)[:concurrency] + end + defp fetch_last_block(json_rpc_named_arguments) do case latest_block() do nil -> @@ -184,13 +181,13 @@ defmodule Indexer.Block.Catchup.Fetcher do async_import_token_instances(imported) end - defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence) + defp stream_fetch_and_import(state, sequence) when is_pid(sequence) do sequence |> Sequence.build_stream() |> Task.async_stream( &fetch_and_import_range_from_sequence(state, &1, sequence), - max_concurrency: blocks_concurrency, + max_concurrency: blocks_concurrency(), timeout: :infinity ) |> Stream.run() diff --git a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs index de4199a2fb..9519e8e25f 100644 --- a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs +++ b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs @@ -207,7 +207,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) - default_blocks_batch_size = Catchup.Fetcher.default_blocks_batch_size() + default_blocks_batch_size = Catchup.Fetcher.blocks_batch_size() assert latest_block_number > default_blocks_batch_size diff --git a/apps/indexer/test/indexer/block/catchup/fetcher_test.exs b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs index 21f873ff1a..dc8e6ebaee 100644 --- a/apps/indexer/test/indexer/block/catchup/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs @@ -126,6 +126,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do test "ignores fetched beneficiaries with different hash for same number", %{ json_rpc_named_arguments: json_rpc_named_arguments } do + Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) @@ -264,7 +265,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do assert %{first_block_number: ^block_number, last_block_number: 0, missing_block_count: 2, shrunk: false} = Fetcher.task(%Fetcher{ - blocks_batch_size: 1, block_fetcher: %Block.Fetcher{ callback_module: Fetcher, json_rpc_named_arguments: json_rpc_named_arguments @@ -280,6 +280,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do test "async fetches beneficiaries when individual responses error out", %{ json_rpc_named_arguments: json_rpc_named_arguments } do + Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) @@ -413,7 +414,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do assert %{first_block_number: ^block_number, last_block_number: 0, missing_block_count: 2, shrunk: false} = Fetcher.task(%Fetcher{ - blocks_batch_size: 1, block_fetcher: %Block.Fetcher{ callback_module: Fetcher, json_rpc_named_arguments: json_rpc_named_arguments @@ -431,6 +431,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do test "async fetches beneficiaries when entire call errors out", %{ json_rpc_named_arguments: json_rpc_named_arguments } do + Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) @@ -557,7 +558,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do assert %{first_block_number: ^block_number, last_block_number: 0, missing_block_count: 2, shrunk: false} = Fetcher.task(%Fetcher{ - blocks_batch_size: 1, block_fetcher: %Block.Fetcher{ callback_module: Fetcher, json_rpc_named_arguments: json_rpc_named_arguments diff --git a/config/runtime.exs b/config/runtime.exs index 080870947f..67dc3c7d30 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -422,6 +422,19 @@ config :indexer, Indexer.Supervisor, enabled: System.get_env("DISABLE_INDEXER") config :indexer, Indexer.Block.Realtime.Supervisor, enabled: System.get_env("DISABLE_REALTIME_INDEXER") != "true" +blocks_catchup_fetcher_batch_size_default_str = "10" +blocks_catchup_fetcher_concurrency_default_str = "10" + +{blocks_catchup_fetcher_batch_size, _} = + Integer.parse(System.get_env("INDEXER_CATCHUP_BLOCKS_BATCH_SIZE", blocks_catchup_fetcher_batch_size_default_str)) + +{blocks_catchup_fetcher_concurrency, _} = + Integer.parse(System.get_env("INDEXER_CATCHUP_BLOCKS_CONCURRENCY", blocks_catchup_fetcher_concurrency_default_str)) + +config :indexer, Indexer.Block.Catchup.Fetcher, + batch_size: blocks_catchup_fetcher_batch_size, + concurrency: blocks_catchup_fetcher_concurrency + Code.require_file("#{config_env()}.exs", "config/runtime") for config <- "../apps/*/config/runtime/#{config_env()}.exs" |> Path.expand(__DIR__) |> Path.wildcard() do diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index 0d3a9fbc5d..27f6506682 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -81,6 +81,8 @@ DISABLE_INDEXER=false DISABLE_REALTIME_INDEXER=false INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=false INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false +# INDEXER_CATCHUP_BLOCKS_BATCH_SIZE= +# INDEXER_CATCHUP_BLOCKS_CONCURRENCY= # WEBAPP_URL= # API_URL= WOBSERVER_ENABLED=false diff --git a/docker/Makefile b/docker/Makefile index 331633bbb1..4b4ad8f521 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -370,30 +370,18 @@ endif ifdef API_RATE_LIMIT_WHITELISTED_IPS BLOCKSCOUT_CONTAINER_PARAMS += -e 'API_RATE_LIMIT_WHITELISTED_IPS=$(API_RATE_LIMIT_WHITELISTED_IPS)' endif -ifdef INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER - BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER)' -endif -ifdef INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER - BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER)' -endif ifdef TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES BLOCKSCOUT_CONTAINER_PARAMS += -e 'TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES=$(TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES)' endif ifdef COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES BLOCKSCOUT_CONTAINER_PARAMS += -e 'COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES=$(COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES)' endif -ifdef INDEXER_MEMORY_LIMIT - BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_MEMORY_LIMIT=$(INDEXER_MEMORY_LIMIT)' -endif ifdef ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT BLOCKSCOUT_CONTAINER_PARAMS += -e 'ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT=$(ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT)' endif ifdef FETCH_REWARDS_WAY BLOCKSCOUT_CONTAINER_PARAMS += -e 'FETCH_REWARDS_WAY=$(FETCH_REWARDS_WAY)' endif -ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE - BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)' -endif ifdef FOOTER_CHAT_LINK BLOCKSCOUT_CONTAINER_PARAMS += -e 'FOOTER_CHAT_LINK=$(FOOTER_CHAT_LINK)' endif @@ -436,6 +424,15 @@ endif ifdef ENABLE_TXS_STATS BLOCKSCOUT_CONTAINER_PARAMS += -e 'ENABLE_TXS_STATS=$(ENABLE_TXS_STATS)' endif +ifdef INDEXER_MEMORY_LIMIT + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_MEMORY_LIMIT=$(INDEXER_MEMORY_LIMIT)' +endif +ifdef INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER)' +endif +ifdef INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER)' +endif ifdef INDEXER_DISABLE_BLOCK_REWARD_FETCHER BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_BLOCK_REWARD_FETCHER=$(INDEXER_DISABLE_BLOCK_REWARD_FETCHER)' endif @@ -448,6 +445,15 @@ endif ifdef INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER=$(INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER)' endif +ifdef INDEXER_CATCHUP_BLOCKS_BATCH_SIZE + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_CATCHUP_BLOCKS_BATCH_SIZE=$(INDEXER_CATCHUP_BLOCKS_BATCH_SIZE)' +endif +ifdef INDEXER_CATCHUP_BLOCKS_CONCURRENCY + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_CATCHUP_BLOCKS_CONCURRENCY=$(INDEXER_CATCHUP_BLOCKS_CONCURRENCY)' +endif +ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)' +endif ifdef SECRET_KEY_BASE BLOCKSCOUT_CONTAINER_PARAMS += -e 'SECRET_KEY_BASE=$(SECRET_KEY_BASE)' endif