From 5fbc9fc49d1b1bb41e76337a015e2352da9ab618 Mon Sep 17 00:00:00 2001 From: Viktor Baranov Date: Thu, 10 Aug 2023 19:26:32 +0300 Subject: [PATCH] Manage concurrency for Token and TokenBalance fetchers --- CHANGELOG.md | 1 + apps/indexer/lib/indexer/fetcher/token.ex | 18 +++++++++++------- .../lib/indexer/fetcher/token_balance.ex | 3 ++- config/runtime.exs | 5 ++++- docker-compose/envs/common-blockscout.env | 2 ++ docker/Makefile | 6 ++++++ 6 files changed, 26 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e6fb6123a6..6076326dae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ ### Chore +- [#8167](https://github.com/blockscout/blockscout/pull/8167) - Manage concurrency for Token and TokenBalance fetcher - [#8146](https://github.com/blockscout/blockscout/pull/8146) - Add method_id to write methods in API v2 response - [#8105](https://github.com/blockscout/blockscout/pull/8105) - Extend API v1 with endpoints used by new UI - [#8104](https://github.com/blockscout/blockscout/pull/8104) - remove "TODO" from API v2 response diff --git a/apps/indexer/lib/indexer/fetcher/token.ex b/apps/indexer/lib/indexer/fetcher/token.ex index 57b815a03e..9d4b0dd253 100644 --- a/apps/indexer/lib/indexer/fetcher/token.ex +++ b/apps/indexer/lib/indexer/fetcher/token.ex @@ -14,12 +14,7 @@ defmodule Indexer.Fetcher.Token do @behaviour BufferedTask - @defaults [ - flush_interval: 300, - max_batch_size: 1, - max_concurrency: 10, - task_supervisor: Indexer.Fetcher.Token.TaskSupervisor - ] + @default_max_concurrency 10 @doc false def child_spec([init_options, gen_server_options]) do @@ -32,7 +27,7 @@ defmodule Indexer.Fetcher.Token do end merged_init_opts = - @defaults + defaults() |> Keyword.merge(mergeable_init_options) |> Keyword.put(:state, state) @@ -81,4 +76,13 @@ defmodule Indexer.Fetcher.Token do {:ok, _} = Chain.update_token(token, token_params) :ok end + + defp defaults do + [ + flush_interval: 300, + max_batch_size: 1, + max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency, + task_supervisor: Indexer.Fetcher.Token.TaskSupervisor + ] + end end diff --git a/apps/indexer/lib/indexer/fetcher/token_balance.ex b/apps/indexer/lib/indexer/fetcher/token_balance.ex index 9804e0d3be..93add63827 100644 --- a/apps/indexer/lib/indexer/fetcher/token_balance.ex +++ b/apps/indexer/lib/indexer/fetcher/token_balance.ex @@ -26,6 +26,7 @@ defmodule Indexer.Fetcher.TokenBalance do @behaviour BufferedTask @default_max_batch_size 100 + @default_max_concurrency 10 @max_retries 3 @@ -234,7 +235,7 @@ defmodule Indexer.Fetcher.TokenBalance do [ flush_interval: 300, max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size, - max_concurrency: 10, + max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency, task_supervisor: Indexer.Fetcher.TokenBalance.TaskSupervisor ] end diff --git a/config/runtime.exs b/config/runtime.exs index 37818f36a5..146c433493 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -444,8 +444,11 @@ config :indexer, Indexer.Fetcher.PendingTransaction.Supervisor, System.get_env("ETHEREUM_JSONRPC_VARIANT") == "besu" || ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER") +config :indexer, Indexer.Fetcher.Token, concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_CONCURRENCY", 10) + config :indexer, Indexer.Fetcher.TokenBalance, - batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_BATCH_SIZE", 100) + batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_BATCH_SIZE", 100), + concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_CONCURRENCY", 10) config :indexer, Indexer.Fetcher.TokenBalanceOnDemand, threshold: ConfigHelper.parse_time_env_var("TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD", "1h"), diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index c7dfa9eae6..ac9ee0e86c 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -114,7 +114,9 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false # INDEXER_COIN_BALANCES_CONCURRENCY= # INDEXER_RECEIPTS_BATCH_SIZE= # INDEXER_RECEIPTS_CONCURRENCY= +# INDEXER_TOKEN_CONCURRENCY= # INDEXER_TOKEN_BALANCES_BATCH_SIZE= +# INDEXER_TOKEN_BALANCES_CONCURRENCY= # INDEXER_TX_ACTIONS_ENABLE= # INDEXER_TX_ACTIONS_MAX_TOKEN_CACHE_SIZE= # INDEXER_TX_ACTIONS_REINDEX_FIRST_BLOCK= diff --git a/docker/Makefile b/docker/Makefile index b9f15c84c0..bccf86840c 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -561,9 +561,15 @@ endif ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)' endif +ifdef INDEXER_TOKEN_CONCURRENCY + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_CONCURRENCY=$(INDEXER_TOKEN_CONCURRENCY)' +endif ifdef INDEXER_TOKEN_BALANCES_BATCH_SIZE BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_BALANCES_BATCH_SIZE=$(INDEXER_TOKEN_BALANCES_BATCH_SIZE)' endif +ifdef INDEXER_TOKEN_BALANCES_CONCURRENCY + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_BALANCES_CONCURRENCY=$(INDEXER_TOKEN_BALANCES_CONCURRENCY)' +endif ifdef INDEXER_REALTIME_FETCHER_MAX_GAP BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_REALTIME_FETCHER_MAX_GAP=$(INDEXER_REALTIME_FETCHER_MAX_GAP)' endif