Merge pull request #8167 from blockscout/vb-manage-concurrency

Manage concurrency for Token and TokenBalance fetchers
pull/8169/head
Victor Baranov 1 year ago committed by GitHub
commit 238a3bbcaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 18
      apps/indexer/lib/indexer/fetcher/token.ex
  3. 3
      apps/indexer/lib/indexer/fetcher/token_balance.ex
  4. 5
      config/runtime.exs
  5. 2
      docker-compose/envs/common-blockscout.env
  6. 6
      docker/Makefile

@ -22,6 +22,7 @@
### Chore ### Chore
- [#8167](https://github.com/blockscout/blockscout/pull/8167) - Manage concurrency for Token and TokenBalance fetcher
- [#8146](https://github.com/blockscout/blockscout/pull/8146) - Add method_id to write methods in API v2 response - [#8146](https://github.com/blockscout/blockscout/pull/8146) - Add method_id to write methods in API v2 response
- [#8105](https://github.com/blockscout/blockscout/pull/8105) - Extend API v1 with endpoints used by new UI - [#8105](https://github.com/blockscout/blockscout/pull/8105) - Extend API v1 with endpoints used by new UI
- [#8104](https://github.com/blockscout/blockscout/pull/8104) - remove "TODO" from API v2 response - [#8104](https://github.com/blockscout/blockscout/pull/8104) - remove "TODO" from API v2 response

@ -14,12 +14,7 @@ defmodule Indexer.Fetcher.Token do
@behaviour BufferedTask @behaviour BufferedTask
@defaults [ @default_max_concurrency 10
flush_interval: 300,
max_batch_size: 1,
max_concurrency: 10,
task_supervisor: Indexer.Fetcher.Token.TaskSupervisor
]
@doc false @doc false
def child_spec([init_options, gen_server_options]) do def child_spec([init_options, gen_server_options]) do
@ -32,7 +27,7 @@ defmodule Indexer.Fetcher.Token do
end end
merged_init_opts = merged_init_opts =
@defaults defaults()
|> Keyword.merge(mergeable_init_options) |> Keyword.merge(mergeable_init_options)
|> Keyword.put(:state, state) |> Keyword.put(:state, state)
@ -81,4 +76,13 @@ defmodule Indexer.Fetcher.Token do
{:ok, _} = Chain.update_token(token, token_params) {:ok, _} = Chain.update_token(token, token_params)
:ok :ok
end end
defp defaults do
[
flush_interval: 300,
max_batch_size: 1,
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
task_supervisor: Indexer.Fetcher.Token.TaskSupervisor
]
end
end end

@ -26,6 +26,7 @@ defmodule Indexer.Fetcher.TokenBalance do
@behaviour BufferedTask @behaviour BufferedTask
@default_max_batch_size 100 @default_max_batch_size 100
@default_max_concurrency 10
@max_retries 3 @max_retries 3
@ -234,7 +235,7 @@ defmodule Indexer.Fetcher.TokenBalance do
[ [
flush_interval: 300, flush_interval: 300,
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size, max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
max_concurrency: 10, max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
task_supervisor: Indexer.Fetcher.TokenBalance.TaskSupervisor task_supervisor: Indexer.Fetcher.TokenBalance.TaskSupervisor
] ]
end end

@ -444,8 +444,11 @@ config :indexer, Indexer.Fetcher.PendingTransaction.Supervisor,
System.get_env("ETHEREUM_JSONRPC_VARIANT") == "besu" || System.get_env("ETHEREUM_JSONRPC_VARIANT") == "besu" ||
ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER") ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER")
config :indexer, Indexer.Fetcher.Token, concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_CONCURRENCY", 10)
config :indexer, Indexer.Fetcher.TokenBalance, config :indexer, Indexer.Fetcher.TokenBalance,
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_BATCH_SIZE", 100) batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_BATCH_SIZE", 100),
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_CONCURRENCY", 10)
config :indexer, Indexer.Fetcher.TokenBalanceOnDemand, config :indexer, Indexer.Fetcher.TokenBalanceOnDemand,
threshold: ConfigHelper.parse_time_env_var("TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD", "1h"), threshold: ConfigHelper.parse_time_env_var("TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD", "1h"),

@ -114,7 +114,9 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_COIN_BALANCES_CONCURRENCY= # INDEXER_COIN_BALANCES_CONCURRENCY=
# INDEXER_RECEIPTS_BATCH_SIZE= # INDEXER_RECEIPTS_BATCH_SIZE=
# INDEXER_RECEIPTS_CONCURRENCY= # INDEXER_RECEIPTS_CONCURRENCY=
# INDEXER_TOKEN_CONCURRENCY=
# INDEXER_TOKEN_BALANCES_BATCH_SIZE= # INDEXER_TOKEN_BALANCES_BATCH_SIZE=
# INDEXER_TOKEN_BALANCES_CONCURRENCY=
# INDEXER_TX_ACTIONS_ENABLE= # INDEXER_TX_ACTIONS_ENABLE=
# INDEXER_TX_ACTIONS_MAX_TOKEN_CACHE_SIZE= # INDEXER_TX_ACTIONS_MAX_TOKEN_CACHE_SIZE=
# INDEXER_TX_ACTIONS_REINDEX_FIRST_BLOCK= # INDEXER_TX_ACTIONS_REINDEX_FIRST_BLOCK=

@ -561,9 +561,15 @@ endif
ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)' BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif endif
ifdef INDEXER_TOKEN_CONCURRENCY
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_CONCURRENCY=$(INDEXER_TOKEN_CONCURRENCY)'
endif
ifdef INDEXER_TOKEN_BALANCES_BATCH_SIZE ifdef INDEXER_TOKEN_BALANCES_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_BALANCES_BATCH_SIZE=$(INDEXER_TOKEN_BALANCES_BATCH_SIZE)' BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_BALANCES_BATCH_SIZE=$(INDEXER_TOKEN_BALANCES_BATCH_SIZE)'
endif endif
ifdef INDEXER_TOKEN_BALANCES_CONCURRENCY
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_BALANCES_CONCURRENCY=$(INDEXER_TOKEN_BALANCES_CONCURRENCY)'
endif
ifdef INDEXER_REALTIME_FETCHER_MAX_GAP ifdef INDEXER_REALTIME_FETCHER_MAX_GAP
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_REALTIME_FETCHER_MAX_GAP=$(INDEXER_REALTIME_FETCHER_MAX_GAP)' BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_REALTIME_FETCHER_MAX_GAP=$(INDEXER_REALTIME_FETCHER_MAX_GAP)'
endif endif

Loading…
Cancel
Save