Merge #83363

83363: streamingccl: add tracing spans to ingestion process r=miretskiy a=stevendanna

This enables real spans for the stream ingestion job and adds tracing
spans to the partition client and stream ingestion processor.

Starting the job with real spans means that our trace will include
anything recorded with RecordStructured. For example, here is a
snippet of a trace from a running job with this enabled:

```
   444.495ms      0.036ms                === operation:stream-ingestion-processor _unfinished:1 node:1 job:773465692775677953 stream-ingest-distsql:
   444.582ms      0.087ms                    === operation:Subscription.Subscribe _unfinished:1 node:1 job:773465692775677953 stream-ingest-distsql:
 20706.491ms  20261.994ms                structured:{"@type":"type.googleapis.com/cockroach.roachpb.ScanStats","numInterfaceSeeks":"2","numInternalSeeks":"2","numInterfaceSteps...
 20707.645ms      1.154ms                structured:{"@type":"type.googleapis.com/google.protobuf.StringValue","value":"ingesting SST (22 keys/2174 bytes) via regular write batch"}
 30447.051ms   9739.591ms                structured:{"@type":"type.googleapis.com/cockroach.roachpb.ScanStats","numInterfaceSeeks":"6","numInternalSeeks":"5","numInterfaceSteps":"...
 40932.980ms  10485.744ms                structured:{"@type":"type.googleapis.com/google.protobuf.StringValue","value":"ingesting SST (2 keys/1213 bytes) via regular write batch"}
 46027.808ms   5094.828ms                structured:{"@type":"type.googleapis.com/google.protobuf.StringValue","value":"ingesting SST (1 keys/1185 bytes) via regular write batch"}
```
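
The StringValue entries above come from RecordStructured payloads. Below
is a minimal sketch of how such a payload gets attached to the span
carried in the context; recordIngestNote and its placement are
hypothetical, not part of this change:

```go
package streamingest // hypothetical placement; a sketch, not part of this diff

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/tracing"
	"github.com/gogo/protobuf/types"
)

// recordIngestNote attaches a free-form structured event to the span in
// ctx, if any; events recorded this way show up in trace output like the
// StringValue entries in the snippet above.
func recordIngestNote(ctx context.Context, keys, bytes int) {
	if sp := tracing.SpanFromContext(ctx); sp != nil {
		sp.RecordStructured(&types.StringValue{
			Value: fmt.Sprintf("ingesting SST (%d keys/%d bytes) via regular write batch", keys, bytes),
		})
	}
}
```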

I've added spans for the few functions on the ingestion side that I
imagined might be interesting, but we should add more as needed.
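
The instrumentation itself is the same two-line pattern throughout the
diff below: derive a child span from the incoming context and finish it
on return, so that work done by callees under the derived context rolls
up into the trace. Roughly, as a sketch (doTracedWork and the span name
are placeholders, not code from this patch):

```go
package streamingest // hypothetical placement; a sketch, not part of this diff

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// doTracedWork shows the two-line instrumentation pattern used in this
// change: open a child span, finish it on return.
func doTracedWork(ctx context.Context, work func(context.Context) error) error {
	ctx, sp := tracing.ChildSpan(ctx, "stream-ingest-example")
	defer sp.Finish()
	// Anything the callee records (e.g. via RecordStructured) attaches
	// to the child span because it travels in ctx.
	return work(ctx)
}
```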

Informs #83318

Release note: None

Co-authored-by: Steven Danna <[email protected]>
craig[bot] and stevendanna committed Jun 27, 2022
2 parents a611410 + 6552e7d commit 31ed92f
Showing 4 changed files with 29 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
@@ -31,6 +31,7 @@ go_library(
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
],
)
16 changes: 16 additions & 0 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)

@@ -56,6 +57,9 @@ var _ Client = &partitionedStreamClient{}
func (p *partitionedStreamClient) Create(
ctx context.Context, tenantID roachpb.TenantID,
) (streaming.StreamID, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()

streamID := streaming.InvalidStreamID

conn, err := p.srcDB.Conn(ctx)
@@ -76,6 +80,9 @@ func (p *partitionedStreamClient) Create(
func (p *partitionedStreamClient) Heartbeat(
ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Heartbeat")
defer sp.Finish()

conn, err := p.srcDB.Conn(ctx)
if err != nil {
return streampb.StreamReplicationStatus{}, err
@@ -175,6 +182,9 @@ func (p *partitionedStreamClient) Close() error {
func (p *partitionedStreamClient) Subscribe(
ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (Subscription, error) {
_, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe")
defer sp.Finish()

sps := streampb.StreamPartitionSpec{}
if err := protoutil.Unmarshal(spec, &sps); err != nil {
return nil, err
@@ -201,6 +211,9 @@ func (p *partitionedStreamClient) Subscribe(

// Complete implements the streamclient.Client interface.
func (p *partitionedStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Complete")
defer sp.Finish()

conn, err := p.srcDB.Conn(ctx)
if err != nil {
return err
@@ -255,6 +268,9 @@ func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event {

// Subscribe implements the Subscription interface.
func (p *partitionedStreamSubscription) Subscribe(ctx context.Context) error {
ctx, sp := tracing.ChildSpan(ctx, "Subscription.Subscribe")
defer sp.Finish()

defer close(p.eventsChan)
conn, err := p.db.Conn(ctx)
if err != nil {
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -237,6 +237,8 @@ func (s *streamIngestionResumer) OnFailOrCancel(_ context.Context, _ interface{}) error {
return nil
}

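// ForceRealSpan implements the jobs.TraceableJob interface; returning
// true tells the job registry to start this job with a real (recording)
// span instead of the default no-op span.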
func (s *streamIngestionResumer) ForceRealSpan() bool { return true }

var _ jobs.Resumer = &streamIngestionResumer{}

func init() {
13 changes: 10 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)
@@ -535,6 +536,9 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, error) {
}

func (sip *streamIngestionProcessor) bufferSST(sst *roachpb.RangeFeedSSTable) error {
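// Only the span handle is needed here; the SST iteration below is
// in-memory and takes no context.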
_, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-sst")
defer sp.Finish()

iter, err := storage.NewMemSSTIterator(sst.Data, true)
if err != nil {
return err
@@ -606,14 +610,17 @@ func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error {
}

func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
ctx, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-flush")
defer sp.Finish()
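// Thread the span-carrying ctx through the batcher calls below so
// their work is attributed to the stream-ingestion-flush span.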

flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)}
// Ensure that the current batch is sorted.
sort.Sort(sip.curBatch)

totalSize := 0
minBatchMVCCTimestamp := hlc.MaxTimestamp
for _, kv := range sip.curBatch {
if err := sip.batcher.AddMVCCKey(sip.Ctx, kv.Key, kv.Value); err != nil {
if err := sip.batcher.AddMVCCKey(ctx, kv.Key, kv.Value); err != nil {
return nil, errors.Wrapf(err, "adding key %+v", kv)
}
if kv.Key.Timestamp.Less(minBatchMVCCTimestamp) {
@@ -631,7 +638,7 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
sip.metrics.IngestedBytes.Inc(int64(totalSize))
sip.metrics.IngestedEvents.Inc(int64(len(sip.curBatch)))
}()
if err := sip.batcher.Flush(sip.Ctx); err != nil {
if err := sip.batcher.Flush(ctx); err != nil {
return nil, errors.Wrap(err, "flushing")
}
}
Expand All @@ -654,7 +661,7 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
sip.lastFlushTime = timeutil.Now()
sip.bufferedCheckpoints = make(map[string]hlc.Timestamp)

return &flushedCheckpoints, sip.batcher.Reset(sip.Ctx)
return &flushedCheckpoints, sip.batcher.Reset(ctx)
}

func init() {