WIP: test rolling window and request coordinator

pull/961/head
zachdaniel 6 years ago committed by Luke Imhoff
parent bd31796d40
commit 75c88fac85
  1. 8
      apps/ethereum_jsonrpc/config/test.exs
  2. 6
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  3. 2
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex
  4. 2
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex
  5. 52
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex
  6. 65
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc/request_coordinator_test.exs
  7. 102
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc/rolling_window_test.exs

@ -3,3 +3,11 @@ use Mix.Config
config :logger, :ethereum_jsonrpc, config :logger, :ethereum_jsonrpc,
level: :warn, level: :warn,
path: Path.absname("logs/test/ethereum_jsonrpc.log") path: Path.absname("logs/test/ethereum_jsonrpc.log")
config :ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator,
rolling_window_opts: [
window_count: 3,
window_length: :timer.seconds(5),
table: EthereumJSONRPC.RequestCoordinator.TimeoutCounter
],
wait_per_timeout: 1

@ -17,6 +17,7 @@ defmodule EthereumJSONRPC do
""" """
alias Explorer.Chain.Block alias Explorer.Chain.Block
alias EthereumJSONRPC.{ alias EthereumJSONRPC.{
Blocks, Blocks,
Receipts, Receipts,
@ -58,7 +59,10 @@ defmodule EthereumJSONRPC do
""" """
@type json_rpc_named_arguments :: [ @type json_rpc_named_arguments :: [
{:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()} | {:throttle_timeout, non_neg_integer()} {:transport, Transport.t()}
| {:transport_options, Transport.options()}
| {:variant, Variant.t()}
| {:throttle_timeout, non_neg_integer()}
] ]
@typedoc """ @typedoc """

@ -5,7 +5,7 @@ defmodule EthereumJSONRPC.Application do
use Application use Application
alias EthereumJSONRPC.{RollingWindow, TimeoutCounter, RequestCoordinator} alias EthereumJSONRPC.{RollingWindow, RequestCoordinator}
@impl Application @impl Application
def start(_type, _args) do def start(_type, _args) do

@ -73,7 +73,7 @@ defmodule EthereumJSONRPC.RequestCoordinator do
|> Application.get_env(__MODULE__) |> Application.get_env(__MODULE__)
|> Keyword.fetch!(:wait_per_timeout) |> Keyword.fetch!(:wait_per_timeout)
wait_coefficient * @wait_per_timeout wait_coefficient * wait_per_timeout
end end
defp increment_recent_timeouts do defp increment_recent_timeouts do

@ -32,12 +32,15 @@ defmodule EthereumJSONRPC.RollingWindow do
table = :ets.new(table_name, [:named_table, :set, :public, read_concurrency: true, write_concurrency: true]) table = :ets.new(table_name, [:named_table, :set, :public, read_concurrency: true, write_concurrency: true])
# TODO: Calculate the match spec for the given window count here, and store it in state replace_match_spec = match_spec(window_count)
delete_match_spec = delete_match_spec(window_count)
state = %{ state = %{
table: table, table: table,
window_length: window_length, window_length: window_length,
window_count: window_count window_count: window_count,
replace_match_spec: replace_match_spec,
delete_match_spec: delete_match_spec
} }
schedule_sweep(window_length) schedule_sweep(window_length)
@ -45,25 +48,48 @@ defmodule EthereumJSONRPC.RollingWindow do
{:ok, state} {:ok, state}
end end
def handle_info(:sweep, %{window_count: window_count, table: table, window_length: window_length} = state) do def handle_info(
Logger.debug(fn -> "Sweeping windows" end) :sweep,
# TODO consider broadcasting to indexers than some threshold has been met with result of updating the counter %{
table: table,
window_length: window_length,
delete_match_spec: delete_match_spec,
replace_match_spec: replace_match_spec
} = state
) do
sweep(table, delete_match_spec, replace_match_spec)
# Delete any rows wheree all windows empty schedule_sweep(window_length)
delete_match_spec = delete_match_spec(window_count)
:ets.match_delete(table, delete_match_spec) {:noreply, state}
end
match_spec = match_spec(window_count) # This handle_call e
def handle_call(
:sweep,
_from,
%{
table: table,
delete_match_spec: delete_match_spec,
replace_match_spec: replace_match_spec
} = state
) do
sweep(table, delete_match_spec, replace_match_spec)
:ets.select_replace(table, match_spec) {:reply, :ok, state}
end
schedule_sweep(window_length) # Public for testing
defp sweep(table, delete_match_spec, replace_match_spec) do
Logger.debug(fn -> "Sweeping windows" end)
{:noreply, state} # Delete any rows wheree all windows empty
:ets.match_delete(table, delete_match_spec)
:ets.select_replace(table, replace_match_spec)
end end
defp match_spec(window_count) do def match_spec(window_count) do
# This match spec represents this function: # This match spec represents this function:
# #
# :ets.fun2ms(fn # :ets.fun2ms(fn

@ -0,0 +1,65 @@
defmodule EthereumJSONRPC.RequestCoordinatorTest do
use ExUnit.Case
use EthereumJSONRPC.Case
alias EthereumJSONRPC.RollingWindow
alias EthereumJSONRPC.RequestCoordinator
import Mox
setup :set_mox_global
setup :verify_on_exit!
defp sleep_time(timeouts) do
wait_per_timeout =
:ethereum_jsonrpc
|> Application.get_env(RequestCoordinator)
|> Keyword.fetch!(:wait_per_timeout)
timeouts * wait_per_timeout
end
setup do
table = Application.get_env(:ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator)[:rolling_window_opts][:table]
:ets.delete_all_objects(table)
%{table: table}
end
test "rolling window increments on timeout", %{table: table} do
expect(EthereumJSONRPC.Mox, :json_rpc, fn _, _ -> {:error, :timeout} end)
RequestCoordinator.perform(%{}, EthereumJSONRPC.Mox, [], :timer.minutes(60))
assert RollingWindow.count(table, :timeout) == 1
end
test "waits the configured amount of time per failure", %{table: table} do
RollingWindow.inc(table, :timeout)
RollingWindow.inc(table, :timeout)
RollingWindow.inc(table, :timeout)
RollingWindow.inc(table, :timeout)
RollingWindow.inc(table, :timeout)
RollingWindow.inc(table, :timeout)
test_process = self()
expect(EthereumJSONRPC.Mox, :json_rpc, fn _, _ ->
send(test_process, :called_json_rpc)
end)
# Calculate expected sleep time as if there were one less failure, allowing
# a margin of error between the refute_receive, assert_receive, and actual
# call.
wait_time = sleep_time(5)
Task.async(fn ->
RequestCoordinator.perform(%{}, EthereumJSONRPC.Mox, [], :timer.minutes(60))
end)
refute_receive(:called_json_rpc, wait_time)
assert_receive(:called_json_rpc, wait_time)
end
end

@ -0,0 +1,102 @@
defmodule EthereumJSONRPC.RollingWindowTest do
use ExUnit.Case, async: true
use EthereumJSONRPC.Case
alias EthereumJSONRPC.RollingWindow
@table :table
setup do
# We set `window_length` to a large time frame so that we can sweep manually to simulate
# time passing
RollingWindow.start_link([table: @table, window_length: :timer.minutes(120), window_count: 3], name: RollingWindow)
:ok
end
defp sweep do
GenServer.call(RollingWindow, :sweep)
end
test "when no increments have happened, inspect returns an empty list" do
assert RollingWindow.inspect(@table, :foobar) == []
end
test "when no increments hafve happened, count returns 0" do
assert RollingWindow.count(@table, :foobar) == 0
end
test "when an increment has happened, inspect returns the count for that window" do
RollingWindow.inc(@table, :foobar)
assert RollingWindow.inspect(@table, :foobar) == [1]
end
test "when an increment has happened, count returns the count for that window" do
RollingWindow.inc(@table, :foobar)
assert RollingWindow.count(@table, :foobar) == 1
end
test "when an increment has happened in multiple windows, inspect returns the count for both windows" do
RollingWindow.inc(@table, :foobar)
sweep()
RollingWindow.inc(@table, :foobar)
assert RollingWindow.inspect(@table, :foobar) == [1, 1]
end
test "when an increment has happened in multiple windows, count returns the sum of both windows" do
RollingWindow.inc(@table, :foobar)
sweep()
RollingWindow.inc(@table, :foobar)
assert RollingWindow.count(@table, :foobar) == 2
end
test "when an increment has happened in multiple windows, with an empty window in between, inspect shows that empty window" do
RollingWindow.inc(@table, :foobar)
sweep()
sweep()
RollingWindow.inc(@table, :foobar)
assert RollingWindow.inspect(@table, :foobar) == [1, 0, 1]
end
test "when an increment has happened in multiple windows, with an empty window in between, count still sums all windows" do
RollingWindow.inc(@table, :foobar)
sweep()
sweep()
RollingWindow.inc(@table, :foobar)
assert RollingWindow.count(@table, :foobar) == 2
end
test "when an increment has happened, but has been swept <window_count> times, it no longer appears in inspect" do
RollingWindow.inc(@table, :foobar)
sweep()
sweep()
RollingWindow.inc(@table, :foobar)
sweep()
RollingWindow.inc(@table, :foobar)
assert RollingWindow.inspect(@table, :foobar) == [1, 1, 0]
end
test "when an increment has happened, but has been swept <window_count> times, it no longer is included in count" do
RollingWindow.inc(@table, :foobar)
sweep()
sweep()
RollingWindow.inc(@table, :foobar)
sweep()
RollingWindow.inc(@table, :foobar)
assert RollingWindow.count(@table, :foobar) == 2
end
test "sweeping schedules another sweep" do
{:ok, state} = RollingWindow.init(table: :anything, window_length: 1, window_count: 1)
RollingWindow.handle_info(:sweep, state)
assert_receive(:sweep)
end
end
Loading…
Cancel
Save