From 1b0a749ca6d37aa2bf0232345b037e984d631c0f Mon Sep 17 00:00:00 2001 From: mbaxter Date: Thu, 21 Feb 2019 23:39:58 -0500 Subject: [PATCH] [PAN-2312] Validate DAO block (#939) Adds a PeerValidator that, when the Dao fork milestone is in use, checks that the Dao block is present on each peer when they connect and disconnects them if they are on the wrong chain. Also: * Make GetHeadersFromPeer task stricter in validating response matches. * Update BlockHeadersMessage to return a list of headers * Add more controls to DeterministicEthScheduler test util Signed-off-by: Adrian Sutton --- .../ibft/support/StubbedPeerConnection.java | 48 +--- .../consensus/ibft/support/ValidatorPeer.java | 2 +- .../ibft/EthSynchronizerUpdaterTest.java | 46 +--- .../mainnet/MainnetBlockHeaderValidator.java | 6 +- .../ethereum/eth/manager/EthPeer.java | 4 + .../ethereum/eth/manager/EthScheduler.java | 4 +- .../task/AbstractGetHeadersFromPeerTask.java | 18 +- .../manager/task/AbstractPeerRequestTask.java | 10 +- .../eth/messages/BlockHeadersMessage.java | 12 +- .../peervalidation/DaoForkPeerValidator.java | 132 ++++++++++ .../eth/peervalidation/PeerValidator.java | 55 ++++ .../peervalidation/PeerValidatorRunner.java | 67 +++++ .../eth/sync/DefaultSynchronizer.java | 28 ++- .../manager/DeterministicEthScheduler.java | 21 ++ .../manager/EthProtocolManagerTestUtil.java | 25 ++ .../eth/manager/EthSchedulerTest.java | 8 +- .../eth/manager/MockExecutorService.java | 97 ++++--- .../eth/manager/MockPeerConnection.java | 1 + .../eth/manager/RespondingEthPeer.java | 13 + .../eth/messages/BlockHeadersMessageTest.java | 5 +- .../DaoForkPeerValidatorTest.java | 236 ++++++++++++++++++ .../PeerValidatorRunnerTest.java | 122 +++++++++ .../tasks/DownloadHeaderSequenceTaskTest.java | 3 +- .../jsonrpc/AdminJsonRpcHttpServiceTest.java | 6 +- .../ethereum/jsonrpc/MockPeerConnection.java | 59 +---- .../internal/methods/AdminPeersTest.java | 2 +- .../ethereum/p2p/testing/MockNetwork.java | 9 + .../ethereum/p2p/api/PeerConnection.java | 3 + .../p2p/netty/NettyPeerConnection.java | 3 +- .../p2p/discovery/PeerDiscoveryAgentTest.java | 41 +-- .../controller/MainnetPantheonController.java | 23 +- 31 files changed, 872 insertions(+), 237 deletions(-) create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidator.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidator.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunner.java create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubbedPeerConnection.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubbedPeerConnection.java index 46f0c95d39..e5dfe381c4 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubbedPeerConnection.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubbedPeerConnection.java @@ -13,51 +13,19 @@ package tech.pegasys.pantheon.consensus.ibft.support; import static java.util.Collections.emptyList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; -import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; -import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.util.bytes.BytesValue; -import java.net.SocketAddress; -import java.util.Set; +public class StubbedPeerConnection { -public class StubbedPeerConnection implements PeerConnection { - private final BytesValue nodeId; - - public StubbedPeerConnection(final BytesValue nodeId) { - this.nodeId = nodeId; - } - - @Override - public void send(final Capability capability, final MessageData message) - throws PeerNotConnected {} - - @Override - public Set getAgreedCapabilities() { - return null; - } - - @Override - public PeerInfo getPeer() { - return new PeerInfo(0, "IbftIntTestPeer", emptyList(), 0, nodeId); - } - - @Override - public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {} - - @Override - public void disconnect(final DisconnectReason reason) {} - - @Override - public SocketAddress getLocalAddress() { - return null; - } - - @Override - public SocketAddress getRemoteAddress() { - return null; + public static PeerConnection create(final BytesValue nodeId) { + PeerConnection peerConnection = mock(PeerConnection.class); + PeerInfo peerInfo = new PeerInfo(0, "IbftIntTestPeer", emptyList(), 0, nodeId); + when(peerConnection.getPeer()).thenReturn(peerInfo); + return peerConnection; } } diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/ValidatorPeer.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/ValidatorPeer.java index b178f20f72..4d77d8f946 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/ValidatorPeer.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/ValidatorPeer.java @@ -65,7 +65,7 @@ public class ValidatorPeer { this.nodeAddress = nodeParams.getAddress(); this.messageFactory = messageFactory; final BytesValue nodeId = nodeKeys.getPublicKey().getEncodedBytes(); - this.peerConnection = new StubbedPeerConnection(nodeId); + this.peerConnection = StubbedPeerConnection.create(nodeId); this.localEventMultiplexer = localEventMultiplexer; } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/EthSynchronizerUpdaterTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/EthSynchronizerUpdaterTest.java index 730a17e4b3..74e6a719fb 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/EthSynchronizerUpdaterTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/EthSynchronizerUpdaterTest.java @@ -14,6 +14,7 @@ package tech.pegasys.pantheon.consensus.ibft; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; @@ -22,14 +23,7 @@ import static org.mockito.Mockito.when; import tech.pegasys.pantheon.ethereum.eth.manager.ChainState; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; -import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; -import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; -import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; -import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; - -import java.net.SocketAddress; -import java.util.Set; import org.junit.Test; import org.junit.runner.RunWith; @@ -50,7 +44,7 @@ public class EthSynchronizerUpdaterTest { final EthSynchronizerUpdater updater = new EthSynchronizerUpdater(ethPeers); - updater.updatePeerChainState(1, createAnonymousPeerConnection()); + updater.updatePeerChainState(1, mock(PeerConnection.class)); verifyZeroInteractions(ethPeer); } @@ -63,41 +57,7 @@ public class EthSynchronizerUpdaterTest { final EthSynchronizerUpdater updater = new EthSynchronizerUpdater(ethPeers); final long suppliedChainHeight = 6L; - updater.updatePeerChainState(suppliedChainHeight, createAnonymousPeerConnection()); + updater.updatePeerChainState(suppliedChainHeight, mock(PeerConnection.class)); verify(chainState, times(1)).updateHeightEstimate(eq(suppliedChainHeight)); } - - private PeerConnection createAnonymousPeerConnection() { - return new PeerConnection() { - @Override - public void send(final Capability capability, final MessageData message) - throws PeerNotConnected {} - - @Override - public Set getAgreedCapabilities() { - return null; - } - - @Override - public PeerInfo getPeer() { - return new PeerInfo(0, null, null, 0, null); - } - - @Override - public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {} - - @Override - public void disconnect(final DisconnectReason reason) {} - - @Override - public SocketAddress getLocalAddress() { - return null; - } - - @Override - public SocketAddress getRemoteAddress() { - return null; - } - }; - } } diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetBlockHeaderValidator.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetBlockHeaderValidator.java index dee87d5b29..0b4cbb33ae 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetBlockHeaderValidator.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetBlockHeaderValidator.java @@ -26,7 +26,7 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; public final class MainnetBlockHeaderValidator { - private static final BytesValue DAO_EXTRA_DATA = + public static final BytesValue DAO_EXTRA_DATA = BytesValue.fromHexString("0x64616f2d686172642d666f726b"); private static final int MIN_GAS_LIMIT = 5000; private static final long MAX_GAS_LIMIT = 0x7fffffffffffffffL; @@ -47,6 +47,10 @@ public final class MainnetBlockHeaderValidator { .build(); } + public static boolean validateHeaderForDaoFork(final BlockHeader header) { + return header.getExtraData().equals(DAO_EXTRA_DATA); + } + static BlockHeaderValidator createOmmerValidator( final DifficultyCalculator difficultyCalculator) { return new BlockHeaderValidator.Builder() diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java index a21306e4e6..2d9cdd6968 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java @@ -84,6 +84,10 @@ public class EthPeer { this.onStatusesExchanged.set(onStatusesExchanged); } + public boolean isDisconnected() { + return connection.isDisconnected(); + } + public long addChainEstimatedHeightListener(final EstimatedHeightListener listener) { return chainHeadState.addEstimatedHeightListener(listener); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java index 3521582f8d..ed9f337e13 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java @@ -51,8 +51,8 @@ public class EthScheduler { protected final ExecutorService syncWorkerExecutor; protected final ScheduledExecutorService scheduler; protected final ExecutorService txWorkerExecutor; - private final ExecutorService servicesExecutor; - private final ExecutorService computationExecutor; + protected final ExecutorService servicesExecutor; + protected final ExecutorService computationExecutor; private final Collection> serviceFutures = new ConcurrentLinkedDeque<>(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java index c3451ef102..f99c60fa4a 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java @@ -26,7 +26,6 @@ import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -73,13 +72,17 @@ public abstract class AbstractGetHeadersFromPeerTask } final BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(message); - final Iterator headers = headersMessage.getHeaders(protocolSchedule); - if (!headers.hasNext()) { + final List headers = headersMessage.getHeaders(protocolSchedule); + if (headers.isEmpty()) { // Message contains no data - nothing to do return Optional.empty(); } + if (headers.size() > count) { + // Too many headers - this isn't our response + return Optional.empty(); + } - final BlockHeader firstHeader = headers.next(); + final BlockHeader firstHeader = headers.get(0); if (!matchesFirstHeader(firstHeader)) { // This isn't our message - nothing to do return Optional.empty(); @@ -90,17 +93,14 @@ public abstract class AbstractGetHeadersFromPeerTask long prevNumber = firstHeader.getNumber(); final int expectedDelta = reverse ? -(skip + 1) : (skip + 1); - while (headers.hasNext()) { - final BlockHeader header = headers.next(); + for (int i = 1; i < headers.size(); i++) { + final BlockHeader header = headers.get(i); if (header.getNumber() != prevNumber + expectedDelta) { // Skip doesn't match, this isn't our data return Optional.empty(); } prevNumber = header.getNumber(); headersList.add(header); - if (headersList.size() == count) { - break; - } } LOG.debug("Received {} of {} headers requested from peer.", headersList.size(), count); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerRequestTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerRequestTask.java index f725ad3d40..bbe6767c5b 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerRequestTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerRequestTask.java @@ -24,12 +24,15 @@ import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.util.ExceptionUtils; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; public abstract class AbstractPeerRequestTask extends AbstractPeerTask { + private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5); + private Duration timeout = DEFAULT_TIMEOUT; private final int requestCode; private volatile ResponseStream responseStream; @@ -41,6 +44,11 @@ public abstract class AbstractPeerRequestTask extends AbstractPeerTask { this.requestCode = requestCode; } + public AbstractPeerRequestTask setTimeout(final Duration timeout) { + this.timeout = timeout; + return this; + } + @Override protected final void executeTaskWithPeer(final EthPeer peer) throws PeerNotConnected { final CompletableFuture promise = new CompletableFuture<>(); @@ -63,7 +71,7 @@ public abstract class AbstractPeerRequestTask extends AbstractPeerTask { } }); - ethContext.getScheduler().failAfterTimeout(promise); + ethContext.getScheduler().failAfterTimeout(promise, timeout); } private void handleMessage( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java index 2d3bd941f7..8c80867b60 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java @@ -22,7 +22,8 @@ import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.util.bytes.BytesValue; -import java.util.Iterator; +import java.util.Arrays; +import java.util.List; public final class BlockHeadersMessage extends AbstractMessageData { @@ -38,6 +39,10 @@ public final class BlockHeadersMessage extends AbstractMessageData { return new BlockHeadersMessage(message.getData()); } + public static BlockHeadersMessage create(final BlockHeader... headers) { + return create(Arrays.asList(headers)); + } + public static BlockHeadersMessage create(final Iterable headers) { final BytesValueRLPOutput tmp = new BytesValueRLPOutput(); tmp.startList(); @@ -57,11 +62,10 @@ public final class BlockHeadersMessage extends AbstractMessageData { return EthPV62.BLOCK_HEADERS; } - public Iterator getHeaders(final ProtocolSchedule protocolSchedule) { + public List getHeaders(final ProtocolSchedule protocolSchedule) { final BlockHashFunction blockHashFunction = ScheduleBasedBlockHashFunction.create(protocolSchedule); return new BytesValueRLPInput(data, false) - .readList(rlp -> BlockHeader.readFrom(rlp, blockHashFunction)) - .iterator(); + .readList(rlp -> BlockHeader.readFrom(rlp, blockHashFunction)); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidator.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidator.java new file mode 100644 index 0000000000..b9f183454b --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidator.java @@ -0,0 +1,132 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.peervalidation; + +import static com.google.common.base.Preconditions.checkArgument; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.task.GetHeadersFromPeerByNumberTask; +import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHeaderValidator; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class DaoForkPeerValidator implements PeerValidator { + private static final Logger LOG = LogManager.getLogger(); + private static long DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER = 10L; + + private final EthContext ethContext; + private final ProtocolSchedule protocolSchedule; + private final LabelledMetric ethTasksTimer; + + private final long daoBlockNumber; + // Wait for peer's chainhead to advance some distance beyond daoBlockNumber before validating + private final long chainHeightEstimationBuffer; + + public DaoForkPeerValidator( + final EthContext ethContext, + final ProtocolSchedule protocolSchedule, + final LabelledMetric ethTasksTimer, + final long daoBlockNumber, + final long chainHeightEstimationBuffer) { + checkArgument(chainHeightEstimationBuffer >= 0); + this.ethContext = ethContext; + this.protocolSchedule = protocolSchedule; + this.ethTasksTimer = ethTasksTimer; + this.daoBlockNumber = daoBlockNumber; + this.chainHeightEstimationBuffer = chainHeightEstimationBuffer; + } + + public DaoForkPeerValidator( + final EthContext ethContext, + final ProtocolSchedule protocolSchedule, + final LabelledMetric ethTasksTimer, + final long daoBlockNumber) { + this( + ethContext, + protocolSchedule, + ethTasksTimer, + daoBlockNumber, + DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER); + } + + @Override + public CompletableFuture validatePeer(final EthPeer ethPeer) { + AbstractPeerTask> getHeaderTask = + GetHeadersFromPeerByNumberTask.forSingleNumber( + protocolSchedule, ethContext, daoBlockNumber, ethTasksTimer) + .setTimeout(Duration.ofSeconds(20)) + .assignPeer(ethPeer); + return getHeaderTask + .run() + .handle( + (res, err) -> { + if (err != null) { + // Mark peer as invalid on error + LOG.debug( + "Peer {} is invalid because DAO block ({}) is unavailable: {}", + ethPeer, + daoBlockNumber, + err.toString()); + return false; + } + List headers = res.getResult(); + if (headers.size() == 0) { + // If no headers are returned, fail + LOG.debug( + "Peer {} is invalid because DAO block ({}) is unavailable.", + ethPeer, + daoBlockNumber); + return false; + } + BlockHeader header = headers.get(0); + boolean validDaoBlock = MainnetBlockHeaderValidator.validateHeaderForDaoFork(header); + if (!validDaoBlock) { + LOG.debug( + "Peer {} is invalid because DAO block ({}) is invalid.", + ethPeer, + daoBlockNumber); + } + return validDaoBlock; + }); + } + + @Override + public boolean canBeValidated(final EthPeer ethPeer) { + return ethPeer.chainState().getEstimatedHeight() + >= (daoBlockNumber + chainHeightEstimationBuffer); + } + + @Override + public Duration nextValidationCheckTimeout(final EthPeer ethPeer) { + if (!ethPeer.chainState().hasEstimatedHeight()) { + return Duration.ofSeconds(30); + } + long distanceToDaoBlock = daoBlockNumber - ethPeer.chainState().getEstimatedHeight(); + if (distanceToDaoBlock < 100_000L) { + return Duration.ofMinutes(1); + } + // If the peer is trailing behind, give it some time to catch up before trying again. + return Duration.ofMinutes(10); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidator.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidator.java new file mode 100644 index 0000000000..62b58c62e8 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidator.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.peervalidation; + +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +public interface PeerValidator { + + /** + * Whether the peer can currently be validated. + * + * @param ethPeer The peer that need validation. + * @return {@code} True if peer can be validated now. + */ + boolean canBeValidated(final EthPeer ethPeer); + + /** + * If the peer cannot currently be validated, returns a timeout indicating how long to wait. + * before trying to validate the peer again. + * + * @param ethPeer The peer to be validated. + * @return A duration representing how long to wait before trying to validate this peer again. + */ + Duration nextValidationCheckTimeout(final EthPeer ethPeer); + + /** + * Validates the given peer. + * + * @param ethPeer The peer to be validated. + * @return True if the peer is valid, false otherwise. + */ + CompletableFuture validatePeer(final EthPeer ethPeer); + + /** + * @param ethPeer The peer to be disconnected. + * @return The reason for disconnecting. + */ + default DisconnectReason getDisconnectReason(final EthPeer ethPeer) { + return DisconnectReason.SUBPROTOCOL_TRIGGERED; + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunner.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunner.java new file mode 100644 index 0000000000..c81fcf320f --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunner.java @@ -0,0 +1,67 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.peervalidation; + +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; + +import java.time.Duration; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class PeerValidatorRunner { + private static final Logger LOG = LogManager.getLogger(); + protected final EthContext ethContext; + private final PeerValidator peerValidator; + + PeerValidatorRunner(final EthContext ethContext, final PeerValidator peerValidator) { + this.ethContext = ethContext; + this.peerValidator = peerValidator; + + ethContext.getEthPeers().subscribeConnect(this::checkPeer); + } + + public static void runValidator(final EthContext ethContext, final PeerValidator peerValidator) { + new PeerValidatorRunner(ethContext, peerValidator); + } + + public void checkPeer(final EthPeer ethPeer) { + if (peerValidator.canBeValidated(ethPeer)) { + peerValidator + .validatePeer(ethPeer) + .whenComplete( + (validated, err) -> { + if (err != null || !validated) { + // Disconnect invalid peer + disconnectPeer(ethPeer); + } + }); + } else if (!ethPeer.isDisconnected()) { + scheduleNextCheck(ethPeer); + } + } + + protected void disconnectPeer(final EthPeer ethPeer) { + LOG.debug( + "Disconnecting from peer {} marked invalid by {}", + ethPeer, + peerValidator.getClass().getSimpleName()); + ethPeer.disconnect(peerValidator.getDisconnectReason(ethPeer)); + } + + protected void scheduleNextCheck(final EthPeer ethPeer) { + Duration timeout = peerValidator.nextValidationCheckTimeout(ethPeer); + ethContext.getScheduler().scheduleFutureTask(() -> checkPeer(ethPeer), timeout); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 9a50612103..3baf0d1430 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -56,14 +56,12 @@ public class DefaultSynchronizer implements Synchronizer { final EthContext ethContext, final SyncState syncState, final Path dataDirectory, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final LabelledMetric ethTasksTimer) { this.syncConfig = syncConfig; this.ethContext = ethContext; this.syncState = syncState; - final LabelledMetric ethTasksTimer = - metricsSystem.createLabelledTimer( - MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"); this.blockPropagationManager = new BlockPropagationManager<>( syncConfig, @@ -95,6 +93,28 @@ public class DefaultSynchronizer implements Synchronizer { syncState); } + public DefaultSynchronizer( + final SynchronizerConfiguration syncConfig, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final WorldStateStorage worldStateStorage, + final EthContext ethContext, + final SyncState syncState, + final Path dataDirectory, + final MetricsSystem metricsSystem) { + this( + syncConfig, + protocolSchedule, + protocolContext, + worldStateStorage, + ethContext, + syncState, + dataDirectory, + metricsSystem, + metricsSystem.createLabelledTimer( + MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName")); + } + @Override public void start() { if (started.compareAndSet(false, true)) { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java index e57e14abbd..bdbb2c6775 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java @@ -13,6 +13,8 @@ package tech.pegasys.pantheon.ethereum.eth.manager; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -22,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class DeterministicEthScheduler extends EthScheduler { private final TimeoutPolicy timeoutPolicy; + private final List executors; public DeterministicEthScheduler() { this(TimeoutPolicy.NEVER); @@ -34,7 +37,24 @@ public class DeterministicEthScheduler extends EthScheduler { new MockExecutorService(), new MockExecutorService(), new MockExecutorService()); + this.timeoutPolicy = timeoutPolicy; + this.executors = + Arrays.asList( + (MockExecutorService) this.syncWorkerExecutor, + (MockExecutorService) this.scheduler, + (MockExecutorService) this.txWorkerExecutor, + (MockExecutorService) this.servicesExecutor, + (MockExecutorService) this.computationExecutor); + } + + // Test utility for running pending futures + public void runPendingFutures() { + executors.forEach(MockExecutorService::runPendingFutures); + } + + public void disableAutoRun() { + executors.forEach(e -> e.setAutoRun(false)); } MockExecutorService mockSyncWorkerExecutor() { @@ -62,6 +82,7 @@ public class DeterministicEthScheduler extends EthScheduler { @FunctionalInterface public interface TimeoutPolicy { TimeoutPolicy NEVER = () -> false; + TimeoutPolicy ALWAYS = () -> true; boolean shouldTimeout(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java index 6a2680a157..753578144c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -12,6 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import static com.google.common.base.Preconditions.checkArgument; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive; @@ -71,6 +72,30 @@ public class EthProtocolManagerTestUtil { return create(blockchain, worldStateArchive, timeoutPolicy); } + // Utility to prevent scheduler from automatically running submitted tasks + public static void disableEthSchedulerAutoRun(final EthProtocolManager ethProtocolManager) { + EthScheduler scheduler = ethProtocolManager.ethContext().getScheduler(); + checkArgument( + scheduler instanceof DeterministicEthScheduler, + "EthProtocolManager must be set up with " + + DeterministicEthScheduler.class.getSimpleName() + + " in order to disable auto run."); + ((DeterministicEthScheduler) scheduler).disableAutoRun(); + } + + // Manually runs any pending tasks submitted to the EthScheduler + // Works with {@code disableEthSchedulerAutoRun} - tasks will only be pending if + // autoRun has been disabled. + public static void runPendingFutures(final EthProtocolManager ethProtocolManager) { + EthScheduler scheduler = ethProtocolManager.ethContext().getScheduler(); + checkArgument( + scheduler instanceof DeterministicEthScheduler, + "EthProtocolManager must be set up with " + + DeterministicEthScheduler.class.getSimpleName() + + " in order to manually run pending futures."); + ((DeterministicEthScheduler) scheduler).runPendingFutures(); + } + public static void broadcastMessage( final EthProtocolManager ethProtocolManager, final RespondingEthPeer peer, diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java index 2ed3a8726e..dfcbc08131 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java @@ -84,8 +84,8 @@ public class EthSchedulerTest { final CompletableFuture result = ethScheduler.scheduleSyncWorkerTask(() -> new CompletableFuture<>()); - assertThat(syncWorkerExecutor.getScheduledFutures().size()).isEqualTo(1); - final Future future = syncWorkerExecutor.getScheduledFutures().get(0); + assertThat(syncWorkerExecutor.getFutures().size()).isEqualTo(1); + final Future future = syncWorkerExecutor.getFutures().get(0); verify(future, times(0)).cancel(anyBoolean()); result.cancel(true); @@ -136,8 +136,8 @@ public class EthSchedulerTest { final CompletableFuture result = ethScheduler.scheduleFutureTask(() -> new CompletableFuture<>(), Duration.ofMillis(100)); - assertThat(scheduledExecutor.getScheduledFutures().size()).isEqualTo(1); - final Future future = scheduledExecutor.getScheduledFutures().get(0); + assertThat(scheduledExecutor.getFutures().size()).isEqualTo(1); + final Future future = scheduledExecutor.getFutures().get(0); verify(future, times(0)).cancel(anyBoolean()); result.cancel(true); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java index 24240f4393..555898b8dd 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java @@ -25,14 +25,25 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; class MockExecutorService implements ExecutorService { - private final List> scheduledFutures = new ArrayList<>(); + private boolean autoRun = true; + private final List> tasks = new ArrayList<>(); - // Test utility for inspecting scheduled futures - public List> getScheduledFutures() { - return scheduledFutures; + // Test utility for inspecting executor's futures + public List> getFutures() { + return tasks.stream().map(ExecutorTask::getFuture).collect(Collectors.toList()); + } + + public void setAutoRun(final boolean shouldAutoRunTasks) { + this.autoRun = shouldAutoRunTasks; + } + + public void runPendingFutures() { + ArrayList> currentTasks = new ArrayList<>(tasks); + currentTasks.forEach(ExecutorTask::run); } @Override @@ -61,44 +72,31 @@ class MockExecutorService implements ExecutorService { @Override public Future submit(final Callable task) { - CompletableFuture future = new CompletableFuture<>(); - try { - final T result = task.call(); - future.complete(result); - } catch (final Exception e) { - future.completeExceptionally(e); + ExecutorTask execTask = new ExecutorTask<>(task::call); + tasks.add(execTask); + if (autoRun) { + execTask.run(); } - future = spy(future); - scheduledFutures.add(future); - return future; + + return execTask.getFuture(); } @Override public Future submit(final Runnable task, final T result) { - CompletableFuture future = new CompletableFuture<>(); - try { - task.run(); - future.complete(result); - } catch (final Exception e) { - future.completeExceptionally(e); - } - future = spy(future); - scheduledFutures.add(future); - return future; + return submit( + () -> { + task.run(); + return result; + }); } @Override public Future submit(final Runnable task) { - CompletableFuture future = new CompletableFuture<>(); - try { - task.run(); - future.complete(null); - } catch (final Exception e) { - future.completeExceptionally(e); - } - future = spy(future); - scheduledFutures.add(future); - return future; + return submit( + () -> { + task.run(); + return null; + }); } @Override @@ -129,4 +127,37 @@ class MockExecutorService implements ExecutorService { @Override public void execute(final Runnable command) {} + + private static class ExecutorTask { + private final CompletableFuture future; + private final Callable taskRunner; + private boolean isPending = true; + + private ExecutorTask(final Callable taskRunner) { + this.future = spy(new CompletableFuture<>()); + this.taskRunner = taskRunner; + } + + public void run() { + if (!isPending) { + return; + } + + isPending = false; + try { + T result = taskRunner.call(); + future.complete(result); + } catch (final Exception e) { + future.completeExceptionally(e); + } + } + + public CompletableFuture getFuture() { + return future; + } + + public boolean isPending() { + return isPending; + } + } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java index b8c4f5df54..99d442a41d 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java @@ -89,6 +89,7 @@ public class MockPeerConnection implements PeerConnection { throw new UnsupportedOperationException(); } + @Override public boolean isDisconnected() { return disconnected; } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java index dbefd235d5..6040982837 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java @@ -46,6 +46,7 @@ import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.stream.Stream; import com.google.common.collect.Lists; @@ -210,6 +211,18 @@ public class RespondingEthPeer { return !outgoingMessages.isEmpty(); } + public static Responder targetedResponder( + final BiFunction requestFilter, + final BiFunction responseGenerator) { + return (cap, msg) -> { + if (requestFilter.apply(cap, msg)) { + return Optional.of(responseGenerator.apply(cap, msg)); + } else { + return Optional.empty(); + } + }; + } + public static Responder blockchainResponder(final Blockchain blockchain) { return blockchainResponder(blockchain, createInMemoryWorldStateArchive()); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java index f9156d871c..a7b68940e9 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java @@ -27,7 +27,6 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import com.google.common.io.Resources; @@ -57,13 +56,13 @@ public final class BlockHeadersMessageTest { final MessageData initialMessage = BlockHeadersMessage.create(headers); final MessageData raw = new RawMessage(EthPV62.BLOCK_HEADERS, initialMessage.getData()); final BlockHeadersMessage message = BlockHeadersMessage.readFrom(raw); - final Iterator readHeaders = + final List readHeaders = message.getHeaders( FixedDifficultyProtocolSchedule.create( GenesisConfigFile.development().getConfigOptions(), PrivacyParameters.noPrivacy())); for (int i = 0; i < 50; ++i) { - Assertions.assertThat(readHeaders.next()).isEqualTo(headers.get(i)); + Assertions.assertThat(readHeaders.get(i)).isEqualTo(headers.get(i)); } } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java new file mode 100644 index 0000000000..4cd9cdacaf --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/DaoForkPeerValidatorTest.java @@ -0,0 +1,236 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.peervalidation; + +import static org.assertj.core.api.Assertions.assertThat; + +import tech.pegasys.pantheon.ethereum.core.Block; +import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; +import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions; +import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; +import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage; +import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; +import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage; +import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHeaderValidator; +import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.Test; + +public class DaoForkPeerValidatorTest { + + @Test + public void validatePeer_responsivePeerOnRightSideOfFork() { + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + BlockDataGenerator gen = new BlockDataGenerator(1); + long daoBlockNumber = 500; + Block daoBlock = + gen.block( + BlockOptions.create() + .setBlockNumber(daoBlockNumber) + .setExtraData(MainnetBlockHeaderValidator.DAO_EXTRA_DATA)); + + PeerValidator validator = + new DaoForkPeerValidator( + ethProtocolManager.ethContext(), + MainnetProtocolSchedule.create(), + NoOpMetricsSystem.NO_OP_LABELLED_TIMER, + daoBlockNumber, + 0); + + RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber); + + CompletableFuture result = validator.validatePeer(peer.getEthPeer()); + + assertThat(result).isNotDone(); + + // Send response for dao block + AtomicBoolean daoBlockRequested = respondToDaoBlockRequest(peer, daoBlock); + + assertThat(daoBlockRequested).isTrue(); + assertThat(result).isDone(); + assertThat(result).isCompletedWithValue(true); + } + + @Test + public void validatePeer_responsivePeerOnWrongSideOfFork() { + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + BlockDataGenerator gen = new BlockDataGenerator(1); + long daoBlockNumber = 500; + Block daoBlock = + gen.block( + BlockOptions.create().setBlockNumber(daoBlockNumber).setExtraData(BytesValue.EMPTY)); + + PeerValidator validator = + new DaoForkPeerValidator( + ethProtocolManager.ethContext(), + MainnetProtocolSchedule.create(), + NoOpMetricsSystem.NO_OP_LABELLED_TIMER, + daoBlockNumber, + 0); + + RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber); + + CompletableFuture result = validator.validatePeer(peer.getEthPeer()); + + assertThat(result).isNotDone(); + + // Send response for dao block + AtomicBoolean daoBlockRequested = respondToDaoBlockRequest(peer, daoBlock); + + assertThat(daoBlockRequested).isTrue(); + assertThat(result).isDone(); + assertThat(result).isCompletedWithValue(false); + } + + @Test + public void validatePeer_unresponsivePeer() { + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(TimeoutPolicy.ALWAYS); + long daoBlockNumber = 500; + + PeerValidator validator = + new DaoForkPeerValidator( + ethProtocolManager.ethContext(), + MainnetProtocolSchedule.create(), + NoOpMetricsSystem.NO_OP_LABELLED_TIMER, + daoBlockNumber, + 0); + + RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber); + + CompletableFuture result = validator.validatePeer(peer.getEthPeer()); + + // Request should timeout immediately + assertThat(result).isDone(); + assertThat(result).isCompletedWithValue(false); + } + + @Test + public void validatePeer_requestBlockFromPeerBeingTested() { + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + BlockDataGenerator gen = new BlockDataGenerator(1); + long daoBlockNumber = 500; + Block daoBlock = + gen.block( + BlockOptions.create() + .setBlockNumber(daoBlockNumber) + .setExtraData(MainnetBlockHeaderValidator.DAO_EXTRA_DATA)); + + PeerValidator validator = + new DaoForkPeerValidator( + ethProtocolManager.ethContext(), + MainnetProtocolSchedule.create(), + NoOpMetricsSystem.NO_OP_LABELLED_TIMER, + daoBlockNumber, + 0); + + int peerCount = 1000; + List otherPeers = + Stream.generate( + () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber)) + .limit(peerCount) + .collect(Collectors.toList()); + RespondingEthPeer targetPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, daoBlockNumber); + + CompletableFuture result = validator.validatePeer(targetPeer.getEthPeer()); + + assertThat(result).isNotDone(); + + // Other peers should not receive request for dao block + for (RespondingEthPeer otherPeer : otherPeers) { + AtomicBoolean daoBlockRequestedForOtherPeer = respondToDaoBlockRequest(otherPeer, daoBlock); + assertThat(daoBlockRequestedForOtherPeer).isFalse(); + } + + // Target peer should receive request for dao block + final AtomicBoolean daoBlockRequested = respondToDaoBlockRequest(targetPeer, daoBlock); + assertThat(daoBlockRequested).isTrue(); + } + + @Test + public void canBeValidated() { + BlockDataGenerator gen = new BlockDataGenerator(1); + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(TimeoutPolicy.ALWAYS); + long daoBlockNumber = 500; + long buffer = 10; + + PeerValidator validator = + new DaoForkPeerValidator( + ethProtocolManager.ethContext(), + MainnetProtocolSchedule.create(), + NoOpMetricsSystem.NO_OP_LABELLED_TIMER, + daoBlockNumber, + buffer); + + EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0).getEthPeer(); + + peer.chainState().update(gen.hash(), daoBlockNumber - 10); + assertThat(validator.canBeValidated(peer)).isFalse(); + + peer.chainState().update(gen.hash(), daoBlockNumber); + assertThat(validator.canBeValidated(peer)).isFalse(); + + peer.chainState().update(gen.hash(), daoBlockNumber + buffer - 1); + assertThat(validator.canBeValidated(peer)).isFalse(); + + peer.chainState().update(gen.hash(), daoBlockNumber + buffer); + assertThat(validator.canBeValidated(peer)).isTrue(); + + peer.chainState().update(gen.hash(), daoBlockNumber + buffer + 10); + assertThat(validator.canBeValidated(peer)).isTrue(); + } + + private AtomicBoolean respondToDaoBlockRequest( + final RespondingEthPeer peer, final Block daoBlock) { + AtomicBoolean daoBlockRequested = new AtomicBoolean(false); + + Responder responder = + RespondingEthPeer.targetedResponder( + (cap, msg) -> { + if (msg.getCode() != EthPV62.GET_BLOCK_HEADERS) { + return false; + } + GetBlockHeadersMessage headersRequest = GetBlockHeadersMessage.readFrom(msg); + boolean isDaoBlockRequest = + headersRequest.blockNumber().isPresent() + && headersRequest.blockNumber().getAsLong() + == daoBlock.getHeader().getNumber(); + if (isDaoBlockRequest) { + daoBlockRequested.set(true); + } + return isDaoBlockRequest; + }, + (cap, msg) -> BlockHeadersMessage.create(daoBlock.getHeader())); + + // Respond + peer.respond(responder); + + return daoBlockRequested; + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java new file mode 100644 index 0000000000..a2656784b4 --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/peervalidation/PeerValidatorRunnerTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.peervalidation; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; + +public class PeerValidatorRunnerTest { + + @Test + public void checkPeer_schedulesFutureCheckWhenPeerNotReady() { + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); + EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer(); + + PeerValidator validator = mock(PeerValidator.class); + when(validator.canBeValidated(eq(peer))).thenReturn(false); + when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30)); + + PeerValidatorRunner runner = + spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator)); + runner.checkPeer(peer); + + verify(runner, times(1)).checkPeer(eq(peer)); + verify(validator, never()).validatePeer(eq(peer)); + verify(runner, never()).disconnectPeer(eq(peer)); + verify(runner, times(1)).scheduleNextCheck(eq(peer)); + + // Run pending futures to trigger the next check + EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager); + verify(runner, times(2)).checkPeer(eq(peer)); + verify(validator, never()).validatePeer(eq(peer)); + verify(runner, never()).disconnectPeer(eq(peer)); + verify(runner, times(2)).scheduleNextCheck(eq(peer)); + } + + @Test + public void checkPeer_doesNotScheduleFutureCheckWhenPeerNotReadyAndDisconnected() { + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); + EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer(); + peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); + + PeerValidator validator = mock(PeerValidator.class); + when(validator.canBeValidated(eq(peer))).thenReturn(false); + when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30)); + + PeerValidatorRunner runner = + spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator)); + runner.checkPeer(peer); + + verify(runner, times(1)).checkPeer(eq(peer)); + verify(validator, never()).validatePeer(eq(peer)); + verify(runner, never()).disconnectPeer(eq(peer)); + verify(runner, times(0)).scheduleNextCheck(eq(peer)); + } + + @Test + public void checkPeer_handlesInvalidPeer() { + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); + EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer(); + + PeerValidator validator = mock(PeerValidator.class); + when(validator.canBeValidated(eq(peer))).thenReturn(true); + when(validator.validatePeer(eq(peer))).thenReturn(CompletableFuture.completedFuture(false)); + when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30)); + + PeerValidatorRunner runner = + spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator)); + runner.checkPeer(peer); + + verify(validator, times(1)).validatePeer(eq(peer)); + verify(runner, times(1)).disconnectPeer(eq(peer)); + verify(runner, never()).scheduleNextCheck(eq(peer)); + } + + @Test + public void checkPeer_handlesValidPeer() { + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager); + EthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager).getEthPeer(); + + PeerValidator validator = mock(PeerValidator.class); + when(validator.canBeValidated(eq(peer))).thenReturn(true); + when(validator.validatePeer(eq(peer))).thenReturn(CompletableFuture.completedFuture(true)); + when(validator.nextValidationCheckTimeout(eq(peer))).thenReturn(Duration.ofSeconds(30)); + + PeerValidatorRunner runner = + spy(new PeerValidatorRunner(ethProtocolManager.ethContext(), validator)); + runner.checkPeer(peer); + + verify(validator, times(1)).validatePeer(eq(peer)); + verify(runner, never()).disconnectPeer(eq(peer)); + verify(runner, never()).scheduleNextCheck(eq(peer)); + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java index 416e20a57b..90902d3938 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java @@ -33,7 +33,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import com.google.common.collect.Streams; import org.junit.Test; public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest> { @@ -122,7 +121,7 @@ public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest headerSubset = - Streams.stream(headersMessage.getHeaders(protocolSchedule)) + headersMessage.getHeaders(protocolSchedule).stream() .filter(h -> h.getNumber() >= referenceHeader.getNumber() - 1L) .collect(Collectors.toList()); return Optional.of(BlockHeadersMessage.create(headerSubset)); diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AdminJsonRpcHttpServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AdminJsonRpcHttpServiceTest.java index 395f2c4f6b..6729adae75 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AdminJsonRpcHttpServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/AdminJsonRpcHttpServiceTest.java @@ -58,9 +58,9 @@ public class AdminJsonRpcHttpServiceTest extends JsonRpcHttpServiceTest { final InetSocketAddress addr60302 = new InetSocketAddress("localhost", 60302); final InetSocketAddress addr60303 = new InetSocketAddress("localhost", 60303); - peerList.add(new MockPeerConnection(info1, addr60301, addr30302)); - peerList.add(new MockPeerConnection(info2, addr30301, addr60302)); - peerList.add(new MockPeerConnection(info3, addr30301, addr60303)); + peerList.add(MockPeerConnection.create(info1, addr60301, addr30302)); + peerList.add(MockPeerConnection.create(info2, addr30301, addr60302)); + peerList.add(MockPeerConnection.create(info3, addr30301, addr60303)); when(peerDiscoveryMock.getPeers()).thenReturn(peerList); diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/MockPeerConnection.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/MockPeerConnection.java index c14159f79b..dfc85a92c6 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/MockPeerConnection.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/MockPeerConnection.java @@ -12,67 +12,28 @@ */ package tech.pegasys.pantheon.ethereum.jsonrpc; -import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; -import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; -import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Set; -public class MockPeerConnection implements PeerConnection { +public class MockPeerConnection { PeerInfo peerInfo; InetSocketAddress localAddress; InetSocketAddress remoteAddress; - public MockPeerConnection( + public static PeerConnection create( final PeerInfo peerInfo, final InetSocketAddress localAddress, final InetSocketAddress remoteAddress) { - this.peerInfo = peerInfo; - this.localAddress = localAddress; - this.remoteAddress = remoteAddress; - } - - @Override - public void send(final Capability capability, final MessageData message) { - throw new UnsupportedOperationException(); - } - - @Override - public Set getAgreedCapabilities() { - throw new UnsupportedOperationException(); - } - - @Override - public Capability capability(final String protocol) { - throw new UnsupportedOperationException(); - } - - @Override - public PeerInfo getPeer() { - return peerInfo; - } - - @Override - public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) { - throw new UnsupportedOperationException(); - } - - @Override - public void disconnect(final DisconnectReason reason) { - throw new UnsupportedOperationException(); - } - - @Override - public SocketAddress getLocalAddress() { - return localAddress; - } + PeerConnection peerConnection = mock(PeerConnection.class); + when(peerConnection.getPeer()).thenReturn(peerInfo); + when(peerConnection.getLocalAddress()).thenReturn(localAddress); + when(peerConnection.getRemoteAddress()).thenReturn(remoteAddress); - @Override - public SocketAddress getRemoteAddress() { - return remoteAddress; + return peerConnection; } } diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/AdminPeersTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/AdminPeersTest.java index a092e138c8..346879f725 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/AdminPeersTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/AdminPeersTest.java @@ -101,7 +101,7 @@ public class AdminPeersTest { final PeerInfo peerInfo = new PeerInfo(5, "0x0", Collections.emptyList(), 30303, BytesValue.EMPTY); final PeerConnection p = - new MockPeerConnection( + MockPeerConnection.create( peerInfo, InetSocketAddress.createUnresolved("1.2.3.4", 9876), InetSocketAddress.createUnresolved("4.3.2.1", 6789)); diff --git a/ethereum/mock-p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetwork.java b/ethereum/mock-p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetwork.java index 136f431760..f48c27b2d9 100644 --- a/ethereum/mock-p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetwork.java +++ b/ethereum/mock-p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetwork.java @@ -218,6 +218,8 @@ public final class MockNetwork { /** {@link Peer} that this connection originates from. */ private final Peer from; + private boolean disconnected = false; + /** * Peer that this connection targets and that will receive {@link Message}s sent via {@link * #send(Capability, MessageData)}. @@ -267,14 +269,21 @@ public final class MockNetwork { @Override public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) { + disconnected = true; network.disconnect(this, reason); } @Override public void disconnect(final DisconnectReason reason) { + disconnected = true; network.disconnect(this, reason); } + @Override + public boolean isDisconnected() { + return disconnected; + } + @Override public SocketAddress getLocalAddress() { throw new UnsupportedOperationException(); diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/PeerConnection.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/PeerConnection.java index f5ec1fd0f0..fef7e71c4b 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/PeerConnection.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/PeerConnection.java @@ -85,6 +85,9 @@ public interface PeerConnection { */ void disconnect(DisconnectReason reason); + /** @return True if the peer is disconnected */ + boolean isDisconnected(); + SocketAddress getLocalAddress(); SocketAddress getRemoteAddress(); diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java index ccb3f8c09a..28780ebbf9 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java @@ -150,7 +150,8 @@ final class NettyPeerConnection implements PeerConnection { } } - private boolean isDisconnected() { + @Override + public boolean isDisconnected() { return disconnected.get(); } diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java index 2f9d891bde..7460d90809 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java @@ -13,8 +13,9 @@ package tech.pegasys.pantheon.ethereum.p2p.discovery; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryTestHelper.AgentBuilder; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.FindNeighborsPacketData; @@ -24,15 +25,12 @@ import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.NeighborsPacketData import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PacketType; import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; -import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.util.bytes.BytesValue; -import java.net.SocketAddress; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; import org.junit.Test; @@ -287,36 +285,9 @@ public class PeerDiscoveryAgentTest { } private PeerConnection createAnonymousPeerConnection(final BytesValue id) { - return new PeerConnection() { - @Override - public void send(final Capability capability, final MessageData message) - throws PeerNotConnected {} - - @Override - public Set getAgreedCapabilities() { - return null; - } - - @Override - public PeerInfo getPeer() { - return new PeerInfo(0, null, null, 0, id); - } - - @Override - public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {} - - @Override - public void disconnect(final DisconnectReason reason) {} - - @Override - public SocketAddress getLocalAddress() { - return null; - } - - @Override - public SocketAddress getRemoteAddress() { - return null; - } - }; + PeerConnection conn = mock(PeerConnection.class); + PeerInfo peerInfo = new PeerInfo(0, null, null, 0, id); + when(conn.getPeer()).thenReturn(peerInfo); + return conn; } } diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java index 47fc045912..ff8dabafb2 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java @@ -26,7 +26,10 @@ import tech.pegasys.pantheon.ethereum.core.PrivacyParameters; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.core.TransactionPool; import tech.pegasys.pantheon.ethereum.eth.EthProtocol; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.peervalidation.DaoForkPeerValidator; +import tech.pegasys.pantheon.ethereum.eth.peervalidation.PeerValidatorRunner; import tech.pegasys.pantheon.ethereum.eth.sync.DefaultSynchronizer; import tech.pegasys.pantheon.ethereum.eth.sync.SyncMode; import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; @@ -37,11 +40,15 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager; import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration; import tech.pegasys.pantheon.ethereum.storage.StorageProvider; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.io.IOException; import java.nio.file.Path; import java.time.Clock; +import java.util.OptionalLong; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -116,6 +123,9 @@ public class MainnetPantheonController implements PantheonController { metricsSystem); final SyncState syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers()); + final LabelledMetric ethTasksTimer = + metricsSystem.createLabelledTimer( + MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"); final Synchronizer synchronizer = new DefaultSynchronizer<>( syncConfig, @@ -125,7 +135,18 @@ public class MainnetPantheonController implements PantheonController { ethProtocolManager.ethContext(), syncState, dataDirectory, - metricsSystem); + metricsSystem, + ethTasksTimer); + + OptionalLong daoBlock = genesisConfig.getConfigOptions().getDaoForkBlock(); + if (daoBlock.isPresent()) { + // Setup dao validator + EthContext ethContext = ethProtocolManager.ethContext(); + DaoForkPeerValidator daoForkPeerValidator = + new DaoForkPeerValidator( + ethContext, protocolSchedule, ethTasksTimer, daoBlock.getAsLong()); + PeerValidatorRunner.runValidator(ethContext, daoForkPeerValidator); + } final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool(