Skip to content

Commit

Permalink
sql: implement unique index validation inside the new
Browse files Browse the repository at this point in the history
Previously, the unique index validation inside the
new schema changer was stubbed out. This was inadequate
because during index addition operations we were not
properly confirming the unique constraints were properly
implemented. To address this, this patch adds support in
the new schema changer for the valdiation logic.

Release note: None
  • Loading branch information
fqazi committed Jul 30, 2021
1 parent 014c13e commit a850d20
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 90 deletions.
8 changes: 5 additions & 3 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand Down Expand Up @@ -1765,7 +1767,7 @@ func revalidateIndexes(

// We don't actually need the 'historical' read the way the schema change does
// since our table is offline.
var runner sql.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sql.InternalExecFn) error {
var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error {
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
return fn(ctx, txn, ie)
Expand Down Expand Up @@ -1793,7 +1795,7 @@ func revalidateIndexes(
}
}
if len(forward) > 0 {
if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true); err != nil {
if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true, sessiondata.InternalExecutorOverride{}); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = invalid.Indexes
} else {
Expand All @@ -1802,7 +1804,7 @@ func revalidateIndexes(
}
}
if len(inverted) > 0 {
if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true); err != nil {
if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true, sessiondata.InternalExecutorOverride{}); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = append(invalidIndexes[tableDesc.ID], invalid.Indexes...)
} else {
Expand Down
39 changes: 17 additions & 22 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -146,26 +147,18 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor
return runner
}

// InternalExecFn is the type of functions that operates using an internalExecutor.
type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error

// HistoricalInternalExecTxnRunner is like historicalTxnRunner except it only
// passes the fn the exported InternalExecutor instead of the whole unexported
// extendedEvalContenxt, so it can be implemented outside pkg/sql.
type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error

// makeFixedTimestampRunner creates a HistoricalTxnRunner suitable for use by the helpers.
func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
readAsOf hlc.Timestamp,
) HistoricalInternalExecTxnRunner {
runner := func(ctx context.Context, retryable InternalExecFn) error {
) sqlutil.HistoricalInternalExecTxnRunner {
runner := func(ctx context.Context, retryable sqlutil.InternalExecFn) error {
return sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := createSchemaChangeEvalCtx(
ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors,
).InternalExecutor.(*InternalExecutor)
).InternalExecutor.(sqlutil.InternalExecutor)
return retryable(ctx, txn, ie)
})
}
Expand Down Expand Up @@ -1506,12 +1499,12 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error {

if len(forwardIndexes) > 0 {
grp.GoCtx(func(ctx context.Context) error {
return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */)
return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{})
})
}
if len(invertedIndexes) > 0 {
grp.GoCtx(func(ctx context.Context) error {
return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */)
return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{})
})
}
if err := grp.Wait(); err != nil {
Expand Down Expand Up @@ -1542,8 +1535,9 @@ func ValidateInvertedIndexes(
codec keys.SQLCodec,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn HistoricalInternalExecTxnRunner,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error {
grp := ctxgroup.WithContext(ctx)
invalid := make(chan descpb.IndexID, len(indexes))
Expand All @@ -1567,7 +1561,7 @@ func ValidateInvertedIndexes(
span := tableDesc.IndexSpan(codec, idx.GetID())
key := span.Key
endKey := span.EndKey
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ *InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ sqlutil.InternalExecutor) error {
for {
kvs, err := txn.Scan(ctx, key, endKey, 1000000)
if err != nil {
Expand Down Expand Up @@ -1631,7 +1625,7 @@ func ValidateInvertedIndexes(
colNameOrExpr = fmt.Sprintf("%q", col.ColName())
}

if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
var stmt string
geoConfig := idx.GetGeoConfig()
if geoindex.IsEmptyConfig(&geoConfig) {
Expand All @@ -1651,7 +1645,7 @@ func ValidateInvertedIndexes(
stmt = fmt.Sprintf(`%s WHERE %s`, stmt, idx.GetPredicate())
}
return ie.WithSyntheticDescriptors([]catalog.Descriptor{tableDesc}, func() error {
row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, sessiondata.InternalExecutorOverride{}, stmt)
row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, execOverride, stmt)
if err != nil {
return err
}
Expand Down Expand Up @@ -1700,9 +1694,10 @@ func ValidateForwardIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn HistoricalInternalExecTxnRunner,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error {
grp := ctxgroup.WithContext(ctx)

Expand Down Expand Up @@ -1764,7 +1759,7 @@ func ValidateForwardIndexes(

// Retrieve the row count in the index.
var idxLen int64
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
query := fmt.Sprintf(`SELECT count(1) FROM [%d AS t]@[%d]`, desc.GetID(), idx.GetID())
// If the index is a partial index the predicate must be added
// as a filter to the query to force scanning the index.
Expand All @@ -1773,7 +1768,7 @@ func ValidateForwardIndexes(
}

return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, sessiondata.InternalExecutorOverride{}, query)
row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, execOverride, query)
if err != nil {
return err
}
Expand Down Expand Up @@ -1856,7 +1851,7 @@ func ValidateForwardIndexes(
}

// Count the number of rows in the table.
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
var s strings.Builder
for _, idx := range indexes {
// For partial indexes, count the number of rows in the table
Expand All @@ -1872,7 +1867,7 @@ func ValidateForwardIndexes(
query := fmt.Sprintf(`SELECT count(1)%s FROM [%d AS t]@[%d]`, partialIndexCounts, desc.GetID(), desc.GetPrimaryIndexID())

return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, sessiondata.InternalExecutorOverride{}, query)
cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, execOverride, query)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -370,7 +371,7 @@ func validateUniqueConstraint(
constraintName string,
columnIDs []descpb.ColumnID,
pred string,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
query, colNames, err := duplicateRowQuery(
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2744,7 +2744,8 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
executor := scexec.NewExecutor(
ex.planner.txn, &ex.extraTxnState.descCollection, ex.server.cfg.Codec,
nil /* backfiller */, nil /* jobTracker */, ex.server.cfg.NewSchemaChangerTestingKnobs,
ex.server.cfg.JobRegistry, ex.planner.execCfg.InternalExecutor,ex.server.cfg.Settings, ex.planner.EvalContext(),
ex.server.cfg.JobRegistry, ex.planner.execCfg.InternalExecutor, ex.server.cfg.Settings, ex.planner.EvalContext(),
ValidateForwardIndexes, ValidateInvertedIndexes,
)
after, err := runNewSchemaChanger(
ctx,
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func (s *schemaChangePlanNode) startExec(params runParams) error {
scs := p.extendedEvalCtx.SchemaChangerState
executor := scexec.NewExecutor(p.txn, p.Descriptors(), p.EvalContext().Codec,
nil /* backfiller */, nil /* jobTracker */, p.ExecCfg().NewSchemaChangerTestingKnobs,
params.extendedEvalCtx.ExecCfg.JobRegistry, params.p.execCfg.InternalExecutor, p.ExecCfg().Settings, p.EvalContext())
params.extendedEvalCtx.ExecCfg.JobRegistry, params.p.execCfg.InternalExecutor, p.ExecCfg().Settings, p.EvalContext(),
ValidateForwardIndexes, ValidateInvertedIndexes)
after, err := runNewSchemaChanger(
params.ctx, scplan.StatementPhase, s.plannedState, executor, scs.stmts,
)
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/schemachanger/scexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_test(
deps = [
":scexec",
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -67,6 +68,7 @@ go_test(
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/schemachanger/scplan",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
Expand Down
109 changes: 85 additions & 24 deletions pkg/sql/schemachanger/scexec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand All @@ -28,24 +29,51 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// ValidateForwardIndexesFn call back function used to validate
// a set of forward indexes.
type ValidateForwardIndexesFn func(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error

// ValidateInvertedIndexesFn call back function used to validate
// a set of inverted indexes.
type ValidateInvertedIndexesFn func(
ctx context.Context,
codec keys.SQLCodec,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error

// An Executor executes ops generated during planning. It mostly holds
// dependencies for execution and has little additional logic of its own.
type Executor struct {
txn *kv.Txn
descsCollection *descs.Collection
codec keys.SQLCodec
indexBackfiller IndexBackfiller
jobTracker JobProgressTracker
testingKnobs *NewSchemaChangerTestingKnobs
jobRegistry *jobs.Registry
executor sqlutil.InternalExecutor
settings *cluster.Settings
evalCtx *tree.EvalContext
txn *kv.Txn
descsCollection *descs.Collection
codec keys.SQLCodec
indexBackfiller IndexBackfiller
jobTracker JobProgressTracker
testingKnobs *NewSchemaChangerTestingKnobs
jobRegistry *jobs.Registry
executor sqlutil.InternalExecutor
settings *cluster.Settings
evalCtx *tree.EvalContext
validateForwardIndexes ValidateForwardIndexesFn
validateInvertedIndexes ValidateInvertedIndexesFn
}

// NewExecutor creates a new Executor.
Expand All @@ -60,18 +88,22 @@ func NewExecutor(
executor sqlutil.InternalExecutor,
settings *cluster.Settings,
evalCtx *tree.EvalContext,
validateForwardIndexes ValidateForwardIndexesFn,
validateInvertedIndexes ValidateInvertedIndexesFn,
) *Executor {
return &Executor{
txn: txn,
descsCollection: descsCollection,
codec: codec,
indexBackfiller: backfiller,
jobTracker: tracker,
testingKnobs: testingKnobs,
jobRegistry: jobRegistry,
executor: executor,
settings: settings,
evalCtx: evalCtx,
txn: txn,
descsCollection: descsCollection,
codec: codec,
indexBackfiller: backfiller,
jobTracker: tracker,
testingKnobs: testingKnobs,
jobRegistry: jobRegistry,
executor: executor,
settings: settings,
evalCtx: evalCtx,
validateForwardIndexes: validateForwardIndexes,
validateInvertedIndexes: validateInvertedIndexes,
}
}

Expand Down Expand Up @@ -119,7 +151,39 @@ func (ex *Executor) ExecuteOps(
}

func (ex *Executor) executeValidationOps(ctx context.Context, execute []scop.Op) error {
log.Errorf(ctx, "not implemented")
for _, op := range execute {
switch op := op.(type) {
case *scop.ValidateUniqueIndex:
table, err := ex.descsCollection.GetImmutableTableByID(ctx, ex.txn, op.TableID, tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc))
if err != nil {
return err
}
index, err := table.FindIndexWithID(op.IndexID)
if err != nil {
return err
}
// Setup a new transaction with the current timestamp.
txnRunner := func(ctx context.Context, fn sqlutil.InternalExecFn) error {
validationTxn := ex.txn.DB().NewTxn(ctx, "validation")
validationTxn.SetFixedTimestamp(ctx, ex.txn.DB().Clock().Now())
return fn(ctx, validationTxn, ex.executor)
}
// Execute the validation operation as a root user.
execOverride := sessiondata.InternalExecutorOverride{
User: security.RootUserName(),
}
if index.GetType() == descpb.IndexDescriptor_FORWARD {
err = ex.validateForwardIndexes(ctx, table, []catalog.Index{index}, txnRunner, true, false, execOverride)
} else {
err = ex.validateInvertedIndexes(ctx, ex.codec, table, []catalog.Index{index}, txnRunner, false, execOverride)
}
return err
case *scop.ValidateCheckConstraint:
log.Errorf(ctx, "not implemented")
default:
panic("unimplemented")
}
}
return nil
}

Expand Down Expand Up @@ -284,9 +348,6 @@ func UpdateDescriptorJobIDs(
if desc.DescriptorType() != catalog.Table {
continue
}
if err != nil {
return err
}
// Currently all "locking" schema changes are on tables. This will probably
// need to be expanded at least to types.
table, err := descriptors.GetMutableTableByID(ctx, txn, id,
Expand Down
Loading

0 comments on commit a850d20

Please sign in to comment.