Extract Explorer.Chain.Import.Addresses

pull/859/head
Luke Imhoff 6 years ago
parent c4f973c55b
commit 31ae4f51fe
  1. 101
      apps/explorer/lib/explorer/chain/import.ex
  2. 108
      apps/explorer/lib/explorer/chain/import/addresses.ex

@ -21,17 +21,13 @@ defmodule Explorer.Chain.Import do
Transaction, Transaction,
Wei Wei
} }
alias Explorer.Chain.Import.Addresses
alias Explorer.Repo alias Explorer.Repo
@type changeset_function_name :: atom @type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all @type on_conflict :: :nothing | :replace_all
@type params :: [map()] @type params :: [map()]
@type addresses_options :: %{
required(:params) => params,
optional(:timeout) => timeout,
optional(:with) => changeset_function_name
}
@type balances_options :: %{ @type balances_options :: %{
required(:params) => params, required(:params) => params,
optional(:timeout) => timeout optional(:timeout) => timeout
@ -80,7 +76,7 @@ defmodule Explorer.Chain.Import do
optional(:timeout) => timeout optional(:timeout) => timeout
} }
@type all_options :: %{ @type all_options :: %{
optional(:addresses) => addresses_options, optional(:addresses) => Addresses.options,
optional(:balances) => balances_options, optional(:balances) => balances_options,
optional(:blocks) => blocks_options, optional(:blocks) => blocks_options,
optional(:block_second_degree_relations) => block_second_degree_relations_options, optional(:block_second_degree_relations) => block_second_degree_relations_options,
@ -98,7 +94,7 @@ defmodule Explorer.Chain.Import do
@type all_result :: @type all_result ::
{:ok, {:ok,
%{ %{
optional(:addresses) => [Address.t()], optional(:addresses) => Addresses.imported(),
optional(:balances) => [ optional(:balances) => [
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()} %{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}
], ],
@ -123,13 +119,12 @@ defmodule Explorer.Chain.Import do
| {:error, step :: Ecto.Multi.name(), failed_value :: any(), | {:error, step :: Ecto.Multi.name(), failed_value :: any(),
changes_so_far :: %{optional(Ecto.Multi.name()) => any()}} changes_so_far :: %{optional(Ecto.Multi.name()) => any()}}
@typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()} @type timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()}
# timeouts all in milliseconds # timeouts all in milliseconds
@transaction_timeout 120_000 @transaction_timeout 120_000
@insert_addresses_timeout 60_000
@insert_balances_timeout 60_000 @insert_balances_timeout 60_000
@insert_blocks_timeout 60_000 @insert_blocks_timeout 60_000
@insert_block_second_degree_relations_timeout 60_000 @insert_block_second_degree_relations_timeout 60_000
@ -178,7 +173,7 @@ defmodule Explorer.Chain.Import do
* `:addresses` * `:addresses`
* `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`.
* `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. * `:timeout` - the timeout for inserting all addresses. Defaults to `#{Addresses.timeout()}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`. * `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`.
* `:balances` * `:balances`
* `:params` - `list` of params for `Explorer.Chain.Address.CoinBalance.changeset/2`. * `:params` - `list` of params for `Explorer.Chain.Address.CoinBalance.changeset/2`.
@ -321,7 +316,7 @@ defmodule Explorer.Chain.Import do
full_options = Map.put(options, :timestamps, timestamps) full_options = Map.put(options, :timestamps, timestamps)
Multi.new() Multi.new()
|> run_addresses(ecto_schema_module_to_changes_list_map, full_options) |> Addresses.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_balances(ecto_schema_module_to_changes_list_map, full_options) |> run_balances(ecto_schema_module_to_changes_list_map, full_options)
|> run_blocks(ecto_schema_module_to_changes_list_map, full_options) |> run_blocks(ecto_schema_module_to_changes_list_map, full_options)
|> run_block_second_degree_relations(ecto_schema_module_to_changes_list_map, full_options) |> run_block_second_degree_relations(ecto_schema_module_to_changes_list_map, full_options)
@ -334,27 +329,6 @@ defmodule Explorer.Chain.Import do
|> run_token_balances(ecto_schema_module_to_changes_list_map, full_options) |> run_token_balances(ecto_schema_module_to_changes_list_map, full_options)
end end
defp run_addresses(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Address => addresses_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :addresses, fn _ ->
insert_addresses(
addresses_changes,
%{
timeout: options[:addresses][:timeout] || @insert_addresses_timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
defp run_balances(multi, ecto_schema_module_to_changes_list_map, options) defp run_balances(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do case ecto_schema_module_to_changes_list_map do
@ -602,67 +576,6 @@ defmodule Explorer.Chain.Import do
end end
end end
@spec insert_addresses([%{hash: Hash.Address.t()}], %{
required(:timeout) => timeout,
required(:timestamps) => timestamps
}) :: {:ok, [Hash.Address.t()]}
defp insert_addresses(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = sort_address_changes_list(changes_list)
insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict:
from(
address in Address,
update: [
set: [
contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code),
# ARGMAX on two columns
fetched_coin_balance:
fragment(
"""
CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_coin_balance_block_number >= ?) THEN
EXCLUDED.fetched_coin_balance
ELSE ?
END
""",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number,
address.fetched_coin_balance
),
# MAX on two columns
fetched_coin_balance_block_number:
fragment(
"""
CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_coin_balance_block_number >= ?) THEN
EXCLUDED.fetched_coin_balance_block_number
ELSE ?
END
""",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number
)
]
]
),
for: Address,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
defp sort_address_changes_list(changes_list) do
Enum.sort_by(changes_list, & &1.hash)
end
@spec insert_balances( @spec insert_balances(
[ [
%{ %{
@ -977,7 +890,7 @@ defmodule Explorer.Chain.Import do
) )
end end
defp insert_changes_list(changes_list, options) when is_list(changes_list) do def insert_changes_list(changes_list, options) when is_list(changes_list) do
ecto_schema_module = Keyword.fetch!(options, :for) ecto_schema_module = Keyword.fetch!(options, :for)
timestamped_changes_list = timestamp_changes_list(changes_list, Keyword.fetch!(options, :timestamps)) timestamped_changes_list = timestamp_changes_list(changes_list, Keyword.fetch!(options, :timestamps))

@ -0,0 +1,108 @@
defmodule Explorer.Chain.Import.Addresses do
@moduledoc """
Bulk imports `t:Explorer.Chain.Address.t/0`.
"""
require Ecto.Query
alias Ecto.Multi
alias Explorer.Chain.{Address, Import}
import Ecto.Query, only: [from: 2]
# milliseconds
@timeout 60_000
@type imported :: [Address.t()]
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout,
optional(:with) => Import.changeset_function_name()
}
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Address => addresses_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :addresses, fn _ ->
insert(
addresses_changes,
%{
timeout: options[:addresses][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
def timeout, do: @timeout
## Private Functions
@spec insert([%{hash: Hash.Address.t()}], %{
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) :: {:ok, [Hash.Address.t()]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = sort_changes_list(changes_list)
Import.insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict:
from(
address in Address,
update: [
set: [
contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code),
# ARGMAX on two columns
fetched_coin_balance:
fragment(
"""
CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_coin_balance_block_number >= ?) THEN
EXCLUDED.fetched_coin_balance
ELSE ?
END
""",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number,
address.fetched_coin_balance
),
# MAX on two columns
fetched_coin_balance_block_number:
fragment(
"""
CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_coin_balance_block_number >= ?) THEN
EXCLUDED.fetched_coin_balance_block_number
ELSE ?
END
""",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number
)
]
]
),
for: Address,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
defp sort_changes_list(changes_list) do
Enum.sort_by(changes_list, & &1.hash)
end
end
Loading…
Cancel
Save