From e520b5d5bd348f5066b3cc9c0dd79156eb4063bf Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 6 Jun 2018 13:19:52 -0500 Subject: [PATCH] :transactions :on_conflict required option for import_blocks Because the repository transaction for a pending `Explorer.Chain.Transaction`s could `COMMIT` after the repository transaction for that same transaction being collated into a block, writers, it is recommended to use `:nothing` for pending transactions and `:replace_all` for collated transactions, so that collated transactions win. --- apps/explorer/lib/explorer/chain.ex | 22 +++++++++++++++---- .../lib/explorer/indexer/block_fetcher.ex | 2 +- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 7f0b3c3f13..ac6e835c54 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -49,6 +49,7 @@ defmodule Explorer.Chain do @typep direction_option :: {:direction, direction} @typep inserted_after_option :: {:inserted_after, DateTime.t()} @typep necessity_by_association_option :: {:necessity_by_association, necessity_by_association} + @typep on_conflict_option :: {:on_conflict, :nothing | :replace_all} @typep pagination_option :: {:pagination, pagination} @typep paging_options :: {:paging_options, PagingOptions.t()} @typep params_option :: {:params, map()} @@ -60,7 +61,7 @@ defmodule Explorer.Chain do @typep internal_transactions_option :: {:internal_transactions, [params_option | timeout_option]} @typep logs_option :: {:logs, [params_option | timeout_option]} @typep receipts_option :: {:receipts, [params_option | timeout_option]} - @typep transactions_option :: {:transactions, [params_option | timeout_option]} + @typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option]} @doc """ `t:Explorer.Chain.InternalTransaction/0`s from `address`. @@ -650,6 +651,7 @@ defmodule Explorer.Chain do ...> ], ...> ], ...> transactions: [ + ...> on_conflict: :replace_all, ...> params: [ ...> %{ ...> block_hash: "0xf6b4b8c88df3ebd252ec476328334dc026cf66606a84fb769b3d3cbccc8471bd", @@ -874,6 +876,13 @@ defmodule Explorer.Chain do * `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}` milliseconds. * `:transactions` + * `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing transaction + with the same hash. + + *NOTE*: Because the repository transaction for a pending `Explorer.Chain.Transaction`s could `COMMIT` after the + repository transaction for that same transaction being collated into a block, writers, it is recomended to use + `:nothing` for pending transactions and `:replace_all` for collated transactions, so that collated transactions + win. * `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`. * `:timeout` - the timeout for inserting all transactions found in the params lists across all types. Defaults to `#{@insert_transactions_timeout}` milliseconds. @@ -1878,12 +1887,13 @@ defmodule Explorer.Chain do {:ok, inserted} end - @spec insert_transactions([map()], [timeout_option | timestamps_option]) :: + @spec insert_transactions([map()], [on_conflict_option | timeout_option | timestamps_option]) :: {:ok, [Hash.t()]} | {:error, [Changeset.t()]} defp insert_transactions(changes_list, named_arguments) when is_list(changes_list) and is_list(named_arguments) do timestamps = Keyword.fetch!(named_arguments, :timestamps) timeout = Keyword.fetch!(named_arguments, :timeout) + on_conflict = Keyword.fetch!(named_arguments, :on_conflict) # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, & &1.hash) @@ -1892,7 +1902,7 @@ defmodule Explorer.Chain do insert_changes_list( ordered_changes_list, conflict_target: :hash, - on_conflict: :replace_all, + on_conflict: on_conflict, for: Transaction, returning: [:hash], timeout: timeout, @@ -1985,12 +1995,16 @@ defmodule Explorer.Chain do when is_map(ecto_schema_module_to_changes_list) and is_list(options) do case ecto_schema_module_to_changes_list do %{Transaction => transactions_changes} -> + # check required options as early as possible + transactions_options = Keyword.fetch!(options, :transactions) + on_conflict = Keyword.fetch!(transactions_options, :on_conflict) timestamps = Keyword.fetch!(options, :timestamps) Multi.run(multi, :transactions, fn _ -> insert_transactions( transactions_changes, - timeout: options[:transations][:timeout] || @insert_transactions_timeout, + on_conflict: on_conflict, + timeout: transactions_options[:timeout] || @insert_transactions_timeout, timestamps: timestamps ) end) diff --git a/apps/explorer/lib/explorer/indexer/block_fetcher.ex b/apps/explorer/lib/explorer/indexer/block_fetcher.ex index bc3dbac4c1..016fa3c8d5 100644 --- a/apps/explorer/lib/explorer/indexer/block_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/block_fetcher.ex @@ -279,7 +279,7 @@ defmodule Explorer.Indexer.BlockFetcher do blocks: [params: blocks], logs: [params: logs], receipts: [params: receipts], - transactions: [params: transactions_with_receipts] + transactions: [on_conflict: :replace_all, params: transactions_with_receipts] ) else {step, {:error, reason}} ->