|
|
|
@ -234,8 +234,8 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
.map( |
|
|
|
|
storage -> { |
|
|
|
|
LOGGER.trace("obtained worldstate in {}", stopWatch); |
|
|
|
|
StatefulPredicate shouldContinuePredicate = |
|
|
|
|
new StatefulPredicate( |
|
|
|
|
ResponseSizePredicate responseSizePredicate = |
|
|
|
|
new ResponseSizePredicate( |
|
|
|
|
"account", |
|
|
|
|
stopWatch, |
|
|
|
|
maxResponseBytes, |
|
|
|
@ -248,9 +248,13 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
return rlpOutput.encodedSize(); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
final Bytes32 endKeyBytes = range.endKeyHash(); |
|
|
|
|
var shouldContinuePredicate = |
|
|
|
|
new ExceedingPredicate( |
|
|
|
|
new EndKeyExceedsPredicate(endKeyBytes).and(responseSizePredicate)); |
|
|
|
|
|
|
|
|
|
NavigableMap<Bytes32, Bytes> accounts = |
|
|
|
|
storage.streamFlatAccounts( |
|
|
|
|
range.startKeyHash(), range.endKeyHash(), shouldContinuePredicate); |
|
|
|
|
storage.streamFlatAccounts(range.startKeyHash(), shouldContinuePredicate); |
|
|
|
|
|
|
|
|
|
if (accounts.isEmpty() && shouldContinuePredicate.shouldContinue.get()) { |
|
|
|
|
// fetch next account after range, if it exists
|
|
|
|
@ -331,8 +335,8 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
storage -> { |
|
|
|
|
LOGGER.trace("obtained worldstate in {}", stopWatch); |
|
|
|
|
// reusable predicate to limit by rec count and bytes:
|
|
|
|
|
var statefulPredicate = |
|
|
|
|
new StatefulPredicate( |
|
|
|
|
var responsePredicate = |
|
|
|
|
new ResponseSizePredicate( |
|
|
|
|
"storage", |
|
|
|
|
stopWatch, |
|
|
|
|
maxResponseBytes, |
|
|
|
@ -364,9 +368,12 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
new WorldStateProofProvider(new WorldStateStorageCoordinator(storage)); |
|
|
|
|
|
|
|
|
|
for (var forAccountHash : range.hashes()) { |
|
|
|
|
var predicate = |
|
|
|
|
new ExceedingPredicate( |
|
|
|
|
new EndKeyExceedsPredicate(endKeyBytes).and(responsePredicate)); |
|
|
|
|
var accountStorages = |
|
|
|
|
storage.streamFlatStorages( |
|
|
|
|
Hash.wrap(forAccountHash), startKeyBytes, endKeyBytes, statefulPredicate); |
|
|
|
|
Hash.wrap(forAccountHash), startKeyBytes, predicate); |
|
|
|
|
|
|
|
|
|
//// address partial range queries that return empty
|
|
|
|
|
if (accountStorages.isEmpty() && isPartialRange) { |
|
|
|
@ -386,7 +393,7 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
|
|
|
|
|
// if a partial storage range was requested, or we interrupted storage due to
|
|
|
|
|
// request limits, send proofs:
|
|
|
|
|
if (isPartialRange || !statefulPredicate.shouldGetMore()) { |
|
|
|
|
if (isPartialRange || !predicate.shouldGetMore()) { |
|
|
|
|
// send a proof for the left side range origin
|
|
|
|
|
proofNodes.addAll( |
|
|
|
|
worldStateProof.getStorageProofRelatedNodes( |
|
|
|
@ -403,7 +410,7 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!statefulPredicate.shouldGetMore()) { |
|
|
|
|
if (!predicate.shouldGetMore()) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -462,7 +469,7 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
if (optCode.isPresent()) { |
|
|
|
|
if (!codeBytes.isEmpty() |
|
|
|
|
&& (sumListBytes(codeBytes) + optCode.get().size() > maxResponseBytes |
|
|
|
|
|| stopWatch.getTime() > StatefulPredicate.MAX_MILLIS_PER_REQUEST)) { |
|
|
|
|
|| stopWatch.getTime() > ResponseSizePredicate.MAX_MILLIS_PER_REQUEST)) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
codeBytes.add(optCode.get()); |
|
|
|
@ -521,7 +528,8 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
var trieNode = optStorage.orElse(Bytes.EMPTY); |
|
|
|
|
if (!trieNodes.isEmpty() |
|
|
|
|
&& (sumListBytes(trieNodes) + trieNode.size() > maxResponseBytes |
|
|
|
|
|| stopWatch.getTime() > StatefulPredicate.MAX_MILLIS_PER_REQUEST)) { |
|
|
|
|
|| stopWatch.getTime() |
|
|
|
|
> ResponseSizePredicate.MAX_MILLIS_PER_REQUEST)) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
trieNodes.add(trieNode); |
|
|
|
@ -578,7 +586,39 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static class StatefulPredicate implements Predicate<Pair<Bytes32, Bytes>> { |
|
|
|
|
/** |
|
|
|
|
* Predicate that doesn't immediately stop when the delegate predicate returns false, but instead |
|
|
|
|
* sets a flag to stop after the current element is processed. |
|
|
|
|
*/ |
|
|
|
|
static class ExceedingPredicate implements Predicate<Pair<Bytes32, Bytes>> { |
|
|
|
|
private final Predicate<Pair<Bytes32, Bytes>> delegate; |
|
|
|
|
final AtomicBoolean shouldContinue = new AtomicBoolean(true); |
|
|
|
|
|
|
|
|
|
public ExceedingPredicate(final Predicate<Pair<Bytes32, Bytes>> delegate) { |
|
|
|
|
this.delegate = delegate; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public boolean test(final Pair<Bytes32, Bytes> pair) { |
|
|
|
|
final boolean result = delegate.test(pair); |
|
|
|
|
return shouldContinue.getAndSet(result); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public boolean shouldGetMore() { |
|
|
|
|
return shouldContinue.get(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** Predicate that stops when the end key is exceeded. */ |
|
|
|
|
record EndKeyExceedsPredicate(Bytes endKey) implements Predicate<Pair<Bytes32, Bytes>> { |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public boolean test(final Pair<Bytes32, Bytes> pair) { |
|
|
|
|
return endKey.compareTo(Bytes.wrap(pair.getFirst())) > 0; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static class ResponseSizePredicate implements Predicate<Pair<Bytes32, Bytes>> { |
|
|
|
|
// default to a max of 4 seconds per request
|
|
|
|
|
static final long MAX_MILLIS_PER_REQUEST = 4000; |
|
|
|
|
|
|
|
|
@ -588,26 +628,19 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
final Function<Pair<Bytes32, Bytes>, Integer> encodingSizeAccumulator; |
|
|
|
|
final StopWatch stopWatch; |
|
|
|
|
final int maxResponseBytes; |
|
|
|
|
// TODO: remove this hack, 10% is a fudge factor to account for the proof node size
|
|
|
|
|
final int maxResponseBytesFudgeFactor; |
|
|
|
|
final String forWhat; |
|
|
|
|
|
|
|
|
|
StatefulPredicate( |
|
|
|
|
ResponseSizePredicate( |
|
|
|
|
final String forWhat, |
|
|
|
|
final StopWatch stopWatch, |
|
|
|
|
final int maxResponseBytes, |
|
|
|
|
final Function<Pair<Bytes32, Bytes>, Integer> encodingSizeAccumulator) { |
|
|
|
|
this.stopWatch = stopWatch; |
|
|
|
|
this.maxResponseBytes = maxResponseBytes; |
|
|
|
|
this.maxResponseBytesFudgeFactor = maxResponseBytes * 9 / 10; |
|
|
|
|
this.forWhat = forWhat; |
|
|
|
|
this.encodingSizeAccumulator = encodingSizeAccumulator; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public boolean shouldGetMore() { |
|
|
|
|
return shouldContinue.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public boolean test(final Pair<Bytes32, Bytes> pair) { |
|
|
|
|
LOGGER |
|
|
|
@ -628,14 +661,11 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var hasNoRecords = recordLimit.get() == 0; |
|
|
|
|
var underRecordLimit = recordLimit.addAndGet(1) <= MAX_ENTRIES_PER_REQUEST; |
|
|
|
|
var underByteLimit = |
|
|
|
|
byteLimit.accumulateAndGet(0, (cur, __) -> cur + encodingSizeAccumulator.apply(pair)) |
|
|
|
|
< maxResponseBytesFudgeFactor; |
|
|
|
|
// Only enforce limits when we have at least 1 record as the snapsync spec
|
|
|
|
|
// requires at least 1 record must be returned
|
|
|
|
|
if (hasNoRecords || (underRecordLimit && underByteLimit)) { |
|
|
|
|
< maxResponseBytes; |
|
|
|
|
if (underRecordLimit && underByteLimit) { |
|
|
|
|
return true; |
|
|
|
|
} else { |
|
|
|
|
shouldContinue.set(false); |
|
|
|
|