|
|
|
@ -27,6 +27,7 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; |
|
|
|
|
import org.hyperledger.besu.plugin.services.MetricsSystem; |
|
|
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Iterator; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
|
import java.util.function.Supplier; |
|
|
|
@ -67,22 +68,20 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> { |
|
|
|
|
final List<Block> blocks, |
|
|
|
|
final HeaderValidationMode headerValidationMode, |
|
|
|
|
final MetricsSystem metricsSystem) { |
|
|
|
|
checkArgument(blocks.size() > 0); |
|
|
|
|
checkArgument(!blocks.isEmpty(), "No blocks to import provided"); |
|
|
|
|
return () -> { |
|
|
|
|
final List<Block> successfulImports = new ArrayList<>(); |
|
|
|
|
CompletableFuture<Block> future = null; |
|
|
|
|
for (final Block block : blocks) { |
|
|
|
|
if (future == null) { |
|
|
|
|
future = |
|
|
|
|
final Iterator<Block> blockIterator = blocks.iterator(); |
|
|
|
|
CompletableFuture<Block> future = |
|
|
|
|
importBlockAndAddToList( |
|
|
|
|
protocolSchedule, |
|
|
|
|
protocolContext, |
|
|
|
|
block, |
|
|
|
|
blockIterator.next(), |
|
|
|
|
successfulImports, |
|
|
|
|
headerValidationMode, |
|
|
|
|
metricsSystem); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
while (blockIterator.hasNext()) { |
|
|
|
|
final Block block = blockIterator.next(); |
|
|
|
|
future = |
|
|
|
|
future.thenCompose( |
|
|
|
|
b -> |
|
|
|
@ -122,34 +121,34 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> { |
|
|
|
|
final List<Block> blocks, |
|
|
|
|
final HeaderValidationMode headerValidationMode, |
|
|
|
|
final MetricsSystem metricsSystem) { |
|
|
|
|
checkArgument(blocks.size() > 0); |
|
|
|
|
checkArgument(!blocks.isEmpty(), "No blocks to import provided"); |
|
|
|
|
return () -> { |
|
|
|
|
final CompletableFuture<List<Block>> finalResult = new CompletableFuture<>(); |
|
|
|
|
final List<Block> successfulImports = new ArrayList<>(); |
|
|
|
|
CompletableFuture<Block> future = null; |
|
|
|
|
for (final Block block : blocks) { |
|
|
|
|
if (future == null) { |
|
|
|
|
future = |
|
|
|
|
final Iterator<PersistBlockTask<C>> tasks = |
|
|
|
|
blocks.stream() |
|
|
|
|
.map( |
|
|
|
|
block -> |
|
|
|
|
PersistBlockTask.create( |
|
|
|
|
protocolSchedule, protocolContext, block, headerValidationMode, metricsSystem) |
|
|
|
|
.run(); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
protocolSchedule, |
|
|
|
|
protocolContext, |
|
|
|
|
block, |
|
|
|
|
headerValidationMode, |
|
|
|
|
metricsSystem)) |
|
|
|
|
.iterator(); |
|
|
|
|
|
|
|
|
|
CompletableFuture<Block> future = tasks.next().run(); |
|
|
|
|
while (tasks.hasNext()) { |
|
|
|
|
final PersistBlockTask<C> task = tasks.next(); |
|
|
|
|
future = |
|
|
|
|
future |
|
|
|
|
.handle((r, t) -> r) |
|
|
|
|
.thenCompose( |
|
|
|
|
(r) -> { |
|
|
|
|
r -> { |
|
|
|
|
if (r != null) { |
|
|
|
|
successfulImports.add(r); |
|
|
|
|
} |
|
|
|
|
return PersistBlockTask.create( |
|
|
|
|
protocolSchedule, |
|
|
|
|
protocolContext, |
|
|
|
|
block, |
|
|
|
|
headerValidationMode, |
|
|
|
|
metricsSystem) |
|
|
|
|
.run(); |
|
|
|
|
return task.run(); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
future.whenComplete( |
|
|
|
|