Skip to content

Commit

Permalink
Add inbound transport as label to collector metrics (#1446)
Browse files Browse the repository at this point in the history
* adding endpoint metrics

Signed-off-by: Jude Wang <[email protected]>

* fix linter issue

Signed-off-by: Jude Wang <[email protected]>

* refactor span_handler to emit endpoint metrics

Signed-off-by: Jude Wang <[email protected]>

* minor update

Signed-off-by: Jude Wang <[email protected]>

* replace string

Signed-off-by: Jude Wang <[email protected]>

* default transport type is undefined

Signed-off-by: Jude Wang <[email protected]>

* fix merge conflict and test

Signed-off-by: Jude Wang <[email protected]>

* refactor

Signed-off-by: Jude Wang <[email protected]>

* lint & fmt

Signed-off-by: Jude Wang <[email protected]>

* update switch statement to be map due to twice faster benchmark

Signed-off-by: Jude Wang <[email protected]>

* Use typed constants, rename types

Signed-off-by: Yuri Shkuro <[email protected]>

* Clean-up comments

Signed-off-by: Yuri Shkuro <[email protected]>

* Remove copy-pasta

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
guanw authored and yurishkuro committed Apr 6, 2019
1 parent 8b2a2c1 commit dbb6ea7
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 73 deletions.
4 changes: 2 additions & 2 deletions cmd/collector/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{
InboundTransport: "grpc", // TODO do we have a constant?
SpanFormat: JaegerFormatType,
InboundTransport: GRPCTransport,
SpanFormat: ProtoSpanFormat,
})
if err != nil {
g.logger.Error("cannot process spans", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) {
return
}
batches := []*tJaeger.Batch{batch}
opts := SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant?
opts := SubmitBatchOptions{InboundTransport: HTTPTransport}
if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
return
Expand Down
94 changes: 74 additions & 20 deletions cmd/collector/app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
)

const (
maxServiceNames = 2000
otherServices = "other-services"
// TODO this needs to be configurable via CLI.
maxServiceNames = 4000

// otherServices is the catch-all label when number of services exceeds maxServiceNames
otherServices = "other-services"
)

// SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor
Expand All @@ -44,7 +47,7 @@ type SpanProcessorMetrics struct {
SavedOkBySvc metricsBySvc // spans actually saved
SavedErrBySvc metricsBySvc // spans failed to save
serviceNames metrics.Gauge // total number of unique service name metrics reported by this collector
spanCounts map[string]CountsBySpanType
spanCounts SpanCountsByFormat
}

type countsBySvc struct {
Expand All @@ -61,23 +64,58 @@ type metricsBySvc struct {
traces countsBySvc // number of traces originated per service
}

// CountsBySpanType measures received, rejected, and receivedByService metrics for a format type
type CountsBySpanType struct {
// ReceivedBySvc maintain by-service metrics for a format type
// InboundTransport identifies the transport used to receive spans.
type InboundTransport string

const (
// GRPCTransport indicates spans received over gRPC.
GRPCTransport InboundTransport = "grpc"
// TChannelTransport indicates spans received over TChannel.
TChannelTransport InboundTransport = "tchannel"
// HTTPTransport indicates spans received over HTTP.
HTTPTransport InboundTransport = "http"
// UnknownTransport is the fallback/catch-all category.
UnknownTransport InboundTransport = "unknown"
)

// SpanFormat identifies the data format in which the span was originally received.
type SpanFormat string

const (
// JaegerSpanFormat is for Jaeger Thrift spans.
JaegerSpanFormat SpanFormat = "jaeger"
// ZipkinSpanFormat is for Zipkin Thrift spans.
ZipkinSpanFormat SpanFormat = "zipkin"
// ProtoSpanFormat is for Jaeger protobuf Spans.
ProtoSpanFormat SpanFormat = "proto"
// UnknownSpanFormat is the fallback/catch-all category.
UnknownSpanFormat SpanFormat = "unknown"
)

// SpanCountsByFormat groups metrics by different span formats (thrift, proto, etc.)
type SpanCountsByFormat map[SpanFormat]SpanCountsByTransport

// SpanCountsByTransport groups metrics by inbound transport (e.g. http, grpc, tchannel)
type SpanCountsByTransport map[InboundTransport]SpanCounts

// SpanCounts contains counters for received and rejected spans.
type SpanCounts struct {
// ReceivedBySvc maintain by-service metrics.
ReceivedBySvc metricsBySvc
// RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service
// RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service.
RejectedBySvc metricsBySvc
}

// NewSpanProcessorMetrics returns a SpanProcessorMetrics
func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []string) *SpanProcessorMetrics {
spanCounts := map[string]CountsBySpanType{
ZipkinFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": ZipkinFormatType}})),
JaegerFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": JaegerFormatType}})),
UnknownFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": UnknownFormatType}})),
func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []SpanFormat) *SpanProcessorMetrics {
spanCounts := SpanCountsByFormat{
ZipkinSpanFormat: newCountsByTransport(serviceMetrics, ZipkinSpanFormat),
JaegerSpanFormat: newCountsByTransport(serviceMetrics, JaegerSpanFormat),
ProtoSpanFormat: newCountsByTransport(serviceMetrics, ProtoSpanFormat),
UnknownSpanFormat: newCountsByTransport(serviceMetrics, UnknownSpanFormat),
}
for _, otherFormatType := range otherFormatTypes {
spanCounts[otherFormatType] = newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": otherFormatType}}))
spanCounts[otherFormatType] = newCountsByTransport(serviceMetrics, otherFormatType)
}
m := &SpanProcessorMetrics{
SaveLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "save-latency", Tags: nil}),
Expand Down Expand Up @@ -118,20 +156,35 @@ func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames in
}
}

func newCountsBySpanType(factory metrics.Factory) CountsBySpanType {
return CountsBySpanType{
// newCountsByTransport builds the per-transport span counters for one span
// format. The factory is first namespaced with a "format" tag so that every
// metric created underneath carries the originating span format.
func newCountsByTransport(factory metrics.Factory, format SpanFormat) SpanCountsByTransport {
	formatTags := map[string]string{"format": string(format)}
	scoped := factory.Namespace(metrics.NSOptions{Tags: formatTags})

	// One entry per known transport, plus the unknown catch-all.
	transports := []InboundTransport{HTTPTransport, TChannelTransport, GRPCTransport, UnknownTransport}
	byTransport := make(SpanCountsByTransport, len(transports))
	for _, transport := range transports {
		byTransport[transport] = newCounts(scoped, transport)
	}
	return byTransport
}

// newCounts creates the rejected and received by-service metrics for a single
// inbound transport, using a factory namespaced with a "transport" tag.
func newCounts(factory metrics.Factory, transport InboundTransport) SpanCounts {
	transportTags := map[string]string{"transport": string(transport)}
	scoped := factory.Namespace(metrics.NSOptions{Tags: transportTags})

	var counts SpanCounts
	counts.RejectedBySvc = newMetricsBySvc(scoped, "rejected")
	counts.ReceivedBySvc = newMetricsBySvc(scoped, "received")
	return counts
}

// GetCountsForFormat gets the countsBySpanType for a given format. If none exists, we use the Unknown format.
func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat string) CountsBySpanType {
// GetCountsForFormat returns the SpanCounts for a given format and transport. An unregistered format
// falls back to UnknownSpanFormat, and an unregistered transport falls back to UnknownTransport.
func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat SpanFormat, transport InboundTransport) SpanCounts {
c, ok := m.spanCounts[spanFormat]
if !ok {
return m.spanCounts[UnknownFormatType]
c = m.spanCounts[UnknownSpanFormat]
}
t, ok := c[transport]
if !ok {
t = c[UnknownTransport]
}
return c
return t
}

// reportServiceNameForSpan determines the name of the service that emitted
Expand Down Expand Up @@ -183,7 +236,8 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) {
if isDebug {
debugStr = "true"
}
c := m.factory.Counter(metrics.Options{Name: m.category, Tags: map[string]string{"svc": serviceName, "debug": debugStr}})
tags := map[string]string{"svc": serviceName, "debug": debugStr}
c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags})
counts[serviceName] = c
counter = c
} else {
Expand Down
29 changes: 16 additions & 13 deletions cmd/collector/app/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,33 @@ func TestProcessorMetrics(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
serviceMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "service", Tags: nil})
hostMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "host", Tags: nil})
spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []string{"scruffy"})
benderFormatMetrics := spm.GetCountsForFormat("bender")
assert.NotNil(t, benderFormatMetrics)
jFormat := spm.GetCountsForFormat(JaegerFormatType)
assert.NotNil(t, jFormat)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{
spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []SpanFormat{SpanFormat("scruffy")})
benderFormatHTTPMetrics := spm.GetCountsForFormat("bender", HTTPTransport)
assert.NotNil(t, benderFormatHTTPMetrics)
benderFormatGRPCMetrics := spm.GetCountsForFormat("bender", GRPCTransport)
assert.NotNil(t, benderFormatGRPCMetrics)

jTChannelFormat := spm.GetCountsForFormat(JaegerSpanFormat, TChannelTransport)
assert.NotNil(t, jTChannelFormat)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{
Process: &model.Process{},
})
mSpan := model.Span{
Process: &model.Process{
ServiceName: "fry",
},
}
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
mSpan.Flags.SetDebug()
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
mSpan.ReplaceParentID(1234)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
counters, gauges := baseMetrics.Backend.Snapshot()

assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry"])
assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry"])
assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport=tchannel"])
assert.Empty(t, gauges)
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type options struct {
blockingSubmit bool
queueSize int
reportBusy bool
extraFormatTypes []string
extraFormatTypes []SpanFormat
}

// Option is a function that sets some option on StorageBuilder.
Expand Down Expand Up @@ -128,7 +128,7 @@ func (options) ReportBusy(reportBusy bool) Option {
}

// ExtraFormatTypes creates an Option that initializes the extra list of format types
func (options) ExtraFormatTypes(extraFormatTypes []string) Option {
func (options) ExtraFormatTypes(extraFormatTypes []SpanFormat) Option {
return func(b *options) {
b.extraFormatTypes = extraFormatTypes
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func TestAllOptionSet(t *testing.T) {
types := []string{"sneh"}
types := []SpanFormat{SpanFormat("sneh")}
opts := Options.apply(
Options.ReportBusy(true),
Options.BlockingSubmit(true),
Expand Down
12 changes: 6 additions & 6 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package app
import (
"time"

"github.com/uber/tchannel-go"
tchannel "github.com/uber/tchannel-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
Expand All @@ -28,8 +28,8 @@ import (

// ProcessSpansOptions additional options passed to processor along with the spans.
type ProcessSpansOptions struct {
SpanFormat string
InboundTransport string
SpanFormat SpanFormat
InboundTransport InboundTransport
}

// SpanProcessor handles model spans
Expand Down Expand Up @@ -126,7 +126,7 @@ func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpans
sp.metrics.BatchSize.Update(int64(len(mSpans)))
retMe := make([]bool, len(mSpans))
for i, mSpan := range mSpans {
ok := sp.enqueueSpan(mSpan, options.SpanFormat)
ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport)
if !ok && sp.reportBusy {
return nil, tchannel.ErrServerBusy
}
Expand All @@ -140,8 +140,8 @@ func (sp *spanProcessor) processItemFromQueue(item *queueItem) {
sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime))
}

func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat string) bool {
spanCounts := sp.metrics.GetCountsForFormat(originalFormat)
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat SpanFormat, transport InboundTransport) bool {
spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport)
spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span)

if !sp.filterSpan(span) {
Expand Down
25 changes: 12 additions & 13 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func TestBySvcMetrics(t *testing.T) {
allowedService := "bender"

type TestCase struct {
format string
format SpanFormat
serviceName string
rootSpan bool
debug bool
}

spanFormat := [2]string{ZipkinFormatType, JaegerFormatType}
spanFormat := [2]SpanFormat{ZipkinSpanFormat, JaegerSpanFormat}
serviceNames := [2]string{allowedService, blackListedService}
rootSpanEnabled := [2]bool{true, false}
debugEnabled := [2]bool{true, false}
Expand Down Expand Up @@ -83,13 +83,13 @@ func TestBySvcMetrics(t *testing.T) {
)
var metricPrefix, format string
switch test.format {
case ZipkinFormatType:
case ZipkinSpanFormat:
span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug)
zHandler := NewZipkinSpanHandler(logger, processor, zipkinSanitizer.NewParentIDSanitizer())
zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, SubmitBatchOptions{})
metricPrefix = "service"
format = "zipkin"
case JaegerFormatType:
case JaegerSpanFormat:
span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug)
jHandler := NewJaegerSpanHandler(logger, processor)
jHandler.SubmitBatches([]*jaeger.Batch{
Expand All @@ -109,21 +109,21 @@ func TestBySvcMetrics(t *testing.T) {
expected := []metricstest.ExpectedMetric{}
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
}
if test.rootSpan {
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
}
}
Expand All @@ -137,7 +137,7 @@ func TestBySvcMetrics(t *testing.T) {
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
}
mb.AssertCounterMetrics(t, expected...)
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestSpanProcessor(t *testing.T) {
ServiceName: "x",
},
},
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})
}, ProcessSpansOptions{SpanFormat: JaegerSpanFormat})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
}
Expand All @@ -236,8 +236,7 @@ func TestSpanProcessorErrors(t *testing.T) {
ServiceName: "x",
},
},
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})

}, ProcessSpansOptions{SpanFormat: JaegerSpanFormat})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)

Expand Down Expand Up @@ -295,7 +294,7 @@ func TestSpanProcessorBusy(t *testing.T) {
ServiceName: "x",
},
},
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})
}, ProcessSpansOptions{SpanFormat: JaegerSpanFormat})

assert.Error(t, err, "expcting busy error")
assert.Nil(t, res)
Expand Down
8 changes: 6 additions & 2 deletions cmd/collector/app/tchannel_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ func (h *TChannelHandler) SubmitZipkinBatch(
_ thrift.Context,
spans []*zipkincore.Span,
) ([]*zipkincore.Response, error) {
return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"})
return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{
InboundTransport: TChannelTransport,
})
}

// SubmitBatches implements jaeger.TChanCollector.
func (h *TChannelHandler) SubmitBatches(
_ thrift.Context,
batches []*jaeger.Batch,
) ([]*jaeger.BatchSubmitResponse, error) {
return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: "tchannel"})
return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{
InboundTransport: TChannelTransport,
})
}
Loading

0 comments on commit dbb6ea7

Please sign in to comment.