Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
58990: sql: deal with computed and default column generation in index backfi… r=ajwerner a=ajwerner

This commit was mostly motivated by work on the new schema changer to enable
the behavior described in #47989, however, it also turns out to be a
prerequisite of the work to use virtual computed columns in secondary indexes.
Given we haven't released virtual computed columns, I'm going to omit a
release not for this PR.

The basic idea is that we need to track dependencies for computed columns
to make sure they are retrieved. Default expressions need to be evaluated
first. Much of the code is testing.

Release note: None

59261: sql: add full table or index scan count metric r=barryhe2000 a=barryhe2000

Previously, this metric was not available, but is now added so that users can
see when full table scans or full index scans are being used for their queries.

Fixes: #58653

Release note (ui change): User can see time series custom chart in admin ui
 for full table or index scans.

59478: cloudimpl: cache suffix in remote file sst wrapper r=dt a=dt

Reading SSTs starts with multiple tiny reads in offsets near the end of the file.
If we can read that whole region once and fulfill those reads from a cached buffer,
we can avoid repeated RPCs.

Release note: none.

Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Barry He <[email protected]>
Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
4 people committed Jan 29, 2021
4 parents 5f981c7 + 5b2198f + ac3b3c7 + 9c76309 commit 054c618
Show file tree
Hide file tree
Showing 15 changed files with 814 additions and 71 deletions.
62 changes: 62 additions & 0 deletions pkg/ccl/storageccl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ var remoteSSTs = settings.RegisterBoolSetting(
true,
)

var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting(
"kv.bulk_ingest.stream_external_ssts.suffix_cache_size",
"size of suffix of remote SSTs to download and cache before reading from remote stream",
64<<10,
)

// commandMetadataEstimate is an estimate of how much metadata Raft will add to
// an AddSSTable command. It is intentionally a vast overestimate to avoid
// embedding intricate knowledge of the Raft encoding scheme here.
Expand Down Expand Up @@ -333,13 +339,21 @@ func ExternalSSTReader(
}

var reader sstable.ReadableFile = raw

if encryption != nil {
r, err := decryptingReader(raw, encryption.Key)
if err != nil {
f.Close()
return nil, err
}
reader = r
} else {
// We only explicitly buffer the suffix of the file when not decrypting as
// the decrypting reader has its own internal block buffer.
if err := raw.readAndCacheSuffix(remoteSSTSuffixCacheSize.Get(&e.Settings().SV)); err != nil {
f.Close()
return nil, err
}
}

iter, err := storage.NewSSTIterator(reader)
Expand All @@ -359,6 +373,15 @@ type sstReader struct {
pos int64

readPos int64 // readPos is used to transform Read() to ReadAt(readPos).

// This wrapper's primary purpose is reading SSTs which often perform many
// tiny reads in a cluster of offsets near the end of the file. If we can read
// the whole region once and fullfil those from a cache, we can avoid repeated
// RPCs.
cache struct {
pos int64
buf []byte
}
}

// Close implements io.Closer.
Expand All @@ -381,11 +404,49 @@ func (r *sstReader) Read(p []byte) (int, error) {
return n, err
}

// readAndCacheSuffix caches the `size` suffix of the file (which could the
// whole file) for use by later ReadAt calls to avoid making additional RPCs.
func (r *sstReader) readAndCacheSuffix(size int64) error {
if size == 0 {
return nil
}
r.cache.buf = nil
r.cache.pos = int64(r.sz) - size
if r.cache.pos <= 0 {
r.cache.pos = 0
}
reader, err := r.openAt(r.cache.pos)
if err != nil {
return err
}
defer reader.Close()
read, err := ioutil.ReadAll(reader)
if err != nil {
return err
}
r.cache.buf = read
return nil
}

// ReadAt implements io.ReaderAt by opening a Reader at an offset before reading
// from it. Note: contrary to io.ReaderAt, ReadAt does *not* support parallel
// calls.
func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) {
var read int
if offset >= r.cache.pos && offset < r.cache.pos+int64(len(r.cache.buf)) {
read += copy(p, r.cache.buf[offset-r.cache.pos:])
if read == len(p) {
return read, nil
}
// Advance offset to end of what cache read.
offset += int64(read)
}

if offset == int64(r.sz) {
return read, io.EOF
}

// Position the underlying reader at offset if needed.
if r.pos != offset {
if err := r.Close(); err != nil {
return 0, err
Expand All @@ -397,6 +458,7 @@ func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) {
r.pos = offset
r.body = b
}

var err error
for n := 0; read < len(p); n, err = r.body.Read(p[read:]) {
read += n
Expand Down
59 changes: 59 additions & 0 deletions pkg/ccl/storageccl/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
package storageccl

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -37,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)

func TestMaxImportBatchSize(t *testing.T) {
Expand Down Expand Up @@ -395,3 +398,59 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
})
}
}

func TestSSTReaderCache(t *testing.T) {
defer leaktest.AfterTest(t)()

var openCalls, expectedOpenCalls int
const sz, suffix = 100, 10
raw := &sstReader{
sz: sizeStat(sz),
body: ioutil.NopCloser(bytes.NewReader(nil)),
openAt: func(offset int64) (io.ReadCloser, error) {
openCalls++
return ioutil.NopCloser(bytes.NewReader(make([]byte, sz-int(offset)))), nil
},
}

require.Equal(t, 0, openCalls)
_ = raw.readAndCacheSuffix(suffix)
expectedOpenCalls++

discard := make([]byte, 5)

// Reading in the suffix doesn't make another call.
_, _ = raw.ReadAt(discard, 90)
require.Equal(t, expectedOpenCalls, openCalls)

// Reading in the suffix again doesn't make another call.
_, _ = raw.ReadAt(discard, 95)
require.Equal(t, expectedOpenCalls, openCalls)

// Reading outside the suffix makes a new call.
_, _ = raw.ReadAt(discard, 85)
expectedOpenCalls++
require.Equal(t, expectedOpenCalls, openCalls)

// Reading at same offset, outside the suffix, does make a new call to rewind.
_, _ = raw.ReadAt(discard, 85)
expectedOpenCalls++
require.Equal(t, expectedOpenCalls, openCalls)

// Read at new pos does makes a new call.
_, _ = raw.ReadAt(discard, 0)
expectedOpenCalls++
require.Equal(t, expectedOpenCalls, openCalls)

// Read at cur pos (where last read stopped) does not reposition.
_, _ = raw.ReadAt(discard, 5)
require.Equal(t, expectedOpenCalls, openCalls)

// Read at in suffix between non-suffix reads does not make a call.
_, _ = raw.ReadAt(discard, 92)
require.Equal(t, expectedOpenCalls, openCalls)

// Read at where prior non-suffix read finished does not make a new call.
_, _ = raw.ReadAt(discard, 10)
require.Equal(t, expectedOpenCalls, openCalls)
}
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ go_test(
"run_control_test.go",
"scan_test.go",
"scatter_test.go",
"schema_changer_helpers_test.go",
"schema_changer_test.go",
"scrub_test.go",
"sequence_test.go",
Expand Down Expand Up @@ -495,6 +496,7 @@ go_test(
"//pkg/server/status/statuspb:statuspb_go_proto",
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/sql/backfill",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
Expand Down Expand Up @@ -586,6 +588,7 @@ go_test(
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@in_gopkg_yaml_v2//:yaml_v2",
"@org_golang_google_protobuf//proto",
"@org_golang_x_sync//errgroup",
],
)
Expand Down
Loading

0 comments on commit 054c618

Please sign in to comment.