From aee9a7ebc4d568fee411539d2d17dab57a8948c2 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 11 Jun 2024 20:01:20 +0100 Subject: [PATCH] Refactor otlp model specific logic into processors --- .../internal/objmodel/map.go | 82 +++++++ .../internal/objmodel/map_test.go | 116 ++++++++++ .../internal/objmodel/objmodel.go | 114 ++++------ .../internal/objmodel/pvalue.go | 51 +++++ .../internal/objmodel/pvalue_test.go | 137 ++++++++++++ .../internal/objmodel/spans.go | 36 ++++ .../internal/objmodel/spans_test.go | 74 +++++++ exporter/elasticsearchexporter/model.go | 200 +++++++----------- exporter/elasticsearchexporter/model_test.go | 125 +---------- 9 files changed, 617 insertions(+), 318 deletions(-) create mode 100644 exporter/elasticsearchexporter/internal/objmodel/map.go create mode 100644 exporter/elasticsearchexporter/internal/objmodel/map_test.go create mode 100644 exporter/elasticsearchexporter/internal/objmodel/pvalue.go create mode 100644 exporter/elasticsearchexporter/internal/objmodel/pvalue_test.go create mode 100644 exporter/elasticsearchexporter/internal/objmodel/spans.go create mode 100644 exporter/elasticsearchexporter/internal/objmodel/spans_test.go diff --git a/exporter/elasticsearchexporter/internal/objmodel/map.go b/exporter/elasticsearchexporter/internal/objmodel/map.go new file mode 100644 index 000000000000..80215168d7cf --- /dev/null +++ b/exporter/elasticsearchexporter/internal/objmodel/map.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package objmodel + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// Map processes a pcommon.Map into key value pairs and adds them to Elasticsearch +// document. Only map types are recursively processed. Map also allows remapping +// keys by passing in a key remapper. Any key remapped via the key remapper to +// an empty string is not added to the resulting document. +type Map struct { + pcommon.Map + + keyRemapper func(string) string +} + +var emptyRemapper = func(k string) string { + return k +} + +// NewMapProcessor creates a new processor of processing pcommon.Map. +func NewMapProcessor(m pcommon.Map, remapper func(string) string) Map { + if remapper == nil { + remapper = emptyRemapper + } + return Map{Map: m, keyRemapper: remapper} +} + +// Len gives the number of entries that will be added to the Document. This +// is an approximate figure as it doesn't count for entries removed via remapper. +func (m Map) Len() int { + return lenMap(m.Map) +} + +// Process iterates over the map and adds the required fields into the document. +// The keys could be remapped to another key as per the remapper function. +func (m Map) Process(doc *Document, key string) { + processMap(m.Map, m.keyRemapper, doc, key) +} + +func lenMap(m pcommon.Map) int { + var count int + m.Range(func(_ string, v pcommon.Value) bool { + switch v.Type() { + case pcommon.ValueTypeEmpty: + // Only maps are expanded in the document + case pcommon.ValueTypeMap: + count += lenMap(v.Map()) + default: + count += 1 + } + return true + }) + return count +} + +func processMap( + m pcommon.Map, + keyRemapper func(string) string, + doc *Document, + key string, +) { + m.Range(func(k string, v pcommon.Value) bool { + k = keyRemapper(flattenKey(key, k)) + if k == "" { + // any empty value for a remapped metric key + // will be skipped + return true + } + + switch v.Type() { + case pcommon.ValueTypeMap: + processMap(v.Map(), keyRemapper, doc, k) + default: + doc.Add(k, ValueFromAttribute(v)) + } + return true + }) +} diff --git a/exporter/elasticsearchexporter/internal/objmodel/map_test.go b/exporter/elasticsearchexporter/internal/objmodel/map_test.go new file mode 100644 index 000000000000..cd25885d0df6 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/objmodel/map_test.go @@ -0,0 +1,116 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package objmodel + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestMap(t *testing.T) { + key := "test" + for _, tc := range []struct { + name string + m pcommon.Map + keyRemapper func(string) string + expectedLen int + expectedDoc Document + }{ + { + name: "empty", + m: pcommon.NewMap(), + expectedDoc: Document{}, + }, + { + name: "map", + m: func() pcommon.Map { + m := pcommon.NewMap() + m.FromRaw(map[string]interface{}{ + "str": "abc", + "num": 1.1, + "bool": true, + "slice": []any{1, 2.1}, + "map": map[string]any{ + "str": "def", + "num": 2, + "bool": false, + "slice": []any{3, 4}, + }, + }) + return m + }(), + expectedLen: 8, + expectedDoc: func() Document { + var doc Document + doc.Add(key+".str", StringValue("abc")) + doc.Add(key+".num", DoubleValue(1.1)) + doc.Add(key+".bool", BoolValue(true)) + doc.Add(key+".slice", ArrValue(IntValue(1), DoubleValue(2.1))) + doc.Add(key+".map.str", StringValue("def")) + doc.Add(key+".map.num", IntValue(2)) + doc.Add(key+".map.bool", BoolValue(false)) + doc.Add(key+".map.slice", ArrValue(IntValue(3), IntValue(4))) + + doc.Sort() + return doc + }(), + }, + { + name: "map_with_remapper", + m: func() pcommon.Map { + m := pcommon.NewMap() + m.FromRaw(map[string]interface{}{ + "str": "abc", + "num": 1.1, + "bool": true, + "slice": []any{1, 2.1}, + "map": map[string]any{ + "str": "def", + "num": 2, + "bool": false, + "slice": []any{3, 4}, + }, + }) + return m + }(), + keyRemapper: func(k string) string { + switch k { + case "test.str": + return "" // should be ignored + case "test.map.num": + return "k.map.num" + } + return k + }, + // expected len is approximate and doesn't accout for entries + // removed via the key remapper + expectedLen: 8, + expectedDoc: func() Document { + var doc Document + doc.Add(key+".num", DoubleValue(1.1)) + doc.Add(key+".bool", BoolValue(true)) + doc.Add(key+".slice", ArrValue(IntValue(1), DoubleValue(2.1))) + doc.Add(key+".map.str", StringValue("def")) + doc.Add("k.map.num", IntValue(2)) + doc.Add(key+".map.bool", BoolValue(false)) + doc.Add(key+".map.slice", ArrValue(IntValue(3), IntValue(4))) + + doc.Sort() + return doc + }(), + }, + } { + t.Run(tc.name, func(t *testing.T) { + var actual Document + p := NewMapProcessor(tc.m, tc.keyRemapper) + p.Process(&actual, key) + actual.Sort() + + assert.Equal(t, tc.expectedLen, p.Len()) + assert.Equal(t, tc.expectedDoc, actual) + }) + } +} diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go index feaed192871d..7fb1975cd37d 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go @@ -32,7 +32,6 @@ package objmodel // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" import ( - "encoding/hex" "io" "math" "slices" @@ -43,7 +42,6 @@ import ( "github.com/elastic/go-structform" "github.com/elastic/go-structform/json" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/ptrace" ) // Document is an intermediate representation for converting open telemetry records with arbitrary attributes @@ -75,12 +73,7 @@ type Value struct { ts time.Time arr []Value doc Document - - // Raw fields will be processed into one or multiple fields. - rawSpans ptrace.SpanEventSlice - rawVal pcommon.Value - rawM pcommon.Map - rawMMutator func(string, pcommon.Value) + processor processor } // Kind represent the internal kind of a value stored in a Document. @@ -99,14 +92,18 @@ const ( KindObject KindTimestamp KindIgnore - - // Raw values are processed when added and decomposed into - // normal key values before being appended to the document. - KindRawValue - KindRawMap - KindRawSpanEventSlice + KindProcessor ) +// processor defines an interface to allow adding in complex structures +// like OTLP maps and spans. Any processor type value must be processed +// into simpler value types and added to the document via the Process +// method. The process method is invoked by Add and AddMultiple. +type processor interface { + Len() int + Process(*Document, string) +} + const tsLayout = "2006-01-02T15:04:05.000000000Z" var NilValue = Value{kind: KindNil} @@ -149,16 +146,9 @@ func (doc *Document) AddMultiple(kvs ...KeyValue) { var capacity int for _, kv := range kvs { switch kv.Value.kind { - case KindRawValue: - // TODO (lahsivjar): better estimation, value can be slice or map - capacity += 1 - case KindRawMap: - capacity += kv.Value.rawM.Len() - case KindRawSpanEventSlice: - for i := 0; i < kv.Value.rawSpans.Len(); i++ { - // Each event adds timestamp and attributes to the doc - capacity += 1 + kv.Value.rawSpans.At(i).Attributes().Len() - } + case KindIgnore, KindNil: + case KindProcessor: + capacity += kv.Value.processor.Len() default: capacity += 1 } @@ -167,19 +157,9 @@ func (doc *Document) AddMultiple(kvs ...KeyValue) { for _, kv := range kvs { switch kv.Value.kind { - case KindRawValue: - doc.AddAttribute(kv.Key, kv.Value.rawVal) - case KindRawMap: - doc.fields = appendAttributeFields(doc.fields, kv.Key, kv.Value.rawM) - case KindRawSpanEventSlice: - for i := 0; i < kv.Value.rawSpans.Len(); i++ { - e := kv.Value.rawSpans.At(i) - fkey := flattenKey(kv.Key, e.Name()) - kv := NewKV(fkey+".time", TimestampValue(e.Timestamp())) - - doc.fields = append(doc.fields, kv) - doc.AddAttributes(fkey, e.Attributes()) - } + case KindIgnore, KindNil: + case KindProcessor: + kv.Value.processor.Process(doc, kv.Key) default: doc.fields = append(doc.fields, kv) } @@ -405,8 +385,8 @@ func StringValue(str string) Value { return Value{kind: KindString, str: str} } -// NonEmptyStringValue create a new value from a string. -func NonEmptyStringValue(str string) Value { +// NonZeroStringValue create a new string value if string is non zero. +func NonZeroStringValue(str string) Value { if str == "" { return NilValue } @@ -418,6 +398,14 @@ func IntValue(i int64) Value { return Value{kind: KindInt, primitive: uint64(i)} } +// NonZeroIntValue creates a new int value if int is non zero. +func NonZeroIntValue(i int64) Value { + if i == 0 { + return NilValue + } + return IntValue(i) +} + // DoubleValue creates a new value from a double value.. func DoubleValue(d float64) Value { return Value{kind: KindDouble, dbl: d} @@ -442,20 +430,18 @@ func TimestampValue(ts pcommon.Timestamp) Value { return Value{kind: KindTimestamp, ts: ts.AsTime()} } -// TraceIDValue creates a new value from a pcommon.TraceID. -func TraceIDValue(id pcommon.TraceID) Value { - if id.IsEmpty() { - return NilValue - } - return StringValue(hex.EncodeToString(id[:])) +// DocumentValue creates a new value from a document. +func DocumentValue(d Document) Value { + return Value{kind: KindObject, doc: d} } -// SpanIDValue creates a new value from a pcommon.SpanID. -func SpanIDValue(id pcommon.SpanID) Value { - if id.IsEmpty() { - return NilValue - } - return StringValue(hex.EncodeToString(id[:])) +// ProcessorValue creates a processor type value. A Processor defines +// an interface to allow adding in complex structures like OTLP maps +// and spans. Any Processor type value must be processed into simpler +// value types and added to the document via the Process method using +// the various Add methods supported by the Document. +func ProcessorValue(p processor) Value { + return Value{kind: KindProcessor, processor: p} } // ValueFromAttribute converts a AttributeValue into a value. @@ -480,24 +466,6 @@ func ValueFromAttribute(attr pcommon.Value) Value { } } -// RawMapValue adds a raw pcommon.Map to be processed and -// subsequently added to the document. -func RawMapValue(m pcommon.Map) Value { - return Value{kind: KindRawMap, rawM: m} -} - -// RawMapValue adds a raw pcommon.Value to be processed and -// subsequently added to the document. -func RawValue(v pcommon.Value) Value { - return Value{kind: KindRawValue, rawVal: v} -} - -// RawSpans adds a raw ptrace.SpanEventSlice to be processed and -// subsequently added to the document. -func RawSpans(s ptrace.SpanEventSlice) Value { - return Value{kind: KindRawSpanEventSlice, rawSpans: s} -} - // Sort recursively sorts all keys in docuemts held by the value. func (v *Value) Sort() { switch v.kind { @@ -532,12 +500,8 @@ func (v *Value) IsEmpty() bool { return len(v.arr) == 0 case KindObject: return len(v.doc.fields) == 0 - case KindRawMap: - return v.rawM.Len() == 0 - case KindRawSpanEventSlice: - return v.rawSpans.Len() == 0 - case KindRawValue: - return v.rawVal.Type() == pcommon.ValueTypeEmpty + case KindProcessor: + return v.processor.Len() == 0 default: return false } diff --git a/exporter/elasticsearchexporter/internal/objmodel/pvalue.go b/exporter/elasticsearchexporter/internal/objmodel/pvalue.go new file mode 100644 index 000000000000..4a82903cc8b3 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/objmodel/pvalue.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package objmodel + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// PValue processes a pcommon.Value into key value pairs and adds them to +// the Elasticsearch document. Only map values are processed recursively. +type PValue struct { + pcommon.Value + + // Cache map to prevent recreation of a new map if value type is map + m Map +} + +// NewPValueProcessor creates a new processor for processing pcommon.Value. +func NewPValueProcessor(v pcommon.Value) PValue { + pv := PValue{Value: v} + if v.Type() == pcommon.ValueTypeMap { + pv.m = NewMapProcessor(v.Map(), nil) + } + return pv +} + +// Len gives the number of entries that will be added to the Elasticsearch document. +func (pv PValue) Len() int { + switch pv.Type() { + case pcommon.ValueTypeEmpty: + return 0 + case pcommon.ValueTypeMap: + return pv.m.Len() + } + return 1 +} + +// Process iterates over the value types and adds them to the provided document +// against a given key. +func (pv PValue) Process(doc *Document, key string) { + // Map is flattened, everything else is encoded as a single field + switch pv.Type() { + case pcommon.ValueTypeEmpty: + return + case pcommon.ValueTypeMap: + pv.m.Process(doc, key) + return + } + doc.Add(key, ValueFromAttribute(pv.Value)) +} diff --git a/exporter/elasticsearchexporter/internal/objmodel/pvalue_test.go b/exporter/elasticsearchexporter/internal/objmodel/pvalue_test.go new file mode 100644 index 000000000000..e903f06b2d17 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/objmodel/pvalue_test.go @@ -0,0 +1,137 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package objmodel + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestPValue(t *testing.T) { + key := "test" + for _, tc := range []struct { + name string + input pcommon.Value + expectedLen int + expectedDoc Document + }{ + { + name: "empty", + input: pcommon.NewValueEmpty(), + expectedDoc: Document{}, + }, + { + name: "int", + input: pcommon.NewValueInt(7), + expectedLen: 1, + expectedDoc: func() Document { + var doc Document + doc.Add(key, IntValue(7)) + return doc + }(), + }, + { + name: "double", + input: pcommon.NewValueDouble(7.1), + expectedLen: 1, + expectedDoc: func() Document { + var doc Document + doc.Add(key, DoubleValue(7.1)) + return doc + }(), + }, + { + name: "bool", + input: pcommon.NewValueBool(true), + expectedLen: 1, + expectedDoc: func() Document { + var doc Document + doc.Add(key, BoolValue(true)) + return doc + }(), + }, + { + name: "slice", + input: func() pcommon.Value { + s := pcommon.NewValueSlice() + require.NoError(t, s.FromRaw([]any{ + 1, + 2.1, + map[string]any{ + "str": "abc", + "num": 1, + "bool": true, + }, + })) + return s + }(), + expectedLen: 1, // Slices are not expanded + expectedDoc: func() Document { + var doc Document + doc.Add(key, ArrValue( + IntValue(1), + DoubleValue(2.1), + DocumentValue(func() Document { + var d Document + d.Add("str", StringValue("abc")) + d.Add("num", IntValue(1)) + d.Add("bool", BoolValue(true)) + + return d + }()), + )) + + doc.Sort() + return doc + }(), + }, + { + name: "map", + input: func() pcommon.Value { + m := pcommon.NewValueMap() + require.NoError(t, m.FromRaw(map[string]interface{}{ + "str": "abc", + "num": 1, + "bool": true, + "slice": []any{1, 2}, + "map": map[string]any{ + "str": "def", + "num": 2, + "bool": false, + "slice": []any{3, 4}, + }, + })) + return m + }(), + expectedLen: 8, + expectedDoc: func() Document { + var doc Document + doc.Add(key+".str", StringValue("abc")) + doc.Add(key+".num", IntValue(1)) + doc.Add(key+".bool", BoolValue(true)) + doc.Add(key+".slice", ArrValue(IntValue(1), IntValue(2))) + doc.Add(key+".map.str", StringValue("def")) + doc.Add(key+".map.num", IntValue(2)) + doc.Add(key+".map.bool", BoolValue(false)) + doc.Add(key+".map.slice", ArrValue(IntValue(3), IntValue(4))) + + doc.Sort() + return doc + }(), + }, + } { + t.Run(tc.name, func(t *testing.T) { + var actual Document + p := NewPValueProcessor(tc.input) + p.Process(&actual, key) + actual.Sort() + + assert.Equal(t, tc.expectedLen, p.Len()) + assert.Equal(t, tc.expectedDoc, actual) + }) + } +} diff --git a/exporter/elasticsearchexporter/internal/objmodel/spans.go b/exporter/elasticsearchexporter/internal/objmodel/spans.go new file mode 100644 index 000000000000..95cc44422832 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/objmodel/spans.go @@ -0,0 +1,36 @@ +package objmodel + +import "go.opentelemetry.io/collector/pdata/ptrace" + +// Spans processes a ptrace.SpanEventSlice into key value pairs and adds +// them to the Elasticsearch document. +type Spans struct { + ptrace.SpanEventSlice +} + +// NewSpansProcessor creates a new processor for processing ptrace.SpanEventSlice. +func NewSpansProcessor(s ptrace.SpanEventSlice) Spans { + return Spans{SpanEventSlice: s} +} + +// Len gives the number of entries that will be added to the Elasticsearch document. +func (s Spans) Len() int { + var count int + for i := 0; i < s.SpanEventSlice.Len(); i++ { + count += 1 /*timestamp*/ + s.At(i).Attributes().Len() + } + return count +} + +// Process iterates over the spans and adds them to the provided document against a +// given key. +func (s Spans) Process(doc *Document, key string) { + for i := 0; i < s.SpanEventSlice.Len(); i++ { + span := s.At(i) + fKey := flattenKey(key, span.Name()) + + doc.Add(fKey+".time", TimestampValue(span.Timestamp())) + // TODO (lahsivjar): Defer this to map processor and deprecate AddAttribute* + doc.AddAttributes(fKey, span.Attributes()) + } +} diff --git a/exporter/elasticsearchexporter/internal/objmodel/spans_test.go b/exporter/elasticsearchexporter/internal/objmodel/spans_test.go new file mode 100644 index 000000000000..1f456e16e4fc --- /dev/null +++ b/exporter/elasticsearchexporter/internal/objmodel/spans_test.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package objmodel + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestSpans(t *testing.T) { + key := "test" + ts := pcommon.NewTimestampFromTime(time.Now()) + for _, tc := range []struct { + name string + s ptrace.SpanEventSlice + expectedLen int + expectedDoc Document + }{ + { + name: "empty", + s: ptrace.NewSpanEventSlice(), + expectedDoc: Document{}, + }, + { + name: "spans", + s: func() ptrace.SpanEventSlice { + spans := ptrace.NewSpanEventSlice() + for i := 0; i < 5; i++ { + e := spans.AppendEmpty() + e.SetName(fmt.Sprintf("test%d", i)) + e.SetTimestamp(ts) + e.Attributes().FromRaw(map[string]any{ + "str": "abc", + "num": 1, + "bool": true, + }) + } + + return spans + }(), + expectedLen: 20, + expectedDoc: func() Document { + var d Document + + for i := 0; i < 5; i++ { + k := fmt.Sprintf("%s.test%d", key, i) + d.Add(k+".time", TimestampValue(ts)) + d.Add(k+".str", StringValue("abc")) + d.Add(k+".num", IntValue(1)) + d.Add(k+".bool", BoolValue(true)) + } + + d.Sort() + return d + }(), + }, + } { + t.Run(tc.name, func(t *testing.T) { + var actual Document + p := NewSpansProcessor(tc.s) + p.Process(&actual, key) + actual.Sort() + + assert.Equal(t, tc.expectedLen, p.Len()) + assert.Equal(t, tc.expectedDoc, actual) + }) + } +} diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 63a0785a91f9..19fd9d0b8447 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -17,47 +17,52 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" ) -// resourceAttrsConversionMap contains conversions for resource-level attributes -// from their Semantic Conventions (SemConv) names to equivalent Elastic Common -// Schema (ECS) names. -// If the ECS field name is specified as an empty string (""), the converter will -// neither convert the SemConv key to the equivalent ECS name nor pass-through the -// SemConv key as-is to become the ECS name. -var resourceAttrsConversionMap = map[string]string{ - semconv.AttributeServiceInstanceID: "service.node.name", - semconv.AttributeDeploymentEnvironment: "service.environment", - semconv.AttributeTelemetrySDKName: "", - semconv.AttributeTelemetrySDKLanguage: "", - semconv.AttributeTelemetrySDKVersion: "", - semconv.AttributeTelemetryDistroName: "", - semconv.AttributeTelemetryDistroVersion: "", - semconv.AttributeCloudPlatform: "cloud.service.name", - semconv.AttributeContainerImageTags: "container.image.tag", - semconv.AttributeHostName: "host.hostname", - semconv.AttributeHostArch: "host.architecture", - semconv.AttributeProcessExecutablePath: "process.executable", - semconv.AttributeProcessRuntimeName: "service.runtime.name", - semconv.AttributeProcessRuntimeVersion: "service.runtime.version", - semconv.AttributeOSName: "host.os.name", - semconv.AttributeOSType: "host.os.platform", - semconv.AttributeOSDescription: "host.os.full", - semconv.AttributeOSVersion: "host.os.version", - "k8s.namespace.name": "kubernetes.namespace", - "k8s.node.name": "kubernetes.node.name", - "k8s.pod.name": "kubernetes.pod.name", - "k8s.pod.uid": "kubernetes.pod.uid", -} +var ( + // resourceAttrsConversionMap contains conversions for resource-level attributes + // from their Semantic Conventions (SemConv) names to equivalent Elastic Common + // Schema (ECS) names. + // If the ECS field name is specified as an empty string (""), the converter will + // neither convert the SemConv key to the equivalent ECS name nor pass-through the + // SemConv key as-is to become the ECS name. + resourceAttrsConversionMap = map[string]string{ + semconv.AttributeServiceInstanceID: "service.node.name", + semconv.AttributeDeploymentEnvironment: "service.environment", + semconv.AttributeTelemetrySDKName: "", + semconv.AttributeTelemetrySDKLanguage: "", + semconv.AttributeTelemetrySDKVersion: "", + semconv.AttributeTelemetryDistroName: "", + semconv.AttributeTelemetryDistroVersion: "", + semconv.AttributeCloudPlatform: "cloud.service.name", + semconv.AttributeContainerImageTags: "container.image.tag", + semconv.AttributeHostName: "host.hostname", + semconv.AttributeHostArch: "host.architecture", + semconv.AttributeProcessExecutablePath: "process.executable", + semconv.AttributeProcessRuntimeName: "service.runtime.name", + semconv.AttributeProcessRuntimeVersion: "service.runtime.version", + semconv.AttributeOSName: "host.os.name", + semconv.AttributeOSType: "host.os.platform", + semconv.AttributeOSDescription: "host.os.full", + semconv.AttributeOSVersion: "host.os.version", + "k8s.namespace.name": "kubernetes.namespace", + "k8s.node.name": "kubernetes.node.name", + "k8s.pod.name": "kubernetes.pod.name", + "k8s.pod.uid": "kubernetes.pod.uid", + } -// resourceAttrsConversionMap contains conversions for log record attributes -// from their Semantic Conventions (SemConv) names to equivalent Elastic Common -// Schema (ECS) names. -var recordAttrsConversionMap = map[string]string{ - "event.name": "event.action", - semconv.AttributeExceptionMessage: "error.message", - semconv.AttributeExceptionStacktrace: "error.stacktrace", - semconv.AttributeExceptionType: "error.type", - semconv.AttributeExceptionEscaped: "event.error.exception.handled", -} + // resourceAttrsConversionMap contains conversions for log record attributes + // from their Semantic Conventions (SemConv) names to equivalent Elastic Common + // Schema (ECS) names. + recordAttrsConversionMap = map[string]string{ + "event.name": "event.action", + semconv.AttributeExceptionMessage: "error.message", + semconv.AttributeExceptionStacktrace: "error.stacktrace", + semconv.AttributeExceptionType: "error.type", + semconv.AttributeExceptionEscaped: "event.error.exception.handled", + } + + resourceAttrsConversionMapRemapper = keyRemapperFromMap(resourceAttrsConversionMap) + recordAttrsConversionMapRemapper = keyRemapperFromMap(recordAttrsConversionMap) +) type mappingModel interface { encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error) @@ -118,19 +123,19 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo // We use @timestamp in order to ensure that we can index if the // default data stream logs template is used. objmodel.NewKV("@timestamp", objmodel.TimestampValue(docTs)), - objmodel.NewKV("TraceId", objmodel.TraceIDValue(record.TraceID())), - objmodel.NewKV("SpanId", objmodel.SpanIDValue(record.SpanID())), + objmodel.NewKV("TraceId", objmodel.NonZeroStringValue(record.TraceID().String())), + objmodel.NewKV("SpanId", objmodel.NonZeroStringValue(record.SpanID().String())), objmodel.NewKV("TraceFlags", objmodel.IntValue(int64(record.Flags()))), - objmodel.NewKV("SeverityText", objmodel.NonEmptyStringValue(record.SeverityText())), + objmodel.NewKV("SeverityText", objmodel.NonZeroStringValue(record.SeverityText())), objmodel.NewKV("SeverityNumber", objmodel.IntValue(int64(record.SeverityNumber()))), - objmodel.NewKV("Body", objmodel.RawValue(record.Body())), + objmodel.NewKV("Body", objmodel.ProcessorValue(objmodel.NewPValueProcessor(record.Body()))), // Add scope name and version as additional scope attributes. // Empty values are also allowed to be added. objmodel.NewKV("Scope.name", objmodel.StringValue(scope.Name())), objmodel.NewKV("Scope.version", objmodel.StringValue(scope.Version())), - objmodel.NewKV("Resource", objmodel.RawMapValue(resource.Attributes())), - objmodel.NewKV("Scope", objmodel.RawMapValue(scope.Attributes())), - objmodel.NewKV(recordAttrKey, objmodel.RawMapValue(record.Attributes())), + objmodel.NewKV("Resource", objmodel.ProcessorValue(objmodel.NewMapProcessor(resource.Attributes(), nil))), + objmodel.NewKV("Scope", objmodel.ProcessorValue(objmodel.NewMapProcessor(scope.Attributes(), nil))), + objmodel.NewKV(recordAttrKey, objmodel.ProcessorValue(objmodel.NewMapProcessor(record.Attributes(), nil))), ) return document @@ -140,45 +145,19 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document { var document objmodel.Document - // Appoximate capacity to reduce allocations - resourceAttrs := resource.Attributes() - scopeAttrs := scope.Attributes() - recordAttrs := record.Attributes() - document.EnsureCapacity(9 + // constant number of fields added - resourceAttrs.Len() + - scopeAttrs.Len() + - recordAttrs.Len(), - ) - - // TODO (lahsivjar): move to objmodel with a new function, something like ...WithKeyMutator(...) - // First, try to map resource-level attributes to ECS fields. - encodeLogAttributesECSMode(&document, resourceAttrs, resourceAttrsConversionMap) - - // Then, try to map scope-level attributes to ECS fields. - scopeAttrsConversionMap := map[string]string{ - // None at the moment - } - encodeLogAttributesECSMode(&document, scopeAttrs, scopeAttrsConversionMap) - - // Finally, try to map record-level attributes to ECS fields. - encodeLogAttributesECSMode(&document, recordAttrs, recordAttrsConversionMap) - - severity := objmodel.NilValue - if n := record.SeverityNumber(); n != plog.SeverityNumberUnspecified { - severity = objmodel.IntValue(int64(record.SeverityNumber())) - } - - // Handle special cases. document.AddMultiple( objmodel.NewKV("@timestamp", getTimestampForECS(record)), objmodel.NewKV("agent.name", getAgentNameForECS(resource)), objmodel.NewKV("agent.version", getAgentVersionForECS(resource)), objmodel.NewKV("host.os.type", getHostOsTypeForECS(resource)), - objmodel.NewKV("trace.id", objmodel.TraceIDValue(record.TraceID())), - objmodel.NewKV("span.id", objmodel.SpanIDValue(record.SpanID())), - objmodel.NewKV("log.level", objmodel.NonEmptyStringValue(record.SeverityText())), - objmodel.NewKV("event.severity", severity), - objmodel.NewKV("message", objmodel.RawValue(record.Body())), + objmodel.NewKV("trace.id", objmodel.NonZeroStringValue(record.TraceID().String())), + objmodel.NewKV("span.id", objmodel.NonZeroStringValue(record.SpanID().String())), + objmodel.NewKV("log.level", objmodel.NonZeroStringValue(record.SeverityText())), + objmodel.NewKV("event.severity", objmodel.NonZeroIntValue(int64(record.SeverityNumber()))), + objmodel.NewKV("message", objmodel.ProcessorValue(objmodel.NewPValueProcessor(record.Body()))), + objmodel.NewKV("", objmodel.ProcessorValue(objmodel.NewMapProcessor(resource.Attributes(), resourceAttrsConversionMapRemapper))), + objmodel.NewKV("", objmodel.ProcessorValue(objmodel.NewMapProcessor(scope.Attributes(), nil))), + objmodel.NewKV("", objmodel.ProcessorValue(objmodel.NewMapProcessor(record.Attributes(), recordAttrsConversionMapRemapper))), ) return document @@ -197,23 +176,23 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, sc document.AddMultiple( objmodel.NewKV("@timestamp", objmodel.TimestampValue(span.StartTimestamp())), objmodel.NewKV("EndTimestamp", objmodel.TimestampValue(span.EndTimestamp())), - objmodel.NewKV("TraceId", objmodel.TraceIDValue(span.TraceID())), - objmodel.NewKV("SpanId", objmodel.SpanIDValue(span.SpanID())), - objmodel.NewKV("ParentSpanId", objmodel.SpanIDValue(span.ParentSpanID())), - objmodel.NewKV("Name", objmodel.NonEmptyStringValue(span.Name())), - objmodel.NewKV("Kind", objmodel.NonEmptyStringValue(traceutil.SpanKindStr(span.Kind()))), + objmodel.NewKV("TraceId", objmodel.NonZeroStringValue(span.TraceID().String())), + objmodel.NewKV("SpanId", objmodel.NonZeroStringValue(span.SpanID().String())), + objmodel.NewKV("ParentSpanId", objmodel.NonZeroStringValue(span.ParentSpanID().String())), + objmodel.NewKV("Name", objmodel.NonZeroStringValue(span.Name())), + objmodel.NewKV("Kind", objmodel.NonZeroStringValue(traceutil.SpanKindStr(span.Kind()))), objmodel.NewKV("TraceStatus", objmodel.IntValue(int64(span.Status().Code()))), - objmodel.NewKV("TraceStatusDescription", objmodel.NonEmptyStringValue(span.Status().Message())), - objmodel.NewKV("Link", objmodel.NonEmptyStringValue(spanLinksToString(span.Links()))), + objmodel.NewKV("TraceStatusDescription", objmodel.NonZeroStringValue(span.Status().Message())), + objmodel.NewKV("Link", objmodel.NonZeroStringValue(spanLinksToString(span.Links()))), objmodel.NewKV("Duration", objmodel.IntValue(durationAsMicroseconds(span.StartTimestamp(), span.EndTimestamp()))), // Add scope name and version as additional scope attributes // Empty values are also allowed to be added. objmodel.NewKV("Scope.name", objmodel.StringValue(scope.Name())), objmodel.NewKV("Scope.version", objmodel.StringValue(scope.Version())), - objmodel.NewKV("Resource", objmodel.RawMapValue(resource.Attributes())), - objmodel.NewKV("Scope", objmodel.RawMapValue(scope.Attributes())), - objmodel.NewKV(spanAttrKey, objmodel.RawMapValue(span.Attributes())), - objmodel.NewKV(eventsKey, objmodel.RawSpans(span.Events())), + objmodel.NewKV("Resource", objmodel.ProcessorValue(objmodel.NewMapProcessor(resource.Attributes(), nil))), + objmodel.NewKV("Scope", objmodel.ProcessorValue(objmodel.NewMapProcessor(scope.Attributes(), nil))), + objmodel.NewKV(spanAttrKey, objmodel.ProcessorValue(objmodel.NewMapProcessor(span.Attributes(), nil))), + objmodel.NewKV(eventsKey, objmodel.ProcessorValue(objmodel.NewSpansProcessor(span.Events()))), ) if m.dedup { @@ -248,30 +227,13 @@ func durationAsMicroseconds(start, end pcommon.Timestamp) int64 { return (end.AsTime().UnixNano() - start.AsTime().UnixNano()) / 1000 } -func encodeLogAttributesECSMode(document *objmodel.Document, attrs pcommon.Map, conversionMap map[string]string) { - if len(conversionMap) == 0 { - // No conversions to be done; add all attributes at top level of - // document. - document.AddAttributes("", attrs) - return - } - - attrs.Range(func(k string, v pcommon.Value) bool { - // If ECS key is found for current k in conversion map, use it. - if ecsKey, exists := conversionMap[k]; exists { - if ecsKey == "" { - // Skip the conversion for this k. - return true - } - - document.AddAttribute(ecsKey, v) - return true +func keyRemapperFromMap(m map[string]string) func(string) string { + return func(orig string) string { + if remappedKey, ok := m[orig]; ok { + return remappedKey } - - // Otherwise, add key at top level with attribute name as-is. - document.AddAttribute(k, v) - return true - }) + return orig + } } func getAgentNameForECS(resource pcommon.Resource) objmodel.Value { @@ -302,18 +264,18 @@ func getAgentNameForECS(resource pcommon.Resource) objmodel.Value { agentName = fmt.Sprintf("%s/%s", agentName, telemetrySdkLanguage) } - return objmodel.NonEmptyStringValue(agentName) + return objmodel.NonZeroStringValue(agentName) } func getAgentVersionForECS(resource pcommon.Resource) objmodel.Value { attrs := resource.Attributes() if telemetryDistroVersion, exists := attrs.Get(semconv.AttributeTelemetryDistroVersion); exists { - return objmodel.NonEmptyStringValue(telemetryDistroVersion.Str()) + return objmodel.NonZeroStringValue(telemetryDistroVersion.Str()) } if telemetrySdkVersion, exists := attrs.Get(semconv.AttributeTelemetrySDKVersion); exists { - return objmodel.NonEmptyStringValue(telemetrySdkVersion.Str()) + return objmodel.NonZeroStringValue(telemetrySdkVersion.Str()) } return objmodel.NilValue } @@ -348,7 +310,7 @@ func getHostOsTypeForECS(resource pcommon.Resource) objmodel.Value { if ecsHostOsType == "" { return objmodel.NilValue } - return objmodel.NonEmptyStringValue(ecsHostOsType) + return objmodel.NonZeroStringValue(ecsHostOsType) } func getTimestampForECS(record plog.LogRecord) objmodel.Value { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index d0bfa6278008..af08f3f78e74 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -451,7 +451,7 @@ func TestEncodeLogECSModeAgentVersion(t *testing.T) { expectedDoc := objmodel.Document{} expectedDoc.Add("@timestamp", objmodel.TimestampValue(timestamp)) expectedDoc.Add("agent.name", objmodel.StringValue("otlp")) - expectedDoc.Add("agent.version", objmodel.StringValue(test.expectedAgentVersion)) + expectedDoc.Add("agent.version", objmodel.NonZeroStringValue(test.expectedAgentVersion)) doc.Sort() expectedDoc.Sort() @@ -612,126 +612,3 @@ func TestEncodeLogECSModeTimestamps(t *testing.T) { }) } } - -func TestMapLogAttributesToECS(t *testing.T) { - tests := map[string]struct { - attrs func() pcommon.Map - conversionMap map[string]string - expectedDoc func() objmodel.Document - }{ - "no_attrs": { - attrs: pcommon.NewMap, - conversionMap: map[string]string{ - "foo.bar": "baz", - }, - expectedDoc: func() objmodel.Document { - return objmodel.Document{} - }, - }, - "no_conversion_map": { - attrs: func() pcommon.Map { - m := pcommon.NewMap() - m.PutStr("foo.bar", "baz") - return m - }, - expectedDoc: func() objmodel.Document { - d := objmodel.Document{} - d.Add("foo.bar", objmodel.StringValue("baz")) - return d - }, - }, - "empty_conversion_map": { - attrs: func() pcommon.Map { - m := pcommon.NewMap() - m.PutStr("foo.bar", "baz") - return m - }, - conversionMap: map[string]string{}, - expectedDoc: func() objmodel.Document { - d := objmodel.Document{} - d.Add("foo.bar", objmodel.StringValue("baz")) - return d - }, - }, - "all_attrs_in_conversion_map": { - attrs: func() pcommon.Map { - m := pcommon.NewMap() - m.PutStr("foo.bar", "baz") - m.PutInt("qux", 17) - return m - }, - conversionMap: map[string]string{ - "foo.bar": "bar.qux", - "qux": "foo", - }, - expectedDoc: func() objmodel.Document { - d := objmodel.Document{} - d.Add("bar.qux", objmodel.StringValue("baz")) - d.Add("foo", objmodel.IntValue(17)) - return d - }, - }, - "some_attrs_in_conversion_map": { - attrs: func() pcommon.Map { - m := pcommon.NewMap() - m.PutStr("foo.bar", "baz") - m.PutInt("qux", 17) - return m - }, - conversionMap: map[string]string{ - "foo.bar": "bar.qux", - }, - expectedDoc: func() objmodel.Document { - d := objmodel.Document{} - d.Add("bar.qux", objmodel.StringValue("baz")) - d.Add("qux", objmodel.IntValue(17)) - return d - }, - }, - "no_attrs_in_conversion_map": { - attrs: func() pcommon.Map { - m := pcommon.NewMap() - m.PutStr("foo.bar", "baz") - m.PutInt("qux", 17) - return m - }, - conversionMap: map[string]string{ - "baz": "qux", - }, - expectedDoc: func() objmodel.Document { - d := objmodel.Document{} - d.Add("foo.bar", objmodel.StringValue("baz")) - d.Add("qux", objmodel.IntValue(17)) - return d - }, - }, - "extra_keys_in_conversion_map": { - attrs: func() pcommon.Map { - m := pcommon.NewMap() - m.PutStr("foo.bar", "baz") - return m - }, - conversionMap: map[string]string{ - "foo.bar": "bar.qux", - "qux": "foo", - }, - expectedDoc: func() objmodel.Document { - d := objmodel.Document{} - d.Add("bar.qux", objmodel.StringValue("baz")) - return d - }, - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - var doc objmodel.Document - encodeLogAttributesECSMode(&doc, test.attrs(), test.conversionMap) - - doc.Sort() - expectedDoc := test.expectedDoc() - expectedDoc.Sort() - require.Equal(t, expectedDoc, doc) - }) - } -}