Skip to content

Commit

Permalink
backupccl: fix a span use-after-Finish
Browse files Browse the repository at this point in the history
This processor was spawning a goroutine whose tracing span is finished
externally when the processor is moved to draining. That might have
happened while the goroutine was running, which caused the span to be
possibly used afterwards. That's not cool. This patch overrides the
processor's draining process so that it properly waits for the goroutine
to finish before finishing the span.

Release note: None
  • Loading branch information
andreimatei committed Nov 10, 2021
1 parent a539b38 commit 799b21b
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ type backupDataProcessor struct {
spec execinfrapb.BackupDataSpec
output execinfra.RowReceiver

progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
backupErr error
// cancelAndWaitForWorker cancels the producer goroutine and waits for it to
// finish. It can be called multiple times.
cancelAndWaitForWorker func()
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
backupErr error
}

var _ execinfra.Processor = &backupDataProcessor{}
Expand All @@ -136,6 +139,10 @@ func newBackupDataProcessor(
execinfra.ProcStateOpts{
// This processor doesn't have any inputs to drain.
InputsToDrain: nil,
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
bp.close()
return nil
},
}); err != nil {
return nil, err
}
Expand All @@ -145,7 +152,14 @@ func newBackupDataProcessor(
// Start is part of the RowSource interface.
func (bp *backupDataProcessor) Start(ctx context.Context) {
ctx = bp.StartInternal(ctx, backupProcessorName)
ctx, cancel := context.WithCancel(ctx)
bp.cancelAndWaitForWorker = func() {
cancel()
for range bp.progCh {
}
}
go func() {
defer cancel()
defer close(bp.progCh)
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh)
}()
Expand Down Expand Up @@ -173,6 +187,11 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
return nil, bp.DrainHelper()
}

func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.ProcessorBase.InternalClose()
}

type spanAndTime struct {
// spanIdx is a unique identifier of this object.
spanIdx int
Expand Down

0 comments on commit 799b21b

Please sign in to comment.