Merge pull request #572 from poanetwork/532

Use WebSocket subscription to newHeads for realtime indexer
Luke Imhoff 6 years ago committed by GitHub
commit 050870f8b4
  1. .circleci/config.yml (33)
  2. .credo.exs (4)
  3. README.md (55)
  4. apps/ethereum_jsonrpc/README.md (55)
  5. apps/ethereum_jsonrpc/config/test/mox.exs (5)
  6. apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex (58)
  7. apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/http.ex (3)
  8. apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/subscription.ex (59)
  9. apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transport.ex (50)
  10. apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex (113)
  11. apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/registration.ex (24)
  12. apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket/web_socket_client.ex (322)
  13. apps/ethereum_jsonrpc/mix.exs (8)
  14. apps/ethereum_jsonrpc/test/ethereum_jsonrpc/http/mox_test.exs (4)
  15. apps/ethereum_jsonrpc/test/ethereum_jsonrpc/mox_test.exs (4)
  16. apps/ethereum_jsonrpc/test/ethereum_jsonrpc/web_socket_test.exs (219)
  17. apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs (167)
  18. apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/case.ex (47)
  19. apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/case/geth/http_websocket.ex (19)
  20. apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/case/geth/mox.ex (13)
  21. apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/case/parity/http_websocket.ex (20)
  22. apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/case/parity/mox.ex (13)
  23. apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case.ex (9)
  24. apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/geth.ex (25)
  25. apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/mox.ex (76)
  26. apps/ethereum_jsonrpc/test/support/ethereum_jsonrpc/web_socket/case/parity.ex (25)
  27. apps/ethereum_jsonrpc/test/test_helper.exs (2)
  28. apps/explorer/README.md (11)
  29. apps/explorer/config/dev/geth.exs (8)
  30. apps/explorer/config/dev/parity.exs (8)
  31. apps/explorer/config/prod/geth.exs (8)
  32. apps/explorer/config/prod/parity.exs (8)
  33. apps/explorer/config/test/geth.exs (5)
  34. apps/explorer/config/test/parity.exs (5)
  35. apps/indexer/README.md (52)
  36. apps/indexer/config/dev/geth.exs (7)
  37. apps/indexer/config/dev/parity.exs (7)
  38. apps/indexer/config/prod/geth.exs (7)
  39. apps/indexer/config/prod/parity.exs (7)
  40. apps/indexer/lib/indexer/application.ex (3)
  41. apps/indexer/lib/indexer/block_fetcher.ex (224)
  42. apps/indexer/lib/indexer/block_fetcher/catchup.ex (181)
  43. apps/indexer/lib/indexer/block_fetcher/catchup/supervisor.ex (110)
  44. apps/indexer/lib/indexer/block_fetcher/realtime.ex (174)
  45. apps/indexer/lib/indexer/block_fetcher/realtime/supervisor.ex (37)
  46. apps/indexer/lib/indexer/block_fetcher/supervisor.ex (101)
  47. apps/indexer/test/indexer/block_fetcher/catchup/supervisor_test.exs (41)
  48. apps/indexer/test/indexer/block_fetcher/realtime_test.exs (98)
  49. apps/indexer/test/indexer/block_fetcher_test.exs (136)
  50. apps/indexer/test/indexer/buffered_task_test.exs (29)
  51. mix.lock (1)

@ -296,7 +296,7 @@ jobs:
name: Scan block_scout_web for vulnerabilities
command: mix sobelow --config
working_directory: "apps/block_scout_web"
test_geth_http:
test_geth_http_websocket:
docker:
# Ensure .tool-versions matches
- image: circleci/elixir:1.7.2-node-browsers
@ -306,10 +306,8 @@ jobs:
PGPASSWORD: postgres
# match POSTGRES_USER for postgres image below
PGUSER: postgres
ETHEREUM_JSONRPC_VARIANT: "EthereumJSONRPC.Geth"
ETHEREUM_JSONRPC_TRANSPORT: "EthereumJSONRPC.HTTP"
ETHEREUM_JSONRPC_HTTP: "EthereumJSONRPC.HTTP.HTTPoison"
ETHEREUM_JSONRPC_HTTP_URL: "https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY"
ETHEREUM_JSONRPC_CASE: "EthereumJSONRPC.Case.Geth.HTTPWebSocket"
ETHEREUM_JSONRPC_WEB_SOCKET_CASE: "EthereumJSONRPC.WebSocket.Case.Geth"
- image: circleci/postgres:10.3-alpine
environment:
# Match apps/explorer/config/test.exs config :explorer, Explorer.Repo, database
@ -356,8 +354,8 @@ jobs:
PGPASSWORD: postgres
# match POSTGRES_USER for postgres image below
PGUSER: postgres
ETHEREUM_JSONRPC_VARIANT: "EthereumJSONRPC.Geth"
ETHEREUM_JSONRPC_TRANSPORT: "EthereumJSONRPC.Mox"
ETHEREUM_JSONRPC_CASE: "EthereumJSONRPC.Case.Geth.Mox"
ETHEREUM_JSONRPC_WEB_SOCKET_CASE: "EthereumJSONRPC.WebSocket.Case.Mox"
- image: circleci/postgres:10.3-alpine
environment:
# Match apps/explorer/config/test.exs config :explorer, Explorer.Repo, database
@ -394,7 +392,7 @@ jobs:
path: cover/excoveralls.html
- store_test_results:
path: _build/test/junit
test_parity_http:
test_parity_http_websocket:
docker:
# Ensure .tool-versions matches
- image: circleci/elixir:1.7.2-node-browsers
@ -404,11 +402,8 @@ jobs:
PGPASSWORD: postgres
# match POSTGRES_USER for postgres image below
PGUSER: postgres
ETHEREUM_JSONRPC_VARIANT: "EthereumJSONRPC.Parity"
# enable on-chain tests against Sokol instead of `mox` tests run locally
ETHEREUM_JSONRPC_TRANSPORT: "EthereumJSONRPC.HTTP"
ETHEREUM_JSONRPC_HTTP: "EthereumJSONRPC.HTTP.HTTPoison"
ETHEREUM_JSONRPC_HTTP_URL: "https://sokol-trace.poa.network"
ETHEREUM_JSONRPC_CASE: "EthereumJSONRPC.Case.Parity.HTTPWebSocket"
ETHEREUM_JSONRPC_WEB_SOCKET_CASE: "EthereumJSONRPC.WebSocket.Case.Parity"
- image: circleci/postgres:10.3-alpine
environment:
# Match apps/explorer/config/test.exs config :explorer, Explorer.Repo, database
@ -455,8 +450,8 @@ jobs:
PGPASSWORD: postgres
# match POSTGRES_USER for postgres image below
PGUSER: postgres
ETHEREUM_JSONRPC_VARIANT: "EthereumJSONRPC.Parity"
ETHEREUM_JSONRPC_TRANSPORT: "EthereumJSONRPC.Mox"
ETHEREUM_JSONRPC_CASE: "EthereumJSONRPC.Case.Parity.Mox"
ETHEREUM_JSONRPC_WEB_SOCKET_CASE: "EthereumJSONRPC.WebSocket.Case.Mox"
- image: circleci/postgres:10.3-alpine
environment:
# Match apps/explorer/config/test.exs config :explorer, Explorer.Repo, database
@ -517,9 +512,9 @@ workflows:
- eslint
- jest
- sobelow
- test_parity_http
- test_parity_http_websocket
- test_parity_mox
- test_geth_http
- test_geth_http_websocket
- test_geth_mox
- dialyzer:
requires:
@ -536,13 +531,13 @@ workflows:
- sobelow:
requires:
- build
- test_parity_http:
- test_parity_http_websocket:
requires:
- build
- test_parity_mox:
requires:
- build
- test_geth_http:
- test_geth_http_websocket:
requires:
- build
- test_geth_mox:

@ -75,7 +75,9 @@
# Priority values are: `low, normal, high, higher`
#
{Credo.Check.Design.AliasUsage,
excluded_lastnames: ~w(Address DateTime Full Number Repo Time Unit), priority: :low},
excluded_namespaces: ~w(Socket Task),
excluded_lastnames: ~w(Address DateTime Full Number Repo Time Unit),
priority: :low},
# For some checks, you can also set other parameters
#

@ -125,7 +125,7 @@ To monitor build status, configure your local [CCMenu](http://ccmenu.org/) with
2. Format the Elixir code.
`mix format`
3. Run the test suite with coverage for whole umbrella project.
3. Run the test suite with coverage for the whole umbrella project. This step can be run with the different configurations outlined below.
`mix coveralls.html --umbrella`
4. Lint the Elixir code.
@ -144,16 +144,53 @@ To monitor build status, configure your local [CCMenu](http://ccmenu.org/) with
8. Test the JavaScript code.
`cd apps/block_scout_web/assets && npm run test; cd -`
##### Variant and Chain
##### Parity
By default, [`mox`](https://github.com/plataformatec/mox) will be used to mock the `EthereumJSONRPC.Transport` and `EthereumJSONRPC.HTTP` behaviours. The mocked behaviours' returns differ based on the `EthereumJSONRPC.Variant`.
###### Mox
| `EthereumJSONRPC.Variant` | `EthereumJSONRPC.Transport` | `EthereumJSONRPC.HTTP` | `url` | Command | Usage(s) |
|:--------------------------|:----------------------------|:---------------------------------|:--------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------|
| `EthereumJSONRPC.Parity` | `EthereumJSONRPC.Mox` | `EthereumJSONRPC.HTTP.Mox` | N/A | `mix test` | Local, `circleci/config.yml` `test_parity_mox` job |
| `EthereumJSONRPC.Parity` | `EthereumJSONRPC.HTTP` | `EthereumJSONRPC.HTTP.HTTPoison` | `https://trace-sokol.poa.network` | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Parity ETHEREUM_JSONRPC_TRANSPORT=EthereumJSONRPC.HTTP ETHEREUM_JSONRPC_HTTP=EthereumJSONRPC.HTTP.HTTPoison ETHEREUM_JSONRPC_HTTP_URL=https://sokol-trace.poa.network mix test --exclude no_parity` | `.circleci/config.yml` `test_parity_http` job |
| `EthereumJSONRPC.Geth` | `EthereumJSONRPC.Mox` | `EthereumJSONRPC.HTTP.Mox` | N/A | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Geth mix test --exclude no_geth` | `.circleci/config.yml` `test_geth_http` job |
| `EthereumJSONRPC.Geth` | `EthereumJSONRPC.HTTP` | `EthereumJSONRPC.HTTP.HTTPoison` | `https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY` | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Geth ETHEREUM_JSONRPC_TRANSPORT=EthereumJSONRPC.HTTP ETHEREUM_JSONRPC_HTTP=EthereumJSONRPC.HTTP.HTTPoison ETHEREUM_JSONRPC_HTTP_URL=https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY mix test --exclude no_geth` | `.circleci/config.yml` `test_geth_http` job |
**This is the default setup. `mix coveralls.html --umbrella` will work on its own, but to be explicit, use the following setup**:
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Parity.Mox
export ETHEREUM_JSONRPC_WEB_SOCKET_CASE=EthereumJSONRPC.WebSocket.Case.Mox
mix coveralls.html --umbrella --exclude no_parity
```
###### HTTP / WebSocket
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Parity.HTTPWebSocket
export ETHEREUM_JSONRPC_WEB_SOCKET_CASE=EthereumJSONRPC.WebSocket.Case.Parity
mix coveralls.html --umbrella --exclude no_parity
```
| Protocol | URL |
|:----------|:-----------------------------------|
| HTTP | `https://sokol-trace.poa.network` |
| WebSocket | `wss://sokol-ws.poa.network/ws` |
##### Geth
###### Mox
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Geth.Mox
export ETHEREUM_JSONRPC_WEB_SOCKET_CASE=EthereumJSONRPC.WebSocket.Case.Mox
mix coveralls.html --umbrella --exclude no_geth
```
###### HTTP / WebSocket
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Geth.HTTPWebSocket
export ETHEREUM_JSONRPC_WEB_SOCKET_CASE=EthereumJSONRPC.WebSocket.Case.Geth
mix coveralls.html --umbrella --exclude no_geth
```
| Protocol | URL |
|:----------|:--------------------------------------------------|
| HTTP | `https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY` |
| WebSocket | `wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY` |
### API Documentation

@ -22,14 +22,53 @@ library (`HTTPoison`), which forwards the options down to `:hackney`.
## Testing
By default, [`mox`](https://github.com/plataformatec/mox) will be used to mock the `EthereumJSONRPC.Transport` and `EthereumJSONRPC.HTTP` behaviours. The mocked behaviours' returns differ based on the `EthereumJSONRPC.Variant`.
| `EthereumJSONRPC.Variant` | `EthereumJSONRPC.Transport` | `EthereumJSONRPC.HTTP` | `url` | Command | Usage(s) |
|:--------------------------|:----------------------------|:---------------------------------|:--------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------|
| `EthereumJSONRPC.Parity` | `EthereumJSONRPC.Mox` | `EthereumJSONRPC.HTTP.Mox` | N/A | `mix test` | Local, `circleci/config.yml` `test_parity_mox` job |
| `EthereumJSONRPC.Parity` | `EthereumJSONRPC.HTTP` | `EthereumJSONRPC.HTTP.HTTPoison` | `https://sokol-trace.poa.network` | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Parity ETHEREUM_JSONRPC_TRANSPORT=EthereumJSONRPC.HTTP ETHEREUM_JSONRPC_HTTP=EthereumJSONRPC.HTTP.HTTPoison ETHEREUM_JSONRPC_HTTP_URL=https://sokol-trace.poa.network mix test --exclude no_parity` | `.circleci/config.yml` `test_parity_http` job |
| `EthereumJSONRPC.Geth` | `EthereumJSONRPC.Mox` | `EthereumJSONRPC.HTTP.Mox` | N/A | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Geth mix test --exclude no_geth` | `.circleci/config.yml` `test_geth_http` job |
| `EthereumJSONRPC.Geth` | `EthereumJSONRPC.HTTP` | `EthereumJSONRPC.HTTP.HTTPoison` | `https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY` | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Geth ETHEREUM_JSONRPC_TRANSPORT=EthereumJSONRPC.HTTP ETHEREUM_JSONRPC_HTTP=EthereumJSONRPC.HTTP.HTTPoison ETHEREUM_JSONRPC_HTTP_URL=https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY mix test --exclude no_geth` | `.circleci/config.yml` `test_geth_http` job |
### Parity
#### Mox
**This is the default setup. `mix test` will work on its own, but to be explicit, use the following setup**:
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Parity.Mox
export ETHEREUM_JSONRPC_WEB_SOCKET_CASE=EthereumJSONRPC.WebSocket.Case.Mox
mix test --exclude no_parity
```
#### HTTP / WebSocket
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Parity.HTTPWebSocket
export ETHEREUM_JSONRPC_WEB_SOCKET_CASE=EthereumJSONRPC.WebSocket.Case.Parity
mix test --exclude no_parity
```
| Protocol | URL |
|:----------|:-----------------------------------|
| HTTP | `https://sokol-trace.poa.network` |
| WebSocket | `wss://sokol-ws.poa.network/ws` |
### Geth
#### Mox
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Geth.Mox
export ETHEREUM_JSONRPC_WEB_SOCKET_CASE=EthereumJSONRPC.WebSocket.Case.Mox
mix test --exclude no_geth
```
#### HTTP / WebSocket
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Geth.HTTPWebSocket
export ETHEREUM_JSONRPC_WEB_SOCKET_CASE=EthereumJSONRPC.WebSocket.Case.Geth
mix test --exclude no_geth
```
| Protocol | URL |
|:----------|:--------------------------------------------------|
| HTTP | `https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY` |
| WebSocket | `wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY` |
## Installation

@ -0,0 +1,5 @@
# Tests with everything using `Mox`
use Mix.Config
config :ethereum_jsonrpc, EthereumJSONRPC.Case, json_rpc_named_arguments: [transport: EthereumJSONRPC.Mox]

@ -17,7 +17,7 @@ defmodule EthereumJSONRPC do
"""
alias Explorer.Chain.Block
alias EthereumJSONRPC.{Blocks, Receipts, Transactions, Transport, Variant}
alias EthereumJSONRPC.{Blocks, Receipts, Subscription, Transactions, Transport, Variant}
@typedoc """
Truncated 20-byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash encoded as a hexadecimal number in a
@ -52,6 +52,17 @@ defmodule EthereumJSONRPC do
{:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()}
]
@typedoc """
Named arguments to `subscribe/2`.
* `:transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module
* `:transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2`
* `:variant` - the `t:EthereumJSONRPC.Variant.t/0` callback module
"""
@type subscribe_named_arguments :: [
{:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()}
]
@typedoc """
8 byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash of the proof-of-work.
"""
@ -292,6 +303,51 @@ defmodule EthereumJSONRPC do
Map.put(map, :jsonrpc, "2.0")
end
@doc """
Subscribes to `t:EthereumJSONRPC.Subscription.event/0` with `t:EthereumJSONRPC.Subscription.params/0`.
Events are delivered in a tuple tagged with the `t:EthereumJSONRPC.Subscription.t/0` and containing the same output
as the single-request form of `json_rpc/2`.
| Message | Description |
|-----------------------------------------------------------------------------------|----------------------------------------|
| `{EthereumJSONRPC.Subscription.t(), {:ok, EthereumJSONRPC.Transport.result.t()}}`  | New result in subscription |
| `{EthereumJSONRPC.Subscription.t(), {:error, reason :: term()}}` | There was an error in the subscription |
Subscription can be canceled by calling `unsubscribe/1` with the returned `t:EthereumJSONRPC.Subscription.t/0`.
"""
@spec subscribe(event :: Subscription.event(), params :: Subscription.params(), subscribe_named_arguments) ::
{:ok, Subscription.t()} | {:error, reason :: term()}
def subscribe(event, params \\ [], named_arguments) when is_list(params) do
transport = Keyword.fetch!(named_arguments, :transport)
transport_options = Keyword.fetch!(named_arguments, :transport_options)
transport.subscribe(event, params, transport_options)
end
@doc """
Unsubscribes from `t:EthereumJSONRPC.Subscription.t/0` created with `subscribe/2`.
## Returns
* `:ok` - subscription was canceled
* `{:error, :not_found}` - subscription could not be canceled. It did not exist because either the server already
canceled it, it never existed, or `unsubscribe/1` was called on it before.
* `{:error, reason :: term}` - other error cancelling subscription.
"""
@spec unsubscribe(Subscription.t()) :: :ok | {:error, reason :: term()}
def unsubscribe(%Subscription{transport: transport} = subscription) do
transport.unsubscribe(subscription)
end
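A minimal usage sketch of `subscribe/3` and `unsubscribe/1` (hypothetical caller code; `web_socket_pid` is assumed to come from `EthereumJSONRPC.WebSocket.WebSocketClient.start_link/1`, and the Sokol URL is only illustrative):
```elixir
# `web_socket_pid` is assumed to be the pid returned by
# `EthereumJSONRPC.WebSocket.WebSocketClient.start_link/1`.
subscribe_named_arguments = [
  transport: EthereumJSONRPC.WebSocket,
  transport_options: [
    web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
    web_socket_options: %{web_socket: web_socket_pid},
    url: "wss://sokol-ws.poa.network/ws"
  ]
]

{:ok, subscription} = EthereumJSONRPC.subscribe("newHeads", [], subscribe_named_arguments)

# New heads arrive as messages tagged with the subscription struct.
receive do
  {^subscription, {:ok, %{"number" => quantity}}} -> IO.puts("new head at #{quantity}")
  {^subscription, {:error, reason}} -> IO.inspect(reason, label: "subscription error")
end

:ok = EthereumJSONRPC.unsubscribe(subscription)
```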
# We can only depend on implementations supporting 64-bit integers:
# * Parity only supports u64 (https://github.com/paritytech/jsonrpc-core/blob/f2c61edb817e344d92ab3baf872fa77d1602430a/src/id.rs#L13)
def unique_request_id do
<<unique_request_id::big-integer-size(8)-unit(8)>> = :crypto.strong_rand_bytes(8)
unique_request_id
end
@doc """
Converts `t:timestamp/0` to `t:DateTime.t/0`
"""

@ -9,6 +9,9 @@ defmodule EthereumJSONRPC.HTTP do
@behaviour Transport
@doc """
Sends JSONRPC request encoded as `t:iodata/0` to `url` with `options`
"""
@callback json_rpc(url :: String.t(), json :: iodata(), options :: term()) ::
{:ok, %{body: body :: String.t(), status_code: status_code :: pos_integer()}}
| {:error, reason :: term}

@ -0,0 +1,59 @@
defmodule EthereumJSONRPC.Subscription do
@moduledoc """
A subscription to an event
"""
alias EthereumJSONRPC.Transport
@enforce_keys ~w(id subscriber_pid transport transport_options)a
defstruct ~w(id subscriber_pid transport transport_options)a
@typedoc """
An event that can be subscribed to.
* `"newHeads"` - when new blocks are added to the chain, including during reorgs.
"""
@type event :: String.t()
@typedoc """
Subscription ID returned from `eth_subscribe` and used to cancel a subscription with `eth_unsubscribe`.
"""
@type id :: String.t()
@typedoc """
Parameters for customizing subscription to `t:event/0`.
"""
@type params :: list()
@typedoc """
* `id` - the `t:id/0` of the subscription on the server
* `subscriber_pid` - the `t:pid/0` of the process where the transport should send messages
* `transport` - the `t:EthereumJSONRPC.Transport.t/0` callback module
* `transport_options` - options passed to `c:EthereumJSONRPC.Transport.json_rpc/2`
"""
@type t :: %__MODULE__{id: id, subscriber_pid: pid, transport: Transport.t(), transport_options: Transport.options()}
@doc """
Publishes `message` to all `subscriptions`' `subscriber_pid`s.
Sends `message` tagged with each `subscription`: `{subscription, message}`.
"""
@spec broadcast(Enumerable.t(), message :: term()) :: :ok
def broadcast(subscriptions, message) do
Enum.each(subscriptions, &publish(&1, message))
end
@doc """
Publishes `message` to the `subscription`'s `subscriber_pid`.
Sends `message` tagged with `subscription`: `{subscription, message}`.
"""
@spec publish(t(), message :: term()) :: :ok
def publish(%__MODULE__{subscriber_pid: subscriber_pid} = subscription, message) do
send(subscriber_pid, subscription_message(subscription, message))
end
defp subscription_message(%__MODULE__{} = subscription, message) do
{subscription, message}
end
end
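A hedged sketch of the delivery contract (the subscription literal below is illustrative; a real one comes from a transport's subscribe function): `publish/2` sends the message to `subscriber_pid` tagged with the subscription struct.
```elixir
# Illustrative subscription owned by the current process (assumed values).
subscription = %EthereumJSONRPC.Subscription{
  id: "0x1",
  subscriber_pid: self(),
  transport: EthereumJSONRPC.WebSocket,
  transport_options: []
}

EthereumJSONRPC.Subscription.publish(subscription, {:ok, %{"number" => "0x2"}})

# The subscriber receives the message tagged with the subscription struct.
receive do
  {^subscription, {:ok, %{"number" => number}}} -> number
end
#=> "0x2"
```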

@ -10,6 +10,8 @@ defmodule EthereumJSONRPC.Transport do
"""
alias EthereumJSONRPC.Subscription
@typedoc @moduledoc
@type t :: module
@ -74,6 +76,54 @@ defmodule EthereumJSONRPC.Transport do
"""
@type options :: term()
@doc """
Run a single Remote Procedure Call (RPC) `t:EthereumJSONRPC.Transport.request/0` with
`t:EthereumJSONRPC.Transport.options/0`.
## Returns
* `{:ok, result}` - `result` is the `/result` from JSONRPC response object of format
`%{"id" => ..., "result" => result}`.
* `{:error, reason}` - `reason` is the `/error` from JSONRPC response object of format
`%{"id" => ..., "error" => reason}`. The transport can also give any `term()` for `reason` if a more specific
reason is possible.
"""
@callback json_rpc(request, options) :: {:ok, result} | {:error, reason :: term()}
@doc """
Runs a batch of Remote Procedure Call (RPC) `request`s with `options`.
## Returns
* `{:ok, [response]}` - unlike `json_rpc(request, options)`, the individual `t:response.t/0` are not unwrapped and it
is the caller's responsibility to extract the `t:result/0` or error `reason`.
* `{:error, reason}` - an error that affects *all* `t:request/0`s, such as the batch as a whole being rejected.
"""
@callback json_rpc(batch_request, options) :: {:ok, batch_response} | {:error, reason :: term()}
@doc """
Subscribes to event in `request`.
Events **MUST** be delivered in a tuple tagged with the `t:EthereumJSONRPC.Subscription.t/0` and containing the same
output as the single-request form of `json_rpc/2`.
| Message | Description |
|-----------------------------------------------------------------------------------|----------------------------------------|
| `{EthereumJSONRPC.Subscription.t(), {:ok, EthereumJSONRPC.Transport.result.t()}}` | New result in subscription |
| `{EthereumJSONRPC.Subscription.t(), {:error, reason :: term()}}` | There was an error in the subscription |
`t:EthereumJSONRPC.Subscription.t/0` must be cancellable by passing it to `c:unsubscribe/1`
"""
@callback subscribe(Subscription.event(), Subscription.params(), options) ::
{:ok, Subscription.t()} | {:error, reason :: term()}
@doc """
Unsubscribes from a subscription created by `c:subscribe/3`
"""
@callback unsubscribe(Subscription.t()) :: :ok | {:error, reason :: term()}
# HTTP does not support subscriptions
@optional_callbacks subscribe: 3, unsubscribe: 1
end
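As an illustration of this contract, here is a hedged sketch of a transport that supports subscriptions; the module name `MyApp.InMemoryTransport` is hypothetical, and an HTTP-style transport could simply omit `subscribe/3` and `unsubscribe/1`, since they are optional callbacks:
```elixir
defmodule MyApp.InMemoryTransport do
  @behaviour EthereumJSONRPC.Transport

  alias EthereumJSONRPC.Subscription

  @impl EthereumJSONRPC.Transport
  def json_rpc(_request_or_batch, _options), do: {:error, :not_implemented}

  @impl EthereumJSONRPC.Transport
  def subscribe(event, params, options) when is_binary(event) and is_list(params) do
    # The caller becomes the subscriber; a real transport would then deliver
    # `{subscription, {:ok, result}}` messages to `subscriber_pid`.
    {:ok,
     %Subscription{
       id: "0x1",
       subscriber_pid: self(),
       transport: __MODULE__,
       transport_options: options
     }}
  end

  @impl EthereumJSONRPC.Transport
  def unsubscribe(%Subscription{}), do: :ok
end
```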

@ -0,0 +1,113 @@
defmodule EthereumJSONRPC.WebSocket do
@moduledoc """
JSONRPC over WebSocket.
"""
alias EthereumJSONRPC.{Subscription, Transport}
@behaviour Transport
@typedoc """
WebSocket name
"""
# same as `t:GenServer.name/0`
@type name :: atom() | {:global, term()} | {:via, module(), term()}
@typedoc """
WebSocket reference
"""
# same as `t:GenServer.server/0`
@type web_socket :: pid() | name() | {atom(), node()}
@typedoc """
Options passed to `EthereumJSONRPC.Transport` callbacks.
**MUST** contain `t:web_socket/0` referring to the `t:pid/0` returned by `c:start_link/1`.
"""
@type options :: [{:web_socket, web_socket()} | {:web_socket_options, term()}]
@doc """
Allow `c:start_link/1` to be called as part of a supervision tree.
"""
@callback child_spec([url :: String.t() | options :: term()]) :: Supervisor.child_spec()
@doc """
Starts web socket attached to `url` with `options`.
"""
# Return is same as `t:GenServer.on_start/0`
@callback start_link([url :: String.t() | options :: term()]) ::
{:ok, pid()} | :ignore | {:error, {:already_started, pid()} | reason :: term()}
@doc """
Run a single Remote Procedure Call (RPC) `t:EthereumJSONRPC.Transport.request/0` through `t:web_socket/0`.
## Returns
* `{:ok, result}` - `result` is the `/result` from JSONRPC response object of format
`%{"id" => ..., "result" => result}`.
* `{:error, reason}` - `reason` is the `/error` from JSONRPC response object of format
`%{"id" => ..., "error" => reason}`. The transport can also give any `term()` for `reason` if a more specific
reason is possible.
"""
@callback json_rpc(web_socket(), Transport.request()) :: {:ok, Transport.result()} | {:error, reason :: term()}
@doc """
Subscribes to `t:EthereumJSONRPC.Subscription.event/0` with `t:EthereumJSONRPC.Subscription.params/0` through
`t:web_socket/0`.
Events are delivered in a tuple tagged with the `t:EthereumJSONRPC.Subscription.t/0` and containing the same output
as `json_rpc/2`.
| Message | Description |
|-----------------------------------------------------------------------------------|----------------------------------------|
| `{EthereumJSONRPC.Subscription.t(), {:ok, EthereumJSONRPC.Transport.result.t()}}`  | New result in subscription |
| `{EthereumJSONRPC.Subscription.t(), {:error, reason :: term()}}` | There was an error in the subscription |
Subscription can be canceled by calling `unsubscribe/1` with the returned `t:EthereumJSONRPC.Subscription.t/0`.
"""
@callback subscribe(web_socket(), event :: Subscription.event(), params :: Subscription.params()) ::
{:ok, Subscription.t()} | {:error, reason :: term()}
@doc """
Unsubscribes from `t:EthereumJSONRPC.Subscription.t/0` created with `c:subscribe/3`.
## Returns
* `:ok` - subscription was canceled
* `{:error, reason}` - subscription could not be canceled.
"""
@callback unsubscribe(web_socket(), Subscription.t()) :: :ok | {:error, reason :: term()}
@impl Transport
@spec json_rpc(Transport.request(), options) :: {:ok, Transport.result()} | {:error, reason :: term()}
def json_rpc(request, options) do
web_socket_module = Keyword.fetch!(options, :web_socket)
%{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options)
web_socket_module.json_rpc(web_socket, request)
end
@impl Transport
@spec subscribe(event :: Subscription.event(), params :: Subscription.params(), options) ::
{:ok, Subscription.t()} | {:error, reason :: term()}
def subscribe(event, params, options) when is_binary(event) and is_list(params) do
web_socket_module = Keyword.fetch!(options, :web_socket)
%{web_socket: web_socket} = Keyword.fetch!(options, :web_socket_options)
web_socket_module.subscribe(web_socket, event, params)
end
@impl Transport
@spec unsubscribe(%Subscription{transport: __MODULE__, transport_options: options}) ::
:ok | {:error, reason :: term()}
def unsubscribe(%Subscription{transport: __MODULE__, transport_options: transport_options} = subscription) do
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
%{web_socket: web_socket} = Keyword.fetch!(transport_options, :web_socket_options)
web_socket_module.unsubscribe(web_socket, subscription)
end
end
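A hedged sketch of the options shape `json_rpc/2` expects; `web_socket_pid` is assumed to be the `t:pid/0` returned by `EthereumJSONRPC.WebSocket.WebSocketClient.start_link/1`, and the matched result is what `eth_getBlockByNumber` returns for the genesis block:
```elixir
# `web_socket_pid` is assumed to be the pid returned by
# `EthereumJSONRPC.WebSocket.WebSocketClient.start_link/1`.
options = [
  web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
  web_socket_options: %{web_socket: web_socket_pid}
]

{:ok, %{"number" => "0x0"}} =
  %{id: 1, method: "eth_getBlockByNumber", params: ["earliest", false]}
  |> EthereumJSONRPC.request()
  |> EthereumJSONRPC.WebSocket.json_rpc(options)
```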

@ -0,0 +1,24 @@
defmodule EthereumJSONRPC.WebSocket.Registration do
@moduledoc """
A caller's registration for responses to asynchronous frames.
"""
alias EthereumJSONRPC.Subscription
@enforce_keys ~w(from type)a
defstruct ~w(from type subscription_id)a
@typedoc """
What kind of request will be issued by the caller
* `:json_rpc` - a generic JSONRPC request that just needs to be returned to the caller based on `id` matching.
* `:subscribe` - an `eth_subscribe` request will be issued by the caller. Its response needs to be returned to the
caller **AND** the client needs to `EthereumJSONRPC.Subscription.publish/2` any `eth_subscription` messages to
the caller until `EthereumJSONRPC.WebSocket.Client.unsubscribe/1` is called.
* `:unsubscribe` - an `eth_unsubscribe` request will be issued by the caller. Its response needs to be returned to the
caller **AND** the client needs to stop tracking the subscription.
"""
@type type :: :json_rpc | :subscribe | :unsubscribe
@type t :: %__MODULE__{from: GenServer.from(), type: type, subscription_id: Subscription.id()}
end

@ -0,0 +1,322 @@
defmodule EthereumJSONRPC.WebSocket.WebSocketClient do
@moduledoc """
`EthereumJSONRPC.WebSocket` that uses `websocket_client`
"""
require Logger
import EthereumJSONRPC, only: [request: 1]
alias EthereumJSONRPC.{Subscription, Transport, WebSocket}
alias EthereumJSONRPC.WebSocket.Registration
@behaviour :websocket_client
@behaviour WebSocket
@enforce_keys ~w(url)a
defstruct request_id_to_registration: %{},
subscription_id_to_subscription: %{},
url: nil
# Supervisor interface
@impl WebSocket
def child_spec(arg) do
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}}, [])
end
@impl WebSocket
# only allow secure WSS
def start_link(["wss://" <> _ = url, gen_fsm_options]) when is_list(gen_fsm_options) do
fsm_name =
case Keyword.fetch(gen_fsm_options, :name) do
{:ok, name} when is_atom(name) -> {:local, name}
:error -> :undefined
end
%URI{host: host} = URI.parse(url)
host_charlist = String.to_charlist(host)
# `:depth`, `:verify`, and `:verify_fun`, are based on `:hackney_connect.ssl_opts_1/2` as we use `:hackney` through
# `:httpoison` and this keeps the SSL rules consistent between HTTP and WebSocket
:websocket_client.start_link(fsm_name, url, __MODULE__, url,
ssl_verify: :verify_peer,
socket_opts: [
cacerts: :certifi.cacerts(),
depth: 99,
# SNI extension discloses host name in the clear, but allows for compatibility with Virtual Hosting for TLS
server_name_indication: host_charlist,
verify_fun: {&:ssl_verify_hostname.verify_fun/3, [check_hostname: host_charlist]}
]
)
end
# Client interface
@impl WebSocket
@spec json_rpc(WebSocket.web_socket(), Transport.request()) :: {:ok, Transport.result()} | {:error, reason :: term()}
def json_rpc(web_socket, request) do
GenServer.call(web_socket, {:json_rpc, request})
end
@impl WebSocket
@spec subscribe(WebSocket.web_socket(), Subscription.event(), Subscription.params()) ::
{:ok, Subscription.t()} | {:error, reason :: term()}
def subscribe(web_socket, event, params) when is_binary(event) and is_list(params) do
GenServer.call(web_socket, {:subscribe, event, params})
end
@impl WebSocket
@spec unsubscribe(WebSocket.web_socket(), Subscription.t()) :: :ok | {:error, :not_found}
def unsubscribe(web_socket, %Subscription{} = subscription) do
GenServer.call(web_socket, {:unsubscribe, subscription})
end
@impl :websocket_client
def init(url) do
{:reconnect, %__MODULE__{url: url}}
end
@impl :websocket_client
def onconnect(_, %__MODULE__{} = state) do
{:ok, state}
end
@impl :websocket_client
def ondisconnect(reason, %__MODULE__{} = state) do
{:close, reason, state}
end
@impl :websocket_client
def websocket_handle({:text, text}, _request, %__MODULE__{} = state) do
case Jason.decode(text) do
{:ok, json} ->
handle_response(json, state)
{:error, _} = error ->
broadcast(error, state)
{:ok, state}
end
end
@impl :websocket_client
def websocket_info({:"$gen_call", from, request}, _, %__MODULE__{} = state) do
handle_call(request, from, state)
end
@impl :websocket_client
def websocket_terminate(close, _request, %__MODULE__{} = state) do
broadcast(close, state)
end
defp broadcast(message, %__MODULE__{subscription_id_to_subscription: id_to_subscription}) do
id_to_subscription
|> Map.values()
|> Subscription.broadcast(message)
end
defp handle_call(message, from, %__MODULE__{} = state) do
{updated_state, unique_request} = register(message, from, state)
{:reply, {:text, Jason.encode!(unique_request)}, updated_state}
end
defp handle_response(
%{"method" => "eth_subscription", "params" => %{"result" => result, "subscription" => subscription_id}},
%__MODULE__{subscription_id_to_subscription: subscription_id_to_subscription} = state
) do
case subscription_id_to_subscription do
%{^subscription_id => subscription} ->
Subscription.publish(subscription, {:ok, result})
_ ->
Logger.error(fn ->
[
"Unexpected `eth_subscription` subscription ID (",
inspect(subscription_id),
") result (",
inspect(result),
"). Subscription ID not in known subscription IDs (",
subscription_id_to_subscription
|> Map.values()
|> Enum.map(&inspect/1),
")."
]
end)
end
{:ok, state}
end
defp handle_response(
%{"id" => id} = response,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
{registration, new_request_id_to_registration} = Map.pop(request_id_to_registration, id)
respond_to_registration(registration, new_request_id_to_registration, response, state)
end
defp handle_response(response, %__MODULE__{} = state) do
Logger.error(fn ->
[
"Unexpected JSON response from web socket\n",
"\n",
" Response:\n",
" ",
inspect(response)
]
end)
{:ok, state}
end
defp register(
{:json_rpc, original_request},
from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
unique_id = unique_request_id(state)
{%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{
from: from,
type: :json_rpc
})
}, %{original_request | id: unique_id}}
end
defp register(
{:subscribe, event, params},
from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
)
when is_binary(event) and is_list(params) do
unique_id = unique_request_id(state)
{
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{from: from, type: :subscribe})
},
request(%{id: unique_id, method: "eth_subscribe", params: [event | params]})
}
end
defp register(
{:unsubscribe, %Subscription{id: subscription_id}},
from,
%__MODULE__{request_id_to_registration: request_id_to_registration} = state
) do
unique_id = unique_request_id(state)
{
%__MODULE__{
state
| request_id_to_registration:
Map.put(request_id_to_registration, unique_id, %Registration{
from: from,
type: :unsubscribe,
subscription_id: subscription_id
})
},
request(%{id: unique_id, method: "eth_unsubscribe", params: [subscription_id]})
}
end
defp respond_to_registration(
%Registration{type: :json_rpc, from: from},
new_request_id_to_registration,
response,
%__MODULE__{} = state
) do
reply =
case response do
%{"result" => result} -> {:ok, result}
%{"error" => error} -> {:error, error}
end
GenServer.reply(from, reply)
{:ok, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}}
end
defp respond_to_registration(
%Registration{type: :subscribe, from: {subscriber_pid, _} = from},
new_request_id_to_registration,
%{"result" => subscription_id},
%__MODULE__{url: url} = state
) do
subscription = %Subscription{
id: subscription_id,
subscriber_pid: subscriber_pid,
transport: EthereumJSONRPC.WebSocket,
transport_options: [web_socket: __MODULE__, web_socket_options: %{web_socket: self()}, url: url]
}
GenServer.reply(from, {:ok, subscription})
new_state =
state
|> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration)
|> put_in([Access.key!(:subscription_id_to_subscription), subscription_id], subscription)
{:ok, new_state}
end
defp respond_to_registration(
%Registration{type: :subscribe, from: from},
new_request_id_to_registration,
%{"error" => error},
%__MODULE__{} = state
) do
GenServer.reply(from, {:error, error})
{:ok, %__MODULE__{state | request_id_to_registration: new_request_id_to_registration}}
end
defp respond_to_registration(
%Registration{type: :unsubscribe, from: from, subscription_id: subscription_id},
new_request_id_to_registration,
response,
%__MODULE__{} = state
) do
reply =
case response do
%{"result" => true} -> :ok
%{"result" => false} -> {:error, :not_found}
%{"error" => %{"message" => "subscription not found"}} -> {:error, :not_found}
%{"error" => error} -> {:error, error}
end
GenServer.reply(from, reply)
new_state =
state
|> put_in([Access.key!(:request_id_to_registration)], new_request_id_to_registration)
|> update_in([Access.key!(:subscription_id_to_subscription)], &Map.delete(&1, subscription_id))
{:ok, new_state}
end
defp respond_to_registration(nil, _, response, %__MODULE__{} = state) do
Logger.error(fn -> ["Got response for unregistered request ID: ", inspect(response)] end)
{:ok, state}
end
defp unique_request_id(%__MODULE__{request_id_to_registration: request_id_to_registration} = state) do
unique_request_id = EthereumJSONRPC.unique_request_id()
case request_id_to_registration do
# collision
%{^unique_request_id => _} ->
unique_request_id(state)
_ ->
unique_request_id
end
end
end
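A hedged end-to-end sketch of this client outside a supervision tree (in an application, `child_spec/1` would go in a Supervisor's child list instead); the Sokol URL matches the test cases below and is only illustrative:
```elixir
url = "wss://sokol-ws.poa.network/ws"

{:ok, web_socket} = EthereumJSONRPC.WebSocket.WebSocketClient.start_link([url, []])

{:ok, subscription} =
  EthereumJSONRPC.WebSocket.subscribe("newHeads", [],
    web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
    web_socket_options: %{web_socket: web_socket}
  )

# On Sokol a new head arrives roughly every 5 seconds, tagged with the subscription.
receive do
  {^subscription, {:ok, %{"number" => number}}} -> number
end
```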

@ -57,6 +57,8 @@ defmodule EthereumJsonrpc.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
# CACerts bundle for `EthereumJSONRPC.WebSocket.Client`
{:certifi, "~> 2.3"},
# Style Checking
{:credo, "0.9.2", only: [:dev, :test], runtime: false},
# Static Type Checking
@ -72,7 +74,11 @@ defmodule EthereumJsonrpc.MixProject do
# Convert unix timestamps in JSONRPC to DateTimes
{:timex, "~> 3.1.24"},
# Encode/decode function names and arguments
{:ex_abi, "~> 0.1.16"}
{:ex_abi, "~> 0.1.16"},
# `:verify_fun` for `Socket.Web.connect`
{:ssl_verify_fun, "~> 1.1"},
# `EthereumJSONRPC.WebSocket`
{:websocket_client, "~> 1.3"}
]
end
end

@ -7,7 +7,6 @@ defmodule EthereumJSONRPC.HTTP.MoxTest do
use ExUnit.Case, async: true
import EthereumJSONRPC, only: [request: 1]
import EthereumJSONRPC.Case, only: [variant: 0]
import EthereumJSONRPC.HTTP.Case
import Mox
@ -20,7 +19,8 @@ defmodule EthereumJSONRPC.HTTP.MoxTest do
url: url(),
http_options: http_options()
],
variant: variant()
# Which one does not matter, so pick one
variant: EthereumJSONRPC.Parity
]
}
end

@ -6,15 +6,13 @@ defmodule EthereumJSONRPC.MoxTest do
use ExUnit.Case, async: true
import EthereumJSONRPC.Case, only: [variant: 0]
import Mox
setup do
%{
json_rpc_named_arguments: [
transport: EthereumJSONRPC.Mox,
transport_options: [],
variant: variant()
transport_options: []
]
}
end

@ -0,0 +1,219 @@
defmodule EthereumJSONRPC.WebSocketTest do
use EthereumJSONRPC.WebSocket.Case, async: true
import EthereumJSONRPC, only: [request: 1]
import Mox
alias EthereumJSONRPC.{Subscription, WebSocket}
setup :verify_on_exit!
describe "json_rpc/2" do
test "can get result", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options]
if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do
expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ ->
{:ok, %{"number" => "0x0"}}
end)
end
assert {:ok, %{"number" => "0x0"}} =
%{id: 1, method: "eth_getBlockByNumber", params: ["earliest", false]}
|> request()
|> WebSocket.json_rpc(transport_options)
end
test "can get error", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options]
if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do
expect(EthereumJSONRPC.WebSocket.Mox, :json_rpc, fn _, _ ->
{:error,
%{
"code" => -32601,
"message" => "Method not found"
}}
end)
end
# purposely misspell method to trigger error
assert {:error,
%{
"code" => -32601,
# Message varies by variant, so don't match on it
"message" => _
}} =
%{id: 1, method: "eth_getBlockByNumbe", params: ["earliest", false]}
|> request()
|> WebSocket.json_rpc(transport_options)
end
end
describe "subscribe/2" do
test "can subscribe to newHeads", %{subscribe_named_arguments: subscribe_named_arguments} do
transport = Keyword.fetch!(subscribe_named_arguments, :transport)
transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if transport_options[:web_socket] == EthereumJSONRPC.WebSocket.Mox do
expect(EthereumJSONRPC.WebSocket.Mox, :subscribe, fn _, _, _ ->
{:ok,
%Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
}}
end)
end
assert {:ok,
%Subscription{
id: subscription_id,
subscriber_pid: ^subscriber_pid,
transport: ^transport,
transport_options: ^transport_options
}} = WebSocket.subscribe("newHeads", [], transport_options)
assert is_binary(subscription_id)
end
test "delivers new heads to caller", %{
block_interval: block_interval,
subscribe_named_arguments: subscribe_named_arguments
} do
transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
expect(web_socket_module, :subscribe, fn _, _, _ ->
subscription = %Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options
}
Process.send_after(subscriber_pid, {subscription, {:ok, %{"number" => "0x1"}}}, block_interval)
{:ok, subscription}
end)
end
assert {:ok, subscription} = WebSocket.subscribe("newHeads", [], transport_options)
assert_receive {^subscription, {:ok, %{"number" => _}}}, block_interval * 2
end
end
describe "unsubscribe/2" do
test "can unsubscribe", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
subscription = %Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options
}
web_socket_module
|> expect(:subscribe, fn _, _, _ -> {:ok, subscription} end)
|> expect(:unsubscribe, fn _, ^subscription -> :ok end)
end
assert {:ok, subscription} = WebSocket.subscribe("newHeads", [], transport_options)
assert :ok = WebSocket.unsubscribe(subscription)
end
test "stops messages being sent to subscriber", %{
block_interval: block_interval,
subscribe_named_arguments: subscribe_named_arguments
} do
transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
subscription = %Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options
}
web_socket_module
|> expect(:subscribe, 2, fn pid, _, _ when is_pid(pid) ->
send(pid, {:subscribe, subscription})
{:ok, subscription}
end)
|> expect(:unsubscribe, fn pid, ^subscription when is_pid(pid) ->
send(pid, {:unsubscribe, subscription})
:ok
end)
end
assert {:ok, first_subscription} =
WebSocket.subscribe("newHeads", [], subscribe_named_arguments[:transport_options])
assert {:ok, second_subscription} =
WebSocket.subscribe("newHeads", [], subscribe_named_arguments[:transport_options])
wait = block_interval * 2
assert_receive {^first_subscription, {:ok, %{"number" => _}}}, wait
assert_receive {^second_subscription, {:ok, %{"number" => _}}}, wait
assert :ok = WebSocket.unsubscribe(first_subscription)
clear_mailbox()
# see the message on the second subscription, so that we don't have to wait for the refute_receive, which would
# wait the full timeout
assert_receive {^second_subscription, {:ok, %{"number" => _}}}, wait
refute_receive {^first_subscription, _}
end
test "return error if already unsubscribed", %{subscribe_named_arguments: subscribe_named_arguments} do
transport_options = subscribe_named_arguments[:transport_options]
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
subscriber_pid = self()
if web_socket_module == EthereumJSONRPC.WebSocket.Mox do
subscription = %Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: Keyword.fetch!(subscribe_named_arguments, :transport),
transport_options: transport_options
}
web_socket_module
|> expect(:subscribe, fn _, _, _ -> {:ok, subscription} end)
|> expect(:unsubscribe, fn _, ^subscription -> :ok end)
|> expect(:unsubscribe, fn _, ^subscription -> {:error, :not_found} end)
end
assert {:ok, subscription} = WebSocket.subscribe("newHeads", [], transport_options)
assert :ok = WebSocket.unsubscribe(subscription)
assert {:error, :not_found} = WebSocket.unsubscribe(subscription)
end
end
defp clear_mailbox do
receive do
_ -> clear_mailbox()
after
0 ->
:ok
end
end
end

@ -4,6 +4,8 @@ defmodule EthereumJSONRPCTest do
import EthereumJSONRPC.Case
import Mox
alias EthereumJSONRPC.Subscription
setup :verify_on_exit!
@moduletag :capture_log
@ -219,4 +221,169 @@ defmodule EthereumJSONRPCTest do
)
end
end
describe "subscribe/2" do
test "can subscribe to newHeads", %{subscribe_named_arguments: subscribe_named_arguments} do
transport = Keyword.fetch!(subscribe_named_arguments, :transport)
transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if transport == EthereumJSONRPC.Mox do
expect(transport, :subscribe, fn _, _, _ ->
{:ok,
%Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
}}
end)
end
assert {:ok,
%Subscription{
id: subscription_id,
subscriber_pid: ^subscriber_pid,
transport: ^transport,
transport_options: ^transport_options
}} = EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments)
assert is_binary(subscription_id)
end
test "delivers new heads to caller", %{
block_interval: block_interval,
subscribe_named_arguments: subscribe_named_arguments
} do
transport = Keyword.fetch!(subscribe_named_arguments, :transport)
transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if transport == EthereumJSONRPC.Mox do
expect(transport, :subscribe, fn _, _, _ ->
subscription = %Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
}
Process.send_after(subscriber_pid, {subscription, {:ok, %{"number" => "0x1"}}}, block_interval)
{:ok, subscription}
end)
end
assert {:ok, subscription} = EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments)
assert_receive {^subscription, {:ok, %{"number" => _}}}, block_interval * 2
end
end
describe "unsubscribe/2" do
test "can unsubscribe", %{subscribe_named_arguments: subscribe_named_arguments} do
transport = Keyword.fetch!(subscribe_named_arguments, :transport)
transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if transport == EthereumJSONRPC.Mox do
subscription = %Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
}
transport
|> expect(:subscribe, fn _, _, _ -> {:ok, subscription} end)
|> expect(:unsubscribe, fn ^subscription -> :ok end)
end
assert {:ok, subscription} = EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments)
assert :ok = EthereumJSONRPC.unsubscribe(subscription)
end
test "stops messages being sent to subscriber", %{
block_interval: block_interval,
subscribe_named_arguments: subscribe_named_arguments
} do
transport = Keyword.fetch!(subscribe_named_arguments, :transport)
subscriber_pid = self()
if transport == EthereumJSONRPC.Mox do
subscription = %Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: Keyword.fetch!(subscribe_named_arguments, :transport_options)
}
{:ok, pid} = Task.start_link(EthereumJSONRPC.WebSocket.Case.Mox, :loop, [%{}])
transport
|> expect(:subscribe, 2, fn "newHeads", [], _ ->
send(pid, {:subscribe, subscription})
{:ok, subscription}
end)
|> expect(:unsubscribe, fn ^subscription ->
send(pid, {:unsubscribe, subscription})
:ok
end)
end
assert {:ok, first_subscription} = EthereumJSONRPC.subscribe("newHeads", [], subscribe_named_arguments)
assert {:ok, second_subscription} = EthereumJSONRPC.subscribe("newHeads", [], subscribe_named_arguments)
wait = block_interval * 2
assert_receive {^first_subscription, {:ok, %{"number" => _}}}, wait
assert_receive {^second_subscription, {:ok, %{"number" => _}}}, wait
assert :ok = EthereumJSONRPC.unsubscribe(first_subscription)
clear_mailbox()
# see the message on the second subscription, so that we don't have to wait for the refute_receive, which would
# wait the full timeout
assert_receive {^second_subscription, {:ok, %{"number" => _}}}, wait
refute_receive {^first_subscription, _}
end
test "return error if already unsubscribed", %{subscribe_named_arguments: subscribe_named_arguments} do
transport = Keyword.fetch!(subscribe_named_arguments, :transport)
transport_options = subscribe_named_arguments[:transport_options]
subscriber_pid = self()
if transport == EthereumJSONRPC.Mox do
subscription = %Subscription{
id: "0x1",
subscriber_pid: subscriber_pid,
transport: transport,
transport_options: transport_options
}
transport
|> expect(:subscribe, fn _, _, _ -> {:ok, subscription} end)
|> expect(:unsubscribe, fn ^subscription -> :ok end)
|> expect(:unsubscribe, fn ^subscription -> {:error, :not_found} end)
end
assert {:ok, subscription} = EthereumJSONRPC.subscribe("newHeads", [], subscribe_named_arguments)
assert :ok = EthereumJSONRPC.unsubscribe(subscription)
assert {:error, :not_found} = EthereumJSONRPC.unsubscribe(subscription)
end
end
defp clear_mailbox do
receive do
_ -> clear_mailbox()
after
0 ->
:ok
end
end
end

@ -1,20 +1,26 @@
defmodule EthereumJSONRPC.Case do
@moduledoc """
Adds `json_rpc_named_arguments` to context.
Adds `json_rpc_named_arguments` and `subscribe_named_arguments` to context.
Reads `ETHEREUM_JSONRPC_TRANSPORT` environment variable to determine which module to use `:json_rpc_named_arguments`
`:transport`:
## `json_rpc_named_arguments`
Reads the `ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT` environment variable to determine which module to use for the
`:json_rpc_named_arguments` `:transport`:
* `EthereumJSONRPC.HTTP` - Allow testing of HTTP-only behavior like status codes
* `EthereumJSONRPC.Mox` - mock, transport neutral responses. The default for local testing.
* `EthereumJSONRPC.WebSocket` - Allow testing of WebSocket-only behavior like subscriptions
When `ETHEREUM_JSONRPC_TRANSPORT` is `EthereumJSONRPC.HTTP`, then reads `ETHEREUM_JSONRPC_HTTP_URL` environment
variable to determine `:json_rpc_named_arguments` `:transport_options` `:url`. Failure to set
When `ETHEREUM_JSONRPC_JSON_RPC_TRANSPORT` is `EthereumJSONRPC.HTTP`, then reads `ETHEREUM_JSONRPC_HTTP_URL`
environment variable to determine `:json_rpc_named_arguments` `:transport_options` `:url`. Failure to set
`ETHEREUM_JSONRPC_HTTP_URL` in this case will raise an `ArgumentError`.
* `EthereumJSONRPC.HTTP.HTTPoison` - HTTP responses from calls to real chain URLs
* `EthereumJSONRPC.HTTP.Mox` - mock HTTP responses, so can be used for HTTP-only behavior like status codes.
## `subscribe_named_arguments`
Reads `ETHEREUM_JSONRPC_
"""
use ExUnit.CaseTemplate
@ -22,28 +28,7 @@ defmodule EthereumJSONRPC.Case do
require Logger
setup do
transport = transport()
transport_options =
case transport do
EthereumJSONRPC.HTTP ->
[
http: EthereumJSONRPC.HTTP.Case.http(),
url: EthereumJSONRPC.HTTP.Case.url(),
http_options: EthereumJSONRPC.HTTP.Case.http_options()
]
_ ->
[]
end
%{
json_rpc_named_arguments: [
transport: transport,
transport_options: transport_options,
variant: variant()
]
}
module("ETHEREUM_JSONRPC_CASE", "EthereumJSONRPC.Case.Parity.Mox").setup()
end
def log_bad_gateway(under_test, assertions) do
@ -68,12 +53,4 @@ defmodule EthereumJSONRPC.Case do
module
end
def transport do
module("ETHEREUM_JSONRPC_TRANSPORT", "EthereumJSONRPC.Mox")
end
def variant do
module("ETHEREUM_JSONRPC_VARIANT", "EthereumJSONRPC.Parity")
end
end

@ -0,0 +1,19 @@
defmodule EthereumJSONRPC.Case.Geth.HTTPWebSocket do
@moduledoc """
`EthereumJSONRPC.Case` for connecting to Geth using `EthereumJSONRPC.HTTP` for `json_rpc_named_arguments`
`transport` and `EthereumJSONRPC.WebSocket` for `subscribe_named_arguments` `transport`.
"""
def setup do
EthereumJSONRPC.WebSocket.Case.Geth.setup()
|> Map.put(:json_rpc_named_arguments,
transport: EthereumJSONRPC.HTTP,
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]],
url: "https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY"
],
variant: EthereumJSONRPC.Geth
)
end
end

@ -0,0 +1,13 @@
defmodule EthereumJSONRPC.Case.Geth.Mox do
@moduledoc """
`EthereumJSONRPC.Case` for mocking connecting to Geth using `Mox`
"""
def setup do
%{
block_interval: 500,
json_rpc_named_arguments: [transport: EthereumJSONRPC.Mox, transport_options: [], variant: EthereumJSONRPC.Geth],
subscribe_named_arguments: [transport: EthereumJSONRPC.Mox, transport_options: []]
}
end
end

@ -0,0 +1,20 @@
defmodule EthereumJSONRPC.Case.Parity.HTTPWebSocket do
@moduledoc """
`EthereumJSONRPC.Case` for connecting to Parity using `EthereumJSONRPC.HTTP` for `json_rpc_named_arguments`
`transport` and `EthereumJSONRPC.WebSocket` for `subscribe_named_arguments` `transport`.
"""
def setup do
EthereumJSONRPC.WebSocket.Case.Parity.setup()
|> Map.put(
:json_rpc_named_arguments,
transport: EthereumJSONRPC.HTTP,
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]],
url: "https://sokol-trace.poa.network"
],
variant: EthereumJSONRPC.Parity
)
end
end

@ -0,0 +1,13 @@
defmodule EthereumJSONRPC.Case.Parity.Mox do
@moduledoc """
`EthereumJSONRPC.Case` for mocking connecting to Parity using `Mox`
"""
def setup do
%{
block_interval: 500,
json_rpc_named_arguments: [transport: EthereumJSONRPC.Mox, transport_options: [], variant: EthereumJSONRPC.Parity],
subscribe_named_arguments: [transport: EthereumJSONRPC.Mox, transport_options: []]
}
end
end

@ -0,0 +1,9 @@
defmodule EthereumJSONRPC.WebSocket.Case do
use ExUnit.CaseTemplate
import EthereumJSONRPC.Case, only: [module: 2]
setup do
module("ETHEREUM_JSONRPC_WEB_SOCKET_CASE", "EthereumJSONRPC.WebSocket.Case.Mox").setup()
end
end

@ -0,0 +1,25 @@
defmodule EthereumJSONRPC.WebSocket.Case.Geth do
@moduledoc """
`EthereumJSONRPC.WebSocket.Case` connecting to Geth.
"""
import ExUnit.Callbacks, only: [start_supervised!: 1]
def setup do
url = "wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY"
web_socket_module = EthereumJSONRPC.WebSocket.WebSocketClient
web_socket = start_supervised!({web_socket_module, [url, []]})
%{
block_interval: 25_000,
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket},
url: url
]
]
}
end
end

@ -0,0 +1,76 @@
defmodule EthereumJSONRPC.WebSocket.Case.Mox do
@moduledoc """
`EthereumJSONRPC.WebSocket.Case` using `Mox`
"""
import ExUnit.Callbacks, only: [start_supervised!: 1]
import Mox
alias EthereumJSONRPC.Subscription
@block_interval 250
def setup do
web_socket_module = EthereumJSONRPC.WebSocket.Mox
web_socket_module
|> allow(self(), supervisor())
|> stub(:child_spec, fn arguments ->
Supervisor.child_spec(
%{
id: web_socket_module,
start: {web_socket_module, :start_link, arguments}
},
[]
)
end)
|> stub(:start_link, fn _ ->
Task.start_link(__MODULE__, :loop, [%{}])
end)
url = "wss://example.com/ws"
web_socket = start_supervised!({web_socket_module, [url]})
%{
block_interval: @block_interval,
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket},
url: url
]
]
}
end
def loop(%{subscription: subscription, timer_reference: timer_reference}) do
receive do
{:unsubscribe, ^subscription} ->
{:ok, :cancel} = :timer.cancel(timer_reference)
loop(%{})
end
end
def loop(%{}) do
receive do
{:subscribe, %Subscription{subscriber_pid: subscriber_pid} = subscription} ->
{:ok, timer_reference} =
:timer.send_interval(@block_interval, subscriber_pid, {subscription, {:ok, %{"number" => "0x1"}}})
loop(%{subscription: subscription, timer_reference: timer_reference})
end
end
defp supervisor do
case ExUnit.OnExitHandler.get_supervisor(self()) do
{:ok, nil} ->
{:ok, sup} = Supervisor.start_link([], strategy: :one_for_one, max_restarts: 1_000_000, max_seconds: 1)
ExUnit.OnExitHandler.put_supervisor(self(), sup)
sup
{:ok, sup} ->
sup
end
end
end

@ -0,0 +1,25 @@
defmodule EthereumJSONRPC.WebSocket.Case.Parity do
@moduledoc """
`EthereumJSONRPC.WebSocket.Case` connecting to Parity.
"""
import ExUnit.Callbacks, only: [start_supervised!: 1]
def setup do
url = "wss://sokol-ws.poa.network/ws"
web_socket_module = EthereumJSONRPC.WebSocket.WebSocketClient
web_socket = start_supervised!({web_socket_module, [url, []]})
%{
block_interval: 5_000,
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket},
url: url
]
]
}
end
end

@ -9,6 +9,8 @@ File.mkdir_p!(junit_folder)
Mox.defmock(EthereumJSONRPC.Mox, for: EthereumJSONRPC.Transport)
# for when we need to simulate HTTP-specific stuff like 413 Request Entity Too Large
Mox.defmock(EthereumJSONRPC.HTTP.Mox, for: EthereumJSONRPC.HTTP)
# for when we need to simulate WebSocket-specific stuff
Mox.defmock(EthereumJSONRPC.WebSocket.Mox, for: EthereumJSONRPC.WebSocket)
ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
ExUnit.start()

@ -34,17 +34,6 @@ To get BlockScout up and running locally:
* Run the dialyzer: `mix dialyzer --halt-exit-status`
* Check the Elixir code for vulnerabilities: `$ mix sobelow --config`
#### Variant and Chain
By default, [`mox`](https://github.com/plataformatec/mox) will be used to mock the `EthereumJSONRPC.Transport` and `EthereumJSONRPC.HTTP` behaviours. The mocked behaviours' returns differ based on the `EthereumJSONRPC.Variant`.
| `EthereumJSONRPC.Variant` | `EthereumJSONRPC.Transport` | `EthereumJSONRPC.HTTP` | `url` | Command | Usage(s) |
|:--------------------------|:----------------------------|:---------------------------------|:--------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------|
| `EthereumJSONRPC.Parity` | `EthereumJSONRPC.Mox` | `EthereumJSONRPC.HTTP.Mox` | N/A | `mix test` | Local, `circleci/config.yml` `test_parity_mox` job |
| `EthereumJSONRPC.Parity` | `EthereumJSONRPC.HTTP` | `EthereumJSONRPC.HTTP.HTTPoison` | `https://trace-sokol.poa.network` | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Parity ETHEREUM_JSONRPC_TRANSPORT=EthereumJSONRPC.HTTP ETHEREUM_JSONRPC_HTTP=EthereumJSONRPC.HTTP.HTTPoison ETHEREUM_JSONRPC_HTTP_URL=https://sokol-trace.poa.network mix test --exclude no_parity` | `.circleci/config.yml` `test_parity_http` job |
| `EthereumJSONRPC.Geth` | `EthereumJSONRPC.Mox` | `EthereumJSONRPC.HTTP.Mox` | N/A | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Geth mix test --exclude no_geth` | `.circleci/config.yml` `test_geth_http` job |
| `EthereumJSONRPC.Geth` | `EthereumJSONRPC.HTTP` | `EthereumJSONRPC.HTTP.HTTPoison` | `https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY` | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Geth ETHEREUM_JSONRPC_TRANSPORT=EthereumJSONRPC.HTTP ETHEREUM_JSONRPC_HTTP=EthereumJSONRPC.HTTP.HTTPoison ETHEREUM_JSONRPC_HTTP_URL=https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY mix test --exclude no_geth` | `.circleci/config.yml` `test_geth_http` job |
### Benchmarking
#### `Explorer.Chain.recent_collated_transactions/0`

@ -9,4 +9,12 @@ config :explorer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Geth
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
url: System.get_env("ETHEREUM_JSONRPC_WEB_SOCKET_URL") || "wss://mainnet.infura.io/8lTvJTKmHPCHazkneJsY/ws"
],
variant: EthereumJSONRPC.Geth
]

@ -13,4 +13,12 @@ config :explorer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Parity
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
url: "wss://sokol-ws.poa.network/ws"
],
variant: EthereumJSONRPC.Parity
]

@ -9,4 +9,12 @@ config :explorer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Geth
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
url: System.get_env("ETHEREUM_JSONRPC_HTTP_URL") || "wss://mainnet.infura.io/8lTvJTKmHPCHazkneJsY/ws"
],
variant: EthereumJSONRPC.Geth
]

@ -13,4 +13,12 @@ config :explorer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Parity
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
url: "wss://sokol-ws.poa.network/ws"
],
variant: EthereumJSONRPC.Parity
]

@ -5,4 +5,9 @@ config :explorer,
transport: EthereumJSONRPC.Mox,
transport_options: [],
variant: EthereumJSONRPC.Geth
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.Mox,
transport_options: [],
variant: EthereumJSONRPC.Geth
]

@ -6,4 +6,9 @@ config :explorer,
transport: EthereumJSONRPC.Mox,
transport_options: [],
variant: EthereumJSONRPC.Parity
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.Mox,
transport_options: [],
variant: EthereumJSONRPC.Parity
]

@ -21,11 +21,47 @@ be found at [https://hexdocs.pm/indexer](https://hexdocs.pm/indexer).
## Testing
By default, [`mox`](https://github.com/plataformatec/mox) will be used to mock the `EthereumJSONRPC.Transport` and `EthereumJSONRPC.HTTP` behaviours. The mocked behaviours' returns differ based on the `EthereumJSONRPC.Variant`.
| `EthereumJSONRPC.Variant` | `EthereumJSONRPC.Transport` | `EthereumJSONRPC.HTTP` | `url` | Command | Usage(s) |
|:--------------------------|:----------------------------|:---------------------------------|:--------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------|
| `EthereumJSONRPC.Parity` | `EthereumJSONRPC.Mox` | `EthereumJSONRPC.HTTP.Mox` | N/A | `mix test` | Local, `circleci/config.yml` `test_parity_mox` job |
| `EthereumJSONRPC.Parity` | `EthereumJSONRPC.HTTP` | `EthereumJSONRPC.HTTP.HTTPoison` | `https://trace-sokol.poa.network` | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Parity ETHEREUM_JSONRPC_TRANSPORT=EthereumJSONRPC.HTTP ETHEREUM_JSONRPC_HTTP=EthereumJSONRPC.HTTP.HTTPoison ETHEREUM_JSONRPC_HTTP_URL=https://sokol-trace.poa.network mix test --exclude no_parity` | `.circleci/config.yml` `test_parity_http` job |
| `EthereumJSONRPC.Geth` | `EthereumJSONRPC.Mox` | `EthereumJSONRPC.HTTP.Mox` | N/A | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Geth mix test --exclude no_geth` | `.circleci/config.yml` `test_geth_http` job |
| `EthereumJSONRPC.Geth` | `EthereumJSONRPC.HTTP` | `EthereumJSONRPC.HTTP.HTTPoison` | `https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY` | `ETHEREUM_JSONRPC_VARIANT=EthereumJSONRPC.Geth ETHEREUM_JSONRPC_TRANSPORT=EthereumJSONRPC.HTTP ETHEREUM_JSONRPC_HTTP=EthereumJSONRPC.HTTP.HTTPoison ETHEREUM_JSONRPC_HTTP_URL=https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY mix test --exclude no_geth` | `.circleci/config.yml` `test_geth_http` job |
### Parity
#### Mox
**This is the default setup. `mix test` will work on its own, but to be explicit, use the following setup**:
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Parity.Mox
mix test --exclude no_parity
```
#### HTTP / WebSocket
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Parity.HTTPWebSocket
mix test --exclude no_parity
```
| Protocol | URL |
|:----------|:-----------------------------------|
| HTTP | `https://sokol-trace.poa.network` |
| WebSocket | `wss://sokol-ws.poa.network/ws` |
### Geth
#### Mox
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Geth.Mox
mix test --exclude no_geth
```
#### HTTP / WebSocket
```shell
export ETHEREUM_JSONRPC_CASE=EthereumJSONRPC.Case.Geth.HTTPWebSocket
mix test --exclude no_geth
```
| Protocol | URL |
|:----------|:--------------------------------------------------|
| HTTP | `https://mainnet.infura.io/8lTvJTKmHPCHazkneJsY` |
| WebSocket | `wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY` |
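
These WebSocket endpoints are the same ones the new `subscribe_named_arguments` entries in the `apps/indexer` configs point the realtime indexer at. For reference, a sketch of that config shape (mirroring the Geth dev/prod config changes later in this diff; the Parity configs use `wss://sokol-ws.poa.network/ws` instead):

```elixir
config :indexer,
  subscribe_named_arguments: [
    transport: EthereumJSONRPC.WebSocket,
    transport_options: [
      web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
      url: "wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY"
    ]
  ]
```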

@ -10,4 +10,11 @@ config :indexer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Geth
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
url: "wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY"
]
]

@ -14,4 +14,11 @@ config :indexer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Parity
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
url: "wss://sokol-ws.poa.network/ws"
]
]

@ -10,4 +10,11 @@ config :indexer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Geth
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
url: "wss://mainnet.infura.io/ws/8lTvJTKmHPCHazkneJsY"
]
]

@ -14,4 +14,11 @@ config :indexer,
http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
],
variant: EthereumJSONRPC.Parity
],
subscribe_named_arguments: [
transport: EthereumJSONRPC.WebSocket,
transport_options: [
web_socket: EthereumJSONRPC.WebSocket.WebSocketClient,
url: "wss://sokol-ws.poa.network/ws"
]
]

@ -22,8 +22,9 @@ defmodule Indexer.Application do
|> Application.get_all_env()
|> Keyword.take(
~w(blocks_batch_size blocks_concurrency block_interval json_rpc_named_arguments receipts_batch_size
receipts_concurrency)a
receipts_concurrency subscribe_named_arguments)a
)
|> Enum.into(%{})
children = [
{Task.Supervisor, name: Indexer.TaskSupervisor},

@ -5,24 +5,10 @@ defmodule Indexer.BlockFetcher do
require Logger
import Indexer, only: [debug: 1]
alias Explorer.Chain.{Block, Import}
alias Indexer.{AddressExtraction, Balances, Sequence, TokenTransfers}
alias Indexer.{AddressExtraction, Balances, TokenTransfers}
alias Indexer.BlockFetcher.Receipts
# dialyzer thinks that Logger.debug functions always have no_local_return
@dialyzer {:nowarn_function, import_range: 2}
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
@blocks_batch_size 10
@blocks_concurrency 10
@receipts_batch_size 250
@receipts_concurrency 10
@type address_hash_to_fetched_balance_block_number :: %{String.t() => Block.block_number()}
@type transaction_hash_to_block_number :: %{String.t() => Block.block_number()}
@ -49,18 +35,24 @@ defmodule Indexer.BlockFetcher do
}
) :: Import.all_result()
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
@receipts_batch_size 250
@receipts_concurrency 10
@doc false
def default_receipts_batch_size, do: @receipts_batch_size
@doc false
def default_receipts_concurrency, do: @receipts_concurrency
@enforce_keys ~w(json_rpc_named_arguments)a
defstruct blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
broadcast: nil,
defstruct broadcast: nil,
callback_module: nil,
json_rpc_named_arguments: nil,
receipts_batch_size: @receipts_batch_size,
receipts_concurrency: @receipts_concurrency,
sequence: nil
@doc false
def default_blocks_batch_size, do: @blocks_batch_size
receipts_concurrency: @receipts_concurrency
@doc """
Required named arguments
@ -70,54 +62,81 @@ defmodule Indexer.BlockFetcher do
The following options can be overridden:
* `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to
`#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions
are not paginated.*
* `:blocks_concurrency` - The number of concurrent requests of `:blocks_batch_size` to allow against the JSONRPC.
Defaults to #{@blocks_concurrency}. So up to `blocks_concurrency * block_batch_size` (defaults to
`#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all
connections.
* `:receipts_batch_size` - The number of receipts to request in one call to the JSONRPC. Defaults to
`#{@receipts_batch_size}`. Receipt requests also include the logs for when the transaction was collated into the
block. *These logs are not paginated.*
* `:receipts_concurrency` - The number of concurrent requests of `:receipts_batch_size` to allow against the JSONRPC
**for each block range**. Defaults to `#{@receipts_concurrency}`. So up to
`block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{@blocks_concurrency * @receipts_concurrency * @receipts_batch_size}`) receipts can be requested from the
JSONRPC at once over all connections. *Each transaction only has one receipt.*
**for each block range**. Defaults to `#{@receipts_concurrency}`. *Each transaction only has one receipt.*
"""
def new(named_arguments) when is_list(named_arguments) do
def new(named_arguments) when is_map(named_arguments) do
struct!(__MODULE__, named_arguments)
end
def stream_import(%__MODULE__{blocks_concurrency: blocks_concurrency, sequence: sequence} = state)
when is_pid(sequence) do
sequence
|> Sequence.build_stream()
|> Task.async_stream(
&import_range(state, &1),
max_concurrency: blocks_concurrency,
timeout: :infinity
)
|> Stream.run()
end
defp cap_seq(seq, next, range) do
case next do
:more ->
debug(fn ->
first_block_number..last_block_number = range
"got blocks #{first_block_number} - #{last_block_number}"
end)
:end_of_chain ->
Sequence.cap(seq)
@spec fetch_and_import_range(t, Range.t()) ::
{:ok, {inserted :: %{}, next :: :more | :end_of_chain}}
| {:error,
{step :: atom(), reason :: term()}
| [%Ecto.Changeset{}]
| {step :: atom(), failed_value :: term(), changes_so_far :: term()}}
def fetch_and_import_range(
%__MODULE__{
broadcast: broadcast,
callback_module: callback_module,
json_rpc_named_arguments: json_rpc_named_arguments
} = state,
_.._ = range
)
when broadcast in ~w(true false)a and callback_module != nil do
with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{blocks: blocks, transactions: transactions_without_receipts} = result,
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params,
transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts),
%{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.from_log_params(logs),
addresses =
AddressExtraction.extract_addresses(%{
blocks: blocks,
logs: logs,
token_transfers: token_transfers,
transactions: transactions_with_receipts
}),
balances_params_set =
Balances.params_set(%{
blocks_params: blocks,
logs_params: logs,
transactions_params: transactions_with_receipts
}),
token_balances = Balances.params_set(%{token_transfers_params: token_transfers}),
{:ok, inserted} <-
import_range(
state,
%{
range: range,
addresses: %{params: addresses},
balances: %{params: balances_params_set},
token_balances: %{params: token_balances},
blocks: %{params: blocks},
logs: %{params: logs},
receipts: %{params: receipts},
token_transfers: %{params: token_transfers},
tokens: %{on_conflict: :nothing, params: tokens},
transactions: %{params: transactions_with_receipts, on_conflict: :replace_all}
}
) do
{:ok, {inserted, next}}
else
{step, {:error, reason}} -> {:error, {step, reason}}
{:error, changesets} = error when is_list(changesets) -> error
{:error, step, failed_value, changes_so_far} -> {:error, {step, failed_value, changes_so_far}}
end
:ok
end
defp insert(%__MODULE__{broadcast: broadcast, callback_module: callback_module, sequence: sequence} = state, options)
defp import_range(
%__MODULE__{broadcast: broadcast, callback_module: callback_module} = state,
options
)
when is_map(options) do
{address_hash_to_fetched_balance_block_number, import_options} =
pop_address_hash_to_fetched_balance_block_number(options)
@ -134,33 +153,7 @@ defmodule Indexer.BlockFetcher do
}
)
# use a `case` to ensure that `callback_module` `import` has correct return type
case callback_module.import(state, options_with_broadcast) do
{:ok, _} = ok ->
ok
{:error, changesets} = error when is_list(changesets) ->
%{range: range} = options
Logger.error(fn ->
"failed to validate blocks #{inspect(range)}: #{inspect(changesets)}. Retrying"
end)
:ok = Sequence.queue(sequence, range)
error
{:error, step, failed_value, _changes_so_far} = error ->
%{range: range} = options
Logger.error(fn ->
"failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying"
end)
:ok = Sequence.queue(sequence, range)
error
end
callback_module.import(state, options_with_broadcast)
end
# `fetched_balance_block_number` is needed for the `BalanceFetcher`, but should not be used for `import` because the
@ -189,61 +182,4 @@ defmodule Indexer.BlockFetcher do
) do
{{hash, fetched_balance_block_number}, Map.delete(address_params, :fetched_balance_block_number)}
end
# Run at state.blocks_concurrency max_concurrency when called by `stream_import/1`
# Only public for testing
@doc false
def import_range(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments, sequence: seq} = state, range) do
with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{blocks: blocks, transactions: transactions_without_receipts} = result,
cap_seq(seq, next, range),
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params,
transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts),
%{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.from_log_params(logs) do
addresses =
AddressExtraction.extract_addresses(%{
blocks: blocks,
logs: logs,
token_transfers: token_transfers,
transactions: transactions_with_receipts
})
balances_params_set =
Balances.params_set(%{
blocks_params: blocks,
logs_params: logs,
transactions_params: transactions_with_receipts
})
token_balances = Balances.params_set(%{token_transfers_params: token_transfers})
insert(
state,
%{
range: range,
addresses: %{params: addresses},
balances: %{params: balances_params_set},
token_balances: %{params: token_balances},
blocks: %{params: blocks},
logs: %{params: logs},
receipts: %{params: receipts},
token_transfers: %{params: token_transfers},
tokens: %{on_conflict: :nothing, params: tokens},
transactions: %{params: transactions_with_receipts, on_conflict: :replace_all}
}
)
else
{step, {:error, reason}} ->
debug(fn ->
first..last = range
"failed to fetch #{step} for blocks #{first} - #{last}: #{inspect(reason)}. Retrying block range."
end)
:ok = Sequence.queue(seq, range)
{:error, step, reason}
end
end
end
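
For reference, a condensed sketch of calling the reworked `fetch_and_import_range/2` directly (struct fields and the return shape come from this diff; the HTTP transport keys, URL, and block range below are illustrative assumptions):

```elixir
# Assumed JSONRPC configuration, mirroring the README commands elsewhere in this diff.
json_rpc_named_arguments = [
  transport: EthereumJSONRPC.HTTP,
  transport_options: [
    http: EthereumJSONRPC.HTTP.HTTPoison,
    url: "https://sokol-trace.poa.network",
    http_options: [recv_timeout: 60_000, timeout: 60_000, hackney: [pool: :ethereum_jsonrpc]]
  ],
  variant: EthereumJSONRPC.Parity
]

block_fetcher = %Indexer.BlockFetcher{
  broadcast: false,
  callback_module: Indexer.BlockFetcher.Catchup,
  json_rpc_named_arguments: json_rpc_named_arguments
}

case Indexer.BlockFetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080) do
  # `next` is :more or :end_of_chain, as reported by EthereumJSONRPC.fetch_blocks_by_range/2
  {:ok, {inserted, next}} -> {inserted, next}
  {:error, _} = error -> error
end
```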

@ -6,14 +6,13 @@ defmodule Indexer.BlockFetcher.Catchup do
require Logger
import Indexer, only: [debug: 1]
import Indexer.BlockFetcher, only: [stream_import: 1]
import Indexer.BlockFetcher, only: [fetch_and_import_range: 2]
alias Explorer.Chain
alias Indexer.{
BalanceFetcher,
BlockFetcher,
BoundInterval,
InternalTransactionFetcher,
Sequence,
TokenFetcher
@ -21,37 +20,44 @@ defmodule Indexer.BlockFetcher.Catchup do
@behaviour BlockFetcher
@enforce_keys ~w(block_fetcher bound_interval)a
defstruct ~w(block_fetcher bound_interval task)a
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do
block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: false, callback_module: __MODULE__}
minimum_interval = div(block_interval, 2)
@blocks_batch_size 10
@blocks_concurrency 10
%__MODULE__{
block_fetcher: block_fetcher,
bound_interval: BoundInterval.within(minimum_interval..(minimum_interval * 10))
}
end
defstruct blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
block_fetcher: nil
@doc false
def default_blocks_batch_size, do: @blocks_batch_size
@doc """
Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0`
"""
@spec put(%BlockFetcher.Supervisor{catchup: %__MODULE__{task: nil}}) :: %BlockFetcher.Supervisor{
catchup: %__MODULE__{task: Task.t()}
}
def put(%BlockFetcher.Supervisor{catchup: %__MODULE__{task: nil} = state} = supervisor_state) do
put_in(
supervisor_state.catchup.task,
Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state])
)
end
Required named arguments
* `:json_rpc_named_arguments` - `t:EthereumJSONRPC.json_rpc_named_arguments/0` passed to
`EthereumJSONRPC.json_rpc/2`.
The following options can be overridden:
def task(%__MODULE__{
block_fetcher:
%BlockFetcher{blocks_batch_size: blocks_batch_size, json_rpc_named_arguments: json_rpc_named_arguments} =
block_fetcher
}) do
* `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to
`#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions
are not paginated.*
* `:blocks_concurrency` - The number of concurrent requests of `:blocks_batch_size` to allow against the JSONRPC.
Defaults to #{@blocks_concurrency}. So up to `blocks_concurrency * blocks_batch_size` (defaults to
`#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all
connections. Up to `blocks_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{@blocks_concurrency * BlockFetcher.default_receipts_batch_size() * BlockFetcher.default_receipts_concurrency()}`
) receipts can be requested from the JSONRPC at once over all connections.
"""
def task(
%__MODULE__{
blocks_batch_size: blocks_batch_size,
block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}
} = state
) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
case latest_block_number do
@ -81,7 +87,7 @@ defmodule Indexer.BlockFetcher.Catchup do
{:ok, sequence} = Sequence.start_link(ranges: missing_ranges, step: -1 * blocks_batch_size)
Sequence.cap(sequence)
stream_import(%BlockFetcher{block_fetcher | sequence: sequence})
stream_fetch_and_import(state, sequence)
end
%{first_block_number: first, missing_block_count: missing_block_count}
@ -105,55 +111,6 @@ defmodule Indexer.BlockFetcher.Catchup do
end
end
def handle_success(
{ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}},
%BlockFetcher.Supervisor{
catchup: %__MODULE__{
bound_interval: bound_interval,
task: %Task{ref: ref}
}
} = supervisor_state
)
when is_integer(missing_block_count) do
new_bound_interval =
case missing_block_count do
0 ->
Logger.info("Index already caught up in #{first_block_number}-0")
BoundInterval.increase(bound_interval)
_ ->
Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0")
BoundInterval.decrease(bound_interval)
end
Process.demonitor(ref, [:flush])
interval = new_bound_interval.current
Logger.info(fn ->
"Checking if index needs to catch up in #{interval}ms"
end)
Process.send_after(self(), :catchup_index, interval)
update_in(supervisor_state.catchup, fn state ->
%__MODULE__{state | bound_interval: new_bound_interval, task: nil}
end)
end
def handle_failure(
{:DOWN, ref, :process, pid, reason},
%BlockFetcher.Supervisor{catchup: %__MODULE__{task: %Task{pid: pid, ref: ref}}} = supervisor_state
) do
Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)
send(self(), :catchup_index)
put_in(supervisor_state.catchup.task, nil)
end
defp async_import_remaining_block_data(
%{transactions: transaction_hashes, addresses: address_hashes, tokens: tokens},
%{
@ -179,4 +136,72 @@ defmodule Indexer.BlockFetcher.Catchup do
|> Enum.map(& &1.contract_address_hash)
|> TokenFetcher.async_fetch()
end
defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)
when is_pid(sequence) do
sequence
|> Sequence.build_stream()
|> Task.async_stream(
&fetch_and_import_range_from_sequence(state, &1, sequence),
max_concurrency: blocks_concurrency,
timeout: :infinity
)
|> Stream.run()
end
# Run at state.blocks_concurrency max_concurrency when called by `stream_fetch_and_import/2`
defp fetch_and_import_range_from_sequence(
%__MODULE__{block_fetcher: %BlockFetcher{} = block_fetcher},
_.._ = range,
sequence
) do
case fetch_and_import_range(block_fetcher, range) do
{:ok, {inserted, next}} ->
cap_seq(sequence, next, range)
{:ok, inserted}
{:error, {step, reason}} = error ->
Logger.error(fn ->
first..last = range
"failed to fetch #{step} for blocks #{first} - #{last}: #{inspect(reason)}. Retrying block range."
end)
:ok = Sequence.queue(sequence, range)
error
{:error, changesets} = error when is_list(changesets) ->
Logger.error(fn ->
"failed to validate blocks #{inspect(range)}: #{inspect(changesets)}. Retrying"
end)
:ok = Sequence.queue(sequence, range)
error
{:error, {step, failed_value, _changes_so_far}} = error ->
Logger.error(fn ->
"failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying"
end)
:ok = Sequence.queue(sequence, range)
error
end
end
defp cap_seq(seq, next, range) do
case next do
:more ->
debug(fn ->
first_block_number..last_block_number = range
"got blocks #{first_block_number} - #{last_block_number}"
end)
:end_of_chain ->
Sequence.cap(seq)
end
:ok
end
end

@ -0,0 +1,110 @@
defmodule Indexer.BlockFetcher.Catchup.Supervisor do
@moduledoc """
Supervises the `Indexer.BlockFetcher.Catchup` with exponential backoff for restarts.
"""
# NOT a `Supervisor` because the `Task` restart strategies are custom.
use GenServer
require Logger
alias Indexer.{BlockFetcher, BoundInterval}
alias Indexer.BlockFetcher.Catchup
# milliseconds
@block_interval 5_000
@enforce_keys ~w(bound_interval catchup)a
defstruct bound_interval: nil,
catchup: %Catchup{},
task: nil
def child_spec(arg) do
# Use the `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
# instead of `:supervisor` and use the wrong shutdown timeout
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, [])
end
@doc """
Starts supervisor of `Indexer.BlockFetcher.Catchup`.
For `named_arguments` see `Indexer.BlockFetcher.new/1`. For `t:GenServer.options/0` see `GenServer.start_link/3`.
"""
@spec start_link([named_arguments :: list() | GenServer.options()]) :: {:ok, pid}
def start_link([named_arguments, gen_server_options]) when is_map(named_arguments) and is_list(gen_server_options) do
GenServer.start_link(__MODULE__, named_arguments, gen_server_options)
end
@impl GenServer
def init(named_arguments) do
state = new(named_arguments)
send(self(), :catchup_index)
{:ok, state}
end
defp new(%{block_fetcher: common_block_fetcher} = named_arguments) do
block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: false, callback_module: Catchup}
block_interval = Map.get(named_arguments, :block_interval, @block_interval)
minimum_interval = div(block_interval, 2)
bound_interval = BoundInterval.within(minimum_interval..(minimum_interval * 10))
%__MODULE__{
catchup: %Catchup{block_fetcher: block_fetcher},
bound_interval: bound_interval
}
end
@impl GenServer
def handle_info(:catchup_index, %__MODULE__{catchup: %Catchup{} = catchup} = state) do
{:noreply,
%__MODULE__{state | task: Task.Supervisor.async_nolink(Indexer.TaskSupervisor, Catchup, :task, [catchup])}}
end
def handle_info(
{ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}},
%__MODULE__{
bound_interval: bound_interval,
task: %Task{ref: ref}
} = state
)
when is_integer(missing_block_count) do
new_bound_interval =
case missing_block_count do
0 ->
Logger.info("Index already caught up in #{first_block_number}-0")
BoundInterval.increase(bound_interval)
_ ->
Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0")
BoundInterval.decrease(bound_interval)
end
Process.demonitor(ref, [:flush])
interval = new_bound_interval.current
Logger.info(fn ->
"Checking if index needs to catch up in #{interval}ms"
end)
Process.send_after(self(), :catchup_index, interval)
{:noreply, %__MODULE__{state | bound_interval: new_bound_interval, task: nil}}
end
def handle_info(
{:DOWN, ref, :process, pid, reason},
%__MODULE__{task: %Task{pid: pid, ref: ref}} = state
) do
Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)
send(self(), :catchup_index)
{:noreply, %__MODULE__{state | task: nil}}
end
end
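
For clarity, a condensed, self-contained restatement of the backoff decision in `handle_info/2` above (the interval bounds and `missing_block_count` value are illustrative):

```elixir
# BoundInterval.within/1, increase/1, decrease/1, and .current are used exactly as above.
bound_interval = Indexer.BoundInterval.within(2_500..25_000)
missing_block_count = 0

next_interval =
  case missing_block_count do
    # caught up: wait longer before the next :catchup_index
    0 -> Indexer.BoundInterval.increase(bound_interval)
    # behind: check again sooner
    _ -> Indexer.BoundInterval.decrease(bound_interval)
  end

Process.send_after(self(), :catchup_index, next_interval.current)
```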

@ -1,55 +1,113 @@
defmodule Indexer.BlockFetcher.Realtime do
@moduledoc """
Fetches and indexes block ranges from latest block forward.
Fetches and indexes block ranges from latest block forward using a WebSocket.
"""
use GenServer
require Logger
import EthereumJSONRPC, only: [integer_to_quantity: 1]
import Indexer.BlockFetcher, only: [stream_import: 1]
import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1]
import Indexer, only: [debug: 1]
import Indexer.BlockFetcher, only: [fetch_and_import_range: 2]
alias EthereumJSONRPC.Subscription
alias Explorer.Chain
alias Indexer.{
AddressExtraction,
BlockFetcher,
Sequence,
TokenFetcher
}
alias Indexer.{AddressExtraction, BlockFetcher, TokenFetcher}
@behaviour BlockFetcher
@enforce_keys ~w(block_fetcher interval)a
defstruct block_fetcher: nil,
interval: nil,
task_by_ref: %{}
def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do
block_fetcher = %BlockFetcher{
common_block_fetcher
| callback_module: __MODULE__,
blocks_concurrency: 1,
broadcast: true
}
@enforce_keys ~w(block_fetcher)a
defstruct ~w(block_fetcher subscription)a
@type t :: %__MODULE__{
block_fetcher: %BlockFetcher{
broadcast: true,
callback_module: __MODULE__,
json_rpc_named_arguments: EthereumJSONRPC.json_rpc_named_arguments(),
receipts_batch_size: pos_integer(),
receipts_concurrency: pos_integer()
},
subscription: Subscription.t()
}
def start_link([arguments, gen_server_options]) do
GenServer.start_link(__MODULE__, arguments, gen_server_options)
end
interval = div(block_interval, 2)
@impl GenServer
def init(%{block_fetcher: %BlockFetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments})
when is_list(subscribe_named_arguments) do
{:ok, %__MODULE__{block_fetcher: %BlockFetcher{block_fetcher | broadcast: true, callback_module: __MODULE__}},
{:continue, {:init, subscribe_named_arguments}}}
end
%__MODULE__{block_fetcher: block_fetcher, interval: interval}
@impl GenServer
def handle_continue({:init, subscribe_named_arguments}, %__MODULE__{subscription: nil} = state)
when is_list(subscribe_named_arguments) do
case EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments) do
{:ok, subscription} -> {:noreply, %__MODULE__{state | subscription: subscription}}
{:error, reason} -> {:stop, reason, state}
end
end
@doc """
Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` `realtime_task_by_ref`.
"""
def put(%BlockFetcher.Supervisor{realtime: %__MODULE__{} = state} = supervisor_state) do
%Task{ref: ref} = task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state])
@impl GenServer
def handle_info(
{subscription, {:ok, %{"number" => quantity}}},
%__MODULE__{
block_fetcher: %BlockFetcher{} = block_fetcher,
subscription: %Subscription{} = subscription
} = state
)
when is_binary(quantity) do
number = quantity_to_integer(quantity)
# Subscriptions don't support getting all the blocks and transactions data, so we need to go back and get the full block
case fetch_and_import_range(block_fetcher, number..number) do
{:ok, {_inserted, _next}} ->
debug(fn ->
["realtime indexer fetched and imported block ", to_string(number)]
end)
put_in(supervisor_state.realtime.task_by_ref[ref], task)
end
{:error, {step, reason}} ->
Logger.error(fn ->
[
"realtime indexer failed to fetch ",
to_string(step),
" for block ",
to_string(number),
": ",
inspect(reason),
". Block will be retried by catchup indexer."
]
end)
{:error, changesets} when is_list(changesets) ->
Logger.error(fn ->
[
"realtime indexer failed to validate for block ",
to_string(number),
": ",
inspect(changesets),
". Block will be retried by catchup indexer."
]
end)
{:error, {step, failed_value, _changes_so_far}} ->
Logger.error(fn ->
[
"realtime indexer failed to insert ",
to_string(step),
" for block ",
to_string(number),
": ",
inspect(failed_value),
". Block will be retried by catchup indexer."
]
end)
end
def task(%__MODULE__{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher}) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
{:ok, sequence} = Sequence.start_link(first: latest_block_number, step: 2)
stream_import(%BlockFetcher{block_fetcher | sequence: sequence})
{:noreply, state}
end
@import_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a
@ -91,52 +149,6 @@ defmodule Indexer.BlockFetcher.Realtime do
end
end
def handle_success(
{ref, :ok = result},
%BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state
) do
{task, running_task_by_ref} = Map.pop(task_by_ref, ref)
case task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref returned result (#{inspect(result)})"
end)
_ ->
:ok
end
Process.demonitor(ref, [:flush])
put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref)
end
def handle_failure(
{:DOWN, ref, :process, pid, reason},
%BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state
) do
{task, running_task_by_ref} = Map.pop(task_by_ref, ref)
case task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref reports unknown pid (#{pid}) DOWN due to reason (#{reason}})"
end)
_ ->
Logger.error(fn ->
"Realtime index stream exited with reason (#{inspect(reason)}). " <>
"The next realtime index task will fill the missing block " <>
"if the lastest block number has not advanced by then or the catch up index will fill the missing block."
end)
end
put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref)
end
defp async_import_remaining_block_data(%{tokens: tokens}) do
tokens
|> Enum.map(& &1.contract_address_hash)

@ -0,0 +1,37 @@
defmodule Indexer.BlockFetcher.Realtime.Supervisor do
@moduledoc """
Supervises realtime block fetcher.
"""
use Supervisor
def start_link([arguments, gen_server_options]) do
Supervisor.start_link(__MODULE__, arguments, gen_server_options)
end
@impl Supervisor
def init(%{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments}) do
children =
case Keyword.fetch!(subscribe_named_arguments, :transport) do
EthereumJSONRPC.WebSocket ->
transport_options = Keyword.fetch!(subscribe_named_arguments, :transport_options)
url = Keyword.fetch!(transport_options, :url)
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
web_socket = Indexer.BlockFetcher.Realtime.WebSocket
block_fetcher_subscribe_named_arguments =
put_in(subscribe_named_arguments[:transport_options][:web_socket_options], %{web_socket: web_socket})
[
{web_socket_module, [url, [name: web_socket]]},
{Indexer.BlockFetcher.Realtime,
[
%{block_fetcher: block_fetcher, subscribe_named_arguments: block_fetcher_subscribe_named_arguments},
[name: Indexer.BlockFetcher.Realtime]
]}
]
end
Supervisor.init(children, strategy: :rest_for_one)
end
end
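
The `:rest_for_one` strategy is the point of this supervisor: the WebSocket client is started first, so if the socket process crashes, the `Realtime` fetcher started after it is also restarted and re-subscribes to `newHeads` in its `handle_continue/2`; a crash of `Realtime` alone leaves the socket running. Annotated, the child list built by `init/1` above for the `EthereumJSONRPC.WebSocket` transport is:

```elixir
# `web_socket_module`, `url`, `block_fetcher`, and `block_fetcher_subscribe_named_arguments`
# are the variables bound in init/1 above.
[
  # 1. the WebSocket connection, registered as Indexer.BlockFetcher.Realtime.WebSocket
  {web_socket_module, [url, [name: Indexer.BlockFetcher.Realtime.WebSocket]]},
  # 2. the realtime fetcher, which subscribes to newHeads over that connection
  {Indexer.BlockFetcher.Realtime,
   [
     %{block_fetcher: block_fetcher, subscribe_named_arguments: block_fetcher_subscribe_named_arguments},
     [name: Indexer.BlockFetcher.Realtime]
   ]}
]
```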

@ -1,84 +1,35 @@
defmodule Indexer.BlockFetcher.Supervisor do
@moduledoc """
Supervises the `Indexer.BlockFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
Supervises catchup and realtime block fetchers
"""
# NOT a `Supervisor` because the `Task` restart strategies are custom.
use GenServer
require Logger
alias Indexer.BlockFetcher
alias Indexer.BlockFetcher.{Catchup, Realtime}
# milliseconds
@block_interval 5_000
@enforce_keys ~w(catchup realtime)a
defstruct ~w(catchup realtime)a
def child_spec(arg) do
# Use the `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
# instead of `:supervisor` and use the wrong shutdown timeout
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, [])
end
@doc """
Starts supervisor of `Indexer.BlockFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
For `named_arguments` see `Indexer.BlockFetcher.new/1`. For `t:GenServer.options/0` see `GenServer.start_link/3`.
"""
@spec start_link([named_arguments :: list() | GenServer.options()]) :: {:ok, pid}
def start_link([named_arguments, gen_server_options]) when is_list(named_arguments) and is_list(gen_server_options) do
GenServer.start_link(__MODULE__, named_arguments, gen_server_options)
end
@impl GenServer
def init(named_arguments) do
state = new(named_arguments)
send(self(), :catchup_index)
{:ok, _} = :timer.send_interval(state.realtime.interval, :realtime_index)
{:ok, state}
end
defp new(named_arguments) do
{given_block_interval, block_fetcher_named_arguments} = Keyword.pop(named_arguments, :block_interval)
block_fetcher = struct!(BlockFetcher, block_fetcher_named_arguments)
block_interval = given_block_interval || @block_interval
%__MODULE__{
catchup: Catchup.new(%{block_fetcher: block_fetcher, block_interval: block_interval}),
realtime: Realtime.new(%{block_fetcher: block_fetcher, block_interval: block_interval})
}
end
@impl GenServer
def handle_info(:catchup_index, %__MODULE__{} = state) do
{:noreply, Catchup.put(state)}
end
def handle_info({ref, _} = message, %__MODULE__{catchup: %Catchup{task: %Task{ref: ref}}} = state) do
{:noreply, Catchup.handle_success(message, state)}
end
def handle_info(
{:DOWN, ref, :process, pid, _} = message,
%__MODULE__{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = state
) do
{:noreply, Catchup.handle_failure(message, state)}
end
def handle_info(:realtime_index, %__MODULE__{} = state) do
{:noreply, Realtime.put(state)}
end
def handle_info({ref, :ok} = message, %__MODULE__{} = state) when is_reference(ref) do
{:noreply, Realtime.handle_success(message, state)}
end
def handle_info({:DOWN, _, :process, _, _} = message, %__MODULE__{} = state) do
{:noreply, Realtime.handle_failure(message, state)}
use Supervisor
def start_link([arguments, gen_server_options]) do
Supervisor.start_link(__MODULE__, arguments, gen_server_options)
end
@impl Supervisor
def init(%{block_interval: block_interval, subscribe_named_arguments: subscribe_named_arguments} = named_arguments) do
block_fetcher =
named_arguments
|> Map.drop(~w(block_interval subscribe_named_arguments)a)
|> BlockFetcher.new()
Supervisor.init(
[
{Catchup.Supervisor,
[%{block_fetcher: block_fetcher, block_interval: block_interval}, [name: Catchup.Supervisor]]},
{Realtime.Supervisor,
[
%{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments},
[name: Realtime.Supervisor]
]}
],
strategy: :one_for_one
)
end
end
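
A rough sketch of how this supervisor is started from the application environment gathered in `Indexer.Application` earlier in this diff (the exact child spec there is not shown in this section, so treat the wiring as an assumption; `:block_interval`, `:json_rpc_named_arguments`, and `:subscribe_named_arguments` are assumed to be present in the `:indexer` app env):

```elixir
# Simplified version of what Indexer.Application does with its env keys.
named_arguments =
  :indexer
  |> Application.get_all_env()
  |> Keyword.take(~w(block_interval json_rpc_named_arguments subscribe_named_arguments)a)
  |> Enum.into(%{})

# init/1 above splits this map: :block_interval goes to Catchup.Supervisor,
# :subscribe_named_arguments to Realtime.Supervisor, and the remaining keys build the
# shared %Indexer.BlockFetcher{} via BlockFetcher.new/1.
{:ok, _pid} =
  Indexer.BlockFetcher.Supervisor.start_link([named_arguments, [name: Indexer.BlockFetcher.Supervisor]])
```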

@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcher.SupervisorTest do
defmodule Indexer.BlockFetcher.Catchup.SupervisorTest do
# `async: false` due to use of named GenServer
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
@ -199,7 +199,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
default_blocks_batch_size = BlockFetcher.default_blocks_batch_size()
default_blocks_batch_size = BlockFetcher.Catchup.default_blocks_batch_size()
assert latest_block_number > default_blocks_batch_size
@ -209,7 +209,10 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]})
start_supervised!(
{Catchup.Supervisor, [%{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}}, []]}
)
first_catchup_block_number = latest_block_number - 1
@ -256,9 +259,8 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
# from `setup :state`
assert_received :catchup_index
assert {:noreply,
%BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} =
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
assert {:noreply, %Catchup.Supervisor{catchup: %Catchup{}, task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
Catchup.Supervisor.handle_info(:catchup_index, state)
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message
@ -267,12 +269,12 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
assert {:noreply, message_state} = Catchup.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup.bound_interval.current > catchup_index_state.catchup.bound_interval.current
assert message_state.bound_interval.current > catchup_index_state.bound_interval.current
end
test "decreases catchup_bound_interval if blocks missing", %{
@ -327,38 +329,39 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
# from `setup :state`
assert_received :catchup_index
assert {:noreply,
%BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} =
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
assert {:noreply, %Catchup.Supervisor{catchup: %Catchup{}, task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
Catchup.Supervisor.handle_info(:catchup_index, state)
# 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for
# catchup_index
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 1}} = message, 200
Process.sleep(200)
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
assert {:noreply, message_state} = Catchup.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup.bound_interval.current == message_state.catchup.bound_interval.minimum
assert message_state.bound_interval.current == message_state.bound_interval.minimum
# When not at minimum it is decreased
above_minimum_state = update_in(catchup_index_state.catchup.bound_interval, &BoundInterval.increase/1)
above_minimum_state = update_in(catchup_index_state.bound_interval, &BoundInterval.increase/1)
assert above_minimum_state.catchup.bound_interval.current > message_state.catchup.bound_interval.minimum
assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state)
assert above_minimum_state.bound_interval.current > message_state.bound_interval.minimum
assert {:noreply, above_minimum_message_state} = Catchup.Supervisor.handle_info(message, above_minimum_state)
assert above_minimum_message_state.catchup.bound_interval.current <
above_minimum_state.catchup.bound_interval.current
assert above_minimum_message_state.bound_interval.current < above_minimum_state.bound_interval.current
end
end
defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do
{:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments)
{:ok, state} =
Catchup.Supervisor.init(%{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}})
%{state: state}
end

@ -27,21 +27,23 @@ defmodule Indexer.BlockFetcher.RealtimeTest do
trace_replayTransaction: "https://core-trace.poa.network"
)
block_fetcher = %{BlockFetcher.new(json_rpc_named_arguments: core_json_rpc_named_arguments) | broadcast: false}
realtime = Realtime.new(%{block_fetcher: block_fetcher, block_interval: 5_000})
block_fetcher = %BlockFetcher{
broadcast: false,
callback_module: Realtime,
json_rpc_named_arguments: core_json_rpc_named_arguments
}
%{json_rpc_named_arguments: core_json_rpc_named_arguments, realtime: realtime}
%{block_fetcher: block_fetcher, json_rpc_named_arguments: core_json_rpc_named_arguments}
end
describe "Indexer.BlockFetcher.stream_import/1" do
@tag :no_geth
test "in range with internal transactions", %{
json_rpc_named_arguments: json_rpc_named_arguments,
realtime: %Realtime{block_fetcher: %BlockFetcher{} = block_fetcher}
block_fetcher: %BlockFetcher{} = block_fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
} do
{:ok, sequence} = Sequence.start_link(ranges: [], step: 2)
Sequence.cap(sequence)
full_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence}
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -358,48 +360,48 @@ defmodule Indexer.BlockFetcher.RealtimeTest do
end
assert {:ok,
%{
addresses: [
%Address{hash: first_address_hash, fetched_balance_block_number: 3_946_079},
%Address{hash: second_address_hash, fetched_balance_block_number: 3_946_079},
%Address{hash: third_address_hash, fetched_balance_block_number: 3_946_079},
%Address{hash: fourth_address_hash, fetched_balance_block_number: 3_946_080},
%Address{hash: fifth_address_hash, fetched_balance_block_number: 3_946_079}
],
balances: [
%{
address_hash: first_address_hash,
block_number: 3_946_079
},
%{
address_hash: second_address_hash,
block_number: 3_946_079
},
%{
address_hash: third_address_hash,
block_number: 3_946_079
},
%{
address_hash: fourth_address_hash,
block_number: 3_946_080
},
%{
address_hash: fifth_address_hash,
block_number: 3_946_079
}
],
blocks: [%Block{number: 3_946_079}, %Block{number: 3_946_080}],
internal_transactions: [
%{index: 0, transaction_hash: transaction_hash},
%{index: 1, transaction_hash: transaction_hash},
%{index: 2, transaction_hash: transaction_hash},
%{index: 3, transaction_hash: transaction_hash},
%{index: 4, transaction_hash: transaction_hash},
%{index: 5, transaction_hash: transaction_hash}
],
logs: [],
transactions: [transaction_hash]
}} = BlockFetcher.import_range(full_block_fetcher, 3_946_079..3_946_080)
{%{
addresses: [
%Address{hash: first_address_hash, fetched_balance_block_number: 3_946_079},
%Address{hash: second_address_hash, fetched_balance_block_number: 3_946_079},
%Address{hash: third_address_hash, fetched_balance_block_number: 3_946_079},
%Address{hash: fourth_address_hash, fetched_balance_block_number: 3_946_080},
%Address{hash: fifth_address_hash, fetched_balance_block_number: 3_946_079}
],
balances: [
%{
address_hash: first_address_hash,
block_number: 3_946_079
},
%{
address_hash: second_address_hash,
block_number: 3_946_079
},
%{
address_hash: third_address_hash,
block_number: 3_946_079
},
%{
address_hash: fourth_address_hash,
block_number: 3_946_080
},
%{
address_hash: fifth_address_hash,
block_number: 3_946_079
}
],
blocks: [%Block{number: 3_946_079}, %Block{number: 3_946_080}],
internal_transactions: [
%{index: 0, transaction_hash: transaction_hash},
%{index: 1, transaction_hash: transaction_hash},
%{index: 2, transaction_hash: transaction_hash},
%{index: 3, transaction_hash: transaction_hash},
%{index: 4, transaction_hash: transaction_hash},
%{index: 5, transaction_hash: transaction_hash}
],
logs: [],
transactions: [transaction_hash]
}, :more}} = BlockFetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080)
end
end
end

@ -16,7 +16,6 @@ defmodule Indexer.BlockFetcherTest do
BufferedTask,
InternalTransactionFetcher,
InternalTransactionFetcherCase,
Sequence,
TokenFetcherCase
}
@ -54,12 +53,11 @@ defmodule Indexer.BlockFetcherTest do
TokenFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
%{
block_fetcher:
BlockFetcher.new(
broadcast: false,
callback_module: Indexer.BlockFetcher.Catchup,
json_rpc_named_arguments: json_rpc_named_arguments
)
block_fetcher: %BlockFetcher{
broadcast: false,
callback_module: Indexer.BlockFetcher.Catchup,
json_rpc_named_arguments: json_rpc_named_arguments
}
}
end
@ -178,9 +176,6 @@ defmodule Indexer.BlockFetcherTest do
end
end
{:ok, sequence} = Sequence.start_link(first: 0, step: 1)
sequenced_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence}
%{address_hash: address_hash, block_hash: block_hash} =
case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Geth ->
@ -216,15 +211,15 @@ defmodule Indexer.BlockFetcherTest do
end
log_bad_gateway(
fn -> BlockFetcher.import_range(sequenced_block_fetcher, block_number..block_number) end,
fn -> BlockFetcher.fetch_and_import_range(block_fetcher, block_number..block_number) end,
fn result ->
assert {:ok,
%{
addresses: [%Address{hash: ^address_hash}],
blocks: [%Block{hash: ^block_hash}],
logs: [],
transactions: []
}} = result
{%{
addresses: [%Address{hash: ^address_hash}],
blocks: [%Block{hash: ^block_hash}],
logs: [],
transactions: []
}, :more}} = result
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(BalanceFetcher)
@ -415,9 +410,6 @@ defmodule Indexer.BlockFetcherTest do
end
end
{:ok, sequence} = Sequence.start_link(first: 0, step: 1)
sequenced_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence}
case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Geth ->
block_number = 48230
@ -476,7 +468,7 @@ defmodule Indexer.BlockFetcherTest do
154, 143, 4, 28, 171, 95, 190, 255, 254, 174, 75, 182>>
}
]
}} = BlockFetcher.import_range(sequenced_block_fetcher, block_number..block_number)
}} = BlockFetcher.fetch_and_import_range(block_fetcher, block_number..block_number)
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(BalanceFetcher)
@ -513,57 +505,57 @@ defmodule Indexer.BlockFetcherTest do
EthereumJSONRPC.Parity ->
assert {:ok,
%{
addresses: [
%Address{
hash:
%Explorer.Chain.Hash{
byte_count: 20,
bytes:
<<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179, 223,
65, 91>>
} = first_address_hash
},
%Address{
hash:
%Explorer.Chain.Hash{
byte_count: 20,
bytes:
<<232, 221, 197, 199, 162, 210, 240, 215, 169, 121, 132, 89, 192, 16, 79, 223, 94, 152,
122, 202>>
} = second_address_hash
}
],
blocks: [
%Block{
hash: %Explorer.Chain.Hash{
byte_count: 32,
bytes:
<<246, 180, 184, 200, 141, 243, 235, 210, 82, 236, 71, 99, 40, 51, 77, 192, 38, 207, 102,
96, 106, 132, 251, 118, 155, 61, 60, 188, 204, 132, 113, 189>>
}
}
],
logs: [
%Log{
index: 0,
transaction_hash: %Explorer.Chain.Hash{
byte_count: 32,
bytes:
<<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, 77,
57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>>
}
}
],
transactions: [
%Explorer.Chain.Hash{
byte_count: 32,
bytes:
<<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, 77,
57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>>
}
]
}} = BlockFetcher.import_range(block_fetcher, block_number..block_number)
{%{
addresses: [
%Address{
hash:
%Explorer.Chain.Hash{
byte_count: 20,
bytes:
<<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179, 223,
65, 91>>
} = first_address_hash
},
%Address{
hash:
%Explorer.Chain.Hash{
byte_count: 20,
bytes:
<<232, 221, 197, 199, 162, 210, 240, 215, 169, 121, 132, 89, 192, 16, 79, 223, 94, 152,
122, 202>>
} = second_address_hash
}
],
blocks: [
%Block{
hash: %Explorer.Chain.Hash{
byte_count: 32,
bytes:
<<246, 180, 184, 200, 141, 243, 235, 210, 82, 236, 71, 99, 40, 51, 77, 192, 38, 207, 102,
96, 106, 132, 251, 118, 155, 61, 60, 188, 204, 132, 113, 189>>
}
}
],
logs: [
%Log{
index: 0,
transaction_hash: %Explorer.Chain.Hash{
byte_count: 32,
bytes:
<<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35,
77, 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>>
}
}
],
transactions: [
%Explorer.Chain.Hash{
byte_count: 32,
bytes:
<<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, 77,
57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>>
}
]
}, :more}} = BlockFetcher.fetch_and_import_range(block_fetcher, block_number..block_number)
wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(BalanceFetcher)

@ -4,6 +4,7 @@ defmodule Indexer.BufferedTaskTest do
alias Indexer.BufferedTask
@max_batch_size 2
@assert_receive_timeout 200
defp start_buffer(callback_module) do
start_supervised!({Task.Supervisor, name: BufferedTaskSup})
@ -94,13 +95,13 @@ defmodule Indexer.BufferedTaskTest do
refute_receive _
BufferedTask.buffer(buffer, ~w(12 13 14 15 16))
assert_receive {:run, ~w(12 13)}
assert_receive {:run, ~w(14 15)}
assert_receive {:run, ~w(16)}
assert_receive {:run, ~w(12 13)}, @assert_receive_timeout
assert_receive {:run, ~w(14 15)}, @assert_receive_timeout
assert_receive {:run, ~w(16)}, @assert_receive_timeout
refute_receive _
BufferedTask.buffer(buffer, ~w(17))
assert_receive {:run, ~w(17)}
assert_receive {:run, ~w(17)}, @assert_receive_timeout
refute_receive _
end
@ -111,8 +112,8 @@ defmodule Indexer.BufferedTaskTest do
BufferedTask.buffer(buffer, ~w(some more entries))
assert_receive {:run, ~w(some more)}
assert_receive {:run, ~w(entries)}
assert_receive {:run, ~w(some more)}, @assert_receive_timeout
assert_receive {:run, ~w(entries)}, @assert_receive_timeout
refute_receive _
end
@ -122,8 +123,8 @@ defmodule Indexer.BufferedTaskTest do
{:ok, buffer} = start_buffer(RetryableTask)
BufferedTask.buffer(buffer, [:boom])
assert_receive {:run, {0, :boom}}
assert_receive {:run, {1, :boom}}, 200
assert_receive {:run, {0, :boom}}, @assert_receive_timeout
assert_receive {:run, {1, :boom}}, @assert_receive_timeout
refute_receive _
end
@ -132,12 +133,12 @@ defmodule Indexer.BufferedTaskTest do
{:ok, buffer} = start_buffer(RetryableTask)
BufferedTask.buffer(buffer, [1, 2, 3])
assert_receive {:run, {0, [1, 2]}}
assert_receive {:run, {0, [3]}}
assert_receive {:run, {1, [1, 2]}}
assert_receive {:run, {1, [3]}}
assert_receive {:final_run, {2, [1, 2]}}
assert_receive {:final_run, {2, [3]}}
assert_receive {:run, {0, [1, 2]}}, @assert_receive_timeout
assert_receive {:run, {0, [3]}}, @assert_receive_timeout
assert_receive {:run, {1, [1, 2]}}, @assert_receive_timeout
assert_receive {:run, {1, [3]}}, @assert_receive_timeout
assert_receive {:final_run, {2, [1, 2]}}, @assert_receive_timeout
assert_receive {:final_run, {2, [3]}}, @assert_receive_timeout
refute_receive _
end

@ -77,4 +77,5 @@
"tzdata": {:hex, :tzdata, "0.5.16", "13424d3afc76c68ff607f2df966c0ab4f3258859bbe3c979c9ed1606135e7352", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], []},
"wallaby": {:hex, :wallaby, "0.20.0", "cc6663555ff7b05afbebb2a8b461d18a5b321658b9017f7bc77d494b7063266a", [:mix], [{:httpoison, "~> 0.12", [hex: :httpoison, optional: false]}, {:poison, ">= 1.4.0", [hex: :poison, optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: false]}]},
"websocket_client": {:hex, :websocket_client, "1.3.0", "2275d7daaa1cdacebf2068891c9844b15f4fdc3de3ec2602420c2fb486db59b6", [:rebar3], [], "hexpm"},
}
