diff --git a/acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/factory/PantheonNodeFactory.java b/acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/factory/PantheonNodeFactory.java index ea90d3d3fb..256355d727 100644 --- a/acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/factory/PantheonNodeFactory.java +++ b/acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/factory/PantheonNodeFactory.java @@ -96,21 +96,6 @@ public class PantheonNodeFactory { .build()); } - public PantheonNode createMinerNodeWithCustomRefreshDelay( - final String name, final Long refreshDelay) throws IOException { - - WebSocketConfiguration wsConfig = createWebSocketEnabledConfig(); - wsConfig.setRefreshDelay(refreshDelay); - - return create( - new PantheonFactoryConfigurationBuilder() - .setName(name) - .miningEnabled() - .setJsonRpcConfiguration(createJsonRpcEnabledConfig()) - .setWebSocketConfiguration(wsConfig) - .build()); - } - public PantheonNode createArchiveNode(final String name) throws IOException { return create( new PantheonFactoryConfigurationBuilder() @@ -180,19 +165,6 @@ public class PantheonNodeFactory { .build()); } - public PantheonNode createArchiveNodeWithCustomRefreshDelay( - final String name, final Long refreshDelay) throws IOException { - WebSocketConfiguration wsConfig = createWebSocketEnabledConfig(); - wsConfig.setRefreshDelay(refreshDelay); - - return create( - new PantheonFactoryConfigurationBuilder() - .setName(name) - .setJsonRpcConfiguration(createJsonRpcEnabledConfig()) - .setWebSocketConfiguration(wsConfig) - .build()); - } - public PantheonNode createArchiveNodeWithRpcDisabled(final String name) throws IOException { return create(new PantheonFactoryConfigurationBuilder().setName(name).build()); } diff --git a/docs/Reference/Pantheon-CLI-Syntax.md b/docs/Reference/Pantheon-CLI-Syntax.md index 9280962ac5..d8022fb7c9 100644 --- a/docs/Reference/Pantheon-CLI-Syntax.md +++ b/docs/Reference/Pantheon-CLI-Syntax.md @@ -935,20 +935,6 @@ The default is 8546. Ports must be [exposed appropriately](../Configuring-Panthe !!!note This option is not used when running Pantheon from the [Docker image](../Getting-Started/Run-Docker-Image.md#exposing-ports). -### rpc-ws-refresh-delay - -```bash tab="Syntax" ---rpc-ws-refresh-delay= -``` - -```bash tab="Example" ---rpc-ws-refresh-delay="10000" -``` - -Refresh delay for Websocket synchronizing subscription in milliseconds. -The default is 5000. - - ### help ```bash tab="Syntax" diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/SyncStatus.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/SyncStatus.java index 2b5add05a1..ca9065d17f 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/SyncStatus.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/SyncStatus.java @@ -12,6 +12,8 @@ */ package tech.pegasys.pantheon.ethereum.core; +import com.google.common.base.Objects; + public final class SyncStatus { private final long startingBlock; @@ -35,4 +37,23 @@ public final class SyncStatus { public long getHighestBlock() { return highestBlock; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SyncStatus that = (SyncStatus) o; + return startingBlock == that.startingBlock + && currentBlock == that.currentBlock + && highestBlock == that.highestBlock; + } + + @Override + public int hashCode() { + return Objects.hashCode(startingBlock, currentBlock, highestBlock); + } } diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java index a7b936fc2c..7362ee15a9 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java @@ -26,4 +26,13 @@ public interface Synchronizer { Optional getSyncStatus(); boolean hasSufficientPeers(); + + long observeSyncStatus(final SyncStatusListener listener); + + boolean removeObserver(long observerId); + + @FunctionalInterface + interface SyncStatusListener { + void onSyncStatus(final SyncStatus status); + } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 9908035ba3..45ff1b83ec 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -12,6 +12,8 @@ */ package tech.pegasys.pantheon.ethereum.eth.sync; +import static com.google.common.base.Preconditions.checkNotNull; + import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; @@ -25,6 +27,7 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.util.ExceptionUtils; +import tech.pegasys.pantheon.util.Subscribers; import java.nio.file.Path; import java.util.Optional; @@ -44,6 +47,7 @@ public class DefaultSynchronizer implements Synchronizer { private final BlockPropagationManager blockPropagationManager; private final FullSyncDownloader fullSyncDownloader; private final Optional> fastSynchronizer; + private final Subscribers syncStatusListeners = new Subscribers<>(); public DefaultSynchronizer( final SynchronizerConfiguration syncConfig, @@ -91,6 +95,7 @@ public class DefaultSynchronizer implements Synchronizer { @Override public void start() { if (started.compareAndSet(false, true)) { + syncState.addSyncStatusListener(this::syncStatusCallback); if (fastSynchronizer.isPresent()) { fastSynchronizer.get().start().whenComplete(this::handleFastSyncResult); } else { @@ -139,4 +144,19 @@ public class DefaultSynchronizer implements Synchronizer { fastSynchronizer.isPresent() ? syncConfig.getFastSyncMinimumPeerCount() : 1; return ethContext.getEthPeers().availablePeerCount() >= requiredPeerCount; } + + @Override + public long observeSyncStatus(final SyncStatusListener listener) { + checkNotNull(listener); + return syncStatusListeners.subscribe(listener); + } + + @Override + public boolean removeObserver(final long observerId) { + return syncStatusListeners.unsubscribe(observerId); + } + + private void syncStatusCallback(final SyncStatus status) { + syncStatusListeners.forEach(c -> c.onSyncStatus(status)); + } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java index 938d1dc72c..c3df8531d2 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java @@ -15,6 +15,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.state; import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.SyncStatus; +import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; import tech.pegasys.pantheon.util.Subscribers; @@ -30,6 +31,7 @@ public class SyncState { private final long startingBlock; private boolean lastInSync = true; private final Subscribers inSyncListeners = new Subscribers<>(); + private final Subscribers syncStatusListeners = new Subscribers<>(); private Optional syncTarget = Optional.empty(); private long chainHeightListenerId; @@ -42,13 +44,23 @@ public class SyncState { if (event.isNewCanonicalHead()) { checkInSync(); } + publishSyncStatus(); }); } + private void publishSyncStatus() { + final SyncStatus syncStatus = syncStatus(); + syncStatusListeners.forEach(c -> c.onSyncStatus(syncStatus)); + } + public void addInSyncListener(final InSyncListener observer) { inSyncListeners.subscribe(observer); } + public void addSyncStatusListener(final SyncStatusListener observer) { + syncStatusListeners.subscribe(observer); + } + public SyncStatus syncStatus() { return new SyncStatus(startingBlock(), chainHeadNumber(), bestChainHeight()); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncStateTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncStateTest.java index 7df6da18a3..d9b39bf979 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncStateTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncStateTest.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.state; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -25,6 +26,7 @@ import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockBody; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; +import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener; import tech.pegasys.pantheon.ethereum.eth.manager.ChainState; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; @@ -100,6 +102,21 @@ public class SyncStateTest { verify(inSyncListener).onSyncStatusChanged(true); } + @Test + public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() { + SyncStatusListener syncStatusListener = mock(SyncStatusListener.class); + syncState.addSyncStatusListener(syncStatusListener); + + blockAddedObserver.onBlockAdded( + BlockAddedEvent.createForHeadAdvancement( + new Block( + targetBlockHeader(), + new BlockBody(Collections.emptyList(), Collections.emptyList()))), + blockchain); + + verify(syncStatusListener).onSyncStatus(eq(syncState.syncStatus())); + } + private void setupOutOfSyncState() { final BlockHeader bestBlockHeader = targetBlockHeader(); peerChainHead.update(bestBlockHeader); diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/WebSocketConfiguration.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/WebSocketConfiguration.java index a8afb9b572..27c39ba0dc 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/WebSocketConfiguration.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/WebSocketConfiguration.java @@ -29,13 +29,11 @@ public class WebSocketConfiguration { public static final int DEFAULT_WEBSOCKET_PORT = 8546; public static final Collection DEFAULT_WEBSOCKET_APIS = Arrays.asList(RpcApis.ETH, RpcApis.NET, RpcApis.WEB3); - public static final long DEFAULT_WEBSOCKET_REFRESH_DELAY = 5000; private boolean enabled; private int port; private String host; private Collection rpcApis; - private long refreshDelay; private boolean authenticationEnabled = false; private String authenticationCredentialsFile; private Collection hostsWhitelist = Collections.singletonList("localhost"); @@ -46,7 +44,6 @@ public class WebSocketConfiguration { config.setHost(DEFAULT_WEBSOCKET_HOST); config.setPort(DEFAULT_WEBSOCKET_PORT); config.setRpcApis(DEFAULT_WEBSOCKET_APIS); - config.setRefreshDelay(DEFAULT_WEBSOCKET_REFRESH_DELAY); return config; } @@ -121,14 +118,6 @@ public class WebSocketConfiguration { return Objects.hashCode(enabled, port, host, rpcApis); } - public void setRefreshDelay(final long refreshDelay) { - this.refreshDelay = refreshDelay; - } - - public long getRefreshDelay() { - return refreshDelay; - } - public boolean isAuthenticationEnabled() { return authenticationEnabled; } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java index 5773ceb41f..7cdb383fab 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java @@ -13,7 +13,6 @@ package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.JsonRpcResult; -import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscribeRequest; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.UnsubscribeRequest; @@ -50,15 +49,8 @@ public class SubscriptionManager extends AbstractVerticle { private final Map subscriptions = new HashMap<>(); private final Map> connectionSubscriptionsMap = new HashMap<>(); private final SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder(); - private final long refreshDelay; - public SubscriptionManager(final long refreshDelay) { - this.refreshDelay = refreshDelay; - } - - public SubscriptionManager() { - this.refreshDelay = WebSocketConfiguration.DEFAULT_WEBSOCKET_REFRESH_DELAY; - } + public SubscriptionManager() {} @Override public void start() { @@ -168,8 +160,4 @@ public class SubscriptionManager extends AbstractVerticle { .findFirst() .ifPresent(connectionId -> vertx.eventBus().send(connectionId, Json.encode(response))); } - - public long getRefreshDelay() { - return refreshDelay; - } } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java index 9b2aab3856..753135899a 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java @@ -14,72 +14,28 @@ package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing; import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; -import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.JsonRpcResult; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; import java.util.List; -import java.util.Optional; public class SyncingSubscriptionService { private final SubscriptionManager subscriptionManager; - private final Synchronizer synchronizer; - - private Optional previousSyncStatus; - private long timerId; public SyncingSubscriptionService( final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) { this.subscriptionManager = subscriptionManager; - this.synchronizer = synchronizer; - previousSyncStatus = synchronizer.getSyncStatus(); - engageNextTimerTick(); + synchronizer.observeSyncStatus(this::sendSyncingToMatchingSubscriptions); } - public void sendSyncingToMatchingSubscriptions() { + private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) { final List syncingSubscriptions = subscriptionManager.subscriptionsOfType(SubscriptionType.SYNCING, Subscription.class); - final Optional syncStatus = synchronizer.getSyncStatus(); - - final boolean syncStatusChange = !syncStatus.equals(previousSyncStatus); - final JsonRpcResult result; - - if (syncStatus.isPresent()) { - result = new SyncingResult(syncStatus.get()); - } else { - result = new NotSynchronisingResult(); - } - - for (final Subscription subscription : syncingSubscriptions) { - sendSyncingResultToSubscription((SyncingSubscription) subscription, result, syncStatusChange); - } - previousSyncStatus = syncStatus; - } - - private void sendSyncingResultToSubscription( - final SyncingSubscription subscription, - final JsonRpcResult result, - final boolean syncStatusChange) { - if (syncStatusChange || !subscription.isFirstMessageHasBeenSent()) { - subscriptionManager.sendMessage(subscription.getId(), result); - subscription.setFirstMessageHasBeenSent(true); - } - } - public void engageNextTimerTick() { - if (subscriptionManager.getVertx() != null) { - this.timerId = - subscriptionManager - .getVertx() - .setTimer( - subscriptionManager.getRefreshDelay(), - (id) -> { - sendSyncingToMatchingSubscriptions(); - engageNextTimerTick(); - }); - } + syncingSubscriptions.forEach( + s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus))); } } diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java index 46b053108e..ff4a8a28f4 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java @@ -14,98 +14,50 @@ package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.refEq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; +import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; -import java.util.Optional; - import com.google.common.collect.Lists; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class SyncingSubscriptionServiceTest { - private SyncingSubscriptionService syncingSubscriptionService; - @Mock private SubscriptionManager subscriptionManager; @Mock private Synchronizer synchronizer; + private SyncStatusListener syncStatusListener; @Before public void before() { - syncingSubscriptionService = new SyncingSubscriptionService(subscriptionManager, synchronizer); - } - - @Test - public void shouldSendSyncStatusWhenSyncing() { - final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING); - when(subscriptionManager.subscriptionsOfType(any(), any())) - .thenReturn(Lists.newArrayList(subscription)); - final SyncStatus syncStatus = new SyncStatus(0L, 1L, 1L); - final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus); - when(synchronizer.getSyncStatus()).thenReturn(Optional.of(syncStatus)); - - syncingSubscriptionService.sendSyncingToMatchingSubscriptions(); - - verify(subscriptionManager).sendMessage(eq(subscription.getId()), refEq(expectedSyncingResult)); - } - - @Test - public void shouldSendFalseWhenNotSyncing() { - final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING); - when(subscriptionManager.subscriptionsOfType(any(), any())) - .thenReturn(Lists.newArrayList(subscription)); - when(synchronizer.getSyncStatus()).thenReturn(Optional.empty()); - - syncingSubscriptionService.sendSyncingToMatchingSubscriptions(); - - verify(subscriptionManager) - .sendMessage(eq(subscription.getId()), refEq(new NotSynchronisingResult())); + final ArgumentCaptor captor = + ArgumentCaptor.forClass(SyncStatusListener.class); + when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(1L); + new SyncingSubscriptionService(subscriptionManager, synchronizer); + syncStatusListener = captor.getValue(); } @Test - public void shouldSendNoMoreSyncStatusWhenSyncingStatusHasNotChanged() { + public void shouldSendSyncStatusWhenReceiveSyncStatus() { final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING); when(subscriptionManager.subscriptionsOfType(any(), any())) .thenReturn(Lists.newArrayList(subscription)); final SyncStatus syncStatus = new SyncStatus(0L, 1L, 1L); final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus); - when(synchronizer.getSyncStatus()).thenReturn(Optional.of(syncStatus)); - - syncingSubscriptionService.sendSyncingToMatchingSubscriptions(); - - verify(subscriptionManager).sendMessage(eq(subscription.getId()), refEq(expectedSyncingResult)); - syncingSubscriptionService.sendSyncingToMatchingSubscriptions(); - } - - @Test - public void shouldSendDifferentSyncStatusWhenSyncingStatusHasChanged() { - final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING); - when(subscriptionManager.subscriptionsOfType(any(), any())) - .thenReturn(Lists.newArrayList(subscription)); - final SyncStatus syncStatus1 = new SyncStatus(0L, 1L, 9L); - final SyncStatus syncStatus2 = new SyncStatus(0L, 5L, 9L); - final SyncingResult expectedSyncingResult1 = new SyncingResult(syncStatus1); - when(synchronizer.getSyncStatus()).thenReturn(Optional.of(syncStatus1)); - syncingSubscriptionService.sendSyncingToMatchingSubscriptions(); - verify(subscriptionManager) - .sendMessage(eq(subscription.getId()), refEq(expectedSyncingResult1)); + syncStatusListener.onSyncStatus(syncStatus); - final SyncingResult expectedSyncingResult2 = new SyncingResult(syncStatus2); - when(synchronizer.getSyncStatus()).thenReturn(Optional.of(syncStatus2)); - syncingSubscriptionService.sendSyncingToMatchingSubscriptions(); - verify(subscriptionManager) - .sendMessage(eq(subscription.getId()), refEq(expectedSyncingResult2)); + verify(subscriptionManager).sendMessage(eq(subscription.getId()), eq(expectedSyncingResult)); } } diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java index c20eabaff7..36b92dccf6 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java @@ -302,8 +302,7 @@ public class RunnerBuilder { privacyParameters); final SubscriptionManager subscriptionManager = - createSubscriptionManager( - vertx, transactionPool, webSocketConfiguration.getRefreshDelay()); + createSubscriptionManager(vertx, transactionPool); createLogsSubscriptionService( context.getBlockchain(), context.getWorldStateArchive(), subscriptionManager); @@ -382,8 +381,8 @@ public class RunnerBuilder { } private SubscriptionManager createSubscriptionManager( - final Vertx vertx, final TransactionPool transactionPool, final long refreshDelay) { - final SubscriptionManager subscriptionManager = new SubscriptionManager(refreshDelay); + final Vertx vertx, final TransactionPool transactionPool) { + final SubscriptionManager subscriptionManager = new SubscriptionManager(); final PendingTransactionSubscriptionService pendingTransactions = new PendingTransactionSubscriptionService(subscriptionManager); transactionPool.addTransactionListener(pendingTransactions); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java index 98f0ebeeec..9060fe7a7b 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java @@ -19,7 +19,6 @@ import static tech.pegasys.pantheon.cli.NetworkName.MAINNET; import static tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.DEFAULT_JSON_RPC_PORT; import static tech.pegasys.pantheon.ethereum.jsonrpc.RpcApis.DEFAULT_JSON_RPC_APIS; import static tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration.DEFAULT_WEBSOCKET_PORT; -import static tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration.DEFAULT_WEBSOCKET_REFRESH_DELAY; import static tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer.DEFAULT_PORT; import static tech.pegasys.pantheon.metrics.MetricCategory.DEFAULT_METRIC_CATEGORIES; import static tech.pegasys.pantheon.metrics.prometheus.MetricsConfiguration.DEFAULT_METRICS_PORT; @@ -311,27 +310,6 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { private Long rpcWsRefreshDelay; - @Option( - names = {"--rpc-ws-refresh-delay"}, - paramLabel = "", - arity = "1", - description = - "Refresh delay of WebSocket subscription sync in milliseconds " - + "(default: ${DEFAULT-VALUE})", - defaultValue = "" + DEFAULT_WEBSOCKET_REFRESH_DELAY) - private Long configureRefreshDelay(final Long refreshDelay) { - if (refreshDelay < DEFAULT_MIN_REFRESH_DELAY || refreshDelay > DEFAULT_MAX_REFRESH_DELAY) { - throw new ParameterException( - this.commandLine, - String.format( - "Refresh delay must be a positive integer between %s and %s", - String.valueOf(DEFAULT_MIN_REFRESH_DELAY), - String.valueOf(DEFAULT_MAX_REFRESH_DELAY))); - } - this.rpcWsRefreshDelay = refreshDelay; - return refreshDelay; - } - @Option( names = {"--rpc-ws-authentication-enabled"}, description = @@ -718,7 +696,6 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { Arrays.asList( "--rpc-ws-api", "--rpc-ws-apis", - "--rpc-ws-refresh-delay", "--rpc-ws-host", "--rpc-ws-port", "--rpc-ws-authentication-enabled", @@ -735,7 +712,6 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { webSocketConfiguration.setHost(rpcWsHost); webSocketConfiguration.setPort(rpcWsPort); webSocketConfiguration.setRpcApis(rpcWsApis); - webSocketConfiguration.setRefreshDelay(rpcWsRefreshDelay); webSocketConfiguration.setAuthenticationEnabled(isRpcWsAuthenticationEnabled); webSocketConfiguration.setAuthenticationCredentialsFile(rpcWsAuthenticationCredentialsFile()); webSocketConfiguration.setHostsWhitelist(hostsWhitelist); diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java b/pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java index 72906b9064..a60e5f7a41 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java @@ -793,28 +793,6 @@ public class PantheonCommandTest extends CommandTestAbstract { assertThat(commandErrorOutput.toString()).isEmpty(); } - @Test - public void rpcWsRefreshDelayMustBeUsed() { - parseCommand("--rpc-ws-enabled", "--rpc-ws-refresh-delay", "2000"); - - verify(mockRunnerBuilder).webSocketConfiguration(wsRpcConfigArgumentCaptor.capture()); - verify(mockRunnerBuilder).build(); - - assertThat(wsRpcConfigArgumentCaptor.getValue().getRefreshDelay()).isEqualTo(2000); - - assertThat(commandOutput.toString()).isEmpty(); - assertThat(commandErrorOutput.toString()).isEmpty(); - } - - @Test - public void rpcWsRefreshDelayWithNegativeValueMustError() { - parseCommand("--rpc-ws-enabled", "--rpc-ws-refresh-delay", "-2000"); - - assertThat(commandOutput.toString()).isEmpty(); - final String expectedErrorMsg = "Refresh delay must be a positive integer between"; - assertThat(commandErrorOutput.toString()).startsWith(expectedErrorMsg); - } - @Test public void bannedNodeIdsOptionMustBeUsed() { final String[] nodes = {"0001", "0002", "0003"}; @@ -1368,19 +1346,10 @@ public class PantheonCommandTest extends CommandTestAbstract { @Test public void rpcWsOptionsRequiresServiceToBeEnabled() { - parseCommand( - "--rpc-ws-api", - "ETH,NET", - "--rpc-ws-host", - "0.0.0.0", - "--rpc-ws-port", - "1234", - "--rpc-ws-refresh-delay", - "2"); + parseCommand("--rpc-ws-api", "ETH,NET", "--rpc-ws-host", "0.0.0.0", "--rpc-ws-port", "1234"); verifyOptionsConstraintLoggerCall( - "--rpc-ws-host, --rpc-ws-port, --rpc-ws-api and --rpc-ws-refresh-delay", - "--rpc-ws-enabled"); + "--rpc-ws-host, --rpc-ws-port and --rpc-ws-api", "--rpc-ws-enabled"); assertThat(commandOutput.toString()).isEmpty(); assertThat(commandErrorOutput.toString()).isEmpty(); diff --git a/pantheon/src/test/resources/everything_config.toml b/pantheon/src/test/resources/everything_config.toml index b8a24f12a6..83f054cebf 100644 --- a/pantheon/src/test/resources/everything_config.toml +++ b/pantheon/src/test/resources/everything_config.toml @@ -50,7 +50,6 @@ rpc-ws-api=["DEBUG","ETH"] rpc-ws-apis=["DEBUG","ETH"] rpc-ws-host="9.10.11.12" rpc-ws-port=9101 -rpc-ws-refresh-delay=500 rpc-ws-authentication-enabled=false rpc-ws-authentication-credentials-file="none"