Merge pull request #6196 from blockscout/vb-catchup-batch-size-concurrency-variables

INDEXER_CATCHUP_BLOCKS_BATCH_SIZE and INDEXER_CATCHUP_BLOCKS_CONCURRENCY env varaibles
pull/6198/head
Victor Baranov 2 years ago committed by GitHub
commit 9b6b480495
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 53
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  3. 2
      apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs
  4. 6
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  5. 13
      config/runtime.exs
  6. 2
      docker-compose/envs/common-blockscout.env
  7. 30
      docker/Makefile

@ -2,6 +2,7 @@
### Features
- [#6196](https://github.com/blockscout/blockscout/pull/6196) - INDEXER_CATCHUP_BLOCKS_BATCH_SIZE and INDEXER_CATCHUP_BLOCKS_CONCURRENCY env varaibles
- [#6187](https://github.com/blockscout/blockscout/pull/6187) - Filter by created time of verified contracts in listcontracts API endpoint
- [#6092](https://github.com/blockscout/blockscout/pull/6092) - Blockscout Account functionality
- [#6073](https://github.com/blockscout/blockscout/pull/6073) - Add vyper support for rust verifier microservice integration

@ -30,43 +30,19 @@ defmodule Indexer.Block.Catchup.Fetcher do
@behaviour Block.Fetcher
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
@blocks_batch_size 10
@blocks_concurrency 10
@sequence_name :block_catchup_sequencer
defstruct blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
block_fetcher: nil,
defstruct block_fetcher: nil,
memory_monitor: nil
@doc false
def default_blocks_batch_size, do: @blocks_batch_size
@doc """
Required named arguments
* `:json_rpc_named_arguments` - `t:EthereumJSONRPC.json_rpc_named_arguments/0` passed to
`EthereumJSONRPC.json_rpc/2`.
The follow options can be overridden:
* `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to
`#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions
are not paginated.*
* `:blocks_concurrency` - The number of concurrent requests of `:blocks_batch_size` to allow against the JSONRPC.
Defaults to #{@blocks_concurrency}. So, up to `blocks_concurrency * block_batch_size` (defaults to
`#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all
connections. Up to `block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{@blocks_concurrency * Block.Fetcher.default_receipts_batch_size() * Block.Fetcher.default_receipts_batch_size()}`
) receipts can be requested from the JSONRPC at once over all connections.
"""
def task(
%__MODULE__{
blocks_batch_size: blocks_batch_size,
block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
} = state
) do
@ -111,7 +87,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
false
_ ->
step = step(first, last, blocks_batch_size)
step = step(first, last, blocks_batch_size())
sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state)
gen_server_opts = [name: @sequence_name]
{:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts)
@ -132,6 +108,27 @@ defmodule Indexer.Block.Catchup.Fetcher do
end
end
@doc """
The number of blocks to request in one call to the JSONRPC. Defaults to
10. Block requests also include the transactions for those blocks. *These transactions
are not paginated.
"""
def blocks_batch_size do
Application.get_env(:indexer, __MODULE__)[:batch_size]
end
@doc """
The number of concurrent requests of `blocks_batch_size` to allow against the JSONRPC.
Defaults to 10. So, up to `blocks_concurrency * block_batch_size` (defaults to
`10 * 10`) blocks can be requested from the JSONRPC at once over all
connections. Up to `block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{10 * Block.Fetcher.default_receipts_batch_size() * Block.Fetcher.default_receipts_concurrency()}`
) receipts can be requested from the JSONRPC at once over all connections.
"""
def blocks_concurrency do
Application.get_env(:indexer, __MODULE__)[:concurrency]
end
defp fetch_last_block(json_rpc_named_arguments) do
case latest_block() do
nil ->
@ -184,13 +181,13 @@ defmodule Indexer.Block.Catchup.Fetcher do
async_import_token_instances(imported)
end
defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)
defp stream_fetch_and_import(state, sequence)
when is_pid(sequence) do
sequence
|> Sequence.build_stream()
|> Task.async_stream(
&fetch_and_import_range_from_sequence(state, &1, sequence),
max_concurrency: blocks_concurrency,
max_concurrency: blocks_concurrency(),
timeout: :infinity
)
|> Stream.run()

@ -207,7 +207,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
default_blocks_batch_size = Catchup.Fetcher.default_blocks_batch_size()
default_blocks_batch_size = Catchup.Fetcher.blocks_batch_size()
assert latest_block_number > default_blocks_batch_size

@ -126,6 +126,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
test "ignores fetched beneficiaries with different hash for same number", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -264,7 +265,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do
assert %{first_block_number: ^block_number, last_block_number: 0, missing_block_count: 2, shrunk: false} =
Fetcher.task(%Fetcher{
blocks_batch_size: 1,
block_fetcher: %Block.Fetcher{
callback_module: Fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
@ -280,6 +280,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
test "async fetches beneficiaries when individual responses error out", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -413,7 +414,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do
assert %{first_block_number: ^block_number, last_block_number: 0, missing_block_count: 2, shrunk: false} =
Fetcher.task(%Fetcher{
blocks_batch_size: 1,
block_fetcher: %Block.Fetcher{
callback_module: Fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
@ -431,6 +431,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
test "async fetches beneficiaries when entire call errors out", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -557,7 +558,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do
assert %{first_block_number: ^block_number, last_block_number: 0, missing_block_count: 2, shrunk: false} =
Fetcher.task(%Fetcher{
blocks_batch_size: 1,
block_fetcher: %Block.Fetcher{
callback_module: Fetcher,
json_rpc_named_arguments: json_rpc_named_arguments

@ -422,6 +422,19 @@ config :indexer, Indexer.Supervisor, enabled: System.get_env("DISABLE_INDEXER")
config :indexer, Indexer.Block.Realtime.Supervisor, enabled: System.get_env("DISABLE_REALTIME_INDEXER") != "true"
blocks_catchup_fetcher_batch_size_default_str = "10"
blocks_catchup_fetcher_concurrency_default_str = "10"
{blocks_catchup_fetcher_batch_size, _} =
Integer.parse(System.get_env("INDEXER_CATCHUP_BLOCKS_BATCH_SIZE", blocks_catchup_fetcher_batch_size_default_str))
{blocks_catchup_fetcher_concurrency, _} =
Integer.parse(System.get_env("INDEXER_CATCHUP_BLOCKS_CONCURRENCY", blocks_catchup_fetcher_concurrency_default_str))
config :indexer, Indexer.Block.Catchup.Fetcher,
batch_size: blocks_catchup_fetcher_batch_size,
concurrency: blocks_catchup_fetcher_concurrency
Code.require_file("#{config_env()}.exs", "config/runtime")
for config <- "../apps/*/config/runtime/#{config_env()}.exs" |> Path.expand(__DIR__) |> Path.wildcard() do

@ -81,6 +81,8 @@ DISABLE_INDEXER=false
DISABLE_REALTIME_INDEXER=false
INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=false
INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_CATCHUP_BLOCKS_BATCH_SIZE=
# INDEXER_CATCHUP_BLOCKS_CONCURRENCY=
# WEBAPP_URL=
# API_URL=
WOBSERVER_ENABLED=false

@ -370,30 +370,18 @@ endif
ifdef API_RATE_LIMIT_WHITELISTED_IPS
BLOCKSCOUT_CONTAINER_PARAMS += -e 'API_RATE_LIMIT_WHITELISTED_IPS=$(API_RATE_LIMIT_WHITELISTED_IPS)'
endif
ifdef INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER)'
endif
ifdef INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER)'
endif
ifdef TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES
BLOCKSCOUT_CONTAINER_PARAMS += -e 'TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES=$(TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES)'
endif
ifdef COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES
BLOCKSCOUT_CONTAINER_PARAMS += -e 'COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES=$(COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES)'
endif
ifdef INDEXER_MEMORY_LIMIT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_MEMORY_LIMIT=$(INDEXER_MEMORY_LIMIT)'
endif
ifdef ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT=$(ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT)'
endif
ifdef FETCH_REWARDS_WAY
BLOCKSCOUT_CONTAINER_PARAMS += -e 'FETCH_REWARDS_WAY=$(FETCH_REWARDS_WAY)'
endif
ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef FOOTER_CHAT_LINK
BLOCKSCOUT_CONTAINER_PARAMS += -e 'FOOTER_CHAT_LINK=$(FOOTER_CHAT_LINK)'
endif
@ -436,6 +424,15 @@ endif
ifdef ENABLE_TXS_STATS
BLOCKSCOUT_CONTAINER_PARAMS += -e 'ENABLE_TXS_STATS=$(ENABLE_TXS_STATS)'
endif
ifdef INDEXER_MEMORY_LIMIT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_MEMORY_LIMIT=$(INDEXER_MEMORY_LIMIT)'
endif
ifdef INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER)'
endif
ifdef INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER)'
endif
ifdef INDEXER_DISABLE_BLOCK_REWARD_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_BLOCK_REWARD_FETCHER=$(INDEXER_DISABLE_BLOCK_REWARD_FETCHER)'
endif
@ -448,6 +445,15 @@ endif
ifdef INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER=$(INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER)'
endif
ifdef INDEXER_CATCHUP_BLOCKS_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_CATCHUP_BLOCKS_BATCH_SIZE=$(INDEXER_CATCHUP_BLOCKS_BATCH_SIZE)'
endif
ifdef INDEXER_CATCHUP_BLOCKS_CONCURRENCY
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_CATCHUP_BLOCKS_CONCURRENCY=$(INDEXER_CATCHUP_BLOCKS_CONCURRENCY)'
endif
ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef SECRET_KEY_BASE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'SECRET_KEY_BASE=$(SECRET_KEY_BASE)'
endif

Loading…
Cancel
Save