[exporter/datasetexporter] Improved handling of "observed timestamp" and body / message fields, add config option for exporting scope attributes (#23826)

NOTE: This PR diff is slightly larger since it is based on top of (and also
includes) the changes from #23672.

I assume / hope that PR will be merged before this one, since that would
make things a bit easier for me. Technically, I could also create a new
branch based on top of the upstream ``main`` branch, but that would make
things harder since some of the changes in this branch depend on changes
in #23672.

Once that PR is merged, I will sync this branch up with the latest
``main`` and the diff should get smaller.

---

This pull request includes three small changes related to consistency in
how we handle various field types.

1) Use consistent handling for the "observed timestamp" field

The field has been renamed to ``sca:observedTime``.

This way we follow the ``sca:`` prefix convention for special / internal
fields (e.g. we already have the ``sca:ingestTime`` field, which is added
on the server side in some cases). The field value has also been changed
from an ISO date-time string to nanoseconds since the epoch, which makes
it consistent with other internal timestamp fields on the DataSet side.
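
For illustration, here is a minimal Go sketch contrasting the old and new value formats (the timestamp itself is made up for the example):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	// Made-up observed timestamp, just for illustration.
	observedTs := time.Unix(1689876543, 123456789).UTC()

	// Old format: stringified time.Time (ISO-like date time string).
	fmt.Println(observedTs.String()) // 2023-07-20 18:09:03.123456789 +0000 UTC

	// New format: nanoseconds since epoch, serialized as a string.
	fmt.Println(strconv.FormatInt(observedTs.UnixNano(), 10)) // 1689876543123456789
}
```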

2) Use consistent handling for ``message`` / ``body`` field

I have simplified things and removed the ``body.type`` + ``body.<type>``
fields approach. That approach is not used by any other existing DataSet
integration, so it could confuse end users.

For simple value types (string, bool, int, double) we now simply populate
the ``message`` field and don't also store the value redundantly in an
additional ``body.{int,bool,double}`` field.

The only exception right now is the complex map type - we decompose it
into multiple ``body.map.*`` fields (see the sketch below).

Since this is also something that doesn't happen in other DataSet
integrations, I've put it behind a config option / feature flag.

For now it defaults to ``false``, aka the decomposition is not performed
by default; in the future (based on user feedback, etc.) we may want to
flip the default to ``true``.

In other integrations this decomposition is usually handled elsewhere -
e.g. either on the client side (as part of a processor or similar) or on
the DataSet server side using a (JSON) parser.
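
To make the decomposition concrete, here is a simplified, self-contained Go sketch; the ``flatten`` helper is hypothetical (the actual exporter delegates to an internal ``updateWithPrefixedValues`` function):

```go
package main

import "fmt"

// flatten is a hypothetical stand-in for the exporter's internal
// updateWithPrefixedValues helper: it decomposes a (possibly nested) map
// body into individual "body.map."-prefixed event attributes.
func flatten(attrs map[string]interface{}, prefix string, value map[string]interface{}) {
	for k, v := range value {
		if nested, ok := v.(map[string]interface{}); ok {
			flatten(attrs, prefix+k+".", nested)
		} else {
			attrs[prefix+k] = v
		}
	}
}

func main() {
	attrs := map[string]interface{}{}
	body := map[string]interface{}{
		"user": map[string]interface{}{"id": 42, "name": "alice"},
	}

	// The "message" field always receives the stringified body (the real
	// exporter uses pcommon's AsString, which produces JSON for maps) ...
	attrs["message"] = fmt.Sprintf("%v", body)

	// ... and, only when decompose_complex_message_field is enabled, the
	// map is additionally decomposed into separate body.map.* fields.
	flatten(attrs, "body.map.", body)

	fmt.Println(attrs)
	// Prints (key order may vary):
	// map[body.map.user.id:42 body.map.user.name:alice message:map[user:map[id:42 name:alice]]]
}
```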

3) Allow users to exclude scope information from the DataSet events.

This is similar to the ``export_resource_info_on_event`` change we made
recently.

I introduced a new config option / feature flag with which users can
disable the inclusion of scope information with each event.

Scope information can also be quite noisy (depending on how
instrumentation is configured), and users may want to exclude it if they
are not utilizing it on the server side - in that case it would just
result in increased (billable) DataSet log volume without any additional
value for the end user.

This data is less noisy and verbose than the resource data, so I decided
to leave the default value as ``true``, but if it turns out that most
people don't use it and ``false`` makes more sense as the default, we can
change it later on.
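
For reference, here is a minimal collector configuration sketch exercising both new ``logs`` options (the ``dataset_url`` and ``api_key`` values are placeholders):

```yaml
exporters:
  dataset:
    dataset_url: https://app.scalyr.com
    api_key: ${env:DATASET_API_KEY}
    logs:
      # Drop scope info if you don't use it server-side (reduces billable volume).
      export_scope_info_on_event: false
      # Opt in to decomposing complex (map) bodies into body.map.* fields.
      decompose_complex_message_field: true
```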

---------

Co-authored-by: tomas.zdara <[email protected]>
Co-authored-by: Pablo Baeyens <[email protected]>
3 people authored Jul 20, 2023
1 parent acaa2b8 commit 247e2bb
Showing 8 changed files with 177 additions and 77 deletions.
20 changes: 20 additions & 0 deletions .chloggen/dataset-exporter-timestamp-body-scope-improvements.yaml
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/datasetexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Rename 'observed_timestamp' field on the DataSet event to 'sca:observedTimestamp' and ensure the value is nanoseconds since epoch, update serializing and handling of body / message field to ensure it's consistent with other DataSet integrations and allow user to disable exporting scope information with each event by setting 'export_scope_info_on_event' logs config option to false."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [20660, 23826]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 2 additions & 0 deletions exporter/datasetexporter/README.md
@@ -34,6 +34,8 @@ If you do not want to specify `api_key` in the file, you can use the [builtin fu
- `retry_max_elapsed_time` (default = 300s): Is the maximum amount of time spent trying to send a buffer.
- `logs`:
- `export_resource_info_on_event` (default = false): Include resource info to DataSet Event while exporting Logs. This is especially useful when reducing DataSet billable log volume.
- `export_scope_info_on_event` (default = true): Include LogRecord scope information (if available) on the DataSet event.
- `decompose_complex_message_field` (default = false): Set this to true to enable decomposing complex body / message field types (e.g. a map) into separate fields.
- `retry_on_failure`: See [retry_on_failure](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
- `sending_queue`: See [sending_queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
- `timeout`: See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
16 changes: 15 additions & 1 deletion exporter/datasetexporter/config.go
@@ -25,18 +25,32 @@ func newDefaultTracesSettings() TracesSettings {
}

const logsExportResourceInfoDefault = false
const logsExportScopeInfoDefault = true
const logsDecomposeComplexMessageFieldDefault = false

type LogsSettings struct {
// ExportResourceInfo is optional flag to signal that the resource info is being exported to DataSet while exporting Logs.
// This is especially useful when reducing DataSet billable log volume.
// Default value: false.
ExportResourceInfo bool `mapstructure:"export_resource_info_on_event"`

// ExportScopeInfo is an optional flag that signals if scope info should be exported (when available) with each event. If scope
// information is not utilized, it makes sense to disable exporting it since it will result in increased billable log volume.
ExportScopeInfo bool `mapstructure:"export_scope_info_on_event"`

// DecomposeComplexMessageField is an optional flag to signal that message / body of complex types (e.g. a map) should be
// decomposed / deconstructed into multiple fields. This is usually done outside of the main DataSet integration on the
// client side (e.g. as part of the attribute processor or similar) or on the server side (DataSet server side JSON parser
// for message field) and that's why this functionality is disabled by default.
DecomposeComplexMessageField bool `mapstructure:"decompose_complex_message_field"`
}

// newDefaultLogsSettings returns the default settings for LogsSettings.
func newDefaultLogsSettings() LogsSettings {
return LogsSettings{
-		ExportResourceInfo: logsExportResourceInfoDefault,
+		ExportResourceInfo:           logsExportResourceInfoDefault,
+		ExportScopeInfo:              logsExportScopeInfoDefault,
+		DecomposeComplexMessageField: logsDecomposeComplexMessageFieldDefault,
}
}

19 changes: 18 additions & 1 deletion exporter/datasetexporter/config_test.go
@@ -43,6 +43,8 @@ func TestConfigUseDefaults(t *testing.T) {
assert.Equal(t, "secret", string(config.APIKey))
assert.Equal(t, bufferMaxLifetime, config.MaxLifetime)
assert.Equal(t, logsExportResourceInfoDefault, config.LogsSettings.ExportResourceInfo)
assert.Equal(t, logsExportScopeInfoDefault, config.LogsSettings.ExportScopeInfo)
assert.Equal(t, logsDecomposeComplexMessageFieldDefault, config.LogsSettings.DecomposeComplexMessageField)
}

func TestConfigValidate(t *testing.T) {
@@ -111,7 +113,7 @@ func TestConfigString(t *testing.T) {
}

assert.Equal(t,
"DatasetURL: https://example.com; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}; LogsSettings: {ExportResourceInfo:false}",
"DatasetURL: https://example.com; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}; LogsSettings: {ExportResourceInfo:false ExportScopeInfo:false DecomposeComplexMessageField:false}",
config.String(),
)
}
@@ -130,3 +132,18 @@ func TestConfigUseProvidedExportResourceInfoValue(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, true, config.LogsSettings.ExportResourceInfo)
}

func TestConfigUseProvidedExportScopeInfoValue(t *testing.T) {
f := NewFactory()
config := f.CreateDefaultConfig().(*Config)
configMap := confmap.NewFromStringMap(map[string]interface{}{
"dataset_url": "https://example.com",
"api_key": "secret",
"logs": map[string]any{
"export_scope_info_on_event": false,
},
})
err := config.Unmarshal(configMap)
assert.Nil(t, err)
assert.Equal(t, false, config.LogsSettings.ExportScopeInfo)
}
8 changes: 6 additions & 2 deletions exporter/datasetexporter/factory_test.go
@@ -26,6 +26,7 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.Equal(t, &Config{
BufferSettings: newDefaultBufferSettings(),
TracesSettings: newDefaultTracesSettings(),
LogsSettings: newDefaultLogsSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
@@ -88,7 +89,9 @@ func TestLoadConfig(t *testing.T) {
},
TracesSettings: TracesSettings{},
LogsSettings: LogsSettings{
-			ExportResourceInfo: true,
+			ExportResourceInfo:           true,
+			ExportScopeInfo:              true,
+			DecomposeComplexMessageField: true,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
@@ -136,7 +139,7 @@ func createExporterTests() []CreateTest {
{
name: "broken",
config: &Config{},
-			expectedError: fmt.Errorf("cannot get DataSetExpoter: cannot convert config: DatasetURL: ; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {}; RetrySettings: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:<nil>}; TimeoutSettings: {Timeout:0s}; LogsSettings: {ExportResourceInfo:false}; config is not valid: api_key is required"),
+			expectedError: fmt.Errorf("cannot get DataSetExpoter: cannot convert config: DatasetURL: ; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {}; RetrySettings: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:<nil>}; TimeoutSettings: {Timeout:0s}; LogsSettings: {ExportResourceInfo:false ExportScopeInfo:false DecomposeComplexMessageField:false}; config is not valid: api_key is required"),
},
{
name: "valid",
@@ -150,6 +153,7 @@ func createExporterTests() []CreateTest {
RetryMaxInterval: time.Minute,
RetryMaxElapsedTime: time.Hour,
},
LogsSettings: newDefaultLogsSettings(),
TracesSettings: TracesSettings{},
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
53 changes: 29 additions & 24 deletions exporter/datasetexporter/logs_exporter.go
@@ -18,8 +18,12 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

// We define it here so we can easily mock it inside tests
var now = time.Now

// Prefix which is added to all the special / internal DataSet fields
const specialDataSetFieldNamePrefix string = "sca:"

// If a LogRecord doesn't contain severity or we can't map it to a valid DataSet severity, we use
// this value (3 - INFO) instead
const defaultDataSetSeverityLevel int = dataSetLogLevelInfo
@@ -54,28 +58,21 @@ func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config
)
}

-func buildBody(attrs map[string]interface{}, value pcommon.Value) string {
+func buildBody(settings LogsSettings, attrs map[string]interface{}, value pcommon.Value) string {
+	// The message / body is stored as part of the "message" field on the DataSet event.
	message := value.AsString()
-	attrs["body.type"] = value.Type().String()
-	switch value.Type() {
-	case pcommon.ValueTypeEmpty:
-		attrs["body.empty"] = value.AsString()
-	case pcommon.ValueTypeStr:
-		attrs["body.str"] = value.Str()
-	case pcommon.ValueTypeBool:
-		attrs["body.bool"] = value.Bool()
-	case pcommon.ValueTypeDouble:
-		attrs["body.double"] = value.Double()
-	case pcommon.ValueTypeInt:
-		attrs["body.int"] = value.Int()
-	case pcommon.ValueTypeMap:
+
+	// Additionally, we support de-composing complex message value (e.g. map / dictionary) into
+	// multiple event attributes.
+	//
+	// This functionality is behind a config option / feature flag and not enabled by default
+	// since no other existing DataSet integrations handle it in this manner (aka for out of
+	// the box consistency reasons).
+	// If user wants to achieve something like that, they usually handle that on the client
+	// (e.g. attribute processor or similar) or on the server (DataSet server side JSON parser
+	// for the message field).
+	if settings.DecomposeComplexMessageField && value.Type() == pcommon.ValueTypeMap {
		updateWithPrefixedValues(attrs, "body.map.", ".", value.Map().AsRaw(), 0)
-	case pcommon.ValueTypeBytes:
-		attrs["body.bytes"] = value.AsString()
-	case pcommon.ValueTypeSlice:
-		attrs["body.slice"] = value.AsRaw()
-	default:
-		attrs["body.unknown"] = value.AsString()
	}

return message
@@ -183,14 +180,17 @@ func buildEventFromLog(
}

if body := log.Body().AsString(); body != "" {
attrs["message"] = buildBody(attrs, log.Body())
attrs["message"] = buildBody(settings, attrs, log.Body())
}

if dropped := log.DroppedAttributesCount(); dropped > 0 {
attrs["dropped_attributes_count"] = dropped
}

if !observedTs.Equal(time.Unix(0, 0)) {
attrs["observed.timestamp"] = observedTs.String()
attrs[specialDataSetFieldNamePrefix+"observedTime"] = strconv.FormatInt(observedTs.UnixNano(), 10)
}

if span := log.SpanID().String(); span != "" {
attrs["span_id"] = span
}
@@ -215,8 +215,13 @@ func buildEventFromLog(
if settings.ExportResourceInfo {
updateWithPrefixedValues(attrs, "resource.attributes.", ".", resource.Attributes().AsRaw(), 0)
}
attrs["scope.name"] = scope.Name()
updateWithPrefixedValues(attrs, "scope.attributes.", ".", scope.Attributes().AsRaw(), 0)

if settings.ExportScopeInfo {
if scope.Name() != "" {
attrs["scope.name"] = scope.Name()
}
updateWithPrefixedValues(attrs, "scope.attributes.", ".", scope.Attributes().AsRaw(), 0)
}

event.Attrs = attrs
event.Log = "LL"