diff --git a/exporter/splunkhecexporter/buffer.go b/exporter/splunkhecexporter/buffer.go
index 9836bff634d3..e12481ccd52d 100644
--- a/exporter/splunkhecexporter/buffer.go
+++ b/exporter/splunkhecexporter/buffer.go
@@ -36,9 +36,6 @@ type bufferState struct {
 	maxEventLength uint
 	writer         io.Writer
 	buf            *bytes.Buffer
-	resource       int // index in ResourceLogs/ResourceMetrics/ResourceSpans list
-	library        int // index in ScopeLogs/ScopeMetrics/ScopeSpans list
-	record         int // index in Logs/Metrics/Spans list
 	rawLength      int
 }
 
@@ -190,9 +187,7 @@ type bufferStatePool struct {
 
 // get returns a bufferState from the pool.
 func (p bufferStatePool) get() *bufferState {
-	bf := p.pool.Get().(*bufferState)
-	bf.reset()
-	return bf
+	return p.pool.Get().(*bufferState)
 }
 
 // put returns a bufferState to the pool.
diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go
index b2f97be15694..3f17219303a3 100644
--- a/exporter/splunkhecexporter/client.go
+++ b/exporter/splunkhecexporter/client.go
@@ -40,6 +40,14 @@ var getPushLogFn = func(c *client) func(ctx context.Context, ld plog.Logs) error {
 	return c.pushLogData
 }
 
+// iterState captures a state of iteration over the pdata Logs/Metrics/Traces instances.
+type iterState struct {
+	resource int // index in ResourceLogs/ResourceMetrics/ResourceSpans list
+	library  int // index in ScopeLogs/ScopeMetrics/ScopeSpans list
+	record   int // index in Logs/Metrics/Spans list
+	done     bool
+}
+
 // client sends the data to the splunk backend.
 type client struct {
 	config *Config
@@ -157,215 +165,173 @@ func isProfilingData(sl plog.ScopeLogs) bool {
 func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers map[string]string) error {
 	bufState := c.bufferStatePool.get()
 	defer c.bufferStatePool.put(bufState)
-
+	is := iterState{}
 	var permanentErrors []error
 
-	var rls = ld.ResourceLogs()
-	for i := 0; i < rls.Len(); i++ {
-		ills := rls.At(i).ScopeLogs()
-		for j := 0; j < ills.Len(); j++ {
-			var err error
-			var newPermanentErrors []error
-
-			bufState.resource, bufState.library = i, j
-			newPermanentErrors, err = c.pushLogRecords(ctx, rls, bufState, headers)
-
-			if err != nil {
-				return consumererror.NewLogs(err, subLogs(ld, bufState))
+	for {
+		bufState.reset()
+		latestIterState, batchPermanentErrors := c.fillLogsBuffer(ld, bufState, is)
+		permanentErrors = append(permanentErrors, batchPermanentErrors...)
+		if bufState.containsData() {
+			if err := c.postEvents(ctx, bufState, headers); err != nil {
+				return consumererror.NewLogs(err, subLogs(ld, is))
 			}
-
-			permanentErrors = append(permanentErrors, newPermanentErrors...)
 		}
-	}
-
-	// There's some leftover unsent non-profiling data
-	if bufState.containsData() {
-		if err := c.postEvents(ctx, bufState, headers); err != nil {
-			return consumererror.NewLogs(err, subLogs(ld, bufState))
+		if latestIterState.done {
+			break
 		}
+		is = latestIterState
 	}
 
 	return multierr.Combine(permanentErrors...)
 }
 
-func (c *client) pushLogRecords(ctx context.Context, lds plog.ResourceLogsSlice, state *bufferState, headers map[string]string) (permanentErrors []error, sendingError error) {
-	res := lds.At(state.resource)
-	logs := res.ScopeLogs().At(state.library).LogRecords()
-
-	state.record = 0
-	for k := 0; k < logs.Len(); k++ {
-		var b []byte
-
-		if c.config.ExportRaw {
-			b = []byte(logs.At(k).Body().AsString() + "\n")
-		} else {
-			// Parsing log record to Splunk event.
-			event := mapLogRecordToSplunkEvent(res.Resource(), logs.At(k), c.config)
-			// JSON encoding event and writing to buffer.
-			var err error
-			b, err = jsoniter.Marshal(event)
-			if err != nil {
-				permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped log event: %v, error: %w", event, err)))
-				continue
-			}
-
-		}
-
-		// Continue adding events to buffer up to capacity.
-		accepted, e := state.accept(b)
-		if e != nil {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
-				fmt.Errorf("error writing the event: %w", e)))
-			continue
-		}
-		if accepted {
-			continue
-		}
+// fillLogsBuffer fills the buffer with Splunk events until the buffer is full or all logs are processed.
+func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (iterState, []error) {
+	var b []byte
+	var permanentErrors []error
 
-		if state.containsData() {
-			if err := c.postEvents(ctx, state, headers); err != nil {
-				return permanentErrors, err
+	for i := is.resource; i < logs.ResourceLogs().Len(); i++ {
+		rl := logs.ResourceLogs().At(i)
+		for j := is.library; j < rl.ScopeLogs().Len(); j++ {
+			sl := rl.ScopeLogs().At(j)
+			for k := is.record; k < sl.LogRecords().Len(); k++ {
+				logRecord := sl.LogRecords().At(k)
+
+				if c.config.ExportRaw {
+					b = []byte(logRecord.Body().AsString() + "\n")
+				} else {
+					// Parsing log record to Splunk event.
+					event := mapLogRecordToSplunkEvent(rl.Resource(), logRecord, c.config)
+					// JSON encoding event and writing to buffer.
+					var err error
+					b, err = jsoniter.Marshal(event)
+					if err != nil {
+						permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
+							"dropped log event: %v, error: %w", event, err)))
+						continue
+					}
+				}
+
+				// Continue adding events to buffer up to capacity.
+				accepted, e := bs.accept(b)
+				if e != nil {
+					permanentErrors = append(permanentErrors,
+						consumererror.NewPermanent(fmt.Errorf("error writing the event: %w", e)))
+					continue
+				}
+				if !accepted {
+					if bs.containsData() {
+						return iterState{i, j, k, false}, permanentErrors
+					}
+					permanentErrors = append(permanentErrors, consumererror.NewPermanent(
+						fmt.Errorf("dropped log event: error: event size %d bytes larger than configured max"+
+							" content length %d bytes", len(b), bs.bufferMaxLen)))
+					return iterState{i, j, k + 1, false}, permanentErrors
+				}
+			}
 		}
-		state.reset()
-
-		// Writing truncated bytes back to buffer.
-		accepted, e = state.accept(b)
-		if accepted {
-			state.record = k
-			continue
-		}
-
-		state.record = k + 1
-		if e != nil {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
-				fmt.Errorf("error writing the event: %w", e)))
-		} else {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
-				fmt.Errorf("dropped log event error: event size %d bytes larger than configured max content length %d bytes", len(b), state.bufferMaxLen)))
-		}
 	}
-	return permanentErrors, nil
+	return iterState{done: true}, permanentErrors
 }
 
-func (c *client) pushMetricsRecords(ctx context.Context, mds pmetric.ResourceMetricsSlice, state *bufferState, headers map[string]string) (permanentErrors []error, sendingError error) {
-	res := mds.At(state.resource)
-	metrics := res.ScopeMetrics().At(state.library).Metrics()
-
-	state.record = 0
-	for k := 0; k < metrics.Len(); k++ {
-		// Parsing metric record to Splunk event.
-		events := mapMetricToSplunkEvent(res.Resource(), metrics.At(k), c.config, c.logger)
-		buf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
-		if c.config.UseMultiMetricFormat {
-			merged, err := mergeEventsToMultiMetricFormat(events)
-			if err != nil {
-				permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("error merging events: %w", err)))
-			} else {
-				events = merged
-			}
-		}
-		for _, event := range events {
-			// JSON encoding event and writing to buffer.
-			b, err := jsoniter.Marshal(event)
-			if err != nil {
-				permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err)))
-				continue
-			}
-			buf.Write(b)
-		}
-
-		// Continue adding events to buffer up to capacity.
-		b := buf.Bytes()
-		accepted, e := state.accept(b)
-		if e != nil {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
-				fmt.Errorf("error writing the event: %w", e)))
-			continue
-		}
-		if accepted {
-			continue
-		}
+func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is iterState) (iterState, []error) {
+	var permanentErrors []error
 
-		if state.containsData() {
-			if err := c.postEvents(ctx, state, headers); err != nil {
-				return permanentErrors, err
+	for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ {
+		rm := metrics.ResourceMetrics().At(i)
+		for j := is.library; j < rm.ScopeMetrics().Len(); j++ {
+			sm := rm.ScopeMetrics().At(j)
+			for k := is.record; k < sm.Metrics().Len(); k++ {
+				metric := sm.Metrics().At(k)
+
+				// Parsing metric record to Splunk event.
+				events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)
+				buf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
+				if c.config.UseMultiMetricFormat {
+					merged, err := mergeEventsToMultiMetricFormat(events)
+					if err != nil {
+						permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
+							"error merging events: %w", err)))
+					} else {
+						events = merged
+					}
+				}
+				for _, event := range events {
+					// JSON encoding event and writing to buffer.
+					b, err := jsoniter.Marshal(event)
+					if err != nil {
+						permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err)))
+						continue
+					}
+					buf.Write(b)
+				}
+
+				// Continue adding events to buffer up to capacity.
+				b := buf.Bytes()
+				accepted, e := bs.accept(b)
+				if e != nil {
+					permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
+						"error writing the event: %w", e)))
+					continue
+				}
+				if !accepted {
+					if bs.containsData() {
+						return iterState{i, j, k, false}, permanentErrors
+					}
+					permanentErrors = append(permanentErrors, consumererror.NewPermanent(
+						fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
+							" content length %d bytes", len(b), bs.bufferMaxLen)))
+					return iterState{i, j, k + 1, false}, permanentErrors
+				}
+			}
 		}
-		state.reset()
-
-		// Writing truncated bytes back to buffer.
-		accepted, e = state.accept(b)
-		if accepted {
-			state.record = k
-			continue
-		}
-
-		state.record = k + 1
-		if e != nil {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
-				fmt.Errorf("error writing the event: %w", e)))
-		} else {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
-				fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max content length %d bytes", len(b), state.bufferMaxLen)))
-		}
 	}
-	return permanentErrors, nil
+	return iterState{done: true}, permanentErrors
 }
 
-func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSlice, state *bufferState, headers map[string]string) (permanentErrors []error, sendingError error) {
-	res := tds.At(state.resource)
-	spans := res.ScopeSpans().At(state.library).Spans()
-
-	state.record = 0
-	for k := 0; k < spans.Len(); k++ {
-		// Parsing span record to Splunk event.
-		event := mapSpanToSplunkEvent(res.Resource(), spans.At(k), c.config)
-		// JSON encoding event and writing to buffer.
-		b, err := jsoniter.Marshal(event)
-		if err != nil {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err)))
-			continue
-		}
-
-		// Continue adding events to buffer up to capacity.
-		accepted, e := state.accept(b)
-		if e != nil {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
-				fmt.Errorf("error writing the event: %w", e)))
-			continue
-		}
-		if accepted {
-			continue
-		}
+func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iterState) (iterState, []error) {
+	var permanentErrors []error
 
-		if state.containsData() {
-			if err = c.postEvents(ctx, state, headers); err != nil {
-				return permanentErrors, err
+	for i := is.resource; i < traces.ResourceSpans().Len(); i++ {
+		rs := traces.ResourceSpans().At(i)
+		for j := is.library; j < rs.ScopeSpans().Len(); j++ {
+			ss := rs.ScopeSpans().At(j)
+			for k := is.record; k < ss.Spans().Len(); k++ {
+				span := ss.Spans().At(k)
+
+				// Parsing span record to Splunk event.
+				event := mapSpanToSplunkEvent(rs.Resource(), span, c.config)
+				// JSON encoding event and writing to buffer.
+				b, err := jsoniter.Marshal(event)
+				if err != nil {
+					permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err)))
+					continue
+				}
+
+				// Continue adding events to buffer up to capacity.
+				accepted, e := bs.accept(b)
+				if e != nil {
+					permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
+						"error writing the event: %w", e)))
+					continue
+				}
+				if !accepted {
+					if bs.containsData() {
+						return iterState{i, j, k, false}, permanentErrors
+					}
+					permanentErrors = append(permanentErrors, consumererror.NewPermanent(
+						fmt.Errorf("dropped span event: error: event size %d bytes larger than configured max"+
+							" content length %d bytes", len(b), bs.bufferMaxLen)))
+					return iterState{i, j, k + 1, false}, permanentErrors
+				}
+			}
 		}
-		state.reset()
-
-		// Writing truncated bytes back to buffer.
-		accepted, e = state.accept(b)
-		if accepted {
-			state.record = k
-			continue
-		}
-
-		state.record = k + 1
-		if e != nil {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
-				fmt.Errorf("error writing the event: %w", e)))
-		} else {
-			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
-				fmt.Errorf("dropped trace event error: event size %d bytes larger than configured max content length %d bytes", len(b), state.bufferMaxLen)))
-		}
 	}
-	return permanentErrors, nil
+	return iterState{done: true}, permanentErrors
 }
 
 // pushMetricsDataInBatches sends batches of Splunk events in JSON format.
@@ -374,32 +340,22 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli
 func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error {
 	bufState := c.bufferStatePool.get()
 	defer c.bufferStatePool.put(bufState)
-
+	is := iterState{}
 	var permanentErrors []error
 
-	var rms = md.ResourceMetrics()
-	for i := 0; i < rms.Len(); i++ {
-		ilms := rms.At(i).ScopeMetrics()
-		for j := 0; j < ilms.Len(); j++ {
-			var err error
-			var newPermanentErrors []error
-
-			bufState.resource, bufState.library = i, j
-			newPermanentErrors, err = c.pushMetricsRecords(ctx, rms, bufState, headers)
-
-			if err != nil {
-				return consumererror.NewMetrics(err, subMetrics(md, bufState))
+	for {
+		bufState.reset()
+		latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, bufState, is)
+		permanentErrors = append(permanentErrors, batchPermanentErrors...)
+		if bufState.containsData() {
+			if err := c.postEvents(ctx, bufState, headers); err != nil {
+				return consumererror.NewMetrics(err, subMetrics(md, is))
 			}
-
-			permanentErrors = append(permanentErrors, newPermanentErrors...)
 		}
-	}
-
-	// There's some leftover unsent metrics
-	if bufState.containsData() {
-		if err := c.postEvents(ctx, bufState, headers); err != nil {
-			return consumererror.NewMetrics(err, subMetrics(md, bufState))
+		if latestIterState.done {
+			break
 		}
+		is = latestIterState
 	}
 
 	return multierr.Combine(permanentErrors...)
@@ -411,32 +367,22 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
 func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, headers map[string]string) error {
 	bufState := c.bufferStatePool.get()
 	defer c.bufferStatePool.put(bufState)
-
+	is := iterState{}
 	var permanentErrors []error
 
-	var rts = td.ResourceSpans()
-	for i := 0; i < rts.Len(); i++ {
-		ilts := rts.At(i).ScopeSpans()
-		for j := 0; j < ilts.Len(); j++ {
-			var err error
-			var newPermanentErrors []error
-
-			bufState.resource, bufState.library = i, j
-			newPermanentErrors, err = c.pushTracesData(ctx, rts, bufState, headers)
-
-			if err != nil {
-				return consumererror.NewTraces(err, subTraces(td, bufState))
+	for {
+		bufState.reset()
+		latestIterState, batchPermanentErrors := c.fillTracesBuffer(td, bufState, is)
+		permanentErrors = append(permanentErrors, batchPermanentErrors...)
+		if bufState.containsData() {
+			if err := c.postEvents(ctx, bufState, headers); err != nil {
+				return consumererror.NewTraces(err, subTraces(td, is))
			}
-
-			permanentErrors = append(permanentErrors, newPermanentErrors...)
 		}
-	}
-
-	// There's some leftover unsent traces
-	if bufState.containsData() {
-		if err := c.postEvents(ctx, bufState, headers); err != nil {
-			return consumererror.NewTraces(err, subTraces(td, bufState))
+		if latestIterState.done {
+			break
 		}
+		is = latestIterState
 	}
 
 	return multierr.Combine(permanentErrors...)
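
The pattern that replaces the per-scope `pushLogRecords`/`pushMetricsRecords`/`pushTracesData` helpers is the same for all three signals: a `fill*Buffer` call packs marshaled events into the pooled buffer until the next one no longer fits, returns an `iterState` bookmark, and the `push*DataInBatches` driver posts the batch and resumes from the bookmark. Note also that `reset()` moved out of the pool's `get()` and into the top of the driver loop. A minimal, self-contained sketch of the idea, with hypothetical names (`fillBuffer`, `maxLen`) rather than the diff's actual API — the real code also threads permanent errors and HTTP posting through the loop:

```go
package main

import (
	"bytes"
	"fmt"
)

// iterState bookmarks where iteration stopped so the next fill pass can resume.
type iterState struct {
	index int
	done  bool
}

// fillBuffer appends events until the next one would overflow maxLen,
// then reports where it stopped. An event that can never fit is dropped.
func fillBuffer(events []string, buf *bytes.Buffer, maxLen int, is iterState) iterState {
	for i := is.index; i < len(events); i++ {
		b := []byte(events[i] + "\n")
		if buf.Len()+len(b) > maxLen {
			if buf.Len() > 0 {
				return iterState{index: i} // flush what we have, resume at i
			}
			fmt.Printf("dropped oversized event %q\n", events[i])
			return iterState{index: i + 1} // skip it permanently
		}
		buf.Write(b)
	}
	return iterState{done: true}
}

func main() {
	events := []string{"log-1", "log-2-too-big", "log-3"}
	var buf bytes.Buffer
	is := iterState{}
	for {
		buf.Reset() // like bufState.reset(): once per batch, no longer inside the pool's get()
		is = fillBuffer(events, &buf, 8, is)
		if buf.Len() > 0 {
			fmt.Printf("posting batch: %q\n", buf.String())
		}
		if is.done {
			break
		}
	}
}
```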
@@ -450,7 +396,7 @@ func (c *client) postEvents(ctx context.Context, bufState *bufferState, headers
 }
 
 // subLogs returns a subset of logs starting from the state.
-func subLogs(src plog.Logs, state *bufferState) plog.Logs {
+func subLogs(src plog.Logs, state iterState) plog.Logs {
 	dst := plog.NewLogs()
 	resources := src.ResourceLogs()
 	resourcesSub := dst.ResourceLogs()
@@ -492,7 +438,7 @@ func subLogs(src plog.Logs, state *bufferState) plog.Logs {
 }
 
 // subMetrics returns a subset of metrics starting from the state.
-func subMetrics(src pmetric.Metrics, state *bufferState) pmetric.Metrics {
+func subMetrics(src pmetric.Metrics, state iterState) pmetric.Metrics {
 	dst := pmetric.NewMetrics()
 	resources := src.ResourceMetrics()
 	resourcesSub := dst.ResourceMetrics()
@@ -533,7 +479,7 @@ func subMetrics(src pmetric.Metrics, state *bufferState) pmetric.Metrics {
 	return dst
 }
 
-func subTraces(src ptrace.Traces, state *bufferState) ptrace.Traces {
+func subTraces(src ptrace.Traces, state iterState) ptrace.Traces {
 	dst := ptrace.NewTraces()
 	resources := src.ResourceSpans()
 	resourcesSub := dst.ResourceSpans()
diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go
index 5ccf6792f5da..e7bbd0e32101 100644
--- a/exporter/splunkhecexporter/client_test.go
+++ b/exporter/splunkhecexporter/client_test.go
@@ -1499,7 +1499,7 @@ func TestSubLogs(t *testing.T) {
 	logs := createLogData(2, 2, 3)
 
 	// Logs subset from leftmost index (resource 0, library 0, record 0).
-	_0_0_0 := &bufferState{resource: 0, library: 0, record: 0} //revive:disable-line:var-naming
+	_0_0_0 := iterState{resource: 0, library: 0, record: 0} //revive:disable-line:var-naming
 	got := subLogs(logs, _0_0_0)
 
 	// Number of logs in subset should equal original logs.
@@ -1513,7 +1513,7 @@ func TestSubLogs(t *testing.T) {
 	assert.Equal(t, "1_1_2", val.AsString())
 
 	// Logs subset from some mid index (resource 0, library 1, log 2).
-	_0_1_2 := &bufferState{resource: 0, library: 1, record: 2} //revive:disable-line:var-naming
+	_0_1_2 := iterState{resource: 0, library: 1, record: 2} //revive:disable-line:var-naming
 	got = subLogs(logs, _0_1_2)
 
 	assert.Equal(t, 7, got.LogRecordCount())
@@ -1526,7 +1526,7 @@ func TestSubLogs(t *testing.T) {
 	assert.Equal(t, "1_1_2", val.AsString())
 
 	// Logs subset from rightmost index (resource 1, library 1, log 2).
-	_1_1_2 := &bufferState{resource: 1, library: 1, record: 2} //revive:disable-line:var-naming
+	_1_1_2 := iterState{resource: 1, library: 1, record: 2} //revive:disable-line:var-naming
 	got = subLogs(logs, _1_1_2)
 
 	// Number of logs in subset should be 1.
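
The `sub*` helpers now take the lightweight `iterState` bookmark by value: on a retryable send error, only the telemetry from the bookmark onward is wrapped in the returned `consumererror`, so the caller retries exactly what was not delivered. The trimming logic, shown over a plain slice-of-slices rather than pdata (hypothetical `subEvents`, reduced to two index levels; the real functions copy `ResourceLogs`/`ScopeLogs` subtrees):

```go
package main

import "fmt"

// iterState bookmark, reduced to two levels for the sketch.
type iterState struct {
	resource int
	record   int
}

// subEvents keeps everything from the bookmark onward: the partially sent
// first resource keeps only records >= state.record, later ones are kept whole.
func subEvents(src [][]string, state iterState) [][]string {
	var dst [][]string
	for i := state.resource; i < len(src); i++ {
		records := src[i]
		if i == state.resource {
			records = records[state.record:]
		}
		dst = append(dst, records)
	}
	return dst
}

func main() {
	src := [][]string{{"0_0", "0_1", "0_2"}, {"1_0", "1_1"}}
	fmt.Println(subEvents(src, iterState{resource: 0, record: 2})) // [[0_2] [1_0 1_1]]
}
```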
@@ -1537,30 +1537,48 @@ func TestSubLogs(t *testing.T) {
 	assert.Equal(t, "1_1_2", val.AsString())
 }
 
-func TestPushLogRecordsBufferCounters(t *testing.T) {
+func TestPushLogsPartialSuccess(t *testing.T) {
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
 	cfg.ExportRaw = true
-	c := client{
-		config:          cfg,
-		logger:          zap.NewNop(),
-		hecWorker:       &mockHecWorker{},
-		bufferStatePool: newBufferStatePool(6, false, 10),
-	}
+	cfg.MaxContentLengthLogs = 6
+	c := newLogsClient(exportertest.NewNopCreateSettings(), cfg)
 
-	logs := plog.NewResourceLogsSlice()
-	logRecords := logs.AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
-	logRecords.AppendEmpty().Body().SetStr("12345")    // the first log record should be accepted and sent
-	logRecords.AppendEmpty().Body().SetStr("12345678") // the second log record should be rejected as it's too big
-	logRecords.AppendEmpty().Body().SetStr("12345")    // the third log record should be just accepted
-	bs := c.bufferStatePool.get()
-	defer c.bufferStatePool.put(bs)
+	// The first request succeeds, the second fails.
+	httpClient, _ := newTestClientWithPresetResponses([]int{200, 503}, []string{"OK", "NOK"})
+	url := &url.URL{Scheme: "http", Host: "splunk"}
+	c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(cfg, component.NewDefaultBuildInfo())}
 
-	permErrs, err := c.pushLogRecords(context.Background(), logs, bs, nil)
-	assert.NoError(t, err)
-	assert.Len(t, permErrs, 1)
-	assert.ErrorContains(t, permErrs[0], "bytes larger than configured max content length")
+	logs := plog.NewLogs()
+	logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
+	logRecords.AppendEmpty().Body().SetStr("log-1")         // should be successfully sent
+	logRecords.AppendEmpty().Body().SetStr("log-2-too-big") // should be permanently rejected as it's too big
+	logRecords.AppendEmpty().Body().SetStr("log-3")         // should be rejected and returned for retry
+
+	err := c.pushLogData(context.Background(), logs)
+	expectedErr := consumererror.Logs{}
+	require.ErrorContains(t, err, "503")
+	require.ErrorAs(t, err, &expectedErr)
+	require.Equal(t, 1, expectedErr.Data().LogRecordCount())
+	assert.Equal(t, "log-3", expectedErr.Data().ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().Str())
+}
 
-	assert.Equal(t, 2, bs.record, "the buffer counter must be at the latest log record")
+func TestPushLogsRetryableFailureMultipleResources(t *testing.T) {
+	c := newLogsClient(exportertest.NewNopCreateSettings(), NewFactory().CreateDefaultConfig().(*Config))
+
+	httpClient, _ := newTestClientWithPresetResponses([]int{503}, []string{"NOK"})
+	url := &url.URL{Scheme: "http", Host: "splunk"}
+	c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, component.NewDefaultBuildInfo())}
+
+	logs := plog.NewLogs()
+	logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log-1")
+	logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log-2")
+	logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log-3")
+
+	err := c.pushLogData(context.Background(), logs)
+	expectedErr := consumererror.Logs{}
+	require.ErrorContains(t, err, "503")
+	require.ErrorAs(t, err, &expectedErr)
+	assert.Equal(t, logs, expectedErr.Data())
 }
 
 // validateCompressedEqual validates that GZipped `got` contains `expected` strings
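
The new tests pin down the partial-success contract these changes enable: the error returned by `pushLogData` can be unwrapped into a `consumererror.Logs` whose `Data()` holds only the undelivered records. A sketch of the consumer-side handling, using the same collector APIs the tests exercise (`go.opentelemetry.io/collector`); the error construction stands in for what the exporter does internally:

```go
package main

import (
	"errors"
	"fmt"

	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/pdata/plog"
)

func main() {
	// Stand-in for an exporter reporting that one record was not delivered.
	failed := plog.NewLogs()
	failed.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().
		LogRecords().AppendEmpty().Body().SetStr("log-3")
	err := consumererror.NewLogs(errors.New("HTTP 503"), failed)

	// Caller side: unwrap and retry only the failed subset.
	logsErr := consumererror.Logs{}
	if errors.As(err, &logsErr) {
		fmt.Println("records to retry:", logsErr.Data().LogRecordCount()) // 1
	}
}
```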