Configure block workers to run on their own queues

pull/2/head
Doc Ritezel 7 years ago
parent 1971309d22
commit 89bba0b6e9
  1. 3
      config/config.exs
  2. 3
      config/dev.exs
  3. 8
      config/prod.exs
  4. 2
      lib/explorer/importers/block_importer.ex
  5. 4
      lib/explorer/workers/import_block.ex
  6. 2
      lib/explorer/workers/import_transaction.ex

@ -44,7 +44,8 @@ config :exq,
start_on_application: false, start_on_application: false,
scheduler_enable: true, scheduler_enable: true,
shutdown_timeout: 5000, shutdown_timeout: 5000,
max_retries: 10 max_retries: 10,
queues: [{"default", 1}, {"blocks", 1}, {"transactions", 1}]
config :exq_ui, config :exq_ui,
server: false server: false

@ -62,7 +62,4 @@ config :explorer, Explorer.Scheduler,
[schedule: {:extended, "*/15 * * * * *"}, task: {Explorer.Workers.ImportSkippedBlocks, :perform_later, [1]}], [schedule: {:extended, "*/15 * * * * *"}, task: {Explorer.Workers.ImportSkippedBlocks, :perform_later, [1]}],
] ]
config :exq,
concurrency: 4
import_config "dev.secret.exs" import_config "dev.secret.exs"

@ -54,6 +54,10 @@ config :explorer, Explorer.Scheduler,
# Configure Exq # Configure Exq
config :exq, config :exq,
concurrency: String.to_integer(System.get_env("EXQ_CONCURRENCY") || "1"),
node_identifier: Explorer.ExqNodeIdentifier, node_identifier: Explorer.ExqNodeIdentifier,
url: System.get_env("REDIS_URL") url: System.get_env("REDIS_URL"),
queues: [
{"default", String.to_integer(System.get_env("EXQ_CONCURRENCY") || "1")},
{"blocks", String.to_integer(System.get_env("EXQ_BLOCKS_CONCURRENCY") || "1")},
{"transactions", String.to_integer(System.get_env("EXQ_TRANSACTIONS_CONCURRENCY") || "1")}
]

@ -7,6 +7,7 @@ defmodule Explorer.BlockImporter do
alias Explorer.Repo.NewRelic, as: Repo alias Explorer.Repo.NewRelic, as: Repo
alias Explorer.Workers.ImportTransaction alias Explorer.Workers.ImportTransaction
@dialyzer {:nowarn_function, import: 1}
def import(block_number) do def import(block_number) do
raw_block = download_block(block_number) raw_block = download_block(block_number)
changes = extract_block(raw_block) changes = extract_block(raw_block)
@ -19,6 +20,7 @@ defmodule Explorer.BlockImporter do
import_transactions(raw_block["transactions"]) import_transactions(raw_block["transactions"])
end end
@dialyzer {:nowarn_function, download_block: 1}
def download_block(block_number) do def download_block(block_number) do
{:ok, block} = eth_get_block_by_number(block_number, false) {:ok, block} = eth_get_block_by_number(block_number, false)
block block

@ -12,10 +12,10 @@ defmodule Explorer.Workers.ImportBlock do
def perform, do: perform("latest") def perform, do: perform("latest")
def perform_later("latest") do def perform_later("latest") do
Exq.enqueue(Exq.Enqueuer, "default", __MODULE__, ["latest"], max_retries: 0) Exq.enqueue(Exq.Enqueuer, "blocks", __MODULE__, ["latest"], max_retries: 0)
end end
def perform_later(number) do def perform_later(number) do
Exq.enqueue(Exq.Enqueuer, "default", __MODULE__, [number]) Exq.enqueue(Exq.Enqueuer, "blocks", __MODULE__, [number])
end end
end end

@ -9,6 +9,6 @@ defmodule Explorer.Workers.ImportTransaction do
end end
def perform_later(hash) do def perform_later(hash) do
Exq.enqueue(Exq.Enqueuer, "default", __MODULE__, [hash]) Exq.enqueue(Exq.Enqueuer, "transactions", __MODULE__, [hash])
end end
end end

Loading…
Cancel
Save