Merge pull request #1260 from poanetwork/on-conflict-where

Use where clause for on_conflict to only write tuples that are different
pull/1265/head
Luke Imhoff 6 years ago committed by GitHub
commit 4551d7f70c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc/rolling_window_test.exs
  2. 21
      apps/explorer/lib/explorer/chain.ex
  3. 11
      apps/explorer/lib/explorer/chain/import/runner/address/coin_balances.ex
  4. 38
      apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex
  5. 36
      apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex
  6. 11
      apps/explorer/lib/explorer/chain/import/runner/addresses.ex
  7. 8
      apps/explorer/lib/explorer/chain/import/runner/block/second_degree_relations.ex
  8. 20
      apps/explorer/lib/explorer/chain/import/runner/block_rewards.ex
  9. 10
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  10. 22
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex
  11. 13
      apps/explorer/lib/explorer/chain/import/runner/logs.ex
  12. 11
      apps/explorer/lib/explorer/chain/import/runner/token_transfers.ex
  13. 12
      apps/explorer/lib/explorer/chain/import/runner/tokens.ex
  14. 3
      apps/explorer/lib/explorer/chain/import/runner/transaction/forks.ex
  15. 25
      apps/explorer/lib/explorer/chain/import/runner/transactions.ex
  16. 2
      apps/explorer/test/explorer/chain_test.exs
  17. 22
      apps/explorer/test/explorer/validator/metadata_importer_test.exs
  18. 2
      apps/explorer/test/explorer/validator/metadata_retriever_test.exs
  19. 2
      apps/indexer/test/indexer/token_transfer/uncataloged/worker_test.exs

@ -1,5 +1,7 @@
defmodule EthereumJSONRPC.RollingWindowTest do defmodule EthereumJSONRPC.RollingWindowTest do
use ExUnit.Case, async: true use ExUnit.Case,
# The same named process is used for all tests and they use the same key in the table, so they would interfere
async: false
alias EthereumJSONRPC.RollingWindow alias EthereumJSONRPC.RollingWindow

@ -18,7 +18,7 @@ defmodule Explorer.Chain do
] ]
alias Ecto.Adapters.SQL alias Ecto.Adapters.SQL
alias Ecto.Multi alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{ alias Explorer.Chain.{
Address, Address,
@ -2047,12 +2047,27 @@ defmodule Explorer.Chain do
token_changeset = Token.changeset(token, params) token_changeset = Token.changeset(token, params)
address_name_changeset = Address.Name.changeset(%Address.Name{}, Map.put(params, :address_hash, address_hash)) address_name_changeset = Address.Name.changeset(%Address.Name{}, Map.put(params, :address_hash, address_hash))
token_opts = [on_conflict: Runner.Tokens.default_on_conflict(), conflict_target: :contract_address_hash] stale_error_field = :contract_address_hash
stale_error_message = "is up to date"
token_opts = [
on_conflict: Runner.Tokens.default_on_conflict(),
conflict_target: :contract_address_hash,
stale_error_field: stale_error_field,
stale_error_message: stale_error_message
]
address_name_opts = [on_conflict: :nothing, conflict_target: [:address_hash, :name]] address_name_opts = [on_conflict: :nothing, conflict_target: [:address_hash, :name]]
insert_result = insert_result =
Multi.new() Multi.new()
|> Multi.insert(:token, token_changeset, token_opts) |> Multi.run(:token, fn repo, _ ->
with {:error, %Changeset{errors: [{^stale_error_field, {^stale_error_message, []}}]}} <-
repo.insert(token_changeset, token_opts) do
# the original token passed into `update_token/2` as stale error means it is unchanged
{:ok, token}
end
end)
|> Multi.run( |> Multi.run(
:address_name, :address_name,
fn repo, _ -> fn repo, _ ->

@ -93,8 +93,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CoinBalances do
balance in CoinBalance, balance in CoinBalance,
update: [ update: [
set: [ set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at),
value: value:
fragment( fragment(
""" """
@ -120,9 +118,14 @@ defmodule Explorer.Chain.Import.Runner.Address.CoinBalances do
balance.value_fetched_at, balance.value_fetched_at,
balance.value_fetched_at, balance.value_fetched_at,
balance.value_fetched_at balance.value_fetched_at
) ),
] inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at)
] ]
],
where:
fragment("EXCLUDED.value IS NOT NULL") and
(is_nil(balance.value_fetched_at) or fragment("EXCLUDED.value_fetched_at > ?", balance.value_fetched_at))
) )
end end
end end

@ -89,38 +89,14 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
current_token_balance in CurrentTokenBalance, current_token_balance in CurrentTokenBalance,
update: [ update: [
set: [ set: [
block_number: block_number: fragment("EXCLUDED.block_number"),
fragment( value: fragment("EXCLUDED.value"),
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.block_number ELSE ? END", value_fetched_at: fragment("EXCLUDED.value_fetched_at"),
current_token_balance.block_number, inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", current_token_balance.inserted_at),
current_token_balance.block_number updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", current_token_balance.updated_at)
),
inserted_at:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.inserted_at ELSE ? END",
current_token_balance.block_number,
current_token_balance.inserted_at
),
updated_at:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.updated_at ELSE ? END",
current_token_balance.block_number,
current_token_balance.updated_at
),
value:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.value ELSE ? END",
current_token_balance.block_number,
current_token_balance.value
),
value_fetched_at:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.value_fetched_at ELSE ? END",
current_token_balance.block_number,
current_token_balance.value_fetched_at
)
]
] ]
],
where: fragment("? < EXCLUDED.block_number", current_token_balance.block_number)
) )
end end
end end

@ -80,36 +80,16 @@ defmodule Explorer.Chain.Import.Runner.Address.TokenBalances do
token_balance in TokenBalance, token_balance in TokenBalance,
update: [ update: [
set: [ set: [
value: fragment("EXCLUDED.value"),
value_fetched_at: fragment("EXCLUDED.value_fetched_at"),
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at), inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at), updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at)
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value_fetched_at
)
]
] ]
],
where:
fragment("EXCLUDED.value IS NOT NULL") and
(is_nil(token_balance.value_fetched_at) or
fragment("? < EXCLUDED.value_fetched_at", token_balance.value_fetched_at))
) )
end end
end end

@ -101,7 +101,16 @@ defmodule Explorer.Chain.Import.Runner.Addresses do
), ),
nonce: fragment("GREATEST(EXCLUDED.nonce, ?)", address.nonce) nonce: fragment("GREATEST(EXCLUDED.nonce, ?)", address.nonce)
] ]
] ],
# where any of `set`s would make a change
# This is so that tuples are only generated when a change would occur
where:
fragment("COALESCE(?, EXCLUDED.contract_code) IS DISTINCT FROM ?", address.contract_code, address.contract_code) or
fragment(
"EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND (? IS NULL OR EXCLUDED.fetched_coin_balance_block_number >= ?)",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number
) or fragment("GREATEST(?, EXCLUDED.nonce) IS DISTINCT FROM ?", address.nonce, address.nonce)
) )
end end

@ -77,7 +77,13 @@ defmodule Explorer.Chain.Import.Runner.Block.SecondDegreeRelations do
uncle_fetched_at: uncle_fetched_at:
fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at) fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at)
] ]
] ],
where:
fragment(
"LEAST(?, EXCLUDED.uncle_fetched_at) IS DISTINCT FROM ?",
block_second_degree_relation.uncle_fetched_at,
block_second_degree_relation.uncle_fetched_at
)
) )
end end
end end

@ -3,8 +3,6 @@ defmodule Explorer.Chain.Import.Runner.Block.Rewards do
Bulk imports `t:Explorer.Chain.Block.Reward.t/0`. Bulk imports `t:Explorer.Chain.Block.Reward.t/0`.
""" """
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi, Repo} alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Block.Reward alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import alias Explorer.Chain.Import
@ -53,27 +51,11 @@ defmodule Explorer.Chain.Import.Runner.Block.Rewards do
repo, repo,
changes_list, changes_list,
conflict_target: [:address_hash, :address_type, :block_hash], conflict_target: [:address_hash, :address_type, :block_hash],
on_conflict: on_conflict(), on_conflict: :nothing,
for: ecto_schema_module(), for: ecto_schema_module(),
returning: true, returning: true,
timeout: timeout, timeout: timeout,
timestamps: timestamps timestamps: timestamps
) )
end end
defp on_conflict do
from(
block_reward in Reward,
update: [
set: [
address_hash: block_reward.address_hash,
address_type: block_reward.address_type,
block_hash: block_reward.block_hash,
reward: block_reward.reward,
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", block_reward.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", block_reward.updated_at)
]
]
)
end
end end

@ -205,6 +205,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
) )
end end
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
defp default_on_conflict do defp default_on_conflict do
from( from(
block in Block, block in Block,
@ -225,7 +226,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", block.inserted_at), inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", block.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", block.updated_at) updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", block.updated_at)
] ]
] ],
where:
fragment("EXCLUDED.consensus <> ?", block.consensus) or fragment("EXCLUDED.difficulty <> ?", block.difficulty) or
fragment("EXCLUDED.gas_limit <> ?", block.gas_limit) or fragment("EXCLUDED.gas_used <> ?", block.gas_used) or
fragment("EXCLUDED.miner_hash <> ?", block.miner_hash) or fragment("EXCLUDED.nonce <> ?", block.nonce) or
fragment("EXCLUDED.number <> ?", block.number) or fragment("EXCLUDED.parent_hash <> ?", block.parent_hash) or
fragment("EXCLUDED.size <> ?", block.size) or fragment("EXCLUDED.timestamp <> ?", block.timestamp) or
fragment("EXCLUDED.total_difficulty <> ?", block.total_difficulty)
) )
end end

@ -120,7 +120,27 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", internal_transaction.inserted_at), inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", internal_transaction.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", internal_transaction.updated_at) updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", internal_transaction.updated_at)
] ]
] ],
# `IS DISTINCT FROM` is used because it allows `NULL` to be equal to itself
where:
fragment(
"(EXCLUDED.call_type, EXCLUDED.created_contract_address_hash, EXCLUDED.created_contract_code, EXCLUDED.error, EXCLUDED.from_address_hash, EXCLUDED.gas, EXCLUDED.gas_used, EXCLUDED.init, EXCLUDED.input, EXCLUDED.output, EXCLUDED.to_address_hash, EXCLUDED.trace_address, EXCLUDED.transaction_index, EXCLUDED.type, EXCLUDED.value) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
internal_transaction.call_type,
internal_transaction.created_contract_address_hash,
internal_transaction.created_contract_code,
internal_transaction.error,
internal_transaction.from_address_hash,
internal_transaction.gas,
internal_transaction.gas_used,
internal_transaction.init,
internal_transaction.input,
internal_transaction.output,
internal_transaction.to_address_hash,
internal_transaction.trace_address,
internal_transaction.transaction_index,
internal_transaction.type,
internal_transaction.value
)
) )
end end

@ -91,7 +91,18 @@ defmodule Explorer.Chain.Import.Runner.Logs do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", log.inserted_at), inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", log.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", log.updated_at) updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", log.updated_at)
] ]
] ],
where:
fragment(
"(EXCLUDED.address_hash, EXCLUDED.data, EXCLUDED.first_topic, EXCLUDED.second_topic, EXCLUDED.third_topic, EXCLUDED.fourth_topic, EXCLUDED.type) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?)",
log.address_hash,
log.data,
log.first_topic,
log.second_topic,
log.third_topic,
log.fourth_topic,
log.type
)
) )
end end
end end

@ -85,7 +85,16 @@ defmodule Explorer.Chain.Import.Runner.TokenTransfers do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token_transfer.inserted_at), inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token_transfer.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token_transfer.updated_at) updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token_transfer.updated_at)
] ]
] ],
where:
fragment(
"(EXCLUDED.amount, EXCLUDED.from_address_hash, EXCLUDED.to_address_hash, EXCLUDED.token_contract_address_hash, EXCLUDED.token_id) IS DISTINCT FROM (?, ? ,? , ?, ?)",
token_transfer.amount,
token_transfer.from_address_hash,
token_transfer.to_address_hash,
token_transfer.token_contract_address_hash,
token_transfer.token_id
)
) )
end end
end end

@ -86,7 +86,17 @@ defmodule Explorer.Chain.Import.Runner.Tokens do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token.inserted_at), inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token.updated_at) updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token.updated_at)
] ]
] ],
where:
fragment(
"(EXCLUDED.name, EXCLUDED.symbol, EXCLUDED.total_supply, EXCLUDED.decimals, EXCLUDED.type, EXCLUDED.cataloged) IS DISTINCT FROM (?, ?, ?, ?, ?, ?)",
token.name,
token.symbol,
token.total_supply,
token.decimals,
token.type,
token.cataloged
)
) )
end end
end end

@ -80,7 +80,8 @@ defmodule Explorer.Chain.Import.Runner.Transaction.Forks do
set: [ set: [
hash: fragment("EXCLUDED.hash") hash: fragment("EXCLUDED.hash")
] ]
] ],
where: fragment("EXCLUDED.hash <> ?", transaction_fork.hash)
) )
end end
end end

@ -103,7 +103,30 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", transaction.inserted_at), inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", transaction.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", transaction.updated_at) updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", transaction.updated_at)
] ]
] ],
where:
fragment(
"(EXCLUDED.block_hash, EXCLUDED.block_number, EXCLUDED.created_contract_address_hash, EXCLUDED.cumulative_gas_used, EXCLUDED.cumulative_gas_used, EXCLUDED.from_address_hash, EXCLUDED.gas, EXCLUDED.gas_price, EXCLUDED.gas_used, EXCLUDED.index, EXCLUDED.internal_transactions_indexed_at, EXCLUDED.input, EXCLUDED.nonce, EXCLUDED.r, EXCLUDED.s, EXCLUDED.status, EXCLUDED.to_address_hash, EXCLUDED.v, EXCLUDED.value) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
transaction.block_hash,
transaction.block_number,
transaction.created_contract_address_hash,
transaction.cumulative_gas_used,
transaction.cumulative_gas_used,
transaction.from_address_hash,
transaction.gas,
transaction.gas_price,
transaction.gas_used,
transaction.index,
transaction.internal_transactions_indexed_at,
transaction.input,
transaction.nonce,
transaction.r,
transaction.s,
transaction.status,
transaction.to_address_hash,
transaction.v,
transaction.value
)
) )
end end
end end

@ -22,8 +22,6 @@ defmodule Explorer.ChainTest do
Wei Wei
} }
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Supply.ProofOfAuthority alias Explorer.Chain.Supply.ProofOfAuthority
alias Explorer.Counters.{AddressesWithBalanceCounter, TokenHoldersCounter} alias Explorer.Counters.{AddressesWithBalanceCounter, TokenHoldersCounter}

@ -14,17 +14,19 @@ defmodule Explorer.Validator.MetadataImporterTest do
test "inserts new address names when there's none for the validators" do test "inserts new address names when there's none for the validators" do
address = insert(:address) address = insert(:address)
[%{address_hash: address.hash, name: "Testinit Unitorius", primary: true, metadata: %{"test" => "toast"}}] address_hash = address.hash
[%{address_hash: address_hash, name: "Testinit Unitorius", primary: true, metadata: %{"test" => "toast"}}]
|> MetadataImporter.import_metadata() |> MetadataImporter.import_metadata()
address_names = address_names =
from(an in Address.Name, where: an.address_hash == ^address.hash and an.primary == true) from(an in Address.Name, where: an.address_hash == ^address_hash and an.primary == true)
|> Repo.all() |> Repo.all()
expected_name = %Address.Name{address_hash: address.hash, name: "Testit Unitorus", metadata: %{"test" => "toast"}}
assert length(address_names) == 1 assert length(address_names) == 1
assert expected_name = hd(address_names)
assert %Address.Name{address_hash: ^address_hash, name: "Testinit Unitorius", metadata: %{"test" => "toast"}} =
hd(address_names)
end end
test "updates the primary address name if the validator already has one" do test "updates the primary address name if the validator already has one" do
@ -32,17 +34,19 @@ defmodule Explorer.Validator.MetadataImporterTest do
insert(:address_name, address: address, primary: true, name: "Nodealus Faileddi") insert(:address_name, address: address, primary: true, name: "Nodealus Faileddi")
[%{address_hash: address.hash, name: "Testit Unitorus", primary: true, metadata: %{"test" => "toast"}}] address_hash = address.hash
[%{address_hash: address_hash, name: "Testit Unitorus", primary: true, metadata: %{"test" => "toast"}}]
|> MetadataImporter.import_metadata() |> MetadataImporter.import_metadata()
address_names = address_names =
from(an in Address.Name, where: an.address_hash == ^address.hash and an.primary == true) from(an in Address.Name, where: an.address_hash == ^address.hash and an.primary == true)
|> Repo.all() |> Repo.all()
expected_name = %Address.Name{address_hash: address.hash, name: "Testit Unitorus", metadata: %{"test" => "toast"}}
assert length(address_names) == 1 assert length(address_names) == 1
assert expected_name = hd(address_names)
assert %Address.Name{address_hash: ^address_hash, name: "Testit Unitorus", metadata: %{"test" => "toast"}} =
hd(address_names)
end end
end end
end end

@ -47,7 +47,7 @@ defmodule Explorer.Validator.MetadataRetrieverTest do
expect( expect(
EthereumJSONRPC.Mox, EthereumJSONRPC.Mox,
:json_rpc, :json_rpc,
fn [%{id: id, method: _, params: _}], _options -> fn [%{id: ^id, method: _, params: _}], _options ->
{:ok, {:ok,
[ [
%{ %{

@ -24,7 +24,7 @@ defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do
test "sends shutdown to supervisor" do test "sends shutdown to supervisor" do
state = %{task_ref: nil, block_numbers: [], sup_pid: self()} state = %{task_ref: nil, block_numbers: [], sup_pid: self()}
Task.async(fn -> Worker.handle_info(:scan, state) end) Task.async(fn -> Worker.handle_info(:scan, state) end)
assert_receive {_, _, {:terminate, :normal}} assert_receive {_, _, {:terminate, :normal}}, 200
end end
test "sends message to self when uncataloged token transfers are found" do test "sends message to self when uncataloged token transfers are found" do

Loading…
Cancel
Save