79293: sql/importer: re-validate unique constraints after IMPORT r=dt a=dt

Release note (bug fix): Previously, IMPORT INTO could create duplicate entries that violated UNIQUE constraints in REGIONAL BY ROW tables and in tables using UNIQUE WITHOUT INDEX constraints. A new post-IMPORT validation step for those tables now fails and rolls back the IMPORT in such cases.

Co-authored-by: David Taylor <[email protected]>
craig[bot] and dt committed Apr 4, 2022
2 parents fa126a4 + ebe283b commit ca927f0
Showing 3 changed files with 72 additions and 12 deletions.
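
For illustration, a minimal SQL sketch of the fixed behavior, assembled from the new test case in ccl_test.go below; the nodelocal CSV path is a placeholder and the error text is the constraint-validation failure the test asserts:

-- Regional-by-row table with an existing row whose home region is us-east2.
CREATE DATABASE multi_region PRIMARY REGION "us-east1" REGIONS "us-east1", "us-east2";
USE multi_region;
CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY) LOCALITY REGIONAL BY ROW;
INSERT INTO mr_regional_by_row (i, crdb_region) VALUES (1, 'us-east2');

-- The CSV contains the row "1,us-east1": same primary key but a different region,
-- so it lands in a different partition of the implicitly partitioned unique index
-- and was previously accepted. With this change the post-IMPORT re-validation step
-- fails the job with an error like: failed to validate unique constraint
IMPORT INTO mr_regional_by_row (i, crdb_region) CSV DATA ('nodelocal://0/data.csv');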
pkg/ccl/importerccl/ccl_test.go (17 changes: 14 additions & 3 deletions)
@@ -58,7 +58,7 @@ func TestImportMultiRegion(t *testing.T) {

baseDir := sharedTestdata(t)
tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
-t, 2 /* numServers */, base.TestingKnobs{}, multiregionccltestutils.WithBaseDirectory(baseDir),
+t, 3 /* numServers */, base.TestingKnobs{}, multiregionccltestutils.WithBaseDirectory(baseDir),
)
defer cleanup()

@@ -86,7 +86,7 @@ func TestImportMultiRegion(t *testing.T) {

// Create the databases
tdb.Exec(t, `CREATE DATABASE foo`)
-tdb.Exec(t, `CREATE DATABASE multi_region PRIMARY REGION "us-east1"`)
+tdb.Exec(t, `CREATE DATABASE multi_region PRIMARY REGION "us-east1" REGIONS "us-east1", "us-east2"`)

simpleOcf := fmt.Sprintf("nodelocal://0/avro/%s", "simple.ocf")

@@ -183,6 +183,17 @@ DROP VIEW IF EXISTS v`,
args: []interface{}{srv.URL},
data: "1,\"foo\",NULL,us-east1\n",
},
+{
+name: "import-into-multi-region-regional-by-row-dupes",
+db: "multi_region",
+table: "mr_regional_by_row",
+create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY) LOCALITY REGIONAL BY ROW;" +
+"INSERT INTO mr_regional_by_row (i, crdb_region) VALUES (1, 'us-east2')",
+sql: "IMPORT INTO mr_regional_by_row (i, crdb_region) CSV DATA ($1)",
+args: []interface{}{srv.URL},
+data: "1,us-east1\n",
+errString: `failed to validate unique constraint`,
+},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database-concurrent-table-add",
db: "multi_region",
@@ -199,7 +210,7 @@ DROP VIEW IF EXISTS v`,
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
-during: `ALTER DATABASE multi_region ADD REGION "us-east2"`,
+during: `ALTER DATABASE multi_region ADD REGION "us-east3"`,
errString: `type descriptor "crdb_internal_region" \(\d+\) has been ` +
`modified, potentially incompatibly, since import planning; ` +
`aborting to avoid possible corruption`,
pkg/sql/check.go (34 changes: 25 additions & 9 deletions)
@@ -381,7 +381,7 @@ func (p *planner) RevalidateUniqueConstraintsInCurrentDB(ctx context.Context) er
}

for _, tableDesc := range tableDescs {
-if err = p.revalidateUniqueConstraintsInTable(ctx, tableDesc); err != nil {
+if err = RevalidateUniqueConstraintsInTable(ctx, p.Txn(), p.ExecCfg().InternalExecutor, tableDesc); err != nil {
return err
}
}
@@ -402,7 +402,7 @@ func (p *planner) RevalidateUniqueConstraintsInTable(ctx context.Context, tableI
if err != nil {
return err
}
-return p.revalidateUniqueConstraintsInTable(ctx, tableDesc)
+return RevalidateUniqueConstraintsInTable(ctx, p.Txn(), p.ExecCfg().InternalExecutor, tableDesc)
}

// RevalidateUniqueConstraint verifies that the given unique constraint on the
@@ -465,16 +465,32 @@ func (p *planner) RevalidateUniqueConstraint(
return errors.Newf("unique constraint %s does not exist", constraintName)
}

-// revalidateUniqueConstraintsInTable verifies that all unique constraints
+// HasVirtualUniqueConstraints returns true if the table has one or more
+// constraints that are validated by RevalidateUniqueConstraintsInTable.
+func HasVirtualUniqueConstraints(tableDesc catalog.TableDescriptor) bool {
+for _, index := range tableDesc.ActiveIndexes() {
+if index.IsUnique() && index.GetPartitioning().NumImplicitColumns() > 0 {
+return true
+}
+}
+for _, uc := range tableDesc.GetUniqueWithoutIndexConstraints() {
+if uc.Validity == descpb.ConstraintValidity_Validated {
+return true
+}
+}
+return false
+}
+
+// RevalidateUniqueConstraintsInTable verifies that all unique constraints
// defined on the given table are valid. In other words, it verifies that all
// rows in the table have unique values for every unique constraint defined on
// the table.
//
// Note that we only need to validate UNIQUE constraints that are not already
// enforced by an index. This includes implicitly partitioned UNIQUE indexes
// and UNIQUE WITHOUT INDEX constraints.
-func (p *planner) revalidateUniqueConstraintsInTable(
-ctx context.Context, tableDesc catalog.TableDescriptor,
+func RevalidateUniqueConstraintsInTable(
+ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, tableDesc catalog.TableDescriptor,
) error {
// Check implicitly partitioned UNIQUE indexes.
for _, index := range tableDesc.ActiveIndexes() {
@@ -485,8 +501,8 @@ func (p *planner) revalidateUniqueConstraintsInTable(
index.GetName(),
index.IndexDesc().KeyColumnIDs[index.GetPartitioning().NumImplicitColumns():],
index.GetPredicate(),
-p.ExecCfg().InternalExecutor,
-p.Txn(),
+ie,
+txn,
true, /* preExisting */
); err != nil {
log.Errorf(ctx, "validation of unique constraints failed for table %s: %s", tableDesc.GetName(), err)
@@ -504,8 +520,8 @@ func (p *planner) revalidateUniqueConstraintsInTable(
uc.Name,
uc.ColumnIDs,
uc.Predicate,
-p.ExecCfg().InternalExecutor,
-p.Txn(),
+ie,
+txn,
true, /* preExisting */
); err != nil {
log.Errorf(ctx, "validation of unique constraints failed for table %s: %s", tableDesc.GetName(), err)
pkg/sql/importer/import_job.go (33 changes: 33 additions & 0 deletions)
@@ -305,6 +305,10 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}

+if err := r.checkVirtualConstraints(ctx, p.ExecCfg(), r.job); err != nil {
+return err
+}
+
// If the table being imported into referenced UDTs, ensure that a concurrent
// schema change on any of the typeDescs has not modified the type descriptor. If
// it has, it is unsafe to import the data and we fail the import job.
@@ -1097,6 +1101,35 @@ func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.Executo
})
}

+// checkVirtualConstraints checks constraints that are enforced via runtime
+// checks, such as uniqueness checks that are not directly backed by an index.
+func (*importResumer) checkVirtualConstraints(
+ctx context.Context, execCfg *sql.ExecutorConfig, job *jobs.Job,
+) error {
+for _, tbl := range job.Details().(jobspb.ImportDetails).Tables {
+desc := tabledesc.NewBuilder(tbl.Desc).BuildExistingMutableTable()
+desc.SetPublic()
+
+if sql.HasVirtualUniqueConstraints(desc) {
+if err := job.RunningStatus(ctx, nil /* txn */, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) {
+return jobs.RunningStatus(fmt.Sprintf("re-validating %s", desc.GetName())), nil
+}); err != nil {
+return errors.Wrapf(err, "failed to update running status of job %d", errors.Safe(job.ID()))
+}
+}
+
+if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+ie := execCfg.InternalExecutorFactory(ctx, sql.NewFakeSessionData(execCfg.SV()))
+return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
+return sql.RevalidateUniqueConstraintsInTable(ctx, txn, ie, desc)
+})
+}); err != nil {
+return err
+}
+}
+return nil
+}
+
// checkForUDTModification checks whether any of the types referenced by the
// table being imported into have been modified incompatibly since they were
// read during import planning. If they have, it may be unsafe to continue