Skip to content

Commit

Permalink
Fix "index out of range" when receiving a dual client/server Zipkin s…
Browse files Browse the repository at this point in the history
…pan (#4160)

## Which problem is this PR solving?
- Resolves #3404 

## Short description of the changes
- Handle case where one input zipkin span is transformed into two Jaeger
spans, but still return response with the same length as input
- Add a test for dial client/server span provided in #3404 
- Side effect: refactor `cmd/collector/app/zipkin` to avoid circular
dependencies

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Jan 15, 2023
1 parent f99a590 commit e2d0531
Show file tree
Hide file tree
Showing 22 changed files with 377 additions and 140 deletions.
53 changes: 53 additions & 0 deletions cmd/collector/app/handler/testdata/zipkin_v1_merged_spans.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
[
{
"traceId": "4ed11df465275600",
"id": "4ed11df465275600",
"name": "get /",
"timestamp": 1633073248674949,
"duration": 14861,
"localEndpoint": {
"serviceName": "first_service",
"ipv4": "10.0.2.15"
},
"annotations": [
{
"timestamp": 1633073248674949,
"value": "sr"
},
{
"timestamp": 163307324868981,
"value": "ss"
}
]
},
{
"traceId": "4ed11df465275600",
"parentId": "4ed11df465275600",
"id": "c943743e25dc2cdf",
"name": "get /api",
"timestamp": 1633073248678309,
"duration": 3360,
"localEndpoint": {
"serviceName": "first_service",
"ipv4": "10.0.2.15"
},
"annotations": [
{
"timestamp": 1633073248678309,
"value": "cs"
},
{
"timestamp": 1633073248681669,
"value": "sr"
},
{
"timestamp": 1633073248685029,
"value": "cr"
},
{
"timestamp": 1633073248688388,
"value": "ss"
}
]
}
]
24 changes: 19 additions & 5 deletions cmd/collector/app/handler/thrift_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,13 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler processor.SpanProcess
// SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format.
func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) {
mSpans := make([]*model.Span, 0, len(spans))
for _, span := range spans {
convCount := make([]int, len(spans))
for i, span := range spans {
sanitized := h.sanitizer.Sanitize(span)
mSpans = append(mSpans, convertZipkinToModel(sanitized, h.logger)...)
// conversion may return more than one span, e.g. when the input Zipkin span represents both client & server spans
converted := convertZipkinToModel(sanitized, h.logger)
convCount[i] = len(converted)
mSpans = append(mSpans, converted...)
}
bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
Expand All @@ -121,13 +125,23 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options
return nil, err
}
responses := make([]*zipkincore.Response, len(spans))
for i, ok := range bools {
// at this point we may have len(spans) < len(bools) if conversion results in more spans
b := 0 // index through bools which we advance by convCount[i] for each iteration
for i := range spans {
res := zipkincore.NewResponse()
res.Ok = ok
res.Ok = true
for j := 0; j < convCount[i]; j++ {
res.Ok = res.Ok && bools[b]
b++
}
responses[i] = res
}

h.logger.Debug("Zipkin span batch processed by the collector.", zap.Int("span-count", len(spans)))
h.logger.Debug(
"Zipkin span batch processed by the collector.",
zap.Int("received-span-count", len(spans)),
zap.Int("processed-span-count", len(mSpans)),
)
return responses, nil
}

Expand Down
65 changes: 47 additions & 18 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package handler

import (
"errors"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/cmd/collector/app/zipkin/zipkindeser"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
Expand Down Expand Up @@ -82,31 +85,57 @@ func (s *shouldIErrorProcessor) Close() error {
}

func TestZipkinSpanHandler(t *testing.T) {
testChunks := []struct {
tests := []struct {
name string
expectedErr error
filename string
}{
{
name: "good case",
expectedErr: nil,
},
{
name: "bad case",
expectedErr: errTestError,
},
{
name: "dual client-server span",
expectedErr: nil,
filename: "testdata/zipkin_v1_merged_spans.json",
},
}
for _, tc := range testChunks {
logger := zap.NewNop()
h := NewZipkinSpanHandler(logger, &shouldIErrorProcessor{tc.expectedErr != nil}, zipkin.NewParentIDSanitizer())
res, err := h.SubmitZipkinBatch([]*zipkincore.Span{
{
ID: 12345,
},
}, SubmitBatchOptions{})
if tc.expectedErr != nil {
assert.Nil(t, res)
assert.Equal(t, tc.expectedErr, err)
} else {
assert.Len(t, res, 1)
assert.NoError(t, err)
assert.True(t, res[0].Ok)
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
logger := zap.NewNop()
h := NewZipkinSpanHandler(
logger,
&shouldIErrorProcessor{tc.expectedErr != nil},
zipkinsanitizer.NewChainedSanitizer(zipkinsanitizer.NewStandardSanitizers()...),
)
var spans []*zipkincore.Span
if tc.filename != "" {
data, err := os.ReadFile(tc.filename)
require.NoError(t, err)
spans, err = zipkindeser.DeserializeJSON(data)
require.NoError(t, err)
} else {
spans = []*zipkincore.Span{
{
ID: 12345,
},
}
}
res, err := h.SubmitZipkinBatch(spans, SubmitBatchOptions{})
if tc.expectedErr != nil {
assert.Nil(t, res)
assert.Equal(t, tc.expectedErr, err)
} else {
assert.Len(t, res, len(spans))
assert.NoError(t, err)
for i := range res {
assert.True(t, res[i].Ok)
}
}
})
}
}
5 changes: 3 additions & 2 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
zipkinSanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -100,7 +100,8 @@ func TestBySvcMetrics(t *testing.T) {
switch test.format {
case processor.ZipkinSpanFormat:
span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug)
zHandler := handler.NewZipkinSpanHandler(logger, sp, zipkinSanitizer.NewParentIDSanitizer())
sanitizer := zipkinsanitizer.NewChainedSanitizer(zipkinsanitizer.NewStandardSanitizers()...)
zHandler := handler.NewZipkinSpanHandler(logger, sp, sanitizer)
zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, handler.SubmitBatchOptions{})
metricPrefix = "service"
format = "zipkin"
Expand Down
7 changes: 4 additions & 3 deletions cmd/collector/app/zipkin/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/zipkin/zipkindeser"
"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
zipkinProto "github.com/jaegertracing/jaeger/proto-gen/zipkin"
"github.com/jaegertracing/jaeger/swagger-gen/models"
Expand Down Expand Up @@ -93,7 +94,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
case "application/x-thrift":
tSpans, err = zipkin.DeserializeThrift(bodyBytes)
case "application/json":
tSpans, err = DeserializeJSON(bodyBytes)
tSpans, err = zipkindeser.DeserializeJSON(bodyBytes)
default:
http.Error(w, "Unsupported Content-Type", http.StatusBadRequest)
return
Expand Down Expand Up @@ -170,7 +171,7 @@ func jsonToThriftSpansV2(bodyBytes []byte, zipkinV2Formats strfmt.Registry) ([]*
return nil, err
}

tSpans, err := spansV2ToThrift(spans)
tSpans, err := zipkindeser.SpansV2ToThrift(spans)
if err != nil {
return nil, err
}
Expand All @@ -183,7 +184,7 @@ func protoToThriftSpansV2(bodyBytes []byte) ([]*zipkincore.Span, error) {
return nil, err
}

tSpans, err := protoSpansV2ToThrift(&spans)
tSpans, err := zipkindeser.ProtoSpansV2ToThrift(&spans)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit e2d0531

Please sign in to comment.