Move token_id migrator to Explorer application

pull/6391/head
Qwerty5Uiop 2 years ago
parent 039be2566b
commit 9abcfd5faf
  1. 5
      apps/explorer/lib/explorer/application.ex
  2. 2
      apps/explorer/lib/explorer/token_transfer_token_id_migration/lowest_block_number_updater.ex
  3. 17
      apps/explorer/lib/explorer/token_transfer_token_id_migration/supervisor.ex
  4. 12
      apps/explorer/lib/explorer/token_transfer_token_id_migration/worker.ex
  5. 4
      apps/explorer/test/explorer/token_transfer_token_id_migration/lowest_block_number_updater_test.exs
  6. 3
      apps/explorer/test/explorer/token_transfer_token_id_migration/worker_test.ex
  7. 4
      apps/indexer/lib/indexer/supervisor.ex
  8. 18
      config/runtime.exs

@ -5,7 +5,7 @@ defmodule Explorer.Application do
use Application use Application
alias Explorer.Admin alias Explorer.{Admin, TokenTransferTokenIdMigration}
alias Explorer.Chain.Cache.{ alias Explorer.Chain.Cache.{
Accounts, Accounts,
@ -68,7 +68,8 @@ defmodule Explorer.Application do
Transactions, Transactions,
Accounts, Accounts,
Uncles, Uncles,
{Redix, redix_opts()} {Redix, redix_opts()},
{TokenTransferTokenIdMigration.Supervisor, [[]]}
] ]
children = base_children ++ configurable_children() children = base_children ++ configurable_children()

@ -1,4 +1,4 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater do defmodule Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdater do
@moduledoc """ @moduledoc """
Collects processed block numbers from token id migration workers Collects processed block numbers from token id migration workers
and updates last_processed_block_number according to them. and updates last_processed_block_number according to them.

@ -1,4 +1,4 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor do defmodule Explorer.TokenTransferTokenIdMigration.Supervisor do
@moduledoc """ @moduledoc """
Supervises parts of token id migration process. Supervises parts of token id migration process.
@ -35,8 +35,8 @@ defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor do
""" """
use Supervisor use Supervisor
alias Explorer.TokenTransferTokenIdMigration.{LowestBlockNumberUpdater, Worker}
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress alias Explorer.Utility.TokenTransferTokenIdMigratorProgress
alias Indexer.Fetcher.TokenTransferTokenIdMigration.{LowestBlockNumberUpdater, Worker}
@default_first_block 0 @default_first_block 0
@default_workers_count 1 @default_workers_count 1
@ -47,20 +47,17 @@ defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor do
@impl true @impl true
def init(_) do def init(_) do
first_block = Application.get_env(:indexer, :token_id_migration)[:first_block] || @default_first_block first_block = Application.get_env(:explorer, :token_id_migration)[:first_block] || @default_first_block
last_block = TokenTransferTokenIdMigratorProgress.get_last_processed_block_number() last_block = TokenTransferTokenIdMigratorProgress.get_last_processed_block_number()
if last_block > first_block do if last_block > first_block do
workers_count = Application.get_env(:indexer, :token_id_migration)[:concurrency] || @default_workers_count workers_count = Application.get_env(:explorer, :token_id_migration)[:concurrency] || @default_workers_count
workers = workers =
Enum.map(1..workers_count, fn id -> Enum.map(1..workers_count, fn id ->
worker_name = build_worker_name(id)
Supervisor.child_spec( Supervisor.child_spec(
{Worker, {Worker, idx: id, first_block: first_block, last_block: last_block, step: workers_count - 1},
idx: id, first_block: first_block, last_block: last_block, step: workers_count - 1, name: worker_name}, id: {Worker, id},
id: worker_name,
restart: :transient restart: :transient
) )
end) end)
@ -70,6 +67,4 @@ defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor do
:ignore :ignore
end end
end end
defp build_worker_name(worker_id), do: :"#{Worker}_#{worker_id}"
end end

@ -1,7 +1,7 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Worker do defmodule Explorer.TokenTransferTokenIdMigration.Worker do
@moduledoc """ @moduledoc """
Performs the migration of TokenTransfer token_id to token_ids by batches. Performs the migration of TokenTransfer token_id to token_ids by batches.
Full algorithm is in the 'Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor' module doc. Full algorithm is in the 'Explorer.TokenTransferTokenIdMigration.Supervisor' module doc.
""" """
use GenServer use GenServer
@ -9,18 +9,18 @@ defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Worker do
alias Explorer.Chain.TokenTransfer alias Explorer.Chain.TokenTransfer
alias Explorer.Repo alias Explorer.Repo
alias Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater alias Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdater
@default_batch_size 500 @default_batch_size 500
@interval 10 @interval 10
def start_link(idx: idx, first_block: first, last_block: last, step: step, name: name) do def start_link(idx: idx, first_block: first, last_block: last, step: step) do
GenServer.start_link(__MODULE__, %{idx: idx, bottom_block: first, last_block: last, step: step}, name: name) GenServer.start_link(__MODULE__, %{idx: idx, bottom_block: first, last_block: last, step: step})
end end
@impl true @impl true
def init(%{idx: idx, bottom_block: bottom_block, last_block: last_block, step: step}) do def init(%{idx: idx, bottom_block: bottom_block, last_block: last_block, step: step}) do
batch_size = Application.get_env(:indexer, :token_id_migration)[:batch_size] || @default_batch_size batch_size = Application.get_env(:explorer, :token_id_migration)[:batch_size] || @default_batch_size
range = calculate_new_range(last_block, bottom_block, batch_size, idx - 1) range = calculate_new_range(last_block, bottom_block, batch_size, idx - 1)
schedule_next_update() schedule_next_update()

@ -1,9 +1,9 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdaterTest do defmodule Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdaterTest do
use Explorer.DataCase use Explorer.DataCase
alias Explorer.Repo alias Explorer.Repo
alias Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdater
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress alias Explorer.Utility.TokenTransferTokenIdMigratorProgress
alias Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater
describe "Add range and update last processed block number" do describe "Add range and update last processed block number" do
test "add_range/2" do test "add_range/2" do

@ -2,9 +2,8 @@ defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.WorkerTest do
use Explorer.DataCase use Explorer.DataCase
alias Explorer.Repo alias Explorer.Repo
alias Indexer.Fetcher.TokenTransferTokenIdMigration.Worker alias Explorer.TokenTransferTokenIdMigration.{LowestBlockNumberUpdater, Worker}
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress alias Explorer.Utility.TokenTransferTokenIdMigratorProgress
alias Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater
describe "Move TokenTransfer token_id to token_ids" do describe "Move TokenTransfer token_id to token_ids" do
test "Move token_ids and update last processed block number" do test "Move token_ids and update last processed block number" do

@ -26,7 +26,6 @@ defmodule Indexer.Supervisor do
TokenBalance, TokenBalance,
TokenInstance, TokenInstance,
TokenTotalSupplyOnDemand, TokenTotalSupplyOnDemand,
TokenTransferTokenIdMigration,
TokenUpdater, TokenUpdater,
UncleBlock UncleBlock
} }
@ -140,8 +139,7 @@ defmodule Indexer.Supervisor do
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{BlocksTransactionsMismatch.Supervisor, {BlocksTransactionsMismatch.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{PendingOpsCleaner, [[], []]}, {PendingOpsCleaner, [[], []]}
{TokenTransferTokenIdMigration.Supervisor, [[]]}
] ]
|> List.flatten() |> List.flatten()

@ -357,6 +357,15 @@ config :explorer, Explorer.Account,
template: System.get_env("ACCOUNT_SENDGRID_TEMPLATE") template: System.get_env("ACCOUNT_SENDGRID_TEMPLATE")
] ]
{token_id_migration_first_block, _} = Integer.parse(System.get_env("TOKEN_ID_MIGRATION_FIRST_BLOCK", "0"))
{token_id_migration_concurrency, _} = Integer.parse(System.get_env("TOKEN_ID_MIGRATION_CONCURRENCY", "1"))
{token_id_migration_batch_size, _} = Integer.parse(System.get_env("TOKEN_ID_MIGRATION_BATCH_SIZE", "500"))
config :explorer, :token_id_migration,
first_block: token_id_migration_first_block,
concurrency: token_id_migration_concurrency,
batch_size: token_id_migration_batch_size
############### ###############
### Indexer ### ### Indexer ###
############### ###############
@ -458,15 +467,6 @@ config :indexer, Indexer.Block.Catchup.Fetcher,
batch_size: blocks_catchup_fetcher_batch_size, batch_size: blocks_catchup_fetcher_batch_size,
concurrency: blocks_catchup_fetcher_concurrency concurrency: blocks_catchup_fetcher_concurrency
{token_id_migration_first_block, _} = Integer.parse(System.get_env("TOKEN_ID_MIGRATION_FIRST_BLOCK", "0"))
{token_id_migration_concurrency, _} = Integer.parse(System.get_env("TOKEN_ID_MIGRATION_CONCURRENCY", "1"))
{token_id_migration_batch_size, _} = Integer.parse(System.get_env("TOKEN_ID_MIGRATION_BATCH_SIZE", "500"))
config :indexer, :token_id_migration,
first_block: token_id_migration_first_block,
concurrency: token_id_migration_concurrency,
batch_size: token_id_migration_batch_size
Code.require_file("#{config_env()}.exs", "config/runtime") Code.require_file("#{config_env()}.exs", "config/runtime")
for config <- "../apps/*/config/runtime/#{config_env()}.exs" |> Path.expand(__DIR__) |> Path.wildcard() do for config <- "../apps/*/config/runtime/#{config_env()}.exs" |> Path.expand(__DIR__) |> Path.wildcard() do

Loading…
Cancel
Save