|
|
|
@ -6,14 +6,12 @@ defmodule Indexer.ReplacedTransaction.Fetcher do |
|
|
|
|
|
|
|
|
|
This fetcher finds these transaction and sets them `failed` status with `dropped/replaced` error. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
use GenServer |
|
|
|
|
|
|
|
|
|
require Logger |
|
|
|
|
|
|
|
|
|
import Ecto.Query, only: [from: 2] |
|
|
|
|
|
|
|
|
|
alias Explorer.Chain.Transaction |
|
|
|
|
alias Explorer.Repo |
|
|
|
|
alias Explorer.Chain |
|
|
|
|
alias Indexer.ReplacedTransaction |
|
|
|
|
|
|
|
|
|
# 1 second |
|
|
|
@ -23,6 +21,7 @@ defmodule Indexer.ReplacedTransaction.Fetcher do |
|
|
|
|
@query_timeout 60_000 |
|
|
|
|
|
|
|
|
|
defstruct interval: @default_interval, |
|
|
|
|
query_timeout: @query_timeout, |
|
|
|
|
task: nil |
|
|
|
|
|
|
|
|
|
def child_spec([init_arguments]) do |
|
|
|
@ -53,7 +52,8 @@ defmodule Indexer.ReplacedTransaction.Fetcher do |
|
|
|
|
|
|
|
|
|
state = |
|
|
|
|
%__MODULE__{ |
|
|
|
|
interval: opts[:replaced_transaction_interval] || @default_interval |
|
|
|
|
interval: opts[:replaced_transaction_interval] || @default_interval, |
|
|
|
|
query_timeout: opts[:replaced_transaction_query_timeout] || @query_timeout, |
|
|
|
|
} |
|
|
|
|
|> schedule_find() |
|
|
|
|
|
|
|
|
@ -62,7 +62,7 @@ defmodule Indexer.ReplacedTransaction.Fetcher do |
|
|
|
|
|
|
|
|
|
@impl GenServer |
|
|
|
|
def handle_info(:find, %__MODULE__{} = state) do |
|
|
|
|
task = Task.Supervisor.async_nolink(ReplacedTransaction.TaskSupervisor, fn -> task() end) |
|
|
|
|
task = Task.Supervisor.async_nolink(ReplacedTransaction.TaskSupervisor, fn -> task(state) end) |
|
|
|
|
{:noreply, %__MODULE__{state | task: task}} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
@ -86,23 +86,11 @@ defmodule Indexer.ReplacedTransaction.Fetcher do |
|
|
|
|
%__MODULE__{state | task: nil} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp task do |
|
|
|
|
defp task(%__MODULE__{query_timeout: query_timeout}) do |
|
|
|
|
Logger.metadata(fetcher: :replaced_transaction) |
|
|
|
|
|
|
|
|
|
query = |
|
|
|
|
from(transaction in Transaction, |
|
|
|
|
where: is_nil(transaction.block_number), |
|
|
|
|
join: mined_transaction in Transaction, |
|
|
|
|
where: |
|
|
|
|
transaction.from_address_hash == mined_transaction.from_address_hash and |
|
|
|
|
transaction.nonce == mined_transaction.nonce and not is_nil(mined_transaction.block_number), |
|
|
|
|
update: [ |
|
|
|
|
set: [status: ^:error, error: "dropped/replaced"] |
|
|
|
|
] |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
try do |
|
|
|
|
Repo.update_all(query, [], timeout: @query_timeout) |
|
|
|
|
Chain.update_replaced_transactions(query_timeout) |
|
|
|
|
rescue |
|
|
|
|
error -> |
|
|
|
|
Logger.error(fn -> ["Failed to make pending transactions dropped: ", inspect(error)] end) |
|
|
|
|