Merge branch 'master' into optimize-replace-transaction-fetcher

pull/1456/head
Andrew Cravenho 6 years ago committed by GitHub
commit cd38bfe58f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      apps/explorer/lib/explorer/chain/import/runner/addresses.ex
  2. 5
      apps/indexer/config/dev.exs
  3. 5
      apps/indexer/config/prod.exs
  4. 5
      apps/indexer/config/test.exs
  5. 6
      apps/indexer/lib/indexer/block/supervisor.ex
  6. 119
      apps/indexer/lib/indexer/temporary/failed_created_addresses.ex
  7. 38
      apps/indexer/lib/indexer/temporary/failed_created_addresses/supervisor.ex
  8. 139
      apps/indexer/test/indexer/temporary/failed_created_addresses_test.exs
  9. 1
      config/config.exs

@ -87,7 +87,7 @@ defmodule Explorer.Chain.Import.Runner.Addresses do
from(address in Address, from(address in Address,
update: [ update: [
set: [ set: [
contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code), contract_code: fragment("COALESCE(EXCLUDED.contract_code, ?)", address.contract_code),
# ARGMAX on two columns # ARGMAX on two columns
fetched_coin_balance: fetched_coin_balance:
fragment( fragment(

@ -11,6 +11,11 @@ config :logger, :indexer_token_balances,
path: Path.absname("logs/dev/indexer/token_balances/error.log"), path: Path.absname("logs/dev/indexer/token_balances/error.log"),
metadata_filter: [fetcher: :token_balances] metadata_filter: [fetcher: :token_balances]
config :logger, :failed_contract_creations,
level: :debug,
path: Path.absname("logs/dev/indexer/failed_contract_creations.log"),
metadata_filter: [fetcher: :failed_created_addresses]
variant = variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do
"ganache" "ganache"

@ -13,6 +13,11 @@ config :logger, :indexer_token_balances,
metadata_filter: [fetcher: :token_balances], metadata_filter: [fetcher: :token_balances],
rotate: %{max_bytes: 52_428_800, keep: 19} rotate: %{max_bytes: 52_428_800, keep: 19}
config :logger, :failed_contract_creations,
level: :prod,
path: Path.absname("logs/prod/indexer/failed_contract_creations.log"),
metadata_filter: [fetcher: :failed_created_addresses]
variant = variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do
"parity" "parity"

@ -10,3 +10,8 @@ config :logger, :indexer_token_balances,
level: :debug, level: :debug,
path: Path.absname("logs/test/indexer/token_balances/error.log"), path: Path.absname("logs/test/indexer/token_balances/error.log"),
metadata_filter: [fetcher: :token_balances] metadata_filter: [fetcher: :token_balances]
config :logger, :failed_contract_creations,
level: :debug,
path: Path.absname("logs/test/indexer/failed_contract_creations.log"),
metadata_filter: [fetcher: :failed_created_addresses]

@ -5,6 +5,7 @@ defmodule Indexer.Block.Supervisor do
alias Indexer.Block alias Indexer.Block
alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, Reward, Uncle} alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, Reward, Uncle}
alias Indexer.Temporary.FailedCreatedAddresses
use Supervisor use Supervisor
@ -45,6 +46,11 @@ defmodule Indexer.Block.Supervisor do
[ [
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor], [json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor],
[name: Reward.Supervisor] [name: Reward.Supervisor]
]},
{FailedCreatedAddresses.Supervisor,
[
json_rpc_named_arguments,
[name: FailedCreatedAddresses.Supervisor]
]} ]}
], ],
strategy: :one_for_one strategy: :one_for_one

@ -0,0 +1,119 @@
defmodule Indexer.Temporary.FailedCreatedAddresses do
@moduledoc """
Temporary module to fix internal transactions and their created transactions if a parent transaction has failed.
"""
use GenServer
require Logger
import Ecto.Query
alias Explorer.Chain.{InternalTransaction, Transaction}
alias Explorer.Repo
alias Indexer.Temporary.FailedCreatedAddresses.TaskSupervisor
@task_options [max_concurrency: 3, timeout: :infinity]
@query_timeout :infinity
def start_link([json_rpc_named_arguments, gen_server_options]) do
GenServer.start_link(__MODULE__, json_rpc_named_arguments, gen_server_options)
end
@impl GenServer
def init(json_rpc_named_arguments) do
run(json_rpc_named_arguments)
{:ok, json_rpc_named_arguments}
end
def run(json_rpc_named_arguments) do
Logger.debug(
[
"Started query to fetch internal transactions that need to be fixed"
],
fetcher: :failed_created_addresses
)
query =
from(t in Transaction,
left_join: it in InternalTransaction,
on: it.transaction_hash == t.hash,
where: t.status == ^0 and not is_nil(it.created_contract_address_hash)
)
found_transactions = Repo.all(query, timeout: @query_timeout)
Logger.debug(
[
"Finished query to fetch internal transactions that need to be fixed. Number of records is #{
Enum.count(found_transactions)
}"
],
fetcher: :failed_created_addresses
)
TaskSupervisor
|> Task.Supervisor.async_stream_nolink(
found_transactions,
fn transaction -> fix_internal_transaction(transaction, json_rpc_named_arguments) end,
@task_options
)
|> Enum.to_list()
end
def fix_internal_transaction(transaction, json_rpc_named_arguments) do
# credo:disable-for-next-line
try do
Logger.debug(
[
"Started fixing transaction #{to_string(transaction.hash)}"
],
fetcher: :failed_created_addresses
)
transaction_with_internal_transactions = Repo.preload(transaction, [:internal_transactions])
transaction_with_internal_transactions.internal_transactions
|> Enum.filter(fn internal_transaction ->
internal_transaction.created_contract_address_hash
end)
|> Enum.each(fn internal_transaction ->
:ok =
internal_transaction
|> code_entry()
|> Indexer.Code.Fetcher.run(json_rpc_named_arguments)
end)
:ok =
transaction
|> transaction_entry()
|> Indexer.InternalTransaction.Fetcher.run(json_rpc_named_arguments)
Logger.debug(
[
"Finished fixing transaction #{to_string(transaction.hash)}"
],
fetcher: :failed_created_addresses
)
rescue
e ->
Logger.debug(
[
"Failed fixing transaction #{to_string(transaction.hash)} because of #{inspect(e)}"
],
fetcher: :failed_created_addresses
)
end
end
def code_entry(%InternalTransaction{
block_number: block_number,
created_contract_address_hash: %{bytes: created_contract_bytes}
}) do
[{block_number, created_contract_bytes, <<>>}]
end
def transaction_entry(%Transaction{hash: %{bytes: bytes}, index: index, block_number: block_number}) do
[{block_number, bytes, index}]
end
end

@ -0,0 +1,38 @@
defmodule Indexer.Temporary.FailedCreatedAddresses.Supervisor do
@moduledoc """
Supervises `Indexer.Temporary.FailedCreatedAddresses`.
"""
use Supervisor
alias Indexer.Temporary.FailedCreatedAddresses
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(json_rpc_named_arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, json_rpc_named_arguments, gen_server_options)
end
@impl Supervisor
def init(json_rpc_named_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.Temporary.FailedCreatedAddresses.TaskSupervisor},
{FailedCreatedAddresses, [json_rpc_named_arguments, [name: FailedCreatedAddresses]]}
],
strategy: :rest_for_one
)
end
end

@ -0,0 +1,139 @@
defmodule Indexer.Temporary.FailedCreatedAddressesTest do
use Explorer.DataCase, async: false
use EthereumJSONRPC.Case, async: false
import Mox
import Ecto.Query
alias Explorer.Repo
alias Explorer.Chain.{Address, Transaction}
alias Indexer.Temporary.FailedCreatedAddresses.Supervisor
alias Indexer.CoinBalance
@moduletag capture_log: true
setup :set_mox_global
setup :verify_on_exit!
describe "run/1" do
@tag :no_parity
@tag :no_geth
test "updates failed replaced transactions", %{json_rpc_named_arguments: json_rpc_named_arguments} do
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
block = insert(:block)
transaction =
:transaction
|> insert(
status: 0,
error: "Reverted",
internal_transactions_indexed_at: DateTime.utc_now(),
block: block,
block_number: block.number,
cumulative_gas_used: 200,
gas_used: 100,
index: 0
)
address = insert(:address, contract_code: "0x0102030405")
insert(:internal_transaction,
block_number: transaction.block_number,
transaction: transaction,
index: 0,
created_contract_address_hash: address.hash
)
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn _json, _options ->
{:ok, [%{id: 0, jsonrpc: "2.0", result: "0x"}]}
end)
|> expect(:json_rpc, fn [%{id: id, method: "eth_getBalance", params: [_address, _block_quantity]}], _options ->
{:ok, [%{id: id, result: "0x0"}]}
end)
|> expect(:json_rpc, fn _json, _options ->
{:ok,
[
%{
id: 0,
jsonrpc: "2.0",
result: %{
"output" => "0x",
"stateDiff" => nil,
"trace" => [
%{
"action" => %{
"callType" => "call",
"from" => "0xc73add416e2119d20ce80e0904fc1877e33ef246",
"gas" => "0x13388",
"input" => "0xc793bf97",
"to" => "0x2d07e106b5d280e4ccc2d10deee62441c91d4340",
"value" => "0x0"
},
"error" => "Reverted",
"subtraces" => 1,
"traceAddress" => [],
"type" => "call"
},
%{
"action" => %{
"from" => "0x2d07e106b5d280e4ccc2d10deee62441c91d4340",
"gas" => "0xb2ab",
"init" => "0x4bb278f3",
"value" => "0x0"
},
"result" => %{
"address" => "0xf4a5afe28b91cf928c2568805cfbb36d477f0b75",
"code" =>
"0x6080604052600436106038577c010000000000000000000000000000000000000000000000000000000060003504633ccfd60b8114604f575b336000908152602081905260409020805434019055005b348015605a57600080fd5b5060616063565b005b33600081815260208190526040808220805490839055905190929183156108fc02918491818181858888f1935050505015801560a3573d6000803e3d6000fd5b505056fea165627a7a72305820e9a226f249def650de957dd8b4127b85a3049d6bfa818cadc4e2d3c44b6a53530029",
"gasUsed" => "0xa535"
},
"subtraces" => 0,
"traceAddress" => [0],
"type" => "create"
}
],
"vmTrace" => nil
}
}
]}
end)
end
params = [json_rpc_named_arguments, [name: TestFailedCreatedAddresses]]
params
|> Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
Process.sleep(3_000)
query =
from(t in Transaction,
where: t.hash == ^transaction.hash,
preload: [internal_transactions: :created_contract_address]
)
fetched_transaction = Repo.one(query)
assert Enum.count(fetched_transaction.internal_transactions) == 2
assert Enum.all?(fetched_transaction.internal_transactions, fn it ->
it.error && is_nil(it.created_contract_address_hash)
end)
fetched_address =
Repo.one(
from(a in Address,
where: a.hash == ^address.hash
)
)
assert fetched_address.contract_code == %Explorer.Chain.Data{bytes: ""}
end
end
end

@ -26,6 +26,7 @@ config :logger,
# only :indexer, but all levels # only :indexer, but all levels
{LoggerFileBackend, :indexer}, {LoggerFileBackend, :indexer},
{LoggerFileBackend, :indexer_token_balances}, {LoggerFileBackend, :indexer_token_balances},
{LoggerFileBackend, :failed_contract_creations},
{LoggerFileBackend, :reading_token_functions} {LoggerFileBackend, :reading_token_functions}
] ]

Loading…
Cancel
Save