Subscribe to only realtime-indexer events for channel broadcasts
pull/472/head
Jimmy Lauzau 6 years ago committed by GitHub
commit 00c965254f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      apps/explorer/lib/explorer/chain.ex
  2. 9
      apps/explorer/test/explorer/chain_test.exs
  3. 5
      apps/explorer_web/lib/explorer_web/application.ex
  4. 19
      apps/indexer/lib/indexer/block_fetcher.ex
  5. 8
      apps/indexer/test/indexer/block_fetcher_test.exs

@ -71,6 +71,7 @@ defmodule Explorer.Chain do
@typep addresses_option :: {:addresses, [params_option | timeout_option | with_option]}
@typep balances_option :: {:balances, [params_option | timeout_option]}
@typep blocks_option :: {:blocks, [params_option | timeout_option]}
@typep broadcast_option :: {:broadcast, Boolean}
@typep internal_transactions_option :: {:internal_transactions, [params_option | timeout_option]}
@typep logs_option :: {:logs, [params_option | timeout_option]}
@typep receipts_option :: {:receipts, [params_option | timeout_option]}
@ -612,6 +613,7 @@ defmodule Explorer.Chain do
| `:addresses` | `[Explorer.Chain.Address.t()]` | List of `t:Explorer.Chain.Address.t/0`s |
| `:balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of `t:Explorer.Chain.Address.t/0`s |
| `:blocks` | `[Explorer.Chain.Block.t()]` | List of `t:Explorer.Chain.Block.t/0`s |
| `:broacast` | `Boolean` | Boolean of whether to broadcast |
| `:internal_transactions` | `[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash` |
| `:logs` | `[Explorer.Chain.Log.t()]` | List of `t:Explorer.Chain.Log.t/0`s |
| `:transactions` | `[Explorer.Chain.Hash.t()]` | List of `t:Explorer.Chain.Transaction.t/0` `hash` |
@ -657,6 +659,7 @@ defmodule Explorer.Chain do
* `:blocks`
* `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`.
* `:timeout` - the timeout for inserting all blocks. Defaults to `#{@insert_blocks_timeout}` milliseconds.
* `:broacast` - Boolean flag indicating whether or not to broadcast the event.
* `:internal_transactions`
* `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`.
* `:timeout` - the timeout for inserting all internal transactions. Defaults to
@ -682,6 +685,7 @@ defmodule Explorer.Chain do
addresses_option
| balances_option
| blocks_option
| broadcast_option
| internal_transactions_option
| logs_option
| receipts_option
@ -695,6 +699,7 @@ defmodule Explorer.Chain do
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}
],
optional(:blocks) => [Block.t()],
optional(:broadcast) => Boolean,
optional(:internal_transactions) => [
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
],
@ -706,12 +711,18 @@ defmodule Explorer.Chain do
| {:error, step :: Ecto.Multi.name(), failed_value :: any(),
changes_so_far :: %{optional(Ecto.Multi.name()) => any()}}
def import_blocks(options) when is_list(options) do
broadcast =
case Keyword.fetch(options, :broadcast) do
{:ok, broadcast} -> broadcast
:error -> false
end
changes_list_arguments_list = import_options_to_changes_list_arguments_list(options)
with {:ok, ecto_schema_module_to_changes_list} <-
changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list),
{:ok, data} <- insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do
broadcast_events(data)
if broadcast, do: broadcast_events(data)
{:ok, data}
end
end

@ -1459,6 +1459,7 @@ defmodule Explorer.ChainTest do
}
]
],
broadcast: true,
internal_transactions: [
params: [
%{
@ -1543,5 +1544,13 @@ defmodule Explorer.ChainTest do
Chain.import_blocks(@import_data)
assert_received {:chain_event, :logs, [%Log{}]}
end
test "does not broadcast if broadcast option is false" do
non_broadcast_data = Keyword.merge(@import_data, broadcast: false)
Chain.subscribe_to_events(:logs)
Chain.import_blocks(non_broadcast_data)
refute_received {:chain_event, :logs, [%Log{}]}
end
end
end

@ -5,7 +5,7 @@ defmodule ExplorerWeb.Application do
use Application
alias ExplorerWeb.Endpoint
alias ExplorerWeb.{Endpoint, EventHandler}
def start(_type, _args) do
import Supervisor.Spec
@ -13,7 +13,8 @@ defmodule ExplorerWeb.Application do
# Define workers and child supervisors to be supervised
children = [
# Start the endpoint when the application starts
supervisor(Endpoint, [])
supervisor(Endpoint, []),
{EventHandler, name: EventHandler}
# Start your own worker by calling: PoaexpWeb.Worker.start_link(arg1, arg2, arg3)
# worker(PoaexpWeb.Worker, [arg1, arg2, arg3]),
]

@ -14,7 +14,7 @@ defmodule Indexer.BlockFetcher do
alias Indexer.{BalanceFetcher, AddressExtraction, BoundInterval, InternalTransactionFetcher, Sequence}
# dialyzer thinks that Logger.debug functions always have no_local_return
@dialyzer {:nowarn_function, import_range: 3}
@dialyzer {:nowarn_function, import_range: 4}
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
@ -265,20 +265,22 @@ defmodule Indexer.BlockFetcher do
{:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size)
Sequence.cap(seq)
stream_import(state, seq, max_concurrency: state.blocks_concurrency)
stream_import(state, seq, :catchup_index, max_concurrency: state.blocks_concurrency)
end
missing_block_count
end
end
defp insert(seq, range, options) when is_list(options) do
defp insert(seq, range, indexer_mode, options) when is_list(options) do
{address_hash_to_fetched_balance_block_number, import_options} =
pop_address_hash_to_fetched_balance_block_number(options)
transaction_hash_to_block_number = get_transaction_hash_to_block_number(import_options)
with {:ok, results} <- Chain.import_blocks(import_options) do
options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index)
with {:ok, results} <- Chain.import_blocks(options_with_broadcast) do
async_import_remaining_block_data(
results,
address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number,
@ -349,14 +351,14 @@ defmodule Indexer.BlockFetcher do
defp realtime_task(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
{:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2)
stream_import(state, seq, max_concurrency: 1)
stream_import(state, seq, :realtime_index, max_concurrency: 1)
end
defp stream_import(%__MODULE__{} = state, seq, task_opts) do
defp stream_import(%__MODULE__{} = state, seq, indexer_mode, task_opts) do
seq
|> Sequence.build_stream()
|> Task.async_stream(
&import_range(&1, state, seq),
&import_range(&1, state, seq, indexer_mode),
Keyword.merge(task_opts, timeout: :infinity)
)
|> Stream.run()
@ -365,7 +367,7 @@ defmodule Indexer.BlockFetcher do
# Run at state.blocks_concurrency max_concurrency when called by `stream_import/3`
# Only public for testing
@doc false
def import_range(range, %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq) do
def import_range(range, %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq, indexer_mode) do
with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{blocks: blocks, transactions: transactions_without_receipts} = result,
@ -384,6 +386,7 @@ defmodule Indexer.BlockFetcher do
insert(
seq,
range,
indexer_mode,
addresses: [params: addresses],
blocks: [params: blocks],
logs: [params: logs],

@ -367,7 +367,7 @@ defmodule Indexer.BlockFetcherTest do
end
end
describe "import_range/3" do
describe "import_range/4" do
setup :state
setup %{json_rpc_named_arguments: json_rpc_named_arguments} do
@ -532,7 +532,7 @@ defmodule Indexer.BlockFetcherTest do
end
log_bad_gateway(
fn -> BlockFetcher.import_range(block_number..block_number, state, sequence) end,
fn -> BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) end,
fn result ->
assert {:ok,
%{
@ -792,7 +792,7 @@ defmodule Indexer.BlockFetcherTest do
154, 143, 4, 28, 171, 95, 190, 255, 254, 174, 75, 182>>
}
]
}} = BlockFetcher.import_range(block_number..block_number, state, sequence)
}} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index)
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(BalanceFetcher)
@ -879,7 +879,7 @@ defmodule Indexer.BlockFetcherTest do
57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>>
}
]
}} = BlockFetcher.import_range(block_number..block_number, state, sequence)
}} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index)
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(BalanceFetcher)

Loading…
Cancel
Save