diff --git a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex index 7d7c7e0ca1..5253256f41 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -8,7 +8,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do import Ecto.Query, only: [from: 2, subquery: 1] alias Ecto.{Changeset, Multi, Repo} - alias Explorer.Chain alias Explorer.Chain.{Address, Block, Import, PendingBlockOperation, Transaction} alias Explorer.Chain.Block.Reward alias Explorer.Chain.Import.Runner @@ -303,8 +302,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do timeout: timeout ) - :ok = Chain.remove_nonconsensus_blocks_from_pending_ops(removed_consensus_block_hashes) - {:ok, removed_consensus_block_hashes} rescue postgrex_error in Postgrex.Error -> diff --git a/apps/indexer/lib/indexer/pending_ops_cleaner.ex b/apps/indexer/lib/indexer/pending_ops_cleaner.ex new file mode 100644 index 0000000000..dd3e39950c --- /dev/null +++ b/apps/indexer/lib/indexer/pending_ops_cleaner.ex @@ -0,0 +1,45 @@ +defmodule Indexer.PendingOpsCleaner do + @moduledoc """ + Peiodically cleans non-consensus pending ops. + """ + + use GenServer + + require Logger + + alias Explorer.Chain + + @interval :timer.minutes(60) + + def start_link([init_opts, gen_server_opts]) do + start_link(init_opts, gen_server_opts) + end + + def start_link(init_opts, gen_server_opts) do + GenServer.start_link(__MODULE__, init_opts, gen_server_opts) + end + + def init(opts) do + interval = opts[:interval] || @interval + + Process.send_after(self(), :clean_nonconsensus_pending_ops, interval) + + {:ok, %{interval: interval}} + end + + def handle_info(:clean_nonconsensus_pending_ops, %{interval: interval} = state) do + Logger.debug(fn -> "Cleaning non-consensus pending ops" end) + + clean_nonconsensus_pending_ops() + + Process.send_after(self(), :clean_nonconsensus_pending_ops, interval) + + {:noreply, state} + end + + defp clean_nonconsensus_pending_ops do + :ok = Chain.remove_nonconsensus_blocks_from_pending_ops() + + Logger.debug(fn -> "Non-consensus pending ops are cleaned" end) + end +end diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index f922a1a180..f22b6de487 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -5,7 +5,7 @@ defmodule Indexer.Supervisor do use Supervisor - alias Indexer.Block + alias Indexer.{Block, PendingOpsCleaner} alias Indexer.Block.{Catchup, Realtime} alias Indexer.Fetcher.{ @@ -129,7 +129,8 @@ defmodule Indexer.Supervisor do {UnclesWithoutIndex.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {BlocksTransactionsMismatch.Supervisor, - [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]} + [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, + {PendingOpsCleaner, [[], []]} ], strategy: :one_for_one ) diff --git a/apps/indexer/test/indexer/pending_ops_cleaner_test.exs b/apps/indexer/test/indexer/pending_ops_cleaner_test.exs new file mode 100644 index 0000000000..f8f1cbdb30 --- /dev/null +++ b/apps/indexer/test/indexer/pending_ops_cleaner_test.exs @@ -0,0 +1,34 @@ +defmodule Indexer.PendingOpsCleanerTest do + use Explorer.DataCase + + alias Explorer.Chain.PendingBlockOperation + alias Indexer.PendingOpsCleaner + + describe "init/1" do + test "deletes non-consensus pending ops on init" do + block = insert(:block, consensus: false) + + insert(:pending_block_operation, block_hash: block.hash, fetch_internal_transactions: true) + + assert Repo.one(from(block in PendingBlockOperation, where: block.block_hash == ^block.hash)) + + start_supervised!({PendingOpsCleaner, [[interval: :timer.seconds(1)], [name: :PendingOpsTest]]}) + + Process.sleep(2_000) + + assert is_nil(Repo.one(from(block in PendingBlockOperation, where: block.block_hash == ^block.hash))) + end + + test "re-schedules deletion" do + start_supervised!({PendingOpsCleaner, [[interval: :timer.seconds(1)], [name: :PendingOpsTest]]}) + + block = insert(:block, consensus: false) + + insert(:pending_block_operation, block_hash: block.hash, fetch_internal_transactions: true) + + Process.sleep(2_000) + + assert is_nil(Repo.one(from(block in PendingBlockOperation, where: block.block_hash == ^block.hash))) + end + end +end