diff --git a/apps/explorer/lib/explorer/indexer/address_fetcher.ex b/apps/explorer/lib/explorer/indexer/address_fetcher.ex index b972f6c365..91f69cd9b6 100644 --- a/apps/explorer/lib/explorer/indexer/address_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/address_fetcher.ex @@ -31,7 +31,8 @@ defmodule Explorer.Indexer.AddressFetcher do state = %{ debug_logs: Keyword.get(opts, :debug_logs, false), flush_timer: nil, - fetch_interval: @fetch_interval, + fetch_interval: Keyword.get(opts, :fetch_interval, @fetch_interval), + max_batch_size: Keyword.get(opts, :max_batch_size, @max_batch_size), buffer: :queue.new(), tasks: %{} } @@ -90,7 +91,7 @@ defmodule Explorer.Indexer.AddressFetcher do |> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, batch -> batch = :queue.in(Hash.to_string(hash), batch) - if :queue.len(batch) >= @max_batch_size do + if :queue.len(batch) >= state.max_batch_size do schedule_async_fetch(:queue.to_list(batch)) :queue.new() else diff --git a/apps/explorer/test/explorer/indexer/address_fetcher_test.exs b/apps/explorer/test/explorer/indexer/address_fetcher_test.exs new file mode 100644 index 0000000000..7fafffc6fa --- /dev/null +++ b/apps/explorer/test/explorer/indexer/address_fetcher_test.exs @@ -0,0 +1,85 @@ +defmodule Explorer.Indexer.AddressFetcherTest do + # MUST be `async: false` so that {:shared, pid} is set for connection to allow AddressFetcher's self-send to have + # connection allowed immediately. + use Explorer.DataCase, async: false + + alias Explorer.Chain.Address + alias Explorer.JSONRPC + alias Explorer.Indexer.AddressFetcher + + @hash %Explorer.Chain.Hash{ + byte_count: 20, + bytes: <<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179, 223, 65, 91>> + } + + setup do + start_supervised!({JSONRPC, []}) + start_supervised!({Task.Supervisor, name: Explorer.Indexer.TaskSupervisor}) + + :ok + end + + describe "init/1" do + test "fetches unfetched addresses" do + unfetched_address = insert(:address, hash: @hash) + + assert unfetched_address.fetched_balance == nil + assert unfetched_address.balance_fetched_at == nil + + start_address_fetcher() + + fetched_address = + wait(fn -> + Repo.one!(from(address in Address, where: address.hash == ^@hash and not is_nil(address.fetched_balance))) + end) + + refute fetched_address.balance_fetched_at == nil + end + + test "fetches unfetched addresses when less than max batch size" do + insert(:address, hash: @hash) + + start_address_fetcher(max_batch_size: 2) + + fetched_address = + wait(fn -> + Repo.one!(from(address in Address, where: address.hash == ^@hash and not is_nil(address.fetched_balance))) + end) + + refute fetched_address.balance_fetched_at == nil + end + end + + describe "async_fetch_balances/1" do + test "fetches balances for address_hashes" do + start_address_fetcher() + + assert :ok = AddressFetcher.async_fetch_balances([@hash]) + + address = + wait(fn -> + Repo.get!(Address, @hash) + end) + + refute address.fetched_balance == nil + end + end + + defp start_address_fetcher(options \\ []) when is_list(options) do + start_supervised!( + {AddressFetcher, + Keyword.merge( + [debug_logs: false, fetch_interval: 1, max_batch_size: 1, max_concurrency: 1], + options + )} + ) + end + + defp wait(producer) do + producer.() + rescue + Ecto.NoResultsError -> + Process.sleep(100) + wait(producer) + end +end diff --git a/coveralls.json b/coveralls.json index 252126b717..9e27d7c3ad 100644 --- a/coveralls.json +++ b/coveralls.json @@ -1,7 +1,7 @@ { "coverage_options": { "treat_no_relevant_lines_as_covered": true, - "minimum_coverage": 85 + "minimum_coverage": 87.2 }, "terminal_options": { "file_column_width": 120