remove replaced transactions fetcher

pull/1370/head
Ayrat Badykov 6 years ago
parent b780876b3c
commit 3c1dd1d71f
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 22
      apps/explorer/lib/explorer/chain.ex
  2. 54
      apps/explorer/test/explorer/chain_test.exs
  3. 99
      apps/indexer/lib/indexer/replaced_transaction/fetcher.ex
  4. 39
      apps/indexer/lib/indexer/replaced_transaction/supervisor.ex
  5. 2
      apps/indexer/lib/indexer/shrinkable/supervisor.ex
  6. 70
      apps/indexer/test/indexer/replaced_transaction/fetcher_test.exs
  7. 17
      apps/indexer/test/support/indexer/replaced_transaction/supervisor/case.ex

@ -1556,7 +1556,7 @@ defmodule Explorer.Chain do
defp pending_transactions_query(query) do defp pending_transactions_query(query) do
from(transaction in query, from(transaction in query,
where: is_nil(transaction.block_hash) and is_nil(transaction.error) where: is_nil(transaction.block_hash)
) )
end end
@ -2131,26 +2131,6 @@ defmodule Explorer.Chain do
|> Repo.all() |> Repo.all()
end end
@doc """
Finds replaced/dropped transactions and sets their status to `:error` with `dropped/replaced` error.
"""
@spec update_replaced_transactions(:infinity | non_neg_integer()) :: {integer(), nil | [term()]}
def update_replaced_transactions(timeout \\ :infinity) do
query =
from(transaction in Transaction,
where: is_nil(transaction.block_number),
join: mined_transaction in Transaction,
where:
transaction.from_address_hash == mined_transaction.from_address_hash and
transaction.nonce == mined_transaction.nonce and not is_nil(mined_transaction.block_number),
update: [
set: [status: ^:error, error: "dropped/replaced"]
]
)
Repo.update_all(query, [], timeout: timeout)
end
@doc """ @doc """
Update a new `t:Token.t/0` record. Update a new `t:Token.t/0` record.

@ -937,60 +937,6 @@ defmodule Explorer.ChainTest do
end end
end end
describe "update_replaced_transactions" do
test "finds and updates replaced transaction" do
common_from_address_hash = %Explorer.Chain.Hash{
byte_count: 20,
bytes: <<0x4615CC10092B514258577DAFCA98C142577F1578::big-integer-size(20)-unit(8)>>
}
address = insert(:address, hash: common_from_address_hash)
common_nonce = 10
replaced_transaction_hash = %Explorer.Chain.Hash{
byte_count: 32,
bytes: <<0x9FC76417374AA880D4449A1F7F31EC597F00B1F6F3DD2D66F4C9C6C445836D8B::big-integer-size(32)-unit(8)>>
}
insert(:transaction,
hash: replaced_transaction_hash,
nonce: common_nonce,
from_address: address
)
mined_transaction_hash = %Explorer.Chain.Hash{
byte_count: 32,
bytes: <<0x8FC76417374AA880D4449A1F7F31EC597F00B1F6F3DD2D66F4C9C6C445836D8B::big-integer-size(32)-unit(8)>>
}
block = insert(:block)
insert(:transaction,
hash: mined_transaction_hash,
nonce: common_nonce,
from_address: address,
block_number: block.number,
block_hash: block.hash,
cumulative_gas_used: 10,
gas_used: 1,
index: 0,
status: :ok
)
{1, nil} = Explorer.Chain.update_replaced_transactions()
found_replaced_transaction =
Repo.one!(
from(transaction in Transaction,
where: transaction.status == ^:error and transaction.error == "dropped/replaced"
)
)
assert found_replaced_transaction.hash == replaced_transaction_hash
end
end
# Full tests in `test/explorer/import_test.exs` # Full tests in `test/explorer/import_test.exs`
describe "import/1" do describe "import/1" do
@import_data %{ @import_data %{

@ -1,99 +0,0 @@
defmodule Indexer.ReplacedTransaction.Fetcher do
@moduledoc """
A transaction can get dropped and replaced when a newly created transaction with the same `FROM`
account nonce is accepted and confirmed by the network.
And because it has the same account nonce as the previous transaction it replaces the previous txhash.
This fetcher finds these transaction and sets them `failed` status with `dropped/replaced` error.
"""
use GenServer
require Logger
alias Explorer.Chain
alias Indexer.ReplacedTransaction
# 1 second
@default_interval 1_000
# 60 seconds
@query_timeout 60_000
defstruct interval: @default_interval,
query_timeout: @query_timeout,
task: nil
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments}
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
GenServer.start_link(__MODULE__, arguments, gen_server_options)
end
@impl GenServer
def init(opts) when is_list(opts) do
Logger.metadata(fetcher: :pending_transaction)
opts =
:indexer
|> Application.get_all_env()
|> Keyword.merge(opts)
state =
%__MODULE__{
interval: opts[:replaced_transaction_interval] || @default_interval,
query_timeout: opts[:replaced_transaction_query_timeout] || @query_timeout
}
|> schedule_find()
{:ok, state}
end
@impl GenServer
def handle_info(:find, %__MODULE__{} = state) do
task = Task.Supervisor.async_nolink(ReplacedTransaction.TaskSupervisor, fn -> task(state) end)
{:noreply, %__MODULE__{state | task: task}}
end
def handle_info({ref, _}, %__MODULE__{task: %Task{ref: ref}} = state) do
Process.demonitor(ref, [:flush])
{:noreply, schedule_find(state)}
end
def handle_info(
{:DOWN, ref, :process, pid, reason},
%__MODULE__{task: %Task{pid: pid, ref: ref}} = state
) do
Logger.error(fn -> "replaced transaction finder task exited due to #{inspect(reason)}. Rescheduling." end)
{:noreply, schedule_find(state)}
end
defp schedule_find(%__MODULE__{interval: interval} = state) do
Process.send_after(self(), :find, interval)
%__MODULE__{state | task: nil}
end
defp task(%__MODULE__{query_timeout: query_timeout}) do
Logger.metadata(fetcher: :replaced_transaction)
try do
Chain.update_replaced_transactions(query_timeout)
rescue
error ->
Logger.error(fn -> ["Failed to make pending transactions dropped: ", inspect(error)] end)
end
end
end

@ -1,39 +0,0 @@
defmodule Indexer.ReplacedTransaction.Supervisor do
@moduledoc """
Supervises `Indexer.ReplacedTransaction.Fetcher` and its batch tasks through
`Indexer.ReplacedTransaction.TaskSupervisor`.
"""
use Supervisor
alias Indexer.ReplacedTransaction.Fetcher
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init(fetcher_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.ReplacedTransaction.TaskSupervisor},
{Fetcher, [fetcher_arguments, [name: Fetcher]]}
],
strategy: :one_for_one
)
end
end

@ -11,7 +11,6 @@ defmodule Indexer.Shrinkable.Supervisor do
CoinBalance, CoinBalance,
InternalTransaction, InternalTransaction,
PendingTransaction, PendingTransaction,
ReplacedTransaction,
Token, Token,
TokenBalance, TokenBalance,
TokenTransfer TokenTransfer
@ -62,7 +61,6 @@ defmodule Indexer.Shrinkable.Supervisor do
]}, ]},
{PendingTransaction.Supervisor, {PendingTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: PendingTransactionFetcher]]}, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: PendingTransactionFetcher]]},
{ReplacedTransaction.Supervisor, [[], [name: ReplacedTransactionFetcher]]},
{Code.Supervisor, {Code.Supervisor,
[ [
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor], [json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor],

@ -1,70 +0,0 @@
defmodule Indexer.ReplacedTransaction.FetcherTest do
use Explorer.DataCase
alias Explorer.Chain.Transaction
alias Indexer.ReplacedTransaction
@moduletag :capture_log
setup do
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
:ok
end
describe "start_link/1" do
test "starts finding replaced transactions" do
common_from_address_hash = %Explorer.Chain.Hash{
byte_count: 20,
bytes: <<0x4615CC10092B514258577DAFCA98C142577F1578::big-integer-size(20)-unit(8)>>
}
address = insert(:address, hash: common_from_address_hash)
common_nonce = 10
replaced_transaction_hash = %Explorer.Chain.Hash{
byte_count: 32,
bytes: <<0x9FC76417374AA880D4449A1F7F31EC597F00B1F6F3DD2D66F4C9C6C445836D8B::big-integer-size(32)-unit(8)>>
}
insert(:transaction,
hash: replaced_transaction_hash,
nonce: common_nonce,
from_address: address
)
mined_transaction_hash = %Explorer.Chain.Hash{
byte_count: 32,
bytes: <<0x8FC76417374AA880D4449A1F7F31EC597F00B1F6F3DD2D66F4C9C6C445836D8B::big-integer-size(32)-unit(8)>>
}
block = insert(:block)
insert(:transaction,
hash: mined_transaction_hash,
nonce: common_nonce,
from_address: address,
block_number: block.number,
block_hash: block.hash,
cumulative_gas_used: 10,
gas_used: 1,
index: 0,
status: :ok
)
ReplacedTransaction.Supervisor.Case.start_supervised!()
found_replaced_transaction =
wait_for_results(fn ->
Repo.one!(
from(transaction in Transaction,
where: transaction.status == ^:error and transaction.error == "dropped/replaced"
)
)
end)
assert found_replaced_transaction.hash == replaced_transaction_hash
end
end
end

@ -1,17 +0,0 @@
defmodule Indexer.ReplacedTransaction.Supervisor.Case do
alias Indexer.ReplacedTransaction
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
flush_interval: 50,
max_batch_size: 1,
max_concurrency: 1
)
[merged_fetcher_arguments]
|> ReplacedTransaction.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end
Loading…
Cancel
Save