Merge pull request #934 from poanetwork/924

Reconnect, rerequest, and resubscribe when websocket disconnects
pull/939/merge
Luke Imhoff 6 years ago committed by GitHub
commit c23c10520d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex
  2. 40
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex
  3. 65
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex
  4. 358
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex
  5. 17
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client/options.ex
  6. 4
      apps/ethereum_jsonrpc/mix.exs
  7. 223
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket/web_socket_client_test.exs
  8. 68
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs
  9. 46
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs
  10. 6
      apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex
  11. 4
      apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex
  12. 6
      apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex
  13. 71
      apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/cowboy/websocket_handler.ex
  14. 11
      apps/indexer/lib/indexer/block/realtime/supervisor.ex

@ -5,8 +5,8 @@ defmodule EthereumJSONRPC.Subscription do
alias EthereumJSONRPC.Transport
@enforce_keys ~w(id subscriber_pid transport transport_options)a
defstruct ~w(id subscriber_pid transport transport_options)a
@enforce_keys ~w(reference subscriber_pid transport transport_options)a
defstruct ~w(reference subscriber_pid transport transport_options)a
@typedoc """
An event that can be subscribed to.
@ -26,12 +26,17 @@ defmodule EthereumJSONRPC.Subscription do
@type params :: list()
@typedoc """
* `id` - the `t:/id/0` of the subscription on the server
* `subscriber_pid` - the `t:pid/0` of process where `transport_pid` should send messages
* `transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module
* `transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2`
* `reference` - the `t:reference/0` for referring to the subscription when talking to `transport_pid`.
* `subscriber_pid` - the `t:pid/0` of process where `transport_pid` should send messages.
* `transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module.
* `transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2`.
"""
@type t :: %__MODULE__{id: id, subscriber_pid: pid, transport: Transport.t(), transport_options: Transport.options()}
@type t :: %__MODULE__{
reference: reference(),
subscriber_pid: pid(),
transport: Transport.t(),
transport_options: Transport.options()
}
@doc """
Publishes `messages` to all `subscriptions`s' `subscriber_pid`s.

@ -7,6 +7,9 @@ defmodule EthereumJSONRPC.WebSocket do
@behaviour Transport
@enforce_keys ~w(url web_socket)a
defstruct ~w(url web_socket web_socket_options)a
@typedoc """
WebSocket name
"""
@ -19,12 +22,17 @@ defmodule EthereumJSONRPC.WebSocket do
# same as `t:GenServer.server/0`
@type web_socket :: pid() | name() | {atom(), node()}
@typedoc """
Options for `web_socket` `t:module/0` in `t:t/0`.
"""
@type web_socket_options :: %{required(:web_socket) => web_socket(), optional(atom()) => term()}
@typedoc """
Options passed to `EthereumJSONRPC.Transport` callbacks.
**MUST** contain `t:web_socket/0` referring to `t:pid/0` returned by `c:start_link/2`.
"""
@type options :: [{:web_socket, web_socket()} | {:web_socket_options, term()}]
@type t :: %__MODULE__{web_socket: module(), web_socket_options: web_socket_options}
@doc """
Allow `c:start_link/1` to be called as part of a supervision tree.
@ -33,8 +41,6 @@ defmodule EthereumJSONRPC.WebSocket do
@doc """
Starts web socket attached to `url` with `options`.
"""
# Return is same as `t:GenServer.on_start/0`
@callback start_link([url :: String.t() | options :: term()]) ::
@ -83,31 +89,27 @@ defmodule EthereumJSONRPC.WebSocket do
@callback unsubscribe(web_socket(), Subscription.t()) :: :ok | {:error, reason :: term()}
@impl Transport
@spec json_rpc(Transport.request(), options) :: {:ok, Transport.result()} | {:error, reason :: term()}
def json_rpc(request, options) do
web_socket_module = Keyword.fetch!(options, :web_socket)
%{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options)
@spec json_rpc(Transport.request(), t()) :: {:ok, Transport.result()} | {:error, reason :: term()}
def json_rpc(request, %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}}) do
web_socket_module.json_rpc(web_socket, request)
end
@impl Transport
@spec subscribe(event :: Subscription.event(), params :: Subscription.params(), options) ::
@spec subscribe(event :: Subscription.event(), params :: Subscription.params(), t()) ::
{:ok, Subscription.t()} | {:error, reason :: term()}
def subscribe(event, params, options) when is_binary(event) and is_list(params) do
web_socket_module = Keyword.fetch!(options, :web_socket)
%{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options)
def subscribe(event, params, %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}})
when is_binary(event) and is_list(params) do
web_socket_module.subscribe(web_socket, event, params)
end
@impl Transport
@spec unsubscribe(%Subscription{transport: __MODULE__, transport_options: options}) ::
:ok | {:error, reason :: term()}
def unsubscribe(%Subscription{transport: __MODULE__, transport_options: transport_options} = subscription) do
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
%{web_socket: web_socket} = Keyword.fetch!(transport_options, :web_socket_options)
@spec unsubscribe(%Subscription{transport: __MODULE__, transport_options: t()}) :: :ok | {:error, reason :: term()}
def unsubscribe(
%Subscription{
transport: __MODULE__,
transport_options: %__MODULE__{web_socket: web_socket_module, web_socket_options: %{web_socket: web_socket}}
} = subscription
) do
web_socket_module.unsubscribe(web_socket, subscription)
end
end

@ -3,10 +3,10 @@ defmodule EthereumJSONRPC.WebSocket.Registration do
When a caller registers for responses to asynchronous frame responses.
"""
alias EthereumJSONRPC.Subscription
alias EthereumJSONRPC.{Subscription, Transport}
@enforce_keys ~w(from type)a
defstruct ~w(from type subscription_id)a
@enforce_keys ~w(from request type)a
defstruct ~w(from request type)a
@typedoc """
What kind of request will be issued by the caller
@ -20,5 +20,62 @@ defmodule EthereumJSONRPC.WebSocket.Registration do
"""
@type type :: :json_rpc | :subscribe | :unsubscribe
@type t :: %__MODULE__{from: GenServer.from(), type: type, subscription_id: Subscription.id()}
@typedoc """
`"eth_subscribe"`
"""
@type subscribe :: Transport.method()
@typedoc """
The event to `t:subscribe/0` to.
"""
@type event :: String.t()
@typedoc """
Parameters unique to `t:event/0` that customize the `t:subscribe`.
"""
@type event_param :: term()
@typedoc """
Parameters to `t:subscribe/0` `t:EthereumJSONRPC.Transport.request/0`.
A list that start with the `t:event/0` followed by zero or more `t:event_param/0`.
"""
@type subscribe_params :: [event | event_param, ...]
@typedoc """
`"eth_unsubscribe"`
"""
@type unsubscribe :: Transport.method()
@typedoc """
A list containing the `t:EthereumJSONRPC.Subscription.id/0` that is being unsubscribed.
"""
@type unsubscribe_params :: [Subscription.id(), ...]
@typedoc """
* `from` - used to `GenServer.reply/2` to caller
* `type` - the `t:type/0` of request
* `request` - the request sent to the server. Used to replay the request on disconnect.
"""
@type t ::
%__MODULE__{
from: GenServer.from(),
type: :json_rpc,
request: %{jsonrpc: String.t(), method: Transport.method(), params: list(), id: non_neg_integer()}
}
| %__MODULE__{
from: GenServer.from(),
type: :subscribe,
request: %{jsonrpc: String.t(), method: subscribe(), params: subscribe_params(), id: non_neg_integer()}
}
| %__MODULE__{
from: GenServer.from(),
type: :unsubscribe,
request: %{
jsonrpc: String.t(),
method: unsubscribe(),
params: unsubscribe_params(),
id: non_neg_integer()
}
}
end

@ -9,15 +9,38 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
alias EthereumJSONRPC.{Subscription, Transport, WebSocket}
alias EthereumJSONRPC.WebSocket.Registration
alias EthereumJSONRPC.WebSocket.WebSocketClient.Options
@behaviour :websocket_client
@behaviour WebSocket
@enforce_keys ~w(url)a
defstruct request_id_to_registration: %{},
subscription_id_to_subscription: %{},
defstruct connected: false,
request_id_to_registration: %{},
subscription_id_to_subscription_reference: %{},
subscription_reference_to_subscription_id: %{},
subscription_reference_to_subscription: %{},
url: nil
@typedoc """
* `request_id_to_registration` - maps id of requests in flight to their
`t:EthereumSJONRPC.WebSocket.Registration.t/0`, so that when the response is received from the server, the caller
in `from` of the registration can be `GenServer.reply/2`ed to.
* `subscription_id_to_subscription_reference` - maps id of subscription on the server to the `t:reference/0` used in
the `t:EthereumJSONRPC.Subscription.t/0`. Subscriptions use a `t:reference/0` instead of the server-side id, so
that on reconnect, the id can change, but the subscribe does not need to be notified.
* `subscription_reference_to_subscription` - maps `t:reference/0` in `t:EthereumJSONRPC.Subscription.t/0` to that
`t:EthereumJSONRPC.Subscription.t/0`, so that the `subscriber_pid` can be notified of subscription messages.
* `subscription_reference_to_subscription_id` - maps `t:reference/0` in `t:EthereumJSONRPC.Subscription.t/0 to id of
the subscription on the server, so that the subscriber can unsubscribe with the `t:reference/0`.
"""
@type t :: %__MODULE__{
request_id_to_registration: %{EthereumJSONRPC.request_id() => Registration.t()},
subscription_id_to_subscription_reference: %{Subscription.id() => reference()},
subscription_reference_to_subscription: %{reference() => Subscription.t()},
subscription_reference_to_subscription_id: %{reference() => Subscription.id()}
}
# Supervisor interface
@impl WebSocket
@ -98,13 +121,15 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
end
@impl :websocket_client
def onconnect(_, %__MODULE__{} = state) do
{:ok, state}
def onconnect(_, %__MODULE__{connected: false} = state) do
{:ok, reconnect(%__MODULE__{state | connected: true})}
end
@impl :websocket_client
def ondisconnect(reason, %__MODULE__{} = state) do
{:close, reason, state}
def ondisconnect(_reason, %__MODULE__{request_id_to_registration: request_id_to_registration} = state) do
final_state = Enum.reduce(request_id_to_registration, state, &disconnect_request_id_registration/2)
{:reconnect, %__MODULE__{final_state | connected: false}}
end
@impl :websocket_client
@ -121,7 +146,10 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
@impl :websocket_client
def websocket_info({:"$gen_call", from, request}, _, %__MODULE__{} = state) do
handle_call(request, from, state)
case handle_call(request, from, state) do
{:reply, _, %__MODULE__{}} = reply -> reply
{:noreply, %__MODULE__{} = new_state} -> {:ok, new_state}
end
end
@impl :websocket_client
@ -129,24 +157,80 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
broadcast(close, state)
end
defp broadcast(message, %__MODULE__{subscription_id_to_subscription: id_to_subscription}) do
id_to_subscription
defp broadcast(message, %__MODULE__{subscription_reference_to_subscription: subscription_reference_to_subscription}) do
subscription_reference_to_subscription
|> Map.values()
|> Subscription.broadcast(message)
end
defp handle_call(message, from, %__MODULE__{} = state) do
{updated_state, unique_request} = register(message, from, state)
# Not re-subscribing after disconnect is the same as a successful unsubscribe
defp disconnect_request_id_registration(
{request_id,
%Registration{
type: :unsubscribe,
from: from,
request: %{method: "eth_unsubscribe", params: [subscription_id]}
}},
%__MODULE__{
request_id_to_registration: request_id_to_registration,
subscription_id_to_subscription_reference: subscription_id_to_subscription_reference,
subscription_reference_to_subscription: subscription_reference_to_subscription,
subscription_reference_to_subscription_id: subscription_reference_to_subscription_id
} = acc_state
) do
GenServer.reply(from, :ok)
%{^subscription_id => subscription_reference} = subscription_id_to_subscription_reference
%__MODULE__{
acc_state
| request_id_to_registration: Map.delete(request_id_to_registration, request_id),
subscription_id_to_subscription_reference:
Map.delete(subscription_id_to_subscription_reference, subscription_id),
subscription_reference_to_subscription:
Map.delete(subscription_reference_to_subscription, subscription_reference),
subscription_reference_to_subscription_id:
Map.delete(subscription_reference_to_subscription_id, subscription_reference)
}
end
{:reply, {:text, Jason.encode!(unique_request)}, updated_state}
# Re-run in `onconnect\2`
defp disconnect_request_id_registration({_request_id, %Registration{type: type}}, state)
when type in ~w(json_rpc subscribe)a do
state
end
defp frame(request) do
{:text, Jason.encode!(request)}
end
defp handle_call(message, from, %__MODULE__{connected: connected} = state) do
case register(message, from, state) do
{:ok, unique_request, updated_state} ->
case connected do
true ->
{:reply, frame(unique_request), updated_state}
false ->
{:noreply, updated_state}
end
{:error, _reason} = error ->
GenServer.reply(from, error)
{:noreply, state}
end
end
defp handle_response(
%{"method" => "eth_subscription", "params" => %{"result" => result, "subscription" => subscription_id}},
%__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state
%__MODULE__{
subscription_id_to_subscription_reference: subscription_id_to_subscription_reference,
subscription_reference_to_subscription: subscription_reference_to_subscription
} = state
) do
case subscription_id_to_subscription do
%{^subscription_id => subscription} ->
case subscription_id_to_subscription_reference do
%{^subscription_id => subscription_reference} ->
%{^subscription_reference => subscription} = subscription_reference_to_subscription
Subscription.publish(subscription, {:ok, result})
_ ->
@ -157,7 +241,7 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
") result (",
inspect(result),
"). Subscription ID not in known subscription IDs (",
subscription_id_to_subscription
subscription_id_to_subscription_reference
|> Map.values()
|> Enum.map(&inspect/1),
")."
@ -173,7 +257,9 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
{registration, new_request_id_to_registration} = Map.pop(request_id_to_registration, id)
respond_to_registration(registration, new_request_id_to_registration, response, state)
new_state = %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}
respond_to_registration(registration, response, new_state)
end
defp handle_response(response, %__MODULE__{} = state) do
@ -190,21 +276,30 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
{:ok, state}
end
defp reconnect(%__MODULE__{} = state) do
state
|> rerequest()
|> resubscribe()
end
defp register(
{:json_rpc, original_request},
from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
unique_id = unique_request_id(state)
request = %{original_request | id: unique_id}
{%__MODULE__{
{:ok, request,
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{
from: from,
type: :json_rpc
type: :json_rpc,
request: request
})
}, %{original_request | id: unique_id}}
}}
end
defp register(
@ -214,41 +309,58 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
)
when is_binary(event) and is_list(params) do
unique_id = unique_request_id(state)
request = request(%{id: unique_id, method: "eth_subscribe", params: [event | params]})
{
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe})
},
request(%{id: unique_id, method: "eth_subscribe", params: [event | params]})
}
{:ok, request,
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe, request: request})
}}
end
defp register(
{:unsubscribe, %Subscription{id: subscription_id}},
{:unsubscribe, %Subscription{reference: subscription_reference}},
from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
%__MODULE__{
request_id_to_registration: request_id_to_registration,
subscription_reference_to_subscription_id: subscription_reference_to_subscription_id
} = state
) do
unique_id = unique_request_id(state)
case subscription_reference_to_subscription_id do
%{^subscription_reference => subscription_id} ->
unique_id = unique_request_id(state)
request = request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]})
{
:ok,
request,
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{
from: from,
type: :unsubscribe,
request: request
})
}
}
{
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{
from: from,
type: :unsubscribe,
subscription_id: subscription_id
})
},
request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]})
}
_ ->
{:error, :not_found}
end
end
defp rerequest(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do
Enum.each(request_id_to_registration, fn {_, %Registration{request: request}} ->
:websocket_client.cast(self(), frame(request))
end)
state
end
defp respond_to_registration(
%Registration{type: :json_rpc, from: from},
new_request_id_to_registration,
response,
%__MODULE__{} = state
) do
@ -260,48 +372,96 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
GenServer.reply(from, reply)
{:ok, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}}
{:ok, state}
end
defp respond_to_registration(
%Registration{type: :subscribe, from: {subscriber_pid, _} = from},
new_request_id_to_registration,
%Registration{
type: :subscribe,
from: {subscriber_pid, from_reference} = from,
request: %{params: [event | params]}
},
%{"result" => subscription_id},
%__MODULE__{url: url} = state
%__MODULE__{
subscription_id_to_subscription_reference: subscription_id_to_subscription_reference,
subscription_reference_to_subscription: subscription_reference_to_subscription,
subscription_reference_to_subscription_id: subscription_reference_to_subscription_id,
url: url
} = state
) do
subscription = %Subscription{
id: subscription_id,
subscriber_pid: subscriber_pid,
transport: EthereumJSONRPC.WebSocket,
transport_options: [web_socket: __MODULE__, web_socket_options: %{web_socket: self()}, url: url]
}
GenServer.reply(from, {:ok, subscription})
new_state =
state
|> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration)
|> put_in([Access.key!(:subscription_id_to_subscription), subscription_id], subscription)
case subscription_reference_to_subscription do
# resubscribe
%{
^from_reference => %Subscription{
subscriber_pid: ^subscriber_pid,
transport_options: %WebSocket{
web_socket: __MODULE__,
web_socket_options: %Options{event: ^event, params: ^params}
}
}
} ->
%__MODULE__{
state
| subscription_id_to_subscription_reference:
Map.put(subscription_id_to_subscription_reference, subscription_id, from_reference),
subscription_reference_to_subscription_id:
Map.put(subscription_reference_to_subscription_id, from_reference, subscription_id)
}
# new subscription
_ ->
subscription_reference = make_ref()
subscription = %Subscription{
reference: subscription_reference,
subscriber_pid: subscriber_pid,
transport: EthereumJSONRPC.WebSocket,
transport_options: %EthereumJSONRPC.WebSocket{
web_socket: __MODULE__,
web_socket_options: %Options{web_socket: self(), event: event, params: params},
url: url
}
}
GenServer.reply(from, {:ok, subscription})
%__MODULE__{
state
| subscription_reference_to_subscription:
Map.put(subscription_reference_to_subscription, subscription_reference, subscription),
subscription_id_to_subscription_reference:
Map.put(subscription_id_to_subscription_reference, subscription_id, subscription_reference),
subscription_reference_to_subscription_id:
Map.put(subscription_reference_to_subscription_id, subscription_reference, subscription_id)
}
end
{:ok, new_state}
end
defp respond_to_registration(
%Registration{type: :subscribe, from: from},
new_request_id_to_registration,
%{"error" => error},
%__MODULE__{} = state
) do
GenServer.reply(from, {:error, error})
{:ok, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}}
{:ok, state}
end
defp respond_to_registration(
%Registration{type: :unsubscribe, from: from, subscription_id: subscription_id},
new_request_id_to_registration,
%Registration{
type: :unsubscribe,
from: from,
request: %{method: "eth_unsubscribe", params: [subscription_id]}
},
response,
%__MODULE__{} = state
%__MODULE__{
subscription_id_to_subscription_reference: subscription_id_to_subscription_reference,
subscription_reference_to_subscription: subscription_reference_to_subscription,
subscription_reference_to_subscription_id: subscription_reference_to_subscription_id
} = state
) do
reply =
case response do
@ -314,19 +474,77 @@ defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
GenServer.reply(from, reply)
new_state =
state
|> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration)
|> update_in([Access.key!(:subscription_id_to_subscription)], &Map.delete(&1, subscription_id))
case subscription_id_to_subscription_reference do
%{^subscription_id => subscription_reference} ->
%__MODULE__{
state
| subscription_id_to_subscription_reference:
Map.delete(subscription_id_to_subscription_reference, subscription_id),
subscription_reference_to_subscription:
Map.delete(subscription_reference_to_subscription, subscription_reference),
subscription_reference_to_subscription_id:
Map.delete(subscription_reference_to_subscription_id, subscription_reference)
}
_ ->
state
end
{:ok, new_state}
end
defp respond_to_registration(nil, _, response, %__MODULE__{} = state) do
Logger.error(fn -> ["Got response for unregistered request ID: ", inspect(response)] end)
defp respond_to_registration(
nil,
response,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
Logger.error(fn ->
[
"Got response for unregistered request ID: ",
inspect(response),
". Outstanding request registrations: ",
inspect(request_id_to_registration)
]
end)
{:ok, state}
end
defp resubscribe(
%__MODULE__{subscription_reference_to_subscription: subscription_reference_to_subscription} = initial_state
) do
Enum.reduce(subscription_reference_to_subscription, initial_state, fn {subscription_reference,
%Subscription{
subscriber_pid: subscriber_pid,
transport_options: %WebSocket{
web_socket: __MODULE__,
web_socket_options: %Options{
event: event,
params: params
}
}
}},
%__MODULE__{
request_id_to_registration:
acc_request_id_to_registration
} = acc_state ->
request_id = unique_request_id(acc_state)
request = request(%{id: request_id, method: "eth_subscribe", params: [event | params]})
:websocket_client.cast(self(), frame(request))
%__MODULE__{
acc_state
| request_id_to_registration:
Map.put(acc_request_id_to_registration, request_id, %Registration{
from: {subscriber_pid, subscription_reference},
type: :subscribe,
request: request
})
}
end)
end
defp unique_request_id(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do
unique_request_id = EthereumJSONRPC.unique_request_id()

@ -0,0 +1,17 @@
defmodule EthereumJSONRPC.WebSocket.WebSocketClient.Options do
@moduledoc """
`t:EthereumJSONRPC.WebSocket.options/0` for `EthereumJSONRPC.WebSocket.WebSocketClient` `t:EthereumJSONRPC.Subscription.t/0` `transport_options`.
"""
alias EthereumJSONRPC.Subscription
@enforce_keys ~w(web_socket)a
defstruct ~w(web_socket event params)a
@typedoc """
* `web_socket` - the `t:pid/0` of the `EthereumJSONRPC.WebSocket.WebSocketClient`.
* `event` - the event that should be resubscribed to after disconnect.
* `params` - the parameters that should be used to customized `event` when resubscribing after disconnect.
"""
@type t :: %__MODULE__{web_socket: pid(), event: Subscription.event(), params: Subscription.params()}
end

@ -58,8 +58,10 @@ defmodule EthereumJsonrpc.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
# CACerts bundle for `EthereumJSONRPC.WebSocket.Client`
# CACerts bundle for `EthereumJSONRPC.WebSocket.WebSocketClient`
{:certifi, "~> 2.3"},
# WebSocket-server for testing `EthereumJSONRPC.WebSocket.WebSocketClient`.
{:cowboy, "~> 1.1", only: :test},
# Style Checking
{:credo, "0.9.2", only: [:dev, :test], runtime: false},
# Static Type Checking

@ -0,0 +1,223 @@
defmodule EthereumJSONRPC.WebSocket.WebSocketClientTest do
use ExUnit.Case
alias EthereumJSONRPC.Subscription
alias EthereumJSONRPC.WebSocket.{Registration, WebSocketClient}
import EthereumJSONRPC, only: [unique_request_id: 0]
describe "ondisconnect/2" do
setup :example_state
test "reconnects", %{state: state} do
assert {:reconnect, _} = WebSocketClient.ondisconnect({:closed, :remote}, state)
end
test "treats in-progress unsubscribes as successful", %{state: state} do
subscription_id = 1
state = put_subscription(state, subscription_id)
%Registration{from: {_, ref}} =
registration = registration(%{type: :unsubscribe, subscription_id: subscription_id})
state = put_registration(state, registration)
assert {_, disconnected_state} = WebSocketClient.ondisconnect({:closed, :remote}, state)
assert Enum.empty?(disconnected_state.request_id_to_registration)
assert Enum.empty?(disconnected_state.subscription_id_to_subscription_reference)
assert Enum.empty?(disconnected_state.subscription_reference_to_subscription)
assert Enum.empty?(disconnected_state.subscription_reference_to_subscription_id)
assert_receive {^ref, :ok}
end
test "keeps :json_rpc requests for re-requesting on reconnect", %{state: state} do
state = put_registration(state, %{type: :json_rpc, method: "eth_getBlockByNumber", params: [1, true]})
assert {_, disconnected_state} = WebSocketClient.ondisconnect({:closed, :remote}, state)
assert Enum.count(disconnected_state.request_id_to_registration) == 1
end
test "keeps :subscribe requests for re-requesting on reconnect", %{state: state} do
state = put_registration(state, %{type: :subscribe})
assert {_, disconnected_state} = WebSocketClient.ondisconnect({:closed, :remote}, state)
assert Enum.count(disconnected_state.request_id_to_registration) == 1
end
end
describe "websocket_handle/3" do
setup :example_state
test "Jason.decode errors are broadcast to all subscribers", %{state: %WebSocketClient{url: url} = state} do
subscription_id = 1
subscription_reference = make_ref()
subscription = subscription(%{url: url, reference: subscription_reference})
state = put_subscription(state, subscription_id, subscription)
assert {:ok, ^state} = WebSocketClient.websocket_handle({:text, ""}, nil, state)
assert_receive {^subscription, {:error, %Jason.DecodeError{}}}
end
end
describe "websocket_terminate/3" do
setup :example_state
test "broadcasts close to all subscribers", %{state: %WebSocketClient{url: url} = state} do
subscription_id = 1
subscription_reference = make_ref()
subscription = subscription(%{url: url, reference: subscription_reference})
state = put_subscription(state, subscription_id, subscription)
assert {:ok, ^state} = WebSocketClient.websocket_handle({:text, ""}, nil, state)
assert_receive {^subscription, {:error, %Jason.DecodeError{}}}
end
end
describe "reconnect" do
setup do
dispatch = :cowboy_router.compile([{:_, [{"/websocket", EthereumJSONRPC.WebSocket.Cowboy.WebSocketHandler, []}]}])
{:ok, _} = :cowboy.start_http(EthereumJSONRPC.WebSocket.Cowboy, 100, [], env: [dispatch: dispatch])
on_exit(fn ->
:ranch.stop_listener(EthereumJSONRPC.WebSocket.Cowboy)
end)
port = :ranch.get_port(EthereumJSONRPC.WebSocket.Cowboy)
pid = start_supervised!({WebSocketClient, ["ws://localhost:#{port}/websocket", []]})
%{pid: pid, port: port}
end
test "resubscribes", %{pid: pid, port: port} do
assert {:ok, subscription} = WebSocketClient.subscribe(pid, "newHeads", [])
assert_receive {^subscription, {:ok, %{}}}, 500
assert :ok = :ranch.stop_listener(EthereumJSONRPC.WebSocket.Cowboy)
refute_receive {^subscription, {:ok, %{}}}, 100
cowboy(port)
assert_receive {^subscription, {:ok, %{}}}, 500
end
test "rerequests", %{pid: pid, port: port} do
first_params = [1]
# json_rpc requests work before connection is closed
assert {:ok, ^first_params} =
WebSocketClient.json_rpc(
pid,
EthereumJSONRPC.request(%{id: :erlang.unique_integer(), method: "echo", params: first_params})
)
assert :ok = :ranch.stop_listener(EthereumJSONRPC.WebSocket.Cowboy)
spawn_link(fn ->
Process.sleep(500)
cowboy(port)
end)
second_params = [2]
assert {:ok, ^second_params} =
WebSocketClient.json_rpc(
pid,
EthereumJSONRPC.request(%{id: :erlang.unique_integer(), method: "echo", params: second_params})
)
end
end
defp cowboy(0) do
dispatch = :cowboy_router.compile([{:_, [{"/websocket", EthereumJSONRPC.WebSocket.Cowboy.WebSocketHandler, []}]}])
{:ok, _} = :cowboy.start_http(EthereumJSONRPC.WebSocket.Cowboy, 100, [], env: [dispatch: dispatch])
:ranch.get_port(EthereumJSONRPC.WebSocket.Cowboy)
end
defp cowboy(port) do
dispatch = :cowboy_router.compile([{:_, [{"/websocket", EthereumJSONRPC.WebSocket.Cowboy.WebSocketHandler, []}]}])
{:ok, _} = :cowboy.start_http(EthereumJSONRPC.WebSocket.Cowboy, 100, [port: port], env: [dispatch: dispatch])
port
end
defp example_state(_) do
%{state: %WebSocketClient{url: "ws://example.com"}}
end
defp put_registration(%WebSocketClient{} = state, %Registration{request: %{id: request_id}} = registration) do
%WebSocketClient{state | request_id_to_registration: %{request_id => registration}}
end
defp put_registration(%WebSocketClient{} = state, map) when is_map(map) do
put_registration(state, registration(map))
end
defp put_subscription(%WebSocketClient{url: url} = state, subscription_id) when is_integer(subscription_id) do
subscription_reference = make_ref()
put_subscription(state, subscription_id, subscription(%{url: url, reference: subscription_reference}))
end
defp put_subscription(
%WebSocketClient{url: url} = state,
subscription_id,
%Subscription{
reference: subscription_reference,
transport_options: %EthereumJSONRPC.WebSocket{url: url}
} = subscription
) do
%WebSocketClient{
state
| subscription_id_to_subscription_reference: %{subscription_id => subscription_reference},
subscription_reference_to_subscription: %{subscription_reference => subscription},
subscription_reference_to_subscription_id: %{subscription_reference => subscription_id}
}
end
defp registration(%{type: :subscribe = type}) do
%Registration{
type: type,
from: {self(), make_ref()},
request: %{id: unique_request_id(), method: "eth_subscribe", params: ["newHeads"]}
}
end
defp registration(%{type: :unsubscribe = type, subscription_id: subscription_id}) do
%Registration{
type: type,
from: {self(), make_ref()},
request: %{id: unique_request_id(), method: "eth_unsubscribe", params: [subscription_id]}
}
end
defp registration(%{type: type, method: method, params: params}) do
%Registration{
type: type,
from: {self(), make_ref()},
request: %{id: unique_request_id(), method: method, params: params}
}
end
defp subscription(%{reference: reference, url: url}) do
%Subscription{
reference: reference,
subscriber_pid: self(),
transport: EthereumJSONRPC.WebSocket,
transport_options: %EthereumJSONRPC.WebSocket{
url: url,
web_socket: WebSocketClient,
web_socket_options: %WebSocketClient.Options{
web_socket: self(),
event: "newHeads",
params: []
}
}
}
end
end

@ -5,14 +5,15 @@ defmodule EthereumJSONRPC.WebSocketTest do
import Mox
alias EthereumJSONRPC.{Subscription, WebSocket}
alias EthereumJSONRPC.WebSocket.WebSocketClient
setup :verify_on_exit!
describe "json_rpc/2" do
test "can get result", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options]
%WebSocket{web_socket: web_socket} = transport_options = subscribe_named_arguments[:transport_options]
if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do
if web_socket == EthereumJSONRPC.WebSocket.Mox do
expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ ->
{:ok, %{"number" => "0x0"}}
end)
@ -27,9 +28,9 @@ defmodule EthereumJSONRPC.WebSocketTest do
# Infura timeouts on 2018-09-10
@tag :no_geth
test "can get error", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options]
%WebSocket{web_socket: web_socket} = transport_options = subscribe_named_arguments[:transport_options]
if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do
if web_socket == EthereumJSONRPC.WebSocket.Mox do
expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ ->
{:error,
%{
@ -55,30 +56,39 @@ defmodule EthereumJSONRPC.WebSocketTest do
describe "subscribe/2" do
test "can subscribe to newHeads", %{subscribe_named_arguments: subscribe_named_arguments} do
transport = Keyword.fetch!(subscribe_named_arguments, :transport)
transport_options = subscribe_named_arguments[:transport_options]
%WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do
expect(EthereumJSONRPC.WebSocket.Mox, :subscribe, fn _, _, _ ->
{:ok,
%Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
}}
end)
end
subscription_transport_options =
case web_socket_module do
EthereumJSONRPC.WebSocket.Mox ->
expect(EthereumJSONRPC.WebSocket.Mox, :subscribe, fn _, "newHeads", [] ->
{:ok,
%Subscription{
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
}}
end)
transport_options
EthereumJSONRPC.WebSocket.WebSocketClient ->
update_in(transport_options.web_socket_options, fn %WebSocketClient.Options{} = web_socket_options ->
%WebSocketClient.Options{web_socket_options | event: "newHeads", params: []}
end)
end
assert {:ok,
%Subscription{
id: subscription_id,
reference: subscription_reference,
subscriber_pid: ^subscriber_pid,
transport: ^transport,
transport_options: ^transport_options
transport_options: ^subscription_transport_options
}} = WebSocket.subscribe("newHeads", [], transport_options)
assert is_binary(subscription_id)
assert is_reference(subscription_reference)
end
# Infura timeouts on 2018-09-10
@ -87,14 +97,13 @@ defmodule EthereumJSONRPC.WebSocketTest do
block_interval: block_interval,
subscribe_named_arguments: subscribe_named_arguments
} do
transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
%WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
expect(web_socket_module, :subscribe, fn _, _, _ ->
subscription = %Subscription{
id: "0x1",
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options
@ -114,13 +123,12 @@ defmodule EthereumJSONRPC.WebSocketTest do
describe "unsubscribe/2" do
test "can unsubscribe", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
%WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
subscription = %Subscription{
id: "0x1",
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options
@ -142,13 +150,12 @@ defmodule EthereumJSONRPC.WebSocketTest do
block_interval: block_interval,
subscribe_named_arguments: subscribe_named_arguments
} do
transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
%WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
subscription = %Subscription{
id: "0x1",
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options
@ -189,13 +196,12 @@ defmodule EthereumJSONRPC.WebSocketTest do
end
test "return error if already unsubscribed", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
%WebSocket{web_socket: web_socket_module} = transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
subscription = %Subscription{
id: "0x1",
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options

@ -5,6 +5,7 @@ defmodule EthereumJSONRPCTest do
import Mox
alias EthereumJSONRPC.Subscription
alias EthereumJSONRPC.WebSocket.WebSocketClient
setup :verify_on_exit!
@ -548,27 +549,36 @@ defmodule EthereumJSONRPCTest do
transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if transport == EthereumJSONRPC.Mox do
expect(transport, :subscribe, fn _, _, _ ->
{:ok,
%Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
}}
end)
end
subscription_transport_options =
case transport do
EthereumJSONRPC.Mox ->
expect(transport, :subscribe, fn "newHeads", [], _ ->
{:ok,
%Subscription{
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
}}
end)
transport_options
EthereumJSONRPC.WebSocket ->
update_in(transport_options.web_socket_options, fn %WebSocketClient.Options{} = web_socket_options ->
%WebSocketClient.Options{web_socket_options | event: "newHeads", params: []}
end)
end
assert {:ok,
%Subscription{
id: subscription_id,
reference: subscription_reference,
subscriber_pid: ^subscriber_pid,
transport: ^transport,
transport_options: ^transport_options
transport_options: ^subscription_transport_options
}} = EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments)
assert is_binary(subscription_id)
assert is_reference(subscription_reference)
end
# Infura timeouts on 2018-09-12
@ -584,7 +594,7 @@ defmodule EthereumJSONRPCTest do
if transport == EthereumJSONRPC.Mox do
expect(transport, :subscribe, fn _, _, _ ->
subscription = %Subscription{
id: "0x1",
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
@ -612,7 +622,7 @@ defmodule EthereumJSONRPCTest do
if transport == EthereumJSONRPC.Mox do
subscription = %Subscription{
id: "0x1",
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
@ -639,7 +649,7 @@ defmodule EthereumJSONRPCTest do
if transport == EthereumJSONRPC.Mox do
subscription = %Subscription{
id: "0x1",
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: Keyword.fetch!(subscribe_named_arguments, :transport_options)
@ -685,7 +695,7 @@ defmodule EthereumJSONRPCTest do
if transport == EthereumJSONRPC.Mox do
subscription = %Subscription{
id: "0x1",
reference: make_ref(),
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options

@ -14,11 +14,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Geth do
block_interval: 25_000,
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
transport_options: %EthereumJSONRPC.WebSocket{
web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket},
web_socket_options: %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket},
url: url
]
}
]
}
end

@ -35,11 +35,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Mox do
block_interval: @block_interval,
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
transport_options: %EthereumJSONRPC.WebSocket{
web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket},
url: url
]
}
]
}
end

@ -14,11 +14,11 @@ defmodule EthereumJSONRPC.WebSocket.Case.Parity do
block_interval: 5_000,
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
transport_options: %EthereumJSONRPC.WebSocket{
web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket},
web_socket_options: %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket},
url: url
]
}
]
}
end

@ -0,0 +1,71 @@
# See https://github.com/ninenines/cowboy/blob/1.1.x/examples/websocket/src/ws_handler.erl
defmodule EthereumJSONRPC.WebSocket.Cowboy.WebSocketHandler do
@behaviour :cowboy_websocket_handler
defstruct subscription_id_set: MapSet.new(),
new_heads_timer_reference: nil
def init({:tcp, :http}, _request, _opts) do
{:upgrade, :protocol, :cowboy_websocket}
end
@impl :cowboy_websocket_handler
def websocket_init(_transport_name, request, _opts) do
{:ok, request, %__MODULE__{}}
end
@impl :cowboy_websocket_handler
def websocket_handle(
{:text, text},
request,
%__MODULE__{subscription_id_set: subscription_id_set, new_heads_timer_reference: new_heads_timer_reference} =
state
) do
json = Jason.decode!(text)
case json do
%{"id" => id, "method" => "eth_subscribe", "params" => ["newHeads"]} ->
subscription_id = :erlang.unique_integer()
response = %{id: id, result: subscription_id}
frame = {:text, Jason.encode!(response)}
new_heads_timer_reference =
case new_heads_timer_reference do
nil ->
{:ok, timer_reference} = :timer.send_interval(10, :new_head)
timer_reference
_ ->
new_heads_timer_reference
end
{:reply, frame, request,
%__MODULE__{
state
| new_heads_timer_reference: new_heads_timer_reference,
subscription_id_set: MapSet.put(subscription_id_set, subscription_id)
}}
%{"id" => id, "method" => "echo", "params" => params} ->
response = %{id: id, result: params}
frame = {:text, Jason.encode!(response)}
{:reply, frame, request, state}
end
end
@impl :cowboy_websocket_handler
def websocket_info(:new_head, request, %__MODULE__{subscription_id_set: subscription_id_set} = state) do
frames =
Enum.map(subscription_id_set, fn subscription_id ->
response = %{method: "eth_subscription", params: %{result: %{}, subscription: subscription_id}}
{:text, Jason.encode!(response)}
end)
{:reply, frames, request, state}
end
@impl :cowboy_websocket_handler
def websocket_terminate(_reason, _request, _state) do
:ok
end
end

@ -14,13 +14,16 @@ defmodule Indexer.Block.Realtime.Supervisor do
children =
case Keyword.fetch!(subscribe_named_arguments, :transport) do
EthereumJSONRPC.WebSocket ->
transport_options = Keyword.fetch!(subscribe_named_arguments, :transport_options)
url = Keyword.fetch!(transport_options, :url)
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
transport_options =
struct!(EthereumJSONRPC.WebSocket, Keyword.fetch!(subscribe_named_arguments, :transport_options))
web_socket = Indexer.Block.Realtime.WebSocket
web_socket_options = %EthereumJSONRPC.WebSocket.WebSocketClient.Options{web_socket: web_socket}
transport_options = %EthereumJSONRPC.WebSocket{transport_options | web_socket_options: web_socket_options}
%EthereumJSONRPC.WebSocket{url: url, web_socket: web_socket_module} = transport_options
block_fetcher_subscribe_named_arguments =
put_in(subscribe_named_arguments[:transport_options][:web_socket_options], %{web_socket: web_socket})
put_in(subscribe_named_arguments[:transport_options], transport_options)
[
{Task.Supervisor, name: Indexer.Block.Realtime.TaskSupervisor},

Loading…
Cancel
Save