From 79fba1278743840dff6b84aa059ec58f7ec21a0a Mon Sep 17 00:00:00 2001
From: Pavol Loffay
Date: Tue, 15 Aug 2017 07:47:40 +0200
Subject: [PATCH] Add/fix sanitizer for Zipkin span start time and duration
(#333)
---
.../app/builder/span_handler_builder.go | 1 +
.../app/sanitizer/zipkin/span_sanitizer.go | 50 +++++++++++++++++-
.../sanitizer/zipkin/span_sanitizer_test.go | 52 +++++++++++++++++++
cmd/collector/app/span_handler.go | 3 +-
4 files changed, 103 insertions(+), 3 deletions(-)
diff --git a/cmd/collector/app/builder/span_handler_builder.go b/cmd/collector/app/builder/span_handler_builder.go
index 740158f2b1b..9afa5d197b8 100644
--- a/cmd/collector/app/builder/span_handler_builder.go
+++ b/cmd/collector/app/builder/span_handler_builder.go
@@ -126,6 +126,7 @@ func (spanHb *SpanHandlerBuilder) BuildHandlers() (app.ZipkinSpansHandler, app.J
zSanitizer := zs.NewChainedSanitizer(
zs.NewSpanDurationSanitizer(spanHb.logger),
+ zs.NewSpanStartTimeSanitizer(),
zs.NewParentIDSanitizer(spanHb.logger),
zs.NewErrorTagSanitizer(),
)
diff --git a/cmd/collector/app/sanitizer/zipkin/span_sanitizer.go b/cmd/collector/app/sanitizer/zipkin/span_sanitizer.go
index 8c05f635b65..30c1f44ad29 100644
--- a/cmd/collector/app/sanitizer/zipkin/span_sanitizer.go
+++ b/cmd/collector/app/sanitizer/zipkin/span_sanitizer.go
@@ -82,9 +82,29 @@ type spanDurationSanitizer struct {
func (s *spanDurationSanitizer) Sanitize(span *zc.Span) *zc.Span {
if span.Duration == nil {
- span.Duration = &defaultDuration
+ duration := defaultDuration
+ if len(span.Annotations) >= 2 {
+ // Prefer RPC one-way (cs -> sr) vs arbitrary annotations.
+ first := span.Annotations[0].Timestamp
+ last := span.Annotations[len(span.Annotations)-1].Timestamp
+ for _, anno := range span.Annotations {
+ if anno.Value == zc.CLIENT_SEND {
+ first = anno.Timestamp
+ } else if anno.Value == zc.CLIENT_RECV {
+ last = anno.Timestamp
+ }
+ }
+ if first < last {
+ duration = last - first
+ if span.Timestamp == nil {
+ span.Timestamp = &first
+ }
+ }
+ }
+ span.Duration = &duration
return span
}
+
duration := *span.Duration
if duration >= 0 {
return span
@@ -99,6 +119,34 @@ func (s *spanDurationSanitizer) Sanitize(span *zc.Span) *zc.Span {
return span
}
+// NewSpanStartTimeSanitizer returns a Sanitizer that changes span start time if is nil
+// If there is zipkincore.CLIENT_SEND use that, if no fall back on zipkincore.SERVER_RECV
+func NewSpanStartTimeSanitizer() Sanitizer {
+ return &spanStartTimeSanitizer{}
+}
+
+type spanStartTimeSanitizer struct {
+}
+
+func (s *spanStartTimeSanitizer) Sanitize(span *zc.Span) *zc.Span {
+ if span.Timestamp != nil || len(span.Annotations) == 0 {
+ return span
+ }
+
+ for _, anno := range span.Annotations {
+ if anno.Value == zc.CLIENT_SEND {
+ span.Timestamp = &anno.Timestamp
+ return span
+ }
+ if anno.Value == zc.SERVER_RECV && span.ParentID == nil {
+ // continue, cs has higher precedence and might be after
+ span.Timestamp = &anno.Timestamp
+ }
+ }
+
+ return span
+}
+
// NewParentIDSanitizer returns a sanitizer that deals parentID == 0
// by replacing with nil, per Zipkin convention.
func NewParentIDSanitizer(logger *zap.Logger) Sanitizer {
diff --git a/cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go b/cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go
index 22ef65c3d7b..dcec52c19be 100644
--- a/cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go
+++ b/cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go
@@ -68,6 +68,26 @@ func TestSpanDurationSanitizer(t *testing.T) {
nilDurationSpan := &zipkincore.Span{}
actual = sanitizer.Sanitize(nilDurationSpan)
assert.Equal(t, int64(1), *actual.Duration)
+
+ span = &zipkincore.Span{
+ Annotations: []*zipkincore.Annotation{
+ {Value: zipkincore.CLIENT_SEND, Timestamp: 10},
+ {Value: zipkincore.CLIENT_RECV, Timestamp: 30},
+ },
+ }
+ actual = sanitizer.Sanitize(span)
+ assert.Equal(t, int64(20), *actual.Duration)
+
+ span = &zipkincore.Span{
+ Annotations: []*zipkincore.Annotation{
+ {Value: "first", Timestamp: 100},
+ {Value: zipkincore.CLIENT_SEND, Timestamp: 10},
+ {Value: zipkincore.CLIENT_RECV, Timestamp: 30},
+ {Value: "last", Timestamp: 300},
+ },
+ }
+ actual = sanitizer.Sanitize(span)
+ assert.Equal(t, int64(20), *actual.Duration)
}
func TestSpanParentIDSanitizer(t *testing.T) {
@@ -154,6 +174,38 @@ func TestSpanErrorSanitizer(t *testing.T) {
}
}
+func TestSpanStartTimeSanitizer(t *testing.T) {
+ sanitizer := NewSpanStartTimeSanitizer()
+
+ var helper int64 = 30
+ span := &zipkincore.Span{
+ Timestamp: &helper,
+ Annotations: []*zipkincore.Annotation{
+ {Value: zipkincore.CLIENT_SEND, Timestamp: 10},
+ {Value: zipkincore.SERVER_RECV, Timestamp: 20},
+ },
+ }
+ sanitized := sanitizer.Sanitize(span)
+ assert.Equal(t, int64(30), *sanitized.Timestamp)
+
+ span = &zipkincore.Span{
+ Annotations: []*zipkincore.Annotation{
+ {Value: zipkincore.CLIENT_SEND, Timestamp: 10},
+ {Value: zipkincore.SERVER_RECV, Timestamp: 20},
+ },
+ }
+ sanitized = sanitizer.Sanitize(span)
+ assert.Equal(t, int64(10), *sanitized.Timestamp)
+ span = &zipkincore.Span{
+ Annotations: []*zipkincore.Annotation{
+ {Value: zipkincore.SERVER_SEND, Timestamp: 10},
+ {Value: zipkincore.SERVER_RECV, Timestamp: 20},
+ },
+ }
+ sanitized = sanitizer.Sanitize(span)
+ assert.Equal(t, int64(20), *sanitized.Timestamp)
+}
+
func TestSpanLogger(t *testing.T) {
logger, log := testutils.NewLogger()
span := &zipkincore.Span{
diff --git a/cmd/collector/app/span_handler.go b/cmd/collector/app/span_handler.go
index d7bbd59af83..7f484824dc7 100644
--- a/cmd/collector/app/span_handler.go
+++ b/cmd/collector/app/span_handler.go
@@ -30,7 +30,6 @@ import (
"github.com/uber/jaeger/model/converter/thrift/zipkin"
"github.com/uber/jaeger/thrift-gen/jaeger"
"github.com/uber/jaeger/thrift-gen/zipkincore"
- zc "github.com/uber/jaeger/thrift-gen/zipkincore"
)
const (
@@ -45,7 +44,7 @@ const (
// ZipkinSpansHandler consumes and handles zipkin spans
type ZipkinSpansHandler interface {
// SubmitZipkinBatch records a batch of spans in Zipkin Thrift format
- SubmitZipkinBatch(ctx thrift.Context, spans []*zc.Span) ([]*zc.Response, error)
+ SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error)
}
// JaegerBatchesHandler consumes and handles Jaeger batches