Allow Realtime Fetcher to wait for small skips

Problem: Sometimes the realtime fetcher's `newHeads` websocket subscription skips a small amount of block numbers, just to be followed by those very same numbers that were skipped, without repeating the number that caused the skipping.
This may be cause by out-of-order events or be a legitimate reorg, but in any case this causes close-in-time refetching which in turn leads to async insertions problems.

Solution: allow the Realtime fetcher to keep track of a small (settable) skip and wait for the block number inside the skip to arrive before fetching the whole window.
Obviously if it gets notified of the blocks following the skip (or it is forced into polling) the whole window gets fetched and inserted.
pull/2470/head
pasqu4le 5 years ago
parent 8cbf9e6a31
commit e8c2fa554e
No known key found for this signature in database
GPG Key ID: 8F3EE01F1DC90687
  1. 1
      CHANGELOG.md
  2. 9
      apps/indexer/config/config.exs
  3. 30
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  4. 60
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  5. 5
      apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs
  6. 5
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  7. 52
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs
  8. 1
      docs/env-variables.md

@ -1,6 +1,7 @@
## Current
### Features
- [#2470](https://github.com/poanetwork/blockscout/pull/2470) - Allow Realtime Fetcher to wait for small skips
- [#2733](https://github.com/poanetwork/blockscout/pull/2733) - Add cache for first page of uncles
- [#2735](https://github.com/poanetwork/blockscout/pull/2735) - Add pending transactions cache
- [#2726](https://github.com/poanetwork/blockscout/pull/2726) - Remove internal_transaction block_number setting from blocks runner

@ -28,6 +28,12 @@ block_transformer =
transformer
end
max_skipping_distance =
case Integer.parse(System.get_env("MAX_SKIPPING_DISTANCE", "")) do
{num, ""} -> num
_ -> 5
end
config :indexer,
block_transformer: block_transformer,
ecto_repos: [Explorer.Repo],
@ -36,7 +42,8 @@ config :indexer,
# bytes
memory_limit: 1 <<< 30,
first_block: System.get_env("FIRST_BLOCK") || "0",
last_block: System.get_env("LAST_BLOCK") || ""
last_block: System.get_env("LAST_BLOCK") || "",
max_skipping_distance: max_skipping_distance
# config :indexer, Indexer.Fetcher.ReplacedTransaction.Supervisor, disabled?: true
# config :indexer, Indexer.Fetcher.BlockReward.Supervisor, disabled?: true

@ -72,21 +72,12 @@ defmodule Indexer.Block.Catchup.Fetcher do
) do
Logger.metadata(fetcher: :block_catchup)
{:ok, latest_block_number} =
case latest_block() do
nil ->
EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
number ->
{:ok, number}
end
case latest_block_number do
case latest_block(json_rpc_named_arguments) do
# let realtime indexer get the genesis block
0 ->
%{first_block_number: 0, missing_block_count: 0, shrunk: false}
_ ->
latest_block_number ->
# realtime indexer gets the current latest block
first = latest_block_number - 1
last = last_block()
@ -345,12 +336,23 @@ defmodule Indexer.Block.Catchup.Fetcher do
end
end
defp latest_block do
defp latest_block(json_rpc_named_arguments) do
string_value = Application.get_env(:indexer, :last_block)
case Integer.parse(string_value) do
{integer, ""} -> integer
_ -> nil
{integer, ""} ->
integer
_ ->
{:ok, number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
# leave to realtime indexer the blocks in the skipping window
skipping_distance = Application.get_env(:indexer, :max_skipping_distance)
if number > skipping_distance do
number - skipping_distance
else
0
end
end
end
end

@ -87,9 +87,16 @@ defmodule Indexer.Block.Realtime.Fetcher do
number = quantity_to_integer(quantity)
# Subscriptions don't support getting all the blocks and transactions data,
# so we need to go back and get the full block
start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen)
{new_previous_number, new_max_number} =
case start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) do
# The number may have not been inserted if it was part of a small skip
:skip ->
Logger.debug(["#{inspect(number)} was skipped"])
{previous_number, max_number_seen}
new_max_number = new_max_number(number, max_number_seen)
_ ->
{number, new_max_number(number, max_number_seen)}
end
Process.cancel_timer(timer)
new_timer = schedule_polling()
@ -97,7 +104,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
{:noreply,
%{
state
| previous_number: number,
| previous_number: new_previous_number,
max_number_seen: new_max_number,
timer: new_timer
}}
@ -115,7 +122,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
{number, new_max_number} =
case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do
{:ok, number} when is_nil(max_number_seen) or number > max_number_seen ->
start_fetch_and_import(number, block_fetcher, previous_number, number)
# in case of polling the realtime fetcher should take care of all the
# blocks in the skipping window, because the cathup fetcher wont
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance)
last_catchup_number = max(0, 10 - max_skipping_distance - 1)
starting_number = max(previous_number, last_catchup_number) || last_catchup_number
start_fetch_and_import(number, block_fetcher, starting_number, nil)
{max_number_seen, number}
@ -211,27 +225,31 @@ defmodule Indexer.Block.Realtime.Fetcher do
end
defp start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) do
start_at = determine_start_at(number, previous_number, max_number_seen)
fetching_action = determine_fetching_action(number, previous_number, max_number_seen)
for block_number_to_fetch <- start_at..number do
if fetching_action != :skip do
for block_number_to_fetch <- fetching_action do
args = [block_number_to_fetch, block_fetcher, reorg?(number, max_number_seen)]
Task.Supervisor.start_child(TaskSupervisor, __MODULE__, :fetch_and_import_block, args)
end
end
defp determine_start_at(number, nil, nil), do: number
defp determine_start_at(number, nil, max_number_seen) do
determine_start_at(number, number - 1, max_number_seen)
fetching_action
end
defp determine_start_at(number, previous_number, max_number_seen) do
if reorg?(number, max_number_seen) do
# set start_at to NOT fill in skipped numbers
number
else
# set start_at to fill in skipped numbers, if any
previous_number + 1
def determine_fetching_action(number, previous_number, max_number_seen) do
cond do
reorg?(number, max_number_seen) ->
[number]
can_be_skipped?(number, max_number_seen) ->
:skip
is_nil(previous_number) ->
[number]
true ->
(previous_number + 1)..number
end
end
@ -241,6 +259,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
defp reorg?(_, _), do: false
defp can_be_skipped?(number, max_number_seen) when is_integer(max_number_seen) and number > max_number_seen + 1 do
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance)
max_skipping_distance > 1 and number <= max_number_seen + max_skipping_distance
end
defp can_be_skipped?(_, _), do: false
@reorg_delay 5_000
@decorate trace(name: "fetch", resource: "Indexer.Block.Realtime.Fetcher.fetch_and_import_block/3", tracer: Tracer)

@ -28,6 +28,11 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
setup :verify_on_exit!
# run the tests without the skipping window
setup do
Application.put_env(:indexer, :max_skipping_distance, 0)
end
describe "start_link/1" do
# See https://github.com/poanetwork/blockscout/issues/597
@tag :no_geth

@ -32,6 +32,11 @@ defmodule Indexer.Block.Catchup.FetcherTest do
}
end
setup do
# run the tests without the skipping window
Application.put_env(:indexer, :max_skipping_distance, 0)
end
describe "import/1" do
test "fetches uncles asynchronously", %{json_rpc_named_arguments: json_rpc_named_arguments} do
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)

@ -40,6 +40,11 @@ defmodule Indexer.Block.Realtime.FetcherTest do
%{block_fetcher: block_fetcher, json_rpc_named_arguments: core_json_rpc_named_arguments}
end
setup do
# run the tests with a realistic skipping window
Application.put_env(:indexer, :max_skipping_distance, 3)
end
describe "Indexer.Block.Fetcher.fetch_and_import_range/1" do
@tag :no_geth
test "in range with internal transactions", %{
@ -424,4 +429,51 @@ defmodule Indexer.Block.Realtime.FetcherTest do
}} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080)
end
end
describe "determine_fetching_action/4" do
test "when everything (except number) is nil results in fetching only the number" do
assert [14] == Realtime.Fetcher.determine_fetching_action(14, nil, nil)
end
test "when number is also max_number_seen results in fetching only the number" do
number = 23
assert [number] == Realtime.Fetcher.determine_fetching_action(number, nil, number)
assert [number] == Realtime.Fetcher.determine_fetching_action(number, 21, number)
end
test "when max_number_seen is nil, fetching will start from previous_number" do
# note: this is a way to force this behavior, used by `poll_latest_block_number`
number = 156
previous_number = 150
old_number = 94
assert (previous_number + 1)..number == Realtime.Fetcher.determine_fetching_action(number, previous_number, nil)
assert (old_number + 1)..number == Realtime.Fetcher.determine_fetching_action(number, old_number, nil)
end
test "when number immediately follows the previous_number it is fetched" do
max_number_seen = 26
number = 27
assert [number] == Realtime.Fetcher.determine_fetching_action(number, nil, max_number_seen)
end
test "when number is inside the allowed skipping window nothing is fetched" do
max_number_seen = 26
assert :skip == Realtime.Fetcher.determine_fetching_action(28, nil, max_number_seen)
assert :skip == Realtime.Fetcher.determine_fetching_action(29, nil, max_number_seen)
end
test "when number is over the allowed skipping window all the values since the previous_number will be fetched" do
max_number_seen = 390
previous_number = 381
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance)
number = max_number_seen + max_skipping_distance + 1
assert (previous_number + 1)..number ==
Realtime.Fetcher.determine_fetching_action(number, previous_number, max_number_seen)
end
end
end

@ -70,3 +70,4 @@ $ export NETWORK=POA
| `EMISSION_FORMAT` | | Should be set to `POA` if you have block emission indentical to POA Network. This env var is used only if `CHAIN_SPEC_PATH` is set | `STANDARD` | v2.0.4+ | | |
| `REWARDS_CONTRACT_ADDRESS` | | Emission rewards contract address. This env var is used only if `EMISSION_FORMAT` is set to `POA` | `0xeca443e8e1ab29971a45a9c57a6a9875701698a5` | v2.0.4+ | | |
| `BLOCKSCOUT_PROTOCOL` | | Url scheme for blockscout | in prod env `https` is used, in dev env `http` is used | master | | |
| `MAX_SKIPPING_DISTANCE` | | The maximum distance the indexer is allowed to wait for when notified of a number not following the lask known one. | 4 | master | |

Loading…
Cancel
Save