Skip to content

Commit

Permalink
Improve attribute handling (#346)
Browse files Browse the repository at this point in the history
* drop invalid array elements in otlp payload

* replace switch with if-statement

* add unit test to translate span

* update translate span test to check slice values

* improve conversion between event labels and signal attributes

* remove reduntant function

* set event labels as default

* only append host ip elements when error is nil

* update transaction translation to default to setting a label directly

* merge unit tests
  • Loading branch information
rubvs authored Aug 28, 2024
1 parent ff5b129 commit 227954a
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 121 deletions.
14 changes: 5 additions & 9 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ func (c *Consumer) convertLogRecord(
if body := record.Body(); body.Type() != pcommon.ValueTypeEmpty {
event.Message = body.AsString()
if body.Type() == pcommon.ValueTypeMap {
setLabels(body.Map(), event)
body.Map().Range(func(k string, v pcommon.Value) bool {
setLabel(replaceDots(k), event, v)
return true
})
}
}
if traceID := record.TraceID(); !traceID.IsEmpty() {
Expand Down Expand Up @@ -199,7 +202,7 @@ func (c *Consumer) convertLogRecord(
}
event.DataStream.Namespace = v.Str()
default:
setLabel(replaceDots(k), event, ifaceAttributeValue(v))
setLabel(replaceDots(k), event, v)
}
return true
})
Expand Down Expand Up @@ -236,10 +239,3 @@ func (c *Consumer) convertLogRecord(

return event
}

func setLabels(m pcommon.Map, event *modelpb.APMEvent) {
m.Range(func(k string, v pcommon.Value) bool {
setLabel(replaceDots(k), event, ifaceAttributeValue(v))
return true
})
}
124 changes: 59 additions & 65 deletions input/otlp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,15 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
if out.Host == nil {
out.Host = modelpb.HostFromVTPool()
}
out.Host.Ip = pSliceToType[*modelpb.IP](v.Slice(), func(v pcommon.Value) (*modelpb.IP, bool) {
ip, err := modelpb.ParseIP(v.Str())
if err != nil {
return nil, false
slice := v.Slice()
result := make([]*modelpb.IP, 0, slice.Len())
for i := 0; i < slice.Len(); i++ {
ip, err := modelpb.ParseIP(slice.At(i).Str())
if err == nil {
result = append(result, ip)
}
return ip, true
})
}
out.Host.Ip = result

// process.*
case semconv.AttributeProcessPID:
Expand Down Expand Up @@ -320,15 +322,14 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
out.DataStream = modelpb.DataStreamFromVTPool()
}
out.DataStream.Namespace = v.Str()

default:
if out.Labels == nil {
out.Labels = make(modelpb.Labels)
}
if out.NumericLabels == nil {
out.NumericLabels = make(modelpb.NumericLabels)
}
setLabel(replaceDots(k), out, ifaceAttributeValue(v))
setLabel(replaceDots(k), out, v)
}
return true
})
Expand Down Expand Up @@ -482,71 +483,64 @@ func cleanServiceName(name string) string {
return serviceNameInvalidRegexp.ReplaceAllString(truncate(name), "_")
}

func ifaceAttributeValue(v pcommon.Value) interface{} {
switch v.Type() {
case pcommon.ValueTypeStr:
return truncate(v.Str())
case pcommon.ValueTypeBool:
return strconv.FormatBool(v.Bool())
case pcommon.ValueTypeInt:
return float64(v.Int())
case pcommon.ValueTypeDouble:
return v.Double()
case pcommon.ValueTypeSlice:
return ifaceAttributeValueSlice(v.Slice())
}
return nil
}

func ifaceAttributeValueSlice(slice pcommon.Slice) []interface{} {
return pSliceToType[interface{}](slice, func(v pcommon.Value) (interface{}, bool) {
return ifaceAttributeValue(v), true
})
}

func pSliceToType[T any](slice pcommon.Slice, f func(pcommon.Value) (T, bool)) []T {
result := make([]T, 0, slice.Len())
for i := 0; i < slice.Len(); i++ {
if v, ok := f(slice.At(i)); ok {
result = append(result, v)
}
}
return result
}

// initEventLabels initializes an event-specific labels from an event.
func initEventLabels(e *modelpb.APMEvent) {
e.Labels = modelpb.Labels(e.Labels).Clone()
e.NumericLabels = modelpb.NumericLabels(e.NumericLabels).Clone()
}

func setLabel(key string, event *modelpb.APMEvent, v interface{}) {
switch v := v.(type) {
case string:
modelpb.Labels(event.Labels).Set(key, v)
case bool:
modelpb.Labels(event.Labels).Set(key, strconv.FormatBool(v))
case float64:
modelpb.NumericLabels(event.NumericLabels).Set(key, v)
case int64:
modelpb.NumericLabels(event.NumericLabels).Set(key, float64(v))
case []interface{}:
if len(v) == 0 {
func setLabel(key string, event *modelpb.APMEvent, v pcommon.Value) {
switch v.Type() {
case pcommon.ValueTypeStr:
modelpb.Labels(event.Labels).Set(key, truncate(v.Str()))
case pcommon.ValueTypeBool:
modelpb.Labels(event.Labels).Set(key, strconv.FormatBool(v.Bool()))
case pcommon.ValueTypeInt:
modelpb.NumericLabels(event.NumericLabels).Set(key, float64(v.Int()))
case pcommon.ValueTypeDouble:
modelpb.NumericLabels(event.NumericLabels).Set(key, v.Double())
case pcommon.ValueTypeSlice:
s := v.Slice()
if s.Len() == 0 {
return
}
switch v[0].(type) {
case string:
value := make([]string, len(v))
for i := range v {
value[i] = v[i].(string)
}
modelpb.Labels(event.Labels).SetSlice(key, value)
case float64:
value := make([]float64, len(v))
for i := range v {
value[i] = v[i].(float64)
}
modelpb.NumericLabels(event.NumericLabels).SetSlice(key, value)
switch s.At(0).Type() {
case pcommon.ValueTypeStr:
result := make([]string, 0, s.Len())
for i := 0; i < s.Len(); i++ {
r := s.At(i)
if r.Type() == pcommon.ValueTypeStr {
result = append(result, r.Str())
}
}
modelpb.Labels(event.Labels).SetSlice(key, result)
case pcommon.ValueTypeBool:
result := make([]string, 0, s.Len())
for i := 0; i < s.Len(); i++ {
r := s.At(i)
if r.Type() == pcommon.ValueTypeBool {
result = append(result, strconv.FormatBool(r.Bool()))
}
}
modelpb.Labels(event.Labels).SetSlice(key, result)
case pcommon.ValueTypeDouble:
result := make([]float64, 0, s.Len())
for i := 0; i < s.Len(); i++ {
r := s.At(i)
if r.Type() == pcommon.ValueTypeDouble {
result = append(result, r.Double())
}
}
modelpb.NumericLabels(event.NumericLabels).SetSlice(key, result)
case pcommon.ValueTypeInt:
result := make([]float64, 0, s.Len())
for i := 0; i < s.Len(); i++ {
r := s.At(i)
if r.Type() == pcommon.ValueTypeInt {
result = append(result, float64(r.Int()))
}
}
modelpb.NumericLabels(event.NumericLabels).SetSlice(key, result)
}
}
}
47 changes: 43 additions & 4 deletions input/otlp/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,55 @@ func TestResourceConventions(t *testing.T) {
}
}

// This test ensures that the values are properly translated,
// and that heterogeneous array elements are dropped without a panic.
func TestResourceLabels(t *testing.T) {
metadata := transformResourceMetadata(t, map[string]interface{}{
"string_array": []interface{}{"abc", "def"},
"int_array": []interface{}{123, 456},
"string_value": "abc",
"bool_value": true,
"int_value": 123,
"float_value": 1.23,
"string_array": []interface{}{"abc", "def", true, 123, 1.23, nil},
"bool_array": []interface{}{true, false, "true", 123, 1.23, nil},
"int_array": []interface{}{123, 456, "abc", true, 1.23, nil},
"float_array": []interface{}{1.23, 4.56, "abc", true, 123, nil},
"empty_array": []interface{}{}, // Ensure that an empty array is ignored.
})
assert.Equal(t, modelpb.Labels{
"string_array": {Global: true, Values: []string{"abc", "def"}},
"string_value": {
Global: true,
Value: "abc",
},
"bool_value": {
Global: true,
Value: "true",
},
"string_array": {
Global: true,
Values: []string{"abc", "def"},
},
"bool_array": {
Global: true,
Values: []string{"true", "false"},
},
}, modelpb.Labels(metadata.Labels))
assert.Equal(t, modelpb.NumericLabels{
"int_array": {Global: true, Values: []float64{123, 456}},
"int_value": {
Global: true,
Value: 123,
},
"float_value": {
Global: true,
Value: 1.23,
},
"int_array": {
Global: true,
Values: []float64{123, 456},
},
"float_array": {
Global: true,
Values: []float64{1.23, 4.56},
},
}, modelpb.NumericLabels(metadata.NumericLabels))
}

Expand Down
2 changes: 1 addition & 1 deletion input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (c *Consumer) handleScopeMetrics(
}
event.User.Name = truncate(v.Str())
default:
setLabel(k, event, ifaceAttributeValue(v))
setLabel(k, event, v)
}
return true
})
Expand Down
30 changes: 12 additions & 18 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,8 @@ func TranslateTransaction(
}
}
default:
setLabel(k, event, ifaceAttributeValue(v))
setLabel(k, event, v)
}
case pcommon.ValueTypeBool:
setLabel(k, event, ifaceAttributeValue(v))
case pcommon.ValueTypeDouble:
setLabel(k, event, ifaceAttributeValue(v))
case pcommon.ValueTypeInt:
switch kDots {
case semconv.AttributeHTTPStatusCode, attributeHttpResponseStatusCode:
Expand All @@ -315,7 +311,7 @@ func TranslateTransaction(
isRPC = true
event.Transaction.Result = codes.Code(v.Int()).String()
default:
setLabel(k, event, ifaceAttributeValue(v))
setLabel(k, event, v)
}
case pcommon.ValueTypeMap:
case pcommon.ValueTypeStr:
Expand Down Expand Up @@ -491,10 +487,11 @@ func TranslateTransaction(
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Namespace = stringval

default:
modelpb.Labels(event.Labels).Set(k, stringval)
}
default:
setLabel(k, event, v)
}
return true
})
Expand Down Expand Up @@ -643,18 +640,14 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode

k := replaceDots(kDots)
switch v.Type() {
case pcommon.ValueTypeSlice:
setLabel(k, event, ifaceAttributeValueSlice(v.Slice()))
case pcommon.ValueTypeBool:
switch kDots {
case semconv.AttributeMessagingTempDestination:
messageTempDestination = v.Bool()
fallthrough
default:
setLabel(k, event, strconv.FormatBool(v.Bool()))
setLabel(k, event, v)
}
case pcommon.ValueTypeDouble:
setLabel(k, event, v.Double())
case pcommon.ValueTypeInt:
switch kDots {
case "http.status_code", attributeHttpResponseStatusCode:
Expand All @@ -667,7 +660,7 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode
rpcSystem = "grpc"
isRPC = true
default:
setLabel(k, event, v.Int())
setLabel(k, event, v)
}
case pcommon.ValueTypeStr:
stringval := truncate(v.Str())
Expand Down Expand Up @@ -837,10 +830,11 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Namespace = stringval

default:
modelpb.Labels(event.Labels).Set(k, stringval)
setLabel(k, event, v)
}
default:
setLabel(k, event, v)
}
return true
})
Expand Down Expand Up @@ -1116,7 +1110,7 @@ func (c *Consumer) convertSpanEvent(
event.DataStream.Namespace = v.Str()

default:
setLabel(replaceDots(k), event, ifaceAttributeValue(v))
setLabel(replaceDots(k), event, v)
}
return true
})
Expand Down Expand Up @@ -1161,7 +1155,7 @@ func (c *Consumer) convertSpanEvent(
event.Message = truncate(v.Str())
return true
}
setLabel(k, event, ifaceAttributeValue(v))
setLabel(k, event, v)
}
return true
})
Expand Down Expand Up @@ -1215,7 +1209,7 @@ func (c *Consumer) convertJaegerErrorSpanEvent(event ptrace.SpanEvent, apmEvent
apmEvent.DataStream.Namespace = v.Str()

default:
setLabel(replaceDots(k), apmEvent, ifaceAttributeValue(v))
setLabel(replaceDots(k), apmEvent, v)
}
return true
})
Expand Down
Loading

0 comments on commit 227954a

Please sign in to comment.