Skip to content

Commit

Permalink
sql: fix race conditions in index merge progress tracking
Browse files Browse the repository at this point in the history
This moves us further away from the way the BackfillTracker works in
the schemachanger package, but I wanted to start with something that
seems to pass the race tests before making any further changes.

The main change here is that previously the TodoSpans slice-of-slices
was passed around and ultimately accessed outside of any locking in a
number of places. To avoid this, we replaced the Get/Set API with an
Update API and changed the flush call to make a copy of the progress
before trying to update the job. We also copy the MergeProgress when
we first get passed it to avoid referencing a TodoSpans that someone
else has a reference to.

Release note: None
  • Loading branch information
stevendanna committed Feb 2, 2022
1 parent 2e2805c commit ad30009
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 35 deletions.
21 changes: 11 additions & 10 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ func (sc *SchemaChanger) distIndexBackfill(
if updatedTodoSpans == nil {
return nil
}
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, mu.updatedTodoSpans)
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, updatedTodoSpans)
if err != nil {
return err
}
Expand Down Expand Up @@ -2624,22 +2624,23 @@ func (sc *SchemaChanger) distIndexMerge(
tracker := NewIndexMergeTracker(progress, sc.job)
periodicFlusher := newPeriodicProgressFlusher(sc.settings)

metaFn := func(_ context.Context, meta *execinfrapb.ProducerMetadata) error {
metaFn := func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
if meta.BulkProcessorProgress != nil {

idxCompletedSpans := make(map[int32][]roachpb.Span)

for i, sp := range meta.BulkProcessorProgress.CompletedSpans {
spanIdx := meta.BulkProcessorProgress.CompletedSpanIdx[i]
idxCompletedSpans[spanIdx] = append(idxCompletedSpans[spanIdx], sp)
}
currentProgress := tracker.GetMergeProgress()

for idx, completedSpans := range idxCompletedSpans {
currentProgress.TodoSpans[idx] = roachpb.SubtractSpans(currentProgress.TodoSpans[idx], completedSpans)
tracker.UpdateMergeProgress(ctx, func(_ context.Context, currentProgress *MergeProgress) {
for idx, completedSpans := range idxCompletedSpans {
currentProgress.TodoSpans[idx] = roachpb.SubtractSpans(currentProgress.TodoSpans[idx], completedSpans)
}
})
if sc.testingKnobs.AlwaysUpdateIndexBackfillDetails {
if err := tracker.FlushCheckpoint(ctx); err != nil {
return err
}
}

tracker.SetMergeProgress(ctx, currentProgress)
}
return nil
}
Expand Down
49 changes: 31 additions & 18 deletions pkg/sql/mvcc_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ type MergeProgress struct {
AddedIndexes, TemporaryIndexes []descpb.IndexID
}

// Copy returns a new MergeProgress whose slices do not share backing
// arrays with mp, so that the copy can be read or modified without
// holding whatever lock protects mp.
//
// Each top-level slice that is nil in mp stays nil in the copy. This
// matters for TodoSpans: FlushCheckpoint treats a nil TodoSpans as
// "nothing to checkpoint", and allocating an empty slice here would
// silently defeat that check.
//
// The individual roachpb.Span values are copied by value.
// NOTE(review): any reference-typed fields inside a Span would still be
// shared with mp -- confirm against roachpb.Span.
func (mp *MergeProgress) Copy() *MergeProgress {
	newp := &MergeProgress{}
	if mp.MutationIdx != nil {
		newp.MutationIdx = make([]int, len(mp.MutationIdx))
		copy(newp.MutationIdx, mp.MutationIdx)
	}
	if mp.AddedIndexes != nil {
		newp.AddedIndexes = make([]descpb.IndexID, len(mp.AddedIndexes))
		copy(newp.AddedIndexes, mp.AddedIndexes)
	}
	if mp.TemporaryIndexes != nil {
		newp.TemporaryIndexes = make([]descpb.IndexID, len(mp.TemporaryIndexes))
		copy(newp.TemporaryIndexes, mp.TemporaryIndexes)
	}
	if mp.TodoSpans != nil {
		newp.TodoSpans = make([][]roachpb.Span, len(mp.TodoSpans))
		// Copy every inner slice as well; copying only the outer slice
		// would leave the per-index span lists aliased with mp.
		for i, spanSlice := range mp.TodoSpans {
			newSpanSlice := make([]roachpb.Span, len(spanSlice))
			copy(newSpanSlice, spanSlice)
			newp.TodoSpans[i] = newSpanSlice
		}
	}
	return newp
}

// IndexMergeTracker abstracts the infrastructure to read and write merge
// progress to job state.
type IndexMergeTracker struct {
Expand All @@ -132,19 +150,21 @@ var _ scexec.BackfillProgressFlusher = (*IndexMergeTracker)(nil)
// NewIndexMergeTracker creates a new IndexMergeTracker
func NewIndexMergeTracker(progress *MergeProgress, job *jobs.Job) *IndexMergeTracker {
imt := IndexMergeTracker{}
imt.mu.progress = progress
imt.mu.progress = progress.Copy()
imt.job = job
return &imt
}

// FlushCheckpoint writes out a checkpoint containing any data which has been
// previously set via SetMergeProgress
// FlushCheckpoint writes out a checkpoint containing any data which
// has been previously updated via UpdateMergeProgress.
func (imt *IndexMergeTracker) FlushCheckpoint(ctx context.Context) error {
progress := imt.GetMergeProgress()

if progress.TodoSpans == nil {
imt.mu.Lock()
if imt.mu.progress.TodoSpans == nil {
imt.mu.Unlock()
return nil
}
progress := imt.mu.progress.Copy()
imt.mu.Unlock()

details, ok := imt.job.Details().(jobspb.SchemaChangeDetails)
if !ok {
Expand All @@ -168,20 +188,13 @@ func (imt *IndexMergeTracker) FlushFractionCompleted(ctx context.Context) error
return nil
}

// SetMergeProgress sets the progress for all index merges. Setting the progress
// does not make that progress durable as the tracker may invoke FlushCheckpoint
// later.
func (imt *IndexMergeTracker) SetMergeProgress(ctx context.Context, progress *MergeProgress) {
imt.mu.Lock()
imt.mu.progress = progress
imt.mu.Unlock()
}

// GetMergeProgress reads the current merge progress.
func (imt *IndexMergeTracker) GetMergeProgress() *MergeProgress {
// UpdateMergeProgress allows the caller to modify the current progress with
// updateFn, which is invoked while the tracker's mutex is held.
func (imt *IndexMergeTracker) UpdateMergeProgress(
ctx context.Context, updateFn func(ctx context.Context, progress *MergeProgress),
) {
imt.mu.Lock()
defer imt.mu.Unlock()
return imt.mu.progress
updateFn(ctx, imt.mu.progress)
}

func newPeriodicProgressFlusher(settings *cluster.Settings) scexec.PeriodicProgressFlusher {
Expand Down
13 changes: 9 additions & 4 deletions pkg/sql/mvcc_backfiller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// Test schema changes are retried and complete properly when there's an error
Expand Down Expand Up @@ -123,9 +124,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);

// TODO(rui): use testing hook instead of cluster setting once this value for
// the backfill merge is hooked up to testing hooks.
if _, err := sqlDB.Exec(fmt.Sprintf(`
SET CLUSTER SETTING bulkio.index_backfill.batch_size = %d;
SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '%s'`, maxValue/5, "0.001ms")); err != nil {
if _, err := sqlDB.Exec(fmt.Sprintf(`SET CLUSTER SETTING bulkio.index_backfill.batch_size = %d;`, maxValue/5)); err != nil {
t.Fatal(err)
}

Expand All @@ -145,7 +144,13 @@ SET CLUSTER SETTING bulkio.index_backfill.checkpoint_interval = '%s'`, maxValue/
t.Fatal(err)
}

addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2)
addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2, func() {
if _, err := sqlDB.Exec("SHOW JOBS WHEN COMPLETE (SELECT job_id FROM [SHOW JOBS])"); err != nil {
t.Fatal(err)
}
})
require.True(t, mergeChunk > 3, fmt.Sprintf("mergeChunk: %d", mergeChunk))

}

// Test index backfill merges are not affected by various operations that run
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ COMMIT;

// Add an index and check that it succeeds.
func addIndexSchemaChange(
t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, maxValue int, numKeysPerRow int,
t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, maxValue int, numKeysPerRow int, waitFn func(),
) {
if _, err := sqlDB.Exec("CREATE UNIQUE INDEX foo ON t.test (v)"); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1093,6 +1093,10 @@ func addIndexSchemaChange(

ctx := context.Background()

if waitFn != nil {
waitFn()
}

if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, numKeysPerRow, maxValue); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1298,7 +1302,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
t.Fatal(err)
}

addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2)
addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2, nil)

currChunk = 0
seenSpan = roachpb.Span{}
Expand Down Expand Up @@ -1452,7 +1456,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
t.Fatal(err)
}

addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2)
addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2, nil)
if num := atomic.SwapUint32(&numBackfills, 0); num != 2 {
t.Fatalf("expected %d backfills, but saw %d", 2, num)
}
Expand Down

0 comments on commit ad30009

Please sign in to comment.