Merge pull request #1376 from poanetwork/1337
Prevent block rewards for different hash being recorded for same block number during reorgpull/1384/head
commit
6f152dbc64
@ -0,0 +1,246 @@ |
||||
defmodule Indexer.Block.Reward.Fetcher do |
||||
@moduledoc """ |
||||
Fetches `t:Explorer.Chain.Block.Reward.t/0` for a given `t:Explorer.Chain.Block.block_number/0`. |
||||
|
||||
To protect from reorgs where the returned rewards are for same `number`, but a different `hash`, the `hash` is |
||||
retrieved from the database and compared against that returned from `EthereumJSONRPC.` |
||||
""" |
||||
|
||||
use Spandex.Decorators |
||||
|
||||
require Logger |
||||
|
||||
import EthereumJSONRPC, only: [quantity_to_integer: 1] |
||||
|
||||
alias Ecto.Changeset |
||||
alias EthereumJSONRPC.FetchedBeneficiaries |
||||
alias Explorer.Chain |
||||
alias Explorer.Chain.{Block, Wei} |
||||
alias Indexer.Address.CoinBalances |
||||
alias Indexer.{AddressExtraction, BufferedTask, CoinBalance, Tracer} |
||||
|
||||
@behaviour BufferedTask |
||||
|
||||
@defaults [ |
||||
flush_interval: :timer.seconds(3), |
||||
max_batch_size: 10, |
||||
max_concurrency: 4, |
||||
task_supervisor: Indexer.Block.Reward.TaskSupervisor, |
||||
metadata: [fetcher: :block_reward] |
||||
] |
||||
|
||||
@doc """ |
||||
Asynchronously fetches block rewards for each `t:Explorer.Chain.Explorer.block_number/0`` in `block_numbers`. |
||||
""" |
||||
@spec async_fetch([Block.block_number()]) :: :ok |
||||
def async_fetch(block_numbers) when is_list(block_numbers) do |
||||
BufferedTask.buffer(__MODULE__, block_numbers) |
||||
end |
||||
|
||||
@doc false |
||||
# credo:disable-for-next-line Credo.Check.Design.DuplicatedCode |
||||
def child_spec([init_options, gen_server_options]) do |
||||
{state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) |
||||
|
||||
unless state do |
||||
raise ArgumentError, |
||||
":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> |
||||
"to allow for json_rpc calls when running." |
||||
end |
||||
|
||||
merged_init_options = |
||||
@defaults |
||||
|> Keyword.merge(mergeable_init_options) |
||||
|> Keyword.put(:state, state) |
||||
|
||||
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__) |
||||
end |
||||
|
||||
@impl BufferedTask |
||||
def init(initial, reducer, _) do |
||||
{:ok, final} = |
||||
Chain.stream_blocks_without_rewards(initial, fn %{number: number}, acc -> |
||||
reducer.(number, acc) |
||||
end) |
||||
|
||||
final |
||||
end |
||||
|
||||
@impl BufferedTask |
||||
@decorate trace(name: "fetch", resource: "Indexer.Block.Reward.Fetcher.run/2", service: :indexer, tracer: Tracer) |
||||
def run(entries, json_rpc_named_arguments) do |
||||
hash_string_by_number = |
||||
entries |
||||
|> Enum.uniq() |
||||
|> hash_string_by_number() |
||||
|
||||
consensus_numbers = Map.keys(hash_string_by_number) |
||||
|
||||
consensus_number_count = Enum.count(consensus_numbers) |
||||
|
||||
Logger.metadata(count: consensus_number_count) |
||||
|
||||
Logger.debug(fn -> "fetching" end) |
||||
|
||||
consensus_numbers |
||||
|> EthereumJSONRPC.fetch_beneficiaries(json_rpc_named_arguments) |
||||
|> case do |
||||
{:ok, fetched_beneficiaries} -> |
||||
run_fetched_beneficiaries(fetched_beneficiaries, hash_string_by_number) |
||||
|
||||
{:error, reason} -> |
||||
Logger.error( |
||||
fn -> |
||||
["failed to fetch: ", inspect(reason)] |
||||
end, |
||||
error_count: consensus_number_count |
||||
) |
||||
|
||||
{:retry, consensus_numbers} |
||||
end |
||||
end |
||||
|
||||
defp hash_string_by_number(numbers) when is_list(numbers) do |
||||
numbers |
||||
|> Chain.block_hash_by_number() |
||||
|> Enum.into(%{}, fn {number, hash} -> |
||||
{number, to_string(hash)} |
||||
end) |
||||
end |
||||
|
||||
defp run_fetched_beneficiaries(%FetchedBeneficiaries{params_set: params_set, errors: errors}, hash_string_by_number) do |
||||
params_set |
||||
|> filter_consensus_params(hash_string_by_number) |
||||
|> case do |
||||
[] -> |
||||
retry_errors(errors) |
||||
|
||||
beneficiaries_params -> |
||||
beneficiaries_params |
||||
|> add_gas_payments() |
||||
|> import_block_reward_params() |
||||
|> case do |
||||
{:ok, %{address_coin_balances: address_coin_balances}} -> |
||||
CoinBalance.Fetcher.async_fetch_balances(address_coin_balances) |
||||
|
||||
retry_errors(errors) |
||||
|
||||
{:error, [%Changeset{} | _] = changesets} -> |
||||
Logger.error(fn -> ["Failed to validate: ", inspect(changesets)] end, |
||||
error_count: Enum.count(hash_string_by_number) |
||||
) |
||||
|
||||
retry_beneficiaries_params(beneficiaries_params) |
||||
|
||||
{:error, step, failed_value, _changes_so_far} -> |
||||
Logger.error(fn -> ["Failed to import", inspect(failed_value)] end, |
||||
step: step, |
||||
error_count: Enum.count(hash_string_by_number) |
||||
) |
||||
|
||||
retry_beneficiaries_params(beneficiaries_params) |
||||
end |
||||
end |
||||
end |
||||
|
||||
defp filter_consensus_params(params_set, hash_string_by_number) do |
||||
Enum.filter(params_set, fn %{block_number: block_number, block_hash: block_hash} -> |
||||
case Map.fetch!(hash_string_by_number, block_number) do |
||||
^block_hash -> |
||||
true |
||||
|
||||
other_block_hash -> |
||||
Logger.debug(fn -> |
||||
[ |
||||
"fetch beneficiaries reported block number (", |
||||
to_string(block_number), |
||||
") maps to different (", |
||||
other_block_hash, |
||||
") block hash than the one in the database (", |
||||
block_hash, |
||||
"). A reorg has occurred." |
||||
] |
||||
end) |
||||
|
||||
false |
||||
end |
||||
end) |
||||
end |
||||
|
||||
defp add_gas_payments(beneficiaries_params) do |
||||
gas_payment_by_block_hash = |
||||
beneficiaries_params |
||||
|> Stream.filter(&(&1.address_type == :validator)) |
||||
|> Enum.map(& &1.block_hash) |
||||
|> Chain.gas_payment_by_block_hash() |
||||
|
||||
Enum.map(beneficiaries_params, fn %{block_hash: block_hash} = beneficiary -> |
||||
case gas_payment_by_block_hash do |
||||
%{^block_hash => gas_payment} -> |
||||
{:ok, minted} = Wei.cast(beneficiary.reward) |
||||
%{beneficiary | reward: Wei.sum(minted, gas_payment)} |
||||
|
||||
_ -> |
||||
beneficiary |
||||
end |
||||
end) |
||||
end |
||||
|
||||
defp import_block_reward_params(block_rewards_params) when is_list(block_rewards_params) do |
||||
addresses_params = AddressExtraction.extract_addresses(%{block_reward_contract_beneficiaries: block_rewards_params}) |
||||
address_coin_balances_params_set = CoinBalances.params_set(%{beneficiary_params: block_rewards_params}) |
||||
|
||||
Chain.import(%{ |
||||
addresses: %{params: addresses_params}, |
||||
address_coin_balances: %{params: address_coin_balances_params_set}, |
||||
block_rewards: %{params: block_rewards_params} |
||||
}) |
||||
end |
||||
|
||||
defp retry_beneficiaries_params(beneficiaries_params) when is_list(beneficiaries_params) do |
||||
entries = Enum.map(beneficiaries_params, & &1.block_number) |
||||
|
||||
{:retry, entries} |
||||
end |
||||
|
||||
defp retry_errors([]), do: :ok |
||||
|
||||
defp retry_errors(errors) when is_list(errors) do |
||||
retried_entries = fetched_beneficiaries_errors_to_entries(errors) |
||||
|
||||
Logger.error( |
||||
fn -> |
||||
[ |
||||
"failed to fetch: ", |
||||
fetched_beneficiaries_errors_to_iodata(errors) |
||||
] |
||||
end, |
||||
error_count: Enum.count(retried_entries) |
||||
) |
||||
|
||||
{:retry, retried_entries} |
||||
end |
||||
|
||||
defp fetched_beneficiaries_errors_to_entries(errors) when is_list(errors) do |
||||
Enum.map(errors, &fetched_beneficiary_error_to_entry/1) |
||||
end |
||||
|
||||
defp fetched_beneficiary_error_to_entry(%{data: %{block_quantity: block_quantity}}) when is_binary(block_quantity) do |
||||
quantity_to_integer(block_quantity) |
||||
end |
||||
|
||||
defp fetched_beneficiaries_errors_to_iodata(errors) when is_list(errors) do |
||||
fetched_beneficiaries_errors_to_iodata(errors, []) |
||||
end |
||||
|
||||
defp fetched_beneficiaries_errors_to_iodata([], iodata), do: iodata |
||||
|
||||
defp fetched_beneficiaries_errors_to_iodata([error | errors], iodata) do |
||||
fetched_beneficiaries_errors_to_iodata(errors, [iodata | fetched_beneficiary_error_to_iodata(error)]) |
||||
end |
||||
|
||||
defp fetched_beneficiary_error_to_iodata(%{code: code, message: message, data: %{block_quantity: block_quantity}}) |
||||
when is_integer(code) and is_binary(message) and is_binary(block_quantity) do |
||||
["@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message, ?\n] |
||||
end |
||||
end |
@ -0,0 +1,38 @@ |
||||
defmodule Indexer.Block.Reward.Supervisor do |
||||
@moduledoc """ |
||||
Supervises `Indexer.Block.Reward.Fetcher` and its batch tasks through `Indexer.Block.Reward.TaskSupervisor` |
||||
""" |
||||
|
||||
use Supervisor |
||||
|
||||
alias Indexer.Block.Reward.Fetcher |
||||
|
||||
def child_spec([init_arguments]) do |
||||
child_spec([init_arguments, []]) |
||||
end |
||||
|
||||
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do |
||||
default = %{ |
||||
id: __MODULE__, |
||||
start: {__MODULE__, :start_link, start_link_arguments}, |
||||
type: :supervisor |
||||
} |
||||
|
||||
Supervisor.child_spec(default, []) |
||||
end |
||||
|
||||
def start_link(arguments, gen_server_options \\ []) do |
||||
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__)) |
||||
end |
||||
|
||||
@impl Supervisor |
||||
def init(fetcher_arguments) do |
||||
Supervisor.init( |
||||
[ |
||||
{Task.Supervisor, name: Indexer.Block.Reward.TaskSupervisor}, |
||||
{Fetcher, [fetcher_arguments, [name: Fetcher]]} |
||||
], |
||||
strategy: :one_for_one |
||||
) |
||||
end |
||||
end |
@ -1,115 +0,0 @@ |
||||
defmodule Indexer.Block.UncatalogedRewards.Importer do |
||||
@moduledoc """ |
||||
a module to fetch and import the rewards for blocks that were indexed without the reward |
||||
""" |
||||
|
||||
alias Ecto.Multi |
||||
alias EthereumJSONRPC.FetchedBeneficiaries |
||||
alias Explorer.Chain |
||||
alias Explorer.Chain.{Block.Reward, Wei} |
||||
|
||||
# max number of blocks in a single request |
||||
# higher numbers may cause the requests to time out |
||||
# lower numbers will generate more requests |
||||
@chunk_size 10 |
||||
|
||||
@doc """ |
||||
receives a list of blocks and tries to fetch and insert rewards for them |
||||
""" |
||||
def fetch_and_import_rewards(blocks_batch) do |
||||
result = |
||||
blocks_batch |
||||
|> break_into_chunks_of_block_numbers() |
||||
|> Enum.reduce([], fn chunk, acc -> |
||||
chunk |
||||
|> fetch_beneficiaries() |
||||
|> fetch_block_rewards() |
||||
|> insert_reward_group() |
||||
|> case do |
||||
:empty -> acc |
||||
insert -> [insert | acc] |
||||
end |
||||
end) |
||||
|
||||
{:ok, result} |
||||
rescue |
||||
e in RuntimeError -> {:error, %{exception: e}} |
||||
end |
||||
|
||||
defp fetch_beneficiaries(chunk) do |
||||
{chunk_start, chunk_end} = Enum.min_max(chunk) |
||||
|
||||
{:ok, %FetchedBeneficiaries{params_set: result}} = |
||||
with :ignore <- EthereumJSONRPC.fetch_beneficiaries(chunk_start..chunk_end, json_rpc_named_arguments()) do |
||||
{:ok, %FetchedBeneficiaries{params_set: MapSet.new()}} |
||||
end |
||||
|
||||
result |
||||
end |
||||
|
||||
defp fetch_block_rewards(beneficiaries) do |
||||
Enum.map(beneficiaries, fn beneficiary -> |
||||
beneficiary_changes = |
||||
case beneficiary.address_type do |
||||
:validator -> |
||||
validation_reward = fetch_validation_reward(beneficiary) |
||||
|
||||
{:ok, reward} = Wei.cast(beneficiary.reward) |
||||
|
||||
%{beneficiary | reward: Wei.sum(reward, validation_reward)} |
||||
|
||||
_ -> |
||||
beneficiary |
||||
end |
||||
|
||||
Reward.changeset(%Reward{}, beneficiary_changes) |
||||
end) |
||||
end |
||||
|
||||
defp fetch_validation_reward(beneficiary) do |
||||
{:ok, accumulator} = Wei.cast(0) |
||||
|
||||
beneficiary.block_number |
||||
|> Chain.get_transactions_of_block_number() |
||||
|> Enum.reduce(accumulator, fn t, acc -> |
||||
{:ok, price_as_wei} = Wei.cast(t.gas_used) |
||||
price_as_wei |> Wei.mult(t.gas_price) |> Wei.sum(acc) |
||||
end) |
||||
end |
||||
|
||||
defp break_into_chunks_of_block_numbers(blocks) do |
||||
Enum.chunk_while( |
||||
blocks, |
||||
[], |
||||
fn block, acc -> |
||||
if (acc == [] || hd(acc) + 1 == block.number) && length(acc) < @chunk_size do |
||||
{:cont, [block.number | acc]} |
||||
else |
||||
{:cont, acc, [block.number]} |
||||
end |
||||
end, |
||||
fn |
||||
[] -> {:cont, []} |
||||
acc -> {:cont, acc, []} |
||||
end |
||||
) |
||||
end |
||||
|
||||
defp insert_reward_group([]), do: :empty |
||||
|
||||
defp insert_reward_group(rewards) do |
||||
rewards |
||||
|> Enum.reduce({Multi.new(), 0}, fn changeset, {multi, index} -> |
||||
{Multi.insert(multi, "insert_#{index}", changeset, |
||||
conflict_target: ~w(address_hash address_type block_hash), |
||||
on_conflict: {:replace, [:reward]} |
||||
), index + 1} |
||||
end) |
||||
|> elem(0) |
||||
|> Explorer.Repo.transaction() |
||||
end |
||||
|
||||
defp json_rpc_named_arguments do |
||||
Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
end |
||||
end |
@ -1,40 +0,0 @@ |
||||
defmodule Indexer.Block.UncatalogedRewards.Processor do |
||||
@moduledoc """ |
||||
genserver to find blocks without rewards and fetch their rewards in batches |
||||
""" |
||||
|
||||
use GenServer |
||||
|
||||
alias Explorer.Chain |
||||
alias Indexer.Block.UncatalogedRewards.Importer |
||||
|
||||
@max_batch_size 150 |
||||
@default_cooldown 300 |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(args) do |
||||
send(self(), :import_batch) |
||||
{:ok, args} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:import_batch, state) do |
||||
@max_batch_size |
||||
|> Chain.get_blocks_without_reward() |
||||
|> import_or_try_later |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
defp import_or_try_later(batch) do |
||||
import_results = Importer.fetch_and_import_rewards(batch) |
||||
|
||||
wait_time = if import_results == {:ok, []}, do: :timer.hours(24), else: @default_cooldown |
||||
|
||||
Process.send_after(self(), :import_batch, wait_time) |
||||
end |
||||
end |
@ -0,0 +1,687 @@ |
||||
defmodule Indexer.Block.Reward.FetcherTest do |
||||
# MUST be `async: false` so that {:shared, pid} is set for connection to allow CoinBalanceFetcher's self-send to have |
||||
# connection allowed immediately. |
||||
use EthereumJSONRPC.Case, async: false |
||||
use Explorer.DataCase |
||||
|
||||
import EthereumJSONRPC, only: [integer_to_quantity: 1] |
||||
import Mox |
||||
|
||||
alias Explorer.Chain |
||||
alias Explorer.Chain.{Block, Hash, Wei} |
||||
alias Indexer.Block.Reward |
||||
alias Indexer.BufferedTask |
||||
|
||||
@moduletag :capture_log |
||||
|
||||
# MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to |
||||
# use expectations and stubs from test's pid. |
||||
setup :set_mox_global |
||||
|
||||
setup :verify_on_exit! |
||||
|
||||
setup do |
||||
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) |
||||
|
||||
# Need to always mock to allow consensus switches to happen on demand and protect from them happening when we don't |
||||
# want them to. |
||||
%{ |
||||
json_rpc_named_arguments: [ |
||||
transport: EthereumJSONRPC.Mox, |
||||
transport_options: [], |
||||
# Which one does not matter, so pick one |
||||
variant: EthereumJSONRPC.Parity |
||||
] |
||||
} |
||||
end |
||||
|
||||
describe "init/3" do |
||||
test "without blocks" do |
||||
assert [] = Reward.Fetcher.init([], &[&1 | &2], nil) |
||||
end |
||||
|
||||
test "with consensus block without reward" do |
||||
%Block{number: block_number} = insert(:block) |
||||
|
||||
assert [^block_number] = Reward.Fetcher.init([], &[&1 | &2], nil) |
||||
end |
||||
|
||||
test "with consensus block with reward" do |
||||
block = insert(:block) |
||||
insert(:reward, address_hash: block.miner_hash, block_hash: block.hash) |
||||
|
||||
assert [] = Reward.Fetcher.init([], &[&1 | &2], nil) |
||||
end |
||||
|
||||
test "with non-consensus block" do |
||||
insert(:block, consensus: false) |
||||
|
||||
assert [] = Reward.Fetcher.init([], &[&1 | &2], nil) |
||||
end |
||||
end |
||||
|
||||
describe "async_fetch/1" do |
||||
setup %{json_rpc_named_arguments: json_rpc_named_arguments} do |
||||
Reward.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) |
||||
|
||||
block = insert(:block) |
||||
|
||||
%{block: block} |
||||
end |
||||
|
||||
test "with consensus block without reward", %{ |
||||
block: %Block{ |
||||
hash: block_hash, |
||||
number: block_number, |
||||
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, |
||||
consensus: true |
||||
} |
||||
} do |
||||
block_quantity = integer_to_quantity(block_number) |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
method: "trace_block", |
||||
params: [^block_quantity] |
||||
} |
||||
], |
||||
_ -> |
||||
{ |
||||
:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(miner_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0x0" |
||||
}, |
||||
# ... but, switches to non-consensus by the time `trace_block` is called |
||||
"blockHash" => to_string(block_hash), |
||||
"blockNumber" => block_number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
] |
||||
} |
||||
end) |
||||
|
||||
assert count(Chain.Block.Reward) == 0 |
||||
|
||||
parent = self() |
||||
|
||||
pid = |
||||
spawn_link(fn -> |
||||
receive do |
||||
{:"$gen_call", from, {:buffer, balance_fields}} -> |
||||
GenServer.reply(from, :ok) |
||||
send(parent, {:balance_fields, balance_fields}) |
||||
end |
||||
end) |
||||
|
||||
Process.register(pid, Indexer.CoinBalance.Fetcher) |
||||
|
||||
assert :ok = Reward.Fetcher.async_fetch([block_number]) |
||||
|
||||
wait_for_tasks(Reward.Fetcher) |
||||
|
||||
assert count(Chain.Block.Reward) == 1 |
||||
assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500 |
||||
end |
||||
|
||||
test "with consensus block with reward", %{ |
||||
block: %Block{ |
||||
hash: block_hash, |
||||
number: block_number, |
||||
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, |
||||
consensus: true |
||||
} |
||||
} do |
||||
insert(:reward, block_hash: block_hash, address_hash: miner_hash) |
||||
|
||||
block_quantity = integer_to_quantity(block_number) |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
method: "trace_block", |
||||
params: [^block_quantity] |
||||
} |
||||
], |
||||
_ -> |
||||
{ |
||||
:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(miner_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0x0" |
||||
}, |
||||
# ... but, switches to non-consensus by the time `trace_block` is called |
||||
"blockHash" => to_string(block_hash), |
||||
"blockNumber" => block_number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
] |
||||
} |
||||
end) |
||||
|
||||
parent = self() |
||||
|
||||
pid = |
||||
spawn_link(fn -> |
||||
receive do |
||||
{:"$gen_call", from, {:buffer, balance_fields}} -> |
||||
GenServer.reply(from, :ok) |
||||
send(parent, {:balance_fields, balance_fields}) |
||||
end |
||||
end) |
||||
|
||||
Process.register(pid, Indexer.CoinBalance.Fetcher) |
||||
|
||||
assert :ok = Reward.Fetcher.async_fetch([block_number]) |
||||
|
||||
wait_for_tasks(Reward.Fetcher) |
||||
|
||||
assert count(Chain.Block.Reward) == 1 |
||||
assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500 |
||||
end |
||||
|
||||
test "with consensus block does not import if fetch beneficiaries returns a different block hash for block number", |
||||
%{block: %Block{hash: block_hash, number: block_number, consensus: true, miner_hash: miner_hash}} do |
||||
block_quantity = integer_to_quantity(block_number) |
||||
new_block_hash = block_hash() |
||||
|
||||
refute block_hash == new_block_hash |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
method: "trace_block", |
||||
params: [^block_quantity] |
||||
} |
||||
], |
||||
_ -> |
||||
{ |
||||
:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(miner_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0x0" |
||||
}, |
||||
# ... but, switches to non-consensus by the time `trace_block` is called |
||||
"blockHash" => to_string(new_block_hash), |
||||
"blockNumber" => block_number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
] |
||||
} |
||||
end) |
||||
|
||||
assert :ok = Reward.Fetcher.async_fetch([block_number]) |
||||
|
||||
wait_for_tasks(Reward.Fetcher) |
||||
|
||||
assert count(Chain.Block.Reward) == 0 |
||||
end |
||||
end |
||||
|
||||
describe "run/2" do |
||||
setup do |
||||
block = insert(:block) |
||||
|
||||
%{block: block} |
||||
end |
||||
|
||||
test "with consensus block without reward", %{ |
||||
block: %Block{ |
||||
hash: block_hash, |
||||
number: block_number, |
||||
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, |
||||
consensus: true |
||||
}, |
||||
json_rpc_named_arguments: json_rpc_named_arguments |
||||
} do |
||||
block_quantity = integer_to_quantity(block_number) |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
method: "trace_block", |
||||
params: [^block_quantity] |
||||
} |
||||
], |
||||
_ -> |
||||
{ |
||||
:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(miner_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0x0" |
||||
}, |
||||
# ... but, switches to non-consensus by the time `trace_block` is called |
||||
"blockHash" => to_string(block_hash), |
||||
"blockNumber" => block_number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
] |
||||
} |
||||
end) |
||||
|
||||
assert count(Chain.Block.Reward) == 0 |
||||
assert count(Chain.Address.CoinBalance) == 0 |
||||
|
||||
parent = self() |
||||
|
||||
pid = |
||||
spawn_link(fn -> |
||||
receive do |
||||
{:"$gen_call", from, {:buffer, balance_fields}} -> |
||||
GenServer.reply(from, :ok) |
||||
send(parent, {:balance_fields, balance_fields}) |
||||
end |
||||
end) |
||||
|
||||
Process.register(pid, Indexer.CoinBalance.Fetcher) |
||||
|
||||
assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments) |
||||
|
||||
assert count(Chain.Block.Reward) == 1 |
||||
assert count(Chain.Address.CoinBalance) == 1 |
||||
assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500 |
||||
end |
||||
|
||||
test "with consensus block without reward with new address adds rewards for all addresses", %{ |
||||
block: %Block{ |
||||
hash: block_hash, |
||||
number: block_number, |
||||
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, |
||||
consensus: true |
||||
}, |
||||
json_rpc_named_arguments: json_rpc_named_arguments |
||||
} do |
||||
block_quantity = integer_to_quantity(block_number) |
||||
%Hash{bytes: new_address_hash_bytes} = new_address_hash = address_hash() |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
method: "trace_block", |
||||
params: [^block_quantity] |
||||
} |
||||
], |
||||
_ -> |
||||
{ |
||||
:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(miner_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0x1" |
||||
}, |
||||
# ... but, switches to non-consensus by the time `trace_block` is called |
||||
"blockHash" => to_string(block_hash), |
||||
"blockNumber" => block_number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
}, |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(new_address_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0x2" |
||||
}, |
||||
"blockHash" => to_string(block_hash), |
||||
"blockNumber" => block_number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
] |
||||
} |
||||
end) |
||||
|
||||
assert count(Chain.Block.Reward) == 0 |
||||
assert count(Chain.Address.CoinBalance) == 0 |
||||
|
||||
parent = self() |
||||
|
||||
pid = |
||||
spawn_link(fn -> |
||||
receive do |
||||
{:"$gen_call", from, {:buffer, balance_fields}} -> |
||||
GenServer.reply(from, :ok) |
||||
send(parent, {:balance_fields, balance_fields}) |
||||
end |
||||
end) |
||||
|
||||
Process.register(pid, Indexer.CoinBalance.Fetcher) |
||||
|
||||
assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments) |
||||
|
||||
assert count(Chain.Block.Reward) == 2 |
||||
assert count(Chain.Address.CoinBalance) == 2 |
||||
|
||||
assert_receive {:balance_fields, balance_fields}, 500 |
||||
assert {miner_hash_bytes, block_number} in balance_fields |
||||
assert {new_address_hash_bytes, block_number} in balance_fields |
||||
end |
||||
|
||||
test "with consensus block with reward", %{ |
||||
block: %Block{ |
||||
hash: block_hash, |
||||
number: block_number, |
||||
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, |
||||
consensus: true |
||||
}, |
||||
json_rpc_named_arguments: json_rpc_named_arguments |
||||
} do |
||||
insert(:reward, block_hash: block_hash, address_hash: miner_hash, reward: 0) |
||||
insert(:unfetched_balance, address_hash: miner_hash, block_number: block_number) |
||||
|
||||
block_quantity = integer_to_quantity(block_number) |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
method: "trace_block", |
||||
params: [^block_quantity] |
||||
} |
||||
], |
||||
_ -> |
||||
{ |
||||
:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(miner_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0x1" |
||||
}, |
||||
# ... but, switches to non-consensus by the time `trace_block` is called |
||||
"blockHash" => to_string(block_hash), |
||||
"blockNumber" => block_number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
] |
||||
} |
||||
end) |
||||
|
||||
assert count(Chain.Block.Reward) == 1 |
||||
assert count(Chain.Address.CoinBalance) == 1 |
||||
|
||||
value = Decimal.new(0) |
||||
|
||||
assert [%Chain.Block.Reward{reward: %Wei{value: ^value}}] = Repo.all(Chain.Block.Reward) |
||||
|
||||
parent = self() |
||||
|
||||
pid = |
||||
spawn_link(fn -> |
||||
receive do |
||||
{:"$gen_call", from, {:buffer, balance_fields}} -> |
||||
GenServer.reply(from, :ok) |
||||
send(parent, {:balance_fields, balance_fields}) |
||||
end |
||||
end) |
||||
|
||||
Process.register(pid, Indexer.CoinBalance.Fetcher) |
||||
|
||||
assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments) |
||||
|
||||
assert count(Chain.Block.Reward) == 1 |
||||
assert count(Chain.Address.CoinBalance) == 1 |
||||
|
||||
value = Decimal.new(1) |
||||
|
||||
assert [%Chain.Block.Reward{reward: %Wei{value: ^value}}] = Repo.all(Chain.Block.Reward) |
||||
assert_receive {:balance_fields, [{^miner_hash_bytes, ^block_number}]}, 500 |
||||
end |
||||
|
||||
test "with consensus block does not import if fetch beneficiaries returns a different block hash for block number", |
||||
%{ |
||||
block: %Block{hash: block_hash, number: block_number, consensus: true, miner_hash: miner_hash}, |
||||
json_rpc_named_arguments: json_rpc_named_arguments |
||||
} do |
||||
block_quantity = integer_to_quantity(block_number) |
||||
new_block_hash = block_hash() |
||||
|
||||
refute block_hash == new_block_hash |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
method: "trace_block", |
||||
params: [^block_quantity] |
||||
} |
||||
], |
||||
_ -> |
||||
{ |
||||
:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(miner_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0x0" |
||||
}, |
||||
# ... but, switches to non-consensus by the time `trace_block` is called |
||||
"blockHash" => to_string(new_block_hash), |
||||
"blockNumber" => block_number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
] |
||||
} |
||||
end) |
||||
|
||||
assert :ok = Reward.Fetcher.run([block_number], json_rpc_named_arguments) |
||||
|
||||
assert count(Chain.Block.Reward) == 0 |
||||
assert count(Chain.Address.CoinBalance) == 0 |
||||
end |
||||
|
||||
test "with mix of beneficiaries_params and errors, imports beneficiaries_params and retries errors", %{ |
||||
block: %Block{ |
||||
hash: block_hash, |
||||
number: block_number, |
||||
miner_hash: %Hash{bytes: miner_hash_bytes} = miner_hash, |
||||
consensus: true |
||||
}, |
||||
json_rpc_named_arguments: json_rpc_named_arguments |
||||
} do |
||||
block_quantity = integer_to_quantity(block_number) |
||||
%Block{number: error_block_number} = insert(:block) |
||||
|
||||
error_block_quantity = integer_to_quantity(error_block_number) |
||||
|
||||
EthereumJSONRPC.Mox |
||||
|> expect(:json_rpc, fn [_, _] = requests, _ -> |
||||
{ |
||||
:ok, |
||||
Enum.map(requests, fn |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
method: "trace_block", |
||||
params: [^block_quantity] |
||||
} -> |
||||
%{ |
||||
id: id, |
||||
jsonrpc: "2.0", |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(miner_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0x1" |
||||
}, |
||||
# ... but, switches to non-consensus by the time `trace_block` is called |
||||
"blockHash" => to_string(block_hash), |
||||
"blockNumber" => block_number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
|
||||
%{id: id, jsonrpc: "2.0", method: "trace_block", params: [^error_block_quantity]} -> |
||||
%{id: id, jsonrpc: "2.0", result: nil} |
||||
end) |
||||
} |
||||
end) |
||||
|
||||
assert count(Chain.Block.Reward) == 0 |
||||
assert count(Chain.Address.CoinBalance) == 0 |
||||
|
||||
parent = self() |
||||
|
||||
pid = |
||||
spawn_link(fn -> |
||||
receive do |
||||
{:"$gen_call", from, {:buffer, balance_fields}} -> |
||||
GenServer.reply(from, :ok) |
||||
send(parent, {:balance_fields, balance_fields}) |
||||
end |
||||
end) |
||||
|
||||
Process.register(pid, Indexer.CoinBalance.Fetcher) |
||||
|
||||
assert {:retry, [^error_block_number]} = |
||||
Reward.Fetcher.run([block_number, error_block_number], json_rpc_named_arguments) |
||||
|
||||
assert count(Chain.Block.Reward) == 1 |
||||
assert count(Chain.Address.CoinBalance) == 1 |
||||
|
||||
assert_receive {:balance_fields, balance_fields}, 500 |
||||
assert {miner_hash_bytes, block_number} in balance_fields |
||||
end |
||||
end |
||||
|
||||
defp count(schema) do |
||||
Repo.one!(select(schema, fragment("COUNT(*)"))) |
||||
end |
||||
|
||||
defp wait_for_tasks(buffered_task) do |
||||
wait_until(:timer.seconds(10), fn -> |
||||
counts = BufferedTask.debug_count(buffered_task) |
||||
counts.buffer == 0 and counts.tasks == 0 |
||||
end) |
||||
end |
||||
|
||||
defp wait_until(timeout, producer) do |
||||
parent = self() |
||||
ref = make_ref() |
||||
|
||||
spawn(fn -> do_wait_until(parent, ref, producer) end) |
||||
|
||||
receive do |
||||
{^ref, :ok} -> :ok |
||||
after |
||||
timeout -> exit(:timeout) |
||||
end |
||||
end |
||||
|
||||
defp do_wait_until(parent, ref, producer) do |
||||
if producer.() do |
||||
send(parent, {ref, :ok}) |
||||
else |
||||
:timer.sleep(100) |
||||
do_wait_until(parent, ref, producer) |
||||
end |
||||
end |
||||
end |
@ -1,104 +0,0 @@ |
||||
defmodule Indexer.Block.UncatalogedRewards.ImporterTest do |
||||
use EthereumJSONRPC.Case, async: false |
||||
use Explorer.DataCase |
||||
|
||||
import Mox |
||||
|
||||
alias Explorer.Chain.Wei |
||||
alias Explorer.Chain.Block.Reward |
||||
alias Indexer.Block.UncatalogedRewards.Importer |
||||
|
||||
describe "fetch_and_import_rewards/1" do |
||||
test "return `{:ok, []}` when receiving an empty list" do |
||||
assert Importer.fetch_and_import_rewards([]) == {:ok, []} |
||||
end |
||||
|
||||
@tag :no_geth |
||||
test "return `{:ok, [transactions executed]}`" do |
||||
address = insert(:address) |
||||
address_hash = address.hash |
||||
|
||||
block = insert(:block, number: 1234, miner: address) |
||||
block_hash = block.hash |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "trace_block", params: _params}], _options -> |
||||
{:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(address_hash), |
||||
"rewardType" => "external", |
||||
"value" => "0xde0b6b3a7640000" |
||||
}, |
||||
"blockHash" => to_string(block_hash), |
||||
"blockNumber" => block.number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
]} |
||||
end) |
||||
|
||||
assert {:ok, |
||||
[ |
||||
ok: %{ |
||||
"insert_0" => %Reward{ |
||||
address_hash: ^address_hash, |
||||
block_hash: ^block_hash, |
||||
address_type: :validator |
||||
} |
||||
} |
||||
]} = Importer.fetch_and_import_rewards([block]) |
||||
end |
||||
|
||||
@tag :no_geth |
||||
test "replaces reward on conflict" do |
||||
miner = insert(:address) |
||||
block = insert(:block, miner: miner) |
||||
block_hash = block.hash |
||||
address_type = :validator |
||||
insert(:reward, block_hash: block_hash, address_hash: miner.hash, address_type: address_type, reward: 1) |
||||
value = "0x2" |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "trace_block"}], _options -> |
||||
{:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(miner), |
||||
"rewardType" => "external", |
||||
"value" => value |
||||
}, |
||||
"blockHash" => to_string(block_hash), |
||||
"blockNumber" => block.number, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
]} |
||||
end) |
||||
|
||||
{:ok, reward} = Wei.cast(value) |
||||
|
||||
assert {:ok, |
||||
[ok: %{"insert_0" => %Reward{block_hash: ^block_hash, address_type: ^address_type, reward: ^reward}}]} = |
||||
Importer.fetch_and_import_rewards([block]) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,17 @@ |
||||
defmodule Indexer.Block.Reward.Supervisor.Case do |
||||
alias Indexer.Block.Reward |
||||
|
||||
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do |
||||
merged_fetcher_arguments = |
||||
Keyword.merge( |
||||
fetcher_arguments, |
||||
flush_interval: 50, |
||||
max_batch_size: 1, |
||||
max_concurrency: 1 |
||||
) |
||||
|
||||
[merged_fetcher_arguments] |
||||
|> Reward.Supervisor.child_spec() |
||||
|> ExUnit.Callbacks.start_supervised!() |
||||
end |
||||
end |
Loading…
Reference in new issue