Merge pull request #833 from poanetwork/408

Support reorganizations
pull/845/head
Luke Imhoff 6 years ago committed by GitHub
commit f5a0e1ae57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      apps/explorer/.sobelow-conf
  2. 147
      apps/explorer/lib/explorer/chain/import.ex
  3. 510
      apps/explorer/test/explorer/chain/import_test.exs

@ -1,7 +1,7 @@
[
verbose: false,
private: true,
skip: false,
skip: true,
exit: "low",
format: "compact",
ignore: ["Config.HTTPS"],

@ -3,9 +3,10 @@ defmodule Explorer.Chain.Import do
Bulk importing of data into `Explorer.Repo`
"""
import Ecto.Query, only: [from: 2]
import Ecto.Query, only: [from: 2, update: 2]
alias Ecto.{Changeset, Multi}
alias Ecto.Adapters.SQL
alias Explorer.Chain.{
Address,
@ -218,6 +219,9 @@ defmodule Explorer.Chain.Import do
* `:timeout` - the timeout for inserting all transactions found in the params lists across all
types. Defaults to `#{@insert_transactions_timeout}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`.
* `:transaction_forks`
* `:params` - `list` of params for `Explorer.Chain.Transaction.Fork.changeset/2`.
* `:timeout` - the timeout for inserting all transaction forks.
* `:token_balances`
* `:params` - `list` of params for `Explorer.Chain.TokenBalance.changeset/2`
* `:timeout` - the timeout for `Repo.transaction`. Defaults to `#{@transaction_timeout}` milliseconds.
@ -377,16 +381,30 @@ defmodule Explorer.Chain.Import do
case ecto_schema_module_to_changes_list_map do
%{Block => blocks_changes} ->
timestamps = Map.fetch!(options, :timestamps)
blocks_timeout = options[:blocks][:timeout] || @insert_blocks_timeout
where_forked = where_forked(blocks_changes)
multi
|> Multi.run(:derive_transaction_forks, fn _ ->
derive_transaction_forks(%{
timeout: options[:transaction_forks][:timeout] || @insert_transaction_forks_timeout,
timestamps: timestamps,
where_forked: where_forked
})
end)
# MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table
|> Multi.run(:fork_transactions, fn _ ->
fork_transactions(%{
timeout: options[:transactions][:timeout] || @insert_transactions_timeout,
timestamps: timestamps,
where_forked: where_forked
})
end)
|> Multi.run(:lose_consenus, fn _ ->
lose_consensus(blocks_changes, %{timeout: blocks_timeout, timestamps: timestamps})
end)
|> Multi.run(:blocks, fn _ ->
insert_blocks(
blocks_changes,
%{
timeout: options[:blocks][:timeout] || @insert_blocks_timeout,
timestamps: timestamps
}
)
insert_blocks(blocks_changes, %{timeout: blocks_timeout, timestamps: timestamps})
end)
|> Multi.run(:uncle_fetched_block_second_degree_relations, fn %{blocks: blocks} when is_list(blocks) ->
update_block_second_degree_relations(
@ -505,9 +523,7 @@ defmodule Explorer.Chain.Import do
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{Token => tokens_changes} ->
tokens_options = Map.fetch!(options, :tokens)
timestamps = Map.fetch!(options, :timestamps)
on_conflict = Map.fetch!(tokens_options, :on_conflict)
%{timestamps: timestamps, tokens: %{on_conflict: on_conflict}} = options
Multi.run(multi, :tokens, fn _ ->
insert_tokens(
@ -976,6 +992,115 @@ defmodule Explorer.Chain.Import do
{:ok, inserted}
end
defp fork_transactions(%{timeout: timeout, timestamps: %{updated_at: updated_at}, where_forked: where_forked}) do
query =
where_forked
|> update(
set: [
block_hash: nil,
block_number: nil,
gas_used: nil,
cumulative_gas_used: nil,
index: nil,
internal_transactions_indexed_at: nil,
status: nil,
updated_at: ^updated_at
]
)
try do
{_, result} = Repo.update_all(query, [], timeout: timeout, returning: [:hash])
{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error}}
end
end
defp where_forked(blocks_changes) when is_list(blocks_changes) do
Enum.reduce(blocks_changes, Transaction, fn %{consensus: consensus, hash: hash, number: number}, acc ->
case consensus do
false ->
from(transaction in acc, or_where: transaction.block_hash == ^hash and transaction.block_number == ^number)
true ->
from(transaction in acc, or_where: transaction.block_hash != ^hash and transaction.block_number == ^number)
end
end)
end
# sobelow_skip ["SQL.Query"]
defp derive_transaction_forks(%{
timeout: timeout,
timestamps: %{inserted_at: inserted_at, updated_at: updated_at},
where_forked: where_forked
}) do
query =
from(transaction in where_forked,
select: [
transaction.block_hash,
transaction.index,
transaction.hash,
type(^inserted_at, transaction.inserted_at),
type(^updated_at, transaction.updated_at)
]
)
{sql, parameters} = SQL.to_sql(:all, Repo, query)
{:ok, %Postgrex.Result{columns: ["uncle_hash", "hash"], command: :insert, rows: rows}} =
SQL.query(
Repo,
"""
INSERT INTO transaction_forks (uncle_hash, index, hash, inserted_at, updated_at)
#{sql}
RETURNING uncle_hash, hash
""",
parameters,
timeout: timeout
)
derived_transaction_forks = Enum.map(rows, fn [uncle_hash, hash] -> %{uncle_hash: uncle_hash, hash: hash} end)
{:ok, derived_transaction_forks}
end
defp lose_consensus(blocks_changes, %{timeout: timeout, timestamps: %{updated_at: updated_at}})
when is_list(blocks_changes) do
ordered_consensus_block_number =
blocks_changes
|> Enum.reduce(MapSet.new(), fn
%{consensus: true, number: number}, acc ->
MapSet.put(acc, number)
%{consensus: false}, acc ->
acc
end)
|> Enum.sort()
query =
from(
block in Block,
where: block.number in ^ordered_consensus_block_number,
update: [
set: [
consensus: false,
updated_at: ^updated_at
]
]
)
try do
{_, result} = Repo.update_all(query, [], timeout: timeout, returning: [:hash, :number])
{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, consensus_block_numbers: ordered_consensus_block_number}}
end
end
defp update_block_second_degree_relations(blocks, %{timeout: timeout, timestamps: %{updated_at: updated_at}})
when is_list(blocks) do
ordered_uncle_hashes =

@ -16,6 +16,8 @@ defmodule Explorer.Chain.ImportTest do
Transaction
}
@moduletag :capturelog
doctest Import
describe "all/1" do
@ -1119,5 +1121,513 @@ defmodule Explorer.Chain.ImportTest do
assert Repo.aggregate(Transaction.Fork, :count, :hash) == 1
end
test "reorganizations can switch blocks to non-consensus with new block taking the consensus spot for the number" do
block_number = 0
miner_hash_before = address_hash()
from_address_hash_before = address_hash()
block_hash_before = block_hash()
difficulty_before = Decimal.new(0)
gas_limit_before = Decimal.new(0)
gas_used_before = Decimal.new(0)
{:ok, nonce_before} = Hash.Nonce.cast(0)
parent_hash_before = block_hash()
size_before = 0
timestamp_before = Timex.parse!("2019-01-01T01:00:00Z", "{ISO:Extended:Z}")
total_difficulty_before = Decimal.new(0)
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_before},
%{hash: from_address_hash_before}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: difficulty_before,
gas_limit: gas_limit_before,
gas_used: gas_used_before,
hash: block_hash_before,
miner_hash: miner_hash_before,
nonce: nonce_before,
number: block_number,
parent_hash: parent_hash_before,
size: size_before,
timestamp: timestamp_before,
total_difficulty: total_difficulty_before
}
]
}
})
%Block{consensus: true, number: ^block_number} = Repo.get(Block, block_hash_before)
miner_hash_after = address_hash()
from_address_hash_after = address_hash()
block_hash_after = block_hash()
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_after},
%{hash: from_address_hash_after}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 1,
gas_limit: 1,
gas_used: 1,
hash: block_hash_after,
miner_hash: miner_hash_after,
nonce: 1,
number: block_number,
parent_hash: block_hash(),
size: 1,
timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 1
}
]
}
})
# new block grabs `consensus`
assert %Block{
consensus: true,
difficulty: difficulty_after,
gas_limit: gas_limit_after,
gas_used: gas_used_after,
nonce: nonce_after,
number: ^block_number,
parent_hash: parent_hash_after,
size: size_after,
timestamp: timestamp_after,
total_difficulty: total_difficulty_after
} = Repo.get(Block, block_hash_after)
refute difficulty_after == difficulty_before
refute gas_limit_after == gas_limit_before
refute gas_used_after == gas_used_before
refute nonce_after == nonce_before
refute parent_hash_after == parent_hash_before
refute size_after == size_before
refute timestamp_after == timestamp_before
refute total_difficulty_after == total_difficulty_before
# only `consensus` changes in original block
assert %Block{
consensus: false,
difficulty: ^difficulty_before,
gas_limit: ^gas_limit_before,
gas_used: ^gas_used_before,
nonce: ^nonce_before,
number: ^block_number,
parent_hash: ^parent_hash_before,
size: ^size_before,
timestamp: timestamp,
total_difficulty: ^total_difficulty_before
} = Repo.get(Block, block_hash_before)
assert DateTime.compare(timestamp, timestamp_before) == :eq
end
test "reorganizations nils transaction receipt fields for transactions that end up in non-consensus blocks" do
block_number = 0
miner_hash_before = address_hash()
from_address_hash_before = address_hash()
block_hash_before = block_hash()
index_before = 0
transaction_hash = transaction_hash()
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_before},
%{hash: from_address_hash_before}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 0,
gas_limit: 0,
gas_used: 0,
hash: block_hash_before,
miner_hash: miner_hash_before,
nonce: 0,
number: block_number,
parent_hash: block_hash(),
size: 0,
timestamp: Timex.parse!("2019-01-01T01:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 0
}
]
},
transactions: %{
params: [
%{
block_hash: block_hash_before,
block_number: block_number,
from_address_hash: from_address_hash_before,
gas: 21_000,
gas_price: 1,
gas_used: 21_000,
cumulative_gas_used: 21_000,
hash: transaction_hash,
index: index_before,
input: "0x",
nonce: 0,
r: 0,
s: 0,
v: 0,
value: 0,
status: :ok
}
],
on_conflict: :replace_all
}
})
%Block{consensus: true, number: ^block_number} = Repo.get(Block, block_hash_before)
transaction_before = Repo.get!(Transaction, transaction_hash)
refute transaction_before.block_hash == nil
refute transaction_before.block_number == nil
refute transaction_before.gas_used == nil
refute transaction_before.cumulative_gas_used == nil
refute transaction_before.index == nil
refute transaction_before.status == nil
miner_hash_after = address_hash()
from_address_hash_after = address_hash()
block_hash_after = block_hash()
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_after},
%{hash: from_address_hash_after}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 1,
gas_limit: 1,
gas_used: 1,
hash: block_hash_after,
miner_hash: miner_hash_after,
nonce: 1,
number: block_number,
parent_hash: block_hash(),
size: 1,
timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 1
}
]
}
})
transaction_after = Repo.get!(Transaction, transaction_hash)
assert transaction_after.block_hash == nil
assert transaction_after.block_number == nil
assert transaction_after.gas_used == nil
assert transaction_after.cumulative_gas_used == nil
assert transaction_after.index == nil
assert transaction_after.status == nil
end
test "reorganizations fork transactions that end up in non-consensus blocks" do
block_number = 0
miner_hash_before = address_hash()
from_address_hash_before = address_hash()
block_hash_before = block_hash()
index_before = 0
transaction_hash = transaction_hash()
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_before},
%{hash: from_address_hash_before}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 0,
gas_limit: 0,
gas_used: 0,
hash: block_hash_before,
miner_hash: miner_hash_before,
nonce: 0,
number: block_number,
parent_hash: block_hash(),
size: 0,
timestamp: Timex.parse!("2019-01-01T01:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 0
}
]
},
transactions: %{
params: [
%{
block_hash: block_hash_before,
block_number: block_number,
from_address_hash: from_address_hash_before,
gas: 21_000,
gas_price: 1,
gas_used: 21_000,
cumulative_gas_used: 21_000,
hash: transaction_hash,
index: index_before,
input: "0x",
nonce: 0,
r: 0,
s: 0,
v: 0,
value: 0,
status: :ok
}
],
on_conflict: :replace_all
}
})
%Block{consensus: true, number: ^block_number} = Repo.get(Block, block_hash_before)
assert Repo.one!(from(transaction_fork in Transaction.Fork, select: fragment("COUNT(*)"))) == 0
miner_hash_after = address_hash()
from_address_hash_after = address_hash()
block_hash_after = block_hash()
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_after},
%{hash: from_address_hash_after}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 1,
gas_limit: 1,
gas_used: 1,
hash: block_hash_after,
miner_hash: miner_hash_after,
nonce: 1,
number: block_number,
parent_hash: block_hash(),
size: 1,
timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 1
}
]
}
})
assert Repo.one!(from(transaction_fork in Transaction.Fork, select: fragment("COUNT(*)"))) == 1
assert %Transaction.Fork{index: ^index_before} =
Repo.one(
from(transaction_fork in Transaction.Fork,
where:
transaction_fork.uncle_hash == ^block_hash_before and transaction_fork.hash == ^transaction_hash
)
)
end
test "timeouts can be overridden" do
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [],
timeout: 1
},
balances: %{
params: [],
timeout: 1
},
blocks: %{
params: [],
timeout: 1
},
block_second_degree_relations: %{
params: [],
timeout: 1
},
internal_transactions: %{
params: [],
timeout: 1
},
logs: %{
params: [],
timeout: 1
},
token_transfers: %{
params: [],
timeout: 1
},
tokens: %{
params: [],
on_conflict: :replace_all,
timeout: 1
},
transactions: %{
params: [],
on_conflict: :replace_all,
timeout: 1
},
transaction_forks: %{
params: [],
timeout: 1
},
token_balances: %{
params: [],
timeout: 1
}
})
end
# https://github.com/poanetwork/blockscout/pull/833#issuecomment-426102868 regression test
test "a non-consensus block being added after a block with same number does not change the consensus block to non-consensus" do
block_number = 0
miner_hash_before = address_hash()
from_address_hash_before = address_hash()
block_hash_before = block_hash()
difficulty_before = Decimal.new(0)
gas_limit_before = Decimal.new(0)
gas_used_before = Decimal.new(0)
{:ok, nonce_before} = Hash.Nonce.cast(0)
parent_hash_before = block_hash()
size_before = 0
timestamp_before = Timex.parse!("2019-01-01T01:00:00Z", "{ISO:Extended:Z}")
total_difficulty_before = Decimal.new(0)
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_before},
%{hash: from_address_hash_before}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: difficulty_before,
gas_limit: gas_limit_before,
gas_used: gas_used_before,
hash: block_hash_before,
miner_hash: miner_hash_before,
nonce: nonce_before,
number: block_number,
parent_hash: parent_hash_before,
size: size_before,
timestamp: timestamp_before,
total_difficulty: total_difficulty_before
}
]
}
})
%Block{consensus: true, number: ^block_number} = Repo.get(Block, block_hash_before)
miner_hash_after = address_hash()
from_address_hash_after = address_hash()
block_hash_after = block_hash()
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_after},
%{hash: from_address_hash_after}
]
},
blocks: %{
params: [
%{
consensus: false,
difficulty: 1,
gas_limit: 1,
gas_used: 1,
hash: block_hash_after,
miner_hash: miner_hash_after,
nonce: 1,
number: block_number,
parent_hash: block_hash(),
size: 1,
timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 1
}
]
}
})
# new block does not grab `consensus`
assert %Block{
consensus: false,
difficulty: difficulty_after,
gas_limit: gas_limit_after,
gas_used: gas_used_after,
nonce: nonce_after,
number: ^block_number,
parent_hash: parent_hash_after,
size: size_after,
timestamp: timestamp_after,
total_difficulty: total_difficulty_after
} = Repo.get(Block, block_hash_after)
refute difficulty_after == difficulty_before
refute gas_limit_after == gas_limit_before
refute gas_used_after == gas_used_before
refute nonce_after == nonce_before
refute parent_hash_after == parent_hash_before
refute size_after == size_before
refute timestamp_after == timestamp_before
refute total_difficulty_after == total_difficulty_before
# nothing changes on the original consensus block
assert %Block{
consensus: true,
difficulty: ^difficulty_before,
gas_limit: ^gas_limit_before,
gas_used: ^gas_used_before,
nonce: ^nonce_before,
number: ^block_number,
parent_hash: ^parent_hash_before,
size: ^size_before,
timestamp: timestamp,
total_difficulty: ^total_difficulty_before
} = Repo.get(Block, block_hash_before)
assert DateTime.compare(timestamp, timestamp_before) == :eq
end
end
end

Loading…
Cancel
Save