91969: sql: audit all processors to make their closure bullet-proof r=yuzefovich a=yuzefovich

This commit replaces all usages of the `ProcessorBaseNoHelper.Ctx` field
with a call to the newly introduced `Ctx()` method, which returns
a background context if the processor hasn't been started. This change
makes all processors respect the contract of the `colexecop.Closer`
interface, which says that `Close` must be safe to call even if `Init`
hasn't been performed (in the context of processors, this means that
`Columnarizer.Init` wasn't called, which in turn means that
`Processor.Start` wasn't either).
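
For illustration only, here is a minimal sketch of the accessor pattern the
commit describes; the type and field names below (everything other than
`Ctx()`) are hypothetical and do not mirror the actual `execinfra` code:

```go
package main

import (
	"context"
	"fmt"
)

// processorBase is a stand-in for a processor's embedded base struct.
type processorBase struct {
	// ctx is set by Start and stays nil if the processor is never started.
	ctx context.Context
}

// Start records the context used for the processor's lifetime.
func (pb *processorBase) Start(ctx context.Context) {
	pb.ctx = ctx
}

// Ctx returns the processor's context, falling back to a background
// context when Start was never called. This keeps cleanup paths such as
// Close safe to invoke on an unstarted processor.
func (pb *processorBase) Ctx() context.Context {
	if pb.ctx == nil {
		return context.Background()
	}
	return pb.ctx
}

func main() {
	var pb processorBase
	// Closing before starting no longer dereferences a nil context.
	fmt.Println(pb.Ctx() != nil) // true
}
```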

Initially, I attempted to fix this in cockroachdb#91446 by putting the protection
into the columnarizer, but that led to broken assumptions since we
wouldn't close all closers that we expected to (in particular, the
materializer that is the input to the wrapped row-by-row processor
wouldn't be closed). This commit takes a different approach and should
fix the issue for good without introducing any flakiness.

As a result, this commit fixes a rarely hit issue in which the aggregator
and the zigzag joiner attempt to log during `Close` even though they were
never started (something we see occasionally in Sentry reports). The issue
is quite rare, so no release note seems appropriate.

Fixes: cockroachdb#84902.
Fixes: cockroachdb#91845.

Release note: None

92265: kvconnectorccl: allow secondary tenants to prefetch range lookups r=ajwerner a=ajwerner

This patch permits the tenant connector to request that more than zero ranges be prefetched. To enable this, we add logic to the implementation of the `RangeLookup` RPC that filters out any results not intended for the requesting tenant.
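
As a rough sketch of the filtering idea (not the actual kvserver implementation; the span type and helpers here are invented for illustration), prefetched range descriptors that fall outside the requesting tenant's key span can simply be dropped before the response is returned:

```go
package main

import "fmt"

// span is a stand-in for a range descriptor's [start, end) key span.
type span struct{ start, end string }

// overlaps reports whether two half-open key spans intersect.
func (s span) overlaps(o span) bool {
	return s.start < o.end && o.start < s.end
}

// filterForTenant keeps only the descriptors that overlap the tenant's
// keyspace, so a prefetching lookup never leaks ranges owned by other tenants.
func filterForTenant(tenantSpan span, descs []span) []span {
	filtered := descs[:0]
	for _, d := range descs {
		if tenantSpan.overlaps(d) {
			filtered = append(filtered, d)
		}
	}
	return filtered
}

func main() {
	tenant2 := span{"/Tenant/2", "/Tenant/3"}
	results := []span{
		{"/Tenant/2", "/Tenant/2/a"},
		{"/Tenant/2/a", "/Tenant/2/b"},
		{"/Tenant/3", "/Tenant/3/b"}, // another tenant's range: dropped
	}
	fmt.Println(filterForTenant(tenant2, results))
}
```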

Fixes cockroachdb#91433

Release note: None

92284: ui: show stmt idle time in execution/planning chart r=matthewtodd a=matthewtodd

Part of cockroachdb#86667.

The example statement fingerprint below comes from me connecting to a local cockroach instance with `psql` (since `cockroach sql` by default runs a few extra queries that confuse the timings) and individually running the following lines to simulate some inter-statement latency:

```sql
BEGIN;
SELECT 1;
COMMIT;
```

| Before | After |
|--|--|
|<img width="1372" alt="Screen Shot 2022-11-21 at 2 35 49 PM" src="https://user-images.githubusercontent.com/5261/203144731-fd5a42fb-7b60-473a-990a-fb5fabf2756a.png">|<img width="1372" alt="Screen Shot 2022-11-21 at 2 35 58 PM" src="https://user-images.githubusercontent.com/5261/203144736-2600c0f9-7811-4b9d-a9e1-dbb8aeea71ee.png">|

Release note (ui change): The "Statement Execution and Planning Time" chart on the statement fingerprint page now includes a third value, "Idle," representing the time spent by the application waiting to execute this statement while holding a transaction open.
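
To make the new "Idle" bucket concrete, here is a small self-contained sketch (illustrative only; it does not reflect CockroachDB's SQL stats code) that derives idle time as the gap between the end of one statement and the start of the next within an open transaction:

```go
package main

import (
	"fmt"
	"time"
)

// stmt records when a statement started and finished executing.
type stmt struct {
	name       string
	start, end time.Time
}

// idleTimes returns, for each statement after the first, how long the
// transaction sat idle waiting for the application to send it.
func idleTimes(stmts []stmt) map[string]time.Duration {
	idle := make(map[string]time.Duration)
	for i := 1; i < len(stmts); i++ {
		idle[stmts[i].name] = stmts[i].start.Sub(stmts[i-1].end)
	}
	return idle
}

func main() {
	t0 := time.Now()
	txn := []stmt{
		{"BEGIN", t0, t0.Add(time.Millisecond)},
		{"SELECT 1", t0.Add(2 * time.Second), t0.Add(2*time.Second + time.Millisecond)},
		{"COMMIT", t0.Add(5 * time.Second), t0.Add(5*time.Second + time.Millisecond)},
	}
	// Roughly 2s of application think time before SELECT 1 and 3s before COMMIT.
	fmt.Println(idleTimes(txn))
}
```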

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Matthew Todd <[email protected]>
4 people committed Nov 22, 2022
4 parents e6b30e1 + 19f3386 + 298b016 + d77b7dd commit 7ffaece
Showing 41 changed files with 549 additions and 284 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -221,7 +221,7 @@ func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.agg.Close()
if bp.InternalClose() {
bp.memAcc.Close(bp.Ctx)
bp.memAcc.Close(bp.Ctx())
}
}

4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -559,8 +559,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
prog.ProgressDetails = *details
case meta := <-rd.metaCh:
return nil, meta
case <-rd.Ctx.Done():
rd.MoveToDraining(rd.Ctx.Err())
case <-rd.Ctx().Done():
rd.MoveToDraining(rd.Ctx().Err())
return nil, rd.DrainHelper()
}

9 changes: 2 additions & 7 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -395,8 +395,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}
expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix)

mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx,
mockRestoreDataSpec)
mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec)
require.NoError(t, err)
ssts := make(chan mergedSST, 1)
require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
@@ -439,15 +438,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
}

func newTestingRestoreDataProcessor(
ctx context.Context,
evalCtx *eval.Context,
flowCtx *execinfra.FlowCtx,
spec execinfrapb.RestoreDataSpec,
evalCtx *eval.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec,
) (*restoreDataProcessor, error) {
rd := &restoreDataProcessor{
ProcessorBase: execinfra.ProcessorBase{
ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{
Ctx: ctx,
EvalCtx: evalCtx,
},
},
40 changes: 20 additions & 20 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -446,19 +446,19 @@ func (ca *changeAggregator) close() {
}
if ca.eventConsumer != nil {
if err := ca.eventConsumer.Close(); err != nil {
log.Warningf(ca.Ctx, "error closing event consumer: %s", err)
log.Warningf(ca.Ctx(), "error closing event consumer: %s", err)
}
}

if ca.sink != nil {
// Best effort: context is often cancel by now, so we expect to see an error
_ = ca.sink.Close()
}
ca.memAcc.Close(ca.Ctx)
ca.memAcc.Close(ca.Ctx())
if ca.kvFeedMemMon != nil {
ca.kvFeedMemMon.Stop(ca.Ctx)
ca.kvFeedMemMon.Stop(ca.Ctx())
}
ca.MemMonitor.Stop(ca.Ctx)
ca.MemMonitor.Stop(ca.Ctx())
ca.InternalClose()
}

@@ -501,7 +501,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// kvFeed, sends off this event to the event consumer, and flushes the sink
// if necessary.
func (ca *changeAggregator) tick() error {
event, err := ca.eventProducer.Get(ca.Ctx)
event, err := ca.eventProducer.Get(ca.Ctx())
if err != nil {
return err
}
@@ -516,16 +516,16 @@ func (ca *changeAggregator) tick() error {
ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
}
ca.recentKVCount++
return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event)
case kvevent.TypeResolved:
a := event.DetachAlloc()
a.Release(ca.Ctx)
a.Release(ca.Ctx())
resolved := event.Resolved()
if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
return ca.sink.Flush(ca.Ctx)
return ca.sink.Flush(ca.Ctx())
}

return nil
@@ -568,7 +568,7 @@ func (ca *changeAggregator) flushFrontier() error {
// otherwise, we could lose buffered messages and violate the
// at-least-once guarantee. This is also true for checkpointing the
// resolved spans in the job progress.
if err := ca.sink.Flush(ca.Ctx); err != nil {
if err := ca.sink.Flush(ca.Ctx()); err != nil {
return err
}

@@ -996,8 +996,8 @@ func (cf *changeFrontier) close() {
// Best effort: context is often cancel by now, so we expect to see an error
_ = cf.sink.Close()
}
cf.memAcc.Close(cf.Ctx)
cf.MemMonitor.Stop(cf.Ctx)
cf.memAcc.Close(cf.Ctx())
cf.MemMonitor.Stop(cf.Ctx())
}
}

@@ -1104,7 +1104,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV,
logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart))
continue
@@ -1206,7 +1206,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
if err != nil {
return false, err
}
cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart))
return updated, nil
}

@@ -1224,7 +1224,7 @@ func (cf *changeFrontier) checkpointJobProgress(
var updateSkipped error
if cf.js.job != nil {

if err := cf.js.job.Update(cf.Ctx, nil, func(
if err := cf.js.job.Update(cf.Ctx(), nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
@@ -1249,8 +1249,8 @@
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err)
return err
}

@@ -1278,7 +1278,7 @@ func (cf *changeFrontier) checkpointJobProgress(
}

if updateSkipped != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

@@ -1377,7 +1377,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
if !shouldEmit {
return nil
}
if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil {
if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil {
return err
}
cf.lastEmitResolved = newResolved.GoTime()
@@ -1416,13 +1416,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) {
description = fmt.Sprintf("job %d", cf.spec.JobID)
}
if frontierChanged && cf.slowLogEveryN.ShouldProcess(now) {
log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s",
description, frontier, resolvedBehind)
}

if cf.slowLogEveryN.ShouldProcess(now) {
s := cf.frontier.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind)
}
}

4 changes: 4 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
@@ -46,6 +46,7 @@ go_test(
"main_test.go",
"setting_overrides_test.go",
"tenant_kv_test.go",
"tenant_range_lookup_test.go",
"tenant_trace_test.go",
"tenant_upgrade_test.go",
],
@@ -59,7 +60,10 @@ go_test(
"//pkg/config",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/kvtenant",
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
5 changes: 1 addition & 4 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
@@ -389,10 +389,7 @@ func (c *Connector) RangeLookup(
// for more discussion on the choice of ReadConsistency and its
// implications.
ReadConsistency: rc,
// Until we add protection in the Internal service implementation to
// prevent prefetching from traversing into RangeDescriptors owned by
// other tenants, we must disable prefetching.
PrefetchNum: 0,
PrefetchNum: kvcoord.RangeLookupPrefetchCount,
PrefetchReverse: useReverseScan,
})
if err != nil {
3 changes: 2 additions & 1 deletion pkg/ccl/kvccl/kvtenantccl/connector_test.go
@@ -18,6 +18,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
@@ -313,7 +314,7 @@ func TestConnectorRangeLookup(t *testing.T) {
// Validate request.
assert.Equal(t, roachpb.RKey("a"), req.Key)
assert.Equal(t, roachpb.READ_UNCOMMITTED, req.ReadConsistency)
assert.Equal(t, int64(0), req.PrefetchNum)
assert.Equal(t, int64(kvcoord.RangeLookupPrefetchCount), req.PrefetchNum)
assert.Equal(t, false, req.PrefetchReverse)

// Respond.
81 changes: 81 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/tenant_range_lookup_test.go
@@ -0,0 +1,81 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package kvtenantccl_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestRangeLookupPrefetchFiltering is an integration test to ensure that
// range results are filtered for the client.
func TestRangeLookupPrefetchFiltering(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true, // we're going to manually add tenants
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
},
},
},
})
defer tc.Stopper().Stop(ctx)

ten2ID := roachpb.MustMakeTenantID(2)
tenant2, err := tc.Server(0).StartTenant(ctx, base.TestTenantArgs{
TenantID: ten2ID,
})
require.NoError(t, err)

// Split some ranges within tenant2 that we'll want to see in prefetch.
ten2Codec := keys.MakeSQLCodec(ten2ID)
ten2Split1 := append(ten2Codec.TenantPrefix(), 'a')
ten2Split2 := append(ten2Codec.TenantPrefix(), 'b')
{
tc.SplitRangeOrFatal(t, ten2Split1)
tc.SplitRangeOrFatal(t, ten2Split2)
}

// Split some ranges for the tenant which comes after tenant2.
{
ten3Codec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(3))
tc.SplitRangeOrFatal(t, ten3Codec.TenantPrefix())
tc.SplitRangeOrFatal(t, append(ten3Codec.TenantPrefix(), 'b'))
tc.SplitRangeOrFatal(t, append(ten3Codec.TenantPrefix(), 'c'))
}

// Do the fetch and make sure we prefetch all the ranges we should see,
// and none of the ranges we should not.
db := tenant2.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().DB()
prefixRKey := keys.MustAddr(ten2Codec.TenantPrefix())
res, prefetch, err := db.RangeLookup(
ctx, prefixRKey,
rangecache.ReadFromLeaseholder, false, /* useReverseScan */
)
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, prefixRKey, res[0].StartKey)
require.Len(t, prefetch, 2)
require.Equal(t, keys.MustAddr(ten2Split1), prefetch[0].StartKey)
require.Equal(t, keys.MustAddr(ten2Split2), prefetch[1].StartKey)
}
@@ -298,13 +298,13 @@ func (sf *streamIngestionFrontier) Next() (

if err := sf.maybeUpdatePartitionProgress(); err != nil {
// Updating the partition progress isn't a fatal error.
log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err)
log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err)
}

// Send back a row to the job so that it can update the progress.
select {
case <-sf.Ctx.Done():
sf.MoveToDraining(sf.Ctx.Err())
case <-sf.Ctx().Done():
sf.MoveToDraining(sf.Ctx().Err())
return nil, sf.DrainHelper()
// Send the latest persisted highwater in the heartbeat to the source cluster
// as even with retries we will never request an earlier row than it, and
@@ -315,7 +315,7 @@
case <-sf.heartbeatSender.stoppedChan:
err := sf.heartbeatSender.wait()
if err != nil {
log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err)
log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
sf.MoveToDraining(err)
return nil, sf.DrainHelper()
Expand All @@ -326,7 +326,7 @@ func (sf *streamIngestionFrontier) Next() (

func (sf *streamIngestionFrontier) close() {
if err := sf.heartbeatSender.stop(); err != nil {
log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err)
log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err)
}
if sf.InternalClose() {
sf.metrics.RunningCount.Dec(1)
@@ -392,7 +392,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps(
// maybeUpdatePartitionProgress polls the frontier and updates the job progress with
// partition-specific information to track the status of each partition.
func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
ctx := sf.Ctx
ctx := sf.Ctx()
updateFreq := JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV)
if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq {
return nil