Try to build better block proposals until timeout or GetPayload is ca… (#4516)

* Try to build better block proposals until timeout or GetPayload is called

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/4546/head
Fabio Di Fabio 2 years ago committed by GitHub
parent d73ce2116c
commit e9f979ebd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      CHANGELOG.md
  2. 20
      besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java
  3. 4
      besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java
  4. 2
      besu/src/test/java/org/hyperledger/besu/controller/TransitionControllerBuilderTest.java
  5. 4
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java
  6. 214
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java
  7. 2
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeMiningCoordinator.java
  8. 5
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/PayloadIdentifier.java
  9. 5
      consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/TransitionCoordinator.java
  10. 246
      consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinatorTest.java
  11. 4
      consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeReorgTest.java
  12. 6
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineGetPayload.java
  13. 6
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/ExecutionEngineJsonRpcMethods.java
  14. 6
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineGetPayloadTest.java
  15. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/MonitoredExecutors.java

@ -18,6 +18,7 @@
- New unstable configuration options to set the maximum time, in milliseconds, a PoS block creation jobs is allowed to run [#4519](https://github.com/hyperledger/besu/pull/4519)
- Tune EthScheduler thread pools to avoid to recreate too many threads [#4529](https://github.com/hyperledger/besu/pull/4529)
- RocksDB snapshot based worldstate and plugin-api addition of Snapshot interfaces [#4409](https://github.com/hyperledger/besu/pull/4409)
- Continuously try to build better block proposals until timeout or GetPayload is called [#4516](https://github.com/hyperledger/besu/pull/4516)
### Bug Fixes
- Corrects emission of blockadded events when rewinding during a re-org. Fix for [#4495](https://github.com/hyperledger/besu/issues/4495)
@ -26,7 +27,6 @@
- Corrects treating a block as bad on internal error [#4512](https://github.com/hyperledger/besu/issues/4512)
- In GraphQL update scalar parsing to be variable friendly [#4522](https://github.com/hyperledger/besu/pull/4522)
### Download Links
@ -89,7 +89,6 @@ https://hyperledger.jfrog.io/hyperledger/besu-binaries/besu/22.7.5/besu-22.7.5.t
### Additions and Improvements
- Allow free gas networks in the London fee market [#4061](https://github.com/hyperledger/besu/issues/4061)
- Upgrade besu-native to 0.6.0 and use Blake2bf native implementation if available by default [#4264](https://github.com/hyperledger/besu/pull/4264)
<<<<<<< HEAD
- Resets engine QoS timer with every call to the engine API instead of only when ExchangeTransitionConfiguration is called [#4411](https://github.com/hyperledger/besu/issues/4411)
- ExchangeTransitionConfiguration mismatch will only submit a debug log not a warning anymore [#4411](https://github.com/hyperledger/besu/issues/4411)
- Upgrade besu-native to 0.6.1 and include linux arm64 build of bls12-381 [#4416](https://github.com/hyperledger/besu/pull/4416)

@ -33,6 +33,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain;
@ -42,10 +43,13 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
@ -66,7 +70,6 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
return createTransitionMiningCoordinator(
protocolSchedule,
protocolContext,
ethProtocolManager.ethContext(),
transactionPool,
miningParameters,
syncState,
@ -77,7 +80,8 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
ethProtocolManager.ethContext(),
syncState,
BackwardChain.from(
storageProvider, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule))));
storageProvider, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule))),
metricsSystem);
}
@Override
@ -132,18 +136,24 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
protected MiningCoordinator createTransitionMiningCoordinator(
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final TransactionPool transactionPool,
final MiningParameters miningParameters,
final SyncState syncState,
final BackwardSyncContext backwardSyncContext) {
final BackwardSyncContext backwardSyncContext,
final MetricsSystem metricsSystem) {
this.syncState.set(syncState);
final ExecutorService blockBuilderExecutor =
MonitoredExecutors.newCachedThreadPool("PoS-Block-Builder", 1, metricsSystem);
return new MergeCoordinator(
protocolContext,
protocolSchedule,
ethContext,
task -> {
LOG.debug("Block builder executor status {}", blockBuilderExecutor);
return CompletableFuture.runAsync(task, blockBuilderExecutor);
},
transactionPool.getPendingTransactions(),
miningParameters,
backwardSyncContext);

@ -129,11 +129,11 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
mergeBesuControllerBuilder.createTransitionMiningCoordinator(
transitionProtocolSchedule,
protocolContext,
ethProtocolManager.ethContext(),
transactionPool,
transitionMiningParameters,
syncState,
transitionBackwardsSyncContext));
transitionBackwardsSyncContext,
metricsSystem));
initTransitionWatcher(protocolContext, composedCoordinator);
return composedCoordinator;
}

@ -46,6 +46,7 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Optional;
@ -239,6 +240,7 @@ public class TransitionControllerBuilderTest {
final BesuControllerBuilder preMerge, final MergeBesuControllerBuilder postMerge) {
var builder = new TransitionBesuControllerBuilder(preMerge, postMerge);
builder.storageProvider(storageProvider);
builder.metricsSystem(new NoOpMetricsSystem());
var coordinator =
builder.createMiningCoordinator(
transitionProtocolSchedule,

@ -215,7 +215,7 @@ public class PostMergeContext implements MergeContext {
debugLambda(
LOG,
"New proposal for payloadId {} {} is better than the previous one {}",
payloadId::toHexString,
payloadId::toString,
() -> logBlockProposal(newBlock),
() -> logBlockProposal(currBestBlock));
blocksInProgress.removeAll(
@ -228,7 +228,7 @@ public class PostMergeContext implements MergeContext {
debugLambda(
LOG,
"Current best proposal for payloadId {} {}",
payloadId::toHexString,
payloadId::toString,
() -> retrieveBlockById(payloadId).map(bb -> logBlockProposal(bb)).orElse("N/A"));
}
}

@ -17,7 +17,6 @@ package org.hyperledger.besu.consensus.merge.blockcreation;
import static org.hyperledger.besu.consensus.merge.TransitionUtils.isTerminalProofOfWorkBlock;
import static org.hyperledger.besu.consensus.merge.blockcreation.MergeMiningCoordinator.ForkchoiceResult.Status.INVALID;
import static org.hyperledger.besu.consensus.merge.blockcreation.MergeMiningCoordinator.ForkchoiceResult.Status.INVALID_PAYLOAD_ATTRIBUTES;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import org.hyperledger.besu.consensus.merge.MergeContext;
@ -34,7 +33,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BadChainListener;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
@ -47,9 +45,13 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@ -70,20 +72,23 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
protected final AtomicReference<BlockHeader> latestDescendsFromTerminal = new AtomicReference<>();
protected final MergeContext mergeContext;
protected final ProtocolContext protocolContext;
protected final EthContext ethContext;
protected final ProposalBuilderExecutor blockBuilderExecutor;
protected final BackwardSyncContext backwardSyncContext;
protected final ProtocolSchedule protocolSchedule;
private final Map<PayloadIdentifier, BlockCreationTask> blockCreationTask =
new ConcurrentHashMap<>();
public MergeCoordinator(
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final ProposalBuilderExecutor blockBuilderExecutor,
final AbstractPendingTransactionsSorter pendingTransactions,
final MiningParameters miningParams,
final BackwardSyncContext backwardSyncContext) {
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.blockBuilderExecutor = blockBuilderExecutor;
this.mergeContext = protocolContext.getConsensusContext(MergeContext.class);
this.miningParameters = miningParams;
this.backwardSyncContext = backwardSyncContext;
@ -178,12 +183,26 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
final Bytes32 prevRandao,
final Address feeRecipient) {
// we assume that preparePayload is always called sequentially, since the RPC Engine calls
// are sequential, if this assumption changes then more synchronization should be added to
// shared data structures
final PayloadIdentifier payloadIdentifier =
PayloadIdentifier.forPayloadParams(
parentHeader.getBlockHash(), timestamp, prevRandao, feeRecipient);
if (blockCreationTask.containsKey(payloadIdentifier)) {
LOG.debug(
"Block proposal for the same payload id {} already present, nothing to do",
payloadIdentifier);
return payloadIdentifier;
}
final MergeBlockCreator mergeBlockCreator =
this.mergeBlockCreator.forParams(parentHeader, Optional.ofNullable(feeRecipient));
blockCreationTask.put(payloadIdentifier, new BlockCreationTask(mergeBlockCreator));
// put the empty block in first
final Block emptyBlock =
mergeBlockCreator.createBlock(Optional.of(Collections.emptyList()), prevRandao, timestamp);
@ -195,7 +214,7 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
LOG,
"Built empty block proposal {} for payload {}",
emptyBlock::toLogString,
payloadIdentifier::toShortHexString);
payloadIdentifier::toString);
} else {
LOG.warn(
"failed to execute empty block proposal {}, reason {}",
@ -208,75 +227,122 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
return payloadIdentifier;
}
@Override
public void finalizeProposalById(final PayloadIdentifier payloadId) {
LOG.debug("Finalizing block proposal for payload id {}", payloadId);
blockCreationTask.computeIfPresent(
payloadId,
(pid, blockCreationTask) -> {
blockCreationTask.cancel();
return blockCreationTask;
});
}
private void tryToBuildBetterBlock(
final Long timestamp,
final Bytes32 random,
final PayloadIdentifier payloadIdentifier,
final MergeBlockCreator mergeBlockCreator) {
long remainingTime = miningParameters.getPosBlockCreationMaxTime();
final Supplier<Block> blockCreator =
() -> mergeBlockCreator.createBlock(Optional.empty(), random, timestamp);
// start working on a full block and update the payload value and candidate when it's ready
final long startedAt = System.currentTimeMillis();
retryBlockCreation(payloadIdentifier, blockCreator, remainingTime)
.thenAccept(
bestBlock -> {
final var resultBest = validateBlock(bestBlock);
if (resultBest.blockProcessingOutputs.isPresent()) {
mergeContext.putPayloadById(payloadIdentifier, bestBlock);
LOG.info(
"Successfully built block {} for proposal identified by {}, with {} transactions, in {}ms",
bestBlock.toLogString(),
payloadIdentifier.toHexString(),
bestBlock.getBody().getTransactions().size(),
System.currentTimeMillis() - startedAt);
} else {
LOG.warn(
"Block {} built for proposal identified by {}, is not valid reason {}",
bestBlock.getHash(),
payloadIdentifier.toHexString(),
resultBest.errorMessage);
LOG.debug(
"Block creation started for payload id {}, remaining time is {}ms",
payloadIdentifier,
miningParameters.getPosBlockCreationMaxTime());
blockBuilderExecutor
.buildProposal(() -> retryBlockCreationUntilUseful(payloadIdentifier, blockCreator))
.orTimeout(miningParameters.getPosBlockCreationMaxTime(), TimeUnit.MILLISECONDS)
.whenComplete(
(unused, throwable) -> {
if (throwable != null) {
debugLambda(
LOG,
"Exception building block for payload id {}, reason {}",
payloadIdentifier::toString,
() -> logException(throwable));
}
blockCreationTask.computeIfPresent(
payloadIdentifier,
(pid, blockCreationTask) -> {
blockCreationTask.cancel();
return null;
});
});
}
private CompletableFuture<Block> retryBlockCreation(
private Void retryBlockCreationUntilUseful(
final PayloadIdentifier payloadIdentifier, final Supplier<Block> blockCreator) {
while (!isBlockCreationCancelled(payloadIdentifier)) {
try {
recoverableBlockCreation(payloadIdentifier, blockCreator, System.currentTimeMillis());
} catch (final CancellationException ce) {
debugLambda(
LOG,
"Block creation for payload id {} has been cancelled, reason {}",
payloadIdentifier::toString,
() -> logException(ce));
return null;
} catch (final Throwable e) {
LOG.warn(
"Something went wrong creating block for payload id {}, error {}",
payloadIdentifier,
logException(e));
return null;
}
}
return null;
}
private void recoverableBlockCreation(
final PayloadIdentifier payloadIdentifier,
final Supplier<Block> blockCreator,
final long remainingTime) {
final long startedAt = System.currentTimeMillis();
final long startedAt) {
try {
evaluateNewBlock(blockCreator.get(), payloadIdentifier, startedAt);
} catch (final Throwable throwable) {
if (canRetryBlockCreation(throwable) && !isBlockCreationCancelled(payloadIdentifier)) {
debugLambda(
LOG,
"Retrying block creation for payload id {} after recoverable error {}",
payloadIdentifier::toString,
() -> logException(throwable));
recoverableBlockCreation(payloadIdentifier, blockCreator, startedAt);
} else {
throw throwable;
}
}
}
debugLambda(
LOG,
"Block creation started for payload id {}, remaining time is {}ms",
payloadIdentifier::toShortHexString,
() -> remainingTime);
return exceptionallyCompose(
ethContext
.getScheduler()
.scheduleComputationTask(blockCreator)
.orTimeout(remainingTime, TimeUnit.MILLISECONDS),
throwable -> {
if (canRetryBlockCreation(throwable)) {
final long newRemainingTime = remainingTime - (System.currentTimeMillis() - startedAt);
debugLambda(
LOG,
"Retrying block creation for payload id {}, remaining time is {}ms, last error {}",
payloadIdentifier::toShortHexString,
() -> newRemainingTime,
() -> logException(throwable));
return retryBlockCreation(payloadIdentifier, blockCreator, newRemainingTime);
} else {
debugLambda(
LOG,
"Something went wrong creating block for payload id {}, error {}",
payloadIdentifier::toShortHexString,
() -> logException(throwable));
}
return CompletableFuture.failedFuture(throwable);
});
private void evaluateNewBlock(
final Block bestBlock, final PayloadIdentifier payloadIdentifier, final long startedAt) {
if (isBlockCreationCancelled(payloadIdentifier)) return;
final var resultBest = validateBlock(bestBlock);
if (resultBest.blockProcessingOutputs.isPresent()) {
if (isBlockCreationCancelled(payloadIdentifier)) return;
mergeContext.putPayloadById(payloadIdentifier, bestBlock);
debugLambda(
LOG,
"Successfully built block {} for proposal identified by {}, with {} transactions, in {}ms",
bestBlock::toLogString,
payloadIdentifier::toString,
bestBlock.getBody().getTransactions()::size,
() -> System.currentTimeMillis() - startedAt);
} else {
LOG.warn(
"Block {} built for proposal identified by {}, is not valid reason {}",
bestBlock.getHash(),
payloadIdentifier.toString(),
resultBest.errorMessage);
}
}
private boolean canRetryBlockCreation(final Throwable throwable) {
@ -327,7 +393,6 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
@Override
public Result validateBlock(final Block block) {
final var chain = protocolContext.getBlockchain();
chain
.getBlockHeader(block.getHeader().getParentHash())
@ -715,4 +780,31 @@ public class MergeCoordinator implements MergeMiningCoordinator, BadChainListene
pw.flush();
return sw.toString();
}
private boolean isBlockCreationCancelled(final PayloadIdentifier payloadId) {
final BlockCreationTask job = blockCreationTask.get(payloadId);
if (job == null) {
return true;
}
return job.cancelled.get();
}
private static class BlockCreationTask {
final MergeBlockCreator blockCreator;
final AtomicBoolean cancelled;
public BlockCreationTask(final MergeBlockCreator blockCreator) {
this.blockCreator = blockCreator;
this.cancelled = new AtomicBoolean(false);
}
public void cancel() {
cancelled.set(true);
blockCreator.cancel();
}
}
public interface ProposalBuilderExecutor {
CompletableFuture<Void> buildProposal(final Runnable task);
}
}

@ -75,6 +75,8 @@ public interface MergeMiningCoordinator extends MiningCoordinator {
Optional<Hash> getLatestValidHashOfBadBlock(final Hash blockHash);
void finalizeProposalById(final PayloadIdentifier payloadId);
class ForkchoiceResult {
public enum Status {
VALID,

@ -91,4 +91,9 @@ public class PayloadIdentifier implements Quantity {
public int hashCode() {
return val.hashCode();
}
@Override
public String toString() {
return toHexString();
}
}

@ -213,4 +213,9 @@ public class TransitionCoordinator extends TransitionUtils<MiningCoordinator>
public Optional<Hash> getLatestValidHashOfBadBlock(final Hash blockHash) {
return mergeCoordinator.getLatestValidHashOfBadBlock(blockHash);
}
@Override
public void finalizeProposalById(final PayloadIdentifier payloadId) {
mergeCoordinator.finalizeProposalById(payloadId);
}
}

@ -20,11 +20,13 @@ import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryWorldStateArchive;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -32,6 +34,7 @@ import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.MergeConfigOptions;
import org.hyperledger.besu.consensus.merge.MergeContext;
import org.hyperledger.besu.consensus.merge.blockcreation.MergeCoordinator.ProposalBuilderExecutor;
import org.hyperledger.besu.consensus.merge.blockcreation.MergeMiningCoordinator.ForkchoiceResult;
import org.hyperledger.besu.crypto.KeyPair;
import org.hyperledger.besu.crypto.SECPPrivateKey;
@ -54,8 +57,6 @@ import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.transactions.ImmutableTransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter;
@ -70,9 +71,11 @@ import org.hyperledger.besu.testutil.TestClock;
import java.time.ZoneId;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import com.google.common.base.Suppliers;
import org.apache.tuweni.bytes.Bytes32;
@ -104,8 +107,8 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
new KeyPair(PRIVATE_KEY1, SIGNATURE_ALGORITHM.get().createPublicKey(PRIVATE_KEY1));
@Mock MergeContext mergeContext;
@Mock BackwardSyncContext backwardSyncContext;
@Mock EthContext ethContext;
@Mock EthScheduler ethScheduler;
@Mock ProposalBuilderExecutor proposalBuilderExecutor;
private final Address coinbase = genesisAllocations(getPosGenesisConfigFile()).findFirst().get();
@Spy
@ -133,12 +136,15 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
private final BaseFeePendingTransactionsSorter transactions =
new BaseFeePendingTransactionsSorter(
ImmutableTransactionPoolConfiguration.builder().txPoolMaxSize(10).build(),
ImmutableTransactionPoolConfiguration.builder()
.txPoolMaxSize(10)
.txPoolLimitByAccountPercentage(100.0f)
.build(),
TestClock.system(ZoneId.systemDefault()),
metricsSystem,
MergeCoordinatorTest::mockBlockHeader);
final Transaction localTransaction0 = createTransaction(0);
CompletableFuture<Void> blockCreationTask = CompletableFuture.completedFuture(null);
@Before
public void setUp() {
@ -151,16 +157,20 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
genesisState.writeStateTo(mutable);
mutable.persist(null);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
when(ethScheduler.scheduleComputationTask(any()))
.thenAnswer(i -> CompletableFuture.completedFuture(i.getArgument(0, Supplier.class).get()));
when(proposalBuilderExecutor.buildProposal(any()))
.thenAnswer(
invocation -> {
final Runnable runnable = invocation.getArgument(0);
blockCreationTask = CompletableFuture.runAsync(runnable);
return blockCreationTask;
});
MergeConfigOptions.setMergeEnabled(true);
this.coordinator =
new MergeCoordinator(
protocolContext,
mockProtocolSchedule,
ethContext,
proposalBuilderExecutor,
transactions,
miningParameters,
backwardSyncContext);
@ -168,7 +178,13 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
@Test
public void coinbaseShouldMatchSuggestedFeeRecipient() {
when(mergeContext.getFinalized()).thenReturn(Optional.empty());
doAnswer(
invocation -> {
coordinator.finalizeProposalById(invocation.getArgument(0, PayloadIdentifier.class));
return null;
})
.when(mergeContext)
.putPayloadById(any(), any());
var payloadId =
coordinator.preparePayload(
@ -185,15 +201,66 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
}
@Test
public void shouldRetryBlockCreationIfStillHaveTime() {
when(mergeContext.getFinalized()).thenReturn(Optional.empty());
reset(ethContext, ethScheduler);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
when(ethScheduler.scheduleComputationTask(any()))
.thenReturn(CompletableFuture.failedFuture(new StorageException("lock")))
.thenAnswer(i -> CompletableFuture.completedFuture(i.getArgument(0, Supplier.class).get()));
public void shouldContinueBuildingBlocksUntilFinalizeIsCalled()
throws InterruptedException, ExecutionException {
final AtomicLong retries = new AtomicLong(0);
doAnswer(
invocation -> {
if (retries.getAndIncrement() < 5) {
// a new transaction every time a block is built
transactions.addLocalTransaction(
createTransaction(retries.get() - 1), Optional.empty());
} else {
// when we have 5 transactions finalize block creation
coordinator.finalizeProposalById(
invocation.getArgument(0, PayloadIdentifier.class));
}
return null;
})
.when(mergeContext)
.putPayloadById(any(), any());
var payloadId =
coordinator.preparePayload(
genesisState.getBlock().getHeader(),
System.currentTimeMillis() / 1000,
Bytes32.ZERO,
suggestedFeeRecipient);
blockCreationTask.get();
ArgumentCaptor<Block> block = ArgumentCaptor.forClass(Block.class);
verify(mergeContext, times(retries.intValue())).putPayloadById(eq(payloadId), block.capture());
transactions.addLocalTransaction(localTransaction0, Optional.empty());
assertThat(block.getAllValues().size()).isEqualTo(retries.intValue());
for (int i = 0; i < retries.intValue(); i++) {
assertThat(block.getAllValues().get(i).getBody().getTransactions()).hasSize(i);
}
}
@Test
public void shouldRetryBlockCreationOnRecoverableError()
throws InterruptedException, ExecutionException {
doAnswer(
invocation -> {
if (invocation.getArgument(1, Block.class).getBody().getTransactions().isEmpty()) {
// this is called by the first empty block
doThrow(new StorageException("lock")) // first fail
.doCallRealMethod() // then work
.when(blockchain)
.getBlockHeader(any());
} else {
// stop block creation loop when we see a not empty block
coordinator.finalizeProposalById(
invocation.getArgument(0, PayloadIdentifier.class));
}
return null;
})
.when(mergeContext)
.putPayloadById(any(), any());
transactions.addLocalTransaction(createTransaction(0), Optional.empty());
var payloadId =
coordinator.preparePayload(
@ -202,10 +269,11 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
Bytes32.ZERO,
suggestedFeeRecipient);
blockCreationTask.get();
ArgumentCaptor<Block> block = ArgumentCaptor.forClass(Block.class);
verify(mergeContext, times(2)).putPayloadById(eq(payloadId), block.capture());
verify(ethScheduler, times(2)).scheduleComputationTask(any());
assertThat(block.getAllValues().size()).isEqualTo(2);
assertThat(block.getAllValues().get(0).getBody().getTransactions()).hasSize(0);
@ -213,23 +281,16 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
}
@Test
public void shouldStopRetryBlockCreationIfTimeExpired() {
when(mergeContext.getFinalized()).thenReturn(Optional.empty());
doReturn(1L).when(miningParameters).getPosBlockCreationMaxTime();
reset(ethContext, ethScheduler);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
when(ethScheduler.scheduleComputationTask(any()))
.thenReturn(
CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return CompletableFuture.failedFuture(new RuntimeException());
}))
.thenAnswer(i -> fail("Must not be called twice"));
public void shouldStopRetryBlockCreationIfTimeExpired() throws InterruptedException {
final AtomicLong retries = new AtomicLong(0);
doReturn(100L).when(miningParameters).getPosBlockCreationMaxTime();
doAnswer(
invocation -> {
retries.incrementAndGet();
return null;
})
.when(mergeContext)
.putPayloadById(any(), any());
var payloadId =
coordinator.preparePayload(
@ -238,10 +299,109 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
Bytes32.ZERO,
suggestedFeeRecipient);
try {
blockCreationTask.get();
fail("Timeout expected");
} catch (ExecutionException e) {
assertThat(e).hasCauseInstanceOf(TimeoutException.class);
}
verify(mergeContext, atLeast(retries.intValue())).putPayloadById(eq(payloadId), any());
}
@Test
public void shouldStopInProgressBlockCreationIfFinalizedIsCalled()
throws InterruptedException, ExecutionException {
final CountDownLatch waitForBlockCreationInProgress = new CountDownLatch(1);
doAnswer(
invocation ->
// this is called by the first empty block
doAnswer(
i -> {
waitForBlockCreationInProgress.countDown();
// simulate a long running task
try {
Thread.sleep(1000);
} catch (Exception e) {
throw new RuntimeException(e);
}
return i.callRealMethod();
})
.when(blockchain)
.getBlockHeader(any()))
.when(mergeContext)
.putPayloadById(any(), any());
var payloadId =
coordinator.preparePayload(
genesisState.getBlock().getHeader(),
System.currentTimeMillis() / 1000,
Bytes32.ZERO,
suggestedFeeRecipient);
waitForBlockCreationInProgress.await();
coordinator.finalizeProposalById(payloadId);
blockCreationTask.get();
// check that we only the empty block has been built
ArgumentCaptor<Block> block = ArgumentCaptor.forClass(Block.class);
verify(mergeContext, times(1)).putPayloadById(eq(payloadId), block.capture());
verify(ethScheduler, times(1)).scheduleComputationTask(any());
assertThat(block.getAllValues().size()).isEqualTo(1);
assertThat(block.getAllValues().get(0).getBody().getTransactions()).hasSize(0);
}
@Test
public void shouldNotStartAnotherBlockCreationJobIfCalledAgainWithTheSamePayloadId()
throws ExecutionException, InterruptedException {
final AtomicLong retries = new AtomicLong(0);
doAnswer(
invocation -> {
if (retries.getAndIncrement() < 5) {
// a new transaction every time a block is built
transactions.addLocalTransaction(
createTransaction(retries.get() - 1), Optional.empty());
} else {
// when we have 5 transactions finalize block creation
coordinator.finalizeProposalById(
invocation.getArgument(0, PayloadIdentifier.class));
}
return null;
})
.when(mergeContext)
.putPayloadById(any(), any());
final long timestamp = System.currentTimeMillis() / 1000;
var payloadId1 =
coordinator.preparePayload(
genesisState.getBlock().getHeader(), timestamp, Bytes32.ZERO, suggestedFeeRecipient);
final CompletableFuture<Void> task1 = blockCreationTask;
var payloadId2 =
coordinator.preparePayload(
genesisState.getBlock().getHeader(), timestamp, Bytes32.ZERO, suggestedFeeRecipient);
assertThat(payloadId1).isEqualTo(payloadId2);
final CompletableFuture<Void> task2 = blockCreationTask;
assertThat(task1).isSameAs(task2);
blockCreationTask.get();
ArgumentCaptor<Block> block = ArgumentCaptor.forClass(Block.class);
verify(mergeContext, times(retries.intValue())).putPayloadById(eq(payloadId1), block.capture());
assertThat(block.getAllValues().size()).isEqualTo(retries.intValue());
for (int i = 0; i < retries.intValue(); i++) {
assertThat(block.getAllValues().get(i).getBody().getTransactions()).hasSize(i);
}
}
@Test
@ -512,7 +672,7 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
new MergeCoordinator(
mockProtocolContext,
mockProtocolSchedule,
ethContext,
CompletableFuture::runAsync,
transactions,
new MiningParameters.Builder().coinbase(coinbase).build(),
mock(BackwardSyncContext.class));
@ -573,7 +733,7 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
new MergeCoordinator(
mockProtocolContext,
mockProtocolSchedule,
ethContext,
CompletableFuture::runAsync,
transactions,
new MiningParameters.Builder().coinbase(coinbase).build(),
mock(BackwardSyncContext.class));
@ -764,7 +924,7 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {
new MergeCoordinator(
mockProtocolContext,
mockProtocolSchedule,
ethContext,
CompletableFuture::runAsync,
transactions,
new MiningParameters.Builder().coinbase(coinbase).build(),
mock(BackwardSyncContext.class)));

@ -34,7 +34,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
@ -46,6 +45,7 @@ import org.hyperledger.besu.util.Log4j2ConfiguratorUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.Before;
import org.junit.Test;
@ -87,7 +87,7 @@ public class MergeReorgTest implements MergeGenesisConfigHelper {
new MergeCoordinator(
protocolContext,
mockProtocolSchedule,
mock(EthContext.class),
CompletableFuture::runAsync,
mockSorter,
new MiningParameters.Builder().coinbase(coinbase).build(),
mock(BackwardSyncContext.class));

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.engine;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;
import org.hyperledger.besu.consensus.merge.blockcreation.MergeMiningCoordinator;
import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
@ -37,15 +38,18 @@ import org.slf4j.LoggerFactory;
public class EngineGetPayload extends ExecutionEngineJsonRpcMethod {
private final MergeMiningCoordinator mergeMiningCoordinator;
private final BlockResultFactory blockResultFactory;
private static final Logger LOG = LoggerFactory.getLogger(EngineGetPayload.class);
public EngineGetPayload(
final Vertx vertx,
final ProtocolContext protocolContext,
final MergeMiningCoordinator mergeMiningCoordinator,
final BlockResultFactory blockResultFactory,
final EngineCallListener engineCallListener) {
super(vertx, protocolContext, engineCallListener);
this.mergeMiningCoordinator = mergeMiningCoordinator;
this.blockResultFactory = blockResultFactory;
}
@ -59,7 +63,7 @@ public class EngineGetPayload extends ExecutionEngineJsonRpcMethod {
engineCallListener.executionEngineCalled();
final PayloadIdentifier payloadId = request.getRequiredParameter(0, PayloadIdentifier.class);
mergeMiningCoordinator.finalizeProposalById(payloadId);
final Optional<Block> block = mergeContext.get().retrieveBlockById(payloadId);
if (block.isPresent()) {
var proposal = block.get();

@ -68,7 +68,11 @@ public class ExecutionEngineJsonRpcMethods extends ApiGroupJsonRpcMethods {
if (mergeCoordinator.isPresent()) {
return mapOf(
new EngineGetPayload(
consensusEngineServer, protocolContext, blockResultFactory, engineQosTimer),
consensusEngineServer,
protocolContext,
mergeCoordinator.get(),
blockResultFactory,
engineQosTimer),
new EngineNewPayload(
consensusEngineServer,
protocolContext,

@ -20,6 +20,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.consensus.merge.MergeContext;
import org.hyperledger.besu.consensus.merge.blockcreation.MergeMiningCoordinator;
import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
@ -66,6 +67,7 @@ public class EngineGetPayloadTest {
@Mock private ProtocolContext protocolContext;
@Mock private MergeContext mergeContext;
@Mock private MergeMiningCoordinator mergeMiningCoordinator;
@Mock private EngineCallListener engineCallListener;
@ -73,7 +75,9 @@ public class EngineGetPayloadTest {
public void before() {
when(mergeContext.retrieveBlockById(mockPid)).thenReturn(Optional.of(mockBlock));
when(protocolContext.safeConsensusContext(Mockito.any())).thenReturn(Optional.of(mergeContext));
this.method = new EngineGetPayload(vertx, protocolContext, factory, engineCallListener);
this.method =
new EngineGetPayload(
vertx, protocolContext, mergeMiningCoordinator, factory, engineCallListener);
}
@Test

@ -90,12 +90,17 @@ public class MonitoredExecutors {
public static ExecutorService newCachedThreadPool(
final String name, final MetricsSystem metricsSystem) {
return newCachedThreadPool(name, 0, metricsSystem);
}
public static ExecutorService newCachedThreadPool(
final String name, final int corePoolSize, final MetricsSystem metricsSystem) {
return newMonitoredExecutor(
name,
metricsSystem,
(rejectedExecutionHandler, threadFactory) ->
new ThreadPoolExecutor(
0,
corePoolSize,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,

Loading…
Cancel
Save