Restore updating chain head and finalized block during backward sync (#4718)

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/4736/head
Fabio Di Fabio 2 years ago committed by GitHub
parent 951153d8ee
commit 9d4ec3bca4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CHANGELOG.md
  2. 70
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java
  3. 4
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeMiningCoordinator.java
  4. 10
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/TransitionCoordinator.java
  5. 4
      consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinatorTest.java
  6. 19
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdated.java
  7. 24
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdatedTest.java
  8. 16
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardChain.java
  9. 59
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java
  10. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStep.java
  11. 60
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardsSyncAlgorithm.java
  12. 88
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncAlgSpec.java
  13. 11
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java
  14. 11
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStepTest.java

@ -23,6 +23,7 @@
- Print an overview of configuration and system information at startup [#4451](https://github.com/hyperledger/besu/pull/4451)
### Bug Fixes
- Restore updating chain head and finalized block during backward sync [#4718](https://github.com/hyperledger/besu/pull/4718)
### Download Links
@ -76,7 +77,6 @@
https://hyperledger.jfrog.io/hyperledger/besu-binaries/besu/22.10.0/besu-22.10.0.tar.gz / sha256: 88fb5df567e4ec3547d7d2970cfef00debbd020c0da66b19166d43779b3b2b85
https://hyperledger.jfrog.io/hyperledger/besu-binaries/besu/22.10.0/besu-22.10.0.zip / sha256: c8e39f7c879409cb9b47f4d3de5e9c521249083830a8c9a45e8a14a319fe195d
## 22.10.0-RC2
### Breaking Changes

@ -390,33 +390,45 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
}
@Override
public Optional<BlockHeader> getOrSyncHeaderByHash(final Hash blockHash) {
public Optional<BlockHeader> getOrSyncHeadByHash(final Hash headHash, final Hash finalizedHash) {
final var chain = protocolContext.getBlockchain();
final var optHeader = chain.getBlockHeader(blockHash);
final var maybeHeadHeader = chain.getBlockHeader(headHash);
if (optHeader.isPresent()) {
debugLambda(LOG, "BlockHeader {} is already present", () -> optHeader.get().toLogString());
if (maybeHeadHeader.isPresent()) {
debugLambda(LOG, "BlockHeader {} is already present", maybeHeadHeader.get()::toLogString);
} else {
debugLambda(LOG, "appending block hash {} to backward sync", blockHash::toHexString);
backwardSyncContext.syncBackwardsUntil(blockHash);
debugLambda(LOG, "Appending new head block hash {} to backward sync", headHash::toHexString);
backwardSyncContext.updateHead(headHash);
backwardSyncContext
.syncBackwardsUntil(headHash)
.thenRun(() -> updateFinalized(finalizedHash));
}
return optHeader;
return maybeHeadHeader;
}
@Override
public Optional<BlockHeader> getOrSyncHeaderByHash(
final Hash blockHash, final Hash finalizedBlockHash) {
final var chain = protocolContext.getBlockchain();
final var optHeader = chain.getBlockHeader(blockHash);
if (optHeader.isPresent()) {
debugLambda(LOG, "BlockHeader {} is already present", () -> optHeader.get().toLogString());
} else {
debugLambda(LOG, "appending block hash {} to backward sync", blockHash::toHexString);
backwardSyncContext.updateHeads(blockHash, finalizedBlockHash);
backwardSyncContext.syncBackwardsUntil(blockHash);
private void updateFinalized(final Hash finalizedHash) {
if (mergeContext
.getFinalized()
.map(BlockHeader::getHash)
.map(finalizedHash::equals)
.orElse(Boolean.FALSE)) {
LOG.debug("Finalized block already set to {}, nothing to do", finalizedHash);
return;
}
return optHeader;
protocolContext
.getBlockchain()
.getBlockHeader(finalizedHash)
.ifPresentOrElse(
finalizedHeader -> {
debugLambda(
LOG, "Setting finalized block header to {}", finalizedHeader::toLogString);
mergeContext.setFinalized(finalizedHeader);
},
() ->
LOG.warn(
"Internal error, backward sync completed but failed to import finalized block {}",
finalizedHash));
}
@Override
@ -693,12 +705,11 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
@Override
public boolean isDescendantOf(final BlockHeader ancestorBlock, final BlockHeader newBlock) {
LOG.debug(
"checking if block {}:{} is ancestor of {}:{}",
ancestorBlock.getNumber(),
ancestorBlock.getBlockHash(),
newBlock.getNumber(),
newBlock.getBlockHash());
debugLambda(
LOG,
"checking if block {} is ancestor of {}",
ancestorBlock::toLogString,
newBlock::toLogString);
// start with self, because descending from yourself is valid
Optional<BlockHeader> parentOf = Optional.of(newBlock);
@ -714,10 +725,11 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
&& ancestorBlock.getBlockHash().equals(parentOf.get().getBlockHash())) {
return true;
} else {
LOG.debug(
debugLambda(
LOG,
"looped all the way back, did not find ancestor {} of child {}",
ancestorBlock.getBlockHash(),
newBlock.getBlockHash());
ancestorBlock::toLogString,
newBlock::toLogString);
return false;
}
}

@ -63,12 +63,10 @@ public interface MergeMiningCoordinator extends MiningCoordinator {
CompletableFuture<Void> appendNewPayloadToSync(Block newPayload);
Optional<BlockHeader> getOrSyncHeaderByHash(Hash blockHash);
Optional<BlockHeader> getOrSyncHeadByHash(Hash headHash, Hash finalizedHash);
boolean isMiningBeforeMerge();
Optional<BlockHeader> getOrSyncHeaderByHash(Hash blockHash, Hash finalizedBlockHash);
void addBadBlock(final Block block, Optional<Throwable> maybeCause);
boolean isBadBlock(Hash blockHash);

@ -183,14 +183,8 @@ public class TransitionCoordinator extends TransitionUtils<MiningCoordinator>
}
@Override
public Optional<BlockHeader> getOrSyncHeaderByHash(final Hash blockHash) {
return mergeCoordinator.getOrSyncHeaderByHash(blockHash);
}
@Override
public Optional<BlockHeader> getOrSyncHeaderByHash(
final Hash blockHash, final Hash finalizedBlockHash) {
return mergeCoordinator.getOrSyncHeaderByHash(blockHash, finalizedBlockHash);
public Optional<BlockHeader> getOrSyncHeadByHash(final Hash headHash, final Hash finalizedHash) {
return mergeCoordinator.getOrSyncHeadByHash(headHash, finalizedHash);
}
@Override

@ -698,7 +698,7 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
BlockHeader mockHeader =
headerGenerator.parentHash(Hash.fromHexStringLenient("0xdead")).buildHeader();
when(blockchain.getBlockHeader(mockHeader.getHash())).thenReturn(Optional.of(mockHeader));
var res = coordinator.getOrSyncHeaderByHash(mockHeader.getHash());
var res = coordinator.getOrSyncHeadByHash(mockHeader.getHash(), Hash.ZERO);
assertThat(res).isPresent();
}
@ -710,7 +710,7 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
when(backwardSyncContext.syncBackwardsUntil(mockHeader.getBlockHash()))
.thenReturn(CompletableFuture.completedFuture(null));
var res = coordinator.getOrSyncHeaderByHash(mockHeader.getHash());
var res = coordinator.getOrSyncHeadByHash(mockHeader.getHash(), Hash.ZERO);
assertThat(res).isNotPresent();
}

@ -99,24 +99,27 @@ public class EngineForkchoiceUpdated extends ExecutionEngineJsonRpcMethod {
Optional.of(forkChoice.getHeadBlockHash() + " is an invalid block")));
}
Optional<BlockHeader> newHead =
mergeCoordinator.getOrSyncHeaderByHash(forkChoice.getHeadBlockHash());
final Optional<BlockHeader> maybeNewHead =
mergeCoordinator.getOrSyncHeadByHash(
forkChoice.getHeadBlockHash(), forkChoice.getFinalizedBlockHash());
if (newHead.isEmpty()) {
if (maybeNewHead.isEmpty()) {
return syncingResponse(requestId, forkChoice);
}
final BlockHeader newHead = maybeNewHead.get();
maybePayloadAttributes.ifPresentOrElse(
this::logPayload, () -> LOG.debug("Payload attributes are null"));
if (!isValidForkchoiceState(
forkChoice.getSafeBlockHash(), forkChoice.getFinalizedBlockHash(), newHead.get())) {
forkChoice.getSafeBlockHash(), forkChoice.getFinalizedBlockHash(), newHead)) {
logForkchoiceUpdatedCall(INVALID, forkChoice);
return new JsonRpcErrorResponse(requestId, JsonRpcError.INVALID_FORKCHOICE_STATE);
}
// TODO: post-merge cleanup, this should be unnecessary after merge
if (!mergeCoordinator.latestValidAncestorDescendsFromTerminal(newHead.get())) {
if (!mergeCoordinator.latestValidAncestorDescendsFromTerminal(newHead)) {
logForkchoiceUpdatedCall(INVALID, forkChoice);
return new JsonRpcSuccessResponse(
requestId,
@ -124,12 +127,12 @@ public class EngineForkchoiceUpdated extends ExecutionEngineJsonRpcMethod {
INVALID,
Hash.ZERO,
null,
Optional.of(newHead.get() + " did not descend from terminal block")));
Optional.of(newHead + " did not descend from terminal block")));
}
ForkchoiceResult result =
mergeCoordinator.updateForkChoice(
newHead.get(),
newHead,
forkChoice.getFinalizedBlockHash(),
forkChoice.getSafeBlockHash(),
maybePayloadAttributes.map(
@ -149,7 +152,7 @@ public class EngineForkchoiceUpdated extends ExecutionEngineJsonRpcMethod {
maybePayloadAttributes.map(
payloadAttributes ->
mergeCoordinator.preparePayload(
newHead.get(),
newHead,
payloadAttributes.getTimestamp(),
payloadAttributes.getPrevRandao(),
payloadAttributes.getSuggestedFeeRecipient()));

@ -111,7 +111,7 @@ public class EngineForkchoiceUpdatedTest {
public void shouldReturnInvalidOnBadTerminalBlock() {
BlockHeader mockHeader = new BlockHeaderTestFixture().baseFeePerGas(Wei.ONE).buildHeader();
when(mergeCoordinator.getOrSyncHeaderByHash(mockHeader.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(mockHeader.getHash(), Hash.ZERO))
.thenReturn(Optional.of(mockHeader));
when(mergeCoordinator.latestValidAncestorDescendsFromTerminal(mockHeader)).thenReturn(false);
assertSuccessWithPayloadForForkchoiceResult(
@ -148,7 +148,7 @@ public class EngineForkchoiceUpdatedTest {
@Test
public void shouldReturnValidWithoutFinalizedOrPayload() {
BlockHeader mockHeader = new BlockHeaderTestFixture().baseFeePerGas(Wei.ONE).buildHeader();
when(mergeCoordinator.getOrSyncHeaderByHash(mockHeader.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(mockHeader.getHash(), Hash.ZERO))
.thenReturn(Optional.of(mockHeader));
when(mergeCoordinator.latestValidAncestorDescendsFromTerminal(mockHeader)).thenReturn(true);
@ -172,7 +172,7 @@ public class EngineForkchoiceUpdatedTest {
when(mergeCoordinator.latestValidAncestorDescendsFromTerminal(mockHeader)).thenReturn(true);
when(mergeCoordinator.isDescendantOf(any(), any())).thenReturn(true);
when(mergeContext.isSyncing()).thenReturn(false);
when(mergeCoordinator.getOrSyncHeaderByHash(mockHeader.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(mockHeader.getHash(), parent.getHash()))
.thenReturn(Optional.of(mockHeader));
when(mergeCoordinator.updateForkChoice(
mockHeader, parent.getHash(), parent.getHash(), Optional.empty()))
@ -202,7 +202,7 @@ public class EngineForkchoiceUpdatedTest {
BlockHeader mockParent = builder.number(9L).buildHeader();
BlockHeader mockHeader = builder.number(10L).parentHash(mockParent.getHash()).buildHeader();
when(blockchain.getBlockHeader(any())).thenReturn(Optional.of(mockHeader));
when(mergeCoordinator.getOrSyncHeaderByHash(mockHeader.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(mockHeader.getHash(), Hash.ZERO))
.thenReturn(Optional.of(mockHeader));
when(mergeCoordinator.latestValidAncestorDescendsFromTerminal(mockHeader)).thenReturn(true);
when(mergeCoordinator.isDescendantOf(any(), any())).thenReturn(true);
@ -217,7 +217,7 @@ public class EngineForkchoiceUpdatedTest {
@Test
public void shouldReturnValidWithoutFinalizedWithPayload() {
BlockHeader mockHeader = new BlockHeaderTestFixture().baseFeePerGas(Wei.ONE).buildHeader();
when(mergeCoordinator.getOrSyncHeaderByHash(mockHeader.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(mockHeader.getHash(), Hash.ZERO))
.thenReturn(Optional.of(mockHeader));
when(mergeCoordinator.latestValidAncestorDescendsFromTerminal(mockHeader)).thenReturn(true);
@ -254,7 +254,7 @@ public class EngineForkchoiceUpdatedTest {
when(blockchain.getBlockHeader(finalizedBlockHash)).thenReturn(Optional.empty());
when(mergeContext.isSyncing()).thenReturn(false);
when(mergeCoordinator.getOrSyncHeaderByHash(newHead.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(newHead.getHash(), finalizedBlockHash))
.thenReturn(Optional.of(newHead));
var resp =
@ -275,7 +275,7 @@ public class EngineForkchoiceUpdatedTest {
when(blockchain.getBlockHeader(newHead.getHash())).thenReturn(Optional.of(newHead));
when(blockchain.getBlockHeader(finalized.getHash())).thenReturn(Optional.of(finalized));
when(mergeContext.isSyncing()).thenReturn(false);
when(mergeCoordinator.getOrSyncHeaderByHash(newHead.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(newHead.getHash(), finalized.getBlockHash()))
.thenReturn(Optional.of(newHead));
when(mergeCoordinator.isDescendantOf(finalized, newHead)).thenReturn(false);
@ -301,7 +301,7 @@ public class EngineForkchoiceUpdatedTest {
when(blockchain.getBlockHeader(parent.getHash())).thenReturn(Optional.of(parent));
when(mergeContext.isSyncing()).thenReturn(false);
when(mergeCoordinator.getOrSyncHeaderByHash(newHead.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(newHead.getHash(), parent.getBlockHash()))
.thenReturn(Optional.of(newHead));
var resp =
@ -328,7 +328,7 @@ public class EngineForkchoiceUpdatedTest {
when(blockchain.getBlockHeader(finalized.getHash())).thenReturn(Optional.of(finalized));
when(blockchain.getBlockHeader(safeBlockBlockHash)).thenReturn(Optional.empty());
when(mergeContext.isSyncing()).thenReturn(false);
when(mergeCoordinator.getOrSyncHeaderByHash(newHead.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(newHead.getHash(), finalized.getBlockHash()))
.thenReturn(Optional.of(newHead));
when(mergeCoordinator.isDescendantOf(finalized, newHead)).thenReturn(true);
@ -352,7 +352,7 @@ public class EngineForkchoiceUpdatedTest {
when(blockchain.getBlockHeader(finalized.getHash())).thenReturn(Optional.of(finalized));
when(blockchain.getBlockHeader(safeBlock.getHash())).thenReturn(Optional.of(safeBlock));
when(mergeContext.isSyncing()).thenReturn(false);
when(mergeCoordinator.getOrSyncHeaderByHash(newHead.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(newHead.getHash(), finalized.getBlockHash()))
.thenReturn(Optional.of(newHead));
when(mergeCoordinator.isDescendantOf(finalized, newHead)).thenReturn(true);
when(mergeCoordinator.isDescendantOf(finalized, safeBlock)).thenReturn(false);
@ -377,7 +377,7 @@ public class EngineForkchoiceUpdatedTest {
when(blockchain.getBlockHeader(finalized.getHash())).thenReturn(Optional.of(finalized));
when(blockchain.getBlockHeader(safeBlock.getHash())).thenReturn(Optional.of(safeBlock));
when(mergeContext.isSyncing()).thenReturn(false);
when(mergeCoordinator.getOrSyncHeaderByHash(newHead.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(newHead.getHash(), finalized.getBlockHash()))
.thenReturn(Optional.of(newHead));
when(mergeCoordinator.isDescendantOf(finalized, newHead)).thenReturn(true);
when(mergeCoordinator.isDescendantOf(finalized, safeBlock)).thenReturn(true);
@ -397,7 +397,7 @@ public class EngineForkchoiceUpdatedTest {
public void shouldIgnoreUpdateToOldHeadAndNotPreparePayload() {
BlockHeader mockHeader = new BlockHeaderTestFixture().baseFeePerGas(Wei.ONE).buildHeader();
when(mergeCoordinator.getOrSyncHeaderByHash(mockHeader.getHash()))
when(mergeCoordinator.getOrSyncHeadByHash(mockHeader.getHash(), Hash.ZERO))
.thenReturn(Optional.of(mockHeader));
when(mergeCoordinator.latestValidAncestorDescendsFromTerminal(mockHeader)).thenReturn(true);

@ -93,15 +93,14 @@ public class BackwardChain {
headers.put(blockHeader.getHash(), blockHeader);
return;
}
BlockHeader firstHeader = firstStoredAncestor.get();
final BlockHeader firstHeader = firstStoredAncestor.get();
headers.put(blockHeader.getHash(), blockHeader);
chainStorage.put(blockHeader.getHash(), firstStoredAncestor.get().getHash());
chainStorage.put(blockHeader.getHash(), firstHeader.getHash());
firstStoredAncestor = Optional.of(blockHeader);
debugLambda(
LOG,
"Added header {} on height {} to backward chain led by pivot {} on height {}",
"Added header {} to backward chain led by pivot {} on height {}",
blockHeader::toLogString,
blockHeader::getNumber,
() -> lastStoredPivot.orElseThrow().toLogString(),
firstHeader::getNumber);
}
@ -127,16 +126,23 @@ public class BackwardChain {
}
public synchronized void appendTrustedBlock(final Block newPivot) {
debugLambda(LOG, "appending trusted block {}", newPivot::toLogString);
debugLambda(LOG, "Appending trusted block {}", newPivot::toLogString);
headers.put(newPivot.getHash(), newPivot.getHeader());
blocks.put(newPivot.getHash(), newPivot);
if (lastStoredPivot.isEmpty()) {
firstStoredAncestor = Optional.of(newPivot.getHeader());
} else {
if (newPivot.getHeader().getParentHash().equals(lastStoredPivot.get().getHash())) {
debugLambda(
LOG,
"Added block {} to backward chain led by pivot {} on height {}",
newPivot::toLogString,
lastStoredPivot.get()::toLogString,
firstStoredAncestor.get()::getNumber);
chainStorage.put(lastStoredPivot.get().getHash(), newPivot.getHash());
} else {
firstStoredAncestor = Optional.of(newPivot.getHeader());
debugLambda(LOG, "Re-pivoting to new target block {}", newPivot::toLogString);
}
}
lastStoredPivot = Optional.of(newPivot.getHeader());

@ -37,7 +37,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@ -58,13 +57,9 @@ public class BackwardSyncContext {
private final AtomicReference<Status> currentBackwardSyncStatus = new AtomicReference<>();
private final BackwardChain backwardChain;
private int batchSize = BATCH_SIZE;
private Optional<Hash> maybeFinalized = Optional.empty();
private Optional<Hash> maybeHead = Optional.empty();
private final int maxRetries;
private final long millisBetweenRetries = DEFAULT_MILLIS_BETWEEN_RETRIES;
private final Subscribers<BadChainListener> badChainListeners = Subscribers.create();
public BackwardSyncContext(
@ -108,16 +103,17 @@ public class BackwardSyncContext {
.orElse(Boolean.FALSE);
}
public synchronized void updateHeads(final Hash head, final Hash finalizedBlockHash) {
if (Hash.ZERO.equals(finalizedBlockHash)) {
this.maybeFinalized = Optional.empty();
public synchronized void updateHead(final Hash headHash) {
if (Hash.ZERO.equals(headHash)) {
maybeHead = Optional.empty();
} else {
this.maybeFinalized = Optional.ofNullable(finalizedBlockHash);
}
if (Hash.ZERO.equals(head)) {
this.maybeHead = Optional.empty();
} else {
this.maybeHead = Optional.ofNullable(head);
maybeHead = Optional.of(headHash);
Optional<Status> maybeCurrentStatus = Optional.ofNullable(currentBackwardSyncStatus.get());
maybeCurrentStatus.ifPresent(
status ->
backwardChain
.getBlock(headHash)
.ifPresent(block -> status.updateTargetHeight(block.getHeader().getNumber())));
}
}
@ -125,13 +121,7 @@ public class BackwardSyncContext {
Optional<Status> maybeCurrentStatus = Optional.ofNullable(this.currentBackwardSyncStatus.get());
if (isTrusted(newBlockHash)) {
return maybeCurrentStatus
.map(
status -> {
backwardChain
.getBlock(newBlockHash)
.ifPresent(block -> status.updateTargetHeight(block.getHeader().getNumber()));
return status.currentFuture;
})
.map(Status::getCurrentFuture)
.orElseGet(() -> CompletableFuture.completedFuture(null));
}
backwardChain.addNewHash(newBlockHash);
@ -300,7 +290,7 @@ public class BackwardSyncContext {
}
// In rare case when we request too many headers/blocks we get response that does not contain all
// data and we might want to retry with smaller batch size
// data, and we might want to retry with smaller batch size
public int getBatchSize() {
return batchSize;
}
@ -349,19 +339,21 @@ public class BackwardSyncContext {
LOG.debug("Nothing to do with the head");
return;
}
if (blockchain.getChainHead().getHash().equals(maybeHead.get())) {
final Hash head = maybeHead.get();
if (blockchain.getChainHead().getHash().equals(head)) {
LOG.debug("Head is already properly set");
return;
}
if (blockchain.contains(maybeHead.get())) {
LOG.debug("Changing head to {}", maybeHead.get().toHexString());
blockchain.rewindToBlock(maybeHead.get());
if (blockchain.contains(head)) {
LOG.debug("Changing head to {}", head);
blockchain.rewindToBlock(head);
return;
}
if (blockchain.getChainHead().getHash().equals(lastSavedBlock.getHash())) {
LOG.debug("Rewinding head to lastSavedBlock {}", lastSavedBlock.getHash());
blockchain.rewindToBlock(lastSavedBlock.getHash());
}
debugLambda(LOG, "Rewinding head to last saved block {}", lastSavedBlock::toLogString);
blockchain.rewindToBlock(lastSavedBlock.getHash());
}
public SyncState getSyncState() {
@ -372,13 +364,6 @@ public class BackwardSyncContext {
return backwardChain;
}
public Optional<Hash> findMaybeFinalized() {
return Stream.of(maybeFinalized, getProtocolContext().getBlockchain().getFinalized())
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst();
}
public Status getStatus() {
return currentBackwardSyncStatus.get();
}

@ -59,6 +59,12 @@ public class BackwardSyncStep {
@VisibleForTesting
protected CompletableFuture<List<BlockHeader>> requestHeaders(final Hash hash) {
if (context.getProtocolContext().getBlockchain().contains(hash)) {
LOG.debug(
"Hash {} already present in local blockchain no need to request headers to peers", hash);
return CompletableFuture.completedFuture(List.of());
}
final int batchSize = context.getBatchSize();
LOG.debug("Requesting headers for hash {}, with batch size {}", hash, batchSize);

@ -73,21 +73,21 @@ public class BackwardsSyncAlgorithm {
if (!context.isReady()) {
return waitForReady();
}
runFinalizedSuccessionRule(
context.getProtocolContext().getBlockchain(), context.findMaybeFinalized());
final Optional<BlockHeader> possibleFirstAncestorHeader =
final Optional<BlockHeader> maybeFirstAncestorHeader =
context.getBackwardChain().getFirstAncestorHeader();
if (possibleFirstAncestorHeader.isEmpty()) {
if (maybeFirstAncestorHeader.isEmpty()) {
this.finished = true;
LOG.info("The Backward sync is done");
LOG.info("Current backward sync session is done");
context.getBackwardChain().clear();
return CompletableFuture.completedFuture(null);
}
final MutableBlockchain blockchain = context.getProtocolContext().getBlockchain();
final BlockHeader firstAncestorHeader = possibleFirstAncestorHeader.get();
final BlockHeader firstAncestorHeader = maybeFirstAncestorHeader.get();
if (blockchain.contains(firstAncestorHeader.getHash())) {
return executeProcessKnownAncestors();
}
if (blockchain.getChainHead().getHeight() > firstAncestorHeader.getNumber()) {
debugLambda(
LOG,
@ -99,7 +99,7 @@ public class BackwardsSyncAlgorithm {
if (finalBlockConfirmation.ancestorHeaderReached(firstAncestorHeader)) {
debugLambda(
LOG,
"Backward sync reached ancestor header with {}, starting Forward sync",
"Backward sync reached ancestor header with {}, starting forward sync",
firstAncestorHeader::toLogString);
return executeForwardAsync();
}
@ -166,52 +166,6 @@ public class BackwardsSyncAlgorithm {
}
}
@VisibleForTesting
protected void runFinalizedSuccessionRule(
final MutableBlockchain blockchain, final Optional<Hash> maybeFinalized) {
if (maybeFinalized.isEmpty()) {
LOG.debug("Nothing to validate yet, consensus layer did not provide a new finalized block");
return;
}
final Hash newFinalized = maybeFinalized.get();
if (!blockchain.contains(newFinalized)) {
LOG.debug("New finalized block {} is not imported yet", newFinalized);
return;
}
final Optional<Hash> maybeOldFinalized = blockchain.getFinalized();
if (maybeOldFinalized.isPresent()) {
final Hash oldFinalized = maybeOldFinalized.get();
if (newFinalized.equals(oldFinalized)) {
LOG.debug("We already have this block as finalized");
return;
}
BlockHeader newFinalizedHeader =
blockchain
.getBlockHeader(newFinalized)
.orElseThrow(
() ->
new BackwardSyncException(
"The header " + newFinalized.toHexString() + "not found"));
BlockHeader oldFinalizedHeader =
blockchain
.getBlockHeader(oldFinalized)
.orElseThrow(
() ->
new BackwardSyncException(
"The header " + oldFinalized.toHexString() + "not found"));
LOG.info(
"Updating finalized {} block to new finalized block {}",
oldFinalizedHeader.toLogString(),
newFinalizedHeader.toLogString());
} else {
// Todo: should TTD test be here?
LOG.info("Setting new finalized block to {}", newFinalized);
}
blockchain.setFinalized(newFinalized);
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(context.getEthContext(), count, context.getMetricsSystem());

@ -36,7 +36,6 @@ import org.hyperledger.besu.plugin.services.BesuEvents;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
@ -262,93 +261,6 @@ public class BackwardSyncAlgSpec {
verify(algorithm).executeBackwardAsync(any());
}
@Test
public void successionShouldIgnoreEmptyFinalized() {
final BackwardsSyncAlgorithm backwardsSyncAlgorithm =
new BackwardsSyncAlgorithm(context, firstHeader -> false);
Optional<Hash> finalized = localBlockchain.getFinalized();
assertThat(finalized).isEmpty();
backwardsSyncAlgorithm.runFinalizedSuccessionRule(localBlockchain, Optional.empty());
finalized = localBlockchain.getFinalized();
assertThat(finalized).isEmpty();
}
@Test
public void successionShouldSetFinalizedFromEmpty() {
final BackwardsSyncAlgorithm backwardsSyncAlgorithm =
new BackwardsSyncAlgorithm(context, firstHeader -> false);
Optional<Hash> finalized = localBlockchain.getFinalized();
assertThat(finalized).isEmpty();
backwardsSyncAlgorithm.runFinalizedSuccessionRule(
localBlockchain, Optional.of(localBlockchain.getChainHead().getHash()));
finalized = localBlockchain.getFinalized();
assertThat(finalized).isPresent();
assertThat(finalized).contains(localBlockchain.getChainHead().getHash());
}
@Test
public void successionShouldIgnoreFinalisedWhenNotImportedYet() {
final BackwardsSyncAlgorithm backwardsSyncAlgorithm =
new BackwardsSyncAlgorithm(context, firstHeader -> false);
Optional<Hash> finalized = localBlockchain.getFinalized();
assertThat(finalized).isEmpty();
backwardsSyncAlgorithm.runFinalizedSuccessionRule(
localBlockchain, Optional.of(remoteBlockchain.getChainHead().getHash()));
finalized = localBlockchain.getFinalized();
assertThat(finalized).isEmpty();
}
@Test
public void successionShouldKeepFinalizedWhenNotChanged() {
final BackwardsSyncAlgorithm backwardsSyncAlgorithm =
new BackwardsSyncAlgorithm(context, firstHeader -> false);
Optional<Hash> finalized = localBlockchain.getFinalized();
assertThat(finalized).isEmpty();
backwardsSyncAlgorithm.runFinalizedSuccessionRule(
localBlockchain, Optional.of(localBlockchain.getChainHead().getHash()));
backwardsSyncAlgorithm.runFinalizedSuccessionRule(
localBlockchain, Optional.of(localBlockchain.getChainHead().getHash()));
finalized = localBlockchain.getFinalized();
assertThat(finalized).isPresent();
assertThat(finalized).contains(localBlockchain.getChainHead().getHash());
}
@Test
public void successionShouldUpdateOldFinalizedToNewFinalized() {
final BackwardsSyncAlgorithm backwardsSyncAlgorithm =
new BackwardsSyncAlgorithm(context, firstHeader -> false);
Optional<Hash> finalized = localBlockchain.getFinalized();
assertThat(finalized).isEmpty();
final Hash fin1 = localBlockchain.getBlockByNumber(LOCAL_HEIGHT - 5).orElseThrow().getHash();
backwardsSyncAlgorithm.runFinalizedSuccessionRule(localBlockchain, Optional.of(fin1));
finalized = localBlockchain.getFinalized();
assertThat(finalized).isPresent();
assertThat(finalized).contains(fin1);
final Hash fin2 = localBlockchain.getBlockByNumber(LOCAL_HEIGHT - 3).orElseThrow().getHash();
backwardsSyncAlgorithm.runFinalizedSuccessionRule(localBlockchain, Optional.of(fin2));
finalized = localBlockchain.getFinalized();
assertThat(finalized).isPresent();
assertThat(finalized).contains(fin2);
}
@Test
public void shouldStartForwardSyncIfGenesisIsReached() {
doReturn(true).when(context).isReady();

@ -249,16 +249,7 @@ public class BackwardSyncContextTest {
@Test
public void testUpdatingHead() {
context.updateHeads(null, null);
context.possiblyMoveHead(null);
assertThat(localBlockchain.getChainHeadBlock().getHeader().getNumber()).isEqualTo(LOCAL_HEIGHT);
context.updateHeads(Hash.ZERO, null);
context.possiblyMoveHead(null);
assertThat(localBlockchain.getChainHeadBlock().getHeader().getNumber()).isEqualTo(LOCAL_HEIGHT);
context.updateHeads(localBlockchain.getBlockByNumber(4).orElseThrow().getHash(), null);
context.updateHead(localBlockchain.getBlockByNumber(4).orElseThrow().getHash());
context.possiblyMoveHead(null);
assertThat(localBlockchain.getChainHeadBlock().getHeader().getNumber()).isEqualTo(4);

@ -163,6 +163,17 @@ public class BackwardSyncStepTest {
assertThat(blockHeader).isEqualTo(lookingForBlock.getHeader());
}
@Test
public void shouldNotRequestHeaderIfAlreadyPresent() throws Exception {
BackwardSyncStep step = new BackwardSyncStep(context, createBackwardChain(REMOTE_HEIGHT - 1));
final Block lookingForBlock = getBlockByNumber(LOCAL_HEIGHT);
final CompletableFuture<List<BlockHeader>> future =
step.requestHeaders(lookingForBlock.getHeader().getHash());
assertThat(future.get().isEmpty()).isTrue();
}
@Test
public void shouldRequestHeaderBeforeCurrentHeight() throws Exception {
extendBlockchain(REMOTE_HEIGHT + 1, context.getProtocolContext().getBlockchain());

Loading…
Cancel
Save