From e5e695331f630f757babaf5a3150855bc66b1ef9 Mon Sep 17 00:00:00 2001 From: jimmay5469 Date: Tue, 24 Jul 2018 16:32:34 -0400 Subject: [PATCH] Subscribe to only realtime-indexer events for channel broadcasts --- apps/explorer/lib/explorer/chain.ex | 13 ++++++++++++- apps/explorer/test/explorer/chain_test.exs | 9 +++++++++ .../lib/explorer_web/application.ex | 5 +++-- apps/indexer/lib/indexer/block_fetcher.ex | 19 +++++++++++-------- .../test/indexer/block_fetcher_test.exs | 8 ++++---- 5 files changed, 39 insertions(+), 15 deletions(-) diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index ba7c1a8424..de945b5446 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -71,6 +71,7 @@ defmodule Explorer.Chain do @typep addresses_option :: {:addresses, [params_option | timeout_option | with_option]} @typep balances_option :: {:balances, [params_option | timeout_option]} @typep blocks_option :: {:blocks, [params_option | timeout_option]} + @typep broadcast_option :: {:broadcast, Boolean} @typep internal_transactions_option :: {:internal_transactions, [params_option | timeout_option]} @typep logs_option :: {:logs, [params_option | timeout_option]} @typep receipts_option :: {:receipts, [params_option | timeout_option]} @@ -612,6 +613,7 @@ defmodule Explorer.Chain do | `:addresses` | `[Explorer.Chain.Address.t()]` | List of `t:Explorer.Chain.Address.t/0`s | | `:balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of `t:Explorer.Chain.Address.t/0`s | | `:blocks` | `[Explorer.Chain.Block.t()]` | List of `t:Explorer.Chain.Block.t/0`s | + | `:broacast` | `Boolean` | Boolean of whether to broadcast | | `:internal_transactions` | `[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash` | | `:logs` | `[Explorer.Chain.Log.t()]` | List of `t:Explorer.Chain.Log.t/0`s | | `:transactions` | `[Explorer.Chain.Hash.t()]` | List of `t:Explorer.Chain.Transaction.t/0` `hash` | @@ -657,6 +659,7 @@ defmodule Explorer.Chain do * `:blocks` * `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`. * `:timeout` - the timeout for inserting all blocks. Defaults to `#{@insert_blocks_timeout}` milliseconds. + * `:broacast` - Boolean flag indicating whether or not to broadcast the event. * `:internal_transactions` * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. * `:timeout` - the timeout for inserting all internal transactions. Defaults to @@ -682,6 +685,7 @@ defmodule Explorer.Chain do addresses_option | balances_option | blocks_option + | broadcast_option | internal_transactions_option | logs_option | receipts_option @@ -695,6 +699,7 @@ defmodule Explorer.Chain do %{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()} ], optional(:blocks) => [Block.t()], + optional(:broadcast) => Boolean, optional(:internal_transactions) => [ %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} ], @@ -706,12 +711,18 @@ defmodule Explorer.Chain do | {:error, step :: Ecto.Multi.name(), failed_value :: any(), changes_so_far :: %{optional(Ecto.Multi.name()) => any()}} def import_blocks(options) when is_list(options) do + broadcast = + case Keyword.fetch(options, :broadcast) do + {:ok, broadcast} -> broadcast + :error -> false + end + changes_list_arguments_list = import_options_to_changes_list_arguments_list(options) with {:ok, ecto_schema_module_to_changes_list} <- changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list), {:ok, data} <- insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do - broadcast_events(data) + if broadcast, do: broadcast_events(data) {:ok, data} end end diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index ddae5cde9f..546034970b 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -1459,6 +1459,7 @@ defmodule Explorer.ChainTest do } ] ], + broadcast: true, internal_transactions: [ params: [ %{ @@ -1543,5 +1544,13 @@ defmodule Explorer.ChainTest do Chain.import_blocks(@import_data) assert_received {:chain_event, :logs, [%Log{}]} end + + test "does not broadcast if broadcast option is false" do + non_broadcast_data = Keyword.merge(@import_data, broadcast: false) + + Chain.subscribe_to_events(:logs) + Chain.import_blocks(non_broadcast_data) + refute_received {:chain_event, :logs, [%Log{}]} + end end end diff --git a/apps/explorer_web/lib/explorer_web/application.ex b/apps/explorer_web/lib/explorer_web/application.ex index 0b934041d9..b253b333b2 100644 --- a/apps/explorer_web/lib/explorer_web/application.ex +++ b/apps/explorer_web/lib/explorer_web/application.ex @@ -5,7 +5,7 @@ defmodule ExplorerWeb.Application do use Application - alias ExplorerWeb.Endpoint + alias ExplorerWeb.{Endpoint, EventHandler} def start(_type, _args) do import Supervisor.Spec @@ -13,7 +13,8 @@ defmodule ExplorerWeb.Application do # Define workers and child supervisors to be supervised children = [ # Start the endpoint when the application starts - supervisor(Endpoint, []) + supervisor(Endpoint, []), + {EventHandler, name: EventHandler} # Start your own worker by calling: PoaexpWeb.Worker.start_link(arg1, arg2, arg3) # worker(PoaexpWeb.Worker, [arg1, arg2, arg3]), ] diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 86b54385ad..45ba0eb544 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -14,7 +14,7 @@ defmodule Indexer.BlockFetcher do alias Indexer.{BalanceFetcher, AddressExtraction, BoundInterval, InternalTransactionFetcher, Sequence} # dialyzer thinks that Logger.debug functions always have no_local_return - @dialyzer {:nowarn_function, import_range: 3} + @dialyzer {:nowarn_function, import_range: 4} # These are all the *default* values for options. # DO NOT use them directly in the code. Get options from `state`. @@ -265,20 +265,22 @@ defmodule Indexer.BlockFetcher do {:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size) Sequence.cap(seq) - stream_import(state, seq, max_concurrency: state.blocks_concurrency) + stream_import(state, seq, :catchup_index, max_concurrency: state.blocks_concurrency) end missing_block_count end end - defp insert(seq, range, options) when is_list(options) do + defp insert(seq, range, indexer_mode, options) when is_list(options) do {address_hash_to_fetched_balance_block_number, import_options} = pop_address_hash_to_fetched_balance_block_number(options) transaction_hash_to_block_number = get_transaction_hash_to_block_number(import_options) - with {:ok, results} <- Chain.import_blocks(import_options) do + options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index) + + with {:ok, results} <- Chain.import_blocks(options_with_broadcast) do async_import_remaining_block_data( results, address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number, @@ -349,14 +351,14 @@ defmodule Indexer.BlockFetcher do defp realtime_task(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) - stream_import(state, seq, max_concurrency: 1) + stream_import(state, seq, :realtime_index, max_concurrency: 1) end - defp stream_import(%__MODULE__{} = state, seq, task_opts) do + defp stream_import(%__MODULE__{} = state, seq, indexer_mode, task_opts) do seq |> Sequence.build_stream() |> Task.async_stream( - &import_range(&1, state, seq), + &import_range(&1, state, seq, indexer_mode), Keyword.merge(task_opts, timeout: :infinity) ) |> Stream.run() @@ -365,7 +367,7 @@ defmodule Indexer.BlockFetcher do # Run at state.blocks_concurrency max_concurrency when called by `stream_import/3` # Only public for testing @doc false - def import_range(range, %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq) do + def import_range(range, %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq, indexer_mode) do with {:blocks, {:ok, next, result}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, %{blocks: blocks, transactions: transactions_without_receipts} = result, @@ -384,6 +386,7 @@ defmodule Indexer.BlockFetcher do insert( seq, range, + indexer_mode, addresses: [params: addresses], blocks: [params: blocks], logs: [params: logs], diff --git a/apps/indexer/test/indexer/block_fetcher_test.exs b/apps/indexer/test/indexer/block_fetcher_test.exs index d54ac986c3..d34464d8f7 100644 --- a/apps/indexer/test/indexer/block_fetcher_test.exs +++ b/apps/indexer/test/indexer/block_fetcher_test.exs @@ -367,7 +367,7 @@ defmodule Indexer.BlockFetcherTest do end end - describe "import_range/3" do + describe "import_range/4" do setup :state setup %{json_rpc_named_arguments: json_rpc_named_arguments} do @@ -532,7 +532,7 @@ defmodule Indexer.BlockFetcherTest do end log_bad_gateway( - fn -> BlockFetcher.import_range(block_number..block_number, state, sequence) end, + fn -> BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) end, fn result -> assert {:ok, %{ @@ -792,7 +792,7 @@ defmodule Indexer.BlockFetcherTest do 154, 143, 4, 28, 171, 95, 190, 255, 254, 174, 75, 182>> } ] - }} = BlockFetcher.import_range(block_number..block_number, state, sequence) + }} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) wait_for_tasks(InternalTransactionFetcher) wait_for_tasks(BalanceFetcher) @@ -879,7 +879,7 @@ defmodule Indexer.BlockFetcherTest do 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> } ] - }} = BlockFetcher.import_range(block_number..block_number, state, sequence) + }} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) wait_for_tasks(InternalTransactionFetcher) wait_for_tasks(BalanceFetcher)