[PAN-2312] Validate DAO block (#939)

Adds a PeerValidator that, when the Dao fork milestone is in use, checks that the Dao block is present on each peer when they connect and disconnects them if they are on the wrong chain.
Also:
* Make GetHeadersFromPeer task stricter in validating response matches.
* Update BlockHeadersMessage to return a list of headers
* Add more controls to DeterministicEthScheduler test util
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by Adrian Sutton
parent 8f7ed8c3ae
commit 1b0a749ca6
  1. 48
      consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubbedPeerConnection.java
  2. 2
      consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/ValidatorPeer.java
  3. 46
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/EthSynchronizerUpdaterTest.java
  4. 6
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetBlockHeaderValidator.java
  5. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java
  6. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
  7. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java
  8. 10
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerRequestTask.java
  9. 12
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java
  10. 132
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidator.java
  11. 55
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidator.java
  12. 67
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunner.java
  13. 28
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java
  14. 21
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java
  15. 25
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  16. 8
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java
  17. 93
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java
  18. 1
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java
  19. 13
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java
  20. 5
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java
  21. 236
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java
  22. 122
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java
  23. 3
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java
  24. 6
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AdminJsonRpcHttpServiceTest.java
  25. 59
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/MockPeerConnection.java
  26. 2
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/AdminPeersTest.java
  27. 9
      ethereum/mock-p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetwork.java
  28. 3
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/PeerConnection.java
  29. 3
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java
  30. 41
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java
  31. 23
      pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java

@ -13,51 +13,19 @@
package tech.pegasys.pantheon.consensus.ibft.support;
import static java.util.Collections.emptyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.net.SocketAddress;
import java.util.Set;
public class StubbedPeerConnection {
public class StubbedPeerConnection implements PeerConnection {
private final BytesValue nodeId;
public StubbedPeerConnection(final BytesValue nodeId) {
this.nodeId = nodeId;
}
@Override
public void send(final Capability capability, final MessageData message)
throws PeerNotConnected {}
@Override
public Set<Capability> getAgreedCapabilities() {
return null;
}
@Override
public PeerInfo getPeer() {
return new PeerInfo(0, "IbftIntTestPeer", emptyList(), 0, nodeId);
}
@Override
public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {}
@Override
public void disconnect(final DisconnectReason reason) {}
@Override
public SocketAddress getLocalAddress() {
return null;
}
@Override
public SocketAddress getRemoteAddress() {
return null;
public static PeerConnection create(final BytesValue nodeId) {
PeerConnection peerConnection = mock(PeerConnection.class);
PeerInfo peerInfo = new PeerInfo(0, "IbftIntTestPeer", emptyList(), 0, nodeId);
when(peerConnection.getPeer()).thenReturn(peerInfo);
return peerConnection;
}
}

@ -65,7 +65,7 @@ public class ValidatorPeer {
this.nodeAddress = nodeParams.getAddress();
this.messageFactory = messageFactory;
final BytesValue nodeId = nodeKeys.getPublicKey().getEncodedBytes();
this.peerConnection = new StubbedPeerConnection(nodeId);
this.peerConnection = StubbedPeerConnection.create(nodeId);
this.localEventMultiplexer = localEventMultiplexer;
}

@ -14,6 +14,7 @@ package tech.pegasys.pantheon.consensus.ibft;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
@ -22,14 +23,7 @@ import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import java.net.SocketAddress;
import java.util.Set;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -50,7 +44,7 @@ public class EthSynchronizerUpdaterTest {
final EthSynchronizerUpdater updater = new EthSynchronizerUpdater(ethPeers);
updater.updatePeerChainState(1, createAnonymousPeerConnection());
updater.updatePeerChainState(1, mock(PeerConnection.class));
verifyZeroInteractions(ethPeer);
}
@ -63,41 +57,7 @@ public class EthSynchronizerUpdaterTest {
final EthSynchronizerUpdater updater = new EthSynchronizerUpdater(ethPeers);
final long suppliedChainHeight = 6L;
updater.updatePeerChainState(suppliedChainHeight, createAnonymousPeerConnection());
updater.updatePeerChainState(suppliedChainHeight, mock(PeerConnection.class));
verify(chainState, times(1)).updateHeightEstimate(eq(suppliedChainHeight));
}
private PeerConnection createAnonymousPeerConnection() {
return new PeerConnection() {
@Override
public void send(final Capability capability, final MessageData message)
throws PeerNotConnected {}
@Override
public Set<Capability> getAgreedCapabilities() {
return null;
}
@Override
public PeerInfo getPeer() {
return new PeerInfo(0, null, null, 0, null);
}
@Override
public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {}
@Override
public void disconnect(final DisconnectReason reason) {}
@Override
public SocketAddress getLocalAddress() {
return null;
}
@Override
public SocketAddress getRemoteAddress() {
return null;
}
};
}
}

@ -26,7 +26,7 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
public final class MainnetBlockHeaderValidator {
private static final BytesValue DAO_EXTRA_DATA =
public static final BytesValue DAO_EXTRA_DATA =
BytesValue.fromHexString("0x64616f2d686172642d666f726b");
private static final int MIN_GAS_LIMIT = 5000;
private static final long MAX_GAS_LIMIT = 0x7fffffffffffffffL;
@ -47,6 +47,10 @@ public final class MainnetBlockHeaderValidator {
.build();
}
public static boolean validateHeaderForDaoFork(final BlockHeader header) {
return header.getExtraData().equals(DAO_EXTRA_DATA);
}
static BlockHeaderValidator<Void> createOmmerValidator(
final DifficultyCalculator<Void> difficultyCalculator) {
return new BlockHeaderValidator.Builder<Void>()

@ -84,6 +84,10 @@ public class EthPeer {
this.onStatusesExchanged.set(onStatusesExchanged);
}
public boolean isDisconnected() {
return connection.isDisconnected();
}
public long addChainEstimatedHeightListener(final EstimatedHeightListener listener) {
return chainHeadState.addEstimatedHeightListener(listener);
}

@ -51,8 +51,8 @@ public class EthScheduler {
protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler;
protected final ExecutorService txWorkerExecutor;
private final ExecutorService servicesExecutor;
private final ExecutorService computationExecutor;
protected final ExecutorService servicesExecutor;
protected final ExecutorService computationExecutor;
private final Collection<CompletableFuture<?>> serviceFutures = new ConcurrentLinkedDeque<>();

@ -26,7 +26,6 @@ import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@ -73,13 +72,17 @@ public abstract class AbstractGetHeadersFromPeerTask
}
final BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(message);
final Iterator<BlockHeader> headers = headersMessage.getHeaders(protocolSchedule);
if (!headers.hasNext()) {
final List<BlockHeader> headers = headersMessage.getHeaders(protocolSchedule);
if (headers.isEmpty()) {
// Message contains no data - nothing to do
return Optional.empty();
}
if (headers.size() > count) {
// Too many headers - this isn't our response
return Optional.empty();
}
final BlockHeader firstHeader = headers.next();
final BlockHeader firstHeader = headers.get(0);
if (!matchesFirstHeader(firstHeader)) {
// This isn't our message - nothing to do
return Optional.empty();
@ -90,17 +93,14 @@ public abstract class AbstractGetHeadersFromPeerTask
long prevNumber = firstHeader.getNumber();
final int expectedDelta = reverse ? -(skip + 1) : (skip + 1);
while (headers.hasNext()) {
final BlockHeader header = headers.next();
for (int i = 1; i < headers.size(); i++) {
final BlockHeader header = headers.get(i);
if (header.getNumber() != prevNumber + expectedDelta) {
// Skip doesn't match, this isn't our data
return Optional.empty();
}
prevNumber = header.getNumber();
headersList.add(header);
if (headersList.size() == count) {
break;
}
}
LOG.debug("Received {} of {} headers requested from peer.", headersList.size(), count);

@ -24,12 +24,15 @@ import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
public abstract class AbstractPeerRequestTask<R> extends AbstractPeerTask<R> {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);
private Duration timeout = DEFAULT_TIMEOUT;
private final int requestCode;
private volatile ResponseStream responseStream;
@ -41,6 +44,11 @@ public abstract class AbstractPeerRequestTask<R> extends AbstractPeerTask<R> {
this.requestCode = requestCode;
}
public AbstractPeerRequestTask<R> setTimeout(final Duration timeout) {
this.timeout = timeout;
return this;
}
@Override
protected final void executeTaskWithPeer(final EthPeer peer) throws PeerNotConnected {
final CompletableFuture<R> promise = new CompletableFuture<>();
@ -63,7 +71,7 @@ public abstract class AbstractPeerRequestTask<R> extends AbstractPeerTask<R> {
}
});
ethContext.getScheduler().failAfterTimeout(promise);
ethContext.getScheduler().failAfterTimeout(promise, timeout);
}
private void handleMessage(

@ -22,7 +22,8 @@ import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Iterator;
import java.util.Arrays;
import java.util.List;
public final class BlockHeadersMessage extends AbstractMessageData {
@ -38,6 +39,10 @@ public final class BlockHeadersMessage extends AbstractMessageData {
return new BlockHeadersMessage(message.getData());
}
public static BlockHeadersMessage create(final BlockHeader... headers) {
return create(Arrays.asList(headers));
}
public static BlockHeadersMessage create(final Iterable<BlockHeader> headers) {
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
tmp.startList();
@ -57,11 +62,10 @@ public final class BlockHeadersMessage extends AbstractMessageData {
return EthPV62.BLOCK_HEADERS;
}
public <C> Iterator<BlockHeader> getHeaders(final ProtocolSchedule<C> protocolSchedule) {
public <C> List<BlockHeader> getHeaders(final ProtocolSchedule<C> protocolSchedule) {
final BlockHashFunction blockHashFunction =
ScheduleBasedBlockHashFunction.create(protocolSchedule);
return new BytesValueRLPInput(data, false)
.readList(rlp -> BlockHeader.readFrom(rlp, blockHashFunction))
.iterator();
.readList(rlp -> BlockHeader.readFrom(rlp, blockHashFunction));
}
}

@ -0,0 +1,132 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.peervalidation;
import static com.google.common.base.Preconditions.checkArgument;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.GetHeadersFromPeerByNumberTask;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DaoForkPeerValidator implements PeerValidator {
private static final Logger LOG = LogManager.getLogger();
private static long DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER = 10L;
private final EthContext ethContext;
private final ProtocolSchedule<?> protocolSchedule;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final long daoBlockNumber;
// Wait for peer's chainhead to advance some distance beyond daoBlockNumber before validating
private final long chainHeightEstimationBuffer;
public DaoForkPeerValidator(
final EthContext ethContext,
final ProtocolSchedule<?> protocolSchedule,
final LabelledMetric<OperationTimer> ethTasksTimer,
final long daoBlockNumber,
final long chainHeightEstimationBuffer) {
checkArgument(chainHeightEstimationBuffer >= 0);
this.ethContext = ethContext;
this.protocolSchedule = protocolSchedule;
this.ethTasksTimer = ethTasksTimer;
this.daoBlockNumber = daoBlockNumber;
this.chainHeightEstimationBuffer = chainHeightEstimationBuffer;
}
public DaoForkPeerValidator(
final EthContext ethContext,
final ProtocolSchedule<?> protocolSchedule,
final LabelledMetric<OperationTimer> ethTasksTimer,
final long daoBlockNumber) {
this(
ethContext,
protocolSchedule,
ethTasksTimer,
daoBlockNumber,
DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER);
}
@Override
public CompletableFuture<Boolean> validatePeer(final EthPeer ethPeer) {
AbstractPeerTask<List<BlockHeader>> getHeaderTask =
GetHeadersFromPeerByNumberTask.forSingleNumber(
protocolSchedule, ethContext, daoBlockNumber, ethTasksTimer)
.setTimeout(Duration.ofSeconds(20))
.assignPeer(ethPeer);
return getHeaderTask
.run()
.handle(
(res, err) -> {
if (err != null) {
// Mark peer as invalid on error
LOG.debug(
"Peer {} is invalid because DAO block ({}) is unavailable: {}",
ethPeer,
daoBlockNumber,
err.toString());
return false;
}
List<BlockHeader> headers = res.getResult();
if (headers.size() == 0) {
// If no headers are returned, fail
LOG.debug(
"Peer {} is invalid because DAO block ({}) is unavailable.",
ethPeer,
daoBlockNumber);
return false;
}
BlockHeader header = headers.get(0);
boolean validDaoBlock = MainnetBlockHeaderValidator.validateHeaderForDaoFork(header);
if (!validDaoBlock) {
LOG.debug(
"Peer {} is invalid because DAO block ({}) is invalid.",
ethPeer,
daoBlockNumber);
}
return validDaoBlock;
});
}
@Override
public boolean canBeValidated(final EthPeer ethPeer) {
return ethPeer.chainState().getEstimatedHeight()
>= (daoBlockNumber + chainHeightEstimationBuffer);
}
@Override
public Duration nextValidationCheckTimeout(final EthPeer ethPeer) {
if (!ethPeer.chainState().hasEstimatedHeight()) {
return Duration.ofSeconds(30);
}
long distanceToDaoBlock = daoBlockNumber - ethPeer.chainState().getEstimatedHeight();
if (distanceToDaoBlock < 100_000L) {
return Duration.ofMinutes(1);
}
// If the peer is trailing behind, give it some time to catch up before trying again.
return Duration.ofMinutes(10);
}
}

@ -0,0 +1,55 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.peervalidation;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
public interface PeerValidator {
/**
* Whether the peer can currently be validated.
*
* @param ethPeer The peer that need validation.
* @return {@code} True if peer can be validated now.
*/
boolean canBeValidated(final EthPeer ethPeer);
/**
* If the peer cannot currently be validated, returns a timeout indicating how long to wait.
* before trying to validate the peer again.
*
* @param ethPeer The peer to be validated.
* @return A duration representing how long to wait before trying to validate this peer again.
*/
Duration nextValidationCheckTimeout(final EthPeer ethPeer);
/**
* Validates the given peer.
*
* @param ethPeer The peer to be validated.
* @return True if the peer is valid, false otherwise.
*/
CompletableFuture<Boolean> validatePeer(final EthPeer ethPeer);
/**
* @param ethPeer The peer to be disconnected.
* @return The reason for disconnecting.
*/
default DisconnectReason getDisconnectReason(final EthPeer ethPeer) {
return DisconnectReason.SUBPROTOCOL_TRIGGERED;
}
}

@ -0,0 +1,67 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.peervalidation;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import java.time.Duration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class PeerValidatorRunner {
private static final Logger LOG = LogManager.getLogger();
protected final EthContext ethContext;
private final PeerValidator peerValidator;
PeerValidatorRunner(final EthContext ethContext, final PeerValidator peerValidator) {
this.ethContext = ethContext;
this.peerValidator = peerValidator;
ethContext.getEthPeers().subscribeConnect(this::checkPeer);
}
public static void runValidator(final EthContext ethContext, final PeerValidator peerValidator) {
new PeerValidatorRunner(ethContext, peerValidator);
}
public void checkPeer(final EthPeer ethPeer) {
if (peerValidator.canBeValidated(ethPeer)) {
peerValidator
.validatePeer(ethPeer)
.whenComplete(
(validated, err) -> {
if (err != null || !validated) {
// Disconnect invalid peer
disconnectPeer(ethPeer);
}
});
} else if (!ethPeer.isDisconnected()) {
scheduleNextCheck(ethPeer);
}
}
protected void disconnectPeer(final EthPeer ethPeer) {
LOG.debug(
"Disconnecting from peer {} marked invalid by {}",
ethPeer,
peerValidator.getClass().getSimpleName());
ethPeer.disconnect(peerValidator.getDisconnectReason(ethPeer));
}
protected void scheduleNextCheck(final EthPeer ethPeer) {
Duration timeout = peerValidator.nextValidationCheckTimeout(ethPeer);
ethContext.getScheduler().scheduleFutureTask(() -> checkPeer(ethPeer), timeout);
}
}

@ -56,14 +56,12 @@ public class DefaultSynchronizer<C> implements Synchronizer {
final EthContext ethContext,
final SyncState syncState,
final Path dataDirectory,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.syncConfig = syncConfig;
this.ethContext = ethContext;
this.syncState = syncState;
final LabelledMetric<OperationTimer> ethTasksTimer =
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName");
this.blockPropagationManager =
new BlockPropagationManager<>(
syncConfig,
@ -95,6 +93,28 @@ public class DefaultSynchronizer<C> implements Synchronizer {
syncState);
}
public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final WorldStateStorage worldStateStorage,
final EthContext ethContext,
final SyncState syncState,
final Path dataDirectory,
final MetricsSystem metricsSystem) {
this(
syncConfig,
protocolSchedule,
protocolContext,
worldStateStorage,
ethContext,
syncState,
dataDirectory,
metricsSystem,
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"));
}
@Override
public void start() {
if (started.compareAndSet(false, true)) {

@ -13,6 +13,8 @@
package tech.pegasys.pantheon.ethereum.eth.manager;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -22,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class DeterministicEthScheduler extends EthScheduler {
private final TimeoutPolicy timeoutPolicy;
private final List<MockExecutorService> executors;
public DeterministicEthScheduler() {
this(TimeoutPolicy.NEVER);
@ -34,7 +37,24 @@ public class DeterministicEthScheduler extends EthScheduler {
new MockExecutorService(),
new MockExecutorService(),
new MockExecutorService());
this.timeoutPolicy = timeoutPolicy;
this.executors =
Arrays.asList(
(MockExecutorService) this.syncWorkerExecutor,
(MockExecutorService) this.scheduler,
(MockExecutorService) this.txWorkerExecutor,
(MockExecutorService) this.servicesExecutor,
(MockExecutorService) this.computationExecutor);
}
// Test utility for running pending futures
public void runPendingFutures() {
executors.forEach(MockExecutorService::runPendingFutures);
}
public void disableAutoRun() {
executors.forEach(e -> e.setAutoRun(false));
}
MockExecutorService mockSyncWorkerExecutor() {
@ -62,6 +82,7 @@ public class DeterministicEthScheduler extends EthScheduler {
@FunctionalInterface
public interface TimeoutPolicy {
TimeoutPolicy NEVER = () -> false;
TimeoutPolicy ALWAYS = () -> true;
boolean shouldTimeout();

@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;
import static com.google.common.base.Preconditions.checkArgument;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;
@ -71,6 +72,30 @@ public class EthProtocolManagerTestUtil {
return create(blockchain, worldStateArchive, timeoutPolicy);
}
// Utility to prevent scheduler from automatically running submitted tasks
public static void disableEthSchedulerAutoRun(final EthProtocolManager ethProtocolManager) {
EthScheduler scheduler = ethProtocolManager.ethContext().getScheduler();
checkArgument(
scheduler instanceof DeterministicEthScheduler,
"EthProtocolManager must be set up with "
+ DeterministicEthScheduler.class.getSimpleName()
+ " in order to disable auto run.");
((DeterministicEthScheduler) scheduler).disableAutoRun();
}
// Manually runs any pending tasks submitted to the EthScheduler
// Works with {@code disableEthSchedulerAutoRun} - tasks will only be pending if
// autoRun has been disabled.
public static void runPendingFutures(final EthProtocolManager ethProtocolManager) {
EthScheduler scheduler = ethProtocolManager.ethContext().getScheduler();
checkArgument(
scheduler instanceof DeterministicEthScheduler,
"EthProtocolManager must be set up with "
+ DeterministicEthScheduler.class.getSimpleName()
+ " in order to manually run pending futures.");
((DeterministicEthScheduler) scheduler).runPendingFutures();
}
public static void broadcastMessage(
final EthProtocolManager ethProtocolManager,
final RespondingEthPeer peer,

@ -84,8 +84,8 @@ public class EthSchedulerTest {
final CompletableFuture<Object> result =
ethScheduler.scheduleSyncWorkerTask(() -> new CompletableFuture<>());
assertThat(syncWorkerExecutor.getScheduledFutures().size()).isEqualTo(1);
final Future<?> future = syncWorkerExecutor.getScheduledFutures().get(0);
assertThat(syncWorkerExecutor.getFutures().size()).isEqualTo(1);
final Future<?> future = syncWorkerExecutor.getFutures().get(0);
verify(future, times(0)).cancel(anyBoolean());
result.cancel(true);
@ -136,8 +136,8 @@ public class EthSchedulerTest {
final CompletableFuture<Object> result =
ethScheduler.scheduleFutureTask(() -> new CompletableFuture<>(), Duration.ofMillis(100));
assertThat(scheduledExecutor.getScheduledFutures().size()).isEqualTo(1);
final Future<?> future = scheduledExecutor.getScheduledFutures().get(0);
assertThat(scheduledExecutor.getFutures().size()).isEqualTo(1);
final Future<?> future = scheduledExecutor.getFutures().get(0);
verify(future, times(0)).cancel(anyBoolean());
result.cancel(true);

@ -25,14 +25,25 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
class MockExecutorService implements ExecutorService {
private final List<Future<?>> scheduledFutures = new ArrayList<>();
private boolean autoRun = true;
private final List<ExecutorTask<?>> tasks = new ArrayList<>();
// Test utility for inspecting scheduled futures
public List<Future<?>> getScheduledFutures() {
return scheduledFutures;
// Test utility for inspecting executor's futures
public List<Future<?>> getFutures() {
return tasks.stream().map(ExecutorTask::getFuture).collect(Collectors.toList());
}
public void setAutoRun(final boolean shouldAutoRunTasks) {
this.autoRun = shouldAutoRunTasks;
}
public void runPendingFutures() {
ArrayList<ExecutorTask<?>> currentTasks = new ArrayList<>(tasks);
currentTasks.forEach(ExecutorTask::run);
}
@Override
@ -61,44 +72,31 @@ class MockExecutorService implements ExecutorService {
@Override
public <T> Future<T> submit(final Callable<T> task) {
CompletableFuture<T> future = new CompletableFuture<>();
try {
final T result = task.call();
future.complete(result);
} catch (final Exception e) {
future.completeExceptionally(e);
ExecutorTask<T> execTask = new ExecutorTask<>(task::call);
tasks.add(execTask);
if (autoRun) {
execTask.run();
}
future = spy(future);
scheduledFutures.add(future);
return future;
return execTask.getFuture();
}
@Override
public <T> Future<T> submit(final Runnable task, final T result) {
CompletableFuture<T> future = new CompletableFuture<>();
try {
return submit(
() -> {
task.run();
future.complete(result);
} catch (final Exception e) {
future.completeExceptionally(e);
}
future = spy(future);
scheduledFutures.add(future);
return future;
return result;
});
}
@Override
public Future<?> submit(final Runnable task) {
CompletableFuture<?> future = new CompletableFuture<>();
try {
return submit(
() -> {
task.run();
future.complete(null);
} catch (final Exception e) {
future.completeExceptionally(e);
}
future = spy(future);
scheduledFutures.add(future);
return future;
return null;
});
}
@Override
@ -129,4 +127,37 @@ class MockExecutorService implements ExecutorService {
@Override
public void execute(final Runnable command) {}
private static class ExecutorTask<T> {
private final CompletableFuture<T> future;
private final Callable<T> taskRunner;
private boolean isPending = true;
private ExecutorTask(final Callable<T> taskRunner) {
this.future = spy(new CompletableFuture<>());
this.taskRunner = taskRunner;
}
public void run() {
if (!isPending) {
return;
}
isPending = false;
try {
T result = taskRunner.call();
future.complete(result);
} catch (final Exception e) {
future.completeExceptionally(e);
}
}
public CompletableFuture<T> getFuture() {
return future;
}
public boolean isPending() {
return isPending;
}
}
}

@ -89,6 +89,7 @@ public class MockPeerConnection implements PeerConnection {
throw new UnsupportedOperationException();
}
@Override
public boolean isDisconnected() {
return disconnected;
}

@ -46,6 +46,7 @@ import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import com.google.common.collect.Lists;
@ -210,6 +211,18 @@ public class RespondingEthPeer {
return !outgoingMessages.isEmpty();
}
public static Responder targetedResponder(
final BiFunction<Capability, MessageData, Boolean> requestFilter,
final BiFunction<Capability, MessageData, MessageData> responseGenerator) {
return (cap, msg) -> {
if (requestFilter.apply(cap, msg)) {
return Optional.of(responseGenerator.apply(cap, msg));
} else {
return Optional.empty();
}
};
}
public static Responder blockchainResponder(final Blockchain blockchain) {
return blockchainResponder(blockchain, createInMemoryWorldStateArchive());
}

@ -27,7 +27,6 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import com.google.common.io.Resources;
@ -57,13 +56,13 @@ public final class BlockHeadersMessageTest {
final MessageData initialMessage = BlockHeadersMessage.create(headers);
final MessageData raw = new RawMessage(EthPV62.BLOCK_HEADERS, initialMessage.getData());
final BlockHeadersMessage message = BlockHeadersMessage.readFrom(raw);
final Iterator<BlockHeader> readHeaders =
final List<BlockHeader> readHeaders =
message.getHeaders(
FixedDifficultyProtocolSchedule.create(
GenesisConfigFile.development().getConfigOptions(), PrivacyParameters.noPrivacy()));
for (int i = 0; i < 50; ++i) {
Assertions.assertThat(readHeaders.next()).isEqualTo(headers.get(i));
Assertions.assertThat(readHeaders.get(i)).isEqualTo(headers.get(i));
}
}
}

@ -0,0 +1,236 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.peervalidation;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions;
import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Test;
public class DaoForkPeerValidatorTest {
@Test
public void validatePeer_responsivePeerOnRightSideOfFork() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
BlockDataGenerator gen = new BlockDataGenerator(1);
long daoBlockNumber = 500;
Block daoBlock =
gen.block(
BlockOptions.create()
.setBlockNumber(daoBlockNumber)
.setExtraData(MainnetBlockHeaderValidator.DAO_EXTRA_DATA));
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
daoBlockNumber,
0);
RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber);
CompletableFuture<Boolean> result = validator.validatePeer(peer.getEthPeer());
assertThat(result).isNotDone();
// Send response for dao block
AtomicBoolean daoBlockRequested = respondToDaoBlockRequest(peer, daoBlock);
assertThat(daoBlockRequested).isTrue();
assertThat(result).isDone();
assertThat(result).isCompletedWithValue(true);
}
@Test
public void validatePeer_responsivePeerOnWrongSideOfFork() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
BlockDataGenerator gen = new BlockDataGenerator(1);
long daoBlockNumber = 500;
Block daoBlock =
gen.block(
BlockOptions.create().setBlockNumber(daoBlockNumber).setExtraData(BytesValue.EMPTY));
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
daoBlockNumber,
0);
RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber);
CompletableFuture<Boolean> result = validator.validatePeer(peer.getEthPeer());
assertThat(result).isNotDone();
// Send response for dao block
AtomicBoolean daoBlockRequested = respondToDaoBlockRequest(peer, daoBlock);
assertThat(daoBlockRequested).isTrue();
assertThat(result).isDone();
assertThat(result).isCompletedWithValue(false);
}
@Test
public void validatePeer_unresponsivePeer() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(TimeoutPolicy.ALWAYS);
long daoBlockNumber = 500;
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
daoBlockNumber,
0);
RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber);
CompletableFuture<Boolean> result = validator.validatePeer(peer.getEthPeer());
// Request should timeout immediately
assertThat(result).isDone();
assertThat(result).isCompletedWithValue(false);
}
@Test
public void validatePeer_requestBlockFromPeerBeingTested() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
BlockDataGenerator gen = new BlockDataGenerator(1);
long daoBlockNumber = 500;
Block daoBlock =
gen.block(
BlockOptions.create()
.setBlockNumber(daoBlockNumber)
.setExtraData(MainnetBlockHeaderValidator.DAO_EXTRA_DATA));
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
daoBlockNumber,
0);
int peerCount = 1000;
List<RespondingEthPeer> otherPeers =
Stream.generate(
() -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber))
.limit(peerCount)
.collect(Collectors.toList());
RespondingEthPeer targetPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber);
CompletableFuture<Boolean> result = validator.validatePeer(targetPeer.getEthPeer());
assertThat(result).isNotDone();
// Other peers should not receive request for dao block
for (RespondingEthPeer otherPeer : otherPeers) {
AtomicBoolean daoBlockRequestedForOtherPeer = respondToDaoBlockRequest(otherPeer, daoBlock);
assertThat(daoBlockRequestedForOtherPeer).isFalse();
}
// Target peer should receive request for dao block
final AtomicBoolean daoBlockRequested = respondToDaoBlockRequest(targetPeer, daoBlock);
assertThat(daoBlockRequested).isTrue();
}
@Test
public void canBeValidated() {
BlockDataGenerator gen = new BlockDataGenerator(1);
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(TimeoutPolicy.ALWAYS);
long daoBlockNumber = 500;
long buffer = 10;
PeerValidator validator =
new DaoForkPeerValidator(
ethProtocolManager.ethContext(),
MainnetProtocolSchedule.create(),
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
daoBlockNumber,
buffer);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0).getEthPeer();
peer.chainState().update(gen.hash(), daoBlockNumber - 10);
assertThat(validator.canBeValidated(peer)).isFalse();
peer.chainState().update(gen.hash(), daoBlockNumber);
assertThat(validator.canBeValidated(peer)).isFalse();
peer.chainState().update(gen.hash(), daoBlockNumber + buffer - 1);
assertThat(validator.canBeValidated(peer)).isFalse();
peer.chainState().update(gen.hash(), daoBlockNumber + buffer);
assertThat(validator.canBeValidated(peer)).isTrue();
peer.chainState().update(gen.hash(), daoBlockNumber + buffer + 10);
assertThat(validator.canBeValidated(peer)).isTrue();
}
private AtomicBoolean respondToDaoBlockRequest(
final RespondingEthPeer peer, final Block daoBlock) {
AtomicBoolean daoBlockRequested = new AtomicBoolean(false);
Responder responder =
RespondingEthPeer.targetedResponder(
(cap, msg) -> {
if (msg.getCode() != EthPV62.GET_BLOCK_HEADERS) {
return false;
}
GetBlockHeadersMessage headersRequest = GetBlockHeadersMessage.readFrom(msg);
boolean isDaoBlockRequest =
headersRequest.blockNumber().isPresent()
&& headersRequest.blockNumber().getAsLong()
== daoBlock.getHeader().getNumber();
if (isDaoBlockRequest) {
daoBlockRequested.set(true);
}
return isDaoBlockRequest;
},
(cap, msg) -> BlockHeadersMessage.create(daoBlock.getHeader()));
// Respond
peer.respond(responder);
return daoBlockRequested;
}
}

@ -0,0 +1,122 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.peervalidation;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.junit.Test;
public class PeerValidatorRunnerTest {
@Test
public void checkPeer_schedulesFutureCheckWhenPeerNotReady() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer();
PeerValidator validator = mock(PeerValidator.class);
when(validator.canBeValidated(eq(peer))).thenReturn(false);
when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30));
PeerValidatorRunner runner =
spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator));
runner.checkPeer(peer);
verify(runner, times(1)).checkPeer(eq(peer));
verify(validator, never()).validatePeer(eq(peer));
verify(runner, never()).disconnectPeer(eq(peer));
verify(runner, times(1)).scheduleNextCheck(eq(peer));
// Run pending futures to trigger the next check
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
verify(runner, times(2)).checkPeer(eq(peer));
verify(validator, never()).validatePeer(eq(peer));
verify(runner, never()).disconnectPeer(eq(peer));
verify(runner, times(2)).scheduleNextCheck(eq(peer));
}
@Test
public void checkPeer_doesNotScheduleFutureCheckWhenPeerNotReadyAndDisconnected() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
PeerValidator validator = mock(PeerValidator.class);
when(validator.canBeValidated(eq(peer))).thenReturn(false);
when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30));
PeerValidatorRunner runner =
spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator));
runner.checkPeer(peer);
verify(runner, times(1)).checkPeer(eq(peer));
verify(validator, never()).validatePeer(eq(peer));
verify(runner, never()).disconnectPeer(eq(peer));
verify(runner, times(0)).scheduleNextCheck(eq(peer));
}
@Test
public void checkPeer_handlesInvalidPeer() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer();
PeerValidator validator = mock(PeerValidator.class);
when(validator.canBeValidated(eq(peer))).thenReturn(true);
when(validator.validatePeer(eq(peer))).thenReturn(CompletableFuture.completedFuture(false));
when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30));
PeerValidatorRunner runner =
spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator));
runner.checkPeer(peer);
verify(validator, times(1)).validatePeer(eq(peer));
verify(runner, times(1)).disconnectPeer(eq(peer));
verify(runner, never()).scheduleNextCheck(eq(peer));
}
@Test
public void checkPeer_handlesValidPeer() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer();
PeerValidator validator = mock(PeerValidator.class);
when(validator.canBeValidated(eq(peer))).thenReturn(true);
when(validator.validatePeer(eq(peer))).thenReturn(CompletableFuture.completedFuture(true));
when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30));
PeerValidatorRunner runner =
spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator));
runner.checkPeer(peer);
verify(validator, times(1)).validatePeer(eq(peer));
verify(runner, never()).disconnectPeer(eq(peer));
verify(runner, never()).scheduleNextCheck(eq(peer));
}
}

@ -33,7 +33,6 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import com.google.common.collect.Streams;
import org.junit.Test;
public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest<List<BlockHeader>> {
@ -122,7 +121,7 @@ public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest<List
BlockHeadersMessage.readFrom(fullResponse.get());
// Filter for a subset of headers
final List<BlockHeader> headerSubset =
Streams.stream(headersMessage.getHeaders(protocolSchedule))
headersMessage.getHeaders(protocolSchedule).stream()
.filter(h -> h.getNumber() >= referenceHeader.getNumber() - 1L)
.collect(Collectors.toList());
return Optional.of(BlockHeadersMessage.create(headerSubset));

@ -58,9 +58,9 @@ public class AdminJsonRpcHttpServiceTest extends JsonRpcHttpServiceTest {
final InetSocketAddress addr60302 = new InetSocketAddress("localhost", 60302);
final InetSocketAddress addr60303 = new InetSocketAddress("localhost", 60303);
peerList.add(new MockPeerConnection(info1, addr60301, addr30302));
peerList.add(new MockPeerConnection(info2, addr30301, addr60302));
peerList.add(new MockPeerConnection(info3, addr30301, addr60303));
peerList.add(MockPeerConnection.create(info1, addr60301, addr30302));
peerList.add(MockPeerConnection.create(info2, addr30301, addr60302));
peerList.add(MockPeerConnection.create(info3, addr30301, addr60303));
when(peerDiscoveryMock.getPeers()).thenReturn(peerList);

@ -12,67 +12,28 @@
*/
package tech.pegasys.pantheon.ethereum.jsonrpc;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Set;
public class MockPeerConnection implements PeerConnection {
public class MockPeerConnection {
PeerInfo peerInfo;
InetSocketAddress localAddress;
InetSocketAddress remoteAddress;
public MockPeerConnection(
public static PeerConnection create(
final PeerInfo peerInfo,
final InetSocketAddress localAddress,
final InetSocketAddress remoteAddress) {
this.peerInfo = peerInfo;
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
@Override
public void send(final Capability capability, final MessageData message) {
throw new UnsupportedOperationException();
}
@Override
public Set<Capability> getAgreedCapabilities() {
throw new UnsupportedOperationException();
}
@Override
public Capability capability(final String protocol) {
throw new UnsupportedOperationException();
}
@Override
public PeerInfo getPeer() {
return peerInfo;
}
@Override
public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {
throw new UnsupportedOperationException();
}
@Override
public void disconnect(final DisconnectReason reason) {
throw new UnsupportedOperationException();
}
@Override
public SocketAddress getLocalAddress() {
return localAddress;
}
PeerConnection peerConnection = mock(PeerConnection.class);
when(peerConnection.getPeer()).thenReturn(peerInfo);
when(peerConnection.getLocalAddress()).thenReturn(localAddress);
when(peerConnection.getRemoteAddress()).thenReturn(remoteAddress);
@Override
public SocketAddress getRemoteAddress() {
return remoteAddress;
return peerConnection;
}
}

@ -101,7 +101,7 @@ public class AdminPeersTest {
final PeerInfo peerInfo =
new PeerInfo(5, "0x0", Collections.emptyList(), 30303, BytesValue.EMPTY);
final PeerConnection p =
new MockPeerConnection(
MockPeerConnection.create(
peerInfo,
InetSocketAddress.createUnresolved("1.2.3.4", 9876),
InetSocketAddress.createUnresolved("4.3.2.1", 6789));

@ -218,6 +218,8 @@ public final class MockNetwork {
/** {@link Peer} that this connection originates from. */
private final Peer from;
private boolean disconnected = false;
/**
* Peer that this connection targets and that will receive {@link Message}s sent via {@link
* #send(Capability, MessageData)}.
@ -267,14 +269,21 @@ public final class MockNetwork {
@Override
public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {
disconnected = true;
network.disconnect(this, reason);
}
@Override
public void disconnect(final DisconnectReason reason) {
disconnected = true;
network.disconnect(this, reason);
}
@Override
public boolean isDisconnected() {
return disconnected;
}
@Override
public SocketAddress getLocalAddress() {
throw new UnsupportedOperationException();

@ -85,6 +85,9 @@ public interface PeerConnection {
*/
void disconnect(DisconnectReason reason);
/** @return True if the peer is disconnected */
boolean isDisconnected();
SocketAddress getLocalAddress();
SocketAddress getRemoteAddress();

@ -150,7 +150,8 @@ final class NettyPeerConnection implements PeerConnection {
}
}
private boolean isDisconnected() {
@Override
public boolean isDisconnected() {
return disconnected.get();
}

@ -13,8 +13,9 @@
package tech.pegasys.pantheon.ethereum.p2p.discovery;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryTestHelper.AgentBuilder;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.FindNeighborsPacketData;
@ -24,15 +25,12 @@ import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.NeighborsPacketData
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PacketType;
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.Test;
@ -287,36 +285,9 @@ public class PeerDiscoveryAgentTest {
}
private PeerConnection createAnonymousPeerConnection(final BytesValue id) {
return new PeerConnection() {
@Override
public void send(final Capability capability, final MessageData message)
throws PeerNotConnected {}
@Override
public Set<Capability> getAgreedCapabilities() {
return null;
}
@Override
public PeerInfo getPeer() {
return new PeerInfo(0, null, null, 0, id);
}
@Override
public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {}
@Override
public void disconnect(final DisconnectReason reason) {}
@Override
public SocketAddress getLocalAddress() {
return null;
}
@Override
public SocketAddress getRemoteAddress() {
return null;
}
};
PeerConnection conn = mock(PeerConnection.class);
PeerInfo peerInfo = new PeerInfo(0, null, null, 0, id);
when(conn.getPeer()).thenReturn(peerInfo);
return conn;
}
}

@ -26,7 +26,10 @@ import tech.pegasys.pantheon.ethereum.core.PrivacyParameters;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.core.TransactionPool;
import tech.pegasys.pantheon.ethereum.eth.EthProtocol;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.peervalidation.DaoForkPeerValidator;
import tech.pegasys.pantheon.ethereum.eth.peervalidation.PeerValidatorRunner;
import tech.pegasys.pantheon.ethereum.eth.sync.DefaultSynchronizer;
import tech.pegasys.pantheon.ethereum.eth.sync.SyncMode;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
@ -37,11 +40,15 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager;
import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration;
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Clock;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -116,6 +123,9 @@ public class MainnetPantheonController implements PantheonController<Void> {
metricsSystem);
final SyncState syncState =
new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
final LabelledMetric<OperationTimer> ethTasksTimer =
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName");
final Synchronizer synchronizer =
new DefaultSynchronizer<>(
syncConfig,
@ -125,7 +135,18 @@ public class MainnetPantheonController implements PantheonController<Void> {
ethProtocolManager.ethContext(),
syncState,
dataDirectory,
metricsSystem);
metricsSystem,
ethTasksTimer);
OptionalLong daoBlock = genesisConfig.getConfigOptions().getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
EthContext ethContext = ethProtocolManager.ethContext();
DaoForkPeerValidator daoForkPeerValidator =
new DaoForkPeerValidator(
ethContext, protocolSchedule, ethTasksTimer, daoBlock.getAsLong());
PeerValidatorRunner.runValidator(ethContext, daoForkPeerValidator);
}
final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(

Loading…
Cancel
Save