[PAN-2341] Created SyncStatus notifications (#1010)

* [PAN-2341] Created mechanism to publish sync status

* Refactoring WebSockets syncing subscription to respond to SyncStatus events

* Removing rpc-ws-refresh-delay option

* Removing ws-refresh-delay option from ATs

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Lucas Saldanha 6 years ago committed by GitHub
parent f6f56614aa
commit 79f46ac592
  1. 28
      acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/factory/PantheonNodeFactory.java
  2. 14
      docs/Reference/Pantheon-CLI-Syntax.md
  3. 21
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/SyncStatus.java
  4. 9
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java
  5. 20
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java
  6. 12
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java
  7. 17
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncStateTest.java
  8. 11
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/WebSocketConfiguration.java
  9. 14
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java
  10. 52
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java
  11. 70
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java
  12. 7
      pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java
  13. 24
      pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java
  14. 35
      pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java
  15. 1
      pantheon/src/test/resources/everything_config.toml

@ -96,21 +96,6 @@ public class PantheonNodeFactory {
.build()); .build());
} }
public PantheonNode createMinerNodeWithCustomRefreshDelay(
final String name, final Long refreshDelay) throws IOException {
WebSocketConfiguration wsConfig = createWebSocketEnabledConfig();
wsConfig.setRefreshDelay(refreshDelay);
return create(
new PantheonFactoryConfigurationBuilder()
.setName(name)
.miningEnabled()
.setJsonRpcConfiguration(createJsonRpcEnabledConfig())
.setWebSocketConfiguration(wsConfig)
.build());
}
public PantheonNode createArchiveNode(final String name) throws IOException { public PantheonNode createArchiveNode(final String name) throws IOException {
return create( return create(
new PantheonFactoryConfigurationBuilder() new PantheonFactoryConfigurationBuilder()
@ -180,19 +165,6 @@ public class PantheonNodeFactory {
.build()); .build());
} }
public PantheonNode createArchiveNodeWithCustomRefreshDelay(
final String name, final Long refreshDelay) throws IOException {
WebSocketConfiguration wsConfig = createWebSocketEnabledConfig();
wsConfig.setRefreshDelay(refreshDelay);
return create(
new PantheonFactoryConfigurationBuilder()
.setName(name)
.setJsonRpcConfiguration(createJsonRpcEnabledConfig())
.setWebSocketConfiguration(wsConfig)
.build());
}
public PantheonNode createArchiveNodeWithRpcDisabled(final String name) throws IOException { public PantheonNode createArchiveNodeWithRpcDisabled(final String name) throws IOException {
return create(new PantheonFactoryConfigurationBuilder().setName(name).build()); return create(new PantheonFactoryConfigurationBuilder().setName(name).build());
} }

@ -935,20 +935,6 @@ The default is 8546. Ports must be [exposed appropriately](../Configuring-Panthe
!!!note !!!note
This option is not used when running Pantheon from the [Docker image](../Getting-Started/Run-Docker-Image.md#exposing-ports). This option is not used when running Pantheon from the [Docker image](../Getting-Started/Run-Docker-Image.md#exposing-ports).
### rpc-ws-refresh-delay
```bash tab="Syntax"
--rpc-ws-refresh-delay=<refresh delay>
```
```bash tab="Example"
--rpc-ws-refresh-delay="10000"
```
Refresh delay for Websocket synchronizing subscription in milliseconds.
The default is 5000.
### help ### help
```bash tab="Syntax" ```bash tab="Syntax"

@ -12,6 +12,8 @@
*/ */
package tech.pegasys.pantheon.ethereum.core; package tech.pegasys.pantheon.ethereum.core;
import com.google.common.base.Objects;
public final class SyncStatus { public final class SyncStatus {
private final long startingBlock; private final long startingBlock;
@ -35,4 +37,23 @@ public final class SyncStatus {
public long getHighestBlock() { public long getHighestBlock() {
return highestBlock; return highestBlock;
} }
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SyncStatus that = (SyncStatus) o;
return startingBlock == that.startingBlock
&& currentBlock == that.currentBlock
&& highestBlock == that.highestBlock;
}
@Override
public int hashCode() {
return Objects.hashCode(startingBlock, currentBlock, highestBlock);
}
} }

@ -26,4 +26,13 @@ public interface Synchronizer {
Optional<SyncStatus> getSyncStatus(); Optional<SyncStatus> getSyncStatus();
boolean hasSufficientPeers(); boolean hasSufficientPeers();
long observeSyncStatus(final SyncStatusListener listener);
boolean removeObserver(long observerId);
@FunctionalInterface
interface SyncStatusListener {
void onSyncStatus(final SyncStatus status);
}
} }

@ -12,6 +12,8 @@
*/ */
package tech.pegasys.pantheon.ethereum.eth.sync; package tech.pegasys.pantheon.ethereum.eth.sync;
import static com.google.common.base.Preconditions.checkNotNull;
import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.core.Synchronizer;
@ -25,6 +27,7 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.Subscribers;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Optional; import java.util.Optional;
@ -44,6 +47,7 @@ public class DefaultSynchronizer<C> implements Synchronizer {
private final BlockPropagationManager<C> blockPropagationManager; private final BlockPropagationManager<C> blockPropagationManager;
private final FullSyncDownloader<C> fullSyncDownloader; private final FullSyncDownloader<C> fullSyncDownloader;
private final Optional<FastSynchronizer<C>> fastSynchronizer; private final Optional<FastSynchronizer<C>> fastSynchronizer;
private final Subscribers<SyncStatusListener> syncStatusListeners = new Subscribers<>();
public DefaultSynchronizer( public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig, final SynchronizerConfiguration syncConfig,
@ -91,6 +95,7 @@ public class DefaultSynchronizer<C> implements Synchronizer {
@Override @Override
public void start() { public void start() {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
syncState.addSyncStatusListener(this::syncStatusCallback);
if (fastSynchronizer.isPresent()) { if (fastSynchronizer.isPresent()) {
fastSynchronizer.get().start().whenComplete(this::handleFastSyncResult); fastSynchronizer.get().start().whenComplete(this::handleFastSyncResult);
} else { } else {
@ -139,4 +144,19 @@ public class DefaultSynchronizer<C> implements Synchronizer {
fastSynchronizer.isPresent() ? syncConfig.getFastSyncMinimumPeerCount() : 1; fastSynchronizer.isPresent() ? syncConfig.getFastSyncMinimumPeerCount() : 1;
return ethContext.getEthPeers().availablePeerCount() >= requiredPeerCount; return ethContext.getEthPeers().availablePeerCount() >= requiredPeerCount;
} }
@Override
public long observeSyncStatus(final SyncStatusListener listener) {
checkNotNull(listener);
return syncStatusListeners.subscribe(listener);
}
@Override
public boolean removeObserver(final long observerId) {
return syncStatusListeners.unsubscribe(observerId);
}
private void syncStatusCallback(final SyncStatus status) {
syncStatusListeners.forEach(c -> c.onSyncStatus(status));
}
} }

@ -15,6 +15,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.state;
import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.util.Subscribers; import tech.pegasys.pantheon.util.Subscribers;
@ -30,6 +31,7 @@ public class SyncState {
private final long startingBlock; private final long startingBlock;
private boolean lastInSync = true; private boolean lastInSync = true;
private final Subscribers<InSyncListener> inSyncListeners = new Subscribers<>(); private final Subscribers<InSyncListener> inSyncListeners = new Subscribers<>();
private final Subscribers<SyncStatusListener> syncStatusListeners = new Subscribers<>();
private Optional<SyncTarget> syncTarget = Optional.empty(); private Optional<SyncTarget> syncTarget = Optional.empty();
private long chainHeightListenerId; private long chainHeightListenerId;
@ -42,13 +44,23 @@ public class SyncState {
if (event.isNewCanonicalHead()) { if (event.isNewCanonicalHead()) {
checkInSync(); checkInSync();
} }
publishSyncStatus();
}); });
} }
private void publishSyncStatus() {
final SyncStatus syncStatus = syncStatus();
syncStatusListeners.forEach(c -> c.onSyncStatus(syncStatus));
}
public void addInSyncListener(final InSyncListener observer) { public void addInSyncListener(final InSyncListener observer) {
inSyncListeners.subscribe(observer); inSyncListeners.subscribe(observer);
} }
public void addSyncStatusListener(final SyncStatusListener observer) {
syncStatusListeners.subscribe(observer);
}
public SyncStatus syncStatus() { public SyncStatus syncStatus() {
return new SyncStatus(startingBlock(), chainHeadNumber(), bestChainHeight()); return new SyncStatus(startingBlock(), chainHeadNumber(), bestChainHeight());
} }

@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.sync.state; package tech.pegasys.pantheon.ethereum.eth.sync.state;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -25,6 +26,7 @@ import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockBody; import tech.pegasys.pantheon.ethereum.core.BlockBody;
import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState; import tech.pegasys.pantheon.ethereum.eth.manager.ChainState;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
@ -100,6 +102,21 @@ public class SyncStateTest {
verify(inSyncListener).onSyncStatusChanged(true); verify(inSyncListener).onSyncStatusChanged(true);
} }
@Test
public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() {
SyncStatusListener syncStatusListener = mock(SyncStatusListener.class);
syncState.addSyncStatusListener(syncStatusListener);
blockAddedObserver.onBlockAdded(
BlockAddedEvent.createForHeadAdvancement(
new Block(
targetBlockHeader(),
new BlockBody(Collections.emptyList(), Collections.emptyList()))),
blockchain);
verify(syncStatusListener).onSyncStatus(eq(syncState.syncStatus()));
}
private void setupOutOfSyncState() { private void setupOutOfSyncState() {
final BlockHeader bestBlockHeader = targetBlockHeader(); final BlockHeader bestBlockHeader = targetBlockHeader();
peerChainHead.update(bestBlockHeader); peerChainHead.update(bestBlockHeader);

@ -29,13 +29,11 @@ public class WebSocketConfiguration {
public static final int DEFAULT_WEBSOCKET_PORT = 8546; public static final int DEFAULT_WEBSOCKET_PORT = 8546;
public static final Collection<RpcApi> DEFAULT_WEBSOCKET_APIS = public static final Collection<RpcApi> DEFAULT_WEBSOCKET_APIS =
Arrays.asList(RpcApis.ETH, RpcApis.NET, RpcApis.WEB3); Arrays.asList(RpcApis.ETH, RpcApis.NET, RpcApis.WEB3);
public static final long DEFAULT_WEBSOCKET_REFRESH_DELAY = 5000;
private boolean enabled; private boolean enabled;
private int port; private int port;
private String host; private String host;
private Collection<RpcApi> rpcApis; private Collection<RpcApi> rpcApis;
private long refreshDelay;
private boolean authenticationEnabled = false; private boolean authenticationEnabled = false;
private String authenticationCredentialsFile; private String authenticationCredentialsFile;
private Collection<String> hostsWhitelist = Collections.singletonList("localhost"); private Collection<String> hostsWhitelist = Collections.singletonList("localhost");
@ -46,7 +44,6 @@ public class WebSocketConfiguration {
config.setHost(DEFAULT_WEBSOCKET_HOST); config.setHost(DEFAULT_WEBSOCKET_HOST);
config.setPort(DEFAULT_WEBSOCKET_PORT); config.setPort(DEFAULT_WEBSOCKET_PORT);
config.setRpcApis(DEFAULT_WEBSOCKET_APIS); config.setRpcApis(DEFAULT_WEBSOCKET_APIS);
config.setRefreshDelay(DEFAULT_WEBSOCKET_REFRESH_DELAY);
return config; return config;
} }
@ -121,14 +118,6 @@ public class WebSocketConfiguration {
return Objects.hashCode(enabled, port, host, rpcApis); return Objects.hashCode(enabled, port, host, rpcApis);
} }
public void setRefreshDelay(final long refreshDelay) {
this.refreshDelay = refreshDelay;
}
public long getRefreshDelay() {
return refreshDelay;
}
public boolean isAuthenticationEnabled() { public boolean isAuthenticationEnabled() {
return authenticationEnabled; return authenticationEnabled;
} }

@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription; package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.JsonRpcResult; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.JsonRpcResult;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscribeRequest; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscribeRequest;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.UnsubscribeRequest; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.UnsubscribeRequest;
@ -50,15 +49,8 @@ public class SubscriptionManager extends AbstractVerticle {
private final Map<Long, Subscription> subscriptions = new HashMap<>(); private final Map<Long, Subscription> subscriptions = new HashMap<>();
private final Map<String, List<Long>> connectionSubscriptionsMap = new HashMap<>(); private final Map<String, List<Long>> connectionSubscriptionsMap = new HashMap<>();
private final SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder(); private final SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder();
private final long refreshDelay;
public SubscriptionManager(final long refreshDelay) { public SubscriptionManager() {}
this.refreshDelay = refreshDelay;
}
public SubscriptionManager() {
this.refreshDelay = WebSocketConfiguration.DEFAULT_WEBSOCKET_REFRESH_DELAY;
}
@Override @Override
public void start() { public void start() {
@ -168,8 +160,4 @@ public class SubscriptionManager extends AbstractVerticle {
.findFirst() .findFirst()
.ifPresent(connectionId -> vertx.eventBus().send(connectionId, Json.encode(response))); .ifPresent(connectionId -> vertx.eventBus().send(connectionId, Json.encode(response)));
} }
public long getRefreshDelay() {
return refreshDelay;
}
} }

@ -14,72 +14,28 @@ package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing;
import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.JsonRpcResult;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import java.util.List; import java.util.List;
import java.util.Optional;
public class SyncingSubscriptionService { public class SyncingSubscriptionService {
private final SubscriptionManager subscriptionManager; private final SubscriptionManager subscriptionManager;
private final Synchronizer synchronizer;
private Optional<SyncStatus> previousSyncStatus;
private long timerId;
public SyncingSubscriptionService( public SyncingSubscriptionService(
final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) { final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) {
this.subscriptionManager = subscriptionManager; this.subscriptionManager = subscriptionManager;
this.synchronizer = synchronizer; synchronizer.observeSyncStatus(this::sendSyncingToMatchingSubscriptions);
previousSyncStatus = synchronizer.getSyncStatus();
engageNextTimerTick();
} }
public void sendSyncingToMatchingSubscriptions() { private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {
final List<Subscription> syncingSubscriptions = final List<Subscription> syncingSubscriptions =
subscriptionManager.subscriptionsOfType(SubscriptionType.SYNCING, Subscription.class); subscriptionManager.subscriptionsOfType(SubscriptionType.SYNCING, Subscription.class);
final Optional<SyncStatus> syncStatus = synchronizer.getSyncStatus();
final boolean syncStatusChange = !syncStatus.equals(previousSyncStatus);
final JsonRpcResult result;
if (syncStatus.isPresent()) {
result = new SyncingResult(syncStatus.get());
} else {
result = new NotSynchronisingResult();
}
for (final Subscription subscription : syncingSubscriptions) {
sendSyncingResultToSubscription((SyncingSubscription) subscription, result, syncStatusChange);
}
previousSyncStatus = syncStatus;
}
private void sendSyncingResultToSubscription(
final SyncingSubscription subscription,
final JsonRpcResult result,
final boolean syncStatusChange) {
if (syncStatusChange || !subscription.isFirstMessageHasBeenSent()) {
subscriptionManager.sendMessage(subscription.getId(), result);
subscription.setFirstMessageHasBeenSent(true);
}
}
public void engageNextTimerTick() { syncingSubscriptions.forEach(
if (subscriptionManager.getVertx() != null) { s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus)));
this.timerId =
subscriptionManager
.getVertx()
.setTimer(
subscriptionManager.getRefreshDelay(),
(id) -> {
sendSyncingToMatchingSubscriptions();
engageNextTimerTick();
});
}
} }
} }

@ -14,98 +14,50 @@ package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import java.util.Optional;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class SyncingSubscriptionServiceTest { public class SyncingSubscriptionServiceTest {
private SyncingSubscriptionService syncingSubscriptionService;
@Mock private SubscriptionManager subscriptionManager; @Mock private SubscriptionManager subscriptionManager;
@Mock private Synchronizer synchronizer; @Mock private Synchronizer synchronizer;
private SyncStatusListener syncStatusListener;
@Before @Before
public void before() { public void before() {
syncingSubscriptionService = new SyncingSubscriptionService(subscriptionManager, synchronizer); final ArgumentCaptor<SyncStatusListener> captor =
} ArgumentCaptor.forClass(SyncStatusListener.class);
when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(1L);
@Test new SyncingSubscriptionService(subscriptionManager, synchronizer);
public void shouldSendSyncStatusWhenSyncing() { syncStatusListener = captor.getValue();
final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING);
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(subscription));
final SyncStatus syncStatus = new SyncStatus(0L, 1L, 1L);
final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus);
when(synchronizer.getSyncStatus()).thenReturn(Optional.of(syncStatus));
syncingSubscriptionService.sendSyncingToMatchingSubscriptions();
verify(subscriptionManager).sendMessage(eq(subscription.getId()), refEq(expectedSyncingResult));
}
@Test
public void shouldSendFalseWhenNotSyncing() {
final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING);
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(subscription));
when(synchronizer.getSyncStatus()).thenReturn(Optional.empty());
syncingSubscriptionService.sendSyncingToMatchingSubscriptions();
verify(subscriptionManager)
.sendMessage(eq(subscription.getId()), refEq(new NotSynchronisingResult()));
} }
@Test @Test
public void shouldSendNoMoreSyncStatusWhenSyncingStatusHasNotChanged() { public void shouldSendSyncStatusWhenReceiveSyncStatus() {
final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING); final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING);
when(subscriptionManager.subscriptionsOfType(any(), any())) when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(subscription)); .thenReturn(Lists.newArrayList(subscription));
final SyncStatus syncStatus = new SyncStatus(0L, 1L, 1L); final SyncStatus syncStatus = new SyncStatus(0L, 1L, 1L);
final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus); final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus);
when(synchronizer.getSyncStatus()).thenReturn(Optional.of(syncStatus));
syncingSubscriptionService.sendSyncingToMatchingSubscriptions();
verify(subscriptionManager).sendMessage(eq(subscription.getId()), refEq(expectedSyncingResult));
syncingSubscriptionService.sendSyncingToMatchingSubscriptions();
}
@Test
public void shouldSendDifferentSyncStatusWhenSyncingStatusHasChanged() {
final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING);
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(subscription));
final SyncStatus syncStatus1 = new SyncStatus(0L, 1L, 9L);
final SyncStatus syncStatus2 = new SyncStatus(0L, 5L, 9L);
final SyncingResult expectedSyncingResult1 = new SyncingResult(syncStatus1);
when(synchronizer.getSyncStatus()).thenReturn(Optional.of(syncStatus1));
syncingSubscriptionService.sendSyncingToMatchingSubscriptions(); syncStatusListener.onSyncStatus(syncStatus);
verify(subscriptionManager)
.sendMessage(eq(subscription.getId()), refEq(expectedSyncingResult1));
final SyncingResult expectedSyncingResult2 = new SyncingResult(syncStatus2); verify(subscriptionManager).sendMessage(eq(subscription.getId()), eq(expectedSyncingResult));
when(synchronizer.getSyncStatus()).thenReturn(Optional.of(syncStatus2));
syncingSubscriptionService.sendSyncingToMatchingSubscriptions();
verify(subscriptionManager)
.sendMessage(eq(subscription.getId()), refEq(expectedSyncingResult2));
} }
} }

@ -302,8 +302,7 @@ public class RunnerBuilder {
privacyParameters); privacyParameters);
final SubscriptionManager subscriptionManager = final SubscriptionManager subscriptionManager =
createSubscriptionManager( createSubscriptionManager(vertx, transactionPool);
vertx, transactionPool, webSocketConfiguration.getRefreshDelay());
createLogsSubscriptionService( createLogsSubscriptionService(
context.getBlockchain(), context.getWorldStateArchive(), subscriptionManager); context.getBlockchain(), context.getWorldStateArchive(), subscriptionManager);
@ -382,8 +381,8 @@ public class RunnerBuilder {
} }
private SubscriptionManager createSubscriptionManager( private SubscriptionManager createSubscriptionManager(
final Vertx vertx, final TransactionPool transactionPool, final long refreshDelay) { final Vertx vertx, final TransactionPool transactionPool) {
final SubscriptionManager subscriptionManager = new SubscriptionManager(refreshDelay); final SubscriptionManager subscriptionManager = new SubscriptionManager();
final PendingTransactionSubscriptionService pendingTransactions = final PendingTransactionSubscriptionService pendingTransactions =
new PendingTransactionSubscriptionService(subscriptionManager); new PendingTransactionSubscriptionService(subscriptionManager);
transactionPool.addTransactionListener(pendingTransactions); transactionPool.addTransactionListener(pendingTransactions);

@ -19,7 +19,6 @@ import static tech.pegasys.pantheon.cli.NetworkName.MAINNET;
import static tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.DEFAULT_JSON_RPC_PORT; import static tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.DEFAULT_JSON_RPC_PORT;
import static tech.pegasys.pantheon.ethereum.jsonrpc.RpcApis.DEFAULT_JSON_RPC_APIS; import static tech.pegasys.pantheon.ethereum.jsonrpc.RpcApis.DEFAULT_JSON_RPC_APIS;
import static tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration.DEFAULT_WEBSOCKET_PORT; import static tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration.DEFAULT_WEBSOCKET_PORT;
import static tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration.DEFAULT_WEBSOCKET_REFRESH_DELAY;
import static tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer.DEFAULT_PORT; import static tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer.DEFAULT_PORT;
import static tech.pegasys.pantheon.metrics.MetricCategory.DEFAULT_METRIC_CATEGORIES; import static tech.pegasys.pantheon.metrics.MetricCategory.DEFAULT_METRIC_CATEGORIES;
import static tech.pegasys.pantheon.metrics.prometheus.MetricsConfiguration.DEFAULT_METRICS_PORT; import static tech.pegasys.pantheon.metrics.prometheus.MetricsConfiguration.DEFAULT_METRICS_PORT;
@ -311,27 +310,6 @@ public class PantheonCommand implements DefaultCommandValues, Runnable {
private Long rpcWsRefreshDelay; private Long rpcWsRefreshDelay;
@Option(
names = {"--rpc-ws-refresh-delay"},
paramLabel = "<refresh delay>",
arity = "1",
description =
"Refresh delay of WebSocket subscription sync in milliseconds "
+ "(default: ${DEFAULT-VALUE})",
defaultValue = "" + DEFAULT_WEBSOCKET_REFRESH_DELAY)
private Long configureRefreshDelay(final Long refreshDelay) {
if (refreshDelay < DEFAULT_MIN_REFRESH_DELAY || refreshDelay > DEFAULT_MAX_REFRESH_DELAY) {
throw new ParameterException(
this.commandLine,
String.format(
"Refresh delay must be a positive integer between %s and %s",
String.valueOf(DEFAULT_MIN_REFRESH_DELAY),
String.valueOf(DEFAULT_MAX_REFRESH_DELAY)));
}
this.rpcWsRefreshDelay = refreshDelay;
return refreshDelay;
}
@Option( @Option(
names = {"--rpc-ws-authentication-enabled"}, names = {"--rpc-ws-authentication-enabled"},
description = description =
@ -718,7 +696,6 @@ public class PantheonCommand implements DefaultCommandValues, Runnable {
Arrays.asList( Arrays.asList(
"--rpc-ws-api", "--rpc-ws-api",
"--rpc-ws-apis", "--rpc-ws-apis",
"--rpc-ws-refresh-delay",
"--rpc-ws-host", "--rpc-ws-host",
"--rpc-ws-port", "--rpc-ws-port",
"--rpc-ws-authentication-enabled", "--rpc-ws-authentication-enabled",
@ -735,7 +712,6 @@ public class PantheonCommand implements DefaultCommandValues, Runnable {
webSocketConfiguration.setHost(rpcWsHost); webSocketConfiguration.setHost(rpcWsHost);
webSocketConfiguration.setPort(rpcWsPort); webSocketConfiguration.setPort(rpcWsPort);
webSocketConfiguration.setRpcApis(rpcWsApis); webSocketConfiguration.setRpcApis(rpcWsApis);
webSocketConfiguration.setRefreshDelay(rpcWsRefreshDelay);
webSocketConfiguration.setAuthenticationEnabled(isRpcWsAuthenticationEnabled); webSocketConfiguration.setAuthenticationEnabled(isRpcWsAuthenticationEnabled);
webSocketConfiguration.setAuthenticationCredentialsFile(rpcWsAuthenticationCredentialsFile()); webSocketConfiguration.setAuthenticationCredentialsFile(rpcWsAuthenticationCredentialsFile());
webSocketConfiguration.setHostsWhitelist(hostsWhitelist); webSocketConfiguration.setHostsWhitelist(hostsWhitelist);

@ -793,28 +793,6 @@ public class PantheonCommandTest extends CommandTestAbstract {
assertThat(commandErrorOutput.toString()).isEmpty(); assertThat(commandErrorOutput.toString()).isEmpty();
} }
@Test
public void rpcWsRefreshDelayMustBeUsed() {
parseCommand("--rpc-ws-enabled", "--rpc-ws-refresh-delay", "2000");
verify(mockRunnerBuilder).webSocketConfiguration(wsRpcConfigArgumentCaptor.capture());
verify(mockRunnerBuilder).build();
assertThat(wsRpcConfigArgumentCaptor.getValue().getRefreshDelay()).isEqualTo(2000);
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void rpcWsRefreshDelayWithNegativeValueMustError() {
parseCommand("--rpc-ws-enabled", "--rpc-ws-refresh-delay", "-2000");
assertThat(commandOutput.toString()).isEmpty();
final String expectedErrorMsg = "Refresh delay must be a positive integer between";
assertThat(commandErrorOutput.toString()).startsWith(expectedErrorMsg);
}
@Test @Test
public void bannedNodeIdsOptionMustBeUsed() { public void bannedNodeIdsOptionMustBeUsed() {
final String[] nodes = {"0001", "0002", "0003"}; final String[] nodes = {"0001", "0002", "0003"};
@ -1368,19 +1346,10 @@ public class PantheonCommandTest extends CommandTestAbstract {
@Test @Test
public void rpcWsOptionsRequiresServiceToBeEnabled() { public void rpcWsOptionsRequiresServiceToBeEnabled() {
parseCommand( parseCommand("--rpc-ws-api", "ETH,NET", "--rpc-ws-host", "0.0.0.0", "--rpc-ws-port", "1234");
"--rpc-ws-api",
"ETH,NET",
"--rpc-ws-host",
"0.0.0.0",
"--rpc-ws-port",
"1234",
"--rpc-ws-refresh-delay",
"2");
verifyOptionsConstraintLoggerCall( verifyOptionsConstraintLoggerCall(
"--rpc-ws-host, --rpc-ws-port, --rpc-ws-api and --rpc-ws-refresh-delay", "--rpc-ws-host, --rpc-ws-port and --rpc-ws-api", "--rpc-ws-enabled");
"--rpc-ws-enabled");
assertThat(commandOutput.toString()).isEmpty(); assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty(); assertThat(commandErrorOutput.toString()).isEmpty();

@ -50,7 +50,6 @@ rpc-ws-api=["DEBUG","ETH"]
rpc-ws-apis=["DEBUG","ETH"] rpc-ws-apis=["DEBUG","ETH"]
rpc-ws-host="9.10.11.12" rpc-ws-host="9.10.11.12"
rpc-ws-port=9101 rpc-ws-port=9101
rpc-ws-refresh-delay=500
rpc-ws-authentication-enabled=false rpc-ws-authentication-enabled=false
rpc-ws-authentication-credentials-file="none" rpc-ws-authentication-credentials-file="none"

Loading…
Cancel
Save