[NC-2099] Avoid duplicate import of announced blocks (#550)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 9c585ec523
commit c9e1f0a3ae
  1. 22
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java
  2. 35
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java

@ -39,9 +39,11 @@ import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.uint.UInt256; import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -49,7 +51,6 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Range; import com.google.common.collect.Range;
import io.netty.util.internal.ConcurrentSet;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -65,7 +66,8 @@ public class BlockPropagationManager<C> {
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final Set<Hash> requestedBlocks = new ConcurrentSet<>(); private final Set<Hash> requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Hash> importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final PendingBlocks pendingBlocks; private final PendingBlocks pendingBlocks;
BlockPropagationManager( BlockPropagationManager(
@ -202,6 +204,9 @@ public class BlockPropagationManager<C> {
if (pendingBlocks.contains(announcedBlock.hash())) { if (pendingBlocks.contains(announcedBlock.hash())) {
continue; continue;
} }
if (importingBlocks.contains(announcedBlock.hash())) {
continue;
}
if (blockchain.contains(announcedBlock.hash())) { if (blockchain.contains(announcedBlock.hash())) {
continue; continue;
} }
@ -247,6 +252,17 @@ public class BlockPropagationManager<C> {
} }
} }
if (!importingBlocks.add(block.getHash())) {
// We're already importing this block.
return CompletableFuture.completedFuture(block);
}
if (protocolContext.getBlockchain().contains(block.getHash())) {
// We've already imported this block.
importingBlocks.remove(block.getHash());
return CompletableFuture.completedFuture(block);
}
// Import block // Import block
final PersistBlockTask<C> importTask = final PersistBlockTask<C> importTask =
PersistBlockTask.create( PersistBlockTask.create(
@ -256,8 +272,8 @@ public class BlockPropagationManager<C> {
.scheduleSyncWorkerTask(importTask::run) .scheduleSyncWorkerTask(importTask::run)
.whenComplete( .whenComplete(
(r, t) -> { (r, t) -> {
importingBlocks.remove(block.getHash());
if (t != null) { if (t != null) {
// TODO do we time failures? But we cannot drop a label in at this point.
LOG.warn( LOG.warn(
"Failed to import announced block {} ({}).", "Failed to import announced block {} ({}).",
block.getHeader().getNumber(), block.getHeader().getNumber(),

@ -14,9 +14,11 @@ package tech.pegasys.pantheon.ethereum.eth.sync;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.chain.Blockchain;
@ -24,8 +26,12 @@ import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthMessages;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil;
@ -41,6 +47,8 @@ import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.uint.UInt256; import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -530,4 +538,31 @@ public class BlockPropagationManagerTest {
assertThat(peer.getEthPeer().chainState().getBestBlock().getTotalDifficulty()) assertThat(peer.getEthPeer().chainState().getBestBlock().getTotalDifficulty())
.isEqualTo(totalDifficulty); .isEqualTo(totalDifficulty);
} }
@SuppressWarnings("unchecked")
@Test
public void shouldNotImportBlocksThatAreAlreadyBeingImported() {
final EthScheduler ethScheduler = mock(EthScheduler.class);
when(ethScheduler.scheduleSyncWorkerTask(any(Supplier.class)))
.thenReturn(new CompletableFuture<>());
final EthContext ethContext =
new EthContext("eth", new EthPeers("eth"), new EthMessages(), ethScheduler);
final BlockPropagationManager<Void> blockPropagationManager =
new BlockPropagationManager<>(
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
pendingBlocks,
ethTasksTimer);
blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);
blockPropagationManager.importOrSavePendingBlock(nextBlock);
blockPropagationManager.importOrSavePendingBlock(nextBlock);
verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class));
}
} }

Loading…
Cancel
Save