Add initial BufferedTask behaviour

Add internal_transactions_indexed_at to transactions schema
for catching up on deferred internal transactions on boot.
pull/218/head
Chris McCord 7 years ago committed by Luke Imhoff
parent 5fe8abd863
commit e813045564
Changed files (lines changed):

 1. apps/explorer/lib/explorer/buffered_task.ex (162)
 2. apps/explorer/lib/explorer/chain.ex (26)
 3. apps/explorer/lib/explorer/chain/transaction.ex (3)
 4. apps/explorer/lib/explorer/indexer/address_balance_fetcher.ex (151)
 5. apps/explorer/lib/explorer/indexer/block_fetcher.ex (12)
 6. apps/explorer/lib/explorer/indexer/internal_transaction_fetcher.ex (37)
 7. apps/explorer/lib/explorer/indexer/supervisor.ex (3)
 8. apps/explorer/priv/repo/migrations/20180117221923_create_transactions.exs (4)
 9. apps/explorer/test/explorer/buffered_task_test.exs (98)
10. apps/explorer/test/explorer/indexer/address_balance_fetcher_test.exs (18)
11. apps/explorer/test/explorer/indexer/block_fetcher_test.exs (4)
12. apps/explorer/test/support/indexer/address_balance_fetcher_case.ex (15)
13. coveralls.json (2)

@@ -0,0 +1,162 @@
defmodule Explorer.BufferedTask do
  @moduledoc """
  Provides a behaviour for batching buffered entries and processing them in
  bounded concurrent tasks.

  Callback modules stream their initial backlog into the buffer on boot via
  `c:init/2` and process each batch via `c:run/1`, which may report `:ok`,
  `{:retry, reason}`, or `{:halt, reason}`.
  """

  use GenServer

  require Logger

  @callback init(initial :: term, reducer :: function) ::
              {:ok, accumulated_results :: term | initial :: term} | {:error, reason :: term}

  @callback run(entries :: list) :: :ok | {:retry, reason :: term} | {:halt, reason :: term}

  @flush_interval :timer.seconds(3)

  @doc """
  Buffers a list of entries for future async execution.
  """
  def buffer(server, entries) when is_list(entries) do
    GenServer.call(server, {:buffer, entries})
  end

  def start_link({module, base_opts}) do
    default_opts = Application.fetch_env!(:explorer, :indexer)
    opts = Keyword.merge(default_opts, base_opts)

    GenServer.start_link(__MODULE__, {module, opts}, name: opts[:name])
  end

  def init({callback_module, opts}) do
    send(self(), :initial_stream)

    state = %{
      callback_module: callback_module,
      debug_logs: Keyword.get(opts, :debug_logs, false),
      flush_timer: nil,
      flush_interval: Keyword.get(opts, :flush_interval, @flush_interval),
      max_batch_size: Keyword.fetch!(opts, :max_batch_size),
      max_concurrency: Keyword.fetch!(opts, :max_concurrency),
      buffer: :queue.new(),
      tasks: %{}
    }

    {:ok, state}
  end

  def handle_info(:initial_stream, state) do
    {:noreply, do_initial_stream(state)}
  end

  def handle_info(:flush, state) do
    {:noreply, state |> spawn_next_batch([]) |> schedule_next_buffer_flush()}
  end

  def handle_info({:async_perform, entries}, state) do
    {:noreply, spawn_next_batch(state, entries)}
  end

  def handle_info({ref, {:performed, :ok}}, state) do
    {:noreply, drop_task(state, ref)}
  end

  def handle_info({ref, {:performed, {:retry, _reason}}}, state) do
    {:noreply, drop_task_and_retry(state, ref)}
  end

  def handle_info({ref, {:performed, {:halt, _reason}}}, state) do
    {:noreply, drop_task(state, ref)}
  end

  def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
    {:noreply, state}
  end

  # A crashed task is retried by re-buffering its batch
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {:noreply, drop_task_and_retry(state, ref)}
  end

  def handle_call({:buffer, entries}, _from, state) do
    {:reply, :ok, buffer_entries(state, entries)}
  end

  defp drop_task(state, ref) do
    schedule_async_perform([])
    %{state | tasks: Map.delete(state.tasks, ref)}
  end

  defp drop_task_and_retry(state, ref) do
    batch = Map.fetch!(state.tasks, ref)

    state
    |> drop_task(ref)
    |> buffer_entries(batch)
  end

  defp buffer_entries(state, entries) do
    %{state | buffer: :queue.join(state.buffer, :queue.from_list(entries))}
  end

  # Streams the callback module's initial entries into max_batch_size batches,
  # scheduling each full batch for async processing as it accumulates.
  defp do_initial_stream(state) do
    state.buffer
    |> state.callback_module.init(fn entry, acc ->
      batch = :queue.in(entry, acc)

      if :queue.len(batch) >= state.max_batch_size do
        schedule_async_perform(:queue.to_list(batch))
        :queue.new()
      else
        batch
      end
    end)
    |> catchup_remaining()

    schedule_next_buffer_flush(state)
  end

  defp catchup_remaining({:ok, batch}) do
    if :queue.len(batch) > 0 do
      schedule_async_perform(:queue.to_list(batch))
    end

    :ok
  end

  defp take_batch(state) do
    {entries, remaining_queue} =
      Enum.reduce_while(1..state.max_batch_size, {[], state.buffer}, fn _, {entries, queue_acc} ->
        case :queue.out(queue_acc) do
          {{:value, entry}, new_queue} -> {:cont, {[entry | entries], new_queue}}
          {:empty, new_queue} -> {:halt, {entries, new_queue}}
        end
      end)

    {Enum.reverse(entries), remaining_queue}
  end

  defp schedule_async_perform(entries, after_ms \\ 0) do
    Process.send_after(self(), {:async_perform, entries}, after_ms)
  end

  defp schedule_next_buffer_flush(state) do
    timer = Process.send_after(self(), :flush, state.flush_interval)
    %{state | flush_timer: timer}
  end

  defp spawn_next_batch(state, entries) do
    state = buffer_entries(state, entries)

    if Enum.count(state.tasks) < state.max_concurrency and :queue.len(state.buffer) > 0 do
      {batch, new_queue} = take_batch(state)

      task =
        Task.Supervisor.async_nolink(Explorer.TaskSupervisor, fn ->
          debug(state, fn -> "processing #{Enum.count(batch)} entries for #{inspect(state.callback_module)}" end)
          {:performed, state.callback_module.run(batch)}
        end)

      %{state | tasks: Map.put(state.tasks, task.ref, batch), buffer: new_queue}
    else
      state
    end
  end

  defp debug(%{debug_logs: true}, func), do: Logger.debug(func)
  defp debug(%{debug_logs: false}, _func), do: :noop
end
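
For orientation, here is a minimal sketch of a callback module built on this behaviour, modeled on the fetchers and tests later in this commit; the EchoFetcher name is hypothetical:

defmodule EchoFetcher do
  @behaviour Explorer.BufferedTask

  # Nothing to stream on boot, so hand the accumulator back unchanged.
  def init(acc, _reducer), do: {:ok, acc}

  # Process one batch; returning {:retry, reason} would re-buffer the batch.
  def run(entries) do
    IO.inspect(entries, label: "batch")
    :ok
  end
end

# Supervised with the same child-spec shape used by Explorer.Indexer.Supervisor below:
#   {Explorer.BufferedTask, {EchoFetcher, max_batch_size: 10, max_concurrency: 2, name: EchoFetcher}}
# and fed at runtime with:
#   Explorer.BufferedTask.buffer(EchoFetcher, ~w(a b c))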

@@ -999,6 +999,17 @@ defmodule Explorer.Chain
     end)
   end

+  def stream_transactions_with_unfetched_internal_transactions(initial, reducer)
+      when is_function(reducer) do
+    Repo.transaction(fn ->
+      query = from(t in Transaction, where: is_nil(t.internal_transactions_indexed_at))
+
+      query
+      |> Repo.stream()
+      |> Enum.reduce(initial, reducer)
+    end)
+  end
+
   @doc """
   The number of `t:Explorer.Chain.Log.t/0`.
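
Ecto's `Repo.stream/1` is lazy and only valid inside a transaction, hence the `Repo.transaction/1` wrapper above. A minimal sketch of a caller collecting the backlog of unfetched hashes, mirroring `InternalTransactionFetcher.init/2` later in this commit:

{:ok, hashes} =
  Explorer.Chain.stream_transactions_with_unfetched_internal_transactions([], fn transaction, acc ->
    [Explorer.Chain.Hash.to_string(transaction.hash) | acc]
  end)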
@@ -1663,6 +1674,21 @@ defmodule Explorer.Chain
     |> Repo.transaction(timeout: Keyword.get(options, :transaction_timeout, @transaction_timeout))
   end

+  def import_internal_transactions(internal_transactions_params) do
+    changes =
+      ecto_schema_module_to_params_list_to_ecto_schema_module_to_changes_list(%{
+        InternalTransaction => internal_transactions_params
+      })
+
+    Repo.transaction(fn ->
+      insert_internal_transactions(
+        changes,
+        timestamps: timestamps(),
+        timeout: @insert_internal_transactions_timeout
+      )
+    end)
+  end
+
   @spec insert_internal_transactions([map()], [timestamps_option]) ::
           {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]}
           | {:error, [Changeset.t()]}

@@ -88,6 +88,7 @@ defmodule Explorer.Chain.Transaction
    * `input` - data sent along with the transaction
    * `internal_transactions` - transactions (value transfers) created while executing contract used for this
      transaction
+   * `internal_transactions_indexed_at` - when `internal_transactions` were fetched by `Explorer.Indexer`.
    * `logs` - events that occurred while mining the `transaction`.
    * `nonce` - the number of transactions made by the sender prior to this one
    * `public_key` - public key of the signer of the transaction
@@ -116,6 +117,7 @@ defmodule Explorer.Chain.Transaction
           index: non_neg_integer() | nil,
           input: Data.t(),
           internal_transactions: %Ecto.Association.NotLoaded{} | [InternalTransaction.t()],
+          internal_transactions_indexed_at: Timex.Ecto.DateTime.t(),
           logs: %Ecto.Association.NotLoaded{} | [Log.t()],
           nonce: non_neg_integer(),
           public_key: public_key(),
@@ -137,6 +139,7 @@ defmodule Explorer.Chain.Transaction
     field(:gas_price, Wei)
     field(:gas_used, :decimal)
     field(:index, :integer)
+    field(:internal_transactions_indexed_at, Timex.Ecto.DateTime)
     field(:input, Data)
     field(:nonce, :integer)
     field(:public_key, Data)

@@ -1,157 +1,28 @@
 defmodule Explorer.Indexer.AddressBalanceFetcher do
   @moduledoc """
-  Fetches and indexes `t:Explorer.Chain.Address.t/0` balances.
+  Fetches `t:Explorer.Chain.Address.t/0` `fetched_balance`.
   """

-  use GenServer
-
-  require Logger
-
-  alias EthereumJSONRPC
-  alias Explorer.Chain
-  alias Explorer.Chain.{Address, Hash}
+  alias Explorer.{BufferedTask, Chain}
+  alias Explorer.Chain.{Hash, Address}

-  @fetch_interval :timer.seconds(3)
-  @max_batch_size 100
-  @max_concurrency 2
+  @behaviour BufferedTask

   def async_fetch_balances(address_hashes) do
-    GenServer.cast(__MODULE__, {:buffer_addresses, address_hashes})
-  end
-
-  def start_link(opts) do
-    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
-  end
-
-  def init(opts) do
-    opts = Keyword.merge(Application.fetch_env!(:explorer, :indexer), opts)
-    send(self(), :fetch_unfetched_addresses)
-
-    state = %{
-      debug_logs: Keyword.get(opts, :debug_logs, false),
-      flush_timer: nil,
-      fetch_interval: Keyword.get(opts, :fetch_interval, @fetch_interval),
-      max_batch_size: Keyword.get(opts, :max_batch_size, @max_batch_size),
-      buffer: :queue.new(),
-      tasks: %{}
-    }
-
-    {:ok, state}
-  end
-
-  def handle_info(:fetch_unfetched_addresses, state) do
-    {:noreply, stream_unfetched_addresses(state)}
-  end
-
-  def handle_info(:flush, state) do
-    {:noreply, state |> fetch_next_batch([]) |> schedule_next_buffer_flush()}
-  end
-
-  def handle_info({:async_fetch, hashes}, state) do
-    {:noreply, fetch_next_batch(state, hashes)}
-  end
-
-  def handle_info({ref, {:fetched_balances, results}}, state) do
-    :ok = Chain.update_balances(results)
-    {:noreply, drop_task(state, ref)}
-  end
-
-  def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
-    {:noreply, state}
-  end
-
-  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
-    batch = Map.fetch!(state.tasks, ref)
-
-    new_state =
-      state
-      |> drop_task(ref)
-      |> buffer_addresses(batch)
-
-    {:noreply, new_state}
-  end
-
-  def handle_cast({:buffer_addresses, address_hashes}, state) do
     string_hashes = for hash <- address_hashes, do: Hash.to_string(hash)
-    {:noreply, buffer_addresses(state, string_hashes)}
+    BufferedTask.buffer(__MODULE__, string_hashes)
   end

-  defp drop_task(state, ref) do
-    schedule_async_fetch([])
-    %{state | tasks: Map.delete(state.tasks, ref)}
-  end
-
-  defp buffer_addresses(state, string_hashes) do
-    %{state | buffer: :queue.join(state.buffer, :queue.from_list(string_hashes))}
-  end
-
-  defp stream_unfetched_addresses(state) do
-    state.buffer
-    |> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, batch ->
-      batch = :queue.in(Hash.to_string(hash), batch)
-
-      if :queue.len(batch) >= state.max_batch_size do
-        schedule_async_fetch(:queue.to_list(batch))
-        :queue.new()
-      else
-        batch
-      end
+  def init(acc, reducer) do
+    Chain.stream_unfetched_addresses(acc, fn %Address{hash: hash}, acc ->
+      reducer.(Hash.to_string(hash), acc)
     end)
-    |> fetch_remaining()
-
-    schedule_next_buffer_flush(state)
   end

-  defp fetch_remaining({:ok, batch}) do
-    if :queue.len(batch) > 0 do
-      schedule_async_fetch(:queue.to_list(batch))
-    end
+  def run(string_hashes) do
+    {:ok, results} = EthereumJSONRPC.fetch_balances_by_hash(string_hashes)
+    :ok = Chain.update_balances(results)
     :ok
   end
-
-  defp do_fetch_addresses(address_hashes) do
-    EthereumJSONRPC.fetch_balances_by_hash(address_hashes)
-  end
-
-  defp take_batch(queue) do
-    {hashes, remaining_queue} =
-      Enum.reduce_while(1..@max_batch_size, {[], queue}, fn _, {hashes, queue_acc} ->
-        case :queue.out(queue_acc) do
-          {{:value, hash}, new_queue} -> {:cont, {[hash | hashes], new_queue}}
-          {:empty, new_queue} -> {:halt, {hashes, new_queue}}
-        end
-      end)
-
-    {Enum.reverse(hashes), remaining_queue}
-  end
-
-  defp schedule_async_fetch(hashes, after_ms \\ 0) do
-    Process.send_after(self(), {:async_fetch, hashes}, after_ms)
-  end
-
-  defp schedule_next_buffer_flush(state) do
-    timer = Process.send_after(self(), :flush, state.fetch_interval)
-    %{state | flush_timer: timer}
-  end
-
-  defp fetch_next_batch(state, hashes) do
-    state = buffer_addresses(state, hashes)
-
-    if Enum.count(state.tasks) < @max_concurrency and :queue.len(state.buffer) > 0 do
-      {batch, new_queue} = take_batch(state.buffer)
-
-      task =
-        Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn ->
-          debug(state, fn -> "fetching #{Enum.count(batch)} balances" end)
-          {:ok, balances} = do_fetch_addresses(batch)
-          {:fetched_balances, balances}
-        end)
-
-      %{state | tasks: Map.put(state.tasks, task.ref, batch), buffer: new_queue}
-    else
-      buffer_addresses(state, hashes)
-    end
-  end
-
-  defp debug(%{debug_logs: true}, func), do: Logger.debug(func)
-  defp debug(%{debug_logs: false}, _func), do: :noop
 end

@@ -21,7 +21,7 @@ defmodule Explorer.Indexer.BlockFetcher
   @debug_logs false

-  @blocks_batch_size 10
+  @blocks_batch_size 100
   @blocks_concurrency 10
   @internal_transactions_batch_size 50
@@ -202,9 +202,9 @@ defmodule Explorer.Indexer.BlockFetcher
   end

   defp insert(%{} = state, seq, range, params) do
-    with {:ok, %{addresses: address_hashes}} = ok <- Chain.import_blocks(params) do
-      :ok = AddressBalanceFetcher.async_fetch_balances(address_hashes)
-      ok
+    with {:ok, results} <- Chain.import_blocks(params) do
+      post_block_insert_triggers(results)
+      {:ok, results}
     else
       {:error, step, reason} = error ->
         debug(state, fn ->
@@ -217,6 +217,10 @@ defmodule Explorer.Indexer.BlockFetcher
     end
   end

+  defp post_block_insert_triggers(%{transactions: _transactions, addresses: address_hashes}) do
+    AddressBalanceFetcher.async_fetch_balances(address_hashes)
+  end
+
   defp missing_block_numbers(%{blocks_batch_size: blocks_batch_size}) do
     {count, missing_ranges} = Chain.missing_block_numbers()

@@ -0,0 +1,37 @@
defmodule Explorer.Indexer.InternalTransactionFetcher do
  @moduledoc """
  Fetches and indexes `t:Explorer.Chain.InternalTransaction.t/0`.
  """

  alias Explorer.{BufferedTask, Chain}
  alias Explorer.Chain.{Hash, Transaction}

  @behaviour BufferedTask

  def async_fetch(transactions) do
    string_hashes =
      Enum.map(transactions, fn %Transaction{hash: hash} ->
        Hash.to_string(hash)
      end)

    BufferedTask.buffer(__MODULE__, string_hashes)
  end

  # On boot, stream transactions whose internal transactions were never fetched
  # (internal_transactions_indexed_at is NULL) back into the buffer.
  def init(acc, reducer) do
    Chain.stream_transactions_with_unfetched_internal_transactions(acc, fn %Transaction{hash: hash}, acc ->
      reducer.(Hash.to_string(hash), acc)
    end)
  end

  def run(transaction_hashes) do
    case EthereumJSONRPC.fetch_internal_transactions(transaction_hashes) do
      {:ok, internal_params} ->
        {:ok, _} = Chain.import_internal_transactions(internal_params)
        :ok

      {:error, reason} ->
        {:retry, reason}
    end
  end
end
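
Note that this commit defines the fetcher without adding it to the supervision tree. A hypothetical child spec, mirroring the AddressBalanceFetcher entry in Explorer.Indexer.Supervisor below (the batch size here is illustrative), would be:

# Hypothetical wiring, not part of this commit:
{BufferedTask,
 {InternalTransactionFetcher, max_batch_size: 50, max_concurrency: 2, name: InternalTransactionFetcher}}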

@@ -5,6 +5,7 @@ defmodule Explorer.Indexer.Supervisor
   use Supervisor

+  alias Explorer.BufferedTask
   alias Explorer.Indexer.{AddressBalanceFetcher, BlockFetcher}

   def start_link(opts) do
@@ -15,7 +16,7 @@ defmodule Explorer.Indexer.Supervisor
   def init(_opts) do
     children = [
       {Task.Supervisor, name: Explorer.Indexer.TaskSupervisor},
-      {AddressBalanceFetcher, []},
+      {BufferedTask, {AddressBalanceFetcher, max_batch_size: 100, max_concurrency: 2, name: AddressBalanceFetcher}},
       {BlockFetcher, []}
     ]

@@ -18,6 +18,10 @@ defmodule Explorer.Repo.Migrations.CreateTransactions
       add(:index, :integer, null: true)
       add(:input, :bytea, null: false)

+      # `null` when `internal_transactions` has never been fetched
+      add(:internal_transactions_indexed_at, :utc_datetime, null: true)
+
       add(:nonce, :integer, null: false)
       add(:public_key, :bytea, null: false)
       add(:r, :numeric, precision: 100, null: false)

@@ -0,0 +1,98 @@
defmodule Explorer.BufferedTaskTest do
  use ExUnit.Case, async: true

  alias Explorer.BufferedTask

  @max_batch_size 2

  defp start_buffer(callback_module) do
    start_supervised(
      {BufferedTask, {callback_module, flush_interval: 50, max_batch_size: @max_batch_size, max_concurrency: 2}}
    )
  end

  defmodule CounterTask do
    @behaviour BufferedTask

    def initial_collection, do: for(i <- 1..11, do: "#{i}")

    def init(acc, reducer) do
      {:ok, Enum.reduce(initial_collection(), acc, fn item, acc -> reducer.(item, acc) end)}
    end

    def run(batch) do
      send(__MODULE__, {:run, batch})
      :ok
    end
  end

  defmodule FunTask do
    @behaviour BufferedTask

    def init(acc, _reducer) do
      {:ok, acc}
    end

    def run([agent, func]) when is_function(func) do
      count = Agent.get_and_update(agent, &{&1, &1 + 1})
      send(__MODULE__, {:run, count})
      func.(count)
    end

    def run(batch) do
      send(__MODULE__, {:run, batch})
      :ok
    end
  end

  test "init allows buffer to be loaded up with initial entries" do
    Process.register(self(), CounterTask)
    {:ok, buffer} = start_buffer(CounterTask)

    CounterTask.initial_collection()
    |> Enum.chunk_every(@max_batch_size)
    |> Enum.each(fn batch ->
      assert_receive {:run, ^batch}
    end)

    refute_receive _

    BufferedTask.buffer(buffer, ~w(12 13 14 15 16))
    assert_receive {:run, ~w(12 13)}
    assert_receive {:run, ~w(14 15)}
    assert_receive {:run, ~w(16)}
    refute_receive _
  end

  test "init with zero entries schedules future buffer flushes" do
    Process.register(self(), FunTask)
    {:ok, buffer} = start_buffer(FunTask)
    refute_receive _

    BufferedTask.buffer(buffer, ~w(some more entries))

    assert_receive {:run, ~w(some more)}
    assert_receive {:run, ~w(entries)}
    refute_receive _
  end

  test "run/1 allows tasks to be programmatically retried" do
    Process.register(self(), FunTask)
    {:ok, buffer} = start_buffer(FunTask)

    {:ok, count} = Agent.start_link(fn -> 1 end)

    BufferedTask.buffer(buffer, [
      count,
      fn
        1 -> {:retry, :because_reasons}
        2 -> {:retry, :because_reasons}
        3 -> :ok
      end
    ])

    assert_receive {:run, 1}
    assert_receive {:run, 2}
    assert_receive {:run, 3}
    refute_receive _
  end
end

@@ -4,7 +4,7 @@ defmodule Explorer.Indexer.AddressBalanceFetcherTest
   use Explorer.DataCase, async: false

   alias Explorer.Chain.Address
-  alias Explorer.Indexer.AddressBalanceFetcher
+  alias Explorer.Indexer.{AddressBalanceFetcher, AddressBalanceFetcherCase}

   @hash %Explorer.Chain.Hash{
     byte_count: 20,
@@ -24,7 +24,7 @@ defmodule Explorer.Indexer.AddressBalanceFetcherTest
     assert unfetched_address.fetched_balance == nil
     assert unfetched_address.balance_fetched_at == nil

-    start_address_fetcher()
+    AddressBalanceFetcherCase.start_supervised!()

     fetched_address =
       wait(fn ->
@@ -37,7 +37,7 @@ defmodule Explorer.Indexer.AddressBalanceFetcherTest
   test "fetches unfetched addresses when less than max batch size" do
     insert(:address, hash: @hash)

-    start_address_fetcher(max_batch_size: 2)
+    AddressBalanceFetcherCase.start_supervised!(max_batch_size: 2)

     fetched_address =
       wait(fn ->
@@ -50,7 +50,7 @@ defmodule Explorer.Indexer.AddressBalanceFetcherTest
   describe "async_fetch_balances/1" do
     test "fetches balances for address_hashes" do
-      start_address_fetcher()
+      AddressBalanceFetcherCase.start_supervised!()

       assert :ok = AddressBalanceFetcher.async_fetch_balances([@hash])
@@ -63,16 +63,6 @@ defmodule Explorer.Indexer.AddressBalanceFetcherTest
     end
   end

-  defp start_address_fetcher(options \\ []) when is_list(options) do
-    start_supervised!(
-      {AddressBalanceFetcher,
-       Keyword.merge(
-         [debug_logs: false, fetch_interval: 1, max_batch_size: 1, max_concurrency: 1],
-         options
-       )}
-    )
-  end
-
   defp wait(producer) do
     producer.()
   rescue

@@ -5,7 +5,7 @@ defmodule Explorer.Indexer.BlockFetcherTest
   import ExUnit.CaptureLog

   alias Explorer.Chain.{Address, Block, InternalTransaction, Log, Transaction}
-  alias Explorer.Indexer.{BlockFetcher, Sequence}
+  alias Explorer.Indexer.{AddressBalanceFetcherCase, BlockFetcher, Sequence}

   @tag capture_log: true
@@ -32,6 +32,7 @@ defmodule Explorer.Indexer.BlockFetcherTest
       assert Repo.aggregate(Block, :count, :hash) == 0

       start_supervised!({Task.Supervisor, name: Explorer.Indexer.TaskSupervisor})
+      AddressBalanceFetcherCase.start_supervised!()
       start_supervised!(BlockFetcher)

       wait(fn ->
@@ -85,6 +86,7 @@ defmodule Explorer.Indexer.BlockFetcherTest
     setup do
       start_supervised!({Task.Supervisor, name: Explorer.Indexer.TaskSupervisor})
+      AddressBalanceFetcherCase.start_supervised!()

       {:ok, state} = BlockFetcher.init(debug_logs: false)

@@ -0,0 +1,15 @@
defmodule Explorer.Indexer.AddressBalanceFetcherCase do
  alias Explorer.BufferedTask
  alias Explorer.Indexer.AddressBalanceFetcher

  def start_supervised!(options \\ []) when is_list(options) do
    ExUnit.Callbacks.start_supervised!(
      {BufferedTask,
       {AddressBalanceFetcher,
        Keyword.merge(
          [debug_logs: false, fetch_interval: 1, max_batch_size: 1, max_concurrency: 1, name: AddressBalanceFetcher],
          options
        )}}
    )
  end
end

@@ -1,7 +1,7 @@
 {
   "coverage_options": {
     "treat_no_relevant_lines_as_covered": true,
-    "minimum_coverage": 92.6
+    "minimum_coverage": 91.6
   },
   "terminal_options": {
     "file_column_width": 120
