From 84c641909ad5183d8308cfa4dacc54fbd031f762 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 14 Mar 2024 14:19:05 +0100 Subject: [PATCH] add freezer db --- core/rawdb/freezer.go | 509 +++++++++ core/rawdb/freezer_batch.go | 255 +++++ core/rawdb/freezer_meta.go | 109 ++ core/rawdb/freezer_meta_test.go | 60 ++ core/rawdb/freezer_resettable.go | 238 +++++ core/rawdb/freezer_resettable_test.go | 107 ++ core/rawdb/freezer_table.go | 990 ++++++++++++++++++ core/rawdb/freezer_table_test.go | 1369 +++++++++++++++++++++++++ core/rawdb/freezer_test.go | 483 +++++++++ core/rawdb/freezer_utils.go | 131 +++ core/rawdb/freezer_utils_test.go | 75 ++ 11 files changed, 4326 insertions(+) create mode 100644 core/rawdb/freezer.go create mode 100644 core/rawdb/freezer_batch.go create mode 100644 core/rawdb/freezer_meta.go create mode 100644 core/rawdb/freezer_meta_test.go create mode 100644 core/rawdb/freezer_resettable.go create mode 100644 core/rawdb/freezer_resettable_test.go create mode 100644 core/rawdb/freezer_table.go create mode 100644 core/rawdb/freezer_table_test.go create mode 100644 core/rawdb/freezer_test.go create mode 100644 core/rawdb/freezer_utils.go create mode 100644 core/rawdb/freezer_utils_test.go diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go new file mode 100644 index 000000000..b7824ddc0 --- /dev/null +++ b/core/rawdb/freezer.go @@ -0,0 +1,509 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "errors" + "fmt" + "math" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/gofrs/flock" +) + +var ( + // errReadOnly is returned if the freezer is opened in read only mode. All the + // mutations are disallowed. + errReadOnly = errors.New("read only") + + // errUnknownTable is returned if the user attempts to read from a table that is + // not tracked by the freezer. + errUnknownTable = errors.New("unknown table") + + // errOutOrderInsertion is returned if the user attempts to inject out-of-order + // binary blobs into the freezer. + errOutOrderInsertion = errors.New("the append operation is out-order") + + // errSymlinkDatadir is returned if the ancient directory specified by user + // is a symbolic link. + errSymlinkDatadir = errors.New("symbolic link datadir is not supported") +) + +// freezerTableSize defines the maximum size of freezer data files. +const freezerTableSize = 2 * 1000 * 1000 * 1000 + +// Freezer is a memory mapped append-only database to store immutable ordered +// data into flat files: +// +// - The append-only nature ensures that disk writes are minimized. +// - The memory mapping ensures we can max out system memory for caching without +// reserving it for go-ethereum. This would also reduce the memory requirements +// of Geth, and thus also GC overhead. +type Freezer struct { + frozen atomic.Uint64 // Number of blocks already frozen + tail atomic.Uint64 // Number of the first stored item in the freezer + + // This lock synchronizes writers and the truncate operation, as well as + // the "atomic" (batched) read operations. + writeLock sync.RWMutex + writeBatch *freezerBatch + + readonly bool + tables map[string]*freezerTable // Data tables for storing everything + instanceLock *flock.Flock // File-system lock to prevent double opens + closeOnce sync.Once +} + +// NewChainFreezer is a small utility method around NewFreezer that sets the +// default parameters for the chain storage. +func NewChainFreezer(datadir string, namespace string, readonly bool) (*Freezer, error) { + return NewFreezer(datadir, namespace, readonly, freezerTableSize, chainFreezerNoSnappy) +} + +// NewFreezer creates a freezer instance for maintaining immutable ordered +// data according to the given parameters. +// +// The 'tables' argument defines the data tables. If the value of a map +// entry is true, snappy compression is disabled for the table. +func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*Freezer, error) { + // Create the initial freezer object + var ( + readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) + writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) + sizeGauge = metrics.NewRegisteredGauge(namespace+"ancient/size", nil) + ) + // Ensure the datadir is not a symbolic link if it exists. + if info, err := os.Lstat(datadir); !os.IsNotExist(err) { + if info.Mode()&os.ModeSymlink != 0 { + log.Warn("Symbolic link ancient database is not supported", "path", datadir) + return nil, errSymlinkDatadir + } + } + flockFile := filepath.Join(datadir, "FLOCK") + if err := os.MkdirAll(filepath.Dir(flockFile), 0755); err != nil { + return nil, err + } + // Leveldb uses LOCK as the filelock filename. To prevent the + // name collision, we use FLOCK as the lock name. + lock := flock.New(flockFile) + tryLock := lock.TryLock + if readonly { + tryLock = lock.TryRLock + } + if locked, err := tryLock(); err != nil { + return nil, err + } else if !locked { + return nil, errors.New("locking failed") + } + // Open all the supported data tables + freezer := &Freezer{ + readonly: readonly, + tables: make(map[string]*freezerTable), + instanceLock: lock, + } + + // Create the tables. + for name, disableSnappy := range tables { + table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy, readonly) + if err != nil { + for _, table := range freezer.tables { + table.Close() + } + lock.Unlock() + return nil, err + } + freezer.tables[name] = table + } + var err error + if freezer.readonly { + // In readonly mode only validate, don't truncate. + // validate also sets `freezer.frozen`. + err = freezer.validate() + } else { + // Truncate all tables to common length. + err = freezer.repair() + } + if err != nil { + for _, table := range freezer.tables { + table.Close() + } + lock.Unlock() + return nil, err + } + + // Create the write batch. + freezer.writeBatch = newFreezerBatch(freezer) + + log.Info("Opened ancient database", "database", datadir, "readonly", readonly) + return freezer, nil +} + +// Close terminates the chain freezer, unmapping all the data files. +func (f *Freezer) Close() error { + f.writeLock.Lock() + defer f.writeLock.Unlock() + + var errs []error + f.closeOnce.Do(func() { + for _, table := range f.tables { + if err := table.Close(); err != nil { + errs = append(errs, err) + } + } + if err := f.instanceLock.Unlock(); err != nil { + errs = append(errs, err) + } + }) + if errs != nil { + return fmt.Errorf("%v", errs) + } + return nil +} + +// HasAncient returns an indicator whether the specified ancient data exists +// in the freezer. +func (f *Freezer) HasAncient(kind string, number uint64) (bool, error) { + if table := f.tables[kind]; table != nil { + return table.has(number), nil + } + return false, nil +} + +// Ancient retrieves an ancient binary blob from the append-only immutable files. +func (f *Freezer) Ancient(kind string, number uint64) ([]byte, error) { + if table := f.tables[kind]; table != nil { + return table.Retrieve(number) + } + return nil, errUnknownTable +} + +// AncientRange retrieves multiple items in sequence, starting from the index 'start'. +// It will return +// - at most 'count' items, +// - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize), +// but will otherwise return as many items as fit into maxByteSize. +// - if maxBytes is not specified, 'count' items will be returned if they are present. +func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { + if table := f.tables[kind]; table != nil { + return table.RetrieveItems(start, count, maxBytes) + } + return nil, errUnknownTable +} + +// Ancients returns the length of the frozen items. +func (f *Freezer) Ancients() (uint64, error) { + return f.frozen.Load(), nil +} + +// Tail returns the number of first stored item in the freezer. +func (f *Freezer) Tail() (uint64, error) { + return f.tail.Load(), nil +} + +// AncientSize returns the ancient size of the specified category. +func (f *Freezer) AncientSize(kind string) (uint64, error) { + // This needs the write lock to avoid data races on table fields. + // Speed doesn't matter here, AncientSize is for debugging. + f.writeLock.RLock() + defer f.writeLock.RUnlock() + + if table := f.tables[kind]; table != nil { + return table.size() + } + return 0, errUnknownTable +} + +// ReadAncients runs the given read operation while ensuring that no writes take place +// on the underlying freezer. +func (f *Freezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) { + f.writeLock.RLock() + defer f.writeLock.RUnlock() + + return fn(f) +} + +// ModifyAncients runs the given write operation. +func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) { + if f.readonly { + return 0, errReadOnly + } + f.writeLock.Lock() + defer f.writeLock.Unlock() + + // Roll back all tables to the starting position in case of error. + prevItem := f.frozen.Load() + defer func() { + if err != nil { + // The write operation has failed. Go back to the previous item position. + for name, table := range f.tables { + err := table.truncateHead(prevItem) + if err != nil { + log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err) + } + } + } + }() + + f.writeBatch.reset() + if err := fn(f.writeBatch); err != nil { + return 0, err + } + item, writeSize, err := f.writeBatch.commit() + if err != nil { + return 0, err + } + f.frozen.Store(item) + return writeSize, nil +} + +// TruncateHead discards any recent data above the provided threshold number. +// It returns the previous head number. +func (f *Freezer) TruncateHead(items uint64) (uint64, error) { + if f.readonly { + return 0, errReadOnly + } + f.writeLock.Lock() + defer f.writeLock.Unlock() + + oitems := f.frozen.Load() + if oitems <= items { + return oitems, nil + } + for _, table := range f.tables { + if err := table.truncateHead(items); err != nil { + return 0, err + } + } + f.frozen.Store(items) + return oitems, nil +} + +// TruncateTail discards any recent data below the provided threshold number. +func (f *Freezer) TruncateTail(tail uint64) (uint64, error) { + if f.readonly { + return 0, errReadOnly + } + f.writeLock.Lock() + defer f.writeLock.Unlock() + + old := f.tail.Load() + if old >= tail { + return old, nil + } + for _, table := range f.tables { + if err := table.truncateTail(tail); err != nil { + return 0, err + } + } + f.tail.Store(tail) + return old, nil +} + +// Sync flushes all data tables to disk. +func (f *Freezer) Sync() error { + var errs []error + for _, table := range f.tables { + if err := table.Sync(); err != nil { + errs = append(errs, err) + } + } + if errs != nil { + return fmt.Errorf("%v", errs) + } + return nil +} + +// validate checks that every table has the same boundary. +// Used instead of `repair` in readonly mode. +func (f *Freezer) validate() error { + if len(f.tables) == 0 { + return nil + } + var ( + head uint64 + tail uint64 + name string + ) + // Hack to get boundary of any table + for kind, table := range f.tables { + head = table.items.Load() + tail = table.itemHidden.Load() + name = kind + break + } + // Now check every table against those boundaries. + for kind, table := range f.tables { + if head != table.items.Load() { + return fmt.Errorf("freezer tables %s and %s have differing head: %d != %d", kind, name, table.items.Load(), head) + } + if tail != table.itemHidden.Load() { + return fmt.Errorf("freezer tables %s and %s have differing tail: %d != %d", kind, name, table.itemHidden.Load(), tail) + } + } + f.frozen.Store(head) + f.tail.Store(tail) + return nil +} + +// repair truncates all data tables to the same length. +func (f *Freezer) repair() error { + var ( + head = uint64(math.MaxUint64) + tail = uint64(0) + ) + for _, table := range f.tables { + items := table.items.Load() + if head > items { + head = items + } + hidden := table.itemHidden.Load() + if hidden > tail { + tail = hidden + } + } + for _, table := range f.tables { + if err := table.truncateHead(head); err != nil { + return err + } + if err := table.truncateTail(tail); err != nil { + return err + } + } + f.frozen.Store(head) + f.tail.Store(tail) + return nil +} + +// convertLegacyFn takes a raw freezer entry in an older format and +// returns it in the new format. +type convertLegacyFn = func([]byte) ([]byte, error) + +// MigrateTable processes the entries in a given table in sequence +// converting them to a new format if they're of an old format. +func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error { + if f.readonly { + return errReadOnly + } + f.writeLock.Lock() + defer f.writeLock.Unlock() + + table, ok := f.tables[kind] + if !ok { + return errUnknownTable + } + // forEach iterates every entry in the table serially and in order, calling `fn` + // with the item as argument. If `fn` returns an error the iteration stops + // and that error will be returned. + forEach := func(t *freezerTable, offset uint64, fn func(uint64, []byte) error) error { + var ( + items = t.items.Load() + batchSize = uint64(1024) + maxBytes = uint64(1024 * 1024) + ) + for i := offset; i < items; { + if i+batchSize > items { + batchSize = items - i + } + data, err := t.RetrieveItems(i, batchSize, maxBytes) + if err != nil { + return err + } + for j, item := range data { + if err := fn(i+uint64(j), item); err != nil { + return err + } + } + i += uint64(len(data)) + } + return nil + } + // TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration + // process assumes no deletion at tail and needs to be modified to account for that. + if table.itemOffset.Load() > 0 || table.itemHidden.Load() > 0 { + return errors.New("migration not supported for tail-deleted freezers") + } + ancientsPath := filepath.Dir(table.index.Name()) + // Set up new dir for the migrated table, the content of which + // we'll at the end move over to the ancients dir. + migrationPath := filepath.Join(ancientsPath, "migration") + newTable, err := newFreezerTable(migrationPath, kind, table.noCompression, false) + if err != nil { + return err + } + var ( + batch = newTable.newBatch() + out []byte + start = time.Now() + logged = time.Now() + offset = newTable.items.Load() + ) + if offset > 0 { + log.Info("found previous migration attempt", "migrated", offset) + } + // Iterate through entries and transform them + if err := forEach(table, offset, func(i uint64, blob []byte) error { + if i%10000 == 0 && time.Since(logged) > 16*time.Second { + log.Info("Processing legacy elements", "count", i, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + out, err = convert(blob) + if err != nil { + return err + } + if err := batch.AppendRaw(i, out); err != nil { + return err + } + return nil + }); err != nil { + return err + } + if err := batch.commit(); err != nil { + return err + } + log.Info("Replacing old table files with migrated ones", "elapsed", common.PrettyDuration(time.Since(start))) + // Release and delete old table files. Note this won't + // delete the index file. + table.releaseFilesAfter(0, true) + + if err := newTable.Close(); err != nil { + return err + } + files, err := os.ReadDir(migrationPath) + if err != nil { + return err + } + // Move migrated files to ancients dir. + for _, f := range files { + // This will replace the old index file as a side-effect. + if err := os.Rename(filepath.Join(migrationPath, f.Name()), filepath.Join(ancientsPath, f.Name())); err != nil { + return err + } + } + // Delete by now empty dir. + if err := os.Remove(migrationPath); err != nil { + return err + } + return nil +} diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go new file mode 100644 index 000000000..84a63a451 --- /dev/null +++ b/core/rawdb/freezer_batch.go @@ -0,0 +1,255 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/rlp" + "github.com/golang/snappy" +) + +// This is the maximum amount of data that will be buffered in memory +// for a single freezer table batch. +const freezerBatchBufferLimit = 2 * 1024 * 1024 + +// freezerBatch is a write operation of multiple items on a freezer. +type freezerBatch struct { + tables map[string]*freezerTableBatch +} + +func newFreezerBatch(f *Freezer) *freezerBatch { + batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))} + for kind, table := range f.tables { + batch.tables[kind] = table.newBatch() + } + return batch +} + +// Append adds an RLP-encoded item of the given kind. +func (batch *freezerBatch) Append(kind string, num uint64, item interface{}) error { + return batch.tables[kind].Append(num, item) +} + +// AppendRaw adds an item of the given kind. +func (batch *freezerBatch) AppendRaw(kind string, num uint64, item []byte) error { + return batch.tables[kind].AppendRaw(num, item) +} + +// reset initializes the batch. +func (batch *freezerBatch) reset() { + for _, tb := range batch.tables { + tb.reset() + } +} + +// commit is called at the end of a write operation and +// writes all remaining data to tables. +func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) { + // Check that count agrees on all batches. + item = uint64(math.MaxUint64) + for name, tb := range batch.tables { + if item < math.MaxUint64 && tb.curItem != item { + return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item) + } + item = tb.curItem + } + + // Commit all table batches. + for _, tb := range batch.tables { + if err := tb.commit(); err != nil { + return 0, 0, err + } + writeSize += tb.totalBytes + } + return item, writeSize, nil +} + +// freezerTableBatch is a batch for a freezer table. +type freezerTableBatch struct { + t *freezerTable + + sb *snappyBuffer + encBuffer writeBuffer + dataBuffer []byte + indexBuffer []byte + curItem uint64 // expected index of next append + totalBytes int64 // counts written bytes since reset +} + +// newBatch creates a new batch for the freezer table. +func (t *freezerTable) newBatch() *freezerTableBatch { + batch := &freezerTableBatch{t: t} + if !t.noCompression { + batch.sb = new(snappyBuffer) + } + batch.reset() + return batch +} + +// reset clears the batch for reuse. +func (batch *freezerTableBatch) reset() { + batch.dataBuffer = batch.dataBuffer[:0] + batch.indexBuffer = batch.indexBuffer[:0] + batch.curItem = batch.t.items.Load() + batch.totalBytes = 0 +} + +// Append rlp-encodes and adds data at the end of the freezer table. The item number is a +// precautionary parameter to ensure data correctness, but the table will reject already +// existing data. +func (batch *freezerTableBatch) Append(item uint64, data interface{}) error { + if item != batch.curItem { + return fmt.Errorf("%w: have %d want %d", errOutOrderInsertion, item, batch.curItem) + } + + // Encode the item. + batch.encBuffer.Reset() + if err := rlp.Encode(&batch.encBuffer, data); err != nil { + return err + } + encItem := batch.encBuffer.data + if batch.sb != nil { + encItem = batch.sb.compress(encItem) + } + return batch.appendItem(encItem) +} + +// AppendRaw injects a binary blob at the end of the freezer table. The item number is a +// precautionary parameter to ensure data correctness, but the table will reject already +// existing data. +func (batch *freezerTableBatch) AppendRaw(item uint64, blob []byte) error { + if item != batch.curItem { + return fmt.Errorf("%w: have %d want %d", errOutOrderInsertion, item, batch.curItem) + } + + encItem := blob + if batch.sb != nil { + encItem = batch.sb.compress(blob) + } + return batch.appendItem(encItem) +} + +func (batch *freezerTableBatch) appendItem(data []byte) error { + // Check if item fits into current data file. + itemSize := int64(len(data)) + itemOffset := batch.t.headBytes + int64(len(batch.dataBuffer)) + if itemOffset+itemSize > int64(batch.t.maxFileSize) { + // It doesn't fit, go to next file first. + if err := batch.commit(); err != nil { + return err + } + if err := batch.t.advanceHead(); err != nil { + return err + } + itemOffset = 0 + } + + // Put data to buffer. + batch.dataBuffer = append(batch.dataBuffer, data...) + batch.totalBytes += itemSize + + // Put index entry to buffer. + entry := indexEntry{filenum: batch.t.headId, offset: uint32(itemOffset + itemSize)} + batch.indexBuffer = entry.append(batch.indexBuffer) + batch.curItem++ + + return batch.maybeCommit() +} + +// maybeCommit writes the buffered data if the buffer is full enough. +func (batch *freezerTableBatch) maybeCommit() error { + if len(batch.dataBuffer) > freezerBatchBufferLimit { + return batch.commit() + } + return nil +} + +// commit writes the batched items to the backing freezerTable. +func (batch *freezerTableBatch) commit() error { + // Write data. The head file is fsync'd after write to ensure the + // data is truly transferred to disk. + _, err := batch.t.head.Write(batch.dataBuffer) + if err != nil { + return err + } + if err := batch.t.head.Sync(); err != nil { + return err + } + dataSize := int64(len(batch.dataBuffer)) + batch.dataBuffer = batch.dataBuffer[:0] + + // Write indices. The index file is fsync'd after write to ensure the + // data indexes are truly transferred to disk. + _, err = batch.t.index.Write(batch.indexBuffer) + if err != nil { + return err + } + if err := batch.t.index.Sync(); err != nil { + return err + } + indexSize := int64(len(batch.indexBuffer)) + batch.indexBuffer = batch.indexBuffer[:0] + + // Update headBytes of table. + batch.t.headBytes += dataSize + batch.t.items.Store(batch.curItem) + + // Update metrics. + batch.t.sizeGauge.Inc(dataSize + indexSize) + batch.t.writeMeter.Mark(dataSize + indexSize) + return nil +} + +// snappyBuffer writes snappy in block format, and can be reused. It is +// reset when WriteTo is called. +type snappyBuffer struct { + dst []byte +} + +// compress snappy-compresses the data. +func (s *snappyBuffer) compress(data []byte) []byte { + // The snappy library does not care what the capacity of the buffer is, + // but only checks the length. If the length is too small, it will + // allocate a brand new buffer. + // To avoid that, we check the required size here, and grow the size of the + // buffer to utilize the full capacity. + if n := snappy.MaxEncodedLen(len(data)); len(s.dst) < n { + if cap(s.dst) < n { + s.dst = make([]byte, n) + } + s.dst = s.dst[:n] + } + + s.dst = snappy.Encode(s.dst, data) + return s.dst +} + +// writeBuffer implements io.Writer for a byte slice. +type writeBuffer struct { + data []byte +} + +func (wb *writeBuffer) Write(data []byte) (int, error) { + wb.data = append(wb.data, data...) + return len(data), nil +} + +func (wb *writeBuffer) Reset() { + wb.data = wb.data[:0] +} diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go new file mode 100644 index 000000000..9eef9df35 --- /dev/null +++ b/core/rawdb/freezer_meta.go @@ -0,0 +1,109 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "io" + "os" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +const freezerVersion = 1 // The initial version tag of freezer table metadata + +// freezerTableMeta wraps all the metadata of the freezer table. +type freezerTableMeta struct { + // Version is the versioning descriptor of the freezer table. + Version uint16 + + // VirtualTail indicates how many items have been marked as deleted. + // Its value is equal to the number of items removed from the table + // plus the number of items hidden in the table, so it should never + // be lower than the "actual tail". + VirtualTail uint64 +} + +// newMetadata initializes the metadata object with the given virtual tail. +func newMetadata(tail uint64) *freezerTableMeta { + return &freezerTableMeta{ + Version: freezerVersion, + VirtualTail: tail, + } +} + +// readMetadata reads the metadata of the freezer table from the +// given metadata file. +func readMetadata(file *os.File) (*freezerTableMeta, error) { + _, err := file.Seek(0, io.SeekStart) + if err != nil { + return nil, err + } + var meta freezerTableMeta + if err := rlp.Decode(file, &meta); err != nil { + return nil, err + } + return &meta, nil +} + +// writeMetadata writes the metadata of the freezer table into the +// given metadata file. +func writeMetadata(file *os.File, meta *freezerTableMeta) error { + _, err := file.Seek(0, io.SeekStart) + if err != nil { + return err + } + return rlp.Encode(file, meta) +} + +// loadMetadata loads the metadata from the given metadata file. +// Initializes the metadata file with the given "actual tail" if +// it's empty. +func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) { + stat, err := file.Stat() + if err != nil { + return nil, err + } + // Write the metadata with the given actual tail into metadata file + // if it's non-existent. There are two possible scenarios here: + // - the freezer table is empty + // - the freezer table is legacy + // In both cases, write the meta into the file with the actual tail + // as the virtual tail. + if stat.Size() == 0 { + m := newMetadata(tail) + if err := writeMetadata(file, m); err != nil { + return nil, err + } + return m, nil + } + m, err := readMetadata(file) + if err != nil { + return nil, err + } + // Update the virtual tail with the given actual tail if it's even + // lower than it. Theoretically it shouldn't happen at all, print + // a warning here. + if m.VirtualTail < tail { + log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail) + m.VirtualTail = tail + if err := writeMetadata(file, m); err != nil { + return nil, err + } + } + return m, nil +} diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go new file mode 100644 index 000000000..ba1a95e45 --- /dev/null +++ b/core/rawdb/freezer_meta_test.go @@ -0,0 +1,60 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "os" + "testing" +) + +func TestReadWriteFreezerTableMeta(t *testing.T) { + f, err := os.CreateTemp(os.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) + } + err = writeMetadata(f, newMetadata(100)) + if err != nil { + t.Fatalf("Failed to write metadata %v", err) + } + meta, err := readMetadata(f) + if err != nil { + t.Fatalf("Failed to read metadata %v", err) + } + if meta.Version != freezerVersion { + t.Fatalf("Unexpected version field") + } + if meta.VirtualTail != uint64(100) { + t.Fatalf("Unexpected virtual tail field") + } +} + +func TestInitializeFreezerTableMeta(t *testing.T) { + f, err := os.CreateTemp(os.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) + } + meta, err := loadMetadata(f, uint64(100)) + if err != nil { + t.Fatalf("Failed to read metadata %v", err) + } + if meta.Version != freezerVersion { + t.Fatalf("Unexpected version field") + } + if meta.VirtualTail != uint64(100) { + t.Fatalf("Unexpected virtual tail field") + } +} diff --git a/core/rawdb/freezer_resettable.go b/core/rawdb/freezer_resettable.go new file mode 100644 index 000000000..7a8548973 --- /dev/null +++ b/core/rawdb/freezer_resettable.go @@ -0,0 +1,238 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "os" + "path/filepath" + "sync" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +const tmpSuffix = ".tmp" + +// freezerOpenFunc is the function used to open/create a freezer. +type freezerOpenFunc = func() (*Freezer, error) + +// ResettableFreezer is a wrapper of the freezer which makes the +// freezer resettable. +type ResettableFreezer struct { + freezer *Freezer + opener freezerOpenFunc + datadir string + lock sync.RWMutex +} + +// NewResettableFreezer creates a resettable freezer, note freezer is +// only resettable if the passed file directory is exclusively occupied +// by the freezer. And also the user-configurable ancient root directory +// is **not** supported for reset since it might be a mount and rename +// will cause a copy of hundreds of gigabyte into local directory. It +// needs some other file based solutions. +// +// The reset function will delete directory atomically and re-create the +// freezer from scratch. +func NewResettableFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*ResettableFreezer, error) { + if err := cleanup(datadir); err != nil { + return nil, err + } + opener := func() (*Freezer, error) { + return NewFreezer(datadir, namespace, readonly, maxTableSize, tables) + } + freezer, err := opener() + if err != nil { + return nil, err + } + return &ResettableFreezer{ + freezer: freezer, + opener: opener, + datadir: datadir, + }, nil +} + +// Reset deletes the file directory exclusively occupied by the freezer and +// recreate the freezer from scratch. The atomicity of directory deletion +// is guaranteed by the rename operation, the leftover directory will be +// cleaned up in next startup in case crash happens after rename. +func (f *ResettableFreezer) Reset() error { + f.lock.Lock() + defer f.lock.Unlock() + + if err := f.freezer.Close(); err != nil { + return err + } + tmp := tmpName(f.datadir) + if err := os.Rename(f.datadir, tmp); err != nil { + return err + } + if err := os.RemoveAll(tmp); err != nil { + return err + } + freezer, err := f.opener() + if err != nil { + return err + } + f.freezer = freezer + return nil +} + +// Close terminates the chain freezer, unmapping all the data files. +func (f *ResettableFreezer) Close() error { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.Close() +} + +// HasAncient returns an indicator whether the specified ancient data exists +// in the freezer +func (f *ResettableFreezer) HasAncient(kind string, number uint64) (bool, error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.HasAncient(kind, number) +} + +// Ancient retrieves an ancient binary blob from the append-only immutable files. +func (f *ResettableFreezer) Ancient(kind string, number uint64) ([]byte, error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.Ancient(kind, number) +} + +// AncientRange retrieves multiple items in sequence, starting from the index 'start'. +// It will return +// - at most 'count' items, +// - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize), +// but will otherwise return as many items as fit into maxByteSize. +// - if maxBytes is not specified, 'count' items will be returned if they are present. +func (f *ResettableFreezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.AncientRange(kind, start, count, maxBytes) +} + +// Ancients returns the length of the frozen items. +func (f *ResettableFreezer) Ancients() (uint64, error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.Ancients() +} + +// Tail returns the number of first stored item in the freezer. +func (f *ResettableFreezer) Tail() (uint64, error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.Tail() +} + +// AncientSize returns the ancient size of the specified category. +func (f *ResettableFreezer) AncientSize(kind string) (uint64, error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.AncientSize(kind) +} + +// ReadAncients runs the given read operation while ensuring that no writes take place +// on the underlying freezer. +func (f *ResettableFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.ReadAncients(fn) +} + +// ModifyAncients runs the given write operation. +func (f *ResettableFreezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.ModifyAncients(fn) +} + +// TruncateHead discards any recent data above the provided threshold number. +// It returns the previous head number. +func (f *ResettableFreezer) TruncateHead(items uint64) (uint64, error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.TruncateHead(items) +} + +// TruncateTail discards any recent data below the provided threshold number. +// It returns the previous value +func (f *ResettableFreezer) TruncateTail(tail uint64) (uint64, error) { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.TruncateTail(tail) +} + +// Sync flushes all data tables to disk. +func (f *ResettableFreezer) Sync() error { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.Sync() +} + +// MigrateTable processes the entries in a given table in sequence +// converting them to a new format if they're of an old format. +func (f *ResettableFreezer) MigrateTable(kind string, convert convertLegacyFn) error { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.freezer.MigrateTable(kind, convert) +} + +// cleanup removes the directory located in the specified path +// has the name with deletion marker suffix. +func cleanup(path string) error { + parent := filepath.Dir(path) + if _, err := os.Lstat(parent); os.IsNotExist(err) { + return nil + } + dir, err := os.Open(parent) + if err != nil { + return err + } + names, err := dir.Readdirnames(0) + if err != nil { + return err + } + if cerr := dir.Close(); cerr != nil { + return cerr + } + for _, name := range names { + if name == filepath.Base(path)+tmpSuffix { + log.Info("Removed leftover freezer directory", "name", name) + return os.RemoveAll(filepath.Join(parent, name)) + } + } + return nil +} + +func tmpName(path string) string { + return filepath.Join(filepath.Dir(path), filepath.Base(path)+tmpSuffix) +} diff --git a/core/rawdb/freezer_resettable_test.go b/core/rawdb/freezer_resettable_test.go new file mode 100644 index 000000000..d741bc14e --- /dev/null +++ b/core/rawdb/freezer_resettable_test.go @@ -0,0 +1,107 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "os" + "testing" + + "github.com/ethereum/go-ethereum/ethdb" +) + +func TestResetFreezer(t *testing.T) { + items := []struct { + id uint64 + blob []byte + }{ + {0, bytes.Repeat([]byte{0}, 2048)}, + {1, bytes.Repeat([]byte{1}, 2048)}, + {2, bytes.Repeat([]byte{2}, 2048)}, + } + f, _ := NewResettableFreezer(t.TempDir(), "", false, 2048, freezerTestTableDef) + defer f.Close() + + f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for _, item := range items { + op.AppendRaw("test", item.id, item.blob) + } + return nil + }) + for _, item := range items { + blob, _ := f.Ancient("test", item.id) + if !bytes.Equal(blob, item.blob) { + t.Fatal("Unexpected blob") + } + } + + // Reset freezer + f.Reset() + count, _ := f.Ancients() + if count != 0 { + t.Fatal("Failed to reset freezer") + } + for _, item := range items { + blob, _ := f.Ancient("test", item.id) + if len(blob) != 0 { + t.Fatal("Unexpected blob") + } + } + + // Fill the freezer + f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for _, item := range items { + op.AppendRaw("test", item.id, item.blob) + } + return nil + }) + for _, item := range items { + blob, _ := f.Ancient("test", item.id) + if !bytes.Equal(blob, item.blob) { + t.Fatal("Unexpected blob") + } + } +} + +func TestFreezerCleanup(t *testing.T) { + items := []struct { + id uint64 + blob []byte + }{ + {0, bytes.Repeat([]byte{0}, 2048)}, + {1, bytes.Repeat([]byte{1}, 2048)}, + {2, bytes.Repeat([]byte{2}, 2048)}, + } + datadir := t.TempDir() + f, _ := NewResettableFreezer(datadir, "", false, 2048, freezerTestTableDef) + f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for _, item := range items { + op.AppendRaw("test", item.id, item.blob) + } + return nil + }) + f.Close() + os.Rename(datadir, tmpName(datadir)) + + // Open the freezer again, trigger cleanup operation + f, _ = NewResettableFreezer(datadir, "", false, 2048, freezerTestTableDef) + f.Close() + + if _, err := os.Lstat(tmpName(datadir)); !os.IsNotExist(err) { + t.Fatal("Failed to cleanup leftover directory") + } +} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go new file mode 100644 index 000000000..4b9d510e8 --- /dev/null +++ b/core/rawdb/freezer_table.go @@ -0,0 +1,990 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/golang/snappy" +) + +var ( + // errClosed is returned if an operation attempts to read from or write to the + // freezer table after it has already been closed. + errClosed = errors.New("closed") + + // errOutOfBounds is returned if the item requested is not contained within the + // freezer table. + errOutOfBounds = errors.New("out of bounds") + + // errNotSupported is returned if the database doesn't support the required operation. + errNotSupported = errors.New("this operation is not supported") +) + +// indexEntry contains the number/id of the file that the data resides in, as well as the +// offset within the file to the end of the data. +// In serialized form, the filenum is stored as uint16. +type indexEntry struct { + filenum uint32 // stored as uint16 ( 2 bytes ) + offset uint32 // stored as uint32 ( 4 bytes ) +} + +const indexEntrySize = 6 + +// unmarshalBinary deserializes binary b into the rawIndex entry. +func (i *indexEntry) unmarshalBinary(b []byte) { + i.filenum = uint32(binary.BigEndian.Uint16(b[:2])) + i.offset = binary.BigEndian.Uint32(b[2:6]) +} + +// append adds the encoded entry to the end of b. +func (i *indexEntry) append(b []byte) []byte { + offset := len(b) + out := append(b, make([]byte, indexEntrySize)...) + binary.BigEndian.PutUint16(out[offset:], uint16(i.filenum)) + binary.BigEndian.PutUint32(out[offset+2:], i.offset) + return out +} + +// bounds returns the start- and end- offsets, and the file number of where to +// read there data item marked by the two index entries. The two entries are +// assumed to be sequential. +func (i *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { + if i.filenum != end.filenum { + // If a piece of data 'crosses' a data-file, + // it's actually in one piece on the second data-file. + // We return a zero-indexEntry for the second file as start + return 0, end.offset, end.filenum + } + return i.offset, end.offset, end.filenum +} + +// freezerTable represents a single chained data table within the freezer (e.g. blocks). +// It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry +// file (uncompressed 64 bit indices into the data file). +type freezerTable struct { + items atomic.Uint64 // Number of items stored in the table (including items removed from tail) + itemOffset atomic.Uint64 // Number of items removed from the table + + // itemHidden is the number of items marked as deleted. Tail deletion is + // only supported at file level which means the actual deletion will be + // delayed until the entire data file is marked as deleted. Before that + // these items will be hidden to prevent being visited again. The value + // should never be lower than itemOffset. + itemHidden atomic.Uint64 + + noCompression bool // if true, disables snappy compression. Note: does not work retroactively + readonly bool + maxFileSize uint32 // Max file size for data-files + name string + path string + + head *os.File // File descriptor for the data head of the table + index *os.File // File descriptor for the indexEntry file of the table + meta *os.File // File descriptor for metadata of the table + files map[uint32]*os.File // open files + headId uint32 // number of the currently active head file + tailId uint32 // number of the earliest file + + headBytes int64 // Number of bytes written to the head file + readMeter metrics.Meter // Meter for measuring the effective amount of data read + writeMeter metrics.Meter // Meter for measuring the effective amount of data written + sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables + + logger log.Logger // Logger with database path and table name embedded + lock sync.RWMutex // Mutex protecting the data file descriptors +} + +// newFreezerTable opens the given path as a freezer table. +func newFreezerTable(path, name string, disableSnappy, readonly bool) (*freezerTable, error) { + return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy, readonly) +} + +// newTable opens a freezer table, creating the data and index files if they are +// non-existent. Both files are truncated to the shortest common length to ensure +// they don't go out of sync. +func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) { + // Ensure the containing directory exists and open the indexEntry file + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + var idxName string + if noCompression { + idxName = fmt.Sprintf("%s.ridx", name) // raw index file + } else { + idxName = fmt.Sprintf("%s.cidx", name) // compressed index file + } + var ( + err error + index *os.File + meta *os.File + ) + if readonly { + // Will fail if table index file or meta file is not existent + index, err = openFreezerFileForReadOnly(filepath.Join(path, idxName)) + if err != nil { + return nil, err + } + meta, err = openFreezerFileForReadOnly(filepath.Join(path, fmt.Sprintf("%s.meta", name))) + if err != nil { + return nil, err + } + } else { + index, err = openFreezerFileForAppend(filepath.Join(path, idxName)) + if err != nil { + return nil, err + } + meta, err = openFreezerFileForAppend(filepath.Join(path, fmt.Sprintf("%s.meta", name))) + if err != nil { + return nil, err + } + } + // Create the table and repair any past inconsistency + tab := &freezerTable{ + index: index, + meta: meta, + files: make(map[uint32]*os.File), + readMeter: readMeter, + writeMeter: writeMeter, + sizeGauge: sizeGauge, + name: name, + path: path, + logger: log.New("database", path, "table", name), + noCompression: noCompression, + readonly: readonly, + maxFileSize: maxFilesize, + } + if err := tab.repair(); err != nil { + tab.Close() + return nil, err + } + // Initialize the starting size counter + size, err := tab.sizeNolock() + if err != nil { + tab.Close() + return nil, err + } + tab.sizeGauge.Inc(int64(size)) + + return tab, nil +} + +// repair cross-checks the head and the index file and truncates them to +// be in sync with each other after a potential crash / data loss. +func (t *freezerTable) repair() error { + // Create a temporary offset buffer to init files with and read indexEntry into + buffer := make([]byte, indexEntrySize) + + // If we've just created the files, initialize the index with the 0 indexEntry + stat, err := t.index.Stat() + if err != nil { + return err + } + if stat.Size() == 0 { + if _, err := t.index.Write(buffer); err != nil { + return err + } + } + // Ensure the index is a multiple of indexEntrySize bytes + if overflow := stat.Size() % indexEntrySize; overflow != 0 { + if t.readonly { + return fmt.Errorf("index file(path: %s, name: %s) size is not a multiple of %d", t.path, t.name, indexEntrySize) + } + if err := truncateFreezerFile(t.index, stat.Size()-overflow); err != nil { + return err + } // New file can't trigger this path + } + // Retrieve the file sizes and prepare for truncation + if stat, err = t.index.Stat(); err != nil { + return err + } + offsetsSize := stat.Size() + + // Open the head file + var ( + firstIndex indexEntry + lastIndex indexEntry + contentSize int64 + contentExp int64 + verbose bool + ) + // Read index zero, determine what file is the earliest + // and what item offset to use + t.index.ReadAt(buffer, 0) + firstIndex.unmarshalBinary(buffer) + + // Assign the tail fields with the first stored index. + // The total removed items is represented with an uint32, + // which is not enough in theory but enough in practice. + // TODO: use uint64 to represent total removed items. + t.tailId = firstIndex.filenum + t.itemOffset.Store(uint64(firstIndex.offset)) + + // Load metadata from the file + meta, err := loadMetadata(t.meta, t.itemOffset.Load()) + if err != nil { + return err + } + t.itemHidden.Store(meta.VirtualTail) + + // Read the last index, use the default value in case the freezer is empty + if offsetsSize == indexEntrySize { + lastIndex = indexEntry{filenum: t.tailId, offset: 0} + } else { + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + lastIndex.unmarshalBinary(buffer) + } + // Print an error log if the index is corrupted due to an incorrect + // last index item. While it is theoretically possible to have a zero offset + // by storing all zero-size items, it is highly unlikely to occur in practice. + if lastIndex.offset == 0 && offsetsSize/indexEntrySize > 1 { + log.Error("Corrupted index file detected", "lastOffset", lastIndex.offset, "indexes", offsetsSize/indexEntrySize) + } + if t.readonly { + t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly) + } else { + t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) + } + if err != nil { + return err + } + if stat, err = t.head.Stat(); err != nil { + return err + } + contentSize = stat.Size() + + // Keep truncating both files until they come in sync + contentExp = int64(lastIndex.offset) + for contentExp != contentSize { + if t.readonly { + return fmt.Errorf("freezer table(path: %s, name: %s, num: %d) is corrupted", t.path, t.name, lastIndex.filenum) + } + verbose = true + // Truncate the head file to the last offset pointer + if contentExp < contentSize { + t.logger.Warn("Truncating dangling head", "indexed", contentExp, "stored", contentSize) + if err := truncateFreezerFile(t.head, contentExp); err != nil { + return err + } + contentSize = contentExp + } + // Truncate the index to point within the head file + if contentExp > contentSize { + t.logger.Warn("Truncating dangling indexes", "indexes", offsetsSize/indexEntrySize, "indexed", contentExp, "stored", contentSize) + if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { + return err + } + offsetsSize -= indexEntrySize + + // Read the new head index, use the default value in case + // the freezer is already empty. + var newLastIndex indexEntry + if offsetsSize == indexEntrySize { + newLastIndex = indexEntry{filenum: t.tailId, offset: 0} + } else { + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + newLastIndex.unmarshalBinary(buffer) + } + // We might have slipped back into an earlier head-file here + if newLastIndex.filenum != lastIndex.filenum { + // Release earlier opened file + t.releaseFile(lastIndex.filenum) + if t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend); err != nil { + return err + } + if stat, err = t.head.Stat(); err != nil { + // TODO, anything more we can do here? + // A data file has gone missing... + return err + } + contentSize = stat.Size() + } + lastIndex = newLastIndex + contentExp = int64(lastIndex.offset) + } + } + // Sync() fails for read-only files on windows. + if !t.readonly { + // Ensure all reparation changes have been written to disk + if err := t.index.Sync(); err != nil { + return err + } + if err := t.head.Sync(); err != nil { + return err + } + if err := t.meta.Sync(); err != nil { + return err + } + } + // Update the item and byte counters and return + t.items.Store(t.itemOffset.Load() + uint64(offsetsSize/indexEntrySize-1)) // last indexEntry points to the end of the data file + t.headBytes = contentSize + t.headId = lastIndex.filenum + + // Delete the leftover files because of head deletion + t.releaseFilesAfter(t.headId, true) + + // Delete the leftover files because of tail deletion + t.releaseFilesBefore(t.tailId, true) + + // Close opened files and preopen all files + if err := t.preopen(); err != nil { + return err + } + if verbose { + t.logger.Info("Chain freezer table opened", "items", t.items.Load(), "deleted", t.itemOffset.Load(), "hidden", t.itemHidden.Load(), "tailId", t.tailId, "headId", t.headId, "size", t.headBytes) + } else { + t.logger.Debug("Chain freezer table opened", "items", t.items.Load(), "size", common.StorageSize(t.headBytes)) + } + return nil +} + +// preopen opens all files that the freezer will need. This method should be called from an init-context, +// since it assumes that it doesn't have to bother with locking +// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever +// obtain a write-lock within Retrieve. +func (t *freezerTable) preopen() (err error) { + // The repair might have already opened (some) files + t.releaseFilesAfter(0, false) + + // Open all except head in RDONLY + for i := t.tailId; i < t.headId; i++ { + if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil { + return err + } + } + if t.readonly { + t.head, err = t.openFile(t.headId, openFreezerFileForReadOnly) + } else { + // Open head in read/write + t.head, err = t.openFile(t.headId, openFreezerFileForAppend) + } + return err +} + +// truncateHead discards any recent data above the provided threshold number. +func (t *freezerTable) truncateHead(items uint64) error { + t.lock.Lock() + defer t.lock.Unlock() + + // Ensure the given truncate target falls in the correct range + existing := t.items.Load() + if existing <= items { + return nil + } + if items < t.itemHidden.Load() { + return errors.New("truncation below tail") + } + // We need to truncate, save the old size for metrics tracking + oldSize, err := t.sizeNolock() + if err != nil { + return err + } + // Something's out of sync, truncate the table's offset index + log := t.logger.Debug + if existing > items+1 { + log = t.logger.Warn // Only loud warn if we delete multiple items + } + log("Truncating freezer table", "items", existing, "limit", items) + + // Truncate the index file first, the tail position is also considered + // when calculating the new freezer table length. + length := items - t.itemOffset.Load() + if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil { + return err + } + if err := t.index.Sync(); err != nil { + return err + } + // Calculate the new expected size of the data file and truncate it + var expected indexEntry + if length == 0 { + expected = indexEntry{filenum: t.tailId, offset: 0} + } else { + buffer := make([]byte, indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(length*indexEntrySize)); err != nil { + return err + } + expected.unmarshalBinary(buffer) + } + // We might need to truncate back to older files + if expected.filenum != t.headId { + // If already open for reading, force-reopen for writing + t.releaseFile(expected.filenum) + newHead, err := t.openFile(expected.filenum, openFreezerFileForAppend) + if err != nil { + return err + } + // Release any files _after the current head -- both the previous head + // and any files which may have been opened for reading + t.releaseFilesAfter(expected.filenum, true) + + // Set back the historic head + t.head = newHead + t.headId = expected.filenum + } + if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil { + return err + } + if err := t.head.Sync(); err != nil { + return err + } + // All data files truncated, set internal counters and return + t.headBytes = int64(expected.offset) + t.items.Store(items) + + // Retrieve the new size and update the total size counter + newSize, err := t.sizeNolock() + if err != nil { + return err + } + t.sizeGauge.Dec(int64(oldSize - newSize)) + return nil +} + +// sizeHidden returns the total data size of hidden items in the freezer table. +// This function assumes the lock is already held. +func (t *freezerTable) sizeHidden() (uint64, error) { + hidden, offset := t.itemHidden.Load(), t.itemOffset.Load() + if hidden <= offset { + return 0, nil + } + indices, err := t.getIndices(hidden-1, 1) + if err != nil { + return 0, err + } + return uint64(indices[1].offset), nil +} + +// truncateTail discards any recent data before the provided threshold number. +func (t *freezerTable) truncateTail(items uint64) error { + t.lock.Lock() + defer t.lock.Unlock() + + // Ensure the given truncate target falls in the correct range + if t.itemHidden.Load() >= items { + return nil + } + if t.items.Load() < items { + return errors.New("truncation above head") + } + // Load the new tail index by the given new tail position + var ( + newTailId uint32 + buffer = make([]byte, indexEntrySize) + ) + if t.items.Load() == items { + newTailId = t.headId + } else { + offset := items - t.itemOffset.Load() + if _, err := t.index.ReadAt(buffer, int64((offset+1)*indexEntrySize)); err != nil { + return err + } + var newTail indexEntry + newTail.unmarshalBinary(buffer) + newTailId = newTail.filenum + } + // Save the old size for metrics tracking. This needs to be done + // before any updates to either itemHidden or itemOffset. + oldSize, err := t.sizeNolock() + if err != nil { + return err + } + // Update the virtual tail marker and hidden these entries in table. + t.itemHidden.Store(items) + if err := writeMetadata(t.meta, newMetadata(items)); err != nil { + return err + } + // Hidden items still fall in the current tail file, no data file + // can be dropped. + if t.tailId == newTailId { + return nil + } + // Hidden items fall in the incorrect range, returns the error. + if t.tailId > newTailId { + return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId) + } + // Count how many items can be deleted from the file. + var ( + newDeleted = items + deleted = t.itemOffset.Load() + ) + // Hidden items exceed the current tail file, drop the relevant data files. + for current := items - 1; current >= deleted; current -= 1 { + if _, err := t.index.ReadAt(buffer, int64((current-deleted+1)*indexEntrySize)); err != nil { + return err + } + var pre indexEntry + pre.unmarshalBinary(buffer) + if pre.filenum != newTailId { + break + } + newDeleted = current + } + // Commit the changes of metadata file first before manipulating + // the indexes file. + if err := t.meta.Sync(); err != nil { + return err + } + // Close the index file before shorten it. + if err := t.index.Close(); err != nil { + return err + } + // Truncate the deleted index entries from the index file. + err = copyFrom(t.index.Name(), t.index.Name(), indexEntrySize*(newDeleted-deleted+1), func(f *os.File) error { + tailIndex := indexEntry{ + filenum: newTailId, + offset: uint32(newDeleted), + } + _, err := f.Write(tailIndex.append(nil)) + return err + }) + if err != nil { + return err + } + // Reopen the modified index file to load the changes + t.index, err = openFreezerFileForAppend(t.index.Name()) + if err != nil { + return err + } + // Sync the file to ensure changes are flushed to disk + if err := t.index.Sync(); err != nil { + return err + } + // Release any files before the current tail + t.tailId = newTailId + t.itemOffset.Store(newDeleted) + t.releaseFilesBefore(t.tailId, true) + + // Retrieve the new size and update the total size counter + newSize, err := t.sizeNolock() + if err != nil { + return err + } + t.sizeGauge.Dec(int64(oldSize - newSize)) + return nil +} + +// Close closes all opened files. +func (t *freezerTable) Close() error { + t.lock.Lock() + defer t.lock.Unlock() + + var errs []error + doClose := func(f *os.File, sync bool, close bool) { + if sync && !t.readonly { + if err := f.Sync(); err != nil { + errs = append(errs, err) + } + } + if close { + if err := f.Close(); err != nil { + errs = append(errs, err) + } + } + } + // Trying to fsync a file opened in rdonly causes "Access denied" + // error on Windows. + doClose(t.index, true, true) + doClose(t.meta, true, true) + + // The preopened non-head data-files are all opened in readonly. + // The head is opened in rw-mode, so we sync it here - but since it's also + // part of t.files, it will be closed in the loop below. + doClose(t.head, true, false) // sync but do not close + + for _, f := range t.files { + doClose(f, false, true) // close but do not sync + } + t.index = nil + t.meta = nil + t.head = nil + + if errs != nil { + return fmt.Errorf("%v", errs) + } + return nil +} + +// openFile assumes that the write-lock is held by the caller +func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) { + var exist bool + if f, exist = t.files[num]; !exist { + var name string + if t.noCompression { + name = fmt.Sprintf("%s.%04d.rdat", t.name, num) + } else { + name = fmt.Sprintf("%s.%04d.cdat", t.name, num) + } + f, err = opener(filepath.Join(t.path, name)) + if err != nil { + return nil, err + } + t.files[num] = f + } + return f, err +} + +// releaseFile closes a file, and removes it from the open file cache. +// Assumes that the caller holds the write lock +func (t *freezerTable) releaseFile(num uint32) { + if f, exist := t.files[num]; exist { + delete(t.files, num) + f.Close() + } +} + +// releaseFilesAfter closes all open files with a higher number, and optionally also deletes the files +func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { + for fnum, f := range t.files { + if fnum > num { + delete(t.files, fnum) + f.Close() + if remove { + os.Remove(f.Name()) + } + } + } +} + +// releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files +func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) { + for fnum, f := range t.files { + if fnum < num { + delete(t.files, fnum) + f.Close() + if remove { + os.Remove(f.Name()) + } + } + } +} + +// getIndices returns the index entries for the given from-item, covering 'count' items. +// N.B: The actual number of returned indices for N items will always be N+1 (unless an +// error is returned). +// OBS: This method assumes that the caller has already verified (and/or trimmed) the range +// so that the items are within bounds. If this method is used to read out of bounds, +// it will return error. +func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { + // Apply the table-offset + from = from - t.itemOffset.Load() + + // For reading N items, we need N+1 indices. + buffer := make([]byte, (count+1)*indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { + return nil, err + } + var ( + indices []*indexEntry + offset int + ) + for i := from; i <= from+count; i++ { + index := new(indexEntry) + index.unmarshalBinary(buffer[offset:]) + offset += indexEntrySize + indices = append(indices, index) + } + if from == 0 { + // Special case if we're reading the first item in the freezer. We assume that + // the first item always start from zero(regarding the deletion, we + // only support deletion by files, so that the assumption is held). + // This means we can use the first item metadata to carry information about + // the 'global' offset, for the deletion-case + indices[0].offset = 0 + indices[0].filenum = indices[1].filenum + } + return indices, nil +} + +// Retrieve looks up the data offset of an item with the given number and retrieves +// the raw binary blob from the data file. +func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { + items, err := t.RetrieveItems(item, 1, 0) + if err != nil { + return nil, err + } + return items[0], nil +} + +// RetrieveItems returns multiple items in sequence, starting from the index 'start'. +// It will return at most 'max' items, but will abort earlier to respect the +// 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one +// item, it _will_ return one element and possibly overflow the maxBytes. +func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) { + // First we read the 'raw' data, which might be compressed. + diskData, sizes, err := t.retrieveItems(start, count, maxBytes) + if err != nil { + return nil, err + } + var ( + output = make([][]byte, 0, count) + offset int // offset for reading + outputSize int // size of uncompressed data + ) + // Now slice up the data and decompress. + for i, diskSize := range sizes { + item := diskData[offset : offset+diskSize] + offset += diskSize + decompressedSize := diskSize + if !t.noCompression { + decompressedSize, _ = snappy.DecodedLen(item) + } + if i > 0 && maxBytes != 0 && uint64(outputSize+decompressedSize) > maxBytes { + break + } + if !t.noCompression { + data, err := snappy.Decode(nil, item) + if err != nil { + return nil, err + } + output = append(output, data) + } else { + output = append(output, item) + } + outputSize += decompressedSize + } + return output, nil +} + +// retrieveItems reads up to 'count' items from the table. It reads at least +// one item, but otherwise avoids reading more than maxBytes bytes. Freezer +// will ignore the size limitation and continuously allocate memory to store +// data if maxBytes is 0. It returns the (potentially compressed) data, and +// the sizes. +func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + // Ensure the table and the item are accessible + if t.index == nil || t.head == nil || t.meta == nil { + return nil, nil, errClosed + } + var ( + items = t.items.Load() // the total items(head + 1) + hidden = t.itemHidden.Load() // the number of hidden items + ) + // Ensure the start is written, not deleted from the tail, and that the + // caller actually wants something + if items <= start || hidden > start || count == 0 { + return nil, nil, errOutOfBounds + } + if start+count > items { + count = items - start + } + var output []byte // Buffer to read data into + if maxBytes != 0 { + output = make([]byte, 0, maxBytes) + } else { + output = make([]byte, 0, 1024) // initial buffer cap + } + // readData is a helper method to read a single data item from disk. + readData := func(fileId, start uint32, length int) error { + output = grow(output, length) + dataFile, exist := t.files[fileId] + if !exist { + return fmt.Errorf("missing data file %d", fileId) + } + if _, err := dataFile.ReadAt(output[len(output)-length:], int64(start)); err != nil { + return fmt.Errorf("%w, fileid: %d, start: %d, length: %d", err, fileId, start, length) + } + return nil + } + // Read all the indexes in one go + indices, err := t.getIndices(start, count) + if err != nil { + return nil, nil, err + } + var ( + sizes []int // The sizes for each element + totalSize = 0 // The total size of all data read so far + readStart = indices[0].offset // Where, in the file, to start reading + unreadSize = 0 // The size of the as-yet-unread data + ) + + for i, firstIndex := range indices[:len(indices)-1] { + secondIndex := indices[i+1] + // Determine the size of the item. + offset1, offset2, _ := firstIndex.bounds(secondIndex) + size := int(offset2 - offset1) + // Crossing a file boundary? + if secondIndex.filenum != firstIndex.filenum { + // If we have unread data in the first file, we need to do that read now. + if unreadSize > 0 { + if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil { + return nil, nil, err + } + unreadSize = 0 + } + readStart = 0 + } + if i > 0 && uint64(totalSize+size) > maxBytes && maxBytes != 0 { + // About to break out due to byte limit being exceeded. We don't + // read this last item, but we need to do the deferred reads now. + if unreadSize > 0 { + if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { + return nil, nil, err + } + } + break + } + // Defer the read for later + unreadSize += size + totalSize += size + sizes = append(sizes, size) + if i == len(indices)-2 || (uint64(totalSize) > maxBytes && maxBytes != 0) { + // Last item, need to do the read now + if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { + return nil, nil, err + } + break + } + } + + // Update metrics. + t.readMeter.Mark(int64(totalSize)) + return output, sizes, nil +} + +// has returns an indicator whether the specified number data is still accessible +// in the freezer table. +func (t *freezerTable) has(number uint64) bool { + return t.items.Load() > number && t.itemHidden.Load() <= number +} + +// size returns the total data size in the freezer table. +func (t *freezerTable) size() (uint64, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.sizeNolock() +} + +// sizeNolock returns the total data size in the freezer table. This function +// assumes the lock is already held. +func (t *freezerTable) sizeNolock() (uint64, error) { + stat, err := t.index.Stat() + if err != nil { + return 0, err + } + hidden, err := t.sizeHidden() + if err != nil { + return 0, err + } + total := uint64(t.maxFileSize)*uint64(t.headId-t.tailId) + uint64(t.headBytes) + uint64(stat.Size()) - hidden + return total, nil +} + +// advanceHead should be called when the current head file would outgrow the file limits, +// and a new file must be opened. The caller of this method must hold the write-lock +// before calling this method. +func (t *freezerTable) advanceHead() error { + t.lock.Lock() + defer t.lock.Unlock() + + // We open the next file in truncated mode -- if this file already + // exists, we need to start over from scratch on it. + nextID := t.headId + 1 + newHead, err := t.openFile(nextID, openFreezerFileTruncated) + if err != nil { + return err + } + // Commit the contents of the old file to stable storage and + // tear it down. It will be re-opened in read-only mode. + if err := t.head.Sync(); err != nil { + return err + } + t.releaseFile(t.headId) + t.openFile(t.headId, openFreezerFileForReadOnly) + + // Swap out the current head. + t.head = newHead + t.headBytes = 0 + t.headId = nextID + return nil +} + +// Sync pushes any pending data from memory out to disk. This is an expensive +// operation, so use it with care. +func (t *freezerTable) Sync() error { + t.lock.Lock() + defer t.lock.Unlock() + if t.index == nil || t.head == nil || t.meta == nil { + return errClosed + } + var err error + trackError := func(e error) { + if e != nil && err == nil { + err = e + } + } + + trackError(t.index.Sync()) + trackError(t.meta.Sync()) + trackError(t.head.Sync()) + return err +} + +func (t *freezerTable) dumpIndexStdout(start, stop int64) { + t.dumpIndex(os.Stdout, start, stop) +} + +func (t *freezerTable) dumpIndexString(start, stop int64) string { + var out bytes.Buffer + out.WriteString("\n") + t.dumpIndex(&out, start, stop) + return out.String() +} + +func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { + meta, err := readMetadata(t.meta) + if err != nil { + fmt.Fprintf(w, "Failed to decode freezer table %v\n", err) + return + } + fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version, + t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load()) + + buf := make([]byte, indexEntrySize) + + fmt.Fprintf(w, "| number | fileno | offset |\n") + fmt.Fprintf(w, "|--------|--------|--------|\n") + + for i := uint64(start); ; i++ { + if _, err := t.index.ReadAt(buf, int64((i+1)*indexEntrySize)); err != nil { + break + } + var entry indexEntry + entry.unmarshalBinary(buf) + fmt.Fprintf(w, "| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset) + if stop > 0 && i >= uint64(stop) { + break + } + } + fmt.Fprintf(w, "|--------------------------|\n") +} diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go new file mode 100644 index 000000000..91b4943e5 --- /dev/null +++ b/core/rawdb/freezer_table_test.go @@ -0,0 +1,1369 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "encoding/binary" + "fmt" + "math/rand" + "os" + "path/filepath" + "reflect" + "testing" + "testing/quick" + + "github.com/davecgh/go-spew/spew" + "github.com/ethereum/go-ethereum/metrics" + "github.com/stretchr/testify/require" +) + +// TestFreezerBasics test initializing a freezertable from scratch, writing to the table, +// and reading it back. +func TestFreezerBasics(t *testing.T) { + t.Parallel() + // set cutoff at 50 bytes + f, err := newTable(os.TempDir(), + fmt.Sprintf("unittest-%d", rand.Uint64()), + metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, false) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + // Write 15 bytes 255 times, results in 85 files + writeChunks(t, f, 255, 15) + + //print(t, f, 0) + //print(t, f, 1) + //print(t, f, 2) + // + //db[0] = 000000000000000000000000000000 + //db[1] = 010101010101010101010101010101 + //db[2] = 020202020202020202020202020202 + + for y := 0; y < 255; y++ { + exp := getChunk(15, y) + got, err := f.Retrieve(uint64(y)) + if err != nil { + t.Fatalf("reading item %d: %v", y, err) + } + if !bytes.Equal(got, exp) { + t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) + } + } + // Check that we cannot read too far + _, err = f.Retrieve(uint64(255)) + if err != errOutOfBounds { + t.Fatal(err) + } +} + +// TestFreezerBasicsClosing tests same as TestFreezerBasics, but also closes and reopens the freezer between +// every operation +func TestFreezerBasicsClosing(t *testing.T) { + t.Parallel() + // set cutoff at 50 bytes + var ( + fname = fmt.Sprintf("basics-close-%d", rand.Uint64()) + rm, wm, sg = metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + f *freezerTable + err error + ) + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + + // Write 15 bytes 255 times, results in 85 files. + // In-between writes, the table is closed and re-opened. + for x := 0; x < 255; x++ { + data := getChunk(15, x) + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(uint64(x), data)) + require.NoError(t, batch.commit()) + f.Close() + + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + } + defer f.Close() + + for y := 0; y < 255; y++ { + exp := getChunk(15, y) + got, err := f.Retrieve(uint64(y)) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, exp) { + t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) + } + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + } +} + +// TestFreezerRepairDanglingHead tests that we can recover if index entries are removed +func TestFreezerRepairDanglingHead(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 255 times + writeChunks(t, f, 255, 15) + + // The last item should be there + if _, err = f.Retrieve(0xfe); err != nil { + t.Fatal(err) + } + f.Close() + } + + // open the index + idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644) + if err != nil { + t.Fatalf("Failed to open index file: %v", err) + } + // Remove 4 bytes + stat, err := idxFile.Stat() + if err != nil { + t.Fatalf("Failed to stat index file: %v", err) + } + idxFile.Truncate(stat.Size() - 4) + idxFile.Close() + + // Now open it again + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // The last item should be missing + if _, err = f.Retrieve(0xff); err == nil { + t.Errorf("Expected error for missing index entry") + } + // The one before should still be there + if _, err = f.Retrieve(0xfd); err != nil { + t.Fatalf("Expected no error, got %v", err) + } + } +} + +// TestFreezerRepairDanglingHeadLarge tests that we can recover if very many index entries are removed +func TestFreezerRepairDanglingHeadLarge(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) + + // Fill a table and close it + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 255 times + writeChunks(t, f, 255, 15) + + // The last item should be there + if _, err = f.Retrieve(f.items.Load() - 1); err != nil { + t.Fatal(err) + } + f.Close() + } + + // open the index + idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644) + if err != nil { + t.Fatalf("Failed to open index file: %v", err) + } + // Remove everything but the first item, and leave data unaligned + // 0-indexEntry, 1-indexEntry, corrupt-indexEntry + idxFile.Truncate(2*indexEntrySize + indexEntrySize/2) + idxFile.Close() + + // Now open it again + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // The first item should be there + if _, err = f.Retrieve(0); err != nil { + t.Fatal(err) + } + // The second item should be missing + if _, err = f.Retrieve(1); err == nil { + t.Errorf("Expected error for missing index entry") + } + // We should now be able to store items again, from item = 1 + batch := f.newBatch() + for x := 1; x < 0xff; x++ { + require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) + } + require.NoError(t, batch.commit()) + f.Close() + } + + // And if we open it, we should now be able to read all of them (new values) + { + f, _ := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + for y := 1; y < 255; y++ { + exp := getChunk(15, ^y) + got, err := f.Retrieve(uint64(y)) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, exp) { + t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) + } + } + } +} + +// TestSnappyDetection tests that we fail to open a snappy database and vice versa +func TestSnappyDetection(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("snappytest-%d", rand.Uint64()) + + // Open with snappy + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 255 times + writeChunks(t, f, 255, 15) + f.Close() + } + + // Open without snappy + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false) + if err != nil { + t.Fatal(err) + } + if _, err = f.Retrieve(0); err == nil { + f.Close() + t.Fatalf("expected empty table") + } + } + + // Open with snappy + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // There should be 255 items + if _, err = f.Retrieve(0xfe); err != nil { + f.Close() + t.Fatalf("expected no error, got %v", err) + } + } +} + +func assertFileSize(f string, size int64) error { + stat, err := os.Stat(f) + if err != nil { + return err + } + if stat.Size() != size { + return fmt.Errorf("error, expected size %d, got %d", size, stat.Size()) + } + return nil +} + +// TestFreezerRepairDanglingIndex checks that if the index has more entries than there are data, +// the index is repaired +func TestFreezerRepairDanglingIndex(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64()) + + // Fill a table and close it + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 9 times : 150 bytes + writeChunks(t, f, 9, 15) + + // The last item should be there + if _, err = f.Retrieve(f.items.Load() - 1); err != nil { + f.Close() + t.Fatal(err) + } + f.Close() + // File sizes should be 45, 45, 45 : items[3, 3, 3) + } + + // Crop third file + fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0002.rdat", fname)) + // Truncate third file: 45 ,45, 20 + { + if err := assertFileSize(fileToCrop, 45); err != nil { + t.Fatal(err) + } + file, err := os.OpenFile(fileToCrop, os.O_RDWR, 0644) + if err != nil { + t.Fatal(err) + } + file.Truncate(20) + file.Close() + } + + // Open db it again + // It should restore the file(s) to + // 45, 45, 15 + // with 3+3+1 items + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + defer f.Close() + if f.items.Load() != 7 { + t.Fatalf("expected %d items, got %d", 7, f.items.Load()) + } + if err := assertFileSize(fileToCrop, 15); err != nil { + t.Fatal(err) + } + } +} + +func TestFreezerTruncate(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncation-%d", rand.Uint64()) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 30 times + writeChunks(t, f, 30, 15) + + // The last item should be there + if _, err = f.Retrieve(f.items.Load() - 1); err != nil { + t.Fatal(err) + } + f.Close() + } + + // Reopen, truncate + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + defer f.Close() + f.truncateHead(10) // 150 bytes + if f.items.Load() != 10 { + t.Fatalf("expected %d items, got %d", 10, f.items.Load()) + } + // 45, 45, 45, 15 -- bytes should be 15 + if f.headBytes != 15 { + t.Fatalf("expected %d bytes, got %d", 15, f.headBytes) + } + } +} + +// TestFreezerRepairFirstFile tests a head file with the very first item only half-written. +// That will rewind the index, and _should_ truncate the head file +func TestFreezerRepairFirstFile(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64()) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // Write 80 bytes, splitting out into two files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE))) + require.NoError(t, batch.commit()) + + // The last item should be there + if _, err = f.Retrieve(1); err != nil { + t.Fatal(err) + } + f.Close() + } + + // Truncate the file in half + fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname)) + { + if err := assertFileSize(fileToCrop, 40); err != nil { + t.Fatal(err) + } + file, err := os.OpenFile(fileToCrop, os.O_RDWR, 0644) + if err != nil { + t.Fatal(err) + } + file.Truncate(20) + file.Close() + } + + // Reopen + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + if f.items.Load() != 1 { + f.Close() + t.Fatalf("expected %d items, got %d", 0, f.items.Load()) + } + + // Write 40 bytes + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD))) + require.NoError(t, batch.commit()) + + f.Close() + + // Should have been truncated down to zero and then 40 written + if err := assertFileSize(fileToCrop, 40); err != nil { + t.Fatal(err) + } + } +} + +// TestFreezerReadAndTruncate tests: +// - we have a table open +// - do some reads, so files are open in readonly +// - truncate so those files are 'removed' +// - check that we did not keep the rdonly file descriptors +func TestFreezerReadAndTruncate(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("read_truncate-%d", rand.Uint64()) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 30 times + writeChunks(t, f, 30, 15) + + // The last item should be there + if _, err = f.Retrieve(f.items.Load() - 1); err != nil { + t.Fatal(err) + } + f.Close() + } + + // Reopen and read all files + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + if f.items.Load() != 30 { + f.Close() + t.Fatalf("expected %d items, got %d", 0, f.items.Load()) + } + for y := byte(0); y < 30; y++ { + f.Retrieve(uint64(y)) + } + + // Now, truncate back to zero + f.truncateHead(0) + + // Write the data again + batch := f.newBatch() + for x := 0; x < 30; x++ { + require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) + } + require.NoError(t, batch.commit()) + f.Close() + } +} + +func TestFreezerOffset(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("offset-%d", rand.Uint64()) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + + // Write 6 x 20 bytes, splitting out into three files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.commit()) + + t.Log(f.dumpIndexString(0, 100)) + f.Close() + } + + // Now crop it. + { + // delete files 0 and 1 + for i := 0; i < 2; i++ { + p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i)) + if err := os.Remove(p); err != nil { + t.Fatal(err) + } + } + // Read the index file + p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname)) + indexFile, err := os.OpenFile(p, os.O_RDWR, 0644) + if err != nil { + t.Fatal(err) + } + indexBuf := make([]byte, 7*indexEntrySize) + indexFile.Read(indexBuf) + + // Update the index file, so that we store + // [ file = 2, offset = 4 ] at index zero + + zeroIndex := indexEntry{ + filenum: uint32(2), // First file is 2 + offset: uint32(4), // We have removed four items + } + buf := zeroIndex.append(nil) + + // Overwrite index zero + copy(indexBuf, buf) + + // Remove the four next indices by overwriting + copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:]) + indexFile.WriteAt(indexBuf, 0) + + // Need to truncate the moved index items + indexFile.Truncate(indexEntrySize * (1 + 2)) + indexFile.Close() + } + + // Now open again + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + defer f.Close() + t.Log(f.dumpIndexString(0, 100)) + + // It should allow writing item 6. + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99))) + require.NoError(t, batch.commit()) + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x99), + }) + } + + // Edit the index again, with a much larger initial offset of 1M. + { + // Read the index file + p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname)) + indexFile, err := os.OpenFile(p, os.O_RDWR, 0644) + if err != nil { + t.Fatal(err) + } + indexBuf := make([]byte, 3*indexEntrySize) + indexFile.Read(indexBuf) + + // Update the index file, so that we store + // [ file = 2, offset = 1M ] at index zero + + zeroIndex := indexEntry{ + offset: uint32(1000000), // We have removed 1M items + filenum: uint32(2), // First file is 2 + } + buf := zeroIndex.append(nil) + + // Overwrite index zero + copy(indexBuf, buf) + indexFile.WriteAt(indexBuf, 0) + indexFile.Close() + } + + // Check that existing items have been moved to index 1M. + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + defer f.Close() + t.Log(f.dumpIndexString(0, 100)) + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + 999999: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1000000: getChunk(20, 0xbb), + 1000001: getChunk(20, 0xaa), + }) + } +} + +func assertTableSize(t *testing.T, f *freezerTable, size int) { + t.Helper() + if got, err := f.size(); got != uint64(size) { + t.Fatalf("expected size of %d bytes, got %d, err: %v", size, got, err) + } +} + +func TestTruncateTail(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncate-tail-%d", rand.Uint64()) + + // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + + // Write 7 x 20 bytes, splitting out into four files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + // nothing to do, all the items should still be there. + f.truncateTail(0) + fmt.Println(f.dumpIndexString(0, 1000)) + checkRetrieve(t, f, map[uint64][]byte{ + 0: getChunk(20, 0xFF), + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + // maxFileSize*fileCount + headBytes + indexFileSize - hiddenBytes + expected := 20*7 + 48 - 0 + assertTableSize(t, f, expected) + + // truncate single element( item 0 ), deletion is only supported at file level + f.truncateTail(1) + fmt.Println(f.dumpIndexString(0, 1000)) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + expected = 20*7 + 48 - 20 + assertTableSize(t, f, expected) + + // Reopen the table, the deletion information should be persisted as well + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // truncate two elements( item 0, item 1 ), the file 0 should be deleted + f.truncateTail(2) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + expected = 20*5 + 36 - 0 + assertTableSize(t, f, expected) + + // Reopen the table, the above testing should still pass + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // truncate 3 more elements( item 2, 3, 4), the file 1 should be deleted + // file 2 should only contain item 5 + f.truncateTail(5) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + 4: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + expected = 20*3 + 24 - 20 + assertTableSize(t, f, expected) + + // truncate all, the entire freezer should be deleted + f.truncateTail(7) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + 4: errOutOfBounds, + 5: errOutOfBounds, + 6: errOutOfBounds, + }) + expected = 12 + assertTableSize(t, f, expected) +} + +func TestTruncateHead(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncate-head-blow-tail-%d", rand.Uint64()) + + // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + + // Write 7 x 20 bytes, splitting out into four files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + f.truncateTail(4) // Tail = 4 + + // NewHead is required to be 3, the entire table should be truncated + f.truncateHead(4) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, // Deleted by tail + 1: errOutOfBounds, // Deleted by tail + 2: errOutOfBounds, // Deleted by tail + 3: errOutOfBounds, // Deleted by tail + 4: errOutOfBounds, // Deleted by Head + 5: errOutOfBounds, // Deleted by Head + 6: errOutOfBounds, // Deleted by Head + }) + + // Append new items + batch = f.newBatch() + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) +} + +func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) { + t.Helper() + + for item, wantBytes := range items { + value, err := f.Retrieve(item) + if err != nil { + t.Fatalf("can't get expected item %d: %v", item, err) + } + if !bytes.Equal(value, wantBytes) { + t.Fatalf("item %d has wrong value %x (want %x)", item, value, wantBytes) + } + } +} + +func checkRetrieveError(t *testing.T, f *freezerTable, items map[uint64]error) { + t.Helper() + + for item, wantError := range items { + value, err := f.Retrieve(item) + if err == nil { + t.Fatalf("unexpected value %x for item %d, want error %v", item, value, wantError) + } + if err != wantError { + t.Fatalf("wrong error for item %d: %v", item, err) + } + } +} + +// Gets a chunk of data, filled with 'b' +func getChunk(size int, b int) []byte { + data := make([]byte, size) + for i := range data { + data[i] = byte(b) + } + return data +} + +// TODO (?) +// - test that if we remove several head-files, as well as data last data-file, +// the index is truncated accordingly +// Right now, the freezer would fail on these conditions: +// 1. have data files d0, d1, d2, d3 +// 2. remove d2,d3 +// +// However, all 'normal' failure modes arising due to failing to sync() or save a file +// should be handled already, and the case described above can only (?) happen if an +// external process/user deletes files from the filesystem. + +func writeChunks(t *testing.T, ft *freezerTable, n int, length int) { + t.Helper() + + batch := ft.newBatch() + for i := 0; i < n; i++ { + if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil { + t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err) + } + } + if err := batch.commit(); err != nil { + t.Fatalf("Commit returned error: %v", err) + } +} + +// TestSequentialRead does some basic tests on the RetrieveItems. +func TestSequentialRead(t *testing.T) { + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("batchread-%d", rand.Uint64()) + { // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 30 times + writeChunks(t, f, 30, 15) + f.dumpIndexStdout(0, 30) + f.Close() + } + { // Open it, iterate, verify iteration + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, 10000, 100000) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), 30; have != want { + t.Fatalf("want %d items, have %d ", want, have) + } + for i, have := range items { + want := getChunk(15, i) + if !bytes.Equal(want, have) { + t.Fatalf("data corruption: have\n%x\n, want \n%x\n", have, want) + } + } + f.Close() + } + { // Open it, iterate, verify byte limit. The byte limit is less than item + // size, so each lookup should only return one item + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, 10000, 10) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), 1; have != want { + t.Fatalf("want %d items, have %d ", want, have) + } + for i, have := range items { + want := getChunk(15, i) + if !bytes.Equal(want, have) { + t.Fatalf("data corruption: have\n%x\n, want \n%x\n", have, want) + } + } + f.Close() + } +} + +// TestSequentialReadByteLimit does some more advanced tests on batch reads. +// These tests check that when the byte limit hits, we correctly abort in time, +// but also properly do all the deferred reads for the previous data, regardless +// of whether the data crosses a file boundary or not. +func TestSequentialReadByteLimit(t *testing.T) { + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("batchread-2-%d", rand.Uint64()) + { // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true, false) + if err != nil { + t.Fatal(err) + } + // Write 10 bytes 30 times, + // Splitting it at every 100 bytes (10 items) + writeChunks(t, f, 30, 10) + f.Close() + } + for i, tc := range []struct { + items uint64 + limit uint64 + want int + }{ + {9, 89, 8}, + {10, 99, 9}, + {11, 109, 10}, + {100, 89, 8}, + {100, 99, 9}, + {100, 109, 10}, + } { + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true, false) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, tc.items, tc.limit) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), tc.want; have != want { + t.Fatalf("test %d: want %d items, have %d ", i, want, have) + } + for ii, have := range items { + want := getChunk(10, ii) + if !bytes.Equal(want, have) { + t.Fatalf("test %d: data corruption item %d: have\n%x\n, want \n%x\n", i, ii, have, want) + } + } + f.Close() + } + } +} + +// TestSequentialReadNoByteLimit tests the batch-read if maxBytes is not specified. +// Freezer should return the requested items regardless the size limitation. +func TestSequentialReadNoByteLimit(t *testing.T) { + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("batchread-3-%d", rand.Uint64()) + { // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true, false) + if err != nil { + t.Fatal(err) + } + // Write 10 bytes 30 times, + // Splitting it at every 100 bytes (10 items) + writeChunks(t, f, 30, 10) + f.Close() + } + for i, tc := range []struct { + items uint64 + want int + }{ + {1, 1}, + {30, 30}, + {31, 30}, + } { + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true, false) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, tc.items, 0) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), tc.want; have != want { + t.Fatalf("test %d: want %d items, have %d ", i, want, have) + } + for ii, have := range items { + want := getChunk(10, ii) + if !bytes.Equal(want, have) { + t.Fatalf("test %d: data corruption item %d: have\n%x\n, want \n%x\n", i, ii, have, want) + } + } + f.Close() + } + } +} + +func TestFreezerReadonly(t *testing.T) { + tmpdir := os.TempDir() + // Case 1: Check it fails on non-existent file. + _, err := newTable(tmpdir, + fmt.Sprintf("readonlytest-%d", rand.Uint64()), + metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, true) + if err == nil { + t.Fatal("readonly table instantiation should fail for non-existent table") + } + + // Case 2: Check that it fails on invalid index length. + fname := fmt.Sprintf("readonlytest-%d", rand.Uint64()) + idxFile, err := openFreezerFileForAppend(filepath.Join(tmpdir, fmt.Sprintf("%s.ridx", fname))) + if err != nil { + t.Errorf("Failed to open index file: %v\n", err) + } + // size should not be a multiple of indexEntrySize. + idxFile.Write(make([]byte, 17)) + idxFile.Close() + _, err = newTable(tmpdir, fname, + metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, true) + if err == nil { + t.Errorf("readonly table instantiation should fail for invalid index size") + } + + // Case 3: Open table non-readonly table to write some data. + // Then corrupt the head file and make sure opening the table + // again in readonly triggers an error. + fname = fmt.Sprintf("readonlytest-%d", rand.Uint64()) + f, err := newTable(tmpdir, fname, + metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, false) + if err != nil { + t.Fatalf("failed to instantiate table: %v", err) + } + writeChunks(t, f, 8, 32) + // Corrupt table file + if _, err := f.head.Write([]byte{1, 1}); err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + _, err = newTable(tmpdir, fname, + metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, true) + if err == nil { + t.Errorf("readonly table instantiation should fail for corrupt table file") + } + + // Case 4: Write some data to a table and later re-open it as readonly. + // Should be successful. + fname = fmt.Sprintf("readonlytest-%d", rand.Uint64()) + f, err = newTable(tmpdir, fname, + metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, false) + if err != nil { + t.Fatalf("failed to instantiate table: %v\n", err) + } + writeChunks(t, f, 32, 128) + if err := f.Close(); err != nil { + t.Fatal(err) + } + f, err = newTable(tmpdir, fname, + metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, true) + if err != nil { + t.Fatal(err) + } + v, err := f.Retrieve(10) + if err != nil { + t.Fatal(err) + } + exp := getChunk(128, 10) + if !bytes.Equal(v, exp) { + t.Errorf("retrieved value is incorrect") + } + + // Case 5: Now write some data via a batch. + // This should fail either during AppendRaw or Commit + batch := f.newBatch() + writeErr := batch.AppendRaw(32, make([]byte, 1)) + if writeErr == nil { + writeErr = batch.commit() + } + if writeErr == nil { + t.Fatalf("Writing to readonly table should fail") + } +} + +// randTest performs random freezer table operations. +// Instances of this test are created by Generate. +type randTest []randTestStep + +type randTestStep struct { + op int + items []uint64 // for append and retrieve + blobs [][]byte // for append + target uint64 // for truncate(head/tail) + err error // for debugging +} + +const ( + opReload = iota + opAppend + opRetrieve + opTruncateHead + opTruncateHeadAll + opTruncateTail + opTruncateTailAll + opCheckAll + opMax // boundary value, not an actual op +) + +func getVals(first uint64, n int) [][]byte { + var ret [][]byte + for i := 0; i < n; i++ { + val := make([]byte, 8) + binary.BigEndian.PutUint64(val, first+uint64(i)) + ret = append(ret, val) + } + return ret +} + +func (randTest) Generate(r *rand.Rand, size int) reflect.Value { + var ( + deleted uint64 // The number of deleted items from tail + items []uint64 // The index of entries in table + + // getItems retrieves the indexes for items in table. + getItems = func(n int) []uint64 { + length := len(items) + if length == 0 { + return nil + } + var ret []uint64 + index := rand.Intn(length) + for i := index; len(ret) < n && i < length; i++ { + ret = append(ret, items[i]) + } + return ret + } + + // addItems appends the given length items into the table. + addItems = func(n int) []uint64 { + var first = deleted + if len(items) != 0 { + first = items[len(items)-1] + 1 + } + var ret []uint64 + for i := 0; i < n; i++ { + ret = append(ret, first+uint64(i)) + } + items = append(items, ret...) + return ret + } + ) + + var steps randTest + for i := 0; i < size; i++ { + step := randTestStep{op: r.Intn(opMax)} + switch step.op { + case opReload, opCheckAll: + case opAppend: + num := r.Intn(3) + step.items = addItems(num) + if len(step.items) == 0 { + step.blobs = nil + } else { + step.blobs = getVals(step.items[0], num) + } + case opRetrieve: + step.items = getItems(r.Intn(3)) + case opTruncateHead: + if len(items) == 0 { + step.target = deleted + } else { + index := r.Intn(len(items)) + items = items[:index] + step.target = deleted + uint64(index) + } + case opTruncateHeadAll: + step.target = deleted + items = items[:0] + case opTruncateTail: + if len(items) == 0 { + step.target = deleted + } else { + index := r.Intn(len(items)) + items = items[index:] + deleted += uint64(index) + step.target = deleted + } + case opTruncateTailAll: + step.target = deleted + uint64(len(items)) + items = items[:0] + deleted = step.target + } + steps = append(steps, step) + } + return reflect.ValueOf(steps) +} + +func runRandTest(rt randTest) bool { + fname := fmt.Sprintf("randtest-%d", rand.Uint64()) + f, err := newTable(os.TempDir(), fname, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, false) + if err != nil { + panic("failed to initialize table") + } + var values [][]byte + for i, step := range rt { + switch step.op { + case opReload: + f.Close() + f, err = newTable(os.TempDir(), fname, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true, false) + if err != nil { + rt[i].err = fmt.Errorf("failed to reload table %v", err) + } + case opCheckAll: + tail := f.itemHidden.Load() + head := f.items.Load() + + if tail == head { + continue + } + got, err := f.RetrieveItems(f.itemHidden.Load(), head-tail, 100000) + if err != nil { + rt[i].err = err + } else { + if !reflect.DeepEqual(got, values) { + rt[i].err = fmt.Errorf("mismatch on retrieved values %v %v", got, values) + } + } + + case opAppend: + batch := f.newBatch() + for i := 0; i < len(step.items); i++ { + batch.AppendRaw(step.items[i], step.blobs[i]) + } + batch.commit() + values = append(values, step.blobs...) + + case opRetrieve: + var blobs [][]byte + if len(step.items) == 0 { + continue + } + tail := f.itemHidden.Load() + for i := 0; i < len(step.items); i++ { + blobs = append(blobs, values[step.items[i]-tail]) + } + got, err := f.RetrieveItems(step.items[0], uint64(len(step.items)), 100000) + if err != nil { + rt[i].err = err + } else { + if !reflect.DeepEqual(got, blobs) { + rt[i].err = fmt.Errorf("mismatch on retrieved values %v %v %v", got, blobs, step.items) + } + } + + case opTruncateHead: + f.truncateHead(step.target) + + length := f.items.Load() - f.itemHidden.Load() + values = values[:length] + + case opTruncateHeadAll: + f.truncateHead(step.target) + values = nil + + case opTruncateTail: + prev := f.itemHidden.Load() + f.truncateTail(step.target) + + truncated := f.itemHidden.Load() - prev + values = values[truncated:] + + case opTruncateTailAll: + f.truncateTail(step.target) + values = nil + } + // Abort the test on error. + if rt[i].err != nil { + return false + } + } + f.Close() + return true +} + +func TestRandom(t *testing.T) { + if err := quick.Check(runRandTest, nil); err != nil { + if cerr, ok := err.(*quick.CheckError); ok { + t.Fatalf("random test iteration %d failed: %s", cerr.Count, spew.Sdump(cerr.In)) + } + t.Fatal(err) + } +} diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go new file mode 100644 index 000000000..2a1566389 --- /dev/null +++ b/core/rawdb/freezer_test.go @@ -0,0 +1,483 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "errors" + "fmt" + "math/big" + "math/rand" + "os" + "path" + "path/filepath" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/stretchr/testify/require" +) + +var freezerTestTableDef = map[string]bool{"test": true} + +func TestFreezerModify(t *testing.T) { + t.Parallel() + + // Create test data. + var valuesRaw [][]byte + var valuesRLP []*big.Int + for x := 0; x < 100; x++ { + v := getChunk(256, x) + valuesRaw = append(valuesRaw, v) + iv := big.NewInt(int64(x)) + iv = iv.Exp(iv, iv, nil) + valuesRLP = append(valuesRLP, iv) + } + + tables := map[string]bool{"raw": true, "rlp": false} + f, _ := newFreezerForTesting(t, tables) + defer f.Close() + + // Commit test data. + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := range valuesRaw { + if err := op.AppendRaw("raw", uint64(i), valuesRaw[i]); err != nil { + return err + } + if err := op.Append("rlp", uint64(i), valuesRLP[i]); err != nil { + return err + } + } + return nil + }) + if err != nil { + t.Fatal("ModifyAncients failed:", err) + } + + // Dump indexes. + for _, table := range f.tables { + t.Log(table.name, "index:", table.dumpIndexString(0, int64(len(valuesRaw)))) + } + + // Read back test data. + checkAncientCount(t, f, "raw", uint64(len(valuesRaw))) + checkAncientCount(t, f, "rlp", uint64(len(valuesRLP))) + for i := range valuesRaw { + v, _ := f.Ancient("raw", uint64(i)) + if !bytes.Equal(v, valuesRaw[i]) { + t.Fatalf("wrong raw value at %d: %x", i, v) + } + ivEnc, _ := f.Ancient("rlp", uint64(i)) + want, _ := rlp.EncodeToBytes(valuesRLP[i]) + if !bytes.Equal(ivEnc, want) { + t.Fatalf("wrong RLP value at %d: %x", i, ivEnc) + } + } +} + +// This checks that ModifyAncients rolls back freezer updates +// when the function passed to it returns an error. +func TestFreezerModifyRollback(t *testing.T) { + t.Parallel() + + f, dir := newFreezerForTesting(t, freezerTestTableDef) + + theError := errors.New("oops") + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + // Append three items. This creates two files immediately, + // because the table size limit of the test freezer is 2048. + require.NoError(t, op.AppendRaw("test", 0, make([]byte, 2048))) + require.NoError(t, op.AppendRaw("test", 1, make([]byte, 2048))) + require.NoError(t, op.AppendRaw("test", 2, make([]byte, 2048))) + return theError + }) + if err != theError { + t.Errorf("ModifyAncients returned wrong error %q", err) + } + checkAncientCount(t, f, "test", 0) + f.Close() + + // Reopen and check that the rolled-back data doesn't reappear. + tables := map[string]bool{"test": true} + f2, err := NewFreezer(dir, "", false, 2049, tables) + if err != nil { + t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err) + } + defer f2.Close() + checkAncientCount(t, f2, "test", 0) +} + +// This test runs ModifyAncients and Ancient concurrently with each other. +func TestFreezerConcurrentModifyRetrieve(t *testing.T) { + t.Parallel() + + f, _ := newFreezerForTesting(t, freezerTestTableDef) + defer f.Close() + + var ( + numReaders = 5 + writeBatchSize = uint64(50) + written = make(chan uint64, numReaders*6) + wg sync.WaitGroup + ) + wg.Add(numReaders + 1) + + // Launch the writer. It appends 10000 items in batches. + go func() { + defer wg.Done() + defer close(written) + for item := uint64(0); item < 10000; item += writeBatchSize { + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := uint64(0); i < writeBatchSize; i++ { + item := item + i + value := getChunk(32, int(item)) + if err := op.AppendRaw("test", item, value); err != nil { + return err + } + } + return nil + }) + if err != nil { + panic(err) + } + for i := 0; i < numReaders; i++ { + written <- item + writeBatchSize + } + } + }() + + // Launch the readers. They read random items from the freezer up to the + // current frozen item count. + for i := 0; i < numReaders; i++ { + go func() { + defer wg.Done() + for frozen := range written { + for rc := 0; rc < 80; rc++ { + num := uint64(rand.Intn(int(frozen))) + value, err := f.Ancient("test", num) + if err != nil { + panic(fmt.Errorf("error reading %d (frozen %d): %v", num, frozen, err)) + } + if !bytes.Equal(value, getChunk(32, int(num))) { + panic(fmt.Errorf("wrong value at %d", num)) + } + } + } + }() + } + + wg.Wait() +} + +// This test runs ModifyAncients and TruncateHead concurrently with each other. +func TestFreezerConcurrentModifyTruncate(t *testing.T) { + f, _ := newFreezerForTesting(t, freezerTestTableDef) + defer f.Close() + + var item = make([]byte, 256) + + for i := 0; i < 10; i++ { + // First reset and write 100 items. + if _, err := f.TruncateHead(0); err != nil { + t.Fatal("truncate failed:", err) + } + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := uint64(0); i < 100; i++ { + if err := op.AppendRaw("test", i, item); err != nil { + return err + } + } + return nil + }) + if err != nil { + t.Fatal("modify failed:", err) + } + checkAncientCount(t, f, "test", 100) + + // Now append 100 more items and truncate concurrently. + var ( + wg sync.WaitGroup + truncateErr error + modifyErr error + ) + wg.Add(3) + go func() { + _, modifyErr = f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := uint64(100); i < 200; i++ { + if err := op.AppendRaw("test", i, item); err != nil { + return err + } + } + return nil + }) + wg.Done() + }() + go func() { + _, truncateErr = f.TruncateHead(10) + wg.Done() + }() + go func() { + f.AncientSize("test") + wg.Done() + }() + wg.Wait() + + // Now check the outcome. If the truncate operation went through first, the append + // fails, otherwise it succeeds. In either case, the freezer should be positioned + // at 10 after both operations are done. + if truncateErr != nil { + t.Fatal("concurrent truncate failed:", err) + } + if !(errors.Is(modifyErr, nil) || errors.Is(modifyErr, errOutOrderInsertion)) { + t.Fatal("wrong error from concurrent modify:", modifyErr) + } + checkAncientCount(t, f, "test", 10) + } +} + +func TestFreezerReadonlyValidate(t *testing.T) { + tables := map[string]bool{"a": true, "b": true} + dir := t.TempDir() + // Open non-readonly freezer and fill individual tables + // with different amount of data. + f, err := NewFreezer(dir, "", false, 2049, tables) + if err != nil { + t.Fatal("can't open freezer", err) + } + var item = make([]byte, 1024) + aBatch := f.tables["a"].newBatch() + require.NoError(t, aBatch.AppendRaw(0, item)) + require.NoError(t, aBatch.AppendRaw(1, item)) + require.NoError(t, aBatch.AppendRaw(2, item)) + require.NoError(t, aBatch.commit()) + bBatch := f.tables["b"].newBatch() + require.NoError(t, bBatch.AppendRaw(0, item)) + require.NoError(t, bBatch.commit()) + if f.tables["a"].items.Load() != 3 { + t.Fatalf("unexpected number of items in table") + } + if f.tables["b"].items.Load() != 1 { + t.Fatalf("unexpected number of items in table") + } + require.NoError(t, f.Close()) + + // Re-openening as readonly should fail when validating + // table lengths. + _, err = NewFreezer(dir, "", true, 2049, tables) + if err == nil { + t.Fatal("readonly freezer should fail with differing table lengths") + } +} + +func TestFreezerConcurrentReadonly(t *testing.T) { + t.Parallel() + + tables := map[string]bool{"a": true} + dir := t.TempDir() + + f, err := NewFreezer(dir, "", false, 2049, tables) + if err != nil { + t.Fatal("can't open freezer", err) + } + var item = make([]byte, 1024) + batch := f.tables["a"].newBatch() + items := uint64(10) + for i := uint64(0); i < items; i++ { + require.NoError(t, batch.AppendRaw(i, item)) + } + require.NoError(t, batch.commit()) + if loaded := f.tables["a"].items.Load(); loaded != items { + t.Fatalf("unexpected number of items in table, want: %d, have: %d", items, loaded) + } + require.NoError(t, f.Close()) + + var ( + wg sync.WaitGroup + fs = make([]*Freezer, 5) + errs = make([]error, 5) + ) + for i := 0; i < 5; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + f, err := NewFreezer(dir, "", true, 2049, tables) + if err == nil { + fs[i] = f + } else { + errs[i] = err + } + }(i) + } + + wg.Wait() + + for i := range fs { + if err := errs[i]; err != nil { + t.Fatal("failed to open freezer", err) + } + require.NoError(t, fs[i].Close()) + } +} + +func newFreezerForTesting(t *testing.T, tables map[string]bool) (*Freezer, string) { + t.Helper() + + dir := t.TempDir() + // note: using low max table size here to ensure the tests actually + // switch between multiple files. + f, err := NewFreezer(dir, "", false, 2049, tables) + if err != nil { + t.Fatal("can't open freezer", err) + } + return f, dir +} + +// checkAncientCount verifies that the freezer contains n items. +func checkAncientCount(t *testing.T, f *Freezer, kind string, n uint64) { + t.Helper() + + if frozen, _ := f.Ancients(); frozen != n { + t.Fatalf("Ancients() returned %d, want %d", frozen, n) + } + + // Check at index n-1. + if n > 0 { + index := n - 1 + if ok, _ := f.HasAncient(kind, index); !ok { + t.Errorf("HasAncient(%q, %d) returned false unexpectedly", kind, index) + } + if _, err := f.Ancient(kind, index); err != nil { + t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err) + } + } + + // Check at index n. + index := n + if ok, _ := f.HasAncient(kind, index); ok { + t.Errorf("HasAncient(%q, %d) returned true unexpectedly", kind, index) + } + if _, err := f.Ancient(kind, index); err == nil { + t.Errorf("Ancient(%q, %d) didn't return expected error", kind, index) + } else if err != errOutOfBounds { + t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err) + } +} + +func TestRenameWindows(t *testing.T) { + var ( + fname = "file.bin" + fname2 = "file2.bin" + data = []byte{1, 2, 3, 4} + data2 = []byte{2, 3, 4, 5} + data3 = []byte{3, 5, 6, 7} + dataLen = 4 + ) + + // Create 2 temp dirs + dir1 := t.TempDir() + dir2 := t.TempDir() + + // Create file in dir1 and fill with data + f, err := os.Create(filepath.Join(dir1, fname)) + if err != nil { + t.Fatal(err) + } + f2, err := os.Create(path.Join(dir1, fname2)) + if err != nil { + t.Fatal(err) + } + f3, err := os.Create(path.Join(dir2, fname2)) + if err != nil { + t.Fatal(err) + } + if _, err := f.Write(data); err != nil { + t.Fatal(err) + } + if _, err := f2.Write(data2); err != nil { + t.Fatal(err) + } + if _, err := f3.Write(data3); err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + if err := f2.Close(); err != nil { + t.Fatal(err) + } + if err := f3.Close(); err != nil { + t.Fatal(err) + } + if err := os.Rename(f.Name(), path.Join(dir2, fname)); err != nil { + t.Fatal(err) + } + if err := os.Rename(f2.Name(), path.Join(dir2, fname2)); err != nil { + t.Fatal(err) + } + + // Check file contents + f, err = os.Open(path.Join(dir2, fname)) + if err != nil { + t.Fatal(err) + } + defer f.Close() + defer os.Remove(f.Name()) + buf := make([]byte, dataLen) + if _, err := f.Read(buf); err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, data) { + t.Errorf("unexpected file contents. Got %v\n", buf) + } + + f, err = os.Open(path.Join(dir2, fname2)) + if err != nil { + t.Fatal(err) + } + defer f.Close() + defer os.Remove(f.Name()) + if _, err := f.Read(buf); err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, data2) { + t.Errorf("unexpected file contents. Got %v\n", buf) + } +} + +func TestFreezerCloseSync(t *testing.T) { + t.Parallel() + f, _ := newFreezerForTesting(t, map[string]bool{"a": true, "b": true}) + defer f.Close() + + // Now, close and sync. This mimics the behaviour if the node is shut down, + // just as the chain freezer is writing. + // 1: thread-1: chain treezer writes, via freezeRange (holds lock) + // 2: thread-2: Close called, waits for write to finish + // 3: thread-1: finishes writing, releases lock + // 4: thread-2: obtains lock, completes Close() + // 5: thread-1: calls f.Sync() + if err := f.Close(); err != nil { + t.Fatal(err) + } + if err := f.Sync(); err == nil { + t.Fatalf("want error, have nil") + } else if have, want := err.Error(), "[closed closed]"; have != want { + t.Fatalf("want %v, have %v", have, want) + } +} diff --git a/core/rawdb/freezer_utils.go b/core/rawdb/freezer_utils.go new file mode 100644 index 000000000..752e95ba6 --- /dev/null +++ b/core/rawdb/freezer_utils.go @@ -0,0 +1,131 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "io" + "os" + "path/filepath" +) + +// copyFrom copies data from 'srcPath' at offset 'offset' into 'destPath'. +// The 'destPath' is created if it doesn't exist, otherwise it is overwritten. +// Before the copy is executed, there is a callback can be registered to +// manipulate the dest file. +// It is perfectly valid to have destPath == srcPath. +func copyFrom(srcPath, destPath string, offset uint64, before func(f *os.File) error) error { + // Create a temp file in the same dir where we want it to wind up + f, err := os.CreateTemp(filepath.Dir(destPath), "*") + if err != nil { + return err + } + fname := f.Name() + + // Clean up the leftover file + defer func() { + if f != nil { + f.Close() + } + os.Remove(fname) + }() + // Apply the given function if it's not nil before we copy + // the content from the src. + if before != nil { + if err := before(f); err != nil { + return err + } + } + // Open the source file + src, err := os.Open(srcPath) + if err != nil { + return err + } + if _, err = src.Seek(int64(offset), 0); err != nil { + src.Close() + return err + } + // io.Copy uses 32K buffer internally. + _, err = io.Copy(f, src) + if err != nil { + src.Close() + return err + } + // Rename the temporary file to the specified dest name. + // src may be same as dest, so needs to be closed before + // we do the final move. + src.Close() + + if err := f.Close(); err != nil { + return err + } + f = nil + return os.Rename(fname, destPath) +} + +// openFreezerFileForAppend opens a freezer table file and seeks to the end +func openFreezerFileForAppend(filename string) (*os.File, error) { + // Open the file without the O_APPEND flag + // because it has differing behaviour during Truncate operations + // on different OS's + file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + // Seek to end for append + if _, err = file.Seek(0, io.SeekEnd); err != nil { + return nil, err + } + return file, nil +} + +// openFreezerFileForReadOnly opens a freezer table file for read only access +func openFreezerFileForReadOnly(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDONLY, 0644) +} + +// openFreezerFileTruncated opens a freezer table making sure it is truncated +func openFreezerFileTruncated(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) +} + +// truncateFreezerFile resizes a freezer table file and seeks to the end +func truncateFreezerFile(file *os.File, size int64) error { + if err := file.Truncate(size); err != nil { + return err + } + // Seek to end for append + if _, err := file.Seek(0, io.SeekEnd); err != nil { + return err + } + return nil +} + +// grow prepares the slice space for new item, and doubles the slice capacity +// if space is not enough. +func grow(buf []byte, n int) []byte { + if cap(buf)-len(buf) < n { + newcap := 2 * cap(buf) + if newcap-len(buf) < n { + newcap = len(buf) + n + } + nbuf := make([]byte, len(buf), newcap) + copy(nbuf, buf) + buf = nbuf + } + buf = buf[:len(buf)+n] + return buf +} diff --git a/core/rawdb/freezer_utils_test.go b/core/rawdb/freezer_utils_test.go new file mode 100644 index 000000000..829cbfb4f --- /dev/null +++ b/core/rawdb/freezer_utils_test.go @@ -0,0 +1,75 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "os" + "testing" +) + +func TestCopyFrom(t *testing.T) { + var ( + content = []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8} + prefix = []byte{0x9, 0xa, 0xb, 0xc, 0xd, 0xf} + ) + var cases = []struct { + src, dest string + offset uint64 + writePrefix bool + }{ + {"foo", "bar", 0, false}, + {"foo", "bar", 1, false}, + {"foo", "bar", 8, false}, + {"foo", "foo", 0, false}, + {"foo", "foo", 1, false}, + {"foo", "foo", 8, false}, + {"foo", "bar", 0, true}, + {"foo", "bar", 1, true}, + {"foo", "bar", 8, true}, + } + for _, c := range cases { + os.WriteFile(c.src, content, 0600) + + if err := copyFrom(c.src, c.dest, c.offset, func(f *os.File) error { + if !c.writePrefix { + return nil + } + f.Write(prefix) + return nil + }); err != nil { + os.Remove(c.src) + t.Fatalf("Failed to copy %v", err) + } + + blob, err := os.ReadFile(c.dest) + if err != nil { + os.Remove(c.src) + os.Remove(c.dest) + t.Fatalf("Failed to read %v", err) + } + want := content[c.offset:] + if c.writePrefix { + want = append(prefix, want...) + } + if !bytes.Equal(blob, want) { + t.Fatal("Unexpected value") + } + os.Remove(c.src) + os.Remove(c.dest) + } +}