diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index c4f0ffe779..4ea5d7b0c2 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -26,10 +26,6 @@ defmodule Explorer.Chain.Import do @type changeset_function_name :: atom @type on_conflict :: :nothing | :replace_all @type params :: [map()] - @type block_second_degree_relations_options :: %{ - required(:params) => params, - optional(:timeout) => timeout - } @type internal_transactions_options :: %{ required(:params) => params, optional(:timeout) => timeout @@ -69,7 +65,7 @@ defmodule Explorer.Chain.Import do optional(:addresses) => Import.Addresses.options(), optional(:address_coin_balances) => Import.Address.CoinBalances.options(), optional(:blocks) => Import.Blocks.options(), - optional(:block_second_degree_relations) => block_second_degree_relations_options, + optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.options(), optional(:broadcast) => boolean, optional(:internal_transactions) => internal_transactions_options, optional(:logs) => logs_options, @@ -87,9 +83,7 @@ defmodule Explorer.Chain.Import do optional(:addresses) => Import.Addresses.imported(), optional(:address_coin_balances) => Import.Address.CoinBalances.imported(), optional(:blocks) => Import.Blocks.imported(), - optional(:block_second_degree_relations) => [ - %{required(:nephew_hash) => Hash.Full.t(), required(:uncle_hash) => Hash.Full.t()} - ], + optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.imported(), optional(:internal_transactions) => [ %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} ], @@ -113,7 +107,6 @@ defmodule Explorer.Chain.Import do @transaction_timeout 120_000 - @insert_block_second_degree_relations_timeout 60_000 @insert_internal_transactions_timeout 60_000 @insert_logs_timeout 60_000 @insert_token_transfers_timeout 60_000 @@ -122,7 +115,6 @@ defmodule Explorer.Chain.Import do @insert_transactions_timeout 60_000 @insert_transaction_forks_timeout 60_000 - def block_second_degree_relations_timeout, do: @insert_block_second_degree_relations_timeout def transactions_timeout, do: @insert_transactions_timeout def transaction_forks_timeout, do: @insert_transaction_forks_timeout @@ -174,7 +166,8 @@ defmodule Explorer.Chain.Import do * `:timeout` - the timeout for inserting all blocks. Defaults to `#{Import.Blocks.timeout()}` milliseconds. * `:block_second_degree_relations` * `:params` - `list` of params `for `Explorer.Chain.Block.SecondDegreeRelation.changeset/2`. - * `:timeout` - the timeout for inserting all uncles found in the params list. + * `:timeout` - the timeout for inserting all uncles found in the params list. Defaults to + `#{Import.Block.SecondDegreeRelations.timeout()}` milliseconds. * `:broadcast` - Boolean flag indicating whether or not to broadcast the event. * `:internal_transactions` * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. @@ -310,7 +303,7 @@ defmodule Explorer.Chain.Import do |> Import.Addresses.run(ecto_schema_module_to_changes_list_map, full_options) |> Import.Address.CoinBalances.run(ecto_schema_module_to_changes_list_map, full_options) |> Import.Blocks.run(ecto_schema_module_to_changes_list_map, full_options) - |> run_block_second_degree_relations(ecto_schema_module_to_changes_list_map, full_options) + |> Import.Block.SecondDegreeRelations.run(ecto_schema_module_to_changes_list_map, full_options) |> run_transactions(ecto_schema_module_to_changes_list_map, full_options) |> run_transaction_forks(ecto_schema_module_to_changes_list_map, full_options) |> run_internal_transactions(ecto_schema_module_to_changes_list_map, full_options) @@ -481,51 +474,6 @@ defmodule Explorer.Chain.Import do end end - defp run_block_second_degree_relations(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_map(options) do - case ecto_schema_module_to_changes_list do - %{Block.SecondDegreeRelation => block_second_degree_relations_changes} -> - Multi.run(multi, :block_second_degree_relations, fn _ -> - insert_block_second_degree_relations( - block_second_degree_relations_changes, - %{ - timeout: - options[:block_second_degree_relations][:timeout] || @insert_block_second_degree_relations_timeout - } - ) - end) - - _ -> - multi - end - end - - @spec insert_block_second_degree_relations([map()], %{required(:timeout) => timeout}) :: - {:ok, %{nephew_hash: Hash.Full.t(), uncle_hash: Hash.Full.t()}} | {:error, [Changeset.t()]} - defp insert_block_second_degree_relations(changes_list, %{timeout: timeout}) when is_list(changes_list) do - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, &{&1.nephew_hash, &1.uncle_hash}) - - insert_changes_list(ordered_changes_list, - conflict_target: [:nephew_hash, :uncle_hash], - on_conflict: - from( - block_second_degree_relation in Block.SecondDegreeRelation, - update: [ - set: [ - uncle_fetched_at: - fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at) - ] - ] - ), - for: Block.SecondDegreeRelation, - returning: [:nephew_hash, :uncle_hash], - timeout: timeout, - # block_second_degree_relations doesn't have timestamps - timestamps: %{} - ) - end - @spec insert_internal_transactions([map], %{required(:timeout) => timeout, required(:timestamps) => timestamps}) :: {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} | {:error, [Changeset.t()]} diff --git a/apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex b/apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex new file mode 100644 index 0000000000..3490cfa7e4 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex @@ -0,0 +1,68 @@ +defmodule Explorer.Chain.Import.Block.SecondDegreeRelations do + @moduledoc """ + Bulk imports `t:Explorer.Chain.Block.SecondDegreeRelation.t/0`. + """ + + require Ecto.Query + + import Ecto.Query, only: [from: 2] + + alias Ecto.{Changeset, Multi} + alias Explorer.Chain.{Block, Hash, Import} + + @timeout 60_000 + + @type options :: %{ + required(:params) => Import.params(), + optional(:timeout) => timeout + } + @type imported :: [ + %{required(:nephew_hash) => Hash.Full.t(), required(:uncle_hash) => Hash.Full.t()} + ] + + def run(multi, ecto_schema_module_to_changes_list, options) + when is_map(ecto_schema_module_to_changes_list) and is_map(options) do + case ecto_schema_module_to_changes_list do + %{Block.SecondDegreeRelation => block_second_degree_relations_changes} -> + Multi.run(multi, :block_second_degree_relations, fn _ -> + insert( + block_second_degree_relations_changes, + %{ + timeout: options[:block_second_degree_relations][:timeout] || @timeout + } + ) + end) + + _ -> + multi + end + end + + def timeout, do: @timeout + + @spec insert([map()], %{required(:timeout) => timeout}) :: + {:ok, %{nephew_hash: Hash.Full.t(), uncle_hash: Hash.Full.t()}} | {:error, [Changeset.t()]} + defp insert(changes_list, %{timeout: timeout}) when is_list(changes_list) do + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, &{&1.nephew_hash, &1.uncle_hash}) + + Import.insert_changes_list(ordered_changes_list, + conflict_target: [:nephew_hash, :uncle_hash], + on_conflict: + from( + block_second_degree_relation in Block.SecondDegreeRelation, + update: [ + set: [ + uncle_fetched_at: + fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at) + ] + ] + ), + for: Block.SecondDegreeRelation, + returning: [:nephew_hash, :uncle_hash], + timeout: timeout, + # block_second_degree_relations doesn't have timestamps + timestamps: %{} + ) + end +end diff --git a/apps/explorer/lib/explorer/chain/import/blocks.ex b/apps/explorer/lib/explorer/chain/import/blocks.ex index 7903bbc999..dbd71c0de8 100644 --- a/apps/explorer/lib/explorer/chain/import/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/blocks.ex @@ -56,7 +56,7 @@ defmodule Explorer.Chain.Import.Blocks do blocks, %{ timeout: - options[:block_second_degree_relations][:timeout] || Import.block_second_degree_relations_timeout(), + options[:block_second_degree_relations][:timeout] || Import.Block.SecondDegreeRelations.timeout(), timestamps: timestamps } ) diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 29e80907b5..7c5ec0f37b 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -26,7 +26,7 @@ defmodule Indexer.Block.Fetcher do addresses: Import.Addresses.options(), address_coin_balances: Import.Address.CoinBalances.options(), blocks: Import.Blocks.options(), - block_second_degree_relations: Import.block_second_degree_relations_options(), + block_second_degree_relations: Import.Block.SecondDegreeRelations.options(), broadcast: boolean, logs: Import.logs_options(), receipts: Import.receipts_options(),