Merge branch 'master' into ab-batch-token-updates

pull/2748/head
Ayrat Badykov 5 years ago committed by GitHub
commit 3855be4af4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 68
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  3. 109
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex
  4. 2
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions_indexed_at_blocks.ex
  5. 72
      apps/explorer/test/explorer/chain/import/runner/internal_transactions_test.exs

@ -1,6 +1,7 @@
## Current ## Current
### Features ### Features
- [#2726](https://github.com/poanetwork/blockscout/pull/2726) - Remove internal_transaction block_number setting from blocks runner
- [#2717](https://github.com/poanetwork/blockscout/pull/2717) - Improve speed of nonconsensus data removal - [#2717](https://github.com/poanetwork/blockscout/pull/2717) - Improve speed of nonconsensus data removal
- [#2679](https://github.com/poanetwork/blockscout/pull/2679) - added fixed height for card chain blocks and card chain transactions - [#2679](https://github.com/poanetwork/blockscout/pull/2679) - added fixed height for card chain blocks and card chain transactions
- [#2678](https://github.com/poanetwork/blockscout/pull/2678) - fixed dashboard banner height bug - [#2678](https://github.com/poanetwork/blockscout/pull/2678) - fixed dashboard banner height bug

@ -56,7 +56,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
# Note, needs to be executed after `lose_consensus` for lock acquisition # Note, needs to be executed after `lose_consensus` for lock acquisition
insert(repo, changes_list, insert_options) insert(repo, changes_list, insert_options)
end) end)
|> Multi.run(:uncle_fetched_block_second_degree_relations, fn repo, %{blocks: blocks} when is_list(blocks) -> |> Multi.run(:uncle_fetched_block_second_degree_relations, fn repo, _ ->
update_block_second_degree_relations(repo, hashes, %{ update_block_second_degree_relations(repo, hashes, %{
timeout: timeout:
options[Runner.Block.SecondDegreeRelations.option_key()][:timeout] || options[Runner.Block.SecondDegreeRelations.option_key()][:timeout] ||
@ -86,15 +86,9 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Multi.run(:remove_nonconsensus_logs, fn repo, %{derive_transaction_forks: transactions} -> |> Multi.run(:remove_nonconsensus_logs, fn repo, %{derive_transaction_forks: transactions} ->
remove_nonconsensus_logs(repo, transactions, insert_options) remove_nonconsensus_logs(repo, transactions, insert_options)
end) end)
|> Multi.run(:acquire_internal_transactions, fn repo, %{derive_transaction_forks: transactions} ->
acquire_internal_transactions(repo, hashes, transactions)
end)
|> Multi.run(:remove_nonconsensus_internal_transactions, fn repo, %{derive_transaction_forks: transactions} -> |> Multi.run(:remove_nonconsensus_internal_transactions, fn repo, %{derive_transaction_forks: transactions} ->
remove_nonconsensus_internal_transactions(repo, transactions, insert_options) remove_nonconsensus_internal_transactions(repo, transactions, insert_options)
end) end)
|> Multi.run(:internal_transaction_transaction_block_number, fn repo, _ ->
update_internal_transaction_block_number(repo, hashes)
end)
|> Multi.run(:acquire_contract_address_tokens, fn repo, _ -> |> Multi.run(:acquire_contract_address_tokens, fn repo, _ ->
acquire_contract_address_tokens(repo, consensus_block_numbers) acquire_contract_address_tokens(repo, consensus_block_numbers)
end) end)
@ -139,26 +133,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
Tokens.acquire_contract_address_tokens(repo, contract_address_hashes) Tokens.acquire_contract_address_tokens(repo, contract_address_hashes)
end end
defp acquire_internal_transactions(repo, hashes, forked_transaction_hashes) do
query =
from(internal_transaction in InternalTransaction,
join: transaction in Transaction,
on: internal_transaction.transaction_hash == transaction.hash,
where: transaction.block_hash in ^hashes,
or_where: transaction.hash in ^forked_transaction_hashes,
select: {internal_transaction.transaction_hash, internal_transaction.index},
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md)
order_by: [
internal_transaction.transaction_hash,
internal_transaction.index
],
# NOTE: find a better way to know the alias that ecto gives to token
lock: "FOR UPDATE OF i0"
)
{:ok, repo.all(query)}
end
defp fork_transactions(%{ defp fork_transactions(%{
repo: repo, repo: repo,
timeout: timeout, timeout: timeout,
@ -379,13 +353,27 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
defp remove_nonconsensus_internal_transactions(repo, forked_transaction_hashes, %{timeout: timeout}) do defp remove_nonconsensus_internal_transactions(repo, forked_transaction_hashes, %{timeout: timeout}) do
query = query =
from(internal_transaction in InternalTransaction, from(
internal_transaction in InternalTransaction,
where: internal_transaction.transaction_hash in ^forked_transaction_hashes, where: internal_transaction.transaction_hash in ^forked_transaction_hashes,
select: map(internal_transaction, [:transaction_hash, :index]) select: %{transaction_hash: internal_transaction.transaction_hash},
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md)
order_by: [
internal_transaction.transaction_hash,
internal_transaction.index
],
lock: "FOR UPDATE"
)
delete_query =
from(
i in InternalTransaction,
join: s in subquery(query),
on: i.transaction_hash == s.transaction_hash,
select: map(i, [:transaction_hash, :index])
) )
# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md) {_count, deleted_internal_transactions} = repo.delete_all(delete_query, timeout: timeout)
{_count, deleted_internal_transactions} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_internal_transactions} {:ok, deleted_internal_transactions}
rescue rescue
@ -645,24 +633,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end end
end end
defp update_internal_transaction_block_number(repo, blocks_hashes) when is_list(blocks_hashes) do
query =
from(
internal_transaction in InternalTransaction,
join: transaction in Transaction,
on: internal_transaction.transaction_hash == transaction.hash,
join: block in Block,
on: block.hash == transaction.block_hash,
where: block.hash in ^blocks_hashes,
update: [set: [block_number: block.number]]
)
# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md)
{total, _} = repo.update_all(query, [])
{:ok, total}
end
defp where_forked(blocks_changes) when is_list(blocks_changes) do defp where_forked(blocks_changes) when is_list(blocks_changes) do
initial = from(t in Transaction, where: false) initial = from(t in Transaction, where: false)

@ -4,9 +4,10 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
""" """
require Ecto.Query require Ecto.Query
require Logger
alias Ecto.{Changeset, Multi, Repo} alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Hash, Import, InternalTransaction, Transaction} alias Explorer.Chain.{Block, Hash, Import, InternalTransaction, Transaction}
alias Explorer.Chain.Import.Runner alias Explorer.Chain.Import.Runner
import Ecto.Query, only: [from: 2] import Ecto.Query, only: [from: 2]
@ -52,32 +53,43 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
|> Multi.run(:acquire_transactions, fn repo, _ -> |> Multi.run(:acquire_transactions, fn repo, _ ->
acquire_transactions(repo, changes_list) acquire_transactions(repo, changes_list)
end) end)
|> Multi.run(:internal_transactions, fn repo, _ -> |> Multi.run(:internal_transactions, fn repo, %{acquire_transactions: transactions} ->
insert(repo, changes_list, insert_options) insert(repo, changes_list, transactions, insert_options)
end) end)
|> Multi.run(:internal_transactions_indexed_at_transactions, fn repo, %{acquire_transactions: transaction_hashes} -> |> Multi.run(:internal_transactions_indexed_at_transactions, fn repo, %{acquire_transactions: transactions} ->
update_transactions(repo, transaction_hashes, update_transactions_options) update_transactions(repo, transactions, update_transactions_options)
end) end)
|> Multi.run(
:remove_consensus_of_missing_transactions_blocks,
fn repo, %{internal_transactions: inserted} = results_map ->
# NOTE: for this to work it has to follow the runner `internal_transactions_indexed_at_blocks`
block_hashes = Map.get(results_map, :internal_transactions_indexed_at_blocks, [])
remove_consensus_of_missing_transactions_blocks(repo, block_hashes, changes_list, inserted)
end
)
end end
@impl Runner @impl Runner
def timeout, do: @timeout def timeout, do: @timeout
@spec insert(Repo.t(), [map], %{ @spec insert(Repo.t(), [map], [Transaction.t()], %{
optional(:on_conflict) => Runner.on_conflict(), optional(:on_conflict) => Runner.on_conflict(),
required(:timeout) => timeout, required(:timeout) => timeout,
required(:timestamps) => Import.timestamps() required(:timestamps) => Import.timestamps()
}) :: }) ::
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]}
| {:error, [Changeset.t()]} | {:error, [Changeset.t()]}
defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) defp insert(repo, changes_list, transactions, %{timeout: timeout, timestamps: timestamps} = options)
when is_list(changes_list) do when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md) transactions_map = Map.new(transactions, &{&1.hash, &1})
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
final_changes_list = reject_pending_transactions(ordered_changes_list, repo) final_changes_list =
changes_list
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md)
|> Enum.sort_by(&{&1.transaction_hash, &1.index})
|> reject_missing_transactions(transactions_map)
{:ok, internal_transactions} = {:ok, internal_transactions} =
Import.insert_changes_list( Import.insert_changes_list(
@ -86,16 +98,12 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
conflict_target: [:transaction_hash, :index], conflict_target: [:transaction_hash, :index],
for: InternalTransaction, for: InternalTransaction,
on_conflict: on_conflict, on_conflict: on_conflict,
returning: [:transaction_hash, :index], returning: true,
timeout: timeout, timeout: timeout,
timestamps: timestamps timestamps: timestamps
) )
{:ok, {:ok, internal_transactions}
for(
internal_transaction <- internal_transactions,
do: Map.take(internal_transaction, [:id, :index, :transaction_hash])
)}
end end
defp default_on_conflict do defp default_on_conflict do
@ -158,26 +166,28 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
from( from(
t in Transaction, t in Transaction,
where: t.hash in ^transaction_hashes, where: t.hash in ^transaction_hashes,
# do not consider pending transactions
where: not is_nil(t.block_hash), where: not is_nil(t.block_hash),
select: t.hash, select: map(t, [:hash, :block_hash, :block_number]),
# Enforce Transaction ShareLocks order (see docs: sharelocks.md) # Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by: t.hash, order_by: t.hash,
lock: "FOR UPDATE" lock: "FOR UPDATE"
) )
hashes = repo.all(query) {:ok, repo.all(query)}
{:ok, hashes}
end end
defp update_transactions(repo, transaction_hashes, %{ defp update_transactions(repo, transactions, %{
timeout: timeout, timeout: timeout,
timestamps: timestamps timestamps: timestamps
}) })
when is_list(transaction_hashes) do when is_list(transactions) do
transaction_hashes = Enum.map(transactions, & &1.hash)
update_query = update_query =
from( from(
t in Transaction, t in Transaction,
# pending transactions are already excluded by `acquire_transactions`
where: t.hash in ^transaction_hashes, where: t.hash in ^transaction_hashes,
# ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md) # ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md)
update: [ update: [
@ -214,22 +224,51 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
end end
end end
defp reject_pending_transactions(ordered_changes_list, repo) do # If not using Parity this is not relevant
transaction_hashes = defp remove_consensus_of_missing_transactions_blocks(_, [], _, _), do: {:ok, []}
ordered_changes_list
|> Enum.map(& &1.transaction_hash)
|> Enum.dedup()
query = defp remove_consensus_of_missing_transactions_blocks(repo, block_hashes, changes_list, inserted) do
from(t in Transaction, inserted_block_numbers = MapSet.new(inserted, & &1.block_number)
where: t.hash in ^transaction_hashes,
where: is_nil(t.block_hash), missing_transactions_block_numbers =
select: t.hash changes_list
|> MapSet.new(& &1.block_number)
|> MapSet.difference(inserted_block_numbers)
|> MapSet.to_list()
update_query =
from(
b in Block,
where: b.number in ^missing_transactions_block_numbers,
where: b.hash in ^block_hashes,
# ShareLocks order already enforced by `internal_transactions_indexed_at_blocks` (see docs: sharelocks.md)
update: [set: [consensus: false, internal_transactions_indexed_at: nil]]
) )
pending_transactions = repo.all(query) try do
{_num, result} = repo.update_all(update_query, [])
Logger.debug(fn ->
[
"consensus removed from blocks with numbers: ",
inspect(missing_transactions_block_numbers),
" because of missing transactions"
]
end)
{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, missing_transactions_block_numbers: missing_transactions_block_numbers}}
end
end
ordered_changes_list defp reject_missing_transactions(ordered_changes_list, transactions_map) do
|> Enum.reject(fn %{transaction_hash: hash} -> Enum.member?(pending_transactions, hash) end) Enum.reject(ordered_changes_list, fn %{transaction_hash: hash} ->
transactions_map
|> Map.get(hash, %{})
|> Map.get(:block_hash)
|> is_nil()
end)
end end
end end

@ -68,7 +68,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactionsIndexedAtBlocks do
try do try do
{_, result} = {_, result} =
repo.update_all( repo.update_all(
from(b in Block, join: s in subquery(query), on: b.hash == s.hash), from(b in Block, join: s in subquery(query), on: b.hash == s.hash, select: b.hash),
[set: [internal_transactions_indexed_at: timestamps.updated_at]], [set: [internal_transactions_indexed_at: timestamps.updated_at]],
timeout: timeout timeout: timeout
) )

@ -2,7 +2,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactionsTest do
use Explorer.DataCase use Explorer.DataCase
alias Ecto.Multi alias Ecto.Multi
alias Explorer.Chain.{Data, Wei, Transaction, InternalTransaction} alias Explorer.Chain.{Block, Data, Wei, Transaction, InternalTransaction}
alias Explorer.Chain.Import.Runner.InternalTransactions alias Explorer.Chain.Import.Runner.InternalTransactions
describe "run/1" do describe "run/1" do
@ -42,10 +42,78 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactionsTest do
assert is_nil(Repo.get(Transaction, pending.hash).block_hash) assert is_nil(Repo.get(Transaction, pending.hash).block_hash)
end end
test "removes consensus to blocks where transactions are missing" do
empty_block = insert(:block)
pending = insert(:transaction)
assert is_nil(pending.block_hash)
full_block = insert(:block)
inserted = insert(:transaction) |> with_block(full_block)
assert full_block.hash == inserted.block_hash
index = 0
pending_transaction_changes =
pending.hash
|> make_internal_transaction_changes(index, nil)
|> Map.put(:block_number, empty_block.number)
transaction_changes =
inserted.hash
|> make_internal_transaction_changes(index, nil)
|> Map.put(:block_number, full_block.number)
multi =
Multi.new()
|> Multi.run(:internal_transactions_indexed_at_blocks, fn _, _ -> {:ok, [empty_block.hash, full_block.hash]} end)
assert {:ok, _} = run_internal_transactions([pending_transaction_changes, transaction_changes], multi)
assert from(i in InternalTransaction, where: i.transaction_hash == ^pending.hash) |> Repo.one() |> is_nil()
assert %{consensus: false} = Repo.get(Block, empty_block.hash)
assert from(i in InternalTransaction, where: i.transaction_hash == ^inserted.hash) |> Repo.one() |> is_nil() ==
false
assert %{consensus: true} = Repo.get(Block, full_block.hash)
end end
defp run_internal_transactions(changes_list) when is_list(changes_list) do test "does not remove consensus when block is empty and no transactions are missing" do
empty_block = insert(:block)
full_block = insert(:block)
inserted = insert(:transaction) |> with_block(full_block)
assert full_block.hash == inserted.block_hash
index = 0
transaction_changes =
inserted.hash
|> make_internal_transaction_changes(index, nil)
|> Map.put(:block_number, full_block.number)
multi =
Multi.new() Multi.new()
|> Multi.run(:internal_transactions_indexed_at_blocks, fn _, _ -> {:ok, [empty_block.hash, full_block.hash]} end)
assert {:ok, _} = run_internal_transactions([transaction_changes], multi)
assert %{consensus: true} = Repo.get(Block, empty_block.hash)
assert from(i in InternalTransaction, where: i.transaction_hash == ^inserted.hash) |> Repo.one() |> is_nil() ==
false
assert %{consensus: true} = Repo.get(Block, full_block.hash)
end
end
defp run_internal_transactions(changes_list, multi \\ Multi.new()) when is_list(changes_list) do
multi
|> InternalTransactions.run(changes_list, %{ |> InternalTransactions.run(changes_list, %{
timeout: :infinity, timeout: :infinity,
timestamps: %{inserted_at: DateTime.utc_now(), updated_at: DateTime.utc_now()} timestamps: %{inserted_at: DateTime.utc_now(), updated_at: DateTime.utc_now()}

Loading…
Cancel
Save