7311: Add PeerTask system for use in future PRs

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7628/head
Matilda Clerke 2 months ago
parent 89dfa95860
commit 4b80016587
  1. 26
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/InvalidPeerTaskResponseException.java
  2. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/NoAvailablePeerException.java
  3. 64
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManager.java
  4. 63
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java
  5. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskBehavior.java
  6. 157
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  7. 24
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java
  8. 35
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java
  9. 55
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java
  10. 80
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManagerTest.java
  11. 268
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java
  12. 77
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java

@ -0,0 +1,26 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public class InvalidPeerTaskResponseException extends Exception {
public InvalidPeerTaskResponseException() {
super();
}
public InvalidPeerTaskResponseException(final Throwable cause) {
super(cause);
}
}

@ -0,0 +1,17 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public class NoAvailablePeerException extends Exception {}

@ -0,0 +1,64 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** "Manages" the EthPeers for the PeerTaskExecutor */
public class PeerManager {
private static final Logger LOG = LoggerFactory.getLogger(PeerManager.class);
// use a synchronized map to ensure the map is never modified by multiple threads at once
private final Map<PeerId, EthPeer> ethPeersByPeerId =
Collections.synchronizedMap(new HashMap<>());
/**
* Gets the highest reputation peer matching the supplies filter
*
* @param filter a filter to match prospective peers with
* @return the highest reputation peer matching the supplies filter
* @throws NoAvailablePeerException If there are no suitable peers
*/
public EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException {
LOG.trace("Getting peer from pool of {} peers", ethPeersByPeerId.size());
return ethPeersByPeerId.values().stream()
.filter(filter)
.max(Comparator.naturalOrder())
.orElseThrow(NoAvailablePeerException::new);
}
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {
return Optional.ofNullable(ethPeersByPeerId.get(peerId));
}
public void addPeer(final EthPeer ethPeer) {
ethPeersByPeerId.put(ethPeer.getConnection().getPeer(), ethPeer);
}
public void removePeer(final PeerId peerId) {
ethPeersByPeerId.remove(peerId);
}
}

@ -0,0 +1,63 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import java.util.Collection;
/**
* Represents a task to be executed on an EthPeer by the PeerTaskExecutor
*
* @param <T> The type of the result of this PeerTask
*/
public interface PeerTask<T> {
/**
* Returns the SubProtocol used for this PeerTask
*
* @return the SubProtocol used for this PeerTask
*/
String getSubProtocol();
/**
* Gets the minimum required block number for a peer to have to successfully execute this task
*
* @return the minimum required block number for a peer to have to successfully execute this task
*/
long getRequiredBlockNumber();
/**
* Gets the request data to send to the EthPeer
*
* @return the request data to send to the EthPeer
*/
MessageData getRequestMessage();
/**
* Parses the MessageData response from the EthPeer
*
* @param messageData the response MessageData to be parsed
* @return a T built from the response MessageData
* @throws InvalidPeerTaskResponseException if the response messageData is invalid
*/
T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException;
/**
* Gets the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor
*
* @return the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor
*/
Collection<PeerTaskBehavior> getPeerTaskBehaviors();
}

@ -0,0 +1,20 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public enum PeerTaskBehavior {
RETRY_WITH_SAME_PEER,
RETRY_WITH_OTHER_PEERS
}

@ -0,0 +1,157 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
/** Manages the execution of PeerTasks, respecting their PeerTaskBehavior */
public class PeerTaskExecutor {
private static final long[] WAIT_TIME_BEFORE_RETRY = {0, 20000, 5000};
private final PeerManager peerManager;
private final PeerTaskRequestSender requestSender;
private final Supplier<ProtocolSpec> protocolSpecSupplier;
private final LabelledMetric<OperationTimer> requestTimer;
public PeerTaskExecutor(
final PeerManager peerManager,
final PeerTaskRequestSender requestSender,
final Supplier<ProtocolSpec> protocolSpecSupplier,
final MetricsSystem metricsSystem) {
this.peerManager = peerManager;
this.requestSender = requestSender;
this.protocolSpecSupplier = protocolSpecSupplier;
requestTimer =
metricsSystem.createLabelledTimer(
BesuMetricCategory.PEERS, "Peer Task Executor Request Time", "", "Task Class Name");
}
public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
PeerTaskExecutorResult<T> executorResult;
int triesRemaining =
peerTask.getPeerTaskBehaviors().contains(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS) ? 3 : 1;
final Collection<EthPeer> usedEthPeers = new ArrayList<>();
do {
EthPeer peer;
try {
peer =
peerManager.getPeer(
(candidatePeer) ->
isPeerUnused(candidatePeer, usedEthPeers)
&& (protocolSpecSupplier.get().isPoS()
|| isPeerHeightHighEnough(
candidatePeer, peerTask.getRequiredBlockNumber()))
&& isPeerProtocolSuitable(candidatePeer, peerTask.getSubProtocol()));
usedEthPeers.add(peer);
executorResult = executeAgainstPeer(peerTask, peer);
} catch (NoAvailablePeerException e) {
executorResult =
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
}
} while (--triesRemaining > 0
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS);
return executorResult;
}
public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAsync(final PeerTask<T> peerTask) {
return CompletableFuture.supplyAsync(() -> execute(peerTask));
}
public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
final PeerTask<T> peerTask, final EthPeer peer) {
MessageData requestMessageData = peerTask.getRequestMessage();
PeerTaskExecutorResult<T> executorResult;
int triesRemaining =
peerTask.getPeerTaskBehaviors().contains(PeerTaskBehavior.RETRY_WITH_SAME_PEER) ? 3 : 1;
do {
try {
MessageData responseMessageData;
try (final OperationTimer.TimingContext timingContext =
requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) {
responseMessageData =
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);
}
T result = peerTask.parseResponse(responseMessageData);
peer.recordUsefulResponse();
executorResult = new PeerTaskExecutorResult<>(result, PeerTaskExecutorResponseCode.SUCCESS);
} catch (PeerConnection.PeerNotConnected e) {
executorResult =
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.PEER_DISCONNECTED);
} catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode());
executorResult = new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.TIMEOUT);
} catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage());
executorResult =
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.INVALID_RESPONSE);
} catch (ExecutionException e) {
executorResult =
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);
}
} while (--triesRemaining > 0
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED
&& sleepBetweenRetries(WAIT_TIME_BEFORE_RETRY[triesRemaining]));
return executorResult;
}
public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAgainstPeerAsync(
final PeerTask<T> peerTask, final EthPeer peer) {
return CompletableFuture.supplyAsync(() -> executeAgainstPeer(peerTask, peer));
}
private boolean sleepBetweenRetries(final long sleepTime) {
try {
Thread.sleep(sleepTime);
return true;
} catch (InterruptedException e) {
return false;
}
}
private static boolean isPeerUnused(
final EthPeer ethPeer, final Collection<EthPeer> usedEthPeers) {
return !usedEthPeers.contains(ethPeer);
}
private static boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) {
return ethPeer.chainState().getEstimatedHeight() >= requiredHeight;
}
private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final String protocol) {
return ethPeer.getProtocolName().equals(protocol);
}
}

@ -0,0 +1,24 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public enum PeerTaskExecutorResponseCode {
SUCCESS,
NO_PEER_AVAILABLE,
PEER_DISCONNECTED,
INTERNAL_SERVER_ERROR,
TIMEOUT,
INVALID_RESPONSE
}

@ -0,0 +1,35 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import java.util.Optional;
public class PeerTaskExecutorResult<T> {
private final Optional<T> result;
private final PeerTaskExecutorResponseCode responseCode;
public PeerTaskExecutorResult(final T result, final PeerTaskExecutorResponseCode responseCode) {
this.result = Optional.ofNullable(result);
this.responseCode = responseCode;
}
public Optional<T> getResult() {
return result;
}
public PeerTaskExecutorResponseCode getResponseCode() {
return responseCode;
}
}

@ -0,0 +1,55 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager.ResponseStream;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PeerTaskRequestSender {
private static final long DEFAULT_TIMEOUT_MS = 20_000;
private final long timeoutMs;
public PeerTaskRequestSender() {
this.timeoutMs = DEFAULT_TIMEOUT_MS;
}
public PeerTaskRequestSender(final long timeoutMs) {
this.timeoutMs = timeoutMs;
}
public MessageData sendRequest(
final String subProtocol, final MessageData requestMessageData, final EthPeer ethPeer)
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException {
ResponseStream responseStream =
ethPeer.send(requestMessageData, subProtocol, ethPeer.getConnection());
final CompletableFuture<MessageData> responseMessageDataFuture = new CompletableFuture<>();
responseStream.then(
(boolean streamClosed, MessageData message, EthPeer peer) -> {
responseMessageDataFuture.complete(message);
});
return responseMessageDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
}
}

@ -0,0 +1,80 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import java.time.Clock;
import java.util.Collections;
import java.util.Set;
import org.apache.tuweni.bytes.Bytes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class PeerManagerTest {
public PeerManager peerManager;
@BeforeEach
public void beforeTest() {
peerManager = new PeerManager();
}
@Test
public void testGetPeer() throws NoAvailablePeerException {
EthPeer protocol1With5ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5);
peerManager.addPeer(protocol1With5ReputationPeer);
EthPeer protocol1With4ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4);
peerManager.addPeer(protocol1With4ReputationPeer);
EthPeer protocol2With50ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50);
peerManager.addPeer(protocol2With50ReputationPeer);
EthPeer protocol2With4ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4);
peerManager.addPeer(protocol2With4ReputationPeer);
EthPeer result = peerManager.getPeer((p) -> p.getProtocolName().equals("protocol1"));
Assertions.assertSame(protocol1With5ReputationPeer, result);
}
private EthPeer createTestPeer(
final Set<Capability> connectionCapabilities,
final String protocolName,
final int reputationAdjustment) {
PeerConnection peerConnection = new MockPeerConnection(connectionCapabilities);
EthPeer peer =
new EthPeer(
peerConnection,
protocolName,
null,
Collections.emptyList(),
1,
Clock.systemUTC(),
Collections.emptyList(),
Bytes.EMPTY);
for (int i = 0; i < reputationAdjustment; i++) {
peer.getReputation().recordUsefulResponse();
}
return peer;
}
}

@ -0,0 +1,268 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
public class PeerTaskExecutorTest {
private @Mock PeerManager peerManager;
private @Mock PeerTaskRequestSender requestSender;
private @Mock ProtocolSpec protocolSpec;
private @Mock PeerTask<Object> peerTask;
private @Mock MessageData requestMessageData;
private @Mock MessageData responseMessageData;
private @Mock EthPeer ethPeer;
private AutoCloseable mockCloser;
private PeerTaskExecutor peerTaskExecutor;
@BeforeEach
public void beforeTest() {
mockCloser = MockitoAnnotations.openMocks(this);
peerTaskExecutor =
new PeerTaskExecutor(
peerManager, requestSender, () -> protocolSpec, new NoOpMetricsSystem());
}
@AfterEach
public void afterTest() throws Exception {
mockCloser.close();
}
@Test
public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndSuccessfulFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
String subprotocol = "subprotocol";
Object responseObject = new Object();
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.getResult().isPresent());
Assertions.assertSame(responseObject, result.getResult().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.getResponseCode());
}
@Test
public void testExecuteAgainstPeerWithRetryBehaviorsAndSuccessfulFlowAfterFirstFailure()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
String subprotocol = "subprotocol";
Object responseObject = new Object();
int requestMessageDataCode = 123;
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors())
.thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_SAME_PEER));
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException())
.thenReturn(responseMessageData);
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode);
Mockito.verify(ethPeer).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.getResult().isPresent());
Assertions.assertSame(responseObject, result.getResult().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.getResponseCode());
}
@Test
public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndPeerNotConnected()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
String subprotocol = "subprotocol";
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new PeerConnection.PeerNotConnected(""));
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.getResult().isEmpty());
Assertions.assertEquals(
PeerTaskExecutorResponseCode.PEER_DISCONNECTED, result.getResponseCode());
}
@Test
public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndTimeoutException()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
String subprotocol = "subprotocol";
int requestMessageDataCode = 123;
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException());
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.getResult().isEmpty());
Assertions.assertEquals(PeerTaskExecutorResponseCode.TIMEOUT, result.getResponseCode());
}
@Test
public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndInvalidResponseMessage()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
String subprotocol = "subprotocol";
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData))
.thenThrow(new InvalidPeerTaskResponseException());
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordUselessResponse(null);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.getResult().isEmpty());
Assertions.assertEquals(
PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.getResponseCode());
}
@Test
@SuppressWarnings("unchecked")
public void testExecuteWithNoPeerTaskBehaviorsAndSuccessFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException,
NoAvailablePeerException {
String subprotocol = "subprotocol";
Object responseObject = new Object();
Mockito.when(peerManager.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer);
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.getResult().isPresent());
Assertions.assertSame(responseObject, result.getResult().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.getResponseCode());
}
@Test
@SuppressWarnings("unchecked")
public void testExecuteWithPeerSwitchingAndSuccessFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException,
NoAvailablePeerException {
String subprotocol = "subprotocol";
Object responseObject = new Object();
int requestMessageDataCode = 123;
EthPeer peer2 = Mockito.mock(EthPeer.class);
Mockito.when(peerManager.getPeer(Mockito.any(Predicate.class)))
.thenReturn(ethPeer)
.thenReturn(peer2);
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors())
.thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS));
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException());
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, peer2))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.execute(peerTask);
Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode);
Mockito.verify(peer2).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.getResult().isPresent());
Assertions.assertSame(responseObject, result.getResult().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.getResponseCode());
}
}

@ -0,0 +1,77 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
public class PeerTaskRequestSenderTest {
private PeerTaskRequestSender peerTaskRequestSender;
@BeforeEach
public void beforeTest() {
peerTaskRequestSender = new PeerTaskRequestSender();
}
@Test
public void testSendRequest()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException {
String subprotocol = "subprotocol";
MessageData requestMessageData = Mockito.mock(MessageData.class);
MessageData responseMessageData = Mockito.mock(MessageData.class);
EthPeer peer = Mockito.mock(EthPeer.class);
PeerConnection peerConnection = Mockito.mock(PeerConnection.class);
RequestManager.ResponseStream responseStream =
Mockito.mock(RequestManager.ResponseStream.class);
Mockito.when(peer.getConnection()).thenReturn(peerConnection);
Mockito.when(peer.send(requestMessageData, subprotocol, peerConnection))
.thenReturn(responseStream);
CompletableFuture<MessageData> actualResponseMessageDataFuture =
CompletableFuture.supplyAsync(
() -> {
try {
return peerTaskRequestSender.sendRequest(subprotocol, requestMessageData, peer);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Thread.sleep(500);
ArgumentCaptor<RequestManager.ResponseCallback> responseCallbackArgumentCaptor =
ArgumentCaptor.forClass(RequestManager.ResponseCallback.class);
Mockito.verify(responseStream).then(responseCallbackArgumentCaptor.capture());
RequestManager.ResponseCallback responseCallback = responseCallbackArgumentCaptor.getValue();
responseCallback.exec(false, responseMessageData, peer);
Assertions.assertSame(responseMessageData, actualResponseMessageDataFuture.get());
}
}
Loading…
Cancel
Save