chore: Refactor init functions to use continue if needed (#10300)

pull/10381/head
Qwerty5Uiop 5 months ago committed by GitHub
parent c89696b412
commit 2a00b0cd1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 7
      apps/explorer/lib/explorer/counters/average_block_time.ex
  2. 9
      apps/explorer/lib/explorer/migrator/filling_migration.ex
  3. 9
      apps/explorer/lib/explorer/migrator/sanitize_incorrect_nft_token_transfers.ex
  4. 9
      apps/explorer/lib/explorer/migrator/sanitize_incorrect_weth_token_transfers.ex
  5. 14
      apps/explorer/lib/explorer/migrator/sanitize_missing_block_ranges.ex
  6. 26
      apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex
  7. 28
      apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex
  8. 28
      apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex
  9. 26
      apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex
  10. 7
      apps/indexer/lib/indexer/fetcher/token_instance/sanitize_erc721.ex
  11. 90
      apps/indexer/lib/indexer/fetcher/transaction_action.ex
  12. 8
      apps/indexer/lib/indexer/fetcher/withdrawal.ex

@ -44,7 +44,12 @@ defmodule Explorer.Counters.AverageBlockTime do
refresh_period = Application.get_env(:explorer, __MODULE__)[:cache_period] refresh_period = Application.get_env(:explorer, __MODULE__)[:cache_period]
Process.send_after(self(), :refresh_timestamps, refresh_period) Process.send_after(self(), :refresh_timestamps, refresh_period)
{:ok, refresh_timestamps()} {:ok, %{}, {:continue, :ok}}
end
@impl true
def handle_continue(:ok, _state) do
{:noreply, refresh_timestamps()}
end end
@impl true @impl true

@ -32,15 +32,20 @@ defmodule Explorer.Migrator.FillingMigration do
@impl true @impl true
def init(_) do def init(_) do
{:ok, %{}, {:continue, :ok}}
end
@impl true
def handle_continue(:ok, state) do
case MigrationStatus.get_status(migration_name()) do case MigrationStatus.get_status(migration_name()) do
"completed" -> "completed" ->
update_cache() update_cache()
:ignore {:stop, :normal, state}
_ -> _ ->
MigrationStatus.set_status(migration_name(), "started") MigrationStatus.set_status(migration_name(), "started")
schedule_batch_migration() schedule_batch_migration()
{:ok, %{}} {:noreply, %{}}
end end
end end

@ -24,14 +24,19 @@ defmodule Explorer.Migrator.SanitizeIncorrectNFTTokenTransfers do
@impl true @impl true
def init(_) do def init(_) do
{:ok, %{}, {:continue, :ok}}
end
@impl true
def handle_continue(:ok, state) do
case MigrationStatus.get_status(@migration_name) do case MigrationStatus.get_status(@migration_name) do
"completed" -> "completed" ->
:ignore {:stop, :normal, state}
_ -> _ ->
MigrationStatus.set_status(@migration_name, "started") MigrationStatus.set_status(@migration_name, "started")
schedule_batch_migration() schedule_batch_migration()
{:ok, %{step: :delete}} {:noreply, %{step: :delete}}
end end
end end

@ -24,14 +24,19 @@ defmodule Explorer.Migrator.SanitizeIncorrectWETHTokenTransfers do
@impl true @impl true
def init(_) do def init(_) do
{:ok, %{}, {:continue, :ok}}
end
@impl true
def handle_continue(:ok, state) do
case MigrationStatus.get_status(@migration_name) do case MigrationStatus.get_status(@migration_name) do
"completed" -> "completed" ->
:ignore {:stop, :normal, state}
_ -> _ ->
MigrationStatus.set_status(@migration_name, "started") MigrationStatus.set_status(@migration_name, "started")
schedule_batch_migration() schedule_batch_migration()
{:ok, %{step: :delete_duplicates}} {:noreply, %{step: :delete_duplicates}}
end end
end end

@ -15,19 +15,19 @@ defmodule Explorer.Migrator.SanitizeMissingBlockRanges do
end end
def init(_) do def init(_) do
{:ok, %{}, {:continue, :ok}}
end
def handle_continue(:ok, state) do
case MigrationStatus.get_status(@migration_name) do case MigrationStatus.get_status(@migration_name) do
"completed" -> "completed" ->
:ignore :ok
_ -> _ ->
MigrationStatus.set_status(@migration_name, "started") MigrationStatus.set_status(@migration_name, "started")
{:ok, %{}, {:continue, :ok}} MissingBlockRange.sanitize_missing_block_ranges()
MigrationStatus.set_status(@migration_name, "completed")
end end
end
def handle_continue(:ok, state) do
MissingBlockRange.sanitize_missing_block_ranges()
MigrationStatus.set_status(@migration_name, "completed")
{:stop, :normal, state} {:stop, :normal, state}
end end

@ -45,19 +45,27 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do
@impl GenServer @impl GenServer
def init(_args) do def init(_args) do
{:ok, %{}, {:continue, :ok}}
end
@impl GenServer
def handle_continue(:ok, state) do
Logger.metadata(fetcher: @fetcher_name) Logger.metadata(fetcher: @fetcher_name)
env = Application.get_all_env(:indexer)[__MODULE__] env = Application.get_all_env(:indexer)[__MODULE__]
PolygonEdge.init_l1( case PolygonEdge.init_l1(
Deposit, Deposit,
env, env,
self(), self(),
env[:state_sender], env[:state_sender],
"State Sender", "State Sender",
"polygon_edge_deposits", "polygon_edge_deposits",
"Deposits" "Deposits"
) ) do
:ignore -> {:stop, :normal, state}
{:ok, new_state} -> {:noreply, new_state}
end
end end
@impl GenServer @impl GenServer

@ -44,21 +44,29 @@ defmodule Indexer.Fetcher.PolygonEdge.DepositExecute do
@impl GenServer @impl GenServer
def init(args) do def init(args) do
{:ok, %{}, {:continue, args}}
end
@impl GenServer
def handle_continue(args, state) do
Logger.metadata(fetcher: @fetcher_name) Logger.metadata(fetcher: @fetcher_name)
json_rpc_named_arguments = args[:json_rpc_named_arguments] json_rpc_named_arguments = args[:json_rpc_named_arguments]
env = Application.get_all_env(:indexer)[__MODULE__] env = Application.get_all_env(:indexer)[__MODULE__]
PolygonEdge.init_l2( case PolygonEdge.init_l2(
DepositExecute, DepositExecute,
env, env,
self(), self(),
env[:state_receiver], env[:state_receiver],
"StateReceiver", "StateReceiver",
"polygon_edge_deposit_executes", "polygon_edge_deposit_executes",
"Deposit Executes", "Deposit Executes",
json_rpc_named_arguments json_rpc_named_arguments
) ) do
:ignore -> {:stop, :normal, state}
{:ok, new_state} -> {:noreply, new_state}
end
end end
@impl GenServer @impl GenServer

@ -49,21 +49,29 @@ defmodule Indexer.Fetcher.PolygonEdge.Withdrawal do
@impl GenServer @impl GenServer
def init(args) do def init(args) do
{:ok, %{}, {:continue, args}}
end
@impl GenServer
def handle_continue(args, state) do
Logger.metadata(fetcher: @fetcher_name) Logger.metadata(fetcher: @fetcher_name)
json_rpc_named_arguments = args[:json_rpc_named_arguments] json_rpc_named_arguments = args[:json_rpc_named_arguments]
env = Application.get_all_env(:indexer)[__MODULE__] env = Application.get_all_env(:indexer)[__MODULE__]
PolygonEdge.init_l2( case PolygonEdge.init_l2(
Withdrawal, Withdrawal,
env, env,
self(), self(),
env[:state_sender], env[:state_sender],
"L2StateSender", "L2StateSender",
"polygon_edge_withdrawals", "polygon_edge_withdrawals",
"Withdrawals", "Withdrawals",
json_rpc_named_arguments json_rpc_named_arguments
) ) do
:ignore -> {:stop, :normal, state}
{:ok, new_state} -> {:noreply, new_state}
end
end end
@impl GenServer @impl GenServer

@ -37,19 +37,27 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do
@impl GenServer @impl GenServer
def init(_args) do def init(_args) do
{:ok, %{}, {:continue, :ok}}
end
@impl GenServer
def handle_continue(:ok, state) do
Logger.metadata(fetcher: @fetcher_name) Logger.metadata(fetcher: @fetcher_name)
env = Application.get_all_env(:indexer)[__MODULE__] env = Application.get_all_env(:indexer)[__MODULE__]
PolygonEdge.init_l1( case PolygonEdge.init_l1(
WithdrawalExit, WithdrawalExit,
env, env,
self(), self(),
env[:exit_helper], env[:exit_helper],
"Exit Helper", "Exit Helper",
"polygon_edge_withdrawal_exits", "polygon_edge_withdrawal_exits",
"Withdrawals" "Withdrawals"
) ) do
:ignore -> {:stop, :normal, state}
{:ok, new_state} -> {:noreply, new_state}
end
end end
@impl GenServer @impl GenServer

@ -28,10 +28,15 @@ defmodule Indexer.Fetcher.TokenInstance.SanitizeERC721 do
@impl true @impl true
def init(opts) do def init(opts) do
{:ok, %{}, {:continue, opts}}
end
@impl true
def handle_continue(opts, _state) do
last_token_address_hash = Constants.get_last_processed_token_address_hash() last_token_address_hash = Constants.get_last_processed_token_address_hash()
GenServer.cast(__MODULE__, :fetch_tokens_queue) GenServer.cast(__MODULE__, :fetch_tokens_queue)
{:ok, Map.put(opts, :last_token_address_hash, last_token_address_hash)} {:noreply, Map.put(opts, :last_token_address_hash, last_token_address_hash)}
end end
@impl true @impl true

@ -66,6 +66,47 @@ defmodule Indexer.Fetcher.TransactionAction do
end end
end end
@impl true
def handle_continue({opts, first_block, last_block}, _state) do
logger_metadata = Logger.metadata()
Logger.metadata(fetcher: :transaction_action)
max_block_number = Chain.fetch_max_block_number()
if last_block > max_block_number do
Logger.warning(
"Note, that the last block number (#{last_block}) provided to #{__MODULE__} exceeds max block number available in DB (#{max_block_number})."
)
end
supported_protocols =
TransactionAction.supported_protocols()
|> Enum.map(&Atom.to_string(&1))
protocols =
opts
|> Keyword.get(:reindex_protocols, "")
|> String.trim()
|> String.split(",")
|> Enum.map(&String.trim(&1))
|> Enum.filter(&Enum.member?(supported_protocols, &1))
next_block = get_next_block(first_block, last_block, protocols)
state =
%__MODULE__{
first_block: first_block,
next_block: next_block,
last_block: last_block,
protocols: protocols
}
|> run_fetch()
Logger.reset_metadata(logger_metadata)
{:noreply, state}
end
@impl GenServer @impl GenServer
def handle_info(:fetch, %__MODULE__{} = state) do def handle_info(:fetch, %__MODULE__{} = state) do
task = Task.Supervisor.async_nolink(Indexer.Fetcher.TransactionAction.TaskSupervisor, fn -> task(state) end) task = Task.Supervisor.async_nolink(Indexer.Fetcher.TransactionAction.TaskSupervisor, fn -> task(state) end)
@ -195,53 +236,14 @@ defmodule Indexer.Fetcher.TransactionAction do
end end
defp init_fetching(opts, first_block, last_block) do defp init_fetching(opts, first_block, last_block) do
logger_metadata = Logger.metadata()
Logger.metadata(fetcher: :transaction_action)
first_block = parse_integer(first_block) first_block = parse_integer(first_block)
last_block = parse_integer(last_block) last_block = parse_integer(last_block)
return = if is_nil(first_block) or is_nil(last_block) or first_block <= 0 or last_block <= 0 or first_block > last_block do
if is_nil(first_block) or is_nil(last_block) or first_block <= 0 or last_block <= 0 or first_block > last_block do {:stop, "Correct block range must be provided to #{__MODULE__}."}
{:stop, "Correct block range must be provided to #{__MODULE__}."} else
else {:ok, %{}, {:continue, {opts, first_block, last_block}}}
max_block_number = Chain.fetch_max_block_number() end
if last_block > max_block_number do
Logger.warning(
"Note, that the last block number (#{last_block}) provided to #{__MODULE__} exceeds max block number available in DB (#{max_block_number})."
)
end
supported_protocols =
TransactionAction.supported_protocols()
|> Enum.map(&Atom.to_string(&1))
protocols =
opts
|> Keyword.get(:reindex_protocols, "")
|> String.trim()
|> String.split(",")
|> Enum.map(&String.trim(&1))
|> Enum.filter(&Enum.member?(supported_protocols, &1))
next_block = get_next_block(first_block, last_block, protocols)
state =
%__MODULE__{
first_block: first_block,
next_block: next_block,
last_block: last_block,
protocols: protocols
}
|> run_fetch()
{:ok, state}
end
Logger.reset_metadata(logger_metadata)
return
end end
defp get_next_block(first_block, last_block, protocols) do defp get_next_block(first_block, last_block, protocols) do

@ -56,7 +56,6 @@ defmodule Indexer.Fetcher.Withdrawal do
end end
state = %__MODULE__{ state = %__MODULE__{
blocks_to_fetch: first_block |> Helper.parse_integer() |> missing_block_numbers(),
interval: opts[:interval] || @interval, interval: opts[:interval] || @interval,
json_rpc_named_arguments: json_rpc_named_arguments, json_rpc_named_arguments: json_rpc_named_arguments,
max_batch_size: opts[:max_batch_size] || @batch_size, max_batch_size: opts[:max_batch_size] || @batch_size,
@ -65,13 +64,18 @@ defmodule Indexer.Fetcher.Withdrawal do
Process.send_after(self(), :fetch_withdrawals, state.interval) Process.send_after(self(), :fetch_withdrawals, state.interval)
{:ok, state} {:ok, state, {:continue, first_block}}
else else
Logger.warning("Please, specify the first block of the block range for #{__MODULE__}.") Logger.warning("Please, specify the first block of the block range for #{__MODULE__}.")
:ignore :ignore
end end
end end
@impl GenServer
def handle_continue(first_block, state) do
{:noreply, %{state | blocks_to_fetch: first_block |> Helper.parse_integer() |> missing_block_numbers()}}
end
@impl GenServer @impl GenServer
def handle_info( def handle_info(
:fetch_withdrawals, :fetch_withdrawals,

Loading…
Cancel
Save