diff --git a/apps/ethereum_jsonrpc/config/config.exs b/apps/ethereum_jsonrpc/config/config.exs index 39c4928bdc..75008c8fa5 100644 --- a/apps/ethereum_jsonrpc/config/config.exs +++ b/apps/ethereum_jsonrpc/config/config.exs @@ -10,7 +10,7 @@ config :ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator, rolling_window_opts: [ window_count: 6, window_length: :timer.seconds(10), - bucket: EthereumJSONRPC.RequestCoordinator.TimeoutCounter + table: EthereumJSONRPC.RequestCoordinator.TimeoutCounter ], wait_per_timeout: :timer.seconds(10) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex index 3383e3a875..ddcb3539fa 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex @@ -10,7 +10,7 @@ defmodule EthereumJSONRPC.Application do @impl Application def start(_type, _args) do rolling_window_opts = - :ethereum_jsonrpc, + :ethereum_jsonrpc |> Application.fetch_env!(RequestCoordinator) |> Keyword.fetch!(:rolling_window_opts) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex index 21a9c5b843..0a8783118f 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex @@ -13,7 +13,7 @@ defmodule EthereumJSONRPC.RequestCoordinator do * `:rolling_window_opts` - Options for the process tracking timeouts * `:window_count` - Number of windows * `:window_length` - Length of each window in milliseconds - * `:bucket` - name of the bucket to uniquely identify the dataset + * `:table` - name of the ets table to store the data in * `:wait_per_timeout` - Milliseconds to wait for each recent timeout within the tracked window ### Example Configuration @@ -22,7 +22,7 @@ defmodule EthereumJSONRPC.RequestCoordinator do rolling_window_opts: [ window_count: 6, window_length: :timer.seconds(10), - bucket: EthereumJSONRPC.RequestCoordinator.TimeoutCounter + table: EthereumJSONRPC.RequestCoordinator.TimeoutCounter ], wait_per_timeout: :timer.seconds(10) @@ -30,21 +30,8 @@ defmodule EthereumJSONRPC.RequestCoordinator do """ alias EthereumJSONRPC.{RollingWindow, Transport} - alias EthereumJSONRPC.RequestCoordinator.TimeoutCounter @timeout_key :timeout - @wait_per_timeout :timer.seconds(5) - @rolling_window_opts [ - bucket: :ethereum_jsonrpc_bucket, - window_length: :timer.seconds(10), - window_count: 6 - ] - - @doc "Options used when initializing the RollingWindow used by this module." - @spec rolling_window_opts() :: Keyword.t() - def rolling_window_opts do - @rolling_window_opts - end @doc """ Performs a JSON RPC request and adds necessary backoff. @@ -53,8 +40,10 @@ defmodule EthereumJSONRPC.RequestCoordinator do request were to exceed someout threshold, the request isn't performed and `{:error, :timeout}` is returned. """ - @spec perform(Transport.request(), Transport.t(), Transport.options(), non_neg_integer()) :: {:ok, Transport.result()} | {:error, term()} - @spec perform(Transport.batch_request(), Transport.t(), Transport.options(), non_neg_integer()) :: {:ok, Transport.batch_result()} | {:error, term()} + @spec perform(Transport.request(), Transport.t(), Transport.options(), non_neg_integer()) :: + {:ok, Transport.result()} | {:error, term()} + @spec perform(Transport.batch_request(), Transport.t(), Transport.options(), non_neg_integer()) :: + {:ok, Transport.batch_result()} | {:error, term()} def perform(request, transport, transport_options, throttle_timeout) do sleep_time = sleep_time() @@ -77,7 +66,7 @@ defmodule EthereumJSONRPC.RequestCoordinator do defp handle_transport_response(response), do: response defp sleep_time do - wait_coefficient = RollingWindow.count(bucket(), @timeout_key) + wait_coefficient = RollingWindow.count(table(), @timeout_key) wait_per_timeout = :ethereum_jsonrpc @@ -88,12 +77,12 @@ defmodule EthereumJSONRPC.RequestCoordinator do end defp increment_recent_timeouts do - RollingWindow.inc(bucket(), @timeout_key) + RollingWindow.inc(table(), @timeout_key) :ok end - defp bucket do - Application.get_env(:ethereum_jsonrpc, __MODULE__)[:rolling_window_opts] + defp table do + Application.get_env(:ethereum_jsonrpc, __MODULE__)[:rolling_window_opts][:table] end end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex index d85bfe598a..f878c4ce7e 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex @@ -26,12 +26,14 @@ defmodule EthereumJSONRPC.RollingWindow do end def init(opts) do - table_name = Keyword.fetch!(opts, :bucket) + table_name = Keyword.fetch!(opts, :table) window_length = Keyword.fetch!(opts, :window_length) window_count = Keyword.fetch!(opts, :window_count) table = :ets.new(table_name, [:named_table, :set, :public, read_concurrency: true, write_concurrency: true]) + # TODO: Calculate the match spec for the given window count here, and store it in state + state = %{ table: table, window_length: window_length, @@ -43,41 +45,9 @@ defmodule EthereumJSONRPC.RollingWindow do {:ok, state} end - def handle_call({:count, key}, _from, %{table: table} = state) do - count = - case :ets.lookup(table, key) do - [windows] -> windows |> Tuple.to_list() |> tl() |> Enum.sum() - _ -> 0 - end - - {:reply, count, state} - end - - def handle_call({:inspect, key}, _from, %{table: table, window_count: window_count} = state) do - windows = - case :ets.lookup(table, key) do - [windows] -> - windows |> Tuple.to_list() |> tl() - - _ -> - List.duplicate(0, window_count) - end - - {:reply, windows, state} - end - - def handle_cast({:inc, key}, %{table: table, window_count: window_count} = state) do - windows = List.duplicate(0, window_count) - default = List.to_tuple([key | windows]) - - :ets.update_counter(table, key, {2, 1}, default) - # TODO consider broadcasting to indexers than some threshold has been met with result of updating the counter - - {:noreply, state} - end - def handle_info(:sweep, %{window_count: window_count, table: table, window_length: window_length} = state) do Logger.debug(fn -> "Sweeping windows" end) + # TODO consider broadcasting to indexers than some threshold has been met with result of updating the counter # Delete any rows wheree all windows empty delete_match_spec = delete_match_spec(window_count) @@ -94,39 +64,70 @@ defmodule EthereumJSONRPC.RollingWindow do end defp match_spec(window_count) do + # This match spec represents this function: + # + # :ets.fun2ms(fn + # {key, n, [a, b, _]} -> + # {key, 0, [n, a, b]} + # + # {key, n, windows} -> + # {key, 0, [n | windows]} + # end) + # + # This function is an example for when window size is 3. The match spec + # matches on all but the last element of the list + [ { - match_spec_matcher(window_count), + full_windows_match_spec_matcher(window_count), [], - match_spec_mapper(window_count) + full_windows_match_spec_mapper(window_count) + }, + { + partial_windows_match_spec_matcher(), + [], + partial_windows_match_spec_mapper() } ] end - defp match_spec_matcher(window_count) do - range = Range.new(1, window_count + 1) - - range - |> Enum.map(&:"$#{&1}") - |> List.to_tuple() + defp full_windows_match_spec_matcher(1) do + {:"$1", :"$2", []} end - defp delete_match_spec(window_count) do - List.to_tuple([:"$1" | List.duplicate(0, window_count)]) + defp full_windows_match_spec_matcher(window_count) do + windows = + 3 + |> Range.new(window_count) + |> Enum.map(&:"$#{&1}") + |> Kernel.++([:_]) + + {:"$1", :"$2", windows} end - defp match_spec_mapper(1) do - [{{:"$1", 0}}] + defp full_windows_match_spec_mapper(1) do + [{{:"$1", 0, []}}] end - defp match_spec_mapper(window_count) do - inner_tuple = - 1..window_count + defp full_windows_match_spec_mapper(window_count) do + windows = + 3 + |> Range.new(window_count) |> Enum.map(&:"$#{&1}") - |> List.to_tuple() - |> Tuple.insert_at(1, 0) - [{inner_tuple}] + [{{:"$1", 0, [:"$2" | windows]}}] + end + + defp partial_windows_match_spec_matcher do + {:"$1", :"$2", :"$3"} + end + + defp partial_windows_match_spec_mapper do + [{{:"$1", 0, [:"$2" | :"$3"]}}] + end + + defp delete_match_spec(window_count) do + {:"$1", 0, List.duplicate(0, window_count - 1)} end defp schedule_sweep(window_length) do @@ -136,26 +137,37 @@ defmodule EthereumJSONRPC.RollingWindow do @doc """ Increment the count of events in the current window """ - @spec inc(GenServer.server(), key :: term()) :: :ok - def inc(server, key) do - # Consider requiring the bucket and key to be passed in here - # so that this and count/2 do not need to call to the server - GenServer.cast(server, {:inc, key}) + @spec inc(table :: atom, key :: term()) :: :ok + def inc(table, key) do + default = {key, 0, []} + + :ets.update_counter(table, key, {2, 1}, default) + + :ok end @doc """ Count all events in all windows """ - @spec count(GenServer.server(), key :: term()) :: non_neg_integer() - def count(server, key) do - GenServer.call(server, {:count, key}) + @spec count(table :: atom, key :: term()) :: non_neg_integer() + def count(table, key) do + case :ets.lookup(table, key) do + [{_, current_window, windows}] -> current_window + Enum.sum(windows) + _ -> 0 + end end @doc """ Display the raw contents of all windows for a given key """ - @spec inspect(GenServer.server(), key :: term()) :: nonempty_list(non_neg_integer) - def inspect(server, key) do - GenServer.call(server, {:inspect, key}) + @spec inspect(table :: atom, key :: term()) :: nonempty_list(non_neg_integer) + def inspect(table, key) do + case :ets.lookup(table, key) do + [{_, current_window, windows}] -> + [current_window | windows] + + _ -> + [] + end end end