chore: Refactor import stages (#11013)

pull/11317/head
Qwerty5Uiop 3 days ago committed by GitHub
parent cc4acdda0a
commit 91d642b457
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 68
      apps/explorer/lib/explorer/chain/import.ex
  2. 31
      apps/explorer/lib/explorer/chain/import/stage/block_following.ex
  3. 29
      apps/explorer/lib/explorer/chain/import/stage/block_pending.ex
  4. 35
      apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex
  5. 34
      apps/explorer/lib/explorer/chain/import/stage/block_transaction_referencing.ex
  6. 27
      apps/explorer/lib/explorer/chain/import/stage/internal_transactions.ex
  7. 8
      apps/explorer/lib/explorer/chain/import/stage/main.ex
  8. 29
      apps/explorer/lib/explorer/chain/import/stage/token_referencing.ex

@ -12,16 +12,24 @@ defmodule Explorer.Chain.Import do
require Logger
@stages [
Import.Stage.BlockRelated,
Import.Stage.BlockReferencing,
Import.Stage.BlockFollowing,
Import.Stage.BlockPending,
Import.Stage.ChainTypeSpecific
[
Import.Stage.Main
],
[
Import.Stage.BlockTransactionReferencing,
Import.Stage.TokenReferencing,
Import.Stage.InternalTransactions,
Import.Stage.ChainTypeSpecific
]
]
# in order so that foreign keys are inserted before being referenced
@configured_runners Enum.flat_map(@stages, fn stage -> stage.runners() end)
@all_runners Enum.flat_map(@stages, fn stage -> stage.all_runners() end)
@configured_runners Enum.flat_map(@stages, fn stage_batch ->
Enum.flat_map(stage_batch, fn stage -> stage.runners() end)
end)
@all_runners Enum.flat_map(@stages, fn stage_batch ->
Enum.flat_map(stage_batch, fn stage -> stage.all_runners() end)
end)
quoted_runner_option_value =
quote do
@ -283,9 +291,11 @@ defmodule Explorer.Chain.Import do
timestamps = timestamps()
full_options = Map.put(options, :timestamps, timestamps)
{multis, final_runner_to_changes_list} =
Enum.flat_map_reduce(@stages, runner_to_changes_list, fn stage, remaining_runner_to_changes_list ->
stage.multis(remaining_runner_to_changes_list, full_options)
{multis_batches, final_runner_to_changes_list} =
Enum.map_reduce(@stages, runner_to_changes_list, fn stage_batch, remaining_runner_to_changes_list ->
Enum.flat_map_reduce(stage_batch, remaining_runner_to_changes_list, fn stage, inner_remaining_list ->
stage.multis(inner_remaining_list, full_options)
end)
end)
unless Enum.empty?(final_runner_to_changes_list) do
@ -293,7 +303,7 @@ defmodule Explorer.Chain.Import do
"No stages consumed the following runners: #{final_runner_to_changes_list |> Map.keys() |> inspect()}"
end
multis
multis_batches
end
def insert_changes_list(repo, changes_list, options) when is_atom(repo) and is_list(changes_list) do
@ -337,17 +347,21 @@ defmodule Explorer.Chain.Import do
reraise exception, __STACKTRACE__
end
defp logged_import(multis, options) when is_list(multis) and is_map(options) do
defp logged_import(multis_batches, options) when is_list(multis_batches) and is_map(options) do
import_id = :erlang.unique_integer([:positive])
Explorer.Logger.metadata(fn -> import_transactions(multis, options) end, import_id: import_id)
Explorer.Logger.metadata(fn -> import_batch_transactions(multis_batches, options) end, import_id: import_id)
end
defp import_transactions(multis, options) when is_list(multis) and is_map(options) do
Enum.reduce_while(multis, {:ok, %{}}, fn multi, {:ok, acc_changes} ->
case import_transaction(multi, options) do
{:ok, changes} -> {:cont, {:ok, Map.merge(acc_changes, changes)}}
{:error, _, _, _} = error -> {:halt, error}
defp import_batch_transactions(multis_batches, options) when is_list(multis_batches) and is_map(options) do
Enum.reduce_while(multis_batches, {:ok, %{}}, fn multis, {:ok, acc_changes} ->
multis
|> run_parallel_multis(options)
|> Task.yield_many(:infinity)
|> handle_task_results(acc_changes)
|> case do
{:ok, changes} -> {:cont, {:ok, changes}}
error -> {:halt, error}
end
end)
rescue
@ -358,8 +372,26 @@ defmodule Explorer.Chain.Import do
end
end
defp run_parallel_multis(multis, options) do
Enum.map(multis, fn multi -> Task.async(fn -> import_transaction(multi, options) end) end)
end
defp import_transaction(multi, options) when is_map(options) do
Repo.logged_transaction(multi, timeout: Map.get(options, :timeout, @transaction_timeout))
rescue
exception -> {:exception, exception, __STACKTRACE__}
end
defp handle_task_results(task_results, acc_changes) do
Enum.reduce_while(task_results, {:ok, acc_changes}, fn {_task, task_result}, {:ok, acc_changes_inner} ->
case task_result do
{:ok, {:ok, changes}} -> {:cont, {:ok, Map.merge(acc_changes_inner, changes)}}
{:ok, {:exception, exception, stacktrace}} -> reraise exception, stacktrace
{:ok, error} -> {:halt, error}
{:exit, reason} -> {:halt, reason}
nil -> {:halt, :timeout}
end
end)
end
defp set_refetch_needed_for_partially_imported_blocks(%{blocks: %{params: blocks_params}}) do

@ -1,31 +0,0 @@
defmodule Explorer.Chain.Import.Stage.BlockFollowing do
@moduledoc """
Imports any tables that follows and cannot be imported at the same time as
those imported by `Explorer.Chain.Import.Stage.BlockRelated` and `Explorer.Chain.Import.Stage.BlockReferencing`
"""
alias Explorer.Chain.Import.{Runner, Stage}
@behaviour Stage
@impl Stage
def runners,
do: [
Runner.Block.SecondDegreeRelations,
Runner.Block.Rewards,
Runner.Address.TokenBalances,
Runner.Address.CurrentTokenBalances
]
@impl Stage
def all_runners,
do: runners()
@impl Stage
def multis(runner_to_changes_list, options) do
{final_multi, final_remaining_runner_to_changes_list} =
Stage.single_multi(runners(), runner_to_changes_list, options)
{[final_multi], final_remaining_runner_to_changes_list}
end
end

@ -1,29 +0,0 @@
defmodule Explorer.Chain.Import.Stage.BlockPending do
@moduledoc """
Imports any tables that uses `Explorer.Chain.PendingBlockOperation` to track
progress and cannot be imported at the same time as those imported by
`Explorer.Chain.Import.Stage.BlockRelated` and `Explorer.Chain.Import.Stage.BlockReferencing`
"""
alias Explorer.Chain.Import.{Runner, Stage}
@behaviour Stage
@impl Stage
def runners,
do: [
Runner.InternalTransactions
]
@impl Stage
def all_runners,
do: runners()
@impl Stage
def multis(runner_to_changes_list, options) do
{final_multi, final_remaining_runner_to_changes_list} =
Stage.single_multi(runners(), runner_to_changes_list, options)
{[final_multi], final_remaining_runner_to_changes_list}
end
end

@ -1,35 +0,0 @@
defmodule Explorer.Chain.Import.Stage.BlockReferencing do
@moduledoc """
Imports any tables that reference `t:Explorer.Chain.Block.t/0` and that were
imported by `Explorer.Chain.Import.Stage.BlockRelated`.
"""
alias Explorer.Chain.Import.{Runner, Stage}
@behaviour Stage
@impl Stage
def runners do
[
Runner.Transaction.Forks,
Runner.Logs,
Runner.Tokens,
Runner.TokenInstances,
Runner.TransactionActions,
Runner.Withdrawals,
Runner.SignedAuthorizations
]
end
@impl Stage
def all_runners,
do: runners()
@impl Stage
def multis(runner_to_changes_list, options) do
{final_multi, final_remaining_runner_to_changes_list} =
Stage.single_multi(runners(), runner_to_changes_list, options)
{[final_multi], final_remaining_runner_to_changes_list}
end
end

@ -0,0 +1,34 @@
defmodule Explorer.Chain.Import.Stage.BlockTransactionReferencing do
@moduledoc """
Imports any data that is related to blocks and transactions.
"""
alias Explorer.Chain.Import.{Runner, Stage}
@behaviour Stage
@runners [
Runner.TokenTransfers,
Runner.Transaction.Forks,
Runner.Logs,
Runner.Block.Rewards,
Runner.Block.SecondDegreeRelations,
Runner.TransactionActions,
Runner.Withdrawals,
Runner.SignedAuthorizations
]
@impl Stage
def runners, do: @runners
@impl Stage
def all_runners, do: runners()
@impl Stage
def multis(runner_to_changes_list, options) do
{final_multi, final_remaining_runner_to_changes_list} =
Stage.single_multi(runners(), runner_to_changes_list, options)
{[final_multi], final_remaining_runner_to_changes_list}
end
end

@ -0,0 +1,27 @@
defmodule Explorer.Chain.Import.Stage.InternalTransactions do
@moduledoc """
Imports the rest of the data.
"""
alias Explorer.Chain.Import.{Runner, Stage}
@behaviour Stage
@runners [
Runner.InternalTransactions
]
@impl Stage
def runners, do: @runners
@impl Stage
def all_runners, do: runners()
@impl Stage
def multis(runner_to_changes_list, options) do
{final_multi, final_remaining_runner_to_changes_list} =
Stage.single_multi(runners(), runner_to_changes_list, options)
{[final_multi], final_remaining_runner_to_changes_list}
end
end

@ -1,6 +1,6 @@
defmodule Explorer.Chain.Import.Stage.BlockRelated do
defmodule Explorer.Chain.Import.Stage.Main do
@moduledoc """
Import blocks along with block related entities.
Imports main data (addresses, address_coin_balances, address_coin_balances_daily, tokens, blocks, transactions).
"""
alias Explorer.Chain.Import.{Runner, Stage}
@ -10,11 +10,11 @@ defmodule Explorer.Chain.Import.Stage.BlockRelated do
@addresses_runner Runner.Addresses
@rest_runners [
Runner.Tokens,
Runner.Blocks,
Runner.Address.CoinBalances,
Runner.Address.CoinBalancesDaily,
Runner.Transactions,
Runner.TokenTransfers
Runner.Transactions
]
@impl Stage

@ -0,0 +1,29 @@
defmodule Explorer.Chain.Import.Stage.TokenReferencing do
@moduledoc """
Imports any data that is related to tokens.
"""
alias Explorer.Chain.Import.{Runner, Stage}
@behaviour Stage
@runners [
Runner.TokenInstances,
Runner.Address.TokenBalances,
Runner.Address.CurrentTokenBalances
]
@impl Stage
def runners, do: @runners
@impl Stage
def all_runners, do: runners()
@impl Stage
def multis(runner_to_changes_list, options) do
{final_multi, final_remaining_runner_to_changes_list} =
Stage.single_multi(runners(), runner_to_changes_list, options)
{[final_multi], final_remaining_runner_to_changes_list}
end
end
Loading…
Cancel
Save