Add fast sync status to eth_syncing and EthQL query (#565)

The EthQL API spec specifies pulledStates and knownStates in the
`syncing` query. Previously we always returned null. This plumbs through
the needed data so that the synchronization states can report the
fast sync progress via EthQL, as well as the `eth_syncing` JSON-RPC.

Signed-off-by: Danno Ferrin <danno.ferrin@gmail.com>
pull/588/head
Danno Ferrin 5 years ago committed by GitHub
parent 5e368dd7e6
commit 6d7d525c58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CHANGELOG.md
  2. 6
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/SyncStateAdapter.java
  3. 18
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/results/SyncingResult.java
  4. 2
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/graphql/AbstractEthGraphQLHttpServiceTest.java
  5. 31
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpServiceTest.java
  6. 3
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/health/ReadinessCheckTest.java
  7. 20
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthSyncingTest.java
  8. 3
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java
  9. 4
      ethereum/api/src/test/resources/org/hyperledger/besu/ethereum/api/graphql/eth_syncing.json
  10. 21
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/DefaultSyncStatus.java
  11. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactory.java
  12. 19
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java
  13. 18
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CompleteTaskStep.java
  14. 26
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadStatus.java
  15. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloader.java
  16. 2
      plugin-api/build.gradle
  17. 26
      plugin-api/src/main/java/org/hyperledger/besu/plugin/data/SyncStatus.java

@ -4,7 +4,7 @@
### Additions and Improvements
-
- Added `pulledStates` and `knownStates` to the EthQL `syncing` query and `eth_syncing` JSON-RPC api [\#565](https://github.com/hyperledger/besu/pull/565)
### Bug Fixes
@ -1757,7 +1757,7 @@ If using the URL `http://127.0.0.1` to make JSON-RPC calls, use `--host-whitelis
If your application publishes RPC ports, specify the hostnames when starting Besu. For example:
```json
```bash
pantheon --host-whitelist=example.com
```

@ -39,12 +39,10 @@ public class SyncStateAdapter {
}
public Optional<Long> getPulledStates() {
// currently synchronizer has no this information
return Optional.empty();
return syncStatus.getPulledStates();
}
public Optional<Long> getKnownStates() {
// currently synchronizer has no this information
return Optional.empty();
return syncStatus.getKnownStates();
}
}

@ -19,6 +19,8 @@ import org.hyperledger.besu.plugin.data.SyncStatus;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
@JsonPropertyOrder({"startingBlock", "currentBlock", "highestBlock"})
@ -27,12 +29,16 @@ public class SyncingResult implements JsonRpcResult {
private final String startingBlock;
private final String currentBlock;
private final String highestBlock;
private final String pullStates;
private final String knownStates;
public SyncingResult(final SyncStatus syncStatus) {
this.startingBlock = Quantity.create(syncStatus.getStartingBlock());
this.currentBlock = Quantity.create(syncStatus.getCurrentBlock());
this.highestBlock = Quantity.create(syncStatus.getHighestBlock());
this.pullStates = syncStatus.getPulledStates().map(Quantity::create).orElse(null);
this.knownStates = syncStatus.getKnownStates().map(Quantity::create).orElse(null);
}
@JsonGetter(value = "startingBlock")
@ -50,6 +56,18 @@ public class SyncingResult implements JsonRpcResult {
return highestBlock;
}
@JsonInclude(value = Include.NON_NULL)
@JsonGetter(value = "pulledStates")
public String getPullStates() {
return pullStates;
}
@JsonInclude(value = Include.NON_NULL)
@JsonGetter(value = "knownStates")
public String getKnownStates() {
return knownStates;
}
@Override
public boolean equals(final Object other) {
if (!(other instanceof SyncingResult)) {

@ -118,7 +118,7 @@ public abstract class AbstractEthGraphQLHttpServiceTest {
@Before
public void setupTest() throws Exception {
final Synchronizer synchronizerMock = Mockito.mock(Synchronizer.class);
final SyncStatus status = new DefaultSyncStatus(1, 2, 3);
final SyncStatus status = new DefaultSyncStatus(1, 2, 3, Optional.of(4L), Optional.of(5L));
Mockito.when(synchronizerMock.getSyncStatus()).thenReturn(Optional.of(status));
final EthHashMiningCoordinator miningCoordinatorMock =

@ -1858,7 +1858,8 @@ public class JsonRpcHttpServiceTest {
@Test
public void ethSyncingResultIsPresent() throws Exception {
final SyncStatus testResult = new DefaultSyncStatus(1L, 8L, 7L);
final SyncStatus testResult =
new DefaultSyncStatus(1L, 8L, 7L, Optional.empty(), Optional.empty());
when(synchronizer.getSyncStatus()).thenReturn(Optional.of(testResult));
final String id = "999";
final RequestBody body =
@ -1879,6 +1880,34 @@ public class JsonRpcHttpServiceTest {
}
}
@Test
public void ethFastSyncingResultIsPresent() throws Exception {
final SyncStatus testResult =
new DefaultSyncStatus(1L, 8L, 7L, Optional.of(6L), Optional.of(5L));
when(synchronizer.getSyncStatus()).thenReturn(Optional.of(testResult));
final String id = "999";
final RequestBody body =
RequestBody.create(
JSON,
"{\"jsonrpc\":\"2.0\",\"id\":" + Json.encode(id) + ",\"method\":\"eth_syncing\"}");
try (final Response resp = client.newCall(buildPostRequest(body)).execute()) {
final String respBody = resp.body().string();
final JsonObject json = new JsonObject(respBody);
final JsonObject result = json.getJsonObject("result");
final long startingBlock = Long.decode(result.getString("startingBlock"));
assertThat(startingBlock).isEqualTo(1L);
final long currentBlock = Long.decode(result.getString("currentBlock"));
assertThat(currentBlock).isEqualTo(8L);
final long highestBlock = Long.decode(result.getString("highestBlock"));
assertThat(highestBlock).isEqualTo(7L);
final long pulledStates = Long.decode(result.getString("pulledStates"));
assertThat(pulledStates).isEqualTo(6L);
final long knownStates = Long.decode(result.getString("knownStates"));
assertThat(knownStates).isEqualTo(5L);
}
}
public BlockWithMetadata<TransactionWithMetadata, Hash> blockWithMetadata(final Block block) {
final Difficulty td = block.getHeader().getDifficulty().add(10L);
final int size = block.calculateSize();

@ -143,6 +143,7 @@ public class ReadinessCheckTest {
}
private Optional<SyncStatus> createSyncStatus(final int currentBlock, final int highestBlock) {
return Optional.of(new DefaultSyncStatus(0, currentBlock, highestBlock));
return Optional.of(
new DefaultSyncStatus(0, currentBlock, highestBlock, Optional.empty(), Optional.empty()));
}
}

@ -71,7 +71,25 @@ public class EthSyncingTest {
@Test
public void shouldReturnExpectedValueWhenSyncStatusIsNotEmpty() {
final JsonRpcRequestContext request = requestWithParams();
final SyncStatus expectedSyncStatus = new DefaultSyncStatus(0, 1, 2);
final SyncStatus expectedSyncStatus =
new DefaultSyncStatus(0, 1, 2, Optional.empty(), Optional.empty());
final JsonRpcResponse expectedResponse =
new JsonRpcSuccessResponse(
request.getRequest().getId(), new SyncingResult(expectedSyncStatus));
final Optional<SyncStatus> optionalSyncStatus = Optional.of(expectedSyncStatus);
when(synchronizer.getSyncStatus()).thenReturn(optionalSyncStatus);
final JsonRpcResponse actualResponse = method.response(request);
assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse);
verify(synchronizer).getSyncStatus();
verifyNoMoreInteractions(synchronizer);
}
@Test
public void shouldReturnExpectedValueWhenFastSyncStatusIsNotEmpty() {
final JsonRpcRequestContext request = requestWithParams();
final SyncStatus expectedSyncStatus =
new DefaultSyncStatus(0, 1, 2, Optional.of(3L), Optional.of(4L));
final JsonRpcResponse expectedResponse =
new JsonRpcSuccessResponse(
request.getRequest().getId(), new SyncingResult(expectedSyncStatus));

@ -63,7 +63,8 @@ public class SyncingSubscriptionServiceTest {
final SyncingSubscription subscription =
new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING);
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);
final Optional<SyncStatus> syncStatus = Optional.of(new DefaultSyncStatus(0L, 1L, 3L));
final Optional<SyncStatus> syncStatus =
Optional.of(new DefaultSyncStatus(0L, 1L, 3L, Optional.empty(), Optional.empty()));
final JsonRpcResult expectedSyncingResult = new SyncingResult(syncStatus.get());
doAnswer(

@ -8,8 +8,8 @@
"startingBlock" : 1,
"currentBlock" : 2,
"highestBlock" : 3,
"pulledStates" : null,
"knownStates" : null
"pulledStates" : 4,
"knownStates" : 5
}
}
},

@ -15,18 +15,27 @@
package org.hyperledger.besu.ethereum.core;
import java.util.Objects;
import java.util.Optional;
public final class DefaultSyncStatus implements org.hyperledger.besu.plugin.data.SyncStatus {
private final long startingBlock;
private final long currentBlock;
private final long highestBlock;
private final Optional<Long> pulledStates;
private final Optional<Long> knownStates;
public DefaultSyncStatus(
final long startingBlock, final long currentBlock, final long highestBlock) {
final long startingBlock,
final long currentBlock,
final long highestBlock,
final Optional<Long> pulledStates,
final Optional<Long> knownStates) {
this.startingBlock = startingBlock;
this.currentBlock = currentBlock;
this.highestBlock = highestBlock;
this.pulledStates = pulledStates;
this.knownStates = knownStates;
}
@Override
@ -44,6 +53,16 @@ public final class DefaultSyncStatus implements org.hyperledger.besu.plugin.data
return highestBlock;
}
@Override
public Optional<Long> getPulledStates() {
return pulledStates;
}
@Override
public Optional<Long> getKnownStates() {
return knownStates;
}
@Override
public boolean equals(final Object o) {
if (this == o) {

@ -60,7 +60,7 @@ public class FastDownloaderFactory {
new FastSyncStateStorage(fastSyncDataDirectory);
final FastSyncState fastSyncState =
fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule));
if (!fastSyncState.getPivotBlockHeader().isPresent()
if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
!= BlockHeader.GENESIS_BLOCK_NUMBER) {
LOG.info(
@ -98,6 +98,7 @@ public class FastDownloaderFactory {
taskCollection,
fastSyncDataDirectory,
fastSyncState);
syncState.setWorldStateDownloadStatus(worldStateDownloader);
return Optional.of(fastSyncDownloader);
}

@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener;
import org.hyperledger.besu.ethereum.eth.manager.ChainHeadEstimate;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadStatus;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
@ -43,6 +44,7 @@ public class SyncState {
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
private volatile long chainHeightListenerId;
private volatile Optional<SyncTarget> syncTarget = Optional.empty();
private Optional<WorldStateDownloadStatus> worldStateDownloadStatus = Optional.empty();
public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
this.blockchain = blockchain;
@ -111,6 +113,10 @@ public class SyncState {
replaceSyncTarget(Optional.of(syncTarget));
}
public void setWorldStateDownloadStatus(final WorldStateDownloadStatus worldStateDownloadStatus) {
this.worldStateDownloadStatus = Optional.ofNullable(worldStateDownloadStatus);
}
public boolean isInSync() {
return isInSync(Synchronizer.DEFAULT_IN_SYNC_TOLERANCE);
}
@ -183,7 +189,12 @@ public class SyncState {
final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber();
final long commonAncestor = target.commonAncestor().getNumber();
final long highestKnownBlock = bestChainHeight(chainHeadBlockNumber);
return new DefaultSyncStatus(commonAncestor, chainHeadBlockNumber, highestKnownBlock);
return new DefaultSyncStatus(
commonAncestor,
chainHeadBlockNumber,
highestKnownBlock,
worldStateDownloadStatus.flatMap(WorldStateDownloadStatus::getPulledStates),
worldStateDownloadStatus.flatMap(WorldStateDownloadStatus::getKnownStates));
});
}
@ -211,9 +222,9 @@ public class SyncState {
}
private synchronized void checkInSync() {
ChainHead localChain = getLocalChainHead();
Optional<ChainHeadEstimate> syncTargetChain = getSyncTargetChainHead();
Optional<ChainHeadEstimate> bestPeerChain = getBestPeerChainHead();
final ChainHead localChain = getLocalChainHead();
final Optional<ChainHeadEstimate> syncTargetChain = getSyncTargetChainHead();
final Optional<ChainHeadEstimate> bestPeerChain = getBestPeerChainHead();
inSyncTrackers
.values()

@ -36,7 +36,6 @@ public class CompleteTaskStep {
private final Counter retriedRequestsCounter;
private final LongSupplier worldStatePendingRequestsCurrentSupplier;
private final DecimalFormat doubleFormatter = new DecimalFormat("#.##");
private double estimatedWorldStateCompletion;
public CompleteTaskStep(
final WorldStateStorage worldStateStorage,
@ -80,16 +79,23 @@ public class CompleteTaskStep {
private void displayWorldStateSyncProgress() {
LOG.info(
"Downloaded {} world state nodes. At least {} nodes remaining. Estimated World State completion: {} %.",
completedRequestsCounter.get(),
getCompletedRequests(),
worldStatePendingRequestsCurrentSupplier.getAsLong(),
doubleFormatter.format(computeWorldStateSyncProgress() * 100.0));
}
public double computeWorldStateSyncProgress() {
final double pendingRequests = worldStatePendingRequestsCurrentSupplier.getAsLong();
final double completedRequests = completedRequestsCounter.get();
estimatedWorldStateCompletion = completedRequests / (completedRequests + pendingRequests);
return this.estimatedWorldStateCompletion;
final double pendingRequests = getPendingRequests();
final double completedRequests = getCompletedRequests();
return completedRequests / (completedRequests + pendingRequests);
}
long getCompletedRequests() {
return completedRequestsCounter.get();
}
long getPendingRequests() {
return worldStatePendingRequestsCurrentSupplier.getAsLong();
}
private void enqueueChildren(

@ -0,0 +1,26 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.ethereum.eth.sync.worldstate;
import java.util.Optional;
public interface WorldStateDownloadStatus {
public Optional<Long> getPulledStates();
public Optional<Long> getKnownStates();
}

@ -23,6 +23,7 @@ import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.tasks.CachingTaskCollection;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@ -31,7 +32,7 @@ import java.util.function.IntSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class WorldStateDownloader {
public class WorldStateDownloader implements WorldStateDownloadStatus {
private static final Logger LOG = LogManager.getLogger();
private final long minMillisBeforeStalling;
@ -47,6 +48,8 @@ public class WorldStateDownloader {
private final AtomicReference<WorldDownloadState> downloadState = new AtomicReference<>();
private Optional<CompleteTaskStep> maybeCompleteTask = Optional.empty();
public WorldStateDownloader(
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
@ -123,6 +126,8 @@ public class WorldStateDownloader {
newDownloadState.enqueueRequest(NodeDataRequest.createAccountDataRequest(stateRoot));
}
maybeCompleteTask =
Optional.of(new CompleteTaskStep(worldStateStorage, metricsSystem, taskCollection::size));
final WorldStateDownloadProcess downloadProcess =
WorldStateDownloadProcess.builder()
.hashCountPerRequest(hashCountPerRequest)
@ -130,8 +135,7 @@ public class WorldStateDownloader {
.loadLocalDataStep(new LoadLocalDataStep(worldStateStorage, metricsSystem))
.requestDataStep(new RequestDataStep(ethContext, metricsSystem))
.persistDataStep(new PersistDataStep(worldStateStorage))
.completeTaskStep(
new CompleteTaskStep(worldStateStorage, metricsSystem, taskCollection::size))
.completeTaskStep(maybeCompleteTask.get())
.downloadState(newDownloadState)
.pivotBlockHeader(header)
.metricsSystem(metricsSystem)
@ -151,4 +155,14 @@ public class WorldStateDownloader {
}
}
}
@Override
public Optional<Long> getPulledStates() {
return maybeCompleteTask.map(CompleteTaskStep::getCompletedRequests);
}
@Override
public Optional<Long> getKnownStates() {
return maybeCompleteTask.map(task -> task.getCompletedRequests() + task.getPendingRequests());
}
}

@ -65,7 +65,7 @@ Calculated : ${currentHash}
tasks.register('checkAPIChanges', FileStateChecker) {
description = "Checks that the API for the Plugin-API project does not change without deliberate thought"
files = sourceSets.main.allJava.files
knownHash = '+vZGHbG9d/X21cBYgDMG8rinX0gJiATbSsmWtaeYvo0='
knownHash = 'xsbRG+RsUybzMTULPcsLugarMHFTU7ohQBKw44x2/zQ='
}
check.dependsOn('checkAPIChanges')

@ -14,6 +14,10 @@
*/
package org.hyperledger.besu.plugin.data;
import org.hyperledger.besu.plugin.Unstable;
import java.util.Optional;
public interface SyncStatus {
/**
@ -36,4 +40,26 @@ public interface SyncStatus {
* @return the height of the highest known block.
*/
long getHighestBlock();
/**
* PulledStates is the number of state entries fetched so far, or empty if this is not known or
* not relevant.
*
* @return count of pulled states
*/
@Unstable
default Optional<Long> getPulledStates() {
return Optional.empty();
};
/**
* KnownStates is the number of states the node knows of so far, or empty if this is not known or
* not relevant.
*
* @return count of known states
*/
@Unstable
default Optional<Long> getKnownStates() {
return Optional.empty();
};
}

Loading…
Cancel
Save