Merge pull request #92358 from yuzefovich/backport22.2-91969
release-22.2: sql: audit all processors to make their closure bullet-proof
yuzefovich authored Nov 29, 2022
2 parents 734fe89 + 40e57d1 commit d7bc4c5
Showing 26 changed files with 249 additions and 250 deletions.
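Most of this diff is one mechanical substitution: call sites stop reading the processor's exported Ctx struct field (which stays nil until the processor is started) and instead go through a Ctx() accessor on the processor base. The sketch below illustrates the accessor pattern these call sites appear to rely on; it is a simplified stand-in, not the real execinfra.ProcessorBaseNoHelper, and the ctx field and start method names are assumptions for illustration.

package sketch

import "context"

// processorBase is a stand-in for a processor base type that records its
// context only once the processor has been started.
type processorBase struct {
	ctx context.Context // nil until start is called
}

// start records the context, as a processor's StartInternal would.
func (pb *processorBase) start(ctx context.Context) {
	pb.ctx = ctx
}

// Ctx never returns nil: cleanup and logging paths can call it even if the
// processor was never started (for example, because flow setup failed),
// which is what lets the close() paths in this diff drop their nil checks.
func (pb *processorBase) Ctx() context.Context {
	if pb.ctx == nil {
		return context.Background()
	}
	return pb.ctx
}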
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -211,7 +211,7 @@ func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.agg.Close()
if bp.InternalClose() {
-bp.memAcc.Close(bp.Ctx)
+bp.memAcc.Close(bp.Ctx())
}
}

4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -549,8 +549,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
prog.ProgressDetails = *details
case meta := <-rd.metaCh:
return nil, meta
-case <-rd.Ctx.Done():
-rd.MoveToDraining(rd.Ctx.Err())
+case <-rd.Ctx().Done():
+rd.MoveToDraining(rd.Ctx().Err())
return nil, rd.DrainHelper()
}

9 changes: 2 additions & 7 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -383,8 +383,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}
expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix)

-mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx,
-mockRestoreDataSpec)
+mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec)
require.NoError(t, err)
ssts := make(chan mergedSST, 1)
require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
@@ -427,15 +426,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}

func newTestingRestoreDataProcessor(
-ctx context.Context,
-evalCtx *eval.Context,
-flowCtx *execinfra.FlowCtx,
-spec execinfrapb.RestoreDataSpec,
+evalCtx *eval.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec,
) (*restoreDataProcessor, error) {
rd := &restoreDataProcessor{
ProcessorBase: execinfra.ProcessorBase{
ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{
-Ctx: ctx,
EvalCtx: evalCtx,
},
},
44 changes: 22 additions & 22 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -448,20 +448,20 @@ func (ca *changeAggregator) close() {
}
if ca.eventConsumer != nil {
if err := ca.eventConsumer.Close(); err != nil {
-log.Warningf(ca.Ctx, "error closing event consumer: %s", err)
+log.Warningf(ca.Ctx(), "error closing event consumer: %s", err)
}
}

if ca.sink != nil {
if err := ca.sink.Close(); err != nil {
-log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
+log.Warningf(ca.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}
-ca.memAcc.Close(ca.Ctx)
+ca.memAcc.Close(ca.Ctx())
if ca.kvFeedMemMon != nil {
-ca.kvFeedMemMon.Stop(ca.Ctx)
+ca.kvFeedMemMon.Stop(ca.Ctx())
}
-ca.MemMonitor.Stop(ca.Ctx)
+ca.MemMonitor.Stop(ca.Ctx())
ca.InternalClose()
}

@@ -505,7 +505,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// kvFeed, sends off this event to the event consumer, and flushes the sink
// if necessary.
func (ca *changeAggregator) tick() error {
-event, err := ca.eventProducer.Get(ca.Ctx)
+event, err := ca.eventProducer.Get(ca.Ctx())
if err != nil {
return err
}
@@ -520,16 +520,16 @@
ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
}
ca.recentKVCount++
-return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
+return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event)
case kvevent.TypeResolved:
a := event.DetachAlloc()
-a.Release(ca.Ctx)
+a.Release(ca.Ctx())
resolved := event.Resolved()
if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
-return ca.sink.Flush(ca.Ctx)
+return ca.sink.Flush(ca.Ctx())
}

return nil
@@ -572,7 +572,7 @@ func (ca *changeAggregator) flushFrontier() error {
// otherwise, we could lose buffered messages and violate the
// at-least-once guarantee. This is also true for checkpointing the
// resolved spans in the job progress.
-if err := ca.sink.Flush(ca.Ctx); err != nil {
+if err := ca.sink.Flush(ca.Ctx()); err != nil {
return err
}

@@ -1000,11 +1000,11 @@ func (cf *changeFrontier) close() {
}
if cf.sink != nil {
if err := cf.sink.Close(); err != nil {
-log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
+log.Warningf(cf.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}
-cf.memAcc.Close(cf.Ctx)
-cf.MemMonitor.Stop(cf.Ctx)
+cf.memAcc.Close(cf.Ctx())
+cf.MemMonitor.Stop(cf.Ctx())
}
}

@@ -1108,7 +1108,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
-logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV,
+logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart))
continue
@@ -1210,7 +1210,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
if err != nil {
return false, err
}
-cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
+cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart))
return updated, nil
}

@@ -1228,7 +1228,7 @@ func (cf *changeFrontier) checkpointJobProgress(
var updateSkipped error
if cf.js.job != nil {

-if err := cf.js.job.Update(cf.Ctx, nil, func(
+if err := cf.js.job.Update(cf.Ctx(), nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
@@ -1253,8 +1253,8 @@
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
-if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
-log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
+if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil {
+log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err)
return err
}

@@ -1282,7 +1282,7 @@
}

if updateSkipped != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

@@ -1381,7 +1381,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
if !shouldEmit {
return nil
}
-if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil {
+if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil {
return err
}
cf.lastEmitResolved = newResolved.GoTime()
@@ -1420,13 +1420,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) {
description = fmt.Sprintf("job %d", cf.spec.JobID)
}
if frontierChanged && cf.slowLogEveryN.ShouldProcess(now) {
log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s",
description, frontier, resolvedBehind)
}

if cf.slowLogEveryN.ShouldProcess(now) {
s := cf.frontier.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind)
}
}

pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
@@ -297,13 +297,13 @@ func (sf *streamIngestionFrontier) Next() (

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err)
}

// Send back a row to the job so that it can update the progress.
select {
-case <-sf.Ctx.Done():
-sf.MoveToDraining(sf.Ctx.Err())
+case <-sf.Ctx().Done():
+sf.MoveToDraining(sf.Ctx().Err())
return nil, sf.DrainHelper()
// Send the latest persisted highwater in the heartbeat to the source cluster
// as even with retries we will never request an earlier row than it, and
@@ -314,7 +314,7 @@
case <-sf.heartbeatSender.stoppedChan:
err := sf.heartbeatSender.wait()
if err != nil {
log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err)
log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
sf.MoveToDraining(err)
return nil, sf.DrainHelper()
@@ -325,7 +325,7 @@

func (sf *streamIngestionFrontier) close() {
if err := sf.heartbeatSender.stop(); err != nil {
log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err)
log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
if sf.InternalClose() {
sf.metrics.RunningCount.Dec(1)
@@ -391,7 +391,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps(
// maybeUpdatePartitionProgress polls the frontier and updates the job progress with
// partition-specific information to track the status of each partition.
func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
-ctx := sf.Ctx
+ctx := sf.Ctx()
updateFreq := JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV)
if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq {
return nil
12 changes: 6 additions & 6 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -404,10 +404,10 @@ func (sip *streamIngestionProcessor) close() {
}

for _, client := range sip.streamPartitionClients {
-_ = client.Close(sip.Ctx)
+_ = client.Close(sip.Ctx())
}
if sip.batcher != nil {
-sip.batcher.Close(sip.Ctx)
+sip.batcher.Close(sip.Ctx())
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
@@ -538,7 +538,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err

if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil {
-if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx); err != nil {
+if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx()); err != nil {
return nil, err
}
}
@@ -602,7 +602,7 @@ func (sip *streamIngestionProcessor) bufferSST(sst *roachpb.RangeFeedSSTable) er
// careful with checkpoints: we can only send checkpoint whose TS >= SST batch TS
// after the full SST gets ingested.

_, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-sst")
_, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-sst")
defer sp.Finish()
return streamingccl.ScanSST(sst, sst.Span,
func(keyVal storage.MVCCKeyValue) error {
@@ -652,7 +652,7 @@ func (sip *streamIngestionProcessor) bufferDelRange(delRange *roachpb.RangeFeedD
func (sip *streamIngestionProcessor) bufferRangeKeyVal(
rangeKeyVal storage.MVCCRangeKeyValue,
) error {
_, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-range-key")
_, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-range-key")
defer sp.Finish()

var err error
@@ -790,7 +790,7 @@ func (r *rangeKeyBatcher) reset() {
}

func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
-ctx, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-flush")
+ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush")
defer sp.Finish()

flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)}
12 changes: 5 additions & 7 deletions pkg/sql/colexec/columnarizer.go
@@ -58,6 +58,7 @@ type Columnarizer struct {
colexecop.NonExplainable

mode columnarizerMode
+initialized bool
helper colmem.SetAccountingHelper
metadataAllocator *colmem.Allocator
input execinfra.RowSource
@@ -171,13 +172,10 @@ func newColumnarizer(

// Init is part of the colexecop.Operator interface.
func (c *Columnarizer) Init(ctx context.Context) {
-if c.removedFromFlow {
-return
-}
-if c.Ctx != nil {
-// Init has already been called.
+if c.removedFromFlow || c.initialized {
return
}
+c.initialized = true
c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1)
ctx = c.StartInternalNoSpan(ctx)
c.input.Start(ctx)
@@ -270,7 +268,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata {
// When this method returns, we no longer will have the reference to the
// metadata, so we can release all memory from the metadata allocator.
defer c.metadataAllocator.ReleaseAll()
-if c.Ctx == nil {
+if !c.initialized {
// The columnarizer wasn't initialized, so the wrapped processors might
// not have been started leaving them in an unsafe to drain state, so
// we skip the draining. Mostly likely this happened because a panic was
@@ -301,7 +299,7 @@ func (c *Columnarizer) Close(context.Context) error {
func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata {
// Close will call InternalClose(). Note that we don't return any trailing
// metadata here because the columnarizers propagate it in DrainMeta.
-if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil {
+if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil {
// Close never returns an error.
colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close"))
}
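The Columnarizer hunks above need one step beyond the mechanical substitution: the old code used a nil check on the Ctx field as its "Init has already run" signal, and once the context is reached through an accessor that signal disappears, so an explicit initialized flag takes over for both the idempotent Init and the "is it safe to drain?" check. A condensed sketch of that guard, using a simplified stand-in type rather than the real Columnarizer:

package sketch

import "context"

// columnarizerLike models just the guard: a dedicated flag, not a nil
// context, records whether Init ran. Field and method shapes are illustrative.
type columnarizerLike struct {
	removedFromFlow bool
	initialized     bool
}

func (c *columnarizerLike) Init(ctx context.Context) {
	if c.removedFromFlow || c.initialized {
		return
	}
	c.initialized = true
	// ... start the wrapped input here ...
}

func (c *columnarizerLike) DrainMeta() []string {
	if !c.initialized {
		// The wrapped input was never started, so skip draining it.
		return nil
	}
	// ... drain the wrapped input; []string stands in for producer metadata ...
	return nil
}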
10 changes: 1 addition & 9 deletions pkg/sql/colexec/materializer.go
@@ -352,18 +352,10 @@ func (m *Materializer) close() {
if m.Closed {
return
}
-if m.Ctx == nil {
-// In some edge cases (like when Init of an operator above this
-// materializer encounters a panic), the materializer might never be
-// started, yet it still will attempt to close its Closers. This
-// context is only used for logging purposes, so it is ok to grab
-// the background context in order to prevent a NPE below.
-m.Ctx = context.Background()
-}
// Make sure to call InternalClose() only after closing the closers - this
// allows the closers to utilize the unfinished tracing span (if tracing is
// enabled).
-m.closers.CloseAndLogOnErr(m.Ctx, "materializer")
+m.closers.CloseAndLogOnErr(m.Ctx(), "materializer")
m.InternalClose()
}
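The Materializer deletion above is the payoff of the same accessor: the defensive block that swapped in context.Background() existed only because m.Ctx could still be nil when close() ran before the materializer was started. If, as sketched near the top of the diff, Ctx() itself falls back to a background context for a never-started processor, that workaround becomes redundant and the single m.closers.CloseAndLogOnErr(m.Ctx(), "materializer") call is safe on every path.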
