From ba40e139ff674d51bb4aafda699b28f2f332f8a7 Mon Sep 17 00:00:00 2001 From: Mike Goldsmith Date: Fri, 9 Aug 2024 14:48:30 +0100 Subject: [PATCH] fix(processor/logdedup): Update shutdown behaviour (#34563) **Description:** Simplifies the processor shutdown behaviour by removing the unnecessary done channel. **Link to tracking Issue:** - #34478 **Testing:** Updated unit test verifying shutdown behaviour. **Documentation:** N/A --- .chloggen/logdedup-shutdown.yaml | 27 +++++++++++++++++++ .../logdeduplicationprocessor/processor.go | 19 +++++-------- .../processor_test.go | 16 +++-------- 3 files changed, 37 insertions(+), 25 deletions(-) create mode 100644 .chloggen/logdedup-shutdown.yaml diff --git a/.chloggen/logdedup-shutdown.yaml b/.chloggen/logdedup-shutdown.yaml new file mode 100644 index 000000000000..184bf763cf8f --- /dev/null +++ b/.chloggen/logdedup-shutdown.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: logdedupprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Simplifies the processor shutdown behaviour by removing the unnecessary done channel. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34478] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/logdeduplicationprocessor/processor.go b/processor/logdeduplicationprocessor/processor.go index 18f47c37962e..11e0a40a92c0 100644 --- a/processor/logdeduplicationprocessor/processor.go +++ b/processor/logdeduplicationprocessor/processor.go @@ -62,23 +62,13 @@ func (p *logDedupProcessor) Capabilities() consumer.Capabilities { } // Shutdown stops the processor. -func (p *logDedupProcessor) Shutdown(ctx context.Context) error { +func (p *logDedupProcessor) Shutdown(_ context.Context) error { if p.cancel != nil { + // Call cancel to stop the export interval goroutine and wait for it to finish. p.cancel() - } - - doneChan := make(chan struct{}) - go func() { - defer close(doneChan) p.wg.Wait() - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case <-doneChan: - return nil } + return nil } // ConsumeLogs processes the logs. @@ -117,6 +107,9 @@ func (p *logDedupProcessor) handleExportInterval(ctx context.Context) { for { select { case <-ctx.Done(): + if err := ctx.Err(); err != context.Canceled { + p.logger.Error("context error", zap.Error(err)) + } return case <-ticker.C: p.mux.Lock() diff --git a/processor/logdeduplicationprocessor/processor_test.go b/processor/logdeduplicationprocessor/processor_test.go index fde7f889bfbc..33b94049cbfa 100644 --- a/processor/logdeduplicationprocessor/processor_test.go +++ b/processor/logdeduplicationprocessor/processor_test.go @@ -92,19 +92,11 @@ func TestProcessorShutdownCtxError(t *testing.T) { p, err := newProcessor(cfg, logsSink, logger) require.NoError(t, err) - // We don't call p.Start as it can create a non-deterministic situation in Shutdown where we may not exit due to ctx error - - // Create empty cancel func as this is called during shutdown - p.cancel = func() {} - - // Add one to wait group to ensure shutdown blocks and the ctx error will trigger - p.wg.Add(1) - + // Start then stop the processor checking for errors + err = p.Start(ctx, componenttest.NewNopHost()) + require.NoError(t, err) err = p.Shutdown(ctx) - require.ErrorIs(t, err, context.Canceled) - - // Call done to ensure goroutine spawned in Shutdown doesn't leak - p.wg.Done() + require.NoError(t, err) } func TestProcessorCapabilities(t *testing.T) {