|
|
|
@ -4,8 +4,7 @@ defmodule EthereumJSONRPC.RollingWindow do |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
use GenServer |
|
|
|
|
|
|
|
|
|
@sweep_after :timer.seconds(10) |
|
|
|
|
require Logger |
|
|
|
|
|
|
|
|
|
def child_spec([init_arguments]) do |
|
|
|
|
child_spec([init_arguments, []]) |
|
|
|
@ -29,9 +28,9 @@ defmodule EthereumJSONRPC.RollingWindow do |
|
|
|
|
def init(opts) do |
|
|
|
|
table_name = Keyword.fetch!(opts, :bucket) |
|
|
|
|
window_length = Keyword.fetch!(opts, :window_length) |
|
|
|
|
window_count = Keyword.fetch(otps, :window_count) |
|
|
|
|
window_count = Keyword.fetch!(opts, :window_count) |
|
|
|
|
|
|
|
|
|
table = :ets.new(bucket, [:named_table, :set, :public, read_concurrency: true, write_concurrency: true]) |
|
|
|
|
table = :ets.new(table_name, [:named_table, :set, :public, read_concurrency: true, write_concurrency: true]) |
|
|
|
|
|
|
|
|
|
state = %{ |
|
|
|
|
table: table, |
|
|
|
@ -44,9 +43,46 @@ defmodule EthereumJSONRPC.RollingWindow do |
|
|
|
|
{:ok, state} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def handle_call({:count, key}, _from, %{table: table} = state) do |
|
|
|
|
count = |
|
|
|
|
case :ets.lookup(table, key) do |
|
|
|
|
[windows] -> windows |> Tuple.to_list() |> tl() |> Enum.sum() |
|
|
|
|
_ -> 0 |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
{:reply, count, state} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def handle_call({:inspect, key}, _from, %{table: table, window_count: window_count} = state) do |
|
|
|
|
windows = |
|
|
|
|
case :ets.lookup(table, key) do |
|
|
|
|
[windows] -> |
|
|
|
|
windows |> Tuple.to_list() |> tl() |
|
|
|
|
|
|
|
|
|
_ -> |
|
|
|
|
List.duplicate(0, window_count) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
{:reply, windows, state} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def handle_cast({:inc, key}, %{table: table, window_count: window_count} = state) do |
|
|
|
|
windows = List.duplicate(0, window_count) |
|
|
|
|
default = List.to_tuple([key | windows]) |
|
|
|
|
|
|
|
|
|
:ets.update_counter(table, key, {2, 1}, default) |
|
|
|
|
|
|
|
|
|
{:noreply, state} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def handle_info(:sweep, %{window_count: window_count, table: table, window_length: window_length} = state) do |
|
|
|
|
Logger.debug(fn -> "Sweeping windows" end) |
|
|
|
|
|
|
|
|
|
# Delete any rows wheree all windows empty |
|
|
|
|
delete_match_spec = delete_match_spec(window_count) |
|
|
|
|
|
|
|
|
|
:ets.match_delete(table, delete_match_spec) |
|
|
|
|
|
|
|
|
|
match_spec = match_spec(window_count) |
|
|
|
|
|
|
|
|
|
:ets.select_replace(table, match_spec) |
|
|
|
@ -57,19 +93,25 @@ defmodule EthereumJSONRPC.RollingWindow do |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp match_spec(window_count) do |
|
|
|
|
[{ |
|
|
|
|
[ |
|
|
|
|
{ |
|
|
|
|
match_spec_matcher(window_count), |
|
|
|
|
[], |
|
|
|
|
match_spec_mapper(window_count) |
|
|
|
|
}] |
|
|
|
|
} |
|
|
|
|
] |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp match_spec_matcher(window_count) do |
|
|
|
|
range = Range.new(1, window_count + 1) |
|
|
|
|
|
|
|
|
|
range |
|
|
|
|
|> Enum.map(& :"$#{&1}") |
|
|
|
|
|> to_tuple() |
|
|
|
|
|> Enum.map(&:"$#{&1}") |
|
|
|
|
|> List.to_tuple() |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp delete_match_spec(window_count) do |
|
|
|
|
List.to_tuple([:"$1" | List.duplicate(0, window_count)]) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp match_spec_mapper(1) do |
|
|
|
@ -79,9 +121,10 @@ defmodule EthereumJSONRPC.RollingWindow do |
|
|
|
|
defp match_spec_mapper(window_count) do |
|
|
|
|
inner_tuple = |
|
|
|
|
1..window_count |
|
|
|
|
|> Enum.map(& :"$#{&1}") |
|
|
|
|
|> to_tuple() |
|
|
|
|
|> Enum.map(&:"$#{&1}") |
|
|
|
|
|> List.to_tuple() |
|
|
|
|
|> Tuple.insert_at(1, 0) |
|
|
|
|
|
|
|
|
|
[{inner_tuple}] |
|
|
|
|
end |
|
|
|
|
|
|
|
|
@ -89,16 +132,29 @@ defmodule EthereumJSONRPC.RollingWindow do |
|
|
|
|
Process.send_after(self(), :sweep, window_length) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def log_timeout(key) do |
|
|
|
|
# TODO account for tables of different window counts |
|
|
|
|
:ets.update_counter(@tab, key, {2, 1}, {key, 0, 0, 0, 0, 0, 0}) |
|
|
|
|
@doc """ |
|
|
|
|
Increment the count of events in the current window |
|
|
|
|
""" |
|
|
|
|
@spec inc(GenServer.server(), key :: term()) :: :ok |
|
|
|
|
def inc(server, key) do |
|
|
|
|
# Consider requiring the bucket and key to be passed in here |
|
|
|
|
# so that this and count/2 do not need to call to the server |
|
|
|
|
GenServer.cast(server, {:inc, key}) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def count_timeouts(key) do |
|
|
|
|
# TODO account for tables of different window counts |
|
|
|
|
case :ets.lookup(@tab, key) do |
|
|
|
|
[{_, a, b, c, d, e, f}] -> a + b + c + d + e + f |
|
|
|
|
_ -> 0 |
|
|
|
|
@doc """ |
|
|
|
|
Count all events in all windows |
|
|
|
|
""" |
|
|
|
|
@spec count(GenServer.server(), key :: term()) :: non_neg_integer() |
|
|
|
|
def count(server, key) do |
|
|
|
|
GenServer.call(server, {:count, key}) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
@doc """ |
|
|
|
|
Display the raw contents of all windows for a given key |
|
|
|
|
""" |
|
|
|
|
@spec inspect(GenServer.server(), key :: term()) :: nonempty_list(non_neg_integer) |
|
|
|
|
def inspect(server, key) do |
|
|
|
|
GenServer.call(server, {:inspect, key}) |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|