From da651844d85a31a147b4296efe9138b0e20ebcdc Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Sun, 3 Feb 2019 12:52:03 +1000 Subject: [PATCH] Handle timeouts when requesting checkpoint headers correctly. (#743) Signed-off-by: Adrian Sutton --- .../ethereum/eth/manager/EthPeers.java | 5 +- .../eth/sync/CheckpointHeaderManager.java | 9 +- .../eth/manager/EthContextTestUtil.java | 28 +++ .../eth/sync/CheckpointHeaderManagerTest.java | 201 ++++++++++++++++++ 4 files changed, 236 insertions(+), 7 deletions(-) create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContextTestUtil.java create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManagerTest.java diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java index 4beb7bcc52..377987dee4 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java @@ -118,9 +118,8 @@ public class EthPeers { @Override public String toString() { final String connectionsList = - String.join( - ",", connections.values().stream().map(EthPeer::toString).collect(Collectors.toList())); - return "EthPeers{" + "connections=" + connectionsList + '}'; + connections.values().stream().map(EthPeer::toString).collect(Collectors.joining(",")); + return "EthPeers{connections=" + connectionsList + '}'; } private void invokeConnectionCallbacks(final EthPeer peer) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManager.java index 9fefd5a561..07ab7c0e43 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManager.java @@ -78,9 +78,7 @@ public class CheckpointHeaderManager { return getAdditionalCheckpointHeaders(syncTarget, lastHeader) .thenApply( additionalCheckpoints -> { - if (additionalCheckpoints.isEmpty()) { - checkpointTimeouts++; - } else { + if (!additionalCheckpoints.isEmpty()) { checkpointTimeouts = 0; checkpointHeaders.addAll(additionalCheckpoints); LOG.debug("Tracking {} checkpoint headers", checkpointHeaders.size()); @@ -97,7 +95,7 @@ public class CheckpointHeaderManager { t = ExceptionUtils.rootCause(t); if (t instanceof TimeoutException) { checkpointTimeouts++; - return null; + return emptyList(); } else if (t != null) { // An error occurred, so no new checkpoints to add. return emptyList(); @@ -108,6 +106,9 @@ public class CheckpointHeaderManager { // Don't push header that is already tracked headers.remove(0); } + if (headers.isEmpty()) { + checkpointTimeouts++; + } return headers; }); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContextTestUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContextTestUtil.java new file mode 100644 index 0000000000..f0bb526aba --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContextTestUtil.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.manager; + +import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy; + +public class EthContextTestUtil { + + private static final String PROTOCOL_NAME = "ETH"; + + public static EthContext createTestEthContext(final TimeoutPolicy timeoutPolicy) { + return new EthContext( + PROTOCOL_NAME, + new EthPeers(PROTOCOL_NAME), + new EthMessages(), + new DeterministicEthScheduler(timeoutPolicy)); + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManagerTest.java new file mode 100644 index 0000000000..070eb363fe --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManagerTest.java @@ -0,0 +1,201 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_TIMER; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.eth.manager.ChainState; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContextTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.EthMessage; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager; +import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget; +import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Before; +import org.junit.Test; + +public class CheckpointHeaderManagerTest { + + private static final BlockHeader GENESIS = block(0); + private static final int SEGMENT_SIZE = 5; + private static final int HEADER_REQUEST_SIZE = 3; + + private static final ProtocolSchedule PROTOCOL_SCHEDULE = MainnetProtocolSchedule.create(); + + private final MutableBlockchain blockchain = mock(MutableBlockchain.class); + private final WorldStateArchive worldStateArchive = mock(WorldStateArchive.class); + private final ProtocolContext protocolContext = + new ProtocolContext<>(blockchain, worldStateArchive, null); + + private final AtomicBoolean timeout = new AtomicBoolean(false); + private final EthContext ethContext = EthContextTestUtil.createTestEthContext(timeout::get); + private final SyncState syncState = new SyncState(blockchain, ethContext.getEthPeers()); + private final LabelledMetric ethTasksTimer = NO_OP_LABELLED_TIMER; + private final EthPeer syncTargetPeer = mock(EthPeer.class); + private final RequestManager requestManager = new RequestManager(syncTargetPeer); + private SyncTarget syncTarget; + + private final CheckpointHeaderManager checkpointHeaderManager = + new CheckpointHeaderManager<>( + SynchronizerConfiguration.builder() + .downloaderChainSegmentSize(SEGMENT_SIZE) + .downloaderHeadersRequestSize(HEADER_REQUEST_SIZE) + .downloaderCheckpointTimeoutsPermitted(2) + .build(), + protocolContext, + ethContext, + syncState, + PROTOCOL_SCHEDULE, + ethTasksTimer); + + @Before + public void setUp() { + when(syncTargetPeer.chainState()).thenReturn(new ChainState()); + syncTarget = syncState.setSyncTarget(syncTargetPeer, GENESIS); + } + + @Test + public void shouldHandleErrorsWhenRequestingHeaders() throws Exception { + when(anyHeadersRequested()).thenThrow(new PeerNotConnected("Nope")); + + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(emptyList()); + } + + @Test + public void shouldHandleTimeouts() throws Exception { + timeout.set(true); + when(anyHeadersRequested()).thenReturn(createResponseStream(), createResponseStream()); + + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(emptyList()); + assertThat(checkpointHeaderManager.checkpointsHaveTimedOut()).isFalse(); + + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(emptyList()); + assertThat(checkpointHeaderManager.checkpointsHaveTimedOut()).isTrue(); + } + + @Test + public void shouldResetTimeoutWhenHeadersReceived() throws Exception { + // Timeout + timeout.set(true); + when(anyHeadersRequested()).thenReturn(createResponseStream()); + + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(emptyList()); + assertThat(checkpointHeaderManager.checkpointsHaveTimedOut()).isFalse(); + + // Receive response + reset(syncTargetPeer); + respondToHeaderRequests(GENESIS, block(5)); + timeout.set(false); + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(asList(GENESIS, block(5))); + assertThat(checkpointHeaderManager.checkpointsHaveTimedOut()).isFalse(); + + // Timeout again but shouldn't have reached threshold + reset(syncTargetPeer); + timeout.set(true); + when(anyHeadersRequested()).thenReturn(createResponseStream()); + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(asList(GENESIS, block(5))); + assertThat(checkpointHeaderManager.checkpointsHaveTimedOut()).isFalse(); + } + + @Test + public void shouldUseReturnedHeadersAsCheckpointHeaders() throws Exception { + respondToHeaderRequests(GENESIS, block(5), block(10)); + + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(asList(GENESIS, block(5), block(10))); + } + + @Test + public void shouldPullAdditionalCheckpointsWhenRequired() throws Exception { + respondToHeaderRequests(GENESIS, block(5)); + respondToHeaderRequests(block(5), block(10), block(15), block(20)); + + // Pull initial headers + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(asList(GENESIS, block(5))); + + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(asList(GENESIS, block(5), block(10), block(15), block(20))); + } + + @Test + public void shouldRemoveImportedCheckpointHeaders() throws Exception { + respondToHeaderRequests(GENESIS, block(5), block(10)); + respondToHeaderRequests(block(10)); + + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(asList(GENESIS, block(5), block(10))); + + when(blockchain.contains(GENESIS.getHash())).thenReturn(true); + when(blockchain.contains(block(5).getHash())).thenReturn(true); + when(blockchain.contains(block(10).getHash())).thenReturn(false); + checkpointHeaderManager.clearImportedCheckpointHeaders(); + + // The first checkpoint header should always be in the blockchain (just as geneis was present) + assertThat(checkpointHeaderManager.pullCheckpointHeaders(syncTarget)) + .isCompletedWithValue(asList(block(5), block(10))); + } + + private void respondToHeaderRequests(final BlockHeader... headers) throws Exception { + final ResponseStream responseStream = createResponseStream(); + when(syncTargetPeer.getHeadersByHash( + headers[0].getHash(), HEADER_REQUEST_SIZE + 1, SEGMENT_SIZE - 1, false)) + .thenReturn(responseStream); + requestManager.dispatchResponse( + new EthMessage(syncTargetPeer, BlockHeadersMessage.create(asList(headers)))); + } + + private static BlockHeader block(final int blockNumber) { + return new BlockHeaderTestFixture().number(blockNumber).buildHeader(); + } + + private ResponseStream createResponseStream() throws PeerNotConnected { + return requestManager.dispatchRequest(() -> {}); + } + + private ResponseStream anyHeadersRequested() throws PeerNotConnected { + return syncTargetPeer.getHeadersByHash(any(Hash.class), anyInt(), anyInt(), anyBoolean()); + } +}