From 805afcd536d59580abce5d815129315755453190 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 4 Jun 2018 14:01:24 -0500 Subject: [PATCH] Start realtime index immediately from latest block on boot. --- apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex | 54 +++++++++++++++++-- .../test/etheream_jsonrpc_test.exs | 5 ++ .../lib/explorer/indexer/block_fetcher.ex | 14 +++-- 3 files changed, 64 insertions(+), 9 deletions(-) create mode 100644 apps/ethereum_jsonrpc/test/etheream_jsonrpc_test.exs diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index 08e5c8513a..7756a34810 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -56,6 +56,17 @@ defmodule EthereumJSONRPC do """ @type quantity :: String.t() + @typedoc """ + A logic block tag that can be used in place of a block number. + + | Tag | Description | + |--------------|--------------------------------| + | `"earliest"` | The first block in the chain | + | `"latest"` | The latest collated block. | + | `"pending"` | The next block to be collated. | + """ + @type tag :: String.t() + @typedoc """ Unix timestamp encoded as a hexadecimal number in a `String.t` """ @@ -107,7 +118,7 @@ defmodule EthereumJSONRPC do block_hashes |> get_block_by_hash_requests() |> json_rpc(config(:url)) - |> handle_get_block() + |> handle_get_blocks() |> case do {:ok, _next, results} -> {:ok, results} {:error, reason} -> {:error, reason} @@ -121,7 +132,30 @@ defmodule EthereumJSONRPC do block_start |> get_block_by_number_requests(block_end) |> json_rpc(config(:url)) - |> handle_get_block() + |> handle_get_blocks() + end + + @doc """ + Fetches block number by `t:tag/0`. + + The `"earliest"` tag is the earlist block number, which is `0`. + + iex> EthereumJSONRPC.fetch_block_number_by_tag("earliest") + {:ok, 0} + + ## Returns + + * `{:ok, number}` - the block number for the given `tag`. + * `{:error, :invalid_tag}` - When `tag` is not a valid `t:tag/0`. + * `{:error, reason}` - other JSONRPC error. + + """ + @spec fetch_block_number_by_tag(tag()) :: {:ok, non_neg_integer()} | {:error, reason :: :invalid_tag | term()} + def fetch_block_number_by_tag(tag) when tag in ~w(earliest latest pending) do + tag + |> get_block_by_tag_request() + |> json_rpc(config(:url)) + |> handle_get_block_by_tag() end @doc """ @@ -215,6 +249,11 @@ defmodule EthereumJSONRPC do request(%{id: id, method: "eth_getBlockByNumber", params: get_block_by_number_params(options)}) end + defp get_block_by_tag_request(tag) do + # eth_getBlockByNumber accepts either a number OR a tag + get_block_by_number_request(%{id: tag, tag: tag, transactions: :hashes}) + end + defp request(%{id: id, method: method, params: params}) do %{ "id" => id, @@ -271,7 +310,7 @@ defmodule EthereumJSONRPC do raise("bad jason") end - defp handle_get_block({:ok, results}) do + defp handle_get_blocks({:ok, results}) do {blocks, next} = Enum.reduce(results, {[], :more}, fn %{"result" => nil}, {blocks, _} -> {blocks, :end_of_chain} @@ -290,10 +329,15 @@ defmodule EthereumJSONRPC do }} end - defp handle_get_block({:error, reason}) do - {:error, reason} + defp handle_get_blocks({:error, _} = error), do: error + + defp handle_get_block_by_tag({:ok, %{"number" => quantity}}) do + {:ok, quantity_to_integer(quantity)} end + defp handle_get_block_by_tag({:error, %{"code" => -32602}}), do: {:error, :invalid_tag} + defp handle_get_block_by_tag({:error, _} = error), do: error + defp handle_response(resp, 200) do case resp do [%{} | _] = batch_resp -> {:ok, batch_resp} diff --git a/apps/ethereum_jsonrpc/test/etheream_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/etheream_jsonrpc_test.exs new file mode 100644 index 0000000000..f7a1a2e6f7 --- /dev/null +++ b/apps/ethereum_jsonrpc/test/etheream_jsonrpc_test.exs @@ -0,0 +1,5 @@ +defmodule EthereumJSONRPCTest do + use ExUnit.Case, async: true + + doctest EthereumJSONRPC +end diff --git a/apps/explorer/lib/explorer/indexer/block_fetcher.ex b/apps/explorer/lib/explorer/indexer/block_fetcher.ex index 288a0bfc58..32818968e5 100644 --- a/apps/explorer/lib/explorer/indexer/block_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/block_fetcher.ex @@ -74,7 +74,12 @@ defmodule Explorer.Indexer.BlockFetcher do receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency) } - {:ok, schedule_next_catchup_index(state)} + scheduled_state = + state + |> schedule_next_catchup_index() + |> schedule_next_realtime_fetch() + + {:ok, scheduled_state} end @impl GenServer @@ -100,8 +105,8 @@ defmodule Explorer.Indexer.BlockFetcher do end def handle_info({:DOWN, _ref, :process, pid, :normal}, %{genesis_task: pid} = state) do - Logger.info(fn -> "Finished index from genesis. Transitioning to realtime index." end) - {:noreply, schedule_next_realtime_fetch(%{state | genesis_task: nil})} + Logger.info(fn -> "Finished index from genesis. Transitioning to only realtime index." end) + {:noreply, %{state | genesis_task: nil}} end def handle_info({:DOWN, _ref, :process, pid, _reason}, %{genesis_task: pid} = state) do @@ -222,7 +227,8 @@ defmodule Explorer.Indexer.BlockFetcher do end defp realtime_task(%{} = state) do - {:ok, seq} = Sequence.start_link([], Indexer.next_block_number(), 2) + {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest") + {:ok, seq} = Sequence.start_link([], latest_block_number, 2) stream_import(state, seq, max_concurrency: 1) end