mirror of https://github.com/hyperledger/besu
Batching backward sync (#3532)
* Backward sync now batches requests Signed-off-by: Jiri Peinlich <jiri.peinlich@gmail.com>pull/3617/head
parent
a8d13c31c9
commit
0fce76cc9f
@ -0,0 +1,92 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
|
||||
package org.hyperledger.besu.ethereum.eth.manager.task; |
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull; |
||||
|
||||
import org.hyperledger.besu.datatypes.Hash; |
||||
import org.hyperledger.besu.ethereum.core.BlockHeader; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; |
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
|
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import com.google.common.annotations.VisibleForTesting; |
||||
|
||||
public class RetryingGetHeadersEndingAtFromPeerByHashTask |
||||
extends AbstractRetryingPeerTask<List<BlockHeader>> { |
||||
|
||||
private final Hash referenceHash; |
||||
private final ProtocolSchedule protocolSchedule; |
||||
private final long minimumRequiredBlockNumber; |
||||
private final int count; |
||||
|
||||
@VisibleForTesting |
||||
RetryingGetHeadersEndingAtFromPeerByHashTask( |
||||
final ProtocolSchedule protocolSchedule, |
||||
final EthContext ethContext, |
||||
final Hash referenceHash, |
||||
final long minimumRequiredBlockNumber, |
||||
final int count, |
||||
final MetricsSystem metricsSystem) { |
||||
super(ethContext, 3, List::isEmpty, metricsSystem); |
||||
this.protocolSchedule = protocolSchedule; |
||||
this.minimumRequiredBlockNumber = minimumRequiredBlockNumber; |
||||
this.count = count; |
||||
checkNotNull(referenceHash); |
||||
this.referenceHash = referenceHash; |
||||
} |
||||
|
||||
public static RetryingGetHeadersEndingAtFromPeerByHashTask endingAtHash( |
||||
final ProtocolSchedule protocolSchedule, |
||||
final EthContext ethContext, |
||||
final Hash referenceHash, |
||||
final long minimumRequiredBlockNumber, |
||||
final int count, |
||||
final MetricsSystem metricsSystem) { |
||||
return new RetryingGetHeadersEndingAtFromPeerByHashTask( |
||||
protocolSchedule, |
||||
ethContext, |
||||
referenceHash, |
||||
minimumRequiredBlockNumber, |
||||
count, |
||||
metricsSystem); |
||||
} |
||||
|
||||
@Override |
||||
protected CompletableFuture<List<BlockHeader>> executePeerTask( |
||||
final Optional<EthPeer> assignedPeer) { |
||||
final AbstractGetHeadersFromPeerTask task = |
||||
GetHeadersFromPeerByHashTask.endingAtHash( |
||||
protocolSchedule, |
||||
getEthContext(), |
||||
referenceHash, |
||||
minimumRequiredBlockNumber, |
||||
count, |
||||
getMetricsSystem()); |
||||
assignedPeer.ifPresent(task::assignPeer); |
||||
return executeSubTask(task::run) |
||||
.thenApply( |
||||
peerResult -> { |
||||
result.complete(peerResult.getResult()); |
||||
return peerResult.getResult(); |
||||
}); |
||||
} |
||||
} |
@ -0,0 +1,172 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; |
||||
|
||||
import static org.slf4j.LoggerFactory.getLogger; |
||||
|
||||
import org.hyperledger.besu.datatypes.Hash; |
||||
import org.hyperledger.besu.ethereum.ProtocolContext; |
||||
import org.hyperledger.besu.ethereum.core.Block; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; |
||||
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask; |
||||
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlockFromPeersTask; |
||||
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; |
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
|
||||
import java.time.Duration; |
||||
import java.util.ArrayDeque; |
||||
import java.util.ArrayList; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.Queue; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.function.Function; |
||||
import javax.annotation.concurrent.GuardedBy; |
||||
import javax.annotation.concurrent.ThreadSafe; |
||||
|
||||
import org.slf4j.Logger; |
||||
|
||||
@ThreadSafe |
||||
public class BackwardSyncLookupService { |
||||
private static final Logger LOG = getLogger(BackwardSyncLookupService.class); |
||||
private static final int MAX_RETRIES = 100; |
||||
public static final int UNUSED = -1; |
||||
|
||||
@GuardedBy("this") |
||||
private final Queue<Hash> hashes = new ArrayDeque<>(); |
||||
|
||||
@GuardedBy("this") |
||||
boolean running = false; |
||||
|
||||
private final ProtocolSchedule protocolSchedule; |
||||
private final EthContext ethContext; |
||||
private final MetricsSystem metricsSystem; |
||||
private List<Block> results = new ArrayList<>(); |
||||
private final ProtocolContext protocolContext; |
||||
|
||||
public BackwardSyncLookupService( |
||||
final ProtocolSchedule protocolSchedule, |
||||
final EthContext ethContext, |
||||
final MetricsSystem metricsSystem, |
||||
final ProtocolContext protocolContext) { |
||||
this.protocolSchedule = protocolSchedule; |
||||
this.ethContext = ethContext; |
||||
this.metricsSystem = metricsSystem; |
||||
this.protocolContext = protocolContext; |
||||
} |
||||
|
||||
public CompletableFuture<List<Block>> lookup(final Hash newBlockhash) { |
||||
synchronized (this) { |
||||
hashes.add(newBlockhash); |
||||
if (running) { |
||||
LOG.info( |
||||
"some other future is already running and will process our hash {} when time comes...", |
||||
newBlockhash.toHexString()); |
||||
return CompletableFuture.completedFuture(Collections.emptyList()); |
||||
} |
||||
running = true; |
||||
} |
||||
return findBlocksWithRetries() |
||||
.handle( |
||||
(blocks, throwable) -> { |
||||
synchronized (this) { |
||||
running = false; |
||||
} |
||||
if (throwable != null) { |
||||
throw new BackwardSyncException(throwable); |
||||
} |
||||
return blocks; |
||||
}); |
||||
} |
||||
|
||||
private CompletableFuture<List<Block>> findBlocksWithRetries() { |
||||
|
||||
CompletableFuture<List<Block>> f = tryToFindBlocks(); |
||||
for (int i = 0; i < MAX_RETRIES; i++) { |
||||
f = |
||||
f.thenApply(CompletableFuture::completedFuture) |
||||
.exceptionally( |
||||
ex -> { |
||||
synchronized (this) { |
||||
if (!results.isEmpty()) { |
||||
List<Block> copy = new ArrayList<>(results); |
||||
results = new ArrayList<>(); |
||||
return CompletableFuture.completedFuture(copy); |
||||
} |
||||
} |
||||
LOG.error( |
||||
"Failed to fetch blocks because {} Current peers: {}. Waiting for few seconds ...", |
||||
ex.getMessage(), |
||||
ethContext.getEthPeers().peerCount()); |
||||
return ethContext |
||||
.getScheduler() |
||||
.scheduleFutureTask(this::tryToFindBlocks, Duration.ofSeconds(5)); |
||||
}) |
||||
.thenCompose(Function.identity()); |
||||
} |
||||
return f.thenApply(this::rememberResults).thenCompose(this::possibleNextHash); |
||||
} |
||||
|
||||
private CompletableFuture<List<Block>> tryToFindBlocks() { |
||||
return CompletableFuture.supplyAsync(this::getNextHash) |
||||
.thenCompose(this::tryToFindBlock) |
||||
.thenApply(this::rememberResult) |
||||
.thenCompose(this::possibleNextHash); |
||||
} |
||||
|
||||
private CompletableFuture<List<Block>> possibleNextHash(final List<Block> blocks) { |
||||
synchronized (this) { |
||||
hashes.poll(); |
||||
if (hashes.isEmpty()) { |
||||
results = new ArrayList<>(); |
||||
running = false; |
||||
return CompletableFuture.completedFuture(blocks); |
||||
} |
||||
} |
||||
return tryToFindBlocks(); |
||||
} |
||||
|
||||
private List<Block> rememberResult(final Block block) { |
||||
this.results.add(block); |
||||
return results; |
||||
} |
||||
|
||||
private List<Block> rememberResults(final List<Block> blocks) { |
||||
this.results.addAll(blocks); |
||||
return results; |
||||
} |
||||
|
||||
private synchronized Hash getNextHash() { |
||||
return hashes.peek(); |
||||
} |
||||
|
||||
private CompletableFuture<Block> tryToFindBlock(final Hash targetHash) { |
||||
|
||||
final RetryingGetBlockFromPeersTask getBlockTask = |
||||
RetryingGetBlockFromPeersTask.create( |
||||
protocolContext, |
||||
protocolSchedule, |
||||
ethContext, |
||||
metricsSystem, |
||||
ethContext.getEthPeers().getMaxPeers(), |
||||
Optional.of(targetHash), |
||||
UNUSED); |
||||
return ethContext |
||||
.getScheduler() |
||||
.scheduleSyncWorkerTask(getBlockTask::run) |
||||
.thenApply(AbstractPeerTask.PeerTaskResult::getResult); |
||||
} |
||||
} |
@ -0,0 +1,47 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
|
||||
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Block; |
||||
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions; |
||||
import org.hyperledger.besu.ethereum.rlp.RLP; |
||||
import org.hyperledger.besu.ethereum.rlp.RLPInput; |
||||
|
||||
import org.apache.tuweni.bytes.Bytes; |
||||
|
||||
public class BlocksConvertor implements ValueConvertor<Block> { |
||||
private final BlockHeaderFunctions blockHeaderFunctions; |
||||
|
||||
public BlocksConvertor(final BlockHeaderFunctions blockHeaderFunctions) { |
||||
this.blockHeaderFunctions = blockHeaderFunctions; |
||||
} |
||||
|
||||
public static ValueConvertor<Block> of(final BlockHeaderFunctions blockHeaderFunctions) { |
||||
return new BlocksConvertor(blockHeaderFunctions); |
||||
} |
||||
|
||||
@Override |
||||
public Block fromBytes(final byte[] bytes) { |
||||
|
||||
final RLPInput input = RLP.input(Bytes.wrap(bytes)); |
||||
return Block.readFrom(input, blockHeaderFunctions); |
||||
} |
||||
|
||||
@Override |
||||
public byte[] toBytes(final Block value) { |
||||
return value.toRlp().toArrayUnsafe(); |
||||
} |
||||
} |
@ -0,0 +1,47 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
|
||||
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.BlockHeader; |
||||
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions; |
||||
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput; |
||||
import org.hyperledger.besu.ethereum.rlp.RLP; |
||||
|
||||
import org.apache.tuweni.bytes.Bytes; |
||||
|
||||
public class BlocksHeadersConvertor implements ValueConvertor<BlockHeader> { |
||||
private final BlockHeaderFunctions blockHeaderFunctions; |
||||
|
||||
public BlocksHeadersConvertor(final BlockHeaderFunctions blockHeaderFunctions) { |
||||
this.blockHeaderFunctions = blockHeaderFunctions; |
||||
} |
||||
|
||||
public static ValueConvertor<BlockHeader> of(final BlockHeaderFunctions blockHeaderFunctions) { |
||||
return new BlocksHeadersConvertor(blockHeaderFunctions); |
||||
} |
||||
|
||||
@Override |
||||
public BlockHeader fromBytes(final byte[] bytes) { |
||||
return BlockHeader.readFrom(RLP.input(Bytes.wrap(bytes)), blockHeaderFunctions); |
||||
} |
||||
|
||||
@Override |
||||
public byte[] toBytes(final BlockHeader value) { |
||||
BytesValueRLPOutput output = new BytesValueRLPOutput(); |
||||
value.writeTo(output); |
||||
return output.encoded().toArrayUnsafe(); |
||||
} |
||||
} |
@ -0,0 +1,257 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; |
||||
|
||||
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; |
||||
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda; |
||||
import static org.hyperledger.besu.util.Slf4jLambdaHelper.warnLambda; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Block; |
||||
import org.hyperledger.besu.ethereum.core.BlockHeader; |
||||
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask; |
||||
import org.hyperledger.besu.ethereum.eth.manager.task.GetBlockFromPeerTask; |
||||
import org.hyperledger.besu.ethereum.eth.manager.task.GetBodiesFromPeerTask; |
||||
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode; |
||||
|
||||
import java.util.Collections; |
||||
import java.util.Comparator; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.CompletionStage; |
||||
|
||||
import com.google.common.annotations.VisibleForTesting; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
public class ForwardSyncPhase extends BackwardSyncTask { |
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ForwardSyncPhase.class); |
||||
private int batchSize = BackwardSyncContext.BATCH_SIZE; |
||||
|
||||
public ForwardSyncPhase(final BackwardSyncContext context, final BackwardChain backwardChain) { |
||||
super(context, backwardChain); |
||||
} |
||||
|
||||
@Override |
||||
public CompletableFuture<Void> executeStep() { |
||||
return CompletableFuture.supplyAsync(() -> returnFirstNUnknownHeaders(null)) |
||||
.thenCompose(this::possibleRequestBodies) |
||||
.thenApply(this::processKnownAncestors) |
||||
.thenCompose(this::possiblyMoreForwardSteps); |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected BlockHeader processKnownAncestors(final Void unused) { |
||||
while (backwardChain.getFirstAncestorHeader().isPresent()) { |
||||
BlockHeader header = backwardChain.getFirstAncestorHeader().orElseThrow(); |
||||
if (context.getProtocolContext().getBlockchain().contains(header.getHash())) { |
||||
debugLambda( |
||||
LOG, |
||||
"Block {} is already imported, we can ignore it for the sync process", |
||||
() -> header.getHash().toHexString()); |
||||
backwardChain.dropFirstHeader(); |
||||
} else if (backwardChain.isTrusted(header.getHash())) { |
||||
debugLambda( |
||||
LOG, |
||||
"Importing trusted block {}({})", |
||||
header::getNumber, |
||||
() -> header.getHash().toHexString()); |
||||
saveBlock(backwardChain.getTrustedBlock(header.getHash())); |
||||
} else { |
||||
debugLambda(LOG, "First unprocessed header is {}", header::getNumber); |
||||
return header; |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected List<BlockHeader> returnFirstNUnknownHeaders(final Void unused) { |
||||
while (backwardChain.getFirstAncestorHeader().isPresent()) { |
||||
BlockHeader header = backwardChain.getFirstAncestorHeader().orElseThrow(); |
||||
if (context.getProtocolContext().getBlockchain().contains(header.getHash())) { |
||||
debugLambda( |
||||
LOG, |
||||
"Block {}({}) is already imported, we can ignore it for the sync process", |
||||
() -> header.getNumber(), |
||||
() -> header.getHash().toHexString()); |
||||
backwardChain.dropFirstHeader(); |
||||
} else if (backwardChain.isTrusted(header.getHash())) { |
||||
debugLambda( |
||||
LOG, |
||||
"Block {} was added by consensus layer, we can trust it and should therefore import it.", |
||||
() -> header.getHash().toHexString()); |
||||
saveBlock(backwardChain.getTrustedBlock(header.getHash())); |
||||
} else { |
||||
return backwardChain.getFirstNAncestorHeaders(batchSize); |
||||
} |
||||
} |
||||
return Collections.emptyList(); |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
public CompletableFuture<Void> possibleRequestBlock(final BlockHeader blockHeader) { |
||||
if (blockHeader == null) { |
||||
return CompletableFuture.completedFuture(null); |
||||
} else { |
||||
debugLambda( |
||||
LOG, |
||||
"Requesting body for {} ({})", |
||||
blockHeader::getNumber, |
||||
() -> blockHeader.getHash().toHexString()); |
||||
return requestBlock(blockHeader).thenApply(this::saveBlock); |
||||
} |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
public CompletableFuture<Void> possibleRequestBodies(final List<BlockHeader> blockHeaders) { |
||||
if (blockHeaders.isEmpty()) { |
||||
return CompletableFuture.completedFuture(null); |
||||
} else { |
||||
debugLambda( |
||||
LOG, |
||||
"Requesting {} blocks {}->{} ({})", |
||||
blockHeaders::size, |
||||
() -> blockHeaders.get(0).getNumber(), |
||||
() -> blockHeaders.get(blockHeaders.size() - 1).getNumber(), |
||||
() -> blockHeaders.get(0).getHash().toHexString()); |
||||
return requestBodies(blockHeaders).thenApply(this::saveBlocks); |
||||
} |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected CompletableFuture<Block> requestBlock(final BlockHeader blockHeader) { |
||||
final GetBlockFromPeerTask getBlockFromPeerTask = |
||||
GetBlockFromPeerTask.create( |
||||
context.getProtocolSchedule(), |
||||
context.getEthContext(), |
||||
Optional.of(blockHeader.getHash()), |
||||
blockHeader.getNumber(), |
||||
context.getMetricsSystem()); |
||||
final CompletableFuture<AbstractPeerTask.PeerTaskResult<Block>> run = |
||||
getBlockFromPeerTask.run(); |
||||
return run.thenApply(AbstractPeerTask.PeerTaskResult::getResult); |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected CompletableFuture<List<Block>> requestBodies(final List<BlockHeader> blockHeaders) { |
||||
final GetBodiesFromPeerTask getBodiesFromPeerTask = |
||||
GetBodiesFromPeerTask.forHeaders( |
||||
context.getProtocolSchedule(), |
||||
context.getEthContext(), |
||||
blockHeaders, |
||||
context.getMetricsSystem()); |
||||
|
||||
final CompletableFuture<AbstractPeerTask.PeerTaskResult<List<Block>>> run = |
||||
getBodiesFromPeerTask.run(); |
||||
return run.thenApply(AbstractPeerTask.PeerTaskResult::getResult) |
||||
.thenApply( |
||||
blocks -> { |
||||
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber())); |
||||
return blocks; |
||||
}); |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected Void saveBlock(final Block block) { |
||||
debugLambda(LOG, "Going to validate block {}", () -> block.getHeader().getHash().toHexString()); |
||||
var optResult = |
||||
context |
||||
.getBlockValidator(block.getHeader().getNumber()) |
||||
.validateAndProcessBlock( |
||||
context.getProtocolContext(), |
||||
block, |
||||
HeaderValidationMode.FULL, |
||||
HeaderValidationMode.NONE); |
||||
|
||||
optResult.blockProcessingOutputs.ifPresent( |
||||
result -> { |
||||
debugLambda( |
||||
LOG, |
||||
"Block {} was validated, going to import it", |
||||
() -> block.getHeader().getHash().toHexString()); |
||||
result.worldState.persist(block.getHeader()); |
||||
context.getProtocolContext().getBlockchain().appendBlock(block, result.receipts); |
||||
}); |
||||
return null; |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected Void saveBlocks(final List<Block> blocks) { |
||||
|
||||
for (Block block : blocks) { |
||||
final Optional<Block> parent = |
||||
context |
||||
.getProtocolContext() |
||||
.getBlockchain() |
||||
.getBlockByHash(block.getHeader().getParentHash()); |
||||
if (parent.isEmpty()) { |
||||
batchSize = batchSize / 2 + 1; |
||||
return null; |
||||
} else { |
||||
batchSize = BackwardSyncContext.BATCH_SIZE; |
||||
saveBlock(block); |
||||
} |
||||
} |
||||
backwardChain.commit(); |
||||
infoLambda( |
||||
LOG, |
||||
"Saved blocks {}->{}", |
||||
() -> blocks.get(0).getHeader().getNumber(), |
||||
() -> blocks.get(blocks.size() - 1).getHeader().getNumber()); |
||||
return null; |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected CompletableFuture<Void> possiblyMoreForwardSteps(final BlockHeader firstNotSynced) { |
||||
CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null); |
||||
if (firstNotSynced == null) { |
||||
final List<Block> successors = backwardChain.getSuccessors(); |
||||
LOG.info( |
||||
"Forward Sync Phase is finished. Importing {} block(s) provided by consensus layer...", |
||||
successors.size()); |
||||
successors.forEach( |
||||
block -> { |
||||
if (!context.getProtocolContext().getBlockchain().contains(block.getHash())) { |
||||
saveBlock(block); |
||||
} |
||||
}); |
||||
LOG.info("The Backward sync is done..."); |
||||
backwardChain.clear(); |
||||
return CompletableFuture.completedFuture(null); |
||||
} |
||||
if (context.getProtocolContext().getBlockchain().contains(firstNotSynced.getParentHash())) { |
||||
debugLambda( |
||||
LOG, |
||||
"Block {} is not yet imported, we need to run another step of ForwardSync", |
||||
firstNotSynced::toLogString); |
||||
return completableFuture.thenCompose(this::executeAsync); |
||||
} |
||||
|
||||
warnLambda( |
||||
LOG, |
||||
"Block {} is not yet imported but its parent {} is not imported either... " |
||||
+ "This should not normally happen and indicates a wrong behaviour somewhere...", |
||||
firstNotSynced::toLogString, |
||||
() -> firstNotSynced.getParentHash().toHexString()); |
||||
return completableFuture.thenCompose(this::executeBackwardAsync); |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected CompletionStage<Void> executeBackwardAsync(final Void unused) { |
||||
return new BackwardSyncPhase(context, backwardChain).executeAsync(unused); |
||||
} |
||||
} |
@ -1,147 +0,0 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; |
||||
|
||||
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Block; |
||||
import org.hyperledger.besu.ethereum.core.BlockHeader; |
||||
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask; |
||||
import org.hyperledger.besu.ethereum.eth.manager.task.GetBlockFromPeerTask; |
||||
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode; |
||||
|
||||
import java.util.Optional; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import com.google.common.annotations.VisibleForTesting; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
public class ForwardSyncStep extends BackwardSyncTask { |
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ForwardSyncStep.class); |
||||
|
||||
public ForwardSyncStep(final BackwardsSyncContext context, final BackwardChain backwardChain) { |
||||
super(context, backwardChain); |
||||
} |
||||
|
||||
@Override |
||||
public CompletableFuture<Void> executeStep() { |
||||
return CompletableFuture.supplyAsync(() -> processKnownAncestors(null)) |
||||
.thenCompose(this::possibleRequestBlock) |
||||
.thenApply(this::processKnownAncestors) |
||||
.thenCompose(this::possiblyMoreForwardSteps); |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected BlockHeader processKnownAncestors(final Void unused) { |
||||
while (backwardChain.getFirstAncestorHeader().isPresent()) { |
||||
BlockHeader header = backwardChain.getFirstAncestorHeader().orElseThrow(); |
||||
if (context.getProtocolContext().getBlockchain().contains(header.getHash())) { |
||||
debugLambda( |
||||
LOG, |
||||
"Block {} is already imported, we can ignore it for the sync process", |
||||
() -> header.getHash().toString().substring(0, 20)); |
||||
backwardChain.dropFirstHeader(); |
||||
} else if (backwardChain.isTrusted(header.getHash())) { |
||||
debugLambda( |
||||
LOG, |
||||
"Block {} was added by consensus layer, we can trust it and should therefore import it.", |
||||
() -> header.getHash().toString().substring(0, 20)); |
||||
saveBlock(backwardChain.getTrustedBlock(header.getHash())); |
||||
} else { |
||||
return header; |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
public CompletableFuture<Void> possibleRequestBlock(final BlockHeader blockHeader) { |
||||
if (blockHeader == null) { |
||||
return CompletableFuture.completedFuture(null); |
||||
} else { |
||||
debugLambda( |
||||
LOG, |
||||
"We don't have body of block {}, going to request it", |
||||
() -> blockHeader.getHash().toString().substring(0, 20)); |
||||
return requestBlock(blockHeader).thenApply(this::saveBlock); |
||||
} |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected CompletableFuture<Block> requestBlock(final BlockHeader blockHeader) { |
||||
final GetBlockFromPeerTask getBlockFromPeerTask = |
||||
GetBlockFromPeerTask.create( |
||||
context.getProtocolSchedule(), |
||||
context.getEthContext(), |
||||
Optional.of(blockHeader.getHash()), |
||||
blockHeader.getNumber(), |
||||
context.getMetricsSystem()); |
||||
final CompletableFuture<AbstractPeerTask.PeerTaskResult<Block>> run = |
||||
getBlockFromPeerTask.run(); |
||||
return run.thenApply(AbstractPeerTask.PeerTaskResult::getResult); |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected Void saveBlock(final Block block) { |
||||
debugLambda( |
||||
LOG, |
||||
"Going to validate block {}", |
||||
() -> block.getHeader().getHash().toString().substring(0, 20)); |
||||
var optResult = |
||||
context |
||||
.getBlockValidator(block.getHeader().getNumber()) |
||||
.validateAndProcessBlock( |
||||
context.getProtocolContext(), |
||||
block, |
||||
HeaderValidationMode.FULL, |
||||
HeaderValidationMode.NONE); |
||||
|
||||
optResult.blockProcessingOutputs.ifPresent( |
||||
result -> { |
||||
debugLambda( |
||||
LOG, |
||||
"Block {} was validated, going to import it", |
||||
() -> block.getHeader().getHash().toString().substring(0, 20)); |
||||
result.worldState.persist(block.getHeader()); |
||||
context.getProtocolContext().getBlockchain().appendBlock(block, result.receipts); |
||||
}); |
||||
return null; |
||||
} |
||||
|
||||
@VisibleForTesting |
||||
protected CompletableFuture<Void> possiblyMoreForwardSteps(final BlockHeader firstUnsynced) { |
||||
CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null); |
||||
if (firstUnsynced == null) { |
||||
LOG.debug("The only work left is to import blocks provided by consensus layer..."); |
||||
backwardChain |
||||
.getSuccessors() |
||||
.forEach( |
||||
block -> { |
||||
if (!context.getProtocolContext().getBlockchain().contains(block.getHash())) { |
||||
saveBlock(block); |
||||
} |
||||
}); |
||||
LOG.debug("The sync is done..."); |
||||
return CompletableFuture.completedFuture(null); |
||||
} |
||||
debugLambda( |
||||
LOG, |
||||
"Block {} is not yet imported, we need to run another step of ForwardSync", |
||||
() -> firstUnsynced.getHash().toString().substring(0, 20)); |
||||
return completableFuture.thenCompose(this::executeAsync); |
||||
} |
||||
} |
@ -0,0 +1,61 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
|
||||
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; |
||||
|
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; |
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; |
||||
|
||||
import java.io.Closeable; |
||||
import java.io.IOException; |
||||
import java.util.Optional; |
||||
|
||||
public class GenericKeyValueStorageFacade<K, V> implements Closeable { |
||||
protected final KeyValueStorage storage; |
||||
private final KeyConvertor<K> keyConvertor; |
||||
private final ValueConvertor<V> valueConvertor; |
||||
|
||||
public GenericKeyValueStorageFacade( |
||||
final KeyConvertor<K> keyConvertor, |
||||
final ValueConvertor<V> valueConvertor, |
||||
final KeyValueStorage storageBySegmentIdentifier) { |
||||
this.keyConvertor = keyConvertor; |
||||
this.valueConvertor = valueConvertor; |
||||
this.storage = storageBySegmentIdentifier; |
||||
} |
||||
|
||||
public Optional<V> get(final K key) { |
||||
return storage.get(keyConvertor.toBytes(key)).map(valueConvertor::fromBytes); |
||||
} |
||||
|
||||
public void put(final K key, final V value) { |
||||
final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction(); |
||||
keyValueStorageTransaction.put(keyConvertor.toBytes(key), valueConvertor.toBytes(value)); |
||||
keyValueStorageTransaction.commit(); |
||||
} |
||||
|
||||
public void drop(final K key) { |
||||
storage.tryDelete(keyConvertor.toBytes(key)); |
||||
} |
||||
|
||||
public void clear() { |
||||
storage.clear(); |
||||
} |
||||
|
||||
@Override |
||||
public void close() throws IOException { |
||||
storage.close(); |
||||
} |
||||
} |
@ -0,0 +1,20 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
|
||||
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; |
||||
|
||||
public interface KeyConvertor<T> { |
||||
byte[] toBytes(final T key); |
||||
} |
@ -0,0 +1,22 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
|
||||
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; |
||||
|
||||
public interface ValueConvertor<V> { |
||||
V fromBytes(final byte[] bytes); |
||||
|
||||
byte[] toBytes(final V value); |
||||
} |
@ -0,0 +1,132 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
|
||||
package org.hyperledger.besu.ethereum.eth.sync.backwardsync; |
||||
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat; |
||||
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; |
||||
import static org.mockito.ArgumentMatchers.anyLong; |
||||
import static org.mockito.Mockito.spy; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import org.hyperledger.besu.config.StubGenesisConfigOptions; |
||||
import org.hyperledger.besu.datatypes.Hash; |
||||
import org.hyperledger.besu.ethereum.ProtocolContext; |
||||
import org.hyperledger.besu.ethereum.chain.MutableBlockchain; |
||||
import org.hyperledger.besu.ethereum.core.Block; |
||||
import org.hyperledger.besu.ethereum.core.BlockDataGenerator; |
||||
import org.hyperledger.besu.ethereum.core.TransactionReceipt; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; |
||||
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; |
||||
import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule; |
||||
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; |
||||
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; |
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
|
||||
import java.lang.reflect.Field; |
||||
import java.lang.reflect.Modifier; |
||||
import java.util.List; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import org.jetbrains.annotations.NotNull; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.Answers; |
||||
import org.mockito.Mock; |
||||
import org.mockito.Spy; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
@RunWith(MockitoJUnitRunner.class) |
||||
public class BackwardSyncLookupServiceTest { |
||||
public static final int REMOTE_HEIGHT = 50; |
||||
private static final BlockDataGenerator blockDataGenerator = new BlockDataGenerator(); |
||||
|
||||
@Spy |
||||
private final ProtocolSchedule protocolSchedule = |
||||
MainnetProtocolSchedule.fromConfig(new StubGenesisConfigOptions()); |
||||
|
||||
@Spy private final ProtocolSpec mockProtocolSpec = protocolSchedule.getByBlockNumber(0L); |
||||
private MutableBlockchain remoteBlockchain; |
||||
private RespondingEthPeer peer; |
||||
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS) |
||||
private MetricsSystem metricsSystem; |
||||
|
||||
@Mock private ProtocolContext protocolContext; |
||||
|
||||
private BackwardSyncLookupService backwardSyncLookupService; |
||||
|
||||
@Before |
||||
public void setup() throws NoSuchFieldException, IllegalAccessException { |
||||
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec); |
||||
Block genesisBlock = blockDataGenerator.genesisBlock(); |
||||
remoteBlockchain = createInMemoryBlockchain(genesisBlock); |
||||
final Field max_retries = BackwardSyncLookupService.class.getDeclaredField("MAX_RETRIES"); |
||||
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers"); |
||||
modifiersField.setAccessible(true); |
||||
modifiersField.setInt(max_retries, max_retries.getModifiers() & ~Modifier.FINAL); |
||||
|
||||
max_retries.setAccessible(true); |
||||
max_retries.set(null, 1); |
||||
|
||||
for (int i = 1; i <= REMOTE_HEIGHT; i++) { |
||||
final BlockDataGenerator.BlockOptions options = |
||||
new BlockDataGenerator.BlockOptions() |
||||
.setBlockNumber(i) |
||||
.setParentHash(remoteBlockchain.getBlockHashByNumber(i - 1).orElseThrow()); |
||||
final Block block = blockDataGenerator.block(options); |
||||
final List<TransactionReceipt> receipts = blockDataGenerator.receipts(block); |
||||
|
||||
remoteBlockchain.appendBlock(block, receipts); |
||||
} |
||||
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); |
||||
|
||||
peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
||||
EthContext ethContext = ethProtocolManager.ethContext(); |
||||
|
||||
backwardSyncLookupService = |
||||
spy( |
||||
new BackwardSyncLookupService( |
||||
protocolSchedule, ethContext, metricsSystem, protocolContext)); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldFindABlockWhenResponding() throws Exception { |
||||
final Hash hash = getBlockByNumber(23).getHash(); |
||||
|
||||
final CompletableFuture<List<Block>> future = backwardSyncLookupService.lookup(hash); |
||||
|
||||
respondUntilFutureIsDone(future); |
||||
|
||||
final List<Block> blocks = future.get(); |
||||
assertThat(blocks.get(0)).isEqualTo(getBlockByNumber(23)); |
||||
} |
||||
|
||||
private void respondUntilFutureIsDone(final CompletableFuture<?> future) { |
||||
final RespondingEthPeer.Responder responder = |
||||
RespondingEthPeer.blockchainResponder(remoteBlockchain); |
||||
|
||||
peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); |
||||
} |
||||
|
||||
@NotNull |
||||
private Block getBlockByNumber(final int number) { |
||||
return remoteBlockchain.getBlockByNumber(number).orElseThrow(); |
||||
} |
||||
} |
Loading…
Reference in new issue