Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.2: logstore: sync sideloaded storage directories #115709

Merged
merged 6 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/logstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_pebble//record",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@io_etcd_go_raft_v3//:raft",
"@io_etcd_go_raft_v3//raftpb",
Expand Down Expand Up @@ -71,6 +72,8 @@ go_test(
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@io_etcd_go_raft_v3//raftpb",
"@org_golang_x_time//rate",
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/logstore/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type SideloadStorage interface {
// Writes the given contents to the file specified by the given index and
// term. Overwrites the file if it already exists.
Put(_ context.Context, index kvpb.RaftIndex, term kvpb.RaftTerm, contents []byte) error
// Sync syncs the underlying filesystem metadata so that all the preceding
// mutations, such as Put and TruncateTo, are durable.
Sync() error
// Load the file at the given index and term. Return errSideloadedFileNotFound when no
// such file is present.
Get(_ context.Context, index kvpb.RaftIndex, term kvpb.RaftTerm) ([]byte, error)
Expand Down Expand Up @@ -140,8 +143,12 @@ func MaybeSideloadEntries(
sideloadedEntriesSize += int64(len(dataToSideload))
}

if output == nil {
// We never saw a sideloaded command.
if output != nil { // there is at least one sideloaded command
// Sync the sideloaded storage directory so that the commands are durable.
if err := sideloaded.Sync(); err != nil {
return nil, 0, 0, 0, err
}
} else { // we never saw a sideloaded command
output = input
}

Expand Down
106 changes: 88 additions & 18 deletions pkg/kv/kvserver/logstore/sideload_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble/vfs"
"golang.org/x/time/rate"
)

Expand All @@ -37,11 +38,10 @@ var _ SideloadStorage = &DiskSideloadStorage{}
//
// TODO(pavelkalinnikov): remove the interface, this type is the only impl.
type DiskSideloadStorage struct {
st *cluster.Settings
limiter *rate.Limiter
dir string
dirCreated bool
eng storage.Engine
st *cluster.Settings
limiter *rate.Limiter
dir string
eng storage.Engine
}

func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
Expand All @@ -50,7 +50,7 @@ func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
// per directory, respectively. Newer FS typically have no such limitation,
// but still.
//
// For example, r1828 will end up in baseDir/r1XXX/r1828.
// For example, r1828 will end up in baseDir/sideloading/r1XXX/r1828.
return filepath.Join(
baseDir,
"sideloading",
Expand All @@ -76,12 +76,6 @@ func NewDiskSideloadStorage(
}
}

func (ss *DiskSideloadStorage) createDir() error {
err := ss.eng.MkdirAll(ss.dir, os.ModePerm)
ss.dirCreated = ss.dirCreated || err == nil
return err
}

// Dir implements SideloadStorage.
func (ss *DiskSideloadStorage) Dir() string {
return ss.dir
Expand All @@ -102,15 +96,37 @@ func (ss *DiskSideloadStorage) Put(
} else if !oserror.IsNotExist(err) {
return err
}
// createDir() ensures ss.dir exists but will not create any subdirectories
// within ss.dir because filename() does not make subdirectories in ss.dir.
if err := ss.createDir(); err != nil {
// Ensure that ss.dir exists. The filename() is placed directly in ss.dir,
// so the next loop iteration should succeed.
if err := mkdirAllAndSyncParents(ss.eng, ss.dir, os.ModePerm); err != nil {
return err
}
continue
}
}

// Sync implements SideloadStorage.
func (ss *DiskSideloadStorage) Sync() error {
dir, err := ss.eng.OpenDir(ss.dir)
// The directory can be missing because we did not Put() any entry to it yet,
// or it has been removed by TruncateTo() or Clear().
//
// TODO(pavelkalinnikov): if ss.dir existed and has been removed, we should
// sync the parent of ss.dir, to persist the removal. Otherwise it may come
// back after a restart. Alternatively, and more likely, we should cleanup
// leftovers upon restart - we have other TODOs for that.
if oserror.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
if err := dir.Sync(); err != nil {
_ = dir.Close()
return err
}
return dir.Close()
}

// Get implements SideloadStorage.
func (ss *DiskSideloadStorage) Get(
ctx context.Context, index kvpb.RaftIndex, term kvpb.RaftTerm,
Expand Down Expand Up @@ -170,9 +186,7 @@ func (ss *DiskSideloadStorage) purgeFile(ctx context.Context, filename string) (

// Clear implements SideloadStorage.
func (ss *DiskSideloadStorage) Clear(_ context.Context) error {
err := ss.eng.RemoveAll(ss.dir)
ss.dirCreated = ss.dirCreated && err != nil
return err
return ss.eng.RemoveAll(ss.dir)
}

// TruncateTo implements SideloadStorage.
Expand All @@ -198,6 +212,7 @@ func (ss *DiskSideloadStorage) possiblyTruncateTo(
return nil
}
if index < from {
// TODO(pavelkalinnikov): these files may never be removed. Clean them up.
return nil
}
// index is in [from, to)
Expand All @@ -222,6 +237,8 @@ func (ss *DiskSideloadStorage) possiblyTruncateTo(
// Not worth trying to figure out which one, just try to delete.
err := ss.eng.Remove(ss.dir)
if err != nil && !oserror.IsNotExist(err) {
// TODO(pavelkalinnikov): this is possible because deletedAll can be left
// true despite existence of files with index < from which are skipped.
log.Infof(ctx, "unable to remove sideloaded dir %s: %v", ss.dir, err)
err = nil // handled
}
Expand Down Expand Up @@ -285,3 +302,56 @@ func (ss *DiskSideloadStorage) String() string {
fmt.Fprintf(&buf, "(%d files)\n", count)
return buf.String()
}

// mkdirAllAndSyncParents creates the given directory and all its missing
// parents if any. For every newly created directly, it syncs the corresponding
// parent directory. The directories are created using the provided permissions
// mask, with the same semantics as in os.MkdirAll.
//
// For example, if path is "/x/y/z", and "/x" previously existed, then this func
// creates "/x/y" and "/x/y/z", and syncs directories "/x" and "/x/y".
//
// TODO(pavelkalinnikov): this does not work well with paths containing . and ..
// elements inside the data-dir directory. We don't construct the path this way
// though, right now any non-canonical part of the path would be only in the
// <data-dir> path.
//
// TODO(pavelkalinnikov): have a type-safe canonical path type which can be
// iterated without thinking about . and .. placeholders.
func mkdirAllAndSyncParents(fs vfs.FS, path string, perm os.FileMode) error {
// Find the lowest existing directory in the hierarchy.
var exists string
for dir, parent := path, ""; ; dir = parent {
if _, err := fs.Stat(dir); err == nil {
exists = dir
break
} else if !oserror.IsNotExist(err) {
return errors.Wrapf(err, "could not get dir info: %s", dir)
}
parent = fs.PathDir(dir)
// NB: not checking against the separator, to be platform-agnostic.
if dir == "." || parent == dir { // reached the topmost dir or the root
return errors.Newf("topmost dir does not exist: %s", dir)
}
}

// Create the destination directory and any of its missing parents.
if err := fs.MkdirAll(path, perm); err != nil {
return errors.Wrapf(err, "could not create all directories: %s", path)
}

// Sync parent directories up to the lowest existing ancestor, included.
for dir, parent := path, ""; dir != exists; dir = parent {
parent = fs.PathDir(dir)
if handle, err := fs.OpenDir(parent); err != nil {
return errors.Wrapf(err, "could not open parent dir: %s", parent)
} else if err := handle.Sync(); err != nil {
_ = handle.Close()
return errors.Wrapf(err, "could not sync parent dir: %s", parent)
} else if err := handle.Close(); err != nil {
return errors.Wrapf(err, "could not close parent dir: %s", parent)
}
}

return nil
}
Loading