Skip to content

Commit

Permalink
ENH: error handling in opensearch client refreshment and metrics (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#4039)

* ENH: error handling in client refreshment and metrics

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Feb 2, 2024
1 parent 16d0d90 commit 4ae8906
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.model.plugin.PluginComponentRefresher;
import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -9,15 +12,19 @@

public class ClientRefresher<Client>
implements PluginComponentRefresher<Client, OpenSearchSourceConfiguration> {
private static final Logger LOG = LoggerFactory.getLogger(ClientRefresher.class);
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics;
private final Function<OpenSearchSourceConfiguration, Client> clientFunction;
private OpenSearchSourceConfiguration existingConfig;
private final Class<Client> clientClass;
private Client currentClient;

public ClientRefresher(final Class<Client> clientClass,
public ClientRefresher(final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics,
final Class<Client> clientClass,
final Function<OpenSearchSourceConfiguration, Client> clientFunction,
final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics;
this.clientClass = clientClass;
this.clientFunction = clientFunction;
existingConfig = openSearchSourceConfiguration;
Expand All @@ -42,10 +49,14 @@ public Client get() {
@Override
public void update(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
if (basicAuthChanged(openSearchSourceConfiguration)) {
openSearchSourcePluginMetrics.getCredentialsChangeCounter().increment();
readWriteLock.writeLock().lock();
try {
currentClient = clientFunction.apply(openSearchSourceConfiguration);
existingConfig = openSearchSourceConfiguration;
} catch (Exception e) {
openSearchSourcePluginMetrics.getClientRefreshErrorsCounter().increment();
LOG.error("Refreshing {} failed.", getComponentClass(), e);
} finally {
readWriteLock.writeLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private void startProcess(final OpenSearchSourceConfiguration openSearchSourceCo
final OpenSearchClientFactory openSearchClientFactory = OpenSearchClientFactory.create(awsCredentialsSupplier);
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics = OpenSearchSourcePluginMetrics.create(pluginMetrics);
final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(
openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable);
openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable);

final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class OpenSearchSourcePluginMetrics {
static final String PROCESSING_ERRORS = "processingErrors";
static final String BYTES_RECEIVED = "bytesReceived";
static final String BYTES_PROCESSED = "bytesProcessed";
static final String CREDENTIALS_CHANGED = "credentialsChanged";
static final String CLIENT_REFRESH_ERRORS = "clientRefreshErrors";

private final Counter documentsProcessedCounter;
private final Counter indicesProcessedCounter;
Expand All @@ -26,6 +28,8 @@ public class OpenSearchSourcePluginMetrics {

private final DistributionSummary bytesReceivedSummary;
private final DistributionSummary bytesProcessedSummary;
private final Counter credentialsChangeCounter;
private final Counter clientRefreshErrorsCounter;

public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) {
return new OpenSearchSourcePluginMetrics(pluginMetrics);
Expand All @@ -38,6 +42,8 @@ private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) {
indexProcessingTimeTimer = pluginMetrics.timer(INDEX_PROCESSING_TIME_ELAPSED);
bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS);
}

public Counter getDocumentsProcessedCounter() {
Expand All @@ -63,4 +69,12 @@ public DistributionSummary getBytesReceivedSummary() {
public DistributionSummary getBytesProcessedSummary() {
return bytesProcessedSummary;
}

public Counter getCredentialsChangeCounter() {
return credentialsChangeCounter;
}

public Counter getClientRefreshErrorsCounter() {
return clientRefreshErrorsCounter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.plugins.source.opensearch.ClientRefresher;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DistributionVersion;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.slf4j.Logger;
Expand All @@ -39,18 +40,23 @@ public class SearchAccessorStrategy {


private final OpenSearchClientFactory openSearchClientFactory;
private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final PluginConfigObservable pluginConfigObservable;

public static SearchAccessorStrategy create(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
public static SearchAccessorStrategy create(final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final OpenSearchClientFactory openSearchClientFactory,
final PluginConfigObservable pluginConfigObservable) {
return new SearchAccessorStrategy(openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable);
return new SearchAccessorStrategy(
openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable);
}

private SearchAccessorStrategy(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
private SearchAccessorStrategy(final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final OpenSearchClientFactory openSearchClientFactory,
final PluginConfigObservable pluginConfigObservable) {
this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.openSearchClientFactory = openSearchClientFactory;
this.pluginConfigObservable = pluginConfigObservable;
Expand All @@ -64,7 +70,8 @@ private SearchAccessorStrategy(final OpenSearchSourceConfiguration openSearchSou
public SearchAccessor getSearchAccessor() {

final PluginComponentRefresher<OpenSearchClient, OpenSearchSourceConfiguration> clientRefresher =
new ClientRefresher<>(OpenSearchClient.class, openSearchClientFactory::provideOpenSearchClient,
new ClientRefresher<>(openSearchSourcePluginMetrics,
OpenSearchClient.class, openSearchClientFactory::provideOpenSearchClient,
openSearchSourceConfiguration);

if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) &&
Expand Down Expand Up @@ -93,8 +100,9 @@ public SearchAccessor getSearchAccessor() {
LOG.info("Detected Elasticsearch cluster. Constructing Elasticsearch client");

try {
elasticsearchClientRefresher = new ClientRefresher<>(ElasticsearchClient.class,
openSearchClientFactory::provideElasticSearchClient, openSearchSourceConfiguration);
elasticsearchClientRefresher = new ClientRefresher<>(openSearchSourcePluginMetrics,
ElasticsearchClient.class, openSearchClientFactory::provideElasticSearchClient,
openSearchSourceConfiguration);
final PluginComponentRefresher<ElasticsearchClient, OpenSearchSourceConfiguration>
finalElasticsearchClientRefresher = elasticsearchClientRefresher;
pluginConfigObservable.addPluginConfigObserver(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package org.opensearch.dataprepper.plugins.source.opensearch;

import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics;

import java.util.function.Function;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -26,11 +29,21 @@ class ClientRefresherTest {
@Mock
private OpenSearchSourceConfiguration openSearchSourceConfiguration;

@Mock
private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics;

@Mock
private Counter basicAuthChangedCounter;

@Mock
private Counter clientRefreshErrors;

@Mock
private Object client;

private ClientRefresher createObjectUnderTest() {
return new ClientRefresher(Object.class, clientFunction, openSearchSourceConfiguration);
return new ClientRefresher(
openSearchSourcePluginMetrics, Object.class, clientFunction, openSearchSourceConfiguration);
}

@BeforeEach
Expand Down Expand Up @@ -58,6 +71,7 @@ void testGetAfterUpdateWithBasicAuthUnchanged() {

@Test
void testGetAfterUpdateWithUsernameChanged() {
when(openSearchSourcePluginMetrics.getCredentialsChangeCounter()).thenReturn(basicAuthChangedCounter);
final ClientRefresher objectUnderTest = createObjectUnderTest();
when(openSearchSourceConfiguration.getUsername()).thenReturn(TEST_USERNAME);
final OpenSearchSourceConfiguration newConfig = mock(OpenSearchSourceConfiguration.class);
Expand All @@ -66,10 +80,12 @@ void testGetAfterUpdateWithUsernameChanged() {
when(clientFunction.apply(eq(newConfig))).thenReturn(newClient);
objectUnderTest.update(newConfig);
assertThat(objectUnderTest.get(), equalTo(newClient));
verify(basicAuthChangedCounter).increment();
}

@Test
void testGetAfterUpdateWithPasswordChanged() {
when(openSearchSourcePluginMetrics.getCredentialsChangeCounter()).thenReturn(basicAuthChangedCounter);
final ClientRefresher objectUnderTest = createObjectUnderTest();
when(openSearchSourceConfiguration.getUsername()).thenReturn(TEST_USERNAME);
when(openSearchSourceConfiguration.getPassword()).thenReturn(TEST_PASSWORD);
Expand All @@ -80,5 +96,23 @@ void testGetAfterUpdateWithPasswordChanged() {
when(clientFunction.apply(eq(newConfig))).thenReturn(newClient);
objectUnderTest.update(newConfig);
assertThat(objectUnderTest.get(), equalTo(newClient));
verify(basicAuthChangedCounter).increment();
}

@Test
void testGetAfterUpdateClientFailure() {
when(openSearchSourcePluginMetrics.getCredentialsChangeCounter()).thenReturn(basicAuthChangedCounter);
when(openSearchSourcePluginMetrics.getClientRefreshErrorsCounter()).thenReturn(clientRefreshErrors);
final ClientRefresher objectUnderTest = createObjectUnderTest();
when(openSearchSourceConfiguration.getUsername()).thenReturn(TEST_USERNAME);
when(openSearchSourceConfiguration.getPassword()).thenReturn(TEST_PASSWORD);
final OpenSearchSourceConfiguration newConfig = mock(OpenSearchSourceConfiguration.class);
when(newConfig.getUsername()).thenReturn(TEST_USERNAME);
when(newConfig.getPassword()).thenReturn(TEST_PASSWORD + "_changed");
when(clientFunction.apply(eq(newConfig))).thenThrow(RuntimeException.class);
objectUnderTest.update(newConfig);
assertThat(objectUnderTest.get(), equalTo(client));
verify(basicAuthChangedCounter).increment();
verify(clientRefreshErrors).increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void start_with_non_null_buffer_does_not_throw() {
final MockedStatic<ServerlessOptionsFactory> serverlessOptionsFactoryMockedStatic = mockStatic(ServerlessOptionsFactory.class)) {
openSearchClientFactoryMockedStatic.when(() -> OpenSearchClientFactory.create(awsCredentialsSupplier)).thenReturn(openSearchClientFactory);
searchAccessorStrategyMockedStatic.when(() -> SearchAccessorStrategy.create(
openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable)).thenReturn(searchAccessorStrategy);
openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable)).thenReturn(searchAccessorStrategy);
openSearchSourcePluginMetricsMockedStatic.when(() -> OpenSearchSourcePluginMetrics.create(pluginMetrics)).thenReturn(openSearchSourcePluginMetrics);

openSearchServiceMockedStatic.when(() -> OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics))
Expand Down Expand Up @@ -137,7 +137,7 @@ void start_with_non_null_buffer_serverless_options_does_not_throw() {
final MockedStatic<ServerlessNetworkPolicyUpdaterFactory> serverlessNetworkPolicyUpdaterFactoryMockedStatic = mockStatic(ServerlessNetworkPolicyUpdaterFactory.class)) {
openSearchClientFactoryMockedStatic.when(() -> OpenSearchClientFactory.create(awsCredentialsSupplier)).thenReturn(openSearchClientFactory);
searchAccessorStrategyMockedStatic.when(() -> SearchAccessorStrategy.create(
openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable)).thenReturn(searchAccessorStrategy);
openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable)).thenReturn(searchAccessorStrategy);
openSearchSourcePluginMetricsMockedStatic.when(() -> OpenSearchSourcePluginMetrics.create(pluginMetrics)).thenReturn(openSearchSourcePluginMetrics);

openSearchServiceMockedStatic.when(() -> OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DistributionVersion;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;

Expand All @@ -48,12 +49,16 @@ public class SearchAccessStrategyTest {
@Mock
private OpenSearchSourceConfiguration openSearchSourceConfiguration;

@Mock
private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics;

@Mock
private PluginConfigObservable pluginConfigObservable;

private SearchAccessorStrategy createObjectUnderTest() {
return SearchAccessorStrategy.create(
openSearchSourceConfiguration, openSearchClientFactory, pluginConfigObservable);
openSearchSourcePluginMetrics, openSearchSourceConfiguration, openSearchClientFactory,
pluginConfigObservable);
}

@ParameterizedTest
Expand Down

0 comments on commit 4ae8906

Please sign in to comment.