From 4ae89069c6777d95b600d4f95cbcd31a23e877d2 Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Fri, 2 Feb 2024 16:17:40 -0600 Subject: [PATCH] ENH: error handling in opensearch client refreshment and metrics (#4039) * ENH: error handling in client refreshment and metrics Signed-off-by: George Chen --- .../source/opensearch/ClientRefresher.java | 13 ++++++- .../source/opensearch/OpenSearchSource.java | 2 +- .../OpenSearchSourcePluginMetrics.java | 14 ++++++++ .../worker/client/SearchAccessorStrategy.java | 20 +++++++---- .../opensearch/ClientRefresherTest.java | 36 ++++++++++++++++++- .../opensearch/OpenSearchSourceTest.java | 4 +-- .../client/SearchAccessStrategyTest.java | 7 +++- 7 files changed, 84 insertions(+), 12 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/ClientRefresher.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/ClientRefresher.java index 1ae322fe0d..f8ba735321 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/ClientRefresher.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/ClientRefresher.java @@ -1,6 +1,9 @@ package org.opensearch.dataprepper.plugins.source.opensearch; import org.opensearch.dataprepper.model.plugin.PluginComponentRefresher; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Objects; import java.util.concurrent.locks.ReadWriteLock; @@ -9,15 +12,19 @@ public class ClientRefresher implements PluginComponentRefresher { + private static final Logger LOG = LoggerFactory.getLogger(ClientRefresher.class); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; private final Function clientFunction; private OpenSearchSourceConfiguration existingConfig; private final Class clientClass; private Client currentClient; - public ClientRefresher(final Class clientClass, + public ClientRefresher(final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics, + final Class clientClass, final Function clientFunction, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; this.clientClass = clientClass; this.clientFunction = clientFunction; existingConfig = openSearchSourceConfiguration; @@ -42,10 +49,14 @@ public Client get() { @Override public void update(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { if (basicAuthChanged(openSearchSourceConfiguration)) { + openSearchSourcePluginMetrics.getCredentialsChangeCounter().increment(); readWriteLock.writeLock().lock(); try { currentClient = clientFunction.apply(openSearchSourceConfiguration); existingConfig = openSearchSourceConfiguration; + } catch (Exception e) { + openSearchSourcePluginMetrics.getClientRefreshErrorsCounter().increment(); + LOG.error("Refreshing {} failed.", getComponentClass(), e); } finally { readWriteLock.writeLock().unlock(); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java index d4c91b57ef..57a9e2fa95 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java @@ -69,7 +69,7 @@ private void startProcess(final OpenSearchSourceConfiguration openSearchSourceCo final OpenSearchClientFactory openSearchClientFactory = OpenSearchClientFactory.create(awsCredentialsSupplier); final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics = OpenSearchSourcePluginMetrics.create(pluginMetrics); final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create( - openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable); + openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable); final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java index 2064ddbae6..3f1c7792c1 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java @@ -18,6 +18,8 @@ public class OpenSearchSourcePluginMetrics { static final String PROCESSING_ERRORS = "processingErrors"; static final String BYTES_RECEIVED = "bytesReceived"; static final String BYTES_PROCESSED = "bytesProcessed"; + static final String CREDENTIALS_CHANGED = "credentialsChanged"; + static final String CLIENT_REFRESH_ERRORS = "clientRefreshErrors"; private final Counter documentsProcessedCounter; private final Counter indicesProcessedCounter; @@ -26,6 +28,8 @@ public class OpenSearchSourcePluginMetrics { private final DistributionSummary bytesReceivedSummary; private final DistributionSummary bytesProcessedSummary; + private final Counter credentialsChangeCounter; + private final Counter clientRefreshErrorsCounter; public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) { return new OpenSearchSourcePluginMetrics(pluginMetrics); @@ -38,6 +42,8 @@ private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) { indexProcessingTimeTimer = pluginMetrics.timer(INDEX_PROCESSING_TIME_ELAPSED); bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); + credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED); + clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS); } public Counter getDocumentsProcessedCounter() { @@ -63,4 +69,12 @@ public DistributionSummary getBytesReceivedSummary() { public DistributionSummary getBytesProcessedSummary() { return bytesProcessedSummary; } + + public Counter getCredentialsChangeCounter() { + return credentialsChangeCounter; + } + + public Counter getClientRefreshErrorsCounter() { + return clientRefreshErrorsCounter; + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java index accb5845fc..6a3b1fa609 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.plugins.source.opensearch.ClientRefresher; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DistributionVersion; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; import org.slf4j.Logger; @@ -39,18 +40,23 @@ public class SearchAccessorStrategy { private final OpenSearchClientFactory openSearchClientFactory; + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final PluginConfigObservable pluginConfigObservable; - public static SearchAccessorStrategy create(final OpenSearchSourceConfiguration openSearchSourceConfiguration, + public static SearchAccessorStrategy create(final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics, + final OpenSearchSourceConfiguration openSearchSourceConfiguration, final OpenSearchClientFactory openSearchClientFactory, final PluginConfigObservable pluginConfigObservable) { - return new SearchAccessorStrategy(openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable); + return new SearchAccessorStrategy( + openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable); } - private SearchAccessorStrategy(final OpenSearchSourceConfiguration openSearchSourceConfiguration, + private SearchAccessorStrategy(final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics, + final OpenSearchSourceConfiguration openSearchSourceConfiguration, final OpenSearchClientFactory openSearchClientFactory, final PluginConfigObservable pluginConfigObservable) { + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.openSearchClientFactory = openSearchClientFactory; this.pluginConfigObservable = pluginConfigObservable; @@ -64,7 +70,8 @@ private SearchAccessorStrategy(final OpenSearchSourceConfiguration openSearchSou public SearchAccessor getSearchAccessor() { final PluginComponentRefresher clientRefresher = - new ClientRefresher<>(OpenSearchClient.class, openSearchClientFactory::provideOpenSearchClient, + new ClientRefresher<>(openSearchSourcePluginMetrics, + OpenSearchClient.class, openSearchClientFactory::provideOpenSearchClient, openSearchSourceConfiguration); if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) && @@ -93,8 +100,9 @@ public SearchAccessor getSearchAccessor() { LOG.info("Detected Elasticsearch cluster. Constructing Elasticsearch client"); try { - elasticsearchClientRefresher = new ClientRefresher<>(ElasticsearchClient.class, - openSearchClientFactory::provideElasticSearchClient, openSearchSourceConfiguration); + elasticsearchClientRefresher = new ClientRefresher<>(openSearchSourcePluginMetrics, + ElasticsearchClient.class, openSearchClientFactory::provideElasticSearchClient, + openSearchSourceConfiguration); final PluginComponentRefresher finalElasticsearchClientRefresher = elasticsearchClientRefresher; pluginConfigObservable.addPluginConfigObserver( diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/ClientRefresherTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/ClientRefresherTest.java index 2d1fce7baf..449212f556 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/ClientRefresherTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/ClientRefresherTest.java @@ -1,11 +1,13 @@ package org.opensearch.dataprepper.plugins.source.opensearch; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import java.util.function.Function; @@ -13,6 +15,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -26,11 +29,21 @@ class ClientRefresherTest { @Mock private OpenSearchSourceConfiguration openSearchSourceConfiguration; + @Mock + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + + @Mock + private Counter basicAuthChangedCounter; + + @Mock + private Counter clientRefreshErrors; + @Mock private Object client; private ClientRefresher createObjectUnderTest() { - return new ClientRefresher(Object.class, clientFunction, openSearchSourceConfiguration); + return new ClientRefresher( + openSearchSourcePluginMetrics, Object.class, clientFunction, openSearchSourceConfiguration); } @BeforeEach @@ -58,6 +71,7 @@ void testGetAfterUpdateWithBasicAuthUnchanged() { @Test void testGetAfterUpdateWithUsernameChanged() { + when(openSearchSourcePluginMetrics.getCredentialsChangeCounter()).thenReturn(basicAuthChangedCounter); final ClientRefresher objectUnderTest = createObjectUnderTest(); when(openSearchSourceConfiguration.getUsername()).thenReturn(TEST_USERNAME); final OpenSearchSourceConfiguration newConfig = mock(OpenSearchSourceConfiguration.class); @@ -66,10 +80,12 @@ void testGetAfterUpdateWithUsernameChanged() { when(clientFunction.apply(eq(newConfig))).thenReturn(newClient); objectUnderTest.update(newConfig); assertThat(objectUnderTest.get(), equalTo(newClient)); + verify(basicAuthChangedCounter).increment(); } @Test void testGetAfterUpdateWithPasswordChanged() { + when(openSearchSourcePluginMetrics.getCredentialsChangeCounter()).thenReturn(basicAuthChangedCounter); final ClientRefresher objectUnderTest = createObjectUnderTest(); when(openSearchSourceConfiguration.getUsername()).thenReturn(TEST_USERNAME); when(openSearchSourceConfiguration.getPassword()).thenReturn(TEST_PASSWORD); @@ -80,5 +96,23 @@ void testGetAfterUpdateWithPasswordChanged() { when(clientFunction.apply(eq(newConfig))).thenReturn(newClient); objectUnderTest.update(newConfig); assertThat(objectUnderTest.get(), equalTo(newClient)); + verify(basicAuthChangedCounter).increment(); + } + + @Test + void testGetAfterUpdateClientFailure() { + when(openSearchSourcePluginMetrics.getCredentialsChangeCounter()).thenReturn(basicAuthChangedCounter); + when(openSearchSourcePluginMetrics.getClientRefreshErrorsCounter()).thenReturn(clientRefreshErrors); + final ClientRefresher objectUnderTest = createObjectUnderTest(); + when(openSearchSourceConfiguration.getUsername()).thenReturn(TEST_USERNAME); + when(openSearchSourceConfiguration.getPassword()).thenReturn(TEST_PASSWORD); + final OpenSearchSourceConfiguration newConfig = mock(OpenSearchSourceConfiguration.class); + when(newConfig.getUsername()).thenReturn(TEST_USERNAME); + when(newConfig.getPassword()).thenReturn(TEST_PASSWORD + "_changed"); + when(clientFunction.apply(eq(newConfig))).thenThrow(RuntimeException.class); + objectUnderTest.update(newConfig); + assertThat(objectUnderTest.get(), equalTo(client)); + verify(basicAuthChangedCounter).increment(); + verify(clientRefreshErrors).increment(); } } \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java index 78e67cd98a..efe5c5fcf6 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java @@ -109,7 +109,7 @@ void start_with_non_null_buffer_does_not_throw() { final MockedStatic serverlessOptionsFactoryMockedStatic = mockStatic(ServerlessOptionsFactory.class)) { openSearchClientFactoryMockedStatic.when(() -> OpenSearchClientFactory.create(awsCredentialsSupplier)).thenReturn(openSearchClientFactory); searchAccessorStrategyMockedStatic.when(() -> SearchAccessorStrategy.create( - openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable)).thenReturn(searchAccessorStrategy); + openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable)).thenReturn(searchAccessorStrategy); openSearchSourcePluginMetricsMockedStatic.when(() -> OpenSearchSourcePluginMetrics.create(pluginMetrics)).thenReturn(openSearchSourcePluginMetrics); openSearchServiceMockedStatic.when(() -> OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics)) @@ -137,7 +137,7 @@ void start_with_non_null_buffer_serverless_options_does_not_throw() { final MockedStatic serverlessNetworkPolicyUpdaterFactoryMockedStatic = mockStatic(ServerlessNetworkPolicyUpdaterFactory.class)) { openSearchClientFactoryMockedStatic.when(() -> OpenSearchClientFactory.create(awsCredentialsSupplier)).thenReturn(openSearchClientFactory); searchAccessorStrategyMockedStatic.when(() -> SearchAccessorStrategy.create( - openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable)).thenReturn(searchAccessorStrategy); + openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable)).thenReturn(searchAccessorStrategy); openSearchSourcePluginMetricsMockedStatic.when(() -> OpenSearchSourcePluginMetrics.create(pluginMetrics)).thenReturn(openSearchSourcePluginMetrics); openSearchServiceMockedStatic.when(() -> OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics)) diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java index 4b56a0fc1b..cc8695a923 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java @@ -23,6 +23,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DistributionVersion; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; @@ -48,12 +49,16 @@ public class SearchAccessStrategyTest { @Mock private OpenSearchSourceConfiguration openSearchSourceConfiguration; + @Mock + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + @Mock private PluginConfigObservable pluginConfigObservable; private SearchAccessorStrategy createObjectUnderTest() { return SearchAccessorStrategy.create( - openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable); + openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory, + pluginConfigObservable); } @ParameterizedTest