add ReplacedTRansaction fetcher to supervision tree

pull/1370/head
Ayrat Badykov 6 years ago
parent 7704132587
commit a4f2aee213
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 2
      apps/explorer/lib/explorer/chain.ex
  2. 70
      apps/explorer/test/explorer/chain_test.exs
  3. 9
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  4. 18
      apps/indexer/lib/indexer/block/fetcher.ex
  5. 10
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  6. 5
      apps/indexer/lib/indexer/replaced_transaction/fetcher.ex
  7. 39
      apps/indexer/lib/indexer/replaced_transaction/supervisor.ex
  8. 6
      apps/indexer/lib/indexer/shrinkable/supervisor.ex
  9. 3
      apps/indexer/test/indexer/block/fetcher_test.exs
  10. 4
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs
  11. 17
      apps/indexer/test/support/indexer/replaced_transaction/supervisor/case.ex

@ -2186,7 +2186,7 @@ defmodule Explorer.Chain do
else
query =
filters
|> Enum.reduce(from(t in Transaction, where: is_nil(t.block_hash)), fn {nonce, from_address}, query ->
|> Enum.reduce(Transaction, fn {nonce, from_address}, query ->
from(t in query,
or_where: t.nonce == ^nonce and t.from_address_hash == ^from_address and is_nil(t.block_hash)
)

@ -2950,6 +2950,76 @@ defmodule Explorer.ChainTest do
end
end
describe "update_replaced_transactions/2" do
test "update replaced transactions" do
replaced_transaction_hash = "0x2a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61"
address = insert(:address, hash: "0xb7cffe2ac19b9d5705a24cbe14fef5663af905a6")
insert(:transaction,
from_address: address,
nonce: 1,
block_hash: nil,
index: nil,
block_number: nil,
hash: replaced_transaction_hash
)
mined_transaction_hash = "0x1a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61"
block = insert(:block)
mined_transaction =
insert(:transaction,
from_address: address,
nonce: 1,
index: 0,
block_hash: block.hash,
block_number: block.number,
cumulative_gas_used: 1,
gas_used: 1,
hash: mined_transaction_hash
)
second_mined_transaction_hash = "0x3a263224a95275d77bc30a7e131bc64d948777946a790c0915ab293791fbcb61"
block = insert(:block)
insert(:transaction,
from_address: address,
nonce: 1,
index: 0,
block_hash: block.hash,
block_number: block.number,
cumulative_gas_used: 1,
gas_used: 1,
hash: second_mined_transaction_hash
)
{1, _} =
Chain.update_replaced_transactions([
%{
block_hash: mined_transaction.block_hash,
nonce: mined_transaction.nonce,
from_address_hash: mined_transaction.from_address_hash
}
])
replaced_transaction = Repo.get(Transaction, replaced_transaction_hash)
assert replaced_transaction.status == :error
assert replaced_transaction.error == "dropped/replaced"
found_mined_transaction = Repo.get(Transaction, mined_transaction_hash)
assert found_mined_transaction.status == nil
assert found_mined_transaction.error == nil
second_found_replaced_transaction = Repo.get(Transaction, second_mined_transaction_hash)
assert second_found_replaced_transaction.status == nil
assert second_found_replaced_transaction.error == nil
end
end
describe "stream_unfetched_token_balances/2" do
test "executes the given reducer with the query result" do
address = insert(:address, hash: "0xc45e4830dff873cf8b70de2b194d0ddd06ef651e")

@ -8,7 +8,13 @@ defmodule Indexer.Block.Catchup.Fetcher do
require Logger
import Indexer.Block.Fetcher,
only: [async_import_coin_balances: 2, async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2]
only: [
async_import_coin_balances: 2,
async_import_tokens: 1,
async_import_uncles: 1,
fetch_and_import_range: 2,
async_import_replaced_transactions: 1
]
alias Ecto.Changeset
alias Explorer.Chain
@ -135,6 +141,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
async_import_tokens(imported)
async_import_token_balances(imported)
async_import_uncles(imported)
async_import_replaced_transactions(imported)
end
defp async_import_created_contract_codes(%{transactions: transactions}) do

@ -8,8 +8,8 @@ defmodule Indexer.Block.Fetcher do
require Logger
alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries}
alias Explorer.Chain.{Address, Block, Import}
alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, Token, TokenTransfers, Tracer}
alias Explorer.Chain.{Address, Block, Hash, Import, Transaction}
alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, ReplacedTransaction, Token, TokenTransfers, Tracer}
alias Indexer.Address.{CoinBalances, TokenBalances}
alias Indexer.Block.Fetcher.Receipts
alias Indexer.Block.Transform
@ -200,6 +200,20 @@ defmodule Indexer.Block.Fetcher do
def async_import_uncles(_), do: :ok
def async_import_replaced_transactions(%{transactions: transactions}) do
transactions
|> Enum.flat_map(fn
%Transaction{block_hash: %Hash{} = block_hash, nonce: nonce, from_address_hash: %Hash{} = from_address_hash} ->
[%{block_hash: block_hash, nonce: nonce, from_address_hash: from_address_hash}]
%Transaction{block_hash: nil} ->
[]
end)
|> ReplacedTransaction.Fetcher.async_fetch(10_000)
end
def async_import_replaced_transactions(_), do: :ok
defp fetch_beneficiaries(range, json_rpc_named_arguments) do
result =
with :ignore <- EthereumJSONRPC.fetch_beneficiaries(range, json_rpc_named_arguments) do

@ -10,7 +10,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
require Logger
import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1]
import Indexer.Block.Fetcher, only: [async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2]
import Indexer.Block.Fetcher,
only: [
async_import_tokens: 1,
async_import_uncles: 1,
fetch_and_import_range: 2,
async_import_replaced_transactions: 1
]
alias ABI.TypeDecoder
alias Ecto.Changeset
@ -340,6 +347,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
defp async_import_remaining_block_data(imported) do
async_import_tokens(imported)
async_import_uncles(imported)
async_import_replaced_transactions(imported)
end
defp internal_transactions(

@ -1,4 +1,8 @@
defmodule Indexer.ReplacedTransaction.Fetcher do
@moduledoc """
Finds and updates replaced transactions.
"""
use Spandex.Decorators
require Logger
@ -37,6 +41,7 @@ defmodule Indexer.ReplacedTransaction.Fetcher do
merged_init_opts =
@defaults
|> Keyword.merge(init_options)
|> Keyword.put(:state, {})
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__)
end

@ -0,0 +1,39 @@
defmodule Indexer.ReplacedTransaction.Supervisor do
@moduledoc """
Supervises `Indexer.ReplacedTransaction.Fetcher` and its batch tasks through
`Indexer.ReplacedTransaction.TaskSupervisor`.
"""
use Supervisor
alias Indexer.ReplacedTransaction.Fetcher
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init(fetcher_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.ReplacedTransaction.TaskSupervisor},
{Fetcher, [fetcher_arguments, [name: Fetcher]]}
],
strategy: :one_for_one
)
end
end

@ -11,6 +11,7 @@ defmodule Indexer.Shrinkable.Supervisor do
CoinBalance,
InternalTransaction,
PendingTransaction,
ReplacedTransaction,
Token,
TokenBalance,
TokenTransfer
@ -66,6 +67,11 @@ defmodule Indexer.Shrinkable.Supervisor do
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor],
[name: Code.Supervisor]
]},
{ReplacedTransaction.Supervisor,
[
[memory_monitor: memory_monitor],
[name: ReplacedTransaction.Supervisor]
]},
{InternalTransaction.Supervisor,
[
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor],

@ -9,7 +9,7 @@ defmodule Indexer.Block.FetcherTest do
alias Explorer.Chain
alias Explorer.Chain.{Address, Log, Transaction, Wei}
alias Indexer.{CoinBalance, BufferedTask, Code, InternalTransaction, Token, TokenBalance}
alias Indexer.{CoinBalance, BufferedTask, Code, InternalTransaction, ReplacedTransaction, Token, TokenBalance}
alias Indexer.Block.{Fetcher, Uncle}
@moduletag capture_log: true
@ -45,6 +45,7 @@ defmodule Indexer.Block.FetcherTest do
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
ReplacedTransaction.Supervisor.Case.start_supervised!()
Uncle.Supervisor.Case.start_supervised!(
block_fetcher: %Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}

@ -6,7 +6,7 @@ defmodule Indexer.Block.Realtime.FetcherTest do
alias Explorer.Chain
alias Explorer.Chain.{Address, Transaction}
alias Indexer.{Sequence, Token, TokenBalance}
alias Indexer.{Sequence, Token, TokenBalance, ReplacedTransaction}
alias Indexer.Block.{Realtime, Uncle}
@moduletag capture_log: true
@ -54,6 +54,8 @@ defmodule Indexer.Block.Realtime.FetcherTest do
block_fetcher: %Indexer.Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
)
ReplacedTransaction.Supervisor.Case.start_supervised!()
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [

@ -0,0 +1,17 @@
defmodule Indexer.ReplacedTransaction.Supervisor.Case do
alias Indexer.ReplacedTransaction
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
flush_interval: 50,
max_batch_size: 1,
max_concurrency: 1
)
[merged_fetcher_arguments]
|> ReplacedTransaction.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end
Loading…
Cancel
Save