PAN-2592: Rename methods that create and return streams away from getX() (#1368)

* Change all Stream<?> getX() and Stream<?> x() methods to Stream<?> streanX methods, such as `Stream<Peer> streamIdlePeers()`
* Update coding conventions to reflect this.


Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Danno Ferrin 6 years ago committed by GitHub
parent 2026ab9d1a
commit c028352642
  1. 4
      CODING-CONVENTIONS.md
  2. 2
      config/src/main/java/tech/pegasys/pantheon/config/GenesisConfigFile.java
  3. 8
      config/src/test/java/tech/pegasys/pantheon/config/GenesisConfigFileTest.java
  4. 2
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/chain/GenesisState.java
  5. 2
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/WorldState.java
  6. 4
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/DebuggableMutableWorldState.java
  7. 8
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldState.java
  8. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java
  9. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/PendingPeerRequest.java
  10. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockBroadcaster.java
  11. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiter.java
  12. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/PivotBlockRetriever.java
  13. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionSender.java
  14. 2
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java
  15. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockBroadcasterTest.java
  16. 2
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiterTest.java
  17. 2
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java
  18. 4
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/DebugMetrics.java
  19. 4
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/DebugMetricsTest.java
  20. 2
      ethereum/mock-p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetwork.java
  21. 2
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NoopP2PNetwork.java
  22. 2
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/P2PNetwork.java
  23. 6
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgent.java
  24. 2
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/Bucket.java
  25. 6
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  26. 10
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerTable.java
  27. 6
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java
  28. 57
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java
  29. 9
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryBondingTest.java
  30. 12
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryBootstrappingTest.java
  31. 20
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java
  32. 16
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/BucketTest.java
  33. 77
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java
  34. 10
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java
  35. 12
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerTableTest.java
  36. 30
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java
  37. 4
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/NetworkingServiceLifecycleTest.java
  38. 6
      metrics/core/src/main/java/tech/pegasys/pantheon/metrics/MetricsSystem.java
  39. 4
      metrics/core/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java
  40. 2
      metrics/core/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystem.java
  41. 24
      metrics/core/src/test/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystemTest.java

@ -149,7 +149,9 @@ Method parameters must be final. Class level and local fields should be final w
* Getters follow idiomatic format with `get` prefix. For example, `getBlock()` gets a block property. * Getters follow idiomatic format with `get` prefix. For example, `getBlock()` gets a block property.
* Setters follow idiomatic format with `set` prefix. For example, `setBlock(Block block)` sets a block property. * Setters follow idiomatic format with `set` prefix. For example, `setBlock(Block block)` sets a block property.
* For `toString methods`, use the Guava 18+ `MoreObjects.toStringHelper` * The Setter pattern should not be used for chained builder methods.
* Methods returning a `Stream` should be prefixed with `stream`. For example `streamIdlePeers()` returns a stream of the idle peers.
* For `toString` methods use the Guava 18+ `MoreObjects.toStringHelper`
* Equals and `hashCode()` methods use the `Object.equals` and `Object.hash` methods (this is the _Java 7+_ template in IntelliJ. Don’t accept subclasses and don’t use getters) * Equals and `hashCode()` methods use the `Object.equals` and `Object.hash` methods (this is the _Java 7+_ template in IntelliJ. Don’t accept subclasses and don’t use getters)
## 4.2.4 Testing ## 4.2.4 Testing

@ -64,7 +64,7 @@ public class GenesisConfigFile {
return new JsonGenesisConfigOptions(configRoot.getJsonObject("config")); return new JsonGenesisConfigOptions(configRoot.getJsonObject("config"));
} }
public Stream<GenesisAllocation> getAllocations() { public Stream<GenesisAllocation> streamAllocations() {
final JsonObject allocations = configRoot.getJsonObject("alloc", new JsonObject()); final JsonObject allocations = configRoot.getJsonObject("alloc", new JsonObject());
return allocations.fieldNames().stream() return allocations.fieldNames().stream()
.map(key -> new GenesisAllocation(key, allocations.getJsonObject(key))); .map(key -> new GenesisAllocation(key, allocations.getJsonObject(key)));

@ -35,7 +35,7 @@ public class GenesisConfigFileTest {
// Sanity check some basic properties to confirm this is the mainnet file. // Sanity check some basic properties to confirm this is the mainnet file.
assertThat(config.getConfigOptions().isEthHash()).isTrue(); assertThat(config.getConfigOptions().isEthHash()).isTrue();
assertThat(config.getConfigOptions().getChainId()).hasValue(MAINNET_CHAIN_ID); assertThat(config.getConfigOptions().getChainId()).hasValue(MAINNET_CHAIN_ID);
assertThat(config.getAllocations().map(GenesisAllocation::getAddress)) assertThat(config.streamAllocations().map(GenesisAllocation::getAddress))
.contains( .contains(
"000d836201318ec6899a67540690382780743280", "000d836201318ec6899a67540690382780743280",
"001762430ea9c3a26e5749afdb70da5f78ddbb8c", "001762430ea9c3a26e5749afdb70da5f78ddbb8c",
@ -48,7 +48,7 @@ public class GenesisConfigFileTest {
// Sanity check some basic properties to confirm this is the dev file. // Sanity check some basic properties to confirm this is the dev file.
assertThat(config.getConfigOptions().isEthHash()).isTrue(); assertThat(config.getConfigOptions().isEthHash()).isTrue();
assertThat(config.getConfigOptions().getChainId()).hasValue(DEVELOPMENT_CHAIN_ID); assertThat(config.getConfigOptions().getChainId()).hasValue(DEVELOPMENT_CHAIN_ID);
assertThat(config.getAllocations().map(GenesisAllocation::getAddress)) assertThat(config.streamAllocations().map(GenesisAllocation::getAddress))
.contains( .contains(
"fe3b557e8fb62b89f4916b721be55ceb828dbd73", "fe3b557e8fb62b89f4916b721be55ceb828dbd73",
"627306090abab3a6e1400e9345bc60c78a8bef57", "627306090abab3a6e1400e9345bc60c78a8bef57",
@ -155,7 +155,7 @@ public class GenesisConfigFileTest {
final Map<String, String> allocations = final Map<String, String> allocations =
config config
.getAllocations() .streamAllocations()
.collect( .collect(
Collectors.toMap(GenesisAllocation::getAddress, GenesisAllocation::getBalance)); Collectors.toMap(GenesisAllocation::getAddress, GenesisAllocation::getBalance));
assertThat(allocations) assertThat(allocations)
@ -168,7 +168,7 @@ public class GenesisConfigFileTest {
@Test @Test
public void shouldGetEmptyAllocationsWhenAllocNotPresent() { public void shouldGetEmptyAllocationsWhenAllocNotPresent() {
final GenesisConfigFile config = GenesisConfigFile.fromConfig("{}"); final GenesisConfigFile config = GenesisConfigFile.fromConfig("{}");
assertThat(config.getAllocations()).isEmpty(); assertThat(config.streamAllocations()).isEmpty();
} }
@Test @Test

@ -189,7 +189,7 @@ public final class GenesisState {
} }
private static Stream<GenesisAccount> parseAllocations(final GenesisConfigFile genesis) { private static Stream<GenesisAccount> parseAllocations(final GenesisConfigFile genesis) {
return genesis.getAllocations().map(GenesisAccount::fromAllocation); return genesis.streamAllocations().map(GenesisAccount::fromAllocation);
} }
private static long parseNonce(final GenesisConfigFile genesis) { private static long parseNonce(final GenesisConfigFile genesis) {

@ -37,5 +37,5 @@ public interface WorldState extends WorldView {
* @return a stream of all the accounts (in no particular order) contained in the world state * @return a stream of all the accounts (in no particular order) contained in the world state
* represented by the root hash of this object at the time of the call. * represented by the root hash of this object at the time of the call.
*/ */
Stream<Account> accounts(); Stream<Account> streamAccounts();
} }

@ -73,7 +73,7 @@ public class DebuggableMutableWorldState extends DefaultMutableWorldState {
} }
@Override @Override
public Stream<Account> accounts() { public Stream<Account> streamAccounts() {
return info.accounts.stream().map(this::get).filter(Objects::nonNull); return info.accounts.stream().map(this::get).filter(Objects::nonNull);
} }
@ -81,7 +81,7 @@ public class DebuggableMutableWorldState extends DefaultMutableWorldState {
public String toString() { public String toString() {
final StringBuilder builder = new StringBuilder(); final StringBuilder builder = new StringBuilder();
builder.append(rootHash()).append(":\n"); builder.append(rootHash()).append(":\n");
accounts() streamAccounts()
.forEach( .forEach(
account -> { account -> {
final Address address = account.getAddress(); final Address address = account.getAddress();

@ -103,13 +103,13 @@ public class DefaultMutableWorldState implements MutableWorldState {
private AccountState deserializeAccount( private AccountState deserializeAccount(
final Address address, final Hash addressHash, final BytesValue encoded) throws RLPException { final Address address, final Hash addressHash, final BytesValue encoded) throws RLPException {
final RLPInput in = RLP.input(encoded); final RLPInput in = RLP.input(encoded);
StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(in); final StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(in);
return new AccountState(address, addressHash, accountValue); return new AccountState(address, addressHash, accountValue);
} }
private static BytesValue serializeAccount( private static BytesValue serializeAccount(
final long nonce, final Wei balance, final Hash storageRoot, final Hash codeHash) { final long nonce, final Wei balance, final Hash storageRoot, final Hash codeHash) {
StateTrieAccountValue accountValue = final StateTrieAccountValue accountValue =
new StateTrieAccountValue(nonce, balance, storageRoot, codeHash); new StateTrieAccountValue(nonce, balance, storageRoot, codeHash);
return RLP.encode(accountValue::writeTo); return RLP.encode(accountValue::writeTo);
} }
@ -120,7 +120,7 @@ public class DefaultMutableWorldState implements MutableWorldState {
} }
@Override @Override
public Stream<Account> accounts() { public Stream<Account> streamAccounts() {
// TODO: the current trie implementation doesn't have walking capability yet (pending NC-746) // TODO: the current trie implementation doesn't have walking capability yet (pending NC-746)
// so this can't be implemented. // so this can't be implemented.
throw new UnsupportedOperationException("TODO"); throw new UnsupportedOperationException("TODO");
@ -226,7 +226,7 @@ public class DefaultMutableWorldState implements MutableWorldState {
return updatedCode; return updatedCode;
} }
// No code is common, save the KV-store lookup. // No code is common, save the KV-store lookup.
Hash codeHash = getCodeHash(); final Hash codeHash = getCodeHash();
if (codeHash.equals(Hash.EMPTY)) { if (codeHash.equals(Hash.EMPTY)) {
return BytesValue.EMPTY; return BytesValue.EMPTY;
} }

@ -119,12 +119,12 @@ public class EthPeers {
return connections.size(); return connections.size();
} }
public Stream<EthPeer> availablePeers() { public Stream<EthPeer> streamAvailablePeers() {
return connections.values().stream().filter(EthPeer::readyForRequests); return connections.values().stream().filter(EthPeer::readyForRequests);
} }
public Optional<EthPeer> bestPeer() { public Optional<EthPeer> bestPeer() {
return availablePeers().max(BEST_CHAIN); return streamAvailablePeers().max(BEST_CHAIN);
} }
@FunctionalInterface @FunctionalInterface

@ -81,7 +81,7 @@ public class PendingPeerRequest {
return peer.isPresent() return peer.isPresent()
? peer ? peer
: ethPeers : ethPeers
.availablePeers() .streamAvailablePeers()
.filter(peer -> peer.chainState().getEstimatedHeight() >= minimumBlockNumber) .filter(peer -> peer.chainState().getEstimatedHeight() >= minimumBlockNumber)
.min(EthPeers.LEAST_TO_MOST_BUSY); .min(EthPeers.LEAST_TO_MOST_BUSY);
} }

@ -34,14 +34,14 @@ public class BlockBroadcaster {
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty); final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
ethContext ethContext
.getEthPeers() .getEthPeers()
.availablePeers() .streamAvailablePeers()
.filter(ethPeer -> !ethPeer.hasSeenBlock(block.getHash())) .filter(ethPeer -> !ethPeer.hasSeenBlock(block.getHash()))
.forEach( .forEach(
ethPeer -> { ethPeer -> {
ethPeer.registerKnownBlock(block.getHash()); ethPeer.registerKnownBlock(block.getHash());
try { try {
ethPeer.send(newBlockMessage); ethPeer.send(newBlockMessage);
} catch (PeerConnection.PeerNotConnected e) { } catch (final PeerConnection.PeerNotConnected e) {
LOG.trace("Failed to broadcast new block to peer", e); LOG.trace("Failed to broadcast new block to peer", e);
} }
}); });

@ -56,7 +56,7 @@ public class TrailingPeerLimiter implements BlockAddedObserver {
final long maxTrailingPeers = requirements.getMaxTrailingPeers(); final long maxTrailingPeers = requirements.getMaxTrailingPeers();
final List<EthPeer> trailingPeers = final List<EthPeer> trailingPeers =
ethPeers ethPeers
.availablePeers() .streamAvailablePeers()
.filter(peer -> peer.chainState().hasEstimatedHeight()) .filter(peer -> peer.chainState().hasEstimatedHeight())
.filter(peer -> peer.chainState().getEstimatedHeight() < minimumHeightToBeUpToDate) .filter(peer -> peer.chainState().getEstimatedHeight() < minimumHeightToBeUpToDate)
.sorted(BY_CHAIN_HEIGHT) .sorted(BY_CHAIN_HEIGHT)

@ -75,7 +75,7 @@ public class PivotBlockRetriever<C> {
final List<EthPeer> peersToQuery = final List<EthPeer> peersToQuery =
ethContext ethContext
.getEthPeers() .getEthPeers()
.availablePeers() .streamAvailablePeers()
.filter(peer -> peer.chainState().getEstimatedHeight() >= pivotBlockNumber) .filter(peer -> peer.chainState().getEstimatedHeight() >= pivotBlockNumber)
.collect(Collectors.toList()); .collect(Collectors.toList());

@ -35,7 +35,7 @@ class TransactionSender implements TransactionBatchAddedListener {
public void onTransactionsAdded(final Iterable<Transaction> transactions) { public void onTransactionsAdded(final Iterable<Transaction> transactions) {
ethContext ethContext
.getEthPeers() .getEthPeers()
.availablePeers() .streamAvailablePeers()
.forEach( .forEach(
peer -> peer ->
transactions.forEach( transactions.forEach(

@ -214,7 +214,7 @@ public class RespondingEthPeer {
return Optional.of(outgoingMessages.peek().messageData); return Optional.of(outgoingMessages.peek().messageData);
} }
public Stream<MessageData> pendingOutgoingRequests() { public Stream<MessageData> streamPendingOutgoingRequests() {
return outgoingMessages.stream().map(OutgoingMessage::messageData); return outgoingMessages.stream().map(OutgoingMessage::messageData);
} }

@ -40,7 +40,7 @@ public class BlockBroadcasterTest {
public void blockPropagationUnitTest() throws PeerConnection.PeerNotConnected { public void blockPropagationUnitTest() throws PeerConnection.PeerNotConnected {
final EthPeer ethPeer = mock(EthPeer.class); final EthPeer ethPeer = mock(EthPeer.class);
final EthPeers ethPeers = mock(EthPeers.class); final EthPeers ethPeers = mock(EthPeers.class);
when(ethPeers.availablePeers()).thenReturn(Stream.of(ethPeer)); when(ethPeers.streamAvailablePeers()).thenReturn(Stream.of(ethPeer));
final EthContext ethContext = mock(EthContext.class); final EthContext ethContext = mock(EthContext.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers); when(ethContext.getEthPeers()).thenReturn(ethPeers);
@ -63,7 +63,7 @@ public class BlockBroadcasterTest {
final EthPeer ethPeer1 = mock(EthPeer.class); final EthPeer ethPeer1 = mock(EthPeer.class);
final EthPeers ethPeers = mock(EthPeers.class); final EthPeers ethPeers = mock(EthPeers.class);
when(ethPeers.availablePeers()).thenReturn(Stream.of(ethPeer0, ethPeer1)); when(ethPeers.streamAvailablePeers()).thenReturn(Stream.of(ethPeer0, ethPeer1));
final EthContext ethContext = mock(EthContext.class); final EthContext ethContext = mock(EthContext.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers); when(ethContext.getEthPeers()).thenReturn(ethPeers);

@ -55,7 +55,7 @@ public class TrailingPeerLimiterTest {
@Before @Before
public void setUp() { public void setUp() {
when(ethPeers.availablePeers()).then(invocation -> peers.stream()); when(ethPeers.streamAvailablePeers()).then(invocation -> peers.stream());
} }
@Test @Test

@ -414,7 +414,7 @@ public class FullSyncChainDownloaderTest {
// Check that any requests for checkpoint headers are only sent to the best peer // Check that any requests for checkpoint headers are only sent to the best peer
final long checkpointRequestsToOtherPeers = final long checkpointRequestsToOtherPeers =
otherPeers.stream() otherPeers.stream()
.map(RespondingEthPeer::pendingOutgoingRequests) .map(RespondingEthPeer::streamPendingOutgoingRequests)
.flatMap(Function.identity()) .flatMap(Function.identity())
.filter(m -> m.getCode() == EthPV62.GET_BLOCK_HEADERS) .filter(m -> m.getCode() == EthPV62.GET_BLOCK_HEADERS)
.map(GetBlockHeadersMessage::readFrom) .map(GetBlockHeadersMessage::readFrom)

@ -39,7 +39,9 @@ public class DebugMetrics implements JsonRpcMethod {
@Override @Override
public JsonRpcResponse response(final JsonRpcRequest request) { public JsonRpcResponse response(final JsonRpcRequest request) {
final Map<String, Object> observations = new HashMap<>(); final Map<String, Object> observations = new HashMap<>();
metricsSystem.getMetrics().forEach(observation -> addObservation(observations, observation)); metricsSystem
.streamObservations()
.forEach(observation -> addObservation(observations, observation));
return new JsonRpcSuccessResponse(request.getId(), observations); return new JsonRpcSuccessResponse(request.getId(), observations);
} }

@ -45,7 +45,7 @@ public class DebugMetricsTest {
@Test @Test
public void shouldReportUnlabelledObservationsByCategory() { public void shouldReportUnlabelledObservationsByCategory() {
when(metricsSystem.getMetrics()) when(metricsSystem.streamObservations())
.thenReturn( .thenReturn(
Stream.of( Stream.of(
new Observation(PEERS, "peer1", "peer1Value", Collections.emptyList()), new Observation(PEERS, "peer1", "peer1Value", Collections.emptyList()),
@ -62,7 +62,7 @@ public class DebugMetricsTest {
@Test @Test
public void shouldNestObservationsByLabel() { public void shouldNestObservationsByLabel() {
when(metricsSystem.getMetrics()) when(metricsSystem.streamObservations())
.thenReturn( .thenReturn(
Stream.of( Stream.of(
new Observation(PEERS, "peer1", "value1", asList("label1A", "label2A")), new Observation(PEERS, "peer1", "value1", asList("label1A", "label2A")),

@ -129,7 +129,7 @@ public final class MockNetwork {
} }
@Override @Override
public Stream<DiscoveryPeer> getDiscoveredPeers() { public Stream<DiscoveryPeer> streamDiscoveredPeers() {
return Stream.empty(); return Stream.empty();
} }

@ -35,7 +35,7 @@ public class NoopP2PNetwork implements P2PNetwork {
} }
@Override @Override
public Stream<DiscoveryPeer> getDiscoveredPeers() { public Stream<DiscoveryPeer> streamDiscoveredPeers() {
return Stream.empty(); return Stream.empty();
} }

@ -42,7 +42,7 @@ public interface P2PNetwork extends Closeable {
* *
* @return A stream of discovered peers on the network. * @return A stream of discovered peers on the network.
*/ */
Stream<DiscoveryPeer> getDiscoveredPeers(); Stream<DiscoveryPeer> streamDiscoveredPeers();
/** /**
* Connects to a {@link Peer}. * Connects to a {@link Peer}.

@ -162,7 +162,7 @@ public abstract class PeerDiscoveryAgent implements DisconnectCallback {
} }
private void startController() { private void startController() {
PeerDiscoveryController controller = createController(); final PeerDiscoveryController controller = createController();
this.controller = Optional.of(controller); this.controller = Optional.of(controller);
controller.start(); controller.start();
} }
@ -240,8 +240,8 @@ public abstract class PeerDiscoveryAgent implements DisconnectCallback {
}); });
} }
public Stream<DiscoveryPeer> getPeers() { public Stream<DiscoveryPeer> streamDiscoveredPeers() {
return controller.map(PeerDiscoveryController::getPeers).orElse(Stream.empty()); return controller.map(PeerDiscoveryController::streamDiscoveredPeers).orElse(Stream.empty());
} }
public Optional<DiscoveryPeer> getAdvertisedPeer() { public Optional<DiscoveryPeer> getAdvertisedPeer() {

@ -134,7 +134,7 @@ public class Bucket {
* *
* @return immutable view of the peer array * @return immutable view of the peer array
*/ */
synchronized List<DiscoveryPeer> peers() { synchronized List<DiscoveryPeer> getPeers() {
return unmodifiableList(asList(Arrays.copyOf(kBucket, tailIndex + 1))); return unmodifiableList(asList(Arrays.copyOf(kBucket, tailIndex + 1)));
} }

@ -219,7 +219,7 @@ public class PeerDiscoveryController {
// if smart contract permissioning is enabled, bond with bootnodes // if smart contract permissioning is enabled, bond with bootnodes
if (nodePermissioningController.get().getSyncStatusNodePermissioningProvider().isPresent()) { if (nodePermissioningController.get().getSyncStatusNodePermissioningProvider().isPresent()) {
for (DiscoveryPeer p : initialDiscoveryPeers) { for (final DiscoveryPeer p : initialDiscoveryPeers) {
bond(p); bond(p);
} }
} }
@ -554,8 +554,8 @@ public class PeerDiscoveryController {
* *
* @return List of peers. * @return List of peers.
*/ */
public Stream<DiscoveryPeer> getPeers() { public Stream<DiscoveryPeer> streamDiscoveredPeers() {
return peerTable.getAllPeers(); return peerTable.streamAllPeers();
} }
public void setRetryDelayFunction(final RetryDelayFunction retryDelayFunction) { public void setRetryDelayFunction(final RetryDelayFunction retryDelayFunction) {

@ -154,7 +154,7 @@ public class PeerTable {
distanceCache.remove(id); distanceCache.remove(id);
if (table[distance].peers().isEmpty()) { if (table[distance].getPeers().isEmpty()) {
return EvictResult.absent(); return EvictResult.absent();
} }
@ -176,7 +176,7 @@ public class PeerTable {
private void buildBloomFilter() { private void buildBloomFilter() {
final BloomFilter<BytesValue> bf = final BloomFilter<BytesValue> bf =
BloomFilter.create((id, val) -> val.putBytes(id.extractArray()), maxEntriesCnt, 0.001); BloomFilter.create((id, val) -> val.putBytes(id.extractArray()), maxEntriesCnt, 0.001);
getAllPeers().map(Peer::getId).forEach(bf::put); streamAllPeers().map(Peer::getId).forEach(bf::put);
this.evictionCnt = 0; this.evictionCnt = 0;
this.idBloom = bf; this.idBloom = bf;
} }
@ -191,15 +191,15 @@ public class PeerTable {
*/ */
public List<DiscoveryPeer> nearestPeers(final BytesValue target, final int limit) { public List<DiscoveryPeer> nearestPeers(final BytesValue target, final int limit) {
final BytesValue keccak256 = Hash.keccak256(target); final BytesValue keccak256 = Hash.keccak256(target);
return getAllPeers() return streamAllPeers()
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED) .filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED)
.sorted(comparingInt((peer) -> distance(peer.keccak256(), keccak256))) .sorted(comparingInt((peer) -> distance(peer.keccak256(), keccak256)))
.limit(limit) .limit(limit)
.collect(toList()); .collect(toList());
} }
public Stream<DiscoveryPeer> getAllPeers() { public Stream<DiscoveryPeer> streamAllPeers() {
return Arrays.stream(table).flatMap(e -> e.peers().stream()); return Arrays.stream(table).flatMap(e -> e.getPeers().stream());
} }
/** /**

@ -400,7 +400,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
} }
final List<DiscoveryPeer> peers = final List<DiscoveryPeer> peers =
getDiscoveredPeers() streamDiscoveredPeers()
.filter(peer -> peer.getStatus() == PeerDiscoveryStatus.BONDED) .filter(peer -> peer.getStatus() == PeerDiscoveryStatus.BONDED)
.filter(peer -> !isConnected(peer) && !isConnecting(peer)) .filter(peer -> !isConnected(peer) && !isConnecting(peer))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -426,8 +426,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
} }
@Override @Override
public Stream<DiscoveryPeer> getDiscoveredPeers() { public Stream<DiscoveryPeer> streamDiscoveredPeers() {
return peerDiscoveryAgent.getPeers(); return peerDiscoveryAgent.streamDiscoveredPeers();
} }
@Override @Override

@ -46,7 +46,7 @@ public class PeerDiscoveryAgentTest {
public void neighborsPacketFromUnbondedPeerIsDropped() { public void neighborsPacketFromUnbondedPeerIsDropped() {
// Start an agent with no bootstrap peers. // Start an agent with no bootstrap peers.
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(Collections.emptyList()); final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(Collections.emptyList());
assertThat(agent.getPeers()).isEmpty(); assertThat(agent.streamDiscoveredPeers()).isEmpty();
// Start a test peer // Start a test peer
final MockPeerDiscoveryAgent otherNode = helper.startDiscoveryAgent(); final MockPeerDiscoveryAgent otherNode = helper.startDiscoveryAgent();
@ -57,7 +57,7 @@ public class PeerDiscoveryAgentTest {
final Packet packet = Packet.create(PacketType.NEIGHBORS, data, otherNode.getKeyPair()); final Packet packet = Packet.create(PacketType.NEIGHBORS, data, otherNode.getKeyPair());
helper.sendMessageBetweenAgents(otherNode, agent, packet); helper.sendMessageBetweenAgents(otherNode, agent, packet);
assertThat(agent.getPeers()).isEmpty(); assertThat(agent.streamDiscoveredPeers()).isEmpty();
} }
@Test @Test
@ -77,8 +77,9 @@ public class PeerDiscoveryAgentTest {
// list. By moving to a contains we make sure that all the peers are loaded with tolerance for // list. By moving to a contains we make sure that all the peers are loaded with tolerance for
// duplicates. If we fix the duplication problem we should use containsExactlyInAnyOrder to // duplicates. If we fix the duplication problem we should use containsExactlyInAnyOrder to
// hedge against missing one and duplicating another. // hedge against missing one and duplicating another.
assertThat(agent.getPeers()).contains(otherPeers.toArray(new DiscoveryPeer[20])); assertThat(agent.streamDiscoveredPeers()).contains(otherPeers.toArray(new DiscoveryPeer[20]));
assertThat(agent.getPeers()).allMatch(p -> p.getStatus() == PeerDiscoveryStatus.BONDED); assertThat(agent.streamDiscoveredPeers())
.allMatch(p -> p.getStatus() == PeerDiscoveryStatus.BONDED);
// Use additional agent to exchange messages with agent // Use additional agent to exchange messages with agent
final MockPeerDiscoveryAgent testAgent = helper.startDiscoveryAgent(); final MockPeerDiscoveryAgent testAgent = helper.startDiscoveryAgent();
@ -96,12 +97,12 @@ public class PeerDiscoveryAgentTest {
helper.sendMessageBetweenAgents(testAgent, agent, packet); helper.sendMessageBetweenAgents(testAgent, agent, packet);
// Check response packet // Check response packet
List<IncomingPacket> incomingPackets = final List<IncomingPacket> incomingPackets =
testAgent.getIncomingPackets().stream() testAgent.getIncomingPackets().stream()
.filter(p -> p.packet.getType().equals(PacketType.NEIGHBORS)) .filter(p -> p.packet.getType().equals(PacketType.NEIGHBORS))
.collect(toList()); .collect(toList());
assertThat(incomingPackets.size()).isEqualTo(1); assertThat(incomingPackets.size()).isEqualTo(1);
IncomingPacket neighborsPacket = incomingPackets.get(0); final IncomingPacket neighborsPacket = incomingPackets.get(0);
assertThat(neighborsPacket.fromAgent).isEqualTo(agent); assertThat(neighborsPacket.fromAgent).isEqualTo(agent);
// Assert that we only received 16 items. // Assert that we only received 16 items.
@ -128,12 +129,12 @@ public class PeerDiscoveryAgentTest {
final MockPeerDiscoveryAgent peerDiscoveryAgent2 = helper.startDiscoveryAgent(peer); final MockPeerDiscoveryAgent peerDiscoveryAgent2 = helper.startDiscoveryAgent(peer);
peerDiscoveryAgent2.start(BROADCAST_TCP_PORT).join(); peerDiscoveryAgent2.start(BROADCAST_TCP_PORT).join();
assertThat(peerDiscoveryAgent2.getPeers().collect(toList()).size()).isEqualTo(1); assertThat(peerDiscoveryAgent2.streamDiscoveredPeers().collect(toList()).size()).isEqualTo(1);
final PeerConnection peerConnection = createAnonymousPeerConnection(peer.getId()); final PeerConnection peerConnection = createAnonymousPeerConnection(peer.getId());
peerDiscoveryAgent2.onDisconnect(peerConnection, DisconnectReason.REQUESTED, true); peerDiscoveryAgent2.onDisconnect(peerConnection, DisconnectReason.REQUESTED, true);
assertThat(peerDiscoveryAgent2.getPeers().collect(toList()).size()).isEqualTo(0); assertThat(peerDiscoveryAgent2.streamDiscoveredPeers().collect(toList()).size()).isEqualTo(0);
} }
@Test @Test
@ -148,24 +149,24 @@ public class PeerDiscoveryAgentTest {
// Bond to peer // Bond to peer
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
assertThat(agent.getPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).hasSize(1);
// Disconnect with innocuous reason // Disconnect with innocuous reason
blacklist.onDisconnect(wirePeer, DisconnectReason.TOO_MANY_PEERS, false); blacklist.onDisconnect(wirePeer, DisconnectReason.TOO_MANY_PEERS, false);
agent.onDisconnect(wirePeer, DisconnectReason.TOO_MANY_PEERS, false); agent.onDisconnect(wirePeer, DisconnectReason.TOO_MANY_PEERS, false);
// Confirm peer was removed // Confirm peer was removed
assertThat(agent.getPeers()).hasSize(0); assertThat(agent.streamDiscoveredPeers()).hasSize(0);
// Bond again // Bond again
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
// Check peer was allowed to connect // Check peer was allowed to connect
assertThat(agent.getPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).hasSize(1);
} }
protected void bondViaIncomingPing( protected void bondViaIncomingPing(
final MockPeerDiscoveryAgent agent, final MockPeerDiscoveryAgent otherNode) { final MockPeerDiscoveryAgent agent, final MockPeerDiscoveryAgent otherNode) {
Packet pingPacket = helper.createPingPacket(otherNode, agent); final Packet pingPacket = helper.createPingPacket(otherNode, agent);
helper.sendMessageBetweenAgents(otherNode, agent, pingPacket); helper.sendMessageBetweenAgents(otherNode, agent, pingPacket);
} }
@ -181,19 +182,19 @@ public class PeerDiscoveryAgentTest {
// Bond to peer // Bond to peer
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
assertThat(agent.getPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).hasSize(1);
// Disconnect with problematic reason // Disconnect with problematic reason
blacklist.onDisconnect(wirePeer, DisconnectReason.BREACH_OF_PROTOCOL, false); blacklist.onDisconnect(wirePeer, DisconnectReason.BREACH_OF_PROTOCOL, false);
agent.onDisconnect(wirePeer, DisconnectReason.BREACH_OF_PROTOCOL, false); agent.onDisconnect(wirePeer, DisconnectReason.BREACH_OF_PROTOCOL, false);
// Confirm peer was removed // Confirm peer was removed
assertThat(agent.getPeers()).hasSize(0); assertThat(agent.streamDiscoveredPeers()).hasSize(0);
// Bond again // Bond again
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
// Check peer was not allowed to connect // Check peer was not allowed to connect
assertThat(agent.getPeers()).hasSize(0); assertThat(agent.streamDiscoveredPeers()).hasSize(0);
} }
@Test @Test
@ -208,19 +209,19 @@ public class PeerDiscoveryAgentTest {
// Bond to peer // Bond to peer
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
assertThat(agent.getPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).hasSize(1);
// Disconnect with problematic reason // Disconnect with problematic reason
blacklist.onDisconnect(wirePeer, DisconnectReason.BREACH_OF_PROTOCOL, true); blacklist.onDisconnect(wirePeer, DisconnectReason.BREACH_OF_PROTOCOL, true);
agent.onDisconnect(wirePeer, DisconnectReason.BREACH_OF_PROTOCOL, true); agent.onDisconnect(wirePeer, DisconnectReason.BREACH_OF_PROTOCOL, true);
// Confirm peer was removed // Confirm peer was removed
assertThat(agent.getPeers()).hasSize(0); assertThat(agent.streamDiscoveredPeers()).hasSize(0);
// Bond again // Bond again
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
// Check peer was allowed to connect // Check peer was allowed to connect
assertThat(agent.getPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).hasSize(1);
} }
@Test @Test
@ -235,19 +236,19 @@ public class PeerDiscoveryAgentTest {
// Bond to peer // Bond to peer
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
assertThat(agent.getPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).hasSize(1);
// Disconnect // Disconnect
blacklist.onDisconnect(wirePeer, DisconnectReason.INCOMPATIBLE_P2P_PROTOCOL_VERSION, false); blacklist.onDisconnect(wirePeer, DisconnectReason.INCOMPATIBLE_P2P_PROTOCOL_VERSION, false);
agent.onDisconnect(wirePeer, DisconnectReason.INCOMPATIBLE_P2P_PROTOCOL_VERSION, false); agent.onDisconnect(wirePeer, DisconnectReason.INCOMPATIBLE_P2P_PROTOCOL_VERSION, false);
// Confirm peer was removed // Confirm peer was removed
assertThat(agent.getPeers()).hasSize(0); assertThat(agent.streamDiscoveredPeers()).hasSize(0);
// Bond again // Bond again
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
// Check peer was not allowed to connect // Check peer was not allowed to connect
assertThat(agent.getPeers()).hasSize(0); assertThat(agent.streamDiscoveredPeers()).hasSize(0);
} }
@Test @Test
@ -262,24 +263,24 @@ public class PeerDiscoveryAgentTest {
// Bond to peer // Bond to peer
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
assertThat(agent.getPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).hasSize(1);
// Disconnect // Disconnect
blacklist.onDisconnect(wirePeer, DisconnectReason.INCOMPATIBLE_P2P_PROTOCOL_VERSION, true); blacklist.onDisconnect(wirePeer, DisconnectReason.INCOMPATIBLE_P2P_PROTOCOL_VERSION, true);
agent.onDisconnect(wirePeer, DisconnectReason.INCOMPATIBLE_P2P_PROTOCOL_VERSION, true); agent.onDisconnect(wirePeer, DisconnectReason.INCOMPATIBLE_P2P_PROTOCOL_VERSION, true);
// Confirm peer was removed // Confirm peer was removed
assertThat(agent.getPeers()).hasSize(0); assertThat(agent.streamDiscoveredPeers()).hasSize(0);
// Bond again // Bond again
bondViaIncomingPing(agent, otherNode); bondViaIncomingPing(agent, otherNode);
// Check peer was not allowed to connect // Check peer was not allowed to connect
assertThat(agent.getPeers()).hasSize(0); assertThat(agent.streamDiscoveredPeers()).hasSize(0);
} }
@Test @Test
public void shouldBeActiveWhenConfigIsTrue() { public void shouldBeActiveWhenConfigIsTrue() {
AgentBuilder agentBuilder = helper.agentBuilder().active(true); final AgentBuilder agentBuilder = helper.agentBuilder().active(true);
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(agentBuilder); final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(agentBuilder);
assertThat(agent.isActive()).isTrue(); assertThat(agent.isActive()).isTrue();
@ -287,15 +288,15 @@ public class PeerDiscoveryAgentTest {
@Test @Test
public void shouldNotBeActiveWhenConfigIsFalse() { public void shouldNotBeActiveWhenConfigIsFalse() {
AgentBuilder agentBuilder = helper.agentBuilder().active(false); final AgentBuilder agentBuilder = helper.agentBuilder().active(false);
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(agentBuilder); final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(agentBuilder);
assertThat(agent.isActive()).isFalse(); assertThat(agent.isActive()).isFalse();
} }
private PeerConnection createAnonymousPeerConnection(final BytesValue id) { private PeerConnection createAnonymousPeerConnection(final BytesValue id) {
PeerConnection conn = mock(PeerConnection.class); final PeerConnection conn = mock(PeerConnection.class);
PeerInfo peerInfo = new PeerInfo(0, null, null, 0, id); final PeerInfo peerInfo = new PeerInfo(0, null, null, 0, id);
when(conn.getPeerInfo()).thenReturn(peerInfo); when(conn.getPeerInfo()).thenReturn(peerInfo);
return conn; return conn;
} }

@ -53,8 +53,9 @@ public class PeerDiscoveryBondingTest {
assertThat(pong.getTo()).isEqualTo(otherAgent.getAdvertisedPeer().get().getEndpoint()); assertThat(pong.getTo()).isEqualTo(otherAgent.getAdvertisedPeer().get().getEndpoint());
// The agent considers the test peer BONDED. // The agent considers the test peer BONDED.
assertThat(agent.getPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).hasSize(1);
assertThat(agent.getPeers()).allMatch(p -> p.getStatus() == PeerDiscoveryStatus.BONDED); assertThat(agent.streamDiscoveredPeers())
.allMatch(p -> p.getStatus() == PeerDiscoveryStatus.BONDED);
} }
@Test @Test
@ -67,7 +68,7 @@ public class PeerDiscoveryBondingTest {
// we haven't bonded. // we haven't bonded.
final MockPeerDiscoveryAgent otherNode = helper.startDiscoveryAgent(); final MockPeerDiscoveryAgent otherNode = helper.startDiscoveryAgent();
final FindNeighborsPacketData data = FindNeighborsPacketData.create(otherNode.getId()); final FindNeighborsPacketData data = FindNeighborsPacketData.create(otherNode.getId());
Packet packet = Packet.create(PacketType.FIND_NEIGHBORS, data, otherNode.getKeyPair()); final Packet packet = Packet.create(PacketType.FIND_NEIGHBORS, data, otherNode.getKeyPair());
helper.sendMessageBetweenAgents(otherNode, agent, packet); helper.sendMessageBetweenAgents(otherNode, agent, packet);
// No responses received // No responses received
@ -84,7 +85,7 @@ public class PeerDiscoveryBondingTest {
.filter(p -> p.packet.getType().equals(PacketType.PONG)) .filter(p -> p.packet.getType().equals(PacketType.PONG))
.collect(Collectors.toList()); .collect(Collectors.toList());
assertThat(incomingPongs.size()).isEqualTo(1); assertThat(incomingPongs.size()).isEqualTo(1);
Optional<PongPacketData> maybePongData = final Optional<PongPacketData> maybePongData =
incomingPongs.get(0).packet.getPacketData(PongPacketData.class); incomingPongs.get(0).packet.getPacketData(PongPacketData.class);
assertThat(maybePongData).isPresent(); assertThat(maybePongData).isPresent();
assertThat(maybePongData.get().getTo()) assertThat(maybePongData.get().getTo())

@ -48,7 +48,7 @@ public class PeerDiscoveryBootstrappingTest {
.filter(p -> p.packet.getType().equals(PacketType.PING)) .filter(p -> p.packet.getType().equals(PacketType.PING))
.collect(toList()); .collect(toList());
assertThat(incomingPackets.size()).isEqualTo(1); assertThat(incomingPackets.size()).isEqualTo(1);
Packet pingPacket = incomingPackets.get(0).packet; final Packet pingPacket = incomingPackets.get(0).packet;
assertThat(pingPacket.getNodeId()).isEqualTo(agent.getAdvertisedPeer().get().getId()); assertThat(pingPacket.getNodeId()).isEqualTo(agent.getAdvertisedPeer().get().getId());
final PingPacketData pingData = pingPacket.getPacketData(PingPacketData.class).get(); final PingPacketData pingData = pingPacket.getPacketData(PingPacketData.class).get();
@ -69,7 +69,7 @@ public class PeerDiscoveryBootstrappingTest {
.collect(toList()); .collect(toList());
// Start five agents. // Start five agents.
List<MockPeerDiscoveryAgent> agents = helper.startDiscoveryAgents(5, bootstrapPeers); final List<MockPeerDiscoveryAgent> agents = helper.startDiscoveryAgents(5, bootstrapPeers);
// Assert that all test peers received a Find Neighbors packet. // Assert that all test peers received a Find Neighbors packet.
for (final MockPeerDiscoveryAgent bootstrapAgent : bootstrapAgents) { for (final MockPeerDiscoveryAgent bootstrapAgent : bootstrapAgents) {
@ -91,7 +91,7 @@ public class PeerDiscoveryBootstrappingTest {
assertThat(senderIds).containsExactlyInAnyOrderElementsOf(agentIds); assertThat(senderIds).containsExactlyInAnyOrderElementsOf(agentIds);
// Traverse all received pings. // Traverse all received pings.
List<Packet> pingPackets = final List<Packet> pingPackets =
packets.stream().filter(p -> p.getType().equals(PacketType.PING)).collect(toList()); packets.stream().filter(p -> p.getType().equals(PacketType.PING)).collect(toList());
for (final Packet packet : pingPackets) { for (final Packet packet : pingPackets) {
// Assert that the packet was a Find Neighbors one. // Assert that the packet was a Find Neighbors one.
@ -118,11 +118,11 @@ public class PeerDiscoveryBootstrappingTest {
final BytesValue[] otherPeersIds = final BytesValue[] otherPeersIds =
otherAgents.stream().map(PeerDiscoveryAgent::getId).toArray(BytesValue[]::new); otherAgents.stream().map(PeerDiscoveryAgent::getId).toArray(BytesValue[]::new);
assertThat(bootstrapAgent.getPeers()) assertThat(bootstrapAgent.streamDiscoveredPeers())
.extracting(Peer::getId) .extracting(Peer::getId)
.containsExactlyInAnyOrder(otherPeersIds); .containsExactlyInAnyOrder(otherPeersIds);
assertThat(bootstrapAgent.getPeers()) assertThat(bootstrapAgent.streamDiscoveredPeers())
.allMatch(p -> p.getStatus() == PeerDiscoveryStatus.BONDED); .allMatch(p -> p.getStatus() == PeerDiscoveryStatus.BONDED);
// This agent will bootstrap off the bootstrap peer, will add all nodes returned by the latter, // This agent will bootstrap off the bootstrap peer, will add all nodes returned by the latter,
@ -130,6 +130,6 @@ public class PeerDiscoveryBootstrappingTest {
// bond with them, ultimately adding all 7 nodes in the network to its table. // bond with them, ultimately adding all 7 nodes in the network to its table.
final PeerDiscoveryAgent newAgent = final PeerDiscoveryAgent newAgent =
helper.startDiscoveryAgent(bootstrapAgent.getAdvertisedPeer().get()); helper.startDiscoveryAgent(bootstrapAgent.getAdvertisedPeer().get());
assertThat(newAgent.getPeers()).hasSize(6); assertThat(newAgent.streamDiscoveredPeers()).hasSize(6);
} }
} }

@ -49,8 +49,8 @@ public class PeerDiscoveryTimestampsTest {
final MockPeerDiscoveryAgent agent = mock(MockPeerDiscoveryAgent.class); final MockPeerDiscoveryAgent agent = mock(MockPeerDiscoveryAgent.class);
when(agent.getAdvertisedPeer()).thenReturn(Optional.of(peers.get(0))); when(agent.getAdvertisedPeer()).thenReturn(Optional.of(peers.get(0)));
DiscoveryPeer localPeer = peers.get(0); final DiscoveryPeer localPeer = peers.get(0);
KeyPair localKeyPair = keypairs.get(0); final KeyPair localKeyPair = keypairs.get(0);
final PeerDiscoveryController controller = final PeerDiscoveryController controller =
new PeerDiscoveryController( new PeerDiscoveryController(
@ -79,9 +79,9 @@ public class PeerDiscoveryTimestampsTest {
final AtomicLong lastSeen = new AtomicLong(); final AtomicLong lastSeen = new AtomicLong();
final AtomicLong firstDiscovered = new AtomicLong(); final AtomicLong firstDiscovered = new AtomicLong();
assertThat(controller.getPeers()).hasSize(1); assertThat(controller.streamDiscoveredPeers()).hasSize(1);
DiscoveryPeer p = controller.getPeers().iterator().next(); DiscoveryPeer p = controller.streamDiscoveredPeers().iterator().next();
assertThat(p.getLastSeen()).isGreaterThan(0); assertThat(p.getLastSeen()).isGreaterThan(0);
assertThat(p.getFirstDiscovered()).isGreaterThan(0); assertThat(p.getFirstDiscovered()).isGreaterThan(0);
@ -90,9 +90,9 @@ public class PeerDiscoveryTimestampsTest {
controller.onMessage(packet, peers.get(1)); controller.onMessage(packet, peers.get(1));
assertThat(controller.getPeers()).hasSize(1); assertThat(controller.streamDiscoveredPeers()).hasSize(1);
p = controller.getPeers().iterator().next(); p = controller.streamDiscoveredPeers().iterator().next();
assertThat(p.getLastSeen()).isGreaterThan(lastSeen.get()); assertThat(p.getLastSeen()).isGreaterThan(lastSeen.get());
assertThat(p.getFirstDiscovered()).isEqualTo(firstDiscovered.get()); assertThat(p.getFirstDiscovered()).isEqualTo(firstDiscovered.get());
} }
@ -100,20 +100,20 @@ public class PeerDiscoveryTimestampsTest {
@Test @Test
public void lastContactedTimestampUpdatedOnOutboundMessage() { public void lastContactedTimestampUpdatedOnOutboundMessage() {
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(Collections.emptyList()); final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(Collections.emptyList());
assertThat(agent.getPeers()).hasSize(0); assertThat(agent.streamDiscoveredPeers()).hasSize(0);
// Start a test peer and send a PING packet to the agent under test. // Start a test peer and send a PING packet to the agent under test.
final MockPeerDiscoveryAgent testAgent = helper.startDiscoveryAgent(); final MockPeerDiscoveryAgent testAgent = helper.startDiscoveryAgent();
final Packet ping = helper.createPingPacket(testAgent, agent); final Packet ping = helper.createPingPacket(testAgent, agent);
helper.sendMessageBetweenAgents(testAgent, agent, ping); helper.sendMessageBetweenAgents(testAgent, agent, ping);
assertThat(agent.getPeers()).hasSize(1); assertThat(agent.streamDiscoveredPeers()).hasSize(1);
final AtomicLong lastContacted = new AtomicLong(); final AtomicLong lastContacted = new AtomicLong();
final AtomicLong lastSeen = new AtomicLong(); final AtomicLong lastSeen = new AtomicLong();
final AtomicLong firstDiscovered = new AtomicLong(); final AtomicLong firstDiscovered = new AtomicLong();
DiscoveryPeer peer = agent.getPeers().iterator().next(); DiscoveryPeer peer = agent.streamDiscoveredPeers().iterator().next();
final long lc = peer.getLastContacted(); final long lc = peer.getLastContacted();
final long ls = peer.getLastSeen(); final long ls = peer.getLastSeen();
final long fd = peer.getFirstDiscovered(); final long fd = peer.getFirstDiscovered();
@ -129,7 +129,7 @@ public class PeerDiscoveryTimestampsTest {
// Send another packet and ensure that timestamps are updated accordingly. // Send another packet and ensure that timestamps are updated accordingly.
helper.sendMessageBetweenAgents(testAgent, agent, ping); helper.sendMessageBetweenAgents(testAgent, agent, ping);
peer = agent.getPeers().iterator().next(); peer = agent.streamDiscoveredPeers().iterator().next();
assertThat(peer.getLastContacted()).isGreaterThan(lastContacted.get()); assertThat(peer.getLastContacted()).isGreaterThan(lastContacted.get());
assertThat(peer.getLastSeen()).isGreaterThan(lastSeen.get()); assertThat(peer.getLastSeen()).isGreaterThan(lastSeen.get());

@ -59,7 +59,7 @@ public class BucketTest {
kBucket.add(peer); kBucket.add(peer);
} }
kBucket.getAndTouch(peers.get(0).getId()); kBucket.getAndTouch(peers.get(0).getId());
assertThat(kBucket.peers().indexOf(peers.get(0))).isEqualTo(0); assertThat(kBucket.getPeers().indexOf(peers.get(0))).isEqualTo(0);
} }
@Test @Test
@ -70,7 +70,7 @@ public class BucketTest {
kBucket.add(p); kBucket.add(p);
} }
kBucket.evict(peers.get(4)); kBucket.evict(peers.get(4));
assertFalse(kBucket.peers().contains(peers.get(4))); assertFalse(kBucket.getPeers().contains(peers.get(4)));
} }
@Test @Test
@ -103,19 +103,19 @@ public class BucketTest {
// 16. // 16.
assertThatThrownBy(() -> kBucket.add(peers.get(0))) assertThatThrownBy(() -> kBucket.add(peers.get(0)))
.isInstanceOf(IllegalArgumentException.class); .isInstanceOf(IllegalArgumentException.class);
assertThat(kBucket.peers()).hasSize(16); assertThat(kBucket.getPeers()).hasSize(16);
// Try to evict a peer that doesn't exist, and check the result is false. // Try to evict a peer that doesn't exist, and check the result is false.
assertThat(kBucket.evict(peers.get(17))).isFalse(); assertThat(kBucket.evict(peers.get(17))).isFalse();
assertThat(kBucket.peers()).hasSize(16); assertThat(kBucket.getPeers()).hasSize(16);
// Evict a peer from head, another from the middle, and the tail. // Evict a peer from head, another from the middle, and the tail.
assertThat(kBucket.evict(peers.get(0))).isTrue(); assertThat(kBucket.evict(peers.get(0))).isTrue();
assertThat(kBucket.peers()).hasSize(15); assertThat(kBucket.getPeers()).hasSize(15);
assertThat(kBucket.evict(peers.get(7))).isTrue(); assertThat(kBucket.evict(peers.get(7))).isTrue();
assertThat(kBucket.peers()).hasSize(14); assertThat(kBucket.getPeers()).hasSize(14);
assertThat(kBucket.evict(peers.get(15))).isTrue(); assertThat(kBucket.evict(peers.get(15))).isTrue();
assertThat(kBucket.peers()).hasSize(13); assertThat(kBucket.getPeers()).hasSize(13);
// Check that we can now add peers again. // Check that we can now add peers again.
assertThat(kBucket.add(peers.get(0))).isNotPresent(); assertThat(kBucket.add(peers.get(0))).isNotPresent();
@ -127,7 +127,7 @@ public class BucketTest {
assertThat(kBucket.getAndTouch(peers.get(6).getId())).isPresent().get().isEqualTo(peers.get(6)); assertThat(kBucket.getAndTouch(peers.get(6).getId())).isPresent().get().isEqualTo(peers.get(6));
assertThat(kBucket.getAndTouch(peers.get(9).getId())).isPresent().get().isEqualTo(peers.get(9)); assertThat(kBucket.getAndTouch(peers.get(9).getId())).isPresent().get().isEqualTo(peers.get(9));
assertThat(kBucket.peers()) assertThat(kBucket.getPeers())
.containsSequence( .containsSequence(
peers.get(9), peers.get(9),
peers.get(6), peers.get(6),

@ -145,7 +145,7 @@ public class PeerDiscoveryControllerTest {
.send(eq(p), matchPacketOfType(PacketType.PING))); .send(eq(p), matchPacketOfType(PacketType.PING)));
controller controller
.getPeers() .streamDiscoveredPeers()
.forEach(p -> assertThat(p.getStatus()).isEqualTo(PeerDiscoveryStatus.BONDING)); .forEach(p -> assertThat(p.getStatus()).isEqualTo(PeerDiscoveryStatus.BONDING));
} }
@ -283,7 +283,10 @@ public class PeerDiscoveryControllerTest {
controller.start(); controller.start();
assertThat(controller.getPeers().filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDING)) assertThat(
controller
.streamDiscoveredPeers()
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDING))
.hasSize(3); .hasSize(3);
// Simulate PONG messages from all peers // Simulate PONG messages from all peers
@ -307,9 +310,15 @@ public class PeerDiscoveryControllerTest {
.send(eq(peers.get(i)), matchPacketOfType(PacketType.FIND_NEIGHBORS)); .send(eq(peers.get(i)), matchPacketOfType(PacketType.FIND_NEIGHBORS));
} }
assertThat(controller.getPeers().filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDING)) assertThat(
controller
.streamDiscoveredPeers()
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDING))
.hasSize(0); .hasSize(0);
assertThat(controller.getPeers().filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED)) assertThat(
controller
.streamDiscoveredPeers()
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED))
.hasSize(3); .hasSize(3);
} }
@ -334,7 +343,10 @@ public class PeerDiscoveryControllerTest {
controller.start(); controller.start();
assertThat(controller.getPeers().filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDING)) assertThat(
controller
.streamDiscoveredPeers()
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDING))
.hasSize(3); .hasSize(3);
// Send a PONG packet from peer 1, with an incorrect hash. // Send a PONG packet from peer 1, with an incorrect hash.
@ -347,7 +359,10 @@ public class PeerDiscoveryControllerTest {
verify(outboundMessageHandler, never()) verify(outboundMessageHandler, never())
.send(eq(peers.get(1)), matchPacketOfType(PacketType.FIND_NEIGHBORS)); .send(eq(peers.get(1)), matchPacketOfType(PacketType.FIND_NEIGHBORS));
assertThat(controller.getPeers().filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDING)) assertThat(
controller
.streamDiscoveredPeers()
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDING))
.hasSize(3); .hasSize(3);
} }
@ -399,8 +414,8 @@ public class PeerDiscoveryControllerTest {
final FindNeighborsPacketData data = maybeData.get(); final FindNeighborsPacketData data = maybeData.get();
assertThat(data.getTarget()).isEqualTo(localPeer.getId()); assertThat(data.getTarget()).isEqualTo(localPeer.getId());
assertThat(controller.getPeers()).hasSize(1); assertThat(controller.streamDiscoveredPeers()).hasSize(1);
assertThat(controller.getPeers().findFirst().get().getStatus()) assertThat(controller.streamDiscoveredPeers().findFirst().get().getStatus())
.isEqualTo(PeerDiscoveryStatus.BONDED); .isEqualTo(PeerDiscoveryStatus.BONDED);
} }
@ -452,11 +467,11 @@ public class PeerDiscoveryControllerTest {
respondWithPong(peers.get(0), keyPairs.get(0), pingPacket.getHash()); respondWithPong(peers.get(0), keyPairs.get(0), pingPacket.getHash());
// Assert that we're bonding with the third peer. // Assert that we're bonding with the third peer.
assertThat(controller.getPeers()).hasSize(2); assertThat(controller.streamDiscoveredPeers()).hasSize(2);
assertThat(controller.getPeers()) assertThat(controller.streamDiscoveredPeers())
.filteredOn(p -> p.getStatus() == PeerDiscoveryStatus.BONDING) .filteredOn(p -> p.getStatus() == PeerDiscoveryStatus.BONDING)
.hasSize(1); .hasSize(1);
assertThat(controller.getPeers()) assertThat(controller.streamDiscoveredPeers())
.filteredOn(p -> p.getStatus() == PeerDiscoveryStatus.BONDED) .filteredOn(p -> p.getStatus() == PeerDiscoveryStatus.BONDED)
.hasSize(1); .hasSize(1);
@ -477,8 +492,8 @@ public class PeerDiscoveryControllerTest {
controller.onMessage(neighborsPacket0, peers.get(0)); controller.onMessage(neighborsPacket0, peers.get(0));
// Assert that we're bonded with the third peer. // Assert that we're bonded with the third peer.
assertThat(controller.getPeers()).hasSize(2); assertThat(controller.streamDiscoveredPeers()).hasSize(2);
assertThat(controller.getPeers()) assertThat(controller.streamDiscoveredPeers())
.filteredOn(p -> p.getStatus() == PeerDiscoveryStatus.BONDED) .filteredOn(p -> p.getStatus() == PeerDiscoveryStatus.BONDED)
.hasSize(2); .hasSize(2);
@ -500,7 +515,7 @@ public class PeerDiscoveryControllerTest {
controller.onMessage(pongPacket2, peers.get(2)); controller.onMessage(pongPacket2, peers.get(2));
// Assert we're now bonded with peer[2]. // Assert we're now bonded with peer[2].
assertThat(controller.getPeers()) assertThat(controller.streamDiscoveredPeers())
.filteredOn(p -> p.equals(peers.get(2)) && p.getStatus() == PeerDiscoveryStatus.BONDED) .filteredOn(p -> p.equals(peers.get(2)) && p.getStatus() == PeerDiscoveryStatus.BONDED)
.hasSize(1); .hasSize(1);
@ -529,7 +544,7 @@ public class PeerDiscoveryControllerTest {
final Packet pingPacket = mockPingPacket(peers.get(0), localPeer); final Packet pingPacket = mockPingPacket(peers.get(0), localPeer);
controller.onMessage(pingPacket, peers.get(0)); controller.onMessage(pingPacket, peers.get(0));
assertThat(controller.getPeers()).contains(peers.get(0)); assertThat(controller.streamDiscoveredPeers()).contains(peers.get(0));
} }
@Test @Test
@ -540,7 +555,7 @@ public class PeerDiscoveryControllerTest {
final Packet pingPacket = mockPingPacket(this.localPeer, this.localPeer); final Packet pingPacket = mockPingPacket(this.localPeer, this.localPeer);
controller.onMessage(pingPacket, localPeer); controller.onMessage(pingPacket, localPeer);
assertThat(controller.getPeers()).doesNotContain(localPeer); assertThat(controller.streamDiscoveredPeers()).doesNotContain(localPeer);
} }
@Test @Test
@ -555,9 +570,9 @@ public class PeerDiscoveryControllerTest {
final Packet pingPacket = mockPingPacket(peers.get(16), localPeer); final Packet pingPacket = mockPingPacket(peers.get(16), localPeer);
controller.onMessage(pingPacket, peers.get(16)); controller.onMessage(pingPacket, peers.get(16));
assertThat(controller.getPeers()).contains(peers.get(16)); assertThat(controller.streamDiscoveredPeers()).contains(peers.get(16));
// The first peer added should have been evicted. // The first peer added should have been evicted.
assertThat(controller.getPeers()).doesNotContain(peers.get(0)); assertThat(controller.streamDiscoveredPeers()).doesNotContain(peers.get(0));
} }
@Test @Test
@ -566,12 +581,12 @@ public class PeerDiscoveryControllerTest {
startPeerDiscoveryController(); startPeerDiscoveryController();
peerTable.tryAdd(peers.get(0)); peerTable.tryAdd(peers.get(0));
assertThat(controller.getPeers()).contains(peers.get(0)); assertThat(controller.streamDiscoveredPeers()).contains(peers.get(0));
final Packet pingPacket = mockPingPacket(peers.get(0), localPeer); final Packet pingPacket = mockPingPacket(peers.get(0), localPeer);
controller.onMessage(pingPacket, peers.get(0)); controller.onMessage(pingPacket, peers.get(0));
assertThat(controller.getPeers()).contains(peers.get(0)); assertThat(controller.streamDiscoveredPeers()).contains(peers.get(0));
} }
@Test @Test
@ -638,10 +653,10 @@ public class PeerDiscoveryControllerTest {
MockPacketDataFactory.mockPongPacket(otherPeer2, pingPacket2.getHash()); MockPacketDataFactory.mockPongPacket(otherPeer2, pingPacket2.getHash());
controller.onMessage(pongPacket2, otherPeer2); controller.onMessage(pongPacket2, otherPeer2);
assertThat(controller.getPeers()).hasSize(2); assertThat(controller.streamDiscoveredPeers()).hasSize(2);
assertThat(controller.getPeers()).contains(discoPeer); assertThat(controller.streamDiscoveredPeers()).contains(discoPeer);
assertThat(controller.getPeers()).contains(otherPeer); assertThat(controller.streamDiscoveredPeers()).contains(otherPeer);
assertThat(controller.getPeers()).doesNotContain(otherPeer2); assertThat(controller.streamDiscoveredPeers()).doesNotContain(otherPeer2);
} }
private PacketData matchPingDataForPeer(final DiscoveryPeer peer) { private PacketData matchPingDataForPeer(final DiscoveryPeer peer) {
@ -864,7 +879,7 @@ public class PeerDiscoveryControllerTest {
MockPacketDataFactory.mockPongPacket(peers.get(0), pingPacket.getHash()); MockPacketDataFactory.mockPongPacket(peers.get(0), pingPacket.getHash());
controller.onMessage(pongPacket, peers.get(0)); controller.onMessage(pongPacket, peers.get(0));
assertThat(controller.getPeers()).contains(peers.get(0)); assertThat(controller.streamDiscoveredPeers()).contains(peers.get(0));
} }
@Test @Test
@ -920,15 +935,15 @@ public class PeerDiscoveryControllerTest {
MockPacketDataFactory.mockPongPacket(peers.get(16), pingPacket.getHash()); MockPacketDataFactory.mockPongPacket(peers.get(16), pingPacket.getHash());
controller.onMessage(pongPacket16, peers.get(16)); controller.onMessage(pongPacket16, peers.get(16));
assertThat(controller.getPeers()).contains(peers.get(16)); assertThat(controller.streamDiscoveredPeers()).contains(peers.get(16));
assertThat(controller.getPeers().collect(Collectors.toList())).hasSize(16); assertThat(controller.streamDiscoveredPeers().collect(Collectors.toList())).hasSize(16);
assertThat(evictedPeerFromBucket(bootstrapPeers, controller)).isTrue(); assertThat(evictedPeerFromBucket(bootstrapPeers, controller)).isTrue();
} }
private boolean evictedPeerFromBucket( private boolean evictedPeerFromBucket(
final List<DiscoveryPeer> peers, final PeerDiscoveryController controller) { final List<DiscoveryPeer> peers, final PeerDiscoveryController controller) {
for (final DiscoveryPeer peer : peers) { for (final DiscoveryPeer peer : peers) {
if (controller.getPeers().noneMatch(candidate -> candidate.equals(peer))) { if (controller.streamDiscoveredPeers().noneMatch(candidate -> candidate.equals(peer))) {
return true; return true;
} }
} }
@ -964,7 +979,7 @@ public class PeerDiscoveryControllerTest {
verify(outboundMessageHandler, times(1)) verify(outboundMessageHandler, times(1))
.send(eq(peers.get(0)), matchPacketOfType(PacketType.FIND_NEIGHBORS)); .send(eq(peers.get(0)), matchPacketOfType(PacketType.FIND_NEIGHBORS));
assertThat(controller.getPeers()).doesNotContain(peers.get(1)); assertThat(controller.streamDiscoveredPeers()).doesNotContain(peers.get(1));
} }
@Test @Test
@ -1047,7 +1062,7 @@ public class PeerDiscoveryControllerTest {
final Packet pingPacket = mockPingPacket(peers.get(0), localPeer); final Packet pingPacket = mockPingPacket(peers.get(0), localPeer);
controller.onMessage(pingPacket, peers.get(0)); controller.onMessage(pingPacket, peers.get(0));
assertThat(controller.getPeers()).doesNotContain(peers.get(0)); assertThat(controller.streamDiscoveredPeers()).doesNotContain(peers.get(0));
} }
@Test @Test
@ -1065,7 +1080,7 @@ public class PeerDiscoveryControllerTest {
controller = getControllerBuilder().peerDroppedObservers(peerDroppedSubscribers).build(); controller = getControllerBuilder().peerDroppedObservers(peerDroppedSubscribers).build();
controller.start(); controller.start();
boolean dropped = controller.dropFromPeerTable(peer); final boolean dropped = controller.dropFromPeerTable(peer);
assertThat(dropped).isFalse(); assertThat(dropped).isFalse();
verifyZeroInteractions(peerDroppedEventConsumer); verifyZeroInteractions(peerDroppedEventConsumer);

@ -46,8 +46,8 @@ public class PeerDiscoveryTableRefreshTest {
public void tableRefreshSingleNode() { public void tableRefreshSingleNode() {
final List<SECP256K1.KeyPair> keypairs = PeerDiscoveryTestHelper.generateKeyPairs(2); final List<SECP256K1.KeyPair> keypairs = PeerDiscoveryTestHelper.generateKeyPairs(2);
final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(keypairs); final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(keypairs);
DiscoveryPeer localPeer = peers.get(0); final DiscoveryPeer localPeer = peers.get(0);
KeyPair localKeyPair = keypairs.get(0); final KeyPair localKeyPair = keypairs.get(0);
// Create and start the PeerDiscoveryController // Create and start the PeerDiscoveryController
final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class); final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class);
@ -78,7 +78,7 @@ public class PeerDiscoveryTableRefreshTest {
controller.onMessage(pingPacket, peers.get(1)); controller.onMessage(pingPacket, peers.get(1));
// Wait until the controller has added the newly found peer. // Wait until the controller has added the newly found peer.
assertThat(controller.getPeers()).hasSize(1); assertThat(controller.streamDiscoveredPeers()).hasSize(1);
// Simulate a PONG message from peer 0. // Simulate a PONG message from peer 0.
final PongPacketData pongPacketData = final PongPacketData pongPacketData =
@ -92,7 +92,7 @@ public class PeerDiscoveryTableRefreshTest {
controller.getRecursivePeerRefreshState().cancel(); controller.getRecursivePeerRefreshState().cancel();
timer.runPeriodicHandlers(); timer.runPeriodicHandlers();
controller.getPeers().forEach(p -> p.setStatus(PeerDiscoveryStatus.KNOWN)); controller.streamDiscoveredPeers().forEach(p -> p.setStatus(PeerDiscoveryStatus.KNOWN));
controller.onMessage(pingPacket, peers.get(1)); controller.onMessage(pingPacket, peers.get(1));
} }
verify(outboundMessageHandler, atLeast(5)).send(eq(peers.get(1)), captor.capture()); verify(outboundMessageHandler, atLeast(5)).send(eq(peers.get(1)), captor.capture());
@ -105,7 +105,7 @@ public class PeerDiscoveryTableRefreshTest {
// Collect targets from find neighbors packets // Collect targets from find neighbors packets
final List<BytesValue> targets = new ArrayList<>(); final List<BytesValue> targets = new ArrayList<>();
for (final Packet captured : capturedFindNeighborsPackets) { for (final Packet captured : capturedFindNeighborsPackets) {
Optional<FindNeighborsPacketData> maybeData = final Optional<FindNeighborsPacketData> maybeData =
captured.getPacketData(FindNeighborsPacketData.class); captured.getPacketData(FindNeighborsPacketData.class);
assertThat(maybeData).isPresent(); assertThat(maybeData).isPresent();
final FindNeighborsPacketData neighborsData = maybeData.get(); final FindNeighborsPacketData neighborsData = maybeData.get();

@ -39,7 +39,7 @@ public class PeerTableTest {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ADDED); assertThat(result.getOutcome()).isEqualTo(AddOutcome.ADDED);
} }
assertThat(table.getAllPeers()).hasSize(5); assertThat(table.streamAllPeers()).hasSize(5);
} }
@Test @Test
@ -55,7 +55,7 @@ public class PeerTableTest {
final PeerTable.AddResult result = table.tryAdd(localPeer); final PeerTable.AddResult result = table.tryAdd(localPeer);
assertThat(result.getOutcome()).isEqualTo(AddOutcome.SELF); assertThat(result.getOutcome()).isEqualTo(AddOutcome.SELF);
assertThat(table.getAllPeers()).hasSize(0); assertThat(table.streamAllPeers()).hasSize(0);
} }
@Test @Test
@ -80,7 +80,7 @@ public class PeerTableTest {
table.tryAdd(peer); table.tryAdd(peer);
EvictResult evictResult = table.tryEvict(peer); final EvictResult evictResult = table.tryEvict(peer);
assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.EVICTED); assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.EVICTED);
} }
@ -89,7 +89,7 @@ public class PeerTableTest {
final PeerTable table = new PeerTable(Peer.randomId(), 16); final PeerTable table = new PeerTable(Peer.randomId(), 16);
final DiscoveryPeer peer = helper.createDiscoveryPeer(); final DiscoveryPeer peer = helper.createDiscoveryPeer();
EvictResult evictResult = table.tryEvict(peer); final EvictResult evictResult = table.tryEvict(peer);
assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.ABSENT); assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.ABSENT);
} }
@ -100,7 +100,7 @@ public class PeerTableTest {
final List<DiscoveryPeer> otherPeers = helper.createDiscoveryPeers(5); final List<DiscoveryPeer> otherPeers = helper.createDiscoveryPeers(5);
otherPeers.forEach(table::tryAdd); otherPeers.forEach(table::tryAdd);
EvictResult evictResult = table.tryEvict(peer); final EvictResult evictResult = table.tryEvict(peer);
assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.ABSENT); assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.ABSENT);
} }
@ -109,7 +109,7 @@ public class PeerTableTest {
final DiscoveryPeer peer = helper.createDiscoveryPeer(); final DiscoveryPeer peer = helper.createDiscoveryPeer();
final PeerTable table = new PeerTable(peer.getId(), 16); final PeerTable table = new PeerTable(peer.getId(), 16);
EvictResult evictResult = table.tryEvict(peer); final EvictResult evictResult = table.tryEvict(peer);
assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.SELF); assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.SELF);
} }
} }

@ -80,7 +80,7 @@ public final class DefaultP2PNetworkTest {
@Mock private Blockchain blockchain; @Mock private Blockchain blockchain;
private ArgumentCaptor<BlockAddedObserver> observerCaptor = private final ArgumentCaptor<BlockAddedObserver> observerCaptor =
ArgumentCaptor.forClass(BlockAddedObserver.class); ArgumentCaptor.forClass(BlockAddedObserver.class);
private final Vertx vertx = Vertx.vertx(); private final Vertx vertx = Vertx.vertx();
@ -312,7 +312,7 @@ public final class DefaultP2PNetworkTest {
final Peer remotePeer = mockPeer("127.0.0.2", 30302); final Peer remotePeer = mockPeer("127.0.0.2", 30302);
final PeerConnection peerConnection = mockPeerConnection(remotePeer); final PeerConnection peerConnection = mockPeerConnection(remotePeer);
CompletableFuture<PeerConnection> future = network.connect(remotePeer); final CompletableFuture<PeerConnection> future = network.connect(remotePeer);
assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse(); assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse();
assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue(); assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue();
@ -343,10 +343,10 @@ public final class DefaultP2PNetworkTest {
@Test @Test
public void handlePeerBondedEvent_forPeerWithNoTcpPort() { public void handlePeerBondedEvent_forPeerWithNoTcpPort() {
final DefaultP2PNetwork network = mockNetwork(); final DefaultP2PNetwork network = mockNetwork();
DiscoveryPeer peer = final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint( DiscoveryPeer.fromIdAndEndpoint(
Peer.randomId(), new Endpoint("127.0.0.1", 999, OptionalInt.empty())); Peer.randomId(), new Endpoint("127.0.0.1", 999, OptionalInt.empty()));
PeerBondedEvent peerBondedEvent = new PeerBondedEvent(peer, System.currentTimeMillis()); final PeerBondedEvent peerBondedEvent = new PeerBondedEvent(peer, System.currentTimeMillis());
network.handlePeerBondedEvent().accept(peerBondedEvent); network.handlePeerBondedEvent().accept(peerBondedEvent);
verify(network, times(1)).connect(peer); verify(network, times(1)).connect(peer);
@ -362,8 +362,8 @@ public final class DefaultP2PNetworkTest {
DiscoveryPeer peer = createDiscoveryPeer(); DiscoveryPeer peer = createDiscoveryPeer();
peer.setStatus(PeerDiscoveryStatus.BONDED); peer.setStatus(PeerDiscoveryStatus.BONDED);
doReturn(Stream.of(peer)).when(network).getDiscoveredPeers(); doReturn(Stream.of(peer)).when(network).streamDiscoveredPeers();
ArgumentCaptor<DiscoveryPeer> peerCapture = ArgumentCaptor.forClass(DiscoveryPeer.class); final ArgumentCaptor<DiscoveryPeer> peerCapture = ArgumentCaptor.forClass(DiscoveryPeer.class);
doReturn(CompletableFuture.completedFuture(mock(PeerConnection.class))) doReturn(CompletableFuture.completedFuture(mock(PeerConnection.class)))
.when(network) .when(network)
.connect(peerCapture.capture()); .connect(peerCapture.capture());
@ -383,7 +383,7 @@ public final class DefaultP2PNetworkTest {
DiscoveryPeer peer = createDiscoveryPeer(); DiscoveryPeer peer = createDiscoveryPeer();
peer.setStatus(PeerDiscoveryStatus.KNOWN); peer.setStatus(PeerDiscoveryStatus.KNOWN);
doReturn(Stream.of(peer)).when(network).getDiscoveredPeers(); doReturn(Stream.of(peer)).when(network).streamDiscoveredPeers();
network.attemptPeerConnections(); network.attemptPeerConnections();
verify(network, times(0)).connect(any()); verify(network, times(0)).connect(any());
@ -400,7 +400,7 @@ public final class DefaultP2PNetworkTest {
peer.setStatus(PeerDiscoveryStatus.BONDED); peer.setStatus(PeerDiscoveryStatus.BONDED);
doReturn(true).when(network).isConnecting(peer); doReturn(true).when(network).isConnecting(peer);
doReturn(Stream.of(peer)).when(network).getDiscoveredPeers(); doReturn(Stream.of(peer)).when(network).streamDiscoveredPeers();
network.attemptPeerConnections(); network.attemptPeerConnections();
verify(network, times(0)).connect(any()); verify(network, times(0)).connect(any());
@ -417,7 +417,7 @@ public final class DefaultP2PNetworkTest {
peer.setStatus(PeerDiscoveryStatus.BONDED); peer.setStatus(PeerDiscoveryStatus.BONDED);
doReturn(true).when(network).isConnected(peer); doReturn(true).when(network).isConnected(peer);
doReturn(Stream.of(peer)).when(network).getDiscoveredPeers(); doReturn(Stream.of(peer)).when(network).streamDiscoveredPeers();
network.attemptPeerConnections(); network.attemptPeerConnections();
verify(network, times(0)).connect(any()); verify(network, times(0)).connect(any());
@ -430,7 +430,7 @@ public final class DefaultP2PNetworkTest {
mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers)); mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers));
doReturn(2).when(network).connectionCount(); doReturn(2).when(network).connectionCount();
List<DiscoveryPeer> peers = final List<DiscoveryPeer> peers =
Stream.iterate(1, n -> n + 1) Stream.iterate(1, n -> n + 1)
.limit(10) .limit(10)
.map( .map(
@ -441,8 +441,8 @@ public final class DefaultP2PNetworkTest {
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
doReturn(peers.stream()).when(network).getDiscoveredPeers(); doReturn(peers.stream()).when(network).streamDiscoveredPeers();
ArgumentCaptor<DiscoveryPeer> peerCapture = ArgumentCaptor.forClass(DiscoveryPeer.class); final ArgumentCaptor<DiscoveryPeer> peerCapture = ArgumentCaptor.forClass(DiscoveryPeer.class);
doReturn(CompletableFuture.completedFuture(mock(PeerConnection.class))) doReturn(CompletableFuture.completedFuture(mock(PeerConnection.class)))
.when(network) .when(network)
.connect(peerCapture.capture()); .connect(peerCapture.capture());
@ -459,7 +459,7 @@ public final class DefaultP2PNetworkTest {
mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers)); mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers));
doReturn(maxPeers).when(network).connectionCount(); doReturn(maxPeers).when(network).connectionCount();
List<DiscoveryPeer> peers = final List<DiscoveryPeer> peers =
Stream.iterate(1, n -> n + 1) Stream.iterate(1, n -> n + 1)
.limit(10) .limit(10)
.map( .map(
@ -470,7 +470,7 @@ public final class DefaultP2PNetworkTest {
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
lenient().doReturn(peers.stream()).when(network).getDiscoveredPeers(); lenient().doReturn(peers.stream()).when(network).streamDiscoveredPeers();
network.attemptPeerConnections(); network.attemptPeerConnections();
verify(network, times(0)).connect(any()); verify(network, times(0)).connect(any());
@ -518,7 +518,7 @@ public final class DefaultP2PNetworkTest {
} }
private DefaultP2PNetwork mockNetwork(final Supplier<RlpxConfiguration> rlpxConfig) { private DefaultP2PNetwork mockNetwork(final Supplier<RlpxConfiguration> rlpxConfig) {
DefaultP2PNetwork network = spy(network(rlpxConfig)); final DefaultP2PNetwork network = spy(network(rlpxConfig));
lenient().doReturn(new CompletableFuture<>()).when(network).connect(any()); lenient().doReturn(new CompletableFuture<>()).when(network).connect(any());
return network; return network;
} }

@ -59,7 +59,7 @@ public class NetworkingServiceLifecycleTest {
assertEquals(config.getDiscovery().getAdvertisedHost(), enode.getIpAsString()); assertEquals(config.getDiscovery().getAdvertisedHost(), enode.getIpAsString());
assertThat(udpPort).isNotZero(); assertThat(udpPort).isNotZero();
assertThat(tcpPort).isNotZero(); assertThat(tcpPort).isNotZero();
assertThat(service.getDiscoveredPeers()).hasSize(0); assertThat(service.streamDiscoveredPeers()).hasSize(0);
} }
} }
@ -146,7 +146,7 @@ public class NetworkingServiceLifecycleTest {
@Test @Test
public void createP2PNetwork_NoActivePeers() throws IOException { public void createP2PNetwork_NoActivePeers() throws IOException {
try (final P2PNetwork agent = builder().build()) { try (final P2PNetwork agent = builder().build()) {
assertTrue(agent.getDiscoveredPeers().collect(toList()).isEmpty()); assertTrue(agent.streamDiscoveredPeers().collect(toList()).isEmpty());
assertEquals(0, agent.getPeers().size()); assertEquals(0, agent.getPeers().size());
} }
} }

@ -52,9 +52,9 @@ public interface MetricsSystem {
createGauge(category, name, help, () -> (double) valueSupplier.get()); createGauge(category, name, help, () -> (double) valueSupplier.get());
} }
Stream<Observation> getMetrics(MetricCategory category); Stream<Observation> streamObservations(MetricCategory category);
default Stream<Observation> getMetrics() { default Stream<Observation> streamObservations() {
return Stream.of(MetricCategory.values()).flatMap(this::getMetrics); return Stream.of(MetricCategory.values()).flatMap(this::streamObservations);
} }
} }

@ -88,12 +88,12 @@ public class NoOpMetricsSystem implements MetricsSystem {
final Supplier<Double> valueSupplier) {} final Supplier<Double> valueSupplier) {}
@Override @Override
public Stream<Observation> getMetrics(final MetricCategory category) { public Stream<Observation> streamObservations(final MetricCategory category) {
return Stream.empty(); return Stream.empty();
} }
@Override @Override
public Stream<Observation> getMetrics() { public Stream<Observation> streamObservations() {
return Stream.empty(); return Stream.empty();
} }

@ -160,7 +160,7 @@ public class PrometheusMetricsSystem implements MetricsSystem {
} }
@Override @Override
public Stream<Observation> getMetrics(final MetricCategory category) { public Stream<Observation> streamObservations(final MetricCategory category) {
return collectors.getOrDefault(category, Collections.emptySet()).stream() return collectors.getOrDefault(category, Collections.emptySet()).stream()
.flatMap(collector -> collector.collect().stream()) .flatMap(collector -> collector.collect().stream())
.flatMap(familySamples -> convertSamplesToObservations(category, familySamples)); .flatMap(familySamples -> convertSamplesToObservations(category, familySamples));

@ -50,11 +50,11 @@ public class PrometheusMetricsSystemTest {
final Counter counter = metricsSystem.createCounter(PEERS, "connected", "Some help string"); final Counter counter = metricsSystem.createCounter(PEERS, "connected", "Some help string");
counter.inc(); counter.inc();
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 1d, emptyList())); .containsExactly(new Observation(PEERS, "connected", 1d, emptyList()));
counter.inc(); counter.inc();
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 2d, emptyList())); .containsExactly(new Observation(PEERS, "connected", 2d, emptyList()));
} }
@ -67,11 +67,11 @@ public class PrometheusMetricsSystemTest {
assertThat(counter1).isEqualTo(counter2); assertThat(counter1).isEqualTo(counter2);
counter1.labels().inc(); counter1.labels().inc();
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 1d, emptyList())); .containsExactly(new Observation(PEERS, "connected", 1d, emptyList()));
counter2.labels().inc(); counter2.labels().inc();
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 2d, emptyList())); .containsExactly(new Observation(PEERS, "connected", 2d, emptyList()));
} }
@ -84,7 +84,7 @@ public class PrometheusMetricsSystemTest {
counter.labels("value2").inc(); counter.labels("value2").inc();
counter.labels("value1").inc(); counter.labels("value1").inc();
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.containsExactlyInAnyOrder( .containsExactlyInAnyOrder(
new Observation(PEERS, "connected", 2d, singletonList("value1")), new Observation(PEERS, "connected", 2d, singletonList("value1")),
new Observation(PEERS, "connected", 1d, singletonList("value2"))); new Observation(PEERS, "connected", 1d, singletonList("value2")));
@ -95,11 +95,11 @@ public class PrometheusMetricsSystemTest {
final Counter counter = metricsSystem.createCounter(PEERS, "connected", "Some help string"); final Counter counter = metricsSystem.createCounter(PEERS, "connected", "Some help string");
counter.inc(5); counter.inc(5);
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 5d, emptyList())); .containsExactly(new Observation(PEERS, "connected", 5d, emptyList()));
counter.inc(6); counter.inc(6);
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 11d, emptyList())); .containsExactly(new Observation(PEERS, "connected", 11d, emptyList()));
} }
@ -110,7 +110,7 @@ public class PrometheusMetricsSystemTest {
final TimingContext context = timer.startTimer(); final TimingContext context = timer.startTimer();
context.stopTimer(); context.stopTimer();
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.usingElementComparator(IGNORE_VALUES) .usingElementComparator(IGNORE_VALUES)
.containsExactlyInAnyOrder( .containsExactlyInAnyOrder(
new Observation(RPC, "request", null, asList("quantile", "0.2")), new Observation(RPC, "request", null, asList("quantile", "0.2")),
@ -140,7 +140,7 @@ public class PrometheusMetricsSystemTest {
//noinspection EmptyTryBlock //noinspection EmptyTryBlock
try (final TimingContext ignored = timer.labels("method").startTimer()) {} try (final TimingContext ignored = timer.labels("method").startTimer()) {}
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.usingElementComparator(IGNORE_VALUES) // We don't know how long it will actually take. .usingElementComparator(IGNORE_VALUES) // We don't know how long it will actually take.
.containsExactlyInAnyOrder( .containsExactlyInAnyOrder(
new Observation(RPC, "request", null, asList("method", "quantile", "0.2")), new Observation(RPC, "request", null, asList("method", "quantile", "0.2")),
@ -157,7 +157,7 @@ public class PrometheusMetricsSystemTest {
public void shouldCreateObservationFromGauge() { public void shouldCreateObservationFromGauge() {
metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7d); metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7d);
assertThat(metricsSystem.getMetrics()) assertThat(metricsSystem.streamObservations())
.containsExactlyInAnyOrder(new Observation(JVM, "myValue", 7d, emptyList())); .containsExactlyInAnyOrder(new Observation(JVM, "myValue", 7d, emptyList()));
} }
@ -184,7 +184,7 @@ public class PrometheusMetricsSystemTest {
assertThat(counterN).isSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER); assertThat(counterN).isSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);
counterN.labels("show").inc(); counterN.labels("show").inc();
assertThat(localMetricSystem.getMetrics()).isEmpty(); assertThat(localMetricSystem.streamObservations()).isEmpty();
// do a category we are watching // do a category we are watching
final LabelledMetric<Counter> counterR = final LabelledMetric<Counter> counterR =
@ -192,7 +192,7 @@ public class PrometheusMetricsSystemTest {
assertThat(counterR).isNotSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER); assertThat(counterR).isNotSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);
counterR.labels("op").inc(); counterR.labels("op").inc();
assertThat(localMetricSystem.getMetrics()) assertThat(localMetricSystem.streamObservations())
.containsExactly(new Observation(RPC, "name", 1.0, singletonList("op"))); .containsExactly(new Observation(RPC, "name", 1.0, singletonList("op")));
} }

Loading…
Cancel
Save