diff --git a/apps/explorer/.sobelow-conf b/apps/explorer/.sobelow-conf index de56ac1b78..c97cb5e14b 100644 --- a/apps/explorer/.sobelow-conf +++ b/apps/explorer/.sobelow-conf @@ -1,7 +1,7 @@ [ verbose: false, private: true, - skip: false, + skip: true, exit: "low", format: "compact", ignore: ["Config.HTTPS"], diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index 2566a17324..470a93851a 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -3,9 +3,10 @@ defmodule Explorer.Chain.Import do Bulk importing of data into `Explorer.Repo` """ - import Ecto.Query, only: [from: 2] + import Ecto.Query, only: [from: 2, update: 2] alias Ecto.{Changeset, Multi} + alias Ecto.Adapters.SQL alias Explorer.Chain.{ Address, @@ -218,6 +219,9 @@ defmodule Explorer.Chain.Import do * `:timeout` - the timeout for inserting all transactions found in the params lists across all types. Defaults to `#{@insert_transactions_timeout}` milliseconds. * `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`. + * `:transaction_forks` + * `:params` - `list` of params for `Explorer.Chain.Transaction.Fork.changeset/2`. + * `:timeout` - the timeout for inserting all transaction forks. * `:token_balances` * `:params` - `list` of params for `Explorer.Chain.TokenBalance.changeset/2` * `:timeout` - the timeout for `Repo.transaction`. Defaults to `#{@transaction_timeout}` milliseconds. @@ -377,16 +381,30 @@ defmodule Explorer.Chain.Import do case ecto_schema_module_to_changes_list_map do %{Block => blocks_changes} -> timestamps = Map.fetch!(options, :timestamps) + blocks_timeout = options[:blocks][:timeout] || @insert_blocks_timeout + where_forked = where_forked(blocks_changes) multi + |> Multi.run(:derive_transaction_forks, fn _ -> + derive_transaction_forks(%{ + timeout: options[:transaction_forks][:timeout] || @insert_transaction_forks_timeout, + timestamps: timestamps, + where_forked: where_forked + }) + end) + # MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table + |> Multi.run(:fork_transactions, fn _ -> + fork_transactions(%{ + timeout: options[:transactions][:timeout] || @insert_transactions_timeout, + timestamps: timestamps, + where_forked: where_forked + }) + end) + |> Multi.run(:lose_consenus, fn _ -> + lose_consensus(blocks_changes, %{timeout: blocks_timeout, timestamps: timestamps}) + end) |> Multi.run(:blocks, fn _ -> - insert_blocks( - blocks_changes, - %{ - timeout: options[:blocks][:timeout] || @insert_blocks_timeout, - timestamps: timestamps - } - ) + insert_blocks(blocks_changes, %{timeout: blocks_timeout, timestamps: timestamps}) end) |> Multi.run(:uncle_fetched_block_second_degree_relations, fn %{blocks: blocks} when is_list(blocks) -> update_block_second_degree_relations( @@ -505,9 +523,7 @@ defmodule Explorer.Chain.Import do when is_map(ecto_schema_module_to_changes_list) and is_map(options) do case ecto_schema_module_to_changes_list do %{Token => tokens_changes} -> - tokens_options = Map.fetch!(options, :tokens) - timestamps = Map.fetch!(options, :timestamps) - on_conflict = Map.fetch!(tokens_options, :on_conflict) + %{timestamps: timestamps, tokens: %{on_conflict: on_conflict}} = options Multi.run(multi, :tokens, fn _ -> insert_tokens( @@ -976,6 +992,115 @@ defmodule Explorer.Chain.Import do {:ok, inserted} end + defp fork_transactions(%{timeout: timeout, timestamps: %{updated_at: updated_at}, where_forked: where_forked}) do + query = + where_forked + |> update( + set: [ + block_hash: nil, + block_number: nil, + gas_used: nil, + cumulative_gas_used: nil, + index: nil, + internal_transactions_indexed_at: nil, + status: nil, + updated_at: ^updated_at + ] + ) + + try do + {_, result} = Repo.update_all(query, [], timeout: timeout, returning: [:hash]) + + {:ok, result} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error}} + end + end + + defp where_forked(blocks_changes) when is_list(blocks_changes) do + Enum.reduce(blocks_changes, Transaction, fn %{consensus: consensus, hash: hash, number: number}, acc -> + case consensus do + false -> + from(transaction in acc, or_where: transaction.block_hash == ^hash and transaction.block_number == ^number) + + true -> + from(transaction in acc, or_where: transaction.block_hash != ^hash and transaction.block_number == ^number) + end + end) + end + + # sobelow_skip ["SQL.Query"] + defp derive_transaction_forks(%{ + timeout: timeout, + timestamps: %{inserted_at: inserted_at, updated_at: updated_at}, + where_forked: where_forked + }) do + query = + from(transaction in where_forked, + select: [ + transaction.block_hash, + transaction.index, + transaction.hash, + type(^inserted_at, transaction.inserted_at), + type(^updated_at, transaction.updated_at) + ] + ) + + {sql, parameters} = SQL.to_sql(:all, Repo, query) + + {:ok, %Postgrex.Result{columns: ["uncle_hash", "hash"], command: :insert, rows: rows}} = + SQL.query( + Repo, + """ + INSERT INTO transaction_forks (uncle_hash, index, hash, inserted_at, updated_at) + #{sql} + RETURNING uncle_hash, hash + """, + parameters, + timeout: timeout + ) + + derived_transaction_forks = Enum.map(rows, fn [uncle_hash, hash] -> %{uncle_hash: uncle_hash, hash: hash} end) + + {:ok, derived_transaction_forks} + end + + defp lose_consensus(blocks_changes, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) + when is_list(blocks_changes) do + ordered_consensus_block_number = + blocks_changes + |> Enum.reduce(MapSet.new(), fn + %{consensus: true, number: number}, acc -> + MapSet.put(acc, number) + + %{consensus: false}, acc -> + acc + end) + |> Enum.sort() + + query = + from( + block in Block, + where: block.number in ^ordered_consensus_block_number, + update: [ + set: [ + consensus: false, + updated_at: ^updated_at + ] + ] + ) + + try do + {_, result} = Repo.update_all(query, [], timeout: timeout, returning: [:hash, :number]) + + {:ok, result} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, consensus_block_numbers: ordered_consensus_block_number}} + end + end + defp update_block_second_degree_relations(blocks, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) when is_list(blocks) do ordered_uncle_hashes = diff --git a/apps/explorer/test/explorer/chain/import_test.exs b/apps/explorer/test/explorer/chain/import_test.exs index 1e39d49983..59145714cd 100644 --- a/apps/explorer/test/explorer/chain/import_test.exs +++ b/apps/explorer/test/explorer/chain/import_test.exs @@ -16,6 +16,8 @@ defmodule Explorer.Chain.ImportTest do Transaction } + @moduletag :capturelog + doctest Import describe "all/1" do @@ -1119,5 +1121,513 @@ defmodule Explorer.Chain.ImportTest do assert Repo.aggregate(Transaction.Fork, :count, :hash) == 1 end + + test "reorganizations can switch blocks to non-consensus with new block taking the consensus spot for the number" do + block_number = 0 + + miner_hash_before = address_hash() + from_address_hash_before = address_hash() + block_hash_before = block_hash() + difficulty_before = Decimal.new(0) + gas_limit_before = Decimal.new(0) + gas_used_before = Decimal.new(0) + {:ok, nonce_before} = Hash.Nonce.cast(0) + parent_hash_before = block_hash() + size_before = 0 + timestamp_before = Timex.parse!("2019-01-01T01:00:00Z", "{ISO:Extended:Z}") + total_difficulty_before = Decimal.new(0) + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_before}, + %{hash: from_address_hash_before} + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: difficulty_before, + gas_limit: gas_limit_before, + gas_used: gas_used_before, + hash: block_hash_before, + miner_hash: miner_hash_before, + nonce: nonce_before, + number: block_number, + parent_hash: parent_hash_before, + size: size_before, + timestamp: timestamp_before, + total_difficulty: total_difficulty_before + } + ] + } + }) + + %Block{consensus: true, number: ^block_number} = Repo.get(Block, block_hash_before) + + miner_hash_after = address_hash() + from_address_hash_after = address_hash() + block_hash_after = block_hash() + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_after}, + %{hash: from_address_hash_after} + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: 1, + gas_limit: 1, + gas_used: 1, + hash: block_hash_after, + miner_hash: miner_hash_after, + nonce: 1, + number: block_number, + parent_hash: block_hash(), + size: 1, + timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"), + total_difficulty: 1 + } + ] + } + }) + + # new block grabs `consensus` + assert %Block{ + consensus: true, + difficulty: difficulty_after, + gas_limit: gas_limit_after, + gas_used: gas_used_after, + nonce: nonce_after, + number: ^block_number, + parent_hash: parent_hash_after, + size: size_after, + timestamp: timestamp_after, + total_difficulty: total_difficulty_after + } = Repo.get(Block, block_hash_after) + + refute difficulty_after == difficulty_before + refute gas_limit_after == gas_limit_before + refute gas_used_after == gas_used_before + refute nonce_after == nonce_before + refute parent_hash_after == parent_hash_before + refute size_after == size_before + refute timestamp_after == timestamp_before + refute total_difficulty_after == total_difficulty_before + + # only `consensus` changes in original block + assert %Block{ + consensus: false, + difficulty: ^difficulty_before, + gas_limit: ^gas_limit_before, + gas_used: ^gas_used_before, + nonce: ^nonce_before, + number: ^block_number, + parent_hash: ^parent_hash_before, + size: ^size_before, + timestamp: timestamp, + total_difficulty: ^total_difficulty_before + } = Repo.get(Block, block_hash_before) + + assert DateTime.compare(timestamp, timestamp_before) == :eq + end + + test "reorganizations nils transaction receipt fields for transactions that end up in non-consensus blocks" do + block_number = 0 + + miner_hash_before = address_hash() + from_address_hash_before = address_hash() + block_hash_before = block_hash() + index_before = 0 + + transaction_hash = transaction_hash() + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_before}, + %{hash: from_address_hash_before} + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: 0, + gas_limit: 0, + gas_used: 0, + hash: block_hash_before, + miner_hash: miner_hash_before, + nonce: 0, + number: block_number, + parent_hash: block_hash(), + size: 0, + timestamp: Timex.parse!("2019-01-01T01:00:00Z", "{ISO:Extended:Z}"), + total_difficulty: 0 + } + ] + }, + transactions: %{ + params: [ + %{ + block_hash: block_hash_before, + block_number: block_number, + from_address_hash: from_address_hash_before, + gas: 21_000, + gas_price: 1, + gas_used: 21_000, + cumulative_gas_used: 21_000, + hash: transaction_hash, + index: index_before, + input: "0x", + nonce: 0, + r: 0, + s: 0, + v: 0, + value: 0, + status: :ok + } + ], + on_conflict: :replace_all + } + }) + + %Block{consensus: true, number: ^block_number} = Repo.get(Block, block_hash_before) + transaction_before = Repo.get!(Transaction, transaction_hash) + + refute transaction_before.block_hash == nil + refute transaction_before.block_number == nil + refute transaction_before.gas_used == nil + refute transaction_before.cumulative_gas_used == nil + refute transaction_before.index == nil + refute transaction_before.status == nil + + miner_hash_after = address_hash() + from_address_hash_after = address_hash() + block_hash_after = block_hash() + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_after}, + %{hash: from_address_hash_after} + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: 1, + gas_limit: 1, + gas_used: 1, + hash: block_hash_after, + miner_hash: miner_hash_after, + nonce: 1, + number: block_number, + parent_hash: block_hash(), + size: 1, + timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"), + total_difficulty: 1 + } + ] + } + }) + + transaction_after = Repo.get!(Transaction, transaction_hash) + + assert transaction_after.block_hash == nil + assert transaction_after.block_number == nil + assert transaction_after.gas_used == nil + assert transaction_after.cumulative_gas_used == nil + assert transaction_after.index == nil + assert transaction_after.status == nil + end + + test "reorganizations fork transactions that end up in non-consensus blocks" do + block_number = 0 + + miner_hash_before = address_hash() + from_address_hash_before = address_hash() + block_hash_before = block_hash() + index_before = 0 + + transaction_hash = transaction_hash() + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_before}, + %{hash: from_address_hash_before} + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: 0, + gas_limit: 0, + gas_used: 0, + hash: block_hash_before, + miner_hash: miner_hash_before, + nonce: 0, + number: block_number, + parent_hash: block_hash(), + size: 0, + timestamp: Timex.parse!("2019-01-01T01:00:00Z", "{ISO:Extended:Z}"), + total_difficulty: 0 + } + ] + }, + transactions: %{ + params: [ + %{ + block_hash: block_hash_before, + block_number: block_number, + from_address_hash: from_address_hash_before, + gas: 21_000, + gas_price: 1, + gas_used: 21_000, + cumulative_gas_used: 21_000, + hash: transaction_hash, + index: index_before, + input: "0x", + nonce: 0, + r: 0, + s: 0, + v: 0, + value: 0, + status: :ok + } + ], + on_conflict: :replace_all + } + }) + + %Block{consensus: true, number: ^block_number} = Repo.get(Block, block_hash_before) + + assert Repo.one!(from(transaction_fork in Transaction.Fork, select: fragment("COUNT(*)"))) == 0 + + miner_hash_after = address_hash() + from_address_hash_after = address_hash() + block_hash_after = block_hash() + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_after}, + %{hash: from_address_hash_after} + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: 1, + gas_limit: 1, + gas_used: 1, + hash: block_hash_after, + miner_hash: miner_hash_after, + nonce: 1, + number: block_number, + parent_hash: block_hash(), + size: 1, + timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"), + total_difficulty: 1 + } + ] + } + }) + + assert Repo.one!(from(transaction_fork in Transaction.Fork, select: fragment("COUNT(*)"))) == 1 + + assert %Transaction.Fork{index: ^index_before} = + Repo.one( + from(transaction_fork in Transaction.Fork, + where: + transaction_fork.uncle_hash == ^block_hash_before and transaction_fork.hash == ^transaction_hash + ) + ) + end + + test "timeouts can be overridden" do + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [], + timeout: 1 + }, + balances: %{ + params: [], + timeout: 1 + }, + blocks: %{ + params: [], + timeout: 1 + }, + block_second_degree_relations: %{ + params: [], + timeout: 1 + }, + internal_transactions: %{ + params: [], + timeout: 1 + }, + logs: %{ + params: [], + timeout: 1 + }, + token_transfers: %{ + params: [], + timeout: 1 + }, + tokens: %{ + params: [], + on_conflict: :replace_all, + timeout: 1 + }, + transactions: %{ + params: [], + on_conflict: :replace_all, + timeout: 1 + }, + transaction_forks: %{ + params: [], + timeout: 1 + }, + token_balances: %{ + params: [], + timeout: 1 + } + }) + end + + # https://github.com/poanetwork/blockscout/pull/833#issuecomment-426102868 regression test + test "a non-consensus block being added after a block with same number does not change the consensus block to non-consensus" do + block_number = 0 + + miner_hash_before = address_hash() + from_address_hash_before = address_hash() + block_hash_before = block_hash() + difficulty_before = Decimal.new(0) + gas_limit_before = Decimal.new(0) + gas_used_before = Decimal.new(0) + {:ok, nonce_before} = Hash.Nonce.cast(0) + parent_hash_before = block_hash() + size_before = 0 + timestamp_before = Timex.parse!("2019-01-01T01:00:00Z", "{ISO:Extended:Z}") + total_difficulty_before = Decimal.new(0) + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_before}, + %{hash: from_address_hash_before} + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: difficulty_before, + gas_limit: gas_limit_before, + gas_used: gas_used_before, + hash: block_hash_before, + miner_hash: miner_hash_before, + nonce: nonce_before, + number: block_number, + parent_hash: parent_hash_before, + size: size_before, + timestamp: timestamp_before, + total_difficulty: total_difficulty_before + } + ] + } + }) + + %Block{consensus: true, number: ^block_number} = Repo.get(Block, block_hash_before) + + miner_hash_after = address_hash() + from_address_hash_after = address_hash() + block_hash_after = block_hash() + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_after}, + %{hash: from_address_hash_after} + ] + }, + blocks: %{ + params: [ + %{ + consensus: false, + difficulty: 1, + gas_limit: 1, + gas_used: 1, + hash: block_hash_after, + miner_hash: miner_hash_after, + nonce: 1, + number: block_number, + parent_hash: block_hash(), + size: 1, + timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"), + total_difficulty: 1 + } + ] + } + }) + + # new block does not grab `consensus` + assert %Block{ + consensus: false, + difficulty: difficulty_after, + gas_limit: gas_limit_after, + gas_used: gas_used_after, + nonce: nonce_after, + number: ^block_number, + parent_hash: parent_hash_after, + size: size_after, + timestamp: timestamp_after, + total_difficulty: total_difficulty_after + } = Repo.get(Block, block_hash_after) + + refute difficulty_after == difficulty_before + refute gas_limit_after == gas_limit_before + refute gas_used_after == gas_used_before + refute nonce_after == nonce_before + refute parent_hash_after == parent_hash_before + refute size_after == size_before + refute timestamp_after == timestamp_before + refute total_difficulty_after == total_difficulty_before + + # nothing changes on the original consensus block + assert %Block{ + consensus: true, + difficulty: ^difficulty_before, + gas_limit: ^gas_limit_before, + gas_used: ^gas_used_before, + nonce: ^nonce_before, + number: ^block_number, + parent_hash: ^parent_hash_before, + size: ^size_before, + timestamp: timestamp, + total_difficulty: ^total_difficulty_before + } = Repo.get(Block, block_hash_before) + + assert DateTime.compare(timestamp, timestamp_before) == :eq + end end end