Reconnect, rerequest, and resubscribe when websocket disconnects

* Use struct, `%EthereumJSONRPC.WebSocket.WebSocketClient.Options{}` to
  hold `web_socket_options`.
* `%EthereumJSONRPC.WebSocket.Registration{}`
  * Record `t:EthereumJSONRPC.request/0` in
    `%EthereumJSONRPC.WebSocket.Registration{}`, so that the request can
    be re-requested if the websocket disconnects and reconnects.
  * Drop server-side `subscription_id` from
    `%EthereumJSONRPC.WebSocket.Registration{}` as it can retrieved from
    `request` now.
* `EthereumJSONRPC.WebSocket.WebSocketClient`
  * Track whether the web socket is connected or not, so that requests
    aren't sent when in the disconnected state.  This prevents
    double-sends as `handle_call` can be called before `onconnect`,
    which caused that first subscribe to be sent twice.
  * Instead of the subscriber knowing the server-side subscription id,
    an opaque `t:reference/0` is used, so that the reference is stable
    in the subscriber's `%Subscription{}`, but the subscription ID can
    change on reconnect, which means replacing
    `subscription_id_to_subscription` in `%WebSocketClient{}` with 3 maps

    1. `subscription_id_to_subscription_reference` - used translate
       server-side, per-connection subscription to the stable reference
       in `subscription_reference_to_subscription`
    2. `subscription_reference_to_subscription_id` - used to convert
       stable reference to subscription id to unsubscribe.
    3. `subscription_reference_to_subscription` - used to get full
       Subscription, including the subscriber_pid for when subscription
       messages are delivered or the subscription needs to be
       resubscribed using the `web_socket_options`.
  * `ondisconnect`, `:reconnect` immediately.
    * When disconnecting, tell unsubscribers they were successful.
    * Hold any json_rpc or subscribe requests to retry on reconnect.
    * Hold any subscriptions to resubscribe on reconnect.
  * `onconnect`
    * re-request the saved json_rpc and subscribe requests.
    * issue new requests to re-establish the subscriptions that had
      already succeeded in their subscribe requests.
* `%EthereumJSONRPC.Subscription{}`
  * Replace `id`, which did match the server created ID, with a
    `t:reference/0`, which allows for indirection and for the server
    created ID to change on reconnect while the subscriber does not
    need to care about the reconnections.
* `EthereumJSONRPC.WebSocket`
  * `options` is switched from a Keyword.t to a struct, `t`, so that it
    is easier to pattern match for the reconnect logic in
    `WebSocketClient`.
  * `web_socket_options` is no longer a generic `term` because the code
    did require it to be a map with a `:web_socket` key for the
    `t:GenServer.server/0`.
pull/934/head
Luke Imhoff 6 years ago
parent 32535bd73c
commit b1f4696569
  1. 19
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex
  2. 40
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex
  3. 65
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex
  4. 301
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex
  5. 17
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client/options.ex
  6. 68
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs
  7. 46
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs
  8. 6
      apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex
  9. 4
      apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex
  10. 6
      apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex
  11. 11
      apps/indexer/lib/indexer/block/realtime/supervisor.ex

@ -5,8 +5,8 @@ defmodule EthereumJSONRPC.Subscription do
alias EthereumJSONRPC.Transport alias EthereumJSONRPC.Transport
@enforce_keys ~w(id subscriber_pid transport transport_options)a @enforce_keys ~w(reference subscriber_pid transport transport_options)a
defstruct ~w(id subscriber_pid transport transport_options)a defstruct ~w(reference subscriber_pid transport transport_options)a
@typedoc """ @typedoc """
An event that can be subscribed to. An event that can be subscribed to.
@ -26,12 +26,17 @@ defmodule EthereumJSONRPC.Subscription do
@type params :: list() @type params :: list()
@typedoc """ @typedoc """
* `id` - the `t:/id/0` of the subscription on the server * `reference` - the `t:reference/0` for referring to the subscription when talking to `transport_pid`.
* `subscriber_pid` - the `t:pid/0` of process where `transport_pid` should send messages * `subscriber_pid` - the `t:pid/0` of process where `transport_pid` should send messages.
* `transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module * `transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module.
* `transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2` * `transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2`.
""" """
@type t :: %__MODULE__{id: id, subscriber_pid: pid, transport: Transport.t(), transport_options: Transport.options()} @type t :: %__MODULE__{
reference: reference(),
subscriber_pid: pid(),
transport: Transport.t(),
transport_options: Transport.options()
}
@doc """ @doc """
Publishes `messages` to all `subscriptions`s' `subscriber_pid`s. Publishes `messages` to all `subscriptions`s' `subscriber_pid`s.

@ -7,6 +7,9 @@ defmodule EthereumJSONRPC.WebSocket do
@behaviour Transport @behaviour Transport
@enforce_keys ~w(url web_socket)a
defstruct ~w(url web_socket web_socket_options)a
@typedoc """ @typedoc """
WebSocket name WebSocket name
""" """
@ -19,12 +22,17 @@ defmodule EthereumJSONRPC.WebSocket do
# same as `t:GenServer.server/0` # same as `t:GenServer.server/0`
@type web_socket :: pid() | name() | {atom(), node()} @type web_socket :: pid() | name() | {atom(), node()}
@typedoc """
Options for `web_socket` `t:module/0` in `t:t/0`.
"""
@type web_socket_options :: %{required(:web_socket) => web_socket(), optional(atom()) => term()}
@typedoc """ @typedoc """
Options passed to `EthereumJSONRPC.Transport` callbacks. Options passed to `EthereumJSONRPC.Transport` callbacks.
**MUST** contain `t:web_socket/0` referring to `t:pid/0` returned by `c:start_link/2`. **MUST** contain `t:web_socket/0` referring to `t:pid/0` returned by `c:start_link/2`.
""" """
@type options :: [{:web_socket, web_socket()} | {:web_socket_options, term()}] @type t :: %__MODULE__{web_socket: module(), web_socket_options: web_socket_options}
@doc """ @doc """
Allow `c:start_link/1` to be called as part of a supervision tree. Allow `c:start_link/1` to be called as part of a supervision tree.
@ -33,8 +41,6 @@ defmodule EthereumJSONRPC.WebSocket do
@doc """ @doc """
Starts web socket attached to `url` with `options`. Starts web socket attached to `url` with `options`.
""" """
# Return is same as `t:GenServer.on_start/0` # Return is same as `t:GenServer.on_start/0`
@callback start_link([url :: String.t() | options :: term()]) :: @callback start_link([url :: String.t() | options :: term()]) ::
@ -83,31 +89,27 @@ defmodule EthereumJSONRPC.WebSocket do
@callback unsubscribe(web_socket(), Subscription.t()) :: :ok | {:error, reason :: term()} @callback unsubscribe(web_socket(), Subscription.t()) :: :ok | {:error, reason :: term()}
@impl Transport @impl Transport
@spec json_rpc(Transport.request(), options) :: {:ok, Transport.result()} | {:error, reason :: term()} @spec json_rpc(Transport.request(), t()) :: {:ok, Transport.result()} | {:error, reason :: term()}
def json_rpc(request, options) do def json_rpc(request, %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}}) do
web_socket_module = Keyword.fetch!(options, :web_socket)
%{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options)
web_socket_module.json_rpc(web_socket, request) web_socket_module.json_rpc(web_socket, request)
end end
@impl Transport @impl Transport
@spec subscribe(event :: Subscription.event(), params :: Subscription.params(), options) :: @spec subscribe(event :: Subscription.event(), params :: Subscription.params(), t()) ::
{:ok, Subscription.t()} | {:error, reason :: term()} {:ok, Subscription.t()} | {:error, reason :: term()}
def subscribe(event, params, options) when is_binary(event) and is_list(params) do def subscribe(event, params, %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}})
web_socket_module = Keyword.fetch!(options, :web_socket) when is_binary(event) and is_list(params) do
%{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options)
web_socket_module.subscribe(web_socket, event, params) web_socket_module.subscribe(web_socket, event, params)
end end
@impl Transport @impl Transport
@spec unsubscribe(%Subscription{transport: __MODULE__, transport_options: options}) :: @spec unsubscribe(%Subscription{transport: __MODULE__, transport_options: t()}) :: :ok | {:error, reason :: term()}
:ok | {:error, reason :: term()} def unsubscribe(
def unsubscribe(%Subscription{transport: __MODULE__, transport_options: transport_options} = subscription) do %Subscription{
web_socket_module = Keyword.fetch!(transport_options, :web_socket) transport: __MODULE__,
%{web_socket: web_socket} = Keyword.fetch!(transport_options, :web_socket_options) transport_options: %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}}
} = subscription
) do
web_socket_module.unsubscribe(web_socket, subscription) web_socket_module.unsubscribe(web_socket, subscription)
end end
end end

@ -3,10 +3,10 @@ defmodule EthereumJSONRPC.WebSocket.Registration do
When a caller registers for responses to asynchronous frame responses. When a caller registers for responses to asynchronous frame responses.
""" """
alias EthereumJSONRPC.Subscription alias EthereumJSONRPC.{Subscription, Transport}
@enforce_keys ~w(from type)a @enforce_keys ~w(from request type)a
defstruct ~w(from type subscription_id)a defstruct ~w(from request type)a
@typedoc """ @typedoc """
What kind of request will be issued by the caller What kind of request will be issued by the caller
@ -20,5 +20,62 @@ defmodule EthereumJSONRPC.WebSocket.Registration do
""" """
@type type :: :json_rpc | :subscribe | :unsubscribe @type type :: :json_rpc | :subscribe | :unsubscribe
@type t :: %__MODULE__{from: GenServer.from(), type: type, subscription_id: Subscription.id()} @typedoc """
`"eth_subscribe"`
"""
@type subscribe :: Transport.method()
@typedoc """
The event to `t:subscribe/0` to.
"""
@type event :: String.t()
@typedoc """
Parameters unique to `t:event/0` that customize the `t:subscribe`.
"""
@type event_param :: term()
@typedoc """
Parameters to `t:subscribe/0` `t:EthereumJSONRPC.Transport.request/0`.
A list that start with the `t:event/0` followed by zero or more `t:event_param/0`.
"""
@type subscribe_params :: [event | event_param, ...]
@typedoc """
`"eth_unsubscribe"`
"""
@type unsubscribe :: Transport.method()
@typedoc """
A list containing the `t:EthereumJSONRPC.Subscription.id/0` that is being unsubscribed.
"""
@type unsubscribe_params :: [Subscription.id(), ...]
@typedoc """
* `from` - used to `GenServer.reply/2` to caller
* `type` - the `t:type/0` of request
* `request` - the request sent to the server. Used to replay the request on disconnect.
"""
@type t ::
%__MODULE__{
from: GenServer.from(),
type: :json_rpc,
request: %{jsonrpc: String.t(), method: Transport.method(), params: list(), id: non_neg_integer()}
}
| %__MODULE__{
from: GenServer.from(),
type: :subscribe,
request: %{jsonrpc: String.t(), method: subscribe(), params: subscribe_params(), id: non_neg_integer()}
}
| %__MODULE__{
from: GenServer.from(),
type: :unsubscribe,
request: %{
jsonrpc: String.t(),
method: unsubscribe(),
params: unsubscribe_params(),
id: non_neg_integer()
}
}
end end

@ -9,15 +9,38 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
alias EthereumJSONRPC.{Subscription, Transport, WebSocket} alias EthereumJSONRPC.{Subscription, Transport, WebSocket}
alias EthereumJSONRPC.WebSocket.Registration alias EthereumJSONRPC.WebSocket.Registration
alias EthereumJSONRPC.WebSocket.WebSocketClient.Options
@behaviour :websocket_client @behaviour :websocket_client
@behaviour WebSocket @behaviour WebSocket
@enforce_keys ~w(url)a @enforce_keys ~w(url)a
defstruct request_id_to_registration: %{}, defstruct connected: false,
subscription_id_to_subscription: %{}, request_id_to_registration: %{},
subscription_id_to_subscription_reference: %{},
subscription_reference_to_subscription_id: %{},
subscription_reference_to_subscription: %{},
url: nil url: nil
@typedoc """
* `request_id_to_registration` - maps id of requests in flight to their
`t:EthereumSJONRPC.WebSocket.Registration.t/0`, so that when the response is received from the server, the caller
in `from` of the registration can be `GenServer.reply/2`ed to.
* `subscription_id_to_subscription_reference` - maps id of subscription on the server to the `t:reference/0` used in
the `t:EthereumJSONRPC.Subscription.t/0`. Subscriptions use a `t:reference/0` instead of the server-side id, so
that on reconnect, the id can change, but the subscribe does not need to be notified.
* `subscription_reference_to_subscription` - maps `t:reference/0` in `t:EthereumJSONRPC.Subscription.t/0` to that
`t:EthereumJSONRPC.Subscription.t/0`, so that the `subscriber_pid` can be notified of subscription messages.
* `subscription_reference_to_subscription_id` - maps `t:reference/0` in `t:EthereumJSONRPC.Subscription.t/0 to id of
the subscription on the server, so that the subscriber can unsubscribe with the `t:reference/0`.
"""
@type t :: %__MODULE__{
request_id_to_registration: %{EthereumJSONRPC.request_id() => Registration.t()},
subscription_id_to_subscription_reference: %{Subscription.id() => reference()},
subscription_reference_to_subscription: %{reference() => Subscription.t()},
subscription_reference_to_subscription_id: %{reference() => Subscription.id()}
}
# Supervisor interface # Supervisor interface
@impl WebSocket @impl WebSocket
@ -98,13 +121,15 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
end end
@impl :websocket_client @impl :websocket_client
def onconnect(_, %__MODULE__{} = state) do def onconnect(_, %__MODULE__{connected: false} = state) do
{:ok, state} {:ok, reconnect(%__MODULE__{state | connected: true})}
end end
@impl :websocket_client @impl :websocket_client
def ondisconnect(reason, %__MODULE__{} = state) do def ondisconnect(_reason, %__MODULE__{request_id_to_registration: request_id_to_registration} = state) do
{:close, reason, state} final_state = Enum.reduce(request_id_to_registration, state, &disconnect_request_id_registration/2)
{:reconnect, %__MODULE__{final_state | connected: false}}
end end
@impl :websocket_client @impl :websocket_client
@ -121,7 +146,10 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
@impl :websocket_client @impl :websocket_client
def websocket_info({:"$gen_call", from, request}, _, %__MODULE__{} = state) do def websocket_info({:"$gen_call", from, request}, _, %__MODULE__{} = state) do
handle_call(request, from, state) case handle_call(request, from, state) do
{:reply, _, %__MODULE__{}} = reply -> reply
{:noreply, %__MODULE__{} = new_state} -> {:ok, new_state}
end
end end
@impl :websocket_client @impl :websocket_client
@ -129,24 +157,79 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
broadcast(close, state) broadcast(close, state)
end end
defp broadcast(message, %__MODULE__{subscription_id_to_subscription: id_to_subscription}) do defp broadcast(message, %__MODULE__{subscription_reference_to_subscription: subscription_reference_to_subscription}) do
id_to_subscription subscription_reference_to_subscription
|> Map.values() |> Map.values()
|> Subscription.broadcast(message) |> Subscription.broadcast(message)
end end
defp handle_call(message, from, %__MODULE__{} = state) do # Not re-subscribing after disconnect is the same as a successful unsubscribe
{updated_state, unique_request} = register(message, from, state) defp disconnect_request_id_registration(
{request_id,
%Registration{
type: :unsubscribe,
from: from,
request: %{method: "eth_unsubscribe", params: [subscription_id]}
}},
%__MODULE__{
request_id_to_registration: request_id_to_registration,
subscription_id_to_subscription_reference: subscription_id_to_subscription_reference,
subscription_reference_to_subscription: subscription_reference_to_subscription,
subscription_reference_to_subscription_id: subscription_reference_to_subscription_id
} = acc_state
) do
GenServer.reply(from, :ok)
%{^subscription_id => subscription_reference} = subscription_id_to_subscription_reference
%__MODULE__{
acc_state
| request_id_to_registration: Map.delete(request_id_to_registration, request_id),
subscription_id_to_subscription_reference:
Map.delete(subscription_id_to_subscription_reference, subscription_id),
subscription_reference_to_subscription:
Map.delete(subscription_reference_to_subscription, subscription_reference),
subscription_reference_to_subscription_id:
Map.delete(subscription_reference_to_subscription_id, subscription_reference)
}
end
# Re-run in `onconnect\2`
defp disconnect_request_id_registration(%Registration{type: type}, state) when type in ~w(json_rpc subscribe)a do
state
end
defp frame(request) do
{:text, Jason.encode!(request)}
end
defp handle_call(message, from, %__MODULE__{connected: connected} = state) do
case register(message, from, state) do
{:ok, unique_request, updated_state} ->
case connected do
true ->
{:reply, frame(unique_request), updated_state}
false ->
{:noreply, updated_state}
end
{:reply, {:text, Jason.encode!(unique_request)}, updated_state} {:error, _reason} = error ->
GenServer.reply(from, error)
{:noreply, state}
end
end end
defp handle_response( defp handle_response(
%{"method" => "eth_subscription", "params" => %{"result" => result, "subscription" => subscription_id}}, %{"method" => "eth_subscription", "params" => %{"result" => result, "subscription" => subscription_id}},
%__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state %__MODULE__{
subscription_id_to_subscription_reference: subscription_id_to_subscription_reference,
subscription_reference_to_subscription: subscription_reference_to_subscription
} = state
) do ) do
case subscription_id_to_subscription do case subscription_id_to_subscription_reference do
%{^subscription_id => subscription} -> %{^subscription_id => subscription_reference} ->
%{^subscription_reference => subscription} = subscription_reference_to_subscription
Subscription.publish(subscription, {:ok, result}) Subscription.publish(subscription, {:ok, result})
_ -> _ ->
@ -157,7 +240,7 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
") result (", ") result (",
inspect(result), inspect(result),
"). Subscription ID not in known subscription IDs (", "). Subscription ID not in known subscription IDs (",
subscription_id_to_subscription subscription_id_to_subscription_reference
|> Map.values() |> Map.values()
|> Enum.map(&inspect/1), |> Enum.map(&inspect/1),
")." ")."
@ -192,21 +275,30 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
{:ok, state} {:ok, state}
end end
defp reconnect(%__MODULE__{} = state) do
state
|> rerequest()
|> resubscribe()
end
defp register( defp register(
{:json_rpc, original_request}, {:json_rpc, original_request},
from, from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state %__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do ) do
unique_id = unique_request_id(state) unique_id = unique_request_id(state)
request = %{original_request | id: unique_id}
{%__MODULE__{ {:ok, request,
%__MODULE__{
state state
| request_id_to_registration: | request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{ Map.put(request_id_to_registration, unique_id, %Registration{
from: from, from: from,
type: :json_rpc type: :json_rpc,
request: request
}) })
}, %{original_request | id: unique_id}} }}
end end
defp register( defp register(
@ -216,36 +308,54 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
) )
when is_binary(event) and is_list(params) do when is_binary(event) and is_list(params) do
unique_id = unique_request_id(state) unique_id = unique_request_id(state)
request = request(%{id: unique_id, method: "eth_subscribe", params: [event | params]})
{ {:ok, request,
%__MODULE__{ %__MODULE__{
state state
| request_id_to_registration: | request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe}) Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe, request: request})
}, }}
request(%{id: unique_id, method: "eth_subscribe", params: [event | params]})
}
end end
defp register( defp register(
{:unsubscribe, %Subscription{id: subscription_id}}, {:unsubscribe, %Subscription{reference: subscription_reference}},
from, from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state %__MODULE__{
request_id_to_registration: request_id_to_registration,
subscription_reference_to_subscription_id: subscription_reference_to_subscription_id
} = state
) do ) do
unique_id = unique_request_id(state) case subscription_reference_to_subscription_id do
%{^subscription_reference => subscription_id} ->
unique_id = unique_request_id(state)
request = request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]})
{
:ok,
request,
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{
from: from,
type: :unsubscribe,
request: request
})
}
}
{ _ ->
%__MODULE__{ {:error, :not_found}
state end
| request_id_to_registration: end
Map.put(request_id_to_registration, unique_id, %Registration{
from: from, defp rerequest(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do
type: :unsubscribe, Enum.each(request_id_to_registration, fn {_, %Registration{request: request}} ->
subscription_id: subscription_id :websocket_client.cast(self(), frame(request))
}) end)
},
request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]}) state
}
end end
defp respond_to_registration( defp respond_to_registration(
@ -265,22 +375,38 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
end end
defp respond_to_registration( defp respond_to_registration(
%Registration{type: :subscribe, from: {subscriber_pid, _} = from}, %Registration{type: :subscribe, from: {subscriber_pid, _} = from, request: %{params: [event | params]}},
%{"result" => subscription_id}, %{"result" => subscription_id},
%__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription, url: url} = state %__MODULE__{
subscription_id_to_subscription_reference: subscription_id_to_subscription_reference,
subscription_reference_to_subscription: subscription_reference_to_subscription,
subscription_reference_to_subscription_id: subscription_reference_to_subscription_id,
url: url
} = state
) do ) do
subscription_reference = make_ref()
subscription = %Subscription{ subscription = %Subscription{
id: subscription_id, reference: subscription_reference,
subscriber_pid: subscriber_pid, subscriber_pid: subscriber_pid,
transport: EthereumJSONRPC.WebSocket, transport: EthereumJSONRPC.WebSocket,
transport_options: [web_socket: __MODULE__, web_socket_options: %{web_socket: self()}, url: url] transport_options: %EthereumJSONRPC.WebSocket{
web_socket: __MODULE__,
web_socket_options: %Options{web_socket: self(), event: event, params: params},
url: url
}
} }
GenServer.reply(from, {:ok, subscription}) GenServer.reply(from, {:ok, subscription})
new_state = %__MODULE__{ new_state = %__MODULE__{
state state
| subscription_id_to_subscription: Map.put(subscription_id_to_subscription, subscription_id, subscription) | subscription_id_to_subscription_reference:
Map.put(subscription_id_to_subscription_reference, subscription_id, subscription_reference),
subscription_reference_to_subscription:
Map.put(subscription_reference_to_subscription, subscription_reference, subscription),
subscription_reference_to_subscription_id:
Map.put(subscription_reference_to_subscription_id, subscription_reference, subscription_id)
} }
{:ok, new_state} {:ok, new_state}
@ -297,9 +423,17 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
end end
defp respond_to_registration( defp respond_to_registration(
%Registration{type: :unsubscribe, from: from, subscription_id: subscription_id}, %Registration{
type: :unsubscribe,
from: from,
request: %{method: "eth_unsubscribe", params: [subscription_id]}
},
response, response,
%__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state %__MODULE__{
subscription_id_to_subscription_reference: subscription_id_to_subscription_reference,
subscription_reference_to_subscription: subscription_reference_to_subscription,
subscription_reference_to_subscription_id: subscription_reference_to_subscription_id
} = state
) do ) do
reply = reply =
case response do case response do
@ -311,20 +445,77 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
GenServer.reply(from, reply) GenServer.reply(from, reply)
new_state = %__MODULE__{ new_state =
state case subscription_id_to_subscription_reference do
| subscription_id_to_subscription: Map.delete(subscription_id_to_subscription, subscription_id) %{^subscription_id => subscription_reference} ->
} %__MODULE__{
state
| subscription_id_to_subscription_reference:
Map.delete(subscription_id_to_subscription_reference, subscription_id),
subscription_reference_to_subscription:
Map.delete(subscription_reference_to_subscription, subscription_reference),
subscription_reference_to_subscription_id:
Map.delete(subscription_reference_to_subscription_id, subscription_reference)
}
_ ->
state
end
{:ok, new_state} {:ok, new_state}
end end
defp respond_to_registration(nil, response, %__MODULE__{} = state) do defp respond_to_registration(
Logger.error(fn -> ["Got response for unregistered request ID: ", inspect(response)] end) nil,
response,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
Logger.error(fn ->
[
"Got response for unregistered request ID: ",
inspect(response),
". Outstanding request registrations: ",
inspect(request_id_to_registration)
]
end)
{:ok, state} {:ok, state}
end end
defp resubscribe(
%__MODULE__{subscription_reference_to_subscription: subscription_reference_to_subscription} = initial_state
) do
Enum.reduce(subscription_reference_to_subscription, initial_state, fn {subscription_reference,
%Subscription{
transport_options: %WebSocket{
web_socket: __MODULE__,
web_socket_options: %Options{
event: event,
params: params
}
}
}},
%__MODULE__{
request_id_to_registration:
acc_request_id_to_registration
} = acc_state ->
request_id = unique_request_id(acc_state)
request = request(%{id: request_id, method: "eth_subscribe", params: [event | params]})
:websocket_client.cast(self(), frame(request))
%__MODULE__{
acc_state
| request_id_to_registration:
Map.put(acc_request_id_to_registration, request_id, %Registration{
from: {self(), subscription_reference},
type: :subscribe,
request: request
})
}
end)
end
defp unique_request_id(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do defp unique_request_id(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do
unique_request_id = EthereumJSONRPC.unique_request_id() unique_request_id = EthereumJSONRPC.unique_request_id()

@ -0,0 +1,17 @@
defmodule EthereumJSONRPC.WebSocket.WebSocketClient.Options do
@moduledoc """
`t:EthereumJSONRPC.WebSocket.options/0` for `EthereumJSONRPC.WebSocket.WebSocketClient` `t:EthereumJSONRPC.Subscription.t/0` `transport_options`.
"""
alias EthereumJSONRPC.Subscription
@enforce_keys ~w(web_socket)a
defstruct ~w(web_socket event params)a
@typedoc """
* `web_socket` - the `t:pid/0` of the `EthereumJSONRPC.WebSocket.WebSocketClient`.
* `event` - the event that should be resubscribed to after disconnect.
* `params` - the parameters that should be used to customized `event` when resubscribing after disconnect.
"""
@type t :: %__MODULE__{web_socket: pid(), event: Subscription.event(), params: Subscription.params()}
end

@ -5,14 +5,15 @@ defmodule EthereumJSONRPC.WebSocketTest do
import Mox import Mox
alias EthereumJSONRPC.{Subscription, WebSocket} alias EthereumJSONRPC.{Subscription, WebSocket}
alias EthereumJSONRPC.WebSocket.WebSocketClient
setup :verify_on_exit! setup :verify_on_exit!
describe "json_rpc/2" do describe "json_rpc/2" do
test "can get result", %{subscribe_named_arguments: subscribe_named_arguments} do test "can get result", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options] %WebSocket{web_socket: web_socket} = transport_options = subscribe_named_arguments[:transport_options]
if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do if web_socket == EthereumJSONRPC.WebSocket.Mox do
expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ -> expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ ->
{:ok, %{"number" => "0x0"}} {:ok, %{"number" => "0x0"}}
end) end)
@ -27,9 +28,9 @@ defmodule EthereumJSONRPC.WebSocketTest do
# Infura timeouts on 2018-09-10 # Infura timeouts on 2018-09-10
@tag :no_geth @tag :no_geth
test "can get error", %{subscribe_named_arguments: subscribe_named_arguments} do test "can get error", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options] %WebSocket{web_socket: web_socket} = transport_options = subscribe_named_arguments[:transport_options]
if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do if web_socket == EthereumJSONRPC.WebSocket.Mox do
expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ -> expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ ->
{:error, {:error,
%{ %{
@ -55,30 +56,39 @@ defmodule EthereumJSONRPC.WebSocketTest do
describe "subscribe/2" do describe "subscribe/2" do
test "can subscribe to newHeads", %{subscribe_named_arguments: subscribe_named_arguments} do test "can subscribe to newHeads", %{subscribe_named_arguments: subscribe_named_arguments} do
transport = Keyword.fetch!(subscribe_named_arguments, :transport) transport = Keyword.fetch!(subscribe_named_arguments, :transport)
transport_options = subscribe_named_arguments[:transport_options] %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self() subscriber_pid = self()
if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do subscription_transport_options =
expect(EthereumJSONRPC.WebSocket.Mox, :subscribe, fn _, _, _ -> case web_socket_module do
{:ok, EthereumJSONRPC.WebSocket.Mox ->
%Subscription{ expect(EthereumJSONRPC.WebSocket.Mox, :subscribe, fn _, "newHeads", [] ->
id: "0x1", {:ok,
subscriber_pid: subscriber_pid, %Subscription{
transport: transport, reference: make_ref(),
transport_options: transport_options subscriber_pid: subscriber_pid,
}} transport: transport,
end) transport_options: transport_options
end }}
end)
transport_options
EthereumJSONRPC.WebSocket.WebSocketClient ->
update_in(transport_options.web_socket_options, fn %WebSocketClient.Options{} = web_socket_options ->
%WebSocketClient.Options{web_socket_options | event: "newHeads", params: []}
end)
end
assert {:ok, assert {:ok,
%Subscription{ %Subscription{
id: subscription_id, reference: subscription_reference,
subscriber_pid: ^subscriber_pid, subscriber_pid: ^subscriber_pid,
transport: ^transport, transport: ^transport,
transport_options: ^transport_options transport_options: ^subscription_transport_options
}} = WebSocket.subscribe("newHeads", [], transport_options) }} = WebSocket.subscribe("newHeads", [], transport_options)
assert is_binary(subscription_id) assert is_reference(subscription_reference)
end end
# Infura timeouts on 2018-09-10 # Infura timeouts on 2018-09-10
@ -87,14 +97,13 @@ defmodule EthereumJSONRPC.WebSocketTest do
block_interval: block_interval, block_interval: block_interval,
subscribe_named_arguments: subscribe_named_arguments subscribe_named_arguments: subscribe_named_arguments
} do } do
transport_options = subscribe_named_arguments[:transport_options] %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
subscriber_pid = self() subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
expect(web_socket_module, :subscribe, fn _, _, _ -> expect(web_socket_module, :subscribe, fn _, _, _ ->
subscription = %Subscription{ subscription = %Subscription{
id: "0x1", reference: make_ref(),
subscriber_pid: subscriber_pid, subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options transport_options: transport_options
@ -114,13 +123,12 @@ defmodule EthereumJSONRPC.WebSocketTest do
describe "unsubscribe/2" do describe "unsubscribe/2" do
test "can unsubscribe", %{subscribe_named_arguments: subscribe_named_arguments} do test "can unsubscribe", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options] %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
subscriber_pid = self() subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
subscription = %Subscription{ subscription = %Subscription{
id: "0x1", reference: make_ref(),
subscriber_pid: subscriber_pid, subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options transport_options: transport_options
@ -142,13 +150,12 @@ defmodule EthereumJSONRPC.WebSocketTest do
block_interval: block_interval, block_interval: block_interval,
subscribe_named_arguments: subscribe_named_arguments subscribe_named_arguments: subscribe_named_arguments
} do } do
transport_options = subscribe_named_arguments[:transport_options] %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
subscriber_pid = self() subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
subscription = %Subscription{ subscription = %Subscription{
id: "0x1", reference: make_ref(),
subscriber_pid: subscriber_pid, subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options transport_options: transport_options
@ -189,13 +196,12 @@ defmodule EthereumJSONRPC.WebSocketTest do
end end
test "return error if already unsubscribed", %{subscribe_named_arguments: subscribe_named_arguments} do test "return error if already unsubscribed", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options] %WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
subscriber_pid = self() subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
subscription = %Subscription{ subscription = %Subscription{
id: "0x1", reference: make_ref(),
subscriber_pid: subscriber_pid, subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport), transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options transport_options: transport_options

@ -5,6 +5,7 @@ defmodule EthereumJSONRPCTest do
import Mox import Mox
alias EthereumJSONRPC.Subscription alias EthereumJSONRPC.Subscription
alias EthereumJSONRPC.WebSocket.WebSocketClient
setup :verify_on_exit! setup :verify_on_exit!
@ -548,27 +549,36 @@ defmodule EthereumJSONRPCTest do
transport_options = subscribe_named_arguments[:transport_options] transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self() subscriber_pid = self()
if transport == EthereumJSONRPC.Mox do subscription_transport_options =
expect(transport, :subscribe, fn _, _, _ -> case transport do
{:ok, EthereumJSONRPC.Mox ->
%Subscription{ expect(transport, :subscribe, fn "newHeads", [], _ ->
id: "0x1", {:ok,
subscriber_pid: subscriber_pid, %Subscription{
transport: transport, reference: make_ref(),
transport_options: transport_options subscriber_pid: subscriber_pid,
}} transport: transport,
end) transport_options: transport_options
end }}
end)
transport_options
EthereumJSONRPC.WebSocket ->
update_in(transport_options.web_socket_options, fn %WebSocketClient.Options{} = web_socket_options ->
%WebSocketClient.Options{web_socket_options | event: "newHeads", params: []}
end)
end
assert {:ok, assert {:ok,
%Subscription{ %Subscription{
id: subscription_id, reference: subscription_reference,
subscriber_pid: ^subscriber_pid, subscriber_pid: ^subscriber_pid,
transport: ^transport, transport: ^transport,
transport_options: ^transport_options transport_options: ^subscription_transport_options
}} = EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments) }} = EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments)
assert is_binary(subscription_id) assert is_reference(subscription_reference)
end end
# Infura timeouts on 2018-09-12 # Infura timeouts on 2018-09-12
@ -584,7 +594,7 @@ defmodule EthereumJSONRPCTest do
if transport == EthereumJSONRPC.Mox do if transport == EthereumJSONRPC.Mox do
expect(transport, :subscribe, fn _, _, _ -> expect(transport, :subscribe, fn _, _, _ ->
subscription = %Subscription{ subscription = %Subscription{
id: "0x1", reference: make_ref(),
subscriber_pid: subscriber_pid, subscriber_pid: subscriber_pid,
transport: transport, transport: transport,
transport_options: transport_options transport_options: transport_options
@ -612,7 +622,7 @@ defmodule EthereumJSONRPCTest do
if transport == EthereumJSONRPC.Mox do if transport == EthereumJSONRPC.Mox do
subscription = %Subscription{ subscription = %Subscription{
id: "0x1", reference: make_ref(),
subscriber_pid: subscriber_pid, subscriber_pid: subscriber_pid,
transport: transport, transport: transport,
transport_options: transport_options transport_options: transport_options
@ -639,7 +649,7 @@ defmodule EthereumJSONRPCTest do
if transport == EthereumJSONRPC.Mox do if transport == EthereumJSONRPC.Mox do
subscription = %Subscription{ subscription = %Subscription{
id: "0x1", reference: make_ref(),
subscriber_pid: subscriber_pid, subscriber_pid: subscriber_pid,
transport: transport, transport: transport,
transport_options: Keyword.fetch!(subscribe_named_arguments, :transport_options) transport_options: Keyword.fetch!(subscribe_named_arguments, :transport_options)
@ -685,7 +695,7 @@ defmodule EthereumJSONRPCTest do
if transport == EthereumJSONRPC.Mox do if transport == EthereumJSONRPC.Mox do
subscription = %Subscription{ subscription = %Subscription{
id: "0x1", reference: make_ref(),
subscriber_pid: subscriber_pid, subscriber_pid: subscriber_pid,
transport: transport, transport: transport,
transport_options: transport_options transport_options: transport_options

@ -14,11 +14,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Geth do
block_interval: 25_000, block_interval: 25_000,
subscribe_named_arguments: [ subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket, transport: EthereumJSONRPC.WebSocket,
transport_options: [ transport_options: %EthereumJSONRPC.WebSocket{
web_socket: web_socket_module, web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket}, web_socket_options: %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket},
url: url url: url
] }
] ]
} }
end end

@ -35,11 +35,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Mox do
block_interval: @block_interval, block_interval: @block_interval,
subscribe_named_arguments: [ subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket, transport: EthereumJSONRPC.WebSocket,
transport_options: [ transport_options: %EthereumJSONRPC.WebSocket{
web_socket: web_socket_module, web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket}, web_socket_options: %{web_socket: web_socket},
url: url url: url
] }
] ]
} }
end end

@ -14,11 +14,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Parity do
block_interval: 5_000, block_interval: 5_000,
subscribe_named_arguments: [ subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket, transport: EthereumJSONRPC.WebSocket,
transport_options: [ transport_options: %EthereumJSONRPC.WebSocket{
web_socket: web_socket_module, web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket}, web_socket_options: %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket},
url: url url: url
] }
] ]
} }
end end

@ -14,13 +14,16 @@ defmodule Indexer.Block.Realtime.Supervisor do
children = children =
case Keyword.fetch!(subscribe_named_arguments, :transport) do case Keyword.fetch!(subscribe_named_arguments, :transport) do
EthereumJSONRPC.WebSocket -> EthereumJSONRPC.WebSocket ->
transport_options = Keyword.fetch!(subscribe_named_arguments, :transport_options) transport_options =
url = Keyword.fetch!(transport_options, :url) struct!(EthereumJSONRPC.WebSocket, Keyword.fetch!(subscribe_named_arguments, :transport_options))
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
web_socket = Indexer.Block.Realtime.WebSocket web_socket = Indexer.Block.Realtime.WebSocket
web_socket_options = %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket}
transport_options = %EthereumJSONRPC.WebSocket{transport_options | web_socket_options: web_socket_options}
%EthereumJSONRPC.WebSocket{url: url, web_socket: web_socket_module} = transport_options
block_fetcher_subscribe_named_arguments = block_fetcher_subscribe_named_arguments =
put_in(subscribe_named_arguments[:transport_options][:web_socket_options], %{web_socket: web_socket}) put_in(subscribe_named_arguments[:transport_options], transport_options)
[ [
{Task.Supervisor, name: Indexer.Block.Realtime.TaskSupervisor}, {Task.Supervisor, name: Indexer.Block.Realtime.TaskSupervisor},

Loading…
Cancel
Save