Optimize buffered task chunking

pull/218/head
Chris McCord authored 7 years ago; committed by Luke Imhoff
parent c5037a089c
commit 6b12526060
Changed files (lines changed in parentheses):

  1. apps/explorer/lib/explorer/buffered_task.ex (124)
  2. apps/explorer/lib/explorer/chain.ex (84)
  3. apps/explorer/lib/explorer/indexer/address_balance_fetcher.ex (5)
  4. apps/explorer/lib/explorer/indexer/block_fetcher.ex (20)
  5. apps/explorer/lib/explorer/indexer/internal_transaction_fetcher.ex (9)
  6. apps/explorer/lib/explorer/indexer/supervisor.ex (2)
  7. apps/explorer/test/explorer/buffered_task_test.exs (18)
  8. apps/explorer/test/explorer/indexer/block_fetcher_test.exs (3)

apps/explorer/lib/explorer/buffered_task.ex

@@ -10,8 +10,16 @@ defmodule Explorer.BufferedTask do
   @callback run(entries :: list, retries :: pos_integer) :: :ok | {:retry, reason :: term} | {:halt, reason :: term}
 
-  def buffer(server, entries) when is_list(entries) do
-    GenServer.call(server, {:buffer, entries})
+  @doc """
+  Buffers list of entries for future async execution.
+  """
+  def buffer(server, entries, timeout \\ 5000) when is_list(entries) do
+    GenServer.call(server, {:buffer, entries}, timeout)
   end
 
+  @doc false
+  def debug_count(server) do
+    GenServer.call(server, :debug_count)
+  end
+
   def start_link({module, base_opts}) do
@@ -25,11 +33,14 @@ defmodule Explorer.BufferedTask do
     send(self(), :initial_stream)
 
     state = %{
-      callback_module: callback_module,
+      pid: self(),
+      init_task: nil,
       flush_timer: nil,
+      callback_module: callback_module,
       flush_interval: Keyword.fetch!(opts, :flush_interval),
       max_batch_size: Keyword.fetch!(opts, :max_batch_size),
       max_concurrency: Keyword.fetch!(opts, :max_concurrency),
+      stream_chunk_size: Keyword.fetch!(opts, :stream_chunk_size),
       current_buffer: [],
       buffer: :queue.new(),
       tasks: %{}
@@ -46,10 +57,6 @@ defmodule Explorer.BufferedTask do
     {:noreply, flush(state)}
   end
 
-  def handle_info({:async_perform, entries}, state) do
-    {:noreply, spawn_next_batch(state, entries)}
-  end
-
   def handle_info({ref, {:performed, :ok}}, state) do
     {:noreply, drop_task(state, ref)}
   end
@@ -62,6 +69,14 @@ defmodule Explorer.BufferedTask do
     {:noreply, drop_task(state, ref)}
   end
 
+  def handle_info({ref, :ok}, %{init_task: ref} = state) do
+    {:noreply, state}
+  end
+
+  def handle_info({:DOWN, ref, :process, _pid, :normal}, %{init_task: ref} = state) do
+    {:noreply, %{state | init_task: :complete}}
+  end
+
   def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
     {:noreply, state}
   end
@@ -70,13 +85,23 @@ defmodule Explorer.BufferedTask do
     {:noreply, drop_task_and_retry(state, ref)}
   end
 
+  def handle_call({:async_perform, stream_que}, _from, state) do
+    new_buffer = :queue.join(state.buffer, stream_que)
+    {:reply, :ok, spawn_next_batch(%{state | buffer: new_buffer})}
+  end
+
   def handle_call({:buffer, entries}, _from, state) do
     {:reply, :ok, buffer_entries(state, entries)}
   end
 
+  def handle_call(:debug_count, _from, state) do
+    count = length(state.current_buffer) + :queue.len(state.buffer) * state.max_batch_size
+    {:reply, count, state}
+  end
+
   defp drop_task(state, ref) do
-    schedule_async_perform([])
-    %{state | tasks: Map.delete(state.tasks, ref)}
+    spawn_next_batch(%{state | tasks: Map.delete(state.tasks, ref)})
   end
 
   defp drop_task_and_retry(state, ref) do
@@ -90,44 +115,55 @@ defmodule Explorer.BufferedTask do
   defp buffer_entries(state, []), do: state
 
   defp buffer_entries(state, entries) do
-    current_buffer = entries ++ state.current_buffer
-    {batch, overflow} = Enum.split(current_buffer, state.max_batch_size)
-
-    if length(batch) == state.max_batch_size do
-      queue(%{state | current_buffer: overflow}, batch, 0)
-    else
-      %{state | current_buffer: current_buffer}
-    end
+    %{state | current_buffer: [entries | state.current_buffer]}
   end
 
-  defp queue(state, batch, retries) do
-    %{state | buffer: :queue.in({batch, retries}, state.buffer)}
+  defp queue(%{} = state, batch, retries) do
+    %{state | buffer: queue(state.buffer, batch, retries)}
   end
 
-  defp do_initial_stream(state) do
-    {0, []}
-    |> state.callback_module.init(fn entry, {len, acc} ->
-      batch = [entry | acc]
+  defp queue({_, _} = que, batch, retries) do
+    :queue.in({batch, retries}, que)
+  end
 
-      if len + 1 >= state.max_batch_size do
-        schedule_async_perform(Enum.reverse(batch))
-        {0, []}
-      else
-        {len + 1, batch}
-      end
-    end)
-    |> catchup_remaining()
+  defp do_initial_stream(%{stream_chunk_size: stream_chunk_size} = state) do
+    task =
+      Task.Supervisor.async(Explorer.TaskSupervisor, fn ->
+        {0, []}
+        |> state.callback_module.init(fn
+          entry, {len, acc} when len + 1 >= stream_chunk_size ->
+            [entry | acc]
+            |> chunk_into_queue(state)
+            |> async_perform(state.pid)
+
+            {0, []}
+
+          entry, {len, acc} ->
+            {len + 1, [entry | acc]}
+        end)
+        |> catchup_remaining(state)
+      end)
+
+    schedule_next_buffer_flush(%{state | init_task: task.ref})
   end
 
-  defp catchup_remaining({:ok, {0, []}}), do: :ok
+  defp catchup_remaining({:ok, {0, []}}, _state), do: :ok
 
-  defp catchup_remaining({:ok, {_len, batch}}) do
-    schedule_async_perform(Enum.reverse(batch))
+  defp catchup_remaining({:ok, {_len, batch}}, state) do
+    batch
+    |> chunk_into_queue(state)
+    |> async_perform(state.pid)
+
     :ok
   end
 
+  defp chunk_into_queue(entries, state) do
+    entries
+    |> Enum.reverse()
+    |> Enum.chunk_every(state.max_batch_size)
+    |> Enum.reduce(:queue.new(), fn batch, acc -> queue(acc, batch, 0) end)
+  end
+
   defp take_batch(state) do
     case :queue.out(state.buffer) do
       {{:value, batch}, new_queue} -> {batch, new_queue}
@@ -135,8 +171,8 @@ defmodule Explorer.BufferedTask do
     end
   end
 
-  defp schedule_async_perform(entries, after_ms \\ 0) do
-    Process.send_after(self(), {:async_perform, entries}, after_ms)
+  defp async_perform(entries, dest) do
+    GenServer.call(dest, {:async_perform, entries})
   end
 
   defp schedule_next_buffer_flush(state) do
@@ -144,9 +180,7 @@ defmodule Explorer.BufferedTask do
     %{state | flush_timer: timer}
   end
 
-  defp spawn_next_batch(state, entries) do
-    state = buffer_entries(state, entries)
-
+  defp spawn_next_batch(state) do
     if Enum.count(state.tasks) < state.max_concurrency and :queue.len(state.buffer) > 0 do
       {{batch, retries}, new_queue} = take_batch(state)
@@ -162,14 +196,16 @@ defmodule Explorer.BufferedTask do
   end
 
   defp flush(%{current_buffer: []} = state) do
-    state |> spawn_next_batch([]) |> schedule_next_buffer_flush()
+    state |> spawn_next_batch() |> schedule_next_buffer_flush()
   end
 
   defp flush(%{current_buffer: current} = state) do
-    {batch, overflow} = Enum.split(current, state.max_batch_size)
-
-    %{state | current_buffer: overflow}
-    |> queue(batch, 0)
+    current
+    |> List.flatten()
+    |> Enum.chunk_every(state.max_batch_size)
+    |> Enum.reduce(%{state | current_buffer: []}, fn batch, state_acc ->
+      queue(state_acc, batch, 0)
+    end)
     |> flush()
   end
 end
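
In sum, the BufferedTask changes move initial catch-up streaming into a supervised task that chunks entries into the queue and hands them to the server via a synchronous {:async_perform, queue} call, replacing the old schedule_async_perform/2 self-messages. The public surface gains an explicit call timeout on buffer/3 and the debug_count/1 introspection helper. A minimal usage sketch of that surface (the MyFetcher server name is hypothetical):

  # Buffer entries for later batched execution; the optional third argument
  # is the GenServer.call timeout in milliseconds (defaults to 5000).
  :ok = Explorer.BufferedTask.buffer(MyFetcher, ["entry_a", "entry_b"], 10_000)

  # Approximate backlog: current_buffer entries plus queued batches,
  # estimated as queue length * max_batch_size.
  count = Explorer.BufferedTask.debug_count(MyFetcher)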

apps/explorer/lib/explorer/chain.ex

@@ -3,17 +3,7 @@ defmodule Explorer.Chain do
   The chain context.
   """
 
-  import Ecto.Query,
-    only: [
-      from: 2,
-      join: 4,
-      or_where: 3,
-      order_by: 2,
-      order_by: 3,
-      preload: 2,
-      where: 2,
-      where: 3
-    ]
+  import Ecto.Query, only: [from: 2, join: 4, or_where: 3, order_by: 2, order_by: 3, preload: 2, where: 2, where: 3]
 
   alias Ecto.{Changeset, Multi}
@@ -154,7 +144,7 @@ defmodule Explorer.Chain do
   # timeouts all in milliseconds
-  @transaction_timeout 60_000
+  @transaction_timeout 120_000
 
   @insert_addresses_timeout 60_000
   @insert_blocks_timeout 60_000
   @insert_internal_transactions_timeout 60_000
@@ -199,14 +189,19 @@ defmodule Explorer.Chain do
     # MUST match order used in `insert_addresses/2`
     ordered_changes_list = sort_address_changes_list(changes_list)
 
-    {_, _} =
-      Repo.safe_insert_all(
-        Address,
-        ordered_changes_list,
-        conflict_target: :hash,
-        on_conflict: :replace_all,
-        timeout: Keyword.get(options, :timeout, @insert_addresses_timeout)
-      )
+    Repo.transaction(
+      fn ->
+        {_, _} =
+          Repo.safe_insert_all(
+            Address,
+            ordered_changes_list,
+            conflict_target: :hash,
+            on_conflict: :replace_all,
+            timeout: Keyword.get(options, :timeout, @insert_addresses_timeout)
+          )
+      end,
+      timeout: @transaction_timeout
+    )
 
     :ok
   end
@@ -1020,6 +1015,7 @@ defmodule Explorer.Chain do
       iex> [first_address_hash, second_address_hash] = 2 |> insert_list(:address) |> Enum.map(& &1.hash)
       iex> {:ok, address_hash_set} = Explorer.Chain.stream_unfetched_addresses(
+      ...>   [:hash],
       ...>   MapSet.new([]),
       ...>   fn %Explorer.Chain.Address{hash: hash}, acc ->
       ...>     MapSet.put(acc, hash)
@@ -1033,37 +1029,47 @@ defmodule Explorer.Chain do
   When there are no addresses, the `reducer` is never called and the `initial` is returned in an `:ok` tuple.
 
       iex> {:ok, pid} = Agent.start_link(fn -> 0 end)
-      iex> Explorer.Chain.stream_unfetched_addresses(MapSet.new([]), fn %Explorer.Chain.Address{hash: hash}, acc ->
-      ...>   Agent.update(pid, &(&1 + 1))
-      ...>   MapSet.put(acc, hash)
-      ...> end)
+      iex> Explorer.Chain.stream_unfetched_addresses(
+      ...>   [:hash],
+      ...>   MapSet.new([]),
+      ...>   fn %Explorer.Chain.Address{hash: hash}, acc ->
+      ...>     Agent.update(pid, &(&1 + 1))
+      ...>     MapSet.put(acc, hash)
+      ...>   end
+      ...> )
       {:ok, MapSet.new([])}
      iex> Agent.get(pid, & &1)
      0
 
   """
-  def stream_unfetched_addresses(initial, reducer) when is_function(reducer) do
-    Repo.transaction(fn ->
-      query = from(a in Address, where: is_nil(a.balance_fetched_at))
+  def stream_unfetched_addresses(fields, initial, reducer) when is_function(reducer) do
+    Repo.transaction(
+      fn ->
+        query = from(a in Address, where: is_nil(a.balance_fetched_at), select: ^fields)
 
-      query
-      |> Repo.stream()
-      |> Enum.reduce(initial, reducer)
-    end)
+        query
+        |> Repo.stream(timeout: :infinity)
+        |> Enum.reduce(initial, reducer)
+      end,
+      timeout: :infinity
+    )
   end
 
   @doc """
   Returns a stream of all transactions with unfetched internal transactions.
   """
-  def stream_transactions_with_unfetched_internal_transactions(initial, reducer)
+  def stream_transactions_with_unfetched_internal_transactions(fields, initial, reducer)
       when is_function(reducer) do
-    Repo.transaction(fn ->
-      query = from(t in Transaction, where: is_nil(t.internal_transactions_indexed_at))
+    Repo.transaction(
+      fn ->
+        query = from(t in Transaction, where: is_nil(t.internal_transactions_indexed_at), select: ^fields)
 
-      query
-      |> Repo.stream()
-      |> Enum.reduce(initial, reducer)
-    end)
+        query
+        |> Repo.stream(timeout: :infinity)
+        |> Enum.reduce(initial, reducer)
+      end,
+      timeout: :infinity
+    )
   end
 
   @doc """
@@ -1142,7 +1148,7 @@ defmodule Explorer.Chain do
     query = from(b in Block, select: b.number, order_by: [asc: b.number])
 
     query
-    |> Repo.stream(max_rows: 1000)
+    |> Repo.stream(max_rows: 1000, timeout: :infinity)
     |> Enum.reduce({-1, 0, []}, fn
       num, {prev, missing_count, acc} when prev + 1 == num ->
         {num, missing_count, acc}
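
The streaming helpers in Explorer.Chain now take the selected fields as a leading argument and run inside an unbounded transaction. A minimal sketch of the new call shape, using the MapSet accumulator from the doctest above:

  {:ok, address_hash_set} =
    Explorer.Chain.stream_unfetched_addresses(
      [:hash],
      MapSet.new([]),
      fn %Explorer.Chain.Address{hash: hash}, acc -> MapSet.put(acc, hash) end
    )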

apps/explorer/lib/explorer/indexer/address_balance_fetcher.ex

@@ -12,7 +12,8 @@ defmodule Explorer.Indexer.AddressBalanceFetcher do
   @defaults [
     flush_interval: :timer.seconds(3),
     max_batch_size: 100,
-    max_concurrency: 2
+    max_concurrency: 4,
+    stream_chunk_size: 1000
   ]
 
   @doc """
@@ -31,7 +32,7 @@ defmodule Explorer.Indexer.AddressBalanceFetcher do
 
   @impl BufferedTask
   def init(acc, reducer) do
-    Chain.stream_unfetched_addresses(acc, fn %Address{hash: hash}, acc ->
+    Chain.stream_unfetched_addresses([:hash], acc, fn %Address{hash: hash}, acc ->
       reducer.(Hash.to_string(hash), acc)
     end)
   end

apps/explorer/lib/explorer/indexer/block_fetcher.ex

@@ -11,7 +11,7 @@ defmodule Explorer.Indexer.BlockFetcher do
   alias EthereumJSONRPC
   alias EthereumJSONRPC.Transactions
-  alias Explorer.{Chain, Indexer}
+  alias Explorer.{BufferedTask, Chain, Indexer}
   alias Explorer.Indexer.BlockFetcher.AddressExtraction
   alias Explorer.Indexer.{AddressBalanceFetcher, InternalTransactionFetcher, Sequence}
@@ -21,12 +21,9 @@ defmodule Explorer.Indexer.BlockFetcher do
   # These are all the *default* values for options.
   # DO NOT use them directly in the code. Get options from `state`.
 
-  @blocks_batch_size 100
+  @blocks_batch_size 50
   @blocks_concurrency 10
 
-  @internal_transactions_batch_size 50
-  @internal_transactions_concurrency 8
-
   # milliseconds
   @block_rate 5_000
@@ -73,10 +70,6 @@ defmodule Explorer.Indexer.BlockFetcher do
       realtime_interval: (opts[:block_rate] || @block_rate) * 2,
       blocks_batch_size: Keyword.get(opts, :blocks_batch_size, @blocks_batch_size),
       blocks_concurrency: Keyword.get(opts, :blocks_concurrency, @blocks_concurrency),
-      internal_transactions_batch_size:
-        Keyword.get(opts, :internal_transactions_batch_size, @internal_transactions_batch_size),
-      internal_transactions_concurrency:
-        Keyword.get(opts, :internal_transactions_concurrency, @internal_transactions_concurrency),
       receipts_batch_size: Keyword.get(opts, :receipts_batch_size, @receipts_batch_size),
       receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency)
     }
@@ -124,10 +117,17 @@ defmodule Explorer.Indexer.BlockFetcher do
       ================================
       persisted counts
       ================================
-        addresses: #{Chain.address_count()}
         blocks: #{Chain.block_count()}
         internal transactions: #{Chain.internal_transaction_count()}
         logs: #{Chain.log_count()}
+        addresses: #{Chain.address_count()}
+
+      ================================
+      deferred fetches
+      ================================
+        address balances: #{BufferedTask.debug_count(AddressBalanceFetcher)}
+        internal transactions: #{BufferedTask.debug_count(InternalTransactionFetcher)}
       """
     end)

apps/explorer/lib/explorer/indexer/internal_transaction_fetcher.ex

@@ -12,12 +12,13 @@ defmodule Explorer.Indexer.InternalTransactionFetcher do
   @behaviour BufferedTask
 
-  @max_batch_size 50
-  @max_concurrency 8
+  @max_batch_size 10
+  @max_concurrency 4
   @defaults [
     flush_interval: :timer.seconds(3),
     max_concurrency: @max_concurrency,
-    max_batch_size: @max_batch_size
+    max_batch_size: @max_batch_size,
+    stream_chunk_size: 5000
   ]
 
   @doc """
@@ -50,7 +51,7 @@ defmodule Explorer.Indexer.InternalTransactionFetcher do
 
   @impl BufferedTask
   def init(acc, reducer) do
-    Chain.stream_transactions_with_unfetched_internal_transactions(acc, fn %Transaction{hash: hash}, acc ->
+    Chain.stream_transactions_with_unfetched_internal_transactions([:hash], acc, fn %Transaction{hash: hash}, acc ->
      reducer.(Hash.to_string(hash), acc)
    end)
  end
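
Here stream_chunk_size bounds how many streamed entries the init task accumulates before chunking them into max_batch_size batches and handing them to the server, so a large backlog never materializes in memory at once. A minimal sketch of a callback module wired up with the new option (the module name and the 1..10_000 entry source are hypothetical):

  defmodule MyApp.ExampleFetcher do
    alias Explorer.BufferedTask

    @behaviour BufferedTask

    @defaults [
      flush_interval: :timer.seconds(3),
      max_batch_size: 10,
      max_concurrency: 4,
      stream_chunk_size: 5000
    ]

    def child_spec(_args) do
      # Same child-spec shape the tests use: {BufferedTask, {callback_module, opts}}.
      Supervisor.child_spec({BufferedTask, {__MODULE__, @defaults}}, id: __MODULE__)
    end

    @impl BufferedTask
    def init(acc, reducer) do
      # Replay the backlog through the reducer; returning {:ok, final_acc}
      # lets catchup_remaining/2 enqueue the trailing partial chunk.
      {:ok, Enum.reduce(1..10_000, acc, reducer)}
    end

    @impl BufferedTask
    def run(batch, _retries) do
      # Handle up to max_batch_size entries; return {:retry, reason} to requeue.
      Enum.each(batch, &IO.inspect/1)
      :ok
    end
  end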

apps/explorer/lib/explorer/indexer/supervisor.ex

@@ -20,6 +20,6 @@ defmodule Explorer.Indexer.Supervisor do
       {BlockFetcher, []}
     ]
 
-    Supervisor.init(children, strategy: :rest_for_one)
+    Supervisor.init(children, strategy: :one_for_one)
   end
 end

apps/explorer/test/explorer/buffered_task_test.exs

@@ -7,7 +7,9 @@ defmodule Explorer.BufferedTaskTest do
   defp start_buffer(callback_module) do
     start_supervised(
-      {BufferedTask, {callback_module, flush_interval: 50, max_batch_size: @max_batch_size, max_concurrency: 2}}
+      {BufferedTask,
+       {callback_module,
+        flush_interval: 50, max_batch_size: @max_batch_size, max_concurrency: 2, stream_chunk_size: @max_batch_size * 2}}
     )
   end
@@ -56,6 +58,11 @@ defmodule Explorer.BufferedTaskTest do
       :ok
     end
 
+    def run([{:sleep, time}], _) do
+      :timer.sleep(time)
+      :ok
+    end
+
     def run(batch, retries) when retries < 2 do
       send(__MODULE__, {:run, {retries, batch}})
       {:retry, :because_reasons}
@@ -126,4 +133,13 @@ defmodule Explorer.BufferedTaskTest do
     assert_receive {:final_run, {2, [3]}}
     refute_receive _
   end
+
+  test "debug_count/1 returns count of buffered entries" do
+    {:ok, buffer} = start_buffer(RetryableTask)
+    assert 0 = BufferedTask.debug_count(buffer)
+
+    BufferedTask.buffer(buffer, [{:sleep, 100}])
+    BufferedTask.buffer(buffer, [{:sleep, 100}])
+    BufferedTask.buffer(buffer, [{:sleep, 100}])
+    assert 3 = BufferedTask.debug_count(buffer)
+  end
 end

apps/explorer/test/explorer/indexer/block_fetcher_test.exs

@@ -72,6 +72,9 @@ defmodule Explorer.Indexer.BlockFetcherTest do
   end
 
   test "with debug_logs", %{state: state} do
+    AddressBalanceFetcherCase.start_supervised!()
+    InternalTransactionFetcherCase.start_supervised!()
+
     log =
       capture_log_at_level(:debug, fn ->
         Indexer.enable_debug_logs()