From cf78b8cb80f0cf001e1f976c9931395c10df32e4 Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Fri, 25 Jan 2019 12:23:00 +0300 Subject: [PATCH] move replaced transactions update to fetcher --- apps/explorer/lib/explorer/chain.ex | 67 +++++++++++ .../chain/import/runner/transactions.ex | 46 +------- .../test/explorer/chain/import_test.exs | 86 -------------- .../indexer/replaced_transaction/fetcher.ex | 106 ++++++++++++++++++ 4 files changed, 174 insertions(+), 131 deletions(-) create mode 100644 apps/indexer/lib/indexer/replaced_transaction/fetcher.ex diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 78cd45205f..258e1c6a50 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -1207,6 +1207,38 @@ defmodule Explorer.Chain do Repo.stream_reduce(query, initial, reducer) end + @spec stream_mined_transactions( + fields :: [ + :block_hash + | :internal_transactions_indexed_at + | :created_contract_code_indexed_at + | :from_address_hash + | :gas + | :gas_price + | :hash + | :index + | :input + | :nonce + | :r + | :s + | :to_address_hash + | :v + | :value + ], + initial :: accumulator, + reducer :: (entry :: term(), accumulator -> accumulator) + ) :: {:ok, accumulator} + when accumulator: term() + def stream_mined_transactions(fields, initial, reducer) when is_function(reducer, 2) do + query = + from(t in Transaction, + where: not is_nil(t.block_hash) and not is_nil(t.nonce) and not is_nil(t.from_address_hash), + select: ^fields + ) + + Repo.stream_reduce(query, initial, reducer) + end + @doc """ Returns a stream of all `t:Explorer.Chain.Block.t/0` `hash`es that are marked as unfetched in `t:Explorer.Chain.Block.SecondDegreeRelation.t/0`. @@ -2131,6 +2163,41 @@ defmodule Explorer.Chain do |> Repo.all() end + @spec update_replaced_transactions([ + %{ + required(:nonce) => non_neg_integer, + required(:from_address_hash) => Hash.Address.t(), + required(:block_hash) => Hash.Full.t() + } + ]) :: {integer(), nil | [term()]} + def update_replaced_transactions(transactions, timeout \\ :infinity) do + filters = + transactions + |> Enum.filter(fn transaction -> + transaction.block_hash && transaction.nonce && transaction.from_address_hash + end) + |> Enum.map(fn transaction -> + {transaction.nonce, transaction.from_address_hash} + end) + |> Enum.uniq() + + if Enum.empty?(filters) do + {:ok, []} + else + query = + filters + |> Enum.reduce(from(t in Transaction, where: is_nil(t.block_hash)), fn {nonce, from_address}, query -> + from(t in query, + or_where: t.nonce == ^nonce and t.from_address_hash == ^from_address and is_nil(t.block_hash) + ) + end) + + update_query = from(t in query, update: [set: [status: ^:error, error: "dropped/replaced"]]) + + Repo.update_all(update_query, [], timeout: timeout) + end + end + @doc """ Update a new `t:Token.t/0` record. diff --git a/apps/explorer/lib/explorer/chain/import/runner/transactions.ex b/apps/explorer/lib/explorer/chain/import/runner/transactions.ex index b43785a621..e155a84e23 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/transactions.ex @@ -42,17 +42,9 @@ defmodule Explorer.Chain.Import.Runner.Transactions do |> Map.put(:timestamps, timestamps) |> Map.put(:token_transfer_transaction_hash_set, token_transfer_transaction_hash_set(options)) - transactions_timeout = options[option_key()][:timeout] || timeout() - - update_transactions_options = %{timeout: transactions_timeout} - - multi - |> Multi.run(:transactions, fn repo, _ -> + Multi.run(multi, :transactions, fn repo, _ -> insert(repo, changes_list, insert_options) end) - |> Multi.run(:replaced_transactions, fn repo, _ -> - update_replaced_transactions(repo, changes_list, update_transactions_options) - end) end @impl Import.Runner @@ -186,40 +178,4 @@ defmodule Explorer.Chain.Import.Runner.Transactions do end defp put_internal_transactions_indexed_at?(_, _), do: false - - defp update_replaced_transactions(repo, transactions, %{timeout: timeout}) do - filters = - transactions - |> Enum.filter(fn transaction -> - Map.get(transaction, :block_hash) && Map.get(transaction, :block_number) && Map.get(transaction, :nonce) && - Map.get(transaction, :from_address_hash) - end) - |> Enum.map(fn transaction -> - {transaction.nonce, transaction.from_address_hash} - end) - |> Enum.uniq() - - if Enum.empty?(filters) do - {:ok, []} - else - query = - filters - |> Enum.reduce(from(t in Transaction, where: is_nil(t.block_hash)), fn {nonce, from_address}, query -> - from(t in query, - or_where: t.nonce == ^nonce and t.from_address_hash == ^from_address and is_nil(t.block_hash) - ) - end) - - update_query = from(t in query, update: [set: [status: ^:error, error: "dropped/replaced"]]) - - try do - {_, result} = repo.update_all(update_query, [], timeout: timeout) - - {:ok, result} - rescue - postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error, query: query}} - end - end - end end diff --git a/apps/explorer/test/explorer/chain/import_test.exs b/apps/explorer/test/explorer/chain/import_test.exs index 33693d4962..93a981648a 100644 --- a/apps/explorer/test/explorer/chain/import_test.exs +++ b/apps/explorer/test/explorer/chain/import_test.exs @@ -1035,92 +1035,6 @@ defmodule Explorer.Chain.ImportTest do Repo.get(Transaction, "0xab349efbe1ddc6d85d84a993aa52bdaadce66e8ee166dd10013ce3f2a94ca724") end - test "updates replaced transaction's status and error message" do - replaced_transaction_hash = "0x2a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61" - - address = insert(:address, hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6") - - insert(:transaction, - from_address: address, - nonce: 1, - block_hash: nil, - index: nil, - block_number: nil, - hash: replaced_transaction_hash - ) - - assert {:ok, _} = - Import.all(%{ - addresses: %{ - params: [ - %{hash: "0x1c0fa194a9d3b44313dcd849f3c6be6ad270a0a4"}, - %{hash: "0x679ed2245eba484021c2d3f4d174fb2bb2bd0e49"}, - %{hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6"}, - %{hash: "0xfa52274dd61e1643d2205169732f29114bc240b3"} - ] - }, - address_coin_balances: %{ - params: [ - %{ - address_hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6", - block_number: 6_535_159 - } - ] - }, - blocks: %{ - params: [ - %{ - consensus: true, - difficulty: 242_354_495_292_210, - gas_limit: 4_703_218, - gas_used: 1_009_480, - hash: "0x1f8cde8bd326702c49e065d56b08bdc82caa0c4820d914e27026c9c68ca1cf09", - miner_hash: "0x1c0fa194a9d3b44313dcd849f3c6be6ad270a0a4", - nonce: "0xafa5fc5c07f55ba5", - number: 6_535_159, - parent_hash: "0xd2cf6cf7a3d5455f450a2a3701995a7ad51f12010674883a6690cee337f75ffa", - size: 4052, - timestamp: DateTime.from_iso8601("2018-09-10 21:34:39Z") |> elem(1), - total_difficulty: 415_641_295_487_918_824_165 - } - ] - }, - broadcast: false, - transactions: %{ - params: [ - %{ - block_hash: "0x1f8cde8bd326702c49e065d56b08bdc82caa0c4820d914e27026c9c68ca1cf09", - block_number: 6_535_159, - cumulative_gas_used: 978_227, - from_address_hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6", - gas: 978_227, - gas_price: 99_000_000_000, - gas_used: 978_227, - hash: "0x1a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61", - index: 0, - input: "0x", - nonce: 1, - r: - 33_589_694_337_999_451_585_110_289_972_555_130_664_768_096_048_542_148_916_928_040_955_524_640_045_158, - s: - 42_310_749_137_599_445_408_044_732_541_966_181_996_695_356_587_068_481_874_121_265_172_051_825_560_665, - status: nil, - to_address_hash: nil, - transaction_hash: "0x1a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61", - transaction_index: 0, - v: 158, - value: 0 - } - ] - } - }) - - assert %Transaction{status: nil, error: nil} = - Repo.get(Transaction, "0x1a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61") - - assert %Transaction{status: :error, error: "dropped/replaced"} = Repo.get(Transaction, replaced_transaction_hash) - end - test "uncles record their transaction indexes in transactions_forks" do miner_hash = address_hash() from_address_hash = address_hash() diff --git a/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex b/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex new file mode 100644 index 0000000000..a47c996775 --- /dev/null +++ b/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex @@ -0,0 +1,106 @@ +defmodule Indexer.ReplacedTransaction.Fetcher do + use Spandex.Decorators + + require Logger + + alias Explorer.Chain + alias Explorer.Chain.Hash + alias Indexer.{BufferedTask, Tracer} + + @behaviour BufferedTask + + @max_batch_size 10 + @max_concurrency 4 + @defaults [ + flush_interval: :timer.seconds(3), + max_concurrency: @max_concurrency, + max_batch_size: @max_batch_size, + task_supervisor: Indexer.ReplacedTransaction.TaskSupervisor, + metadata: [fetcher: :replaced_transaction] + ] + + @spec async_fetch([ + %{ + required(:nonce) => non_neg_integer, + required(:from_address_hash) => Hash.Address.t(), + required(:block_hash) => Hash.Full.t() + } + ]) :: :ok + def async_fetch(transactions_fields, timeout \\ 5000) when is_list(transactions_fields) do + entries = Enum.map(transactions_fields, &entry/1) + + BufferedTask.buffer(__MODULE__, entries, timeout) + end + + @doc false + def child_spec([init_options, gen_server_options]) do + merged_init_opts = + @defaults + |> Keyword.merge(init_options) + + Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__) + end + + @impl BufferedTask + def init(initial, reducer, _) do + {:ok, final} = + Chain.stream_mined_transactions( + [:block_hash, :nonce, :from_address_hash], + initial, + fn transaction_fields, acc -> + transaction_fields + |> entry() + |> reducer.(acc) + end + ) + + final + end + + defp entry(%{ + block_hash: %Hash{bytes: block_hash_bytes}, + nonce: nonce, + from_address_hash: %Hash{bytes: from_address_hash_bytes} + }) + when is_integer(nonce) do + {block_hash_bytes, nonce, from_address_hash_bytes} + end + + defp params({block_hash_bytes, nonce, from_address_hash_bytes}) + when is_integer(nonce) do + {:ok, from_address_hash} = Hash.Address.cast(from_address_hash_bytes) + {:ok, block_hash} = Hash.Full.cast(block_hash_bytes) + + %{nonce: nonce, from_address_hash: from_address_hash, block_hash: block_hash} + end + + @impl BufferedTask + @decorate trace( + name: "fetch", + resource: "Indexer.ReplacedTransaction.Fetcher.run/2", + service: :indexer, + tracer: Tracer + ) + def run(entries, _) do + Logger.debug("fetching replaced transactions for transactions") + + try do + {:ok, _} = + entries + |> Enum.map(¶ms/1) + |> Chain.update_replaced_transactions() + + :ok + rescue + reason -> + Logger.error(fn -> + [ + "failed to update replaced transactions for transactions: ", + inspect(reason) + ] + end) + + {:retry, entries} + end + end +end