Skip to content

Commit

Permalink
rename attr vars, set requestSize
Browse files Browse the repository at this point in the history
  • Loading branch information
safeer committed Jul 29, 2024
1 parent 8cfd531 commit a9f5a54
Showing 1 changed file with 25 additions and 25 deletions.
50 changes: 25 additions & 25 deletions internal/rpc/otel_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
)

const (
otelFtlRequestKey = "ftl.requestKey"
otelFtlVerbRef = "ftl.verb.ref"
otelFtlVerbModule = "ftl.verb.module"
otelFtlRequestKeyAttr = attribute.Key("ftl.requestKey")
otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref")
otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module")
otelMessageEvent = "message"
otelMessageIDKey = attribute.Key("message.id")
otelMessageSizeKey = attribute.Key("message.uncompressed_size")
otelMessageTypeKey = attribute.Key("message.type")
otelMessageIDAttr = attribute.Key("message.id")
otelMessageSizeAttr = attribute.Key("message.uncompressed_size")
otelMessageTypeAttr = attribute.Key("message.type")
otelMessageTypeSent = "SENT"
otelMessageTypeReceived = "RECEIVED"
)
Expand Down Expand Up @@ -62,14 +62,14 @@ func getAttributes(ctx context.Context, rpcSystemKey string) []attribute.KeyValu
}
requestKey, err := RequestKeyFromContext(ctx)
if err != nil {
logger.Debugf("failed to get request key: %s", err)
logger.Warnf("failed to get request key: %s", err)
}
if key, ok := requestKey.Get(); ok {
attributes = append(attributes, attribute.String(otelFtlRequestKey, key.String()))
attributes = append(attributes, otelFtlRequestKeyAttr.String(key.String()))
}
if verb, ok := VerbFromContext(ctx); ok {
attributes = append(attributes, attribute.String(otelFtlVerbRef, verb.String()))
attributes = append(attributes, attribute.String(otelFtlVerbModule, verb.Module))
attributes = append(attributes, otelFtlVerbRefAttr.String(verb.String()))
attributes = append(attributes, otelFtlVerbModuleAttr.String(verb.Module))
}
return attributes
}
Expand All @@ -82,12 +82,12 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
name := strings.TrimLeft(request.Spec().Procedure, "/")

spanKind := trace.SpanKindClient
requestSpan := otelMessageTypeKey.String(otelMessageTypeSent)
responseSpan := otelMessageTypeKey.String(otelMessageTypeReceived)
requestSpan := otelMessageTypeAttr.String(otelMessageTypeSent)
responseSpan := otelMessageTypeAttr.String(otelMessageTypeReceived)
if !isClient {
spanKind = trace.SpanKindServer
requestSpan = otelMessageTypeKey.String(otelMessageTypeReceived)
responseSpan = otelMessageTypeKey.String(otelMessageTypeSent)
requestSpan = otelMessageTypeAttr.String(otelMessageTypeReceived)
responseSpan = otelMessageTypeAttr.String(otelMessageTypeSent)
}

attributes := getAttributes(ctx, request.Peer().Protocol)
Expand All @@ -99,7 +99,7 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
ctx, span := tracer.Start(ctx, name, traceOpts...)
defer span.End()

var requestSize int
requestSize := 0
if request != nil {
if msg, ok := request.Any().(proto.Message); ok {
requestSize = proto.Size(msg)
Expand All @@ -109,8 +109,8 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
span.AddEvent(otelMessageEvent,
trace.WithAttributes(
requestSpan,
otelMessageIDKey.Int(1),
otelMessageSizeKey.Int(requestSize),
otelMessageIDAttr.Int(1),
otelMessageSizeAttr.Int(requestSize),
),
)

Expand All @@ -125,8 +125,8 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
span.AddEvent(otelMessageEvent,
trace.WithAttributes(
responseSpan,
otelMessageIDKey.Int(1),
otelMessageSizeKey.Int(responseSize),
otelMessageIDAttr.Int(1),
otelMessageSizeAttr.Int(responseSize),
),
)
span.SetAttributes(attributes...)
Expand Down Expand Up @@ -342,12 +342,12 @@ func (s *streamingState) receive(ctx context.Context, msg any, conn streamingSen
s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err))
}
attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic
otelMessageTypeKey.String(otelMessageTypeReceived),
otelMessageIDKey.Int64(s.receivedCounter),
otelMessageTypeAttr.String(otelMessageTypeReceived),
otelMessageIDAttr.Int64(s.receivedCounter),
}...)
if protomsg, ok := msg.(proto.Message); ok {
size := proto.Size(protomsg)
attrs = append(attrs, otelMessageSizeKey.Int(size))
attrs = append(attrs, otelMessageSizeAttr.Int(size))
s.receiveSize.Record(ctx, int64(size), metric.WithAttributes(attrs...))
}

Expand All @@ -369,12 +369,12 @@ func (s *streamingState) send(ctx context.Context, msg any, conn streamingSender
s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err))
}
attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic
otelMessageTypeKey.String(otelMessageTypeSent),
otelMessageIDKey.Int64(s.sentCounter),
otelMessageTypeAttr.String(otelMessageTypeSent),
otelMessageIDAttr.Int64(s.sentCounter),
}...)
if protomsg, ok := msg.(proto.Message); ok {
size := proto.Size(protomsg)
attrs = append(attrs, otelMessageSizeKey.Int(size))
attrs = append(attrs, otelMessageSizeAttr.Int(size))
s.sendSize.Record(ctx, int64(size), metric.WithAttributes(attrs...))
}

Expand Down

0 comments on commit a9f5a54

Please sign in to comment.