Merge pull request #1214 from poanetwork/metadata-instead-of-prose

Metadata instead of prose
Andrew Cravenho committed via GitHub · commit 2fdd5bb7df
Changed files (15), with the number of changed lines in parentheses:

  1. apps/block_scout_web/config/config.exs (4)
  2. apps/ethereum_jsonrpc/config/config.exs (4)
  3. apps/explorer/config/config.exs (4)
  4. apps/indexer/config/config.exs (4)
  5. apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex (45)
  6. apps/indexer/lib/indexer/block/catchup/fetcher.ex (46)
  7. apps/indexer/lib/indexer/block/fetcher/receipts.ex (2)
  8. apps/indexer/lib/indexer/block/realtime/fetcher.ex (42)
  9. apps/indexer/lib/indexer/block/uncle/fetcher.ex (47)
 10. apps/indexer/lib/indexer/coin_balance/fetcher.ex (35)
 11. apps/indexer/lib/indexer/internal_transaction/fetcher.ex (25)
 12. apps/indexer/lib/indexer/logger.ex (16)
 13. apps/indexer/lib/indexer/token_balance/fetcher.ex (13)
 14. apps/indexer/lib/indexer/token_balances.ex (2)
 15. config/config.exs (12)
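
The changes below all follow one pattern: facts that were interpolated into log message prose (block numbers, counts, import steps) move into structured `Logger` metadata, so they can be filtered and parsed as key=value pairs instead of grepped out of sentences. A minimal before/after sketch of the pattern (variable names here are illustrative, not taken from any single hunk):

    # Before: data embedded in prose, so the message text varies per call.
    Logger.info(fn -> "Index had to catch up #{missing_block_count} blocks in #{first}-0." end)

    # After: a stable message plus per-message metadata. The keys must also be
    # whitelisted in each backend's :metadata config (see the config hunks below).
    Logger.info("Index had to catch up.",
      first_block_number: first,
      last_block_number: 0,
      missing_block_count: missing_block_count
    )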

apps/block_scout_web/config/config.exs

@@ -48,7 +48,9 @@ config :ex_cldr,
 config :logger, :block_scout_web,
   # keep synced with `config/config.exs`
   format: "$dateT$time $metadata[$level] $message\n",
-  metadata: ~w(application fetcher request_id)a,
+  metadata:
+    ~w(application fetcher request_id first_block_number last_block_number missing_block_range_count missing_block_count
+       block_number step count error_count shrunk)a,
   metadata_filter: [application: :block_scout_web]

 config :spandex_phoenix, tracer: BlockScoutWeb.Tracer
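
The Logger console backend only renders metadata keys listed under `:metadata`, which is why each backend's whitelist has to grow in lockstep with the new keys (hence the "keep synced with `config/config.exs`" comments repeated below). With the `$metadata` placeholder in the format string above, a line should render roughly like this (illustrative output, not captured from a real run):

    2019-01-15T12:00:00.000 application=indexer fetcher=block_catchup first_block_number=1000000 last_block_number=0 [info] Index already caught up.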

apps/ethereum_jsonrpc/config/config.exs

@@ -17,7 +17,9 @@ config :ethereum_jsonrpc, EthereumJSONRPC.Tracer,
 config :logger, :ethereum_jsonrpc,
   # keep synced with `config/config.exs`
   format: "$dateT$time $metadata[$level] $message\n",
-  metadata: ~w(application fetcher request_id)a,
+  metadata:
+    ~w(application fetcher request_id first_block_number last_block_number missing_block_range_count missing_block_count
+       block_number step count error_count shrunk)a,
   metadata_filter: [application: :ethereum_jsonrpc]

 # Import environment specific config. This must remain at the bottom

apps/explorer/config/config.exs

@@ -50,7 +50,9 @@ config :explorer,
 config :logger, :explorer,
   # keep synced with `config/config.exs`
   format: "$dateT$time $metadata[$level] $message\n",
-  metadata: ~w(application fetcher request_id)a,
+  metadata:
+    ~w(application fetcher request_id first_block_number last_block_number missing_block_range_count missing_block_count
+       block_number step count error_count shrunk)a,
   metadata_filter: [application: :explorer]

 config :spandex_ecto, SpandexEcto.EctoLogger,

apps/indexer/config/config.exs

@@ -19,7 +19,9 @@ config :indexer, Indexer.Tracer,
 config :logger, :indexer,
   # keep synced with `config/config.exs`
   format: "$dateT$time $metadata[$level] $message\n",
-  metadata: ~w(application fetcher request_id)a,
+  metadata:
+    ~w(application fetcher request_id first_block_number last_block_number missing_block_range_count missing_block_count
+       block_number step count error_count shrunk)a,
   metadata_filter: [application: :indexer]

 # Import environment specific config. This must remain at the bottom

apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex

@@ -184,7 +184,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do
   end

   def handle_info(
-        {ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count, shrunk: false}},
+        {ref,
+         %{first_block_number: first_block_number, missing_block_count: missing_block_count, shrunk: false = shrunk}},
         %__MODULE__{
           bound_interval: bound_interval,
           task: %Task{ref: ref}
@@ -194,20 +195,23 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do
     new_bound_interval =
       case missing_block_count do
         0 ->
-          Logger.info(fn -> ["Index already caught up in ", to_string(first_block_number), "-0."] end)
+          Logger.info("Index already caught up.",
+            first_block_number: first_block_number,
+            last_block_number: 0,
+            missing_block_count: 0,
+            shrunk: shrunk
+          )

           BoundInterval.increase(bound_interval)

         _ ->
-          Logger.info(fn ->
-            [
-              "Index had to catch up ",
-              to_string(missing_block_count),
-              " blocks in ",
-              to_string(first_block_number),
-              "-0."
-            ]
-          end)
+          Logger.info(
+            "Index had to catch up.",
+            first_block_number: first_block_number,
+            last_block_number: 0,
+            missing_block_count: missing_block_count,
+            shrunk: shrunk
+          )

           BoundInterval.decrease(bound_interval)
       end
@@ -226,7 +230,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do
   end

   def handle_info(
-        {ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count, shrunk: true}},
+        {ref,
+         %{first_block_number: first_block_number, missing_block_count: missing_block_count, shrunk: true = shrunk}},
         %__MODULE__{
           task: %Task{ref: ref}
         } = state
@@ -234,15 +239,13 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do
       when is_integer(missing_block_count) do
     Process.demonitor(ref, [:flush])

-    Logger.info(fn ->
-      [
-        "Index had to catch up ",
-        to_string(missing_block_count),
-        " blocks in ",
-        to_string(first_block_number),
-        "-0, but the sequence was shrunk to save memory, so retrying immediately."
-      ]
-    end)
+    Logger.info(
+      "Index had to catch up, but the sequence was shrunk to save memory, so retrying immediately.",
+      first_block_number: first_block_number,
+      last_block_number: 0,
+      missing_block_count: missing_block_count,
+      shrunk: shrunk
+    )

     send(self(), :catchup_index)
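
Note the `shrunk: false = shrunk` and `shrunk: true = shrunk` patterns above: each clause still matches only its literal value, but now also binds that value to `shrunk` so it can be forwarded as metadata without a second lookup. The same match-and-bind idiom in isolation (values are illustrative):

    # Matches only when :shrunk is false, and binds the matched value.
    %{shrunk: false = shrunk} = %{shrunk: false}
    shrunk
    #=> false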

apps/indexer/lib/indexer/block/catchup/fetcher.ex

@@ -71,6 +71,9 @@ defmodule Indexer.Block.Catchup.Fetcher do
     # realtime indexer gets the current latest block
     first = latest_block_number - 1
     last = 0
+
+    Logger.metadata(first_block_number: first, last_block_number: last)
+
     missing_ranges = Chain.missing_block_number_ranges(first..last)

     range_count = Enum.count(missing_ranges)
@@ -79,9 +82,10 @@ defmodule Indexer.Block.Catchup.Fetcher do
       |> Stream.map(&Enum.count/1)
       |> Enum.sum()

-    Logger.debug(fn ->
-      "#{missing_block_count} missed blocks in #{range_count} ranges between #{first} and #{last}"
-    end)
+    Logger.debug(fn -> "Missed blocks in ranges." end,
+      missing_block_range_count: range_count,
+      missing_block_count: missing_block_count
+    )

     shrunk =
       case missing_block_count do
@@ -171,23 +175,25 @@ defmodule Indexer.Block.Catchup.Fetcher do
        )

   defp fetch_and_import_range_from_sequence(
          %__MODULE__{block_fetcher: %Block.Fetcher{} = block_fetcher},
-         _.._ = range,
+         first..last = range,
          sequence
        ) do
-    Logger.metadata(fetcher: :block_catchup)
+    Logger.metadata(fetcher: :block_catchup, first_block_number: first, last_block_number: last)

     case fetch_and_import_range(block_fetcher, range) do
       {:ok, %{inserted: inserted, errors: errors}} ->
-        errors = cap_seq(sequence, errors, range)
+        errors = cap_seq(sequence, errors)
         retry(sequence, errors)

         {:ok, inserted: inserted}

       {:error, {step, reason}} = error ->
-        Logger.error(fn ->
-          first..last = range
-          "failed to fetch #{step} for blocks #{first} - #{last}: #{inspect(reason)}. Retrying block range."
-        end)
+        Logger.error(
+          fn ->
+            ["failed to fetch: ", inspect(reason), ". Retrying."]
+          end,
+          step: step
+        )

         push_back(sequence, range)
@@ -195,7 +201,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
       {:error, changesets} = error when is_list(changesets) ->
         Logger.error(fn ->
-          "failed to validate blocks #{inspect(range)}: #{inspect(changesets)}. Retrying"
+          ["failed to validate: ", inspect(changesets), ". Retrying."]
         end)

         push_back(sequence, range)
@@ -203,9 +209,12 @@ defmodule Indexer.Block.Catchup.Fetcher do
         error

       {:error, {step, failed_value, _changes_so_far}} = error ->
-        Logger.error(fn ->
-          "failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying"
-        end)
+        Logger.error(
+          fn ->
+            ["failed to insert: ", inspect(failed_value), ". Retrying."]
+          end,
+          step: step
+        )

         push_back(sequence, range)
@@ -213,7 +222,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
     end
   end

-  defp cap_seq(seq, errors, range) do
+  defp cap_seq(seq, errors) do
     {not_founds, other_errors} =
       Enum.split_with(errors, fn
         %{code: 404, data: %{number: _}} -> true
@@ -222,10 +231,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
     case not_founds do
       [] ->
-        Logger.debug(fn ->
-          first_block_number..last_block_number = range
-          "got blocks #{first_block_number} - #{last_block_number}"
-        end)
+        Logger.debug("got blocks")

         other_errors
@@ -239,7 +245,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
   defp push_back(sequence, range) do
     case Sequence.push_back(sequence, range) do
       :ok -> :ok
-      {:error, reason} -> Logger.error(fn -> ["Could not push block range to back to Sequence: ", inspect(reason)] end)
+      {:error, reason} -> Logger.error(fn -> ["Could not push back to Sequence: ", inspect(reason)] end)
     end
   end
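
Two metadata scopes are used in this module: `Logger.metadata/1` merges keys into the calling process's metadata for every subsequent log call, while a keyword list passed as the second argument to `Logger.debug/2` or `Logger.error/2` applies to that single message only (and wins over process metadata on conflicting keys). A minimal sketch of the two scopes (values are illustrative):

    Logger.metadata(fetcher: :block_catchup)                   # applies to all later logs in this process
    Logger.error(fn -> "failed to fetch" end, step: :blocks)   # :step attaches to this message only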

apps/indexer/lib/indexer/block/fetcher/receipts.ex

@@ -13,7 +13,7 @@ defmodule Indexer.Block.Fetcher.Receipts do
         %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state,
         transaction_params
       ) do
-    Logger.debug(fn -> "fetching #{length(transaction_params)} transaction receipts" end)
+    Logger.debug("fetching transaction receipts", count: Enum.count(transaction_params))
     stream_opts = [max_concurrency: state.receipts_concurrency, timeout: :infinity]

     transaction_params

apps/indexer/lib/indexer/block/realtime/fetcher.ex

@@ -159,8 +159,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
   @decorate trace(name: "fetch", resource: "Indexer.Block.Realtime.Fetcher.fetch_and_import_block/3", tracer: Tracer)
   def fetch_and_import_block(block_number_to_fetch, block_fetcher, reorg?, retry \\ 3) do
-    Logger.metadata(fetcher: :block_realtime)
-
+    Indexer.Logger.metadata(
+      fn ->
         if reorg? do
           # give previous fetch attempt (for same block number) a chance to finish
           # before fetching again, to reduce block consensus mistakes
@@ -168,6 +168,10 @@ defmodule Indexer.Block.Realtime.Fetcher do
         end

         do_fetch_and_import_block(block_number_to_fetch, block_fetcher, retry)
+      end,
+      fetcher: :block_realtime,
+      block_number: block_number_to_fetch
+    )
   end

   @decorate span(tracer: Tracer)
@@ -179,33 +183,28 @@ defmodule Indexer.Block.Realtime.Fetcher do
           Task.Supervisor.start_child(TaskSupervisor, ConsensusEnsurer, :perform, args)
         end

-        Logger.debug(fn ->
-          ["fetched and imported block ", to_string(block_number_to_fetch)]
-        end)
+        Logger.debug("Fetched and imported.")

       {:ok, %{inserted: _, errors: [_ | _] = errors}} ->
         Logger.error(fn ->
           [
-            "failed to fetch block ",
-            to_string(block_number_to_fetch),
-            ": ",
+            "failed to fetch block: ",
             inspect(errors),
             ". Block will be retried by catchup indexer."
           ]
         end)

       {:error, {step, reason}} ->
-        Logger.error(fn ->
-          [
-            "failed to fetch ",
-            to_string(step),
-            " for block ",
-            to_string(block_number_to_fetch),
-            ": ",
-            inspect(reason),
-            ". Block will be retried by catchup indexer."
-          ]
-        end)
+        Logger.error(
+          fn ->
+            [
+              "failed to fetch: ",
+              inspect(reason),
+              ". Block will be retried by catchup indexer."
+            ]
+          end,
+          step: step
+        )

       {:error, [%Changeset{} | _] = changesets} ->
         params = %{
@@ -228,17 +227,16 @@ defmodule Indexer.Block.Realtime.Fetcher do
         end

       {:error, {step, failed_value, _changes_so_far}} ->
-        Logger.error(fn ->
-          [
-            "failed to insert ",
-            to_string(step),
-            " for block ",
-            to_string(block_number_to_fetch),
-            ": ",
-            inspect(failed_value),
-            ". Block will be retried by catchup indexer."
-          ]
-        end)
+        Logger.error(
+          fn ->
+            [
+              "failed to insert: ",
+              inspect(failed_value),
+              ". Block will be retried by catchup indexer."
+            ]
+          end,
+          step: step
+        )
     end
   end

apps/indexer/lib/indexer/block/uncle/fetcher.ex

@@ -74,16 +74,22 @@ defmodule Indexer.Block.Uncle.Fetcher do
     # the same block could be included as an uncle on multiple blocks, but we only want to fetch it once
     unique_hashes = Enum.uniq(hashes)

-    Logger.debug(fn -> "fetching #{length(unique_hashes)}" end)
+    unique_hash_count = Enum.count(unique_hashes)
+    Logger.metadata(count: unique_hash_count)
+
+    Logger.debug("fetching")

     case EthereumJSONRPC.fetch_blocks_by_hash(unique_hashes, json_rpc_named_arguments) do
       {:ok, blocks} ->
         run_blocks(blocks, block_fetcher, unique_hashes)

       {:error, reason} ->
-        Logger.error(fn ->
-          ["failed to fetch ", unique_hashes |> length |> to_string(), ": ", inspect(reason)]
-        end)
+        Logger.error(
+          fn ->
+            ["failed to fetch: ", inspect(reason)]
+          end,
+          error_count: unique_hash_count
+        )

         {:retry, unique_hashes}
     end
@@ -110,19 +116,13 @@ defmodule Indexer.Block.Uncle.Fetcher do
            transactions: %{params: transactions_params, on_conflict: :nothing}
          }) do
       {:ok, _} ->
-        retry(errors, original_entries)
+        retry(errors)

       {:error, step, failed_value, _changes_so_far} ->
-        Logger.error(fn ->
-          [
-            "failed to import ",
-            original_entries |> length() |> to_string(),
-            " in step ",
-            inspect(step),
-            ": ",
-            inspect(failed_value)
-          ]
-        end)
+        Logger.error(fn -> ["failed to import: ", inspect(failed_value)] end,
+          step: step,
+          error_count: Enum.count(original_entries)
+        )

         {:retry, original_entries}
     end
@@ -185,21 +185,20 @@ defmodule Indexer.Block.Uncle.Fetcher do
     end)
   end

-  defp retry([], _), do: :ok
+  defp retry([]), do: :ok

-  defp retry(errors, original_entries) when is_list(errors) do
+  defp retry(errors) when is_list(errors) do
     retried_entries = errors_to_entries(errors)

-    Logger.error(fn ->
-      [
-        "failed to fetch ",
-        retried_entries |> length() |> to_string(),
-        "/",
-        original_entries |> length() |> to_string(),
-        ": ",
-        errors_to_iodata(errors)
-      ]
-    end)
+    Logger.error(
+      fn ->
+        [
+          "failed to fetch: ",
+          errors_to_iodata(errors)
+        ]
+      end,
+      error_count: Enum.count(retried_entries)
+    )
   end

   defp errors_to_entries(errors) when is_list(errors) do

apps/indexer/lib/indexer/coin_balance/fetcher.ex

@@ -74,7 +74,10 @@ defmodule Indexer.CoinBalance.Fetcher do
     # `{address, block}`, so take unique params only
     unique_entries = Enum.uniq(entries)

-    Logger.debug(fn -> ["fetching ", unique_entries |> length() |> to_string()] end)
+    unique_entry_count = Enum.count(unique_entries)
+    Logger.metadata(count: unique_entry_count)
+
+    Logger.debug(fn -> "fetching" end)

     unique_entries
     |> Enum.map(&entry_to_params/1)
@@ -84,9 +87,12 @@ defmodule Indexer.CoinBalance.Fetcher do
         run_fetched_balances(fetched_balances, unique_entries)

       {:error, reason} ->
-        Logger.error(fn ->
-          ["failed to fetch ", unique_entries |> length() |> to_string(), ": ", inspect(reason)]
-        end)
+        Logger.error(
+          fn ->
+            ["failed to fetch: ", inspect(reason)]
+          end,
+          error_count: unique_entry_count
+        )

         {:retry, unique_entries}
     end
@@ -115,7 +121,7 @@ defmodule Indexer.CoinBalance.Fetcher do
   defp run_fetched_balances(%FetchedBalances{params_list: []}, original_entries), do: {:retry, original_entries}

-  defp run_fetched_balances(%FetchedBalances{params_list: params_list, errors: errors}, original_entries) do
+  defp run_fetched_balances(%FetchedBalances{params_list: params_list, errors: errors}, _) do
     value_fetched_at = DateTime.utc_now()

     importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at))
@@ -128,24 +134,23 @@ defmodule Indexer.CoinBalance.Fetcher do
         address_coin_balances: %{params: importable_balances_params}
       })

-    retry(errors, original_entries)
+    retry(errors)
   end

-  defp retry([], _), do: :ok
+  defp retry([]), do: :ok

-  defp retry(errors, original_entries) when is_list(errors) do
+  defp retry(errors) when is_list(errors) do
     retried_entries = fetched_balances_errors_to_entries(errors)

-    Logger.error(fn ->
-      [
-        "failed to fetch ",
-        retried_entries |> length() |> to_string(),
-        "/",
-        original_entries |> length() |> to_string(),
-        ": ",
-        fetched_balance_errors_to_iodata(errors)
-      ]
-    end)
+    Logger.error(
+      fn ->
+        [
+          "failed to fetch: ",
+          fetched_balance_errors_to_iodata(errors)
+        ]
+      end,
+      error_count: Enum.count(retried_entries)
+    )

     {:retry, retried_entries}
   end

apps/indexer/lib/indexer/internal_transaction/fetcher.ex

@@ -103,7 +103,10 @@ defmodule Indexer.InternalTransaction.Fetcher do
   def run(entries, json_rpc_named_arguments) do
     unique_entries = unique_entries(entries)

-    Logger.debug(fn -> "fetching internal transactions for #{length(unique_entries)} transactions" end)
+    unique_entries_count = Enum.count(unique_entries)
+    Logger.metadata(count: unique_entries_count)
+
+    Logger.debug("fetching internal transactions for transactions")

     unique_entries
     |> Enum.map(&params/1)
@@ -128,25 +131,25 @@ defmodule Indexer.InternalTransaction.Fetcher do
          })
     else
       {:error, step, reason, _changes_so_far} ->
-        Logger.error(fn ->
-          [
-            "failed to import internal transactions for ",
-            to_string(length(entries)),
-            " transactions at ",
-            to_string(step),
-            ": ",
-            inspect(reason)
-          ]
-        end)
+        Logger.error(
+          fn ->
+            [
+              "failed to import internal transactions for transactions: ",
+              inspect(reason)
+            ]
+          end,
+          step: step,
+          error_count: unique_entries_count
+        )

         # re-queue the de-duped entries
         {:retry, unique_entries}
     end

       {:error, reason} ->
-        Logger.error(fn ->
-          "failed to fetch internal transactions for #{length(entries)} transactions: #{inspect(reason)}"
-        end)
+        Logger.error(fn -> ["failed to fetch internal transactions for transactions: ", inspect(reason)] end,
+          error_count: unique_entries_count
+        )

         # re-queue the de-duped entries
         {:retry, unique_entries}

apps/indexer/lib/indexer/logger.ex

@@ -1,8 +1,22 @@
 defmodule Indexer.Logger do
   @moduledoc """
-  Helpers for formatting `Logger` data as `t:iodata/0`.
+  Helpers for `Logger`.
   """

+  @doc """
+  Sets `keyword` in `Logger.metadata/1` around `fun`.
+  """
+  def metadata(fun, keyword) when is_function(fun, 0) and is_list(keyword) do
+    metadata_before = Logger.metadata()
+
+    try do
+      Logger.metadata(keyword)
+      fun.()
+    after
+      Logger.reset_metadata(metadata_before)
+    end
+  end
+
   @doc """
   The PID and its registered name (if it has one) as `t:iodata/0`.
   """

apps/indexer/lib/indexer/token_balance/fetcher.ex

@@ -93,10 +93,11 @@ defmodule Indexer.TokenBalance.Fetcher do
   end

   def fetch_from_blockchain(params_list) do
-    {:ok, token_balances} =
-      params_list
-      |> Enum.filter(&(&1.retries_count <= @max_retries))
-      |> TokenBalances.fetch_token_balances_from_blockchain()
+    retryable_params_list = Enum.filter(params_list, &(&1.retries_count <= @max_retries))
+
+    Logger.metadata(count: Enum.count(retryable_params_list))
+
+    {:ok, token_balances} = TokenBalances.fetch_token_balances_from_blockchain(retryable_params_list)

     token_balances
   end
@@ -116,7 +117,9 @@ defmodule Indexer.TokenBalance.Fetcher do
         :ok

       {:error, reason} ->
-        Logger.debug(fn -> "failed to import #{length(token_balances_params)} token balances, #{inspect(reason)}" end)
+        Logger.debug(fn -> ["failed to import token balances: ", inspect(reason)] end,
+          error_count: Enum.count(token_balances_params)
+        )

         :error
     end

apps/indexer/lib/indexer/token_balances.ex

@@ -34,7 +34,7 @@ defmodule Indexer.TokenBalances do
   @decorate span(tracer: Tracer)
   def fetch_token_balances_from_blockchain(token_balances, opts \\ []) do
-    Logger.debug(fn -> "fetching #{Enum.count(token_balances)} token balances" end)
+    Logger.debug("fetching token balances", count: Enum.count(token_balances))

     task_timeout = Keyword.get(opts, :timeout, @task_timeout)

config/config.exs

@@ -32,19 +32,25 @@ config :logger,
 config :logger, :console,
   # Use same format for all loggers, even though the level should only ever be `:error` for `:error` backend
   format: "$dateT$time $metadata[$level] $message\n",
-  metadata: ~w(application fetcher request_id)a
+  metadata:
+    ~w(application fetcher request_id first_block_number last_block_number missing_block_range_count missing_block_count
+       block_number step count error_count shrunk)a

 config :logger, :ecto,
   # Use same format for all loggers, even though the level should only ever be `:error` for `:error` backend
   format: "$dateT$time $metadata[$level] $message\n",
-  metadata: ~w(application fetcher request_id)a,
+  metadata:
+    ~w(application fetcher request_id first_block_number last_block_number missing_block_range_count missing_block_count
+       block_number step count error_count shrunk)a,
   metadata_filter: [application: :ecto]

 config :logger, :error,
   # Use same format for all loggers, even though the level should only ever be `:error` for `:error` backend
   format: "$dateT$time $metadata[$level] $message\n",
   level: :error,
-  metadata: ~w(application fetcher request_id)a
+  metadata:
+    ~w(application fetcher request_id first_block_number last_block_number missing_block_range_count missing_block_count
+       block_number step count error_count shrunk)a

 # Import environment specific config. This must remain at the bottom
 # of this file so it overrides the configuration defined above.
