Merge pull request #1269 from poanetwork/gsf-block-rewards-for-older-blocks
get rewards of blocks already indexed without rewardspull/1267/head
commit
8a926c1b05
@ -0,0 +1,112 @@ |
||||
defmodule Indexer.Block.UncatalogedRewards.Importer do |
||||
@moduledoc """ |
||||
a module to fetch and import the rewards for blocks that were indexed without the reward |
||||
""" |
||||
|
||||
alias Ecto.Multi |
||||
alias EthereumJSONRPC.FetchedBeneficiaries |
||||
alias Explorer.Chain |
||||
alias Explorer.Chain.{Block.Reward, Wei} |
||||
|
||||
# max number of blocks in a single request |
||||
# higher numbers may cause the requests to time out |
||||
# lower numbers will generate more requests |
||||
@chunk_size 10 |
||||
|
||||
@doc """ |
||||
receives a list of blocks and tries to fetch and insert rewards for them |
||||
""" |
||||
def fetch_and_import_rewards(blocks_batch) do |
||||
result = |
||||
blocks_batch |
||||
|> break_into_chunks_of_block_numbers() |
||||
|> Enum.reduce([], fn chunk, acc -> |
||||
chunk |
||||
|> fetch_beneficiaries() |
||||
|> fetch_block_rewards() |
||||
|> insert_reward_group() |
||||
|> case do |
||||
:empty -> acc |
||||
insert -> [insert | acc] |
||||
end |
||||
end) |
||||
|
||||
{:ok, result} |
||||
rescue |
||||
e in RuntimeError -> {:error, %{exception: e}} |
||||
end |
||||
|
||||
defp fetch_beneficiaries(chunk) do |
||||
{chunk_start, chunk_end} = Enum.min_max(chunk) |
||||
|
||||
{:ok, %FetchedBeneficiaries{params_set: result}} = |
||||
with :ignore <- EthereumJSONRPC.fetch_beneficiaries(chunk_start..chunk_end, json_rpc_named_arguments()) do |
||||
{:ok, %FetchedBeneficiaries{params_set: MapSet.new()}} |
||||
end |
||||
|
||||
result |
||||
end |
||||
|
||||
defp fetch_block_rewards(beneficiaries) do |
||||
Enum.map(beneficiaries, fn beneficiary -> |
||||
beneficiary_changes = |
||||
case beneficiary.address_type do |
||||
:validator -> |
||||
validation_reward = fetch_validation_reward(beneficiary) |
||||
|
||||
{:ok, reward} = Wei.cast(beneficiary.reward) |
||||
|
||||
%{beneficiary | reward: Wei.sum(reward, validation_reward)} |
||||
|
||||
_ -> |
||||
beneficiary |
||||
end |
||||
|
||||
Reward.changeset(%Reward{}, beneficiary_changes) |
||||
end) |
||||
end |
||||
|
||||
defp fetch_validation_reward(beneficiary) do |
||||
{:ok, accumulator} = Wei.cast(0) |
||||
|
||||
beneficiary.block_number |
||||
|> Chain.get_transactions_of_block_number() |
||||
|> Enum.reduce(accumulator, fn t, acc -> |
||||
{:ok, price_as_wei} = Wei.cast(t.gas_used) |
||||
price_as_wei |> Wei.mult(t.gas_price) |> Wei.sum(acc) |
||||
end) |
||||
end |
||||
|
||||
defp break_into_chunks_of_block_numbers(blocks) do |
||||
Enum.chunk_while( |
||||
blocks, |
||||
[], |
||||
fn block, acc -> |
||||
if (acc == [] || hd(acc) + 1 == block.number) && length(acc) < @chunk_size do |
||||
{:cont, [block.number | acc]} |
||||
else |
||||
{:cont, acc, [block.number]} |
||||
end |
||||
end, |
||||
fn |
||||
[] -> {:cont, []} |
||||
acc -> {:cont, acc, []} |
||||
end |
||||
) |
||||
end |
||||
|
||||
defp insert_reward_group([]), do: :empty |
||||
|
||||
defp insert_reward_group(rewards) do |
||||
rewards |
||||
|> Enum.reduce({Multi.new(), 0}, fn changeset, {multi, index} -> |
||||
{Multi.insert(multi, "insert_#{index}", changeset), index + 1} |
||||
end) |
||||
|> elem(0) |
||||
|> Explorer.Repo.transaction() |
||||
end |
||||
|
||||
defp json_rpc_named_arguments do |
||||
Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
end |
||||
end |
@ -0,0 +1,40 @@ |
||||
defmodule Indexer.Block.UncatalogedRewards.Processor do |
||||
@moduledoc """ |
||||
genserver to find blocks without rewards and fetch their rewards in batches |
||||
""" |
||||
|
||||
use GenServer |
||||
|
||||
alias Explorer.Chain |
||||
alias Indexer.Block.UncatalogedRewards.Importer |
||||
|
||||
@max_batch_size 150 |
||||
@default_cooldown 300 |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(args) do |
||||
send(self(), :import_batch) |
||||
{:ok, args} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:import_batch, state) do |
||||
@max_batch_size |
||||
|> Chain.get_blocks_without_reward() |
||||
|> import_or_try_later |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
defp import_or_try_later(batch) do |
||||
import_results = Importer.fetch_and_import_rewards(batch) |
||||
|
||||
wait_time = if import_results == {:ok, []}, do: :timer.hours(24), else: @default_cooldown |
||||
|
||||
Process.send_after(self(), :import_batch, wait_time) |
||||
end |
||||
end |
@ -0,0 +1,64 @@ |
||||
defmodule Indexer.Block.UncatalogedRewards.ImporterTest do |
||||
use EthereumJSONRPC.Case, async: false |
||||
use Explorer.DataCase |
||||
|
||||
import Mox |
||||
import EthereumJSONRPC, only: [integer_to_quantity: 1] |
||||
import EthereumJSONRPC.Case |
||||
|
||||
alias Explorer.Chain |
||||
alias Indexer.Block.UncatalogedRewards.Importer |
||||
|
||||
describe "fetch_and_import_rewards/1" do |
||||
test "return `{:ok, []}` when receiving an empty list" do |
||||
assert Importer.fetch_and_import_rewards([]) == {:ok, []} |
||||
end |
||||
|
||||
@tag :no_geth |
||||
test "return `{:ok, [transactions executed]}`" do |
||||
address = insert(:address) |
||||
block = insert(:block, number: 1234, miner: address) |
||||
|
||||
expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "trace_block", params: _params}], _options -> |
||||
{:ok, |
||||
[ |
||||
%{ |
||||
id: id, |
||||
result: [ |
||||
%{ |
||||
"action" => %{ |
||||
"author" => to_string(address.hash), |
||||
"rewardType" => "external", |
||||
"value" => "0xde0b6b3a7640000" |
||||
}, |
||||
"blockHash" => to_string(block.hash), |
||||
"blockNumber" => 1234, |
||||
"result" => nil, |
||||
"subtraces" => 0, |
||||
"traceAddress" => [], |
||||
"transactionHash" => nil, |
||||
"transactionPosition" => nil, |
||||
"type" => "reward" |
||||
} |
||||
] |
||||
} |
||||
]} |
||||
end) |
||||
|
||||
expected = |
||||
{:ok, |
||||
[ |
||||
ok: %{ |
||||
"insert_0" => %Explorer.Chain.Block.Reward{ |
||||
address_hash: address.hash, |
||||
block_hash: block.hash, |
||||
address_type: :validator |
||||
} |
||||
} |
||||
]} |
||||
|
||||
result = Importer.fetch_and_import_rewards([block]) |
||||
assert result = expected |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue