Extract Explorer.Chain.Import.Block.SecondDegreeRelations

pull/859/head
Luke Imhoff 6 years ago
parent 56f0365239
commit 4842462667
  1. 62
      apps/explorer/lib/explorer/chain/import.ex
  2. 68
      apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex
  3. 2
      apps/explorer/lib/explorer/chain/import/blocks.ex
  4. 2
      apps/indexer/lib/indexer/block/fetcher.ex

@ -26,10 +26,6 @@ defmodule Explorer.Chain.Import do
@type changeset_function_name :: atom @type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all @type on_conflict :: :nothing | :replace_all
@type params :: [map()] @type params :: [map()]
@type block_second_degree_relations_options :: %{
required(:params) => params,
optional(:timeout) => timeout
}
@type internal_transactions_options :: %{ @type internal_transactions_options :: %{
required(:params) => params, required(:params) => params,
optional(:timeout) => timeout optional(:timeout) => timeout
@ -69,7 +65,7 @@ defmodule Explorer.Chain.Import do
optional(:addresses) => Import.Addresses.options(), optional(:addresses) => Import.Addresses.options(),
optional(:address_coin_balances) => Import.Address.CoinBalances.options(), optional(:address_coin_balances) => Import.Address.CoinBalances.options(),
optional(:blocks) => Import.Blocks.options(), optional(:blocks) => Import.Blocks.options(),
optional(:block_second_degree_relations) => block_second_degree_relations_options, optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.options(),
optional(:broadcast) => boolean, optional(:broadcast) => boolean,
optional(:internal_transactions) => internal_transactions_options, optional(:internal_transactions) => internal_transactions_options,
optional(:logs) => logs_options, optional(:logs) => logs_options,
@ -87,9 +83,7 @@ defmodule Explorer.Chain.Import do
optional(:addresses) => Import.Addresses.imported(), optional(:addresses) => Import.Addresses.imported(),
optional(:address_coin_balances) => Import.Address.CoinBalances.imported(), optional(:address_coin_balances) => Import.Address.CoinBalances.imported(),
optional(:blocks) => Import.Blocks.imported(), optional(:blocks) => Import.Blocks.imported(),
optional(:block_second_degree_relations) => [ optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.imported(),
%{required(:nephew_hash) => Hash.Full.t(), required(:uncle_hash) => Hash.Full.t()}
],
optional(:internal_transactions) => [ optional(:internal_transactions) => [
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
], ],
@ -113,7 +107,6 @@ defmodule Explorer.Chain.Import do
@transaction_timeout 120_000 @transaction_timeout 120_000
@insert_block_second_degree_relations_timeout 60_000
@insert_internal_transactions_timeout 60_000 @insert_internal_transactions_timeout 60_000
@insert_logs_timeout 60_000 @insert_logs_timeout 60_000
@insert_token_transfers_timeout 60_000 @insert_token_transfers_timeout 60_000
@ -122,7 +115,6 @@ defmodule Explorer.Chain.Import do
@insert_transactions_timeout 60_000 @insert_transactions_timeout 60_000
@insert_transaction_forks_timeout 60_000 @insert_transaction_forks_timeout 60_000
def block_second_degree_relations_timeout, do: @insert_block_second_degree_relations_timeout
def transactions_timeout, do: @insert_transactions_timeout def transactions_timeout, do: @insert_transactions_timeout
def transaction_forks_timeout, do: @insert_transaction_forks_timeout def transaction_forks_timeout, do: @insert_transaction_forks_timeout
@ -174,7 +166,8 @@ defmodule Explorer.Chain.Import do
* `:timeout` - the timeout for inserting all blocks. Defaults to `#{Import.Blocks.timeout()}` milliseconds. * `:timeout` - the timeout for inserting all blocks. Defaults to `#{Import.Blocks.timeout()}` milliseconds.
* `:block_second_degree_relations` * `:block_second_degree_relations`
* `:params` - `list` of params `for `Explorer.Chain.Block.SecondDegreeRelation.changeset/2`. * `:params` - `list` of params `for `Explorer.Chain.Block.SecondDegreeRelation.changeset/2`.
* `:timeout` - the timeout for inserting all uncles found in the params list. * `:timeout` - the timeout for inserting all uncles found in the params list. Defaults to
`#{Import.Block.SecondDegreeRelations.timeout()}` milliseconds.
* `:broadcast` - Boolean flag indicating whether or not to broadcast the event. * `:broadcast` - Boolean flag indicating whether or not to broadcast the event.
* `:internal_transactions` * `:internal_transactions`
* `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`.
@ -310,7 +303,7 @@ defmodule Explorer.Chain.Import do
|> Import.Addresses.run(ecto_schema_module_to_changes_list_map, full_options) |> Import.Addresses.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Address.CoinBalances.run(ecto_schema_module_to_changes_list_map, full_options) |> Import.Address.CoinBalances.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Blocks.run(ecto_schema_module_to_changes_list_map, full_options) |> Import.Blocks.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_block_second_degree_relations(ecto_schema_module_to_changes_list_map, full_options) |> Import.Block.SecondDegreeRelations.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_transactions(ecto_schema_module_to_changes_list_map, full_options) |> run_transactions(ecto_schema_module_to_changes_list_map, full_options)
|> run_transaction_forks(ecto_schema_module_to_changes_list_map, full_options) |> run_transaction_forks(ecto_schema_module_to_changes_list_map, full_options)
|> run_internal_transactions(ecto_schema_module_to_changes_list_map, full_options) |> run_internal_transactions(ecto_schema_module_to_changes_list_map, full_options)
@ -481,51 +474,6 @@ defmodule Explorer.Chain.Import do
end end
end end
defp run_block_second_degree_relations(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{Block.SecondDegreeRelation => block_second_degree_relations_changes} ->
Multi.run(multi, :block_second_degree_relations, fn _ ->
insert_block_second_degree_relations(
block_second_degree_relations_changes,
%{
timeout:
options[:block_second_degree_relations][:timeout] || @insert_block_second_degree_relations_timeout
}
)
end)
_ ->
multi
end
end
@spec insert_block_second_degree_relations([map()], %{required(:timeout) => timeout}) ::
{:ok, %{nephew_hash: Hash.Full.t(), uncle_hash: Hash.Full.t()}} | {:error, [Changeset.t()]}
defp insert_block_second_degree_relations(changes_list, %{timeout: timeout}) when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.nephew_hash, &1.uncle_hash})
insert_changes_list(ordered_changes_list,
conflict_target: [:nephew_hash, :uncle_hash],
on_conflict:
from(
block_second_degree_relation in Block.SecondDegreeRelation,
update: [
set: [
uncle_fetched_at:
fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at)
]
]
),
for: Block.SecondDegreeRelation,
returning: [:nephew_hash, :uncle_hash],
timeout: timeout,
# block_second_degree_relations doesn't have timestamps
timestamps: %{}
)
end
@spec insert_internal_transactions([map], %{required(:timeout) => timeout, required(:timestamps) => timestamps}) :: @spec insert_internal_transactions([map], %{required(:timeout) => timeout, required(:timestamps) => timestamps}) ::
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]}
| {:error, [Changeset.t()]} | {:error, [Changeset.t()]}

@ -0,0 +1,68 @@
defmodule Explorer.Chain.Import.Block.SecondDegreeRelations do
@moduledoc """
Bulk imports `t:Explorer.Chain.Block.SecondDegreeRelation.t/0`.
"""
require Ecto.Query
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{Block, Hash, Import}
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [
%{required(:nephew_hash) => Hash.Full.t(), required(:uncle_hash) => Hash.Full.t()}
]
def run(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{Block.SecondDegreeRelation => block_second_degree_relations_changes} ->
Multi.run(multi, :block_second_degree_relations, fn _ ->
insert(
block_second_degree_relations_changes,
%{
timeout: options[:block_second_degree_relations][:timeout] || @timeout
}
)
end)
_ ->
multi
end
end
def timeout, do: @timeout
@spec insert([map()], %{required(:timeout) => timeout}) ::
{:ok, %{nephew_hash: Hash.Full.t(), uncle_hash: Hash.Full.t()}} | {:error, [Changeset.t()]}
defp insert(changes_list, %{timeout: timeout}) when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.nephew_hash, &1.uncle_hash})
Import.insert_changes_list(ordered_changes_list,
conflict_target: [:nephew_hash, :uncle_hash],
on_conflict:
from(
block_second_degree_relation in Block.SecondDegreeRelation,
update: [
set: [
uncle_fetched_at:
fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at)
]
]
),
for: Block.SecondDegreeRelation,
returning: [:nephew_hash, :uncle_hash],
timeout: timeout,
# block_second_degree_relations doesn't have timestamps
timestamps: %{}
)
end
end

@ -56,7 +56,7 @@ defmodule Explorer.Chain.Import.Blocks do
blocks, blocks,
%{ %{
timeout: timeout:
options[:block_second_degree_relations][:timeout] || Import.block_second_degree_relations_timeout(), options[:block_second_degree_relations][:timeout] || Import.Block.SecondDegreeRelations.timeout(),
timestamps: timestamps timestamps: timestamps
} }
) )

@ -26,7 +26,7 @@ defmodule Indexer.Block.Fetcher do
addresses: Import.Addresses.options(), addresses: Import.Addresses.options(),
address_coin_balances: Import.Address.CoinBalances.options(), address_coin_balances: Import.Address.CoinBalances.options(),
blocks: Import.Blocks.options(), blocks: Import.Blocks.options(),
block_second_degree_relations: Import.block_second_degree_relations_options(), block_second_degree_relations: Import.Block.SecondDegreeRelations.options(),
broadcast: boolean, broadcast: boolean,
logs: Import.logs_options(), logs: Import.logs_options(),
receipts: Import.receipts_options(), receipts: Import.receipts_options(),

Loading…
Cancel
Save