@ -3,27 +3,18 @@ defmodule Explorer.Chain.Import do
Bulk importing of data into ` Explorer.Repo `
"""
alias Ecto . { Changeset , Multi }
alias Ecto.Changeset
alias Explorer.Chain.Import
alias Explorer.Repo
# in order so that foreign keys are inserted before being referenced
@runners [
Import.Addresses ,
Import.Address.CoinBalances ,
Import.Blocks ,
Import.Block.Rewards ,
Import.Block.SecondDegreeRelations ,
Import.Transactions ,
Import.Transaction.Forks ,
Import.InternalTransactions ,
Import.Logs ,
Import.Tokens ,
Import.TokenTransfers ,
Import.Address.CurrentTokenBalances ,
Import.Address.TokenBalances
@stages [
Import.Stage.Addresses ,
Import.Stage.AddressReferencing
]
# in order so that foreign keys are inserted before being referenced
@runners Enum . flat_map ( @stages , fn stage -> stage . runners ( ) end )
quoted_runner_option_value =
quote do
Import.Runner . options ( )
@ -129,8 +120,8 @@ defmodule Explorer.Chain.Import do
def all ( options ) when is_map ( options ) do
with { :ok , runner_options_pairs } <- validate_options ( options ) ,
{ :ok , valid_runner_option_pairs } <- validate_runner_options_pairs ( runner_options_pairs ) ,
{ :ok , runner_changes_list_pairs } <- runner_changes_list_pairs ( valid_runner_option_pairs ) ,
{ :ok , data } <- insert_runner_changes_list_pairs ( runner_changes_list_pairs , options ) do
{ :ok , runner_to_ changes_list } <- runner_to_ changes_list ( valid_runner_option_pairs ) ,
{ :ok , data } <- insert_runner_to_ changes_list ( runner_to_ changes_list , options ) do
broadcast_events ( data , Map . get ( options , :broadcast , false ) )
{ :ok , data }
end
@ -153,25 +144,22 @@ defmodule Explorer.Chain.Import do
end )
end
defp runner_changes_list_pairs ( runner_options_pairs ) when is_list ( runner_options_pairs ) do
{ status , reversed } =
runner_options_pairs
|> Stream . map ( fn { runner , options } -> runner_changes_list ( runner , options ) end )
|> Enum . reduce ( { :ok , [ ] } , fn
{ :ok , runner_changes_pair } , { :ok , acc_runner_changes_pairs } ->
{ :ok , [ runner_changes_pair | acc_runner_changes_pairs ] }
defp runner_to_changes_list ( runner_options_pairs ) when is_list ( runner_options_pairs ) do
runner_options_pairs
|> Stream . map ( fn { runner , options } -> runner_changes_list ( runner , options ) end )
|> Enum . reduce ( { :ok , %{ } } , fn
{ :ok , { runner , changes_list } } , { :ok , acc_runner_to_changes_list } ->
{ :ok , Map . put ( acc_runner_to_changes_list , runner , changes_list ) }
{ :ok , _ } , { :error , _ } = error ->
error
{ :error , _ } = error , { :ok , _ } ->
error
{ :ok , _ } , { :error , _ } = error ->
error
{ :error , runner_changesets } , { :error , acc_changesets } ->
{ :error , acc_changesets ++ runner_changesets }
end )
{ :error , _ } = error , { :ok , _ } ->
error
{ status , Enum . reverse ( reversed ) }
{ :error , runner_changesets } , { :error , acc_changesets } ->
{ :error , acc_changesets ++ runner_changesets }
end )
end
defp runner_changes_list ( runner , %{ params : params } = options ) do
@ -286,14 +274,22 @@ defmodule Explorer.Chain.Import do
end
end
defp runner_changes_list_pairs_ to_multi ( runner_changes_list_pairs , options )
when is_list ( runner_changes_list_pairs ) and is_map ( options ) do
defp runner_to_ changes_list_to_multis ( runner_to_ changes_list , options )
when is_map ( runner_to_ changes_list ) and is_map ( options ) do
timestamps = timestamps ( )
full_options = Map . put ( options , :timestamps , timestamps )
Enum . reduce ( runner_changes_list_pairs , Multi . new ( ) , fn { runner , changes_list } , acc ->
runner . run ( acc , changes_list , full_options )
end )
{ multis , final_runner_to_changes_list } =
Enum . flat_map_reduce ( @stages , runner_to_changes_list , fn stage , remaining_runner_to_changes_list ->
stage . multis ( remaining_runner_to_changes_list , full_options )
end )
unless Enum . empty? ( final_runner_to_changes_list ) do
raise ArgumentError ,
" No stages consumed the following runners: #{ final_runner_to_changes_list |> Map . keys ( ) |> inspect ( ) } "
end
multis
end
def insert_changes_list ( repo , changes_list , options ) when is_atom ( repo ) and is_list ( changes_list ) do
@ -319,14 +315,29 @@ defmodule Explorer.Chain.Import do
Map . merge ( changes , timestamps )
end
defp import_transaction ( multi , options ) when is_map ( options ) do
Repo . transaction ( multi , timeout : Map . get ( options , :timeout , @transaction_timeout ) )
defp insert_runner_to_changes_list ( runner_to_changes_list , options ) when is_map ( runner_to_changes_list ) do
runner_to_changes_list
|> runner_to_changes_list_to_multis ( options )
|> logged_import ( options )
end
defp logged_import ( multis , options ) when is_list ( multis ) and is_map ( options ) do
import_id = :erlang . unique_integer ( [ :positive ] )
Explorer.Logger . metadata ( fn -> import_transactions ( multis , options ) end , import_id : import_id )
end
defp insert_runner_changes_list_pairs ( runner_changes_list_pairs , options ) do
runner_changes_list_pairs
|> runner_changes_list_pairs_to_multi ( options )
|> import_transaction ( options )
defp import_transactions ( multis , options ) when is_list ( multis ) and is_map ( options ) do
Enum . reduce_while ( multis , { :ok , %{ } } , fn multi , { :ok , acc_changes } ->
case import_transaction ( multi , options ) do
{ :ok , changes } -> { :cont , { :ok , Map . merge ( acc_changes , changes ) } }
{ :error , _ , _ , _ } = error -> { :halt , error }
end
end )
end
defp import_transaction ( multi , options ) when is_map ( options ) do
Repo . logged_transaction ( multi , timeout : Map . get ( options , :timeout , @transaction_timeout ) )
end
@spec timestamps ( ) :: timestamps