From 8135fcafbdd0c719724a70da103f861291020642 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 22 Jan 2019 12:05:45 -0600 Subject: [PATCH 01/11] Allow lists of block numbers instead of ranges for beneficiares Simplifies calling as numbers don't need to grouped into ranges when getting numbers from database. --- apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex | 7 +++-- .../lib/ethereum_jsonrpc/parity.ex | 11 ++++--- .../lib/ethereum_jsonrpc/variant.ex | 5 ++- .../test/ethereum_jsonrpc/parity_test.exs | 13 ++++---- .../test/ethereum_jsonrpc_test.exs | 2 +- apps/indexer/lib/indexer/block/fetcher.ex | 5 ++- .../block/uncataloged_rewards/importer.ex | 31 ++++--------------- 7 files changed, 30 insertions(+), 44 deletions(-) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index 4addc36998..9b5ea02d35 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -29,6 +29,7 @@ defmodule EthereumJSONRPC do Block, Blocks, FetchedBalances, + FetchedBeneficiaries, FetchedCodes, Receipts, RequestCoordinator, @@ -229,8 +230,10 @@ defmodule EthereumJSONRPC do @doc """ Fetches block reward contract beneficiaries from variant API. """ - def fetch_beneficiaries(_first.._last = range, json_rpc_named_arguments) do - Keyword.fetch!(json_rpc_named_arguments, :variant).fetch_beneficiaries(range, json_rpc_named_arguments) + @spec fetch_beneficiaries([block_number], json_rpc_named_arguments) :: + {:ok, FetchedBeneficiaries.t()} | {:error, reason :: term} | :ignore + def fetch_beneficiaries(block_numbers, json_rpc_named_arguments) when is_list(block_numbers) do + Keyword.fetch!(json_rpc_named_arguments, :variant).fetch_beneficiaries(block_numbers, json_rpc_named_arguments) end @doc """ diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex index 6407b2b0ee..e6a6008fd3 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex @@ -11,10 +11,11 @@ defmodule EthereumJSONRPC.Parity do @behaviour EthereumJSONRPC.Variant @impl EthereumJSONRPC.Variant - def fetch_beneficiaries(_.._ = block_range, json_rpc_named_arguments) when is_list(json_rpc_named_arguments) do + def fetch_beneficiaries(block_numbers, json_rpc_named_arguments) + when is_list(block_numbers) and is_list(json_rpc_named_arguments) do id_to_params = - block_range - |> block_range_to_params_list() + block_numbers + |> block_numbers_to_params_list() |> id_to_params() with {:ok, responses} <- @@ -63,8 +64,8 @@ defmodule EthereumJSONRPC.Parity do end end - defp block_range_to_params_list(_.._ = block_range) do - Enum.map(block_range, &%{block_quantity: integer_to_quantity(&1)}) + defp block_numbers_to_params_list(block_numbers) when is_list(block_numbers) do + Enum.map(block_numbers, &%{block_quantity: integer_to_quantity(&1)}) end defp trace_replay_transaction_responses_to_internal_transactions_params(responses, id_to_params) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex index 1355118a1f..afbcfba8e1 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex @@ -14,8 +14,7 @@ defmodule EthereumJSONRPC.Variant do @type internal_transaction_params :: map() @doc """ - Fetch the block reward contract beneficiaries for a given block - range, from the variant of the Ethereum JSONRPC API. + Fetch the block reward contract beneficiaries for a given blocks from the variant of the Ethereum JSONRPC API. For more information on block reward contracts see: https://wiki.parity.io/Block-Reward-Contract.html @@ -26,7 +25,7 @@ defmodule EthereumJSONRPC.Variant do * `{:error, reason}` - there was an error at the transport level * `:ignore` - the variant does not support fetching beneficiaries """ - @callback fetch_beneficiaries(Range.t(), EthereumJSONRPC.json_rpc_named_arguments()) :: + @callback fetch_beneficiaries([EthereumJSONRPC.block_number()], EthereumJSONRPC.json_rpc_named_arguments()) :: {:ok, FetchedBeneficiaries.t()} | {:error, reason :: term} | :ignore @doc """ diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs index fe52efa6a6..76e8195241 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs @@ -291,7 +291,7 @@ defmodule EthereumJSONRPC.ParityTest do end assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) + EthereumJSONRPC.Parity.fetch_beneficiaries([5_080_887], json_rpc_named_arguments) assert Enum.count(params_set) == 2 @@ -365,7 +365,7 @@ defmodule EthereumJSONRPC.ParityTest do end assert {:ok, %FetchedBeneficiaries{params_set: params_set, errors: []}} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_609_295..5_609_295, json_rpc_named_arguments) + EthereumJSONRPC.Parity.fetch_beneficiaries([5_609_295], json_rpc_named_arguments) assert Enum.count(params_set) == 2 @@ -396,7 +396,7 @@ defmodule EthereumJSONRPC.ParityTest do end) assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) + EthereumJSONRPC.Parity.fetch_beneficiaries([5_080_887], json_rpc_named_arguments) assert Enum.empty?(params_set) end @@ -473,7 +473,7 @@ defmodule EthereumJSONRPC.ParityTest do end assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_077_429..5_077_429, json_rpc_named_arguments) + EthereumJSONRPC.Parity.fetch_beneficiaries([5_077_429], json_rpc_named_arguments) assert Enum.count(params_set) == 2 @@ -594,7 +594,7 @@ defmodule EthereumJSONRPC.ParityTest do assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = EthereumJSONRPC.Parity.fetch_beneficiaries( - block_number1..block_number2, + [block_number1, block_number2], json_rpc_named_arguments ) @@ -641,8 +641,7 @@ defmodule EthereumJSONRPC.ParityTest do {:error, "oops"} end) - assert {:error, "oops"} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) + assert {:error, "oops"} = EthereumJSONRPC.Parity.fetch_beneficiaries([5_080_887], json_rpc_named_arguments) end end end diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs index 35f0345058..54bd766028 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs @@ -241,7 +241,7 @@ defmodule EthereumJSONRPCTest do {:ok, []} end) - assert EthereumJSONRPC.fetch_beneficiaries(1..1, json_rpc_named_arguments) == + assert EthereumJSONRPC.fetch_beneficiaries([1], json_rpc_named_arguments) == {:ok, %FetchedBeneficiaries{params_set: MapSet.new(), errors: []}} end end diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index a43cf561e5..2bbee00fb7 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -202,7 +202,10 @@ defmodule Indexer.Block.Fetcher do defp fetch_beneficiaries(range, json_rpc_named_arguments) do result = - with :ignore <- EthereumJSONRPC.fetch_beneficiaries(range, json_rpc_named_arguments) do + with :ignore <- + range + |> Enum.to_list() + |> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments) do {:ok, %FetchedBeneficiaries{params_set: MapSet.new()}} end diff --git a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex index 5c98a9eaa3..b4a4669fff 100644 --- a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex +++ b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex @@ -16,10 +16,11 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do @doc """ receives a list of blocks and tries to fetch and insert rewards for them """ - def fetch_and_import_rewards(blocks_batch) do + def fetch_and_import_rewards(blocks) when is_list(blocks) do result = - blocks_batch - |> break_into_chunks_of_block_numbers() + blocks + |> Stream.map(& &1.number) + |> Stream.chunk_every(@chunk_size) |> Enum.reduce([], fn chunk, acc -> chunk |> fetch_beneficiaries() @@ -36,11 +37,9 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do e in RuntimeError -> {:error, %{exception: e}} end - defp fetch_beneficiaries(chunk) do - {chunk_start, chunk_end} = Enum.min_max(chunk) - + defp fetch_beneficiaries(block_numbers) when is_list(block_numbers) do {:ok, %FetchedBeneficiaries{params_set: result}} = - with :ignore <- EthereumJSONRPC.fetch_beneficiaries(chunk_start..chunk_end, json_rpc_named_arguments()) do + with :ignore <- EthereumJSONRPC.fetch_beneficiaries(block_numbers, json_rpc_named_arguments()) do {:ok, %FetchedBeneficiaries{params_set: MapSet.new()}} end @@ -77,24 +76,6 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do end) end - defp break_into_chunks_of_block_numbers(blocks) do - Enum.chunk_while( - blocks, - [], - fn block, acc -> - if (acc == [] || hd(acc) + 1 == block.number) && length(acc) < @chunk_size do - {:cont, [block.number | acc]} - else - {:cont, acc, [block.number]} - end - end, - fn - [] -> {:cont, []} - acc -> {:cont, acc, []} - end - ) - end - defp insert_reward_group([]), do: :empty defp insert_reward_group(rewards) do From 3570436c33c750b921943670b23c3594be8e28c4 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 22 Jan 2019 14:51:17 -0600 Subject: [PATCH 02/11] Rename reward functions to match domain names * minted - reward directly from fetch beneficiaries that was minted for the new block. * gas payment - Wei paid to the validator by the transaction signers to cover the gas. --- apps/indexer/lib/indexer/block/fetcher.ex | 16 ++++++------ .../block/uncataloged_rewards/importer.ex | 26 +++++++++---------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 2bbee00fb7..feb8ce1b1a 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -126,7 +126,7 @@ defmodule Indexer.Block.Fetcher do transactions_params: transactions_with_receipts } |> CoinBalances.params_set(), - block_rewards <- fetch_block_rewards(beneficiary_params_set, transactions_with_receipts), + beneficiaries_with_gas_payment <- add_gas_payments(beneficiary_params_set, transactions_with_receipts), address_token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}), {:ok, inserted} <- __MODULE__.import( @@ -137,7 +137,7 @@ defmodule Indexer.Block.Fetcher do address_token_balances: %{params: address_token_balances}, blocks: %{params: blocks}, block_second_degree_relations: %{params: block_second_degree_relations_params}, - block_rewards: %{params: block_rewards}, + block_rewards: %{params: beneficiaries_with_gas_payment}, logs: %{params: logs}, token_transfers: %{params: token_transfers}, tokens: %{on_conflict: :nothing, params: tokens}, @@ -212,16 +212,16 @@ defmodule Indexer.Block.Fetcher do {:beneficiaries, result} end - defp fetch_block_rewards(beneficiaries, transactions) do + defp add_gas_payments(beneficiaries, transactions) do Enum.map(beneficiaries, fn beneficiary -> case beneficiary.address_type do :validator -> - validation_reward = fetch_validation_reward(beneficiary, transactions) + gas_payment = gas_payment(beneficiary, transactions) - "0x" <> reward_hex = beneficiary.reward - {reward, _} = Integer.parse(reward_hex, 16) + "0x" <> minted_hex = beneficiary.reward + {minted, _} = Integer.parse(minted_hex, 16) - %{beneficiary | reward: reward + validation_reward} + %{beneficiary | reward: minted + gas_payment} _ -> beneficiary @@ -229,7 +229,7 @@ defmodule Indexer.Block.Fetcher do end) end - defp fetch_validation_reward(beneficiary, transactions) do + defp gas_payment(beneficiary, transactions) do transactions |> Stream.filter(fn t -> t.block_number == beneficiary.block_number end) |> Enum.reduce(0, fn t, acc -> acc + t.gas_used * t.gas_price end) diff --git a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex index b4a4669fff..5134faf654 100644 --- a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex +++ b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex @@ -24,7 +24,8 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do |> Enum.reduce([], fn chunk, acc -> chunk |> fetch_beneficiaries() - |> fetch_block_rewards() + |> add_gas_payments() + |> Enum.map(&Reward.changeset(%Reward{}, &1)) |> insert_reward_group() |> case do :empty -> acc @@ -46,26 +47,23 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do result end - defp fetch_block_rewards(beneficiaries) do + defp add_gas_payments(beneficiaries) do Enum.map(beneficiaries, fn beneficiary -> - beneficiary_changes = - case beneficiary.address_type do - :validator -> - validation_reward = fetch_validation_reward(beneficiary) + case beneficiary.address_type do + :validator -> + gas_payment = gas_payment(beneficiary) - {:ok, reward} = Wei.cast(beneficiary.reward) + {:ok, minted} = Wei.cast(beneficiary.reward) - %{beneficiary | reward: Wei.sum(reward, validation_reward)} + %{beneficiary | reward: Wei.sum(minted, gas_payment)} - _ -> - beneficiary - end - - Reward.changeset(%Reward{}, beneficiary_changes) + _ -> + beneficiary + end end) end - defp fetch_validation_reward(beneficiary) do + defp gas_payment(beneficiary) do {:ok, accumulator} = Wei.cast(0) beneficiary.block_number From 01ca7e9f8c5204a83e4723d60b4b66154a35321b Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 22 Jan 2019 15:08:34 -0600 Subject: [PATCH 03/11] Optimize add_gas_payments --- apps/indexer/lib/indexer/block/fetcher.ex | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index feb8ce1b1a..f2c62bf5b9 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -213,10 +213,12 @@ defmodule Indexer.Block.Fetcher do end defp add_gas_payments(beneficiaries, transactions) do + transactions_by_block_number = Enum.group_by(transactions, & &1.block_number) + Enum.map(beneficiaries, fn beneficiary -> case beneficiary.address_type do :validator -> - gas_payment = gas_payment(beneficiary, transactions) + gas_payment = gas_payment(beneficiary, transactions_by_block_number) "0x" <> minted_hex = beneficiary.reward {minted, _} = Integer.parse(minted_hex, 16) @@ -229,10 +231,18 @@ defmodule Indexer.Block.Fetcher do end) end - defp gas_payment(beneficiary, transactions) do + defp gas_payment(transactions) when is_list(transactions) do transactions - |> Stream.filter(fn t -> t.block_number == beneficiary.block_number end) - |> Enum.reduce(0, fn t, acc -> acc + t.gas_used * t.gas_price end) + |> Stream.map(&(&1.gas_used * &1.gas_price)) + |> Enum.sum() + end + + defp gas_payment(%{block_number: block_number}, transactions_by_block_number) + when is_map(transactions_by_block_number) do + case Map.fetch(transactions_by_block_number, block_number) do + {:ok, transactions} -> gas_payment(transactions) + :error -> 0 + end end # `fetched_balance_block_number` is needed for the `CoinBalanceFetcher`, but should not be used for `import` because the From ace3abf8df4a7555e1647d23142b9b3b5994ea94 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 23 Jan 2019 08:36:42 -0600 Subject: [PATCH 04/11] Don't generate wei^2 from Wei.mult Wei.mult required both parameters to be Wei.t, but that meant it was actually yielding `wei^2` units. Fix the units by requiring the first parameter to be wei and the second to be a unitless integer, which makes more sense for the `gas_price * gas_used` usage as we expect that to be measured in wei and not wei^2. --- apps/explorer/lib/explorer/chain/wei.ex | 14 ++++---- .../explorer/test/explorer/chain/wei_test.exs | 34 +++++++++---------- .../block/uncataloged_rewards/importer.ex | 9 ++--- 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/apps/explorer/lib/explorer/chain/wei.ex b/apps/explorer/lib/explorer/chain/wei.ex index 8b68002b47..4b12d3deb0 100644 --- a/apps/explorer/lib/explorer/chain/wei.ex +++ b/apps/explorer/lib/explorer/chain/wei.ex @@ -147,18 +147,18 @@ defmodule Explorer.Chain.Wei do end @doc """ - Multiplies two Wei values. + Multiplies Wei values by an `t:integer/0`. ## Example - iex> first = %Explorer.Chain.Wei{value: Decimal.new(10)} - iex> second = %Explorer.Chain.Wei{value: Decimal.new(5)} - iex> Explorer.Chain.Wei.mult(first, second) + iex> wei = %Explorer.Chain.Wei{value: Decimal.new(10)} + iex> multiplier = 5 + iex> Explorer.Chain.Wei.mult(wei, multiplier) %Explorer.Chain.Wei{value: Decimal.new(50)} """ - def mult(%Wei{value: wei_1}, %Wei{value: wei_2}) do - wei_1 - |> Decimal.mult(wei_2) + def mult(%Wei{value: value}, multiplier) when is_integer(multiplier) do + value + |> Decimal.mult(multiplier) |> from(:wei) end diff --git a/apps/explorer/test/explorer/chain/wei_test.exs b/apps/explorer/test/explorer/chain/wei_test.exs index 7bb238a4bc..901fe053a4 100644 --- a/apps/explorer/test/explorer/chain/wei_test.exs +++ b/apps/explorer/test/explorer/chain/wei_test.exs @@ -101,33 +101,33 @@ defmodule Explorer.Chain.WeiTest do end end - describe "mult/1" do - test "with one negative parameter return a negative value" do - first = %Explorer.Chain.Wei{value: Decimal.new(123)} - second = %Explorer.Chain.Wei{value: Decimal.new(-1)} + describe "mult/2" do + test "with positive Wei and positive multiplier returns positive Wei" do + wei = %Explorer.Chain.Wei{value: Decimal.new(123)} + multiplier = 100 - assert Explorer.Chain.Wei.mult(first, second) == %Explorer.Chain.Wei{value: Decimal.new(-123)} + assert Explorer.Chain.Wei.mult(wei, multiplier) == %Explorer.Chain.Wei{value: Decimal.new(12300)} end - test "with two negative parameter return positive number" do - first = %Explorer.Chain.Wei{value: Decimal.new(-123)} - second = %Explorer.Chain.Wei{value: Decimal.new(-100)} + test "with positive Wei and negative multiplier returns positive Wei" do + wei = %Explorer.Chain.Wei{value: Decimal.new(123)} + multiplier = -1 - assert Explorer.Chain.Wei.mult(first, second) == %Explorer.Chain.Wei{value: Decimal.new(12300)} + assert Explorer.Chain.Wei.mult(wei, multiplier) == %Explorer.Chain.Wei{value: Decimal.new(-123)} end - test "with two positive parameters return a positive number" do - first = %Explorer.Chain.Wei{value: Decimal.new(123)} - second = %Explorer.Chain.Wei{value: Decimal.new(100)} + test "with negative Wei and positive multiplier returns negative Wei" do + wei = %Explorer.Chain.Wei{value: Decimal.new(-123)} + multiplier = 100 - assert Explorer.Chain.Wei.mult(first, second) == %Explorer.Chain.Wei{value: Decimal.new(12300)} + assert Explorer.Chain.Wei.mult(wei, multiplier) == %Explorer.Chain.Wei{value: Decimal.new(-12300)} end - test "the order of the paramete matters not" do - first = %Explorer.Chain.Wei{value: Decimal.new(123)} - second = %Explorer.Chain.Wei{value: Decimal.new(-10)} + test "with negative Wei and negative multiplier returns positive Wei" do + wei = %Explorer.Chain.Wei{value: Decimal.new(-123)} + multiplier = -100 - assert Explorer.Chain.Wei.mult(first, second) == Explorer.Chain.Wei.mult(second, first) + assert Explorer.Chain.Wei.mult(wei, multiplier) == %Explorer.Chain.Wei{value: Decimal.new(12300)} end end end diff --git a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex index 5134faf654..0378959844 100644 --- a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex +++ b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex @@ -64,13 +64,14 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do end defp gas_payment(beneficiary) do - {:ok, accumulator} = Wei.cast(0) + {:ok, initial} = Wei.cast(0) beneficiary.block_number |> Chain.get_transactions_of_block_number() - |> Enum.reduce(accumulator, fn t, acc -> - {:ok, price_as_wei} = Wei.cast(t.gas_used) - price_as_wei |> Wei.mult(t.gas_price) |> Wei.sum(acc) + |> Enum.reduce(initial, fn %Transaction{gas_price: gas_price, gas_used: gas_used}, acc -> + gas_price + |> Wei.mult(gas_used) + |> Wei.sum(acc) end) end From d7c494435f5c940ac4184f9757b67b47a48ff7c8 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 23 Jan 2019 08:40:21 -0600 Subject: [PATCH 05/11] Use alias consistently --- .../explorer/test/explorer/chain/wei_test.exs | 55 ++++++++++--------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/apps/explorer/test/explorer/chain/wei_test.exs b/apps/explorer/test/explorer/chain/wei_test.exs index 901fe053a4..1dd94e53f2 100644 --- a/apps/explorer/test/explorer/chain/wei_test.exs +++ b/apps/explorer/test/explorer/chain/wei_test.exs @@ -1,8 +1,9 @@ defmodule Explorer.Chain.WeiTest do use ExUnit.Case, async: true + alias Explorer.Chain.Wei - doctest Explorer.Chain.Wei + doctest Wei describe "cast/1" do test "with hex string" do @@ -57,77 +58,77 @@ defmodule Explorer.Chain.WeiTest do describe "sum/1" do test "with two positive values return the sum of them" do - first = %Explorer.Chain.Wei{value: Decimal.new(123)} - second = %Explorer.Chain.Wei{value: Decimal.new(1_000)} + first = %Wei{value: Decimal.new(123)} + second = %Wei{value: Decimal.new(1_000)} - assert Explorer.Chain.Wei.sum(first, second) == %Explorer.Chain.Wei{value: Decimal.new(1_123)} + assert Wei.sum(first, second) == %Wei{value: Decimal.new(1_123)} end test "with a positive and a negative value return the positive minus the negative's absolute" do - first = %Explorer.Chain.Wei{value: Decimal.new(123)} - second = %Explorer.Chain.Wei{value: Decimal.new(-100)} + first = %Wei{value: Decimal.new(123)} + second = %Wei{value: Decimal.new(-100)} - assert Explorer.Chain.Wei.sum(first, second) == %Explorer.Chain.Wei{value: Decimal.new(23)} + assert Wei.sum(first, second) == %Wei{value: Decimal.new(23)} end end describe "sub/1" do test "with a negative second parameter return the sum of the absolute values" do - first = %Explorer.Chain.Wei{value: Decimal.new(123)} - second = %Explorer.Chain.Wei{value: Decimal.new(-100)} + first = %Wei{value: Decimal.new(123)} + second = %Wei{value: Decimal.new(-100)} - assert Explorer.Chain.Wei.sub(first, second) == %Explorer.Chain.Wei{value: Decimal.new(223)} + assert Wei.sub(first, second) == %Wei{value: Decimal.new(223)} end test "with a negative first parameter return the negative of the sum of the absolute values" do - first = %Explorer.Chain.Wei{value: Decimal.new(-123)} - second = %Explorer.Chain.Wei{value: Decimal.new(100)} + first = %Wei{value: Decimal.new(-123)} + second = %Wei{value: Decimal.new(100)} - assert Explorer.Chain.Wei.sub(first, second) == %Explorer.Chain.Wei{value: Decimal.new(-223)} + assert Wei.sub(first, second) == %Wei{value: Decimal.new(-223)} end test "with a larger first parameter return a positive number" do - first = %Explorer.Chain.Wei{value: Decimal.new(123)} - second = %Explorer.Chain.Wei{value: Decimal.new(100)} + first = %Wei{value: Decimal.new(123)} + second = %Wei{value: Decimal.new(100)} - assert Explorer.Chain.Wei.sub(first, second) == %Explorer.Chain.Wei{value: Decimal.new(23)} + assert Wei.sub(first, second) == %Wei{value: Decimal.new(23)} end test "with a larger second parameter return a negative number" do - first = %Explorer.Chain.Wei{value: Decimal.new(23)} - second = %Explorer.Chain.Wei{value: Decimal.new(100)} + first = %Wei{value: Decimal.new(23)} + second = %Wei{value: Decimal.new(100)} - assert Explorer.Chain.Wei.sub(first, second) == %Explorer.Chain.Wei{value: Decimal.new(-77)} + assert Wei.sub(first, second) == %Wei{value: Decimal.new(-77)} end end describe "mult/2" do test "with positive Wei and positive multiplier returns positive Wei" do - wei = %Explorer.Chain.Wei{value: Decimal.new(123)} + wei = %Wei{value: Decimal.new(123)} multiplier = 100 - assert Explorer.Chain.Wei.mult(wei, multiplier) == %Explorer.Chain.Wei{value: Decimal.new(12300)} + assert Wei.mult(wei, multiplier) == %Wei{value: Decimal.new(12300)} end test "with positive Wei and negative multiplier returns positive Wei" do - wei = %Explorer.Chain.Wei{value: Decimal.new(123)} + wei = %Wei{value: Decimal.new(123)} multiplier = -1 - assert Explorer.Chain.Wei.mult(wei, multiplier) == %Explorer.Chain.Wei{value: Decimal.new(-123)} + assert Wei.mult(wei, multiplier) == %Wei{value: Decimal.new(-123)} end test "with negative Wei and positive multiplier returns negative Wei" do - wei = %Explorer.Chain.Wei{value: Decimal.new(-123)} + wei = %Wei{value: Decimal.new(-123)} multiplier = 100 - assert Explorer.Chain.Wei.mult(wei, multiplier) == %Explorer.Chain.Wei{value: Decimal.new(-12300)} + assert Wei.mult(wei, multiplier) == %Wei{value: Decimal.new(-12300)} end test "with negative Wei and negative multiplier returns positive Wei" do - wei = %Explorer.Chain.Wei{value: Decimal.new(-123)} + wei = %Wei{value: Decimal.new(-123)} multiplier = -100 - assert Explorer.Chain.Wei.mult(wei, multiplier) == %Explorer.Chain.Wei{value: Decimal.new(12300)} + assert Wei.mult(wei, multiplier) == %Wei{value: Decimal.new(12300)} end end end From 4c634eb9a604b70381e5704707dd7769a62e07e1 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 23 Jan 2019 08:46:51 -0600 Subject: [PATCH 06/11] Use Ecto 3 coalesce, sum, +, and * support --- apps/explorer/lib/explorer/chain.ex | 32 +++-------------------------- 1 file changed, 3 insertions(+), 29 deletions(-) diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index f9ccc80da2..369ded9168 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -321,24 +321,6 @@ defmodule Explorer.Chain do Repo.aggregate(Block, :count, :hash) end - @doc !""" - Returns a default value if no value is found. - """ - defmacrop default_if_empty(value, default) do - quote do - fragment("coalesce(?, ?)", unquote(value), unquote(default)) - end - end - - @doc !""" - Sum of the products of two columns. - """ - defmacrop sum_of_products(col_a, col_b) do - quote do - sum(fragment("?*?", unquote(col_a), unquote(col_b))) - end - end - @doc """ Reward for mining a block. @@ -362,20 +344,12 @@ defmodule Explorer.Chain do on: fragment("? <@ ?", block.number, emission_reward.block_range), where: block.number == ^block_number, group_by: emission_reward.reward, - select: %{ - transaction_reward: %Wei{ - value: default_if_empty(sum_of_products(transaction.gas_used, transaction.gas_price), 0) - }, - static_reward: emission_reward.reward + select: %Wei{ + value: coalesce(sum(transaction.gas_used * transaction.gas_price), 0) + emission_reward.reward } ) - %{ - transaction_reward: transaction_reward, - static_reward: static_reward - } = Repo.one(query) - - Wei.sum(transaction_reward, static_reward) + Repo.one!(query) end @doc """ From 00e136e20c0f91cc8df3c3b8a2d8afb3f4db07ca Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 23 Jan 2019 10:22:01 -0600 Subject: [PATCH 07/11] Calculate gas payments for all blocks in batch in SQL --- apps/explorer/lib/explorer/chain.ex | 21 +++++++++ apps/explorer/test/explorer/chain_test.exs | 46 +++++++++++++++++++ .../block/uncataloged_rewards/importer.ex | 25 ++++------ 3 files changed, 75 insertions(+), 17 deletions(-) diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 369ded9168..b060564aa9 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -352,6 +352,27 @@ defmodule Explorer.Chain do Repo.one!(query) end + @doc """ + The `t:Explorer.Chain.Wei.t/0` paid to the miners of the `t:Explorer.Chain.Block.t/0`s with `hash` + `Explorer.Chain.Hash.Full.t/0` by the signers of the transactions in those blocks to cover the gas fee + (`gas_used * gas_price`). + """ + @spec gas_payment_by_block_hash([Hash.Full.t()]) :: %{Hash.Full.t() => Wei.t()} + def gas_payment_by_block_hash(block_hashes) when is_list(block_hashes) do + query = + from( + block in Block, + left_join: transaction in assoc(block, :transactions), + where: block.hash in ^block_hashes and block.consensus == true, + group_by: block.hash, + select: {block.hash, %Wei{value: coalesce(sum(transaction.gas_used * transaction.gas_price), 0)}} + ) + + query + |> Repo.all() + |> Enum.into(%{}) + end + @doc """ Finds all `t:Explorer.Chain.Transaction.t/0`s in the `t:Explorer.Chain.Block.t/0`. diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index 73fbc8bd57..fe951e111d 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -2279,6 +2279,52 @@ defmodule Explorer.ChainTest do end end + describe "gas_payment_by_block_hash/1" do + setup do + number = 1 + + %{consensus_block: insert(:block, number: number, consensus: true), number: number} + end + + test "without consensus block hash has no key", %{consensus_block: consensus_block, number: number} do + non_consensus_block = insert(:block, number: number, consensus: false) + + :transaction + |> insert(gas_price: 1) + |> with_block(consensus_block, gas_used: 1) + + :transaction + |> insert(gas_price: 1) + |> with_block(consensus_block, gas_used: 2) + + assert Chain.gas_payment_by_block_hash([non_consensus_block.hash]) == %{} + end + + test "with consensus block hash without transactions has key with 0 value", %{ + consensus_block: %Block{hash: consensus_block_hash} + } do + assert Chain.gas_payment_by_block_hash([consensus_block_hash]) == %{ + consensus_block_hash => %Wei{value: Decimal.new(0)} + } + end + + test "with consensus block hash with transactions has key with value", %{ + consensus_block: %Block{hash: consensus_block_hash} = consensus_block + } do + :transaction + |> insert(gas_price: 1) + |> with_block(consensus_block, gas_used: 2) + + :transaction + |> insert(gas_price: 3) + |> with_block(consensus_block, gas_used: 4) + + assert Chain.gas_payment_by_block_hash([consensus_block_hash]) == %{ + consensus_block_hash => %Wei{value: Decimal.new(14)} + } + end + end + describe "missing_block_number_ranges/1" do # 0000 test "0..0 without blocks" do diff --git a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex index 0378959844..6f2b0c6dbe 100644 --- a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex +++ b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex @@ -48,13 +48,16 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do end defp add_gas_payments(beneficiaries) do - Enum.map(beneficiaries, fn beneficiary -> - case beneficiary.address_type do - :validator -> - gas_payment = gas_payment(beneficiary) + gas_payment_by_block_hash = + beneficiaries + |> Stream.filter(&(&1.address_type == :validator)) + |> Enum.map(& &1.block_hash) + |> Chain.gas_payment_by_block_hash() + Enum.map(beneficiaries, fn %{block_hash: block_hash} = beneficiary -> + case gas_payment_by_block_hash do + %{^block_hash => gas_payment} -> {:ok, minted} = Wei.cast(beneficiary.reward) - %{beneficiary | reward: Wei.sum(minted, gas_payment)} _ -> @@ -63,18 +66,6 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do end) end - defp gas_payment(beneficiary) do - {:ok, initial} = Wei.cast(0) - - beneficiary.block_number - |> Chain.get_transactions_of_block_number() - |> Enum.reduce(initial, fn %Transaction{gas_price: gas_price, gas_used: gas_used}, acc -> - gas_price - |> Wei.mult(gas_used) - |> Wei.sum(acc) - end) - end - defp insert_reward_group([]), do: :empty defp insert_reward_group(rewards) do From 7098f4b78db34b76fa17ab7aee4904be2dca336d Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 23 Jan 2019 11:02:01 -0600 Subject: [PATCH 08/11] Use Explorer.Chain.import in Indexer.Block.UncatalogedRewards.Importer Ensures logic is consistent across Catchup, Realtime, and Uncatalogued fetchers. --- .../chain/import/runner/block/rewards.ex | 21 ++++++-- .../block/uncataloged_rewards/importer.ex | 50 ++++++++----------- .../uncataloged_rewards/importer_test.exs | 13 ++--- 3 files changed, 45 insertions(+), 39 deletions(-) diff --git a/apps/explorer/lib/explorer/chain/import/runner/block/rewards.ex b/apps/explorer/lib/explorer/chain/import/runner/block/rewards.ex index 9a4de5464d..3ce55fbf07 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/block/rewards.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/block/rewards.ex @@ -3,6 +3,8 @@ defmodule Explorer.Chain.Import.Runner.Block.Rewards do Bulk imports `t:Explorer.Chain.Block.Reward.t/0`. """ + import Ecto.Query, only: [from: 2] + alias Ecto.{Changeset, Multi, Repo} alias Explorer.Chain.Block.Reward alias Explorer.Chain.Import @@ -42,20 +44,33 @@ defmodule Explorer.Chain.Import.Runner.Block.Rewards do def timeout, do: @timeout @spec insert(Repo.t(), [map()], %{ + optional(:on_conflict) => Import.Runner.on_conflict(), required(:timeout) => timeout, required(:timestamps) => Import.timestamps() }) :: {:ok, [Reward.t()]} | {:error, [Changeset.t()]} - defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps}) + defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.address_type, &1.block_hash}) + Import.insert_changes_list( repo, - changes_list, + ordered_changes_list, conflict_target: [:address_hash, :address_type, :block_hash], - on_conflict: :nothing, + on_conflict: on_conflict, for: ecto_schema_module(), returning: true, timeout: timeout, timestamps: timestamps ) end + + defp default_on_conflict do + from(reward in Reward, + update: [set: [reward: fragment("EXCLUDED.reward")]], + where: fragment("EXCLUDED.reward IS DISTINCT FROM ?", reward.reward) + ) + end end diff --git a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex index 6f2b0c6dbe..409bc56701 100644 --- a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex +++ b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex @@ -3,10 +3,9 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do a module to fetch and import the rewards for blocks that were indexed without the reward """ - alias Ecto.Multi alias EthereumJSONRPC.FetchedBeneficiaries alias Explorer.Chain - alias Explorer.Chain.{Block.Reward, Wei} + alias Explorer.Chain.Wei # max number of blocks in a single request # higher numbers may cause the requests to time out @@ -17,34 +16,39 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do receives a list of blocks and tries to fetch and insert rewards for them """ def fetch_and_import_rewards(blocks) when is_list(blocks) do - result = + block_rewards = blocks |> Stream.map(& &1.number) |> Stream.chunk_every(@chunk_size) - |> Enum.reduce([], fn chunk, acc -> - chunk - |> fetch_beneficiaries() - |> add_gas_payments() - |> Enum.map(&Reward.changeset(%Reward{}, &1)) - |> insert_reward_group() - |> case do - :empty -> acc - insert -> [insert | acc] - end - end) + |> Enum.flat_map(&block_numbers_to_rewards/1) - {:ok, result} + {:ok, block_rewards} rescue e in RuntimeError -> {:error, %{exception: e}} end + defp block_numbers_to_rewards(block_numbers) when is_list(block_numbers) do + case fetch_beneficiaries(block_numbers) do + [] -> + [] + + beneficiaries_params -> + beneficiaries_params + |> add_gas_payments() + |> import_block_reward_params() + |> case do + {:ok, %{block_rewards: block_rewards}} -> block_rewards + end + end + end + defp fetch_beneficiaries(block_numbers) when is_list(block_numbers) do {:ok, %FetchedBeneficiaries{params_set: result}} = with :ignore <- EthereumJSONRPC.fetch_beneficiaries(block_numbers, json_rpc_named_arguments()) do {:ok, %FetchedBeneficiaries{params_set: MapSet.new()}} end - result + Enum.sort_by(result, &{&1.address_hash, &1.address_type, &1.block_hash}) end defp add_gas_payments(beneficiaries) do @@ -66,18 +70,8 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do end) end - defp insert_reward_group([]), do: :empty - - defp insert_reward_group(rewards) do - rewards - |> Enum.reduce({Multi.new(), 0}, fn changeset, {multi, index} -> - {Multi.insert(multi, "insert_#{index}", changeset, - conflict_target: ~w(address_hash address_type block_hash), - on_conflict: {:replace, [:reward]} - ), index + 1} - end) - |> elem(0) - |> Explorer.Repo.transaction() + defp import_block_reward_params(block_rewards_params) when is_list(block_rewards_params) do + Chain.import(%{block_rewards: %{params: block_rewards_params}}) end defp json_rpc_named_arguments do diff --git a/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs b/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs index 0c83e797ee..68987d284b 100644 --- a/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs +++ b/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs @@ -49,12 +49,10 @@ defmodule Indexer.Block.UncatalogedRewards.ImporterTest do assert {:ok, [ - ok: %{ - "insert_0" => %Reward{ - address_hash: ^address_hash, - block_hash: ^block_hash, - address_type: :validator - } + %Reward{ + address_hash: ^address_hash, + block_hash: ^block_hash, + address_type: :validator } ]} = Importer.fetch_and_import_rewards([block]) end @@ -96,8 +94,7 @@ defmodule Indexer.Block.UncatalogedRewards.ImporterTest do {:ok, reward} = Wei.cast(value) - assert {:ok, - [ok: %{"insert_0" => %Reward{block_hash: ^block_hash, address_type: ^address_type, reward: ^reward}}]} = + assert {:ok, [%Reward{block_hash: ^block_hash, address_type: ^address_type, reward: ^reward}]} = Importer.fetch_and_import_rewards([block]) end end From 229df065b4c3aa827f4ecfc9b03c7578cf9ce524 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 23 Jan 2019 14:22:37 -0600 Subject: [PATCH 09/11] Failing regression tests for #1337 --- .../parity/fetched_beneficiaries.ex | 1 - .../parity/fetched_beneficiaries_test.exs | 2 +- .../indexer/block/catchup/fetcher_test.exs | 102 ++++++++++++++++++ .../uncataloged_rewards/importer_test.exs | 44 ++++++++ 4 files changed, 147 insertions(+), 2 deletions(-) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex index 6ee59474c8..a47c7d6377 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex @@ -30,7 +30,6 @@ defmodule EthereumJSONRPC.Parity.FetchedBeneficiaries do ...> }, ...> %{ ...> "action" => %{"author" => "0x2", "rewardType" => "external", "value" => "0x0"}, - ...> "blockHash" => "0x52a8d2185282506ce681364d2aa0c085ba45fdeb5d6c0ddec1131617a71ee2ca", ...> "blockHash" => "0xFFF", ...> "blockNumber" => 12, ...> "result" => nil, diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity/fetched_beneficiaries_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity/fetched_beneficiaries_test.exs index 769432b7f4..5c5cee4871 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity/fetched_beneficiaries_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity/fetched_beneficiaries_test.exs @@ -41,7 +41,7 @@ defmodule EthereumJSONRPC.Parity.FetchedBeneficiariesTest do block_number = 1_000 block_quantity = EthereumJSONRPC.integer_to_quantity(block_number) hash1 = "0xef481b4e2c3ed62265617f2e9dfcdf3cf3efc11a" - hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" + hash2 = "0xef481b4e2c3ed62265617f2e9dfcdf3cf3efc11a" reward = "0xde0b6b3a7640000" responses = [ diff --git a/apps/indexer/test/indexer/block/catchup/fetcher_test.exs b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs index 2832fc6dda..4b66cae068 100644 --- a/apps/indexer/test/indexer/block/catchup/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs @@ -2,6 +2,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do use EthereumJSONRPC.Case, async: false use Explorer.DataCase + import EthereumJSONRPC, only: [integer_to_quantity: 1] import Mox alias Indexer.{Block, CoinBalance, InternalTransaction, Token, TokenBalance} @@ -111,4 +112,105 @@ defmodule Indexer.Block.Catchup.FetcherTest do assert_receive {:uncles, [^uncle_hash]} end end + + describe "task/1" do + test "ignores fetched beneficiaries with different hash for same number", %{ + json_rpc_named_arguments: json_rpc_named_arguments + } do + latest_block_number = 1 + latest_block_quantity = integer_to_quantity(latest_block_number) + + block_number = latest_block_number - 1 + block_hash = block_hash() + block_quantity = integer_to_quantity(block_number) + + miner_hash = address_hash() + miner_hash_data = to_string(miner_hash) + + new_block_hash = block_hash() + + refute block_hash == new_block_hash + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, %{"number" => latest_block_quantity}} + end) + |> expect(:json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _options -> + {:ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "hash" => to_string(block_hash), + "number" => block_quantity, + "difficulty" => "0x0", + "gasLimit" => "0x0", + "gasUsed" => "0x0", + "extraData" => "0x0", + "logsBloom" => "0x0", + "miner" => miner_hash_data, + "parentHash" => + block_hash() + |> to_string(), + "receiptsRoot" => "0x0", + "size" => "0x0", + "sha3Uncles" => "0x0", + "stateRoot" => "0x0", + "timestamp" => "0x0", + "totalDifficulty" => "0x0", + "transactions" => [], + "transactionsRoot" => "0x0", + "uncles" => [] + } + } + ]} + end) + |> expect(:json_rpc, fn [%{id: id, jsonrpc: "2.0", method: "trace_block", params: [^block_quantity]}], _options -> + { + :ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: [ + %{ + "action" => %{ + "author" => miner_hash_data, + "rewardType" => "external", + "value" => "0x0" + }, + "blockHash" => to_string(new_block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + ] + } + end) + + assert {:ok, _} = + Fetcher.task(%Fetcher{ + blocks_batch_size: 1, + block_fetcher: %Block.Fetcher{ + callback_module: Fetcher, + json_rpc_named_arguments: json_rpc_named_arguments + } + }) + end + end end diff --git a/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs b/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs index 68987d284b..282865c5dd 100644 --- a/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs +++ b/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs @@ -97,5 +97,49 @@ defmodule Indexer.Block.UncatalogedRewards.ImporterTest do assert {:ok, [%Reward{block_hash: ^block_hash, address_type: ^address_type, reward: ^reward}]} = Importer.fetch_and_import_rewards([block]) end + + # regression test for https://github.com/poanetwork/blockscout/issues/1337 + test "with different block hash due to consensus switch between database query and trace_block" do + miner = insert(:address) + block = insert(:block, miner: miner) + block_hash = block.hash + address_type = :validator + insert(:reward, block_hash: block_hash, address_hash: miner.hash, address_type: address_type, reward: 1) + value = "0x2" + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "trace_block"}], _options -> + trace_block_block_hash = block_hash() + + refute trace_block_block_hash == block_hash + + {:ok, + [ + %{ + id: id, + result: [ + %{ + "action" => %{ + "author" => to_string(miner), + "rewardType" => "external", + "value" => value + }, + "blockHash" => to_string(trace_block_block_hash), + "blockNumber" => block.number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + ]} + end) + + {:ok, reward} = Wei.cast(value) + + assert {:ok, []} = Importer.fetch_and_import_rewards([block]) + end end end From e0a19e906fd9d85635df605c64992d891118042a Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 24 Jan 2019 08:56:46 -0600 Subject: [PATCH 10/11] Ignore beneficiaries with different block hash Fixes #1337 --- apps/indexer/lib/indexer/block/fetcher.ex | 70 +++++++++++++++---- apps/indexer/lib/indexer/block/supervisor.ex | 10 ++- .../block/uncataloged_rewards/importer.ex | 65 +++++++++++++---- .../block/uncataloged_rewards/processor.ex | 41 +++++++---- .../indexer/block/catchup/fetcher_test.exs | 18 ++++- .../uncataloged_rewards/importer_test.exs | 20 +++--- 6 files changed, 171 insertions(+), 53 deletions(-) diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index f2c62bf5b9..1779c051e9 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -106,9 +106,8 @@ defmodule Indexer.Block.Fetcher do transactions_with_receipts = Receipts.put(transactions_params_without_receipts, receipts), %{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.parse(logs), %{mint_transfers: mint_transfers} = MintTransfer.parse(logs), - {:beneficiaries, - {:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set, errors: beneficiaries_errors}}} <- - fetch_beneficiaries(range, json_rpc_named_arguments), + %FetchedBeneficiaries{params_set: beneficiary_params_set, errors: beneficiaries_errors} = + fetch_beneficiaries(blocks, json_rpc_named_arguments), addresses = AddressExtraction.extract_addresses(%{ block_reward_contract_beneficiaries: MapSet.to_list(beneficiary_params_set), @@ -200,16 +199,63 @@ defmodule Indexer.Block.Fetcher do def async_import_uncles(_), do: :ok - defp fetch_beneficiaries(range, json_rpc_named_arguments) do - result = - with :ignore <- - range - |> Enum.to_list() - |> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments) do - {:ok, %FetchedBeneficiaries{params_set: MapSet.new()}} - end + defp fetch_beneficiaries(blocks, json_rpc_named_arguments) do + hash_by_number = Enum.into(blocks, %{}, &{&1.number, &1.hash}) + + hash_by_number + |> Map.keys() + |> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments) + |> case do + {:ok, %FetchedBeneficiaries{params_set: params_set} = fetched_beneficiaries} -> + consensus_params_set = consensus_params_set(params_set, hash_by_number) + + %FetchedBeneficiaries{fetched_beneficiaries | params_set: consensus_params_set} + + {:error, reason} -> + Logger.error(fn -> ["Could not fetch beneficiaries: ", inspect(reason)] end) + + error = + case reason do + %{code: code, message: message} -> %{code: code, message: message} + _ -> %{code: -1, message: inspect(reason)} + end + + errors = + Enum.map(hash_by_number, fn {_, number} -> + Map.put(error, :data, %{block_number: number}) + end) - {:beneficiaries, result} + %FetchedBeneficiaries{errors: errors} + + :ignore -> + %FetchedBeneficiaries{} + end + end + + defp consensus_params_set(params_set, hash_by_number) do + params_set + |> Enum.filter(fn %{block_number: block_number, block_hash: block_hash} -> + case Map.fetch!(hash_by_number, block_number) do + ^block_hash -> + true + + other_block_hash -> + Logger.debug(fn -> + [ + "fetch beneficiaries reported block number (", + to_string(block_number), + ") maps to different (", + other_block_hash, + ") block hash than the one from getBlock (", + block_hash, + "). A reorg has occurred." + ] + end) + + false + end + end) + |> Enum.into(MapSet.new()) end defp add_gas_payments(beneficiaries, transactions) do diff --git a/apps/indexer/lib/indexer/block/supervisor.ex b/apps/indexer/lib/indexer/block/supervisor.ex index 1366dd6654..4278ff95ca 100644 --- a/apps/indexer/lib/indexer/block/supervisor.ex +++ b/apps/indexer/lib/indexer/block/supervisor.ex @@ -13,7 +13,13 @@ defmodule Indexer.Block.Supervisor do end @impl Supervisor - def init(%{block_interval: block_interval, subscribe_named_arguments: subscribe_named_arguments} = named_arguments) do + def init( + %{ + block_interval: block_interval, + json_rpc_named_arguments: json_rpc_named_arguments, + subscribe_named_arguments: subscribe_named_arguments + } = named_arguments + ) do block_fetcher = named_arguments |> Map.drop(~w(block_interval memory_monitor subscribe_named_arguments)a) @@ -35,7 +41,7 @@ defmodule Indexer.Block.Supervisor do [name: Realtime.Supervisor] ]}, {Uncle.Supervisor, [[block_fetcher: block_fetcher, memory_monitor: memory_monitor], [name: Uncle.Supervisor]]}, - UncatalogedRewards.Processor + {UncatalogedRewards.Processor, [json_rpc_named_arguments, [name: UncatalogedRewards.Processor]]} ], strategy: :one_for_one ) diff --git a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex index 409bc56701..33b8494c4f 100644 --- a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex +++ b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex @@ -3,6 +3,8 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do a module to fetch and import the rewards for blocks that were indexed without the reward """ + require Logger + alias EthereumJSONRPC.FetchedBeneficiaries alias Explorer.Chain alias Explorer.Chain.Wei @@ -15,20 +17,21 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do @doc """ receives a list of blocks and tries to fetch and insert rewards for them """ - def fetch_and_import_rewards(blocks) when is_list(blocks) do + def fetch_and_import_rewards(blocks, json_rpc_named_arguments) when is_list(blocks) do block_rewards = blocks - |> Stream.map(& &1.number) |> Stream.chunk_every(@chunk_size) - |> Enum.flat_map(&block_numbers_to_rewards/1) + |> Enum.flat_map(&blocks_to_rewards(&1, json_rpc_named_arguments)) {:ok, block_rewards} rescue e in RuntimeError -> {:error, %{exception: e}} end - defp block_numbers_to_rewards(block_numbers) when is_list(block_numbers) do - case fetch_beneficiaries(block_numbers) do + defp blocks_to_rewards(blocks, json_rpc_named_arguments) when is_list(blocks) do + blocks + |> fetch_beneficiaries(json_rpc_named_arguments) + |> case do [] -> [] @@ -42,13 +45,49 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do end end - defp fetch_beneficiaries(block_numbers) when is_list(block_numbers) do - {:ok, %FetchedBeneficiaries{params_set: result}} = - with :ignore <- EthereumJSONRPC.fetch_beneficiaries(block_numbers, json_rpc_named_arguments()) do - {:ok, %FetchedBeneficiaries{params_set: MapSet.new()}} - end + defp fetch_beneficiaries(blocks, json_rpc_named_arguments) when is_list(blocks) do + hash_by_number = Enum.into(blocks, %{}, &{&1.number, to_string(&1.hash)}) + + hash_by_number + |> Map.keys() + |> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments) + |> case do + {:ok, %FetchedBeneficiaries{params_set: params_set}} -> + params_set_to_consensus_beneficiaries_params(params_set, hash_by_number) + + {:error, reason} -> + Logger.error(fn -> ["Could not fetch beneficiaries: ", inspect(reason)] end) + [] - Enum.sort_by(result, &{&1.address_hash, &1.address_type, &1.block_hash}) + :ignore -> + [] + end + end + + defp params_set_to_consensus_beneficiaries_params(params_set, hash_by_number) do + params_set + |> Enum.filter(fn %{block_number: block_number, block_hash: block_hash} -> + case Map.fetch!(hash_by_number, block_number) do + ^block_hash -> + true + + other_block_hash -> + Logger.debug(fn -> + [ + "fetch beneficiaries reported block number (", + to_string(block_number), + ") maps to different (", + other_block_hash, + ") block hash than the one in the database (", + block_hash, + "). A reorg has occurred." + ] + end) + + false + end + end) + |> Enum.sort_by(&{&1.address_hash, &1.address_type, &1.block_hash}) end defp add_gas_payments(beneficiaries) do @@ -73,8 +112,4 @@ defmodule Indexer.Block.UncatalogedRewards.Importer do defp import_block_reward_params(block_rewards_params) when is_list(block_rewards_params) do Chain.import(%{block_rewards: %{params: block_rewards_params}}) end - - defp json_rpc_named_arguments do - Application.get_env(:explorer, :json_rpc_named_arguments) - end end diff --git a/apps/indexer/lib/indexer/block/uncataloged_rewards/processor.ex b/apps/indexer/lib/indexer/block/uncataloged_rewards/processor.ex index 9515f4667f..63b356f5e4 100644 --- a/apps/indexer/lib/indexer/block/uncataloged_rewards/processor.ex +++ b/apps/indexer/lib/indexer/block/uncataloged_rewards/processor.ex @@ -11,27 +11,42 @@ defmodule Indexer.Block.UncatalogedRewards.Processor do @max_batch_size 150 @default_cooldown 300 - def start_link(_) do - GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + @doc false + def child_spec([json_rpc_named_arguments, gen_server_options]) do + Supervisor.child_spec({__MODULE__, [json_rpc_named_arguments, gen_server_options]}, id: __MODULE__) end - @impl true - def init(args) do - send(self(), :import_batch) - {:ok, args} + def start_link(init_options, gen_server_options) do + GenServer.start_link(__MODULE__, init_options, gen_server_options) end - @impl true - def handle_info(:import_batch, state) do + @impl GenServer + def init(json_rpc_named_arguments) do + {:ok, json_rpc_named_arguments, {:continue, :import_batch}} + end + + @impl GenServer + def handle_continue(:import_batch, json_rpc_named_arguments) do + import_batch(json_rpc_named_arguments) + + {:noreply, json_rpc_named_arguments} + end + + @impl GenServer + def handle_info(:import_batch, json_rpc_named_arguments) do + import_batch(json_rpc_named_arguments) + + {:noreply, json_rpc_named_arguments} + end + + defp import_batch(json_rpc_named_arguments) do @max_batch_size |> Chain.get_blocks_without_reward() - |> import_or_try_later - - {:noreply, state} + |> import_or_try_later(json_rpc_named_arguments) end - defp import_or_try_later(batch) do - import_results = Importer.fetch_and_import_rewards(batch) + defp import_or_try_later(batch, json_rpc_named_arguments) do + import_results = Importer.fetch_and_import_rewards(batch, json_rpc_named_arguments) wait_time = if import_results == {:ok, []}, do: :timer.hours(24), else: @default_cooldown diff --git a/apps/indexer/test/indexer/block/catchup/fetcher_test.exs b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs index 4b66cae068..2f1f44081b 100644 --- a/apps/indexer/test/indexer/block/catchup/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs @@ -5,6 +5,8 @@ defmodule Indexer.Block.Catchup.FetcherTest do import EthereumJSONRPC, only: [integer_to_quantity: 1] import Mox + alias Explorer.Chain.Block + alias Explorer.Chain.Block.Reward alias Indexer.{Block, CoinBalance, InternalTransaction, Token, TokenBalance} alias Indexer.Block.Catchup.Fetcher @@ -117,6 +119,11 @@ defmodule Indexer.Block.Catchup.FetcherTest do test "ignores fetched beneficiaries with different hash for same number", %{ json_rpc_named_arguments: json_rpc_named_arguments } do + CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + latest_block_number = 1 latest_block_quantity = integer_to_quantity(latest_block_number) @@ -203,7 +210,9 @@ defmodule Indexer.Block.Catchup.FetcherTest do } end) - assert {:ok, _} = + assert count(Block) == 0 + + assert %{first_block_number: ^block_number, missing_block_count: 1, shrunk: false} = Fetcher.task(%Fetcher{ blocks_batch_size: 1, block_fetcher: %Block.Fetcher{ @@ -211,6 +220,13 @@ defmodule Indexer.Block.Catchup.FetcherTest do json_rpc_named_arguments: json_rpc_named_arguments } }) + + assert count(Block) == 1 + assert count(Reward) == 0 end end + + defp count(schema) do + Repo.one!(select(schema, fragment("COUNT(*)"))) + end end diff --git a/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs b/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs index 282865c5dd..c72cb069dd 100644 --- a/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs +++ b/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs @@ -9,12 +9,12 @@ defmodule Indexer.Block.UncatalogedRewards.ImporterTest do alias Indexer.Block.UncatalogedRewards.Importer describe "fetch_and_import_rewards/1" do - test "return `{:ok, []}` when receiving an empty list" do - assert Importer.fetch_and_import_rewards([]) == {:ok, []} + test "return `{:ok, []}` when receiving an empty list", %{json_rpc_named_arguments: json_rpc_named_arguments} do + assert Importer.fetch_and_import_rewards([], json_rpc_named_arguments) == {:ok, []} end @tag :no_geth - test "return `{:ok, [transactions executed]}`" do + test "return `{:ok, [transactions executed]}`", %{json_rpc_named_arguments: json_rpc_named_arguments} do address = insert(:address) address_hash = address.hash @@ -54,11 +54,11 @@ defmodule Indexer.Block.UncatalogedRewards.ImporterTest do block_hash: ^block_hash, address_type: :validator } - ]} = Importer.fetch_and_import_rewards([block]) + ]} = Importer.fetch_and_import_rewards([block], json_rpc_named_arguments) end @tag :no_geth - test "replaces reward on conflict" do + test "replaces reward on conflict", %{json_rpc_named_arguments: json_rpc_named_arguments} do miner = insert(:address) block = insert(:block, miner: miner) block_hash = block.hash @@ -95,11 +95,13 @@ defmodule Indexer.Block.UncatalogedRewards.ImporterTest do {:ok, reward} = Wei.cast(value) assert {:ok, [%Reward{block_hash: ^block_hash, address_type: ^address_type, reward: ^reward}]} = - Importer.fetch_and_import_rewards([block]) + Importer.fetch_and_import_rewards([block], json_rpc_named_arguments) end # regression test for https://github.com/poanetwork/blockscout/issues/1337 - test "with different block hash due to consensus switch between database query and trace_block" do + test "with different block hash due to consensus switch between database query and trace_block", %{ + json_rpc_named_arguments: json_rpc_named_arguments + } do miner = insert(:address) block = insert(:block, miner: miner) block_hash = block.hash @@ -137,9 +139,7 @@ defmodule Indexer.Block.UncatalogedRewards.ImporterTest do ]} end) - {:ok, reward} = Wei.cast(value) - - assert {:ok, []} = Importer.fetch_and_import_rewards([block]) + assert {:ok, []} = Importer.fetch_and_import_rewards([block], json_rpc_named_arguments) end end end From 294c9f37e4d31c03ffbb31b07cd33669aa0be3e6 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Fri, 25 Jan 2019 14:55:57 -0600 Subject: [PATCH 11/11] Convert UncatalogedRewards.Processor into a BufferedTask * Makes it works like the rest of the fetchers for consistency. * Allow it to participate in memory monitoring Bug Fixes * Addresses are created as needed for block rewards. * Coin Balances are created for each block and address mentioned. * Coin Balances are queues in CoinBalance.Fetcher. Enhancements * Fetch beneficiary errors no longer kill the block catchup, but instead only causes them to be async fetched while the rest of the block imports --- apps/explorer/lib/explorer/chain.ex | 32 +- apps/explorer/lib/explorer/chain/block.ex | 4 +- .../test/explorer/chain/block_test.exs | 4 +- apps/explorer/test/explorer/chain_test.exs | 26 +- .../lib/indexer/block/catchup/fetcher.ex | 35 +- apps/indexer/lib/indexer/block/fetcher.ex | 53 +- .../lib/indexer/block/realtime/fetcher.ex | 12 +- .../lib/indexer/block/reward/fetcher.ex | 246 +++++++ .../lib/indexer/block/reward/supervisor.ex | 38 + apps/indexer/lib/indexer/block/supervisor.ex | 8 +- .../block/uncataloged_rewards/importer.ex | 115 --- .../block/uncataloged_rewards/processor.ex | 55 -- .../lib/indexer/coin_balance/fetcher.ex | 1 + .../indexer/block/catchup/fetcher_test.exs | 212 +++++- .../indexer/block/reward/fetcher_test.exs | 687 ++++++++++++++++++ .../uncataloged_rewards/importer_test.exs | 145 ---- .../indexer/block/reward/supervisor/case.ex | 17 + 17 files changed, 1329 insertions(+), 361 deletions(-) create mode 100644 apps/indexer/lib/indexer/block/reward/fetcher.ex create mode 100644 apps/indexer/lib/indexer/block/reward/supervisor.ex delete mode 100644 apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex delete mode 100644 apps/indexer/lib/indexer/block/uncataloged_rewards/processor.ex create mode 100644 apps/indexer/test/indexer/block/reward/fetcher_test.exs delete mode 100644 apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs create mode 100644 apps/indexer/test/support/indexer/block/reward/supervisor/case.ex diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index b060564aa9..55bfe35aa9 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -958,6 +958,29 @@ defmodule Explorer.Chain do |> Repo.all() end + @doc """ + Map `block_number`s to their `t:Explorer.Chain.Block.t/0` `hash` `t:Explorer.Chain.Hash.Full.t/0`. + + Does not include non-consensus blocks. + + iex> block = insert(:block, consensus: false) + iex> Explorer.Chain.block_hash_by_number([block.number]) + %{} + + """ + @spec block_hash_by_number([Block.block_number()]) :: %{Block.block_number() => Hash.Full.t()} + def block_hash_by_number(block_numbers) when is_list(block_numbers) do + query = + from(block in Block, + where: block.consensus == true and block.number in ^block_numbers, + select: {block.number, block.hash} + ) + + query + |> Repo.all() + |> Enum.into(%{}) + end + @doc """ Lists the top 250 `t:Explorer.Chain.Address.t/0`'s' in descending order based on coin balance. @@ -977,12 +1000,11 @@ defmodule Explorer.Chain do end @doc """ - Finds blocks without a reward associated, up to the specified limit + Calls `reducer` on a stream of `t:Explorer.Chain.Block.t/0` without `t:Explorer.Chain.Block.Reward.t/0`. """ - def get_blocks_without_reward(limit \\ 250) do - Block.get_blocks_without_reward() - |> limit(^limit) - |> Repo.all() + def stream_blocks_without_rewards(initial, reducer) when is_function(reducer, 2) do + Block.blocks_without_reward_query() + |> Repo.stream_reduce(initial, reducer) end @doc """ diff --git a/apps/explorer/lib/explorer/chain/block.ex b/apps/explorer/lib/explorer/chain/block.ex index f636447f2e..6e76f4801c 100644 --- a/apps/explorer/lib/explorer/chain/block.ex +++ b/apps/explorer/lib/explorer/chain/block.ex @@ -101,9 +101,9 @@ defmodule Explorer.Chain.Block do |> unique_constraint(:hash, name: :blocks_pkey) end - def get_blocks_without_reward(query \\ __MODULE__) do + def blocks_without_reward_query do from( - b in query, + b in __MODULE__, left_join: r in Reward, on: [block_hash: b.hash], where: is_nil(r.block_hash) and b.consensus == true diff --git a/apps/explorer/test/explorer/chain/block_test.exs b/apps/explorer/test/explorer/chain/block_test.exs index 81268a6d27..35f14c5479 100644 --- a/apps/explorer/test/explorer/chain/block_test.exs +++ b/apps/explorer/test/explorer/chain/block_test.exs @@ -43,14 +43,14 @@ defmodule Explorer.Chain.BlockTest do end end - describe "get_blocks_without_reward/1" do + describe "blocks_without_reward_query/1" do test "finds only blocks without rewards" do rewarded_block = insert(:block) insert(:reward, address_hash: insert(:address).hash, block_hash: rewarded_block.hash) unrewarded_block = insert(:block) results = - Block.get_blocks_without_reward() + Block.blocks_without_reward_query() |> Repo.all() |> Enum.map(& &1.hash) diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index fe951e111d..7805c77043 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -1274,6 +1274,24 @@ defmodule Explorer.ChainTest do end end + describe "block_hash_by_number/1" do + test "without blocks returns empty map" do + assert Chain.block_hash_by_number([]) == %{} + end + + test "with consensus block returns mapping" do + block = insert(:block) + + assert Chain.block_hash_by_number([block.number]) == %{block.number => block.hash} + end + + test "with non-consensus block does not return mapping" do + block = insert(:block, consensus: false) + + assert Chain.block_hash_by_number([block.number]) == %{} + end + end + describe "list_top_addresses/0" do test "without addresses with balance > 0" do insert(:address, fetched_coin_balance: 0) @@ -1315,25 +1333,25 @@ defmodule Explorer.ChainTest do end end - describe "get_blocks_without_reward/1" do + describe "stream_blocks_without_rewards/2" do test "includes consensus blocks" do %Block{hash: consensus_hash} = insert(:block, consensus: true) - assert [%Block{hash: ^consensus_hash}] = Chain.get_blocks_without_reward() + assert {:ok, [%Block{hash: ^consensus_hash}]} = Chain.stream_blocks_without_rewards([], &[&1 | &2]) end test "does not include consensus block that has a reward" do %Block{hash: consensus_hash, miner_hash: miner_hash} = insert(:block, consensus: true) insert(:reward, address_hash: miner_hash, block_hash: consensus_hash) - assert [] = Chain.get_blocks_without_reward() + assert {:ok, []} = Chain.stream_blocks_without_rewards([], &[&1 | &2]) end # https://github.com/poanetwork/blockscout/issues/1310 regression test test "does not include non-consensus blocks" do insert(:block, consensus: false) - assert [] = Chain.get_blocks_without_reward() + assert {:ok, []} = Chain.stream_blocks_without_rewards([], &[&1 | &2]) end end diff --git a/apps/indexer/lib/indexer/block/catchup/fetcher.ex b/apps/indexer/lib/indexer/block/catchup/fetcher.ex index 233fd5f7e4..cbcbeb5719 100644 --- a/apps/indexer/lib/indexer/block/catchup/fetcher.ex +++ b/apps/indexer/lib/indexer/block/catchup/fetcher.ex @@ -8,7 +8,13 @@ defmodule Indexer.Block.Catchup.Fetcher do require Logger import Indexer.Block.Fetcher, - only: [async_import_coin_balances: 2, async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2] + only: [ + async_import_block_rewards: 1, + async_import_coin_balances: 2, + async_import_tokens: 1, + async_import_uncles: 1, + fetch_and_import_range: 2 + ] alias Ecto.Changeset alias Explorer.Chain @@ -113,22 +119,27 @@ defmodule Indexer.Block.Catchup.Fetcher do @impl Block.Fetcher def import(_, options) when is_map(options) do - {async_import_remaining_block_data_options, chain_import_options} = + {async_import_remaining_block_data_options, options_with_block_rewards_errors} = Map.split(options, @async_import_remaining_block_data_options) - full_chain_import_options = put_in(chain_import_options, [:blocks, :params, Access.all(), :consensus], true) + {block_reward_errors, options_without_block_rewards_errors} = + pop_in(options_with_block_rewards_errors[:block_rewards][:errors]) + + full_chain_import_options = + put_in(options_without_block_rewards_errors, [:blocks, :params, Access.all(), :consensus], true) with {:import, {:ok, imported} = ok} <- {:import, Chain.import(full_chain_import_options)} do async_import_remaining_block_data( imported, - async_import_remaining_block_data_options + Map.put(async_import_remaining_block_data_options, :block_rewards, %{errors: block_reward_errors}) ) ok end end - defp async_import_remaining_block_data(imported, options) do + defp async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}} = options) do + async_import_block_rewards(block_reward_errors) async_import_coin_balances(imported, options) async_import_created_contract_codes(imported) async_import_internal_transactions(imported) @@ -286,19 +297,19 @@ defmodule Indexer.Block.Catchup.Fetcher do end end - defp retry(sequence, errors) when is_list(errors) do - errors - |> errors_to_ranges() + defp retry(sequence, block_errors) when is_list(block_errors) do + block_errors + |> block_errors_to_block_number_ranges() |> Enum.map(&push_back(sequence, &1)) end - defp errors_to_ranges(errors) when is_list(errors) do - errors - |> Enum.flat_map(&error_to_numbers/1) + defp block_errors_to_block_number_ranges(block_errors) when is_list(block_errors) do + block_errors + |> Enum.map(&block_error_to_number/1) |> numbers_to_ranges() end - defp error_to_numbers(%{data: %{number: number}}) when is_integer(number), do: [number] + defp block_error_to_number(%{data: %{number: number}}) when is_integer(number), do: number defp numbers_to_ranges([]), do: [] diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 1779c051e9..9291ebca6d 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -7,6 +7,8 @@ defmodule Indexer.Block.Fetcher do require Logger + import EthereumJSONRPC, only: [quantity_to_integer: 1] + alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries} alias Explorer.Chain.{Address, Block, Import} alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, Token, TokenTransfers, Tracer} @@ -136,14 +138,14 @@ defmodule Indexer.Block.Fetcher do address_token_balances: %{params: address_token_balances}, blocks: %{params: blocks}, block_second_degree_relations: %{params: block_second_degree_relations_params}, - block_rewards: %{params: beneficiaries_with_gas_payment}, + block_rewards: %{errors: beneficiaries_errors, params: beneficiaries_with_gas_payment}, logs: %{params: logs}, token_transfers: %{params: token_transfers}, tokens: %{on_conflict: :nothing, params: tokens}, transactions: %{params: transactions_with_receipts} } ) do - {:ok, %{inserted: inserted, errors: blocks_errors ++ beneficiaries_errors}} + {:ok, %{inserted: inserted, errors: blocks_errors}} else {step, {:error, reason}} -> {:error, {step, reason}} {:import, {:error, step, failed_value, changes_so_far}} -> {:error, {step, failed_value, changes_so_far}} @@ -170,6 +172,14 @@ defmodule Indexer.Block.Fetcher do callback_module.import(state, options_with_broadcast) end + def async_import_block_rewards([]), do: :ok + + def async_import_block_rewards(errors) when is_list(errors) do + errors + |> block_reward_errors_to_block_numbers() + |> Indexer.Block.Reward.Fetcher.async_fetch() + end + def async_import_coin_balances(%{addresses: addresses}, %{ address_hash_to_fetched_balance_block_number: address_hash_to_block_number }) do @@ -199,15 +209,31 @@ defmodule Indexer.Block.Fetcher do def async_import_uncles(_), do: :ok + defp block_reward_errors_to_block_numbers(block_reward_errors) when is_list(block_reward_errors) do + Enum.map(block_reward_errors, &block_reward_error_to_block_number/1) + end + + defp block_reward_error_to_block_number(%{data: %{block_number: block_number}}) when is_integer(block_number) do + block_number + end + + defp block_reward_error_to_block_number(%{data: %{block_quantity: block_quantity}}) when is_binary(block_quantity) do + quantity_to_integer(block_quantity) + end + defp fetch_beneficiaries(blocks, json_rpc_named_arguments) do - hash_by_number = Enum.into(blocks, %{}, &{&1.number, &1.hash}) + hash_string_by_number = + Enum.into(blocks, %{}, fn %{number: number, hash: hash_string} + when is_integer(number) and is_binary(hash_string) -> + {number, hash_string} + end) - hash_by_number + hash_string_by_number |> Map.keys() |> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments) |> case do {:ok, %FetchedBeneficiaries{params_set: params_set} = fetched_beneficiaries} -> - consensus_params_set = consensus_params_set(params_set, hash_by_number) + consensus_params_set = consensus_params_set(params_set, hash_string_by_number) %FetchedBeneficiaries{fetched_beneficiaries | params_set: consensus_params_set} @@ -221,7 +247,7 @@ defmodule Indexer.Block.Fetcher do end errors = - Enum.map(hash_by_number, fn {_, number} -> + Enum.map(hash_string_by_number, fn {number, _} when is_integer(number) -> Map.put(error, :data, %{block_number: number}) end) @@ -232,22 +258,23 @@ defmodule Indexer.Block.Fetcher do end end - defp consensus_params_set(params_set, hash_by_number) do + defp consensus_params_set(params_set, hash_string_by_number) do params_set - |> Enum.filter(fn %{block_number: block_number, block_hash: block_hash} -> - case Map.fetch!(hash_by_number, block_number) do - ^block_hash -> + |> Enum.filter(fn %{block_number: block_number, block_hash: block_hash_string} + when is_integer(block_number) and is_binary(block_hash_string) -> + case Map.fetch!(hash_string_by_number, block_number) do + ^block_hash_string -> true - other_block_hash -> + other_block_hash_string -> Logger.debug(fn -> [ "fetch beneficiaries reported block number (", to_string(block_number), ") maps to different (", - other_block_hash, + other_block_hash_string, ") block hash than the one from getBlock (", - block_hash, + block_hash_string, "). A reorg has occurred." ] end) diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 3b37797c9b..3e151d5106 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -10,7 +10,9 @@ defmodule Indexer.Block.Realtime.Fetcher do require Logger import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1] - import Indexer.Block.Fetcher, only: [async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2] + + import Indexer.Block.Fetcher, + only: [async_import_block_rewards: 1, async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2] alias ABI.TypeDecoder alias Ecto.Changeset @@ -157,6 +159,7 @@ defmodule Indexer.Block.Realtime.Fetcher do address_hash_to_fetched_balance_block_number: address_hash_to_block_number, address_token_balances: %{params: address_token_balances_params}, addresses: %{params: addresses_params}, + block_rewards: block_rewards, transactions: %{params: transactions_params}, token_transfers: %{params: token_transfers_params} } = options @@ -183,17 +186,19 @@ defmodule Indexer.Block.Realtime.Fetcher do {:address_token_balances, {:ok, address_token_balances}} <- {:address_token_balances, fetch_token_balances(address_token_balances_params)}, address_current_token_balances = TokenBalances.to_address_current_token_balances(address_token_balances), + {block_reward_errors, chain_import_block_rewards} = Map.pop(block_rewards, :errors), chain_import_options = options |> Map.drop(@import_options) |> put_in([:addresses, :params], balances_addresses_params) |> put_in([:blocks, :params, Access.all(), :consensus], true) + |> put_in([:block_rewards], chain_import_block_rewards) |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params) |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_current_token_balances) |> put_in([Access.key(:address_token_balances), :params], address_token_balances) |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params), {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do - async_import_remaining_block_data(imported) + async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}}) ok end end @@ -337,7 +342,8 @@ defmodule Indexer.Block.Realtime.Fetcher do Enum.any?(changesets, &(Map.get(&1, :message) == "Unknown block number")) end - defp async_import_remaining_block_data(imported) do + defp async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}}) do + async_import_block_rewards(block_reward_errors) async_import_tokens(imported) async_import_uncles(imported) end diff --git a/apps/indexer/lib/indexer/block/reward/fetcher.ex b/apps/indexer/lib/indexer/block/reward/fetcher.ex new file mode 100644 index 0000000000..611b3ccbbf --- /dev/null +++ b/apps/indexer/lib/indexer/block/reward/fetcher.ex @@ -0,0 +1,246 @@ +defmodule Indexer.Block.Reward.Fetcher do + @moduledoc """ + Fetches `t:Explorer.Chain.Block.Reward.t/0` for a given `t:Explorer.Chain.Block.block_number/0`. + + To protect from reorgs where the returned rewards are for same `number`, but a different `hash`, the `hash` is + retrieved from the database and compared against that returned from `EthereumJSONRPC.` + """ + + use Spandex.Decorators + + require Logger + + import EthereumJSONRPC, only: [quantity_to_integer: 1] + + alias Ecto.Changeset + alias EthereumJSONRPC.FetchedBeneficiaries + alias Explorer.Chain + alias Explorer.Chain.{Block, Wei} + alias Indexer.Address.CoinBalances + alias Indexer.{AddressExtraction, BufferedTask, CoinBalance, Tracer} + + @behaviour BufferedTask + + @defaults [ + flush_interval: :timer.seconds(3), + max_batch_size: 10, + max_concurrency: 4, + task_supervisor: Indexer.Block.Reward.TaskSupervisor, + metadata: [fetcher: :block_reward] + ] + + @doc """ + Asynchronously fetches block rewards for each `t:Explorer.Chain.Explorer.block_number/0`` in `block_numbers`. + """ + @spec async_fetch([Block.block_number()]) :: :ok + def async_fetch(block_numbers) when is_list(block_numbers) do + BufferedTask.buffer(__MODULE__, block_numbers) + end + + @doc false + # credo:disable-for-next-line Credo.Check.Design.DuplicatedCode + def child_spec([init_options, gen_server_options]) do + {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) + + unless state do + raise ArgumentError, + ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> + "to allow for json_rpc calls when running." + end + + merged_init_options = + @defaults + |> Keyword.merge(mergeable_init_options) + |> Keyword.put(:state, state) + + Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__) + end + + @impl BufferedTask + def init(initial, reducer, _) do + {:ok, final} = + Chain.stream_blocks_without_rewards(initial, fn %{number: number}, acc -> + reducer.(number, acc) + end) + + final + end + + @impl BufferedTask + @decorate trace(name: "fetch", resource: "Indexer.Block.Reward.Fetcher.run/2", service: :indexer, tracer: Tracer) + def run(entries, json_rpc_named_arguments) do + hash_string_by_number = + entries + |> Enum.uniq() + |> hash_string_by_number() + + consensus_numbers = Map.keys(hash_string_by_number) + + consensus_number_count = Enum.count(consensus_numbers) + + Logger.metadata(count: consensus_number_count) + + Logger.debug(fn -> "fetching" end) + + consensus_numbers + |> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments) + |> case do + {:ok, fetched_beneficiaries} -> + run_fetched_beneficiaries(fetched_beneficiaries, hash_string_by_number) + + {:error, reason} -> + Logger.error( + fn -> + ["failed to fetch: ", inspect(reason)] + end, + error_count: consensus_number_count + ) + + {:retry, consensus_numbers} + end + end + + defp hash_string_by_number(numbers) when is_list(numbers) do + numbers + |> Chain.block_hash_by_number() + |> Enum.into(%{}, fn {number, hash} -> + {number, to_string(hash)} + end) + end + + defp run_fetched_beneficiaries(%FetchedBeneficiaries{params_set: params_set, errors: errors}, hash_string_by_number) do + params_set + |> filter_consensus_params(hash_string_by_number) + |> case do + [] -> + retry_errors(errors) + + beneficiaries_params -> + beneficiaries_params + |> add_gas_payments() + |> import_block_reward_params() + |> case do + {:ok, %{address_coin_balances: address_coin_balances}} -> + CoinBalance.Fetcher.async_fetch_balances(address_coin_balances) + + retry_errors(errors) + + {:error, [%Changeset{} | _] = changesets} -> + Logger.error(fn -> ["Failed to validate: ", inspect(changesets)] end, + error_count: Enum.count(hash_string_by_number) + ) + + retry_beneficiaries_params(beneficiaries_params) + + {:error, step, failed_value, _changes_so_far} -> + Logger.error(fn -> ["Failed to import", inspect(failed_value)] end, + step: step, + error_count: Enum.count(hash_string_by_number) + ) + + retry_beneficiaries_params(beneficiaries_params) + end + end + end + + defp filter_consensus_params(params_set, hash_string_by_number) do + Enum.filter(params_set, fn %{block_number: block_number, block_hash: block_hash} -> + case Map.fetch!(hash_string_by_number, block_number) do + ^block_hash -> + true + + other_block_hash -> + Logger.debug(fn -> + [ + "fetch beneficiaries reported block number (", + to_string(block_number), + ") maps to different (", + other_block_hash, + ") block hash than the one in the database (", + block_hash, + "). A reorg has occurred." + ] + end) + + false + end + end) + end + + defp add_gas_payments(beneficiaries_params) do + gas_payment_by_block_hash = + beneficiaries_params + |> Stream.filter(&(&1.address_type == :validator)) + |> Enum.map(& &1.block_hash) + |> Chain.gas_payment_by_block_hash() + + Enum.map(beneficiaries_params, fn %{block_hash: block_hash} = beneficiary -> + case gas_payment_by_block_hash do + %{^block_hash => gas_payment} -> + {:ok, minted} = Wei.cast(beneficiary.reward) + %{beneficiary | reward: Wei.sum(minted, gas_payment)} + + _ -> + beneficiary + end + end) + end + + defp import_block_reward_params(block_rewards_params) when is_list(block_rewards_params) do + addresses_params = AddressExtraction.extract_addresses(%{block_reward_contract_beneficiaries: block_rewards_params}) + address_coin_balances_params_set = CoinBalances.params_set(%{beneficiary_params: block_rewards_params}) + + Chain.import(%{ + addresses: %{params: addresses_params}, + address_coin_balances: %{params: address_coin_balances_params_set}, + block_rewards: %{params: block_rewards_params} + }) + end + + defp retry_beneficiaries_params(beneficiaries_params) when is_list(beneficiaries_params) do + entries = Enum.map(beneficiaries_params, & &1.block_number) + + {:retry, entries} + end + + defp retry_errors([]), do: :ok + + defp retry_errors(errors) when is_list(errors) do + retried_entries = fetched_beneficiaries_errors_to_entries(errors) + + Logger.error( + fn -> + [ + "failed to fetch: ", + fetched_beneficiaries_errors_to_iodata(errors) + ] + end, + error_count: Enum.count(retried_entries) + ) + + {:retry, retried_entries} + end + + defp fetched_beneficiaries_errors_to_entries(errors) when is_list(errors) do + Enum.map(errors, &fetched_beneficiary_error_to_entry/1) + end + + defp fetched_beneficiary_error_to_entry(%{data: %{block_quantity: block_quantity}}) when is_binary(block_quantity) do + quantity_to_integer(block_quantity) + end + + defp fetched_beneficiaries_errors_to_iodata(errors) when is_list(errors) do + fetched_beneficiaries_errors_to_iodata(errors, []) + end + + defp fetched_beneficiaries_errors_to_iodata([], iodata), do: iodata + + defp fetched_beneficiaries_errors_to_iodata([error | errors], iodata) do + fetched_beneficiaries_errors_to_iodata(errors, [iodata | fetched_beneficiary_error_to_iodata(error)]) + end + + defp fetched_beneficiary_error_to_iodata(%{code: code, message: message, data: %{block_quantity: block_quantity}}) + when is_integer(code) and is_binary(message) and is_binary(block_quantity) do + ["@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message, ?\n] + end +end diff --git a/apps/indexer/lib/indexer/block/reward/supervisor.ex b/apps/indexer/lib/indexer/block/reward/supervisor.ex new file mode 100644 index 0000000000..de222dd328 --- /dev/null +++ b/apps/indexer/lib/indexer/block/reward/supervisor.ex @@ -0,0 +1,38 @@ +defmodule Indexer.Block.Reward.Supervisor do + @moduledoc """ + Supervises `Indexer.Block.Reward.Fetcher` and its batch tasks through `Indexer.Block.Reward.TaskSupervisor` + """ + + use Supervisor + + alias Indexer.Block.Reward.Fetcher + + def child_spec([init_arguments]) do + child_spec([init_arguments, []]) + end + + def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do + default = %{ + id: __MODULE__, + start: {__MODULE__, :start_link, start_link_arguments}, + type: :supervisor + } + + Supervisor.child_spec(default, []) + end + + def start_link(arguments, gen_server_options \\ []) do + Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__)) + end + + @impl Supervisor + def init(fetcher_arguments) do + Supervisor.init( + [ + {Task.Supervisor, name: Indexer.Block.Reward.TaskSupervisor}, + {Fetcher, [fetcher_arguments, [name: Fetcher]]} + ], + strategy: :one_for_one + ) + end +end diff --git a/apps/indexer/lib/indexer/block/supervisor.ex b/apps/indexer/lib/indexer/block/supervisor.ex index 4278ff95ca..bfc5fb20f4 100644 --- a/apps/indexer/lib/indexer/block/supervisor.ex +++ b/apps/indexer/lib/indexer/block/supervisor.ex @@ -4,7 +4,7 @@ defmodule Indexer.Block.Supervisor do """ alias Indexer.Block - alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, UncatalogedRewards, Uncle} + alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, Reward, Uncle} use Supervisor @@ -41,7 +41,11 @@ defmodule Indexer.Block.Supervisor do [name: Realtime.Supervisor] ]}, {Uncle.Supervisor, [[block_fetcher: block_fetcher, memory_monitor: memory_monitor], [name: Uncle.Supervisor]]}, - {UncatalogedRewards.Processor, [json_rpc_named_arguments, [name: UncatalogedRewards.Processor]]} + {Reward.Supervisor, + [ + [json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor], + [name: Reward.Supervisor] + ]} ], strategy: :one_for_one ) diff --git a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex b/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex deleted file mode 100644 index 33b8494c4f..0000000000 --- a/apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex +++ /dev/null @@ -1,115 +0,0 @@ -defmodule Indexer.Block.UncatalogedRewards.Importer do - @moduledoc """ - a module to fetch and import the rewards for blocks that were indexed without the reward - """ - - require Logger - - alias EthereumJSONRPC.FetchedBeneficiaries - alias Explorer.Chain - alias Explorer.Chain.Wei - - # max number of blocks in a single request - # higher numbers may cause the requests to time out - # lower numbers will generate more requests - @chunk_size 10 - - @doc """ - receives a list of blocks and tries to fetch and insert rewards for them - """ - def fetch_and_import_rewards(blocks, json_rpc_named_arguments) when is_list(blocks) do - block_rewards = - blocks - |> Stream.chunk_every(@chunk_size) - |> Enum.flat_map(&blocks_to_rewards(&1, json_rpc_named_arguments)) - - {:ok, block_rewards} - rescue - e in RuntimeError -> {:error, %{exception: e}} - end - - defp blocks_to_rewards(blocks, json_rpc_named_arguments) when is_list(blocks) do - blocks - |> fetch_beneficiaries(json_rpc_named_arguments) - |> case do - [] -> - [] - - beneficiaries_params -> - beneficiaries_params - |> add_gas_payments() - |> import_block_reward_params() - |> case do - {:ok, %{block_rewards: block_rewards}} -> block_rewards - end - end - end - - defp fetch_beneficiaries(blocks, json_rpc_named_arguments) when is_list(blocks) do - hash_by_number = Enum.into(blocks, %{}, &{&1.number, to_string(&1.hash)}) - - hash_by_number - |> Map.keys() - |> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments) - |> case do - {:ok, %FetchedBeneficiaries{params_set: params_set}} -> - params_set_to_consensus_beneficiaries_params(params_set, hash_by_number) - - {:error, reason} -> - Logger.error(fn -> ["Could not fetch beneficiaries: ", inspect(reason)] end) - [] - - :ignore -> - [] - end - end - - defp params_set_to_consensus_beneficiaries_params(params_set, hash_by_number) do - params_set - |> Enum.filter(fn %{block_number: block_number, block_hash: block_hash} -> - case Map.fetch!(hash_by_number, block_number) do - ^block_hash -> - true - - other_block_hash -> - Logger.debug(fn -> - [ - "fetch beneficiaries reported block number (", - to_string(block_number), - ") maps to different (", - other_block_hash, - ") block hash than the one in the database (", - block_hash, - "). A reorg has occurred." - ] - end) - - false - end - end) - |> Enum.sort_by(&{&1.address_hash, &1.address_type, &1.block_hash}) - end - - defp add_gas_payments(beneficiaries) do - gas_payment_by_block_hash = - beneficiaries - |> Stream.filter(&(&1.address_type == :validator)) - |> Enum.map(& &1.block_hash) - |> Chain.gas_payment_by_block_hash() - - Enum.map(beneficiaries, fn %{block_hash: block_hash} = beneficiary -> - case gas_payment_by_block_hash do - %{^block_hash => gas_payment} -> - {:ok, minted} = Wei.cast(beneficiary.reward) - %{beneficiary | reward: Wei.sum(minted, gas_payment)} - - _ -> - beneficiary - end - end) - end - - defp import_block_reward_params(block_rewards_params) when is_list(block_rewards_params) do - Chain.import(%{block_rewards: %{params: block_rewards_params}}) - end -end diff --git a/apps/indexer/lib/indexer/block/uncataloged_rewards/processor.ex b/apps/indexer/lib/indexer/block/uncataloged_rewards/processor.ex deleted file mode 100644 index 63b356f5e4..0000000000 --- a/apps/indexer/lib/indexer/block/uncataloged_rewards/processor.ex +++ /dev/null @@ -1,55 +0,0 @@ -defmodule Indexer.Block.UncatalogedRewards.Processor do - @moduledoc """ - genserver to find blocks without rewards and fetch their rewards in batches - """ - - use GenServer - - alias Explorer.Chain - alias Indexer.Block.UncatalogedRewards.Importer - - @max_batch_size 150 - @default_cooldown 300 - - @doc false - def child_spec([json_rpc_named_arguments, gen_server_options]) do - Supervisor.child_spec({__MODULE__, [json_rpc_named_arguments, gen_server_options]}, id: __MODULE__) - end - - def start_link(init_options, gen_server_options) do - GenServer.start_link(__MODULE__, init_options, gen_server_options) - end - - @impl GenServer - def init(json_rpc_named_arguments) do - {:ok, json_rpc_named_arguments, {:continue, :import_batch}} - end - - @impl GenServer - def handle_continue(:import_batch, json_rpc_named_arguments) do - import_batch(json_rpc_named_arguments) - - {:noreply, json_rpc_named_arguments} - end - - @impl GenServer - def handle_info(:import_batch, json_rpc_named_arguments) do - import_batch(json_rpc_named_arguments) - - {:noreply, json_rpc_named_arguments} - end - - defp import_batch(json_rpc_named_arguments) do - @max_batch_size - |> Chain.get_blocks_without_reward() - |> import_or_try_later(json_rpc_named_arguments) - end - - defp import_or_try_later(batch, json_rpc_named_arguments) do - import_results = Importer.fetch_and_import_rewards(batch, json_rpc_named_arguments) - - wait_time = if import_results == {:ok, []}, do: :timer.hours(24), else: @default_cooldown - - Process.send_after(self(), :import_batch, wait_time) - end -end diff --git a/apps/indexer/lib/indexer/coin_balance/fetcher.ex b/apps/indexer/lib/indexer/coin_balance/fetcher.ex index 9fda896ee6..ac74b7fb15 100644 --- a/apps/indexer/lib/indexer/coin_balance/fetcher.ex +++ b/apps/indexer/lib/indexer/coin_balance/fetcher.ex @@ -38,6 +38,7 @@ defmodule Indexer.CoinBalance.Fetcher do end @doc false + # credo:disable-for-next-line Credo.Check.Design.DuplicatedCode def child_spec([init_options, gen_server_options]) do {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) diff --git a/apps/indexer/test/indexer/block/catchup/fetcher_test.exs b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs index 2f1f44081b..851064e365 100644 --- a/apps/indexer/test/indexer/block/catchup/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/catchup/fetcher_test.exs @@ -5,7 +5,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do import EthereumJSONRPC, only: [integer_to_quantity: 1] import Mox - alias Explorer.Chain.Block + alias Explorer.Chain alias Explorer.Chain.Block.Reward alias Indexer.{Block, CoinBalance, InternalTransaction, Token, TokenBalance} alias Indexer.Block.Catchup.Fetcher @@ -90,6 +90,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do } ] }, + block_rewards: %{errors: [], params: []}, block_second_degree_relations: %{ params: [ %{ @@ -210,7 +211,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do } end) - assert count(Block) == 0 + assert count(Chain.Block) == 0 assert %{first_block_number: ^block_number, missing_block_count: 1, shrunk: false} = Fetcher.task(%Fetcher{ @@ -221,9 +222,214 @@ defmodule Indexer.Block.Catchup.FetcherTest do } }) - assert count(Block) == 1 + assert count(Chain.Block) == 1 assert count(Reward) == 0 end + + test "async fetches beneficiaries when individual responses error out", %{ + json_rpc_named_arguments: json_rpc_named_arguments + } do + CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + latest_block_number = 1 + latest_block_quantity = integer_to_quantity(latest_block_number) + + block_number = latest_block_number - 1 + block_hash = block_hash() + block_quantity = integer_to_quantity(block_number) + + miner_hash = address_hash() + miner_hash_data = to_string(miner_hash) + + new_block_hash = block_hash() + + refute block_hash == new_block_hash + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, %{"number" => latest_block_quantity}} + end) + |> expect(:json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _options -> + {:ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "hash" => to_string(block_hash), + "number" => block_quantity, + "difficulty" => "0x0", + "gasLimit" => "0x0", + "gasUsed" => "0x0", + "extraData" => "0x0", + "logsBloom" => "0x0", + "miner" => miner_hash_data, + "parentHash" => + block_hash() + |> to_string(), + "receiptsRoot" => "0x0", + "size" => "0x0", + "sha3Uncles" => "0x0", + "stateRoot" => "0x0", + "timestamp" => "0x0", + "totalDifficulty" => "0x0", + "transactions" => [], + "transactionsRoot" => "0x0", + "uncles" => [] + } + } + ]} + end) + |> expect(:json_rpc, fn [%{id: id, method: "trace_block", params: [^block_quantity]}], _options -> + {:ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: nil + } + ]} + end) + + assert count(Chain.Block) == 0 + + parent = self() + + pid = + spawn_link(fn -> + receive do + {:"$gen_call", from, {:buffer, block_numbers}} -> + GenServer.reply(from, :ok) + send(parent, {:block_numbers, block_numbers}) + end + end) + + Process.register(pid, Indexer.Block.Reward.Fetcher) + + assert %{first_block_number: ^block_number, missing_block_count: 1, shrunk: false} = + Fetcher.task(%Fetcher{ + blocks_batch_size: 1, + block_fetcher: %Block.Fetcher{ + callback_module: Fetcher, + json_rpc_named_arguments: json_rpc_named_arguments + } + }) + + assert count(Chain.Block) == 1 + assert count(Reward) == 0 + + assert_receive {:block_numbers, [block_number]}, 5_000 + end + + test "async fetches beneficiaries when entire call errors out", %{ + json_rpc_named_arguments: json_rpc_named_arguments + } do + CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + latest_block_number = 1 + latest_block_quantity = integer_to_quantity(latest_block_number) + + block_number = latest_block_number - 1 + block_hash = block_hash() + block_quantity = integer_to_quantity(block_number) + + miner_hash = address_hash() + miner_hash_data = to_string(miner_hash) + + new_block_hash = block_hash() + + refute block_hash == new_block_hash + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, %{"number" => latest_block_quantity}} + end) + |> expect(:json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _options -> + {:ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "hash" => to_string(block_hash), + "number" => block_quantity, + "difficulty" => "0x0", + "gasLimit" => "0x0", + "gasUsed" => "0x0", + "extraData" => "0x0", + "logsBloom" => "0x0", + "miner" => miner_hash_data, + "parentHash" => + block_hash() + |> to_string(), + "receiptsRoot" => "0x0", + "size" => "0x0", + "sha3Uncles" => "0x0", + "stateRoot" => "0x0", + "timestamp" => "0x0", + "totalDifficulty" => "0x0", + "transactions" => [], + "transactionsRoot" => "0x0", + "uncles" => [] + } + } + ]} + end) + |> expect(:json_rpc, fn [%{method: "trace_block", params: [^block_quantity]}], _options -> + {:error, :boom} + end) + + assert count(Chain.Block) == 0 + + parent = self() + + pid = + spawn_link(fn -> + receive do + {:"$gen_call", from, {:buffer, block_numbers}} -> + GenServer.reply(from, :ok) + send(parent, {:block_numbers, block_numbers}) + end + end) + + Process.register(pid, Indexer.Block.Reward.Fetcher) + + assert %{first_block_number: ^block_number, missing_block_count: 1, shrunk: false} = + Fetcher.task(%Fetcher{ + blocks_batch_size: 1, + block_fetcher: %Block.Fetcher{ + callback_module: Fetcher, + json_rpc_named_arguments: json_rpc_named_arguments + } + }) + + assert count(Chain.Block) == 1 + assert count(Reward) == 0 + + assert_receive {:block_numbers, [block_number]}, 5_000 + end end defp count(schema) do diff --git a/apps/indexer/test/indexer/block/reward/fetcher_test.exs b/apps/indexer/test/indexer/block/reward/fetcher_test.exs new file mode 100644 index 0000000000..39ba7de998 --- /dev/null +++ b/apps/indexer/test/indexer/block/reward/fetcher_test.exs @@ -0,0 +1,687 @@ +defmodule Indexer.Block.Reward.FetcherTest do + # MUST be `async: false` so that {:shared, pid} is set for connection to allow CoinBalanceFetcher's self-send to have + # connection allowed immediately. + use EthereumJSONRPC.Case, async: false + use Explorer.DataCase + + import EthereumJSONRPC, only: [integer_to_quantity: 1] + import Mox + + alias Explorer.Chain + alias Explorer.Chain.{Block, Hash, Wei} + alias Indexer.Block.Reward + alias Indexer.BufferedTask + + @moduletag :capture_log + + # MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to + # use expectations and stubs from test's pid. + setup :set_mox_global + + setup :verify_on_exit! + + setup do + start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + + # Need to always mock to allow consensus switches to happen on demand and protect from them happening when we don't + # want them to. + %{ + json_rpc_named_arguments: [ + transport: EthereumJSONRPC.Mox, + transport_options: [], + # Which one does not matter, so pick one + variant: EthereumJSONRPC.Parity + ] + } + end + + describe "init/3" do + test "without blocks" do + assert [] = Reward.Fetcher.init([], &[&1 | &2], nil) + end + + test "with consensus block without reward" do + %Block{number: block_number} = insert(:block) + + assert [^block_number] = Reward.Fetcher.init([], &[&1 | &2], nil) + end + + test "with consensus block with reward" do + block = insert(:block) + insert(:reward, address_hash: block.miner_hash, block_hash: block.hash) + + assert [] = Reward.Fetcher.init([], &[&1 | &2], nil) + end + + test "with non-consensus block" do + insert(:block, consensus: false) + + assert [] = Reward.Fetcher.init([], &[&1 | &2], nil) + end + end + + describe "async_fetch/1" do + setup %{json_rpc_named_arguments: json_rpc_named_arguments} do + Reward.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + block = insert(:block) + + %{block: block} + end + + test "with consensus block without reward", %{ + block: %Block{ + hash: block_hash, + number: block_number, + miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, + consensus: true + } + } do + block_quantity = integer_to_quantity(block_number) + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "trace_block", + params: [^block_quantity] + } + ], + _ -> + { + :ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: [ + %{ + "action" => %{ + "author" => to_string(miner_hash), + "rewardType" => "external", + "value" => "0x0" + }, + # ... but, switches to non-consensus by the time `trace_block` is called + "blockHash" => to_string(block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + ] + } + end) + + assert count(Chain.Block.Reward) == 0 + + parent = self() + + pid = + spawn_link(fn -> + receive do + {:"$gen_call", from, {:buffer, balance_fields}} -> + GenServer.reply(from, :ok) + send(parent, {:balance_fields, balance_fields}) + end + end) + + Process.register(pid, Indexer.CoinBalance.Fetcher) + + assert :ok = Reward.Fetcher.async_fetch([block_number]) + + wait_for_tasks(Reward.Fetcher) + + assert count(Chain.Block.Reward) == 1 + assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500 + end + + test "with consensus block with reward", %{ + block: %Block{ + hash: block_hash, + number: block_number, + miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, + consensus: true + } + } do + insert(:reward, block_hash: block_hash, address_hash: miner_hash) + + block_quantity = integer_to_quantity(block_number) + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "trace_block", + params: [^block_quantity] + } + ], + _ -> + { + :ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: [ + %{ + "action" => %{ + "author" => to_string(miner_hash), + "rewardType" => "external", + "value" => "0x0" + }, + # ... but, switches to non-consensus by the time `trace_block` is called + "blockHash" => to_string(block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + ] + } + end) + + parent = self() + + pid = + spawn_link(fn -> + receive do + {:"$gen_call", from, {:buffer, balance_fields}} -> + GenServer.reply(from, :ok) + send(parent, {:balance_fields, balance_fields}) + end + end) + + Process.register(pid, Indexer.CoinBalance.Fetcher) + + assert :ok = Reward.Fetcher.async_fetch([block_number]) + + wait_for_tasks(Reward.Fetcher) + + assert count(Chain.Block.Reward) == 1 + assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500 + end + + test "with consensus block does not import if fetch beneficiaries returns a different block hash for block number", + %{block: %Block{hash: block_hash, number: block_number, consensus: true, miner_hash: miner_hash}} do + block_quantity = integer_to_quantity(block_number) + new_block_hash = block_hash() + + refute block_hash == new_block_hash + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "trace_block", + params: [^block_quantity] + } + ], + _ -> + { + :ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: [ + %{ + "action" => %{ + "author" => to_string(miner_hash), + "rewardType" => "external", + "value" => "0x0" + }, + # ... but, switches to non-consensus by the time `trace_block` is called + "blockHash" => to_string(new_block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + ] + } + end) + + assert :ok = Reward.Fetcher.async_fetch([block_number]) + + wait_for_tasks(Reward.Fetcher) + + assert count(Chain.Block.Reward) == 0 + end + end + + describe "run/2" do + setup do + block = insert(:block) + + %{block: block} + end + + test "with consensus block without reward", %{ + block: %Block{ + hash: block_hash, + number: block_number, + miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, + consensus: true + }, + json_rpc_named_arguments: json_rpc_named_arguments + } do + block_quantity = integer_to_quantity(block_number) + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "trace_block", + params: [^block_quantity] + } + ], + _ -> + { + :ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: [ + %{ + "action" => %{ + "author" => to_string(miner_hash), + "rewardType" => "external", + "value" => "0x0" + }, + # ... but, switches to non-consensus by the time `trace_block` is called + "blockHash" => to_string(block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + ] + } + end) + + assert count(Chain.Block.Reward) == 0 + assert count(Chain.Address.CoinBalance) == 0 + + parent = self() + + pid = + spawn_link(fn -> + receive do + {:"$gen_call", from, {:buffer, balance_fields}} -> + GenServer.reply(from, :ok) + send(parent, {:balance_fields, balance_fields}) + end + end) + + Process.register(pid, Indexer.CoinBalance.Fetcher) + + assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments) + + assert count(Chain.Block.Reward) == 1 + assert count(Chain.Address.CoinBalance) == 1 + assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500 + end + + test "with consensus block without reward with new address adds rewards for all addresses", %{ + block: %Block{ + hash: block_hash, + number: block_number, + miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, + consensus: true + }, + json_rpc_named_arguments: json_rpc_named_arguments + } do + block_quantity = integer_to_quantity(block_number) + %Hash{bytes: new_address_hash_bytes} = new_address_hash = address_hash() + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "trace_block", + params: [^block_quantity] + } + ], + _ -> + { + :ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: [ + %{ + "action" => %{ + "author" => to_string(miner_hash), + "rewardType" => "external", + "value" => "0x1" + }, + # ... but, switches to non-consensus by the time `trace_block` is called + "blockHash" => to_string(block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + }, + %{ + "action" => %{ + "author" => to_string(new_address_hash), + "rewardType" => "external", + "value" => "0x2" + }, + "blockHash" => to_string(block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + ] + } + end) + + assert count(Chain.Block.Reward) == 0 + assert count(Chain.Address.CoinBalance) == 0 + + parent = self() + + pid = + spawn_link(fn -> + receive do + {:"$gen_call", from, {:buffer, balance_fields}} -> + GenServer.reply(from, :ok) + send(parent, {:balance_fields, balance_fields}) + end + end) + + Process.register(pid, Indexer.CoinBalance.Fetcher) + + assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments) + + assert count(Chain.Block.Reward) == 2 + assert count(Chain.Address.CoinBalance) == 2 + + assert_receive {:balance_fields, balance_fields}, 500 + assert {miner_hash_bytes, block_number} in balance_fields + assert {new_address_hash_bytes, block_number} in balance_fields + end + + test "with consensus block with reward", %{ + block: %Block{ + hash: block_hash, + number: block_number, + miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, + consensus: true + }, + json_rpc_named_arguments: json_rpc_named_arguments + } do + insert(:reward, block_hash: block_hash, address_hash: miner_hash, reward: 0) + insert(:unfetched_balance, address_hash: miner_hash, block_number: block_number) + + block_quantity = integer_to_quantity(block_number) + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "trace_block", + params: [^block_quantity] + } + ], + _ -> + { + :ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: [ + %{ + "action" => %{ + "author" => to_string(miner_hash), + "rewardType" => "external", + "value" => "0x1" + }, + # ... but, switches to non-consensus by the time `trace_block` is called + "blockHash" => to_string(block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + ] + } + end) + + assert count(Chain.Block.Reward) == 1 + assert count(Chain.Address.CoinBalance) == 1 + + value = Decimal.new(0) + + assert [%Chain.Block.Reward{reward: %Wei{value: ^value}}] = Repo.all(Chain.Block.Reward) + + parent = self() + + pid = + spawn_link(fn -> + receive do + {:"$gen_call", from, {:buffer, balance_fields}} -> + GenServer.reply(from, :ok) + send(parent, {:balance_fields, balance_fields}) + end + end) + + Process.register(pid, Indexer.CoinBalance.Fetcher) + + assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments) + + assert count(Chain.Block.Reward) == 1 + assert count(Chain.Address.CoinBalance) == 1 + + value = Decimal.new(1) + + assert [%Chain.Block.Reward{reward: %Wei{value: ^value}}] = Repo.all(Chain.Block.Reward) + assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500 + end + + test "with consensus block does not import if fetch beneficiaries returns a different block hash for block number", + %{ + block: %Block{hash: block_hash, number: block_number, consensus: true, miner_hash: miner_hash}, + json_rpc_named_arguments: json_rpc_named_arguments + } do + block_quantity = integer_to_quantity(block_number) + new_block_hash = block_hash() + + refute block_hash == new_block_hash + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [ + %{ + id: id, + jsonrpc: "2.0", + method: "trace_block", + params: [^block_quantity] + } + ], + _ -> + { + :ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: [ + %{ + "action" => %{ + "author" => to_string(miner_hash), + "rewardType" => "external", + "value" => "0x0" + }, + # ... but, switches to non-consensus by the time `trace_block` is called + "blockHash" => to_string(new_block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + ] + } + end) + + assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments) + + assert count(Chain.Block.Reward) == 0 + assert count(Chain.Address.CoinBalance) == 0 + end + + test "with mix of beneficiaries_params and errors, imports beneficiaries_params and retries errors", %{ + block: %Block{ + hash: block_hash, + number: block_number, + miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, + consensus: true + }, + json_rpc_named_arguments: json_rpc_named_arguments + } do + block_quantity = integer_to_quantity(block_number) + %Block{number: error_block_number} = insert(:block) + + error_block_quantity = integer_to_quantity(error_block_number) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [_, _] = requests, _ -> + { + :ok, + Enum.map(requests, fn + %{ + id: id, + jsonrpc: "2.0", + method: "trace_block", + params: [^block_quantity] + } -> + %{ + id: id, + jsonrpc: "2.0", + result: [ + %{ + "action" => %{ + "author" => to_string(miner_hash), + "rewardType" => "external", + "value" => "0x1" + }, + # ... but, switches to non-consensus by the time `trace_block` is called + "blockHash" => to_string(block_hash), + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + + %{id: id, jsonrpc: "2.0", method: "trace_block", params: [^error_block_quantity]} -> + %{id: id, jsonrpc: "2.0", result: nil} + end) + } + end) + + assert count(Chain.Block.Reward) == 0 + assert count(Chain.Address.CoinBalance) == 0 + + parent = self() + + pid = + spawn_link(fn -> + receive do + {:"$gen_call", from, {:buffer, balance_fields}} -> + GenServer.reply(from, :ok) + send(parent, {:balance_fields, balance_fields}) + end + end) + + Process.register(pid, Indexer.CoinBalance.Fetcher) + + assert {:retry, [^error_block_number]} = + Reward.Fetcher.run([block_number, error_block_number], json_rpc_named_arguments) + + assert count(Chain.Block.Reward) == 1 + assert count(Chain.Address.CoinBalance) == 1 + + assert_receive {:balance_fields, balance_fields}, 500 + assert {miner_hash_bytes, block_number} in balance_fields + end + end + + defp count(schema) do + Repo.one!(select(schema, fragment("COUNT(*)"))) + end + + defp wait_for_tasks(buffered_task) do + wait_until(:timer.seconds(10), fn -> + counts = BufferedTask.debug_count(buffered_task) + counts.buffer == 0 and counts.tasks == 0 + end) + end + + defp wait_until(timeout, producer) do + parent = self() + ref = make_ref() + + spawn(fn -> do_wait_until(parent, ref, producer) end) + + receive do + {^ref, :ok} -> :ok + after + timeout -> exit(:timeout) + end + end + + defp do_wait_until(parent, ref, producer) do + if producer.() do + send(parent, {ref, :ok}) + else + :timer.sleep(100) + do_wait_until(parent, ref, producer) + end + end +end diff --git a/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs b/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs deleted file mode 100644 index c72cb069dd..0000000000 --- a/apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs +++ /dev/null @@ -1,145 +0,0 @@ -defmodule Indexer.Block.UncatalogedRewards.ImporterTest do - use EthereumJSONRPC.Case, async: false - use Explorer.DataCase - - import Mox - - alias Explorer.Chain.Wei - alias Explorer.Chain.Block.Reward - alias Indexer.Block.UncatalogedRewards.Importer - - describe "fetch_and_import_rewards/1" do - test "return `{:ok, []}` when receiving an empty list", %{json_rpc_named_arguments: json_rpc_named_arguments} do - assert Importer.fetch_and_import_rewards([], json_rpc_named_arguments) == {:ok, []} - end - - @tag :no_geth - test "return `{:ok, [transactions executed]}`", %{json_rpc_named_arguments: json_rpc_named_arguments} do - address = insert(:address) - address_hash = address.hash - - block = insert(:block, number: 1234, miner: address) - block_hash = block.hash - - expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "trace_block", params: _params}], _options -> - {:ok, - [ - %{ - id: id, - result: [ - %{ - "action" => %{ - "author" => to_string(address_hash), - "rewardType" => "external", - "value" => "0xde0b6b3a7640000" - }, - "blockHash" => to_string(block_hash), - "blockNumber" => block.number, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - } - ] - } - ]} - end) - - assert {:ok, - [ - %Reward{ - address_hash: ^address_hash, - block_hash: ^block_hash, - address_type: :validator - } - ]} = Importer.fetch_and_import_rewards([block], json_rpc_named_arguments) - end - - @tag :no_geth - test "replaces reward on conflict", %{json_rpc_named_arguments: json_rpc_named_arguments} do - miner = insert(:address) - block = insert(:block, miner: miner) - block_hash = block.hash - address_type = :validator - insert(:reward, block_hash: block_hash, address_hash: miner.hash, address_type: address_type, reward: 1) - value = "0x2" - - expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "trace_block"}], _options -> - {:ok, - [ - %{ - id: id, - result: [ - %{ - "action" => %{ - "author" => to_string(miner), - "rewardType" => "external", - "value" => value - }, - "blockHash" => to_string(block_hash), - "blockNumber" => block.number, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - } - ] - } - ]} - end) - - {:ok, reward} = Wei.cast(value) - - assert {:ok, [%Reward{block_hash: ^block_hash, address_type: ^address_type, reward: ^reward}]} = - Importer.fetch_and_import_rewards([block], json_rpc_named_arguments) - end - - # regression test for https://github.com/poanetwork/blockscout/issues/1337 - test "with different block hash due to consensus switch between database query and trace_block", %{ - json_rpc_named_arguments: json_rpc_named_arguments - } do - miner = insert(:address) - block = insert(:block, miner: miner) - block_hash = block.hash - address_type = :validator - insert(:reward, block_hash: block_hash, address_hash: miner.hash, address_type: address_type, reward: 1) - value = "0x2" - - expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "trace_block"}], _options -> - trace_block_block_hash = block_hash() - - refute trace_block_block_hash == block_hash - - {:ok, - [ - %{ - id: id, - result: [ - %{ - "action" => %{ - "author" => to_string(miner), - "rewardType" => "external", - "value" => value - }, - "blockHash" => to_string(trace_block_block_hash), - "blockNumber" => block.number, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - } - ] - } - ]} - end) - - assert {:ok, []} = Importer.fetch_and_import_rewards([block], json_rpc_named_arguments) - end - end -end diff --git a/apps/indexer/test/support/indexer/block/reward/supervisor/case.ex b/apps/indexer/test/support/indexer/block/reward/supervisor/case.ex new file mode 100644 index 0000000000..531eee1708 --- /dev/null +++ b/apps/indexer/test/support/indexer/block/reward/supervisor/case.ex @@ -0,0 +1,17 @@ +defmodule Indexer.Block.Reward.Supervisor.Case do + alias Indexer.Block.Reward + + def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do + merged_fetcher_arguments = + Keyword.merge( + fetcher_arguments, + flush_interval: 50, + max_batch_size: 1, + max_concurrency: 1 + ) + + [merged_fetcher_arguments] + |> Reward.Supervisor.child_spec() + |> ExUnit.Callbacks.start_supervised!() + end +end