Backward sync log UX improvements (#4655)

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/4664/head
Fabio Di Fabio 2 years ago committed by GitHub
parent d95cea338f
commit af04357874
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 8
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java
  3. 7
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineNewPayload.java
  4. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java
  5. 155
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java
  6. 38
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStep.java
  7. 26
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardsSyncAlgorithm.java
  8. 19
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java
  9. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java

@ -11,6 +11,7 @@
- Upgrade RocksDB version from 7.6.0 to 7.7.3
- Added new RPC endpoints `debug_setHead` & `debug_replayBlock [4580](https://github.com/hyperledger/besu/pull/4580)
- Upgrade OpenTelemetry to version 1.19.0 [#3675](https://github.com/hyperledger/besu/pull/3675)
- Backward sync log UX improvements [#4655](https://github.com/hyperledger/besu/pull/4655)
### Bug Fixes

@ -390,7 +390,11 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
}
private Void logSyncException(final Hash blockHash, final Throwable exception) {
LOG.warn("Sync to block hash " + blockHash.toHexString() + " failed", exception.getMessage());
debugLambda(
LOG,
"Sync to block hash {} failed, reason {}",
blockHash::toHexString,
exception::getMessage);
return null;
}
@ -439,7 +443,7 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
if (newHead.getNumber() < blockchain.getChainHeadBlockNumber()
&& isDescendantOf(newHead, blockchain.getChainHeadHeader())) {
LOG.info("Ignoring update to old head");
debugLambda(LOG, "Ignoring update to old head {}", newHead::toLogString);
return ForkchoiceResult.withIgnoreUpdateToOldHead(newHead);
}

@ -180,7 +180,6 @@ public class EngineNewPayload extends ExecutionEngineJsonRpcMethod {
final var block =
new Block(newBlockHeader, new BlockBody(transactions, Collections.emptyList()));
final String warningMessage = "Sync to block " + block.toLogString() + " failed";
if (mergeContext.get().isSyncing() || parentHeader.isEmpty()) {
LOG.debug(
@ -192,7 +191,11 @@ public class EngineNewPayload extends ExecutionEngineJsonRpcMethod {
.appendNewPayloadToSync(block)
.exceptionally(
exception -> {
LOG.warn(warningMessage, exception.getMessage());
debugLambda(
LOG,
"Sync to block {} failed, reason {}",
block::toLogString,
exception::getMessage);
return null;
});
return respondWith(reqId, blockParam, null, SYNCING);

@ -102,8 +102,11 @@ public class RetryingGetBlockFromPeersTask
this::getAssignedPeer,
this::getRetryCount);
} else {
LOG.warn(
"Failed to get block {} after {} retries", logBlockNumberMaybeHash(), getRetryCount());
debugLambda(
LOG,
"Failed to get block {} after {} retries",
this::logBlockNumberMaybeHash,
this::getRetryCount);
}
super.handleTaskError(error);
}

@ -47,7 +47,7 @@ public class BackwardSyncContext {
private static final Logger LOG = LoggerFactory.getLogger(BackwardSyncContext.class);
public static final int BATCH_SIZE = 200;
private static final int DEFAULT_MAX_RETRIES = 20;
private static final long MILLIS_DELAY_BETWEEN_PROGRESS_LOG = 10_000L;
private static final long DEFAULT_MILLIS_BETWEEN_RETRIES = 5000;
protected final ProtocolContext protocolContext;
@ -55,9 +55,7 @@ public class BackwardSyncContext {
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final SyncState syncState;
private final AtomicReference<CompletableFuture<Void>> currentBackwardSyncFuture =
new AtomicReference<>();
private final AtomicReference<Status> currentBackwardSyncStatus = new AtomicReference<>();
private final BackwardChain backwardChain;
private int batchSize = BATCH_SIZE;
private Optional<Hash> maybeFinalized = Optional.empty();
@ -105,8 +103,8 @@ public class BackwardSyncContext {
}
public synchronized boolean isSyncing() {
return Optional.ofNullable(currentBackwardSyncFuture.get())
.map(CompletableFuture::isDone)
return Optional.ofNullable(currentBackwardSyncStatus.get())
.map(status -> status.currentFuture.isDone())
.orElse(Boolean.FALSE);
}
@ -124,33 +122,51 @@ public class BackwardSyncContext {
}
public synchronized CompletableFuture<Void> syncBackwardsUntil(final Hash newBlockHash) {
Optional<CompletableFuture<Void>> maybeFuture =
Optional.ofNullable(this.currentBackwardSyncFuture.get());
Optional<Status> maybeCurrentStatus = Optional.ofNullable(this.currentBackwardSyncStatus.get());
if (isTrusted(newBlockHash)) {
return maybeFuture.orElseGet(() -> CompletableFuture.completedFuture(null));
return maybeCurrentStatus
.map(
status -> {
backwardChain
.getBlock(newBlockHash)
.ifPresent(block -> status.updateTargetHeight(block.getHeader().getNumber()));
return status.currentFuture;
})
.orElseGet(() -> CompletableFuture.completedFuture(null));
}
backwardChain.addNewHash(newBlockHash);
return maybeFuture.orElseGet(
() -> {
CompletableFuture<Void> future = prepareBackwardSyncFutureWithRetry();
this.currentBackwardSyncFuture.set(future);
return future;
});
return maybeCurrentStatus
.map(Status::getCurrentFuture)
.orElseGet(
() -> {
LOG.info("Starting a new backward sync session");
Status status = new Status(prepareBackwardSyncFutureWithRetry());
this.currentBackwardSyncStatus.set(status);
return status.currentFuture;
});
}
public synchronized CompletableFuture<Void> syncBackwardsUntil(final Block newPivot) {
Optional<CompletableFuture<Void>> maybeFuture =
Optional.ofNullable(this.currentBackwardSyncFuture.get());
Optional<Status> maybeCurrentStatus = Optional.ofNullable(this.currentBackwardSyncStatus.get());
if (isTrusted(newPivot.getHash())) {
return maybeFuture.orElseGet(() -> CompletableFuture.completedFuture(null));
return maybeCurrentStatus
.map(Status::getCurrentFuture)
.orElseGet(() -> CompletableFuture.completedFuture(null));
}
backwardChain.appendTrustedBlock(newPivot);
return maybeFuture.orElseGet(
() -> {
CompletableFuture<Void> future = prepareBackwardSyncFutureWithRetry();
this.currentBackwardSyncFuture.set(future);
return future;
});
return maybeCurrentStatus
.map(Status::getCurrentFuture)
.orElseGet(
() -> {
LOG.info("Starting a new backward sync session");
LOG.info("Backward sync target block is {}", newPivot.toLogString());
Status status = new Status(prepareBackwardSyncFutureWithRetry());
status.setSyncRange(
getProtocolContext().getBlockchain().getChainHeadBlockNumber(),
newPivot.getHeader().getNumber());
this.currentBackwardSyncStatus.set(status);
return status.currentFuture;
});
}
private boolean isTrusted(final Hash hash) {
@ -168,8 +184,9 @@ public class BackwardSyncContext {
return prepareBackwardSyncFutureWithRetry(maxRetries)
.handle(
(unused, throwable) -> {
this.currentBackwardSyncFuture.set(null);
this.currentBackwardSyncStatus.set(null);
if (throwable != null) {
LOG.info("Current backward sync session failed, it will be restarted");
throw extractBackwardSyncException(throwable)
.orElse(new BackwardSyncException(throwable));
}
@ -201,8 +218,8 @@ public class BackwardSyncContext {
.ifPresentOrElse(
backwardSyncException -> {
if (backwardSyncException.shouldRestart()) {
LOG.info(
"Backward sync failed ({}). Current Peers: {}. Retrying in {} milliseconds...",
LOG.debug(
"Backward sync failed ({}). Current Peers: {}. Retrying in {} milliseconds",
throwable.getMessage(),
ethContext.getEthPeers().peerCount(),
millisBetweenRetries);
@ -213,8 +230,8 @@ public class BackwardSyncContext {
}
},
() -> {
LOG.warn(
"Backward sync failed ({}). Current Peers: {}. Retrying in {} milliseconds...",
LOG.debug(
"Backward sync failed ({}). Current Peers: {}. Retrying in {} milliseconds",
throwable.getMessage(),
ethContext.getEthPeers().peerCount(),
millisBetweenRetries);
@ -278,10 +295,6 @@ public class BackwardSyncContext {
&& syncState.isInitialSyncPhaseDone();
}
public CompletableFuture<Void> stop() {
return currentBackwardSyncFuture.get();
}
public void subscribeBadChainListener(final BadChainListener badChainListener) {
badChainListeners.subscribe(badChainListener);
}
@ -316,6 +329,7 @@ public class BackwardSyncContext {
.getBlockchain()
.appendBlock(block, optResult.getYield().get().getReceipts());
possiblyMoveHead(block);
logBlockImportProgress(block.getHeader().getNumber());
} else {
emitBadChainEvent(block);
throw new BackwardSyncException(
@ -365,6 +379,10 @@ public class BackwardSyncContext {
.findFirst();
}
public Status getStatus() {
return currentBackwardSyncStatus.get();
}
private void emitBadChainEvent(final Block badBlock) {
final List<Block> badBlockDescendants = new ArrayList<>();
final List<BlockHeader> badBlockHeaderDescendants = new ArrayList<>();
@ -385,4 +403,75 @@ public class BackwardSyncContext {
badChainListeners.forEach(
listener -> listener.onBadChain(badBlock, badBlockDescendants, badBlockHeaderDescendants));
}
private void logBlockImportProgress(final long currImportedHeight) {
final Status currentStatus = getStatus();
final long targetHeight = currentStatus.getTargetChainHeight();
final long initialHeight = currentStatus.getInitialChainHeight();
final long estimatedTotal = targetHeight - initialHeight;
final long imported = currImportedHeight - initialHeight;
final float completedPercentage = 100.0f * imported / estimatedTotal;
if (completedPercentage < 100.0f) {
if (currentStatus.progressLogDue()) {
LOG.info(
String.format(
"Backward sync phase 2 of 2, %.2f%% completed, imported %d blocks of at least %d (current head %d, target head %d). Peers: %d",
completedPercentage,
imported,
estimatedTotal,
currImportedHeight,
currentStatus.getTargetChainHeight(),
getEthContext().getEthPeers().peerCount()));
}
} else {
LOG.info(
String.format(
"Backward sync phase 2 of 2 completed, imported a total of %d blocks. Peers: %d",
imported, getEthContext().getEthPeers().peerCount()));
}
}
static class Status {
private final CompletableFuture<Void> currentFuture;
private long targetChainHeight;
private long initialChainHeight;
private static long lastLogAt = 0;
public Status(final CompletableFuture<Void> currentFuture) {
this.currentFuture = currentFuture;
}
public void setSyncRange(final long initialHeight, final long targetHeight) {
initialChainHeight = initialHeight;
targetChainHeight = targetHeight;
}
public void updateTargetHeight(final long newTargetHeight) {
targetChainHeight = newTargetHeight;
}
public boolean progressLogDue() {
final long now = System.currentTimeMillis();
if (now - lastLogAt > MILLIS_DELAY_BETWEEN_PROGRESS_LOG) {
lastLogAt = now;
return true;
}
return false;
}
public CompletableFuture<Void> getCurrentFuture() {
return currentFuture;
}
public long getTargetChainHeight() {
return targetChainHeight;
}
public long getInitialChainHeight() {
return initialChainHeight;
}
}
}

@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@ -61,7 +60,7 @@ public class BackwardSyncStep {
@VisibleForTesting
protected CompletableFuture<List<BlockHeader>> requestHeaders(final Hash hash) {
final int batchSize = context.getBatchSize();
debugLambda(LOG, "Requesting header for hash {}", hash::toHexString);
LOG.debug("Requesting headers for hash {}, with batch size {}", hash, batchSize);
final RetryingGetHeadersEndingAtFromPeerByHashTask
retryingGetHeadersEndingAtFromPeerByHashTask =
@ -101,12 +100,35 @@ public class BackwardSyncStep {
for (BlockHeader blockHeader : blockHeaders) {
saveHeader(blockHeader);
}
infoLambda(
LOG,
"Saved headers {} -> {} (head: {})",
() -> blockHeaders.get(0).getNumber(),
() -> blockHeaders.get(blockHeaders.size() - 1).getNumber(),
() -> context.getProtocolContext().getBlockchain().getChainHead().getHeight());
logProgress(blockHeaders.get(blockHeaders.size() - 1).getNumber());
return null;
}
private void logProgress(final long currLowestDownloadedHeight) {
final long targetHeight = context.getStatus().getTargetChainHeight();
final long initialHeight = context.getStatus().getInitialChainHeight();
final long estimatedTotal = targetHeight - initialHeight;
final long downloaded = targetHeight - currLowestDownloadedHeight;
final float completedPercentage = 100.0f * downloaded / estimatedTotal;
if (completedPercentage < 100.0f) {
if (context.getStatus().progressLogDue()) {
LOG.info(
String.format(
"Backward sync phase 1 of 2, %.2f%% completed, downloaded %d headers of at least %d. Peers: %d",
completedPercentage,
downloaded,
estimatedTotal,
context.getEthContext().getEthPeers().peerCount()));
}
} else {
LOG.info(
String.format(
"Backward sync phase 1 of 2 completed, downloaded a total of %d headers. Peers: %d",
downloaded, context.getEthContext().getEthPeers().peerCount()));
}
}
}

@ -22,6 +22,7 @@ import static org.slf4j.LoggerFactory.getLogger;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import java.util.Optional;
@ -57,11 +58,15 @@ public class BackwardsSyncAlgorithm {
final Optional<Hash> firstHash = context.getBackwardChain().getFirstHashToAppend();
if (firstHash.isPresent()) {
return executeSyncStep(firstHash.get())
.whenComplete(
(result, throwable) -> {
if (throwable == null) {
context.getBackwardChain().removeFromHashToAppend(firstHash.get());
}
.thenAccept(
result -> {
LOG.info("Backward sync target block is {}", result.toLogString());
context.getBackwardChain().removeFromHashToAppend(firstHash.get());
context
.getStatus()
.setSyncRange(
context.getProtocolContext().getBlockchain().getChainHeadBlockNumber(),
result.getHeader().getNumber());
});
}
if (!context.isReady()) {
@ -73,7 +78,7 @@ public class BackwardsSyncAlgorithm {
context.getBackwardChain().getFirstAncestorHeader();
if (possibleFirstAncestorHeader.isEmpty()) {
this.finished = true;
LOG.info("The Backward sync is done...");
LOG.info("The Backward sync is done");
context.getBackwardChain().clear();
return CompletableFuture.completedFuture(null);
}
@ -85,13 +90,16 @@ public class BackwardsSyncAlgorithm {
if (blockchain.getChainHead().getHeight() > firstAncestorHeader.getNumber()) {
debugLambda(
LOG,
"Backward reached below previous head {} : {}",
"Backward reached below current chain head {} : {}",
() -> blockchain.getChainHead().toLogString(),
firstAncestorHeader::toLogString);
}
if (finalBlockConfirmation.ancestorHeaderReached(firstAncestorHeader)) {
LOG.info("Backward sync reached ancestor header, starting Forward sync");
debugLambda(
LOG,
"Backward sync reached ancestor header with {}, starting Forward sync",
firstAncestorHeader::toLogString);
return executeForwardAsync();
}
@ -104,7 +112,7 @@ public class BackwardsSyncAlgorithm {
}
@VisibleForTesting
public CompletableFuture<Void> executeSyncStep(final Hash hash) {
public CompletableFuture<Block> executeSyncStep(final Hash hash) {
return new SyncStepStep(context, context.getBackwardChain()).executeAsync(hash);
}

@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@ -78,6 +77,7 @@ public class ForwardSyncStep {
return run.thenApply(AbstractPeerTask.PeerTaskResult::getResult)
.thenApply(
blocks -> {
LOG.debug("Got {} blocks from peers", blocks.size());
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber()));
return blocks;
});
@ -86,8 +86,8 @@ public class ForwardSyncStep {
@VisibleForTesting
protected Void saveBlocks(final List<Block> blocks) {
if (blocks.isEmpty()) {
LOG.info("No blocks to save...");
context.halveBatchSize();
LOG.debug("No blocks to save, reducing batch size to {}", context.getBatchSize());
return null;
}
@ -97,20 +97,21 @@ public class ForwardSyncStep {
.getProtocolContext()
.getBlockchain()
.getBlockByHash(block.getHeader().getParentHash());
if (parent.isEmpty()) {
context.halveBatchSize();
debugLambda(
LOG,
"Parent block {} not found, while saving block {}, reducing batch size to {}",
block.getHeader().getParentHash()::toString,
block::toLogString,
context::getBatchSize);
return null;
} else {
context.saveBlock(block);
}
}
infoLambda(
LOG,
"Saved blocks {} -> {} (target: {})",
() -> blocks.get(0).getHeader().getNumber(),
() -> blocks.get(blocks.size() - 1).getHeader().getNumber(),
() ->
backwardChain.getPivot().orElse(blocks.get(blocks.size() - 1)).getHeader().getNumber());
context.resetBatchSize();
return null;
}

@ -17,6 +17,7 @@
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.slf4j.LoggerFactory.getLogger;
import org.hyperledger.besu.datatypes.Hash;
@ -41,13 +42,14 @@ public class SyncStepStep {
this.backwardChain = backwardChain;
}
public CompletableFuture<Void> executeAsync(final Hash hash) {
public CompletableFuture<Block> executeAsync(final Hash hash) {
return CompletableFuture.supplyAsync(() -> hash)
.thenCompose(this::requestBlock)
.thenApply(this::saveBlock);
}
private CompletableFuture<Block> requestBlock(final Hash targetHash) {
debugLambda(LOG, "Fetching block by hash {} from peers", targetHash::toString);
final RetryingGetBlockFromPeersTask getBlockTask =
RetryingGetBlockFromPeersTask.create(
context.getProtocolSchedule(),
@ -63,10 +65,9 @@ public class SyncStepStep {
.thenApply(AbstractPeerTask.PeerTaskResult::getResult);
}
private Void saveBlock(final Block block) {
LOG.debug(
"Appending block {}({})", block.getHeader().getNumber(), block.getHash().toHexString());
private Block saveBlock(final Block block) {
debugLambda(LOG, "Appending fetched block {}", block::toLogString);
backwardChain.appendTrustedBlock(block);
return null;
return block;
}
}

Loading…
Cancel
Save