@ -18,9 +18,14 @@ defmodule EthereumJSONRPC.RequestCoordinator do
the tracked window
the tracked window
* ` :max_jitter ` - Maximimum amount of time in milliseconds to be added to each
* ` :max_jitter ` - Maximimum amount of time in milliseconds to be added to each
wait before multiplied by timeout count
wait before multiplied by timeout count
* ` :throttle_rolling_window_opts ` - Options for the process tracking all requests
* ` :window_count ` - Number of windows
* ` :duration ` - Total amount of time to coumt events in milliseconds
* ` :table ` - name of the ets table to store the data in
* ` :throttle_rate_limit ` - The total number of requests allowed in the all windows .
See the docs for ` EthereumJSONRPC.RollingWindow ` for more documentation for
See the docs for ` EthereumJSONRPC.RollingWindow ` for more documentation for
` :rolling_window_opts ` .
` :rolling_window_opts ` and ` :throttle_rolling_window_opts ` .
This is how the wait time for each request is calculated :
This is how the wait time for each request is calculated :
@ -33,13 +38,20 @@ defmodule EthereumJSONRPC.RequestCoordinator do
config :ethereum_jsonrpc , EthereumJSONRPC.RequestCoordinator ,
config :ethereum_jsonrpc , EthereumJSONRPC.RequestCoordinator ,
rolling_window_opts : [
rolling_window_opts : [
window_count : 6 ,
window_count : 6 ,
duration : :timer . second s( 10 ) ,
duration : :timer . minute s( 1 ) ,
table : EthereumJSONRPC.RequestCoordinator.TimeoutCounter
table : EthereumJSONRPC.RequestCoordinator.TimeoutCounter
] ,
] ,
wait_per_timeout : :timer . seconds ( 10 ) ,
wait_per_timeout : :timer . seconds ( 10 ) ,
max_jitter : :timer . seconds ( 1 )
max_jitter : :timer . seconds ( 1 )
throttle_rate_limit : 60 ,
throttle_rolling_window_opts : [
window_count : 3 ,
duration : :timer . seconds ( 10 ) ,
table : EthereumJSONRPC.RequestCoordinator.RequestCounter
]
With this configuration , timeouts are tracked for 6 windows of 10 seconds for a total of 1 minute .
With this configuration , timeouts are tracked for 6 windows of 10 seconds for a total of 1 minute .
Requests are tracked for 3 windows of 10 seconds , for a total of 30 seconds , and
"""
"""
require EthereumJSONRPC.Tracer
require EthereumJSONRPC.Tracer
@ -47,6 +59,7 @@ defmodule EthereumJSONRPC.RequestCoordinator do
alias EthereumJSONRPC . { RollingWindow , Tracer , Transport }
alias EthereumJSONRPC . { RollingWindow , Tracer , Transport }
@error_key :throttleable_error_count
@error_key :throttleable_error_count
@throttle_key :throttle_requests_count
@doc """
@doc """
Performs a JSON RPC request and adds necessary backoff .
Performs a JSON RPC request and adds necessary backoff .
@ -64,12 +77,19 @@ defmodule EthereumJSONRPC.RequestCoordinator do
if sleep_time <= throttle_timeout do
if sleep_time <= throttle_timeout do
:timer . sleep ( sleep_time )
:timer . sleep ( sleep_time )
remaining_wait_time = throttle_timeout - sleep_time
trace_request ( request , fn ->
request
case throttle_request ( remaining_wait_time ) do
|> transport . json_rpc ( transport_options )
:ok ->
|> handle_transport_response ( )
trace_request ( request , fn ->
end )
request
|> transport . json_rpc ( transport_options )
|> handle_transport_response ( )
end )
:error ->
{ :error , :timeout }
end
else
else
:timer . sleep ( throttle_timeout )
:timer . sleep ( throttle_timeout )
@ -91,33 +111,78 @@ defmodule EthereumJSONRPC.RequestCoordinator do
defp handle_transport_response ( { :error , { :bad_gateway , _ } } = error ) do
defp handle_transport_response ( { :error , { :bad_gateway , _ } } = error ) do
RollingWindow . inc ( table ( ) , @error_key )
RollingWindow . inc ( table ( ) , @error_key )
inc_throttle_table ( )
error
error
end
end
defp handle_transport_response ( { :error , :timeout } = error ) do
defp handle_transport_response ( { :error , :timeout } = error ) do
RollingWindow . inc ( table ( ) , @error_key )
RollingWindow . inc ( table ( ) , @error_key )
inc_throttle_table ( )
error
error
end
end
defp handle_transport_response ( response ) , do : response
defp handle_transport_response ( response ) do
inc_throttle_table ( )
response
end
defp inc_throttle_table do
if config ( :throttle_rolling_window_opts ) do
RollingWindow . inc ( throttle_table ( ) , @throttle_key )
end
end
defp throttle_request (
remaining_time ,
rate_limit \\ config ( :throttle_rate_limit ) ,
opts \\ config ( :throttle_rolling_window_opts )
) do
if opts [ :throttle_rate_limit ] && RollingWindow . count ( throttle_table ( ) , @throttle_key ) >= rate_limit do
if opts [ :duration ] >= remaining_time do
:timer . sleep ( remaining_time )
:error
else
new_remaining_time = remaining_time - opts [ :duration ]
:timer . sleep ( opts [ :duration ] )
throttle_request ( new_remaining_time , rate_limit , opts )
end
else
:ok
end
end
defp sleep_time do
defp sleep_time do
wait_coefficient = RollingWindow . count ( table ( ) , @error_key )
wait_coefficient = RollingWindow . count ( table ( ) , @error_key )
jitter = :rand . uniform ( config ( :max_jitter ) )
jitter = :rand . uniform ( config! ( :max_jitter ) )
wait_per_timeout = config ( :wait_per_timeout )
wait_per_timeout = config! ( :wait_per_timeout )
wait_coefficient * ( wait_per_timeout + jitter )
wait_coefficient * ( wait_per_timeout + jitter )
end
end
defp table do
defp table do
:rolling_window_opts
:rolling_window_opts
|> config ( )
|> config! ( )
|> Keyword . fetch! ( :table )
|> Keyword . fetch! ( :table )
end
end
defp config ( key ) do
defp throttle_table do
case config ( :throttle_rolling_window_opts ) do
nil -> :ignore
keyword -> Keyword . fetch! ( keyword , :table )
end
end
defp config! ( key ) do
:ethereum_jsonrpc
:ethereum_jsonrpc
|> Application . get_env ( __MODULE__ )
|> Application . get_env ( __MODULE__ )
|> Keyword . fetch! ( key )
|> Keyword . fetch! ( key )
end
end
defp config ( key ) do
:ethereum_jsonrpc
|> Application . get_env ( __MODULE__ )
|> Keyword . get ( key )
end
end
end