diff --git a/apps/ethereum_jsonrpc/config/config.exs b/apps/ethereum_jsonrpc/config/config.exs index f89de8c121..39c4928bdc 100644 --- a/apps/ethereum_jsonrpc/config/config.exs +++ b/apps/ethereum_jsonrpc/config/config.exs @@ -6,6 +6,14 @@ config :logger, :ethereum_jsonrpc, metadata: [:application, :request_id], metadata_filter: [application: :ethereum_jsonrpc] +config :ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator, + rolling_window_opts: [ + window_count: 6, + window_length: :timer.seconds(10), + bucket: EthereumJSONRPC.RequestCoordinator.TimeoutCounter + ], + wait_per_timeout: :timer.seconds(10) + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{Mix.env()}.exs" diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index e9f065342b..bf6feada79 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -17,7 +17,16 @@ defmodule EthereumJSONRPC do """ alias Explorer.Chain.Block - alias EthereumJSONRPC.{Blocks, Receipts, Subscription, Transactions, Transport, Uncles, Variant} + alias EthereumJSONRPC.{ + Blocks, + Receipts, + RequestCoordinator, + Subscription, + Transactions, + Transport, + Uncles, + Variant + } @typedoc """ Truncated 20-byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash encoded as a hexadecimal number in a @@ -49,7 +58,7 @@ defmodule EthereumJSONRPC do """ @type json_rpc_named_arguments :: [ - {:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()} + {:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()} | {:throttle_timeout, non_neg_integer()} ] @typedoc """ @@ -310,8 +319,9 @@ defmodule EthereumJSONRPC do def json_rpc(request, named_arguments) when (is_map(request) or is_list(request)) and is_list(named_arguments) do transport = Keyword.fetch!(named_arguments, :transport) transport_options = Keyword.fetch!(named_arguments, :transport_options) + throttle_timeout = Keyword.get(named_arguments, :throttle_timeout, 60_000) - transport.json_rpc(request, transport_options) + RequestCoordinator.perform(request, transport, transport_options, throttle_timeout) end @doc """ diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex index 2cc8ec0a1a..3383e3a875 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex @@ -9,9 +9,14 @@ defmodule EthereumJSONRPC.Application do @impl Application def start(_type, _args) do + rolling_window_opts = + :ethereum_jsonrpc, + |> Application.fetch_env!(RequestCoordinator) + |> Keyword.fetch!(:rolling_window_opts) + children = [ :hackney_pool.child_spec(:ethereum_jsonrpc, recv_timeout: 60_000, timeout: 60_000, max_connections: 1000), - {RollingWindow, [RequestCoordinator.rolling_window_opts(), [name: TimeoutCounter]]} + {RollingWindow, [rolling_window_opts]} ] Supervisor.start_link(children, strategy: :one_for_one, name: EthereumJSONRPC.Supervisor) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex index c8a18b9279..21a9c5b843 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/request_coordinator.ex @@ -1,14 +1,38 @@ defmodule EthereumJSONRPC.RequestCoordinator do @moduledoc """ - Retries JSONRPC requests according to the provided retry_options + Coordinates requests with a backoff strategy. - Leverages `EthereumJSONRPC.RollingWindow` to keep track of the count - of recent timeouts, and waits a small amount of time per timeout. + This module leverages `EthereumJSONRPC.RollingWindow` to track request timeout + that have occurred recently. Options for this functionality can be changed at + the application configuration level. - To see the rolling window options, see `EthereumJSONRPC.Application` + ## Configuration + + The following are the expected and supported options for this module: + + * `:rolling_window_opts` - Options for the process tracking timeouts + * `:window_count` - Number of windows + * `:window_length` - Length of each window in milliseconds + * `:bucket` - name of the bucket to uniquely identify the dataset + * `:wait_per_timeout` - Milliseconds to wait for each recent timeout within the tracked window + + ### Example Configuration + + config :ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator, + rolling_window_opts: [ + window_count: 6, + window_length: :timer.seconds(10), + bucket: EthereumJSONRPC.RequestCoordinator.TimeoutCounter + ], + wait_per_timeout: :timer.seconds(10) + + With this configuration, timeouts are tracked for 6 windows of 10 seconds for a total of 1 minute. """ - alias EthereumJSONRPC.{RollingWindow, TimeoutCounter} + alias EthereumJSONRPC.{RollingWindow, Transport} + alias EthereumJSONRPC.RequestCoordinator.TimeoutCounter + + @timeout_key :timeout @wait_per_timeout :timer.seconds(5) @rolling_window_opts [ bucket: :ethereum_jsonrpc_bucket, @@ -23,67 +47,53 @@ defmodule EthereumJSONRPC.RequestCoordinator do end @doc """ - Retries the request according to the provided retry_options + Performs a JSON RPC request and adds necessary backoff. - If none were provided, the request is not retried. In all cases, the request - waits an amount of time before proceeding based on the count of recent - failures. + In the event that too many requests have timed out recently and the current + request were to exceed someout threshold, the request isn't performed and + `{:error, :timeout}` is returned. """ - def perform(request, named_arguments) do - transport = Keyword.fetch!(named_arguments, :transport) - transport_options = Keyword.fetch!(named_arguments, :transport_options) - retry_options = Keyword.get(named_arguments, :retry_options) - - if retry_options do - retry_timeout = Keyword.get(retry_options, :retry_timeout, 5_000) - - fn -> - request(transport, request, transport_options, true) - end - |> Task.async() - |> Task.await(retry_timeout) + @spec perform(Transport.request(), Transport.t(), Transport.options(), non_neg_integer()) :: {:ok, Transport.result()} | {:error, term()} + @spec perform(Transport.batch_request(), Transport.t(), Transport.options(), non_neg_integer()) :: {:ok, Transport.batch_result()} | {:error, term()} + def perform(request, transport, transport_options, throttle_timeout) do + sleep_time = sleep_time() + + if sleep_time <= throttle_timeout do + :timer.sleep(sleep_time) + + request + |> transport.json_rpc(transport_options) + |> handle_transport_response() else - request(transport, request, transport_options, false) + {:error, :timeout} end end - defp request(transport, request, transport_options, retry?) do - key = something_that_uniquely_identifies_this_transport(transport, transport_options) + defp handle_transport_response({:error, :timeout} = error) do + increment_recent_timeouts() + error + end - sleep_if_too_many_recent_timeouts(key) + defp handle_transport_response(response), do: response - case transport.json_rpc(request, transport_options) do - {:error, :timeout} = error -> - increment_recent_timeouts(key) + defp sleep_time do + wait_coefficient = RollingWindow.count(bucket(), @timeout_key) - if retry? do - request(transport, request, transport_options, true) - else - error - end + wait_per_timeout = + :ethereum_jsonrpc + |> Application.get_env(__MODULE__) + |> Keyword.fetch!(:wait_per_timeout) - response -> - response - end + wait_coefficient * @wait_per_timeout end - defp increment_recent_timeouts(key) do - RollingWindow.inc(TimeoutCounter, key) + defp increment_recent_timeouts do + RollingWindow.inc(bucket(), @timeout_key) :ok end - defp sleep_if_too_many_recent_timeouts(key) do - wait_coefficient = count_of_recent_timeouts(key) - - :timer.sleep(wait_coefficient * @wait_per_timeout) - end - - defp something_that_uniquely_identifies_this_transport(transport, transport_options) do - to_string(transport) <> "." <> transport_options[:url] - end - - defp count_of_recent_timeouts(key) do - RollingWindow.count(TimeoutCounter, key) + defp bucket do + Application.get_env(:ethereum_jsonrpc, __MODULE__)[:rolling_window_opts] end end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex index 0b0f142853..d85bfe598a 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/rolling_window.ex @@ -71,6 +71,7 @@ defmodule EthereumJSONRPC.RollingWindow do default = List.to_tuple([key | windows]) :ets.update_counter(table, key, {2, 1}, default) + # TODO consider broadcasting to indexers than some threshold has been met with result of updating the counter {:noreply, state} end