commit
ce735c1294
@ -1,4 +1,4 @@ |
||||
<%= link( |
||||
gettext("Block #%{number}", number: to_string(@block.number)), |
||||
to: block_path(BlockScoutWeb.Endpoint, :show, @block) |
||||
to: block_path(BlockScoutWeb.Endpoint, :show, @block.hash) |
||||
) %> |
||||
|
@ -0,0 +1,45 @@ |
||||
defmodule Explorer.Chain.Cache.BackgroundMigrations do |
||||
@moduledoc """ |
||||
Caches background migrations' status. |
||||
""" |
||||
|
||||
require Logger |
||||
|
||||
use Explorer.Chain.MapCache, |
||||
name: :background_migrations_status, |
||||
key: :denormalization_finished, |
||||
key: :tb_token_type_finished, |
||||
key: :ctb_token_type_finished |
||||
|
||||
@dialyzer :no_match |
||||
|
||||
alias Explorer.Migrator.{ |
||||
AddressCurrentTokenBalanceTokenType, |
||||
AddressTokenBalanceTokenType, |
||||
TransactionsDenormalization |
||||
} |
||||
|
||||
defp handle_fallback(:denormalization_finished) do |
||||
Task.start(fn -> |
||||
set_denormalization_finished(TransactionsDenormalization.migration_finished?()) |
||||
end) |
||||
|
||||
{:return, false} |
||||
end |
||||
|
||||
defp handle_fallback(:tb_token_type_finished) do |
||||
Task.start(fn -> |
||||
set_tb_token_type_finished(AddressTokenBalanceTokenType.migration_finished?()) |
||||
end) |
||||
|
||||
{:return, false} |
||||
end |
||||
|
||||
defp handle_fallback(:ctb_token_type_finished) do |
||||
Task.start(fn -> |
||||
set_ctb_token_type_finished(AddressCurrentTokenBalanceTokenType.migration_finished?()) |
||||
end) |
||||
|
||||
{:return, false} |
||||
end |
||||
end |
@ -0,0 +1,50 @@ |
||||
defmodule Explorer.Chain.DenormalizationHelper do |
||||
@moduledoc """ |
||||
Helper functions for dynamic logic based on denormalization migration completeness |
||||
""" |
||||
|
||||
alias Explorer.Chain.Cache.BackgroundMigrations |
||||
|
||||
@spec extend_block_necessity(keyword(), :optional | :required) :: keyword() |
||||
def extend_block_necessity(opts, necessity \\ :optional) do |
||||
if denormalization_finished?() do |
||||
opts |
||||
else |
||||
Keyword.update(opts, :necessity_by_association, %{:block => necessity}, &Map.put(&1, :block, necessity)) |
||||
end |
||||
end |
||||
|
||||
@spec extend_transaction_block_necessity(keyword(), :optional | :required) :: keyword() |
||||
def extend_transaction_block_necessity(opts, necessity \\ :optional) do |
||||
if denormalization_finished?() do |
||||
opts |
||||
else |
||||
Keyword.update( |
||||
opts, |
||||
:necessity_by_association, |
||||
%{[transaction: :block] => necessity}, |
||||
&(&1 |> Map.delete(:transaction) |> Map.put([transaction: :block], necessity)) |
||||
) |
||||
end |
||||
end |
||||
|
||||
@spec extend_transaction_preload(list()) :: list() |
||||
def extend_transaction_preload(preloads) do |
||||
if denormalization_finished?() do |
||||
preloads |
||||
else |
||||
[transaction: :block] ++ (preloads -- [:transaction]) |
||||
end |
||||
end |
||||
|
||||
@spec extend_block_preload(list()) :: list() |
||||
def extend_block_preload(preloads) do |
||||
if denormalization_finished?() do |
||||
preloads |
||||
else |
||||
[:block | preloads] |
||||
end |
||||
end |
||||
|
||||
def denormalization_finished?, do: BackgroundMigrations.get_denormalization_finished() |
||||
end |
@ -0,0 +1,51 @@ |
||||
defmodule Explorer.Migrator.AddressCurrentTokenBalanceTokenType do |
||||
@moduledoc """ |
||||
Fill empty token_type's for address_current_token_balances |
||||
""" |
||||
|
||||
use Explorer.Migrator.FillingMigration |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.Chain.Address.CurrentTokenBalance |
||||
alias Explorer.Chain.Cache.BackgroundMigrations |
||||
alias Explorer.Migrator.FillingMigration |
||||
alias Explorer.Repo |
||||
|
||||
@migration_name "ctb_token_type" |
||||
|
||||
@impl FillingMigration |
||||
def migration_name, do: @migration_name |
||||
|
||||
@impl FillingMigration |
||||
def last_unprocessed_identifiers do |
||||
limit = batch_size() * concurrency() |
||||
|
||||
unprocessed_data_query() |
||||
|> select([ctb], ctb.id) |
||||
|> limit(^limit) |
||||
|> Repo.all(timeout: :infinity) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def unprocessed_data_query do |
||||
from(ctb in CurrentTokenBalance, where: is_nil(ctb.token_type)) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def update_batch(token_balance_ids) do |
||||
query = |
||||
from(current_token_balance in CurrentTokenBalance, |
||||
join: token in assoc(current_token_balance, :token), |
||||
where: current_token_balance.id in ^token_balance_ids, |
||||
update: [set: [token_type: token.type]] |
||||
) |
||||
|
||||
Repo.update_all(query, [], timeout: :infinity) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def update_cache do |
||||
BackgroundMigrations.set_ctb_token_type_finished(true) |
||||
end |
||||
end |
@ -0,0 +1,51 @@ |
||||
defmodule Explorer.Migrator.AddressTokenBalanceTokenType do |
||||
@moduledoc """ |
||||
Fill empty token_type's for address_token_balances |
||||
""" |
||||
|
||||
use Explorer.Migrator.FillingMigration |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.Chain.Address.TokenBalance |
||||
alias Explorer.Chain.Cache.BackgroundMigrations |
||||
alias Explorer.Migrator.FillingMigration |
||||
alias Explorer.Repo |
||||
|
||||
@migration_name "tb_token_type" |
||||
|
||||
@impl FillingMigration |
||||
def migration_name, do: @migration_name |
||||
|
||||
@impl FillingMigration |
||||
def last_unprocessed_identifiers do |
||||
limit = batch_size() * concurrency() |
||||
|
||||
unprocessed_data_query() |
||||
|> select([tb], tb.id) |
||||
|> limit(^limit) |
||||
|> Repo.all(timeout: :infinity) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def unprocessed_data_query do |
||||
from(tb in TokenBalance, where: is_nil(tb.token_type)) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def update_batch(token_balance_ids) do |
||||
query = |
||||
from(token_balance in TokenBalance, |
||||
join: token in assoc(token_balance, :token), |
||||
where: token_balance.id in ^token_balance_ids, |
||||
update: [set: [token_type: token.type]] |
||||
) |
||||
|
||||
Repo.update_all(query, [], timeout: :infinity) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def update_cache do |
||||
BackgroundMigrations.set_tb_token_type_finished(true) |
||||
end |
||||
end |
@ -0,0 +1,84 @@ |
||||
defmodule Explorer.Migrator.FillingMigration do |
||||
@moduledoc """ |
||||
Template for creating migrations that fills some fields in existing entities |
||||
""" |
||||
|
||||
@callback migration_name :: String.t() |
||||
@callback unprocessed_data_query :: Ecto.Query.t() |
||||
@callback last_unprocessed_identifiers :: [any()] |
||||
@callback update_batch([any()]) :: any() |
||||
@callback update_cache :: any() |
||||
|
||||
defmacro __using__(_opts) do |
||||
quote do |
||||
@behaviour Explorer.Migrator.FillingMigration |
||||
|
||||
use GenServer, restart: :transient |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.Migrator.MigrationStatus |
||||
alias Explorer.Repo |
||||
|
||||
@default_batch_size 500 |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
def migration_finished? do |
||||
MigrationStatus.get_status(migration_name()) == "completed" |
||||
end |
||||
|
||||
@impl true |
||||
def init(_) do |
||||
case MigrationStatus.get_status(migration_name()) do |
||||
"completed" -> |
||||
update_cache() |
||||
:ignore |
||||
|
||||
_ -> |
||||
MigrationStatus.set_status(migration_name(), "started") |
||||
schedule_batch_migration() |
||||
{:ok, %{}} |
||||
end |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:migrate_batch, state) do |
||||
case last_unprocessed_identifiers() do |
||||
[] -> |
||||
update_cache() |
||||
MigrationStatus.set_status(migration_name(), "completed") |
||||
{:stop, :normal, state} |
||||
|
||||
hashes -> |
||||
hashes |
||||
|> Enum.chunk_every(batch_size()) |
||||
|> Enum.map(&run_task/1) |
||||
|> Task.await_many(:infinity) |
||||
|
||||
schedule_batch_migration() |
||||
|
||||
{:noreply, state} |
||||
end |
||||
end |
||||
|
||||
defp run_task(batch), do: Task.async(fn -> update_batch(batch) end) |
||||
|
||||
defp schedule_batch_migration do |
||||
Process.send(self(), :migrate_batch, []) |
||||
end |
||||
|
||||
defp batch_size do |
||||
Application.get_env(:explorer, __MODULE__)[:batch_size] || @default_batch_size |
||||
end |
||||
|
||||
defp concurrency do |
||||
default = 4 * System.schedulers_online() |
||||
|
||||
Application.get_env(:explorer, __MODULE__)[:concurrency] || default |
||||
end |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,32 @@ |
||||
defmodule Explorer.Migrator.MigrationStatus do |
||||
@moduledoc """ |
||||
Module is responsible for keeping the current status of background migrations. |
||||
""" |
||||
use Explorer.Schema |
||||
|
||||
alias Explorer.Repo |
||||
|
||||
@primary_key false |
||||
schema "migrations_status" do |
||||
field(:migration_name, :string) |
||||
# ["started", "completed"] |
||||
field(:status, :string) |
||||
|
||||
timestamps() |
||||
end |
||||
|
||||
@doc false |
||||
def changeset(migration_status \\ %__MODULE__{}, params) do |
||||
cast(migration_status, params, [:migration_name, :status]) |
||||
end |
||||
|
||||
def get_status(migration_name) do |
||||
Repo.one(from(ms in __MODULE__, where: ms.migration_name == ^migration_name, select: ms.status)) |
||||
end |
||||
|
||||
def set_status(migration_name, status) do |
||||
%{migration_name: migration_name, status: status} |
||||
|> changeset() |
||||
|> Repo.insert(on_conflict: {:replace_all_except, [:inserted_at]}, conflict_target: :migration_name) |
||||
end |
||||
end |
@ -0,0 +1,53 @@ |
||||
defmodule Explorer.Migrator.TransactionsDenormalization do |
||||
@moduledoc """ |
||||
Migrates all transactions to have set block_consensus and block_timestamp |
||||
""" |
||||
|
||||
use Explorer.Migrator.FillingMigration |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.Chain.Cache.BackgroundMigrations |
||||
alias Explorer.Chain.Transaction |
||||
alias Explorer.Migrator.FillingMigration |
||||
alias Explorer.Repo |
||||
|
||||
@migration_name "denormalization" |
||||
|
||||
@impl FillingMigration |
||||
def migration_name, do: @migration_name |
||||
|
||||
@impl FillingMigration |
||||
def last_unprocessed_identifiers do |
||||
limit = batch_size() * concurrency() |
||||
|
||||
unprocessed_data_query() |
||||
|> select([t], t.hash) |
||||
|> limit(^limit) |
||||
|> Repo.all(timeout: :infinity) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def unprocessed_data_query do |
||||
from(t in Transaction, |
||||
where: not is_nil(t.block_hash) and (is_nil(t.block_consensus) or is_nil(t.block_timestamp)) |
||||
) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def update_batch(transaction_hashes) do |
||||
query = |
||||
from(transaction in Transaction, |
||||
join: block in assoc(transaction, :block), |
||||
where: transaction.hash in ^transaction_hashes, |
||||
update: [set: [block_consensus: block.consensus, block_timestamp: block.timestamp]] |
||||
) |
||||
|
||||
Repo.update_all(query, [], timeout: :infinity) |
||||
end |
||||
|
||||
@impl FillingMigration |
||||
def update_cache do |
||||
BackgroundMigrations.set_denormalization_finished(true) |
||||
end |
||||
end |
@ -1,65 +0,0 @@ |
||||
defmodule Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdater do |
||||
@moduledoc """ |
||||
Collects processed block numbers from token id migration workers |
||||
and updates last_processed_block_number according to them. |
||||
Full algorithm is in the 'Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor' module doc. |
||||
""" |
||||
use GenServer |
||||
|
||||
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(_) do |
||||
last_processed_block_number = TokenTransferTokenIdMigratorProgress.get_last_processed_block_number() |
||||
|
||||
{:ok, %{last_processed_block_number: last_processed_block_number, processed_ranges: []}} |
||||
end |
||||
|
||||
def add_range(from, to) do |
||||
GenServer.cast(__MODULE__, {:add_range, from..to}) |
||||
end |
||||
|
||||
@impl true |
||||
def handle_cast({:add_range, range}, %{processed_ranges: processed_ranges} = state) do |
||||
ranges = |
||||
[range | processed_ranges] |
||||
|> Enum.sort_by(& &1.last, &>=/2) |
||||
|> normalize_ranges() |
||||
|
||||
{new_last_number, new_ranges} = maybe_update_last_processed_number(state.last_processed_block_number, ranges) |
||||
|
||||
{:noreply, %{last_processed_block_number: new_last_number, processed_ranges: new_ranges}} |
||||
end |
||||
|
||||
defp normalize_ranges(ranges) do |
||||
%{prev_range: prev, result: result} = |
||||
Enum.reduce(ranges, %{prev_range: nil, result: []}, fn range, %{prev_range: prev_range, result: result} -> |
||||
case {prev_range, range} do |
||||
{nil, _} -> |
||||
%{prev_range: range, result: result} |
||||
|
||||
{%{last: l1} = r1, %{first: f2} = r2} when l1 - 1 > f2 -> |
||||
%{prev_range: r2, result: [r1 | result]} |
||||
|
||||
{%{first: f1}, %{last: l2}} -> |
||||
%{prev_range: f1..l2, result: result} |
||||
end |
||||
end) |
||||
|
||||
Enum.reverse([prev | result]) |
||||
end |
||||
|
||||
# since ranges are normalized, we need to check only the first range to determine the new last_processed_number |
||||
defp maybe_update_last_processed_number(current_last, [from..to | rest] = ranges) when current_last - 1 <= from do |
||||
case TokenTransferTokenIdMigratorProgress.update_last_processed_block_number(to) do |
||||
{:ok, _} -> {to, rest} |
||||
_ -> {current_last, ranges} |
||||
end |
||||
end |
||||
|
||||
defp maybe_update_last_processed_number(current_last, ranges), do: {current_last, ranges} |
||||
end |
@ -1,70 +0,0 @@ |
||||
defmodule Explorer.TokenTransferTokenIdMigration.Supervisor do |
||||
@moduledoc """ |
||||
Supervises parts of token id migration process. |
||||
|
||||
Migration process algorithm: |
||||
|
||||
Defining the bounds of migration (by the first and the last block number of TokenTransfer). |
||||
Supervisor starts the workers in amount equal to 'TOKEN_ID_MIGRATION_CONCURRENCY' env value (defaults to 1) |
||||
and the 'LowestBlockNumberUpdater'. |
||||
|
||||
Each worker goes through the token transfers by batches ('TOKEN_ID_MIGRATION_BATCH_SIZE', defaults to 500) |
||||
and updates the token_ids to be equal of [token_id] for transfers that has any token_id. |
||||
Worker goes from the newest blocks to latest. |
||||
After worker is done with current batch, it sends the information about processed batch to 'LowestBlockNumberUpdater' |
||||
and takes the next by defining its bounds based on amount of all workers. |
||||
|
||||
For example, if batch size is 10, we have 5 workers and 100 items to be processed, |
||||
the distribution will be like this: |
||||
1 worker - 99..90, 49..40 |
||||
2 worker - 89..80, 39..30 |
||||
3 worker - 79..70, 29..20 |
||||
4 worker - 69..60, 19..10 |
||||
5 worker - 59..50, 9..0 |
||||
|
||||
'LowestBlockNumberUpdater' keeps the information about the last processed block number |
||||
(which is stored in the 'token_transfer_token_id_migrator_progress' db entity) |
||||
and block ranges that has already been processed by the workers but couldn't be committed |
||||
to last processed block number yet (because of the possible gap between the current last block |
||||
and upper bound of the last processed batch). Uncommitted block numbers are stored in normalize ranges. |
||||
When there is no gap between the last processed block number and the upper bound of the upper range, |
||||
'LowestBlockNumberUpdater' updates the last processed block number in db and drops this range from its state. |
||||
|
||||
This supervisor won't start if the migration is completed |
||||
(last processed block number in db == 'TOKEN_ID_MIGRATION_FIRST_BLOCK' (defaults to 0)). |
||||
""" |
||||
use Supervisor |
||||
|
||||
alias Explorer.TokenTransferTokenIdMigration.{LowestBlockNumberUpdater, Worker} |
||||
alias Explorer.Utility.TokenTransferTokenIdMigratorProgress |
||||
|
||||
@default_first_block 0 |
||||
@default_workers_count 1 |
||||
|
||||
def start_link(_) do |
||||
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(_) do |
||||
first_block = Application.get_env(:explorer, :token_id_migration)[:first_block] || @default_first_block |
||||
last_block = TokenTransferTokenIdMigratorProgress.get_last_processed_block_number() |
||||
|
||||
if last_block > first_block do |
||||
workers_count = Application.get_env(:explorer, :token_id_migration)[:concurrency] || @default_workers_count |
||||
|
||||
workers = |
||||
Enum.map(1..workers_count, fn id -> |
||||
Supervisor.child_spec( |
||||
{Worker, idx: id, first_block: first_block, last_block: last_block, step: workers_count - 1}, |
||||
id: {Worker, id}, |
||||
restart: :transient |
||||
) |
||||
end) |
||||
|
||||
Supervisor.init([LowestBlockNumberUpdater | workers], strategy: :one_for_one) |
||||
else |
||||
:ignore |
||||
end |
||||
end |
||||
end |
@ -1,84 +0,0 @@ |
||||
defmodule Explorer.TokenTransferTokenIdMigration.Worker do |
||||
@moduledoc """ |
||||
Performs the migration of TokenTransfer token_id to token_ids by batches. |
||||
Full algorithm is in the 'Explorer.TokenTransferTokenIdMigration.Supervisor' module doc. |
||||
""" |
||||
use GenServer |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.Chain.TokenTransfer |
||||
alias Explorer.Repo |
||||
alias Explorer.TokenTransferTokenIdMigration.LowestBlockNumberUpdater |
||||
|
||||
@default_batch_size 500 |
||||
@interval 10 |
||||
|
||||
def start_link(idx: idx, first_block: first, last_block: last, step: step) do |
||||
GenServer.start_link(__MODULE__, %{idx: idx, bottom_block: first, last_block: last, step: step}) |
||||
end |
||||
|
||||
@impl true |
||||
def init(%{idx: idx, bottom_block: bottom_block, last_block: last_block, step: step}) do |
||||
batch_size = Application.get_env(:explorer, :token_id_migration)[:batch_size] || @default_batch_size |
||||
range = calculate_new_range(last_block, bottom_block, batch_size, idx - 1) |
||||
|
||||
schedule_next_update() |
||||
|
||||
{:ok, %{batch_size: batch_size, bottom_block: bottom_block, step: step, current_range: range}} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:update, %{current_range: :out_of_bound} = state) do |
||||
{:stop, :normal, state} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:update, %{current_range: {lower_bound, upper_bound}} = state) do |
||||
case do_update(lower_bound, upper_bound) do |
||||
true -> |
||||
LowestBlockNumberUpdater.add_range(upper_bound, lower_bound) |
||||
new_range = calculate_new_range(lower_bound, state.bottom_block, state.batch_size, state.step) |
||||
schedule_next_update() |
||||
{:noreply, %{state | current_range: new_range}} |
||||
|
||||
_ -> |
||||
schedule_next_update() |
||||
{:noreply, state} |
||||
end |
||||
end |
||||
|
||||
defp calculate_new_range(last_processed_block, bottom_block, batch_size, step) do |
||||
upper_bound = last_processed_block - step * batch_size - 1 |
||||
lower_bound = max(upper_bound - batch_size + 1, bottom_block) |
||||
|
||||
if upper_bound >= bottom_block do |
||||
{lower_bound, upper_bound} |
||||
else |
||||
:out_of_bound |
||||
end |
||||
end |
||||
|
||||
defp do_update(lower_bound, upper_bound) do |
||||
token_transfers_batch_query = |
||||
from( |
||||
tt in TokenTransfer, |
||||
where: tt.block_number >= ^lower_bound, |
||||
where: tt.block_number <= ^upper_bound |
||||
) |
||||
|
||||
token_transfers_batch_query |
||||
|> Repo.all() |
||||
|> Enum.filter(fn %{token_id: token_id} -> not is_nil(token_id) end) |
||||
|> Enum.map(fn token_transfer -> |
||||
token_transfer |
||||
|> TokenTransfer.changeset(%{token_ids: [token_transfer.token_id], token_id: nil}) |
||||
|> Repo.update() |
||||
end) |
||||
|> Enum.all?(&match?({:ok, _}, &1)) |
||||
end |
||||
|
||||
defp schedule_next_update do |
||||
Process.send_after(self(), :update, @interval) |
||||
end |
||||
end |
@ -1,67 +0,0 @@ |
||||
defmodule Explorer.Utility.TokenTransferTokenIdMigratorProgress do |
||||
@moduledoc """ |
||||
Module is responsible for keeping the current progress of TokenTransfer token_id migration. |
||||
Full algorithm is in the 'Indexer.Fetcher.TokenTransferTokenIdMigration.Supervisor' module doc. |
||||
""" |
||||
use Explorer.Schema |
||||
|
||||
require Logger |
||||
|
||||
alias Explorer.Chain.Cache.BlockNumber |
||||
alias Explorer.Repo |
||||
|
||||
schema "token_transfer_token_id_migrator_progress" do |
||||
field(:last_processed_block_number, :integer) |
||||
|
||||
timestamps() |
||||
end |
||||
|
||||
@doc false |
||||
def changeset(progress \\ %__MODULE__{}, params) do |
||||
cast(progress, params, [:last_processed_block_number]) |
||||
end |
||||
|
||||
def get_current_progress do |
||||
Repo.one( |
||||
from( |
||||
p in __MODULE__, |
||||
order_by: [desc: p.updated_at], |
||||
limit: 1 |
||||
) |
||||
) |
||||
end |
||||
|
||||
def get_last_processed_block_number do |
||||
case get_current_progress() do |
||||
nil -> |
||||
latest_processed_block_number = BlockNumber.get_max() + 1 |
||||
update_last_processed_block_number(latest_processed_block_number) |
||||
latest_processed_block_number |
||||
|
||||
%{last_processed_block_number: block_number} -> |
||||
block_number |
||||
end |
||||
end |
||||
|
||||
def update_last_processed_block_number(block_number, force \\ false) do |
||||
case get_current_progress() do |
||||
nil -> |
||||
%{last_processed_block_number: block_number} |
||||
|> changeset() |
||||
|> Repo.insert() |
||||
|
||||
progress -> |
||||
if not force and progress.last_processed_block_number < block_number do |
||||
Logger.error( |
||||
"TokenTransferTokenIdMigratorProgress new block_number is above the last one. Last: #{progress.last_processed_block_number}, new: #{block_number}" |
||||
) |
||||
|
||||
{:error, :invalid_block_number} |
||||
else |
||||
progress |
||||
|> changeset(%{last_processed_block_number: block_number}) |
||||
|> Repo.update() |
||||
end |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,13 @@ |
||||
defmodule Explorer.Repo.Migrations.AddBlockTimestampAndConsensusToTransactions do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
alter table(:transactions) do |
||||
add_if_not_exists(:block_timestamp, :utc_datetime_usec) |
||||
add_if_not_exists(:block_consensus, :boolean, default: true) |
||||
end |
||||
|
||||
create_if_not_exists(index(:transactions, :block_timestamp)) |
||||
create_if_not_exists(index(:transactions, :block_consensus)) |
||||
end |
||||
end |
@ -0,0 +1,12 @@ |
||||
defmodule Explorer.Repo.Migrations.CreateMigrationsStatus do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
create table(:migrations_status, primary_key: false) do |
||||
add(:migration_name, :string, primary_key: true) |
||||
add(:status, :string) |
||||
|
||||
timestamps() |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,11 @@ |
||||
defmodule Explorer.Repo.Migrations.CreateBtreeGinExtension do |
||||
use Ecto.Migration |
||||
|
||||
def up do |
||||
execute("CREATE EXTENSION IF NOT EXISTS btree_gin") |
||||
end |
||||
|
||||
def down do |
||||
execute("DROP EXTENSION IF EXISTS btree_gin") |
||||
end |
||||
end |
@ -0,0 +1,26 @@ |
||||
defmodule Explorer.Repo.Migrations.AddTokenTransfersTokenContractAddressTokenIdsIndex do |
||||
use Ecto.Migration |
||||
@disable_ddl_transaction true |
||||
@disable_migration_lock true |
||||
|
||||
def up do |
||||
create( |
||||
index( |
||||
:token_transfers, |
||||
[:token_contract_address_hash, :token_ids], |
||||
name: "token_transfers_token_contract_address_hash_token_ids_index", |
||||
using: "GIN", |
||||
concurrently: true |
||||
) |
||||
) |
||||
end |
||||
|
||||
def down do |
||||
drop_if_exists( |
||||
index(:token_transfers, [:token_contract_address_hash, :token_ids], |
||||
name: :token_transfers_token_contract_address_hash_token_ids_index |
||||
), |
||||
concurrently: true |
||||
) |
||||
end |
||||
end |
@ -0,0 +1,7 @@ |
||||
defmodule Explorer.Repo.Migrations.DropTokenTransfersTokenIdsIndex do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
drop_if_exists(index(:token_transfers, [:token_ids], name: :token_transfers_token_ids_index)) |
||||
end |
||||
end |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue