Realtime.Fetcher async fetching and no skips

Why:

* For the realtime fetcher to keep up on ETH we want it to fetch blocks
asynchronously. We noticed that the realtime fetcher was falling behind
on ETH mainnet. Currently the realtime fetcher fetches and imports one
block at a time. This commit changes that, so that whenever we're
notified of a new block number, through websockets, we start a new
process to fetch and import each block asynchronously.
* Also, whenever the websocket subscription, through which we're notified
of new block numbers, skips block numbers, we want to make sure the
skipped blocks are also fetched and imported.
* Issue link: n/a

This change addresses the need by:

* Adding a new `Task.Supervisor` called
`Indexer.Block.Realtime.TaskSupervisor` under the
`Indexer.Block.Realtime.Supervisor` supervisor. This supervisor was
created to supervise fetching and importing processes started by the
realtime fetcher every time a new block number is received through the
websocket subscription.
* Editing `Indexer.Block.Realtime.Fetcher` to start a new process, under
the new supervisor mentioned above, to fetch the latest block per the
websocket `newHeads` subscription. It also determines whether any block
numbers were skipped since the previous notification and, if so, starts a
separate process to fetch and import each skipped block.
pull/781/head
Sebastian Abondano 6 years ago
parent 3a2c6f93ed
commit 14b916f9e8
  1. 113
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  2. 1
      apps/indexer/lib/indexer/block/realtime/supervisor.ex

@ -13,11 +13,12 @@ defmodule Indexer.Block.Realtime.Fetcher do
alias EthereumJSONRPC.Subscription
alias Explorer.Chain
alias Indexer.{AddressExtraction, Block, Token, TokenBalances}
alias Indexer.Block.Realtime.TaskSupervisor
@behaviour Block.Fetcher
@enforce_keys ~w(block_fetcher)a
defstruct ~w(block_fetcher subscription)a
defstruct ~w(block_fetcher subscription previous_number)a
@type t :: %__MODULE__{
block_fetcher: %Block.Fetcher{
@ -27,7 +28,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
receipts_batch_size: pos_integer(),
receipts_concurrency: pos_integer()
},
subscription: Subscription.t()
subscription: Subscription.t(),
previous_number: pos_integer() | nil
}
def start_link([arguments, gen_server_options]) do
@ -55,58 +57,18 @@ defmodule Indexer.Block.Realtime.Fetcher do
{subscription, {:ok, %{"number" => quantity}}},
%__MODULE__{
block_fetcher: %Block.Fetcher{} = block_fetcher,
subscription: %Subscription{} = subscription
subscription: %Subscription{} = subscription,
previous_number: previous_number
} = state
)
when is_binary(quantity) do
number = quantity_to_integer(quantity)
# Subscriptions don't support getting all the blocks and transactions data, so we need to go back and get the full block
case fetch_and_import_range(block_fetcher, number..number) do
{:ok, {_inserted, _next}} ->
Logger.debug(fn ->
["realtime indexer fetched and imported block ", to_string(number)]
end)
# Subscriptions don't support getting all the blocks and transactions data,
# so we need to go back and get the full block
start_fetch_and_import(number, block_fetcher, previous_number)
{:error, {step, reason}} ->
Logger.error(fn ->
[
"realtime indexer failed to fetch ",
to_string(step),
" for block ",
to_string(number),
": ",
inspect(reason),
". Block will be retried by catchup indexer."
]
end)
{:error, changesets} when is_list(changesets) ->
Logger.error(fn ->
[
"realtime indexer failed to validate for block ",
to_string(number),
": ",
inspect(changesets),
". Block will be retried by catchup indexer."
]
end)
{:error, {step, failed_value, _changes_so_far}} ->
Logger.error(fn ->
[
"realtime indexer failed to insert ",
to_string(step),
" for block ",
to_string(number),
": ",
inspect(failed_value),
". Block will be retried by catchup indexer."
]
end)
end
{:noreply, state}
{:noreply, %{state | previous_number: number}}
end
@import_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a
@ -152,6 +114,61 @@ defmodule Indexer.Block.Realtime.Fetcher do
end
end
# Starts one task under `TaskSupervisor` per block that needs to be fetched and imported.
#
# * `number` - block number just announced by the subscription.
# * `block_fetcher` - passed through unchanged to `fetch_and_import_block/2`.
# * `previous_number` - block number from the previous notification, or `nil` when there was none.
#
# When `previous_number` is an integer lower than `number`, every block in
# `(previous_number + 1)..number` is started so that numbers skipped by the subscription are also
# imported.  Otherwise only `number` itself is started.
#
# Returns the list of `Task.Supervisor.start_child/5` results, one per block started.
defp start_fetch_and_import(number, block_fetcher, previous_number) do
  # `first..last` with `first > last` is a *descending* range, not an empty one.  Without the
  # `previous_number < number` check, a repeated or lower `number` (e.g. duplicate notification or
  # reorg) would iterate downwards from `previous_number + 1`, requesting a block above the
  # announced head and re-fetching blocks that were already handled.
  start_at =
    if is_integer(previous_number) and previous_number < number do
      previous_number + 1
    else
      number
    end

  for block_number_to_fetch <- start_at..number do
    args = [block_number_to_fetch, block_fetcher]
    Task.Supervisor.start_child(TaskSupervisor, __MODULE__, :fetch_and_import_block, args)
  end
end
@doc """
Fetches and imports the single block `block_number_to_fetch` using `block_fetcher`, then logs the
outcome.

The work is delegated to `fetch_and_import_range/2` with a one-element range.  A successful import
is logged at debug level; each recognised error shape is logged at error level.  Nothing is retried
here: the error messages state that the catchup indexer will retry the block.
"""
def fetch_and_import_block(block_number_to_fetch, block_fetcher) do
  # Suffix shared by every failure message.
  retry_notice = ". Block will be retried by catchup indexer."
  single_block_range = block_number_to_fetch..block_number_to_fetch

  result = fetch_and_import_range(block_fetcher, single_block_range)

  case result do
    {:ok, {_inserted, _next}} ->
      Logger.debug(fn ->
        ["realtime indexer fetched and imported block ", to_string(block_number_to_fetch)]
      end)

    # A fetch step failed.
    {:error, {failed_step, cause}} ->
      Logger.error(fn ->
        [
          "realtime indexer failed to fetch ",
          to_string(failed_step),
          " for block ",
          to_string(block_number_to_fetch),
          ": ",
          inspect(cause),
          retry_notice
        ]
      end)

    # Validation failed; the error carries a list of changesets.
    {:error, invalid_changesets} when is_list(invalid_changesets) ->
      Logger.error(fn ->
        [
          "realtime indexer failed to validate for block ",
          to_string(block_number_to_fetch),
          ": ",
          inspect(invalid_changesets),
          retry_notice
        ]
      end)

    # An insert step failed.
    {:error, {failed_step, rejected_value, _changes_so_far}} ->
      Logger.error(fn ->
        [
          "realtime indexer failed to insert ",
          to_string(failed_step),
          " for block ",
          to_string(block_number_to_fetch),
          ": ",
          inspect(rejected_value),
          retry_notice
        ]
      end)
  end
end
defp async_import_remaining_block_data(%{tokens: tokens}) do
tokens
|> Enum.map(& &1.contract_address_hash)

@ -23,6 +23,7 @@ defmodule Indexer.Block.Realtime.Supervisor do
put_in(subscribe_named_arguments[:transport_options][:web_socket_options], %{web_socket: web_socket})
[
{Task.Supervisor, name: Indexer.Block.Realtime.TaskSupervisor},
{web_socket_module, [url, [name: web_socket]]},
{Indexer.Block.Realtime.Fetcher,
[

Loading…
Cancel
Save