From 0aa3edab5914c8b9abba808f9e8cba80ab1cb597 Mon Sep 17 00:00:00 2001 From: pasqu4le Date: Tue, 27 Aug 2019 14:22:08 +0200 Subject: [PATCH] Execute all address' transactions page queries in parallel --- CHANGELOG.md | 1 + apps/explorer/lib/explorer/chain.ex | 76 +++++++++++++++---- .../chain/address_transaction_csv_exporter.ex | 5 +- .../lib/explorer/chain/transaction.ex | 44 ++++------- 4 files changed, 82 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8335319cb..6392632d70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## Current ### Features +- [#2636](https://github.com/poanetwork/blockscout/pull/2636) - Execute all address' transactions page queries in parallel - [#2596](https://github.com/poanetwork/blockscout/pull/2596) - support AuRa's empty step reward type - [#2561](https://github.com/poanetwork/blockscout/pull/2561) - Add token's type to the response of tokenlist method - [#2499](https://github.com/poanetwork/blockscout/pull/2499) - import emission reward ranges diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 58253c0f27..f0374c5d3f 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -230,9 +230,8 @@ defmodule Explorer.Chain do Reward.fetch_emission_rewards_tuples(address_hash, paging_options) end) - address_hash - |> address_to_transactions_without_rewards(paging_options, options) - |> Enum.concat(Task.await(rewards_task, :timer.seconds(20))) + [rewards_task | address_to_transactions_tasks(address_hash, options)] + |> wait_for_address_transactions() |> Enum.sort_by(fn item -> case item do {%Reward{} = emission_reward, _} -> @@ -242,25 +241,76 @@ defmodule Explorer.Chain do {-item.block_number, -item.index} end end) + |> Enum.dedup_by(fn item -> + case item do + {%Reward{} = emission_reward, _} -> + {emission_reward.block_hash, emission_reward.address_hash, emission_reward.address_type} + + transaction -> + transaction.hash + end + end) |> Enum.take(paging_options.page_size) else - address_to_transactions_without_rewards(address_hash, paging_options, options) + address_to_transactions_without_rewards(address_hash, options) end end - def address_to_transactions_without_rewards(address_hash, paging_options, options) do + def address_to_transactions_without_rewards(address_hash, options) do + paging_options = Keyword.get(options, :paging_options, @default_paging_options) + + address_hash + |> address_to_transactions_tasks(options) + |> wait_for_address_transactions() + |> Enum.sort_by(&{&1.block_number, &1.index}, &>=/2) + |> Enum.dedup_by(& &1.hash) + |> Enum.take(paging_options.page_size) + end + + defp address_to_transactions_tasks(address_hash, options) do + paging_options = Keyword.get(options, :paging_options, @default_paging_options) direction = Keyword.get(options, :direction) necessity_by_association = Keyword.get(options, :necessity_by_association, %{}) - transaction_hashes_from_token_transfers = - TokenTransfer.where_any_address_fields_match(direction, address_hash, paging_options) + base_query = + paging_options + |> fetch_transactions() + |> join_associations(necessity_by_association) + |> Transaction.preload_token_transfers(address_hash) - paging_options - |> fetch_transactions() - |> Transaction.where_transaction_matches(transaction_hashes_from_token_transfers, direction, address_hash) - |> join_associations(necessity_by_association) - |> Transaction.preload_token_transfers(address_hash) - |> Repo.all() + direction_tasks = + base_query + |> Transaction.matching_address_queries_list(direction, address_hash) + |> Enum.map(fn query -> Task.async(fn -> Repo.all(query) end) end) + + token_transfers_task = + Task.async(fn -> + transaction_hashes_from_token_transfers = + TokenTransfer.where_any_address_fields_match(direction, address_hash, paging_options) + + final_query = where(base_query, [t], t.hash in ^transaction_hashes_from_token_transfers) + + Repo.all(final_query) + end) + + [token_transfers_task | direction_tasks] + end + + defp wait_for_address_transactions(tasks) do + tasks + |> Task.yield_many(:timer.seconds(20)) + |> Enum.flat_map(fn {_task, res} -> + case res do + {:ok, result} -> + result + + {:exit, reason} -> + raise "Query fetching address transactions terminated: #{inspect(reason)}" + + nil -> + raise "Query fetching address transactions timed out." + end + end) end @spec address_to_logs(Hash.Address.t(), Keyword.t()) :: [Log.t()] diff --git a/apps/explorer/lib/explorer/chain/address_transaction_csv_exporter.ex b/apps/explorer/lib/explorer/chain/address_transaction_csv_exporter.ex index 0196de3bc0..d608cc55e3 100644 --- a/apps/explorer/lib/explorer/chain/address_transaction_csv_exporter.ex +++ b/apps/explorer/lib/explorer/chain/address_transaction_csv_exporter.ex @@ -42,8 +42,9 @@ defmodule Explorer.Chain.AddressTransactionCsvExporter do end defp fetch_all_transactions(address_hash, paging_options, acc \\ []) do - transactions = - Chain.address_to_transactions_without_rewards(address_hash, paging_options, @necessity_by_association) + options = Keyword.put(@necessity_by_association, :paging_options, paging_options) + + transactions = Chain.address_to_transactions_without_rewards(address_hash, options) new_acc = transactions ++ acc diff --git a/apps/explorer/lib/explorer/chain/transaction.ex b/apps/explorer/lib/explorer/chain/transaction.ex index 78d162fa4e..1274202bd5 100644 --- a/apps/explorer/lib/explorer/chain/transaction.ex +++ b/apps/explorer/lib/explorer/chain/transaction.ex @@ -484,40 +484,26 @@ defmodule Explorer.Chain.Transaction do end @doc """ - Modifies a query to filter for transactions whose hash is in a list or that are - linked to the given address_hash through a direction. - - Be careful to not pass a large list, because this will lead to performance - problems. + Produces a list of queries starting from the given one and adding filters for + transactions that are linked to the given address_hash through a direction. """ - def where_transaction_matches(query, transaction_hashes, :from, address_hash) do - where( - query, - [t], - t.hash in ^transaction_hashes or - t.from_address_hash == ^address_hash - ) + def matching_address_queries_list(query, :from, address_hash) do + [where(query, [t], t.from_address_hash == ^address_hash)] end - def where_transaction_matches(query, transaction_hashes, :to, address_hash) do - where( - query, - [t], - t.hash in ^transaction_hashes or - t.to_address_hash == ^address_hash or - t.created_contract_address_hash == ^address_hash - ) + def matching_address_queries_list(query, :to, address_hash) do + [ + where(query, [t], t.to_address_hash == ^address_hash), + where(query, [t], t.created_contract_address_hash == ^address_hash) + ] end - def where_transaction_matches(query, transaction_hashes, _direction, address_hash) do - where( - query, - [t], - t.hash in ^transaction_hashes or - t.from_address_hash == ^address_hash or - t.to_address_hash == ^address_hash or - t.created_contract_address_hash == ^address_hash - ) + def matching_address_queries_list(query, _direction, address_hash) do + [ + where(query, [t], t.from_address_hash == ^address_hash), + where(query, [t], t.to_address_hash == ^address_hash), + where(query, [t], t.created_contract_address_hash == ^address_hash) + ] end @collated_fields ~w(block_number cumulative_gas_used gas_used index)a