Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/rawdb: drop MigrateTable #30331

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,6 @@ func (db *nofreezedb) ReadAncients(fn func(reader ethdb.AncientReaderOp) error)
return fn(db)
}

// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (db *nofreezedb) MigrateTable(kind string, convert convertLegacyFn) error {
return errNotSupported
}

// AncientDatadir returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) AncientDatadir() (string, error) {
return "", errNotSupported
Expand Down
114 changes: 0 additions & 114 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -389,115 +387,3 @@ func (f *Freezer) repair() error {
f.tail.Store(tail)
return nil
}

// convertLegacyFn takes a raw freezer entry in an older format and
// returns it in the new format.
type convertLegacyFn = func([]byte) ([]byte, error)

// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
if f.readonly {
return errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()

table, ok := f.tables[kind]
if !ok {
return errUnknownTable
}
// forEach iterates every entry in the table serially and in order, calling `fn`
// with the item as argument. If `fn` returns an error the iteration stops
// and that error will be returned.
forEach := func(t *freezerTable, offset uint64, fn func(uint64, []byte) error) error {
var (
items = t.items.Load()
batchSize = uint64(1024)
maxBytes = uint64(1024 * 1024)
)
for i := offset; i < items; {
if i+batchSize > items {
batchSize = items - i
}
data, err := t.RetrieveItems(i, batchSize, maxBytes)
if err != nil {
return err
}
for j, item := range data {
if err := fn(i+uint64(j), item); err != nil {
return err
}
}
i += uint64(len(data))
}
return nil
}
// TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration
// process assumes no deletion at tail and needs to be modified to account for that.
if table.itemOffset.Load() > 0 || table.itemHidden.Load() > 0 {
return errors.New("migration not supported for tail-deleted freezers")
}
ancientsPath := filepath.Dir(table.index.Name())
// Set up new dir for the migrated table, the content of which
// we'll at the end move over to the ancients dir.
migrationPath := filepath.Join(ancientsPath, "migration")
newTable, err := newFreezerTable(migrationPath, kind, table.noCompression, false)
if err != nil {
return err
}
var (
batch = newTable.newBatch()
out []byte
start = time.Now()
logged = time.Now()
offset = newTable.items.Load()
)
if offset > 0 {
log.Info("found previous migration attempt", "migrated", offset)
}
// Iterate through entries and transform them
if err := forEach(table, offset, func(i uint64, blob []byte) error {
if i%10000 == 0 && time.Since(logged) > 16*time.Second {
log.Info("Processing legacy elements", "count", i, "elapsed", common.PrettyDuration(time.Since(start)))
logged = time.Now()
}
out, err = convert(blob)
if err != nil {
return err
}
if err := batch.AppendRaw(i, out); err != nil {
return err
}
return nil
}); err != nil {
return err
}
if err := batch.commit(); err != nil {
return err
}
log.Info("Replacing old table files with migrated ones", "elapsed", common.PrettyDuration(time.Since(start)))
// Release and delete old table files. Note this won't
// delete the index file.
table.releaseFilesAfter(0, true)

if err := newTable.Close(); err != nil {
return err
}
files, err := os.ReadDir(migrationPath)
if err != nil {
return err
}
// Move migrated files to ancients dir.
for _, f := range files {
// This will replace the old index file as a side-effect.
if err := os.Rename(filepath.Join(migrationPath, f.Name()), filepath.Join(ancientsPath, f.Name())); err != nil {
return err
}
}
// Delete by now empty dir.
if err := os.Remove(migrationPath); err != nil {
return err
}
return nil
}
7 changes: 0 additions & 7 deletions core/rawdb/freezer_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,6 @@ func (f *MemoryFreezer) Sync() error {
return nil
}

// MigrateTable processes and migrates entries of a given table to a new format.
// The second argument is a function that takes a raw entry and returns it
// in the newest format.
func (f *MemoryFreezer) MigrateTable(string, func([]byte) ([]byte, error)) error {
return errors.New("not implemented")
}

// Close releases all the sources held by the memory freezer. It will panic if
// any following invocation is made to a closed freezer.
func (f *MemoryFreezer) Close() error {
Expand Down
9 changes: 0 additions & 9 deletions core/rawdb/freezer_resettable.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,6 @@ func (f *resettableFreezer) Sync() error {
return f.freezer.Sync()
}

// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (f *resettableFreezer) MigrateTable(kind string, convert convertLegacyFn) error {
f.lock.RLock()
defer f.lock.RUnlock()

return f.freezer.MigrateTable(kind, convert)
}

// cleanup removes the directory located in the specified path
// has the name with deletion marker suffix.
func cleanup(path string) error {
Expand Down
83 changes: 0 additions & 83 deletions core/rawdb/freezer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"fmt"
"math/big"
"math/rand"
"os"
"path/filepath"
"sync"
"testing"

Expand Down Expand Up @@ -379,87 +377,6 @@ func checkAncientCount(t *testing.T, f *Freezer, kind string, n uint64) {
}
}

func TestRenameWindows(t *testing.T) {
var (
fname = "file.bin"
fname2 = "file2.bin"
data = []byte{1, 2, 3, 4}
data2 = []byte{2, 3, 4, 5}
data3 = []byte{3, 5, 6, 7}
dataLen = 4
)

// Create 2 temp dirs
dir1 := t.TempDir()
dir2 := t.TempDir()

// Create file in dir1 and fill with data
f, err := os.Create(filepath.Join(dir1, fname))
if err != nil {
t.Fatal(err)
}
f2, err := os.Create(filepath.Join(dir1, fname2))
if err != nil {
t.Fatal(err)
}
f3, err := os.Create(filepath.Join(dir2, fname2))
if err != nil {
t.Fatal(err)
}
if _, err := f.Write(data); err != nil {
t.Fatal(err)
}
if _, err := f2.Write(data2); err != nil {
t.Fatal(err)
}
if _, err := f3.Write(data3); err != nil {
t.Fatal(err)
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
if err := f2.Close(); err != nil {
t.Fatal(err)
}
if err := f3.Close(); err != nil {
t.Fatal(err)
}
if err := os.Rename(f.Name(), filepath.Join(dir2, fname)); err != nil {
t.Fatal(err)
}
if err := os.Rename(f2.Name(), filepath.Join(dir2, fname2)); err != nil {
t.Fatal(err)
}

// Check file contents
f, err = os.Open(filepath.Join(dir2, fname))
if err != nil {
t.Fatal(err)
}
defer f.Close()
defer os.Remove(f.Name())
buf := make([]byte, dataLen)
if _, err := f.Read(buf); err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, data) {
t.Errorf("unexpected file contents. Got %v\n", buf)
}

f, err = os.Open(filepath.Join(dir2, fname2))
if err != nil {
t.Fatal(err)
}
defer f.Close()
defer os.Remove(f.Name())
if _, err := f.Read(buf); err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, data2) {
t.Errorf("unexpected file contents. Got %v\n", buf)
}
}

func TestFreezerCloseSync(t *testing.T) {
t.Parallel()
f, _ := newFreezerForTesting(t, map[string]bool{"a": true, "b": true})
Expand Down
6 changes: 0 additions & 6 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,6 @@ func (t *table) Sync() error {
return t.db.Sync()
}

// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (t *table) MigrateTable(kind string, convert convertLegacyFn) error {
return t.db.MigrateTable(kind, convert)
}

// AncientDatadir returns the ancient datadir of the underlying database.
func (t *table) AncientDatadir() (string, error) {
return t.db.AncientDatadir()
Expand Down
5 changes: 0 additions & 5 deletions ethdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ type AncientWriter interface {

// Sync flushes all in-memory ancient store data to disk.
Sync() error

// MigrateTable processes and migrates entries of a given table to a new format.
// The second argument is a function that takes a raw entry and returns it
// in the newest format.
MigrateTable(string, func([]byte) ([]byte, error)) error
}

// AncientWriteOp is given to the function argument of ModifyAncients.
Expand Down
4 changes: 0 additions & 4 deletions ethdb/remotedb/remotedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ func (db *Database) Sync() error {
return nil
}

func (db *Database) MigrateTable(s string, f func([]byte) ([]byte, error)) error {
panic("not supported")
}

func (db *Database) NewBatch() ethdb.Batch {
panic("not supported")
}
Expand Down