Merge pull request #2899 from poanetwork/ab-fix-empty-buffered-task

fix empty buffered task
pull/2932/head
Victor Baranov 5 years ago committed by GitHub
commit 768ce9a85a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 10
      apps/indexer/lib/indexer/buffered_task.ex
  3. 1
      apps/indexer/lib/indexer/fetcher/internal_transaction.ex
  4. 3
      apps/indexer/test/support/indexer/fetcher/internal_transaction_supervisor_case.ex

@ -8,6 +8,7 @@
- [#2906](https://github.com/poanetwork/blockscout/pull/2906) - fix address sum cache - [#2906](https://github.com/poanetwork/blockscout/pull/2906) - fix address sum cache
- [#2902](https://github.com/poanetwork/blockscout/pull/2902) - Offset in blocks retrieval for average block time - [#2902](https://github.com/poanetwork/blockscout/pull/2902) - Offset in blocks retrieval for average block time
- [#2900](https://github.com/poanetwork/blockscout/pull/2900) - check fetched instance metadata in multiple places - [#2900](https://github.com/poanetwork/blockscout/pull/2900) - check fetched instance metadata in multiple places
- [#2899](https://github.com/poanetwork/blockscout/pull/2899) - fix empty buffered task
### Chore ### Chore
- [#2896](https://github.com/poanetwork/blockscout/pull/2896) - Disable Parity websockets tests - [#2896](https://github.com/poanetwork/blockscout/pull/2896) - Disable Parity websockets tests

@ -8,6 +8,7 @@ defmodule Indexer.BufferedTask do
* `:flush_interval` - The interval in milliseconds to flush the buffer. * `:flush_interval` - The interval in milliseconds to flush the buffer.
* `:max_concurrency` - The maximum number of tasks to run concurrently at any give time. * `:max_concurrency` - The maximum number of tasks to run concurrently at any give time.
* `:poll` - poll for new records when all records are processed
* `:max_batch_size` - The maximum batch passed to `c:run/2`. * `:max_batch_size` - The maximum batch passed to `c:run/2`.
* `:memory_monitor` - The `Indexer.Memory.Monitor` `t:GenServer.server/0` to register as * `:memory_monitor` - The `Indexer.Memory.Monitor` `t:GenServer.server/0` to register as
`Indexer.Memory.Monitor.shrinkable/0` with. `Indexer.Memory.Monitor.shrinkable/0` with.
@ -70,6 +71,7 @@ defmodule Indexer.BufferedTask do
flush_interval: nil, flush_interval: nil,
max_batch_size: nil, max_batch_size: nil,
max_concurrency: nil, max_concurrency: nil,
poll: false,
metadata: [], metadata: [],
current_buffer: [], current_buffer: [],
bound_queue: %BoundQueue{}, bound_queue: %BoundQueue{},
@ -229,6 +231,7 @@ defmodule Indexer.BufferedTask do
state = %BufferedTask{ state = %BufferedTask{
callback_module: callback_module, callback_module: callback_module,
callback_module_state: Keyword.fetch!(opts, :state), callback_module_state: Keyword.fetch!(opts, :state),
poll: Keyword.get(opts, :poll, false),
task_supervisor: Keyword.fetch!(opts, :task_supervisor), task_supervisor: Keyword.fetch!(opts, :task_supervisor),
flush_interval: Keyword.fetch!(opts, :flush_interval), flush_interval: Keyword.fetch!(opts, :flush_interval),
max_batch_size: Keyword.fetch!(opts, :max_batch_size), max_batch_size: Keyword.fetch!(opts, :max_batch_size),
@ -434,7 +437,12 @@ defmodule Indexer.BufferedTask do
end end
end end
# was shrunk and out of work, get more work from `init/2` # get more work from `init/2`
defp schedule_next(%BufferedTask{poll: true, bound_queue: %BoundQueue{size: 0}} = state) do
do_initial_stream(state)
end
# was shrunk and was out of work, get more work from `init/2`
defp schedule_next(%BufferedTask{bound_queue: %BoundQueue{size: 0, maximum_size: maximum_size}} = state) defp schedule_next(%BufferedTask{bound_queue: %BoundQueue{size: 0, maximum_size: maximum_size}} = state)
when maximum_size != nil do when maximum_size != nil do
Logger.info(fn -> Logger.info(fn ->

@ -26,6 +26,7 @@ defmodule Indexer.Fetcher.InternalTransaction do
flush_interval: :timer.seconds(3), flush_interval: :timer.seconds(3),
max_concurrency: @max_concurrency, max_concurrency: @max_concurrency,
max_batch_size: @max_batch_size, max_batch_size: @max_batch_size,
poll: true,
task_supervisor: Indexer.Fetcher.InternalTransaction.TaskSupervisor, task_supervisor: Indexer.Fetcher.InternalTransaction.TaskSupervisor,
metadata: [fetcher: :internal_transaction] metadata: [fetcher: :internal_transaction]
] ]

@ -7,7 +7,8 @@ defmodule Indexer.Fetcher.InternalTransaction.Supervisor.Case do
fetcher_arguments, fetcher_arguments,
flush_interval: 50, flush_interval: 50,
max_batch_size: 1, max_batch_size: 1,
max_concurrency: 1 max_concurrency: 1,
poll: false
) )
[merged_fetcher_arguments] [merged_fetcher_arguments]

Loading…
Cancel
Save