From 5bbb8c9b2e82db7f1e25ff814133323fea0939e1 Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Tue, 22 Jan 2019 14:06:30 +0300 Subject: [PATCH] add replaced transaction fetcher --- .../lib/explorer/chain/transaction.ex | 1 + .../indexer/replaced_transaction/fetcher.ex | 109 ++++++++++++++++++ .../replaced_transaction/supervisor.ex | 39 +++++++ .../lib/indexer/shrinkable/supervisor.ex | 2 + .../replaced_transaction/fetcher_test.exs | 62 ++++++++++ .../replaced_transaction/supervisor/case.ex | 17 +++ 6 files changed, 230 insertions(+) create mode 100644 apps/indexer/lib/indexer/replaced_transaction/fetcher.ex create mode 100644 apps/indexer/lib/indexer/replaced_transaction/supervisor.ex create mode 100644 apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs create mode 100644 apps/indexer/test/support/indexer/replaced_transaction/supervisor/case.ex diff --git a/apps/explorer/lib/explorer/chain/transaction.ex b/apps/explorer/lib/explorer/chain/transaction.ex index 512a9bf762..3568006f4e 100644 --- a/apps/explorer/lib/explorer/chain/transaction.ex +++ b/apps/explorer/lib/explorer/chain/transaction.ex @@ -83,6 +83,7 @@ defmodule Explorer.Chain.Transaction do `transaction`'s `index`. `nil` when transaction is pending. * `error` - the `error` from the last `t:Explorer.Chain.InternalTransaction.t/0` in `internal_transactions` that caused `status` to be `:error`. Only set after `internal_transactions_index_at` is set AND if there was an error. + Also, `error` is set if transaction is replaced/dropped * `forks` - copies of this transactions that were collated into `uncles` not on the primary consensus of the chain. * `from_address` - the source of `value` * `from_address_hash` - foreign key of `from_address` diff --git a/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex b/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex new file mode 100644 index 0000000000..3da5162d93 --- /dev/null +++ b/apps/indexer/lib/indexer/replaced_transaction/fetcher.ex @@ -0,0 +1,109 @@ +defmodule Indexer.ReplacedTransaction.Fetcher do + @moduledoc """ + A transaction can get dropped and replaced when a newly created transaction with the same `FROM` + account nonce is accepted and confirmed by the network. + And because it has the same account nonce as the previous transaction it replaces the previous txhash. + + This fetcher finds these transaction and sets them `failed` status with `dropped/replaced` error. + """ + use GenServer + + require Logger + + import Ecto.Query, only: [from: 2] + + alias Explorer.Chain.Transaction + alias Explorer.Repo + + @default_interval 1_000 + + # 60 seconds + @query_timeout 60_000 + + defstruct interval: @default_interval, + task: nil + + def child_spec([init_arguments]) do + child_spec([init_arguments, []]) + end + + def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do + default = %{ + id: __MODULE__, + start: {__MODULE__, :start_link, start_link_arguments} + } + + Supervisor.child_spec(default, []) + end + + def start_link(arguments, gen_server_options \\ []) do + GenServer.start_link(__MODULE__, arguments, gen_server_options) + end + + @impl GenServer + def init(opts) when is_list(opts) do + Logger.metadata(fetcher: :pending_transaction) + + opts = + :indexer + |> Application.get_all_env() + |> Keyword.merge(opts) + + state = + %__MODULE__{ + interval: opts[:pending_transaction_interval] || @default_interval + } + |> schedule_find() + + {:ok, state} + end + + @impl GenServer + def handle_info(:find, %__MODULE__{} = state) do + task = Task.Supervisor.async_nolink(ReplacedTransaction.TaskSupervisor, fn -> task() end) + {:noreply, %__MODULE__{state | task: task}} + end + + def handle_info({ref, _}, %__MODULE__{task: %Task{ref: ref}} = state) do + Process.demonitor(ref, [:flush]) + + {:noreply, schedule_find(state)} + end + + def handle_info( + {:DOWN, ref, :process, pid, reason}, + %__MODULE__{task: %Task{pid: pid, ref: ref}} = state + ) do + Logger.error(fn -> "replaced transaction finder task exited due to #{inspect(reason)}. Rescheduling." end) + + {:noreply, schedule_find(state)} + end + + defp schedule_find(%__MODULE__{interval: interval} = state) do + Process.send_after(self(), :find, interval) + %__MODULE__{state | task: nil} + end + + defp task do + Logger.metadata(fetcher: :replaced_transaction) + + query = + from(transaction in Transaction, + where: is_nil(transaction.block_number), + join: mined_transaction in Transaction, + where: + transaction.from_address_hash == mined_transaction.from_address_hash and + transaction.nonce == mined_transaction.nonce and not is_nil(mined_transaction.block_number), + update: [ + set: [status: ^:error, error: "dropped/replaced"] + ] + ) + + try do + Repo.update_all(query, []) + rescue + error -> + Logger.error(fn -> ["Failed to make pending transactions dropped: ", inspect(error)] end) + end + end +end diff --git a/apps/indexer/lib/indexer/replaced_transaction/supervisor.ex b/apps/indexer/lib/indexer/replaced_transaction/supervisor.ex new file mode 100644 index 0000000000..f6e8e8b2f8 --- /dev/null +++ b/apps/indexer/lib/indexer/replaced_transaction/supervisor.ex @@ -0,0 +1,39 @@ +defmodule Indexer.ReplacedTransaction.Supervisor do + @moduledoc """ + Supervises `Indexer.ReplacedTransaction.Fetcher` and its batch tasks through + `Indexer.ReplacedTransaction.TaskSupervisor`. + """ + + use Supervisor + + alias Indexer.ReplacedTransaction.Fetcher + + def child_spec([init_arguments]) do + child_spec([init_arguments, []]) + end + + def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do + default = %{ + id: __MODULE__, + start: {__MODULE__, :start_link, start_link_arguments}, + type: :supervisor + } + + Supervisor.child_spec(default, []) + end + + def start_link(arguments, gen_server_options \\ []) do + Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__)) + end + + @impl Supervisor + def init(fetcher_arguments) do + Supervisor.init( + [ + {Task.Supervisor, name: Indexer.ReplacedTransaction.TaskSupervisor}, + {Fetcher, [fetcher_arguments, [name: Fetcher]]} + ], + strategy: :one_for_one + ) + end +end diff --git a/apps/indexer/lib/indexer/shrinkable/supervisor.ex b/apps/indexer/lib/indexer/shrinkable/supervisor.ex index 756c91d992..15390fdf50 100644 --- a/apps/indexer/lib/indexer/shrinkable/supervisor.ex +++ b/apps/indexer/lib/indexer/shrinkable/supervisor.ex @@ -11,6 +11,7 @@ defmodule Indexer.Shrinkable.Supervisor do CoinBalance, InternalTransaction, PendingTransaction, + ReplacedTransaction, Token, TokenBalance, TokenTransfer @@ -61,6 +62,7 @@ defmodule Indexer.Shrinkable.Supervisor do ]}, {PendingTransaction.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: PendingTransactionFetcher]]}, + {ReplacedTransaction.Supervisor, [[], [name: PendingTransactionFetcher]]}, {Code.Supervisor, [ [json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor], diff --git a/apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs b/apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs new file mode 100644 index 0000000000..41473e87c5 --- /dev/null +++ b/apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs @@ -0,0 +1,62 @@ +defmodule Indexer.ReplacedTransaction.FetcherTest do + use Explorer.DataCase + + alias Explorer.Chain.Transaction + alias Indexer.ReplacedTransaction + + @moduletag :capture_log + + describe "start_link/1" do + test "starts finding replaced transactions" do + common_from_address_hash = %Explorer.Chain.Hash{ + byte_count: 20, + bytes: <<0x4615CC10092B514258577DAFCA98C142577F1578::big-integer-size(20)-unit(8)>> + } + + address = insert(:address, hash: common_from_address_hash) + + common_nonce = 10 + + replaced_transaction_hash = %Explorer.Chain.Hash{ + byte_count: 32, + bytes: <<0x9FC76417374AA880D4449A1F7F31EC597F00B1F6F3DD2D66F4C9C6C445836D8B::big-integer-size(32)-unit(8)>> + } + + insert(:transaction, + hash: replaced_transaction_hash, + nonce: common_nonce, + from_address: address + ) + + mined_transaction_hash = %Explorer.Chain.Hash{ + byte_count: 32, + bytes: <<0x8FC76417374AA880D4449A1F7F31EC597F00B1F6F3DD2D66F4C9C6C445836D8B::big-integer-size(32)-unit(8)>> + } + + block = insert(:block) + insert(:transaction, + hash: mined_transaction_hash, + nonce: common_nonce, + from_address: address, + block_number: block.number, + cumulative_gas_used: 10, + gas_used: 1, + index: 0, + status: :ok + ) + + ReplacedTransaction.Supervisor.Case.start_supervised!() + + found_replaced_transaction = + wait_for_results(fn -> + Repo.one!( + from(transaction in Transaction, + where: transaction.status == ^:error and transaction.error == "dropped/replaced" + ) + ) + end) + + assert found_replaced_transaction.hash == replaced_transaction_hash + end + end +end diff --git a/apps/indexer/test/support/indexer/replaced_transaction/supervisor/case.ex b/apps/indexer/test/support/indexer/replaced_transaction/supervisor/case.ex new file mode 100644 index 0000000000..e3fdd7124b --- /dev/null +++ b/apps/indexer/test/support/indexer/replaced_transaction/supervisor/case.ex @@ -0,0 +1,17 @@ +defmodule Indexer.ReplacedTransaction.Supervisor.Case do + alias Indexer.ReplacedTransaction + + def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do + merged_fetcher_arguments = + Keyword.merge( + fetcher_arguments, + flush_interval: 50, + max_batch_size: 1, + max_concurrency: 1 + ) + + [merged_fetcher_arguments] + |> ReplacedTransaction.Supervisor.child_spec() + |> ExUnit.Callbacks.start_supervised!() + end +end