Skip to content

Commit

Permalink
[chore][exporter/elasticsearch] Optimize allocations for encoded doc
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar committed Jun 12, 2024
1 parent 2fdd7e5 commit d61f5b8
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 33 deletions.
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/integrationtest/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
results/*
*.test
18 changes: 18 additions & 0 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"encoding/hex"
"io"
"math"
"slices"
"sort"
"strings"
"time"
Expand All @@ -51,6 +52,11 @@ type Document struct {
fields []field
}

// EnsureCapacity ensures that the document has capacity to add n more fields.
func (d *Document) EnsureCapacity(n int) {
d.fields = slices.Grow(d.fields, n)
}

type field struct {
key string
value Value
Expand Down Expand Up @@ -149,6 +155,7 @@ func (doc *Document) AddInt(key string, value int64) {
// AddAttributes expands and flattens all key-value pairs from the input attribute map into
// the document.
func (doc *Document) AddAttributes(key string, attributes pcommon.Map) {
doc.EnsureCapacity(attributes.Len())
doc.fields = appendAttributeFields(doc.fields, key, attributes)
}

Expand All @@ -167,6 +174,7 @@ func (doc *Document) AddAttribute(key string, attribute pcommon.Value) {

// AddEvents converts and adds span events to the document.
func (doc *Document) AddEvents(key string, events ptrace.SpanEventSlice) {
doc.EnsureCapacity(EstimateEventFields(events))
for i := 0; i < events.Len(); i++ {
e := events.At(i)
doc.AddTimestamp(flattenKey(key, e.Name()+".time"), e.Timestamp())
Expand Down Expand Up @@ -367,6 +375,16 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error {
return nil
}

// EstimateEventFields estimates the number of fields that would be added to
// the document for the given slice of spans.
func EstimateEventFields(events ptrace.SpanEventSlice) (count int) {
for i := 0; i < events.Len(); i++ {
// Timestamp and attributes are added as fields
count += 1 + events.At(i).Attributes().Len()
}
return
}

// StringValue create a new value from a string.
func StringValue(str string) Value { return Value{kind: KindString, str: str} }

Expand Down
98 changes: 65 additions & 33 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ var resourceAttrsConversionMap = map[string]string{
"k8s.pod.uid": "kubernetes.pod.uid",
}

// resourceAttrsConversionMap contains conversions for log record attributes
// from their Semantic Conventions (SemConv) names to equivalent Elastic Common
// Schema (ECS) names.
var recordAttrsConversionMap = map[string]string{
"event.name": "event.action",
semconv.AttributeExceptionMessage: "error.message",
semconv.AttributeExceptionStacktrace: "error.stacktrace",
semconv.AttributeExceptionType: "error.type",
semconv.AttributeExceptionEscaped: "event.error.exception.handled",
}

type mappingModel interface {
encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
Expand Down Expand Up @@ -94,6 +105,16 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord
func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document {
var document objmodel.Document

// Appoximate capacity to reduce allocations
resourceAttrs := resource.Attributes()
scopeAttrs := scope.Attributes()
recordAttrs := record.Attributes()
document.EnsureCapacity(9 + // constant number of fields added
resourceAttrs.Len() +
scopeAttrs.Len() +
recordAttrs.Len(),
)

docTimeStamp := record.Timestamp()
if docTimeStamp.AsTime().UnixNano() == 0 {
docTimeStamp = record.ObservedTimestamp()
Expand All @@ -105,9 +126,14 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo
document.AddString("SeverityText", record.SeverityText())
document.AddInt("SeverityNumber", int64(record.SeverityNumber()))
document.AddAttribute("Body", record.Body())
m.encodeAttributes(&document, record.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddAttributes("Scope", scopeToAttributes(scope))
// Add scope name and version as additional scope attributes.
// Empty values are also allowed to be added.
document.Add("Scope.name", objmodel.StringValue(scope.Name()))
document.Add("Scope.version", objmodel.StringValue(scope.Version()))

document.AddAttributes("Resource", resourceAttrs)
document.AddAttributes("Scope", scopeAttrs)
m.encodeAttributes(&document, recordAttrs)

return document

Expand All @@ -116,24 +142,27 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo
func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document {
var document objmodel.Document

// Appoximate capacity to reduce allocations
resourceAttrs := resource.Attributes()
scopeAttrs := scope.Attributes()
recordAttrs := record.Attributes()
document.EnsureCapacity(9 + // constant number of fields added
resourceAttrs.Len() +
scopeAttrs.Len() +
recordAttrs.Len(),
)

// First, try to map resource-level attributes to ECS fields.
encodeLogAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap)
encodeLogAttributesECSMode(&document, resourceAttrs, resourceAttrsConversionMap)

// Then, try to map scope-level attributes to ECS fields.
scopeAttrsConversionMap := map[string]string{
// None at the moment
}
encodeLogAttributesECSMode(&document, scope.Attributes(), scopeAttrsConversionMap)
encodeLogAttributesECSMode(&document, scopeAttrs, scopeAttrsConversionMap)

// Finally, try to map record-level attributes to ECS fields.
recordAttrsConversionMap := map[string]string{
"event.name": "event.action",
semconv.AttributeExceptionMessage: "error.message",
semconv.AttributeExceptionStacktrace: "error.stacktrace",
semconv.AttributeExceptionType: "error.type",
semconv.AttributeExceptionEscaped: "event.error.exception.handled",
}
encodeLogAttributesECSMode(&document, record.Attributes(), recordAttrsConversionMap)
encodeLogAttributesECSMode(&document, recordAttrs, recordAttrsConversionMap)

// Handle special cases.
encodeLogAgentNameECSMode(&document, resource)
Expand All @@ -142,21 +171,29 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo
encodeLogTimestampECSMode(&document, record)
document.AddTraceID("trace.id", record.TraceID())
document.AddSpanID("span.id", record.SpanID())
document.AddString("log.level", record.SeverityText())
document.AddAttribute("message", record.Body())
if n := record.SeverityNumber(); n != plog.SeverityNumberUnspecified {
document.AddInt("event.severity", int64(record.SeverityNumber()))
}

document.AddString("log.level", record.SeverityText())

if record.Body().Type() == pcommon.ValueTypeStr {
document.AddAttribute("message", record.Body())
}

return document
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) {
var document objmodel.Document

// Appoximate capacity to reduce allocations
resourceAttrs := resource.Attributes()
scopeAttrs := scope.Attributes()
spanAttrs := span.Attributes()
document.EnsureCapacity(13 + // constant number of fields added
resourceAttrs.Len() +
scopeAttrs.Len() +
spanAttrs.Len() +
objmodel.EstimateEventFields(span.Events()),
)

document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTimestamp("EndTimestamp", span.EndTimestamp())
document.AddTraceID("TraceId", span.TraceID())
Expand All @@ -167,11 +204,16 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, sc
document.AddInt("TraceStatus", int64(span.Status().Code()))
document.AddString("TraceStatusDescription", span.Status().Message())
document.AddString("Link", spanLinksToString(span.Links()))
m.encodeAttributes(&document, span.Attributes())
document.AddAttributes("Resource", resource.Attributes())
m.encodeEvents(&document, span.Events())
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
document.AddAttributes("Scope", scopeToAttributes(scope))
// Add scope name and version as additional scope attributes
// Empty values are also allowed to be added.
document.Add("Scope.name", objmodel.StringValue(scope.Name()))
document.Add("Scope.version", objmodel.StringValue(scope.Version()))

m.encodeEvents(&document, span.Events())
document.AddAttributes("Resource", resourceAttrs)
document.AddAttributes("Scope", scopeAttrs)
m.encodeAttributes(&document, spanAttrs)

if m.dedup {
document.Dedup()
Expand Down Expand Up @@ -220,16 +262,6 @@ func durationAsMicroseconds(start, end time.Time) int64 {
return (end.UnixNano() - start.UnixNano()) / 1000
}

func scopeToAttributes(scope pcommon.InstrumentationScope) pcommon.Map {
attrs := pcommon.NewMap()
attrs.PutStr("name", scope.Name())
attrs.PutStr("version", scope.Version())
for k, v := range scope.Attributes().AsRaw() {
attrs.PutStr(k, v.(string))
}
return attrs
}

func encodeLogAttributesECSMode(document *objmodel.Document, attrs pcommon.Map, conversionMap map[string]string) {
if len(conversionMap) == 0 {
// No conversions to be done; add all attributes at top level of
Expand Down

0 comments on commit d61f5b8

Please sign in to comment.