update replaced transaction on import

pull/1370/head
Ayrat Badykov 6 years ago
parent fb597cd6e3
commit 915d053dd5
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 35
      apps/explorer/lib/explorer/chain/import/runner/transactions.ex
  2. 26
      apps/explorer/priv/repo/migrations/20190124082812_add_index_on_transaction_nonce_and_from_address_hash.exs

@ -42,9 +42,17 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
|> Map.put(:timestamps, timestamps)
|> Map.put(:token_transfer_transaction_hash_set, token_transfer_transaction_hash_set(options))
Multi.run(multi, :transactions, fn repo, _ ->
transactions_timeout = options[option_key()][:timeout] || timeout()
update_transactions_options = %{timeout: transactions_timeout}
multi
|> Multi.run(:transactions, fn repo, _ ->
insert(repo, changes_list, insert_options)
end)
|> Multi.run(:replaced_transactions, fn repo, %{transactions: transactions} ->
update_replaced_transactions(repo, transactions, update_transactions_options)
end)
end
@impl Import.Runner
@ -178,4 +186,29 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
end
defp put_internal_transactions_indexed_at?(_, _), do: false
defp update_replaced_transactions(repo, transactions, %{timeout: timeout}) do
transactions
|> Enum.filter(& &1.transaction.block_hash)
|> Enum.map(fn transaction -> {transaction.nonce, transaction.from_address_hash} end)
|> Enum.uniq()
|> Enum.map(fn {nonce, from_address_hash} ->
from(t in Transaction,
where: t.nonce == ^nonce and t.from_address_hash == ^from_address_hash and is_nil(t.block_hash),
update: [
set: [status: ^:error, error: "dropped/replaced"]
]
)
end)
|> Enum.map(fn query ->
try do
{_, result} = repo.update(query, [], timeout: timeout)
{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, query: query}}
end
end)
end
end

@ -0,0 +1,26 @@
defmodule Explorer.Repo.Migrations.AddIndexOnTransactionNonceAndFromAddressHash do
use Ecto.Migration
alias Ecto.Adapters.SQL
alias Explorer.Repo
# 30 minutes
@timeout 60_000 * 30
def change do
create(index(:transactions, [:nonce, :from_address_hash]))
# for replaced/dropeed transactions
create(index(:transactions, [:block_hash, :error]))
query = "UPDATE transactions SET error = 'dropped/replaced', status = 0 FROM transactions t1
INNER JOIN transactions t2
ON t1.from_address_hash = t2.from_address_hash AND t1.nonce = t2.nonce
WHERE t1.block_hash IS NULL AND t2.block_hash IS NOT NULL"
SQL.query!(
Repo,
query,
timeout: @timeout
)
end
end
Loading…
Cancel
Save