Use COUNT(*) for count_token_transfers_from_token_hash

By using `COUNT(*)` instead of `COUNT(column)`, Postgres optimizes the
count to use the row count recorded in the index.  The ETS counter
appeared necessary because in 95de1ebcb0,
`COUNT(id)` was being used
(95de1ebcb0/apps/explorer/lib/explorer/chain/token_transfer.ex (L138)),
which seems reasonable in Ecto as you need some column to count, but in
Postgres, it is more efficient to count everything with `*` as it implies
"count rows".  `COUNT(column)` requires Postgres check if any of the values are
`NULL` and it does not optimize the query even if the column is `NOT NULL` in
most cases like when the `id` was used.

Testing on eth60-test database, `COUNT(*)` took 62ms while
`COUNT(column)` (in this case `amount` since the table no longer has
`id) took 16s demonstrating the vast speed improvement of `*` counting.

```
sql> SELECT COUNT(*)
     FROM token_transfers
     WHERE token_contract_address_hash = (
     SELECT token_contract_address_hash
     FROM token_transfers
     LIMIT 1)
[2018-12-21 07:51:02] 1 row retrieved starting from 1 in 70 ms
(execution: 62 ms, fetching: 8 ms)
sql> SELECT COUNT(amount)
     FROM token_transfers
     WHERE token_contract_address_hash = (
     SELECT token_contract_address_hash
     FROM token_transfers
     LIMIT 1)
[2018-12-21 07:51:43] 1 row retrieved starting from 1 in 16 s 862 ms
(execution: 16 s 855 ms, fetching: 7 ms)
```
pull/1275/head
Luke Imhoff 6 years ago
parent 5999db7fab
commit f67241cfc0
  1. 5
      apps/block_scout_web/test/block_scout_web/controllers/tokens/holder_controller_test.exs
  2. 14
      apps/block_scout_web/test/block_scout_web/controllers/tokens/inventory_controller_test.exs
  3. 5
      apps/block_scout_web/test/block_scout_web/controllers/tokens/read_contract_controller_test.exs
  4. 5
      apps/block_scout_web/test/block_scout_web/features/viewing_tokens_test.exs
  5. 2
      apps/explorer/config/config.exs
  6. 2
      apps/explorer/config/test.exs
  7. 1
      apps/explorer/lib/explorer/application.ex
  8. 5
      apps/explorer/lib/explorer/chain.ex
  9. 28
      apps/explorer/lib/explorer/chain/token_transfer.ex
  10. 114
      apps/explorer/lib/explorer/counters/token_transfer_counter.ex
  11. 37
      apps/explorer/test/explorer/chain/token_transfer_test.exs
  12. 56
      apps/explorer/test/explorer/counters/token_transfer_counter_test.exs

@ -4,7 +4,7 @@ defmodule BlockScoutWeb.Tokens.HolderControllerTest do
async: false
alias Explorer.Chain.Hash
alias Explorer.Counters.{TokenHoldersCounter, TokenTransferCounter}
alias Explorer.Counters.TokenHoldersCounter
describe "GET index/3" do
test "with invalid address hash", %{conn: conn} do
@ -32,9 +32,6 @@ defmodule BlockScoutWeb.Tokens.HolderControllerTest do
start_supervised!(TokenHoldersCounter)
TokenHoldersCounter.consolidate()
start_supervised!(TokenTransferCounter)
TokenTransferCounter.consolidate()
conn =
get(
conn,

@ -3,7 +3,7 @@ defmodule BlockScoutWeb.Tokens.InventoryControllerTest do
# ETS table is shared in `Explorer.Counters.*`
async: false
alias Explorer.Counters.{TokenHoldersCounter, TokenTransferCounter}
alias Explorer.Counters.TokenHoldersCounter
describe "GET index/3" do
test "with invalid address hash", %{conn: conn} do
@ -38,9 +38,6 @@ defmodule BlockScoutWeb.Tokens.InventoryControllerTest do
start_supervised!(TokenHoldersCounter)
TokenHoldersCounter.consolidate()
start_supervised!(TokenTransferCounter)
TokenTransferCounter.consolidate()
conn =
get(
conn,
@ -73,9 +70,6 @@ defmodule BlockScoutWeb.Tokens.InventoryControllerTest do
start_supervised!(TokenHoldersCounter)
TokenHoldersCounter.consolidate()
start_supervised!(TokenTransferCounter)
TokenTransferCounter.consolidate()
conn =
get(conn, token_inventory_path(conn, :index, token.contract_address_hash), %{
"token_id" => "999"
@ -111,9 +105,6 @@ defmodule BlockScoutWeb.Tokens.InventoryControllerTest do
start_supervised!(TokenHoldersCounter)
TokenHoldersCounter.consolidate()
start_supervised!(TokenTransferCounter)
TokenTransferCounter.consolidate()
conn = get(conn, token_inventory_path(conn, :index, token.contract_address_hash))
assert conn.assigns.next_page_params == expected_next_page_params
@ -138,9 +129,6 @@ defmodule BlockScoutWeb.Tokens.InventoryControllerTest do
start_supervised!(TokenHoldersCounter)
TokenHoldersCounter.consolidate()
start_supervised!(TokenTransferCounter)
TokenTransferCounter.consolidate()
conn = get(conn, token_inventory_path(conn, :index, token.contract_address_hash))
refute conn.assigns.next_page_params

@ -3,7 +3,7 @@ defmodule BlockScoutWeb.Tokens.ReadContractControllerTest do
# ETS tables are shared in `Explorer.Counters.*`
async: false
alias Explorer.Counters.{TokenHoldersCounter, TokenTransferCounter}
alias Explorer.Counters.TokenHoldersCounter
describe "GET index/3" do
test "with invalid address hash", %{conn: conn} do
@ -33,9 +33,6 @@ defmodule BlockScoutWeb.Tokens.ReadContractControllerTest do
start_supervised!(TokenHoldersCounter)
TokenHoldersCounter.consolidate()
start_supervised!(TokenTransferCounter)
TokenTransferCounter.consolidate()
conn = get(conn, token_read_contract_path(BlockScoutWeb.Endpoint, :index, token.contract_address_hash))
assert html_response(conn, 200)

@ -1,7 +1,7 @@
defmodule BlockScoutWeb.ViewingTokensTest do
use BlockScoutWeb.FeatureCase, async: true
alias Explorer.Counters.{TokenHoldersCounter, TokenTransferCounter}
alias Explorer.Counters.TokenHoldersCounter
alias BlockScoutWeb.TokenPage
describe "viewing token holders" do
@ -17,9 +17,6 @@ defmodule BlockScoutWeb.ViewingTokensTest do
start_supervised!(TokenHoldersCounter)
TokenHoldersCounter.consolidate()
start_supervised!(TokenTransferCounter)
TokenTransferCounter.consolidate()
session
|> TokenPage.visit_page(token.contract_address)
|> assert_has(TokenPage.token_holders(count: 2))

@ -15,8 +15,6 @@ config :explorer, Explorer.Counters.AddressesWithBalanceCounter, enabled: true,
config :explorer, Explorer.Counters.TokenHoldersCounter, enabled: true, enable_consolidation: true
config :explorer, Explorer.Counters.TokenTransferCounter, enabled: true, enable_consolidation: true
config :explorer, Explorer.Counters.BlockValidationCounter, enabled: true, enable_consolidation: true
config :explorer, Explorer.ExchangeRates, enabled: true, store: :ets

@ -21,8 +21,6 @@ config :explorer, Explorer.Counters.BlockValidationCounter, enabled: false, enab
config :explorer, Explorer.Counters.TokenHoldersCounter, enabled: false, enable_consolidation: false
config :explorer, Explorer.Counters.TokenTransferCounter, enabled: false, enable_consolidation: false
config :explorer, Explorer.Market.History.Cataloger, enabled: false
config :explorer, Explorer.Tracer, disabled?: false

@ -36,7 +36,6 @@ defmodule Explorer.Application do
configure(Explorer.ExchangeRates),
configure(Explorer.Market.History.Cataloger),
configure(Explorer.Counters.TokenHoldersCounter),
configure(Explorer.Counters.TokenTransferCounter),
configure(Explorer.Counters.BlockValidationCounter),
configure(Explorer.Counters.AddressesWithBalanceCounter),
configure(Explorer.Validator.MetadataProcessor)

@ -45,8 +45,7 @@ defmodule Explorer.Chain do
alias Explorer.Counters.{
AddressesWithBalanceCounter,
BlockValidationCounter,
TokenHoldersCounter,
TokenTransferCounter
TokenHoldersCounter
}
alias Dataloader.Ecto, as: DataloaderEcto
@ -2012,7 +2011,7 @@ defmodule Explorer.Chain do
@spec count_token_transfers_from_token_hash(Hash.t()) :: non_neg_integer()
def count_token_transfers_from_token_hash(token_address_hash) do
TokenTransferCounter.fetch(token_address_hash)
TokenTransfer.count_token_transfers_from_token_hash(token_address_hash)
end
@spec transaction_has_token_transfers?(Hash.t()) :: boolean()

@ -133,6 +133,18 @@ defmodule Explorer.Chain.TokenTransfer do
|> Repo.all()
end
@spec count_token_transfers_from_token_hash(Hash.t()) :: non_neg_integer()
def count_token_transfers_from_token_hash(token_address_hash) do
query =
from(
tt in TokenTransfer,
where: tt.token_contract_address_hash == ^token_address_hash,
select: fragment("COUNT(*)")
)
Repo.one(query)
end
def page_token_transfer(query, %PagingOptions{key: nil}), do: query
def page_token_transfer(query, %PagingOptions{key: {token_id}}) do
@ -272,20 +284,4 @@ defmodule Explorer.Chain.TokenTransfer do
select: tt
)
end
@doc """
Counts all the token transfers and groups by token contract address hash.
"""
def each_count(fun) when is_function(fun, 1) do
query =
from(
tt in TokenTransfer,
join: t in Token,
on: tt.token_contract_address_hash == t.contract_address_hash,
select: {tt.token_contract_address_hash, fragment("COUNT(*)")},
group_by: tt.token_contract_address_hash
)
Repo.stream_each(query, fun)
end
end

@ -1,114 +0,0 @@
defmodule Explorer.Counters.TokenTransferCounter do
use GenServer
@moduledoc """
Module responsible for fetching and consolidating the number of transfers
from a token.
"""
alias Explorer.Chain
alias Explorer.Chain.{Hash, TokenTransfer}
@table :token_transfer_counter
# It is undesirable to automatically start the consolidation in all environments.
# Consider the test environment: if the consolidation initiates but does not
# finish before a test ends, that test will fail. This way, hundreds of
# tests were failing before disabling the consolidation and the scheduler in
# the test env.
config = Application.get_env(:explorer, Explorer.Counters.TokenHoldersCounter)
@enable_consolidation Keyword.get(config, :enable_consolidation)
@doc """
Returns a boolean that indicates whether consolidation is enabled
In order to choose whether or not to enable the initial consolidation, change the following Explorer config:
`config :explorer, Explorer.Counters.TokenTransferCounter, enable_consolidation: true`
to:
`config :explorer, Explorer.Counters.TokenTransferCounter, enable_consolidation: false`
"""
def enable_consolidation?, do: @enable_consolidation
def table_name do
@table
end
@doc """
Starts a process to continually monitor the token counters.
"""
@spec start_link(term()) :: GenServer.on_start()
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
## Server
@impl true
def init(args) do
create_table()
if enable_consolidation?() do
Task.start_link(&consolidate/0)
end
Chain.subscribe_to_events(:token_transfers)
{:ok, args}
end
def create_table do
opts = [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
]
:ets.new(table_name(), opts)
end
@doc """
Consolidates the number of token transfers grouped by token.
"""
def consolidate do
TokenTransfer.each_count(fn {token_hash, total} ->
insert_or_update_counter(token_hash, total)
end)
end
@doc """
Fetches the number of transfers related to a token hash.
"""
@spec fetch(Hash.t()) :: non_neg_integer
def fetch(token_hash) do
do_fetch(:ets.lookup(table_name(), to_string(token_hash)))
end
defp do_fetch([{_, result} | _]), do: result
defp do_fetch([]), do: 0
@impl true
def handle_info({:chain_event, :token_transfers, _type, token_transfers}, state) do
token_transfers
|> Enum.map(& &1.token_contract_address_hash)
|> Enum.each(&insert_or_update_counter(&1, 1))
{:noreply, state}
end
@doc """
Inserts a new item into the `:ets` table.
When the record exist, the counter will be incremented by one. When the
record does not exist, the counter will be inserted with a default value.
"""
@spec insert_or_update_counter(Hash.t(), non_neg_integer) :: term()
def insert_or_update_counter(token_hash, number) do
default = {to_string(token_hash), 0}
:ets.update_counter(table_name(), to_string(token_hash), number, default)
end
end

@ -146,43 +146,6 @@ defmodule Explorer.Chain.TokenTransferTest do
end
end
describe "each_count/0" do
test "streams token transfers grouped by tokens" do
token_contract_address = insert(:contract_address)
token = insert(:token, contract_address: token_contract_address)
transaction =
:transaction
|> insert()
|> with_block()
insert(
:token_transfer,
to_address: build(:address),
transaction: transaction,
token_contract_address: token_contract_address,
token: token
)
insert(
:token_transfer,
to_address: build(:address),
transaction: transaction,
token_contract_address: token_contract_address,
token: token
)
{:ok, agent_pid} = Agent.start_link(fn -> [] end)
TokenTransfer.each_count(fn entry -> Agent.update(agent_pid, &[entry | &1]) end)
results = Agent.get(agent_pid, fn entries -> Enum.reverse(entries) end)
assert length(results) == 1
assert List.first(results) == {token.contract_address_hash, 2}
end
end
describe "address_to_unique_tokens/2" do
test "returns list of unique tokens for a token contract" do
token_contract_address = insert(:contract_address)

@ -1,56 +0,0 @@
defmodule Explorer.Counters.TokenTransferCounterTest do
use Explorer.DataCase
alias Explorer.Counters.TokenTransferCounter
setup do
start_supervised!(TokenTransferCounter)
:ok
end
describe "consolidate/0" do
test "loads the token's transfers consolidate info" do
token_contract_address = insert(:contract_address)
token = insert(:token, contract_address: token_contract_address)
transaction =
:transaction
|> insert()
|> with_block()
insert(
:token_transfer,
to_address: build(:address),
transaction: transaction,
token_contract_address: token_contract_address,
token: token
)
insert(
:token_transfer,
to_address: build(:address),
transaction: transaction,
token_contract_address: token_contract_address,
token: token
)
TokenTransferCounter.consolidate()
assert TokenTransferCounter.fetch(token.contract_address_hash) == 2
end
end
describe "fetch/1" do
test "fetches the total token transfers by token contract address hash" do
token_contract_address = insert(:contract_address)
token = insert(:token, contract_address: token_contract_address)
assert TokenTransferCounter.fetch(token.contract_address_hash) == 0
TokenTransferCounter.insert_or_update_counter(token.contract_address_hash, 15)
assert TokenTransferCounter.fetch(token.contract_address_hash) == 15
end
end
end
Loading…
Cancel
Save