@ -9,32 +9,45 @@ defmodule Explorer.Chain.Import do
alias Explorer.Chain . { Address , Balance , Block , Hash , InternalTransaction , Log , Transaction , Wei }
alias Explorer.Repo
@typep addresses_option :: { :addresses , [ params_option | timeout_option | with_option ] }
@typep balances_option :: { :balances , [ params_option | timeout_option ] }
@typep blocks_option :: { :blocks , [ params_option | timeout_option ] }
@typep broadcast_option :: { :broadcast , Boolean }
@typep internal_transactions_option :: { :internal_transactions , [ params_option | timeout_option ] }
@typep logs_option :: { :logs , [ params_option | timeout_option ] }
@typep on_conflict_option :: { :on_conflict , :nothing | :replace_all }
@typep params_option :: { :params , [ map ( ) ] }
@typep receipts_option :: { :receipts , [ params_option | timeout_option ] }
@typep timeout_option :: { :timeout , timeout }
@typep timestamps :: %{ inserted_at : DateTime . t ( ) , updated_at : DateTime . t ( ) }
@typep timestamps_option :: { :timestamps , timestamps }
@typep transactions_option :: { :transactions , [ on_conflict_option | params_option | timeout_option | with_option ] }
@typep with_option :: { :with , changeset_function_name :: atom }
@type all_options :: [
addresses_option
| balances_option
| blocks_option
| broadcast_option
| internal_transactions_option
| logs_option
| receipts_option
| timeout_option
| transactions_option
]
@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all
@type params :: [ map ( ) ]
@type all_options :: %{
optional ( :addresses ) = > %{
required ( :params ) = > params ,
optional ( :timeout ) = > timeout ,
optional ( :with ) = > changeset_function_name
} ,
optional ( :balances ) = > %{
required ( :params ) = > params ,
optional ( :timeout ) = > timeout
} ,
optional ( :blocks ) = > %{
required ( :params ) = > params ,
optional ( :timeout ) = > timeout
} ,
optional ( :broadcast ) = > boolean ,
optional ( :internal_transactions ) = > %{
required ( :params ) = > params ,
optional ( :timeout ) = > timeout
} ,
optional ( :logs ) = > %{
required ( :params ) = > params ,
optional ( :timeout ) = > timeout
} ,
optional ( :receipts ) = > %{
required ( :params ) = > params ,
optional ( :timeout ) = > timeout
} ,
optional ( :timeout ) = > timeout ,
optional ( :transactions ) = > %{
required ( :params ) = > params ,
optional ( :with ) = > changeset_function_name ,
optional ( :on_conflict ) = > :nothing | :replace_all ,
optional ( :timeout ) = > timeout
}
}
@type all_result ::
{ :ok ,
%{
@ -54,23 +67,7 @@ defmodule Explorer.Chain.Import do
| { :error , step :: Ecto.Multi . name ( ) , failed_value :: any ( ) ,
changes_so_far :: %{ optional ( Ecto.Multi . name ( ) ) = > any ( ) } }
@type internal_transactions_options :: [
addresses_option
| internal_transactions_option
| timeout_option
| { :transactions , [ { :hashes , [ String . t ( ) ] } | timeout_option ] }
]
@type internal_transactions_result ::
{ :ok ,
%{
optional ( :addresses ) = > [ Hash.Address . t ( ) ] ,
optional ( :internal_transactions ) = > [
%{ required ( :index ) = > non_neg_integer ( ) , required ( :transaction_hash ) = > Hash.Full . t ( ) }
]
} }
| { :error , [ Changeset . t ( ) ] }
| { :error , step :: Ecto.Multi . name ( ) , failed_value :: any ( ) ,
changes_so_far :: %{ optional ( Ecto.Multi . name ( ) ) = > any ( ) } }
@typep timestamps :: %{ inserted_at : DateTime . t ( ) , updated_at : DateTime . t ( ) }
# timeouts all in milliseconds
@ -148,19 +145,13 @@ defmodule Explorer.Chain.Import do
* ` :with ` - the changeset function on ` Explorer.Chain.Transaction ` to use validate ` :params ` .
"""
@spec all ( all_options ( ) ) :: all_result ( )
def all ( options ) when is_list ( options ) do
broadcast =
case Keyword . fetch ( options , :broadcast ) do
{ :ok , broadcast } -> broadcast
:error -> false
end
def all ( options ) when is_map ( options ) do
changes_list_arguments_list = import_options_to_changes_list_arguments_list ( options )
with { :ok , ecto_schema_module_to_changes_list_map } <-
changes_list_arguments_list_to_ecto_schema_module_to_changes_list_map ( changes_list_arguments_list ) ,
{ :ok , data } <- insert_ecto_schema_module_to_changes_list_map ( ecto_schema_module_to_changes_list_map , options ) do
if broadcast , do : broadcast_events ( data )
if Map . get ( options , :broadcast , false ) , do : broadcast_events ( data )
{ :ok , data }
end
end
@ -236,9 +227,9 @@ defmodule Explorer.Chain.Import do
}
defp ecto_schema_module_to_changes_list_map_to_multi ( ecto_schema_module_to_changes_list_map , options )
when is_list ( options ) do
when is_map ( options ) do
timestamps = timestamps ( )
full_options = Keyword . put ( options , :timestamps , timestamps )
full_options = Map . put ( options , :timestamps , timestamps )
Multi . new ( )
|> run_addresses ( ecto_schema_module_to_changes_list_map , full_options )
@ -250,16 +241,18 @@ defmodule Explorer.Chain.Import do
end
defp run_addresses ( multi , ecto_schema_module_to_changes_list_map , options )
when is_map ( ecto_schema_module_to_changes_list_map ) and is_list ( options ) do
when is_map ( ecto_schema_module_to_changes_list_map ) and is_map ( options ) do
case ecto_schema_module_to_changes_list_map do
%{ Address = > addresses_changes } ->
timestamps = Keyword . fetch! ( options , :timestamps )
timestamps = Map . fetch! ( options , :timestamps )
Multi . run ( multi , :addresses , fn _ ->
insert_addresses (
addresses_changes ,
timeout : options [ :addresses ] [ :timeout ] || @insert_addresses_timeout ,
timestamps : timestamps
%{
timeout : options [ :addresses ] [ :timeout ] || @insert_addresses_timeout ,
timestamps : timestamps
}
)
end )
@ -269,16 +262,18 @@ defmodule Explorer.Chain.Import do
end
defp run_balances ( multi , ecto_schema_module_to_changes_list_map , options )
when is_map ( ecto_schema_module_to_changes_list_map ) and is_list ( options ) do
when is_map ( ecto_schema_module_to_changes_list_map ) and is_map ( options ) do
case ecto_schema_module_to_changes_list_map do
%{ Balance = > balances_changes } ->
timestamps = Keyword . fetch! ( options , :timestamps )
timestamps = Map . fetch! ( options , :timestamps )
Multi . run ( multi , :balances , fn _ ->
insert_balances (
balances_changes ,
timeout : options [ :balances ] [ :timeout ] || @insert_balances_timeout ,
timestamps : timestamps
%{
timeout : options [ :balances ] [ :timeout ] || @insert_balances_timeout ,
timestamps : timestamps
}
)
end )
@ -288,16 +283,18 @@ defmodule Explorer.Chain.Import do
end
defp run_blocks ( multi , ecto_schema_module_to_changes_list_map , options )
when is_map ( ecto_schema_module_to_changes_list_map ) and is_list ( options ) do
when is_map ( ecto_schema_module_to_changes_list_map ) and is_map ( options ) do
case ecto_schema_module_to_changes_list_map do
%{ Block = > blocks_changes } ->
timestamps = Keyword . fetch! ( options , :timestamps )
timestamps = Map . fetch! ( options , :timestamps )
Multi . run ( multi , :blocks , fn _ ->
insert_blocks (
blocks_changes ,
timeout : options [ :blocks ] [ :timeout ] || @insert_blocks_timeout ,
timestamps : timestamps
%{
timeout : options [ :blocks ] [ :timeout ] || @insert_blocks_timeout ,
timestamps : timestamps
}
)
end )
@ -307,20 +304,20 @@ defmodule Explorer.Chain.Import do
end
defp run_transactions ( multi , ecto_schema_module_to_changes_list_map , options )
when is_map ( ecto_schema_module_to_changes_list_map ) and is_list ( options ) do
when is_map ( ecto_schema_module_to_changes_list_map ) and is_map ( options ) do
case ecto_schema_module_to_changes_list_map do
%{ Transaction = > transactions_changes } ->
# check required options as early as possible
transactions_options = Keyword . fetch! ( options , :transactions )
on_conflict = Keyword . fetch! ( transactions_options , :on_conflict )
timestamps = Keyword . fetch! ( options , :timestamps )
%{ timestamps : timestamps , transactions : %{ on_conflict : on_conflict } = transactions_options } = options
Multi . run ( multi , :transactions , fn _ ->
insert_transactions (
transactions_changes ,
on_conflict : on_conflict ,
timeout : transactions_options [ :timeout ] || @insert_transactions_timeout ,
timestamps : timestamps
%{
on_conflict : on_conflict ,
timeout : transactions_options [ :timeout ] || @insert_transactions_timeout ,
timestamps : timestamps
}
)
end )
@ -330,25 +327,29 @@ defmodule Explorer.Chain.Import do
end
defp run_internal_transactions ( multi , ecto_schema_module_to_changes_list_map , options )
when is_map ( ecto_schema_module_to_changes_list_map ) and is_list ( options ) do
when is_map ( ecto_schema_module_to_changes_list_map ) and is_map ( options ) do
case ecto_schema_module_to_changes_list_map do
%{ InternalTransaction = > internal_transactions_changes } ->
timestamps = Keyword . fetch! ( options , :timestamps )
timestamps = Map . fetch! ( options , :timestamps )
multi
|> Multi . run ( :internal_transactions , fn _ ->
insert_internal_transactions (
internal_transactions_changes ,
timeout : options [ :internal_transactions ] [ :timeout ] || @insert_internal_transactions_timeout ,
timestamps : timestamps
%{
timeout : options [ :internal_transactions ] [ :timeout ] || @insert_internal_transactions_timeout ,
timestamps : timestamps
}
)
end )
|> Multi . run ( :internal_transactions_indexed_at_transactions , fn %{ internal_transactions : internal_transactions }
when is_list ( internal_transactions ) ->
update_transactions_internal_transactions_indexed_at (
internal_transactions ,
timeout : options [ :transactions ] [ :timeout ] || @insert_transactions_timeout ,
timestamps : timestamps
%{
timeout : options [ :transactions ] [ :timeout ] || @insert_transactions_timeout ,
timestamps : timestamps
}
)
end )
@ -358,16 +359,18 @@ defmodule Explorer.Chain.Import do
end
defp run_logs ( multi , ecto_schema_module_to_changes_list_map , options )
when is_map ( ecto_schema_module_to_changes_list_map ) and is_list ( options ) do
when is_map ( ecto_schema_module_to_changes_list_map ) and is_map ( options ) do
case ecto_schema_module_to_changes_list_map do
%{ Log = > logs_changes } ->
timestamps = Keyword . fetch! ( options , :timestamps )
timestamps = Map . fetch! ( options , :timestamps )
Multi . run ( multi , :logs , fn _ ->
insert_logs (
logs_changes ,
timeout : options [ :logs ] [ :timeout ] || @insert_logs_timeout ,
timestamps : timestamps
%{
timeout : options [ :logs ] [ :timeout ] || @insert_logs_timeout ,
timestamps : timestamps
}
)
end )
@ -376,13 +379,11 @@ defmodule Explorer.Chain.Import do
end
end
@spec insert_addresses ( [ %{ hash : Hash.Address . t ( ) } ] , [ timeout_option | timestamps_option | with_option ] ) ::
{ :ok , [ Hash.Address . t ( ) ] }
defp insert_addresses ( changes_list , named_arguments )
when is_list ( changes_list ) and is_list ( named_arguments ) do
timestamps = Keyword . fetch! ( named_arguments , :timestamps )
timeout = Keyword . fetch! ( named_arguments , :timeout )
@spec insert_addresses ( [ %{ hash : Hash.Address . t ( ) } ] , %{
required ( :timeout ) = > timeout ,
required ( :timestamps ) = > timestamps
} ) :: { :ok , [ Hash.Address . t ( ) ] }
defp insert_addresses ( changes_list , %{ timeout : timeout , timestamps : timestamps } ) when is_list ( changes_list ) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = sort_address_changes_list ( changes_list )
@ -447,14 +448,14 @@ defmodule Explorer.Chain.Import do
required ( :value ) = > Wei . t ( )
}
] ,
[ timeout_option ]
%{
required ( :timeout ) = > timeout ,
required ( :timestamps ) = > timestamps
}
) ::
{ :ok , [ %{ required ( :address_hash ) = > Hash.Address . t ( ) , required ( :block_number ) = > Block . block_number ( ) } ] }
| { :error , [ Changeset . t ( ) ] }
defp insert_balances ( changes_list , named_arguments ) when is_list ( changes_list ) and is_list ( named_arguments ) do
timestamps = Keyword . fetch! ( named_arguments , :timestamps )
timeout = Keyword . fetch! ( named_arguments , :timeout )
defp insert_balances ( changes_list , %{ timeout : timeout , timestamps : timestamps } ) when is_list ( changes_list ) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum . sort_by ( changes_list , & { &1 . address_hash , &1 . block_number } )
@ -490,12 +491,10 @@ defmodule Explorer.Chain.Import do
{ :ok , Enum . map ( ordered_changes_list , & Map . take ( &1 , ~w( address_hash block_number )a ) ) }
end
@spec insert_blocks ( [ map ( ) ] , [ timeout_option | timestamps_option ] ) :: { :ok , [ Block . t ( ) ] } | { :error , [ Changeset . t ( ) ] }
defp insert_blocks ( changes_list , named_arguments )
when is_list ( changes_list ) and is_list ( named_arguments ) do
timestamps = Keyword . fetch! ( named_arguments , :timestamps )
timeout = Keyword . fetch! ( named_arguments , :timeout )
@spec insert_blocks ( [ map ( ) ] , %{ required ( :timeout ) = > timeout , required ( :timestamps ) = > timestamps } ) ::
{ :ok , [ Block . t ( ) ] } | { :error , [ Changeset . t ( ) ] }
defp insert_blocks ( changes_list , %{ timeout : timeout , timestamps : timestamps } )
when is_list ( changes_list ) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum . sort_by ( changes_list , & { &1 . number , &1 . hash } )
@ -513,13 +512,11 @@ defmodule Explorer.Chain.Import do
{ :ok , blocks }
end
@spec insert_internal_transactions ( [ map ] , [ timeout_option | timestamps_option ] ) ::
@spec insert_internal_transactions ( [ map ] , %{ required ( :timeout ) = > timeout , required ( :timestamps ) = > timestamps } ) ::
{ :ok , [ %{ index : non_neg_integer , transaction_hash : Hash . t ( ) } ] }
| { :error , [ Changeset . t ( ) ] }
defp insert_internal_transactions ( changes_list , named_arguments )
when is_list ( changes_list ) and is_list ( named_arguments ) do
timestamps = Keyword . fetch! ( named_arguments , :timestamps )
defp insert_internal_transactions ( changes_list , %{ timeout : timeout , timestamps : timestamps } )
when is_list ( changes_list ) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum . sort_by ( changes_list , & { &1 . transaction_hash , &1 . index } )
@ -530,6 +527,7 @@ defmodule Explorer.Chain.Import do
for : InternalTransaction ,
on_conflict : :replace_all ,
returning : [ :index , :transaction_hash ] ,
timeout : timeout ,
timestamps : timestamps
)
@ -540,14 +538,11 @@ defmodule Explorer.Chain.Import do
) }
end
@spec insert_logs ( [ map ( ) ] , [ timeout_option | timestamps_option ] ) ::
@spec insert_logs ( [ map ( ) ] , %{ required ( :timeout ) = > timeout , required ( :timestamps ) = > timestamps } ) ::
{ :ok , [ Log . t ( ) ] }
| { :error , [ Changeset . t ( ) ] }
defp insert_logs ( changes_list , named_arguments )
when is_list ( changes_list ) and is_list ( named_arguments ) do
timestamps = Keyword . fetch! ( named_arguments , :timestamps )
timeout = Keyword . fetch! ( named_arguments , :timeout )
defp insert_logs ( changes_list , %{ timeout : timeout , timestamps : timestamps } )
when is_list ( changes_list ) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum . sort_by ( changes_list , & { &1 . transaction_hash , &1 . index } )
@ -563,14 +558,13 @@ defmodule Explorer.Chain.Import do
)
end
@spec insert_transactions ( [ map ( ) ] , [ on_conflict_option | timeout_option | timestamps_option ] ) ::
{ :ok , [ Hash . t ( ) ] } | { :error , [ Changeset . t ( ) ] }
defp insert_transactions ( changes_list , named_arguments )
when is_list ( changes_list ) and is_list ( named_arguments ) do
timestamps = Keyword . fetch! ( named_arguments , :timestamps )
timeout = Keyword . fetch! ( named_arguments , :timeout )
on_conflict = Keyword . fetch! ( named_arguments , :on_conflict )
@spec insert_transactions ( [ map ( ) ] , %{
required ( :on_conflict ) = > on_conflict ,
required ( :timeout ) = > timeout ,
required ( :timestamps ) = > timestamps
} ) :: { :ok , [ Hash . t ( ) ] } | { :error , [ Changeset . t ( ) ] }
defp insert_transactions ( changes_list , %{ on_conflict : on_conflict , timeout : timeout , timestamps : timestamps } )
when is_list ( changes_list ) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum . sort_by ( changes_list , & &1 . hash )
@ -603,11 +597,11 @@ defmodule Explorer.Chain.Import do
{ :ok , inserted }
end
defp update_transactions_internal_transactions_indexed_at ( internal_transactions , named_arguments )
when is_list ( internal_transactions ) and is_list ( named_arguments ) do
timeout = Keyword . fetch! ( named_arguments , :timeout )
timestamps = Keyword . fetch! ( named_arguments , :timestamps )
defp update_transactions_internal_transactions_indexed_at ( internal_transactions , %{
timeout : timeout ,
timestamps : timestamps
} )
when is_list ( internal_transactions ) do
ordered_transaction_hashes =
internal_transactions
|> MapSet . new ( & &1 . transaction_hash )
@ -637,12 +631,12 @@ defmodule Explorer.Chain.Import do
defp import_options_to_changes_list_arguments_list ( options ) do
Enum . flat_map ( @import_option_key_to_ecto_schema_module , fn { option_key , ecto_schema_module } ->
case Keyword . fetch ( options , option_key ) do
{ :ok , option_value } when is_list ( option_value ) ->
case Map . fetch ( options , option_key ) do
{ :ok , option_value } when is_map ( option_value ) ->
[
[
Keyword . fetch! ( option_value , :params ) ,
[ for : ecto_schema_module , with : Keyword . get ( option_value , :with , :changeset ) ]
Map . fetch! ( option_value , :params ) ,
[ for : ecto_schema_module , with : Map . get ( option_value , :with , :changeset ) ]
]
]
@ -652,15 +646,13 @@ defmodule Explorer.Chain.Import do
end )
end
defp import_transaction ( multi , options ) when is_list ( options ) do
Repo . transaction ( multi , timeout : Keyword . get ( options , :timeout , @transaction_timeout ) )
defp import_transaction ( multi , options ) when is_map ( options ) do
Repo . transaction ( multi , timeout : Map . get ( options , :timeout , @transaction_timeout ) )
end
defp insert_ecto_schema_module_to_changes_list_map ( ecto_schema_module_to_changes_list_map , options ) do
timestamps = timestamps ( )
ecto_schema_module_to_changes_list_map
|> ecto_schema_module_to_changes_list_map_to_multi ( Keyword . put ( options , :timestamps , timestamps ) )
|> ecto_schema_module_to_changes_list_map_to_multi ( options )
|> import_transaction ( options )
end