Merge pull request #3011 from poanetwork/vb-revert-rt-fetcher-small-skips-feature

Revert realtime fetcher small skips feature
pull/3016/head
Victor Baranov 5 years ago committed by GitHub
commit d2ea212d7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 9
      apps/indexer/config/config.exs
  3. 30
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  4. 62
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  5. 5
      apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs
  6. 5
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  7. 52
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs

@ -6,6 +6,7 @@
### Fixes
- [#3012](https://github.com/poanetwork/blockscout/pull/3012) - Speedup token transfers list query
- [#3011](https://github.com/poanetwork/blockscout/pull/3011) - Revert realtime fetcher small skips feature
- [#3009](https://github.com/poanetwork/blockscout/pull/3009) - Fix broken export to CSV
- [#3007](https://github.com/poanetwork/blockscout/pull/3007) - Fix copy UTF8 tx input action
- [#2996](https://github.com/poanetwork/blockscout/pull/2996) - Fix awesomplete lib loading in Firefox

@ -28,12 +28,6 @@ block_transformer =
transformer
end
max_skipping_distance =
case Integer.parse(System.get_env("MAX_SKIPPING_DISTANCE", "")) do
{num, ""} -> num
_ -> 5
end
config :indexer,
block_transformer: block_transformer,
ecto_repos: [Explorer.Repo],
@ -42,8 +36,7 @@ config :indexer,
# bytes
memory_limit: 1 <<< 30,
first_block: System.get_env("FIRST_BLOCK") || "0",
last_block: System.get_env("LAST_BLOCK") || "",
max_skipping_distance: max_skipping_distance
last_block: System.get_env("LAST_BLOCK") || ""
# config :indexer, Indexer.Fetcher.ReplacedTransaction.Supervisor, disabled?: true
# config :indexer, Indexer.Fetcher.BlockReward.Supervisor, disabled?: true

@ -73,12 +73,21 @@ defmodule Indexer.Block.Catchup.Fetcher do
) do
Logger.metadata(fetcher: :block_catchup)
case latest_block(json_rpc_named_arguments) do
{:ok, latest_block_number} =
case latest_block() do
nil ->
EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
number ->
{:ok, number}
end
case latest_block_number do
# let realtime indexer get the genesis block
0 ->
%{first_block_number: 0, missing_block_count: 0, last_block_number: 0, shrunk: false}
latest_block_number ->
_ ->
# realtime indexer gets the current latest block
first = latest_block_number - 1
last = last_block()
@ -336,23 +345,12 @@ defmodule Indexer.Block.Catchup.Fetcher do
end
end
defp latest_block(json_rpc_named_arguments) do
defp latest_block do
string_value = Application.get_env(:indexer, :last_block)
case Integer.parse(string_value) do
{integer, ""} ->
integer
_ ->
{:ok, number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
# leave to realtime indexer the blocks in the skipping window
skipping_distance = Application.get_env(:indexer, :max_skipping_distance)
if number > skipping_distance do
number - skipping_distance
else
0
end
{integer, ""} -> integer
_ -> nil
end
end
end

@ -88,16 +88,9 @@ defmodule Indexer.Block.Realtime.Fetcher do
number = quantity_to_integer(quantity)
# Subscriptions don't support getting all the blocks and transactions data,
# so we need to go back and get the full block
{new_previous_number, new_max_number} =
case start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) do
# The number may have not been inserted if it was part of a small skip
:skip ->
Logger.debug(["#{inspect(number)} was skipped"])
{previous_number, max_number_seen}
start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen)
_ ->
{number, new_max_number(number, max_number_seen)}
end
new_max_number = new_max_number(number, max_number_seen)
Process.cancel_timer(timer)
new_timer = schedule_polling()
@ -105,7 +98,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
{:noreply,
%{
state
| previous_number: new_previous_number,
| previous_number: number,
max_number_seen: new_max_number,
timer: new_timer
}}
@ -123,14 +116,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
{number, new_max_number} =
case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do
{:ok, number} when is_nil(max_number_seen) or number > max_number_seen ->
# in case of polling the realtime fetcher should take care of all the
# blocks in the skipping window, because the cathup fetcher wont
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance)
last_catchup_number = max(0, 10 - max_skipping_distance - 1)
starting_number = max(previous_number, last_catchup_number) || last_catchup_number
start_fetch_and_import(number, block_fetcher, starting_number, nil)
start_fetch_and_import(number, block_fetcher, previous_number, number)
{max_number_seen, number}
@ -225,35 +211,27 @@ defmodule Indexer.Block.Realtime.Fetcher do
end
defp start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) do
fetching_action = determine_fetching_action(number, previous_number, max_number_seen)
start_at = determine_start_at(number, previous_number, max_number_seen)
if fetching_action != :skip do
for block_number_to_fetch <- fetching_action do
for block_number_to_fetch <- start_at..number do
args = [block_number_to_fetch, block_fetcher, reorg?(number, max_number_seen)]
Task.Supervisor.start_child(TaskSupervisor, __MODULE__, :fetch_and_import_block, args)
end
end
fetching_action
end
def determine_fetching_action(number, previous_number, max_number_seen) do
cond do
reorg?(number, max_number_seen) ->
[number]
defp determine_start_at(number, nil, nil), do: number
can_be_skipped?(number, max_number_seen) ->
:skip
is_nil(previous_number) ->
[number]
defp determine_start_at(number, nil, max_number_seen) do
determine_start_at(number, number - 1, max_number_seen)
end
true ->
if number - previous_number - 1 > 10 do
(number - 10)..number
defp determine_start_at(number, previous_number, max_number_seen) do
if reorg?(number, max_number_seen) do
# set start_at to NOT fill in skipped numbers
number
else
(previous_number + 1)..number
end
# set start_at to fill in skipped numbers, if any
previous_number + 1
end
end
@ -263,14 +241,6 @@ defmodule Indexer.Block.Realtime.Fetcher do
defp reorg?(_, _), do: false
defp can_be_skipped?(number, max_number_seen) when is_integer(max_number_seen) and number > max_number_seen + 1 do
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance)
max_skipping_distance > 1 and number <= max_number_seen + max_skipping_distance
end
defp can_be_skipped?(_, _), do: false
@reorg_delay 5_000
@decorate trace(name: "fetch", resource: "Indexer.Block.Realtime.Fetcher.fetch_and_import_block/3", tracer: Tracer)

@ -28,11 +28,6 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
setup :verify_on_exit!
# run the tests without the skipping window
setup do
Application.put_env(:indexer, :max_skipping_distance, 0)
end
describe "start_link/1" do
# See https://github.com/poanetwork/blockscout/issues/597
@tag :no_geth

@ -32,11 +32,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do
}
end
setup do
# run the tests without the skipping window
Application.put_env(:indexer, :max_skipping_distance, 0)
end
describe "import/1" do
test "fetches uncles asynchronously", %{json_rpc_named_arguments: json_rpc_named_arguments} do
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)

@ -40,11 +40,6 @@ defmodule Indexer.Block.Realtime.FetcherTest do
%{block_fetcher: block_fetcher, json_rpc_named_arguments: core_json_rpc_named_arguments}
end
setup do
# run the tests with a realistic skipping window
Application.put_env(:indexer, :max_skipping_distance, 3)
end
describe "Indexer.Block.Fetcher.fetch_and_import_range/1" do
@tag :no_geth
test "in range with internal transactions", %{
@ -429,51 +424,4 @@ defmodule Indexer.Block.Realtime.FetcherTest do
}} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080)
end
end
describe "determine_fetching_action/4" do
test "when everything (except number) is nil results in fetching only the number" do
assert [14] == Realtime.Fetcher.determine_fetching_action(14, nil, nil)
end
test "when number is also max_number_seen results in fetching only the number" do
number = 23
assert [number] == Realtime.Fetcher.determine_fetching_action(number, nil, number)
assert [number] == Realtime.Fetcher.determine_fetching_action(number, 21, number)
end
test "when max_number_seen is nil, fetching will start from previous_number" do
# note: this is a way to force this behavior, used by `poll_latest_block_number`
number = 156
previous_number = 150
old_number = 94
assert (previous_number + 1)..number == Realtime.Fetcher.determine_fetching_action(number, previous_number, nil)
assert (number - 10)..number == Realtime.Fetcher.determine_fetching_action(number, old_number, nil)
end
test "when number immediately follows the previous_number it is fetched" do
max_number_seen = 26
number = 27
assert [number] == Realtime.Fetcher.determine_fetching_action(number, nil, max_number_seen)
end
test "when number is inside the allowed skipping window nothing is fetched" do
max_number_seen = 26
assert :skip == Realtime.Fetcher.determine_fetching_action(28, nil, max_number_seen)
assert :skip == Realtime.Fetcher.determine_fetching_action(29, nil, max_number_seen)
end
test "when number is over the allowed skipping window all the values since the previous_number will be fetched" do
max_number_seen = 390
previous_number = 381
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance)
number = max_number_seen + max_skipping_distance + 1
assert (number - 10)..number ==
Realtime.Fetcher.determine_fetching_action(number, previous_number, max_number_seen)
end
end
end

Loading…
Cancel
Save