Denormalization improvements

v6.0.0-dev
Qwerty5Uiop 12 months ago committed by Viktor Baranov
parent 27d042fc8f
commit 39a8d6094a
  1. 2
      .dialyzer-ignore
  2. 4
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex
  3. 34
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex
  4. 75
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex
  5. 4
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transactions.ex
  6. 12
      apps/explorer/lib/explorer/chain.ex
  7. 0
      apps/explorer/lib/explorer/chain/address_internal_transaction_csv_exporter.ex
  8. 0
      apps/explorer/lib/explorer/chain/address_token_transfer_csv_exporter.ex
  9. 0
      apps/explorer/lib/explorer/chain/address_transaction_csv_exporter.ex
  10. 4
      apps/explorer/lib/explorer/chain/denormalization_helper.ex
  11. 2
      apps/explorer/lib/explorer/chain/transaction.ex
  12. 16
      apps/explorer/lib/explorer/transactions_denormalization_migrator.ex
  13. 31
      apps/explorer/lib/explorer/utility/migration_status.ex
  14. 11
      apps/explorer/priv/repo/migrations/20220315082902_add_consensus_to_transaction_table.exs
  15. 11
      apps/explorer/priv/repo/migrations/20220315093927_add_block_timestamp_to_transaction_table.exs
  16. 13
      apps/explorer/priv/repo/migrations/20231212101547_add_block_timestamp_and_consensus_to_transactions.exs
  17. 12
      apps/explorer/priv/repo/migrations/20231212102127_create_migrations_status.exs
  18. 16
      apps/explorer/test/explorer/chain_test.exs
  19. 2
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  20. 2
      apps/indexer/test/indexer/fetcher/internal_transaction_test.exs
  21. 2
      apps/indexer/test/indexer/fetcher/uncle_block_test.exs
  22. 4
      config/runtime.exs

@ -23,4 +23,4 @@ lib/indexer/fetcher/zkevm/transaction_batch.ex:156
lib/indexer/fetcher/zkevm/transaction_batch.ex:252
lib/block_scout_web/views/api/v2/transaction_view.ex:431
lib/block_scout_web/views/api/v2/transaction_view.ex:472
lib/explorer/chain/transaction.ex:167
lib/explorer/chain/transaction.ex:169

@ -788,8 +788,8 @@ defmodule EthereumJSONRPC.Block do
{key, timestamp_to_datetime(timestamp)}
end
defp entry_to_elixir({"transactions" = key, transactions}, _block) do
{key, Transactions.to_elixir(transactions)}
defp entry_to_elixir({"transactions" = key, transactions}, %{"timestamp" => block_timestamp}) do
{key, Transactions.to_elixir(transactions, timestamp_to_datetime(block_timestamp))}
end
defp entry_to_elixir({"withdrawals" = key, nil}, _block) do

@ -54,13 +54,12 @@ defmodule EthereumJSONRPC.Blocks do
transactions_params = Transactions.elixir_to_params(elixir_transactions)
withdrawals_params = Withdrawals.elixir_to_params(elixir_withdrawals)
blocks_params = elixir_to_params(elixir_blocks)
transactions_params_with_block_timestamp = add_timestamp_to_transactions_params(transactions_params, blocks_params)
%__MODULE__{
errors: errors,
blocks_params: blocks_params,
block_second_degree_relations_params: block_second_degree_relations_params,
transactions_params: transactions_params_with_block_timestamp,
transactions_params: transactions_params,
withdrawals_params: withdrawals_params
}
end
@ -455,35 +454,4 @@ defmodule EthereumJSONRPC.Blocks do
def to_elixir(blocks) when is_list(blocks) do
Enum.map(blocks, &Block.to_elixir/1)
end
defp add_timestamp_to_transactions_params(transactions_params, blocks_params) do
block_hashes =
transactions_params
|> Enum.map(fn %{block_hash: block_hash} -> block_hash end)
|> Enum.uniq()
block_hash_timestamp_map =
block_hashes
|> Enum.map(fn block_hash ->
block =
Enum.find(blocks_params, fn block_param ->
block_param.hash == block_hash
end)
%{}
|> Map.put("#{block_hash}", block.timestamp)
end)
|> Enum.reduce(%{}, fn hash_timestamp_map_item, acc ->
Map.merge(acc, hash_timestamp_map_item)
end)
transactions_params
|> Enum.map(fn transactions_param ->
Map.put(
transactions_param,
:block_timestamp,
Map.get(block_hash_timestamp_map, "#{transactions_param.block_hash}")
)
end)
end
end

@ -237,11 +237,10 @@ defmodule EthereumJSONRPC.Transaction do
result
end
if transaction["creates"] do
Map.put(result, :created_contract_address_hash, transaction["creates"])
else
result
end
put_if_present(transaction, result, [
{"creates", :created_contract_address_hash},
{"block_timestamp", :block_timestamp}
])
end
def elixir_to_params(
@ -286,11 +285,10 @@ defmodule EthereumJSONRPC.Transaction do
max_fee_per_gas: max_fee_per_gas
}
if transaction["creates"] do
Map.put(result, :created_contract_address_hash, transaction["creates"])
else
result
end
put_if_present(transaction, result, [
{"creates", :created_contract_address_hash},
{"block_timestamp", :block_timestamp}
])
end
# txpool_content method on Erigon node returns tx data
@ -336,11 +334,10 @@ defmodule EthereumJSONRPC.Transaction do
max_fee_per_gas: max_fee_per_gas
}
if transaction["creates"] do
Map.put(result, :created_contract_address_hash, transaction["creates"])
else
result
end
put_if_present(transaction, result, [
{"creates", :created_contract_address_hash},
{"block_timestamp", :block_timestamp}
])
end
# this is for Suave chain (handles `executionNode` and `requestRecord` fields without EIP-1559 fields)
@ -407,11 +404,10 @@ defmodule EthereumJSONRPC.Transaction do
result
end
if transaction["creates"] do
Map.put(result, :created_contract_address_hash, transaction["creates"])
else
result
end
put_if_present(transaction, result, [
{"creates", :created_contract_address_hash},
{"block_timestamp", :block_timestamp}
])
end
def elixir_to_params(
@ -452,11 +448,10 @@ defmodule EthereumJSONRPC.Transaction do
type: type
}
if transaction["creates"] do
Map.put(result, :created_contract_address_hash, transaction["creates"])
else
result
end
put_if_present(transaction, result, [
{"creates", :created_contract_address_hash},
{"block_timestamp", :block_timestamp}
])
end
def elixir_to_params(
@ -495,11 +490,10 @@ defmodule EthereumJSONRPC.Transaction do
transaction_index: index
}
if transaction["creates"] do
Map.put(result, :created_contract_address_hash, transaction["creates"])
else
result
end
put_if_present(transaction, result, [
{"creates", :created_contract_address_hash},
{"block_timestamp", :block_timestamp}
])
end
@doc """
@ -580,11 +574,14 @@ defmodule EthereumJSONRPC.Transaction do
}
"""
def to_elixir(transaction) when is_map(transaction) do
Enum.into(transaction, %{}, &entry_to_elixir/1)
def to_elixir(transaction, block_timestamp \\ nil)
def to_elixir(transaction, block_timestamp) when is_map(transaction) do
initial = (block_timestamp && %{"block_timestamp" => block_timestamp}) || %{}
Enum.into(transaction, initial, &entry_to_elixir/1)
end
def to_elixir(transaction) when is_binary(transaction) do
def to_elixir(transaction, _block_timestamp) when is_binary(transaction) do
nil
end
@ -658,4 +655,16 @@ defmodule EthereumJSONRPC.Transaction do
defp entry_to_elixir(_) do
{nil, nil}
end
defp put_if_present(transaction, result, keys) do
Enum.reduce(keys, result, fn {from_key, to_key}, acc ->
value = transaction[from_key]
if value do
Map.put(acc, to_key, value)
else
acc
end
end)
end
end

@ -151,9 +151,9 @@ defmodule EthereumJSONRPC.Transactions do
]
"""
def to_elixir(transactions) when is_list(transactions) do
def to_elixir(transactions, block_timestamp \\ nil) when is_list(transactions) do
transactions
|> Enum.map(&Transaction.to_elixir/1)
|> Enum.map(&Transaction.to_elixir(&1, block_timestamp))
|> Enum.filter(&(!is_nil(&1)))
end
end

@ -539,13 +539,17 @@ defmodule Explorer.Chain do
)
end
initial_gas_payments =
block_hashes
|> Enum.map(&{&1, %Wei{value: Decimal.new(0)}})
|> Enum.into(%{})
existing_data =
query
|> Repo.all()
|> (&if(Enum.count(&1) > 0,
do: &1,
else: Enum.zip([block_hashes, for(_ <- 1..Enum.count(block_hashes), do: %Wei{value: Decimal.new(0)})])
)).()
|> Enum.into(%{})
Map.merge(initial_gas_payments, existing_data)
end
def timestamp_by_block_hash(block_hashes) when is_list(block_hashes) do

@ -1,5 +1,7 @@
defmodule Explorer.Chain.DenormalizationHelper do
@moduledoc false
@moduledoc """
Helper functions for dynamic logic based on denormalization migration completeness
"""
alias Explorer.Chain.Cache.BackgroundMigrations

@ -553,10 +553,10 @@ defmodule Explorer.Chain.Transaction do
|> unique_constraint(:hash)
end
@spec block_timestamp(t()) :: DateTime.t()
def block_timestamp(%{block_number: nil, inserted_at: time}), do: time
def block_timestamp(%{block_timestamp: time}) when not is_nil(time), do: time
def block_timestamp(%{block: %{timestamp: time}}), do: time
def block_timestamp(_), do: nil
def preload_token_transfers(query, address_hash) do
token_transfers_query =

@ -10,9 +10,10 @@ defmodule Explorer.TransactionsDenormalizationMigrator do
alias Explorer.Chain.Cache.BackgroundMigrations
alias Explorer.Chain.Transaction
alias Explorer.Repo
alias Explorer.Utility.MigrationStatus
@default_batch_size 500
@default_concurrency 4 * System.schedulers_online()
@migration_name "denormalization"
@spec start_link(term()) :: GenServer.on_start()
def start_link(_) do
@ -25,15 +26,23 @@ defmodule Explorer.TransactionsDenormalizationMigrator do
@impl true
def init(_) do
case MigrationStatus.get_status(@migration_name) do
"completed" ->
:ignore
_ ->
MigrationStatus.set_status(@migration_name, "started")
schedule_batch_migration()
{:ok, %{}}
end
end
@impl true
def handle_info(:migrate_batch, state) do
case last_unprocessed_transaction_hashes() do
[] ->
BackgroundMigrations.set_denormalization_finished(true)
MigrationStatus.set_status(@migration_name, "completed")
{:stop, :normal, state}
hashes ->
@ -54,7 +63,6 @@ defmodule Explorer.TransactionsDenormalizationMigrator do
limit = batch_size() * concurrency()
unprocessed_transactions_query()
|> order_by(desc: :inserted_at)
|> select([t], t.hash)
|> limit(^limit)
|> Repo.all()
@ -86,6 +94,8 @@ defmodule Explorer.TransactionsDenormalizationMigrator do
end
defp concurrency do
Application.get_env(:explorer, __MODULE__)[:batch_size] || @default_concurrency
default = 4 * System.schedulers_online()
Application.get_env(:explorer, __MODULE__)[:concurrency] || default
end
end

@ -0,0 +1,31 @@
defmodule Explorer.Utility.MigrationStatus do
@moduledoc """
Module is responsible for keeping the current status of background migrations.
"""
use Explorer.Schema
alias Explorer.Repo
@primary_key false
schema "migrations_status" do
field(:migration_name, :string)
field(:status, :string)
timestamps()
end
@doc false
def changeset(migration_status \\ %__MODULE__{}, params) do
cast(migration_status, params, [:migration_name, :status])
end
def get_status(migration_name) do
Repo.one(from(ms in __MODULE__, where: ms.migration_name == ^migration_name, select: ms.status))
end
def set_status(migration_name, status) do
%{migration_name: migration_name, status: status}
|> changeset()
|> Repo.insert(on_conflict: :replace_all, conflict_target: :migration_name)
end
end

@ -1,11 +0,0 @@
defmodule Explorer.Repo.Migrations.AddConsensusToTransactionTable do
use Ecto.Migration
def change do
alter table("transactions") do
add(:block_consensus, :boolean, default: true)
end
create(index(:transactions, :block_consensus))
end
end

@ -1,11 +0,0 @@
defmodule Explorer.Repo.Migrations.AddBlockTimestampToTransactionTable do
use Ecto.Migration
def change do
alter table("transactions") do
add(:block_timestamp, :utc_datetime_usec)
end
create(index(:transactions, :block_timestamp))
end
end

@ -0,0 +1,13 @@
defmodule Explorer.Repo.Migrations.AddBlockTimestampAndConsensusToTransactions do
use Ecto.Migration
def change do
alter table(:transactions) do
add_if_not_exists(:block_timestamp, :utc_datetime_usec)
add_if_not_exists(:block_consensus, :boolean, default: true)
end
create_if_not_exists(index(:transactions, :block_timestamp))
create_if_not_exists(index(:transactions, :block_consensus))
end
end

@ -0,0 +1,12 @@
defmodule Explorer.Repo.Migrations.CreateMigrationsStatus do
use Ecto.Migration
def change do
create table(:migrations_status, primary_key: false) do
add(:migration_name, :string, primary_key: true)
add(:status, :string)
timestamps()
end
end
end

@ -869,7 +869,9 @@ defmodule Explorer.ChainTest do
test "returns the correct address if it exists" do
address = insert(:address)
assert {:ok, _address} = Chain.hash_to_address(address.hash)
assert {:ok, address_from_db} = Chain.hash_to_address(address.hash)
assert address_from_db.hash == address.hash
assert address_from_db.inserted_at == address.inserted_at
end
test "has_decompiled_code? is true if there are decompiled contracts" do
@ -918,14 +920,16 @@ defmodule Explorer.ChainTest do
test "returns an address if it already exists" do
address = insert(:address)
assert {:ok, _address} = Chain.find_or_insert_address_from_hash(address.hash)
assert {:ok, address_from_db} = Chain.find_or_insert_address_from_hash(address.hash)
assert address_from_db.hash == address.hash
assert address_from_db.inserted_at == address.inserted_at
end
test "returns an address if it doesn't exist" do
hash_str = "0xcbbcd5ac86f9a50e13313633b262e16f695a90c2"
{:ok, hash} = Chain.string_to_address_hash(hash_str)
assert {:ok, %Chain.Address{hash: _hash}} = Chain.find_or_insert_address_from_hash(hash)
assert {:ok, %Chain.Address{hash: ^hash}} = Chain.find_or_insert_address_from_hash(hash)
end
end
@ -3984,7 +3988,11 @@ defmodule Explorer.ChainTest do
assert {:ok, result} = Chain.token_from_address_hash(token.contract_address_hash, options)
assert result.contract_address.smart_contract
assert address.smart_contract.address_hash == result.contract_address.smart_contract.address_hash
assert address.smart_contract.contract_code_md5 == result.contract_address.smart_contract.contract_code_md5
assert address.smart_contract.abi == result.contract_address.smart_contract.abi
assert address.smart_contract.contract_source_code == result.contract_address.smart_contract.contract_source_code
assert address.smart_contract.name == result.contract_address.smart_contract.name
end
end

@ -456,7 +456,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
assert count(Chain.Block) == 1
assert count(Reward) == 0
assert_receive {:block_numbers, [_block_number]}, 5_000
assert_receive {:block_numbers, [^block_number]}, 5_000
end
test "async fetches beneficiaries when entire call errors out", %{

@ -305,7 +305,7 @@ defmodule Indexer.Fetcher.InternalTransactionTest do
assert {:retry, [block.number]} == InternalTransaction.run([block.number, block.number], json_rpc_named_arguments)
assert %{block_hash: _block_hash} = Repo.get(PendingBlockOperation, block_hash)
assert %{block_hash: ^block_hash} = Repo.get(PendingBlockOperation, block_hash)
end
test "remove block consensus on foreign_key_violation", %{

@ -196,7 +196,7 @@ defmodule Indexer.Fetcher.UncleBlockTest do
]}
end)
assert {:retry, [_entry]} =
assert {:retry, [^entry]} =
UncleBlock.run(entries, %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments})
end
end

@ -456,6 +456,10 @@ config :explorer, Explorer.MicroserviceInterfaces.BENS,
service_url: System.get_env("MICROSERVICE_BENS_URL"),
enabled: ConfigHelper.parse_bool_env_var("MICROSERVICE_BENS_ENABLED")
config :explorer, Explorer.TransactionsDenormalizationMigrator,
batch_size: ConfigHelper.parse_integer_env_var("DENORMALIZATION_MIGRATION_BATCH_SIZE", 500),
concurrency: ConfigHelper.parse_integer_env_var("DENORMALIZATION_MIGRATION_CONCURRENCY", 10)
###############
### Indexer ###
###############

Loading…
Cancel
Save