[exporter/datasetexporter]: Initial implementation of logs and traces (#21815)

* [exporter/datasetexporter]: Add support for sending logs

* Fix gci issues

* Fix another batch of lint errors

* Regenerate metadata

* Change nanoseconds from int to string

* Remove changes in go.sum files

* Update exporter/datasetexporter/examples/e2e/README.md

Co-authored-by: Antoine Toulme <[email protected]>

* Update exporter/datasetexporter/traces_exporter.go

Co-authored-by: Antoine Toulme <[email protected]>

* Incorporate suggestions from the PR

* Use dataset-go library version 0.0.7

* Add Changelog entry

* Run `make` and fix all issues

* Add support for making aggregation configurable

* Update syntax in the README

* Fix go.mod in otelcontribcol

* Fix go.sum in datasetexporter

* Fix go.sum for otelcontribcol

* Fix make goporto

* Run make crosslink

* Run make gotidy to pass another check

* Update readme for example

* Rename docker-compose.yml to yaml

* Use lowerCased constant names

* Remove examples

* Run make crosslink

---------

Co-authored-by: Antoine Toulme <[email protected]>
martin-majlis-s1 and atoulme authored May 17, 2023
1 parent 3f5370e commit 5f9fabe
Showing 19 changed files with 2,039 additions and 260 deletions.
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datasetexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for exporting logs and traces.

# One or more tracking issues related to the change
issues: [20660]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/go.mod
@@ -319,6 +319,7 @@ require (
github.com/containerd/ttrpc v1.1.0 // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -551,6 +552,7 @@ require (
github.com/rs/cors v1.9.0 // indirect
github.com/samber/lo v1.37.0 // indirect
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.14 // indirect
github.com/scalyr/dataset-go v0.0.7 // indirect
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.4.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
6 changes: 6 additions & 0 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default.

40 changes: 25 additions & 15 deletions exporter/datasetexporter/README.md
@@ -3,7 +3,7 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: traces, logs |
| Stability | [development]: logs, traces |
| Distributions | [] |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
@@ -17,13 +17,22 @@ See the [Getting Started](https://app.scalyr.com/help/getting-started) guide.

### Required Settings

- `dataset_url` (no default): The URL of the DataSet API that ingests the data. Most likely https://app.scalyr.com. If not specified env variable `DATASET_URL` is used.
- `api_key` (no default): The "Log Write" API Key required to use API. Instructions how to get [API key](https://app.scalyr.com/help/api-keys). If not specified env variable `DATASET_API_KEY` is used.
- `dataset_url` (no default): The URL of the DataSet API that ingests the data. Most likely https://app.scalyr.com.
- `api_key` (no default): The "Log Write" API key required to use the API. See the instructions for obtaining an [API key](https://app.scalyr.com/help/api-keys).

If you do not want to specify the `api_key` in the file, you can use the [builtin functionality](https://opentelemetry.io/docs/collector/configuration/#configuration-environment-variables) and set `api_key: ${env:DATASET_API_KEY}`.

### Optional Settings

- `max_delay_ms` (default = "15000"): The maximum delay between sending batches from the same source.
- `group_by` (default = []): The list of attributes based on which events should be grouped.
- `buffer`:
  - `max_lifetime` (default = 5s): The maximum delay between sending batches from the same source.
  - `group_by` (default = []): The list of attributes based on which events should be grouped.
  - `retry_initial_interval` (default = 5s): Time to wait after the first failure before retrying.
  - `retry_max_interval` (default = 30s): The upper bound on backoff.
  - `retry_max_elapsed_time` (default = 300s): The maximum amount of time spent trying to send a buffer.
- `traces`:
  - `aggregate` (default = false): Count the number of spans and errors belonging to a trace.
  - `max_wait` (default = 5s): The maximum time to wait for all spans from a single trace to arrive; ignored if `aggregate` is false.
- `retry_on_failure`: See [retry_on_failure](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
- `sending_queue`: See [sending_queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
- `timeout`: See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
@@ -39,16 +48,12 @@ exporters:
    dataset_url: https://app.scalyr.com
    # API Key
    api_key: your_api_key
    # Send batch to the API at least every 15s
    max_delay_ms: 15000
    # Group data based on these attributes
    group_by:
      - attributes.container_id
      - attributes.log.file.path
      - body.map.kubernetes.container_hash
      - body.map.kubernetes.pod_id
      - body.map.kubernetes.docker_id
      - body.map.stream
    buffer:
      # Send buffer to the API at least every 10s
      max_lifetime: 10s
      # Group data based on these attributes
      group_by:
        - attributes.container_id

service:
  pipelines:
@@ -57,4 +62,9 @@
      processors: [batch]
      # add dataset among your exporters
      exporters: [dataset]
    traces:
      receivers: [otlp]
      processors: [batch]
      # add dataset among your exporters
      exporters: [dataset]
```
108 changes: 79 additions & 29 deletions exporter/datasetexporter/config.go
@@ -16,20 +16,61 @@ package datasetexporter // import "github.com/open-telemetry/opentelemetry-colle

import (
"fmt"
"os"
"strconv"
"time"

"github.com/scalyr/dataset-go/pkg/buffer"
"github.com/scalyr/dataset-go/pkg/buffer_config"
datasetConfig "github.com/scalyr/dataset-go/pkg/config"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const maxDelayMs = "15000"
const tracesMaxWait = 5 * time.Second
const tracesAggregate = false

type TracesSettings struct {
    Aggregate bool          `mapstructure:"aggregate"`
    MaxWait   time.Duration `mapstructure:"max_wait"`
}

// newDefaultTracesSettings returns the default settings for TracesSettings.
func newDefaultTracesSettings() TracesSettings {
    return TracesSettings{
        Aggregate: tracesAggregate,
        MaxWait:   tracesMaxWait,
    }
}

const bufferMaxLifetime = 5 * time.Second
const bufferRetryInitialInterval = 5 * time.Second
const bufferRetryMaxInterval = 30 * time.Second
const bufferRetryMaxElapsedTime = 300 * time.Second

type BufferSettings struct {
    MaxLifetime          time.Duration `mapstructure:"max_lifetime"`
    GroupBy              []string      `mapstructure:"group_by"`
    RetryInitialInterval time.Duration `mapstructure:"retry_initial_interval"`
    RetryMaxInterval     time.Duration `mapstructure:"retry_max_interval"`
    RetryMaxElapsedTime  time.Duration `mapstructure:"retry_max_elapsed_time"`
}

// newDefaultBufferSettings returns the default settings for BufferSettings.
func newDefaultBufferSettings() BufferSettings {
    return BufferSettings{
        MaxLifetime:          bufferMaxLifetime,
        GroupBy:              []string{},
        RetryInitialInterval: bufferRetryInitialInterval,
        RetryMaxInterval:     bufferRetryMaxInterval,
        RetryMaxElapsedTime:  bufferRetryMaxElapsedTime,
    }
}

type Config struct {
DatasetURL string `mapstructure:"dataset_url"`
APIKey string `mapstructure:"api_key"`
MaxDelayMs string `mapstructure:"max_delay_ms"`
GroupBy []string `mapstructure:"group_by"`
DatasetURL string `mapstructure:"dataset_url"`
APIKey configopaque.String `mapstructure:"api_key"`
BufferSettings `mapstructure:"buffer"`
TracesSettings `mapstructure:"traces"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.TimeoutSettings `mapstructure:"timeout"`
@@ -40,17 +81,6 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error {
return fmt.Errorf("cannot unmarshal config: %w", err)
}

if len(c.DatasetURL) == 0 {
c.DatasetURL = os.Getenv("DATASET_URL")
}
if len(c.APIKey) == 0 {
c.APIKey = os.Getenv("DATASET_API_KEY")
}

if len(c.MaxDelayMs) == 0 {
c.MaxDelayMs = maxDelayMs
}

return nil
}

@@ -64,15 +94,6 @@ func (c *Config) Validate() error {
return fmt.Errorf("dataset_url is required")
}

_, err := strconv.Atoi(c.MaxDelayMs)
if err != nil {
return fmt.Errorf(
"max_delay_ms must be integer, but %s was used: %w",
c.MaxDelayMs,
err,
)
}

return nil
}

@@ -81,11 +102,40 @@ func (c *Config) Validate() error {
func (c *Config) String() string {
s := ""
s += fmt.Sprintf("%s: %s; ", "DatasetURL", c.DatasetURL)
s += fmt.Sprintf("%s: %s; ", "MaxDelayMs", c.MaxDelayMs)
s += fmt.Sprintf("%s: %s; ", "GroupBy", c.GroupBy)
s += fmt.Sprintf("%s: %+v; ", "BufferSettings", c.BufferSettings)
s += fmt.Sprintf("%s: %+v; ", "TracesSettings", c.TracesSettings)
s += fmt.Sprintf("%s: %+v; ", "RetrySettings", c.RetrySettings)
s += fmt.Sprintf("%s: %+v; ", "QueueSettings", c.QueueSettings)
s += fmt.Sprintf("%s: %+v", "TimeoutSettings", c.TimeoutSettings)

return s
}

func (c *Config) convert() (*ExporterConfig, error) {
    err := c.Validate()
    if err != nil {
        return nil, fmt.Errorf("config is not valid: %w", err)
    }

    return &ExporterConfig{
            datasetConfig: &datasetConfig.DataSetConfig{
                Endpoint: c.DatasetURL,
                Tokens:   datasetConfig.DataSetTokens{WriteLog: string(c.APIKey)},
                BufferSettings: buffer_config.DataSetBufferSettings{
                    MaxLifetime:          c.BufferSettings.MaxLifetime,
                    MaxSize:              buffer.LimitBufferSize,
                    GroupBy:              c.BufferSettings.GroupBy,
                    RetryInitialInterval: c.BufferSettings.RetryInitialInterval,
                    RetryMaxInterval:     c.BufferSettings.RetryMaxInterval,
                    RetryMaxElapsedTime:  c.BufferSettings.RetryMaxElapsedTime,
                },
            },
            tracesSettings: c.TracesSettings,
        },
        nil
}

type ExporterConfig struct {
    datasetConfig  *datasetConfig.DataSetConfig
    tracesSettings TracesSettings
}
