pull/162/head
Luke Imhoff 7 years ago
parent 3500daaf14
commit d7a4706d03
  1. 14
      apps/explorer/lib/explorer/indexer/address_fetcher.ex
  2. 10
      apps/explorer/lib/explorer/indexer/block_fetcher.ex
  3. 4
      apps/explorer/lib/explorer/indexer/supervisor.ex
  4. 2
      apps/explorer/lib/explorer/jsonrpc.ex
  5. 2
      apps/explorer/mix.exs
  6. 1
      mix.exs

@ -46,6 +46,7 @@ defmodule Explorer.Indexer.AddressFetcher do
def handle_info(:flush, state) do def handle_info(:flush, state) do
{:noreply, state |> fetch_next_batch([]) |> schedule_next_buffer_flush()} {:noreply, state |> fetch_next_batch([]) |> schedule_next_buffer_flush()}
end end
def handle_info({:async_fetch, hashes}, state) do def handle_info({:async_fetch, hashes}, state) do
{:noreply, fetch_next_batch(state, hashes)} {:noreply, fetch_next_batch(state, hashes)}
end end
@ -105,6 +106,7 @@ defmodule Explorer.Indexer.AddressFetcher do
if :queue.len(batch) > 0 do if :queue.len(batch) > 0 do
schedule_async_fetch(:queue.to_list(batch)) schedule_async_fetch(:queue.to_list(batch))
end end
:ok :ok
end end
@ -138,11 +140,13 @@ defmodule Explorer.Indexer.AddressFetcher do
if Enum.count(state.tasks) < @max_concurrency and :queue.len(state.buffer) > 0 do if Enum.count(state.tasks) < @max_concurrency and :queue.len(state.buffer) > 0 do
{batch, new_queue} = take_batch(state.buffer) {batch, new_queue} = take_batch(state.buffer)
task = Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn ->
debug(state, fn -> "fetching #{Enum.count(batch)} balances" end) task =
{:ok, balances} = do_fetch_addresses(batch) Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn ->
{:fetched_balances, balances} debug(state, fn -> "fetching #{Enum.count(batch)} balances" end)
end) {:ok, balances} = do_fetch_addresses(batch)
{:fetched_balances, balances}
end)
%{state | tasks: Map.put(state.tasks, task.ref, batch), buffer: new_queue} %{state | tasks: Map.put(state.tasks, task.ref, batch), buffer: new_queue}
else else

@ -11,7 +11,7 @@ defmodule Explorer.Indexer.BlockFetcher do
alias Explorer.Indexer.{ alias Explorer.Indexer.{
Sequence, Sequence,
BlockImporter, BlockImporter
} }
alias Explorer.JSONRPC.Transactions alias Explorer.JSONRPC.Transactions
@ -27,7 +27,6 @@ defmodule Explorer.Indexer.BlockFetcher do
@receipts_batch_size 250 @receipts_batch_size 250
@receipts_concurrency 20 @receipts_concurrency 20
@doc """ @doc """
Starts the server. Starts the server.
@ -54,7 +53,7 @@ defmodule Explorer.Indexer.BlockFetcher do
state = %{ state = %{
genesis_task: nil, genesis_task: nil,
debug_logs: Keyword.get(opts, :debug_logs, false), debug_logs: Keyword.get(opts, :debug_logs, false),
realtime_interval: (opts[:block_rate] || @block_rate) * 2, realtime_interval: (opts[:block_rate] || @block_rate) * 2
} }
{:ok, state} {:ok, state}
@ -152,8 +151,7 @@ defmodule Explorer.Indexer.BlockFetcher do
|> Enum.chunk_every(@receipts_batch_size) |> Enum.chunk_every(@receipts_batch_size)
|> Task.async_stream(&JSONRPC.fetch_transaction_receipts(&1), stream_opts) |> Task.async_stream(&JSONRPC.fetch_transaction_receipts(&1), stream_opts)
|> Enum.reduce_while({:ok, %{logs: [], receipts: []}}, fn |> Enum.reduce_while({:ok, %{logs: [], receipts: []}}, fn
{:ok, {:ok, %{logs: logs, receipts: receipts}}}, {:ok, {:ok, %{logs: logs, receipts: receipts}}}, {:ok, %{logs: acc_logs, receipts: acc_receipts}} ->
{:ok, %{logs: acc_logs, receipts: acc_receipts}} ->
{:cont, {:ok, %{logs: acc_logs ++ logs, receipts: acc_receipts ++ receipts}}} {:cont, {:ok, %{logs: acc_logs ++ logs, receipts: acc_receipts ++ receipts}}}
{:ok, {:error, reason}}, {:ok, _acc} -> {:ok, {:error, reason}}, {:ok, _acc} ->
@ -214,7 +212,6 @@ defmodule Explorer.Indexer.BlockFetcher do
{:ok, receipt_params} <- fetch_transaction_receipts(state, transaction_hashes), {:ok, receipt_params} <- fetch_transaction_receipts(state, transaction_hashes),
%{logs: logs, receipts: receipts} <- receipt_params, %{logs: logs, receipts: receipts} <- receipt_params,
{:ok, internal_transactions} <- fetch_internal_transactions(state, transaction_hashes) do {:ok, internal_transactions} <- fetch_internal_transactions(state, transaction_hashes) do
insert(state, seq, range, %{ insert(state, seq, range, %{
blocks: blocks, blocks: blocks,
internal_transactions: internal_transactions, internal_transactions: internal_transactions,
@ -222,7 +219,6 @@ defmodule Explorer.Indexer.BlockFetcher do
receipts: receipts, receipts: receipts,
transactions: transactions transactions: transactions
}) })
else else
{:error, reason} -> {:error, reason} ->
debug(state, fn -> debug(state, fn ->

@ -8,7 +8,7 @@ defmodule Explorer.Indexer.Supervisor do
alias Explorer.Indexer.{ alias Explorer.Indexer.{
BlockFetcher, BlockFetcher,
BlockImporter, BlockImporter,
AddressFetcher, AddressFetcher
} }
def start_link(opts) do def start_link(opts) do
@ -21,7 +21,7 @@ defmodule Explorer.Indexer.Supervisor do
{Task.Supervisor, name: Explorer.Indexer.TaskSupervisor}, {Task.Supervisor, name: Explorer.Indexer.TaskSupervisor},
{BlockFetcher, []}, {BlockFetcher, []},
{BlockImporter, []}, {BlockImporter, []},
{AddressFetcher, []}, {AddressFetcher, []}
] ]
Supervisor.init(children, strategy: :rest_for_one) Supervisor.init(children, strategy: :rest_for_one)

@ -151,7 +151,7 @@ defmodule Explorer.JSONRPC do
|> json_rpc(config(:url)) |> json_rpc(config(:url))
|> handle_get_block_by_number() |> handle_get_block_by_number()
|> case do |> case do
{:ok, _next, results} -> {:ok, results} {:ok, _next, results} -> {:ok, results}
{:error, reason} -> {:error, reason} {:error, reason} -> {:error, reason}
end end
end end

@ -105,7 +105,9 @@ defmodule Explorer.Mixfile do
test: ["ecto.drop", "ecto.create --quiet", "ecto.migrate", "test"] test: ["ecto.drop", "ecto.create --quiet", "ecto.migrate", "test"]
] ++ env_aliases(env) ] ++ env_aliases(env)
end end
defp env_aliases(:dev), do: [] defp env_aliases(:dev), do: []
defp env_aliases(_env) do defp env_aliases(_env) do
[compile: "compile --warnings-as-errors"] [compile: "compile --warnings-as-errors"]
end end

@ -30,6 +30,7 @@ defmodule ExplorerUmbrella.Mixfile do
defp aliases(:dev) do defp aliases(:dev) do
[] []
end end
defp aliases(_env) do defp aliases(_env) do
[ [
compile: "compile --warnings-as-errors" compile: "compile --warnings-as-errors"

Loading…
Cancel
Save