diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 518e093e21..05f1f0409c 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -955,7 +955,7 @@ defmodule Explorer.Chain do @doc """ Counts all of the block validations and groups by the `miner_hash`. """ - def group_block_validations_by_address do + def each_address_block_validation_count(fun) when is_function(fun, 1) do query = from( b in Block, @@ -965,7 +965,7 @@ defmodule Explorer.Chain do group_by: b.miner_hash ) - Repo.all(query) + Repo.stream_each(query, fun) end @doc """ @@ -1007,21 +1007,14 @@ defmodule Explorer.Chain do ) :: {:ok, accumulator} when accumulator: term() def stream_unfetched_balances(initial, reducer) when is_function(reducer, 2) do - Repo.transaction( - fn -> - query = - from( - balance in CoinBalance, - where: is_nil(balance.value_fetched_at), - select: %{address_hash: balance.address_hash, block_number: balance.block_number} - ) + query = + from( + balance in CoinBalance, + where: is_nil(balance.value_fetched_at), + select: %{address_hash: balance.address_hash, block_number: balance.block_number} + ) - query - |> Repo.stream(timeout: :infinity) - |> Enum.reduce(initial, reducer) - end, - timeout: :infinity - ) + Repo.stream_reduce(query, initial, reducer) end @doc """ @@ -1033,16 +1026,8 @@ defmodule Explorer.Chain do ) :: {:ok, accumulator} when accumulator: term() def stream_unfetched_token_balances(initial, reducer) when is_function(reducer, 2) do - Repo.transaction( - fn -> - query = TokenBalance.unfetched_token_balances() - - query - |> Repo.stream(timeout: :infinity) - |> Enum.reduce(initial, reducer) - end, - timeout: :infinity - ) + TokenBalance.unfetched_token_balances() + |> Repo.stream_reduce(initial, reducer) end @doc """ @@ -1097,22 +1082,15 @@ defmodule Explorer.Chain do ) :: {:ok, accumulator} when accumulator: term() def stream_transactions_with_unfetched_internal_transactions(fields, initial, reducer) when is_function(reducer, 2) do - Repo.transaction( - fn -> - query = - from( - t in Transaction, - # exclude pending transactions - where: not is_nil(t.block_hash) and is_nil(t.internal_transactions_indexed_at), - select: ^fields - ) + query = + from( + t in Transaction, + # exclude pending transactions + where: not is_nil(t.block_hash) and is_nil(t.internal_transactions_indexed_at), + select: ^fields + ) - query - |> Repo.stream(timeout: :infinity) - |> Enum.reduce(initial, reducer) - end, - timeout: :infinity - ) + Repo.stream_reduce(query, initial, reducer) end @doc """ @@ -1129,21 +1107,14 @@ defmodule Explorer.Chain do ) :: {:ok, accumulator} when accumulator: term() def stream_unfetched_uncle_hashes(initial, reducer) when is_function(reducer, 2) do - Repo.transaction( - fn -> - query = - from(bsdr in Block.SecondDegreeRelation, - where: is_nil(bsdr.uncle_fetched_at), - select: bsdr.uncle_hash, - group_by: bsdr.uncle_hash - ) + query = + from(bsdr in Block.SecondDegreeRelation, + where: is_nil(bsdr.uncle_fetched_at), + select: bsdr.uncle_hash, + group_by: bsdr.uncle_hash + ) - query - |> Repo.stream(timeout: :infinity) - |> Enum.reduce(initial, reducer) - end, - timeout: :infinity - ) + Repo.stream_reduce(query, initial, reducer) end @doc """ @@ -1938,22 +1909,15 @@ defmodule Explorer.Chain do reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator) ) :: {:ok, accumulator} when accumulator: term() - def stream_uncataloged_token_contract_address_hashes(initial_acc, reducer) when is_function(reducer, 2) do - Repo.transaction( - fn -> - query = - from( - token in Token, - where: token.cataloged == false, - select: token.contract_address_hash - ) + def stream_uncataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do + query = + from( + token in Token, + where: token.cataloged == false, + select: token.contract_address_hash + ) - query - |> Repo.stream(timeout: :infinity) - |> Enum.reduce(initial_acc, reducer) - end, - timeout: :infinity - ) + Repo.stream_reduce(query, initial, reducer) end @doc """ @@ -1964,16 +1928,10 @@ defmodule Explorer.Chain do reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator) ) :: {:ok, accumulator} when accumulator: term() - def stream_cataloged_token_contract_address_hashes(initial_acc, reducer) when is_function(reducer, 2) do - Repo.transaction( - fn -> - Chain.Token.cataloged_tokens() - |> order_by(asc: :updated_at) - |> Repo.stream(timeout: :infinity) - |> Enum.reduce(initial_acc, reducer) - end, - timeout: :infinity - ) + def stream_cataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do + Chain.Token.cataloged_tokens() + |> order_by(asc: :updated_at) + |> Repo.stream_reduce(initial, reducer) end @doc """ @@ -1992,14 +1950,7 @@ defmodule Explorer.Chain do distinct: t.block_number ) - Repo.transaction( - fn -> - query - |> Repo.stream(timeout: :infinity) - |> Enum.reduce([], &[&1 | &2]) - end, - timeout: :infinity - ) + Repo.stream_reduce(query, [], &[&1 | &2]) end @doc """ diff --git a/apps/explorer/lib/explorer/chain/token_transfer.ex b/apps/explorer/lib/explorer/chain/token_transfer.ex index 80c46fa64d..0a00803537 100644 --- a/apps/explorer/lib/explorer/chain/token_transfer.ex +++ b/apps/explorer/lib/explorer/chain/token_transfer.ex @@ -226,7 +226,7 @@ defmodule Explorer.Chain.TokenTransfer do @doc """ Counts all the token transfers and groups by token contract address hash. """ - def count_token_transfers do + def each_count(fun) when is_function(fun, 1) do query = from( tt in TokenTransfer, @@ -236,6 +236,6 @@ defmodule Explorer.Chain.TokenTransfer do group_by: tt.token_contract_address_hash ) - Repo.all(query) + Repo.stream_each(query, fun) end end diff --git a/apps/explorer/lib/explorer/counters/block_validation_counter.ex b/apps/explorer/lib/explorer/counters/block_validation_counter.ex index 8b47b213fb..22b3935106 100644 --- a/apps/explorer/lib/explorer/counters/block_validation_counter.ex +++ b/apps/explorer/lib/explorer/counters/block_validation_counter.ex @@ -60,11 +60,9 @@ defmodule Explorer.Counters.BlockValidationCounter do Consolidates the number of block validations grouped by `address_hash`. """ def consolidate_blocks do - total_block_validations = Chain.group_block_validations_by_address() - - for {address_hash, total} <- total_block_validations do + Chain.each_address_block_validation_count(fn {address_hash, total} -> insert_or_update_counter(address_hash, total) - end + end) end @doc """ diff --git a/apps/explorer/lib/explorer/counters/token_holders_counter.ex b/apps/explorer/lib/explorer/counters/token_holders_counter.ex index 62d51148c2..a1bd862db1 100644 --- a/apps/explorer/lib/explorer/counters/token_holders_counter.ex +++ b/apps/explorer/lib/explorer/counters/token_holders_counter.ex @@ -6,6 +6,7 @@ defmodule Explorer.Counters.TokenHoldersCounter do """ alias Explorer.Chain.Address.TokenBalance + alias Explorer.Chain.Hash alias Explorer.Repo @table :token_holders_counter @@ -59,11 +60,9 @@ defmodule Explorer.Counters.TokenHoldersCounter do """ def consolidate do TokenBalance.tokens_grouped_by_number_of_holders() - |> Repo.all() - |> Enum.map(fn {token, number_of_holders} -> - {token.bytes, number_of_holders} + |> Repo.stream_each(fn {%Hash{bytes: bytes}, number_of_holders} -> + insert_counter({bytes, number_of_holders}) end) - |> insert_counter() end defp schedule_next_consolidation do @@ -76,8 +75,8 @@ defmodule Explorer.Counters.TokenHoldersCounter do @doc """ Fetches the token holders info for a specific token from the `:ets` table. """ - def fetch(token_hash) do - do_fetch(:ets.lookup(table_name(), token_hash.bytes)) + def fetch(%Hash{bytes: bytes}) do + do_fetch(:ets.lookup(table_name(), bytes)) end defp do_fetch([{_, result}]), do: result diff --git a/apps/explorer/lib/explorer/counters/token_transfer_counter.ex b/apps/explorer/lib/explorer/counters/token_transfer_counter.ex index fd2826ab14..e165023ed5 100644 --- a/apps/explorer/lib/explorer/counters/token_transfer_counter.ex +++ b/apps/explorer/lib/explorer/counters/token_transfer_counter.ex @@ -51,11 +51,9 @@ defmodule Explorer.Counters.TokenTransferCounter do Consolidates the number of token transfers grouped by token. """ def consolidate do - total_token_transfers = TokenTransfer.count_token_transfers() - - for {token_hash, total} <- total_token_transfers do + TokenTransfer.each_count(fn {token_hash, total} -> insert_or_update_counter(token_hash, total) - end + end) end @doc """ diff --git a/apps/explorer/lib/explorer/repo.ex b/apps/explorer/lib/explorer/repo.ex index e1bde3b5e8..e5fb373f3f 100644 --- a/apps/explorer/lib/explorer/repo.ex +++ b/apps/explorer/lib/explorer/repo.ex @@ -20,7 +20,7 @@ defmodule Explorer.Repo do returning = opts[:returning] elements - |> Enum.chunk_every(1000) + |> Enum.chunk_every(500) |> Enum.reduce({0, []}, fn chunk, {total_count, acc} -> {count, inserted} = try do @@ -65,4 +65,23 @@ defmodule Explorer.Repo do end end) end + + def stream_in_transaction(query, fun) when is_function(fun, 1) do + transaction( + fn -> + query + |> stream(timeout: :infinity) + |> fun.() + end, + timeout: :infinity + ) + end + + def stream_each(query, fun) when is_function(fun, 1) do + stream_in_transaction(query, &Enum.each(&1, fun)) + end + + def stream_reduce(query, initial, reducer) when is_function(reducer, 2) do + stream_in_transaction(query, &Enum.reduce(&1, initial, reducer)) + end end diff --git a/apps/explorer/test/explorer/chain/token_transfer_test.exs b/apps/explorer/test/explorer/chain/token_transfer_test.exs index 0ba6b11b66..fa356396bc 100644 --- a/apps/explorer/test/explorer/chain/token_transfer_test.exs +++ b/apps/explorer/test/explorer/chain/token_transfer_test.exs @@ -146,8 +146,8 @@ defmodule Explorer.Chain.TokenTransferTest do end end - describe "count_token_transfers/0" do - test "returns token transfers grouped by tokens" do + describe "each_count/0" do + test "streams token transfers grouped by tokens" do token_contract_address = insert(:contract_address) token = insert(:token, contract_address: token_contract_address) @@ -172,7 +172,11 @@ defmodule Explorer.Chain.TokenTransferTest do token: token ) - results = TokenTransfer.count_token_transfers() + {:ok, agent_pid} = Agent.start_link(fn -> [] end) + + TokenTransfer.each_count(fn entry -> Agent.update(agent_pid, &[entry | &1]) end) + + results = Agent.get(agent_pid, fn entries -> Enum.reverse(entries) end) assert length(results) == 1 assert List.first(results) == {token.contract_address_hash, 2} diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index 18e3ac6b65..a532b7a2e2 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -1240,13 +1240,17 @@ defmodule Explorer.ChainTest do end end - describe "group_block_validations_by_address/0" do - test "returns block validations grouped by the address that validated them (`address_hash`)" do + describe "each_address_block_validation_count/0" do + test "streams block validation count grouped by the address that validated them (`address_hash`)" do address = insert(:address) insert(:block, miner: address, miner_hash: address.hash) - results = Chain.group_block_validations_by_address() + {:ok, agent_pid} = Agent.start_link(fn -> [] end) + + Chain.each_address_block_validation_count(fn entry -> Agent.update(agent_pid, &[entry | &1]) end) + + results = Agent.get(agent_pid, &Enum.reverse/1) assert length(results) == 1 assert results == [{address.hash, 1}]