Explorer.Chain.Import.Runner behaviour

pull/859/head
Luke Imhoff 6 years ago
parent 4ccfceb93a
commit 1918040eeb
  1. 413
      apps/explorer/lib/explorer/chain/import.ex
  2. 123
      apps/explorer/lib/explorer/chain/import/address/coin_balances.ex
  3. 128
      apps/explorer/lib/explorer/chain/import/address/token_balances.ex
  4. 137
      apps/explorer/lib/explorer/chain/import/addresses.ex
  5. 71
      apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex
  6. 130
      apps/explorer/lib/explorer/chain/import/blocks.ex
  7. 71
      apps/explorer/lib/explorer/chain/import/internal_transactions.ex
  8. 55
      apps/explorer/lib/explorer/chain/import/logs.ex
  9. 37
      apps/explorer/lib/explorer/chain/import/runner.ex
  10. 55
      apps/explorer/lib/explorer/chain/import/token_transfers.ex
  11. 85
      apps/explorer/lib/explorer/chain/import/tokens.ex
  12. 74
      apps/explorer/lib/explorer/chain/import/transaction/forks.ex
  13. 54
      apps/explorer/lib/explorer/chain/import/transactions.ex
  14. 2
      apps/explorer/lib/explorer/chain/transaction.ex
  15. 19
      apps/indexer/lib/indexer/block/fetcher.ex
  16. 3
      apps/indexer/lib/indexer/block/uncle/fetcher.ex

@ -4,55 +4,62 @@ defmodule Explorer.Chain.Import do
"""
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.Import
alias Explorer.Repo
alias Explorer.Chain.{
Address,
Address.CoinBalance,
Address.TokenBalance,
Block,
Import,
InternalTransaction,
Log,
Token,
TokenTransfer,
Transaction
}
# in order so that foreign keys are inserted before being referenced
@runners [
Import.Addresses,
Import.Address.CoinBalances,
Import.Blocks,
Import.Block.SecondDegreeRelations,
Import.Transactions,
Import.Transaction.Forks,
Import.InternalTransactions,
Import.Logs,
Import.Tokens,
Import.TokenTransfers,
Import.Address.TokenBalances
]
quoted_runner_option_value =
quote do
Import.Runner.options()
end
alias Explorer.Repo
quoted_runner_options =
for runner <- @runners do
quoted_key =
quote do
optional(unquote(runner.option_key()))
end
{quoted_key, quoted_runner_option_value}
end
@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all
@type params :: [map()]
@type all_options :: %{
optional(:addresses) => Import.Addresses.options(),
optional(:address_coin_balances) => Import.Address.CoinBalances.options(),
optional(:address_token_balances) => Import.Address.TokenBalances.options(),
optional(:blocks) => Import.Blocks.options(),
optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.options(),
optional(:broadcast) => boolean,
optional(:internal_transactions) => Import.InternalTransactions.options(),
optional(:logs) => Import.Logs.options(),
optional(:timeout) => timeout,
optional(:token_transfers) => Import.TokenTransfers.options(),
optional(:tokens) => Import.Tokens.options(),
optional(:transactions) => Import.Transactions.options(),
optional(:transaction_forks) => Import.Transaction.Forks.options()
unquote_splicing(quoted_runner_options)
}
quoted_runner_imported =
for runner <- @runners do
quoted_key =
quote do
optional(unquote(runner.option_key()))
end
quoted_value =
quote do
unquote(runner).imported()
end
{quoted_key, quoted_value}
end
@type all_result ::
{:ok,
%{
optional(:addresses) => Import.Addresses.imported(),
optional(:address_coin_balances) => Import.Address.CoinBalances.imported(),
optional(:address_token_balances) => Import.Address.TokenBalances.imported(),
optional(:blocks) => Import.Blocks.imported(),
optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.imported(),
optional(:internal_transactions) => Import.InternalTransactions.imported(),
optional(:logs) => Import.Logs.imported(),
optional(:token_transfers) => Import.TokenTransfers.imported(),
optional(:tokens) => Import.Tokens.imported(),
optional(:transactions) => Import.Transactions.imported(),
optional(:transaction_forks) => Import.Transaction.Forks.imported()
}}
{:ok, %{unquote_splicing(quoted_runner_imported)}}
| {:error, [Changeset.t()]}
| {:error, step :: Ecto.Multi.name(), failed_value :: any(),
changes_so_far :: %{optional(Ecto.Multi.name()) => any()}}
@ -62,23 +69,37 @@ defmodule Explorer.Chain.Import do
# milliseconds
@transaction_timeout 120_000
@imported_table_rows @runners
|> Stream.map(&Map.put(&1.imported_table_row(), :key, &1.option_key()))
|> Enum.map_join("\n", fn %{
key: key,
value_type: value_type,
value_description: value_description
} ->
"| `#{inspect(key)}` | `#{value_type}` | #{value_description} |"
end)
@runner_options_doc Enum.map_join(@runners, fn runner ->
ecto_schema_module = runner.ecto_schema_module()
"""
* `#{runner.option_key() |> inspect()}`
* `:on_conflict` - what to do if a conflict occurs with a pre-existing row: `:nothing`, `:replace_all`, or an
`t:Ecto.Query.t/0` to update specific columns.
* `:params` - `list` of params for changeset function in `#{ecto_schema_module}`.
* `:with` - changeset function to use in `#{ecto_schema_module}`. Default to `:changeset`.
* `:timeout` - the timeout for inserting each batch of changes from `:params`.
Defaults to `#{runner.timeout()}` milliseconds.
"""
end)
@doc """
Bulk insert all data stored in the `Explorer`.
The import returns the unique key(s) for each type of record inserted.
| Key | Value Type | Value Description |
|----------------------------------|-------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|
| `:addresses` | `[Explorer.Chain.Address.t()]` | List of `t:Explorer.Chain.Address.t/0`s |
| `:address_coin_balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of maps of the `t:Explorer.Chain.Address.CoinBalance.t/0` `address_hash` and `block_number` |
| `:blocks` | `[Explorer.Chain.Block.t()]` | List of `t:Explorer.Chain.Block.t/0`s |
| `:internal_transactions` | `[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash` |
| `:logs` | `[Explorer.Chain.Log.t()]` | List of `t:Explorer.Chain.Log.t/0`s |
| `:token_transfers` | `[Explorer.Chain.TokenTransfer.t()]` | List of `t:Explorer.Chain.TokenTransfer.t/0`s |
| `:tokens` | `[Explorer.Chain.Token.t()]` | List of `t:Explorer.Chain.token.t/0`s |
| `:transactions` | `[Explorer.Chain.Hash.t()]` | List of `t:Explorer.Chain.Transaction.t/0` `hash` |
| `:transaction_forks` | `[%{uncle_hash: Explorer.Chain.Hash.t(), hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.Transaction.Fork.t/0` `uncle_hash` and `hash` |
| `:block_second_degree_relations` | `[%{uncle_hash: Explorer.Chain.Hash.t(), nephew_hash: Explorer.Chain.Hash.t()]` | List of maps of the `t:Explorer.Chain.Block.SecondDegreeRelation.t/0` `uncle_hash` and `nephew_hash` |
| Key | Value Type | Value Description |
|-----|------------|-------------------|
#{@imported_table_rows}
The params for each key are validated using the corresponding `Ecto.Schema` module's `changeset/2` function. If there
are errors, they are returned in `Ecto.Changeset.t`s, so that the original, invalid value can be reconstructed for any
@ -97,68 +118,17 @@ defmodule Explorer.Chain.Import do
## Options
* `:addresses`
* `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`.
* `:timeout` - the timeout for inserting all addresses. Defaults to `#{Import.Addresses.timeout()}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`.
* `:address_coin_balances`
* `:params` - `list` of params for `Explorer.Chain.Address.CoinBalance.changeset/2`.
* `:timeout` - the timeout for inserting all balances. Defaults to `#{Import.Address.CoinBalances.timeout()}`
milliseconds.
* `:address_token_balances`
* `:params` - `list` of params for `Explorer.Chain.TokenBalance.changeset/2`
* `:blocks`
* `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`.
* `:timeout` - the timeout for inserting all blocks. Defaults to `#{Import.Blocks.timeout()}` milliseconds.
* `:block_second_degree_relations`
* `:params` - `list` of params `for `Explorer.Chain.Block.SecondDegreeRelation.changeset/2`.
* `:timeout` - the timeout for inserting all uncles found in the params list. Defaults to
`#{Import.Block.SecondDegreeRelations.timeout()}` milliseconds.
* `:broadcast` - Boolean flag indicating whether or not to broadcast the event.
* `:internal_transactions`
* `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`.
* `:timeout` - the timeout for inserting all internal transactions. Defaults to
`#{Import.InternalTransactions.timeout()}` milliseconds.
* `:logs`
* `:params` - `list` of params for `Explorer.Chain.Log.changeset/2`.
* `:timeout` - the timeout for inserting all logs. Defaults to `#{Import.Logs.timeout()}` milliseconds.
* `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}`
milliseconds.
* `:token_transfers`
* `:params` - `list` of params for `Explorer.Chain.TokenTransfer.changeset/2`
* `:timeout` - the timeout for inserting all token transfers. Defaults to `#{Import.TokenTransfers.timeout()}`
milliseconds.
* `:tokens`
* `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing token
with the same contract address hash.
* `:params` - `list` of params for `Explorer.Chain.Token.changeset/2`
* `:timeout` - the timeout for inserting all tokens. Defaults to `#{Import.Tokens.timeout()}` milliseconds.
* `:transactions`
* `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing transaction
with the same hash.
*NOTE*: Because the repository transaction for a pending `Explorer.Chain.Transaction`s could `COMMIT` after the
repository transaction for that same transaction being collated into a block, writers, it is recommended to use
`:nothing` for pending transactions and `:replace_all` for collated transactions, so that collated transactions
win.
* `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`.
* `:timeout` - the timeout for inserting all transactions found in the params lists across all
types. Defaults to `#{Import.Transactions.timeout()}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`.
* `:transaction_forks`
* `:params` - `list` of params for `Explorer.Chain.Transaction.Fork.changeset/2`.
* `:timeout` - the timeout for inserting all transaction forks. Defaults to
`#{Import.Transaction.Forks.timeout()}` milliseconds.
* `:timeout` - the timeout for `Repo.transaction`. Defaults to `#{@transaction_timeout}` milliseconds.
#{@runner_options_doc}
"""
@spec all(all_options()) :: all_result()
def all(options) when is_map(options) do
changes_list_arguments_list = import_options_to_changes_list_arguments_list(options)
with {:ok, ecto_schema_module_to_changes_list_map} <-
changes_list_arguments_list_to_ecto_schema_module_to_changes_list_map(changes_list_arguments_list),
{:ok, data} <- insert_ecto_schema_module_to_changes_list_map(ecto_schema_module_to_changes_list_map, options) do
with {:ok, runner_options_pairs} <- validate_options(options),
{:ok, valid_runner_option_pairs} <- validate_runner_options_pairs(runner_options_pairs),
{:ok, runner_changes_list_pairs} <- runner_changes_list_pairs(valid_runner_option_pairs),
{:ok, data} <- insert_runner_changes_list_pairs(runner_changes_list_pairs, options) do
if Map.get(options, :broadcast, false), do: broadcast_events(data)
{:ok, data}
end
@ -179,84 +149,147 @@ defmodule Explorer.Chain.Import do
end)
end
defp changes_list_arguments_list_to_ecto_schema_module_to_changes_list_map(changes_list_arguments_list) do
changes_list_arguments_list
|> Stream.map(fn [params_list, options] ->
ecto_schema_module = Keyword.fetch!(options, :for)
{ecto_schema_module, changes_list(params_list, options)}
end)
|> Enum.reduce({:ok, %{}}, fn
{ecto_schema_module, {:ok, changes_list}}, {:ok, ecto_schema_module_to_changes_list_map} ->
{:ok, Map.put(ecto_schema_module_to_changes_list_map, ecto_schema_module, changes_list)}
defp runner_changes_list_pairs(runner_options_pairs) when is_list(runner_options_pairs) do
{status, reversed} =
runner_options_pairs
|> Stream.map(fn {runner, options} -> runner_changes_list(runner, options) end)
|> Enum.reduce({:ok, []}, fn
{:ok, runner_changes_pair}, {:ok, acc_runner_changes_pairs} ->
{:ok, [runner_changes_pair | acc_runner_changes_pairs]}
{_, {:ok, _}}, {:error, _} = error ->
error
{:ok, _}, {:error, _} = error ->
error
{_, {:error, _} = error}, {:ok, _} ->
error
{:error, _} = error, {:ok, _} ->
error
{_, {:error, changesets}}, {:error, acc_changesets} ->
{:error, acc_changesets ++ changesets}
end)
{:error, runner_changesets}, {:error, acc_changesets} ->
{:error, acc_changesets ++ runner_changesets}
end)
{status, Enum.reverse(reversed)}
end
@spec changes_list(params :: [map], [{:for, module} | {:with, atom}]) :: {:ok, [map]} | {:error, [Changeset.t()]}
defp changes_list(params, options) when is_list(options) do
ecto_schema_module = Keyword.fetch!(options, :for)
changeset_function_name = Keyword.get(options, :with, :changeset)
defp runner_changes_list(runner, %{params: params} = options) do
ecto_schema_module = runner.ecto_schema_module()
changeset_function_name = Map.get(options, :with, :changeset)
struct = ecto_schema_module.__struct__()
{status, acc} =
params
|> Stream.map(&apply(ecto_schema_module, changeset_function_name, [struct, &1]))
|> Enum.reduce({:ok, []}, fn
changeset = %Changeset{valid?: false}, {:ok, _} ->
{:error, [changeset]}
params
|> Stream.map(&apply(ecto_schema_module, changeset_function_name, [struct, &1]))
|> Enum.reduce({:ok, []}, fn
changeset = %Changeset{valid?: false}, {:ok, _} ->
{:error, [changeset]}
changeset = %Changeset{valid?: false}, {:error, acc_changesets} ->
{:error, [changeset | acc_changesets]}
%Changeset{changes: changes, valid?: true}, {:ok, acc_changes} ->
{:ok, [changes | acc_changes]}
%Changeset{valid?: true}, {:error, _} = error ->
error
end)
|> case do
{:ok, changes} -> {:ok, {runner, changes}}
{:error, _} = error -> error
end
end
@global_options ~w(broadcast timeout)a
defp validate_options(options) when is_map(options) do
local_options = Map.drop(options, @global_options)
{reverse_runner_options_pairs, unknown_options} =
Enum.reduce(@runners, {[], local_options}, fn runner, {acc_runner_options_pairs, unknown_options} = acc ->
option_key = runner.option_key()
changeset = %Changeset{valid?: false}, {:error, acc_changesets} ->
{:error, [changeset | acc_changesets]}
case local_options do
%{^option_key => option_value} ->
{[{runner, option_value} | acc_runner_options_pairs], Map.delete(unknown_options, option_key)}
%Changeset{changes: changes, valid?: true}, {:ok, acc_changes} ->
{:ok, [changes | acc_changes]}
_ ->
acc
end
end)
%Changeset{valid?: true}, {:error, _} = error ->
case Enum.empty?(unknown_options) do
true -> {:ok, Enum.reverse(reverse_runner_options_pairs)}
false -> {:error, {:unknown_options, unknown_options}}
end
end
defp validate_runner_options_pairs(runner_options_pairs) when is_list(runner_options_pairs) do
{status, reversed} =
runner_options_pairs
|> Stream.map(fn {runner, options} -> validate_runner_options(runner, options) end)
|> Enum.reduce({:ok, []}, fn
:ignore, acc ->
acc
{:ok, valid_runner_option_pair}, {:ok, valid_runner_options_pairs} ->
{:ok, [valid_runner_option_pair | valid_runner_options_pairs]}
{:ok, _}, {:error, _} = error ->
error
{:error, reason}, {:ok, _} ->
{:error, [reason]}
{:error, reason}, {:error, reasons} ->
{:error, [reason | reasons]}
end)
{status, Enum.reverse(acc)}
{status, Enum.reverse(reversed)}
end
defp validate_runner_options(runner, options) when is_map(options) do
option_key = runner.option_key()
case {validate_runner_option_params_required(option_key, options),
validate_runner_options_known(option_key, options)} do
{:ignore, :ok} -> :ignore
{:ignore, {:error, _} = error} -> error
{:ok, :ok} -> {:ok, {runner, options}}
{:ok, {:error, _} = error} -> error
{{:error, reason}, :ok} -> {:error, [reason]}
{{:error, reason}, {:error, reasons}} -> {:error, [reason | reasons]}
end
end
defp validate_runner_option_params_required(_, %{params: params}) do
case Enum.empty?(params) do
false -> :ok
true -> :ignore
end
end
defp validate_runner_option_params_required(runner_option_key, _),
do: {:error, {:required, [runner_option_key, :params]}}
@local_options ~w(on_conflict params with timeout)a
defp validate_runner_options_known(runner_option_key, options) do
unknown_option_keys = Map.keys(options) -- @local_options
if Enum.empty?(unknown_option_keys) do
:ok
else
reasons = Enum.map(unknown_option_keys, &{:unknown, [runner_option_key, &1]})
{:error, reasons}
end
end
@import_option_key_to_ecto_schema_module %{
addresses: Address,
address_coin_balances: CoinBalance,
address_token_balances: TokenBalance,
blocks: Block,
block_second_degree_relations: Block.SecondDegreeRelation,
internal_transactions: InternalTransaction,
logs: Log,
token_transfers: TokenTransfer,
tokens: Token,
transactions: Transaction,
transaction_forks: Transaction.Fork
}
defp ecto_schema_module_to_changes_list_map_to_multi(ecto_schema_module_to_changes_list_map, options)
when is_map(options) do
defp runner_changes_list_pairs_to_multi(runner_changes_list_pairs, options)
when is_list(runner_changes_list_pairs) and is_map(options) do
timestamps = timestamps()
full_options = Map.put(options, :timestamps, timestamps)
Multi.new()
|> Import.Addresses.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Address.CoinBalances.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Blocks.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Block.SecondDegreeRelations.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Transactions.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Transaction.Forks.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.InternalTransactions.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Logs.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Tokens.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.TokenTransfers.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Address.TokenBalances.run(ecto_schema_module_to_changes_list_map, full_options)
Enum.reduce(runner_changes_list_pairs, Multi.new(), fn {runner, changes_list}, acc ->
runner.run(acc, changes_list, full_options)
end)
end
def insert_changes_list(changes_list, options) when is_list(changes_list) do
@ -282,49 +315,13 @@ defmodule Explorer.Chain.Import do
Map.merge(changes, timestamps)
end
defp import_options_to_changes_list_arguments_list(options) do
Enum.flat_map(
@import_option_key_to_ecto_schema_module,
&import_options_to_changes_list_arguments_list_flat_mapper(options, &1)
)
end
defp import_options_to_changes_list_arguments_list_flat_mapper(options, {option_key, ecto_schema_module}) do
case Map.fetch(options, option_key) do
{:ok, option_value} ->
import_option_to_changes_list_arguments_list_flat_mapper(option_value, ecto_schema_module)
:error ->
[]
end
end
defp import_option_to_changes_list_arguments_list_flat_mapper(%{params: params} = option_value, ecto_schema_module) do
# Use `Enum.empty?` instead of `[_ | _]` as params are allowed to be any collection of maps
case Enum.empty?(params) do
false ->
[
[
params,
[for: ecto_schema_module, with: Map.get(option_value, :with, :changeset)]
]
]
# filter out empty params as early as possible, so that later stages don't need to deal with empty params
# leading to selecting all rows because they produce no where conditions as happened in
# https://github.com/poanetwork/blockscout/issues/850
true ->
[]
end
end
defp import_transaction(multi, options) when is_map(options) do
Repo.transaction(multi, timeout: Map.get(options, :timeout, @transaction_timeout))
end
defp insert_ecto_schema_module_to_changes_list_map(ecto_schema_module_to_changes_list_map, options) do
ecto_schema_module_to_changes_list_map
|> ecto_schema_module_to_changes_list_map_to_multi(options)
defp insert_runner_changes_list_pairs(runner_changes_list_pairs, options) do
runner_changes_list_pairs
|> runner_changes_list_pairs_to_multi(options)
|> import_transaction(options)
end

@ -11,38 +11,40 @@ defmodule Explorer.Chain.Import.Address.CoinBalances do
alias Explorer.Chain.Address.CoinBalance
alias Explorer.Chain.{Block, Hash, Import, Wei}
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}
]
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{CoinBalance => balances_changes} ->
timestamps = Map.fetch!(options, :timestamps)
@impl Import.Runner
def ecto_schema_module, do: CoinBalance
Multi.run(multi, :address_coin_balances, fn _ ->
insert(
balances_changes,
%{
timeout: options[:address_coin_balances][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
@impl Import.Runner
def option_key, do: :address_coin_balances
@impl Import.Runner
def imported_table_row do
%{
value_type: "[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]",
value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `address_hash` and `block_number`"
}
end
_ ->
multi
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
timestamps = Map.fetch!(options, :timestamps)
timeout = options[option_key()][:timeout] || @timeout
Multi.run(multi, :address_coin_balances, fn _ ->
insert(changes_list, %{timeout: timeout, timestamps: timestamps})
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert(
@ -60,7 +62,9 @@ defmodule Explorer.Chain.Import.Address.CoinBalances do
) ::
{:ok, [%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}]}
| {:error, [Changeset.t()]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number})
@ -68,42 +72,7 @@ defmodule Explorer.Chain.Import.Address.CoinBalances do
Import.insert_changes_list(
ordered_changes_list,
conflict_target: [:address_hash, :block_number],
on_conflict:
from(
balance in CoinBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
balance.value_fetched_at,
balance.value_fetched_at,
balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
balance.value_fetched_at,
balance.value_fetched_at,
balance.value_fetched_at
)
]
]
),
on_conflict: on_conflict,
for: CoinBalance,
timeout: timeout,
timestamps: timestamps
@ -111,4 +80,42 @@ defmodule Explorer.Chain.Import.Address.CoinBalances do
{:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash block_number)a))}
end
def default_on_conflict do
from(
balance in CoinBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
balance.value_fetched_at,
balance.value_fetched_at,
balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
balance.value_fetched_at,
balance.value_fetched_at,
balance.value_fetched_at
)
]
]
)
end
end

@ -11,36 +11,38 @@ defmodule Explorer.Chain.Import.Address.TokenBalances do
alias Explorer.Chain.Address.TokenBalance
alias Explorer.Chain.Import
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [TokenBalance.t()]
def run(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{TokenBalance => token_balances_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :address_token_balances, fn _ ->
insert(
token_balances_changes,
%{
timeout: options[:address_token_balances][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
@impl Import.Runner
def ecto_schema_module, do: TokenBalance
@impl Import.Runner
def option_key, do: :address_token_balances
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
timestamps = Map.fetch!(options, :timestamps)
timeout = options[option_key()][:timeout] || @timeout
Multi.run(multi, :address_token_balances, fn _ ->
insert(changes_list, %{timeout: timeout, timestamps: timestamps})
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert([map()], %{
@ -49,8 +51,9 @@ defmodule Explorer.Chain.Import.Address.TokenBalances do
}) ::
{:ok, [TokenBalance.t()]}
| {:error, [Changeset.t()]}
def insert(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
def insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number})
@ -58,46 +61,49 @@ defmodule Explorer.Chain.Import.Address.TokenBalances do
Import.insert_changes_list(
ordered_changes_list,
conflict_target: ~w(address_hash token_contract_address_hash block_number)a,
on_conflict:
from(
token_balance in TokenBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value_fetched_at
)
]
]
),
on_conflict: on_conflict,
for: TokenBalance,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
defp default_on_conflict do
from(
token_balance in TokenBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value_fetched_at
)
]
]
)
end
end

@ -10,37 +10,38 @@ defmodule Explorer.Chain.Import.Addresses do
import Ecto.Query, only: [from: 2]
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type imported :: [Address.t()]
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout,
optional(:with) => Import.changeset_function_name()
}
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Address => addresses_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :addresses, fn _ ->
insert(
addresses_changes,
%{
timeout: options[:addresses][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
@impl Import.Runner
def ecto_schema_module, do: Address
@impl Import.Runner
def option_key, do: :addresses
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
timestamps = Map.fetch!(options, :timestamps)
timeout = options[:addresses][:timeout] || @timeout
Multi.run(multi, :addresses, fn _ ->
insert(changes_list, %{timeout: timeout, timestamps: timestamps})
end)
end
@impl Import.Runner
def timeout, do: @timeout
## Private Functions
@ -48,53 +49,17 @@ defmodule Explorer.Chain.Import.Addresses do
@spec insert([%{hash: Hash.Address.t()}], %{
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) :: {:ok, [Hash.Address.t()]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do
}) :: {:ok, [Address.t()]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = sort_changes_list(changes_list)
Import.insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict:
from(
address in Address,
update: [
set: [
contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code),
# ARGMAX on two columns
fetched_coin_balance:
fragment(
"""
CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_coin_balance_block_number >= ?) THEN
EXCLUDED.fetched_coin_balance
ELSE ?
END
""",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number,
address.fetched_coin_balance
),
# MAX on two columns
fetched_coin_balance_block_number:
fragment(
"""
CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_coin_balance_block_number >= ?) THEN
EXCLUDED.fetched_coin_balance_block_number
ELSE ?
END
""",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number
)
]
]
),
on_conflict: on_conflict,
for: Address,
returning: true,
timeout: timeout,
@ -102,6 +67,46 @@ defmodule Explorer.Chain.Import.Addresses do
)
end
defp default_on_conflict do
from(address in Address,
update: [
set: [
contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code),
# ARGMAX on two columns
fetched_coin_balance:
fragment(
"""
CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_coin_balance_block_number >= ?) THEN
EXCLUDED.fetched_coin_balance
ELSE ?
END
""",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number,
address.fetched_coin_balance
),
# MAX on two columns
fetched_coin_balance_block_number:
fragment(
"""
CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_coin_balance_block_number >= ?) THEN
EXCLUDED.fetched_coin_balance_block_number
ELSE ?
END
""",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number
)
]
]
)
end
defp sort_changes_list(changes_list) do
Enum.sort_by(changes_list, & &1.hash)
end

@ -10,54 +10,51 @@ defmodule Explorer.Chain.Import.Block.SecondDegreeRelations do
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{Block, Hash, Import}
@behaviour Import.Runner
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [
%{required(:nephew_hash) => Hash.Full.t(), required(:uncle_hash) => Hash.Full.t()}
]
def run(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{Block.SecondDegreeRelation => block_second_degree_relations_changes} ->
Multi.run(multi, :block_second_degree_relations, fn _ ->
insert(
block_second_degree_relations_changes,
%{
timeout: options[:block_second_degree_relations][:timeout] || @timeout
}
)
end)
_ ->
multi
end
@impl Import.Runner
def ecto_schema_module, do: Block.SecondDegreeRelation
@impl Import.Runner
def option_key, do: :block_second_degree_relations
@impl Import.Runner
def imported_table_row do
%{
value_type: "[%{uncle_hash: Explorer.Chain.Hash.t(), nephew_hash: Explorer.Chain.Hash.t()]",
value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `uncle_hash` and `nephew_hash`"
}
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
timeout = options[:block_second_degree_relations][:timeout] || @timeout
Multi.run(multi, :block_second_degree_relations, fn _ ->
insert(changes_list, %{timeout: timeout})
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert([map()], %{required(:timeout) => timeout}) ::
{:ok, %{nephew_hash: Hash.Full.t(), uncle_hash: Hash.Full.t()}} | {:error, [Changeset.t()]}
defp insert(changes_list, %{timeout: timeout}) when is_list(changes_list) do
defp insert(changes_list, %{timeout: timeout} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.nephew_hash, &1.uncle_hash})
Import.insert_changes_list(ordered_changes_list,
conflict_target: [:nephew_hash, :uncle_hash],
on_conflict:
from(
block_second_degree_relation in Block.SecondDegreeRelation,
update: [
set: [
uncle_fetched_at:
fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at)
]
]
),
on_conflict: on_conflict,
for: Block.SecondDegreeRelation,
returning: [:nephew_hash, :uncle_hash],
timeout: timeout,
@ -65,4 +62,16 @@ defmodule Explorer.Chain.Import.Block.SecondDegreeRelations do
timestamps: %{}
)
end
defp default_on_conflict do
from(
block_second_degree_relation in Block.SecondDegreeRelation,
update: [
set: [
uncle_fetched_at:
fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at)
]
]
)
end
end

@ -12,61 +12,69 @@ defmodule Explorer.Chain.Import.Blocks do
alias Explorer.Chain.{Block, Import, Transaction}
alias Explorer.Repo
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [Block.t()]
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Block => blocks_changes} ->
timestamps = Map.fetch!(options, :timestamps)
blocks_timeout = options[:blocks][:timeout] || @timeout
where_forked = where_forked(blocks_changes)
multi
|> Multi.run(:derive_transaction_forks, fn _ ->
derive_transaction_forks(%{
timeout: options[:transaction_forks][:timeout] || Import.Transaction.Forks.timeout(),
timestamps: timestamps,
where_forked: where_forked
})
end)
# MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table
|> Multi.run(:fork_transactions, fn _ ->
fork_transactions(%{
timeout: options[:transactions][:timeout] || Import.Transactions.timeout(),
timestamps: timestamps,
where_forked: where_forked
})
end)
|> Multi.run(:lose_consenus, fn _ ->
lose_consensus(blocks_changes, %{timeout: blocks_timeout, timestamps: timestamps})
end)
|> Multi.run(:blocks, fn _ ->
insert(blocks_changes, %{timeout: blocks_timeout, timestamps: timestamps})
end)
|> Multi.run(:uncle_fetched_block_second_degree_relations, fn %{blocks: blocks} when is_list(blocks) ->
update_block_second_degree_relations(
blocks,
%{
timeout:
options[:block_second_degree_relations][:timeout] || Import.Block.SecondDegreeRelations.timeout(),
timestamps: timestamps
}
)
end)
_ ->
multi
end
@impl Import.Runner
def ecto_schema_module, do: Block
@impl Import.Runner
def option_key, do: :blocks
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
timestamps = Map.fetch!(options, :timestamps)
blocks_timeout = options[option_key()][:timeout] || @timeout
where_forked = where_forked(changes_list)
multi
|> Multi.run(:derive_transaction_forks, fn _ ->
derive_transaction_forks(%{
timeout: options[Import.Transaction.Forks.option_key()][:timeout] || Import.Transaction.Forks.timeout(),
timestamps: timestamps,
where_forked: where_forked
})
end)
# MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table
|> Multi.run(:fork_transactions, fn _ ->
fork_transactions(%{
timeout: options[Import.Transactions.option_key()][:timeout] || Import.Transactions.timeout(),
timestamps: timestamps,
where_forked: where_forked
})
end)
|> Multi.run(:lose_consenus, fn _ ->
lose_consensus(changes_list, %{timeout: blocks_timeout, timestamps: timestamps})
end)
|> Multi.run(:blocks, fn _ ->
insert(changes_list, %{timeout: blocks_timeout, timestamps: timestamps})
end)
|> Multi.run(:uncle_fetched_block_second_degree_relations, fn %{blocks: blocks} when is_list(blocks) ->
update_block_second_degree_relations(
blocks,
%{
timeout:
options[Import.Block.SecondDegreeRelations.option_key()][:timeout] ||
Import.Block.SecondDegreeRelations.timeout(),
timestamps: timestamps
}
)
end)
end
@impl Import.Runner
def timeout, do: @timeout
# sobelow_skip ["SQL.Query"]
@ -135,23 +143,21 @@ defmodule Explorer.Chain.Import.Blocks do
@spec insert([map()], %{required(:timeout) => timeout, required(:timestamps) => Import.timestamps()}) ::
{:ok, [Block.t()]} | {:error, [Changeset.t()]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get(options, :on_conflict, :replace_all)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.number, &1.hash})
{:ok, blocks} =
Import.insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict: :replace_all,
for: Block,
returning: true,
timeout: timeout,
timestamps: timestamps
)
{:ok, blocks}
Import.insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict: on_conflict,
for: Block,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
defp lose_consensus(blocks_changes, %{timeout: timeout, timestamps: %{updated_at: updated_at}})

@ -11,56 +11,55 @@ defmodule Explorer.Chain.Import.InternalTransactions do
import Ecto.Query, only: [from: 2]
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
]
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{InternalTransaction => internal_transactions_changes} ->
timestamps = Map.fetch!(options, :timestamps)
multi
|> Multi.run(:internal_transactions, fn _ ->
insert(
internal_transactions_changes,
%{
timeout: options[:internal_transactions][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
|> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions}
when is_list(internal_transactions) ->
update_transactions(
internal_transactions,
%{
timeout: options[:transactions][:timeout] || Import.Transactions.timeout(),
timestamps: timestamps
}
)
end)
_ ->
multi
end
@impl Import.Runner
def ecto_schema_module, do: InternalTransaction
@impl Import.Runner
def option_key, do: :internal_transactions
@impl Import.Runner
def imported_table_row do
%{
value_type: "[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]",
value_description: "List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash`"
}
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
timestamps = Map.fetch!(options, :timestamps)
internal_transactions_timeout = options[option_key()][:timeout] || @timeout
transactions_timeout = options[Import.Transactions.option_key()][:timeout] || Import.Transactions.timeout()
multi
|> Multi.run(:internal_transactions, fn _ ->
insert(changes_list, %{timeout: internal_transactions_timeout, timestamps: timestamps})
end)
|> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions}
when is_list(internal_transactions) ->
update_transactions(internal_transactions, %{timeout: transactions_timeout, timestamps: timestamps})
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert([map], %{required(:timeout) => timeout, required(:timestamps) => Import.timestamps()}) ::
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]}
| {:error, [Changeset.t()]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps})
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options)
when is_list(changes_list) do
on_conflict = Map.get(options, :on_conflict, :replace_all)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
@ -69,7 +68,7 @@ defmodule Explorer.Chain.Import.InternalTransactions do
ordered_changes_list,
conflict_target: [:transaction_hash, :index],
for: InternalTransaction,
on_conflict: :replace_all,
on_conflict: on_conflict,
returning: [:id, :index, :transaction_hash],
timeout: timeout,
timestamps: timestamps

@ -8,43 +8,46 @@ defmodule Explorer.Chain.Import.Logs do
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{Import, Log}
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [Log.t()]
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Log => logs_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :logs, fn _ ->
insert(
logs_changes,
%{
timeout: options[:logs][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
@impl Import.Runner
def ecto_schema_module, do: Log
@impl Import.Runner
def option_key, do: :logs
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
timestamps = Map.fetch!(options, :timestamps)
timeout = options[option_key()][:timeout] || @timeout
Multi.run(multi, :logs, fn _ ->
insert(changes_list, %{timeout: timeout, timestamps: timestamps})
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert([map()], %{required(:timeout) => timeout, required(:timestamps) => Import.timestamps()}) ::
{:ok, [Log.t()]}
| {:error, [Changeset.t()]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get(options, :on_conflict, :replace_all)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
@ -52,7 +55,7 @@ defmodule Explorer.Chain.Import.Logs do
Import.insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :index],
on_conflict: :replace_all,
on_conflict: on_conflict,
for: Log,
returning: true,
timeout: timeout,

@ -0,0 +1,37 @@
defmodule Explorer.Chain.Import.Runner do
@moduledoc """
Behaviour used by `Explorer.Chain.Import.all/1` to import data into separate tables.
"""
alias Ecto.Multi
@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all | Ecto.Query.t()
@typedoc """
Runner-specific options under `c:option_key/0` in all options passed to `c:run/3`.
"""
@type options :: %{
required(:params) => [map()],
optional(:on_conflict) => on_conflict(),
optional(:timeout) => timeout,
optional(:with) => changeset_function_name()
}
@doc """
Key in `t:all_options` used by this `Explorer.Chain.Import` behaviour implementation.
"""
@callback option_key() :: atom()
@doc """
Row of markdown table explaining format of `imported` from the module for use in `all/1` docs.
"""
@callback imported_table_row() :: %{value_type: String.t(), value_description: String.t()}
@doc """
The `Ecto.Schema` module that contains the `:changeset` function for validating `options[options_key][:params]`.
"""
@callback ecto_schema_module() :: module()
@callback run(Multi.t(), changes_list :: [%{optional(atom()) => term()}], %{optional(atom()) => term()}) :: Multi.t()
@callback timeout() :: timeout()
end

@ -8,43 +8,46 @@ defmodule Explorer.Chain.Import.TokenTransfers do
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{Import, TokenTransfer}
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [TokenTransfer.t()]
def run(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{TokenTransfer => token_transfers_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :token_transfers, fn _ ->
insert(
token_transfers_changes,
%{
timeout: options[:token_transfers][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
@impl Import.Runner
def ecto_schema_module, do: TokenTransfer
@impl Import.Runner
def option_key, do: :token_transfers
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
timestamps = Map.fetch!(options, :timestamps)
timeout = options[option_key()][:timeout] || @timeout
Multi.run(multi, :token_transfers, fn _ ->
insert(changes_list, %{timeout: timeout, timestamps: timestamps})
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert([map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) ::
{:ok, [TokenTransfer.t()]}
| {:error, [Changeset.t()]}
def insert(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
def insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get(options, :on_conflict, :replace_all)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.log_index})
@ -52,7 +55,7 @@ defmodule Explorer.Chain.Import.TokenTransfers do
Import.insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :log_index],
on_conflict: :replace_all,
on_conflict: on_conflict,
for: TokenTransfer,
returning: true,
timeout: timeout,

@ -5,64 +5,69 @@ defmodule Explorer.Chain.Import.Tokens do
require Ecto.Query
alias Ecto.{Changeset, Multi}
alias Ecto.Multi
alias Explorer.Chain.{Import, Token}
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:on_conflict) => :nothing | :replace_all,
optional(:timeout) => timeout
}
@type imported :: [Token.t()]
def run(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{Token => tokens_changes} ->
%{timestamps: timestamps, tokens: %{on_conflict: on_conflict}} = options
@impl Import.Runner
def ecto_schema_module, do: Token
Multi.run(multi, :tokens, fn _ ->
insert(
tokens_changes,
%{
on_conflict: on_conflict,
timeout: options[:tokens][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
@impl Import.Runner
def option_key, do: :tokens
_ ->
multi
end
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
%{timestamps: timestamps, tokens: %{on_conflict: on_conflict}} = options
timeout = options[option_key()][:timeout] || @timeout
Multi.run(multi, :tokens, fn _ ->
insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps})
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert([map()], %{
required(:on_conflict) => Import.on_conflict(),
required(:on_conflict) => Import.Runner.on_conflict(),
required(:timeout) => timeout(),
required(:timestamps) => Import.timestamps()
}) ::
{:ok, [Token.t()]}
| {:error, [Changeset.t()]}
def insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, & &1.contract_address_hash)
| {:error, {:required, :on_conflict}}
def insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
case options do
%{on_conflict: on_conflict} ->
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, & &1.contract_address_hash)
{:ok, _} =
Import.insert_changes_list(
ordered_changes_list,
conflict_target: :contract_address_hash,
on_conflict: on_conflict,
for: Token,
returning: true,
timeout: timeout,
timestamps: timestamps
)
{:ok, _} =
Import.insert_changes_list(
ordered_changes_list,
conflict_target: :contract_address_hash,
on_conflict: on_conflict,
for: Token,
returning: true,
timeout: timeout,
timestamps: timestamps
)
_ ->
{:error, {:required, :on_conflict}}
end
end
end

@ -10,65 +10,71 @@ defmodule Explorer.Chain.Import.Transaction.Forks do
alias Ecto.Multi
alias Explorer.Chain.{Hash, Import, Transaction}
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [
%{required(:uncle_hash) => Hash.Full.t(), required(:hash) => Hash.Full.t()}
]
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Transaction.Fork => transaction_fork_changes} ->
%{timestamps: timestamps} = options
Multi.run(multi, :transaction_forks, fn _ ->
insert(
transaction_fork_changes,
%{
timeout: options[:transaction_forks][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
@impl Import.Runner
def ecto_schema_module, do: Transaction.Fork
@impl Import.Runner
def option_key, do: :transaction_forks
@impl Import.Runner
def imported_table_row do
%{
value_type: "[%{uncle_hash: Explorer.Chain.Hash.t(), hash: Explorer.Chain.Hash.t()}]",
value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `uncle_hash` and `hash` "
}
end
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
%{timestamps: timestamps} = options
timeout = options[option_key()][:timeout] || @timeout
Multi.run(multi, :transaction_forks, fn _ ->
insert(changes_list, %{timeout: timeout, timestamps: timestamps})
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert([map()], %{
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) :: {:ok, [%{uncle_hash: Hash.t(), hash: Hash.t()}]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.uncle_hash, &1.hash})
Import.insert_changes_list(
ordered_changes_list,
conflict_target: [:uncle_hash, :index],
on_conflict:
from(
transaction_fork in Transaction.Fork,
update: [
set: [
hash: fragment("EXCLUDED.hash")
]
]
),
on_conflict: on_conflict,
for: Transaction.Fork,
returning: [:uncle_hash, :hash],
timeout: timeout,
timestamps: timestamps
)
end
defp default_on_conflict do
from(
transaction_fork in Transaction.Fork,
update: [
set: [
hash: fragment("EXCLUDED.hash")
]
]
)
end
end

@ -5,50 +5,48 @@ defmodule Explorer.Chain.Import.Transactions do
require Ecto.Query
alias Ecto.{Changeset, Multi}
alias Ecto.Multi
alias Explorer.Chain.{Hash, Import, Transaction}
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:with) => Import.changeset_function_name(),
optional(:on_conflict) => :nothing | :replace_all,
optional(:timeout) => timeout
}
@type imported :: [Hash.Full.t()]
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Transaction => transactions_changes} ->
# check required options as early as possible
%{timestamps: timestamps, transactions: %{on_conflict: on_conflict} = transactions_options} = options
@impl Import.Runner
def ecto_schema_module, do: Transaction
@impl Import.Runner
def option_key, do: :transactions
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
Multi.run(multi, :transactions, fn _ ->
insert(
transactions_changes,
%{
on_conflict: on_conflict,
timeout: transactions_options[:timeout] || @timeout,
timestamps: timestamps
}
)
end)
@impl Import.Runner
def run(multi, changes_list, options) when is_map(options) do
%{timestamps: timestamps, transactions: %{on_conflict: on_conflict} = transactions_options} = options
timeout = transactions_options[:timeout] || @timeout
_ ->
multi
end
Multi.run(multi, :transactions, fn _ ->
insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps})
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert([map()], %{
required(:on_conflict) => Import.on_conflict(),
required(:on_conflict) => Import.Runner.on_conflict(),
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) :: {:ok, [Hash.t()]} | {:error, [Changeset.t()]}
}) :: {:ok, [Hash.t()]}
defp insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order

@ -100,7 +100,7 @@ defmodule Explorer.Chain.Transaction do
* `s` - The S field of the signature. The (r, s) is the normal output of an ECDSA signature, where r is computed as
the X coordinate of a point R, modulo the curve order n.
* `status` - whether the transaction was successfully mined or failed. `nil` when transaction is pending or has only
been collated into one of the `uncles` in one of the `forks.
been collated into one of the `uncles` in one of the `forks`.
* `to_address` - sink of `value`
* `to_address_hash` - `to_address` foreign key
* `uncles` - uncle blocks where `forks` were collated

@ -23,16 +23,16 @@ defmodule Indexer.Block.Fetcher do
%{
address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number,
transaction_hash_to_block_number_option: transaction_hash_to_block_number,
addresses: Import.Addresses.options(),
address_coin_balances: Import.Address.CoinBalances.options(),
address_token_balances: Import.Address.TokenBalances.options(),
blocks: Import.Blocks.options(),
block_second_degree_relations: Import.Block.SecondDegreeRelations.options(),
addresses: Import.Runner.options(),
address_coin_balances: Import.Runner.options(),
address_token_balances: Import.Runner.options(),
blocks: Import.Runner.options(),
block_second_degree_relations: Import.Runner.options(),
broadcast: boolean,
logs: Import.Logs.options(),
token_transfers: Import.TokenTransfers.options(),
tokens: Import.Tokens.options(),
transactions: Import.Transactions.options()
logs: Import.Runner.options(),
token_transfers: Import.Runner.options(),
tokens: Import.Runner.options(),
transactions: Import.Runner.options()
}
) :: Import.all_result()
@ -124,7 +124,6 @@ defmodule Indexer.Block.Fetcher do
blocks: %{params: blocks},
block_second_degree_relations: %{params: block_second_degree_relations},
logs: %{params: logs},
receipts: %{params: receipts},
token_transfers: %{params: token_transfers},
tokens: %{on_conflict: :nothing, params: tokens},
transactions: %{params: transactions_with_receipts, on_conflict: :replace_all}

@ -98,10 +98,13 @@ defmodule Indexer.Block.Uncle.Fetcher do
end
end
@ignored_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a
@impl Block.Fetcher
def import(_, options) when is_map(options) do
with {:ok, %{block_second_degree_relations: block_second_degree_relations}} = ok <-
options
|> Map.drop(@ignored_options)
|> uncle_blocks()
|> fork_transactions()
|> Chain.import() do

Loading…
Cancel
Save