diff --git a/.credo.exs b/.credo.exs index 604639c40e..7cae86277c 100644 --- a/.credo.exs +++ b/.credo.exs @@ -75,7 +75,9 @@ # Priority values are: `low, normal, high, higher` # {Credo.Check.Design.AliasUsage, - excluded_lastnames: ~w(Address DateTime Full Number Repo Time Unit), priority: :low}, + excluded_namespaces: ~w(Socket Task), + excluded_lastnames: ~w(Address DateTime Full Number Repo Time Unit), + priority: :low}, # For some checks, you can also set other parameters # diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_sockex/registration.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex similarity index 83% rename from apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_sockex/registration.ex rename to apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex index ecbe7b341f..d53b6dc61f 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_sockex/registration.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex @@ -1,6 +1,6 @@ -defmodule EthereumJSONRPC.WebSocket.WebSockex.Registration do +defmodule EthereumJSONRPC.WebSocket.Registration do @moduledoc """ - When a caller registers for responses to asynchronous `WebSockex.send_frame` responses. + When a caller registers for responses to asynchronous frame responses. """ alias EthereumJSONRPC.Subscription @@ -18,7 +18,7 @@ defmodule EthereumJSONRPC.WebSocket.WebSockex.Registration do * `:unsubscribe` - an `eth_unsubscribe` request will be issued by the caller. Its response needs to be returned to caller **AND** the client needs to stop tracking the subscription. """ - @type type :: :json_rpc | :subscribe + @type type :: :json_rpc | :subscribe | :unsubscribe @type t :: %__MODULE__{from: GenServer.from(), type: type, subscription_id: Subscription.id()} end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/socket.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/socket.ex new file mode 100644 index 0000000000..ef727c826a --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/socket.ex @@ -0,0 +1,338 @@ +defmodule EthereumJSONRPC.WebSocket.Socket do + @moduledoc """ + Implements `EthereumJSONRPC.WebSocket` using `Socket.Web`. + """ + + use GenServer + + require Logger + + import EthereumJSONRPC, only: [request: 1] + + alias EthereumJSONRPC.{Subscription, Transport, WebSocket} + alias EthereumJSONRPC.WebSocket.Registration + alias EthereumJSONRPC.WebSocket.Socket.Receiver + + @behaviour WebSocket + + @enforce_keys ~w(receiver socket_web url)a + defstruct receiver: nil, + request_id_to_registration: %{}, + socket_web: nil, + subscription_id_to_subscription: %{}, + url: nil + + @type t :: %__MODULE__{ + receiver: pid(), + request_id_to_registration: %{non_neg_integer() => Registration.t()}, + socket_web: Socket.Web.t(), + subscription_id_to_subscription: %{Subscription.id() => Subscription.t()}, + url: String.t() + } + + # Supervisor interface + + @impl WebSocket + # only allow secure WSS + def start_link(["wss://" <> _ = url, gen_server_options]) when is_list(gen_server_options) do + GenServer.start_link(__MODULE__, url, gen_server_options) + end + + # Client interface + + @impl WebSocket + @spec json_rpc(WebSocket.web_socket(), Transport.request()) :: {:ok, Transport.result()} | {:error, reason :: term()} + def json_rpc(web_socket, request) do + GenServer.call(web_socket, {:json_rpc, request}) + end + + @impl WebSocket + @spec subscribe(WebSocket.web_socket(), Subscription.event(), Subscription.params()) :: + {:ok, Subscription.t()} | {:error, reason :: term()} + def subscribe(web_socket, event, params) when is_binary(event) and is_list(params) do + GenServer.call(web_socket, {:subscribe, event, params}) + end + + @impl WebSocket + @spec unsubscribe(WebSocket.web_socket(), Subscription.t()) :: :ok | {:error, :not_found} + def unsubscribe(web_socket, %Subscription{} = subscription) do + GenServer.call(web_socket, {:unsubscribe, subscription}) + end + + @impl GenServer + def init("wss://" <> _ = url) do + uri = URI.parse(url) + address = uri_to_address(uri) + + options = + options_put_uri( + [ + authorities: [path: :certifi.cacertfile()], + partial_chain: &partial_chain/1, + secure: true, + server_name: uri.host, + verify: [function: &:ssl_verify_hostname.verify_fun/3, data: [check_hostname: String.to_charlist(uri.host)]] + ], + uri + ) + + case Socket.Web.connect(address, options) do + {:ok, socket_web} -> + receiver = Receiver.spawn_link(%{parent: self(), socket_web: socket_web}) + {:ok, %__MODULE__{receiver: receiver, socket_web: socket_web, url: url}} + + {:error, reason} -> + {:stop, reason} + end + end + + @impl GenServer + def handle_call(message, from, %__MODULE__{socket_web: socket_web} = state) do + {updated_state, unique_request} = register(message, from, state) + + Socket.Web.send(socket_web, {:text, Jason.encode!(unique_request)}) + + {:noreply, updated_state} + end + + @impl GenServer + def handle_cast({:text, text}, %__MODULE__{} = state) do + case Jason.decode(text) do + {:ok, json} -> + handle_response(json, state) + + {:error, _} = error -> + broadcast(error, state) + {:noreply, state} + end + end + + def handle_cast(:close = close, %__MODULE__{} = state) do + broadcast({:error, close}, state) + end + + def handle_cast({:close, _code, _data} = close, %__MODULE__{} = state) do + broadcast({:error, close}, state) + end + + def handle_cast({:error, _reason} = error, %__MODULE__{} = state) do + broadcast(error, state) + end + + defp broadcast(message, %__MODULE__{subscription_id_to_subscription: id_to_subscription}) do + id_to_subscription + |> Map.values() + |> Subscription.broadcast(message) + end + + defp handle_response( + %{"method" => "eth_subscription", "params" => %{"result" => result, "subscription" => subscription_id}}, + %__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state + ) do + case subscription_id_to_subscription do + %{^subscription_id => subscription} -> + Subscription.publish(subscription, {:ok, result}) + + _ -> + Logger.error(fn -> + [ + "Unexpected `eth_subscription` subscription ID (", + inspect(subscription_id), + ") result (", + inspect(result), + "). Subscription ID not in known subscription IDs (", + subscription_id_to_subscription + |> Map.values() + |> Enum.map(&inspect/1), + ")." + ] + end) + end + + {:noreply, state} + end + + defp handle_response( + %{"id" => id} = response, + %__MODULE__{request_id_to_registration: request_id_to_registration} = state + ) do + {registration, new_request_id_to_registration} = Map.pop(request_id_to_registration, id) + respond_to_registration(registration, new_request_id_to_registration, response, state) + end + + defp handle_response(response, %__MODULE__{} = state) do + Logger.error(fn -> + [ + "Unexpected JSON response from web socket\n", + "\n", + " Response:\n", + " ", + inspect(response) + ] + end) + + {:noreply, state} + end + + defp options_put_uri(options, %URI{path: nil}), do: options + + defp options_put_uri(options, %URI{path: path}) when is_binary(path) do + Keyword.put(options, :path, path) + end + + defp partial_chain(certs) do + raise "BBOOM: #{inspect certs}" + end + + defp register( + {:json_rpc, original_request}, + from, + %__MODULE__{request_id_to_registration: request_id_to_registration} = state + ) do + unique_id = unique_request_id(state) + + {%__MODULE__{ + state + | request_id_to_registration: + Map.put(request_id_to_registration, unique_id, %Registration{ + from: from, + type: :json_rpc + }) + }, %{original_request | id: unique_id}} + end + + defp register( + {:subscribe, event, params}, + from, + %__MODULE__{request_id_to_registration: request_id_to_registration} = state + ) + when is_binary(event) and is_list(params) do + unique_id = unique_request_id(state) + + { + %__MODULE__{ + state + | request_id_to_registration: + Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe}) + }, + request(%{id: unique_id, method: "eth_subscribe", params: [event | params]}) + } + end + + defp register( + {:unsubscribe, %Subscription{id: subscription_id}}, + from, + %__MODULE__{request_id_to_registration: request_id_to_registration} = state + ) do + unique_id = unique_request_id(state) + + { + %__MODULE__{ + state + | request_id_to_registration: + Map.put(request_id_to_registration, unique_id, %Registration{ + from: from, + type: :unsubscribe, + subscription_id: subscription_id + }) + }, + request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]}) + } + end + + defp unique_request_id(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do + unique_request_id = EthereumJSONRPC.unique_request_id() + + case request_id_to_registration do + # collision + %{^unique_request_id => _} -> + unique_request_id(state) + + _ -> + unique_request_id + end + end + + defp respond_to_registration( + %Registration{type: :json_rpc, from: from}, + new_request_id_to_registration, + response, + %__MODULE__{} = state + ) do + reply = + case response do + %{"result" => result} -> {:ok, result} + %{"error" => error} -> {:error, error} + end + + GenServer.reply(from, reply) + + {:noreply, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}} + end + + defp respond_to_registration( + %Registration{type: :subscribe, from: {subscriber_pid, _} = from}, + new_request_id_to_registration, + %{"result" => subscription_id}, + %__MODULE__{url: url} = state + ) do + subscription = %Subscription{ + id: subscription_id, + subscriber_pid: subscriber_pid, + transport: EthereumJSONRPC.WebSocket, + transport_options: [web_socket: __MODULE__, web_socket_options: %{web_socket: self()}, url: url] + } + + GenServer.reply(from, {:ok, subscription}) + + new_state = + state + |> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration) + |> put_in([Access.key!(:subscription_id_to_subscription), subscription_id], subscription) + + {:noreply, new_state} + end + + defp respond_to_registration( + %Registration{type: :subscribe, from: from}, + new_request_id_to_registration, + %{"error" => error}, + %__MODULE__{} = state + ) do + GenServer.reply(from, {:error, error}) + + {:noreply, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}} + end + + defp respond_to_registration( + %Registration{type: :unsubscribe, from: from, subscription_id: subscription_id}, + new_request_id_to_registration, + response, + %__MODULE__{} = state + ) do + reply = + case response do + %{"result" => true} -> :ok + %{"result" => false} -> {:error, :not_found} + %{"error" => error} -> {:error, error} + end + + GenServer.reply(from, reply) + + new_state = + state + |> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration) + |> update_in([Access.key!(:subscription_id_to_subscription)], &Map.delete(&1, subscription_id)) + + {:noreply, new_state} + end + + defp respond_to_registration(nil, _, response, %__MODULE__{} = state) do + Logger.error(fn -> ["Got response for unregistered request ID: ", inspect(response)] end) + + {:noreply, state} + end + + defp uri_to_address(%URI{host: host, port: nil}), do: host + defp uri_to_address(%URI{host: host, port: port}) when is_integer(port), do: {host, port} +end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/socket/receiver.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/socket/receiver.ex new file mode 100644 index 0000000000..4994eb63f5 --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/socket/receiver.ex @@ -0,0 +1,71 @@ +defmodule EthereumJSONRPC.WebSocket.Socket.Receiver do + @moduledoc """ + Receives WebSocket messages with `Socket.Web.recv` and send non-ping/pong messages on to `EthereumJSONRPC.WebSocket.Socket` + """ + + @enforce_keys ~w(parent socket_web)a + defstruct fragments_type: nil, + fragments: [], + parent: nil, + socket_web: nil + + def spawn_link(named_arguments) do + spawn_link(__MODULE__, :loop, [struct!(__MODULE__, named_arguments)]) + end + + @doc false + def loop(%__MODULE__{socket_web: socket_web} = state) do + loop(state, Socket.Web.recv(socket_web)) + end + + # start of fragment + defp loop(state, {:ok, {:fragmented, type, fragment}}) when type in ~w(binary text)a do + loop(%__MODULE__{state | fragments_type: type, fragments: [fragment]}) + end + + # middle of fragment + defp loop(%__MODULE__{fragments: fragments} = state, {:ok, {:fragmented, :continuation, fragment}}) do + loop(%__MODULE__{state | fragments: [fragments | fragment]}) + end + + # end of fragment + defp loop( + %__MODULE__{fragments_type: fragments_type, fragments: fragments, parent: parent} = state, + {:ok, {:fragmented, :end, fragment}} + ) do + GenServer.cast(parent, {fragments_type, IO.iodata_to_binary([fragments | fragment])}) + loop(%__MODULE__{state | fragments_type: nil, fragments: []}) + end + + defp loop(%__MODULE__{parent: parent} = state, {:ok, {:binary, _} = binary}) do + GenServer.cast(parent, binary) + loop(state) + end + + defp loop(%__MODULE__{parent: parent} = state, {:ok, {:text, _} = text}) do + GenServer.cast(parent, text) + loop(state) + end + + defp loop(%__MODULE__{socket_web: socket_web} = state, {:ok, {:ping, data}}) do + Socket.Web.send!(socket_web, {:pong, data}) + loop(state) + end + + defp loop(%__MODULE__{parent: parent} = state, {:ok, {:pong, _data} = pong}) do + GenServer.cast(parent, pong) + loop(state) + end + + defp loop(%__MODULE__{parent: parent}, {:ok, :close = close}) do + GenServer.cast(parent, close) + end + + defp loop(%__MODULE__{parent: parent}, {:ok, {:close, _code, _data} = close}) do + GenServer.cast(parent, close) + end + + defp loop(%__MODULE__{parent: parent}, {:error, _reason} = error) do + GenServer.cast(parent, error) + end +end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_sockex.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_sockex.ex index c5b77d8acd..8049252f60 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_sockex.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_sockex.ex @@ -10,7 +10,7 @@ defmodule EthereumJSONRPC.WebSocket.WebSockex do import EthereumJSONRPC, only: [request: 1] alias EthereumJSONRPC.{Subscription, Transport, WebSocket} - alias EthereumJSONRPC.WebSocket.WebSockex.Registration + alias EthereumJSONRPC.WebSocket.Registration @behaviour WebSocket diff --git a/apps/ethereum_jsonrpc/mix.exs b/apps/ethereum_jsonrpc/mix.exs index 5349af3076..9f1341c66e 100644 --- a/apps/ethereum_jsonrpc/mix.exs +++ b/apps/ethereum_jsonrpc/mix.exs @@ -75,7 +75,11 @@ defmodule EthereumJsonrpc.MixProject do {:timex, "~> 3.1.24"}, # Encode/decode function names and arguments {:ex_abi, "~> 0.1.16"}, - # `EthereumJSONRPC.WebSocket.Client` + # `EthereumJSONRPC.WebSocket` + {:socket, "~> 0.3.13"}, + # `:verify_fun` for `Socket.Web.connect` + {:ssl_verify_fun, "~> 1.1"}, + # `EthereumJSONRPC.WebSocket` {:websockex, "~> 0.4.1"} ] end diff --git a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex index 51bf57aec0..7f08d19a0b 100644 --- a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex +++ b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex @@ -7,7 +7,7 @@ defmodule EthereumJSONRPC.WebSocket.Case.Geth do def setup do url = "wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY" - web_socket_module = EthereumJSONRPC.WebSocket.WebSockex + web_socket_module = EthereumJSONRPC.WebSocket.Socket web_socket = start_supervised!({web_socket_module, [url, []]}) %{ diff --git a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex index a13afd79e5..9edd9aa012 100644 --- a/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex +++ b/apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex @@ -7,7 +7,7 @@ defmodule EthereumJSONRPC.WebSocket.Case.Parity do def setup do url = "wss://sokol-ws.poa.network/ws" - web_socket_module = EthereumJSONRPC.WebSocket.WebSockex + web_socket_module = EthereumJSONRPC.WebSocket.Socket web_socket = start_supervised!({web_socket_module, [url, []]}) %{ diff --git a/mix.lock b/mix.lock index 4e9b56f646..bdb5c8fd9c 100644 --- a/mix.lock +++ b/mix.lock @@ -71,6 +71,7 @@ "ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], []}, "set_locale": {:git, "https://github.com/minifast/set_locale.git", "da9ae029642bc0fbd9212c2aaf86c0adca70c084", [branch: "master"]}, "sobelow": {:hex, :sobelow, "0.7.0", "68ac7cb55040e8d33fb0e5df0008b4f612d2b97ea2bc99a013967a9a200ffc57", [:mix], []}, + "socket": {:hex, :socket, "0.3.13", "98a2ab20ce17f95fb512c5cadddba32b57273e0d2dba2d2e5f976c5969d0c632", [:mix], [], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []}, "timex": {:hex, :timex, "3.1.25", "6002dae5432f749d1c93e2cd103eb73cecb53e50d2c885349e8e4146fc96bd44", [:mix], [{:combine, "~> 0.10", [hex: :combine, optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}]}, "timex_ecto": {:hex, :timex_ecto, "3.2.1", "461140751026e1ca03298fab628f78ab189e78784175f5e301eefa034ee530aa", [:mix], [{:ecto, "~> 2.2", [hex: :ecto, optional: false]}, {:timex, "~> 3.1", [hex: :timex, optional: false]}]},