select queries by batches

pull/1577/head
Ayrat Badykov 6 years ago
parent e105bf5ba8
commit 99e30c5a52
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 68
      apps/indexer/lib/indexer/temporary/addresses_without_code.ex
  2. 2
      apps/indexer/test/indexer/temporary/addresses_without_code_test.exs

@ -15,7 +15,7 @@ defmodule Indexer.Temporary.AddressesWithoutCode do
alias Indexer.Temporary.AddressesWithoutCode.TaskSupervisor
@task_options [max_concurrency: 3, timeout: :infinity]
@query_timeout :infinity
@batch_size 500
def start_link([fetcher, gen_server_options]) do
GenServer.start_link(__MODULE__, fetcher, gen_server_options)
@ -45,13 +45,6 @@ defmodule Indexer.Temporary.AddressesWithoutCode do
end
def fix_transaction_without_to_address_and_created_contract_address(fetcher) do
Logger.debug(
[
"Started the first query to fetch addresses without code"
],
fetcher: :addresses_without_code
)
query =
from(block in Block,
left_join: transaction in Transaction,
@ -62,34 +55,24 @@ defmodule Indexer.Temporary.AddressesWithoutCode do
distinct: block.hash
)
found_blocks = Repo.all(query, timeout: @query_timeout)
Logger.debug(
[
"Finished the first query to fetch blocks that need to be re-fetched. Number of records is #{
Enum.count(found_blocks)
}"
],
fetcher: :addresses_without_code
)
TaskSupervisor
|> Task.Supervisor.async_stream_nolink(
found_blocks,
fn block -> refetch_block(block, fetcher) end,
@task_options
)
|> Enum.to_list()
process_query(query, fetcher)
end
def fix_addresses_with_creation_transaction_but_without_code(fetcher) do
Logger.debug(
[
"Started the second query to fetch addresses without code"
],
fetcher: :addresses_without_code
)
defp process_query(query, fetcher) do
query_stream = Repo.stream(query, max_rows: @batch_size)
stream =
TaskSupervisor
|> Task.Supervisor.async_stream_nolink(
query_stream,
fn block -> refetch_block(block, fetcher) end,
@task_options
)
Repo.transaction(fn -> Stream.run(stream) end)
end
def fix_addresses_with_creation_transaction_but_without_code(fetcher) do
second_query =
from(block in Block,
left_join: transaction in Transaction,
@ -103,24 +86,7 @@ defmodule Indexer.Temporary.AddressesWithoutCode do
distinct: block.hash
)
second_found_blocks = Repo.all(second_query, timeout: @query_timeout)
Logger.debug(
[
"Finished the second query to fetch blocks that need to be re-fetched. Number of records is #{
Enum.count(second_found_blocks)
}"
],
fetcher: :addresses_without_code
)
TaskSupervisor
|> Task.Supervisor.async_stream_nolink(
second_found_blocks,
fn block -> refetch_block(block, fetcher) end,
@task_options
)
|> Enum.to_list()
process_query(second_query, fetcher)
end
def refetch_block(block, fetcher) do

@ -203,7 +203,7 @@ defmodule Indexer.Temporary.AddressesWithoutCodeTest do
|> Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
Process.sleep(2_000)
Process.sleep(5_000)
updated_address =
from(a in Address, where: a.hash == ^address.hash, preload: :contracts_creation_transaction) |> Repo.one()

Loading…
Cancel
Save