From 47032c064e805c94364efbf5a36e4c0dc3fe2892 Mon Sep 17 00:00:00 2001 From: zachdaniel Date: Tue, 19 Feb 2019 14:42:54 -0500 Subject: [PATCH] feat: optimize/rework `init` for replaced transactions fetcher --- apps/explorer/lib/explorer/chain.ex | 63 +++++++++++++ .../chain/import/runner/transactions.ex | 2 +- .../indexer/replaced_transaction/fetcher.ex | 26 +++++- .../replaced_transaction/fetcher_test.exs | 89 +++++++++++++++++++ 4 files changed, 175 insertions(+), 5 deletions(-) diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 2f6e2f071b..cd7b970a86 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -11,6 +11,7 @@ defmodule Explorer.Chain do order_by: 2, order_by: 3, preload: 2, + select: 2, subquery: 1, union_all: 2, where: 2, @@ -1271,6 +1272,37 @@ defmodule Explorer.Chain do Repo.stream_reduce(query, initial, reducer) end + @spec stream_pending_transactions( + fields :: [ + :block_hash + | :internal_transactions_indexed_at + | :created_contract_code_indexed_at + | :from_address_hash + | :gas + | :gas_price + | :hash + | :index + | :input + | :nonce + | :r + | :s + | :to_address_hash + | :v + | :value + ], + initial :: accumulator, + reducer :: (entry :: term(), accumulator -> accumulator) + ) :: {:ok, accumulator} + when accumulator: term() + def stream_pending_transactions(fields, initial, reducer) when is_function(reducer, 2) do + query = + Transaction + |> pending_transactions_query() + |> select(^fields) + + Repo.stream_reduce(query, initial, reducer) + end + @doc """ Returns a stream of all `t:Explorer.Chain.Block.t/0` `hash`es that are marked as unfetched in `t:Explorer.Chain.Block.SecondDegreeRelation.t/0`. @@ -2265,6 +2297,37 @@ defmodule Explorer.Chain do |> Repo.all() end + @spec find_and_update_replaced_transactions([ + %{ + required(:nonce) => non_neg_integer, + required(:from_address_hash) => Hash.Address.t(), + required(:hash) => Hash.t() + } + ]) :: {integer(), nil | [term()]} + def find_and_update_replaced_transactions(transactions, timeout \\ :infinity) do + query = + Enum.reduce(transactions, Transaction, fn %{hash: hash, nonce: nonce, from_address_hash: from_address_hash}, + query -> + from(t in query, + or_where: + t.nonce == ^nonce and t.from_address_hash == ^from_address_hash and t.hash != ^hash and + not is_nil(t.block_number) + ) + end) + + hashes = Enum.map(transactions, & &1.hash) + + transactions_to_update = + from(pending in Transaction, + join: duplicate in subquery(query), + on: duplicate.nonce == pending.nonce, + on: duplicate.from_address_hash == pending.from_address_hash, + where: pending.hash in ^hashes + ) + + Repo.update_all(transactions_to_update, [set: [error: "dropped/replaced", status: :error]], timeout: timeout) + end + @spec update_replaced_transactions([ %{ required(:nonce) => non_neg_integer, diff --git a/apps/explorer/lib/explorer/chain/import/runner/transactions.ex b/apps/explorer/lib/explorer/chain/import/runner/transactions.ex index e155a84e23..f652309c4d 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/transactions.ex @@ -86,7 +86,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do conflict_target: :hash, on_conflict: on_conflict, for: Transaction, - returning: ~w(block_number index hash internal_transactions_indexed_at)a, + returning: ~w(block_number index hash internal_transactions_indexed_at block_hash nonce from_address_hash)a, timeout: timeout, timestamps: timestamps ) diff --git a/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex b/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex index 9a647e51b9..803c0f275a 100644 --- a/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex +++ b/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex @@ -53,12 +53,12 @@ defmodule Indexer.ReplacedTransaction.Fetcher do @impl BufferedTask def init(initial, reducer, _) do {:ok, final} = - Chain.stream_mined_transactions( - [:block_hash, :nonce, :from_address_hash], + [:block_hash, :nonce, :from_address_hash, :hash] + |> Chain.stream_pending_transactions( initial, fn transaction_fields, acc -> transaction_fields - |> entry() + |> pending_entry() |> reducer.(acc) end ) @@ -75,6 +75,10 @@ defmodule Indexer.ReplacedTransaction.Fetcher do {block_hash_bytes, nonce, from_address_hash_bytes} end + defp pending_entry(%{hash: %Hash{bytes: hash}, nonce: nonce, from_address_hash: %Hash{bytes: from_address_hash_bytes}}) do + {:pending, nonce, from_address_hash_bytes, hash} + end + defp params({block_hash_bytes, nonce, from_address_hash_bytes}) when is_integer(nonce) do {:ok, from_address_hash} = Hash.Address.cast(from_address_hash_bytes) @@ -83,6 +87,10 @@ defmodule Indexer.ReplacedTransaction.Fetcher do %{nonce: nonce, from_address_hash: from_address_hash, block_hash: block_hash} end + defp pending_params({:pending, nonce, from_address_hash, hash}) do + %{nonce: nonce, from_address_hash: from_address_hash, hash: hash} + end + @impl BufferedTask @decorate trace( name: "fetch", @@ -94,7 +102,17 @@ defmodule Indexer.ReplacedTransaction.Fetcher do Logger.debug("fetching replaced transactions for transactions") try do - entries + {pending, realtime} = + entries + |> Enum.split_with(fn entry -> + match?({:pending, _, _, _}, entry) + end) + + pending + |> Enum.map(&pending_params/1) + |> Chain.find_and_update_replaced_transactions() + + realtime |> Enum.map(¶ms/1) |> Chain.update_replaced_transactions() diff --git a/apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs b/apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs index 925aacd9df..8b2b61b80e 100644 --- a/apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs +++ b/apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs @@ -95,6 +95,95 @@ defmodule Indexer.ReplacedTransaction.FetcherTest do assert %Transaction{error: nil, status: nil} = Repo.one!(from(t in Transaction, where: t.hash == ^second_replaced_transaction_hash)) end + + test "updates a replaced transaction on init" do + replaced_transaction_hash = "0x2a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61" + + address = insert(:address, hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6") + + insert(:transaction, + from_address: address, + nonce: 1, + block_hash: nil, + index: nil, + block_number: nil, + hash: replaced_transaction_hash + ) + + mined_transaction_hash = "0x1a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61" + block = insert(:block) + + mined_transaction = + insert(:transaction, + from_address: address, + nonce: 1, + index: 0, + block_hash: block.hash, + block_number: block.number, + cumulative_gas_used: 1, + gas_used: 1, + hash: mined_transaction_hash + ) + + second_mined_transaction_hash = "0x3a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61" + second_block = insert(:block) + + insert(:transaction, + from_address: address, + nonce: 1, + index: 0, + block_hash: second_block.hash, + block_number: second_block.number, + cumulative_gas_used: 1, + gas_used: 1, + hash: second_mined_transaction_hash + ) + + second_replaced_transaction_hash = "0x7a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61" + second_address = insert(:address, hash: "0xc7cffe2ac19b9d5705a24cbe14fef5663af905a6") + + insert(:transaction, + from_address: second_address, + nonce: 1, + block_hash: nil, + index: nil, + block_number: nil, + hash: second_replaced_transaction_hash + ) + + insert(:transaction, + from_address: mined_transaction.from_address, + nonce: mined_transaction.nonce + ) + |> with_block(block) + + ReplacedTransaction.Supervisor.Case.start_supervised!() + + # assert :ok = + # ReplacedTransaction.Fetcher.async_fetch([ + # %{ + # block_hash: mined_transaction.block_hash, + # nonce: mined_transaction.nonce, + # from_address_hash: mined_transaction.from_address_hash + # } + # ]) + + found_replaced_transaction = + wait(fn -> + Repo.one!(from(t in Transaction, where: t.hash == ^replaced_transaction_hash and t.status == ^:error)) + end) + + assert found_replaced_transaction.error == "dropped/replaced" + + assert %Transaction{error: nil, status: nil} = + Repo.one!(from(t in Transaction, where: t.hash == ^mined_transaction_hash)) + + assert %Transaction{error: nil, status: nil} = + Repo.one!(from(t in Transaction, where: t.hash == ^second_mined_transaction_hash)) + + assert %Transaction{error: nil, status: nil} = + Repo.one!(from(t in Transaction, where: t.hash == ^second_replaced_transaction_hash)) + end end defp wait(producer) do