|
|
@ -53,12 +53,12 @@ defmodule Indexer.ReplacedTransaction.Fetcher do |
|
|
|
@impl BufferedTask |
|
|
|
@impl BufferedTask |
|
|
|
def init(initial, reducer, _) do |
|
|
|
def init(initial, reducer, _) do |
|
|
|
{:ok, final} = |
|
|
|
{:ok, final} = |
|
|
|
Chain.stream_mined_transactions( |
|
|
|
[:block_hash, :nonce, :from_address_hash, :hash] |
|
|
|
[:block_hash, :nonce, :from_address_hash], |
|
|
|
|> Chain.stream_pending_transactions( |
|
|
|
initial, |
|
|
|
initial, |
|
|
|
fn transaction_fields, acc -> |
|
|
|
fn transaction_fields, acc -> |
|
|
|
transaction_fields |
|
|
|
transaction_fields |
|
|
|
|> entry() |
|
|
|
|> pending_entry() |
|
|
|
|> reducer.(acc) |
|
|
|
|> reducer.(acc) |
|
|
|
end |
|
|
|
end |
|
|
|
) |
|
|
|
) |
|
|
@ -75,6 +75,10 @@ defmodule Indexer.ReplacedTransaction.Fetcher do |
|
|
|
{block_hash_bytes, nonce, from_address_hash_bytes} |
|
|
|
{block_hash_bytes, nonce, from_address_hash_bytes} |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
defp pending_entry(%{hash: %Hash{bytes: hash}, nonce: nonce, from_address_hash: %Hash{bytes: from_address_hash_bytes}}) do |
|
|
|
|
|
|
|
{:pending, nonce, from_address_hash_bytes, hash} |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp params({block_hash_bytes, nonce, from_address_hash_bytes}) |
|
|
|
defp params({block_hash_bytes, nonce, from_address_hash_bytes}) |
|
|
|
when is_integer(nonce) do |
|
|
|
when is_integer(nonce) do |
|
|
|
{:ok, from_address_hash} = Hash.Address.cast(from_address_hash_bytes) |
|
|
|
{:ok, from_address_hash} = Hash.Address.cast(from_address_hash_bytes) |
|
|
@ -83,6 +87,10 @@ defmodule Indexer.ReplacedTransaction.Fetcher do |
|
|
|
%{nonce: nonce, from_address_hash: from_address_hash, block_hash: block_hash} |
|
|
|
%{nonce: nonce, from_address_hash: from_address_hash, block_hash: block_hash} |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
defp pending_params({:pending, nonce, from_address_hash, hash}) do |
|
|
|
|
|
|
|
%{nonce: nonce, from_address_hash: from_address_hash, hash: hash} |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
@impl BufferedTask |
|
|
|
@impl BufferedTask |
|
|
|
@decorate trace( |
|
|
|
@decorate trace( |
|
|
|
name: "fetch", |
|
|
|
name: "fetch", |
|
|
@ -94,7 +102,17 @@ defmodule Indexer.ReplacedTransaction.Fetcher do |
|
|
|
Logger.debug("fetching replaced transactions for transactions") |
|
|
|
Logger.debug("fetching replaced transactions for transactions") |
|
|
|
|
|
|
|
|
|
|
|
try do |
|
|
|
try do |
|
|
|
|
|
|
|
{pending, realtime} = |
|
|
|
entries |
|
|
|
entries |
|
|
|
|
|
|
|
|> Enum.split_with(fn entry -> |
|
|
|
|
|
|
|
match?({:pending, _, _, _}, entry) |
|
|
|
|
|
|
|
end) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pending |
|
|
|
|
|
|
|
|> Enum.map(&pending_params/1) |
|
|
|
|
|
|
|
|> Chain.find_and_update_replaced_transactions() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
realtime |
|
|
|
|> Enum.map(¶ms/1) |
|
|
|
|> Enum.map(¶ms/1) |
|
|
|
|> Chain.update_replaced_transactions() |
|
|
|
|> Chain.update_replaced_transactions() |
|
|
|
|
|
|
|
|
|
|
|