From b9154901e9c4348d953dbadadcd062b69ae74d36 Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Thu, 3 May 2018 10:17:24 -0400 Subject: [PATCH 1/2] Add address balance fetching --- apps/explorer/lib/explorer/chain.ex | 48 ++++++- apps/explorer/lib/explorer/chain/address.ex | 27 ++-- .../lib/explorer/indexer/address_fetcher.ex | 125 ++++++++++++++++++ .../lib/explorer/indexer/block_fetcher.ex | 11 +- .../lib/explorer/indexer/supervisor.ex | 11 +- apps/explorer/lib/explorer/jsonrpc.ex | 39 +++++- .../20180117221921_create_address.exs | 4 +- .../test/explorer/chain/address_test.exs | 2 +- apps/explorer/test/explorer/chain_test.exs | 16 +-- 9 files changed, 239 insertions(+), 44 deletions(-) create mode 100644 apps/explorer/lib/explorer/indexer/address_fetcher.ex diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index b2b5449e49..09e07ac119 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -72,13 +72,33 @@ defmodule Explorer.Chain do @spec balance(Address.t(), :wei) :: Wei.t() | nil @spec balance(Address.t(), :gwei) :: Wei.gwei() | nil @spec balance(Address.t(), :ether) :: Wei.ether() | nil - def balance(%Address{balance: balance}, unit) do + def balance(%Address{fetched_balance: balance}, unit) do case balance do nil -> nil _ -> Wei.to(balance, unit) end end + @spec update_balances( + %{address_hash :: String.t => balance :: integer} + ) :: :ok | {:error, reason :: term} + def update_balances(balances) do + timestamps = timestamps() + changes = + for {hash_string, amount} <- balances do + {:ok, truncated_hash} = Explorer.Chain.Hash.Truncated.cast(hash_string) + Map.merge(timestamps, %{ + hash: truncated_hash, + fetched_balance: amount, + balance_fetched_at: timestamps.updated_at, + }) + end + + {_, _} = Repo.safe_insert_all(Address, changes, + conflict_target: :hash, on_conflict: :replace_all) + :ok + end + @doc """ The number of `t:Explorer.Chain.Block.t/0`. @@ -380,7 +400,7 @@ defmodule Explorer.Chain do end @doc """ - Bulk insert tree of resource from a list of blocks. + Bulk insert blocks from a list of blocks. ## Tree @@ -391,7 +411,7 @@ defmodule Explorer.Chain do * `t.Explorer.Chain.Log.t/0` """ - def insert(%{ + def import_blocks(%{ blocks_params: blocks_params, logs_params: logs_params, internal_transactions_params: internal_transactions_params, @@ -412,6 +432,13 @@ defmodule Explorer.Chain do end end + @doc """ + The number of `t:Explorer.Chain.Address.t/0`. + """ + def address_count do + Repo.aggregate(Address, :count, :hash) + end + @doc """ The number of `t:Explorer.Chain.InternalTransaction.t/0`. @@ -478,6 +505,17 @@ defmodule Explorer.Chain do |> Repo.paginate(pagination) end + @doc """ + Returns a stream of unfetched `Explorer.Chain.Address.t/0`. + """ + def stream_unfetched_addresses(initial, reducer) when is_function(reducer) do + Repo.transaction(fn -> + from(a in Address, where: is_nil(a.balance_fetched_at)) + |> Repo.stream() + |> Enum.reduce(initial, reducer) + end) + end + @doc """ The number of `t:Explorer.Chain.Log.t/0`. @@ -1058,11 +1096,11 @@ defmodule Explorer.Chain do insert_changes_list( changes_list, conflict_target: :hash, - # Do nothing so that pre-existing balance is not overwritten - on_conflict: :nothing, + on_conflict: [set: [balance_fetched_at: nil]], for: Address, timestamps: timestamps ) + {:ok, for(changes <- changes_list, do: changes.hash)} end @spec insert_blocks([map()], [timestamps_option]) :: {:ok, Block.t()} | {:error, [Changeset.t()]} diff --git a/apps/explorer/lib/explorer/chain/address.ex b/apps/explorer/lib/explorer/chain/address.ex index b67064239e..30268b6f9c 100644 --- a/apps/explorer/lib/explorer/chain/address.ex +++ b/apps/explorer/lib/explorer/chain/address.ex @@ -20,8 +20,8 @@ defmodule Explorer.Chain.Address do @type hash :: Hash.t() @typedoc """ - * `balance` - `credit.value - debit.value` - * `balance_updated_at` - the last time `balance` was recalculated + * `fetched_balance` - The last fetched balance from Parity + * `balance_fetched_at` - the last time `balance` was fetched * `credit` - accumulation of all credits to the address `hash` * `debit` - accumulation of all debits to the address `hash` * `hash` - the hash of the address's public key @@ -29,8 +29,8 @@ defmodule Explorer.Chain.Address do * `updated_at` when this address was last updated """ @type t :: %__MODULE__{ - balance: Decimal.t(), - balance_updated_at: DateTime.t(), + fetched_balance: Decimal.t(), + balance_fetched_at: DateTime.t(), credit: %Ecto.Association.NotLoaded{} | Credit.t() | nil, debit: %Ecto.Association.NotLoaded{} | Debit.t() | nil, hash: Hash.Truncated.t(), @@ -40,8 +40,8 @@ defmodule Explorer.Chain.Address do @primary_key {:hash, Hash.Truncated, autogenerate: false} schema "addresses" do - field(:balance, :decimal) - field(:balance_updated_at, Timex.Ecto.DateTime) + field(:fetched_balance, :decimal) + field(:balance_fetched_at, Timex.Ecto.DateTime) timestamps() @@ -49,13 +49,11 @@ defmodule Explorer.Chain.Address do has_one(:debit, Debit) end - # Functions - def balance_changeset(%__MODULE__{} = address, attrs) do address - |> cast(attrs, [:balance]) - |> validate_required([:balance]) - |> put_balance_updated_at() + |> cast(attrs, [:fetched_balance]) + |> validate_required([:fetched_balance]) + |> put_change(:balance_fetched_at, Timex.now()) end def changeset(%__MODULE__{} = address, attrs) do @@ -69,17 +67,10 @@ defmodule Explorer.Chain.Address do Enum.map(hash_set, &hash_to_changes/1) end - ## Private Functions - defp hash_to_changes(%Hash{byte_count: 20} = hash) do %{hash: hash} end - defp put_balance_updated_at(changeset) do - changeset - |> put_change(:balance_updated_at, Timex.now()) - end - defimpl String.Chars do def to_string(%@for{hash: hash}) do @protocol.to_string(hash) diff --git a/apps/explorer/lib/explorer/indexer/address_fetcher.ex b/apps/explorer/lib/explorer/indexer/address_fetcher.ex new file mode 100644 index 0000000000..da22f4ac96 --- /dev/null +++ b/apps/explorer/lib/explorer/indexer/address_fetcher.ex @@ -0,0 +1,125 @@ +defmodule Explorer.Indexer.AddressFetcher do + @moduledoc """ + TODO + """ + use GenServer + require Logger + + alias Explorer.Chain + alias Explorer.Chain.{ + Address, + Hash, + } + alias Explorer.JSONRPC + + @fetch_interval :timer.seconds(3) + @max_batch_size 500 + + def async_fetch_addresses(address_hashes) do + GenServer.cast(__MODULE__, {:buffer_addresses, address_hashes}) + end + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(_opts) do + send(self(), :fetch_unfetched_addresses) + + {:ok, %{buffer: MapSet.new(), tasks: %{}}} + end + + def handle_info(:fetch_unfetched_addresses, state) do + schedule_next_buffer_fetch(0) + {:noreply, stream_unfetched_addresses(state)} + end + + def handle_info(:buffer_fetch, state) do + schedule_next_buffer_fetch() + {:noreply, flush_buffer(state)} + end + + def handle_info({ref, {:fetched_balances, results}}, state) do + :ok = Chain.update_balances(results) + {:noreply, drop_task(state, ref)} + end + + def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do + {:noreply, state} + end + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do + batch = Map.fetch!(state.tasks, ref) + + new_state = + state + |> drop_task(ref) + |> buffer_addresses(batch) + + {:noreply, new_state} + end + + def handle_cast({:buffer_addresses, address_hashes}, state) do + {:noreply, buffer_addresses(state, address_hashes)} + end + + defp drop_task(state, ref) do + %{state | tasks: Map.delete(state.tasks, ref)} + end + + defp buffer_addresses(state, address_hashes) do + string_hashes = for hash <- address_hashes, do: Hash.to_string(hash) + %{state | buffer: MapSet.union(state.buffer, MapSet.new(string_hashes))} + end + + defp stream_unfetched_addresses(state) do + tasks = + {state.tasks, state.buffer} + |> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, {tasks, batch} -> + batch = MapSet.put(batch, Hash.to_string(hash)) + if MapSet.size(batch) >= @max_batch_size do + task = async_fetch_balances(batch) + {Map.put(tasks, task.ref, batch), MapSet.new()} + else + {tasks, batch} + end + end) + |> fetch_remaining() + + %{state | tasks: tasks} + end + defp fetch_remaining({:ok, {tasks, batch}}) do + if MapSet.size(batch) > 0 do + task = async_fetch_balances(batch) + Map.put(tasks, task.ref, batch) + else + tasks + end + end + + defp flush_buffer(state) do + if MapSet.size(state.buffer) > 0 do + task = async_fetch_balances(state.buffer) + new_tasks = Map.put(state.tasks, task.ref, state.buffer) + + %{state | tasks: new_tasks, buffer: MapSet.new()} + else + state + end + end + + defp schedule_next_buffer_fetch(after_ms \\ @fetch_interval) do + Process.send_after(self(), :buffer_fetch, after_ms) + end + + defp do_fetch_addresses(address_hashes) do + JSONRPC.fetch_balances_by_hash(address_hashes) + end + + defp async_fetch_balances(hashes_mapset) do + Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn -> + Logger.debug(fn -> "fetching #{MapSet.size(hashes_mapset)} balances" end) + {:ok, balances} = do_fetch_addresses(Enum.to_list(hashes_mapset)) + {:fetched_balances, balances} + end) + end +end diff --git a/apps/explorer/lib/explorer/indexer/block_fetcher.ex b/apps/explorer/lib/explorer/indexer/block_fetcher.ex index 6e580e0e4f..3317eb37d2 100644 --- a/apps/explorer/lib/explorer/indexer/block_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/block_fetcher.ex @@ -12,7 +12,10 @@ defmodule Explorer.Indexer.BlockFetcher do require Logger alias Explorer.{Chain, Indexer, JSONRPC} - alias Explorer.Indexer.Sequence + alias Explorer.Indexer.{ + Sequence, + AddressFetcher, + } alias Explorer.JSONRPC.Transactions # Struct @@ -92,6 +95,7 @@ defmodule Explorer.Indexer.BlockFetcher do internal transactions: #{Chain.internal_transaction_count()} receipts: #{Chain.receipt_count()} logs: #{Chain.log_count()} + addresses: #{Chain.address_count()} """ end) @@ -166,14 +170,15 @@ defmodule Explorer.Indexer.BlockFetcher do seq: seq, transactions_params: transactions_params }) do - case Chain.insert(%{ + case Chain.import_blocks(%{ blocks_params: blocks_params, internal_transactions_params: internal_transactions_params, logs_params: log_params, receipts_params: receipt_params, transactions_params: transactions_params }) do - {:ok, _results} -> + {:ok, %{addresses: address_hashes}} -> + :ok = AddressFetcher.async_fetch_addresses(address_hashes) :ok {:error, step, reason, _changes} -> diff --git a/apps/explorer/lib/explorer/indexer/supervisor.ex b/apps/explorer/lib/explorer/indexer/supervisor.ex index 686df6f0ad..d846fd5193 100644 --- a/apps/explorer/lib/explorer/indexer/supervisor.ex +++ b/apps/explorer/lib/explorer/indexer/supervisor.ex @@ -5,7 +5,10 @@ defmodule Explorer.Indexer.Supervisor do use Supervisor - alias Explorer.Indexer.BlockFetcher + alias Explorer.Indexer.{ + BlockFetcher, + AddressFetcher + } # Functions @@ -18,9 +21,11 @@ defmodule Explorer.Indexer.Supervisor do @impl Supervisor def init(_opts) do children = [ - {BlockFetcher, []} + {Task.Supervisor, name: Explorer.Indexer.TaskSupervisor}, + {BlockFetcher, []}, + {AddressFetcher, []}, ] - Supervisor.init(children, strategy: :one_for_one) + Supervisor.init(children, strategy: :rest_for_one) end end diff --git a/apps/explorer/lib/explorer/jsonrpc.ex b/apps/explorer/lib/explorer/jsonrpc.ex index 0d110e05dc..687154e8b7 100644 --- a/apps/explorer/lib/explorer/jsonrpc.ex +++ b/apps/explorer/lib/explorer/jsonrpc.ex @@ -101,6 +101,35 @@ defmodule Explorer.JSONRPC do |> Keyword.fetch!(key) end + @doc """ + Fetches address balances by address hashes. + """ + def fetch_balances_by_hash(address_hashes) do + batched_requests = + for hash <- address_hashes do + %{ + "id" => hash, + "jsonrpc" => "2.0", + "method" => "eth_getBalance", + "params" => [hash, "latest"] + } + end + + batched_requests + |> json_rpc(config(:url)) + |> handle_balances() + end + defp handle_balances({:ok, results}) do + native_results = + for response <- results, into: %{} do + {response["id"], hexadecimal_to_integer(response["result"])} + end + + {:ok, native_results} + end + defp handle_balances({:error, _reason} = err), do: err + + @doc """ Fetches blocks by block hashes. @@ -157,7 +186,7 @@ defmodule Explorer.JSONRPC do case HTTPoison.post(url, json, headers, config(:http)) do {:ok, %HTTPoison.Response{body: body, status_code: code}} -> - body |> decode_json(payload) |> handle_response(code) + body |> decode_json(payload, url) |> handle_response(code) {:error, %HTTPoison.Error{reason: reason}} -> {:error, reason} @@ -218,16 +247,18 @@ defmodule Explorer.JSONRPC do defp encode_json(data), do: Jason.encode_to_iodata!(data) - defp decode_json(body, posted_payload) do + defp decode_json(body, posted_payload, url) do Jason.decode!(body) rescue Jason.DecodeError -> Logger.error(""" failed to decode json payload: - #{inspect(body)} + url: #{inspect(url)} + + body: #{inspect(body)} - #{inspect(posted_payload)} + posted payload: #{inspect(posted_payload)} """) diff --git a/apps/explorer/priv/repo/migrations/20180117221921_create_address.exs b/apps/explorer/priv/repo/migrations/20180117221921_create_address.exs index 22c8cbebef..f3ca2af556 100644 --- a/apps/explorer/priv/repo/migrations/20180117221921_create_address.exs +++ b/apps/explorer/priv/repo/migrations/20180117221921_create_address.exs @@ -3,8 +3,8 @@ defmodule Explorer.Repo.Migrations.CreateAddress do def change do create table(:addresses, primary_key: false) do - add(:balance, :numeric, precision: 100) - add(:balance_updated_at, :utc_datetime) + add(:fetched_balance, :numeric, precision: 100) + add(:balance_fetched_at, :utc_datetime) add(:hash, :bytea, null: false, primary_key: true) timestamps(null: false) diff --git a/apps/explorer/test/explorer/chain/address_test.exs b/apps/explorer/test/explorer/chain/address_test.exs index 2a631664f4..746fb6fc53 100644 --- a/apps/explorer/test/explorer/chain/address_test.exs +++ b/apps/explorer/test/explorer/chain/address_test.exs @@ -18,7 +18,7 @@ defmodule Explorer.Chain.AddressTest do describe "balance_changeset/2" do test "with a new balance" do - changeset = Address.balance_changeset(%Address{}, %{balance: 99}) + changeset = Address.balance_changeset(%Address{}, %{fetched_balance: 99}) assert changeset.valid? end diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index ded5e6f138..5c9bbed186 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -160,20 +160,20 @@ defmodule Explorer.ChainTest do describe "balance/2" do test "with Address.t with :wei" do - assert Chain.balance(%Address{balance: Decimal.new(1)}, :wei) == Decimal.new(1) - assert Chain.balance(%Address{balance: nil}, :wei) == nil + assert Chain.balance(%Address{fetched_balance: Decimal.new(1)}, :wei) == Decimal.new(1) + assert Chain.balance(%Address{fetched_balance: nil}, :wei) == nil end test "with Address.t with :gwei" do - assert Chain.balance(%Address{balance: Decimal.new(1)}, :gwei) == Decimal.new("1e-9") - assert Chain.balance(%Address{balance: Decimal.new("1e9")}, :gwei) == Decimal.new(1) - assert Chain.balance(%Address{balance: nil}, :gwei) == nil + assert Chain.balance(%Address{fetched_balance: Decimal.new(1)}, :gwei) == Decimal.new("1e-9") + assert Chain.balance(%Address{fetched_balance: Decimal.new("1e9")}, :gwei) == Decimal.new(1) + assert Chain.balance(%Address{fetched_balance: nil}, :gwei) == nil end test "with Address.t with :ether" do - assert Chain.balance(%Address{balance: Decimal.new(1)}, :ether) == Decimal.new("1e-18") - assert Chain.balance(%Address{balance: Decimal.new("1e18")}, :ether) == Decimal.new(1) - assert Chain.balance(%Address{balance: nil}, :ether) == nil + assert Chain.balance(%Address{fetched_balance: Decimal.new(1)}, :ether) == Decimal.new("1e-18") + assert Chain.balance(%Address{fetched_balance: Decimal.new("1e18")}, :ether) == Decimal.new(1) + assert Chain.balance(%Address{fetched_balance: nil}, :ether) == nil end end From d907bd3f75c20ccb17fe15e19d6686beaa137595 Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Thu, 3 May 2018 10:31:42 -0400 Subject: [PATCH 2/2] Bump docs --- .../lib/explorer/indexer/address_fetcher.ex | 19 ++++++++++++------- .../lib/explorer/indexer/block_fetcher.ex | 2 +- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/apps/explorer/lib/explorer/indexer/address_fetcher.ex b/apps/explorer/lib/explorer/indexer/address_fetcher.ex index da22f4ac96..41e711780e 100644 --- a/apps/explorer/lib/explorer/indexer/address_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/address_fetcher.ex @@ -1,21 +1,23 @@ defmodule Explorer.Indexer.AddressFetcher do @moduledoc """ - TODO + Fetches and indexes `t:Explorer.Chain.Address.t/0` balances. """ use GenServer require Logger alias Explorer.Chain + alias Explorer.Chain.{ Address, - Hash, + Hash } + alias Explorer.JSONRPC @fetch_interval :timer.seconds(3) @max_batch_size 500 - def async_fetch_addresses(address_hashes) do + def async_fetch_balances(address_hashes) do GenServer.cast(__MODULE__, {:buffer_addresses, address_hashes}) end @@ -47,6 +49,7 @@ defmodule Explorer.Indexer.AddressFetcher do def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do {:noreply, state} end + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do batch = Map.fetch!(state.tasks, ref) @@ -76,8 +79,9 @@ defmodule Explorer.Indexer.AddressFetcher do {state.tasks, state.buffer} |> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, {tasks, batch} -> batch = MapSet.put(batch, Hash.to_string(hash)) + if MapSet.size(batch) >= @max_batch_size do - task = async_fetch_balances(batch) + task = do_async_fetch_balances(batch) {Map.put(tasks, task.ref, batch), MapSet.new()} else {tasks, batch} @@ -87,9 +91,10 @@ defmodule Explorer.Indexer.AddressFetcher do %{state | tasks: tasks} end + defp fetch_remaining({:ok, {tasks, batch}}) do if MapSet.size(batch) > 0 do - task = async_fetch_balances(batch) + task = do_async_fetch_balances(batch) Map.put(tasks, task.ref, batch) else tasks @@ -98,7 +103,7 @@ defmodule Explorer.Indexer.AddressFetcher do defp flush_buffer(state) do if MapSet.size(state.buffer) > 0 do - task = async_fetch_balances(state.buffer) + task = do_async_fetch_balances(state.buffer) new_tasks = Map.put(state.tasks, task.ref, state.buffer) %{state | tasks: new_tasks, buffer: MapSet.new()} @@ -115,7 +120,7 @@ defmodule Explorer.Indexer.AddressFetcher do JSONRPC.fetch_balances_by_hash(address_hashes) end - defp async_fetch_balances(hashes_mapset) do + defp do_async_fetch_balances(hashes_mapset) do Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn -> Logger.debug(fn -> "fetching #{MapSet.size(hashes_mapset)} balances" end) {:ok, balances} = do_fetch_addresses(Enum.to_list(hashes_mapset)) diff --git a/apps/explorer/lib/explorer/indexer/block_fetcher.ex b/apps/explorer/lib/explorer/indexer/block_fetcher.ex index 3317eb37d2..945b7f9b57 100644 --- a/apps/explorer/lib/explorer/indexer/block_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/block_fetcher.ex @@ -178,7 +178,7 @@ defmodule Explorer.Indexer.BlockFetcher do transactions_params: transactions_params }) do {:ok, %{addresses: address_hashes}} -> - :ok = AddressFetcher.async_fetch_addresses(address_hashes) + :ok = AddressFetcher.async_fetch_balances(address_hashes) :ok {:error, step, reason, _changes} ->