Docs + refactor

pull/6391/head
Qwerty5Uiop 2 years ago
parent 357bcd5f04
commit bfa4296a42
  1. 15
      apps/explorer/lib/explorer/utility/token_transfer_token_id_migrator_progress.ex
  2. 18
      apps/indexer/lib/indexer/fetcher/token_transfer_token_id_migration/lowest_block_number_updater.ex
  3. 37
      apps/indexer/lib/indexer/fetcher/token_transfer_token_id_migration/supervisor.ex
  4. 4
      apps/indexer/lib/indexer/fetcher/token_transfer_token_id_migration/worker.ex
  5. 9
      docker/Makefile

@ -1,4 +1,8 @@
defmodule Explorer.Utility.TokenTransferTokenIdMigratorProgress do
@moduledoc """
Module is responsible for keeping the current progress of TokenTransfer token_id migration.
Full algorithm is in the 'Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor' module doc.
"""
use Explorer.Schema
require Logger
@ -18,12 +22,13 @@ defmodule Explorer.Utility.TokenTransferTokenIdMigratorProgress do
end
def get_current_progress do
from(
p in __MODULE__,
order_by: [desc: p.updated_at],
limit: 1
Repo.one(
from(
p in __MODULE__,
order_by: [desc: p.updated_at],
limit: 1
)
)
|> Repo.one()
end
def get_last_processed_block_number do

@ -1,4 +1,9 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater do
@moduledoc """
Collects processed block numbers from token id migration workers
and updates last_processed_block_number according to them.
Full algorithm is in the 'Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor' module doc.
"""
use GenServer
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress
@ -37,16 +42,15 @@ defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater
{nil, _} ->
%{prev_range: range, result: result}
{%{first: f1, last: l1} = r1, %{first: f2, last: l2} = r2} ->
if l1 - 1 <= f2 do
%{prev_range: f1..l2, result: result}
else
%{prev_range: r2, result: result ++ [r1]}
end
{%{last: l1} = r1, %{first: f2} = r2} when l1 - 1 > f2 ->
%{prev_range: r2, result: [r1 | result]}
{%{first: f1}, %{last: l2}} ->
%{prev_range: f1..l2, result: result}
end
end)
result ++ [prev]
Enum.reverse([prev | result])
end
# since ranges are normalized, we need to check only the first range to determine the new last_processed_number

@ -1,9 +1,42 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor do
@moduledoc """
Supervises parts of token id migration process.
Migration process algorithm:
Defining the bounds of migration (by the first and the last block number of TokenTransfer).
Supervisor starts the workers in amount equal to 'TOKEN_ID_MIGRATION_CONCURRENCY' env value (defaults to 1)
and the 'LowestBlockNumberUpdater'.
Each worker goes through the token transfers by batches ('TOKEN_ID_MIGRATION_BATCH_SIZE', defaults to 500)
and updates the token_ids to be equal of [token_id] for transfers that has any token_id.
Worker goes from the newest blocks to latest.
After worker is done with current batch, it sends the information about processed batch to 'LowestBlockNumberUpdater'
and takes the next by defining its bounds based on amount of all workers.
For example, if batch size is 10, we have 5 workers and 100 items to be processed,
the distribution will be like this:
1 worker - 99..90, 49..40
2 worker - 89..80, 39..30
3 worker - 79..70, 29..20
4 worker - 69..60, 19..10
5 worker - 59..50, 9..0
'LowestBlockNumberUpdater' keeps the information about the last processed block number
(which is stored in the 'token_transfer_token_id_migrator_progress' db entity)
and block ranges that has already been processed by the workers but couldn't be committed
to last processed block number yet (because of the possible gap between the current last block
and upper bound of the last processed batch). Uncommitted block numbers are stored in normalize ranges.
When there is no gap between the last processed block number and the upper bound of the upper range,
'LowestBlockNumberUpdater' updates the last processed block number in db and drops this range from its state.
This supervisor won't start if the migration is completed
(last processed block number in db == 'TOKEN_ID_MIGRATION_FIRST_BLOCK' (defaults to 0)).
"""
use Supervisor
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress
alias Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater
alias Indexer.Fetcher.TokenTransferTokenIdMigration.Worker
alias Indexer.Fetcher.TokenTransferTokenIdMigration.{LowestBlockNumberUpdater, Worker}
@default_first_block 0
@default_workers_count 1

@ -1,4 +1,8 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Worker do
@moduledoc """
Performs the migration of TokenTransfer token_id to token_ids by batches.
Full algorithm is in the 'Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor' module doc.
"""
use GenServer
import Ecto.Query

@ -457,6 +457,15 @@ endif
ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef TOKEN_ID_MIGRATION_FIRST_BLOCK
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef TOKEN_ID_MIGRATION_CONCURRENCY
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef TOKEN_ID_MIGRATION_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef SECRET_KEY_BASE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'SECRET_KEY_BASE=$(SECRET_KEY_BASE)'
endif

Loading…
Cancel
Save