diff --git a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/RestoreState.java b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/RestoreState.java index 4fda098b6d..9f1a2f72ac 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/RestoreState.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/RestoreState.java @@ -38,24 +38,19 @@ import org.hyperledger.besu.ethereum.trie.RestoreVisitor; import org.hyperledger.besu.ethereum.worldstate.DefaultWorldStateArchive; import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.util.io.RollingFileReader; -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.EOFException; -import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; -import java.util.function.BiFunction; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; -import org.xerial.snappy.Snappy; import picocli.CommandLine.Command; import picocli.CommandLine.Option; import picocli.CommandLine.ParentCommand; @@ -280,71 +275,7 @@ public class RestoreState implements Runnable { trieNodeCount++; } - static class RollingFileReader implements Closeable { - final BiFunction filenameGenerator; - final boolean compressed; - int currentPosition; - int fileNumber; - FileInputStream in; - final DataInputStream index; - boolean done = false; - - RollingFileReader( - final BiFunction filenameGenerator, final boolean compressed) - throws IOException { - this.filenameGenerator = filenameGenerator; - this.compressed = compressed; - final Path firstInputFile = filenameGenerator.apply(fileNumber, compressed); - in = new FileInputStream(firstInputFile.toFile()); - index = - new DataInputStream( - new FileInputStream(StateBackupService.dataFileToIndex(firstInputFile).toFile())); - fileNumber = index.readInt(); - currentPosition = index.readUnsignedShort(); - } - - byte[] readBytes() throws IOException { - byte[] raw; - try { - final int start = currentPosition; - final int nextFile = index.readUnsignedShort(); - currentPosition = index.readInt(); - if (nextFile == fileNumber) { - final int len = currentPosition - start; - raw = new byte[len]; - //noinspection ResultOfMethodCallIgnored - in.read(raw); - } else { - raw = in.readAllBytes(); - in.close(); - fileNumber = nextFile; - in = new FileInputStream(filenameGenerator.apply(fileNumber, compressed).toFile()); - if (currentPosition != 0) { - //noinspection ResultOfMethodCallIgnored - in.skip(currentPosition); - } - } - } catch (final EOFException eofe) { - // this happens when we read the last value, where there is no next index. - raw = in.readAllBytes(); - done = true; - } - return compressed ? Snappy.uncompress(raw) : raw; - } - - @Override - public void close() throws IOException { - in.close(); - index.close(); - } - - public boolean isDone() { - return done; - } - } - - @SuppressWarnings("unused") - BesuController createBesuController() { + private BesuController createBesuController() { return parentCommand.parentCommand.buildController(); } } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/StateBackupService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/StateBackupService.java index c774b85797..b764435755 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/StateBackupService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/StateBackupService.java @@ -34,11 +34,8 @@ import org.hyperledger.besu.ethereum.trie.TrieIterator.State; import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.plugin.data.Hash; +import org.hyperledger.besu.util.io.RollingFileWriter; -import java.io.Closeable; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -51,7 +48,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiFunction; import java.util.function.Function; import com.fasterxml.jackson.annotation.JsonGetter; @@ -60,12 +56,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; -import org.xerial.snappy.Snappy; public class StateBackupService { private static final Logger LOG = LogManager.getLogger(); - private static final long MAX_FILE_SIZE = 1 << 28; // 256 MiB max file size private static final Bytes ACCOUNT_END_MARKER; static { @@ -75,7 +69,7 @@ public class StateBackupService { ACCOUNT_END_MARKER = endMarker.encoded(); } - private final String besuVesion; + private final String besuVersion; private final Lock submissionLock = new ReentrantLock(); private final EthScheduler scheduler; private final Blockchain blockchain; @@ -86,12 +80,12 @@ public class StateBackupService { private RollingFileWriter accountFileWriter; public StateBackupService( - final String besuVesion, + final String besuVersion, final Blockchain blockchain, final Path backupDir, final EthScheduler scheduler, final WorldStateStorage worldStateStorage) { - this.besuVesion = besuVesion; + this.besuVersion = besuVersion; this.blockchain = blockchain; this.backupDir = backupDir; this.scheduler = scheduler; @@ -194,7 +188,7 @@ public class StateBackupService { backupStatus.compressed = compress; backupStatus.currentAccount = Bytes32.ZERO; - backupChaindata(); + backupChainData(); backupLeaves(); writeManifest(); @@ -208,7 +202,7 @@ public class StateBackupService { private void writeManifest() throws IOException { final Map manifest = new HashMap<>(); - manifest.put("clientVersion", besuVesion); + manifest.put("clientVersion", besuVersion); manifest.put("compressed", backupStatus.compressed); manifest.put("targetBlock", backupStatus.targetBlock); manifest.put("accountCount", backupStatus.accountCount); @@ -296,7 +290,7 @@ public class StateBackupService { return State.CONTINUE; } - private void backupChaindata() throws IOException { + private void backupChainData() throws IOException { try (final RollingFileWriter headerWriter = new RollingFileWriter(this::headerFileName, backupStatus.compressed); final RollingFileWriter bodyWriter = @@ -351,53 +345,6 @@ public class StateBackupService { return State.CONTINUE; } - static class RollingFileWriter implements Closeable { - final BiFunction filenameGenerator; - final boolean compressed; - int currentSize; - int fileNumber; - FileOutputStream out; - final DataOutputStream index; - - RollingFileWriter( - final BiFunction filenameGenerator, final boolean compressed) - throws FileNotFoundException { - this.filenameGenerator = filenameGenerator; - this.compressed = compressed; - currentSize = 0; - fileNumber = 0; - final Path firstOutputFile = filenameGenerator.apply(fileNumber, compressed); - out = new FileOutputStream(firstOutputFile.toFile()); - index = new DataOutputStream(new FileOutputStream(dataFileToIndex(firstOutputFile).toFile())); - } - - void writeBytes(final byte[] bytes) throws IOException { - final byte[] finalBytes; - if (compressed) { - finalBytes = Snappy.compress(bytes); - } else { - finalBytes = bytes; - } - int pos = currentSize; - currentSize += finalBytes.length; - if (currentSize > MAX_FILE_SIZE) { - out.close(); - out = new FileOutputStream(filenameGenerator.apply(++fileNumber, compressed).toFile()); - currentSize = finalBytes.length; - pos = 0; - } - index.writeShort(fileNumber); - index.writeInt(pos); - out.write(finalBytes); - } - - @Override - public void close() throws IOException { - out.close(); - index.close(); - } - } - public static final class BackupStatus { long targetBlock; long storedBlock; diff --git a/util/build.gradle b/util/build.gradle index 4d8ff95e11..c4ca741fef 100644 --- a/util/build.gradle +++ b/util/build.gradle @@ -36,6 +36,7 @@ dependencies { implementation 'io.vertx:vertx-core' implementation 'org.apache.tuweni:bytes' implementation 'org.apache.tuweni:units' + implementation 'org.xerial.snappy:snappy-java' runtimeOnly 'org.apache.logging.log4j:log4j-core' diff --git a/util/src/main/java/org/hyperledger/besu/util/io/RollingFileReader.java b/util/src/main/java/org/hyperledger/besu/util/io/RollingFileReader.java new file mode 100644 index 0000000000..ec56a45dd0 --- /dev/null +++ b/util/src/main/java/org/hyperledger/besu/util/io/RollingFileReader.java @@ -0,0 +1,98 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.hyperledger.besu.util.io; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Path; +import java.util.function.BiFunction; + +import org.xerial.snappy.Snappy; + +public class RollingFileReader implements Closeable { + private final BiFunction filenameGenerator; + private final boolean compressed; + private int currentPosition; + private int fileNumber; + private RandomAccessFile in; + private final RandomAccessFile index; + private boolean done = false; + + public RollingFileReader( + final BiFunction filenameGenerator, final boolean compressed) + throws IOException { + this.filenameGenerator = filenameGenerator; + this.compressed = compressed; + final Path firstInputFile = filenameGenerator.apply(fileNumber, compressed); + in = new RandomAccessFile(firstInputFile.toFile(), "r"); + index = new RandomAccessFile(RollingFileWriter.dataFileToIndex(firstInputFile).toFile(), "r"); + fileNumber = index.readInt(); + currentPosition = index.readUnsignedShort(); + } + + public byte[] readBytes() throws IOException { + byte[] raw; + try { + final int start = currentPosition; + final int nextFile = index.readUnsignedShort(); + currentPosition = index.readInt(); + if (nextFile == fileNumber) { + final int len = currentPosition - start; + raw = new byte[len]; + in.read(raw); + } else { + raw = new byte[(int) (in.length() - in.getFilePointer())]; + in.read(raw); + in.close(); + fileNumber = nextFile; + in = new RandomAccessFile(filenameGenerator.apply(fileNumber, compressed).toFile(), "r"); + if (currentPosition != 0) { + in.seek(currentPosition); + } + } + } catch (final EOFException e) { + // this happens when we read the last value, where there is no next index. + raw = new byte[(int) (in.length() - in.getFilePointer())]; + in.read(raw); + done = true; + } + return compressed ? Snappy.uncompress(raw) : raw; + } + + public void seek(final long position) throws IOException { + index.seek(position * 6); + final int oldFile = fileNumber; + fileNumber = index.readUnsignedShort(); + currentPosition = index.readInt(); + if (oldFile != fileNumber) { + in = new RandomAccessFile(filenameGenerator.apply(fileNumber, compressed).toFile(), "r"); + } + in.seek(currentPosition); + } + + @Override + public void close() throws IOException { + in.close(); + index.close(); + } + + public boolean isDone() { + return done; + } +} diff --git a/util/src/main/java/org/hyperledger/besu/util/io/RollingFileWriter.java b/util/src/main/java/org/hyperledger/besu/util/io/RollingFileWriter.java new file mode 100644 index 0000000000..aaa03e6106 --- /dev/null +++ b/util/src/main/java/org/hyperledger/besu/util/io/RollingFileWriter.java @@ -0,0 +1,87 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.hyperledger.besu.util.io; + +import java.io.Closeable; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.util.function.BiFunction; + +import org.xerial.snappy.Snappy; + +public class RollingFileWriter implements Closeable { + private static final long MAX_FILE_SIZE = 1 << 30; // 1 GiB max file size + + private final BiFunction filenameGenerator; + private final boolean compressed; + private int currentSize; + private int fileNumber; + private FileOutputStream out; + private final DataOutputStream index; + + public RollingFileWriter( + final BiFunction filenameGenerator, final boolean compressed) + throws FileNotFoundException { + this.filenameGenerator = filenameGenerator; + this.compressed = compressed; + currentSize = 0; + fileNumber = 0; + final Path firstOutputFile = filenameGenerator.apply(fileNumber, compressed); + final File parentDir = firstOutputFile.getParent().toFile(); + if (!parentDir.exists()) { + //noinspection ResultOfMethodCallIgnored + parentDir.mkdirs(); + } + out = new FileOutputStream(firstOutputFile.toFile()); + + index = new DataOutputStream(new FileOutputStream(dataFileToIndex(firstOutputFile).toFile())); + } + + public static Path dataFileToIndex(final Path dataName) { + return Path.of(dataName.toString().replaceAll("(.*)[-.]\\d\\d\\d\\d\\.(.)dat", "$1.$2idx")); + } + + public void writeBytes(final byte[] bytes) throws IOException { + final byte[] finalBytes; + if (compressed) { + finalBytes = Snappy.compress(bytes); + } else { + finalBytes = bytes; + } + int pos = currentSize; + currentSize += finalBytes.length; + if (currentSize > MAX_FILE_SIZE) { + out.close(); + out = new FileOutputStream(filenameGenerator.apply(++fileNumber, compressed).toFile()); + currentSize = finalBytes.length; + pos = 0; + } + index.writeShort(fileNumber); + index.writeInt(pos); + out.write(finalBytes); + } + + @Override + public void close() throws IOException { + out.close(); + index.close(); + } +}