[BESU-122] Index tx log bloom bits and use the index for queries. (#245)

* [BESU-122] Index tx log bloom bits and use the index for queries.

This comes in two parts: first a CLI program to generate the log bloom
indexes, then updating BlockchainQueries to use the indexes if present.

First, to create the bloom index on a synced node (for example Goerli):
`bin/besu --network=goerli --data-path /tmp/goerli operator generate-log-bloom-cache`
There are options to control where indexing starts and stops.  I estimate 15-30 minutes
for mainnet.

The RPCs should magically use the indexes now.  Note that the last
fragment of 100K blocks is not indexed and uses the old paths.

Signed-off-by: Danno Ferrin <danno.ferrin@gmail.com>
pull/248/head
Danno Ferrin 5 years ago committed by GitHub
parent f87179014f
commit ac9912d5f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
  2. 294
      besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/GenerateBlockchainConfig.java
  3. 132
      besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/GenerateLogBloomCache.java
  4. 286
      besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/OperatorSubCommand.java
  5. 1
      besu/src/main/java/org/hyperledger/besu/controller/BesuController.java
  6. 10
      besu/src/test/java/org/hyperledger/besu/cli/operator/OperatorSubCommandTest.java
  7. 8
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLDataFetcherContext.java
  8. 6
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/JsonRpcMethodsFactory.java
  9. 85
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java
  10. 3
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/graphql/AbstractEthGraphQLHttpServiceTest.java
  11. 202
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesLogCacheTest.java

@ -16,6 +16,7 @@ package org.hyperledger.besu;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.isNull;
import static org.hyperledger.besu.controller.BesuController.CACHE_PATH;
import org.hyperledger.besu.cli.config.EthNetworkConfig;
import org.hyperledger.besu.controller.BesuController;
@ -435,7 +436,8 @@ public class RunnerBuilder {
protocolSchedule,
transactionPool,
miningCoordinator,
synchronizer);
synchronizer,
Optional.of(dataDir.resolve(CACHE_PATH)));
final GraphQL graphQL;
try {
graphQL = GraphQLProvider.buildGraphQL(fetchers);
@ -574,7 +576,10 @@ public class RunnerBuilder {
final Vertx vertx, final ProtocolContext<?> context, final TransactionPool transactionPool) {
final FilterManager filterManager =
new FilterManager(
new BlockchainQueries(context.getBlockchain(), context.getWorldStateArchive()),
new BlockchainQueries(
context.getBlockchain(),
context.getWorldStateArchive(),
Optional.of(dataDir.resolve(CACHE_PATH))),
transactionPool,
new FilterIdGenerator(),
new FilterRepository());
@ -622,7 +627,8 @@ public class RunnerBuilder {
privacyParameters,
jsonRpcConfiguration,
webSocketConfiguration,
metricsConfiguration);
metricsConfiguration,
Optional.of(dataDir.resolve(CACHE_PATH)));
methods.putAll(besuController.getAdditionalJsonRpcMethods(jsonRpcApis));
return methods;
}
@ -660,7 +666,9 @@ public class RunnerBuilder {
final SubscriptionManager subscriptionManager) {
final NewBlockHeadersSubscriptionService newBlockHeadersSubscriptionService =
new NewBlockHeadersSubscriptionService(
subscriptionManager, new BlockchainQueries(blockchain, worldStateArchive));
subscriptionManager,
new BlockchainQueries(
blockchain, worldStateArchive, Optional.of(dataDir.resolve(CACHE_PATH))));
blockchain.observeBlockAdded(newBlockHeadersSubscriptionService);
}

@ -0,0 +1,294 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.cli.subcommands.operator;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.hyperledger.besu.cli.DefaultCommandValues;
import org.hyperledger.besu.config.JsonGenesisConfigOptions;
import org.hyperledger.besu.config.JsonUtil;
import org.hyperledger.besu.consensus.ibft.IbftExtraData;
import org.hyperledger.besu.crypto.SECP256K1;
import org.hyperledger.besu.ethereum.core.Address;
import org.hyperledger.besu.ethereum.core.Util;
import org.hyperledger.besu.util.bytes.BytesValue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;
@Command(
    name = "generate-blockchain-config",
    description = "Generates node keypairs and genesis file with RLP encoded IBFT 2.0 extra data.",
    mixinStandardHelpOptions = true)
class GenerateBlockchainConfig implements Runnable {

  private static final Logger LOG = LogManager.getLogger();

  @Option(
      required = true,
      names = "--config-file",
      paramLabel = DefaultCommandValues.MANDATORY_FILE_FORMAT_HELP,
      description = "Configuration file.",
      arity = "1..1")
  private File configurationFile = null;

  @Option(
      required = true,
      names = "--to",
      paramLabel = DefaultCommandValues.MANDATORY_DIRECTORY_FORMAT_HELP,
      description = "Directory to write output files to.",
      arity = "1..1")
  private File outputDirectory = null;

  @Option(
      names = "--genesis-file-name",
      paramLabel = DefaultCommandValues.MANDATORY_PATH_FORMAT_HELP,
      description = "Name of the genesis file. (default: ${DEFAULT-VALUE})",
      arity = "1..1")
  private String genesisFileName = "genesis.json";

  @Option(
      names = "--private-key-file-name",
      paramLabel = DefaultCommandValues.MANDATORY_PATH_FORMAT_HELP,
      description = "Name of the private key file. (default: ${DEFAULT-VALUE})",
      arity = "1..1")
  private String privateKeyFileName = "key.priv";

  @Option(
      names = "--public-key-file-name",
      paramLabel = DefaultCommandValues.MANDATORY_PATH_FORMAT_HELP,
      description = "Name of the public key file. (default: ${DEFAULT-VALUE})",
      arity = "1..1")
  private String publicKeyFileName = "key.pub";

  @ParentCommand
  private OperatorSubCommand parentCommand; // Picocli injects reference to parent command

  // Sections of the parsed configuration file; populated by parseConfig().
  private ObjectNode operatorConfig;
  private ObjectNode genesisConfig;
  private ObjectNode blockchainConfig;
  private ObjectNode nodesConfig;
  // True when the config asks us to generate fresh keypairs instead of importing keys.
  private boolean generateNodesKeys;
  // Node addresses accumulated while writing keys; used to build the IBFT extra data.
  private final List<Address> addressesForGenesisExtraData = new ArrayList<>();
  private Path keysDirectory;

  @Override
  public void run() {
    checkPreconditions();
    generateBlockchainConfig();
  }

  private void checkPreconditions() {
    checkNotNull(parentCommand);
    checkNotNull(parentCommand.parentCommand);
    // All three output files land in related directories; identical names would collide.
    if (isAnyDuplicate(genesisFileName, publicKeyFileName, privateKeyFileName)) {
      throw new IllegalArgumentException("Output file paths must be unique.");
    }
  }

  /** Generates output directory with all required configuration files. */
  private void generateBlockchainConfig() {
    try {
      handleOutputDirectory();
      parseConfig();
      if (generateNodesKeys) {
        generateNodesKeys();
      } else {
        importPublicKeysFromConfig();
      }
      processExtraData();
      writeGenesisFile(outputDirectory, genesisFileName, genesisConfig);
    } catch (final IOException e) {
      LOG.error("An error occurred while trying to generate network configuration.", e);
    }
  }

  /** Imports public keys from input configuration. */
  private void importPublicKeysFromConfig() {
    LOG.info("Importing public keys from configuration.");
    JsonUtil.getArrayNode(nodesConfig, "keys")
        .ifPresent(keys -> keys.forEach(this::importPublicKey));
  }

  /**
   * Imports a single public key.
   *
   * @param publicKeyJson The public key.
   * @throws IllegalArgumentException if the JSON node is not a string.
   */
  private void importPublicKey(final JsonNode publicKeyJson) {
    if (publicKeyJson.getNodeType() != JsonNodeType.STRING) {
      throw new IllegalArgumentException(
          "Invalid key json of type: " + publicKeyJson.getNodeType());
    }
    final String publicKeyText = publicKeyJson.asText();
    try {
      final SECP256K1.PublicKey publicKey =
          SECP256K1.PublicKey.create(BytesValue.fromHexString(publicKeyText));
      // Imported keys have no private half available; only the public key file is written.
      writeKeypair(publicKey, null);
      LOG.info("Public key imported from configuration.({})", publicKey.toString());
    } catch (final IOException e) {
      LOG.error("An error occurred while trying to import node public key.", e);
    }
  }

  /** Generates nodes keypairs. */
  private void generateNodesKeys() {
    final int nodesCount = JsonUtil.getInt(nodesConfig, "count", 0);
    LOG.info("Generating {} nodes keys.", nodesCount);
    IntStream.range(0, nodesCount).forEach(this::generateNodeKeypair);
  }

  /**
   * Generate a keypair for a node.
   *
   * @param node The number of the node.
   */
  private void generateNodeKeypair(final int node) {
    try {
      LOG.info("Generating keypair for node {}.", node);
      final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate();
      writeKeypair(keyPair.getPublicKey(), keyPair.getPrivateKey());
    } catch (final IOException e) {
      LOG.error("An error occurred while trying to generate node keypair.", e);
    }
  }

  /**
   * Writes public and private keys in separate files. Both are written in the same directory named
   * with the address derived from the public key.
   *
   * @param publicKey The public key.
   * @param privateKey The private key. No file is created if privateKey is NULL.
   * @throws IOException if a key directory or file cannot be created.
   */
  private void writeKeypair(
      final SECP256K1.PublicKey publicKey, final SECP256K1.PrivateKey privateKey)
      throws IOException {
    final Address nodeAddress = Util.publicKeyToAddress(publicKey);
    // Remember the address so processExtraData() can include it in the IBFT extra data.
    addressesForGenesisExtraData.add(nodeAddress);
    final Path nodeDirectoryPath = keysDirectory.resolve(nodeAddress.toString());
    Files.createDirectory(nodeDirectoryPath);
    createFileAndWrite(nodeDirectoryPath, publicKeyFileName, publicKey.toString());
    if (privateKey != null) {
      createFileAndWrite(nodeDirectoryPath, privateKeyFileName, privateKey.toString());
    }
  }

  /** Computes RLP encoded extra data from pre filled list of addresses. */
  private void processExtraData() {
    final ObjectNode configNode = JsonUtil.getObjectNode(genesisConfig, "config").orElse(null);
    final JsonGenesisConfigOptions genesisConfigOptions =
        JsonGenesisConfigOptions.fromJsonObject(configNode);
    // Extra data is only meaningful for IBFT 2.0 networks.
    if (genesisConfigOptions.isIbft2()) {
      LOG.info("Generating IBFT extra data.");
      final String extraData =
          IbftExtraData.fromAddresses(addressesForGenesisExtraData).encode().toString();
      genesisConfig.put("extraData", extraData);
    }
  }

  private void createFileAndWrite(final Path directory, final String fileName, final String content)
      throws IOException {
    final Path filePath = directory.resolve(fileName);
    // CREATE_NEW guarantees we never silently overwrite a pre-existing file.
    Files.write(filePath, content.getBytes(UTF_8), StandardOpenOption.CREATE_NEW);
  }

  /**
   * Parses the root configuration file and related sub elements.
   *
   * @throws IOException if the configuration file cannot be read.
   */
  private void parseConfig() throws IOException {
    final String configString =
        Resources.toString(configurationFile.toPath().toUri().toURL(), UTF_8);
    final ObjectNode root = JsonUtil.objectNodeFromString(configString);
    operatorConfig = root;
    // Missing sections default to empty objects so downstream lookups are null-safe.
    genesisConfig =
        JsonUtil.getObjectNode(operatorConfig, "genesis").orElse(JsonUtil.createEmptyObjectNode());
    blockchainConfig =
        JsonUtil.getObjectNode(operatorConfig, "blockchain")
            .orElse(JsonUtil.createEmptyObjectNode());
    nodesConfig =
        JsonUtil.getObjectNode(blockchainConfig, "nodes").orElse(JsonUtil.createEmptyObjectNode());
    generateNodesKeys = JsonUtil.getBoolean(nodesConfig, "generate", false);
  }

  /**
   * Checks if the output directory exists.
   *
   * @throws IOException if the output or keys directory cannot be created.
   */
  private void handleOutputDirectory() throws IOException {
    checkNotNull(outputDirectory);
    final Path outputDirectoryPath = outputDirectory.toPath();
    // Fix: list() was previously invoked twice; a single call avoids a redundant
    // filesystem read and guarantees the null-check and length-check agree.
    final String[] existingContents = outputDirectory.list();
    if (outputDirectory.exists()
        && outputDirectory.isDirectory()
        && existingContents != null
        && existingContents.length > 0) {
      throw new IllegalArgumentException("Output directory must be empty.");
    } else if (!outputDirectory.exists()) {
      Files.createDirectory(outputDirectoryPath);
    }
    keysDirectory = outputDirectoryPath.resolve("keys");
    Files.createDirectory(keysDirectory);
  }

  /**
   * Write the content of the genesis to the output file.
   *
   * @param directory The directory to write the file to.
   * @param fileName The name of the output file.
   * @param genesis The genesis content.
   * @throws IOException if the genesis file cannot be written.
   */
  private void writeGenesisFile(
      final File directory, final String fileName, final ObjectNode genesis) throws IOException {
    LOG.info("Writing genesis file.");
    Files.write(
        directory.toPath().resolve(fileName),
        JsonUtil.getJson(genesis).getBytes(UTF_8),
        StandardOpenOption.CREATE_NEW);
  }

  private static boolean isAnyDuplicate(final String... values) {
    final Set<String> set = new HashSet<>();
    for (final String value : values) {
      if (!set.add(value)) {
        return true;
      }
    }
    return false;
  }
}

@ -0,0 +1,132 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.cli.subcommands.operator;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.hyperledger.besu.cli.DefaultCommandValues.MANDATORY_LONG_FORMAT_HELP;
import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.plugin.data.BlockHeader;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.file.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;
@Command(
    name = "generate-log-bloom-cache",
    description = "Generate cached values of block log bloom filters.",
    mixinStandardHelpOptions = true)
public class GenerateLogBloomCache implements Runnable {

  private static final Logger LOG = LogManager.getLogger();

  /** Number of blocks stored per cache file; each block contributes 256 bloom bytes. */
  public static final int BLOCKS_PER_FILE = 100_000;
  /** Name of the directory, under the data directory, that holds the cache files. */
  public static final String CACHE_DIRECTORY_NAME = "caches";

  @Option(
      names = "--start-block",
      paramLabel = MANDATORY_LONG_FORMAT_HELP,
      description =
          "The block to start generating indexes. Must be an increment of "
              + BLOCKS_PER_FILE
              + " (default: ${DEFAULT-VALUE})",
      arity = "1..1")
  private final Long startBlock = 0L;

  @Option(
      names = "--end-block",
      paramLabel = MANDATORY_LONG_FORMAT_HELP,
      description =
          // Fix: this option previously said "start"; it bounds where indexing stops.
          "The block to stop generating indexes (exclusive). (default: last block divisible by "
              + BLOCKS_PER_FILE
              + ")",
      arity = "1..1")
  private final Long endBlock = -1L;

  @ParentCommand private OperatorSubCommand parentCommand;

  @Override
  public void run() {
    checkPreconditions();
    generateLogBloomCache();
  }

  private void checkPreconditions() {
    checkNotNull(parentCommand.parentCommand.dataDir());
    // Files always begin at a BLOCKS_PER_FILE boundary, so the start block must too.
    checkState(
        startBlock % BLOCKS_PER_FILE == 0,
        "Start block must be an even increment of %s",
        BLOCKS_PER_FILE);
  }

  @SuppressWarnings("ResultOfMethodCallIgnored")
  private void generateLogBloomCache() {
    final Path cacheDir = parentCommand.parentCommand.dataDir().resolve(CACHE_DIRECTORY_NAME);
    cacheDir.toFile().mkdirs();
    generateLogBloomCache(
        startBlock,
        endBlock,
        cacheDir,
        createBesuController().getProtocolContext().getBlockchain());
  }

  /**
   * Writes the log bloom filter of every block in [start, stop) into fixed-size index
   * files under {@code cacheDir}, one file per {@link #BLOCKS_PER_FILE} blocks.
   *
   * @param start first block to index; must be a multiple of {@link #BLOCKS_PER_FILE}
   * @param stop exclusive end block; a negative value means "up to the last complete
   *     {@link #BLOCKS_PER_FILE} boundary below the chain head"
   * @param cacheDir directory receiving the generated index files
   * @param blockchain source of the block headers to index
   */
  public static void generateLogBloomCache(
      final long start, final long stop, final Path cacheDir, final Blockchain blockchain) {
    final long stopBlock =
        stop < 0
            ? (blockchain.getChainHeadBlockNumber() / BLOCKS_PER_FILE) * BLOCKS_PER_FILE
            : stop;
    LOG.debug("Start block: {} Stop block: {} Path: {}", start, stopBlock, cacheDir);
    checkArgument(start % BLOCKS_PER_FILE == 0, "Start block must be at the beginning of a file");
    checkArgument(stopBlock % BLOCKS_PER_FILE == 0, "End block must be at the beginning of a file");
    FileOutputStream fos = null;
    try {
      for (long blockNum = start; blockNum < stopBlock; blockNum++) {
        // Roll over to a fresh index file at every BLOCKS_PER_FILE boundary.
        if (blockNum % BLOCKS_PER_FILE == 0 || fos == null) {
          LOG.info("Indexing block {}", blockNum);
          if (fos != null) {
            fos.close();
          }
          fos = new FileOutputStream(createCacheFile(blockNum, cacheDir));
        }
        final BlockHeader header = blockchain.getBlockHeader(blockNum).orElseThrow();
        final byte[] logs = header.getLogsBloom().getByteArray();
        checkNotNull(logs);
        // Each entry is exactly 256 bytes so readers can seek by block number.
        checkState(logs.length == 256, "BloomBits are not the correct length");
        fos.write(logs);
      }
    } catch (final Exception e) {
      LOG.error("Unhandled indexing exception", e);
    } finally {
      // Fix: the last file opened was previously never closed (resource leak /
      // potentially unflushed final fragment).
      if (fos != null) {
        try {
          fos.close();
        } catch (final Exception e) {
          LOG.error("Error closing log bloom index file", e);
        }
      }
    }
  }

  private static File createCacheFile(final long blockNumber, final Path cacheDir) {
    return cacheDir.resolve("logBloom-" + (blockNumber / BLOCKS_PER_FILE) + ".index").toFile();
  }

  private BesuController<?> createBesuController() {
    return parentCommand.parentCommand.buildController();
  }
}

@ -14,52 +14,24 @@
*/
package org.hyperledger.besu.cli.subcommands.operator;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hyperledger.besu.cli.subcommands.operator.OperatorSubCommand.COMMAND_NAME;
import org.hyperledger.besu.cli.BesuCommand;
import org.hyperledger.besu.cli.DefaultCommandValues;
import org.hyperledger.besu.config.JsonGenesisConfigOptions;
import org.hyperledger.besu.config.JsonUtil;
import org.hyperledger.besu.consensus.ibft.IbftExtraData;
import org.hyperledger.besu.crypto.SECP256K1;
import org.hyperledger.besu.ethereum.core.Address;
import org.hyperledger.besu.ethereum.core.Util;
import org.hyperledger.besu.util.bytes.BytesValue;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;
import picocli.CommandLine.Spec;
/** Operator related sub-command */
@Command(
name = COMMAND_NAME,
description = "This command provides operator related actions.",
description = "Operator related actions such as generating configuration and caches.",
mixinStandardHelpOptions = true,
subcommands = {OperatorSubCommand.GenerateNetworkConfigSubCommand.class})
subcommands = {GenerateBlockchainConfig.class, GenerateLogBloomCache.class})
public class OperatorSubCommand implements Runnable {
private static final Logger LOG = LogManager.getLogger();
public static final String COMMAND_NAME = "operator";
public static final String GENERATE_BLOCKCHAIN_CONFIG_SUBCOMMAND_NAME =
@ -67,7 +39,7 @@ public class OperatorSubCommand implements Runnable {
@SuppressWarnings("unused")
@ParentCommand
private BesuCommand parentCommand; // Picocli injects reference to parent command
BesuCommand parentCommand; // Picocli injects reference to parent command
@SuppressWarnings("unused")
@Spec
@ -83,256 +55,4 @@ public class OperatorSubCommand implements Runnable {
/** Prints this command's usage text; the operator command itself performs no action. */
public void run() {
spec.commandLine().usage(out);
}
@Command(
name = "generate-blockchain-config",
description =
"This command generates node keypairs, genesis file (with RLP encoded IBFT 2.0 extra data).",
mixinStandardHelpOptions = true)
static class GenerateNetworkConfigSubCommand implements Runnable {
@Option(
required = true,
names = "--config-file",
paramLabel = DefaultCommandValues.MANDATORY_FILE_FORMAT_HELP,
description = "Configuration file.",
arity = "1..1")
private File configurationFile = null;
@Option(
required = true,
names = "--to",
paramLabel = DefaultCommandValues.MANDATORY_DIRECTORY_FORMAT_HELP,
description = "Directory to write output files to.",
arity = "1..1")
private File outputDirectory = null;
@Option(
names = "--genesis-file-name",
paramLabel = DefaultCommandValues.MANDATORY_PATH_FORMAT_HELP,
description = "Name of the genesis file. (default: ${DEFAULT-VALUE})",
arity = "1..1")
private String genesisFileName = "genesis.json";
@Option(
names = "--private-key-file-name",
paramLabel = DefaultCommandValues.MANDATORY_PATH_FORMAT_HELP,
description = "Name of the private key file. (default: ${DEFAULT-VALUE})",
arity = "1..1")
private String privateKeyFileName = "key.priv";
@Option(
names = "--public-key-file-name",
paramLabel = DefaultCommandValues.MANDATORY_PATH_FORMAT_HELP,
description = "Name of the public key file. (default: ${DEFAULT-VALUE})",
arity = "1..1")
private String publicKeyFileName = "key.pub";
@SuppressWarnings("unused")
@ParentCommand
private OperatorSubCommand parentCommand; // Picocli injects reference to parent command
private ObjectNode operatorConfig;
private ObjectNode genesisConfig;
private ObjectNode blockchainConfig;
private ObjectNode nodesConfig;
private boolean generateNodesKeys;
private List<Address> addressesForGenesisExtraData = new ArrayList<>();
private Path keysDirectory;
@Override
public void run() {
checkPreconditions();
generateBlockchainConfig();
}
private void checkPreconditions() {
checkNotNull(parentCommand);
checkNotNull(parentCommand.parentCommand);
if (isAnyDuplicate(genesisFileName, publicKeyFileName, privateKeyFileName)) {
throw new IllegalArgumentException("Output file paths must be unique.");
}
}
/** Generates output directory with all required configuration files. */
private void generateBlockchainConfig() {
try {
handleOutputDirectory();
parseConfig();
if (generateNodesKeys) {
generateNodesKeys();
} else {
importPublicKeysFromConfig();
}
processExtraData();
writeGenesisFile(outputDirectory, genesisFileName, genesisConfig);
} catch (IOException e) {
LOG.error("An error occurred while trying to generate network configuration.", e);
}
}
/** Imports public keys from input configuration. */
private void importPublicKeysFromConfig() {
LOG.info("Importing public keys from configuration.");
JsonUtil.getArrayNode(nodesConfig, "keys")
.ifPresent(keys -> keys.forEach(this::importPublicKey));
}
/**
* Imports a single public key.
*
* @param publicKeyJson The public key.
*/
private void importPublicKey(final JsonNode publicKeyJson) {
if (publicKeyJson.getNodeType() != JsonNodeType.STRING) {
throw new IllegalArgumentException(
"Invalid key json of type: " + publicKeyJson.getNodeType());
}
String publicKeyText = publicKeyJson.asText();
try {
final SECP256K1.PublicKey publicKey =
SECP256K1.PublicKey.create(BytesValue.fromHexString(publicKeyText));
writeKeypair(publicKey, null);
LOG.info("Public key imported from configuration.({})", publicKey.toString());
} catch (IOException e) {
LOG.error("An error occurred while trying to import node public key.", e);
}
}
/** Generates nodes keypairs. */
private void generateNodesKeys() {
final int nodesCount = JsonUtil.getInt(nodesConfig, "count", 0);
LOG.info("Generating {} nodes keys.", nodesCount);
IntStream.range(0, nodesCount).forEach(this::generateNodeKeypair);
}
/**
* Generate a keypair for a node.
*
* @param node The number of the node.
*/
private void generateNodeKeypair(final int node) {
try {
LOG.info("Generating keypair for node {}.", node);
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate();
writeKeypair(keyPair.getPublicKey(), keyPair.getPrivateKey());
} catch (IOException e) {
LOG.error("An error occurred while trying to generate node keypair.", e);
}
}
/**
* Writes public and private keys in separate files. Both are written in the same directory
* named with the address derived from the public key.
*
* @param publicKey The public key.
* @param privateKey The private key. No file is created if privateKey is NULL.
* @throws IOException
*/
private void writeKeypair(
final SECP256K1.PublicKey publicKey, final SECP256K1.PrivateKey privateKey)
throws IOException {
final Address nodeAddress = Util.publicKeyToAddress(publicKey);
addressesForGenesisExtraData.add(nodeAddress);
final Path nodeDirectoryPath = keysDirectory.resolve(nodeAddress.toString());
Files.createDirectory(nodeDirectoryPath);
createFileAndWrite(nodeDirectoryPath, publicKeyFileName, publicKey.toString());
if (privateKey != null) {
createFileAndWrite(nodeDirectoryPath, privateKeyFileName, privateKey.toString());
}
}
/**
* Computes RLP encoded exta data from pre filled list of addresses.
*
* @throws IOException
*/
private void processExtraData() {
final ObjectNode configNode = JsonUtil.getObjectNode(genesisConfig, "config").orElse(null);
final JsonGenesisConfigOptions genesisConfigOptions =
JsonGenesisConfigOptions.fromJsonObject(configNode);
if (genesisConfigOptions.isIbft2()) {
LOG.info("Generating IBFT extra data.");
final String extraData =
IbftExtraData.fromAddresses(addressesForGenesisExtraData).encode().toString();
genesisConfig.put("extraData", extraData);
}
}
private void createFileAndWrite(
final Path directory, final String fileName, final String content) throws IOException {
final Path filePath = directory.resolve(fileName);
Files.write(filePath, content.getBytes(UTF_8), StandardOpenOption.CREATE_NEW);
}
/**
* Parses the root configuration file and related sub elements.
*
* @throws IOException
*/
private void parseConfig() throws IOException {
final String configString =
Resources.toString(configurationFile.toPath().toUri().toURL(), UTF_8);
final ObjectNode root = JsonUtil.objectNodeFromString(configString);
operatorConfig = root;
genesisConfig =
JsonUtil.getObjectNode(operatorConfig, "genesis")
.orElse(JsonUtil.createEmptyObjectNode());
blockchainConfig =
JsonUtil.getObjectNode(operatorConfig, "blockchain")
.orElse(JsonUtil.createEmptyObjectNode());
nodesConfig =
JsonUtil.getObjectNode(blockchainConfig, "nodes")
.orElse(JsonUtil.createEmptyObjectNode());
generateNodesKeys = JsonUtil.getBoolean(nodesConfig, "generate", false);
}
/**
* Checks if the output directory exists.
*
* @throws IOException
*/
private void handleOutputDirectory() throws IOException {
checkNotNull(outputDirectory);
final Path outputDirectoryPath = outputDirectory.toPath();
if (outputDirectory.exists()
&& outputDirectory.isDirectory()
&& outputDirectory.list() != null
&& outputDirectory.list().length > 0) {
throw new IllegalArgumentException("Output directory must be empty.");
} else if (!outputDirectory.exists()) {
Files.createDirectory(outputDirectoryPath);
}
keysDirectory = outputDirectoryPath.resolve("keys");
Files.createDirectory(keysDirectory);
}
/**
* Write the content of the genesis to the output file.
*
* @param directory The directory to write the file to.
* @param fileName The name of the output file.
* @param genesis The genesis content.
* @throws IOException
*/
private void writeGenesisFile(
final File directory, final String fileName, final ObjectNode genesis) throws IOException {
LOG.info("Writing genesis file.");
Files.write(
directory.toPath().resolve(fileName),
JsonUtil.getJson(genesis).getBytes(UTF_8),
StandardOpenOption.CREATE_NEW);
}
}
private static boolean isAnyDuplicate(final String... values) {
final Set<String> set = new HashSet<>();
for (String value : values) {
if (!set.add(value)) {
return true;
}
}
return false;
}
}

@ -46,6 +46,7 @@ public class BesuController<C> implements java.io.Closeable {
private static final Logger LOG = LogManager.getLogger();
public static final String DATABASE_PATH = "database";
public static final String CACHE_PATH = "caches";
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthProtocolManager ethProtocolManager;

@ -54,7 +54,7 @@ public class OperatorSubCommandTest extends CommandTestAbstract {
private static final String EXPECTED_OPERATOR_USAGE =
"Usage: besu operator [-hV] [COMMAND]"
+ System.lineSeparator()
+ "This command provides operator related actions."
+ "Operator related actions such as generating configuration and caches."
+ System.lineSeparator()
+ " -h, --help Show this help message and exit."
+ System.lineSeparator()
@ -62,9 +62,11 @@ public class OperatorSubCommandTest extends CommandTestAbstract {
+ System.lineSeparator()
+ "Commands:"
+ System.lineSeparator()
+ " generate-blockchain-config This command generates node keypairs, genesis"
+ " generate-blockchain-config Generates node keypairs and genesis file with RLP"
+ System.lineSeparator()
+ " file (with RLP encoded IBFT 2.0 extra data).";
+ " encoded IBFT 2.0 extra data."
+ System.lineSeparator()
+ " generate-log-bloom-cache Generate cached values of block log bloom filters.";
private Path tmpOutputDirectoryPath;
@ -259,7 +261,7 @@ public class OperatorSubCommandTest extends CommandTestAbstract {
}
String[] argsArray() {
String[] wrapper = new String[] {};
final String[] wrapper = new String[] {};
return args.toArray(wrapper);
}
}

@ -22,6 +22,9 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.nio.file.Path;
import java.util.Optional;
public class GraphQLDataFetcherContext {
private final BlockchainQueries blockchain;
@ -36,8 +39,9 @@ public class GraphQLDataFetcherContext {
final ProtocolSchedule<?> protocolSchedule,
final TransactionPool transactionPool,
final MiningCoordinator miningCoordinator,
final Synchronizer synchronizer) {
this.blockchain = new BlockchainQueries(blockchain, worldStateArchive);
final Synchronizer synchronizer,
final Optional<Path> cachePath) {
this.blockchain = new BlockchainQueries(blockchain, worldStateArchive, cachePath);
this.protocolSchedule = protocolSchedule;
this.miningCoordinator = miningCoordinator;
this.synchronizer = synchronizer;

@ -37,6 +37,7 @@ import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
import java.math.BigInteger;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@ -66,9 +67,10 @@ public class JsonRpcMethodsFactory {
final PrivacyParameters privacyParameters,
final JsonRpcConfiguration jsonRpcConfiguration,
final WebSocketConfiguration webSocketConfiguration,
final MetricsConfiguration metricsConfiguration) {
final MetricsConfiguration metricsConfiguration,
final Optional<Path> cachePath) {
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, worldStateArchive);
new BlockchainQueries(blockchain, worldStateArchive, cachePath);
return methods(
clientVersion,
networkId,

@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;
import org.hyperledger.besu.ethereum.core.LogsBloomFilter;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
@ -34,6 +35,11 @@ import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.util.bytes.BytesValue;
import org.hyperledger.besu.util.uint.UInt256;
import java.io.EOFException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -44,14 +50,29 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class BlockchainQueries {
private static final Logger LOG = LogManager.getLogger();
@VisibleForTesting static final long BLOCKS_PER_BLOOM_CACHE = 100_000;
private final WorldStateArchive worldStateArchive;
private final Blockchain blockchain;
private final Optional<Path> cachePath;
/**
 * Creates a query facade with no log-bloom cache directory; all log queries
 * will use the uncached (header-scanning) path.
 *
 * @param blockchain the blockchain to query
 * @param worldStateArchive the world state archive to query
 */
public BlockchainQueries(final Blockchain blockchain, final WorldStateArchive worldStateArchive) {
this(blockchain, worldStateArchive, Optional.empty());
}
/**
 * Creates a query facade, optionally backed by on-disk log-bloom index files.
 *
 * @param blockchain the blockchain to query
 * @param worldStateArchive the world state archive to query
 * @param cachePath directory holding "logBloom-&lt;segment&gt;.index" files, or
 *     {@link Optional#empty()} to disable cached log-bloom lookups
 */
public BlockchainQueries(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final Optional<Path> cachePath) {
this.blockchain = blockchain;
this.worldStateArchive = worldStateArchive;
this.cachePath = cachePath;
}
public Blockchain getBlockchain() {
@ -486,6 +507,39 @@ public class BlockchainQueries {
*/
public List<LogWithMetadata> matchingLogs(
    final long fromBlockNumber, final long toBlockNumber, final LogsQuery query) {
  final List<LogWithMetadata> result = new ArrayList<>();
  // The range is walked one bloom-cache segment (BLOCKS_PER_BLOOM_CACHE blocks)
  // at a time so each segment can independently use its index file if present.
  final long startSegment = fromBlockNumber / BLOCKS_PER_BLOOM_CACHE;
  final long endSegment = toBlockNumber / BLOCKS_PER_BLOOM_CACHE;
  long currentStep = fromBlockNumber;
  for (long segment = startSegment; segment <= endSegment; segment++) {
    // Effectively-final copies of the loop state for capture in the lambdas below.
    final long thisSegment = segment;
    final long thisStep = currentStep;
    // First block of the NEXT segment; this segment covers [thisStep, nextStep - 1].
    final long nextStep = (segment + 1) * BLOCKS_PER_BLOOM_CACHE;
    // Last block to scan within this segment (clamped to the requested range end).
    final long segmentEnd = Math.min(toBlockNumber, nextStep - 1);
    result.addAll(
        cachePath
            .map(path -> path.resolve("logBloom-" + thisSegment + ".index"))
            .filter(Files::isRegularFile)
            .map(
                cacheFile ->
                    // Cached path: offsets are relative to the segment start.
                    matchingLogsCached(
                        thisSegment * BLOCKS_PER_BLOOM_CACHE,
                        thisStep % BLOCKS_PER_BLOOM_CACHE,
                        segmentEnd % BLOCKS_PER_BLOOM_CACHE,
                        query,
                        cacheFile))
            // No index file for this segment: fall back to scanning block headers.
            .orElseGet(() -> matchingLogsUncached(thisStep, segmentEnd, query)));
    currentStep = nextStep;
  }
  return result;
}
private List<LogWithMetadata> matchingLogsUncached(
final long fromBlockNumber, final long toBlockNumber, final LogsQuery query) {
// rangeClosed handles the inverted from/to situations automatically with zero results.
return LongStream.rangeClosed(fromBlockNumber, toBlockNumber)
.mapToObj(blockchain::getBlockHeader)
@ -499,6 +553,37 @@ public class BlockchainQueries {
.collect(Collectors.toList());
}
/**
 * Scans a segment's on-disk bloom index and returns matching logs.
 *
 * <p>The index file is a flat array of serialized {@code LogsBloomFilter}s, one
 * per block, so the bloom for relative block {@code pos} lives at byte offset
 * {@code pos * LogsBloomFilter.BYTE_SIZE}. Only blocks whose cached bloom could
 * match the query are fetched and fully filtered.
 *
 * @param segmentStart absolute block number of the first block in this segment
 * @param offset first block to check, relative to {@code segmentStart}
 * @param endOffset last block to check (inclusive), relative to {@code segmentStart}
 * @param query the log filter to apply
 * @param cacheFile the segment's "logBloom-&lt;n&gt;.index" file
 * @return the matching logs, in block order
 */
private List<LogWithMetadata> matchingLogsCached(
    final long segmentStart,
    final long offset,
    final long endOffset,
    final LogsQuery query,
    final Path cacheFile) {
  final List<LogWithMetadata> results = new ArrayList<>();
  try (final RandomAccessFile raf = new RandomAccessFile(cacheFile.toFile(), "r")) {
    raf.seek(offset * LogsBloomFilter.BYTE_SIZE);
    final byte[] bloomBuff = new byte[LogsBloomFilter.BYTE_SIZE];
    // The wrapper is a view over bloomBuff, so re-reading into the buffer
    // updates the wrapped value in place on each iteration.
    final BytesValue bytesValue = BytesValue.wrap(bloomBuff);
    for (long pos = offset; pos <= endOffset; pos++) {
      try {
        raf.readFully(bloomBuff);
      } catch (final EOFException eofe) {
        // The tail of the chain may not be indexed yet; stop at end of file.
        break;
      }
      final LogsBloomFilter logsBloom = new LogsBloomFilter(bytesValue);
      if (query.couldMatch(logsBloom)) {
        // Bloom hit: fetch the block and apply the exact filter.
        results.addAll(
            matchingLogs(
                blockchain.getBlockHashByNumber(segmentStart + pos).orElseThrow(), query));
      }
    }
  } catch (final IOException e) {
    LOG.error("Error reading cached log blooms", e);
  }
  return results;
}
public List<LogWithMetadata> matchingLogs(final Hash blockHash, final LogsQuery query) {
final Optional<BlockHeader> blockHeader = blockchain.getBlockHeader(blockHash);
if (blockHeader.isEmpty()) {

@ -167,7 +167,8 @@ public abstract class AbstractEthGraphQLHttpServiceTest {
PROTOCOL_SCHEDULE,
transactionPoolMock,
miningCoordinatorMock,
synchronizerMock);
synchronizerMock,
Optional.empty());
final GraphQLDataFetchers dataFetchers = new GraphQLDataFetchers(supportedCapabilities);
final GraphQL graphQL = GraphQLProvider.buildGraphQL(dataFetchers);

@ -0,0 +1,202 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.ethereum.api.query;
import static org.hyperledger.besu.ethereum.api.query.BlockchainQueries.BLOCKS_PER_BLOOM_CACHE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Address;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.Log;
import org.hyperledger.besu.ethereum.core.LogsBloomFilter;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.util.bytes.BytesValue;
import org.hyperledger.besu.util.uint.UInt256;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class BlockchainQueriesLogCacheTest {

  @ClassRule public static TemporaryFolder cacheDir = new TemporaryFolder();

  private static LogsQuery logsQuery;
  private Hash testHash;
  private static LogsBloomFilter testLogsBloomFilter;

  @Mock Blockchain blockchain;
  @Mock WorldStateArchive worldStateArchive;

  /**
   * Writes two segment index files ("logBloom-0.index" and "logBloom-1.index"), each containing
   * three matching bloom entries at the start of the segment and three more just before the
   * segment seam, so queries crossing the seam find cached blooms on both sides.
   */
  @BeforeClass
  public static void setupClass() throws IOException {
    final Address testAddress = Address.fromHexString("0x123456");
    final BytesValue testMessage = BytesValue.fromHexString("0x9876");
    final Log testLog = new Log(testAddress, testMessage, List.of());
    testLogsBloomFilter = new LogsBloomFilter();
    testLogsBloomFilter.insertLog(testLog);
    logsQuery = new LogsQuery(List.of(testAddress), List.of());
    for (int i = 0; i < 2; i++) {
      // try-with-resources ensures the index file is flushed and closed before the tests read it.
      try (final RandomAccessFile file =
          new RandomAccessFile(cacheDir.newFile("logBloom-" + i + ".index"), "rws")) {
        writeThreeEntries(testLogsBloomFilter, file);
        file.seek((BLOCKS_PER_BLOOM_CACHE - 3) * LogsBloomFilter.BYTE_SIZE);
        writeThreeEntries(testLogsBloomFilter, file);
      }
    }
  }

  /** Appends three copies of the given bloom filter at the file's current position. */
  private static void writeThreeEntries(final LogsBloomFilter filter, final RandomAccessFile file)
      throws IOException {
    file.write(filter.getByteArray());
    file.write(filter.getByteArray());
    file.write(filter.getByteArray());
  }

  /** Stubs the mock blockchain so every block lookup returns the same matching fake block. */
  @Before
  public void setup() {
    final BlockHeader fakeHeader =
        new BlockHeader(
            Hash.EMPTY,
            Hash.EMPTY,
            Address.ZERO,
            Hash.EMPTY,
            Hash.EMPTY,
            Hash.EMPTY,
            testLogsBloomFilter,
            UInt256.ZERO,
            0,
            0,
            0,
            0,
            BytesValue.EMPTY,
            Hash.EMPTY,
            0,
            new MainnetBlockHeaderFunctions());
    testHash = fakeHeader.getHash();
    final BlockBody fakeBody = new BlockBody(Collections.emptyList(), Collections.emptyList());
    when(blockchain.getBlockHashByNumber(anyLong())).thenReturn(Optional.of(testHash));
    when(blockchain.getBlockHeader(any())).thenReturn(Optional.of(fakeHeader));
    when(blockchain.getBlockHeader(anyLong())).thenReturn(Optional.of(fakeHeader));
    when(blockchain.getTxReceipts(any())).thenReturn(Optional.of(Collections.emptyList()));
    when(blockchain.getBlockBody(any())).thenReturn(Optional.of(fakeBody));
  }

  /**
   * Tests four sets of a three block range where the seam (where the segment changes) is in all
   * possible positions in the range.
   *
   * <p>For this test both sides of the seam are cached.
   */
  @Test
  public void cachedCachedSeamTest() {
    final BlockchainQueries query =
        new BlockchainQueries(
            blockchain, worldStateArchive, Optional.of(cacheDir.getRoot().toPath()));
    for (long i = BLOCKS_PER_BLOOM_CACHE - 3; i <= BLOCKS_PER_BLOOM_CACHE; i++) {
      query.matchingLogs(i, i + 2, logsQuery);
    }

    // 4 ranges of 3 hits a piece = 12 calls - 97-99, 98-00, 99-01, 00-02
    verify(blockchain, times(12)).getBlockHashByNumber(anyLong());
    verify(blockchain, times(12)).getBlockHeader(testHash);
    verify(blockchain, times(12)).getTxReceipts(testHash);
    verify(blockchain, times(12)).getBlockBody(testHash);
    verify(blockchain, times(12)).blockIsOnCanonicalChain(testHash);
    verifyNoMoreInteractions(blockchain);
  }

  /**
   * Tests four sets of a three block range where the seam (where the segment changes) is in all
   * possible positions in the range.
   *
   * <p>For this test the low side is cached the high side is uncached.
   */
  @Test
  public void cachedUncachedSeamTest() {
    final BlockchainQueries query =
        new BlockchainQueries(
            blockchain, worldStateArchive, Optional.of(cacheDir.getRoot().toPath()));
    for (long i = (2 * BLOCKS_PER_BLOOM_CACHE) - 3; i <= 2 * BLOCKS_PER_BLOOM_CACHE; i++) {
      query.matchingLogs(i, i + 2, logsQuery);
    }

    // 6 sets of calls on cache side of seam: 97-99, 98-99, 99, {}
    verify(blockchain, times(6)).getBlockHashByNumber(anyLong());
    // 6 sets of calls on uncached side of seam: {}, 00, 00-01, 00-02
    verify(blockchain, times(6)).getBlockHeader(anyLong());
    // called on both halves of the seam
    verify(blockchain, times(12)).getBlockHeader(testHash);
    verify(blockchain, times(12)).getTxReceipts(testHash);
    verify(blockchain, times(12)).getBlockBody(testHash);
    verify(blockchain, times(12)).blockIsOnCanonicalChain(testHash);
    verifyNoMoreInteractions(blockchain);
  }

  /**
   * Tests four sets of a three block range where the seam (where the segment changes) is in all
   * possible positions in the range.
   *
   * <p>For this test both sides are uncached.
   */
  @Test
  public void uncachedUncachedSeamTest() {
    final BlockchainQueries query =
        new BlockchainQueries(
            blockchain, worldStateArchive, Optional.of(cacheDir.getRoot().toPath()));
    for (long i = (3 * BLOCKS_PER_BLOOM_CACHE) - 3; i <= 3 * BLOCKS_PER_BLOOM_CACHE; i++) {
      query.matchingLogs(i, i + 2, logsQuery);
    }

    // 4 ranges of 3 hits a piece = 12 calls - 97-99, 98-00, 99-01, 00-02
    verify(blockchain, times(12)).getBlockHeader(anyLong());
    verify(blockchain, times(12)).getBlockHeader(testHash);
    verify(blockchain, times(12)).getTxReceipts(testHash);
    verify(blockchain, times(12)).getBlockBody(testHash);
    verify(blockchain, times(12)).blockIsOnCanonicalChain(testHash);
    verifyNoMoreInteractions(blockchain);
  }
}
Loading…
Cancel
Save