Merge pull request #2791 from poanetwork/ab-ipc-client

add ipc client
pull/2826/head
Victor Baranov 5 years ago committed by GitHub
commit 66058cf7a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 4
      apps/ethereum_jsonrpc/config/config.exs
  3. 25
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/application.ex
  4. 6
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/http.ex
  5. 94
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/ipc.ex
  6. 3
      apps/ethereum_jsonrpc/mix.exs
  7. 6
      apps/indexer/config/dev/ganache.exs
  8. 6
      apps/indexer/config/dev/geth.exs
  9. 7
      apps/indexer/config/dev/parity.exs
  10. 6
      apps/indexer/config/dev/rsk.exs
  11. 6
      apps/indexer/config/prod/ganache.exs
  12. 6
      apps/indexer/config/prod/geth.exs
  13. 6
      apps/indexer/config/prod/parity.exs
  14. 6
      apps/indexer/config/prod/rsk.exs

@ -1,6 +1,7 @@
## Current
### Features
- [#2791](https://github.com/poanetwork/blockscout/pull/2791) - add ipc client
- [#2449](https://github.com/poanetwork/blockscout/pull/2449) - add ability to send notification events through postgres notify
### Fixes

@ -9,6 +9,10 @@ config :ethereum_jsonrpc, EthereumJSONRPC.RequestCoordinator,
wait_per_timeout: :timer.seconds(20),
max_jitter: :timer.seconds(2)
config :ethereum_jsonrpc,
rpc_transport: if(System.get_env("ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT", "http") == "http", do: :http, else: :ipc),
ipc_path: System.get_env("IPC_PATH")
# Add this configuration to add global RPC request throttling.
# throttle_rate_limit: 250,
# throttle_rolling_window_opts: [

@ -5,7 +5,7 @@ defmodule EthereumJSONRPC.Application do
use Application
alias EthereumJSONRPC.{RequestCoordinator, RollingWindow}
alias EthereumJSONRPC.{IPC, RequestCoordinator, RollingWindow}
@impl Application
def start(_type, _args) do
@ -18,6 +18,7 @@ defmodule EthereumJSONRPC.Application do
Supervisor.child_spec({RollingWindow, [rolling_window_opts]}, id: RollingWindow.ErrorThrottle)
]
|> add_throttle_rolling_window(config)
|> add_ipc_client()
|> Supervisor.start_link(strategy: :one_for_one, name: EthereumJSONRPC.Supervisor)
end
@ -37,4 +38,26 @@ defmodule EthereumJSONRPC.Application do
children
end
end
defp add_ipc_client(children) do
case Application.get_env(:ethereum_jsonrpc, :rpc_transport) do
:ipc ->
[
:poolboy.child_spec(:worker, poolboy_config(), path: Application.get_env(:ethereum_jsonrpc, :ipc_path))
| children
]
_ ->
children
end
end
defp poolboy_config do
[
{:name, {:local, :ipc_worker}},
{:worker_module, IPC},
{:size, 10},
{:max_overflow, 5}
]
end
end

@ -132,7 +132,7 @@ defmodule EthereumJSONRPC.HTTP do
# restrict response to only those fields supported by the JSON-RPC 2.0 standard, which means that level of keys is
# validated, so we can indicate that with switch to atom keys.
defp standardize_response(%{"jsonrpc" => "2.0" = jsonrpc, "id" => id} = unstandardized) do
def standardize_response(%{"jsonrpc" => "2.0" = jsonrpc, "id" => id} = unstandardized) do
# Nethermind return string ids
id = quantity_to_integer(id)
@ -155,8 +155,8 @@ defmodule EthereumJSONRPC.HTTP do
# restrict error to only those fields supported by the JSON-RPC 2.0 standard, which means that level of keys is
# validated, so we can indicate that with switch to atom keys.
defp standardize_error(%{"code" => code, "message" => message} = unstandardized)
when is_integer(code) and is_binary(message) do
def standardize_error(%{"code" => code, "message" => message} = unstandardized)
when is_integer(code) and is_binary(message) do
standardized = %{code: code, message: message}
case Map.fetch(unstandardized, "data") do

@ -0,0 +1,94 @@
defmodule EthereumJSONRPC.IPC do
use GenServer
@moduledoc false
import EthereumJSONRPC.HTTP, only: [standardize_response: 1]
# Server
def start_link(opts) do
GenServer.start_link(__MODULE__, Keyword.merge(opts, socket: nil))
end
def init(state) do
opts = [:binary, active: false, reuseaddr: true]
response = :gen_tcp.connect({:local, state[:path]}, 0, opts)
case response do
{:ok, socket} -> {:ok, Keyword.put(state, :socket, socket)}
{:error, reason} -> {:error, reason}
end
end
def post(pid, request) do
GenServer.call(pid, {:request, request})
end
def receive_response(data, socket, timeout, result \\ <<>>)
def receive_response({:error, reason}, _socket, _timeout, _result) do
{:error, reason}
end
def receive_response(:ok, socket, timeout, result) do
with {:ok, response} <- :gen_tcp.recv(socket, 0, timeout) do
new_result = result <> response
if String.ends_with?(response, "\n") do
{:ok, new_result}
else
receive_response(:ok, socket, timeout, new_result)
end
end
end
def receive_response(data, _socket, _timeout, _result) do
{:error, data}
end
def handle_call(
{:request, request},
_from,
[socket: socket, path: _] = state
) do
response =
socket
|> :gen_tcp.send(request)
|> receive_response(socket, 500_000)
{:reply, response, state}
end
# Client
def request(pid, payload) do
with {:ok, response} <- post(pid, Jason.encode!(payload)),
{:ok, decoded_body} <- Jason.decode(response) do
case decoded_body do
%{"error" => error} ->
{:error, error}
result = [%{} | _] ->
list =
result
|> Enum.reverse()
|> List.flatten()
|> Enum.map(&standardize_response/1)
{:ok, list}
result ->
{:ok, Map.get(result, "result")}
end
else
{:error, %Jason.DecodeError{data: ""}} -> {:error, :empty_response}
{:error, error} -> {:error, {:invalid_json, error}}
{:error, error} -> {:error, error}
end
end
def json_rpc(payload, _opts) do
:poolboy.transaction(:ipc_worker, fn pid -> request(pid, payload) end, 600_000)
end
end

@ -91,7 +91,8 @@ defmodule EthereumJsonrpc.MixProject do
{:websocket_client, "~> 1.3"},
{:decimal, "~> 1.0"},
{:decorator, "~> 1.2"},
{:hackney, "~> 1.15.2"}
{:hackney, "~> 1.15.2"},
{:poolboy, "~> 1.5.2"}
]
end
end

@ -3,7 +3,11 @@ use Mix.Config
config :indexer,
block_interval: :timer.seconds(5),
json_rpc_named_arguments: [
transport: EthereumJSONRPC.HTTP,
transport:
if(System.get_env("ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT", "http") == "http",
do: EthereumJSONRPC.HTTP,
else: EthereumJSONRPC.IPC
),
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: System.get_env("ETHEREUM_JSONRPC_HTTP_URL") || "http://localhost:7545",

@ -3,7 +3,11 @@ use Mix.Config
config :indexer,
block_interval: :timer.seconds(5),
json_rpc_named_arguments: [
transport: EthereumJSONRPC.HTTP,
transport:
if(System.get_env("ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT", "http") == "http",
do: EthereumJSONRPC.HTTP,
else: EthereumJSONRPC.IPC
),
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: System.get_env("ETHEREUM_JSONRPC_HTTP_URL") || "https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY",

@ -3,7 +3,12 @@ use Mix.Config
config :indexer,
block_interval: :timer.seconds(5),
json_rpc_named_arguments: [
transport: EthereumJSONRPC.HTTP,
transport:
if(System.get_env("ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT", "http") == "http",
do: EthereumJSONRPC.HTTP,
else: EthereumJSONRPC.IPC
),
else: EthereumJSONRPC.IPC,
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: System.get_env("ETHEREUM_JSONRPC_HTTP_URL") || "http://localhost:8545",

@ -5,7 +5,11 @@ config :indexer,
blocks_concurrency: 1,
receipts_concurrency: 1,
json_rpc_named_arguments: [
transport: EthereumJSONRPC.HTTP,
transport:
if(System.get_env("ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT", "http") == "http",
do: EthereumJSONRPC.HTTP,
else: EthereumJSONRPC.IPC
),
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: System.get_env("ETHEREUM_JSONRPC_HTTP_URL") || "http://localhost:8545",

@ -3,7 +3,11 @@ use Mix.Config
config :indexer,
block_interval: :timer.seconds(5),
json_rpc_named_arguments: [
transport: EthereumJSONRPC.HTTP,
transport:
if(System.get_env("ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT", "http") == "http",
do: EthereumJSONRPC.HTTP,
else: EthereumJSONRPC.IPC
),
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: System.get_env("ETHEREUM_JSONRPC_HTTP_URL") || "http://localhost:7545",

@ -3,7 +3,11 @@ use Mix.Config
config :indexer,
block_interval: :timer.seconds(5),
json_rpc_named_arguments: [
transport: EthereumJSONRPC.HTTP,
transport:
if(System.get_env("ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT", "http") == "http",
do: EthereumJSONRPC.HTTP,
else: EthereumJSONRPC.IPC
),
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: System.get_env("ETHEREUM_JSONRPC_HTTP_URL") || "https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY",

@ -3,7 +3,11 @@ use Mix.Config
config :indexer,
block_interval: :timer.seconds(5),
json_rpc_named_arguments: [
transport: EthereumJSONRPC.HTTP,
transport:
if(System.get_env("ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT", "http") == "http",
do: EthereumJSONRPC.HTTP,
else: EthereumJSONRPC.IPC
),
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: System.get_env("ETHEREUM_JSONRPC_HTTP_URL"),

@ -5,7 +5,11 @@ config :indexer,
blocks_concurrency: 1,
receipts_concurrency: 1,
json_rpc_named_arguments: [
transport: EthereumJSONRPC.HTTP,
transport:
if(System.get_env("ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT", "http") == "http",
do: EthereumJSONRPC.HTTP,
else: EthereumJSONRPC.IPC
),
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: System.get_env("ETHEREUM_JSONRPC_HTTP_URL"),

Loading…
Cancel
Save