parent
1ae7206d83
commit
0e3fdc3995
@ -0,0 +1,68 @@ |
||||
defmodule EthereumJSONRPC.RequestCoordinator do |
||||
@failure_rate_limit_interval :timer.minutes(3) |
||||
@failure_rate_limit 30 |
||||
|
||||
def perform(request, named_arguments) do |
||||
transport = Keyword.fetch!(named_arguments, :transport) |
||||
transport_options = Keyword.fetch!(named_arguments, :transport_options) |
||||
retry_options = Keyword.get(named_arguments, :retry_options) |
||||
|
||||
if retry_options do |
||||
retry_timeout = Keyword.get(retry_options, :retry_timeout, 5_000) |
||||
|
||||
fn -> |
||||
request_with_retry(transport, request, transport_options) |
||||
end |
||||
|> Task.async() |
||||
|> Task.await() |
||||
else |
||||
request(transport, request, transport_options) |
||||
end |
||||
end |
||||
|
||||
defp request_with_retry(transport, request, transport_options) do |
||||
key = something_that_uniquely_identifies_this_transport(transport, transport_options) |
||||
|
||||
sleep_if_too_many_recent_timeouts(key) |
||||
|
||||
case request(transport, request, transport_options) do |
||||
{:error, :timeout} -> |
||||
increment_recent_timeouts(key) |
||||
|
||||
request_with_retry(transport, request, transport_options) |
||||
|
||||
response -> |
||||
response |
||||
end |
||||
end |
||||
|
||||
defp request(transport, request, transport_options), do: transport.json_rpc(request, transport_options) |
||||
|
||||
@spec increment_recent_timeouts(String.t()) :: :ok |
||||
defp increment_recent_timeouts(key) do |
||||
# TODO: Call into rolling window rate limiter |
||||
# ExRated.check_rate(key, @failure_rate_limit_interval, @failure_rate_limit) |
||||
|
||||
:ok |
||||
end |
||||
|
||||
defp sleep_if_too_many_recent_timeouts(key) do |
||||
wait_coefficient = count_of_recent_timeouts(key) |
||||
|
||||
# TODO: Math TBD |
||||
:timer.sleep(:timer.seconds(wait_coefficient)) |
||||
end |
||||
|
||||
defp something_that_uniquely_identifies_this_transport(transport, transport_options) do |
||||
to_string(transport) <> "." <> transport_options[:url] |
||||
end |
||||
|
||||
defp count_of_recent_timeouts(key) do |
||||
# if we are using ex_rated it looks like this: |
||||
# TODO: Call into rolling window rate limiter |
||||
# {count, _count_remaining, _ms_to_next_bucket, _created_at, _updated_at} = |
||||
# ExRated.inspect_bucket(key, @failure_rate_limit_interval, @failure_rate_limit) |
||||
|
||||
count |
||||
end |
||||
end |
@ -0,0 +1,49 @@ |
||||
defmodule EthereumJSONRPC.RollingWindow do |
||||
use GenServer |
||||
require Logger |
||||
|
||||
@sweep_after :timer.seconds(10) |
||||
@interval :timer.seconds(60) |
||||
@tab :rate_limiter_requests |
||||
|
||||
## Client |
||||
|
||||
def start_link do |
||||
GenServer.start_link(__MODULE__, [], name: __MODULE__) |
||||
end |
||||
|
||||
def log_timeout(key) do |
||||
:ets.update_counter(@tab, key, {2, 1}, {key, 0, 0, 0, 0, 0, 0}) |
||||
end |
||||
|
||||
def count_timeouts(key) do |
||||
case :ets.lookup(@tab, key) do |
||||
[{_, a, b, c, d, e, f}] -> a + b + c + d + e + f |
||||
_ -> 0 |
||||
end |
||||
end |
||||
|
||||
## Server |
||||
def init(_) do |
||||
:ets.new(@tab, [:set, :named_table, :public, read_concurrency: true, write_concurrency: true]) |
||||
schedule_sweep() |
||||
{:ok, %{}} |
||||
end |
||||
|
||||
def handle_info(:sweep, state) do |
||||
Logger.debug("Sweeping requests") |
||||
|
||||
match_spec = [ |
||||
{{:"$1", :"$2", :"$3", :"$4", :"$5", :"$6", :"$7"}, [], [{{:"$1", 0, :"$2", :"$3", :"$4", :"$5", :"$6"}}]} |
||||
] |
||||
|
||||
:ets.select_replace(@tab, match_spec) |
||||
|
||||
schedule_sweep() |
||||
{:noreply, state} |
||||
end |
||||
|
||||
defp schedule_sweep do |
||||
Process.send_after(self(), :sweep, @sweep_after) |
||||
end |
||||
end |
Loading…
Reference in new issue