Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: sql: audit all processors to make their closure bullet-proof #92362

Merged
merged 1 commit into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
if bp.InternalClose() {
bp.memAcc.Close(bp.Ctx)
bp.memAcc.Close(bp.Ctx())
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
prog.ProgressDetails = *details
case meta := <-rd.metaCh:
return nil, meta
case <-rd.Ctx.Done():
rd.MoveToDraining(rd.Ctx.Err())
case <-rd.Ctx().Done():
rd.MoveToDraining(rd.Ctx().Err())
return nil, rd.DrainHelper()
}

Expand Down
9 changes: 2 additions & 7 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}
expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix)

mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx,
mockRestoreDataSpec)
mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec)
require.NoError(t, err)
ssts := make(chan mergedSST, 1)
require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
Expand Down Expand Up @@ -417,15 +416,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}

func newTestingRestoreDataProcessor(
ctx context.Context,
evalCtx *tree.EvalContext,
flowCtx *execinfra.FlowCtx,
spec execinfrapb.RestoreDataSpec,
evalCtx *tree.EvalContext, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec,
) (*restoreDataProcessor, error) {
rd := &restoreDataProcessor{
ProcessorBase: execinfra.ProcessorBase{
ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{
Ctx: ctx,
EvalCtx: evalCtx,
},
},
Expand Down
46 changes: 23 additions & 23 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,15 @@ func (ca *changeAggregator) close() {
}
if ca.sink != nil {
if err := ca.sink.Close(); err != nil {
log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
log.Warningf(ca.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}

ca.memAcc.Close(ca.Ctx)
ca.memAcc.Close(ca.Ctx())
if ca.kvFeedMemMon != nil {
ca.kvFeedMemMon.Stop(ca.Ctx)
ca.kvFeedMemMon.Stop(ca.Ctx())
}
ca.MemMonitor.Stop(ca.Ctx)
ca.MemMonitor.Stop(ca.Ctx())
ca.InternalClose()
}

Expand Down Expand Up @@ -536,7 +536,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// kvFeed, sends off this event to the event consumer, and flushes the sink
// if necessary.
func (ca *changeAggregator) tick() error {
event, err := ca.eventProducer.Get(ca.Ctx)
event, err := ca.eventProducer.Get(ca.Ctx())
if err != nil {
return err
}
Expand All @@ -551,16 +551,16 @@ func (ca *changeAggregator) tick() error {
ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
}
ca.recentKVCount++
return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event)
case kvevent.TypeResolved:
a := event.DetachAlloc()
a.Release(ca.Ctx)
a.Release(ca.Ctx())
resolved := event.Resolved()
if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
return ca.sink.Flush(ca.Ctx)
return ca.sink.Flush(ca.Ctx())
}

return nil
Expand Down Expand Up @@ -601,7 +601,7 @@ func (ca *changeAggregator) flushFrontier() error {
// otherwise, we could lose buffered messages and violate the
// at-least-once guarantee. This is also true for checkpointing the
// resolved spans in the job progress.
if err := ca.sink.Flush(ca.Ctx); err != nil {
if err := ca.sink.Flush(ca.Ctx()); err != nil {
return err
}

Expand All @@ -626,7 +626,7 @@ func (ca *changeAggregator) flushFrontier() error {

func (ca *changeAggregator) emitResolved(batch jobspb.ResolvedSpans) error {
// TODO(smiskin): Remove post-22.2
if !ca.flowCtx.Cfg.Settings.Version.IsActive(ca.Ctx, clusterversion.ChangefeedIdleness) {
if !ca.flowCtx.Cfg.Settings.Version.IsActive(ca.Ctx(), clusterversion.ChangefeedIdleness) {
for _, resolved := range batch.ResolvedSpans {
resolvedBytes, err := protoutil.Marshal(&resolved)
if err != nil {
Expand Down Expand Up @@ -1414,11 +1414,11 @@ func (cf *changeFrontier) close() {
}
if cf.sink != nil {
if err := cf.sink.Close(); err != nil {
log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
log.Warningf(cf.Ctx(), `error closing sink. goroutines may have leaked: %v`, err)
}
}
cf.memAcc.Close(cf.Ctx)
cf.MemMonitor.Stop(cf.Ctx)
cf.memAcc.Close(cf.Ctx())
cf.MemMonitor.Stop(cf.Ctx())
}
}

Expand Down Expand Up @@ -1506,7 +1506,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
}

var resolvedSpans jobspb.ResolvedSpans
if cf.flowCtx.Cfg.Settings.Version.IsActive(cf.Ctx, clusterversion.ChangefeedIdleness) {
if cf.flowCtx.Cfg.Settings.Version.IsActive(cf.Ctx(), clusterversion.ChangefeedIdleness) {
if err := protoutil.Unmarshal([]byte(*raw), &resolvedSpans); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
`unmarshalling aggregator progress update: %x`, raw)
Expand Down Expand Up @@ -1534,7 +1534,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV,
logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart))
continue
Expand Down Expand Up @@ -1636,7 +1636,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
if err != nil {
return false, err
}
cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart))
return updated, nil
}

Expand All @@ -1652,7 +1652,7 @@ func (cf *changeFrontier) checkpointJobProgress(
}
cf.metrics.FrontierUpdates.Inc(1)
var updateSkipped error
if err := cf.js.job.Update(cf.Ctx, nil, func(
if err := cf.js.job.Update(cf.Ctx(), nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
Expand All @@ -1677,8 +1677,8 @@ func (cf *changeFrontier) checkpointJobProgress(
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err)
return err
}

Expand All @@ -1702,7 +1702,7 @@ func (cf *changeFrontier) checkpointJobProgress(
}

if updateSkipped != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

Expand Down Expand Up @@ -1795,7 +1795,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
if !shouldEmit {
return nil
}
if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil {
if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil {
return err
}
cf.lastEmitResolved = newResolved.GoTime()
Expand Down Expand Up @@ -1834,13 +1834,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) {
description = fmt.Sprintf("job %d", cf.spec.JobID)
}
if frontierChanged {
log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s",
description, frontier, resolvedBehind)
}

if cf.slowLogEveryN.ShouldProcess(now) {
s := cf.frontier.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (sf *streamIngestionFrontier) Next() (

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err)
}

var frontierChanged bool
Expand All @@ -268,8 +268,8 @@ func (sf *streamIngestionFrontier) Next() (
// Send back a row to the job so that it can update the progress.
newResolvedTS := sf.frontier.Frontier()
select {
case <-sf.Ctx.Done():
sf.MoveToDraining(sf.Ctx.Err())
case <-sf.Ctx().Done():
sf.MoveToDraining(sf.Ctx().Err())
return nil, sf.DrainHelper()
// Send the frontier update in the heartbeat to the source cluster.
case sf.heartbeatSender.frontierUpdates <- newResolvedTS:
Expand Down Expand Up @@ -301,7 +301,7 @@ func (sf *streamIngestionFrontier) Next() (
func (sf *streamIngestionFrontier) ConsumerClosed() {
if sf.InternalClose() {
if err := sf.heartbeatSender.stop(); err != nil {
log.Errorf(sf.Ctx, "heartbeatSender exited with error: %s", err.Error())
log.Errorf(sf.Ctx(), "heartbeatSender exited with error: %s", err.Error())
}
}
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps(
// maybeUpdatePartitionProgress polls the frontier and updates the job progress with
// partition-specific information to track the status of each partition.
func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
ctx := sf.Ctx
ctx := sf.Ctx()
updateFreq := PartitionProgressFrequency.Get(&sf.flowCtx.Cfg.Settings.SV)
if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (sip *streamIngestionProcessor) close() {
_ = client.Close()
}
if sip.batcher != nil {
sip.batcher.Close(sip.Ctx)
sip.batcher.Close(sip.Ctx())
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
Expand Down Expand Up @@ -476,7 +476,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil {
if streamingKnobs.RunAfterReceivingEvent != nil {
streamingKnobs.RunAfterReceivingEvent(sip.Ctx)
streamingKnobs.RunAfterReceivingEvent(sip.Ctx())
}
}
}
Expand All @@ -503,13 +503,13 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err

return sip.flush()
case streamingccl.GenerationEvent:
log.Info(sip.Ctx, "GenerationEvent received")
log.Info(sip.Ctx(), "GenerationEvent received")
select {
case <-sip.cutoverCh:
sip.internalDrained = true
return nil, nil
case <-sip.Ctx.Done():
return nil, sip.Ctx.Err()
case <-sip.Ctx().Done():
return nil, sip.Ctx().Err()
}
default:
return nil, errors.Newf("unknown streaming event type %v", event.Type())
Expand Down Expand Up @@ -552,7 +552,7 @@ func (sip *streamIngestionProcessor) bufferKV(event partitionEvent) error {
}

func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error {
log.Infof(sip.Ctx, "got checkpoint %v", event.GetResolved())
log.Infof(sip.Ctx(), "got checkpoint %v", event.GetResolved())
resolvedTimePtr := event.GetResolved()
if resolvedTimePtr == nil {
return errors.New("checkpoint event expected to have a resolved timestamp")
Expand All @@ -574,13 +574,13 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {

totalSize := 0
for _, kv := range sip.curBatch {
if err := sip.batcher.AddMVCCKey(sip.Ctx, kv.Key, kv.Value); err != nil {
if err := sip.batcher.AddMVCCKey(sip.Ctx(), kv.Key, kv.Value); err != nil {
return nil, errors.Wrapf(err, "adding key %+v", kv)
}
totalSize += len(kv.Key.Key) + len(kv.Value)
}

if err := sip.batcher.Flush(sip.Ctx); err != nil {
if err := sip.batcher.Flush(sip.Ctx()); err != nil {
return nil, errors.Wrap(err, "flushing")
}
sip.metrics.Flushes.Inc(1)
Expand All @@ -605,7 +605,7 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
sip.lastFlushTime = timeutil.Now()
sip.bufferedCheckpoints = make(map[string]hlc.Timestamp)

return &flushedCheckpoints, sip.batcher.Reset(sip.Ctx)
return &flushedCheckpoints, sip.batcher.Reset(sip.Ctx())
}

func init() {
Expand Down
20 changes: 9 additions & 11 deletions pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ type Columnarizer struct {
execinfra.ProcessorBaseNoHelper
colexecop.NonExplainable

mode columnarizerMode
helper colmem.SetAccountingHelper
input execinfra.RowSource
da tree.DatumAlloc
mode columnarizerMode
initialized bool
helper colmem.SetAccountingHelper
input execinfra.RowSource
da tree.DatumAlloc

batch coldata.Batch
vecs coldata.TypedVecs
Expand Down Expand Up @@ -131,7 +132,7 @@ func newColumnarizer(
// Close will call InternalClose(). Note that we don't return
// any trailing metadata here because the columnarizers
// propagate it in DrainMeta.
if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil {
if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil {
// Close never returns an error.
colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close"))
}
Expand All @@ -145,13 +146,10 @@ func newColumnarizer(

// Init is part of the colexecop.Operator interface.
func (c *Columnarizer) Init(ctx context.Context) {
if c.removedFromFlow {
return
}
if c.Ctx != nil {
// Init has already been called.
if c.removedFromFlow || c.initialized {
return
}
c.initialized = true
c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1)
ctx = c.StartInternalNoSpan(ctx)
c.input.Start(ctx)
Expand Down Expand Up @@ -241,7 +239,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata {
// We no longer need the batch.
c.batch = nil
c.helper.ReleaseMemory()
if c.Ctx == nil {
if !c.initialized {
// The columnarizer wasn't initialized, so the wrapped processors might
// not have been started leaving them in an unsafe to drain state, so
// we skip the draining. Mostly likely this happened because a panic was
Expand Down
10 changes: 1 addition & 9 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,7 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata

func (m *Materializer) close() {
if m.InternalClose() {
if m.Ctx == nil {
// In some edge cases (like when Init of an operator above this
// materializer encounters a panic), the materializer might never be
// started, yet it still will attempt to close its Closers. This
// context is only used for logging purposes, so it is ok to grab
// the background context in order to prevent a NPE below.
m.Ctx = context.Background()
}
m.closers.CloseAndLogOnErr(m.Ctx, "materializer")
m.closers.CloseAndLogOnErr(m.Ctx(), "materializer")
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/metadata_test_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (mtr *MetadataTestReceiver) Next() (rowenc.EncDatumRow, *execinfrapb.Produc
// We don't use ProcessorBase.ProcessRowHelper() here because we need
// special handling for errors: this proc never starts draining in order for
// it to be as unintrusive as possible.
outRow, ok, err := mtr.OutputHelper.ProcessRow(mtr.Ctx, row)
outRow, ok, err := mtr.OutputHelper.ProcessRow(mtr.Ctx(), row)
if err != nil {
mtr.trailingMeta = append(mtr.trailingMeta, execinfrapb.ProducerMetadata{Err: err})
continue
Expand Down
Loading