diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 26bba49e2297..67053bbfaf09 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -42,6 +42,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/storage/cloud" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -1765,7 +1767,7 @@ func revalidateIndexes( // We don't actually need the 'historical' read the way the schema change does // since our table is offline. - var runner sql.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sql.InternalExecFn) error { + var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error { return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor) return fn(ctx, txn, ie) @@ -1793,7 +1795,7 @@ func revalidateIndexes( } } if len(forward) > 0 { - if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true); err != nil { + if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true, sessiondata.InternalExecutorOverride{}); err != nil { if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) { invalidIndexes[tableDesc.ID] = invalid.Indexes } else { @@ -1802,7 +1804,7 @@ func revalidateIndexes( } } if len(inverted) > 0 { - if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true); err != nil { + if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true, sessiondata.InternalExecutorOverride{}); err != nil { if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) { invalidIndexes[tableDesc.ID] = append(invalidIndexes[tableDesc.ID], invalid.Indexes...) } else { diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 5567b0b96350..6a315ac4d00f 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -146,26 +147,18 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor return runner } -// InternalExecFn is the type of functions that operates using an internalExecutor. -type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error - -// HistoricalInternalExecTxnRunner is like historicalTxnRunner except it only -// passes the fn the exported InternalExecutor instead of the whole unexported -// extendedEvalContenxt, so it can be implemented outside pkg/sql. -type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error - // makeFixedTimestampRunner creates a HistoricalTxnRunner suitable for use by the helpers. func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner( readAsOf hlc.Timestamp, -) HistoricalInternalExecTxnRunner { - runner := func(ctx context.Context, retryable InternalExecFn) error { +) sqlutil.HistoricalInternalExecTxnRunner { + runner := func(ctx context.Context, retryable sqlutil.InternalExecFn) error { return sc.fixedTimestampTxn(ctx, readAsOf, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { // We need to re-create the evalCtx since the txn may retry. ie := createSchemaChangeEvalCtx( ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors, - ).InternalExecutor.(*InternalExecutor) + ).InternalExecutor.(sqlutil.InternalExecutor) return retryable(ctx, txn, ie) }) } @@ -1506,12 +1499,12 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error { if len(forwardIndexes) > 0 { grp.GoCtx(func(ctx context.Context) error { - return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */) + return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{}) }) } if len(invertedIndexes) > 0 { grp.GoCtx(func(ctx context.Context) error { - return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */) + return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{}) }) } if err := grp.Wait(); err != nil { @@ -1542,8 +1535,9 @@ func ValidateInvertedIndexes( codec keys.SQLCodec, tableDesc catalog.TableDescriptor, indexes []catalog.Index, - runHistoricalTxn HistoricalInternalExecTxnRunner, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, ) error { grp := ctxgroup.WithContext(ctx) invalid := make(chan descpb.IndexID, len(indexes)) @@ -1567,7 +1561,7 @@ func ValidateInvertedIndexes( span := tableDesc.IndexSpan(codec, idx.GetID()) key := span.Key endKey := span.EndKey - if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ *InternalExecutor) error { + if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ sqlutil.InternalExecutor) error { for { kvs, err := txn.Scan(ctx, key, endKey, 1000000) if err != nil { @@ -1631,7 +1625,7 @@ func ValidateInvertedIndexes( colNameOrExpr = fmt.Sprintf("%q", col.ColName()) } - if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error { + if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { var stmt string geoConfig := idx.GetGeoConfig() if geoindex.IsEmptyConfig(&geoConfig) { @@ -1651,7 +1645,7 @@ func ValidateInvertedIndexes( stmt = fmt.Sprintf(`%s WHERE %s`, stmt, idx.GetPredicate()) } return ie.WithSyntheticDescriptors([]catalog.Descriptor{tableDesc}, func() error { - row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, sessiondata.InternalExecutorOverride{}, stmt) + row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, execOverride, stmt) if err != nil { return err } @@ -1700,9 +1694,10 @@ func ValidateForwardIndexes( ctx context.Context, tableDesc catalog.TableDescriptor, indexes []catalog.Index, - runHistoricalTxn HistoricalInternalExecTxnRunner, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, withFirstMutationPublic bool, gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, ) error { grp := ctxgroup.WithContext(ctx) @@ -1764,7 +1759,7 @@ func ValidateForwardIndexes( // Retrieve the row count in the index. var idxLen int64 - if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error { + if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { query := fmt.Sprintf(`SELECT count(1) FROM [%d AS t]@[%d]`, desc.GetID(), idx.GetID()) // If the index is a partial index the predicate must be added // as a filter to the query to force scanning the index. @@ -1773,7 +1768,7 @@ func ValidateForwardIndexes( } return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error { - row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, sessiondata.InternalExecutorOverride{}, query) + row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, execOverride, query) if err != nil { return err } @@ -1856,7 +1851,7 @@ func ValidateForwardIndexes( } // Count the number of rows in the table. - if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error { + if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { var s strings.Builder for _, idx := range indexes { // For partial indexes, count the number of rows in the table @@ -1872,7 +1867,7 @@ func ValidateForwardIndexes( query := fmt.Sprintf(`SELECT count(1)%s FROM [%d AS t]@[%d]`, partialIndexCounts, desc.GetID(), desc.GetPrimaryIndexID()) return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error { - cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, sessiondata.InternalExecutorOverride{}, query) + cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, execOverride, query) if err != nil { return err } diff --git a/pkg/sql/check.go b/pkg/sql/check.go index f89093109182..efb1139b8407 100644 --- a/pkg/sql/check.go +++ b/pkg/sql/check.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -370,7 +371,7 @@ func validateUniqueConstraint( constraintName string, columnIDs []descpb.ColumnID, pred string, - ie *InternalExecutor, + ie sqlutil.InternalExecutor, txn *kv.Txn, ) error { query, colNames, err := duplicateRowQuery( diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index f929b69b93b8..d1522b9fbf98 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2744,7 +2744,8 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error { executor := scexec.NewExecutor( ex.planner.txn, &ex.extraTxnState.descCollection, ex.server.cfg.Codec, nil /* backfiller */, nil /* jobTracker */, ex.server.cfg.NewSchemaChangerTestingKnobs, - ex.server.cfg.JobRegistry, ex.planner.execCfg.InternalExecutor,ex.server.cfg.Settings, ex.planner.EvalContext(), + ex.server.cfg.JobRegistry, ex.planner.execCfg.InternalExecutor, ex.server.cfg.Settings, ex.planner.EvalContext(), + ValidateForwardIndexes, ValidateInvertedIndexes, ) after, err := runNewSchemaChanger( ctx, diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index b83f5c8f838a..cf6f3aa25b8d 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -122,7 +122,8 @@ func (s *schemaChangePlanNode) startExec(params runParams) error { scs := p.extendedEvalCtx.SchemaChangerState executor := scexec.NewExecutor(p.txn, p.Descriptors(), p.EvalContext().Codec, nil /* backfiller */, nil /* jobTracker */, p.ExecCfg().NewSchemaChangerTestingKnobs, - params.extendedEvalCtx.ExecCfg.JobRegistry, params.p.execCfg.InternalExecutor, p.ExecCfg().Settings, p.EvalContext()) + params.extendedEvalCtx.ExecCfg.JobRegistry, params.p.execCfg.InternalExecutor, p.ExecCfg().Settings, p.EvalContext(), + ValidateForwardIndexes, ValidateInvertedIndexes) after, err := runNewSchemaChanger( params.ctx, scplan.StatementPhase, s.plannedState, executor, scs.stmts, ) diff --git a/pkg/sql/schemachanger/scexec/BUILD.bazel b/pkg/sql/schemachanger/scexec/BUILD.bazel index dcd40991732d..3763488ec593 100644 --- a/pkg/sql/schemachanger/scexec/BUILD.bazel +++ b/pkg/sql/schemachanger/scexec/BUILD.bazel @@ -49,6 +49,7 @@ go_test( deps = [ ":scexec", "//pkg/base", + "//pkg/keys", "//pkg/kv", "//pkg/security", "//pkg/security/securitytest", @@ -67,6 +68,7 @@ go_test( "//pkg/sql/schemachanger/scpb", "//pkg/sql/schemachanger/scplan", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/sql/sqlutil", "//pkg/sql/types", diff --git a/pkg/sql/schemachanger/scexec/executor.go b/pkg/sql/schemachanger/scexec/executor.go index eb0051566345..46f8c70ab257 100644 --- a/pkg/sql/schemachanger/scexec/executor.go +++ b/pkg/sql/schemachanger/scexec/executor.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -28,24 +29,51 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) +// ValidateForwardIndexesFn call back function used to validate +// a set of forward indexes. +type ValidateForwardIndexesFn func( + ctx context.Context, + tableDesc catalog.TableDescriptor, + indexes []catalog.Index, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, + withFirstMutationPublic bool, + gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, +) error + +// ValidateInvertedIndexesFn call back function used to validate +// a set of inverted indexes. +type ValidateInvertedIndexesFn func( + ctx context.Context, + codec keys.SQLCodec, + tableDesc catalog.TableDescriptor, + indexes []catalog.Index, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, + gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, +) error + // An Executor executes ops generated during planning. It mostly holds // dependencies for execution and has little additional logic of its own. type Executor struct { - txn *kv.Txn - descsCollection *descs.Collection - codec keys.SQLCodec - indexBackfiller IndexBackfiller - jobTracker JobProgressTracker - testingKnobs *NewSchemaChangerTestingKnobs - jobRegistry *jobs.Registry - executor sqlutil.InternalExecutor - settings *cluster.Settings - evalCtx *tree.EvalContext + txn *kv.Txn + descsCollection *descs.Collection + codec keys.SQLCodec + indexBackfiller IndexBackfiller + jobTracker JobProgressTracker + testingKnobs *NewSchemaChangerTestingKnobs + jobRegistry *jobs.Registry + executor sqlutil.InternalExecutor + settings *cluster.Settings + evalCtx *tree.EvalContext + validateForwardIndexes ValidateForwardIndexesFn + validateInvertedIndexes ValidateInvertedIndexesFn } // NewExecutor creates a new Executor. @@ -60,18 +88,22 @@ func NewExecutor( executor sqlutil.InternalExecutor, settings *cluster.Settings, evalCtx *tree.EvalContext, + validateForwardIndexes ValidateForwardIndexesFn, + validateInvertedIndexes ValidateInvertedIndexesFn, ) *Executor { return &Executor{ - txn: txn, - descsCollection: descsCollection, - codec: codec, - indexBackfiller: backfiller, - jobTracker: tracker, - testingKnobs: testingKnobs, - jobRegistry: jobRegistry, - executor: executor, - settings: settings, - evalCtx: evalCtx, + txn: txn, + descsCollection: descsCollection, + codec: codec, + indexBackfiller: backfiller, + jobTracker: tracker, + testingKnobs: testingKnobs, + jobRegistry: jobRegistry, + executor: executor, + settings: settings, + evalCtx: evalCtx, + validateForwardIndexes: validateForwardIndexes, + validateInvertedIndexes: validateInvertedIndexes, } } @@ -119,7 +151,39 @@ func (ex *Executor) ExecuteOps( } func (ex *Executor) executeValidationOps(ctx context.Context, execute []scop.Op) error { - log.Errorf(ctx, "not implemented") + for _, op := range execute { + switch op := op.(type) { + case *scop.ValidateUniqueIndex: + table, err := ex.descsCollection.GetImmutableTableByID(ctx, ex.txn, op.TableID, tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc)) + if err != nil { + return err + } + index, err := table.FindIndexWithID(op.IndexID) + if err != nil { + return err + } + // Setup a new transaction with the current timestamp. + txnRunner := func(ctx context.Context, fn sqlutil.InternalExecFn) error { + validationTxn := ex.txn.DB().NewTxn(ctx, "validation") + validationTxn.SetFixedTimestamp(ctx, ex.txn.DB().Clock().Now()) + return fn(ctx, validationTxn, ex.executor) + } + // Execute the validation operation as a root user. + execOverride := sessiondata.InternalExecutorOverride{ + User: security.RootUserName(), + } + if index.GetType() == descpb.IndexDescriptor_FORWARD { + err = ex.validateForwardIndexes(ctx, table, []catalog.Index{index}, txnRunner, true, false, execOverride) + } else { + err = ex.validateInvertedIndexes(ctx, ex.codec, table, []catalog.Index{index}, txnRunner, false, execOverride) + } + return err + case *scop.ValidateCheckConstraint: + log.Errorf(ctx, "not implemented") + default: + panic("unimplemented") + } + } return nil } @@ -284,9 +348,6 @@ func UpdateDescriptorJobIDs( if desc.DescriptorType() != catalog.Table { continue } - if err != nil { - return err - } // Currently all "locking" schema changes are on tables. This will probably // need to be expanded at least to types. table, err := descriptors.GetMutableTableByID(ctx, txn, id, diff --git a/pkg/sql/schemachanger/scexec/executor_external_test.go b/pkg/sql/schemachanger/scexec/executor_external_test.go index 072eeeef605b..3d96b7b29bad 100644 --- a/pkg/sql/schemachanger/scexec/executor_external_test.go +++ b/pkg/sql/schemachanger/scexec/executor_external_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -32,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -126,7 +128,7 @@ CREATE TABLE db.t ( require.NoError(t, ti.txn(ctx, func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { - ex := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), nil, nil, nil, nil, nil, nil, nil) + ex := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), nil, nil, nil, nil, nil, nil, nil, nil, nil) _, orig, err := descriptors.GetImmutableTableByName(ctx, txn, &tn, immFlags) require.NoError(t, err) require.Equal(t, c.orig(), orig) @@ -139,10 +141,13 @@ CREATE TABLE db.t ( } indexToAdd := descpb.IndexDescriptor{ - ID: 2, - Name: "foo", - KeyColumnIDs: []descpb.ColumnID{1}, - KeyColumnNames: []string{"i"}, + ID: 2, + Name: "foo", + Version: descpb.StrictIndexColumnIDGuaranteesVersion, + CreatedExplicitly: true, + KeyColumnIDs: []descpb.ColumnID{1}, + KeyColumnNames: []string{"i"}, + StoreColumnNames: []string{}, KeyColumnDirections: []descpb.IndexDescriptor_Direction{ descpb.IndexDescriptor_ASC, }, @@ -167,8 +172,12 @@ CREATE TABLE db.t ( ops: func() scop.Ops { return scop.MakeOps( &scop.MakeAddedIndexDeleteOnly{ - TableID: table.ID, - Index: indexToAdd, + TableID: table.ID, + IndexID: indexToAdd.ID, + IndexName: indexToAdd.Name, + KeyColumnIDs: indexToAdd.KeyColumnIDs, + KeyColumnDirections: indexToAdd.KeyColumnDirections, + SecondaryIndex: true, }, ) }, @@ -212,6 +221,29 @@ CREATE TABLE db.t ( func TestSchemaChanger(t *testing.T) { defer leaktest.AfterTest(t)() + // Dummy functions for validating indexes / inverted indexes + dummyValidateIndexes := func( + ctx context.Context, + tableDesc catalog.TableDescriptor, + indexes []catalog.Index, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, + withFirstMutationPublic bool, + gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, + ) error { + return nil + } + dummyValidateInvertedIndexes := func( + ctx context.Context, + codec keys.SQLCodec, + tableDesc catalog.TableDescriptor, + indexes []catalog.Index, + runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, + gatherAllInvalid bool, + execOverride sessiondata.InternalExecutorOverride, + ) error { + return nil + } ctx := context.Background() t.Run("add column", func(t *testing.T) { ti := setupTestInfra(t) @@ -234,21 +266,14 @@ func TestSchemaChanger(t *testing.T) { // targetSlice = []*scpb.Target{ scpb.NewTarget(scpb.Target_ADD, &scpb.PrimaryIndex{ - TableID: fooTable.GetID(), - Index: descpb.IndexDescriptor{ - Name: "new_primary_key", - ID: 2, - KeyColumnIDs: []descpb.ColumnID{1}, - KeyColumnNames: []string{"i"}, - KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC}, - StoreColumnIDs: []descpb.ColumnID{2}, - StoreColumnNames: []string{"j"}, - Unique: true, - Type: descpb.IndexDescriptor_FORWARD, - Version: descpb.PrimaryIndexWithStoredColumnsVersion, - EncodingType: descpb.PrimaryIndexEncoding, - }, - OtherPrimaryIndexID: fooTable.GetPrimaryIndexID(), + TableID: fooTable.GetID(), + IndexName: "new_primary_key", + IndexId: 2, + KeyColumnIDs: []descpb.ColumnID{1}, + KeyColumnDirections: []scpb.PrimaryIndex_Direction{scpb.PrimaryIndex_ASC}, + StoringColumnIDs: []descpb.ColumnID{2}, + Unique: true, + Inverted: false, }), scpb.NewTarget(scpb.Target_ADD, &scpb.Column{ TableID: fooTable.GetID(), @@ -263,17 +288,13 @@ func TestSchemaChanger(t *testing.T) { }, }), scpb.NewTarget(scpb.Target_DROP, &scpb.PrimaryIndex{ - TableID: fooTable.GetID(), - Index: descpb.IndexDescriptor{ - Name: "primary", - ID: 1, - KeyColumnIDs: []descpb.ColumnID{1}, - KeyColumnNames: []string{"i"}, - KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC}, - Unique: true, - Type: descpb.IndexDescriptor_FORWARD, - }, - OtherPrimaryIndexID: 2, + TableID: fooTable.GetID(), + IndexName: "primary", + IndexId: 1, + KeyColumnIDs: []descpb.ColumnID{1}, + KeyColumnDirections: []scpb.PrimaryIndex_Direction{scpb.PrimaryIndex_ASC}, + Unique: true, + Inverted: false, }), } @@ -313,6 +334,8 @@ func TestSchemaChanger(t *testing.T) { nil, nil, nil, + nil, + nil, ) require.NoError(t, exec.ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{})) ts = s.After @@ -329,7 +352,12 @@ func TestSchemaChanger(t *testing.T) { }) require.NoError(t, err) for _, s := range sc.Stages { - exec := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil) + exec := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil, + func(ctx context.Context, tableDesc catalog.TableDescriptor, indexes []catalog.Index, runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, withFirstMutationPublic bool, gatherAllInvalid bool, execOverride sessiondata.InternalExecutorOverride) error { + return nil + }, func(ctx context.Context, codec keys.SQLCodec, tableDesc catalog.TableDescriptor, indexes []catalog.Index, runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner, gatherAllInvalid bool, execOverride sessiondata.InternalExecutorOverride) error { + return nil + }) require.NoError(t, exec.ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{})) after = s.After } @@ -389,7 +417,6 @@ func TestSchemaChanger(t *testing.T) { require.Len(t, parsed, 1) outputNodes, err := scbuild.Build(ctx, buildDeps, nil, parsed[0].AST.(*tree.AlterTable)) require.NoError(t, err) - for _, phase := range []scplan.Phase{ scplan.StatementPhase, scplan.PreCommitPhase, @@ -399,7 +426,7 @@ func TestSchemaChanger(t *testing.T) { }) require.NoError(t, err) for _, s := range sc.Stages { - require.NoError(t, scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil). + require.NoError(t, scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil, dummyValidateIndexes, dummyValidateInvertedIndexes). ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{})) ts = s.After } @@ -414,7 +441,7 @@ func TestSchemaChanger(t *testing.T) { }) require.NoError(t, err) for _, s := range sc.Stages { - exec := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil) + exec := scexec.NewExecutor(txn, descriptors, ti.lm.Codec(), noopBackfiller{}, nil, nil, nil, nil, nil, nil, dummyValidateIndexes, dummyValidateInvertedIndexes) require.NoError(t, exec.ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{})) } return nil diff --git a/pkg/sql/schemachanger/scjob/job.go b/pkg/sql/schemachanger/scjob/job.go index c283d3a50ee8..1b7839c40d6c 100644 --- a/pkg/sql/schemachanger/scjob/job.go +++ b/pkg/sql/schemachanger/scjob/job.go @@ -117,7 +117,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{ if err := scexec.NewExecutor( txn, descriptors, execCtx.ExecCfg().Codec, execCtx.ExecCfg().IndexBackfiller, jt, execCtx.ExecCfg().NewSchemaChangerTestingKnobs, execCtx.ExecCfg().JobRegistry, - execCtx.ExecCfg().InternalExecutor, execCtx.ExecCfg().Settings, &execCtx.ExtendedEvalContext().EvalContext).ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{ + execCtx.ExecCfg().InternalExecutor, execCtx.ExecCfg().Settings, &execCtx.ExtendedEvalContext().EvalContext, sql.ValidateForwardIndexes, sql.ValidateInvertedIndexes).ExecuteOps(ctx, s.Ops, scexec.TestingKnobMetadata{ Statements: n.job.Payload().Statement, Phase: scplan.PostCommitPhase, }); err != nil { diff --git a/pkg/sql/sqlutil/BUILD.bazel b/pkg/sql/sqlutil/BUILD.bazel index ccf251c5eb7d..234ce51cb482 100644 --- a/pkg/sql/sqlutil/BUILD.bazel +++ b/pkg/sql/sqlutil/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv", + "//pkg/sql/catalog", "//pkg/sql/catalog/colinfo", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index aa0d2f7d5253..fedcc0d28440 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -142,6 +143,21 @@ type InternalExecutor interface { stmt string, qargs ...interface{}, ) (InternalRows, error) + + // WithSyntheticDescriptors sets the synthetic descriptors before running the + // the provided closure and resets them afterward. Used for queries/statements + // that need to use in-memory synthetic descriptors different from descriptors + // written to disk. These descriptors override all other descriptors on the + // immutable resolution path. + // + // Warning: Not safe for concurrent use from multiple goroutines. This API is + // flawed in that the internal executor is meant to function as a stateless + // wrapper, and creates a new connExecutor and descs.Collection on each query/ + // statement, so these descriptors should really be specified at a per-query/ + // statement level. See #34304. + WithSyntheticDescriptors( + descs []catalog.Descriptor, run func() error, + ) error } // InternalRows is an iterator interface that's exposed by the internal @@ -180,3 +196,11 @@ type InternalRows interface { type SessionBoundInternalExecutorFactory func( context.Context, *sessiondata.SessionData, ) InternalExecutor + +// InternalExecFn is the type of functions that operates using an internalExecutor. +type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor) error + +// HistoricalInternalExecTxnRunner is like historicalTxnRunner except it only +// passes the fn the exported InternalExecutor instead of the whole unexported +// extendedEvalContenxt, so it can be implemented outside pkg/sql. +type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error