[exporter/datasetexporter]: Initial implementation of logs and traces #21815

Merged
Changes from 7 commits
Commits
29 commits
fd8ee6c
[exporter/datasetexporter]: Add support for sending logs
martin-majlis-s1 May 5, 2023
b40524b
Merge branch 'main' into issue-20660-datasetexporter-logs-initial-code
martin-majlis-s1 May 10, 2023
ddccebe
Fix gci issues
martin-majlis-s1 May 11, 2023
d4f666a
Fix another batch of lint errors
martin-majlis-s1 May 11, 2023
ed1da0f
Regenerate metadata
martin-majlis-s1 May 11, 2023
8b0fc15
Change nanoseconds from int to string
martin-majlis-s1 May 11, 2023
62defb9
Remove changes in go.sum files
martin-majlis-s1 May 11, 2023
83aae30
Update exporter/datasetexporter/examples/e2e/README.md
martin-majlis-s1 May 12, 2023
da0c5f4
Update exporter/datasetexporter/traces_exporter.go
martin-majlis-s1 May 12, 2023
456cd94
Incorporate suggestions from the PR
martin-majlis-s1 May 12, 2023
24e803c
Merge branch 'main' into issue-20660-datasetexporter-logs-traces-init…
martin-majlis-s1 May 12, 2023
0af59fa
Use dataset-go library version 0.0.7
martin-majlis-s1 May 12, 2023
8eb3d5b
Add Changelog entry
martin-majlis-s1 May 12, 2023
539d90b
Run `make` and fix all issues
martin-majlis-s1 May 12, 2023
c9bfc0c
Add support for making aggregation configurable
martin-majlis-s1 May 12, 2023
fa2d4c3
Update syntax in the README
martin-majlis-s1 May 12, 2023
d73e26c
Merge branch 'main' into issue-20660-datasetexporter-logs-traces-init…
martin-majlis-s1 May 15, 2023
c0b439c
Fix go.mod in otelcontribcol
martin-majlis-s1 May 15, 2023
73e9ab6
Fix go.sum in datasetexporter
martin-majlis-s1 May 15, 2023
740f7fe
Fix go.sum for otelcontribcol
martin-majlis-s1 May 15, 2023
9900a05
Fix make goporto
martin-majlis-s1 May 15, 2023
bb30197
Run make crosslink
martin-majlis-s1 May 15, 2023
ecb09ec
Run make gotidy to pass another check
martin-majlis-s1 May 15, 2023
fcb384d
Update readme for example
martin-majlis-s1 May 15, 2023
050cf54
Rename docker-compose.yml to yaml
martin-majlis-s1 May 15, 2023
06fd3ab
Use lowerCased constant names
martin-majlis-s1 May 15, 2023
27940c2
Merge branch 'main' into issue-20660-datasetexporter-logs-traces-init…
martin-majlis-s1 May 17, 2023
16b34d1
Remove examples
martin-majlis-s1 May 17, 2023
f4e2dbf
Run make crosslink
martin-majlis-s1 May 17, 2023
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/go.mod
@@ -317,6 +317,7 @@ require (
github.com/containerd/ttrpc v1.1.0 // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -549,6 +550,7 @@ require (
github.com/rs/cors v1.9.0 // indirect
github.com/samber/lo v1.37.0 // indirect
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.14 // indirect
github.com/scalyr/dataset-go v0.0.6 // indirect
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.4.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
2 changes: 1 addition & 1 deletion exporter/datasetexporter/README.md
@@ -3,7 +3,7 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: traces, logs |
| Stability | [development]: logs, traces |
| Distributions | [] |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
72 changes: 54 additions & 18 deletions exporter/datasetexporter/config.go
@@ -17,19 +17,34 @@ package datasetexporter // import "github.com/open-telemetry/opentelemetry-colle
import (
"fmt"
"os"
"strconv"
"time"

"github.com/scalyr/dataset-go/pkg/buffer"
datasetConfig "github.com/scalyr/dataset-go/pkg/config"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const maxDelayMs = "15000"
const maxDelay = 15 * time.Millisecond
const tracesMaxWait = 5 * time.Second

type TracesSettings struct {
MaxWait time.Duration `mapstructure:"max_wait"`
}

// NewDefaultTracesSettings returns the default settings for TracesSettings.
func NewDefaultTracesSettings() TracesSettings {
return TracesSettings{
MaxWait: tracesMaxWait,
}
}

type Config struct {
DatasetURL string `mapstructure:"dataset_url"`
APIKey string `mapstructure:"api_key"`
MaxDelayMs string `mapstructure:"max_delay_ms"`
GroupBy []string `mapstructure:"group_by"`
DatasetURL string `mapstructure:"dataset_url"`
APIKey string `mapstructure:"api_key"`
MaxDelay time.Duration `mapstructure:"max_delay"`
GroupBy []string `mapstructure:"group_by"`
TracesSettings `mapstructure:"traces"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.TimeoutSettings `mapstructure:"timeout"`
@@ -47,8 +62,12 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error {
c.APIKey = os.Getenv("DATASET_API_KEY")
}

if len(c.MaxDelayMs) == 0 {
c.MaxDelayMs = maxDelayMs
if c.MaxDelay == 0 {
c.MaxDelay = maxDelay
}

if c.TracesSettings.MaxWait == 0 {
c.TracesSettings.MaxWait = tracesMaxWait
}

return nil
@@ -64,15 +83,6 @@ func (c *Config) Validate() error {
return fmt.Errorf("dataset_url is required")
}

_, err := strconv.Atoi(c.MaxDelayMs)
if err != nil {
return fmt.Errorf(
"max_delay_ms must be integer, but %s was used: %w",
c.MaxDelayMs,
err,
)
}

return nil
}

@@ -81,11 +91,37 @@ func (c *Config) Validate() error {
func (c *Config) String() string {
s := ""
s += fmt.Sprintf("%s: %s; ", "DatasetURL", c.DatasetURL)
s += fmt.Sprintf("%s: %s; ", "MaxDelayMs", c.MaxDelayMs)
s += fmt.Sprintf("%s: %s; ", "MaxDelay", c.MaxDelay)
s += fmt.Sprintf("%s: %s; ", "GroupBy", c.GroupBy)
s += fmt.Sprintf("%s: %+v; ", "TracesSettings", c.TracesSettings)
s += fmt.Sprintf("%s: %+v; ", "RetrySettings", c.RetrySettings)
s += fmt.Sprintf("%s: %+v; ", "QueueSettings", c.QueueSettings)
s += fmt.Sprintf("%s: %+v", "TimeoutSettings", c.TimeoutSettings)

return s
}

func (c *Config) Convert() (*ExporterConfig, error) {
err := c.Validate()
if err != nil {
return nil, fmt.Errorf("config is not valid: %w", err)
}

return &ExporterConfig{
datasetConfig: &datasetConfig.DataSetConfig{
Endpoint: c.DatasetURL,
Tokens: datasetConfig.DataSetTokens{WriteLog: c.APIKey},
MaxBufferDelay: c.MaxDelay,
MaxPayloadB: buffer.LimitBufferSize,
GroupBy: c.GroupBy,
RetryBase: 5 * time.Second,
},
tracesSettings: c.TracesSettings,
},
nil
}

type ExporterConfig struct {
datasetConfig *datasetConfig.DataSetConfig
tracesSettings TracesSettings
}
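
As a quick illustration of how the new duration-based options flow into the dataset-go client, here is a minimal sketch (not part of the PR): all literal values are invented, and it reads the unexported ExporterConfig fields, so it only compiles inside the datasetexporter package.

// Hypothetical in-package sketch, not part of this change.
func exampleConvert() {
	cfg := &Config{
		DatasetURL: "https://example.scalyr.com", // invented endpoint
		APIKey:     "secret",
		MaxDelay:   15 * time.Second,
		GroupBy:    []string{"attributes.container_id"},
		TracesSettings: TracesSettings{MaxWait: 5 * time.Second},
	}

	// Convert runs Validate first, so a missing api_key or dataset_url
	// surfaces here rather than at send time.
	expCfg, err := cfg.Convert()
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}

	// DatasetURL, APIKey, MaxDelay and GroupBy are carried into the
	// dataset-go DataSetConfig; the traces max_wait stays in tracesSettings.
	fmt.Println(expCfg.datasetConfig.Endpoint, expCfg.tracesSettings.MaxWait)
}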
41 changes: 18 additions & 23 deletions exporter/datasetexporter/config_test.go
@@ -17,6 +17,7 @@ package datasetexporter
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/suite"
"go.opentelemetry.io/collector/confmap"
@@ -77,19 +78,19 @@ func (s *SuiteConfig) TestConfigUseEnvWhenSet() {
s.Equal("api_key", config.APIKey)
}

func (s *SuiteConfig) TestConfigUseDefaultForMaxDelay() {
func (s *SuiteConfig) TestConfigUseDefaults() {
config := Config{}
conf := confmap.NewFromStringMap(map[string]interface{}{
"dataset_url": "https://example.com",
"api_key": "secret",
"max_delay_ms": "",
"dataset_url": "https://example.com",
"api_key": "secret",
})
err := config.Unmarshal(conf)
s.Nil(err)

s.Equal("https://example.com", config.DatasetURL)
s.Equal("secret", config.APIKey)
s.Equal("15000", config.MaxDelayMs)
s.Equal(maxDelay, config.MaxDelay)
s.Equal(tracesMaxWait, config.TracesSettings.MaxWait)
}

func (s *SuiteConfig) TestConfigValidate() {
@@ -103,35 +104,26 @@ func (s *SuiteConfig) TestConfigValidate() {
config: Config{
DatasetURL: "https://example.com",
APIKey: "secret",
MaxDelayMs: "12345",
MaxDelay: 123 * time.Millisecond,
},
expected: nil,
},
{
name: "missing api_key",
config: Config{
DatasetURL: "https://example.com",
MaxDelayMs: "15000",
MaxDelay: maxDelay,
},
expected: fmt.Errorf("api_key is required"),
},
{
name: "missing dataset_url",
config: Config{
APIKey: "1234",
MaxDelayMs: "15000",
APIKey: "1234",
MaxDelay: maxDelay,
},
expected: fmt.Errorf("dataset_url is required"),
},
{
name: "invalid max_delay_ms",
config: Config{
DatasetURL: "https://example.com",
APIKey: "1234",
MaxDelayMs: "abc",
},
expected: fmt.Errorf("max_delay_ms must be integer, but abc was used: strconv.Atoi: parsing \"abc\": invalid syntax"),
},
}

for _, tt := range tests {
@@ -148,17 +140,20 @@ func (s *SuiteConfig) TestConfigValidate() {

func (s *SuiteConfig) TestConfigString() {
config := Config{
DatasetURL: "https://example.com",
APIKey: "secret",
MaxDelayMs: "1234",
GroupBy: []string{"field1", "field2"},
DatasetURL: "https://example.com",
APIKey: "secret",
MaxDelay: 123,
GroupBy: []string{"field1", "field2"},
TracesSettings: TracesSettings{
MaxWait: 45 * time.Second,
},
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
}

s.Equal(
"DatasetURL: https://example.com; MaxDelayMs: 1234; GroupBy: [field1 field2]; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
"DatasetURL: https://example.com; MaxDelay: 123ns; GroupBy: [field1 field2]; TracesSettings: {MaxWait:45s}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
config.String(),
)
}
124 changes: 80 additions & 44 deletions exporter/datasetexporter/datasetexporter.go
@@ -15,71 +15,107 @@
package datasetexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datasetexporter"

import (
"context"
"fmt"
"sync"
"net/http"
"reflect"
"strconv"
"time"

"github.com/google/uuid"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"github.com/scalyr/dataset-go/pkg/api/add_events"
"github.com/scalyr/dataset-go/pkg/client"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type datasetExporter struct {
limiter *rate.Limiter
logger *zap.Logger
session string
type DatasetExporter struct {
client *client.DataSetClient
limiter *rate.Limiter
logger *zap.Logger
session string
spanTracker spanTracker
}

var exporterInstance *datasetExporter
func NewDatasetExporter(entity string, config *Config, logger *zap.Logger) (*DatasetExporter, error) {
logger.Info("Creating new DataSetExporter",
zap.String("config", config.String()),
zap.String("entity", entity),
)
exporterCfg, err := config.Convert()
if err != nil {
return nil, fmt.Errorf(
"cannot convert config: %s; %w",
config.String(), err,
)
}

func newDatasetExporter(logger *zap.Logger) (*datasetExporter, error) {
logger.Info("Creating new DataSet Exporter with config")
if logger == nil {
return nil, fmt.Errorf("logger has to be set")
client, err := client.NewClient(
exporterCfg.datasetConfig,
&http.Client{Timeout: time.Second * 60},
logger,
)
if err != nil {
logger.Error("Cannot create DataSetClient: ", zap.Error(err))
return nil, fmt.Errorf("cannot create newDatasetExporter: %w", err)
}

return &datasetExporter{
limiter: rate.NewLimiter(100*rate.Every(1*time.Minute), 100), // 100 requests / minute
session: uuid.New().String(),
logger: logger,
return &DatasetExporter{
client: client,
limiter: rate.NewLimiter(100*rate.Every(1*time.Minute), 100), // 100 requests / minute
session: uuid.New().String(),
logger: logger,
spanTracker: newSpanTracker(exporterCfg.tracesSettings.MaxWait),
}, nil
}

var lock = &sync.Mutex{}
func (e *DatasetExporter) shutdown() {
e.client.SendAllAddEventsBuffers()
}

func getDatasetExporter(entity string, config *Config, logger *zap.Logger) (*datasetExporter, error) {
logger.Info(
"Get logger for: ",
zap.String("entity", entity),
)
// TODO: create exporter per config
if exporterInstance == nil {
lock.Lock()
defer lock.Unlock()
if exporterInstance == nil {
logger.Info(
"DataSetExport is using config: ",
zap.String("config", config.String()),
zap.String("entity", entity),
)
instance, err := newDatasetExporter(logger)
if err != nil {
return nil, fmt.Errorf("cannot create new dataset exporter: %w", err)
}
exporterInstance = instance
}
func sendBatch(events []*add_events.EventBundle, client *client.DataSetClient) error {
return client.AddEvents(events)
}

func buildKey(prefix string, separator string, key string, depth int) string {
res := prefix
if depth > 0 && len(prefix) > 0 {
res += separator
}
res += key
return res
}

return exporterInstance, nil
func updateWithPrefixedValuesMap(target map[string]interface{}, prefix string, separator string, source map[string]interface{}, depth int) {
for k, v := range source {
key := buildKey(prefix, separator, k, depth)
updateWithPrefixedValues(target, key, separator, v, depth+1)
}
}

func (e *datasetExporter) consumeLogs(ctx context.Context, ld plog.Logs) error {
return nil
func updateWithPrefixedValuesArray(target map[string]interface{}, prefix string, separator string, source []interface{}, depth int) {
for i, v := range source {
key := buildKey(prefix, separator, strconv.FormatInt(int64(i), 10), depth)
updateWithPrefixedValues(target, key, separator, v, depth+1)
}
}

func (e *datasetExporter) consumeTraces(ctx context.Context, ld ptrace.Traces) error {
return nil
func updateWithPrefixedValues(target map[string]interface{}, prefix string, separator string, source interface{}, depth int) {
st := reflect.TypeOf(source)
switch st.Kind() {
case reflect.Map:
updateWithPrefixedValuesMap(target, prefix, separator, source.(map[string]interface{}), depth)
case reflect.Array, reflect.Slice:
updateWithPrefixedValuesArray(target, prefix, separator, source.([]interface{}), depth)
default:
for {
_, found := target[prefix]
if found {
prefix += separator
} else {
target[prefix] = source
break
}
}

}
}
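
For readers skimming the diff, a small usage sketch of the flattening helpers above (not part of the PR; the input map and the "attributes." prefix are invented): nested maps and slices collapse into separator-joined keys, and a colliding key keeps gaining the separator until a free slot is found.

// Hypothetical in-package sketch, not part of this change.
func exampleFlatten() {
	source := map[string]interface{}{
		"http": map[string]interface{}{
			"method": "GET",
			"status": 200,
		},
		"tags": []interface{}{"a", "b"},
	}

	target := map[string]interface{}{}
	updateWithPrefixedValues(target, "attributes.", ".", source, 0)

	// target now holds (map order not guaranteed):
	//   attributes.http.method -> "GET"
	//   attributes.http.status -> 200
	//   attributes.tags.0      -> "a"
	//   attributes.tags.1      -> "b"
	fmt.Println(target)
}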
3 changes: 3 additions & 0 deletions exporter/datasetexporter/examples/e2e/.dockerignore
@@ -0,0 +1,3 @@
docker-compose.yml
Dockerfile
otel-config.yaml