From 31ae4f51fe2c187ff0bd393985bff22288243c2c Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 3 Oct 2018 13:07:20 -0500 Subject: [PATCH] Extract Explorer.Chain.Import.Addresses --- apps/explorer/lib/explorer/chain/import.ex | 101 ++-------------- .../lib/explorer/chain/import/addresses.ex | 108 ++++++++++++++++++ 2 files changed, 115 insertions(+), 94 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/import/addresses.ex diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index ec253df704..2411accd76 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -21,17 +21,13 @@ defmodule Explorer.Chain.Import do Transaction, Wei } + alias Explorer.Chain.Import.Addresses alias Explorer.Repo @type changeset_function_name :: atom @type on_conflict :: :nothing | :replace_all @type params :: [map()] - @type addresses_options :: %{ - required(:params) => params, - optional(:timeout) => timeout, - optional(:with) => changeset_function_name - } @type balances_options :: %{ required(:params) => params, optional(:timeout) => timeout @@ -80,7 +76,7 @@ defmodule Explorer.Chain.Import do optional(:timeout) => timeout } @type all_options :: %{ - optional(:addresses) => addresses_options, + optional(:addresses) => Addresses.options, optional(:balances) => balances_options, optional(:blocks) => blocks_options, optional(:block_second_degree_relations) => block_second_degree_relations_options, @@ -98,7 +94,7 @@ defmodule Explorer.Chain.Import do @type all_result :: {:ok, %{ - optional(:addresses) => [Address.t()], + optional(:addresses) => Addresses.imported(), optional(:balances) => [ %{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()} ], @@ -123,13 +119,12 @@ defmodule Explorer.Chain.Import do | {:error, step :: Ecto.Multi.name(), failed_value :: any(), changes_so_far :: %{optional(Ecto.Multi.name()) => any()}} - @typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()} + @type timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()} # timeouts all in milliseconds @transaction_timeout 120_000 - @insert_addresses_timeout 60_000 @insert_balances_timeout 60_000 @insert_blocks_timeout 60_000 @insert_block_second_degree_relations_timeout 60_000 @@ -178,7 +173,7 @@ defmodule Explorer.Chain.Import do * `:addresses` * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. - * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. + * `:timeout` - the timeout for inserting all addresses. Defaults to `#{Addresses.timeout()}` milliseconds. * `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`. * `:balances` * `:params` - `list` of params for `Explorer.Chain.Address.CoinBalance.changeset/2`. @@ -321,7 +316,7 @@ defmodule Explorer.Chain.Import do full_options = Map.put(options, :timestamps, timestamps) Multi.new() - |> run_addresses(ecto_schema_module_to_changes_list_map, full_options) + |> Addresses.run(ecto_schema_module_to_changes_list_map, full_options) |> run_balances(ecto_schema_module_to_changes_list_map, full_options) |> run_blocks(ecto_schema_module_to_changes_list_map, full_options) |> run_block_second_degree_relations(ecto_schema_module_to_changes_list_map, full_options) @@ -334,27 +329,6 @@ defmodule Explorer.Chain.Import do |> run_token_balances(ecto_schema_module_to_changes_list_map, full_options) end - defp run_addresses(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{Address => addresses_changes} -> - timestamps = Map.fetch!(options, :timestamps) - - Multi.run(multi, :addresses, fn _ -> - insert_addresses( - addresses_changes, - %{ - timeout: options[:addresses][:timeout] || @insert_addresses_timeout, - timestamps: timestamps - } - ) - end) - - _ -> - multi - end - end - defp run_balances(multi, ecto_schema_module_to_changes_list_map, options) when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do case ecto_schema_module_to_changes_list_map do @@ -602,67 +576,6 @@ defmodule Explorer.Chain.Import do end end - @spec insert_addresses([%{hash: Hash.Address.t()}], %{ - required(:timeout) => timeout, - required(:timestamps) => timestamps - }) :: {:ok, [Hash.Address.t()]} - defp insert_addresses(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = sort_address_changes_list(changes_list) - - insert_changes_list( - ordered_changes_list, - conflict_target: :hash, - on_conflict: - from( - address in Address, - update: [ - set: [ - contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code), - # ARGMAX on two columns - fetched_coin_balance: - fragment( - """ - CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND - (? IS NULL OR - EXCLUDED.fetched_coin_balance_block_number >= ?) THEN - EXCLUDED.fetched_coin_balance - ELSE ? - END - """, - address.fetched_coin_balance_block_number, - address.fetched_coin_balance_block_number, - address.fetched_coin_balance - ), - # MAX on two columns - fetched_coin_balance_block_number: - fragment( - """ - CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND - (? IS NULL OR - EXCLUDED.fetched_coin_balance_block_number >= ?) THEN - EXCLUDED.fetched_coin_balance_block_number - ELSE ? - END - """, - address.fetched_coin_balance_block_number, - address.fetched_coin_balance_block_number, - address.fetched_coin_balance_block_number - ) - ] - ] - ), - for: Address, - returning: true, - timeout: timeout, - timestamps: timestamps - ) - end - - defp sort_address_changes_list(changes_list) do - Enum.sort_by(changes_list, & &1.hash) - end - @spec insert_balances( [ %{ @@ -977,7 +890,7 @@ defmodule Explorer.Chain.Import do ) end - defp insert_changes_list(changes_list, options) when is_list(changes_list) do + def insert_changes_list(changes_list, options) when is_list(changes_list) do ecto_schema_module = Keyword.fetch!(options, :for) timestamped_changes_list = timestamp_changes_list(changes_list, Keyword.fetch!(options, :timestamps)) diff --git a/apps/explorer/lib/explorer/chain/import/addresses.ex b/apps/explorer/lib/explorer/chain/import/addresses.ex new file mode 100644 index 0000000000..43ed0ad404 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/addresses.ex @@ -0,0 +1,108 @@ +defmodule Explorer.Chain.Import.Addresses do + @moduledoc """ + Bulk imports `t:Explorer.Chain.Address.t/0`. + """ + + require Ecto.Query + + alias Ecto.Multi + alias Explorer.Chain.{Address, Import} + + import Ecto.Query, only: [from: 2] + + # milliseconds + @timeout 60_000 + + @type imported :: [Address.t()] + @type options :: %{ + required(:params) => Import.params(), + optional(:timeout) => timeout, + optional(:with) => Import.changeset_function_name() + } + + def run(multi, ecto_schema_module_to_changes_list_map, options) + when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do + case ecto_schema_module_to_changes_list_map do + %{Address => addresses_changes} -> + timestamps = Map.fetch!(options, :timestamps) + + Multi.run(multi, :addresses, fn _ -> + insert( + addresses_changes, + %{ + timeout: options[:addresses][:timeout] || @timeout, + timestamps: timestamps + } + ) + end) + + _ -> + multi + end + end + + def timeout, do: @timeout + + ## Private Functions + + @spec insert([%{hash: Hash.Address.t()}], %{ + required(:timeout) => timeout, + required(:timestamps) => Import.timestamps() + }) :: {:ok, [Hash.Address.t()]} + defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = sort_changes_list(changes_list) + + Import.insert_changes_list( + ordered_changes_list, + conflict_target: :hash, + on_conflict: + from( + address in Address, + update: [ + set: [ + contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code), + # ARGMAX on two columns + fetched_coin_balance: + fragment( + """ + CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND + (? IS NULL OR + EXCLUDED.fetched_coin_balance_block_number >= ?) THEN + EXCLUDED.fetched_coin_balance + ELSE ? + END + """, + address.fetched_coin_balance_block_number, + address.fetched_coin_balance_block_number, + address.fetched_coin_balance + ), + # MAX on two columns + fetched_coin_balance_block_number: + fragment( + """ + CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND + (? IS NULL OR + EXCLUDED.fetched_coin_balance_block_number >= ?) THEN + EXCLUDED.fetched_coin_balance_block_number + ELSE ? + END + """, + address.fetched_coin_balance_block_number, + address.fetched_coin_balance_block_number, + address.fetched_coin_balance_block_number + ) + ] + ] + ), + for: Address, + returning: true, + timeout: timeout, + timestamps: timestamps + ) + end + + defp sort_changes_list(changes_list) do + Enum.sort_by(changes_list, & &1.hash) + end +end