Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "sql: add support for create index inside new schema changer" #72584

Merged
merged 1 commit into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ ALL_TESTS = [
"//pkg/sql/schemachanger/rel:rel_test",
"//pkg/sql/schemachanger/scbuild:scbuild_test",
"//pkg/sql/schemachanger/scexec:scexec_test",
"//pkg/sql/schemachanger/scgraph:scgraph_test",
"//pkg/sql/schemachanger/scplan/deprules:deprules_test",
"//pkg/sql/schemachanger/scplan:scplan_test",
"//pkg/sql/schemachanger/screl:screl_test",
Expand Down
8 changes: 3 additions & 5 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
Expand Down Expand Up @@ -1800,7 +1798,7 @@ func revalidateIndexes(

// We don't actually need the 'historical' read the way the schema change does
// since our table is offline.
var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error {
var runner sql.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sql.InternalExecFn) error {
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
return fn(ctx, txn, ie)
Expand Down Expand Up @@ -1828,7 +1826,7 @@ func revalidateIndexes(
}
}
if len(forward) > 0 {
if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true, sessiondata.InternalExecutorOverride{}); err != nil {
if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = invalid.Indexes
} else {
Expand All @@ -1837,7 +1835,7 @@ func revalidateIndexes(
}
}
if len(inverted) > 0 {
if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true, sessiondata.InternalExecutorOverride{}); err != nil {
if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = append(invalidIndexes[tableDesc.ID], invalid.Indexes...)
} else {
Expand Down
13 changes: 0 additions & 13 deletions pkg/ccl/logictestccl/testdata/logic_test/new_schema_changer

This file was deleted.

1 change: 0 additions & 1 deletion pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
Expand Down
7 changes: 0 additions & 7 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand Down Expand Up @@ -802,9 +801,3 @@ func (ie *wrappedInternalExecutor) setErrFunc(f func(statement string) error) {
defer ie.mu.Unlock()
ie.mu.errFunc = f
}

func (ie *wrappedInternalExecutor) WithSyntheticDescriptors(
descs []catalog.Descriptor, run func() error,
) error {
panic("not implemented")
}
1 change: 0 additions & 1 deletion pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ go_library(
"schema_change_cluster_setting.go",
"schema_change_plan_node.go",
"schema_changer.go",
"schema_changer_ccl.go",
"schema_changer_metrics.go",
"schema_changer_state.go",
"scrub.go",
Expand Down
99 changes: 22 additions & 77 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -157,18 +155,26 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor
return runner
}

// InternalExecFn is the type of functions that operates using an internalExecutor.
type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error

// HistoricalInternalExecTxnRunner is like historicalTxnRunner except it only
// passes the fn the exported InternalExecutor instead of the whole unexported
// extendedEvalContenxt, so it can be implemented outside pkg/sql.
type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error

// makeFixedTimestampRunner creates a HistoricalTxnRunner suitable for use by the helpers.
func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
readAsOf hlc.Timestamp,
) sqlutil.HistoricalInternalExecTxnRunner {
runner := func(ctx context.Context, retryable sqlutil.InternalExecFn) error {
) HistoricalInternalExecTxnRunner {
runner := func(ctx context.Context, retryable InternalExecFn) error {
return sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := createSchemaChangeEvalCtx(
ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors,
).InternalExecutor.(sqlutil.InternalExecutor)
).InternalExecutor.(*InternalExecutor)
return retryable(ctx, txn, ie)
})
}
Expand Down Expand Up @@ -1414,12 +1420,12 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error {

if len(forwardIndexes) > 0 {
grp.GoCtx(func(ctx context.Context) error {
return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{})
return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */)
})
}
if len(invertedIndexes) > 0 {
grp.GoCtx(func(ctx context.Context) error {
return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{})
return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */)
})
}
if err := grp.Wait(); err != nil {
Expand Down Expand Up @@ -1450,9 +1456,8 @@ func ValidateInvertedIndexes(
codec keys.SQLCodec,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
runHistoricalTxn HistoricalInternalExecTxnRunner,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error {
grp := ctxgroup.WithContext(ctx)
invalid := make(chan descpb.IndexID, len(indexes))
Expand All @@ -1476,7 +1481,7 @@ func ValidateInvertedIndexes(
span := tableDesc.IndexSpan(codec, idx.GetID())
key := span.Key
endKey := span.EndKey
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ sqlutil.InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ *InternalExecutor) error {
for {
kvs, err := txn.Scan(ctx, key, endKey, 1000000)
if err != nil {
Expand Down Expand Up @@ -1540,7 +1545,7 @@ func ValidateInvertedIndexes(
colNameOrExpr = fmt.Sprintf("%q", col.ColName())
}

if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error {
var stmt string
geoConfig := idx.GetGeoConfig()
if geoindex.IsEmptyConfig(&geoConfig) {
Expand All @@ -1560,7 +1565,7 @@ func ValidateInvertedIndexes(
stmt = fmt.Sprintf(`%s WHERE %s`, stmt, idx.GetPredicate())
}
return ie.WithSyntheticDescriptors([]catalog.Descriptor{tableDesc}, func() error {
row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, execOverride, stmt)
row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, sessiondata.InternalExecutorOverride{}, stmt)
if err != nil {
return err
}
Expand Down Expand Up @@ -1593,65 +1598,6 @@ func ValidateInvertedIndexes(
return nil
}

type indexValidator struct {
db *kv.DB
codec keys.SQLCodec
executor *InternalExecutor
}

// ValidateForwardIndexes checks that the indexes have entries for all the rows.
func (iv indexValidator) ValidateForwardIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
withFirstMutationPublic bool,
gatherAllInvalid bool,
override sessiondata.InternalExecutorOverride,
) error {
// Set up a new transaction with the current timestamp.
txnRunner := func(ctx context.Context, fn sqlutil.InternalExecFn) error {
validationTxn := iv.db.NewTxn(ctx, "validation")
err := validationTxn.SetFixedTimestamp(ctx, iv.db.Clock().Now())
if err != nil {
return err
}
return fn(ctx, validationTxn, iv.executor)
}
return ValidateForwardIndexes(ctx, tableDesc, indexes, txnRunner, withFirstMutationPublic, gatherAllInvalid, override)
}

// ValidateInvertedIndexes checks that the indexes have entries for all the rows.
func (iv indexValidator) ValidateInvertedIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
gatherAllInvalid bool,
override sessiondata.InternalExecutorOverride,
) error {
// Set up a new transaction with the current timestamp.
txnRunner := func(ctx context.Context, fn sqlutil.InternalExecFn) error {
validationTxn := iv.db.NewTxn(ctx, "validation")
err := validationTxn.SetFixedTimestamp(ctx, iv.db.Clock().Now())
if err != nil {
return err
}
return fn(ctx, validationTxn, iv.executor)
}
return ValidateInvertedIndexes(ctx, iv.codec, tableDesc, indexes, txnRunner, gatherAllInvalid, override)
}

// MakeIndexValidator creates a IndexValidator interface
// for the new schema changer.
func MakeIndexValidator(
db *kv.DB, codec keys.SQLCodec, executor *InternalExecutor,
) scexec.IndexValidator {
return indexValidator{
db: db,
codec: codec,
executor: executor,
}
}

// ValidateForwardIndexes checks that the indexes have entries for all the rows.
//
// This operates over multiple goroutines concurrently and is thus not
Expand All @@ -1668,10 +1614,9 @@ func ValidateForwardIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
runHistoricalTxn HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error {
grp := ctxgroup.WithContext(ctx)

Expand Down Expand Up @@ -1733,7 +1678,7 @@ func ValidateForwardIndexes(

// Retrieve the row count in the index.
var idxLen int64
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error {
query := fmt.Sprintf(`SELECT count(1) FROM [%d AS t]@[%d]`, desc.GetID(), idx.GetID())
// If the index is a partial index the predicate must be added
// as a filter to the query to force scanning the index.
Expand All @@ -1742,7 +1687,7 @@ func ValidateForwardIndexes(
}

return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, execOverride, query)
row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, sessiondata.InternalExecutorOverride{}, query)
if err != nil {
return err
}
Expand Down Expand Up @@ -1825,7 +1770,7 @@ func ValidateForwardIndexes(
}

// Count the number of rows in the table.
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error {
var s strings.Builder
for _, idx := range indexes {
// For partial indexes, count the number of rows in the table
Expand All @@ -1841,7 +1786,7 @@ func ValidateForwardIndexes(
query := fmt.Sprintf(`SELECT count(1)%s FROM [%d AS t]@[%d]`, partialIndexCounts, desc.GetID(), desc.GetPrimaryIndexID())

return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, execOverride, query)
cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, sessiondata.InternalExecutorOverride{}, query)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (desc *wrapper) KeysPerRow(indexID descpb.IndexID) (int, error) {
return len(desc.Families), nil
}

// BuildIndexName returns an index name that is not equal to any
// buildIndexName returns an index name that is not equal to any
// of tableDesc's indexes, roughly following Postgres's conventions for naming
// anonymous indexes. For example:
//
Expand All @@ -154,7 +154,9 @@ func (desc *wrapper) KeysPerRow(indexID descpb.IndexID) (int, error) {
// CREATE INDEX ON t ((a + b), c, lower(d))
// => t_expr_c_expr1_idx
//
func BuildIndexName(tableDesc *Mutable, idx *descpb.IndexDescriptor) (string, error) {
func buildIndexName(tableDesc *Mutable, index catalog.Index) (string, error) {
idx := index.IndexDesc()

// An index name has a segment for the table name, each key column, and a
// final word (either "idx" or "key").
segments := make([]string, 0, len(idx.KeyColumnNames)+2)
Expand Down Expand Up @@ -682,7 +684,7 @@ func (desc *Mutable) allocateIndexIDs(columnNames map[string]descpb.ColumnID) er
// Assign names to unnamed indexes.
err := catalog.ForEachDeletableNonPrimaryIndex(desc, func(idx catalog.Index) error {
if len(idx.GetName()) == 0 {
name, err := BuildIndexName(desc, idx.IndexDesc())
name, err := buildIndexName(desc, idx)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -371,7 +370,7 @@ func validateUniqueConstraint(
constraintName string,
columnIDs []descpb.ColumnID,
pred string,
ie sqlutil.InternalExecutor,
ie *InternalExecutor,
txn *kv.Txn,
) error {
query, colNames, err := duplicateRowQuery(
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2938,8 +2938,6 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
&ex.extraTxnState.descCollection,
ex.server.cfg.JobRegistry,
ex.server.cfg.IndexBackfiller,
MakeIndexValidator(ex.planner.Txn().DB(), ex.server.cfg.Codec, ex.planner.execCfg.InternalExecutor),
MakeCCLCallbacks(ex.server.cfg.Settings, ex.planner.EvalContext()),
ex.server.cfg.NewSchemaChangerTestingKnobs,
scs.stmts,
scop.PreCommitPhase,
Expand Down
Loading