Subscribe to newHeads for realtime indexer

Separate Catchup and Realtime indexer's fetching, so that Catchup
continues to use the old polling, `EthereumJSONRPC.json_rpc` while Realtime
subscribes with `EthereumJSONRPC.subscribe`.

Because Realtime gets notified of new blocks through the subscription,
it no longer has a Sequence, so Sequence has been removed from
`%BlockFetcher{}`.  There is also no block concurrency or block batch
size for Realtime, so that was removed from `%BlockFetcher{}` too and
moved to `%BlockFetcher.Catchup{}`.

Because Realtime requires a websocket to be connected, there is now an
`Indexer.BlockFetcher.Realtime.Supervisor` that supervises the
`Indexer.BlockFetcher.Realtime.WebSocket` and
`Indexer.BlockFetcher.Realtime`.  `Indexer.BlockFetcher.Realtime`,
because it is waiting on notifications from its subscription, becomes a
`GenServer` instead of a `Task`, so it can be supervised normally.

BlockFetcher.Supervisor becomes a normal `Supervisor` of
`Catchup.Supervisor` and `Realtime.Supervisor`.

As Realtime is using `EthereumJSONRPC.subscribe`,
`subscribe_named_arguments` has been added as a sibling of
`json_rpc_named_arguments` in the `:indexer` `config`s.
pull/572/head
Luke Imhoff 6 years ago committed by Luke Imhoff
parent ae21b7e8b0
commit c8c53405fc
  1. 7
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_sockex.ex
  2. 2
      apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex
  3. 2
      apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex
  4. 7
      apps/indexer/config/dev/geth.exs
  5. 7
      apps/indexer/config/dev/parity.exs
  6. 7
      apps/indexer/config/prod/geth.exs
  7. 7
      apps/indexer/config/prod/parity.exs
  8. 3
      apps/indexer/lib/indexer/application.ex
  9. 220
      apps/indexer/lib/indexer/block_fetcher.ex
  10. 181
      apps/indexer/lib/indexer/block_fetcher/catchup.ex
  11. 110
      apps/indexer/lib/indexer/block_fetcher/catchup/supervisor.ex
  12. 175
      apps/indexer/lib/indexer/block_fetcher/realtime.ex
  13. 37
      apps/indexer/lib/indexer/block_fetcher/realtime/supervisor.ex
  14. 101
      apps/indexer/lib/indexer/block_fetcher/supervisor.ex
  15. 41
      apps/indexer/test/indexer/block_fetcher/catchup/supervisor_test.exs
  16. 13
      apps/indexer/test/indexer/block_fetcher/realtime_test.exs
  17. 11
      apps/indexer/test/indexer/block_fetcher_test.exs

@ -29,8 +29,11 @@ defmodule EthereumJSONRPC.WebSocket.WebSockex do
@impl WebSocket
# only allow secure WSS
def start_link("wss://" <> _ = url) do
WebSockex.start_link(url, __MODULE__, %__MODULE__{url: url}, cacerts: :certifi.cacerts(), insecure: false)
def start_link(["wss://" <> _ = url, gen_server_options]) when is_list(gen_server_options) do
WebSockex.start_link(url, __MODULE__, %__MODULE__{url: url}, [
{:cacerts, :certifi.cacerts()},
{:insecure, false} | gen_server_options
])
end
# Client interface

@ -8,7 +8,7 @@ defmodule EthereumJSONRPC.WebSocket.Case.Geth do
def setup do
url = "wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY"
web_socket_module = EthereumJSONRPC.WebSocket.WebSockex
web_socket = start_supervised!({web_socket_module, url})
web_socket = start_supervised!({web_socket_module, [url, []]})
%{
block_interval: 5_000,

@ -8,7 +8,7 @@ defmodule EthereumJSONRPC.WebSocket.Case.Parity do
def setup do
url = "wss://sokol-ws.poa.network/ws"
web_socket_module = EthereumJSONRPC.WebSocket.WebSockex
web_socket = start_supervised!({web_socket_module, url})
web_socket = start_supervised!({web_socket_module, [url, []]})
%{
block_interval: 5_000,

@ -10,4 +10,11 @@ config :indexer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Geth
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSockex,
url: "wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY"
]
]

@ -14,4 +14,11 @@ config :indexer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Parity
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSockex,
url: "wss://sokol-ws.poa.network/ws"
]
]

@ -10,4 +10,11 @@ config :indexer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Geth
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSockex,
url: "wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY"
]
]

@ -14,4 +14,11 @@ config :indexer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Parity
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSockex,
url: "wss://sokol-ws.poa.network/ws"
]
]

@ -22,8 +22,9 @@ defmodule Indexer.Application do
|> Application.get_all_env()
|> Keyword.take(
~w(blocks_batch_size blocks_concurrency block_interval json_rpc_named_arguments receipts_batch_size
receipts_concurrency)a
receipts_concurrency subscribe_named_arguments)a
)
|> Enum.into(%{})
children = [
{Task.Supervisor, name: Indexer.TaskSupervisor},

@ -5,24 +5,10 @@ defmodule Indexer.BlockFetcher do
require Logger
import Indexer, only: [debug: 1]
alias Explorer.Chain.{Block, Import}
alias Indexer.{AddressExtraction, Balances, Sequence, TokenTransfers}
alias Indexer.{AddressExtraction, Balances, TokenTransfers}
alias Indexer.BlockFetcher.Receipts
# dialyzer thinks that Logger.debug functions always have no_local_return
@dialyzer {:nowarn_function, import_range: 2}
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
@blocks_batch_size 10
@blocks_concurrency 10
@receipts_batch_size 250
@receipts_concurrency 10
@type address_hash_to_fetched_balance_block_number :: %{String.t() => Block.block_number()}
@type transaction_hash_to_block_number :: %{String.t() => Block.block_number()}
@ -49,18 +35,25 @@ defmodule Indexer.BlockFetcher do
}
) :: Import.all_result()
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
@receipts_batch_size 250
@receipts_concurrency 10
@doc false
def default_receipts_batch_size, do: @receipts_batch_size
@doc false
def default_receipts_concurrency, do: @receipts_concurrency
@enforce_keys ~w(json_rpc_named_arguments)a
defstruct blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
broadcast: nil,
defstruct broadcast: nil,
callback_module: nil,
json_rpc_named_arguments: nil,
receipts_batch_size: @receipts_batch_size,
receipts_concurrency: @receipts_concurrency
@doc false
def default_blocks_batch_size, do: @blocks_batch_size
@doc """
Required named arguments
@ -69,51 +62,75 @@ defmodule Indexer.BlockFetcher do
The follow options can be overridden:
* `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to
`#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions
are not paginated.*
* `:blocks_concurrency` - The number of concurrent requests of `:blocks_batch_size` to allow against the JSONRPC.
Defaults to #{@blocks_concurrency}. So upto `blocks_concurrency * block_batch_size` (defaults to
`#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all
connections.
* `:receipts_batch_size` - The number of receipts to request in one call to the JSONRPC. Defaults to
`#{@receipts_batch_size}`. Receipt requests also include the logs for when the transaction was collated into the
block. *These logs are not paginated.*
* `:receipts_concurrency` - The number of concurrent requests of `:receipts_batch_size` to allow against the JSONRPC
**for each block range**. Defaults to `#{@receipts_concurrency}`. So upto
`block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{@blocks_concurrency * @receipts_concurrency * @receipts_batch_size}`) receipts can be requested from the
JSONRPC at once over all connections. *Each transaction only has one receipt.*
**for each block range**. Defaults to `#{@receipts_concurrency}`. *Each transaction only has one receipt.*
"""
def new(named_arguments) when is_list(named_arguments) do
def new(named_arguments) when is_map(named_arguments) do
struct!(__MODULE__, named_arguments)
end
def stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)
when is_pid(sequence) do
sequence
|> Sequence.build_stream()
|> Task.async_stream(
&fetch_and_import_range_from_sequence(state, &1, sequence),
max_concurrency: blocks_concurrency,
timeout: :infinity
)
|> Stream.run()
end
defp cap_seq(seq, next, range) do
case next do
:more ->
debug(fn ->
first_block_number..last_block_number = range
"got blocks #{first_block_number} - #{last_block_number}"
end)
:end_of_chain ->
Sequence.cap(seq)
# Fetches every block in `range` over JSONRPC, fetches the receipts for the transactions in those blocks, derives
# token transfers, addresses and balance params from the fetched data, and passes all of it to `import_range/2`.
#
# Returns `{:ok, {inserted, next}}`, where `next` is the value reported by `EthereumJSONRPC.fetch_blocks_by_range/2`
# and `inserted` is the `{:ok, inserted}` payload of `import_range/2`.
# Returns `{:error, ...}` in one of the three shapes listed in the spec when a step fails.
@spec fetch_and_import_range(t, Range.t()) ::
{:ok, {inserted :: %{}, next :: :more | :end_of_chain}}
| {:error,
{step :: atom(), reason :: term()}
| [%Ecto.Changeset{}]
| {step :: atom(), failed_value :: term(), changes_so_far :: term()}}
def fetch_and_import_range(
%__MODULE__{
broadcast: broadcast,
callback_module: callback_module,
json_rpc_named_arguments: json_rpc_named_arguments
} = state,
_.._ = range
)
# `broadcast` and `callback_module` both default to `nil` in the struct, so this guard rejects a state in which
# neither has been set.
when broadcast in ~w(true false)a and callback_module != nil do
# The two fetch steps are wrapped in `{:blocks, _}` / `{:receipts, _}` so that the `else` below can tell which
# step produced an `{:error, reason}`.
with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{blocks: blocks, transactions: transactions_without_receipts} = result,
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params,
transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts),
%{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.from_log_params(logs),
addresses =
AddressExtraction.extract_addresses(%{
blocks: blocks,
logs: logs,
token_transfers: token_transfers,
transactions: transactions_with_receipts
}),
balances_params_set =
Balances.params_set(%{
blocks_params: blocks,
logs_params: logs,
transactions_params: transactions_with_receipts
}),
token_balances = Balances.params_set(%{token_transfers_params: token_transfers}),
{:ok, inserted} <-
import_range(
state,
%{
range: range,
addresses: %{params: addresses},
balances: %{params: balances_params_set},
token_balances: %{params: token_balances},
blocks: %{params: blocks},
logs: %{params: logs},
receipts: %{params: receipts},
token_transfers: %{params: token_transfers},
tokens: %{on_conflict: :nothing, params: tokens},
transactions: %{params: transactions_with_receipts, on_conflict: :replace_all}
}
) do
{:ok, {inserted, next}}
else
# A tagged fetch step failed: surface the step name next to the reason.
{step, {:error, reason}} -> {:error, {step, reason}}
# `import_range/2` is the only untagged `<-` clause above, so the two shapes below can only come from it.
# A list of changesets is passed through unchanged.
{:error, changesets} = error when is_list(changesets) -> error
# A 4-tuple error is repacked into the 2-tuple `{:error, {step, failed_value, changes_so_far}}` form.
{:error, step, failed_value, changes_so_far} -> {:error, {step, failed_value, changes_so_far}}
end
:ok
end
defp import_range(
@ -165,93 +182,4 @@ defmodule Indexer.BlockFetcher do
) do
{{hash, fetched_balance_block_number}, Map.delete(address_params, :fetched_balance_block_number)}
end
# Run at state.blocks_concurrency max_concurrency when called by `stream_import/1`
# Only public for testing
@doc false
def fetch_and_import_range_from_sequence(
%__MODULE__{} = state,
_.._ = range,
sequence
) do
case fetch_and_import_range(state, range) do
{:ok, {inserted, next}} ->
cap_seq(sequence, next, range)
{:ok, inserted}
{:error, {step, reason}} = error ->
Logger.error(fn ->
first..last = range
"failed to fetch #{step} for blocks #{first} - #{last}: #{inspect(reason)}. Retrying block range."
end)
:ok = Sequence.queue(sequence, range)
error
{:error, {step, failed_value, _changes_so_far}} = error ->
Logger.error(fn ->
"failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying"
end)
:ok = Sequence.queue(sequence, range)
error
{:error, changesets} = error when is_list(changesets) ->
Logger.error(fn ->
"failed to validate blocks #{inspect(range)}: #{inspect(changesets)}. Retrying"
end)
:ok = Sequence.queue(sequence, range)
error
end
end
@spec fetch_and_import_range(t, Range.t()) ::
{:ok, {inserted :: %{}, next :: :more | :end_of_chain}} | {:error, {step :: atom, reason :: term()}}
def fetch_and_import_range(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, _.._ = range) do
with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{blocks: blocks, transactions: transactions_without_receipts} = result,
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params,
transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts),
%{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.from_log_params(logs),
addresses =
AddressExtraction.extract_addresses(%{
blocks: blocks,
logs: logs,
token_transfers: token_transfers,
transactions: transactions_with_receipts
}),
balances_params_set =
Balances.params_set(%{
blocks_params: blocks,
logs_params: logs,
transactions_params: transactions_with_receipts
}),
token_balances = Balances.params_set(%{token_transfers_params: token_transfers}),
{:ok, inserted} <-
import_range(
state,
%{
range: range,
addresses: %{params: addresses},
balances: %{params: balances_params_set},
token_balances: %{params: token_balances},
blocks: %{params: blocks},
logs: %{params: logs},
receipts: %{params: receipts},
token_transfers: %{params: token_transfers},
tokens: %{on_conflict: :nothing, params: tokens},
transactions: %{params: transactions_with_receipts, on_conflict: :replace_all}
}
) do
{:ok, {inserted, next}}
else
{step, {:error, reason}} -> {:error, {step, reason}}
end
end
end

@ -6,14 +6,13 @@ defmodule Indexer.BlockFetcher.Catchup do
require Logger
import Indexer, only: [debug: 1]
import Indexer.BlockFetcher, only: [stream_fetch_and_import: 2]
import Indexer.BlockFetcher, only: [fetch_and_import_range: 2]
alias Explorer.Chain
alias Indexer.{
BalanceFetcher,
BlockFetcher,
BoundInterval,
InternalTransactionFetcher,
Sequence,
TokenFetcher
@ -21,37 +20,44 @@ defmodule Indexer.BlockFetcher.Catchup do
@behaviour BlockFetcher
@enforce_keys ~w(block_fetcher bound_interval)a
defstruct ~w(block_fetcher bound_interval task)a
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do
block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: false, callback_module: __MODULE__}
minimum_interval = div(block_interval, 2)
@blocks_batch_size 10
@blocks_concurrency 10
%__MODULE__{
block_fetcher: block_fetcher,
bound_interval: BoundInterval.within(minimum_interval..(minimum_interval * 10))
}
end
defstruct blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
block_fetcher: nil
@doc false
def default_blocks_batch_size, do: @blocks_batch_size
@doc """
Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0`
"""
@spec put(%BlockFetcher.Supervisor{catchup: %__MODULE__{task: nil}}) :: %BlockFetcher.Supervisor{
catchup: %__MODULE__{task: Task.t()}
}
def put(%BlockFetcher.Supervisor{catchup: %__MODULE__{task: nil} = state} = supervisor_state) do
put_in(
supervisor_state.catchup.task,
Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state])
)
end
Required named arguments
* `:json_rpc_named_arguments` - `t:EthereumJSONRPC.json_rpc_named_arguments/0` passed to
`EthereumJSONRPC.json_rpc/2`.
The follow options can be overridden:
def task(%__MODULE__{
block_fetcher:
%BlockFetcher{blocks_batch_size: blocks_batch_size, json_rpc_named_arguments: json_rpc_named_arguments} =
block_fetcher
}) do
* `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to
`#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions
are not paginated.*
* `:blocks_concurrency` - The number of concurrent requests of `:blocks_batch_size` to allow against the JSONRPC.
Defaults to #{@blocks_concurrency}. So up to `blocks_concurrency * blocks_batch_size` (defaults to
`#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all
connections. Up to `blocks_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{@blocks_concurrency * BlockFetcher.default_receipts_batch_size() * BlockFetcher.default_receipts_concurrency()}`
) receipts can be requested from the JSONRPC at once over all connections.
"""
def task(
%__MODULE__{
blocks_batch_size: blocks_batch_size,
block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}
} = state
) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
case latest_block_number do
@ -81,7 +87,7 @@ defmodule Indexer.BlockFetcher.Catchup do
{:ok, sequence} = Sequence.start_link(ranges: missing_ranges, step: -1 * blocks_batch_size)
Sequence.cap(sequence)
stream_fetch_and_import(block_fetcher, sequence)
stream_fetch_and_import(state, sequence)
end
%{first_block_number: first, missing_block_count: missing_block_count}
@ -105,55 +111,6 @@ defmodule Indexer.BlockFetcher.Catchup do
end
end
def handle_success(
{ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}},
%BlockFetcher.Supervisor{
catchup: %__MODULE__{
bound_interval: bound_interval,
task: %Task{ref: ref}
}
} = supervisor_state
)
when is_integer(missing_block_count) do
new_bound_interval =
case missing_block_count do
0 ->
Logger.info("Index already caught up in #{first_block_number}-0")
BoundInterval.increase(bound_interval)
_ ->
Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0")
BoundInterval.decrease(bound_interval)
end
Process.demonitor(ref, [:flush])
interval = new_bound_interval.current
Logger.info(fn ->
"Checking if index needs to catch up in #{interval}ms"
end)
Process.send_after(self(), :catchup_index, interval)
update_in(supervisor_state.catchup, fn state ->
%__MODULE__{state | bound_interval: new_bound_interval, task: nil}
end)
end
def handle_failure(
{:DOWN, ref, :process, pid, reason},
%BlockFetcher.Supervisor{catchup: %__MODULE__{task: %Task{pid: pid, ref: ref}}} = supervisor_state
) do
Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)
send(self(), :catchup_index)
put_in(supervisor_state.catchup.task, nil)
end
defp async_import_remaining_block_data(
%{transactions: transaction_hashes, addresses: address_hashes, tokens: tokens},
%{
@ -179,4 +136,72 @@ defmodule Indexer.BlockFetcher.Catchup do
|> Enum.map(& &1.contract_address_hash)
|> TokenFetcher.async_fetch()
end
# Runs `fetch_and_import_range_from_sequence/3` over every range produced by `sequence`, with at most
# `blocks_concurrency` ranges being processed at once and no per-range timeout.
defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)
     when is_pid(sequence) do
  range_stream = Sequence.build_stream(sequence)

  import_stream =
    Task.async_stream(
      range_stream,
      fn range -> fetch_and_import_range_from_sequence(state, range, sequence) end,
      max_concurrency: blocks_concurrency,
      timeout: :infinity
    )

  Stream.run(import_stream)
end
# Fetches and imports one `range` taken from `sequence`. This is the function `stream_fetch_and_import/2` hands to
# `Task.async_stream/3`, so up to `state.blocks_concurrency` calls run at once.
#
# On success `cap_seq/3` is called with `next` and `{:ok, inserted}` is returned.
# On any of the three error shapes the failure is logged, `range` is queued back onto `sequence`, and the original
# error tuple is returned unchanged.
defp fetch_and_import_range_from_sequence(
%__MODULE__{block_fetcher: %BlockFetcher{} = block_fetcher},
_.._ = range,
sequence
) do
case fetch_and_import_range(block_fetcher, range) do
{:ok, {inserted, next}} ->
cap_seq(sequence, next, range)
{:ok, inserted}
# A fetch step (named by `step`) failed.
{:error, {step, reason}} = error ->
Logger.error(fn ->
first..last = range
"failed to fetch #{step} for blocks #{first} - #{last}: #{inspect(reason)}. Retrying block range."
end)
# Asserting `:ok` makes a failed re-queue crash instead of silently dropping the range.
:ok = Sequence.queue(sequence, range)
error
# The import was rejected with a list of changesets.
{:error, changesets} = error when is_list(changesets) ->
Logger.error(fn ->
"failed to validate blocks #{inspect(range)}: #{inspect(changesets)}. Retrying"
end)
:ok = Sequence.queue(sequence, range)
error
# The import failed at `step`; the changes made so far are not logged.
{:error, {step, failed_value, _changes_so_far}} = error ->
Logger.error(fn ->
"failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying"
end)
:ok = Sequence.queue(sequence, range)
error
end
end
# Reacts to the `next` marker returned with a fetched range: `:end_of_chain` caps `seq`, `:more` only emits a
# debug message naming the blocks in `range`. Always returns `:ok`.
defp cap_seq(seq, next, range) do
  case next do
    :end_of_chain ->
      Sequence.cap(seq)

    :more ->
      debug(fn ->
        range_first..range_last = range
        "got blocks #{range_first} - #{range_last}"
      end)
  end

  :ok
end
end

@ -0,0 +1,110 @@
defmodule Indexer.BlockFetcher.Catchup.Supervisor do
  @moduledoc """
  Supervises the `Indexer.BlockFetcher.Catchup` task.

  The task is run under `Indexer.TaskSupervisor`.  When it finishes, the next run is scheduled after the current
  value of an `Indexer.BoundInterval`, which is passed to `Indexer.BoundInterval.increase/1` when no blocks were
  missing and to `Indexer.BoundInterval.decrease/1` when blocks had to be caught up.  When the task exits
  abnormally, it is restarted immediately.
  """

  # NOT a `Supervisor` because of the `Task` restart strategies are custom.
  use GenServer

  require Logger

  alias Indexer.{BlockFetcher, BoundInterval}
  alias Indexer.BlockFetcher.Catchup

  # milliseconds
  @block_interval 5_000

  @enforce_keys ~w(bound_interval catchup)a
  defstruct bound_interval: nil,
            catchup: %Catchup{},
            task: nil

  def child_spec(arg) do
    # The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
    # instead of `:supervisor` and use the wrong shutdown timeout
    Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, [])
  end

  @doc """
  Starts the supervisor of `Indexer.BlockFetcher.Catchup`.

  `named_arguments` is a map:

    * `:block_fetcher` - (required) the `t:Indexer.BlockFetcher.t/0` shared with the other fetchers.  Its
      `:broadcast` is set to `false` and its `:callback_module` to `Indexer.BlockFetcher.Catchup` before use.
    * `:block_interval` - (optional) milliseconds; defaults to `#{@block_interval}`.  Half of it is the minimum of
      the interval between catchup runs and ten times that minimum is the maximum.

  For `t:GenServer.options/0` see `GenServer.start_link/3`.
  """
  @spec start_link([named_arguments :: map() | GenServer.options()]) :: GenServer.on_start()
  def start_link([named_arguments, gen_server_options]) when is_map(named_arguments) and is_list(gen_server_options) do
    GenServer.start_link(__MODULE__, named_arguments, gen_server_options)
  end

  @impl GenServer
  def init(named_arguments) do
    state = new(named_arguments)

    # Kick off the first catchup run as soon as `init/1` returns.
    send(self(), :catchup_index)

    {:ok, state}
  end

  # Builds the initial state: a catchup-specific copy of the shared block fetcher and the interval bounds derived
  # from `:block_interval`.
  defp new(%{block_fetcher: common_block_fetcher} = named_arguments) do
    block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: false, callback_module: Catchup}

    block_interval = Map.get(named_arguments, :block_interval, @block_interval)
    minimum_interval = div(block_interval, 2)
    bound_interval = BoundInterval.within(minimum_interval..(minimum_interval * 10))

    %__MODULE__{
      catchup: %Catchup{block_fetcher: block_fetcher},
      bound_interval: bound_interval
    }
  end

  # Starts a catchup run as an unlinked task, so that its exit arrives as a `:DOWN` message instead of taking this
  # process down.
  @impl GenServer
  def handle_info(:catchup_index, %__MODULE__{catchup: %Catchup{} = catchup} = state) do
    {:noreply,
     %__MODULE__{state | task: Task.Supervisor.async_nolink(Indexer.TaskSupervisor, Catchup, :task, [catchup])}}
  end

  # The current task replied with its result: adjust the interval and schedule the next run.
  def handle_info(
        {ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}},
        %__MODULE__{
          bound_interval: bound_interval,
          task: %Task{ref: ref}
        } = state
      )
      when is_integer(missing_block_count) do
    new_bound_interval =
      case missing_block_count do
        0 ->
          Logger.info("Index already caught up in #{first_block_number}-0")

          BoundInterval.increase(bound_interval)

        _ ->
          Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0")

          BoundInterval.decrease(bound_interval)
      end

    # The reply has been handled, so the task's `:DOWN` message is no longer wanted.
    Process.demonitor(ref, [:flush])

    interval = new_bound_interval.current

    Logger.info(fn ->
      "Checking if index needs to catch up in #{interval}ms"
    end)

    Process.send_after(self(), :catchup_index, interval)

    {:noreply, %__MODULE__{state | bound_interval: new_bound_interval, task: nil}}
  end

  # The current task went down without replying: restart it right away.
  def handle_info(
        {:DOWN, ref, :process, pid, reason},
        %__MODULE__{task: %Task{pid: pid, ref: ref}} = state
      ) do
    Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)

    send(self(), :catchup_index)

    {:noreply, %__MODULE__{state | task: nil}}
  end
end

@ -1,55 +1,114 @@
defmodule Indexer.BlockFetcher.Realtime do
@moduledoc """
Fetches and indexes block ranges from latest block forward.
Fetches and indexes block ranges from latest block forward using a WebSocket.
"""
use GenServer
require Logger
import EthereumJSONRPC, only: [integer_to_quantity: 1]
import Indexer.BlockFetcher, only: [stream_fetch_and_import: 2]
import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1]
import Indexer, only: [debug: 1]
import Indexer.BlockFetcher, only: [fetch_and_import_range: 2]
alias EthereumJSONRPC.Subscription
alias Explorer.Chain
alias Indexer.{
AddressExtraction,
BlockFetcher,
Sequence,
TokenFetcher
}
alias Indexer.{AddressExtraction, BlockFetcher, TokenFetcher}
@behaviour BlockFetcher
@enforce_keys ~w(block_fetcher interval)a
defstruct block_fetcher: nil,
interval: nil,
task_by_ref: %{}
@enforce_keys ~w(block_fetcher subscription)a
defstruct ~w(block_fetcher subscription)a
@type t :: %__MODULE__{
block_fetcher: %BlockFetcher{
broadcast: true,
callback_module: __MODULE__,
json_rpc_named_arguments: EthereumJSONRPC.json_rpc_named_arguments(),
receipts_batch_size: pos_integer(),
receipts_concurrency: pos_integer()
},
subscription: Subscription.t()
}
# Starts the realtime fetcher as a `GenServer`.  The single list argument is what a supervisor child tuple
# `{module, [arguments, gen_server_options]}` passes: the `init/1` argument followed by the `GenServer` options.
def start_link([arguments, gen_server_options]),
  do: GenServer.start_link(__MODULE__, arguments, gen_server_options)
def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do
block_fetcher = %BlockFetcher{
common_block_fetcher
| callback_module: __MODULE__,
blocks_concurrency: 1,
broadcast: true
}
# Subscribes to `"newHeads"` before building the state, so the server only starts when the subscription exists;
# a failed subscription stops the server with the returned reason.
@impl GenServer
@spec init(%{block_fetcher: BlockFetcher.t(), subscribe_named_arguments: Keyword.t()}) ::
        {:ok, t()} | {:stop, reason :: term()}
def init(%{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments}) do
  subscribe_result = EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments)

  case subscribe_result do
    {:error, reason} ->
      {:stop, reason}

    {:ok, subscription} ->
      # The realtime copy of the shared block fetcher broadcasts and calls back into this module.
      realtime_block_fetcher = %BlockFetcher{block_fetcher | broadcast: true, callback_module: __MODULE__}

      {:ok, %__MODULE__{block_fetcher: realtime_block_fetcher, subscription: subscription}}
  end
end
interval = div(block_interval, 2)
@impl GenServer
def handle_info(
{subscription, {:ok, %{"number" => quantity}}},
%__MODULE__{
block_fetcher: %BlockFetcher{} = block_fetcher,
subscription: %Subscription{} = subscription
} = state
)
when is_binary(quantity) do
number = quantity_to_integer(quantity)
# Subscriptions don't support getting all the blocks and transactions data, so we need to go back and get the full block
case fetch_and_import_range(block_fetcher, number..number) do
{:ok, {_inserted, _next}} ->
debug(fn ->
["realtime indexer fetched and imported block ", to_string(number)]
end)
%__MODULE__{block_fetcher: block_fetcher, interval: interval}
end
{:error, {step, reason}} ->
Logger.error(fn ->
[
"realtime indexer failed to fetch ",
to_string(step),
" for block ",
to_string(number),
": ",
inspect(reason),
". Block will be retried by catchup indexer."
]
end)
@doc """
Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` `realtime_task_by_ref`.
"""
def put(%BlockFetcher.Supervisor{realtime: %__MODULE__{} = state} = supervisor_state) do
%Task{ref: ref} = task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state])
{:error, changesets} when is_list(changesets) ->
Logger.error(fn ->
[
"realtime indexer failed to validate for block ",
to_string(number),
": ",
inspect(changesets),
". Block will be retried by catchup indexer."
]
end)
put_in(supervisor_state.realtime.task_by_ref[ref], task)
end
{:error, {step, failed_value, _changes_so_far}} ->
Logger.error(fn ->
[
"realtime indexer failed to insert ",
to_string(step),
" for block ",
to_string(number),
": ",
inspect(failed_value),
". Block will be retried by catchup indexer."
]
end)
end
def task(%__MODULE__{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher}) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
{:ok, sequence} = Sequence.start_link(first: latest_block_number, step: 2)
stream_fetch_and_import(block_fetcher, sequence)
{:noreply, state}
end
@import_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a
@ -91,52 +150,6 @@ defmodule Indexer.BlockFetcher.Realtime do
end
end
def handle_success(
{ref, :ok = result},
%BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state
) do
{task, running_task_by_ref} = Map.pop(task_by_ref, ref)
case task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref returned result (#{inspect(result)})"
end)
_ ->
:ok
end
Process.demonitor(ref, [:flush])
put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref)
end
def handle_failure(
{:DOWN, ref, :process, pid, reason},
%BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state
) do
{task, running_task_by_ref} = Map.pop(task_by_ref, ref)
case task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref reports unknown pid (#{pid}) DOWN due to reason (#{reason}})"
end)
_ ->
Logger.error(fn ->
"Realtime index stream exited with reason (#{inspect(reason)}). " <>
"The next realtime index task will fill the missing block " <>
"if the lastest block number has not advanced by then or the catch up index will fill the missing block."
end)
end
put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref)
end
defp async_import_remaining_block_data(%{tokens: tokens}) do
tokens
|> Enum.map(& &1.contract_address_hash)

@ -0,0 +1,37 @@
defmodule Indexer.BlockFetcher.Realtime.Supervisor do
  @moduledoc """
  Supervises the realtime block fetcher together with the web socket it subscribes through.

  The web socket is started first, registered as `Indexer.BlockFetcher.Realtime.WebSocket`, followed by
  `Indexer.BlockFetcher.Realtime`.  The children use the `:rest_for_one` strategy.
  """

  use Supervisor

  alias Indexer.BlockFetcher.Realtime

  # Registered name of the web socket process; also handed to the fetcher in its `:web_socket_options`.
  @web_socket_name Realtime.WebSocket

  def start_link([arguments, gen_server_options]) do
    Supervisor.start_link(__MODULE__, arguments, gen_server_options)
  end

  @impl Supervisor
  def init(%{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments}) do
    children =
      case Keyword.fetch!(subscribe_named_arguments, :transport) do
        EthereumJSONRPC.WebSocket -> web_socket_children(block_fetcher, subscribe_named_arguments)
      end

    Supervisor.init(children, strategy: :rest_for_one)
  end

  # Child specs for the `EthereumJSONRPC.WebSocket` transport: the configured web socket module connected to the
  # configured `:url`, then the realtime fetcher pointed at that web socket by name.
  defp web_socket_children(block_fetcher, subscribe_named_arguments) do
    transport_options = Keyword.fetch!(subscribe_named_arguments, :transport_options)
    url = Keyword.fetch!(transport_options, :url)
    web_socket_module = Keyword.fetch!(transport_options, :web_socket)

    realtime_arguments = %{
      block_fetcher: block_fetcher,
      subscribe_named_arguments:
        put_in(subscribe_named_arguments, [:transport_options, :web_socket_options], %{web_socket: @web_socket_name})
    }

    [
      {web_socket_module, [url, [name: @web_socket_name]]},
      {Realtime, [realtime_arguments, [name: Realtime]]}
    ]
  end
end

@ -1,84 +1,35 @@
defmodule Indexer.BlockFetcher.Supervisor do
@moduledoc """
Supervises the `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
Supervises catchup and realtime block fetchers
"""
# NOT a `Supervisor` because of the `Task` restart strategies are custom.
use GenServer
require Logger
alias Indexer.BlockFetcher
alias Indexer.BlockFetcher.{Catchup, Realtime}
# milliseconds
@block_interval 5_000
@enforce_keys ~w(catchup realtime)a
defstruct ~w(catchup realtime)a
def child_spec(arg) do
# The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
# instead of `:supervisor` and use the wrong shutdown timeout
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, [])
end
@doc """
Starts supervisor of `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
For `named_arguments` see `Indexer.BlockFetcher.new/1`. For `t:GenServer.options/0` see `GenServer.start_link/3`.
"""
@spec start_link([named_arguments :: list() | GenServer.options()]) :: {:ok, pid}
def start_link([named_arguments, gen_server_options]) when is_list(named_arguments) and is_list(gen_server_options) do
GenServer.start_link(__MODULE__, named_arguments, gen_server_options)
end
@impl GenServer
def init(named_arguments) do
state = new(named_arguments)
send(self(), :catchup_index)
{:ok, _} = :timer.send_interval(state.realtime.interval, :realtime_index)
{:ok, state}
end
defp new(named_arguments) do
{given_block_interval, block_fetcher_named_arguments} = Keyword.pop(named_arguments, :block_interval)
block_fetcher = struct!(BlockFetcher, block_fetcher_named_arguments)
block_interval = given_block_interval || @block_interval
%__MODULE__{
catchup: Catchup.new(%{block_fetcher: block_fetcher, block_interval: block_interval}),
realtime: Realtime.new(%{block_fetcher: block_fetcher, block_interval: block_interval})
}
end
@impl GenServer
def handle_info(:catchup_index, %__MODULE__{} = state) do
{:noreply, Catchup.put(state)}
end
def handle_info({ref, _} = message, %__MODULE__{catchup: %Catchup{task: %Task{ref: ref}}} = state) do
{:noreply, Catchup.handle_success(message, state)}
end
def handle_info(
{:DOWN, ref, :process, pid, _} = message,
%__MODULE__{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = state
) do
{:noreply, Catchup.handle_failure(message, state)}
end
def handle_info(:realtime_index, %__MODULE__{} = state) do
{:noreply, Realtime.put(state)}
end
def handle_info({ref, :ok} = message, %__MODULE__{} = state) when is_reference(ref) do
{:noreply, Realtime.handle_success(message, state)}
end
def handle_info({:DOWN, _, :process, _, _} = message, %__MODULE__{} = state) do
{:noreply, Realtime.handle_failure(message, state)}
use Supervisor
def start_link([arguments, gen_server_options]) do
Supervisor.start_link(__MODULE__, arguments, gen_server_options)
end
@impl Supervisor
def init(%{block_interval: block_interval, subscribe_named_arguments: subscribe_named_arguments} = named_arguments) do
block_fetcher =
named_arguments
|> Map.drop(~w(block_interval subscribe_named_arguments)a)
|> BlockFetcher.new()
Supervisor.init(
[
{Catchup.Supervisor,
[%{block_fetcher: block_fetcher, block_interval: block_interval}, [name: Catchup.Supervisor]]},
{Realtime.Supervisor,
[
%{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments},
[name: Realtime.Supervisor]
]}
],
strategy: :one_for_one
)
end
end

@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcher.SupervisorTest do
defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
# `async: false` due to use of named GenServer
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
@ -199,7 +199,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
default_blocks_batch_size = BlockFetcher.default_blocks_batch_size()
default_blocks_batch_size = BlockFetcher.Catchup.default_blocks_batch_size()
assert latest_block_number > default_blocks_batch_size
@ -209,7 +209,10 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]})
start_supervised!(
{Catchup.Supervisor, [%{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}}, []]}
)
first_catchup_block_number = latest_block_number - 1
@ -256,9 +259,8 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
# from `setup :state`
assert_received :catchup_index
assert {:noreply,
%BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} =
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
assert {:noreply, %Catchup.Supervisor{catchup: %Catchup{}, task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
Catchup.Supervisor.handle_info(:catchup_index, state)
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message
@ -267,12 +269,12 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
assert {:noreply, message_state} = Catchup.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup.bound_interval.current > catchup_index_state.catchup.bound_interval.current
assert message_state.bound_interval.current > catchup_index_state.bound_interval.current
end
test "decreases catchup_bound_interval if blocks missing", %{
@ -327,38 +329,39 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
# from `setup :state`
assert_received :catchup_index
assert {:noreply,
%BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} =
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
assert {:noreply, %Catchup.Supervisor{catchup: %Catchup{}, task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
Catchup.Supervisor.handle_info(:catchup_index, state)
# 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for
# catchup_index
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 1}} = message, 200
Process.sleep(200)
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
assert {:noreply, message_state} = Catchup.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup.bound_interval.current == message_state.catchup.bound_interval.minimum
assert message_state.bound_interval.current == message_state.bound_interval.minimum
# When not at minimum it is decreased
above_minimum_state = update_in(catchup_index_state.catchup.bound_interval, &BoundInterval.increase/1)
above_minimum_state = update_in(catchup_index_state.bound_interval, &BoundInterval.increase/1)
assert above_minimum_state.catchup.bound_interval.current > message_state.catchup.bound_interval.minimum
assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state)
assert above_minimum_state.bound_interval.current > message_state.bound_interval.minimum
assert {:noreply, above_minimum_message_state} = Catchup.Supervisor.handle_info(message, above_minimum_state)
assert above_minimum_message_state.catchup.bound_interval.current <
above_minimum_state.catchup.bound_interval.current
assert above_minimum_message_state.bound_interval.current < above_minimum_state.bound_interval.current
end
end
defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do
{:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments)
{:ok, state} =
Catchup.Supervisor.init(%{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}})
%{state: state}
end

@ -27,17 +27,20 @@ defmodule Indexer.BlockFetcher.RealtimeTest do
trace_replayTransaction: "https://core-trace.poa.network"
)
block_fetcher = %{BlockFetcher.new(json_rpc_named_arguments: core_json_rpc_named_arguments) | broadcast: false}
realtime = Realtime.new(%{block_fetcher: block_fetcher, block_interval: 5_000})
block_fetcher = %BlockFetcher{
broadcast: false,
callback_module: Realtime,
json_rpc_named_arguments: core_json_rpc_named_arguments
}
%{json_rpc_named_arguments: core_json_rpc_named_arguments, realtime: realtime}
%{block_fetcher: block_fetcher, json_rpc_named_arguments: core_json_rpc_named_arguments}
end
describe "Indexer.BlockFetcher.stream_import/1" do
@tag :no_geth
test "in range with internal transactions", %{
json_rpc_named_arguments: json_rpc_named_arguments,
realtime: %Realtime{block_fetcher: %BlockFetcher{} = block_fetcher}
block_fetcher: %BlockFetcher{} = block_fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
} do
{:ok, sequence} = Sequence.start_link(ranges: [], step: 2)
Sequence.cap(sequence)

@ -53,12 +53,11 @@ defmodule Indexer.BlockFetcherTest do
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
%{
block_fetcher:
BlockFetcher.new(
broadcast: false,
callback_module: Indexer.BlockFetcher.Catchup,
json_rpc_named_arguments: json_rpc_named_arguments
)
block_fetcher: %BlockFetcher{
broadcast: false,
callback_module: Indexer.BlockFetcher.Catchup,
json_rpc_named_arguments: json_rpc_named_arguments
}
}
end

Loading…
Cancel
Save