From a850d2067314a8cb996d69058e8c062a75efc416 Mon Sep 17 00:00:00 2001
From: Faizan Qazi
Date: Tue, 29 Jun 2021 21:39:28 -0400
Subject: [PATCH] sql: implement unique index validation inside the new schema changer

Previously, the unique index validation inside the new schema changer
was stubbed out. This was inadequate because, during index addition
operations, we were not confirming that the unique constraints actually
held. To address this, this patch adds support for the validation logic
in the new schema changer.

Release note: None
---
 pkg/ccl/backupccl/restore_job.go              |   8 +-
 pkg/sql/backfill.go                           |  39 +++----
 pkg/sql/check.go                              |   3 +-
 pkg/sql/conn_executor.go                      |   3 +-
 pkg/sql/schema_change_plan_node.go            |   3 +-
 pkg/sql/schemachanger/scexec/BUILD.bazel      |   2 +
 pkg/sql/schemachanger/scexec/executor.go      | 109 ++++++++++++++----
 .../scexec/executor_external_test.go          | 101 ++++++++++------
 pkg/sql/schemachanger/scjob/job.go            |   2 +-
 pkg/sql/sqlutil/BUILD.bazel                   |   1 +
 pkg/sql/sqlutil/internal_executor.go          |  24 ++++
 11 files changed, 205 insertions(+), 90 deletions(-)

diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 26bba49e2297..67053bbfaf09 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -42,6 +42,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/storage/cloud" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -1765,7 +1767,7 @@ func revalidateIndexes( // We don't actually need the 'historical' read the way the schema change does // since our table is offline. - var runner sql.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sql.InternalExecFn) error { + var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error { return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor) return fn(ctx, txn, ie) @@ -1793,7 +1795,7 @@ func revalidateIndexes( } } if len(forward) > 0 { - if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true); err != nil { + if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true, sessiondata.InternalExecutorOverride{}); err != nil { if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) { invalidIndexes[tableDesc.ID] = invalid.Indexes } else { return err } } } if len(inverted) > 0 { - if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true); err != nil { + if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true, sessiondata.InternalExecutorOverride{}); err != nil { if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) { invalidIndexes[tableDesc.ID] = append(invalidIndexes[tableDesc.ID], invalid.Indexes...)
} else { diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 5567b0b96350..6a315ac4d00f 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -146,26 +147,18 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor return runner } -// InternalExecFn is the type of functions that operates using an internalExecutor. -type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error - -// HistoricalInternalExecTxnRunner is like historicalTxnRunner except it only -// passes the fn the exported InternalExecutor instead of the whole unexported -// extendedEvalContenxt, so it can be implemented outside pkg/sql. -type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error - // makeFixedTimestampRunner creates a HistoricalTxnRunner suitable for use by the helpers. func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner( readAsOf hlc.Timestamp, -) HistoricalInternalExecTxnRunner { - runner := func(ctx context.Context, retryable InternalExecFn) error { +) sqlutil.HistoricalInternalExecTxnRunner { + runner := func(ctx context.Context, retryable sqlutil.InternalExecFn) error { return sc.fixedTimestampTxn(ctx, readAsOf, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { // We need to re-create the evalCtx since the txn may retry. ie := createSchemaChangeEvalCtx( ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors, - ).InternalExecutor.(*InternalExecutor) + ).InternalExecutor.(sqlutil.InternalExecutor) return retryable(ctx, txn, ie) }) } @@ -1506,12 +1499,12 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error { if len(forwardIndexes) > 0 { grp.GoCtx(func(ctx context.Context) error { - return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */) + return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{}) }) } if len(invertedIndexes) > 0 { grp.GoCtx(func(ctx context.Context) error { - return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */) + return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{}) }) } if err := grp.Wait(); err != nil { @@ -1542,8 +1535,9 @@ func ValidateInvertedIndexes( codec keys.SQLCodec, tableDesc catalog.TableDescriptor, indexes []catalog.Index, - runHistoricalTxn HistoricalInternalExecTxnRunner, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, ) error { grp := ctxgroup.WithContext(ctx) invalid := make(chan descpb.IndexID, len(indexes)) @@ -1567,7 +1561,7 @@ func ValidateInvertedIndexes( span := tableDesc.IndexSpan(codec, idx.GetID()) key := span.Key endKey := span.EndKey - if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ *InternalExecutor) error { + if err := 
runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ sqlutil.InternalExecutor) error { for { kvs, err := txn.Scan(ctx, key, endKey, 1000000) if err != nil { @@ -1631,7 +1625,7 @@ func ValidateInvertedIndexes( colNameOrExpr = fmt.Sprintf("%q", col.ColName()) } - if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error { + if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { var stmt string geoConfig := idx.GetGeoConfig() if geoindex.IsEmptyConfig(&geoConfig) { @@ -1651,7 +1645,7 @@ func ValidateInvertedIndexes( stmt = fmt.Sprintf(`%s WHERE %s`, stmt, idx.GetPredicate()) } return ie.WithSyntheticDescriptors([]catalog.Descriptor{tableDesc}, func() error { - row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, sessiondata.InternalExecutorOverride{}, stmt) + row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, execOverride, stmt) if err != nil { return err } @@ -1700,9 +1694,10 @@ func ValidateForwardIndexes( ctx context.Context, tableDesc catalog.TableDescriptor, indexes []catalog.Index, - runHistoricalTxn HistoricalInternalExecTxnRunner, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, withFirstMutationPublic bool, gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, ) error { grp := ctxgroup.WithContext(ctx) @@ -1764,7 +1759,7 @@ func ValidateForwardIndexes( // Retrieve the row count in the index. var idxLen int64 - if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error { + if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { query := fmt.Sprintf(`SELECT count(1) FROM [%d AS t]@[%d]`, desc.GetID(), idx.GetID()) // If the index is a partial index the predicate must be added // as a filter to the query to force scanning the index. @@ -1773,7 +1768,7 @@ func ValidateForwardIndexes( } return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error { - row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, sessiondata.InternalExecutorOverride{}, query) + row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, execOverride, query) if err != nil { return err } @@ -1856,7 +1851,7 @@ func ValidateForwardIndexes( } // Count the number of rows in the table. 
- if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error { + if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { var s strings.Builder for _, idx := range indexes { // For partial indexes, count the number of rows in the table @@ -1872,7 +1867,7 @@ func ValidateForwardIndexes( query := fmt.Sprintf(`SELECT count(1)%s FROM [%d AS t]@[%d]`, partialIndexCounts, desc.GetID(), desc.GetPrimaryIndexID()) return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error { - cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, sessiondata.InternalExecutorOverride{}, query) + cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, execOverride, query) if err != nil { return err } diff --git a/pkg/sql/check.go b/pkg/sql/check.go index f89093109182..efb1139b8407 100644 --- a/pkg/sql/check.go +++ b/pkg/sql/check.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -370,7 +371,7 @@ func validateUniqueConstraint( constraintName string, columnIDs []descpb.ColumnID, pred string, - ie *InternalExecutor, + ie sqlutil.InternalExecutor, txn *kv.Txn, ) error { query, colNames, err := duplicateRowQuery( diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index f929b69b93b8..d1522b9fbf98 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2744,7 +2744,8 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error { executor := scexec.NewExecutor( ex.planner.txn, &ex.extraTxnState.descCollection, ex.server.cfg.Codec, nil /* backfiller */, nil /* jobTracker */, ex.server.cfg.NewSchemaChangerTestingKnobs, - ex.server.cfg.JobRegistry, ex.planner.execCfg.InternalExecutor,ex.server.cfg.Settings, ex.planner.EvalContext(), + ex.server.cfg.JobRegistry, ex.planner.execCfg.InternalExecutor, ex.server.cfg.Settings, ex.planner.EvalContext(), + ValidateForwardIndexes, ValidateInvertedIndexes, ) after, err := runNewSchemaChanger( ctx, diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index b83f5c8f838a..cf6f3aa25b8d 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -122,7 +122,8 @@ func (s *schemaChangePlanNode) startExec(params runParams) error { scs := p.extendedEvalCtx.SchemaChangerState executor := scexec.NewExecutor(p.txn, p.Descriptors(), p.EvalContext().Codec, nil /* backfiller */, nil /* jobTracker */, p.ExecCfg().NewSchemaChangerTestingKnobs, - params.extendedEvalCtx.ExecCfg.JobRegistry, params.p.execCfg.InternalExecutor, p.ExecCfg().Settings, p.EvalContext()) + params.extendedEvalCtx.ExecCfg.JobRegistry, params.p.execCfg.InternalExecutor, p.ExecCfg().Settings, p.EvalContext(), + ValidateForwardIndexes, ValidateInvertedIndexes) after, err := runNewSchemaChanger( params.ctx, scplan.StatementPhase, s.plannedState, executor, scs.stmts, ) diff --git a/pkg/sql/schemachanger/scexec/BUILD.bazel b/pkg/sql/schemachanger/scexec/BUILD.bazel index dcd40991732d..3763488ec593 100644 --- a/pkg/sql/schemachanger/scexec/BUILD.bazel +++ b/pkg/sql/schemachanger/scexec/BUILD.bazel @@ -49,6 +49,7 @@ go_test( deps = [ ":scexec", "//pkg/base", + "//pkg/keys", "//pkg/kv", "//pkg/security", 
"//pkg/security/securitytest", @@ -67,6 +68,7 @@ go_test( "//pkg/sql/schemachanger/scpb", "//pkg/sql/schemachanger/scplan", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/sql/sqlutil", "//pkg/sql/types", diff --git a/pkg/sql/schemachanger/scexec/executor.go b/pkg/sql/schemachanger/scexec/executor.go index eb0051566345..46f8c70ab257 100644 --- a/pkg/sql/schemachanger/scexec/executor.go +++ b/pkg/sql/schemachanger/scexec/executor.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -28,24 +29,51 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) +// ValidateForwardIndexesFn call back function used to validate +// a set of forward indexes. +type ValidateForwardIndexesFn func( + ctx context.Context, + tableDesc catalog.TableDescriptor, + indexes []catalog.Index, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, + withFirstMutationPublic bool, + gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, +) error + +// ValidateInvertedIndexesFn call back function used to validate +// a set of inverted indexes. +type ValidateInvertedIndexesFn func( + ctx context.Context, + codec keys.SQLCodec, + tableDesc catalog.TableDescriptor, + indexes []catalog.Index, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, + gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, +) error + // An Executor executes ops generated during planning. It mostly holds // dependencies for execution and has little additional logic of its own. type Executor struct { - txn *kv.Txn - descsCollection *descs.Collection - codec keys.SQLCodec - indexBackfiller IndexBackfiller - jobTracker JobProgressTracker - testingKnobs *NewSchemaChangerTestingKnobs - jobRegistry *jobs.Registry - executor sqlutil.InternalExecutor - settings *cluster.Settings - evalCtx *tree.EvalContext + txn *kv.Txn + descsCollection *descs.Collection + codec keys.SQLCodec + indexBackfiller IndexBackfiller + jobTracker JobProgressTracker + testingKnobs *NewSchemaChangerTestingKnobs + jobRegistry *jobs.Registry + executor sqlutil.InternalExecutor + settings *cluster.Settings + evalCtx *tree.EvalContext + validateForwardIndexes ValidateForwardIndexesFn + validateInvertedIndexes ValidateInvertedIndexesFn } // NewExecutor creates a new Executor. 
@@ -60,18 +88,22 @@ func NewExecutor( executor sqlutil.InternalExecutor, settings *cluster.Settings, evalCtx *tree.EvalContext, + validateForwardIndexes ValidateForwardIndexesFn, + validateInvertedIndexes ValidateInvertedIndexesFn, ) *Executor { return &Executor{ - txn: txn, - descsCollection: descsCollection, - codec: codec, - indexBackfiller: backfiller, - jobTracker: tracker, - testingKnobs: testingKnobs, - jobRegistry: jobRegistry, - executor: executor, - settings: settings, - evalCtx: evalCtx, + txn: txn, + descsCollection: descsCollection, + codec: codec, + indexBackfiller: backfiller, + jobTracker: tracker, + testingKnobs: testingKnobs, + jobRegistry: jobRegistry, + executor: executor, + settings: settings, + evalCtx: evalCtx, + validateForwardIndexes: validateForwardIndexes, + validateInvertedIndexes: validateInvertedIndexes, } } @@ -119,7 +151,39 @@ func (ex *Executor) ExecuteOps( } func (ex *Executor) executeValidationOps(ctx context.Context, execute []scop.Op) error { - log.Errorf(ctx, "not implemented") + for _, op := range execute { + switch op := op.(type) { + case *scop.ValidateUniqueIndex: + table, err := ex.descsCollection.GetImmutableTableByID(ctx, ex.txn, op.TableID, tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc)) + if err != nil { + return err + } + index, err := table.FindIndexWithID(op.IndexID) + if err != nil { + return err + } + // Setup a new transaction with the current timestamp. + txnRunner := func(ctx context.Context, fn sqlutil.InternalExecFn) error { + validationTxn := ex.txn.DB().NewTxn(ctx, "validation") + validationTxn.SetFixedTimestamp(ctx, ex.txn.DB().Clock().Now()) + return fn(ctx, validationTxn, ex.executor) + } + // Execute the validation operation as a root user. + execOverride := sessiondata.InternalExecutorOverride{ + User: security.RootUserName(), + } + if index.GetType() == descpb.IndexDescriptor_FORWARD { + err = ex.validateForwardIndexes(ctx, table, []catalog.Index{index}, txnRunner, true, false, execOverride) + } else { + err = ex.validateInvertedIndexes(ctx, ex.codec, table, []catalog.Index{index}, txnRunner, false, execOverride) + } + return err + case *scop.ValidateCheckConstraint: + log.Errorf(ctx, "not implemented") + default: + panic("unimplemented") + } + } return nil } @@ -284,9 +348,6 @@ func UpdateDescriptorJobIDs( if desc.DescriptorType() != catalog.Table { continue } - if err != nil { - return err - } // Currently all "locking" schema changes are on tables. This will probably // need to be expanded at least to types. 
table, err := descriptors.GetMutableTableByID(ctx, txn, id, diff --git a/pkg/sql/schemachanger/scexec/executor_external_test.go b/pkg/sql/schemachanger/scexec/executor_external_test.go index 072eeeef605b..3d96b7b29bad 100644 --- a/pkg/sql/schemachanger/scexec/executor_external_test.go +++ b/pkg/sql/schemachanger/scexec/executor_external_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -32,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -126,7 +128,7 @@ CREATE TABLE db.t ( require.NoError(t, ti.txn(ctx, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { - ex := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), nil, nil, nil, nil, nil, nil, nil) + ex := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), nil, nil, nil, nil, nil, nil, nil, nil, nil) _, orig, err := descriptors.GetImmutableTableByName(ctx, txn, &tn, immFlags) require.NoError(t, err) require.Equal(t, c.orig(), orig) @@ -139,10 +141,13 @@ CREATE TABLE db.t ( } indexToAdd := descpb.IndexDescriptor{ - ID: 2, - Name: "foo", - KeyColumnIDs: []descpb.ColumnID{1}, - KeyColumnNames: []string{"i"}, + ID: 2, + Name: "foo", + Version: descpb.StrictIndexColumnIDGuaranteesVersion, + CreatedExplicitly: true, + KeyColumnIDs: []descpb.ColumnID{1}, + KeyColumnNames: []string{"i"}, + StoreColumnNames: []string{}, KeyColumnDirections: []descpb.IndexDescriptor_Direction{ descpb.IndexDescriptor_ASC, }, @@ -167,8 +172,12 @@ CREATE TABLE db.t ( ops: func() scop.Ops { return scop.MakeOps( &scop.MakeAddedIndexDeleteOnly{ - TableID: table.ID, - Index: indexToAdd, + TableID: table.ID, + IndexID: indexToAdd.ID, + IndexName: indexToAdd.Name, + KeyColumnIDs: indexToAdd.KeyColumnIDs, + KeyColumnDirections: indexToAdd.KeyColumnDirections, + SecondaryIndex: true, }, ) }, @@ -212,6 +221,29 @@ CREATE TABLE db.t ( func TestSchemaChanger(t *testing.T) { defer leaktest.AfterTest(t)() + // Dummy functions for validating indexes / inverted indexes + dummyValidateIndexes := func( + ctx context.Context, + tableDesc catalog.TableDescriptor, + indexes []catalog.Index, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, + withFirstMutationPublic bool, + gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, + ) error { + return nil + } + dummyValidateInvertedIndexes := func( + ctx context.Context, + codec keys.SQLCodec, + tableDesc catalog.TableDescriptor, + indexes []catalog.Index, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, + gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, + ) error { + return nil + } ctx := context.Background() t.Run("add column", func(t *testing.T) { ti := setupTestInfra(t) @@ -234,21 +266,14 @@ func TestSchemaChanger(t *testing.T) { // targetSlice = []*scpb.Target{ scpb.NewTarget(scpb.Target_ADD, &scpb.PrimaryIndex{ - TableID: fooTable.GetID(), - Index: descpb.IndexDescriptor{ - Name: "new_primary_key", - ID: 2, - KeyColumnIDs: []descpb.ColumnID{1}, - 
KeyColumnNames: []string{"i"}, - KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC}, - StoreColumnIDs: []descpb.ColumnID{2}, - StoreColumnNames: []string{"j"}, - Unique: true, - Type: descpb.IndexDescriptor_FORWARD, - Version: descpb.PrimaryIndexWithStoredColumnsVersion, - EncodingType: descpb.PrimaryIndexEncoding, - }, - OtherPrimaryIndexID: fooTable.GetPrimaryIndexID(), + TableID: fooTable.GetID(), + IndexName: "new_primary_key", + IndexId: 2, + KeyColumnIDs: []descpb.ColumnID{1}, + KeyColumnDirections: []scpb.PrimaryIndex_Direction{scpb.PrimaryIndex_ASC}, + StoringColumnIDs: []descpb.ColumnID{2}, + Unique: true, + Inverted: false, }), scpb.NewTarget(scpb.Target_ADD, &scpb.Column{ TableID: fooTable.GetID(), @@ -263,17 +288,13 @@ func TestSchemaChanger(t *testing.T) { }, }), scpb.NewTarget(scpb.Target_DROP, &scpb.PrimaryIndex{ - TableID: fooTable.GetID(), - Index: descpb.IndexDescriptor{ - Name: "primary", - ID: 1, - KeyColumnIDs: []descpb.ColumnID{1}, - KeyColumnNames: []string{"i"}, - KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC}, - Unique: true, - Type: descpb.IndexDescriptor_FORWARD, - }, - OtherPrimaryIndexID: 2, + TableID: fooTable.GetID(), + IndexName: "primary", + IndexId: 1, + KeyColumnIDs: []descpb.ColumnID{1}, + KeyColumnDirections: []scpb.PrimaryIndex_Direction{scpb.PrimaryIndex_ASC}, + Unique: true, + Inverted: false, }), } @@ -313,6 +334,8 @@ func TestSchemaChanger(t *testing.T) { nil, nil, nil, + nil, + nil, ) require.NoError(t, exec.ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{})) ts = s.After @@ -329,7 +352,12 @@ func TestSchemaChanger(t *testing.T) { }) require.NoError(t, err) for _, s := range sc.Stages { - exec := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil) + exec := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil, + func(ctx context.Context, tableDesc catalog.TableDescriptor, indexes []catalog.Index, runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, withFirstMutationPublic bool, gatherAllInvalid bool, execOverride sessiondata.InternalExecutorOverride) error { + return nil + }, func(ctx context.Context, codec keys.SQLCodec, tableDesc catalog.TableDescriptor, indexes []catalog.Index, runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, gatherAllInvalid bool, execOverride sessiondata.InternalExecutorOverride) error { + return nil + }) require.NoError(t, exec.ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{})) after = s.After } @@ -389,7 +417,6 @@ func TestSchemaChanger(t *testing.T) { require.Len(t, parsed, 1) outputNodes, err := scbuild.Build(ctx, buildDeps, nil, parsed[0].AST.(*tree.AlterTable)) require.NoError(t, err) - for _, phase := range []scplan.Phase{ scplan.StatementPhase, scplan.PreCommitPhase, @@ -399,7 +426,7 @@ func TestSchemaChanger(t *testing.T) { }) require.NoError(t, err) for _, s := range sc.Stages { - require.NoError(t, scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil). + require.NoError(t, scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil, dummyValidateIndexes, dummyValidateInvertedIndexes). 
ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{})) ts = s.After } @@ -414,7 +441,7 @@ func TestSchemaChanger(t *testing.T) { }) require.NoError(t, err) for _, s := range sc.Stages { - exec := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil) + exec := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil, dummyValidateIndexes, dummyValidateInvertedIndexes) require.NoError(t, exec.ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{})) } return nil diff --git a/pkg/sql/schemachanger/scjob/job.go b/pkg/sql/schemachanger/scjob/job.go index c283d3a50ee8..1b7839c40d6c 100644 --- a/pkg/sql/schemachanger/scjob/job.go +++ b/pkg/sql/schemachanger/scjob/job.go @@ -117,7 +117,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{ if err := scexec.NewExecutor( txn, descriptors, execCtx.ExecCfg().Codec, execCtx.ExecCfg().IndexBackfiller, jt, execCtx.ExecCfg().NewSchemaChangerTestingKnobs, execCtx.ExecCfg().JobRegistry, - execCtx.ExecCfg().InternalExecutor, execCtx.ExecCfg().Settings, &execCtx.ExtendedEvalContext().EvalContext).ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{ + execCtx.ExecCfg().InternalExecutor, execCtx.ExecCfg().Settings, &execCtx.ExtendedEvalContext().EvalContext, sql.ValidateForwardIndexes, sql.ValidateInvertedIndexes).ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{ Statements: n.job.Payload().Statement, Phase: scplan.PostCommitPhase, }); err != nil { diff --git a/pkg/sql/sqlutil/BUILD.bazel b/pkg/sql/sqlutil/BUILD.bazel index ccf251c5eb7d..234ce51cb482 100644 --- a/pkg/sql/sqlutil/BUILD.bazel +++ b/pkg/sql/sqlutil/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv", + "//pkg/sql/catalog", "//pkg/sql/catalog/colinfo", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index aa0d2f7d5253..fedcc0d28440 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -142,6 +143,21 @@ type InternalExecutor interface { stmt string, qargs ...interface{}, ) (InternalRows, error) + + // WithSyntheticDescriptors sets the synthetic descriptors before running the + // the provided closure and resets them afterward. Used for queries/statements + // that need to use in-memory synthetic descriptors different from descriptors + // written to disk. These descriptors override all other descriptors on the + // immutable resolution path. + // + // Warning: Not safe for concurrent use from multiple goroutines. This API is + // flawed in that the internal executor is meant to function as a stateless + // wrapper, and creates a new connExecutor and descs.Collection on each query/ + // statement, so these descriptors should really be specified at a per-query/ + // statement level. See #34304. 
+ WithSyntheticDescriptors( + descs []catalog.Descriptor, run func() error, + ) error } // InternalRows is an iterator interface that's exposed by the internal @@ -180,3 +196,11 @@ type SessionBoundInternalExecutorFactory func( context.Context, *sessiondata.SessionData, ) InternalExecutor + +// InternalExecFn is the type of functions that operate using an internalExecutor. +type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor) error + +// HistoricalInternalExecTxnRunner is like historicalTxnRunner except it only +// passes the fn the exported InternalExecutor instead of the whole unexported +// extendedEvalContext, so it can be implemented outside pkg/sql. +type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error