Skip to content

Commit

Permalink
Refactor otlp model specific logic into processors
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar committed Jun 12, 2024
1 parent e4f226f commit aee9a7e
Show file tree
Hide file tree
Showing 9 changed files with 617 additions and 318 deletions.
82 changes: 82 additions & 0 deletions exporter/elasticsearchexporter/internal/objmodel/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package objmodel

import (
"go.opentelemetry.io/collector/pdata/pcommon"
)

// Map processes a pcommon.Map into key value pairs and adds them to Elasticsearch
// document. Only map types are recursively processed. Map also allows remapping
// keys by passing in a key remapper. Any key remapped via the key remapper to
// an empty string is not added to the resulting document.
type Map struct {
pcommon.Map

keyRemapper func(string) string
}

var emptyRemapper = func(k string) string {
return k
}

// NewMapProcessor creates a new processor of processing pcommon.Map.
func NewMapProcessor(m pcommon.Map, remapper func(string) string) Map {
if remapper == nil {
remapper = emptyRemapper
}
return Map{Map: m, keyRemapper: remapper}
}

// Len gives the number of entries that will be added to the Document. This
// is an approximate figure as it doesn't count for entries removed via remapper.
func (m Map) Len() int {
return lenMap(m.Map)
}

// Process iterates over the map and adds the required fields into the document.
// The keys could be remapped to another key as per the remapper function.
func (m Map) Process(doc *Document, key string) {
processMap(m.Map, m.keyRemapper, doc, key)
}

func lenMap(m pcommon.Map) int {
var count int
m.Range(func(_ string, v pcommon.Value) bool {
switch v.Type() {
case pcommon.ValueTypeEmpty:
// Only maps are expanded in the document
case pcommon.ValueTypeMap:
count += lenMap(v.Map())
default:
count += 1
}
return true
})
return count
}

func processMap(
m pcommon.Map,
keyRemapper func(string) string,
doc *Document,
key string,
) {
m.Range(func(k string, v pcommon.Value) bool {
k = keyRemapper(flattenKey(key, k))
if k == "" {
// any empty value for a remapped metric key
// will be skipped
return true
}

switch v.Type() {
case pcommon.ValueTypeMap:
processMap(v.Map(), keyRemapper, doc, k)
default:
doc.Add(k, ValueFromAttribute(v))
}
return true
})
}
116 changes: 116 additions & 0 deletions exporter/elasticsearchexporter/internal/objmodel/map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package objmodel

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestMap(t *testing.T) {
key := "test"
for _, tc := range []struct {
name string
m pcommon.Map
keyRemapper func(string) string
expectedLen int
expectedDoc Document
}{
{
name: "empty",
m: pcommon.NewMap(),
expectedDoc: Document{},
},
{
name: "map",
m: func() pcommon.Map {
m := pcommon.NewMap()
m.FromRaw(map[string]interface{}{
"str": "abc",
"num": 1.1,
"bool": true,
"slice": []any{1, 2.1},
"map": map[string]any{
"str": "def",
"num": 2,
"bool": false,
"slice": []any{3, 4},
},
})
return m
}(),
expectedLen: 8,
expectedDoc: func() Document {
var doc Document
doc.Add(key+".str", StringValue("abc"))
doc.Add(key+".num", DoubleValue(1.1))
doc.Add(key+".bool", BoolValue(true))
doc.Add(key+".slice", ArrValue(IntValue(1), DoubleValue(2.1)))
doc.Add(key+".map.str", StringValue("def"))
doc.Add(key+".map.num", IntValue(2))
doc.Add(key+".map.bool", BoolValue(false))
doc.Add(key+".map.slice", ArrValue(IntValue(3), IntValue(4)))

doc.Sort()
return doc
}(),
},
{
name: "map_with_remapper",
m: func() pcommon.Map {
m := pcommon.NewMap()
m.FromRaw(map[string]interface{}{
"str": "abc",
"num": 1.1,
"bool": true,
"slice": []any{1, 2.1},
"map": map[string]any{
"str": "def",
"num": 2,
"bool": false,
"slice": []any{3, 4},
},
})
return m
}(),
keyRemapper: func(k string) string {
switch k {
case "test.str":
return "" // should be ignored
case "test.map.num":
return "k.map.num"
}
return k
},
// expected len is approximate and doesn't accout for entries
// removed via the key remapper
expectedLen: 8,
expectedDoc: func() Document {
var doc Document
doc.Add(key+".num", DoubleValue(1.1))
doc.Add(key+".bool", BoolValue(true))
doc.Add(key+".slice", ArrValue(IntValue(1), DoubleValue(2.1)))
doc.Add(key+".map.str", StringValue("def"))
doc.Add("k.map.num", IntValue(2))
doc.Add(key+".map.bool", BoolValue(false))
doc.Add(key+".map.slice", ArrValue(IntValue(3), IntValue(4)))

doc.Sort()
return doc
}(),
},
} {
t.Run(tc.name, func(t *testing.T) {
var actual Document
p := NewMapProcessor(tc.m, tc.keyRemapper)
p.Process(&actual, key)
actual.Sort()

assert.Equal(t, tc.expectedLen, p.Len())
assert.Equal(t, tc.expectedDoc, actual)
})
}
}
114 changes: 39 additions & 75 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
package objmodel // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"

import (
"encoding/hex"
"io"
"math"
"slices"
Expand All @@ -43,7 +42,6 @@ import (
"github.com/elastic/go-structform"
"github.com/elastic/go-structform/json"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// Document is an intermediate representation for converting open telemetry records with arbitrary attributes
Expand Down Expand Up @@ -75,12 +73,7 @@ type Value struct {
ts time.Time
arr []Value
doc Document

// Raw fields will be processed into one or multiple fields.
rawSpans ptrace.SpanEventSlice
rawVal pcommon.Value
rawM pcommon.Map
rawMMutator func(string, pcommon.Value)
processor processor
}

// Kind represent the internal kind of a value stored in a Document.
Expand All @@ -99,14 +92,18 @@ const (
KindObject
KindTimestamp
KindIgnore

// Raw values are processed when added and decomposed into
// normal key values before being appended to the document.
KindRawValue
KindRawMap
KindRawSpanEventSlice
KindProcessor
)

// processor defines an interface to allow adding in complex structures
// like OTLP maps and spans. Any processor type value must be processed
// into simpler value types and added to the document via the Process
// method. The process method is invoked by Add and AddMultiple.
type processor interface {
Len() int
Process(*Document, string)
}

const tsLayout = "2006-01-02T15:04:05.000000000Z"

var NilValue = Value{kind: KindNil}
Expand Down Expand Up @@ -149,16 +146,9 @@ func (doc *Document) AddMultiple(kvs ...KeyValue) {
var capacity int
for _, kv := range kvs {
switch kv.Value.kind {
case KindRawValue:
// TODO (lahsivjar): better estimation, value can be slice or map
capacity += 1
case KindRawMap:
capacity += kv.Value.rawM.Len()
case KindRawSpanEventSlice:
for i := 0; i < kv.Value.rawSpans.Len(); i++ {
// Each event adds timestamp and attributes to the doc
capacity += 1 + kv.Value.rawSpans.At(i).Attributes().Len()
}
case KindIgnore, KindNil:
case KindProcessor:
capacity += kv.Value.processor.Len()
default:
capacity += 1
}
Expand All @@ -167,19 +157,9 @@ func (doc *Document) AddMultiple(kvs ...KeyValue) {

for _, kv := range kvs {
switch kv.Value.kind {
case KindRawValue:
doc.AddAttribute(kv.Key, kv.Value.rawVal)
case KindRawMap:
doc.fields = appendAttributeFields(doc.fields, kv.Key, kv.Value.rawM)
case KindRawSpanEventSlice:
for i := 0; i < kv.Value.rawSpans.Len(); i++ {
e := kv.Value.rawSpans.At(i)
fkey := flattenKey(kv.Key, e.Name())
kv := NewKV(fkey+".time", TimestampValue(e.Timestamp()))

doc.fields = append(doc.fields, kv)
doc.AddAttributes(fkey, e.Attributes())
}
case KindIgnore, KindNil:
case KindProcessor:
kv.Value.processor.Process(doc, kv.Key)
default:
doc.fields = append(doc.fields, kv)
}
Expand Down Expand Up @@ -405,8 +385,8 @@ func StringValue(str string) Value {
return Value{kind: KindString, str: str}
}

// NonEmptyStringValue create a new value from a string.
func NonEmptyStringValue(str string) Value {
// NonZeroStringValue create a new string value if string is non zero.
func NonZeroStringValue(str string) Value {
if str == "" {
return NilValue
}
Expand All @@ -418,6 +398,14 @@ func IntValue(i int64) Value {
return Value{kind: KindInt, primitive: uint64(i)}
}

// NonZeroIntValue creates a new int value if int is non zero.
func NonZeroIntValue(i int64) Value {
if i == 0 {
return NilValue
}
return IntValue(i)
}

// DoubleValue creates a new value from a double value..
func DoubleValue(d float64) Value {
return Value{kind: KindDouble, dbl: d}
Expand All @@ -442,20 +430,18 @@ func TimestampValue(ts pcommon.Timestamp) Value {
return Value{kind: KindTimestamp, ts: ts.AsTime()}
}

// TraceIDValue creates a new value from a pcommon.TraceID.
func TraceIDValue(id pcommon.TraceID) Value {
if id.IsEmpty() {
return NilValue
}
return StringValue(hex.EncodeToString(id[:]))
// DocumentValue creates a new value from a document.
func DocumentValue(d Document) Value {
return Value{kind: KindObject, doc: d}
}

// SpanIDValue creates a new value from a pcommon.SpanID.
func SpanIDValue(id pcommon.SpanID) Value {
if id.IsEmpty() {
return NilValue
}
return StringValue(hex.EncodeToString(id[:]))
// ProcessorValue creates a processor type value. A Processor defines
// an interface to allow adding in complex structures like OTLP maps
// and spans. Any Processor type value must be processed into simpler
// value types and added to the document via the Process method using
// the various Add methods supported by the Document.
func ProcessorValue(p processor) Value {
return Value{kind: KindProcessor, processor: p}
}

// ValueFromAttribute converts a AttributeValue into a value.
Expand All @@ -480,24 +466,6 @@ func ValueFromAttribute(attr pcommon.Value) Value {
}
}

// RawMapValue adds a raw pcommon.Map to be processed and
// subsequently added to the document.
func RawMapValue(m pcommon.Map) Value {
return Value{kind: KindRawMap, rawM: m}
}

// RawMapValue adds a raw pcommon.Value to be processed and
// subsequently added to the document.
func RawValue(v pcommon.Value) Value {
return Value{kind: KindRawValue, rawVal: v}
}

// RawSpans adds a raw ptrace.SpanEventSlice to be processed and
// subsequently added to the document.
func RawSpans(s ptrace.SpanEventSlice) Value {
return Value{kind: KindRawSpanEventSlice, rawSpans: s}
}

// Sort recursively sorts all keys in docuemts held by the value.
func (v *Value) Sort() {
switch v.kind {
Expand Down Expand Up @@ -532,12 +500,8 @@ func (v *Value) IsEmpty() bool {
return len(v.arr) == 0
case KindObject:
return len(v.doc.fields) == 0
case KindRawMap:
return v.rawM.Len() == 0
case KindRawSpanEventSlice:
return v.rawSpans.Len() == 0
case KindRawValue:
return v.rawVal.Type() == pcommon.ValueTypeEmpty
case KindProcessor:
return v.processor.Len() == 0
default:
return false
}
Expand Down
Loading

0 comments on commit aee9a7e

Please sign in to comment.