Skip to content

Commit

Permalink
Add/fix sanitizer for Zipkin span start time and duration (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavolloffay authored and yurishkuro committed Aug 15, 2017
1 parent db9f152 commit 79fba12
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 3 deletions.
1 change: 1 addition & 0 deletions cmd/collector/app/builder/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (spanHb *SpanHandlerBuilder) BuildHandlers() (app.ZipkinSpansHandler, app.J

zSanitizer := zs.NewChainedSanitizer(
zs.NewSpanDurationSanitizer(spanHb.logger),
zs.NewSpanStartTimeSanitizer(),
zs.NewParentIDSanitizer(spanHb.logger),
zs.NewErrorTagSanitizer(),
)
Expand Down
50 changes: 49 additions & 1 deletion cmd/collector/app/sanitizer/zipkin/span_sanitizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,29 @@ type spanDurationSanitizer struct {

func (s *spanDurationSanitizer) Sanitize(span *zc.Span) *zc.Span {
if span.Duration == nil {
span.Duration = &defaultDuration
duration := defaultDuration
if len(span.Annotations) >= 2 {
// Prefer RPC one-way (cs -> sr) vs arbitrary annotations.
first := span.Annotations[0].Timestamp
last := span.Annotations[len(span.Annotations)-1].Timestamp
for _, anno := range span.Annotations {
if anno.Value == zc.CLIENT_SEND {
first = anno.Timestamp
} else if anno.Value == zc.CLIENT_RECV {
last = anno.Timestamp
}
}
if first < last {
duration = last - first
if span.Timestamp == nil {
span.Timestamp = &first
}
}
}
span.Duration = &duration
return span
}

duration := *span.Duration
if duration >= 0 {
return span
Expand All @@ -99,6 +119,34 @@ func (s *spanDurationSanitizer) Sanitize(span *zc.Span) *zc.Span {
return span
}

// NewSpanStartTimeSanitizer returns a Sanitizer that changes span start time if is nil
// If there is zipkincore.CLIENT_SEND use that, if no fall back on zipkincore.SERVER_RECV
func NewSpanStartTimeSanitizer() Sanitizer {
return &spanStartTimeSanitizer{}
}

type spanStartTimeSanitizer struct {
}

func (s *spanStartTimeSanitizer) Sanitize(span *zc.Span) *zc.Span {
if span.Timestamp != nil || len(span.Annotations) == 0 {
return span
}

for _, anno := range span.Annotations {
if anno.Value == zc.CLIENT_SEND {
span.Timestamp = &anno.Timestamp
return span
}
if anno.Value == zc.SERVER_RECV && span.ParentID == nil {
// continue, cs has higher precedence and might be after
span.Timestamp = &anno.Timestamp
}
}

return span
}

// NewParentIDSanitizer returns a sanitizer that deals parentID == 0
// by replacing with nil, per Zipkin convention.
func NewParentIDSanitizer(logger *zap.Logger) Sanitizer {
Expand Down
52 changes: 52 additions & 0 deletions cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,26 @@ func TestSpanDurationSanitizer(t *testing.T) {
nilDurationSpan := &zipkincore.Span{}
actual = sanitizer.Sanitize(nilDurationSpan)
assert.Equal(t, int64(1), *actual.Duration)

span = &zipkincore.Span{
Annotations: []*zipkincore.Annotation{
{Value: zipkincore.CLIENT_SEND, Timestamp: 10},
{Value: zipkincore.CLIENT_RECV, Timestamp: 30},
},
}
actual = sanitizer.Sanitize(span)
assert.Equal(t, int64(20), *actual.Duration)

span = &zipkincore.Span{
Annotations: []*zipkincore.Annotation{
{Value: "first", Timestamp: 100},
{Value: zipkincore.CLIENT_SEND, Timestamp: 10},
{Value: zipkincore.CLIENT_RECV, Timestamp: 30},
{Value: "last", Timestamp: 300},
},
}
actual = sanitizer.Sanitize(span)
assert.Equal(t, int64(20), *actual.Duration)
}

func TestSpanParentIDSanitizer(t *testing.T) {
Expand Down Expand Up @@ -154,6 +174,38 @@ func TestSpanErrorSanitizer(t *testing.T) {
}
}

func TestSpanStartTimeSanitizer(t *testing.T) {
sanitizer := NewSpanStartTimeSanitizer()

var helper int64 = 30
span := &zipkincore.Span{
Timestamp: &helper,
Annotations: []*zipkincore.Annotation{
{Value: zipkincore.CLIENT_SEND, Timestamp: 10},
{Value: zipkincore.SERVER_RECV, Timestamp: 20},
},
}
sanitized := sanitizer.Sanitize(span)
assert.Equal(t, int64(30), *sanitized.Timestamp)

span = &zipkincore.Span{
Annotations: []*zipkincore.Annotation{
{Value: zipkincore.CLIENT_SEND, Timestamp: 10},
{Value: zipkincore.SERVER_RECV, Timestamp: 20},
},
}
sanitized = sanitizer.Sanitize(span)
assert.Equal(t, int64(10), *sanitized.Timestamp)
span = &zipkincore.Span{
Annotations: []*zipkincore.Annotation{
{Value: zipkincore.SERVER_SEND, Timestamp: 10},
{Value: zipkincore.SERVER_RECV, Timestamp: 20},
},
}
sanitized = sanitizer.Sanitize(span)
assert.Equal(t, int64(20), *sanitized.Timestamp)
}

func TestSpanLogger(t *testing.T) {
logger, log := testutils.NewLogger()
span := &zipkincore.Span{
Expand Down
3 changes: 1 addition & 2 deletions cmd/collector/app/span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/uber/jaeger/model/converter/thrift/zipkin"
"github.com/uber/jaeger/thrift-gen/jaeger"
"github.com/uber/jaeger/thrift-gen/zipkincore"
zc "github.com/uber/jaeger/thrift-gen/zipkincore"
)

const (
Expand All @@ -45,7 +44,7 @@ const (
// ZipkinSpansHandler consumes and handles zipkin spans
type ZipkinSpansHandler interface {
// SubmitZipkinBatch records a batch of spans in Zipkin Thrift format
SubmitZipkinBatch(ctx thrift.Context, spans []*zc.Span) ([]*zc.Response, error)
SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error)
}

// JaegerBatchesHandler consumes and handles Jaeger batches
Expand Down

0 comments on commit 79fba12

Please sign in to comment.