return polling to realtime fetcher

pull/1330/head
Ayrat Badykov 6 years ago
parent 5c30625b89
commit 60a36a4739
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 49
      apps/indexer/lib/indexer/block/realtime/fetcher.ex

@ -18,10 +18,12 @@ defmodule Indexer.Block.Realtime.Fetcher do
alias Indexer.{AddressExtraction, Block, TokenBalances, Tracer} alias Indexer.{AddressExtraction, Block, TokenBalances, Tracer}
alias Indexer.Block.Realtime.{ConsensusEnsurer, TaskSupervisor} alias Indexer.Block.Realtime.{ConsensusEnsurer, TaskSupervisor}
@polling_period 2_000
@behaviour Block.Fetcher @behaviour Block.Fetcher
@enforce_keys ~w(block_fetcher)a @enforce_keys ~w(block_fetcher)a
defstruct ~w(block_fetcher subscription previous_number max_number_seen)a defstruct ~w(block_fetcher last_block_number_received_timestamp subscription previous_number max_number_seen)a
@type t :: %__MODULE__{ @type t :: %__MODULE__{
block_fetcher: %Block.Fetcher{ block_fetcher: %Block.Fetcher{
@ -31,6 +33,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
receipts_batch_size: pos_integer(), receipts_batch_size: pos_integer(),
receipts_concurrency: pos_integer() receipts_concurrency: pos_integer()
}, },
last_block_number_received_timestamp: pos_integer() | nil,
subscription: Subscription.t(), subscription: Subscription.t(),
previous_number: pos_integer() | nil, previous_number: pos_integer() | nil,
max_number_seen: pos_integer() | nil max_number_seen: pos_integer() | nil
@ -76,14 +79,56 @@ defmodule Indexer.Block.Realtime.Fetcher do
start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen)
new_max_number = new_max_number(number, max_number_seen) new_max_number = new_max_number(number, max_number_seen)
last_block_number_received_timestamp = System.system_time(:second)
schedule_polling()
{:noreply, %{state | previous_number: number, max_number_seen: new_max_number}} {:noreply,
%{
state
| previous_number: number,
max_number_seen: new_max_number,
last_block_number_received_timestamp: last_block_number_received_timestamp
}}
end
@impl GenServer
def handle_info(
:poll_latest_block_number,
%__MODULE__{
block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher,
last_block_number_received_timestamp: last_block_number_timestamp,
previous_number: previous_number,
max_number_seen: max_number_seen
} = state
) do
block_number_timestamp =
case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do
{:ok, number} ->
start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen)
System.system_time(:second)
{:error, _} ->
last_block_number_timestamp
end
schedule_polling()
{:noreply,
%{
state
| last_block_number_received_timestamp: block_number_timestamp
}}
end end
defp new_max_number(number, nil), do: number defp new_max_number(number, nil), do: number
defp new_max_number(number, max_number_seen), do: max(number, max_number_seen) defp new_max_number(number, max_number_seen), do: max(number, max_number_seen)
defp schedule_polling do
Process.send_after(self(), :poll_latest_block_number, @polling_period)
end
@import_options ~w(address_hash_to_fetched_balance_block_number)a @import_options ~w(address_hash_to_fetched_balance_block_number)a
@impl Block.Fetcher @impl Block.Fetcher

Loading…
Cancel
Save