|
|
|
@ -58,10 +58,28 @@ defmodule Explorer.Chain.Import.Runner.Addresses do |
|
|
|
|
end) |
|
|
|
|
end) |
|
|
|
|
|
|
|
|
|
ordered_changes_list = |
|
|
|
|
changes_list_with_defaults |
|
|
|
|
|> Enum.group_by(& &1.hash) |
|
|
|
|
|> Enum.map(fn {_, grouped_addresses} -> |
|
|
|
|
Enum.max_by(grouped_addresses, fn address -> |
|
|
|
|
address_max_by(address) |
|
|
|
|
end) |
|
|
|
|
end) |
|
|
|
|
|> Enum.sort_by(& &1.hash) |
|
|
|
|
|
|
|
|
|
multi |
|
|
|
|
|> Multi.run(:addresses, fn repo, _ -> |
|
|
|
|
|> Multi.run(:filter_addresses, fn repo, _ -> |
|
|
|
|
Instrumenter.block_import_stage_runner( |
|
|
|
|
fn -> filter_addresses(repo, ordered_changes_list) end, |
|
|
|
|
:addresses, |
|
|
|
|
:addresses, |
|
|
|
|
:filter_addresses |
|
|
|
|
) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:addresses, fn repo, %{filter_addresses: addresses} -> |
|
|
|
|
Instrumenter.block_import_stage_runner( |
|
|
|
|
fn -> insert(repo, changes_list_with_defaults, insert_options) end, |
|
|
|
|
fn -> insert(repo, addresses, insert_options) end, |
|
|
|
|
:addresses, |
|
|
|
|
:addresses, |
|
|
|
|
:addresses |
|
|
|
@ -83,29 +101,56 @@ defmodule Explorer.Chain.Import.Runner.Addresses do |
|
|
|
|
|
|
|
|
|
## Private Functions |
|
|
|
|
|
|
|
|
|
@spec filter_addresses(Repo.t(), [map()]) :: {:ok, [map()]} |
|
|
|
|
defp filter_addresses(repo, changes_list) do |
|
|
|
|
hashes = Enum.map(changes_list, & &1.hash) |
|
|
|
|
|
|
|
|
|
existing_addresses_query = |
|
|
|
|
from(a in Address, |
|
|
|
|
where: a.hash in ^hashes, |
|
|
|
|
select: [:hash, :contract_code, :fetched_coin_balance_block_number, :nonce] |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
existing_addresses_map = |
|
|
|
|
existing_addresses_query |
|
|
|
|
|> repo.all() |
|
|
|
|
|> Map.new(&{&1.hash, &1}) |
|
|
|
|
|
|
|
|
|
filtered_addresses = |
|
|
|
|
changes_list |
|
|
|
|
|> Enum.reduce([], fn address, acc -> |
|
|
|
|
existing_address = existing_addresses_map[address.hash] |
|
|
|
|
|
|
|
|
|
if should_update?(address, existing_address) do |
|
|
|
|
[address | acc] |
|
|
|
|
else |
|
|
|
|
acc |
|
|
|
|
end |
|
|
|
|
end) |
|
|
|
|
|> Enum.reverse() |
|
|
|
|
|
|
|
|
|
{:ok, filtered_addresses} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp should_update?(new_address, existing_address) do |
|
|
|
|
is_nil(existing_address) or |
|
|
|
|
(not is_nil(new_address[:contract_code]) and new_address[:contract_code] != existing_address.contract_code) or |
|
|
|
|
(not is_nil(new_address[:fetched_coin_balance_block_number]) and |
|
|
|
|
(is_nil(existing_address.fetched_coin_balance_block_number) or |
|
|
|
|
new_address[:fetched_coin_balance_block_number] >= existing_address.fetched_coin_balance_block_number)) or |
|
|
|
|
(not is_nil(new_address[:nonce]) and |
|
|
|
|
(is_nil(existing_address.nonce) or new_address[:nonce] > existing_address.nonce)) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
@spec insert(Repo.t(), [%{hash: Hash.Address.t()}], %{ |
|
|
|
|
optional(:on_conflict) => Import.Runner.on_conflict(), |
|
|
|
|
required(:timeout) => timeout, |
|
|
|
|
required(:timestamps) => Import.timestamps() |
|
|
|
|
}) :: {:ok, [Address.t()]} |
|
|
|
|
defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do |
|
|
|
|
defp insert(repo, ordered_changes_list, %{timeout: timeout, timestamps: timestamps} = options) |
|
|
|
|
when is_list(ordered_changes_list) do |
|
|
|
|
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) |
|
|
|
|
|
|
|
|
|
# Enforce Address ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
ordered_changes_list = |
|
|
|
|
changes_list |
|
|
|
|
|> Enum.group_by(fn %{ |
|
|
|
|
hash: hash |
|
|
|
|
} -> |
|
|
|
|
{hash} |
|
|
|
|
end) |
|
|
|
|
|> Enum.map(fn {_, grouped_addresses} -> |
|
|
|
|
Enum.max_by(grouped_addresses, fn address -> |
|
|
|
|
address_max_by(address) |
|
|
|
|
end) |
|
|
|
|
end) |
|
|
|
|
|> Enum.sort_by(& &1.hash) |
|
|
|
|
|
|
|
|
|
Import.insert_changes_list( |
|
|
|
|
repo, |
|
|
|
|
ordered_changes_list, |
|
|
|
|