Separate usage of Task.Supervisor

Have a separate subtree for each fetcher with a supervisor supervising
the fetcher and a Task.Supervisor that is only used by the fetcher, so
that looking at the Task.Supervisor in that subtree will show which
fetcher is causing any memory, message, or blocking problems.
pull/707/head
Luke Imhoff 6 years ago
parent 9b3ed74507
commit 09051ceb8a
  1. 2
      .credo.exs
  2. 25
      apps/indexer/lib/indexer/application.ex
  3. 16
      apps/indexer/lib/indexer/block_fetcher/catchup.ex
  4. 110
      apps/indexer/lib/indexer/block_fetcher/catchup/bound_interval_supervisor.ex
  5. 116
      apps/indexer/lib/indexer/block_fetcher/catchup/supervisor.ex
  6. 4
      apps/indexer/lib/indexer/block_fetcher/realtime.ex
  7. 15
      apps/indexer/lib/indexer/buffered_task.ex
  8. 4
      apps/indexer/lib/indexer/coin_balance/fetcher.ex
  9. 38
      apps/indexer/lib/indexer/coin_balance/supervisor.ex
  10. 8
      apps/indexer/lib/indexer/internal_transaction/fetcher.ex
  11. 39
      apps/indexer/lib/indexer/internal_transaction/supervisor.ex
  12. 41
      apps/indexer/lib/indexer/pending_transaction/fetcher.ex
  13. 39
      apps/indexer/lib/indexer/pending_transaction/supervisor.ex
  14. 4
      apps/indexer/lib/indexer/token/fetcher.ex
  15. 38
      apps/indexer/lib/indexer/token/supervisor.ex
  16. 4
      apps/indexer/lib/indexer/token_balance/fetcher.ex
  17. 38
      apps/indexer/lib/indexer/token_balance/supervisor.ex
  18. 72
      apps/indexer/test/indexer/block_fetcher/catchup/bound_interval_supervisor_test.exs
  19. 5
      apps/indexer/test/indexer/block_fetcher/realtime_test.exs
  20. 31
      apps/indexer/test/indexer/block_fetcher_test.exs
  21. 4
      apps/indexer/test/indexer/buffered_task_test.exs
  22. 26
      apps/indexer/test/indexer/coin_balance/fetcher_test.exs
  23. 31
      apps/indexer/test/indexer/internal_transaction/fetcher_test.exs
  24. 5
      apps/indexer/test/indexer/pending_transaction/fetcher_test.exs
  25. 18
      apps/indexer/test/indexer/token/fetcher_test.exs
  26. 15
      apps/indexer/test/indexer/token_balance/fetcher_test.exs
  27. 9
      apps/indexer/test/support/indexer/block_fetcher/catchup/supervisor/case.ex
  28. 18
      apps/indexer/test/support/indexer/coin_balance/supervisor/case.ex
  29. 16
      apps/indexer/test/support/indexer/coin_balance_fetcher_case.ex
  30. 18
      apps/indexer/test/support/indexer/internal_transaction/supervisor/case.ex
  31. 10
      apps/indexer/test/support/indexer/internal_transaction_fetcher_case.ex
  32. 18
      apps/indexer/test/support/indexer/pending_transaction/supervisor/case.ex
  33. 18
      apps/indexer/test/support/indexer/token/supervisor/case.ex
  34. 18
      apps/indexer/test/support/indexer/token_balance/supervisor/case.ex
  35. 16
      apps/indexer/test/support/indexer/token_balance_fetcher_case.ex
  36. 16
      apps/indexer/test/support/indexer/token_fetcher_case.ex

@ -76,7 +76,7 @@
#
{Credo.Check.Design.AliasUsage,
excluded_namespaces: ~w(Socket Task),
excluded_lastnames: ~w(Address DateTime Full Name Number Repo Time Unit),
excluded_lastnames: ~w(Address DateTime Fetcher Full Name Number Repo Time Unit),
priority: :low},
# For some checks, you can also set other parameters

@ -6,12 +6,12 @@ defmodule Indexer.Application do
use Application
alias Indexer.{
CoinBalanceFetcher,
CoinBalance,
BlockFetcher,
InternalTransactionFetcher,
PendingTransactionFetcher,
TokenFetcher,
TokenBalanceFetcher
InternalTransaction,
PendingTransaction,
Token,
TokenBalance
}
@impl Application
@ -28,13 +28,14 @@ defmodule Indexer.Application do
|> Enum.into(%{})
children = [
{Task.Supervisor, name: Indexer.TaskSupervisor},
{CoinBalanceFetcher, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: CoinBalanceFetcher]]},
{PendingTransactionFetcher, name: PendingTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{InternalTransactionFetcher,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: InternalTransactionFetcher]]},
{TokenFetcher, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: TokenFetcher]]},
{TokenBalanceFetcher, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: TokenBalanceFetcher]]},
{CoinBalance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: CoinBalance.Supervisor]]},
{PendingTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: PendingTransactionFetcher]]},
{InternalTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: InternalTransaction.Supervisor]]},
{Token.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: Token.Supervisor]]},
{TokenBalance.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: TokenBalance.Supervisor]]},
{BlockFetcher.Supervisor, [block_fetcher_supervisor_named_arguments, [name: BlockFetcher.Supervisor]]}
]

@ -10,12 +10,12 @@ defmodule Indexer.BlockFetcher.Catchup do
alias Explorer.Chain
alias Indexer.{
CoinBalanceFetcher,
BlockFetcher,
InternalTransactionFetcher,
CoinBalance,
InternalTransaction,
Sequence,
TokenFetcher,
TokenBalanceFetcher
Token,
TokenBalance
}
@behaviour BlockFetcher
@ -130,20 +130,20 @@ defmodule Indexer.BlockFetcher.Catchup do
block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash))
%{address_hash: address_hash, block_number: block_number}
end)
|> CoinBalanceFetcher.async_fetch_balances()
|> CoinBalance.Fetcher.async_fetch_balances()
transaction_hashes
|> Enum.map(fn transaction_hash ->
block_number = Map.fetch!(transaction_hash_to_block_number, to_string(transaction_hash))
%{block_number: block_number, hash: transaction_hash}
end)
|> InternalTransactionFetcher.async_fetch(10_000)
|> InternalTransaction.Fetcher.async_fetch(10_000)
tokens
|> Enum.map(& &1.contract_address_hash)
|> TokenFetcher.async_fetch()
|> Token.Fetcher.async_fetch()
TokenBalanceFetcher.async_fetch(token_balances)
TokenBalance.Fetcher.async_fetch(token_balances)
end
defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)

@ -0,0 +1,110 @@
defmodule Indexer.BlockFetcher.Catchup.BoundIntervalSupervisor do
@moduledoc """
Supervises the `Indexer.BlockerFetcher.Catchup` with exponential backoff for restarts.
"""
# NOT a `Supervisor` because of the `Task` restart strategies are custom.
use GenServer
require Logger
alias Indexer.{BlockFetcher, BoundInterval}
alias Indexer.BlockFetcher.Catchup
# milliseconds
@block_interval 5_000
@enforce_keys ~w(bound_interval catchup)a
defstruct bound_interval: nil,
catchup: %Catchup{},
task: nil
def child_spec(arg) do
# The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
# instead of `:supervisor` and use the wrong shutdown timeout
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, [])
end
@doc """
Starts supervisor of `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
For `named_arguments` see `Indexer.BlockFetcher.new/1`. For `t:GenServer.options/0` see `GenServer.start_link/3`.
"""
@spec start_link([named_arguments :: list() | GenServer.options()]) :: {:ok, pid}
def start_link([named_arguments, gen_server_options]) when is_map(named_arguments) and is_list(gen_server_options) do
GenServer.start_link(__MODULE__, named_arguments, gen_server_options)
end
@impl GenServer
def init(named_arguments) do
state = new(named_arguments)
send(self(), :catchup_index)
{:ok, state}
end
defp new(%{block_fetcher: common_block_fetcher} = named_arguments) do
block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: false, callback_module: Catchup}
block_interval = Map.get(named_arguments, :block_interval, @block_interval)
minimum_interval = div(block_interval, 2)
bound_interval = BoundInterval.within(minimum_interval..(minimum_interval * 10))
%__MODULE__{
catchup: %Catchup{block_fetcher: block_fetcher},
bound_interval: bound_interval
}
end
@impl GenServer
def handle_info(:catchup_index, %__MODULE__{catchup: %Catchup{} = catchup} = state) do
{:noreply,
%__MODULE__{state | task: Task.Supervisor.async_nolink(Catchup.TaskSupervisor, Catchup, :task, [catchup])}}
end
def handle_info(
{ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}},
%__MODULE__{
bound_interval: bound_interval,
task: %Task{ref: ref}
} = state
)
when is_integer(missing_block_count) do
new_bound_interval =
case missing_block_count do
0 ->
Logger.info("Index already caught up in #{first_block_number}-0")
BoundInterval.increase(bound_interval)
_ ->
Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0")
BoundInterval.decrease(bound_interval)
end
Process.demonitor(ref, [:flush])
interval = new_bound_interval.current
Logger.info(fn ->
"Checking if index needs to catch up in #{interval}ms"
end)
Process.send_after(self(), :catchup_index, interval)
{:noreply, %__MODULE__{state | bound_interval: new_bound_interval, task: nil}}
end
def handle_info(
{:DOWN, ref, :process, pid, reason},
%__MODULE__{task: %Task{pid: pid, ref: ref}} = state
) do
Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)
send(self(), :catchup_index)
{:noreply, %__MODULE__{state | task: nil}}
end
end

@ -1,110 +1,38 @@
defmodule Indexer.BlockFetcher.Catchup.Supervisor do
@moduledoc """
Supervises the `Indexer.BlockerFetcher.Catchup` with exponential backoff for restarts.
Supervises `Indexer.BlockFetcher.Catchup.TaskSupervisor` and `Indexer.BlockFetcher.Catchup.BoundIntervalSupervisor`
"""
# NOT a `Supervisor` because of the `Task` restart strategies are custom.
use GenServer
use Supervisor
require Logger
alias Indexer.BlockFetcher.Catchup.BoundIntervalSupervisor
alias Indexer.{BlockFetcher, BoundInterval}
alias Indexer.BlockFetcher.Catchup
# milliseconds
@block_interval 5_000
@enforce_keys ~w(bound_interval catchup)a
defstruct bound_interval: nil,
catchup: %Catchup{},
task: nil
def child_spec(arg) do
# The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
# instead of `:supervisor` and use the wrong shutdown timeout
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, [])
end
@doc """
Starts supervisor of `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
For `named_arguments` see `Indexer.BlockFetcher.new/1`. For `t:GenServer.options/0` see `GenServer.start_link/3`.
"""
@spec start_link([named_arguments :: list() | GenServer.options()]) :: {:ok, pid}
def start_link([named_arguments, gen_server_options]) when is_map(named_arguments) and is_list(gen_server_options) do
GenServer.start_link(__MODULE__, named_arguments, gen_server_options)
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
@impl GenServer
def init(named_arguments) do
state = new(named_arguments)
send(self(), :catchup_index)
{:ok, state}
end
defp new(%{block_fetcher: common_block_fetcher} = named_arguments) do
block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: false, callback_module: Catchup}
block_interval = Map.get(named_arguments, :block_interval, @block_interval)
minimum_interval = div(block_interval, 2)
bound_interval = BoundInterval.within(minimum_interval..(minimum_interval * 10))
%__MODULE__{
catchup: %Catchup{block_fetcher: block_fetcher},
bound_interval: bound_interval
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
end
@impl GenServer
def handle_info(:catchup_index, %__MODULE__{catchup: %Catchup{} = catchup} = state) do
{:noreply,
%__MODULE__{state | task: Task.Supervisor.async_nolink(Indexer.TaskSupervisor, Catchup, :task, [catchup])}}
Supervisor.child_spec(default, [])
end
def handle_info(
{ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}},
%__MODULE__{
bound_interval: bound_interval,
task: %Task{ref: ref}
} = state
)
when is_integer(missing_block_count) do
new_bound_interval =
case missing_block_count do
0 ->
Logger.info("Index already caught up in #{first_block_number}-0")
BoundInterval.increase(bound_interval)
_ ->
Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0")
BoundInterval.decrease(bound_interval)
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, gen_server_options)
end
Process.demonitor(ref, [:flush])
interval = new_bound_interval.current
Logger.info(fn ->
"Checking if index needs to catch up in #{interval}ms"
end)
Process.send_after(self(), :catchup_index, interval)
{:noreply, %__MODULE__{state | bound_interval: new_bound_interval, task: nil}}
end
def handle_info(
{:DOWN, ref, :process, pid, reason},
%__MODULE__{task: %Task{pid: pid, ref: ref}} = state
) do
Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)
send(self(), :catchup_index)
{:noreply, %__MODULE__{state | task: nil}}
@impl Supervisor
def init(bound_interval_supervisor_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.BlockFetcher.Catchup.TaskSupervisor},
{BoundIntervalSupervisor, [bound_interval_supervisor_arguments, [name: BoundIntervalSupervisor]]}
],
strategy: :one_for_one
)
end
end

@ -12,7 +12,7 @@ defmodule Indexer.BlockFetcher.Realtime do
alias EthereumJSONRPC.Subscription
alias Explorer.Chain
alias Indexer.{AddressExtraction, BlockFetcher, TokenBalances, TokenFetcher}
alias Indexer.{AddressExtraction, BlockFetcher, Token, TokenBalances}
@behaviour BlockFetcher
@ -154,7 +154,7 @@ defmodule Indexer.BlockFetcher.Realtime do
defp async_import_remaining_block_data(%{tokens: tokens}) do
tokens
|> Enum.map(& &1.contract_address_hash)
|> TokenFetcher.async_fetch()
|> Token.Fetcher.async_fetch()
end
defp internal_transactions(

@ -158,6 +158,19 @@ defmodule Indexer.BufferedTask do
GenServer.call(server, {:buffer, entries}, timeout)
end
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments}
}
Supervisor.child_spec(default, [])
end
@doc false
def debug_count(server) do
GenServer.call(server, :debug_count)
@ -198,7 +211,7 @@ defmodule Indexer.BufferedTask do
| {:state, state}
]}
) :: {:ok, pid()} | {:error, {:already_started, pid()}}
def start_link([{module, base_init_opts}, genserver_opts]) do
def start_link({module, base_init_opts}, genserver_opts \\ []) do
default_opts = Application.get_all_env(:indexer)
init_opts = Keyword.merge(default_opts, base_init_opts)

@ -1,4 +1,4 @@
defmodule Indexer.CoinBalanceFetcher do
defmodule Indexer.CoinBalance.Fetcher do
@moduledoc """
Fetches `t:Explorer.Chain.Address.CoinBalance.t/0` and updates `t:Explorer.Chain.Address.t/0` `fetched_coin_balance` and
`fetched_coin_balance_block_number` to value at max `t:Explorer.Chain.Address.CoinBalance.t/0` `block_number` for the given `t:Explorer.Chain.Address.t/` `hash`.
@ -19,7 +19,7 @@ defmodule Indexer.CoinBalanceFetcher do
max_batch_size: 500,
max_concurrency: 4,
init_chunk_size: 1000,
task_supervisor: Indexer.TaskSupervisor
task_supervisor: Indexer.CoinBalance.TaskSupervisor
]
@doc """

@ -0,0 +1,38 @@
defmodule Indexer.CoinBalance.Supervisor do
@moduledoc """
Supervises `Indexer.CoinBalance.Fetcher` and its batch tasks through `Indexer.CoinBalance.TaskSupervisor`
"""
use Supervisor
alias Indexer.CoinBalance.Fetcher
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init(fetcher_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.CoinBalance.TaskSupervisor},
{Fetcher, [fetcher_arguments, [name: Fetcher]]}
],
strategy: :one_for_one
)
end
end

@ -1,4 +1,4 @@
defmodule Indexer.InternalTransactionFetcher do
defmodule Indexer.InternalTransaction.Fetcher do
@moduledoc """
Fetches and indexes `t:Explorer.Chain.InternalTransaction.t/0`.
@ -8,7 +8,7 @@ defmodule Indexer.InternalTransactionFetcher do
require Logger
alias Explorer.Chain
alias Indexer.{CoinBalanceFetcher, AddressExtraction, BufferedTask}
alias Indexer.{AddressExtraction, BufferedTask, CoinBalance}
alias Explorer.Chain.{Block, Hash}
@behaviour BufferedTask
@ -20,7 +20,7 @@ defmodule Indexer.InternalTransactionFetcher do
max_concurrency: @max_concurrency,
max_batch_size: @max_batch_size,
init_chunk_size: 5000,
task_supervisor: Indexer.TaskSupervisor
task_supervisor: Indexer.InternalTransaction.TaskSupervisor
]
@doc """
@ -109,7 +109,7 @@ defmodule Indexer.InternalTransactionFetcher do
block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash))
%{address_hash: address_hash, block_number: block_number}
end)
|> CoinBalanceFetcher.async_fetch_balances()
|> CoinBalance.Fetcher.async_fetch_balances()
else
{:error, step, reason, _changes_so_far} ->
Logger.debug(fn ->

@ -0,0 +1,39 @@
defmodule Indexer.InternalTransaction.Supervisor do
@moduledoc """
Supervises `Indexer.InternalTransaction.Fetcher` and its batch tasks through
`Indexer.InternalTransaction.TaskSupervisor`.
"""
use Supervisor
alias Indexer.InternalTransaction.Fetcher
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init(fetcher_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.InternalTransaction.TaskSupervisor},
{Fetcher, [fetcher_arguments, [name: Fetcher]]}
],
strategy: :one_for_one
)
end
end

@ -1,4 +1,4 @@
defmodule Indexer.PendingTransactionFetcher do
defmodule Indexer.PendingTransaction.Fetcher do
@moduledoc """
Fetches pending transactions and imports them.
@ -12,7 +12,7 @@ defmodule Indexer.PendingTransactionFetcher do
import EthereumJSONRPC, only: [fetch_pending_transactions: 1]
alias Explorer.Chain
alias Indexer.{AddressExtraction, PendingTransactionFetcher}
alias Indexer.{AddressExtraction, PendingTransaction}
# milliseconds
@default_interval 1_000
@ -21,7 +21,18 @@ defmodule Indexer.PendingTransactionFetcher do
json_rpc_named_arguments: [],
task: nil
@gen_server_options ~w(debug name spawn_opt timeout)a
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments}
}
Supervisor.child_spec(default, [])
end
@doc """
Starts the pending transaction fetcher.
@ -41,19 +52,19 @@ defmodule Indexer.PendingTransactionFetcher do
be terminated and the start function will return `{:error, :timeout}`
"""
def start_link(opts) do
GenServer.start_link(__MODULE__, Keyword.drop(opts, @gen_server_options), Keyword.take(opts, @gen_server_options))
def start_link(arguments, gen_server_options \\ []) do
GenServer.start_link(__MODULE__, arguments, gen_server_options)
end
@impl GenServer
def init(opts) do
def init(opts) when is_list(opts) do
opts =
:indexer
|> Application.get_all_env()
|> Keyword.merge(opts)
state =
%PendingTransactionFetcher{
%PendingTransaction.Fetcher{
json_rpc_named_arguments: Keyword.fetch!(opts, :json_rpc_named_arguments),
interval: opts[:pending_transaction_interval] || @default_interval
}
@ -63,12 +74,12 @@ defmodule Indexer.PendingTransactionFetcher do
end
@impl GenServer
def handle_info(:fetch, %PendingTransactionFetcher{} = state) do
task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> task(state) end)
{:noreply, %PendingTransactionFetcher{state | task: task}}
def handle_info(:fetch, %PendingTransaction.Fetcher{} = state) do
task = Task.Supervisor.async_nolink(PendingTransaction.TaskSupervisor, fn -> task(state) end)
{:noreply, %PendingTransaction.Fetcher{state | task: task}}
end
def handle_info({ref, _}, %PendingTransactionFetcher{task: %Task{ref: ref}} = state) do
def handle_info({ref, _}, %PendingTransaction.Fetcher{task: %Task{ref: ref}} = state) do
Process.demonitor(ref, [:flush])
{:noreply, schedule_fetch(state)}
@ -76,19 +87,19 @@ defmodule Indexer.PendingTransactionFetcher do
def handle_info(
{:DOWN, ref, :process, pid, reason},
%PendingTransactionFetcher{task: %Task{pid: pid, ref: ref}} = state
%PendingTransaction.Fetcher{task: %Task{pid: pid, ref: ref}} = state
) do
Logger.error(fn -> "pending transaction fetcher task exited due to #{inspect(reason)}. Rescheduling." end)
{:noreply, schedule_fetch(state)}
end
defp schedule_fetch(%PendingTransactionFetcher{interval: interval} = state) do
defp schedule_fetch(%PendingTransaction.Fetcher{interval: interval} = state) do
Process.send_after(self(), :fetch, interval)
%PendingTransactionFetcher{state | task: nil}
%PendingTransaction.Fetcher{state | task: nil}
end
defp task(%PendingTransactionFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = _state) do
defp task(%PendingTransaction.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = _state) do
case fetch_pending_transactions(json_rpc_named_arguments) do
{:ok, transactions_params} ->
addresses_params = AddressExtraction.extract_addresses(%{transactions: transactions_params}, pending: true)

@ -0,0 +1,39 @@
defmodule Indexer.PendingTransaction.Supervisor do
@moduledoc """
Supervises `Indexer.PendingTransaction.Fetcher` and its batch tasks through
`Indexer.PendingTransaction.TaskSupervisor`.
"""
use Supervisor
alias Indexer.PendingTransaction.Fetcher
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init(fetcher_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.PendingTransaction.TaskSupervisor},
{Fetcher, [fetcher_arguments, [name: Fetcher]]}
],
strategy: :one_for_one
)
end
end

@ -1,4 +1,4 @@
defmodule Indexer.TokenFetcher do
defmodule Indexer.Token.Fetcher do
@moduledoc """
Fetches information about a token.
"""
@ -16,7 +16,7 @@ defmodule Indexer.TokenFetcher do
max_batch_size: 1,
max_concurrency: 10,
init_chunk_size: 1,
task_supervisor: Indexer.TaskSupervisor
task_supervisor: Indexer.Token.TaskSupervisor
]
@contract_abi [

@ -0,0 +1,38 @@
defmodule Indexer.Token.Supervisor do
@moduledoc """
Supervises `Indexer.Token.Fetcher` and its batch tasks through `Indexer.Token.TaskSupervisor`.
"""
use Supervisor
alias Indexer.Token.Fetcher
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init(fetcher_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.Token.TaskSupervisor},
{Fetcher, [fetcher_arguments, [name: Fetcher]]}
],
strategy: :one_for_one
)
end
end

@ -1,4 +1,4 @@
defmodule Indexer.TokenBalanceFetcher do
defmodule Indexer.TokenBalance.Fetcher do
@moduledoc """
Fetches the token balances values.
"""
@ -14,7 +14,7 @@ defmodule Indexer.TokenBalanceFetcher do
max_batch_size: 1,
max_concurrency: 10,
init_chunk_size: 1,
task_supervisor: Indexer.TaskSupervisor
task_supervisor: Indexer.TokenBalance.TaskSupervisor
]
def async_fetch(token_balances_params) do

@ -0,0 +1,38 @@
defmodule Indexer.TokenBalance.Supervisor do
@moduledoc """
Supervises `Indexer.TokenBalance.Fetcher` and its batch tasks through `Indexer.TokenBalance.TaskSupervisor`
"""
use Supervisor
alias Indexer.TokenBalance.Fetcher
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init(fetcher_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.TokenBalance.TaskSupervisor},
{Fetcher, [fetcher_arguments, [name: Fetcher]]}
],
strategy: :one_for_one
)
end
end

@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
defmodule Indexer.BlockFetcher.Catchup.BoundIntervalSupervisorTest do
# `async: false` due to use of named GenServer
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
@ -7,16 +7,7 @@ defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
import EthereumJSONRPC, only: [integer_to_quantity: 1]
alias Explorer.Chain.Block
alias Indexer.{
CoinBalanceFetcherCase,
BlockFetcher,
BoundInterval,
InternalTransactionFetcherCase,
TokenFetcherCase,
TokenBalanceFetcherCase
}
alias Indexer.{BlockFetcher, BoundInterval, CoinBalance, InternalTransaction, Token, TokenBalance}
alias Indexer.BlockFetcher.Catchup
@moduletag capture_log: true
@ -206,15 +197,14 @@ defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
assert Repo.aggregate(Block, :count, :hash) == 0
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised!(
{Catchup.Supervisor, [%{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}}, []]}
)
BlockFetcher.Catchup.Supervisor.Case.start_supervised!(%{
block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}
})
first_catchup_block_number = latest_block_number - 1
@ -254,16 +244,18 @@ defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
{:ok, %{"number" => "0x1"}}
end)
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised!({Task.Supervisor, name: Indexer.BlockFetcher.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
# from `setup :state`
assert_received :catchup_index
assert {:noreply, %Catchup.Supervisor{catchup: %Catchup{}, task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
Catchup.Supervisor.handle_info(:catchup_index, state)
assert {:noreply,
%Catchup.BoundIntervalSupervisor{catchup: %Catchup{}, task: %Task{pid: pid, ref: ref}} =
catchup_index_state} = Catchup.BoundIntervalSupervisor.handle_info(:catchup_index, state)
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message
@ -272,7 +264,7 @@ defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = Catchup.Supervisor.handle_info(message, catchup_index_state)
assert {:noreply, message_state} = Catchup.BoundIntervalSupervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
@ -325,16 +317,18 @@ defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
{:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]}
end)
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised({Task.Supervisor, name: Indexer.BlockFetcher.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
# from `setup :state`
assert_received :catchup_index
assert {:noreply, %Catchup.Supervisor{catchup: %Catchup{}, task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
Catchup.Supervisor.handle_info(:catchup_index, state)
assert {:noreply,
%Catchup.BoundIntervalSupervisor{catchup: %Catchup{}, task: %Task{pid: pid, ref: ref}} =
catchup_index_state} = Catchup.BoundIntervalSupervisor.handle_info(:catchup_index, state)
# 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for
# catchup_index
@ -345,7 +339,7 @@ defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = Catchup.Supervisor.handle_info(message, catchup_index_state)
assert {:noreply, message_state} = Catchup.BoundIntervalSupervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
@ -357,7 +351,9 @@ defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
above_minimum_state = update_in(catchup_index_state.bound_interval, &BoundInterval.increase/1)
assert above_minimum_state.bound_interval.current > message_state.bound_interval.minimum
assert {:noreply, above_minimum_message_state} = Catchup.Supervisor.handle_info(message, above_minimum_state)
assert {:noreply, above_minimum_message_state} =
Catchup.BoundIntervalSupervisor.handle_info(message, above_minimum_state)
assert above_minimum_message_state.bound_interval.current < above_minimum_state.bound_interval.current
end
@ -365,7 +361,9 @@ defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do
{:ok, state} =
Catchup.Supervisor.init(%{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}})
Catchup.BoundIntervalSupervisor.init(%{
block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}
})
%{state: state}
end

@ -7,7 +7,7 @@ defmodule Indexer.BlockFetcher.RealtimeTest do
alias Explorer.Chain.{Address, Block}
alias Indexer.{BlockFetcher, Sequence}
alias Indexer.BlockFetcher.Realtime
alias Indexer.TokenFetcherCase
alias Indexer.Token
@moduletag capture_log: true
@ -45,8 +45,7 @@ defmodule Indexer.BlockFetcher.RealtimeTest do
{:ok, sequence} = Sequence.start_link(ranges: [], step: 2)
Sequence.cap(sequence)
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
EthereumJSONRPC.Mox

@ -10,14 +10,12 @@ defmodule Indexer.BlockFetcherTest do
alias Explorer.Chain.{Address, Block, Log, Transaction, Wei}
alias Indexer.{
CoinBalanceFetcher,
CoinBalanceFetcherCase,
CoinBalance,
BlockFetcher,
BufferedTask,
InternalTransactionFetcher,
InternalTransactionFetcherCase,
TokenFetcherCase,
TokenBalanceFetcherCase
InternalTransaction,
Token,
TokenBalance
}
@moduletag capture_log: true
@ -48,11 +46,10 @@ defmodule Indexer.BlockFetcherTest do
describe "import_range/2" do
setup %{json_rpc_named_arguments: json_rpc_named_arguments} do
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
%{
block_fetcher: %BlockFetcher{
@ -223,8 +220,8 @@ defmodule Indexer.BlockFetcherTest do
transactions: []
}, :more}} = result
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(CoinBalanceFetcher)
wait_for_tasks(InternalTransaction.Fetcher)
wait_for_tasks(CoinBalance.Fetcher)
assert Repo.aggregate(Block, :count, :hash) == 1
assert Repo.aggregate(Address, :count, :hash) == 1
@ -472,8 +469,8 @@ defmodule Indexer.BlockFetcherTest do
]
}} = BlockFetcher.fetch_and_import_range(block_fetcher, block_number..block_number)
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(CoinBalanceFetcher)
wait_for_tasks(InternalTransaction.Fetcher)
wait_for_tasks(CoinBalance.Fetcher)
assert Repo.aggregate(Block, :count, :hash) == 1
assert Repo.aggregate(Address, :count, :hash) == 5
@ -559,8 +556,8 @@ defmodule Indexer.BlockFetcherTest do
]
}, :more}} = BlockFetcher.fetch_and_import_range(block_fetcher, block_number..block_number)
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(CoinBalanceFetcher)
wait_for_tasks(InternalTransaction.Fetcher)
wait_for_tasks(CoinBalance.Fetcher)
assert Repo.aggregate(Block, :count, :hash) == 1
assert Repo.aggregate(Address, :count, :hash) == 2

@ -11,13 +11,15 @@ defmodule Indexer.BufferedTaskTest do
start_supervised(
{BufferedTask,
[
{callback_module,
state: nil,
task_supervisor: BufferedTaskSup,
flush_interval: 50,
max_batch_size: @max_batch_size,
max_concurrency: 2,
init_chunk_size: @max_batch_size * 2}}
init_chunk_size: @max_batch_size * 2}
]}
)
end

@ -1,4 +1,4 @@
defmodule Indexer.CoinBalanceFetcherTest do
defmodule Indexer.CoinBalance.FetcherTest do
# MUST be `async: false` so that {:shared, pid} is set for connection to allow CoinBalanceFetcher's self-send to have
# connection allowed immediately.
use EthereumJSONRPC.Case, async: false
@ -8,8 +8,7 @@ defmodule Indexer.CoinBalanceFetcherTest do
import Mox
alias Explorer.Chain.{Address, Hash, Wei}
alias Explorer.Chain.Address.CoinBalance
alias Indexer.{CoinBalanceFetcher, CoinBalanceFetcherCase}
alias Indexer.CoinBalance
@moduletag :capture_log
@ -69,7 +68,7 @@ defmodule Indexer.CoinBalanceFetcherTest do
assert miner.fetched_coin_balance == nil
assert miner.fetched_coin_balance_block_number == nil
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
fetched_address =
wait(fn ->
@ -122,7 +121,10 @@ defmodule Indexer.CoinBalanceFetcherTest do
block = insert(:block, miner: miner, number: block_number)
insert(:unfetched_balance, address_hash: miner.hash, block_number: block_number)
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments, max_batch_size: 2)
CoinBalance.Supervisor.Case.start_supervised!(
json_rpc_named_arguments: json_rpc_named_arguments,
max_batch_size: 2
)
fetched_address =
wait(fn ->
@ -178,9 +180,9 @@ defmodule Indexer.CoinBalanceFetcherTest do
end)
end
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
assert :ok = CoinBalanceFetcher.async_fetch_balances([%{address_hash: hash, block_number: block_number}])
assert :ok = CoinBalance.Fetcher.async_fetch_balances([%{address_hash: hash, block_number: block_number}])
address =
wait(fn ->
@ -251,21 +253,21 @@ defmodule Indexer.CoinBalanceFetcherTest do
params_list = Enum.map(block_quantities, &%{block_quantity: &1, hash_data: hash_data})
case CoinBalanceFetcher.run(params_list, 0, json_rpc_named_arguments) do
case CoinBalance.Fetcher.run(params_list, 0, json_rpc_named_arguments) do
:ok ->
balances = Repo.all(from(balance in CoinBalance, where: balance.address_hash == ^hash_data))
balances = Repo.all(from(balance in Address.CoinBalance, where: balance.address_hash == ^hash_data))
assert Enum.count(balances) == 2
balance_by_block_number =
Enum.into(balances, %{}, fn %CoinBalance{block_number: block_number} = balance ->
Enum.into(balances, %{}, fn %Address.CoinBalance{block_number: block_number} = balance ->
{block_number, balance}
end)
Enum.each(expected_balance_by_block_number, fn {block_number, expected_balance} ->
expected_value = %Explorer.Chain.Wei{value: Decimal.new(expected_balance)}
assert %CoinBalance{value: ^expected_value} = balance_by_block_number[block_number]
assert %Address.CoinBalance{value: ^expected_value} = balance_by_block_number[block_number]
end)
fetched_address = Repo.one!(from(address in Address, where: address.hash == ^hash_data))
@ -295,7 +297,7 @@ defmodule Indexer.CoinBalanceFetcherTest do
end)
end
assert CoinBalanceFetcher.run(
assert CoinBalance.Fetcher.run(
[%{block_quantity: "0x1", hash_data: hash_data}, %{block_quantity: "0x1", hash_data: hash_data}],
0,
json_rpc_named_arguments

@ -1,11 +1,11 @@
defmodule Indexer.InternalTransactionFetcherTest do
defmodule Indexer.InternalTransaction.FetcherTest do
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
import ExUnit.CaptureLog
import Mox
alias Indexer.{CoinBalanceFetcherCase, InternalTransactionFetcher, PendingTransactionFetcher}
alias Indexer.{CoinBalance, InternalTransaction, PendingTransaction}
# MUST use global mode because we aren't guaranteed to get PendingTransactionFetcher's pid back fast enough to `allow`
# it to use expectations and stubs from test's pid.
@ -15,7 +15,7 @@ defmodule Indexer.InternalTransactionFetcherTest do
@moduletag [capture_log: true, no_geth: true]
test "does not try to fetch pending transactions from Indexer.PendingTransactionFetcher", %{
test "does not try to fetch pending transactions from Indexer.PendingTransaction.Fetcher", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
@ -61,25 +61,24 @@ defmodule Indexer.InternalTransactionFetcherTest do
end
end
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised!({PendingTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
PendingTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
wait_for_results(fn ->
Repo.one!(from(transaction in Explorer.Chain.Transaction, where: is_nil(transaction.block_hash), limit: 1))
end)
hash_strings =
InternalTransactionFetcher.init([], fn hash_string, acc -> [hash_string | acc] end, json_rpc_named_arguments)
InternalTransaction.Fetcher.init([], fn hash_string, acc -> [hash_string | acc] end, json_rpc_named_arguments)
assert :ok = InternalTransactionFetcher.run(hash_strings, 0, json_rpc_named_arguments)
assert :ok = InternalTransaction.Fetcher.run(hash_strings, 0, json_rpc_named_arguments)
end
describe "init/2" do
test "does not buffer pending transactions", %{json_rpc_named_arguments: json_rpc_named_arguments} do
insert(:transaction)
assert InternalTransactionFetcher.init(
assert InternalTransaction.Fetcher.init(
[],
fn hash_string, acc -> [hash_string | acc] end,
json_rpc_named_arguments
@ -96,7 +95,7 @@ defmodule Indexer.InternalTransactionFetcherTest do
|> insert()
|> with_block(block)
assert InternalTransactionFetcher.init(
assert InternalTransaction.Fetcher.init(
[],
fn hash_string, acc -> [hash_string | acc] end,
json_rpc_named_arguments
@ -112,7 +111,7 @@ defmodule Indexer.InternalTransactionFetcherTest do
|> insert()
|> with_block(internal_transactions_indexed_at: DateTime.utc_now())
assert InternalTransactionFetcher.init(
assert InternalTransaction.Fetcher.init(
[],
fn hash_string, acc -> [hash_string | acc] end,
json_rpc_named_arguments
@ -128,14 +127,13 @@ defmodule Indexer.InternalTransactionFetcherTest do
end)
end
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
insert(:transaction, hash: "0x03cd5899a63b6f6222afda8705d059fd5a7d126bcabe962fb654d9736e6bcafa")
log =
capture_log(fn ->
InternalTransactionFetcher.run(
InternalTransaction.Fetcher.run(
[
%{block_number: 1, hash_data: "0x03cd5899a63b6f6222afda8705d059fd5a7d126bcabe962fb654d9736e6bcafa"},
%{block_number: 1, hash_data: "0x03cd5899a63b6f6222afda8705d059fd5a7d126bcabe962fb654d9736e6bcafa"}
@ -160,13 +158,12 @@ defmodule Indexer.InternalTransactionFetcherTest do
end)
end
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
CoinBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
# not a real transaction hash, so that it fails
insert(:transaction, hash: "0x0000000000000000000000000000000000000000000000000000000000000001")
assert InternalTransactionFetcher.run(
assert InternalTransaction.Fetcher.run(
[
%{block_number: 1, hash_data: "0x0000000000000000000000000000000000000000000000000000000000000001"},
%{block_number: 1, hash_data: "0x0000000000000000000000000000000000000000000000000000000000000001"}

@ -6,7 +6,7 @@ defmodule Indexer.PendingTransactionFetcherTest do
import Mox
alias Explorer.Chain.Transaction
alias Indexer.PendingTransactionFetcher
alias Indexer.PendingTransaction
# MUST use global mode because we aren't guaranteed to get PendingTransactionFetcher's pid back fast enough to `allow`
# it to use expectations and stubs from test's pid.
@ -58,8 +58,7 @@ defmodule Indexer.PendingTransactionFetcherTest do
assert Repo.aggregate(Transaction, :count, :hash) == 0
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
start_supervised!({PendingTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments})
PendingTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
wait_for_results(fn ->
Repo.one!(from(transaction in Transaction, where: is_nil(transaction.block_hash), limit: 1))

@ -1,12 +1,12 @@
defmodule Indexer.TokenFetcherTest do
defmodule Indexer.Token.FetcherTest do
use EthereumJSONRPC.Case
use Explorer.DataCase
import Mox
alias Explorer.{Chain, Repo}
alias Explorer.Chain
alias Explorer.Chain.Token
alias Indexer.TokenFetcher
alias Indexer.Token.Fetcher
setup :verify_on_exit!
@ -15,7 +15,7 @@ defmodule Indexer.TokenFetcherTest do
insert(:token, cataloged: true)
%Token{contract_address_hash: uncatalog_address} = insert(:token, cataloged: false)
assert TokenFetcher.init([], &[&1 | &2], json_rpc_named_arguments) == [uncatalog_address]
assert Fetcher.init([], &[&1 | &2], json_rpc_named_arguments) == [uncatalog_address]
end
end
@ -23,7 +23,7 @@ defmodule Indexer.TokenFetcherTest do
test "skips tokens that have already been cataloged", %{json_rpc_named_arguments: json_rpc_named_arguments} do
expect(EthereumJSONRPC.Mox, :json_rpc, 0, fn _, _ -> :ok end)
%Token{contract_address_hash: contract_address_hash} = insert(:token, cataloged: true)
assert TokenFetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
assert Fetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
end
test "catalogs tokens that haven't been cataloged", %{json_rpc_named_arguments: json_rpc_named_arguments} do
@ -60,7 +60,7 @@ defmodule Indexer.TokenFetcherTest do
end
)
assert TokenFetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
assert Fetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
expected_supply = Decimal.new(1_000_000_000_000_000_000)
@ -111,7 +111,7 @@ defmodule Indexer.TokenFetcherTest do
end
)
assert TokenFetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
assert Fetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
assert {:ok, %Token{cataloged: true, name: "0x0000"}} = Chain.token_from_address_hash(contract_address_hash)
end
end
@ -150,7 +150,7 @@ defmodule Indexer.TokenFetcherTest do
end
)
assert TokenFetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
assert Fetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
assert {:ok, %Token{cataloged: true, symbol: nil}} = Chain.token_from_address_hash(contract_address_hash)
end
end
@ -184,7 +184,7 @@ defmodule Indexer.TokenFetcherTest do
end
)
assert TokenFetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
assert Fetcher.run([contract_address_hash], 0, json_rpc_named_arguments) == :ok
assert {:ok, %Token{cataloged: true, name: nil}} = Chain.token_from_address_hash(contract_address_hash)
end
end

@ -1,22 +1,23 @@
defmodule Indexer.TokenBalanceFetcherTest do
defmodule Indexer.TokenBalance.FetcherTest do
use EthereumJSONRPC.Case
use Explorer.DataCase
import Mox
alias Explorer.Chain.{Address.TokenBalance}
alias Indexer.TokenBalanceFetcher
alias Explorer.Chain.Address
alias Indexer.TokenBalance
setup :verify_on_exit!
setup :set_mox_global
describe "init/3" do
test "returns unfetched token balances" do
%TokenBalance{address_hash: address_hash} = insert(:token_balance, block_number: 1_000, value_fetched_at: nil)
%Address.TokenBalance{address_hash: address_hash} =
insert(:token_balance, block_number: 1_000, value_fetched_at: nil)
insert(:token_balance, value_fetched_at: DateTime.utc_now())
assert TokenBalanceFetcher.init([], &[&1.address_hash | &2], nil) == [address_hash]
assert TokenBalance.Fetcher.init([], &[&1.address_hash | &2], nil) == [address_hash]
end
end
@ -39,10 +40,10 @@ defmodule Indexer.TokenBalanceFetcherTest do
end
)
assert TokenBalanceFetcher.run([token_balance], 0, nil) == :ok
assert TokenBalance.Fetcher.run([token_balance], 0, nil) == :ok
token_balance_updated =
TokenBalance
Address.TokenBalance
|> Explorer.Repo.get_by(address_hash: token_balance.address_hash)
assert token_balance_updated.value == Decimal.new(1_000_000_000_000_000_000_000_000)

@ -0,0 +1,9 @@
defmodule Indexer.BlockFetcher.Catchup.Supervisor.Case do
alias Indexer.BlockFetcher.Catchup
def start_supervised!(fetcher_arguments) when is_map(fetcher_arguments) do
[fetcher_arguments]
|> Catchup.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end

@ -0,0 +1,18 @@
defmodule Indexer.CoinBalance.Supervisor.Case do
alias Indexer.CoinBalance
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
flush_interval: 50,
init_chunk_size: 1,
max_batch_size: 1,
max_concurrency: 1
)
[merged_fetcher_arguments]
|> CoinBalance.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end

@ -1,16 +0,0 @@
defmodule Indexer.CoinBalanceFetcherCase do
alias Indexer.CoinBalanceFetcher
def start_supervised!(options \\ []) when is_list(options) do
options
|> Keyword.merge(
flush_interval: 50,
init_chunk_size: 1,
max_batch_size: 1,
max_concurrency: 1,
name: CoinBalanceFetcher
)
|> CoinBalanceFetcher.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end

@ -0,0 +1,18 @@
defmodule Indexer.InternalTransaction.Supervisor.Case do
alias Indexer.InternalTransaction
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
flush_interval: 50,
init_chunk_size: 1,
max_batch_size: 1,
max_concurrency: 1
)
[merged_fetcher_arguments]
|> InternalTransaction.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end

@ -1,10 +0,0 @@
defmodule Indexer.InternalTransactionFetcherCase do
alias Indexer.InternalTransactionFetcher
def start_supervised!(options \\ []) when is_list(options) do
options
|> Keyword.put(:name, InternalTransactionFetcher)
|> InternalTransactionFetcher.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end

@ -0,0 +1,18 @@
defmodule Indexer.PendingTransaction.Supervisor.Case do
alias Indexer.PendingTransaction
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
flush_interval: 50,
init_chunk_size: 1,
max_batch_size: 1,
max_concurrency: 1
)
[merged_fetcher_arguments]
|> PendingTransaction.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end

@ -0,0 +1,18 @@
defmodule Indexer.Token.Supervisor.Case do
alias Indexer.Token
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
flush_interval: 50,
init_chunk_size: 1,
max_batch_size: 1,
max_concurrency: 1
)
[merged_fetcher_arguments]
|> Token.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end

@ -0,0 +1,18 @@
defmodule Indexer.TokenBalance.Supervisor.Case do
alias Indexer.TokenBalance
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
flush_interval: 50,
init_chunk_size: 1,
max_batch_size: 1,
max_concurrency: 1
)
[merged_fetcher_arguments]
|> TokenBalance.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end

@ -1,16 +0,0 @@
defmodule Indexer.TokenBalanceFetcherCase do
alias Indexer.TokenBalanceFetcher
def start_supervised!(options \\ []) when is_list(options) do
options
|> Keyword.merge(
flush_interval: 50,
init_chunk_size: 1,
max_batch_size: 1,
max_concurrency: 1,
name: TokenBalanceFetcher
)
|> TokenBalanceFetcher.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end

@ -1,16 +0,0 @@
defmodule Indexer.TokenFetcherCase do
alias Indexer.TokenFetcher
def start_supervised!(options \\ []) when is_list(options) do
options
|> Keyword.merge(
flush_interval: 50,
init_chunk_size: 1,
max_batch_size: 1,
max_concurrency: 1,
name: TokenFetcher
)
|> TokenFetcher.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end
Loading…
Cancel
Save