chore: Add meta to migrations_status (#10678)

pull/10756/head
Qwerty5Uiop 2 months ago committed by GitHub
parent 5934c4f6be
commit eeee120768
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      apps/explorer/config/runtime/test.exs
  2. 10
      apps/explorer/lib/explorer/migrator/filling_migration.ex
  3. 55
      apps/explorer/lib/explorer/migrator/migration_status.ex
  4. 8
      apps/explorer/lib/explorer/migrator/shrink_internal_transactions.ex
  5. 9
      apps/explorer/priv/repo/migrations/20240830142652_add_meta_to_migrations_status.exs

@ -47,6 +47,7 @@ config :explorer, Explorer.Migrator.TokenTransferTokenType, enabled: false
config :explorer, Explorer.Migrator.SanitizeIncorrectWETHTokenTransfers, enabled: false
config :explorer, Explorer.Migrator.TransactionBlockConsensus, enabled: false
config :explorer, Explorer.Migrator.TokenTransferBlockConsensus, enabled: false
config :explorer, Explorer.Migrator.ShrinkInternalTransactions, enabled: false
config :explorer, Explorer.Migrator.RestoreOmittedWETHTransfers, enabled: false
config :explorer,

@ -37,15 +37,15 @@ defmodule Explorer.Migrator.FillingMigration do
@impl true
def handle_continue(:ok, state) do
case MigrationStatus.get_status(migration_name()) do
"completed" ->
case MigrationStatus.fetch(migration_name()) do
%{status: "completed"} ->
update_cache()
{:stop, :normal, state}
_ ->
migration_status ->
MigrationStatus.set_status(migration_name(), "started")
schedule_batch_migration()
{:noreply, %{}}
{:noreply, (migration_status && migration_status.meta) || %{}}
end
end
@ -63,6 +63,8 @@ defmodule Explorer.Migrator.FillingMigration do
|> Enum.map(&run_task/1)
|> Task.await_many(:infinity)
MigrationStatus.update_meta(migration_name(), new_state)
schedule_batch_migration()
{:noreply, new_state}

@ -8,25 +8,72 @@ defmodule Explorer.Migrator.MigrationStatus do
@primary_key false
typed_schema "migrations_status" do
field(:migration_name, :string)
field(:migration_name, :string, primary_key: true)
# ["started", "completed"]
field(:status, :string)
field(:meta, :map)
timestamps()
end
@doc false
def changeset(migration_status \\ %__MODULE__{}, params) do
cast(migration_status, params, [:migration_name, :status])
cast(migration_status, params, [:migration_name, :status, :meta])
end
@doc """
Get the `MigrationStatus` struct by migration name.
"""
@spec fetch(String.t()) :: __MODULE__.t() | nil
def fetch(migration_name) do
migration_name
|> get_by_migration_name_query()
|> Repo.one()
end
@doc """
Get the status of migration by its name.
"""
@spec get_status(String.t()) :: String.t() | nil
def get_status(migration_name) do
Repo.one(from(ms in __MODULE__, where: ms.migration_name == ^migration_name, select: ms.status))
migration_name
|> get_by_migration_name_query()
|> select([ms], ms.status)
|> Repo.one()
end
@doc """
Set the status of migration by its name.
"""
@spec set_status(String.t(), String.t()) :: {:ok, __MODULE__.t()} | {:error, Ecto.Changeset.t()}
def set_status(migration_name, status) do
%{migration_name: migration_name, status: status}
|> changeset()
|> Repo.insert(on_conflict: {:replace_all_except, [:inserted_at]}, conflict_target: :migration_name)
|> Repo.insert(on_conflict: {:replace_all_except, [:inserted_at, :meta]}, conflict_target: :migration_name)
end
@doc """
Update migration meta by its name.
"""
@spec update_meta(String.t(), map()) :: :ok | {:ok, __MODULE__.t()} | {:error, Ecto.Changeset.t()}
def update_meta(migration_name, new_meta) do
migration_name
|> get_by_migration_name_query()
|> Repo.one()
|> case do
nil ->
:ok
migration_status ->
updated_meta = Map.merge(migration_status.meta || %{}, new_meta)
migration_status
|> changeset(%{meta: updated_meta})
|> Repo.update()
end
end
defp get_by_migration_name_query(query \\ __MODULE__, migration_name) do
from(ms in query, where: ms.migration_name == ^migration_name)
end
end

@ -18,13 +18,13 @@ defmodule Explorer.Migrator.ShrinkInternalTransactions do
def migration_name, do: @migration_name
@impl FillingMigration
def last_unprocessed_identifiers(%{max_block_number: -1} = state), do: {[], state}
def last_unprocessed_identifiers(%{"max_block_number" => -1} = state), do: {[], state}
def last_unprocessed_identifiers(%{max_block_number: from_block_number} = state) do
def last_unprocessed_identifiers(%{"max_block_number" => from_block_number} = state) do
limit = batch_size() * concurrency()
to_block_number = max(from_block_number - limit + 1, 0)
{Enum.to_list(from_block_number..to_block_number), %{state | max_block_number: to_block_number - 1}}
{Enum.to_list(from_block_number..to_block_number), %{state | "max_block_number" => to_block_number - 1}}
end
def last_unprocessed_identifiers(state) do
@ -38,7 +38,7 @@ defmodule Explorer.Migrator.ShrinkInternalTransactions do
max_block_number = Repo.one(query, timeout: :infinity)
state
|> Map.put(:max_block_number, max_block_number || -1)
|> Map.put("max_block_number", max_block_number || -1)
|> last_unprocessed_identifiers()
end

@ -0,0 +1,9 @@
defmodule Explorer.Repo.Migrations.AddMetaToMigrationsStatus do
use Ecto.Migration
def change do
alter table(:migrations_status) do
add(:meta, :map)
end
end
end
Loading…
Cancel
Save