diff --git a/.ci/bwcVersions b/.ci/bwcVersions index 535dea1da3c28..2f4fade05f988 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -54,6 +54,7 @@ BWC_VERSION: - "1.3.15" - "1.3.16" - "1.3.17" + - "1.3.18" - "2.0.0" - "2.0.1" - "2.0.2" diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ca7840800b60..19eb74a935b19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,12 +12,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679)) - Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819)) - Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776)) +- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027)) +- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982)) - Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) - Bump `org.apache.commons:commons-text` from 1.11.0 to 1.12.0 ([#13557](https://github.com/opensearch-project/OpenSearch/pull/13557)) -- Bump `org.hdrhistogram:HdrHistogram` from 2.1.12 to 2.2.1 ([#13556](https://github.com/opensearch-project/OpenSearch/pull/13556)) +- Bump `org.hdrhistogram:HdrHistogram` from 2.1.12 to 2.2.2 ([#13556](https://github.com/opensearch-project/OpenSearch/pull/13556), [#13986](https://github.com/opensearch-project/OpenSearch/pull/13986)) - Bump `com.gradle.enterprise` from 3.17.2 to 3.17.4 ([#13641](https://github.com/opensearch-project/OpenSearch/pull/13641), [#13753](https://github.com/opensearch-project/OpenSearch/pull/13753)) - Bump `org.apache.hadoop:hadoop-minicluster` from 3.3.6 to 3.4.0 ([#13642](https://github.com/opensearch-project/OpenSearch/pull/13642)) - Bump `mockito` from 5.11.0 to 5.12.0 ([#13665](https://github.com/opensearch-project/OpenSearch/pull/13665)) @@ -36,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650)) - Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481)) - Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636)) +- Move cache removal notifications outside lru lock ([#14017](https://github.com/opensearch-project/OpenSearch/pull/14017)) ### Deprecated @@ -49,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Don't return negative scores from `multi_match` query with `cross_fields` type ([#13829](https://github.com/opensearch-project/OpenSearch/pull/13829)) - Pass parent filter to inner hit query ([#13903](https://github.com/opensearch-project/OpenSearch/pull/13903)) - Fix NPE on restore searchable 
snapshot ([#13911](https://github.com/opensearch-project/OpenSearch/pull/13911)) +- Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015)) ### Security diff --git a/buildSrc/version.properties b/buildSrc/version.properties index 96d4f2a39f66b..38710f3301bb6 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -23,6 +23,8 @@ guava = 32.1.1-jre protobuf = 3.22.3 jakarta_annotation = 1.3.5 google_http_client = 1.44.1 +tdigest = 3.2 +hdrhistogram = 2.2.2 # when updating the JNA version, also update the version in buildSrc/build.gradle jna = 5.13.0 diff --git a/gradle/ide.gradle b/gradle/ide.gradle index 14d6b2982ccd0..ea353f8d92bdd 100644 --- a/gradle/ide.gradle +++ b/gradle/ide.gradle @@ -28,7 +28,7 @@ allprojects { apply plugin: 'idea' tasks.named('idea').configure { - doFirst { throw new GradleException("Use of the 'idea' task has been deprecated. For details on importing into IntelliJ see CONTRIBUTING.md.") } + doFirst { throw new GradleException("Use of the 'idea' task has been deprecated. For details on importing into IntelliJ see DEVELOPER_GUIDE.md.") } } } diff --git a/libs/core/src/main/java/org/opensearch/Version.java b/libs/core/src/main/java/org/opensearch/Version.java index be34553aa214c..da178ef308553 100644 --- a/libs/core/src/main/java/org/opensearch/Version.java +++ b/libs/core/src/main/java/org/opensearch/Version.java @@ -97,6 +97,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_1_3_15 = new Version(1031599, org.apache.lucene.util.Version.LUCENE_8_10_1); public static final Version V_1_3_16 = new Version(1031699, org.apache.lucene.util.Version.LUCENE_8_10_1); public static final Version V_1_3_17 = new Version(1031799, org.apache.lucene.util.Version.LUCENE_8_10_1); + public static final Version V_1_3_18 = new Version(1031899, org.apache.lucene.util.Version.LUCENE_8_10_1); public static final Version V_2_0_0 = new Version(2000099, org.apache.lucene.util.Version.LUCENE_9_1_0); public static final Version V_2_0_1 = new Version(2000199, org.apache.lucene.util.Version.LUCENE_9_1_0); public static final Version V_2_0_2 = new Version(2000299, org.apache.lucene.util.Version.LUCENE_9_1_0); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 4d7e0d486068a..22831c3e0f8ba 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -70,7 +70,7 @@ public Collection createComponents( final Supplier repositoriesServiceSupplier ) { // create top n queries service - final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); + final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client); return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService)); } @@ -110,7 +110,8 @@ public List> getSettings() { // Settings for top N queries QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, - QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + 
QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS ); } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java new file mode 100644 index 0000000000000..116bd26e1f9bc --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.util.List; + +/** + * Debug exporter for development purpose + */ +public final class DebugExporter implements QueryInsightsExporter { + /** + * Logger of the debug exporter + */ + private final Logger logger = LogManager.getLogger(); + + /** + * Constructor of DebugExporter + */ + private DebugExporter() {} + + private static class InstanceHolder { + private static final DebugExporter INSTANCE = new DebugExporter(); + } + + /** + Get the singleton instance of DebugExporter + * + @return DebugExporter instance + */ + public static DebugExporter getInstance() { + return InstanceHolder.INSTANCE; + } + + /** + * Write the list of SearchQueryRecord to debug log + * + * @param records list of {@link SearchQueryRecord} + */ + @Override + public void export(final List records) { + logger.debug("QUERY_INSIGHTS_RECORDS: " + records.toString()); + } + + /** + * Close the debugger exporter sink + */ + @Override + public void close() { + logger.debug("Closing the DebugExporter.."); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java new file mode 100644 index 0000000000000..c19fe3655098b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -0,0 +1,113 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.bulk.BulkRequestBuilder; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormatter; + +import java.util.List; + +/** + * Local index exporter for exporting query insights data to local OpenSearch indices. 
+ */ +public final class LocalIndexExporter implements QueryInsightsExporter { + /** + * Logger of the local index exporter + */ + private final Logger logger = LogManager.getLogger(); + private final Client client; + private DateTimeFormatter indexPattern; + + /** + * Constructor of LocalIndexExporter + * + * @param client OS client + * @param indexPattern the pattern of index to export to + */ + public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) { + this.indexPattern = indexPattern; + this.client = client; + } + + /** + * Getter of indexPattern + * + * @return indexPattern + */ + public DateTimeFormatter getIndexPattern() { + return indexPattern; + } + + /** + * Setter of indexPattern + * + * @param indexPattern index pattern + * @return the current LocalIndexExporter + */ + public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) { + this.indexPattern = indexPattern; + return this; + } + + /** + * Export a list of SearchQueryRecord to a local index + * + * @param records list of {@link SearchQueryRecord} + */ + @Override + public void export(final List records) { + if (records == null || records.size() == 0) { + return; + } + try { + final String index = getDateTimeFromFormat(); + final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1)); + for (SearchQueryRecord record : records) { + bulkRequestBuilder.add( + new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + bulkRequestBuilder.execute(new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) {} + + @Override + public void onFailure(Exception e) { + logger.error("Failed to execute bulk operation for query insights data: ", e); + } + }); + } catch (final Exception e) { + logger.error("Unable to index query insights data: ", e); + } + } + + /** + * Close the exporter sink + */ + @Override + public void close() { + logger.debug("Closing the LocalIndexExporter.."); + } + + private String getDateTimeFromFormat() { + return indexPattern.print(DateTime.now(DateTimeZone.UTC)); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java new file mode 100644 index 0000000000000..42e5354eb1640 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
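Editor's note on the LocalIndexExporter above: each export call resolves the configured Joda-time pattern against the current UTC time to pick the target index (see getDateTimeFromFormat). Below is a minimal, self-contained sketch of that resolution using a hypothetical helper class that is not part of this change; only the Joda-time calls mirror the exporter's behaviour.

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    // Hypothetical helper (illustration only): resolves an index pattern such as
    // "'top_queries_by_latency-'YYYY.MM.dd" to a concrete daily index name, the same
    // way LocalIndexExporter#getDateTimeFromFormat does at export time.
    public final class IndexNameResolverSketch {
        public static String resolve(final String pattern) {
            // DateTimeFormat.forPattern throws IllegalArgumentException on malformed patterns,
            // which is what QueryInsightsExporterFactory#validateExporterConfig relies on.
            final DateTimeFormatter formatter = DateTimeFormat.forPattern(pattern);
            return formatter.print(DateTime.now(DateTimeZone.UTC));
        }

        public static void main(String[] args) {
            // prints e.g. top_queries_by_latency-2024.06.07, depending on the current date
            System.out.println(resolve("'top_queries_by_latency-'YYYY.MM.dd"));
        }
    }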
+ */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.Closeable; +import java.util.List; + +/** + * Base interface for Query Insights exporters + */ +public interface QueryInsightsExporter extends Closeable { + /** + * Export a list of SearchQueryRecord to the exporter sink + * + * @param records list of {@link SearchQueryRecord} + */ + void export(final List records); +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java new file mode 100644 index 0000000000000..7324590c9f582 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.joda.time.format.DateTimeFormat; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX; + +/** + * Factory class for validating and creating exporters based on provided settings + */ +public class QueryInsightsExporterFactory { + /** + * Logger of the query insights exporter factory + */ + private final Logger logger = LogManager.getLogger(); + final private Client client; + final private Set exporters; + + /** + * Constructor of QueryInsightsExporterFactory + * + * @param client OS client + */ + public QueryInsightsExporterFactory(final Client client) { + this.client = client; + this.exporters = new HashSet<>(); + } + + /** + * Validate exporter sink config + * + * @param settings exporter sink config {@link Settings} + * @throws IllegalArgumentException if provided exporter sink config settings are invalid + */ + public void validateExporterConfig(final Settings settings) throws IllegalArgumentException { + // Disable exporter if the EXPORTER_TYPE setting is null + if (settings.get(EXPORTER_TYPE) == null) { + return; + } + SinkType type; + try { + type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Invalid exporter type [%s], type should be one of %s", + settings.get(EXPORTER_TYPE), + SinkType.allSinkTypes() + ) + ); + } + switch (type) { + case LOCAL_INDEX: + final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN); + if (indexPattern.length() == 0) { + throw new IllegalArgumentException("Empty index pattern configured for the exporter"); + } + try { + 
DateTimeFormat.forPattern(indexPattern); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the exporter", indexPattern) + ); + } + } + } + + /** + * Create an exporter based on provided parameters + * + * @param type The type of exporter to create + * @param indexPattern the index pattern if creating a index exporter + * @return QueryInsightsExporter the created exporter sink + */ + public QueryInsightsExporter createExporter(SinkType type, String indexPattern) { + if (SinkType.LOCAL_INDEX.equals(type)) { + QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern(indexPattern)); + this.exporters.add(exporter); + return exporter; + } + return DebugExporter.getInstance(); + } + + /** + * Update an exporter based on provided parameters + * + * @param exporter The exporter to update + * @param indexPattern the index pattern if creating a index exporter + * @return QueryInsightsExporter the updated exporter sink + */ + public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, String indexPattern) { + if (exporter.getClass() == LocalIndexExporter.class) { + ((LocalIndexExporter) exporter).setIndexPattern(DateTimeFormat.forPattern(indexPattern)); + } + return exporter; + } + + /** + * Close an exporter + * + * @param exporter the exporter to close + */ + public void closeExporter(QueryInsightsExporter exporter) throws IOException { + if (exporter != null) { + exporter.close(); + this.exporters.remove(exporter); + } + } + + /** + * Close all exporters + * + */ + public void closeAllExporters() { + for (QueryInsightsExporter exporter : exporters) { + try { + closeExporter(exporter); + } catch (IOException e) { + logger.error("Fail to close query insights exporter, error: ", e); + } + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java new file mode 100644 index 0000000000000..c90c9c76b6706 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
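Editor's note: to make the factory contract above concrete, here is a hedged usage sketch of the exporter lifecycle (create, update in place, close). The index patterns are illustrative only, and client stands for whichever node client the plugin is wired with; the method signatures match the factory introduced in this change.

    import java.io.IOException;

    import org.opensearch.client.Client;
    import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
    import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
    import org.opensearch.plugin.insights.core.exporter.SinkType;

    // Illustration only: exercises the factory methods introduced in this change.
    final class ExporterLifecycleSketch {
        static void demo(final Client client) throws IOException {
            final QueryInsightsExporterFactory factory = new QueryInsightsExporterFactory(client);
            // create a local index exporter writing to daily indices
            final QueryInsightsExporter exporter = factory.createExporter(SinkType.LOCAL_INDEX, "'top_queries-'YYYY.MM.dd");
            // same sink type, new pattern: the existing exporter is updated rather than recreated
            factory.updateExporter(exporter, "'top_queries-'YYYY.MM");
            // release the sink when it is no longer needed (or factory.closeAllExporters() on shutdown)
            factory.closeExporter(exporter);
        }
    }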
+ */ + +package org.opensearch.plugin.insights.core.exporter; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Type of supported sinks + */ +public enum SinkType { + /** debug exporter */ + DEBUG("debug"), + /** local index exporter */ + LOCAL_INDEX("local_index"); + + private final String type; + + SinkType(String type) { + this.type = type; + } + + @Override + public String toString() { + return type; + } + + /** + * Parse SinkType from String + * @param type the String representation of the SinkType + * @return SinkType + */ + public static SinkType parse(final String type) { + return valueOf(type.toUpperCase(Locale.ROOT)); + } + + /** + * Get all valid SinkTypes + * + * @return A set contains all valid SinkTypes + */ + public static Set allSinkTypes() { + return Arrays.stream(values()).collect(Collectors.toSet()); + } + + /** + * Get Sink type from exporter + * + * @param exporter the {@link QueryInsightsExporter} + * @return SinkType associated with this exporter + */ + public static SinkType getSinkTypeFromExporter(QueryInsightsExporter exporter) { + if (exporter.getClass().equals(LocalIndexExporter.class)) { + return SinkType.LOCAL_INDEX; + } + return SinkType.DEBUG; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java new file mode 100644 index 0000000000000..7164411194f85 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
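Editor's note: the SinkType enum above parses user input by upper-casing the name and delegating to valueOf, so "local_index" maps to LOCAL_INDEX and any unknown value surfaces as an IllegalArgumentException, which the factory rewraps into a friendlier validation error. The stand-in enum below is a self-contained illustration of that convention, not the plugin class itself.

    import java.util.Locale;

    // Stand-in enum mirroring the SinkType#parse convention, for illustration only.
    enum SinkTypeParseSketch {
        DEBUG,
        LOCAL_INDEX;

        static SinkTypeParseSketch parse(final String type) {
            return valueOf(type.toUpperCase(Locale.ROOT));
        }

        public static void main(String[] args) {
            System.out.println(parse("local_index"));   // LOCAL_INDEX
            try {
                parse("kafka");                         // unsupported sink
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }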
+ */ + +/** + * Query Insights exporter + */ +package org.opensearch.plugin.insights.core.exporter; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 525ca0d4a3d33..a83bb2094f165 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -8,14 +8,18 @@ package org.opensearch.plugin.insights.core.service; +import org.opensearch.client.Client; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; @@ -23,6 +27,8 @@ import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS; + /** * Service responsible for gathering, analyzing, storing and exporting * information related to search queries @@ -56,21 +62,35 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ protected volatile Scheduler.Cancellable scheduledFuture; + /** + * Query Insights exporter factory + */ + final QueryInsightsExporterFactory queryInsightsExporterFactory; + /** * Constructor of the QueryInsightsService * - * @param threadPool The OpenSearch thread pool to run async tasks + * @param clusterSettings OpenSearch cluster level settings + * @param threadPool The OpenSearch thread pool to run async tasks + * @param client OS client */ @Inject - public QueryInsightsService(final ThreadPool threadPool) { + public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client) { enableCollect = new HashMap<>(); queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); + this.threadPool = threadPool; + this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client); + // initialize top n queries services and configurations consumers topQueriesServices = new HashMap<>(); for (MetricType metricType : MetricType.allMetricTypes()) { enableCollect.put(metricType, false); - topQueriesServices.put(metricType, new TopQueriesService(metricType)); + topQueriesServices.put(metricType, new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory)); } - this.threadPool = threadPool; + clusterSettings.addSettingsUpdateConsumer( + TOP_N_LATENCY_EXPORTER_SETTINGS, + (settings -> getTopQueriesService(MetricType.LATENCY).setExporter(settings)), + (settings -> getTopQueriesService(MetricType.LATENCY).validateExporterConfig(settings)) + ); } /** @@ -176,5 +196,12 @@ protected void doStop() { } @Override - protected void doClose() {} + protected void doClose() throws IOException { + // close all top n queries service + for (TopQueriesService topQueriesService 
: topQueriesServices.values()) { + topQueriesService.close(); + } + // close any unclosed resources + queryInsightsExporterFactory.closeAllExporters(); + } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index d2c30cbdf98e7..ff90edf1ec33d 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -8,11 +8,19 @@ package org.opensearch.plugin.insights.core.service; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; +import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -27,6 +35,12 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; + /** * Service responsible for gathering and storing top N queries * with high latency or resource usage @@ -34,6 +48,10 @@ * @opensearch.internal */ public class TopQueriesService { + /** + * Logger of the local index exporter + */ + private final Logger logger = LogManager.getLogger(); private boolean enabled; /** * The metric type to measure top n queries @@ -63,12 +81,34 @@ public class TopQueriesService { */ private final AtomicReference> topQueriesHistorySnapshot; - TopQueriesService(final MetricType metricType) { + /** + * Factory for validating and creating exporters + */ + private final QueryInsightsExporterFactory queryInsightsExporterFactory; + + /** + * The internal OpenSearch thread pool that execute async processing and exporting tasks + */ + private final ThreadPool threadPool; + + /** + * Exporter for exporting top queries data + */ + private QueryInsightsExporter exporter; + + TopQueriesService( + final MetricType metricType, + final ThreadPool threadPool, + final QueryInsightsExporterFactory queryInsightsExporterFactory + ) { this.enabled = false; this.metricType = metricType; + this.threadPool = threadPool; + this.queryInsightsExporterFactory = queryInsightsExporterFactory; this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; this.windowStart = -1L; + this.exporter = null; topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, 
metricType)); topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); @@ -169,6 +209,50 @@ public void validateWindowSize(final TimeValue windowSize) { } } + /** + * Set up the top queries exporter based on provided settings + * + * @param settings exporter config {@link Settings} + */ + public void setExporter(final Settings settings) { + if (settings.get(EXPORTER_TYPE) != null) { + SinkType expectedType = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); + if (exporter != null && expectedType == SinkType.getSinkTypeFromExporter(exporter)) { + queryInsightsExporterFactory.updateExporter( + exporter, + settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN) + ); + } else { + try { + queryInsightsExporterFactory.closeExporter(this.exporter); + } catch (IOException e) { + logger.error("Fail to close the current exporter when updating exporter, error: ", e); + } + this.exporter = queryInsightsExporterFactory.createExporter( + SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)), + settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN) + ); + } + } else { + // Disable exporter if exporter type is set to null + try { + queryInsightsExporterFactory.closeExporter(this.exporter); + this.exporter = null; + } catch (IOException e) { + logger.error("Fail to close the current exporter when disabling exporter, error: ", e); + } + } + } + + /** + * Validate provided settings for top queries exporter + * + * @param settings settings exporter config {@link Settings} + */ + public void validateExporterConfig(Settings settings) { + queryInsightsExporterFactory.validateExporterConfig(settings); + } + /** * Get all top queries records that are in the current top n queries store * Optionally include top N records from the last window. 
@@ -254,6 +338,10 @@ private void rotateWindowIfNecessary(final long newWindowStart) { topQueriesStore.clear(); topQueriesCurrentSnapshot.set(new ArrayList<>()); windowStart = newWindowStart; + // export to the configured sink + if (exporter != null) { + threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> exporter.export(history)); + } } } @@ -279,4 +367,11 @@ private long calculateWindowStart(final long timestamp) { public List getTopQueriesCurrentSnapshot() { return topQueriesCurrentSnapshot.get(); } + + /** + * Close the top n queries service + */ + public void close() throws IOException { + queryInsightsExporterFactory.closeExporter(this.exporter); + } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index 060711edb5580..fec00a680ae58 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -8,9 +8,11 @@ package org.opensearch.plugin.insights.rules.model; +import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; @@ -173,4 +175,9 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(timestamp, measurements, attributes); } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 52cc1fbde790f..b2e01062e334c 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -9,7 +9,9 @@ package org.opensearch.plugin.insights.settings; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.core.exporter.SinkType; import java.util.Arrays; import java.util.HashSet; @@ -109,6 +111,37 @@ public class QueryInsightsSettings { Setting.Property.Dynamic ); + /** + * Config key for exporter type + */ + public static final String EXPORTER_TYPE = "type"; + /** + * Config key for export index + */ + public static final String EXPORT_INDEX = "config.index"; + + /** + * Settings and defaults for top queries exporters + */ + private static final String TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX = TOP_N_LATENCY_QUERIES_PREFIX + ".exporter."; + /** + * Default index pattern of top n queries by latency + */ + public static final String DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN = "'top_queries_by_latency-'YYYY.MM.dd"; + /** + * Default exporter type of top queries + */ + public static final String DEFAULT_TOP_QUERIES_EXPORTER_TYPE = SinkType.LOCAL_INDEX.toString(); + + /** + * Settings for the exporter of top latency 
queries + */ + public static final Setting TOP_N_LATENCY_EXPORTER_SETTINGS = Setting.groupSetting( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** * Default constructor */ diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java index 273b69e483e8c..15bf284652e13 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -49,6 +49,7 @@ public void setup() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS); clusterService = new ClusterService(settings, clusterSettings, threadPool); @@ -59,7 +60,8 @@ public void testGetSettings() { Arrays.asList( QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, - QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS ), queryInsightsPlugin.getSettings() ); diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java new file mode 100644 index 0000000000000..736e406289b2c --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.List; + +/** + * Granular tests for the {@link DebugExporterTests} class. 
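Editor's note: the exporter group setting defined above is dynamic, so operators can enable the exporter at runtime. The sketch below shows one way to do that through the Java admin client, mirroring how the integration tests in this change push dynamic settings. The fully qualified keys are an assumption based on the existing latency prefix (TOP_N_LATENCY_QUERIES_PREFIX) resolving to "search.insights.top_queries.latency"; only the ".exporter." suffixes ("type", "config.index") are defined in this change.

    import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
    import org.opensearch.client.Client;
    import org.opensearch.common.settings.Settings;

    // Illustration only: pushes the (assumed) exporter settings as persistent cluster settings.
    final class EnableExporterSketch {
        static void enableLocalIndexExporter(final Client client) {
            final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
            request.persistentSettings(
                Settings.builder()
                    .put("search.insights.top_queries.latency.exporter.type", "local_index")
                    .put("search.insights.top_queries.latency.exporter.config.index", "'top_queries_by_latency-'YYYY.MM.dd")
            );
            client.admin().cluster().updateSettings(request).actionGet();
        }
    }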
+ */ +public class DebugExporterTests extends OpenSearchTestCase { + private DebugExporter debugExporter; + + @Before + public void setup() { + debugExporter = DebugExporter.getInstance(); + } + + public void testExport() { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + try { + debugExporter.export(records); + } catch (Exception e) { + fail("No exception should be thrown when exporting query insights data"); + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java new file mode 100644 index 0000000000000..9ea864a7083f4 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.action.bulk.BulkAction; +import org.opensearch.action.bulk.BulkRequestBuilder; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.Client; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.test.OpenSearchTestCase; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.junit.Before; + +import java.util.List; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +/** + * Granular tests for the {@link LocalIndexExporterTests} class. 
+ */ +public class LocalIndexExporterTests extends OpenSearchTestCase { + private final DateTimeFormatter format = DateTimeFormat.forPattern("YYYY.MM.dd"); + private final Client client = mock(Client.class); + private LocalIndexExporter localIndexExporter; + + @Before + public void setup() { + localIndexExporter = new LocalIndexExporter(client, format); + } + + public void testExportEmptyRecords() { + List records = List.of(); + try { + localIndexExporter.export(records); + } catch (Exception e) { + fail("No exception should be thrown when exporting empty query insights data"); + } + } + + @SuppressWarnings("unchecked") + public void testExportRecords() { + BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(client, BulkAction.INSTANCE)); + final PlainActionFuture future = mock(PlainActionFuture.class); + when(future.actionGet()).thenReturn(null); + doAnswer(invocation -> future).when(bulkRequestBuilder).execute(); + when(client.prepareBulk()).thenReturn(bulkRequestBuilder); + + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + try { + localIndexExporter.export(records); + } catch (Exception e) { + fail("No exception should be thrown when exporting query insights data"); + } + assertEquals(2, bulkRequestBuilder.numberOfActions()); + } + + @SuppressWarnings("unchecked") + public void testExportRecordsWithError() { + BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(client, BulkAction.INSTANCE)); + final PlainActionFuture future = mock(PlainActionFuture.class); + when(future.actionGet()).thenReturn(null); + doThrow(new RuntimeException()).when(bulkRequestBuilder).execute(); + when(client.prepareBulk()).thenReturn(bulkRequestBuilder); + + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + try { + localIndexExporter.export(records); + } catch (Exception e) { + fail("No exception should be thrown when exporting query insights data"); + } + } + + public void testClose() { + try { + localIndexExporter.close(); + } catch (Exception e) { + fail("No exception should be thrown when closing local index exporter"); + } + } + + public void testGetAndSetIndexPattern() { + DateTimeFormatter newFormatter = mock(DateTimeFormatter.class); + localIndexExporter.setIndexPattern(newFormatter); + assert (localIndexExporter.getIndexPattern() == newFormatter); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java new file mode 100644 index 0000000000000..f01dd2c17509c --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.joda.time.format.DateTimeFormat; +import org.junit.Before; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX; +import static org.mockito.Mockito.mock; + +/** + * Granular tests for the {@link QueryInsightsExporterFactoryTests} class. + */ +public class QueryInsightsExporterFactoryTests extends OpenSearchTestCase { + private final String format = "YYYY.MM.dd"; + + private final Client client = mock(Client.class); + private QueryInsightsExporterFactory queryInsightsExporterFactory; + + @Before + public void setup() { + queryInsightsExporterFactory = new QueryInsightsExporterFactory(client); + } + + public void testValidateConfigWhenResetExporter() { + Settings.Builder settingsBuilder = Settings.builder(); + // empty settings + Settings settings = settingsBuilder.build(); + try { + queryInsightsExporterFactory.validateExporterConfig(settings); + } catch (Exception e) { + fail("No exception should be thrown when setting is null"); + } + } + + public void testInvalidExporterTypeConfig() { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.put(EXPORTER_TYPE, "some_invalid_type").build(); + assertThrows(IllegalArgumentException.class, () -> { queryInsightsExporterFactory.validateExporterConfig(settings); }); + } + + public void testInvalidLocalIndexConfig() { + Settings.Builder settingsBuilder = Settings.builder(); + assertThrows(IllegalArgumentException.class, () -> { + queryInsightsExporterFactory.validateExporterConfig( + settingsBuilder.put(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE).put(EXPORT_INDEX, "").build() + ); + }); + assertThrows(IllegalArgumentException.class, () -> { + queryInsightsExporterFactory.validateExporterConfig( + settingsBuilder.put(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE).put(EXPORT_INDEX, "some_invalid_pattern").build() + ); + }); + } + + public void testCreateAndCloseExporter() { + QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter(SinkType.LOCAL_INDEX, format); + assertTrue(exporter1 instanceof LocalIndexExporter); + QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format); + assertTrue(exporter2 instanceof DebugExporter); + QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format); + assertTrue(exporter3 instanceof DebugExporter); + try { + queryInsightsExporterFactory.closeExporter(exporter1); + queryInsightsExporterFactory.closeExporter(exporter2); + queryInsightsExporterFactory.closeAllExporters(); + } catch (Exception e) { + fail("No exception should be thrown when closing exporter"); + } + } + + public void testUpdateExporter() { + LocalIndexExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern("yyyy-MM-dd")); + queryInsightsExporterFactory.updateExporter(exporter, "yyyy-MM-dd-HH"); + assertEquals(DateTimeFormat.forPattern("yyyy-MM-dd-HH"), exporter.getIndexPattern()); + } + +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java 
b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index c29b48b9690d1..428f615ce2f90 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -8,6 +8,9 @@ package org.opensearch.plugin.insights.core.service; +import org.opensearch.client.Client; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; @@ -23,11 +26,16 @@ */ public class QueryInsightsServiceTests extends OpenSearchTestCase { private final ThreadPool threadPool = mock(ThreadPool.class); + private final Client client = mock(Client.class); private QueryInsightsService queryInsightsService; @Before public void setup() { - queryInsightsService = new QueryInsightsService(threadPool); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS); + queryInsightsService = new QueryInsightsService(clusterSettings, threadPool, client); queryInsightsService.enableCollection(MetricType.LATENCY, true); queryInsightsService.enableCollection(MetricType.CPU, true); queryInsightsService.enableCollection(MetricType.JVM, true); @@ -46,4 +54,12 @@ public void testAddRecordToLimitAndDrain() { queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size() ); } + + public void testClose() { + try { + queryInsightsService.doClose(); + } catch (Exception e) { + fail("No exception expected when closing query insights service"); + } + } } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 060df84a89485..3efd4c86833cc 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -11,24 +11,30 @@ import org.opensearch.cluster.coordination.DeterministicTaskQueue; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import org.junit.Before; import java.util.List; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mock; + /** * Unit Tests for {@link QueryInsightsService}. 
*/ public class TopQueriesServiceTests extends OpenSearchTestCase { private TopQueriesService topQueriesService; + private final ThreadPool threadPool = mock(ThreadPool.class); + private final QueryInsightsExporterFactory queryInsightsExporterFactory = mock(QueryInsightsExporterFactory.class); @Before public void setup() { - topQueriesService = new TopQueriesService(MetricType.LATENCY); + topQueriesService = new TopQueriesService(MetricType.LATENCY, threadPool, queryInsightsExporterFactory); topQueriesService.setTopNSize(Integer.MAX_VALUE); topQueriesService.setWindowSize(new TimeValue(Long.MAX_VALUE)); topQueriesService.setEnabled(true); diff --git a/server/build.gradle b/server/build.gradle index 96810672cd5f9..3a9ed1a6838d7 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -96,9 +96,9 @@ dependencies { api "joda-time:joda-time:${versions.joda}" // percentiles aggregation - api 'com.tdunning:t-digest:3.2' - // precentil ranks aggregation - api 'org.hdrhistogram:HdrHistogram:2.2.1' + api "com.tdunning:t-digest:${versions.tdigest}" + // percentile ranks aggregation + api "org.hdrhistogram:HdrHistogram:${versions.hdrhistogram}" // lucene spatial api "org.locationtech.spatial4j:spatial4j:${versions.spatial4j}", optional diff --git a/server/licenses/HdrHistogram-2.2.1.jar.sha1 b/server/licenses/HdrHistogram-2.2.1.jar.sha1 deleted file mode 100644 index 68225950d4744..0000000000000 --- a/server/licenses/HdrHistogram-2.2.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -0eb1feb351f64176c377772a30174e582c0274d5 \ No newline at end of file diff --git a/server/licenses/HdrHistogram-2.2.2.jar.sha1 b/server/licenses/HdrHistogram-2.2.2.jar.sha1 new file mode 100644 index 0000000000000..2c895841bce81 --- /dev/null +++ b/server/licenses/HdrHistogram-2.2.2.jar.sha1 @@ -0,0 +1 @@ +7959933ebcc0f05b2eaa5af0a0c8689fa257b15c diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 7721b18a4fe6b..96d6338e5913b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -852,7 +852,9 @@ public void testFlushOnTooManyRemoteTranslogFiles() throws Exception { ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings( - Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100") + Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100") + .put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms") ); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); @@ -883,5 +885,27 @@ public void testFlushOnTooManyRemoteTranslogFiles() throws Exception { assertEquals(totalFiles, 1L); } }, 30, TimeUnit.SECONDS); + + // Disabling max translog readers + assertAcked( + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "-1")) + .get() + ); + + // Indexing 500 more docs + for (int i = 0; i < 500; i++) { + indexBulk(INDEX_NAME, 1); + } + + // No flush is triggered since max_translog_readers is set to -1 + // Total tlog files would be incremented by 500 + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> 
f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 501L); + } } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java index edf9cd432dda2..f5d018b2ef491 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java @@ -42,10 +42,12 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.script.Script; import org.opensearch.script.ScriptType; +import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.AggregationExecutionException; import org.opensearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.bucket.filter.Filter; +import org.opensearch.search.aggregations.bucket.filter.InternalFilters; import org.opensearch.search.aggregations.bucket.terms.Terms.Bucket; import org.opensearch.search.aggregations.metrics.Avg; import org.opensearch.search.aggregations.metrics.ExtendedStats; @@ -999,6 +1001,72 @@ public void testOtherDocCount() { testOtherDocCount(SINGLE_VALUED_FIELD_NAME, MULTI_VALUED_FIELD_NAME); } + public void testDeferredSubAggs() { + // Tests subAgg doc count is the same with different collection modes and additional top level aggs + SearchResponse r1 = client().prepareSearch("idx") + .setSize(0) + .addAggregation( + terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST) + .field("s_value") + .size(2) + .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery())) + ) + .addAggregation(AggregationBuilders.min("min").field("constant")) + .get(); + + SearchResponse r2 = client().prepareSearch("idx") + .setSize(0) + .addAggregation( + terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST) + .field("s_value") + .size(2) + .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery())) + ) + .addAggregation(AggregationBuilders.min("min").field("constant")) + .get(); + + SearchResponse r3 = client().prepareSearch("idx") + .setSize(0) + .addAggregation( + terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST) + .field("s_value") + .size(2) + .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery())) + ) + .get(); + + SearchResponse r4 = client().prepareSearch("idx") + .setSize(0) + .addAggregation( + terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST) + .field("s_value") + .size(2) + .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery())) + ) + .get(); + + assertNotNull(r1.getAggregations().get("terms1")); + assertNotNull(r2.getAggregations().get("terms1")); + assertNotNull(r3.getAggregations().get("terms1")); + assertNotNull(r4.getAggregations().get("terms1")); + + Terms terms = r1.getAggregations().get("terms1"); + Bucket b1 = terms.getBucketByKey("val0"); + InternalFilters f1 = b1.getAggregations().get("filter"); + long docCount1 = f1.getBuckets().get(0).getDocCount(); + Bucket b2 = terms.getBucketByKey("val1"); + InternalFilters f2 = b2.getAggregations().get("filter"); + long docCount2 = f1.getBuckets().get(0).getDocCount(); + + for (SearchResponse response : new SearchResponse[] { r2, r3, r4 }) { + terms = 
response.getAggregations().get("terms1"); + f1 = terms.getBucketByKey(b1.getKeyAsString()).getAggregations().get("filter"); + f2 = terms.getBucketByKey(b2.getKeyAsString()).getAggregations().get("filter"); + assertEquals(docCount1, f1.getBuckets().get(0).getDocCount()); + assertEquals(docCount2, f2.getBuckets().get(0).getDocCount()); + } + } + /** * Make sure that a request using a deterministic script or not using a script get cached. * Ensure requests using nondeterministic scripts do not get cached. diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index 6d346de25cadf..caae81e4387b4 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -36,9 +36,11 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.util.concurrent.ReleasableLock; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -396,7 +398,12 @@ private V get(K key, long now, Consumer> onExpiration) { if (entry == null) { return null; } else { - promote(entry, now); + List> removalNotifications = promote(entry, now).v2(); + if (!removalNotifications.isEmpty()) { + for (RemovalNotification removalNotification : removalNotifications) { + removalListener.onRemoval(removalNotification); + } + } return entry.value; } } @@ -446,8 +453,14 @@ private V compute(K key, CacheLoader loader) throws ExecutionException { BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { if (ok != null) { + List> removalNotifications = new ArrayList<>(); try (ReleasableLock ignored = lruLock.acquire()) { - promote(ok, now); + removalNotifications = promote(ok, now).v2(); + } + if (!removalNotifications.isEmpty()) { + for (RemovalNotification removalNotification : removalNotifications) { + removalListener.onRemoval(removalNotification); + } } return ok.value; } else { @@ -512,16 +525,22 @@ private void put(K key, V value, long now) { CacheSegment segment = getCacheSegment(key); Tuple, Entry> tuple = segment.put(key, value, now); boolean replaced = false; + List> removalNotifications = new ArrayList<>(); try (ReleasableLock ignored = lruLock.acquire()) { if (tuple.v2() != null && tuple.v2().state == State.EXISTING) { if (unlink(tuple.v2())) { replaced = true; } } - promote(tuple.v1(), now); + removalNotifications = promote(tuple.v1(), now).v2(); } if (replaced) { - removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED)); + removalNotifications.add(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED)); + } + if (!removalNotifications.isEmpty()) { + for (RemovalNotification removalNotification : removalNotifications) { + removalListener.onRemoval(removalNotification); + } } } @@ -767,8 +786,17 @@ public long getEvictions() { } } - private boolean promote(Entry entry, long now) { + /** + * Promotes the desired entry to the head of the lru list and tries to see if it needs to evict any entries in + * case the cache size is exceeding or the entry got expired. + * @param entry Entry to be promoted + * @param now the current time + * @return Returns a tuple. v1 signifies whether an entry got promoted, v2 signifies the list of removal + * notifications that the callers needs to handle. 
+     */
+    private Tuple<Boolean, List<RemovalNotification<K, V>>> promote(Entry<K, V> entry, long now) {
         boolean promoted = true;
+        List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
         try (ReleasableLock ignored = lruLock.acquire()) {
             switch (entry.state) {
                 case DELETED:
@@ -782,10 +810,21 @@ private boolean promote(Entry<K, V> entry, long now) {
                     break;
             }
             if (promoted) {
-                evict(now);
+                while (tail != null && shouldPrune(tail, now)) {
+                    Entry<K, V> entryToBeRemoved = tail;
+                    CacheSegment segment = getCacheSegment(entryToBeRemoved.key);
+                    if (segment != null) {
+                        segment.remove(entryToBeRemoved.key, entryToBeRemoved.value, f -> {});
+                    }
+                    if (unlink(entryToBeRemoved)) {
+                        removalNotifications.add(
+                            new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, RemovalReason.EVICTED)
+                        );
+                    }
+                }
             }
         }
-        return promoted;
+        return new Tuple<>(promoted, removalNotifications);
     }
 
     private void evict(long now) {
diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java
index 68e1cdf6139e2..eaaaec2bb07e0 100644
--- a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java
+++ b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java
@@ -42,5 +42,10 @@
 @ExperimentalApi
 @FunctionalInterface
 public interface RemovalListener<K, V> {
+
+    /**
+     * This may be called from multiple threads at once, so implementations need to be thread-safe.
+     * @param notification removal notification for the removed entry.
+     */
     void onRemoval(RemovalNotification<K, V> notification);
 }
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 243af4ec248c3..fbff99536ebf4 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -170,6 +170,13 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
      * @return true if sync is needed
      */
     private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
+        // Ignore syncing segments if the underlying shard is closed
+        // This also makes sure that retries are not scheduled for shards
+        // with failed syncSegments invocations after they are closed
+        if (shardClosed()) {
+            logger.info("Shard is already closed. Not attempting sync to remote store");
+            return false;
+        }
         boolean shouldSync = didRefresh // If the readers change, didRefresh is always true.
             // The third condition exists for uploading the zero state segments where the refresh has not changed the reader
             // reference, but it is important to upload the zero state segments so that the restore does not break.
@@ -608,6 +615,15 @@ public void onFailure(String file) {
         };
     }
 
+    /**
+     * Checks if the underlying IndexShard instance is closed
+     *
+     * @return true if it is closed, false otherwise
+     */
+    private boolean shardClosed() {
+        return indexShard.state() == IndexShardState.CLOSED;
+    }
+
     @Override
     protected Logger getLogger() {
         return logger;
diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
index 67549c86b7dd2..f29b6fba6537f 100644
--- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
+++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
@@ -706,6 +706,10 @@ int availablePermits() {
      */
     @Override
     protected boolean shouldFlush() {
-        return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings();
+        int maxRemoteTlogReaders = translogTransferManager.getMaxRemoteTranslogReadersSettings();
+        if (maxRemoteTlogReaders == -1) {
+            return false;
+        }
+        return readers.size() >= maxRemoteTlogReaders;
     }
 }
diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java
index 074186f64a75d..8cb482c8d8681 100644
--- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java
+++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java
@@ -25,6 +25,7 @@
  */
 @PublicApi(since = "2.14.0")
 public class RemoteStoreSettings {
+    private static final int MIN_CLUSTER_REMOTE_MAX_TRANSLOG_READERS = 100;
 
     /**
      * Used to specify the default translog buffer interval for remote store backed indexes.
@@ -112,7 +113,12 @@ public class RemoteStoreSettings {
     public static final Setting<Integer> CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting(
         "cluster.remote_store.translog.max_readers",
         1000,
-        100,
+        -1,
+        v -> {
+            if (v != -1 && v < MIN_CLUSTER_REMOTE_MAX_TRANSLOG_READERS) {
+                throw new IllegalArgumentException("Cannot set value lower than " + MIN_CLUSTER_REMOTE_MAX_TRANSLOG_READERS);
+            }
+        },
         Property.Dynamic,
         Property.NodeScope
     );
diff --git a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java
index df05ce3f5c049..32c243cc12aa6 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java
@@ -71,10 +71,10 @@ public void processPostCollection(Collector collectorTree) throws IOException {
                     collectors.offer(innerCollector);
                 }
             } else if (currentCollector instanceof BucketCollector) {
-                ((BucketCollector) currentCollector).postCollection();
-                // Perform build aggregation during post collection
                 if (currentCollector instanceof Aggregator) {
+                    // Do not perform postCollection for MultiBucketCollector as we are unwrapping that below
+                    ((BucketCollector) currentCollector).postCollection();
                     ((Aggregator) currentCollector).buildTopLevel();
                 } else if (currentCollector instanceof MultiBucketCollector) {
                     for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/BestBucketsDeferringCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/BestBucketsDeferringCollector.java
index 223be3ba2d1ae..69d0fcd6c96f2 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/BestBucketsDeferringCollector.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/BestBucketsDeferringCollector.java
@@ -124,6 +124,7 @@ private void finishLeaf() {
         if (context != null) {
             assert docDeltasBuilder != null && bucketsBuilder != null;
             entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build()));
+            context = null;
         }
     }
 
@@ -161,6 +162,7 @@ public void preCollection() throws IOException {
 
     @Override
     public void postCollection() throws IOException {
+        assert searchContext.searcher().getLeafContexts().isEmpty() || finished == false;
         finishLeaf();
         finished = true;
     }
diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
index e0ee58cbfdb22..ee80b1b037ce5 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
@@ -47,6 +47,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
@@ -469,6 +470,25 @@ public void testRefreshPersistentFailure() throws Exception {
         assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync());
     }
 
+    public void testRefreshPersistentFailureAndIndexShardClosed() throws Exception {
+        int succeedOnAttempt = 3;
+        int closeShardOnAttempt = 1;
+        CountDownLatch refreshCountLatch = new CountDownLatch(1);
+        CountDownLatch successLatch = new CountDownLatch(10);
+        Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(
+            succeedOnAttempt,
+            refreshCountLatch,
+            successLatch,
+            true,
+            closeShardOnAttempt
+        );
+        // Give some time (2s) for a few iterations of remote refresh upload
+        Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+        RemoteStoreRefreshListener listener = tuple.v1();
+        assertFalse("remote store should not be in sync", listener.isRemoteSegmentStoreInSync());
+        assertFalse(listener.getRetryScheduledStatus());
+    }
+
     private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segmentTracker, long totalUploadsFailed) throws Exception {
         assertBusy(() -> {
             assertEquals(0, segmentTracker.getBytesLag());
@@ -547,6 +567,49 @@ private Tuple mockIn
         return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch);
     }
 
+    private Tuple mockIndexShardWithRetryAndScheduleRefresh(
+        int totalAttempt,
+        CountDownLatch refreshCountLatch,
+        CountDownLatch successLatch,
+        int checkpointPublishSucceedOnAttempt,
+        CountDownLatch reachedCheckpointPublishLatch,
+        boolean mockPrimaryTerm,
+        boolean testUploadTimeout
+    ) throws IOException {
+        return mockIndexShardWithRetryAndScheduleRefresh(
+            totalAttempt,
+            refreshCountLatch,
+            successLatch,
+            checkpointPublishSucceedOnAttempt,
+            reachedCheckpointPublishLatch,
+            mockPrimaryTerm,
+            testUploadTimeout,
+            false,
+            0
+        );
+    }
+
+    private Tuple mockIndexShardWithRetryAndScheduleRefresh(
+        int succeedOnAttempt,
+        CountDownLatch refreshCountLatch,
+        CountDownLatch successLatch,
+        boolean closedShard,
+        int closeShardAfterAttempt
+    ) throws IOException {
+        CountDownLatch noOpLatch = new CountDownLatch(0);
+        return mockIndexShardWithRetryAndScheduleRefresh(
+            succeedOnAttempt,
+            refreshCountLatch,
+            successLatch,
+            1,
+            noOpLatch,
+            true,
+            false,
+            closedShard,
+            closeShardAfterAttempt
+        );
+    }
+
     private Tuple mockIndexShardWithRetryAndScheduleRefresh(
         int succeedOnAttempt,
         CountDownLatch refreshCountLatch,
@@ -561,7 +624,9 @@ private Tuple mockIn
             succeedCheckpointPublishOnAttempt,
             reachedCheckpointPublishLatch,
             true,
-            false
+            false,
+            false,
+            0
         );
     }
 
@@ -572,7 +637,9 @@ private Tuple mockIn
         int succeedCheckpointPublishOnAttempt,
         CountDownLatch reachedCheckpointPublishLatch,
         boolean mockPrimaryTerm,
-        boolean testUploadTimeout
+        boolean testUploadTimeout,
+        boolean closeShard,
+        int closeShardAfterAttempt
     ) throws IOException {
         // Create index shard that we will be using to mock different methods in IndexShard for the unit test
         indexShard = newStartedShard(
@@ -600,7 +667,6 @@ private Tuple mockIn
         IndexShard shard = mock(IndexShard.class);
         Store store = mock(Store.class);
         when(shard.store()).thenReturn(store);
-        when(shard.state()).thenReturn(IndexShardState.STARTED);
         when(store.directory()).thenReturn(indexShard.store().directory());
 
         // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
@@ -662,6 +728,14 @@ private Tuple mockIn
             return indexShard.getLatestReplicationCheckpoint();
         })).when(shard).computeReplicationCheckpoint(any());
 
+        doAnswer((invocationOnMock -> {
+            if (closeShard && counter.get() == closeShardAfterAttempt) {
+                logger.info("Closing shard...");
+                return IndexShardState.CLOSED;
+            }
+            return IndexShardState.STARTED;
+        })).when(shard).state();
+
         doAnswer(invocation -> {
             if (Objects.nonNull(successLatch)) {
                 successLatch.countDown();
diff --git a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java
index f89fd3df6e340..cc9096ee41315 100644
--- a/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java
+++ b/server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java
@@ -116,4 +116,15 @@ public void testMaxRemoteReferencedTranslogFiles() {
         );
         assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders());
     }
+
+    public void testDisableMaxRemoteReferencedTranslogFiles() {
+        // Test default value
+        assertEquals(1000, remoteStoreSettings.getMaxRemoteTranslogReaders());
+
+        // Test override with -1, which disables flush based on translog reader count
+        clusterSettings.applySettings(
+            Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "-1").build()
+        );
+        assertEquals(-1, remoteStoreSettings.getMaxRemoteTranslogReaders());
+    }
 }
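
Note on the Cache.java change above: promote() now returns the pending removal notifications together with the promotion flag, so that callers invoke the RemovalListener only after releasing lruLock. The following is a minimal, self-contained sketch of that pattern, not the OpenSearch Cache class itself; TinyLruCache and its members are illustrative names, and it assumes non-null keys and values. It only demonstrates the idea of collecting evictions while holding the lock and firing listener callbacks outside it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

// Illustrative sketch: collect evictions under the lock, notify the listener after releasing it.
class TinyLruCache<K, V> {
    private final int maxSize;
    private final BiConsumer<K, V> removalListener; // invoked OUTSIDE the lock
    private final ReentrantLock lruLock = new ReentrantLock();
    private final LinkedHashMap<K, V> lru = new LinkedHashMap<>(16, 0.75f, true); // access-ordered LRU

    TinyLruCache(int maxSize, BiConsumer<K, V> removalListener) {
        this.maxSize = maxSize;
        this.removalListener = removalListener;
    }

    V get(K key) {
        List<Map.Entry<K, V>> evicted = new ArrayList<>();
        V value;
        lruLock.lock();
        try {
            value = lru.get(key);      // promotes the entry (access order)
            collectEvictions(evicted); // only records what has to go
        } finally {
            lruLock.unlock();
        }
        notifyRemovals(evicted);       // listener runs without holding lruLock
        return value;
    }

    void put(K key, V value) {
        List<Map.Entry<K, V>> evicted = new ArrayList<>();
        lruLock.lock();
        try {
            lru.put(key, value);
            collectEvictions(evicted);
        } finally {
            lruLock.unlock();
        }
        notifyRemovals(evicted);
    }

    // Must be called with lruLock held; removes over-capacity entries and records them.
    private void collectEvictions(List<Map.Entry<K, V>> evicted) {
        while (lru.size() > maxSize) {
            Iterator<Map.Entry<K, V>> it = lru.entrySet().iterator();
            Map.Entry<K, V> eldest = it.next();
            K k = eldest.getKey();
            V v = eldest.getValue();
            it.remove();
            evicted.add(Map.entry(k, v));
        }
    }

    // A slow or re-entrant listener can no longer block other cache operations.
    private void notifyRemovals(List<Map.Entry<K, V>> evicted) {
        for (Map.Entry<K, V> e : evicted) {
            removalListener.accept(e.getKey(), e.getValue());
        }
    }
}
```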
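Note on the RemoteFsTranslog/RemoteStoreSettings changes above: cluster.remote_store.translog.max_readers now accepts -1 as a sentinel that disables flushing based on the translog reader count, while any other value must still be at least 100. The sketch below mirrors that validation and gating logic in plain Java under those assumptions; ReaderFlushPolicy and its members are illustrative names, not OpenSearch classes, and the real code goes through Setting.intSetting and TranslogTransferManager.

```java
// Illustrative sketch of the "-1 disables the threshold" convention used in the diff above.
final class ReaderFlushPolicy {
    private static final int MIN_MAX_TRANSLOG_READERS = 100;
    private static final int DISABLED = -1;

    private volatile int maxTranslogReaders; // dynamic cluster setting in the real code

    ReaderFlushPolicy(int maxTranslogReaders) {
        this.maxTranslogReaders = validate(maxTranslogReaders);
    }

    // Mirrors the setting validator: -1 means "disabled", anything else must be >= 100.
    static int validate(int value) {
        if (value != DISABLED && value < MIN_MAX_TRANSLOG_READERS) {
            throw new IllegalArgumentException("Cannot set value lower than " + MIN_MAX_TRANSLOG_READERS);
        }
        return value;
    }

    // Mirrors shouldFlush(): never flush on reader count when the threshold is disabled.
    boolean shouldFlush(int currentReaderCount) {
        int max = maxTranslogReaders;
        if (max == DISABLED) {
            return false;
        }
        return currentReaderCount >= max;
    }

    // Dynamic update path, re-running the same validation.
    void update(int newValue) {
        this.maxTranslogReaders = validate(newValue);
    }
}
```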