Land #285: on_conflict for insert_internal_transactions

pull/291/head
Luke Imhoff 7 years ago committed by GitHub
commit fa9ac73125
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 147
      apps/explorer/lib/explorer/chain.ex
  2. 33
      apps/explorer/lib/explorer/repo.ex
  3. 2
      apps/explorer/test/explorer/buffered_task_test.exs
  4. 40
      apps/explorer/test/explorer/repo_test.exs
  5. 2
      coveralls.json

@ -873,6 +873,151 @@ defmodule Explorer.Chain do
status: {"can't be blank when the transaction is collated into a block", []}
]
Because there are multiple processes potentially writing to the same tables at the same time,
`c:Ecto.Repo.insert_all/2`'s
[`:conflict_target` and `:on_conflict` options](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-options) are
used to perform [upserts](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-upserts) on all tables, so that
a pre-existing unique key will not trigger a failure, but instead replace or otherwise update the row.
iex> options = [
...> blocks: [
...> params: [
...> %{
...> difficulty: 340282366920938463463374607431768211454,
...> gas_limit: 6946336,
...> gas_used: 50450,
...> hash: "0xf6b4b8c88df3ebd252ec476328334dc026cf66606a84fb769b3d3cbccc8471bd",
...> miner_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
...> nonce: 0,
...> number: 37,
...> parent_hash: "0xc37bbad7057945d1bf128c1ff009fb1ad632110bf6a000aac025a80f7766b66e",
...> size: 719,
...> timestamp: Timex.parse!("2017-12-15T21:06:30Z", "{ISO:Extended:Z}"),
...> total_difficulty: 12590447576074723148144860474975121280509
...> }
...> ],
...> ],
...> internal_transactions: [
...> params: [
...> %{
...> call_type: "call",
...> from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
...> gas: 4677320,
...> gas_used: 27770,
...> index: 0,
...> output: "0x",
...> to_address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b",
...> trace_address: [],
...> transaction_hash: "0x53bd884872de3e488692881baeec262e7b95234d3965248c39fe992fffd433e5",
...> type: "call",
...> value: 0
...> }
...> ],
...> ],
...> logs: [
...> params: [
...> %{
...> address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b",
...> data: "0x000000000000000000000000862d67cb0773ee3f8ce7ea89b328ffea861ab3ef",
...> first_topic: "0x600bcf04a13e752d1e3670a5a9f1c21177ca2a93c6f5391d4f1298d098097c22",
...> fourth_topic: nil,
...> index: 0,
...> second_topic: nil,
...> third_topic: nil,
...> transaction_hash: "0x53bd884872de3e488692881baeec262e7b95234d3965248c39fe992fffd433e5",
...> type: "mined"
...> }
...> ],
...> ],
...> transactions: [
...> on_conflict: :replace_all,
...> params: [
...> %{
...> block_hash: "0xf6b4b8c88df3ebd252ec476328334dc026cf66606a84fb769b3d3cbccc8471bd",
...> block_number: 37,
...> cumulative_gas_used: 50450,
...> from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
...> gas: 4700000,
...> gas_price: 100000000000,
...> gas_used: 50450,
...> hash: "0x53bd884872de3e488692881baeec262e7b95234d3965248c39fe992fffd433e5",
...> index: 0,
...> input: "0x10855269000000000000000000000000862d67cb0773ee3f8ce7ea89b328ffea861ab3ef",
...> nonce: 4,
...> public_key: "0xe5d196ad4ceada719d9e592f7166d0c75700f6eab2e3c3de34ba751ea786527cb3f6eb96ad9fdfdb9989ff572df50f1c42ef800af9c5207a38b929aff969b5c9",
...> r: 0xa7f8f45cce375bb7af8750416e1b03e0473f93c256da2285d1134fc97a700e01,
...> s: 0x1f87a076f13824f4be8963e3dffd7300dae64d5f23c9a062af0c6ead347c135f,
...> standard_v: 1,
...> status: :ok,
...> to_address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b",
...> v: 0xbe,
...> value: 0
...> }
...> ]
...> ],
...> addresses: [
...> params: [
...> %{hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b"},
...> %{hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca"}
...> ]
...> ]
...> ]
iex> {:ok, inserted} = Explorer.Chain.import_blocks(options)
iex> {:ok, ^inserted} = Explorer.Chain.import_blocks(options)
{:ok,
%{
addresses: [
%Explorer.Chain.Hash{
byte_count: 20,
bytes: <<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211,
165, 101, 32, 167, 106, 179, 223, 65, 91>>
},
%Explorer.Chain.Hash{
byte_count: 20,
bytes: <<232, 221, 197, 199, 162, 210, 240, 215, 169, 121,
132, 89, 192, 16, 79, 223, 94, 152, 122, 202>>
}
],
blocks: [
%Explorer.Chain.Hash{
byte_count: 32,
bytes: <<246, 180, 184, 200, 141, 243, 235, 210, 82, 236, 71,
99, 40, 51, 77, 192, 38, 207, 102, 96, 106, 132, 251, 118,
155, 61, 60, 188, 204, 132, 113, 189>>
}
],
internal_transactions: [
%{
index: 0,
transaction_hash: %Explorer.Chain.Hash{
byte_count: 32,
bytes: <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136,
27, 174, 236, 38, 46, 123, 149, 35, 77, 57, 101, 36, 140,
57, 254, 153, 47, 255, 212, 51, 229>>
}
}
],
logs: [
%{
index: 0,
transaction_hash: %Explorer.Chain.Hash{
byte_count: 32,
bytes: <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136,
27, 174, 236, 38, 46, 123, 149, 35, 77, 57, 101, 36, 140,
57, 254, 153, 47, 255, 212, 51, 229>>
}
}
],
transactions: [
%Explorer.Chain.Hash{
byte_count: 32,
bytes: <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136,
27, 174, 236, 38, 46, 123, 149, 35, 77, 57, 101, 36, 140,
57, 254, 153, 47, 255, 212, 51, 229>>
}
]
}}
## Tree
* `t:Explorer.Chain.Block.t/0`s
@ -1874,7 +2019,9 @@ defmodule Explorer.Chain do
{:ok, internal_transactions} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :index],
for: InternalTransaction,
on_conflict: :replace_all,
returning: [:index, :transaction_hash],
timestamps: timestamps
)

@ -2,6 +2,8 @@ defmodule Explorer.Repo do
use Ecto.Repo, otp_app: :explorer
use Scrivener, page_size: 10
require Logger
@doc """
Dynamically loads the repository url from the
DATABASE_URL environment variable.
@ -21,7 +23,36 @@ defmodule Explorer.Repo do
elements
|> Enum.chunk_every(1000)
|> Enum.reduce({0, []}, fn chunk, {total_count, acc} ->
{count, inserted} = insert_all(kind, chunk, opts)
{count, inserted} =
try do
insert_all(kind, chunk, opts)
rescue
exception ->
Logger.error(fn ->
[
"Could not insert all of chunk into ",
to_string(kind),
" using options because of error.\n",
"\n",
"Chunk:\n",
"\n",
inspect(chunk),
"\n",
"\n",
"Options:\n",
"\n",
inspect(opts),
"\n",
"\n",
"Exception:\n",
"\n",
Exception.format(:error, exception)
]
end)
# reraise to kill caller
raise exception
end
if returning do
{count + total_count, acc ++ inserted}

@ -122,7 +122,7 @@ defmodule Explorer.BufferedTaskTest do
BufferedTask.buffer(buffer, [:boom])
assert_receive {:run, {0, :boom}}
assert_receive {:run, {1, :boom}}
assert_receive {:run, {1, :boom}}, 200
refute_receive _
end

@ -0,0 +1,40 @@
defmodule Explorer.RepoTest do
use Explorer.DataCase
import ExUnit.CaptureLog, only: [capture_log: 1]
alias Ecto.Changeset
alias Explorer.Chain.InternalTransaction
@moduletag :capture_log
describe "safe_insert_all/3" do
test "inserting duplicate rows in one chunk is logged before re-raising exception" do
transaction = insert(:transaction)
params = params_for(:internal_transaction, transaction_hash: transaction.hash, index: 0)
%Changeset{valid?: true, changes: changes} = InternalTransaction.changeset(%InternalTransaction{}, params)
at = DateTime.utc_now()
timestamped_changes = Map.merge(changes, %{inserted_at: at, updated_at: at})
log =
capture_log(fn ->
assert_raise Postgrex.Error, fn ->
Repo.safe_insert_all(
InternalTransaction,
[timestamped_changes, timestamped_changes],
conflict_target: [:transaction_hash, :index],
on_conflict: :replace_all
)
end
end)
assert log =~ "Chunk:\n"
assert log =~ "index: 0"
assert log =~ "Options:\n\n[conflict_target: [:transaction_hash, :index], on_conflict: :replace_all]\n\n"
assert log =~
"Exception:\n\n** (Postgrex.Error) ERROR 21000 (cardinality_violation): ON CONFLICT DO UPDATE command cannot affect row a second time\n"
end
end
end

@ -1,7 +1,7 @@
{
"coverage_options": {
"treat_no_relevant_lines_as_covered": true,
"minimum_coverage": 92.7
"minimum_coverage": 93.0
},
"terminal_options": {
"file_column_width": 120

Loading…
Cancel
Save