Execute all address' transactions page queries in parallel

pull/2636/head
pasqu4le 5 years ago
parent 81d2b10959
commit 0aa3edab59
No known key found for this signature in database
GPG Key ID: 8F3EE01F1DC90687
  1. 1
      CHANGELOG.md
  2. 70
      apps/explorer/lib/explorer/chain.ex
  3. 5
      apps/explorer/lib/explorer/chain/address_transaction_csv_exporter.ex
  4. 44
      apps/explorer/lib/explorer/chain/transaction.ex

@ -1,6 +1,7 @@
## Current ## Current
### Features ### Features
- [#2636](https://github.com/poanetwork/blockscout/pull/2636) - Execute all address' transactions page queries in parallel
- [#2596](https://github.com/poanetwork/blockscout/pull/2596) - support AuRa's empty step reward type - [#2596](https://github.com/poanetwork/blockscout/pull/2596) - support AuRa's empty step reward type
- [#2561](https://github.com/poanetwork/blockscout/pull/2561) - Add token's type to the response of tokenlist method - [#2561](https://github.com/poanetwork/blockscout/pull/2561) - Add token's type to the response of tokenlist method
- [#2499](https://github.com/poanetwork/blockscout/pull/2499) - import emission reward ranges - [#2499](https://github.com/poanetwork/blockscout/pull/2499) - import emission reward ranges

@ -230,9 +230,8 @@ defmodule Explorer.Chain do
Reward.fetch_emission_rewards_tuples(address_hash, paging_options) Reward.fetch_emission_rewards_tuples(address_hash, paging_options)
end) end)
address_hash [rewards_task | address_to_transactions_tasks(address_hash, options)]
|> address_to_transactions_without_rewards(paging_options, options) |> wait_for_address_transactions()
|> Enum.concat(Task.await(rewards_task, :timer.seconds(20)))
|> Enum.sort_by(fn item -> |> Enum.sort_by(fn item ->
case item do case item do
{%Reward{} = emission_reward, _} -> {%Reward{} = emission_reward, _} ->
@ -242,25 +241,76 @@ defmodule Explorer.Chain do
{-item.block_number, -item.index} {-item.block_number, -item.index}
end end
end) end)
|> Enum.dedup_by(fn item ->
case item do
{%Reward{} = emission_reward, _} ->
{emission_reward.block_hash, emission_reward.address_hash, emission_reward.address_type}
transaction ->
transaction.hash
end
end)
|> Enum.take(paging_options.page_size) |> Enum.take(paging_options.page_size)
else else
address_to_transactions_without_rewards(address_hash, paging_options, options) address_to_transactions_without_rewards(address_hash, options)
end end
end end
def address_to_transactions_without_rewards(address_hash, paging_options, options) do def address_to_transactions_without_rewards(address_hash, options) do
paging_options = Keyword.get(options, :paging_options, @default_paging_options)
address_hash
|> address_to_transactions_tasks(options)
|> wait_for_address_transactions()
|> Enum.sort_by(&{&1.block_number, &1.index}, &>=/2)
|> Enum.dedup_by(& &1.hash)
|> Enum.take(paging_options.page_size)
end
defp address_to_transactions_tasks(address_hash, options) do
paging_options = Keyword.get(options, :paging_options, @default_paging_options)
direction = Keyword.get(options, :direction) direction = Keyword.get(options, :direction)
necessity_by_association = Keyword.get(options, :necessity_by_association, %{}) necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
transaction_hashes_from_token_transfers = base_query =
TokenTransfer.where_any_address_fields_match(direction, address_hash, paging_options)
paging_options paging_options
|> fetch_transactions() |> fetch_transactions()
|> Transaction.where_transaction_matches(transaction_hashes_from_token_transfers, direction, address_hash)
|> join_associations(necessity_by_association) |> join_associations(necessity_by_association)
|> Transaction.preload_token_transfers(address_hash) |> Transaction.preload_token_transfers(address_hash)
|> Repo.all()
direction_tasks =
base_query
|> Transaction.matching_address_queries_list(direction, address_hash)
|> Enum.map(fn query -> Task.async(fn -> Repo.all(query) end) end)
token_transfers_task =
Task.async(fn ->
transaction_hashes_from_token_transfers =
TokenTransfer.where_any_address_fields_match(direction, address_hash, paging_options)
final_query = where(base_query, [t], t.hash in ^transaction_hashes_from_token_transfers)
Repo.all(final_query)
end)
[token_transfers_task | direction_tasks]
end
defp wait_for_address_transactions(tasks) do
tasks
|> Task.yield_many(:timer.seconds(20))
|> Enum.flat_map(fn {_task, res} ->
case res do
{:ok, result} ->
result
{:exit, reason} ->
raise "Query fetching address transactions terminated: #{inspect(reason)}"
nil ->
raise "Query fetching address transactions timed out."
end
end)
end end
@spec address_to_logs(Hash.Address.t(), Keyword.t()) :: [Log.t()] @spec address_to_logs(Hash.Address.t(), Keyword.t()) :: [Log.t()]

@ -42,8 +42,9 @@ defmodule Explorer.Chain.AddressTransactionCsvExporter do
end end
defp fetch_all_transactions(address_hash, paging_options, acc \\ []) do defp fetch_all_transactions(address_hash, paging_options, acc \\ []) do
transactions = options = Keyword.put(@necessity_by_association, :paging_options, paging_options)
Chain.address_to_transactions_without_rewards(address_hash, paging_options, @necessity_by_association)
transactions = Chain.address_to_transactions_without_rewards(address_hash, options)
new_acc = transactions ++ acc new_acc = transactions ++ acc

@ -484,40 +484,26 @@ defmodule Explorer.Chain.Transaction do
end end
@doc """ @doc """
Modifies a query to filter for transactions whose hash is in a list or that are Produces a list of queries starting from the given one and adding filters for
linked to the given address_hash through a direction. transactions that are linked to the given address_hash through a direction.
Be careful to not pass a large list, because this will lead to performance
problems.
""" """
def where_transaction_matches(query, transaction_hashes, :from, address_hash) do def matching_address_queries_list(query, :from, address_hash) do
where( [where(query, [t], t.from_address_hash == ^address_hash)]
query,
[t],
t.hash in ^transaction_hashes or
t.from_address_hash == ^address_hash
)
end end
def where_transaction_matches(query, transaction_hashes, :to, address_hash) do def matching_address_queries_list(query, :to, address_hash) do
where( [
query, where(query, [t], t.to_address_hash == ^address_hash),
[t], where(query, [t], t.created_contract_address_hash == ^address_hash)
t.hash in ^transaction_hashes or ]
t.to_address_hash == ^address_hash or
t.created_contract_address_hash == ^address_hash
)
end end
def where_transaction_matches(query, transaction_hashes, _direction, address_hash) do def matching_address_queries_list(query, _direction, address_hash) do
where( [
query, where(query, [t], t.from_address_hash == ^address_hash),
[t], where(query, [t], t.to_address_hash == ^address_hash),
t.hash in ^transaction_hashes or where(query, [t], t.created_contract_address_hash == ^address_hash)
t.from_address_hash == ^address_hash or ]
t.to_address_hash == ^address_hash or
t.created_contract_address_hash == ^address_hash
)
end end
@collated_fields ~w(block_number cumulative_gas_used gas_used index)a @collated_fields ~w(block_number cumulative_gas_used gas_used index)a

Loading…
Cancel
Save