From 26b66c7f405315f0ee565856452da43dd4d64f51 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Thu, 12 Oct 2023 15:16:14 -0700 Subject: [PATCH 1/3] Cache event publishers: log warn once per provider --- pkg/stanza/operator/input/windows/operator.go | 17 +++-- .../operator/input/windows/publisher.go | 4 ++ .../operator/input/windows/publisher_test.go | 23 ++++++- .../operator/input/windows/publishercache.go | 50 ++++++++++++++ .../input/windows/publishercache_test.go | 68 +++++++++++++++++++ 5 files changed, 155 insertions(+), 7 deletions(-) create mode 100644 pkg/stanza/operator/input/windows/publishercache.go create mode 100644 pkg/stanza/operator/input/windows/publishercache_test.go diff --git a/pkg/stanza/operator/input/windows/operator.go b/pkg/stanza/operator/input/windows/operator.go index 5df98bb64cef..c34a0db6086d 100644 --- a/pkg/stanza/operator/input/windows/operator.go +++ b/pkg/stanza/operator/input/windows/operator.go @@ -94,6 +94,7 @@ type Input struct { excludeProviders []string pollInterval time.Duration persister operator.Persister + publisherCache publisherCache cancel context.CancelFunc wg sync.WaitGroup } @@ -123,6 +124,8 @@ func (e *Input) Start(persister operator.Persister) error { return fmt.Errorf("failed to open subscription: %w", err) } + e.publisherCache = newPublisherCache() + e.wg.Add(1) go e.readOnInterval(ctx) return nil @@ -141,6 +144,10 @@ func (e *Input) Stop() error { return fmt.Errorf("failed to close bookmark: %w", err) } + if err := e.publisherCache.evictAll(); err != nil { + return fmt.Errorf("failed to close publishers: %w", err) + } + return nil } @@ -231,13 +238,15 @@ func (e *Input) processEvent(ctx context.Context, event Event) { } } - publisher := NewPublisher() - if err := publisher.Open(simpleEvent.Provider.Name); err != nil { - e.Errorf("Failed to open publisher: %s: writing log entry to pipeline without metadata", err) + publisher, openPublisherErr := e.publisherCache.get(simpleEvent.Provider.Name) + if openPublisherErr != nil { + e.Warnf("Failed to open the %q event source, respective log entries can't be formatted: %s", simpleEvent.Provider.Name, openPublisherErr) + } + + if !publisher.Valid() { e.sendEvent(ctx, simpleEvent) return } - defer publisher.Close() formattedEvent, err := event.RenderFormatted(e.buffer, publisher) if err != nil { diff --git a/pkg/stanza/operator/input/windows/publisher.go b/pkg/stanza/operator/input/windows/publisher.go index dc6e8668063c..8000440362e5 100644 --- a/pkg/stanza/operator/input/windows/publisher.go +++ b/pkg/stanza/operator/input/windows/publisher.go @@ -36,6 +36,10 @@ func (p *Publisher) Open(provider string) error { return nil } +func (p *Publisher) Valid() bool { + return p.handle != 0 +} + // Close will close the publisher handle. func (p *Publisher) Close() error { if p.handle == 0 { diff --git a/pkg/stanza/operator/input/windows/publisher_test.go b/pkg/stanza/operator/input/windows/publisher_test.go index f4d8ef2f6237..5fb2dae0ef8c 100644 --- a/pkg/stanza/operator/input/windows/publisher_test.go +++ b/pkg/stanza/operator/input/windows/publisher_test.go @@ -17,6 +17,7 @@ func TestPublisherOpenPreexisting(t *testing.T) { err := publisher.Open("") require.Error(t, err) require.Contains(t, err.Error(), "publisher handle is already open") + require.True(t, publisher.Valid()) } func TestPublisherOpenInvalidUTF8(t *testing.T) { @@ -25,44 +26,60 @@ func TestPublisherOpenInvalidUTF8(t *testing.T) { err := publisher.Open(invalidUTF8) require.Error(t, err) require.Contains(t, err.Error(), "failed to convert the provider name \"\\x00\" to utf16: invalid argument") + require.False(t, publisher.Valid()) } func TestPublisherOpenSyscallFailure(t *testing.T) { publisher := NewPublisher() provider := "provider" - openPublisherMetadataProc = SimpleMockProc(0, 0, ErrorNotSupported) + defer mockWithDeferredRestore(&openPublisherMetadataProc, SimpleMockProc(0, 0, ErrorNotSupported))() err := publisher.Open(provider) require.Error(t, err) require.Contains(t, err.Error(), "failed to open the metadata for the \"provider\" provider: The request is not supported.") + require.False(t, publisher.Valid()) } func TestPublisherOpenSuccess(t *testing.T) { publisher := NewPublisher() provider := "provider" - openPublisherMetadataProc = SimpleMockProc(5, 0, ErrorSuccess) + defer mockWithDeferredRestore(&openPublisherMetadataProc, SimpleMockProc(5, 0, ErrorSuccess))() err := publisher.Open(provider) require.NoError(t, err) require.Equal(t, uintptr(5), publisher.handle) + require.True(t, publisher.Valid()) } func TestPublisherCloseWhenAlreadyClosed(t *testing.T) { publisher := NewPublisher() err := publisher.Close() require.NoError(t, err) + require.False(t, publisher.Valid()) } func TestPublisherCloseSyscallFailure(t *testing.T) { publisher := Publisher{handle: 5} - closeProc = SimpleMockProc(0, 0, ErrorNotSupported) + defer mockWithDeferredRestore(&closeProc, SimpleMockProc(0, 0, ErrorNotSupported))() err := publisher.Close() require.Error(t, err) require.Contains(t, err.Error(), "failed to close publisher") + require.True(t, publisher.Valid()) } func TestPublisherCloseSuccess(t *testing.T) { publisher := Publisher{handle: 5} + originalCloseProc := closeProc closeProc = SimpleMockProc(1, 0, ErrorSuccess) + defer func() { closeProc = originalCloseProc }() err := publisher.Close() require.NoError(t, err) require.Equal(t, uintptr(0), publisher.handle) + require.False(t, publisher.Valid()) +} + +func mockWithDeferredRestore(call *SyscallProc, mockCall SyscallProc) func() { + original := *call + *call = mockCall + return func() { + *call = original + } } diff --git a/pkg/stanza/operator/input/windows/publishercache.go b/pkg/stanza/operator/input/windows/publishercache.go new file mode 100644 index 000000000000..e45a0e6bfbc5 --- /dev/null +++ b/pkg/stanza/operator/input/windows/publishercache.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package windows // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/windows" + +import ( + "errors" +) + +type publisherCache struct { + cache map[string]Publisher +} + +func newPublisherCache() publisherCache { + return publisherCache{ + cache: make(map[string]Publisher), + } +} + +func (c *publisherCache) get(provider string) (publisher Publisher, openPublisherErr error) { + publisher, ok := c.cache[provider] + if ok { + return publisher, nil + } + + publisher = NewPublisher() + err := publisher.Open(provider) + + // Always store the publisher even if there was an error opening it. + c.cache[provider] = publisher + + return publisher, err +} + +func (c *publisherCache) evictAll() error { + var errs error + for _, publisher := range c.cache { + if publisher.Valid() { + if err := publisher.Close(); err != nil { + errs = errors.Join(errs, err) + } + } + } + + c.cache = make(map[string]Publisher) + return errs +} diff --git a/pkg/stanza/operator/input/windows/publishercache_test.go b/pkg/stanza/operator/input/windows/publishercache_test.go new file mode 100644 index 000000000000..e3ada625941c --- /dev/null +++ b/pkg/stanza/operator/input/windows/publishercache_test.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package windows + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetValidPublisher(t *testing.T) { + publisherCache := newPublisherCache() + defer publisherCache.evictAll() + + // Provider "Application" exists in all Windows versions. + publisher, openPublisherErr := publisherCache.get("Application") + require.NoError(t, openPublisherErr) + require.True(t, publisher.Valid()) + + // Get the same publisher again. + publisher, openPublisherErr = publisherCache.get("Application") + require.NoError(t, openPublisherErr) + require.True(t, publisher.Valid()) +} + +func TestGetInvalidPublisher(t *testing.T) { + publisherCache := newPublisherCache() + defer publisherCache.evictAll() + + // Provider "InvalidProvider" does not exist in any Windows version. + publisher, openPublisherErr := publisherCache.get("InvalidProvider") + require.Error(t, openPublisherErr, "%v", publisherCache) + require.False(t, publisher.Valid()) + + // Get "InvalidProvider" publisher again. + publisher, openPublisherErr = publisherCache.get("InvalidProvider") + require.NoError(t, openPublisherErr) // It is cached, no error opening it. + require.False(t, publisher.Valid()) +} + +func TestValidAndInvalidPublishers(t *testing.T) { + publisherCache := newPublisherCache() + defer publisherCache.evictAll() + + // Provider "EventCreate" exists in all Windows versions. + publisher, openPublisherErr := publisherCache.get("EventCreate") + require.NoError(t, openPublisherErr) + require.True(t, publisher.Valid()) + + // Provider "InvalidProvider" does not exist in any Windows version. + publisher, openPublisherErr = publisherCache.get("InvalidProvider") + require.Error(t, openPublisherErr, "%v", publisherCache) + require.False(t, publisher.Valid()) + + // Get the existing publisher again. + publisher, openPublisherErr = publisherCache.get("EventCreate") + require.NoError(t, openPublisherErr) + require.True(t, publisher.Valid()) + + // Get "InvalidProvider" publisher again. + publisher, openPublisherErr = publisherCache.get("InvalidProvider") + require.NoError(t, openPublisherErr) // It is cached, no error opening it. + require.False(t, publisher.Valid()) +} From 9502529b98f33b51e5b321afcf74c560cad6cdf3 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Thu, 12 Oct 2023 15:50:26 -0700 Subject: [PATCH 2/3] Fix wrong assumption about EventCreate provider --- pkg/stanza/operator/input/windows/publishercache_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/stanza/operator/input/windows/publishercache_test.go b/pkg/stanza/operator/input/windows/publishercache_test.go index e3ada625941c..63f29cb68865 100644 --- a/pkg/stanza/operator/input/windows/publishercache_test.go +++ b/pkg/stanza/operator/input/windows/publishercache_test.go @@ -46,8 +46,8 @@ func TestValidAndInvalidPublishers(t *testing.T) { publisherCache := newPublisherCache() defer publisherCache.evictAll() - // Provider "EventCreate" exists in all Windows versions. - publisher, openPublisherErr := publisherCache.get("EventCreate") + // Provider "Application" exists in all Windows versions. + publisher, openPublisherErr := publisherCache.get("Application") require.NoError(t, openPublisherErr) require.True(t, publisher.Valid()) @@ -57,7 +57,7 @@ func TestValidAndInvalidPublishers(t *testing.T) { require.False(t, publisher.Valid()) // Get the existing publisher again. - publisher, openPublisherErr = publisherCache.get("EventCreate") + publisher, openPublisherErr = publisherCache.get("Application") require.NoError(t, openPublisherErr) require.True(t, publisher.Valid()) From 3989a5d19907c57498c8987849c63a7c0f3a6891 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Fri, 13 Oct 2023 10:21:18 -0700 Subject: [PATCH 3/3] Add chloggen file --- ...-warning-when-publisher-not-available.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/log-warning-when-publisher-not-available.yaml diff --git a/.chloggen/log-warning-when-publisher-not-available.yaml b/.chloggen/log-warning-when-publisher-not-available.yaml new file mode 100644 index 000000000000..1228492e1f65 --- /dev/null +++ b/.chloggen/log-warning-when-publisher-not-available.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Log warning, instead of error, when Windows Event Log publisher metadata is not available and cache the successfully retrieved ones. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27658] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: []