Merge branch 'optimized-indexer' of github.com:poanetwork/poa-explorer into optimized-indexer

pull/162/head
Luke Imhoff 7 years ago
commit 9a2ec6f26d
  1. 48
      apps/explorer/lib/explorer/chain.ex
  2. 27
      apps/explorer/lib/explorer/chain/address.ex
  3. 130
      apps/explorer/lib/explorer/indexer/address_fetcher.ex
  4. 11
      apps/explorer/lib/explorer/indexer/block_fetcher.ex
  5. 11
      apps/explorer/lib/explorer/indexer/supervisor.ex
  6. 39
      apps/explorer/lib/explorer/jsonrpc.ex
  7. 4
      apps/explorer/priv/repo/migrations/20180117221921_create_address.exs
  8. 2
      apps/explorer/test/explorer/chain/address_test.exs
  9. 16
      apps/explorer/test/explorer/chain_test.exs

@ -72,13 +72,33 @@ defmodule Explorer.Chain do
@spec balance(Address.t(), :wei) :: Wei.t() | nil
@spec balance(Address.t(), :gwei) :: Wei.gwei() | nil
@spec balance(Address.t(), :ether) :: Wei.ether() | nil
def balance(%Address{balance: balance}, unit) do
def balance(%Address{fetched_balance: balance}, unit) do
case balance do
nil -> nil
_ -> Wei.to(balance, unit)
end
end
@spec update_balances(
%{address_hash :: String.t => balance :: integer}
) :: :ok | {:error, reason :: term}
def update_balances(balances) do
timestamps = timestamps()
changes =
for {hash_string, amount} <- balances do
{:ok, truncated_hash} = Explorer.Chain.Hash.Truncated.cast(hash_string)
Map.merge(timestamps, %{
hash: truncated_hash,
fetched_balance: amount,
balance_fetched_at: timestamps.updated_at,
})
end
{_, _} = Repo.safe_insert_all(Address, changes,
conflict_target: :hash, on_conflict: :replace_all)
:ok
end
@doc """
The number of `t:Explorer.Chain.Block.t/0`.
@ -380,7 +400,7 @@ defmodule Explorer.Chain do
end
@doc """
Bulk insert tree of resource from a list of blocks.
Bulk insert blocks from a list of blocks.
## Tree
@ -391,7 +411,7 @@ defmodule Explorer.Chain do
* `t.Explorer.Chain.Log.t/0`
"""
def insert(%{
def import_blocks(%{
blocks_params: blocks_params,
logs_params: logs_params,
internal_transactions_params: internal_transactions_params,
@ -412,6 +432,13 @@ defmodule Explorer.Chain do
end
end
@doc """
The number of `t:Explorer.Chain.Address.t/0`.
"""
def address_count do
Repo.aggregate(Address, :count, :hash)
end
@doc """
The number of `t:Explorer.Chain.InternalTransaction.t/0`.
@ -478,6 +505,17 @@ defmodule Explorer.Chain do
|> Repo.paginate(pagination)
end
@doc """
Returns a stream of unfetched `Explorer.Chain.Address.t/0`.
"""
def stream_unfetched_addresses(initial, reducer) when is_function(reducer) do
Repo.transaction(fn ->
from(a in Address, where: is_nil(a.balance_fetched_at))
|> Repo.stream()
|> Enum.reduce(initial, reducer)
end)
end
@doc """
The number of `t:Explorer.Chain.Log.t/0`.
@ -1058,11 +1096,11 @@ defmodule Explorer.Chain do
insert_changes_list(
changes_list,
conflict_target: :hash,
# Do nothing so that pre-existing balance is not overwritten
on_conflict: :nothing,
on_conflict: [set: [balance_fetched_at: nil]],
for: Address,
timestamps: timestamps
)
{:ok, for(changes <- changes_list, do: changes.hash)}
end
@spec insert_blocks([map()], [timestamps_option]) :: {:ok, Block.t()} | {:error, [Changeset.t()]}

@ -20,8 +20,8 @@ defmodule Explorer.Chain.Address do
@type hash :: Hash.t()
@typedoc """
* `balance` - `credit.value - debit.value`
* `balance_updated_at` - the last time `balance` was recalculated
* `fetched_balance` - The last fetched balance from Parity
* `balance_fetched_at` - the last time `balance` was fetched
* `credit` - accumulation of all credits to the address `hash`
* `debit` - accumulation of all debits to the address `hash`
* `hash` - the hash of the address's public key
@ -29,8 +29,8 @@ defmodule Explorer.Chain.Address do
* `updated_at` when this address was last updated
"""
@type t :: %__MODULE__{
balance: Decimal.t(),
balance_updated_at: DateTime.t(),
fetched_balance: Decimal.t(),
balance_fetched_at: DateTime.t(),
credit: %Ecto.Association.NotLoaded{} | Credit.t() | nil,
debit: %Ecto.Association.NotLoaded{} | Debit.t() | nil,
hash: Hash.Truncated.t(),
@ -40,8 +40,8 @@ defmodule Explorer.Chain.Address do
@primary_key {:hash, Hash.Truncated, autogenerate: false}
schema "addresses" do
field(:balance, :decimal)
field(:balance_updated_at, Timex.Ecto.DateTime)
field(:fetched_balance, :decimal)
field(:balance_fetched_at, Timex.Ecto.DateTime)
timestamps()
@ -49,13 +49,11 @@ defmodule Explorer.Chain.Address do
has_one(:debit, Debit)
end
# Functions
def balance_changeset(%__MODULE__{} = address, attrs) do
address
|> cast(attrs, [:balance])
|> validate_required([:balance])
|> put_balance_updated_at()
|> cast(attrs, [:fetched_balance])
|> validate_required([:fetched_balance])
|> put_change(:balance_fetched_at, Timex.now())
end
def changeset(%__MODULE__{} = address, attrs) do
@ -69,17 +67,10 @@ defmodule Explorer.Chain.Address do
Enum.map(hash_set, &hash_to_changes/1)
end
## Private Functions
defp hash_to_changes(%Hash{byte_count: 20} = hash) do
%{hash: hash}
end
defp put_balance_updated_at(changeset) do
changeset
|> put_change(:balance_updated_at, Timex.now())
end
defimpl String.Chars do
def to_string(%@for{hash: hash}) do
@protocol.to_string(hash)

@ -0,0 +1,130 @@
defmodule Explorer.Indexer.AddressFetcher do
@moduledoc """
Fetches and indexes `t:Explorer.Chain.Address.t/0` balances.
"""
use GenServer
require Logger
alias Explorer.Chain
alias Explorer.Chain.{
Address,
Hash
}
alias Explorer.JSONRPC
@fetch_interval :timer.seconds(3)
@max_batch_size 500
def async_fetch_balances(address_hashes) do
GenServer.cast(__MODULE__, {:buffer_addresses, address_hashes})
end
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
send(self(), :fetch_unfetched_addresses)
{:ok, %{buffer: MapSet.new(), tasks: %{}}}
end
def handle_info(:fetch_unfetched_addresses, state) do
schedule_next_buffer_fetch(0)
{:noreply, stream_unfetched_addresses(state)}
end
def handle_info(:buffer_fetch, state) do
schedule_next_buffer_fetch()
{:noreply, flush_buffer(state)}
end
def handle_info({ref, {:fetched_balances, results}}, state) do
:ok = Chain.update_balances(results)
{:noreply, drop_task(state, ref)}
end
def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
{:noreply, state}
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
batch = Map.fetch!(state.tasks, ref)
new_state =
state
|> drop_task(ref)
|> buffer_addresses(batch)
{:noreply, new_state}
end
def handle_cast({:buffer_addresses, address_hashes}, state) do
{:noreply, buffer_addresses(state, address_hashes)}
end
defp drop_task(state, ref) do
%{state | tasks: Map.delete(state.tasks, ref)}
end
defp buffer_addresses(state, address_hashes) do
string_hashes = for hash <- address_hashes, do: Hash.to_string(hash)
%{state | buffer: MapSet.union(state.buffer, MapSet.new(string_hashes))}
end
defp stream_unfetched_addresses(state) do
tasks =
{state.tasks, state.buffer}
|> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, {tasks, batch} ->
batch = MapSet.put(batch, Hash.to_string(hash))
if MapSet.size(batch) >= @max_batch_size do
task = do_async_fetch_balances(batch)
{Map.put(tasks, task.ref, batch), MapSet.new()}
else
{tasks, batch}
end
end)
|> fetch_remaining()
%{state | tasks: tasks}
end
defp fetch_remaining({:ok, {tasks, batch}}) do
if MapSet.size(batch) > 0 do
task = do_async_fetch_balances(batch)
Map.put(tasks, task.ref, batch)
else
tasks
end
end
defp flush_buffer(state) do
if MapSet.size(state.buffer) > 0 do
task = do_async_fetch_balances(state.buffer)
new_tasks = Map.put(state.tasks, task.ref, state.buffer)
%{state | tasks: new_tasks, buffer: MapSet.new()}
else
state
end
end
defp schedule_next_buffer_fetch(after_ms \\ @fetch_interval) do
Process.send_after(self(), :buffer_fetch, after_ms)
end
defp do_fetch_addresses(address_hashes) do
JSONRPC.fetch_balances_by_hash(address_hashes)
end
defp do_async_fetch_balances(hashes_mapset) do
Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn ->
Logger.debug(fn -> "fetching #{MapSet.size(hashes_mapset)} balances" end)
{:ok, balances} = do_fetch_addresses(Enum.to_list(hashes_mapset))
{:fetched_balances, balances}
end)
end
end

@ -12,7 +12,10 @@ defmodule Explorer.Indexer.BlockFetcher do
require Logger
alias Explorer.{Chain, Indexer, JSONRPC}
alias Explorer.Indexer.Sequence
alias Explorer.Indexer.{
Sequence,
AddressFetcher,
}
alias Explorer.JSONRPC.Transactions
# Struct
@ -92,6 +95,7 @@ defmodule Explorer.Indexer.BlockFetcher do
internal transactions: #{Chain.internal_transaction_count()}
receipts: #{Chain.receipt_count()}
logs: #{Chain.log_count()}
addresses: #{Chain.address_count()}
"""
end)
@ -166,14 +170,15 @@ defmodule Explorer.Indexer.BlockFetcher do
seq: seq,
transactions_params: transactions_params
}) do
case Chain.insert(%{
case Chain.import_blocks(%{
blocks_params: blocks_params,
internal_transactions_params: internal_transactions_params,
logs_params: log_params,
receipts_params: receipt_params,
transactions_params: transactions_params
}) do
{:ok, _results} ->
{:ok, %{addresses: address_hashes}} ->
:ok = AddressFetcher.async_fetch_balances(address_hashes)
:ok
{:error, step, reason, _changes} ->

@ -5,7 +5,10 @@ defmodule Explorer.Indexer.Supervisor do
use Supervisor
alias Explorer.Indexer.BlockFetcher
alias Explorer.Indexer.{
BlockFetcher,
AddressFetcher
}
# Functions
@ -18,9 +21,11 @@ defmodule Explorer.Indexer.Supervisor do
@impl Supervisor
def init(_opts) do
children = [
{BlockFetcher, []}
{Task.Supervisor, name: Explorer.Indexer.TaskSupervisor},
{BlockFetcher, []},
{AddressFetcher, []},
]
Supervisor.init(children, strategy: :one_for_one)
Supervisor.init(children, strategy: :rest_for_one)
end
end

@ -101,6 +101,35 @@ defmodule Explorer.JSONRPC do
|> Keyword.fetch!(key)
end
@doc """
Fetches address balances by address hashes.
"""
def fetch_balances_by_hash(address_hashes) do
batched_requests =
for hash <- address_hashes do
%{
"id" => hash,
"jsonrpc" => "2.0",
"method" => "eth_getBalance",
"params" => [hash, "latest"]
}
end
batched_requests
|> json_rpc(config(:url))
|> handle_balances()
end
defp handle_balances({:ok, results}) do
native_results =
for response <- results, into: %{} do
{response["id"], hexadecimal_to_integer(response["result"])}
end
{:ok, native_results}
end
defp handle_balances({:error, _reason} = err), do: err
@doc """
Fetches blocks by block hashes.
@ -157,7 +186,7 @@ defmodule Explorer.JSONRPC do
case HTTPoison.post(url, json, headers, config(:http)) do
{:ok, %HTTPoison.Response{body: body, status_code: code}} ->
body |> decode_json(payload) |> handle_response(code)
body |> decode_json(payload, url) |> handle_response(code)
{:error, %HTTPoison.Error{reason: reason}} ->
{:error, reason}
@ -218,16 +247,18 @@ defmodule Explorer.JSONRPC do
defp encode_json(data), do: Jason.encode_to_iodata!(data)
defp decode_json(body, posted_payload) do
defp decode_json(body, posted_payload, url) do
Jason.decode!(body)
rescue
Jason.DecodeError ->
Logger.error("""
failed to decode json payload:
#{inspect(body)}
url: #{inspect(url)}
body: #{inspect(body)}
#{inspect(posted_payload)}
posted payload: #{inspect(posted_payload)}
""")

@ -3,8 +3,8 @@ defmodule Explorer.Repo.Migrations.CreateAddress do
def change do
create table(:addresses, primary_key: false) do
add(:balance, :numeric, precision: 100)
add(:balance_updated_at, :utc_datetime)
add(:fetched_balance, :numeric, precision: 100)
add(:balance_fetched_at, :utc_datetime)
add(:hash, :bytea, null: false, primary_key: true)
timestamps(null: false)

@ -18,7 +18,7 @@ defmodule Explorer.Chain.AddressTest do
describe "balance_changeset/2" do
test "with a new balance" do
changeset = Address.balance_changeset(%Address{}, %{balance: 99})
changeset = Address.balance_changeset(%Address{}, %{fetched_balance: 99})
assert changeset.valid?
end

@ -160,20 +160,20 @@ defmodule Explorer.ChainTest do
describe "balance/2" do
test "with Address.t with :wei" do
assert Chain.balance(%Address{balance: Decimal.new(1)}, :wei) == Decimal.new(1)
assert Chain.balance(%Address{balance: nil}, :wei) == nil
assert Chain.balance(%Address{fetched_balance: Decimal.new(1)}, :wei) == Decimal.new(1)
assert Chain.balance(%Address{fetched_balance: nil}, :wei) == nil
end
test "with Address.t with :gwei" do
assert Chain.balance(%Address{balance: Decimal.new(1)}, :gwei) == Decimal.new("1e-9")
assert Chain.balance(%Address{balance: Decimal.new("1e9")}, :gwei) == Decimal.new(1)
assert Chain.balance(%Address{balance: nil}, :gwei) == nil
assert Chain.balance(%Address{fetched_balance: Decimal.new(1)}, :gwei) == Decimal.new("1e-9")
assert Chain.balance(%Address{fetched_balance: Decimal.new("1e9")}, :gwei) == Decimal.new(1)
assert Chain.balance(%Address{fetched_balance: nil}, :gwei) == nil
end
test "with Address.t with :ether" do
assert Chain.balance(%Address{balance: Decimal.new(1)}, :ether) == Decimal.new("1e-18")
assert Chain.balance(%Address{balance: Decimal.new("1e18")}, :ether) == Decimal.new(1)
assert Chain.balance(%Address{balance: nil}, :ether) == nil
assert Chain.balance(%Address{fetched_balance: Decimal.new(1)}, :ether) == Decimal.new("1e-18")
assert Chain.balance(%Address{fetched_balance: Decimal.new("1e18")}, :ether) == Decimal.new(1)
assert Chain.balance(%Address{fetched_balance: nil}, :ether) == nil
end
end

Loading…
Cancel
Save