Drop unused token_id column from token_transfers table and indexes based on this column (#9005)
* Drop unused token_id column from token_transfers table and indexes on this column * Remove token ids migration * Refactor DB migration * Update CHANGE LOG entryv6.0.0-dev
parent
83978978f9
commit
a4799d35aa
@ -1,65 +0,0 @@ |
||||
defmodule Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdater do |
||||
@moduledoc """ |
||||
Collects processed block numbers from token id migration workers |
||||
and updates last_processed_block_number according to them. |
||||
Full algorithm is in the 'Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor' module doc. |
||||
""" |
||||
use GenServer |
||||
|
||||
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(_) do |
||||
last_processed_block_number = TokenTransferTokenIdMigratorProgress.get_last_processed_block_number() |
||||
|
||||
{:ok, %{last_processed_block_number: last_processed_block_number, processed_ranges: []}} |
||||
end |
||||
|
||||
def add_range(from, to) do |
||||
GenServer.cast(__MODULE__, {:add_range, from..to}) |
||||
end |
||||
|
||||
@impl true |
||||
def handle_cast({:add_range, range}, %{processed_ranges: processed_ranges} = state) do |
||||
ranges = |
||||
[range | processed_ranges] |
||||
|> Enum.sort_by(& &1.last, &>=/2) |
||||
|> normalize_ranges() |
||||
|
||||
{new_last_number, new_ranges} = maybe_update_last_processed_number(state.last_processed_block_number, ranges) |
||||
|
||||
{:noreply, %{last_processed_block_number: new_last_number, processed_ranges: new_ranges}} |
||||
end |
||||
|
||||
defp normalize_ranges(ranges) do |
||||
%{prev_range: prev, result: result} = |
||||
Enum.reduce(ranges, %{prev_range: nil, result: []}, fn range, %{prev_range: prev_range, result: result} -> |
||||
case {prev_range, range} do |
||||
{nil, _} -> |
||||
%{prev_range: range, result: result} |
||||
|
||||
{%{last: l1} = r1, %{first: f2} = r2} when l1 - 1 > f2 -> |
||||
%{prev_range: r2, result: [r1 | result]} |
||||
|
||||
{%{first: f1}, %{last: l2}} -> |
||||
%{prev_range: f1..l2, result: result} |
||||
end |
||||
end) |
||||
|
||||
Enum.reverse([prev | result]) |
||||
end |
||||
|
||||
# since ranges are normalized, we need to check only the first range to determine the new last_processed_number |
||||
defp maybe_update_last_processed_number(current_last, [from..to | rest] = ranges) when current_last - 1 <= from do |
||||
case TokenTransferTokenIdMigratorProgress.update_last_processed_block_number(to) do |
||||
{:ok, _} -> {to, rest} |
||||
_ -> {current_last, ranges} |
||||
end |
||||
end |
||||
|
||||
defp maybe_update_last_processed_number(current_last, ranges), do: {current_last, ranges} |
||||
end |
@ -1,70 +0,0 @@ |
||||
defmodule Explorer.TokenTransferTokenIdMigration.Supervisor do |
||||
@moduledoc """ |
||||
Supervises parts of token id migration process. |
||||
|
||||
Migration process algorithm: |
||||
|
||||
Defining the bounds of migration (by the first and the last block number of TokenTransfer). |
||||
Supervisor starts the workers in amount equal to 'TOKEN_ID_MIGRATION_CONCURRENCY' env value (defaults to 1) |
||||
and the 'LowestBlockNumberUpdater'. |
||||
|
||||
Each worker goes through the token transfers by batches ('TOKEN_ID_MIGRATION_BATCH_SIZE', defaults to 500) |
||||
and updates the token_ids to be equal of [token_id] for transfers that has any token_id. |
||||
Worker goes from the newest blocks to latest. |
||||
After worker is done with current batch, it sends the information about processed batch to 'LowestBlockNumberUpdater' |
||||
and takes the next by defining its bounds based on amount of all workers. |
||||
|
||||
For example, if batch size is 10, we have 5 workers and 100 items to be processed, |
||||
the distribution will be like this: |
||||
1 worker - 99..90, 49..40 |
||||
2 worker - 89..80, 39..30 |
||||
3 worker - 79..70, 29..20 |
||||
4 worker - 69..60, 19..10 |
||||
5 worker - 59..50, 9..0 |
||||
|
||||
'LowestBlockNumberUpdater' keeps the information about the last processed block number |
||||
(which is stored in the 'token_transfer_token_id_migrator_progress' db entity) |
||||
and block ranges that has already been processed by the workers but couldn't be committed |
||||
to last processed block number yet (because of the possible gap between the current last block |
||||
and upper bound of the last processed batch). Uncommitted block numbers are stored in normalize ranges. |
||||
When there is no gap between the last processed block number and the upper bound of the upper range, |
||||
'LowestBlockNumberUpdater' updates the last processed block number in db and drops this range from its state. |
||||
|
||||
This supervisor won't start if the migration is completed |
||||
(last processed block number in db == 'TOKEN_ID_MIGRATION_FIRST_BLOCK' (defaults to 0)). |
||||
""" |
||||
use Supervisor |
||||
|
||||
alias Explorer.TokenTransferTokenIdMigration.{LowestBlockNumberUpdater, Worker} |
||||
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress |
||||
|
||||
@default_first_block 0 |
||||
@default_workers_count 1 |
||||
|
||||
def start_link(_) do |
||||
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(_) do |
||||
first_block = Application.get_env(:explorer, :token_id_migration)[:first_block] || @default_first_block |
||||
last_block = TokenTransferTokenIdMigratorProgress.get_last_processed_block_number() |
||||
|
||||
if last_block > first_block do |
||||
workers_count = Application.get_env(:explorer, :token_id_migration)[:concurrency] || @default_workers_count |
||||
|
||||
workers = |
||||
Enum.map(1..workers_count, fn id -> |
||||
Supervisor.child_spec( |
||||
{Worker, idx: id, first_block: first_block, last_block: last_block, step: workers_count - 1}, |
||||
id: {Worker, id}, |
||||
restart: :transient |
||||
) |
||||
end) |
||||
|
||||
Supervisor.init([LowestBlockNumberUpdater | workers], strategy: :one_for_one) |
||||
else |
||||
:ignore |
||||
end |
||||
end |
||||
end |
@ -1,84 +0,0 @@ |
||||
defmodule Explorer.TokenTransferTokenIdMigration.Worker do |
||||
@moduledoc """ |
||||
Performs the migration of TokenTransfer token_id to token_ids by batches. |
||||
Full algorithm is in the 'Explorer.TokenTransferTokenIdMigration.Supervisor' module doc. |
||||
""" |
||||
use GenServer |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.Chain.TokenTransfer |
||||
alias Explorer.Repo |
||||
alias Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdater |
||||
|
||||
@default_batch_size 500 |
||||
@interval 10 |
||||
|
||||
def start_link(idx: idx, first_block: first, last_block: last, step: step) do |
||||
GenServer.start_link(__MODULE__, %{idx: idx, bottom_block: first, last_block: last, step: step}) |
||||
end |
||||
|
||||
@impl true |
||||
def init(%{idx: idx, bottom_block: bottom_block, last_block: last_block, step: step}) do |
||||
batch_size = Application.get_env(:explorer, :token_id_migration)[:batch_size] || @default_batch_size |
||||
range = calculate_new_range(last_block, bottom_block, batch_size, idx - 1) |
||||
|
||||
schedule_next_update() |
||||
|
||||
{:ok, %{batch_size: batch_size, bottom_block: bottom_block, step: step, current_range: range}} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:update, %{current_range: :out_of_bound} = state) do |
||||
{:stop, :normal, state} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:update, %{current_range: {lower_bound, upper_bound}} = state) do |
||||
case do_update(lower_bound, upper_bound) do |
||||
true -> |
||||
LowestBlockNumberUpdater.add_range(upper_bound, lower_bound) |
||||
new_range = calculate_new_range(lower_bound, state.bottom_block, state.batch_size, state.step) |
||||
schedule_next_update() |
||||
{:noreply, %{state | current_range: new_range}} |
||||
|
||||
_ -> |
||||
schedule_next_update() |
||||
{:noreply, state} |
||||
end |
||||
end |
||||
|
||||
defp calculate_new_range(last_processed_block, bottom_block, batch_size, step) do |
||||
upper_bound = last_processed_block - step * batch_size - 1 |
||||
lower_bound = max(upper_bound - batch_size + 1, bottom_block) |
||||
|
||||
if upper_bound >= bottom_block do |
||||
{lower_bound, upper_bound} |
||||
else |
||||
:out_of_bound |
||||
end |
||||
end |
||||
|
||||
defp do_update(lower_bound, upper_bound) do |
||||
token_transfers_batch_query = |
||||
from( |
||||
tt in TokenTransfer, |
||||
where: tt.block_number >= ^lower_bound, |
||||
where: tt.block_number <= ^upper_bound |
||||
) |
||||
|
||||
token_transfers_batch_query |
||||
|> Repo.all() |
||||
|> Enum.filter(fn %{token_id: token_id} -> not is_nil(token_id) end) |
||||
|> Enum.map(fn token_transfer -> |
||||
token_transfer |
||||
|> TokenTransfer.changeset(%{token_ids: [token_transfer.token_id], token_id: nil}) |
||||
|> Repo.update() |
||||
end) |
||||
|> Enum.all?(&match?({:ok, _}, &1)) |
||||
end |
||||
|
||||
defp schedule_next_update do |
||||
Process.send_after(self(), :update, @interval) |
||||
end |
||||
end |
@ -1,67 +0,0 @@ |
||||
defmodule Explorer.Utility.TokenTransferTokenIdMigratorProgress do |
||||
@moduledoc """ |
||||
Module is responsible for keeping the current progress of TokenTransfer token_id migration. |
||||
Full algorithm is in the 'Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor' module doc. |
||||
""" |
||||
use Explorer.Schema |
||||
|
||||
require Logger |
||||
|
||||
alias Explorer.Chain.Cache.BlockNumber |
||||
alias Explorer.Repo |
||||
|
||||
schema "token_transfer_token_id_migrator_progress" do |
||||
field(:last_processed_block_number, :integer) |
||||
|
||||
timestamps() |
||||
end |
||||
|
||||
@doc false |
||||
def changeset(progress \\ %__MODULE__{}, params) do |
||||
cast(progress, params, [:last_processed_block_number]) |
||||
end |
||||
|
||||
def get_current_progress do |
||||
Repo.one( |
||||
from( |
||||
p in __MODULE__, |
||||
order_by: [desc: p.updated_at], |
||||
limit: 1 |
||||
) |
||||
) |
||||
end |
||||
|
||||
def get_last_processed_block_number do |
||||
case get_current_progress() do |
||||
nil -> |
||||
latest_processed_block_number = BlockNumber.get_max() + 1 |
||||
update_last_processed_block_number(latest_processed_block_number) |
||||
latest_processed_block_number |
||||
|
||||
%{last_processed_block_number: block_number} -> |
||||
block_number |
||||
end |
||||
end |
||||
|
||||
def update_last_processed_block_number(block_number, force \\ false) do |
||||
case get_current_progress() do |
||||
nil -> |
||||
%{last_processed_block_number: block_number} |
||||
|> changeset() |
||||
|> Repo.insert() |
||||
|
||||
progress -> |
||||
if not force and progress.last_processed_block_number < block_number do |
||||
Logger.error( |
||||
"TokenTransferTokenIdMigratorProgress new block_number is above the last one. Last: #{progress.last_processed_block_number}, new: #{block_number}" |
||||
) |
||||
|
||||
{:error, :invalid_block_number} |
||||
else |
||||
progress |
||||
|> changeset(%{last_processed_block_number: block_number}) |
||||
|> Repo.update() |
||||
end |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,12 @@ |
||||
defmodule Explorer.Repo.Migrations.DropTokenTransfersTokenIdColumn do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
drop(index(:token_transfers, [:token_id])) |
||||
drop(index(:token_transfers, [:token_contract_address_hash, "token_id DESC", "block_number DESC"])) |
||||
|
||||
alter table(:token_transfers) do |
||||
remove(:token_id) |
||||
end |
||||
end |
||||
end |
@ -1,37 +0,0 @@ |
||||
defmodule Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdaterTest do |
||||
use Explorer.DataCase, async: false |
||||
|
||||
alias Explorer.Repo |
||||
alias Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdater |
||||
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress |
||||
|
||||
describe "Add range and update last processed block number" do |
||||
test "add_range/2" do |
||||
TokenTransferTokenIdMigratorProgress.update_last_processed_block_number(2000, true) |
||||
LowestBlockNumberUpdater.start_link([]) |
||||
|
||||
LowestBlockNumberUpdater.add_range(1000, 500) |
||||
LowestBlockNumberUpdater.add_range(1500, 1001) |
||||
Process.sleep(10) |
||||
|
||||
assert %{last_processed_block_number: 2000, processed_ranges: [1500..500//-1]} = |
||||
:sys.get_state(LowestBlockNumberUpdater) |
||||
|
||||
assert %{last_processed_block_number: 2000} = Repo.one(TokenTransferTokenIdMigratorProgress) |
||||
|
||||
LowestBlockNumberUpdater.add_range(499, 300) |
||||
LowestBlockNumberUpdater.add_range(299, 0) |
||||
Process.sleep(10) |
||||
|
||||
assert %{last_processed_block_number: 2000, processed_ranges: [1500..0//-1]} = |
||||
:sys.get_state(LowestBlockNumberUpdater) |
||||
|
||||
assert %{last_processed_block_number: 2000} = Repo.one(TokenTransferTokenIdMigratorProgress) |
||||
|
||||
LowestBlockNumberUpdater.add_range(1999, 1501) |
||||
Process.sleep(10) |
||||
assert %{last_processed_block_number: 0, processed_ranges: []} = :sys.get_state(LowestBlockNumberUpdater) |
||||
assert %{last_processed_block_number: 0} = Repo.one(TokenTransferTokenIdMigratorProgress) |
||||
end |
||||
end |
||||
end |
@ -1,31 +0,0 @@ |
||||
defmodule Explorer.TokenTransferTokenIdMigration.WorkerTest do |
||||
use Explorer.DataCase, async: false |
||||
|
||||
alias Explorer.Repo |
||||
alias Explorer.TokenTransferTokenIdMigration.{LowestBlockNumberUpdater, Worker} |
||||
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress |
||||
|
||||
describe "Move TokenTransfer token_id to token_ids" do |
||||
test "Move token_ids and update last processed block number" do |
||||
insert(:token_transfer, block_number: 1, token_id: 1, transaction: insert(:transaction)) |
||||
insert(:token_transfer, block_number: 500, token_id: 2, transaction: insert(:transaction)) |
||||
insert(:token_transfer, block_number: 1000, token_id: 3, transaction: insert(:transaction)) |
||||
insert(:token_transfer, block_number: 1500, token_id: 4, transaction: insert(:transaction)) |
||||
insert(:token_transfer, block_number: 2000, token_id: 5, transaction: insert(:transaction)) |
||||
|
||||
TokenTransferTokenIdMigratorProgress.update_last_processed_block_number(3000, true) |
||||
LowestBlockNumberUpdater.start_link([]) |
||||
|
||||
Worker.start_link(idx: 1, first_block: 0, last_block: 3000, step: 2) |
||||
Worker.start_link(idx: 2, first_block: 0, last_block: 3000, step: 2) |
||||
Worker.start_link(idx: 3, first_block: 0, last_block: 3000, step: 2) |
||||
Process.sleep(200) |
||||
|
||||
token_transfers = Repo.all(Explorer.Chain.TokenTransfer) |
||||
assert Enum.all?(token_transfers, fn tt -> is_nil(tt.token_id) end) |
||||
|
||||
expected_token_ids = [[Decimal.new(1)], [Decimal.new(2)], [Decimal.new(3)], [Decimal.new(4)], [Decimal.new(5)]] |
||||
assert ^expected_token_ids = token_transfers |> Enum.map(& &1.token_ids) |> Enum.sort_by(&List.first/1) |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue