diff --git a/apps/indexer/lib/indexer/block/catchup/fetcher.ex b/apps/indexer/lib/indexer/block/catchup/fetcher.ex index 47315bb189..b387468f63 100644 --- a/apps/indexer/lib/indexer/block/catchup/fetcher.ex +++ b/apps/indexer/lib/indexer/block/catchup/fetcher.ex @@ -120,6 +120,7 @@ defmodule Indexer.Block.Catchup.Fetcher do defp async_import_remaining_block_data( %{ + block_second_degree_relations: block_second_degree_relations, transactions: transaction_hashes, addresses: address_hashes, tokens: tokens, @@ -149,6 +150,10 @@ defmodule Indexer.Block.Catchup.Fetcher do |> Token.Fetcher.async_fetch() TokenBalance.Fetcher.async_fetch(token_balances) + + block_second_degree_relations + |> Enum.map(& &1.uncle_hash) + |> Block.Uncle.Fetcher.async_fetch_blocks() end defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence) diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 7d942d7f4d..312b76cc47 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -92,7 +92,11 @@ defmodule Indexer.Block.Fetcher do when broadcast in ~w(true false)a and callback_module != nil do with {:blocks, {:ok, next, result}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, - %{blocks: blocks, transactions: transactions_without_receipts, block_second_degree_relations: block_second_degree_relations} = result, + %{ + blocks: blocks, + transactions: transactions_without_receipts, + block_second_degree_relations: block_second_degree_relations + } = result, {:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)}, %{logs: logs, receipts: receipts} = receipt_params, transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts), @@ -112,10 +116,9 @@ defmodule Indexer.Block.Fetcher do }), token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}), {:ok, inserted} <- - import_range( + __MODULE__.import( state, %{ - range: range, addresses: %{params: addresses}, balances: %{params: coin_balances_params_set}, token_balances: %{params: token_balances}, @@ -136,11 +139,11 @@ defmodule Indexer.Block.Fetcher do end end - defp import_range( - %__MODULE__{broadcast: broadcast, callback_module: callback_module} = state, - options - ) - when is_map(options) do + def import( + %__MODULE__{broadcast: broadcast, callback_module: callback_module} = state, + options + ) + when is_map(options) do {address_hash_to_fetched_balance_block_number, import_options} = pop_address_hash_to_fetched_balance_block_number(options) diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index c2d0633a1e..aac96a77e8 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -173,10 +173,14 @@ defmodule Indexer.Block.Realtime.Fetcher do end end - defp async_import_remaining_block_data(%{tokens: tokens}) do + defp async_import_remaining_block_data(%{block_second_degree_relations: block_second_degree_relations, tokens: tokens}) do tokens |> Enum.map(& &1.contract_address_hash) |> Token.Fetcher.async_fetch() + + block_second_degree_relations + |> Enum.map(& &1.uncle_hash) + |> Block.Uncle.Fetcher.async_fetch_blocks() end defp internal_transactions( diff --git a/apps/indexer/lib/indexer/block/supervisor.ex b/apps/indexer/lib/indexer/block/supervisor.ex index fcb8245ecd..a9c2ab8728 100644 --- a/apps/indexer/lib/indexer/block/supervisor.ex +++ b/apps/indexer/lib/indexer/block/supervisor.ex @@ -4,7 +4,7 @@ defmodule Indexer.Block.Supervisor do """ alias Indexer.Block - alias Indexer.Block.{Catchup, Realtime} + alias Indexer.Block.{Catchup, Realtime, Uncle} use Supervisor @@ -27,7 +27,8 @@ defmodule Indexer.Block.Supervisor do [ %{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments}, [name: Realtime.Supervisor] - ]} + ]}, + {Uncle.Supervisor, [[block_fetcher: block_fetcher], [name: Uncle.Supervisor]]} ], strategy: :one_for_one ) diff --git a/apps/indexer/lib/indexer/block/uncle/fetcher.ex b/apps/indexer/lib/indexer/block/uncle/fetcher.ex new file mode 100644 index 0000000000..032679b12c --- /dev/null +++ b/apps/indexer/lib/indexer/block/uncle/fetcher.ex @@ -0,0 +1,154 @@ +defmodule Indexer.Block.Uncle.Fetcher do + @moduledoc """ + Fetches `t:Explorer.Chain.Block.t/0` by `hash` and updates `t:Explorer.Chain.Block.SecondDegreeRelation.t/0` + `uncle_fetched_at` where the `uncle_hash` matches `hash`. + """ + + require Logger + + alias Explorer.Chain + alias Explorer.Chain.Hash + alias Indexer.{AddressExtraction, Block, BufferedTask} + + @behaviour Block.Fetcher + @behaviour BufferedTask + + @defaults [ + flush_interval: :timer.seconds(3), + max_batch_size: 10, + max_concurrency: 10, + init_chunk_size: 1000, + task_supervisor: Indexer.Block.Uncle.TaskSupervisor + ] + + @doc """ + Asynchronously fetches `t:Explorer.Chain.Block.t/0` for the given `hashes` and updates + `t:Explorer.Chain.Block.SecondDegreeRelation.t/0` `block_fetched_at`. + """ + @spec async_fetch_blocks([Hash.Full.t()]) :: :ok + def async_fetch_blocks(block_hashes) when is_list(block_hashes) do + BufferedTask.buffer( + __MODULE__, + block_hashes + |> Enum.map(&to_string/1) + |> Enum.uniq() + ) + end + + @doc false + def child_spec([init_options, gen_server_options]) when is_list(init_options) do + {state, mergeable_init_options} = Keyword.pop(init_options, :block_fetcher) + + unless state do + raise ArgumentError, + ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> + "to allow for json_rpc calls when running." + end + + merged_init_options = + @defaults + |> Keyword.merge(mergeable_init_options) + |> Keyword.put(:state, %Block.Fetcher{state | broadcast: false, callback_module: __MODULE__}) + + Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__) + end + + @impl BufferedTask + def init(initial, reducer, _) do + {:ok, final} = + Chain.stream_unfetched_uncle_hashes(initial, fn uncle_hash, acc -> + uncle_hash + |> to_string() + |> reducer.(acc) + end) + + final + end + + @impl BufferedTask + def run(hashes, _retries, %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher) do + # the same block could be included as an uncle on multiple blocks, but we only want to fetch it once + unique_hashes = Enum.uniq(hashes) + + Logger.debug(fn -> "fetching #{length(unique_hashes)} uncle blocks" end) + + case EthereumJSONRPC.fetch_blocks_by_hash(unique_hashes, json_rpc_named_arguments) do + {:ok, + %{ + blocks: blocks_params, + transactions: transactions_params, + block_second_degree_relations: block_second_degree_relations_params + }} -> + addresses_params = + AddressExtraction.extract_addresses(%{blocks: blocks_params, transactions: transactions_params}) + + {:ok, _} = + Block.Fetcher.import(block_fetcher, %{ + addresses: %{params: addresses_params}, + blocks: %{params: blocks_params}, + block_second_degree_relations: %{params: block_second_degree_relations_params}, + transactions: %{params: transactions_params, on_conflict: :nothing} + }) + + :ok + + {:error, reason} -> + Logger.debug(fn -> "failed to fetch #{length(unique_hashes)} uncle blocks, #{inspect(reason)}" end) + {:retry, unique_hashes} + end + end + + @impl Block.Fetcher + def import(_, options) when is_map(options) do + with {:ok, %{block_second_degree_relations: block_second_degree_relations}} = ok <- + options + |> uncle_blocks() + |> fork_transactions() + |> Chain.import() do + # * CoinBalance.Fetcher.async_fetch_balances is not called because uncles don't affect balances + # * InternalTransaction.Fetcher.async_fetch is not called because internal transactions are based on transaction + # hash, which is shared with transaction on consensus blocks. + # * Token.Fetcher.async_fetch is not called because the tokens only matter on consensus blocks + # * TokenBalance.Fetcher.async_fetch is not called because it uses block numbers from consensus, not uncles + + block_second_degree_relations + |> Enum.map(& &1.uncle_hash) + |> Block.Uncle.Fetcher.async_fetch_blocks() + + ok + end + end + + defp uncle_blocks(chain_import_options) do + put_in(chain_import_options, [:blocks, :params, Access.all(), :consensus], false) + end + + defp fork_transactions(chain_import_options) do + transactions_params = chain_import_options[:transactions][:params] || [] + + chain_import_options + |> put_in([:transactions, :params], forked_transactions_params(transactions_params)) + |> put_in([Access.key(:transaction_forks, %{}), :params], transaction_forks_params(transactions_params)) + end + + defp forked_transactions_params(transactions_params) do + # With no block_hash, there will be a collision for the same hash when a transaction is used in more than 1 uncle, + # so use MapSet to prevent duplicate row errors. + MapSet.new(transactions_params, fn transaction_params -> + Map.merge(transaction_params, %{ + block_hash: nil, + block_number: nil, + index: nil, + gas_used: nil, + cumulative_gas_used: nil, + status: nil + }) + end) + end + + defp transaction_forks_params(transactions_params) do + Enum.map(transactions_params, fn %{block_hash: uncle_hash, index: index, hash: hash} -> + %{uncle_hash: uncle_hash, index: index, hash: hash} + end) + end +end diff --git a/apps/indexer/lib/indexer/block/uncle/supervisor.ex b/apps/indexer/lib/indexer/block/uncle/supervisor.ex new file mode 100644 index 0000000000..87daf8bdc6 --- /dev/null +++ b/apps/indexer/lib/indexer/block/uncle/supervisor.ex @@ -0,0 +1,38 @@ +defmodule Indexer.Block.Uncle.Supervisor do + @moduledoc """ + Supervises `Indexer.Block.Uncle.Fetcher`. + """ + + use Supervisor + + alias Indexer.Block.Uncle.Fetcher + + def child_spec([init_arguments]) do + child_spec([init_arguments, []]) + end + + def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do + default = %{ + id: __MODULE__, + start: {__MODULE__, :start_link, start_link_arguments}, + type: :supervisor + } + + Supervisor.child_spec(default, []) + end + + def start_link(arguments, gen_server_options \\ []) do + Supervisor.start_link(__MODULE__, arguments, gen_server_options) + end + + @impl Supervisor + def init(fetcher_arguments) do + Supervisor.init( + [ + {Task.Supervisor, name: Indexer.Block.Uncle.TaskSupervisor}, + {Fetcher, [fetcher_arguments, [name: Fetcher]]} + ], + strategy: :rest_for_one + ) + end +end diff --git a/apps/indexer/test/indexer/block/catchup/fetcher_test.exs b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs new file mode 100644 index 0000000000..bdcec64fa8 --- /dev/null +++ b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs @@ -0,0 +1,115 @@ +defmodule Indexer.Block.Catchup.FetcherTest do + use EthereumJSONRPC.Case, async: false + use Explorer.DataCase + + import Mox + + alias Indexer.{Block, CoinBalance, InternalTransaction, Token, TokenBalance} + alias Indexer.Block.Catchup.Fetcher + + @moduletag capture_log: true + + # MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to + # use expectations and stubs from test's pid. + setup :set_mox_global + + setup :verify_on_exit! + + setup do + # Uncle don't occur on POA chains, so there's no way to test this using the public addresses, so mox-only testing + %{ + json_rpc_named_arguments: [ + transport: EthereumJSONRPC.Mox, + transport_options: [], + # Which one does not matter, so pick one + variant: EthereumJSONRPC.Parity + ] + } + end + + describe "import/1" do + test "fetches uncles asynchronously", %{json_rpc_named_arguments: json_rpc_named_arguments} do + CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + parent = self() + + pid = + spawn_link(fn -> + receive do + {:"$gen_call", from, {:buffer, uncles}} -> + GenServer.reply(from, :ok) + send(parent, {:uncles, uncles}) + end + end) + + Process.register(pid, Block.Uncle.Fetcher) + + nephew_hash = block_hash() |> to_string() + uncle_hash = block_hash() |> to_string() + miner_hash = address_hash() |> to_string() + block_number = 0 + + assert {:ok, _} = + Fetcher.import(%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}, %{ + addresses: %{ + params: [ + %{hash: miner_hash} + ] + }, + address_hash_to_fetched_balance_block_number: %{miner_hash => block_number}, + balances: %{ + params: [ + %{ + address_hash: miner_hash, + block_number: block_number + } + ] + }, + blocks: %{ + params: [ + %{ + difficulty: 0, + gas_limit: 21000, + gas_used: 21000, + miner_hash: miner_hash, + nonce: 0, + number: block_number, + parent_hash: + block_hash() + |> to_string(), + size: 0, + timestamp: DateTime.utc_now(), + total_difficulty: 0, + hash: nephew_hash + } + ] + }, + block_second_degree_relations: %{ + params: [ + %{ + nephew_hash: nephew_hash, + uncle_hash: uncle_hash + } + ] + }, + tokens: %{ + params: [], + on_conflict: :nothing + }, + token_balances: %{ + params: [] + }, + transactions: %{ + params: [], + on_conflict: :nothing + }, + transaction_hash_to_block_number: %{} + }) + + assert_receive {:uncles, [^uncle_hash]} + end + end +end diff --git a/apps/indexer/test/indexer/block/realtime_test.exs b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs similarity index 98% rename from apps/indexer/test/indexer/block/realtime_test.exs rename to apps/indexer/test/indexer/block/realtime/fetcher_test.exs index 9b42fc4533..68dbdfb741 100644 --- a/apps/indexer/test/indexer/block/realtime_test.exs +++ b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs @@ -1,4 +1,4 @@ -defmodule Indexer.BlockFetcher.RealtimeTest do +defmodule Indexer.Block.Realtime.FetcherTest do use EthereumJSONRPC.Case, async: false use Explorer.DataCase @@ -7,7 +7,7 @@ defmodule Indexer.BlockFetcher.RealtimeTest do alias Explorer.Chain alias Explorer.Chain.Address alias Indexer.{Sequence, Token} - alias Indexer.Block.Realtime + alias Indexer.Block.{Realtime, Uncle} @moduletag capture_log: true @@ -36,7 +36,7 @@ defmodule Indexer.BlockFetcher.RealtimeTest do %{block_fetcher: block_fetcher, json_rpc_named_arguments: core_json_rpc_named_arguments} end - describe "Indexer.BlockFetcher.stream_import/1" do + describe "Indexer.Block.Fetcher.fetch_and_import_range/1" do @tag :no_geth test "in range with internal transactions", %{ block_fetcher: %Indexer.Block.Fetcher{} = block_fetcher, @@ -47,6 +47,10 @@ defmodule Indexer.BlockFetcher.RealtimeTest do Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + Uncle.Supervisor.Case.start_supervised!( + block_fetcher: %Indexer.Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} + ) + if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do EthereumJSONRPC.Mox |> expect(:json_rpc, fn [ @@ -122,7 +126,7 @@ defmodule Indexer.BlockFetcher.RealtimeTest do } ], "transactionsRoot" => "0xd7c39a93eafe0bdcbd1324c13dcd674bed8c9fa8adbf8f95bf6a59788985da6f", - "uncles" => [] + "uncles" => ["0xa4ec735cabe1510b5ae081b30f17222580b4588dbec52830529753a688b046cd"] } }, %{ diff --git a/apps/indexer/test/indexer/block/uncle/fetcher_test.exs b/apps/indexer/test/indexer/block/uncle/fetcher_test.exs new file mode 100644 index 0000000000..970f1700a0 --- /dev/null +++ b/apps/indexer/test/indexer/block/uncle/fetcher_test.exs @@ -0,0 +1,126 @@ +defmodule Indexer.Block.Uncle.FetcherTest do + # MUST be `async: false` so that {:shared, pid} is set for connection to allow CoinBalanceFetcher's self-send to have + # connection allowed immediately. + use EthereumJSONRPC.Case, async: false + use Explorer.DataCase + + alias Explorer.Chain + alias Indexer.Block + + import Mox + + @moduletag :capture_log + + # MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to + # use expectations and stubs from test's pid. + setup :set_mox_global + + setup :verify_on_exit! + + setup do + # Uncle don't occur on POA chains, so there's no way to test this using the public addresses, so mox-only testing + %{ + json_rpc_named_arguments: [ + transport: EthereumJSONRPC.Mox, + transport_options: [], + # Which one does not matter, so pick one + variant: EthereumJSONRPC.Parity + ] + } + end + + describe "child_spec/1" do + test "raises ArgumentError is `json_rpc_named_arguments is not provided" do + assert_raise ArgumentError, + ":json_rpc_named_arguments must be provided to `Elixir.Indexer.Block.Uncle.Fetcher.child_spec " <> + "to allow for json_rpc calls when running.", + fn -> + start_supervised({Block.Uncle.Fetcher, [[], []]}) + end + end + end + + describe "init/1" do + test "fetched unfetched uncle hashes", %{json_rpc_named_arguments: json_rpc_named_arguments} do + assert %Chain.Block.SecondDegreeRelation{nephew_hash: nephew_hash, uncle_hash: uncle_hash, uncle: nil} = + :block_second_degree_relation + |> insert() + |> Repo.preload([:nephew, :uncle]) + + uncle_hash_data = to_string(uncle_hash) + uncle_uncle_hash_data = to_string(block_hash()) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [%{method: "eth_getBlockByHash", params: [^uncle_hash_data, true]}], _ -> + number_quantity = "0x0" + + {:ok, + [ + %{ + result: %{ + "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "difficulty" => "0xfffffffffffffffffffffffffffffffe", + "extraData" => "0xd583010a068650617269747986312e32362e32826c69", + "gasLimit" => "0x7a1200", + "gasUsed" => "0x0", + "hash" => uncle_hash_data, + "logsBloom" => "0x", + "miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "number" => number_quantity, + "parentHash" => "0x006edcaa1e6fde822908783bc4ef1ad3675532d542fce53537557391cfe34c3c", + "size" => "0x243", + "timestamp" => "0x5b437f41", + "totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb", + "transactions" => [ + %{ + "blockHash" => uncle_hash_data, + "blockNumber" => number_quantity, + "chainId" => "0x4d", + "condition" => nil, + "creates" => "0xffc87239eb0267bc3ca2cd51d12fbf278e02ccb4", + "from" => "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca", + "gas" => "0x47b760", + "gasPrice" => "0x174876e800", + "hash" => "0x3a3eb134e6792ce9403ea4188e5e79693de9e4c94e499db132be086400da79e6", + "input" => "0x", + "nonce" => "0x0", + "r" => "0xad3733df250c87556335ffe46c23e34dbaffde93097ef92f52c88632a40f0c75", + "s" => "0x72caddc0371451a58de2ca6ab64e0f586ccdb9465ff54e1c82564940e89291e3", + "standardV" => "0x0", + "to" => nil, + "transactionIndex" => "0x0", + "v" => "0xbd", + "value" => "0x0" + } + ], + "uncles" => [uncle_uncle_hash_data] + } + } + ]} + end) + + Block.Uncle.Supervisor.Case.start_supervised!( + block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} + ) + + wait(fn -> + Repo.one!( + from(bsdr in Chain.Block.SecondDegreeRelation, + where: bsdr.nephew_hash == ^nephew_hash and not is_nil(bsdr.uncle_fetched_at) + ) + ) + end) + + refute is_nil(Repo.get(Chain.Block, uncle_hash)) + assert Repo.aggregate(Chain.Transaction.Fork, :count, :hash) == 1 + end + end + + defp wait(producer) do + producer.() + rescue + Ecto.NoResultsError -> + Process.sleep(100) + wait(producer) + end +end diff --git a/apps/indexer/test/support/indexer/block/uncle/supervisor/case.ex b/apps/indexer/test/support/indexer/block/uncle/supervisor/case.ex new file mode 100644 index 0000000000..331ddfebd0 --- /dev/null +++ b/apps/indexer/test/support/indexer/block/uncle/supervisor/case.ex @@ -0,0 +1,18 @@ +defmodule Indexer.Block.Uncle.Supervisor.Case do + alias Indexer.Block + + def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do + merged_fetcher_arguments = + Keyword.merge( + fetcher_arguments, + flush_interval: 50, + init_chunk_size: 1, + max_batch_size: 1, + max_concurrency: 1 + ) + + [merged_fetcher_arguments] + |> Block.Uncle.Supervisor.child_spec() + |> ExUnit.Callbacks.start_supervised!() + end +end