sql: audit all processors to make their closure bullet-proof
This commit replaces all usages of the `ProcessorBaseNoHelper.Ctx` field
with a call to the newly-introduced `Ctx()` method, which returns
a background context if the processor hasn't been started. This change
makes all processors respect the contract of the `colexecop.Closer`
interface, which says that `Close` must be safe to call even if `Init`
hasn't been performed (in the context of processors this means that
`Columnarizer.Init` wasn't called, meaning that `Processor.Start` wasn't
called either).

Initially, I attempted to fix this in cockroachdb#91446 by putting the protection
into the columnarizer, but that led to broken assumptions since we
wouldn't close all closers that we expected to (in particular, the
materializer that is the input to the wrapped row-by-row processor
wouldn't be closed). This commit takes a different approach and should
fix the issue for good without introducing any flakiness.

As a result, this commit fixes a rarely-hit issue in which the aggregator
and the zigzag joiner attempt to log during closure even though they were
never started (something we occasionally see in Sentry reports). The issue
is quite rare, though, so no release note seems appropriate.

Release note: None
yuzefovich committed Nov 22, 2022
1 parent 82efd56 commit aa4881f
Showing 27 changed files with 261 additions and 260 deletions.
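For context on the change described in the commit message above, here is a minimal, hedged sketch of the accessor pattern it introduces: a Ctx() method that falls back to a background context when the processor was never started. The type and field names below (processorBase, started, ctx) are illustrative assumptions, not the actual execinfra.ProcessorBaseNoHelper definition.

package main

import (
	"context"
	"fmt"
)

// processorBase is a simplified stand-in for execinfra.ProcessorBaseNoHelper;
// its fields are assumptions made for this sketch only.
type processorBase struct {
	started bool
	ctx     context.Context
}

// Start records the context once the processor actually begins execution.
func (pb *processorBase) Start(ctx context.Context) {
	pb.started = true
	pb.ctx = ctx
}

// Ctx returns the context the processor was started with. If the processor
// was never started (for example, Close runs before Init/Start), it falls
// back to a background context so that cleanup code can still log and
// release resources without a nil-pointer dereference.
func (pb *processorBase) Ctx() context.Context {
	if !pb.started {
		return context.Background()
	}
	return pb.ctx
}

func main() {
	var pb processorBase
	// Safe even though Start was never called: Ctx() is never nil.
	fmt.Println(pb.Ctx() != nil)
}

With call sites rewritten to the form bp.memAcc.Close(bp.Ctx()), as in the hunks below, Close stays safe on processors that were never started, which is the colexecop.Closer contract the commit message cites.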
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -227,7 +227,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
if bp.InternalClose() {
bp.memAcc.Close(bp.Ctx)
bp.memAcc.Close(bp.Ctx())
}
}

4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -568,8 +568,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
prog.ProgressDetails = *details
case meta := <-rd.metaCh:
return nil, meta
case <-rd.Ctx.Done():
rd.MoveToDraining(rd.Ctx.Err())
case <-rd.Ctx().Done():
rd.MoveToDraining(rd.Ctx().Err())
return nil, rd.DrainHelper()
}

9 changes: 2 additions & 7 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -374,8 +374,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}
expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix)

mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx,
mockRestoreDataSpec)
mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec)
require.NoError(t, err)
ssts := make(chan mergedSST, 1)
require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
@@ -417,15 +416,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}

func newTestingRestoreDataProcessor(
ctx context.Context,
evalCtx *tree.EvalContext,
flowCtx *execinfra.FlowCtx,
spec execinfrapb.RestoreDataSpec,
evalCtx *tree.EvalContext, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec,
) (*restoreDataProcessor, error) {
rd := &restoreDataProcessor{
ProcessorBase: execinfra.ProcessorBase{
ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{
Ctx: ctx,
EvalCtx: evalCtx,
},
},
46 changes: 23 additions & 23 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -484,15 +484,15 @@ func (ca *changeAggregator) close() {
}
if ca.sink != nil {
if err := ca.sink.Close(); err != nil {
log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
log.Warningf(ca.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}

ca.memAcc.Close(ca.Ctx)
ca.memAcc.Close(ca.Ctx())
if ca.kvFeedMemMon != nil {
ca.kvFeedMemMon.Stop(ca.Ctx)
ca.kvFeedMemMon.Stop(ca.Ctx())
}
ca.MemMonitor.Stop(ca.Ctx)
ca.MemMonitor.Stop(ca.Ctx())
ca.InternalClose()
}

@@ -536,7 +536,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// kvFeed, sends off this event to the event consumer, and flushes the sink
// if necessary.
func (ca *changeAggregator) tick() error {
event, err := ca.eventProducer.Get(ca.Ctx)
event, err := ca.eventProducer.Get(ca.Ctx())
if err != nil {
return err
}
@@ -551,16 +551,16 @@ func (ca *changeAggregator) tick() error {
ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
}
ca.recentKVCount++
return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event)
case kvevent.TypeResolved:
a := event.DetachAlloc()
a.Release(ca.Ctx)
a.Release(ca.Ctx())
resolved := event.Resolved()
if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
return ca.sink.Flush(ca.Ctx)
return ca.sink.Flush(ca.Ctx())
}

return nil
@@ -601,7 +601,7 @@ func (ca *changeAggregator) flushFrontier() error {
// otherwise, we could lose buffered messages and violate the
// at-least-once guarantee. This is also true for checkpointing the
// resolved spans in the job progress.
if err := ca.sink.Flush(ca.Ctx); err != nil {
if err := ca.sink.Flush(ca.Ctx()); err != nil {
return err
}

@@ -626,7 +626,7 @@ func (ca *changeAggregator) flushFrontier() error {

func (ca *changeAggregator) emitResolved(batch jobspb.ResolvedSpans) error {
// TODO(smiskin): Remove post-22.2
if !ca.flowCtx.Cfg.Settings.Version.IsActive(ca.Ctx, clusterversion.ChangefeedIdleness) {
if !ca.flowCtx.Cfg.Settings.Version.IsActive(ca.Ctx(), clusterversion.ChangefeedIdleness) {
for _, resolved := range batch.ResolvedSpans {
resolvedBytes, err := protoutil.Marshal(&resolved)
if err != nil {
@@ -1414,11 +1414,11 @@ func (cf *changeFrontier) close() {
}
if cf.sink != nil {
if err := cf.sink.Close(); err != nil {
log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
log.Warningf(cf.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}
cf.memAcc.Close(cf.Ctx)
cf.MemMonitor.Stop(cf.Ctx)
cf.memAcc.Close(cf.Ctx())
cf.MemMonitor.Stop(cf.Ctx())
}
}

@@ -1506,7 +1506,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
}

var resolvedSpans jobspb.ResolvedSpans
if cf.flowCtx.Cfg.Settings.Version.IsActive(cf.Ctx, clusterversion.ChangefeedIdleness) {
if cf.flowCtx.Cfg.Settings.Version.IsActive(cf.Ctx(), clusterversion.ChangefeedIdleness) {
if err := protoutil.Unmarshal([]byte(*raw), &resolvedSpans); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
`unmarshalling aggregator progress update: %x`, raw)
@@ -1534,7 +1534,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV,
logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart))
continue
@@ -1636,7 +1636,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
if err != nil {
return false, err
}
cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart))
return updated, nil
}

@@ -1652,7 +1652,7 @@ func (cf *changeFrontier) checkpointJobProgress(
}
cf.metrics.FrontierUpdates.Inc(1)
var updateSkipped error
if err := cf.js.job.Update(cf.Ctx, nil, func(
if err := cf.js.job.Update(cf.Ctx(), nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
@@ -1677,8 +1677,8 @@ func (cf *changeFrontier) checkpointJobProgress(
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err)
return err
}

@@ -1702,7 +1702,7 @@ func (cf *changeFrontier) checkpointJobProgress(
}

if updateSkipped != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

@@ -1795,7 +1795,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
if !shouldEmit {
return nil
}
if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil {
if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil {
return err
}
cf.lastEmitResolved = newResolved.GoTime()
@@ -1834,13 +1834,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) {
description = fmt.Sprintf("job %d", cf.spec.JobID)
}
if frontierChanged {
log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s",
description, frontier, resolvedBehind)
}

if cf.slowLogEveryN.ShouldProcess(now) {
s := cf.frontier.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind)
}
}

@@ -255,7 +255,7 @@ func (sf *streamIngestionFrontier) Next() (

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err)
}

var frontierChanged bool
@@ -268,8 +268,8 @@ func (sf *streamIngestionFrontier) Next() (
// Send back a row to the job so that it can update the progress.
newResolvedTS := sf.frontier.Frontier()
select {
case <-sf.Ctx.Done():
sf.MoveToDraining(sf.Ctx.Err())
case <-sf.Ctx().Done():
sf.MoveToDraining(sf.Ctx().Err())
return nil, sf.DrainHelper()
// Send the frontier update in the heartbeat to the source cluster.
case sf.heartbeatSender.frontierUpdates <- newResolvedTS:
@@ -301,7 +301,7 @@ func (sf *streamIngestionFrontier) Next() (
func (sf *streamIngestionFrontier) ConsumerClosed() {
if sf.InternalClose() {
if err := sf.heartbeatSender.stop(); err != nil {
log.Errorf(sf.Ctx, "heartbeatSender exited with error: %s", err.Error())
log.Errorf(sf.Ctx(), "heartbeatSender exited with error: %s", err.Error())
}
}
}
@@ -349,7 +349,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps(
// maybeUpdatePartitionProgress polls the frontier and updates the job progress with
// partition-specific information to track the status of each partition.
func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
ctx := sf.Ctx
ctx := sf.Ctx()
updateFreq := PartitionProgressFrequency.Get(&sf.flowCtx.Cfg.Settings.SV)
if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq {
return nil
18 changes: 9 additions & 9 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -322,7 +322,7 @@ func (sip *streamIngestionProcessor) close() {
_ = client.Close()
}
if sip.batcher != nil {
sip.batcher.Close(sip.Ctx)
sip.batcher.Close(sip.Ctx())
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
@@ -476,7 +476,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil {
if streamingKnobs.RunAfterReceivingEvent != nil {
streamingKnobs.RunAfterReceivingEvent(sip.Ctx)
streamingKnobs.RunAfterReceivingEvent(sip.Ctx())
}
}
}
@@ -503,13 +503,13 @@

return sip.flush()
case streamingccl.GenerationEvent:
log.Info(sip.Ctx, "GenerationEvent received")
log.Info(sip.Ctx(), "GenerationEvent received")
select {
case <-sip.cutoverCh:
sip.internalDrained = true
return nil, nil
case <-sip.Ctx.Done():
return nil, sip.Ctx.Err()
case <-sip.Ctx().Done():
return nil, sip.Ctx().Err()
}
default:
return nil, errors.Newf("unknown streaming event type %v", event.Type())
@@ -552,7 +552,7 @@ func (sip *streamIngestionProcessor) bufferKV(event partitionEvent) error {
}

func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error {
log.Infof(sip.Ctx, "got checkpoint %v", event.GetResolved())
log.Infof(sip.Ctx(), "got checkpoint %v", event.GetResolved())
resolvedTimePtr := event.GetResolved()
if resolvedTimePtr == nil {
return errors.New("checkpoint event expected to have a resolved timestamp")
@@ -574,13 +574,13 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {

totalSize := 0
for _, kv := range sip.curBatch {
if err := sip.batcher.AddMVCCKey(sip.Ctx, kv.Key, kv.Value); err != nil {
if err := sip.batcher.AddMVCCKey(sip.Ctx(), kv.Key, kv.Value); err != nil {
return nil, errors.Wrapf(err, "adding key %+v", kv)
}
totalSize += len(kv.Key.Key) + len(kv.Value)
}

if err := sip.batcher.Flush(sip.Ctx); err != nil {
if err := sip.batcher.Flush(sip.Ctx()); err != nil {
return nil, errors.Wrap(err, "flushing")
}
sip.metrics.Flushes.Inc(1)
@@ -605,7 +605,7 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
sip.lastFlushTime = timeutil.Now()
sip.bufferedCheckpoints = make(map[string]hlc.Timestamp)

return &flushedCheckpoints, sip.batcher.Reset(sip.Ctx)
return &flushedCheckpoints, sip.batcher.Reset(sip.Ctx())
}

func init() {
20 changes: 9 additions & 11 deletions pkg/sql/colexec/columnarizer.go
@@ -53,10 +53,11 @@ type Columnarizer struct {
execinfra.ProcessorBaseNoHelper
colexecop.NonExplainable

mode columnarizerMode
helper colmem.SetAccountingHelper
input execinfra.RowSource
da tree.DatumAlloc
mode columnarizerMode
initialized bool
helper colmem.SetAccountingHelper
input execinfra.RowSource
da tree.DatumAlloc

batch coldata.Batch
vecs coldata.TypedVecs
@@ -131,7 +132,7 @@ func newColumnarizer(
// Close will call InternalClose(). Note that we don't return
// any trailing metadata here because the columnarizers
// propagate it in DrainMeta.
if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil {
if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil {
// Close never returns an error.
colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close"))
}
@@ -145,13 +146,10 @@

// Init is part of the colexecop.Operator interface.
func (c *Columnarizer) Init(ctx context.Context) {
if c.removedFromFlow {
return
}
if c.Ctx != nil {
// Init has already been called.
if c.removedFromFlow || c.initialized {
return
}
c.initialized = true
c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1)
ctx = c.StartInternalNoSpan(ctx)
c.input.Start(ctx)
@@ -241,7 +239,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata {
// We no longer need the batch.
c.batch = nil
c.helper.ReleaseMemory()
if c.Ctx == nil {
if !c.initialized {
// The columnarizer wasn't initialized, so the wrapped processors might
// not have been started leaving them in an unsafe to drain state, so
// we skip the draining. Most likely this happened because a panic was
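A side note on the Columnarizer hunks above: once Ctx becomes a method, the old `c.Ctx != nil` sentinel can no longer signal whether Init ran, so the diff introduces an explicit `initialized` flag instead. Below is a rough sketch of that guard, with simplified names that are assumptions rather than the real Columnarizer fields.

package main

import "fmt"

// columnarizerSketch is an illustrative stand-in, not the real Columnarizer.
type columnarizerSketch struct {
	removedFromFlow bool
	initialized     bool
}

// Init is a no-op when the operator was removed from the flow or has already
// been initialized, mirroring the guard added in the diff above.
func (c *columnarizerSketch) Init() {
	if c.removedFromFlow || c.initialized {
		return
	}
	c.initialized = true
	// The real Init would start the wrapped RowSource here.
}

func main() {
	c := &columnarizerSketch{}
	c.Init()
	c.Init() // second call is a harmless no-op
	fmt.Println(c.initialized)
}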
10 changes: 1 addition & 9 deletions pkg/sql/colexec/materializer.go
@@ -311,15 +311,7 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata

func (m *Materializer) close() {
if m.InternalClose() {
if m.Ctx == nil {
// In some edge cases (like when Init of an operator above this
// materializer encounters a panic), the materializer might never be
// started, yet it still will attempt to close its Closers. This
// context is only used for logging purposes, so it is ok to grab
// the background context in order to prevent a NPE below.
m.Ctx = context.Background()
}
m.closers.CloseAndLogOnErr(m.Ctx, "materializer")
m.closers.CloseAndLogOnErr(m.Ctx(), "materializer")
}
}

2 changes: 1 addition & 1 deletion pkg/sql/execinfra/metadata_test_receiver.go
@@ -212,7 +212,7 @@ func (mtr *MetadataTestReceiver) Next() (rowenc.EncDatumRow, *execinfrapb.Produc
// We don't use ProcessorBase.ProcessRowHelper() here because we need
// special handling for errors: this proc never starts draining in order for
// it to be as unintrusive as possible.
outRow, ok, err := mtr.OutputHelper.ProcessRow(mtr.Ctx, row)
outRow, ok, err := mtr.OutputHelper.ProcessRow(mtr.Ctx(), row)
if err != nil {
mtr.trailingMeta = append(mtr.trailingMeta, execinfrapb.ProducerMetadata{Err: err})
continue