sql: audit all processors to make their closure bullet-proof
This commit replaces all usages of the `ProcessorBaseNoHelper.Ctx`
field with a call to the newly-introduced `Ctx()` method, which returns
a background context if the processor hasn't been started. This change
makes all processors respect the contract of the `colexecop.Closer`
interface, which says that `Close` must be safe to call even if `Init`
hasn't been performed (for processors this means that
`Columnarizer.Init` wasn't called, and therefore `Processor.Start`
wasn't either).
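
For reference, a minimal sketch of the accessor pattern described above
(a simplification, not the exact CockroachDB implementation; everything
other than the `Ctx()` name is an assumption here):

    package execinfra

    import "context"

    // ProcessorBaseNoHelper is sketched with only the field relevant to
    // this change; the real struct carries much more state.
    type ProcessorBaseNoHelper struct {
        // ctx is set when the processor is started and stays nil otherwise.
        ctx context.Context
    }

    // Start stashes the context for later use by the processor.
    func (pb *ProcessorBaseNoHelper) Start(ctx context.Context) {
        pb.ctx = ctx
    }

    // Ctx returns the context the processor was started with, falling back
    // to a background context if the processor was never started. This is
    // what keeps Close safe to call even when Init/Start never ran.
    func (pb *ProcessorBaseNoHelper) Ctx() context.Context {
        if pb.ctx == nil {
            return context.Background()
        }
        return pb.ctx
    }

With such an accessor, call sites in the diff below (for example
`bp.memAcc.Close(bp.Ctx())`) never dereference a nil context, regardless
of whether the processor was started.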

Initially, I attempted to fix this in #91446 by putting the protection
into the columnarizer, but that led to broken assumptions since we
wouldn't close all closers that we expected to (in particular, the
materializer that is the input to the wrapped row-by-row processor
wouldn't be closed). This commit takes a different approach and should
fix the issue for good without introducing any flakiness.
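
For context, the contract at play is roughly the following (interface
shape inferred from the commit message and from the
`Columnarizer.Close(context.Context) error` signature visible in the
diff below; the comment paraphrases the contract rather than quoting
the upstream doc):

    package colexecop

    import "context"

    // Closer is implemented by components that need to release resources
    // when the flow is shut down.
    type Closer interface {
        // Close releases the resources of the component. Per the contract
        // described above, it must be safe to call Close even if Init was
        // never called on the component.
        Close(context.Context) error
    }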

As a result, this commit fixes a rarely hit issue (seen occasionally in
Sentry reports) where the aggregator and the zigzag joiner attempt to
log during closure even though they were never started. The issue is
quite rare, so no release note seems appropriate.

Release note: None
yuzefovich committed Nov 22, 2022
1 parent f660a45 commit 40e57d1
Showing 26 changed files with 249 additions and 250 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -211,7 +211,7 @@ func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.agg.Close()
if bp.InternalClose() {
- bp.memAcc.Close(bp.Ctx)
+ bp.memAcc.Close(bp.Ctx())
}
}

4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -549,8 +549,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
prog.ProgressDetails = *details
case meta := <-rd.metaCh:
return nil, meta
- case <-rd.Ctx.Done():
- rd.MoveToDraining(rd.Ctx.Err())
+ case <-rd.Ctx().Done():
+ rd.MoveToDraining(rd.Ctx().Err())
return nil, rd.DrainHelper()
}

9 changes: 2 additions & 7 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -383,8 +383,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}
expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix)

- mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx,
- mockRestoreDataSpec)
+ mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec)
require.NoError(t, err)
ssts := make(chan mergedSST, 1)
require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
@@ -427,15 +426,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}

func newTestingRestoreDataProcessor(
- ctx context.Context,
- evalCtx *eval.Context,
- flowCtx *execinfra.FlowCtx,
- spec execinfrapb.RestoreDataSpec,
+ evalCtx *eval.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec,
) (*restoreDataProcessor, error) {
rd := &restoreDataProcessor{
ProcessorBase: execinfra.ProcessorBase{
ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{
- Ctx: ctx,
EvalCtx: evalCtx,
},
},
44 changes: 22 additions & 22 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -448,20 +448,20 @@ func (ca *changeAggregator) close() {
}
if ca.eventConsumer != nil {
if err := ca.eventConsumer.Close(); err != nil {
log.Warningf(ca.Ctx, "error closing event consumer: %s", err)
log.Warningf(ca.Ctx(), "error closing event consumer: %s", err)
}
}

if ca.sink != nil {
if err := ca.sink.Close(); err != nil {
- log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
+ log.Warningf(ca.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}
- ca.memAcc.Close(ca.Ctx)
+ ca.memAcc.Close(ca.Ctx())
if ca.kvFeedMemMon != nil {
- ca.kvFeedMemMon.Stop(ca.Ctx)
+ ca.kvFeedMemMon.Stop(ca.Ctx())
}
- ca.MemMonitor.Stop(ca.Ctx)
+ ca.MemMonitor.Stop(ca.Ctx())
ca.InternalClose()
}

@@ -505,7 +505,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// kvFeed, sends off this event to the event consumer, and flushes the sink
// if necessary.
func (ca *changeAggregator) tick() error {
- event, err := ca.eventProducer.Get(ca.Ctx)
+ event, err := ca.eventProducer.Get(ca.Ctx())
if err != nil {
return err
}
@@ -520,16 +520,16 @@ func (ca *changeAggregator) tick() error {
ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
}
ca.recentKVCount++
- return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
+ return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event)
case kvevent.TypeResolved:
a := event.DetachAlloc()
- a.Release(ca.Ctx)
+ a.Release(ca.Ctx())
resolved := event.Resolved()
if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
- return ca.sink.Flush(ca.Ctx)
+ return ca.sink.Flush(ca.Ctx())
}

return nil
@@ -572,7 +572,7 @@ func (ca *changeAggregator) flushFrontier() error {
// otherwise, we could lose buffered messages and violate the
// at-least-once guarantee. This is also true for checkpointing the
// resolved spans in the job progress.
- if err := ca.sink.Flush(ca.Ctx); err != nil {
+ if err := ca.sink.Flush(ca.Ctx()); err != nil {
return err
}

@@ -1000,11 +1000,11 @@ func (cf *changeFrontier) close() {
}
if cf.sink != nil {
if err := cf.sink.Close(); err != nil {
- log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
+ log.Warningf(cf.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}
- cf.memAcc.Close(cf.Ctx)
- cf.MemMonitor.Stop(cf.Ctx)
+ cf.memAcc.Close(cf.Ctx())
+ cf.MemMonitor.Stop(cf.Ctx())
}
}

@@ -1108,7 +1108,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
- logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV,
+ logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart))
continue
@@ -1210,7 +1210,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
if err != nil {
return false, err
}
- cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
+ cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart))
return updated, nil
}

@@ -1228,7 +1228,7 @@ func (cf *changeFrontier) checkpointJobProgress(
var updateSkipped error
if cf.js.job != nil {

- if err := cf.js.job.Update(cf.Ctx, nil, func(
+ if err := cf.js.job.Update(cf.Ctx(), nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
@@ -1253,8 +1253,8 @@
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
- if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
- log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
+ if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil {
+ log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err)
return err
}

@@ -1282,7 +1282,7 @@ }
}

if updateSkipped != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

@@ -1381,7 +1381,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
if !shouldEmit {
return nil
}
- if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil {
+ if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil {
return err
}
cf.lastEmitResolved = newResolved.GoTime()
@@ -1420,13 +1420,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) {
description = fmt.Sprintf("job %d", cf.spec.JobID)
}
if frontierChanged && cf.slowLogEveryN.ShouldProcess(now) {
log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s",
description, frontier, resolvedBehind)
}

if cf.slowLogEveryN.ShouldProcess(now) {
s := cf.frontier.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind)
}
}

@@ -297,13 +297,13 @@ func (sf *streamIngestionFrontier) Next() (

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err)
}

// Send back a row to the job so that it can update the progress.
select {
- case <-sf.Ctx.Done():
- sf.MoveToDraining(sf.Ctx.Err())
+ case <-sf.Ctx().Done():
+ sf.MoveToDraining(sf.Ctx().Err())
return nil, sf.DrainHelper()
// Send the latest persisted highwater in the heartbeat to the source cluster
// as even with retries we will never request an earlier row than it, and
@@ -314,7 +314,7 @@
case <-sf.heartbeatSender.stoppedChan:
err := sf.heartbeatSender.wait()
if err != nil {
log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err)
log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
sf.MoveToDraining(err)
return nil, sf.DrainHelper()
@@ -325,7 +325,7 @@

func (sf *streamIngestionFrontier) close() {
if err := sf.heartbeatSender.stop(); err != nil {
log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err)
log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
if sf.InternalClose() {
sf.metrics.RunningCount.Dec(1)
@@ -391,7 +391,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps(
// maybeUpdatePartitionProgress polls the frontier and updates the job progress with
// partition-specific information to track the status of each partition.
func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
- ctx := sf.Ctx
+ ctx := sf.Ctx()
updateFreq := JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV)
if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq {
return nil
12 changes: 6 additions & 6 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -404,10 +404,10 @@ func (sip *streamIngestionProcessor) close() {
}

for _, client := range sip.streamPartitionClients {
- _ = client.Close(sip.Ctx)
+ _ = client.Close(sip.Ctx())
}
if sip.batcher != nil {
- sip.batcher.Close(sip.Ctx)
+ sip.batcher.Close(sip.Ctx())
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
@@ -538,7 +538,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err

if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil {
- if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx); err != nil {
+ if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx()); err != nil {
return nil, err
}
}
@@ -602,7 +602,7 @@ func (sip *streamIngestionProcessor) bufferSST(sst *roachpb.RangeFeedSSTable) er
// careful with checkpoints: we can only send checkpoint whose TS >= SST batch TS
// after the full SST gets ingested.

_, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-sst")
_, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-sst")
defer sp.Finish()
return streamingccl.ScanSST(sst, sst.Span,
func(keyVal storage.MVCCKeyValue) error {
@@ -652,7 +652,7 @@ func (sip *streamIngestionProcessor) bufferDelRange(delRange *roachpb.RangeFeedD
func (sip *streamIngestionProcessor) bufferRangeKeyVal(
rangeKeyVal storage.MVCCRangeKeyValue,
) error {
_, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-range-key")
_, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-range-key")
defer sp.Finish()

var err error
@@ -790,7 +790,7 @@ func (r *rangeKeyBatcher) reset() {
}

func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
- ctx, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-flush")
+ ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush")
defer sp.Finish()

flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)}
12 changes: 5 additions & 7 deletions pkg/sql/colexec/columnarizer.go
@@ -58,6 +58,7 @@ type Columnarizer struct {
colexecop.NonExplainable

mode columnarizerMode
+ initialized bool
helper colmem.SetAccountingHelper
metadataAllocator *colmem.Allocator
input execinfra.RowSource
@@ -171,13 +172,10 @@ func newColumnarizer(

// Init is part of the colexecop.Operator interface.
func (c *Columnarizer) Init(ctx context.Context) {
- if c.removedFromFlow {
- return
- }
- if c.Ctx != nil {
- // Init has already been called.
+ if c.removedFromFlow || c.initialized {
return
}
+ c.initialized = true
c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1)
ctx = c.StartInternalNoSpan(ctx)
c.input.Start(ctx)
@@ -270,7 +268,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata {
// When this method returns, we no longer will have the reference to the
// metadata, so we can release all memory from the metadata allocator.
defer c.metadataAllocator.ReleaseAll()
- if c.Ctx == nil {
+ if !c.initialized {
// The columnarizer wasn't initialized, so the wrapped processors might
// not have been started leaving them in an unsafe to drain state, so
// we skip the draining. Mostly likely this happened because a panic was
@@ -301,7 +299,7 @@ func (c *Columnarizer) Close(context.Context) error {
func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata {
// Close will call InternalClose(). Note that we don't return any trailing
// metadata here because the columnarizers propagate it in DrainMeta.
- if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil {
+ if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil {
// Close never returns an error.
colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close"))
}
10 changes: 1 addition & 9 deletions pkg/sql/colexec/materializer.go
@@ -352,18 +352,10 @@ func (m *Materializer) close() {
if m.Closed {
return
}
- if m.Ctx == nil {
- // In some edge cases (like when Init of an operator above this
- // materializer encounters a panic), the materializer might never be
- // started, yet it still will attempt to close its Closers. This
- // context is only used for logging purposes, so it is ok to grab
- // the background context in order to prevent a NPE below.
- m.Ctx = context.Background()
- }
// Make sure to call InternalClose() only after closing the closers - this
// allows the closers to utilize the unfinished tracing span (if tracing is
// enabled).
- m.closers.CloseAndLogOnErr(m.Ctx, "materializer")
+ m.closers.CloseAndLogOnErr(m.Ctx(), "materializer")
m.InternalClose()
}

