Merge pull request #505 from poanetwork/491

Use nil Balance value to indicate unfetched
pull/528/head
Luke Imhoff 6 years ago committed by GitHub
commit 3e4a48785e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 232
      apps/explorer/lib/explorer/chain.ex
  2. 10
      apps/explorer/lib/explorer/chain/balance.ex
  3. 22
      apps/explorer/lib/explorer/chain/import.ex
  4. 14
      apps/explorer/priv/repo/migrations/20180717204948_create_balances.exs
  5. 7
      apps/explorer/test/explorer/chain/balance_test.exs
  6. 55
      apps/explorer/test/explorer/chain_test.exs
  7. 13
      apps/explorer/test/explorer/market/history/cataloger_test.exs
  8. 28
      apps/explorer/test/support/factory.ex
  9. 16
      apps/explorer_web/test/explorer_web/features/viewing_chain_test.exs
  10. 27
      apps/explorer_web/test/explorer_web/features/viewing_transactions_test.exs
  11. 2
      apps/explorer_web/test/support/feature_case.ex
  12. 8
      apps/indexer/lib/indexer/balance_fetcher.ex
  13. 90
      apps/indexer/lib/indexer/balances.ex
  14. 31
      apps/indexer/lib/indexer/block_fetcher.ex
  15. 6
      apps/indexer/lib/indexer/block_fetcher/catchup.ex
  16. 146
      apps/indexer/lib/indexer/block_fetcher/realtime.ex
  17. 2
      apps/indexer/test/indexer/balance_fetcher_test.exs
  18. 184
      apps/indexer/test/indexer/balances_test.exs
  19. 3
      apps/indexer/test/indexer/supervisor_test.exs

@ -733,211 +733,21 @@ defmodule Explorer.Chain do
When there are addresses, the `reducer` is called for each `t:Explorer.Chain.Address.t/0` `hash` and all
`t:Explorer.Chain.Block.t/0` `block_number` that address is mentioned.
An `t:Explorer.Chain.Address.t/0` `hash` can be used as an `t:Explorer.Chain.Block.t/0` `miner_hash`.
iex> {:ok, miner_hash} = Explorer.Chain.string_to_address_hash("0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca")
iex> miner = insert(:address, hash: miner_hash)
iex> insert(:block, miner: miner, number: 34)
iex> {:ok, balance_fields_list} = Explorer.Chain.stream_unfetched_balances(
...> [],
...> fn balance_fields, acc -> [balance_fields | acc] end
...> )
iex> balance_fields_list
[
%{
address_hash: %Explorer.Chain.Hash{
byte_count: 20,
bytes: <<232, 221, 197, 199, 162, 210, 240, 215, 169, 121, 132,
89, 192, 16, 79, 223, 94, 152, 122, 202>>
},
block_number: 34
}
]
An `t:Explorer.Chain.Address.t/0` `hash` can be used as an `t:Explorer.Chain.Transaction.t/0` `from_address_hash`.
iex> {:ok, from_address_hash} =
...> Explorer.Chain.string_to_address_hash("0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca")
iex> from_address = insert(:address, hash: from_address_hash)
iex> block = insert(:block, number: 34)
iex> :transaction |>
...> insert(from_address: from_address) |>
...> with_block(block)
iex> {:ok, balance_fields_list} = Explorer.Chain.stream_unfetched_balances(
...> [],
...> fn balance_fields, acc -> [balance_fields | acc] end
...> )
iex> %{
...> address_hash: %Explorer.Chain.Hash{
...> byte_count: 20,
...> bytes: <<232, 221, 197, 199, 162, 210, 240, 215, 169, 121, 132,
...> 89, 192, 16, 79, 223, 94, 152, 122, 202>>
...> },
...> block_number: 34
...> } in balance_fields_list
true
An `t:Explorer.Chain.Address.t/0` `hash` can be used as an `t:Explorer.Chain.Transaction.t/0` `to_address_hash`.
iex> {:ok, to_address_hash} = Explorer.Chain.string_to_address_hash("0x8e854802d695269a6f1f3fcabb2111d2f5a0e6f9")
iex> to_address = insert(:address, hash: to_address_hash)
iex> block = insert(:block, number: 34)
iex> :transaction |>
...> insert(to_address: to_address) |>
...> with_block(block)
iex> {:ok, balance_fields_list} = Explorer.Chain.stream_unfetched_balances(
...> [],
...> fn balance_fields, acc -> [balance_fields | acc] end
...> )
iex> %{
...> address_hash: %Explorer.Chain.Hash{
...> byte_count: 20,
...> bytes: <<142, 133, 72, 2, 214, 149, 38, 154, 111, 31, 63, 202,
...> 187, 33, 17, 210, 245, 160, 230, 249>>
...> },
...> block_number: 34
...> } in balance_fields_list
true
An `t:Explorer.Chain.Address.t/0` `hash` can be used as an `t:Explorer.Chain.Log.t/0` `address_hash`.
iex> {:ok, address_hash} = Explorer.Chain.string_to_address_hash("0x8bf38d4764929064f2d4d3a56520a76ab3df415b")
iex> address = insert(:address, hash: address_hash)
iex> block = insert(:block, number: 37)
iex> transaction =
...> :transaction |>
...> insert() |>
...> with_block(block)
...> insert(:log, address: address, transaction: transaction)
iex> {:ok, balance_fields_list} = Explorer.Chain.stream_unfetched_balances(
...> [],
...> fn balance_fields, acc -> [balance_fields | acc] end
...> )
iex> %{
iex> address_hash: %Explorer.Chain.Hash{
iex> byte_count: 20,
iex> bytes: <<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211,
iex> 165, 101, 32, 167, 106, 179, 223, 65, 91>>
iex> },
iex> block_number: 37
iex> } in balance_fields_list
true
An `t:Explorer.Chain.Address.t/0` `hash` can be used as an `t:Explorer.Chain.InternalTransaction.t/0`
`created_contract_address_hash`.
iex> {:ok, created_contract_address_hash} =
...> Explorer.Chain.string_to_address_hash("0xffc87239eb0267bc3ca2cd51d12fbf278e02ccb4")
iex> created_contract_address = insert(:address, hash: created_contract_address_hash)
iex> block = insert(:block, number: 37)
iex> transaction =
...> :transaction |>
...> insert() |>
...> with_block(block)
iex> insert(
...> :internal_transaction_create,
...> created_contract_address: created_contract_address,
...> index: 0,
...> transaction: transaction
...> )
iex> {:ok, balance_fields_list} = Explorer.Chain.stream_unfetched_balances(
...> [],
...> fn balance_fields, acc -> [balance_fields | acc] end
...> )
iex> %{
...> address_hash: %Explorer.Chain.Hash{
...> byte_count: 20,
...> bytes: <<255, 200, 114, 57, 235, 2, 103, 188, 60, 162, 205, 81,
...> 209, 47, 191, 39, 142, 2, 204, 180>>
...> },
...> block_number: 37
...> } in balance_fields_list
true
An `t:Explorer.Chain.Address.t/0` `hash` can be used as an `t:Explorer.Chain.InternalTransaction.t/0`
`from_address_hash`.
iex> {:ok, from_address_hash} =
...> Explorer.Chain.string_to_address_hash("0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca")
iex> from_address = insert(:address, hash: from_address_hash)
iex> block = insert(:block, number: 37)
iex> transaction =
...> :transaction |>
...> insert() |>
...> with_block(block)
iex> insert(
...> :internal_transaction_create,
...> from_address: from_address,
...> index: 0,
...> transaction: transaction
...> )
iex> {:ok, balance_fields_list} = Explorer.Chain.stream_unfetched_balances(
...> [],
...> fn balance_fields, acc -> [balance_fields | acc] end
...> )
iex> %{
...> address_hash: %Explorer.Chain.Hash{
...> byte_count: 20,
...> bytes: <<232, 221, 197, 199, 162, 210, 240, 215, 169, 121, 132,
...> 89, 192, 16, 79, 223, 94, 152, 122, 202>>
...> },
...> block_number: 37
...> } in balance_fields_list
true
An `t:Explorer.Chain.Address.t/0` `hash` can be used as an `t:Explorer.Chain.InternalTransaction.t/0`
`to_address_hash`.
iex> {:ok, to_address_hash} =
...> Explorer.Chain.string_to_address_hash("0xfdca0da4158740a93693441b35809b5bb463e527")
iex> to_address = insert(:address, hash: to_address_hash)
iex> block = insert(:block, number: 38)
iex> transaction =
...> :transaction |>
...> insert() |>
...> with_block(block)
iex> insert(
...> :internal_transaction,
...> index: 0,
...> to_address: to_address,
...> transaction: transaction
...> )
iex> {:ok, balance_fields_list} = Explorer.Chain.stream_unfetched_balances(
...> [],
...> fn balance_fields, acc -> [balance_fields | acc] end
...> )
iex> %{
...> address_hash: %Explorer.Chain.Hash{
...> byte_count: 20,
...> bytes: <<253, 202, 13, 164, 21, 135, 64, 169, 54, 147, 68, 27,
...> 53, 128, 155, 91, 180, 99, 229, 39>>
...> },
...> block_number: 38
...> } in balance_fields_list
true
| Address Hash Schema | Address Hash Field | Block Number Schema | Block Number Field |
|--------------------------------------------|---------------------------------|------------------------------------|--------------------|
| `t:Explorer.Chain.Block.t/0` | `miner_hash` | `t:Explorer.Chain.Block.t/0` | `number` |
| `t:Explorer.Chain.Transaction.t/0` | `from_address_hash` | `t:Explorer.Chain.Transaction.t/0` | `block_number` |
| `t:Explorer.Chain.Transaction.t/0` | `to_address_hash` | `t:Explorer.Chain.Transaction.t/0` | `block_number` |
| `t:Explorer.Chain.Log.t/0` | `address_hash` | `t:Explorer.Chain.Transaction.t/0` | `block_number` |
| `t:Explorer.Chain.InternalTransaction.t/0` | `created_contract_address_hash` | `t:Explorer.Chain.Transaction.t/0` | `block_number` |
| `t:Explorer.Chain.InternalTransaction.t/0` | `from_address_hash` | `t:Explorer.Chain.Transaction.t/0` | `block_number` |
| `t:Explorer.Chain.InternalTransaction.t/0` | `to_address_hash` | `t:Explorer.Chain.Transaction.t/0` | `block_number` |
Pending `t:Explorer.Chain.Transaction.t/0` `from_address_hash` and `to_address_hash` aren't returned because they
don't have an associated block number.
iex> insert(:transaction)
iex> {:ok, balance_fields_list} = Explorer.Chain.stream_unfetched_balances(
...> [],
...> fn balance_fields, acc -> [balance_fields | acc] end
...> )
iex> balance_fields_list
[]
When there are no addresses, the `reducer` is never called and the `initial` is returned in an `:ok` tuple.
iex> {:ok, pid} = Agent.start_link(fn -> 0 end)
iex> Explorer.Chain.stream_unfetched_balances([], fn address_fields, acc ->
...> Agent.update(pid, &(&1 + 1))
...> [address_fields | acc]
...> end)
{:ok, []}
iex> Agent.get(pid, & &1)
0
When an `t:Explorer.Chain.Address.t/0` `hash` is used multiple times, all unique `t:Explorer.Chain.Block.t/0` `number`
will be returned.
"""
@ -952,27 +762,9 @@ defmodule Explorer.Chain do
fn ->
query =
from(
address in Address,
left_join: internal_transaction in InternalTransaction,
on:
address.hash in [
internal_transaction.created_contract_address_hash,
internal_transaction.from_address_hash,
internal_transaction.to_address_hash
],
left_join: log in Log,
on: log.address_hash == address.hash,
left_join: transaction in Transaction,
on:
transaction.hash in [internal_transaction.transaction_hash, log.transaction_hash] or
address.hash in [transaction.from_address_hash, transaction.to_address_hash],
left_join: block in Block,
on: block.hash == transaction.block_hash or block.miner_hash == address.hash,
left_join: balance in Balance,
on: balance.address_hash == address.hash and balance.block_number == block.number,
where: not is_nil(block.number) and is_nil(balance.address_hash) and is_nil(balance.block_number),
group_by: [address.hash, block.number],
select: %{address_hash: address.hash, block_number: block.number}
balance in Balance,
where: is_nil(balance.value_fetched_at),
select: %{address_hash: balance.address_hash, block_number: balance.block_number}
)
query

@ -8,7 +8,9 @@ defmodule Explorer.Chain.Balance do
alias Explorer.Chain.{Address, Block, Hash, Wei}
@required_fields ~w(address_hash block_number value)a
@optional_fields ~w(value value_fetched_at)a
@required_fields ~w(address_hash block_number)a
@allowed_fields @optional_fields ++ @required_fields
@typedoc """
* `address` - the `t:Explorer.Chain.Address.t/0` with `value` at end of `block_number`.
@ -21,6 +23,7 @@ defmodule Explorer.Chain.Balance do
* `value` - the value of `address` at the end of the `t:Explorer.Chain.Block.block_number/0` for the
`t:Explorer.Chain.Block.t/0`. When `block_number` is the greatest `t:Explorer.Chain.Block.block_number/0` for a
given `address`, the `t:Explorer.Chain.Address.t/0` `fetched_balance` will match this value.
* `value_fetched_at` - when `value` was fetched.
"""
@type t :: %__MODULE__{
address: %Ecto.Association.NotLoaded{} | Address.t(),
@ -28,13 +31,14 @@ defmodule Explorer.Chain.Balance do
block_number: Block.block_number(),
inserted_at: DateTime.t(),
updated_at: DateTime.t(),
value: Wei.t()
value: Wei.t() | nil
}
@primary_key false
schema "balances" do
field(:block_number, :integer)
field(:value, Wei)
field(:value_fetched_at, :utc_datetime)
timestamps()
@ -43,7 +47,7 @@ defmodule Explorer.Chain.Balance do
def changeset(%__MODULE__{} = balance, params) do
balance
|> cast(params, @required_fields)
|> cast(params, @allowed_fields)
|> validate_required(@required_fields)
|> foreign_key_constraint(:address_hash)
|> unique_constraint(:block_number, name: :balances_address_hash_block_number_index)

@ -567,12 +567,28 @@ defmodule Explorer.Chain.Import do
value:
fragment(
"""
CASE WHEN EXCLUDED.updated_at > ? THEN EXCLUDED.value
ELSE ?
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
balance.updated_at,
balance.value_fetched_at,
balance.value_fetched_at,
balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
balance.value_fetched_at,
balance.value_fetched_at,
balance.value_fetched_at
)
]
]

@ -5,11 +5,23 @@ defmodule Explorer.Repo.Migrations.CreateBalances do
create table(:balances, primary_key: false) do
add(:address_hash, references(:addresses, column: :hash, type: :bytea), null: false)
add(:block_number, :bigint, null: false)
add(:value, :numeric, precision: 100, null: false)
# null until fetched
add(:value, :numeric, precision: 100, default: fragment("NULL"), null: true)
add(:value_fetched_at, :utc_datetime, default: fragment("NULL"), null: true)
timestamps(null: false, type: :utc_datetime)
end
create(unique_index(:balances, [:address_hash, :block_number]))
create(
unique_index(
:balances,
[:address_hash, :block_number],
name: :unfetched_balances,
where: "value_fetched_at IS NULL"
)
)
end
end

@ -6,19 +6,18 @@ defmodule Explorer.Chain.BalanceTest do
describe "changeset/2" do
test "is valid with address_hash, block_number, and value" do
params = params_for(:balance)
params = params_for(:fetched_balance)
assert %Changeset{valid?: true} = Balance.changeset(%Balance{}, params)
end
test "address_hash, block_number, and value is required" do
test "address_hash and block_number is required" do
assert %Changeset{valid?: false, errors: errors} = Balance.changeset(%Balance{}, %{})
assert is_list(errors)
assert length(errors) == 3
assert length(errors) == 2
assert Keyword.get_values(errors, :address_hash) == [{"can't be blank", [validation: :required]}]
assert Keyword.get_values(errors, :block_number) == [{"can't be blank", [validation: :required]}]
assert Keyword.get_values(errors, :value) == [{"can't be blank", [validation: :required]}]
end
end
end

@ -1125,20 +1125,21 @@ defmodule Explorer.ChainTest do
end
describe "stream_unfetched_balances/2" do
test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <>
test "with `t:Explorer.Chain.Balance.t/0` with value_fetched_at with same `address_hash` and `block_number` " <>
"does not return `t:Explorer.Chain.Block.t/0` `miner_hash`" do
%Address{hash: miner_hash} = miner = insert(:address)
%Block{number: block_number} = insert(:block, miner: miner)
balance = insert(:unfetched_balance, address_hash: miner_hash, block_number: block_number)
assert {:ok, [%{address_hash: ^miner_hash, block_number: ^block_number}]} =
Chain.stream_unfetched_balances([], &[&1 | &2])
insert(:balance, address_hash: miner_hash, block_number: block_number)
update_balance_value(balance, 1)
assert {:ok, []} = Chain.stream_unfetched_balances([], &[&1 | &2])
end
test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <>
test "with `t:Explorer.Chain.Balance.t/0` with value_fetched_at with same `address_hash` and `block_number` " <>
"does not return `t:Explorer.Chain.Transaction.t/0` `from_address_hash`" do
%Address{hash: from_address_hash} = from_address = insert(:address)
%Block{number: block_number} = block = insert(:block)
@ -1147,6 +1148,8 @@ defmodule Explorer.ChainTest do
|> insert(from_address: from_address)
|> with_block(block)
balance = insert(:unfetched_balance, address_hash: from_address_hash, block_number: block_number)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
[],
@ -1155,7 +1158,7 @@ defmodule Explorer.ChainTest do
assert %{address_hash: from_address_hash, block_number: block_number} in balance_fields_list
insert(:balance, address_hash: from_address_hash, block_number: block_number)
update_balance_value(balance, 1)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
@ -1166,7 +1169,7 @@ defmodule Explorer.ChainTest do
refute %{address_hash: from_address_hash, block_number: block_number} in balance_fields_list
end
test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <>
test "with `t:Explorer.Chain.Balance.t/0` with value_fetched_at with same `address_hash` and `block_number` " <>
"does not return `t:Explorer.Chain.Transaction.t/0` `to_address_hash`" do
%Address{hash: to_address_hash} = to_address = insert(:address)
%Block{number: block_number} = block = insert(:block)
@ -1175,6 +1178,8 @@ defmodule Explorer.ChainTest do
|> insert(to_address: to_address)
|> with_block(block)
balance = insert(:unfetched_balance, address_hash: to_address_hash, block_number: block_number)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
[],
@ -1183,7 +1188,7 @@ defmodule Explorer.ChainTest do
assert %{address_hash: to_address_hash, block_number: block_number} in balance_fields_list
insert(:balance, address_hash: to_address_hash, block_number: block_number)
update_balance_value(balance, 1)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
@ -1194,7 +1199,7 @@ defmodule Explorer.ChainTest do
refute %{address_hash: to_address_hash, block_number: block_number} in balance_fields_list
end
test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <>
test "with `t:Explorer.Chain.Balance.t/0` with value_fetched_at with same `address_hash` and `block_number` " <>
"does not return `t:Explorer.Chain.Log.t/0` `address_hash`" do
address = insert(:address)
block = insert(:block)
@ -1206,6 +1211,8 @@ defmodule Explorer.ChainTest do
insert(:log, address: address, transaction: transaction)
balance = insert(:unfetched_balance, address_hash: address.hash, block_number: block.number)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
[],
@ -1217,7 +1224,7 @@ defmodule Explorer.ChainTest do
block_number: block.number
} in balance_fields_list
insert(:balance, address_hash: address.hash, block_number: block.number)
update_balance_value(balance, 1)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
@ -1231,7 +1238,7 @@ defmodule Explorer.ChainTest do
} in balance_fields_list
end
test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <>
test "with `t:Explorer.Chain.Balance.t/0` with value_fetched_at with same `address_hash` and `block_number` " <>
"does not return `t:Explorer.Chain.InternalTransaction.t/0` `created_contract_address_hash`" do
created_contract_address = insert(:address)
block = insert(:block)
@ -1248,6 +1255,8 @@ defmodule Explorer.ChainTest do
transaction: transaction
)
balance = insert(:unfetched_balance, address_hash: created_contract_address.hash, block_number: block.number)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
[],
@ -1259,7 +1268,7 @@ defmodule Explorer.ChainTest do
block_number: block.number
} in balance_fields_list
insert(:balance, address_hash: created_contract_address.hash, block_number: block.number)
update_balance_value(balance, 1)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
@ -1273,7 +1282,7 @@ defmodule Explorer.ChainTest do
} in balance_fields_list
end
test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <>
test "with `t:Explorer.Chain.Balance.t/0` with value_fetched_at with same `address_hash` and `block_number` " <>
"does not return `t:Explorer.Chain.InternalTransaction.t/0` `from_address_hash`" do
from_address = insert(:address)
block = insert(:block)
@ -1290,6 +1299,8 @@ defmodule Explorer.ChainTest do
transaction: transaction
)
balance = insert(:unfetched_balance, address_hash: from_address.hash, block_number: block.number)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
[],
@ -1298,7 +1309,7 @@ defmodule Explorer.ChainTest do
assert %{address_hash: from_address.hash, block_number: block.number} in balance_fields_list
insert(:balance, address_hash: from_address.hash, block_number: block.number)
update_balance_value(balance, 1)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
@ -1309,7 +1320,7 @@ defmodule Explorer.ChainTest do
refute %{address_hash: from_address.hash, block_number: block.number} in balance_fields_list
end
test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <>
test "with `t:Explorer.Chain.Balance.t/0` with value_fetched_at with same `address_hash` and `block_number` " <>
"does not return `t:Explorer.Chain.InternalTransaction.t/0` `to_address_hash`" do
to_address = insert(:address)
block = insert(:block)
@ -1326,6 +1337,8 @@ defmodule Explorer.ChainTest do
transaction: transaction
)
balance = insert(:unfetched_balance, address_hash: to_address.hash, block_number: block.number)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
[],
@ -1334,7 +1347,7 @@ defmodule Explorer.ChainTest do
assert %{address_hash: to_address.hash, block_number: block.number} in balance_fields_list
insert(:balance, address_hash: to_address.hash, block_number: block.number)
update_balance_value(balance, 1)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
@ -1349,18 +1362,24 @@ defmodule Explorer.ChainTest do
miner = insert(:address)
mined_block = insert(:block, miner: miner)
insert(:unfetched_balance, address_hash: miner.hash, block_number: mined_block.number)
from_transaction_block = insert(:block)
:transaction
|> insert(from_address: miner)
|> with_block(from_transaction_block)
insert(:unfetched_balance, address_hash: miner.hash, block_number: from_transaction_block.number)
to_transaction_block = insert(:block)
:transaction
|> insert(to_address: miner)
|> with_block(to_transaction_block)
insert(:unfetched_balance, address_hash: miner.hash, block_number: to_transaction_block.number)
log_block = insert(:block)
log_transaction =
@ -1369,6 +1388,8 @@ defmodule Explorer.ChainTest do
|> with_block(log_block)
insert(:log, address: miner, transaction: log_transaction)
insert(:unfetched_balance, address_hash: miner.hash, block_number: log_block.number)
from_internal_transaction_block = insert(:block)
from_internal_transaction_transaction =
@ -1383,6 +1404,8 @@ defmodule Explorer.ChainTest do
transaction: from_internal_transaction_transaction
)
insert(:unfetched_balance, address_hash: miner.hash, block_number: from_internal_transaction_block.number)
to_internal_transaction_block = insert(:block)
to_internal_transaction_transaction =
@ -1397,6 +1420,8 @@ defmodule Explorer.ChainTest do
transaction: to_internal_transaction_transaction
)
insert(:unfetched_balance, address_hash: miner.hash, block_number: to_internal_transaction_block.number)
{:ok, balance_fields_list} =
Explorer.Chain.stream_unfetched_balances(
[],
@ -1420,6 +1445,8 @@ defmodule Explorer.ChainTest do
miner = insert(:address)
block = insert(:block, miner: miner)
insert(:unfetched_balance, address_hash: miner.hash, block_number: block.number)
:transaction
|> insert(from_address: miner)
|> with_block(block)

@ -38,19 +38,6 @@ defmodule Explorer.Market.History.CatalogerTest do
assert Repo.get_by(MarketHistory, date: record.date)
end
@tag capture_log: true
test "handle_info with failed task" do
state = %{}
test_pid = self()
expect(TestSource, :fetch_history, fn 10 -> send(test_pid, :retry) end)
set_mox_global()
assert {:noreply, state} == Cataloger.handle_info({nil, {10, 0, :error}}, state)
# Back off check
refute_receive :retry, 100
assert_receive :retry, 300
end
test "handle info for DOWN message" do
assert {:noreply, %{}} == Cataloger.handle_info({:DOWN, nil, :process, nil, nil}, %{})
end

@ -33,14 +33,25 @@ defmodule Explorer.Factory do
}
end
def balance_factory do
def unfetched_balance_factory do
%Balance{
address_hash: address_hash(),
block_number: block_number(),
value: Enum.random(1..100_000)
block_number: block_number()
}
end
def update_balance_value(%Balance{address_hash: address_hash, block_number: block_number}, value) do
Repo.update_all(
from(balance in Balance, where: balance.address_hash == ^address_hash and balance.block_number == ^block_number),
set: [value: value, value_fetched_at: DateTime.utc_now()]
)
end
def fetched_balance_factory do
unfetched_balance_factory()
|> struct!(value: Enum.random(1..100_000))
end
def contract_address_factory do
%Address{
hash: address_hash(),
@ -241,6 +252,17 @@ defmodule Explorer.Factory do
}
end
def internal_transaction_suicide_factory() do
%InternalTransaction{
from_address: build(:address),
trace_address: [],
# caller MUST supply `transaction` because it can't be built lazily to allow overrides without creating an extra
# transaction
type: :suicide,
value: sequence("internal_transaction_value", &Decimal.new(&1))
}
end
def log_factory do
%Log{
address: build(:address),

@ -146,22 +146,6 @@ defmodule ExplorerWeb.ViewingChainTest do
|> refute_has(ChainPage.transaction(last_shown_transaction))
end
test "count of non-loaded transactions live update when batch overflow", %{session: session, block: block} do
transaction_hashes =
30
|> insert_list(:transaction)
|> with_block(block)
|> Enum.map(& &1.hash)
session
|> ChainPage.visit_page()
|> assert_has(ChainPage.transactions(count: 5))
Notifier.handle_event({:chain_event, :transactions, transaction_hashes})
assert_has(session, ChainPage.non_loaded_transaction_count("30"))
end
test "contract creation is shown for to_address", %{session: session, block: block} do
contract_address = insert(:contract_address)

@ -97,33 +97,6 @@ defmodule ExplorerWeb.ViewingTransactionsTest do
|> TransactionListPage.visit_page()
|> assert_has(TransactionListPage.contract_creation(transaction))
end
test "viewing new transactions via live update on list page", %{session: session} do
TransactionListPage.visit_page(session)
transaction =
:transaction
|> insert()
|> with_block()
Notifier.handle_event({:chain_event, :transactions, [transaction.hash]})
assert_has(session, TransactionListPage.transaction(transaction))
end
test "count of non-loaded transactions on list page live update when batch overflow", %{session: session} do
transaction_hashes =
30
|> insert_list(:transaction)
|> with_block()
|> Enum.map(& &1.hash)
TransactionListPage.visit_page(session)
Notifier.handle_event({:chain_event, :transactions, transaction_hashes})
assert_has(session, TransactionListPage.non_loaded_transaction_count("30"))
end
end
describe "viewing a transaction page" do

@ -1,5 +1,6 @@
defmodule ExplorerWeb.FeatureCase do
use ExUnit.CaseTemplate
use Wallaby.DSL
# Types on Wallaby.Browser.resize_window don't allow session from start_session to be passed, so setup breaks
@dialyzer {:nowarn_function, __ex_unit_setup_0: 1}
@ -12,6 +13,7 @@ defmodule ExplorerWeb.FeatureCase do
import Ecto.Changeset
import Ecto.Query
import Explorer.Factory
import ExplorerWeb.FeatureCase
import ExplorerWeb.Router.Helpers
alias Explorer.Repo

@ -72,12 +72,16 @@ defmodule Indexer.BalanceFetcher do
case EthereumJSONRPC.fetch_balances(unique_params_list, json_rpc_named_arguments) do
{:ok, balances_params} ->
addresses_params = balances_params_to_address_params(balances_params)
value_fetched_at = DateTime.utc_now()
importable_balances_params = Enum.map(balances_params, &Map.put(&1, :value_fetched_at, value_fetched_at))
addresses_params = balances_params_to_address_params(importable_balances_params)
{:ok, _} =
Chain.import(%{
addresses: %{params: addresses_params, with: :balance_changeset},
balances: %{params: balances_params}
balances: %{params: importable_balances_params}
})
:ok

@ -0,0 +1,90 @@
defmodule Indexer.Balances do
@moduledoc """
Extracts `Explorer.Chain.Balance` params from other schema's params
"""
def params_set(%{} = import_options) do
Enum.reduce(import_options, MapSet.new(), &reducer/2)
end
defp reducer({:blocks_params, blocks_params}, acc) when is_list(blocks_params) do
# a block MUST have a miner_hash and number
Enum.into(blocks_params, acc, fn %{miner_hash: address_hash, number: block_number}
when is_binary(address_hash) and is_integer(block_number) ->
%{address_hash: address_hash, block_number: block_number}
end)
end
defp reducer({:internal_transactions_params, internal_transactions_params}, initial)
when is_list(internal_transactions_params) do
Enum.reduce(internal_transactions_params, initial, &internal_transactions_params_reducer/2)
end
defp reducer({:logs_params, logs_params}, acc) when is_list(logs_params) do
# a log MUST have and address_hash
Enum.into(logs_params, acc, fn %{address_hash: address_hash, block_number: block_number}
when is_binary(address_hash) and is_integer(block_number) ->
%{address_hash: address_hash, block_number: block_number}
end)
end
defp reducer({:token_transfers_params, token_transfers_params}, initial) when is_list(token_transfers_params) do
Enum.reduce(token_transfers_params, initial, fn %{
block_number: block_number,
from_address_hash: from_address_hash,
to_address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash
},
acc
when is_integer(block_number) and is_binary(from_address_hash) and
is_binary(to_address_hash) and
is_binary(token_contract_address_hash) ->
acc
|> MapSet.put(%{address_hash: from_address_hash, block_number: block_number})
|> MapSet.put(%{address_hash: to_address_hash, block_number: block_number})
|> MapSet.put(%{address_hash: token_contract_address_hash, block_number: block_number})
end)
end
defp reducer({:transactions_params, transactions_params}, initial) when is_list(transactions_params) do
Enum.reduce(transactions_params, initial, &transactions_params_reducer/2)
end
defp internal_transactions_params_reducer(%{block_number: block_number} = internal_transaction_params, acc)
when is_integer(block_number) do
case internal_transaction_params do
%{type: "call"} ->
acc
%{type: "create", error: _} ->
acc
%{type: "create", created_contract_address_hash: address_hash} when is_binary(address_hash) ->
MapSet.put(acc, %{address_hash: address_hash, block_number: block_number})
%{type: "suicide", from_address_hash: from_address_hash, to_address_hash: to_address_hash}
when is_binary(from_address_hash) and is_binary(to_address_hash) ->
acc
|> MapSet.put(%{address_hash: from_address_hash, block_number: block_number})
|> MapSet.put(%{address_hash: to_address_hash, block_number: block_number})
end
end
defp transactions_params_reducer(
%{block_number: block_number, from_address_hash: from_address_hash} = transaction_params,
initial
)
when is_integer(block_number) and is_binary(from_address_hash) do
# a transaction MUST have a `from_address_hash`
acc = MapSet.put(initial, %{address_hash: from_address_hash, block_number: block_number})
# `to_address_hash` is optional
case transaction_params do
%{to_address_hash: to_address_hash} when is_binary(to_address_hash) ->
MapSet.put(acc, %{address_hash: to_address_hash, block_number: block_number})
_ ->
acc
end
end
end

@ -8,13 +8,7 @@ defmodule Indexer.BlockFetcher do
import Indexer, only: [debug: 1]
alias Explorer.Chain.{Block, Import}
alias Indexer.{
AddressExtraction,
Sequence,
TokenTransfers
}
alias Indexer.{AddressExtraction, Balances, Sequence, TokenTransfers}
alias Indexer.BlockFetcher.Receipts
# dialyzer thinks that Logger.debug functions always have no_local_return
@ -43,6 +37,7 @@ defmodule Indexer.BlockFetcher do
address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number,
transaction_hash_to_block_number_option: transaction_hash_to_block_number,
addresses: Import.addresses_options(),
balances: Import.balances_options(),
blocks: Import.blocks_options(),
broadcast: boolean,
logs: Import.logs_options(),
@ -143,10 +138,21 @@ defmodule Indexer.BlockFetcher do
{:ok, _} = ok ->
ok
{:error, changesets} = error when is_list(changesets) ->
%{range: range} = options
Logger.error(fn ->
"failed to validate blocks #{inspect(range)}: #{inspect(changesets)}. Retrying"
end)
:ok = Sequence.queue(sequence, range)
error
{:error, step, failed_value, _changes_so_far} = error ->
%{range: range} = options
debug(fn ->
Logger.error(fn ->
"failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying"
end)
@ -203,11 +209,20 @@ defmodule Indexer.BlockFetcher do
transactions: transactions_with_receipts
})
balances_params_set =
Balances.params_set(%{
blocks_params: blocks,
logs_params: logs,
token_transfers_params: token_transfers,
transactions_params: transactions_with_receipts
})
insert(
state,
%{
range: range,
addresses: %{params: addresses},
balances: %{params: balances_params_set},
blocks: %{params: blocks},
logs: %{params: logs},
receipts: %{params: receipts},

@ -47,13 +47,11 @@ defmodule Indexer.BlockFetcher.Catchup do
)
end
def task(
%__MODULE__{
def task(%__MODULE__{
block_fetcher:
%BlockFetcher{blocks_batch_size: blocks_batch_size, json_rpc_named_arguments: json_rpc_named_arguments} =
block_fetcher
} = state
) do
}) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
case latest_block_number do

@ -59,6 +59,7 @@ defmodule Indexer.BlockFetcher.Realtime do
block_fetcher,
%{
address_hash_to_fetched_balance_block_number: address_hash_to_block_number,
balances: %{params: balance_params},
addresses: %{params: addresses_params},
transactions: %{params: transactions_params}
} = options
@ -75,7 +76,8 @@ defmodule Indexer.BlockFetcher.Realtime do
{:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}} <-
balances(block_fetcher, %{
address_hash_to_block_number: address_hash_to_block_number,
address_params: internal_transactions_addresses_params
addresses_params: internal_transactions_addresses_params,
balances_params: balance_params
}),
chain_import_options =
options
@ -89,65 +91,6 @@ defmodule Indexer.BlockFetcher.Realtime do
end
end
def internal_transactions(
%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{addresses_params: addresses_params, transactions_params: transactions_params}
) do
with {:ok, internal_transactions_params} <-
transactions_params
|> transactions_params_to_fetch_internal_transactions_params()
|> EthereumJSONRPC.fetch_internal_transactions(json_rpc_named_arguments) do
merged_addresses_params =
%{internal_transactions: internal_transactions_params}
|> AddressExtraction.extract_addresses()
|> Kernel.++(addresses_params)
|> AddressExtraction.merge_addresses()
{:ok, %{addresses_params: merged_addresses_params, internal_transactions_params: internal_transactions_params}}
end
end
defp transactions_params_to_fetch_internal_transactions_params(transactions_params) do
Enum.map(transactions_params, &transaction_params_to_fetch_internal_transaction_params/1)
end
defp transaction_params_to_fetch_internal_transaction_params(%{block_number: block_number, hash: hash})
when is_integer(block_number) do
%{block_number: block_number, hash_data: to_string(hash)}
end
def balances(
%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{
address_params: address_params,
address_hash_to_block_number: address_hash_to_block_number
}
) do
balances_params =
Enum.map(address_params, fn %{hash: address_hash} = address_params when is_binary(address_hash) ->
block_number =
case address_params do
%{fetched_balance_block_number: block_number} when is_integer(block_number) ->
block_number
_ ->
Map.fetch!(address_hash_to_block_number, address_hash)
end
%{hash_data: address_hash, block_quantity: integer_to_quantity(block_number)}
end)
with {:ok, balances_params} <- EthereumJSONRPC.fetch_balances(balances_params, json_rpc_named_arguments) do
merged_addresses_params =
%{balances: balances_params}
|> AddressExtraction.extract_addresses()
|> Kernel.++(address_params)
|> AddressExtraction.merge_addresses()
{:ok, %{addresses_params: merged_addresses_params, balances_params: balances_params}}
end
end
def handle_success(
{ref, :ok = result},
%BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state
@ -199,4 +142,87 @@ defmodule Indexer.BlockFetcher.Realtime do
|> Enum.map(& &1.contract_address_hash)
|> TokenFetcher.async_fetch()
end
defp internal_transactions(
%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{addresses_params: addresses_params, transactions_params: transactions_params}
) do
with {:ok, internal_transactions_params} <-
transactions_params
|> transactions_params_to_fetch_internal_transactions_params()
|> EthereumJSONRPC.fetch_internal_transactions(json_rpc_named_arguments) do
merged_addresses_params =
%{internal_transactions: internal_transactions_params}
|> AddressExtraction.extract_addresses()
|> Kernel.++(addresses_params)
|> AddressExtraction.merge_addresses()
{:ok, %{addresses_params: merged_addresses_params, internal_transactions_params: internal_transactions_params}}
end
end
defp transactions_params_to_fetch_internal_transactions_params(transactions_params) do
Enum.map(transactions_params, &transaction_params_to_fetch_internal_transaction_params/1)
end
defp transaction_params_to_fetch_internal_transaction_params(%{block_number: block_number, hash: hash})
when is_integer(block_number) do
%{block_number: block_number, hash_data: to_string(hash)}
end
defp balances(
%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{addresses_params: addresses_params} = options
) do
with {:ok, fetched_balances_params} <-
options
|> fetch_balances_params_list()
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) do
merged_addresses_params =
%{balances: fetched_balances_params}
|> AddressExtraction.extract_addresses()
|> Kernel.++(addresses_params)
|> AddressExtraction.merge_addresses()
value_fetched_at = DateTime.utc_now()
importable_balances_params = Enum.map(fetched_balances_params, &Map.put(&1, :value_fetched_at, value_fetched_at))
{:ok, %{addresses_params: merged_addresses_params, balances_params: importable_balances_params}}
end
end
defp fetch_balances_params_list(%{
addresses_params: addresses_params,
address_hash_to_block_number: address_hash_to_block_number,
balances_params: balances_params
}) do
addresses_params
|> addresses_params_to_fetched_balances_params_set(%{address_hash_to_block_number: address_hash_to_block_number})
|> MapSet.union(balances_params_to_fetch_balances_params_set(balances_params))
# stable order for easier moxing
|> Enum.sort_by(fn %{hash_data: hash_data, block_quantity: block_quantity} -> {hash_data, block_quantity} end)
end
defp addresses_params_to_fetched_balances_params_set(addresses_params, %{
address_hash_to_block_number: address_hash_to_block_number
}) do
Enum.into(addresses_params, MapSet.new(), fn %{hash: address_hash} = address_params when is_binary(address_hash) ->
block_number =
case address_params do
%{fetched_balance_block_number: block_number} when is_integer(block_number) ->
block_number
_ ->
Map.fetch!(address_hash_to_block_number, address_hash)
end
%{hash_data: address_hash, block_quantity: integer_to_quantity(block_number)}
end)
end
defp balances_params_to_fetch_balances_params_set(balances_params) do
Enum.into(balances_params, MapSet.new(), fn %{address_hash: address_hash, block_number: block_number} ->
%{hash_data: address_hash, block_quantity: integer_to_quantity(block_number)}
end)
end
end

@ -63,6 +63,7 @@ defmodule Indexer.AddressBalanceFetcherTest do
{:ok, miner_hash} = Hash.Address.cast(miner_hash_data)
miner = insert(:address, hash: miner_hash)
block = insert(:block, miner: miner, number: block_number)
insert(:unfetched_balance, address_hash: miner.hash, block_number: block_number)
assert miner.fetched_balance == nil
assert miner.fetched_balance_block_number == nil
@ -118,6 +119,7 @@ defmodule Indexer.AddressBalanceFetcherTest do
{:ok, miner_hash} = Hash.Address.cast(miner_hash_data)
miner = insert(:address, hash: miner_hash)
block = insert(:block, miner: miner, number: block_number)
insert(:unfetched_balance, address_hash: miner.hash, block_number: block_number)
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments, max_batch_size: 2)

@ -0,0 +1,184 @@
defmodule Indexer.BalancesTest do
use ExUnit.Case, async: true
alias Explorer.Factory
alias Indexer.Balances
describe "params_set/1" do
test "with block extracts miner_hash" do
miner_hash =
Factory.address_hash()
|> to_string()
block_number = 1
params_set = Balances.params_set(%{blocks_params: [%{miner_hash: miner_hash, number: block_number}]})
assert MapSet.size(params_set) == 1
assert %{address_hash: miner_hash, block_number: block_number}
end
test "with call internal transaction extracts nothing" do
internal_transaction_params =
:internal_transaction
|> Factory.params_for()
|> Map.update!(:type, &to_string/1)
|> Map.put(:block_number, 1)
params_set = Balances.params_set(%{internal_transactions_params: [internal_transaction_params]})
assert MapSet.size(params_set) == 0
end
test "with create internal transaction with error extracts nothing" do
internal_transaction_params =
:internal_transaction_create
|> Factory.params_for()
|> Map.update!(:type, &to_string/1)
|> Map.put(:block_number, 1)
|> Map.put(:error, "illegal operation")
params_set = Balances.params_set(%{internal_transactions_params: [internal_transaction_params]})
assert MapSet.size(params_set) == 0
end
test "with create internal transaction without error extracts created_contract_address_hash" do
block_number = 1
created_contract_address_hash =
Factory.address_hash()
|> to_string()
internal_transaction_params =
:internal_transaction_create
|> Factory.params_for()
|> Map.update!(:type, &to_string/1)
|> Map.put(:block_number, block_number)
|> Map.put(:created_contract_address_hash, created_contract_address_hash)
params_set = Balances.params_set(%{internal_transactions_params: [internal_transaction_params]})
assert MapSet.size(params_set) == 1
assert %{address_hash: created_contract_address_hash, block_number: block_number}
end
test "with suicide internal transaction extracts from_address_hash and to_address_hash" do
block_number = 1
from_address_hash =
Factory.address_hash()
|> to_string()
to_address_hash =
Factory.address_hash()
|> to_string()
internal_transaction_params =
:internal_transaction_suicide
|> Factory.params_for()
|> Map.update!(:type, &to_string/1)
|> Map.put(:block_number, block_number)
|> Map.put(:from_address_hash, from_address_hash)
|> Map.put(:to_address_hash, to_address_hash)
params_set = Balances.params_set(%{internal_transactions_params: [internal_transaction_params]})
assert MapSet.size(params_set) == 2
assert %{address_hash: from_address_hash, block_number: block_number}
assert %{address_hash: to_address_hash, block_number: block_number}
end
test "with log extracts address_hash" do
block_number = 1
address_hash =
Factory.address_hash()
|> to_string()
log_params =
:log
|> Factory.params_for()
|> Map.put(:block_number, block_number)
|> Map.put(:address_hash, address_hash)
params_set = Balances.params_set(%{logs_params: [log_params]})
assert MapSet.size(params_set) == 1
assert %{address_hash: address_hash, block_number: block_number}
end
test "with token transfer extract from_address, to_address, and token_contract_address_hash" do
block_number = 1
from_address_hash =
Factory.address_hash()
|> to_string()
to_address_hash =
Factory.address_hash()
|> to_string()
token_contract_address_hash =
Factory.address_hash()
|> to_string()
token_transfer_params = %{
block_number: block_number,
from_address_hash: from_address_hash,
to_address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash
}
params_set = Balances.params_set(%{token_transfers_params: [token_transfer_params]})
assert MapSet.size(params_set) == 3
assert %{address_hash: from_address_hash, block_number: block_number}
assert %{address_hash: to_address_hash, block_number: block_number}
assert %{address_hash: token_contract_address_hash, block_number: block_number}
end
test "with transaction without to_address_hash extracts from_address_hash" do
block_number = 1
from_address_hash =
Factory.address_hash()
|> to_string()
transaction_params =
:transaction
|> Factory.params_for()
|> Map.put(:block_number, block_number)
|> Map.put(:from_address_hash, from_address_hash)
params_set = Balances.params_set(%{transactions_params: [transaction_params]})
assert MapSet.size(params_set) == 1
assert %{address_hash: from_address_hash, block_number: block_number}
end
test "with transaction with to_address_hash extracts from_address_hash and to_address_hash" do
block_number = 1
from_address_hash =
Factory.address_hash()
|> to_string()
to_address_hash =
Factory.address_hash()
|> to_string()
transaction_params =
:transaction
|> Factory.params_for()
|> Map.put(:block_number, block_number)
|> Map.put(:from_address_hash, from_address_hash)
|> Map.put(:to_address_hash, to_address_hash)
params_set = Balances.params_set(%{transactions_params: [transaction_params]})
assert MapSet.size(params_set) == 2
assert %{address_hash: from_address_hash, block_number: block_number}
assert %{address_hash: to_address_hash, block_number: block_number}
end
end
end

@ -113,6 +113,9 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
[%{method: "eth_getBalance"} | _] = requests, _options ->
{:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)}
[], _options ->
{:ok, []}
end)
EthereumJSONRPC.Geth ->

Loading…
Cancel
Save