diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex index 266a9197cf..428a9c30c8 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex @@ -5,8 +5,8 @@ defmodule EthereumJSONRPC.Subscription do alias EthereumJSONRPC.Transport - @enforce_keys ~w(id subscriber_pid transport transport_options)a - defstruct ~w(id subscriber_pid transport transport_options)a + @enforce_keys ~w(reference subscriber_pid transport transport_options)a + defstruct ~w(reference subscriber_pid transport transport_options)a @typedoc """ An event that can be subscribed to. @@ -26,12 +26,17 @@ defmodule EthereumJSONRPC.Subscription do @type params :: list() @typedoc """ - * `id` - the `t:/id/0` of the subscription on the server - * `subscriber_pid` - the `t:pid/0` of process where `transport_pid` should send messages - * `transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module - * `transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2` + * `reference` - the `t:reference/0` for referring to the subscription when talking to `transport_pid`. + * `subscriber_pid` - the `t:pid/0` of process where `transport_pid` should send messages. + * `transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module. + * `transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2`. """ - @type t :: %__MODULE__{id: id, subscriber_pid: pid, transport: Transport.t(), transport_options: Transport.options()} + @type t :: %__MODULE__{ + reference: reference(), + subscriber_pid: pid(), + transport: Transport.t(), + transport_options: Transport.options() + } @doc """ Publishes `messages` to all `subscriptions`s' `subscriber_pid`s. diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex index 95d4885d3d..a0523f2761 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex @@ -7,6 +7,9 @@ defmodule EthereumJSONRPC.WebSocket do @behaviour Transport + @enforce_keys ~w(url web_socket)a + defstruct ~w(url web_socket web_socket_options)a + @typedoc """ WebSocket name """ @@ -19,12 +22,17 @@ defmodule EthereumJSONRPC.WebSocket do # same as `t:GenServer.server/0` @type web_socket :: pid() | name() | {atom(), node()} + @typedoc """ + Options for `web_socket` `t:module/0` in `t:t/0`. + """ + @type web_socket_options :: %{required(:web_socket) => web_socket(), optional(atom()) => term()} + @typedoc """ Options passed to `EthereumJSONRPC.Transport` callbacks. **MUST** contain `t:web_socket/0` referring to `t:pid/0` returned by `c:start_link/2`. """ - @type options :: [{:web_socket, web_socket()} | {:web_socket_options, term()}] + @type t :: %__MODULE__{web_socket: module(), web_socket_options: web_socket_options} @doc """ Allow `c:start_link/1` to be called as part of a supervision tree. @@ -33,8 +41,6 @@ defmodule EthereumJSONRPC.WebSocket do @doc """ Starts web socket attached to `url` with `options`. - - """ # Return is same as `t:GenServer.on_start/0` @callback start_link([url :: String.t() | options :: term()]) :: @@ -83,31 +89,27 @@ defmodule EthereumJSONRPC.WebSocket do @callback unsubscribe(web_socket(), Subscription.t()) :: :ok | {:error, reason :: term()} @impl Transport - @spec json_rpc(Transport.request(), options) :: {:ok, Transport.result()} | {:error, reason :: term()} - def json_rpc(request, options) do - web_socket_module = Keyword.fetch!(options, :web_socket) - %{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options) - + @spec json_rpc(Transport.request(), t()) :: {:ok, Transport.result()} | {:error, reason :: term()} + def json_rpc(request, %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}}) do web_socket_module.json_rpc(web_socket, request) end @impl Transport - @spec subscribe(event :: Subscription.event(), params :: Subscription.params(), options) :: + @spec subscribe(event :: Subscription.event(), params :: Subscription.params(), t()) :: {:ok, Subscription.t()} | {:error, reason :: term()} - def subscribe(event, params, options) when is_binary(event) and is_list(params) do - web_socket_module = Keyword.fetch!(options, :web_socket) - %{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options) - + def subscribe(event, params, %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}}) + when is_binary(event) and is_list(params) do web_socket_module.subscribe(web_socket, event, params) end @impl Transport - @spec unsubscribe(%Subscription{transport: __MODULE__, transport_options: options}) :: - :ok | {:error, reason :: term()} - def unsubscribe(%Subscription{transport: __MODULE__, transport_options: transport_options} = subscription) do - web_socket_module = Keyword.fetch!(transport_options, :web_socket) - %{web_socket: web_socket} = Keyword.fetch!(transport_options, :web_socket_options) - + @spec unsubscribe(%Subscription{transport: __MODULE__, transport_options: t()}) :: :ok | {:error, reason :: term()} + def unsubscribe( + %Subscription{ + transport: __MODULE__, + transport_options: %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}} + } = subscription + ) do web_socket_module.unsubscribe(web_socket, subscription) end end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex index 5c3e41caec..af3e720a5e 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex @@ -3,10 +3,10 @@ defmodule EthereumJSONRPC.WebSocket.Registration do When a caller registers for responses to asynchronous frame responses. """ - alias EthereumJSONRPC.Subscription + alias EthereumJSONRPC.{Subscription, Transport} - @enforce_keys ~w(from type)a - defstruct ~w(from type subscription_id)a + @enforce_keys ~w(from request type)a + defstruct ~w(from request type)a @typedoc """ What kind of request will be issued by the caller @@ -20,5 +20,62 @@ defmodule EthereumJSONRPC.WebSocket.Registration do """ @type type :: :json_rpc | :subscribe | :unsubscribe - @type t :: %__MODULE__{from: GenServer.from(), type: type, subscription_id: Subscription.id()} + @typedoc """ + `"eth_subscribe"` + """ + @type subscribe :: Transport.method() + + @typedoc """ + The event to `t:subscribe/0` to. + """ + @type event :: String.t() + + @typedoc """ + Parameters unique to `t:event/0` that customize the `t:subscribe`. + """ + @type event_param :: term() + + @typedoc """ + Parameters to `t:subscribe/0` `t:EthereumJSONRPC.Transport.request/0`. + + A list that start with the `t:event/0` followed by zero or more `t:event_param/0`. + """ + @type subscribe_params :: [event | event_param, ...] + + @typedoc """ + `"eth_unsubscribe"` + """ + @type unsubscribe :: Transport.method() + + @typedoc """ + A list containing the `t:EthereumJSONRPC.Subscription.id/0` that is being unsubscribed. + """ + @type unsubscribe_params :: [Subscription.id(), ...] + + @typedoc """ + * `from` - used to `GenServer.reply/2` to caller + * `type` - the `t:type/0` of request + * `request` - the request sent to the server. Used to replay the request on disconnect. + """ + @type t :: + %__MODULE__{ + from: GenServer.from(), + type: :json_rpc, + request: %{jsonrpc: String.t(), method: Transport.method(), params: list(), id: non_neg_integer()} + } + | %__MODULE__{ + from: GenServer.from(), + type: :subscribe, + request: %{jsonrpc: String.t(), method: subscribe(), params: subscribe_params(), id: non_neg_integer()} + } + | %__MODULE__{ + from: GenServer.from(), + type: :unsubscribe, + request: %{ + jsonrpc: String.t(), + method: unsubscribe(), + params: unsubscribe_params(), + id: non_neg_integer() + } + } end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex index 469c339804..5ff3d331b9 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex @@ -9,15 +9,38 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do alias EthereumJSONRPC.{Subscription, Transport, WebSocket} alias EthereumJSONRPC.WebSocket.Registration + alias EthereumJSONRPC.WebSocket.WebSocketClient.Options @behaviour :websocket_client @behaviour WebSocket @enforce_keys ~w(url)a - defstruct request_id_to_registration: %{}, - subscription_id_to_subscription: %{}, + defstruct connected: false, + request_id_to_registration: %{}, + subscription_id_to_subscription_reference: %{}, + subscription_reference_to_subscription_id: %{}, + subscription_reference_to_subscription: %{}, url: nil + @typedoc """ + * `request_id_to_registration` - maps id of requests in flight to their + `t:EthereumSJONRPC.WebSocket.Registration.t/0`, so that when the response is received from the server, the caller + in `from` of the registration can be `GenServer.reply/2`ed to. + * `subscription_id_to_subscription_reference` - maps id of subscription on the server to the `t:reference/0` used in + the `t:EthereumJSONRPC.Subscription.t/0`. Subscriptions use a `t:reference/0` instead of the server-side id, so + that on reconnect, the id can change, but the subscribe does not need to be notified. + * `subscription_reference_to_subscription` - maps `t:reference/0` in `t:EthereumJSONRPC.Subscription.t/0` to that + `t:EthereumJSONRPC.Subscription.t/0`, so that the `subscriber_pid` can be notified of subscription messages. + * `subscription_reference_to_subscription_id` - maps `t:reference/0` in `t:EthereumJSONRPC.Subscription.t/0 to id of + the subscription on the server, so that the subscriber can unsubscribe with the `t:reference/0`. + """ + @type t :: %__MODULE__{ + request_id_to_registration: %{EthereumJSONRPC.request_id() => Registration.t()}, + subscription_id_to_subscription_reference: %{Subscription.id() => reference()}, + subscription_reference_to_subscription: %{reference() => Subscription.t()}, + subscription_reference_to_subscription_id: %{reference() => Subscription.id()} + } + # Supervisor interface @impl WebSocket @@ -98,13 +121,15 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do end @impl :websocket_client - def onconnect(_, %__MODULE__{} = state) do - {:ok, state} + def onconnect(_, %__MODULE__{connected: false} = state) do + {:ok, reconnect(%__MODULE__{state | connected: true})} end @impl :websocket_client - def ondisconnect(reason, %__MODULE__{} = state) do - {:close, reason, state} + def ondisconnect(_reason, %__MODULE__{request_id_to_registration: request_id_to_registration} = state) do + final_state = Enum.reduce(request_id_to_registration, state, &disconnect_request_id_registration/2) + + {:reconnect, %__MODULE__{final_state | connected: false}} end @impl :websocket_client @@ -121,7 +146,10 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do @impl :websocket_client def websocket_info({:"$gen_call", from, request}, _, %__MODULE__{} = state) do - handle_call(request, from, state) + case handle_call(request, from, state) do + {:reply, _, %__MODULE__{}} = reply -> reply + {:noreply, %__MODULE__{} = new_state} -> {:ok, new_state} + end end @impl :websocket_client @@ -129,24 +157,80 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do broadcast(close, state) end - defp broadcast(message, %__MODULE__{subscription_id_to_subscription: id_to_subscription}) do - id_to_subscription + defp broadcast(message, %__MODULE__{subscription_reference_to_subscription: subscription_reference_to_subscription}) do + subscription_reference_to_subscription |> Map.values() |> Subscription.broadcast(message) end - defp handle_call(message, from, %__MODULE__{} = state) do - {updated_state, unique_request} = register(message, from, state) + # Not re-subscribing after disconnect is the same as a successful unsubscribe + defp disconnect_request_id_registration( + {request_id, + %Registration{ + type: :unsubscribe, + from: from, + request: %{method: "eth_unsubscribe", params: [subscription_id]} + }}, + %__MODULE__{ + request_id_to_registration: request_id_to_registration, + subscription_id_to_subscription_reference: subscription_id_to_subscription_reference, + subscription_reference_to_subscription: subscription_reference_to_subscription, + subscription_reference_to_subscription_id: subscription_reference_to_subscription_id + } = acc_state + ) do + GenServer.reply(from, :ok) + + %{^subscription_id => subscription_reference} = subscription_id_to_subscription_reference + + %__MODULE__{ + acc_state + | request_id_to_registration: Map.delete(request_id_to_registration, request_id), + subscription_id_to_subscription_reference: + Map.delete(subscription_id_to_subscription_reference, subscription_id), + subscription_reference_to_subscription: + Map.delete(subscription_reference_to_subscription, subscription_reference), + subscription_reference_to_subscription_id: + Map.delete(subscription_reference_to_subscription_id, subscription_reference) + } + end - {:reply, {:text, Jason.encode!(unique_request)}, updated_state} + # Re-run in `onconnect\2` + defp disconnect_request_id_registration({_request_id, %Registration{type: type}}, state) + when type in ~w(json_rpc subscribe)a do + state + end + + defp frame(request) do + {:text, Jason.encode!(request)} + end + + defp handle_call(message, from, %__MODULE__{connected: connected} = state) do + case register(message, from, state) do + {:ok, unique_request, updated_state} -> + case connected do + true -> + {:reply, frame(unique_request), updated_state} + + false -> + {:noreply, updated_state} + end + + {:error, _reason} = error -> + GenServer.reply(from, error) + {:noreply, state} + end end defp handle_response( %{"method" => "eth_subscription", "params" => %{"result" => result, "subscription" => subscription_id}}, - %__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state + %__MODULE__{ + subscription_id_to_subscription_reference: subscription_id_to_subscription_reference, + subscription_reference_to_subscription: subscription_reference_to_subscription + } = state ) do - case subscription_id_to_subscription do - %{^subscription_id => subscription} -> + case subscription_id_to_subscription_reference do + %{^subscription_id => subscription_reference} -> + %{^subscription_reference => subscription} = subscription_reference_to_subscription Subscription.publish(subscription, {:ok, result}) _ -> @@ -157,7 +241,7 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do ") result (", inspect(result), "). Subscription ID not in known subscription IDs (", - subscription_id_to_subscription + subscription_id_to_subscription_reference |> Map.values() |> Enum.map(&inspect/1), ")." @@ -173,7 +257,9 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do %__MODULE__{request_id_to_registration: request_id_to_registration} = state ) do {registration, new_request_id_to_registration} = Map.pop(request_id_to_registration, id) - respond_to_registration(registration, new_request_id_to_registration, response, state) + new_state = %__MODULE__{state | request_id_to_registration: new_request_id_to_registration} + + respond_to_registration(registration, response, new_state) end defp handle_response(response, %__MODULE__{} = state) do @@ -190,21 +276,30 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do {:ok, state} end + defp reconnect(%__MODULE__{} = state) do + state + |> rerequest() + |> resubscribe() + end + defp register( {:json_rpc, original_request}, from, %__MODULE__{request_id_to_registration: request_id_to_registration} = state ) do unique_id = unique_request_id(state) + request = %{original_request | id: unique_id} - {%__MODULE__{ + {:ok, request, + %__MODULE__{ state | request_id_to_registration: Map.put(request_id_to_registration, unique_id, %Registration{ from: from, - type: :json_rpc + type: :json_rpc, + request: request }) - }, %{original_request | id: unique_id}} + }} end defp register( @@ -214,41 +309,58 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do ) when is_binary(event) and is_list(params) do unique_id = unique_request_id(state) + request = request(%{id: unique_id, method: "eth_subscribe", params: [event | params]}) - { - %__MODULE__{ - state - | request_id_to_registration: - Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe}) - }, - request(%{id: unique_id, method: "eth_subscribe", params: [event | params]}) - } + {:ok, request, + %__MODULE__{ + state + | request_id_to_registration: + Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe, request: request}) + }} end defp register( - {:unsubscribe, %Subscription{id: subscription_id}}, + {:unsubscribe, %Subscription{reference: subscription_reference}}, from, - %__MODULE__{request_id_to_registration: request_id_to_registration} = state + %__MODULE__{ + request_id_to_registration: request_id_to_registration, + subscription_reference_to_subscription_id: subscription_reference_to_subscription_id + } = state ) do - unique_id = unique_request_id(state) + case subscription_reference_to_subscription_id do + %{^subscription_reference => subscription_id} -> + unique_id = unique_request_id(state) + request = request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]}) + + { + :ok, + request, + %__MODULE__{ + state + | request_id_to_registration: + Map.put(request_id_to_registration, unique_id, %Registration{ + from: from, + type: :unsubscribe, + request: request + }) + } + } - { - %__MODULE__{ - state - | request_id_to_registration: - Map.put(request_id_to_registration, unique_id, %Registration{ - from: from, - type: :unsubscribe, - subscription_id: subscription_id - }) - }, - request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]}) - } + _ -> + {:error, :not_found} + end + end + + defp rerequest(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do + Enum.each(request_id_to_registration, fn {_, %Registration{request: request}} -> + :websocket_client.cast(self(), frame(request)) + end) + + state end defp respond_to_registration( %Registration{type: :json_rpc, from: from}, - new_request_id_to_registration, response, %__MODULE__{} = state ) do @@ -260,48 +372,96 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do GenServer.reply(from, reply) - {:ok, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}} + {:ok, state} end defp respond_to_registration( - %Registration{type: :subscribe, from: {subscriber_pid, _} = from}, - new_request_id_to_registration, + %Registration{ + type: :subscribe, + from: {subscriber_pid, from_reference} = from, + request: %{params: [event | params]} + }, %{"result" => subscription_id}, - %__MODULE__{url: url} = state + %__MODULE__{ + subscription_id_to_subscription_reference: subscription_id_to_subscription_reference, + subscription_reference_to_subscription: subscription_reference_to_subscription, + subscription_reference_to_subscription_id: subscription_reference_to_subscription_id, + url: url + } = state ) do - subscription = %Subscription{ - id: subscription_id, - subscriber_pid: subscriber_pid, - transport: EthereumJSONRPC.WebSocket, - transport_options: [web_socket: __MODULE__, web_socket_options: %{web_socket: self()}, url: url] - } - - GenServer.reply(from, {:ok, subscription}) - new_state = - state - |> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration) - |> put_in([Access.key!(:subscription_id_to_subscription), subscription_id], subscription) + case subscription_reference_to_subscription do + # resubscribe + %{ + ^from_reference => %Subscription{ + subscriber_pid: ^subscriber_pid, + transport_options: %WebSocket{ + web_socket: __MODULE__, + web_socket_options: %Options{event: ^event, params: ^params} + } + } + } -> + %__MODULE__{ + state + | subscription_id_to_subscription_reference: + Map.put(subscription_id_to_subscription_reference, subscription_id, from_reference), + subscription_reference_to_subscription_id: + Map.put(subscription_reference_to_subscription_id, from_reference, subscription_id) + } + + # new subscription + _ -> + subscription_reference = make_ref() + + subscription = %Subscription{ + reference: subscription_reference, + subscriber_pid: subscriber_pid, + transport: EthereumJSONRPC.WebSocket, + transport_options: %EthereumJSONRPC.WebSocket{ + web_socket: __MODULE__, + web_socket_options: %Options{web_socket: self(), event: event, params: params}, + url: url + } + } + + GenServer.reply(from, {:ok, subscription}) + + %__MODULE__{ + state + | subscription_reference_to_subscription: + Map.put(subscription_reference_to_subscription, subscription_reference, subscription), + subscription_id_to_subscription_reference: + Map.put(subscription_id_to_subscription_reference, subscription_id, subscription_reference), + subscription_reference_to_subscription_id: + Map.put(subscription_reference_to_subscription_id, subscription_reference, subscription_id) + } + end {:ok, new_state} end defp respond_to_registration( %Registration{type: :subscribe, from: from}, - new_request_id_to_registration, %{"error" => error}, %__MODULE__{} = state ) do GenServer.reply(from, {:error, error}) - {:ok, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}} + {:ok, state} end defp respond_to_registration( - %Registration{type: :unsubscribe, from: from, subscription_id: subscription_id}, - new_request_id_to_registration, + %Registration{ + type: :unsubscribe, + from: from, + request: %{method: "eth_unsubscribe", params: [subscription_id]} + }, response, - %__MODULE__{} = state + %__MODULE__{ + subscription_id_to_subscription_reference: subscription_id_to_subscription_reference, + subscription_reference_to_subscription: subscription_reference_to_subscription, + subscription_reference_to_subscription_id: subscription_reference_to_subscription_id + } = state ) do reply = case response do @@ -314,19 +474,77 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do GenServer.reply(from, reply) new_state = - state - |> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration) - |> update_in([Access.key!(:subscription_id_to_subscription)], &Map.delete(&1, subscription_id)) + case subscription_id_to_subscription_reference do + %{^subscription_id => subscription_reference} -> + %__MODULE__{ + state + | subscription_id_to_subscription_reference: + Map.delete(subscription_id_to_subscription_reference, subscription_id), + subscription_reference_to_subscription: + Map.delete(subscription_reference_to_subscription, subscription_reference), + subscription_reference_to_subscription_id: + Map.delete(subscription_reference_to_subscription_id, subscription_reference) + } + + _ -> + state + end {:ok, new_state} end - defp respond_to_registration(nil, _, response, %__MODULE__{} = state) do - Logger.error(fn -> ["Got response for unregistered request ID: ", inspect(response)] end) + defp respond_to_registration( + nil, + response, + %__MODULE__{request_id_to_registration: request_id_to_registration} = state + ) do + Logger.error(fn -> + [ + "Got response for unregistered request ID: ", + inspect(response), + ". Outstanding request registrations: ", + inspect(request_id_to_registration) + ] + end) {:ok, state} end + defp resubscribe( + %__MODULE__{subscription_reference_to_subscription: subscription_reference_to_subscription} = initial_state + ) do + Enum.reduce(subscription_reference_to_subscription, initial_state, fn {subscription_reference, + %Subscription{ + subscriber_pid: subscriber_pid, + transport_options: %WebSocket{ + web_socket: __MODULE__, + web_socket_options: %Options{ + event: event, + params: params + } + } + }}, + %__MODULE__{ + request_id_to_registration: + acc_request_id_to_registration + } = acc_state -> + request_id = unique_request_id(acc_state) + request = request(%{id: request_id, method: "eth_subscribe", params: [event | params]}) + + :websocket_client.cast(self(), frame(request)) + + %__MODULE__{ + acc_state + | request_id_to_registration: + Map.put(acc_request_id_to_registration, request_id, %Registration{ + from: {subscriber_pid, subscription_reference}, + type: :subscribe, + request: request + }) + } + end) + end + defp unique_request_id(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do unique_request_id = EthereumJSONRPC.unique_request_id() diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client/options.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client/options.ex new file mode 100644 index 0000000000..6d283ddfce --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client/options.ex @@ -0,0 +1,17 @@ +defmodule EthereumJSONRPC.WebSocket.WebSocketClient.Options do + @moduledoc """ + `t:EthereumJSONRPC.WebSocket.options/0` for `EthereumJSONRPC.WebSocket.WebSocketClient` `t:EthereumJSONRPC.Subscription.t/0` `transport_options`. + """ + + alias EthereumJSONRPC.Subscription + + @enforce_keys ~w(web_socket)a + defstruct ~w(web_socket event params)a + + @typedoc """ + * `web_socket` - the `t:pid/0` of the `EthereumJSONRPC.WebSocket.WebSocketClient`. + * `event` - the event that should be resubscribed to after disconnect. + * `params` - the parameters that should be used to customized `event` when resubscribing after disconnect. + """ + @type t :: %__MODULE__{web_socket: pid(), event: Subscription.event(), params: Subscription.params()} +end diff --git a/apps/ethereum_jsonrpc/mix.exs b/apps/ethereum_jsonrpc/mix.exs index 62651ea5f9..d93c6bf7f1 100644 --- a/apps/ethereum_jsonrpc/mix.exs +++ b/apps/ethereum_jsonrpc/mix.exs @@ -58,8 +58,10 @@ defmodule EthereumJsonrpc.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ - # CACerts bundle for `EthereumJSONRPC.WebSocket.Client` + # CACerts bundle for `EthereumJSONRPC.WebSocket.WebSocketClient` {:certifi, "~> 2.3"}, + # WebSocket-server for testing `EthereumJSONRPC.WebSocket.WebSocketClient`. + {:cowboy, "~> 1.1", only: :test}, # Style Checking {:credo, "0.9.2", only: [:dev, :test], runtime: false}, # Static Type Checking diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket/web_socket_client_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket/web_socket_client_test.exs new file mode 100644 index 0000000000..e2ebf64181 --- /dev/null +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket/web_socket_client_test.exs @@ -0,0 +1,223 @@ +defmodule EthereumJSONRPC.WebSocket.WebSocketClientTest do + use ExUnit.Case + + alias EthereumJSONRPC.Subscription + alias EthereumJSONRPC.WebSocket.{Registration, WebSocketClient} + + import EthereumJSONRPC, only: [unique_request_id: 0] + + describe "ondisconnect/2" do + setup :example_state + + test "reconnects", %{state: state} do + assert {:reconnect, _} = WebSocketClient.ondisconnect({:closed, :remote}, state) + end + + test "treats in-progress unsubscribes as successful", %{state: state} do + subscription_id = 1 + + state = put_subscription(state, subscription_id) + + %Registration{from: {_, ref}} = + registration = registration(%{type: :unsubscribe, subscription_id: subscription_id}) + + state = put_registration(state, registration) + + assert {_, disconnected_state} = WebSocketClient.ondisconnect({:closed, :remote}, state) + + assert Enum.empty?(disconnected_state.request_id_to_registration) + assert Enum.empty?(disconnected_state.subscription_id_to_subscription_reference) + assert Enum.empty?(disconnected_state.subscription_reference_to_subscription) + assert Enum.empty?(disconnected_state.subscription_reference_to_subscription_id) + + assert_receive {^ref, :ok} + end + + test "keeps :json_rpc requests for re-requesting on reconnect", %{state: state} do + state = put_registration(state, %{type: :json_rpc, method: "eth_getBlockByNumber", params: [1, true]}) + + assert {_, disconnected_state} = WebSocketClient.ondisconnect({:closed, :remote}, state) + + assert Enum.count(disconnected_state.request_id_to_registration) == 1 + end + + test "keeps :subscribe requests for re-requesting on reconnect", %{state: state} do + state = put_registration(state, %{type: :subscribe}) + + assert {_, disconnected_state} = WebSocketClient.ondisconnect({:closed, :remote}, state) + + assert Enum.count(disconnected_state.request_id_to_registration) == 1 + end + end + + describe "websocket_handle/3" do + setup :example_state + + test "Jason.decode errors are broadcast to all subscribers", %{state: %WebSocketClient{url: url} = state} do + subscription_id = 1 + subscription_reference = make_ref() + subscription = subscription(%{url: url, reference: subscription_reference}) + state = put_subscription(state, subscription_id, subscription) + + assert {:ok, ^state} = WebSocketClient.websocket_handle({:text, ""}, nil, state) + assert_receive {^subscription, {:error, %Jason.DecodeError{}}} + end + end + + describe "websocket_terminate/3" do + setup :example_state + + test "broadcasts close to all subscribers", %{state: %WebSocketClient{url: url} = state} do + subscription_id = 1 + subscription_reference = make_ref() + subscription = subscription(%{url: url, reference: subscription_reference}) + state = put_subscription(state, subscription_id, subscription) + + assert {:ok, ^state} = WebSocketClient.websocket_handle({:text, ""}, nil, state) + assert_receive {^subscription, {:error, %Jason.DecodeError{}}} + end + end + + describe "reconnect" do + setup do + dispatch = :cowboy_router.compile([{:_, [{"/websocket", EthereumJSONRPC.WebSocket.Cowboy.WebSocketHandler, []}]}]) + {:ok, _} = :cowboy.start_http(EthereumJSONRPC.WebSocket.Cowboy, 100, [], env: [dispatch: dispatch]) + + on_exit(fn -> + :ranch.stop_listener(EthereumJSONRPC.WebSocket.Cowboy) + end) + + port = :ranch.get_port(EthereumJSONRPC.WebSocket.Cowboy) + + pid = start_supervised!({WebSocketClient, ["ws://localhost:#{port}/websocket", []]}) + + %{pid: pid, port: port} + end + + test "resubscribes", %{pid: pid, port: port} do + assert {:ok, subscription} = WebSocketClient.subscribe(pid, "newHeads", []) + + assert_receive {^subscription, {:ok, %{}}}, 500 + + assert :ok = :ranch.stop_listener(EthereumJSONRPC.WebSocket.Cowboy) + + refute_receive {^subscription, {:ok, %{}}}, 100 + + cowboy(port) + + assert_receive {^subscription, {:ok, %{}}}, 500 + end + + test "rerequests", %{pid: pid, port: port} do + first_params = [1] + + # json_rpc requests work before connection is closed + assert {:ok, ^first_params} = + WebSocketClient.json_rpc( + pid, + EthereumJSONRPC.request(%{id: :erlang.unique_integer(), method: "echo", params: first_params}) + ) + + assert :ok = :ranch.stop_listener(EthereumJSONRPC.WebSocket.Cowboy) + + spawn_link(fn -> + Process.sleep(500) + cowboy(port) + end) + + second_params = [2] + + assert {:ok, ^second_params} = + WebSocketClient.json_rpc( + pid, + EthereumJSONRPC.request(%{id: :erlang.unique_integer(), method: "echo", params: second_params}) + ) + end + end + + defp cowboy(0) do + dispatch = :cowboy_router.compile([{:_, [{"/websocket", EthereumJSONRPC.WebSocket.Cowboy.WebSocketHandler, []}]}]) + {:ok, _} = :cowboy.start_http(EthereumJSONRPC.WebSocket.Cowboy, 100, [], env: [dispatch: dispatch]) + :ranch.get_port(EthereumJSONRPC.WebSocket.Cowboy) + end + + defp cowboy(port) do + dispatch = :cowboy_router.compile([{:_, [{"/websocket", EthereumJSONRPC.WebSocket.Cowboy.WebSocketHandler, []}]}]) + {:ok, _} = :cowboy.start_http(EthereumJSONRPC.WebSocket.Cowboy, 100, [port: port], env: [dispatch: dispatch]) + port + end + + defp example_state(_) do + %{state: %WebSocketClient{url: "ws://example.com"}} + end + + defp put_registration(%WebSocketClient{} = state, %Registration{request: %{id: request_id}} = registration) do + %WebSocketClient{state | request_id_to_registration: %{request_id => registration}} + end + + defp put_registration(%WebSocketClient{} = state, map) when is_map(map) do + put_registration(state, registration(map)) + end + + defp put_subscription(%WebSocketClient{url: url} = state, subscription_id) when is_integer(subscription_id) do + subscription_reference = make_ref() + put_subscription(state, subscription_id, subscription(%{url: url, reference: subscription_reference})) + end + + defp put_subscription( + %WebSocketClient{url: url} = state, + subscription_id, + %Subscription{ + reference: subscription_reference, + transport_options: %EthereumJSONRPC.WebSocket{url: url} + } = subscription + ) do + %WebSocketClient{ + state + | subscription_id_to_subscription_reference: %{subscription_id => subscription_reference}, + subscription_reference_to_subscription: %{subscription_reference => subscription}, + subscription_reference_to_subscription_id: %{subscription_reference => subscription_id} + } + end + + defp registration(%{type: :subscribe = type}) do + %Registration{ + type: type, + from: {self(), make_ref()}, + request: %{id: unique_request_id(), method: "eth_subscribe", params: ["newHeads"]} + } + end + + defp registration(%{type: :unsubscribe = type, subscription_id: subscription_id}) do + %Registration{ + type: type, + from: {self(), make_ref()}, + request: %{id: unique_request_id(), method: "eth_unsubscribe", params: [subscription_id]} + } + end + + defp registration(%{type: type, method: method, params: params}) do + %Registration{ + type: type, + from: {self(), make_ref()}, + request: %{id: unique_request_id(), method: method, params: params} + } + end + + defp subscription(%{reference: reference, url: url}) do + %Subscription{ + reference: reference, + subscriber_pid: self(), + transport: EthereumJSONRPC.WebSocket, + transport_options: %EthereumJSONRPC.WebSocket{ + url: url, + web_socket: WebSocketClient, + web_socket_options: %WebSocketClient.Options{ + web_socket: self(), + event: "newHeads", + params: [] + } + } + } + end +end diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs index 9f932720e4..0433d59a11 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs @@ -5,14 +5,15 @@ defmodule EthereumJSONRPC.WebSocketTest do import Mox alias EthereumJSONRPC.{Subscription, WebSocket} + alias EthereumJSONRPC.WebSocket.WebSocketClient setup :verify_on_exit! describe "json_rpc/2" do test "can get result", %{subscribe_named_arguments: subscribe_named_arguments} do - transport_options = subscribe_named_arguments[:transport_options] + %WebSocket{web_socket: web_socket} = transport_options = subscribe_named_arguments[:transport_options] - if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do + if web_socket == EthereumJSONRPC.WebSocket.Mox do expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ -> {:ok, %{"number" => "0x0"}} end) @@ -27,9 +28,9 @@ defmodule EthereumJSONRPC.WebSocketTest do # Infura timeouts on 2018-09-10 @tag :no_geth test "can get error", %{subscribe_named_arguments: subscribe_named_arguments} do - transport_options = subscribe_named_arguments[:transport_options] + %WebSocket{web_socket: web_socket} = transport_options = subscribe_named_arguments[:transport_options] - if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do + if web_socket == EthereumJSONRPC.WebSocket.Mox do expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ -> {:error, %{ @@ -55,30 +56,39 @@ defmodule EthereumJSONRPC.WebSocketTest do describe "subscribe/2" do test "can subscribe to newHeads", %{subscribe_named_arguments: subscribe_named_arguments} do transport = Keyword.fetch!(subscribe_named_arguments, :transport) - transport_options = subscribe_named_arguments[:transport_options] + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() - if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do - expect(EthereumJSONRPC.WebSocket.Mox, :subscribe, fn _, _, _ -> - {:ok, - %Subscription{ - id: "0x1", - subscriber_pid: subscriber_pid, - transport: transport, - transport_options: transport_options - }} - end) - end + subscription_transport_options = + case web_socket_module do + EthereumJSONRPC.WebSocket.Mox -> + expect(EthereumJSONRPC.WebSocket.Mox, :subscribe, fn _, "newHeads", [] -> + {:ok, + %Subscription{ + reference: make_ref(), + subscriber_pid: subscriber_pid, + transport: transport, + transport_options: transport_options + }} + end) + + transport_options + + EthereumJSONRPC.WebSocket.WebSocketClient -> + update_in(transport_options.web_socket_options, fn %WebSocketClient.Options{} = web_socket_options -> + %WebSocketClient.Options{web_socket_options | event: "newHeads", params: []} + end) + end assert {:ok, %Subscription{ - id: subscription_id, + reference: subscription_reference, subscriber_pid: ^subscriber_pid, transport: ^transport, - transport_options: ^transport_options + transport_options: ^subscription_transport_options }} = WebSocket.subscribe("newHeads", [], transport_options) - assert is_binary(subscription_id) + assert is_reference(subscription_reference) end # Infura timeouts on 2018-09-10 @@ -87,14 +97,13 @@ defmodule EthereumJSONRPC.WebSocketTest do block_interval: block_interval, subscribe_named_arguments: subscribe_named_arguments } do - transport_options = subscribe_named_arguments[:transport_options] - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() if web_socket_module == EthereumJSONRPC.WebSocket.Mox do expect(web_socket_module, :subscribe, fn _, _, _ -> subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport_options: transport_options @@ -114,13 +123,12 @@ defmodule EthereumJSONRPC.WebSocketTest do describe "unsubscribe/2" do test "can unsubscribe", %{subscribe_named_arguments: subscribe_named_arguments} do - transport_options = subscribe_named_arguments[:transport_options] - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() if web_socket_module == EthereumJSONRPC.WebSocket.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport_options: transport_options @@ -142,13 +150,12 @@ defmodule EthereumJSONRPC.WebSocketTest do block_interval: block_interval, subscribe_named_arguments: subscribe_named_arguments } do - transport_options = subscribe_named_arguments[:transport_options] - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() if web_socket_module == EthereumJSONRPC.WebSocket.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport_options: transport_options @@ -189,13 +196,12 @@ defmodule EthereumJSONRPC.WebSocketTest do end test "return error if already unsubscribed", %{subscribe_named_arguments: subscribe_named_arguments} do - transport_options = subscribe_named_arguments[:transport_options] - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() if web_socket_module == EthereumJSONRPC.WebSocket.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport_options: transport_options diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs index b21a08d801..022bf62bdc 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs @@ -5,6 +5,7 @@ defmodule EthereumJSONRPCTest do import Mox alias EthereumJSONRPC.Subscription + alias EthereumJSONRPC.WebSocket.WebSocketClient setup :verify_on_exit! @@ -548,27 +549,36 @@ defmodule EthereumJSONRPCTest do transport_options = subscribe_named_arguments[:transport_options] subscriber_pid = self() - if transport == EthereumJSONRPC.Mox do - expect(transport, :subscribe, fn _, _, _ -> - {:ok, - %Subscription{ - id: "0x1", - subscriber_pid: subscriber_pid, - transport: transport, - transport_options: transport_options - }} - end) - end + subscription_transport_options = + case transport do + EthereumJSONRPC.Mox -> + expect(transport, :subscribe, fn "newHeads", [], _ -> + {:ok, + %Subscription{ + reference: make_ref(), + subscriber_pid: subscriber_pid, + transport: transport, + transport_options: transport_options + }} + end) + + transport_options + + EthereumJSONRPC.WebSocket -> + update_in(transport_options.web_socket_options, fn %WebSocketClient.Options{} = web_socket_options -> + %WebSocketClient.Options{web_socket_options | event: "newHeads", params: []} + end) + end assert {:ok, %Subscription{ - id: subscription_id, + reference: subscription_reference, subscriber_pid: ^subscriber_pid, transport: ^transport, - transport_options: ^transport_options + transport_options: ^subscription_transport_options }} = EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments) - assert is_binary(subscription_id) + assert is_reference(subscription_reference) end # Infura timeouts on 2018-09-12 @@ -584,7 +594,7 @@ defmodule EthereumJSONRPCTest do if transport == EthereumJSONRPC.Mox do expect(transport, :subscribe, fn _, _, _ -> subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: transport, transport_options: transport_options @@ -612,7 +622,7 @@ defmodule EthereumJSONRPCTest do if transport == EthereumJSONRPC.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: transport, transport_options: transport_options @@ -639,7 +649,7 @@ defmodule EthereumJSONRPCTest do if transport == EthereumJSONRPC.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: transport, transport_options: Keyword.fetch!(subscribe_named_arguments, :transport_options) @@ -685,7 +695,7 @@ defmodule EthereumJSONRPCTest do if transport == EthereumJSONRPC.Mox do subscription = %Subscription{ - id: "0x1", + reference: make_ref(), subscriber_pid: subscriber_pid, transport: transport, transport_options: transport_options diff --git a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex index 6d235f0e9b..f808c24900 100644 --- a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex +++ b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex @@ -14,11 +14,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Geth do block_interval: 25_000, subscribe_named_arguments: [ transport: EthereumJSONRPC.WebSocket, - transport_options: [ + transport_options: %EthereumJSONRPC.WebSocket{ web_socket: web_socket_module, - web_socket_options: %{web_socket: web_socket}, + web_socket_options: %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket}, url: url - ] + } ] } end diff --git a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex index 93544a7e4c..94d0d00b61 100644 --- a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex +++ b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex @@ -35,11 +35,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Mox do block_interval: @block_interval, subscribe_named_arguments: [ transport: EthereumJSONRPC.WebSocket, - transport_options: [ + transport_options: %EthereumJSONRPC.WebSocket{ web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}, url: url - ] + } ] } end diff --git a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex index 11f08beeb1..f63321283c 100644 --- a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex +++ b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex @@ -14,11 +14,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Parity do block_interval: 5_000, subscribe_named_arguments: [ transport: EthereumJSONRPC.WebSocket, - transport_options: [ + transport_options: %EthereumJSONRPC.WebSocket{ web_socket: web_socket_module, - web_socket_options: %{web_socket: web_socket}, + web_socket_options: %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket}, url: url - ] + } ] } end diff --git a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/cowboy/websocket_handler.ex b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/cowboy/websocket_handler.ex new file mode 100644 index 0000000000..723e729e00 --- /dev/null +++ b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/cowboy/websocket_handler.ex @@ -0,0 +1,71 @@ +# See https://github.com/ninenines/cowboy/blob/1.1.x/examples/websocket/src/ws_handler.erl +defmodule EthereumJSONRPC.WebSocket.Cowboy.WebSocketHandler do + @behaviour :cowboy_websocket_handler + + defstruct subscription_id_set: MapSet.new(), + new_heads_timer_reference: nil + + def init({:tcp, :http}, _request, _opts) do + {:upgrade, :protocol, :cowboy_websocket} + end + + @impl :cowboy_websocket_handler + def websocket_init(_transport_name, request, _opts) do + {:ok, request, %__MODULE__{}} + end + + @impl :cowboy_websocket_handler + def websocket_handle( + {:text, text}, + request, + %__MODULE__{subscription_id_set: subscription_id_set, new_heads_timer_reference: new_heads_timer_reference} = + state + ) do + json = Jason.decode!(text) + + case json do + %{"id" => id, "method" => "eth_subscribe", "params" => ["newHeads"]} -> + subscription_id = :erlang.unique_integer() + response = %{id: id, result: subscription_id} + frame = {:text, Jason.encode!(response)} + + new_heads_timer_reference = + case new_heads_timer_reference do + nil -> + {:ok, timer_reference} = :timer.send_interval(10, :new_head) + timer_reference + + _ -> + new_heads_timer_reference + end + + {:reply, frame, request, + %__MODULE__{ + state + | new_heads_timer_reference: new_heads_timer_reference, + subscription_id_set: MapSet.put(subscription_id_set, subscription_id) + }} + + %{"id" => id, "method" => "echo", "params" => params} -> + response = %{id: id, result: params} + frame = {:text, Jason.encode!(response)} + {:reply, frame, request, state} + end + end + + @impl :cowboy_websocket_handler + def websocket_info(:new_head, request, %__MODULE__{subscription_id_set: subscription_id_set} = state) do + frames = + Enum.map(subscription_id_set, fn subscription_id -> + response = %{method: "eth_subscription", params: %{result: %{}, subscription: subscription_id}} + {:text, Jason.encode!(response)} + end) + + {:reply, frames, request, state} + end + + @impl :cowboy_websocket_handler + def websocket_terminate(_reason, _request, _state) do + :ok + end +end diff --git a/apps/indexer/lib/indexer/block/realtime/supervisor.ex b/apps/indexer/lib/indexer/block/realtime/supervisor.ex index c46fc82deb..0da69b04fc 100644 --- a/apps/indexer/lib/indexer/block/realtime/supervisor.ex +++ b/apps/indexer/lib/indexer/block/realtime/supervisor.ex @@ -14,13 +14,16 @@ defmodule Indexer.Block.Realtime.Supervisor do children = case Keyword.fetch!(subscribe_named_arguments, :transport) do EthereumJSONRPC.WebSocket -> - transport_options = Keyword.fetch!(subscribe_named_arguments, :transport_options) - url = Keyword.fetch!(transport_options, :url) - web_socket_module = Keyword.fetch!(transport_options, :web_socket) + transport_options = + struct!(EthereumJSONRPC.WebSocket, Keyword.fetch!(subscribe_named_arguments, :transport_options)) + web_socket = Indexer.Block.Realtime.WebSocket + web_socket_options = %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket} + transport_options = %EthereumJSONRPC.WebSocket{transport_options | web_socket_options: web_socket_options} + %EthereumJSONRPC.WebSocket{url: url, web_socket: web_socket_module} = transport_options block_fetcher_subscribe_named_arguments = - put_in(subscribe_named_arguments[:transport_options][:web_socket_options], %{web_socket: web_socket}) + put_in(subscribe_named_arguments[:transport_options], transport_options) [ {Task.Supervisor, name: Indexer.Block.Realtime.TaskSupervisor},