Skip to content

Commit

Permalink
db: add support for checkpointing with remote files
Browse files Browse the repository at this point in the history
Previously, if we couldn't read a remote file's DiskFileNum locally,
we'd error out on the Checkpoint() function even though the store
was valid and the file just happened to be remote (shared or external).
This change adds support for saving the catalog's state to the checkpoint
directory so references to remote files from the checkpoint can still
be resolved. To prevent unreffing of shared files, we need to add a dangling
reference to them that doesn't get deleted.

Fixes cockroachdb#3362.
  • Loading branch information
itsbilal committed Mar 15, 2024
1 parent 65193e0 commit e8b1c6f
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 2 deletions.
22 changes: 22 additions & 0 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (d *DB) Checkpoint(
}

var excludedFiles map[deletedFileEntry]*fileMetadata
var remoteFiles []base.DiskFileNum
// Set of FileBacking.DiskFileNum which will be required by virtual sstables
// in the checkpoint.
requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
Expand All @@ -293,6 +294,21 @@ func (d *DB) Checkpoint(
}
requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
}
meta, err := d.objProvider.Lookup(fileTypeTable, fileBacking.DiskFileNum)
if err != nil {
ckErr = err
return ckErr
}
if meta.IsRemote() {
// We don't copy remote files. This is desirable as checkpointing is
// supposed to be a fast operation, and references to remote files can
// always be resolved by any checkpoint readers by reading the object
// catalog. We don't add this file to excludedFiles either, as that'd
// cause it to be deleted in the second manifest entry which is also
// inaccurate.
remoteFiles = append(remoteFiles, meta.DiskFileNum)
continue
}

srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.DiskFileNum)
destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
Expand All @@ -319,6 +335,12 @@ func (d *DB) Checkpoint(
if ckErr != nil {
return ckErr
}
if len(remoteFiles) > 0 {
ckErr = d.objProvider.CheckpointState(fs, destDir, fileTypeTable, remoteFiles)
if ckErr != nil {
return ckErr
}
}

// Copy the WAL files. We copy rather than link because WAL file recycling
// will cause the WAL files to be reused which would invalidate the
Expand Down
27 changes: 25 additions & 2 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)

func TestCheckpoint(t *testing.T) {
func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {
dbs := make(map[string]*DB)
defer func() {
for _, db := range dbs {
Expand All @@ -32,17 +33,24 @@ func TestCheckpoint(t *testing.T) {

mem := vfs.NewMem()
var memLog base.InMemLogger
remoteMem := remote.NewInMem()
opts := &Options{
FS: vfs.WithLogging(mem, memLog.Infof),
FormatMajorVersion: internalFormatNewest,
L0CompactionThreshold: 10,
DisableAutomaticCompactions: true,
Logger: testLogger{t},
}
opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
"": remoteMem,
})
if createOnShared {
opts.Experimental.CreateOnShared = remote.CreateOnSharedAll
}
opts.DisableTableStats = true
opts.private.testingAlwaysWaitForCleanup = true

datadriven.RunTest(t, "testdata/checkpoint", func(t *testing.T, td *datadriven.TestData) string {
datadriven.RunTest(t, ddFile, func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
case "batch":
if len(td.CmdArgs) != 1 {
Expand Down Expand Up @@ -192,6 +200,12 @@ func TestCheckpoint(t *testing.T) {
return err.Error()
}
dbs[dir] = d
if len(dbs) == 1 && createOnShared {
// This is the first db. Set a creator ID.
if err := d.SetCreatorID(1); err != nil {
return err.Error()
}
}
return memLog.String()

case "scan":
Expand All @@ -216,6 +230,15 @@ func TestCheckpoint(t *testing.T) {
})
}

func TestCheckpoint(t *testing.T) {
t.Run("shared=false", func(t *testing.T) {
testCheckpointImpl(t, "testdata/checkpoint", false /* createOnShared */)
})
t.Run("shared=true", func(t *testing.T) {
testCheckpointImpl(t, "testdata/checkpoint_shared", true /* createOnShared */)
})
}

func TestCheckpointCompaction(t *testing.T) {
fs := vfs.NewMem()
d, err := Open("", &Options{FS: fs, Logger: testLogger{t: t}})
Expand Down
5 changes: 5 additions & 0 deletions objstorage/objstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ type Provider interface {
// directory does not exist.
IsNotExistError(err error) bool

// CheckpointState saves any saved state on local disk to the specified
// directory on the specified VFS. A new Pebble instance instantiated at that
// path should be able to resolve references to the specified files.
CheckpointState(fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum) error

// Metrics returns metrics about objstorage. Currently, it only returns metrics
// about the shared cache.
Metrics() sharedcache.Metrics
Expand Down
24 changes: 24 additions & 0 deletions objstorage/objstorageprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,30 @@ func (p *provider) Metrics() sharedcache.Metrics {
return sharedcache.Metrics{}
}

// CheckpointState is part of the objstorage.Provider interface.
func (p *provider) CheckpointState(
fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum,
) error {
p.mu.Lock()
defer p.mu.Unlock()
for i := range fileNums {
if _, ok := p.mu.knownObjects[fileNums[i]]; !ok {
return errors.Wrapf(
os.ErrNotExist,
"file %s (type %d) unknown to the objstorage provider",
fileNums[i], errors.Safe(fileType),
)
}
// Prevent this object from deletion, at least for the life of this instance.
p.mu.protectedObjects[fileNums[i]] = p.mu.protectedObjects[fileNums[i]] + 1
}

if p.remote.catalog != nil {
return p.remote.catalog.Checkpoint(fs, dir)
}
return nil
}

func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
23 changes: 23 additions & 0 deletions objstorage/objstorageprovider/remoteobjcat/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"cmp"
"fmt"
"io"
"path/filepath"
"slices"
"sync"

Expand Down Expand Up @@ -373,6 +374,28 @@ func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
return nil
}

// Checkpoint copies catalog state to a file in the specified directory
func (c *Catalog) Checkpoint(fs vfs.FS, dir string) error {
c.mu.Lock()
defer c.mu.Unlock()

// NB: Every write to recWriter is flushed. We don't need to worry about
// this new file descriptor not getting all the saved catalog entries.
existingCatalogFilepath := filepath.Join(c.dirname, c.mu.catalogFilename)
destPath := filepath.Join(dir, c.mu.catalogFilename)
if err := vfs.CopyAcrossFS(c.fs, existingCatalogFilepath, fs, destPath); err != nil {
return err
}
catalogMarker, _, err := atomicfs.LocateMarker(fs, dir, catalogMarkerName)
if err != nil {
return err
}
if err := catalogMarker.Move(c.mu.catalogFilename); err != nil {
return err
}
return catalogMarker.Close()
}

func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error {
w, err := recWriter.Next()
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions testdata/checkpoint
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ close: db/marker.manifest.000001.MANIFEST-000001
sync: db
open-dir: db
open-dir: db
open-dir: db
sync: db/MANIFEST-000001
create: db/000002.log
sync: db
Expand Down Expand Up @@ -281,6 +282,7 @@ open: checkpoints/checkpoint1/MANIFEST-000001
close: checkpoints/checkpoint1/MANIFEST-000001
open-dir: checkpoints/checkpoint1
open-dir: checkpoints/checkpoint1
open-dir: checkpoints/checkpoint1
open: checkpoints/checkpoint1/OPTIONS-000003
close: checkpoints/checkpoint1/OPTIONS-000003
open: checkpoints/checkpoint1/000006.log
Expand Down Expand Up @@ -347,6 +349,7 @@ open: checkpoints/checkpoint2/MANIFEST-000001
close: checkpoints/checkpoint2/MANIFEST-000001
open-dir: checkpoints/checkpoint2
open-dir: checkpoints/checkpoint2
open-dir: checkpoints/checkpoint2
open: checkpoints/checkpoint2/OPTIONS-000003
close: checkpoints/checkpoint2/OPTIONS-000003
open: checkpoints/checkpoint2/000006.log
Expand Down Expand Up @@ -388,6 +391,7 @@ open: checkpoints/checkpoint3/MANIFEST-000001
close: checkpoints/checkpoint3/MANIFEST-000001
open-dir: checkpoints/checkpoint3
open-dir: checkpoints/checkpoint3
open-dir: checkpoints/checkpoint3
open: checkpoints/checkpoint3/OPTIONS-000003
close: checkpoints/checkpoint3/OPTIONS-000003
open: checkpoints/checkpoint3/000006.log
Expand Down Expand Up @@ -525,6 +529,7 @@ open: checkpoints/checkpoint4/MANIFEST-000001
close: checkpoints/checkpoint4/MANIFEST-000001
open-dir: checkpoints/checkpoint4
open-dir: checkpoints/checkpoint4
open-dir: checkpoints/checkpoint4
open: checkpoints/checkpoint4/OPTIONS-000003
close: checkpoints/checkpoint4/OPTIONS-000003
open: checkpoints/checkpoint4/000008.log
Expand Down Expand Up @@ -635,6 +640,7 @@ open: checkpoints/checkpoint5/MANIFEST-000001
close: checkpoints/checkpoint5/MANIFEST-000001
open-dir: checkpoints/checkpoint5
open-dir: checkpoints/checkpoint5
open-dir: checkpoints/checkpoint5
open: checkpoints/checkpoint5/OPTIONS-000003
close: checkpoints/checkpoint5/OPTIONS-000003
open: checkpoints/checkpoint5/000008.log
Expand Down Expand Up @@ -731,6 +737,7 @@ open: checkpoints/checkpoint6/MANIFEST-000001
close: checkpoints/checkpoint6/MANIFEST-000001
open-dir: checkpoints/checkpoint6
open-dir: checkpoints/checkpoint6
open-dir: checkpoints/checkpoint6
open: checkpoints/checkpoint6/OPTIONS-000003
close: checkpoints/checkpoint6/OPTIONS-000003
open: checkpoints/checkpoint6/000008.log
Expand Down
Loading

0 comments on commit e8b1c6f

Please sign in to comment.