Detect zombies backend query and abort useless processing. (#1042)

* Detect zombies backend query and abort useless processing.
The timeout handler populate the request context with a boolean value indicating whether or not the query is alive, i.e the HTTP request has not expired. Backend queries are now conditioned by this value and can be stopped if needed.
This PR experiments this mechanism on a reduced scope. Hence, only `eth_getLogs` backend queries are affected.

- Created `BackendQuery` utility class to run a process only if the query is alive, i.e timeout not expired.
- Put `AtomicBoolean` value in the `JsonRpcRequestContext`
- `TimeoutHandler` sets the alive value to `false` if the timeout handler is triggered.
- Updated `BlockchainQueries` to run steps depending on the value of the `AtomicBoolean` retrieved from the request context.
- Added unit tests.

Signed-off-by: Abdelhamid Bakhta <abdelhamid.bakhta@consensys.net>
pull/1100/head
Abdelhamid Bakhta 4 years ago committed by GitHub
parent dc4e2a8368
commit 903afdedd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
  2. 8
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  3. 53
      besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
  4. 44
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLDataFetcherContext.java
  5. 103
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLDataFetcherContextImpl.java
  6. 8
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLDataFetchers.java
  7. 12
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLHttpService.java
  8. 2
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/internal/pojoadapter/BlockAdapterBase.java
  9. 53
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/IsAliveHandler.java
  10. 21
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/RpcMethodTimeoutException.java
  11. 5
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/TimeoutHandler.java
  12. 13
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpService.java
  13. 25
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/JsonRpcRequestContext.java
  14. 3
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManager.java
  15. 10
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthGetLogs.java
  16. 2
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/response/JsonRpcError.java
  17. 18
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketConfiguration.java
  18. 117
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketRequestHandler.java
  19. 54
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BackendQuery.java
  20. 167
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java
  21. 2
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/graphql/AbstractDataFetcherTest.java
  22. 12
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/graphql/AbstractEthGraphQLHttpServiceTest.java
  23. 12
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLHttpServiceCorsTest.java
  24. 12
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLHttpServiceHostWhitelistTest.java
  25. 19
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLHttpServiceTest.java
  26. 12
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerLogFilterTest.java
  27. 11
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketHostWhitelistTest.java
  28. 9
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketRequestHandlerTest.java
  29. 11
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketServiceLoginTest.java
  30. 11
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketServiceTest.java
  31. 10
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/methods/EthSubscribeIntegrationTest.java
  32. 10
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/methods/EthUnsubscribeIntegrationTest.java
  33. 74
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BackendQueryTest.java
  34. 6
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesLogCacheTest.java
  35. 9
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesTest.java

@ -25,7 +25,7 @@ import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.crypto.NodeKey;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLConfiguration;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLDataFetcherContext;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLDataFetcherContextImpl;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLDataFetchers;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLProvider;
@ -485,8 +485,8 @@ public class RunnerBuilder {
Optional<GraphQLHttpService> graphQLHttpService = Optional.empty();
if (graphQLConfiguration.isEnabled()) {
final GraphQLDataFetchers fetchers = new GraphQLDataFetchers(supportedCapabilities);
final GraphQLDataFetcherContext dataFetcherContext =
new GraphQLDataFetcherContext(
final GraphQLDataFetcherContextImpl dataFetcherContext =
new GraphQLDataFetcherContextImpl(
blockchainQueries,
protocolSchedule,
transactionPool,
@ -502,7 +502,12 @@ public class RunnerBuilder {
graphQLHttpService =
Optional.of(
new GraphQLHttpService(
vertx, dataDir, graphQLConfiguration, graphQL, dataFetcherContext));
vertx,
dataDir,
graphQLConfiguration,
graphQL,
dataFetcherContext,
besuController.getProtocolManager().ethContext().getScheduler()));
}
Optional<WebSocketService> webSocketService = Optional.empty();
@ -783,7 +788,11 @@ public class RunnerBuilder {
}
final WebSocketRequestHandler websocketRequestHandler =
new WebSocketRequestHandler(vertx, websocketMethodsFactory.methods());
new WebSocketRequestHandler(
vertx,
websocketMethodsFactory.methods(),
besuController.getProtocolManager().ethContext().getScheduler(),
webSocketConfiguration.getTimeoutSec());
return new WebSocketService(vertx, configuration, websocketRequestHandler);
}

@ -1010,6 +1010,13 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
arity = "1")
private final Long httpTimeoutSec = TimeoutOptions.defaultOptions().getTimeoutSeconds();
@CommandLine.Option(
hidden = true,
names = {"--Xws-timeout-seconds"},
description = "Web socket timeout in seconds (default: ${DEFAULT-VALUE})",
arity = "1")
private final Long wsTimeoutSec = TimeoutOptions.defaultOptions().getTimeoutSeconds();
private EthNetworkConfig ethNetworkConfig;
private JsonRpcConfiguration jsonRpcConfiguration;
private GraphQLConfiguration graphQLConfiguration;
@ -1653,6 +1660,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
webSocketConfiguration.setAuthenticationCredentialsFile(rpcWsAuthenticationCredentialsFile());
webSocketConfiguration.setHostsAllowlist(hostsAllowlist);
webSocketConfiguration.setAuthenticationPublicKeyFile(rpcWsAuthenticationPublicKeyFile);
webSocketConfiguration.setTimeoutSec(wsTimeoutSec);
return webSocketConfiguration;
}

@ -45,6 +45,7 @@ import org.hyperledger.besu.cli.config.EthNetworkConfig;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.config.experimental.ExperimentalEIPs;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLConfiguration;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcApi;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
@ -3431,4 +3432,56 @@ public class BesuCommandTest extends CommandTestAbstract {
assertThat(commandErrorOutput.toString()).isEmpty();
assertThat(ExperimentalEIPs.eip1559Enabled).isFalse();
}
@Test
public void assertThatDefaultHttpTimeoutSecondsWorks() {
parseCommand();
assertThat(commandErrorOutput.toString()).isEmpty();
verify(mockRunnerBuilder).jsonRpcConfiguration(jsonRpcConfigArgumentCaptor.capture());
verify(mockRunnerBuilder).build();
assertThat(jsonRpcConfigArgumentCaptor.getValue().getHttpTimeoutSec())
.isEqualTo(TimeoutOptions.defaultOptions().getTimeoutSeconds());
}
@Test
public void assertThatHttpTimeoutSecondsWorks() {
parseCommand("--Xhttp-timeout-seconds=513");
assertThat(commandErrorOutput.toString()).isEmpty();
verify(mockRunnerBuilder).jsonRpcConfiguration(jsonRpcConfigArgumentCaptor.capture());
verify(mockRunnerBuilder).build();
assertThat(jsonRpcConfigArgumentCaptor.getValue().getHttpTimeoutSec()).isEqualTo(513);
}
@Test
public void assertThatInvalidHttpTimeoutSecondsFails() {
parseCommand("--Xhttp-timeout-seconds=abc");
assertThat(commandErrorOutput.toString())
.contains("Invalid value for option", "--Xhttp-timeout-seconds", "abc", "is not a long");
}
@Test
public void assertThatDefaultWsTimeoutSecondsWorks() {
parseCommand();
assertThat(commandErrorOutput.toString()).isEmpty();
verify(mockRunnerBuilder).webSocketConfiguration(wsRpcConfigArgumentCaptor.capture());
verify(mockRunnerBuilder).build();
assertThat(wsRpcConfigArgumentCaptor.getValue().getTimeoutSec())
.isEqualTo(TimeoutOptions.defaultOptions().getTimeoutSeconds());
}
@Test
public void assertThatWsTimeoutSecondsWorks() {
parseCommand("--Xws-timeout-seconds=11112018");
assertThat(commandErrorOutput.toString()).isEmpty();
verify(mockRunnerBuilder).webSocketConfiguration(wsRpcConfigArgumentCaptor.capture());
verify(mockRunnerBuilder).build();
assertThat(wsRpcConfigArgumentCaptor.getValue().getTimeoutSec()).isEqualTo(11112018);
}
@Test
public void assertThatInvalidWsTimeoutSecondsFails() {
parseCommand("--Xws-timeout-seconds=abc");
assertThat(commandErrorOutput.toString())
.contains("Invalid value for option", "--Xws-timeout-seconds", "abc", "is not a long");
}
}

@ -14,50 +14,26 @@
*/
package org.hyperledger.besu.ethereum.api.graphql;
import org.hyperledger.besu.ethereum.api.handlers.IsAliveHandler;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
public class GraphQLDataFetcherContext {
private final BlockchainQueries blockchainQueries;
private final MiningCoordinator miningCoordinator;
private final Synchronizer synchronizer;
private final ProtocolSchedule protocolSchedule;
private final TransactionPool transactionPool;
public GraphQLDataFetcherContext(
final BlockchainQueries blockchainQueries,
final ProtocolSchedule protocolSchedule,
final TransactionPool transactionPool,
final MiningCoordinator miningCoordinator,
final Synchronizer synchronizer) {
this.blockchainQueries = blockchainQueries;
this.protocolSchedule = protocolSchedule;
this.miningCoordinator = miningCoordinator;
this.synchronizer = synchronizer;
this.transactionPool = transactionPool;
}
public interface GraphQLDataFetcherContext {
public TransactionPool getTransactionPool() {
return transactionPool;
}
TransactionPool getTransactionPool();
public BlockchainQueries getBlockchainQueries() {
return blockchainQueries;
}
BlockchainQueries getBlockchainQueries();
public MiningCoordinator getMiningCoordinator() {
return miningCoordinator;
}
MiningCoordinator getMiningCoordinator();
public Synchronizer getSynchronizer() {
return synchronizer;
}
Synchronizer getSynchronizer();
ProtocolSchedule getProtocolSchedule();
public ProtocolSchedule getProtocolSchedule() {
return protocolSchedule;
default IsAliveHandler getIsAliveHandler() {
return new IsAliveHandler(true);
}
}

@ -0,0 +1,103 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.graphql;
import org.hyperledger.besu.ethereum.api.handlers.IsAliveHandler;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
public class GraphQLDataFetcherContextImpl implements GraphQLDataFetcherContext {
private final BlockchainQueries blockchainQueries;
private final MiningCoordinator miningCoordinator;
private final Synchronizer synchronizer;
private final ProtocolSchedule protocolSchedule;
private final TransactionPool transactionPool;
private final IsAliveHandler isAliveHandler;
public GraphQLDataFetcherContextImpl(
final GraphQLDataFetcherContext context, final IsAliveHandler isAliveHandler) {
this(
context.getBlockchainQueries(),
context.getProtocolSchedule(),
context.getTransactionPool(),
context.getMiningCoordinator(),
context.getSynchronizer(),
isAliveHandler);
}
public GraphQLDataFetcherContextImpl(
final BlockchainQueries blockchainQueries,
final ProtocolSchedule protocolSchedule,
final TransactionPool transactionPool,
final MiningCoordinator miningCoordinator,
final Synchronizer synchronizer) {
this(
blockchainQueries,
protocolSchedule,
transactionPool,
miningCoordinator,
synchronizer,
new IsAliveHandler(true));
}
public GraphQLDataFetcherContextImpl(
final BlockchainQueries blockchainQueries,
final ProtocolSchedule protocolSchedule,
final TransactionPool transactionPool,
final MiningCoordinator miningCoordinator,
final Synchronizer synchronizer,
final IsAliveHandler isAliveHandler) {
this.blockchainQueries = blockchainQueries;
this.protocolSchedule = protocolSchedule;
this.miningCoordinator = miningCoordinator;
this.synchronizer = synchronizer;
this.transactionPool = transactionPool;
this.isAliveHandler = isAliveHandler;
}
@Override
public TransactionPool getTransactionPool() {
return transactionPool;
}
@Override
public BlockchainQueries getBlockchainQueries() {
return blockchainQueries;
}
@Override
public MiningCoordinator getMiningCoordinator() {
return miningCoordinator;
}
@Override
public Synchronizer getSynchronizer() {
return synchronizer;
}
@Override
public ProtocolSchedule getProtocolSchedule() {
return protocolSchedule;
}
@Override
public IsAliveHandler getIsAliveHandler() {
return isAliveHandler;
}
}

@ -212,8 +212,8 @@ public class GraphQLDataFetchers {
DataFetcher<Optional<List<LogAdapter>>> getLogsDataFetcher() {
return dataFetchingEnvironment -> {
final BlockchainQueries blockchainQuery =
((GraphQLDataFetcherContext) dataFetchingEnvironment.getContext()).getBlockchainQueries();
final GraphQLDataFetcherContext dataFetcherContext = dataFetchingEnvironment.getContext();
final BlockchainQueries blockchainQuery = dataFetcherContext.getBlockchainQueries();
final Map<String, Object> filter = dataFetchingEnvironment.getArgument("filter");
@ -238,7 +238,9 @@ public class GraphQLDataFetchers {
final LogsQuery query =
new LogsQuery.Builder().addresses(addrs).topics(transformedTopics).build();
final List<LogWithMetadata> logs = blockchainQuery.matchingLogs(fromBlock, toBlock, query);
final List<LogWithMetadata> logs =
blockchainQuery.matchingLogs(
fromBlock, toBlock, query, dataFetcherContext.getIsAliveHandler());
final List<LogAdapter> results = new ArrayList<>();
for (final LogWithMetadata log : logs) {
results.add(new LogAdapter(log));

@ -24,8 +24,10 @@ import org.hyperledger.besu.ethereum.api.graphql.internal.response.GraphQLJsonRe
import org.hyperledger.besu.ethereum.api.graphql.internal.response.GraphQLResponse;
import org.hyperledger.besu.ethereum.api.graphql.internal.response.GraphQLResponseType;
import org.hyperledger.besu.ethereum.api.graphql.internal.response.GraphQLSuccessResponse;
import org.hyperledger.besu.ethereum.api.handlers.IsAliveHandler;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutHandler;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.util.NetworkUtility;
import java.net.InetSocketAddress;
@ -86,6 +88,7 @@ public class GraphQLHttpService {
private final GraphQL graphQL;
private final GraphQLDataFetcherContext dataFetcherContext;
private final EthScheduler scheduler;
/**
* Construct a GraphQLHttpService handler
@ -95,13 +98,15 @@ public class GraphQLHttpService {
* @param config Configuration for the rpc methods being loaded
* @param graphQL GraphQL engine
* @param dataFetcherContext DataFetcherContext required by GraphQL to finish it's job
* @param scheduler {@link EthScheduler} used to trigger timeout on backend queries
*/
public GraphQLHttpService(
final Vertx vertx,
final Path dataDir,
final GraphQLConfiguration config,
final GraphQL graphQL,
final GraphQLDataFetcherContext dataFetcherContext) {
final GraphQLDataFetcherContextImpl dataFetcherContext,
final EthScheduler scheduler) {
this.dataDir = dataDir;
validateConfig(config);
@ -109,6 +114,7 @@ public class GraphQLHttpService {
this.vertx = vertx;
this.graphQL = graphQL;
this.dataFetcherContext = dataFetcherContext;
this.scheduler = scheduler;
}
private void validateConfig(final GraphQLConfiguration config) {
@ -382,7 +388,9 @@ public class GraphQLHttpService {
.query(requestJson)
.operationName(operationName)
.variables(variables)
.context(dataFetcherContext)
.context(
new GraphQLDataFetcherContextImpl(
dataFetcherContext, new IsAliveHandler(scheduler, config.getHttpTimeoutSec())))
.build();
final ExecutionResult result = graphQL.execute(executionInput);
final Map<String, Object> toSpecificationResult = result.toSpecification();

@ -164,7 +164,7 @@ public class BlockAdapterBase extends AdapterBase {
final BlockchainQueries blockchain = getBlockchainQueries(environment);
final Hash hash = header.getHash();
final List<LogWithMetadata> logs = blockchain.matchingLogs(hash, query);
final List<LogWithMetadata> logs = blockchain.matchingLogs(hash, query, () -> true);
final List<LogAdapter> results = new ArrayList<>();
for (final LogWithMetadata log : logs) {
results.add(new LogAdapter(log));

@ -0,0 +1,53 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.handlers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class IsAliveHandler implements Supplier<Boolean> {
private final AtomicBoolean alive;
public IsAliveHandler(final boolean alive) {
this(new AtomicBoolean(alive));
}
public IsAliveHandler(final AtomicBoolean alive) {
this.alive = alive;
}
public IsAliveHandler(final EthScheduler ethScheduler, final long timeoutSec) {
this(ethScheduler, new AtomicBoolean(true), timeoutSec);
}
public IsAliveHandler(
final EthScheduler ethScheduler, final AtomicBoolean alive, final long timeoutSec) {
this.alive = alive;
ethScheduler.scheduleFutureTask(this::triggerTimeout, Duration.ofSeconds(timeoutSec));
}
private void triggerTimeout() {
alive.set(false);
}
@Override
public Boolean get() {
return alive.get();
}
}

@ -0,0 +1,21 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.handlers;
public class RpcMethodTimeoutException extends RuntimeException {
public RpcMethodTimeoutException() {
super("Timeout expired");
}
}

@ -64,7 +64,10 @@ public class TimeoutHandler {
ctx.vertx()
.setTimer(
timeoutOptions.getTimeoutMillis(),
t -> ctx.fail(timeoutOptions.getErrorCode()));
t -> {
ctx.fail(timeoutOptions.getErrorCode());
ctx.response().close();
});
ctx.addBodyEndHandler(v -> ctx.vertx().cancelTimer(tid));
});
}

@ -469,7 +469,7 @@ public class JsonRpcHttpService {
final HttpServerResponse response = routingContext.response();
vertx.executeBlocking(
future -> {
final JsonRpcResponse jsonRpcResponse = process(request, user);
final JsonRpcResponse jsonRpcResponse = process(routingContext, request, user);
future.complete(jsonRpcResponse);
},
false,
@ -529,7 +529,7 @@ public class JsonRpcHttpService {
final JsonObject req = (JsonObject) obj;
final Future<JsonRpcResponse> fut = Future.future();
vertx.executeBlocking(
future -> future.complete(process(req, user)),
future -> future.complete(process(routingContext, req, user)),
false,
ar -> {
if (ar.failed()) {
@ -567,7 +567,8 @@ public class JsonRpcHttpService {
return result.getType() != JsonRpcResponseType.NONE;
}
private JsonRpcResponse process(final JsonObject requestJson, final Optional<User> user) {
private JsonRpcResponse process(
final RoutingContext ctx, final JsonObject requestJson, final Optional<User> user) {
final JsonRpcRequest requestBody;
Object id = null;
try {
@ -594,9 +595,11 @@ public class JsonRpcHttpService {
try (final OperationTimer.TimingContext ignored =
requestTimer.labels(requestBody.getMethod()).startTimer()) {
if (user.isPresent()) {
return method.response(new JsonRpcRequestContext(requestBody, user.get()));
return method.response(
new JsonRpcRequestContext(requestBody, user.get(), () -> !ctx.response().closed()));
}
return method.response(new JsonRpcRequestContext(requestBody));
return method.response(
new JsonRpcRequestContext(requestBody, () -> !ctx.response().closed()));
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
return errorResponse(id, JsonRpcError.INVALID_PARAMS);

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.api.jsonrpc.internal;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import io.vertx.ext.auth.User;
@ -23,18 +24,32 @@ public class JsonRpcRequestContext {
private final JsonRpcRequest jsonRpcRequest;
private final Optional<User> user;
private final Supplier<Boolean> alive;
public JsonRpcRequestContext(final JsonRpcRequest jsonRpcRequest) {
this(jsonRpcRequest, Optional.empty());
this(jsonRpcRequest, () -> true);
}
public JsonRpcRequestContext(final JsonRpcRequest jsonRpcRequest, final Supplier<Boolean> alive) {
this(jsonRpcRequest, Optional.empty(), alive);
}
public JsonRpcRequestContext(final JsonRpcRequest jsonRpcRequest, final User user) {
this(jsonRpcRequest, Optional.of(user));
this(jsonRpcRequest, Optional.of(user), () -> true);
}
public JsonRpcRequestContext(
final JsonRpcRequest jsonRpcRequest, final User user, final Supplier<Boolean> alive) {
this(jsonRpcRequest, Optional.of(user), alive);
}
public JsonRpcRequestContext(final JsonRpcRequest jsonRpcRequest, final Optional<User> user) {
public JsonRpcRequestContext(
final JsonRpcRequest jsonRpcRequest,
final Optional<User> user,
final Supplier<Boolean> alive) {
this.jsonRpcRequest = jsonRpcRequest;
this.user = user;
this.alive = alive;
}
public JsonRpcRequest getRequest() {
@ -69,4 +84,8 @@ public class JsonRpcRequestContext {
public int hashCode() {
return Objects.hash(jsonRpcRequest, user);
}
public boolean isAlive() {
return alive.get();
}
}

@ -296,7 +296,8 @@ public class FilterManager extends AbstractVerticle {
filter.getLogsQuery()))
.orElse(emptyList());
} else {
return blockchainQueries.matchingLogs(fromBlockNumber, toBlockNumber, filter.getLogsQuery());
return blockchainQueries.matchingLogs(
fromBlockNumber, toBlockNumber, filter.getLogsQuery(), () -> true);
}
}
}

@ -52,14 +52,20 @@ public class EthGetLogs implements JsonRpcMethod {
final List<LogWithMetadata> matchingLogs =
filter
.getBlockHash()
.map(blockHash -> blockchain.matchingLogs(blockHash, filter.getLogsQuery()))
.map(
blockHash ->
blockchain.matchingLogs(
blockHash, filter.getLogsQuery(), requestContext::isAlive))
.orElseGet(
() -> {
final long fromBlockNumber = filter.getFromBlock().getNumber().orElse(0);
final long toBlockNumber =
filter.getToBlock().getNumber().orElse(blockchain.headBlockNumber());
return blockchain.matchingLogs(
fromBlockNumber, toBlockNumber, filter.getLogsQuery());
fromBlockNumber,
toBlockNumber,
filter.getLogsQuery(),
requestContext::isAlive);
});
return new JsonRpcSuccessResponse(

@ -27,6 +27,8 @@ public enum JsonRpcError {
METHOD_NOT_FOUND(-32601, "Method not found"),
INVALID_PARAMS(-32602, "Invalid params"),
INTERNAL_ERROR(-32603, "Internal error"),
TIMEOUT_ERROR(-32603, "Timeout expired"),
METHOD_NOT_ENABLED(-32604, "Method not enabled"),
// eth_sendTransaction specific error message

@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcApi;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis;
@ -40,6 +41,7 @@ public class WebSocketConfiguration {
private String authenticationCredentialsFile;
private List<String> hostsWhitelist = Arrays.asList("localhost", "127.0.0.1");
private File authenticationPublicKeyFile;
private long timeoutSec;
public static WebSocketConfiguration createDefault() {
final WebSocketConfiguration config = new WebSocketConfiguration();
@ -47,6 +49,7 @@ public class WebSocketConfiguration {
config.setHost(DEFAULT_WEBSOCKET_HOST);
config.setPort(DEFAULT_WEBSOCKET_PORT);
config.setRpcApis(DEFAULT_WEBSOCKET_APIS);
config.setTimeoutSec(TimeoutOptions.defaultOptions().getTimeoutSeconds());
return config;
}
@ -116,6 +119,14 @@ public class WebSocketConfiguration {
this.authenticationPublicKeyFile = authenticationPublicKeyFile;
}
public long getTimeoutSec() {
return timeoutSec;
}
public void setTimeoutSec(final long timeoutSec) {
this.timeoutSec = timeoutSec;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
@ -132,7 +143,8 @@ public class WebSocketConfiguration {
&& Objects.equals(rpcApis, that.rpcApis)
&& Objects.equals(authenticationCredentialsFile, that.authenticationCredentialsFile)
&& Objects.equals(hostsWhitelist, that.hostsWhitelist)
&& Objects.equals(authenticationPublicKeyFile, that.authenticationPublicKeyFile);
&& Objects.equals(authenticationPublicKeyFile, that.authenticationPublicKeyFile)
&& timeoutSec == that.timeoutSec;
}
@Override
@ -145,7 +157,8 @@ public class WebSocketConfiguration {
authenticationEnabled,
authenticationCredentialsFile,
hostsWhitelist,
authenticationPublicKeyFile);
authenticationPublicKeyFile,
timeoutSec);
}
@Override
@ -159,6 +172,7 @@ public class WebSocketConfiguration {
.add("authenticationCredentialsFile", authenticationCredentialsFile)
.add("hostsWhitelist", hostsWhitelist)
.add("authenticationPublicKeyFile", authenticationPublicKeyFile)
.add("timeoutSec", timeoutSec)
.toString();
}
}

@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket;
import org.hyperledger.besu.ethereum.api.handlers.IsAliveHandler;
import org.hyperledger.besu.ethereum.api.handlers.RpcMethodTimeoutException;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationUtils;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
@ -23,10 +25,14 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcUnauthorizedResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketRpcRequest;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import java.util.Map;
import java.util.Optional;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
@ -41,10 +47,18 @@ public class WebSocketRequestHandler {
private final Vertx vertx;
private final Map<String, JsonRpcMethod> methods;
final EthScheduler ethScheduler;
private final long timeoutSec;
public WebSocketRequestHandler(final Vertx vertx, final Map<String, JsonRpcMethod> methods) {
public WebSocketRequestHandler(
final Vertx vertx,
final Map<String, JsonRpcMethod> methods,
final EthScheduler ethScheduler,
final long timeoutSec) {
this.vertx = vertx;
this.methods = methods;
this.ethScheduler = ethScheduler;
this.timeoutSec = timeoutSec;
}
public void handle(final String id, final String payload) {
@ -57,51 +71,64 @@ public class WebSocketRequestHandler {
final String payload,
final Optional<User> user) {
vertx.executeBlocking(
future -> {
final WebSocketRpcRequest request;
try {
request = Json.decodeValue(payload, WebSocketRpcRequest.class);
} catch (final IllegalArgumentException | DecodeException e) {
LOG.debug("Error mapping json to WebSocketRpcRequest", e);
future.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
return;
}
executeHandler(authenticationService, id, payload, user), false, resultHandler(id));
}
private Handler<Promise<Object>> executeHandler(
final Optional<AuthenticationService> authenticationService,
final String id,
final String payload,
final Optional<User> user) {
return future -> {
final WebSocketRpcRequest request;
try {
request = Json.decodeValue(payload, WebSocketRpcRequest.class);
} catch (final IllegalArgumentException | DecodeException e) {
LOG.debug("Error mapping json to WebSocketRpcRequest", e);
future.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
return;
}
if (!methods.containsKey(request.getMethod())) {
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.METHOD_NOT_FOUND));
LOG.debug("Can't find method {}", request.getMethod());
return;
}
final JsonRpcMethod method = methods.get(request.getMethod());
try {
LOG.debug("WS-RPC request -> {}", request.getMethod());
request.setConnectionId(id);
if (AuthenticationUtils.isPermitted(authenticationService, user, method)) {
final JsonRpcRequestContext requestContext =
new JsonRpcRequestContext(
request, user, new IsAliveHandler(ethScheduler, timeoutSec));
future.complete(method.response(requestContext));
} else {
future.complete(
new JsonRpcUnauthorizedResponse(request.getId(), JsonRpcError.UNAUTHORIZED));
}
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INVALID_PARAMS));
} catch (final RpcMethodTimeoutException e) {
LOG.error(JsonRpcError.TIMEOUT_ERROR.getMessage(), e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.TIMEOUT_ERROR));
} catch (final Exception e) {
LOG.error(JsonRpcError.INTERNAL_ERROR.getMessage(), e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INTERNAL_ERROR));
}
};
}
if (!methods.containsKey(request.getMethod())) {
future.complete(
new JsonRpcErrorResponse(request.getId(), JsonRpcError.METHOD_NOT_FOUND));
LOG.debug("Can't find method {}", request.getMethod());
return;
}
final JsonRpcMethod method = methods.get(request.getMethod());
try {
LOG.debug("WS-RPC request -> {}", request.getMethod());
request.setConnectionId(id);
if (AuthenticationUtils.isPermitted(authenticationService, user, method)) {
final JsonRpcRequestContext requestContext = new JsonRpcRequestContext(request, user);
future.complete(method.response(requestContext));
} else {
future.complete(
new JsonRpcUnauthorizedResponse(request.getId(), JsonRpcError.UNAUTHORIZED));
}
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INVALID_PARAMS));
} catch (final Exception e) {
LOG.error(JsonRpcError.INTERNAL_ERROR.getMessage(), e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INTERNAL_ERROR));
}
},
false,
result -> {
if (result.succeeded()) {
replyToClient(id, Json.encodeToBuffer(result.result()));
} else {
replyToClient(
id,
Json.encodeToBuffer(new JsonRpcErrorResponse(null, JsonRpcError.INTERNAL_ERROR)));
}
});
private Handler<AsyncResult<Object>> resultHandler(final String id) {
return result -> {
if (result.succeeded()) {
replyToClient(id, Json.encodeToBuffer(result.result()));
} else {
replyToClient(
id, Json.encodeToBuffer(new JsonRpcErrorResponse(null, JsonRpcError.INTERNAL_ERROR)));
}
};
}
private void replyToClient(final String id, final Buffer request) {

@ -0,0 +1,54 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.query;
import org.hyperledger.besu.ethereum.api.handlers.RpcMethodTimeoutException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class BackendQuery {
private static final Logger LOG = LogManager.getLogger();
public static <T> T runIfAlive(final Callable<T> task, final Supplier<Boolean> alive)
throws Exception {
return runIfAlive(Optional.empty(), task, alive);
}
public static <T> T runIfAlive(
final String taskName, final Callable<T> task, final Supplier<Boolean> alive)
throws Exception {
return runIfAlive(Optional.ofNullable(taskName), task, alive);
}
public static <T> T runIfAlive(
final Optional<String> taskName, final Callable<T> task, final Supplier<Boolean> alive)
throws Exception {
if (!alive.get()) {
LOG.warn(
"Zombie backend query detected [ {} ], aborting process.", taskName.orElse("unnamed"));
throw new RpcMethodTimeoutException();
}
return task.call();
}
public static void stopIfExpired(final Supplier<Boolean> alive) throws Exception {
runIfAlive(() -> null, alive);
}
}

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.api.query;
import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher.BLOCKS_PER_BLOOM_CACHE;
import org.hyperledger.besu.ethereum.api.handlers.RpcMethodTimeoutException;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.TransactionLocation;
import org.hyperledger.besu.ethereum.core.Account;
@ -46,6 +47,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@ -537,43 +539,66 @@ public class BlockchainQueries {
* @param toBlockNumber The block number defining the last block in the search range (inclusive).
* @param query Constraints on required topics by topic index. For a given index if the set of
* topics is non-empty, the topic at this index must match one of the values in the set.
* @param isQueryAlive Whether or not the backend query should stay alive.
* @return The set of logs matching the given constraints.
*/
public List<LogWithMetadata> matchingLogs(
final long fromBlockNumber, final long toBlockNumber, final LogsQuery query) {
final List<LogWithMetadata> result = new ArrayList<>();
final long startSegment = fromBlockNumber / BLOCKS_PER_BLOOM_CACHE;
final long endSegment = toBlockNumber / BLOCKS_PER_BLOOM_CACHE;
long currentStep = fromBlockNumber;
for (long segment = startSegment; segment <= endSegment; segment++) {
final long thisSegment = segment;
final long thisStep = currentStep;
final long nextStep = (segment + 1) * BLOCKS_PER_BLOOM_CACHE;
result.addAll(
cachePath
.map(path -> path.resolve("logBloom-" + thisSegment + ".cache"))
.filter(Files::isRegularFile)
.map(
cacheFile ->
matchingLogsCached(
thisSegment * BLOCKS_PER_BLOOM_CACHE,
thisStep % BLOCKS_PER_BLOOM_CACHE,
Math.min(toBlockNumber, nextStep - 1) % BLOCKS_PER_BLOOM_CACHE,
query,
cacheFile))
.orElseGet(
() ->
matchingLogsUncached(
thisStep,
Math.min(toBlockNumber, Math.min(toBlockNumber, nextStep - 1)),
query)));
currentStep = nextStep;
final long fromBlockNumber,
final long toBlockNumber,
final LogsQuery query,
final Supplier<Boolean> isQueryAlive) {
try {
final List<LogWithMetadata> result = new ArrayList<>();
final long startSegment = fromBlockNumber / BLOCKS_PER_BLOOM_CACHE;
final long endSegment = toBlockNumber / BLOCKS_PER_BLOOM_CACHE;
long currentStep = fromBlockNumber;
for (long segment = startSegment; segment <= endSegment; segment++) {
final long thisSegment = segment;
final long thisStep = currentStep;
final long nextStep = (segment + 1) * BLOCKS_PER_BLOOM_CACHE;
BackendQuery.stopIfExpired(isQueryAlive);
result.addAll(
cachePath
.map(path -> path.resolve("logBloom-" + thisSegment + ".cache"))
.filter(Files::isRegularFile)
.map(
cacheFile -> {
try {
return matchingLogsCached(
thisSegment * BLOCKS_PER_BLOOM_CACHE,
thisStep % BLOCKS_PER_BLOOM_CACHE,
Math.min(toBlockNumber, nextStep - 1) % BLOCKS_PER_BLOOM_CACHE,
query,
cacheFile,
isQueryAlive);
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.orElseGet(
() ->
matchingLogsUncached(
thisStep,
Math.min(toBlockNumber, Math.min(toBlockNumber, nextStep - 1)),
query,
isQueryAlive)));
currentStep = nextStep;
}
return result;
} catch (RpcMethodTimeoutException e) {
LOG.error("Error retrieving matching logs", e);
throw e;
} catch (Exception e) {
LOG.error("Error retrieving matching logs", e);
throw new RuntimeException(e);
}
return result;
}
private List<LogWithMetadata> matchingLogsUncached(
final long fromBlockNumber, final long toBlockNumber, final LogsQuery query) {
final long fromBlockNumber,
final long toBlockNumber,
final LogsQuery query,
final Supplier<Boolean> isQueryAlive) {
// rangeClosed handles the inverted from/to situations automatically with zero results.
return LongStream.rangeClosed(fromBlockNumber, toBlockNumber)
.mapToObj(blockchain::getBlockHeader)
@ -583,7 +608,7 @@ public class BlockchainQueries {
.takeWhile(Optional::isPresent)
.map(Optional::get)
.filter(header -> query.couldMatch(header.getLogsBloom()))
.flatMap(header -> matchingLogs(header.getHash(), query).stream())
.flatMap(header -> matchingLogs(header.getHash(), query, isQueryAlive).stream())
.collect(Collectors.toList());
}
@ -592,24 +617,31 @@ public class BlockchainQueries {
final long offset,
final long endOffset,
final LogsQuery query,
final Path cacheFile) {
final Path cacheFile,
final Supplier<Boolean> isQueryAlive)
throws Exception {
final List<LogWithMetadata> results = new ArrayList<>();
try (final RandomAccessFile raf = new RandomAccessFile(cacheFile.toFile(), "r")) {
raf.seek(offset * 256);
final byte[] bloomBuff = new byte[256];
final Bytes bytesValue = Bytes.wrap(bloomBuff);
for (long pos = offset; pos <= endOffset; pos++) {
BackendQuery.stopIfExpired(isQueryAlive);
try {
raf.readFully(bloomBuff);
} catch (final EOFException e) {
results.addAll(matchingLogsUncached(segmentStart + pos, segmentStart + endOffset, query));
results.addAll(
matchingLogsUncached(
segmentStart + pos, segmentStart + endOffset, query, isQueryAlive));
break;
}
final LogsBloomFilter logsBloom = new LogsBloomFilter(bytesValue);
if (query.couldMatch(logsBloom)) {
results.addAll(
matchingLogs(
blockchain.getBlockHashByNumber(segmentStart + pos).orElseThrow(), query));
blockchain.getBlockHashByNumber(segmentStart + pos).orElseThrow(),
query,
isQueryAlive));
}
}
} catch (final IOException e) {
@ -619,25 +651,56 @@ public class BlockchainQueries {
return results;
}
public List<LogWithMetadata> matchingLogs(final Hash blockHash, final LogsQuery query) {
final Optional<BlockHeader> blockHeader = blockchain.getBlockHeader(blockHash);
if (blockHeader.isEmpty()) {
return Collections.emptyList();
public List<LogWithMetadata> matchingLogs(
final Hash blockHash, final LogsQuery query, final Supplier<Boolean> isQueryAlive) {
try {
final Optional<BlockHeader> blockHeader =
BackendQuery.runIfAlive(
"matchingLogs - getBlockHeader",
() -> blockchain.getBlockHeader(blockHash),
isQueryAlive);
if (blockHeader.isEmpty()) {
return Collections.emptyList();
}
// receipts and transactions should exist if the header exists, so throwing is ok.
final List<TransactionReceipt> receipts =
BackendQuery.runIfAlive(
"matchingLogs - getTxReceipts",
() -> blockchain.getTxReceipts(blockHash).orElseThrow(),
isQueryAlive);
final List<Transaction> transactions =
BackendQuery.runIfAlive(
"matchingLogs - getBlockBody",
() -> blockchain.getBlockBody(blockHash).orElseThrow().getTransactions(),
isQueryAlive);
final long number = blockHeader.get().getNumber();
final boolean removed =
BackendQuery.runIfAlive(
"matchingLogs - blockIsOnCanonicalChain",
() -> !blockchain.blockIsOnCanonicalChain(blockHash),
isQueryAlive);
return IntStream.range(0, receipts.size())
.mapToObj(
i -> {
try {
BackendQuery.stopIfExpired(isQueryAlive);
return LogWithMetadata.generate(
receipts.get(i),
number,
blockHash,
transactions.get(i).getHash(),
i,
removed);
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.flatMap(Collection::stream)
.filter(query::matches)
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException(e);
}
// receipts and transactions should exist if the header exists, so throwing is ok.
final List<TransactionReceipt> receipts = blockchain.getTxReceipts(blockHash).orElseThrow();
final List<Transaction> transactions =
blockchain.getBlockBody(blockHash).orElseThrow().getTransactions();
final long number = blockHeader.get().getNumber();
final boolean removed = !blockchain.blockIsOnCanonicalChain(blockHash);
return IntStream.range(0, receipts.size())
.mapToObj(
i ->
LogWithMetadata.generate(
receipts.get(i), number, blockHash, transactions.get(i).getHash(), i, removed))
.flatMap(Collection::stream)
.filter(query::matches)
.collect(Collectors.toList());
}
/**

@ -38,7 +38,7 @@ public abstract class AbstractDataFetcherTest {
@Mock protected DataFetchingEnvironment environment;
@Mock protected GraphQLDataFetcherContext context;
@Mock protected GraphQLDataFetcherContextImpl context;
@Mock protected BlockchainQueries query;

@ -28,6 +28,7 @@ import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.Wei;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactions;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
@ -161,8 +162,8 @@ public abstract class AbstractEthGraphQLHttpServiceTest {
final GraphQLConfiguration config = GraphQLConfiguration.createDefault();
config.setPort(0);
final GraphQLDataFetcherContext dataFetcherContext =
new GraphQLDataFetcherContext(
final GraphQLDataFetcherContextImpl dataFetcherContext =
new GraphQLDataFetcherContextImpl(
blockchainQueries,
PROTOCOL_SCHEDULE,
transactionPoolMock,
@ -174,7 +175,12 @@ public abstract class AbstractEthGraphQLHttpServiceTest {
service =
new GraphQLHttpService(
vertx, folder.newFolder().toPath(), config, graphQL, dataFetcherContext);
vertx,
folder.newFolder().toPath(),
config,
graphQL,
dataFetcherContext,
Mockito.mock(EthScheduler.class));
service.start().join();
client = new OkHttpClient();

@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.blockcreation.EthHashMiningCoordinator;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
@ -212,8 +213,8 @@ public class GraphQLHttpServiceCorsTest {
final EthHashMiningCoordinator miningCoordinatorMock =
Mockito.mock(EthHashMiningCoordinator.class);
final GraphQLDataFetcherContext dataFetcherContext =
Mockito.mock(GraphQLDataFetcherContext.class);
final GraphQLDataFetcherContextImpl dataFetcherContext =
Mockito.mock(GraphQLDataFetcherContextImpl.class);
Mockito.when(dataFetcherContext.getBlockchainQueries()).thenReturn(blockchainQueries);
Mockito.when(dataFetcherContext.getMiningCoordinator()).thenReturn(miningCoordinatorMock);
@ -229,7 +230,12 @@ public class GraphQLHttpServiceCorsTest {
final GraphQLHttpService graphQLHttpService =
new GraphQLHttpService(
vertx, folder.newFolder().toPath(), config, graphQL, dataFetcherContext);
vertx,
folder.newFolder().toPath(),
config,
graphQL,
dataFetcherContext,
Mockito.mock(EthScheduler.class));
graphQLHttpService.start().join();
return graphQLHttpService;

@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.blockcreation.EthHashMiningCoordinator;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
@ -73,8 +74,8 @@ public class GraphQLHttpServiceHostWhitelistTest {
final EthHashMiningCoordinator miningCoordinatorMock =
Mockito.mock(EthHashMiningCoordinator.class);
final GraphQLDataFetcherContext dataFetcherContext =
Mockito.mock(GraphQLDataFetcherContext.class);
final GraphQLDataFetcherContextImpl dataFetcherContext =
Mockito.mock(GraphQLDataFetcherContextImpl.class);
Mockito.when(dataFetcherContext.getBlockchainQueries()).thenReturn(blockchainQueries);
Mockito.when(dataFetcherContext.getMiningCoordinator()).thenReturn(miningCoordinatorMock);
@ -89,7 +90,12 @@ public class GraphQLHttpServiceHostWhitelistTest {
final GraphQL graphQL = GraphQLProvider.buildGraphQL(dataFetchers);
return new GraphQLHttpService(
vertx, folder.newFolder().toPath(), graphQLConfig, graphQL, dataFetcherContext);
vertx,
folder.newFolder().toPath(),
graphQLConfig,
graphQL,
dataFetcherContext,
Mockito.mock(EthScheduler.class));
}
private static GraphQLConfiguration createGraphQLConfig() {

@ -24,6 +24,7 @@ import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.core.Wei;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.testutil.BlockTestUtil;
@ -70,7 +71,7 @@ public class GraphQLHttpServiceTest {
private static Synchronizer synchronizer;
private static GraphQL graphQL;
private static GraphQLDataFetchers dataFetchers;
private static GraphQLDataFetcherContext dataFetcherContext;
private static GraphQLDataFetcherContextImpl dataFetcherContext;
private static EthHashMiningCoordinator miningCoordinatorMock;
private final GraphQLTestHelper testHelper = new GraphQLTestHelper();
@ -83,7 +84,7 @@ public class GraphQLHttpServiceTest {
miningCoordinatorMock = Mockito.mock(EthHashMiningCoordinator.class);
dataFetcherContext = Mockito.mock(GraphQLDataFetcherContext.class);
dataFetcherContext = Mockito.mock(GraphQLDataFetcherContextImpl.class);
Mockito.when(dataFetcherContext.getBlockchainQueries()).thenReturn(blockchainQueries);
Mockito.when(dataFetcherContext.getMiningCoordinator()).thenReturn(miningCoordinatorMock);
@ -106,12 +107,22 @@ public class GraphQLHttpServiceTest {
private static GraphQLHttpService createGraphQLHttpService(final GraphQLConfiguration config)
throws Exception {
return new GraphQLHttpService(
vertx, folder.newFolder().toPath(), config, graphQL, dataFetcherContext);
vertx,
folder.newFolder().toPath(),
config,
graphQL,
dataFetcherContext,
Mockito.mock(EthScheduler.class));
}
private static GraphQLHttpService createGraphQLHttpService() throws Exception {
return new GraphQLHttpService(
vertx, folder.newFolder().toPath(), createGraphQLConfig(), graphQL, dataFetcherContext);
vertx,
folder.newFolder().toPath(),
createGraphQLConfig(),
graphQL,
dataFetcherContext,
Mockito.mock(EthScheduler.class));
}
private static GraphQLConfiguration createGraphQLConfig() {

@ -97,7 +97,7 @@ public class FilterManagerLogFilterTest {
filterManager.installLogFilter(latest(), latest(), logsQuery());
final Hash blockAddedHash = recordBlockEvents(1).get(0).getBlock().getHash();
verify(blockchainQueries, never()).matchingLogs(eq(100L), eq(100L), eq(logsQuery()));
verify(blockchainQueries, never()).matchingLogs(eq(100L), eq(100L), eq(logsQuery()), any());
verify(privacyQueries).matchingLogs(eq(PRIVACY_GROUP_ID), eq(blockAddedHash), eq(logsQuery()));
}
@ -109,7 +109,7 @@ public class FilterManagerLogFilterTest {
final Hash blockAddedHash = recordBlockEvents(1).get(0).getBlock().getHash();
verify(blockchainQueries, never()).matchingLogs(any(), any());
verify(blockchainQueries, never()).matchingLogs(any(), any(), any());
verify(privacyQueries).matchingLogs(eq(PRIVACY_GROUP_ID), eq(blockAddedHash), eq(logsQuery()));
}
@ -143,7 +143,7 @@ public class FilterManagerLogFilterTest {
filterManager.installLogFilter(latest(), latest(), logsQuery());
recordBlockEvents(1);
verify(blockchainQueries, never()).matchingLogs(anyLong(), anyLong(), any());
verify(blockchainQueries, never()).matchingLogs(anyLong(), anyLong(), any(), any());
}
@Test
@ -151,7 +151,7 @@ public class FilterManagerLogFilterTest {
final List<LogWithMetadata> logs = filterManager.logsChanges("NOT THERE");
assertThat(logs).isNull();
verify(blockchainQueries, never()).matchingLogs(anyLong(), anyLong(), any());
verify(blockchainQueries, never()).matchingLogs(anyLong(), anyLong(), any(), any());
}
@Test
@ -190,7 +190,7 @@ public class FilterManagerLogFilterTest {
public void getLogsForExistingFilterReturnsResults() {
final LogWithMetadata log = logWithMetadata();
when(blockchainQueries.headBlockNumber()).thenReturn(100L);
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), eq(logsQuery())))
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), eq(logsQuery()), any()))
.thenReturn(singletonList(log));
final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery());
@ -258,7 +258,7 @@ public class FilterManagerLogFilterTest {
final List<LogWithMetadata> logs = filterManager.logs(privateLogFilterId);
verify(blockchainQueries, times(2)).headBlockNumber();
verify(blockchainQueries, never()).matchingLogs(anyLong(), anyLong(), any());
verify(blockchainQueries, never()).matchingLogs(anyLong(), anyLong(), any(), any());
verify(privacyQueries).matchingLogs(eq(PRIVACY_GROUP_ID), anyLong(), anyLong(), any());
assertThat(logs.get(0)).isEqualTo(logWithMetadata);

@ -15,12 +15,15 @@
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.net.InetSocketAddress;
@ -71,7 +74,13 @@ public class WebSocketHostWhitelistTest {
new WebSocketMethodsFactory(
new SubscriptionManager(new NoOpMetricsSystem()), new HashMap<>())
.methods();
webSocketRequestHandlerSpy = spy(new WebSocketRequestHandler(vertx, websocketMethods));
webSocketRequestHandlerSpy =
spy(
new WebSocketRequestHandler(
vertx,
websocketMethods,
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds()));
websocketService =
new WebSocketService(vertx, webSocketConfiguration, webSocketRequestHandlerSpy);

@ -19,6 +19,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
@ -27,6 +28,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketRpcRequest;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import java.util.HashMap;
import java.util.Map;
@ -61,7 +63,12 @@ public class WebSocketRequestHandlerTest {
jsonRpcMethodMock = mock(JsonRpcMethod.class);
methods.put("eth_x", jsonRpcMethodMock);
handler = new WebSocketRequestHandler(vertx, methods);
handler =
new WebSocketRequestHandler(
vertx,
methods,
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds());
}
@After

@ -18,12 +18,15 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.util.Lists.list;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.net.URISyntaxException;
@ -87,7 +90,13 @@ public class WebSocketServiceLoginTest {
new WebSocketMethodsFactory(
new SubscriptionManager(new NoOpMetricsSystem()), new HashMap<>())
.methods();
webSocketRequestHandlerSpy = spy(new WebSocketRequestHandler(vertx, websocketMethods));
webSocketRequestHandlerSpy =
spy(
new WebSocketRequestHandler(
vertx,
websocketMethods,
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds()));
websocketService =
new WebSocketService(vertx, websocketConfiguration, webSocketRequestHandlerSpy);

@ -19,9 +19,11 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Arrays;
@ -45,6 +47,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
@RunWith(VertxUnitRunner.class)
public class WebSocketServiceTest {
@ -69,7 +72,13 @@ public class WebSocketServiceTest {
new WebSocketMethodsFactory(
new SubscriptionManager(new NoOpMetricsSystem()), new HashMap<>())
.methods();
webSocketRequestHandlerSpy = spy(new WebSocketRequestHandler(vertx, websocketMethods));
webSocketRequestHandlerSpy =
spy(
new WebSocketRequestHandler(
vertx,
websocketMethods,
Mockito.mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds()));
websocketService =
new WebSocketService(vertx, websocketConfiguration, webSocketRequestHandlerSpy);

@ -16,12 +16,14 @@ package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketRequestHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.Subscription;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.syncing.SyncingSubscription;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.HashMap;
@ -37,6 +39,7 @@ import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
@RunWith(VertxUnitRunner.class)
public class EthSubscribeIntegrationTest {
@ -54,7 +57,12 @@ public class EthSubscribeIntegrationTest {
vertx = Vertx.vertx();
subscriptionManager = new SubscriptionManager(new NoOpMetricsSystem());
webSocketMethodsFactory = new WebSocketMethodsFactory(subscriptionManager, new HashMap<>());
webSocketRequestHandler = new WebSocketRequestHandler(vertx, webSocketMethodsFactory.methods());
webSocketRequestHandler =
new WebSocketRequestHandler(
vertx,
webSocketMethodsFactory.methods(),
Mockito.mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds());
}
@Test

@ -15,12 +15,15 @@
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketRequestHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscribeRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.HashMap;
@ -49,7 +52,12 @@ public class EthUnsubscribeIntegrationTest {
vertx = Vertx.vertx();
subscriptionManager = new SubscriptionManager(new NoOpMetricsSystem());
webSocketMethodsFactory = new WebSocketMethodsFactory(subscriptionManager, new HashMap<>());
webSocketRequestHandler = new WebSocketRequestHandler(vertx, webSocketMethodsFactory.methods());
webSocketRequestHandler =
new WebSocketRequestHandler(
vertx,
webSocketMethodsFactory.methods(),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds());
}
@Test

@ -0,0 +1,74 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.query;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Supplier;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class BackendQueryTest<T> {
private final Supplier<Boolean> alive;
private final Object wantReturn;
private final boolean wantException;
private final Class<T> wantExceptionClass;
private final String wantExceptionMessage;
public BackendQueryTest(
final Supplier<Boolean> alive,
final Object wantReturn,
final boolean wantException,
final Class<T> wantExceptionClass,
final String wantExceptionMessage) {
this.alive = alive;
this.wantReturn = wantReturn;
this.wantException = wantException;
this.wantExceptionClass = wantExceptionClass;
this.wantExceptionMessage = wantExceptionMessage;
}
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
{supplierOf(false), null, true, RuntimeException.class, "Timeout expired"},
{supplierOf(true), "expected return", false, null, null}
});
}
private static Supplier<Boolean> supplierOf(final boolean val) {
return () -> val;
}
@Test
public void test() throws Exception {
if (wantException) {
assertThatThrownBy(() -> BackendQuery.runIfAlive(() -> wantReturn, alive))
.isInstanceOf(wantExceptionClass)
.hasMessage(wantExceptionMessage);
} else {
assertThat(BackendQuery.runIfAlive(() -> wantReturn, alive)).isEqualTo(wantReturn);
}
}
}

@ -135,7 +135,7 @@ public class BlockchainQueriesLogCacheTest {
@Test
public void cachedCachedSeamTest() {
for (long i = BLOCKS_PER_BLOOM_CACHE - 3; i <= BLOCKS_PER_BLOOM_CACHE; i++) {
blockchainQueries.matchingLogs(i, i + 2, logsQuery);
blockchainQueries.matchingLogs(i, i + 2, logsQuery, () -> true);
}
// 4 ranges of 3 hits a piece = 12 calls - 97-99, 98-00, 99-01, 00-02
@ -157,7 +157,7 @@ public class BlockchainQueriesLogCacheTest {
@Test
public void cachedUncachedSeamTest() {
for (long i = (2 * BLOCKS_PER_BLOOM_CACHE) - 3; i <= 2 * BLOCKS_PER_BLOOM_CACHE; i++) {
blockchainQueries.matchingLogs(i, i + 2, logsQuery);
blockchainQueries.matchingLogs(i, i + 2, logsQuery, () -> true);
}
// 6 sets of calls on cache side of seam: 97-99, 98-99, 99, {}
@ -184,7 +184,7 @@ public class BlockchainQueriesLogCacheTest {
@Test
public void uncachedUncachedSeamTest() {
for (long i = (3 * BLOCKS_PER_BLOOM_CACHE) - 3; i <= 3 * BLOCKS_PER_BLOOM_CACHE; i++) {
blockchainQueries.matchingLogs(i, i + 2, logsQuery);
blockchainQueries.matchingLogs(i, i + 2, logsQuery, () -> true);
}
// 4 ranges of 3 hits a piece = 12 calls - 97-99, 98-00, 99-01, 00-02

@ -321,7 +321,8 @@ public class BlockchainQueriesTest {
// check that logs have removed = false
List<LogWithMetadata> logs =
data.blockchainQueries.matchingLogs(targetBlock.getHash(), new LogsQuery.Builder().build());
data.blockchainQueries.matchingLogs(
targetBlock.getHash(), new LogsQuery.Builder().build(), () -> true);
assertThat(logs).isNotEmpty();
assertThat(logs).allMatch(l -> !l.isRemoved());
@ -342,7 +343,8 @@ public class BlockchainQueriesTest {
// check that logs have removed = true
logs =
data.blockchainQueries.matchingLogs(targetBlock.getHash(), new LogsQuery.Builder().build());
data.blockchainQueries.matchingLogs(
targetBlock.getHash(), new LogsQuery.Builder().build(), () -> true);
assertThat(logs).isNotEmpty();
assertThat(logs).allMatch(LogWithMetadata::isRemoved);
}
@ -351,7 +353,8 @@ public class BlockchainQueriesTest {
public void matchingLogsShouldReturnAnEmptyListWhenGivenAnInvalidBlockHash() {
final BlockchainWithData data = setupBlockchain(3);
final BlockchainQueries queries = data.blockchainQueries;
List<LogWithMetadata> logs = queries.matchingLogs(Hash.ZERO, new LogsQuery.Builder().build());
List<LogWithMetadata> logs =
queries.matchingLogs(Hash.ZERO, new LogsQuery.Builder().build(), () -> true);
assertThat(logs).isEmpty();
}

Loading…
Cancel
Save