diff --git a/.chloggen/add_multi_keys_batchperresourceattr.yaml b/.chloggen/add_multi_keys_batchperresourceattr.yaml new file mode 100755 index 000000000000..a3557e75534a --- /dev/null +++ b/.chloggen/add_multi_keys_batchperresourceattr.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: splunkhecexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Batch data according to access token and index, if present. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30404] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/splunkhecexporter/factory.go b/exporter/splunkhecexporter/factory.go index 609b9469fb58..b95028d8ca4b 100644 --- a/exporter/splunkhecexporter/factory.go +++ b/exporter/splunkhecexporter/factory.go @@ -41,6 +41,12 @@ type baseLogsExporter struct { consumer.Logs } +// TODO: Find a place for this to be shared. +type baseTracesExporter struct { + component.Component + consumer.Traces +} + // NewFactory creates a factory for Splunk HEC exporter. func NewFactory() exporter.Factory { return exporter.NewFactory( @@ -103,7 +109,7 @@ func createTracesExporter( c := newTracesClient(set, cfg) - return exporterhelper.NewTracesExporter( + e, err := exporterhelper.NewTracesExporter( ctx, set, cfg, @@ -114,6 +120,17 @@ func createTracesExporter( exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithStart(c.start), exporterhelper.WithShutdown(c.stop)) + + if err != nil { + return nil, err + } + + wrapped := &baseTracesExporter{ + Component: e, + Traces: batchperresourceattr.NewMultiBatchPerResourceTraces([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, e), + } + + return wrapped, nil } func createMetricsExporter( @@ -125,7 +142,7 @@ func createMetricsExporter( c := newMetricsClient(set, cfg) - exporter, err := exporterhelper.NewMetricsExporter( + e, err := exporterhelper.NewMetricsExporter( ctx, set, cfg, @@ -141,8 +158,8 @@ func createMetricsExporter( } wrapped := &baseMetricsExporter{ - Component: exporter, - Metrics: batchperresourceattr.NewBatchPerResourceMetrics(splunk.HecTokenLabel, exporter), + Component: e, + Metrics: batchperresourceattr.NewMultiBatchPerResourceMetrics([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, e), } return wrapped, nil @@ -175,7 +192,7 @@ func createLogsExporter( wrapped := &baseLogsExporter{ Component: logsExporter, - Logs: batchperresourceattr.NewBatchPerResourceLogs(splunk.HecTokenLabel, &perScopeBatcher{ + Logs: batchperresourceattr.NewMultiBatchPerResourceLogs([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, &perScopeBatcher{ logsEnabled: cfg.LogDataEnabled, profilingEnabled: cfg.ProfilingDataEnabled, logger: set.Logger, diff --git a/pkg/batchperresourceattr/batchperresourceattr.go b/pkg/batchperresourceattr/batchperresourceattr.go index ab29bcc2a1bc..92f55b6481da 100644 --- a/pkg/batchperresourceattr/batchperresourceattr.go +++ b/pkg/batchperresourceattr/batchperresourceattr.go @@ -5,6 +5,7 @@ package batchperresourceattr // import "github.com/open-telemetry/opentelemetry- import ( "context" + "fmt" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" @@ -13,15 +14,24 @@ import ( "go.uber.org/multierr" ) +var separator = string([]byte{0x0, 0x1}) + type batchTraces struct { - attrKey string - next consumer.Traces + attrKeys []string + next consumer.Traces } func NewBatchPerResourceTraces(attrKey string, next consumer.Traces) consumer.Traces { return &batchTraces{ - attrKey: attrKey, - next: next, + attrKeys: []string{attrKey}, + next: next, + } +} + +func NewMultiBatchPerResourceTraces(attrKeys []string, next consumer.Traces) consumer.Traces { + return &batchTraces{ + attrKeys: attrKeys, + next: next, } } @@ -42,9 +52,13 @@ func (bt *batchTraces) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro for i := 0; i < lenRss; i++ { rs := rss.At(i) var attrVal string - if attributeValue, ok := rs.Resource().Attributes().Get(bt.attrKey); ok { - attrVal = attributeValue.Str() + + for _, k := range bt.attrKeys { + if attributeValue, ok := rs.Resource().Attributes().Get(k); ok { + attrVal = fmt.Sprintf("%s%s%s", attrVal, separator, attributeValue.Str()) + } } + indicesByAttr[attrVal] = append(indicesByAttr[attrVal], i) } // If there is a single attribute value, then call next. @@ -66,14 +80,21 @@ func (bt *batchTraces) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro } type batchMetrics struct { - attrKey string - next consumer.Metrics + attrKeys []string + next consumer.Metrics } func NewBatchPerResourceMetrics(attrKey string, next consumer.Metrics) consumer.Metrics { return &batchMetrics{ - attrKey: attrKey, - next: next, + attrKeys: []string{attrKey}, + next: next, + } +} + +func NewMultiBatchPerResourceMetrics(attrKeys []string, next consumer.Metrics) consumer.Metrics { + return &batchMetrics{ + attrKeys: attrKeys, + next: next, } } @@ -94,8 +115,10 @@ func (bt *batchMetrics) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) for i := 0; i < lenRms; i++ { rm := rms.At(i) var attrVal string - if attributeValue, ok := rm.Resource().Attributes().Get(bt.attrKey); ok { - attrVal = attributeValue.Str() + for _, k := range bt.attrKeys { + if attributeValue, ok := rm.Resource().Attributes().Get(k); ok { + attrVal = fmt.Sprintf("%s%s%s", attrVal, separator, attributeValue.Str()) + } } indicesByAttr[attrVal] = append(indicesByAttr[attrVal], i) } @@ -118,14 +141,21 @@ func (bt *batchMetrics) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) } type batchLogs struct { - attrKey string - next consumer.Logs + attrKeys []string + next consumer.Logs } func NewBatchPerResourceLogs(attrKey string, next consumer.Logs) consumer.Logs { return &batchLogs{ - attrKey: attrKey, - next: next, + attrKeys: []string{attrKey}, + next: next, + } +} + +func NewMultiBatchPerResourceLogs(attrKeys []string, next consumer.Logs) consumer.Logs { + return &batchLogs{ + attrKeys: attrKeys, + next: next, } } @@ -146,8 +176,10 @@ func (bt *batchLogs) ConsumeLogs(ctx context.Context, td plog.Logs) error { for i := 0; i < lenRls; i++ { rl := rls.At(i) var attrVal string - if attributeValue, ok := rl.Resource().Attributes().Get(bt.attrKey); ok { - attrVal = attributeValue.Str() + for _, k := range bt.attrKeys { + if attributeValue, ok := rl.Resource().Attributes().Get(k); ok { + attrVal = fmt.Sprintf("%s%s%s", attrVal, separator, attributeValue.Str()) + } } indicesByAttr[attrVal] = append(indicesByAttr[attrVal], i) } diff --git a/pkg/batchperresourceattr/batchperresourceattr_test.go b/pkg/batchperresourceattr/batchperresourceattr_test.go index ede4a6f0a513..4d3bddbe9100 100644 --- a/pkg/batchperresourceattr/batchperresourceattr_test.go +++ b/pkg/batchperresourceattr/batchperresourceattr_test.go @@ -99,6 +99,35 @@ func TestSplitTracesIntoDifferentBatches(t *testing.T) { assert.Equal(t, newTraces(expected.ResourceSpans().At(3), expected.ResourceSpans().At(7)), outBatches[4]) } +func TestSplitTracesIntoDifferentBatchesWithMultipleKeys(t *testing.T) { + inBatch := ptrace.NewTraces() + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1", "attr_key2", "1") + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "2", "attr_key2", "2") + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "3", "attr_key2", "3") + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "4", "attr_key2", "4") + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1", "attr_key2", "1") + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "2", "attr_key2", "2") + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "3", "attr_key2", "3") + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "4", "attr_key2", "4") + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "5", "attr_key2", "6") + fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "diff_attr_key", "1") + expected := ptrace.NewTraces() + inBatch.CopyTo(expected) + + sink := new(consumertest.TracesSink) + bpr := NewMultiBatchPerResourceTraces([]string{"attr_key", "attr_key2"}, sink) + assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch)) + outBatches := sink.AllTraces() + require.Len(t, outBatches, 6) + sortTraces(outBatches, "attr_key") + assert.Equal(t, newTraces(expected.ResourceSpans().At(9)), outBatches[0]) + assert.Equal(t, newTraces(expected.ResourceSpans().At(0), expected.ResourceSpans().At(4)), outBatches[1]) + assert.Equal(t, newTraces(expected.ResourceSpans().At(1), expected.ResourceSpans().At(5)), outBatches[2]) + assert.Equal(t, newTraces(expected.ResourceSpans().At(2), expected.ResourceSpans().At(6)), outBatches[3]) + assert.Equal(t, newTraces(expected.ResourceSpans().At(3), expected.ResourceSpans().At(7)), outBatches[4]) + assert.Equal(t, newTraces(expected.ResourceSpans().At(8)), outBatches[5]) +} + func TestSplitMetricsOneResourceMetrics(t *testing.T) { inBatch := pmetric.NewMetrics() fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1") @@ -180,6 +209,35 @@ func TestSplitMetricsIntoDifferentBatches(t *testing.T) { assert.Equal(t, newMetrics(expected.ResourceMetrics().At(3), expected.ResourceMetrics().At(7)), outBatches[4]) } +func TestSplitMetricsIntoDifferentBatchesWithMultipleKeys(t *testing.T) { + inBatch := pmetric.NewMetrics() + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1", "attr_key2", "1") + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "2", "attr_key2", "2") + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "3", "attr_key2", "3") + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "4", "attr_key2", "4") + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1", "attr_key2", "1") + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "2", "attr_key2", "2") + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "3", "attr_key2", "3") + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "4", "attr_key2", "4") + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "5", "attr_key2", "6") + fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "diff_attr_key", "1") + expected := pmetric.NewMetrics() + inBatch.CopyTo(expected) + + sink := new(consumertest.MetricsSink) + bpr := NewMultiBatchPerResourceMetrics([]string{"attr_key", "attr_key2"}, sink) + assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch)) + outBatches := sink.AllMetrics() + require.Len(t, outBatches, 6) + sortMetrics(outBatches, "attr_key") + assert.Equal(t, newMetrics(expected.ResourceMetrics().At(9)), outBatches[0]) + assert.Equal(t, newMetrics(expected.ResourceMetrics().At(0), expected.ResourceMetrics().At(4)), outBatches[1]) + assert.Equal(t, newMetrics(expected.ResourceMetrics().At(1), expected.ResourceMetrics().At(5)), outBatches[2]) + assert.Equal(t, newMetrics(expected.ResourceMetrics().At(2), expected.ResourceMetrics().At(6)), outBatches[3]) + assert.Equal(t, newMetrics(expected.ResourceMetrics().At(3), expected.ResourceMetrics().At(7)), outBatches[4]) + assert.Equal(t, newMetrics(expected.ResourceMetrics().At(8)), outBatches[5]) +} + func TestSplitLogsOneResourceLogs(t *testing.T) { inBatch := plog.NewLogs() fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1") @@ -261,6 +319,35 @@ func TestSplitLogsIntoDifferentBatches(t *testing.T) { assert.Equal(t, newLogs(expected.ResourceLogs().At(3), expected.ResourceLogs().At(7)), outBatches[4]) } +func TestSplitLogsIntoDifferentBatchesWithMultipleKeys(t *testing.T) { + inBatch := plog.NewLogs() + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1", "attr_key2", "1") + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "2", "attr_key2", "2") + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "3", "attr_key2", "3") + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "4", "attr_key2", "4") + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1", "attr_key2", "1") + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "2", "attr_key2", "2") + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "3", "attr_key2", "3") + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "4", "attr_key2", "4") + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "5", "attr_key2", "6") + fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "diff_attr_key", "1") + expected := plog.NewLogs() + inBatch.CopyTo(expected) + + sink := new(consumertest.LogsSink) + bpr := NewMultiBatchPerResourceLogs([]string{"attr_key", "attr_key2"}, sink) + assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch)) + outBatches := sink.AllLogs() + require.Len(t, outBatches, 6) + sortLogs(outBatches, "attr_key") + assert.Equal(t, newLogs(expected.ResourceLogs().At(9)), outBatches[0]) + assert.Equal(t, newLogs(expected.ResourceLogs().At(0), expected.ResourceLogs().At(4)), outBatches[1]) + assert.Equal(t, newLogs(expected.ResourceLogs().At(1), expected.ResourceLogs().At(5)), outBatches[2]) + assert.Equal(t, newLogs(expected.ResourceLogs().At(2), expected.ResourceLogs().At(6)), outBatches[3]) + assert.Equal(t, newLogs(expected.ResourceLogs().At(3), expected.ResourceLogs().At(7)), outBatches[4]) + assert.Equal(t, newLogs(expected.ResourceLogs().At(8)), outBatches[5]) +} + func newTraces(rss ...ptrace.ResourceSpans) ptrace.Traces { td := ptrace.NewTraces() for _, rs := range rss { @@ -283,8 +370,11 @@ func sortTraces(tds []ptrace.Traces, attrKey string) { }) } -func fillResourceSpans(rs ptrace.ResourceSpans, key string, val string) { - rs.Resource().Attributes().PutStr(key, val) +func fillResourceSpans(rs ptrace.ResourceSpans, kv ...string) { + for i := 0; i < len(kv); i += 2 { + rs.Resource().Attributes().PutStr(kv[i], kv[i+1]) + } + rs.Resource().Attributes().PutInt("__other_key__", 123) ils := rs.ScopeSpans().AppendEmpty() firstSpan := ils.Spans().AppendEmpty() @@ -317,8 +407,10 @@ func sortMetrics(tds []pmetric.Metrics, attrKey string) { }) } -func fillResourceMetrics(rs pmetric.ResourceMetrics, key string, val string) { - rs.Resource().Attributes().PutStr(key, val) +func fillResourceMetrics(rs pmetric.ResourceMetrics, kv ...string) { + for i := 0; i < len(kv); i += 2 { + rs.Resource().Attributes().PutStr(kv[i], kv[i+1]) + } rs.Resource().Attributes().PutInt("__other_key__", 123) ils := rs.ScopeMetrics().AppendEmpty() firstMetric := ils.Metrics().AppendEmpty() @@ -351,8 +443,10 @@ func sortLogs(tds []plog.Logs, attrKey string) { }) } -func fillResourceLogs(rs plog.ResourceLogs, key string, val string) { - rs.Resource().Attributes().PutStr(key, val) +func fillResourceLogs(rs plog.ResourceLogs, kv ...string) { + for i := 0; i < len(kv); i += 2 { + rs.Resource().Attributes().PutStr(kv[i], kv[i+1]) + } rs.Resource().Attributes().PutInt("__other_key__", 123) ils := rs.ScopeLogs().AppendEmpty() firstLogRecord := ils.LogRecords().AppendEmpty()