diff --git a/apps/explorer/lib/explorer/counters/average_block_time.ex b/apps/explorer/lib/explorer/counters/average_block_time.ex index 02e8e34647..44d459cd38 100644 --- a/apps/explorer/lib/explorer/counters/average_block_time.ex +++ b/apps/explorer/lib/explorer/counters/average_block_time.ex @@ -44,7 +44,12 @@ defmodule Explorer.Counters.AverageBlockTime do refresh_period = Application.get_env(:explorer, __MODULE__)[:cache_period] Process.send_after(self(), :refresh_timestamps, refresh_period) - {:ok, refresh_timestamps()} + {:ok, %{}, {:continue, :ok}} + end + + @impl true + def handle_continue(:ok, _state) do + {:noreply, refresh_timestamps()} end @impl true diff --git a/apps/explorer/lib/explorer/migrator/filling_migration.ex b/apps/explorer/lib/explorer/migrator/filling_migration.ex index 507dfcb6e5..37d1264810 100644 --- a/apps/explorer/lib/explorer/migrator/filling_migration.ex +++ b/apps/explorer/lib/explorer/migrator/filling_migration.ex @@ -32,15 +32,20 @@ defmodule Explorer.Migrator.FillingMigration do @impl true def init(_) do + {:ok, %{}, {:continue, :ok}} + end + + @impl true + def handle_continue(:ok, state) do case MigrationStatus.get_status(migration_name()) do "completed" -> update_cache() - :ignore + {:stop, :normal, state} _ -> MigrationStatus.set_status(migration_name(), "started") schedule_batch_migration() - {:ok, %{}} + {:noreply, %{}} end end diff --git a/apps/explorer/lib/explorer/migrator/sanitize_incorrect_nft_token_transfers.ex b/apps/explorer/lib/explorer/migrator/sanitize_incorrect_nft_token_transfers.ex index 4933abbf5b..81eaa1ac5b 100644 --- a/apps/explorer/lib/explorer/migrator/sanitize_incorrect_nft_token_transfers.ex +++ b/apps/explorer/lib/explorer/migrator/sanitize_incorrect_nft_token_transfers.ex @@ -24,14 +24,19 @@ defmodule Explorer.Migrator.SanitizeIncorrectNFTTokenTransfers do @impl true def init(_) do + {:ok, %{}, {:continue, :ok}} + end + + @impl true + def handle_continue(:ok, state) do case MigrationStatus.get_status(@migration_name) do "completed" -> - :ignore + {:stop, :normal, state} _ -> MigrationStatus.set_status(@migration_name, "started") schedule_batch_migration() - {:ok, %{step: :delete}} + {:noreply, %{step: :delete}} end end diff --git a/apps/explorer/lib/explorer/migrator/sanitize_incorrect_weth_token_transfers.ex b/apps/explorer/lib/explorer/migrator/sanitize_incorrect_weth_token_transfers.ex index 038ce5fd6d..9bd33e3444 100644 --- a/apps/explorer/lib/explorer/migrator/sanitize_incorrect_weth_token_transfers.ex +++ b/apps/explorer/lib/explorer/migrator/sanitize_incorrect_weth_token_transfers.ex @@ -24,14 +24,19 @@ defmodule Explorer.Migrator.SanitizeIncorrectWETHTokenTransfers do @impl true def init(_) do + {:ok, %{}, {:continue, :ok}} + end + + @impl true + def handle_continue(:ok, state) do case MigrationStatus.get_status(@migration_name) do "completed" -> - :ignore + {:stop, :normal, state} _ -> MigrationStatus.set_status(@migration_name, "started") schedule_batch_migration() - {:ok, %{step: :delete_duplicates}} + {:noreply, %{step: :delete_duplicates}} end end diff --git a/apps/explorer/lib/explorer/migrator/sanitize_missing_block_ranges.ex b/apps/explorer/lib/explorer/migrator/sanitize_missing_block_ranges.ex index 29408229c0..03166816f9 100644 --- a/apps/explorer/lib/explorer/migrator/sanitize_missing_block_ranges.ex +++ b/apps/explorer/lib/explorer/migrator/sanitize_missing_block_ranges.ex @@ -15,19 +15,19 @@ defmodule Explorer.Migrator.SanitizeMissingBlockRanges do end def init(_) do + {:ok, %{}, {:continue, :ok}} + end + + def handle_continue(:ok, state) do case MigrationStatus.get_status(@migration_name) do "completed" -> - :ignore + :ok _ -> MigrationStatus.set_status(@migration_name, "started") - {:ok, %{}, {:continue, :ok}} + MissingBlockRange.sanitize_missing_block_ranges() + MigrationStatus.set_status(@migration_name, "completed") end - end - - def handle_continue(:ok, state) do - MissingBlockRange.sanitize_missing_block_ranges() - MigrationStatus.set_status(@migration_name, "completed") {:stop, :normal, state} end diff --git a/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex b/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex index 556acfd892..6864d66273 100644 --- a/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex +++ b/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex @@ -45,19 +45,27 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do @impl GenServer def init(_args) do + {:ok, %{}, {:continue, :ok}} + end + + @impl GenServer + def handle_continue(:ok, state) do Logger.metadata(fetcher: @fetcher_name) env = Application.get_all_env(:indexer)[__MODULE__] - PolygonEdge.init_l1( - Deposit, - env, - self(), - env[:state_sender], - "State Sender", - "polygon_edge_deposits", - "Deposits" - ) + case PolygonEdge.init_l1( + Deposit, + env, + self(), + env[:state_sender], + "State Sender", + "polygon_edge_deposits", + "Deposits" + ) do + :ignore -> {:stop, :normal, state} + {:ok, new_state} -> {:noreply, new_state} + end end @impl GenServer diff --git a/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex b/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex index c716774836..90159b16cc 100644 --- a/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex +++ b/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex @@ -44,21 +44,29 @@ defmodule Indexer.Fetcher.PolygonEdge.DepositExecute do @impl GenServer def init(args) do + {:ok, %{}, {:continue, args}} + end + + @impl GenServer + def handle_continue(args, state) do Logger.metadata(fetcher: @fetcher_name) json_rpc_named_arguments = args[:json_rpc_named_arguments] env = Application.get_all_env(:indexer)[__MODULE__] - PolygonEdge.init_l2( - DepositExecute, - env, - self(), - env[:state_receiver], - "StateReceiver", - "polygon_edge_deposit_executes", - "Deposit Executes", - json_rpc_named_arguments - ) + case PolygonEdge.init_l2( + DepositExecute, + env, + self(), + env[:state_receiver], + "StateReceiver", + "polygon_edge_deposit_executes", + "Deposit Executes", + json_rpc_named_arguments + ) do + :ignore -> {:stop, :normal, state} + {:ok, new_state} -> {:noreply, new_state} + end end @impl GenServer diff --git a/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex b/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex index c629b1df33..47f6661830 100644 --- a/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex +++ b/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex @@ -49,21 +49,29 @@ defmodule Indexer.Fetcher.PolygonEdge.Withdrawal do @impl GenServer def init(args) do + {:ok, %{}, {:continue, args}} + end + + @impl GenServer + def handle_continue(args, state) do Logger.metadata(fetcher: @fetcher_name) json_rpc_named_arguments = args[:json_rpc_named_arguments] env = Application.get_all_env(:indexer)[__MODULE__] - PolygonEdge.init_l2( - Withdrawal, - env, - self(), - env[:state_sender], - "L2StateSender", - "polygon_edge_withdrawals", - "Withdrawals", - json_rpc_named_arguments - ) + case PolygonEdge.init_l2( + Withdrawal, + env, + self(), + env[:state_sender], + "L2StateSender", + "polygon_edge_withdrawals", + "Withdrawals", + json_rpc_named_arguments + ) do + :ignore -> {:stop, :normal, state} + {:ok, new_state} -> {:noreply, new_state} + end end @impl GenServer diff --git a/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex b/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex index e19ea6517c..f949dbee7e 100644 --- a/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex +++ b/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex @@ -37,19 +37,27 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do @impl GenServer def init(_args) do + {:ok, %{}, {:continue, :ok}} + end + + @impl GenServer + def handle_continue(:ok, state) do Logger.metadata(fetcher: @fetcher_name) env = Application.get_all_env(:indexer)[__MODULE__] - PolygonEdge.init_l1( - WithdrawalExit, - env, - self(), - env[:exit_helper], - "Exit Helper", - "polygon_edge_withdrawal_exits", - "Withdrawals" - ) + case PolygonEdge.init_l1( + WithdrawalExit, + env, + self(), + env[:exit_helper], + "Exit Helper", + "polygon_edge_withdrawal_exits", + "Withdrawals" + ) do + :ignore -> {:stop, :normal, state} + {:ok, new_state} -> {:noreply, new_state} + end end @impl GenServer diff --git a/apps/indexer/lib/indexer/fetcher/token_instance/sanitize_erc721.ex b/apps/indexer/lib/indexer/fetcher/token_instance/sanitize_erc721.ex index bbe8bf7540..7d8938057c 100644 --- a/apps/indexer/lib/indexer/fetcher/token_instance/sanitize_erc721.ex +++ b/apps/indexer/lib/indexer/fetcher/token_instance/sanitize_erc721.ex @@ -28,10 +28,15 @@ defmodule Indexer.Fetcher.TokenInstance.SanitizeERC721 do @impl true def init(opts) do + {:ok, %{}, {:continue, opts}} + end + + @impl true + def handle_continue(opts, _state) do last_token_address_hash = Constants.get_last_processed_token_address_hash() GenServer.cast(__MODULE__, :fetch_tokens_queue) - {:ok, Map.put(opts, :last_token_address_hash, last_token_address_hash)} + {:noreply, Map.put(opts, :last_token_address_hash, last_token_address_hash)} end @impl true diff --git a/apps/indexer/lib/indexer/fetcher/transaction_action.ex b/apps/indexer/lib/indexer/fetcher/transaction_action.ex index 46dda6639e..0e1a8a1756 100644 --- a/apps/indexer/lib/indexer/fetcher/transaction_action.ex +++ b/apps/indexer/lib/indexer/fetcher/transaction_action.ex @@ -66,6 +66,47 @@ defmodule Indexer.Fetcher.TransactionAction do end end + @impl true + def handle_continue({opts, first_block, last_block}, _state) do + logger_metadata = Logger.metadata() + Logger.metadata(fetcher: :transaction_action) + + max_block_number = Chain.fetch_max_block_number() + + if last_block > max_block_number do + Logger.warning( + "Note, that the last block number (#{last_block}) provided to #{__MODULE__} exceeds max block number available in DB (#{max_block_number})." + ) + end + + supported_protocols = + TransactionAction.supported_protocols() + |> Enum.map(&Atom.to_string(&1)) + + protocols = + opts + |> Keyword.get(:reindex_protocols, "") + |> String.trim() + |> String.split(",") + |> Enum.map(&String.trim(&1)) + |> Enum.filter(&Enum.member?(supported_protocols, &1)) + + next_block = get_next_block(first_block, last_block, protocols) + + state = + %__MODULE__{ + first_block: first_block, + next_block: next_block, + last_block: last_block, + protocols: protocols + } + |> run_fetch() + + Logger.reset_metadata(logger_metadata) + + {:noreply, state} + end + @impl GenServer def handle_info(:fetch, %__MODULE__{} = state) do task = Task.Supervisor.async_nolink(Indexer.Fetcher.TransactionAction.TaskSupervisor, fn -> task(state) end) @@ -195,53 +236,14 @@ defmodule Indexer.Fetcher.TransactionAction do end defp init_fetching(opts, first_block, last_block) do - logger_metadata = Logger.metadata() - Logger.metadata(fetcher: :transaction_action) - first_block = parse_integer(first_block) last_block = parse_integer(last_block) - return = - if is_nil(first_block) or is_nil(last_block) or first_block <= 0 or last_block <= 0 or first_block > last_block do - {:stop, "Correct block range must be provided to #{__MODULE__}."} - else - max_block_number = Chain.fetch_max_block_number() - - if last_block > max_block_number do - Logger.warning( - "Note, that the last block number (#{last_block}) provided to #{__MODULE__} exceeds max block number available in DB (#{max_block_number})." - ) - end - - supported_protocols = - TransactionAction.supported_protocols() - |> Enum.map(&Atom.to_string(&1)) - - protocols = - opts - |> Keyword.get(:reindex_protocols, "") - |> String.trim() - |> String.split(",") - |> Enum.map(&String.trim(&1)) - |> Enum.filter(&Enum.member?(supported_protocols, &1)) - - next_block = get_next_block(first_block, last_block, protocols) - - state = - %__MODULE__{ - first_block: first_block, - next_block: next_block, - last_block: last_block, - protocols: protocols - } - |> run_fetch() - - {:ok, state} - end - - Logger.reset_metadata(logger_metadata) - - return + if is_nil(first_block) or is_nil(last_block) or first_block <= 0 or last_block <= 0 or first_block > last_block do + {:stop, "Correct block range must be provided to #{__MODULE__}."} + else + {:ok, %{}, {:continue, {opts, first_block, last_block}}} + end end defp get_next_block(first_block, last_block, protocols) do diff --git a/apps/indexer/lib/indexer/fetcher/withdrawal.ex b/apps/indexer/lib/indexer/fetcher/withdrawal.ex index fc4924aaf7..b8a7707bd9 100644 --- a/apps/indexer/lib/indexer/fetcher/withdrawal.ex +++ b/apps/indexer/lib/indexer/fetcher/withdrawal.ex @@ -56,7 +56,6 @@ defmodule Indexer.Fetcher.Withdrawal do end state = %__MODULE__{ - blocks_to_fetch: first_block |> Helper.parse_integer() |> missing_block_numbers(), interval: opts[:interval] || @interval, json_rpc_named_arguments: json_rpc_named_arguments, max_batch_size: opts[:max_batch_size] || @batch_size, @@ -65,13 +64,18 @@ defmodule Indexer.Fetcher.Withdrawal do Process.send_after(self(), :fetch_withdrawals, state.interval) - {:ok, state} + {:ok, state, {:continue, first_block}} else Logger.warning("Please, specify the first block of the block range for #{__MODULE__}.") :ignore end end + @impl GenServer + def handle_continue(first_block, state) do + {:noreply, %{state | blocks_to_fetch: first_block |> Helper.parse_integer() |> missing_block_numbers()}} + end + @impl GenServer def handle_info( :fetch_withdrawals,