Remove EthereumJSONRPC.WebSocket impl that don't work with Infura

Don't rebase them out, so that we have the history that we tried to use
`socket` and `websockex`, but we couldn't get them to work.
pull/572/head
Luke Imhoff 6 years ago committed by Luke Imhoff
parent 72bd8d7c68
commit 9a4a23b5c5
  1. 1
      .dialyzer-ignore
  2. 338
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/socket.ex
  3. 71
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/socket/receiver.ex
  4. 300
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_sockex.ex
  5. 6
      apps/ethereum_jsonrpc/mix.exs
  6. 2
      mix.lock

@ -1,4 +1,3 @@
:0: Unknown function 'Elixir.ExUnit.Callbacks':'__merge__'/3
:0: Unknown function 'Elixir.ExUnit.CaseTemplate':'__proxy__'/2
:0: Unknown type 'Elixir.Map':t/0
lib/websockex.ex:6: The inferred return type of handle_ping/2 ({'reply','pong' | {'pong',_},_}) has nothing in common with {'close',_} | {'ok',_} | {'close',{integer(),binary()},_} | {'reply',{'binary',binary()} | {'ping',binary()} | {'text',binary()},_}, which is the expected return type for the callback of the 'Elixir.WebSockex' behaviour

@ -1,338 +0,0 @@
defmodule EthereumJSONRPC.WebSocket.Socket do
@moduledoc """
Implements `EthereumJSONRPC.WebSocket` using `Socket.Web`.
"""
use GenServer
require Logger
import EthereumJSONRPC, only: [request: 1]
alias EthereumJSONRPC.{Subscription, Transport, WebSocket}
alias EthereumJSONRPC.WebSocket.Registration
alias EthereumJSONRPC.WebSocket.Socket.Receiver
@behaviour WebSocket
@enforce_keys ~w(receiver socket_web url)a
defstruct receiver: nil,
request_id_to_registration: %{},
socket_web: nil,
subscription_id_to_subscription: %{},
url: nil
@type t :: %__MODULE__{
receiver: pid(),
request_id_to_registration: %{non_neg_integer() => Registration.t()},
socket_web: Socket.Web.t(),
subscription_id_to_subscription: %{Subscription.id() => Subscription.t()},
url: String.t()
}
# Supervisor interface
@impl WebSocket
# only allow secure WSS
def start_link(["wss://" <> _ = url, gen_server_options]) when is_list(gen_server_options) do
GenServer.start_link(__MODULE__, url, gen_server_options)
end
# Client interface
@impl WebSocket
@spec json_rpc(WebSocket.web_socket(), Transport.request()) :: {:ok, Transport.result()} | {:error, reason :: term()}
def json_rpc(web_socket, request) do
GenServer.call(web_socket, {:json_rpc, request})
end
@impl WebSocket
@spec subscribe(WebSocket.web_socket(), Subscription.event(), Subscription.params()) ::
{:ok, Subscription.t()} | {:error, reason :: term()}
def subscribe(web_socket, event, params) when is_binary(event) and is_list(params) do
GenServer.call(web_socket, {:subscribe, event, params})
end
@impl WebSocket
@spec unsubscribe(WebSocket.web_socket(), Subscription.t()) :: :ok | {:error, :not_found}
def unsubscribe(web_socket, %Subscription{} = subscription) do
GenServer.call(web_socket, {:unsubscribe, subscription})
end
@impl GenServer
def init("wss://" <> _ = url) do
uri = URI.parse(url)
address = uri_to_address(uri)
options =
options_put_uri(
[
authorities: [path: :certifi.cacertfile()],
partial_chain: &partial_chain/1,
secure: true,
server_name: uri.host,
verify: [function: &:ssl_verify_hostname.verify_fun/3, data: [check_hostname: String.to_charlist(uri.host)]]
],
uri
)
case Socket.Web.connect(address, options) do
{:ok, socket_web} ->
receiver = Receiver.spawn_link(%{parent: self(), socket_web: socket_web})
{:ok, %__MODULE__{receiver: receiver, socket_web: socket_web, url: url}}
{:error, reason} ->
{:stop, reason}
end
end
@impl GenServer
def handle_call(message, from, %__MODULE__{socket_web: socket_web} = state) do
{updated_state, unique_request} = register(message, from, state)
Socket.Web.send(socket_web, {:text, Jason.encode!(unique_request)})
{:noreply, updated_state}
end
@impl GenServer
def handle_cast({:text, text}, %__MODULE__{} = state) do
case Jason.decode(text) do
{:ok, json} ->
handle_response(json, state)
{:error, _} = error ->
broadcast(error, state)
{:noreply, state}
end
end
def handle_cast(:close = close, %__MODULE__{} = state) do
broadcast({:error, close}, state)
end
def handle_cast({:close, _code, _data} = close, %__MODULE__{} = state) do
broadcast({:error, close}, state)
end
def handle_cast({:error, _reason} = error, %__MODULE__{} = state) do
broadcast(error, state)
end
defp broadcast(message, %__MODULE__{subscription_id_to_subscription: id_to_subscription}) do
id_to_subscription
|> Map.values()
|> Subscription.broadcast(message)
end
defp handle_response(
%{"method" => "eth_subscription", "params" => %{"result" => result, "subscription" => subscription_id}},
%__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state
) do
case subscription_id_to_subscription do
%{^subscription_id => subscription} ->
Subscription.publish(subscription, {:ok, result})
_ ->
Logger.error(fn ->
[
"Unexpected `eth_subscription` subscription ID (",
inspect(subscription_id),
") result (",
inspect(result),
"). Subscription ID not in known subscription IDs (",
subscription_id_to_subscription
|> Map.values()
|> Enum.map(&inspect/1),
")."
]
end)
end
{:noreply, state}
end
defp handle_response(
%{"id" => id} = response,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
{registration, new_request_id_to_registration} = Map.pop(request_id_to_registration, id)
respond_to_registration(registration, new_request_id_to_registration, response, state)
end
defp handle_response(response, %__MODULE__{} = state) do
Logger.error(fn ->
[
"Unexpected JSON response from web socket\n",
"\n",
" Response:\n",
" ",
inspect(response)
]
end)
{:noreply, state}
end
defp options_put_uri(options, %URI{path: nil}), do: options
defp options_put_uri(options, %URI{path: path}) when is_binary(path) do
Keyword.put(options, :path, path)
end
defp partial_chain(certs) do
raise "BBOOM: #{inspect(certs)}"
end
defp register(
{:json_rpc, original_request},
from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
unique_id = unique_request_id(state)
{%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{
from: from,
type: :json_rpc
})
}, %{original_request | id: unique_id}}
end
defp register(
{:subscribe, event, params},
from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
)
when is_binary(event) and is_list(params) do
unique_id = unique_request_id(state)
{
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe})
},
request(%{id: unique_id, method: "eth_subscribe", params: [event | params]})
}
end
defp register(
{:unsubscribe, %Subscription{id: subscription_id}},
from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
unique_id = unique_request_id(state)
{
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{
from: from,
type: :unsubscribe,
subscription_id: subscription_id
})
},
request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]})
}
end
defp unique_request_id(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do
unique_request_id = EthereumJSONRPC.unique_request_id()
case request_id_to_registration do
# collision
%{^unique_request_id => _} ->
unique_request_id(state)
_ ->
unique_request_id
end
end
defp respond_to_registration(
%Registration{type: :json_rpc, from: from},
new_request_id_to_registration,
response,
%__MODULE__{} = state
) do
reply =
case response do
%{"result" => result} -> {:ok, result}
%{"error" => error} -> {:error, error}
end
GenServer.reply(from, reply)
{:noreply, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}}
end
defp respond_to_registration(
%Registration{type: :subscribe, from: {subscriber_pid, _} = from},
new_request_id_to_registration,
%{"result" => subscription_id},
%__MODULE__{url: url} = state
) do
subscription = %Subscription{
id: subscription_id,
subscriber_pid: subscriber_pid,
transport: EthereumJSONRPC.WebSocket,
transport_options: [web_socket: __MODULE__, web_socket_options: %{web_socket: self()}, url: url]
}
GenServer.reply(from, {:ok, subscription})
new_state =
state
|> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration)
|> put_in([Access.key!(:subscription_id_to_subscription), subscription_id], subscription)
{:noreply, new_state}
end
defp respond_to_registration(
%Registration{type: :subscribe, from: from},
new_request_id_to_registration,
%{"error" => error},
%__MODULE__{} = state
) do
GenServer.reply(from, {:error, error})
{:noreply, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}}
end
defp respond_to_registration(
%Registration{type: :unsubscribe, from: from, subscription_id: subscription_id},
new_request_id_to_registration,
response,
%__MODULE__{} = state
) do
reply =
case response do
%{"result" => true} -> :ok
%{"result" => false} -> {:error, :not_found}
%{"error" => error} -> {:error, error}
end
GenServer.reply(from, reply)
new_state =
state
|> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration)
|> update_in([Access.key!(:subscription_id_to_subscription)], &Map.delete(&1, subscription_id))
{:noreply, new_state}
end
defp respond_to_registration(nil, _, response, %__MODULE__{} = state) do
Logger.error(fn -> ["Got response for unregistered request ID: ", inspect(response)] end)
{:noreply, state}
end
defp uri_to_address(%URI{host: host, port: nil}), do: host
defp uri_to_address(%URI{host: host, port: port}) when is_integer(port), do: {host, port}
end

@ -1,71 +0,0 @@
defmodule EthereumJSONRPC.WebSocket.Socket.Receiver do
@moduledoc """
Receives WebSocket messages with `Socket.Web.recv` and send non-ping/pong messages on to `EthereumJSONRPC.WebSocket.Socket`
"""
@enforce_keys ~w(parent socket_web)a
defstruct fragments_type: nil,
fragments: [],
parent: nil,
socket_web: nil
def spawn_link(named_arguments) do
spawn_link(__MODULE__, :loop, [struct!(__MODULE__, named_arguments)])
end
@doc false
def loop(%__MODULE__{socket_web: socket_web} = state) do
loop(state, Socket.Web.recv(socket_web))
end
# start of fragment
defp loop(state, {:ok, {:fragmented, type, fragment}}) when type in ~w(binary text)a do
loop(%__MODULE__{state | fragments_type: type, fragments: [fragment]})
end
# middle of fragment
defp loop(%__MODULE__{fragments: fragments} = state, {:ok, {:fragmented, :continuation, fragment}}) do
loop(%__MODULE__{state | fragments: [fragments | fragment]})
end
# end of fragment
defp loop(
%__MODULE__{fragments_type: fragments_type, fragments: fragments, parent: parent} = state,
{:ok, {:fragmented, :end, fragment}}
) do
GenServer.cast(parent, {fragments_type, IO.iodata_to_binary([fragments | fragment])})
loop(%__MODULE__{state | fragments_type: nil, fragments: []})
end
defp loop(%__MODULE__{parent: parent} = state, {:ok, {:binary, _} = binary}) do
GenServer.cast(parent, binary)
loop(state)
end
defp loop(%__MODULE__{parent: parent} = state, {:ok, {:text, _} = text}) do
GenServer.cast(parent, text)
loop(state)
end
defp loop(%__MODULE__{socket_web: socket_web} = state, {:ok, {:ping, data}}) do
Socket.Web.send!(socket_web, {:pong, data})
loop(state)
end
defp loop(%__MODULE__{parent: parent} = state, {:ok, {:pong, _data} = pong}) do
GenServer.cast(parent, pong)
loop(state)
end
defp loop(%__MODULE__{parent: parent}, {:ok, :close = close}) do
GenServer.cast(parent, close)
end
defp loop(%__MODULE__{parent: parent}, {:ok, {:close, _code, _data} = close}) do
GenServer.cast(parent, close)
end
defp loop(%__MODULE__{parent: parent}, {:error, _reason} = error) do
GenServer.cast(parent, error)
end
end

@ -1,300 +0,0 @@
defmodule EthereumJSONRPC.WebSocket.WebSockex do
@moduledoc """
Implements `EthereumJSONRPC.WebSocket` using `WebSockex`.
"""
use WebSockex
require Logger
import EthereumJSONRPC, only: [request: 1]
alias EthereumJSONRPC.{Subscription, Transport, WebSocket}
alias EthereumJSONRPC.WebSocket.Registration
@behaviour WebSocket
@enforce_keys ~w(url)a
defstruct request_id_to_registration: %{},
subscription_id_to_subscription: %{},
url: nil
@type t :: %__MODULE__{
request_id_to_registration: %{non_neg_integer() => Registration.t()},
subscription_id_to_subscription: %{String.t() => Subscription.t()},
url: String.t()
}
# Supervisor interface
@impl WebSocket
# only allow secure WSS
def start_link(["wss://" <> _ = url, gen_server_options]) when is_list(gen_server_options) do
WebSockex.start_link(url, __MODULE__, %__MODULE__{url: url}, [
{:cacerts, :certifi.cacerts()},
{:insecure, false} | gen_server_options
])
end
# Client interface
@impl WebSocket
@spec json_rpc(WebSocket.web_socket(), Transport.request()) :: {:ok, Transport.result()} | {:error, reason :: term()}
def json_rpc(client, request) do
unique_id = EthereumJSONRPC.unique_request_id()
unique_request = Map.put(request, :id, unique_id)
{:ok, reference} = GenServer.call(client, {:register, %{id: unique_id, type: :json_rpc}})
WebSockex.send_frame(client, {:text, Jason.encode!(unique_request)})
receive do
{^reference, reply} -> reply
after
5000 ->
exit(:timeout)
end
end
@impl WebSocket
@spec subscribe(WebSocket.web_socket(), Subscription.event(), Subscription.params()) ::
{:ok, Subscription.t()} | {:error, reason :: term()}
def subscribe(client, event, params) when is_binary(event) and is_list(params) do
unique_id = EthereumJSONRPC.unique_request_id()
unique_request =
%{id: unique_id, method: "eth_subscribe", params: [event | params]}
|> request()
{:ok, reference} = GenServer.call(client, {:register, %{id: unique_id, type: :subscribe}})
WebSockex.send_frame(client, {:text, Jason.encode!(unique_request)})
receive do
{^reference, reply} -> reply
after
5000 ->
exit(:timeout)
end
end
@impl WebSocket
@spec unsubscribe(WebSocket.web_socket(), Subscription.t()) :: :ok | {:error, :not_found}
def unsubscribe(client, %Subscription{id: subscription_id}) do
unique_id = EthereumJSONRPC.unique_request_id()
unique_request =
%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]}
|> request()
with {:ok, reference} <-
GenServer.call(client, {:register, %{id: unique_id, type: :unsubscribe, subscription_id: subscription_id}}) do
WebSockex.send_frame(client, {:text, Jason.encode!(unique_request)})
receive do
{^reference, reply} -> reply
after
5000 ->
exit(:timeout)
end
end
end
# WebSockex functions
@impl WebSockex
def handle_frame({:text, message}, %__MODULE__{} = state) do
case Jason.decode(message) do
{:ok, json} ->
handle_response(json, state)
{:error, _} = error ->
broadcast(error, state)
{:ok, state}
end
end
@impl WebSockex
def handle_info({:"$gen_call", from, request}, %__MODULE__{} = state) do
handle_call(request, from, state)
end
defp handle_call(
{:register, %{id: id} = options},
from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
)
when is_integer(id) do
case request_id_to_registration do
%{^id => _} ->
GenServer.reply(from, {:error, :already_registered})
{:ok, state}
_ ->
register(options, from, state)
end
end
defp register(%{id: id, type: type}, {_, reference} = from, state)
when type in ~w(json_rpc subscribe)a and is_reference(reference) do
GenServer.reply(from, {:ok, reference})
{:ok, put_in(state.request_id_to_registration[id], %Registration{from: from, type: type})}
end
defp register(
%{id: id, type: :unsubscribe = type, subscription_id: subscription_id},
{_, reference} = from,
%__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state
)
when is_reference(reference) do
case subscription_id_to_subscription do
%{^subscription_id => _} ->
GenServer.reply(from, {:ok, reference})
{:ok,
put_in(state.request_id_to_registration[id], %Registration{
from: from,
type: type,
subscription_id: subscription_id
})}
_ ->
GenServer.reply(from, {:error, :not_found})
{:ok, state}
end
end
defp handle_response(
%{"method" => "eth_subscription", "params" => %{"result" => result, "subscription" => subscription_id}},
%__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state
) do
case subscription_id_to_subscription do
%{^subscription_id => subscription} ->
Subscription.publish(subscription, {:ok, result})
_ ->
Logger.error(fn ->
[
"Unexpected `eth_subscription` subscripton ID (",
inspect(subscription_id),
") result (",
inspect(result),
"). Subscription ID not in known subscription IDs (",
subscription_id_to_subscription
|> Map.values()
|> Enum.map(&inspect/1),
")."
]
end)
end
{:ok, state}
end
defp handle_response(
%{"id" => id} = response,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
{registration, new_request_id_to_registration} = Map.pop(request_id_to_registration, id)
respond_to_registration(registration, new_request_id_to_registration, response, state)
end
defp handle_response(response, %__MODULE__{} = state) do
Logger.error(fn ->
[
"Unexpected JSON response from web socket\n",
"\n",
" Response:\n",
" ",
inspect(response)
]
end)
{:ok, state}
end
defp respond_to_registration(
%Registration{type: :json_rpc, from: from},
new_request_id_to_registration,
response,
%__MODULE__{} = state
) do
reply =
case response do
%{"result" => result} -> {:ok, result}
%{"error" => error} -> {:error, error}
end
GenServer.reply(from, reply)
{:ok, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}}
end
defp respond_to_registration(
%Registration{type: :subscribe, from: {subscriber_pid, _} = from},
new_request_id_to_registration,
%{"result" => subscription_id},
%__MODULE__{url: url} = state
) do
subscription = %Subscription{
id: subscription_id,
subscriber_pid: subscriber_pid,
transport: EthereumJSONRPC.WebSocket,
transport_options: [web_socket: __MODULE__, web_socket_options: %{web_socket: self()}, url: url]
}
GenServer.reply(from, {:ok, subscription})
new_state =
state
|> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration)
|> put_in([Access.key!(:subscription_id_to_subscription), subscription_id], subscription)
{:ok, new_state}
end
defp respond_to_registration(
%Registration{type: :subscribe, from: from},
new_request_id_to_registration,
%{"error" => error},
%__MODULE__{} = state
) do
GenServer.reply(from, {:error, error})
{:ok, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}}
end
defp respond_to_registration(
%Registration{type: :unsubscribe, from: from, subscription_id: subscription_id},
new_request_id_to_registration,
response,
%__MODULE__{} = state
) do
reply =
case response do
%{"result" => true} -> :ok
%{"result" => false} -> {:error, :not_found}
%{"error" => error} -> {:error, error}
end
GenServer.reply(from, reply)
new_state =
state
|> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration)
|> update_in([Access.key!(:subscription_id_to_subscription)], &Map.delete(&1, subscription_id))
{:ok, new_state}
end
defp respond_to_registration(nil, _, response, %__MODULE__{} = state) do
Logger.error(fn -> ["Got response for unregistered request ID: ", inspect(response)] end)
{:ok, state}
end
defp broadcast(message, %__MODULE__{subscription_id_to_subscription: id_to_subscription}) do
id_to_subscription
|> Map.values()
|> Subscription.broadcast(message)
end
end

@ -75,14 +75,10 @@ defmodule EthereumJsonrpc.MixProject do
{:timex, "~> 3.1.24"},
# Encode/decode function names and arguments
{:ex_abi, "~> 0.1.16"},
# `EthereumJSONRPC.WebSocket`
{:socket, "~> 0.3.13"},
# `:verify_fun` for `Socket.Web.connect`
{:ssl_verify_fun, "~> 1.1"},
# `EthereumJSONRPC.WebSocket`
{:websocket_client, "~> 1.3"},
# `EthereumJSONRPC.WebSocket`
{:websockex, "~> 0.4.1"}
{:websocket_client, "~> 1.3"}
]
end
end

@ -71,7 +71,6 @@
"ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], []},
"set_locale": {:git, "https://github.com/minifast/set_locale.git", "da9ae029642bc0fbd9212c2aaf86c0adca70c084", [branch: "master"]},
"sobelow": {:hex, :sobelow, "0.7.0", "68ac7cb55040e8d33fb0e5df0008b4f612d2b97ea2bc99a013967a9a200ffc57", [:mix], []},
"socket": {:hex, :socket, "0.3.13", "98a2ab20ce17f95fb512c5cadddba32b57273e0d2dba2d2e5f976c5969d0c632", [:mix], [], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []},
"timex": {:hex, :timex, "3.1.25", "6002dae5432f749d1c93e2cd103eb73cecb53e50d2c885349e8e4146fc96bd44", [:mix], [{:combine, "~> 0.10", [hex: :combine, optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}]},
"timex_ecto": {:hex, :timex_ecto, "3.2.1", "461140751026e1ca03298fab628f78ab189e78784175f5e301eefa034ee530aa", [:mix], [{:ecto, "~> 2.2", [hex: :ecto, optional: false]}, {:timex, "~> 3.1", [hex: :timex, optional: false]}]},
@ -79,5 +78,4 @@
"unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], []},
"wallaby": {:hex, :wallaby, "0.20.0", "cc6663555ff7b05afbebb2a8b461d18a5b321658b9017f7bc77d494b7063266a", [:mix], [{:httpoison, "~> 0.12", [hex: :httpoison, optional: false]}, {:poison, ">= 1.4.0", [hex: :poison, optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: false]}]},
"websocket_client": {:hex, :websocket_client, "1.3.0", "2275d7daaa1cdacebf2068891c9844b15f4fdc3de3ec2602420c2fb486db59b6", [:rebar3], [], "hexpm"},
"websockex": {:hex, :websockex, "0.4.1", "d7b7191ec3d5dd136683b60114405a5a130a175faade773a07cb52dbd98f9442", [:mix], [], "hexpm"},
}

Loading…
Cancel
Save