diff --git a/apps/ethereum_jsonrpc/config/test.exs b/apps/ethereum_jsonrpc/config/test.exs index d066e4a38a..1de5f3b63b 100644 --- a/apps/ethereum_jsonrpc/config/test.exs +++ b/apps/ethereum_jsonrpc/config/test.exs @@ -3,3 +3,11 @@ use Mix.Config config :logger, :ethereum_jsonrpc, level: :warn, path: Path.absname("logs/test/ethereum_jsonrpc.log") + +config :ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator, + rolling_window_opts: [ + window_count: 3, + window_length: :timer.seconds(5), + table: EthereumJSONRPC.RequestCoordinator.TimeoutCounter + ], + wait_per_timeout: 1 diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index bf6feada79..a33d60dc6e 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -17,6 +17,7 @@ defmodule EthereumJSONRPC do """ alias Explorer.Chain.Block + alias EthereumJSONRPC.{ Blocks, Receipts, @@ -58,7 +59,10 @@ defmodule EthereumJSONRPC do """ @type json_rpc_named_arguments :: [ - {:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()} | {:throttle_timeout, non_neg_integer()} + {:transport, Transport.t()} + | {:transport_options, Transport.options()} + | {:variant, Variant.t()} + | {:throttle_timeout, non_neg_integer()} ] @typedoc """ diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex index ddcb3539fa..64aeb31c8b 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex @@ -5,7 +5,7 @@ defmodule EthereumJSONRPC.Application do use Application - alias EthereumJSONRPC.{RollingWindow, TimeoutCounter, RequestCoordinator} + alias EthereumJSONRPC.{RollingWindow, RequestCoordinator} @impl Application def start(_type, _args) do diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex index 0a8783118f..cb726a1480 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex @@ -73,7 +73,7 @@ defmodule EthereumJSONRPC.RequestCoordinator do |> Application.get_env(__MODULE__) |> Keyword.fetch!(:wait_per_timeout) - wait_coefficient * @wait_per_timeout + wait_coefficient * wait_per_timeout end defp increment_recent_timeouts do diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex index f878c4ce7e..97b8f3cb60 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex @@ -32,12 +32,15 @@ defmodule EthereumJSONRPC.RollingWindow do table = :ets.new(table_name, [:named_table, :set, :public, read_concurrency: true, write_concurrency: true]) - # TODO: Calculate the match spec for the given window count here, and store it in state + replace_match_spec = match_spec(window_count) + delete_match_spec = delete_match_spec(window_count) state = %{ table: table, window_length: window_length, - window_count: window_count + window_count: window_count, + replace_match_spec: replace_match_spec, + delete_match_spec: delete_match_spec } schedule_sweep(window_length) @@ -45,25 +48,48 @@ defmodule EthereumJSONRPC.RollingWindow do {:ok, state} end - def handle_info(:sweep, %{window_count: window_count, table: table, window_length: window_length} = state) do - Logger.debug(fn -> "Sweeping windows" end) - # TODO consider broadcasting to indexers than some threshold has been met with result of updating the counter + def handle_info( + :sweep, + %{ + table: table, + window_length: window_length, + delete_match_spec: delete_match_spec, + replace_match_spec: replace_match_spec + } = state + ) do + sweep(table, delete_match_spec, replace_match_spec) - # Delete any rows wheree all windows empty - delete_match_spec = delete_match_spec(window_count) + schedule_sweep(window_length) - :ets.match_delete(table, delete_match_spec) + {:noreply, state} + end - match_spec = match_spec(window_count) + # This handle_call e + def handle_call( + :sweep, + _from, + %{ + table: table, + delete_match_spec: delete_match_spec, + replace_match_spec: replace_match_spec + } = state + ) do + sweep(table, delete_match_spec, replace_match_spec) - :ets.select_replace(table, match_spec) + {:reply, :ok, state} + end - schedule_sweep(window_length) + # Public for testing + defp sweep(table, delete_match_spec, replace_match_spec) do + Logger.debug(fn -> "Sweeping windows" end) - {:noreply, state} + # Delete any rows wheree all windows empty + :ets.match_delete(table, delete_match_spec) + + :ets.select_replace(table, replace_match_spec) end - defp match_spec(window_count) do + def match_spec(window_count) do # This match spec represents this function: # # :ets.fun2ms(fn diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/request_coordinator_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/request_coordinator_test.exs new file mode 100644 index 0000000000..df917b02f5 --- /dev/null +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/request_coordinator_test.exs @@ -0,0 +1,65 @@ +defmodule EthereumJSONRPC.RequestCoordinatorTest do + use ExUnit.Case + use EthereumJSONRPC.Case + + alias EthereumJSONRPC.RollingWindow + alias EthereumJSONRPC.RequestCoordinator + + import Mox + + setup :set_mox_global + setup :verify_on_exit! + + defp sleep_time(timeouts) do + wait_per_timeout = + :ethereum_jsonrpc + |> Application.get_env(RequestCoordinator) + |> Keyword.fetch!(:wait_per_timeout) + + timeouts * wait_per_timeout + end + + setup do + table = Application.get_env(:ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator)[:rolling_window_opts][:table] + + :ets.delete_all_objects(table) + + %{table: table} + end + + test "rolling window increments on timeout", %{table: table} do + expect(EthereumJSONRPC.Mox, :json_rpc, fn _, _ -> {:error, :timeout} end) + + RequestCoordinator.perform(%{}, EthereumJSONRPC.Mox, [], :timer.minutes(60)) + + assert RollingWindow.count(table, :timeout) == 1 + end + + test "waits the configured amount of time per failure", %{table: table} do + RollingWindow.inc(table, :timeout) + RollingWindow.inc(table, :timeout) + RollingWindow.inc(table, :timeout) + RollingWindow.inc(table, :timeout) + RollingWindow.inc(table, :timeout) + RollingWindow.inc(table, :timeout) + + test_process = self() + + expect(EthereumJSONRPC.Mox, :json_rpc, fn _, _ -> + send(test_process, :called_json_rpc) + end) + + # Calculate expected sleep time as if there were one less failure, allowing + # a margin of error between the refute_receive, assert_receive, and actual + # call. + wait_time = sleep_time(5) + + Task.async(fn -> + RequestCoordinator.perform(%{}, EthereumJSONRPC.Mox, [], :timer.minutes(60)) + end) + + refute_receive(:called_json_rpc, wait_time) + + assert_receive(:called_json_rpc, wait_time) + end +end diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/rolling_window_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/rolling_window_test.exs new file mode 100644 index 0000000000..4eabe5305f --- /dev/null +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/rolling_window_test.exs @@ -0,0 +1,102 @@ +defmodule EthereumJSONRPC.RollingWindowTest do + use ExUnit.Case, async: true + use EthereumJSONRPC.Case + + alias EthereumJSONRPC.RollingWindow + + @table :table + + setup do + # We set `window_length` to a large time frame so that we can sweep manually to simulate + # time passing + RollingWindow.start_link([table: @table, window_length: :timer.minutes(120), window_count: 3], name: RollingWindow) + + :ok + end + + defp sweep do + GenServer.call(RollingWindow, :sweep) + end + + test "when no increments have happened, inspect returns an empty list" do + assert RollingWindow.inspect(@table, :foobar) == [] + end + + test "when no increments hafve happened, count returns 0" do + assert RollingWindow.count(@table, :foobar) == 0 + end + + test "when an increment has happened, inspect returns the count for that window" do + RollingWindow.inc(@table, :foobar) + + assert RollingWindow.inspect(@table, :foobar) == [1] + end + + test "when an increment has happened, count returns the count for that window" do + RollingWindow.inc(@table, :foobar) + + assert RollingWindow.count(@table, :foobar) == 1 + end + + test "when an increment has happened in multiple windows, inspect returns the count for both windows" do + RollingWindow.inc(@table, :foobar) + sweep() + RollingWindow.inc(@table, :foobar) + + assert RollingWindow.inspect(@table, :foobar) == [1, 1] + end + + test "when an increment has happened in multiple windows, count returns the sum of both windows" do + RollingWindow.inc(@table, :foobar) + sweep() + RollingWindow.inc(@table, :foobar) + + assert RollingWindow.count(@table, :foobar) == 2 + end + + test "when an increment has happened in multiple windows, with an empty window in between, inspect shows that empty window" do + RollingWindow.inc(@table, :foobar) + sweep() + sweep() + RollingWindow.inc(@table, :foobar) + + assert RollingWindow.inspect(@table, :foobar) == [1, 0, 1] + end + + test "when an increment has happened in multiple windows, with an empty window in between, count still sums all windows" do + RollingWindow.inc(@table, :foobar) + sweep() + sweep() + RollingWindow.inc(@table, :foobar) + + assert RollingWindow.count(@table, :foobar) == 2 + end + + test "when an increment has happened, but has been swept times, it no longer appears in inspect" do + RollingWindow.inc(@table, :foobar) + sweep() + sweep() + RollingWindow.inc(@table, :foobar) + sweep() + RollingWindow.inc(@table, :foobar) + + assert RollingWindow.inspect(@table, :foobar) == [1, 1, 0] + end + + test "when an increment has happened, but has been swept times, it no longer is included in count" do + RollingWindow.inc(@table, :foobar) + sweep() + sweep() + RollingWindow.inc(@table, :foobar) + sweep() + RollingWindow.inc(@table, :foobar) + + assert RollingWindow.count(@table, :foobar) == 2 + end + + test "sweeping schedules another sweep" do + {:ok, state} = RollingWindow.init(table: :anything, window_length: 1, window_count: 1) + RollingWindow.handle_info(:sweep, state) + assert_receive(:sweep) + end +end