From a6567e3f2b88eab2729ce96e39470eb1520e88c4 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Fri, 18 Mar 2022 11:30:12 -0700 Subject: [PATCH] sql: adds unique constraint checks to SCRUB The SCRUB tool validates table constraints. Before this change, it could only validate `CHECK` and foreign key constraints. This PR enhances SCRUB to also validate tables that contain `PRIMARY KEY`, `UNIQUE`, or the experimental `UNIQUE WITHOUT INDEX` keywords. Fixes: #73632 Release note (sql change): This PR expands the capabilities of the experimental SCRUB tool to include checking unique constraints for primary keys, unique indexes, and unique columns without indexes. The usage and output of SCRUB is unchanged, but if there is a unique constraint violation, users will see the error message `unique_constraint_violation` for all rows that violate the constraint, along with information about the row. --- pkg/ccl/partitionccl/BUILD.bazel | 4 + pkg/ccl/partitionccl/scrub_test.go | 521 ++++++++++++++++++ pkg/sql/BUILD.bazel | 2 + pkg/sql/scrub.go | 10 +- pkg/sql/scrub/errors.go | 3 + pkg/sql/scrub/scrubtestutils/BUILD.bazel | 9 + .../scrub/scrubtestutils/scrub_test_helper.go | 88 +++ pkg/sql/scrub_test.go | 241 ++++---- pkg/sql/scrub_unique_constraint.go | 202 +++++++ 9 files changed, 965 insertions(+), 115 deletions(-) create mode 100644 pkg/ccl/partitionccl/scrub_test.go create mode 100644 pkg/sql/scrub/scrubtestutils/BUILD.bazel create mode 100644 pkg/sql/scrub/scrubtestutils/scrub_test_helper.go create mode 100644 pkg/sql/scrub_unique_constraint.go diff --git a/pkg/ccl/partitionccl/BUILD.bazel b/pkg/ccl/partitionccl/BUILD.bazel index 609e4d7ed27d..2a12e6b6a055 100644 --- a/pkg/ccl/partitionccl/BUILD.bazel +++ b/pkg/ccl/partitionccl/BUILD.bazel @@ -41,6 +41,7 @@ go_test( "drop_test.go", "main_test.go", "partition_test.go", + "scrub_test.go", "zone_test.go", ], embed = [":partitionccl"], @@ -73,6 +74,9 @@ go_test( "//pkg/sql/lexbase", "//pkg/sql/parser", "//pkg/sql/randgen", + 
"//pkg/sql/rowenc", + "//pkg/sql/scrub", + "//pkg/sql/scrub/scrubtestutils", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/tests", diff --git a/pkg/ccl/partitionccl/scrub_test.go b/pkg/ccl/partitionccl/scrub_test.go new file mode 100644 index 000000000000..422ae0f289c1 --- /dev/null +++ b/pkg/ccl/partitionccl/scrub_test.go @@ -0,0 +1,521 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package partitionccl + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/scrub" + "github.com/cockroachdb/cockroach/pkg/sql/scrub/scrubtestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// TestScrubUniqueIndex tests SCRUB on a table that violates a UNIQUE +// constraint. +func TestScrubUniqueIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. 
+ if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT UNIQUE, + partition_by INT +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + +INSERT INTO db.t VALUES (1, 2, 1), (2, 3, 2); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Overwrite the value on partition one with a duplicate unique index value. + values := []tree.Datum{tree.NewDInt(1), tree.NewDInt(3), tree.NewDInt(1)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Add the primary key via the KV API. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex(keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(secondaryIndexKey)) + } + + // Add the secondary key via the KV API. 
+ if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1, 1)", + DetailsRegex: `{"constraint_name": "t_id2_key", "row_data": {"id": "1", "id2": "3", "partition_by": "1"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2, 2)", + DetailsRegex: `{"constraint_name": "t_id2_key", "row_data": {"id": "2", "id2": "3", "partition_by": "2"}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) +} + +// TestScrubUniqueIndexWithNulls tests SCRUB on a table with NULLs, which does +// not violate UNIQUE constraints. +func TestScrubUniqueIndexWithNulls(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT UNIQUE, + partition_by INT +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + +INSERT INTO db.t VALUES (1, 2, 1), (2, NULL, 2); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Overwrite the value on partition one with a NULL index value. 
+ values := []tree.Datum{tree.NewDInt(1), tree.DNull, tree.NewDInt(1)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Add the primary key via the KV API. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex(keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(secondaryIndexKey)) + } + + // Add the secondary key via the KV API. + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. 
+ scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, []scrubtestutils.ExpectedScrubResult{}) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, []scrubtestutils.ExpectedScrubResult{}) +} + +// TestScrubUniqueIndexExplicitPartition tests SCRUB on a table that uses an +// explicit partitioning scheme, and therefore should not violate unique +// constraints if one of the two values in the constraint is duplicated, even +// if the individual value has a unique constraint. +func TestScrubUniqueIndexExplicitPartition(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT, + UNIQUE (id, id2) PARTITION BY LIST (id) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) + ) +); + + +INSERT INTO db.t VALUES (1, 3), (2, 4); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Overwrite the value on partition one with a duplicate unique value. 
+ values := []tree.Datum{tree.NewDInt(1), tree.NewDInt(4)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Add the primary key via the KV API. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex( + keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, values, true /* includeEmpty */) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d. got %#v", len(secondaryIndexKey), secondaryIndexKey) + } + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, []scrubtestutils.ExpectedScrubResult{}) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, []scrubtestutils.ExpectedScrubResult{}) +} + +// TestScrubPartialUniqueIndex tests SCRUB on a table that violates a UNIQUE +// constraint on a partial unique index. 
+func TestScrubPartialUniqueIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT, + partition_by INT, + UNIQUE INDEX idx (id2) WHERE id2 > 4 +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + + +INSERT INTO db.t VALUES (1, 2, 1), (2, 3, 2), (3, 5, 1), (4, 6, 2); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Overwrite the values on partition 1 with duplicate id2 values: one that + // falls under the unique index constraint, and one that does not. + valuesConstrained := []tree.Datum{tree.NewDInt(3), tree.NewDInt(6), tree.NewDInt(1)} + valuesNotConstrained := []tree.Datum{tree.NewDInt(1), tree.NewDInt(3), tree.NewDInt(1)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) + + // Modify the primary index with a duplicate constrained value. 
+ primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, valuesConstrained, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Modify the secondary index with a duplicate constrained value. + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex( + keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, valuesConstrained, true /* includeEmpty */) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d. got %#v", len(secondaryIndexKey), secondaryIndexKey) + } + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Modify the primary index with a duplicate value not in the constrained + // range. + primaryIndexKey, err = rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, valuesNotConstrained, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Modify the secondary index with a duplicate value not in the constrained + // range. 
+ secondaryIndexKey, err = rowenc.EncodeSecondaryIndex( + keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, valuesNotConstrained, true /* includeEmpty */) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d. got %#v", len(secondaryIndexKey), secondaryIndexKey) + } + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1, 3)", + DetailsRegex: `{"constraint_name": "idx", "row_data": {"id": "3", "id2": "6", "partition_by": "1"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2, 4)", + DetailsRegex: `{"constraint_name": "idx", "row_data": {"id": "4", "id2": "6", "partition_by": "2"}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) +} + +// TestScrubUniqueIndexMultiCol tests SCRUB on a table that violates a UNIQUE +// constraint for multiple columns. +func TestScrubUniqueIndexMultiCol(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. 
+ if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + pk INT PRIMARY KEY, + id INT, + id2 INT, + partition_by INT, + UNIQUE (id, id2) +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + + +INSERT INTO db.t VALUES (1, 1, 2, 1); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Insert a row on partition 2 with a duplicate unique index value. + values := []tree.Datum{tree.NewDInt(2), tree.NewDInt(1), tree.NewDInt(2), tree.NewDInt(2)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[2].GetID(), 2) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[3].GetID(), 3) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Add the primary key via the KV API. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Modify the secondary index with the duplicate value. + secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0] + secondaryIndexKey, err := rowenc.EncodeSecondaryIndex( + keys.SystemSQLCodec, tableDesc, secondaryIndex, colIDtoRowIndex, values, true /* includeEmpty */) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(secondaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d. 
got %#v", len(secondaryIndexKey), secondaryIndexKey) + } + if err := kvDB.Put(context.Background(), secondaryIndexKey[0].Key, &secondaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1, 1)", + DetailsRegex: `{"constraint_name": "t_id_id2_key", "row_data": {"id": "1", "id2": "2", "partition_by": "1", "pk": "1"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2, 2)", + DetailsRegex: `{"constraint_name": "t_id_id2_key", "row_data": {"id": "1", "id2": "2", "partition_by": "2", "pk": "2"}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) +} + +// TestScrubPrimaryKey tests SCRUB on a table that violates a PRIMARY KEY +// constraint. +func TestScrubPrimaryKey(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + utilccl.TestingEnableEnterprise() + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_implicit_column_partitioning = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + partition_by INT +) PARTITION ALL BY LIST (partition_by) ( + PARTITION one VALUES IN (1), + PARTITION two VALUES IN (2) +); + + +INSERT INTO db.t VALUES (1, 1); +`); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Insert a duplicate primary key into a different partition. 
+ values := []tree.Datum{tree.NewDInt(1), tree.NewDInt(2)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) + if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + + // Insert primary key via KV. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Run SCRUB. + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1, 1)", + DetailsRegex: `{"constraint_name": "t_pkey", "row_data": {"id": "1", "partition_by": "1"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2, 1)", + DetailsRegex: `{"constraint_name": "t_pkey", "row_data": {"id": "1", "partition_by": "2"}`, + }, + } + + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 81724e41aec8..361860fed176 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -181,6 +181,7 @@ go_library( "scrub_constraint.go", "scrub_fk.go", "scrub_index.go", + "scrub_unique_constraint.go", "select_name_resolution.go", "sequence.go", "sequence_select.go", @@ -672,6 +673,7 @@ go_test( "//pkg/sql/rowinfra", 
"//pkg/sql/schemachanger/scexec", "//pkg/sql/scrub", + "//pkg/sql/scrub/scrubtestutils", "//pkg/sql/sem/builtins", "//pkg/sql/sem/catconstants", "//pkg/sql/sem/catid", diff --git a/pkg/sql/scrub.go b/pkg/sql/scrub.go index d3567500db22..dca97daeaa5f 100644 --- a/pkg/sql/scrub.go +++ b/pkg/sql/scrub.go @@ -394,8 +394,7 @@ func createIndexCheckOperations( // createConstraintCheckOperations will return all of the constraints // that are being checked. If constraintNames is nil, then all // constraints are returned. -// TODO(joey): Only SQL CHECK and FOREIGN KEY constraints are -// implemented. +// Only SQL CHECK, FOREIGN KEY, and UNIQUE constraints are supported. func createConstraintCheckOperations( ctx context.Context, p *planner, @@ -443,6 +442,13 @@ func createConstraintCheckOperations( constraint, asOf, )) + case descpb.ConstraintTypePK, descpb.ConstraintTypeUnique: + results = append(results, newSQLUniqueConstraintCheckOperation( + tableName, + tableDesc, + constraint, + asOf, + )) } } return results, nil diff --git a/pkg/sql/scrub/errors.go b/pkg/sql/scrub/errors.go index 9ef8abbaf8b3..4a942c72cc93 100644 --- a/pkg/sql/scrub/errors.go +++ b/pkg/sql/scrub/errors.go @@ -42,6 +42,9 @@ const ( // ForeignKeyConstraintViolation occurs when a row in a // table is violating a foreign key constraint. ForeignKeyConstraintViolation = "foreign_key_violation" + // UniqueConstraintViolation occurs when a row in a table is violating + // a unique constraint. + UniqueConstraintViolation = "unique_constraint_violation" ) // Error contains the details on the scrub error that was caught. 
diff --git a/pkg/sql/scrub/scrubtestutils/BUILD.bazel b/pkg/sql/scrub/scrubtestutils/BUILD.bazel new file mode 100644 index 000000000000..32f44603c1fd --- /dev/null +++ b/pkg/sql/scrub/scrubtestutils/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "scrubtestutils", + srcs = ["scrub_test_helper.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/scrub/scrubtestutils", + visibility = ["//visibility:public"], + deps = ["//pkg/testutils/sqlutils"], +) diff --git a/pkg/sql/scrub/scrubtestutils/scrub_test_helper.go b/pkg/sql/scrub/scrubtestutils/scrub_test_helper.go new file mode 100644 index 000000000000..e82ff877060f --- /dev/null +++ b/pkg/sql/scrub/scrubtestutils/scrub_test_helper.go @@ -0,0 +1,88 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package scrubtestutils + +import ( + gosql "database/sql" + "regexp" + "testing" + + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" +) + +// ExpectedScrubResult contains details about errors that are expected during +// SCRUB testing. +type ExpectedScrubResult struct { + ErrorType string + Database string + Table string + PrimaryKey string + Repaired bool + DetailsRegex string +} + +// CheckScrubResult compares the results from running SCRUB with the expected +// results and throws an error if they do not match. 
+func CheckScrubResult(t *testing.T, ress []sqlutils.ScrubResult, exps []ExpectedScrubResult) { + t.Helper() + + for i := 0; i < len(exps); i++ { + res := ress[i] + exp := exps[i] + if res.ErrorType != exp.ErrorType { + t.Errorf("expected %q error, instead got: %s", exp.ErrorType, res.ErrorType) + } + + if res.Database != exp.Database { + t.Errorf("expected database %q, got %q", exp.Database, res.Database) + } + + if res.Table != exp.Table { + t.Errorf("expected table %q, got %q", exp.Table, res.Table) + } + + if res.PrimaryKey != exp.PrimaryKey { + t.Errorf("expected primary key %q, got %q", exp.PrimaryKey, res.PrimaryKey) + } + if res.Repaired != exp.Repaired { + t.Fatalf("expected repaired %v, got %v", exp.Repaired, res.Repaired) + } + + if matched, err := regexp.MatchString(exp.DetailsRegex, res.Details); err != nil { + t.Fatal(err) + } else if !matched { + t.Errorf("expected error details to contain `%s`, got `%s`", exp.DetailsRegex, res.Details) + } + } +} + +// RunScrub runs a SCRUB statement and checks that it returns exactly one scrub +// result and that it matches the expected result. +func RunScrub(t *testing.T, db *gosql.DB, scrubStmt string, exp []ExpectedScrubResult) { + t.Helper() + + // Run SCRUB and find the violation created. + rows, err := db.Query(scrubStmt) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + defer rows.Close() + + results, err := sqlutils.GetScrubResultRows(rows) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if len(results) != len(exp) { + t.Fatalf("expected %d results, got %d. 
got %#v", len(exp), len(results), results) + } + CheckScrubResult(t, results, exp) +} diff --git a/pkg/sql/scrub_test.go b/pkg/sql/scrub_test.go index 6e89a796df60..b5a5f2fd0cbf 100644 --- a/pkg/sql/scrub_test.go +++ b/pkg/sql/scrub_test.go @@ -12,9 +12,7 @@ package sql_test import ( "context" - gosql "database/sql" "fmt" - "regexp" "strings" "testing" "time" @@ -28,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/scrub" + "github.com/cockroachdb/cockroach/pkg/sql/scrub/scrubtestutils" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -68,23 +67,25 @@ INSERT INTO t."tEst" VALUES (10, 20); } // Run SCRUB and find the index errors we created. - exp := expectedScrubResult{ - ErrorType: scrub.MissingIndexEntryError, - Database: "t", - Table: "tEst", - PrimaryKey: "(10)", - Repaired: false, - DetailsRegex: `"v": "20"`, - } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t."tEst" WITH OPTIONS INDEX ALL`, exp) + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.MissingIndexEntryError, + Database: "t", + Table: "tEst", + PrimaryKey: "(10)", + Repaired: false, + DetailsRegex: `"v": "20"`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t."tEst" WITH OPTIONS INDEX ALL`, exp) // Run again with AS OF SYSTEM TIME. time.Sleep(1 * time.Millisecond) - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t."tEst" AS OF SYSTEM TIME '-1ms' WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t."tEst" AS OF SYSTEM TIME '-1ms' WITH OPTIONS INDEX ALL`, exp) // Verify that AS OF SYSTEM TIME actually operates in the past. 
ts := r.QueryStr(t, `SELECT cluster_logical_timestamp()`)[0][0] r.Exec(t, `DELETE FROM t."tEst"`) - runScrub( + scrubtestutils.RunScrub( t, db, fmt.Sprintf( `EXPERIMENTAL SCRUB TABLE t."tEst" AS OF SYSTEM TIME '%s' WITH OPTIONS INDEX ALL`, ts, ), @@ -117,15 +118,17 @@ INSERT INTO t.test VALUES (2, 15); t.Fatalf("unexpected error: %s", err.Error()) } // Run SCRUB and find the index errors we created. - exp := expectedScrubResult{ - ErrorType: scrub.MissingIndexEntryError, - Database: "t", - Table: "test", - PrimaryKey: "(2)", - Repaired: false, - DetailsRegex: `"v": "15"`, + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.MissingIndexEntryError, + Database: "t", + Table: "test", + PrimaryKey: "(2)", + Repaired: false, + DetailsRegex: `"v": "15"`, + }, } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) }) t.Run("dangling index entry that matches predicate", func(t *testing.T) { r.Exec(t, ` @@ -143,15 +146,17 @@ INSERT INTO t.test VALUES (2, 15); t.Fatalf("unexpected error: %s", err.Error()) } // Run SCRUB and find the index errors we created. 
- exp := expectedScrubResult{ - ErrorType: scrub.DanglingIndexReferenceError, - Database: "t", - Table: "test", - PrimaryKey: "(3)", - Repaired: false, - DetailsRegex: `"v": "25"`, + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.DanglingIndexReferenceError, + Database: "t", + Table: "test", + PrimaryKey: "(3)", + Repaired: false, + DetailsRegex: `"v": "25"`, + }, } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) }) t.Run("dangling index entry that does not match predicate", func(t *testing.T) { r.Exec(t, ` @@ -169,15 +174,17 @@ INSERT INTO t.test VALUES (2, 15); t.Fatalf("unexpected error: %s", err.Error()) } // Run SCRUB and find the index errors we created. - exp := expectedScrubResult{ - ErrorType: scrub.DanglingIndexReferenceError, - Database: "t", - Table: "test", - PrimaryKey: "(3)", - Repaired: false, - DetailsRegex: `"v": "7"`, + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.DanglingIndexReferenceError, + Database: "t", + Table: "test", + PrimaryKey: "(3)", + Repaired: false, + DetailsRegex: `"v": "7"`, + }, } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) }) t.Run("index entry that does not match predicate", func(t *testing.T) { r.Exec(t, ` @@ -195,15 +202,17 @@ INSERT INTO t.test VALUES (2, 15); t.Fatalf("unexpected error: %s", err.Error()) } // Run SCRUB and find the index errors we created. 
- exp := expectedScrubResult{ - ErrorType: scrub.DanglingIndexReferenceError, - Database: "t", - Table: "test", - PrimaryKey: "(1)", - Repaired: false, - DetailsRegex: `"v": "5"`, + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.DanglingIndexReferenceError, + Database: "t", + Table: "test", + PrimaryKey: "(1)", + Repaired: false, + DetailsRegex: `"v": "5"`, + }, } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.test WITH OPTIONS INDEX ALL`, exp) }) } @@ -575,22 +584,24 @@ func TestScrubFKConstraintFKMissing(t *testing.T) { } // Run SCRUB and find the FOREIGN KEY violation created. - exp := expectedScrubResult{ - ErrorType: scrub.ForeignKeyConstraintViolation, - Database: "t", - Table: "child", - PrimaryKey: "(10)", - DetailsRegex: `{"constraint_name": "child_parent_id_fkey", "row_data": {"child_id": "10", "parent_id": "0"}}`, - } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child WITH OPTIONS CONSTRAINT ALL`, exp) + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.ForeignKeyConstraintViolation, + Database: "t", + Table: "child", + PrimaryKey: "(10)", + DetailsRegex: `{"constraint_name": "child_parent_id_fkey", "row_data": {"child_id": "10", "parent_id": "0"}}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child WITH OPTIONS CONSTRAINT ALL`, exp) // Run again with AS OF SYSTEM TIME. time.Sleep(1 * time.Millisecond) - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) // Verify that AS OF SYSTEM TIME actually operates in the past. 
ts := r.QueryStr(t, `SELECT cluster_logical_timestamp()`)[0][0] r.Exec(t, "INSERT INTO t.parent VALUES (0)") - runScrub( + scrubtestutils.RunScrub( t, db, fmt.Sprintf( `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '%s' WITH OPTIONS CONSTRAINT ALL`, ts, ), @@ -633,75 +644,79 @@ ALTER TABLE t.child ADD FOREIGN KEY (parent_id, parent_id2) REFERENCES t.parent } // Run SCRUB and find the FOREIGN KEY violation created. - exp := expectedScrubResult{ - ErrorType: scrub.ForeignKeyConstraintViolation, - Database: "t", - Table: "child", - PrimaryKey: "(11)", - DetailsRegex: `{"constraint_name": "child_parent_id_parent_id2_fkey", "row_data": {"child_id": "11", "parent_id": "1337", "parent_id2": "NULL"}}`, - } - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child WITH OPTIONS CONSTRAINT ALL`, exp) + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.ForeignKeyConstraintViolation, + Database: "t", + Table: "child", + PrimaryKey: "(11)", + DetailsRegex: `{"constraint_name": "child_parent_id_parent_id2_fkey", "row_data": {"child_id": "11", "parent_id": "1337", "parent_id2": "NULL"}}`, + }, + } + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child WITH OPTIONS CONSTRAINT ALL`, exp) time.Sleep(1 * time.Millisecond) - runScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) -} - -type expectedScrubResult struct { - ErrorType string - Database string - Table string - PrimaryKey string - Repaired bool - DetailsRegex string + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE t.child AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) } -func checkScrubResult(t *testing.T, res sqlutils.ScrubResult, exp expectedScrubResult) { - t.Helper() - - if res.ErrorType != exp.ErrorType { - t.Errorf("expected %q error, instead got: %s", exp.ErrorType, res.ErrorType) - } - - if res.Database != exp.Database { - t.Errorf("expected database %q, got %q", exp.Database, res.Database) - } - - if res.Table 
!= exp.Table { - t.Errorf("expected table %q, got %q", exp.Table, res.Table) - } - - if res.PrimaryKey != exp.PrimaryKey { - t.Errorf("expected primary key %q, got %q", exp.PrimaryKey, res.PrimaryKey) - } - if res.Repaired != exp.Repaired { - t.Fatalf("expected repaired %v, got %v", exp.Repaired, res.Repaired) - } - - if matched, err := regexp.MatchString(exp.DetailsRegex, res.Details); err != nil { - t.Fatal(err) - } else if !matched { - t.Errorf("expected error details to contain `%s`, got `%s`", exp.DetailsRegex, res.Details) - } -} +// TestScrubUniqueWithoutIndex tests SCRUB on a table that violates a +// UNIQUE WITHOUT INDEX constraint. +func TestScrubUniqueWithoutIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) -// runScrub runs a SCRUB statement and checks that it returns exactly one scrub -// result and that it matches the expected result. -func runScrub(t *testing.T, db *gosql.DB, scrubStmt string, exp expectedScrubResult) { - t.Helper() + // Create the table and row entries. + if _, err := db.Exec(` +CREATE DATABASE db; +SET experimental_enable_unique_without_index_constraints = true; +CREATE TABLE db.t ( + id INT PRIMARY KEY, + id2 INT UNIQUE WITHOUT INDEX +); - // Run SCRUB and find the FOREIGN KEY violation created. - rows, err := db.Query(scrubStmt) - if err != nil { +INSERT INTO db.t VALUES (1, 2), (2,3); +`); err != nil { t.Fatalf("unexpected error: %s", err) } - defer rows.Close() - results, err := sqlutils.GetScrubResultRows(rows) + // Overwrite one of the values with a duplicate unique value. 
+ values := []tree.Datum{tree.NewDInt(1), tree.NewDInt(3)} + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "db", "t") + primaryIndex := tableDesc.GetPrimaryIndex() + var colIDtoRowIndex catalog.TableColMap + colIDtoRowIndex.Set(tableDesc.PublicColumns()[0].GetID(), 0) + colIDtoRowIndex.Set(tableDesc.PublicColumns()[1].GetID(), 1) + primaryIndexKey, err := rowenc.EncodePrimaryIndex(keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true) if err != nil { + t.Fatalf("unexpected error %s", err) + } + if len(primaryIndexKey) != 1 { + t.Fatalf("expected 1 index entry, got %d", len(primaryIndexKey)) + } + // Put a duplicate unique value via KV. + if err := kvDB.Put(context.Background(), primaryIndexKey[0].Key, &primaryIndexKey[0].Value); err != nil { t.Fatalf("unexpected error: %s", err) } - if len(results) != 1 { - t.Fatalf("expected 1 result, got %d. got %#v", len(results), results) + // Run SCRUB + exp := []scrubtestutils.ExpectedScrubResult{ + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(1)", + DetailsRegex: `{"constraint_name": "unique_id2", "row_data": {"id": "1", "id2": "3"}`, + }, + { + ErrorType: scrub.UniqueConstraintViolation, + Database: "db", + Table: "t", + PrimaryKey: "(2)", + DetailsRegex: `{"constraint_name": "unique_id2", "row_data": {"id": "2", "id2": "3"}`, + }, } - checkScrubResult(t, results[0], exp) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t WITH OPTIONS CONSTRAINT ALL`, exp) + time.Sleep(1 * time.Millisecond) + scrubtestutils.RunScrub(t, db, `EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1ms' WITH OPTIONS CONSTRAINT ALL`, exp) } diff --git a/pkg/sql/scrub_unique_constraint.go b/pkg/sql/scrub_unique_constraint.go new file mode 100644 index 000000000000..bdcb4c2a949e --- /dev/null +++ b/pkg/sql/scrub_unique_constraint.go @@ -0,0 +1,202 @@ +// Copyright 2022 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/scrub" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// sqlUniqueConstraintCheckOperation is a check which validates a +// UNIQUE constraint on a table. +type sqlUniqueConstraintCheckOperation struct { + tableName *tree.TableName + tableDesc catalog.TableDescriptor + constraint *descpb.ConstraintDetail + cols []catid.ColumnID + name string + asOf hlc.Timestamp + predicate string + + // columns is a list of the columns returned in the query result + // tree.Datums. + columns []catalog.Column + // primaryColIdxs maps PrimaryIndex.Columns to the row + // indexes in the query result tree.Datums. 
+ primaryColIdxs []int
+
+ run sqlCheckConstraintCheckRun
+}
+
+func newSQLUniqueConstraintCheckOperation(
+ tableName *tree.TableName,
+ tableDesc catalog.TableDescriptor,
+ constraint descpb.ConstraintDetail,
+ asOf hlc.Timestamp,
+) *sqlUniqueConstraintCheckOperation {
+ op := sqlUniqueConstraintCheckOperation{
+ tableName: tableName,
+ tableDesc: tableDesc,
+ constraint: &constraint,
+ asOf: asOf,
+ }
+ if constraint.UniqueWithoutIndexConstraint != nil {
+ op.cols = constraint.UniqueWithoutIndexConstraint.ColumnIDs
+ op.name = constraint.UniqueWithoutIndexConstraint.Name
+ op.predicate = constraint.UniqueWithoutIndexConstraint.Predicate
+ } else if constraint.Index != nil {
+ op.cols = constraint.Index.KeyColumnIDs
+ // Partitioning columns are prepended to the index but are not part of the
+ // unique constraint, so we ignore them.
+ if constraint.Index.Partitioning.NumImplicitColumns > 0 {
+ op.cols = op.cols[constraint.Index.Partitioning.NumImplicitColumns:]
+ }
+ op.name = constraint.Index.Name
+ op.predicate = constraint.Index.Predicate
+ }
+ return &op
+}
+
+// Start implements the checkOperation interface.
+// It creates a SELECT expression and generates a plan from it, which
+// then runs in the distSQL execution engine.
+func (o *sqlUniqueConstraintCheckOperation) Start(params runParams) error {
+ ctx := params.ctx
+ // Create a query of the form:
+ // SELECT a,b,c FROM db.t AS tbl1 JOIN
+ // (SELECT b, c FROM db.t
+ // WHERE b IS NOT NULL AND c IS NOT NULL [AND partial index predicate]
+ // GROUP BY b, c HAVING COUNT(*) > 1) as tbl2
+ // ON tbl1.b = tbl2.b AND tbl1.c = tbl2.c;
+ // Where a, b, and c are all the public columns in table db.t and b and c are
+ // the unique columns. We select all public columns to provide detailed
+ // information if there are constraint violations.
+
+ // Collect all the columns.
+ o.columns = o.tableDesc.PublicColumns()
+
+ // Make a list of all the unique column names.
+ keyCols := make([]string, len(o.cols)) + matchers := make([]string, len(o.cols)) + for i := 0; i < len(o.cols); i++ { + col, err := o.tableDesc.FindColumnWithID(o.cols[i]) + if err != nil { + return err + } + keyCols[i] = tree.NameString(col.GetName()) + matchers[i] = fmt.Sprintf("tbl1.%[1]s=tbl2.%[1]s", keyCols[i]) + } + // Make a list of all the public column names. + pCols := make([]string, len(o.columns)) + for i := 0; i < len(o.columns); i++ { + pCols[i] = fmt.Sprintf("tbl1.%[1]s", tree.NameString(o.columns[i].GetName())) + } + asOf := "" + if o.asOf != hlc.MaxTimestamp { + asOf = fmt.Sprintf("AS OF SYSTEM TIME %[1]d", + o.asOf.WallTime) + } + tableName := fmt.Sprintf("%s.%s", o.tableName.Catalog(), o.tableName.Table()) + dup, _, err := duplicateRowQuery(o.tableDesc, o.cols, o.predicate, false /* limitResults */) + if err != nil { + return err + } + + sel := fmt.Sprintf(`SELECT %[1]s +FROM %[2]s AS tbl1 JOIN +(%[3]s) AS tbl2 +ON %[4]s +%[5]s `, + strings.Join(pCols, ","), // 1 + tableName, // 2 + dup, // 3 + strings.Join(matchers, " AND "), // 4 + asOf, // 5 + ) + + rows, err := params.extendedEvalCtx.ExecCfg.InternalExecutor.QueryBuffered( + ctx, "scrub-unique", params.p.txn, sel, + ) + if err != nil { + return err + } + + o.run.started = true + o.run.rows = rows + // Find the row indexes for all of the primary index columns. + o.primaryColIdxs, err = getPrimaryColIdxs(o.tableDesc, o.columns) + return err +} + +// Next implements the checkOperation interface. 
+func (o *sqlUniqueConstraintCheckOperation) Next(params runParams) (tree.Datums, error) { + row := o.run.rows[o.run.rowIndex] + o.run.rowIndex++ + timestamp, err := tree.MakeDTimestamp( + params.extendedEvalCtx.GetStmtTimestamp(), + time.Nanosecond, + ) + if err != nil { + return nil, err + } + + var primaryKeyDatums tree.Datums + for _, rowIdx := range o.primaryColIdxs { + primaryKeyDatums = append(primaryKeyDatums, row[rowIdx]) + } + + details := make(map[string]interface{}) + rowDetails := make(map[string]interface{}) + details["row_data"] = rowDetails + details["constraint_name"] = o.name + for rowIdx, col := range o.columns { + rowDetails[col.GetName()] = row[rowIdx].String() + } + detailsJSON, err := tree.MakeDJSON(details) + if err != nil { + return nil, err + } + + return tree.Datums{ + tree.DNull, /* job_uuid */ + tree.NewDString(scrub.UniqueConstraintViolation), + tree.NewDString(o.tableName.Catalog()), + tree.NewDString(o.tableName.Table()), + tree.NewDString(primaryKeyDatums.String()), + timestamp, + tree.DBoolFalse, + detailsJSON, + }, nil +} + +// Started implements the checkOperation interface. +func (o *sqlUniqueConstraintCheckOperation) Started() bool { + return o.run.started +} + +// Done implements the checkOperation interface. +func (o *sqlUniqueConstraintCheckOperation) Done(ctx context.Context) bool { + return o.run.rows == nil || o.run.rowIndex >= len(o.run.rows) +} + +// Close implements the checkOperation interface. +func (o *sqlUniqueConstraintCheckOperation) Close(ctx context.Context) { + o.run.rows = nil +}