Skip to content

Commit

Permalink
sql: handle missing job when during mutations GC
Browse files Browse the repository at this point in the history
There are cases where the schema changer tries to clean out the
mutations on a descriptor. When it cannot find the job it currently
throws an error and this does not let the transaction that cleans the
job from the descriptor commit. However, if it cannot find the job then
there is no chance it will find the job later. Currently, the schema
changer will repeatedly attempt to clean this job and always fail since
it cannot find the job.

It should consider the job as successful if it cannot find the job and
continue committing the transaction that does the cleanup for this
mutation.

Release note (bug fix): Stop repeatedly looking for non-existing jobs,
which may cause  high memory usage, when cleaning up schema changes.
  • Loading branch information
pbardea committed Feb 4, 2020
1 parent 4ac7dc3 commit 359901a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,8 @@ func (sc *SchemaChanger) maybeGCMutations(
func(txn *client.Txn) error {
job, err := sc.jobRegistry.LoadJobWithTxn(ctx, mutation.JobID, txn)
if err != nil {
return err
log.Warningf(ctx, "ignoring error during logEvent while GCing mutations: %+v", err)
return nil
}
return job.WithTxn(txn).Succeeded(ctx, jobs.NoopFn)
},
Expand Down
98 changes: 98 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4948,3 +4948,101 @@ ALTER TABLE t.test2 ADD FOREIGN KEY (k) REFERENCES t.test;
t.Fatal(err)
}
}

// TestOrphanedGCMutationsRemoved tests that if a table descriptor has a
// GCMutations which references a job that does not exist anymore, that it will
// eventually be cleaned up anyway. One way this can arise is when a table
// was backed up right after an index deletion.
func TestOrphanedGCMutationsRemoved(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := tests.CreateTestServerParams()
const chunkSize = 200
// Disable synchronous schema change processing so that the mutations get
// processed asynchronously.
var enableAsyncSchemaChanges uint32
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
AsyncExecNotification: func() error {
if enable := atomic.LoadUint32(&enableAsyncSchemaChanges); enable == 0 {
return errors.New("async schema changes are disabled")
}
return nil
},
AsyncExecQuickly: true,
BackfillChunkSize: chunkSize,
},
}
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.Background())

retryOpts := retry.Options{
InitialBackoff: 20 * time.Millisecond,
MaxBackoff: 200 * time.Millisecond,
Multiplier: 2,
}

// Create a k-v table.
if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
`); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`CREATE INDEX t_v ON t.test(v)`); err != nil {
t.Fatal(err)
}

// Add some data.
const maxValue = chunkSize + 1
if err := bulkInsertIntoTable(sqlDB, maxValue); err != nil {
t.Fatal(err)
}

tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")
// Wait until indexes are created.
for r := retry.Start(retryOpts); r.Next(); {
tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "test")
if len(tableDesc.Indexes) == 1 {
break
}
}

if _, err := sqlDB.Exec(`DROP INDEX t.t_v`); err != nil {
t.Fatal(err)
}

tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "test")
if e := 1; e != len(tableDesc.GCMutations) {
t.Fatalf("e = %d, v = %d", e, len(tableDesc.GCMutations))
}

// Delete the associated job.
jobID := tableDesc.GCMutations[0].JobID
if _, err := sqlDB.Exec(fmt.Sprintf("DELETE FROM system.jobs WHERE id=%d", jobID)); err != nil {
t.Fatal(err)
}

// Ensure the GCMutations has not yet been completed.
tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "test")
if e := 1; e != len(tableDesc.GCMutations) {
t.Fatalf("e = %d, v = %d", e, len(tableDesc.GCMutations))
}

// Enable async schema change processing for purged schema changes.
atomic.StoreUint32(&enableAsyncSchemaChanges, 1)

// Add immediate GC TTL to allow index creation purge to complete.
if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil {
t.Fatal(err)
}

// Ensure that GC mutations that cannot find their job will eventually be
// cleared.
testutils.SucceedsSoon(t, func() error {
tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "test")
if len(tableDesc.GCMutations) > 0 {
return errors.Errorf("%d gc mutations remaining", len(tableDesc.GCMutations))
}
return nil
})
}

0 comments on commit 359901a

Please sign in to comment.