add freezer db

update/ethdb2024
“GheisMohammadi” 8 months ago
parent 4bdddf7067
commit 84c641909a
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 509
      core/rawdb/freezer.go
  2. 255
      core/rawdb/freezer_batch.go
  3. 109
      core/rawdb/freezer_meta.go
  4. 60
      core/rawdb/freezer_meta_test.go
  5. 238
      core/rawdb/freezer_resettable.go
  6. 107
      core/rawdb/freezer_resettable_test.go
  7. 990
      core/rawdb/freezer_table.go
  8. 1369
      core/rawdb/freezer_table_test.go
  9. 483
      core/rawdb/freezer_test.go
  10. 131
      core/rawdb/freezer_utils.go
  11. 75
      core/rawdb/freezer_utils_test.go

@ -0,0 +1,509 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"errors"
"fmt"
"math"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/gofrs/flock"
)
var (
// errReadOnly is returned if the freezer is opened in read only mode. All the
// mutations are disallowed.
errReadOnly = errors.New("read only")
// errUnknownTable is returned if the user attempts to read from a table that is
// not tracked by the freezer.
errUnknownTable = errors.New("unknown table")
// errOutOrderInsertion is returned if the user attempts to inject out-of-order
// binary blobs into the freezer.
errOutOrderInsertion = errors.New("the append operation is out-order")
// errSymlinkDatadir is returned if the ancient directory specified by user
// is a symbolic link.
errSymlinkDatadir = errors.New("symbolic link datadir is not supported")
)
// freezerTableSize defines the maximum size of freezer data files.
const freezerTableSize = 2 * 1000 * 1000 * 1000
// Freezer is a memory mapped append-only database to store immutable ordered
// data into flat files:
//
// - The append-only nature ensures that disk writes are minimized.
// - The memory mapping ensures we can max out system memory for caching without
// reserving it for go-ethereum. This would also reduce the memory requirements
// of Geth, and thus also GC overhead.
type Freezer struct {
frozen atomic.Uint64 // Number of blocks already frozen
tail atomic.Uint64 // Number of the first stored item in the freezer
// This lock synchronizes writers and the truncate operation, as well as
// the "atomic" (batched) read operations.
writeLock sync.RWMutex
writeBatch *freezerBatch
readonly bool
tables map[string]*freezerTable // Data tables for storing everything
instanceLock *flock.Flock // File-system lock to prevent double opens
closeOnce sync.Once
}
// NewChainFreezer is a small utility method around NewFreezer that sets the
// default parameters for the chain storage.
func NewChainFreezer(datadir string, namespace string, readonly bool) (*Freezer, error) {
return NewFreezer(datadir, namespace, readonly, freezerTableSize, chainFreezerNoSnappy)
}
// NewFreezer creates a freezer instance for maintaining immutable ordered
// data according to the given parameters.
//
// The 'tables' argument defines the data tables. If the value of a map
// entry is true, snappy compression is disabled for the table.
func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*Freezer, error) {
// Create the initial freezer object
var (
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
sizeGauge = metrics.NewRegisteredGauge(namespace+"ancient/size", nil)
)
// Ensure the datadir is not a symbolic link if it exists.
if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
if info.Mode()&os.ModeSymlink != 0 {
log.Warn("Symbolic link ancient database is not supported", "path", datadir)
return nil, errSymlinkDatadir
}
}
flockFile := filepath.Join(datadir, "FLOCK")
if err := os.MkdirAll(filepath.Dir(flockFile), 0755); err != nil {
return nil, err
}
// Leveldb uses LOCK as the filelock filename. To prevent the
// name collision, we use FLOCK as the lock name.
lock := flock.New(flockFile)
tryLock := lock.TryLock
if readonly {
tryLock = lock.TryRLock
}
if locked, err := tryLock(); err != nil {
return nil, err
} else if !locked {
return nil, errors.New("locking failed")
}
// Open all the supported data tables
freezer := &Freezer{
readonly: readonly,
tables: make(map[string]*freezerTable),
instanceLock: lock,
}
// Create the tables.
for name, disableSnappy := range tables {
table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy, readonly)
if err != nil {
for _, table := range freezer.tables {
table.Close()
}
lock.Unlock()
return nil, err
}
freezer.tables[name] = table
}
var err error
if freezer.readonly {
// In readonly mode only validate, don't truncate.
// validate also sets `freezer.frozen`.
err = freezer.validate()
} else {
// Truncate all tables to common length.
err = freezer.repair()
}
if err != nil {
for _, table := range freezer.tables {
table.Close()
}
lock.Unlock()
return nil, err
}
// Create the write batch.
freezer.writeBatch = newFreezerBatch(freezer)
log.Info("Opened ancient database", "database", datadir, "readonly", readonly)
return freezer, nil
}
// Close terminates the chain freezer, unmapping all the data files.
func (f *Freezer) Close() error {
f.writeLock.Lock()
defer f.writeLock.Unlock()
var errs []error
f.closeOnce.Do(func() {
for _, table := range f.tables {
if err := table.Close(); err != nil {
errs = append(errs, err)
}
}
if err := f.instanceLock.Unlock(); err != nil {
errs = append(errs, err)
}
})
if errs != nil {
return fmt.Errorf("%v", errs)
}
return nil
}
// HasAncient returns an indicator whether the specified ancient data exists
// in the freezer.
func (f *Freezer) HasAncient(kind string, number uint64) (bool, error) {
if table := f.tables[kind]; table != nil {
return table.has(number), nil
}
return false, nil
}
// Ancient retrieves an ancient binary blob from the append-only immutable files.
func (f *Freezer) Ancient(kind string, number uint64) ([]byte, error) {
if table := f.tables[kind]; table != nil {
return table.Retrieve(number)
}
return nil, errUnknownTable
}
// AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize),
// but will otherwise return as many items as fit into maxByteSize.
// - if maxBytes is not specified, 'count' items will be returned if they are present.
func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
if table := f.tables[kind]; table != nil {
return table.RetrieveItems(start, count, maxBytes)
}
return nil, errUnknownTable
}
// Ancients returns the length of the frozen items.
func (f *Freezer) Ancients() (uint64, error) {
return f.frozen.Load(), nil
}
// Tail returns the number of first stored item in the freezer.
func (f *Freezer) Tail() (uint64, error) {
return f.tail.Load(), nil
}
// AncientSize returns the ancient size of the specified category.
func (f *Freezer) AncientSize(kind string) (uint64, error) {
// This needs the write lock to avoid data races on table fields.
// Speed doesn't matter here, AncientSize is for debugging.
f.writeLock.RLock()
defer f.writeLock.RUnlock()
if table := f.tables[kind]; table != nil {
return table.size()
}
return 0, errUnknownTable
}
// ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer.
func (f *Freezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
f.writeLock.RLock()
defer f.writeLock.RUnlock()
return fn(f)
}
// ModifyAncients runs the given write operation.
func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) {
if f.readonly {
return 0, errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()
// Roll back all tables to the starting position in case of error.
prevItem := f.frozen.Load()
defer func() {
if err != nil {
// The write operation has failed. Go back to the previous item position.
for name, table := range f.tables {
err := table.truncateHead(prevItem)
if err != nil {
log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err)
}
}
}
}()
f.writeBatch.reset()
if err := fn(f.writeBatch); err != nil {
return 0, err
}
item, writeSize, err := f.writeBatch.commit()
if err != nil {
return 0, err
}
f.frozen.Store(item)
return writeSize, nil
}
// TruncateHead discards any recent data above the provided threshold number.
// It returns the previous head number.
func (f *Freezer) TruncateHead(items uint64) (uint64, error) {
if f.readonly {
return 0, errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()
oitems := f.frozen.Load()
if oitems <= items {
return oitems, nil
}
for _, table := range f.tables {
if err := table.truncateHead(items); err != nil {
return 0, err
}
}
f.frozen.Store(items)
return oitems, nil
}
// TruncateTail discards any recent data below the provided threshold number.
func (f *Freezer) TruncateTail(tail uint64) (uint64, error) {
if f.readonly {
return 0, errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()
old := f.tail.Load()
if old >= tail {
return old, nil
}
for _, table := range f.tables {
if err := table.truncateTail(tail); err != nil {
return 0, err
}
}
f.tail.Store(tail)
return old, nil
}
// Sync flushes all data tables to disk.
func (f *Freezer) Sync() error {
var errs []error
for _, table := range f.tables {
if err := table.Sync(); err != nil {
errs = append(errs, err)
}
}
if errs != nil {
return fmt.Errorf("%v", errs)
}
return nil
}
// validate checks that every table has the same boundary.
// Used instead of `repair` in readonly mode.
func (f *Freezer) validate() error {
if len(f.tables) == 0 {
return nil
}
var (
head uint64
tail uint64
name string
)
// Hack to get boundary of any table
for kind, table := range f.tables {
head = table.items.Load()
tail = table.itemHidden.Load()
name = kind
break
}
// Now check every table against those boundaries.
for kind, table := range f.tables {
if head != table.items.Load() {
return fmt.Errorf("freezer tables %s and %s have differing head: %d != %d", kind, name, table.items.Load(), head)
}
if tail != table.itemHidden.Load() {
return fmt.Errorf("freezer tables %s and %s have differing tail: %d != %d", kind, name, table.itemHidden.Load(), tail)
}
}
f.frozen.Store(head)
f.tail.Store(tail)
return nil
}
// repair truncates all data tables to the same length.
func (f *Freezer) repair() error {
var (
head = uint64(math.MaxUint64)
tail = uint64(0)
)
for _, table := range f.tables {
items := table.items.Load()
if head > items {
head = items
}
hidden := table.itemHidden.Load()
if hidden > tail {
tail = hidden
}
}
for _, table := range f.tables {
if err := table.truncateHead(head); err != nil {
return err
}
if err := table.truncateTail(tail); err != nil {
return err
}
}
f.frozen.Store(head)
f.tail.Store(tail)
return nil
}
// convertLegacyFn takes a raw freezer entry in an older format and
// returns it in the new format.
type convertLegacyFn = func([]byte) ([]byte, error)
// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
if f.readonly {
return errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()
table, ok := f.tables[kind]
if !ok {
return errUnknownTable
}
// forEach iterates every entry in the table serially and in order, calling `fn`
// with the item as argument. If `fn` returns an error the iteration stops
// and that error will be returned.
forEach := func(t *freezerTable, offset uint64, fn func(uint64, []byte) error) error {
var (
items = t.items.Load()
batchSize = uint64(1024)
maxBytes = uint64(1024 * 1024)
)
for i := offset; i < items; {
if i+batchSize > items {
batchSize = items - i
}
data, err := t.RetrieveItems(i, batchSize, maxBytes)
if err != nil {
return err
}
for j, item := range data {
if err := fn(i+uint64(j), item); err != nil {
return err
}
}
i += uint64(len(data))
}
return nil
}
// TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration
// process assumes no deletion at tail and needs to be modified to account for that.
if table.itemOffset.Load() > 0 || table.itemHidden.Load() > 0 {
return errors.New("migration not supported for tail-deleted freezers")
}
ancientsPath := filepath.Dir(table.index.Name())
// Set up new dir for the migrated table, the content of which
// we'll at the end move over to the ancients dir.
migrationPath := filepath.Join(ancientsPath, "migration")
newTable, err := newFreezerTable(migrationPath, kind, table.noCompression, false)
if err != nil {
return err
}
var (
batch = newTable.newBatch()
out []byte
start = time.Now()
logged = time.Now()
offset = newTable.items.Load()
)
if offset > 0 {
log.Info("found previous migration attempt", "migrated", offset)
}
// Iterate through entries and transform them
if err := forEach(table, offset, func(i uint64, blob []byte) error {
if i%10000 == 0 && time.Since(logged) > 16*time.Second {
log.Info("Processing legacy elements", "count", i, "elapsed", common.PrettyDuration(time.Since(start)))
logged = time.Now()
}
out, err = convert(blob)
if err != nil {
return err
}
if err := batch.AppendRaw(i, out); err != nil {
return err
}
return nil
}); err != nil {
return err
}
if err := batch.commit(); err != nil {
return err
}
log.Info("Replacing old table files with migrated ones", "elapsed", common.PrettyDuration(time.Since(start)))
// Release and delete old table files. Note this won't
// delete the index file.
table.releaseFilesAfter(0, true)
if err := newTable.Close(); err != nil {
return err
}
files, err := os.ReadDir(migrationPath)
if err != nil {
return err
}
// Move migrated files to ancients dir.
for _, f := range files {
// This will replace the old index file as a side-effect.
if err := os.Rename(filepath.Join(migrationPath, f.Name()), filepath.Join(ancientsPath, f.Name())); err != nil {
return err
}
}
// Delete by now empty dir.
if err := os.Remove(migrationPath); err != nil {
return err
}
return nil
}

@ -0,0 +1,255 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"fmt"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/rlp"
"github.com/golang/snappy"
)
// This is the maximum amount of data that will be buffered in memory
// for a single freezer table batch.
const freezerBatchBufferLimit = 2 * 1024 * 1024
// freezerBatch is a write operation of multiple items on a freezer.
type freezerBatch struct {
tables map[string]*freezerTableBatch
}
func newFreezerBatch(f *Freezer) *freezerBatch {
batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))}
for kind, table := range f.tables {
batch.tables[kind] = table.newBatch()
}
return batch
}
// Append adds an RLP-encoded item of the given kind.
func (batch *freezerBatch) Append(kind string, num uint64, item interface{}) error {
return batch.tables[kind].Append(num, item)
}
// AppendRaw adds an item of the given kind.
func (batch *freezerBatch) AppendRaw(kind string, num uint64, item []byte) error {
return batch.tables[kind].AppendRaw(num, item)
}
// reset initializes the batch.
func (batch *freezerBatch) reset() {
for _, tb := range batch.tables {
tb.reset()
}
}
// commit is called at the end of a write operation and
// writes all remaining data to tables.
func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) {
// Check that count agrees on all batches.
item = uint64(math.MaxUint64)
for name, tb := range batch.tables {
if item < math.MaxUint64 && tb.curItem != item {
return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item)
}
item = tb.curItem
}
// Commit all table batches.
for _, tb := range batch.tables {
if err := tb.commit(); err != nil {
return 0, 0, err
}
writeSize += tb.totalBytes
}
return item, writeSize, nil
}
// freezerTableBatch is a batch for a freezer table.
type freezerTableBatch struct {
t *freezerTable
sb *snappyBuffer
encBuffer writeBuffer
dataBuffer []byte
indexBuffer []byte
curItem uint64 // expected index of next append
totalBytes int64 // counts written bytes since reset
}
// newBatch creates a new batch for the freezer table.
func (t *freezerTable) newBatch() *freezerTableBatch {
batch := &freezerTableBatch{t: t}
if !t.noCompression {
batch.sb = new(snappyBuffer)
}
batch.reset()
return batch
}
// reset clears the batch for reuse.
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = batch.t.items.Load()
batch.totalBytes = 0
}
// Append rlp-encodes and adds data at the end of the freezer table. The item number is a
// precautionary parameter to ensure data correctness, but the table will reject already
// existing data.
func (batch *freezerTableBatch) Append(item uint64, data interface{}) error {
if item != batch.curItem {
return fmt.Errorf("%w: have %d want %d", errOutOrderInsertion, item, batch.curItem)
}
// Encode the item.
batch.encBuffer.Reset()
if err := rlp.Encode(&batch.encBuffer, data); err != nil {
return err
}
encItem := batch.encBuffer.data
if batch.sb != nil {
encItem = batch.sb.compress(encItem)
}
return batch.appendItem(encItem)
}
// AppendRaw injects a binary blob at the end of the freezer table. The item number is a
// precautionary parameter to ensure data correctness, but the table will reject already
// existing data.
func (batch *freezerTableBatch) AppendRaw(item uint64, blob []byte) error {
if item != batch.curItem {
return fmt.Errorf("%w: have %d want %d", errOutOrderInsertion, item, batch.curItem)
}
encItem := blob
if batch.sb != nil {
encItem = batch.sb.compress(blob)
}
return batch.appendItem(encItem)
}
func (batch *freezerTableBatch) appendItem(data []byte) error {
// Check if item fits into current data file.
itemSize := int64(len(data))
itemOffset := batch.t.headBytes + int64(len(batch.dataBuffer))
if itemOffset+itemSize > int64(batch.t.maxFileSize) {
// It doesn't fit, go to next file first.
if err := batch.commit(); err != nil {
return err
}
if err := batch.t.advanceHead(); err != nil {
return err
}
itemOffset = 0
}
// Put data to buffer.
batch.dataBuffer = append(batch.dataBuffer, data...)
batch.totalBytes += itemSize
// Put index entry to buffer.
entry := indexEntry{filenum: batch.t.headId, offset: uint32(itemOffset + itemSize)}
batch.indexBuffer = entry.append(batch.indexBuffer)
batch.curItem++
return batch.maybeCommit()
}
// maybeCommit writes the buffered data if the buffer is full enough.
func (batch *freezerTableBatch) maybeCommit() error {
if len(batch.dataBuffer) > freezerBatchBufferLimit {
return batch.commit()
}
return nil
}
// commit writes the batched items to the backing freezerTable.
func (batch *freezerTableBatch) commit() error {
// Write data. The head file is fsync'd after write to ensure the
// data is truly transferred to disk.
_, err := batch.t.head.Write(batch.dataBuffer)
if err != nil {
return err
}
if err := batch.t.head.Sync(); err != nil {
return err
}
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]
// Write indices. The index file is fsync'd after write to ensure the
// data indexes are truly transferred to disk.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
}
if err := batch.t.index.Sync(); err != nil {
return err
}
indexSize := int64(len(batch.indexBuffer))
batch.indexBuffer = batch.indexBuffer[:0]
// Update headBytes of table.
batch.t.headBytes += dataSize
batch.t.items.Store(batch.curItem)
// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
batch.t.writeMeter.Mark(dataSize + indexSize)
return nil
}
// snappyBuffer writes snappy in block format, and can be reused. It is
// reset when WriteTo is called.
type snappyBuffer struct {
dst []byte
}
// compress snappy-compresses the data.
func (s *snappyBuffer) compress(data []byte) []byte {
// The snappy library does not care what the capacity of the buffer is,
// but only checks the length. If the length is too small, it will
// allocate a brand new buffer.
// To avoid that, we check the required size here, and grow the size of the
// buffer to utilize the full capacity.
if n := snappy.MaxEncodedLen(len(data)); len(s.dst) < n {
if cap(s.dst) < n {
s.dst = make([]byte, n)
}
s.dst = s.dst[:n]
}
s.dst = snappy.Encode(s.dst, data)
return s.dst
}
// writeBuffer implements io.Writer for a byte slice.
type writeBuffer struct {
data []byte
}
func (wb *writeBuffer) Write(data []byte) (int, error) {
wb.data = append(wb.data, data...)
return len(data), nil
}
func (wb *writeBuffer) Reset() {
wb.data = wb.data[:0]
}

@ -0,0 +1,109 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"io"
"os"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
const freezerVersion = 1 // The initial version tag of freezer table metadata
// freezerTableMeta wraps all the metadata of the freezer table.
type freezerTableMeta struct {
// Version is the versioning descriptor of the freezer table.
Version uint16
// VirtualTail indicates how many items have been marked as deleted.
// Its value is equal to the number of items removed from the table
// plus the number of items hidden in the table, so it should never
// be lower than the "actual tail".
VirtualTail uint64
}
// newMetadata initializes the metadata object with the given virtual tail.
func newMetadata(tail uint64) *freezerTableMeta {
return &freezerTableMeta{
Version: freezerVersion,
VirtualTail: tail,
}
}
// readMetadata reads the metadata of the freezer table from the
// given metadata file.
func readMetadata(file *os.File) (*freezerTableMeta, error) {
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}
var meta freezerTableMeta
if err := rlp.Decode(file, &meta); err != nil {
return nil, err
}
return &meta, nil
}
// writeMetadata writes the metadata of the freezer table into the
// given metadata file.
func writeMetadata(file *os.File, meta *freezerTableMeta) error {
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return err
}
return rlp.Encode(file, meta)
}
// loadMetadata loads the metadata from the given metadata file.
// Initializes the metadata file with the given "actual tail" if
// it's empty.
func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) {
stat, err := file.Stat()
if err != nil {
return nil, err
}
// Write the metadata with the given actual tail into metadata file
// if it's non-existent. There are two possible scenarios here:
// - the freezer table is empty
// - the freezer table is legacy
// In both cases, write the meta into the file with the actual tail
// as the virtual tail.
if stat.Size() == 0 {
m := newMetadata(tail)
if err := writeMetadata(file, m); err != nil {
return nil, err
}
return m, nil
}
m, err := readMetadata(file)
if err != nil {
return nil, err
}
// Update the virtual tail with the given actual tail if it's even
// lower than it. Theoretically it shouldn't happen at all, print
// a warning here.
if m.VirtualTail < tail {
log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail)
m.VirtualTail = tail
if err := writeMetadata(file, m); err != nil {
return nil, err
}
}
return m, nil
}

@ -0,0 +1,60 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"os"
"testing"
)
func TestReadWriteFreezerTableMeta(t *testing.T) {
f, err := os.CreateTemp(os.TempDir(), "*")
if err != nil {
t.Fatalf("Failed to create file %v", err)
}
err = writeMetadata(f, newMetadata(100))
if err != nil {
t.Fatalf("Failed to write metadata %v", err)
}
meta, err := readMetadata(f)
if err != nil {
t.Fatalf("Failed to read metadata %v", err)
}
if meta.Version != freezerVersion {
t.Fatalf("Unexpected version field")
}
if meta.VirtualTail != uint64(100) {
t.Fatalf("Unexpected virtual tail field")
}
}
func TestInitializeFreezerTableMeta(t *testing.T) {
f, err := os.CreateTemp(os.TempDir(), "*")
if err != nil {
t.Fatalf("Failed to create file %v", err)
}
meta, err := loadMetadata(f, uint64(100))
if err != nil {
t.Fatalf("Failed to read metadata %v", err)
}
if meta.Version != freezerVersion {
t.Fatalf("Unexpected version field")
}
if meta.VirtualTail != uint64(100) {
t.Fatalf("Unexpected virtual tail field")
}
}

@ -0,0 +1,238 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"os"
"path/filepath"
"sync"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)
const tmpSuffix = ".tmp"
// freezerOpenFunc is the function used to open/create a freezer.
type freezerOpenFunc = func() (*Freezer, error)
// ResettableFreezer is a wrapper of the freezer which makes the
// freezer resettable.
type ResettableFreezer struct {
freezer *Freezer
opener freezerOpenFunc
datadir string
lock sync.RWMutex
}
// NewResettableFreezer creates a resettable freezer, note freezer is
// only resettable if the passed file directory is exclusively occupied
// by the freezer. And also the user-configurable ancient root directory
// is **not** supported for reset since it might be a mount and rename
// will cause a copy of hundreds of gigabyte into local directory. It
// needs some other file based solutions.
//
// The reset function will delete directory atomically and re-create the
// freezer from scratch.
func NewResettableFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*ResettableFreezer, error) {
if err := cleanup(datadir); err != nil {
return nil, err
}
opener := func() (*Freezer, error) {
return NewFreezer(datadir, namespace, readonly, maxTableSize, tables)
}
freezer, err := opener()
if err != nil {
return nil, err
}
return &ResettableFreezer{
freezer: freezer,
opener: opener,
datadir: datadir,
}, nil
}
// Reset deletes the file directory exclusively occupied by the freezer and
// recreate the freezer from scratch. The atomicity of directory deletion
// is guaranteed by the rename operation, the leftover directory will be
// cleaned up in next startup in case crash happens after rename.
func (f *ResettableFreezer) Reset() error {
f.lock.Lock()
defer f.lock.Unlock()
if err := f.freezer.Close(); err != nil {
return err
}
tmp := tmpName(f.datadir)
if err := os.Rename(f.datadir, tmp); err != nil {
return err
}
if err := os.RemoveAll(tmp); err != nil {
return err
}
freezer, err := f.opener()
if err != nil {
return err
}
f.freezer = freezer
return nil
}
// Close terminates the chain freezer, unmapping all the data files.
func (f *ResettableFreezer) Close() error {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.Close()
}
// HasAncient returns an indicator whether the specified ancient data exists
// in the freezer
func (f *ResettableFreezer) HasAncient(kind string, number uint64) (bool, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.HasAncient(kind, number)
}
// Ancient retrieves an ancient binary blob from the append-only immutable files.
func (f *ResettableFreezer) Ancient(kind string, number uint64) ([]byte, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.Ancient(kind, number)
}
// AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize),
// but will otherwise return as many items as fit into maxByteSize.
// - if maxBytes is not specified, 'count' items will be returned if they are present.
func (f *ResettableFreezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.AncientRange(kind, start, count, maxBytes)
}
// Ancients returns the length of the frozen items.
func (f *ResettableFreezer) Ancients() (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.Ancients()
}
// Tail returns the number of first stored item in the freezer.
func (f *ResettableFreezer) Tail() (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.Tail()
}
// AncientSize returns the ancient size of the specified category.
func (f *ResettableFreezer) AncientSize(kind string) (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.AncientSize(kind)
}
// ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer.
func (f *ResettableFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.ReadAncients(fn)
}
// ModifyAncients runs the given write operation.
func (f *ResettableFreezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.ModifyAncients(fn)
}
// TruncateHead discards any recent data above the provided threshold number.
// It returns the previous head number.
func (f *ResettableFreezer) TruncateHead(items uint64) (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.TruncateHead(items)
}
// TruncateTail discards any recent data below the provided threshold number.
// It returns the previous value
func (f *ResettableFreezer) TruncateTail(tail uint64) (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.TruncateTail(tail)
}
// Sync flushes all data tables to disk.
func (f *ResettableFreezer) Sync() error {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.Sync()
}
// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (f *ResettableFreezer) MigrateTable(kind string, convert convertLegacyFn) error {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.MigrateTable(kind, convert)
}
// cleanup removes the directory located in the specified path
// has the name with deletion marker suffix.
func cleanup(path string) error {
parent := filepath.Dir(path)
if _, err := os.Lstat(parent); os.IsNotExist(err) {
return nil
}
dir, err := os.Open(parent)
if err != nil {
return err
}
names, err := dir.Readdirnames(0)
if err != nil {
return err
}
if cerr := dir.Close(); cerr != nil {
return cerr
}
for _, name := range names {
if name == filepath.Base(path)+tmpSuffix {
log.Info("Removed leftover freezer directory", "name", name)
return os.RemoveAll(filepath.Join(parent, name))
}
}
return nil
}
func tmpName(path string) string {
return filepath.Join(filepath.Dir(path), filepath.Base(path)+tmpSuffix)
}

@ -0,0 +1,107 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"bytes"
"os"
"testing"
"github.com/ethereum/go-ethereum/ethdb"
)
func TestResetFreezer(t *testing.T) {
items := []struct {
id uint64
blob []byte
}{
{0, bytes.Repeat([]byte{0}, 2048)},
{1, bytes.Repeat([]byte{1}, 2048)},
{2, bytes.Repeat([]byte{2}, 2048)},
}
f, _ := NewResettableFreezer(t.TempDir(), "", false, 2048, freezerTestTableDef)
defer f.Close()
f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for _, item := range items {
op.AppendRaw("test", item.id, item.blob)
}
return nil
})
for _, item := range items {
blob, _ := f.Ancient("test", item.id)
if !bytes.Equal(blob, item.blob) {
t.Fatal("Unexpected blob")
}
}
// Reset freezer
f.Reset()
count, _ := f.Ancients()
if count != 0 {
t.Fatal("Failed to reset freezer")
}
for _, item := range items {
blob, _ := f.Ancient("test", item.id)
if len(blob) != 0 {
t.Fatal("Unexpected blob")
}
}
// Fill the freezer
f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for _, item := range items {
op.AppendRaw("test", item.id, item.blob)
}
return nil
})
for _, item := range items {
blob, _ := f.Ancient("test", item.id)
if !bytes.Equal(blob, item.blob) {
t.Fatal("Unexpected blob")
}
}
}
func TestFreezerCleanup(t *testing.T) {
items := []struct {
id uint64
blob []byte
}{
{0, bytes.Repeat([]byte{0}, 2048)},
{1, bytes.Repeat([]byte{1}, 2048)},
{2, bytes.Repeat([]byte{2}, 2048)},
}
datadir := t.TempDir()
f, _ := NewResettableFreezer(datadir, "", false, 2048, freezerTestTableDef)
f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for _, item := range items {
op.AppendRaw("test", item.id, item.blob)
}
return nil
})
f.Close()
os.Rename(datadir, tmpName(datadir))
// Open the freezer again, trigger cleanup operation
f, _ = NewResettableFreezer(datadir, "", false, 2048, freezerTestTableDef)
f.Close()
if _, err := os.Lstat(tmpName(datadir)); !os.IsNotExist(err) {
t.Fatal("Failed to cleanup leftover directory")
}
}

@ -0,0 +1,990 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/golang/snappy"
)
var (
// errClosed is returned if an operation attempts to read from or write to the
// freezer table after it has already been closed.
errClosed = errors.New("closed")
// errOutOfBounds is returned if the item requested is not contained within the
// freezer table.
errOutOfBounds = errors.New("out of bounds")
// errNotSupported is returned if the database doesn't support the required operation.
errNotSupported = errors.New("this operation is not supported")
)
// indexEntry contains the number/id of the file that the data resides in, as well as the
// offset within the file to the end of the data.
// In serialized form, the filenum is stored as uint16.
type indexEntry struct {
filenum uint32 // stored as uint16 ( 2 bytes )
offset uint32 // stored as uint32 ( 4 bytes )
}
const indexEntrySize = 6
// unmarshalBinary deserializes binary b into the rawIndex entry.
func (i *indexEntry) unmarshalBinary(b []byte) {
i.filenum = uint32(binary.BigEndian.Uint16(b[:2]))
i.offset = binary.BigEndian.Uint32(b[2:6])
}
// append adds the encoded entry to the end of b.
func (i *indexEntry) append(b []byte) []byte {
offset := len(b)
out := append(b, make([]byte, indexEntrySize)...)
binary.BigEndian.PutUint16(out[offset:], uint16(i.filenum))
binary.BigEndian.PutUint32(out[offset+2:], i.offset)
return out
}
// bounds returns the start- and end- offsets, and the file number of where to
// read there data item marked by the two index entries. The two entries are
// assumed to be sequential.
func (i *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
if i.filenum != end.filenum {
// If a piece of data 'crosses' a data-file,
// it's actually in one piece on the second data-file.
// We return a zero-indexEntry for the second file as start
return 0, end.offset, end.filenum
}
return i.offset, end.offset, end.filenum
}
// freezerTable represents a single chained data table within the freezer (e.g. blocks).
// It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry
// file (uncompressed 64 bit indices into the data file).
type freezerTable struct {
items atomic.Uint64 // Number of items stored in the table (including items removed from tail)
itemOffset atomic.Uint64 // Number of items removed from the table
// itemHidden is the number of items marked as deleted. Tail deletion is
// only supported at file level which means the actual deletion will be
// delayed until the entire data file is marked as deleted. Before that
// these items will be hidden to prevent being visited again. The value
// should never be lower than itemOffset.
itemHidden atomic.Uint64
noCompression bool // if true, disables snappy compression. Note: does not work retroactively
readonly bool
maxFileSize uint32 // Max file size for data-files
name string
path string
head *os.File // File descriptor for the data head of the table
index *os.File // File descriptor for the indexEntry file of the table
meta *os.File // File descriptor for metadata of the table
files map[uint32]*os.File // open files
headId uint32 // number of the currently active head file
tailId uint32 // number of the earliest file
headBytes int64 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables
logger log.Logger // Logger with database path and table name embedded
lock sync.RWMutex // Mutex protecting the data file descriptors
}
// newFreezerTable opens the given path as a freezer table.
func newFreezerTable(path, name string, disableSnappy, readonly bool) (*freezerTable, error) {
return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy, readonly)
}
// newTable opens a freezer table, creating the data and index files if they are
// non-existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
var idxName string
if noCompression {
idxName = fmt.Sprintf("%s.ridx", name) // raw index file
} else {
idxName = fmt.Sprintf("%s.cidx", name) // compressed index file
}
var (
err error
index *os.File
meta *os.File
)
if readonly {
// Will fail if table index file or meta file is not existent
index, err = openFreezerFileForReadOnly(filepath.Join(path, idxName))
if err != nil {
return nil, err
}
meta, err = openFreezerFileForReadOnly(filepath.Join(path, fmt.Sprintf("%s.meta", name)))
if err != nil {
return nil, err
}
} else {
index, err = openFreezerFileForAppend(filepath.Join(path, idxName))
if err != nil {
return nil, err
}
meta, err = openFreezerFileForAppend(filepath.Join(path, fmt.Sprintf("%s.meta", name)))
if err != nil {
return nil, err
}
}
// Create the table and repair any past inconsistency
tab := &freezerTable{
index: index,
meta: meta,
files: make(map[uint32]*os.File),
readMeter: readMeter,
writeMeter: writeMeter,
sizeGauge: sizeGauge,
name: name,
path: path,
logger: log.New("database", path, "table", name),
noCompression: noCompression,
readonly: readonly,
maxFileSize: maxFilesize,
}
if err := tab.repair(); err != nil {
tab.Close()
return nil, err
}
// Initialize the starting size counter
size, err := tab.sizeNolock()
if err != nil {
tab.Close()
return nil, err
}
tab.sizeGauge.Inc(int64(size))
return tab, nil
}
// repair cross-checks the head and the index file and truncates them to
// be in sync with each other after a potential crash / data loss.
func (t *freezerTable) repair() error {
// Create a temporary offset buffer to init files with and read indexEntry into
buffer := make([]byte, indexEntrySize)
// If we've just created the files, initialize the index with the 0 indexEntry
stat, err := t.index.Stat()
if err != nil {
return err
}
if stat.Size() == 0 {
if _, err := t.index.Write(buffer); err != nil {
return err
}
}
// Ensure the index is a multiple of indexEntrySize bytes
if overflow := stat.Size() % indexEntrySize; overflow != 0 {
if t.readonly {
return fmt.Errorf("index file(path: %s, name: %s) size is not a multiple of %d", t.path, t.name, indexEntrySize)
}
if err := truncateFreezerFile(t.index, stat.Size()-overflow); err != nil {
return err
} // New file can't trigger this path
}
// Retrieve the file sizes and prepare for truncation
if stat, err = t.index.Stat(); err != nil {
return err
}
offsetsSize := stat.Size()
// Open the head file
var (
firstIndex indexEntry
lastIndex indexEntry
contentSize int64
contentExp int64
verbose bool
)
// Read index zero, determine what file is the earliest
// and what item offset to use
t.index.ReadAt(buffer, 0)
firstIndex.unmarshalBinary(buffer)
// Assign the tail fields with the first stored index.
// The total removed items is represented with an uint32,
// which is not enough in theory but enough in practice.
// TODO: use uint64 to represent total removed items.
t.tailId = firstIndex.filenum
t.itemOffset.Store(uint64(firstIndex.offset))
// Load metadata from the file
meta, err := loadMetadata(t.meta, t.itemOffset.Load())
if err != nil {
return err
}
t.itemHidden.Store(meta.VirtualTail)
// Read the last index, use the default value in case the freezer is empty
if offsetsSize == indexEntrySize {
lastIndex = indexEntry{filenum: t.tailId, offset: 0}
} else {
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
lastIndex.unmarshalBinary(buffer)
}
// Print an error log if the index is corrupted due to an incorrect
// last index item. While it is theoretically possible to have a zero offset
// by storing all zero-size items, it is highly unlikely to occur in practice.
if lastIndex.offset == 0 && offsetsSize/indexEntrySize > 1 {
log.Error("Corrupted index file detected", "lastOffset", lastIndex.offset, "indexes", offsetsSize/indexEntrySize)
}
if t.readonly {
t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly)
} else {
t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend)
}
if err != nil {
return err
}
if stat, err = t.head.Stat(); err != nil {
return err
}
contentSize = stat.Size()
// Keep truncating both files until they come in sync
contentExp = int64(lastIndex.offset)
for contentExp != contentSize {
if t.readonly {
return fmt.Errorf("freezer table(path: %s, name: %s, num: %d) is corrupted", t.path, t.name, lastIndex.filenum)
}
verbose = true
// Truncate the head file to the last offset pointer
if contentExp < contentSize {
t.logger.Warn("Truncating dangling head", "indexed", contentExp, "stored", contentSize)
if err := truncateFreezerFile(t.head, contentExp); err != nil {
return err
}
contentSize = contentExp
}
// Truncate the index to point within the head file
if contentExp > contentSize {
t.logger.Warn("Truncating dangling indexes", "indexes", offsetsSize/indexEntrySize, "indexed", contentExp, "stored", contentSize)
if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil {
return err
}
offsetsSize -= indexEntrySize
// Read the new head index, use the default value in case
// the freezer is already empty.
var newLastIndex indexEntry
if offsetsSize == indexEntrySize {
newLastIndex = indexEntry{filenum: t.tailId, offset: 0}
} else {
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
newLastIndex.unmarshalBinary(buffer)
}
// We might have slipped back into an earlier head-file here
if newLastIndex.filenum != lastIndex.filenum {
// Release earlier opened file
t.releaseFile(lastIndex.filenum)
if t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend); err != nil {
return err
}
if stat, err = t.head.Stat(); err != nil {
// TODO, anything more we can do here?
// A data file has gone missing...
return err
}
contentSize = stat.Size()
}
lastIndex = newLastIndex
contentExp = int64(lastIndex.offset)
}
}
// Sync() fails for read-only files on windows.
if !t.readonly {
// Ensure all reparation changes have been written to disk
if err := t.index.Sync(); err != nil {
return err
}
if err := t.head.Sync(); err != nil {
return err
}
if err := t.meta.Sync(); err != nil {
return err
}
}
// Update the item and byte counters and return
t.items.Store(t.itemOffset.Load() + uint64(offsetsSize/indexEntrySize-1)) // last indexEntry points to the end of the data file
t.headBytes = contentSize
t.headId = lastIndex.filenum
// Delete the leftover files because of head deletion
t.releaseFilesAfter(t.headId, true)
// Delete the leftover files because of tail deletion
t.releaseFilesBefore(t.tailId, true)
// Close opened files and preopen all files
if err := t.preopen(); err != nil {
return err
}
if verbose {
t.logger.Info("Chain freezer table opened", "items", t.items.Load(), "deleted", t.itemOffset.Load(), "hidden", t.itemHidden.Load(), "tailId", t.tailId, "headId", t.headId, "size", t.headBytes)
} else {
t.logger.Debug("Chain freezer table opened", "items", t.items.Load(), "size", common.StorageSize(t.headBytes))
}
return nil
}
// preopen opens all files that the freezer will need. This method should be called from an init-context,
// since it assumes that it doesn't have to bother with locking
// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
// obtain a write-lock within Retrieve.
func (t *freezerTable) preopen() (err error) {
// The repair might have already opened (some) files
t.releaseFilesAfter(0, false)
// Open all except head in RDONLY
for i := t.tailId; i < t.headId; i++ {
if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil {
return err
}
}
if t.readonly {
t.head, err = t.openFile(t.headId, openFreezerFileForReadOnly)
} else {
// Open head in read/write
t.head, err = t.openFile(t.headId, openFreezerFileForAppend)
}
return err
}
// truncateHead discards any recent data above the provided threshold number.
func (t *freezerTable) truncateHead(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
// Ensure the given truncate target falls in the correct range
existing := t.items.Load()
if existing <= items {
return nil
}
if items < t.itemHidden.Load() {
return errors.New("truncation below tail")
}
// We need to truncate, save the old size for metrics tracking
oldSize, err := t.sizeNolock()
if err != nil {
return err
}
// Something's out of sync, truncate the table's offset index
log := t.logger.Debug
if existing > items+1 {
log = t.logger.Warn // Only loud warn if we delete multiple items
}
log("Truncating freezer table", "items", existing, "limit", items)
// Truncate the index file first, the tail position is also considered
// when calculating the new freezer table length.
length := items - t.itemOffset.Load()
if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil {
return err
}
if err := t.index.Sync(); err != nil {
return err
}
// Calculate the new expected size of the data file and truncate it
var expected indexEntry
if length == 0 {
expected = indexEntry{filenum: t.tailId, offset: 0}
} else {
buffer := make([]byte, indexEntrySize)
if _, err := t.index.ReadAt(buffer, int64(length*indexEntrySize)); err != nil {
return err
}
expected.unmarshalBinary(buffer)
}
// We might need to truncate back to older files
if expected.filenum != t.headId {
// If already open for reading, force-reopen for writing
t.releaseFile(expected.filenum)
newHead, err := t.openFile(expected.filenum, openFreezerFileForAppend)
if err != nil {
return err
}
// Release any files _after the current head -- both the previous head
// and any files which may have been opened for reading
t.releaseFilesAfter(expected.filenum, true)
// Set back the historic head
t.head = newHead
t.headId = expected.filenum
}
if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil {
return err
}
if err := t.head.Sync(); err != nil {
return err
}
// All data files truncated, set internal counters and return
t.headBytes = int64(expected.offset)
t.items.Store(items)
// Retrieve the new size and update the total size counter
newSize, err := t.sizeNolock()
if err != nil {
return err
}
t.sizeGauge.Dec(int64(oldSize - newSize))
return nil
}
// sizeHidden returns the total data size of hidden items in the freezer table.
// This function assumes the lock is already held.
func (t *freezerTable) sizeHidden() (uint64, error) {
hidden, offset := t.itemHidden.Load(), t.itemOffset.Load()
if hidden <= offset {
return 0, nil
}
indices, err := t.getIndices(hidden-1, 1)
if err != nil {
return 0, err
}
return uint64(indices[1].offset), nil
}
// truncateTail discards any recent data before the provided threshold number.
func (t *freezerTable) truncateTail(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
// Ensure the given truncate target falls in the correct range
if t.itemHidden.Load() >= items {
return nil
}
if t.items.Load() < items {
return errors.New("truncation above head")
}
// Load the new tail index by the given new tail position
var (
newTailId uint32
buffer = make([]byte, indexEntrySize)
)
if t.items.Load() == items {
newTailId = t.headId
} else {
offset := items - t.itemOffset.Load()
if _, err := t.index.ReadAt(buffer, int64((offset+1)*indexEntrySize)); err != nil {
return err
}
var newTail indexEntry
newTail.unmarshalBinary(buffer)
newTailId = newTail.filenum
}
// Save the old size for metrics tracking. This needs to be done
// before any updates to either itemHidden or itemOffset.
oldSize, err := t.sizeNolock()
if err != nil {
return err
}
// Update the virtual tail marker and hidden these entries in table.
t.itemHidden.Store(items)
if err := writeMetadata(t.meta, newMetadata(items)); err != nil {
return err
}
// Hidden items still fall in the current tail file, no data file
// can be dropped.
if t.tailId == newTailId {
return nil
}
// Hidden items fall in the incorrect range, returns the error.
if t.tailId > newTailId {
return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId)
}
// Count how many items can be deleted from the file.
var (
newDeleted = items
deleted = t.itemOffset.Load()
)
// Hidden items exceed the current tail file, drop the relevant data files.
for current := items - 1; current >= deleted; current -= 1 {
if _, err := t.index.ReadAt(buffer, int64((current-deleted+1)*indexEntrySize)); err != nil {
return err
}
var pre indexEntry
pre.unmarshalBinary(buffer)
if pre.filenum != newTailId {
break
}
newDeleted = current
}
// Commit the changes of metadata file first before manipulating
// the indexes file.
if err := t.meta.Sync(); err != nil {
return err
}
// Close the index file before shorten it.
if err := t.index.Close(); err != nil {
return err
}
// Truncate the deleted index entries from the index file.
err = copyFrom(t.index.Name(), t.index.Name(), indexEntrySize*(newDeleted-deleted+1), func(f *os.File) error {
tailIndex := indexEntry{
filenum: newTailId,
offset: uint32(newDeleted),
}
_, err := f.Write(tailIndex.append(nil))
return err
})
if err != nil {
return err
}
// Reopen the modified index file to load the changes
t.index, err = openFreezerFileForAppend(t.index.Name())
if err != nil {
return err
}
// Sync the file to ensure changes are flushed to disk
if err := t.index.Sync(); err != nil {
return err
}
// Release any files before the current tail
t.tailId = newTailId
t.itemOffset.Store(newDeleted)
t.releaseFilesBefore(t.tailId, true)
// Retrieve the new size and update the total size counter
newSize, err := t.sizeNolock()
if err != nil {
return err
}
t.sizeGauge.Dec(int64(oldSize - newSize))
return nil
}
// Close closes all opened files.
func (t *freezerTable) Close() error {
t.lock.Lock()
defer t.lock.Unlock()
var errs []error
doClose := func(f *os.File, sync bool, close bool) {
if sync && !t.readonly {
if err := f.Sync(); err != nil {
errs = append(errs, err)
}
}
if close {
if err := f.Close(); err != nil {
errs = append(errs, err)
}
}
}
// Trying to fsync a file opened in rdonly causes "Access denied"
// error on Windows.
doClose(t.index, true, true)
doClose(t.meta, true, true)
// The preopened non-head data-files are all opened in readonly.
// The head is opened in rw-mode, so we sync it here - but since it's also
// part of t.files, it will be closed in the loop below.
doClose(t.head, true, false) // sync but do not close
for _, f := range t.files {
doClose(f, false, true) // close but do not sync
}
t.index = nil
t.meta = nil
t.head = nil
if errs != nil {
return fmt.Errorf("%v", errs)
}
return nil
}
// openFile assumes that the write-lock is held by the caller
func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) {
var exist bool
if f, exist = t.files[num]; !exist {
var name string
if t.noCompression {
name = fmt.Sprintf("%s.%04d.rdat", t.name, num)
} else {
name = fmt.Sprintf("%s.%04d.cdat", t.name, num)
}
f, err = opener(filepath.Join(t.path, name))
if err != nil {
return nil, err
}
t.files[num] = f
}
return f, err
}
// releaseFile closes a file, and removes it from the open file cache.
// Assumes that the caller holds the write lock
func (t *freezerTable) releaseFile(num uint32) {
if f, exist := t.files[num]; exist {
delete(t.files, num)
f.Close()
}
}
// releaseFilesAfter closes all open files with a higher number, and optionally also deletes the files
func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
for fnum, f := range t.files {
if fnum > num {
delete(t.files, fnum)
f.Close()
if remove {
os.Remove(f.Name())
}
}
}
}
// releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files
func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) {
for fnum, f := range t.files {
if fnum < num {
delete(t.files, fnum)
f.Close()
if remove {
os.Remove(f.Name())
}
}
}
}
// getIndices returns the index entries for the given from-item, covering 'count' items.
// N.B: The actual number of returned indices for N items will always be N+1 (unless an
// error is returned).
// OBS: This method assumes that the caller has already verified (and/or trimmed) the range
// so that the items are within bounds. If this method is used to read out of bounds,
// it will return error.
func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) {
// Apply the table-offset
from = from - t.itemOffset.Load()
// For reading N items, we need N+1 indices.
buffer := make([]byte, (count+1)*indexEntrySize)
if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil {
return nil, err
}
var (
indices []*indexEntry
offset int
)
for i := from; i <= from+count; i++ {
index := new(indexEntry)
index.unmarshalBinary(buffer[offset:])
offset += indexEntrySize
indices = append(indices, index)
}
if from == 0 {
// Special case if we're reading the first item in the freezer. We assume that
// the first item always start from zero(regarding the deletion, we
// only support deletion by files, so that the assumption is held).
// This means we can use the first item metadata to carry information about
// the 'global' offset, for the deletion-case
indices[0].offset = 0
indices[0].filenum = indices[1].filenum
}
return indices, nil
}
// Retrieve looks up the data offset of an item with the given number and retrieves
// the raw binary blob from the data file.
func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
items, err := t.RetrieveItems(item, 1, 0)
if err != nil {
return nil, err
}
return items[0], nil
}
// RetrieveItems returns multiple items in sequence, starting from the index 'start'.
// It will return at most 'max' items, but will abort earlier to respect the
// 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one
// item, it _will_ return one element and possibly overflow the maxBytes.
func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) {
// First we read the 'raw' data, which might be compressed.
diskData, sizes, err := t.retrieveItems(start, count, maxBytes)
if err != nil {
return nil, err
}
var (
output = make([][]byte, 0, count)
offset int // offset for reading
outputSize int // size of uncompressed data
)
// Now slice up the data and decompress.
for i, diskSize := range sizes {
item := diskData[offset : offset+diskSize]
offset += diskSize
decompressedSize := diskSize
if !t.noCompression {
decompressedSize, _ = snappy.DecodedLen(item)
}
if i > 0 && maxBytes != 0 && uint64(outputSize+decompressedSize) > maxBytes {
break
}
if !t.noCompression {
data, err := snappy.Decode(nil, item)
if err != nil {
return nil, err
}
output = append(output, data)
} else {
output = append(output, item)
}
outputSize += decompressedSize
}
return output, nil
}
// retrieveItems reads up to 'count' items from the table. It reads at least
// one item, but otherwise avoids reading more than maxBytes bytes. Freezer
// will ignore the size limitation and continuously allocate memory to store
// data if maxBytes is 0. It returns the (potentially compressed) data, and
// the sizes.
func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
t.lock.RLock()
defer t.lock.RUnlock()
// Ensure the table and the item are accessible
if t.index == nil || t.head == nil || t.meta == nil {
return nil, nil, errClosed
}
var (
items = t.items.Load() // the total items(head + 1)
hidden = t.itemHidden.Load() // the number of hidden items
)
// Ensure the start is written, not deleted from the tail, and that the
// caller actually wants something
if items <= start || hidden > start || count == 0 {
return nil, nil, errOutOfBounds
}
if start+count > items {
count = items - start
}
var output []byte // Buffer to read data into
if maxBytes != 0 {
output = make([]byte, 0, maxBytes)
} else {
output = make([]byte, 0, 1024) // initial buffer cap
}
// readData is a helper method to read a single data item from disk.
readData := func(fileId, start uint32, length int) error {
output = grow(output, length)
dataFile, exist := t.files[fileId]
if !exist {
return fmt.Errorf("missing data file %d", fileId)
}
if _, err := dataFile.ReadAt(output[len(output)-length:], int64(start)); err != nil {
return fmt.Errorf("%w, fileid: %d, start: %d, length: %d", err, fileId, start, length)
}
return nil
}
// Read all the indexes in one go
indices, err := t.getIndices(start, count)
if err != nil {
return nil, nil, err
}
var (
sizes []int // The sizes for each element
totalSize = 0 // The total size of all data read so far
readStart = indices[0].offset // Where, in the file, to start reading
unreadSize = 0 // The size of the as-yet-unread data
)
for i, firstIndex := range indices[:len(indices)-1] {
secondIndex := indices[i+1]
// Determine the size of the item.
offset1, offset2, _ := firstIndex.bounds(secondIndex)
size := int(offset2 - offset1)
// Crossing a file boundary?
if secondIndex.filenum != firstIndex.filenum {
// If we have unread data in the first file, we need to do that read now.
if unreadSize > 0 {
if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil {
return nil, nil, err
}
unreadSize = 0
}
readStart = 0
}
if i > 0 && uint64(totalSize+size) > maxBytes && maxBytes != 0 {
// About to break out due to byte limit being exceeded. We don't
// read this last item, but we need to do the deferred reads now.
if unreadSize > 0 {
if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
return nil, nil, err
}
}
break
}
// Defer the read for later
unreadSize += size
totalSize += size
sizes = append(sizes, size)
if i == len(indices)-2 || (uint64(totalSize) > maxBytes && maxBytes != 0) {
// Last item, need to do the read now
if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
return nil, nil, err
}
break
}
}
// Update metrics.
t.readMeter.Mark(int64(totalSize))
return output, sizes, nil
}
// has returns an indicator whether the specified number data is still accessible
// in the freezer table.
func (t *freezerTable) has(number uint64) bool {
return t.items.Load() > number && t.itemHidden.Load() <= number
}
// size returns the total data size in the freezer table.
func (t *freezerTable) size() (uint64, error) {
t.lock.RLock()
defer t.lock.RUnlock()
return t.sizeNolock()
}
// sizeNolock returns the total data size in the freezer table. This function
// assumes the lock is already held.
func (t *freezerTable) sizeNolock() (uint64, error) {
stat, err := t.index.Stat()
if err != nil {
return 0, err
}
hidden, err := t.sizeHidden()
if err != nil {
return 0, err
}
total := uint64(t.maxFileSize)*uint64(t.headId-t.tailId) + uint64(t.headBytes) + uint64(stat.Size()) - hidden
return total, nil
}
// advanceHead should be called when the current head file would outgrow the file limits,
// and a new file must be opened. The caller of this method must hold the write-lock
// before calling this method.
func (t *freezerTable) advanceHead() error {
t.lock.Lock()
defer t.lock.Unlock()
// We open the next file in truncated mode -- if this file already
// exists, we need to start over from scratch on it.
nextID := t.headId + 1
newHead, err := t.openFile(nextID, openFreezerFileTruncated)
if err != nil {
return err
}
// Commit the contents of the old file to stable storage and
// tear it down. It will be re-opened in read-only mode.
if err := t.head.Sync(); err != nil {
return err
}
t.releaseFile(t.headId)
t.openFile(t.headId, openFreezerFileForReadOnly)
// Swap out the current head.
t.head = newHead
t.headBytes = 0
t.headId = nextID
return nil
}
// Sync pushes any pending data from memory out to disk. This is an expensive
// operation, so use it with care.
func (t *freezerTable) Sync() error {
t.lock.Lock()
defer t.lock.Unlock()
if t.index == nil || t.head == nil || t.meta == nil {
return errClosed
}
var err error
trackError := func(e error) {
if e != nil && err == nil {
err = e
}
}
trackError(t.index.Sync())
trackError(t.meta.Sync())
trackError(t.head.Sync())
return err
}
func (t *freezerTable) dumpIndexStdout(start, stop int64) {
t.dumpIndex(os.Stdout, start, stop)
}
func (t *freezerTable) dumpIndexString(start, stop int64) string {
var out bytes.Buffer
out.WriteString("\n")
t.dumpIndex(&out, start, stop)
return out.String()
}
func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
meta, err := readMetadata(t.meta)
if err != nil {
fmt.Fprintf(w, "Failed to decode freezer table %v\n", err)
return
}
fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version,
t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
buf := make([]byte, indexEntrySize)
fmt.Fprintf(w, "| number | fileno | offset |\n")
fmt.Fprintf(w, "|--------|--------|--------|\n")
for i := uint64(start); ; i++ {
if _, err := t.index.ReadAt(buf, int64((i+1)*indexEntrySize)); err != nil {
break
}
var entry indexEntry
entry.unmarshalBinary(buf)
fmt.Fprintf(w, "| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset)
if stop > 0 && i >= uint64(stop) {
break
}
}
fmt.Fprintf(w, "|--------------------------|\n")
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,483 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"bytes"
"errors"
"fmt"
"math/big"
"math/rand"
"os"
"path"
"path/filepath"
"sync"
"testing"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/require"
)
var freezerTestTableDef = map[string]bool{"test": true}
func TestFreezerModify(t *testing.T) {
t.Parallel()
// Create test data.
var valuesRaw [][]byte
var valuesRLP []*big.Int
for x := 0; x < 100; x++ {
v := getChunk(256, x)
valuesRaw = append(valuesRaw, v)
iv := big.NewInt(int64(x))
iv = iv.Exp(iv, iv, nil)
valuesRLP = append(valuesRLP, iv)
}
tables := map[string]bool{"raw": true, "rlp": false}
f, _ := newFreezerForTesting(t, tables)
defer f.Close()
// Commit test data.
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := range valuesRaw {
if err := op.AppendRaw("raw", uint64(i), valuesRaw[i]); err != nil {
return err
}
if err := op.Append("rlp", uint64(i), valuesRLP[i]); err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatal("ModifyAncients failed:", err)
}
// Dump indexes.
for _, table := range f.tables {
t.Log(table.name, "index:", table.dumpIndexString(0, int64(len(valuesRaw))))
}
// Read back test data.
checkAncientCount(t, f, "raw", uint64(len(valuesRaw)))
checkAncientCount(t, f, "rlp", uint64(len(valuesRLP)))
for i := range valuesRaw {
v, _ := f.Ancient("raw", uint64(i))
if !bytes.Equal(v, valuesRaw[i]) {
t.Fatalf("wrong raw value at %d: %x", i, v)
}
ivEnc, _ := f.Ancient("rlp", uint64(i))
want, _ := rlp.EncodeToBytes(valuesRLP[i])
if !bytes.Equal(ivEnc, want) {
t.Fatalf("wrong RLP value at %d: %x", i, ivEnc)
}
}
}
// This checks that ModifyAncients rolls back freezer updates
// when the function passed to it returns an error.
func TestFreezerModifyRollback(t *testing.T) {
t.Parallel()
f, dir := newFreezerForTesting(t, freezerTestTableDef)
theError := errors.New("oops")
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
// Append three items. This creates two files immediately,
// because the table size limit of the test freezer is 2048.
require.NoError(t, op.AppendRaw("test", 0, make([]byte, 2048)))
require.NoError(t, op.AppendRaw("test", 1, make([]byte, 2048)))
require.NoError(t, op.AppendRaw("test", 2, make([]byte, 2048)))
return theError
})
if err != theError {
t.Errorf("ModifyAncients returned wrong error %q", err)
}
checkAncientCount(t, f, "test", 0)
f.Close()
// Reopen and check that the rolled-back data doesn't reappear.
tables := map[string]bool{"test": true}
f2, err := NewFreezer(dir, "", false, 2049, tables)
if err != nil {
t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err)
}
defer f2.Close()
checkAncientCount(t, f2, "test", 0)
}
// This test runs ModifyAncients and Ancient concurrently with each other.
func TestFreezerConcurrentModifyRetrieve(t *testing.T) {
t.Parallel()
f, _ := newFreezerForTesting(t, freezerTestTableDef)
defer f.Close()
var (
numReaders = 5
writeBatchSize = uint64(50)
written = make(chan uint64, numReaders*6)
wg sync.WaitGroup
)
wg.Add(numReaders + 1)
// Launch the writer. It appends 10000 items in batches.
go func() {
defer wg.Done()
defer close(written)
for item := uint64(0); item < 10000; item += writeBatchSize {
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := uint64(0); i < writeBatchSize; i++ {
item := item + i
value := getChunk(32, int(item))
if err := op.AppendRaw("test", item, value); err != nil {
return err
}
}
return nil
})
if err != nil {
panic(err)
}
for i := 0; i < numReaders; i++ {
written <- item + writeBatchSize
}
}
}()
// Launch the readers. They read random items from the freezer up to the
// current frozen item count.
for i := 0; i < numReaders; i++ {
go func() {
defer wg.Done()
for frozen := range written {
for rc := 0; rc < 80; rc++ {
num := uint64(rand.Intn(int(frozen)))
value, err := f.Ancient("test", num)
if err != nil {
panic(fmt.Errorf("error reading %d (frozen %d): %v", num, frozen, err))
}
if !bytes.Equal(value, getChunk(32, int(num))) {
panic(fmt.Errorf("wrong value at %d", num))
}
}
}
}()
}
wg.Wait()
}
// This test runs ModifyAncients and TruncateHead concurrently with each other.
func TestFreezerConcurrentModifyTruncate(t *testing.T) {
f, _ := newFreezerForTesting(t, freezerTestTableDef)
defer f.Close()
var item = make([]byte, 256)
for i := 0; i < 10; i++ {
// First reset and write 100 items.
if _, err := f.TruncateHead(0); err != nil {
t.Fatal("truncate failed:", err)
}
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := uint64(0); i < 100; i++ {
if err := op.AppendRaw("test", i, item); err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatal("modify failed:", err)
}
checkAncientCount(t, f, "test", 100)
// Now append 100 more items and truncate concurrently.
var (
wg sync.WaitGroup
truncateErr error
modifyErr error
)
wg.Add(3)
go func() {
_, modifyErr = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := uint64(100); i < 200; i++ {
if err := op.AppendRaw("test", i, item); err != nil {
return err
}
}
return nil
})
wg.Done()
}()
go func() {
_, truncateErr = f.TruncateHead(10)
wg.Done()
}()
go func() {
f.AncientSize("test")
wg.Done()
}()
wg.Wait()
// Now check the outcome. If the truncate operation went through first, the append
// fails, otherwise it succeeds. In either case, the freezer should be positioned
// at 10 after both operations are done.
if truncateErr != nil {
t.Fatal("concurrent truncate failed:", err)
}
if !(errors.Is(modifyErr, nil) || errors.Is(modifyErr, errOutOrderInsertion)) {
t.Fatal("wrong error from concurrent modify:", modifyErr)
}
checkAncientCount(t, f, "test", 10)
}
}
func TestFreezerReadonlyValidate(t *testing.T) {
tables := map[string]bool{"a": true, "b": true}
dir := t.TempDir()
// Open non-readonly freezer and fill individual tables
// with different amount of data.
f, err := NewFreezer(dir, "", false, 2049, tables)
if err != nil {
t.Fatal("can't open freezer", err)
}
var item = make([]byte, 1024)
aBatch := f.tables["a"].newBatch()
require.NoError(t, aBatch.AppendRaw(0, item))
require.NoError(t, aBatch.AppendRaw(1, item))
require.NoError(t, aBatch.AppendRaw(2, item))
require.NoError(t, aBatch.commit())
bBatch := f.tables["b"].newBatch()
require.NoError(t, bBatch.AppendRaw(0, item))
require.NoError(t, bBatch.commit())
if f.tables["a"].items.Load() != 3 {
t.Fatalf("unexpected number of items in table")
}
if f.tables["b"].items.Load() != 1 {
t.Fatalf("unexpected number of items in table")
}
require.NoError(t, f.Close())
// Re-openening as readonly should fail when validating
// table lengths.
_, err = NewFreezer(dir, "", true, 2049, tables)
if err == nil {
t.Fatal("readonly freezer should fail with differing table lengths")
}
}
func TestFreezerConcurrentReadonly(t *testing.T) {
t.Parallel()
tables := map[string]bool{"a": true}
dir := t.TempDir()
f, err := NewFreezer(dir, "", false, 2049, tables)
if err != nil {
t.Fatal("can't open freezer", err)
}
var item = make([]byte, 1024)
batch := f.tables["a"].newBatch()
items := uint64(10)
for i := uint64(0); i < items; i++ {
require.NoError(t, batch.AppendRaw(i, item))
}
require.NoError(t, batch.commit())
if loaded := f.tables["a"].items.Load(); loaded != items {
t.Fatalf("unexpected number of items in table, want: %d, have: %d", items, loaded)
}
require.NoError(t, f.Close())
var (
wg sync.WaitGroup
fs = make([]*Freezer, 5)
errs = make([]error, 5)
)
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
f, err := NewFreezer(dir, "", true, 2049, tables)
if err == nil {
fs[i] = f
} else {
errs[i] = err
}
}(i)
}
wg.Wait()
for i := range fs {
if err := errs[i]; err != nil {
t.Fatal("failed to open freezer", err)
}
require.NoError(t, fs[i].Close())
}
}
func newFreezerForTesting(t *testing.T, tables map[string]bool) (*Freezer, string) {
t.Helper()
dir := t.TempDir()
// note: using low max table size here to ensure the tests actually
// switch between multiple files.
f, err := NewFreezer(dir, "", false, 2049, tables)
if err != nil {
t.Fatal("can't open freezer", err)
}
return f, dir
}
// checkAncientCount verifies that the freezer contains n items.
func checkAncientCount(t *testing.T, f *Freezer, kind string, n uint64) {
t.Helper()
if frozen, _ := f.Ancients(); frozen != n {
t.Fatalf("Ancients() returned %d, want %d", frozen, n)
}
// Check at index n-1.
if n > 0 {
index := n - 1
if ok, _ := f.HasAncient(kind, index); !ok {
t.Errorf("HasAncient(%q, %d) returned false unexpectedly", kind, index)
}
if _, err := f.Ancient(kind, index); err != nil {
t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err)
}
}
// Check at index n.
index := n
if ok, _ := f.HasAncient(kind, index); ok {
t.Errorf("HasAncient(%q, %d) returned true unexpectedly", kind, index)
}
if _, err := f.Ancient(kind, index); err == nil {
t.Errorf("Ancient(%q, %d) didn't return expected error", kind, index)
} else if err != errOutOfBounds {
t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err)
}
}
func TestRenameWindows(t *testing.T) {
var (
fname = "file.bin"
fname2 = "file2.bin"
data = []byte{1, 2, 3, 4}
data2 = []byte{2, 3, 4, 5}
data3 = []byte{3, 5, 6, 7}
dataLen = 4
)
// Create 2 temp dirs
dir1 := t.TempDir()
dir2 := t.TempDir()
// Create file in dir1 and fill with data
f, err := os.Create(filepath.Join(dir1, fname))
if err != nil {
t.Fatal(err)
}
f2, err := os.Create(path.Join(dir1, fname2))
if err != nil {
t.Fatal(err)
}
f3, err := os.Create(path.Join(dir2, fname2))
if err != nil {
t.Fatal(err)
}
if _, err := f.Write(data); err != nil {
t.Fatal(err)
}
if _, err := f2.Write(data2); err != nil {
t.Fatal(err)
}
if _, err := f3.Write(data3); err != nil {
t.Fatal(err)
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
if err := f2.Close(); err != nil {
t.Fatal(err)
}
if err := f3.Close(); err != nil {
t.Fatal(err)
}
if err := os.Rename(f.Name(), path.Join(dir2, fname)); err != nil {
t.Fatal(err)
}
if err := os.Rename(f2.Name(), path.Join(dir2, fname2)); err != nil {
t.Fatal(err)
}
// Check file contents
f, err = os.Open(path.Join(dir2, fname))
if err != nil {
t.Fatal(err)
}
defer f.Close()
defer os.Remove(f.Name())
buf := make([]byte, dataLen)
if _, err := f.Read(buf); err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, data) {
t.Errorf("unexpected file contents. Got %v\n", buf)
}
f, err = os.Open(path.Join(dir2, fname2))
if err != nil {
t.Fatal(err)
}
defer f.Close()
defer os.Remove(f.Name())
if _, err := f.Read(buf); err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, data2) {
t.Errorf("unexpected file contents. Got %v\n", buf)
}
}
func TestFreezerCloseSync(t *testing.T) {
t.Parallel()
f, _ := newFreezerForTesting(t, map[string]bool{"a": true, "b": true})
defer f.Close()
// Now, close and sync. This mimics the behaviour if the node is shut down,
// just as the chain freezer is writing.
// 1: thread-1: chain treezer writes, via freezeRange (holds lock)
// 2: thread-2: Close called, waits for write to finish
// 3: thread-1: finishes writing, releases lock
// 4: thread-2: obtains lock, completes Close()
// 5: thread-1: calls f.Sync()
if err := f.Close(); err != nil {
t.Fatal(err)
}
if err := f.Sync(); err == nil {
t.Fatalf("want error, have nil")
} else if have, want := err.Error(), "[closed closed]"; have != want {
t.Fatalf("want %v, have %v", have, want)
}
}

@ -0,0 +1,131 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"io"
"os"
"path/filepath"
)
// copyFrom copies data from 'srcPath' at offset 'offset' into 'destPath'.
// The 'destPath' is created if it doesn't exist, otherwise it is overwritten.
// Before the copy is executed, there is a callback can be registered to
// manipulate the dest file.
// It is perfectly valid to have destPath == srcPath.
func copyFrom(srcPath, destPath string, offset uint64, before func(f *os.File) error) error {
// Create a temp file in the same dir where we want it to wind up
f, err := os.CreateTemp(filepath.Dir(destPath), "*")
if err != nil {
return err
}
fname := f.Name()
// Clean up the leftover file
defer func() {
if f != nil {
f.Close()
}
os.Remove(fname)
}()
// Apply the given function if it's not nil before we copy
// the content from the src.
if before != nil {
if err := before(f); err != nil {
return err
}
}
// Open the source file
src, err := os.Open(srcPath)
if err != nil {
return err
}
if _, err = src.Seek(int64(offset), 0); err != nil {
src.Close()
return err
}
// io.Copy uses 32K buffer internally.
_, err = io.Copy(f, src)
if err != nil {
src.Close()
return err
}
// Rename the temporary file to the specified dest name.
// src may be same as dest, so needs to be closed before
// we do the final move.
src.Close()
if err := f.Close(); err != nil {
return err
}
f = nil
return os.Rename(fname, destPath)
}
// openFreezerFileForAppend opens a freezer table file and seeks to the end
func openFreezerFileForAppend(filename string) (*os.File, error) {
// Open the file without the O_APPEND flag
// because it has differing behaviour during Truncate operations
// on different OS's
file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
// Seek to end for append
if _, err = file.Seek(0, io.SeekEnd); err != nil {
return nil, err
}
return file, nil
}
// openFreezerFileForReadOnly opens a freezer table file for read only access
func openFreezerFileForReadOnly(filename string) (*os.File, error) {
return os.OpenFile(filename, os.O_RDONLY, 0644)
}
// openFreezerFileTruncated opens a freezer table making sure it is truncated
func openFreezerFileTruncated(filename string) (*os.File, error) {
return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
}
// truncateFreezerFile resizes a freezer table file and seeks to the end
func truncateFreezerFile(file *os.File, size int64) error {
if err := file.Truncate(size); err != nil {
return err
}
// Seek to end for append
if _, err := file.Seek(0, io.SeekEnd); err != nil {
return err
}
return nil
}
// grow prepares the slice space for new item, and doubles the slice capacity
// if space is not enough.
func grow(buf []byte, n int) []byte {
if cap(buf)-len(buf) < n {
newcap := 2 * cap(buf)
if newcap-len(buf) < n {
newcap = len(buf) + n
}
nbuf := make([]byte, len(buf), newcap)
copy(nbuf, buf)
buf = nbuf
}
buf = buf[:len(buf)+n]
return buf
}

@ -0,0 +1,75 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"bytes"
"os"
"testing"
)
func TestCopyFrom(t *testing.T) {
var (
content = []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8}
prefix = []byte{0x9, 0xa, 0xb, 0xc, 0xd, 0xf}
)
var cases = []struct {
src, dest string
offset uint64
writePrefix bool
}{
{"foo", "bar", 0, false},
{"foo", "bar", 1, false},
{"foo", "bar", 8, false},
{"foo", "foo", 0, false},
{"foo", "foo", 1, false},
{"foo", "foo", 8, false},
{"foo", "bar", 0, true},
{"foo", "bar", 1, true},
{"foo", "bar", 8, true},
}
for _, c := range cases {
os.WriteFile(c.src, content, 0600)
if err := copyFrom(c.src, c.dest, c.offset, func(f *os.File) error {
if !c.writePrefix {
return nil
}
f.Write(prefix)
return nil
}); err != nil {
os.Remove(c.src)
t.Fatalf("Failed to copy %v", err)
}
blob, err := os.ReadFile(c.dest)
if err != nil {
os.Remove(c.src)
os.Remove(c.dest)
t.Fatalf("Failed to read %v", err)
}
want := content[c.offset:]
if c.writePrefix {
want = append(prefix, want...)
}
if !bytes.Equal(blob, want) {
t.Fatal("Unexpected value")
}
os.Remove(c.src)
os.Remove(c.dest)
}
}
Loading…
Cancel
Save