diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 0d482a69a1..2157502cff 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -39,9 +39,11 @@ import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.util.uint.UInt256; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -49,7 +51,6 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Range; -import io.netty.util.internal.ConcurrentSet; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -65,7 +66,8 @@ public class BlockPropagationManager { private final AtomicBoolean started = new AtomicBoolean(false); - private final Set requestedBlocks = new ConcurrentSet<>(); + private final Set requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final PendingBlocks pendingBlocks; BlockPropagationManager( @@ -202,6 +204,9 @@ public class BlockPropagationManager { if (pendingBlocks.contains(announcedBlock.hash())) { continue; } + if (importingBlocks.contains(announcedBlock.hash())) { + continue; + } if (blockchain.contains(announcedBlock.hash())) { continue; } @@ -247,6 +252,17 @@ public class BlockPropagationManager { } } + if (!importingBlocks.add(block.getHash())) { + // We're already importing this block. + return CompletableFuture.completedFuture(block); + } + + if (protocolContext.getBlockchain().contains(block.getHash())) { + // We've already imported this block. + importingBlocks.remove(block.getHash()); + return CompletableFuture.completedFuture(block); + } + // Import block final PersistBlockTask importTask = PersistBlockTask.create( @@ -256,8 +272,8 @@ public class BlockPropagationManager { .scheduleSyncWorkerTask(importTask::run) .whenComplete( (r, t) -> { + importingBlocks.remove(block.getHash()); if (t != null) { - // TODO do we time failures? But we cannot drop a label in at this point. LOG.warn( "Failed to import announced block {} ({}).", block.getHeader().getNumber(), diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java index d72f8fb3b0..57d8e74bbf 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java @@ -14,9 +14,11 @@ package tech.pegasys.pantheon.ethereum.eth.sync; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.chain.Blockchain; @@ -24,8 +26,12 @@ import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthMessages; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; @@ -41,6 +47,8 @@ import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.uint.UInt256; import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import org.junit.Before; import org.junit.BeforeClass; @@ -530,4 +538,31 @@ public class BlockPropagationManagerTest { assertThat(peer.getEthPeer().chainState().getBestBlock().getTotalDifficulty()) .isEqualTo(totalDifficulty); } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotImportBlocksThatAreAlreadyBeingImported() { + final EthScheduler ethScheduler = mock(EthScheduler.class); + when(ethScheduler.scheduleSyncWorkerTask(any(Supplier.class))) + .thenReturn(new CompletableFuture<>()); + final EthContext ethContext = + new EthContext("eth", new EthPeers("eth"), new EthMessages(), ethScheduler); + final BlockPropagationManager blockPropagationManager = + new BlockPropagationManager<>( + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + syncState, + pendingBlocks, + ethTasksTimer); + + blockchainUtil.importFirstBlocks(2); + final Block nextBlock = blockchainUtil.getBlock(2); + + blockPropagationManager.importOrSavePendingBlock(nextBlock); + blockPropagationManager.importOrSavePendingBlock(nextBlock); + + verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class)); + } }