Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: add support for checkpointing with remote files #3415

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
// space overhead for a checkpoint if hard links are disabled. Also beware that
// even if hard links are used, the space overhead for the checkpoint will
// increase over time as the DB performs compactions.
//
// Note that shared files in a checkpoint could get deleted if the DB is
// restarted after a checkpoint operation, as the reference for the checkpoint
// is only maintained in memory. This is okay as long as users of Checkpoint
// crash shortly afterwards with a "poison file" preventing further restarts.
func (d *DB) Checkpoint(
destDir string, opts ...CheckpointOption,
) (
Expand Down Expand Up @@ -268,6 +273,7 @@ func (d *DB) Checkpoint(
}

var excludedFiles map[deletedFileEntry]*fileMetadata
var remoteFiles []base.DiskFileNum
// Set of FileBacking.DiskFileNum which will be required by virtual sstables
// in the checkpoint.
requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
Expand All @@ -293,6 +299,21 @@ func (d *DB) Checkpoint(
}
requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
}
meta, err := d.objProvider.Lookup(fileTypeTable, fileBacking.DiskFileNum)
if err != nil {
ckErr = err
return ckErr
}
if meta.IsRemote() {
// We don't copy remote files. This is desirable as checkpointing is
// supposed to be a fast operation, and references to remote files can
// always be resolved by any checkpoint readers by reading the object
// catalog. We don't add this file to excludedFiles either, as that'd
// cause it to be deleted in the second manifest entry which is also
// inaccurate.
remoteFiles = append(remoteFiles, meta.DiskFileNum)
continue
}

srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.DiskFileNum)
destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
Expand All @@ -319,6 +340,12 @@ func (d *DB) Checkpoint(
if ckErr != nil {
return ckErr
}
if len(remoteFiles) > 0 {
ckErr = d.objProvider.CheckpointState(fs, destDir, fileTypeTable, remoteFiles)
if ckErr != nil {
return ckErr
}
}

// Copy the WAL files. We copy rather than link because WAL file recycling
// will cause the WAL files to be reused which would invalidate the
Expand Down
31 changes: 29 additions & 2 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ import (
"context"
"fmt"
"math/rand"
"runtime"
"sort"
"strings"
"sync"
"testing"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)

func TestCheckpoint(t *testing.T) {
func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {
dbs := make(map[string]*DB)
defer func() {
for _, db := range dbs {
Expand All @@ -32,17 +34,24 @@ func TestCheckpoint(t *testing.T) {

mem := vfs.NewMem()
var memLog base.InMemLogger
remoteMem := remote.NewInMem()
opts := &Options{
FS: vfs.WithLogging(mem, memLog.Infof),
FormatMajorVersion: internalFormatNewest,
L0CompactionThreshold: 10,
DisableAutomaticCompactions: true,
Logger: testLogger{t},
}
opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
"": remoteMem,
})
if createOnShared {
opts.Experimental.CreateOnShared = remote.CreateOnSharedAll
}
opts.DisableTableStats = true
opts.private.testingAlwaysWaitForCleanup = true

datadriven.RunTest(t, "testdata/checkpoint", func(t *testing.T, td *datadriven.TestData) string {
datadriven.RunTest(t, ddFile, func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
case "batch":
if len(td.CmdArgs) != 1 {
Expand Down Expand Up @@ -192,6 +201,12 @@ func TestCheckpoint(t *testing.T) {
return err.Error()
}
dbs[dir] = d
if len(dbs) == 1 && createOnShared {
// This is the first db. Set a creator ID.
if err := d.SetCreatorID(1); err != nil {
return err.Error()
}
}
return memLog.String()

case "scan":
Expand All @@ -216,6 +231,18 @@ func TestCheckpoint(t *testing.T) {
})
}

func TestCheckpoint(t *testing.T) {
t.Run("shared=false", func(t *testing.T) {
testCheckpointImpl(t, "testdata/checkpoint", false /* createOnShared */)
})
t.Run("shared=true", func(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skipf("skipped on windows")
}
testCheckpointImpl(t, "testdata/checkpoint_shared", true /* createOnShared */)
})
}

func TestCheckpointCompaction(t *testing.T) {
fs := vfs.NewMem()
d, err := Open("", &Options{FS: fs, Logger: testLogger{t: t}})
Expand Down
5 changes: 5 additions & 0 deletions objstorage/objstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ type Provider interface {
// directory does not exist.
IsNotExistError(err error) bool

// CheckpointState saves any saved state on local disk to the specified
// directory on the specified VFS. A new Pebble instance instantiated at that
// path should be able to resolve references to the specified files.
CheckpointState(fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum) error

// Metrics returns metrics about objstorage. Currently, it only returns metrics
// about the shared cache.
Metrics() sharedcache.Metrics
Expand Down
24 changes: 24 additions & 0 deletions objstorage/objstorageprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,30 @@ func (p *provider) Metrics() sharedcache.Metrics {
return sharedcache.Metrics{}
}

// CheckpointState is part of the objstorage.Provider interface.
func (p *provider) CheckpointState(
fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum,
) error {
p.mu.Lock()
defer p.mu.Unlock()
for i := range fileNums {
if _, ok := p.mu.knownObjects[fileNums[i]]; !ok {
return errors.Wrapf(
os.ErrNotExist,
"file %s (type %d) unknown to the objstorage provider",
fileNums[i], errors.Safe(fileType),
)
}
// Prevent this object from deletion, at least for the life of this instance.
p.mu.protectedObjects[fileNums[i]] = p.mu.protectedObjects[fileNums[i]] + 1
}

if p.remote.catalog != nil {
return p.remote.catalog.Checkpoint(fs, dir)
}
return nil
}

func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
23 changes: 23 additions & 0 deletions objstorage/objstorageprovider/remoteobjcat/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"cmp"
"fmt"
"io"
"path/filepath"
"slices"
"sync"

Expand Down Expand Up @@ -378,6 +379,28 @@ func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
return nil
}

// Checkpoint copies catalog state to a file in the specified directory
func (c *Catalog) Checkpoint(fs vfs.FS, dir string) error {
c.mu.Lock()
defer c.mu.Unlock()

// NB: Every write to recWriter is flushed. We don't need to worry about
// this new file descriptor not getting all the saved catalog entries.
existingCatalogFilepath := filepath.Join(c.dirname, c.mu.catalogFilename)
destPath := filepath.Join(dir, c.mu.catalogFilename)
if err := vfs.CopyAcrossFS(c.fs, existingCatalogFilepath, fs, destPath); err != nil {
return err
}
catalogMarker, _, err := atomicfs.LocateMarker(fs, dir, catalogMarkerName)
if err != nil {
return err
}
if err := catalogMarker.Move(c.mu.catalogFilename); err != nil {
return err
}
return catalogMarker.Close()
}

func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error {
w, err := recWriter.Next()
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions testdata/checkpoint
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ close: db/marker.manifest.000001.MANIFEST-000001
sync: db
open-dir: db
open-dir: db
open-dir: db
sync: db/MANIFEST-000001
create: db/000002.log
sync: db
Expand Down Expand Up @@ -281,6 +282,7 @@ open: checkpoints/checkpoint1/MANIFEST-000001
close: checkpoints/checkpoint1/MANIFEST-000001
open-dir: checkpoints/checkpoint1
open-dir: checkpoints/checkpoint1
open-dir: checkpoints/checkpoint1
open: checkpoints/checkpoint1/OPTIONS-000003
close: checkpoints/checkpoint1/OPTIONS-000003
open: checkpoints/checkpoint1/000006.log
Expand Down Expand Up @@ -347,6 +349,7 @@ open: checkpoints/checkpoint2/MANIFEST-000001
close: checkpoints/checkpoint2/MANIFEST-000001
open-dir: checkpoints/checkpoint2
open-dir: checkpoints/checkpoint2
open-dir: checkpoints/checkpoint2
open: checkpoints/checkpoint2/OPTIONS-000003
close: checkpoints/checkpoint2/OPTIONS-000003
open: checkpoints/checkpoint2/000006.log
Expand Down Expand Up @@ -388,6 +391,7 @@ open: checkpoints/checkpoint3/MANIFEST-000001
close: checkpoints/checkpoint3/MANIFEST-000001
open-dir: checkpoints/checkpoint3
open-dir: checkpoints/checkpoint3
open-dir: checkpoints/checkpoint3
open: checkpoints/checkpoint3/OPTIONS-000003
close: checkpoints/checkpoint3/OPTIONS-000003
open: checkpoints/checkpoint3/000006.log
Expand Down Expand Up @@ -525,6 +529,7 @@ open: checkpoints/checkpoint4/MANIFEST-000001
close: checkpoints/checkpoint4/MANIFEST-000001
open-dir: checkpoints/checkpoint4
open-dir: checkpoints/checkpoint4
open-dir: checkpoints/checkpoint4
open: checkpoints/checkpoint4/OPTIONS-000003
close: checkpoints/checkpoint4/OPTIONS-000003
open: checkpoints/checkpoint4/000008.log
Expand Down Expand Up @@ -635,6 +640,7 @@ open: checkpoints/checkpoint5/MANIFEST-000001
close: checkpoints/checkpoint5/MANIFEST-000001
open-dir: checkpoints/checkpoint5
open-dir: checkpoints/checkpoint5
open-dir: checkpoints/checkpoint5
open: checkpoints/checkpoint5/OPTIONS-000003
close: checkpoints/checkpoint5/OPTIONS-000003
open: checkpoints/checkpoint5/000008.log
Expand Down Expand Up @@ -731,6 +737,7 @@ open: checkpoints/checkpoint6/MANIFEST-000001
close: checkpoints/checkpoint6/MANIFEST-000001
open-dir: checkpoints/checkpoint6
open-dir: checkpoints/checkpoint6
open-dir: checkpoints/checkpoint6
open: checkpoints/checkpoint6/OPTIONS-000003
close: checkpoints/checkpoint6/OPTIONS-000003
open: checkpoints/checkpoint6/000008.log
Expand Down
Loading
Loading