Refactor parsing of FIRST_BLOCK, LAST_BLOCK, TRACE_FIRST_BLOCK, TRACE_LAST_BLOCK env variables

pull/8613/head
Viktor Baranov 1 year ago
parent c70968c780
commit 2e0acd5471
  1. 1
      CHANGELOG.md
  2. 36
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  3. 35
      apps/explorer/lib/explorer/chain.ex
  4. 21
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  5. 8
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex
  6. 8
      apps/explorer/test/explorer/chain_test.exs
  7. 33
      apps/indexer/lib/indexer/block/catchup/missing_ranges_collector.ex
  8. 5
      apps/indexer/lib/indexer/fetcher/coin_balance.ex
  9. 2
      apps/indexer/lib/indexer/fetcher/internal_transaction.ex
  10. 2
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  11. 4
      apps/indexer/test/indexer/block/catchup/missing_ranges_collector_test.exs
  12. 11
      config/config_helper.exs
  13. 8
      config/runtime.exs

@ -15,6 +15,7 @@
### Fixes ### Fixes
- [#8614](https://github.com/blockscout/blockscout/pull/8614) - Disable market history cataloger fetcher when exchange rates are disabled - [#8614](https://github.com/blockscout/blockscout/pull/8614) - Disable market history cataloger fetcher when exchange rates are disabled
- [#8613](https://github.com/blockscout/blockscout/pull/8613) - Refactor parsing of FIRST_BLOCK, LAST_BLOCK, TRACE_FIRST_BLOCK, TRACE_LAST_BLOCK env variables
- [#8572](https://github.com/blockscout/blockscout/pull/8572) - Refactor docker-compose config - [#8572](https://github.com/blockscout/blockscout/pull/8572) - Refactor docker-compose config
- [#8552](https://github.com/blockscout/blockscout/pull/8552) - Add CHAIN_TYPE build arg to Dockerfile - [#8552](https://github.com/blockscout/blockscout/pull/8552) - Add CHAIN_TYPE build arg to Dockerfile
- [#8550](https://github.com/blockscout/blockscout/pull/8550) - Sanitize paging params - [#8550](https://github.com/blockscout/blockscout/pull/8550) - Sanitize paging params

@ -198,7 +198,13 @@ defmodule EthereumJSONRPC do
params_list params_list
end end
id_to_params = id_to_params(filtered_params) filtered_params_in_range =
filtered_params
|> Enum.filter(fn
%{block_quantity: block_quantity} -> block_number_in_range(block_quantity)
end)
id_to_params = id_to_params(filtered_params_in_range)
with {:ok, responses} <- with {:ok, responses} <-
id_to_params id_to_params
@ -348,11 +354,12 @@ defmodule EthereumJSONRPC do
end end
def block_numbers_in_range(block_numbers) do def block_numbers_in_range(block_numbers) do
min_block = first_block_to_fetch(:trace_first_block) min_block = Application.get_env(:indexer, :trace_first_block)
max_block = Application.get_env(:indexer, :trace_last_block)
block_numbers block_numbers
|> Enum.filter(fn block_number -> |> Enum.filter(fn block_number ->
block_number >= min_block block_number >= min_block && if max_block, do: block_number <= max_block, else: true
end) end)
end end
@ -448,6 +455,20 @@ defmodule EthereumJSONRPC do
end end
end end
@spec block_number_in_range(quantity) :: boolean()
defp block_number_in_range(block_quantity) do
min_block = Application.get_env(:indexer, :trace_first_block)
max_block = Application.get_env(:indexer, :trace_last_block)
block_number = quantity_to_integer(block_quantity)
if !block_number ||
(block_number && block_number >= min_block && if(max_block, do: block_number <= max_block, else: true)) do
true
else
false
end
end
defp maybe_replace_url(url, _replace_url, EthereumJSONRPC.HTTP), do: url defp maybe_replace_url(url, _replace_url, EthereumJSONRPC.HTTP), do: url
defp maybe_replace_url(url, replace_url, _), do: EndpointAvailabilityObserver.maybe_replace_url(url, replace_url) defp maybe_replace_url(url, replace_url, _), do: EndpointAvailabilityObserver.maybe_replace_url(url, replace_url)
@ -562,13 +583,4 @@ defmodule EthereumJSONRPC do
defp chunk_requests(requests, nil), do: requests defp chunk_requests(requests, nil), do: requests
defp chunk_requests(requests, chunk_size), do: Enum.chunk_every(requests, chunk_size) defp chunk_requests(requests, chunk_size), do: Enum.chunk_every(requests, chunk_size)
def first_block_to_fetch(config) do
string_value = Application.get_env(:indexer, config)
case Integer.parse(string_value) do
{integer, ""} -> integer
_ -> 0
end
end
end end

@ -1151,7 +1151,7 @@ defmodule Explorer.Chain do
select_repo(options).aggregate(Transaction, :min, :block_number) do select_repo(options).aggregate(Transaction, :min, :block_number) do
min_block_number = min_block_number =
min_block_number min_block_number
|> Decimal.max(EthereumJSONRPC.first_block_to_fetch(:trace_first_block)) |> Decimal.max(Application.get_env(:indexer, :trace_first_block))
|> Decimal.to_integer() |> Decimal.to_integer()
query = query =
@ -1778,15 +1778,18 @@ defmodule Explorer.Chain do
if Application.get_env(:indexer, Indexer.Supervisor)[:enabled] do if Application.get_env(:indexer, Indexer.Supervisor)[:enabled] do
%{min: min_saved_block_number, max: max_saved_block_number} = BlockNumber.get_all() %{min: min_saved_block_number, max: max_saved_block_number} = BlockNumber.get_all()
min_blockchain_block_number = min_block_number_from_config(:first_block) min_blockchain_block_number = Application.get_env(:indexer, :first_block)
case {min_saved_block_number, max_saved_block_number} do case {min_saved_block_number, max_saved_block_number} do
{0, 0} -> {0, 0} ->
Decimal.new(0) Decimal.new(0)
_ -> _ ->
BlockCache.estimated_count() divisor = max_saved_block_number - min_blockchain_block_number + 1
|> Decimal.div(max_saved_block_number - min_blockchain_block_number + 1)
ratio = get_ratio(BlockCache.estimated_count(), divisor)
ratio
|> (&if( |> (&if(
greater_or_equal_0_99(&1) && greater_or_equal_0_99(&1) &&
min_saved_block_number <= min_blockchain_block_number, min_saved_block_number <= min_blockchain_block_number,
@ -1807,7 +1810,7 @@ defmodule Explorer.Chain do
%{max: max_saved_block_number} = BlockNumber.get_all() %{max: max_saved_block_number} = BlockNumber.get_all()
pbo_count = PendingBlockOperationCache.estimated_count() pbo_count = PendingBlockOperationCache.estimated_count()
min_blockchain_trace_block_number = min_block_number_from_config(:trace_first_block) min_blockchain_trace_block_number = Application.get_env(:indexer, :trace_first_block)
case max_saved_block_number do case max_saved_block_number do
0 -> 0 ->
@ -1815,10 +1818,11 @@ defmodule Explorer.Chain do
_ -> _ ->
full_blocks_range = max_saved_block_number - min_blockchain_trace_block_number + 1 full_blocks_range = max_saved_block_number - min_blockchain_trace_block_number + 1
processed_int_txs_for_blocks_count = full_blocks_range - pbo_count processed_int_txs_for_blocks_count = max(0, full_blocks_range - pbo_count)
processed_int_txs_for_blocks_count ratio = get_ratio(processed_int_txs_for_blocks_count, full_blocks_range)
|> Decimal.div(full_blocks_range)
ratio
|> (&if( |> (&if(
greater_or_equal_0_99(&1), greater_or_equal_0_99(&1),
do: Decimal.new(1), do: Decimal.new(1),
@ -1831,20 +1835,19 @@ defmodule Explorer.Chain do
end end
end end
@spec get_ratio(non_neg_integer(), non_neg_integer()) :: Decimal.t()
defp get_ratio(dividend, divisor) do
if divisor > 0,
do: dividend |> Decimal.div(divisor),
else: Decimal.new(1)
end
@spec greater_or_equal_0_99(Decimal.t()) :: boolean() @spec greater_or_equal_0_99(Decimal.t()) :: boolean()
defp greater_or_equal_0_99(value) do defp greater_or_equal_0_99(value) do
Decimal.compare(value, Decimal.from_float(0.99)) == :gt || Decimal.compare(value, Decimal.from_float(0.99)) == :gt ||
Decimal.compare(value, Decimal.from_float(0.99)) == :eq Decimal.compare(value, Decimal.from_float(0.99)) == :eq
end end
@spec min_block_number_from_config(atom()) :: integer()
defp min_block_number_from_config(block_type) do
case Integer.parse(Application.get_env(:indexer, block_type)) do
{block_number, _} -> block_number
_ -> 0
end
end
@spec format_indexed_ratio(Decimal.t()) :: Decimal.t() @spec format_indexed_ratio(Decimal.t()) :: Decimal.t()
defp format_indexed_ratio(raw_ratio) do defp format_indexed_ratio(raw_ratio) do
raw_ratio raw_ratio

@ -60,11 +60,15 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
hashes = Enum.map(changes_list, & &1.hash) hashes = Enum.map(changes_list, & &1.hash)
minimal_block_height = trace_minimal_block_height() minimal_block_height = Application.get_env(:indexer, :trace_first_block)
maximal_block_height = Application.get_env(:indexer, :trace_last_block)
items_for_pending_ops = items_for_pending_ops =
changes_list changes_list
|> filter_by_min_height(&(&1.number >= minimal_block_height)) |> filter_by_height_range(
&(&1.number >= minimal_block_height &&
if(maximal_block_height, do: &1.number <= maximal_block_height, else: true))
)
|> Enum.filter(& &1.consensus) |> Enum.filter(& &1.consensus)
|> Enum.map(&{&1.number, &1.hash}) |> Enum.map(&{&1.number, &1.hash})
@ -74,7 +78,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
run_func = fn repo -> run_func = fn repo ->
{:ok, nonconsensus_items} = lose_consensus(repo, hashes, consensus_block_numbers, changes_list, insert_options) {:ok, nonconsensus_items} = lose_consensus(repo, hashes, consensus_block_numbers, changes_list, insert_options)
{:ok, filter_by_min_height(nonconsensus_items, fn {number, _hash} -> number >= minimal_block_height end)} {:ok, filter_by_height_range(nonconsensus_items, fn {number, _hash} -> number >= minimal_block_height end)}
end end
multi multi
@ -414,10 +418,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
lose_consensus(ExplorerRepo, [], block_numbers, [], opts) lose_consensus(ExplorerRepo, [], block_numbers, [], opts)
end end
defp trace_minimal_block_height do
EthereumJSONRPC.first_block_to_fetch(:trace_first_block)
end
defp new_pending_operations(repo, nonconsensus_items, items, %{ defp new_pending_operations(repo, nonconsensus_items, items, %{
timeout: timeout, timeout: timeout,
timestamps: timestamps timestamps: timestamps
@ -893,10 +893,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
where(invalid_neighbors_query, [block], block.consensus) where(invalid_neighbors_query, [block], block.consensus)
end end
defp filter_by_min_height(blocks, filter_func) do defp filter_by_height_range(blocks, filter_func) do
minimal_block_height = trace_minimal_block_height() minimal_block_height = Application.get_env(:indexer, :trace_first_block)
maximal_block_height = Application.get_env(:indexer, :trace_last_block)
if minimal_block_height > 0 do if minimal_block_height > 0 || maximal_block_height do
Enum.filter(blocks, &filter_func.(&1)) Enum.filter(blocks, &filter_func.(&1))
else else
blocks blocks

@ -15,7 +15,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
alias Explorer.Repo, as: ExplorerRepo alias Explorer.Repo, as: ExplorerRepo
alias Explorer.Utility.MissingRangesManipulator alias Explorer.Utility.MissingRangesManipulator
import Ecto.Query, only: [from: 2] import Ecto.Query, only: [from: 2, where: 3]
@behaviour Runner @behaviour Runner
@ -691,7 +691,8 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
end end
defp remove_consensus_of_invalid_blocks(repo, invalid_block_numbers) do defp remove_consensus_of_invalid_blocks(repo, invalid_block_numbers) do
minimal_block = EthereumJSONRPC.first_block_to_fetch(:trace_first_block) minimal_block = Application.get_env(:indexer, :trace_first_block)
maximal_block = Application.get_env(:indexer, :trace_last_block)
if Enum.count(invalid_block_numbers) > 0 do if Enum.count(invalid_block_numbers) > 0 do
update_query = update_query =
@ -704,6 +705,9 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
update: [set: [consensus: false]] update: [set: [consensus: false]]
) )
update_query =
if maximal_block, do: update_query |> where([block], block.number < ^maximal_block), else: update_query
try do try do
{_num, result} = repo.update_all(update_query, []) {_num, result} = repo.update_all(update_query, [])

@ -1478,7 +1478,7 @@ defmodule Explorer.ChainTest do
Supervisor.restart_child(Explorer.Supervisor, Explorer.Chain.Cache.Block.child_id()) Supervisor.restart_child(Explorer.Supervisor, Explorer.Chain.Cache.Block.child_id())
on_exit(fn -> on_exit(fn ->
Application.put_env(:indexer, :first_block, "") Application.put_env(:indexer, :first_block, 0)
end) end)
end end
@ -1508,7 +1508,7 @@ defmodule Explorer.ChainTest do
end end
test "returns 1.0 if fully indexed blocks starting from given FIRST_BLOCK" do test "returns 1.0 if fully indexed blocks starting from given FIRST_BLOCK" do
Application.put_env(:indexer, :first_block, "5") Application.put_env(:indexer, :first_block, 5)
for index <- 5..9 do for index <- 5..9 do
insert(:block, number: index, consensus: true) insert(:block, number: index, consensus: true)
@ -1527,7 +1527,7 @@ defmodule Explorer.ChainTest do
Supervisor.restart_child(Explorer.Supervisor, PendingBlockOperationCache.child_id()) Supervisor.restart_child(Explorer.Supervisor, PendingBlockOperationCache.child_id())
on_exit(fn -> on_exit(fn ->
Application.put_env(:indexer, :trace_first_block, "") Application.put_env(:indexer, :trace_first_block, 0)
Supervisor.terminate_child(Explorer.Supervisor, PendingBlockOperationCache.child_id()) Supervisor.terminate_child(Explorer.Supervisor, PendingBlockOperationCache.child_id())
end) end)
end end
@ -1559,7 +1559,7 @@ defmodule Explorer.ChainTest do
end end
test "returns 1.0 if fully indexed blocks with internal transactions starting from given TRACE_FIRST_BLOCK" do test "returns 1.0 if fully indexed blocks with internal transactions starting from given TRACE_FIRST_BLOCK" do
Application.put_env(:indexer, :trace_first_block, "5") Application.put_env(:indexer, :trace_first_block, 5)
for index <- 5..9 do for index <- 5..9 do
insert(:block, number: index) insert(:block, number: index)

@ -182,27 +182,19 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
end end
defp first_block do defp first_block do
string_value = Application.get_env(:indexer, :first_block) first_block_from_config = Application.get_env(:indexer, :first_block)
case Integer.parse(string_value) do min_missing_block_number =
{integer, ""} -> "min_missing_block_number"
integer |> Chain.get_last_fetched_counter()
|> Decimal.to_integer()
_ -> max(first_block_from_config, min_missing_block_number)
min_missing_block_number =
"min_missing_block_number"
|> Chain.get_last_fetched_counter()
|> Decimal.to_integer()
min_missing_block_number
end
end end
defp last_block do defp last_block do
case Integer.parse(Application.get_env(:indexer, :last_block)) do last_block = Application.get_env(:indexer, :last_block)
{block, ""} -> block + 1 if last_block, do: last_block + 1, else: fetch_max_block_number()
_ -> fetch_max_block_number()
end
end end
defp fetch_max_block_number do defp fetch_max_block_number do
@ -221,9 +213,12 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
end end
defp continue_future_updating?(max_fetched_block_number) do defp continue_future_updating?(max_fetched_block_number) do
case Integer.parse(Application.get_env(:indexer, :last_block)) do last_block = Application.get_env(:indexer, :last_block)
{block, ""} -> max_fetched_block_number < block
_ -> true if last_block do
max_fetched_block_number < last_block
else
true
end end
end end

@ -83,9 +83,12 @@ defmodule Indexer.Fetcher.CoinBalance do
# `{address, block}`, so take unique params only # `{address, block}`, so take unique params only
unique_entries = Enum.uniq(entries) unique_entries = Enum.uniq(entries)
min_block = Application.get_env(:indexer, :trace_first_block)
max_block = Application.get_env(:indexer, :trace_last_block)
unique_filtered_entries = unique_filtered_entries =
Enum.filter(unique_entries, fn {_hash, block_number} -> Enum.filter(unique_entries, fn {_hash, block_number} ->
block_number >= EthereumJSONRPC.first_block_to_fetch(:trace_first_block) block_number >= min_block && if max_block, do: block_number <= max_block, else: true
end) end)
unique_entry_count = Enum.count(unique_filtered_entries) unique_entry_count = Enum.count(unique_filtered_entries)

@ -158,7 +158,7 @@ defmodule Indexer.Fetcher.InternalTransaction do
end end
defp drop_genesis(block_numbers, json_rpc_named_arguments) do defp drop_genesis(block_numbers, json_rpc_named_arguments) do
first_block = EthereumJSONRPC.first_block_to_fetch(:trace_first_block) first_block = Application.get_env(:indexer, :trace_first_block)
if first_block in block_numbers do if first_block in block_numbers do
case EthereumJSONRPC.fetch_blocks_by_numbers([first_block], json_rpc_named_arguments) do case EthereumJSONRPC.fetch_blocks_by_numbers([first_block], json_rpc_named_arguments) do

@ -38,7 +38,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
describe "import/1" do describe "import/1" do
setup do setup do
configuration = Application.get_env(:indexer, :last_block) configuration = Application.get_env(:indexer, :last_block)
Application.put_env(:indexer, :last_block, "0") Application.put_env(:indexer, :last_block, 0)
on_exit(fn -> on_exit(fn ->
Application.put_env(:indexer, :last_block, configuration) Application.put_env(:indexer, :last_block, configuration)

@ -36,8 +36,8 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do
end end
test "FIRST_BLOCK and LAST_BLOCK envs" do test "FIRST_BLOCK and LAST_BLOCK envs" do
Application.put_env(:indexer, :first_block, "100") Application.put_env(:indexer, :first_block, 100)
Application.put_env(:indexer, :last_block, "200") Application.put_env(:indexer, :last_block, 200)
insert(:missing_block_range, from_number: 250, to_number: 220) insert(:missing_block_range, from_number: 250, to_number: 220)
insert(:missing_block_range, from_number: 220, to_number: 190) insert(:missing_block_range, from_number: 220, to_number: 190)

@ -45,6 +45,17 @@ defmodule ConfigHelper do
end end
end end
@spec parse_integer_or_nil_env_var(String.t()) :: non_neg_integer() | nil
def parse_integer_or_nil_env_var(env_var) do
env_var
|> System.get_env("")
|> Integer.parse()
|> case do
{integer, _} -> integer
_ -> nil
end
end
@spec parse_time_env_var(String.t(), String.t() | nil) :: non_neg_integer() @spec parse_time_env_var(String.t(), String.t() | nil) :: non_neg_integer()
def parse_time_env_var(env_var, default_value) do def parse_time_env_var(env_var, default_value) do
case env_var |> safe_get_env(default_value) |> String.downcase() |> Integer.parse() do case env_var |> safe_get_env(default_value) |> String.downcase() |> Integer.parse() do

@ -430,10 +430,10 @@ config :indexer,
block_transformer: ConfigHelper.block_transformer(), block_transformer: ConfigHelper.block_transformer(),
metadata_updater_milliseconds_interval: ConfigHelper.parse_time_env_var("TOKEN_METADATA_UPDATE_INTERVAL", "48h"), metadata_updater_milliseconds_interval: ConfigHelper.parse_time_env_var("TOKEN_METADATA_UPDATE_INTERVAL", "48h"),
block_ranges: System.get_env("BLOCK_RANGES"), block_ranges: System.get_env("BLOCK_RANGES"),
first_block: System.get_env("FIRST_BLOCK") || "", first_block: ConfigHelper.parse_integer_env_var("FIRST_BLOCK", 0),
last_block: System.get_env("LAST_BLOCK") || "", last_block: ConfigHelper.parse_integer_or_nil_env_var("LAST_BLOCK"),
trace_first_block: System.get_env("TRACE_FIRST_BLOCK") || "", trace_first_block: ConfigHelper.parse_integer_env_var("TRACE_FIRST_BLOCK", 0),
trace_last_block: System.get_env("TRACE_LAST_BLOCK") || "", trace_last_block: ConfigHelper.parse_integer_or_nil_env_var("TRACE_LAST_BLOCK"),
fetch_rewards_way: System.get_env("FETCH_REWARDS_WAY", "trace_block"), fetch_rewards_way: System.get_env("FETCH_REWARDS_WAY", "trace_block"),
memory_limit: ConfigHelper.indexer_memory_limit(), memory_limit: ConfigHelper.indexer_memory_limit(),
receipts_batch_size: ConfigHelper.parse_integer_env_var("INDEXER_RECEIPTS_BATCH_SIZE", 250), receipts_batch_size: ConfigHelper.parse_integer_env_var("INDEXER_RECEIPTS_BATCH_SIZE", 250),

Loading…
Cancel
Save