|
|
|
@ -49,9 +49,44 @@ defmodule Explorer.BufferedTask do |
|
|
|
|
|
|
|
|
|
require Logger |
|
|
|
|
|
|
|
|
|
@doc """ |
|
|
|
|
Populates a task's buffer on boot with an initial set of entries. |
|
|
|
|
|
|
|
|
|
For example, the following callback would buffer all unfetched account balances on startup: |
|
|
|
|
|
|
|
|
|
def init(acc, reducer) do |
|
|
|
|
Chain.stream_unfetched_addresses([:hash], acc, fn %{hash: hash}, acc -> |
|
|
|
|
reducer.(Hash.to_string(hash), acc) |
|
|
|
|
end) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
The `init/2` operation may be long-running and allows concurrent calls to `Explorer.BufferedTask.buffer/2` for |
|
|
|
|
on-demand entries. |
|
|
|
|
""" |
|
|
|
|
@callback init(initial :: term, reducer :: function) :: |
|
|
|
|
{:ok, accumulated_results :: term | initial :: term} | {:error, reason :: term} |
|
|
|
|
|
|
|
|
|
@doc """ |
|
|
|
|
Invoked as concurrency becomes available with a list of batched entries to be processed. |
|
|
|
|
|
|
|
|
|
For example, the `run/2` callback for above could be written: |
|
|
|
|
|
|
|
|
|
def run(string_hashes, _retries) do |
|
|
|
|
case EthereumJSONRPC.fetch_balances_by_hash(string_hashes) do |
|
|
|
|
{:ok, results} -> :ok = Chain.update_balances(results) |
|
|
|
|
{:error, reason} -> {:retry, reason} |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
If a task crashes, it will be retried automatically with an increased `retries` count passed in as the second |
|
|
|
|
argument. Tasks may also be programmatically retried by returning `{:retry, reason}` from `run/2`. |
|
|
|
|
|
|
|
|
|
## Returns |
|
|
|
|
|
|
|
|
|
* `:ok` - run was successful |
|
|
|
|
* `{:retry, reason}` - run should be retried after it failed due to `reason` |
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
@callback run(entries :: list, retries :: pos_integer) :: :ok | {:retry, reason :: term} | {:halt, reason :: term} |
|
|
|
|
|
|
|
|
|
@doc """ |
|
|
|
|