Extract RollingFileReader and RollingFileWriter (#1614)

Extract RollingFileReader and RollingFileWriter to the utility package.

Signed-off-by: Danno Ferrin <danno.ferrin@gmail.com>
pull/1647/head
Danno Ferrin 4 years ago committed by GitHub
parent e22fbc178a
commit 55f7c502d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 73
      besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/RestoreState.java
  2. 67
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/StateBackupService.java
  3. 1
      util/build.gradle
  4. 98
      util/src/main/java/org/hyperledger/besu/util/io/RollingFileReader.java
  5. 87
      util/src/main/java/org/hyperledger/besu/util/io/RollingFileWriter.java

@ -38,24 +38,19 @@ import org.hyperledger.besu.ethereum.trie.RestoreVisitor;
import org.hyperledger.besu.ethereum.worldstate.DefaultWorldStateArchive;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.util.io.RollingFileReader;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.xerial.snappy.Snappy;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;
@ -280,71 +275,7 @@ public class RestoreState implements Runnable {
trieNodeCount++;
}
static class RollingFileReader implements Closeable {
final BiFunction<Integer, Boolean, Path> filenameGenerator;
final boolean compressed;
int currentPosition;
int fileNumber;
FileInputStream in;
final DataInputStream index;
boolean done = false;
RollingFileReader(
final BiFunction<Integer, Boolean, Path> filenameGenerator, final boolean compressed)
throws IOException {
this.filenameGenerator = filenameGenerator;
this.compressed = compressed;
final Path firstInputFile = filenameGenerator.apply(fileNumber, compressed);
in = new FileInputStream(firstInputFile.toFile());
index =
new DataInputStream(
new FileInputStream(StateBackupService.dataFileToIndex(firstInputFile).toFile()));
fileNumber = index.readInt();
currentPosition = index.readUnsignedShort();
}
byte[] readBytes() throws IOException {
byte[] raw;
try {
final int start = currentPosition;
final int nextFile = index.readUnsignedShort();
currentPosition = index.readInt();
if (nextFile == fileNumber) {
final int len = currentPosition - start;
raw = new byte[len];
//noinspection ResultOfMethodCallIgnored
in.read(raw);
} else {
raw = in.readAllBytes();
in.close();
fileNumber = nextFile;
in = new FileInputStream(filenameGenerator.apply(fileNumber, compressed).toFile());
if (currentPosition != 0) {
//noinspection ResultOfMethodCallIgnored
in.skip(currentPosition);
}
}
} catch (final EOFException eofe) {
// this happens when we read the last value, where there is no next index.
raw = in.readAllBytes();
done = true;
}
return compressed ? Snappy.uncompress(raw) : raw;
}
@Override
public void close() throws IOException {
in.close();
index.close();
}
public boolean isDone() {
return done;
}
}
@SuppressWarnings("unused")
BesuController createBesuController() {
private BesuController createBesuController() {
return parentCommand.parentCommand.buildController();
}
}

@ -34,11 +34,8 @@ import org.hyperledger.besu.ethereum.trie.TrieIterator.State;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.data.Hash;
import org.hyperledger.besu.util.io.RollingFileWriter;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@ -51,7 +48,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import com.fasterxml.jackson.annotation.JsonGetter;
@ -60,12 +56,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.xerial.snappy.Snappy;
public class StateBackupService {
private static final Logger LOG = LogManager.getLogger();
private static final long MAX_FILE_SIZE = 1 << 28; // 256 MiB max file size
private static final Bytes ACCOUNT_END_MARKER;
static {
@ -75,7 +69,7 @@ public class StateBackupService {
ACCOUNT_END_MARKER = endMarker.encoded();
}
private final String besuVesion;
private final String besuVersion;
private final Lock submissionLock = new ReentrantLock();
private final EthScheduler scheduler;
private final Blockchain blockchain;
@ -86,12 +80,12 @@ public class StateBackupService {
private RollingFileWriter accountFileWriter;
public StateBackupService(
final String besuVesion,
final String besuVersion,
final Blockchain blockchain,
final Path backupDir,
final EthScheduler scheduler,
final WorldStateStorage worldStateStorage) {
this.besuVesion = besuVesion;
this.besuVersion = besuVersion;
this.blockchain = blockchain;
this.backupDir = backupDir;
this.scheduler = scheduler;
@ -194,7 +188,7 @@ public class StateBackupService {
backupStatus.compressed = compress;
backupStatus.currentAccount = Bytes32.ZERO;
backupChaindata();
backupChainData();
backupLeaves();
writeManifest();
@ -208,7 +202,7 @@ public class StateBackupService {
private void writeManifest() throws IOException {
final Map<String, Object> manifest = new HashMap<>();
manifest.put("clientVersion", besuVesion);
manifest.put("clientVersion", besuVersion);
manifest.put("compressed", backupStatus.compressed);
manifest.put("targetBlock", backupStatus.targetBlock);
manifest.put("accountCount", backupStatus.accountCount);
@ -296,7 +290,7 @@ public class StateBackupService {
return State.CONTINUE;
}
private void backupChaindata() throws IOException {
private void backupChainData() throws IOException {
try (final RollingFileWriter headerWriter =
new RollingFileWriter(this::headerFileName, backupStatus.compressed);
final RollingFileWriter bodyWriter =
@ -351,53 +345,6 @@ public class StateBackupService {
return State.CONTINUE;
}
static class RollingFileWriter implements Closeable {
final BiFunction<Integer, Boolean, Path> filenameGenerator;
final boolean compressed;
int currentSize;
int fileNumber;
FileOutputStream out;
final DataOutputStream index;
RollingFileWriter(
final BiFunction<Integer, Boolean, Path> filenameGenerator, final boolean compressed)
throws FileNotFoundException {
this.filenameGenerator = filenameGenerator;
this.compressed = compressed;
currentSize = 0;
fileNumber = 0;
final Path firstOutputFile = filenameGenerator.apply(fileNumber, compressed);
out = new FileOutputStream(firstOutputFile.toFile());
index = new DataOutputStream(new FileOutputStream(dataFileToIndex(firstOutputFile).toFile()));
}
void writeBytes(final byte[] bytes) throws IOException {
final byte[] finalBytes;
if (compressed) {
finalBytes = Snappy.compress(bytes);
} else {
finalBytes = bytes;
}
int pos = currentSize;
currentSize += finalBytes.length;
if (currentSize > MAX_FILE_SIZE) {
out.close();
out = new FileOutputStream(filenameGenerator.apply(++fileNumber, compressed).toFile());
currentSize = finalBytes.length;
pos = 0;
}
index.writeShort(fileNumber);
index.writeInt(pos);
out.write(finalBytes);
}
@Override
public void close() throws IOException {
out.close();
index.close();
}
}
public static final class BackupStatus {
long targetBlock;
long storedBlock;

@ -36,6 +36,7 @@ dependencies {
implementation 'io.vertx:vertx-core'
implementation 'org.apache.tuweni:bytes'
implementation 'org.apache.tuweni:units'
implementation 'org.xerial.snappy:snappy-java'
runtimeOnly 'org.apache.logging.log4j:log4j-core'

@ -0,0 +1,98 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.util.io;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
import java.util.function.BiFunction;
import org.xerial.snappy.Snappy;
public class RollingFileReader implements Closeable {
private final BiFunction<Integer, Boolean, Path> filenameGenerator;
private final boolean compressed;
private int currentPosition;
private int fileNumber;
private RandomAccessFile in;
private final RandomAccessFile index;
private boolean done = false;
public RollingFileReader(
final BiFunction<Integer, Boolean, Path> filenameGenerator, final boolean compressed)
throws IOException {
this.filenameGenerator = filenameGenerator;
this.compressed = compressed;
final Path firstInputFile = filenameGenerator.apply(fileNumber, compressed);
in = new RandomAccessFile(firstInputFile.toFile(), "r");
index = new RandomAccessFile(RollingFileWriter.dataFileToIndex(firstInputFile).toFile(), "r");
fileNumber = index.readInt();
currentPosition = index.readUnsignedShort();
}
public byte[] readBytes() throws IOException {
byte[] raw;
try {
final int start = currentPosition;
final int nextFile = index.readUnsignedShort();
currentPosition = index.readInt();
if (nextFile == fileNumber) {
final int len = currentPosition - start;
raw = new byte[len];
in.read(raw);
} else {
raw = new byte[(int) (in.length() - in.getFilePointer())];
in.read(raw);
in.close();
fileNumber = nextFile;
in = new RandomAccessFile(filenameGenerator.apply(fileNumber, compressed).toFile(), "r");
if (currentPosition != 0) {
in.seek(currentPosition);
}
}
} catch (final EOFException e) {
// this happens when we read the last value, where there is no next index.
raw = new byte[(int) (in.length() - in.getFilePointer())];
in.read(raw);
done = true;
}
return compressed ? Snappy.uncompress(raw) : raw;
}
public void seek(final long position) throws IOException {
index.seek(position * 6);
final int oldFile = fileNumber;
fileNumber = index.readUnsignedShort();
currentPosition = index.readInt();
if (oldFile != fileNumber) {
in = new RandomAccessFile(filenameGenerator.apply(fileNumber, compressed).toFile(), "r");
}
in.seek(currentPosition);
}
@Override
public void close() throws IOException {
in.close();
index.close();
}
public boolean isDone() {
return done;
}
}

@ -0,0 +1,87 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.util.io;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.util.function.BiFunction;
import org.xerial.snappy.Snappy;
public class RollingFileWriter implements Closeable {
private static final long MAX_FILE_SIZE = 1 << 30; // 1 GiB max file size
private final BiFunction<Integer, Boolean, Path> filenameGenerator;
private final boolean compressed;
private int currentSize;
private int fileNumber;
private FileOutputStream out;
private final DataOutputStream index;
public RollingFileWriter(
final BiFunction<Integer, Boolean, Path> filenameGenerator, final boolean compressed)
throws FileNotFoundException {
this.filenameGenerator = filenameGenerator;
this.compressed = compressed;
currentSize = 0;
fileNumber = 0;
final Path firstOutputFile = filenameGenerator.apply(fileNumber, compressed);
final File parentDir = firstOutputFile.getParent().toFile();
if (!parentDir.exists()) {
//noinspection ResultOfMethodCallIgnored
parentDir.mkdirs();
}
out = new FileOutputStream(firstOutputFile.toFile());
index = new DataOutputStream(new FileOutputStream(dataFileToIndex(firstOutputFile).toFile()));
}
public static Path dataFileToIndex(final Path dataName) {
return Path.of(dataName.toString().replaceAll("(.*)[-.]\\d\\d\\d\\d\\.(.)dat", "$1.$2idx"));
}
public void writeBytes(final byte[] bytes) throws IOException {
final byte[] finalBytes;
if (compressed) {
finalBytes = Snappy.compress(bytes);
} else {
finalBytes = bytes;
}
int pos = currentSize;
currentSize += finalBytes.length;
if (currentSize > MAX_FILE_SIZE) {
out.close();
out = new FileOutputStream(filenameGenerator.apply(++fileNumber, compressed).toFile());
currentSize = finalBytes.length;
pos = 0;
}
index.writeShort(fileNumber);
index.writeInt(pos);
out.write(finalBytes);
}
@Override
public void close() throws IOException {
out.close();
index.close();
}
}
Loading…
Cancel
Save