Merge pull request #2283 from poanetwork/pp-transactions-cache

Add transactions cache
pull/2290/head
Victor Baranov 5 years ago committed by GitHub
commit f4088b0e6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 2
      apps/block_scout_web/lib/block_scout_web/controllers/recent_transactions_controller.ex
  3. 3
      apps/block_scout_web/test/support/conn_case.ex
  4. 3
      apps/block_scout_web/test/support/feature_case.ex
  5. 14
      apps/explorer/lib/explorer/application.ex
  6. 23
      apps/explorer/lib/explorer/chain.ex
  7. 3
      apps/explorer/lib/explorer/chain/import/runner/transactions.ex
  8. 143
      apps/explorer/lib/explorer/chain/transactions_cache.ex
  9. 95
      apps/explorer/test/explorer/chain/transactions_cache_test.exs
  10. 2
      apps/explorer/test/support/data_case.ex
  11. 7
      apps/indexer/lib/indexer/block/fetcher.ex

@ -1,5 +1,6 @@
## Current
### Features
- [#2283](https://github.com/poanetwork/blockscout/pull/2283) - Add transactions cache
- [#2182](https://github.com/poanetwork/blockscout/pull/2182) - add market history cache
- [#2109](https://github.com/poanetwork/blockscout/pull/2109) - use bigger updates instead of `Multi` transactions in BlocksTransactionsMismatch
- [#2075](https://github.com/poanetwork/blockscout/pull/2075) - add blocks cache

@ -12,7 +12,7 @@ defmodule BlockScoutWeb.RecentTransactionsController do
necessity_by_association: %{
:block => :required,
[created_contract_address: :names] => :optional,
[from_address: :names] => :required,
[from_address: :names] => :optional,
[to_address: :names] => :optional
},
paging_options: %PagingOptions{page_size: 5}

@ -38,6 +38,9 @@ defmodule BlockScoutWeb.ConnCase do
Ecto.Adapters.SQL.Sandbox.mode(Explorer.Repo, {:shared, self()})
end
Supervisor.terminate_child(Explorer.Supervisor, {ConCache, Explorer.Chain.TransactionsCache.cache_name()})
Supervisor.restart_child(Explorer.Supervisor, {ConCache, Explorer.Chain.TransactionsCache.cache_name()})
{:ok, conn: Phoenix.ConnTest.build_conn()}
end
end

@ -27,6 +27,9 @@ defmodule BlockScoutWeb.FeatureCase do
Ecto.Adapters.SQL.Sandbox.mode(Explorer.Repo, {:shared, self()})
end
Supervisor.terminate_child(Explorer.Supervisor, {ConCache, Explorer.Chain.TransactionsCache.cache_name()})
Supervisor.restart_child(Explorer.Supervisor, {ConCache, Explorer.Chain.TransactionsCache.cache_name()})
metadata = Phoenix.Ecto.SQL.Sandbox.metadata_for(Explorer.Repo, self())
{:ok, session} = Wallaby.start_session(metadata: metadata)
session = Wallaby.Browser.resize_window(session, 1200, 800)

@ -6,7 +6,16 @@ defmodule Explorer.Application do
use Application
alias Explorer.Admin
alias Explorer.Chain.{BlockCountCache, BlockNumberCache, BlocksCache, NetVersionCache, TransactionCountCache}
alias Explorer.Chain.{
BlockCountCache,
BlockNumberCache,
BlocksCache,
NetVersionCache,
TransactionCountCache,
TransactionsCache
}
alias Explorer.Market.MarketHistoryCache
alias Explorer.Repo.PrometheusLogger
@ -34,7 +43,8 @@ defmodule Explorer.Application do
{BlockCountCache, []},
con_cache_child_spec(BlocksCache.cache_name()),
con_cache_child_spec(NetVersionCache.cache_name()),
con_cache_child_spec(MarketHistoryCache.cache_name())
con_cache_child_spec(MarketHistoryCache.cache_name()),
con_cache_child_spec(TransactionsCache.cache_name())
]
children = base_children ++ configurable_children()

@ -46,6 +46,7 @@ defmodule Explorer.Chain do
TokenTransfer,
Transaction,
TransactionCountCache,
TransactionsCache,
Wei
}
@ -1948,9 +1949,27 @@ defmodule Explorer.Chain do
@spec recent_collated_transactions([paging_options | necessity_by_association_option]) :: [Transaction.t()]
def recent_collated_transactions(options \\ []) when is_list(options) do
necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
paging_options = Keyword.get(options, :paging_options, @default_paging_options)
options
|> Keyword.get(:paging_options, @default_paging_options)
if is_nil(paging_options.key) do
paging_options.page_size
|> TransactionsCache.take_enough()
|> case do
nil ->
transactions = fetch_recent_collated_transactions(paging_options, necessity_by_association)
TransactionsCache.update(transactions)
transactions
transactions ->
transactions
end
else
fetch_recent_collated_transactions(paging_options, necessity_by_association)
end
end
def fetch_recent_collated_transactions(paging_options, necessity_by_association) do
paging_options
|> fetch_transactions()
|> where([transaction], not is_nil(transaction.block_number) and not is_nil(transaction.index))
|> join_associations(necessity_by_association)

@ -90,8 +90,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
conflict_target: :hash,
on_conflict: on_conflict,
for: Transaction,
returning:
~w(block_number index hash internal_transactions_indexed_at block_hash old_block_hash nonce from_address_hash created_contract_address_hash)a,
returning: true,
timeout: timeout,
timestamps: timestamps
)

@ -0,0 +1,143 @@
defmodule Explorer.Chain.TransactionsCache do
@moduledoc """
Caches the latest imported transactions
"""
alias Explorer.Chain.Transaction
alias Explorer.Repo
@transactions_ids_key "transactions_ids"
@cache_name :transactions
@max_size 51
@preloads [
:block,
created_contract_address: :names,
from_address: :names,
to_address: :names,
token_transfers: :token,
token_transfers: :from_address,
token_transfers: :to_address
]
@spec cache_name :: atom()
def cache_name, do: @cache_name
@doc """
Fetches a transaction from its id ({block_number, index}), returns nil if not found
"""
@spec get({non_neg_integer(), non_neg_integer()}) :: Transaction.t() | nil
def get(id), do: ConCache.get(@cache_name, id)
@doc """
Return the current number of transactions stored
"""
@spec size :: non_neg_integer()
def size, do: Enum.count(transactions_ids())
@doc """
Checks if there are enough transactions stored
"""
@spec enough?(non_neg_integer()) :: boolean()
def enough?(amount) do
amount <= size()
end
@doc """
Checks if the number of transactions stored is already the max allowed
"""
@spec full? :: boolean()
def full? do
@max_size <= size()
end
@doc "Returns the list ids of the transactions currently stored"
@spec transactions_ids :: [{non_neg_integer(), non_neg_integer()}]
def transactions_ids do
ConCache.get(@cache_name, @transactions_ids_key) || []
end
@doc "Returns all the stored transactions"
@spec all :: [Transaction.t()]
def all, do: Enum.map(transactions_ids(), &get(&1))
@doc "Returns the `n` most recent transactions stored"
@spec take(integer()) :: [Transaction.t()]
def take(amount) do
transactions_ids()
|> Enum.take(amount)
|> Enum.map(&get(&1))
end
@doc """
Returns the `n` most recent transactions, unless there are not as many stored,
in which case returns `nil`
"""
@spec take_enough(integer()) :: [Transaction.t()] | nil
def take_enough(amount) do
if enough?(amount), do: take(amount)
end
@doc """
Adds a transaction (or a list of transactions).
If the cache is already full, the transaction will be only stored if it can take
the place of a less recent one.
NOTE: each transaction is inserted atomically
"""
@spec update([Transaction.t()] | Transaction.t() | nil) :: :ok
def update(transactions) when is_nil(transactions), do: :ok
def update(transactions) when is_list(transactions) do
Enum.map(transactions, &update(&1))
end
def update(transaction) do
ConCache.isolated(@cache_name, @transactions_ids_key, fn ->
transaction_id = {transaction.block_number, transaction.index}
ids = transactions_ids()
if full?() do
{init, [min]} = Enum.split(ids, -1)
cond do
transaction_id < min ->
:ok
transaction_id > min ->
insert_transaction(transaction_id, transaction, init)
ConCache.delete(@cache_name, min)
transaction_id == min ->
put_transaction(transaction_id, transaction)
end
else
insert_transaction(transaction_id, transaction, ids)
end
end)
end
defp insert_transaction(transaction_id, transaction, ids) do
put_transaction(transaction_id, transaction)
ConCache.put(@cache_name, @transactions_ids_key, insert_sorted(transaction_id, ids))
end
defp put_transaction(transaction_id, transaction) do
full_transaction = Repo.preload(transaction, @preloads)
ConCache.put(@cache_name, transaction_id, full_transaction)
end
defp insert_sorted(id, ids) do
case ids do
[] ->
[id]
[head | tail] ->
cond do
head > id -> [head | insert_sorted(id, tail)]
head < id -> [id | ids]
head == id -> ids
end
end
end
end

@ -0,0 +1,95 @@
defmodule Explorer.Chain.TransactionsCacheTest do
use Explorer.DataCase
alias Explorer.Chain.TransactionsCache
alias Explorer.Repo
@size 51
describe "update/1" do
test "adds a new value to a new cache with preloads" do
transaction = insert(:transaction) |> preload_all()
TransactionsCache.update(transaction)
assert TransactionsCache.take(1) == [transaction]
end
test "adds several elements, removing the oldest when necessary" do
transactions =
1..@size
|> Enum.map(fn n ->
block = insert(:block, number: n)
insert(:transaction) |> with_block(block)
end)
TransactionsCache.update(transactions)
assert TransactionsCache.all() == Enum.reverse(preload_all(transactions))
more_transactions =
(@size + 1)..(@size + 10)
|> Enum.map(fn n ->
block = insert(:block, number: n)
insert(:transaction) |> with_block(block)
end)
TransactionsCache.update(more_transactions)
kept_transactions =
Enum.reverse(transactions ++ more_transactions)
|> Enum.take(@size)
|> preload_all()
assert TransactionsCache.take(@size) == kept_transactions
end
test "does not add a transaction too old when full" do
transactions =
10..(@size + 9)
|> Enum.map(fn n ->
block = insert(:block, number: n)
insert(:transaction) |> with_block(block)
end)
TransactionsCache.update(transactions)
loaded_transactions = Enum.reverse(preload_all(transactions))
assert TransactionsCache.all() == loaded_transactions
block = insert(:block, number: 1)
insert(:transaction) |> with_block(block) |> TransactionsCache.update()
assert TransactionsCache.all() == loaded_transactions
end
test "adds intermediate transactions" do
blocks = 1..10 |> Map.new(fn n -> {n, insert(:block, number: n)} end)
insert(:transaction) |> with_block(blocks[1]) |> TransactionsCache.update()
insert(:transaction) |> with_block(blocks[10]) |> TransactionsCache.update()
assert TransactionsCache.size() == 2
insert(:transaction) |> with_block(blocks[5]) |> TransactionsCache.update()
assert TransactionsCache.size() == 3
end
end
defp preload_all(transactions) when is_list(transactions) do
Enum.map(transactions, &preload_all(&1))
end
defp preload_all(transaction) do
Repo.preload(transaction, [
:block,
created_contract_address: :names,
from_address: :names,
to_address: :names,
token_transfers: :token,
token_transfers: :from_address,
token_transfers: :to_address
])
end
end

@ -42,6 +42,8 @@ defmodule Explorer.DataCase do
Explorer.Chain.BlockNumberCache.setup()
Supervisor.terminate_child(Explorer.Supervisor, {ConCache, Explorer.Chain.BlocksCache.cache_name()})
Supervisor.restart_child(Explorer.Supervisor, {ConCache, Explorer.Chain.BlocksCache.cache_name()})
Supervisor.terminate_child(Explorer.Supervisor, {ConCache, Explorer.Chain.TransactionsCache.cache_name()})
Supervisor.restart_child(Explorer.Supervisor, {ConCache, Explorer.Chain.TransactionsCache.cache_name()})
:ok
end

@ -11,7 +11,7 @@ defmodule Indexer.Block.Fetcher do
alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries}
alias Explorer.Chain
alias Explorer.Chain.{Address, Block, BlockNumberCache, BlocksCache, Hash, Import, Transaction}
alias Explorer.Chain.{Address, Block, BlockNumberCache, BlocksCache, Hash, Import, Transaction, TransactionsCache}
alias Indexer.Block.Fetcher.Receipts
alias Indexer.Fetcher.{
@ -173,6 +173,7 @@ defmodule Indexer.Block.Fetcher do
) do
result = {:ok, %{inserted: inserted, errors: blocks_errors}}
update_block_cache(inserted[:blocks])
update_transactions_cache(inserted[:transactions])
result
else
{step, {:error, reason}} -> {:error, {step, reason}}
@ -189,6 +190,10 @@ defmodule Indexer.Block.Fetcher do
BlocksCache.update_blocks(blocks)
end
defp update_transactions_cache(transactions) do
TransactionsCache.update(transactions)
end
def import(
%__MODULE__{broadcast: broadcast, callback_module: callback_module} = state,
options

Loading…
Cancel
Save