Skip to content

Commit

Permalink
sql: use a mutex rather than channels in RevertSpansFanout
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
stevendanna committed Mar 18, 2023
1 parent be536a5 commit 67ba17f
Showing 1 changed file with 19 additions and 34 deletions.
53 changes: 19 additions & 34 deletions pkg/sql/revert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -148,45 +149,29 @@ func RevertSpansFanout(
}
}

runRevertSpanWorkers := func(callback func(context.Context, roachpb.Span) error) error {
errGroup, workerCtx := errgroup.WithContext(ctx)
for i := range workerPartitions {
workerIdx := i
errGroup.Go(func() error {
spans := workerPartitions[workerIdx].Spans
return RevertSpans(workerCtx, db, spans,
targetTime, ignoreGCThreshold, batchSize, callback)
})
var callback func(context.Context, roachpb.Span) error
if onCompletedCallback != nil {
// If we have an onCompletedCallback arrange for it to
// be called serially so that callers don't need to
// worry about making it concurrency safe.
var callbackMutex syncutil.Mutex
callback = func(ctx context.Context, completed roachpb.Span) error {
callbackMutex.Lock()
defer callbackMutex.Unlock()
return onCompletedCallback(ctx, completed)
}
return errGroup.Wait()
}

if onCompletedCallback != nil {
// If we have an onCompletedCallback and are using multiple
// workers, arrange for it to be called serially so that callers
// don't need to worry about making it concurrency safe.
g := ctxgroup.WithContext(ctx)
completedSpanCh := make(chan roachpb.Span)
errCh := make(chan error)
g.GoCtx(func(ctx context.Context) error {
defer close(errCh)
for sp := range completedSpanCh {
errCh <- onCompletedCallback(ctx, sp)
}
return nil
errGroup, workerCtx := errgroup.WithContext(ctx)
for i := range workerPartitions {
workerIdx := i
errGroup.Go(func() error {
spans := workerPartitions[workerIdx].Spans
return RevertSpans(workerCtx, db, spans,
targetTime, ignoreGCThreshold, batchSize, callback)
})
g.GoCtx(func(ctx context.Context) error {
defer close(completedSpanCh)
return runRevertSpanWorkers(func(_ context.Context, completed roachpb.Span) error {
completedSpanCh <- completed
return <-errCh
})
})
return g.Wait()
} else {
// Otherwise, just run the workers directly.
return runRevertSpanWorkers(onCompletedCallback)
}
return errGroup.Wait()
}

// RevertSpans reverts the passed span to the target time, which must be above
Expand Down

0 comments on commit 67ba17f

Please sign in to comment.