From 5b2198f8f7177401c5bbaa21f29b281f6085748f Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 13 Jan 2021 12:26:28 -0500 Subject: [PATCH 1/3] sql: deal with computed and default column generation in index backfiller This commit was mostly motivated by work on the new schema changer to enable the behavior described in #47989, however, it also turns out to be a prerequisite of the work to use virtual computed columns in secondary indexes. Given we haven't released virtual computed columns, I'm going to omit a release not for this PR. The basic idea is that we need to track dependencies for computed columns to make sure they are retrieved. Default expressions need to be evaluated first. Much of the code is testing. Release note: None --- pkg/sql/BUILD.bazel | 3 + pkg/sql/backfill/backfill.go | 206 +++++++-- pkg/sql/catalog/schemaexpr/computed_column.go | 50 ++- pkg/sql/indexbackfiller_test.go | 403 ++++++++++++++++++ pkg/sql/row/row_converter.go | 3 +- pkg/sql/schema_changer.go | 9 +- pkg/sql/schema_changer_helpers_test.go | 37 ++ pkg/sql/schema_changer_test.go | 4 +- 8 files changed, 660 insertions(+), 55 deletions(-) create mode 100644 pkg/sql/schema_changer_helpers_test.go diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 2b6645a9b902..b9d03c25a359 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -439,6 +439,7 @@ go_test( "run_control_test.go", "scan_test.go", "scatter_test.go", + "schema_changer_helpers_test.go", "schema_changer_test.go", "scrub_test.go", "sequence_test.go", @@ -495,6 +496,7 @@ go_test( "//pkg/server/status/statuspb:statuspb_go_proto", "//pkg/server/telemetry", "//pkg/settings/cluster", + "//pkg/sql/backfill", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/catalogkv", @@ -586,6 +588,7 @@ go_test( "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@in_gopkg_yaml_v2//:yaml_v2", + "@org_golang_google_protobuf//proto", "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 81f2d5c3cf3b..44a640e4074c 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -167,9 +167,10 @@ func (cb *ColumnBackfiller) InitForLocalUse( if err != nil { return err } - computedExprs, err := schemaexpr.MakeComputedExprs( + computedExprs, _, err := schemaexpr.MakeComputedExprs( ctx, cb.added, + desc.GetPublicColumns(), desc, tree.NewUnqualifiedTableName(tree.Name(desc.Name)), evalCtx, @@ -210,9 +211,10 @@ func (cb *ColumnBackfiller) InitForDistributedUse( if err != nil { return err } - computedExprs, err = schemaexpr.MakeComputedExprs( + computedExprs, _, err = schemaexpr.MakeComputedExprs( ctx, cb.added, + desc.GetPublicColumns(), desc, tree.NewUnqualifiedTableName(tree.Name(desc.Name)), evalCtx, @@ -396,11 +398,27 @@ type IndexBackfiller struct { types []*types.T rowVals tree.Datums evalCtx *tree.EvalContext - cols []descpb.ColumnDescriptor + + // cols are all of the writable (PUBLIC and DELETE_AND_WRITE_ONLY) columns in + // the descriptor. + cols []descpb.ColumnDescriptor + + // addedCols are the columns in DELETE_AND_WRITE_ONLY being added as part of + // this index which are not computed. + addedCols []descpb.ColumnDescriptor + + // computedCols are the columns in this index which are computed and do + // not have concrete values in the source index. This is virtual computed + // columns and stored computed columns which are non-public. 
+ computedCols []descpb.ColumnDescriptor + + // Map of columns which need to be evaluated to their expressions. + colExprs map[descpb.ColumnID]tree.TypedExpr // predicates is a map of indexes to partial index predicate expressions. It // includes entries for partial indexes only. predicates map[descpb.IndexID]tree.TypedExpr + // indexesToEncode is a list of indexes to encode entries for a given row. // It is a field of IndexBackfiller to avoid allocating a slice for each row // backfilled. @@ -437,26 +455,108 @@ func (ib *IndexBackfiller) InitForLocalUse( // Initialize ib.added. valNeededForCol := ib.initIndexes(desc) + predicates, colExprs, referencedColumns, err := constructExprs( + ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, semaCtx, + ) + if err != nil { + return err + } + + // Add the columns referenced in the predicate to valNeededForCol so that + // columns necessary to evaluate the predicate expression are fetched. + referencedColumns.ForEach(func(col descpb.ColumnID) { + valNeededForCol.Add(ib.colIdxMap.GetDefault(col)) + }) + + return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon) +} + +// constructExprs is a helper to construct the index and column expressions +// required for an index backfill. It also returns the set of columns referenced +// by any of these exprs. +// +// The cols argument is the full set of cols in the table (including those being +// added). The addedCols argument is the set of non-public, non-computed +// columns. The computedCols argument is the set of computed columns in the +// index. +func constructExprs( + ctx context.Context, + desc catalog.TableDescriptor, + addedIndexes []*descpb.IndexDescriptor, + cols, addedCols, computedCols []descpb.ColumnDescriptor, + evalCtx *tree.EvalContext, + semaCtx *tree.SemaContext, +) ( + predicates map[descpb.IndexID]tree.TypedExpr, + colExprs map[descpb.ColumnID]tree.TypedExpr, + referencedColumns catalog.TableColSet, + _ error, +) { // Convert any partial index predicate strings into expressions. predicates, predicateRefColIDs, err := schemaexpr.MakePartialIndexExprs( ctx, - ib.added, - ib.cols, + addedIndexes, + cols, desc, evalCtx, semaCtx, ) if err != nil { - return err + return nil, nil, catalog.TableColSet{}, err } - // Add the columns referenced in the predicate to valNeededForCol so that - // columns necessary to evaluate the predicate expression are fetched. - predicateRefColIDs.ForEach(func(col descpb.ColumnID) { - valNeededForCol.Add(ib.colIdxMap.GetDefault(col)) - }) + // Determine the exprs for newly added, non-computed columns. + defaultExprs, err := schemaexpr.MakeDefaultExprs( + ctx, addedCols, &transform.ExprTransformContext{}, evalCtx, semaCtx, + ) + if err != nil { + return nil, nil, catalog.TableColSet{}, err + } - return ib.init(evalCtx, predicates, valNeededForCol, desc, mon) + // TODO(ajwerner): Rethink this table name. 
+ tn := tree.NewUnqualifiedTableName(tree.Name(desc.GetName())) + computedExprs, computedExprRefColIDs, err := schemaexpr.MakeComputedExprs( + ctx, + computedCols, + cols, + desc, + tn, + evalCtx, + semaCtx, + ) + if err != nil { + return nil, nil, catalog.TableColSet{}, err + } + + numColExprs := len(addedCols) + len(computedCols) + colExprs = make(map[descpb.ColumnID]tree.TypedExpr, numColExprs) + var addedColSet catalog.TableColSet + for i := range defaultExprs { + id := addedCols[i].ID + colExprs[id] = defaultExprs[i] + addedColSet.Add(id) + } + for i := range computedCols { + id := computedCols[i].ID + colExprs[id] = computedExprs[i] + } + + // Ensure that only existing columns are added to the needed set. Otherwise + // the fetcher may complain that the columns don't exist. There's a somewhat + // subtle invariant that if any dependencies exist between computed columns + // and default values that the computed column be a later column and thus the + // default value will have been populated. Computed columns are not permitted + // to reference each other. + addToReferencedColumns := func(cols catalog.TableColSet) { + cols.ForEach(func(col descpb.ColumnID) { + if !addedColSet.Contains(col) { + referencedColumns.Add(col) + } + }) + } + addToReferencedColumns(predicateRefColIDs) + addToReferencedColumns(computedExprRefColIDs) + return predicates, colExprs, referencedColumns, nil } // InitForDistributedUse initializes an IndexBackfiller for use as part of a @@ -475,29 +575,27 @@ func (ib *IndexBackfiller) InitForDistributedUse( evalCtx := flowCtx.NewEvalCtx() var predicates map[descpb.IndexID]tree.TypedExpr - var predicateRefColIDs catalog.TableColSet + var colExprs map[descpb.ColumnID]tree.TypedExpr + var referencedColumns catalog.TableColSet // Install type metadata in the target descriptors, as well as resolve any // user defined types in partial index predicate expressions. - if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { resolver := flowCtx.TypeResolverFactory.NewTypeResolver(txn) // Hydrate all the types present in the table. - if err := typedesc.HydrateTypesInTableDescriptor(ctx, desc.TableDesc(), resolver); err != nil { + if err = typedesc.HydrateTypesInTableDescriptor( + ctx, desc.TableDesc(), resolver, + ); err != nil { return err } // Set up a SemaContext to type check the default and computed expressions. semaCtx := tree.MakeSemaContext() semaCtx.TypeResolver = resolver - // Convert any partial index predicate strings into expressions. - var err error - predicates, predicateRefColIDs, err = - schemaexpr.MakePartialIndexExprs(ctx, ib.added, ib.cols, desc, evalCtx, &semaCtx) - if err != nil { - return err - } - - return nil + predicates, colExprs, referencedColumns, err = constructExprs( + ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, &semaCtx, + ) + return err }); err != nil { return err } @@ -508,11 +606,11 @@ func (ib *IndexBackfiller) InitForDistributedUse( // Add the columns referenced in the predicate to valNeededForCol so that // columns necessary to evaluate the predicate expression are fetched. 
- predicateRefColIDs.ForEach(func(col descpb.ColumnID) { + referencedColumns.ForEach(func(col descpb.ColumnID) { valNeededForCol.Add(ib.colIdxMap.GetDefault(col)) }) - return ib.init(evalCtx, predicates, valNeededForCol, desc, mon) + return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon) } // Close releases the resources used by the IndexBackfiller. @@ -546,7 +644,14 @@ func (ib *IndexBackfiller) ShrinkBoundAccount(ctx context.Context, shrinkBy int6 // initCols is a helper to populate column metadata of an IndexBackfiller. It // populates the cols and colIdxMap fields. func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) { - ib.cols = desc.Columns + for i := range desc.Columns { + col := &desc.Columns[i] + ib.cols = append(ib.cols, *col) + if col.IsComputed() && col.Virtual { + ib.computedCols = append(ib.computedCols, *col) + } + } + ib.cols = append([]descpb.ColumnDescriptor(nil), desc.Columns...) // If there are ongoing mutations, add columns that are being added and in // the DELETE_AND_WRITE_ONLY state. @@ -558,6 +663,11 @@ func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) { m.Direction == descpb.DescriptorMutation_ADD && m.State == descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY { ib.cols = append(ib.cols, *column) + if column.IsComputed() { + ib.computedCols = append(ib.computedCols, *column) + } else { + ib.addedCols = append(ib.addedCols, *column) + } } } } @@ -586,8 +696,11 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe ib.added = append(ib.added, idx) for i := range ib.cols { id := ib.cols[i].ID - if idx.ContainsColumnID(id) || - idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding { + idxContainsColumn := idx.ContainsColumnID(id) + isPrimaryIndex := idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding + if (idxContainsColumn || isPrimaryIndex) && + !ib.cols[i].Virtual && + i < len(desc.Columns) { valNeededForCol.Add(i) } } @@ -601,12 +714,14 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe func (ib *IndexBackfiller) init( evalCtx *tree.EvalContext, predicateExprs map[descpb.IndexID]tree.TypedExpr, + colExprs map[descpb.ColumnID]tree.TypedExpr, valNeededForCol util.FastIntSet, desc *tabledesc.Immutable, mon *mon.BytesMonitor, ) error { ib.evalCtx = evalCtx ib.predicates = predicateExprs + ib.colExprs = colExprs // Initialize a list of index descriptors to encode entries for. If there // are no partial indexes, the list is equivalent to the list of indexes @@ -709,6 +824,28 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk( } memUsedPerChunk += indexEntriesPerRowInitialBufferSize buffer := make([]rowenc.IndexEntry, len(ib.added)) + evaluateExprs := func(cols []descpb.ColumnDescriptor) error { + for i := range cols { + colID := cols[i].ID + texpr, ok := ib.colExprs[colID] + if !ok { + continue + } + val, err := texpr.Eval(ib.evalCtx) + if err != nil { + return err + } + colIdx, ok := ib.colIdxMap.Get(colID) + if !ok { + return errors.AssertionFailedf( + "failed to find index for column %d in %d", + colID, tableDesc.GetID(), + ) + } + ib.rowVals[colIdx] = val + } + return nil + } for i := int64(0); i < chunkSize; i++ { encRow, _, _, err := ib.fetcher.NextRow(ctx) if err != nil { @@ -726,6 +863,17 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk( iv.CurSourceRow = ib.rowVals + // First populate default values, then populate computed expressions which + // may reference default values. 
+ if len(ib.colExprs) > 0 { + if err := evaluateExprs(ib.addedCols); err != nil { + return nil, nil, 0, err + } + if err := evaluateExprs(ib.computedCols); err != nil { + return nil, nil, 0, err + } + } + // If there are any partial indexes being added, make a list of the // indexes that the current row should be added to. if len(ib.predicates) > 0 { diff --git a/pkg/sql/catalog/schemaexpr/computed_column.go b/pkg/sql/catalog/schemaexpr/computed_column.go index 49158b27fdc2..1081e5513e48 100644 --- a/pkg/sql/catalog/schemaexpr/computed_column.go +++ b/pkg/sql/catalog/schemaexpr/computed_column.go @@ -168,41 +168,42 @@ func (v *ComputedColumnValidator) ValidateNoDependents(col *descpb.ColumnDescrip // MakeComputedExprs returns a slice of the computed expressions for the // slice of input column descriptors, or nil if none of the input column -// descriptors have computed expressions. +// descriptors have computed expressions. The caller provides the set of +// sourceColumns to which the expr may refer. // // The length of the result slice matches the length of the input column // descriptors. For every column that has no computed expression, a NULL // expression is reported. // -// Note that the order of columns is critical. Expressions cannot reference -// columns that come after them in cols. +// Note that the order of input is critical. Expressions cannot reference +// columns that come after them in input. func MakeComputedExprs( ctx context.Context, - cols []descpb.ColumnDescriptor, + input, sourceColumns []descpb.ColumnDescriptor, tableDesc catalog.TableDescriptor, tn *tree.TableName, evalCtx *tree.EvalContext, semaCtx *tree.SemaContext, -) ([]tree.TypedExpr, error) { +) (_ []tree.TypedExpr, refColIDs catalog.TableColSet, _ error) { // Check to see if any of the columns have computed expressions. If there // are none, we don't bother with constructing the map as the expressions // are all NULL. haveComputed := false - for i := range cols { - if cols[i].IsComputed() { + for i := range input { + if input[i].IsComputed() { haveComputed = true break } } if !haveComputed { - return nil, nil + return nil, catalog.TableColSet{}, nil } // Build the computed expressions map from the parsed statement. - computedExprs := make([]tree.TypedExpr, 0, len(cols)) - exprStrings := make([]string, 0, len(cols)) - for i := range cols { - col := &cols[i] + computedExprs := make([]tree.TypedExpr, 0, len(input)) + exprStrings := make([]string, 0, len(input)) + for i := range input { + col := &input[i] if col.IsComputed() { exprStrings = append(exprStrings, *col.ComputeExpr) } @@ -210,36 +211,45 @@ func MakeComputedExprs( exprs, err := parser.ParseExprs(exprStrings) if err != nil { - return nil, err + return nil, catalog.TableColSet{}, err } - nr := newNameResolver(evalCtx, tableDesc.GetID(), tn, columnDescriptorsToPtrs(tableDesc.GetPublicColumns())) + nr := newNameResolver(evalCtx, tableDesc.GetID(), tn, columnDescriptorsToPtrs(sourceColumns)) nr.addIVarContainerToSemaCtx(semaCtx) var txCtx transform.ExprTransformContext compExprIdx := 0 - for i := range cols { - col := &cols[i] + for i := range input { + col := &input[i] if !col.IsComputed() { computedExprs = append(computedExprs, tree.DNull) nr.addColumn(col) continue } + + // Collect all column IDs that are referenced in the partial index + // predicate expression. 
+ colIDs, err := ExtractColumnIDs(tableDesc, exprs[compExprIdx]) + if err != nil { + return nil, refColIDs, err + } + refColIDs.UnionWith(colIDs) + expr, err := nr.resolveNames(exprs[compExprIdx]) if err != nil { - return nil, err + return nil, catalog.TableColSet{}, err } typedExpr, err := tree.TypeCheck(ctx, expr, semaCtx, col.Type) if err != nil { - return nil, err + return nil, catalog.TableColSet{}, err } if typedExpr, err = txCtx.NormalizeExpr(evalCtx, typedExpr); err != nil { - return nil, err + return nil, catalog.TableColSet{}, err } computedExprs = append(computedExprs, typedExpr) compExprIdx++ nr.addColumn(col) } - return computedExprs, nil + return computedExprs, refColIDs, nil } diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go index cb08813de91c..091a8932de97 100644 --- a/pkg/sql/indexbackfiller_test.go +++ b/pkg/sql/indexbackfiller_test.go @@ -14,15 +14,41 @@ import ( "context" gosql "database/sql" "sync" + "sync/atomic" "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/backfill" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/sqlmigrations" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" ) // TestIndexBackfiller tests the scenarios described in docs/tech-notes/index-backfill.md @@ -149,3 +175,380 @@ func queryPairs(t *testing.T, sqlDB *gosql.DB, query string) []pair { } return ret } + +// TestIndexBackfillerComputedAndGeneratedColumns tests that the index +// backfiller support backfilling columns with default values or computed +// expressions which are being backfilled. +// +// This is a remarkably low-level test. It manually mucks with the table +// descriptor to set up an index backfill and then manually mucks sets up and +// plans the index backfill, then it mucks with the table descriptor more +// and ensures that the data was backfilled properly. +func TestIndexBackfillerComputedAndGeneratedColumns(t *testing.T) { + defer leaktest.AfterTest(t)() + + // A test case exists to exercise the behavior of an index backfill. 
+ // The case gets to do arbitrary things to the table (which is created with + // the specified tableName in setupSQL). The idea is that there will be a + // mutation on the table with mutation ID 1 that represents an index backfill. + // The associated job (which is created by the test harness for the first + // mutation), will be blocked from running. Instead, the test will manually + // run the index backfiller via a hook exposed only for testing. + // + // After the backfill is run, the index (specified by its ID) is scanned and + // the contents are compared with expectedContents. + type indexBackfillTestCase struct { + // name of the test case. + name string + + // SQL statements to run to setup the table. + setupSQL string + tableName string + + // setupDesc should mutate the descriptor such that the mutation with + // id 1 contains an index backfill. + setupDesc func(t *testing.T, mut *tabledesc.Mutable) + indexToBackfill descpb.IndexID + expectedContents [][]string + } + + indexBackfillerTestCases := []indexBackfillTestCase{ + // This tests a secondary index which uses a virtual computed column in its + // key. + { + name: "virtual computed column in key", + setupSQL: ` +SET experimental_enable_virtual_columns = true; +CREATE TABLE foo (i INT PRIMARY KEY, k INT, v INT AS (i*i + k) VIRTUAL); +INSERT INTO foo VALUES (1, 2), (2, 3), (3, 4); +`, + tableName: "foo", + indexToBackfill: 2, + // Note that the results are ordered by column ID. + expectedContents: [][]string{ + {"1", "3"}, + {"2", "7"}, + {"3", "13"}, + }, + setupDesc: func(t *testing.T, mut *tabledesc.Mutable) { + indexToBackfill := descpb.IndexDescriptor{ + Name: "virtual_column_backed_index", + ID: mut.NextIndexID, + Unique: true, + Version: descpb.EmptyArraysInInvertedIndexesVersion, + ColumnNames: []string{ + mut.Columns[2].Name, + }, + ColumnDirections: []descpb.IndexDescriptor_Direction{ + descpb.IndexDescriptor_ASC, + }, + ColumnIDs: []descpb.ColumnID{ + mut.Columns[2].ID, + }, + ExtraColumnIDs: []descpb.ColumnID{ + mut.Columns[0].ID, + }, + Type: descpb.IndexDescriptor_FORWARD, + EncodingType: descpb.SecondaryIndexEncoding, + } + mut.NextIndexID++ + require.NoError(t, mut.AddIndexMutation( + &indexToBackfill, descpb.DescriptorMutation_ADD, + )) + }, + }, + // This test will inject a new primary index and perform a primary key swap + // where the new primary index has two new stored columns not in the existing + // primary index. + { + name: "default and generated column referencing it in new primary index", + setupSQL: ` +CREATE TABLE foo (i INT PRIMARY KEY); +INSERT INTO foo VALUES (1), (10), (100); +`, + tableName: "foo", + indexToBackfill: 2, + expectedContents: [][]string{ + {"1", "42", "43"}, + {"10", "42", "52"}, + {"100", "42", "142"}, + }, + setupDesc: func(t *testing.T, mut *tabledesc.Mutable) { + columnWithDefault := descpb.ColumnDescriptor{ + Name: "def", + ID: mut.NextColumnID, + Type: types.Int, + Nullable: false, + DefaultExpr: proto.String("42"), + Hidden: false, + PGAttributeNum: uint32(mut.NextColumnID), + } + mut.NextColumnID++ + mut.AddColumnMutation(&columnWithDefault, descpb.DescriptorMutation_ADD) + // Cheat and jump right to DELETE_AND_WRITE_ONLY. 
+ mut.Mutations[len(mut.Mutations)-1].State = descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY + computedColumnNotInPrimaryIndex := descpb.ColumnDescriptor{ + Name: "comp", + ID: mut.NextColumnID, + Type: types.Int, + Nullable: false, + ComputeExpr: proto.String("i + def"), + Hidden: false, + PGAttributeNum: uint32(mut.NextColumnID), + } + mut.NextColumnID++ + mut.AddColumnMutation(&computedColumnNotInPrimaryIndex, descpb.DescriptorMutation_ADD) + // Cheat and jump right to DELETE_AND_WRITE_ONLY. + mut.Mutations[len(mut.Mutations)-1].State = descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY + + mut.Families[0].ColumnIDs = append(mut.Families[0].ColumnIDs, + columnWithDefault.ID, + computedColumnNotInPrimaryIndex.ID) + mut.Families[0].ColumnNames = append(mut.Families[0].ColumnNames, + columnWithDefault.Name, + computedColumnNotInPrimaryIndex.Name) + + indexToBackfill := descpb.IndexDescriptor{ + Name: "new_primary_index", + ID: mut.NextIndexID, + Unique: true, + Version: descpb.EmptyArraysInInvertedIndexesVersion, + ColumnNames: []string{ + mut.Columns[0].Name, + }, + ColumnDirections: []descpb.IndexDescriptor_Direction{ + descpb.IndexDescriptor_ASC, + }, + StoreColumnNames: []string{ + columnWithDefault.Name, + computedColumnNotInPrimaryIndex.Name, + }, + ColumnIDs: []descpb.ColumnID{ + mut.Columns[0].ID, + }, + ExtraColumnIDs: nil, + StoreColumnIDs: []descpb.ColumnID{ + columnWithDefault.ID, + computedColumnNotInPrimaryIndex.ID, + }, + Type: descpb.IndexDescriptor_FORWARD, + EncodingType: descpb.PrimaryIndexEncoding, + } + mut.NextIndexID++ + require.NoError(t, mut.AddIndexMutation( + &indexToBackfill, descpb.DescriptorMutation_ADD, + )) + mut.AddPrimaryKeySwapMutation(&descpb.PrimaryKeySwap{ + OldPrimaryIndexId: 1, + NewPrimaryIndexId: 2, + }) + }, + }, + } + + // fetchIndex fetches the contents of an a table index returning the results + // as datums. The datums will correspond to each of the columns stored in the + // index, ordered by column ID. + fetchIndex := func( + ctx context.Context, t *testing.T, txn *kv.Txn, table *tabledesc.Mutable, indexID descpb.IndexID, + ) []tree.Datums { + t.Helper() + var fetcher row.Fetcher + var alloc rowenc.DatumAlloc + + mm := mon.MakeStandaloneBudget(1 << 30) + idx, err := table.FindIndexWithID(indexID) + colIdxMap := table.ColumnIdxMap() + var valsNeeded util.FastIntSet + if idx.Primary() { + _ = table.ForeachPublicColumn(func(column *descpb.ColumnDescriptor) error { + if !column.Virtual { + valsNeeded.Add(colIdxMap.GetDefault(column.ID)) + } + return nil + }) + } else { + _ = idx.ForEachColumnID(func(id descpb.ColumnID) error { + valsNeeded.Add(colIdxMap.GetDefault(id)) + return nil + }) + } + require.NoError(t, err) + spans := []roachpb.Span{table.IndexSpan(keys.SystemSQLCodec, indexID)} + const reverse = false + require.NoError(t, fetcher.Init( + ctx, + keys.SystemSQLCodec, + reverse, + descpb.ScanLockingStrength_FOR_NONE, + descpb.ScanLockingWaitPolicy_BLOCK, + false, + &alloc, + mm.Monitor(), + row.FetcherTableArgs{ + Spans: spans, + Desc: table, + Index: idx.IndexDesc(), + ColIdxMap: colIdxMap, + Cols: table.Columns, + ValNeededForCol: valsNeeded, + IsSecondaryIndex: !idx.Primary(), + }, + )) + + require.NoError(t, fetcher.StartScan( + ctx, txn, spans, false, 0, true, false, /* forceProductionBatchSize */ + )) + var rows []tree.Datums + for { + datums, _, _, err := fetcher.NextRowDecoded(ctx) + require.NoError(t, err) + if datums == nil { + break + } + // Copy the datums out as the slice is reused internally. 
+ row := make(tree.Datums, 0, valsNeeded.Len()) + for i := range datums { + if valsNeeded.Contains(i) { + row = append(row, datums[i]) + } + } + rows = append(rows, row) + } + return rows + } + + datumSliceToStrMatrix := func(rows []tree.Datums) [][]string { + res := make([][]string, len(rows)) + for i, row := range rows { + rowStrs := make([]string, len(row)) + for j, d := range row { + rowStrs[j] = d.String() + } + res[i] = rowStrs + } + return res + } + + // See the comment on indexBackfillTestCase for the behavior of run. + run := func(t *testing.T, test indexBackfillTestCase) { + ctx := context.Background() + + // Ensure the job doesn't actually run. The code below will handle running + // the index backfill. + blockChan := make(chan struct{}) + var jobToBlock atomic.Value + jobToBlock.Store(int64(0)) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ + RunBeforeResume: func(jobID int64) error { + if jobID == jobToBlock.Load().(int64) { + <-blockChan + return errors.New("boom") + } + return nil + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + defer close(blockChan) + + // Run the initial setupSQL. + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.Exec(t, test.setupSQL) + + // Fetch the descriptor ID for the relevant table. + var tableID descpb.ID + tdb.QueryRow(t, "SELECT ($1::REGCLASS)::INT", test.tableName).Scan(&tableID) + + // Run the testCase's setupDesc function to prepare an index backfill + // mutation. Also, create an associated job and set it up to be blocked. + s0 := tc.Server(0) + lm := s0.LeaseManager().(*lease.Manager) + ie := s0.InternalExecutor().(sqlutil.InternalExecutor) + settings := s0.ClusterSettings() + execCfg := s0.ExecutorConfig().(sql.ExecutorConfig) + jr := s0.JobRegistry().(*jobs.Registry) + var j *jobs.Job + var table catalog.TableDescriptor + require.NoError(t, descs.Txn(ctx, settings, lm, ie, s0.DB(), func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) (err error) { + mut, err := descriptors.GetMutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{}) + if err != nil { + return err + } + test.setupDesc(t, mut) + span := mut.PrimaryIndexSpan(execCfg.Codec) + resumeSpanList := make([]jobspb.ResumeSpanList, len(mut.Mutations)) + for i := range mut.Mutations { + resumeSpanList[i] = jobspb.ResumeSpanList{ + ResumeSpans: []roachpb.Span{span}, + } + } + j, err = jr.CreateAdoptableJobWithTxn(ctx, jobs.Record{ + Description: "testing", + Statement: "testing", + Username: security.RootUserName(), + DescriptorIDs: []descpb.ID{tableID}, + Details: jobspb.SchemaChangeDetails{ + DescID: tableID, + TableMutationID: 1, + FormatVersion: jobspb.JobResumerFormatVersion, + ResumeSpanList: resumeSpanList, + }, + Progress: jobspb.SchemaChangeGCProgress{}, + }, txn) + if err != nil { + return err + } + mut.MutationJobs = append(mut.MutationJobs, descpb.TableDescriptor_MutationJob{ + JobID: *j.ID(), + MutationID: 1, + }) + jobToBlock.Store(*j.ID()) + mut.MaybeIncrementVersion() + table = mut.ImmutableCopy().(catalog.TableDescriptor) + return descriptors.WriteDesc(ctx, false /* kvTrace */, mut, txn) + })) + _, err := lm.WaitForOneVersion(ctx, tableID, retry.Options{}) + require.NoError(t, err) + + // Run the index backfill + changer := sql.NewSchemaChangerForTesting( + tableID, 1, execCfg.NodeID.SQLInstanceID(), s0.DB(), lm, jr, &execCfg, settings) + changer.SetJob(j) + spans := 
[]roachpb.Span{table.IndexSpan(keys.SystemSQLCodec, test.indexToBackfill)} + require.NoError(t, changer.TestingDistIndexBackfill( + ctx, table.GetVersion(), spans, backfill.IndexMutationFilter, 10, + )) + + // Make the mutation complete, then read the index and validate that it + // has the expected contents. + require.NoError(t, descs.Txn(ctx, settings, lm, ie, s0.DB(), func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) error { + table, err := descriptors.GetMutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{}) + if err != nil { + return err + } + toComplete := len(table.Mutations) + for i := 0; i < toComplete; i++ { + mut := table.Mutations[i] + require.NoError(t, table.MakeMutationComplete(mut)) + } + table.Mutations = table.Mutations[toComplete:] + datums := fetchIndex(ctx, t, txn, table, test.indexToBackfill) + require.Equal(t, test.expectedContents, datumSliceToStrMatrix(datums)) + return nil + })) + } + + for _, test := range indexBackfillerTestCases { + t.Run(test.name, func(t *testing.T) { run(t, test) }) + } +} diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index a9fe681e925e..635c7aac6db1 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -428,9 +428,10 @@ func NewDatumRowConverter( // Here, computeExprs will be nil if there's no computed column, or // the list of computed expressions (including nil, for those columns // that are not computed) otherwise, according to colsOrdered. - c.computedExprs, err = schemaexpr.MakeComputedExprs( + c.computedExprs, _, err = schemaexpr.MakeComputedExprs( ctx, colsOrdered, + c.tableDesc.GetPublicColumns(), c.tableDesc, tree.NewUnqualifiedTableName(tree.Name(c.tableDesc.Name)), c.EvalCtx, diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index d83a1a2e20ea..1761b3d427e7 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -108,7 +108,7 @@ func NewSchemaChangerForTesting( tableID descpb.ID, mutationID descpb.MutationID, sqlInstanceID base.SQLInstanceID, - db kv.DB, + db *kv.DB, leaseMgr *lease.Manager, jobRegistry *jobs.Registry, execCfg *ExecutorConfig, @@ -118,7 +118,7 @@ func NewSchemaChangerForTesting( descID: tableID, mutationID: mutationID, sqlInstanceID: sqlInstanceID, - db: &db, + db: db, leaseMgr: leaseMgr, jobRegistry: jobRegistry, settings: settings, @@ -130,7 +130,10 @@ func NewSchemaChangerForTesting( ) sqlutil.InternalExecutor { return execCfg.InternalExecutor }, - metrics: NewSchemaChangerMetrics(), + metrics: NewSchemaChangerMetrics(), + clock: db.Clock(), + distSQLPlanner: execCfg.DistSQLPlanner, + testingKnobs: &SchemaChangerTestingKnobs{}, } } diff --git a/pkg/sql/schema_changer_helpers_test.go b/pkg/sql/schema_changer_helpers_test.go new file mode 100644 index 000000000000..deea56cc7486 --- /dev/null +++ b/pkg/sql/schema_changer_helpers_test.go @@ -0,0 +1,37 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/backfill" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" +) + +// TestingDistIndexBackfill exposes the index backfill functionality for +// testing. +func (sc *SchemaChanger) TestingDistIndexBackfill( + ctx context.Context, + version descpb.DescriptorVersion, + targetSpans []roachpb.Span, + filter backfill.MutationFilter, + indexBackfillBatchSize int64, +) error { + return sc.distIndexBackfill(ctx, version, targetSpans, filter, indexBackfillBatchSize) +} + +// SetJob sets the job. +func (sc *SchemaChanger) SetJob(job *jobs.Job) { + sc.job = job +} diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 311658a5b079..eb2ea49e59cf 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -113,7 +113,7 @@ func TestSchemaChangeProcess(t *testing.T) { jobRegistry := s.JobRegistry().(*jobs.Registry) defer stopper.Stop(context.Background()) changer := sql.NewSchemaChangerForTesting( - id, 0, instance, *kvDB, leaseMgr, jobRegistry, &execCfg, cluster.MakeTestingClusterSettings()) + id, 0, instance, kvDB, leaseMgr, jobRegistry, &execCfg, cluster.MakeTestingClusterSettings()) if _, err := sqlDB.Exec(` CREATE DATABASE t; @@ -148,7 +148,7 @@ INSERT INTO t.test VALUES ('a', 'b'), ('c', 'd'); index.ID = tableDesc.NextIndexID tableDesc.NextIndexID++ changer = sql.NewSchemaChangerForTesting( - id, tableDesc.NextMutationID, instance, *kvDB, leaseMgr, jobRegistry, + id, tableDesc.NextMutationID, instance, kvDB, leaseMgr, jobRegistry, &execCfg, cluster.MakeTestingClusterSettings(), ) tableDesc.Mutations = append(tableDesc.Mutations, descpb.DescriptorMutation{ From ac3b3c7bfcf3524063a03f5b0717e4a6507e62c0 Mon Sep 17 00:00:00 2001 From: Barry He Date: Wed, 27 Jan 2021 18:15:12 -0800 Subject: [PATCH 2/3] sql: add full table or index scan count to metrics This new metric allows for users to see a time series of their full table or index scans in the advanced debug console. This metric is part of EngineMetrics, so there's a corresponding internal metric that counts internal full table or index scans from internal engine queries. Release note (ui change): User can see time series of full table or index scans in advanced debug console. --- pkg/sql/conn_executor.go | 7 +++---- pkg/sql/conn_executor_exec.go | 24 +++++++++++++----------- pkg/sql/exec_util.go | 7 ++++++- pkg/sql/executor_statement_metrics.go | 3 +++ pkg/ts/catalog/chart_catalog.go | 8 ++++++++ 5 files changed, 33 insertions(+), 16 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 66446b9ff8e1..0f9c8e8dc60a 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -301,7 +301,6 @@ func makeMetrics(internal bool) Metrics { SQLOptFallbackCount: metric.NewCounter(getMetricMeta(MetaSQLOptFallback, internal)), SQLOptPlanCacheHits: metric.NewCounter(getMetricMeta(MetaSQLOptPlanCacheHits, internal)), SQLOptPlanCacheMisses: metric.NewCounter(getMetricMeta(MetaSQLOptPlanCacheMisses, internal)), - // TODO(mrtracy): See HistogramWindowInterval in server/config.go for the 6x factor. 
DistSQLExecLatency: metric.NewLatency(getMetricMeta(MetaDistSQLExecLatency, internal), 6*metricsSampleInterval), @@ -315,8 +314,9 @@ func makeMetrics(internal bool) Metrics { 6*metricsSampleInterval), SQLTxnsOpen: metric.NewGauge(getMetricMeta(MetaSQLTxnsOpen, internal)), - TxnAbortCount: metric.NewCounter(getMetricMeta(MetaTxnAbort, internal)), - FailureCount: metric.NewCounter(getMetricMeta(MetaFailure, internal)), + TxnAbortCount: metric.NewCounter(getMetricMeta(MetaTxnAbort, internal)), + FailureCount: metric.NewCounter(getMetricMeta(MetaFailure, internal)), + FullTableOrIndexScanCount: metric.NewCounter(getMetricMeta(MetaFullTableOrIndexScan, internal)), }, StartedStatementCounters: makeStartedStatementCounters(internal), ExecutedStatementCounters: makeExecutedStatementCounters(internal), @@ -566,7 +566,6 @@ func (s *Server) newConnExecutor( nodeIDOrZero, _ := s.cfg.NodeID.OptionalNodeID() sdMutator := new(sessionDataMutator) *sdMutator = s.makeSessionDataMutator(sd, sdDefaults) - ex := &connExecutor{ server: s, metrics: srvMetrics, diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index d9f63af3265a..649958041bb2 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -914,17 +914,19 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro flags := planner.curPlan.flags - // We don't execute the statement if: - // - plan contains a full table or full index scan. - // - the session setting disallows full table/index scans. - // - the query is not an internal query. - if (flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan)) && - planner.EvalContext().SessionData.DisallowFullTableScans && ex.executorType == executorTypeExec { - return errors.WithHint( - pgerror.Newf(pgcode.TooManyRows, - "query `%s` contains a full table/index scan which is explicitly disallowed", - planner.stmt.SQL), - "try overriding the `disallow_full_table_scans` cluster/session setting") + if flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan) { + if ex.executorType == executorTypeExec && planner.EvalContext().SessionData.DisallowFullTableScans { + // We don't execute the statement if: + // - plan contains a full table or full index scan. + // - the session setting disallows full table/index scans. + // - the query is not an internal query. + return errors.WithHint( + pgerror.Newf(pgcode.TooManyRows, + "query `%s` contains a full table/index scan which is explicitly disallowed", + planner.stmt.SQL), + "try overriding the `disallow_full_table_scans` cluster/session setting") + } + ex.metrics.EngineMetrics.FullTableOrIndexScanCount.Inc(1) } // TODO(knz): Remove this accounting if/when savepoint rollbacks diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 20bf1e0412a3..d6aa11f2ae21 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -488,9 +488,14 @@ var ( Measurement: "Open SQL Transactions", Unit: metric.Unit_COUNT, } + MetaFullTableOrIndexScan = metric.Metadata{ + Name: "sql.full.scan.count", + Help: "Number of full table or index scans", + Measurement: "SQL Statements", + Unit: metric.Unit_COUNT, + } // Below are the metadata for the statement started counters. 
- MetaQueryStarted = metric.Metadata{ Name: "sql.query.started.count", Help: "Number of SQL queries started", diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index e76c2da419e8..43297aac3249 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -140,6 +140,9 @@ type EngineMetrics struct { // FailureCount counts non-retriable errors in open transactions. FailureCount *metric.Counter + + // FullTableOrIndexScanCount counts the number of full table or index scans. + FullTableOrIndexScanCount *metric.Counter } // EngineMetrics implements the metric.Struct interface diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index f60497eceac1..98f6b82ede80 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1859,6 +1859,14 @@ var charts = []sectionDescription{ }, AxisLabel: "Transactions", }, + { + Title: "Full Table Index Scans", + Metrics: []string{ + "sql.full.scan.count", + "sql.full.scan.count.internal", + }, + AxisLabel: "SQL Statements", + }, { Title: "Byte I/O", Metrics: []string{ From 9c76309baeca639d7c73843db6d6dedcc2e72d05 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sun, 3 Jan 2021 04:55:15 +0000 Subject: [PATCH 3/3] cloudimpl: cache suffix in remote file sst wrapper Reading SSTs starts with multiple tiny reads in offsets near the end of the file. If we can read that whole region once and fulfill those reads from a cached buffer, we can avoid repeated RPCs. Release note: none. --- pkg/ccl/storageccl/import.go | 62 +++++++++++++++++++++++++++++++ pkg/ccl/storageccl/import_test.go | 59 +++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go index 126270feedb7..bd48cdebb5ec 100644 --- a/pkg/ccl/storageccl/import.go +++ b/pkg/ccl/storageccl/import.go @@ -93,6 +93,12 @@ var remoteSSTs = settings.RegisterBoolSetting( true, ) +var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting( + "kv.bulk_ingest.stream_external_ssts.suffix_cache_size", + "size of suffix of remote SSTs to download and cache before reading from remote stream", + 64<<10, +) + // commandMetadataEstimate is an estimate of how much metadata Raft will add to // an AddSSTable command. It is intentionally a vast overestimate to avoid // embedding intricate knowledge of the Raft encoding scheme here. @@ -333,6 +339,7 @@ func ExternalSSTReader( } var reader sstable.ReadableFile = raw + if encryption != nil { r, err := decryptingReader(raw, encryption.Key) if err != nil { @@ -340,6 +347,13 @@ func ExternalSSTReader( return nil, err } reader = r + } else { + // We only explicitly buffer the suffix of the file when not decrypting as + // the decrypting reader has its own internal block buffer. + if err := raw.readAndCacheSuffix(remoteSSTSuffixCacheSize.Get(&e.Settings().SV)); err != nil { + f.Close() + return nil, err + } } iter, err := storage.NewSSTIterator(reader) @@ -359,6 +373,15 @@ type sstReader struct { pos int64 readPos int64 // readPos is used to transform Read() to ReadAt(readPos). + + // This wrapper's primary purpose is reading SSTs which often perform many + // tiny reads in a cluster of offsets near the end of the file. If we can read + // the whole region once and fullfil those from a cache, we can avoid repeated + // RPCs. + cache struct { + pos int64 + buf []byte + } } // Close implements io.Closer. 
@@ -381,11 +404,49 @@ func (r *sstReader) Read(p []byte) (int, error) { return n, err } +// readAndCacheSuffix caches the `size` suffix of the file (which could the +// whole file) for use by later ReadAt calls to avoid making additional RPCs. +func (r *sstReader) readAndCacheSuffix(size int64) error { + if size == 0 { + return nil + } + r.cache.buf = nil + r.cache.pos = int64(r.sz) - size + if r.cache.pos <= 0 { + r.cache.pos = 0 + } + reader, err := r.openAt(r.cache.pos) + if err != nil { + return err + } + defer reader.Close() + read, err := ioutil.ReadAll(reader) + if err != nil { + return err + } + r.cache.buf = read + return nil +} + // ReadAt implements io.ReaderAt by opening a Reader at an offset before reading // from it. Note: contrary to io.ReaderAt, ReadAt does *not* support parallel // calls. func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) { var read int + if offset >= r.cache.pos && offset < r.cache.pos+int64(len(r.cache.buf)) { + read += copy(p, r.cache.buf[offset-r.cache.pos:]) + if read == len(p) { + return read, nil + } + // Advance offset to end of what cache read. + offset += int64(read) + } + + if offset == int64(r.sz) { + return read, io.EOF + } + + // Position the underlying reader at offset if needed. if r.pos != offset { if err := r.Close(); err != nil { return 0, err @@ -397,6 +458,7 @@ func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) { r.pos = offset r.body = b } + var err error for n := 0; read < len(p); n, err = r.body.Read(p[read:]) { read += n diff --git a/pkg/ccl/storageccl/import_test.go b/pkg/ccl/storageccl/import_test.go index 84a7285e00e7..9b412a42da22 100644 --- a/pkg/ccl/storageccl/import_test.go +++ b/pkg/ccl/storageccl/import_test.go @@ -9,8 +9,10 @@ package storageccl import ( + "bytes" "context" "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -37,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/pebble/vfs" + "github.com/stretchr/testify/require" ) func TestMaxImportBatchSize(t *testing.T) { @@ -395,3 +398,59 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { }) } } + +func TestSSTReaderCache(t *testing.T) { + defer leaktest.AfterTest(t)() + + var openCalls, expectedOpenCalls int + const sz, suffix = 100, 10 + raw := &sstReader{ + sz: sizeStat(sz), + body: ioutil.NopCloser(bytes.NewReader(nil)), + openAt: func(offset int64) (io.ReadCloser, error) { + openCalls++ + return ioutil.NopCloser(bytes.NewReader(make([]byte, sz-int(offset)))), nil + }, + } + + require.Equal(t, 0, openCalls) + _ = raw.readAndCacheSuffix(suffix) + expectedOpenCalls++ + + discard := make([]byte, 5) + + // Reading in the suffix doesn't make another call. + _, _ = raw.ReadAt(discard, 90) + require.Equal(t, expectedOpenCalls, openCalls) + + // Reading in the suffix again doesn't make another call. + _, _ = raw.ReadAt(discard, 95) + require.Equal(t, expectedOpenCalls, openCalls) + + // Reading outside the suffix makes a new call. + _, _ = raw.ReadAt(discard, 85) + expectedOpenCalls++ + require.Equal(t, expectedOpenCalls, openCalls) + + // Reading at same offset, outside the suffix, does make a new call to rewind. + _, _ = raw.ReadAt(discard, 85) + expectedOpenCalls++ + require.Equal(t, expectedOpenCalls, openCalls) + + // Read at new pos does makes a new call. + _, _ = raw.ReadAt(discard, 0) + expectedOpenCalls++ + require.Equal(t, expectedOpenCalls, openCalls) + + // Read at cur pos (where last read stopped) does not reposition. 
+	_, _ = raw.ReadAt(discard, 5)
+	require.Equal(t, expectedOpenCalls, openCalls)
+
+	// A read inside the suffix, between non-suffix reads, does not make a new call.
+	_, _ = raw.ReadAt(discard, 92)
+	require.Equal(t, expectedOpenCalls, openCalls)
+
+	// Read at where prior non-suffix read finished does not make a new call.
+	_, _ = raw.ReadAt(discard, 10)
+	require.Equal(t, expectedOpenCalls, openCalls)
+}
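
As a rough illustration of what the third patch's sstReader change buys, here is a small, self-contained Go sketch of the same idea, written outside the patch: a wrapper that reads the suffix of a (nominally remote) file once and then serves any read falling entirely inside that suffix from memory, counting how often it still has to go to the underlying reader. The suffixCachingReaderAt type, its field names, and the sizes used here are hypothetical stand-ins for illustration only, not the patch's actual sstReader API; in the patch the suffix size comes from the new kv.bulk_ingest.stream_external_ssts.suffix_cache_size setting, which defaults to 64 KiB.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// suffixCachingReaderAt wraps an io.ReaderAt and keeps the last cacheSize
// bytes in memory, so reads that fall entirely inside that suffix never touch
// the underlying (nominally remote) reader again.
type suffixCachingReaderAt struct {
	src       io.ReaderAt // the underlying "remote" file
	cachePos  int64       // offset at which the cached suffix starts
	cacheBuf  []byte      // the cached suffix bytes
	remoteOps int         // number of reads that had to go to src
}

func newSuffixCachingReaderAt(src io.ReaderAt, size, cacheSize int64) (*suffixCachingReaderAt, error) {
	r := &suffixCachingReaderAt{src: src, cachePos: size - cacheSize}
	if r.cachePos < 0 {
		r.cachePos = 0 // the cache may cover the whole file
	}
	r.cacheBuf = make([]byte, size-r.cachePos)
	r.remoteOps++ // one remote read to populate the cache
	if _, err := src.ReadAt(r.cacheBuf, r.cachePos); err != nil {
		return nil, err
	}
	return r, nil
}

// ReadAt serves reads that lie entirely within the cached suffix from memory
// and forwards everything else to the underlying reader.
func (r *suffixCachingReaderAt) ReadAt(p []byte, off int64) (int, error) {
	if off >= r.cachePos && off+int64(len(p)) <= r.cachePos+int64(len(r.cacheBuf)) {
		return copy(p, r.cacheBuf[off-r.cachePos:]), nil
	}
	r.remoteOps++
	return r.src.ReadAt(p, off)
}

func main() {
	data := bytes.Repeat([]byte{0xab}, 100)
	r, err := newSuffixCachingReaderAt(bytes.NewReader(data), 100, 10)
	if err != nil {
		panic(err)
	}
	buf := make([]byte, 5)
	_, _ = r.ReadAt(buf, 92) // inside the cached suffix: served from memory
	_, _ = r.ReadAt(buf, 40) // outside the suffix: one more remote read
	fmt.Println("remote reads:", r.remoteOps) // prints "remote reads: 2"
}

The intent, per the commit message, is that the cluster of tiny reads near the end of an SST that occur when it is opened can all be served from that one cached region rather than each turning into a separate RPC.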
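Returning to the first patch, the ordering it enforces in BuildIndexEntriesChunk (evaluate default expressions for newly added columns before computed expressions, since a computed expression may reference a newly added default column while computed columns may not reference each other) can be summarized with a minimal sketch. The columnID type, the closure-based expressions, and the literal values below are hypothetical stand-ins for the descriptor-driven ib.colExprs map in the real backfiller; the numbers mirror the second test case above (i = 10, def DEFAULT 42, comp AS (i + def)).

package main

import "fmt"

type columnID int

func main() {
	// Source row fetched from the existing index: only column 1 ("i") has a value.
	row := map[columnID]int{1: 10}

	// Hypothetical expressions keyed by column ID, standing in for ib.colExprs.
	exprs := map[columnID]func(map[columnID]int) int{
		2: func(map[columnID]int) int { return 42 },            // added column "def" with DEFAULT 42
		3: func(r map[columnID]int) int { return r[1] + r[2] }, // computed column "comp" AS (i + def)
	}

	addedCols := []columnID{2}    // newly added, non-computed columns
	computedCols := []columnID{3} // computed columns with no value in the source index

	evaluate := func(cols []columnID) {
		for _, id := range cols {
			if expr, ok := exprs[id]; ok {
				row[id] = expr(row)
			}
		}
	}

	// Defaults first, then computed expressions that may read those defaults.
	evaluate(addedCols)
	evaluate(computedCols)

	fmt.Println(row) // map[1:10 2:42 3:52], matching the expected row for i = 10 in the test above
}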