Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
83719: storage: remove dependency to sql/catalog/bootstrap r=Xiang-Gu a=Xiang-Gu

Previously, tests in `pkg/storage` depended on `sql/catalog/bootstrap`.
This was inadequate/weird because `storage` is a much lower layer in the
architectural stack. It will also help prevent dependency cycles in
other PRs when we introduce depencies (see #82172 if interested).

Release note: None

83958: colexecbase: add all remaining casts from strings r=yuzefovich a=yuzefovich

**tree: minor cleanup**

This commit does a few minor things:
- actually uses the error in a few places when constructing a ParseError
- refactors some of the interval-parsing functions to expose them to be
used in the follow-up commit
- extracts a helper method to construct an error when timestamp exceeds
bounds.

Release note: None

**colexecbase: sort native cast info lexicographically**

This commit sorts the information about natively supported casts
lexicographically so that it is easier to see what is actually
supported. This is simply a mechanical change.

Release note: None

**colexecbase: add all remaining casts from strings**

This commit adds the native casts from strings to all remaining
natively-supported types (dates, decimals, floats, ints, intervals,
timestamps, jsons).

I was inspired to do this because the combination of this
commit and the vectorized rendering on top of the wrapped row-by-row
processors would expose some bugs (e.g. #83094).

Addresses: #48135.

Release note: None

83984: storageccl: use NewPebbleIterator in restore data processor r=erikgrinaker a=msbutler

This PR replaces the multiIterator used in the restore data processor with the
PebbleSSTIterator, which has baked in range key support.

This patch is apart of a larger effort to teach backup and restore about MVCC
bulk operations. Next, the readAsOfIterator will need to learn how to
deal with range keys.

Informs #71155

Release note: none

84049: sql: fix memory accounting of prepared statements and portals in error cases r=yuzefovich a=yuzefovich

**sql: make sure to close mem acc of prepared stmt in case of an error**

Previously, it was possible that we would not close the memory account
created for a prepared statement when an error is encountered. This was
the case because we would not include the prepared stmt into the prep
stmts namespace, so it would just get lost. However, up until recently
this was not an issue since we mistakenly cleared that memory account
when creating the prepared statement.

Release note: None

**sql: only increment ref count of prep stmt in non-error case of portals**

Previously, it was possible to "leak" a reference to a prepared
statement if we made a portal from it (i.e. took a reference to the
prepared statement) and the memory reservation was denied. This could
lead to "leftover bytes" errors when stopping the "session" monitors.
However, the impact is minor because on release builds we'd still return
those "leftover bytes" and would just file a sentry issue. This is now
fixed.

Fixes: #83935

Release note: None

84097: DOC-4899: Remove linking on subdirectory from show_backup diagram r=RichardJCai a=nickvigilante

Release note: None

84143: Revert "kvstreamer: reuse incomplete Get requests on resume batches" r=yuzefovich a=yuzefovich

This reverts commit 21f2390.

Previously, I didn't realize that the KV layer would modify all requests
included into the BatchRequest in `txnSeqNumAllocator`, so we cannot
reuse even incomplete GetRequests. It is unfortunate, but not a big
deal.

Fixes: #83974.

Release note: None

84169: opt: fix incorrect column indexing in index recommendations r=mgartner a=mgartner

#### opt: fix incorrect column indexing in index recommendations

Fixes #83965

Release note (bug fix): A minor bug has been fixed that caused internal
errors and poor index recommendations when running `EXPLAIN` statements.

#### opt: clarify logic in Metadata.UpdateTableMeta

Release note: None


84188: roachtest: update supported tag for gorm r=ZhouXing19 a=ZhouXing19

fixes #83794
fixes #83797
fixes #83885

Release note: None

Co-authored-by: Xiang Gu <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Nick Vigilante <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Jane Xing <[email protected]>
  • Loading branch information
7 people committed Jul 11, 2022
9 parents 87e3efc + 6ba0ac6 + 3637d15 + 4d1f66e + 27330ca + 3cebf96 + 516553d + 41a794b + 0f623d6 commit 10853d2
Show file tree
Hide file tree
Showing 24 changed files with 3,141 additions and 1,096 deletions.
4 changes: 2 additions & 2 deletions docs/generated/sql/bnf/alter_table_set_storage_param.bnf
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
alter_onetable_stmt ::=
'ALTER' 'TABLE' table_name 'SET' '(' storage_parameter_key '=' var_value ) ) )* ')'
| 'ALTER' 'TABLE' 'IF' 'EXISTS' table_name 'SET' '(' storage_parameter_key '=' var_value ) ) )* ')'
'ALTER' 'TABLE' table_name 'SET' '(' storage_parameter_key '=' var_value ')'
| 'ALTER' 'TABLE' 'IF' 'EXISTS' table_name 'SET' '(' storage_parameter_key '=' var_value ')'
10 changes: 5 additions & 5 deletions pkg/ccl/backupccl/backupinfo/backup_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ func debugDumpFileSST(
}
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}
iter, err := storageccl.ExternalSSTReader(ctx, store, fileInfoPath, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, fileInfoPath, encOpts)
if err != nil {
return err
}
Expand Down Expand Up @@ -665,7 +665,7 @@ func DebugDumpMetadataSST(
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}

iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, path, encOpts)
if err != nil {
return err
}
Expand Down Expand Up @@ -805,7 +805,7 @@ func NewBackupMetadata(
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}

iter, err := storageccl.ExternalSSTReader(ctx, exportStore, sstFileName, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, exportStore, sstFileName, encOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -922,7 +922,7 @@ func (b *BackupMetadata) FileIter(ctx context.Context) FileIterator {
break
}

iter, err := storageccl.ExternalSSTReader(ctx, b.store, path, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, b.store, path, encOpts)
if err != nil {
return FileIterator{err: err}
}
Expand Down Expand Up @@ -1232,7 +1232,7 @@ func makeBytesIter(
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}

iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts)
iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, path, encOpts)
if err != nil {
return bytesIter{iterError: err}
}
Expand Down
42 changes: 19 additions & 23 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,37 +278,27 @@ func (rd *restoreDataProcessor) openSSTs(
) error {
ctxDone := ctx.Done()

// The sstables only contain MVCC data and no intents, so using an MVCC
// iterator is sufficient.
var iters []storage.SimpleMVCCIterator
// TODO(msbutler): use a a map of external storage factories to avoid reopening the same dir
// in a given restore span entry
var dirs []cloud.ExternalStorage

// If we bail early and haven't handed off responsibility of the dirs/iters to
// the channel, close anything that we had open.
defer func() {
for _, iter := range iters {
iter.Close()
}

for _, dir := range dirs {
if err := dir.Close(); err != nil {
log.Warningf(ctx, "close export storage failed %v", err)
}
}
}()

// sendIters sends all of the currently accumulated iterators over the
// sendIter sends a multiplexed iterator covering the currently accumulated files over the
// channel.
sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
multiIter := storage.MakeMultiIterator(itersToSend)
readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime)
sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

cleanup := func() {
readAsOfIter.Close()
multiIter.Close()
for _, iter := range itersToSend {
iter.Close()
}

for _, dir := range dirsToSend {
if err := dir.Close(); err != nil {
Expand All @@ -329,13 +319,13 @@ func (rd *restoreDataProcessor) openSSTs(
return ctx.Err()
}

iters = make([]storage.SimpleMVCCIterator, 0)
dirs = make([]cloud.ExternalStorage, 0)
return nil
}

log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)

filePaths := make([]string, 0, len(EntryFiles{}))
for _, file := range entry.Files {
log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key)

Expand All @@ -344,17 +334,23 @@ func (rd *restoreDataProcessor) openSSTs(
return err
}
dirs = append(dirs, dir)
filePaths = append(filePaths, file.Path)

// TODO(pbardea): When memory monitoring is added, send the currently
// accumulated iterators on the channel if we run into memory pressure.
iter, err := storageccl.ExternalSSTReader(ctx, dir, file.Path, rd.spec.Encryption)
if err != nil {
return err
}
iters = append(iters, iter)
}

return sendIters(iters, dirs)
iterOpts := storage.IterOptions{
RangeKeyMaskingBelow: rd.spec.RestoreTime,
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}
iter, err := storageccl.ExternalSSTReader(ctx, dirs, filePaths, rd.spec.Encryption,
iterOpts)
if err != nil {
return err
}
return sendIter(iter, dirs)
}

func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func makeIters(
return nil, nil, errors.Wrapf(err, "making external storage")
}

iters[i], err = storageccl.ExternalSSTReader(ctx, dirStorage[i], file.Path, nil)
iters[i], err = storageccl.DeprecatingExternalSSTReader(ctx, dirStorage[i], file.Path, nil)
if err != nil {
return nil, nil, errors.Wrapf(err, "fetching sst reader")
}
Expand Down
120 changes: 118 additions & 2 deletions pkg/ccl/storageccl/external_sst_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/cockroachdb/pebble/sstable"
)

// RemoteSSTs lets external SSTables get iterated directly in some cases,
// rather than being downloaded entirely first.
var remoteSSTs = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.bulk_ingest.stream_external_ssts.enabled",
Expand All @@ -39,12 +41,126 @@ var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting(
64<<10,
)

// ExternalSSTReader returns opens an SST in external storage, optionally
// decrypting with the supplied parameters, and returns iterator over it.
func getFileWithRetry(
ctx context.Context, basename string, e cloud.ExternalStorage,
) (ioctx.ReadCloserCtx, int64, error) {
// Do an initial read of the file, from the beginning, to get the file size as
// this is used e.g. to read the trailer.
var f ioctx.ReadCloserCtx
var sz int64
const maxAttempts = 3
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error {
var err error
f, sz, err = e.ReadFileAt(ctx, basename, 0)
return err
}); err != nil {
return nil, 0, err
}
return f, sz, nil
}

// newMemPebbleSSTReader returns a PebbleSSTIterator for in-memory SSTs from
// external storage, optionally decrypting with the supplied parameters.
//
// ctx is captured and used throughout the life of the returned iterator, until
// the iterator's Close() method is called.
func newMemPebbleSSTReader(
ctx context.Context,
e []cloud.ExternalStorage,
basenames []string,
encryption *roachpb.FileEncryptionOptions,
iterOps storage.IterOptions,
) (storage.SimpleMVCCIterator, error) {

inMemorySSTs := make([][]byte, 0, len(basenames))

for i, basename := range basenames {
f, _, err := getFileWithRetry(ctx, basename, e[i])
if err != nil {
return nil, err
}
content, err := ioctx.ReadAll(ctx, f)
f.Close(ctx)
if err != nil {
return nil, err
}
if encryption != nil {
content, err = DecryptFile(ctx, content, encryption.Key, nil /* mm */)
if err != nil {
return nil, err
}
}
inMemorySSTs = append(inMemorySSTs, content)
}
return storage.NewPebbleMultiMemSSTIterator(inMemorySSTs, false, iterOps)
}

// ExternalSSTReader returns a PebbleSSTIterator for the SSTs in external storage,
// optionally decrypting with the supplied parameters.
//
// ctx is captured and used throughout the life of the returned iterator, until
// the iterator's Close() method is called.
func ExternalSSTReader(
ctx context.Context,
e []cloud.ExternalStorage,
basenames []string,
encryption *roachpb.FileEncryptionOptions,
iterOps storage.IterOptions,
) (storage.SimpleMVCCIterator, error) {
if !remoteSSTs.Get(&e[0].Settings().SV) {
return newMemPebbleSSTReader(ctx, e, basenames, encryption, iterOps)
}
remoteCacheSize := remoteSSTSuffixCacheSize.Get(&e[0].Settings().SV)
readers := make([]sstable.ReadableFile, 0, len(basenames))

for i, basename := range basenames {
f, sz, err := getFileWithRetry(ctx, basename, e[i])
if err != nil {
return nil, err
}

raw := &sstReader{
ctx: ctx,
sz: sizeStat(sz),
body: f,
openAt: func(offset int64) (ioctx.ReadCloserCtx, error) {
reader, _, err := e[i].ReadFileAt(ctx, basename, offset)
return reader, err
},
}

var reader sstable.ReadableFile

if encryption != nil {
r, err := decryptingReader(raw, encryption.Key)
if err != nil {
f.Close(ctx)
return nil, err
}
reader = r
} else {
// We only explicitly buffer the suffix of the file when not decrypting as
// the decrypting reader has its own internal block buffer.
if err := raw.readAndCacheSuffix(remoteCacheSize); err != nil {
f.Close(ctx)
return nil, err
}
reader = raw
}
readers = append(readers, reader)
}
return storage.NewPebbleSSTIterator(readers, iterOps)
}

// DeprecatingExternalSSTReader returns opens an SST in external storage, optionally
// decrypting with the supplied parameters, and returns iterator over it.
//
// ctx is captured and used throughout the life of the returned iterator, until
// the iterator's Close() method is called.
//
// TODO (msbutler): replace all current calls with new ExternalSSTReader,
// as it does not handle range keys
func DeprecatingExternalSSTReader(
ctx context.Context,
e cloud.ExternalStorage,
basename string,
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/docgen/diagrams.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ var specs = []stmtSpec{
stmt: "alter_onetable_stmt",
inline: []string{"alter_table_cmds", "alter_table_cmd", "storage_parameter_list", "storage_parameter"},
regreplace: map[string]string{
`var_value.*`: `var_value ) ) )* ')'`,
`var_value.*`: `var_value ')'`,
},
match: []*regexp.Regexp{regexp.MustCompile("relation_expr 'SET")},
replace: map[string]string{
Expand Down Expand Up @@ -1311,7 +1311,7 @@ var specs = []stmtSpec{
"'BACKUP' string_or_placeholder 'IN' string_or_placeholder": "'BACKUP' subdirectory 'IN' location",
"'BACKUP' 'SCHEMAS' string_or_placeholder": "'BACKUP' 'SCHEMAS' location",
},
unlink: []string{"location"},
unlink: []string{"subdirectory", "location"},
},
{
name: "show_jobs",
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/gorm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

var gormReleaseTag = regexp.MustCompile(`^v(?P<major>\d+)\.(?P<minor>\d+)\.(?P<point>\d+)$`)
var gormSupportedTag = "v1.23.5"
var gormSupportedTag = "v1.23.8"

func registerGORM(r registry.Registry) {
runGORM := func(ctx context.Context, t test.Test, c cluster.Cluster) {
Expand Down
28 changes: 16 additions & 12 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,14 @@ func buildResumeSingleRangeBatch(
// requests with the ResumeSpans.
resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage
resumeReq.overheadAccountedFor = req.overheadAccountedFor
// Note that due to limitations of the KV layer (#75452) we cannot reuse
// original requests because the KV doesn't allow mutability (and all
// requests are modified by txnSeqNumAllocator, even if they are not
// evaluated due to TargetBytes limit).
gets := make([]struct {
req roachpb.GetRequest
union roachpb.RequestUnion_Get
}, fp.numIncompleteGets)
scans := make([]struct {
req roachpb.ScanRequest
union roachpb.RequestUnion_Scan
Expand All @@ -1583,18 +1591,14 @@ func buildResumeSingleRangeBatch(
emptyResponse = false
continue
}
// This Get wasn't completed - include it into the batch again (we
// can just reuse the original request since it hasn't been
// modified which is also asserted below).
if buildutil.CrdbTestBuild {
if origSpan := req.reqs[i].GetInner().Header().Span(); !get.ResumeSpan.Equal(origSpan) {
panic(errors.AssertionFailedf(
"unexpectedly the ResumeSpan %s on the GetResponse is different from the original span %s",
get.ResumeSpan, origSpan,
))
}
}
resumeReq.reqs[resumeReqIdx] = req.reqs[i]
// This Get wasn't completed - create a new request according to the
// ResumeSpan and include it into the batch.
newGet := gets[0]
gets = gets[1:]
newGet.req.SetSpan(*get.ResumeSpan)
newGet.req.KeyLocking = s.keyLocking
newGet.union.Get = &newGet.req
resumeReq.reqs[resumeReqIdx].Value = &newGet.union
resumeReq.positions = append(resumeReq.positions, position)
if req.subRequestIdx != nil {
resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, req.subRequestIdx[i])
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/util/duration", # keep
"//pkg/util/json", # keep
"//pkg/util/log",
"//pkg/util/timeutil/pgdate", # keep
"//pkg/util/uuid", # keep
"@com_github_cockroachdb_apd_v3//:apd", # keep
"@com_github_cockroachdb_errors//:errors",
Expand Down
Loading

0 comments on commit 10853d2

Please sign in to comment.