add supervisor to supervise failed created addresses fixer

pull/1470/head
Ayrat Badykov 6 years ago
parent a92df4cbf5
commit 61c9781ee0
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 46
      apps/indexer/lib/indexer/temporary/failed_created_addresses.ex
  2. 38
      apps/indexer/lib/indexer/temporary/failed_created_addresses/supervisor.ex
  3. 10
      apps/indexer/test/indexer/temporary/failed_created_addresses_test.exs

@ -3,10 +3,26 @@ defmodule Indexer.Temporary.FailedCreatedAddresses do
Temporary module to fix internal transactions and their created transactions if a parent transaction has failed.
"""
use GenServer
import Ecto.Query
alias Explorer.Chain.{InternalTransaction, Transaction}
alias Explorer.Repo
alias Indexer.Temporary.FailedCreatedAddresses.TaskSupervisor
@task_options [max_concurrency: 3, timeout: 15_000]
def start_link([json_rpc_named_arguments, gen_server_options]) do
GenServer.start_link(__MODULE__, json_rpc_named_arguments, gen_server_options)
end
@impl GenServer
def init(json_rpc_named_arguments) do
run(json_rpc_named_arguments)
{:ok, json_rpc_named_arguments}
end
def run(json_rpc_named_arguments) do
query =
@ -17,17 +33,25 @@ defmodule Indexer.Temporary.FailedCreatedAddresses do
preload: :transaction
)
query
|> Repo.all()
|> Enum.each(fn internal_transaction ->
internal_transaction
|> code_entry()
|> Indexer.Code.Fetcher.run(json_rpc_named_arguments)
internal_transaction.transaction
|> transaction_entry()
|> Indexer.InternalTransaction.Fetcher.run(json_rpc_named_arguments)
end)
found_internal_transactions = Repo.all(query)
TaskSupervisor
|> Task.Supervisor.async_stream(
found_internal_transactions,
fn internal_transaction -> fix_internal_transaction(internal_transaction, json_rpc_named_arguments) end,
@task_options
)
|> Enum.to_list()
end
def fix_internal_transaction(internal_transaction, json_rpc_named_arguments) do
internal_transaction
|> code_entry()
|> Indexer.Code.Fetcher.run(json_rpc_named_arguments)
internal_transaction.transaction
|> transaction_entry()
|> Indexer.InternalTransaction.Fetcher.run(json_rpc_named_arguments)
end
def code_entry(%InternalTransaction{

@ -0,0 +1,38 @@
defmodule Indexer.Temporary.FailedCreatedAddresses.Supervisor do
@moduledoc """
Supervises `Indexer.Temporary.FailedCreatedAddresses`.
"""
use Supervisor
alias Indexer.Temporary.FailedCreatedAddresses
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, []) |> IO.inspect()
end
def start_link(json_rpc_named_arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, json_rpc_named_arguments, gen_server_options)
end
@impl Supervisor
def init(json_rpc_named_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.Temporary.FailedCreatedAddresses.TaskSupervisor},
{FailedCreatedAddresses, [json_rpc_named_arguments, [name: FailedCreatedAddresses]]}
],
strategy: :rest_for_one
)
end
end

@ -8,7 +8,7 @@ defmodule Indexer.Temporary.FailedCreatedAddressesTest do
alias Explorer.Repo
alias Explorer.Chain.{Address, InternalTransaction, Transaction}
alias Indexer.Temporary.FailedCreatedAddresses
alias Indexer.Temporary.FailedCreatedAddresses.Supervisor
alias Indexer.CoinBalance
@moduletag capture_log: true
@ -105,7 +105,13 @@ defmodule Indexer.Temporary.FailedCreatedAddressesTest do
end)
end
FailedCreatedAddresses.run(json_rpc_named_arguments)
params = [json_rpc_named_arguments, [name: TestFailedCreatedAddresses]]
params
|> Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
Process.sleep(3_000)
query =
from(t in Transaction,

Loading…
Cancel
Save