Merge pull request #1209 from poanetwork/ets-streaming

ETS streaming
pull/1210/head
Luke Imhoff 6 years ago committed by GitHub
commit 02bbd559d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 127
      apps/explorer/lib/explorer/chain.ex
  2. 4
      apps/explorer/lib/explorer/chain/token_transfer.ex
  3. 6
      apps/explorer/lib/explorer/counters/block_validation_counter.ex
  4. 11
      apps/explorer/lib/explorer/counters/token_holders_counter.ex
  5. 6
      apps/explorer/lib/explorer/counters/token_transfer_counter.ex
  6. 21
      apps/explorer/lib/explorer/repo.ex
  7. 10
      apps/explorer/test/explorer/chain/token_transfer_test.exs
  8. 10
      apps/explorer/test/explorer/chain_test.exs

@ -955,7 +955,7 @@ defmodule Explorer.Chain do
@doc """ @doc """
Counts all of the block validations and groups by the `miner_hash`. Counts all of the block validations and groups by the `miner_hash`.
""" """
def group_block_validations_by_address do def each_address_block_validation_count(fun) when is_function(fun, 1) do
query = query =
from( from(
b in Block, b in Block,
@ -965,7 +965,7 @@ defmodule Explorer.Chain do
group_by: b.miner_hash group_by: b.miner_hash
) )
Repo.all(query) Repo.stream_each(query, fun)
end end
@doc """ @doc """
@ -1007,21 +1007,14 @@ defmodule Explorer.Chain do
) :: {:ok, accumulator} ) :: {:ok, accumulator}
when accumulator: term() when accumulator: term()
def stream_unfetched_balances(initial, reducer) when is_function(reducer, 2) do def stream_unfetched_balances(initial, reducer) when is_function(reducer, 2) do
Repo.transaction( query =
fn -> from(
query = balance in CoinBalance,
from( where: is_nil(balance.value_fetched_at),
balance in CoinBalance, select: %{address_hash: balance.address_hash, block_number: balance.block_number}
where: is_nil(balance.value_fetched_at), )
select: %{address_hash: balance.address_hash, block_number: balance.block_number}
)
query Repo.stream_reduce(query, initial, reducer)
|> Repo.stream(timeout: :infinity)
|> Enum.reduce(initial, reducer)
end,
timeout: :infinity
)
end end
@doc """ @doc """
@ -1033,16 +1026,8 @@ defmodule Explorer.Chain do
) :: {:ok, accumulator} ) :: {:ok, accumulator}
when accumulator: term() when accumulator: term()
def stream_unfetched_token_balances(initial, reducer) when is_function(reducer, 2) do def stream_unfetched_token_balances(initial, reducer) when is_function(reducer, 2) do
Repo.transaction( TokenBalance.unfetched_token_balances()
fn -> |> Repo.stream_reduce(initial, reducer)
query = TokenBalance.unfetched_token_balances()
query
|> Repo.stream(timeout: :infinity)
|> Enum.reduce(initial, reducer)
end,
timeout: :infinity
)
end end
@doc """ @doc """
@ -1097,22 +1082,15 @@ defmodule Explorer.Chain do
) :: {:ok, accumulator} ) :: {:ok, accumulator}
when accumulator: term() when accumulator: term()
def stream_transactions_with_unfetched_internal_transactions(fields, initial, reducer) when is_function(reducer, 2) do def stream_transactions_with_unfetched_internal_transactions(fields, initial, reducer) when is_function(reducer, 2) do
Repo.transaction( query =
fn -> from(
query = t in Transaction,
from( # exclude pending transactions
t in Transaction, where: not is_nil(t.block_hash) and is_nil(t.internal_transactions_indexed_at),
# exclude pending transactions select: ^fields
where: not is_nil(t.block_hash) and is_nil(t.internal_transactions_indexed_at), )
select: ^fields
)
query Repo.stream_reduce(query, initial, reducer)
|> Repo.stream(timeout: :infinity)
|> Enum.reduce(initial, reducer)
end,
timeout: :infinity
)
end end
@doc """ @doc """
@ -1129,21 +1107,14 @@ defmodule Explorer.Chain do
) :: {:ok, accumulator} ) :: {:ok, accumulator}
when accumulator: term() when accumulator: term()
def stream_unfetched_uncle_hashes(initial, reducer) when is_function(reducer, 2) do def stream_unfetched_uncle_hashes(initial, reducer) when is_function(reducer, 2) do
Repo.transaction( query =
fn -> from(bsdr in Block.SecondDegreeRelation,
query = where: is_nil(bsdr.uncle_fetched_at),
from(bsdr in Block.SecondDegreeRelation, select: bsdr.uncle_hash,
where: is_nil(bsdr.uncle_fetched_at), group_by: bsdr.uncle_hash
select: bsdr.uncle_hash, )
group_by: bsdr.uncle_hash
)
query Repo.stream_reduce(query, initial, reducer)
|> Repo.stream(timeout: :infinity)
|> Enum.reduce(initial, reducer)
end,
timeout: :infinity
)
end end
@doc """ @doc """
@ -1938,22 +1909,15 @@ defmodule Explorer.Chain do
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator) reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator)
) :: {:ok, accumulator} ) :: {:ok, accumulator}
when accumulator: term() when accumulator: term()
def stream_uncataloged_token_contract_address_hashes(initial_acc, reducer) when is_function(reducer, 2) do def stream_uncataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do
Repo.transaction( query =
fn -> from(
query = token in Token,
from( where: token.cataloged == false,
token in Token, select: token.contract_address_hash
where: token.cataloged == false, )
select: token.contract_address_hash
)
query Repo.stream_reduce(query, initial, reducer)
|> Repo.stream(timeout: :infinity)
|> Enum.reduce(initial_acc, reducer)
end,
timeout: :infinity
)
end end
@doc """ @doc """
@ -1964,16 +1928,10 @@ defmodule Explorer.Chain do
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator) reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator)
) :: {:ok, accumulator} ) :: {:ok, accumulator}
when accumulator: term() when accumulator: term()
def stream_cataloged_token_contract_address_hashes(initial_acc, reducer) when is_function(reducer, 2) do def stream_cataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do
Repo.transaction( Chain.Token.cataloged_tokens()
fn -> |> order_by(asc: :updated_at)
Chain.Token.cataloged_tokens() |> Repo.stream_reduce(initial, reducer)
|> order_by(asc: :updated_at)
|> Repo.stream(timeout: :infinity)
|> Enum.reduce(initial_acc, reducer)
end,
timeout: :infinity
)
end end
@doc """ @doc """
@ -1992,14 +1950,7 @@ defmodule Explorer.Chain do
distinct: t.block_number distinct: t.block_number
) )
Repo.transaction( Repo.stream_reduce(query, [], &[&1 | &2])
fn ->
query
|> Repo.stream(timeout: :infinity)
|> Enum.reduce([], &[&1 | &2])
end,
timeout: :infinity
)
end end
@doc """ @doc """

@ -226,7 +226,7 @@ defmodule Explorer.Chain.TokenTransfer do
@doc """ @doc """
Counts all the token transfers and groups by token contract address hash. Counts all the token transfers and groups by token contract address hash.
""" """
def count_token_transfers do def each_count(fun) when is_function(fun, 1) do
query = query =
from( from(
tt in TokenTransfer, tt in TokenTransfer,
@ -236,6 +236,6 @@ defmodule Explorer.Chain.TokenTransfer do
group_by: tt.token_contract_address_hash group_by: tt.token_contract_address_hash
) )
Repo.all(query) Repo.stream_each(query, fun)
end end
end end

@ -60,11 +60,9 @@ defmodule Explorer.Counters.BlockValidationCounter do
Consolidates the number of block validations grouped by `address_hash`. Consolidates the number of block validations grouped by `address_hash`.
""" """
def consolidate_blocks do def consolidate_blocks do
total_block_validations = Chain.group_block_validations_by_address() Chain.each_address_block_validation_count(fn {address_hash, total} ->
for {address_hash, total} <- total_block_validations do
insert_or_update_counter(address_hash, total) insert_or_update_counter(address_hash, total)
end end)
end end
@doc """ @doc """

@ -6,6 +6,7 @@ defmodule Explorer.Counters.TokenHoldersCounter do
""" """
alias Explorer.Chain.Address.TokenBalance alias Explorer.Chain.Address.TokenBalance
alias Explorer.Chain.Hash
alias Explorer.Repo alias Explorer.Repo
@table :token_holders_counter @table :token_holders_counter
@ -59,11 +60,9 @@ defmodule Explorer.Counters.TokenHoldersCounter do
""" """
def consolidate do def consolidate do
TokenBalance.tokens_grouped_by_number_of_holders() TokenBalance.tokens_grouped_by_number_of_holders()
|> Repo.all() |> Repo.stream_each(fn {%Hash{bytes: bytes}, number_of_holders} ->
|> Enum.map(fn {token, number_of_holders} -> insert_counter({bytes, number_of_holders})
{token.bytes, number_of_holders}
end) end)
|> insert_counter()
end end
defp schedule_next_consolidation do defp schedule_next_consolidation do
@ -76,8 +75,8 @@ defmodule Explorer.Counters.TokenHoldersCounter do
@doc """ @doc """
Fetches the token holders info for a specific token from the `:ets` table. Fetches the token holders info for a specific token from the `:ets` table.
""" """
def fetch(token_hash) do def fetch(%Hash{bytes: bytes}) do
do_fetch(:ets.lookup(table_name(), token_hash.bytes)) do_fetch(:ets.lookup(table_name(), bytes))
end end
defp do_fetch([{_, result}]), do: result defp do_fetch([{_, result}]), do: result

@ -51,11 +51,9 @@ defmodule Explorer.Counters.TokenTransferCounter do
Consolidates the number of token transfers grouped by token. Consolidates the number of token transfers grouped by token.
""" """
def consolidate do def consolidate do
total_token_transfers = TokenTransfer.count_token_transfers() TokenTransfer.each_count(fn {token_hash, total} ->
for {token_hash, total} <- total_token_transfers do
insert_or_update_counter(token_hash, total) insert_or_update_counter(token_hash, total)
end end)
end end
@doc """ @doc """

@ -20,7 +20,7 @@ defmodule Explorer.Repo do
returning = opts[:returning] returning = opts[:returning]
elements elements
|> Enum.chunk_every(1000) |> Enum.chunk_every(500)
|> Enum.reduce({0, []}, fn chunk, {total_count, acc} -> |> Enum.reduce({0, []}, fn chunk, {total_count, acc} ->
{count, inserted} = {count, inserted} =
try do try do
@ -65,4 +65,23 @@ defmodule Explorer.Repo do
end end
end) end)
end end
def stream_in_transaction(query, fun) when is_function(fun, 1) do
transaction(
fn ->
query
|> stream(timeout: :infinity)
|> fun.()
end,
timeout: :infinity
)
end
def stream_each(query, fun) when is_function(fun, 1) do
stream_in_transaction(query, &Enum.each(&1, fun))
end
def stream_reduce(query, initial, reducer) when is_function(reducer, 2) do
stream_in_transaction(query, &Enum.reduce(&1, initial, reducer))
end
end end

@ -146,8 +146,8 @@ defmodule Explorer.Chain.TokenTransferTest do
end end
end end
describe "count_token_transfers/0" do describe "each_count/0" do
test "returns token transfers grouped by tokens" do test "streams token transfers grouped by tokens" do
token_contract_address = insert(:contract_address) token_contract_address = insert(:contract_address)
token = insert(:token, contract_address: token_contract_address) token = insert(:token, contract_address: token_contract_address)
@ -172,7 +172,11 @@ defmodule Explorer.Chain.TokenTransferTest do
token: token token: token
) )
results = TokenTransfer.count_token_transfers() {:ok, agent_pid} = Agent.start_link(fn -> [] end)
TokenTransfer.each_count(fn entry -> Agent.update(agent_pid, &[entry | &1]) end)
results = Agent.get(agent_pid, fn entries -> Enum.reverse(entries) end)
assert length(results) == 1 assert length(results) == 1
assert List.first(results) == {token.contract_address_hash, 2} assert List.first(results) == {token.contract_address_hash, 2}

@ -1240,13 +1240,17 @@ defmodule Explorer.ChainTest do
end end
end end
describe "group_block_validations_by_address/0" do describe "each_address_block_validation_count/0" do
test "returns block validations grouped by the address that validated them (`address_hash`)" do test "streams block validation count grouped by the address that validated them (`address_hash`)" do
address = insert(:address) address = insert(:address)
insert(:block, miner: address, miner_hash: address.hash) insert(:block, miner: address, miner_hash: address.hash)
results = Chain.group_block_validations_by_address() {:ok, agent_pid} = Agent.start_link(fn -> [] end)
Chain.each_address_block_validation_count(fn entry -> Agent.update(agent_pid, &[entry | &1]) end)
results = Agent.get(agent_pid, &Enum.reverse/1)
assert length(results) == 1 assert length(results) == 1
assert results == [{address.hash, 1}] assert results == [{address.hash, 1}]

Loading…
Cancel
Save