Skip to content

Commit

Permalink
Merge #91747 #91871
Browse files Browse the repository at this point in the history
91747: tabledesc: add validation for MutationJobs r=postamar a=postamar

Previously, the contents of this slice were almost not validated. This has become a problem because the declarative schema changer relies on it to forbid schema changes concurrent with the legacy schema changer. This patch addresses this by adding the missing validation logic. Henceforth a nonempty MutationJobs slice implies a nonempty Mutations slice.

Fixes #87945.

Release note: None

91871: colexecdisk: fix a rare flake in TestExternalSortMemoryAccounting r=yuzefovich a=yuzefovich

We recently made a change in how we're growing the flat buffer for non-inlined values in the Bytes vector (we now double the capacity). This means that we now might exceed the memory limit sooner (in terms of the batches buffered) than previously since we now have larger allocated capacity. This also triggers a difference in how we're calculating the "proportional size" of the first `n` elements of the Bytes vector. As a result, a test became flaky since we now might create less partitions than the test expects, so this commit relaxes the assertion.

I did spend some time looking into whether the new behavior is concerning, and I don't think it is.

Fixes: #91850.

Release note: None

Co-authored-by: Marius Posta <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
3 people committed Nov 14, 2022
3 parents a8b0cd9 + 7526a97 + 893e44b commit 4660d3b
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/catalog/tabledesc/table_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (desc *wrapper) GetPostDeserializationChanges() catalog.PostDeserialization
func (desc *wrapper) HasConcurrentSchemaChanges() bool {
return (desc.DeclarativeSchemaChangerState != nil &&
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID) ||
(len(desc.Mutations) > 0 && len(desc.MutationJobs) > 0)
len(desc.MutationJobs) > 0
}

// SkipNamespace implements the descriptor interface.
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/catalog/tabledesc/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,13 +675,33 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {

// Validate mutations and exit early if any of these are deeply corrupted.
{
var mutationIDs util.FastIntSet
mutationsHaveErrs := false
for _, m := range desc.Mutations {
mutationIDs.Add(int(m.MutationID))
if err := validateMutation(&m); err != nil {
vea.Report(err)
mutationsHaveErrs = true
}
}
// Check that job IDs are uniquely mapped to mutation IDs in the
// MutationJobs slice.
jobIDs := make(map[descpb.MutationID]catpb.JobID)
for _, mj := range desc.MutationJobs {
if !mutationIDs.Contains(int(mj.MutationID)) {
vea.Report(errors.AssertionFailedf("unknown mutation ID %d associated with job ID %d",
mj.MutationID, mj.JobID))
mutationsHaveErrs = true
continue
}
if jobID, found := jobIDs[mj.MutationID]; found {
vea.Report(errors.AssertionFailedf("two job IDs %d and %d mapped to the same mutation ID %d",
jobID, mj.JobID, mj.MutationID))
mutationsHaveErrs = true
continue
}
jobIDs[mj.MutationID] = mj.JobID
}
if mutationsHaveErrs {
return
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/sql/catalog/tabledesc/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2279,6 +2279,69 @@ func TestValidateTableDesc(t *testing.T) {
DurationExpr: catpb.Expression("INTERVAL '2 minutes'"),
},
}},
{`unknown mutation ID 123 associated with job ID 456`,
descpb.TableDescriptor{
ID: 2,
ParentID: 1,
Name: "foo",
FormatVersion: descpb.InterleavedFormatVersion,
MutationJobs: []descpb.TableDescriptor_MutationJob{
{MutationID: 123, JobID: 456},
},
Columns: []descpb.ColumnDescriptor{
{
ID: 1,
Name: "bar",
},
},
Families: []descpb.ColumnFamilyDescriptor{
{ID: 0, Name: "primary", ColumnIDs: []descpb.ColumnID{1}, ColumnNames: []string{"bar"}},
},
NextColumnID: 2,
NextFamilyID: 1,
}},
{`two job IDs 12345 and 45678 mapped to the same mutation ID 1`,
descpb.TableDescriptor{
ID: 2,
ParentID: 1,
Name: "foo",
FormatVersion: descpb.InterleavedFormatVersion,
MutationJobs: []descpb.TableDescriptor_MutationJob{
{MutationID: 1, JobID: 12345},
{MutationID: 1, JobID: 45678},
},
Mutations: []descpb.DescriptorMutation{
{
Descriptor_: &descpb.DescriptorMutation_Index{
Index: &descpb.IndexDescriptor{
ID: 3,
Name: "new_primary_key",
Unique: true,
KeyColumnIDs: []descpb.ColumnID{3},
KeyColumnNames: []string{"c3"},
KeyColumnDirections: []catpb.IndexColumn_Direction{catpb.IndexColumn_ASC},
Version: descpb.LatestIndexDescriptorVersion,
EncodingType: descpb.PrimaryIndexEncoding,
ConstraintID: 1,
},
},
Direction: descpb.DescriptorMutation_ADD,
State: descpb.DescriptorMutation_DELETE_ONLY,
MutationID: 1,
},
},
Columns: []descpb.ColumnDescriptor{
{
ID: 1,
Name: "bar",
},
},
Families: []descpb.ColumnFamilyDescriptor{
{ID: 0, Name: "primary", ColumnIDs: []descpb.ColumnID{1}, ColumnNames: []string{"bar"}},
},
NextColumnID: 2,
NextFamilyID: 1,
}},
}
for i, d := range testData {
t.Run(d.err, func(t *testing.T) {
Expand Down
15 changes: 8 additions & 7 deletions pkg/sql/colexec/colexecdisk/external_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func TestExternalSortMemoryAccounting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 91850) // flaky test
skip.UnderStress(t, "the test is very memory-intensive and is likely to OOM under stress")

ctx := context.Background()
Expand Down Expand Up @@ -134,12 +133,14 @@ func TestExternalSortMemoryAccounting(t *testing.T) {
// numInMemoryBufferedBatches (first merge = 2x, second merge = 3x, third
// merge 4x, etc, so we expect 2*numNewPartitions-1 partitions).
expMaxTotalPartitionsCreated := 2*numNewPartitions - 1
// Because of the fact that we are creating partitions slightly larger than
// memoryLimit in size and because of our "after the fact" memory
// accounting, we might create less partitions than maximum defined above
// (e.g., if numNewPartitions is 4, then we will create 3 partitions when
// batch size is 3).
expMinTotalPartitionsCreated := numNewPartitions - 1
// Since we are creating partitions slightly larger than memoryLimit in size
// and because of our "after the fact" memory accounting, we might create
// fewer partitions than the target (e.g., if numNewPartitions is 4, then we
// will create 3 partitions when batch size is 3). However, we might not
// even create numNewPartitions-1 in edge cases (due to how we grow
// coldata.Bytes.buffer when setting values), so we opt for a sanity check
// that at least two partitions were created that must always be true.
expMinTotalPartitionsCreated := 2
require.GreaterOrEqualf(t, numPartitionsCreated, expMinTotalPartitionsCreated,
"didn't create enough partitions: actual %d, min expected %d",
numPartitionsCreated, expMinTotalPartitionsCreated,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,9 @@ UPDATE system.namespace SET id = %d WHERE id = %d;
` referenced table ID %d: referenced descriptor not found`,
tableTblID, tableFkTblID, tableFkTblID),
},
{fmt.Sprintf("%d", tableNoJobID), "defaultdb", "public", "nojob",
fmt.Sprintf(`relation "nojob" (%d): unknown mutation ID 0 associated with job ID 123456`, tableNoJobID),
},
{fmt.Sprintf("%d", tableNoJobID), "defaultdb", "public", "nojob", `mutation job 123456: job not found`},
{fmt.Sprintf("%d", schemaID), fmt.Sprintf("[%d]", databaseID), "public", "",
fmt.Sprintf(`schema "public" (%d): referenced database ID %d: referenced descriptor not found`, schemaID, databaseID),
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/doctor/doctor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ func TestExamineDescriptors(t *testing.T) {
},
},
expected: `Examining 2 descriptors and 2 namespace entries...
ParentID 52, ParentSchemaID 29: relation "t" (51): unknown mutation ID 1 associated with job ID 123
ParentID 52, ParentSchemaID 29: relation "t" (51): mutation job 123 has terminal status (canceled)
`,
},
Expand Down
14 changes: 11 additions & 3 deletions pkg/sql/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,17 @@ func (p *planner) truncateTable(ctx context.Context, id descpb.ID, jobDesc strin
}

// Create new ID's for all of the indexes in the table.
version := p.ExecCfg().Settings.Version.ActiveVersion(ctx)
if err := tableDesc.AllocateIDs(ctx, version); err != nil {
return err
{
version := p.ExecCfg().Settings.Version.ActiveVersion(ctx)
// Temporarily empty the mutation jobs slice otherwise the descriptor
// validation performed by AllocateIDs will fail: the Mutations slice
// has been emptied but MutationJobs only gets emptied later on.
mutationJobs := tableDesc.MutationJobs
tableDesc.MutationJobs = nil
if err := tableDesc.AllocateIDs(ctx, version); err != nil {
return err
}
tableDesc.MutationJobs = mutationJobs
}

// Construct a mapping from old index ID's to new index ID's.
Expand Down

0 comments on commit 4660d3b

Please sign in to comment.