Skip to content

Commit

Permalink
sql: implement unique index validation inside the new
Browse files Browse the repository at this point in the history
Previously, the unique index validation inside the
new schema changer was stubbed out. This was inadequate
because during index addition operations we were not
properly confirming the unique constraints were properly
implemented. To address this, this patch adds support in
the new schema changer for the valdiation logic.

Release note: None
  • Loading branch information
fqazi committed Nov 8, 2021
1 parent f5762cb commit ce4dbe1
Show file tree
Hide file tree
Showing 46 changed files with 1,005 additions and 904 deletions.
8 changes: 5 additions & 3 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
Expand Down Expand Up @@ -1798,7 +1800,7 @@ func revalidateIndexes(

// We don't actually need the 'historical' read the way the schema change does
// since our table is offline.
var runner sql.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sql.InternalExecFn) error {
var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error {
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
return fn(ctx, txn, ie)
Expand Down Expand Up @@ -1826,7 +1828,7 @@ func revalidateIndexes(
}
}
if len(forward) > 0 {
if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true); err != nil {
if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true, sessiondata.InternalExecutorOverride{}); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = invalid.Indexes
} else {
Expand All @@ -1835,7 +1837,7 @@ func revalidateIndexes(
}
}
if len(inverted) > 0 {
if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true); err != nil {
if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true, sessiondata.InternalExecutorOverride{}); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = append(invalidIndexes[tableDesc.ID], invalid.Indexes...)
} else {
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/partitionccl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scbuild"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand Down Expand Up @@ -612,5 +611,4 @@ func selectPartitionExprsByName(

func init() {
sql.CreatePartitioningCCL = createPartitioning
scbuild.CreatePartitioningCCL = createPartitioning
}
99 changes: 77 additions & 22 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -155,26 +157,18 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor
return runner
}

// InternalExecFn is the type of functions that operates using an internalExecutor.
type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error

// HistoricalInternalExecTxnRunner is like historicalTxnRunner except it only
// passes the fn the exported InternalExecutor instead of the whole unexported
// extendedEvalContenxt, so it can be implemented outside pkg/sql.
type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error

// makeFixedTimestampRunner creates a HistoricalTxnRunner suitable for use by the helpers.
func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
readAsOf hlc.Timestamp,
) HistoricalInternalExecTxnRunner {
runner := func(ctx context.Context, retryable InternalExecFn) error {
) sqlutil.HistoricalInternalExecTxnRunner {
runner := func(ctx context.Context, retryable sqlutil.InternalExecFn) error {
return sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := createSchemaChangeEvalCtx(
ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors,
).InternalExecutor.(*InternalExecutor)
).InternalExecutor.(sqlutil.InternalExecutor)
return retryable(ctx, txn, ie)
})
}
Expand Down Expand Up @@ -1510,12 +1504,12 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error {

if len(forwardIndexes) > 0 {
grp.GoCtx(func(ctx context.Context) error {
return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */)
return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{})
})
}
if len(invertedIndexes) > 0 {
grp.GoCtx(func(ctx context.Context) error {
return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */)
return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{})
})
}
if err := grp.Wait(); err != nil {
Expand Down Expand Up @@ -1546,8 +1540,9 @@ func ValidateInvertedIndexes(
codec keys.SQLCodec,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn HistoricalInternalExecTxnRunner,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error {
grp := ctxgroup.WithContext(ctx)
invalid := make(chan descpb.IndexID, len(indexes))
Expand All @@ -1571,7 +1566,7 @@ func ValidateInvertedIndexes(
span := tableDesc.IndexSpan(codec, idx.GetID())
key := span.Key
endKey := span.EndKey
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ *InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, _ sqlutil.InternalExecutor) error {
for {
kvs, err := txn.Scan(ctx, key, endKey, 1000000)
if err != nil {
Expand Down Expand Up @@ -1635,7 +1630,7 @@ func ValidateInvertedIndexes(
colNameOrExpr = fmt.Sprintf("%q", col.ColName())
}

if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
var stmt string
geoConfig := idx.GetGeoConfig()
if geoindex.IsEmptyConfig(&geoConfig) {
Expand All @@ -1655,7 +1650,7 @@ func ValidateInvertedIndexes(
stmt = fmt.Sprintf(`%s WHERE %s`, stmt, idx.GetPredicate())
}
return ie.WithSyntheticDescriptors([]catalog.Descriptor{tableDesc}, func() error {
row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, sessiondata.InternalExecutorOverride{}, stmt)
row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, execOverride, stmt)
if err != nil {
return err
}
Expand Down Expand Up @@ -1688,6 +1683,65 @@ func ValidateInvertedIndexes(
return nil
}

type indexValidator struct {
db *kv.DB
codec keys.SQLCodec
executor *InternalExecutor
}

// ValidateForwardIndexes checks that the indexes have entries for all the rows.
func (iv indexValidator) ValidateForwardIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
withFirstMutationPublic bool,
gatherAllInvalid bool,
override sessiondata.InternalExecutorOverride,
) error {
// Set up a new transaction with the current timestamp.
txnRunner := func(ctx context.Context, fn sqlutil.InternalExecFn) error {
validationTxn := iv.db.NewTxn(ctx, "validation")
err := validationTxn.SetFixedTimestamp(ctx, iv.db.Clock().Now())
if err != nil {
return err
}
return fn(ctx, validationTxn, iv.executor)
}
return ValidateForwardIndexes(ctx, tableDesc, indexes, txnRunner, withFirstMutationPublic, gatherAllInvalid, override)
}

// ValidateInvertedIndexes checks that the indexes have entries for all the rows.
func (iv indexValidator) ValidateInvertedIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
gatherAllInvalid bool,
override sessiondata.InternalExecutorOverride,
) error {
// Set up a new transaction with the current timestamp.
txnRunner := func(ctx context.Context, fn sqlutil.InternalExecFn) error {
validationTxn := iv.db.NewTxn(ctx, "validation")
err := validationTxn.SetFixedTimestamp(ctx, iv.db.Clock().Now())
if err != nil {
return err
}
return fn(ctx, validationTxn, iv.executor)
}
return ValidateInvertedIndexes(ctx, iv.codec, tableDesc, indexes, txnRunner, gatherAllInvalid, override)
}

// MakeIndexValidator creates a IndexValidator interface
// for the new schema changer.
func MakeIndexValidator(
db *kv.DB, codec keys.SQLCodec, executor *InternalExecutor,
) scexec.IndexValidator {
return indexValidator{
db: db,
codec: codec,
executor: executor,
}
}

// ValidateForwardIndexes checks that the indexes have entries for all the rows.
//
// This operates over multiple goroutines concurrently and is thus not
Expand All @@ -1704,9 +1758,10 @@ func ValidateForwardIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn HistoricalInternalExecTxnRunner,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error {
grp := ctxgroup.WithContext(ctx)

Expand Down Expand Up @@ -1768,7 +1823,7 @@ func ValidateForwardIndexes(

// Retrieve the row count in the index.
var idxLen int64
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
query := fmt.Sprintf(`SELECT count(1) FROM [%d AS t]@[%d]`, desc.GetID(), idx.GetID())
// If the index is a partial index the predicate must be added
// as a filter to the query to force scanning the index.
Expand All @@ -1777,7 +1832,7 @@ func ValidateForwardIndexes(
}

return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, sessiondata.InternalExecutorOverride{}, query)
row, err := ie.QueryRowEx(ctx, "verify-idx-count", txn, execOverride, query)
if err != nil {
return err
}
Expand Down Expand Up @@ -1860,7 +1915,7 @@ func ValidateForwardIndexes(
}

// Count the number of rows in the table.
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie *InternalExecutor) error {
if err := runHistoricalTxn(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
var s strings.Builder
for _, idx := range indexes {
// For partial indexes, count the number of rows in the table
Expand All @@ -1876,7 +1931,7 @@ func ValidateForwardIndexes(
query := fmt.Sprintf(`SELECT count(1)%s FROM [%d AS t]@[%d]`, partialIndexCounts, desc.GetID(), desc.GetPrimaryIndexID())

return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, sessiondata.InternalExecutorOverride{}, query)
cnt, err := ie.QueryRowEx(ctx, "VERIFY INDEX", txn, execOverride, query)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -370,7 +371,7 @@ func validateUniqueConstraint(
constraintName string,
columnIDs []descpb.ColumnID,
pred string,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
query, colNames, err := duplicateRowQuery(
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2929,6 +2929,8 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
&ex.extraTxnState.descCollection,
ex.server.cfg.JobRegistry,
ex.server.cfg.IndexBackfiller,
MakeIndexValidator(ex.planner.Txn().DB(), ex.server.cfg.Codec, ex.planner.execCfg.InternalExecutor),
MakeCCLCallbacks(ex.server.cfg.Settings, ex.planner.EvalContext()),
ex.server.cfg.NewSchemaChangerTestingKnobs,
scs.stmts,
scop.PreCommitPhase,
Expand Down
Loading

0 comments on commit ce4dbe1

Please sign in to comment.