Merge branch 'master' into ab-catchup-code-fetcher

pull/1370/head
Ayrat Badykov 6 years ago committed by GitHub
commit e66f79b01e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  2. 11
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex
  3. 1
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex
  4. 5
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex
  5. 2
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity/fetched_beneficiaries_test.exs
  6. 13
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs
  7. 2
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs
  8. 83
      apps/explorer/lib/explorer/chain.ex
  9. 4
      apps/explorer/lib/explorer/chain/block.ex
  10. 21
      apps/explorer/lib/explorer/chain/import/runner/block/rewards.ex
  11. 14
      apps/explorer/lib/explorer/chain/wei.ex
  12. 4
      apps/explorer/test/explorer/chain/block_test.exs
  13. 73
      apps/explorer/test/explorer/chain/wei_test.exs
  14. 72
      apps/explorer/test/explorer/chain_test.exs
  15. 28
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  16. 126
      apps/indexer/lib/indexer/block/fetcher.ex
  17. 16
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  18. 246
      apps/indexer/lib/indexer/block/reward/fetcher.ex
  19. 38
      apps/indexer/lib/indexer/block/reward/supervisor.ex
  20. 16
      apps/indexer/lib/indexer/block/supervisor.ex
  21. 115
      apps/indexer/lib/indexer/block/uncataloged_rewards/importer.ex
  22. 40
      apps/indexer/lib/indexer/block/uncataloged_rewards/processor.ex
  23. 1
      apps/indexer/lib/indexer/coin_balance/fetcher.ex
  24. 324
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  25. 687
      apps/indexer/test/indexer/block/reward/fetcher_test.exs
  26. 104
      apps/indexer/test/indexer/block/uncataloged_rewards/importer_test.exs
  27. 17
      apps/indexer/test/support/indexer/block/reward/supervisor/case.ex

@ -29,6 +29,7 @@ defmodule EthereumJSONRPC do
Block, Block,
Blocks, Blocks,
FetchedBalances, FetchedBalances,
FetchedBeneficiaries,
FetchedCodes, FetchedCodes,
Receipts, Receipts,
RequestCoordinator, RequestCoordinator,
@ -229,8 +230,10 @@ defmodule EthereumJSONRPC do
@doc """ @doc """
Fetches block reward contract beneficiaries from variant API. Fetches block reward contract beneficiaries from variant API.
""" """
def fetch_beneficiaries(_first.._last = range, json_rpc_named_arguments) do @spec fetch_beneficiaries([block_number], json_rpc_named_arguments) ::
Keyword.fetch!(json_rpc_named_arguments, :variant).fetch_beneficiaries(range, json_rpc_named_arguments) {:ok, FetchedBeneficiaries.t()} | {:error, reason :: term} | :ignore
def fetch_beneficiaries(block_numbers, json_rpc_named_arguments) when is_list(block_numbers) do
Keyword.fetch!(json_rpc_named_arguments, :variant).fetch_beneficiaries(block_numbers, json_rpc_named_arguments)
end end
@doc """ @doc """

@ -11,10 +11,11 @@ defmodule EthereumJSONRPC.Parity do
@behaviour EthereumJSONRPC.Variant @behaviour EthereumJSONRPC.Variant
@impl EthereumJSONRPC.Variant @impl EthereumJSONRPC.Variant
def fetch_beneficiaries(_.._ = block_range, json_rpc_named_arguments) when is_list(json_rpc_named_arguments) do def fetch_beneficiaries(block_numbers, json_rpc_named_arguments)
when is_list(block_numbers) and is_list(json_rpc_named_arguments) do
id_to_params = id_to_params =
block_range block_numbers
|> block_range_to_params_list() |> block_numbers_to_params_list()
|> id_to_params() |> id_to_params()
with {:ok, responses} <- with {:ok, responses} <-
@ -63,8 +64,8 @@ defmodule EthereumJSONRPC.Parity do
end end
end end
defp block_range_to_params_list(_.._ = block_range) do defp block_numbers_to_params_list(block_numbers) when is_list(block_numbers) do
Enum.map(block_range, &%{block_quantity: integer_to_quantity(&1)}) Enum.map(block_numbers, &%{block_quantity: integer_to_quantity(&1)})
end end
defp trace_replay_transaction_responses_to_internal_transactions_params(responses, id_to_params) defp trace_replay_transaction_responses_to_internal_transactions_params(responses, id_to_params)

@ -30,7 +30,6 @@ defmodule EthereumJSONRPC.Parity.FetchedBeneficiaries do
...> }, ...> },
...> %{ ...> %{
...> "action" => %{"author" => "0x2", "rewardType" => "external", "value" => "0x0"}, ...> "action" => %{"author" => "0x2", "rewardType" => "external", "value" => "0x0"},
...> "blockHash" => "0x52a8d2185282506ce681364d2aa0c085ba45fdeb5d6c0ddec1131617a71ee2ca",
...> "blockHash" => "0xFFF", ...> "blockHash" => "0xFFF",
...> "blockNumber" => 12, ...> "blockNumber" => 12,
...> "result" => nil, ...> "result" => nil,

@ -14,8 +14,7 @@ defmodule EthereumJSONRPC.Variant do
@type internal_transaction_params :: map() @type internal_transaction_params :: map()
@doc """ @doc """
Fetch the block reward contract beneficiaries for a given block Fetch the block reward contract beneficiaries for a given blocks from the variant of the Ethereum JSONRPC API.
range, from the variant of the Ethereum JSONRPC API.
For more information on block reward contracts see: For more information on block reward contracts see:
https://wiki.parity.io/Block-Reward-Contract.html https://wiki.parity.io/Block-Reward-Contract.html
@ -26,7 +25,7 @@ defmodule EthereumJSONRPC.Variant do
* `{:error, reason}` - there was an error at the transport level * `{:error, reason}` - there was an error at the transport level
* `:ignore` - the variant does not support fetching beneficiaries * `:ignore` - the variant does not support fetching beneficiaries
""" """
@callback fetch_beneficiaries(Range.t(), EthereumJSONRPC.json_rpc_named_arguments()) :: @callback fetch_beneficiaries([EthereumJSONRPC.block_number()], EthereumJSONRPC.json_rpc_named_arguments()) ::
{:ok, FetchedBeneficiaries.t()} | {:error, reason :: term} | :ignore {:ok, FetchedBeneficiaries.t()} | {:error, reason :: term} | :ignore
@doc """ @doc """

@ -41,7 +41,7 @@ defmodule EthereumJSONRPC.Parity.FetchedBeneficiariesTest do
block_number = 1_000 block_number = 1_000
block_quantity = EthereumJSONRPC.integer_to_quantity(block_number) block_quantity = EthereumJSONRPC.integer_to_quantity(block_number)
hash1 = "0xef481b4e2c3ed62265617f2e9dfcdf3cf3efc11a" hash1 = "0xef481b4e2c3ed62265617f2e9dfcdf3cf3efc11a"
hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" hash2 = "0xef481b4e2c3ed62265617f2e9dfcdf3cf3efc11a"
reward = "0xde0b6b3a7640000" reward = "0xde0b6b3a7640000"
responses = [ responses = [

@ -291,7 +291,7 @@ defmodule EthereumJSONRPC.ParityTest do
end end
assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = assert {:ok, %FetchedBeneficiaries{params_set: params_set}} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) EthereumJSONRPC.Parity.fetch_beneficiaries([5_080_887], json_rpc_named_arguments)
assert Enum.count(params_set) == 2 assert Enum.count(params_set) == 2
@ -365,7 +365,7 @@ defmodule EthereumJSONRPC.ParityTest do
end end
assert {:ok, %FetchedBeneficiaries{params_set: params_set, errors: []}} = assert {:ok, %FetchedBeneficiaries{params_set: params_set, errors: []}} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_609_295..5_609_295, json_rpc_named_arguments) EthereumJSONRPC.Parity.fetch_beneficiaries([5_609_295], json_rpc_named_arguments)
assert Enum.count(params_set) == 2 assert Enum.count(params_set) == 2
@ -396,7 +396,7 @@ defmodule EthereumJSONRPC.ParityTest do
end) end)
assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = assert {:ok, %FetchedBeneficiaries{params_set: params_set}} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) EthereumJSONRPC.Parity.fetch_beneficiaries([5_080_887], json_rpc_named_arguments)
assert Enum.empty?(params_set) assert Enum.empty?(params_set)
end end
@ -473,7 +473,7 @@ defmodule EthereumJSONRPC.ParityTest do
end end
assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = assert {:ok, %FetchedBeneficiaries{params_set: params_set}} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_077_429..5_077_429, json_rpc_named_arguments) EthereumJSONRPC.Parity.fetch_beneficiaries([5_077_429], json_rpc_named_arguments)
assert Enum.count(params_set) == 2 assert Enum.count(params_set) == 2
@ -594,7 +594,7 @@ defmodule EthereumJSONRPC.ParityTest do
assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = assert {:ok, %FetchedBeneficiaries{params_set: params_set}} =
EthereumJSONRPC.Parity.fetch_beneficiaries( EthereumJSONRPC.Parity.fetch_beneficiaries(
block_number1..block_number2, [block_number1, block_number2],
json_rpc_named_arguments json_rpc_named_arguments
) )
@ -641,8 +641,7 @@ defmodule EthereumJSONRPC.ParityTest do
{:error, "oops"} {:error, "oops"}
end) end)
assert {:error, "oops"} = assert {:error, "oops"} = EthereumJSONRPC.Parity.fetch_beneficiaries([5_080_887], json_rpc_named_arguments)
EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments)
end end
end end
end end

@ -241,7 +241,7 @@ defmodule EthereumJSONRPCTest do
{:ok, []} {:ok, []}
end) end)
assert EthereumJSONRPC.fetch_beneficiaries(1..1, json_rpc_named_arguments) == assert EthereumJSONRPC.fetch_beneficiaries([1], json_rpc_named_arguments) ==
{:ok, %FetchedBeneficiaries{params_set: MapSet.new(), errors: []}} {:ok, %FetchedBeneficiaries{params_set: MapSet.new(), errors: []}}
end end
end end

@ -321,24 +321,6 @@ defmodule Explorer.Chain do
Repo.aggregate(Block, :count, :hash) Repo.aggregate(Block, :count, :hash)
end end
@doc !"""
Returns a default value if no value is found.
"""
defmacrop default_if_empty(value, default) do
quote do
fragment("coalesce(?, ?)", unquote(value), unquote(default))
end
end
@doc !"""
Sum of the products of two columns.
"""
defmacrop sum_of_products(col_a, col_b) do
quote do
sum(fragment("?*?", unquote(col_a), unquote(col_b)))
end
end
@doc """ @doc """
Reward for mining a block. Reward for mining a block.
@ -362,20 +344,33 @@ defmodule Explorer.Chain do
on: fragment("? <@ ?", block.number, emission_reward.block_range), on: fragment("? <@ ?", block.number, emission_reward.block_range),
where: block.number == ^block_number, where: block.number == ^block_number,
group_by: emission_reward.reward, group_by: emission_reward.reward,
select: %{ select: %Wei{
transaction_reward: %Wei{ value: coalesce(sum(transaction.gas_used * transaction.gas_price), 0) + emission_reward.reward
value: default_if_empty(sum_of_products(transaction.gas_used, transaction.gas_price), 0)
},
static_reward: emission_reward.reward
} }
) )
%{ Repo.one!(query)
transaction_reward: transaction_reward, end
static_reward: static_reward
} = Repo.one(query)
Wei.sum(transaction_reward, static_reward) @doc """
The `t:Explorer.Chain.Wei.t/0` paid to the miners of the `t:Explorer.Chain.Block.t/0`s with `hash`
`Explorer.Chain.Hash.Full.t/0` by the signers of the transactions in those blocks to cover the gas fee
(`gas_used * gas_price`).
"""
@spec gas_payment_by_block_hash([Hash.Full.t()]) :: %{Hash.Full.t() => Wei.t()}
def gas_payment_by_block_hash(block_hashes) when is_list(block_hashes) do
query =
from(
block in Block,
left_join: transaction in assoc(block, :transactions),
where: block.hash in ^block_hashes and block.consensus == true,
group_by: block.hash,
select: {block.hash, %Wei{value: coalesce(sum(transaction.gas_used * transaction.gas_price), 0)}}
)
query
|> Repo.all()
|> Enum.into(%{})
end end
@doc """ @doc """
@ -963,6 +958,29 @@ defmodule Explorer.Chain do
|> Repo.all() |> Repo.all()
end end
@doc """
Map `block_number`s to their `t:Explorer.Chain.Block.t/0` `hash` `t:Explorer.Chain.Hash.Full.t/0`.
Does not include non-consensus blocks.
iex> block = insert(:block, consensus: false)
iex> Explorer.Chain.block_hash_by_number([block.number])
%{}
"""
@spec block_hash_by_number([Block.block_number()]) :: %{Block.block_number() => Hash.Full.t()}
def block_hash_by_number(block_numbers) when is_list(block_numbers) do
query =
from(block in Block,
where: block.consensus == true and block.number in ^block_numbers,
select: {block.number, block.hash}
)
query
|> Repo.all()
|> Enum.into(%{})
end
@doc """ @doc """
Lists the top 250 `t:Explorer.Chain.Address.t/0`'s' in descending order based on coin balance. Lists the top 250 `t:Explorer.Chain.Address.t/0`'s' in descending order based on coin balance.
@ -982,12 +1000,11 @@ defmodule Explorer.Chain do
end end
@doc """ @doc """
Finds blocks without a reward associated, up to the specified limit Calls `reducer` on a stream of `t:Explorer.Chain.Block.t/0` without `t:Explorer.Chain.Block.Reward.t/0`.
""" """
def get_blocks_without_reward(limit \\ 250) do def stream_blocks_without_rewards(initial, reducer) when is_function(reducer, 2) do
Block.get_blocks_without_reward() Block.blocks_without_reward_query()
|> limit(^limit) |> Repo.stream_reduce(initial, reducer)
|> Repo.all()
end end
@doc """ @doc """

@ -101,9 +101,9 @@ defmodule Explorer.Chain.Block do
|> unique_constraint(:hash, name: :blocks_pkey) |> unique_constraint(:hash, name: :blocks_pkey)
end end
def get_blocks_without_reward(query \\ __MODULE__) do def blocks_without_reward_query do
from( from(
b in query, b in __MODULE__,
left_join: r in Reward, left_join: r in Reward,
on: [block_hash: b.hash], on: [block_hash: b.hash],
where: is_nil(r.block_hash) and b.consensus == true where: is_nil(r.block_hash) and b.consensus == true

@ -3,6 +3,8 @@ defmodule Explorer.Chain.Import.Runner.Block.Rewards do
Bulk imports `t:Explorer.Chain.Block.Reward.t/0`. Bulk imports `t:Explorer.Chain.Block.Reward.t/0`.
""" """
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi, Repo} alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Block.Reward alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import alias Explorer.Chain.Import
@ -42,20 +44,33 @@ defmodule Explorer.Chain.Import.Runner.Block.Rewards do
def timeout, do: @timeout def timeout, do: @timeout
@spec insert(Repo.t(), [map()], %{ @spec insert(Repo.t(), [map()], %{
optional(:on_conflict) => Import.Runner.on_conflict(),
required(:timeout) => timeout, required(:timeout) => timeout,
required(:timestamps) => Import.timestamps() required(:timestamps) => Import.timestamps()
}) :: {:ok, [Reward.t()]} | {:error, [Changeset.t()]} }) :: {:ok, [Reward.t()]} | {:error, [Changeset.t()]}
defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps}) defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options)
when is_list(changes_list) do when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.address_type, &1.block_hash})
Import.insert_changes_list( Import.insert_changes_list(
repo, repo,
changes_list, ordered_changes_list,
conflict_target: [:address_hash, :address_type, :block_hash], conflict_target: [:address_hash, :address_type, :block_hash],
on_conflict: :nothing, on_conflict: on_conflict,
for: ecto_schema_module(), for: ecto_schema_module(),
returning: true, returning: true,
timeout: timeout, timeout: timeout,
timestamps: timestamps timestamps: timestamps
) )
end end
defp default_on_conflict do
from(reward in Reward,
update: [set: [reward: fragment("EXCLUDED.reward")]],
where: fragment("EXCLUDED.reward IS DISTINCT FROM ?", reward.reward)
)
end
end end

@ -147,18 +147,18 @@ defmodule Explorer.Chain.Wei do
end end
@doc """ @doc """
Multiplies two Wei values. Multiplies Wei values by an `t:integer/0`.
## Example ## Example
iex> first = %Explorer.Chain.Wei{value: Decimal.new(10)} iex> wei = %Explorer.Chain.Wei{value: Decimal.new(10)}
iex> second = %Explorer.Chain.Wei{value: Decimal.new(5)} iex> multiplier = 5
iex> Explorer.Chain.Wei.mult(first, second) iex> Explorer.Chain.Wei.mult(wei, multiplier)
%Explorer.Chain.Wei{value: Decimal.new(50)} %Explorer.Chain.Wei{value: Decimal.new(50)}
""" """
def mult(%Wei{value: wei_1}, %Wei{value: wei_2}) do def mult(%Wei{value: value}, multiplier) when is_integer(multiplier) do
wei_1 value
|> Decimal.mult(wei_2) |> Decimal.mult(multiplier)
|> from(:wei) |> from(:wei)
end end

@ -43,14 +43,14 @@ defmodule Explorer.Chain.BlockTest do
end end
end end
describe "get_blocks_without_reward/1" do describe "blocks_without_reward_query/1" do
test "finds only blocks without rewards" do test "finds only blocks without rewards" do
rewarded_block = insert(:block) rewarded_block = insert(:block)
insert(:reward, address_hash: insert(:address).hash, block_hash: rewarded_block.hash) insert(:reward, address_hash: insert(:address).hash, block_hash: rewarded_block.hash)
unrewarded_block = insert(:block) unrewarded_block = insert(:block)
results = results =
Block.get_blocks_without_reward() Block.blocks_without_reward_query()
|> Repo.all() |> Repo.all()
|> Enum.map(& &1.hash) |> Enum.map(& &1.hash)

@ -1,8 +1,9 @@
defmodule Explorer.Chain.WeiTest do defmodule Explorer.Chain.WeiTest do
use ExUnit.Case, async: true use ExUnit.Case, async: true
alias Explorer.Chain.Wei alias Explorer.Chain.Wei
doctest Explorer.Chain.Wei doctest Wei
describe "cast/1" do describe "cast/1" do
test "with hex string" do test "with hex string" do
@ -57,77 +58,77 @@ defmodule Explorer.Chain.WeiTest do
describe "sum/1" do describe "sum/1" do
test "with two positive values return the sum of them" do test "with two positive values return the sum of them" do
first = %Explorer.Chain.Wei{value: Decimal.new(123)} first = %Wei{value: Decimal.new(123)}
second = %Explorer.Chain.Wei{value: Decimal.new(1_000)} second = %Wei{value: Decimal.new(1_000)}
assert Explorer.Chain.Wei.sum(first, second) == %Explorer.Chain.Wei{value: Decimal.new(1_123)} assert Wei.sum(first, second) == %Wei{value: Decimal.new(1_123)}
end end
test "with a positive and a negative value return the positive minus the negative's absolute" do test "with a positive and a negative value return the positive minus the negative's absolute" do
first = %Explorer.Chain.Wei{value: Decimal.new(123)} first = %Wei{value: Decimal.new(123)}
second = %Explorer.Chain.Wei{value: Decimal.new(-100)} second = %Wei{value: Decimal.new(-100)}
assert Explorer.Chain.Wei.sum(first, second) == %Explorer.Chain.Wei{value: Decimal.new(23)} assert Wei.sum(first, second) == %Wei{value: Decimal.new(23)}
end end
end end
describe "sub/1" do describe "sub/1" do
test "with a negative second parameter return the sum of the absolute values" do test "with a negative second parameter return the sum of the absolute values" do
first = %Explorer.Chain.Wei{value: Decimal.new(123)} first = %Wei{value: Decimal.new(123)}
second = %Explorer.Chain.Wei{value: Decimal.new(-100)} second = %Wei{value: Decimal.new(-100)}
assert Explorer.Chain.Wei.sub(first, second) == %Explorer.Chain.Wei{value: Decimal.new(223)} assert Wei.sub(first, second) == %Wei{value: Decimal.new(223)}
end end
test "with a negative first parameter return the negative of the sum of the absolute values" do test "with a negative first parameter return the negative of the sum of the absolute values" do
first = %Explorer.Chain.Wei{value: Decimal.new(-123)} first = %Wei{value: Decimal.new(-123)}
second = %Explorer.Chain.Wei{value: Decimal.new(100)} second = %Wei{value: Decimal.new(100)}
assert Explorer.Chain.Wei.sub(first, second) == %Explorer.Chain.Wei{value: Decimal.new(-223)} assert Wei.sub(first, second) == %Wei{value: Decimal.new(-223)}
end end
test "with a larger first parameter return a positive number" do test "with a larger first parameter return a positive number" do
first = %Explorer.Chain.Wei{value: Decimal.new(123)} first = %Wei{value: Decimal.new(123)}
second = %Explorer.Chain.Wei{value: Decimal.new(100)} second = %Wei{value: Decimal.new(100)}
assert Explorer.Chain.Wei.sub(first, second) == %Explorer.Chain.Wei{value: Decimal.new(23)} assert Wei.sub(first, second) == %Wei{value: Decimal.new(23)}
end end
test "with a larger second parameter return a negative number" do test "with a larger second parameter return a negative number" do
first = %Explorer.Chain.Wei{value: Decimal.new(23)} first = %Wei{value: Decimal.new(23)}
second = %Explorer.Chain.Wei{value: Decimal.new(100)} second = %Wei{value: Decimal.new(100)}
assert Explorer.Chain.Wei.sub(first, second) == %Explorer.Chain.Wei{value: Decimal.new(-77)} assert Wei.sub(first, second) == %Wei{value: Decimal.new(-77)}
end end
end end
describe "mult/1" do describe "mult/2" do
test "with one negative parameter return a negative value" do test "with positive Wei and positive multiplier returns positive Wei" do
first = %Explorer.Chain.Wei{value: Decimal.new(123)} wei = %Wei{value: Decimal.new(123)}
second = %Explorer.Chain.Wei{value: Decimal.new(-1)} multiplier = 100
assert Explorer.Chain.Wei.mult(first, second) == %Explorer.Chain.Wei{value: Decimal.new(-123)} assert Wei.mult(wei, multiplier) == %Wei{value: Decimal.new(12300)}
end end
test "with two negative parameter return positive number" do test "with positive Wei and negative multiplier returns positive Wei" do
first = %Explorer.Chain.Wei{value: Decimal.new(-123)} wei = %Wei{value: Decimal.new(123)}
second = %Explorer.Chain.Wei{value: Decimal.new(-100)} multiplier = -1
assert Explorer.Chain.Wei.mult(first, second) == %Explorer.Chain.Wei{value: Decimal.new(12300)} assert Wei.mult(wei, multiplier) == %Wei{value: Decimal.new(-123)}
end end
test "with two positive parameters return a positive number" do test "with negative Wei and positive multiplier returns negative Wei" do
first = %Explorer.Chain.Wei{value: Decimal.new(123)} wei = %Wei{value: Decimal.new(-123)}
second = %Explorer.Chain.Wei{value: Decimal.new(100)} multiplier = 100
assert Explorer.Chain.Wei.mult(first, second) == %Explorer.Chain.Wei{value: Decimal.new(12300)} assert Wei.mult(wei, multiplier) == %Wei{value: Decimal.new(-12300)}
end end
test "the order of the paramete matters not" do test "with negative Wei and negative multiplier returns positive Wei" do
first = %Explorer.Chain.Wei{value: Decimal.new(123)} wei = %Wei{value: Decimal.new(-123)}
second = %Explorer.Chain.Wei{value: Decimal.new(-10)} multiplier = -100
assert Explorer.Chain.Wei.mult(first, second) == Explorer.Chain.Wei.mult(second, first) assert Wei.mult(wei, multiplier) == %Wei{value: Decimal.new(12300)}
end end
end end
end end

@ -1274,6 +1274,24 @@ defmodule Explorer.ChainTest do
end end
end end
describe "block_hash_by_number/1" do
test "without blocks returns empty map" do
assert Chain.block_hash_by_number([]) == %{}
end
test "with consensus block returns mapping" do
block = insert(:block)
assert Chain.block_hash_by_number([block.number]) == %{block.number => block.hash}
end
test "with non-consensus block does not return mapping" do
block = insert(:block, consensus: false)
assert Chain.block_hash_by_number([block.number]) == %{}
end
end
describe "list_top_addresses/0" do describe "list_top_addresses/0" do
test "without addresses with balance > 0" do test "without addresses with balance > 0" do
insert(:address, fetched_coin_balance: 0) insert(:address, fetched_coin_balance: 0)
@ -1315,25 +1333,25 @@ defmodule Explorer.ChainTest do
end end
end end
describe "get_blocks_without_reward/1" do describe "stream_blocks_without_rewards/2" do
test "includes consensus blocks" do test "includes consensus blocks" do
%Block{hash: consensus_hash} = insert(:block, consensus: true) %Block{hash: consensus_hash} = insert(:block, consensus: true)
assert [%Block{hash: ^consensus_hash}] = Chain.get_blocks_without_reward() assert {:ok, [%Block{hash: ^consensus_hash}]} = Chain.stream_blocks_without_rewards([], &[&1 | &2])
end end
test "does not include consensus block that has a reward" do test "does not include consensus block that has a reward" do
%Block{hash: consensus_hash, miner_hash: miner_hash} = insert(:block, consensus: true) %Block{hash: consensus_hash, miner_hash: miner_hash} = insert(:block, consensus: true)
insert(:reward, address_hash: miner_hash, block_hash: consensus_hash) insert(:reward, address_hash: miner_hash, block_hash: consensus_hash)
assert [] = Chain.get_blocks_without_reward() assert {:ok, []} = Chain.stream_blocks_without_rewards([], &[&1 | &2])
end end
# https://github.com/poanetwork/blockscout/issues/1310 regression test # https://github.com/poanetwork/blockscout/issues/1310 regression test
test "does not include non-consensus blocks" do test "does not include non-consensus blocks" do
insert(:block, consensus: false) insert(:block, consensus: false)
assert [] = Chain.get_blocks_without_reward() assert {:ok, []} = Chain.stream_blocks_without_rewards([], &[&1 | &2])
end end
end end
@ -2279,6 +2297,52 @@ defmodule Explorer.ChainTest do
end end
end end
describe "gas_payment_by_block_hash/1" do
setup do
number = 1
%{consensus_block: insert(:block, number: number, consensus: true), number: number}
end
test "without consensus block hash has no key", %{consensus_block: consensus_block, number: number} do
non_consensus_block = insert(:block, number: number, consensus: false)
:transaction
|> insert(gas_price: 1)
|> with_block(consensus_block, gas_used: 1)
:transaction
|> insert(gas_price: 1)
|> with_block(consensus_block, gas_used: 2)
assert Chain.gas_payment_by_block_hash([non_consensus_block.hash]) == %{}
end
test "with consensus block hash without transactions has key with 0 value", %{
consensus_block: %Block{hash: consensus_block_hash}
} do
assert Chain.gas_payment_by_block_hash([consensus_block_hash]) == %{
consensus_block_hash => %Wei{value: Decimal.new(0)}
}
end
test "with consensus block hash with transactions has key with value", %{
consensus_block: %Block{hash: consensus_block_hash} = consensus_block
} do
:transaction
|> insert(gas_price: 1)
|> with_block(consensus_block, gas_used: 2)
:transaction
|> insert(gas_price: 3)
|> with_block(consensus_block, gas_used: 4)
assert Chain.gas_payment_by_block_hash([consensus_block_hash]) == %{
consensus_block_hash => %Wei{value: Decimal.new(14)}
}
end
end
describe "missing_block_number_ranges/1" do describe "missing_block_number_ranges/1" do
# 0000 # 0000
test "0..0 without blocks" do test "0..0 without blocks" do

@ -9,6 +9,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
import Indexer.Block.Fetcher, import Indexer.Block.Fetcher,
only: [ only: [
async_import_block_rewards: 1,
async_import_coin_balances: 2, async_import_coin_balances: 2,
async_import_tokens: 1, async_import_tokens: 1,
async_import_uncles: 1, async_import_uncles: 1,
@ -119,22 +120,27 @@ defmodule Indexer.Block.Catchup.Fetcher do
@impl Block.Fetcher @impl Block.Fetcher
def import(_, options) when is_map(options) do def import(_, options) when is_map(options) do
{async_import_remaining_block_data_options, chain_import_options} = {async_import_remaining_block_data_options, options_with_block_rewards_errors} =
Map.split(options, @async_import_remaining_block_data_options) Map.split(options, @async_import_remaining_block_data_options)
full_chain_import_options = put_in(chain_import_options, [:blocks, :params, Access.all(), :consensus], true) {block_reward_errors, options_without_block_rewards_errors} =
pop_in(options_with_block_rewards_errors[:block_rewards][:errors])
full_chain_import_options =
put_in(options_without_block_rewards_errors, [:blocks, :params, Access.all(), :consensus], true)
with {:import, {:ok, imported} = ok} <- {:import, Chain.import(full_chain_import_options)} do with {:import, {:ok, imported} = ok} <- {:import, Chain.import(full_chain_import_options)} do
async_import_remaining_block_data( async_import_remaining_block_data(
imported, imported,
async_import_remaining_block_data_options Map.put(async_import_remaining_block_data_options, :block_rewards, %{errors: block_reward_errors})
) )
ok ok
end end
end end
defp async_import_remaining_block_data(imported, options) do defp async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}} = options) do
async_import_block_rewards(block_reward_errors)
async_import_coin_balances(imported, options) async_import_coin_balances(imported, options)
async_import_created_contract_codes(imported) async_import_created_contract_codes(imported)
async_import_internal_transactions(imported) async_import_internal_transactions(imported)
@ -293,19 +299,19 @@ defmodule Indexer.Block.Catchup.Fetcher do
end end
end end
defp retry(sequence, errors) when is_list(errors) do defp retry(sequence, block_errors) when is_list(block_errors) do
errors block_errors
|> errors_to_ranges() |> block_errors_to_block_number_ranges()
|> Enum.map(&push_back(sequence, &1)) |> Enum.map(&push_back(sequence, &1))
end end
defp errors_to_ranges(errors) when is_list(errors) do defp block_errors_to_block_number_ranges(block_errors) when is_list(block_errors) do
errors block_errors
|> Enum.flat_map(&error_to_numbers/1) |> Enum.map(&block_error_to_number/1)
|> numbers_to_ranges() |> numbers_to_ranges()
end end
defp error_to_numbers(%{data: %{number: number}}) when is_integer(number), do: [number] defp block_error_to_number(%{data: %{number: number}}) when is_integer(number), do: number
defp numbers_to_ranges([]), do: [] defp numbers_to_ranges([]), do: []

@ -7,6 +7,8 @@ defmodule Indexer.Block.Fetcher do
require Logger require Logger
import EthereumJSONRPC, only: [quantity_to_integer: 1]
alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries} alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries}
alias Explorer.Chain.{Address, Block, Hash, Import, Transaction} alias Explorer.Chain.{Address, Block, Hash, Import, Transaction}
alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, ReplacedTransaction, Token, TokenTransfers, Tracer} alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, ReplacedTransaction, Token, TokenTransfers, Tracer}
@ -106,9 +108,8 @@ defmodule Indexer.Block.Fetcher do
transactions_with_receipts = Receipts.put(transactions_params_without_receipts, receipts), transactions_with_receipts = Receipts.put(transactions_params_without_receipts, receipts),
%{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.parse(logs), %{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.parse(logs),
%{mint_transfers: mint_transfers} = MintTransfer.parse(logs), %{mint_transfers: mint_transfers} = MintTransfer.parse(logs),
{:beneficiaries, %FetchedBeneficiaries{params_set: beneficiary_params_set, errors: beneficiaries_errors} =
{:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set, errors: beneficiaries_errors}}} <- fetch_beneficiaries(blocks, json_rpc_named_arguments),
fetch_beneficiaries(range, json_rpc_named_arguments),
addresses = addresses =
AddressExtraction.extract_addresses(%{ AddressExtraction.extract_addresses(%{
block_reward_contract_beneficiaries: MapSet.to_list(beneficiary_params_set), block_reward_contract_beneficiaries: MapSet.to_list(beneficiary_params_set),
@ -126,7 +127,7 @@ defmodule Indexer.Block.Fetcher do
transactions_params: transactions_with_receipts transactions_params: transactions_with_receipts
} }
|> CoinBalances.params_set(), |> CoinBalances.params_set(),
block_rewards <- fetch_block_rewards(beneficiary_params_set, transactions_with_receipts), beneficiaries_with_gas_payment <- add_gas_payments(beneficiary_params_set, transactions_with_receipts),
address_token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}), address_token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}),
{:ok, inserted} <- {:ok, inserted} <-
__MODULE__.import( __MODULE__.import(
@ -137,14 +138,14 @@ defmodule Indexer.Block.Fetcher do
address_token_balances: %{params: address_token_balances}, address_token_balances: %{params: address_token_balances},
blocks: %{params: blocks}, blocks: %{params: blocks},
block_second_degree_relations: %{params: block_second_degree_relations_params}, block_second_degree_relations: %{params: block_second_degree_relations_params},
block_rewards: %{params: block_rewards}, block_rewards: %{errors: beneficiaries_errors, params: beneficiaries_with_gas_payment},
logs: %{params: logs}, logs: %{params: logs},
token_transfers: %{params: token_transfers}, token_transfers: %{params: token_transfers},
tokens: %{on_conflict: :nothing, params: tokens}, tokens: %{on_conflict: :nothing, params: tokens},
transactions: %{params: transactions_with_receipts} transactions: %{params: transactions_with_receipts}
} }
) do ) do
{:ok, %{inserted: inserted, errors: blocks_errors ++ beneficiaries_errors}} {:ok, %{inserted: inserted, errors: blocks_errors}}
else else
{step, {:error, reason}} -> {:error, {step, reason}} {step, {:error, reason}} -> {:error, {step, reason}}
{:import, {:error, step, failed_value, changes_so_far}} -> {:error, {step, failed_value, changes_so_far}} {:import, {:error, step, failed_value, changes_so_far}} -> {:error, {step, failed_value, changes_so_far}}
@ -171,6 +172,14 @@ defmodule Indexer.Block.Fetcher do
callback_module.import(state, options_with_broadcast) callback_module.import(state, options_with_broadcast)
end end
def async_import_block_rewards([]), do: :ok
def async_import_block_rewards(errors) when is_list(errors) do
errors
|> block_reward_errors_to_block_numbers()
|> Indexer.Block.Reward.Fetcher.async_fetch()
end
def async_import_coin_balances(%{addresses: addresses}, %{ def async_import_coin_balances(%{addresses: addresses}, %{
address_hash_to_fetched_balance_block_number: address_hash_to_block_number address_hash_to_fetched_balance_block_number: address_hash_to_block_number
}) do }) do
@ -214,25 +223,94 @@ defmodule Indexer.Block.Fetcher do
def async_import_replaced_transactions(_), do: :ok def async_import_replaced_transactions(_), do: :ok
defp fetch_beneficiaries(range, json_rpc_named_arguments) do defp block_reward_errors_to_block_numbers(block_reward_errors) when is_list(block_reward_errors) do
result = Enum.map(block_reward_errors, &block_reward_error_to_block_number/1)
with :ignore <- EthereumJSONRPC.fetch_beneficiaries(range, json_rpc_named_arguments) do end
{:ok, %FetchedBeneficiaries{params_set: MapSet.new()}}
end defp block_reward_error_to_block_number(%{data: %{block_number: block_number}}) when is_integer(block_number) do
block_number
end
defp block_reward_error_to_block_number(%{data: %{block_quantity: block_quantity}}) when is_binary(block_quantity) do
quantity_to_integer(block_quantity)
end
defp fetch_beneficiaries(blocks, json_rpc_named_arguments) do
hash_string_by_number =
Enum.into(blocks, %{}, fn %{number: number, hash: hash_string}
when is_integer(number) and is_binary(hash_string) ->
{number, hash_string}
end)
hash_string_by_number
|> Map.keys()
|> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments)
|> case do
{:ok, %FetchedBeneficiaries{params_set: params_set} = fetched_beneficiaries} ->
consensus_params_set = consensus_params_set(params_set, hash_string_by_number)
%FetchedBeneficiaries{fetched_beneficiaries | params_set: consensus_params_set}
{:beneficiaries, result} {:error, reason} ->
Logger.error(fn -> ["Could not fetch beneficiaries: ", inspect(reason)] end)
error =
case reason do
%{code: code, message: message} -> %{code: code, message: message}
_ -> %{code: -1, message: inspect(reason)}
end
errors =
Enum.map(hash_string_by_number, fn {number, _} when is_integer(number) ->
Map.put(error, :data, %{block_number: number})
end)
%FetchedBeneficiaries{errors: errors}
:ignore ->
%FetchedBeneficiaries{}
end
end end
defp fetch_block_rewards(beneficiaries, transactions) do defp consensus_params_set(params_set, hash_string_by_number) do
params_set
|> Enum.filter(fn %{block_number: block_number, block_hash: block_hash_string}
when is_integer(block_number) and is_binary(block_hash_string) ->
case Map.fetch!(hash_string_by_number, block_number) do
^block_hash_string ->
true
other_block_hash_string ->
Logger.debug(fn ->
[
"fetch beneficiaries reported block number (",
to_string(block_number),
") maps to different (",
other_block_hash_string,
") block hash than the one from getBlock (",
block_hash_string,
"). A reorg has occurred."
]
end)
false
end
end)
|> Enum.into(MapSet.new())
end
defp add_gas_payments(beneficiaries, transactions) do
transactions_by_block_number = Enum.group_by(transactions, & &1.block_number)
Enum.map(beneficiaries, fn beneficiary -> Enum.map(beneficiaries, fn beneficiary ->
case beneficiary.address_type do case beneficiary.address_type do
:validator -> :validator ->
validation_reward = fetch_validation_reward(beneficiary, transactions) gas_payment = gas_payment(beneficiary, transactions_by_block_number)
"0x" <> reward_hex = beneficiary.reward "0x" <> minted_hex = beneficiary.reward
{reward, _} = Integer.parse(reward_hex, 16) {minted, _} = Integer.parse(minted_hex, 16)
%{beneficiary | reward: reward + validation_reward} %{beneficiary | reward: minted + gas_payment}
_ -> _ ->
beneficiary beneficiary
@ -240,10 +318,18 @@ defmodule Indexer.Block.Fetcher do
end) end)
end end
defp fetch_validation_reward(beneficiary, transactions) do defp gas_payment(transactions) when is_list(transactions) do
transactions transactions
|> Stream.filter(fn t -> t.block_number == beneficiary.block_number end) |> Stream.map(&(&1.gas_used * &1.gas_price))
|> Enum.reduce(0, fn t, acc -> acc + t.gas_used * t.gas_price end) |> Enum.sum()
end
defp gas_payment(%{block_number: block_number}, transactions_by_block_number)
when is_map(transactions_by_block_number) do
case Map.fetch(transactions_by_block_number, block_number) do
{:ok, transactions} -> gas_payment(transactions)
:error -> 0
end
end end
# `fetched_balance_block_number` is needed for the `CoinBalanceFetcher`, but should not be used for `import` because the # `fetched_balance_block_number` is needed for the `CoinBalanceFetcher`, but should not be used for `import` because the

@ -12,13 +12,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1] import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1]
import Indexer.Block.Fetcher, import Indexer.Block.Fetcher,
only: [ only: [async_import_block_rewards: 1, async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2, async_import_replaced_transactions: 1]
async_import_tokens: 1,
async_import_uncles: 1,
fetch_and_import_range: 2,
async_import_replaced_transactions: 1
]
alias ABI.TypeDecoder alias ABI.TypeDecoder
alias Ecto.Changeset alias Ecto.Changeset
alias EthereumJSONRPC.{FetchedBalances, Subscription} alias EthereumJSONRPC.{FetchedBalances, Subscription}
@ -164,6 +158,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
address_hash_to_fetched_balance_block_number: address_hash_to_block_number, address_hash_to_fetched_balance_block_number: address_hash_to_block_number,
address_token_balances: %{params: address_token_balances_params}, address_token_balances: %{params: address_token_balances_params},
addresses: %{params: addresses_params}, addresses: %{params: addresses_params},
block_rewards: block_rewards,
transactions: %{params: transactions_params}, transactions: %{params: transactions_params},
token_transfers: %{params: token_transfers_params} token_transfers: %{params: token_transfers_params}
} = options } = options
@ -190,17 +185,19 @@ defmodule Indexer.Block.Realtime.Fetcher do
{:address_token_balances, {:ok, address_token_balances}} <- {:address_token_balances, {:ok, address_token_balances}} <-
{:address_token_balances, fetch_token_balances(address_token_balances_params)}, {:address_token_balances, fetch_token_balances(address_token_balances_params)},
address_current_token_balances = TokenBalances.to_address_current_token_balances(address_token_balances), address_current_token_balances = TokenBalances.to_address_current_token_balances(address_token_balances),
{block_reward_errors, chain_import_block_rewards} = Map.pop(block_rewards, :errors),
chain_import_options = chain_import_options =
options options
|> Map.drop(@import_options) |> Map.drop(@import_options)
|> put_in([:addresses, :params], balances_addresses_params) |> put_in([:addresses, :params], balances_addresses_params)
|> put_in([:blocks, :params, Access.all(), :consensus], true) |> put_in([:blocks, :params, Access.all(), :consensus], true)
|> put_in([:block_rewards], chain_import_block_rewards)
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params) |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params)
|> put_in([Access.key(:address_current_token_balances, %{}), :params], address_current_token_balances) |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_current_token_balances)
|> put_in([Access.key(:address_token_balances), :params], address_token_balances) |> put_in([Access.key(:address_token_balances), :params], address_token_balances)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params), |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params),
{:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do
async_import_remaining_block_data(imported) async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}})
ok ok
end end
end end
@ -344,7 +341,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
Enum.any?(changesets, &(Map.get(&1, :message) == "Unknown block number")) Enum.any?(changesets, &(Map.get(&1, :message) == "Unknown block number"))
end end
defp async_import_remaining_block_data(imported) do defp async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}}) do
async_import_block_rewards(block_reward_errors)
async_import_tokens(imported) async_import_tokens(imported)
async_import_uncles(imported) async_import_uncles(imported)
async_import_replaced_transactions(imported) async_import_replaced_transactions(imported)

@ -0,0 +1,246 @@
defmodule Indexer.Block.Reward.Fetcher do
@moduledoc """
Fetches `t:Explorer.Chain.Block.Reward.t/0` for a given `t:Explorer.Chain.Block.block_number/0`.
To protect from reorgs where the returned rewards are for same `number`, but a different `hash`, the `hash` is
retrieved from the database and compared against that returned from `EthereumJSONRPC.`
"""
use Spandex.Decorators
require Logger
import EthereumJSONRPC, only: [quantity_to_integer: 1]
alias Ecto.Changeset
alias EthereumJSONRPC.FetchedBeneficiaries
alias Explorer.Chain
alias Explorer.Chain.{Block, Wei}
alias Indexer.Address.CoinBalances
alias Indexer.{AddressExtraction, BufferedTask, CoinBalance, Tracer}
@behaviour BufferedTask
@defaults [
flush_interval: :timer.seconds(3),
max_batch_size: 10,
max_concurrency: 4,
task_supervisor: Indexer.Block.Reward.TaskSupervisor,
metadata: [fetcher: :block_reward]
]
@doc """
Asynchronously fetches block rewards for each `t:Explorer.Chain.Explorer.block_number/0`` in `block_numbers`.
"""
@spec async_fetch([Block.block_number()]) :: :ok
def async_fetch(block_numbers) when is_list(block_numbers) do
BufferedTask.buffer(__MODULE__, block_numbers)
end
@doc false
# credo:disable-for-next-line Credo.Check.Design.DuplicatedCode
def child_spec([init_options, gen_server_options]) do
{state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments)
unless state do
raise ArgumentError,
":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <>
"to allow for json_rpc calls when running."
end
merged_init_options =
@defaults
|> Keyword.merge(mergeable_init_options)
|> Keyword.put(:state, state)
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__)
end
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_blocks_without_rewards(initial, fn %{number: number}, acc ->
reducer.(number, acc)
end)
final
end
@impl BufferedTask
@decorate trace(name: "fetch", resource: "Indexer.Block.Reward.Fetcher.run/2", service: :indexer, tracer: Tracer)
def run(entries, json_rpc_named_arguments) do
hash_string_by_number =
entries
|> Enum.uniq()
|> hash_string_by_number()
consensus_numbers = Map.keys(hash_string_by_number)
consensus_number_count = Enum.count(consensus_numbers)
Logger.metadata(count: consensus_number_count)
Logger.debug(fn -> "fetching" end)
consensus_numbers
|> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments)
|> case do
{:ok, fetched_beneficiaries} ->
run_fetched_beneficiaries(fetched_beneficiaries, hash_string_by_number)
{:error, reason} ->
Logger.error(
fn ->
["failed to fetch: ", inspect(reason)]
end,
error_count: consensus_number_count
)
{:retry, consensus_numbers}
end
end
defp hash_string_by_number(numbers) when is_list(numbers) do
numbers
|> Chain.block_hash_by_number()
|> Enum.into(%{}, fn {number, hash} ->
{number, to_string(hash)}
end)
end
defp run_fetched_beneficiaries(%FetchedBeneficiaries{params_set: params_set, errors: errors}, hash_string_by_number) do
params_set
|> filter_consensus_params(hash_string_by_number)
|> case do
[] ->
retry_errors(errors)
beneficiaries_params ->
beneficiaries_params
|> add_gas_payments()
|> import_block_reward_params()
|> case do
{:ok, %{address_coin_balances: address_coin_balances}} ->
CoinBalance.Fetcher.async_fetch_balances(address_coin_balances)
retry_errors(errors)
{:error, [%Changeset{} | _] = changesets} ->
Logger.error(fn -> ["Failed to validate: ", inspect(changesets)] end,
error_count: Enum.count(hash_string_by_number)
)
retry_beneficiaries_params(beneficiaries_params)
{:error, step, failed_value, _changes_so_far} ->
Logger.error(fn -> ["Failed to import", inspect(failed_value)] end,
step: step,
error_count: Enum.count(hash_string_by_number)
)
retry_beneficiaries_params(beneficiaries_params)
end
end
end
defp filter_consensus_params(params_set, hash_string_by_number) do
Enum.filter(params_set, fn %{block_number: block_number, block_hash: block_hash} ->
case Map.fetch!(hash_string_by_number, block_number) do
^block_hash ->
true
other_block_hash ->
Logger.debug(fn ->
[
"fetch beneficiaries reported block number (",
to_string(block_number),
") maps to different (",
other_block_hash,
") block hash than the one in the database (",
block_hash,
"). A reorg has occurred."
]
end)
false
end
end)
end
defp add_gas_payments(beneficiaries_params) do
gas_payment_by_block_hash =
beneficiaries_params
|> Stream.filter(&(&1.address_type == :validator))
|> Enum.map(& &1.block_hash)
|> Chain.gas_payment_by_block_hash()
Enum.map(beneficiaries_params, fn %{block_hash: block_hash} = beneficiary ->
case gas_payment_by_block_hash do
%{^block_hash => gas_payment} ->
{:ok, minted} = Wei.cast(beneficiary.reward)
%{beneficiary | reward: Wei.sum(minted, gas_payment)}
_ ->
beneficiary
end
end)
end
defp import_block_reward_params(block_rewards_params) when is_list(block_rewards_params) do
addresses_params = AddressExtraction.extract_addresses(%{block_reward_contract_beneficiaries: block_rewards_params})
address_coin_balances_params_set = CoinBalances.params_set(%{beneficiary_params: block_rewards_params})
Chain.import(%{
addresses: %{params: addresses_params},
address_coin_balances: %{params: address_coin_balances_params_set},
block_rewards: %{params: block_rewards_params}
})
end
defp retry_beneficiaries_params(beneficiaries_params) when is_list(beneficiaries_params) do
entries = Enum.map(beneficiaries_params, & &1.block_number)
{:retry, entries}
end
defp retry_errors([]), do: :ok
defp retry_errors(errors) when is_list(errors) do
retried_entries = fetched_beneficiaries_errors_to_entries(errors)
Logger.error(
fn ->
[
"failed to fetch: ",
fetched_beneficiaries_errors_to_iodata(errors)
]
end,
error_count: Enum.count(retried_entries)
)
{:retry, retried_entries}
end
defp fetched_beneficiaries_errors_to_entries(errors) when is_list(errors) do
Enum.map(errors, &fetched_beneficiary_error_to_entry/1)
end
defp fetched_beneficiary_error_to_entry(%{data: %{block_quantity: block_quantity}}) when is_binary(block_quantity) do
quantity_to_integer(block_quantity)
end
defp fetched_beneficiaries_errors_to_iodata(errors) when is_list(errors) do
fetched_beneficiaries_errors_to_iodata(errors, [])
end
defp fetched_beneficiaries_errors_to_iodata([], iodata), do: iodata
defp fetched_beneficiaries_errors_to_iodata([error | errors], iodata) do
fetched_beneficiaries_errors_to_iodata(errors, [iodata | fetched_beneficiary_error_to_iodata(error)])
end
defp fetched_beneficiary_error_to_iodata(%{code: code, message: message, data: %{block_quantity: block_quantity}})
when is_integer(code) and is_binary(message) and is_binary(block_quantity) do
["@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message, ?\n]
end
end

@ -0,0 +1,38 @@
defmodule Indexer.Block.Reward.Supervisor do
@moduledoc """
Supervises `Indexer.Block.Reward.Fetcher` and its batch tasks through `Indexer.Block.Reward.TaskSupervisor`
"""
use Supervisor
alias Indexer.Block.Reward.Fetcher
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init(fetcher_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.Block.Reward.TaskSupervisor},
{Fetcher, [fetcher_arguments, [name: Fetcher]]}
],
strategy: :one_for_one
)
end
end

@ -4,7 +4,7 @@ defmodule Indexer.Block.Supervisor do
""" """
alias Indexer.Block alias Indexer.Block
alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, UncatalogedRewards, Uncle} alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, Reward, Uncle}
use Supervisor use Supervisor
@ -13,7 +13,13 @@ defmodule Indexer.Block.Supervisor do
end end
@impl Supervisor @impl Supervisor
def init(%{block_interval: block_interval, subscribe_named_arguments: subscribe_named_arguments} = named_arguments) do def init(
%{
block_interval: block_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
subscribe_named_arguments: subscribe_named_arguments
} = named_arguments
) do
block_fetcher = block_fetcher =
named_arguments named_arguments
|> Map.drop(~w(block_interval memory_monitor subscribe_named_arguments)a) |> Map.drop(~w(block_interval memory_monitor subscribe_named_arguments)a)
@ -35,7 +41,11 @@ defmodule Indexer.Block.Supervisor do
[name: Realtime.Supervisor] [name: Realtime.Supervisor]
]}, ]},
{Uncle.Supervisor, [[block_fetcher: block_fetcher, memory_monitor: memory_monitor], [name: Uncle.Supervisor]]}, {Uncle.Supervisor, [[block_fetcher: block_fetcher, memory_monitor: memory_monitor], [name: Uncle.Supervisor]]},
UncatalogedRewards.Processor {Reward.Supervisor,
[
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor],
[name: Reward.Supervisor]
]}
], ],
strategy: :one_for_one strategy: :one_for_one
) )

@ -1,115 +0,0 @@
defmodule Indexer.Block.UncatalogedRewards.Importer do
@moduledoc """
a module to fetch and import the rewards for blocks that were indexed without the reward
"""
alias Ecto.Multi
alias EthereumJSONRPC.FetchedBeneficiaries
alias Explorer.Chain
alias Explorer.Chain.{Block.Reward, Wei}
# max number of blocks in a single request
# higher numbers may cause the requests to time out
# lower numbers will generate more requests
@chunk_size 10
@doc """
receives a list of blocks and tries to fetch and insert rewards for them
"""
def fetch_and_import_rewards(blocks_batch) do
result =
blocks_batch
|> break_into_chunks_of_block_numbers()
|> Enum.reduce([], fn chunk, acc ->
chunk
|> fetch_beneficiaries()
|> fetch_block_rewards()
|> insert_reward_group()
|> case do
:empty -> acc
insert -> [insert | acc]
end
end)
{:ok, result}
rescue
e in RuntimeError -> {:error, %{exception: e}}
end
defp fetch_beneficiaries(chunk) do
{chunk_start, chunk_end} = Enum.min_max(chunk)
{:ok, %FetchedBeneficiaries{params_set: result}} =
with :ignore <- EthereumJSONRPC.fetch_beneficiaries(chunk_start..chunk_end, json_rpc_named_arguments()) do
{:ok, %FetchedBeneficiaries{params_set: MapSet.new()}}
end
result
end
defp fetch_block_rewards(beneficiaries) do
Enum.map(beneficiaries, fn beneficiary ->
beneficiary_changes =
case beneficiary.address_type do
:validator ->
validation_reward = fetch_validation_reward(beneficiary)
{:ok, reward} = Wei.cast(beneficiary.reward)
%{beneficiary | reward: Wei.sum(reward, validation_reward)}
_ ->
beneficiary
end
Reward.changeset(%Reward{}, beneficiary_changes)
end)
end
defp fetch_validation_reward(beneficiary) do
{:ok, accumulator} = Wei.cast(0)
beneficiary.block_number
|> Chain.get_transactions_of_block_number()
|> Enum.reduce(accumulator, fn t, acc ->
{:ok, price_as_wei} = Wei.cast(t.gas_used)
price_as_wei |> Wei.mult(t.gas_price) |> Wei.sum(acc)
end)
end
defp break_into_chunks_of_block_numbers(blocks) do
Enum.chunk_while(
blocks,
[],
fn block, acc ->
if (acc == [] || hd(acc) + 1 == block.number) && length(acc) < @chunk_size do
{:cont, [block.number | acc]}
else
{:cont, acc, [block.number]}
end
end,
fn
[] -> {:cont, []}
acc -> {:cont, acc, []}
end
)
end
defp insert_reward_group([]), do: :empty
defp insert_reward_group(rewards) do
rewards
|> Enum.reduce({Multi.new(), 0}, fn changeset, {multi, index} ->
{Multi.insert(multi, "insert_#{index}", changeset,
conflict_target: ~w(address_hash address_type block_hash),
on_conflict: {:replace, [:reward]}
), index + 1}
end)
|> elem(0)
|> Explorer.Repo.transaction()
end
defp json_rpc_named_arguments do
Application.get_env(:explorer, :json_rpc_named_arguments)
end
end

@ -1,40 +0,0 @@
defmodule Indexer.Block.UncatalogedRewards.Processor do
@moduledoc """
genserver to find blocks without rewards and fetch their rewards in batches
"""
use GenServer
alias Explorer.Chain
alias Indexer.Block.UncatalogedRewards.Importer
@max_batch_size 150
@default_cooldown 300
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(args) do
send(self(), :import_batch)
{:ok, args}
end
@impl true
def handle_info(:import_batch, state) do
@max_batch_size
|> Chain.get_blocks_without_reward()
|> import_or_try_later
{:noreply, state}
end
defp import_or_try_later(batch) do
import_results = Importer.fetch_and_import_rewards(batch)
wait_time = if import_results == {:ok, []}, do: :timer.hours(24), else: @default_cooldown
Process.send_after(self(), :import_batch, wait_time)
end
end

@ -38,6 +38,7 @@ defmodule Indexer.CoinBalance.Fetcher do
end end
@doc false @doc false
# credo:disable-for-next-line Credo.Check.Design.DuplicatedCode
def child_spec([init_options, gen_server_options]) do def child_spec([init_options, gen_server_options]) do
{state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments)

@ -2,8 +2,11 @@ defmodule Indexer.Block.Catchup.FetcherTest do
use EthereumJSONRPC.Case, async: false use EthereumJSONRPC.Case, async: false
use Explorer.DataCase use Explorer.DataCase
import EthereumJSONRPC, only: [integer_to_quantity: 1]
import Mox import Mox
alias Explorer.Chain
alias Explorer.Chain.Block.Reward
alias Indexer.{Block, CoinBalance, InternalTransaction, Token, TokenBalance} alias Indexer.{Block, CoinBalance, InternalTransaction, Token, TokenBalance}
alias Indexer.Block.Catchup.Fetcher alias Indexer.Block.Catchup.Fetcher
@ -87,6 +90,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
} }
] ]
}, },
block_rewards: %{errors: [], params: []},
block_second_degree_relations: %{ block_second_degree_relations: %{
params: [ params: [
%{ %{
@ -111,4 +115,324 @@ defmodule Indexer.Block.Catchup.FetcherTest do
assert_receive {:uncles, [^uncle_hash]} assert_receive {:uncles, [^uncle_hash]}
end end
end end
describe "task/1" do
test "ignores fetched beneficiaries with different hash for same number", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
latest_block_number = 1
latest_block_quantity = integer_to_quantity(latest_block_number)
block_number = latest_block_number - 1
block_hash = block_hash()
block_quantity = integer_to_quantity(block_number)
miner_hash = address_hash()
miner_hash_data = to_string(miner_hash)
new_block_hash = block_hash()
refute block_hash == new_block_hash
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => latest_block_quantity}}
end)
|> expect(:json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: %{
"hash" => to_string(block_hash),
"number" => block_quantity,
"difficulty" => "0x0",
"gasLimit" => "0x0",
"gasUsed" => "0x0",
"extraData" => "0x0",
"logsBloom" => "0x0",
"miner" => miner_hash_data,
"parentHash" =>
block_hash()
|> to_string(),
"receiptsRoot" => "0x0",
"size" => "0x0",
"sha3Uncles" => "0x0",
"stateRoot" => "0x0",
"timestamp" => "0x0",
"totalDifficulty" => "0x0",
"transactions" => [],
"transactionsRoot" => "0x0",
"uncles" => []
}
}
]}
end)
|> expect(:json_rpc, fn [%{id: id, jsonrpc: "2.0", method: "trace_block", params: [^block_quantity]}], _options ->
{
:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: [
%{
"action" => %{
"author" => miner_hash_data,
"rewardType" => "external",
"value" => "0x0"
},
"blockHash" => to_string(new_block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]
}
end)
assert count(Chain.Block) == 0
assert %{first_block_number: ^block_number, missing_block_count: 1, shrunk: false} =
Fetcher.task(%Fetcher{
blocks_batch_size: 1,
block_fetcher: %Block.Fetcher{
callback_module: Fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
}
})
assert count(Chain.Block) == 1
assert count(Reward) == 0
end
test "async fetches beneficiaries when individual responses error out", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
latest_block_number = 1
latest_block_quantity = integer_to_quantity(latest_block_number)
block_number = latest_block_number - 1
block_hash = block_hash()
block_quantity = integer_to_quantity(block_number)
miner_hash = address_hash()
miner_hash_data = to_string(miner_hash)
new_block_hash = block_hash()
refute block_hash == new_block_hash
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => latest_block_quantity}}
end)
|> expect(:json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: %{
"hash" => to_string(block_hash),
"number" => block_quantity,
"difficulty" => "0x0",
"gasLimit" => "0x0",
"gasUsed" => "0x0",
"extraData" => "0x0",
"logsBloom" => "0x0",
"miner" => miner_hash_data,
"parentHash" =>
block_hash()
|> to_string(),
"receiptsRoot" => "0x0",
"size" => "0x0",
"sha3Uncles" => "0x0",
"stateRoot" => "0x0",
"timestamp" => "0x0",
"totalDifficulty" => "0x0",
"transactions" => [],
"transactionsRoot" => "0x0",
"uncles" => []
}
}
]}
end)
|> expect(:json_rpc, fn [%{id: id, method: "trace_block", params: [^block_quantity]}], _options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: nil
}
]}
end)
assert count(Chain.Block) == 0
parent = self()
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, block_numbers}} ->
GenServer.reply(from, :ok)
send(parent, {:block_numbers, block_numbers})
end
end)
Process.register(pid, Indexer.Block.Reward.Fetcher)
assert %{first_block_number: ^block_number, missing_block_count: 1, shrunk: false} =
Fetcher.task(%Fetcher{
blocks_batch_size: 1,
block_fetcher: %Block.Fetcher{
callback_module: Fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
}
})
assert count(Chain.Block) == 1
assert count(Reward) == 0
assert_receive {:block_numbers, [block_number]}, 5_000
end
test "async fetches beneficiaries when entire call errors out", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
latest_block_number = 1
latest_block_quantity = integer_to_quantity(latest_block_number)
block_number = latest_block_number - 1
block_hash = block_hash()
block_quantity = integer_to_quantity(block_number)
miner_hash = address_hash()
miner_hash_data = to_string(miner_hash)
new_block_hash = block_hash()
refute block_hash == new_block_hash
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => latest_block_quantity}}
end)
|> expect(:json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: %{
"hash" => to_string(block_hash),
"number" => block_quantity,
"difficulty" => "0x0",
"gasLimit" => "0x0",
"gasUsed" => "0x0",
"extraData" => "0x0",
"logsBloom" => "0x0",
"miner" => miner_hash_data,
"parentHash" =>
block_hash()
|> to_string(),
"receiptsRoot" => "0x0",
"size" => "0x0",
"sha3Uncles" => "0x0",
"stateRoot" => "0x0",
"timestamp" => "0x0",
"totalDifficulty" => "0x0",
"transactions" => [],
"transactionsRoot" => "0x0",
"uncles" => []
}
}
]}
end)
|> expect(:json_rpc, fn [%{method: "trace_block", params: [^block_quantity]}], _options ->
{:error, :boom}
end)
assert count(Chain.Block) == 0
parent = self()
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, block_numbers}} ->
GenServer.reply(from, :ok)
send(parent, {:block_numbers, block_numbers})
end
end)
Process.register(pid, Indexer.Block.Reward.Fetcher)
assert %{first_block_number: ^block_number, missing_block_count: 1, shrunk: false} =
Fetcher.task(%Fetcher{
blocks_batch_size: 1,
block_fetcher: %Block.Fetcher{
callback_module: Fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
}
})
assert count(Chain.Block) == 1
assert count(Reward) == 0
assert_receive {:block_numbers, [block_number]}, 5_000
end
end
defp count(schema) do
Repo.one!(select(schema, fragment("COUNT(*)")))
end
end end

@ -0,0 +1,687 @@
defmodule Indexer.Block.Reward.FetcherTest do
# MUST be `async: false` so that {:shared, pid} is set for connection to allow CoinBalanceFetcher's self-send to have
# connection allowed immediately.
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
import EthereumJSONRPC, only: [integer_to_quantity: 1]
import Mox
alias Explorer.Chain
alias Explorer.Chain.{Block, Hash, Wei}
alias Indexer.Block.Reward
alias Indexer.BufferedTask
@moduletag :capture_log
# MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to
# use expectations and stubs from test's pid.
setup :set_mox_global
setup :verify_on_exit!
setup do
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
# Need to always mock to allow consensus switches to happen on demand and protect from them happening when we don't
# want them to.
%{
json_rpc_named_arguments: [
transport: EthereumJSONRPC.Mox,
transport_options: [],
# Which one does not matter, so pick one
variant: EthereumJSONRPC.Parity
]
}
end
describe "init/3" do
test "without blocks" do
assert [] = Reward.Fetcher.init([], &[&1 | &2], nil)
end
test "with consensus block without reward" do
%Block{number: block_number} = insert(:block)
assert [^block_number] = Reward.Fetcher.init([], &[&1 | &2], nil)
end
test "with consensus block with reward" do
block = insert(:block)
insert(:reward, address_hash: block.miner_hash, block_hash: block.hash)
assert [] = Reward.Fetcher.init([], &[&1 | &2], nil)
end
test "with non-consensus block" do
insert(:block, consensus: false)
assert [] = Reward.Fetcher.init([], &[&1 | &2], nil)
end
end
describe "async_fetch/1" do
setup %{json_rpc_named_arguments: json_rpc_named_arguments} do
Reward.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
block = insert(:block)
%{block: block}
end
test "with consensus block without reward", %{
block: %Block{
hash: block_hash,
number: block_number,
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash,
consensus: true
}
} do
block_quantity = integer_to_quantity(block_number)
expect(EthereumJSONRPC.Mox, :json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "trace_block",
params: [^block_quantity]
}
],
_ ->
{
:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: [
%{
"action" => %{
"author" => to_string(miner_hash),
"rewardType" => "external",
"value" => "0x0"
},
# ... but, switches to non-consensus by the time `trace_block` is called
"blockHash" => to_string(block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]
}
end)
assert count(Chain.Block.Reward) == 0
parent = self()
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
end)
Process.register(pid, Indexer.CoinBalance.Fetcher)
assert :ok = Reward.Fetcher.async_fetch([block_number])
wait_for_tasks(Reward.Fetcher)
assert count(Chain.Block.Reward) == 1
assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500
end
test "with consensus block with reward", %{
block: %Block{
hash: block_hash,
number: block_number,
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash,
consensus: true
}
} do
insert(:reward, block_hash: block_hash, address_hash: miner_hash)
block_quantity = integer_to_quantity(block_number)
expect(EthereumJSONRPC.Mox, :json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "trace_block",
params: [^block_quantity]
}
],
_ ->
{
:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: [
%{
"action" => %{
"author" => to_string(miner_hash),
"rewardType" => "external",
"value" => "0x0"
},
# ... but, switches to non-consensus by the time `trace_block` is called
"blockHash" => to_string(block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]
}
end)
parent = self()
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
end)
Process.register(pid, Indexer.CoinBalance.Fetcher)
assert :ok = Reward.Fetcher.async_fetch([block_number])
wait_for_tasks(Reward.Fetcher)
assert count(Chain.Block.Reward) == 1
assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500
end
test "with consensus block does not import if fetch beneficiaries returns a different block hash for block number",
%{block: %Block{hash: block_hash, number: block_number, consensus: true, miner_hash: miner_hash}} do
block_quantity = integer_to_quantity(block_number)
new_block_hash = block_hash()
refute block_hash == new_block_hash
expect(EthereumJSONRPC.Mox, :json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "trace_block",
params: [^block_quantity]
}
],
_ ->
{
:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: [
%{
"action" => %{
"author" => to_string(miner_hash),
"rewardType" => "external",
"value" => "0x0"
},
# ... but, switches to non-consensus by the time `trace_block` is called
"blockHash" => to_string(new_block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]
}
end)
assert :ok = Reward.Fetcher.async_fetch([block_number])
wait_for_tasks(Reward.Fetcher)
assert count(Chain.Block.Reward) == 0
end
end
describe "run/2" do
setup do
block = insert(:block)
%{block: block}
end
test "with consensus block without reward", %{
block: %Block{
hash: block_hash,
number: block_number,
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash,
consensus: true
},
json_rpc_named_arguments: json_rpc_named_arguments
} do
block_quantity = integer_to_quantity(block_number)
expect(EthereumJSONRPC.Mox, :json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "trace_block",
params: [^block_quantity]
}
],
_ ->
{
:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: [
%{
"action" => %{
"author" => to_string(miner_hash),
"rewardType" => "external",
"value" => "0x0"
},
# ... but, switches to non-consensus by the time `trace_block` is called
"blockHash" => to_string(block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]
}
end)
assert count(Chain.Block.Reward) == 0
assert count(Chain.Address.CoinBalance) == 0
parent = self()
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
end)
Process.register(pid, Indexer.CoinBalance.Fetcher)
assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments)
assert count(Chain.Block.Reward) == 1
assert count(Chain.Address.CoinBalance) == 1
assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500
end
test "with consensus block without reward with new address adds rewards for all addresses", %{
block: %Block{
hash: block_hash,
number: block_number,
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash,
consensus: true
},
json_rpc_named_arguments: json_rpc_named_arguments
} do
block_quantity = integer_to_quantity(block_number)
%Hash{bytes: new_address_hash_bytes} = new_address_hash = address_hash()
expect(EthereumJSONRPC.Mox, :json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "trace_block",
params: [^block_quantity]
}
],
_ ->
{
:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: [
%{
"action" => %{
"author" => to_string(miner_hash),
"rewardType" => "external",
"value" => "0x1"
},
# ... but, switches to non-consensus by the time `trace_block` is called
"blockHash" => to_string(block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
},
%{
"action" => %{
"author" => to_string(new_address_hash),
"rewardType" => "external",
"value" => "0x2"
},
"blockHash" => to_string(block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]
}
end)
assert count(Chain.Block.Reward) == 0
assert count(Chain.Address.CoinBalance) == 0
parent = self()
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
end)
Process.register(pid, Indexer.CoinBalance.Fetcher)
assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments)
assert count(Chain.Block.Reward) == 2
assert count(Chain.Address.CoinBalance) == 2
assert_receive {:balance_fields, balance_fields}, 500
assert {miner_hash_bytes, block_number} in balance_fields
assert {new_address_hash_bytes, block_number} in balance_fields
end
test "with consensus block with reward", %{
block: %Block{
hash: block_hash,
number: block_number,
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash,
consensus: true
},
json_rpc_named_arguments: json_rpc_named_arguments
} do
insert(:reward, block_hash: block_hash, address_hash: miner_hash, reward: 0)
insert(:unfetched_balance, address_hash: miner_hash, block_number: block_number)
block_quantity = integer_to_quantity(block_number)
expect(EthereumJSONRPC.Mox, :json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "trace_block",
params: [^block_quantity]
}
],
_ ->
{
:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: [
%{
"action" => %{
"author" => to_string(miner_hash),
"rewardType" => "external",
"value" => "0x1"
},
# ... but, switches to non-consensus by the time `trace_block` is called
"blockHash" => to_string(block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]
}
end)
assert count(Chain.Block.Reward) == 1
assert count(Chain.Address.CoinBalance) == 1
value = Decimal.new(0)
assert [%Chain.Block.Reward{reward: %Wei{value: ^value}}] = Repo.all(Chain.Block.Reward)
parent = self()
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
end)
Process.register(pid, Indexer.CoinBalance.Fetcher)
assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments)
assert count(Chain.Block.Reward) == 1
assert count(Chain.Address.CoinBalance) == 1
value = Decimal.new(1)
assert [%Chain.Block.Reward{reward: %Wei{value: ^value}}] = Repo.all(Chain.Block.Reward)
assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500
end
test "with consensus block does not import if fetch beneficiaries returns a different block hash for block number",
%{
block: %Block{hash: block_hash, number: block_number, consensus: true, miner_hash: miner_hash},
json_rpc_named_arguments: json_rpc_named_arguments
} do
block_quantity = integer_to_quantity(block_number)
new_block_hash = block_hash()
refute block_hash == new_block_hash
expect(EthereumJSONRPC.Mox, :json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "trace_block",
params: [^block_quantity]
}
],
_ ->
{
:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: [
%{
"action" => %{
"author" => to_string(miner_hash),
"rewardType" => "external",
"value" => "0x0"
},
# ... but, switches to non-consensus by the time `trace_block` is called
"blockHash" => to_string(new_block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]
}
end)
assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments)
assert count(Chain.Block.Reward) == 0
assert count(Chain.Address.CoinBalance) == 0
end
test "with mix of beneficiaries_params and errors, imports beneficiaries_params and retries errors", %{
block: %Block{
hash: block_hash,
number: block_number,
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash,
consensus: true
},
json_rpc_named_arguments: json_rpc_named_arguments
} do
block_quantity = integer_to_quantity(block_number)
%Block{number: error_block_number} = insert(:block)
error_block_quantity = integer_to_quantity(error_block_number)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [_, _] = requests, _ ->
{
:ok,
Enum.map(requests, fn
%{
id: id,
jsonrpc: "2.0",
method: "trace_block",
params: [^block_quantity]
} ->
%{
id: id,
jsonrpc: "2.0",
result: [
%{
"action" => %{
"author" => to_string(miner_hash),
"rewardType" => "external",
"value" => "0x1"
},
# ... but, switches to non-consensus by the time `trace_block` is called
"blockHash" => to_string(block_hash),
"blockNumber" => block_number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
%{id: id, jsonrpc: "2.0", method: "trace_block", params: [^error_block_quantity]} ->
%{id: id, jsonrpc: "2.0", result: nil}
end)
}
end)
assert count(Chain.Block.Reward) == 0
assert count(Chain.Address.CoinBalance) == 0
parent = self()
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, balance_fields}} ->
GenServer.reply(from, :ok)
send(parent, {:balance_fields, balance_fields})
end
end)
Process.register(pid, Indexer.CoinBalance.Fetcher)
assert {:retry, [^error_block_number]} =
Reward.Fetcher.run([block_number, error_block_number], json_rpc_named_arguments)
assert count(Chain.Block.Reward) == 1
assert count(Chain.Address.CoinBalance) == 1
assert_receive {:balance_fields, balance_fields}, 500
assert {miner_hash_bytes, block_number} in balance_fields
end
end
defp count(schema) do
Repo.one!(select(schema, fragment("COUNT(*)")))
end
defp wait_for_tasks(buffered_task) do
wait_until(:timer.seconds(10), fn ->
counts = BufferedTask.debug_count(buffered_task)
counts.buffer == 0 and counts.tasks == 0
end)
end
defp wait_until(timeout, producer) do
parent = self()
ref = make_ref()
spawn(fn -> do_wait_until(parent, ref, producer) end)
receive do
{^ref, :ok} -> :ok
after
timeout -> exit(:timeout)
end
end
defp do_wait_until(parent, ref, producer) do
if producer.() do
send(parent, {ref, :ok})
else
:timer.sleep(100)
do_wait_until(parent, ref, producer)
end
end
end

@ -1,104 +0,0 @@
defmodule Indexer.Block.UncatalogedRewards.ImporterTest do
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
import Mox
alias Explorer.Chain.Wei
alias Explorer.Chain.Block.Reward
alias Indexer.Block.UncatalogedRewards.Importer
describe "fetch_and_import_rewards/1" do
test "return `{:ok, []}` when receiving an empty list" do
assert Importer.fetch_and_import_rewards([]) == {:ok, []}
end
@tag :no_geth
test "return `{:ok, [transactions executed]}`" do
address = insert(:address)
address_hash = address.hash
block = insert(:block, number: 1234, miner: address)
block_hash = block.hash
expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "trace_block", params: _params}], _options ->
{:ok,
[
%{
id: id,
result: [
%{
"action" => %{
"author" => to_string(address_hash),
"rewardType" => "external",
"value" => "0xde0b6b3a7640000"
},
"blockHash" => to_string(block_hash),
"blockNumber" => block.number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]}
end)
assert {:ok,
[
ok: %{
"insert_0" => %Reward{
address_hash: ^address_hash,
block_hash: ^block_hash,
address_type: :validator
}
}
]} = Importer.fetch_and_import_rewards([block])
end
@tag :no_geth
test "replaces reward on conflict" do
miner = insert(:address)
block = insert(:block, miner: miner)
block_hash = block.hash
address_type = :validator
insert(:reward, block_hash: block_hash, address_hash: miner.hash, address_type: address_type, reward: 1)
value = "0x2"
expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "trace_block"}], _options ->
{:ok,
[
%{
id: id,
result: [
%{
"action" => %{
"author" => to_string(miner),
"rewardType" => "external",
"value" => value
},
"blockHash" => to_string(block_hash),
"blockNumber" => block.number,
"result" => nil,
"subtraces" => 0,
"traceAddress" => [],
"transactionHash" => nil,
"transactionPosition" => nil,
"type" => "reward"
}
]
}
]}
end)
{:ok, reward} = Wei.cast(value)
assert {:ok,
[ok: %{"insert_0" => %Reward{block_hash: ^block_hash, address_type: ^address_type, reward: ^reward}}]} =
Importer.fetch_and_import_rewards([block])
end
end
end

@ -0,0 +1,17 @@
defmodule Indexer.Block.Reward.Supervisor.Case do
alias Indexer.Block.Reward
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
flush_interval: 50,
max_batch_size: 1,
max_concurrency: 1
)
[merged_fetcher_arguments]
|> Reward.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
end
end
Loading…
Cancel
Save