diff --git a/.chloggen/issue-20660-datasetexporter-logs-traces-initial-code.yaml b/.chloggen/issue-20660-datasetexporter-logs-traces-initial-code.yaml new file mode 100755 index 000000000000..2ca08438a9f8 --- /dev/null +++ b/.chloggen/issue-20660-datasetexporter-logs-traces-initial-code.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: datasetexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for exporting logs and traces. + +# One or more tracking issues related to the change +issues: [20660] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 4e87499b88f9..da2fde058ca0 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -319,6 +319,7 @@ require ( github.com/containerd/ttrpc v1.1.0 // indirect github.com/coreos/go-oidc v2.2.1+incompatible // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/cskr/pubsub v1.0.2 // indirect github.com/cyphar/filepath-securejoin v0.2.3 // indirect github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -551,6 +552,7 @@ require ( github.com/rs/cors v1.9.0 // indirect github.com/samber/lo v1.37.0 // indirect github.com/scaleway/scaleway-sdk-go v1.0.0-beta.14 // indirect + github.com/scalyr/dataset-go v0.0.7 // indirect github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect github.com/secure-systems-lab/go-securesystemslib v0.4.0 // indirect github.com/segmentio/asm v1.2.0 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index 0f38d6613a95..95b9c6f53c9d 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -978,6 +978,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ= +github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= +github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI= github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= github.com/daixiang0/gci v0.2.8/go.mod h1:+4dZ7TISfSmqfAGv59ePaHfNzgGtIkHAhhdKggP1JAc= @@ 
-1748,6 +1750,7 @@ github.com/jaegertracing/jaeger v1.22.0/go.mod h1:WnwW68MjJEViSLRQhe0nkIsBDaF3Cz github.com/jaegertracing/jaeger v1.38.0/go.mod h1:4MBTMxfCp3d4buDLxRlHnESQvTFCkN16OUIeE9BEdl4= github.com/jaegertracing/jaeger v1.41.0 h1:vVNky8dP46M2RjGaZ7qRENqylW+tBFay3h57N16Ip7M= github.com/jaegertracing/jaeger v1.41.0/go.mod h1:SIkAT75iVmA9U+mESGYuMH6UQv6V9Qy4qxo0lwfCQAc= +github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc= github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -1944,6 +1947,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/maxatome/go-testdeep v1.13.0 h1:EBmRelH7MhMfPvA+0kXAeOeJUXn3mzul5NmvjLDcQZI= github.com/mbilski/exhaustivestruct v1.2.0/go.mod h1:OeTBVxQWoEmB2J2JCHmXWPJ0aksxSUOUy+nvtVEfzXc= github.com/mgechev/dots v0.0.0-20190921121421-c36f7dcfbb81/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg= github.com/mgechev/revive v1.0.3/go.mod h1:POGGZagSo/0frdr7VeAifzS5Uka0d0GPiM35MsTO8nE= @@ -2348,6 +2352,8 @@ github.com/sanposhiho/wastedassign v0.1.3/go.mod h1:LGpq5Hsv74QaqM47WtIsRSF/ik9k github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.14 h1:yFl3jyaSVLNYXlnNYM5z2pagEk1dYQhfr1p20T1NyKY= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.14/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg= +github.com/scalyr/dataset-go v0.0.7 
h1:Uz+b2iMcIzp0C3ppK4OWs3mUWnB7WGN7SEJVvm0V/Jk= +github.com/scalyr/dataset-go v0.0.7/go.mod h1:J8ioLMhd4bUoaPdXJOHQiQI/XVs0kG/kAuVsN8buLBQ= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 h1:RpforrEYXWkmGwJHIGnLZ3tTWStkjVVstwzNGqxX2Ds= diff --git a/exporter/datasetexporter/README.md b/exporter/datasetexporter/README.md index 6f6d303a3d3a..221bfd1f9dce 100644 --- a/exporter/datasetexporter/README.md +++ b/exporter/datasetexporter/README.md @@ -3,7 +3,7 @@ | Status | | | ------------- |-----------| -| Stability | [development]: traces, logs | +| Stability | [development]: logs, traces | | Distributions | [] | [development]: https://github.com/open-telemetry/opentelemetry-collector#development @@ -17,13 +17,22 @@ See the [Getting Started](https://app.scalyr.com/help/getting-started) guide. ### Required Settings -- `dataset_url` (no default): The URL of the DataSet API that ingests the data. Most likely https://app.scalyr.com. If not specified env variable `DATASET_URL` is used. -- `api_key` (no default): The "Log Write" API Key required to use API. Instructions how to get [API key](https://app.scalyr.com/help/api-keys). If not specified env variable `DATASET_API_KEY` is used. +- `dataset_url` (no default): The URL of the DataSet API that ingests the data. Most likely https://app.scalyr.com. +- `api_key` (no default): The "Log Write" API Key required to use API. Instructions how to get [API key](https://app.scalyr.com/help/api-keys). + +If you do not want to specify `api_key` in the file, you can use the [builtin functionality](https://opentelemetry.io/docs/collector/configuration/#configuration-environment-variables) and use `api_key: ${env:DATASET_API_KEY}`. 
### Optional Settings -- `max_delay_ms` (default = "15000"): The maximum delay between sending batches from the same source. -- `group_by` (default = []): The list of attributes based on which events should be grouped. +- `buffer`: + - `max_lifetime` (default = 5s): The maximum delay between sending batches from the same source. + - `group_by` (default = []): The list of attributes based on which events should be grouped. + - `retry_initial_interval` (default = 5s): Time to wait after the first failure before retrying. + - `retry_max_interval` (default = 30s): The upper bound on the backoff interval between retries. + - `retry_max_elapsed_time` (default = 300s): The maximum amount of time spent trying to send a buffer. +- `traces`: + - `aggregate` (default = false): Count the number of spans and errors belonging to a trace. + - `max_wait` (default = 5s): The maximum time to wait for all spans from a single trace to arrive; ignored if `aggregate` is false. - `retry_on_failure`: See [retry_on_failure](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) - `sending_queue`: See [sending_queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) - `timeout`: See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) @@ -39,16 +48,12 @@ exporters: dataset_url: https://app.scalyr.com # API Key api_key: your_api_key - # Send batch to the API at least every 15s - max_delay_ms: 15000 - # Group data based on these attributes - group_by: - - attributes.container_id - - attributes.log.file.path - - body.map.kubernetes.container_hash - - body.map.kubernetes.pod_id - - body.map.kubernetes.docker_id - - body.map.stream + buffer: + # Send buffer to the API at least every 10s + max_lifetime: 10s + # Group data based on these attributes + group_by: + - attributes.container_id service: pipelines: @@ -57,4 +62,9 @@ service: processors: [batch] # add dataset among 
your exporters exporters: [dataset] + traces: + receivers: [otlp] + processors: [batch] + # add dataset among your exporters + exporters: [dataset] ``` \ No newline at end of file diff --git a/exporter/datasetexporter/config.go b/exporter/datasetexporter/config.go index 4803cf202282..990184cb06cc 100644 --- a/exporter/datasetexporter/config.go +++ b/exporter/datasetexporter/config.go @@ -16,20 +16,61 @@ package datasetexporter // import "github.com/open-telemetry/opentelemetry-colle import ( "fmt" - "os" - "strconv" + "time" + "github.com/scalyr/dataset-go/pkg/buffer" + "github.com/scalyr/dataset-go/pkg/buffer_config" + datasetConfig "github.com/scalyr/dataset-go/pkg/config" + "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/exporter/exporterhelper" ) -const maxDelayMs = "15000" +const tracesMaxWait = 5 * time.Second +const tracesAggregate = false + +type TracesSettings struct { + Aggregate bool `mapstructure:"aggregate"` + MaxWait time.Duration `mapstructure:"max_wait"` +} + +// newDefaultTracesSettings returns the default settings for TracesSettings. +func newDefaultTracesSettings() TracesSettings { + return TracesSettings{ + Aggregate: tracesAggregate, + MaxWait: tracesMaxWait, + } +} + +const bufferMaxLifetime = 5 * time.Second +const bufferRetryInitialInterval = 5 * time.Second +const bufferRetryMaxInterval = 30 * time.Second +const bufferRetryMaxElapsedTime = 300 * time.Second + +type BufferSettings struct { + MaxLifetime time.Duration `mapstructure:"max_lifetime"` + GroupBy []string `mapstructure:"group_by"` + RetryInitialInterval time.Duration `mapstructure:"retry_initial_interval"` + RetryMaxInterval time.Duration `mapstructure:"retry_max_interval"` + RetryMaxElapsedTime time.Duration `mapstructure:"retry_max_elapsed_time"` +} + +// newDefaultBufferSettings returns the default settings for BufferSettings. 
+func newDefaultBufferSettings() BufferSettings { + return BufferSettings{ + MaxLifetime: bufferMaxLifetime, + GroupBy: []string{}, + RetryInitialInterval: bufferRetryInitialInterval, + RetryMaxInterval: bufferRetryMaxInterval, + RetryMaxElapsedTime: bufferRetryMaxElapsedTime, + } +} type Config struct { - DatasetURL string `mapstructure:"dataset_url"` - APIKey string `mapstructure:"api_key"` - MaxDelayMs string `mapstructure:"max_delay_ms"` - GroupBy []string `mapstructure:"group_by"` + DatasetURL string `mapstructure:"dataset_url"` + APIKey configopaque.String `mapstructure:"api_key"` + BufferSettings `mapstructure:"buffer"` + TracesSettings `mapstructure:"traces"` exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` exporterhelper.TimeoutSettings `mapstructure:"timeout"` @@ -40,17 +81,6 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { return fmt.Errorf("cannot unmarshal config: %w", err) } - if len(c.DatasetURL) == 0 { - c.DatasetURL = os.Getenv("DATASET_URL") - } - if len(c.APIKey) == 0 { - c.APIKey = os.Getenv("DATASET_API_KEY") - } - - if len(c.MaxDelayMs) == 0 { - c.MaxDelayMs = maxDelayMs - } - return nil } @@ -64,15 +94,6 @@ func (c *Config) Validate() error { return fmt.Errorf("dataset_url is required") } - _, err := strconv.Atoi(c.MaxDelayMs) - if err != nil { - return fmt.Errorf( - "max_delay_ms must be integer, but %s was used: %w", - c.MaxDelayMs, - err, - ) - } - return nil } @@ -81,11 +102,40 @@ func (c *Config) Validate() error { func (c *Config) String() string { s := "" s += fmt.Sprintf("%s: %s; ", "DatasetURL", c.DatasetURL) - s += fmt.Sprintf("%s: %s; ", "MaxDelayMs", c.MaxDelayMs) - s += fmt.Sprintf("%s: %s; ", "GroupBy", c.GroupBy) + s += fmt.Sprintf("%s: %+v; ", "BufferSettings", c.BufferSettings) + s += fmt.Sprintf("%s: %+v; ", "TracesSettings", c.TracesSettings) s += fmt.Sprintf("%s: %+v; ", "RetrySettings", c.RetrySettings) s += fmt.Sprintf("%s: %+v; ", 
"QueueSettings", c.QueueSettings) s += fmt.Sprintf("%s: %+v", "TimeoutSettings", c.TimeoutSettings) return s } + +func (c *Config) convert() (*ExporterConfig, error) { + err := c.Validate() + if err != nil { + return nil, fmt.Errorf("config is not valid: %w", err) + } + + return &ExporterConfig{ + datasetConfig: &datasetConfig.DataSetConfig{ + Endpoint: c.DatasetURL, + Tokens: datasetConfig.DataSetTokens{WriteLog: string(c.APIKey)}, + BufferSettings: buffer_config.DataSetBufferSettings{ + MaxLifetime: c.BufferSettings.MaxLifetime, + MaxSize: buffer.LimitBufferSize, + GroupBy: c.BufferSettings.GroupBy, + RetryInitialInterval: c.BufferSettings.RetryInitialInterval, + RetryMaxInterval: c.BufferSettings.RetryMaxInterval, + RetryMaxElapsedTime: c.BufferSettings.RetryMaxElapsedTime, + }, + }, + tracesSettings: c.TracesSettings, + }, + nil +} + +type ExporterConfig struct { + datasetConfig *datasetConfig.DataSetConfig + tracesSettings TracesSettings +} diff --git a/exporter/datasetexporter/config_test.go b/exporter/datasetexporter/config_test.go index c8aeb11cc1d7..3bc68edfc698 100644 --- a/exporter/datasetexporter/config_test.go +++ b/exporter/datasetexporter/config_test.go @@ -16,7 +16,9 @@ package datasetexporter import ( "fmt" + "os" "testing" + "time" "github.com/stretchr/testify/suite" "go.opentelemetry.io/collector/confmap" @@ -31,16 +33,19 @@ func TestSuiteConfig(t *testing.T) { suite.Run(t, new(SuiteConfig)) } +func (s *SuiteConfig) SetupTest() { + os.Clearenv() +} + func (s *SuiteConfig) TestConfigUnmarshalUnknownAttributes() { - config := Config{} - conf := confmap.NewFromStringMap(map[string]interface{}{ + f := NewFactory() + config := f.CreateDefaultConfig().(*Config) + configMap := confmap.NewFromStringMap(map[string]interface{}{ "dataset_url": "https://example.com", "api_key": "secret", "unknown_attribute": "some value", }) - - err := config.Unmarshal(conf) - s.NotNil(err) + err := config.Unmarshal(configMap) unmarshalErr := fmt.Errorf("1 error(s) 
decoding:\n\n* '' has invalid keys: unknown_attribute") expectedError := fmt.Errorf("cannot unmarshal config: %w", unmarshalErr) @@ -48,48 +53,20 @@ func (s *SuiteConfig) TestConfigUnmarshalUnknownAttributes() { s.Equal(expectedError.Error(), err.Error()) } -func (s *SuiteConfig) TestConfigKeepValuesWhenEnvSet() { - s.T().Setenv("DATASET_URL", "https://example.org") - s.T().Setenv("DATASET_API_KEY", "api_key") - - config := Config{} - conf := confmap.NewFromStringMap(map[string]interface{}{ +func (s *SuiteConfig) TestConfigUseDefaults() { + f := NewFactory() + config := f.CreateDefaultConfig().(*Config) + configMap := confmap.NewFromStringMap(map[string]interface{}{ "dataset_url": "https://example.com", "api_key": "secret", }) - err := config.Unmarshal(conf) - s.Nil(err) - - s.Equal("https://example.com", config.DatasetURL) - s.Equal("secret", config.APIKey) -} - -func (s *SuiteConfig) TestConfigUseEnvWhenSet() { - s.T().Setenv("DATASET_URL", "https://example.org") - s.T().Setenv("DATASET_API_KEY", "api_key") - - config := Config{} - conf := confmap.NewFromStringMap(map[string]interface{}{}) - err := config.Unmarshal(conf) - s.Nil(err) - - s.Equal("https://example.org", config.DatasetURL) - s.Equal("api_key", config.APIKey) -} - -func (s *SuiteConfig) TestConfigUseDefaultForMaxDelay() { - config := Config{} - conf := confmap.NewFromStringMap(map[string]interface{}{ - "dataset_url": "https://example.com", - "api_key": "secret", - "max_delay_ms": "", - }) - err := config.Unmarshal(conf) + err := config.Unmarshal(configMap) s.Nil(err) s.Equal("https://example.com", config.DatasetURL) - s.Equal("secret", config.APIKey) - s.Equal("15000", config.MaxDelayMs) + s.Equal("secret", string(config.APIKey)) + s.Equal(bufferMaxLifetime, config.MaxLifetime) + s.Equal(tracesMaxWait, config.TracesSettings.MaxWait) } func (s *SuiteConfig) TestConfigValidate() { @@ -103,7 +80,9 @@ func (s *SuiteConfig) TestConfigValidate() { config: Config{ DatasetURL: "https://example.com", APIKey: 
"secret", - MaxDelayMs: "12345", + BufferSettings: BufferSettings{ + MaxLifetime: 123 * time.Millisecond, + }, }, expected: nil, }, @@ -111,27 +90,22 @@ func (s *SuiteConfig) TestConfigValidate() { name: "missing api_key", config: Config{ DatasetURL: "https://example.com", - MaxDelayMs: "15000", + BufferSettings: BufferSettings{ + MaxLifetime: bufferMaxLifetime, + }, }, expected: fmt.Errorf("api_key is required"), }, { name: "missing dataset_url", config: Config{ - APIKey: "1234", - MaxDelayMs: "15000", + APIKey: "1234", + BufferSettings: BufferSettings{ + MaxLifetime: bufferMaxLifetime, + }, }, expected: fmt.Errorf("dataset_url is required"), }, - { - name: "invalid max_delay_ms", - config: Config{ - DatasetURL: "https://example.com", - APIKey: "1234", - MaxDelayMs: "abc", - }, - expected: fmt.Errorf("max_delay_ms must be integer, but abc was used: strconv.Atoi: parsing \"abc\": invalid syntax"), - }, } for _, tt := range tests { @@ -148,17 +122,23 @@ func (s *SuiteConfig) TestConfigValidate() { func (s *SuiteConfig) TestConfigString() { config := Config{ - DatasetURL: "https://example.com", - APIKey: "secret", - MaxDelayMs: "1234", - GroupBy: []string{"field1", "field2"}, + DatasetURL: "https://example.com", + APIKey: "secret", + BufferSettings: BufferSettings{ + MaxLifetime: 123, + GroupBy: []string{"field1", "field2"}, + }, + TracesSettings: TracesSettings{ + Aggregate: true, + MaxWait: 45 * time.Second, + }, RetrySettings: exporterhelper.NewDefaultRetrySettings(), QueueSettings: exporterhelper.NewDefaultQueueSettings(), TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), } s.Equal( - "DatasetURL: https://example.com; MaxDelayMs: 1234; GroupBy: [field1 field2]; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:}; TimeoutSettings: {Timeout:5s}", + "DatasetURL: https://example.com; BufferSettings: 
{MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:true MaxWait:45s}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:}; TimeoutSettings: {Timeout:5s}", config.String(), ) } diff --git a/exporter/datasetexporter/datasetexporter.go b/exporter/datasetexporter/datasetexporter.go index b3e395d3d3b0..fe1bf5e0e908 100644 --- a/exporter/datasetexporter/datasetexporter.go +++ b/exporter/datasetexporter/datasetexporter.go @@ -17,69 +17,112 @@ package datasetexporter // import "github.com/open-telemetry/opentelemetry-colle import ( "context" "fmt" - "sync" + "net/http" + "reflect" + "strconv" "time" "github.com/google/uuid" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/scalyr/dataset-go/pkg/api/add_events" + "github.com/scalyr/dataset-go/pkg/client" "go.uber.org/zap" "golang.org/x/time/rate" ) -type datasetExporter struct { - limiter *rate.Limiter - logger *zap.Logger - session string +type DatasetExporter struct { + client *client.DataSetClient + limiter *rate.Limiter + logger *zap.Logger + session string + spanTracker *spanTracker } -var exporterInstance *datasetExporter +func newDatasetExporter(entity string, config *Config, logger *zap.Logger) (*DatasetExporter, error) { + logger.Info("Creating new DataSetExporter", + zap.String("config", config.String()), + zap.String("entity", entity), + ) + exporterCfg, err := config.convert() + if err != nil { + return nil, fmt.Errorf( + "cannot convert config: %s; %w", + config.String(), err, + ) + } + + client, err := client.NewClient( + exporterCfg.datasetConfig, + &http.Client{Timeout: time.Second * 60}, + logger, + ) + if err != nil { + logger.Error("Cannot create DataSetClient: ", zap.Error(err)) + return nil, fmt.Errorf("cannot create 
newDatasetExporter: %w", err) + } -func newDatasetExporter(logger *zap.Logger) (*datasetExporter, error) { - logger.Info("Creating new DataSet Exporter with config") - if logger == nil { - return nil, fmt.Errorf("logger has to be set") + tracker := newSpanTracker(exporterCfg.tracesSettings.MaxWait) + if !exporterCfg.tracesSettings.Aggregate { + tracker = nil } - return &datasetExporter{ - limiter: rate.NewLimiter(100*rate.Every(1*time.Minute), 100), // 100 requests / minute - session: uuid.New().String(), - logger: logger, + return &DatasetExporter{ + client: client, + limiter: rate.NewLimiter(100*rate.Every(1*time.Minute), 100), // 100 requests / minute + session: uuid.New().String(), + logger: logger, + spanTracker: tracker, }, nil } -var lock = &sync.Mutex{} +func (e *DatasetExporter) shutdown(context.Context) error { + e.client.SendAllAddEventsBuffers() + return nil +} -func getDatasetExporter(entity string, config *Config, logger *zap.Logger) (*datasetExporter, error) { - logger.Info( - "Get logger for: ", - zap.String("entity", entity), - ) - // TODO: create exporter per config - if exporterInstance == nil { - lock.Lock() - defer lock.Unlock() - if exporterInstance == nil { - logger.Info( - "DataSetExport is using config: ", - zap.String("config", config.String()), - zap.String("entity", entity), - ) - instance, err := newDatasetExporter(logger) - if err != nil { - return nil, fmt.Errorf("cannot create new dataset exporter: %w", err) - } - exporterInstance = instance - } +func sendBatch(events []*add_events.EventBundle, client *client.DataSetClient) error { + return client.AddEvents(events) +} + +func buildKey(prefix string, separator string, key string, depth int) string { + res := prefix + if depth > 0 && len(prefix) > 0 { + res += separator } + res += key + return res +} - return exporterInstance, nil +func updateWithPrefixedValuesMap(target map[string]interface{}, prefix string, separator string, source map[string]interface{}, depth int) { + for k, v := 
range source { + key := buildKey(prefix, separator, k, depth) + updateWithPrefixedValues(target, key, separator, v, depth+1) + } } -func (e *datasetExporter) consumeLogs(ctx context.Context, ld plog.Logs) error { - return nil +func updateWithPrefixedValuesArray(target map[string]interface{}, prefix string, separator string, source []interface{}, depth int) { + for i, v := range source { + key := buildKey(prefix, separator, strconv.FormatInt(int64(i), 10), depth) + updateWithPrefixedValues(target, key, separator, v, depth+1) + } } -func (e *datasetExporter) consumeTraces(ctx context.Context, ld ptrace.Traces) error { - return nil +func updateWithPrefixedValues(target map[string]interface{}, prefix string, separator string, source interface{}, depth int) { + st := reflect.TypeOf(source) + switch st.Kind() { + case reflect.Map: + updateWithPrefixedValuesMap(target, prefix, separator, source.(map[string]interface{}), depth) + case reflect.Array, reflect.Slice: + updateWithPrefixedValuesArray(target, prefix, separator, source.([]interface{}), depth) + default: + for { + _, found := target[prefix] + if found { + prefix += separator + } else { + target[prefix] = source + break + } + } + + } } diff --git a/exporter/datasetexporter/factory.go b/exporter/datasetexporter/factory.go index 8ea0ee6d2df8..8f11e97ddd5a 100644 --- a/exporter/datasetexporter/factory.go +++ b/exporter/datasetexporter/factory.go @@ -15,9 +15,6 @@ package datasetexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datasetexporter" import ( - "context" - "fmt" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -39,45 +36,15 @@ func NewFactory() exporter.Factory { func createDefaultConfig() component.Config { return &Config{ - MaxDelayMs: maxDelayMs, + BufferSettings: newDefaultBufferSettings(), + TracesSettings: newDefaultTracesSettings(), RetrySettings: 
exporterhelper.NewDefaultRetrySettings(), QueueSettings: exporterhelper.NewDefaultQueueSettings(), TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), } } -func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Logs, error) { - cfg := config.(*Config) - e, err := getDatasetExporter("logs", cfg, set.Logger) - if err != nil { - return nil, fmt.Errorf("cannot get DataSetExpoter: %w", err) - } - - return exporterhelper.NewLogsExporter( - ctx, - set, - config, - e.consumeLogs, - exporterhelper.WithQueue(cfg.QueueSettings), - exporterhelper.WithRetry(cfg.RetrySettings), - exporterhelper.WithTimeout(cfg.TimeoutSettings), - ) -} - -func createTracesExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Traces, error) { - cfg := config.(*Config) - e, err := getDatasetExporter("traces", cfg, set.Logger) - if err != nil { - return nil, fmt.Errorf("cannot get DataSetExpoter: %w", err) - } - - return exporterhelper.NewTracesExporter( - ctx, - set, - config, - e.consumeTraces, - exporterhelper.WithQueue(cfg.QueueSettings), - exporterhelper.WithRetry(cfg.RetrySettings), - exporterhelper.WithTimeout(cfg.TimeoutSettings), - ) +// castConfig casts it to the Dataset Config struct. 
+func castConfig(c component.Config) *Config { + return c.(*Config) } diff --git a/exporter/datasetexporter/factory_test.go b/exporter/datasetexporter/factory_test.go index ebaf64af1970..b34ad49f7168 100644 --- a/exporter/datasetexporter/factory_test.go +++ b/exporter/datasetexporter/factory_test.go @@ -15,7 +15,7 @@ package datasetexporter import ( - "context" + "fmt" "os" "path/filepath" "testing" @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/collector/exporter/exportertest" ) type SuiteFactory struct { @@ -46,7 +45,8 @@ func (s *SuiteFactory) TestCreateDefaultConfig() { cfg := factory.CreateDefaultConfig() s.Equal(&Config{ - MaxDelayMs: maxDelayMs, + BufferSettings: newDefaultBufferSettings(), + TracesSettings: newDefaultTracesSettings(), RetrySettings: exporterhelper.NewDefaultRetrySettings(), QueueSettings: exporterhelper.NewDefaultQueueSettings(), TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), @@ -68,7 +68,8 @@ func (s *SuiteFactory) TestLoadConfig() { expected: &Config{ DatasetURL: "https://app.scalyr.com", APIKey: "key-minimal", - MaxDelayMs: maxDelayMs, + BufferSettings: newDefaultBufferSettings(), + TracesSettings: newDefaultTracesSettings(), RetrySettings: exporterhelper.NewDefaultRetrySettings(), QueueSettings: exporterhelper.NewDefaultQueueSettings(), TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), @@ -77,10 +78,16 @@ func (s *SuiteFactory) TestLoadConfig() { { id: component.NewIDWithName(CfgTypeStr, "lib"), expected: &Config{ - DatasetURL: "https://app.eu.scalyr.com", - APIKey: "key-lib", - MaxDelayMs: "12345", - GroupBy: []string{"attributes.container_id", "attributes.log.file.path"}, + DatasetURL: "https://app.eu.scalyr.com", + APIKey: "key-lib", + BufferSettings: BufferSettings{ + MaxLifetime: 345 * time.Millisecond, + GroupBy: 
[]string{"attributes.container_id", "attributes.log.file.path"}, + RetryInitialInterval: bufferRetryInitialInterval, + RetryMaxInterval: bufferRetryMaxInterval, + RetryMaxElapsedTime: bufferRetryMaxElapsedTime, + }, + TracesSettings: newDefaultTracesSettings(), RetrySettings: exporterhelper.NewDefaultRetrySettings(), QueueSettings: exporterhelper.NewDefaultQueueSettings(), TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), @@ -91,8 +98,16 @@ func (s *SuiteFactory) TestLoadConfig() { expected: &Config{ DatasetURL: "https://app.scalyr.com", APIKey: "key-full", - MaxDelayMs: "3456", - GroupBy: []string{"body.map.kubernetes.pod_id", "body.map.kubernetes.docker_id", "body.map.stream"}, + BufferSettings: BufferSettings{ + MaxLifetime: 3456 * time.Millisecond, + GroupBy: []string{"body.map.kubernetes.pod_id", "body.map.kubernetes.docker_id", "body.map.stream"}, + RetryInitialInterval: 21 * time.Second, + RetryMaxInterval: 22 * time.Second, + RetryMaxElapsedTime: 23 * time.Second, + }, + TracesSettings: TracesSettings{ + MaxWait: 3 * time.Second, + }, RetrySettings: exporterhelper.RetrySettings{ Enabled: true, InitialInterval: 11 * time.Nanosecond, @@ -136,13 +151,24 @@ type CreateTest struct { func createExporterTests() []CreateTest { return []CreateTest{ + { + name: "broken", + config: &Config{}, + expectedError: fmt.Errorf("cannot get DataSetExpoter: cannot convert config: DatasetURL: ; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:false MaxWait:0s}; RetrySettings: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:}; TimeoutSettings: {Timeout:0s}; config is not valid: api_key is required"), + }, { name: "valid", config: &Config{ - DatasetURL: "https://app.eu.scalyr.com", - APIKey: "key-lib", - MaxDelayMs: "12345", - GroupBy: 
[]string{"attributes.container_id"}, + DatasetURL: "https://app.eu.scalyr.com", + APIKey: "key-lib", + BufferSettings: BufferSettings{ + MaxLifetime: 12345, + GroupBy: []string{"attributes.container_id"}, + }, + TracesSettings: TracesSettings{ + Aggregate: true, + MaxWait: 5 * time.Second, + }, RetrySettings: exporterhelper.NewDefaultRetrySettings(), QueueSettings: exporterhelper.NewDefaultQueueSettings(), TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), @@ -151,43 +177,3 @@ func createExporterTests() []CreateTest { }, } } - -func (s *SuiteFactory) TestCreateLogsExporter() { - ctx := context.Background() - createSettings := exportertest.NewNopCreateSettings() - tests := createExporterTests() - - for _, tt := range tests { - s.T().Run(tt.name, func(*testing.T) { - exporterInstance = nil - logs, err := createLogsExporter(ctx, createSettings, tt.config) - - if err == nil { - s.Nil(tt.expectedError) - } else { - s.Equal(tt.expectedError.Error(), err.Error()) - s.Nil(logs) - } - }) - } -} - -func (s *SuiteFactory) TestCreateTracesExporter() { - ctx := context.Background() - createSettings := exportertest.NewNopCreateSettings() - tests := createExporterTests() - - for _, tt := range tests { - s.T().Run(tt.name, func(t *testing.T) { - exporterInstance = nil - logs, err := createTracesExporter(ctx, createSettings, tt.config) - - if err == nil { - s.Nil(tt.expectedError) - } else { - s.Equal(tt.expectedError.Error(), err.Error()) - s.Nil(logs) - } - }) - } -} diff --git a/exporter/datasetexporter/go.mod b/exporter/datasetexporter/go.mod index 6008dcbf0d87..0f19b75e8b4a 100644 --- a/exporter/datasetexporter/go.mod +++ b/exporter/datasetexporter/go.mod @@ -4,6 +4,9 @@ go 1.19 require ( github.com/google/uuid v1.3.0 + // github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage v0.77.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.77.0 + github.com/scalyr/dataset-go v0.0.7 
github.com/stretchr/testify v1.8.2 go.opentelemetry.io/collector/component v0.77.0 go.opentelemetry.io/collector/confmap v0.77.0 @@ -11,11 +14,16 @@ require ( go.opentelemetry.io/collector/pdata v1.0.0-rcv0011 go.uber.org/zap v1.24.0 golang.org/x/time v0.3.0 + ) +require go.opentelemetry.io/collector v0.77.0 + require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cskr/pubsub v1.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -27,7 +35,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/collector v0.77.0 // indirect go.opentelemetry.io/collector/consumer v0.77.0 // indirect go.opentelemetry.io/collector/featuregate v0.77.0 // indirect go.opentelemetry.io/collector/receiver v0.77.0 // indirect @@ -35,13 +42,19 @@ require ( go.opentelemetry.io/otel/metric v0.38.1 // indirect go.opentelemetry.io/otel/trace v1.15.1 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/goleak v1.1.12 // indirect + go.uber.org/goleak v1.2.1 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect - google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect + google.golang.org/genproto v0.0.0-20230320184635-7606e756e683 // indirect google.golang.org/grpc v1.55.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace 
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil diff --git a/exporter/datasetexporter/go.sum b/exporter/datasetexporter/go.sum index a646cbda79f3..6caaefa3308d 100644 --- a/exporter/datasetexporter/go.sum +++ b/exporter/datasetexporter/go.sum @@ -38,6 +38,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= +github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -61,7 +63,8 @@ github.com/go-ldap/ldap v3.0.2+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= +github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= +github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -145,6 
+148,7 @@ github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKe github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs= github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E= +github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= @@ -180,6 +184,7 @@ github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOA github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/maxatome/go-testdeep v1.13.0 h1:EBmRelH7MhMfPvA+0kXAeOeJUXn3mzul5NmvjLDcQZI= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= @@ -248,6 +253,8 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= +github.com/scalyr/dataset-go v0.0.7 
h1:Uz+b2iMcIzp0C3ppK4OWs3mUWnB7WGN7SEJVvm0V/Jk= +github.com/scalyr/dataset-go v0.0.7/go.mod h1:J8ioLMhd4bUoaPdXJOHQiQI/XVs0kG/kAuVsN8buLBQ= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -304,8 +311,8 @@ go.opentelemetry.io/otel/trace v1.15.1/go.mod h1:IWdQG/5N1x7f6YUlmdLeJvH9yxtuJAf go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= -go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= @@ -321,7 +328,6 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod 
v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -414,7 +420,6 @@ golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -427,8 +432,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= +google.golang.org/genproto v0.0.0-20230320184635-7606e756e683 h1:khxVcsk/FhnzxMKOyD+TDGwjbEOpcPuIpmafPGFmhMA= +google.golang.org/genproto v0.0.0-20230320184635-7606e756e683/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= 
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= @@ -456,7 +461,6 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= diff --git a/exporter/datasetexporter/internal/metadata/generated_status.go b/exporter/datasetexporter/internal/metadata/generated_status.go index c846571ff2ce..fd98d3248701 100644 --- a/exporter/datasetexporter/internal/metadata/generated_status.go +++ b/exporter/datasetexporter/internal/metadata/generated_status.go @@ -8,6 +8,6 @@ import ( const ( Type = "datasetexporter" - TracesStability = component.StabilityLevelDevelopment LogsStability = component.StabilityLevelDevelopment + TracesStability = component.StabilityLevelDevelopment ) diff --git a/exporter/datasetexporter/logs_exporter.go b/exporter/datasetexporter/logs_exporter.go new file mode 100644 index 000000000000..d2e315623a84 --- /dev/null +++ b/exporter/datasetexporter/logs_exporter.go @@ -0,0 +1,150 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datasetexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datasetexporter" + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/scalyr/dataset-go/pkg/api/add_events" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Logs, error) { + cfg := castConfig(config) + e, err := newDatasetExporter("logs", cfg, set.Logger) + if err != nil { + return nil, fmt.Errorf("cannot get DataSetExpoter: %w", err) + } + + return exporterhelper.NewLogsExporter( + ctx, + set, + config, + e.consumeLogs, + exporterhelper.WithQueue(cfg.QueueSettings), + exporterhelper.WithRetry(cfg.RetrySettings), + exporterhelper.WithTimeout(cfg.TimeoutSettings), + exporterhelper.WithShutdown(e.shutdown), + ) +} + +func buildBody(attrs map[string]interface{}, value pcommon.Value) string { + message := value.AsString() + attrs["body.type"] = value.Type().String() + switch value.Type() { + case pcommon.ValueTypeEmpty: + attrs["body.empty"] = value.AsString() + case pcommon.ValueTypeStr: + attrs["body.str"] = value.Str() + case pcommon.ValueTypeBool: + attrs["body.bool"] = value.Bool() + case pcommon.ValueTypeDouble: + attrs["body.double"] = value.Double() + case pcommon.ValueTypeInt: + attrs["body.int"] = value.Int() + case 
pcommon.ValueTypeMap: + updateWithPrefixedValues(attrs, "body.map.", ".", value.Map().AsRaw(), 0) + case pcommon.ValueTypeBytes: + attrs["body.bytes"] = value.AsString() + case pcommon.ValueTypeSlice: + attrs["body.slice"] = value.AsRaw() + default: + attrs["body.unknown"] = value.AsString() + } + + return message +} + +func buildEventFromLog(log plog.LogRecord, resource pcommon.Resource, scope pcommon.InstrumentationScope) *add_events.EventBundle { + attrs := make(map[string]interface{}) + event := add_events.Event{} + + if sevNum := log.SeverityNumber(); sevNum > 0 { + attrs["severity.number"] = sevNum + event.Sev = int(sevNum) + } + + if timestamp := log.Timestamp().AsTime(); !timestamp.Equal(time.Unix(0, 0)) { + attrs["timestamp"] = timestamp.String() + event.Ts = strconv.FormatInt(timestamp.UnixNano(), 10) + } + + if body := log.Body().AsString(); body != "" { + attrs["message"] = fmt.Sprintf( + "OtelExporter - Log - %s", + buildBody(attrs, log.Body()), + ) + } + if dropped := log.DroppedAttributesCount(); dropped > 0 { + attrs["dropped_attributes_count"] = dropped + } + if observed := log.ObservedTimestamp().AsTime(); !observed.Equal(time.Unix(0, 0)) { + attrs["observed.timestamp"] = observed.String() + } + if sevText := log.SeverityText(); sevText != "" { + attrs["severity.text"] = sevText + } + if span := log.SpanID().String(); span != "" { + attrs["span_id"] = span + } + + if trace := log.TraceID().String(); trace != "" { + attrs["trace_id"] = trace + } + + updateWithPrefixedValues(attrs, "attributes.", ".", log.Attributes().AsRaw(), 0) + attrs["flags"] = log.Flags() + attrs["flag.is_sampled"] = log.Flags().IsSampled() + + updateWithPrefixedValues(attrs, "resource.attributes.", ".", resource.Attributes().AsRaw(), 0) + attrs["scope.name"] = scope.Name() + updateWithPrefixedValues(attrs, "scope.attributes.", ".", scope.Attributes().AsRaw(), 0) + + event.Attrs = attrs + event.Log = "LL" + event.Thread = "TL" + return &add_events.EventBundle{ + Event: &event, 
+ Thread: &add_events.Thread{Id: "TL", Name: "logs"}, + Log: &add_events.Log{Id: "LL", Attrs: map[string]interface{}{}}, + } +} + +func (e *DatasetExporter) consumeLogs(ctx context.Context, ld plog.Logs) error { + var events []*add_events.EventBundle + + resourceLogs := ld.ResourceLogs() + for i := 0; i < resourceLogs.Len(); i++ { + resource := resourceLogs.At(i).Resource() + scopeLogs := resourceLogs.At(i).ScopeLogs() + for j := 0; j < scopeLogs.Len(); j++ { + scope := scopeLogs.At(j).Scope() + logRecords := scopeLogs.At(j).LogRecords() + for k := 0; k < logRecords.Len(); k++ { + logRecord := logRecords.At(k) + events = append(events, buildEventFromLog(logRecord, resource, scope)) + } + } + } + + return sendBatch(events, e.client) +} diff --git a/exporter/datasetexporter/logs_exporter_stress_test.go b/exporter/datasetexporter/logs_exporter_stress_test.go new file mode 100644 index 000000000000..d9ff2a11af7d --- /dev/null +++ b/exporter/datasetexporter/logs_exporter_stress_test.go @@ -0,0 +1,165 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build integration +// +build integration + +package datasetexporter + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +type SuiteLogsE2EExporter struct { + suite.Suite +} + +func (s *SuiteLogsE2EExporter) SetupTest() { + os.Clearenv() +} + +func TestSuiteLogsE2EExporterIntegration(t *testing.T) { + suite.Run(t, new(SuiteLogsE2EExporter)) +} + +func (s *SuiteLogsE2EExporter) TestConsumeLogsManyLogsShouldSucceed() { + const maxDelay = 200 * time.Millisecond + createSettings := exportertest.NewNopCreateSettings() + + const maxBatchCount = 20 + const logsPerBatch = 10000 + const expectedLogs = uint64(maxBatchCount * logsPerBatch) + + attempt := atomic.Uint64{} + wasSuccessful := atomic.Bool{} + processedEvents := atomic.Uint64{} + seenKeys := make(map[string]int64) + expectedKeys := make(map[string]int64) + mutex := &sync.RWMutex{} + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + attempt.Add(1) + cer, err := extract(req) + + s.NoError(err, "Error reading request: %v", err) + + for _, ev := range cer.Events { + processedEvents.Add(1) + key, found := ev.Attrs["body.str"] + s.True(found) + mutex.Lock() + sKey := key.(string) + _, f := seenKeys[sKey] + if !f { + seenKeys[sKey] = 0 + } + seenKeys[sKey]++ + mutex.Unlock() + } + + wasSuccessful.Store(true) + payload, err := json.Marshal(map[string]interface{}{ + "status": "success", + "bytesCharged": 42, + }) + s.NoError(err) + l, err := w.Write(payload) + s.Greater(l, 1) + s.NoError(err) + })) + defer server.Close() + + config := &Config{ + 
DatasetURL: server.URL, + APIKey: "key-lib", + BufferSettings: BufferSettings{ + MaxLifetime: maxDelay, + GroupBy: []string{"attributes.container_id"}, + }, + RetrySettings: exporterhelper.NewDefaultRetrySettings(), + QueueSettings: exporterhelper.NewDefaultQueueSettings(), + TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), + } + + logs, err := createLogsExporter(context.Background(), createSettings, config) + waitingTime := time.Duration(0) + if s.NoError(err) { + err = logs.Start(context.Background(), componenttest.NewNopHost()) + s.NoError(err) + + for bI := 0; bI < maxBatchCount; bI++ { + batch := plog.NewLogs() + rL := batch.ResourceLogs().AppendEmpty() + sL := rL.ScopeLogs().AppendEmpty() + for lI := 0; lI < logsPerBatch; lI++ { + key := fmt.Sprintf("%04d-%06d", bI, lI) + log := sL.LogRecords().AppendEmpty() + log.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + log.Body().SetStr(key) + log.Attributes().PutStr("key", key) + log.Attributes().PutStr("p1", strings.Repeat("A", rand.Intn(2000))) + expectedKeys[key] = 1 + } + err = logs.ConsumeLogs(context.Background(), batch) + s.Nil(err) + time.Sleep(time.Duration(float64(maxDelay.Nanoseconds()) * 0.7)) + } + + s.NotNil(logs) + + time.Sleep(time.Second) + err = logs.Shutdown(context.Background()) + s.Nil(err) + lastProcessed := uint64(0) + sameNumber := 0 + for { + s.T().Logf("Processed events: %d / %d", processedEvents.Load(), expectedLogs) + if lastProcessed == processedEvents.Load() { + sameNumber++ + } + if processedEvents.Load() >= expectedLogs || sameNumber > 10 { + break + } + lastProcessed = processedEvents.Load() + time.Sleep(time.Second) + waitingTime += time.Second + } + } + + time.Sleep(2 * time.Second) + + s.True(wasSuccessful.Load()) + + s.Equal(seenKeys, expectedKeys) + s.Equal(expectedLogs, processedEvents.Load(), "processed items") + s.Equal(expectedLogs, uint64(len(seenKeys)), "unique items") +} diff --git a/exporter/datasetexporter/logs_exporter_test.go 
b/exporter/datasetexporter/logs_exporter_test.go new file mode 100644 index 000000000000..ca8e69c124be --- /dev/null +++ b/exporter/datasetexporter/logs_exporter_test.go @@ -0,0 +1,337 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datasetexporter + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "sync/atomic" + "testing" + "time" + + "github.com/scalyr/dataset-go/pkg/api/add_events" + "github.com/scalyr/dataset-go/pkg/api/request" + "github.com/stretchr/testify/suite" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" +) + +type SuiteLogsExporter struct { + suite.Suite +} + +func (s *SuiteLogsExporter) SetupTest() { + os.Clearenv() +} + +func TestSuiteLogsExporter(t *testing.T) { + suite.Run(t, new(SuiteLogsExporter)) +} + +func (s *SuiteLogsExporter) TestCreateLogsExporter() { + ctx := context.Background() + createSettings := exportertest.NewNopCreateSettings() + tests := createExporterTests() + + for _, tt := range tests { + s.T().Run(tt.name, func(*testing.T) { + logs, err := createLogsExporter(ctx, createSettings, tt.config) 
+ + if err == nil { + s.Nil(tt.expectedError, tt.name) + s.NotNil(logs, tt.name) + } else { + if tt.expectedError == nil { + s.Nil(err, tt.name) + } else { + s.Equal(tt.expectedError.Error(), err.Error(), tt.name) + s.Nil(logs, tt.name) + } + } + }) + } +} + +func (s *SuiteLogsExporter) TestBuildBody() { + slice := pcommon.NewValueSlice() + err := slice.FromRaw([]any{1, 2, 3}) + s.NoError(err) + + bytes := pcommon.NewValueBytes() + err = bytes.FromRaw([]byte{byte(65), byte(66), byte(67)}) + s.NoError(err) + tests := []struct { + body pcommon.Value + key string + value interface{} + message string + }{ + { + body: pcommon.NewValueEmpty(), + key: "body.empty", + value: "", + message: "", + }, + { + body: pcommon.NewValueStr("foo"), + key: "body.str", + value: "foo", + message: "foo", + }, + { + body: pcommon.NewValueBool(true), + key: "body.bool", + value: true, + message: "true", + }, + { + body: pcommon.NewValueDouble(42.5), + key: "body.double", + value: float64(42.5), + message: "42.5", + }, + { + body: pcommon.NewValueInt(42), + key: "body.int", + value: int64(42), + message: "42", + }, + { + body: bytes, + key: "body.bytes", + value: "QUJD", + message: "QUJD", + }, + { + body: slice, + key: "body.slice", + value: []interface{}{int64(1), int64(2), int64(3)}, + message: "[1,2,3]", + }, + } + + for _, tt := range tests { + s.T().Run(tt.key, func(*testing.T) { + attrs := make(map[string]interface{}) + msg := buildBody(attrs, tt.body) + expectedAttrs := make(map[string]interface{}) + expectedAttrs["body.type"] = tt.body.Type().String() + expectedAttrs[tt.key] = tt.value + + s.Equal(tt.message, msg, tt.key) + s.Equal(expectedAttrs, attrs, tt.key) + }) + } +} + +func (s *SuiteLogsExporter) TestBuildBodyMap() { + m := pcommon.NewValueMap() + err := m.FromRaw(map[string]any{ + "scalar": "scalar-value", + "map": map[string]any{ + "m1": "v1", + "m2": "v2", + }, + "array": []any{1, 2, 3}, + }) + if s.NoError(err) { + attrs := make(map[string]interface{}) + msg := 
buildBody(attrs, m) + expectedAttrs := make(map[string]interface{}) + expectedAttrs["body.type"] = pcommon.ValueTypeMap.String() + expectedAttrs["body.map.scalar"] = "scalar-value" + expectedAttrs["body.map.map.m1"] = "v1" + expectedAttrs["body.map.map.m2"] = "v2" + expectedAttrs["body.map.array.0"] = int64(1) + expectedAttrs["body.map.array.1"] = int64(2) + expectedAttrs["body.map.array.2"] = int64(3) + + expectedMsg := "{\"array\":[1,2,3],\"map\":{\"m1\":\"v1\",\"m2\":\"v2\"},\"scalar\":\"scalar-value\"}" + + s.Equal(expectedMsg, msg) + s.Equal(expectedAttrs, attrs) + } +} + +var testLEventRaw = &add_events.Event{ + Thread: "TL", + Log: "LL", + Sev: 9, + Ts: "1581452773000000789", + Attrs: map[string]interface{}{ + "attributes.app": "server", + "attributes.instance_num": int64(1), + "body.str": "This is a log message", + "body.type": "Str", + "dropped_attributes_count": uint32(1), + "flag.is_sampled": false, + "flags": plog.LogRecordFlags(0), + "message": "OtelExporter - Log - This is a log message", + "resource.attributes.resource-attr": "resource-attr-val-1", + "scope.name": "", + "severity.number": plog.SeverityNumberInfo, + "severity.text": "Info", + "span_id": "0102040800000000", + "timestamp": "2020-02-11 20:26:13.000000789 +0000 UTC", + "trace_id": "08040201000000000000000000000000", + }, +} + +var testLEventReq = &add_events.Event{ + Thread: testLEventRaw.Thread, + Log: testLEventRaw.Log, + Sev: testLEventRaw.Sev, + Ts: testLEventRaw.Ts, + Attrs: map[string]interface{}{ + "attributes.app": "server", + "attributes.instance_num": float64(1), + "body.str": "This is a log message", + "body.type": "Str", + "dropped_attributes_count": float64(1), + "flag.is_sampled": false, + "flags": float64(plog.LogRecordFlags(0)), + "message": "OtelExporter - Log - This is a log message", + "resource.attributes.resource-attr": "resource-attr-val-1", + "scope.name": "", + "severity.number": float64(plog.SeverityNumberInfo), + "severity.text": "Info", + "span_id": 
"0102040800000000", + "timestamp": "2020-02-11 20:26:13.000000789 +0000 UTC", + "trace_id": "08040201000000000000000000000000", + "bundle_key": "d41d8cd98f00b204e9800998ecf8427e", + }, +} + +var testLThread = &add_events.Thread{ + Id: "TL", + Name: "logs", +} + +var testLLog = &add_events.Log{ + Id: "LL", + Attrs: map[string]interface{}{}, +} + +func (s *SuiteLogsExporter) TestBuildEventFromLog() { + lr := testdata.GenerateLogsOneLogRecord() + ld := lr.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + + expected := &add_events.EventBundle{ + Event: testLEventRaw, + Thread: testLThread, + Log: testLLog, + } + was := buildEventFromLog( + ld, + lr.ResourceLogs().At(0).Resource(), + lr.ResourceLogs().At(0).ScopeLogs().At(0).Scope(), + ) + + s.Equal(expected, was) +} + +func extract(req *http.Request) (add_events.AddEventsRequest, error) { + data, _ := io.ReadAll(req.Body) + b := bytes.NewBuffer(data) + reader, _ := gzip.NewReader(b) + + var resB bytes.Buffer + _, _ = resB.ReadFrom(reader) + + cer := &add_events.AddEventsRequest{} + err := json.Unmarshal(resB.Bytes(), cer) + return *cer, err +} + +func (s *SuiteLogsExporter) TestConsumeLogsShouldSucceed() { + createSettings := exportertest.NewNopCreateSettings() + + attempt := atomic.Uint64{} + wasSuccessful := atomic.Bool{} + addRequest := add_events.AddEventsRequest{} + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + attempt.Add(1) + cer, err := extract(req) + addRequest = cer + + s.NoError(err, "Error reading request: %v", err) + + wasSuccessful.Store(true) + payload, err := json.Marshal(map[string]interface{}{ + "status": "success", + "bytesCharged": 42, + }) + s.NoError(err) + l, err := w.Write(payload) + s.Greater(l, 1) + s.NoError(err) + })) + defer server.Close() + + config := &Config{ + DatasetURL: server.URL, + APIKey: "key-lib", + BufferSettings: BufferSettings{ + MaxLifetime: time.Millisecond, + GroupBy: []string{"attributes.container_id"}, + }, + 
RetrySettings: exporterhelper.NewDefaultRetrySettings(), + QueueSettings: exporterhelper.NewDefaultQueueSettings(), + TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), + } + + lr := testdata.GenerateLogsOneLogRecord() + + logs, err := createLogsExporter(context.Background(), createSettings, config) + if s.NoError(err) { + err = logs.Start(context.Background(), componenttest.NewNopHost()) + s.NoError(err) + + s.NotNil(logs) + err = logs.ConsumeLogs(context.Background(), lr) + s.Nil(err) + time.Sleep(time.Second) + err = logs.Shutdown(context.Background()) + s.Nil(err) + } + + s.True(wasSuccessful.Load()) + s.Equal( + add_events.AddEventsRequest{ + AuthParams: request.AuthParams{ + Token: "key-lib", + }, + AddEventsRequestParams: add_events.AddEventsRequestParams{ + Session: addRequest.Session, + Events: []*add_events.Event{testLEventReq}, + Threads: []*add_events.Thread{testLThread}, + Logs: []*add_events.Log{testLLog}, + }, + }, + addRequest, + ) +} diff --git a/exporter/datasetexporter/metadata.yaml b/exporter/datasetexporter/metadata.yaml index c5d938e0fb01..9f3d1faf7c04 100644 --- a/exporter/datasetexporter/metadata.yaml +++ b/exporter/datasetexporter/metadata.yaml @@ -3,5 +3,5 @@ type: datasetexporter status: class: exporter stability: - development: [ traces, logs ] + development: [ logs, traces ] distributions: [] \ No newline at end of file diff --git a/exporter/datasetexporter/testdata/config.yaml b/exporter/datasetexporter/testdata/config.yaml index 1c3fd88105cc..fa6b2c5c2697 100644 --- a/exporter/datasetexporter/testdata/config.yaml +++ b/exporter/datasetexporter/testdata/config.yaml @@ -5,19 +5,26 @@ dataset/minimal: dataset/lib: dataset_url: https://app.eu.scalyr.com api_key: key-lib - max_delay_ms: 12345 - group_by: - - attributes.container_id - - attributes.log.file.path + buffer: + max_lifetime: 345ms + group_by: + - attributes.container_id + - attributes.log.file.path dataset/full: dataset_url: https://app.scalyr.com api_key: key-full - 
max_delay_ms: 3456 - group_by: - - body.map.kubernetes.pod_id - - body.map.kubernetes.docker_id - - body.map.stream + buffer: + max_lifetime: 3456ms + group_by: + - body.map.kubernetes.pod_id + - body.map.kubernetes.docker_id + - body.map.stream + retry_initial_interval: 21s + retry_max_interval: 22s + retry_max_elapsed_time: 23s + traces: + max_wait: 3s retry_on_failure: enabled: true initial_interval: 11 diff --git a/exporter/datasetexporter/traces_exporter.go b/exporter/datasetexporter/traces_exporter.go new file mode 100644 index 000000000000..044211fdec27 --- /dev/null +++ b/exporter/datasetexporter/traces_exporter.go @@ -0,0 +1,356 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package datasetexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datasetexporter"
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/scalyr/dataset-go/pkg/api/add_events"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+)
+
+// ServiceNameKey is the resource attribute key under which the service name
+// is looked up.
+const ServiceNameKey = "service.name"
+
+// createTracesExporter casts the generic config, creates the DatasetExporter
+// and wraps its consumeTraces method into an exporterhelper traces exporter
+// configured with the queue, retry and timeout settings from the config.
+// It returns an error when the DatasetExporter cannot be created.
+func createTracesExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Traces, error) {
+	cfg := castConfig(config)
+	// This used to pass "logs", copied from the logs exporter, although the
+	// exporter built here consumes traces.
+	e, err := newDatasetExporter("traces", cfg, set.Logger)
+	if err != nil {
+		// The "DataSetExpoter" spelling is left untouched: the package tests
+		// compare the error text verbatim.
+		return nil, fmt.Errorf("cannot get DataSetExpoter: %w", err)
+	}
+
+	return exporterhelper.NewTracesExporter(
+		ctx,
+		set,
+		config,
+		e.consumeTraces,
+		exporterhelper.WithQueue(cfg.QueueSettings),
+		exporterhelper.WithRetry(cfg.RetrySettings),
+		exporterhelper.WithTimeout(cfg.TimeoutSettings),
+		exporterhelper.WithShutdown(e.shutdown),
+	)
+}
+
+// buildEventFromSpan converts one span and its resource into an event bundle.
+//
+// The event timestamp is the span start time. Ids, timing, kind and status
+// become event attributes, followed by the resource summary written by
+// updateResource and, when the resource has a service name, the "services"
+// attribute. When tracker is non-nil the statistics collected for the span's
+// descendants are attached as well (see applyTrackerInfo). Span attributes are
+// written last through updateWithPrefixedValues.
+func buildEventFromSpan(
+	bundle spanBundle,
+	tracker *spanTracker,
+) *add_events.EventBundle {
+	span := bundle.span
+	resource := bundle.resource
+
+	// convert the timestamps once and reuse them below
+	startNano := span.StartTimestamp().AsTime().UnixNano()
+	endNano := span.EndTimestamp().AsTime().UnixNano()
+	startStr := strconv.FormatInt(startNano, 10)
+
+	attrs := make(map[string]interface{})
+	event := add_events.Event{
+		Sev: int(plog.SeverityNumberInfo),
+		Ts:  startStr,
+	}
+
+	attrs["sca:schema"] = "tracing"
+	attrs["sca:schemVer"] = 1
+	attrs["sca:type"] = "span"
+
+	attrs["name"] = span.Name()
+	attrs["span_id"] = span.SpanID().String()
+	if !span.ParentSpanID().IsEmpty() {
+		attrs["parent_span_id"] = span.ParentSpanID().String()
+	}
+	attrs["trace_id"] = span.TraceID().String()
+
+	attrs["start_time_unix_nano"] = startStr
+	attrs["end_time_unix_nano"] = strconv.FormatInt(endNano, 10)
+	attrs["duration_nano"] = strconv.FormatInt(endNano-startNano, 10)
+
+	attrs["kind"] = strings.ToLower(span.Kind().String())
+	attrs["status_code"] = strings.ToLower(span.Status().Code().String())
+	attrs["status_message"] = span.Status().Message()
+	// for now we care only about a small subset of the resource attributes
+	// updateWithPrefixedValues(attrs, "resource_", "_", resource.Attributes().AsRaw(), 0)
+	updateResource(attrs, resource.Attributes().AsRaw())
+
+	service, serviceFound := resource.Attributes().Get(ServiceNameKey)
+	if serviceFound {
+		attrs["services"] = service.AsString()
+	}
+
+	applyTrackerInfo(attrs, span, tracker)
+
+	// Span attributes are added after all built-in keys. Per the collision
+	// test in this package, a span attribute whose key is already taken is
+	// stored under the key with a "_" suffix.
+	// updateWithPrefixedValues(attrs, "attributes_", "_", span.Attributes().AsRaw(), 0)
+	updateWithPrefixedValues(attrs, "", "_", span.Attributes().AsRaw(), 0)
+
+	event.Attrs = attrs
+	event.Log = "LT"
+	event.Thread = "TT"
+	return &add_events.EventBundle{
+		Event:  &event,
+		Thread: &add_events.Thread{Id: "TT", Name: "traces"},
+		Log:    &add_events.Log{Id: "LT", Attrs: map[string]interface{}{}},
+	}
+}
+
+// applyTrackerInfo attaches the statistics the tracker holds for the
+// descendants of span to attrs and hands them over to the tracker entry of the
+// span's parent. It does nothing when tracker is nil or the span has an empty
+// trace id.
+func applyTrackerInfo(attrs map[string]interface{}, span ptrace.Span, tracker *spanTracker) {
+	if tracker == nil || span.TraceID().IsEmpty() {
+		return
+	}
+
+	key := newTraceAndSpan(span.TraceID(), span.SpanID())
+
+	// statistics collected so far for the spans below this one
+	info, ok := tracker.spans[key]
+	if ok {
+		// the counters cover descendants only, the span itself is not counted
+		attrs["span_count"] = info.spanCount
+		attrs["error_count"] = info.errorCount
+		updateServices(attrs, info.services)
+
+		delete(tracker.spans, key)
+	}
+
+	pKey := newTraceAndSpan(span.TraceID(), span.ParentSpanID())
+	if span.ParentSpanID().IsEmpty() {
+		// root span: nothing to propagate, drop the entry keyed by the empty
+		// parent id
+		delete(tracker.spans, pKey)
+		return
+	}
+
+	pInfo, pOk := tracker.spans[pKey]
+	if !pOk {
+		attrs["missing_parent"] = 1
+		return
+	}
+
+	// propagate the values of this subtree to the parent
+	pInfo.spanCount += info.spanCount
+	pInfo.errorCount += info.errorCount
+	if pInfo.services == nil && len(info.services) > 0 {
+		// entries created by spanTracker.update always carry a map; this only
+		// guards entries that were inserted by hand
+		pInfo.services = make(map[string]bool, len(info.services))
+	}
+	for k, v := range info.services {
+		pInfo.services[k] = v
+	}
+	tracker.spans[pKey] = pInfo
+}
+
+const (
+	resourceName = "resource_name"
+	resourceType = "resource_type"
+)
+
+// ResourceType is the value stored under the "resource_type" event attribute.
+type ResourceType string
+
+// Resource types assigned by updateResource.
+const (
+	Service = ResourceType("service")
+	Process = ResourceType("process")
+)
+
+// updateResource writes the "resource_name" and "resource_type" attributes
+// derived from the raw resource attributes:
+//   - when ServiceNameKey is present, its value is the name and the type is
+//     Service;
+//   - otherwise, when any key starts with "service", the name is empty and the
+//     type is Service;
+//   - otherwise the type is Process and the name is the value of
+//     "process.pid", or empty when that key is missing.
+func updateResource(attrs map[string]interface{}, resource map[string]any) {
+	if name, found := resource[ServiceNameKey]; found {
+		attrs[resourceName] = name
+		attrs[resourceType] = string(Service)
+		return
+	}
+
+	// no service name, so assume a process until proven otherwise
+	attrs[resourceName] = ""
+	attrs[resourceType] = string(Process)
+
+	// Map iteration order is random, but the outcome is not: any "service"
+	// prefixed key resets the name and wins over "process.pid".
+	for k, v := range resource {
+		if strings.HasPrefix(k, "service") {
+			attrs[resourceName] = ""
+			attrs[resourceType] = string(Service)
+			return
+		}
+		if k == "process.pid" {
+			attrs[resourceName] = v
+		}
+	}
+}
+
+// updateServices extends attrs["services"] with the names in services. The
+// span's own service (the current string value of attrs["services"], if any)
+// stays first; the remaining names follow in sorted order, separated by
+// commas. attrs is left untouched when services adds no new name.
+//
+// The services map is only read. When attrs has no string "services" value the
+// result is just the sorted list.
+func updateServices(attrs map[string]interface{}, services map[string]bool) {
+	own, hasOwn := attrs["services"].(string)
+
+	others := make([]string, 0, len(services))
+	for name := range services {
+		if hasOwn && name == own {
+			continue
+		}
+		others = append(others, name)
+	}
+	if len(others) == 0 {
+		return
+	}
+
+	sort.Strings(others)
+	joined := strings.Join(others, ",")
+	if hasOwn {
+		joined = own + "," + joined
+	}
+	attrs["services"] = joined
+}
+
+// spanBundle groups a span with the resource and the instrumentation scope it
+// was reported under.
+type spanBundle struct {
+	span     ptrace.Span
+	resource pcommon.Resource
+	scope    pcommon.InstrumentationScope
+}
+
+// buildEventsFromTraces converts every span in ld into an event bundle.
+//
+// Without a tracker the events keep the order of the input. With a tracker the
+// spans are first ordered so that spans with a parent come before root spans,
+// each group by ascending end time; every span is then registered in the
+// tracker right before its event is built, and stale tracker entries are
+// purged at the end.
+func buildEventsFromTraces(ld ptrace.Traces, tracker *spanTracker) []*add_events.EventBundle {
+	spans := make([]spanBundle, 0, ld.SpanCount())
+
+	// flatten the resource -> scope -> span hierarchy
+	resourceSpans := ld.ResourceSpans()
+	for i := 0; i < resourceSpans.Len(); i++ {
+		resource := resourceSpans.At(i).Resource()
+		scopeSpans := resourceSpans.At(i).ScopeSpans()
+		for j := 0; j < scopeSpans.Len(); j++ {
+			scope := scopeSpans.At(j).Scope()
+			spanRecords := scopeSpans.At(j).Spans()
+			for k := 0; k < spanRecords.Len(); k++ {
+				spanRecord := spanRecords.At(k)
+				spans = append(spans, spanBundle{spanRecord, resource, scope})
+			}
+		}
+	}
+
+	if tracker != nil {
+		// sort by end time
+		// there is no guarantee that the parent ends last, so at least all
+		// root spans are placed at the end
+		// a topological sort would give better results, but spans of one trace
+		// can arrive in multiple batches, so it would not be perfect either
+		// this should be implemented on the backend to support multiple
+		// collectors anyway
+		sort.SliceStable(spans, func(i, j int) bool {
+			iRoot := spans[i].span.ParentSpanID().IsEmpty()
+			jRoot := spans[j].span.ParentSpanID().IsEmpty()
+			if iRoot == jRoot {
+				return spans[i].span.EndTimestamp() < spans[j].span.EndTimestamp()
+			}
+
+			// exactly one of them is a root span; the other one goes first
+			return !iRoot
+		})
+	}
+
+	// the result stays nil for empty input, as before
+	var events []*add_events.EventBundle
+	if len(spans) > 0 {
+		events = make([]*add_events.EventBundle, 0, len(spans))
+	}
+	for _, span := range spans {
+		if tracker != nil {
+			tracker.update(span)
+		}
+		events = append(events, buildEventFromSpan(span, tracker))
+	}
+
+	if tracker != nil {
+		// purge old values
+		tracker.purge()
+	}
+
+	return events
+}
+
+// consumeTraces converts the received traces into events and sends them as one
+// batch through the exporter's client.
+func (e *DatasetExporter) consumeTraces(ctx context.Context, ld ptrace.Traces) error {
+	return sendBatch(buildEventsFromTraces(ld, e.spanTracker), e.client)
+}
+
+// spanInfo holds the statistics collected for the spans below one span.
+type spanInfo struct {
+	errorCount int             // number of spans with status code error
+	spanCount  int             // number of spans
+	createdAt  time.Time       // creation time of the entry, used by purge
+	services   map[string]bool // set of service names seen
+}
+
+// TraceAndSpan is a tracker key: 16 bytes of trace id followed by 8 bytes of
+// span id.
+type TraceAndSpan [24]byte
+
+// newTraceAndSpan builds the key for the given trace id and span id.
+func newTraceAndSpan(traceID pcommon.TraceID, spanID pcommon.SpanID) TraceAndSpan {
+	var traceAndSpan TraceAndSpan
+	copy(traceAndSpan[:], traceID[:])
+	copy(traceAndSpan[16:], spanID[:])
+	return traceAndSpan
+}
+
+// split returns the trace id and the span id stored in the key.
+func (ts TraceAndSpan) split() (traceID pcommon.TraceID, spanID pcommon.SpanID) {
+	copy(traceID[:], ts[:16])
+	copy(spanID[:], ts[16:])
+	return traceID, spanID
+}
+
+// String returns the string form of the trace id immediately followed by the
+// string form of the span id.
+func (ts TraceAndSpan) String() string {
+	traceID, spanID := ts.split()
+	return traceID.String() + spanID.String()
+}
+
+// spanTracker collects statistics about spans, keyed by the trace id and the
+// id of their parent span, so that they can be attached to the parent's event
+// once the parent is processed.
+//
+// NOTE(review): the map is accessed without any locking - confirm that
+// consumeTraces is never invoked concurrently (the exporter is wrapped with
+// exporterhelper.WithQueue).
+type spanTracker struct {
+	spans      map[TraceAndSpan]spanInfo
+	purgedAt   time.Time
+	purgeEvery time.Duration
+}
+
+// newSpanTracker returns an empty tracker. purgeEvery is both the minimal
+// interval between two purges and the age after which an entry is purged.
+func newSpanTracker(purgeEvery time.Duration) *spanTracker {
+	return &spanTracker{
+		spans:      make(map[TraceAndSpan]spanInfo),
+		purgedAt:   time.Now(),
+		purgeEvery: purgeEvery,
+	}
+}
+
+// update registers the span in the entry of its parent: the span counter is
+// increased, the error counter as well when the span status is error, and the
+// service name of the resource (if present) is remembered. Spans with an empty
+// trace id are ignored.
+func (tt *spanTracker) update(bundle spanBundle) {
+	span := bundle.span
+	resource := bundle.resource
+	// for empty trace ids do nothing
+	if span.TraceID().IsEmpty() {
+		return
+	}
+
+	// the entry is keyed by the parent, not by the span itself
+	key := newTraceAndSpan(span.TraceID(), span.ParentSpanID())
+
+	// find previous information
+	info, ok := tt.spans[key]
+	if !ok {
+		info = spanInfo{
+			createdAt: time.Now(),
+			services:  make(map[string]bool),
+		}
+	}
+
+	// increase counters
+	info.spanCount++
+	if span.Status().Code() == ptrace.StatusCodeError {
+		info.errorCount++
+	}
+
+	// update services
+	service, serviceFound := resource.Attributes().Get(ServiceNameKey)
+	if serviceFound {
+		info.services[service.AsString()] = true
+	}
+
+	tt.spans[key] = info
+}
+
+// purge removes the entries that are older than purgeEvery and returns how
+// many were removed. It does nothing and returns 0 when the previous purge (or
+// the creation of the tracker) happened less than purgeEvery ago.
+func (tt *spanTracker) purge() int {
+	// read the clock once so that all entries are judged against the same
+	// instant
+	now := time.Now()
+
+	// if it was purged recently, do nothing
+	if now.Sub(tt.purgedAt) < tt.purgeEvery {
+		return 0
+	}
+
+	// find and purge old values
+	purged := 0
+	for key, info := range tt.spans {
+		if now.Sub(info.createdAt) > tt.purgeEvery {
+			purged++
+			delete(tt.spans, key)
+		}
+	}
+
+	tt.purgedAt = now
+	return purged
+}
diff --git a/exporter/datasetexporter/traces_exporter_test.go b/exporter/datasetexporter/traces_exporter_test.go
new file mode 100644
index 000000000000..4a7f9d8a37ef
--- /dev/null
+++ b/exporter/datasetexporter/traces_exporter_test.go
@@ -0,0 +1,687 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package datasetexporter
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"sort"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/scalyr/dataset-go/pkg/api/add_events"
+	"github.com/stretchr/testify/suite"
+	"go.opentelemetry.io/collector/exporter/exportertest"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
+)
+
+// SuiteTracesExporter is the testify suite with the traces exporter tests.
+type SuiteTracesExporter struct {
+	suite.Suite
+}
+
+// SetupTest clears all environment variables of the test process.
+func (s *SuiteTracesExporter) SetupTest() {
+	os.Clearenv()
+}
+
+// TestSuiteTracesExporter runs the suite under "go test".
+func TestSuiteTracesExporter(t *testing.T) {
+	suite.Run(t, new(SuiteTracesExporter))
+}
+
+// TestCreateTracesExporter runs createTracesExporter over the shared cases
+// from createExporterTests: on success the case must expect no error, on
+// failure the error text must equal the expected one.
+func (s *SuiteTracesExporter) TestCreateTracesExporter() {
+	ctx := context.Background()
+	createSettings := exportertest.NewNopCreateSettings()
+	tests := createExporterTests()
+
+	for _, tt := range tests {
+		s.T().Run(tt.name, func(*testing.T) {
+			logs, err := createTracesExporter(ctx, createSettings, tt.config)
+
+			if err == nil {
+				s.Nil(tt.expectedError)
+				s.NotNil(logs)
+			} else {
+				// errors are compared by their text
+				s.Equal(tt.expectedError.Error(), err.Error())
+				s.Nil(logs)
+			}
+		})
+	}
+}
+
+// generateTEvent1Raw returns the event expected for the span "operationA"
+// (error status, no ids, no service).
+func generateTEvent1Raw() *add_events.Event {
+	return &add_events.Event{
+		Thread: "TT",
+		Log:    "LT",
+		Sev:    9,
+		Ts:     "1581452772000000321",
+		Attrs: map[string]interface{}{
+			"sca:schemVer": 1,
+			"sca:schema":   "tracing",
+			"sca:type":     "span",
+
+			"name": "operationA",
+			"kind": "unspecified",
+
+			"start_time_unix_nano": "1581452772000000321",
+			"end_time_unix_nano":   "1581452773000000789",
+			"duration_nano":        "1000000468",
+
+			"span_id":        "",
+			"trace_id":       "",
+			"resource_name":  "",
+			"resource_type":  "process",
+			"status_code":    "error",
+			"status_message": "status-cancelled",
+		},
+	}
+}
+
+// generateTEvent2Raw returns the event expected for the span "operationB"
+// (unset status, no ids, no service).
+func generateTEvent2Raw() *add_events.Event {
+	return &add_events.Event{
+		Thread: "TT",
+		Log:    "LT",
+		Sev:    9,
+		Ts:     "1581452772000000321",
+		Attrs: map[string]interface{}{
+			"sca:schemVer": 1,
+			"sca:schema":   "tracing",
+			"sca:type":     "span",
+
+			"name": "operationB",
+			"kind": "unspecified",
+
+			"start_time_unix_nano": "1581452772000000321",
+			"end_time_unix_nano":   "1581452773000000789",
+			"duration_nano":        "1000000468",
+
+			"span_id":        "",
+			"trace_id":       "",
+			"status_code":    "unset",
+			"status_message": "",
+			"resource_name":  "",
+			"resource_type":  "process",
+		},
+	}
+}
+
+// generateTEvent3Raw returns the event expected for the span "operationC",
+// which additionally carries the span attribute "span-attr".
+func generateTEvent3Raw() *add_events.Event {
+	return &add_events.Event{
+		Thread: "TT",
+		Log:    "LT",
+		Sev:    9,
+		Ts:     "1581452772000000321",
+		Attrs: map[string]interface{}{
+			"sca:schemVer": 1,
+			"sca:schema":   "tracing",
+			"sca:type":     "span",
+
+			"name": "operationC",
+			"kind": "unspecified",
+
+			"start_time_unix_nano": "1581452772000000321",
+			"end_time_unix_nano":   "1581452773000000789",
+			"duration_nano":        "1000000468",
+
+			"span_id":        "",
+			"trace_id":       "",
+			"span-attr":      "span-attr-val",
+			"status_code":    "unset",
+			"status_message": "",
+			"resource_name":  "",
+			"resource_type":  "process",
+		},
+	}
+}
+
+// testTThread is the thread expected in every event bundle built from a span.
+var testTThread = &add_events.Thread{
+	Id:   "TT",
+	Name: "traces",
+}
+
+// testTLog is the log expected in every event bundle built from a span.
+var testTLog = &add_events.Log{
+	Id:    "LT",
+	Attrs: map[string]interface{}{},
+}
+
+// TestBuildEventFromSpanOne converts the single span generated by testdata and
+// compares the bundle with generateTEvent1Raw.
+func (s *SuiteTracesExporter) TestBuildEventFromSpanOne() {
+	traces := testdata.GenerateTracesOneSpan()
+	span := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
+	expected := &add_events.EventBundle{
+		Event:  generateTEvent1Raw(),
+		Thread: testTThread,
+		Log:    testTLog,
+	}
+	was := buildEventFromSpan(
+		spanBundle{
+			span,
+			traces.ResourceSpans().At(0).Resource(),
+			traces.ResourceSpans().At(0).ScopeSpans().At(0).Scope(),
+		},
+		newSpanTracker(time.Hour),
+	)
+
+	s.Equal(expected, was)
+}
+
+// TestBuildEventsFromSpanAttributesCollision checks that span attributes named
+// like built-in keys ("name", "span_id") do not replace them: the expected
+// event keeps the built-in values and carries the span attributes under
+// "name_" and "span_id_".
+func (s *SuiteTracesExporter) TestBuildEventsFromSpanAttributesCollision() {
+	td := ptrace.NewTraces()
+	rs := td.ResourceSpans().AppendEmpty()
+	rss := rs.ScopeSpans().AppendEmpty()
+	span := rss.Spans().AppendEmpty()
+	span.Attributes().PutStr("name", "should_be_name_")
+	span.Attributes().PutStr("span_id", "should_be_span_id_")
+	expected := &add_events.EventBundle{
+		Event: &add_events.Event{
+			Thread: "TT",
+			Log:    "LT",
+			Sev:    9,
+			Ts:     "0",
+			Attrs: map[string]interface{}{
+				"sca:schemVer": 1,
+				"sca:schema":   "tracing",
+				"sca:type":     "span",
+
+				"name": "",
+				"kind": "unspecified",
+
+				"start_time_unix_nano": "0",
+				"end_time_unix_nano":   "0",
+				"duration_nano":        "0",
+
+				"span_id":        "",
+				"trace_id":       "",
+				"status_code":    "unset",
+				"status_message": "",
+				"resource_name":  "",
+				"resource_type":  "process",
+				"name_":          "should_be_name_",
+				"span_id_":       "should_be_span_id_",
+			},
+		},
+		Thread: testTThread,
+		Log:    testTLog,
+	}
+	was := buildEventFromSpan(
+		spanBundle{
+			span,
+			rs.Resource(),
+			rss.Scope(),
+		},
+		newSpanTracker(time.Hour),
+	)
+
+	s.Equal(expected, was)
+}
+
+// TestBuildEventsFromTracesFromTwoSpansSameResourceOneDifferent converts the
+// three spans generated by testdata and expects one event per span, in order.
+func (s *SuiteTracesExporter) TestBuildEventsFromTracesFromTwoSpansSameResourceOneDifferent() {
+	traces := testdata.GenerateTracesTwoSpansSameResourceOneDifferent()
+	was := buildEventsFromTraces(traces, newSpanTracker(time.Hour))
+
+	expected := []*add_events.EventBundle{
+		{
+			Event:  generateTEvent1Raw(),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateTEvent2Raw(),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateTEvent3Raw(),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+	}
+
+	s.Equal(expected, was)
+}
+
+// Span ids used by GenerateTracesTreesAndOrphans. The digits after "span"
+// encode the position in the tree: span00 is a child of span0, span000 is a
+// child of span00.
+var span0Id = [8]byte{1, 1, 1, 1, 1, 1, 1, 1}
+var span00Id = [8]byte{1, 2, 1, 1, 1, 1, 1, 1}
+var span01Id = [8]byte{1, 3, 1, 1, 1, 1, 1, 1}
+var span000Id = [8]byte{1, 2, 2, 1, 1, 1, 1, 1}
+var span001Id = [8]byte{1, 2, 3, 1, 1, 1, 1, 1}
+var span002Id = [8]byte{1, 2, 4, 1, 1, 1, 1, 1}
+
+var span1Id = [8]byte{2, 2, 2, 2, 2, 2, 2, 2}
+var span10Id = [8]byte{2, 3, 2, 2, 2, 2, 2, 2}
+
+var span21Id = [8]byte{3, 3, 3, 3, 3, 3, 3, 3}
+var span22Id = [8]byte{3, 4, 3, 3, 3, 3, 3, 3}
+
+// parent ids of the orphans span21 and span22; no span with these ids exists
+var span21PId = [8]byte{3, 5, 3, 3, 3, 3, 3, 3}
+var span22PId = [8]byte{3, 6, 3, 3, 3, 3, 3, 3}
+
+var span3Id = [8]byte{4, 4, 4, 4, 4, 4, 4, 4}
+var span30Id = [8]byte{4, 5, 4, 4, 4, 4, 4, 4}
+
+// Trace ids used by GenerateTracesTreesAndOrphans.
+var trace0Id = [16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
+var trace1Id = [16]byte{2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}
+var trace2Id = [16]byte{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}
+var trace3Id = [16]byte{4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4}
+
+// GenerateTracesTreesAndOrphans builds twelve spans in four traces, spread
+// over three resources (services sAAA, sBBB and sCCC):
+//   - trace 0 is a tree: span0 -> (span00 -> (span000, span001, span002), span01)
+//   - trace 1 is span1 -> span10
+//   - trace 2 holds the orphans span21 and span22, whose parents are missing
+//   - trace 3 is span3 -> span30, where the child outlasts the parent
+//
+// The spans are deliberately appended out of order, so children and parents
+// are mixed across the resources.
+func GenerateTracesTreesAndOrphans() ptrace.Traces {
+	td := ptrace.NewTraces()
+	rs0 := td.ResourceSpans().AppendEmpty()
+	rs0ils0 := rs0.ScopeSpans().AppendEmpty()
+	span001 := rs0ils0.Spans().AppendEmpty()
+	span000 := rs0ils0.Spans().AppendEmpty()
+	span30 := rs0ils0.Spans().AppendEmpty()
+	rs1 := td.ResourceSpans().AppendEmpty()
+	rs1ils0 := rs1.ScopeSpans().AppendEmpty()
+	span22 := rs1ils0.Spans().AppendEmpty()
+	span21 := rs1ils0.Spans().AppendEmpty()
+	span10 := rs1ils0.Spans().AppendEmpty()
+	span01 := rs1ils0.Spans().AppendEmpty()
+	span00 := rs1ils0.Spans().AppendEmpty()
+	rs2 := td.ResourceSpans().AppendEmpty()
+	rs2ils0 := rs2.ScopeSpans().AppendEmpty()
+	span1 := rs2ils0.Spans().AppendEmpty()
+	span0 := rs2ils0.Spans().AppendEmpty()
+	span002 := rs2ils0.Spans().AppendEmpty()
+	span3 := rs2ils0.Spans().AppendEmpty()
+
+	// set error statuses
+	status21 := span21.Status()
+	status21.SetCode(ptrace.StatusCodeError)
+
+	status000 := span000.Status()
+	status000.SetCode(ptrace.StatusCodeError)
+
+	status001 := span001.Status()
+	status001.SetCode(ptrace.StatusCodeError)
+
+	// set traces
+	trace0 := pcommon.TraceID(trace0Id)
+	trace1 := pcommon.TraceID(trace1Id)
+	trace2 := pcommon.TraceID(trace2Id)
+	trace3 := pcommon.TraceID(trace3Id)
+
+	span0.SetTraceID(trace0)
+	span00.SetTraceID(trace0)
+	span01.SetTraceID(trace0)
+	span000.SetTraceID(trace0)
+	span001.SetTraceID(trace0)
+	span002.SetTraceID(trace0)
+
+	span1.SetTraceID(trace1)
+	span10.SetTraceID(trace1)
+
+	span21.SetTraceID(trace2)
+	span22.SetTraceID(trace2)
+
+	span3.SetTraceID(trace3)
+	span30.SetTraceID(trace3)
+
+	// set span ids for trace 0 - it's a tree
+
+	span0.SetSpanID(span0Id)
+	span0.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 0)))
+	span0.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 10000)))
+
+	span00.SetSpanID(span00Id)
+	span00.SetParentSpanID(span0.SpanID())
+	span00.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 100)))
+	span00.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 4900)))
+
+	span01.SetSpanID(span01Id)
+	span01.SetParentSpanID(span0.SpanID())
+	span01.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 5100)))
+	span01.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 9900)))
+
+	span000.SetSpanID(span000Id)
+	span000.SetParentSpanID(span00.SpanID())
+	span000.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 200)))
+	span000.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 2000)))
+
+	span001.SetSpanID(span001Id)
+	span001.SetParentSpanID(span00.SpanID())
+	span001.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 2100)))
+	span001.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 3800)))
+
+	span002.SetSpanID(span002Id)
+	span002.SetParentSpanID(span00.SpanID())
+	span002.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 4000)))
+	span002.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 4800)))
+
+	// set span ids for trace 1
+	span1.SetSpanID(span1Id)
+	span1.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 10000)))
+	span1.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 20000)))
+	span10.SetSpanID(span10Id)
+	span10.SetParentSpanID(span1.SpanID())
+	span10.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 10100)))
+	span10.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 19900)))
+
+	// set span ids for trace 2 - there is no parent
+	span21.SetSpanID(span21Id)
+	span21.SetParentSpanID(span21PId)
+	span21.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 21000)))
+	span21.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 22000)))
+	span22.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 23000)))
+	span22.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 24000)))
+
+	span22.SetSpanID(span22Id)
+	span22.SetParentSpanID(span22PId)
+
+	// set spans for trace 3 - parent starts later and ends sooner than child
+	// set span ids for trace 3
+	span3.SetSpanID(span3Id)
+	span3.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 40100)))
+	span3.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 49900)))
+	span30.SetSpanID(span30Id)
+	span30.SetParentSpanID(span3.SpanID())
+	span30.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 40000)))
+	span30.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 50000)))
+
+	// set resources
+	res0 := rs0.Resource()
+	res0.Attributes().PutStr("service.name", "sAAA")
+	res0.Attributes().PutStr("service.namespace", "snAAA")
+	res1 := rs1.Resource()
+	res1.Attributes().PutStr("service.name", "sBBB")
+	res1.Attributes().PutStr("service.namespace", "snBBB")
+	res2 := rs2.Resource()
+	res2.Attributes().PutStr("service.name", "sCCC")
+	res2.Attributes().PutStr("service.namespace", "snCCC")
+
+	return td
+}
+
+// generateSimpleEvent returns the event expected for an unnamed span of a
+// resource with a service name. start and end are in nanoseconds; an empty
+// parentID means that the "parent_span_id" attribute is not expected. The
+// "span_count" and "error_count" attributes are not set here.
+func generateSimpleEvent(
+	traceID string,
+	spanID string,
+	parentID string,
+	status ptrace.Status,
+	start int64,
+	end int64,
+	serviceName string,
+	services string,
+) *add_events.Event {
+	attrs := map[string]interface{}{
+		"sca:schemVer": 1,
+		"sca:schema":   "tracing",
+		"sca:type":     "span",
+
+		"name": "",
+		"kind": "unspecified",
+
+		"start_time_unix_nano": fmt.Sprintf("%d", start),
+		"end_time_unix_nano":   fmt.Sprintf("%d", end),
+		"duration_nano":        fmt.Sprintf("%d", end-start),
+
+		"span_id":        spanID,
+		"trace_id":       traceID,
+		"status_code":    strings.ToLower(status.Code().String()),
+		"status_message": status.Message(),
+
+		"resource_name": serviceName,
+		"resource_type": "service",
+		"services":      services,
+	}
+	if parentID != "" {
+		attrs["parent_span_id"] = parentID
+	}
+
+	return &add_events.Event{
+		Thread: "TT",
+		Log:    "LT",
+		Sev:    9,
+		Ts:     fmt.Sprintf("%d", start),
+		Attrs:  attrs,
+	}
+}
+
+// TestBuildEventsFromTracesTreesAndOrphansWithTracker converts the traces from
+// GenerateTracesTreesAndOrphans with a tracker. It expects the spans that have
+// a parent first, ordered by end time, followed by the root spans; parents
+// that were processed after their children carry "span_count", "error_count"
+// and the collected services. Afterwards only the entries of the two missing
+// parents remain in the tracker, and they are purged after one second.
+func (s *SuiteTracesExporter) TestBuildEventsFromTracesTreesAndOrphansWithTracker() {
+	tracker := newSpanTracker(time.Second)
+	traces := GenerateTracesTreesAndOrphans()
+	was := buildEventsFromTraces(traces, tracker)
+
+	statusUnset := ptrace.NewStatus()
+	statusError := ptrace.NewStatus()
+	statusError.SetCode(ptrace.StatusCodeError)
+
+	expected := []*add_events.EventBundle{
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0102020101010101", "0102010101010101", statusError, 200, 2000, "sAAA", "sAAA"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0102030101010101", "0102010101010101", statusError, 2100, 3800, "sAAA", "sAAA"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0102040101010101", "0102010101010101", statusUnset, 4000, 4800, "sCCC", "sCCC"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0102010101010101", "0101010101010101", statusUnset, 100, 4900, "sBBB", "sBBB,sAAA,sCCC"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0103010101010101", "0101010101010101", statusUnset, 5100, 9900, "sBBB", "sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+
+		{
+			Event:  generateSimpleEvent("02020202020202020202020202020202", "0203020202020202", "0202020202020202", statusUnset, 10100, 19900, "sBBB", "sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+
+		{
+			Event:  generateSimpleEvent("03030303030303030303030303030303", "0303030303030303", "0305030303030303", statusError, 21000, 22000, "sBBB", "sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("03030303030303030303030303030303", "0304030303030303", "0306030303030303", statusUnset, 23000, 24000, "sBBB", "sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+
+		{
+			Event:  generateSimpleEvent("04040404040404040404040404040404", "0405040404040404", "0404040404040404", statusUnset, 40000, 50000, "sAAA", "sAAA"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0101010101010101", "", statusUnset, 0, 10000, "sCCC", "sCCC,sAAA,sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+
+		{
+			Event:  generateSimpleEvent("02020202020202020202020202020202", "0202020202020202", "", statusUnset, 10000, 20000, "sCCC", "sCCC,sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+
+		{
+			Event:  generateSimpleEvent("04040404040404040404040404040404", "0404040404040404", "", statusUnset, 40100, 49900, "sCCC", "sCCC,sAAA"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+	}
+
+	// span00
+	expected[3].Event.Attrs["error_count"] = 2
+	expected[3].Event.Attrs["span_count"] = 3
+
+	// span0
+	expected[9].Event.Attrs["error_count"] = 2
+	expected[9].Event.Attrs["span_count"] = 5
+
+	// span1
+	expected[10].Event.Attrs["error_count"] = 0
+	expected[10].Event.Attrs["span_count"] = 1
+
+	// span3
+	expected[11].Event.Attrs["error_count"] = 0
+	expected[11].Event.Attrs["span_count"] = 1
+
+	s.Equal(expected, was)
+
+	// only the entries of the missing parents from trace 2 are left
+	expectedKeys := []string{
+		newTraceAndSpan(trace2Id, span22PId).String(),
+		newTraceAndSpan(trace2Id, span21PId).String(),
+	}
+	sort.Slice(expectedKeys, func(i, j int) bool {
+		return expectedKeys[i] < expectedKeys[j]
+	})
+
+	// map iteration order is random, so the keys are sorted before comparing
+	wasKeys := make([]string, 0)
+	for k := range tracker.spans {
+		wasKeys = append(wasKeys, k.String())
+	}
+	sort.Slice(wasKeys, func(i, j int) bool {
+		return wasKeys[i] < wasKeys[j]
+	})
+
+	s.Equal(expectedKeys, wasKeys)
+	s.Equal(2, len(tracker.spans))
+	// wait for the purge interval of the tracker to elapse
+	time.Sleep(time.Second)
+	s.Equal(2, tracker.purge())
+	s.Equal(0, len(tracker.spans))
+}
+
+// TestBuildEventsFromTracesTreesAndOrphansWithoutTracker converts the traces
+// from GenerateTracesTreesAndOrphans without a tracker. The events are
+// expected in the order in which the spans were appended, each listing only
+// its own service and without "span_count" and "error_count".
+func (s *SuiteTracesExporter) TestBuildEventsFromTracesTreesAndOrphansWithoutTracker() {
+	traces := GenerateTracesTreesAndOrphans()
+	was := buildEventsFromTraces(traces, nil)
+
+	statusUnset := ptrace.NewStatus()
+	statusError := ptrace.NewStatus()
+	statusError.SetCode(ptrace.StatusCodeError)
+
+	expected := []*add_events.EventBundle{
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0102030101010101", "0102010101010101", statusError, 2100, 3800, "sAAA", "sAAA"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0102020101010101", "0102010101010101", statusError, 200, 2000, "sAAA", "sAAA"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("04040404040404040404040404040404", "0405040404040404", "0404040404040404", statusUnset, 40000, 50000, "sAAA", "sAAA"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("03030303030303030303030303030303", "0304030303030303", "0306030303030303", statusUnset, 23000, 24000, "sBBB", "sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("03030303030303030303030303030303", "0303030303030303", "0305030303030303", statusError, 21000, 22000, "sBBB", "sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("02020202020202020202020202020202", "0203020202020202", "0202020202020202", statusUnset, 10100, 19900, "sBBB", "sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0103010101010101", "0101010101010101", statusUnset, 5100, 9900, "sBBB", "sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0102010101010101", "0101010101010101", statusUnset, 100, 4900, "sBBB", "sBBB"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("02020202020202020202020202020202", "0202020202020202", "", statusUnset, 10000, 20000, "sCCC", "sCCC"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0101010101010101", "", statusUnset, 0, 10000, "sCCC", "sCCC"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("01010101010101010101010101010101", "0102040101010101", "0102010101010101", statusUnset, 4000, 4800, "sCCC", "sCCC"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+		{
+			Event:  generateSimpleEvent("04040404040404040404040404040404", "0404040404040404", "", statusUnset, 40100, 49900, "sCCC", "sCCC"),
+			Thread: testTThread,
+			Log:    testTLog,
+		},
+	}
+
+	s.Equal(expected, was)
+}
+
+// TestUpdateResource checks which "resource_name" and "resource_type"
+// updateResource derives from different sets of resource attributes.
+func (s *SuiteTracesExporter) TestUpdateResource() {
+	tests := []struct {
+		name     string
+		resource map[string]any
+		expected map[string]interface{}
+	}{
+		{
+			name:     "with_service.name",
+			resource: map[string]any{"service.name": "foo"},
+			expected: map[string]interface{}{resourceName: "foo", resourceType: string(Service)},
+		},
+		{
+			name:     "without_service.name",
+			resource: map[string]any{"service.bar": "foo"},
+			expected: map[string]interface{}{resourceName: "", resourceType: string(Service)},
+		},
+		{
+			name:     "with_process.pid",
+			resource: map[string]any{"process.pid": "bar"},
+			expected: map[string]interface{}{resourceName: "bar", resourceType: string(Process)},
+		},
+		{
+			name:     "prefer_service",
+			resource: map[string]any{"service.bar": "foo", "process.pid": "bar"},
+			expected: map[string]interface{}{resourceName: "", resourceType: string(Service)},
+		},
+		{
+			name:     "empty",
+			resource: map[string]any{},
+			expected: map[string]interface{}{resourceName: "", resourceType: string(Process)},
+		},
+	}
+
+	for _, tt := range tests {
+		s.T().Run(tt.name, func(*testing.T) {
+			attrs := make(map[string]interface{})
+			updateResource(attrs, tt.resource)
+
+			s.Equal(tt.expected, attrs, tt.name)
+		})
+	}
+}