Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve attribute handling #346

Merged
merged 10 commits into from
Aug 28, 2024
14 changes: 5 additions & 9 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ func (c *Consumer) convertLogRecord(
if body := record.Body(); body.Type() != pcommon.ValueTypeEmpty {
event.Message = body.AsString()
if body.Type() == pcommon.ValueTypeMap {
setLabels(body.Map(), event)
body.Map().Range(func(k string, v pcommon.Value) bool {
setLabel(replaceDots(k), event, v)
return true
})
}
}
if traceID := record.TraceID(); !traceID.IsEmpty() {
Expand Down Expand Up @@ -199,7 +202,7 @@ func (c *Consumer) convertLogRecord(
}
event.DataStream.Namespace = v.Str()
default:
setLabel(replaceDots(k), event, ifaceAttributeValue(v))
setLabel(replaceDots(k), event, v)
}
return true
})
Expand Down Expand Up @@ -236,10 +239,3 @@ func (c *Consumer) convertLogRecord(

return event
}

func setLabels(m pcommon.Map, event *modelpb.APMEvent) {
m.Range(func(k string, v pcommon.Value) bool {
setLabel(replaceDots(k), event, ifaceAttributeValue(v))
return true
})
}
124 changes: 59 additions & 65 deletions input/otlp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,15 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
if out.Host == nil {
out.Host = modelpb.HostFromVTPool()
}
out.Host.Ip = pSliceToType[*modelpb.IP](v.Slice(), func(v pcommon.Value) (*modelpb.IP, bool) {
ip, err := modelpb.ParseIP(v.Str())
if err != nil {
return nil, false
slice := v.Slice()
result := make([]*modelpb.IP, 0, slice.Len())
for i := 0; i < slice.Len(); i++ {
ip, err := modelpb.ParseIP(slice.At(i).Str())
if err == nil {
result = append(result, ip)
}
return ip, true
})
}
out.Host.Ip = result

// process.*
case semconv.AttributeProcessPID:
Expand Down Expand Up @@ -320,15 +322,14 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
out.DataStream = modelpb.DataStreamFromVTPool()
}
out.DataStream.Namespace = v.Str()

default:
if out.Labels == nil {
out.Labels = make(modelpb.Labels)
}
if out.NumericLabels == nil {
out.NumericLabels = make(modelpb.NumericLabels)
}
setLabel(replaceDots(k), out, ifaceAttributeValue(v))
setLabel(replaceDots(k), out, v)
}
return true
})
Expand Down Expand Up @@ -482,71 +483,64 @@ func cleanServiceName(name string) string {
return serviceNameInvalidRegexp.ReplaceAllString(truncate(name), "_")
}

func ifaceAttributeValue(v pcommon.Value) interface{} {
switch v.Type() {
case pcommon.ValueTypeStr:
return truncate(v.Str())
case pcommon.ValueTypeBool:
return strconv.FormatBool(v.Bool())
case pcommon.ValueTypeInt:
return float64(v.Int())
case pcommon.ValueTypeDouble:
return v.Double()
case pcommon.ValueTypeSlice:
return ifaceAttributeValueSlice(v.Slice())
}
return nil
}

func ifaceAttributeValueSlice(slice pcommon.Slice) []interface{} {
return pSliceToType[interface{}](slice, func(v pcommon.Value) (interface{}, bool) {
return ifaceAttributeValue(v), true
})
}

func pSliceToType[T any](slice pcommon.Slice, f func(pcommon.Value) (T, bool)) []T {
result := make([]T, 0, slice.Len())
for i := 0; i < slice.Len(); i++ {
if v, ok := f(slice.At(i)); ok {
result = append(result, v)
}
}
return result
}

// initEventLabels initializes an event-specific labels from an event.
func initEventLabels(e *modelpb.APMEvent) {
e.Labels = modelpb.Labels(e.Labels).Clone()
e.NumericLabels = modelpb.NumericLabels(e.NumericLabels).Clone()
}

func setLabel(key string, event *modelpb.APMEvent, v interface{}) {
switch v := v.(type) {
case string:
modelpb.Labels(event.Labels).Set(key, v)
case bool:
modelpb.Labels(event.Labels).Set(key, strconv.FormatBool(v))
case float64:
modelpb.NumericLabels(event.NumericLabels).Set(key, v)
case int64:
modelpb.NumericLabels(event.NumericLabels).Set(key, float64(v))
case []interface{}:
if len(v) == 0 {
func setLabel(key string, event *modelpb.APMEvent, v pcommon.Value) {
switch v.Type() {
case pcommon.ValueTypeStr:
modelpb.Labels(event.Labels).Set(key, truncate(v.Str()))
case pcommon.ValueTypeBool:
modelpb.Labels(event.Labels).Set(key, strconv.FormatBool(v.Bool()))
case pcommon.ValueTypeInt:
modelpb.NumericLabels(event.NumericLabels).Set(key, float64(v.Int()))
case pcommon.ValueTypeDouble:
modelpb.NumericLabels(event.NumericLabels).Set(key, v.Double())
case pcommon.ValueTypeSlice:
s := v.Slice()
if s.Len() == 0 {
return
}
switch v[0].(type) {
case string:
value := make([]string, len(v))
for i := range v {
value[i] = v[i].(string)
}
modelpb.Labels(event.Labels).SetSlice(key, value)
case float64:
value := make([]float64, len(v))
for i := range v {
value[i] = v[i].(float64)
}
modelpb.NumericLabels(event.NumericLabels).SetSlice(key, value)
switch s.At(0).Type() {
case pcommon.ValueTypeStr:
result := make([]string, 0, s.Len())
for i := 0; i < s.Len(); i++ {
r := s.At(i)
if r.Type() == pcommon.ValueTypeStr {
result = append(result, r.Str())
}
}
modelpb.Labels(event.Labels).SetSlice(key, result)
case pcommon.ValueTypeBool:
result := make([]string, 0, s.Len())
for i := 0; i < s.Len(); i++ {
r := s.At(i)
if r.Type() == pcommon.ValueTypeBool {
result = append(result, strconv.FormatBool(r.Bool()))
}
}
modelpb.Labels(event.Labels).SetSlice(key, result)
case pcommon.ValueTypeDouble:
result := make([]float64, 0, s.Len())
for i := 0; i < s.Len(); i++ {
r := s.At(i)
if r.Type() == pcommon.ValueTypeDouble {
result = append(result, r.Double())
}
}
modelpb.NumericLabels(event.NumericLabels).SetSlice(key, result)
case pcommon.ValueTypeInt:
result := make([]float64, 0, s.Len())
for i := 0; i < s.Len(); i++ {
r := s.At(i)
if r.Type() == pcommon.ValueTypeInt {
result = append(result, float64(r.Int()))
}
}
modelpb.NumericLabels(event.NumericLabels).SetSlice(key, result)
}
}
}
47 changes: 43 additions & 4 deletions input/otlp/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,55 @@ func TestResourceConventions(t *testing.T) {
}
}

// This test ensures that the values are properly translated,
// and that heterogeneous array elements are dropped without a panic.
func TestResourceLabels(t *testing.T) {
metadata := transformResourceMetadata(t, map[string]interface{}{
"string_array": []interface{}{"abc", "def"},
"int_array": []interface{}{123, 456},
"string_value": "abc",
"bool_value": true,
"int_value": 123,
"float_value": 1.23,
"string_array": []interface{}{"abc", "def", true, 123, 1.23, nil},
"bool_array": []interface{}{true, false, "true", 123, 1.23, nil},
"int_array": []interface{}{123, 456, "abc", true, 1.23, nil},
"float_array": []interface{}{1.23, 4.56, "abc", true, 123, nil},
"empty_array": []interface{}{}, // Ensure that an empty array is ignored.
})
assert.Equal(t, modelpb.Labels{
"string_array": {Global: true, Values: []string{"abc", "def"}},
"string_value": {
Global: true,
Value: "abc",
},
"bool_value": {
Global: true,
Value: "true",
},
"string_array": {
Global: true,
Values: []string{"abc", "def"},
},
"bool_array": {
Global: true,
Values: []string{"true", "false"},
},
}, modelpb.Labels(metadata.Labels))
assert.Equal(t, modelpb.NumericLabels{
"int_array": {Global: true, Values: []float64{123, 456}},
"int_value": {
Global: true,
Value: 123,
},
"float_value": {
Global: true,
Value: 1.23,
},
"int_array": {
Global: true,
Values: []float64{123, 456},
},
"float_array": {
Global: true,
Values: []float64{1.23, 4.56},
},
}, modelpb.NumericLabels(metadata.NumericLabels))
}

Expand Down
2 changes: 1 addition & 1 deletion input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (c *Consumer) handleScopeMetrics(
}
event.User.Name = truncate(v.Str())
default:
setLabel(k, event, ifaceAttributeValue(v))
setLabel(k, event, v)
}
return true
})
Expand Down
30 changes: 12 additions & 18 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,8 @@ func TranslateTransaction(
}
}
default:
setLabel(k, event, ifaceAttributeValue(v))
setLabel(k, event, v)
}
case pcommon.ValueTypeBool:
setLabel(k, event, ifaceAttributeValue(v))
case pcommon.ValueTypeDouble:
setLabel(k, event, ifaceAttributeValue(v))
case pcommon.ValueTypeInt:
switch kDots {
case semconv.AttributeHTTPStatusCode, attributeHttpResponseStatusCode:
Expand All @@ -315,7 +311,7 @@ func TranslateTransaction(
isRPC = true
event.Transaction.Result = codes.Code(v.Int()).String()
default:
setLabel(k, event, ifaceAttributeValue(v))
setLabel(k, event, v)
}
case pcommon.ValueTypeMap:
case pcommon.ValueTypeStr:
Expand Down Expand Up @@ -491,10 +487,11 @@ func TranslateTransaction(
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Namespace = stringval

default:
modelpb.Labels(event.Labels).Set(k, stringval)
}
default:
setLabel(k, event, v)
}
return true
})
Expand Down Expand Up @@ -643,18 +640,14 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode

k := replaceDots(kDots)
switch v.Type() {
case pcommon.ValueTypeSlice:
setLabel(k, event, ifaceAttributeValueSlice(v.Slice()))
case pcommon.ValueTypeBool:
switch kDots {
case semconv.AttributeMessagingTempDestination:
messageTempDestination = v.Bool()
fallthrough
default:
setLabel(k, event, strconv.FormatBool(v.Bool()))
setLabel(k, event, v)
}
case pcommon.ValueTypeDouble:
setLabel(k, event, v.Double())
case pcommon.ValueTypeInt:
switch kDots {
case "http.status_code", attributeHttpResponseStatusCode:
Expand All @@ -667,7 +660,7 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode
rpcSystem = "grpc"
isRPC = true
default:
setLabel(k, event, v.Int())
setLabel(k, event, v)
}
case pcommon.ValueTypeStr:
stringval := truncate(v.Str())
Expand Down Expand Up @@ -837,10 +830,11 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Namespace = stringval

default:
modelpb.Labels(event.Labels).Set(k, stringval)
setLabel(k, event, v)
}
default:
rubvs marked this conversation as resolved.
Show resolved Hide resolved
setLabel(k, event, v)
}
return true
})
Expand Down Expand Up @@ -1116,7 +1110,7 @@ func (c *Consumer) convertSpanEvent(
event.DataStream.Namespace = v.Str()

default:
setLabel(replaceDots(k), event, ifaceAttributeValue(v))
setLabel(replaceDots(k), event, v)
}
return true
})
Expand Down Expand Up @@ -1161,7 +1155,7 @@ func (c *Consumer) convertSpanEvent(
event.Message = truncate(v.Str())
return true
}
setLabel(k, event, ifaceAttributeValue(v))
setLabel(k, event, v)
}
return true
})
Expand Down Expand Up @@ -1215,7 +1209,7 @@ func (c *Consumer) convertJaegerErrorSpanEvent(event ptrace.SpanEvent, apmEvent
apmEvent.DataStream.Namespace = v.Str()

default:
setLabel(replaceDots(k), apmEvent, ifaceAttributeValue(v))
setLabel(replaceDots(k), apmEvent, v)
}
return true
})
Expand Down
Loading