From 61c9781ee00624aec95b6f1643fd55c18ee54a76 Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Mon, 25 Feb 2019 12:32:01 +0300 Subject: [PATCH] add supervisor to supervise failed created addresses fixer --- .../temporary/failed_created_addresses.ex | 46 ++++++++++++++----- .../failed_created_addresses/supervisor.ex | 38 +++++++++++++++ .../failed_created_addresses_test.exs | 10 +++- 3 files changed, 81 insertions(+), 13 deletions(-) create mode 100644 apps/indexer/lib/indexer/temporary/failed_created_addresses/supervisor.ex diff --git a/apps/indexer/lib/indexer/temporary/failed_created_addresses.ex b/apps/indexer/lib/indexer/temporary/failed_created_addresses.ex index 00b7a45960..d066f5ce28 100644 --- a/apps/indexer/lib/indexer/temporary/failed_created_addresses.ex +++ b/apps/indexer/lib/indexer/temporary/failed_created_addresses.ex @@ -3,10 +3,26 @@ defmodule Indexer.Temporary.FailedCreatedAddresses do Temporary module to fix internal transactions and their created transactions if a parent transaction has failed. """ + use GenServer + import Ecto.Query alias Explorer.Chain.{InternalTransaction, Transaction} alias Explorer.Repo + alias Indexer.Temporary.FailedCreatedAddresses.TaskSupervisor + + @task_options [max_concurrency: 3, timeout: 15_000] + + def start_link([json_rpc_named_arguments, gen_server_options]) do + GenServer.start_link(__MODULE__, json_rpc_named_arguments, gen_server_options) + end + + @impl GenServer + def init(json_rpc_named_arguments) do + run(json_rpc_named_arguments) + + {:ok, json_rpc_named_arguments} + end def run(json_rpc_named_arguments) do query = @@ -17,17 +33,25 @@ defmodule Indexer.Temporary.FailedCreatedAddresses do preload: :transaction ) - query - |> Repo.all() - |> Enum.each(fn internal_transaction -> - internal_transaction - |> code_entry() - |> Indexer.Code.Fetcher.run(json_rpc_named_arguments) - - internal_transaction.transaction - |> transaction_entry() - |> Indexer.InternalTransaction.Fetcher.run(json_rpc_named_arguments) - end) + found_internal_transactions = Repo.all(query) + + TaskSupervisor + |> Task.Supervisor.async_stream( + found_internal_transactions, + fn internal_transaction -> fix_internal_transaction(internal_transaction, json_rpc_named_arguments) end, + @task_options + ) + |> Enum.to_list() + end + + def fix_internal_transaction(internal_transaction, json_rpc_named_arguments) do + internal_transaction + |> code_entry() + |> Indexer.Code.Fetcher.run(json_rpc_named_arguments) + + internal_transaction.transaction + |> transaction_entry() + |> Indexer.InternalTransaction.Fetcher.run(json_rpc_named_arguments) end def code_entry(%InternalTransaction{ diff --git a/apps/indexer/lib/indexer/temporary/failed_created_addresses/supervisor.ex b/apps/indexer/lib/indexer/temporary/failed_created_addresses/supervisor.ex new file mode 100644 index 0000000000..1e92cde5ee --- /dev/null +++ b/apps/indexer/lib/indexer/temporary/failed_created_addresses/supervisor.ex @@ -0,0 +1,38 @@ +defmodule Indexer.Temporary.FailedCreatedAddresses.Supervisor do + @moduledoc """ + Supervises `Indexer.Temporary.FailedCreatedAddresses`. + """ + + use Supervisor + + alias Indexer.Temporary.FailedCreatedAddresses + + def child_spec([init_arguments]) do + child_spec([init_arguments, []]) + end + + def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do + default = %{ + id: __MODULE__, + start: {__MODULE__, :start_link, start_link_arguments}, + type: :supervisor + } + + Supervisor.child_spec(default, []) |> IO.inspect() + end + + def start_link(json_rpc_named_arguments, gen_server_options \\ []) do + Supervisor.start_link(__MODULE__, json_rpc_named_arguments, gen_server_options) + end + + @impl Supervisor + def init(json_rpc_named_arguments) do + Supervisor.init( + [ + {Task.Supervisor, name: Indexer.Temporary.FailedCreatedAddresses.TaskSupervisor}, + {FailedCreatedAddresses, [json_rpc_named_arguments, [name: FailedCreatedAddresses]]} + ], + strategy: :rest_for_one + ) + end +end diff --git a/apps/indexer/test/indexer/temporary/failed_created_addresses_test.exs b/apps/indexer/test/indexer/temporary/failed_created_addresses_test.exs index ac4591dd0b..693797bdab 100644 --- a/apps/indexer/test/indexer/temporary/failed_created_addresses_test.exs +++ b/apps/indexer/test/indexer/temporary/failed_created_addresses_test.exs @@ -8,7 +8,7 @@ defmodule Indexer.Temporary.FailedCreatedAddressesTest do alias Explorer.Repo alias Explorer.Chain.{Address, InternalTransaction, Transaction} - alias Indexer.Temporary.FailedCreatedAddresses + alias Indexer.Temporary.FailedCreatedAddresses.Supervisor alias Indexer.CoinBalance @moduletag capture_log: true @@ -105,7 +105,13 @@ defmodule Indexer.Temporary.FailedCreatedAddressesTest do end) end - FailedCreatedAddresses.run(json_rpc_named_arguments) + params = [json_rpc_named_arguments, [name: TestFailedCreatedAddresses]] + + params + |> Supervisor.child_spec() + |> ExUnit.Callbacks.start_supervised!() + + Process.sleep(3_000) query = from(t in Transaction,