move replaced transactions update to fetcher

pull/1370/head
Ayrat Badykov 6 years ago
parent 98e0529d3f
commit cf78b8cb80
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 67
      apps/explorer/lib/explorer/chain.ex
  2. 46
      apps/explorer/lib/explorer/chain/import/runner/transactions.ex
  3. 86
      apps/explorer/test/explorer/chain/import_test.exs
  4. 106
      apps/indexer/lib/indexer/replaced_transaction/fetcher.ex

@ -1207,6 +1207,38 @@ defmodule Explorer.Chain do
Repo.stream_reduce(query, initial, reducer) Repo.stream_reduce(query, initial, reducer)
end end
@spec stream_mined_transactions(
fields :: [
:block_hash
| :internal_transactions_indexed_at
| :created_contract_code_indexed_at
| :from_address_hash
| :gas
| :gas_price
| :hash
| :index
| :input
| :nonce
| :r
| :s
| :to_address_hash
| :v
| :value
],
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
) :: {:ok, accumulator}
when accumulator: term()
def stream_mined_transactions(fields, initial, reducer) when is_function(reducer, 2) do
query =
from(t in Transaction,
where: not is_nil(t.block_hash) and not is_nil(t.nonce) and not is_nil(t.from_address_hash),
select: ^fields
)
Repo.stream_reduce(query, initial, reducer)
end
@doc """ @doc """
Returns a stream of all `t:Explorer.Chain.Block.t/0` `hash`es that are marked as unfetched in Returns a stream of all `t:Explorer.Chain.Block.t/0` `hash`es that are marked as unfetched in
`t:Explorer.Chain.Block.SecondDegreeRelation.t/0`. `t:Explorer.Chain.Block.SecondDegreeRelation.t/0`.
@ -2131,6 +2163,41 @@ defmodule Explorer.Chain do
|> Repo.all() |> Repo.all()
end end
@spec update_replaced_transactions([
%{
required(:nonce) => non_neg_integer,
required(:from_address_hash) => Hash.Address.t(),
required(:block_hash) => Hash.Full.t()
}
]) :: {integer(), nil | [term()]}
def update_replaced_transactions(transactions, timeout \\ :infinity) do
filters =
transactions
|> Enum.filter(fn transaction ->
transaction.block_hash && transaction.nonce && transaction.from_address_hash
end)
|> Enum.map(fn transaction ->
{transaction.nonce, transaction.from_address_hash}
end)
|> Enum.uniq()
if Enum.empty?(filters) do
{:ok, []}
else
query =
filters
|> Enum.reduce(from(t in Transaction, where: is_nil(t.block_hash)), fn {nonce, from_address}, query ->
from(t in query,
or_where: t.nonce == ^nonce and t.from_address_hash == ^from_address and is_nil(t.block_hash)
)
end)
update_query = from(t in query, update: [set: [status: ^:error, error: "dropped/replaced"]])
Repo.update_all(update_query, [], timeout: timeout)
end
end
@doc """ @doc """
Update a new `t:Token.t/0` record. Update a new `t:Token.t/0` record.

@ -42,17 +42,9 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
|> Map.put(:timestamps, timestamps) |> Map.put(:timestamps, timestamps)
|> Map.put(:token_transfer_transaction_hash_set, token_transfer_transaction_hash_set(options)) |> Map.put(:token_transfer_transaction_hash_set, token_transfer_transaction_hash_set(options))
transactions_timeout = options[option_key()][:timeout] || timeout() Multi.run(multi, :transactions, fn repo, _ ->
update_transactions_options = %{timeout: transactions_timeout}
multi
|> Multi.run(:transactions, fn repo, _ ->
insert(repo, changes_list, insert_options) insert(repo, changes_list, insert_options)
end) end)
|> Multi.run(:replaced_transactions, fn repo, _ ->
update_replaced_transactions(repo, changes_list, update_transactions_options)
end)
end end
@impl Import.Runner @impl Import.Runner
@ -186,40 +178,4 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
end end
defp put_internal_transactions_indexed_at?(_, _), do: false defp put_internal_transactions_indexed_at?(_, _), do: false
defp update_replaced_transactions(repo, transactions, %{timeout: timeout}) do
filters =
transactions
|> Enum.filter(fn transaction ->
Map.get(transaction, :block_hash) && Map.get(transaction, :block_number) && Map.get(transaction, :nonce) &&
Map.get(transaction, :from_address_hash)
end)
|> Enum.map(fn transaction ->
{transaction.nonce, transaction.from_address_hash}
end)
|> Enum.uniq()
if Enum.empty?(filters) do
{:ok, []}
else
query =
filters
|> Enum.reduce(from(t in Transaction, where: is_nil(t.block_hash)), fn {nonce, from_address}, query ->
from(t in query,
or_where: t.nonce == ^nonce and t.from_address_hash == ^from_address and is_nil(t.block_hash)
)
end)
update_query = from(t in query, update: [set: [status: ^:error, error: "dropped/replaced"]])
try do
{_, result} = repo.update_all(update_query, [], timeout: timeout)
{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, query: query}}
end
end
end
end end

@ -1035,92 +1035,6 @@ defmodule Explorer.Chain.ImportTest do
Repo.get(Transaction, "0xab349efbe1ddc6d85d84a993aa52bdaadce66e8ee166dd10013ce3f2a94ca724") Repo.get(Transaction, "0xab349efbe1ddc6d85d84a993aa52bdaadce66e8ee166dd10013ce3f2a94ca724")
end end
test "updates replaced transaction's status and error message" do
replaced_transaction_hash = "0x2a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61"
address = insert(:address, hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6")
insert(:transaction,
from_address: address,
nonce: 1,
block_hash: nil,
index: nil,
block_number: nil,
hash: replaced_transaction_hash
)
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: "0x1c0fa194a9d3b44313dcd849f3c6be6ad270a0a4"},
%{hash: "0x679ed2245eba484021c2d3f4d174fb2bb2bd0e49"},
%{hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6"},
%{hash: "0xfa52274dd61e1643d2205169732f29114bc240b3"}
]
},
address_coin_balances: %{
params: [
%{
address_hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6",
block_number: 6_535_159
}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 242_354_495_292_210,
gas_limit: 4_703_218,
gas_used: 1_009_480,
hash: "0x1f8cde8bd326702c49e065d56b08bdc82caa0c4820d914e27026c9c68ca1cf09",
miner_hash: "0x1c0fa194a9d3b44313dcd849f3c6be6ad270a0a4",
nonce: "0xafa5fc5c07f55ba5",
number: 6_535_159,
parent_hash: "0xd2cf6cf7a3d5455f450a2a3701995a7ad51f12010674883a6690cee337f75ffa",
size: 4052,
timestamp: DateTime.from_iso8601("2018-09-10 21:34:39Z") |> elem(1),
total_difficulty: 415_641_295_487_918_824_165
}
]
},
broadcast: false,
transactions: %{
params: [
%{
block_hash: "0x1f8cde8bd326702c49e065d56b08bdc82caa0c4820d914e27026c9c68ca1cf09",
block_number: 6_535_159,
cumulative_gas_used: 978_227,
from_address_hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6",
gas: 978_227,
gas_price: 99_000_000_000,
gas_used: 978_227,
hash: "0x1a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61",
index: 0,
input: "0x",
nonce: 1,
r:
33_589_694_337_999_451_585_110_289_972_555_130_664_768_096_048_542_148_916_928_040_955_524_640_045_158,
s:
42_310_749_137_599_445_408_044_732_541_966_181_996_695_356_587_068_481_874_121_265_172_051_825_560_665,
status: nil,
to_address_hash: nil,
transaction_hash: "0x1a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61",
transaction_index: 0,
v: 158,
value: 0
}
]
}
})
assert %Transaction{status: nil, error: nil} =
Repo.get(Transaction, "0x1a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61")
assert %Transaction{status: :error, error: "dropped/replaced"} = Repo.get(Transaction, replaced_transaction_hash)
end
test "uncles record their transaction indexes in transactions_forks" do test "uncles record their transaction indexes in transactions_forks" do
miner_hash = address_hash() miner_hash = address_hash()
from_address_hash = address_hash() from_address_hash = address_hash()

@ -0,0 +1,106 @@
defmodule Indexer.ReplacedTransaction.Fetcher do
use Spandex.Decorators
require Logger
alias Explorer.Chain
alias Explorer.Chain.Hash
alias Indexer.{BufferedTask, Tracer}
@behaviour BufferedTask
@max_batch_size 10
@max_concurrency 4
@defaults [
flush_interval: :timer.seconds(3),
max_concurrency: @max_concurrency,
max_batch_size: @max_batch_size,
task_supervisor: Indexer.ReplacedTransaction.TaskSupervisor,
metadata: [fetcher: :replaced_transaction]
]
@spec async_fetch([
%{
required(:nonce) => non_neg_integer,
required(:from_address_hash) => Hash.Address.t(),
required(:block_hash) => Hash.Full.t()
}
]) :: :ok
def async_fetch(transactions_fields, timeout \\ 5000) when is_list(transactions_fields) do
entries = Enum.map(transactions_fields, &entry/1)
BufferedTask.buffer(__MODULE__, entries, timeout)
end
@doc false
def child_spec([init_options, gen_server_options]) do
merged_init_opts =
@defaults
|> Keyword.merge(init_options)
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__)
end
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_mined_transactions(
[:block_hash, :nonce, :from_address_hash],
initial,
fn transaction_fields, acc ->
transaction_fields
|> entry()
|> reducer.(acc)
end
)
final
end
defp entry(%{
block_hash: %Hash{bytes: block_hash_bytes},
nonce: nonce,
from_address_hash: %Hash{bytes: from_address_hash_bytes}
})
when is_integer(nonce) do
{block_hash_bytes, nonce, from_address_hash_bytes}
end
defp params({block_hash_bytes, nonce, from_address_hash_bytes})
when is_integer(nonce) do
{:ok, from_address_hash} = Hash.Address.cast(from_address_hash_bytes)
{:ok, block_hash} = Hash.Full.cast(block_hash_bytes)
%{nonce: nonce, from_address_hash: from_address_hash, block_hash: block_hash}
end
@impl BufferedTask
@decorate trace(
name: "fetch",
resource: "Indexer.ReplacedTransaction.Fetcher.run/2",
service: :indexer,
tracer: Tracer
)
def run(entries, _) do
Logger.debug("fetching replaced transactions for transactions")
try do
{:ok, _} =
entries
|> Enum.map(&params/1)
|> Chain.update_replaced_transactions()
:ok
rescue
reason ->
Logger.error(fn ->
[
"failed to update replaced transactions for transactions: ",
inspect(reason)
]
end)
{:retry, entries}
end
end
end
Loading…
Cancel
Save