Start realtime index immediately from latest block on boot.

pull/248/head
Luke Imhoff 7 years ago
parent a025091af1
commit 805afcd536
  1. 54
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  2. 5
      apps/ethereum_jsonrpc/test/etheream_jsonrpc_test.exs
  3. 14
      apps/explorer/lib/explorer/indexer/block_fetcher.ex

@ -56,6 +56,17 @@ defmodule EthereumJSONRPC do
"""
@type quantity :: String.t()
@typedoc """
A logic block tag that can be used in place of a block number.
| Tag | Description |
|--------------|--------------------------------|
| `"earliest"` | The first block in the chain |
| `"latest"` | The latest collated block. |
| `"pending"` | The next block to be collated. |
"""
@type tag :: String.t()
@typedoc """
Unix timestamp encoded as a hexadecimal number in a `String.t`
"""
@ -107,7 +118,7 @@ defmodule EthereumJSONRPC do
block_hashes
|> get_block_by_hash_requests()
|> json_rpc(config(:url))
|> handle_get_block()
|> handle_get_blocks()
|> case do
{:ok, _next, results} -> {:ok, results}
{:error, reason} -> {:error, reason}
@ -121,7 +132,30 @@ defmodule EthereumJSONRPC do
block_start
|> get_block_by_number_requests(block_end)
|> json_rpc(config(:url))
|> handle_get_block()
|> handle_get_blocks()
end
@doc """
Fetches block number by `t:tag/0`.
The `"earliest"` tag is the earlist block number, which is `0`.
iex> EthereumJSONRPC.fetch_block_number_by_tag("earliest")
{:ok, 0}
## Returns
* `{:ok, number}` - the block number for the given `tag`.
* `{:error, :invalid_tag}` - When `tag` is not a valid `t:tag/0`.
* `{:error, reason}` - other JSONRPC error.
"""
@spec fetch_block_number_by_tag(tag()) :: {:ok, non_neg_integer()} | {:error, reason :: :invalid_tag | term()}
def fetch_block_number_by_tag(tag) when tag in ~w(earliest latest pending) do
tag
|> get_block_by_tag_request()
|> json_rpc(config(:url))
|> handle_get_block_by_tag()
end
@doc """
@ -215,6 +249,11 @@ defmodule EthereumJSONRPC do
request(%{id: id, method: "eth_getBlockByNumber", params: get_block_by_number_params(options)})
end
defp get_block_by_tag_request(tag) do
# eth_getBlockByNumber accepts either a number OR a tag
get_block_by_number_request(%{id: tag, tag: tag, transactions: :hashes})
end
defp request(%{id: id, method: method, params: params}) do
%{
"id" => id,
@ -271,7 +310,7 @@ defmodule EthereumJSONRPC do
raise("bad jason")
end
defp handle_get_block({:ok, results}) do
defp handle_get_blocks({:ok, results}) do
{blocks, next} =
Enum.reduce(results, {[], :more}, fn
%{"result" => nil}, {blocks, _} -> {blocks, :end_of_chain}
@ -290,10 +329,15 @@ defmodule EthereumJSONRPC do
}}
end
defp handle_get_block({:error, reason}) do
{:error, reason}
defp handle_get_blocks({:error, _} = error), do: error
defp handle_get_block_by_tag({:ok, %{"number" => quantity}}) do
{:ok, quantity_to_integer(quantity)}
end
defp handle_get_block_by_tag({:error, %{"code" => -32602}}), do: {:error, :invalid_tag}
defp handle_get_block_by_tag({:error, _} = error), do: error
defp handle_response(resp, 200) do
case resp do
[%{} | _] = batch_resp -> {:ok, batch_resp}

@ -0,0 +1,5 @@
defmodule EthereumJSONRPCTest do
use ExUnit.Case, async: true
doctest EthereumJSONRPC
end

@ -74,7 +74,12 @@ defmodule Explorer.Indexer.BlockFetcher do
receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency)
}
{:ok, schedule_next_catchup_index(state)}
scheduled_state =
state
|> schedule_next_catchup_index()
|> schedule_next_realtime_fetch()
{:ok, scheduled_state}
end
@impl GenServer
@ -100,8 +105,8 @@ defmodule Explorer.Indexer.BlockFetcher do
end
def handle_info({:DOWN, _ref, :process, pid, :normal}, %{genesis_task: pid} = state) do
Logger.info(fn -> "Finished index from genesis. Transitioning to realtime index." end)
{:noreply, schedule_next_realtime_fetch(%{state | genesis_task: nil})}
Logger.info(fn -> "Finished index from genesis. Transitioning to only realtime index." end)
{:noreply, %{state | genesis_task: nil}}
end
def handle_info({:DOWN, _ref, :process, pid, _reason}, %{genesis_task: pid} = state) do
@ -222,7 +227,8 @@ defmodule Explorer.Indexer.BlockFetcher do
end
defp realtime_task(%{} = state) do
{:ok, seq} = Sequence.start_link([], Indexer.next_block_number(), 2)
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest")
{:ok, seq} = Sequence.start_link([], latest_block_number, 2)
stream_import(state, seq, max_concurrency: 1)
end

Loading…
Cancel
Save