Skip to content

Commit

Permalink
[exporter/elasticsearch] Flesh out encoding log records with `mapping…
Browse files Browse the repository at this point in the history
….mode: ecs` (open-telemetry#31694)

**Description:** <Describe what has changed.>

This PR is a follow up to
open-telemetry#31553.
In that PR, the foundations were laid for encoding log records with ECS
fields when the user specifies `mapping.mode: ecs` in their exporter
configuration.

This PR adds ECS conversions for several additional fields.

I expect there will be more followup PR(s) to add more conversions,
especially ones that might not be straightforward.

**Link to tracking issue:** open-telemetry#29742 

**Testing:** Added unit tests

**Documentation:** The `mapping.mode: ecs` configuration setting is
already
[documented](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/elasticsearchexporter#configuration-options).

---------

Co-authored-by: Andrzej Stencel <[email protected]>
  • Loading branch information
ycombinator and andrzej-stencel authored May 7, 2024
1 parent 391429f commit 3987074
Show file tree
Hide file tree
Showing 5 changed files with 779 additions and 71 deletions.
27 changes: 27 additions & 0 deletions .chloggen/exporter-elasticsarch-more-ecs-mode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Converts more SemConv fields in OTel events to ECS fields in Elasticsearch documents when `mapping.mode: ecs` is specified."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31694]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](
- `mode` (default=none): The fields naming mode. valid modes are:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields defined in the
[OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/semantic-conventions)
[OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/semantic-conventions) (version 1.22.0)
to [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html). :warning: This mode's behavior is unstable, it is currently undergoing changes
- `raw`: Omit the `Attributes.` string prefixed to field names for log and
span attributes as well as omit the `Events.` string prefixed to
Expand Down
12 changes: 6 additions & 6 deletions exporter/elasticsearchexporter/logs_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestExporter_PushEvent(t *testing.T) {
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)

expected := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","application":"myapp","attrKey1":"abc","attrKey2":"def","error":{"stack_trace":"no no no no"},"message":"hello world","service":{"name":"myservice"}}`
expected := `{"attrKey1":"abc","attrKey2":"def","application":"myapp","service":{"name":"myservice"},"error":{"stacktrace":"no no no no"},"agent":{"name":"otlp"},"@timestamp":"1970-01-01T00:00:00.000000000Z","message":"hello world"}`
actual := string(docs[0].Document)
assert.Equal(t, expected, actual)

Expand All @@ -187,14 +187,14 @@ func TestExporter_PushEvent(t *testing.T) {
mustSendLogsWithAttributes(t, exporter,
// record attrs
map[string]string{
"application": "myapp",
"service.name": "myservice",
"application": "myapp",
"service.name": "myservice",
"exception.stacktrace": "no no no no",
},
// resource attrs
map[string]string{
"attrKey1": "abc",
"attrKey2": "def",
"exception.stacktrace": "no no no no",
"attrKey1": "abc",
"attrKey2": "def",
},
// record body
"hello world",
Expand Down
277 changes: 214 additions & 63 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,49 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
import (
"bytes"
"encoding/json"
"fmt"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.22.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
)

// resourceAttrsConversionMap contains conversions for resource-level attributes
// from their Semantic Conventions (SemConv) names to equivalent Elastic Common
// Schema (ECS) names.
// If the ECS field name is specified as an empty string (""), the converter will
// neither convert the SemConv key to the equivalent ECS name nor pass-through the
// SemConv key as-is to become the ECS name.
var resourceAttrsConversionMap = map[string]string{
semconv.AttributeServiceInstanceID: "service.node.name",
semconv.AttributeDeploymentEnvironment: "service.environment",
semconv.AttributeTelemetrySDKName: "",
semconv.AttributeTelemetrySDKLanguage: "",
semconv.AttributeTelemetrySDKVersion: "",
semconv.AttributeTelemetryDistroName: "",
semconv.AttributeTelemetryDistroVersion: "",
semconv.AttributeCloudPlatform: "cloud.service.name",
semconv.AttributeContainerImageTags: "container.image.tag",
semconv.AttributeHostName: "host.hostname",
semconv.AttributeHostArch: "host.architecture",
semconv.AttributeProcessExecutablePath: "process.executable",
semconv.AttributeProcessRuntimeName: "service.runtime.name",
semconv.AttributeProcessRuntimeVersion: "service.runtime.version",
semconv.AttributeOSName: "host.os.name",
semconv.AttributeOSType: "host.os.platform",
semconv.AttributeOSDescription: "host.os.full",
semconv.AttributeOSVersion: "host.os.version",
"k8s.namespace.name": "kubernetes.namespace",
"k8s.node.name": "kubernetes.node.name",
"k8s.pod.name": "kubernetes.pod.name",
"k8s.pod.uid": "kubernetes.pod.uid",
}

type mappingModel interface {
encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
Expand All @@ -41,82 +74,86 @@ const (

func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) ([]byte, error) {
var document objmodel.Document

switch m.mode {
case MappingECS:
if record.Timestamp() != 0 {
document.AddTimestamp("@timestamp", record.Timestamp())
} else {
document.AddTimestamp("@timestamp", record.ObservedTimestamp())
}

document.AddTraceID("trace.id", record.TraceID())
document.AddSpanID("span.id", record.SpanID())

if n := record.SeverityNumber(); n != plog.SeverityNumberUnspecified {
document.AddInt("event.severity", int64(record.SeverityNumber()))
}

document.AddString("log.level", record.SeverityText())
document = m.encodeLogECSMode(resource, record, scope)
default:
document = m.encodeLogDefaultMode(resource, record, scope)
}

if record.Body().Type() == pcommon.ValueTypeStr {
document.AddAttribute("message", record.Body())
}
var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot)
return buf.Bytes(), err
}

fieldMapper := func(k string) string {
switch k {
case "exception.type":
return "error.type"
case "exception.message":
return "error.message"
case "exception.stacktrace":
return "error.stack_trace"
default:
return k
}
}
func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document {
var document objmodel.Document

resource.Attributes().Range(func(k string, v pcommon.Value) bool {
k = fieldMapper(k)
document.AddAttribute(k, v)
return true
})
scope.Attributes().Range(func(k string, v pcommon.Value) bool {
k = fieldMapper(k)
document.AddAttribute(k, v)
return true
})
record.Attributes().Range(func(k string, v pcommon.Value) bool {
k = fieldMapper(k)
document.AddAttribute(k, v)
return true
})
default:
docTimeStamp := record.Timestamp()
if docTimeStamp.AsTime().UnixNano() == 0 {
docTimeStamp = record.ObservedTimestamp()
}
document.AddTimestamp("@timestamp", docTimeStamp) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTraceID("TraceId", record.TraceID())
document.AddSpanID("SpanId", record.SpanID())
document.AddInt("TraceFlags", int64(record.Flags()))
document.AddString("SeverityText", record.SeverityText())
document.AddInt("SeverityNumber", int64(record.SeverityNumber()))
document.AddAttribute("Body", record.Body())
m.encodeAttributes(&document, record.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddAttributes("Scope", scopeToAttributes(scope))
docTimeStamp := record.Timestamp()
if docTimeStamp.AsTime().UnixNano() == 0 {
docTimeStamp = record.ObservedTimestamp()
}
document.AddTimestamp("@timestamp", docTimeStamp) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTraceID("TraceId", record.TraceID())
document.AddSpanID("SpanId", record.SpanID())
document.AddInt("TraceFlags", int64(record.Flags()))
document.AddString("SeverityText", record.SeverityText())
document.AddInt("SeverityNumber", int64(record.SeverityNumber()))
document.AddAttribute("Body", record.Body())
m.encodeAttributes(&document, record.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddAttributes("Scope", scopeToAttributes(scope))

if m.dedup {
document.Dedup()
} else if m.dedot {
document.Sort()
}

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot)
return buf.Bytes(), err
return document

}

func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document {
var document objmodel.Document

// First, try to map resource-level attributes to ECS fields.
encodeLogAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap)

// Then, try to map scope-level attributes to ECS fields.
scopeAttrsConversionMap := map[string]string{
// None at the moment
}
encodeLogAttributesECSMode(&document, scope.Attributes(), scopeAttrsConversionMap)

// Finally, try to map record-level attributes to ECS fields.
recordAttrsConversionMap := map[string]string{
"event.name": "event.action",
semconv.AttributeExceptionMessage: "error.message",
semconv.AttributeExceptionStacktrace: "error.stacktrace",
semconv.AttributeExceptionType: "error.type",
semconv.AttributeExceptionEscaped: "event.error.exception.handled",
}
encodeLogAttributesECSMode(&document, record.Attributes(), recordAttrsConversionMap)

// Handle special cases.
encodeLogAgentNameECSMode(&document, resource)
encodeLogAgentVersionECSMode(&document, resource)
encodeLogHostOsTypeECSMode(&document, resource)
encodeLogTimestampECSMode(&document, record)
document.AddTraceID("trace.id", record.TraceID())
document.AddSpanID("span.id", record.SpanID())
if n := record.SeverityNumber(); n != plog.SeverityNumberUnspecified {
document.AddInt("event.severity", int64(record.SeverityNumber()))
}

document.AddString("log.level", record.SeverityText())

if record.Body().Type() == pcommon.ValueTypeStr {
document.AddAttribute("message", record.Body())
}

return document
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) {
Expand Down Expand Up @@ -193,3 +230,117 @@ func scopeToAttributes(scope pcommon.InstrumentationScope) pcommon.Map {
}
return attrs
}

func encodeLogAttributesECSMode(document *objmodel.Document, attrs pcommon.Map, conversionMap map[string]string) {
if len(conversionMap) == 0 {
// No conversions to be done; add all attributes at top level of
// document.
document.AddAttributes("", attrs)
return
}

attrs.Range(func(k string, v pcommon.Value) bool {
// If ECS key is found for current k in conversion map, use it.
if ecsKey, exists := conversionMap[k]; exists {
if ecsKey == "" {
// Skip the conversion for this k.
return true
}

document.AddAttribute(ecsKey, v)
return true
}

// Otherwise, add key at top level with attribute name as-is.
document.AddAttribute(k, v)
return true
})
}

func encodeLogAgentNameECSMode(document *objmodel.Document, resource pcommon.Resource) {
// Parse out telemetry SDK name, language, and distro name from resource
// attributes, setting defaults as needed.
telemetrySdkName := "otlp"
var telemetrySdkLanguage, telemetryDistroName string

attrs := resource.Attributes()
if v, exists := attrs.Get(semconv.AttributeTelemetrySDKName); exists {
telemetrySdkName = v.Str()
}
if v, exists := attrs.Get(semconv.AttributeTelemetrySDKLanguage); exists {
telemetrySdkLanguage = v.Str()
}
if v, exists := attrs.Get(semconv.AttributeTelemetryDistroName); exists {
telemetryDistroName = v.Str()
if telemetrySdkLanguage == "" {
telemetrySdkLanguage = "unknown"
}
}

// Construct agent name from telemetry SDK name, language, and distro name.
agentName := telemetrySdkName
if telemetryDistroName != "" {
agentName = fmt.Sprintf("%s/%s/%s", agentName, telemetrySdkLanguage, telemetryDistroName)
} else if telemetrySdkLanguage != "" {
agentName = fmt.Sprintf("%s/%s", agentName, telemetrySdkLanguage)
}

// Set agent name in document.
document.AddString("agent.name", agentName)
}

func encodeLogAgentVersionECSMode(document *objmodel.Document, resource pcommon.Resource) {
attrs := resource.Attributes()

if telemetryDistroVersion, exists := attrs.Get(semconv.AttributeTelemetryDistroVersion); exists {
document.AddString("agent.version", telemetryDistroVersion.Str())
return
}

if telemetrySdkVersion, exists := attrs.Get(semconv.AttributeTelemetrySDKVersion); exists {
document.AddString("agent.version", telemetrySdkVersion.Str())
return
}
}

func encodeLogHostOsTypeECSMode(document *objmodel.Document, resource pcommon.Resource) {
// https://www.elastic.co/guide/en/ecs/current/ecs-os.html#field-os-type:
//
// "One of these following values should be used (lowercase): linux, macos, unix, windows.
// If the OS you’re dealing with is not in the list, the field should not be populated."

var ecsHostOsType string
if semConvOsType, exists := resource.Attributes().Get(semconv.AttributeOSType); exists {
switch semConvOsType.Str() {
case "windows", "linux":
ecsHostOsType = semConvOsType.Str()
case "darwin":
ecsHostOsType = "macos"
case "aix", "hpux", "solaris":
ecsHostOsType = "unix"
}
}

if semConvOsName, exists := resource.Attributes().Get(semconv.AttributeOSName); exists {
switch semConvOsName.Str() {
case "Android":
ecsHostOsType = "android"
case "iOS":
ecsHostOsType = "ios"
}
}

if ecsHostOsType == "" {
return
}
document.AddString("host.os.type", ecsHostOsType)
}

func encodeLogTimestampECSMode(document *objmodel.Document, record plog.LogRecord) {
if record.Timestamp() != 0 {
document.AddTimestamp("@timestamp", record.Timestamp())
return
}

document.AddTimestamp("@timestamp", record.ObservedTimestamp())
}
Loading

0 comments on commit 3987074

Please sign in to comment.