Fetch coin balances in async mode in realtime fetcher (#9182)
* Fetch coin balances in async mode in realtime fetcher * Coin balances fetcher refactor * Don't filter non-traceable blocks in realtime coin balances fetcherpull/9439/head
parent
f554b0d3f5
commit
e7d2dd0ee4
@ -0,0 +1,77 @@ |
||||
defmodule Indexer.Fetcher.CoinBalance.Catchup do |
||||
@moduledoc """ |
||||
Fetches `t:Explorer.Chain.Address.CoinBalance.t/0` and updates `t:Explorer.Chain.Address.t/0` `fetched_coin_balance` and |
||||
`fetched_coin_balance_block_number` to value at max `t:Explorer.Chain.Address.CoinBalance.t/0` `block_number` for the given `t:Explorer.Chain.Address.t/` `hash`. |
||||
""" |
||||
|
||||
use Indexer.Fetcher, restart: :permanent |
||||
use Spandex.Decorators |
||||
|
||||
alias Explorer.Chain |
||||
alias Explorer.Chain.{Block, Hash} |
||||
alias Indexer.{BufferedTask, Tracer} |
||||
alias Indexer.Fetcher.CoinBalance.Catchup.Supervisor, as: CoinBalanceSupervisor |
||||
alias Indexer.Fetcher.CoinBalance.Helper |
||||
|
||||
@behaviour BufferedTask |
||||
|
||||
@default_max_batch_size 500 |
||||
@default_max_concurrency 4 |
||||
|
||||
@doc """ |
||||
Asynchronously fetches balances for each address `hash` at the `block_number`. |
||||
""" |
||||
@spec async_fetch_balances([ |
||||
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()} |
||||
]) :: :ok |
||||
def async_fetch_balances(balance_fields) when is_list(balance_fields) do |
||||
if CoinBalanceSupervisor.disabled?() do |
||||
:ok |
||||
else |
||||
entries = Enum.map(balance_fields, &Helper.entry/1) |
||||
|
||||
BufferedTask.buffer(__MODULE__, entries) |
||||
end |
||||
end |
||||
|
||||
def child_spec(params) do |
||||
Helper.child_spec(params, defaults(), __MODULE__) |
||||
end |
||||
|
||||
@impl BufferedTask |
||||
def init(initial, reducer, _) do |
||||
{:ok, final} = |
||||
Chain.stream_unfetched_balances( |
||||
initial, |
||||
fn address_fields, acc -> |
||||
address_fields |
||||
|> Helper.entry() |
||||
|> reducer.(acc) |
||||
end, |
||||
true |
||||
) |
||||
|
||||
final |
||||
end |
||||
|
||||
@impl BufferedTask |
||||
@decorate trace( |
||||
name: "fetch", |
||||
resource: "Indexer.Fetcher.CoinBalance.Catchup.run/2", |
||||
service: :indexer, |
||||
tracer: Tracer |
||||
) |
||||
def run(entries, json_rpc_named_arguments) do |
||||
Helper.run(entries, json_rpc_named_arguments, true) |
||||
end |
||||
|
||||
defp defaults do |
||||
[ |
||||
flush_interval: :timer.seconds(3), |
||||
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size, |
||||
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency, |
||||
task_supervisor: Indexer.Fetcher.CoinBalance.Catchup.TaskSupervisor, |
||||
metadata: [fetcher: :coin_balance_catchup] |
||||
] |
||||
end |
||||
end |
@ -0,0 +1,61 @@ |
||||
defmodule Indexer.Fetcher.CoinBalance.Realtime do |
||||
@moduledoc """ |
||||
Separate version of `Indexer.Fetcher.CoinBalance.Catchup` for fetching balances from realtime block fetcher |
||||
""" |
||||
|
||||
use Indexer.Fetcher, restart: :permanent |
||||
use Spandex.Decorators |
||||
|
||||
alias Explorer.Chain.{Block, Hash} |
||||
alias Indexer.{BufferedTask, Tracer} |
||||
alias Indexer.Fetcher.CoinBalance.Helper |
||||
alias Indexer.Fetcher.CoinBalance.Realtime.Supervisor, as: CoinBalanceSupervisor |
||||
|
||||
@behaviour BufferedTask |
||||
|
||||
@default_max_batch_size 500 |
||||
@default_max_concurrency 4 |
||||
|
||||
@doc """ |
||||
Asynchronously fetches balances for each address `hash` at the `block_number`. |
||||
""" |
||||
@spec async_fetch_balances([ |
||||
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()} |
||||
]) :: :ok |
||||
def async_fetch_balances(balance_fields) when is_list(balance_fields) do |
||||
entries = Enum.map(balance_fields, &Helper.entry/1) |
||||
|
||||
BufferedTask.buffer(__MODULE__, entries) |
||||
end |
||||
|
||||
def child_spec(params) do |
||||
Helper.child_spec(params, defaults(), __MODULE__) |
||||
end |
||||
|
||||
@impl BufferedTask |
||||
def init(_, _, _) do |
||||
{0, []} |
||||
end |
||||
|
||||
@impl BufferedTask |
||||
@decorate trace( |
||||
name: "fetch", |
||||
resource: "Indexer.Fetcher.CoinBalance.Realtime.run/2", |
||||
service: :indexer, |
||||
tracer: Tracer |
||||
) |
||||
def run(entries, json_rpc_named_arguments) do |
||||
Helper.run(entries, json_rpc_named_arguments, false) |
||||
end |
||||
|
||||
defp defaults do |
||||
[ |
||||
poll: false, |
||||
flush_interval: :timer.seconds(3), |
||||
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size, |
||||
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency, |
||||
task_supervisor: Indexer.Fetcher.CoinBalance.Realtime.TaskSupervisor, |
||||
metadata: [fetcher: :coin_balance_realtime] |
||||
] |
||||
end |
||||
end |
@ -1,68 +0,0 @@ |
||||
defmodule Indexer.Fetcher.CoinBalanceDailyUpdater do |
||||
@moduledoc """ |
||||
Accumulates and periodically updates daily coin balances |
||||
""" |
||||
|
||||
use GenServer |
||||
|
||||
alias Explorer.Chain |
||||
alias Explorer.Counters.AverageBlockTime |
||||
alias Timex.Duration |
||||
|
||||
@default_update_interval :timer.seconds(10) |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(_) do |
||||
schedule_next_update() |
||||
|
||||
{:ok, %{}} |
||||
end |
||||
|
||||
def add_daily_balances_params(daily_balances_params) do |
||||
GenServer.cast(__MODULE__, {:add_daily_balances_params, daily_balances_params}) |
||||
end |
||||
|
||||
@impl true |
||||
def handle_cast({:add_daily_balances_params, daily_balances_params}, state) do |
||||
{:noreply, Enum.reduce(daily_balances_params, state, &put_new_param/2)} |
||||
end |
||||
|
||||
defp put_new_param(%{day: day, address_hash: address_hash, value: value} = param, acc) do |
||||
Map.update(acc, {address_hash, day}, param, fn %{value: old_value} = old_param -> |
||||
if is_nil(old_value) or value > old_value, do: param, else: old_param |
||||
end) |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:update, state) when state == %{} do |
||||
schedule_next_update() |
||||
|
||||
{:noreply, %{}} |
||||
end |
||||
|
||||
def handle_info(:update, state) do |
||||
Chain.import(%{address_coin_balances_daily: %{params: Map.values(state)}}) |
||||
|
||||
schedule_next_update() |
||||
|
||||
{:noreply, %{}} |
||||
end |
||||
|
||||
def handle_info(_, state) do |
||||
{:noreply, state} |
||||
end |
||||
|
||||
defp schedule_next_update do |
||||
update_interval = |
||||
case AverageBlockTime.average_block_time() do |
||||
{:error, :disabled} -> @default_update_interval |
||||
block_time -> round(Duration.to_milliseconds(block_time)) |
||||
end |
||||
|
||||
Process.send_after(self(), :update, update_interval) |
||||
end |
||||
end |
@ -0,0 +1,17 @@ |
||||
defmodule Indexer.Fetcher.CoinBalance.Realtime.Supervisor.Case do |
||||
alias Indexer.Fetcher.CoinBalance.Realtime |
||||
|
||||
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do |
||||
merged_fetcher_arguments = |
||||
Keyword.merge( |
||||
fetcher_arguments, |
||||
flush_interval: 50, |
||||
max_batch_size: 1, |
||||
max_concurrency: 1 |
||||
) |
||||
|
||||
[merged_fetcher_arguments] |
||||
|> Realtime.Supervisor.child_spec() |
||||
|> ExUnit.Callbacks.start_supervised!() |
||||
end |
||||
end |
Loading…
Reference in new issue