From 60a36a4739b2570011f97e04444e14590e3914df Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Thu, 10 Jan 2019 17:00:04 +0300 Subject: [PATCH 1/3] return polling to realtime fetcher --- .../lib/indexer/block/realtime/fetcher.ex | 49 ++++++++++++++++++- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 9196cb21b2..7f6310fd78 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -18,10 +18,12 @@ defmodule Indexer.Block.Realtime.Fetcher do alias Indexer.{AddressExtraction, Block, TokenBalances, Tracer} alias Indexer.Block.Realtime.{ConsensusEnsurer, TaskSupervisor} + @polling_period 2_000 + @behaviour Block.Fetcher @enforce_keys ~w(block_fetcher)a - defstruct ~w(block_fetcher subscription previous_number max_number_seen)a + defstruct ~w(block_fetcher last_block_number_received_timestamp subscription previous_number max_number_seen)a @type t :: %__MODULE__{ block_fetcher: %Block.Fetcher{ @@ -31,6 +33,7 @@ defmodule Indexer.Block.Realtime.Fetcher do receipts_batch_size: pos_integer(), receipts_concurrency: pos_integer() }, + last_block_number_received_timestamp: pos_integer() | nil, subscription: Subscription.t(), previous_number: pos_integer() | nil, max_number_seen: pos_integer() | nil @@ -76,14 +79,56 @@ defmodule Indexer.Block.Realtime.Fetcher do start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) new_max_number = new_max_number(number, max_number_seen) + last_block_number_received_timestamp = System.system_time(:second) + + schedule_polling() - {:noreply, %{state | previous_number: number, max_number_seen: new_max_number}} + {:noreply, + %{ + state + | previous_number: number, + max_number_seen: new_max_number, + last_block_number_received_timestamp: last_block_number_received_timestamp + }} + end + + @impl GenServer + def handle_info( + :poll_latest_block_number, + %__MODULE__{ + block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher, + last_block_number_received_timestamp: last_block_number_timestamp, + previous_number: previous_number, + max_number_seen: max_number_seen + } = state + ) do + block_number_timestamp = + case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do + {:ok, number} -> + start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) + System.system_time(:second) + + {:error, _} -> + last_block_number_timestamp + end + + schedule_polling() + + {:noreply, + %{ + state + | last_block_number_received_timestamp: block_number_timestamp + }} end defp new_max_number(number, nil), do: number defp new_max_number(number, max_number_seen), do: max(number, max_number_seen) + defp schedule_polling do + Process.send_after(self(), :poll_latest_block_number, @polling_period) + end + @import_options ~w(address_hash_to_fetched_balance_block_number)a @impl Block.Fetcher From 9f45c14ef722fcca8831f622f24e99b739886474 Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Thu, 10 Jan 2019 18:32:23 +0300 Subject: [PATCH 2/3] fix CR issues --- .../lib/indexer/block/realtime/fetcher.ex | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 7f6310fd78..ecd8d8efa6 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -23,7 +23,7 @@ defmodule Indexer.Block.Realtime.Fetcher do @behaviour Block.Fetcher @enforce_keys ~w(block_fetcher)a - defstruct ~w(block_fetcher last_block_number_received_timestamp subscription previous_number max_number_seen)a + defstruct ~w(block_fetcher subscription previous_number max_number_seen)a @type t :: %__MODULE__{ block_fetcher: %Block.Fetcher{ @@ -33,7 +33,6 @@ defmodule Indexer.Block.Realtime.Fetcher do receipts_batch_size: pos_integer(), receipts_concurrency: pos_integer() }, - last_block_number_received_timestamp: pos_integer() | nil, subscription: Subscription.t(), previous_number: pos_integer() | nil, max_number_seen: pos_integer() | nil @@ -79,7 +78,6 @@ defmodule Indexer.Block.Realtime.Fetcher do start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) new_max_number = new_max_number(number, max_number_seen) - last_block_number_received_timestamp = System.system_time(:second) schedule_polling() @@ -87,8 +85,7 @@ defmodule Indexer.Block.Realtime.Fetcher do %{ state | previous_number: number, - max_number_seen: new_max_number, - last_block_number_received_timestamp: last_block_number_received_timestamp + max_number_seen: new_max_number }} end @@ -97,19 +94,19 @@ defmodule Indexer.Block.Realtime.Fetcher do :poll_latest_block_number, %__MODULE__{ block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher, - last_block_number_received_timestamp: last_block_number_timestamp, previous_number: previous_number, max_number_seen: max_number_seen } = state ) do - block_number_timestamp = + {number, new_max_number} = case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do {:ok, number} -> start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) - System.system_time(:second) + + {number, new_max_number(number, max_number_seen)} {:error, _} -> - last_block_number_timestamp + {previous_number, max_number_seen} end schedule_polling() @@ -117,7 +114,8 @@ defmodule Indexer.Block.Realtime.Fetcher do {:noreply, %{ state - | last_block_number_received_timestamp: block_number_timestamp + | previous_number: number, + max_number_seen: new_max_number }} end From 5c1fca9ac9fe11c762b611133210e3f5d6d6eacb Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Thu, 10 Jan 2019 18:36:22 +0300 Subject: [PATCH 3/3] fix sigil formatting --- apps/indexer/lib/indexer/block/realtime/fetcher.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index ecd8d8efa6..36c9425ee5 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -23,7 +23,7 @@ defmodule Indexer.Block.Realtime.Fetcher do @behaviour Block.Fetcher @enforce_keys ~w(block_fetcher)a - defstruct ~w(block_fetcher subscription previous_number max_number_seen)a + defstruct ~w(block_fetcher subscription previous_number max_number_seen)a @type t :: %__MODULE__{ block_fetcher: %Block.Fetcher{