WIP: Finish/clean up rolling window tracker

pull/961/head
zachdaniel 6 years ago committed by Luke Imhoff
parent be7271a8d9
commit bd31796d40
  1. 2
      apps/ethereum_jsonrpc/config/config.exs
  2. 2
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex
  3. 31
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex
  4. 138
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex

@ -10,7 +10,7 @@ config :ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator,
rolling_window_opts: [ rolling_window_opts: [
window_count: 6, window_count: 6,
window_length: :timer.seconds(10), window_length: :timer.seconds(10),
bucket: EthereumJSONRPC.RequestCoordinator.TimeoutCounter table: EthereumJSONRPC.RequestCoordinator.TimeoutCounter
], ],
wait_per_timeout: :timer.seconds(10) wait_per_timeout: :timer.seconds(10)

@ -10,7 +10,7 @@ defmodule EthereumJSONRPC.Application do
@impl Application @impl Application
def start(_type, _args) do def start(_type, _args) do
rolling_window_opts = rolling_window_opts =
:ethereum_jsonrpc, :ethereum_jsonrpc
|> Application.fetch_env!(RequestCoordinator) |> Application.fetch_env!(RequestCoordinator)
|> Keyword.fetch!(:rolling_window_opts) |> Keyword.fetch!(:rolling_window_opts)

@ -13,7 +13,7 @@ defmodule EthereumJSONRPC.RequestCoordinator do
* `:rolling_window_opts` - Options for the process tracking timeouts * `:rolling_window_opts` - Options for the process tracking timeouts
* `:window_count` - Number of windows * `:window_count` - Number of windows
* `:window_length` - Length of each window in milliseconds * `:window_length` - Length of each window in milliseconds
* `:bucket` - name of the bucket to uniquely identify the dataset * `:table` - name of the ets table to store the data in
* `:wait_per_timeout` - Milliseconds to wait for each recent timeout within the tracked window * `:wait_per_timeout` - Milliseconds to wait for each recent timeout within the tracked window
### Example Configuration ### Example Configuration
@ -22,7 +22,7 @@ defmodule EthereumJSONRPC.RequestCoordinator do
rolling_window_opts: [ rolling_window_opts: [
window_count: 6, window_count: 6,
window_length: :timer.seconds(10), window_length: :timer.seconds(10),
bucket: EthereumJSONRPC.RequestCoordinator.TimeoutCounter table: EthereumJSONRPC.RequestCoordinator.TimeoutCounter
], ],
wait_per_timeout: :timer.seconds(10) wait_per_timeout: :timer.seconds(10)
@ -30,21 +30,8 @@ defmodule EthereumJSONRPC.RequestCoordinator do
""" """
alias EthereumJSONRPC.{RollingWindow, Transport} alias EthereumJSONRPC.{RollingWindow, Transport}
alias EthereumJSONRPC.RequestCoordinator.TimeoutCounter
@timeout_key :timeout @timeout_key :timeout
@wait_per_timeout :timer.seconds(5)
@rolling_window_opts [
bucket: :ethereum_jsonrpc_bucket,
window_length: :timer.seconds(10),
window_count: 6
]
@doc "Options used when initializing the RollingWindow used by this module."
@spec rolling_window_opts() :: Keyword.t()
def rolling_window_opts do
@rolling_window_opts
end
@doc """ @doc """
Performs a JSON RPC request and adds necessary backoff. Performs a JSON RPC request and adds necessary backoff.
@ -53,8 +40,10 @@ defmodule EthereumJSONRPC.RequestCoordinator do
request were to exceed someout threshold, the request isn't performed and request were to exceed someout threshold, the request isn't performed and
`{:error, :timeout}` is returned. `{:error, :timeout}` is returned.
""" """
@spec perform(Transport.request(), Transport.t(), Transport.options(), non_neg_integer()) :: {:ok, Transport.result()} | {:error, term()} @spec perform(Transport.request(), Transport.t(), Transport.options(), non_neg_integer()) ::
@spec perform(Transport.batch_request(), Transport.t(), Transport.options(), non_neg_integer()) :: {:ok, Transport.batch_result()} | {:error, term()} {:ok, Transport.result()} | {:error, term()}
@spec perform(Transport.batch_request(), Transport.t(), Transport.options(), non_neg_integer()) ::
{:ok, Transport.batch_result()} | {:error, term()}
def perform(request, transport, transport_options, throttle_timeout) do def perform(request, transport, transport_options, throttle_timeout) do
sleep_time = sleep_time() sleep_time = sleep_time()
@ -77,7 +66,7 @@ defmodule EthereumJSONRPC.RequestCoordinator do
defp handle_transport_response(response), do: response defp handle_transport_response(response), do: response
defp sleep_time do defp sleep_time do
wait_coefficient = RollingWindow.count(bucket(), @timeout_key) wait_coefficient = RollingWindow.count(table(), @timeout_key)
wait_per_timeout = wait_per_timeout =
:ethereum_jsonrpc :ethereum_jsonrpc
@ -88,12 +77,12 @@ defmodule EthereumJSONRPC.RequestCoordinator do
end end
defp increment_recent_timeouts do defp increment_recent_timeouts do
RollingWindow.inc(bucket(), @timeout_key) RollingWindow.inc(table(), @timeout_key)
:ok :ok
end end
defp bucket do defp table do
Application.get_env(:ethereum_jsonrpc, __MODULE__)[:rolling_window_opts] Application.get_env(:ethereum_jsonrpc, __MODULE__)[:rolling_window_opts][:table]
end end
end end

@ -26,12 +26,14 @@ defmodule EthereumJSONRPC.RollingWindow do
end end
def init(opts) do def init(opts) do
table_name = Keyword.fetch!(opts, :bucket) table_name = Keyword.fetch!(opts, :table)
window_length = Keyword.fetch!(opts, :window_length) window_length = Keyword.fetch!(opts, :window_length)
window_count = Keyword.fetch!(opts, :window_count) window_count = Keyword.fetch!(opts, :window_count)
table = :ets.new(table_name, [:named_table, :set, :public, read_concurrency: true, write_concurrency: true]) table = :ets.new(table_name, [:named_table, :set, :public, read_concurrency: true, write_concurrency: true])
# TODO: Calculate the match spec for the given window count here, and store it in state
state = %{ state = %{
table: table, table: table,
window_length: window_length, window_length: window_length,
@ -43,41 +45,9 @@ defmodule EthereumJSONRPC.RollingWindow do
{:ok, state} {:ok, state}
end end
def handle_call({:count, key}, _from, %{table: table} = state) do
count =
case :ets.lookup(table, key) do
[windows] -> windows |> Tuple.to_list() |> tl() |> Enum.sum()
_ -> 0
end
{:reply, count, state}
end
def handle_call({:inspect, key}, _from, %{table: table, window_count: window_count} = state) do
windows =
case :ets.lookup(table, key) do
[windows] ->
windows |> Tuple.to_list() |> tl()
_ ->
List.duplicate(0, window_count)
end
{:reply, windows, state}
end
def handle_cast({:inc, key}, %{table: table, window_count: window_count} = state) do
windows = List.duplicate(0, window_count)
default = List.to_tuple([key | windows])
:ets.update_counter(table, key, {2, 1}, default)
# TODO consider broadcasting to indexers than some threshold has been met with result of updating the counter
{:noreply, state}
end
def handle_info(:sweep, %{window_count: window_count, table: table, window_length: window_length} = state) do def handle_info(:sweep, %{window_count: window_count, table: table, window_length: window_length} = state) do
Logger.debug(fn -> "Sweeping windows" end) Logger.debug(fn -> "Sweeping windows" end)
# TODO consider broadcasting to indexers than some threshold has been met with result of updating the counter
# Delete any rows wheree all windows empty # Delete any rows wheree all windows empty
delete_match_spec = delete_match_spec(window_count) delete_match_spec = delete_match_spec(window_count)
@ -94,39 +64,70 @@ defmodule EthereumJSONRPC.RollingWindow do
end end
defp match_spec(window_count) do defp match_spec(window_count) do
# This match spec represents this function:
#
# :ets.fun2ms(fn
# {key, n, [a, b, _]} ->
# {key, 0, [n, a, b]}
#
# {key, n, windows} ->
# {key, 0, [n | windows]}
# end)
#
# This function is an example for when window size is 3. The match spec
# matches on all but the last element of the list
[ [
{ {
match_spec_matcher(window_count), full_windows_match_spec_matcher(window_count),
[], [],
match_spec_mapper(window_count) full_windows_match_spec_mapper(window_count)
},
{
partial_windows_match_spec_matcher(),
[],
partial_windows_match_spec_mapper()
} }
] ]
end end
defp match_spec_matcher(window_count) do defp full_windows_match_spec_matcher(1) do
range = Range.new(1, window_count + 1) {:"$1", :"$2", []}
range
|> Enum.map(&:"$#{&1}")
|> List.to_tuple()
end end
defp delete_match_spec(window_count) do defp full_windows_match_spec_matcher(window_count) do
List.to_tuple([:"$1" | List.duplicate(0, window_count)]) windows =
3
|> Range.new(window_count)
|> Enum.map(&:"$#{&1}")
|> Kernel.++([:_])
{:"$1", :"$2", windows}
end end
defp match_spec_mapper(1) do defp full_windows_match_spec_mapper(1) do
[{{:"$1", 0}}] [{{:"$1", 0, []}}]
end end
defp match_spec_mapper(window_count) do defp full_windows_match_spec_mapper(window_count) do
inner_tuple = windows =
1..window_count 3
|> Range.new(window_count)
|> Enum.map(&:"$#{&1}") |> Enum.map(&:"$#{&1}")
|> List.to_tuple()
|> Tuple.insert_at(1, 0)
[{inner_tuple}] [{{:"$1", 0, [:"$2" | windows]}}]
end
defp partial_windows_match_spec_matcher do
{:"$1", :"$2", :"$3"}
end
defp partial_windows_match_spec_mapper do
[{{:"$1", 0, [:"$2" | :"$3"]}}]
end
defp delete_match_spec(window_count) do
{:"$1", 0, List.duplicate(0, window_count - 1)}
end end
defp schedule_sweep(window_length) do defp schedule_sweep(window_length) do
@ -136,26 +137,37 @@ defmodule EthereumJSONRPC.RollingWindow do
@doc """ @doc """
Increment the count of events in the current window Increment the count of events in the current window
""" """
@spec inc(GenServer.server(), key :: term()) :: :ok @spec inc(table :: atom, key :: term()) :: :ok
def inc(server, key) do def inc(table, key) do
# Consider requiring the bucket and key to be passed in here default = {key, 0, []}
# so that this and count/2 do not need to call to the server
GenServer.cast(server, {:inc, key}) :ets.update_counter(table, key, {2, 1}, default)
:ok
end end
@doc """ @doc """
Count all events in all windows Count all events in all windows
""" """
@spec count(GenServer.server(), key :: term()) :: non_neg_integer() @spec count(table :: atom, key :: term()) :: non_neg_integer()
def count(server, key) do def count(table, key) do
GenServer.call(server, {:count, key}) case :ets.lookup(table, key) do
[{_, current_window, windows}] -> current_window + Enum.sum(windows)
_ -> 0
end
end end
@doc """ @doc """
Display the raw contents of all windows for a given key Display the raw contents of all windows for a given key
""" """
@spec inspect(GenServer.server(), key :: term()) :: nonempty_list(non_neg_integer) @spec inspect(table :: atom, key :: term()) :: nonempty_list(non_neg_integer)
def inspect(server, key) do def inspect(table, key) do
GenServer.call(server, {:inspect, key}) case :ets.lookup(table, key) do
[{_, current_window, windows}] ->
[current_window | windows]
_ ->
[]
end
end end
end end

Loading…
Cancel
Save