diff --git a/.chloggen/add-logs-opensearch-exporter.yaml b/.chloggen/add-logs-opensearch-exporter.yaml new file mode 100644 index 000000000000..cf23fcf6ae89 --- /dev/null +++ b/.chloggen/add-logs-opensearch-exporter.yaml @@ -0,0 +1,10 @@ +# Use this changelog template to create an entry for release notes. + +change_type: enhancement +component: opensearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add log exporting capability to the opensearchexporter. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23611] \ No newline at end of file diff --git a/exporter/opensearchexporter/README.md b/exporter/opensearchexporter/README.md index e048b4577d05..4b01d248c9c1 100644 --- a/exporter/opensearchexporter/README.md +++ b/exporter/opensearchexporter/README.md @@ -3,7 +3,7 @@ | Status | | | ------------- |-----------| -| Stability | [development]: traces | +| Stability | [development]: logs, traces | | Distributions | [] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fopensearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fopensearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fopensearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fopensearch) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@Aneurysm9](https://www.github.com/Aneurysm9), [@MitchellGale](https://www.github.com/MitchellGale), [@MaxKsyunz](https://www.github.com/MaxKsyunz), [@YANG-DB](https://www.github.com/YANG-DB) | @@ -32,6 +32,9 @@ Supports standard TLS settings as part of HTTP settings. See [TLS Configuration/ ### Timeout Options - `timeout` : See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) + +### Bulk Indexer Options +- `bulk_action` (optional): the [action](https://opensearch.org/docs/2.9/api-reference/document-apis/bulk/) for ingesting data. Only `create` and `index` are allowed here. ## Example ```yaml diff --git a/exporter/opensearchexporter/config.go b/exporter/opensearchexporter/config.go index 891a21212159..9fb127748eb4 100644 --- a/exporter/opensearchexporter/config.go +++ b/exporter/opensearchexporter/config.go @@ -5,6 +5,7 @@ package opensearchexporter // import "github.com/open-telemetry/opentelemetry-co import ( "errors" + "strings" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -16,6 +17,12 @@ const ( // defaultDataset value is used as ssoTracesExporter.Dataset when component.Config.Dataset is not set. defaultDataset = "default" + + // defaultBulkAction value is used when component.Config.BulkAction is not set. + defaultBulkAction = "create" + + // defaultMappingMode value is used when component.Config.MappingSettings.Mode is not set. + defaultMappingMode = "ss4o" ) // Config defines configuration for OpenSearch exporter. @@ -23,27 +30,122 @@ type Config struct { confighttp.HTTPClientSettings `mapstructure:"http"` exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` exporterhelper.TimeoutSettings `mapstructure:",squash"` - Namespace string `mapstructure:"namespace"` - Dataset string `mapstructure:"dataset"` + MappingsSettings `mapstructure:"mapping"` + + // The Observability indices would follow the recommended for immutable data stream ingestion pattern using + // the data_stream concepts. See https://opensearch.org/docs/latest/dashboards/im-dashboards/datastream/ + // Index pattern will follow the next naming template ss4o_{type}-{dataset}-{namespace} + Dataset string `mapstructure:"dataset"` + Namespace string `mapstructure:"namespace"` + + // LogsIndex configures the index, index alias, or data stream name logs should be indexed in. + // https://opensearch.org/docs/latest/im-plugin/index/ + // https://opensearch.org/docs/latest/dashboards/im-dashboards/datastream/ + LogsIndex string `mapstructure:"logs_index"` + + // BulkAction configures the action for ingesting data. Only `create` and `index` are allowed here. + // If not specified, the default value `create` will be used. + BulkAction string `mapstructure:"bulk_action"` } var ( - errConfigNoEndpoint = errors.New("endpoint must be specified") - errDatasetNoValue = errors.New("dataset must be specified") - errNamespaceNoValue = errors.New("namespace must be specified") + errConfigNoEndpoint = errors.New("endpoint must be specified") + errDatasetNoValue = errors.New("dataset must be specified") + errNamespaceNoValue = errors.New("namespace must be specified") + errBulkActionInvalid = errors.New("bulk_action can either be `create` or `index`") + errMappingModeInvalid = errors.New("mapping.mode is invalid") ) +type MappingsSettings struct { + // Mode configures the field mappings. + // Supported modes are the following: + // + // ss4o: exports logs in the Simple Schema for Observability standard. + // This mode is enabled by default. + // See: https://opensearch.org/docs/latest/observing-your-data/ss4o/ + // + // ecs: maps fields defined in the OpenTelemetry Semantic Conventions + // to the Elastic Common Schema. + // See: https://www.elastic.co/guide/en/ecs/current/index.html + // + // flatten_attributes: uses the ECS mapping but flattens all resource and + // log attributes in the record to the top-level. + Mode string `mapstructure:"mode"` + + // Additional field mappings. + Fields map[string]string `mapstructure:"fields"` + + // File to read additional fields mappings from. + File string `mapstructure:"file"` + + // Field to store timestamp in. If not set uses the default @timestamp + TimestampField string `mapstructure:"timestamp_field"` + + // Whether to store timestamp in Epoch miliseconds + UnixTimestamp bool `mapstructure:"unix_timestamp"` + + // Try to find and remove duplicate fields + Dedup bool `mapstructure:"dedup"` + + Dedot bool `mapstructure:"dedot"` +} + +type MappingMode int + +// Enum values for MappingMode. +const ( + MappingSS4O MappingMode = iota + MappingECS + MappingFlattenAttributes +) + +func (m MappingMode) String() string { + switch m { + case MappingSS4O: + return "ss4o" + case MappingECS: + return "ecs" + case MappingFlattenAttributes: + return "flatten_attributes" + default: + return "ss4o" + } +} + +var mappingModes = func() map[string]MappingMode { + table := map[string]MappingMode{} + for _, m := range []MappingMode{ + MappingECS, + MappingSS4O, + MappingFlattenAttributes, + } { + table[strings.ToLower(m.String())] = m + } + + return table +}() + // Validate validates the opensearch server configuration. func (cfg *Config) Validate() error { var multiErr []error if len(cfg.Endpoint) == 0 { multiErr = append(multiErr, errConfigNoEndpoint) } + if len(cfg.Dataset) == 0 { multiErr = append(multiErr, errDatasetNoValue) } if len(cfg.Namespace) == 0 { multiErr = append(multiErr, errNamespaceNoValue) } + + if cfg.BulkAction != "create" && cfg.BulkAction != "index" { + return errBulkActionInvalid + } + + if _, ok := mappingModes[cfg.MappingsSettings.Mode]; !ok { + multiErr = append(multiErr, errMappingModeInvalid) + } + return errors.Join(multiErr...) } diff --git a/exporter/opensearchexporter/config_test.go b/exporter/opensearchexporter/config_test.go index 1993f94ea3a8..4a6f27ff8e03 100644 --- a/exporter/opensearchexporter/config_test.go +++ b/exporter/opensearchexporter/config_test.go @@ -29,6 +29,7 @@ func TestLoadConfig(t *testing.T) { sampleEndpoint := "https://opensearch.example.com:9200" sampleCfg := withDefaultConfig(func(config *Config) { config.Endpoint = sampleEndpoint + config.BulkAction = defaultBulkAction }) maxIdleConns := 100 idleConnTimeout := 90 * time.Second @@ -66,6 +67,10 @@ func TestLoadConfig(t *testing.T) { Multiplier: 1.5, RandomizationFactor: 0.5, }, + BulkAction: defaultBulkAction, + MappingsSettings: MappingsSettings{ + Mode: "ss4o", + }, }, configValidateAssert: assert.NoError, }, @@ -91,6 +96,16 @@ func TestLoadConfig(t *testing.T) { return assert.ErrorContains(t, err, errNamespaceNoValue.Error()) }, }, + { + id: component.NewIDWithName(metadata.Type, "invalid_bulk_action"), + expected: withDefaultConfig(func(config *Config) { + config.Endpoint = sampleEndpoint + config.BulkAction = "delete" + }), + configValidateAssert: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.ErrorContains(t, err, errBulkActionInvalid.Error()) + }, + }, } for _, tt := range tests { diff --git a/exporter/opensearchexporter/encoder.go b/exporter/opensearchexporter/encoder.go new file mode 100644 index 000000000000..bc64ea1e9916 --- /dev/null +++ b/exporter/opensearchexporter/encoder.go @@ -0,0 +1,223 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package opensearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter" + +import ( + "bytes" + "encoding/json" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter/internal/objmodel" +) + +type mappingModel interface { + encodeLog(resource pcommon.Resource, + scope pcommon.InstrumentationScope, + schemaURL string, + record plog.LogRecord) ([]byte, error) + encodeTrace(resource pcommon.Resource, + scope pcommon.InstrumentationScope, + schemaURL string, + record ptrace.Span) ([]byte, error) +} + +// encodeModel supports multiple encoding OpenTelemetry signals to multiple schemas. +type encodeModel struct { + dedup bool + dedot bool + sso bool + flattenAttributes bool + timestampField string + unixTime bool + + dataset string + namespace string +} + +func (m *encodeModel) encodeLog(resource pcommon.Resource, + scope pcommon.InstrumentationScope, + schemaURL string, + record plog.LogRecord) ([]byte, error) { + if m.sso { + return m.encodeLogSSO(resource, scope, schemaURL, record) + } + + return m.encodeLogDataModel(resource, record) +} + +// encodeLogSSO encodes a plog.LogRecord following the Simple Schema for Observability. +// See: https://github.com/opensearch-project/opensearch-catalog/tree/main/docs/schema/observability +func (m *encodeModel) encodeLogSSO( + resource pcommon.Resource, + scope pcommon.InstrumentationScope, + schemaURL string, + record plog.LogRecord, +) ([]byte, error) { + sso := ssoRecord{} + sso.Attributes = record.Attributes().AsRaw() + sso.Body = record.Body().AsString() + + now := time.Now() + ts := record.Timestamp().AsTime() + sso.ObservedTimestamp = &now + sso.Timestamp = &ts + + sso.Resource = attributesToMapString(resource.Attributes()) + sso.SchemaURL = schemaURL + sso.SpanID = record.SpanID().String() + sso.TraceID = record.TraceID().String() + + ds := dataStream{} + if m.dataset != "" { + ds.Dataset = m.dataset + } + + if m.namespace != "" { + ds.Namespace = m.namespace + } + + if ds != (dataStream{}) { + ds.Type = "record" + sso.Attributes["data_stream"] = ds + } + + sso.InstrumentationScope.Name = scope.Name() + sso.InstrumentationScope.Version = scope.Version() + sso.InstrumentationScope.SchemaURL = schemaURL + sso.InstrumentationScope.Attributes = scope.Attributes().AsRaw() + + sso.Severity.Text = record.SeverityText() + sso.Severity.Number = int64(record.SeverityNumber()) + + return json.Marshal(sso) +} + +// encodeLogDataModel encodes a plog.LogRecord following the Log Data Model. +// See: https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md +func (m *encodeModel) encodeLogDataModel(resource pcommon.Resource, record plog.LogRecord) ([]byte, error) { + var document objmodel.Document + if m.flattenAttributes { + document = objmodel.DocumentFromAttributes(resource.Attributes()) + } else { + document.AddAttributes("Attributes", resource.Attributes()) + } + timestampField := "@timestamp" + + if m.timestampField != "" { + timestampField = m.timestampField + } + + if m.unixTime { + document.AddInt(timestampField, epochMilliTimestamp(record)) + } else { + document.AddTimestamp(timestampField, record.Timestamp()) + } + document.AddTraceID("TraceId", record.TraceID()) + document.AddSpanID("SpanId", record.SpanID()) + document.AddInt("TraceFlags", int64(record.Flags())) + document.AddString("SeverityText", record.SeverityText()) + document.AddInt("SeverityNumber", int64(record.SeverityNumber())) + document.AddAttribute("Body", record.Body()) + if m.flattenAttributes { + document.AddAttributes("", record.Attributes()) + } else { + document.AddAttributes("Attributes", record.Attributes()) + } + + if m.dedup { + document.Dedup() + } else if m.dedot { + document.Sort() + } + + var buf bytes.Buffer + err := document.Serialize(&buf, m.dedot) + return buf.Bytes(), err +} + +// encodeTrace encodes a ptrace.Span following the Simple Schema For Observability +// See: https://github.com/opensearch-project/opensearch-catalog/tree/main/docs/schema/observability +func (m *encodeModel) encodeTrace( + resource pcommon.Resource, + scope pcommon.InstrumentationScope, + schemaURL string, + span ptrace.Span, +) ([]byte, error) { + sso := ssoSpan{} + sso.Attributes = span.Attributes().AsRaw() + sso.DroppedAttributesCount = span.DroppedAttributesCount() + sso.DroppedEventsCount = span.DroppedEventsCount() + sso.DroppedLinksCount = span.DroppedLinksCount() + sso.EndTime = span.EndTimestamp().AsTime() + sso.Kind = span.Kind().String() + sso.Name = span.Name() + sso.ParentSpanID = span.ParentSpanID().String() + sso.Resource = attributesToMapString(resource.Attributes()) + sso.SpanID = span.SpanID().String() + sso.StartTime = span.StartTimestamp().AsTime() + sso.Status.Code = span.Status().Code().String() + sso.Status.Message = span.Status().Message() + sso.TraceID = span.TraceID().String() + sso.TraceState = span.TraceState().AsRaw() + + if span.Events().Len() > 0 { + sso.Events = make([]ssoSpanEvent, span.Events().Len()) + for i := 0; i < span.Events().Len(); i++ { + e := span.Events().At(i) + ssoEvent := &sso.Events[i] + ssoEvent.Attributes = e.Attributes().AsRaw() + ssoEvent.DroppedAttributesCount = e.DroppedAttributesCount() + ssoEvent.Name = e.Name() + ts := e.Timestamp().AsTime() + if ts.Unix() != 0 { + ssoEvent.Timestamp = &ts + } else { + now := time.Now() + ssoEvent.ObservedTimestamp = &now + } + } + } + + ds := dataStream{} + if m.dataset != "" { + ds.Dataset = m.dataset + } + + if m.namespace != "" { + ds.Namespace = m.namespace + } + + if ds != (dataStream{}) { + ds.Type = "span" + sso.Attributes["data_stream"] = ds + } + + sso.InstrumentationScope.Name = scope.Name() + sso.InstrumentationScope.DroppedAttributesCount = scope.DroppedAttributesCount() + sso.InstrumentationScope.Version = scope.Version() + sso.InstrumentationScope.SchemaURL = schemaURL + sso.InstrumentationScope.Attributes = scope.Attributes().AsRaw() + + if span.Links().Len() > 0 { + sso.Links = make([]ssoSpanLinks, span.Links().Len()) + for i := 0; i < span.Links().Len(); i++ { + link := span.Links().At(i) + ssoLink := &sso.Links[i] + ssoLink.Attributes = link.Attributes().AsRaw() + ssoLink.DroppedAttributesCount = link.DroppedAttributesCount() + ssoLink.TraceID = link.TraceID().String() + ssoLink.TraceState = link.TraceState().AsRaw() + ssoLink.SpanID = link.SpanID().String() + } + } + return json.Marshal(sso) +} + +func epochMilliTimestamp(record plog.LogRecord) int64 { + return record.Timestamp().AsTime().UnixMilli() +} diff --git a/exporter/opensearchexporter/factory.go b/exporter/opensearchexporter/factory.go index 500948763b16..b4bfdd2e359d 100644 --- a/exporter/opensearchexporter/factory.go +++ b/exporter/opensearchexporter/factory.go @@ -23,15 +23,18 @@ func NewFactory() exporter.Factory { metadata.Type, newDefaultConfig, exporter.WithTraces(createTracesExporter, metadata.TracesStability), + exporter.WithLogs(createLogsExporter, metadata.LogsStability), ) } func newDefaultConfig() component.Config { return &Config{ HTTPClientSettings: confighttp.NewDefaultHTTPClientSettings(), - Namespace: defaultNamespace, Dataset: defaultDataset, + Namespace: defaultNamespace, + BulkAction: defaultBulkAction, RetrySettings: exporterhelper.NewDefaultRetrySettings(), + MappingsSettings: MappingsSettings{Mode: defaultMappingMode}, } } @@ -51,3 +54,20 @@ func createTracesExporter(ctx context.Context, exporterhelper.WithRetry(c.RetrySettings), exporterhelper.WithTimeout(c.TimeoutSettings)) } + +func createLogsExporter(ctx context.Context, + set exporter.CreateSettings, + cfg component.Config) (exporter.Logs, error) { + c := cfg.(*Config) + le, e := newLogExporter(c, set) + if e != nil { + return nil, e + } + + return exporterhelper.NewLogsExporter(ctx, set, cfg, + le.pushLogData, + exporterhelper.WithStart(le.Start), + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), + exporterhelper.WithRetry(c.RetrySettings), + exporterhelper.WithTimeout(c.TimeoutSettings)) +} diff --git a/exporter/opensearchexporter/factory_test.go b/exporter/opensearchexporter/factory_test.go index 9331e616d1bf..3b1124928fb7 100644 --- a/exporter/opensearchexporter/factory_test.go +++ b/exporter/opensearchexporter/factory_test.go @@ -25,7 +25,7 @@ func TestFactory_CreateMetricsExporter_Fail(t *testing.T) { cfg := factory.CreateDefaultConfig() params := exportertest.NewNopCreateSettings() _, err := factory.CreateMetricsExporter(context.Background(), params, cfg) - require.Error(t, err, "expected an error when creating a traces exporter") + require.Error(t, err, "expected an error when creating a metrics exporter") } func TestFactory_CreateTracesExporter_Fail(t *testing.T) { @@ -36,6 +36,14 @@ func TestFactory_CreateTracesExporter_Fail(t *testing.T) { require.Error(t, err, "expected an error when creating a traces exporter") } +func TestFactory_CreateLogsExporter_Fail(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + params := exportertest.NewNopCreateSettings() + _, err := factory.CreateLogsExporter(context.Background(), params, cfg) + require.Error(t, err, "expected an error when creating a logs exporter") +} + func TestFactory_CreateTracesExporter(t *testing.T) { factory := NewFactory() cfg := withDefaultConfig(func(cfg *Config) { @@ -48,3 +56,16 @@ func TestFactory_CreateTracesExporter(t *testing.T) { require.NoError(t, exporter.Shutdown(context.TODO())) } + +func TestFactory_CreateLogsExporter(t *testing.T) { + factory := NewFactory() + cfg := withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://opensearch.example.com:9200" + }) + params := exportertest.NewNopCreateSettings() + exporter, err := factory.CreateLogsExporter(context.Background(), params, cfg) + require.NoError(t, err) + require.NotNil(t, exporter) + + require.NoError(t, exporter.Shutdown(context.TODO())) +} diff --git a/exporter/opensearchexporter/go.mod b/exporter/opensearchexporter/go.mod index 4cbc8aacb08e..bc387f5aec81 100644 --- a/exporter/opensearchexporter/go.mod +++ b/exporter/opensearchexporter/go.mod @@ -14,12 +14,18 @@ require ( go.opentelemetry.io/collector/exporter v0.88.0 go.opentelemetry.io/collector/pdata v1.0.0-rcv0017 go.uber.org/zap v1.26.0 - gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.88.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/elastic/go-structform v0.0.10 github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/logr v1.2.4 // indirect @@ -37,6 +43,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.86.0 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.10.1 // indirect go.opencensus.io v0.24.0 // indirect @@ -60,3 +67,9 @@ require ( google.golang.org/grpc v1.59.0 // indirect google.golang.org/protobuf v1.31.0 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil diff --git a/exporter/opensearchexporter/go.sum b/exporter/opensearchexporter/go.sum index 4df2de52e5ec..08a14110c7a4 100644 --- a/exporter/opensearchexporter/go.sum +++ b/exporter/opensearchexporter/go.sum @@ -19,11 +19,14 @@ github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqy github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/go-structform v0.0.10 h1:oy08o/Ih2hHTkNcRY/1HhaYvIp5z6t8si8gnCJPDo1w= +github.com/elastic/go-structform v0.0.10/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -116,6 +119,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= diff --git a/exporter/opensearchexporter/integration_test.go b/exporter/opensearchexporter/integration_test.go index 2354309f895b..9c3fdfff29c9 100644 --- a/exporter/opensearchexporter/integration_test.go +++ b/exporter/opensearchexporter/integration_test.go @@ -16,11 +16,11 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/pdata/ptrace" - "gopkg.in/yaml.v3" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" ) -func TestOpenSearchExporter(t *testing.T) { +func TestOpenSearchTraceExporter(t *testing.T) { type requestHandler struct { ValidateReceivedDocuments func(*testing.T, int, []map[string]any) ResponseJSONPath string @@ -98,7 +98,7 @@ func TestOpenSearchExporter(t *testing.T) { strMap := jsonData.(map[string]any) if actionData, isBulkAction := strMap["create"]; isBulkAction { - validateBulkAction(t, actionData.(map[string]any)) + validateBulkAction(t, "ss4o_traces-default-namespace", actionData.(map[string]any)) } else { rtn = append(rtn, strMap) } @@ -138,7 +138,7 @@ func TestOpenSearchExporter(t *testing.T) { require.NoError(t, err) // Load sample data - traces, err := readTraces(tc.TracePath) + traces, err := golden.ReadTraces(tc.TracePath) require.NoError(t, err) // Send it @@ -150,27 +150,139 @@ func TestOpenSearchExporter(t *testing.T) { } } -// validateBulkAction ensures the action JSON object is to the correct index. -func validateBulkAction(t *testing.T, strMap map[string]any) { - val, exists := strMap["_index"] - require.True(t, exists) - require.Equal(t, val, "ss4o_traces-default-namespace") -} +func TestOpenSearchLogExporter(t *testing.T) { + type requestHandler struct { + ValidateReceivedDocuments func(*testing.T, int, []map[string]any) + ResponseJSONPath string + } -// readTraces loads a yaml file at given filePath and converts the content to ptrace.Traces -func readTraces(filePath string) (ptrace.Traces, error) { - b, err := os.ReadFile(filePath) - if err != nil { - return ptrace.Traces{}, err + checkAndRespond := func(responsePath string) requestHandler { + pass := func(t *testing.T, _ int, docs []map[string]any) { + for _, doc := range docs { + require.NotEmpty(t, doc) + } + } + return requestHandler{pass, responsePath} + } + tests := []struct { + Label string + LogPath string + RequestHandlers []requestHandler + ValidateExporterReturn func(error) + }{ + { + "Round trip", + "testdata/logs-sample-a.yaml", + []requestHandler{ + checkAndRespond("testdata/opensearch-response-no-error.json"), + }, + func(err error) { + require.NoError(t, err) + }, + }, + { + "Permanent error", + "testdata/logs-sample-a.yaml", + []requestHandler{ + checkAndRespond("testdata/opensearch-response-permanent-error.json"), + }, + func(err error) { + require.True(t, consumererror.IsPermanent(err)) + }, + }, + { + "Retryable error", + "testdata/logs-sample-a.yaml", + []requestHandler{ + checkAndRespond("testdata/opensearch-response-retryable-error.json"), + checkAndRespond("testdata/opensearch-response-retryable-succeeded.json"), + }, + func(err error) { + require.NoError(t, err) + }, + }, + + { + "Retryable error, succeeds on second try", + "testdata/logs-sample-a.yaml", + []requestHandler{ + checkAndRespond("testdata/opensearch-response-retryable-error.json"), + checkAndRespond("testdata/opensearch-response-retryable-error-2-attempt.json"), + checkAndRespond("testdata/opensearch-response-retryable-succeeded.json"), + }, + func(err error) { + require.NoError(t, err) + }, + }, } - var m map[string]interface{} - if err = yaml.Unmarshal(b, &m); err != nil { - return ptrace.Traces{}, err + + getReceivedDocuments := func(body io.ReadCloser) []map[string]any { + var rtn []map[string]any + var err error + decoder := json.NewDecoder(body) + for decoder.More() { + var jsonData any + err = decoder.Decode(&jsonData) + require.NoError(t, err) + require.NotNil(t, jsonData) + + strMap := jsonData.(map[string]any) + if actionData, isBulkAction := strMap["create"]; isBulkAction { + validateBulkAction(t, "ss4o_logs-default-namespace", actionData.(map[string]any)) + } else { + rtn = append(rtn, strMap) + } + } + return rtn } - b, err = json.Marshal(m) - if err != nil { - return ptrace.Traces{}, err + + for _, tc := range tests { + // Create HTTP listener + var requestCount = 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + docs := getReceivedDocuments(r.Body) + require.LessOrEqualf(t, requestCount, len(tc.RequestHandlers), "Test case generated more requests than it has response for.") + tc.RequestHandlers[requestCount].ValidateReceivedDocuments(t, requestCount, docs) + + w.WriteHeader(200) + response, _ := os.ReadFile(tc.RequestHandlers[requestCount].ResponseJSONPath) + _, err = w.Write(response) + require.NoError(t, err) + + requestCount++ + })) + + cfg := withDefaultConfig(func(config *Config) { + config.Endpoint = ts.URL + config.TimeoutSettings.Timeout = 0 + }) + + // Create exporter + f := NewFactory() + exporter, err := f.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + + // Initialize the exporter + err = exporter.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + // Load sample data + logs, err := golden.ReadLogs(tc.LogPath) + require.NoError(t, err) + + // Send it + err = exporter.ConsumeLogs(context.Background(), logs) + tc.ValidateExporterReturn(err) + err = exporter.Shutdown(context.Background()) + require.NoError(t, err) + ts.Close() } - unmarshaler := ptrace.JSONUnmarshaler{} - return unmarshaler.UnmarshalTraces(b) +} + +// validateBulkAction ensures the JSON object is to the correct index. +func validateBulkAction(t *testing.T, expectedIndex string, strMap map[string]any) { + val, exists := strMap["_index"] + require.True(t, exists) + require.Equal(t, expectedIndex, val) } diff --git a/exporter/opensearchexporter/internal/metadata/generated_status.go b/exporter/opensearchexporter/internal/metadata/generated_status.go index 5e3920c28434..fb407b74435c 100644 --- a/exporter/opensearchexporter/internal/metadata/generated_status.go +++ b/exporter/opensearchexporter/internal/metadata/generated_status.go @@ -8,5 +8,6 @@ import ( const ( Type = "opensearch" + LogsStability = component.StabilityLevelDevelopment TracesStability = component.StabilityLevelDevelopment ) diff --git a/exporter/opensearchexporter/internal/objmodel/objmodel.go b/exporter/opensearchexporter/internal/objmodel/objmodel.go new file mode 100644 index 000000000000..efccbdc9f59e --- /dev/null +++ b/exporter/opensearchexporter/internal/objmodel/objmodel.go @@ -0,0 +1,535 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// The objmodel package provides tools for converting OpenTelemetry Log records into +// JSON documents. +// +// The JSON parsing in OpenSearch does not support parsing JSON documents +// with duplicate fields. The fields in the docuemt can be sort and duplicate entries +// can be removed before serializing. Deduplication ensures that ambigious +// events can still be indexed. +// +// With attributes map encoded as a list of key value +// pairs, we might find some structured loggers that create log records with +// duplicate fields. Although the AttributeMap wrapper tries to give a +// dictionary like view into the list, it is not 'complete'. When iterating the map +// for encoding, we still will encounter the duplicates. +// The AttributeMap helpers treat the first occurrence as the actual field. +// For high-performance structured loggers (e.g. zap) the AttributeMap +// semantics are not necessarily correct. Most often the last occurrence will be +// what we want to export, as the last occurrence represents the last overwrite +// within a context/dictionary (the leaf-logger its context). +// Some Loggers might even allow users to create a mix of dotted and dedotted fields. +// The Document type also tries to combine these into a proper structure, such that these mixed +// representations have a unique encoding only, which allows us to properly remove duplicates. +// +// The `.` is special to OpenSearch. In order to handle common prefixes and attributes +// being a mix of key value pairs with dots and complex objects, we flatten the document first +// before we deduplicate. Final dedotting is optional and only required when +// Ingest Node is used. But either way, we try to present only well formed +// document to OpenSearch. + +// nolint:errcheck +package objmodel // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter/internal/objmodel" + +import ( + "encoding/hex" + "io" + "math" + "sort" + "strings" + "time" + + "github.com/elastic/go-structform" + "github.com/elastic/go-structform/json" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// Document is an intermediate representation for converting open telemetry records with arbitrary attributes +// into a JSON document that can be processed by OpenSearch. +type Document struct { + fields []field +} + +type field struct { + key string + value Value +} + +// Value type that can be added to a Document. +type Value struct { + kind Kind + primitive uint64 + dbl float64 + str string + arr []Value + doc Document + ts time.Time +} + +// Kind represent the internal kind of a value stored in a Document. +type Kind uint8 + +// Enum values for Kind. +const ( + KindNil Kind = iota + KindBool + KindInt + KindDouble + KindString + KindArr + KindObject + KindTimestamp + KindIgnore +) + +const tsLayout = "2006-01-02T15:04:05.000000000Z" + +var nilValue = Value{kind: KindNil} +var ignoreValue = Value{kind: KindIgnore} + +// DocumentFromAttributes creates a document from a OpenTelemetry attribute +// map. All nested maps will be flattened, with keys being joined using a `.` symbol. +func DocumentFromAttributes(am pcommon.Map) Document { + return DocumentFromAttributesWithPath("", am) +} + +// DocumentFromAttributesWithPath creates a document from a OpenTelemetry attribute +// map. All nested maps will be flattened, with keys being joined using a `.` symbol. +// +// All keys in the map will be prefixed with path. +func DocumentFromAttributesWithPath(path string, am pcommon.Map) Document { + if am.Len() == 0 { + return Document{} + } + + fields := make([]field, 0, am.Len()) + fields = appendAttributeFields(fields, path, am) + return Document{ + fields: fields, + } +} + +// AddTimestamp adds a raw timestamp value to the Document. +func (doc *Document) AddTimestamp(key string, ts pcommon.Timestamp) { + doc.Add(key, TimestampValue(ts.AsTime())) +} + +// Add adds a converted value to the document. +func (doc *Document) Add(key string, v Value) { + doc.fields = append(doc.fields, field{key: key, value: v}) +} + +// AddString adds a string to the document. +func (doc *Document) AddString(key string, v string) { + if v != "" { + doc.Add(key, StringValue(v)) + } +} + +// AddSpanID adds the hex presentation of a SpanID to the document. If the SpanID +// is empty, no value will be added. +func (doc *Document) AddSpanID(key string, id pcommon.SpanID) { + if !id.IsEmpty() { + doc.AddString(key, hex.EncodeToString(id[:])) + } +} + +// AddTraceID adds the hex presentation of a TraceID value to the document. If the TraceID +// is empty, no value will be added. +func (doc *Document) AddTraceID(key string, id pcommon.TraceID) { + if !id.IsEmpty() { + doc.AddString(key, hex.EncodeToString(id[:])) + } +} + +// AddInt adds an integer value to the document. +func (doc *Document) AddInt(key string, value int64) { + doc.Add(key, IntValue(value)) +} + +// AddAttributes expands and flattens all key-value pairs from the input attribute map into +// the document. +func (doc *Document) AddAttributes(key string, attributes pcommon.Map) { + doc.fields = appendAttributeFields(doc.fields, key, attributes) +} + +// AddAttribute converts and adds a AttributeValue to the document. If the attribute represents a map, +// the fields will be flattened. +func (doc *Document) AddAttribute(key string, attribute pcommon.Value) { + switch attribute.Type() { + case pcommon.ValueTypeEmpty: + // do not add 'null' + case pcommon.ValueTypeMap: + doc.AddAttributes(key, attribute.Map()) + default: + doc.Add(key+".value", ValueFromAttribute(attribute)) + } +} + +// Sort sorts all fields in the document by key name. +func (doc *Document) Sort() { + sort.SliceStable(doc.fields, func(i, j int) bool { + return doc.fields[i].key < doc.fields[j].key + }) + + for i := range doc.fields { + fld := &doc.fields[i] + fld.value.Sort() + } +} + +// Dedup removes fields from the document, that have duplicate keys. +// The filtering only keeps the last value for a key. +// +// Dedup ensure that keys are sorted. +func (doc *Document) Dedup() { + // 1. Always ensure the fields are sorted, Dedup support requires + // Fields to be sorted. + doc.Sort() + + // 2. rename fields if a primitive value is overwritten by an object. + // For example the pair (path.x=1, path.x.a="test") becomes: + // (path.x.value=1, path.x.a="test"). + // + // NOTE: We do the renaming, in order to preserve the original value + // in case of conflicts after dedotting, which would lead to the removal of the field. + // For example docker/k8s labels tend to use `.`, which need to be handled in case + // The collector does pass us these kind of labels as an AttributeMap. + // + // NOTE: If the embedded document already has a field name `value`, we will remove the renamed + // field in favor of the `value` field in the document. + // + // This step removes potential conflicts when dedotting and serializing fields. + var renamed bool + for i := 0; i < len(doc.fields)-1; i++ { + key, nextKey := doc.fields[i].key, doc.fields[i+1].key + if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' { + renamed = true + doc.fields[i].key = key + ".value" + } + } + if renamed { + doc.Sort() + } + + // 3. mark duplicates as 'ignore' + // + // This step ensures that we do not have duplicate fields names when serializing. + // OpenSearch JSON parser will fail otherwise. + for i := 0; i < len(doc.fields)-1; i++ { + if doc.fields[i].key == doc.fields[i+1].key { + doc.fields[i].value = ignoreValue + } + } + + // 4. fix objects that might be stored in arrays + for i := range doc.fields { + doc.fields[i].value.Dedup() + } +} + +// Serialize writes the document to the given writer. The serializer will create nested objects if dedot is true. +// +// NOTE: The documented MUST be sorted if dedot is true. +func (doc *Document) Serialize(w io.Writer, dedot bool) error { + v := json.NewVisitor(w) + return doc.iterJSON(v, dedot) +} + +func (doc *Document) iterJSON(v *json.Visitor, dedot bool) error { + if dedot { + return doc.iterJSONDedot(v) + } + return doc.iterJSONFlat(v) +} + +func (doc *Document) iterJSONFlat(w *json.Visitor) error { + w.OnObjectStart(-1, structform.AnyType) + defer w.OnObjectFinished() + + for i := range doc.fields { + fld := &doc.fields[i] + if fld.value.IsEmpty() { + continue + } + + w.OnKey(fld.key) + if err := fld.value.iterJSON(w, true); err != nil { + return err + } + } + + return nil +} + +func (doc *Document) iterJSONDedot(w *json.Visitor) error { + objPrefix := "" + level := 0 + + if err := w.OnObjectStart(-1, structform.AnyType); err != nil { + return err + } + defer func() { + _ = w.OnObjectFinished() + }() + + for i := range doc.fields { + fld := &doc.fields[i] + if fld.value.IsEmpty() { + continue + } + + key := fld.key + // decrease object level until last reported and current key have the same path prefix + for L := commonObjPrefix(key, objPrefix); L < len(objPrefix); { + for L > 0 && key[L-1] != '.' { + L-- + } + + // remove levels and append write list of outstanding '}' into the writer + if L > 0 { + for delta := objPrefix[L:]; len(delta) > 0; { + idx := strings.IndexByte(delta, '.') + if idx < 0 { + break + } + + delta = delta[idx+1:] + level-- + if err := w.OnObjectFinished(); err != nil { + return err + } + } + + objPrefix = key[:L] + } else { // no common prefix, close all objects we reported so far. + for ; level > 0; level-- { + if err := w.OnObjectFinished(); err != nil { + return err + } + } + objPrefix = "" + } + } + + // increase object level up to current field + for { + start := len(objPrefix) + idx := strings.IndexByte(key[start:], '.') + if idx < 0 { + break + } + + level++ + objPrefix = key[:len(objPrefix)+idx+1] + fieldName := key[start : start+idx] + if err := w.OnKey(fieldName); err != nil { + return err + } + if err := w.OnObjectStart(-1, structform.AnyType); err != nil { + return err + } + } + + // report value + fieldName := key[len(objPrefix):] + if err := w.OnKey(fieldName); err != nil { + return err + } + if err := fld.value.iterJSON(w, true); err != nil { + return err + } + } + + // close all pending object levels + for ; level > 0; level-- { + if err := w.OnObjectFinished(); err != nil { + return err + } + } + + return nil +} + +// StringValue create a new value from a string. +func StringValue(str string) Value { return Value{kind: KindString, str: str} } + +// IntValue creates a new value from an integer. +func IntValue(i int64) Value { return Value{kind: KindInt, primitive: uint64(i)} } + +// DoubleValue creates a new value from a double value.. +func DoubleValue(d float64) Value { return Value{kind: KindDouble, dbl: d} } + +// BoolValue creates a new value from a double value.. +func BoolValue(b bool) Value { + var v uint64 + if b { + v = 1 + } + return Value{kind: KindBool, primitive: v} +} + +// ArrValue combines multiple values into an array value. +func ArrValue(values ...Value) Value { + return Value{kind: KindArr, arr: values} +} + +// TimestampValue create a new value from a time.Time. +func TimestampValue(ts time.Time) Value { + return Value{kind: KindTimestamp, ts: ts} +} + +// ValueFromAttribute converts a AttributeValue into a value. +func ValueFromAttribute(attr pcommon.Value) Value { + switch attr.Type() { + case pcommon.ValueTypeInt: + return IntValue(attr.Int()) + case pcommon.ValueTypeDouble: + return DoubleValue(attr.Double()) + case pcommon.ValueTypeStr: + return StringValue(attr.Str()) + case pcommon.ValueTypeBool: + return BoolValue(attr.Bool()) + case pcommon.ValueTypeSlice: + sub := arrFromAttributes(attr.Slice()) + return ArrValue(sub...) + case pcommon.ValueTypeMap: + sub := DocumentFromAttributes(attr.Map()) + return Value{kind: KindObject, doc: sub} + default: + return nilValue + } +} + +// Sort recursively sorts all keys in documents held by the value. +func (v *Value) Sort() { + switch v.kind { + case KindObject: + v.doc.Sort() + case KindArr: + for i := range v.arr { + v.arr[i].Sort() + } + } +} + +// Dedup recursively dedups keys in stored documents. +// +// NOTE: The value MUST be sorted. +func (v *Value) Dedup() { + switch v.kind { + case KindObject: + v.doc.Dedup() + case KindArr: + for i := range v.arr { + v.arr[i].Dedup() + } + } +} + +func (v *Value) IsEmpty() bool { + switch v.kind { + case KindNil, KindIgnore: + return true + case KindArr: + return len(v.arr) == 0 + case KindObject: + return len(v.doc.fields) == 0 + default: + return false + } +} + +func (v *Value) iterJSON(w *json.Visitor, dedot bool) error { + switch v.kind { + case KindNil: + return w.OnNil() + case KindBool: + return w.OnBool(v.primitive == 1) + case KindInt: + return w.OnInt64(int64(v.primitive)) + case KindDouble: + if math.IsNaN(v.dbl) || math.IsInf(v.dbl, 0) { + // NaN and Inf are undefined for JSON. Let's serialize to "null" + return w.OnNil() + } + return w.OnFloat64(v.dbl) + case KindString: + return w.OnString(v.str) + case KindTimestamp: + str := v.ts.UTC().Format(tsLayout) + return w.OnString(str) + case KindObject: + if len(v.doc.fields) == 0 { + return w.OnNil() + } + return v.doc.iterJSON(w, dedot) + case KindArr: + w.OnArrayStart(-1, structform.AnyType) + for i := range v.arr { + if err := v.arr[i].iterJSON(w, dedot); err != nil { + return err + } + } + w.OnArrayFinished() + } + + return nil +} + +func arrFromAttributes(aa pcommon.Slice) []Value { + if aa.Len() == 0 { + return nil + } + + values := make([]Value, aa.Len()) + for i := 0; i < aa.Len(); i++ { + values[i] = ValueFromAttribute(aa.At(i)) + } + return values +} + +func appendAttributeFields(fields []field, path string, am pcommon.Map) []field { + am.Range(func(k string, val pcommon.Value) bool { + fields = appendAttributeValue(fields, path, k, val) + return true + }) + return fields +} + +func appendAttributeValue(fields []field, path string, key string, attr pcommon.Value) []field { + if attr.Type() == pcommon.ValueTypeEmpty { + return fields + } + + if attr.Type() == pcommon.ValueTypeMap { + return appendAttributeFields(fields, flattenKey(path, key), attr.Map()) + } + + return append(fields, field{ + key: flattenKey(path, key), + value: ValueFromAttribute(attr), + }) +} + +func flattenKey(path, key string) string { + if path == "" { + return key + } + return path + "." + key +} + +func commonObjPrefix(a, b string) int { + end := len(a) + if alt := len(b); alt < end { + end = alt + } + + for i := 0; i < end; i++ { + if a[i] != b[i] { + return i + } + } + return end +} diff --git a/exporter/opensearchexporter/internal/objmodel/objmodel_test.go b/exporter/opensearchexporter/internal/objmodel/objmodel_test.go new file mode 100644 index 000000000000..f33aaa740754 --- /dev/null +++ b/exporter/opensearchexporter/internal/objmodel/objmodel_test.go @@ -0,0 +1,435 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package objmodel + +import ( + "math" + "strings" + "testing" + "time" + + "github.com/elastic/go-structform/json" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +var dijkstra = time.Date(1930, 5, 11, 16, 33, 11, 123456789, time.UTC) + +func TestObjectModel_CreateMap(t *testing.T) { + tests := map[string]struct { + build func() Document + want Document + }{ + "from empty map": { + build: func() Document { + return DocumentFromAttributes(pcommon.NewMap()) + }, + }, + "from map": { + build: func() Document { + m := pcommon.NewMap() + m.PutInt("i", 42) + m.PutStr("str", "test") + return DocumentFromAttributes(m) + }, + want: Document{[]field{{"i", IntValue(42)}, {"str", StringValue("test")}}}, + }, + "ignores nil values": { + build: func() Document { + m := pcommon.NewMap() + m.PutEmpty("null") + m.PutStr("str", "test") + return DocumentFromAttributes(m) + }, + want: Document{[]field{{"str", StringValue("test")}}}, + }, + "from map with prefix": { + build: func() Document { + m := pcommon.NewMap() + m.PutInt("i", 42) + m.PutStr("str", "test") + return DocumentFromAttributesWithPath("prefix", m) + }, + want: Document{[]field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, + }, + "add attributes with key": { + build: func() (doc Document) { + m := pcommon.NewMap() + m.PutInt("i", 42) + m.PutStr("str", "test") + doc.AddAttributes("prefix", m) + return doc + }, + want: Document{[]field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, + }, + "add attribute flattens a map value": { + build: func() (doc Document) { + mapVal := pcommon.NewValueMap() + m := mapVal.Map() + m.PutInt("i", 42) + m.PutStr("str", "test") + doc.AddAttribute("prefix", mapVal) + return doc + }, + want: Document{[]field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + doc := test.build() + doc.Sort() + assert.Equal(t, test.want, doc) + }) + } +} + +func TestDocument_Sort(t *testing.T) { + tests := map[string]struct { + build func() Document + want Document + }{ + "keys are sorted": { + build: func() (doc Document) { + doc.AddInt("z", 26) + doc.AddInt("a", 1) + return doc + }, + want: Document{[]field{{"a", IntValue(1)}, {"z", IntValue(26)}}}, + }, + "sorting is stable": { + build: func() (doc Document) { + doc.AddInt("a", 1) + doc.AddInt("c", 3) + doc.AddInt("a", 2) + return doc + }, + want: Document{[]field{{"a", IntValue(1)}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + doc := test.build() + doc.Sort() + assert.Equal(t, test.want, doc) + }) + } + +} + +func TestObjectModel_Dedup(t *testing.T) { + tests := map[string]struct { + build func() Document + want Document + }{ + "no duplicates": { + build: func() (doc Document) { + doc.AddInt("a", 1) + doc.AddInt("c", 3) + return doc + }, + want: Document{[]field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, + }, + "duplicate keys": { + build: func() (doc Document) { + doc.AddInt("a", 1) + doc.AddInt("c", 3) + doc.AddInt("a", 2) + return doc + }, + want: Document{[]field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, + }, + "duplicate after flattening from map: namespace object at end": { + build: func() Document { + am := pcommon.NewMap() + am.PutInt("namespace.a", 42) + am.PutStr("toplevel", "test") + am.PutEmptyMap("namespace").PutInt("a", 23) + return DocumentFromAttributes(am) + }, + want: Document{[]field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}}, + }, + "duplicate after flattening from map: namespace object at beginning": { + build: func() Document { + am := pcommon.NewMap() + am.PutEmptyMap("namespace").PutInt("a", 23) + am.PutInt("namespace.a", 42) + am.PutStr("toplevel", "test") + return DocumentFromAttributes(am) + }, + want: Document{[]field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, + }, + "dedup in arrays": { + build: func() (doc Document) { + var embedded Document + embedded.AddInt("a", 1) + embedded.AddInt("c", 3) + embedded.AddInt("a", 2) + + doc.Add("arr", ArrValue(Value{kind: KindObject, doc: embedded})) + return doc + }, + want: Document{[]field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{[]field{ + {"a", ignoreValue}, + {"a", IntValue(2)}, + {"c", IntValue(3)}, + }}})}}}, + }, + "dedup mix of primitive and object lifts primitive": { + build: func() (doc Document) { + doc.AddInt("namespace", 1) + doc.AddInt("namespace.a", 2) + return doc + }, + want: Document{[]field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, + }, + "dedup removes primitive if value exists": { + build: func() (doc Document) { + doc.AddInt("namespace", 1) + doc.AddInt("namespace.a", 2) + doc.AddInt("namespace.value", 3) + return doc + }, + want: Document{[]field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + doc := test.build() + doc.Sort() + doc.Dedup() + assert.Equal(t, test.want, doc) + }) + } +} + +func TestValue_FromAttribute(t *testing.T) { + tests := map[string]struct { + in pcommon.Value + want Value + }{ + "null": { + in: pcommon.NewValueEmpty(), + want: nilValue, + }, + "string": { + in: pcommon.NewValueStr("test"), + want: StringValue("test"), + }, + "int": { + in: pcommon.NewValueInt(23), + want: IntValue(23), + }, + "double": { + in: pcommon.NewValueDouble(3.14), + want: DoubleValue(3.14), + }, + "bool": { + in: pcommon.NewValueBool(true), + want: BoolValue(true), + }, + "empty array": { + in: pcommon.NewValueSlice(), + want: Value{kind: KindArr}, + }, + "non-empty array": { + in: func() pcommon.Value { + v := pcommon.NewValueSlice() + tgt := v.Slice().AppendEmpty() + pcommon.NewValueInt(1).CopyTo(tgt) + return v + }(), + want: ArrValue(IntValue(1)), + }, + "empty map": { + in: pcommon.NewValueMap(), + want: Value{kind: KindObject}, + }, + "non-empty map": { + in: func() pcommon.Value { + v := pcommon.NewValueMap() + v.Map().PutInt("a", 1) + return v + }(), + want: Value{kind: KindObject, doc: Document{[]field{{"a", IntValue(1)}}}}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + v := ValueFromAttribute(test.in) + assert.Equal(t, test.want, v) + }) + } +} + +func TestDocument_Serialize_Flat(t *testing.T) { + tests := map[string]struct { + attrs map[string]interface{} + want string + }{ + "no nesting with multiple fields": { + attrs: map[string]interface{}{ + "a": "test", + "b": 1, + }, + want: `{"a":"test","b":1}`, + }, + "shared prefix": { + attrs: map[string]interface{}{ + "a.str": "test", + "a.i": 1, + }, + want: `{"a.i":1,"a.str":"test"}`, + }, + "multiple namespaces with dot": { + attrs: map[string]interface{}{ + "a.str": "test", + "b.i": 1, + }, + want: `{"a.str":"test","b.i":1}`, + }, + "nested maps": { + attrs: map[string]interface{}{ + "a": map[string]interface{}{ + "str": "test", + "i": 1, + }, + }, + want: `{"a.i":1,"a.str":"test"}`, + }, + "multi-level nested namespace maps": { + attrs: map[string]interface{}{ + "a": map[string]interface{}{ + "b.str": "test", + "i": 1, + }, + }, + want: `{"a.b.str":"test","a.i":1}`, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + var buf strings.Builder + m := pcommon.NewMap() + assert.NoError(t, m.FromRaw(test.attrs)) + doc := DocumentFromAttributes(m) + doc.Dedup() + err := doc.Serialize(&buf, false) + require.NoError(t, err) + + assert.Equal(t, test.want, buf.String()) + }) + } +} + +func TestDocument_Serialize_Dedot(t *testing.T) { + tests := map[string]struct { + attrs map[string]interface{} + want string + }{ + "no nesting with multiple fields": { + attrs: map[string]interface{}{ + "a": "test", + "b": 1, + }, + want: `{"a":"test","b":1}`, + }, + "shared prefix": { + attrs: map[string]interface{}{ + "a.str": "test", + "a.i": 1, + }, + want: `{"a":{"i":1,"str":"test"}}`, + }, + "multiple namespaces": { + attrs: map[string]interface{}{ + "a.str": "test", + "b.i": 1, + }, + want: `{"a":{"str":"test"},"b":{"i":1}}`, + }, + "nested maps": { + attrs: map[string]interface{}{ + "a": map[string]interface{}{ + "str": "test", + "i": 1, + }, + }, + want: `{"a":{"i":1,"str":"test"}}`, + }, + "multi-level nested namespace maps": { + attrs: map[string]interface{}{ + "a": map[string]interface{}{ + "b.c.str": "test", + "i": 1, + }, + }, + want: `{"a":{"b":{"c":{"str":"test"}},"i":1}}`, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + var buf strings.Builder + m := pcommon.NewMap() + assert.NoError(t, m.FromRaw(test.attrs)) + doc := DocumentFromAttributes(m) + doc.Dedup() + err := doc.Serialize(&buf, true) + require.NoError(t, err) + + assert.Equal(t, test.want, buf.String()) + }) + } +} + +func TestValue_Serialize(t *testing.T) { + tests := map[string]struct { + value Value + want string + }{ + "nil value": {value: nilValue, want: "null"}, + "bool value: true": {value: BoolValue(true), want: "true"}, + "bool value: false": {value: BoolValue(false), want: "false"}, + "int value": {value: IntValue(42), want: "42"}, + "double value": {value: DoubleValue(3.14), want: "3.14"}, + "NaN is undefined": {value: DoubleValue(math.NaN()), want: "null"}, + "Inf is undefined": {value: DoubleValue(math.Inf(0)), want: "null"}, + "string value": {value: StringValue("Hello World!"), want: `"Hello World!"`}, + "timestamp": { + value: TimestampValue(dijkstra), + want: `"1930-05-11T16:33:11.123456789Z"`, + }, + "array": { + value: ArrValue(BoolValue(true), IntValue(23)), + want: `[true,23]`, + }, + "object": { + value: func() Value { + doc := Document{} + doc.AddString("a", "b") + return Value{kind: KindObject, doc: doc} + }(), + want: `{"a":"b"}`, + }, + "empty object": { + value: Value{kind: KindObject, doc: Document{}}, + want: "null", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + var buf strings.Builder + err := test.value.iterJSON(json.NewVisitor(&buf), false) + require.NoError(t, err) + assert.Equal(t, test.want, buf.String()) + }) + } +} diff --git a/exporter/opensearchexporter/log_bulk_indexer.go b/exporter/opensearchexporter/log_bulk_indexer.go new file mode 100644 index 000000000000..99b9747b7743 --- /dev/null +++ b/exporter/opensearchexporter/log_bulk_indexer.go @@ -0,0 +1,142 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package opensearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter" + +import ( + "bytes" + "context" + "errors" + + "github.com/opensearch-project/opensearch-go/v2" + "github.com/opensearch-project/opensearch-go/v2/opensearchutil" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +type logBulkIndexer struct { + index string + bulkAction string + model mappingModel + errs []error + bulkIndexer opensearchutil.BulkIndexer +} + +func newLogBulkIndexer(index, bulkAction string, model mappingModel) *logBulkIndexer { + return &logBulkIndexer{index, bulkAction, model, nil, nil} +} + +func (lbi *logBulkIndexer) start(client *opensearch.Client) error { + var startErr error + lbi.bulkIndexer, startErr = newLogOpenSearchBulkIndexer(client, lbi.onIndexerError) + return startErr +} + +func (lbi *logBulkIndexer) joinedError() error { + return errors.Join(lbi.errs...) +} + +func (lbi *logBulkIndexer) close(ctx context.Context) { + closeErr := lbi.bulkIndexer.Close(ctx) + if closeErr != nil { + lbi.errs = append(lbi.errs, closeErr) + } +} + +func (lbi *logBulkIndexer) onIndexerError(_ context.Context, indexerErr error) { + if indexerErr != nil { + lbi.appendPermanentError(consumererror.NewPermanent(indexerErr)) + } +} + +func (lbi *logBulkIndexer) appendPermanentError(e error) { + lbi.errs = append(lbi.errs, consumererror.NewPermanent(e)) +} + +func (lbi *logBulkIndexer) appendRetryLogError(err error, log plog.Logs) { + lbi.errs = append(lbi.errs, consumererror.NewLogs(err, log)) +} + +func (lbi *logBulkIndexer) submit(ctx context.Context, ld plog.Logs) { + forEachLog(ld, func(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, log plog.LogRecord) { + payload, err := lbi.model.encodeLog(resource, scope, scopeSchemaURL, log) + if err != nil { + lbi.appendPermanentError(err) + } else { + ItemFailureHandler := func(ctx context.Context, item opensearchutil.BulkIndexerItem, resp opensearchutil.BulkIndexerResponseItem, itemErr error) { + // Setup error handler. The handler handles the per item response status based on the + // selective ACKing in the bulk response. + lbi.processItemFailure(resp, itemErr, makeLog(resource, resourceSchemaURL, scope, scopeSchemaURL, log)) + } + bi := lbi.newBulkIndexerItem(payload) + bi.OnFailure = ItemFailureHandler + err = lbi.bulkIndexer.Add(ctx, bi) + if err != nil { + lbi.appendRetryLogError(err, makeLog(resource, resourceSchemaURL, scope, scopeSchemaURL, log)) + } + } + }) +} + +func makeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, log plog.LogRecord) plog.Logs { + logs := plog.NewLogs() + rs := logs.ResourceLogs().AppendEmpty() + resource.CopyTo(rs.Resource()) + rs.SetSchemaUrl(resourceSchemaURL) + ss := rs.ScopeLogs().AppendEmpty() + + ss.SetSchemaUrl(scopeSchemaURL) + scope.CopyTo(ss.Scope()) + s := ss.LogRecords().AppendEmpty() + + log.CopyTo(s) + + return logs +} + +func (lbi *logBulkIndexer) processItemFailure(resp opensearchutil.BulkIndexerResponseItem, itemErr error, logs plog.Logs) { + switch { + case shouldRetryEvent(resp.Status): + // Recoverable OpenSearch error + lbi.appendRetryLogError(responseAsError(resp), logs) + case resp.Status != 0 && itemErr == nil: + // Non-recoverable OpenSearch error while indexing document + lbi.appendPermanentError(responseAsError(resp)) + default: + // Encoding error. We didn't even attempt to send the event + lbi.appendPermanentError(itemErr) + } +} + +func (lbi *logBulkIndexer) newBulkIndexerItem(document []byte) opensearchutil.BulkIndexerItem { + body := bytes.NewReader(document) + item := opensearchutil.BulkIndexerItem{Action: lbi.bulkAction, Index: lbi.index, Body: body} + return item +} + +func newLogOpenSearchBulkIndexer(client *opensearch.Client, onIndexerError func(context.Context, error)) (opensearchutil.BulkIndexer, error) { + return opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ + NumWorkers: 1, + Client: client, + OnError: onIndexerError, + }) +} + +func forEachLog(ld plog.Logs, visitor func(pcommon.Resource, string, pcommon.InstrumentationScope, string, plog.LogRecord)) { + resourceLogs := ld.ResourceLogs() + for i := 0; i < resourceLogs.Len(); i++ { + il := resourceLogs.At(i) + resource := il.Resource() + scopeLogs := il.ScopeLogs() + for j := 0; j < scopeLogs.Len(); j++ { + scopeSpan := scopeLogs.At(j) + logs := scopeLogs.At(j).LogRecords() + + for k := 0; k < logs.Len(); k++ { + log := logs.At(k) + visitor(resource, il.SchemaUrl(), scopeSpan.Scope(), scopeSpan.SchemaUrl(), log) + } + } + } +} diff --git a/exporter/opensearchexporter/metadata.yaml b/exporter/opensearchexporter/metadata.yaml index da748b08060e..96fa9a12928c 100644 --- a/exporter/opensearchexporter/metadata.yaml +++ b/exporter/opensearchexporter/metadata.yaml @@ -3,6 +3,6 @@ type: opensearch status: class: exporter stability: - development: [traces] + development: [logs, traces] codeowners: active: [Aneurysm9, MitchellGale, MaxKsyunz, YANG-DB] \ No newline at end of file diff --git a/exporter/opensearchexporter/sso_log_exporter.go b/exporter/opensearchexporter/sso_log_exporter.go new file mode 100644 index 000000000000..24c709bbc1b2 --- /dev/null +++ b/exporter/opensearchexporter/sso_log_exporter.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package opensearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter" + +import ( + "context" + "strings" + + "github.com/opensearch-project/opensearch-go/v2" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/plog" +) + +type logExporter struct { + client *opensearch.Client + Index string + bulkAction string + model mappingModel + httpSettings confighttp.HTTPClientSettings + telemetry component.TelemetrySettings +} + +func newLogExporter(cfg *Config, set exporter.CreateSettings) (*logExporter, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + + model := &encodeModel{ + dedup: cfg.Dedup, + dedot: cfg.Dedot, + sso: cfg.MappingsSettings.Mode == MappingSS4O.String(), + flattenAttributes: cfg.MappingsSettings.Mode == MappingFlattenAttributes.String(), + timestampField: cfg.MappingsSettings.TimestampField, + unixTime: cfg.MappingsSettings.UnixTimestamp, + dataset: cfg.Dataset, + namespace: cfg.Namespace, + } + + return &logExporter{ + telemetry: set.TelemetrySettings, + Index: getIndexName(cfg.Dataset, cfg.Namespace, cfg.LogsIndex), + bulkAction: cfg.BulkAction, + httpSettings: cfg.HTTPClientSettings, + model: model, + }, nil +} + +func (l *logExporter) Start(_ context.Context, host component.Host) error { + httpClient, err := l.httpSettings.ToClient(host, l.telemetry) + if err != nil { + return err + } + + client, err := newOpenSearchClient(l.httpSettings.Endpoint, httpClient, l.telemetry.Logger) + if err != nil { + return err + } + + l.client = client + return nil +} + +func (l *logExporter) pushLogData(ctx context.Context, ld plog.Logs) error { + indexer := newLogBulkIndexer(l.Index, l.bulkAction, l.model) + startErr := indexer.start(l.client) + if startErr != nil { + return startErr + } + indexer.submit(ctx, ld) + indexer.close(ctx) + return indexer.joinedError() +} + +func getIndexName(dataset, namespace, index string) string { + if len(index) != 0 { + return index + } + + return strings.Join([]string{"ss4o_logs", dataset, namespace}, "-") +} diff --git a/exporter/opensearchexporter/sso_model.go b/exporter/opensearchexporter/sso_model.go index 002e9501ca25..a9dfa2baa42a 100644 --- a/exporter/opensearchexporter/sso_model.go +++ b/exporter/opensearchexporter/sso_model.go @@ -43,13 +43,13 @@ type ssoSpan struct { SchemaURL string `json:"schemaUrl"` Version string `json:"version"` } `json:"instrumentationScope,omitempty"` - Kind string `json:"kind"` - Links []ssoSpanLinks `json:"links,omitempty"` - Name string `json:"name"` - ParentSpanID string `json:"parentSpanId"` - Resource map[string]any `json:"resource,omitempty"` - SpanID string `json:"spanId"` - StartTime time.Time `json:"startTime"` + Kind string `json:"kind"` + Links []ssoSpanLinks `json:"links,omitempty"` + Name string `json:"name"` + ParentSpanID string `json:"parentSpanId"` + Resource map[string]string `json:"resource,omitempty"` + SpanID string `json:"spanId"` + StartTime time.Time `json:"startTime"` Status struct { Code string `json:"code"` Message string `json:"message"` @@ -58,3 +58,24 @@ type ssoSpan struct { TraceID string `json:"traceId"` TraceState string `json:"traceState"` } + +type ssoRecord struct { + Attributes map[string]any `json:"attributes,omitempty"` + Body string `json:"body"` + InstrumentationScope struct { + Attributes map[string]any `json:"attributes,omitempty"` + Name string `json:"name,omitempty"` + SchemaURL string `json:"schemaUrl,omitempty"` + Version string `json:"version,omitempty"` + } `json:"instrumentationScope,omitempty"` + ObservedTimestamp *time.Time `json:"observedTimestamp,omitempty"` + Resource map[string]string `json:"resource,omitempty"` + SchemaURL string `json:"schemaUrl,omitempty"` + Severity struct { + Text string `json:"text,omitempty"` + Number int64 `json:"number,omitempty"` + } `json:"severity"` + SpanID string `json:"spanId,omitempty"` + Timestamp *time.Time `json:"@timestamp"` + TraceID string `json:"traceId,omitempty"` +} diff --git a/exporter/opensearchexporter/sso_trace_exporter.go b/exporter/opensearchexporter/sso_trace_exporter.go index f279166955ac..1b2885766c36 100644 --- a/exporter/opensearchexporter/sso_trace_exporter.go +++ b/exporter/opensearchexporter/sso_trace_exporter.go @@ -19,6 +19,8 @@ type ssoTracesExporter struct { client *opensearch.Client Namespace string Dataset string + bulkAction string + model mappingModel httpSettings confighttp.HTTPClientSettings telemetry component.TelemetrySettings } @@ -28,10 +30,17 @@ func newSSOTracesExporter(cfg *Config, set exporter.CreateSettings) (*ssoTracesE return nil, err } + model := &encodeModel{ + dataset: cfg.Dataset, + namespace: cfg.Namespace, + } + return &ssoTracesExporter{ telemetry: set.TelemetrySettings, Namespace: cfg.Namespace, Dataset: cfg.Dataset, + bulkAction: cfg.BulkAction, + model: model, httpSettings: cfg.HTTPClientSettings, }, nil } @@ -52,7 +61,7 @@ func (s *ssoTracesExporter) Start(_ context.Context, host component.Host) error } func (s *ssoTracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { - indexer := newTraceBulkIndexer(s.Dataset, s.Namespace) + indexer := newTraceBulkIndexer(s.Dataset, s.Namespace, s.bulkAction, s.model) startErr := indexer.start(s.client) if startErr != nil { return startErr diff --git a/exporter/opensearchexporter/testdata/config.yaml b/exporter/opensearchexporter/testdata/config.yaml index 1e01a85980e4..a187af23318e 100644 --- a/exporter/opensearchexporter/testdata/config.yaml +++ b/exporter/opensearchexporter/testdata/config.yaml @@ -19,6 +19,11 @@ opensearch/empty_dataset: http: endpoint: https://opensearch.example.com:9200 +opensearch/invalid_bulk_action: + bulk_action: "delete" + http: + endpoint: https://opensearch.example.com:9200 + opensearch/trace: dataset: ngnix namespace: eu diff --git a/exporter/opensearchexporter/testdata/logs-sample-a.yaml b/exporter/opensearchexporter/testdata/logs-sample-a.yaml new file mode 100644 index 000000000000..2b5bc743aa19 --- /dev/null +++ b/exporter/opensearchexporter/testdata/logs-sample-a.yaml @@ -0,0 +1,187 @@ +resourceLogs: + - resource: + attributes: + - key: resource.required + value: + stringValue: foo + - key: resource.optional + value: + stringValue: bar + scopeLogs: + - logRecords: + - attributes: + - key: log.required + value: + stringValue: foo + - key: log.optional + value: + stringValue: bar + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.required + value: + stringValue: foo + - key: log.optional + value: + stringValue: notbar + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.required + value: + stringValue: notfoo + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + scope: {} + - resource: + attributes: + - key: resource.required + value: + stringValue: foo + - key: resource.optional + value: + stringValue: notbar + scopeLogs: + - logRecords: + - attributes: + - key: log.required + value: + stringValue: foo + - key: log.optional + value: + stringValue: bar + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.required + value: + stringValue: foo + - key: log.optional + value: + stringValue: notbar + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.required + value: + stringValue: notfoo + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + scope: {} + - resource: + attributes: + - key: resource.required + value: + stringValue: notfoo + scopeLogs: + - logRecords: + - attributes: + - key: log.required + value: + stringValue: foo + - key: log.optional + value: + stringValue: bar + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.required + value: + stringValue: foo + - key: log.optional + value: + stringValue: notbar + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.required + value: + stringValue: notfoo + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + scope: {} + - resource: {} + scopeLogs: + - logRecords: + - attributes: + - key: log.required + value: + stringValue: foo + - key: log.optional + value: + stringValue: bar + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.required + value: + stringValue: foo + - key: log.optional + value: + stringValue: notbar + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.required + value: + stringValue: notfoo + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + scope: {} diff --git a/exporter/opensearchexporter/trace_bulk_indexer.go b/exporter/opensearchexporter/trace_bulk_indexer.go index 5c0861e9b0f3..6f373f5a00f8 100644 --- a/exporter/opensearchexporter/trace_bulk_indexer.go +++ b/exporter/opensearchexporter/trace_bulk_indexer.go @@ -9,7 +9,6 @@ import ( "encoding/json" "errors" "strings" - "time" "github.com/opensearch-project/opensearch-go/v2" "github.com/opensearch-project/opensearch-go/v2/opensearchutil" @@ -21,12 +20,14 @@ import ( type traceBulkIndexer struct { dataset string namespace string + bulkAction string + model mappingModel errs []error bulkIndexer opensearchutil.BulkIndexer } -func newTraceBulkIndexer(dataset string, namespace string) *traceBulkIndexer { - return &traceBulkIndexer{dataset, namespace, nil, nil} +func newTraceBulkIndexer(dataset string, namespace string, bulkAction string, model mappingModel) *traceBulkIndexer { + return &traceBulkIndexer{dataset, namespace, bulkAction, model, nil, nil} } func (tbi *traceBulkIndexer) joinedError() error { @@ -62,7 +63,7 @@ func (tbi *traceBulkIndexer) appendRetryTraceError(err error, trace ptrace.Trace func (tbi *traceBulkIndexer) submit(ctx context.Context, td ptrace.Traces) { forEachSpan(td, func(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span) { - payload, err := tbi.createJSON(resource, scope, scopeSchemaURL, span) + payload, err := tbi.model.encodeTrace(resource, scope, scopeSchemaURL, span) if err != nil { tbi.appendPermanentError(err) } else { @@ -97,82 +98,6 @@ func makeTrace(resource pcommon.Resource, resourceSchemaURL string, scope pcommo return traces } -func (tbi *traceBulkIndexer) createJSON( - resource pcommon.Resource, - scope pcommon.InstrumentationScope, - schemaURL string, - span ptrace.Span, -) ([]byte, error) { - sso := ssoSpan{} - sso.Attributes = span.Attributes().AsRaw() - sso.DroppedAttributesCount = span.DroppedAttributesCount() - sso.DroppedEventsCount = span.DroppedEventsCount() - sso.DroppedLinksCount = span.DroppedLinksCount() - sso.EndTime = span.EndTimestamp().AsTime() - sso.Kind = span.Kind().String() - sso.Name = span.Name() - sso.ParentSpanID = span.ParentSpanID().String() - sso.Resource = resource.Attributes().AsRaw() - sso.SpanID = span.SpanID().String() - sso.StartTime = span.StartTimestamp().AsTime() - sso.Status.Code = span.Status().Code().String() - sso.Status.Message = span.Status().Message() - sso.TraceID = span.TraceID().String() - sso.TraceState = span.TraceState().AsRaw() - - if span.Events().Len() > 0 { - sso.Events = make([]ssoSpanEvent, span.Events().Len()) - for i := 0; i < span.Events().Len(); i++ { - e := span.Events().At(i) - ssoEvent := &sso.Events[i] - ssoEvent.Attributes = e.Attributes().AsRaw() - ssoEvent.DroppedAttributesCount = e.DroppedAttributesCount() - ssoEvent.Name = e.Name() - ts := e.Timestamp().AsTime() - if ts.Unix() != 0 { - ssoEvent.Timestamp = &ts - } else { - now := time.Now() - ssoEvent.ObservedTimestamp = &now - } - } - } - - ds := dataStream{} - if tbi.dataset != "" { - ds.Dataset = tbi.dataset - } - - if tbi.namespace != "" { - ds.Namespace = tbi.namespace - } - - if ds != (dataStream{}) { - ds.Type = "span" - sso.Attributes["data_stream"] = ds - } - - sso.InstrumentationScope.Name = scope.Name() - sso.InstrumentationScope.DroppedAttributesCount = scope.DroppedAttributesCount() - sso.InstrumentationScope.Version = scope.Version() - sso.InstrumentationScope.SchemaURL = schemaURL - sso.InstrumentationScope.Attributes = scope.Attributes().AsRaw() - - if span.Links().Len() > 0 { - sso.Links = make([]ssoSpanLinks, span.Links().Len()) - for i := 0; i < span.Links().Len(); i++ { - link := span.Links().At(i) - ssoLink := &sso.Links[i] - ssoLink.Attributes = link.Attributes().AsRaw() - ssoLink.DroppedAttributesCount = link.DroppedAttributesCount() - ssoLink.TraceID = link.TraceID().String() - ssoLink.TraceState = link.TraceState().AsRaw() - ssoLink.SpanID = link.SpanID().String() - } - } - return json.Marshal(sso) -} - func (tbi *traceBulkIndexer) processItemFailure(resp opensearchutil.BulkIndexerResponseItem, itemErr error, traces ptrace.Traces) { switch { case shouldRetryEvent(resp.Status): @@ -193,6 +118,15 @@ func responseAsError(item opensearchutil.BulkIndexerResponseItem) error { return errors.New(string(errorJSON)) } +func attributesToMapString(attributes pcommon.Map) map[string]string { + m := make(map[string]string, attributes.Len()) + attributes.Range(func(k string, v pcommon.Value) bool { + m[k] = v.AsString() + return true + }) + return m +} + func shouldRetryEvent(status int) bool { var retryOnStatus = []int{500, 502, 503, 504, 429} for _, retryable := range retryOnStatus { @@ -205,7 +139,7 @@ func shouldRetryEvent(status int) bool { func (tbi *traceBulkIndexer) newBulkIndexerItem(document []byte) opensearchutil.BulkIndexerItem { body := bytes.NewReader(document) - item := opensearchutil.BulkIndexerItem{Action: "create", Index: tbi.getIndexName(), Body: body} + item := opensearchutil.BulkIndexerItem{Action: tbi.bulkAction, Index: tbi.getIndexName(), Body: body} return item }