Merge pull request #8613 from blockscout/vb-refactor-first-last-block

Refactor parsing of FIRST_BLOCK, LAST_BLOCK, TRACE_FIRST_BLOCK, TRACE_LAST_BLOCK env variables
pull/8594/head
Victor Baranov 1 year ago committed by GitHub
commit f390e83432
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 42
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  3. 37
      apps/explorer/lib/explorer/chain.ex
  4. 23
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  5. 8
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex
  6. 8
      apps/explorer/test/explorer/chain_test.exs
  7. 25
      apps/indexer/lib/indexer/block/catchup/missing_ranges_collector.ex
  8. 5
      apps/indexer/lib/indexer/fetcher/coin_balance.ex
  9. 4
      apps/indexer/lib/indexer/fetcher/internal_transaction.ex
  10. 2
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  11. 4
      apps/indexer/test/indexer/block/catchup/missing_ranges_collector_test.exs
  12. 11
      config/config_helper.exs
  13. 8
      config/runtime.exs

@ -22,6 +22,7 @@
- [#8661](https://github.com/blockscout/blockscout/pull/8661) - arm64-compatible docker image
- [#8649](https://github.com/blockscout/blockscout/pull/8649) - Set max 30sec JSON RPC poll frequency for realtime fetcher when WS is disabled
- [#8614](https://github.com/blockscout/blockscout/pull/8614) - Disable market history cataloger fetcher when exchange rates are disabled
- [#8613](https://github.com/blockscout/blockscout/pull/8613) - Refactor parsing of FIRST_BLOCK, LAST_BLOCK, TRACE_FIRST_BLOCK, TRACE_LAST_BLOCK env variables
- [#8572](https://github.com/blockscout/blockscout/pull/8572) - Refactor docker-compose config
- [#8552](https://github.com/blockscout/blockscout/pull/8552) - Add CHAIN_TYPE build arg to Dockerfile
- [#8550](https://github.com/blockscout/blockscout/pull/8550) - Sanitize paging params

@ -198,7 +198,13 @@ defmodule EthereumJSONRPC do
params_list
end
id_to_params = id_to_params(filtered_params)
filtered_params_in_range =
filtered_params
|> Enum.filter(fn
%{block_quantity: block_quantity} -> is_block_number_in_range?(block_quantity)
end)
id_to_params = id_to_params(filtered_params_in_range)
with {:ok, responses} <-
id_to_params
@ -234,7 +240,7 @@ defmodule EthereumJSONRPC do
@spec fetch_beneficiaries([block_number], json_rpc_named_arguments) ::
{:ok, FetchedBeneficiaries.t()} | {:error, reason :: term} | :ignore
def fetch_beneficiaries(block_numbers, json_rpc_named_arguments) when is_list(block_numbers) do
filtered_block_numbers = block_numbers_in_range(block_numbers)
filtered_block_numbers = are_block_numbers_in_range?(block_numbers)
Keyword.fetch!(json_rpc_named_arguments, :variant).fetch_beneficiaries(
filtered_block_numbers,
@ -339,7 +345,7 @@ defmodule EthereumJSONRPC do
Fetches internal transactions for entire blocks from variant API.
"""
def fetch_block_internal_transactions(block_numbers, json_rpc_named_arguments) when is_list(block_numbers) do
filtered_block_numbers = block_numbers_in_range(block_numbers)
filtered_block_numbers = are_block_numbers_in_range?(block_numbers)
Keyword.fetch!(json_rpc_named_arguments, :variant).fetch_block_internal_transactions(
filtered_block_numbers,
@ -347,12 +353,13 @@ defmodule EthereumJSONRPC do
)
end
def block_numbers_in_range(block_numbers) do
min_block = first_block_to_fetch(:trace_first_block)
def are_block_numbers_in_range?(block_numbers) do
min_block = Application.get_env(:indexer, :trace_first_block)
max_block = Application.get_env(:indexer, :trace_last_block)
block_numbers
|> Enum.filter(fn block_number ->
block_number >= min_block
block_number >= min_block && if max_block, do: block_number <= max_block, else: true
end)
end
@ -448,6 +455,20 @@ defmodule EthereumJSONRPC do
end
end
@spec is_block_number_in_range?(quantity) :: boolean()
defp is_block_number_in_range?(block_quantity) do
min_block = Application.get_env(:indexer, :trace_first_block)
max_block = Application.get_env(:indexer, :trace_last_block)
block_number = quantity_to_integer(block_quantity)
if !block_number ||
(block_number && block_number >= min_block && if(max_block, do: block_number <= max_block, else: true)) do
true
else
false
end
end
defp maybe_replace_url(url, _replace_url, EthereumJSONRPC.HTTP), do: url
defp maybe_replace_url(url, replace_url, _), do: EndpointAvailabilityObserver.maybe_replace_url(url, replace_url)
@ -562,13 +583,4 @@ defmodule EthereumJSONRPC do
defp chunk_requests(requests, nil), do: requests
defp chunk_requests(requests, chunk_size), do: Enum.chunk_every(requests, chunk_size)
def first_block_to_fetch(config) do
string_value = Application.get_env(:indexer, config)
case Integer.parse(string_value) do
{integer, ""} -> integer
_ -> 0
end
end
end

@ -1151,7 +1151,7 @@ defmodule Explorer.Chain do
select_repo(options).aggregate(Transaction, :min, :block_number) do
min_block_number =
min_block_number
|> Decimal.max(EthereumJSONRPC.first_block_to_fetch(:trace_first_block))
|> Decimal.max(Application.get_env(:indexer, :trace_first_block))
|> Decimal.to_integer()
query =
@ -1778,15 +1778,18 @@ defmodule Explorer.Chain do
if Application.get_env(:indexer, Indexer.Supervisor)[:enabled] do
%{min: min_saved_block_number, max: max_saved_block_number} = BlockNumber.get_all()
min_blockchain_block_number = min_block_number_from_config(:first_block)
min_blockchain_block_number = Application.get_env(:indexer, :first_block)
case {min_saved_block_number, max_saved_block_number} do
{0, 0} ->
Decimal.new(0)
_ ->
BlockCache.estimated_count()
|> Decimal.div(max_saved_block_number - min_blockchain_block_number + 1)
divisor = max_saved_block_number - min_blockchain_block_number + 1
ratio = get_ratio(BlockCache.estimated_count(), divisor)
ratio
|> (&if(
greater_or_equal_0_99(&1) &&
min_saved_block_number <= min_blockchain_block_number,
@ -1807,7 +1810,7 @@ defmodule Explorer.Chain do
%{max: max_saved_block_number} = BlockNumber.get_all()
pbo_count = PendingBlockOperationCache.estimated_count()
min_blockchain_trace_block_number = min_block_number_from_config(:trace_first_block)
min_blockchain_trace_block_number = Application.get_env(:indexer, :trace_first_block)
case max_saved_block_number do
0 ->
@ -1815,10 +1818,11 @@ defmodule Explorer.Chain do
_ ->
full_blocks_range = max_saved_block_number - min_blockchain_trace_block_number + 1
processed_int_txs_for_blocks_count = full_blocks_range - pbo_count
processed_int_txs_for_blocks_count = max(0, full_blocks_range - pbo_count)
processed_int_txs_for_blocks_count
|> Decimal.div(full_blocks_range)
ratio = get_ratio(processed_int_txs_for_blocks_count, full_blocks_range)
ratio
|> (&if(
greater_or_equal_0_99(&1),
do: Decimal.new(1),
@ -1831,20 +1835,19 @@ defmodule Explorer.Chain do
end
end
@spec get_ratio(non_neg_integer(), non_neg_integer()) :: Decimal.t()
defp get_ratio(dividend, divisor) do
if divisor > 0,
do: dividend |> Decimal.div(divisor),
else: Decimal.new(1)
end
@spec greater_or_equal_0_99(Decimal.t()) :: boolean()
defp greater_or_equal_0_99(value) do
Decimal.compare(value, Decimal.from_float(0.99)) == :gt ||
Decimal.compare(value, Decimal.from_float(0.99)) == :eq
end
@spec min_block_number_from_config(atom()) :: integer()
defp min_block_number_from_config(block_type) do
case Integer.parse(Application.get_env(:indexer, block_type)) do
{block_number, _} -> block_number
_ -> 0
end
end
@spec format_indexed_ratio(Decimal.t()) :: Decimal.t()
defp format_indexed_ratio(raw_ratio) do
raw_ratio
@ -5825,7 +5828,7 @@ defmodule Explorer.Chain do
if transaction_index == 0 do
0
else
filtered_block_numbers = EthereumJSONRPC.block_numbers_in_range([block_number])
filtered_block_numbers = EthereumJSONRPC.are_block_numbers_in_range?([block_number])
{:ok, traces} = fetch_block_internal_transactions(filtered_block_numbers, json_rpc_named_arguments)
sorted_traces =

@ -60,11 +60,9 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
hashes = Enum.map(changes_list, & &1.hash)
minimal_block_height = trace_minimal_block_height()
items_for_pending_ops =
changes_list
|> filter_by_min_height(&(&1.number >= minimal_block_height))
|> filter_by_height_range(&is_block_in_range?(&1.number))
|> Enum.filter(& &1.consensus)
|> Enum.map(&{&1.number, &1.hash})
@ -74,7 +72,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
run_func = fn repo ->
{:ok, nonconsensus_items} = lose_consensus(repo, hashes, consensus_block_numbers, changes_list, insert_options)
{:ok, filter_by_min_height(nonconsensus_items, fn {number, _hash} -> number >= minimal_block_height end)}
{:ok, filter_by_height_range(nonconsensus_items, fn {number, _hash} -> is_block_in_range?(number) end)}
end
multi
@ -216,6 +214,12 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
@impl Runner
def timeout, do: @timeout
defp is_block_in_range?(number) do
minimal_block_height = Application.get_env(:indexer, :trace_first_block)
maximal_block_height = Application.get_env(:indexer, :trace_last_block)
number >= minimal_block_height && if(maximal_block_height, do: number <= maximal_block_height, else: true)
end
defp fork_transactions(%{
repo: repo,
timeout: timeout,
@ -414,10 +418,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
lose_consensus(ExplorerRepo, [], block_numbers, [], opts)
end
defp trace_minimal_block_height do
EthereumJSONRPC.first_block_to_fetch(:trace_first_block)
end
defp new_pending_operations(repo, nonconsensus_items, items, %{
timeout: timeout,
timestamps: timestamps
@ -889,10 +889,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
where(invalid_neighbors_query, [block], block.consensus)
end
defp filter_by_min_height(blocks, filter_func) do
minimal_block_height = trace_minimal_block_height()
defp filter_by_height_range(blocks, filter_func) do
minimal_block_height = Application.get_env(:indexer, :trace_first_block)
maximal_block_height = Application.get_env(:indexer, :trace_last_block)
if minimal_block_height > 0 do
if minimal_block_height > 0 || maximal_block_height do
Enum.filter(blocks, &filter_func.(&1))
else
blocks

@ -15,7 +15,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
alias Explorer.Repo, as: ExplorerRepo
alias Explorer.Utility.MissingRangesManipulator
import Ecto.Query, only: [from: 2]
import Ecto.Query, only: [from: 2, where: 3]
@behaviour Runner
@ -691,7 +691,8 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
end
defp remove_consensus_of_invalid_blocks(repo, invalid_block_numbers) do
minimal_block = EthereumJSONRPC.first_block_to_fetch(:trace_first_block)
minimal_block = Application.get_env(:indexer, :trace_first_block)
maximal_block = Application.get_env(:indexer, :trace_last_block)
if Enum.count(invalid_block_numbers) > 0 do
update_query =
@ -704,6 +705,9 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
update: [set: [consensus: false]]
)
update_query =
if maximal_block, do: update_query |> where([block], block.number < ^maximal_block), else: update_query
try do
{_num, result} = repo.update_all(update_query, [])

@ -1478,7 +1478,7 @@ defmodule Explorer.ChainTest do
Supervisor.restart_child(Explorer.Supervisor, Explorer.Chain.Cache.Block.child_id())
on_exit(fn ->
Application.put_env(:indexer, :first_block, "")
Application.put_env(:indexer, :first_block, 0)
end)
end
@ -1508,7 +1508,7 @@ defmodule Explorer.ChainTest do
end
test "returns 1.0 if fully indexed blocks starting from given FIRST_BLOCK" do
Application.put_env(:indexer, :first_block, "5")
Application.put_env(:indexer, :first_block, 5)
for index <- 5..9 do
insert(:block, number: index, consensus: true)
@ -1527,7 +1527,7 @@ defmodule Explorer.ChainTest do
Supervisor.restart_child(Explorer.Supervisor, PendingBlockOperationCache.child_id())
on_exit(fn ->
Application.put_env(:indexer, :trace_first_block, "")
Application.put_env(:indexer, :trace_first_block, 0)
Supervisor.terminate_child(Explorer.Supervisor, PendingBlockOperationCache.child_id())
end)
end
@ -1559,7 +1559,7 @@ defmodule Explorer.ChainTest do
end
test "returns 1.0 if fully indexed blocks with internal transactions starting from given TRACE_FIRST_BLOCK" do
Application.put_env(:indexer, :trace_first_block, "5")
Application.put_env(:indexer, :trace_first_block, 5)
for index <- 5..9 do
insert(:block, number: index)

@ -182,27 +182,19 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
end
defp first_block do
string_value = Application.get_env(:indexer, :first_block)
first_block_from_config = Application.get_env(:indexer, :first_block)
case Integer.parse(string_value) do
{integer, ""} ->
integer
_ ->
min_missing_block_number =
"min_missing_block_number"
|> Chain.get_last_fetched_counter()
|> Decimal.to_integer()
min_missing_block_number
end
max(first_block_from_config, min_missing_block_number)
end
defp last_block do
case Integer.parse(Application.get_env(:indexer, :last_block)) do
{block, ""} -> block + 1
_ -> fetch_max_block_number()
end
last_block = Application.get_env(:indexer, :last_block)
if last_block, do: last_block + 1, else: fetch_max_block_number()
end
defp fetch_max_block_number do
@ -221,9 +213,12 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
end
defp continue_future_updating?(max_fetched_block_number) do
case Integer.parse(Application.get_env(:indexer, :last_block)) do
{block, ""} -> max_fetched_block_number < block
_ -> true
last_block = Application.get_env(:indexer, :last_block)
if last_block do
max_fetched_block_number < last_block
else
true
end
end

@ -83,9 +83,12 @@ defmodule Indexer.Fetcher.CoinBalance do
# `{address, block}`, so take unique params only
unique_entries = Enum.uniq(entries)
min_block = Application.get_env(:indexer, :trace_first_block)
max_block = Application.get_env(:indexer, :trace_last_block)
unique_filtered_entries =
Enum.filter(unique_entries, fn {_hash, block_number} ->
block_number >= EthereumJSONRPC.first_block_to_fetch(:trace_first_block)
block_number >= min_block && if max_block, do: block_number <= max_block, else: true
end)
unique_entry_count = Enum.count(unique_filtered_entries)

@ -100,7 +100,7 @@ defmodule Indexer.Fetcher.InternalTransaction do
filtered_unique_numbers =
unique_numbers
|> EthereumJSONRPC.block_numbers_in_range()
|> EthereumJSONRPC.are_block_numbers_in_range?()
|> drop_genesis(json_rpc_named_arguments)
filtered_unique_numbers_count = Enum.count(filtered_unique_numbers)
@ -159,7 +159,7 @@ defmodule Indexer.Fetcher.InternalTransaction do
end
defp drop_genesis(block_numbers, json_rpc_named_arguments) do
first_block = EthereumJSONRPC.first_block_to_fetch(:trace_first_block)
first_block = Application.get_env(:indexer, :trace_first_block)
if first_block in block_numbers do
case EthereumJSONRPC.fetch_blocks_by_numbers([first_block], json_rpc_named_arguments) do

@ -38,7 +38,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
describe "import/1" do
setup do
configuration = Application.get_env(:indexer, :last_block)
Application.put_env(:indexer, :last_block, "0")
Application.put_env(:indexer, :last_block, 0)
on_exit(fn ->
Application.put_env(:indexer, :last_block, configuration)

@ -36,8 +36,8 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do
end
test "FIRST_BLOCK and LAST_BLOCK envs" do
Application.put_env(:indexer, :first_block, "100")
Application.put_env(:indexer, :last_block, "200")
Application.put_env(:indexer, :first_block, 100)
Application.put_env(:indexer, :last_block, 200)
insert(:missing_block_range, from_number: 250, to_number: 220)
insert(:missing_block_range, from_number: 220, to_number: 190)

@ -47,6 +47,17 @@ defmodule ConfigHelper do
end
end
@spec parse_integer_or_nil_env_var(String.t()) :: non_neg_integer() | nil
def parse_integer_or_nil_env_var(env_var) do
env_var
|> System.get_env("")
|> Integer.parse()
|> case do
{integer, _} -> integer
_ -> nil
end
end
@spec parse_time_env_var(String.t(), String.t() | nil) :: non_neg_integer()
def parse_time_env_var(env_var, default_value) do
case env_var |> safe_get_env(default_value) |> String.downcase() |> Integer.parse() do

@ -445,10 +445,10 @@ config :indexer,
block_transformer: ConfigHelper.block_transformer(),
metadata_updater_milliseconds_interval: ConfigHelper.parse_time_env_var("TOKEN_METADATA_UPDATE_INTERVAL", "48h"),
block_ranges: System.get_env("BLOCK_RANGES"),
first_block: System.get_env("FIRST_BLOCK") || "",
last_block: System.get_env("LAST_BLOCK") || "",
trace_first_block: System.get_env("TRACE_FIRST_BLOCK") || "",
trace_last_block: System.get_env("TRACE_LAST_BLOCK") || "",
first_block: ConfigHelper.parse_integer_env_var("FIRST_BLOCK", 0),
last_block: ConfigHelper.parse_integer_or_nil_env_var("LAST_BLOCK"),
trace_first_block: ConfigHelper.parse_integer_env_var("TRACE_FIRST_BLOCK", 0),
trace_last_block: ConfigHelper.parse_integer_or_nil_env_var("TRACE_LAST_BLOCK"),
fetch_rewards_way: System.get_env("FETCH_REWARDS_WAY", "trace_block"),
memory_limit: ConfigHelper.indexer_memory_limit(),
receipts_batch_size: ConfigHelper.parse_integer_env_var("INDEXER_RECEIPTS_BATCH_SIZE", 250),

Loading…
Cancel
Save