Fetchers graceful shutdown

pull/5720/head
Qwerty5Uiop 2 years ago
parent e8b55afd92
commit 2d451a7c05
  1. 1
      CHANGELOG.md
  2. 15
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  3. 42
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  4. 22
      apps/indexer/lib/indexer/supervisor.ex
  5. 3
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  6. 1
      docker-compose/docker-compose-no-build-erigon.yml
  7. 1
      docker-compose/docker-compose-no-build-ganache.yml
  8. 1
      docker-compose/docker-compose-no-build-geth.yml
  9. 1
      docker-compose/docker-compose-no-build-hardhat-network.yml
  10. 1
      docker-compose/docker-compose-no-build-nethermind.yml
  11. 1
      docker-compose/docker-compose-no-build-no-db-container.yml
  12. 1
      docker-compose/docker-compose-no-rust-services.yml
  13. 1
      docker-compose/docker-compose.yml

@ -7,6 +7,7 @@
- [#6897](https://github.com/blockscout/blockscout/pull/6897) - Support basic auth in JSON RPC endpoint
- [#6908](https://github.com/blockscout/blockscout/pull/6908) - Allow disable API rate limit
- [#6951](https://github.com/blockscout/blockscout/pull/6951) - Set poll: true for TokenInstance fetcher
- [#5720](https://github.com/blockscout/blockscout/pull/5720) - Fetchers graceful shutdown
### Fixes

@ -25,12 +25,13 @@ defmodule Indexer.Block.Catchup.Fetcher do
alias Explorer.Chain
alias Explorer.Utility.MissingBlockRange
alias Indexer.{Block, Tracer}
alias Indexer.Block.Catchup.Sequence
alias Indexer.Block.Catchup.{Sequence, TaskSupervisor}
alias Indexer.Memory.Shrinkable
alias Indexer.Prometheus
@behaviour Block.Fetcher
@shutdown_after :timer.minutes(5)
@sequence_name :block_catchup_sequencer
defstruct block_fetcher: nil,
@ -151,12 +152,13 @@ defmodule Indexer.Block.Catchup.Fetcher do
defp stream_fetch_and_import(state, sequence)
when is_pid(sequence) do
sequence
|> Sequence.build_stream()
|> Task.async_stream(
&fetch_and_import_range_from_sequence(state, &1, sequence),
ranges = Sequence.build_stream(sequence)
TaskSupervisor
|> Task.Supervisor.async_stream(ranges, &fetch_and_import_range_from_sequence(state, &1, sequence),
max_concurrency: blocks_concurrency(),
timeout: :infinity
timeout: :infinity,
shutdown: @shutdown_after
)
|> Stream.run()
end
@ -173,6 +175,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
sequence
) do
Logger.metadata(fetcher: :block_catchup, first_block_number: first, last_block_number: last)
Process.flag(:trap_exit, true)
{fetch_duration, result} = :timer.tc(fn -> fetch_and_import_range(block_fetcher, range) end)

@ -41,8 +41,17 @@ defmodule Indexer.Block.Realtime.Fetcher do
@minimum_safe_polling_period :timer.seconds(1)
@blocks_concurrency 100
@shutdown_after :timer.minutes(1)
@enforce_keys ~w(block_fetcher)a
defstruct ~w(block_fetcher subscription previous_number timer)a
defstruct block_fetcher: nil,
subscription: nil,
previous_number: nil,
timer: nil,
blocks_concurrency: @blocks_concurrency
@type t :: %__MODULE__{
block_fetcher: %Block.Fetcher{
@ -53,7 +62,9 @@ defmodule Indexer.Block.Realtime.Fetcher do
receipts_concurrency: pos_integer()
},
subscription: Subscription.t(),
previous_number: pos_integer() | nil
previous_number: pos_integer() | nil,
timer: reference(),
blocks_concurrency: pos_integer()
}
def start_link([arguments, gen_server_options]) do
@ -81,7 +92,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
block_fetcher: %Block.Fetcher{} = block_fetcher,
subscription: %Subscription{} = subscription,
previous_number: previous_number,
timer: timer
timer: timer,
blocks_concurrency: blocks_concurrency
} = state
)
when is_binary(quantity) do
@ -93,7 +105,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
# Subscriptions don't support getting all the blocks and transactions data,
# so we need to go back and get the full block
start_fetch_and_import(number, block_fetcher, previous_number)
start_fetch_and_import(number, block_fetcher, blocks_concurrency, previous_number)
Process.cancel_timer(timer)
new_timer = schedule_polling()
@ -111,13 +123,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
:poll_latest_block_number,
%__MODULE__{
block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher,
previous_number: previous_number
previous_number: previous_number,
blocks_concurrency: blocks_concurrency
} = state
) do
new_previous_number =
case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do
{:ok, number} when is_nil(previous_number) or number != previous_number ->
start_fetch_and_import(number, block_fetcher, previous_number)
start_fetch_and_import(number, block_fetcher, blocks_concurrency, previous_number)
number
@ -228,14 +241,19 @@ defmodule Indexer.Block.Realtime.Fetcher do
{:ok, []}
end
defp start_fetch_and_import(number, block_fetcher, previous_number) do
defp start_fetch_and_import(number, block_fetcher, blocks_concurrency, previous_number) do
start_at = determine_start_at(number, previous_number)
is_reorg = reorg?(number, previous_number)
for block_number_to_fetch <- start_at..number do
args = [block_number_to_fetch, block_fetcher, is_reorg]
Task.Supervisor.start_child(TaskSupervisor, __MODULE__, :fetch_and_import_block, args)
end
TaskSupervisor
|> Task.Supervisor.async_stream(
start_at..number,
&fetch_and_import_block(&1, block_fetcher, is_reorg),
max_concurrency: blocks_concurrency,
timeout: :infinity,
shutdown: @shutdown_after
)
|> Stream.run()
end
defp determine_start_at(number, nil), do: number
@ -260,6 +278,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
@decorate trace(name: "fetch", resource: "Indexer.Block.Realtime.Fetcher.fetch_and_import_block/3", tracer: Tracer)
def fetch_and_import_block(block_number_to_fetch, block_fetcher, reorg?, retry \\ 3) do
Process.flag(:trap_exit, true)
Indexer.Logger.metadata(
fn ->
if reorg? do

@ -96,15 +96,6 @@ defmodule Indexer.Supervisor do
[
# Root fetchers
{PendingTransaction.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments]]},
configure(Realtime.Supervisor, [
%{block_fetcher: realtime_block_fetcher, subscribe_named_arguments: realtime_subscribe_named_arguments},
[name: Realtime.Supervisor]
]),
{Catchup.Supervisor,
[
%{block_fetcher: block_fetcher, block_interval: block_interval, memory_monitor: memory_monitor},
[name: Catchup.Supervisor]
]},
# Async catchup fetchers
{UncleBlock.Supervisor, [[block_fetcher: block_fetcher, memory_monitor: memory_monitor]]},
@ -139,7 +130,18 @@ defmodule Indexer.Supervisor do
{BlocksTransactionsMismatch.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{PendingOpsCleaner, [[], []]},
{PendingBlockOperationsSanitizer, [[]]}
{PendingBlockOperationsSanitizer, [[]]},
# Block fetchers
configure(Realtime.Supervisor, [
%{block_fetcher: realtime_block_fetcher, subscribe_named_arguments: realtime_subscribe_named_arguments},
[name: Realtime.Supervisor]
]),
{Catchup.Supervisor,
[
%{block_fetcher: block_fetcher, block_interval: block_interval, memory_monitor: memory_monitor},
[name: Catchup.Supervisor]
]}
]
|> List.flatten()

@ -144,6 +144,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
} do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
Application.put_env(:indexer, :block_ranges, "0..1")
start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -300,6 +301,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
} do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
Application.put_env(:indexer, :block_ranges, "0..1")
start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -453,6 +455,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
} do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
Application.put_env(:indexer, :block_ranges, "0..1")
start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)

@ -20,6 +20,7 @@ services:
image: blockscout/blockscout:${DOCKER_TAG:-latest}
pull_policy: always
restart: always
stop_grace_period: 5m
container_name: 'blockscout'
links:
- db:database

@ -20,6 +20,7 @@ services:
image: blockscout/blockscout:${DOCKER_TAG:-latest}
pull_policy: always
restart: always
stop_grace_period: 5m
container_name: 'blockscout'
links:
- db:database

@ -20,6 +20,7 @@ services:
image: blockscout/blockscout:${DOCKER_TAG:-latest}
pull_policy: always
restart: always
stop_grace_period: 5m
container_name: 'blockscout'
links:
- db:database

@ -20,6 +20,7 @@ services:
image: blockscout/blockscout:${DOCKER_TAG:-latest}
pull_policy: always
restart: always
stop_grace_period: 5m
container_name: 'blockscout'
links:
- db:database

@ -20,6 +20,7 @@ services:
image: blockscout/blockscout:${DOCKER_TAG:-latest}
pull_policy: always
restart: always
stop_grace_period: 5m
container_name: 'blockscout'
links:
- db:database

@ -14,6 +14,7 @@ services:
image: blockscout/blockscout:${DOCKER_TAG:-latest}
pull_policy: always
restart: always
stop_grace_period: 5m
container_name: 'blockscout'
command: bash -c "bin/blockscout eval \"Elixir.Explorer.ReleaseTasks.create_and_migrate()\" && bin/blockscout start"
extra_hosts:

@ -30,6 +30,7 @@ services:
WOBSERVER_ENABLED: "false"
ADMIN_PANEL_ENABLED: ""
restart: always
stop_grace_period: 5m
container_name: 'blockscout'
links:
- db:database

@ -31,6 +31,7 @@ services:
WOBSERVER_ENABLED: "false"
ADMIN_PANEL_ENABLED: ""
restart: always
stop_grace_period: 5m
container_name: 'blockscout'
links:
- db:database

Loading…
Cancel
Save