|
|
|
@ -19,6 +19,8 @@ defmodule Indexer.Fetcher.Beacon.Blob do |
|
|
|
|
|
|
|
|
|
@default_max_batch_size 10 |
|
|
|
|
@default_max_concurrency 1 |
|
|
|
|
@default_retries_limit 2 |
|
|
|
|
@default_retry_deadline :timer.minutes(5) |
|
|
|
|
|
|
|
|
|
@doc """ |
|
|
|
|
Asynchronously fetches blobs for given `block_timestamp`. |
|
|
|
@ -87,24 +89,40 @@ defmodule Indexer.Fetcher.Beacon.Blob do |
|
|
|
|
Logger.debug(fn -> "fetching" end) |
|
|
|
|
|
|
|
|
|
entries |
|
|
|
|
|> Enum.map(×tamp_to_slot(&1, state)) |
|
|
|
|
|> Enum.map(&entry_to_slot(&1, state)) |
|
|
|
|
|> Client.get_blob_sidecars() |
|
|
|
|
|> case do |
|
|
|
|
{:ok, fetched_blobs, retry_indices} -> |
|
|
|
|
run_fetched_blobs(fetched_blobs) |
|
|
|
|
|
|
|
|
|
if Enum.empty?(retry_indices) do |
|
|
|
|
retry_entities = |
|
|
|
|
retry_indices |
|
|
|
|
|> Enum.map(&Enum.at(entries, &1)) |
|
|
|
|
|> Enum.filter(&should_retry?/1) |
|
|
|
|
|> Enum.map(&increment_retry_count/1) |
|
|
|
|
|
|
|
|
|
if Enum.empty?(retry_entities) do |
|
|
|
|
:ok |
|
|
|
|
else |
|
|
|
|
{:retry, retry_indices |> Enum.map(&Enum.at(entries, &1))} |
|
|
|
|
{:retry, retry_entities} |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp entry(block_timestamp) do |
|
|
|
|
DateTime.to_unix(block_timestamp) |
|
|
|
|
{DateTime.to_unix(block_timestamp), 0} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp increment_retry_count({block_timestamp, retry_count}), do: {block_timestamp, retry_count + 1} |
|
|
|
|
|
|
|
|
|
# Stop retrying after 2 failed retries for slots older than 5 minutes |
|
|
|
|
defp should_retry?({block_timestamp, retry_count}), |
|
|
|
|
do: |
|
|
|
|
retry_count < @default_retries_limit || |
|
|
|
|
block_timestamp + @default_retry_deadline > DateTime.to_unix(DateTime.utc_now()) |
|
|
|
|
|
|
|
|
|
defp entry_to_slot({block_timestamp, _}, state), do: timestamp_to_slot(block_timestamp, state) |
|
|
|
|
|
|
|
|
|
@doc """ |
|
|
|
|
Converts block timestamp to the slot number. |
|
|
|
|
""" |
|
|
|
|