From 395ccae15ec6440641c505a1a9c0f86feca8f888 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta <45264458+abdelhamidbakhta@users.noreply.github.com> Date: Tue, 25 Jun 2019 10:48:11 +0200 Subject: [PATCH] [PAN-2786] Stop Transaction Pool Queue from Growing Unbounded (#1586) * [PAN-2786] Stop Transaction Pool Queue from Growing Unbounded - use a `ArrayBlockingQueue` with a fixed size to limit the transaction task queue - expose a method in `MonitoredExecutors` to create a working queue with a maximum capacity - update `EthScheduler` to use a limited working queue for the `txWorkerExecutor` * [PAN-2786] Implement a bounded timed queue - implement a custom bounded queue - use a time based policy with keep alive configuration - implement eviction process based on the policy - add metrics * use field instead of parameter * fix PR pass 1 - use concrete class instead of interface - change metric name to comply with global policy - update unit test - wrap `Runnable` into `scheduleTxWorkerTask` * fix PR - remove time based policy - use raw `Runnable` - make a room for a new element at full capacity * Update BoundedQueueTest.java invert condition * fix PR comments - remove Mock class - make logic more thread safe, avoid race condition - remove element until the new one is accepted * Update ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java Co-Authored-By: Adrian Sutton * spotless apply * fix nit comments - use assertj assertions for better readability - improve unit test * spotless apply Signed-off-by: Adrian Sutton --- .../ethereum/eth/manager/EthScheduler.java | 19 +++++-- .../eth/manager/MonitoredExecutors.java | 30 ++++++++++- .../eth/manager/bounded/BoundedQueue.java | 44 +++++++++++++++ .../manager/DeterministicEthScheduler.java | 4 -- .../eth/manager/MockExecutorService.java | 1 + .../eth/manager/bounded/BoundedQueueTest.java | 53 +++++++++++++++++++ 6 files changed, 142 insertions(+), 9 deletions(-) create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueue.java create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueueTest.java diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java index c36f5404a1..c03489fdc3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java @@ -12,6 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newBoundedThreadPool; import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newCachedThreadPool; import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newFixedThreadPool; import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newScheduledThreadPool; @@ -45,9 +46,9 @@ public class EthScheduler { private static final Logger LOG = LogManager.getLogger(); private final Duration defaultTimeout = Duration.ofSeconds(5); - private final AtomicBoolean stopped = new AtomicBoolean(false); private final CountDownLatch shutdown = new CountDownLatch(1); + private static final int TX_WORKER_CAPACITY = 1000000; protected final ExecutorService syncWorkerExecutor; protected final ScheduledExecutorService scheduler; @@ -62,12 +63,24 @@ public class EthScheduler { final int txWorkerCount, final int computationWorkerCount, final MetricsSystem metricsSystem) { + this(syncWorkerCount, txWorkerCount, TX_WORKER_CAPACITY, computationWorkerCount, metricsSystem); + } + + public EthScheduler( + final int syncWorkerCount, + final int txWorkerCount, + final int txWorkerQueueSize, + final int computationWorkerCount, + final MetricsSystem metricsSystem) { this( newFixedThreadPool( EthScheduler.class.getSimpleName() + "-Workers", syncWorkerCount, metricsSystem), newScheduledThreadPool(EthScheduler.class.getSimpleName() + "-Timer", 1, metricsSystem), - newFixedThreadPool( - EthScheduler.class.getSimpleName() + "-Transactions", txWorkerCount, metricsSystem), + newBoundedThreadPool( + EthScheduler.class.getSimpleName() + "-Transactions", + txWorkerCount, + txWorkerQueueSize, + metricsSystem), newCachedThreadPool(EthScheduler.class.getSimpleName() + "-Services", metricsSystem), newFixedThreadPool( EthScheduler.class.getSimpleName() + "-Computation", diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/MonitoredExecutors.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/MonitoredExecutors.java index 8a5f0cf91b..d2b7ff60fa 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/MonitoredExecutors.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/MonitoredExecutors.java @@ -12,11 +12,13 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import tech.pegasys.pantheon.ethereum.eth.manager.bounded.BoundedQueue; import tech.pegasys.pantheon.metrics.Counter; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.PantheonMetricCategory; import java.util.Locale; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; @@ -35,6 +37,26 @@ public class MonitoredExecutors { public static ExecutorService newFixedThreadPool( final String name, final int workerCount, final MetricsSystem metricsSystem) { + return newFixedThreadPool(name, workerCount, new LinkedBlockingQueue<>(), metricsSystem); + } + + public static ExecutorService newBoundedThreadPool( + final String name, + final int workerCount, + final int queueSize, + final MetricsSystem metricsSystem) { + return newFixedThreadPool( + name, + workerCount, + new BoundedQueue(queueSize, toMetricName(name), metricsSystem), + metricsSystem); + } + + public static ExecutorService newFixedThreadPool( + final String name, + final int workerCount, + final BlockingQueue workingQueue, + final MetricsSystem metricsSystem) { return newMonitoredExecutor( name, metricsSystem, @@ -44,7 +66,7 @@ public class MonitoredExecutors { workerCount, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), + workingQueue, threadFactory, rejectedExecutionHandler)); } @@ -79,7 +101,7 @@ public class MonitoredExecutors { final MetricsSystem metricsSystem, final BiFunction creator) { - final String metricName = name.toLowerCase(Locale.US).replace('-', '_'); + final String metricName = toMetricName(name); final T executor = creator.apply( @@ -119,6 +141,10 @@ public class MonitoredExecutors { return executor; } + private static String toMetricName(final String name) { + return name.toLowerCase(Locale.US).replace('-', '_'); + } + private static class CountingAbortPolicy extends AbortPolicy { private final Counter rejectedTaskCounter; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueue.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueue.java new file mode 100644 index 0000000000..85a5fd5a16 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueue.java @@ -0,0 +1,44 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.manager.bounded; + +import tech.pegasys.pantheon.metrics.Counter; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.PantheonMetricCategory; + +import java.util.concurrent.LinkedBlockingDeque; + +public class BoundedQueue extends LinkedBlockingDeque { + private final MetricsSystem metricsSystem; + private final Counter totalEvictedTaskCounter; + + public BoundedQueue( + final int capacity, final String metricName, final MetricsSystem metricsSystem) { + super(capacity); + this.metricsSystem = metricsSystem; + this.totalEvictedTaskCounter = + this.metricsSystem.createCounter( + PantheonMetricCategory.EXECUTORS, + metricName + "_dropped_tasks_total", + "Total number of tasks rejected by this working queue."); + } + + @Override + public boolean offer(final Runnable task) { + while (!super.offer(task)) { + remove(); + totalEvictedTaskCounter.inc(); + } + return true; + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java index d0b12be3c7..6073bb7ee1 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java @@ -69,10 +69,6 @@ public class DeterministicEthScheduler extends EthScheduler { return (MockScheduledExecutor) scheduler; } - MockScheduledExecutor mockTransactionsExecutor() { - return (MockScheduledExecutor) txWorkerExecutor; - } - public MockExecutorService mockServiceExecutor() { return (MockExecutorService) servicesExecutor; } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java index ed9cf6d8c6..f8f62b481c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java @@ -30,6 +30,7 @@ import java.util.stream.Collectors; public class MockExecutorService implements ExecutorService { private boolean autoRun = true; + private final List> tasks = new ArrayList<>(); // Test utility for inspecting executor's futures diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueueTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueueTest.java new file mode 100644 index 0000000000..0e5fbd784f --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/bounded/BoundedQueueTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.manager.bounded; + +import static org.assertj.core.api.Assertions.assertThat; + +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; + +import org.junit.Test; + +public class BoundedQueueTest { + + @Test + public void offerShouldAcceptNewElements() { + int size = 10; + final BoundedQueue queue = new BoundedQueue(size, "test", new NoOpMetricsSystem()); + for (int i = 0; i < size; i++) { + final Runnable task = () -> {}; + assertThat(queue.offer(task)).isTrue(); + assertThat(queue).contains(task); + assertThat(queue.size()).isEqualTo(i + 1); + } + } + + @Test + public void offerShouldMakeARoomAndAcceptNewElementAtFullCapacity() { + final BoundedQueue queue = new BoundedQueue(2, "test", new NoOpMetricsSystem()); + final Runnable task1 = () -> {}; + final Runnable task2 = () -> {}; + final Runnable task3 = () -> {}; + assertThat(queue.offer(task1)).isTrue(); + assertThat(queue.size()).isEqualTo(1); + assertThat(queue).contains(task1); + assertThat(queue.offer(task2)).isTrue(); + assertThat(queue.size()).isEqualTo(2); + assertThat(queue).contains(task2); + assertThat(queue.offer(task3)).isTrue(); + assertThat(queue).doesNotContain(task1); + assertThat(queue).contains(task2); + assertThat(queue).contains(task3); + assertThat(queue.size()).isEqualTo(2); + } +}