Improve locks acquiring (#7947)

* Decrease locks level

* Move tokens lock acquiring

* Update EmptyBlocksSanitizer locks
pull/7980/head
Qwerty5Uiop 1 year ago committed by GitHub
parent 6b047b5144
commit ccab6d6cb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 6
      apps/explorer/lib/explorer/chain.ex
  3. 30
      apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex
  4. 2
      apps/explorer/lib/explorer/chain/import/runner/addresses.ex
  5. 29
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  6. 4
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex
  7. 80
      apps/explorer/lib/explorer/chain/import/runner/tokens.ex
  8. 2
      apps/explorer/lib/explorer/chain/import/runner/transactions.ex
  9. 21
      apps/indexer/lib/indexer/fetcher/empty_blocks_sanitizer.ex
  10. 2
      apps/indexer/lib/indexer/temporary/blocks_transactions_mismatch.ex

@ -16,6 +16,7 @@
- [#7859](https://github.com/blockscout/blockscout/pull/7859) - Add TokenTotalSupplyUpdater - [#7859](https://github.com/blockscout/blockscout/pull/7859) - Add TokenTotalSupplyUpdater
- [#7873](https://github.com/blockscout/blockscout/pull/7873) - Chunk realtime balances requests - [#7873](https://github.com/blockscout/blockscout/pull/7873) - Chunk realtime balances requests
- [#7927](https://github.com/blockscout/blockscout/pull/7927) - Delete token balances only for blocks that lost consensus - [#7927](https://github.com/blockscout/blockscout/pull/7927) - Delete token balances only for blocks that lost consensus
- [#7947](https://github.com/blockscout/blockscout/pull/7947) - Improve locks acquiring
### Fixes ### Fixes

@ -4221,7 +4221,7 @@ defmodule Explorer.Chain do
where: address_name.address_hash == ^address_hash, where: address_name.address_hash == ^address_hash,
# Enforce Name ShareLocks order (see docs: sharelocks.md) # Enforce Name ShareLocks order (see docs: sharelocks.md)
order_by: [asc: :address_hash, asc: :name], order_by: [asc: :address_hash, asc: :name],
lock: "FOR UPDATE" lock: "FOR NO KEY UPDATE"
) )
repo.update_all( repo.update_all(
@ -5159,7 +5159,7 @@ defmodule Explorer.Chain do
) )
# Enforce Transaction ShareLocks order (see docs: sharelocks.md) # Enforce Transaction ShareLocks order (see docs: sharelocks.md)
|> order_by(asc: :hash) |> order_by(asc: :hash)
|> lock("FOR UPDATE") |> lock("FOR NO KEY UPDATE")
hashes = Enum.map(transactions, & &1.hash) hashes = Enum.map(transactions, & &1.hash)
@ -5204,7 +5204,7 @@ defmodule Explorer.Chain do
end) end)
# Enforce Transaction ShareLocks order (see docs: sharelocks.md) # Enforce Transaction ShareLocks order (see docs: sharelocks.md)
|> order_by(asc: :hash) |> order_by(asc: :hash)
|> lock("FOR UPDATE") |> lock("FOR NO KEY UPDATE")
Repo.update_all( Repo.update_all(
from(t in Transaction, join: s in subquery(query), on: t.hash == s.hash), from(t in Transaction, join: s in subquery(query), on: t.hash == s.hash),

@ -107,35 +107,13 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
|> Map.put_new(:timeout, @timeout) |> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps) |> Map.put(:timestamps, timestamps)
# Enforce ShareLocks tables order (see docs: sharelocks.md)
run_func = fn repo ->
token_contract_address_hashes_and_ids =
changes_list
|> Enum.map(fn change ->
token_id = get_token_id(change)
{change.token_contract_address_hash, token_id}
end)
|> Enum.uniq()
Tokens.acquire_contract_address_tokens(repo, token_contract_address_hashes_and_ids)
end
multi multi
|> Multi.run(:acquire_contract_address_tokens, fn repo, _ ->
Instrumenter.block_import_stage_runner(
fn -> run_func.(repo) end,
:block_following,
:current_token_balances,
:acquire_contract_address_tokens
)
end)
|> Multi.run(:address_current_token_balances, fn repo, _ -> |> Multi.run(:address_current_token_balances, fn repo, _ ->
Instrumenter.block_import_stage_runner( Instrumenter.block_import_stage_runner(
fn -> insert(repo, changes_list, insert_options) end, fn -> insert(repo, changes_list, insert_options) end,
:block_following, :block_following,
:current_token_balances, :current_token_balances,
:acquire_contract_address_tokens :address_current_token_balances
) )
end) end)
|> Multi.run(:address_current_token_balances_update_token_holder_counts, fn repo, |> Multi.run(:address_current_token_balances_update_token_holder_counts, fn repo,
@ -156,15 +134,11 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
end, end,
:block_following, :block_following,
:current_token_balances, :current_token_balances,
:acquire_contract_address_tokens :address_current_token_balances_update_token_holder_counts
) )
end) end)
end end
defp get_token_id(change) do
if Map.has_key?(change, :token_id), do: change.token_id, else: nil
end
@impl Import.Runner @impl Import.Runner
def timeout, do: @timeout def timeout, do: @timeout

@ -243,7 +243,7 @@ defmodule Explorer.Chain.Import.Runner.Addresses do
where: t.created_contract_address_hash in ^ordered_created_contract_hashes, where: t.created_contract_address_hash in ^ordered_created_contract_hashes,
# Enforce Transaction ShareLocks order (see docs: sharelocks.md) # Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by: t.hash, order_by: t.hash,
lock: "FOR UPDATE" lock: "FOR NO KEY UPDATE"
) )
try do try do

@ -149,14 +149,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
:derive_transaction_forks :derive_transaction_forks
) )
end) end)
|> Multi.run(:acquire_contract_address_tokens, fn repo, %{lose_consensus: non_consensus_blocks} ->
Instrumenter.block_import_stage_runner(
fn -> acquire_contract_address_tokens(repo, non_consensus_blocks) end,
:address_referencing,
:blocks,
:acquire_contract_address_tokens
)
end)
|> Multi.run(:delete_address_token_balances, fn repo, %{lose_consensus: non_consensus_blocks} -> |> Multi.run(:delete_address_token_balances, fn repo, %{lose_consensus: non_consensus_blocks} ->
Instrumenter.block_import_stage_runner( Instrumenter.block_import_stage_runner(
fn -> delete_address_token_balances(repo, non_consensus_blocks, insert_options) end, fn -> delete_address_token_balances(repo, non_consensus_blocks, insert_options) end,
@ -205,21 +197,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
@impl Runner @impl Runner
def timeout, do: @timeout def timeout, do: @timeout
defp acquire_contract_address_tokens(repo, non_consensus_blocks) do
non_consensus_block_numbers = Enum.map(non_consensus_blocks, fn {number, _hash} -> number end)
query =
from(ctb in Address.CurrentTokenBalance,
where: ctb.block_number in ^non_consensus_block_numbers,
select: {ctb.token_contract_address_hash, ctb.token_id},
distinct: [ctb.token_contract_address_hash, ctb.token_id]
)
contract_address_hashes_and_token_ids = repo.all(query)
Tokens.acquire_contract_address_tokens(repo, contract_address_hashes_and_token_ids)
end
defp fork_transactions(%{ defp fork_transactions(%{
repo: repo, repo: repo,
timeout: timeout, timeout: timeout,
@ -232,7 +209,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
select: transaction, select: transaction,
# Enforce Transaction ShareLocks order (see docs: sharelocks.md) # Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by: [asc: :hash], order_by: [asc: :hash],
lock: "FOR UPDATE" lock: "FOR NO KEY UPDATE"
) )
update_query = update_query =
@ -378,7 +355,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
select: block.hash, select: block.hash,
# Enforce Block ShareLocks order (see docs: sharelocks.md) # Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: block.hash], order_by: [asc: block.hash],
lock: "FOR UPDATE" lock: "FOR NO KEY UPDATE"
) )
{_, removed_consensus_block_hashes} = {_, removed_consensus_block_hashes} =
@ -683,7 +660,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
where: bsdr.uncle_hash in ^uncle_hashes, where: bsdr.uncle_hash in ^uncle_hashes,
# Enforce SeconDegreeRelation ShareLocks order (see docs: sharelocks.md) # Enforce SeconDegreeRelation ShareLocks order (see docs: sharelocks.md)
order_by: [asc: :nephew_hash, asc: :uncle_hash], order_by: [asc: :nephew_hash, asc: :uncle_hash],
lock: "FOR UPDATE" lock: "FOR NO KEY UPDATE"
) )
update_query = update_query =

@ -286,7 +286,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
select: block.hash, select: block.hash,
# Enforce Block ShareLocks order (see docs: sharelocks.md) # Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: block.hash], order_by: [asc: block.hash],
lock: "FOR UPDATE" lock: "FOR NO KEY UPDATE"
) )
{:ok, repo.all(query)} {:ok, repo.all(query)}
@ -314,7 +314,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
select: map(t, [:hash, :block_hash, :block_number, :cumulative_gas_used]), select: map(t, [:hash, :block_hash, :block_number, :cumulative_gas_used]),
# Enforce Transaction ShareLocks order (see docs: sharelocks.md) # Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by: [asc: t.hash], order_by: [asc: t.hash],
lock: "FOR UPDATE" lock: "FOR NO KEY UPDATE"
) )
{:ok, repo.all(query)} {:ok, repo.all(query)}

@ -22,79 +22,10 @@ defmodule Explorer.Chain.Import.Runner.Tokens do
@type holder_count :: non_neg_integer() @type holder_count :: non_neg_integer()
@type token_holder_count :: %{contract_address_hash: Hash.Address.t(), count: holder_count()} @type token_holder_count :: %{contract_address_hash: Hash.Address.t(), count: holder_count()}
def acquire_contract_address_tokens(repo, contract_address_hashes_and_token_ids) do
initial_query_no_token_id =
from(token in Token,
select: token
)
initial_query_with_token_id =
from(token in Token,
left_join: instance in Token.Instance,
on: token.contract_address_hash == instance.token_contract_address_hash,
select: token
)
{query_no_token_id, query_with_token_id} =
contract_address_hashes_and_token_ids
|> Enum.reduce({initial_query_no_token_id, initial_query_with_token_id}, fn {contract_address_hash, token_id},
{query_no_token_id,
query_with_token_id} ->
if is_nil(token_id) do
{from(
token in query_no_token_id,
or_where: token.contract_address_hash == ^contract_address_hash
), query_with_token_id}
else
{query_no_token_id,
from(
[token, instance] in query_with_token_id,
or_where: token.contract_address_hash == ^contract_address_hash and instance.token_id == ^token_id
)}
end
end)
final_query_no_token_id =
if query_no_token_id == initial_query_no_token_id do
nil
else
from(
token in query_no_token_id,
# Enforce Token ShareLocks order (see docs: sharelocks.md)
order_by: [
token.contract_address_hash
],
lock: "FOR NO KEY UPDATE"
)
end
final_query_with_token_id =
if query_with_token_id == initial_query_with_token_id do
nil
else
from(
[token, instance] in query_with_token_id,
# Enforce Token ShareLocks order (see docs: sharelocks.md)
order_by: [
token.contract_address_hash,
instance.token_id
],
lock: "FOR NO KEY UPDATE OF t0"
)
end
tokens_no_token_id = (final_query_no_token_id && repo.all(final_query_no_token_id)) || []
tokens_with_token_id = (final_query_with_token_id && repo.all(final_query_with_token_id)) || []
tokens = tokens_no_token_id ++ tokens_with_token_id
{:ok, tokens}
end
def update_holder_counts_with_deltas(repo, token_holder_count_deltas, %{ def update_holder_counts_with_deltas(repo, token_holder_count_deltas, %{
timeout: timeout, timeout: timeout,
timestamps: %{updated_at: updated_at} timestamps: %{updated_at: updated_at}
}) do }) do
# NOTE that acquire_contract_address_tokens needs to be called before this
{hashes, deltas} = {hashes, deltas} =
token_holder_count_deltas token_holder_count_deltas
|> Enum.map(fn %{contract_address_hash: contract_address_hash, delta: delta} -> |> Enum.map(fn %{contract_address_hash: contract_address_hash, delta: delta} ->
@ -103,6 +34,15 @@ defmodule Explorer.Chain.Import.Runner.Tokens do
end) end)
|> Enum.unzip() |> Enum.unzip()
token_query =
from(
token in Token,
where: token.contract_address_hash in ^hashes,
select: token.contract_address_hash,
order_by: token.contract_address_hash,
lock: "FOR NO KEY UPDATE"
)
query = query =
from( from(
token in Token, token in Token,
@ -113,8 +53,8 @@ defmodule Explorer.Chain.Import.Runner.Tokens do
^deltas ^deltas
), ),
on: token.contract_address_hash == deltas.contract_address_hash, on: token.contract_address_hash == deltas.contract_address_hash,
where: token.contract_address_hash in subquery(token_query),
where: not is_nil(token.holder_count), where: not is_nil(token.holder_count),
# ShareLocks order already enforced by `acquire_contract_address_tokens` (see docs: sharelocks.md)
update: [ update: [
set: [ set: [
holder_count: token.holder_count + deltas.delta, holder_count: token.holder_count + deltas.delta,

@ -215,7 +215,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
where: block.hash in ^block_hashes, where: block.hash in ^block_hashes,
# Enforce Block ShareLocks order (see docs: sharelocks.md) # Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: block.hash], order_by: [asc: block.hash],
lock: "FOR UPDATE" lock: "FOR NO KEY UPDATE"
) )
try do try do

@ -68,19 +68,15 @@ defmodule Indexer.Fetcher.EmptyBlocksSanitizer do
end end
defp sanitize_empty_blocks(json_rpc_named_arguments) do defp sanitize_empty_blocks(json_rpc_named_arguments) do
unprocessed_non_empty_blocks_from_db = unprocessed_non_empty_blocks_query_list(limit()) unprocessed_non_empty_blocks_query = unprocessed_non_empty_blocks_query(limit())
uniq_block_hashes = unprocessed_non_empty_blocks_from_db
if Enum.count(uniq_block_hashes) > 0 do
Repo.update_all( Repo.update_all(
from( from(
block in Block, block in Block,
where: block.hash in ^uniq_block_hashes where: block.hash in subquery(unprocessed_non_empty_blocks_query)
), ),
set: [is_empty: false, updated_at: Timex.now()] set: [is_empty: false, updated_at: Timex.now()]
) )
end
unprocessed_empty_blocks_from_db = unprocessed_empty_blocks_query_list(limit()) unprocessed_empty_blocks_from_db = unprocessed_empty_blocks_query_list(limit())
@ -141,25 +137,20 @@ defmodule Indexer.Fetcher.EmptyBlocksSanitizer do
where: block.consensus == true, where: block.consensus == true,
order_by: [asc: block.hash], order_by: [asc: block.hash],
limit: ^limit, limit: ^limit,
offset: 1000, offset: 1000
lock: "FOR UPDATE"
) )
end end
defp unprocessed_non_empty_blocks_query_list(limit) do defp unprocessed_non_empty_blocks_query(limit) do
blocks_query = consensus_blocks_with_nil_is_empty_query(limit) blocks_query = consensus_blocks_with_nil_is_empty_query(limit)
query =
from(q in subquery(blocks_query), from(q in subquery(blocks_query),
inner_join: transaction in Transaction, inner_join: transaction in Transaction,
on: q.number == transaction.block_number, on: q.number == transaction.block_number,
select: q.hash, select: q.hash,
distinct: q.hash, order_by: [asc: q.hash],
order_by: [asc: q.hash] lock: fragment("FOR NO KEY UPDATE OF ?", q)
) )
query
|> Repo.all(timeout: :infinity)
end end
defp unprocessed_empty_blocks_query_list(limit) do defp unprocessed_empty_blocks_query_list(limit) do

@ -136,7 +136,7 @@ defmodule Indexer.Temporary.BlocksTransactionsMismatch do
where: block.hash in ^hashes, where: block.hash in ^hashes,
# Enforce Block ShareLocks order (see docs: sharelocks.md) # Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: block.hash], order_by: [asc: block.hash],
lock: "FOR UPDATE" lock: "FOR NO KEY UPDATE"
) )
Repo.update_all( Repo.update_all(

Loading…
Cancel
Save