Skip to content

Commit

Permalink
Merge #69768 #69941
Browse files Browse the repository at this point in the history
69768: workload: fix some missing Err() checks on sql.Query usages r=cucaroach a=cucaroach

When calling Next until it returns false there could be error or it
could be the end of results or an error could have occurred and Err must
be called to see which happened.   In cases where we call Next until it
returns false the query is auto closed and its sufficient to call Err
instead of Close.

This fixes all but one of the cases found by the rowserrcheck linter,
there's one false positive in tpcds that appears to be caused by go
routine usage:

pkg/workload/tpcds/tpcds.go:311:26: rows.Err must be checked

Release justification: fixes hidden errors in test code
Release note: None

Fixes #68164 

69941: storage: use atomic marker for file registry r=sumeerbhola a=jbowens

Use a separate atomic marker file to denote which of the records-based,
file registries is currently active. This scheme does not require atomic
renames, and does not require closing and re-opening the registry during
a rotation.

This will need to be backported to crl-release-21.2.

Fixes #69797.
Informs #69861.

Release justification: fixes a high-severity bug in new functionality
Release note: None

Co-authored-by: Tommy Reilly <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
  • Loading branch information
3 people committed Sep 10, 2021
3 parents c1a9c8b + c5cac34 + b889168 commit eaafe77
Show file tree
Hide file tree
Showing 9 changed files with 651 additions and 139 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/storageccl/engineccl/encrypted_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestEncryptedFS(t *testing.T) {

memFS := vfs.NewMem()

require.NoError(t, memFS.MkdirAll("/bar", os.ModePerm))
fileRegistry := &storage.PebbleFileRegistry{FS: memFS, DBDir: "/bar"}
require.NoError(t, fileRegistry.Load())

Expand Down
8 changes: 4 additions & 4 deletions pkg/cli/interactive_tests/test_encryption.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ proc file_has_size {filepath size} {
}
}

proc file_exists {filepath} {
if {! [ file exist $filepath]} {
report "MISSING EXPECTED FILE: $filepath"
proc file_not_exists {filepath} {
if {[ file exist $filepath]} {
report "UNEXPECTED FILE: $filepath"
exit 1
}
}
Expand Down Expand Up @@ -74,7 +74,7 @@ send "$argv start-single-node --insecure --store=$storedir --enterprise-encrypti
eexpect "node starting"
interrupt
eexpect "shutdown completed"
file_exists "$storedir/COCKROACHDB_ENCRYPTION_REGISTRY"
file_not_exists "$storedir/COCKROACHDB_REGISTRY"
send "$argv debug encryption-status $storedir --enterprise-encryption=path=$storedir,key=$keydir/aes-128.key,old-key=plain\r"
eexpect " \"Active\": true,\r\n \"Type\": \"AES128_CTR\","
# Try starting without the encryption flag.
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ go_library(
"@com_github_cockroachdb_pebble//record",
"@com_github_cockroachdb_pebble//sstable",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_pebble//vfs/atomicfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//proto",
Expand Down
180 changes: 131 additions & 49 deletions pkg/storage/pebble_file_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
package storage

import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/vfs/atomicfs"
"github.com/gogo/protobuf/proto"
)

Expand All @@ -34,17 +37,18 @@ var CanRegistryElideFunc func(entry *enginepb.FileEntry) bool

const maxRegistrySize = 128 << 20 // 128 MB

// PebbleFileRegistry keeps track of files for the data-FS and store-FS for Pebble (see encrypted_fs.go
// for high-level comment).
// PebbleFileRegistry keeps track of files for the data-FS and store-FS
// for Pebble (see encrypted_fs.go for high-level comment).
//
// It is created even when file registry is disabled, so that it can be used to ensure that
// a registry file did not exist previously, since that would indicate that disabling the registry
// can cause data loss.
// It is created even when file registry is disabled, so that it can be
// used to ensure that a registry file did not exist previously, since
// that would indicate that disabling the registry can cause data loss.
//
// The records-based registry file written to disk contains a sequence
// of records: a marshaled enginepb.RegistryHeader byte slice followed by
// marshaled enginepb.RegistryUpdateBatch byte slices that correspond to a
// batch of updates to the file registry. The updates are replayed in Load.
// of records: a marshaled enginepb.RegistryHeader byte slice followed
// by marshaled enginepb.RegistryUpdateBatch byte slices that correspond
// to a batch of updates to the file registry. The updates are replayed
// in Load.
type PebbleFileRegistry struct {
// Initialize the following before calling Load().

Expand All @@ -63,7 +67,6 @@ type PebbleFileRegistry struct {
// Implementation.
// TODO(ayang): remove oldRegistryPath when we deprecate the old registry
oldRegistryPath string
registryPath string

mu struct {
syncutil.Mutex
Expand All @@ -75,13 +78,22 @@ type PebbleFileRegistry struct {
registryFile vfs.File
// registryWriter is a record.Writer for registryFile.
registryWriter *record.Writer
// registryMarker is an atomic file marker used to denote which
// of the records-based registry files is the current one. When
// we rotate files, the marker is atomically moved to the new
// file. It's guaranteed to be non-nil after Load.
marker *atomicfs.Marker
// registryFilename is the filename of the currently active
// records-based registry file. If no file has been written yet,
// this may be the empty string.
registryFilename string
}
}

const (
// TODO(ayang): mark COCKROACHDB_REGISTRY as deprecated so it isn't reused
oldRegistryFilename = "COCKROACHDB_REGISTRY"
registryFilename = "COCKROACHDB_ENCRYPTION_REGISTRY"
registryFilename = "COCKROACHDB_REGISTRY"
registryMarkerName = "registry"
)

// CheckNoRegistryFile checks that no registry file currently exists.
Expand All @@ -98,7 +110,7 @@ func (r *PebbleFileRegistry) CheckNoRegistryFile() error {

func (r *PebbleFileRegistry) checkNoBaseRegistry() error {
// NB: We do not assign r.oldRegistryPath if the registry will not be used.
oldRegistryPath := r.FS.PathJoin(r.DBDir, oldRegistryFilename)
oldRegistryPath := r.FS.PathJoin(r.DBDir, registryFilename)
_, err := r.FS.Stat(oldRegistryPath)
if err == nil {
return os.ErrExist
Expand All @@ -110,27 +122,29 @@ func (r *PebbleFileRegistry) checkNoBaseRegistry() error {
}

func (r *PebbleFileRegistry) checkNoRecordsRegistry() error {
// NB: We do not assign r.registryPath if the registry will not be used.
registryPath := r.FS.PathJoin(r.DBDir, registryFilename)
_, err := r.FS.Stat(registryPath)
if err == nil {
return os.ErrExist
}
if !oserror.IsNotExist(err) {
filename, err := atomicfs.ReadMarker(r.FS, r.DBDir, registryMarkerName)
if oserror.IsNotExist(err) {
// ReadMarker may return oserror.IsNotExist if the data
// directory does not exist.
return nil
} else if err != nil {
return err
}
if filename != "" {
return oserror.ErrExist
}
return nil
}

// Load loads the contents of the file registry from a file, if the file exists, else it is a noop.
// Load should be called exactly once if the file registry will be used.
// Load loads the contents of the file registry from a file, if the file
// exists, else it is a noop. Load should be called exactly once if the
// file registry will be used.
func (r *PebbleFileRegistry) Load() error {
r.mu.Lock()
defer r.mu.Unlock()

// Initialize private fields needed when the file registry will be used.
r.oldRegistryPath = r.FS.PathJoin(r.DBDir, oldRegistryFilename)
r.registryPath = r.FS.PathJoin(r.DBDir, registryFilename)
r.oldRegistryPath = r.FS.PathJoin(r.DBDir, registryFilename)
r.mu.currProto = &enginepb.FileRegistry{}

if err := r.loadRegistryFromFile(); err != nil {
Expand All @@ -146,6 +160,27 @@ func (r *PebbleFileRegistry) Load() error {
}

func (r *PebbleFileRegistry) loadRegistryFromFile() error {
// The file registry uses an 'atomic marker' file to denote which of
// the new records-based file registries is currently active. It's
// okay to load the marker unconditionally, because LocateMarker
// succeeds even if the marker has never been placed.
marker, currentFilename, err := atomicfs.LocateMarker(r.FS, r.DBDir, registryMarkerName)
if err != nil {
return err
}
r.mu.marker = marker
// If the marker does not exist, currentFilename may be the
// empty string. That's okay.
r.mu.registryFilename = currentFilename

// Atomic markers may accumulate obsolete files. Remove any obsolete
// marker files as long as we're not in read-only mode.
if !r.ReadOnly {
if err := r.mu.marker.RemoveObsolete(); err != nil {
return err
}
}

// We treat the old registry file as the source of truth until the version
// is finalized. At that point, we upgrade to the new records-based registry
// file and delete the old registry file.
Expand Down Expand Up @@ -201,10 +236,10 @@ func (r *PebbleFileRegistry) maybeLoadOldBaseRegistry() (bool, error) {
}

func (r *PebbleFileRegistry) maybeLoadNewRecordsRegistry() (bool, error) {
records, err := r.FS.Open(r.registryPath)
if oserror.IsNotExist(err) {
if r.mu.registryFilename == "" {
return false, nil
}
records, err := r.FS.Open(r.FS.PathJoin(r.DBDir, r.mu.registryFilename))
if err != nil {
return false, err
}
Expand Down Expand Up @@ -276,8 +311,25 @@ func (r *PebbleFileRegistry) maybeElideEntries() error {
if r.ReadOnly {
return nil
}

// Copy the filenames to a slice and sort it for deterministic
// iteration order. This is helpful in tests and this function is a
// one-time cost at startup.
//
// TODO(jackson): Rather than Stat-ing each file, we could
// recursively List each directory and walk two lists of sorted
// filenames. We should test a store with many files to see how much
// the current approach slows node start.
filenames := make([]string, 0, len(r.mu.currProto.Files))
for filename := range r.mu.currProto.Files {
filenames = append(filenames, filename)
}
sort.Strings(filenames)

batch := &enginepb.RegistryUpdateBatch{}
for filename, entry := range r.mu.currProto.Files {
for _, filename := range filenames {
entry := r.mu.currProto.Files[filename]

// Some entries may be elided. This is used within
// ccl/storageccl/engineccl to elide plaintext file entries.
if CanRegistryElideFunc != nil && CanRegistryElideFunc(entry) {
Expand Down Expand Up @@ -420,12 +472,29 @@ func (r *PebbleFileRegistry) upgradeToRecordsVersion() error {
if r.mu.currProto.Version == enginepb.RegistryVersion_Records {
return nil
}

dir, err := r.FS.OpenDir(r.DBDir)
if err != nil {
return err
}
defer dir.Close()

// Create a new registry file to record the upgraded version.
r.mu.currProto.SetVersion(enginepb.RegistryVersion_Records)
if err := r.createNewRegistryFile(); err != nil {
return err
}
return r.FS.Remove(r.oldRegistryPath)
if err := r.FS.Remove(r.oldRegistryPath); err != nil && !oserror.IsNotExist(err) {
return err
}
// We need to sync the removal of the old registry because the
// presence of the old registry will cause future starts to read
// from the old registry.
if err := dir.Sync(); err != nil {
// Fsync errors must be fatal.
panic(errors.Wrap(err, "syncing database directory"))
}
return nil
}

func (r *PebbleFileRegistry) processBatchLocked(batch *enginepb.RegistryUpdateBatch) error {
Expand Down Expand Up @@ -468,7 +537,7 @@ func (r *PebbleFileRegistry) writeToRegistryFile(batch *enginepb.RegistryUpdateB
// Create a new file registry file if one doesn't exist yet.
if r.mu.registryWriter == nil {
if err := r.createNewRegistryFile(); err != nil {
return err
return errors.Wrap(err, "creating new registry file")
}
}
w, err := r.mu.registryWriter.Next()
Expand All @@ -492,16 +561,22 @@ func (r *PebbleFileRegistry) writeToRegistryFile(batch *enginepb.RegistryUpdateB
// if we have exceeded the max registry size.
if r.mu.registryWriter.Size() > maxRegistrySize {
if err := r.createNewRegistryFile(); err != nil {
return err
return errors.Wrap(err, "rotating registry file")
}
}
return nil
}

func makeRegistryFilename(iter uint64) string {
return fmt.Sprintf("%s_%06d", registryFilename, iter)
}

func (r *PebbleFileRegistry) createNewRegistryFile() error {
// Create a temporary file for the new registry file.
tempPath := r.registryPath + tempFileExtension
f, err := r.FS.Create(tempPath)
// Create a new registry file. It won't be active until the marker
// is moved to the new filename.
filename := makeRegistryFilename(r.mu.marker.NextIter())
filepath := r.FS.PathJoin(r.DBDir, filename)
f, err := r.FS.Create(filepath)
if err != nil {
return err
}
Expand Down Expand Up @@ -552,26 +627,31 @@ func (r *PebbleFileRegistry) createNewRegistryFile() error {
return errFunc(err)
}

// Close and replace the current registry file with the temporary file.
if err := r.closeRegistry(); err != nil {
return errFunc(err)
}
if err := r.FS.Rename(tempPath, r.registryPath); err != nil {
return errFunc(err)
}
fdir, err := r.FS.OpenDir(r.DBDir)
if err != nil {
return errFunc(err)
}
if err := fdir.Sync(); err != nil {
return errFunc(err)
// Moving the marker to the new filename atomically switches to the
// new file. Move handles syncing the data directory as well.
if err := r.mu.marker.Move(filename); err != nil {
return errors.Wrap(errFunc(err), "moving marker")
}
if err := fdir.Close(); err != nil {
return errFunc(err)

// Close and remove the previous registry file.
{
// Any errors in this block will be returned at the end of the
// function, after we update the internal state to point to the
// new filename (since we've already successfully installed it).
err = r.closeRegistry()
if err == nil && r.mu.registryFilename != "" {
rmErr := r.FS.Remove(r.FS.PathJoin(r.DBDir, r.mu.registryFilename))
if rmErr != nil && !oserror.IsNotExist(rmErr) {
err = errors.CombineErrors(err, rmErr)
}
}
}

r.mu.registryFile = f
r.mu.registryWriter = records
return nil
r.mu.registryFilename = filename

return err
}

func (r *PebbleFileRegistry) getRegistryCopy() *enginepb.FileRegistry {
Expand All @@ -587,7 +667,9 @@ func (r *PebbleFileRegistry) getRegistryCopy() *enginepb.FileRegistry {
func (r *PebbleFileRegistry) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
return r.closeRegistry()
err := r.closeRegistry()
err = errors.CombineErrors(err, r.mu.marker.Close())
return err
}

func (r *PebbleFileRegistry) closeRegistry() error {
Expand Down
Loading

0 comments on commit eaafe77

Please sign in to comment.