NC-1361 Added configurable refresh delay for SyncingSubscriptionService on start up (#383)

* WS sync subscription delay added

* WS sync subscription delay added with unit testing

* WS sync subscription delay added with unit testing

* changed number to a constant in constructor

* Use default from websocket class instead of making new one

* Removed magic numbers

* Made error message use const as well
Michael Connor 6 years ago committed by GitHub
parent b5df96c2ec
commit 7120c7744d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/PantheonNodeFactory.java
  2. 11
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/WebSocketConfiguration.java
  3. 14
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java
  4. 3
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java
  5. 7
      pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java
  6. 26
      pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java
  7. 22
      pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java

@ -61,12 +61,30 @@ public class PantheonNodeFactory {
name, createMiningParameters(true), createJsonRpcConfig(), createWebSocketConfig()));
}
public PantheonNode createMinerNodeWithCustomRefreshDelay(
final String name, final Long refreshDelay) throws IOException {
WebSocketConfiguration wsConfig = createWebSocketConfig();
wsConfig.setRefreshDelay(refreshDelay);
return create(
new PantheonFactoryConfiguration(
name, createMiningParameters(true), createJsonRpcConfig(), wsConfig));
}
public PantheonNode createArchiveNode(final String name) throws IOException {
return create(
new PantheonFactoryConfiguration(
name, createMiningParameters(false), createJsonRpcConfig(), createWebSocketConfig()));
}
public PantheonNode createArchiveNodeWithCustomRefreshDelay(
final String name, final Long refreshDelay) throws IOException {
WebSocketConfiguration wsConfig = createWebSocketConfig();
wsConfig.setRefreshDelay(refreshDelay);
return create(
new PantheonFactoryConfiguration(
name, createMiningParameters(false), createJsonRpcConfig(), wsConfig));
}
public PantheonNode createArchiveNodeWithRpcDisabled(final String name) throws IOException {
return create(
new PantheonFactoryConfiguration(

@ -27,11 +27,13 @@ public class WebSocketConfiguration {
public static final int DEFAULT_WEBSOCKET_PORT = 8546;
public static final Collection<RpcApi> DEFAULT_WEBSOCKET_APIS =
Arrays.asList(RpcApis.ETH, RpcApis.NET, RpcApis.WEB3);
public static final long DEFAULT_WEBSOCKET_REFRESH_DELAY = 5000;
private boolean enabled;
private int port;
private String host;
private Collection<RpcApi> rpcApis;
private long refreshDelay;
public static WebSocketConfiguration createDefault() {
final WebSocketConfiguration config = new WebSocketConfiguration();
@ -39,6 +41,7 @@ public class WebSocketConfiguration {
config.setHost(DEFAULT_WEBSOCKET_HOST);
config.setPort(DEFAULT_WEBSOCKET_PORT);
config.setRpcApis(DEFAULT_WEBSOCKET_APIS);
config.setRefreshDelay(DEFAULT_WEBSOCKET_REFRESH_DELAY);
return config;
}
@ -110,4 +113,12 @@ public class WebSocketConfiguration {
public int hashCode() {
return Objects.hashCode(enabled, port, host, rpcApis);
}
public void setRefreshDelay(final long refreshDelay) {
this.refreshDelay = refreshDelay;
}
public long getRefreshDelay() {
return refreshDelay;
}
}

@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.JsonRpcResult;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscribeRequest;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.UnsubscribeRequest;
@ -49,6 +50,15 @@ public class SubscriptionManager extends AbstractVerticle {
private final Map<Long, Subscription> subscriptions = new HashMap<>();
private final Map<String, List<Long>> connectionSubscriptionsMap = new HashMap<>();
private final SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder();
private final long refreshDelay;
public SubscriptionManager(final long refreshDelay) {
this.refreshDelay = refreshDelay;
}
public SubscriptionManager() {
this.refreshDelay = WebSocketConfiguration.DEFAULT_WEBSOCKET_REFRESH_DELAY;
}
@Override
public void start() {
@ -162,4 +172,8 @@ public class SubscriptionManager extends AbstractVerticle {
.findFirst()
.ifPresent(connectionId -> vertx.eventBus().send(connectionId, Json.encode(response)));
}
public long getRefreshDelay() {
return refreshDelay;
}
}

@ -27,7 +27,6 @@ public class SyncingSubscriptionService {
private final SubscriptionManager subscriptionManager;
private final Synchronizer synchronizer;
private final long currentRefreshDelay = 5000;
private Optional<SyncStatus> previousSyncStatus;
private long timerId;
@ -76,7 +75,7 @@ public class SyncingSubscriptionService {
subscriptionManager
.getVertx()
.setTimer(
currentRefreshDelay,
subscriptionManager.getRefreshDelay(),
(id) -> {
sendSyncingToMatchingSubscriptions();
engageNextTimerTick();

@ -261,7 +261,8 @@ public class RunnerBuilder {
filterManager);
final SubscriptionManager subscriptionManager =
createSubscriptionManager(vertx, transactionPool);
createSubscriptionManager(
vertx, transactionPool, webSocketConfiguration.getRefreshDelay());
createLogsSubscriptionService(
context.getBlockchain(), context.getWorldStateArchive(), subscriptionManager);
@ -325,8 +326,8 @@ public class RunnerBuilder {
}
private SubscriptionManager createSubscriptionManager(
final Vertx vertx, final TransactionPool transactionPool) {
final SubscriptionManager subscriptionManager = new SubscriptionManager();
final Vertx vertx, final TransactionPool transactionPool, final long refreshDelay) {
final SubscriptionManager subscriptionManager = new SubscriptionManager(refreshDelay);
final PendingTransactionSubscriptionService pendingTransactions =
new PendingTransactionSubscriptionService(subscriptionManager);
transactionPool.addTransactionListener(pendingTransactions);

@ -96,6 +96,8 @@ public class PantheonCommand implements DefaultCommandValues, Runnable {
private static final Wei DEFAULT_MIN_TRANSACTION_GAS_PRICE = Wei.of(1000);
private static final BytesValue DEFAULT_EXTRA_DATA = BytesValue.EMPTY;
private static final long DEFAULT_MAX_REFRESH_DELAY = 3600000;
private static final long DEFAULT_MIN_REFRESH_DELAY = 1;
public static class RpcApisConverter implements ITypeConverter<RpcApi> {
@Override
@ -312,6 +314,29 @@ public class PantheonCommand implements DefaultCommandValues, Runnable {
)
private final Collection<RpcApi> wsApis = null;
private Long refreshDelay;
@Option(
names = {"--ws-refresh-delay"},
paramLabel = "<refresh delay>",
arity = "1",
description =
"Refresh delay of websocket subscription sync in milliseconds. "
+ "default: ${DEFAULT-VALUE}",
defaultValue = "" + WebSocketConfiguration.DEFAULT_WEBSOCKET_REFRESH_DELAY
)
private void setRefreshDelay(final Long refreshDelay) {
if (refreshDelay < DEFAULT_MIN_REFRESH_DELAY || refreshDelay > DEFAULT_MAX_REFRESH_DELAY) {
throw new ParameterException(
new CommandLine(this),
String.format(
"refreshDelay must be a positive integer between %s and %s",
String.valueOf(DEFAULT_MIN_REFRESH_DELAY),
String.valueOf(DEFAULT_MAX_REFRESH_DELAY)));
}
this.refreshDelay = refreshDelay;
}
@Option(
names = {"--host-whitelist"},
paramLabel = "<hostname>",
@ -504,6 +529,7 @@ public class PantheonCommand implements DefaultCommandValues, Runnable {
webSocketConfiguration.setHost(wsHostAndPort.getHost());
webSocketConfiguration.setPort(wsHostAndPort.getPort());
webSocketConfiguration.setRpcApis(wsApis);
webSocketConfiguration.setRefreshDelay(refreshDelay);
return webSocketConfiguration;
}

@ -443,6 +443,28 @@ public class PantheonCommandTest extends CommandTestAbstract {
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void callingWithRefreshDelayWithValueMustUseValue() {
parseCommand("--ws-refresh-delay", "2000");
verify(mockRunnerBuilder).webSocketConfiguration(wsRpcConfigArgumentCaptor.capture());
verify(mockRunnerBuilder).build();
assertThat(wsRpcConfigArgumentCaptor.getValue().getRefreshDelay()).isEqualTo(2000);
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void callingWithRefreshDelayWithNegativeValueMustError() {
parseCommand("--ws-refresh-delay", "-2000");
assertThat(commandOutput.toString()).isEmpty();
final String expectedErrorMsg = "refreshDelay must be a positive integer between";
assertThat(commandErrorOutput.toString()).startsWith(expectedErrorMsg);
}
@Test
public void callingWithNodesWhitelistOptionButNoValueMustNotError() {
parseCommand("--nodes-whitelist");

Loading…
Cancel
Save