Replace filtering headers after the fact with calculating number to request up-front. (#1216)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent ee53d3bbfb
commit 26a851e436
  1. 48
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcher.java
  2. 3
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  3. 103
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcherTest.java
  4. 63
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncCheckpointFilterTest.java

@ -12,6 +12,10 @@
*/ */
package tech.pegasys.pantheon.ethereum.eth.sync; package tech.pegasys.pantheon.ethereum.eth.sync;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
@ -21,48 +25,72 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;
public class CheckpointHeaderFetcher { public class CheckpointHeaderFetcher {
private final SynchronizerConfiguration syncConfig; private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule<?> protocolSchedule; private final ProtocolSchedule<?> protocolSchedule;
private final EthContext ethContext; private final EthContext ethContext;
private final UnaryOperator<List<BlockHeader>> checkpointFilter; private final Optional<BlockHeader> lastCheckpointHeader;
private final MetricsSystem metricsSystem; private final MetricsSystem metricsSystem;
public CheckpointHeaderFetcher( public CheckpointHeaderFetcher(
final SynchronizerConfiguration syncConfig, final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<?> protocolSchedule, final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext, final EthContext ethContext,
final UnaryOperator<List<BlockHeader>> checkpointFilter, final Optional<BlockHeader> lastCheckpointHeader,
final MetricsSystem metricsSystem) { final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig; this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule; this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext; this.ethContext = ethContext;
this.checkpointFilter = checkpointFilter; this.lastCheckpointHeader = lastCheckpointHeader;
this.metricsSystem = metricsSystem; this.metricsSystem = metricsSystem;
} }
public CompletableFuture<List<BlockHeader>> getNextCheckpointHeaders( public CompletableFuture<List<BlockHeader>> getNextCheckpointHeaders(
final EthPeer peer, final BlockHeader lastHeader) { final EthPeer peer, final BlockHeader lastHeader) {
final int skip = syncConfig.downloaderChainSegmentSize() - 1; final int skip = syncConfig.downloaderChainSegmentSize() - 1;
final int additionalHeaderCount = syncConfig.downloaderHeaderRequestSize(); final int maximumHeaderRequestSize = syncConfig.downloaderHeaderRequestSize();
final int additionalHeaderCount;
if (lastCheckpointHeader.isPresent()) {
final BlockHeader targetHeader = lastCheckpointHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - lastHeader.getNumber();
if (blocksUntilTarget <= 0) {
return completedFuture(emptyList());
}
final long maxHeadersToRequest = blocksUntilTarget / (skip + 1);
additionalHeaderCount = (int) Math.min(maxHeadersToRequest, maximumHeaderRequestSize);
if (additionalHeaderCount == 0) {
return completedFuture(singletonList(targetHeader));
}
} else {
additionalHeaderCount = maximumHeaderRequestSize;
}
return requestHeaders(peer, lastHeader, additionalHeaderCount, skip);
}
private CompletableFuture<List<BlockHeader>> requestHeaders(
final EthPeer peer,
final BlockHeader referenceHeader,
final int headerCount,
final int skip) {
return GetHeadersFromPeerByHashTask.startingAtHash( return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule, protocolSchedule,
ethContext, ethContext,
lastHeader.getHash(), referenceHeader.getHash(),
lastHeader.getNumber(), referenceHeader.getNumber(),
// + 1 because lastHeader will be returned as well. // + 1 because lastHeader will be returned as well.
additionalHeaderCount + 1, headerCount + 1,
skip, skip,
metricsSystem) metricsSystem)
.assignPeer(peer) .assignPeer(peer)
.run() .run()
.thenApply(PeerTaskResult::getResult) .thenApply(PeerTaskResult::getResult)
.thenApply( .thenApply(headers -> stripExistingCheckpointHeader(referenceHeader, headers));
headers -> checkpointFilter.apply(stripExistingCheckpointHeader(lastHeader, headers)));
} }
private List<BlockHeader> stripExistingCheckpointHeader( private List<BlockHeader> stripExistingCheckpointHeader(

@ -27,6 +27,7 @@ import tech.pegasys.pantheon.services.pipeline.Pipeline;
import tech.pegasys.pantheon.services.pipeline.PipelineBuilder; import tech.pegasys.pantheon.services.pipeline.PipelineBuilder;
import java.time.Duration; import java.time.Duration;
import java.util.Optional;
public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory { public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory {
private final SynchronizerConfiguration syncConfig; private final SynchronizerConfiguration syncConfig;
@ -60,7 +61,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
syncConfig, syncConfig,
protocolSchedule, protocolSchedule,
ethContext, ethContext,
new FastSyncCheckpointFilter(pivotBlockHeader), Optional.of(pivotBlockHeader),
metricsSystem), metricsSystem),
this::shouldContinueDownloadingFromPeer, this::shouldContinueDownloadingFromPeer,
ethContext.getScheduler(), ethContext.getScheduler(),

@ -13,9 +13,9 @@
package tech.pegasys.pantheon.ethereum.eth.sync; package tech.pegasys.pantheon.ethereum.eth.sync;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.chain.Blockchain;
@ -31,14 +31,13 @@ import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
@ -47,9 +46,9 @@ public class CheckpointHeaderFetcherTest {
private static ProtocolSchedule<Void> protocolSchedule; private static ProtocolSchedule<Void> protocolSchedule;
private static ProtocolContext<Void> protocolContext; private static ProtocolContext<Void> protocolContext;
private static final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private static final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Mock private UnaryOperator<List<BlockHeader>> filter;
private EthProtocolManager ethProtocolManager; private EthProtocolManager ethProtocolManager;
private CheckpointHeaderFetcher checkpointHeaderFetcher; private Responder responder;
private RespondingEthPeer respondingPeer;
@BeforeClass @BeforeClass
public static void setUpClass() { public static void setUpClass() {
@ -65,26 +64,15 @@ public class CheckpointHeaderFetcherTest {
ethProtocolManager = ethProtocolManager =
EthProtocolManagerTestUtil.create( EthProtocolManagerTestUtil.create(
blockchain, protocolContext.getWorldStateArchive(), () -> false); blockchain, protocolContext.getWorldStateArchive(), () -> false);
final EthContext ethContext = ethProtocolManager.ethContext(); responder =
checkpointHeaderFetcher = RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
new CheckpointHeaderFetcher( respondingPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build(),
protocolSchedule,
ethContext,
filter,
metricsSystem);
} }
@Test @Test
public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() { public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {
when(filter.apply(any())).thenAnswer(invocation -> invocation.getArgument(0)); final CheckpointHeaderFetcher checkpointHeaderFetcher =
final Responder responder = createCheckpointHeaderFetcher(Optional.empty());
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CompletableFuture<List<BlockHeader>> result = final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1)); checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));
@ -97,23 +85,74 @@ public class CheckpointHeaderFetcherTest {
} }
@Test @Test
public void shouldApplyFilterToDownloadedCheckpoints() { public void shouldNotRequestHeadersBeyondTargetWhenTargetIsMultipleOfSegmentSize() {
final List<BlockHeader> filteredResult = asList(header(7), header(9)); final CheckpointHeaderFetcher checkpointHeaderFetcher =
final List<BlockHeader> unfilteredResult = asList(header(6), header(11), header(16)); createCheckpointHeaderFetcher(Optional.of(header(11)));
when(filter.apply(unfilteredResult)).thenReturn(filteredResult);
final Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CompletableFuture<List<BlockHeader>> result = final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1)); checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));
assertThat(result).isNotDone(); respondingPeer.respond(responder);
assertThat(result).isCompletedWithValue(asList(header(6), header(11)));
}
@Test
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsNotAMultipleOfSegmentSize() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));
respondingPeer.respond(responder); respondingPeer.respond(responder);
assertThat(result).isCompletedWithValue(filteredResult); assertThat(result).isCompletedWithValue(asList(header(6), header(11)));
}
@Test
public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheCheckpointBeforeTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(11));
assertThat(result).isCompletedWithValue(singletonList(header(15)));
}
@Test
public void shouldReturnEmptyListWhenLastHeaderIsTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(15));
assertThat(result).isCompletedWithValue(emptyList());
}
@Test
public void shouldReturnEmptyListWhenLastHeaderIsAfterTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(16));
assertThat(result).isCompletedWithValue(emptyList());
}
private CheckpointHeaderFetcher createCheckpointHeaderFetcher(
final Optional<BlockHeader> targetHeader) {
final EthContext ethContext = ethProtocolManager.ethContext();
return new CheckpointHeaderFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build(),
protocolSchedule,
ethContext,
targetHeader,
metricsSystem);
} }
private BlockHeader header(final long blockNumber) { private BlockHeader header(final long blockNumber) {

@ -1,63 +0,0 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import java.util.List;
import org.junit.Test;
public class FastSyncCheckpointFilterTest {
private final BlockHeader pivotBlockHeader = header(50);
private final FastSyncCheckpointFilter filter = new FastSyncCheckpointFilter(pivotBlockHeader);
@Test
public void shouldNotChangeCheckpointsPriorToThePivotBlock() {
final List<BlockHeader> input =
asList(header(10), header(20), header(30), header(40), header(49));
assertThat(filter.apply(input)).isEqualTo(input);
}
@Test
public void shouldRemoveCheckpointsBeyondPivotBlock() {
final List<BlockHeader> input = asList(header(40), header(50), header(60), header(70));
assertThat(filter.apply(input)).containsExactly(header(40), header(50));
}
@Test
public void shouldAppendPivotBlockHeaderWhenRemovingCheckpointsIfNotAlreadyPresent() {
final List<BlockHeader> input = asList(header(45), header(55), header(65));
assertThat(filter.apply(input)).containsExactly(header(45), header(50));
}
@Test
public void shouldReturnOnlyPivotBlockHeaderIfAllBlocksAreAfterPivotBlock() {
assertThat(filter.apply(asList(header(55), header(60)))).containsExactly(pivotBlockHeader);
}
@Test
public void shouldNotChangeEmptyHeaders() {
assertThat(filter.apply(emptyList())).isEmpty();
}
private BlockHeader header(final int number) {
return new BlockHeaderTestFixture().number(number).buildHeader();
}
}
Loading…
Cancel
Save