Merge pull request #1375 from poanetwork/fetch-latest-coin-balance
feat: synchronously fetch coin balances when an address is viewed.pull/1444/head
commit
5b5a0b3cfe
@ -0,0 +1,180 @@ |
||||
defmodule Indexer.CoinBalance.OnDemandFetcher do |
||||
@moduledoc """ |
||||
Ensures that we have a reasonably up to date coin balance for a given address. |
||||
|
||||
If we have an unfetched coin balance for that address, it will be synchronously fetched. |
||||
If not we will fetch the coin balance and created a fetched coin balance. |
||||
If we have a fetched coin balance, but it is over 100 blocks old, we will fetch and create a fetched coin baalnce. |
||||
""" |
||||
|
||||
@latest_balance_stale_threshold :timer.hours(24) |
||||
|
||||
use GenServer |
||||
|
||||
import Ecto.Query, only: [from: 2] |
||||
import EthereumJSONRPC, only: [integer_to_quantity: 1] |
||||
|
||||
alias EthereumJSONRPC.FetchedBalances |
||||
alias Explorer.{Chain, Repo} |
||||
alias Explorer.Chain.{Address, BlockNumberCache} |
||||
alias Explorer.Chain.Address.CoinBalance |
||||
alias Explorer.Counters.AverageBlockTime |
||||
alias Indexer.CoinBalance.Fetcher |
||||
alias Timex.Duration |
||||
|
||||
@type block_number :: integer |
||||
|
||||
@typedoc """ |
||||
`block_number` represents the block that we will be updating the address to. |
||||
|
||||
If there is a pending balance in the window, we will not fetch the balance |
||||
as of the latest block, we will instead fetch that pending balance. |
||||
""" |
||||
@type balance_status :: |
||||
:current |
||||
| {:stale, block_number} |
||||
| {:pending, block_number} |
||||
|
||||
## Interface |
||||
|
||||
@spec trigger_fetch(Address.t()) :: balance_status |
||||
def trigger_fetch(address) do |
||||
latest_block_number = latest_block_number() |
||||
|
||||
case stale_balance_window(latest_block_number) do |
||||
{:error, :no_average_block_time} -> |
||||
:current |
||||
|
||||
stale_balance_window -> |
||||
do_trigger_fetch(address, latest_block_number, stale_balance_window) |
||||
end |
||||
end |
||||
|
||||
## Callbacks |
||||
|
||||
def child_spec([json_rpc_named_arguments, server_opts]) do |
||||
%{ |
||||
id: __MODULE__, |
||||
start: {__MODULE__, :start_link, [json_rpc_named_arguments, server_opts]}, |
||||
type: :worker |
||||
} |
||||
end |
||||
|
||||
def start_link(json_rpc_named_arguments, server_opts) do |
||||
GenServer.start_link(__MODULE__, json_rpc_named_arguments, server_opts) |
||||
end |
||||
|
||||
def init(json_rpc_named_arguments) do |
||||
{:ok, %{json_rpc_named_arguments: json_rpc_named_arguments}} |
||||
end |
||||
|
||||
def handle_cast({:fetch_and_update, block_number, address}, state) do |
||||
fetch_and_update(block_number, address, state.json_rpc_named_arguments) |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
def handle_cast({:fetch_and_import, block_number, address}, state) do |
||||
fetch_and_import(block_number, address, state.json_rpc_named_arguments) |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
## Implementation |
||||
|
||||
defp do_trigger_fetch(%Address{fetched_coin_balance_block_number: nil} = address, latest_block_number, _) do |
||||
GenServer.cast(__MODULE__, {:fetch_and_update, latest_block_number, address}) |
||||
|
||||
{:stale, 0} |
||||
end |
||||
|
||||
defp do_trigger_fetch(address, latest_block_number, stale_balance_window) do |
||||
latest = |
||||
from( |
||||
cb in CoinBalance, |
||||
where: cb.address_hash == ^address.hash, |
||||
where: cb.block_number >= ^stale_balance_window, |
||||
where: is_nil(cb.value_fetched_at), |
||||
order_by: [desc: :block_number], |
||||
limit: 1 |
||||
) |
||||
|
||||
if address.fetched_coin_balance_block_number < stale_balance_window do |
||||
GenServer.cast(__MODULE__, {:fetch_and_update, latest_block_number, address}) |
||||
|
||||
{:stale, latest_block_number} |
||||
else |
||||
case Repo.one(latest) do |
||||
nil -> |
||||
# There is no recent coin balance to fetch, so we check to see how old the |
||||
# balance is on the address. If it is too old, we check again, just to be safe. |
||||
:current |
||||
|
||||
%CoinBalance{value_fetched_at: nil, block_number: block_number} -> |
||||
GenServer.cast(__MODULE__, {:fetch_and_import, block_number, address}) |
||||
|
||||
{:pending, block_number} |
||||
|
||||
%CoinBalance{} -> |
||||
:current |
||||
end |
||||
end |
||||
end |
||||
|
||||
defp fetch_and_import(block_number, address, json_rpc_named_arguments) do |
||||
case fetch_balances(block_number, address, json_rpc_named_arguments) do |
||||
{:ok, fetched_balances} -> do_import(fetched_balances) |
||||
_ -> :ok |
||||
end |
||||
end |
||||
|
||||
defp fetch_and_update(block_number, address, json_rpc_named_arguments) do |
||||
case fetch_balances(block_number, address, json_rpc_named_arguments) do |
||||
{:ok, %{params_list: []}} -> |
||||
:ok |
||||
|
||||
{:ok, %{params_list: params_list}} -> |
||||
address_params = Fetcher.balances_params_to_address_params(params_list) |
||||
|
||||
Chain.import(%{ |
||||
addresses: %{params: address_params, with: :balance_changeset}, |
||||
broadcast: :on_demand |
||||
}) |
||||
|
||||
_ -> |
||||
:ok |
||||
end |
||||
end |
||||
|
||||
defp fetch_balances(block_number, address, json_rpc_named_arguments) do |
||||
params = %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address.hash)} |
||||
|
||||
EthereumJSONRPC.fetch_balances([params], json_rpc_named_arguments) |
||||
end |
||||
|
||||
defp do_import(%FetchedBalances{} = fetched_balances) do |
||||
case Fetcher.import_fetched_balances(fetched_balances, :on_demand) do |
||||
{:ok, %{addresses: [address]}} -> {:ok, address} |
||||
_ -> :error |
||||
end |
||||
end |
||||
|
||||
defp latest_block_number do |
||||
BlockNumberCache.max_number() |
||||
end |
||||
|
||||
defp stale_balance_window(block_number) do |
||||
case AverageBlockTime.average_block_time() do |
||||
{:error, :disabled} -> |
||||
{:error, :no_average_block_time} |
||||
|
||||
duration -> |
||||
average_block_time = |
||||
duration |
||||
|> Duration.to_milliseconds() |
||||
|> round() |
||||
|
||||
block_number - div(@latest_balance_stale_threshold, average_block_time) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,153 @@ |
||||
defmodule Indexer.CoinBalance.OnDemandFetcherTest do |
||||
# MUST be `async: false` so that {:shared, pid} is set for connection to allow CoinBalanceFetcher's self-send to have |
||||
# connection allowed immediately. |
||||
use EthereumJSONRPC.Case, async: false |
||||
use Explorer.DataCase |
||||
|
||||
import Mox |
||||
|
||||
alias Explorer.Chain.Events.Subscriber |
||||
alias Explorer.Chain.Wei |
||||
alias Explorer.Counters.AverageBlockTime |
||||
alias Indexer.CoinBalance.OnDemandFetcher |
||||
|
||||
@moduletag :capture_log |
||||
|
||||
# MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to |
||||
# use expectations and stubs from test's pid. |
||||
setup :set_mox_global |
||||
|
||||
setup :verify_on_exit! |
||||
|
||||
setup %{json_rpc_named_arguments: json_rpc_named_arguments} do |
||||
mocked_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox) |
||||
|
||||
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) |
||||
start_supervised!(AverageBlockTime) |
||||
start_supervised!({OnDemandFetcher, [mocked_json_rpc_named_arguments, [name: OnDemandFetcher]]}) |
||||
|
||||
Application.put_env(:explorer, AverageBlockTime, enabled: true) |
||||
|
||||
on_exit(fn -> |
||||
Application.put_env(:explorer, AverageBlockTime, enabled: false) |
||||
end) |
||||
|
||||
%{json_rpc_named_arguments: mocked_json_rpc_named_arguments} |
||||
end |
||||
|
||||
describe "trigger_fetch/1" do |
||||
setup do |
||||
now = Timex.now() |
||||
|
||||
# we space these very far apart so that we know it will consider the 0th block stale (it calculates how far |
||||
# back we'd need to go to get 24 hours in the past) |
||||
block_0 = insert(:block, number: 0, timestamp: Timex.shift(now, hours: -50)) |
||||
AverageBlockTime.average_block_time(block_0) |
||||
block_1 = insert(:block, number: 1, timestamp: now) |
||||
AverageBlockTime.average_block_time(block_1) |
||||
|
||||
stale_address = insert(:address, fetched_coin_balance: 1, fetched_coin_balance_block_number: 0) |
||||
current_address = insert(:address, fetched_coin_balance: 1, fetched_coin_balance_block_number: 1) |
||||
|
||||
pending_address = insert(:address, fetched_coin_balance: 1, fetched_coin_balance_block_number: 1) |
||||
insert(:unfetched_balance, address_hash: pending_address.hash, block_number: 2) |
||||
|
||||
%{stale_address: stale_address, current_address: current_address, pending_address: pending_address} |
||||
end |
||||
|
||||
test "treats all addresses as current if the average block time is disabled", %{stale_address: address} do |
||||
Application.put_env(:explorer, AverageBlockTime, enabled: false) |
||||
|
||||
assert OnDemandFetcher.trigger_fetch(address) == :current |
||||
end |
||||
|
||||
test "if the address has not been fetched within the last 24 hours of blocks it is considered stale", %{ |
||||
stale_address: address |
||||
} do |
||||
assert OnDemandFetcher.trigger_fetch(address) == {:stale, 1} |
||||
end |
||||
|
||||
test "if the address has been fetched within the last 24 hours of blocks it is considered current", %{ |
||||
current_address: address |
||||
} do |
||||
assert OnDemandFetcher.trigger_fetch(address) == :current |
||||
end |
||||
|
||||
test "if there is an unfetched balance within the window for an address, it is considered pending", %{ |
||||
pending_address: pending_address |
||||
} do |
||||
assert OnDemandFetcher.trigger_fetch(pending_address) == {:pending, 2} |
||||
end |
||||
end |
||||
|
||||
describe "update behaviour" do |
||||
setup do |
||||
Subscriber.to(:addresses, :on_demand) |
||||
Subscriber.to(:address_coin_balances, :on_demand) |
||||
|
||||
now = Timex.now() |
||||
|
||||
# we space these very far apart so that we know it will consider the 0th block stale (it calculates how far |
||||
# back we'd need to go to get 24 hours in the past) |
||||
block_0 = insert(:block, number: 0, timestamp: Timex.shift(now, hours: -50)) |
||||
AverageBlockTime.average_block_time(block_0) |
||||
block_1 = insert(:block, number: 1, timestamp: now) |
||||
AverageBlockTime.average_block_time(block_1) |
||||
|
||||
:ok |
||||
end |
||||
|
||||
test "a stale address broadcasts the new address" do |
||||
address = insert(:address, fetched_coin_balance: 1, fetched_coin_balance_block_number: 0) |
||||
address_hash = address.hash |
||||
string_address_hash = to_string(address.hash) |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, 1, fn [ |
||||
%{ |
||||
id: id, |
||||
method: "eth_getBalance", |
||||
params: [^string_address_hash, "0x1"] |
||||
} |
||||
], |
||||
_options -> |
||||
{:ok, [%{id: id, jsonrpc: "2.0", result: "0x02"}]} |
||||
end) |
||||
|
||||
assert OnDemandFetcher.trigger_fetch(address) == {:stale, 1} |
||||
|
||||
{:ok, expected_wei} = Wei.cast(2) |
||||
|
||||
assert_receive( |
||||
{:chain_event, :addresses, :on_demand, |
||||
[%{hash: ^address_hash, fetched_coin_balance: ^expected_wei, fetched_coin_balance_block_number: 1}]} |
||||
) |
||||
end |
||||
|
||||
test "a pending address broadcasts the new address and the new coin balance" do |
||||
address = insert(:address, fetched_coin_balance: 0, fetched_coin_balance_block_number: 1) |
||||
insert(:unfetched_balance, address_hash: address.hash, block_number: 2) |
||||
address_hash = address.hash |
||||
string_address_hash = to_string(address.hash) |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, 1, fn [ |
||||
%{ |
||||
id: id, |
||||
method: "eth_getBalance", |
||||
params: [^string_address_hash, "0x2"] |
||||
} |
||||
], |
||||
_options -> |
||||
{:ok, [%{id: id, jsonrpc: "2.0", result: "0x02"}]} |
||||
end) |
||||
|
||||
assert OnDemandFetcher.trigger_fetch(address) == {:pending, 2} |
||||
|
||||
{:ok, expected_wei} = Wei.cast(2) |
||||
|
||||
assert_receive( |
||||
{:chain_event, :addresses, :on_demand, |
||||
[%{hash: ^address_hash, fetched_coin_balance: ^expected_wei, fetched_coin_balance_block_number: 2}]} |
||||
) |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue