diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index b42f906c036e..7647a80cab22 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/types" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" ) // crdbSpan is a span for internal crdb usage. This is used to power SQL session @@ -56,13 +55,22 @@ type crdbSpanMu struct { // duration is initialized to -1 and set on Finish(). duration time.Duration - // recording maintains state once StartRecording() is called. recording struct { // recordingType is the recording type of the ongoing recording, if any. // Its 'load' method may be called without holding the surrounding mutex, // but its 'swap' method requires the mutex. recordingType atomicRecordingType - recordedLogs []opentracing.LogRecord + + logBytes int64 + logs ring.Buffer // of tracingpb.LogRecords + + structuredBytes int64 + structured ring.Buffer // of Structured events + + // dropped is true if the span has capped out its memory limits for + // logs and structured events, and has had to drop some. + dropped bool + // children contains the list of child spans started after this Span // started recording. children []*crdbSpan @@ -79,9 +87,6 @@ type crdbSpanMu struct { // those that were set before recording started)? tags opentracing.Tags - bytesStructured int64 - structured ring.Buffer // of Structured events - // The Span's associated baggage. 
baggage map[string]string } @@ -123,7 +128,12 @@ func (s *crdbSpan) resetRecording() { s.mu.Lock() defer s.mu.Unlock() - s.mu.recording.recordedLogs = nil + s.mu.recording.logs.Reset() + s.mu.recording.logBytes = 0 + s.mu.recording.structured.Reset() + s.mu.recording.structuredBytes = 0 + s.mu.recording.dropped = false + s.mu.recording.children = nil s.mu.recording.remoteSpans = nil } @@ -210,30 +220,43 @@ func (s *crdbSpan) record(msg string) { return } + logRecord := tracingpb.LogRecord{ + Time: time.Now(), + Fields: []tracingpb.LogRecord_Field{ + {Key: tracingpb.LogMessageField, Value: msg}, + }, + } + s.mu.Lock() defer s.mu.Unlock() - if len(s.mu.recording.recordedLogs) < maxLogsPerSpan { - s.mu.recording.recordedLogs = append(s.mu.recording.recordedLogs, opentracing.LogRecord{ - Timestamp: time.Now(), - Fields: []otlog.Field{ - otlog.String(tracingpb.LogMessageField, msg), - }, - }) + + s.mu.recording.logBytes += int64(logRecord.Size()) + if s.mu.recording.logBytes > maxLogBytesPerSpan { + s.mu.recording.dropped = true + } + for s.mu.recording.logBytes > maxLogBytesPerSpan { + first := s.mu.recording.logs.GetFirst().(tracingpb.LogRecord) + s.mu.recording.logs.RemoveFirst() + s.mu.recording.logBytes -= int64(first.Size()) } + s.mu.recording.logs.AddLast(logRecord) } func (s *crdbSpan) recordStructured(item Structured) { s.mu.Lock() defer s.mu.Unlock() - s.mu.bytesStructured += int64(item.Size()) - for s.mu.bytesStructured > maxStructuredBytesPerSpan { - last := s.mu.structured.GetLast().(Structured) - s.mu.structured.RemoveLast() - s.mu.bytesStructured -= int64(last.Size()) + s.mu.recording.structuredBytes += int64(item.Size()) + if s.mu.recording.structuredBytes > maxStructuredBytesPerSpan { + s.mu.recording.dropped = true + } + for s.mu.recording.structuredBytes > maxStructuredBytesPerSpan { + last := s.mu.recording.structured.GetLast().(Structured) + s.mu.recording.structured.RemoveLast() + s.mu.recording.structuredBytes -= int64(last.Size()) } - 
s.mu.structured.AddFirst(item) + s.mu.recording.structured.AddFirst(item) } func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) { @@ -299,12 +322,15 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan { if s.mu.recording.recordingType.load() == RecordingVerbose { addTag("_verbose", "1") } + if s.mu.recording.dropped { + addTag("_dropped", "1") + } } - if numEvents := s.mu.structured.Len(); numEvents != 0 { + if numEvents := s.mu.recording.structured.Len(); numEvents != 0 { rs.InternalStructured = make([]*types.Any, 0, numEvents) for i := 0; i < numEvents; i++ { - event := s.mu.structured.Get(i).(Structured) + event := s.mu.recording.structured.Get(i).(Structured) item, err := types.MarshalAny(event) if err != nil { // An error here is an error from Marshal; these @@ -335,15 +361,10 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan { } } - rs.Logs = make([]tracingpb.LogRecord, len(s.mu.recording.recordedLogs)) - for i, r := range s.mu.recording.recordedLogs { - rs.Logs[i].Time = r.Timestamp - rs.Logs[i].Fields = make([]tracingpb.LogRecord_Field, len(r.Fields)) - for j, f := range r.Fields { - rs.Logs[i].Fields[j] = tracingpb.LogRecord_Field{ - Key: f.Key(), - Value: fmt.Sprint(f.Value()), - } + if numLogs := s.mu.recording.logs.Len(); numLogs != 0 { + rs.Logs = make([]tracingpb.LogRecord, numLogs) + for i := 0; i < numLogs; i++ { + rs.Logs[i] = s.mu.recording.logs.Get(i).(tracingpb.LogRecord) } } diff --git a/pkg/util/tracing/shadow.go b/pkg/util/tracing/shadow.go index db7d6f9ffc44..62c6b944aac2 100644 --- a/pkg/util/tracing/shadow.go +++ b/pkg/util/tracing/shadow.go @@ -106,7 +106,7 @@ func makeShadowSpan( func createLightStepTracer(token string) (shadowTracerManager, opentracing.Tracer) { return lightStepManager{}, lightstep.NewTracer(lightstep.Options{ AccessToken: token, - MaxLogsPerSpan: maxLogsPerSpan, + MaxLogsPerSpan: maxLogsPerSpanExternal, MaxBufferedSpans: 10000, UseGRPC: true, }) 
diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index ce622e85e5dc..98b0399fc4b1 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/types" @@ -35,7 +36,11 @@ func TestRecordingString(t *testing.T) { root.SetVerbose(true) root.Record("root 1") // Hackily fix the timing on the first log message, so that we can check it later. - root.i.crdb.mu.recording.recordedLogs[0].Timestamp = root.i.crdb.startTime.Add(time.Millisecond) + r := root.i.crdb.mu.recording.logs.GetFirst().(tracingpb.LogRecord) + r.Time = root.i.crdb.startTime.Add(time.Millisecond) + root.i.crdb.mu.recording.logs.RemoveFirst() + root.i.crdb.mu.recording.logs.AddFirst(r) + // Sleep a bit so that everything that comes afterwards has higher timestamps // than the one we just assigned. Otherwise the sorting will be screwed up. time.Sleep(10 * time.Millisecond) @@ -214,6 +219,8 @@ func TestSpanRecordStructured(t *testing.T) { `)) } +// TestSpanRecordStructuredLimit tests recording behavior when the size of +// structured data recorded into the span exceeds the configured limit. func TestSpanRecordStructuredLimit(t *testing.T) { tr := NewTracer() sp := tr.StartSpan("root", WithForceRealSpan()) @@ -231,6 +238,7 @@ func TestSpanRecordStructuredLimit(t *testing.T) { rec := sp.GetRecording() require.Len(t, rec, 1) require.Len(t, rec[0].InternalStructured, numPayloads) + require.Equal(t, rec[0].Tags["_dropped"], "1") first := rec[0].InternalStructured[0] last := rec[0].InternalStructured[len(rec[0].InternalStructured)-1] @@ -246,6 +254,125 @@ func TestSpanRecordStructuredLimit(t *testing.T) { require.Equal(t, res, int32(offset+extra+1)) } +// TestSpanRecordLimit tests recording behavior when the amount of data logged +// into the span exceeds the configured limit. 
+func TestSpanRecordLimit(t *testing.T) { + tr := NewTracer() + sp := tr.StartSpan("root", WithForceRealSpan()) + defer sp.Finish() + sp.SetVerbose(true) + + msg := func(i int) string { return fmt.Sprintf("%06d", i) } + + // Determine the size of a log record by actually recording once. + sp.Record(msg(0)) + logSize := sp.GetRecording()[0].Logs[0].Size() + sp.ResetRecording() + + numLogs := maxLogBytesPerSpan / logSize + const extra = 10 + for i := 1; i <= numLogs+extra; i++ { + sp.Record(msg(i)) + } + + rec := sp.GetRecording() + require.Len(t, rec, 1) + require.Len(t, rec[0].Logs, numLogs) + require.Equal(t, rec[0].Tags["_dropped"], "1") + + first := rec[0].Logs[0] + last := rec[0].Logs[len(rec[0].Logs)-1] + + require.Equal(t, first.Fields[0].Value, msg(extra+1)) + require.Equal(t, last.Fields[0].Value, msg(numLogs+extra)) +} + +// testStructuredImpl is a testing implementation of Structured event. +type testStructuredImpl struct { + *types.Int32Value +} + +var _ Structured = &testStructuredImpl{} + +func (t *testStructuredImpl) String() string { + return fmt.Sprintf("structured=%d", t.Value) +} + +func newTestStructured(i int) *testStructuredImpl { + return &testStructuredImpl{ + &types.Int32Value{Value: int32(i)}, + } +} + +// TestSpanReset checks that resetting a span clears out existing recordings. 
+func TestSpanReset(t *testing.T) { + tr := NewTracer() + sp := tr.StartSpan("root", WithForceRealSpan()) + defer sp.Finish() + sp.SetVerbose(true) + + for i := 1; i <= 10; i++ { + if i%2 == 0 { + sp.RecordStructured(newTestStructured(i)) + } else { + sp.Record(fmt.Sprintf("%d", i)) + } + } + + require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), ` + span: root + tags: _unfinished=1 _verbose=1 + event: 1 + event: structured=2 + event: 3 + event: structured=4 + event: 5 + event: structured=6 + event: 7 + event: structured=8 + event: 9 + event: structured=10 + `)) + require.NoError(t, TestingCheckRecording(sp.GetRecording(), ` + === operation:root _unfinished:1 _verbose:1 + event:1 + event:structured=2 + event:3 + event:structured=4 + event:5 + event:structured=6 + event:7 + event:structured=8 + event:9 + event:structured=10 + `)) + + sp.ResetRecording() + + require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), ` + span: root + tags: _unfinished=1 _verbose=1 + `)) + require.NoError(t, TestingCheckRecording(sp.GetRecording(), ` + === operation:root _unfinished:1 _verbose:1 + `)) + + msg := func(i int) string { return fmt.Sprintf("%06d", i) } + sp.Record(msg(0)) + logSize := sp.GetRecording()[0].Logs[0].Size() + numLogs := maxLogBytesPerSpan / logSize + const extra = 10 + + for i := 1; i <= numLogs+extra; i++ { + sp.Record(msg(i)) + } + + require.Equal(t, sp.GetRecording()[0].Tags["_dropped"], "1") + sp.ResetRecording() + _, found := sp.GetRecording()[0].Tags["_dropped"] + require.False(t, found) +} + func TestNonVerboseChildSpanRegisteredWithParent(t *testing.T) { tr := NewTracer() sp := tr.StartSpan("root", WithForceRealSpan()) diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index e8df8909bca0..eec8e22da3cf 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -41,14 +41,17 @@ import ( const verboseTracingBaggageKey = "sb" const ( - // maxLogsPerSpan limits the number of logs in a Span; use a comfortable 
+ // maxLogBytesPerSpan limits the size of logs in a span; use a comfortable // limit. - maxLogsPerSpan = 1000 + maxLogBytesPerSpan = 10 * (1 << 10) // 10 KiB // maxStructuredBytesPerSpan limits the size of structured events in a span; // use a comfortable limit. maxStructuredBytesPerSpan = 1 << 10 // 1 KiB // maxChildrenPerSpan limits the number of (direct) child spans in a Span. maxChildrenPerSpan = 1000 + // maxLogsPerSpanExternal limits the number of logs in a Span for external + // tracers (net/trace, lightstep); use a comfortable limit. + maxLogsPerSpanExternal = 1000 ) // These constants are used to form keys to represent tracing context @@ -318,7 +321,7 @@ func (t *Tracer) startSpanGeneric( var netTr trace.Trace if t.useNetTrace() { netTr = trace.New("tracing", opName) - netTr.SetMaxEvents(maxLogsPerSpan) + netTr.SetMaxEvents(maxLogsPerSpanExternal) // If LogTags are given, pass them as tags to the shadow span. // Regular tags are populated later, via the top-level Span. diff --git a/pkg/util/tracing/tracer_test.go b/pkg/util/tracing/tracer_test.go index 48111cca771d..ac3d5960961b 100644 --- a/pkg/util/tracing/tracer_test.go +++ b/pkg/util/tracing/tracer_test.go @@ -351,7 +351,7 @@ func TestShadowTracer(t *testing.T) { Port: 65535, Plaintext: true, }, - MaxLogsPerSpan: maxLogsPerSpan, + MaxLogsPerSpan: maxLogsPerSpanExternal, UseGRPC: true, }), },