|
|
@ -57,8 +57,7 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
@impl GenServer |
|
|
|
@impl GenServer |
|
|
|
def init(%{block_fetcher: %Block.Fetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments}) |
|
|
|
def init(%{block_fetcher: %Block.Fetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments}) do |
|
|
|
when is_list(subscribe_named_arguments) do |
|
|
|
|
|
|
|
Logger.metadata(fetcher: :block_realtime) |
|
|
|
Logger.metadata(fetcher: :block_realtime) |
|
|
|
|
|
|
|
|
|
|
|
{:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: :realtime, callback_module: __MODULE__}}, |
|
|
|
{:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: :realtime, callback_module: __MODULE__}}, |
|
|
@ -66,17 +65,9 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
@impl GenServer |
|
|
|
@impl GenServer |
|
|
|
def handle_continue({:init, subscribe_named_arguments}, %__MODULE__{subscription: nil} = state) |
|
|
|
def handle_continue({:init, subscribe_named_arguments}, %__MODULE__{subscription: nil} = state) do |
|
|
|
when is_list(subscribe_named_arguments) do |
|
|
|
timer = schedule_polling() |
|
|
|
case EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments) do |
|
|
|
{:noreply, %__MODULE__{state | timer: timer} |> subscribe_to_new_heads(subscribe_named_arguments)} |
|
|
|
{:ok, subscription} -> |
|
|
|
|
|
|
|
timer = schedule_polling() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
{:noreply, %__MODULE__{state | subscription: subscription, timer: timer}} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
{:error, reason} -> |
|
|
|
|
|
|
|
{:stop, reason, state} |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
@impl GenServer |
|
|
|
@impl GenServer |
|
|
@ -141,6 +132,20 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
}} |
|
|
|
}} |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
defp subscribe_to_new_heads(%__MODULE__{subscription: nil} = state, subscribe_named_arguments) |
|
|
|
|
|
|
|
when is_list(subscribe_named_arguments) do |
|
|
|
|
|
|
|
case EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments) do |
|
|
|
|
|
|
|
{:ok, subscription} -> |
|
|
|
|
|
|
|
%__MODULE__{state | subscription: subscription} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
{:error, reason} -> |
|
|
|
|
|
|
|
Logger.debug(fn -> ["Could not connect to websocket: ", reason, ". Continuing with polling."] end) |
|
|
|
|
|
|
|
state |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
defp subscribe_to_new_heads(state, _), do: state |
|
|
|
|
|
|
|
|
|
|
|
defp new_max_number(number, nil), do: number |
|
|
|
defp new_max_number(number, nil), do: number |
|
|
|
|
|
|
|
|
|
|
|
defp new_max_number(number, max_number_seen), do: max(number, max_number_seen) |
|
|
|
defp new_max_number(number, max_number_seen), do: max(number, max_number_seen) |
|
|
|