From 90e75d3f34543199114168759bb8e88a848d1896 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Fri, 30 Aug 2024 09:58:41 -0700 Subject: [PATCH] [receiver/splunkhec] fix memory leak (#34911) **Description:** Fix memory leak when the receiver is used for both metrics and logs at the same time **Link to tracking Issue:** Fixes #34886 --- .chloggen/fix_memleak_obsreport.yaml | 27 +++++++++++++++ receiver/splunkhecreceiver/receiver.go | 48 +++++++++++++++++--------- 2 files changed, 59 insertions(+), 16 deletions(-) create mode 100644 .chloggen/fix_memleak_obsreport.yaml diff --git a/.chloggen/fix_memleak_obsreport.yaml b/.chloggen/fix_memleak_obsreport.yaml new file mode 100644 index 000000000000..681cd3eff17f --- /dev/null +++ b/.chloggen/fix_memleak_obsreport.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: splunkhecreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leak when the receiver is used for both metrics and logs at the same time + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34886] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/splunkhecreceiver/receiver.go b/receiver/splunkhecreceiver/receiver.go index 895af12299de..3ed0769e4f29 100644 --- a/receiver/splunkhecreceiver/receiver.go +++ b/receiver/splunkhecreceiver/receiver.go @@ -249,22 +249,21 @@ func (r *splunkReceiver) Shutdown(context.Context) error { return err } -func (r *splunkReceiver) processSuccessResponseWithAck(ctx context.Context, resp http.ResponseWriter, eventCount int, channelID string) { +func (r *splunkReceiver) processSuccessResponseWithAck(resp http.ResponseWriter, channelID string) error { if r.ackExt == nil { panic("writing response with ack when ack extension is not configured") } ackID := r.ackExt.ProcessEvent(channelID) r.ackExt.Ack(channelID, ackID) - r.processSuccessResponse(ctx, resp, eventCount, []byte(fmt.Sprintf(responseOKWithAckID, ackID))) + return r.processSuccessResponse(resp, []byte(fmt.Sprintf(responseOKWithAckID, ackID))) } -func (r *splunkReceiver) processSuccessResponse(ctx context.Context, resp http.ResponseWriter, eventCount int, bodyContent []byte) { +func (r *splunkReceiver) processSuccessResponse(resp http.ResponseWriter, bodyContent []byte) error { resp.Header().Set(httpContentTypeHeader, httpJSONTypeHeader) resp.WriteHeader(http.StatusOK) - if _, err := resp.Write(bodyContent); err != nil { - r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, eventCount, err) - } + _, err := resp.Write(bodyContent) + return err } func (r *splunkReceiver) handleAck(resp http.ResponseWriter, req *http.Request) { @@ -308,7 +307,9 @@ func (r *splunkReceiver) handleAck(resp http.ResponseWriter, req *http.Request) queriedAcks := r.ackExt.QueryAcks(channelID, ackRequest.Acks) ackString, _ := json.Marshal(queriedAcks) - r.processSuccessResponse(ctx, resp, 0, []byte(fmt.Sprintf(ackResponse, ackString))) + if err := r.processSuccessResponse(resp, []byte(fmt.Sprintf(ackResponse, ackString))); err != nil { + r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, 0, err) + } } func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Request) { @@ -383,12 +384,17 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques if consumerErr != nil { r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, slLen, consumerErr) } else { + var ackErr error if len(channelID) > 0 && r.ackExt != nil { - r.processSuccessResponseWithAck(ctx, resp, ld.LogRecordCount(), channelID) + ackErr = r.processSuccessResponseWithAck(resp, channelID) } else { - r.processSuccessResponse(ctx, resp, ld.LogRecordCount(), okRespBody) + ackErr = r.processSuccessResponse(resp, okRespBody) + } + if ackErr != nil { + r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, ld.LogRecordCount(), err) + } else { + r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), slLen, nil) } - r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), slLen, nil) } } @@ -521,7 +527,6 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) return } decodeErr := r.logsConsumer.ConsumeLogs(ctx, ld) - r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), len(events), decodeErr) if decodeErr != nil { r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(events), decodeErr) return @@ -531,17 +536,27 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) md, _ := splunkHecToMetricsData(r.settings.Logger, metricEvents, resourceCustomizer, r.config) decodeErr := r.metricsConsumer.ConsumeMetrics(ctx, md) - r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(metricEvents), decodeErr) if decodeErr != nil { r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(metricEvents), decodeErr) return } } + var ackErr error if len(channelID) > 0 && r.ackExt != nil { - r.processSuccessResponseWithAck(ctx, resp, len(events)+len(metricEvents), channelID) + ackErr = r.processSuccessResponseWithAck(resp, channelID) } else { - r.processSuccessResponse(ctx, resp, len(events)+len(metricEvents), okRespBody) + ackErr = r.processSuccessResponse(resp, okRespBody) + } + if ackErr != nil { + r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(events)+len(metricEvents), ackErr) + } else { + if r.logsConsumer != nil { + r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), len(events), nil) + } + if r.metricsConsumer != nil { + r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(metricEvents), nil) + } } } @@ -576,9 +591,10 @@ func (r *splunkReceiver) failRequest( } } - if r.metricsConsumer == nil { + if r.logsConsumer != nil { r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), numRecordsReceived, err) - } else { + } + if r.metricsConsumer != nil { r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), numRecordsReceived, err) }