Skip to content

Commit

Permalink
Merge #86123 #86130
Browse files Browse the repository at this point in the history
86123: streamingccl: fix NPE in crdb_internal.stream_ingestion_stats* r=ajwerner a=stevendanna

This fixes a NPE that could occur if the user passed in the Job ID of
a job that isn't a stream ingestion job.

Fixes #86040

Release note: None

Release justification: Bug fix for non-production feature.

86130: kvserver: annotate Replica.Send with pprof labels r=tbg a=pavelkalinnikov

This commit adds pprof labels to Send methods, as a proper replacement of the
no longer working (since Go 1.17) unnamed parameters hack. Adding labels is
conditional to the CPU profile cluster setting, to avoid the profiling cost and
extra allocations in pprof.Do context construction during normal operation.

Fixes #85948

Release justification: Profiling labels help investigating customer issues
Release note: None

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
3 people committed Aug 17, 2022
3 parents 5f45e42 + b0fc809 + 9869df0 commit 39a86d3
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 32 deletions.
15 changes: 15 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,18 @@ RANGE default ALTER RANGE default CONFIGURE ZONE USING
# to, the background generators would conflict with an error.
statement ok
SELECT a.* FROM crdb_internal.partitions AS a JOIN crdb_internal.partitions AS b ON a.table_id = b.table_id

subtest replication-builtins

user testuser

query error pq: crdb_internal\.stream_ingestion_stats_json\(\): replication restricted to ADMIN role
SELECT crdb_internal.stream_ingestion_stats_json(unique_rowid());

user root

query error pq: crdb_internal\.stream_ingestion_stats_json\(\): job.*does not exist
SELECT crdb_internal.stream_ingestion_stats_json(unique_rowid());

query error pq: crdb_internal\.stream_ingestion_stats_json\(\): job.*is not a stream ingestion job
SELECT crdb_internal.stream_ingestion_stats_json(id) FROM (SELECT id FROM system.jobs LIMIT 1);
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ func getStreamIngestionStats(
if err != nil {
return nil, err
}
details := j.Details().(jobspb.StreamIngestionDetails)
details, ok := j.Details().(jobspb.StreamIngestionDetails)
if !ok {
return nil, errors.Errorf("job with id %d is not a stream ingestion job", ingestionJobID)
}

progress := j.Progress()
stats := &streampb.StreamIngestionStats{
IngestionDetails: &details,
Expand Down
40 changes: 11 additions & 29 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver
import (
"context"
"reflect"
"runtime/pprof"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -109,38 +111,17 @@ func (r *Replica) Send(
// SendWithWriteBytes is the implementation of Send with an additional
// *StoreWriteBytes return value.
func (r *Replica) SendWithWriteBytes(
ctx context.Context, ba roachpb.BatchRequest,
ctx context.Context, req roachpb.BatchRequest,
) (*roachpb.BatchResponse, *StoreWriteBytes, *roachpb.Error) {
return r.sendWithoutRangeID(ctx, &ba)
}

// sendWithoutRangeID used to be called sendWithRangeID, accepted a `_forStacks
// roachpb.RangeID` argument, and had the description below. Ever since Go
// switched to the register-based calling convention though, this stopped
// working, giving essentially random numbers in the goroutine dumps that were
// misleading. It has thus been "disarmed" until Go produces useful values
// again.
//
// See (internal): https://cockroachlabs.slack.com/archives/G01G8LK77DK/p1641478596004700
//
// sendWithRangeID takes an unused rangeID argument so that the range
// ID will be accessible in stack traces (both in panics and when
// sampling goroutines from a live server). This line is subject to
// the whims of the compiler and it can be difficult to find the right
// value, but as of this writing the following example shows a stack
// while processing range 21 (0x15) (the first occurrence of that
// number is the rangeID argument, the second is within the encoded
// BatchRequest, although we don't want to rely on that occurring
// within the portion printed in the stack trace):
//
// github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...)
func (r *Replica) sendWithoutRangeID(
ctx context.Context, ba *roachpb.BatchRequest,
) (_ *roachpb.BatchResponse, _ *StoreWriteBytes, rErr *roachpb.Error) {
var br *roachpb.BatchResponse

if r.store.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels {
defer pprof.SetGoroutineLabels(ctx)
// Note: the defer statement captured the previous context.
ctx = pprof.WithLabels(ctx, pprof.Labels("range_str", r.rangeStr.String()))
pprof.SetGoroutineLabels(ctx)
}
// Add the range log tag.
ctx = r.AnnotateCtx(ctx)
ba := &req

// Record summary throughput information about the batch request for
// accounting.
Expand Down Expand Up @@ -177,6 +158,7 @@ func (r *Replica) sendWithoutRangeID(
}

// Differentiate between read-write, read-only, and admin.
var br *roachpb.BatchResponse
var pErr *roachpb.Error
var writeBytes *StoreWriteBytes
if isReadOnly {
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13452,8 +13452,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {

// Round trip another proposal through the replica to ensure that previously
// committed entries have been applied.
_, _, pErr = tc.repl.sendWithoutRangeID(ctx, &ba)
if pErr != nil {
if _, pErr := tc.repl.Send(ctx, ba); pErr != nil {
t.Fatal(pErr)
}
log.Flush()
Expand Down

0 comments on commit 39a86d3

Please sign in to comment.