Use where clause for on_conflict to only write tuples that are different

When the `ON CONFLICT` clause is triggered, it still causes a new tuple
in Postgres to be written even if the value of all the columns is the
same.  Making a new tuple means that the table needs to be vacuumed
sooner and leads to more work and churn for the database and its storage.
But, there is a way to prevent this unnecessary churn: Postgres supports
a `WHERE` clause on the `ON CONFLICT DO UPDATE` that allows us to not
write a new row if the data would be the same.

While testing this, it already showed it can prevent unnecessary updates
to the `tokens` table when the MetadataUpdater is running.
pull/1260/head
Luke Imhoff 6 years ago
parent 85883011eb
commit 04d3644839
  1. 21
      apps/explorer/lib/explorer/chain.ex
  2. 11
      apps/explorer/lib/explorer/chain/import/runner/address/coin_balances.ex
  3. 38
      apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex
  4. 36
      apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex
  5. 11
      apps/explorer/lib/explorer/chain/import/runner/addresses.ex
  6. 8
      apps/explorer/lib/explorer/chain/import/runner/block/second_degree_relations.ex
  7. 20
      apps/explorer/lib/explorer/chain/import/runner/block_rewards.ex
  8. 10
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  9. 22
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex
  10. 13
      apps/explorer/lib/explorer/chain/import/runner/logs.ex
  11. 11
      apps/explorer/lib/explorer/chain/import/runner/token_transfers.ex
  12. 12
      apps/explorer/lib/explorer/chain/import/runner/tokens.ex
  13. 3
      apps/explorer/lib/explorer/chain/import/runner/transaction/forks.ex
  14. 25
      apps/explorer/lib/explorer/chain/import/runner/transactions.ex

@ -18,7 +18,7 @@ defmodule Explorer.Chain do
]
alias Ecto.Adapters.SQL
alias Ecto.Multi
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{
Address,
@ -2047,12 +2047,27 @@ defmodule Explorer.Chain do
token_changeset = Token.changeset(token, params)
address_name_changeset = Address.Name.changeset(%Address.Name{}, Map.put(params, :address_hash, address_hash))
token_opts = [on_conflict: Runner.Tokens.default_on_conflict(), conflict_target: :contract_address_hash]
stale_error_field = :contract_address_hash
stale_error_message = "is up to date"
token_opts = [
on_conflict: Runner.Tokens.default_on_conflict(),
conflict_target: :contract_address_hash,
stale_error_field: stale_error_field,
stale_error_message: stale_error_message
]
address_name_opts = [on_conflict: :nothing, conflict_target: [:address_hash, :name]]
insert_result =
Multi.new()
|> Multi.insert(:token, token_changeset, token_opts)
|> Multi.run(:token, fn repo, _ ->
with {:error, %Changeset{errors: [{^stale_error_field, {^stale_error_message, []}}]}} <-
repo.insert(token_changeset, token_opts) do
# the original token passed into `update_token/2` as stale error means it is unchanged
{:ok, token}
end
end)
|> Multi.run(
:address_name,
fn repo, _ ->

@ -93,8 +93,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CoinBalances do
balance in CoinBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at),
value:
fragment(
"""
@ -120,9 +118,14 @@ defmodule Explorer.Chain.Import.Runner.Address.CoinBalances do
balance.value_fetched_at,
balance.value_fetched_at,
balance.value_fetched_at
)
]
),
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at)
]
],
where:
fragment("EXCLUDED.value IS NOT NULL") and
(is_nil(balance.value_fetched_at) or fragment("EXCLUDED.value_fetched_at > ?", balance.value_fetched_at))
)
end
end

@ -89,38 +89,14 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
current_token_balance in CurrentTokenBalance,
update: [
set: [
block_number:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.block_number ELSE ? END",
current_token_balance.block_number,
current_token_balance.block_number
),
inserted_at:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.inserted_at ELSE ? END",
current_token_balance.block_number,
current_token_balance.inserted_at
),
updated_at:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.updated_at ELSE ? END",
current_token_balance.block_number,
current_token_balance.updated_at
),
value:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.value ELSE ? END",
current_token_balance.block_number,
current_token_balance.value
),
value_fetched_at:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.value_fetched_at ELSE ? END",
current_token_balance.block_number,
current_token_balance.value_fetched_at
)
]
block_number: fragment("EXCLUDED.block_number"),
value: fragment("EXCLUDED.value"),
value_fetched_at: fragment("EXCLUDED.value_fetched_at"),
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", current_token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", current_token_balance.updated_at)
]
],
where: fragment("? < EXCLUDED.block_number", current_token_balance.block_number)
)
end
end

@ -80,36 +80,16 @@ defmodule Explorer.Chain.Import.Runner.Address.TokenBalances do
token_balance in TokenBalance,
update: [
set: [
value: fragment("EXCLUDED.value"),
value_fetched_at: fragment("EXCLUDED.value_fetched_at"),
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value_fetched_at
)
]
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at)
]
],
where:
fragment("EXCLUDED.value IS NOT NULL") and
(is_nil(token_balance.value_fetched_at) or
fragment("? < EXCLUDED.value_fetched_at", token_balance.value_fetched_at))
)
end
end

@ -101,7 +101,16 @@ defmodule Explorer.Chain.Import.Runner.Addresses do
),
nonce: fragment("GREATEST(EXCLUDED.nonce, ?)", address.nonce)
]
]
],
# where any of `set`s would make a change
# This is so that tuples are only generated when a change would occur
where:
fragment("COALESCE(?, EXCLUDED.contract_code) IS DISTINCT FROM ?", address.contract_code, address.contract_code) or
fragment(
"EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND (? IS NULL OR EXCLUDED.fetched_coin_balance_block_number >= ?)",
address.fetched_coin_balance_block_number,
address.fetched_coin_balance_block_number
) or fragment("GREATEST(?, EXCLUDED.nonce) IS DISTINCT FROM ?", address.nonce, address.nonce)
)
end

@ -77,7 +77,13 @@ defmodule Explorer.Chain.Import.Runner.Block.SecondDegreeRelations do
uncle_fetched_at:
fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at)
]
]
],
where:
fragment(
"LEAST(?, EXCLUDED.uncle_fetched_at) IS DISTINCT FROM ?",
block_second_degree_relation.uncle_fetched_at,
block_second_degree_relation.uncle_fetched_at
)
)
end
end

@ -3,8 +3,6 @@ defmodule Explorer.Chain.Import.Runner.Block.Rewards do
Bulk imports `t:Explorer.Chain.Block.Reward.t/0`.
"""
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import
@ -53,27 +51,11 @@ defmodule Explorer.Chain.Import.Runner.Block.Rewards do
repo,
changes_list,
conflict_target: [:address_hash, :address_type, :block_hash],
on_conflict: on_conflict(),
on_conflict: :nothing,
for: ecto_schema_module(),
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
defp on_conflict do
from(
block_reward in Reward,
update: [
set: [
address_hash: block_reward.address_hash,
address_type: block_reward.address_type,
block_hash: block_reward.block_hash,
reward: block_reward.reward,
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", block_reward.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", block_reward.updated_at)
]
]
)
end
end

@ -205,6 +205,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
)
end
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
defp default_on_conflict do
from(
block in Block,
@ -225,7 +226,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", block.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", block.updated_at)
]
]
],
where:
fragment("EXCLUDED.consensus <> ?", block.consensus) or fragment("EXCLUDED.difficulty <> ?", block.difficulty) or
fragment("EXCLUDED.gas_limit <> ?", block.gas_limit) or fragment("EXCLUDED.gas_used <> ?", block.gas_used) or
fragment("EXCLUDED.miner_hash <> ?", block.miner_hash) or fragment("EXCLUDED.nonce <> ?", block.nonce) or
fragment("EXCLUDED.number <> ?", block.number) or fragment("EXCLUDED.parent_hash <> ?", block.parent_hash) or
fragment("EXCLUDED.size <> ?", block.size) or fragment("EXCLUDED.timestamp <> ?", block.timestamp) or
fragment("EXCLUDED.total_difficulty <> ?", block.total_difficulty)
)
end

@ -120,7 +120,27 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", internal_transaction.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", internal_transaction.updated_at)
]
]
],
# `IS DISTINCT FROM` is used because it allows `NULL` to be equal to itself
where:
fragment(
"(EXCLUDED.call_type, EXCLUDED.created_contract_address_hash, EXCLUDED.created_contract_code, EXCLUDED.error, EXCLUDED.from_address_hash, EXCLUDED.gas, EXCLUDED.gas_used, EXCLUDED.init, EXCLUDED.input, EXCLUDED.output, EXCLUDED.to_address_hash, EXCLUDED.trace_address, EXCLUDED.transaction_index, EXCLUDED.type, EXCLUDED.value) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
internal_transaction.call_type,
internal_transaction.created_contract_address_hash,
internal_transaction.created_contract_code,
internal_transaction.error,
internal_transaction.from_address_hash,
internal_transaction.gas,
internal_transaction.gas_used,
internal_transaction.init,
internal_transaction.input,
internal_transaction.output,
internal_transaction.to_address_hash,
internal_transaction.trace_address,
internal_transaction.transaction_index,
internal_transaction.type,
internal_transaction.value
)
)
end

@ -91,7 +91,18 @@ defmodule Explorer.Chain.Import.Runner.Logs do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", log.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", log.updated_at)
]
]
],
where:
fragment(
"(EXCLUDED.address_hash, EXCLUDED.data, EXCLUDED.first_topic, EXCLUDED.second_topic, EXCLUDED.third_topic, EXCLUDED.fourth_topic, EXCLUDED.type) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?)",
log.address_hash,
log.data,
log.first_topic,
log.second_topic,
log.third_topic,
log.fourth_topic,
log.type
)
)
end
end

@ -85,7 +85,16 @@ defmodule Explorer.Chain.Import.Runner.TokenTransfers do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token_transfer.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token_transfer.updated_at)
]
]
],
where:
fragment(
"(EXCLUDED.amount, EXCLUDED.from_address_hash, EXCLUDED.to_address_hash, EXCLUDED.token_contract_address_hash, EXCLUDED.token_id) IS DISTINCT FROM (?, ? ,? , ?, ?)",
token_transfer.amount,
token_transfer.from_address_hash,
token_transfer.to_address_hash,
token_transfer.token_contract_address_hash,
token_transfer.token_id
)
)
end
end

@ -86,7 +86,17 @@ defmodule Explorer.Chain.Import.Runner.Tokens do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token.updated_at)
]
]
],
where:
fragment(
"(EXCLUDED.name, EXCLUDED.symbol, EXCLUDED.total_supply, EXCLUDED.decimals, EXCLUDED.type, EXCLUDED.cataloged) IS DISTINCT FROM (?, ?, ?, ?, ?, ?)",
token.name,
token.symbol,
token.total_supply,
token.decimals,
token.type,
token.cataloged
)
)
end
end

@ -80,7 +80,8 @@ defmodule Explorer.Chain.Import.Runner.Transaction.Forks do
set: [
hash: fragment("EXCLUDED.hash")
]
]
],
where: fragment("EXCLUDED.hash <> ?", transaction_fork.hash)
)
end
end

@ -103,7 +103,30 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", transaction.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", transaction.updated_at)
]
]
],
where:
fragment(
"(EXCLUDED.block_hash, EXCLUDED.block_number, EXCLUDED.created_contract_address_hash, EXCLUDED.cumulative_gas_used, EXCLUDED.cumulative_gas_used, EXCLUDED.from_address_hash, EXCLUDED.gas, EXCLUDED.gas_price, EXCLUDED.gas_used, EXCLUDED.index, EXCLUDED.internal_transactions_indexed_at, EXCLUDED.input, EXCLUDED.nonce, EXCLUDED.r, EXCLUDED.s, EXCLUDED.status, EXCLUDED.to_address_hash, EXCLUDED.v, EXCLUDED.value) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
transaction.block_hash,
transaction.block_number,
transaction.created_contract_address_hash,
transaction.cumulative_gas_used,
transaction.cumulative_gas_used,
transaction.from_address_hash,
transaction.gas,
transaction.gas_price,
transaction.gas_used,
transaction.index,
transaction.internal_transactions_indexed_at,
transaction.input,
transaction.nonce,
transaction.r,
transaction.s,
transaction.status,
transaction.to_address_hash,
transaction.v,
transaction.value
)
)
end
end

Loading…
Cancel
Save