Token id migration process

pull/6391/head
Qwerty5Uiop 2 years ago
parent 0d3264dcff
commit 357bcd5f04
  1. 62
      apps/explorer/lib/explorer/utility/token_transfer_token_id_migrator_progress.ex
  2. 11
      apps/explorer/priv/account/migrations/20221104104635_create_token_transfer_token_id_migrator_progress.exs
  3. 11
      apps/explorer/priv/repo/migrations/20221104104635_create_token_transfer_token_id_migrator_progress.exs
  4. 61
      apps/indexer/lib/indexer/fetcher/token_transfer_token_id_migration/lowest_block_number_updater.ex
  5. 42
      apps/indexer/lib/indexer/fetcher/token_transfer_token_id_migration/supervisor.ex
  6. 79
      apps/indexer/lib/indexer/fetcher/token_transfer_token_id_migration/worker.ex
  7. 4
      apps/indexer/lib/indexer/supervisor.ex
  8. 37
      apps/indexer/test/indexer/fetcher/token_transfer_token_id_migration/lowest_block_number_updater_test.exs
  9. 30
      apps/indexer/test/indexer/fetcher/token_transfer_token_id_migration/worker_test.ex
  10. 9
      config/runtime.exs
  11. 3
      docker-compose/envs/common-blockscout.env

@@ -0,0 +1,62 @@
defmodule Explorer.Utility.TokenTransferTokenIdMigratorProgress do
  @moduledoc """
  Persists the progress of the TokenTransfer token_id migration.

  A single row (the most recently updated one) stores the lowest block number
  already handled; the migration walks the chain downwards, so the stored
  number may only decrease over time.
  """
  use Explorer.Schema

  require Logger

  alias Explorer.Chain.Cache.BlockNumber
  alias Explorer.Repo

  schema "token_transfer_token_id_migrator_progress" do
    field(:last_processed_block_number, :integer)

    timestamps()
  end

  @doc false
  def changeset(progress \\ %__MODULE__{}, params) do
    cast(progress, params, [:last_processed_block_number])
  end

  @doc """
  Fetches the most recently updated progress record, or `nil` when none exists.
  """
  def get_current_progress do
    Repo.one(
      from(record in __MODULE__,
        order_by: [desc: record.updated_at],
        limit: 1
      )
    )
  end

  @doc """
  Returns the persisted last processed block number.

  When no progress record exists yet, seeds it with the cached max block
  number + 1 (meaning: nothing has been processed so far) and returns that.
  """
  def get_last_processed_block_number do
    case get_current_progress() do
      %{last_processed_block_number: number} ->
        number

      nil ->
        fallback = BlockNumber.get_max() + 1
        update_last_processed_block_number(fallback)
        fallback
    end
  end

  @doc """
  Persists `block_number` as the new last processed block number.

  Because the migration only moves downwards, a value above the stored one is
  rejected with `{:error, :invalid_block_number}` and logged.
  """
  def update_last_processed_block_number(block_number) do
    case get_current_progress() do
      nil ->
        %__MODULE__{}
        |> changeset(%{last_processed_block_number: block_number})
        |> Repo.insert()

      progress ->
        if block_number <= progress.last_processed_block_number do
          progress
          |> changeset(%{last_processed_block_number: block_number})
          |> Repo.update()
        else
          Logger.error(
            "TokenTransferTokenIdMigratorProgress new block_number is above the last one. Last: #{progress.last_processed_block_number}, new: #{block_number}"
          )

          {:error, :invalid_block_number}
        end
    end
  end
end

@@ -0,0 +1,11 @@
defmodule Explorer.Repo.Account.Migrations.CreateTokenTransferTokenIdMigratorProgress do
  use Ecto.Migration

  # Creates the progress-tracking table for the token_id -> token_ids
  # migration (one row holding the lowest already-processed block number).
  def change do
    create table(:token_transfer_token_id_migrator_progress) do
      add :last_processed_block_number, :integer

      timestamps()
    end
  end
end

@@ -0,0 +1,11 @@
defmodule Explorer.Repo.Migrations.CreateTokenTransferTokenIdMigratorProgress do
  use Ecto.Migration

  # Creates the progress-tracking table for the token_id -> token_ids
  # migration (one row holding the lowest already-processed block number).
  def change do
    create table(:token_transfer_token_id_migrator_progress) do
      add :last_processed_block_number, :integer

      timestamps()
    end
  end
end

@@ -0,0 +1,61 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater do
  @moduledoc """
  Aggregates the block ranges finished by the token_id migration workers and
  persists the lowest contiguously-processed block number.

  State:
    * `last_processed_block_number` — every block at or above this is done
    * `processed_ranges` — disjoint descending ranges completed by workers
      that are not yet contiguous with `last_processed_block_number`
  """
  use GenServer

  alias Explorer.Utility.TokenTransferTokenIdMigratorProgress

  def start_link(_) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(_) do
    # Resume from the persisted progress (seeded on first call if absent).
    last_processed_block_number = TokenTransferTokenIdMigratorProgress.get_last_processed_block_number()

    {:ok, %{last_processed_block_number: last_processed_block_number, processed_ranges: []}}
  end

  # Reports a completed range. Callers pass `from >= to`, producing a
  # descending range (see Worker's `add_range(upper_bound, lower_bound)` call).
  def add_range(from, to) do
    GenServer.cast(__MODULE__, {:add_range, from..to})
  end

  @impl true
  def handle_cast({:add_range, range}, %{processed_ranges: processed_ranges} = state) do
    # Sort by each range's `last` (the low end of a descending range), highest
    # first, then merge touching ranges so the head is the topmost range.
    ranges =
      [range | processed_ranges]
      |> Enum.sort_by(& &1.last, &>=/2)
      |> normalize_ranges()

    {new_last_number, new_ranges} = maybe_update_last_processed_number(state.last_processed_block_number, ranges)

    {:noreply, %{last_processed_block_number: new_last_number, processed_ranges: new_ranges}}
  end

  # Merges overlapping/adjacent descending ranges. `l1 - 1 <= f2` means the
  # next range's high end reaches at least one block below the accumulated
  # range's low end, so the two can be joined.
  # NOTE(review): assumes incoming ranges never fully contain one another —
  # workers produce disjoint slices; a contained range would drop blocks here.
  defp normalize_ranges(ranges) do
    %{prev_range: prev, result: result} =
      Enum.reduce(ranges, %{prev_range: nil, result: []}, fn range, %{prev_range: prev_range, result: result} ->
        case {prev_range, range} do
          {nil, _} ->
            %{prev_range: range, result: result}

          {%{first: f1, last: l1} = r1, %{first: f2, last: l2} = r2} ->
            if l1 - 1 <= f2 do
              # Touching or overlapping: extend the accumulated range downwards.
              %{prev_range: f1..l2, result: result}
            else
              # Gap between ranges: emit the accumulated one, start a new one.
              %{prev_range: r2, result: result ++ [r1]}
            end
        end
      end)

    result ++ [prev]
  end

  # since ranges are normalized, we need to check only the first range to determine the new last_processed_number
  # When the head range reaches the current watermark, persist its low end as
  # the new last processed number and drop it from the pending list. On a
  # failed DB update, keep the old state; a later cast retries.
  defp maybe_update_last_processed_number(current_last, [from..to | rest] = ranges) when current_last - 1 <= from do
    case TokenTransferTokenIdMigratorProgress.update_last_processed_block_number(to) do
      {:ok, _} -> {to, rest}
      _ -> {current_last, ranges}
    end
  end

  defp maybe_update_last_processed_number(current_last, ranges), do: {current_last, ranges}
end

@@ -0,0 +1,42 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor do
  @moduledoc """
  Supervises the token_id migration processes: one `LowestBlockNumberUpdater`
  plus a configurable number of `Worker`s, each assigned a disjoint slice of
  the remaining block range. Returns `:ignore` when the persisted progress has
  already reached the configured first block.
  """
  use Supervisor

  alias Explorer.Utility.TokenTransferTokenIdMigratorProgress
  alias Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater
  alias Indexer.Fetcher.TokenTransferTokenIdMigration.Worker

  @default_first_block 0
  @default_workers_count 1

  def start_link(_) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(_) do
    first_block = Application.get_env(:indexer, :token_id_migration)[:first_block] || @default_first_block
    last_block = TokenTransferTokenIdMigratorProgress.get_last_processed_block_number()

    if last_block > first_block do
      workers_count = Application.get_env(:indexer, :token_id_migration)[:concurrency] || @default_workers_count

      # Worker `id` starts `id - 1` batches below the top and strides by
      # `step + 1` batches, so the workers partition the range between them.
      workers =
        for id <- 1..workers_count do
          worker_name = build_worker_name(id)

          Supervisor.child_spec(
            {Worker,
             idx: id, first_block: first_block, last_block: last_block, step: workers_count - 1, name: worker_name},
            id: worker_name,
            restart: :transient
          )
        end

      Supervisor.init([LowestBlockNumberUpdater | workers], strategy: :one_for_one)
    else
      :ignore
    end
  end

  defp build_worker_name(worker_id), do: :"#{Worker}_#{worker_id}"
end

@@ -0,0 +1,79 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.Worker do
  @moduledoc """
  Migrates TokenTransfer `token_id` values into the `token_ids` array column,
  walking the chain downwards in batches from `last_block` to `first_block`.

  Several workers can run side by side: worker `idx` starts `idx - 1` batches
  below `last_block` and then strides `step + 1` batches at a time, so the
  workers cover disjoint slices of the block range.
  """
  use GenServer

  import Ecto.Query

  alias Explorer.Chain.TokenTransfer
  alias Explorer.Repo
  alias Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater

  # Rows per batch unless overridden via the :token_id_migration config.
  @default_batch_size 500
  # Delay (ms) between batches.
  @interval 10

  # NOTE(review): this head matches the options keyword list in this exact
  # order; callers (the migration Supervisor) must pass
  # idx/first_block/last_block/step/name in that order.
  def start_link(idx: idx, first_block: first, last_block: last, step: step, name: name) do
    GenServer.start_link(__MODULE__, %{idx: idx, bottom_block: first, last_block: last, step: step}, name: name)
  end

  @impl true
  def init(%{idx: idx, bottom_block: bottom_block, last_block: last_block, step: step}) do
    batch_size = Application.get_env(:indexer, :token_id_migration)[:batch_size] || @default_batch_size
    # Offset the first batch by the worker index so concurrent workers begin
    # on different slices.
    range = calculate_new_range(last_block, bottom_block, batch_size, idx - 1)

    schedule_next_update()

    {:ok, %{batch_size: batch_size, bottom_block: bottom_block, step: step, current_range: range}}
  end

  @impl true
  def handle_info(:update, %{current_range: :out_of_bound} = state) do
    # All assigned blocks processed; stop normally (started with
    # restart: :transient, so the supervisor will not restart it).
    {:stop, :normal, state}
  end

  @impl true
  def handle_info(:update, %{current_range: {lower_bound, upper_bound}} = state) do
    case do_update(lower_bound, upper_bound) do
      {_total, _result} ->
        # Report the finished range (descending order), advance to the next
        # slice, and schedule the next tick.
        LowestBlockNumberUpdater.add_range(upper_bound, lower_bound)
        new_range = calculate_new_range(lower_bound, state.bottom_block, state.batch_size, state.step)

        schedule_next_update()

        {:noreply, %{state | current_range: new_range}}

      _ ->
        # Update failed: keep the same range and retry after the interval.
        schedule_next_update()

        {:noreply, state}
    end
  end

  # Computes the next descending batch for this worker: the batch immediately
  # below `last_processed_block`, skipping `step` batches reserved for the
  # other workers. Returns `:out_of_bound` once the batch falls entirely
  # below `bottom_block`; the lower bound is clamped to `bottom_block`.
  defp calculate_new_range(last_processed_block, bottom_block, batch_size, step) do
    upper_bound = last_processed_block - step * batch_size - 1
    lower_bound = max(upper_bound - batch_size + 1, bottom_block)

    if upper_bound >= bottom_block do
      {lower_bound, upper_bound}
    else
      :out_of_bound
    end
  end

  # Moves `token_id` into a single-element `token_ids` array (and clears
  # `token_id`) for every token transfer in [lower_bound, upper_bound].
  # Returns the `{count, returning}` tuple from `Repo.update_all/3`.
  defp do_update(lower_bound, upper_bound) do
    query =
      from(
        tt in TokenTransfer,
        where: tt.block_number >= ^lower_bound,
        where: tt.block_number <= ^upper_bound,
        where: not is_nil(tt.token_id),
        update: [
          set: [
            token_ids: fragment("ARRAY_APPEND(ARRAY[]::decimal[], ?)", tt.token_id),
            token_id: nil
          ]
        ]
      )

    # :infinity timeout — a batch over a busy table may exceed the default.
    Repo.update_all(query, [], timeout: :infinity)
  end

  defp schedule_next_update do
    Process.send_after(self(), :update, @interval)
  end
end

@@ -26,6 +26,7 @@ defmodule Indexer.Supervisor do
TokenBalance,
TokenInstance,
TokenTotalSupplyOnDemand,
TokenTransferTokenIdMigration,
TokenUpdater,
UncleBlock
}
@@ -139,7 +140,8 @@ defmodule Indexer.Supervisor do
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{BlocksTransactionsMismatch.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{PendingOpsCleaner, [[], []]}
{PendingOpsCleaner, [[], []]},
{TokenTransferTokenIdMigration.Supervisor, [[]]}
]
|> List.flatten()

@@ -0,0 +1,37 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdaterTest do
  use Explorer.DataCase

  alias Explorer.Repo
  alias Explorer.Utility.TokenTransferTokenIdMigratorProgress
  alias Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater

  describe "Add range and update last processed block number" do
    test "add_range/2" do
      # Seed the persisted progress at block 2000 before the server starts.
      TokenTransferTokenIdMigratorProgress.update_last_processed_block_number(2000)
      LowestBlockNumberUpdater.start_link([])
      # Two touching ranges below the watermark: they merge but are not yet
      # contiguous with 2000, so the persisted number must not move.
      LowestBlockNumberUpdater.add_range(1000, 500)
      LowestBlockNumberUpdater.add_range(1500, 1001)
      # add_range/2 is a cast; give the server a moment to process it.
      Process.sleep(10)

      assert %{last_processed_block_number: 2000, processed_ranges: [1500..500//-1]} =
               :sys.get_state(LowestBlockNumberUpdater)

      assert %{last_processed_block_number: 2000} = Repo.one(TokenTransferTokenIdMigratorProgress)

      # Extend the pending range all the way down to block 0; still a gap
      # between 1500 and 2000, so the watermark stays put.
      LowestBlockNumberUpdater.add_range(499, 300)
      LowestBlockNumberUpdater.add_range(299, 0)
      Process.sleep(10)

      assert %{last_processed_block_number: 2000, processed_ranges: [1500..0//-1]} =
               :sys.get_state(LowestBlockNumberUpdater)

      assert %{last_processed_block_number: 2000} = Repo.one(TokenTransferTokenIdMigratorProgress)

      # Closing the 1501..1999 gap makes 0..1999 contiguous with 2000, so the
      # whole pending list collapses and 0 is persisted.
      LowestBlockNumberUpdater.add_range(1999, 1501)
      Process.sleep(10)

      assert %{last_processed_block_number: 0, processed_ranges: []} = :sys.get_state(LowestBlockNumberUpdater)
      assert %{last_processed_block_number: 0} = Repo.one(TokenTransferTokenIdMigratorProgress)
    end
  end
end

@@ -0,0 +1,30 @@
defmodule Indexer.Fetcher.TokenTransferTokenIdMigration.WorkerTest do
  use Explorer.DataCase

  alias Explorer.Repo
  alias Indexer.Fetcher.TokenTransferTokenIdMigration.Worker
  alias Explorer.Utility.TokenTransferTokenIdMigratorProgress
  alias Indexer.Fetcher.TokenTransferTokenIdMigration.LowestBlockNumberUpdater

  describe "Move TokenTransfer token_id to token_ids" do
    test "Move token_ids and update last processed block number" do
      # Transfers spread across blocks 1..2000, each with a legacy scalar
      # token_id set.
      insert(:token_transfer, block_number: 1, token_id: 1, transaction: insert(:transaction))
      insert(:token_transfer, block_number: 500, token_id: 2, transaction: insert(:transaction))
      insert(:token_transfer, block_number: 1000, token_id: 3, transaction: insert(:transaction))
      insert(:token_transfer, block_number: 1500, token_id: 4, transaction: insert(:transaction))
      insert(:token_transfer, block_number: 2000, token_id: 5, transaction: insert(:transaction))

      # Start the watermark at 3000, above all inserted transfers.
      TokenTransferTokenIdMigratorProgress.update_last_processed_block_number(3000)
      LowestBlockNumberUpdater.start_link([])
      # Single worker (step: 0) covering the whole 0..3000 range.
      Worker.start_link(idx: 1, first_block: 0, last_block: 3000, step: 0, name: :worker_name)
      # The worker processes batches on a timer; wait for it to walk the range.
      Process.sleep(200)

      token_transfers = Repo.all(Explorer.Chain.TokenTransfer)
      # Every scalar token_id must be cleared...
      assert Enum.all?(token_transfers, fn tt -> is_nil(tt.token_id) end)

      # ...and moved into a one-element token_ids array.
      expected_token_ids = [[Decimal.new(1)], [Decimal.new(2)], [Decimal.new(3)], [Decimal.new(4)], [Decimal.new(5)]]
      assert ^expected_token_ids = token_transfers |> Enum.map(& &1.token_ids) |> Enum.sort_by(&List.first/1)
    end
  end
end

@@ -458,6 +458,15 @@ config :indexer, Indexer.Block.Catchup.Fetcher,
batch_size: blocks_catchup_fetcher_batch_size,
concurrency: blocks_catchup_fetcher_concurrency
# Settings for the token_id -> token_ids background migration. Each value is
# read from the environment with a default, so all variables are optional.
{token_id_migration_first_block, _} = Integer.parse(System.get_env("TOKEN_ID_MIGRATION_FIRST_BLOCK", "0"))
{token_id_migration_concurrency, _} = Integer.parse(System.get_env("TOKEN_ID_MIGRATION_CONCURRENCY", "1"))
{token_id_migration_batch_size, _} = Integer.parse(System.get_env("TOKEN_ID_MIGRATION_BATCH_SIZE", "500"))

config :indexer, :token_id_migration,
  first_block: token_id_migration_first_block,
  concurrency: token_id_migration_concurrency,
  batch_size: token_id_migration_batch_size
Code.require_file("#{config_env()}.exs", "config/runtime")
for config <- "../apps/*/config/runtime/#{config_env()}.exs" |> Path.expand(__DIR__) |> Path.wildcard() do

@@ -84,6 +84,9 @@ INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=false
INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_CATCHUP_BLOCKS_BATCH_SIZE=
# INDEXER_CATCHUP_BLOCKS_CONCURRENCY=
# TOKEN_ID_MIGRATION_FIRST_BLOCK=
# TOKEN_ID_MIGRATION_CONCURRENCY=
# TOKEN_ID_MIGRATION_BATCH_SIZE=
# WEBAPP_URL=
# API_URL=
WOBSERVER_ENABLED=false

Loading…
Cancel
Save