Skip to content

Commit

Permalink
[exporter/splunkhec] batch by index and access token (#30407)
Browse files Browse the repository at this point in the history
**Description:**
Allow to batch with multiple keys. Use this new capability with the
Splunk HEC exporter.

**Link to tracking Issue:**
Fixes #30404
  • Loading branch information
atoulme authored Jan 10, 2024
1 parent 187c393 commit 8990edc
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 29 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_multi_keys_batchperresourceattr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Batch data according to access token and index, if present.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30404]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 22 additions & 5 deletions exporter/splunkhecexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ type baseLogsExporter struct {
consumer.Logs
}

// TODO: Find a place for this to be shared.
type baseTracesExporter struct {
component.Component
consumer.Traces
}

// NewFactory creates a factory for Splunk HEC exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
Expand Down Expand Up @@ -103,7 +109,7 @@ func createTracesExporter(

c := newTracesClient(set, cfg)

return exporterhelper.NewTracesExporter(
e, err := exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
Expand All @@ -114,6 +120,17 @@ func createTracesExporter(
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithStart(c.start),
exporterhelper.WithShutdown(c.stop))

if err != nil {
return nil, err
}

wrapped := &baseTracesExporter{
Component: e,
Traces: batchperresourceattr.NewMultiBatchPerResourceTraces([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, e),
}

return wrapped, nil
}

func createMetricsExporter(
Expand All @@ -125,7 +142,7 @@ func createMetricsExporter(

c := newMetricsClient(set, cfg)

exporter, err := exporterhelper.NewMetricsExporter(
e, err := exporterhelper.NewMetricsExporter(
ctx,
set,
cfg,
Expand All @@ -141,8 +158,8 @@ func createMetricsExporter(
}

wrapped := &baseMetricsExporter{
Component: exporter,
Metrics: batchperresourceattr.NewBatchPerResourceMetrics(splunk.HecTokenLabel, exporter),
Component: e,
Metrics: batchperresourceattr.NewMultiBatchPerResourceMetrics([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, e),
}

return wrapped, nil
Expand Down Expand Up @@ -175,7 +192,7 @@ func createLogsExporter(

wrapped := &baseLogsExporter{
Component: logsExporter,
Logs: batchperresourceattr.NewBatchPerResourceLogs(splunk.HecTokenLabel, &perScopeBatcher{
Logs: batchperresourceattr.NewMultiBatchPerResourceLogs([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, &perScopeBatcher{
logsEnabled: cfg.LogDataEnabled,
profilingEnabled: cfg.ProfilingDataEnabled,
logger: set.Logger,
Expand Down
68 changes: 50 additions & 18 deletions pkg/batchperresourceattr/batchperresourceattr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package batchperresourceattr // import "github.com/open-telemetry/opentelemetry-

import (
"context"
"fmt"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -13,15 +14,24 @@ import (
"go.uber.org/multierr"
)

var separator = string([]byte{0x0, 0x1})

type batchTraces struct {
attrKey string
next consumer.Traces
attrKeys []string
next consumer.Traces
}

func NewBatchPerResourceTraces(attrKey string, next consumer.Traces) consumer.Traces {
return &batchTraces{
attrKey: attrKey,
next: next,
attrKeys: []string{attrKey},
next: next,
}
}

func NewMultiBatchPerResourceTraces(attrKeys []string, next consumer.Traces) consumer.Traces {
return &batchTraces{
attrKeys: attrKeys,
next: next,
}
}

Expand All @@ -42,9 +52,13 @@ func (bt *batchTraces) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
for i := 0; i < lenRss; i++ {
rs := rss.At(i)
var attrVal string
if attributeValue, ok := rs.Resource().Attributes().Get(bt.attrKey); ok {
attrVal = attributeValue.Str()

for _, k := range bt.attrKeys {
if attributeValue, ok := rs.Resource().Attributes().Get(k); ok {
attrVal = fmt.Sprintf("%s%s%s", attrVal, separator, attributeValue.Str())
}
}

indicesByAttr[attrVal] = append(indicesByAttr[attrVal], i)
}
// If there is a single attribute value, then call next.
Expand All @@ -66,14 +80,21 @@ func (bt *batchTraces) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
}

type batchMetrics struct {
attrKey string
next consumer.Metrics
attrKeys []string
next consumer.Metrics
}

func NewBatchPerResourceMetrics(attrKey string, next consumer.Metrics) consumer.Metrics {
return &batchMetrics{
attrKey: attrKey,
next: next,
attrKeys: []string{attrKey},
next: next,
}
}

func NewMultiBatchPerResourceMetrics(attrKeys []string, next consumer.Metrics) consumer.Metrics {
return &batchMetrics{
attrKeys: attrKeys,
next: next,
}
}

Expand All @@ -94,8 +115,10 @@ func (bt *batchMetrics) ConsumeMetrics(ctx context.Context, td pmetric.Metrics)
for i := 0; i < lenRms; i++ {
rm := rms.At(i)
var attrVal string
if attributeValue, ok := rm.Resource().Attributes().Get(bt.attrKey); ok {
attrVal = attributeValue.Str()
for _, k := range bt.attrKeys {
if attributeValue, ok := rm.Resource().Attributes().Get(k); ok {
attrVal = fmt.Sprintf("%s%s%s", attrVal, separator, attributeValue.Str())
}
}
indicesByAttr[attrVal] = append(indicesByAttr[attrVal], i)
}
Expand All @@ -118,14 +141,21 @@ func (bt *batchMetrics) ConsumeMetrics(ctx context.Context, td pmetric.Metrics)
}

type batchLogs struct {
attrKey string
next consumer.Logs
attrKeys []string
next consumer.Logs
}

func NewBatchPerResourceLogs(attrKey string, next consumer.Logs) consumer.Logs {
return &batchLogs{
attrKey: attrKey,
next: next,
attrKeys: []string{attrKey},
next: next,
}
}

func NewMultiBatchPerResourceLogs(attrKeys []string, next consumer.Logs) consumer.Logs {
return &batchLogs{
attrKeys: attrKeys,
next: next,
}
}

Expand All @@ -146,8 +176,10 @@ func (bt *batchLogs) ConsumeLogs(ctx context.Context, td plog.Logs) error {
for i := 0; i < lenRls; i++ {
rl := rls.At(i)
var attrVal string
if attributeValue, ok := rl.Resource().Attributes().Get(bt.attrKey); ok {
attrVal = attributeValue.Str()
for _, k := range bt.attrKeys {
if attributeValue, ok := rl.Resource().Attributes().Get(k); ok {
attrVal = fmt.Sprintf("%s%s%s", attrVal, separator, attributeValue.Str())
}
}
indicesByAttr[attrVal] = append(indicesByAttr[attrVal], i)
}
Expand Down
106 changes: 100 additions & 6 deletions pkg/batchperresourceattr/batchperresourceattr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,35 @@ func TestSplitTracesIntoDifferentBatches(t *testing.T) {
assert.Equal(t, newTraces(expected.ResourceSpans().At(3), expected.ResourceSpans().At(7)), outBatches[4])
}

func TestSplitTracesIntoDifferentBatchesWithMultipleKeys(t *testing.T) {
inBatch := ptrace.NewTraces()
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1", "attr_key2", "1")
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "2", "attr_key2", "2")
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "3", "attr_key2", "3")
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "4", "attr_key2", "4")
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1", "attr_key2", "1")
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "2", "attr_key2", "2")
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "3", "attr_key2", "3")
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "4", "attr_key2", "4")
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "5", "attr_key2", "6")
fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "diff_attr_key", "1")
expected := ptrace.NewTraces()
inBatch.CopyTo(expected)

sink := new(consumertest.TracesSink)
bpr := NewMultiBatchPerResourceTraces([]string{"attr_key", "attr_key2"}, sink)
assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch))
outBatches := sink.AllTraces()
require.Len(t, outBatches, 6)
sortTraces(outBatches, "attr_key")
assert.Equal(t, newTraces(expected.ResourceSpans().At(9)), outBatches[0])
assert.Equal(t, newTraces(expected.ResourceSpans().At(0), expected.ResourceSpans().At(4)), outBatches[1])
assert.Equal(t, newTraces(expected.ResourceSpans().At(1), expected.ResourceSpans().At(5)), outBatches[2])
assert.Equal(t, newTraces(expected.ResourceSpans().At(2), expected.ResourceSpans().At(6)), outBatches[3])
assert.Equal(t, newTraces(expected.ResourceSpans().At(3), expected.ResourceSpans().At(7)), outBatches[4])
assert.Equal(t, newTraces(expected.ResourceSpans().At(8)), outBatches[5])
}

func TestSplitMetricsOneResourceMetrics(t *testing.T) {
inBatch := pmetric.NewMetrics()
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
Expand Down Expand Up @@ -180,6 +209,35 @@ func TestSplitMetricsIntoDifferentBatches(t *testing.T) {
assert.Equal(t, newMetrics(expected.ResourceMetrics().At(3), expected.ResourceMetrics().At(7)), outBatches[4])
}

func TestSplitMetricsIntoDifferentBatchesWithMultipleKeys(t *testing.T) {
inBatch := pmetric.NewMetrics()
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1", "attr_key2", "1")
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "2", "attr_key2", "2")
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "3", "attr_key2", "3")
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "4", "attr_key2", "4")
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1", "attr_key2", "1")
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "2", "attr_key2", "2")
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "3", "attr_key2", "3")
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "4", "attr_key2", "4")
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "5", "attr_key2", "6")
fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "diff_attr_key", "1")
expected := pmetric.NewMetrics()
inBatch.CopyTo(expected)

sink := new(consumertest.MetricsSink)
bpr := NewMultiBatchPerResourceMetrics([]string{"attr_key", "attr_key2"}, sink)
assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch))
outBatches := sink.AllMetrics()
require.Len(t, outBatches, 6)
sortMetrics(outBatches, "attr_key")
assert.Equal(t, newMetrics(expected.ResourceMetrics().At(9)), outBatches[0])
assert.Equal(t, newMetrics(expected.ResourceMetrics().At(0), expected.ResourceMetrics().At(4)), outBatches[1])
assert.Equal(t, newMetrics(expected.ResourceMetrics().At(1), expected.ResourceMetrics().At(5)), outBatches[2])
assert.Equal(t, newMetrics(expected.ResourceMetrics().At(2), expected.ResourceMetrics().At(6)), outBatches[3])
assert.Equal(t, newMetrics(expected.ResourceMetrics().At(3), expected.ResourceMetrics().At(7)), outBatches[4])
assert.Equal(t, newMetrics(expected.ResourceMetrics().At(8)), outBatches[5])
}

func TestSplitLogsOneResourceLogs(t *testing.T) {
inBatch := plog.NewLogs()
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
Expand Down Expand Up @@ -261,6 +319,35 @@ func TestSplitLogsIntoDifferentBatches(t *testing.T) {
assert.Equal(t, newLogs(expected.ResourceLogs().At(3), expected.ResourceLogs().At(7)), outBatches[4])
}

func TestSplitLogsIntoDifferentBatchesWithMultipleKeys(t *testing.T) {
inBatch := plog.NewLogs()
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1", "attr_key2", "1")
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "2", "attr_key2", "2")
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "3", "attr_key2", "3")
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "4", "attr_key2", "4")
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1", "attr_key2", "1")
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "2", "attr_key2", "2")
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "3", "attr_key2", "3")
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "4", "attr_key2", "4")
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "5", "attr_key2", "6")
fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "diff_attr_key", "1")
expected := plog.NewLogs()
inBatch.CopyTo(expected)

sink := new(consumertest.LogsSink)
bpr := NewMultiBatchPerResourceLogs([]string{"attr_key", "attr_key2"}, sink)
assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch))
outBatches := sink.AllLogs()
require.Len(t, outBatches, 6)
sortLogs(outBatches, "attr_key")
assert.Equal(t, newLogs(expected.ResourceLogs().At(9)), outBatches[0])
assert.Equal(t, newLogs(expected.ResourceLogs().At(0), expected.ResourceLogs().At(4)), outBatches[1])
assert.Equal(t, newLogs(expected.ResourceLogs().At(1), expected.ResourceLogs().At(5)), outBatches[2])
assert.Equal(t, newLogs(expected.ResourceLogs().At(2), expected.ResourceLogs().At(6)), outBatches[3])
assert.Equal(t, newLogs(expected.ResourceLogs().At(3), expected.ResourceLogs().At(7)), outBatches[4])
assert.Equal(t, newLogs(expected.ResourceLogs().At(8)), outBatches[5])
}

func newTraces(rss ...ptrace.ResourceSpans) ptrace.Traces {
td := ptrace.NewTraces()
for _, rs := range rss {
Expand All @@ -283,8 +370,11 @@ func sortTraces(tds []ptrace.Traces, attrKey string) {
})
}

func fillResourceSpans(rs ptrace.ResourceSpans, key string, val string) {
rs.Resource().Attributes().PutStr(key, val)
func fillResourceSpans(rs ptrace.ResourceSpans, kv ...string) {
for i := 0; i < len(kv); i += 2 {
rs.Resource().Attributes().PutStr(kv[i], kv[i+1])
}

rs.Resource().Attributes().PutInt("__other_key__", 123)
ils := rs.ScopeSpans().AppendEmpty()
firstSpan := ils.Spans().AppendEmpty()
Expand Down Expand Up @@ -317,8 +407,10 @@ func sortMetrics(tds []pmetric.Metrics, attrKey string) {
})
}

func fillResourceMetrics(rs pmetric.ResourceMetrics, key string, val string) {
rs.Resource().Attributes().PutStr(key, val)
func fillResourceMetrics(rs pmetric.ResourceMetrics, kv ...string) {
for i := 0; i < len(kv); i += 2 {
rs.Resource().Attributes().PutStr(kv[i], kv[i+1])
}
rs.Resource().Attributes().PutInt("__other_key__", 123)
ils := rs.ScopeMetrics().AppendEmpty()
firstMetric := ils.Metrics().AppendEmpty()
Expand Down Expand Up @@ -351,8 +443,10 @@ func sortLogs(tds []plog.Logs, attrKey string) {
})
}

func fillResourceLogs(rs plog.ResourceLogs, key string, val string) {
rs.Resource().Attributes().PutStr(key, val)
func fillResourceLogs(rs plog.ResourceLogs, kv ...string) {
for i := 0; i < len(kv); i += 2 {
rs.Resource().Attributes().PutStr(kv[i], kv[i+1])
}
rs.Resource().Attributes().PutInt("__other_key__", 123)
ils := rs.ScopeLogs().AppendEmpty()
firstLogRecord := ils.LogRecords().AppendEmpty()
Expand Down

0 comments on commit 8990edc

Please sign in to comment.