diff --git a/CHANGELOG.md b/CHANGELOG.md index a4e527694f..fe99560d8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features +- [#7811](https://github.com/blockscout/blockscout/pull/7811) - Filter addresses before insertion + ### Fixes - [#7772](https://github.com/blockscout/blockscout/pull/7772) - Fix parsing of database password period(s) diff --git a/apps/explorer/lib/explorer/chain/import/runner/addresses.ex b/apps/explorer/lib/explorer/chain/import/runner/addresses.ex index 2b703a0100..70d5c85bfe 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/addresses.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/addresses.ex @@ -58,10 +58,28 @@ defmodule Explorer.Chain.Import.Runner.Addresses do end) end) + ordered_changes_list = + changes_list_with_defaults + |> Enum.group_by(& &1.hash) + |> Enum.map(fn {_, grouped_addresses} -> + Enum.max_by(grouped_addresses, fn address -> + address_max_by(address) + end) + end) + |> Enum.sort_by(& &1.hash) + multi - |> Multi.run(:addresses, fn repo, _ -> + |> Multi.run(:filter_addresses, fn repo, _ -> + Instrumenter.block_import_stage_runner( + fn -> filter_addresses(repo, ordered_changes_list) end, + :addresses, + :addresses, + :filter_addresses + ) + end) + |> Multi.run(:addresses, fn repo, %{filter_addresses: addresses} -> Instrumenter.block_import_stage_runner( - fn -> insert(repo, changes_list_with_defaults, insert_options) end, + fn -> insert(repo, addresses, insert_options) end, :addresses, :addresses, :addresses @@ -83,29 +101,56 @@ defmodule Explorer.Chain.Import.Runner.Addresses do ## Private Functions + @spec filter_addresses(Repo.t(), [map()]) :: {:ok, [map()]} + defp filter_addresses(repo, changes_list) do + hashes = Enum.map(changes_list, & &1.hash) + + existing_addresses_query = + from(a in Address, + where: a.hash in ^hashes, + select: [:hash, :contract_code, :fetched_coin_balance_block_number, :nonce] + ) + + existing_addresses_map = + existing_addresses_query + |> repo.all() + |> Map.new(&{&1.hash, &1}) + + filtered_addresses = + changes_list + |> Enum.reduce([], fn address, acc -> + existing_address = existing_addresses_map[address.hash] + + if should_update?(address, existing_address) do + [address | acc] + else + acc + end + end) + |> Enum.reverse() + + {:ok, filtered_addresses} + end + + defp should_update?(new_address, existing_address) do + is_nil(existing_address) or + (not is_nil(new_address[:contract_code]) and new_address[:contract_code] != existing_address.contract_code) or + (not is_nil(new_address[:fetched_coin_balance_block_number]) and + (is_nil(existing_address.fetched_coin_balance_block_number) or + new_address[:fetched_coin_balance_block_number] >= existing_address.fetched_coin_balance_block_number)) or + (not is_nil(new_address[:nonce]) and + (is_nil(existing_address.nonce) or new_address[:nonce] > existing_address.nonce)) + end + @spec insert(Repo.t(), [%{hash: Hash.Address.t()}], %{ optional(:on_conflict) => Import.Runner.on_conflict(), required(:timeout) => timeout, required(:timestamps) => Import.timestamps() }) :: {:ok, [Address.t()]} - defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + defp insert(repo, ordered_changes_list, %{timeout: timeout, timestamps: timestamps} = options) + when is_list(ordered_changes_list) do on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) - # Enforce Address ShareLocks order (see docs: sharelocks.md) - ordered_changes_list = - changes_list - |> Enum.group_by(fn %{ - hash: hash - } -> - {hash} - end) - |> Enum.map(fn {_, grouped_addresses} -> - Enum.max_by(grouped_addresses, fn address -> - address_max_by(address) - end) - end) - |> Enum.sort_by(& &1.hash) - Import.insert_changes_list( repo, ordered_changes_list,