Update tests for new task buffer behavior

Co-authored-by: Luke Imhoff <luke.imhoff@dockyard.com>
pull/218/head
Chris McCord 7 years ago committed by Luke Imhoff
parent 6b12526060
commit 95a380f7bb
  1. 57
      apps/explorer/lib/explorer/buffered_task.ex
  2. 15
      apps/explorer/lib/explorer/chain.ex
  3. 5
      apps/explorer/lib/explorer/indexer/address_balance_fetcher.ex
  4. 12
      apps/explorer/lib/explorer/indexer/block_fetcher.ex
  5. 7
      apps/explorer/lib/explorer/indexer/internal_transaction_fetcher.ex
  6. 12
      apps/explorer/test/explorer/buffered_task_test.exs
  7. 53
      apps/explorer/test/explorer/indexer/block_fetcher_test.exs

@ -1,6 +1,48 @@
defmodule Explorer.BufferedTask do
@moduledoc """
TODO
Provides a behaviour for batched task running with retries.
## Options
The following list of options may be passed when starting a child:
* `:name` - The optional registered name for the new process.
* `:flush_interval` - The required interval in milliseconds to flush the buffer.
* `:max_concurrency` - The required maximum number of tasks to run
concurrently at any given time.
* `:max_batch_size` - The required maximum batch passed to run callbacks.
* `:init_chunk_size` - The required chunk size to chunk init entries for
initial buffer population.
* `:task_supervisor` - The required `Task.Supervisor` name to spawn tasks under.
## Callbacks
The `init/2` is used for a task to populate the buffer on
boot with an initial set of entries. For example, the following
callback would buffer all unfetched account balances on startup:
def init(acc, reducer) do
Chain.stream_unfetched_addresses([:hash], acc, fn %{hash: hash}, acc ->
reducer.(Hash.to_string(hash), acc)
end)
end
The `init/2` operation may be long running and allows concurrent calls to
`Explorer.BufferedTask.buffer/2` for on-demand entries. As concurrency becomes
available, the `run/2` callback of the task is invoked, with a list of batched
entries to be processed. For example, the `run/2` callback for above may look
like:
def run(string_hashes, _retries) do
case EthereumJSONRPC.fetch_balances_by_hash(string_hashes) do
{:ok, results} -> :ok = Chain.update_balances(results)
{:error, reason} -> {:retry, reason}
end
end
If a task crashes, it will be retried automatically with an increased `retries`
passed in as the second argument. Tasks may also be programmatically
retried by returning `{:retry, reason}` from `run/2`.
"""
use GenServer
require Logger
@ -37,10 +79,11 @@ defmodule Explorer.BufferedTask do
init_task: nil,
flush_timer: nil,
callback_module: callback_module,
task_supervisor: Keyword.fetch!(opts, :task_supervisor),
flush_interval: Keyword.fetch!(opts, :flush_interval),
max_batch_size: Keyword.fetch!(opts, :max_batch_size),
max_concurrency: Keyword.fetch!(opts, :max_concurrency),
stream_chunk_size: Keyword.fetch!(opts, :stream_chunk_size),
init_chunk_size: Keyword.fetch!(opts, :init_chunk_size),
current_buffer: [],
buffer: :queue.new(),
tasks: %{}
@ -97,7 +140,7 @@ defmodule Explorer.BufferedTask do
def handle_call(:debug_count, _from, state) do
count = length(state.current_buffer) + :queue.len(state.buffer) * state.max_batch_size
{:reply, count, state}
{:reply, %{buffer: count, tasks: Enum.count(state.tasks)}, state}
end
defp drop_task(state, ref) do
@ -126,12 +169,12 @@ defmodule Explorer.BufferedTask do
:queue.in({batch, retries}, que)
end
defp do_initial_stream(%{stream_chunk_size: stream_chunk_size} = state) do
defp do_initial_stream(%{init_chunk_size: init_chunk_size} = state) do
task =
Task.Supervisor.async(Explorer.TaskSupervisor, fn ->
Task.Supervisor.async(state.task_supervisor, fn ->
{0, []}
|> state.callback_module.init(fn
entry, {len, acc} when len + 1 >= stream_chunk_size ->
entry, {len, acc} when len + 1 >= init_chunk_size ->
[entry | acc]
|> chunk_into_queue(state)
|> async_perform(state.pid)
@ -185,7 +228,7 @@ defmodule Explorer.BufferedTask do
{{batch, retries}, new_queue} = take_batch(state)
task =
Task.Supervisor.async_nolink(Explorer.TaskSupervisor, fn ->
Task.Supervisor.async_nolink(state.task_supervisor, fn ->
{:performed, state.callback_module.run(batch, retries)}
end)

@ -1014,8 +1014,7 @@ defmodule Explorer.Chain do
When there are addresses, the `reducer` is called for each `t:Explorer.Chain.Address.t/0`.
iex> [first_address_hash, second_address_hash] = 2 |> insert_list(:address) |> Enum.map(& &1.hash)
iex> {:ok, address_hash_set} = Explorer.Chain.stream_unfetched_addresses(
...> [:hash],
iex> {:ok, address_hash_set} = Explorer.Chain.stream_unfetched_addresses([:hash],
...> MapSet.new([]),
...> fn %Explorer.Chain.Address{hash: hash}, acc ->
...> MapSet.put(acc, hash)
@ -1029,14 +1028,10 @@ defmodule Explorer.Chain do
When there are no addresses, the `reducer` is never called and the `initial` is returned in an `:ok` tuple.
iex> {:ok, pid} = Agent.start_link(fn -> 0 end)
iex> Explorer.Chain.stream_unfetched_addresses(
...> [:hash],
...> MapSet.new([]),
...> fn %Explorer.Chain.Address{hash: hash}, acc ->
...> Agent.update(pid, &(&1 + 1))
...> MapSet.put(acc, hash)
...> end
...> )
iex> Explorer.Chain.stream_unfetched_addresses([:hash], MapSet.new([]), fn %Explorer.Chain.Address{hash: hash}, acc ->
...> Agent.update(pid, &(&1 + 1))
...> MapSet.put(acc, hash)
...> end)
{:ok, MapSet.new([])}
iex> Agent.get(pid, & &1)
0

@ -11,9 +11,10 @@ defmodule Explorer.Indexer.AddressBalanceFetcher do
@defaults [
flush_interval: :timer.seconds(3),
max_batch_size: 100,
max_batch_size: 500,
max_concurrency: 4,
stream_chunk_size: 1000
init_chunk_size: 1000,
task_supervisor: Explorer.Indexer.TaskSupervisor
]
@doc """

@ -21,14 +21,14 @@ defmodule Explorer.Indexer.BlockFetcher do
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
@blocks_batch_size 50
@blocks_batch_size 10
@blocks_concurrency 10
# milliseconds
@block_rate 5_000
@receipts_batch_size 250
@receipts_concurrency 20
@receipts_concurrency 10
@doc """
Starts the server.
@ -121,13 +121,13 @@ defmodule Explorer.Indexer.BlockFetcher do
blocks: #{Chain.block_count()}
internal transactions: #{Chain.internal_transaction_count()}
logs: #{Chain.log_count()}
addresses: #{Chain.address_count()}
================================
deferred fetches
================================
address balances: #{BufferedTask.debug_count(AddressBalanceFetcher)}
internal transactions: #{BufferedTask.debug_count(InternalTransactionFetcher)}
address balances: #{inspect(BufferedTask.debug_count(AddressBalanceFetcher))}
internal transactions: #{inspect(BufferedTask.debug_count(InternalTransactionFetcher))}
"""
end)
@ -194,7 +194,7 @@ defmodule Explorer.Indexer.BlockFetcher do
%{transactions: transaction_hashes, addresses: address_hashes} = results
AddressBalanceFetcher.async_fetch_balances(address_hashes)
InternalTransactionFetcher.async_fetch(transaction_hashes)
InternalTransactionFetcher.async_fetch(transaction_hashes, 10_000)
end
defp missing_block_numbers(%{blocks_batch_size: blocks_batch_size}) do

@ -18,7 +18,8 @@ defmodule Explorer.Indexer.InternalTransactionFetcher do
flush_interval: :timer.seconds(3),
max_concurrency: @max_concurrency,
max_batch_size: @max_batch_size,
stream_chunk_size: 5000
init_chunk_size: 5000,
task_supervisor: Explorer.Indexer.TaskSupervisor
]
@doc """
@ -37,10 +38,10 @@ defmodule Explorer.Indexer.InternalTransactionFetcher do
*Note*: The internal transactions for individual transactions cannot be paginated,
so the total number of internal transactions that could be produced is unknown.
"""
def async_fetch(transaction_hashes) do
def async_fetch(transaction_hashes, timeout \\ 5000) do
string_hashes = for hash <- transaction_hashes, do: Hash.to_string(hash)
BufferedTask.buffer(__MODULE__, string_hashes)
BufferedTask.buffer(__MODULE__, string_hashes, timeout)
end
@doc false

@ -6,10 +6,16 @@ defmodule Explorer.BufferedTaskTest do
@max_batch_size 2
defp start_buffer(callback_module) do
start_supervised!({Task.Supervisor, name: BufferedTaskSup})
start_supervised(
{BufferedTask,
{callback_module,
flush_interval: 50, max_batch_size: @max_batch_size, max_concurrency: 2, stream_chunk_size: @max_batch_size * 2}}
task_supervisor: BufferedTaskSup,
flush_interval: 50,
max_batch_size: @max_batch_size,
max_concurrency: 2,
init_chunk_size: @max_batch_size * 2}}
)
end
@ -136,10 +142,10 @@ defmodule Explorer.BufferedTaskTest do
test "debug_count/1 returns count of buffered entries" do
{:ok, buffer} = start_buffer(RetryableTask)
assert 0 = BufferedTask.debug_count(buffer)
assert %{buffer: 0, tasks: 0} = BufferedTask.debug_count(buffer)
BufferedTask.buffer(buffer, [{:sleep, 100}])
BufferedTask.buffer(buffer, [{:sleep, 100}])
BufferedTask.buffer(buffer, [{:sleep, 100}])
assert 3 = BufferedTask.debug_count(buffer)
assert %{buffer: 3, tasks: 0} = BufferedTask.debug_count(buffer)
end
end

@ -6,7 +6,8 @@ defmodule Explorer.Indexer.BlockFetcherTest do
alias Explorer.Chain.{Address, Block, Log, Transaction}
alias Explorer.Indexer
alias Explorer.Indexer.{AddressBalanceFetcherCase, BlockFetcher, InternalTransactionFetcherCase, Sequence}
alias Explorer.Indexer.{AddressBalanceFetcherCase, BlockFetcher, InternalTransactionFetcher, InternalTransactionFetcherCase, Sequence}
@tag capture_log: true
@ -64,14 +65,22 @@ defmodule Explorer.Indexer.BlockFetcherTest do
:ok
end
@tag :capture_log
@heading "persisted counts"
test "without debug_logs", %{state: state} do
assert capture_log_at_level(:debug, fn ->
start_supervised!({Task.Supervisor, name: Explorer.Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!()
InternalTransactionFetcherCase.start_supervised!()
refute capture_log_at_level(:debug, fn ->
Indexer.disable_debug_logs()
BlockFetcher.handle_info(:debug_count, state)
end) == ""
end) =~ @heading
end
@tag :capture_log
test "with debug_logs", %{state: state} do
start_supervised!({Task.Supervisor, name: Explorer.Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!()
InternalTransactionFetcherCase.start_supervised!()
@ -81,6 +90,7 @@ defmodule Explorer.Indexer.BlockFetcherTest do
BlockFetcher.handle_info(:debug_count, state)
end)
assert log =~ @heading
assert log =~ "blocks: 4"
assert log =~ "internal transactions: 3"
assert log =~ "logs: 3"
@ -95,7 +105,7 @@ defmodule Explorer.Indexer.BlockFetcherTest do
start_supervised!({Task.Supervisor, name: Explorer.Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!()
InternalTransactionFetcherCase.start_supervised!()
{:ok, state} = BlockFetcher.init(debug_logs: false)
{:ok, state} = BlockFetcher.init([])
%{state: state}
end
@ -123,6 +133,9 @@ defmodule Explorer.Indexer.BlockFetcherTest do
transactions: []
}} = BlockFetcher.import_range({0, 0}, state, sequence)
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(AddressBalanceFetcher)
assert Repo.aggregate(Block, :count, :hash) == 1
assert Repo.aggregate(Address, :count, :hash) == 1
end
@ -173,6 +186,9 @@ defmodule Explorer.Indexer.BlockFetcherTest do
]
}} = BlockFetcher.import_range({@first_full_block_number, @first_full_block_number}, state, sequence)
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(AddressBalanceFetcher)
assert Repo.aggregate(Block, :count, :hash) == 1
assert Repo.aggregate(Address, :count, :hash) == 2
assert Repo.aggregate(Log, :count, :id) == 1
@ -211,6 +227,35 @@ defmodule Explorer.Indexer.BlockFetcherTest do
%{state: state}
end
# Blocks the calling process until `producer` returns a truthy value.
#
# `producer` is polled from a separately spawned (unlinked) process, which
# reports back with a message tagged by a unique reference. If no such message
# arrives within `timeout` milliseconds, the caller exits with `:timeout`.
defp wait_until(timeout, producer) do
  caller = self()
  tag = make_ref()

  # The poller runs outside the caller so the `after` clause below can bound
  # the total wait.
  spawn(fn -> do_wait_until(caller, tag, producer) end)

  receive do
    {^tag, :ok} -> :ok
  after
    timeout -> exit(:timeout)
  end
end
# Polling loop behind `wait_until/2`.
#
# Calls `producer` repeatedly, sleeping 100 ms between attempts, until it
# returns a truthy value (anything other than `nil` or `false`); then sends
# `{ref, :ok}` to `parent`.
defp do_wait_until(parent, ref, producer) do
  case producer.() do
    not_ready when not_ready in [nil, false] ->
      :timer.sleep(100)
      do_wait_until(parent, ref, producer)

    _ready ->
      send(parent, {ref, :ok})
  end
end
# Waits up to 5 seconds for `buffered_task` to report zero buffered entries and
# zero tasks via `Explorer.BufferedTask.debug_count/1`.
defp wait_for_tasks(buffered_task) do
  drained? = fn ->
    pending = Explorer.BufferedTask.debug_count(buffered_task)
    pending.buffer == 0 and pending.tasks == 0
  end

  wait_until(5000, drained?)
end
defp wait(producer) do
producer.()
rescue

Loading…
Cancel
Save