add replaced transaction fetcher

pull/1370/head
Ayrat Badykov 6 years ago
parent 0d41d2a060
commit 5bbb8c9b2e
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 1
      apps/explorer/lib/explorer/chain/transaction.ex
  2. 109
      apps/indexer/lib/indexer/replaced_transaction/fetcher.ex
  3. 39
      apps/indexer/lib/indexer/replaced_transaction/supervisor.ex
  4. 2
      apps/indexer/lib/indexer/shrinkable/supervisor.ex
  5. 62
      apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs
  6. 17
      apps/indexer/test/support/indexer/replaced_transaction/supervisor/case.ex

@ -83,6 +83,7 @@ defmodule Explorer.Chain.Transaction do
`transaction`'s `index`. `nil` when transaction is pending.
* `error` - the `error` from the last `t:Explorer.Chain.InternalTransaction.t/0` in `internal_transactions` that
caused `status` to be `:error`. Only set after `internal_transactions_index_at` is set AND if there was an error.
Also, `error` is set if transaction is replaced/dropped
* `forks` - copies of this transactions that were collated into `uncles` not on the primary consensus of the chain.
* `from_address` - the source of `value`
* `from_address_hash` - foreign key of `from_address`

@ -0,0 +1,109 @@
defmodule Indexer.ReplacedTransaction.Fetcher do
@moduledoc """
A transaction can get dropped and replaced when a newly created transaction with the same `FROM`
account nonce is accepted and confirmed by the network.
And because it has the same account nonce as the previous transaction it replaces the previous txhash.
This fetcher finds these transaction and sets them `failed` status with `dropped/replaced` error.
"""
use GenServer
require Logger
import Ecto.Query, only: [from: 2]
alias Explorer.Chain.Transaction
alias Explorer.Repo
@default_interval 1_000
# 60 seconds
@query_timeout 60_000
defstruct interval: @default_interval,
task: nil
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments}
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
GenServer.start_link(__MODULE__, arguments, gen_server_options)
end
@impl GenServer
def init(opts) when is_list(opts) do
Logger.metadata(fetcher: :pending_transaction)
opts =
:indexer
|> Application.get_all_env()
|> Keyword.merge(opts)
state =
%__MODULE__{
interval: opts[:pending_transaction_interval] || @default_interval
}
|> schedule_find()
{:ok, state}
end
@impl GenServer
def handle_info(:find, %__MODULE__{} = state) do
task = Task.Supervisor.async_nolink(ReplacedTransaction.TaskSupervisor, fn -> task() end)
{:noreply, %__MODULE__{state | task: task}}
end
def handle_info({ref, _}, %__MODULE__{task: %Task{ref: ref}} = state) do
Process.demonitor(ref, [:flush])
{:noreply, schedule_find(state)}
end
def handle_info(
{:DOWN, ref, :process, pid, reason},
%__MODULE__{task: %Task{pid: pid, ref: ref}} = state
) do
Logger.error(fn -> "replaced transaction finder task exited due to #{inspect(reason)}. Rescheduling." end)
{:noreply, schedule_find(state)}
end
defp schedule_find(%__MODULE__{interval: interval} = state) do
Process.send_after(self(), :find, interval)
%__MODULE__{state | task: nil}
end
defp task do
Logger.metadata(fetcher: :replaced_transaction)
query =
from(transaction in Transaction,
where: is_nil(transaction.block_number),
join: mined_transaction in Transaction,
where:
transaction.from_address_hash == mined_transaction.from_address_hash and
transaction.nonce == mined_transaction.nonce and not is_nil(mined_transaction.block_number),
update: [
set: [status: ^:error, error: "dropped/replaced"]
]
)
try do
Repo.update_all(query, [])
rescue
error ->
Logger.error(fn -> ["Failed to make pending transactions dropped: ", inspect(error)] end)
end
end
end

@ -0,0 +1,39 @@
defmodule Indexer.ReplacedTransaction.Supervisor do
@moduledoc """
Supervises `Indexer.ReplacedTransaction.Fetcher` and its batch tasks through
`Indexer.ReplacedTransaction.TaskSupervisor`.
"""
use Supervisor
alias Indexer.ReplacedTransaction.Fetcher
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init(fetcher_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.ReplacedTransaction.TaskSupervisor},
{Fetcher, [fetcher_arguments, [name: Fetcher]]}
],
strategy: :one_for_one
)
end
end

@ -11,6 +11,7 @@ defmodule Indexer.Shrinkable.Supervisor do
CoinBalance,
InternalTransaction,
PendingTransaction,
ReplacedTransaction,
Token,
TokenBalance,
TokenTransfer
@ -61,6 +62,7 @@ defmodule Indexer.Shrinkable.Supervisor do
]},
{PendingTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: PendingTransactionFetcher]]},
{ReplacedTransaction.Supervisor, [[], [name: PendingTransactionFetcher]]},
{Code.Supervisor,
[
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor],

@ -0,0 +1,62 @@
defmodule Indexer.ReplacedTransaction.FetcherTest do
use Explorer.DataCase
alias Explorer.Chain.Transaction
alias Indexer.ReplacedTransaction
@moduletag :capture_log
describe "start_link/1" do
test "starts finding replaced transactions" do
common_from_address_hash = %Explorer.Chain.Hash{
byte_count: 20,
bytes: <<0x4615CC10092B514258577DAFCA98C142577F1578::big-integer-size(20)-unit(8)>>
}
address = insert(:address, hash: common_from_address_hash)
common_nonce = 10
replaced_transaction_hash = %Explorer.Chain.Hash{
byte_count: 32,
bytes: <<0x9FC76417374AA880D4449A1F7F31EC597F00B1F6F3DD2D66F4C9C6C445836D8B::big-integer-size(32)-unit(8)>>
}
insert(:transaction,
hash: replaced_transaction_hash,
nonce: common_nonce,
from_address: address
)
mined_transaction_hash = %Explorer.Chain.Hash{
byte_count: 32,
bytes: <<0x8FC76417374AA880D4449A1F7F31EC597F00B1F6F3DD2D66F4C9C6C445836D8B::big-integer-size(32)-unit(8)>>
}
block = insert(:block)
insert(:transaction,
hash: mined_transaction_hash,
nonce: common_nonce,
from_address: address,
block_number: block.number,
cumulative_gas_used: 10,
gas_used: 1,
index: 0,
status: :ok
)
ReplacedTransaction.Supervisor.Case.start_supervised!()
found_replaced_transaction =
wait_for_results(fn ->
Repo.one!(
from(transaction in Transaction,
where: transaction.status == ^:error and transaction.error == "dropped/replaced"
)
)
end)
assert found_replaced_transaction.hash == replaced_transaction_hash
end
end
end

@ -0,0 +1,17 @@
defmodule Indexer.ReplacedTransaction.Supervisor.Case do
alias Indexer.ReplacedTransaction
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
flush_interval: 50,
max_batch_size: 1,
max_concurrency: 1
)
[merged_fetcher_arguments]
|> ReplacedTransaction.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end
Loading…
Cancel
Save