diff --git a/pkg/ccl/partitionccl/BUILD.bazel b/pkg/ccl/partitionccl/BUILD.bazel index 609e4d7ed27d..2a12e6b6a055 100644 --- a/pkg/ccl/partitionccl/BUILD.bazel +++ b/pkg/ccl/partitionccl/BUILD.bazel @@ -41,6 +41,7 @@ go_test( "drop_test.go", "main_test.go", "partition_test.go", + "scrub_test.go", "zone_test.go", ], embed = [":partitionccl"], @@ -73,6 +74,9 @@ go_test( "//pkg/sql/lexbase", "//pkg/sql/parser", "//pkg/sql/randgen", + "//pkg/sql/rowenc", + "//pkg/sql/scrub", + "//pkg/sql/scrub/scrubtestutils", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/tests", diff --git a/pkg/ccl/partitionccl/scrub_test.go b/pkg/ccl/partitionccl/scrub_test.go new file mode 100644 index 000000000000..422ae0f289c1 --- /dev/null +++ b/pkg/ccl/partitionccl/scrub_test.go @@ -0,0 +1,521 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package partitionccl + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/scrub" + "github.com/cockroachdb/cockroach/pkg/sql/scrub/scrubtestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// TestScrubUniqueIndex tests SCRUB on a table that violates a UNIQUE +// constraint. 
+func TestScrubUniqueIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT UNIQUE, + partition_by INT +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + +INSERT INTO db.t VALUES (1, 2, 1), (2, 3, 2); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Overwrite the value on partition one with a duplicate unique index value. + values := []tree.Datum{tree.NewDInt(1), tree.NewDInt(3), tree.NewDInt(1)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Add the primary key via the KV API. 
+ if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex(keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(secondaryIndexKey)) + } + + // Add the secondary key via the KV API. + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1, 1)", + DetailsRegex: `{"constraint_name": "t_id2_key", "row_data": {"id": "1", "id2": "3", "partition_by": "1"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2, 2)", + DetailsRegex: `{"constraint_name": "t_id2_key", "row_data": {"id": "2", "id2": "3", "partition_by": "2"}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) +} + +// TestScrubUniqueIndexWithNulls tests SCRUB on a table with NULLs, which does +// not violate UNIQUE constraints. +func TestScrubUniqueIndexWithNulls(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. 
+ if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT UNIQUE, + partition_by INT +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + +INSERT INTO db.t VALUES (1, 2, 1), (2, NULL, 2); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Overwrite the value on partition one with a NULL index value. + values := []tree.Datum{tree.NewDInt(1), tree.DNull, tree.NewDInt(1)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Add the primary key via the KV API. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex(keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(secondaryIndexKey)) + } + + // Add the secondary key via the KV API. + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. 
+ scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, []scrubtestutils.ExpectedScrubResult{}) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, []scrubtestutils.ExpectedScrubResult{}) +} + +// TestScrubUniqueIndexExplicitPartition tests SCRUB on a table that uses an +// explicit partitioning scheme, and therefore should not violate unique +// constraints if one of the two values in the constraint is duplicated, even +// if the individual value has a unique constraint. +func TestScrubUniqueIndexExplicitPartition(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT, + UNIQUE (id, id2) PARTITION BY LIST (id) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) + ) +); + + +INSERT INTO db.t VALUES (1, 3), (2, 4); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Overwrite the value on partition one with a duplicate unique value. 
+ values := []tree.Datum{tree.NewDInt(1), tree.NewDInt(4)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Add the primary key via the KV API. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex( + keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, values, true /* includeEmpty */) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d. got %#v", len(secondaryIndexKey), secondaryIndexKey) + } + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, []scrubtestutils.ExpectedScrubResult{}) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, []scrubtestutils.ExpectedScrubResult{}) +} + +// TestScrubPartialUniqueIndex tests SCRUB on a table that violates a UNIQUE +// constraint on a partial unique index. 
+func TestScrubPartialUniqueIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT, + partition_by INT, + UNIQUE INDEX idx (id2) WHERE id2 > 4 +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + + +INSERT INTO db.t VALUES (1, 2, 1), (2, 3, 2), (3, 5, 1), (4, 6, 2); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Overwrite the values on partition 1 with duplicate id2 values: one that + // falls under the unique index constraint, and one that does not. + valuesConstrained := []tree.Datum{tree.NewDInt(3), tree.NewDInt(6), tree.NewDInt(1)} + valuesNotConstrained := []tree.Datum{tree.NewDInt(1), tree.NewDInt(3), tree.NewDInt(1)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) + + // Modify the primary index with a duplicate constrained value. 
+ primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, valuesConstrained, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Modify the secondary index with a duplicate constrained value. + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex( + keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, valuesConstrained, true /* includeEmpty */) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d. got %#v", len(secondaryIndexKey), secondaryIndexKey) + } + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Modify the primary index with a duplicate value not in the constrained + // range. + primaryIndexKey, err = rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, valuesNotConstrained, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Modify the secondary index with a duplicate value not in the constrained + // range. 
+ secondaryIndexKey, err = rowenc.EncodeSecondaryIndex( + keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, valuesNotConstrained, true /* includeEmpty */) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d. got %#v", len(secondaryIndexKey), secondaryIndexKey) + } + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1, 3)", + DetailsRegex: `{"constraint_name": "idx", "row_data": {"id": "3", "id2": "6", "partition_by": "1"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2, 4)", + DetailsRegex: `{"constraint_name": "idx", "row_data": {"id": "4", "id2": "6", "partition_by": "2"}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) +} + +// TestScrubUniqueIndexMultiCol tests SCRUB on a table that violates a UNIQUE +// constraint for multiple columns. +func TestScrubUniqueIndexMultiCol(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. 
+ if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + pk INT PRIMARY KEY, + id INT, + id2 INT, + partition_by INT, + UNIQUE (id, id2) +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + + +INSERT INTO db.t VALUES (1, 1, 2, 1); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Insert a row on partition 2 with a duplicate unique index value. + values := []tree.Datum{tree.NewDInt(2), tree.NewDInt(1), tree.NewDInt(2), tree.NewDInt(2)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[3].GetID(), 3) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Add the primary key via the KV API. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Modify the secondary index with the duplicate value. + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex( + keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, values, true /* includeEmpty */) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d. 
got %#v", len(secondaryIndexKey), secondaryIndexKey) + } + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1, 1)", + DetailsRegex: `{"constraint_name": "t_id_id2_key", "row_data": {"id": "1", "id2": "2", "partition_by": "1", "pk": "1"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2, 2)", + DetailsRegex: `{"constraint_name": "t_id_id2_key", "row_data": {"id": "1", "id2": "2", "partition_by": "2", "pk": "2"}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) +} + +// TestScrubPrimaryKey tests SCRUB on a table that violates a PRIMARY KEY +// constraint. +func TestScrubPrimaryKey(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + partition_by INT +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + + +INSERT INTO db.t VALUES (1, 1); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Insert a duplicate primary key into a different partition. 
+ values := []tree.Datum{tree.NewDInt(1), tree.NewDInt(2)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Insert primary key via KV. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1, 1)", + DetailsRegex: `{"constraint_name": "t_pkey", "row_data": {"id": "1", "partition_by": "1"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2, 1)", + DetailsRegex: `{"constraint_name": "t_pkey", "row_data": {"id": "1", "partition_by": "2"}`, + }, + } + + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 0a94db212498..9a7f9e6c6162 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -181,6 +181,7 @@ go_library( "scrub_constraint.go", "scrub_fk.go", "scrub_index.go", + "scrub_unique_constraint.go", "select_name_resolution.go", "sequence.go", "sequence_select.go", @@ -675,6 +676,7 @@ go_test( "//pkg/sql/rowinfra", 
"//pkg/sql/schemachanger/scexec", "//pkg/sql/scrub", + "//pkg/sql/scrub/scrubtestutils", "//pkg/sql/sem/builtins", "//pkg/sql/sem/catconstants", "//pkg/sql/sem/catid", diff --git a/pkg/sql/scrub.go b/pkg/sql/scrub.go index d3567500db22..dca97daeaa5f 100644 --- a/pkg/sql/scrub.go +++ b/pkg/sql/scrub.go @@ -394,8 +394,7 @@ func createIndexCheckOperations( // createConstraintCheckOperations will return all of the constraints // that are being checked. If constraintNames is nil, then all // constraints are returned. -// TODO(joey): Only SQL CHECK and FOREIGN KEY constraints are -// implemented. +// Only SQL CHECK, FOREIGN KEY, and UNIQUE constraints are supported. func createConstraintCheckOperations( ctx context.Context, p *planner, @@ -443,6 +442,13 @@ func createConstraintCheckOperations( constraint, asOf, )) + case descpb.ConstraintTypePK, descpb.ConstraintTypeUnique: + results = append(results, newSQLUniqueConstraintCheckOperation( + tableName, + tableDesc, + constraint, + asOf, + )) } } return results, nil diff --git a/pkg/sql/scrub/errors.go b/pkg/sql/scrub/errors.go index 9ef8abbaf8b3..4a942c72cc93 100644 --- a/pkg/sql/scrub/errors.go +++ b/pkg/sql/scrub/errors.go @@ -42,6 +42,9 @@ const ( // ForeignKeyConstraintViolation occurs when a row in a // table is violating a foreign key constraint. ForeignKeyConstraintViolation = "foreign_key_violation" + // UniqueConstraintViolation occurs when a row in a table is violating + // a unique constraint. + UniqueConstraintViolation = "unique_constraint_violation" ) // Error contains the details on the scrub error that was caught. 
diff --git a/pkg/sql/scrub/scrubtestutils/BUILD.bazel b/pkg/sql/scrub/scrubtestutils/BUILD.bazel new file mode 100644 index 000000000000..32f44603c1fd --- /dev/null +++ b/pkg/sql/scrub/scrubtestutils/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "scrubtestutils", + srcs = ["scrub_test_helper.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/scrub/scrubtestutils", + visibility = ["//visibility:public"], + deps = ["//pkg/testutils/sqlutils"], +) diff --git a/pkg/sql/scrub/scrubtestutils/scrub_test_helper.go b/pkg/sql/scrub/scrubtestutils/scrub_test_helper.go new file mode 100644 index 000000000000..e82ff877060f --- /dev/null +++ b/pkg/sql/scrub/scrubtestutils/scrub_test_helper.go @@ -0,0 +1,88 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package scrubtestutils + +import ( + gosql "database/sql" + "regexp" + "testing" + + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" +) + +// ExpectedScrubResult contains details about errors that are expected during +// SCRUB testing. +type ExpectedScrubResult struct { + ErrorType string + Database string + Table string + PrimaryKey string + Repaired bool + DetailsRegex string +} + +// CheckScrubResult compares the results from running SCRUB with the expected +// results, in order, and fails the test if they do not match. 
+func CheckScrubResult(t *testing.T, ress []sqlutils.ScrubResult, exps []ExpectedScrubResult) { + t.Helper() + + for i := 0; i < len(exps); i++ { + res := ress[i] + exp := exps[i] + if res.ErrorType != exp.ErrorType { + t.Errorf("expected %q error, instead got: %s", exp.ErrorType, res.ErrorType) + } + + if res.Database != exp.Database { + t.Errorf("expected database %q, got %q", exp.Database, res.Database) + } + + if res.Table != exp.Table { + t.Errorf("expected table %q, got %q", exp.Table, res.Table) + } + + if res.PrimaryKey != exp.PrimaryKey { + t.Errorf("expected primary key %q, got %q", exp.PrimaryKey, res.PrimaryKey) + } + if res.Repaired != exp.Repaired { + t.Fatalf("expected repaired %v, got %v", exp.Repaired, res.Repaired) + } + + if matched, err := regexp.MatchString(exp.DetailsRegex, res.Details); err != nil { + t.Fatal(err) + } else if !matched { + t.Errorf("expected error details to contain `%s`, got `%s`", exp.DetailsRegex, res.Details) + } + } +} + +// RunScrub runs a SCRUB statement and checks that it returns the expected +// number of scrub results and that each one matches its expected result. +func RunScrub(t *testing.T, db *gosql.DB, scrubStmt string, exp []ExpectedScrubResult) { + t.Helper() + + // Run SCRUB and find the violation created. + rows, err := db.Query(scrubStmt) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + defer rows.Close() + + results, err := sqlutils.GetScrubResultRows(rows) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if len(results) != len(exp) { + t.Fatalf("expected %d results, got %d. 
got %#v", len(exp), len(results), results) + } + CheckScrubResult(t, results, exp) +} diff --git a/pkg/sql/scrub_test.go b/pkg/sql/scrub_test.go index 6e89a796df60..b5a5f2fd0cbf 100644 --- a/pkg/sql/scrub_test.go +++ b/pkg/sql/scrub_test.go @@ -12,9 +12,7 @@ package sql_test import ( "context" - gosql "database/sql" "fmt" - "regexp" "strings" "testing" "time" @@ -28,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/scrub" + "github.com/cockroachdb/cockroach/pkg/sql/scrub/scrubtestutils" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -68,23 +67,25 @@ INSERT INTO t."tEst" VALUES (10, 20); } // Run SCRUB and find the index errors we created. - exp := expectedScrubResult{ - ErrorType: scrub.MissingIndexEntryError, - Database: "t", - Table: "tEst", - PrimaryKey: "(10)", - Repaired: false, - DetailsRegex: `"v": "20"`, - } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t."tEst" WITH OPTIONS INDEX ALL`, exp) + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.MissingIndexEntryError, + Database: "t", + Table: "tEst", + PrimaryKey: "(10)", + Repaired: false, + DetailsRegex: `"v": "20"`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t."tEst" WITH OPTIONS INDEX ALL`, exp) // Run again with AS OF SYSTEM TIME. time.Sleep(1 * time.Millisecond) - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t."tEst" AS OF SYSTEM TIME '-1ms' WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t."tEst" AS OF SYSTEM TIME '-1ms' WITH OPTIONS INDEX ALL`, exp) // Verify that AS OF SYSTEM TIME actually operates in the past. 
ts := r.QueryStr(t, `SELECT cluster_logical_timestamp()`)[0][0] r.Exec(t, `DELETE FROM t."tEst"`) - runScrub( + scrubtestutils.RunScrub( t, db, fmt.Sprintf( `EXPERIMENTAL SCRUB TABLE t."tEst" AS OF SYSTEM TIME '%s' WITH OPTIONS INDEX ALL`, ts, ), @@ -117,15 +118,17 @@ INSERT INTO t.test VALUES (2, 15); t.Fatalf("unexpected error: %s", err.Error()) } // Run SCRUB and find the index errors we created. - exp := expectedScrubResult{ - ErrorType: scrub.MissingIndexEntryError, - Database: "t", - Table: "test", - PrimaryKey: "(2)", - Repaired: false, - DetailsRegex: `"v": "15"`, + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.MissingIndexEntryError, + Database: "t", + Table: "test", + PrimaryKey: "(2)", + Repaired: false, + DetailsRegex: `"v": "15"`, + }, } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) }) t.Run("dangling index entry that matches predicate", func(t *testing.T) { r.Exec(t, ` @@ -143,15 +146,17 @@ INSERT INTO t.test VALUES (2, 15); t.Fatalf("unexpected error: %s", err.Error()) } // Run SCRUB and find the index errors we created. 
- exp := expectedScrubResult{ - ErrorType: scrub.DanglingIndexReferenceError, - Database: "t", - Table: "test", - PrimaryKey: "(3)", - Repaired: false, - DetailsRegex: `"v": "25"`, + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.DanglingIndexReferenceError, + Database: "t", + Table: "test", + PrimaryKey: "(3)", + Repaired: false, + DetailsRegex: `"v": "25"`, + }, } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) }) t.Run("dangling index entry that does not match predicate", func(t *testing.T) { r.Exec(t, ` @@ -169,15 +174,17 @@ INSERT INTO t.test VALUES (2, 15); t.Fatalf("unexpected error: %s", err.Error()) } // Run SCRUB and find the index errors we created. - exp := expectedScrubResult{ - ErrorType: scrub.DanglingIndexReferenceError, - Database: "t", - Table: "test", - PrimaryKey: "(3)", - Repaired: false, - DetailsRegex: `"v": "7"`, + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.DanglingIndexReferenceError, + Database: "t", + Table: "test", + PrimaryKey: "(3)", + Repaired: false, + DetailsRegex: `"v": "7"`, + }, } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) }) t.Run("index entry that does not match predicate", func(t *testing.T) { r.Exec(t, ` @@ -195,15 +202,17 @@ INSERT INTO t.test VALUES (2, 15); t.Fatalf("unexpected error: %s", err.Error()) } // Run SCRUB and find the index errors we created. 
- exp := expectedScrubResult{ - ErrorType: scrub.DanglingIndexReferenceError, - Database: "t", - Table: "test", - PrimaryKey: "(1)", - Repaired: false, - DetailsRegex: `"v": "5"`, + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.DanglingIndexReferenceError, + Database: "t", + Table: "test", + PrimaryKey: "(1)", + Repaired: false, + DetailsRegex: `"v": "5"`, + }, } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) }) } @@ -575,22 +584,24 @@ func TestScrubFKConstraintFKMissing(t *testing.T) { } // Run SCRUB and find the FOREIGN KEY violation created. - exp := expectedScrubResult{ - ErrorType: scrub.ForeignKeyConstraintViolation, - Database: "t", - Table: "child", - PrimaryKey: "(10)", - DetailsRegex: `{"constraint_name": "child_parent_id_fkey", "row_data": {"child_id": "10", "parent_id": "0"}}`, - } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child WITH OPTIONS CONSTRAINT ALL`, exp) + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.ForeignKeyConstraintViolation, + Database: "t", + Table: "child", + PrimaryKey: "(10)", + DetailsRegex: `{"constraint_name": "child_parent_id_fkey", "row_data": {"child_id": "10", "parent_id": "0"}}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child WITH OPTIONS CONSTRAINT ALL`, exp) // Run again with AS OF SYSTEM TIME. time.Sleep(1 * time.Millisecond) - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) // Verify that AS OF SYSTEM TIME actually operates in the past. 
ts := r.QueryStr(t, `SELECT cluster_logical_timestamp()`)[0][0] r.Exec(t, "INSERT INTO t.parent VALUES (0)") - runScrub( + scrubtestutils.RunScrub( t, db, fmt.Sprintf( `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '%s' WITH OPTIONS CONSTRAINT ALL`, ts, ), @@ -633,75 +644,79 @@ ALTER TABLE t.child ADD FOREIGN KEY (parent_id, parent_id2) REFERENCES t.parent } // Run SCRUB and find the FOREIGN KEY violation created. - exp := expectedScrubResult{ - ErrorType: scrub.ForeignKeyConstraintViolation, - Database: "t", - Table: "child", - PrimaryKey: "(11)", - DetailsRegex: `{"constraint_name": "child_parent_id_parent_id2_fkey", "row_data": {"child_id": "11", "parent_id": "1337", "parent_id2": "NULL"}}`, - } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child WITH OPTIONS CONSTRAINT ALL`, exp) + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.ForeignKeyConstraintViolation, + Database: "t", + Table: "child", + PrimaryKey: "(11)", + DetailsRegex: `{"constraint_name": "child_parent_id_parent_id2_fkey", "row_data": {"child_id": "11", "parent_id": "1337", "parent_id2": "NULL"}}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child WITH OPTIONS CONSTRAINT ALL`, exp) time.Sleep(1 * time.Millisecond) - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) -} - -type expectedScrubResult struct { - ErrorType string - Database string - Table string - PrimaryKey string - Repaired bool - DetailsRegex string + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) } -func checkScrubResult(t *testing.T, res sqlutils.ScrubResult, exp expectedScrubResult) { - t.Helper() - - if res.ErrorType != exp.ErrorType { - t.Errorf("expected %q error, instead got: %s", exp.ErrorType, res.ErrorType) - } - - if res.Database != exp.Database { - t.Errorf("expected database %q, got %q", exp.Database, res.Database) - } - - if res.Table 
!= exp.Table { - t.Errorf("expected table %q, got %q", exp.Table, res.Table) - } - - if res.PrimaryKey != exp.PrimaryKey { - t.Errorf("expected primary key %q, got %q", exp.PrimaryKey, res.PrimaryKey) - } - if res.Repaired != exp.Repaired { - t.Fatalf("expected repaired %v, got %v", exp.Repaired, res.Repaired) - } - - if matched, err := regexp.MatchString(exp.DetailsRegex, res.Details); err != nil { - t.Fatal(err) - } else if !matched { - t.Errorf("expected error details to contain `%s`, got `%s`", exp.DetailsRegex, res.Details) - } -} +// TestScrubUniqueWithoutIndex tests SCRUB on a table that violates a +// UNIQUE WITHOUT INDEX constraint. +func TestScrubUniqueWithoutIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) -// runScrub runs a SCRUB statement and checks that it returns exactly one scrub -// result and that it matches the expected result. -func runScrub(t *testing.T, db *gosql.DB, scrubStmt string, exp expectedScrubResult) { - t.Helper() + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_unique_without_index_constraints = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT UNIQUE WITHOUT INDEX +); - // Run SCRUB and find the FOREIGN KEY violation created. - rows, err := db.Query(scrubStmt) - if err != nil { +INSERT INTO db.t VALUES (1, 2), (2,3); +`); err != nil { t.Fatalf("unexpected error: %s", err) } - defer rows.Close() - results, err := sqlutils.GetScrubResultRows(rows) + // Overwrite one of the values with a duplicate unique value. 
+ values := []tree.Datum{tree.NewDInt(1), tree.NewDInt(3)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + // Put a duplicate unique value via KV. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { t.Fatalf("unexpected error: %s", err) } - if len(results) != 1 { - t.Fatalf("expected 1 result, got %d. got %#v", len(results), results) + // Run SCRUB + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1)", + DetailsRegex: `{"constraint_name": "unique_id2", "row_data": {"id": "1", "id2": "3"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2)", + DetailsRegex: `{"constraint_name": "unique_id2", "row_data": {"id": "2", "id2": "3"}`, + }, } - checkScrubResult(t, results[0], exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) } diff --git a/pkg/sql/scrub_unique_constraint.go b/pkg/sql/scrub_unique_constraint.go new file mode 100644 index 000000000000..bdcb4c2a949e --- /dev/null +++ b/pkg/sql/scrub_unique_constraint.go @@ -0,0 +1,202 @@ +// Copyright 2022 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/scrub" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// sqlUniqueConstraintCheckOperation is a check which validates a +// UNIQUE constraint on a table. +type sqlUniqueConstraintCheckOperation struct { + tableName *tree.TableName + tableDesc catalog.TableDescriptor + constraint *descpb.ConstraintDetail + cols []catid.ColumnID + name string + asOf hlc.Timestamp + predicate string + + // columns is a list of the columns returned in the query result + // tree.Datums. + columns []catalog.Column + // primaryColIdxs maps PrimaryIndex.Columns to the row + // indexes in the query result tree.Datums. 
+ primaryColIdxs []int + + run sqlCheckConstraintCheckRun +} + +func newSQLUniqueConstraintCheckOperation( + tableName *tree.TableName, + tableDesc catalog.TableDescriptor, + constraint descpb.ConstraintDetail, + asOf hlc.Timestamp, +) *sqlUniqueConstraintCheckOperation { + op := sqlUniqueConstraintCheckOperation{ + tableName: tableName, + tableDesc: tableDesc, + constraint: &constraint, + asOf: asOf, + } + if constraint.UniqueWithoutIndexConstraint != nil { + op.cols = constraint.UniqueWithoutIndexConstraint.ColumnIDs + op.name = constraint.UniqueWithoutIndexConstraint.Name + op.predicate = constraint.UniqueWithoutIndexConstraint.Predicate + } else if constraint.Index != nil { + op.cols = constraint.Index.KeyColumnIDs + // Partitioning columns are prepended to the index but are not part of the + // unique constraint, so we ignore them. + if constraint.Index.Partitioning.NumImplicitColumns > 0 { + op.cols = op.cols[constraint.Index.Partitioning.NumImplicitColumns:] + } + op.name = constraint.Index.Name + op.predicate = constraint.Index.Predicate + } + return &op +} + +// Start implements the checkOperation interface. +// It creates a SELECT expression and generates a plan from it, which +// then runs in the distSQL execution engine. +func (o *sqlUniqueConstraintCheckOperation) Start(params runParams) error { + ctx := params.ctx + // Create a query of the form: + // SELECT a,b,c FROM db.t AS tbl1 JOIN + // (SELECT b, c FROM db.t GROUP BY b, c + // WHERE b IS NOT NULL AND c IS NOT NULL [AND partial index predicate] + // HAVING COUNT(*) > 1) as tbl2 + // ON tbl1.b = tbl2.b AND tbl1.c = tbl2.c; + // Where a, b, and c are all the public columns in table db.t and b and c are + // the unique columns. We select all public columns to provide detailed + // information if there are constraint violations. + + // Collect all the columns. + o.columns = o.tableDesc.PublicColumns() + + // Make a list of all the unique column names. 
+ keyCols := make([]string, len(o.cols)) + matchers := make([]string, len(o.cols)) + for i := 0; i < len(o.cols); i++ { + col, err := o.tableDesc.FindColumnWithID(o.cols[i]) + if err != nil { + return err + } + keyCols[i] = tree.NameString(col.GetName()) + matchers[i] = fmt.Sprintf("tbl1.%[1]s=tbl2.%[1]s", keyCols[i]) + } + // Make a list of all the public column names. + pCols := make([]string, len(o.columns)) + for i := 0; i < len(o.columns); i++ { + pCols[i] = fmt.Sprintf("tbl1.%[1]s", tree.NameString(o.columns[i].GetName())) + } + asOf := "" + if o.asOf != hlc.MaxTimestamp { + asOf = fmt.Sprintf("AS OF SYSTEM TIME %[1]d", + o.asOf.WallTime) + } + tableName := fmt.Sprintf("%s.%s", o.tableName.Catalog(), o.tableName.Table()) + dup, _, err := duplicateRowQuery(o.tableDesc, o.cols, o.predicate, false /* limitResults */) + if err != nil { + return err + } + + sel := fmt.Sprintf(`SELECT %[1]s +FROM %[2]s AS tbl1 JOIN +(%[3]s) AS tbl2 +ON %[4]s +%[5]s `, + strings.Join(pCols, ","), // 1 + tableName, // 2 + dup, // 3 + strings.Join(matchers, " AND "), // 4 + asOf, // 5 + ) + + rows, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryBuffered( + ctx, "scrub-unique", params.p.txn, sel, + ) + if err != nil { + return err + } + + o.run.started = true + o.run.rows = rows + // Find the row indexes for all of the primary index columns. + o.primaryColIdxs, err = getPrimaryColIdxs(o.tableDesc, o.columns) + return err +} + +// Next implements the checkOperation interface. 
+func (o *sqlUniqueConstraintCheckOperation) Next(params runParams) (tree.Datums, error) { + row := o.run.rows[o.run.rowIndex] + o.run.rowIndex++ + timestamp, err := tree.MakeDTimestamp( + params.extendedEvalCtx.GetStmtTimestamp(), + time.Nanosecond, + ) + if err != nil { + return nil, err + } + + var primaryKeyDatums tree.Datums + for _, rowIdx := range o.primaryColIdxs { + primaryKeyDatums = append(primaryKeyDatums, row[rowIdx]) + } + + details := make(map[string]interface{}) + rowDetails := make(map[string]interface{}) + details["row_data"] = rowDetails + details["constraint_name"] = o.name + for rowIdx, col := range o.columns { + rowDetails[col.GetName()] = row[rowIdx].String() + } + detailsJSON, err := tree.MakeDJSON(details) + if err != nil { + return nil, err + } + + return tree.Datums{ + tree.DNull, /* job_uuid */ + tree.NewDString(scrub.UniqueConstraintViolation), + tree.NewDString(o.tableName.Catalog()), + tree.NewDString(o.tableName.Table()), + tree.NewDString(primaryKeyDatums.String()), + timestamp, + tree.DBoolFalse, + detailsJSON, + }, nil +} + +// Started implements the checkOperation interface. +func (o *sqlUniqueConstraintCheckOperation) Started() bool { + return o.run.started +} + +// Done implements the checkOperation interface. +func (o *sqlUniqueConstraintCheckOperation) Done(ctx context.Context) bool { + return o.run.rows == nil || o.run.rowIndex >= len(o.run.rows) +} + +// Close implements the checkOperation interface. +func (o *sqlUniqueConstraintCheckOperation) Close(ctx context.Context) { + o.run.rows = nil +}