[PAN-2786] Stop Transaction Pool Queue from Growing Unbounded (#1586)

* [PAN-2786] Stop Transaction Pool Queue from Growing Unbounded

- use a `ArrayBlockingQueue` with a fixed size to limit the transaction task queue
- expose a method in `MonitoredExecutors` to create a working queue with a maximum capacity
- update `EthScheduler` to use a limited working queue for the `txWorkerExecutor`

* [PAN-2786] Implement a bounded timed queue

- implement a custom bounded queue
- use a time based policy with keep alive configuration
- implement eviction process based on the policy
- add metrics

* use field instead of parameter

* fix PR pass 1

- use concrete class instead of interface
- change metric name to comply with global policy
- update unit test
- wrap `Runnable` into `scheduleTxWorkerTask`

* fix PR

- remove time based policy
- use raw `Runnable`
- make a room for a new element at full capacity

* Update BoundedQueueTest.java

invert condition

* fix PR comments

- remove Mock class
- make logic more thread safe, avoid race condition
- remove element until the new one is accepted

* Update ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java

Co-Authored-By: Adrian Sutton <adrian@symphonious.net>

* spotless apply

* fix nit comments

- use assertj assertions for better readability
- improve unit test

* spotless apply

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Abdelhamid Bakhta 5 years ago committed by GitHub
parent 04069f20c2
commit 395ccae15e
  1. 19
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
  2. 30
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/MonitoredExecutors.java
  3. 44
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueue.java
  4. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java
  5. 1
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java
  6. 53
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueueTest.java

@ -12,6 +12,7 @@
*/ */
package tech.pegasys.pantheon.ethereum.eth.manager; package tech.pegasys.pantheon.ethereum.eth.manager;
import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newBoundedThreadPool;
import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newCachedThreadPool; import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newCachedThreadPool;
import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newFixedThreadPool; import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newFixedThreadPool;
import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newScheduledThreadPool; import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newScheduledThreadPool;
@ -45,9 +46,9 @@ public class EthScheduler {
private static final Logger LOG = LogManager.getLogger(); private static final Logger LOG = LogManager.getLogger();
private final Duration defaultTimeout = Duration.ofSeconds(5); private final Duration defaultTimeout = Duration.ofSeconds(5);
private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false);
private final CountDownLatch shutdown = new CountDownLatch(1); private final CountDownLatch shutdown = new CountDownLatch(1);
private static final int TX_WORKER_CAPACITY = 1000000;
protected final ExecutorService syncWorkerExecutor; protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler; protected final ScheduledExecutorService scheduler;
@ -62,12 +63,24 @@ public class EthScheduler {
final int txWorkerCount, final int txWorkerCount,
final int computationWorkerCount, final int computationWorkerCount,
final MetricsSystem metricsSystem) { final MetricsSystem metricsSystem) {
this(syncWorkerCount, txWorkerCount, TX_WORKER_CAPACITY, computationWorkerCount, metricsSystem);
}
public EthScheduler(
final int syncWorkerCount,
final int txWorkerCount,
final int txWorkerQueueSize,
final int computationWorkerCount,
final MetricsSystem metricsSystem) {
this( this(
newFixedThreadPool( newFixedThreadPool(
EthScheduler.class.getSimpleName() + "-Workers", syncWorkerCount, metricsSystem), EthScheduler.class.getSimpleName() + "-Workers", syncWorkerCount, metricsSystem),
newScheduledThreadPool(EthScheduler.class.getSimpleName() + "-Timer", 1, metricsSystem), newScheduledThreadPool(EthScheduler.class.getSimpleName() + "-Timer", 1, metricsSystem),
newFixedThreadPool( newBoundedThreadPool(
EthScheduler.class.getSimpleName() + "-Transactions", txWorkerCount, metricsSystem), EthScheduler.class.getSimpleName() + "-Transactions",
txWorkerCount,
txWorkerQueueSize,
metricsSystem),
newCachedThreadPool(EthScheduler.class.getSimpleName() + "-Services", metricsSystem), newCachedThreadPool(EthScheduler.class.getSimpleName() + "-Services", metricsSystem),
newFixedThreadPool( newFixedThreadPool(
EthScheduler.class.getSimpleName() + "-Computation", EthScheduler.class.getSimpleName() + "-Computation",

@ -12,11 +12,13 @@
*/ */
package tech.pegasys.pantheon.ethereum.eth.manager; package tech.pegasys.pantheon.ethereum.eth.manager;
import tech.pegasys.pantheon.ethereum.eth.manager.bounded.BoundedQueue;
import tech.pegasys.pantheon.metrics.Counter; import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.PantheonMetricCategory; import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
@ -35,6 +37,26 @@ public class MonitoredExecutors {
public static ExecutorService newFixedThreadPool( public static ExecutorService newFixedThreadPool(
final String name, final int workerCount, final MetricsSystem metricsSystem) { final String name, final int workerCount, final MetricsSystem metricsSystem) {
return newFixedThreadPool(name, workerCount, new LinkedBlockingQueue<>(), metricsSystem);
}
public static ExecutorService newBoundedThreadPool(
final String name,
final int workerCount,
final int queueSize,
final MetricsSystem metricsSystem) {
return newFixedThreadPool(
name,
workerCount,
new BoundedQueue(queueSize, toMetricName(name), metricsSystem),
metricsSystem);
}
public static ExecutorService newFixedThreadPool(
final String name,
final int workerCount,
final BlockingQueue<Runnable> workingQueue,
final MetricsSystem metricsSystem) {
return newMonitoredExecutor( return newMonitoredExecutor(
name, name,
metricsSystem, metricsSystem,
@ -44,7 +66,7 @@ public class MonitoredExecutors {
workerCount, workerCount,
0L, 0L,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), workingQueue,
threadFactory, threadFactory,
rejectedExecutionHandler)); rejectedExecutionHandler));
} }
@ -79,7 +101,7 @@ public class MonitoredExecutors {
final MetricsSystem metricsSystem, final MetricsSystem metricsSystem,
final BiFunction<RejectedExecutionHandler, ThreadFactory, T> creator) { final BiFunction<RejectedExecutionHandler, ThreadFactory, T> creator) {
final String metricName = name.toLowerCase(Locale.US).replace('-', '_'); final String metricName = toMetricName(name);
final T executor = final T executor =
creator.apply( creator.apply(
@ -119,6 +141,10 @@ public class MonitoredExecutors {
return executor; return executor;
} }
private static String toMetricName(final String name) {
return name.toLowerCase(Locale.US).replace('-', '_');
}
private static class CountingAbortPolicy extends AbortPolicy { private static class CountingAbortPolicy extends AbortPolicy {
private final Counter rejectedTaskCounter; private final Counter rejectedTaskCounter;

@ -0,0 +1,44 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.manager.bounded;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
import java.util.concurrent.LinkedBlockingDeque;
public class BoundedQueue extends LinkedBlockingDeque<Runnable> {
private final MetricsSystem metricsSystem;
private final Counter totalEvictedTaskCounter;
public BoundedQueue(
final int capacity, final String metricName, final MetricsSystem metricsSystem) {
super(capacity);
this.metricsSystem = metricsSystem;
this.totalEvictedTaskCounter =
this.metricsSystem.createCounter(
PantheonMetricCategory.EXECUTORS,
metricName + "_dropped_tasks_total",
"Total number of tasks rejected by this working queue.");
}
@Override
public boolean offer(final Runnable task) {
while (!super.offer(task)) {
remove();
totalEvictedTaskCounter.inc();
}
return true;
}
}

@ -69,10 +69,6 @@ public class DeterministicEthScheduler extends EthScheduler {
return (MockScheduledExecutor) scheduler; return (MockScheduledExecutor) scheduler;
} }
MockScheduledExecutor mockTransactionsExecutor() {
return (MockScheduledExecutor) txWorkerExecutor;
}
public MockExecutorService mockServiceExecutor() { public MockExecutorService mockServiceExecutor() {
return (MockExecutorService) servicesExecutor; return (MockExecutorService) servicesExecutor;
} }

@ -30,6 +30,7 @@ import java.util.stream.Collectors;
public class MockExecutorService implements ExecutorService { public class MockExecutorService implements ExecutorService {
private boolean autoRun = true; private boolean autoRun = true;
private final List<ExecutorTask<?>> tasks = new ArrayList<>(); private final List<ExecutorTask<?>> tasks = new ArrayList<>();
// Test utility for inspecting executor's futures // Test utility for inspecting executor's futures

@ -0,0 +1,53 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.manager.bounded;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import org.junit.Test;
public class BoundedQueueTest {
@Test
public void offerShouldAcceptNewElements() {
int size = 10;
final BoundedQueue queue = new BoundedQueue(size, "test", new NoOpMetricsSystem());
for (int i = 0; i < size; i++) {
final Runnable task = () -> {};
assertThat(queue.offer(task)).isTrue();
assertThat(queue).contains(task);
assertThat(queue.size()).isEqualTo(i + 1);
}
}
@Test
public void offerShouldMakeARoomAndAcceptNewElementAtFullCapacity() {
final BoundedQueue queue = new BoundedQueue(2, "test", new NoOpMetricsSystem());
final Runnable task1 = () -> {};
final Runnable task2 = () -> {};
final Runnable task3 = () -> {};
assertThat(queue.offer(task1)).isTrue();
assertThat(queue.size()).isEqualTo(1);
assertThat(queue).contains(task1);
assertThat(queue.offer(task2)).isTrue();
assertThat(queue.size()).isEqualTo(2);
assertThat(queue).contains(task2);
assertThat(queue.offer(task3)).isTrue();
assertThat(queue).doesNotContain(task1);
assertThat(queue).contains(task2);
assertThat(queue).contains(task3);
assertThat(queue.size()).isEqualTo(2);
}
}
Loading…
Cancel
Save