diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex index 266a9197cf..428a9c30c8 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex @@ -5,8 +5,8 @@ defmodule EthereumJSONRPC.Subscription do alias EthereumJSONRPC.Transport - @enforce_keys ~w(id subscriber_pid transport transport_options)a - defstruct ~w(id subscriber_pid transport transport_options)a + @enforce_keys ~w(reference subscriber_pid transport transport_options)a + defstruct ~w(reference subscriber_pid transport transport_options)a @typedoc """ An event that can be subscribed to. @@ -26,12 +26,17 @@ defmodule EthereumJSONRPC.Subscription do @type params :: list() @typedoc """ - * `id` - the `t:/id/0` of the subscription on the server - * `subscriber_pid` - the `t:pid/0` of process where `transport_pid` should send messages - * `transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module - * `transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2` + * `reference` - the `t:reference/0` for referring to the subscription when talking to `transport_pid`. + * `subscriber_pid` - the `t:pid/0` of process where `transport_pid` should send messages. + * `transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module. + * `transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2`. """ - @type t :: %__MODULE__{id: id, subscriber_pid: pid, transport: Transport.t(), transport_options: Transport.options()} + @type t :: %__MODULE__{ + reference: reference(), + subscriber_pid: pid(), + transport: Transport.t(), + transport_options: Transport.options() + } @doc """ Publishes `messages` to all `subscriptions`s' `subscriber_pid`s. diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex index 95d4885d3d..a0523f2761 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex @@ -7,6 +7,9 @@ defmodule EthereumJSONRPC.WebSocket do @behaviour Transport + @enforce_keys ~w(url web_socket)a + defstruct ~w(url web_socket web_socket_options)a + @typedoc """ WebSocket name """ @@ -19,12 +22,17 @@ defmodule EthereumJSONRPC.WebSocket do # same as `t:GenServer.server/0` @type web_socket :: pid() | name() | {atom(), node()} + @typedoc """ + Options for `web_socket` `t:module/0` in `t:t/0`. + """ + @type web_socket_options :: %{required(:web_socket) => web_socket(), optional(atom()) => term()} + @typedoc """ Options passed to `EthereumJSONRPC.Transport` callbacks. **MUST** contain `t:web_socket/0` referring to `t:pid/0` returned by `c:start_link/2`. """ - @type options :: [{:web_socket, web_socket()} | {:web_socket_options, term()}] + @type t :: %__MODULE__{web_socket: module(), web_socket_options: web_socket_options} @doc """ Allow `c:start_link/1` to be called as part of a supervision tree. @@ -33,8 +41,6 @@ defmodule EthereumJSONRPC.WebSocket do @doc """ Starts web socket attached to `url` with `options`. - - """ # Return is same as `t:GenServer.on_start/0` @callback start_link([url :: String.t() | options :: term()]) :: @@ -83,31 +89,27 @@ defmodule EthereumJSONRPC.WebSocket do @callback unsubscribe(web_socket(), Subscription.t()) :: :ok | {:error, reason :: term()} @impl Transport - @spec json_rpc(Transport.request(), options) :: {:ok, Transport.result()} | {:error, reason :: term()} - def json_rpc(request, options) do - web_socket_module = Keyword.fetch!(options, :web_socket) - %{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options) - + @spec json_rpc(Transport.request(), t()) :: {:ok, Transport.result()} | {:error, reason :: term()} + def json_rpc(request, %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}}) do web_socket_module.json_rpc(web_socket, request) end @impl Transport - @spec subscribe(event :: Subscription.event(), params :: Subscription.params(), options) :: + @spec subscribe(event :: Subscription.event(), params :: Subscription.params(), t()) :: {:ok, Subscription.t()} | {:error, reason :: term()} - def subscribe(event, params, options) when is_binary(event) and is_list(params) do - web_socket_module = Keyword.fetch!(options, :web_socket) - %{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options) - + def subscribe(event, params, %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}}) + when is_binary(event) and is_list(params) do web_socket_module.subscribe(web_socket, event, params) end @impl Transport - @spec unsubscribe(%Subscription{transport: __MODULE__, transport_options: options}) :: - :ok | {:error, reason :: term()} - def unsubscribe(%Subscription{transport: __MODULE__, transport_options: transport_options} = subscription) do - web_socket_module = Keyword.fetch!(transport_options, :web_socket) - %{web_socket: web_socket} = Keyword.fetch!(transport_options, :web_socket_options) - + @spec unsubscribe(%Subscription{transport: __MODULE__, transport_options: t()}) :: :ok | {:error, reason :: term()} + def unsubscribe( + %Subscription{ + transport: __MODULE__, + transport_options: %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}} + } = subscription + ) do web_socket_module.unsubscribe(web_socket, subscription) end end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex index 5c3e41caec..af3e720a5e 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex @@ -3,10 +3,10 @@ defmodule EthereumJSONRPC.WebSocket.Registration do When a caller registers for responses to asynchronous frame responses. """ - alias EthereumJSONRPC.Subscription + alias EthereumJSONRPC.{Subscription, Transport} - @enforce_keys ~w(from type)a - defstruct ~w(from type subscription_id)a + @enforce_keys ~w(from request type)a + defstruct ~w(from request type)a @typedoc """ What kind of request will be issued by the caller @@ -20,5 +20,62 @@ defmodule EthereumJSONRPC.WebSocket.Registration do """ @type type :: :json_rpc | :subscribe | :unsubscribe - @type t :: %__MODULE__{from: GenServer.from(), type: type, subscription_id: Subscription.id()} + @typedoc """ + `"eth_subscribe"` + """ + @type subscribe :: Transport.method() + + @typedoc """ + The event to `t:subscribe/0` to. + """ + @type event :: String.t() + + @typedoc """ + Parameters unique to `t:event/0` that customize the `t:subscribe`. + """ + @type event_param :: term() + + @typedoc """ + Parameters to `t:subscribe/0` `t:EthereumJSONRPC.Transport.request/0`. + + A list that start with the `t:event/0` followed by zero or more `t:event_param/0`. + """ + @type subscribe_params :: [event | event_param, ...] + + @typedoc """ + `"eth_unsubscribe"` + """ + @type unsubscribe :: Transport.method() + + @typedoc """ + A list containing the `t:EthereumJSONRPC.Subscription.id/0` that is being unsubscribed. + """ + @type unsubscribe_params :: [Subscription.id(), ...] + + @typedoc """ + * `from` - used to `GenServer.reply/2` to caller + * `type` - the `t:type/0` of request + * `request` - the request sent to the server. Used to replay the request on disconnect. + """ + @type t :: + %__MODULE__{ + from: GenServer.from(), + type: :json_rpc, + request: %{jsonrpc: String.t(), method: Transport.method(), params: list(), id: non_neg_integer()} + } + | %__MODULE__{ + from: GenServer.from(), + type: :subscribe, + request: %{jsonrpc: String.t(), method: subscribe(), params: subscribe_params(), id: non_neg_integer()} + } + | %__MODULE__{ + from: GenServer.from(), + type: :unsubscribe, + request: %{ + jsonrpc: String.t(), + method: unsubscribe(), + params: unsubscribe_params(), + id: non_neg_integer() + } + } end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex index ccdc2d50ca..304b314c3e 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex @@ -9,15 +9,38 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do alias EthereumJSONRPC.{Subscription, Transport, WebSocket} alias EthereumJSONRPC.WebSocket.Registration + alias EthereumJSONRPC.WebSocket.WebSocketClient.Options @behaviour :websocket_client @behaviour WebSocket @enforce_keys ~w(url)a - defstruct request_id_to_registration: %{}, - subscription_id_to_subscription: %{}, + defstruct connected: false, + request_id_to_registration: %{}, + subscription_id_to_subscription_reference: %{}, + subscription_reference_to_subscription_id: %{}, + subscription_reference_to_subscription: %{}, url: nil + @typedoc """ + * `request_id_to_registration` - maps id of requests in flight to their + `t:EthereumSJONRPC.WebSocket.Registration.t/0`, so that when the response is received from the server, the caller + in `from` of the registration can be `GenServer.reply/2`ed to. + * `subscription_id_to_subscription_reference` - maps id of subscription on the server to the `t:reference/0` used in + the `t:EthereumJSONRPC.Subscription.t/0`. Subscriptions use a `t:reference/0` instead of the server-side id, so + that on reconnect, the id can change, but the subscribe does not need to be notified. + * `subscription_reference_to_subscription` - maps `t:reference/0` in `t:EthereumJSONRPC.Subscription.t/0` to that + `t:EthereumJSONRPC.Subscription.t/0`, so that the `subscriber_pid` can be notified of subscription messages. + * `subscription_reference_to_subscription_id` - maps `t:reference/0` in `t:EthereumJSONRPC.Subscription.t/0 to id of + the subscription on the server, so that the subscriber can unsubscribe with the `t:reference/0`. + """ + @type t :: %__MODULE__{ + request_id_to_registration: %{EthereumJSONRPC.request_id() => Registration.t()}, + subscription_id_to_subscription_reference: %{Subscription.id() => reference()}, + subscription_reference_to_subscription: %{reference() => Subscription.t()}, + subscription_reference_to_subscription_id: %{reference() => Subscription.id()} + } + # Supervisor interface @impl WebSocket @@ -98,13 +121,15 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do end @impl :websocket_client - def onconnect(_, %__MODULE__{} = state) do - {:ok, state} + def onconnect(_, %__MODULE__{connected: false} = state) do + {:ok, reconnect(%__MODULE__{state | connected: true})} end @impl :websocket_client - def ondisconnect(reason, %__MODULE__{} = state) do - {:close, reason, state} + def ondisconnect(_reason, %__MODULE__{request_id_to_registration: request_id_to_registration} = state) do + final_state = Enum.reduce(request_id_to_registration, state, &disconnect_request_id_registration/2) + + {:reconnect, %__MODULE__{final_state | connected: false}} end @impl :websocket_client @@ -121,7 +146,10 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do @impl :websocket_client def websocket_info({:"$gen_call", from, request}, _, %__MODULE__{} = state) do - handle_call(request, from, state) + case handle_call(request, from, state) do + {:reply, _, %__MODULE__{}} = reply -> reply + {:noreply, %__MODULE__{} = new_state} -> {:ok, new_state} + end end @impl :websocket_client @@ -129,24 +157,79 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do broadcast(close, state) end - defp broadcast(message, %__MODULE__{subscription_id_to_subscription: id_to_subscription}) do - id_to_subscription + defp broadcast(message, %__MODULE__{subscription_reference_to_subscription: subscription_reference_to_subscription}) do + subscription_reference_to_subscription |> Map.values() |> Subscription.broadcast(message) end - defp handle_call(message, from, %__MODULE__{} = state) do - {updated_state, unique_request} = register(message, from, state) + # Not re-subscribing after disconnect is the same as a successful unsubscribe + defp disconnect_request_id_registration( + {request_id, + %Registration{ + type: :unsubscribe, + from: from, + request: %{method: "eth_unsubscribe", params: [subscription_id]} + }}, + %__MODULE__{ + request_id_to_registration: request_id_to_registration, + subscription_id_to_subscription_reference: subscription_id_to_subscription_reference, + subscription_reference_to_subscription: subscription_reference_to_subscription, + subscription_reference_to_subscription_id: subscription_reference_to_subscription_id + } = acc_state + ) do + GenServer.reply(from, :ok) + + %{^subscription_id => subscription_reference} = subscription_id_to_subscription_reference + + %__MODULE__{ + acc_state + | request_id_to_registration: Map.delete(request_id_to_registration, request_id), + subscription_id_to_subscription_reference: + Map.delete(subscription_id_to_subscription_reference, subscription_id), + subscription_reference_to_subscription: + Map.delete(subscription_reference_to_subscription, subscription_reference), + subscription_reference_to_subscription_id: + Map.delete(subscription_reference_to_subscription_id, subscription_reference) + } + end + + # Re-run in `onconnect\2` + defp disconnect_request_id_registration(%Registration{type: type}, state) when type in ~w(json_rpc subscribe)a do + state + end + + defp frame(request) do + {:text, Jason.encode!(request)} + end + + defp handle_call(message, from, %__MODULE__{connected: connected} = state) do + case register(message, from, state) do + {:ok, unique_request, updated_state} -> + case connected do + true -> + {:reply, frame(unique_request), updated_state} + + false -> + {:noreply, updated_state} + end - {:reply, {:text, Jason.encode!(unique_request)}, updated_state} + {:error, _reason} = error -> + GenServer.reply(from, error) + {:noreply, state} + end end defp handle_response( %{"method" => "eth_subscription", "params" => %{"result" => result, "subscription" => subscription_id}}, - %__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state + %__MODULE__{ + subscription_id_to_subscription_reference: subscription_id_to_subscription_reference, + subscription_reference_to_subscription: subscription_reference_to_subscription + } = state ) do - case subscription_id_to_subscription do - %{^subscription_id => subscription} -> + case subscription_id_to_subscription_reference do + %{^subscription_id => subscription_reference} -> + %{^subscription_reference => subscription} = subscription_reference_to_subscription Subscription.publish(subscription, {:ok, result}) _ -> @@ -157,7 +240,7 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do ") result (", inspect(result), "). Subscription ID not in known subscription IDs (", - subscription_id_to_subscription + subscription_id_to_subscription_reference |> Map.values() |> Enum.map(&inspect/1), ")." @@ -192,21 +275,30 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do {:ok, state} end + defp reconnect(%__MODULE__{} = state) do + state + |> rerequest() + |> resubscribe() + end + defp register( {:json_rpc, original_request}, from, %__MODULE__{request_id_to_registration: request_id_to_registration} = state ) do unique_id = unique_request_id(state) + request = %{original_request | id: unique_id} - {%__MODULE__{ + {:ok, request, + %__MODULE__{ state | request_id_to_registration: Map.put(request_id_to_registration, unique_id, %Registration{ from: from, - type: :json_rpc + type: :json_rpc, + request: request }) - }, %{original_request | id: unique_id}} + }} end defp register( @@ -216,36 +308,54 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do ) when is_binary(event) and is_list(params) do unique_id = unique_request_id(state) + request = request(%{id: unique_id, method: "eth_subscribe", params: [event | params]}) - { - %__MODULE__{ - state - | request_id_to_registration: - Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe}) - }, - request(%{id: unique_id, method: "eth_subscribe", params: [event | params]}) - } + {:ok, request, + %__MODULE__{ + state + | request_id_to_registration: + Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe, request: request}) + }} end defp register( - {:unsubscribe, %Subscription{id: subscription_id}}, + {:unsubscribe, %Subscription{reference: subscription_reference}}, from, - %__MODULE__{request_id_to_registration: request_id_to_registration} = state + %__MODULE__{ + request_id_to_registration: request_id_to_registration, + subscription_reference_to_subscription_id: subscription_reference_to_subscription_id + } = state ) do - unique_id = unique_request_id(state) + case subscription_reference_to_subscription_id do + %{^subscription_reference => subscription_id} -> + unique_id = unique_request_id(state) + request = request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]}) + + { + :ok, + request, + %__MODULE__{ + state + | request_id_to_registration: + Map.put(request_id_to_registration, unique_id, %Registration{ + from: from, + type: :unsubscribe, + request: request + }) + } + } - { - %__MODULE__{ - state - | request_id_to_registration: - Map.put(request_id_to_registration, unique_id, %Registration{ - from: from, - type: :unsubscribe, - subscription_id: subscription_id - }) - }, - request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]}) - } + _ -> + {:error, :not_found} + end + end + + defp rerequest(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do + Enum.each(request_id_to_registration, fn {_, %Registration{request: request}} -> + :websocket_client.cast(self(), frame(request)) + end) + + state end defp respond_to_registration( @@ -265,22 +375,38 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do end defp respond_to_registration( - %Registration{type: :subscribe, from: {subscriber_pid, _} = from}, + %Registration{type: :subscribe, from: {subscriber_pid, _} = from, request: %{params: [event | params]}}, %{"result" => subscription_id}, - %__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription, url: url} = state + %__MODULE__{ + subscription_id_to_subscription_reference: subscription_id_to_subscription_reference, + subscription_reference_to_subscription: subscription_reference_to_subscription, + subscription_reference_to_subscription_id: subscription_reference_to_subscription_id, + url: url + } = state ) do + subscription_reference = make_ref() + subscription = %Subscription{ - id: subscription_id, + reference: subscription_reference, subscriber_pid: subscriber_pid, transport: EthereumJSONRPC.WebSocket, - transport_options: [web_socket: __MODULE__, web_socket_options: %{web_socket: self()}, url: url] + transport_options: %EthereumJSONRPC.WebSocket{ + web_socket: __MODULE__, + web_socket_options: %Options{web_socket: self(), event: event, params: params}, + url: url + } } GenServer.reply(from, {:ok, subscription}) new_state = %__MODULE__{ state - | subscription_id_to_subscription: Map.put(subscription_id_to_subscription, subscription_id, subscription) + | subscription_id_to_subscription_reference: + Map.put(subscription_id_to_subscription_reference, subscription_id, subscription_reference), + subscription_reference_to_subscription: + Map.put(subscription_reference_to_subscription, subscription_reference, subscription), + subscription_reference_to_subscription_id: + Map.put(subscription_reference_to_subscription_id, subscription_reference, subscription_id) } {:ok, new_state} @@ -297,9 +423,17 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do end defp respond_to_registration( - %Registration{type: :unsubscribe, from: from, subscription_id: subscription_id}, + %Registration{ + type: :unsubscribe, + from: from, + request: %{method: "eth_unsubscribe", params: [subscription_id]} + }, response, - %__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state + %__MODULE__{ + subscription_id_to_subscription_reference: subscription_id_to_subscription_reference, + subscription_reference_to_subscription: subscription_reference_to_subscription, + subscription_reference_to_subscription_id: subscription_reference_to_subscription_id + } = state ) do reply = case response do @@ -311,20 +445,77 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do GenServer.reply(from, reply) - new_state = %__MODULE__{ - state - | subscription_id_to_subscription: Map.delete(subscription_id_to_subscription, subscription_id) - } + new_state = + case subscription_id_to_subscription_reference do + %{^subscription_id => subscription_reference} -> + %__MODULE__{ + state + | subscription_id_to_subscription_reference: + Map.delete(subscription_id_to_subscription_reference, subscription_id), + subscription_reference_to_subscription: + Map.delete(subscription_reference_to_subscription, subscription_reference), + subscription_reference_to_subscription_id: + Map.delete(subscription_reference_to_subscription_id, subscription_reference) + } + + _ -> + state + end {:ok, new_state} end - defp respond_to_registration(nil, response, %__MODULE__{} = state) do - Logger.error(fn -> ["Got response for unregistered request ID: ", inspect(response)] end) + defp respond_to_registration( + nil, + response, + %__MODULE__{request_id_to_registration: request_id_to_registration} = state + ) do + Logger.error(fn -> + [ + "Got response for unregistered request ID: ", + inspect(response), + ". Outstanding request registrations: ", + inspect(request_id_to_registration) + ] + end) {:ok, state} end + defp resubscribe( + %__MODULE__{subscription_reference_to_subscription: subscription_reference_to_subscription} = initial_state + ) do + Enum.reduce(subscription_reference_to_subscription, initial_state, fn {subscription_reference, + %Subscription{ + transport_options: %WebSocket{ + web_socket: __MODULE__, + web_socket_options: %Options{ + event: event, + params: params + } + } + }}, + %__MODULE__{ + request_id_to_registration: + acc_request_id_to_registration + } = acc_state -> + request_id = unique_request_id(acc_state) + request = request(%{id: request_id, method: "eth_subscribe", params: [event | params]}) + + :websocket_client.cast(self(), frame(request)) + + %__MODULE__{ + acc_state + | request_id_to_registration: + Map.put(acc_request_id_to_registration, request_id, %Registration{ + from: {self(), subscription_reference}, + type: :subscribe, + request: request + }) + } + end) + end + defp unique_request_id(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do unique_request_id = EthereumJSONRPC.unique_request_id() diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client/options.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client/options.ex new file mode 100644 index 0000000000..6d283ddfce --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client/options.ex @@ -0,0 +1,17 @@ +defmodule EthereumJSONRPC.WebSocket.WebSocketClient.Options do + @moduledoc """ + `t:EthereumJSONRPC.WebSocket.options/0` for `EthereumJSONRPC.WebSocket.WebSocketClient` `t:EthereumJSONRPC.Subscription.t/0` `transport_options`. + """ + + alias EthereumJSONRPC.Subscription + + @enforce_keys ~w(web_socket)a + defstruct ~w(web_socket event params)a + + @typedoc """ + * `web_socket` - the `t:pid/0` of the `EthereumJSONRPC.WebSocket.WebSocketClient`. + * `event` - the event that should be resubscribed to after disconnect. + * `params` - the parameters that should be used to customized `event` when resubscribing after disconnect. + """ + @type t :: %__MODULE__{web_socket: pid(), event: Subscription.event(), params: Subscription.params()} +end diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs index 9f932720e4..0433d59a11 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs @@ -5,14 +5,15 @@ defmodule EthereumJSONRPC.WebSocketTest do import Mox alias EthereumJSONRPC.{Subscription, WebSocket} + alias EthereumJSONRPC.WebSocket.WebSocketClient setup :verify_on_exit! describe "json_rpc/2" do test "can get result", %{subscribe_named_arguments: subscribe_named_arguments} do - transport_options = subscribe_named_arguments[:transport_options] + %WebSocket{web_socket: web_socket} = transport_options = subscribe_named_arguments[:transport_options] - if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do + if web_socket == EthereumJSONRPC.WebSocket.Mox do expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ -> {:ok, %{"number" => "0x0"}} end) @@ -27,9 +28,9 @@ defmodule EthereumJSONRPC.WebSocketTest do # Infura timeouts on 2018-09-10 @tag :no_geth test "can get error", %{subscribe_named_arguments: subscribe_named_arguments} do - transport_options = subscribe_named_arguments[:transport_options] + %WebSocket{web_socket: web_socket} = transport_options = subscribe_named_arguments[:transport_options] - if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do + if web_socket == EthereumJSONRPC.WebSocket.Mox do expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ -> {:error, %{ @@ -55,30 +56,39 @@ defmodule EthereumJSONRPC.WebSocketTest do describe "subscribe/2" do test "can subscribe to newHeads", %{subscribe_named_arguments: subscribe_named_arguments} do transport = Keyword.fetch!(subscribe_named_arguments, :transport) - transport_options = subscribe_named_arguments[:transport_options] + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() - if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do - expect(EthereumJSONRPC.WebSocket.Mox, :subscribe, fn _, _, _ -> - {:ok, - %Subscription{ - id: "0x1", - subscriber_pid: subscriber_pid, - transport: transport, - transport_options: transport_options - }} - end) - end + subscription_transport_options = + case web_socket_module do + EthereumJSONRPC.WebSocket.Mox -> + expect(EthereumJSONRPC.WebSocket.Mox, :subscribe, fn _, "newHeads", [] -> + {:ok, + %Subscription{ + reference: make_ref(), + subscriber_pid: subscriber_pid, + transport: transport, + transport_options: transport_options + }} + end) + + transport_options + + EthereumJSONRPC.WebSocket.WebSocketClient -> + update_in(transport_options.web_socket_options, fn %WebSocketClient.Options{} = web_socket_options -> + %WebSocketClient.Options{web_socket_options | event: "newHeads", params: []} + end) + end assert {:ok, %Subscription{ - id: subscription_id, + reference: subscription_reference, subscriber_pid: ^subscriber_pid, transport: ^transport, - transport_options: ^transport_options + transport_options: ^subscription_transport_options }} = WebSocket.subscribe("newHeads", [], transport_options) - assert is_binary(subscription_id) + assert is_reference(subscription_reference) end # Infura timeouts on 2018-09-10 @@ -87,14 +97,13 @@ defmodule EthereumJSONRPC.WebSocketTest do block_interval: block_interval, subscribe_named_arguments: subscribe_named_arguments } do - transport_options = subscribe_named_arguments[:transport_options] - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() if web_socket_module == EthereumJSONRPC.WebSocket.Mox do expect(web_socket_module, :subscribe, fn _, _, _ -> subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport_options: transport_options @@ -114,13 +123,12 @@ defmodule EthereumJSONRPC.WebSocketTest do describe "unsubscribe/2" do test "can unsubscribe", %{subscribe_named_arguments: subscribe_named_arguments} do - transport_options = subscribe_named_arguments[:transport_options] - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() if web_socket_module == EthereumJSONRPC.WebSocket.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport_options: transport_options @@ -142,13 +150,12 @@ defmodule EthereumJSONRPC.WebSocketTest do block_interval: block_interval, subscribe_named_arguments: subscribe_named_arguments } do - transport_options = subscribe_named_arguments[:transport_options] - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() if web_socket_module == EthereumJSONRPC.WebSocket.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport_options: transport_options @@ -189,13 +196,12 @@ defmodule EthereumJSONRPC.WebSocketTest do end test "return error if already unsubscribed", %{subscribe_named_arguments: subscribe_named_arguments} do - transport_options = subscribe_named_arguments[:transport_options] - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() if web_socket_module == EthereumJSONRPC.WebSocket.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport_options: transport_options diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs index b21a08d801..022bf62bdc 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs @@ -5,6 +5,7 @@ defmodule EthereumJSONRPCTest do import Mox alias EthereumJSONRPC.Subscription + alias EthereumJSONRPC.WebSocket.WebSocketClient setup :verify_on_exit! @@ -548,27 +549,36 @@ defmodule EthereumJSONRPCTest do transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() - if transport == EthereumJSONRPC.Mox do - expect(transport, :subscribe, fn _, _, _ -> - {:ok, - %Subscription{ - id: "0x1", - subscriber_pid: subscriber_pid, - transport: transport, - transport_options: transport_options - }} - end) - end + subscription_transport_options = + case transport do + EthereumJSONRPC.Mox -> + expect(transport, :subscribe, fn "newHeads", [], _ -> + {:ok, + %Subscription{ + reference: make_ref(), + subscriber_pid: subscriber_pid, + transport: transport, + transport_options: transport_options + }} + end) + + transport_options + + EthereumJSONRPC.WebSocket -> + update_in(transport_options.web_socket_options, fn %WebSocketClient.Options{} = web_socket_options -> + %WebSocketClient.Options{web_socket_options | event: "newHeads", params: []} + end) + end assert {:ok, %Subscription{ - id: subscription_id, + reference: subscription_reference, subscriber_pid: ^subscriber_pid, transport: ^transport, - transport_options: ^transport_options + transport_options: ^subscription_transport_options }} = EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments) - assert is_binary(subscription_id) + assert is_reference(subscription_reference) end # Infura timeouts on 2018-09-12 @@ -584,7 +594,7 @@ defmodule EthereumJSONRPCTest do if transport == EthereumJSONRPC.Mox do expect(transport, :subscribe, fn _, _, _ -> subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: transport, transport_options: transport_options @@ -612,7 +622,7 @@ defmodule EthereumJSONRPCTest do if transport == EthereumJSONRPC.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: transport, transport_options: transport_options @@ -639,7 +649,7 @@ defmodule EthereumJSONRPCTest do if transport == EthereumJSONRPC.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: transport, transport_options: Keyword.fetch!(subscribe_named_arguments, :transport_options) @@ -685,7 +695,7 @@ defmodule EthereumJSONRPCTest do if transport == EthereumJSONRPC.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: transport, transport_options: transport_options diff --git a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex index 6d235f0e9b..f808c24900 100644 --- a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex +++ b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex @@ -14,11 +14,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Geth do block_interval: 25_000, subscribe_named_arguments: [ transport: EthereumJSONRPC.WebSocket, - transport_options: [ + transport_options: %EthereumJSONRPC.WebSocket{ web_socket: web_socket_module, - web_socket_options: %{web_socket: web_socket}, + web_socket_options: %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket}, url: url - ] + } ] } end diff --git a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex index 93544a7e4c..94d0d00b61 100644 --- a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex +++ b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex @@ -35,11 +35,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Mox do block_interval: @block_interval, subscribe_named_arguments: [ transport: EthereumJSONRPC.WebSocket, - transport_options: [ + transport_options: %EthereumJSONRPC.WebSocket{ web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}, url: url - ] + } ] } end diff --git a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex index 11f08beeb1..f63321283c 100644 --- a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex +++ b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex @@ -14,11 +14,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Parity do block_interval: 5_000, subscribe_named_arguments: [ transport: EthereumJSONRPC.WebSocket, - transport_options: [ + transport_options: %EthereumJSONRPC.WebSocket{ web_socket: web_socket_module, - web_socket_options: %{web_socket: web_socket}, + web_socket_options: %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket}, url: url - ] + } ] } end diff --git a/apps/indexer/lib/indexer/block/realtime/supervisor.ex b/apps/indexer/lib/indexer/block/realtime/supervisor.ex index c46fc82deb..0da69b04fc 100644 --- a/apps/indexer/lib/indexer/block/realtime/supervisor.ex +++ b/apps/indexer/lib/indexer/block/realtime/supervisor.ex @@ -14,13 +14,16 @@ defmodule Indexer.Block.Realtime.Supervisor do children = case Keyword.fetch!(subscribe_named_arguments, :transport) do EthereumJSONRPC.WebSocket -> - transport_options = Keyword.fetch!(subscribe_named_arguments, :transport_options) - url = Keyword.fetch!(transport_options, :url) - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + transport_options = + struct!(EthereumJSONRPC.WebSocket, Keyword.fetch!(subscribe_named_arguments, :transport_options)) + web_socket = Indexer.Block.Realtime.WebSocket + web_socket_options = %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket} + transport_options = %EthereumJSONRPC.WebSocket{transport_options | web_socket_options: web_socket_options} + %EthereumJSONRPC.WebSocket{url: url, web_socket: web_socket_module} = transport_options block_fetcher_subscribe_named_arguments = - put_in(subscribe_named_arguments[:transport_options][:web_socket_options], %{web_socket: web_socket}) + put_in(subscribe_named_arguments[:transport_options], transport_options) [ {Task.Supervisor, name: Indexer.Block.Realtime.TaskSupervisor},