Improving backwards sync (#3638)

Refactor Backwards sync to use a rocks db.

Signed-off-by: Jiri Peinlich <jiri.peinlich@gmail.com>
pull/3720/head
Jiri Peinlich 3 years ago committed by Karim TAAM
parent 6c8602d435
commit 918b5359b1
  1. 8
      besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java
  2. 6
      besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java
  3. 5
      besu/src/test/java/org/hyperledger/besu/controller/TransitionControllerBuilderTest.java
  4. 8
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionBackwardSyncContext.java
  5. 3
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java
  6. 3
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java
  7. 240
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardChain.java
  8. 258
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java
  9. 172
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupService.java
  10. 193
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncPhase.java
  11. 112
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStep.java
  12. 58
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTask.java
  13. 91
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java
  14. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java
  15. 34
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/HashConvertor.java
  16. 74
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java
  17. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java
  18. 132
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java
  19. 132
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupServiceTest.java
  20. 142
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStepTest.java
  21. 111
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTaskTest.java
  22. 100
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStepTest.java
  23. 131
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/InMemoryBackwardChainTest.java
  24. 2
      plugin-api/build.gradle
  25. 12
      plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java

@ -28,11 +28,12 @@ import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext; import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncLookupService;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.util.List; import java.util.List;
@ -68,9 +69,8 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
metricsSystem, metricsSystem,
ethProtocolManager.ethContext(), ethProtocolManager.ethContext(),
syncState, syncState,
new BackwardSyncLookupService( BackwardChain.from(
protocolSchedule, ethProtocolManager.ethContext(), metricsSystem, protocolContext), storageProvider, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule))));
storageProvider));
} }
protected MiningCoordinator createTransitionMiningCoordinator( protected MiningCoordinator createTransitionMiningCoordinator(

@ -34,7 +34,6 @@ import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext; import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncLookupService;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
@ -98,11 +97,6 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
metricsSystem, metricsSystem,
ethProtocolManager.ethContext(), ethProtocolManager.ethContext(),
syncState, syncState,
new BackwardSyncLookupService(
transitionProtocolSchedule,
ethProtocolManager.ethContext(),
metricsSystem,
protocolContext),
storageProvider); storageProvider);
final TransitionCoordinator composedCoordinator = final TransitionCoordinator composedCoordinator =

@ -30,12 +30,14 @@ import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import java.util.Optional; import java.util.Optional;
@ -61,6 +63,7 @@ public class TransitionControllerBuilderTest {
@Mock SyncState syncState; @Mock SyncState syncState;
@Mock EthProtocolManager ethProtocolManager; @Mock EthProtocolManager ethProtocolManager;
@Mock PostMergeContext mergeContext; @Mock PostMergeContext mergeContext;
StorageProvider storageProvider = new InMemoryKeyValueStorageProvider();
@Spy CliqueBesuControllerBuilder cliqueBuilder = new CliqueBesuControllerBuilder(); @Spy CliqueBesuControllerBuilder cliqueBuilder = new CliqueBesuControllerBuilder();
@Spy BesuControllerBuilder powBuilder = new MainnetBesuControllerBuilder(); @Spy BesuControllerBuilder powBuilder = new MainnetBesuControllerBuilder();
@ -76,6 +79,7 @@ public class TransitionControllerBuilderTest {
new TransitionProtocolSchedule( new TransitionProtocolSchedule(
preMergeProtocolSchedule, postMergeProtocolSchedule, mergeContext)); preMergeProtocolSchedule, postMergeProtocolSchedule, mergeContext));
cliqueBuilder.nodeKey(NodeKeyUtils.generate()); cliqueBuilder.nodeKey(NodeKeyUtils.generate());
postMergeBuilder.storageProvider(storageProvider);
when(protocolContext.getBlockchain()).thenReturn(mockBlockchain); when(protocolContext.getBlockchain()).thenReturn(mockBlockchain);
when(transitionProtocolSchedule.getPostMergeSchedule()).thenReturn(postMergeProtocolSchedule); when(transitionProtocolSchedule.getPostMergeSchedule()).thenReturn(postMergeProtocolSchedule);
when(transitionProtocolSchedule.getPreMergeSchedule()).thenReturn(preMergeProtocolSchedule); when(transitionProtocolSchedule.getPreMergeSchedule()).thenReturn(preMergeProtocolSchedule);
@ -152,6 +156,7 @@ public class TransitionControllerBuilderTest {
TransitionCoordinator buildTransitionCoordinator( TransitionCoordinator buildTransitionCoordinator(
final BesuControllerBuilder preMerge, final MergeBesuControllerBuilder postMerge) { final BesuControllerBuilder preMerge, final MergeBesuControllerBuilder postMerge) {
var builder = new TransitionBesuControllerBuilder(preMerge, postMerge); var builder = new TransitionBesuControllerBuilder(preMerge, postMerge);
builder.storageProvider(storageProvider);
var coordinator = var coordinator =
builder.createMiningCoordinator( builder.createMiningCoordinator(
transitionProtocolSchedule, transitionProtocolSchedule,

@ -18,9 +18,10 @@ import org.hyperledger.besu.ethereum.BlockValidator;
import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext; import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncLookupService;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -34,7 +35,6 @@ public class TransitionBackwardSyncContext extends BackwardSyncContext {
final MetricsSystem metricsSystem, final MetricsSystem metricsSystem,
final EthContext ethContext, final EthContext ethContext,
final SyncState syncState, final SyncState syncState,
final BackwardSyncLookupService backwardSyncLookupService,
final StorageProvider storageProvider) { final StorageProvider storageProvider) {
super( super(
protocolContext, protocolContext,
@ -42,8 +42,8 @@ public class TransitionBackwardSyncContext extends BackwardSyncContext {
metricsSystem, metricsSystem,
ethContext, ethContext,
syncState, syncState,
backwardSyncLookupService, BackwardChain.from(
storageProvider); storageProvider, ScheduleBasedBlockHeaderFunctions.create(transitionProtocolSchedule)));
this.transitionProtocolSchedule = transitionProtocolSchedule; this.transitionProtocolSchedule = transitionProtocolSchedule;
} }

@ -16,7 +16,6 @@ package org.hyperledger.besu.consensus.merge.blockcreation;
import static org.hyperledger.besu.consensus.merge.TransitionUtils.isTerminalProofOfWorkBlock; import static org.hyperledger.besu.consensus.merge.TransitionUtils.isTerminalProofOfWorkBlock;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import org.hyperledger.besu.consensus.merge.MergeContext; import org.hyperledger.besu.consensus.merge.MergeContext;
import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.datatypes.Address;
@ -215,7 +214,7 @@ public class MergeCoordinator implements MergeMiningCoordinator {
if (optHeader.isPresent()) { if (optHeader.isPresent()) {
debugLambda(LOG, "BlockHeader {} is already present", () -> optHeader.get().toLogString()); debugLambda(LOG, "BlockHeader {} is already present", () -> optHeader.get().toLogString());
} else { } else {
infoLambda(LOG, "appending block hash {} to backward sync", blockhash::toHexString); debugLambda(LOG, "appending block hash {} to backward sync", blockhash::toHexString);
backwardSyncContext.syncBackwardsUntil(blockhash); backwardSyncContext.syncBackwardsUntil(blockhash);
} }
return optHeader; return optHeader;

@ -32,7 +32,8 @@ public enum KeyValueSegmentIdentifier implements SegmentIdentifier {
GOQUORUM_PRIVATE_WORLD_STATE(new byte[] {11}), GOQUORUM_PRIVATE_WORLD_STATE(new byte[] {11}),
GOQUORUM_PRIVATE_STORAGE(new byte[] {12}), GOQUORUM_PRIVATE_STORAGE(new byte[] {12}),
BACKWARD_SYNC_HEADERS(new byte[] {13}), BACKWARD_SYNC_HEADERS(new byte[] {13}),
BACKWARD_SYNC_BLOCKS(new byte[] {14}); BACKWARD_SYNC_BLOCKS(new byte[] {14}),
BACKWARD_SYNC_CHAIN(new byte[] {15});
private final byte[] id; private final byte[] id;
private final int[] versionList; private final int[] versionList;

@ -16,7 +16,6 @@
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.warnLambda;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.datatypes.Hash;
@ -26,81 +25,75 @@ import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier; import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.Queue;
import org.slf4j.Logger; import org.slf4j.Logger;
public class BackwardChain { public class BackwardChain {
private static final Logger LOG = getLogger(BackwardChain.class); private static final Logger LOG = getLogger(BackwardChain.class);
private final List<Hash> ancestors = new ArrayList<>(); private final GenericKeyValueStorageFacade<Hash, BlockHeader> headers;
private final List<Hash> successors = new ArrayList<>(); private final GenericKeyValueStorageFacade<Hash, Block> blocks;
private final GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
protected final GenericKeyValueStorageFacade<Hash, BlockHeader> headers; private Optional<BlockHeader> firstStoredAncestor = Optional.empty();
protected final GenericKeyValueStorageFacade<Hash, Block> blocks; private Optional<BlockHeader> lastStoredPivot = Optional.empty();
private final Queue<Hash> hashesToAppend = new ArrayDeque<>();
public BackwardChain( public BackwardChain(
final StorageProvider provider, final GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage,
final BlockHeaderFunctions blockHeaderFunctions, final GenericKeyValueStorageFacade<Hash, Block> blocksStorage,
final Block pivot) { final GenericKeyValueStorageFacade<Hash, Hash> chainStorage) {
this( this.headers = headersStorage;
this.blocks = blocksStorage;
this.chainStorage = chainStorage;
}
public static BackwardChain from(
final StorageProvider storageProvider, final BlockHeaderFunctions blockHeaderFunctions) {
return new BackwardChain(
new GenericKeyValueStorageFacade<>( new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, Hash::toArrayUnsafe,
BlocksHeadersConvertor.of(blockHeaderFunctions), BlocksHeadersConvertor.of(blockHeaderFunctions),
provider.getStorageBySegmentIdentifier( storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_HEADERS)), KeyValueSegmentIdentifier.BACKWARD_SYNC_HEADERS)),
new GenericKeyValueStorageFacade<>( new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, Hash::toArrayUnsafe,
BlocksConvertor.of(blockHeaderFunctions), BlocksConvertor.of(blockHeaderFunctions),
provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.BACKWARD_SYNC_BLOCKS)), storageProvider.getStorageBySegmentIdentifier(
pivot); KeyValueSegmentIdentifier.BACKWARD_SYNC_BLOCKS)),
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
new HashConvertor(),
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_CHAIN)));
} }
public BackwardChain( public synchronized Optional<BlockHeader> getFirstAncestorHeader() {
final GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage, return firstStoredAncestor;
final GenericKeyValueStorageFacade<Hash, Block> blocksStorage,
final Block pivot) {
this.headers = headersStorage;
this.blocks = blocksStorage;
headersStorage.put(pivot.getHeader().getHash(), pivot.getHeader());
blocksStorage.put(pivot.getHash(), pivot);
ancestors.add(pivot.getHeader().getHash());
successors.add(pivot.getHash());
} }
public Optional<BlockHeader> getFirstAncestorHeader() { public synchronized List<BlockHeader> getFirstNAncestorHeaders(final int size) {
if (ancestors.isEmpty()) { List<BlockHeader> result = new ArrayList<>(size);
return Optional.empty(); Optional<BlockHeader> it = firstStoredAncestor;
while (it.isPresent() && result.size() < size) {
result.add(it.get());
it = chainStorage.get(it.get().getHash()).flatMap(headers::get);
} }
return headers.get(ancestors.get(ancestors.size() - 1)); return result;
} }
public List<BlockHeader> getFirstNAncestorHeaders(final int size) { public synchronized void prependAncestorsHeader(final BlockHeader blockHeader) {
List<Hash> resultList = new ArrayList<>(size); if (firstStoredAncestor.isEmpty()) {
for (int i = Math.min(size, ancestors.size()); i > 0; --i) { firstStoredAncestor = Optional.of(blockHeader);
resultList.add(ancestors.get(ancestors.size() - i)); lastStoredPivot = Optional.of(blockHeader);
headers.put(blockHeader.getHash(), blockHeader);
return;
} }
return resultList.stream() BlockHeader firstHeader = firstStoredAncestor.get();
.map(h -> this.headers.get(h).orElseThrow())
.collect(Collectors.toList());
}
public List<BlockHeader> getAllAncestors() {
return getFirstNAncestorHeaders(ancestors.size());
}
public void prependAncestorsHeader(final BlockHeader blockHeader) {
BlockHeader firstHeader =
getFirstAncestorHeader()
.orElseThrow(
() ->
new BackwardSyncException(
"Cannot save more headers during forward sync", true));
if (firstHeader.getNumber() != blockHeader.getNumber() + 1) { if (firstHeader.getNumber() != blockHeader.getNumber() + 1) {
throw new BackwardSyncException( throw new BackwardSyncException(
"Wrong height of header " "Wrong height of header "
@ -118,134 +111,81 @@ public class BackwardChain {
+ firstHeader.getParentHash().toHexString()); + firstHeader.getParentHash().toHexString());
} }
headers.put(blockHeader.getHash(), blockHeader); headers.put(blockHeader.getHash(), blockHeader);
ancestors.add(blockHeader.getHash()); chainStorage.put(blockHeader.getHash(), firstStoredAncestor.get().getHash());
firstStoredAncestor = Optional.of(blockHeader);
debugLambda( debugLambda(
LOG, LOG,
"Added header {} on height {} to backward chain led by pivot {} on height {}", "Added header {} on height {} to backward chain led by pivot {} on height {}",
() -> blockHeader.getHash().toHexString(), () -> blockHeader.getHash().toHexString(),
blockHeader::getNumber, blockHeader::getNumber,
() -> firstHeader.getHash().toHexString(), () -> lastStoredPivot.orElseThrow().getHash().toHexString(),
firstHeader::getNumber); firstHeader::getNumber);
} }
public void prependChain(final BackwardChain historicalBackwardChain) { public synchronized Optional<Block> getPivot() {
BlockHeader firstHeader = if (lastStoredPivot.isEmpty()) {
getFirstAncestorHeader() return Optional.empty();
.orElseThrow(
() -> new BackwardSyncException("Cannot merge when syncing forward...", true));
Optional<BlockHeader> historicalHeader =
historicalBackwardChain.getHeaderOnHeight(firstHeader.getNumber() - 1);
if (historicalHeader.isEmpty()) {
return;
}
if (firstHeader.getParentHash().equals(historicalHeader.orElseThrow().getHash())) {
for (Block successor : historicalBackwardChain.getSuccessors()) {
if (successor.getHeader().getNumber() > getPivot().getHeader().getNumber()) {
this.successors.add(successor.getHeader().getHash());
}
}
Collections.reverse(historicalBackwardChain.getSuccessors());
for (Block successor : historicalBackwardChain.getSuccessors()) {
if (successor.getHeader().getNumber()
< getFirstAncestorHeader().orElseThrow().getNumber()) {
this.ancestors.add(successor.getHeader().getHash());
}
}
for (BlockHeader ancestor : historicalBackwardChain.getAllAncestors()) {
if (ancestor.getNumber() < getFirstAncestorHeader().orElseThrow().getNumber()) {
this.ancestors.add(ancestor.getHash());
}
}
debugLambda(
LOG,
"Merged backward chain. New chain starts at height {} and ends at height {}",
() -> getPivot().getHeader().getNumber(),
() -> getFirstAncestorHeader().orElseThrow().getNumber());
} else {
warnLambda(
LOG,
"Cannot merge previous historical run because headers on height {} ({}) of {} and {} are not equal. Ignoring previous run. Did someone lie to us?",
() -> firstHeader.getNumber() - 1,
() -> historicalHeader.orElseThrow().getNumber(),
() -> firstHeader.getParentHash().toHexString(),
() -> historicalHeader.orElseThrow().getHash().toHexString());
} }
return blocks.get(lastStoredPivot.get().getHash());
} }
public Block getPivot() { public synchronized void dropFirstHeader() {
return blocks.get(successors.get(successors.size() - 1)).orElseThrow(); if (firstStoredAncestor.isEmpty()) {
} return;
}
public void dropFirstHeader() { headers.drop(firstStoredAncestor.get().getHash());
headers.drop(ancestors.get(ancestors.size() - 1)); final Optional<Hash> hash = chainStorage.get(firstStoredAncestor.get().getHash());
ancestors.remove(ancestors.size() - 1); chainStorage.drop(firstStoredAncestor.get().getHash());
firstStoredAncestor = hash.flatMap(headers::get);
if (firstStoredAncestor.isEmpty()) {
lastStoredPivot = Optional.empty();
}
} }
public void appendExpectedBlock(final Block newPivot) { public synchronized void appendTrustedBlock(final Block newPivot) {
successors.add(newPivot.getHash());
headers.put(newPivot.getHash(), newPivot.getHeader()); headers.put(newPivot.getHash(), newPivot.getHeader());
blocks.put(newPivot.getHash(), newPivot); blocks.put(newPivot.getHash(), newPivot);
if (lastStoredPivot.isEmpty()) {
firstStoredAncestor = Optional.of(newPivot.getHeader());
} else {
if (newPivot.getHeader().getParentHash().equals(lastStoredPivot.get().getHash())) {
chainStorage.put(lastStoredPivot.get().getHash(), newPivot.getHash());
} else {
firstStoredAncestor = Optional.of(newPivot.getHeader());
}
}
lastStoredPivot = Optional.of(newPivot.getHeader());
} }
public List<Block> getSuccessors() { public synchronized boolean isTrusted(final Hash hash) {
return successors.stream()
.map(hash -> blocks.get(hash).orElseThrow())
.collect(Collectors.toList());
}
public boolean isTrusted(final Hash hash) {
return blocks.get(hash).isPresent(); return blocks.get(hash).isPresent();
} }
public Block getTrustedBlock(final Hash hash) { public synchronized Block getTrustedBlock(final Hash hash) {
return blocks.get(hash).orElseThrow(); return blocks.get(hash).orElseThrow();
} }
public void clear() { public synchronized void clear() {
ancestors.clear();
successors.clear();
blocks.clear(); blocks.clear();
headers.clear(); headers.clear();
chainStorage.clear();
firstStoredAncestor = Optional.empty();
lastStoredPivot = Optional.empty();
hashesToAppend.clear();
} }
public void commit() {} public synchronized Optional<BlockHeader> getHeader(final Hash hash) {
return headers.get(hash);
}
public Optional<BlockHeader> getHeaderOnHeight(final long height) { public synchronized void addNewHash(final Hash newBlockHash) {
if (ancestors.isEmpty()) { if (hashesToAppend.contains(newBlockHash)) {
return Optional.empty(); return;
}
final long firstAncestor = headers.get(ancestors.get(0)).orElseThrow().getNumber();
if (firstAncestor >= height) {
if (firstAncestor - height < ancestors.size()) {
final Optional<BlockHeader> blockHeader =
headers.get(ancestors.get((int) (firstAncestor - height)));
blockHeader.ifPresent(
blockHeader1 ->
LOG.debug(
"First: {} Height: {}, result: {}",
firstAncestor,
height,
blockHeader.orElseThrow().getNumber()));
return blockHeader;
} else {
return Optional.empty();
}
} else {
if (successors.isEmpty()) {
return Optional.empty();
}
final long firstSuccessor = headers.get(successors.get(0)).orElseThrow().getNumber();
if (height - firstSuccessor < successors.size()) {
LOG.debug(
"First: {} Height: {}, result: {}",
firstSuccessor,
height,
headers.get(successors.get((int) (height - firstSuccessor))).orElseThrow().getNumber());
return headers.get(successors.get((int) (height - firstSuccessor)));
} else {
return Optional.empty();
}
} }
this.hashesToAppend.add(newBlockHash);
}
public synchronized Optional<Hash> getFirstHash() {
return Optional.ofNullable(hashesToAppend.poll());
} }
} }

@ -15,29 +15,27 @@
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.BlockValidator; import org.hyperledger.besu.ethereum.BlockValidator;
import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -52,12 +50,10 @@ public class BackwardSyncContext {
private final MetricsSystem metricsSystem; private final MetricsSystem metricsSystem;
private final SyncState syncState; private final SyncState syncState;
private final Map<Long, BackwardChain> backwardChainMap = new ConcurrentHashMap<>();
private final AtomicReference<BackwardChain> currentChain = new AtomicReference<>();
private final AtomicReference<CompletableFuture<Void>> currentBackwardSyncFuture = private final AtomicReference<CompletableFuture<Void>> currentBackwardSyncFuture =
new AtomicReference<>(); new AtomicReference<>();
private final BackwardSyncLookupService service; private final BackwardChain backwardChain;
private final StorageProvider storageProvider; private int batchSize = BATCH_SIZE;
public BackwardSyncContext( public BackwardSyncContext(
final ProtocolContext protocolContext, final ProtocolContext protocolContext,
@ -65,16 +61,14 @@ public class BackwardSyncContext {
final MetricsSystem metricsSystem, final MetricsSystem metricsSystem,
final EthContext ethContext, final EthContext ethContext,
final SyncState syncState, final SyncState syncState,
final BackwardSyncLookupService backwardSyncLookupService, final BackwardChain backwardChain) {
final StorageProvider storageProvider) {
this.protocolContext = protocolContext; this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule; this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext; this.ethContext = ethContext;
this.metricsSystem = metricsSystem; this.metricsSystem = metricsSystem;
this.syncState = syncState; this.syncState = syncState;
this.service = backwardSyncLookupService; this.backwardChain = backwardChain;
this.storageProvider = storageProvider;
} }
public boolean isSyncing() { public boolean isSyncing() {
@ -83,114 +77,49 @@ public class BackwardSyncContext {
.orElse(Boolean.FALSE); .orElse(Boolean.FALSE);
} }
public CompletableFuture<Void> syncBackwardsUntil(final Hash newBlockhash) { public CompletableFuture<Void> syncBackwardsUntil(final Hash newBlockHash) {
final Optional<BackwardChain> chain = getCurrentChain(); final CompletableFuture<Void> future = this.currentBackwardSyncFuture.get();
CompletableFuture<List<Block>> completableFuture; if (backwardChain.isTrusted(newBlockHash)) {
if (chain.isPresent() && chain.get().isTrusted(newBlockhash)) {
debugLambda( debugLambda(
LOG, LOG,
"not fetching and appending hash {} to backwards sync since it is present in successors", "not fetching or appending hash {} to backwards sync since it is present in successors",
newBlockhash::toHexString); newBlockHash::toHexString);
completableFuture = CompletableFuture.completedFuture(Collections.emptyList()); return future;
} else {
completableFuture = service.lookup(newBlockhash);
} }
backwardChain.addNewHash(newBlockHash);
// kick off async process to fetch this block by hash then delegate to syncBackwardsUntil if (future != null) {
final CompletableFuture<Void> future = return future;
completableFuture.thenCompose(
blocks -> {
if (blocks.isEmpty()) {
return CompletableFuture.completedFuture(null);
} else return this.syncBackwardsUntil(blocks);
});
this.currentBackwardSyncFuture.set(future);
return future;
}
private CompletionStage<Void> syncBackwardsUntil(final List<Block> blocks) {
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
for (Block block : blocks) {
future = future.thenCompose(unused -> syncBackwardsUntil(block));
} }
return future; infoLambda(LOG, "Starting new backward sync towards a pivot {}", newBlockHash::toHexString);
this.currentBackwardSyncFuture.set(prepareBackwardSyncFutureWithRetry());
return this.currentBackwardSyncFuture.get();
} }
public CompletableFuture<Void> syncBackwardsUntil(final Block newPivot) { public CompletableFuture<Void> syncBackwardsUntil(final Block newPivot) {
final BackwardChain backwardChain = currentChain.get(); final CompletableFuture<Void> future = this.currentBackwardSyncFuture.get();
if (backwardChain == null) { if (backwardChain.isTrusted(newPivot.getHash())) {
debugLambda( debugLambda(
LOG, LOG,
"Starting new backward sync towards a pivot {} at height {}", "not fetching or appending hash {} to backwards sync since it is present in successors",
() -> newPivot.getHash().toString().substring(0, 20), () -> newPivot.getHash().toHexString());
() -> newPivot.getHeader().getNumber()); return future;
final BackwardChain newChain =
new BackwardChain(
storageProvider,
ScheduleBasedBlockHeaderFunctions.create(protocolSchedule),
newPivot);
this.currentChain.set(newChain);
backwardChainMap.put(newPivot.getHeader().getNumber(), newChain);
currentBackwardSyncFuture.set(prepareBackwardSyncFutureWithRetry(newChain));
return currentBackwardSyncFuture.get();
} }
if (newPivot.getHeader().getParentHash().equals(currentChain.get().getPivot().getHash())) { backwardChain.appendTrustedBlock(newPivot);
LOG.debug( if (future != null) {
"Backward sync is ongoing. Appending expected next block to the end of backward sync chain"); return future;
backwardChain.appendExpectedBlock(newPivot);
backwardChainMap.put(newPivot.getHeader().getNumber(), backwardChain);
return currentBackwardSyncFuture.get();
} }
debugLambda( infoLambda(
LOG, LOG,
"Stopping existing backward sync from pivot {} at height {} and restarting with pivot {} at height {}", "Starting new backward sync towards a pivot {}({})",
() -> backwardChain.getPivot().getHash().toString().substring(0, 20), () -> newPivot.getHeader().getNumber(),
() -> backwardChain.getPivot().getHeader().getNumber(), () -> newPivot.getHash().toHexString());
() -> newPivot.getHash().toString().substring(0, 20), this.currentBackwardSyncFuture.set(prepareBackwardSyncFutureWithRetry());
() -> newPivot.getHeader().getNumber()); return this.currentBackwardSyncFuture.get();
BackwardChain newBackwardChain =
new BackwardChain(
storageProvider, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule), newPivot);
backwardChainMap.put(newPivot.getHeader().getNumber(), newBackwardChain);
this.currentChain.set(
newBackwardChain); // the current ongoing backward sync will finish its current step and end
currentBackwardSyncFuture.set(
currentBackwardSyncFuture
.get()
.handle(
(unused, error) -> {
if (error != null) {
if ((error.getCause() != null)
&& (error.getCause() instanceof BackwardSyncException)) {
LOG.info(
"Previous Backward sync ended exceptionally with message {}",
error.getMessage());
} else {
LOG.info(
"Previous Backward sync ended exceptionally with message {}",
error.getMessage());
if (error instanceof RuntimeException) {
throw (RuntimeException) error;
} else {
throw new BackwardSyncException(error);
}
}
} else {
LOG.info("The previous backward sync finished without and exception");
}
return newBackwardChain;
})
.thenCompose(this::prepareBackwardSyncFutureWithRetry));
return currentBackwardSyncFuture.get();
} }
private CompletableFuture<Void> prepareBackwardSyncFutureWithRetry( private CompletableFuture<Void> prepareBackwardSyncFutureWithRetry() {
final BackwardChain backwardChain) {
CompletableFuture<Void> f = prepareBackwardSyncFuture(backwardChain); CompletableFuture<Void> f = prepareBackwardSyncFuture();
for (int i = 0; i < MAX_RETRIES; i++) { for (int i = 0; i < MAX_RETRIES; i++) {
f = f =
f.thenApply(CompletableFuture::completedFuture) f.thenApply(CompletableFuture::completedFuture)
@ -207,13 +136,13 @@ public class BackwardSyncContext {
return ethContext return ethContext
.getScheduler() .getScheduler()
.scheduleFutureTask( .scheduleFutureTask(
() -> prepareBackwardSyncFuture(backwardChain), Duration.ofSeconds(5)); () -> prepareBackwardSyncFuture(), Duration.ofSeconds(5));
}) })
.thenCompose(Function.identity()); .thenCompose(Function.identity());
} }
return f.handle( return f.handle(
(unused, throwable) -> { (unused, throwable) -> {
this.cleanup(backwardChain); this.currentBackwardSyncFuture.set(null);
if (throwable != null) { if (throwable != null) {
throw new BackwardSyncException(throwable); throw new BackwardSyncException(throwable);
} }
@ -221,20 +150,8 @@ public class BackwardSyncContext {
}); });
} }
private CompletableFuture<Void> prepareBackwardSyncFuture(final BackwardChain backwardChain) { private CompletableFuture<Void> prepareBackwardSyncFuture() {
return new BackwardSyncPhase(this, backwardChain) return executeNextStep(null);
.executeAsync(null)
.thenCompose(new ForwardSyncPhase(this, backwardChain)::executeAsync);
}
private void cleanup(final BackwardChain chain) {
if (currentChain.compareAndSet(chain, null)) {
this.currentBackwardSyncFuture.set(null);
}
}
public Optional<BackwardChain> getCurrentChain() {
return Optional.ofNullable(currentChain.get());
} }
public ProtocolSchedule getProtocolSchedule() { public ProtocolSchedule getProtocolSchedule() {
@ -261,15 +178,96 @@ public class BackwardSyncContext {
return getBlockValidator(block.getHeader().getNumber()); return getBlockValidator(block.getHeader().getNumber());
} }
public Optional<BackwardChain> findCorrectChainFromPivot(final long number) { public boolean isOnTTD() {
return Optional.ofNullable(backwardChainMap.get(number)); return syncState.hasReachedTerminalDifficulty().orElse(false);
} }
public void putCurrentChainToHeight(final long height, final BackwardChain backwardChain) { public CompletableFuture<Void> stop() {
backwardChainMap.put(height, backwardChain); return currentBackwardSyncFuture.get();
} }
public boolean isOnTTD() { public CompletableFuture<Void> executeNextStep(final Void unused) {
return syncState.hasReachedTerminalDifficulty().orElse(false); final Optional<Hash> firstHash = backwardChain.getFirstHash();
if (firstHash.isPresent()) {
return executeSyncStep(firstHash.get());
}
if (!isOnTTD()) {
return waitForTTD().thenCompose(this::executeNextStep);
}
final Optional<BlockHeader> firstAncestorHeader = backwardChain.getFirstAncestorHeader();
if (firstAncestorHeader.isEmpty()) {
LOG.info("The Backward sync is done...");
return CompletableFuture.completedFuture(null);
}
if (getProtocolContext().getBlockchain().getChainHead().getHeight()
> firstAncestorHeader.get().getNumber() - 1) {
LOG.info(
"Backward reached bellow previous head {}({}) : {} ({})",
getProtocolContext().getBlockchain().getChainHead().getHeight(),
getProtocolContext().getBlockchain().getChainHead().getHash().toHexString(),
firstAncestorHeader.get().getNumber(),
firstAncestorHeader.get().getHash());
}
if (getProtocolContext().getBlockchain().contains(firstAncestorHeader.get().getParentHash())) {
return executeForwardAsync(firstAncestorHeader.get());
}
return executeBackwardAsync(firstAncestorHeader.get());
}
private CompletableFuture<Void> executeSyncStep(final Hash hash) {
return new SyncStepStep(this, backwardChain).executeAsync(hash);
}
@VisibleForTesting
protected CompletableFuture<Void> executeBackwardAsync(final BlockHeader firstHeader) {
return new BackwardSyncStep(this, backwardChain).executeAsync(firstHeader);
}
@VisibleForTesting
protected CompletableFuture<Void> executeForwardAsync(final BlockHeader firstHeader) {
return new ForwardSyncStep(this, backwardChain).executeAsync();
}
@VisibleForTesting
protected CompletableFuture<Void> waitForTTD() {
final CountDownLatch latch = new CountDownLatch(1);
final long id =
syncState.subscribeTTDReached(
reached -> {
if (reached) {
latch.countDown();
}
});
return CompletableFuture.runAsync(
() -> {
try {
if (!isOnTTD()) {
LOG.info("Waiting for TTD...");
final boolean await = latch.await(2, TimeUnit.MINUTES);
if (await) {
LOG.info("TTD reached...");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BackwardSyncException("Wait for TTD was interrupted");
} finally {
syncState.unsubscribeTTDReached(id);
}
});
}
// In rare case when we request too many headers/blocks we get response that does not contain all
// data and we might want to retry with smaller batch size
public int getBatchSize() {
return batchSize;
}
public void halveBatchSize() {
this.batchSize = batchSize / 2 + 1;
}
public void resetBatchSize() {
this.batchSize = BATCH_SIZE;
} }
} }

@ -1,172 +0,0 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.slf4j.LoggerFactory.getLogger;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlockFromPeersTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
@ThreadSafe
public class BackwardSyncLookupService {
private static final Logger LOG = getLogger(BackwardSyncLookupService.class);
private static final int MAX_RETRIES = 100;
public static final int UNUSED = -1;
@GuardedBy("this")
private final Queue<Hash> hashes = new ArrayDeque<>();
@GuardedBy("this")
boolean running = false;
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private List<Block> results = new ArrayList<>();
private final ProtocolContext protocolContext;
public BackwardSyncLookupService(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final MetricsSystem metricsSystem,
final ProtocolContext protocolContext) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
this.protocolContext = protocolContext;
}
public CompletableFuture<List<Block>> lookup(final Hash newBlockhash) {
synchronized (this) {
hashes.add(newBlockhash);
if (running) {
LOG.info(
"some other future is already running and will process our hash {} when time comes...",
newBlockhash.toHexString());
return CompletableFuture.completedFuture(Collections.emptyList());
}
running = true;
}
return findBlocksWithRetries()
.handle(
(blocks, throwable) -> {
synchronized (this) {
running = false;
}
if (throwable != null) {
throw new BackwardSyncException(throwable);
}
return blocks;
});
}
private CompletableFuture<List<Block>> findBlocksWithRetries() {
CompletableFuture<List<Block>> f = tryToFindBlocks();
for (int i = 0; i < MAX_RETRIES; i++) {
f =
f.thenApply(CompletableFuture::completedFuture)
.exceptionally(
ex -> {
synchronized (this) {
if (!results.isEmpty()) {
List<Block> copy = new ArrayList<>(results);
results = new ArrayList<>();
return CompletableFuture.completedFuture(copy);
}
}
LOG.error(
"Failed to fetch blocks because {} Current peers: {}. Waiting for few seconds ...",
ex.getMessage(),
ethContext.getEthPeers().peerCount());
return ethContext
.getScheduler()
.scheduleFutureTask(this::tryToFindBlocks, Duration.ofSeconds(5));
})
.thenCompose(Function.identity());
}
return f.thenApply(this::rememberResults).thenCompose(this::possibleNextHash);
}
private CompletableFuture<List<Block>> tryToFindBlocks() {
return CompletableFuture.supplyAsync(this::getNextHash)
.thenCompose(this::tryToFindBlock)
.thenApply(this::rememberResult)
.thenCompose(this::possibleNextHash);
}
private CompletableFuture<List<Block>> possibleNextHash(final List<Block> blocks) {
synchronized (this) {
hashes.poll();
if (hashes.isEmpty()) {
results = new ArrayList<>();
running = false;
return CompletableFuture.completedFuture(blocks);
}
}
return tryToFindBlocks();
}
private List<Block> rememberResult(final Block block) {
this.results.add(block);
return results;
}
private List<Block> rememberResults(final List<Block> blocks) {
this.results.addAll(blocks);
return results;
}
private synchronized Hash getNextHash() {
return hashes.peek();
}
private CompletableFuture<Block> tryToFindBlock(final Hash targetHash) {
final RetryingGetBlockFromPeersTask getBlockTask =
RetryingGetBlockFromPeersTask.create(
protocolContext,
protocolSchedule,
ethContext,
metricsSystem,
ethContext.getEthPeers().getMaxPeers(),
Optional.of(targetHash),
UNUSED);
return ethContext
.getScheduler()
.scheduleSyncWorkerTask(getBlockTask::run)
.thenApply(AbstractPeerTask.PeerTaskResult::getResult);
}
}

@ -1,193 +0,0 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetHeadersEndingAtFromPeerByHashTask;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BackwardSyncPhase extends BackwardSyncTask {
private static final Logger LOG = LoggerFactory.getLogger(BackwardSyncPhase.class);
public BackwardSyncPhase(final BackwardSyncContext context, final BackwardChain backwardChain) {
super(context, backwardChain);
}
@VisibleForTesting
protected CompletableFuture<Void> waitForTTD() {
if (context.isOnTTD()) {
return CompletableFuture.completedFuture(null);
}
LOG.debug("Did not reach TTD yet, falling asleep...");
return context
.getEthContext()
.getScheduler()
.scheduleFutureTask(this::waitForTTD, Duration.ofSeconds(5));
}
@Override
public CompletableFuture<Void> executeStep() {
return CompletableFuture.supplyAsync(this::waitForTTD)
.thenCompose(Function.identity())
.thenApply(this::earliestUnprocessedHash)
.thenCompose(this::requestHeaders)
.thenApply(this::saveHeaders)
.thenApply(this::possibleMerge)
.thenCompose(this::possiblyMoreBackwardSteps);
}
@VisibleForTesting
protected Hash earliestUnprocessedHash(final Void unused) {
BlockHeader firstHeader =
backwardChain
.getFirstAncestorHeader()
.orElseThrow(
() ->
new BackwardSyncException(
"No unprocessed hashes during backward sync. that is probably a bug.",
true));
Hash parentHash = firstHeader.getParentHash();
debugLambda(
LOG,
"First unprocessed hash for current pivot is {} expected on height {}",
parentHash::toHexString,
() -> firstHeader.getNumber() - 1);
return parentHash;
}
@VisibleForTesting
protected CompletableFuture<BlockHeader> requestHeader(final Hash hash) {
debugLambda(LOG, "Requesting header for hash {}", hash::toHexString);
return GetHeadersFromPeerByHashTask.forSingleHash(
context.getProtocolSchedule(),
context.getEthContext(),
hash,
context.getProtocolContext().getBlockchain().getChainHead().getHeight(),
context.getMetricsSystem())
.run()
.thenApply(
peerResult -> {
final List<BlockHeader> result = peerResult.getResult();
if (result.isEmpty()) {
throw new BackwardSyncException(
"Did not receive a header for hash {}" + hash.toHexString(), true);
}
BlockHeader blockHeader = result.get(0);
debugLambda(
LOG,
"Got header {} with height {}",
() -> blockHeader.getHash().toHexString(),
blockHeader::getNumber);
return blockHeader;
});
}
@VisibleForTesting
protected CompletableFuture<List<BlockHeader>> requestHeaders(final Hash hash) {
debugLambda(LOG, "Requesting header for hash {}", hash::toHexString);
final RetryingGetHeadersEndingAtFromPeerByHashTask
retryingGetHeadersEndingAtFromPeerByHashTask =
RetryingGetHeadersEndingAtFromPeerByHashTask.endingAtHash(
context.getProtocolSchedule(),
context.getEthContext(),
hash,
context.getProtocolContext().getBlockchain().getChainHead().getHeight(),
BackwardSyncContext.BATCH_SIZE,
context.getMetricsSystem());
return context
.getEthContext()
.getScheduler()
.scheduleSyncWorkerTask(retryingGetHeadersEndingAtFromPeerByHashTask::run)
.thenApply(
blockHeaders -> {
if (blockHeaders.isEmpty()) {
throw new BackwardSyncException(
"Did not receive a header for hash {}" + hash.toHexString(), true);
}
debugLambda(
LOG,
"Got headers {} -> {}",
blockHeaders.get(0)::getNumber,
blockHeaders.get(blockHeaders.size() - 1)::getNumber);
return blockHeaders;
});
}
@VisibleForTesting
protected Void saveHeader(final BlockHeader blockHeader) {
backwardChain.prependAncestorsHeader(blockHeader);
context.putCurrentChainToHeight(blockHeader.getNumber(), backwardChain);
return null;
}
@VisibleForTesting
protected Void saveHeaders(final List<BlockHeader> blockHeaders) {
for (BlockHeader blockHeader : blockHeaders) {
saveHeader(blockHeader);
}
infoLambda(
LOG,
"Saved headers {} -> {}",
() -> blockHeaders.get(0).getNumber(),
() -> blockHeaders.get(blockHeaders.size() - 1).getNumber());
return null;
}
@VisibleForTesting
protected BlockHeader possibleMerge(final Void unused) {
Optional<BackwardChain> maybeHistoricalBackwardChain =
context.findCorrectChainFromPivot(
backwardChain.getFirstAncestorHeader().orElseThrow().getNumber() - 1);
maybeHistoricalBackwardChain.ifPresent(backwardChain::prependChain);
return backwardChain.getFirstAncestorHeader().orElseThrow();
}
// if the previous header is not present yet, we need to go deeper
@VisibleForTesting
protected CompletableFuture<Void> possiblyMoreBackwardSteps(final BlockHeader blockHeader) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
if (context.getProtocolContext().getBlockchain().contains(blockHeader.getHash())) {
LOG.info("Backward Phase finished.");
completableFuture.complete(null);
return completableFuture;
}
if (context.getProtocolContext().getBlockchain().getChainHead().getHeight()
> blockHeader.getNumber() - 1) {
LOG.warn(
"Backward sync is following unknown branch {} ({}) and reached bellow previous head {}({})",
blockHeader.getNumber(),
blockHeader.getHash(),
context.getProtocolContext().getBlockchain().getChainHead().getHeight(),
context.getProtocolContext().getBlockchain().getChainHead().getHash().toHexString());
}
LOG.debug("Backward sync did not reach a know block, need to go deeper");
completableFuture.complete(null);
return completableFuture.thenCompose(this::executeAsync);
}
}

@ -0,0 +1,112 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetHeadersEndingAtFromPeerByHashTask;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BackwardSyncStep {
private static final Logger LOG = LoggerFactory.getLogger(BackwardSyncStep.class);
private final BackwardSyncContext context;
private final BackwardChain backwardChain;
public BackwardSyncStep(final BackwardSyncContext context, final BackwardChain backwardChain) {
this.context = context;
this.backwardChain = backwardChain;
}
public CompletableFuture<Void> executeAsync(final BlockHeader firstHeader) {
return CompletableFuture.supplyAsync(() -> firstHeader)
.thenApply(this::possibleRestoreOldNodes)
.thenCompose(this::requestHeaders)
.thenApply(this::saveHeaders)
.thenCompose(context::executeNextStep);
}
@VisibleForTesting
protected Hash possibleRestoreOldNodes(final BlockHeader firstAncestor) {
Hash lastHash = firstAncestor.getParentHash();
Optional<BlockHeader> iterator = backwardChain.getHeader(lastHash);
while (iterator.isPresent()) {
backwardChain.prependAncestorsHeader(iterator.get());
lastHash = iterator.get().getParentHash();
iterator = backwardChain.getHeader(lastHash);
}
return lastHash;
}
@VisibleForTesting
protected CompletableFuture<List<BlockHeader>> requestHeaders(final Hash hash) {
debugLambda(LOG, "Requesting header for hash {}", hash::toHexString);
final RetryingGetHeadersEndingAtFromPeerByHashTask
retryingGetHeadersEndingAtFromPeerByHashTask =
RetryingGetHeadersEndingAtFromPeerByHashTask.endingAtHash(
context.getProtocolSchedule(),
context.getEthContext(),
hash,
context.getProtocolContext().getBlockchain().getChainHead().getHeight(),
context.getBatchSize(),
context.getMetricsSystem());
return context
.getEthContext()
.getScheduler()
.scheduleSyncWorkerTask(retryingGetHeadersEndingAtFromPeerByHashTask::run)
.thenApply(
blockHeaders -> {
if (blockHeaders.isEmpty()) {
throw new BackwardSyncException(
"Did not receive a header for hash {}" + hash.toHexString(), true);
}
debugLambda(
LOG,
"Got headers {} -> {}",
blockHeaders.get(0)::getNumber,
blockHeaders.get(blockHeaders.size() - 1)::getNumber);
return blockHeaders;
});
}
@VisibleForTesting
protected Void saveHeader(final BlockHeader blockHeader) {
backwardChain.prependAncestorsHeader(blockHeader);
return null;
}
@VisibleForTesting
protected Void saveHeaders(final List<BlockHeader> blockHeaders) {
for (BlockHeader blockHeader : blockHeaders) {
saveHeader(blockHeader);
}
infoLambda(
LOG,
"Saved headers {} -> {} (head: {})",
() -> blockHeaders.get(0).getNumber(),
() -> blockHeaders.get(blockHeaders.size() - 1).getNumber(),
() -> context.getProtocolContext().getBlockchain().getChainHead().getHeight());
return null;
}
}

@ -1,58 +0,0 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
public abstract class BackwardSyncTask {
protected BackwardSyncContext context;
protected BackwardChain backwardChain;
private static final Logger LOG = getLogger(BackwardSyncTask.class);
protected BackwardSyncTask(final BackwardSyncContext context, final BackwardChain backwardChain) {
this.context = context;
this.backwardChain = backwardChain;
}
CompletableFuture<Void> executeAsync(final Void unused) {
Optional<BackwardChain> currentChain = context.getCurrentChain();
if (currentChain.isPresent()) {
if (!backwardChain.equals(currentChain.get())) {
LOG.debug(
"The pivot changed, we should stop current flow, some new flow is waiting to take over...");
return CompletableFuture.completedFuture(null);
}
if (backwardChain.getFirstAncestorHeader().isEmpty()) {
LOG.info("The Backwards sync is already finished...");
return CompletableFuture.completedFuture(null);
}
return executeStep();
} else {
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(
new BackwardSyncException(
"No pivot... that is weird and should not have happened. This method should have been called after the pivot was set..."));
return result;
}
}
abstract CompletableFuture<Void> executeStep();
}

@ -16,7 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda; import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.warnLambda; import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
@ -30,31 +30,31 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ForwardSyncPhase extends BackwardSyncTask { public class ForwardSyncStep {
private static final Logger LOG = LoggerFactory.getLogger(ForwardSyncPhase.class); private static final Logger LOG = LoggerFactory.getLogger(ForwardSyncStep.class);
private int batchSize = BackwardSyncContext.BATCH_SIZE; private final BackwardSyncContext context;
private final BackwardChain backwardChain;
public ForwardSyncPhase(final BackwardSyncContext context, final BackwardChain backwardChain) { public ForwardSyncStep(final BackwardSyncContext context, final BackwardChain backwardChain) {
super(context, backwardChain); this.context = context;
this.backwardChain = backwardChain;
} }
@Override public CompletableFuture<Void> executeAsync() {
public CompletableFuture<Void> executeStep() {
return CompletableFuture.supplyAsync(() -> returnFirstNUnknownHeaders(null)) return CompletableFuture.supplyAsync(() -> returnFirstNUnknownHeaders(null))
.thenCompose(this::possibleRequestBodies) .thenCompose(this::possibleRequestBodies)
.thenApply(this::processKnownAncestors) .thenApply(this::processKnownAncestors)
.thenCompose(this::possiblyMoreForwardSteps); .thenCompose(context::executeNextStep);
} }
@VisibleForTesting @VisibleForTesting
protected BlockHeader processKnownAncestors(final Void unused) { protected Void processKnownAncestors(final Void unused) {
while (backwardChain.getFirstAncestorHeader().isPresent()) { while (backwardChain.getFirstAncestorHeader().isPresent()) {
BlockHeader header = backwardChain.getFirstAncestorHeader().orElseThrow(); BlockHeader header = backwardChain.getFirstAncestorHeader().orElseThrow();
if (context.getProtocolContext().getBlockchain().contains(header.getHash())) { if (context.getProtocolContext().getBlockchain().contains(header.getHash())) {
@ -63,7 +63,8 @@ public class ForwardSyncPhase extends BackwardSyncTask {
"Block {} is already imported, we can ignore it for the sync process", "Block {} is already imported, we can ignore it for the sync process",
() -> header.getHash().toHexString()); () -> header.getHash().toHexString());
backwardChain.dropFirstHeader(); backwardChain.dropFirstHeader();
} else if (backwardChain.isTrusted(header.getHash())) { } else if (context.getProtocolContext().getBlockchain().contains(header.getParentHash())
&& backwardChain.isTrusted(header.getHash())) {
debugLambda( debugLambda(
LOG, LOG,
"Importing trusted block {}({})", "Importing trusted block {}({})",
@ -72,7 +73,7 @@ public class ForwardSyncPhase extends BackwardSyncTask {
saveBlock(backwardChain.getTrustedBlock(header.getHash())); saveBlock(backwardChain.getTrustedBlock(header.getHash()));
} else { } else {
debugLambda(LOG, "First unprocessed header is {}", header::getNumber); debugLambda(LOG, "First unprocessed header is {}", header::getNumber);
return header; return null;
} }
} }
return null; return null;
@ -86,7 +87,7 @@ public class ForwardSyncPhase extends BackwardSyncTask {
debugLambda( debugLambda(
LOG, LOG,
"Block {}({}) is already imported, we can ignore it for the sync process", "Block {}({}) is already imported, we can ignore it for the sync process",
() -> header.getNumber(), header::getNumber,
() -> header.getHash().toHexString()); () -> header.getHash().toHexString());
backwardChain.dropFirstHeader(); backwardChain.dropFirstHeader();
} else if (backwardChain.isTrusted(header.getHash())) { } else if (backwardChain.isTrusted(header.getHash())) {
@ -96,7 +97,7 @@ public class ForwardSyncPhase extends BackwardSyncTask {
() -> header.getHash().toHexString()); () -> header.getHash().toHexString());
saveBlock(backwardChain.getTrustedBlock(header.getHash())); saveBlock(backwardChain.getTrustedBlock(header.getHash()));
} else { } else {
return backwardChain.getFirstNAncestorHeaders(batchSize); return backwardChain.getFirstNAncestorHeaders(context.getBatchSize());
} }
} }
return Collections.emptyList(); return Collections.emptyList();
@ -167,7 +168,7 @@ public class ForwardSyncPhase extends BackwardSyncTask {
@VisibleForTesting @VisibleForTesting
protected Void saveBlock(final Block block) { protected Void saveBlock(final Block block) {
debugLambda(LOG, "Going to validate block {}", () -> block.getHeader().getHash().toHexString()); traceLambda(LOG, "Going to validate block {}", () -> block.getHeader().getHash().toHexString());
var optResult = var optResult =
context context
.getBlockValidatorForBlock(block) .getBlockValidatorForBlock(block)
@ -179,7 +180,7 @@ public class ForwardSyncPhase extends BackwardSyncTask {
optResult.blockProcessingOutputs.ifPresent( optResult.blockProcessingOutputs.ifPresent(
result -> { result -> {
debugLambda( traceLambda(
LOG, LOG,
"Block {} was validated, going to import it", "Block {} was validated, going to import it",
() -> block.getHeader().getHash().toHexString()); () -> block.getHeader().getHash().toHexString());
@ -191,6 +192,11 @@ public class ForwardSyncPhase extends BackwardSyncTask {
@VisibleForTesting @VisibleForTesting
protected Void saveBlocks(final List<Block> blocks) { protected Void saveBlocks(final List<Block> blocks) {
if (blocks.isEmpty()) {
LOG.info("No blocks to save...");
context.halveBatchSize();
return null;
}
for (Block block : blocks) { for (Block block : blocks) {
final Optional<Block> parent = final Optional<Block> parent =
@ -199,59 +205,20 @@ public class ForwardSyncPhase extends BackwardSyncTask {
.getBlockchain() .getBlockchain()
.getBlockByHash(block.getHeader().getParentHash()); .getBlockByHash(block.getHeader().getParentHash());
if (parent.isEmpty()) { if (parent.isEmpty()) {
batchSize = batchSize / 2 + 1; context.halveBatchSize();
return null; return null;
} else { } else {
batchSize = BackwardSyncContext.BATCH_SIZE;
saveBlock(block); saveBlock(block);
} }
} }
backwardChain.commit();
infoLambda( infoLambda(
LOG, LOG,
"Saved blocks {}->{}", "Saved blocks {} -> {} (target: {})",
() -> blocks.get(0).getHeader().getNumber(), () -> blocks.get(0).getHeader().getNumber(),
() -> blocks.get(blocks.size() - 1).getHeader().getNumber()); () -> blocks.get(blocks.size() - 1).getHeader().getNumber(),
() ->
backwardChain.getPivot().orElse(blocks.get(blocks.size() - 1)).getHeader().getNumber());
context.resetBatchSize();
return null; return null;
} }
@VisibleForTesting
protected CompletableFuture<Void> possiblyMoreForwardSteps(final BlockHeader firstNotSynced) {
CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
if (firstNotSynced == null) {
final List<Block> successors = backwardChain.getSuccessors();
LOG.info(
"Forward Sync Phase is finished. Importing {} block(s) provided by consensus layer...",
successors.size());
successors.forEach(
block -> {
if (!context.getProtocolContext().getBlockchain().contains(block.getHash())) {
saveBlock(block);
}
});
LOG.info("The Backward sync is done...");
backwardChain.clear();
return CompletableFuture.completedFuture(null);
}
if (context.getProtocolContext().getBlockchain().contains(firstNotSynced.getParentHash())) {
debugLambda(
LOG,
"Block {} is not yet imported, we need to run another step of ForwardSync",
firstNotSynced::toLogString);
return completableFuture.thenCompose(this::executeAsync);
}
warnLambda(
LOG,
"Block {} is not yet imported but its parent {} is not imported either... "
+ "This should not normally happen and indicates a wrong behaviour somewhere...",
firstNotSynced::toLogString,
() -> firstNotSynced.getParentHash().toHexString());
return completableFuture.thenCompose(this::executeBackwardAsync);
}
@VisibleForTesting
protected CompletionStage<Void> executeBackwardAsync(final Void unused) {
return new BackwardSyncPhase(context, backwardChain).executeAsync(unused);
}
} }

@ -20,6 +20,7 @@ import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
public class GenericKeyValueStorageFacade<K, V> implements Closeable { public class GenericKeyValueStorageFacade<K, V> implements Closeable {
@ -40,12 +41,22 @@ public class GenericKeyValueStorageFacade<K, V> implements Closeable {
return storage.get(keyConvertor.toBytes(key)).map(valueConvertor::fromBytes); return storage.get(keyConvertor.toBytes(key)).map(valueConvertor::fromBytes);
} }
public Optional<V> get(final byte[] key) {
return storage.get(key).map(valueConvertor::fromBytes);
}
public void put(final K key, final V value) { public void put(final K key, final V value) {
final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction(); final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction();
keyValueStorageTransaction.put(keyConvertor.toBytes(key), valueConvertor.toBytes(value)); keyValueStorageTransaction.put(keyConvertor.toBytes(key), valueConvertor.toBytes(value));
keyValueStorageTransaction.commit(); keyValueStorageTransaction.commit();
} }
public void put(final byte[] key, final V value) {
final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction();
keyValueStorageTransaction.put(key, valueConvertor.toBytes(value));
keyValueStorageTransaction.commit();
}
public void drop(final K key) { public void drop(final K key) {
storage.tryDelete(keyConvertor.toBytes(key)); storage.tryDelete(keyConvertor.toBytes(key));
} }
@ -58,4 +69,13 @@ public class GenericKeyValueStorageFacade<K, V> implements Closeable {
public void close() throws IOException { public void close() throws IOException {
storage.close(); storage.close();
} }
public void putAll(final Map<K, V> map) {
final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction();
for (Map.Entry<K, V> entry : map.entrySet()) {
keyValueStorageTransaction.put(
keyConvertor.toBytes(entry.getKey()), valueConvertor.toBytes(entry.getValue()));
}
keyValueStorageTransaction.commit();
}
} }

@ -0,0 +1,34 @@
/*
*
* * Copyright Hyperledger Besu Contributors.
* *
* * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations under the License.
* *
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import org.hyperledger.besu.datatypes.Hash;
import org.apache.tuweni.bytes.Bytes32;
public class HashConvertor implements ValueConvertor<Hash> {
@Override
public Hash fromBytes(final byte[] bytes) {
return Hash.wrap(Bytes32.wrap(bytes));
}
@Override
public byte[] toBytes(final Hash value) {
return value.toArrayUnsafe();
}
}

@ -0,0 +1,74 @@
/*
*
* * Copyright Hyperledger Besu Contributors.
* *
* * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations under the License.
* *
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.slf4j.LoggerFactory.getLogger;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlockFromPeersTask;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
public class SyncStepStep {
private static final Logger LOG = getLogger(SyncStepStep.class);
public static final int UNUSED = -1;
private final BackwardSyncContext context;
private final BackwardChain backwardChain;
public SyncStepStep(final BackwardSyncContext context, final BackwardChain backwardChain) {
this.context = context;
this.backwardChain = backwardChain;
}
public CompletableFuture<Void> executeAsync(final Hash hash) {
return CompletableFuture.supplyAsync(() -> hash)
.thenCompose(this::requestBlock)
.thenApply(this::saveBlock)
.thenCompose(context::executeNextStep);
}
private CompletableFuture<Block> requestBlock(final Hash targetHash) {
final RetryingGetBlockFromPeersTask getBlockTask =
RetryingGetBlockFromPeersTask.create(
context.getProtocolContext(),
context.getProtocolSchedule(),
context.getEthContext(),
context.getMetricsSystem(),
context.getEthContext().getEthPeers().getMaxPeers(),
Optional.of(targetHash),
UNUSED);
return context
.getEthContext()
.getScheduler()
.scheduleSyncWorkerTask(getBlockTask::run)
.thenApply(AbstractPeerTask.PeerTaskResult::getResult);
}
private Void saveBlock(final Block block) {
LOG.debug(
"Appending block {}({})", block.getHeader().getNumber(), block.getHash().toHexString());
backwardChain.appendTrustedBlock(block);
return null;
}
}

@ -27,6 +27,7 @@ import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadStatu
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener; import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.BesuEvents.TTDReachedListener;
import org.hyperledger.besu.util.Subscribers; import org.hyperledger.besu.util.Subscribers;
import java.util.Map; import java.util.Map;
@ -42,6 +43,7 @@ public class SyncState {
private final AtomicLong inSyncSubscriberId = new AtomicLong(); private final AtomicLong inSyncSubscriberId = new AtomicLong();
private final Map<Long, InSyncTracker> inSyncTrackers = new ConcurrentHashMap<>(); private final Map<Long, InSyncTracker> inSyncTrackers = new ConcurrentHashMap<>();
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create(); private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
private final Subscribers<TTDReachedListener> ttdReachedListeners = Subscribers.create();
private volatile long chainHeightListenerId; private volatile long chainHeightListenerId;
private volatile Optional<SyncTarget> syncTarget = Optional.empty(); private volatile Optional<SyncTarget> syncTarget = Optional.empty();
private Optional<WorldStateDownloadStatus> worldStateDownloadStatus = Optional.empty(); private Optional<WorldStateDownloadStatus> worldStateDownloadStatus = Optional.empty();
@ -109,10 +111,18 @@ public class SyncState {
return syncStatusListeners.subscribe(listener); return syncStatusListeners.subscribe(listener);
} }
public long subscribeTTDReached(final TTDReachedListener listener) {
return ttdReachedListeners.subscribe(listener);
}
public boolean unsubscribeSyncStatus(final long listenerId) { public boolean unsubscribeSyncStatus(final long listenerId) {
return syncStatusListeners.unsubscribe(listenerId); return syncStatusListeners.unsubscribe(listenerId);
} }
public boolean unsubscribeTTDReached(final long listenerId) {
return ttdReachedListeners.unsubscribe(listenerId);
}
public Optional<SyncStatus> syncStatus() { public Optional<SyncStatus> syncStatus() {
return syncStatus(syncTarget); return syncStatus(syncTarget);
} }
@ -143,6 +153,7 @@ public class SyncState {
// TODO: this is an inexpensive way to stop sync when we reach TTD, // TODO: this is an inexpensive way to stop sync when we reach TTD,
// we should revisit when merge sync is better defined // we should revisit when merge sync is better defined
this.reachedTerminalDifficulty = Optional.of(stoppedAtTerminalDifficulty); this.reachedTerminalDifficulty = Optional.of(stoppedAtTerminalDifficulty);
ttdReachedListeners.forEach(listener -> listener.onTTDReached(stoppedAtTerminalDifficulty));
} }
public Optional<Boolean> hasReachedTerminalDifficulty() { public Optional<Boolean> hasReachedTerminalDifficulty() {

@ -21,7 +21,10 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.StubGenesisConfigOptions; import org.hyperledger.besu.config.StubGenesisConfigOptions;
@ -32,29 +35,35 @@ import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.referencetests.ReferenceTestWorldState; import org.hyperledger.besu.ethereum.referencetests.ReferenceTestWorldState;
import org.hyperledger.besu.plugin.services.BesuEvents;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Answers; import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Spy; import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
@ -86,13 +95,12 @@ public class BackwardSyncContextTest {
@Mock private BlockValidator blockValidator; @Mock private BlockValidator blockValidator;
@Mock private SyncState syncState; @Mock private SyncState syncState;
@Mock private BackwardSyncLookupService backwardSyncLookupService; private BackwardChain backwardChain;
@Before @Before
public void setup() { public void setup() {
when(mockProtocolSpec.getBlockValidator()).thenReturn(blockValidator); when(mockProtocolSpec.getBlockValidator()).thenReturn(blockValidator);
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec); when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec);
when(syncState.hasReachedTerminalDifficulty()).thenReturn(Optional.of(true));
Block genesisBlock = blockDataGenerator.genesisBlock(); Block genesisBlock = blockDataGenerator.genesisBlock();
remoteBlockchain = createInMemoryBlockchain(genesisBlock); remoteBlockchain = createInMemoryBlockchain(genesisBlock);
localBlockchain = createInMemoryBlockchain(genesisBlock); localBlockchain = createInMemoryBlockchain(genesisBlock);
@ -126,6 +134,7 @@ public class BackwardSyncContextTest {
new ReferenceTestWorldState(), blockDataGenerator.receipts(block))); new ReferenceTestWorldState(), blockDataGenerator.receipts(block)));
}); });
backwardChain = newBackwardChain();
context = context =
spy( spy(
new BackwardSyncContext( new BackwardSyncContext(
@ -134,15 +143,31 @@ public class BackwardSyncContextTest {
metricsSystem, metricsSystem,
ethContext, ethContext,
syncState, syncState,
backwardSyncLookupService, backwardChain));
new InMemoryKeyValueStorageProvider())); doReturn(true).when(context).isOnTTD();
doReturn(2).when(context).getBatchSize();
}
private BackwardChain newBackwardChain() {
final GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
new BlocksHeadersConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());
final GenericKeyValueStorageFacade<Hash, Block> blocksStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
new BlocksConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());
final GenericKeyValueStorageFacade<Hash, Hash> chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
return new BackwardChain(headersStorage, blocksStorage, chainStorage);
} }
@Test @Test
public void shouldSyncUntilHash() throws Exception { public void shouldSyncUntilHash() throws Exception {
final Hash hash = getBlockByNumber(REMOTE_HEIGHT).getHash(); final Hash hash = getBlockByNumber(REMOTE_HEIGHT).getHash();
when(backwardSyncLookupService.lookup(hash))
.thenReturn(CompletableFuture.completedFuture(List.of(getBlockByNumber(REMOTE_HEIGHT))));
final CompletableFuture<Void> future = context.syncBackwardsUntil(hash); final CompletableFuture<Void> future = context.syncBackwardsUntil(hash);
respondUntilFutureIsDone(future); respondUntilFutureIsDone(future);
@ -151,15 +176,6 @@ public class BackwardSyncContextTest {
assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(remoteBlockchain.getChainHeadBlock()); assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(remoteBlockchain.getChainHeadBlock());
} }
@Test
public void shouldNotAppendWhenAlreadySyncingHash() {
final Hash hash = getBlockByNumber(REMOTE_HEIGHT).getHash();
when(backwardSyncLookupService.lookup(hash))
.thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
final CompletableFuture<Void> fut2 = context.syncBackwardsUntil(hash);
assertThat(fut2).isCompleted();
}
@Test @Test
public void shouldSyncUntilRemoteBranch() throws Exception { public void shouldSyncUntilRemoteBranch() throws Exception {
@ -189,32 +205,80 @@ public class BackwardSyncContextTest {
assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(remoteBlockchain.getChainHeadBlock()); assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(remoteBlockchain.getChainHeadBlock());
} }
private void respondUntilFutureIsDone(final CompletableFuture<Void> future) {
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(remoteBlockchain);
peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone());
}
@Nonnull
private Block getBlockByNumber(final int number) {
return remoteBlockchain.getBlockByNumber(number).orElseThrow();
}
@Captor ArgumentCaptor<BesuEvents.TTDReachedListener> captor;
@Test @Test
public void shouldReplaceFlowWhenBlockWasSkipped() throws Exception { public void shouldWaitWhenTTDNotReached()
throws ExecutionException, InterruptedException, TimeoutException {
doReturn(false).when(context).isOnTTD();
when(syncState.subscribeTTDReached(any())).thenReturn(88L);
final CompletableFuture<Void> future = final CompletableFuture<Void> voidCompletableFuture = context.waitForTTD();
context.syncBackwardsUntil(getBlockByNumber(REMOTE_HEIGHT - 10));
final CompletableFuture<Void> secondFuture = verify(syncState).subscribeTTDReached(captor.capture());
context.syncBackwardsUntil(getBlockByNumber(REMOTE_HEIGHT)); verify(syncState, never()).unsubscribeTTDReached(anyLong());
assertThat(voidCompletableFuture).isNotCompleted();
assertThat(future).isNotSameAs(secondFuture); captor.getValue().onTTDReached(true);
respondUntilFutureIsDone(secondFuture); voidCompletableFuture.get(1, TimeUnit.SECONDS);
secondFuture.get(); verify(syncState).unsubscribeTTDReached(88L);
assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(remoteBlockchain.getChainHeadBlock());
} }
private void respondUntilFutureIsDone(final CompletableFuture<Void> future) { @Test
final RespondingEthPeer.Responder responder = public void shouldNotWaitWhenTTDReached()
RespondingEthPeer.blockchainResponder(remoteBlockchain); throws ExecutionException, InterruptedException, TimeoutException {
doReturn(true).when(context).isOnTTD();
when(syncState.subscribeTTDReached(any())).thenReturn(88L);
final CompletableFuture<Void> voidCompletableFuture = context.waitForTTD();
voidCompletableFuture.get(1, TimeUnit.SECONDS);
assertThat(voidCompletableFuture).isCompleted();
verify(syncState).subscribeTTDReached(captor.capture());
verify(syncState).unsubscribeTTDReached(88L);
}
peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); @Test
public void shouldStartForwardStepWhenOnLocalHeight() {
createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 10);
doReturn(CompletableFuture.completedFuture(null)).when(context).executeForwardAsync(any());
context.executeNextStep(null);
verify(context).executeForwardAsync(any());
} }
@Nonnull @Test
private Block getBlockByNumber(final int number) { public void shouldFinishWhenWorkIsDonw() {
return remoteBlockchain.getBlockByNumber(number).orElseThrow();
final CompletableFuture<Void> completableFuture = context.executeNextStep(null);
assertThat(completableFuture.isDone()).isTrue();
}
@Test
public void shouldCreateAnotherBackwardStepWhenNotOnLocalHeight() {
createBackwardChain(LOCAL_HEIGHT + 3, LOCAL_HEIGHT + 10);
doReturn(CompletableFuture.completedFuture(null)).when(context).executeBackwardAsync(any());
context.executeNextStep(null);
verify(context).executeBackwardAsync(any());
}
private void createBackwardChain(final int from, final int until) {
for (int i = until; i > from; --i) {
backwardChain.prependAncestorsHeader(getBlockByNumber(i - 1).getHeader());
}
} }
} }

@ -1,132 +0,0 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.StubGenesisConfigOptions;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class BackwardSyncLookupServiceTest {
public static final int REMOTE_HEIGHT = 50;
private static final BlockDataGenerator blockDataGenerator = new BlockDataGenerator();
@Spy
private final ProtocolSchedule protocolSchedule =
MainnetProtocolSchedule.fromConfig(new StubGenesisConfigOptions());
@Spy private final ProtocolSpec mockProtocolSpec = protocolSchedule.getByBlockNumber(0L);
private MutableBlockchain remoteBlockchain;
private RespondingEthPeer peer;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private MetricsSystem metricsSystem;
@Mock private ProtocolContext protocolContext;
private BackwardSyncLookupService backwardSyncLookupService;
@Before
public void setup() throws NoSuchFieldException, IllegalAccessException {
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec);
Block genesisBlock = blockDataGenerator.genesisBlock();
remoteBlockchain = createInMemoryBlockchain(genesisBlock);
final Field max_retries = BackwardSyncLookupService.class.getDeclaredField("MAX_RETRIES");
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(max_retries, max_retries.getModifiers() & ~Modifier.FINAL);
max_retries.setAccessible(true);
max_retries.set(null, 1);
for (int i = 1; i <= REMOTE_HEIGHT; i++) {
final BlockDataGenerator.BlockOptions options =
new BlockDataGenerator.BlockOptions()
.setBlockNumber(i)
.setParentHash(remoteBlockchain.getBlockHashByNumber(i - 1).orElseThrow());
final Block block = blockDataGenerator.block(options);
final List<TransactionReceipt> receipts = blockDataGenerator.receipts(block);
remoteBlockchain.appendBlock(block, receipts);
}
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
EthContext ethContext = ethProtocolManager.ethContext();
backwardSyncLookupService =
spy(
new BackwardSyncLookupService(
protocolSchedule, ethContext, metricsSystem, protocolContext));
}
@Test
public void shouldFindABlockWhenResponding() throws Exception {
final Hash hash = getBlockByNumber(23).getHash();
final CompletableFuture<List<Block>> future = backwardSyncLookupService.lookup(hash);
respondUntilFutureIsDone(future);
final List<Block> blocks = future.get();
assertThat(blocks.get(0)).isEqualTo(getBlockByNumber(23));
}
private void respondUntilFutureIsDone(final CompletableFuture<?> future) {
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(remoteBlockchain);
peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone());
}
@Nonnull
private Block getBlockByNumber(final int number) {
return remoteBlockchain.getBlockByNumber(number).orElseThrow();
}
}

@ -17,8 +17,6 @@ package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -40,10 +38,7 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import org.junit.Before; import org.junit.Before;
@ -55,7 +50,7 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class BackwardSyncPhaseTest { public class BackwardSyncStepTest {
public static final int REMOTE_HEIGHT = 50; public static final int REMOTE_HEIGHT = 50;
public static final int LOCAL_HEIGHT = 25; public static final int LOCAL_HEIGHT = 25;
@ -71,6 +66,7 @@ public class BackwardSyncPhaseTest {
private RespondingEthPeer peer; private RespondingEthPeer peer;
GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage; GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage;
GenericKeyValueStorageFacade<Hash, Block> blocksStorage; GenericKeyValueStorageFacade<Hash, Block> blocksStorage;
GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
@Before @Before
public void setup() { public void setup() {
@ -85,6 +81,10 @@ public class BackwardSyncPhaseTest {
new BlocksConvertor(new MainnetBlockHeaderFunctions()), new BlocksConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage()); new InMemoryKeyValueStorage());
chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
Block genesisBlock = blockDataGenerator.genesisBlock(); Block genesisBlock = blockDataGenerator.genesisBlock();
remoteBlockchain = createInMemoryBlockchain(genesisBlock); remoteBlockchain = createInMemoryBlockchain(genesisBlock);
MutableBlockchain localBlockchain = createInMemoryBlockchain(genesisBlock); MutableBlockchain localBlockchain = createInMemoryBlockchain(genesisBlock);
@ -104,6 +104,9 @@ public class BackwardSyncPhaseTest {
} }
when(context.getProtocolContext().getBlockchain()).thenReturn(localBlockchain); when(context.getProtocolContext().getBlockchain()).thenReturn(localBlockchain);
when(context.getProtocolSchedule()).thenReturn(protocolSchedule); when(context.getProtocolSchedule()).thenReturn(protocolSchedule);
when(context.getBatchSize()).thenReturn(5);
when(context.executeNextStep(null)).thenReturn(CompletableFuture.completedFuture(null));
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
@ -111,26 +114,17 @@ public class BackwardSyncPhaseTest {
when(context.getEthContext()).thenReturn(ethContext); when(context.getEthContext()).thenReturn(ethContext);
} }
@Test
public void shouldWaitWhenTTDNotReached()
throws ExecutionException, InterruptedException, TimeoutException {
final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 3);
when(context.isOnTTD()).thenReturn(false).thenReturn(false).thenReturn(true);
BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain);
step.waitForTTD();
verify(context, Mockito.times(2)).getEthContext();
}
@Test @Test
public void shouldFindHeaderWhenRequested() throws Exception { public void shouldFindHeaderWhenRequested() throws Exception {
final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 3); final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 3);
when(context.isOnTTD()).thenReturn(true); when(context.getBatchSize()).thenReturn(5);
BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain); BackwardSyncStep step = spy(new BackwardSyncStep(context, backwardChain));
final RespondingEthPeer.Responder responder = final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(remoteBlockchain); RespondingEthPeer.blockchainResponder(remoteBlockchain);
final CompletableFuture<Void> future = step.executeStep(); final CompletableFuture<Void> future =
step.executeAsync(backwardChain.getFirstAncestorHeader().orElseThrow());
peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone());
future.get(); future.get();
} }
@ -138,49 +132,38 @@ public class BackwardSyncPhaseTest {
@Test @Test
public void shouldFindHashToSync() { public void shouldFindHashToSync() {
BackwardSyncPhase step = final BackwardChain backwardChain = createBackwardChain(REMOTE_HEIGHT - 4, REMOTE_HEIGHT);
new BackwardSyncPhase(context, createBackwardChain(REMOTE_HEIGHT - 4, REMOTE_HEIGHT)); BackwardSyncStep step = new BackwardSyncStep(context, backwardChain);
final Hash hash =
final Hash hash = step.earliestUnprocessedHash(null); step.possibleRestoreOldNodes(backwardChain.getFirstAncestorHeader().orElseThrow());
assertThat(hash).isEqualTo(getBlockByNumber(REMOTE_HEIGHT - 4).getHeader().getParentHash()); assertThat(hash).isEqualTo(getBlockByNumber(REMOTE_HEIGHT - 4).getHeader().getParentHash());
} }
@Test
public void shouldFailWhenNothingToSync() {
final BackwardChain chain = createBackwardChain(REMOTE_HEIGHT);
chain.dropFirstHeader();
BackwardSyncPhase step = new BackwardSyncPhase(context, chain);
assertThatThrownBy(() -> step.earliestUnprocessedHash(null))
.isInstanceOf(BackwardSyncException.class)
.hasMessageContaining("No unprocessed hashes during backward sync");
}
@Test @Test
public void shouldRequestHeaderWhenAsked() throws Exception { public void shouldRequestHeaderWhenAsked() throws Exception {
BackwardSyncPhase step = new BackwardSyncPhase(context, createBackwardChain(REMOTE_HEIGHT - 1)); BackwardSyncStep step = new BackwardSyncStep(context, createBackwardChain(REMOTE_HEIGHT - 1));
final Block lookingForBlock = getBlockByNumber(REMOTE_HEIGHT - 2); final Block lookingForBlock = getBlockByNumber(REMOTE_HEIGHT - 2);
final RespondingEthPeer.Responder responder = final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(remoteBlockchain); RespondingEthPeer.blockchainResponder(remoteBlockchain);
final CompletableFuture<BlockHeader> future = final CompletableFuture<List<BlockHeader>> future =
step.requestHeader(lookingForBlock.getHeader().getHash()); step.requestHeaders(lookingForBlock.getHeader().getHash());
peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone());
final BlockHeader blockHeader = future.get(); final BlockHeader blockHeader = future.get().get(0);
assertThat(blockHeader).isEqualTo(lookingForBlock.getHeader()); assertThat(blockHeader).isEqualTo(lookingForBlock.getHeader());
} }
@Test @Test
public void shouldThrowWhenResponseIsEmptyWhenRequestingHeader() throws Exception { public void shouldThrowWhenResponseIsEmptyWhenRequestingHeader() throws Exception {
BackwardSyncPhase step = new BackwardSyncPhase(context, createBackwardChain(REMOTE_HEIGHT - 1)); BackwardSyncStep step = new BackwardSyncStep(context, createBackwardChain(REMOTE_HEIGHT - 1));
final Block lookingForBlock = getBlockByNumber(REMOTE_HEIGHT - 2); final Block lookingForBlock = getBlockByNumber(REMOTE_HEIGHT - 2);
final RespondingEthPeer.Responder responder = RespondingEthPeer.emptyResponder(); final RespondingEthPeer.Responder responder = RespondingEthPeer.emptyResponder();
final CompletableFuture<BlockHeader> future = final CompletableFuture<List<BlockHeader>> future =
step.requestHeader(lookingForBlock.getHeader().getHash()); step.requestHeaders(lookingForBlock.getHeader().getHash());
peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone());
assertThatThrownBy(future::get) assertThatThrownBy(future::get)
@ -194,80 +177,11 @@ public class BackwardSyncPhaseTest {
final BackwardChain chain = Mockito.mock(BackwardChain.class); final BackwardChain chain = Mockito.mock(BackwardChain.class);
final BlockHeader header = Mockito.mock(BlockHeader.class); final BlockHeader header = Mockito.mock(BlockHeader.class);
when(header.getNumber()).thenReturn(12345L); BackwardSyncStep step = new BackwardSyncStep(context, chain);
BackwardSyncPhase step = new BackwardSyncPhase(context, chain);
step.saveHeader(header); step.saveHeader(header);
verify(chain).prependAncestorsHeader(header); verify(chain).prependAncestorsHeader(header);
verify(context).putCurrentChainToHeight(12345L, chain);
}
@Test
public void shouldMergeWhenPossible() {
BackwardChain backwardChain = createBackwardChain(REMOTE_HEIGHT - 3, REMOTE_HEIGHT);
backwardChain = spy(backwardChain);
BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain);
final BackwardChain historicalChain =
createBackwardChain(REMOTE_HEIGHT - 10, REMOTE_HEIGHT - 4);
when(context.findCorrectChainFromPivot(REMOTE_HEIGHT - 4))
.thenReturn(Optional.of(historicalChain));
assertThat(backwardChain.getFirstAncestorHeader().orElseThrow())
.isEqualTo(getBlockByNumber(REMOTE_HEIGHT - 3).getHeader());
step.possibleMerge(null);
assertThat(backwardChain.getFirstAncestorHeader().orElseThrow())
.isEqualTo(getBlockByNumber(REMOTE_HEIGHT - 10).getHeader());
verify(backwardChain).prependChain(historicalChain);
}
@Test
public void shouldNotMergeWhenNotPossible() {
BackwardChain backwardChain = createBackwardChain(REMOTE_HEIGHT - 5, REMOTE_HEIGHT);
backwardChain = spy(backwardChain);
when(context.findCorrectChainFromPivot(any(Long.class))).thenReturn(Optional.empty());
BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain);
step.possibleMerge(null);
verify(backwardChain, never()).prependChain(any());
}
@Test
public void shouldFinishWhenNoMoreSteps() {
BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 10);
BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain);
final CompletableFuture<Void> completableFuture =
step.possiblyMoreBackwardSteps(getBlockByNumber(LOCAL_HEIGHT).getHeader());
assertThat(completableFuture.isDone()).isTrue();
assertThat(completableFuture.isCompletedExceptionally()).isFalse();
}
@Test
public void shouldFinishExceptionallyWhenHeaderIsBellowBlockchainHeightButUnknown() {
BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 10);
BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain);
final CompletableFuture<Void> completableFuture =
step.possiblyMoreBackwardSteps(
ChainForTestCreator.createEmptyBlock((long) LOCAL_HEIGHT - 1).getHeader());
assertThat(completableFuture.isCompletedExceptionally()).isTrue();
}
@Test
public void shouldCreateAnotherStepWhenThereIsWorkToBeDone() {
BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 3, LOCAL_HEIGHT + 10);
BackwardSyncPhase step = spy(new BackwardSyncPhase(context, backwardChain));
step.possiblyMoreBackwardSteps(backwardChain.getFirstAncestorHeader().orElseThrow());
verify(step).executeAsync(any());
} }
private BackwardChain createBackwardChain(final int from, final int until) { private BackwardChain createBackwardChain(final int from, final int until) {
@ -280,8 +194,10 @@ public class BackwardSyncPhaseTest {
@Nonnull @Nonnull
private BackwardChain createBackwardChain(final int number) { private BackwardChain createBackwardChain(final int number) {
return new BackwardChain( final BackwardChain backwardChain =
headersStorage, blocksStorage, remoteBlockchain.getBlockByNumber(number).orElseThrow()); new BackwardChain(headersStorage, blocksStorage, chainStorage);
backwardChain.appendTrustedBlock(remoteBlockchain.getBlockByNumber(number).orElseThrow());
return backwardChain;
} }
@Nonnull @Nonnull

@ -1,111 +0,0 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class BackwardSyncTaskTest {
public static final int HEIGHT = 20_000;
@Mock private BackwardSyncContext context;
private List<Block> blocks;
GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage;
GenericKeyValueStorageFacade<Hash, Block> blocksStorage;
@Before
public void initContextAndChain() {
blocks = ChainForTestCreator.prepareChain(2, HEIGHT);
headersStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
new BlocksHeadersConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());
blocksStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
new BlocksConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage());
}
@Test
public void shouldFailWhenPivotNotSetInContext() {
when(context.getCurrentChain()).thenReturn(Optional.empty());
BackwardSyncTask step = createBackwardSyncTask();
CompletableFuture<Void> completableFuture = step.executeAsync(null);
assertThatThrownBy(completableFuture::get)
.getCause()
.isInstanceOf(BackwardSyncException.class)
.hasMessageContaining("No pivot");
}
@Nonnull
private BackwardSyncTask createBackwardSyncTask() {
final BackwardChain backwardChain =
new BackwardChain(headersStorage, blocksStorage, blocks.get(1));
return createBackwardSyncTask(backwardChain);
}
@Nonnull
private BackwardSyncTask createBackwardSyncTask(final BackwardChain backwardChain) {
return new BackwardSyncTask(context, backwardChain) {
@Override
CompletableFuture<Void> executeStep() {
return CompletableFuture.completedFuture(null);
}
};
}
@Test
public void shouldFinishImmediatelyFailWhenPivotIsDifferent() {
final BackwardChain backwardChain =
new BackwardChain(headersStorage, blocksStorage, blocks.get(0));
when(context.getCurrentChain()).thenReturn(Optional.of(backwardChain));
BackwardSyncTask step = createBackwardSyncTask();
CompletableFuture<Void> completableFuture = step.executeAsync(null);
assertThat(completableFuture.isDone()).isTrue();
}
@Test
public void shouldExecuteWhenPivotIsCorrect() {
final BackwardChain backwardChain =
new BackwardChain(headersStorage, blocksStorage, blocks.get(1));
BackwardSyncTask step = createBackwardSyncTask();
when(context.getCurrentChain()).thenReturn(Optional.of(backwardChain));
CompletableFuture<Void> completableFuture = step.executeAsync(null);
assertThat(completableFuture.isDone()).isTrue();
}
}

@ -20,8 +20,6 @@ package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.StubGenesisConfigOptions; import org.hyperledger.besu.config.StubGenesisConfigOptions;
@ -46,6 +44,7 @@ import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import org.junit.Before; import org.junit.Before;
@ -56,7 +55,7 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class ForwardSyncPhaseTest { public class ForwardSyncStepTest {
public static final int REMOTE_HEIGHT = 50; public static final int REMOTE_HEIGHT = 50;
public static final int LOCAL_HEIGHT = 25; public static final int LOCAL_HEIGHT = 25;
@ -74,6 +73,8 @@ public class ForwardSyncPhaseTest {
GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage; GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage;
GenericKeyValueStorageFacade<Hash, Block> blocksStorage; GenericKeyValueStorageFacade<Hash, Block> blocksStorage;
GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
@Before @Before
public void setup() { public void setup() {
headersStorage = headersStorage =
@ -86,6 +87,9 @@ public class ForwardSyncPhaseTest {
Hash::toArrayUnsafe, Hash::toArrayUnsafe,
new BlocksConvertor(new MainnetBlockHeaderFunctions()), new BlocksConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage()); new InMemoryKeyValueStorage());
chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
Block genesisBlock = blockDataGenerator.genesisBlock(); Block genesisBlock = blockDataGenerator.genesisBlock();
remoteBlockchain = createInMemoryBlockchain(genesisBlock); remoteBlockchain = createInMemoryBlockchain(genesisBlock);
@ -107,6 +111,8 @@ public class ForwardSyncPhaseTest {
when(context.getProtocolContext().getBlockchain()).thenReturn(localBlockchain); when(context.getProtocolContext().getBlockchain()).thenReturn(localBlockchain);
when(context.getProtocolSchedule()).thenReturn(protocolSchedule); when(context.getProtocolSchedule()).thenReturn(protocolSchedule);
when(context.getBatchSize()).thenReturn(2);
when(context.executeNextStep(null)).thenReturn(CompletableFuture.completedFuture(null));
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
@ -129,12 +135,12 @@ public class ForwardSyncPhaseTest {
@Test @Test
public void shouldExecuteForwardSyncWhenPossible() throws Exception { public void shouldExecuteForwardSyncWhenPossible() throws Exception {
final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 3); final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 3);
ForwardSyncPhase step = new ForwardSyncPhase(context, backwardChain); ForwardSyncStep step = new ForwardSyncStep(context, backwardChain);
final RespondingEthPeer.Responder responder = final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(remoteBlockchain); RespondingEthPeer.blockchainResponder(remoteBlockchain);
final CompletableFuture<Void> completableFuture = step.executeStep(); final CompletableFuture<Void> completableFuture = step.executeAsync();
peer.respondWhile( peer.respondWhile(
responder, responder,
@ -153,7 +159,7 @@ public class ForwardSyncPhaseTest {
@Test @Test
public void shouldDropHeadersAsLongAsWeKnowThem() { public void shouldDropHeadersAsLongAsWeKnowThem() {
final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 5, LOCAL_HEIGHT + 3); final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 5, LOCAL_HEIGHT + 3);
ForwardSyncPhase step = new ForwardSyncPhase(context, backwardChain); ForwardSyncStep step = new ForwardSyncStep(context, backwardChain);
assertThat(backwardChain.getFirstAncestorHeader().orElseThrow()) assertThat(backwardChain.getFirstAncestorHeader().orElseThrow())
.isEqualTo(getBlockByNumber(LOCAL_HEIGHT - 5).getHeader()); .isEqualTo(getBlockByNumber(LOCAL_HEIGHT - 5).getHeader());
@ -162,41 +168,9 @@ public class ForwardSyncPhaseTest {
.isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 1).getHeader()); .isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 1).getHeader());
} }
@Test
public void shouldDropBlocksThatWeTrust() {
final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 5, LOCAL_HEIGHT);
backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 1));
final BackwardChain finalChain = createBackwardChain(LOCAL_HEIGHT + 2, LOCAL_HEIGHT + 5);
finalChain.prependChain(backwardChain);
ForwardSyncPhase step = new ForwardSyncPhase(context, finalChain);
assertThat(finalChain.getFirstAncestorHeader().orElseThrow())
.isEqualTo(getBlockByNumber(LOCAL_HEIGHT - 5).getHeader());
step.processKnownAncestors(null);
assertThat(finalChain.getFirstAncestorHeader().orElseThrow())
.isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 2).getHeader());
}
@Test
public void shouldMergeEvenLongerChains() {
final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 5, LOCAL_HEIGHT + 7);
backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 1));
final BackwardChain finalChain = createBackwardChain(LOCAL_HEIGHT + 2, LOCAL_HEIGHT + 5);
finalChain.prependChain(backwardChain);
ForwardSyncPhase step = new ForwardSyncPhase(context, finalChain);
assertThat(finalChain.getFirstAncestorHeader().orElseThrow())
.isEqualTo(getBlockByNumber(LOCAL_HEIGHT - 5).getHeader());
step.processKnownAncestors(null);
assertThat(finalChain.getFirstAncestorHeader().orElseThrow())
.isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 2).getHeader());
}
@Test @Test
public void shouldNotRequestWhenNull() { public void shouldNotRequestWhenNull() {
ForwardSyncPhase phase = new ForwardSyncPhase(null, null); ForwardSyncStep phase = new ForwardSyncStep(context, null);
final CompletableFuture<Void> completableFuture = phase.possibleRequestBlock(null); final CompletableFuture<Void> completableFuture = phase.possibleRequestBlock(null);
assertThat(completableFuture.isDone()).isTrue(); assertThat(completableFuture.isDone()).isTrue();
@ -207,8 +181,8 @@ public class ForwardSyncPhaseTest {
@Test @Test
public void shouldFindBlockWhenRequested() throws Exception { public void shouldFindBlockWhenRequested() throws Exception {
ForwardSyncPhase step = ForwardSyncStep step =
new ForwardSyncPhase(context, createBackwardChain(LOCAL_HEIGHT + 1, LOCAL_HEIGHT + 3)); new ForwardSyncStep(context, createBackwardChain(LOCAL_HEIGHT + 1, LOCAL_HEIGHT + 3));
final RespondingEthPeer.Responder responder = final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(remoteBlockchain); RespondingEthPeer.blockchainResponder(remoteBlockchain);
@ -221,37 +195,19 @@ public class ForwardSyncPhaseTest {
} }
@Test @Test
public void shouldCreateAnotherStepWhenThereIsWorkToBeDone() { public void shouldAddSuccessorsWhenNoUnknownBlockSet() throws Exception {
BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 1, LOCAL_HEIGHT + 10);
ForwardSyncPhase step = spy(new ForwardSyncPhase(context, backwardChain));
step.possiblyMoreForwardSteps(backwardChain.getFirstAncestorHeader().orElseThrow());
verify(step).executeAsync(any());
}
@Test
public void shouldCreateBackwardStepWhenParentOfWorkIsNotImportedYet() {
BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 3, LOCAL_HEIGHT + 10);
ForwardSyncPhase step = spy(new ForwardSyncPhase(context, backwardChain));
step.possiblyMoreForwardSteps(backwardChain.getFirstAncestorHeader().orElseThrow());
verify(step).executeBackwardAsync(any());
}
@Test
public void shouldAddSuccessorsWhenNoUnknownBlockSet() {
BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 3, LOCAL_HEIGHT); BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 3, LOCAL_HEIGHT);
backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 1)); backwardChain.appendTrustedBlock(getBlockByNumber(LOCAL_HEIGHT + 1));
backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 2)); backwardChain.appendTrustedBlock(getBlockByNumber(LOCAL_HEIGHT + 2));
backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 3)); backwardChain.appendTrustedBlock(getBlockByNumber(LOCAL_HEIGHT + 3));
ForwardSyncStep step = new ForwardSyncStep(context, backwardChain);
step.processKnownAncestors(null);
assertThat(backwardChain.getFirstAncestorHeader()).isEmpty();
ForwardSyncPhase step = new ForwardSyncPhase(context, backwardChain); final CompletableFuture<Void> future = step.executeAsync();
final BlockHeader header = step.processKnownAncestors(null);
assertThat(header).isNull();
final CompletableFuture<Void> future = step.possiblyMoreForwardSteps(null); future.get(1, TimeUnit.SECONDS);
assertThat(future.isDone()).isTrue(); assertThat(future.isDone()).isTrue();
assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 3)); assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 3));
} }
@ -266,8 +222,10 @@ public class ForwardSyncPhaseTest {
@Nonnull @Nonnull
private BackwardChain backwardChainFromBlock(final int number) { private BackwardChain backwardChainFromBlock(final int number) {
return new BackwardChain( final BackwardChain backwardChain =
headersStorage, blocksStorage, remoteBlockchain.getBlockByNumber(number).orElseThrow()); new BackwardChain(headersStorage, blocksStorage, chainStorage);
backwardChain.appendTrustedBlock(remoteBlockchain.getBlockByNumber(number).orElseThrow());
return backwardChain;
} }
@Nonnull @Nonnull

@ -22,10 +22,15 @@ import static org.hyperledger.besu.ethereum.eth.sync.backwardsync.ChainForTestCr
import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import java.util.List; import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -41,6 +46,7 @@ public class InMemoryBackwardChainTest {
GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage; GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage;
GenericKeyValueStorageFacade<Hash, Block> blocksStorage; GenericKeyValueStorageFacade<Hash, Block> blocksStorage;
GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
@Before @Before
public void prepareData() { public void prepareData() {
@ -54,14 +60,16 @@ public class InMemoryBackwardChainTest {
Hash::toArrayUnsafe, Hash::toArrayUnsafe,
new BlocksConvertor(new MainnetBlockHeaderFunctions()), new BlocksConvertor(new MainnetBlockHeaderFunctions()),
new InMemoryKeyValueStorage()); new InMemoryKeyValueStorage());
chainStorage =
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage());
blocks = prepareChain(ELEMENTS, HEIGHT); blocks = prepareChain(ELEMENTS, HEIGHT);
} }
@Test @Test
public void shouldReturnFirstHeaderCorrectly() { public void shouldReturnFirstHeaderCorrectly() {
BackwardChain backwardChain = BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1));
new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1));
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader());
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader());
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 4).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 4).getHeader());
@ -69,10 +77,17 @@ public class InMemoryBackwardChainTest {
assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 4).getHeader()); assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 4).getHeader());
} }
@Nonnull
private BackwardChain createChainFromBlock(final Block pivot) {
final BackwardChain backwardChain =
new BackwardChain(headersStorage, blocksStorage, chainStorage);
backwardChain.appendTrustedBlock(pivot);
return backwardChain;
}
@Test @Test
public void shouldSaveHeadersWhenHeightAndHashMatches() { public void shouldSaveHeadersWhenHeightAndHashMatches() {
BackwardChain backwardChain = BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1));
new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1));
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader());
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader());
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 4).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 4).getHeader());
@ -82,8 +97,7 @@ public class InMemoryBackwardChainTest {
@Test @Test
public void shouldNotSaveHeadersWhenWrongHeight() { public void shouldNotSaveHeadersWhenWrongHeight() {
BackwardChain backwardChain = BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1));
new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1));
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader());
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader());
assertThatThrownBy( assertThatThrownBy(
@ -96,8 +110,7 @@ public class InMemoryBackwardChainTest {
@Test @Test
public void shouldNotSaveHeadersWhenWrongHash() { public void shouldNotSaveHeadersWhenWrongHash() {
BackwardChain backwardChain = BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1));
new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1));
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader());
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader());
BlockHeader wrongHashHeader = prepareWrongParentHash(blocks.get(blocks.size() - 4).getHeader()); BlockHeader wrongHashHeader = prepareWrongParentHash(blocks.get(blocks.size() - 4).getHeader());
@ -108,55 +121,10 @@ public class InMemoryBackwardChainTest {
assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 3).getHeader()); assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 3).getHeader());
} }
@Test
public void shouldMergeConnectedChains() {
BackwardChain firstChain =
new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1));
firstChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader());
firstChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader());
BackwardChain secondChain =
new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 4));
secondChain.prependAncestorsHeader(blocks.get(blocks.size() - 5).getHeader());
secondChain.prependAncestorsHeader(blocks.get(blocks.size() - 6).getHeader());
BlockHeader firstHeader = firstChain.getFirstAncestorHeader().orElseThrow();
assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 3).getHeader());
firstChain.prependChain(secondChain);
firstHeader = firstChain.getFirstAncestorHeader().orElseThrow();
assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 6).getHeader());
}
@Test
public void shouldNotMergeNotConnectedChains() {
BackwardChain firstChain =
new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1));
firstChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader());
firstChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader());
BackwardChain secondChain =
new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 5));
secondChain.prependAncestorsHeader(blocks.get(blocks.size() - 6).getHeader());
secondChain.prependAncestorsHeader(blocks.get(blocks.size() - 7).getHeader());
BlockHeader firstHeader = firstChain.getFirstAncestorHeader().orElseThrow();
assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 3).getHeader());
firstChain.prependChain(secondChain);
firstHeader = firstChain.getFirstAncestorHeader().orElseThrow();
assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 3).getHeader());
}
@Test @Test
public void shouldDropFromTheEnd() { public void shouldDropFromTheEnd() {
BackwardChain backwardChain = BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1));
new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1));
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader());
backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader());
@ -173,4 +141,59 @@ public class InMemoryBackwardChainTest {
firstHeader = backwardChain.getFirstAncestorHeader().orElseThrow(); firstHeader = backwardChain.getFirstAncestorHeader().orElseThrow();
assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 1).getHeader()); assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 1).getHeader());
} }
@Test
public void shouldCreateChainFromScheduleAndFunctions() {
final StorageProvider provider = new InMemoryKeyValueStorageProvider();
BlockHeaderFunctions functions = new MainnetBlockHeaderFunctions();
final BackwardChain chain = BackwardChain.from(provider, functions);
assertThat(chain).isNotNull();
chain.clear();
}
@Test
public void shouldAddHeaderToQueue() {
BackwardChain backwardChain = createChainFromBlock(blocks.get(3));
Optional<Hash> firstHash = backwardChain.getFirstHash();
assertThat(firstHash).isNotPresent();
backwardChain.addNewHash(blocks.get(7).getHash());
backwardChain.addNewHash(blocks.get(9).getHash());
backwardChain.addNewHash(blocks.get(9).getHash());
backwardChain.addNewHash(blocks.get(11).getHash());
firstHash = backwardChain.getFirstHash();
assertThat(firstHash).isPresent();
assertThat(firstHash.orElseThrow()).isEqualTo(blocks.get(7).getHash());
firstHash = backwardChain.getFirstHash();
assertThat(firstHash).isPresent();
assertThat(firstHash.orElseThrow()).isEqualTo(blocks.get(9).getHash());
firstHash = backwardChain.getFirstHash();
assertThat(firstHash).isPresent();
assertThat(firstHash.orElseThrow()).isEqualTo(blocks.get(11).getHash());
}
@Test
public void shouldChangeFirstAncestorIfPivotIsToFar() {
BackwardChain backwardChain = createChainFromBlock(blocks.get(3));
backwardChain.appendTrustedBlock(blocks.get(4));
Optional<BlockHeader> firstAncestorHeader = backwardChain.getFirstAncestorHeader();
assertThat(firstAncestorHeader).isPresent();
assertThat(firstAncestorHeader.orElseThrow()).isEqualTo(blocks.get(3).getHeader());
Optional<Block> pivot = backwardChain.getPivot();
assertThat(pivot).isPresent();
assertThat(pivot.orElseThrow()).isEqualTo(blocks.get(4));
backwardChain.appendTrustedBlock(blocks.get(7));
firstAncestorHeader = backwardChain.getFirstAncestorHeader();
assertThat(firstAncestorHeader).isPresent();
assertThat(firstAncestorHeader.orElseThrow()).isEqualTo(blocks.get(7).getHeader());
pivot = backwardChain.getPivot();
assertThat(pivot).isPresent();
assertThat(pivot.orElseThrow()).isEqualTo(blocks.get(7));
}
} }

@ -64,7 +64,7 @@ Calculated : ${currentHash}
tasks.register('checkAPIChanges', FileStateChecker) { tasks.register('checkAPIChanges', FileStateChecker) {
description = "Checks that the API for the Plugin-API project does not change without deliberate thought" description = "Checks that the API for the Plugin-API project does not change without deliberate thought"
files = sourceSets.main.allJava.files files = sourceSets.main.allJava.files
knownHash = 'hRHEr0UjfMKObN5VFIHk9GgcYlcEsEXEYfj0qVl5XQk=' knownHash = 'Q0degxzD4dwmIvROpsxR3krWexb/lQUOfs7YjH2Sr5M='
} }
check.dependsOn('checkAPIChanges') check.dependsOn('checkAPIChanges')

@ -236,4 +236,16 @@ public interface BesuEvents extends BesuService {
*/ */
void onLogEmitted(LogWithMetadata logWithMetadata); void onLogEmitted(LogWithMetadata logWithMetadata);
} }
interface TTDReachedListener {
/**
* Emitted when Total Terminal Difficulty is reached on a chain and dependent merge
* functionality should trigger.
*
* @param reached is true when we reached TTD, can be potentially false in case we reorg under
* TTD
*/
void onTTDReached(boolean reached);
}
} }

Loading…
Cancel
Save