WIP: clean up request coordinator with functionality and docs

pull/961/head
Alex Garibay 6 years ago committed by Luke Imhoff
parent 9410a86605
commit be7271a8d9
  1. 8
      apps/ethereum_jsonrpc/config/config.exs
  2. 16
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  3. 7
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex
  4. 108
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex
  5. 1
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex

@ -6,6 +6,14 @@ config :logger, :ethereum_jsonrpc,
metadata: [:application, :request_id], metadata: [:application, :request_id],
metadata_filter: [application: :ethereum_jsonrpc] metadata_filter: [application: :ethereum_jsonrpc]
config :ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator,
rolling_window_opts: [
window_count: 6,
window_length: :timer.seconds(10),
bucket: EthereumJSONRPC.RequestCoordinator.TimeoutCounter
],
wait_per_timeout: :timer.seconds(10)
# Import environment specific config. This must remain at the bottom # Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above. # of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs" import_config "#{Mix.env()}.exs"

@ -17,7 +17,16 @@ defmodule EthereumJSONRPC do
""" """
alias Explorer.Chain.Block alias Explorer.Chain.Block
alias EthereumJSONRPC.{Blocks, Receipts, Subscription, Transactions, Transport, Uncles, Variant} alias EthereumJSONRPC.{
Blocks,
Receipts,
RequestCoordinator,
Subscription,
Transactions,
Transport,
Uncles,
Variant
}
@typedoc """ @typedoc """
Truncated 20-byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash encoded as a hexadecimal number in a Truncated 20-byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash encoded as a hexadecimal number in a
@ -49,7 +58,7 @@ defmodule EthereumJSONRPC do
""" """
@type json_rpc_named_arguments :: [ @type json_rpc_named_arguments :: [
{:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()} {:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()} | {:throttle_timeout, non_neg_integer()}
] ]
@typedoc """ @typedoc """
@ -310,8 +319,9 @@ defmodule EthereumJSONRPC do
def json_rpc(request, named_arguments) when (is_map(request) or is_list(request)) and is_list(named_arguments) do def json_rpc(request, named_arguments) when (is_map(request) or is_list(request)) and is_list(named_arguments) do
transport = Keyword.fetch!(named_arguments, :transport) transport = Keyword.fetch!(named_arguments, :transport)
transport_options = Keyword.fetch!(named_arguments, :transport_options) transport_options = Keyword.fetch!(named_arguments, :transport_options)
throttle_timeout = Keyword.get(named_arguments, :throttle_timeout, 60_000)
transport.json_rpc(request, transport_options) RequestCoordinator.perform(request, transport, transport_options, throttle_timeout)
end end
@doc """ @doc """

@ -9,9 +9,14 @@ defmodule EthereumJSONRPC.Application do
@impl Application @impl Application
def start(_type, _args) do def start(_type, _args) do
rolling_window_opts =
:ethereum_jsonrpc,
|> Application.fetch_env!(RequestCoordinator)
|> Keyword.fetch!(:rolling_window_opts)
children = [ children = [
:hackney_pool.child_spec(:ethereum_jsonrpc, recv_timeout: 60_000, timeout: 60_000, max_connections: 1000), :hackney_pool.child_spec(:ethereum_jsonrpc, recv_timeout: 60_000, timeout: 60_000, max_connections: 1000),
{RollingWindow, [RequestCoordinator.rolling_window_opts(), [name: TimeoutCounter]]} {RollingWindow, [rolling_window_opts]}
] ]
Supervisor.start_link(children, strategy: :one_for_one, name: EthereumJSONRPC.Supervisor) Supervisor.start_link(children, strategy: :one_for_one, name: EthereumJSONRPC.Supervisor)

@ -1,14 +1,38 @@
defmodule EthereumJSONRPC.RequestCoordinator do defmodule EthereumJSONRPC.RequestCoordinator do
@moduledoc """ @moduledoc """
Retries JSONRPC requests according to the provided retry_options Coordinates requests with a backoff strategy.
Leverages `EthereumJSONRPC.RollingWindow` to keep track of the count This module leverages `EthereumJSONRPC.RollingWindow` to track request timeout
of recent timeouts, and waits a small amount of time per timeout. that have occurred recently. Options for this functionality can be changed at
the application configuration level.
To see the rolling window options, see `EthereumJSONRPC.Application` ## Configuration
The following are the expected and supported options for this module:
* `:rolling_window_opts` - Options for the process tracking timeouts
* `:window_count` - Number of windows
* `:window_length` - Length of each window in milliseconds
* `:bucket` - name of the bucket to uniquely identify the dataset
* `:wait_per_timeout` - Milliseconds to wait for each recent timeout within the tracked window
### Example Configuration
config :ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator,
rolling_window_opts: [
window_count: 6,
window_length: :timer.seconds(10),
bucket: EthereumJSONRPC.RequestCoordinator.TimeoutCounter
],
wait_per_timeout: :timer.seconds(10)
With this configuration, timeouts are tracked for 6 windows of 10 seconds for a total of 1 minute.
""" """
alias EthereumJSONRPC.{RollingWindow, TimeoutCounter}
alias EthereumJSONRPC.{RollingWindow, Transport}
alias EthereumJSONRPC.RequestCoordinator.TimeoutCounter
@timeout_key :timeout
@wait_per_timeout :timer.seconds(5) @wait_per_timeout :timer.seconds(5)
@rolling_window_opts [ @rolling_window_opts [
bucket: :ethereum_jsonrpc_bucket, bucket: :ethereum_jsonrpc_bucket,
@ -23,67 +47,53 @@ defmodule EthereumJSONRPC.RequestCoordinator do
end end
@doc """ @doc """
Retries the request according to the provided retry_options Performs a JSON RPC request and adds necessary backoff.
If none were provided, the request is not retried. In all cases, the request In the event that too many requests have timed out recently and the current
waits an amount of time before proceeding based on the count of recent request were to exceed someout threshold, the request isn't performed and
failures. `{:error, :timeout}` is returned.
""" """
def perform(request, named_arguments) do @spec perform(Transport.request(), Transport.t(), Transport.options(), non_neg_integer()) :: {:ok, Transport.result()} | {:error, term()}
transport = Keyword.fetch!(named_arguments, :transport) @spec perform(Transport.batch_request(), Transport.t(), Transport.options(), non_neg_integer()) :: {:ok, Transport.batch_result()} | {:error, term()}
transport_options = Keyword.fetch!(named_arguments, :transport_options) def perform(request, transport, transport_options, throttle_timeout) do
retry_options = Keyword.get(named_arguments, :retry_options) sleep_time = sleep_time()
if retry_options do if sleep_time <= throttle_timeout do
retry_timeout = Keyword.get(retry_options, :retry_timeout, 5_000) :timer.sleep(sleep_time)
fn -> request
request(transport, request, transport_options, true) |> transport.json_rpc(transport_options)
end |> handle_transport_response()
|> Task.async()
|> Task.await(retry_timeout)
else else
request(transport, request, transport_options, false) {:error, :timeout}
end end
end end
defp request(transport, request, transport_options, retry?) do defp handle_transport_response({:error, :timeout} = error) do
key = something_that_uniquely_identifies_this_transport(transport, transport_options) increment_recent_timeouts()
sleep_if_too_many_recent_timeouts(key)
case transport.json_rpc(request, transport_options) do
{:error, :timeout} = error ->
increment_recent_timeouts(key)
if retry? do
request(transport, request, transport_options, true)
else
error error
end end
response -> defp handle_transport_response(response), do: response
response
end
end
defp increment_recent_timeouts(key) do defp sleep_time do
RollingWindow.inc(TimeoutCounter, key) wait_coefficient = RollingWindow.count(bucket(), @timeout_key)
:ok wait_per_timeout =
end :ethereum_jsonrpc
|> Application.get_env(__MODULE__)
defp sleep_if_too_many_recent_timeouts(key) do |> Keyword.fetch!(:wait_per_timeout)
wait_coefficient = count_of_recent_timeouts(key)
:timer.sleep(wait_coefficient * @wait_per_timeout) wait_coefficient * @wait_per_timeout
end end
defp something_that_uniquely_identifies_this_transport(transport, transport_options) do defp increment_recent_timeouts do
to_string(transport) <> "." <> transport_options[:url] RollingWindow.inc(bucket(), @timeout_key)
:ok
end end
defp count_of_recent_timeouts(key) do defp bucket do
RollingWindow.count(TimeoutCounter, key) Application.get_env(:ethereum_jsonrpc, __MODULE__)[:rolling_window_opts]
end end
end end

@ -71,6 +71,7 @@ defmodule EthereumJSONRPC.RollingWindow do
default = List.to_tuple([key | windows]) default = List.to_tuple([key | windows])
:ets.update_counter(table, key, {2, 1}, default) :ets.update_counter(table, key, {2, 1}, default)
# TODO consider broadcasting to indexers than some threshold has been met with result of updating the counter
{:noreply, state} {:noreply, state}
end end

Loading…
Cancel
Save