Skip to content

Commit

Permalink
tracing: use byte-limits for recorded logs per span
Browse files Browse the repository at this point in the history
Touches #59188. We can introduce byte-limits for verbose logging
messages in a similar manner to what we've done for structured events.

This commit also:
- adds a _dropped tag to recordings with dropped logs/structured events.
- squashes a bug where reset spans (as used in SessionTracing) still
  held onto earlier structured events
- moves away from the internal usage of the opentracing.LogRecord type,
  which is unnecessary

Release justification: low risk, high benefit changes to existing
functionality

Release note: None
  • Loading branch information
irfansharif committed Mar 3, 2021
1 parent 9902059 commit 5b72a34
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 37 deletions.
83 changes: 52 additions & 31 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/logtags"
"github.com/gogo/protobuf/types"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
)

// crdbSpan is a span for internal crdb usage. This is used to power SQL session
Expand Down Expand Up @@ -56,13 +55,22 @@ type crdbSpanMu struct {
// duration is initialized to -1 and set on Finish().
duration time.Duration

// recording maintains state once StartRecording() is called.
recording struct {
// recordingType is the recording type of the ongoing recording, if any.
// Its 'load' method may be called without holding the surrounding mutex,
// but its 'swap' method requires the mutex.
recordingType atomicRecordingType
recordedLogs []opentracing.LogRecord

logBytes int64
logs ring.Buffer // of tracingpb.LogRecords

structuredBytes int64
structured ring.Buffer // of Structured events

// dropped is true if the span has capped out its memory limits for
// logs and structured events, and has had to drop some.
dropped bool

// children contains the list of child spans started after this Span
// started recording.
children []*crdbSpan
Expand All @@ -79,9 +87,6 @@ type crdbSpanMu struct {
// those that were set before recording started)?
tags opentracing.Tags

bytesStructured int64
structured ring.Buffer // of Structured events

// The Span's associated baggage.
baggage map[string]string
}
Expand Down Expand Up @@ -123,7 +128,12 @@ func (s *crdbSpan) resetRecording() {
s.mu.Lock()
defer s.mu.Unlock()

s.mu.recording.recordedLogs = nil
s.mu.recording.logs.Reset()
s.mu.recording.logBytes = 0
s.mu.recording.structured.Reset()
s.mu.recording.structuredBytes = 0
s.mu.recording.dropped = false

s.mu.recording.children = nil
s.mu.recording.remoteSpans = nil
}
Expand Down Expand Up @@ -210,30 +220,43 @@ func (s *crdbSpan) record(msg string) {
return
}

logRecord := tracingpb.LogRecord{
Time: time.Now(),
Fields: []tracingpb.LogRecord_Field{
{Key: tracingpb.LogMessageField, Value: msg},
},
}

s.mu.Lock()
defer s.mu.Unlock()
if len(s.mu.recording.recordedLogs) < maxLogsPerSpan {
s.mu.recording.recordedLogs = append(s.mu.recording.recordedLogs, opentracing.LogRecord{
Timestamp: time.Now(),
Fields: []otlog.Field{
otlog.String(tracingpb.LogMessageField, msg),
},
})

s.mu.recording.logBytes += int64(logRecord.Size())
if s.mu.recording.logBytes > maxLogBytesPerSpan {
s.mu.recording.dropped = true
}
for s.mu.recording.logBytes > maxLogBytesPerSpan {
first := s.mu.recording.logs.GetFirst().(tracingpb.LogRecord)
s.mu.recording.logs.RemoveFirst()
s.mu.recording.logBytes -= int64(first.Size())
}
s.mu.recording.logs.AddLast(logRecord)
}

func (s *crdbSpan) recordStructured(item Structured) {
s.mu.Lock()
defer s.mu.Unlock()

s.mu.bytesStructured += int64(item.Size())
for s.mu.bytesStructured > maxStructuredBytesPerSpan {
last := s.mu.structured.GetLast().(Structured)
s.mu.structured.RemoveLast()
s.mu.bytesStructured -= int64(last.Size())
s.mu.recording.structuredBytes += int64(item.Size())
if s.mu.recording.structuredBytes > maxStructuredBytesPerSpan {
s.mu.recording.dropped = true
}
for s.mu.recording.structuredBytes > maxStructuredBytesPerSpan {
last := s.mu.recording.structured.GetLast().(Structured)
s.mu.recording.structured.RemoveLast()
s.mu.recording.structuredBytes -= int64(last.Size())
}

s.mu.structured.AddFirst(item)
s.mu.recording.structured.AddFirst(item)
}

func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) {
Expand Down Expand Up @@ -299,12 +322,15 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {
if s.mu.recording.recordingType.load() == RecordingVerbose {
addTag("_verbose", "1")
}
if s.mu.recording.dropped {
addTag("_dropped", "1")
}
}

if numEvents := s.mu.structured.Len(); numEvents != 0 {
if numEvents := s.mu.recording.structured.Len(); numEvents != 0 {
rs.InternalStructured = make([]*types.Any, 0, numEvents)
for i := 0; i < numEvents; i++ {
event := s.mu.structured.Get(i).(Structured)
event := s.mu.recording.structured.Get(i).(Structured)
item, err := types.MarshalAny(event)
if err != nil {
// An error here is an error from Marshal; these
Expand Down Expand Up @@ -335,15 +361,10 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {
}
}

rs.Logs = make([]tracingpb.LogRecord, len(s.mu.recording.recordedLogs))
for i, r := range s.mu.recording.recordedLogs {
rs.Logs[i].Time = r.Timestamp
rs.Logs[i].Fields = make([]tracingpb.LogRecord_Field, len(r.Fields))
for j, f := range r.Fields {
rs.Logs[i].Fields[j] = tracingpb.LogRecord_Field{
Key: f.Key(),
Value: fmt.Sprint(f.Value()),
}
if numLogs := s.mu.recording.logs.Len(); numLogs != 0 {
rs.Logs = make([]tracingpb.LogRecord, numLogs)
for i := 0; i < numLogs; i++ {
rs.Logs[i] = s.mu.recording.logs.Get(i).(tracingpb.LogRecord)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/tracing/shadow.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func makeShadowSpan(
func createLightStepTracer(token string) (shadowTracerManager, opentracing.Tracer) {
return lightStepManager{}, lightstep.NewTracer(lightstep.Options{
AccessToken: token,
MaxLogsPerSpan: maxLogsPerSpan,
MaxLogsPerSpan: maxLogsPerSpanExternal,
MaxBufferedSpans: 10000,
UseGRPC: true,
})
Expand Down
129 changes: 128 additions & 1 deletion pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/gogo/protobuf/types"
Expand All @@ -35,7 +36,11 @@ func TestRecordingString(t *testing.T) {
root.SetVerbose(true)
root.Record("root 1")
// Hackily fix the timing on the first log message, so that we can check it later.
root.i.crdb.mu.recording.recordedLogs[0].Timestamp = root.i.crdb.startTime.Add(time.Millisecond)
r := root.i.crdb.mu.recording.logs.GetFirst().(tracingpb.LogRecord)
r.Time = root.i.crdb.startTime.Add(time.Millisecond)
root.i.crdb.mu.recording.logs.RemoveFirst()
root.i.crdb.mu.recording.logs.AddFirst(r)

// Sleep a bit so that everything that comes afterwards has higher timestamps
// than the one we just assigned. Otherwise the sorting will be screwed up.
time.Sleep(10 * time.Millisecond)
Expand Down Expand Up @@ -214,6 +219,8 @@ func TestSpanRecordStructured(t *testing.T) {
`))
}

// TestSpanRecordStructuredLimit tests recording behavior when the size of
// structured data recorded into the span exceeds the configured limit.
func TestSpanRecordStructuredLimit(t *testing.T) {
tr := NewTracer()
sp := tr.StartSpan("root", WithForceRealSpan())
Expand All @@ -231,6 +238,7 @@ func TestSpanRecordStructuredLimit(t *testing.T) {
rec := sp.GetRecording()
require.Len(t, rec, 1)
require.Len(t, rec[0].InternalStructured, numPayloads)
require.Equal(t, rec[0].Tags["_dropped"], "1")

first := rec[0].InternalStructured[0]
last := rec[0].InternalStructured[len(rec[0].InternalStructured)-1]
Expand All @@ -246,6 +254,125 @@ func TestSpanRecordStructuredLimit(t *testing.T) {
require.Equal(t, res, int32(offset+extra+1))
}

// TestSpanRecordLimit tests recording behavior when the amount of data logged
// into the span exceeds the configured limit. It checks that the recording
// retains only as many log records as fit under maxLogBytesPerSpan, that the
// retained records are the most recently recorded ones, and that the recording
// carries the _dropped tag.
func TestSpanRecordLimit(t *testing.T) {
	tr := NewTracer()
	sp := tr.StartSpan("root", WithForceRealSpan())
	defer sp.Finish()
	sp.SetVerbose(true)

	// Messages are zero-padded to six digits so that every message string used
	// in this test has the same length.
	msg := func(i int) string { return fmt.Sprintf("%06d", i) }

	// Determine the size of a log record by actually recording once, then
	// clear the recording so the probe message does not count below.
	sp.Record(msg(0))
	logSize := sp.GetRecording()[0].Logs[0].Size()
	sp.ResetRecording()

	// numLogs is how many records of logSize fit under the byte limit; record
	// `extra` more than that to force drops.
	numLogs := maxLogBytesPerSpan / logSize
	const extra = 10
	for i := 1; i <= numLogs+extra; i++ {
		sp.Record(msg(i))
	}

	rec := sp.GetRecording()
	require.Len(t, rec, 1)
	require.Len(t, rec[0].Logs, numLogs)
	// require.Equal takes (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	require.Equal(t, "1", rec[0].Tags["_dropped"])

	first := rec[0].Logs[0]
	last := rec[0].Logs[len(rec[0].Logs)-1]

	// The first `extra` messages are expected to be gone; the newest survives.
	require.Equal(t, msg(extra+1), first.Fields[0].Value)
	require.Equal(t, msg(numLogs+extra), last.Fields[0].Value)
}

// testStructuredImpl is a Structured event used only by tests. It embeds a
// *types.Int32Value and carries no other state.
type testStructuredImpl struct {
	*types.Int32Value
}

// Compile-time check that *testStructuredImpl implements Structured.
var _ Structured = (*testStructuredImpl)(nil)

// String renders the embedded Int32Value's Value as "structured=<value>".
func (t *testStructuredImpl) String() string {
	val := t.Int32Value.Value
	return fmt.Sprintf("structured=%d", val)
}

// newTestStructured returns a testStructuredImpl whose embedded Int32Value
// holds i converted to int32.
func newTestStructured(i int) *testStructuredImpl {
	wrapped := &types.Int32Value{Value: int32(i)}
	return &testStructuredImpl{Int32Value: wrapped}
}

// TestSpanReset checks that resetting a span clears out existing recordings:
// both verbose log messages and structured events, as well as the _dropped
// tag that is set once the log byte limit has been exceeded.
func TestSpanReset(t *testing.T) {
	tr := NewTracer()
	sp := tr.StartSpan("root", WithForceRealSpan())
	defer sp.Finish()
	sp.SetVerbose(true)

	// Alternate between plain log messages (odd i) and structured events
	// (even i) so that both kinds are present before the reset.
	for i := 1; i <= 10; i++ {
		if i%2 == 0 {
			sp.RecordStructured(newTestStructured(i))
		} else {
			sp.Record(fmt.Sprintf("%d", i))
		}
	}

	require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), `
span: root
tags: _unfinished=1 _verbose=1
event: 1
event: structured=2
event: 3
event: structured=4
event: 5
event: structured=6
event: 7
event: structured=8
event: 9
event: structured=10
`))
	require.NoError(t, TestingCheckRecording(sp.GetRecording(), `
=== operation:root _unfinished:1 _verbose:1
event:1
event:structured=2
event:3
event:structured=4
event:5
event:structured=6
event:7
event:structured=8
event:9
event:structured=10
`))

	sp.ResetRecording()

	// After the reset no events of either kind should remain.
	require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), `
span: root
tags: _unfinished=1 _verbose=1
`))
	require.NoError(t, TestingCheckRecording(sp.GetRecording(), `
=== operation:root _unfinished:1 _verbose:1
`))

	// Overflow the log byte limit so that the _dropped tag gets set, then
	// verify that a reset clears it again.
	msg := func(i int) string { return fmt.Sprintf("%06d", i) }
	sp.Record(msg(0))
	logSize := sp.GetRecording()[0].Logs[0].Size()
	numLogs := maxLogBytesPerSpan / logSize
	const extra = 10

	for i := 1; i <= numLogs+extra; i++ {
		sp.Record(msg(i))
	}

	// require.Equal takes (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	require.Equal(t, "1", sp.GetRecording()[0].Tags["_dropped"])
	sp.ResetRecording()
	_, found := sp.GetRecording()[0].Tags["_dropped"]
	require.False(t, found)
}

func TestNonVerboseChildSpanRegisteredWithParent(t *testing.T) {
tr := NewTracer()
sp := tr.StartSpan("root", WithForceRealSpan())
Expand Down
9 changes: 6 additions & 3 deletions pkg/util/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@ import (
const verboseTracingBaggageKey = "sb"

const (
// maxLogsPerSpan limits the number of logs in a Span; use a comfortable
// maxLogBytesPerSpan limits the size of logs in a span; use a comfortable
// limit.
maxLogsPerSpan = 1000
maxLogBytesPerSpan = 10 * (1 << 10) // 10 KiB
// maxStructuredBytesPerSpan limits the size of structured events in a span;
// use a comfortable limit.
maxStructuredBytesPerSpan = 1 << 10 // 1 KiB
// maxChildrenPerSpan limits the number of (direct) child spans in a Span.
maxChildrenPerSpan = 1000
// maxLogsPerSpanExternal limits the number of logs in a Span for external
// tracers (net/trace, lightstep); use a comfortable limit.
maxLogsPerSpanExternal = 1000
)

// These constants are used to form keys to represent tracing context
Expand Down Expand Up @@ -318,7 +321,7 @@ func (t *Tracer) startSpanGeneric(
var netTr trace.Trace
if t.useNetTrace() {
netTr = trace.New("tracing", opName)
netTr.SetMaxEvents(maxLogsPerSpan)
netTr.SetMaxEvents(maxLogsPerSpanExternal)

// If LogTags are given, pass them as tags to the shadow span.
// Regular tags are populated later, via the top-level Span.
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/tracing/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestShadowTracer(t *testing.T) {
Port: 65535,
Plaintext: true,
},
MaxLogsPerSpan: maxLogsPerSpan,
MaxLogsPerSpan: maxLogsPerSpanExternal,
UseGRPC: true,
}),
},
Expand Down

0 comments on commit 5b72a34

Please sign in to comment.