release-22.2: sql: audit all processors to make their closure bullet-proof #92358

Merged 1 commit on Nov 29, 2022
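Every file in this diff follows the same pattern: processors stop reading a raw Ctx struct field and instead call a Ctx() accessor, so cleanup paths (close, draining, logging) always get a usable context even if the processor was never started. The sketch below is illustrative only, assuming an accessor that falls back to context.Background(); it is not the actual execinfra implementation, and processorBase/Start are invented names.

package main

import (
	"context"
	"fmt"
)

// processorBase is a stand-in for a processor's embedded base type.
// The real code is more involved; this only shows the accessor idea.
type processorBase struct {
	ctx context.Context // set when the processor starts; nil before that
}

// Start records the context the processor will run with.
func (pb *processorBase) Start(ctx context.Context) {
	pb.ctx = ctx
}

// Ctx never returns nil, so close() and drain paths can call it
// unconditionally instead of nil-checking a field (compare the materializer
// diff below, which deletes exactly that kind of nil check).
func (pb *processorBase) Ctx() context.Context {
	if pb.ctx == nil {
		return context.Background()
	}
	return pb.ctx
}

func main() {
	var pb processorBase
	// Even though Start was never called, Ctx() is safe to use.
	fmt.Println(pb.Ctx().Err()) // <nil>
}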
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -211,7 +211,7 @@ func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.agg.Close()
if bp.InternalClose() {
- bp.memAcc.Close(bp.Ctx)
+ bp.memAcc.Close(bp.Ctx())
}
}

4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -549,8 +549,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
prog.ProgressDetails = *details
case meta := <-rd.metaCh:
return nil, meta
- case <-rd.Ctx.Done():
- rd.MoveToDraining(rd.Ctx.Err())
+ case <-rd.Ctx().Done():
+ rd.MoveToDraining(rd.Ctx().Err())
return nil, rd.DrainHelper()
}

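The Next() change above selects on the processor context's Done channel and moves to draining with the context error. Below is a minimal, self-contained sketch of that select shape under invented names (next, rowCh); the real MoveToDraining/DrainHelper calls are only mimicked by returning ctx.Err().

package main

import (
	"context"
	"fmt"
)

// next mirrors the select in the diff above: produce a row if one is ready,
// otherwise stop with the context error once the flow is canceled.
func next(ctx context.Context, rowCh <-chan string) (string, error) {
	select {
	case row := <-rowCh:
		return row, nil
	case <-ctx.Done():
		// Same idea as rd.MoveToDraining(rd.Ctx().Err()): surface the
		// context error and emit no more rows.
		return "", ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	rowCh := make(chan string, 1)
	rowCh <- "row-1"
	fmt.Println(next(ctx, rowCh)) // row-1 <nil>
	cancel()
	_, err := next(ctx, rowCh)
	fmt.Println(err) // context canceled
}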
9 changes: 2 additions & 7 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -383,8 +383,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}
expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix)

- mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx,
- mockRestoreDataSpec)
+ mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec)
require.NoError(t, err)
ssts := make(chan mergedSST, 1)
require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
@@ -427,15 +426,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}

func newTestingRestoreDataProcessor(
- ctx context.Context,
- evalCtx *eval.Context,
- flowCtx *execinfra.FlowCtx,
- spec execinfrapb.RestoreDataSpec,
+ evalCtx *eval.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec,
) (*restoreDataProcessor, error) {
rd := &restoreDataProcessor{
ProcessorBase: execinfra.ProcessorBase{
ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{
- Ctx: ctx,
EvalCtx: evalCtx,
},
},
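Because the context is no longer stored on the processor base at construction, the test constructor above drops its ctx parameter and the context is handed to the methods that need it, as in openSSTs(ctx, ...). A hedged sketch of that wiring with invented names (newProcessor, process):

package main

import (
	"context"
	"fmt"
)

type processor struct {
	name string // configuration only; no context stored at construction
}

// newProcessor no longer takes a context: construction is pure wiring.
func newProcessor(name string) *processor {
	return &processor{name: name}
}

// process receives the context at call time, like openSSTs(ctx, ...) above.
func (p *processor) process(ctx context.Context, input string) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	fmt.Printf("%s processed %s\n", p.name, input)
	return nil
}

func main() {
	p := newProcessor("restore-test")
	_ = p.process(context.Background(), "span-1")
}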
44 changes: 22 additions & 22 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -448,20 +448,20 @@ func (ca *changeAggregator) close() {
}
if ca.eventConsumer != nil {
if err := ca.eventConsumer.Close(); err != nil {
- log.Warningf(ca.Ctx, "error closing event consumer: %s", err)
+ log.Warningf(ca.Ctx(), "error closing event consumer: %s", err)
}
}

if ca.sink != nil {
if err := ca.sink.Close(); err != nil {
- log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
+ log.Warningf(ca.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}
- ca.memAcc.Close(ca.Ctx)
+ ca.memAcc.Close(ca.Ctx())
if ca.kvFeedMemMon != nil {
- ca.kvFeedMemMon.Stop(ca.Ctx)
+ ca.kvFeedMemMon.Stop(ca.Ctx())
}
- ca.MemMonitor.Stop(ca.Ctx)
+ ca.MemMonitor.Stop(ca.Ctx())
ca.InternalClose()
}

@@ -505,7 +505,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// kvFeed, sends off this event to the event consumer, and flushes the sink
// if necessary.
func (ca *changeAggregator) tick() error {
- event, err := ca.eventProducer.Get(ca.Ctx)
+ event, err := ca.eventProducer.Get(ca.Ctx())
if err != nil {
return err
}
@@ -520,16 +520,16 @@ func (ca *changeAggregator) tick() error {
ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
}
ca.recentKVCount++
- return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
+ return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event)
case kvevent.TypeResolved:
a := event.DetachAlloc()
- a.Release(ca.Ctx)
+ a.Release(ca.Ctx())
resolved := event.Resolved()
if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
- return ca.sink.Flush(ca.Ctx)
+ return ca.sink.Flush(ca.Ctx())
}

return nil
@@ -572,7 +572,7 @@ func (ca *changeAggregator) flushFrontier() error {
// otherwise, we could lose buffered messages and violate the
// at-least-once guarantee. This is also true for checkpointing the
// resolved spans in the job progress.
- if err := ca.sink.Flush(ca.Ctx); err != nil {
+ if err := ca.sink.Flush(ca.Ctx()); err != nil {
return err
}

@@ -1000,11 +1000,11 @@ func (cf *changeFrontier) close() {
}
if cf.sink != nil {
if err := cf.sink.Close(); err != nil {
- log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
+ log.Warningf(cf.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}
- cf.memAcc.Close(cf.Ctx)
- cf.MemMonitor.Stop(cf.Ctx)
+ cf.memAcc.Close(cf.Ctx())
+ cf.MemMonitor.Stop(cf.Ctx())
}
}

@@ -1108,7 +1108,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
- logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV,
+ logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart))
continue
@@ -1210,7 +1210,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
if err != nil {
return false, err
}
- cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
+ cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart))
return updated, nil
}

@@ -1228,7 +1228,7 @@ func (cf *changeFrontier) checkpointJobProgress(
var updateSkipped error
if cf.js.job != nil {

- if err := cf.js.job.Update(cf.Ctx, nil, func(
+ if err := cf.js.job.Update(cf.Ctx(), nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
@@ -1253,8 +1253,8 @@ func (cf *changeFrontier) checkpointJobProgress(
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
- if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
- log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
+ if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil {
+ log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err)
return err
}

@@ -1282,7 +1282,7 @@
}

if updateSkipped != nil {
- log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
+ log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

@@ -1381,7 +1381,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
if !shouldEmit {
return nil
}
- if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil {
+ if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil {
return err
}
cf.lastEmitResolved = newResolved.GoTime()
@@ -1420,13 +1420,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) {
description = fmt.Sprintf("job %d", cf.spec.JobID)
}
if frontierChanged && cf.slowLogEveryN.ShouldProcess(now) {
- log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
+ log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s",
description, frontier, resolvedBehind)
}

if cf.slowLogEveryN.ShouldProcess(now) {
s := cf.frontier.PeekFrontierSpan()
- log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
+ log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind)
}
}

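The changefeed close() paths above release the sink, memory accounts, and monitors against the processor context, logging failures and continuing so the rest of the teardown still runs. A simplified sketch of that warn-and-continue teardown; memAccount and closeAll are illustrative stand-ins, not the CockroachDB types.

package main

import (
	"context"
	"io"
	"log"
	"strings"
)

// memAccount is a stand-in for a memory account or monitor that must be
// released with the processor's context.
type memAccount struct{ name string }

func (m *memAccount) Close(ctx context.Context) { log.Printf("closed %s", m.name) }

// closeAll releases every resource even if an earlier one fails, mirroring
// the shape of changeAggregator.close() and changeFrontier.close() above.
func closeAll(ctx context.Context, sink io.Closer, accounts ...*memAccount) {
	if sink != nil {
		if err := sink.Close(); err != nil {
			// Warn and keep going; one failure should not leak the rest.
			log.Printf("error closing sink: %v", err)
		}
	}
	for _, acc := range accounts {
		acc.Close(ctx)
	}
}

func main() {
	sink := io.NopCloser(strings.NewReader("")) // pretend sink
	closeAll(context.Background(), sink,
		&memAccount{name: "memAcc"}, &memAccount{name: "kvFeedMemMon"})
}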
@@ -297,13 +297,13 @@ func (sf *streamIngestionFrontier) Next() (

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
- log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
+ log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err)
}

// Send back a row to the job so that it can update the progress.
select {
- case <-sf.Ctx.Done():
- sf.MoveToDraining(sf.Ctx.Err())
+ case <-sf.Ctx().Done():
+ sf.MoveToDraining(sf.Ctx().Err())
return nil, sf.DrainHelper()
// Send the latest persisted highwater in the heartbeat to the source cluster
// as even with retries we will never request an earlier row than it, and
@@ -314,7 +314,7 @@
case <-sf.heartbeatSender.stoppedChan:
err := sf.heartbeatSender.wait()
if err != nil {
- log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err)
+ log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
sf.MoveToDraining(err)
return nil, sf.DrainHelper()
@@ -325,7 +325,7 @@

func (sf *streamIngestionFrontier) close() {
if err := sf.heartbeatSender.stop(); err != nil {
- log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err)
+ log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
if sf.InternalClose() {
sf.metrics.RunningCount.Dec(1)
@@ -391,7 +391,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps(
// maybeUpdatePartitionProgress polls the frontier and updates the job progress with
// partition-specific information to track the status of each partition.
func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
- ctx := sf.Ctx
+ ctx := sf.Ctx()
updateFreq := JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV)
if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq {
return nil
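maybeUpdatePartitionProgress above captures the accessor result once (ctx := sf.Ctx()) and also shows the checkpoint-throttling idiom: skip the update unless the configured frequency is non-zero and enough time has passed since the last one. A standalone sketch of that throttle; progressUpdater and its fields are illustrative, not the streamingest types.

package main

import (
	"fmt"
	"time"
)

// progressUpdater throttles how often an expensive progress update runs,
// mirroring the updateFreq / lastPartitionUpdate check above.
type progressUpdater struct {
	updateFreq time.Duration
	lastUpdate time.Time
}

// maybeUpdate runs fn at most once per updateFreq; a zero frequency disables
// updates entirely, matching the `updateFreq == 0` guard in the diff.
func (p *progressUpdater) maybeUpdate(fn func() error) error {
	if p.updateFreq == 0 || time.Since(p.lastUpdate) < p.updateFreq {
		return nil
	}
	p.lastUpdate = time.Now()
	return fn()
}

func main() {
	u := &progressUpdater{updateFreq: 50 * time.Millisecond}
	for i := 0; i < 3; i++ {
		_ = u.maybeUpdate(func() error { fmt.Println("checkpointed"); return nil })
		time.Sleep(30 * time.Millisecond)
	}
}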
12 changes: 6 additions & 6 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -404,10 +404,10 @@ func (sip *streamIngestionProcessor) close() {
}

for _, client := range sip.streamPartitionClients {
- _ = client.Close(sip.Ctx)
+ _ = client.Close(sip.Ctx())
}
if sip.batcher != nil {
- sip.batcher.Close(sip.Ctx)
+ sip.batcher.Close(sip.Ctx())
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
@@ -538,7 +538,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err

if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil {
- if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx); err != nil {
+ if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx()); err != nil {
return nil, err
}
}
@@ -602,7 +602,7 @@ func (sip *streamIngestionProcessor) bufferSST(sst *roachpb.RangeFeedSSTable) er
// careful with checkpoints: we can only send checkpoint whose TS >= SST batch TS
// after the full SST gets ingested.

- _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-sst")
+ _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-sst")
defer sp.Finish()
return streamingccl.ScanSST(sst, sst.Span,
func(keyVal storage.MVCCKeyValue) error {
@@ -652,7 +652,7 @@ func (sip *streamIngestionProcessor) bufferDelRange(delRange *roachpb.RangeFeedD
func (sip *streamIngestionProcessor) bufferRangeKeyVal(
rangeKeyVal storage.MVCCRangeKeyValue,
) error {
- _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-range-key")
+ _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-range-key")
defer sp.Finish()

var err error
@@ -790,7 +790,7 @@ func (r *rangeKeyBatcher) reset() {
}

func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
- ctx, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-flush")
+ ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush")
defer sp.Finish()

flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)}
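bufferSST, bufferRangeKeyVal, and flush above each derive a child tracing span from the processor context for the duration of one operation. The sketch below imitates that scoping with a hypothetical startSpan helper built on the standard library; it is not the CockroachDB tracing package.

package main

import (
	"context"
	"log"
	"time"
)

// startSpan is a hypothetical stand-in for tracing.ChildSpan: it returns a
// derived context plus a finish func that records the operation's duration.
func startSpan(parent context.Context, op string) (context.Context, func()) {
	ctx, cancel := context.WithCancel(parent)
	start := time.Now()
	finish := func() {
		cancel()
		log.Printf("%s took %s", op, time.Since(start))
	}
	return ctx, finish
}

// flush mirrors the shape of streamIngestionProcessor.flush above: derive the
// child scope from the processor context, defer its cleanup, and do the work
// under the derived context.
func flush(processorCtx context.Context) error {
	ctx, finish := startSpan(processorCtx, "stream-ingestion-flush")
	defer finish()
	// ... flush batched data using ctx ...
	return ctx.Err()
}

func main() {
	if err := flush(context.Background()); err != nil {
		log.Fatal(err)
	}
}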
12 changes: 5 additions & 7 deletions pkg/sql/colexec/columnarizer.go
@@ -58,6 +58,7 @@ type Columnarizer struct {
colexecop.NonExplainable

mode columnarizerMode
+ initialized bool
helper colmem.SetAccountingHelper
metadataAllocator *colmem.Allocator
input execinfra.RowSource
@@ -171,13 +172,10 @@ func newColumnarizer(

// Init is part of the colexecop.Operator interface.
func (c *Columnarizer) Init(ctx context.Context) {
- if c.removedFromFlow {
- return
- }
- if c.Ctx != nil {
- // Init has already been called.
+ if c.removedFromFlow || c.initialized {
return
}
+ c.initialized = true
c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1)
ctx = c.StartInternalNoSpan(ctx)
c.input.Start(ctx)
@@ -270,7 +268,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata {
// When this method returns, we no longer will have the reference to the
// metadata, so we can release all memory from the metadata allocator.
defer c.metadataAllocator.ReleaseAll()
- if c.Ctx == nil {
+ if !c.initialized {
// The columnarizer wasn't initialized, so the wrapped processors might
// not have been started leaving them in an unsafe to drain state, so
// we skip the draining. Mostly likely this happened because a panic was
@@ -301,7 +299,7 @@ func (c *Columnarizer) Close(context.Context) error {
func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata {
// Close will call InternalClose(). Note that we don't return any trailing
// metadata here because the columnarizers propagate it in DrainMeta.
- if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil {
+ if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil {
// Close never returns an error.
colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close"))
}
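The Columnarizer change above stops treating a non-nil Ctx field as an implicit "Init already ran" signal and adds an explicit initialized flag, checked by both Init and DrainMeta. A minimal sketch of that guard; the columnarizer type here is illustrative, not the colexec one.

package main

import (
	"context"
	"fmt"
)

type columnarizer struct {
	initialized bool
	ctx         context.Context
}

// Init is idempotent: the explicit flag, rather than a nil check on ctx,
// decides whether initialization already happened.
func (c *columnarizer) Init(ctx context.Context) {
	if c.initialized {
		return
	}
	c.initialized = true
	c.ctx = ctx
	fmt.Println("initialized")
}

// DrainMeta skips draining when Init never ran, since the wrapped input may
// not have been started and is unsafe to drain.
func (c *columnarizer) DrainMeta() []string {
	if !c.initialized {
		return nil
	}
	return []string{"meta"}
}

func main() {
	var c columnarizer
	fmt.Println(c.DrainMeta()) // [] - not initialized, skip draining
	c.Init(context.Background())
	c.Init(context.Background()) // second call is a no-op
	fmt.Println(c.DrainMeta())   // [meta]
}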
10 changes: 1 addition & 9 deletions pkg/sql/colexec/materializer.go
@@ -352,18 +352,10 @@ func (m *Materializer) close() {
if m.Closed {
return
}
- if m.Ctx == nil {
- // In some edge cases (like when Init of an operator above this
- // materializer encounters a panic), the materializer might never be
- // started, yet it still will attempt to close its Closers. This
- // context is only used for logging purposes, so it is ok to grab
- // the background context in order to prevent a NPE below.
- m.Ctx = context.Background()
- }
// Make sure to call InternalClose() only after closing the closers - this
// allows the closers to utilize the unfinished tracing span (if tracing is
// enabled).
- m.closers.CloseAndLogOnErr(m.Ctx, "materializer")
+ m.closers.CloseAndLogOnErr(m.Ctx(), "materializer")
m.InternalClose()
}
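Materializer.close() above can drop its context.Background() fallback entirely because the Ctx() accessor always yields a usable context, and it still closes its closers with log-on-error semantics before InternalClose(). A sketch of that close-and-log loop; closeAndLogOnErr here is an illustrative helper, not the colexecop API.

package main

import (
	"context"
	"errors"
	"io"
	"log"
)

// closeAndLogOnErr closes every closer and only logs failures, so one bad
// closer cannot prevent the others (or InternalClose) from running.
func closeAndLogOnErr(ctx context.Context, prefix string, closers []io.Closer) {
	for _, c := range closers {
		if err := c.Close(); err != nil {
			log.Printf("%s: error closing: %v", prefix, err)
		}
	}
	_ = ctx // the real helper uses the context for log tags and tracing only
}

type failingCloser struct{}

func (failingCloser) Close() error { return errors.New("boom") }

type okCloser struct{}

func (okCloser) Close() error { return nil }

func main() {
	closeAndLogOnErr(context.Background(), "materializer",
		[]io.Closer{failingCloser{}, okCloser{}})
}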
