Skip to content

Commit

Permalink
[Search Pipelines] Add default_search_pipeline index setting (opensearch-project#7470)
Browse files Browse the repository at this point in the history

* [Search Pipelines] Add default_search_pipeline index setting

Once users have defined and tested a search pipeline, they may want to
apply it by default to all queries hitting a given index.

Users should be able to bypass the default pipeline for the index by
explicitly specifying `search_pipeline=_none` in the URL parameter of
their search request.

Signed-off-by: Michael Froh <[email protected]>

* Rename default search pipeline setting and add getter/setter

Also change version check for ad hoc pipeline to 2.8

Signed-off-by: Michael Froh <[email protected]>

* Add tests for new IndexSettings methods

Signed-off-by: Michael Froh <[email protected]>

* Update test to reflect change to FeatureFlagSetter

Signed-off-by: Michael Froh <[email protected]>

---------

Signed-off-by: Michael Froh <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
msfroh authored and shiv0408 committed Apr 25, 2024
1 parent f20ce9c commit d610da2
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 15 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Extensions] Add IdentityPlugin into core to support Extension identities ([#7246](https://github.com/opensearch-project/OpenSearch/pull/7246))
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))
- [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253))
- [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
Expand Down Expand Up @@ -129,4 +130,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,24 @@ teardown:
}
- match: { hits.total.value: 1 }
- match: { hits.hits.0._id: "1" }

# Make it the default for the index
- do:
indices.put_settings:
index: test
body:
index.search.default_pipeline: my_pipeline

- do:
search:
index: test
body: { }
- match: { hits.total.value: 1 }

# Explicitly bypass the pipeline to match both docs
- do:
search:
search_pipeline: _none
index: test
body: { }
- match: { hits.total.value: 2 }
1 change: 1 addition & 0 deletions qa/mixed-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
numberOfNodes = 4

setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true'
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
"Test basic pipeline crud":
- skip:
version: " - 2.99.99"
reason: "to be introduced in future release / TODO: change if/when we backport to 2.x"
version: " - 2.7.0"
reason: "Added in 2.7.0"
- do:
search_pipeline.put:
id: "my_pipeline"
Expand Down Expand Up @@ -32,8 +32,8 @@
---
"Test Put Versioned Pipeline":
- skip:
version: " - 2.99.99"
reason: "to be introduced in future release / TODO: change if/when we backport to 2.x"
version: " - 2.7.0"
reason: "Added in 2.7.0"
- do:
search_pipeline.put:
id: "my_pipeline"
Expand Down Expand Up @@ -125,8 +125,8 @@
---
"Test Get All Pipelines":
- skip:
version: " - 2.99.99"
reason: "to be introduced in future release / TODO: change if/when we backport to 2.x"
version: " - 2.7.0"
reason: "Added in 2.7.0"
- do:
search_pipeline.put:
id: "first_pipeline"
Expand All @@ -152,8 +152,8 @@
---
"Test invalid config":
- skip:
version: " - 2.99.99"
reason: "to be introduced in future release / TODO: change if/when we backport to 2.x"
version: " - 2.7.0"
reason: "Added in 2.7.0"
- do:
catch: /parse_exception/
search_pipeline.put:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
IndexSettings.DEFAULT_SEARCH_PIPELINE,

// Settings for Searchable Snapshots
IndexSettings.SEARCHABLE_SNAPSHOT_REPOSITORY,
Expand Down
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -51,6 +52,7 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.Node;
import org.opensearch.search.pipeline.SearchPipelineService;

import java.util.Collections;
import java.util.List;
Expand All @@ -62,6 +64,7 @@
import java.util.function.UnaryOperator;

import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.common.util.FeatureFlags.SEARCH_PIPELINE;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;
Expand Down Expand Up @@ -578,6 +581,14 @@ public final class IndexSettings {
Property.InternalIndex
);

/**
 * Index-scoped, dynamically updatable setting naming the search pipeline applied by
 * default to search requests that hit this index. Defaults to the no-op pipeline id,
 * meaning no pipeline is applied unless one is explicitly configured.
 */
public static final Setting<String> DEFAULT_SEARCH_PIPELINE = new Setting<>(
    "index.search.default_pipeline",
    SearchPipelineService.NOOP_PIPELINE_ID,
    Function.identity(),
    Property.Dynamic,
    Property.IndexScope
);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -619,6 +630,8 @@ public final class IndexSettings {

private volatile long retentionLeaseMillis;

private volatile String defaultSearchPipeline;

/**
* The maximum age of a retention lease before it is considered expired.
*
Expand Down Expand Up @@ -823,6 +836,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));
defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(
Expand Down Expand Up @@ -896,6 +910,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_PIPELINE, this::setDefaultSearchPipeline);
}

private void setSearchSegmentOrderReversed(boolean reversed) {
Expand Down Expand Up @@ -1585,4 +1600,16 @@ private void setMergeOnFlushPolicy(String policy) {
public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
return Optional.ofNullable(mergeOnFlushPolicy);
}

/**
 * Returns the id of the default search pipeline configured for this index, or
 * {@code SearchPipelineService.NOOP_PIPELINE_ID} when none has been set.
 */
public String getDefaultSearchPipeline() {
    return defaultSearchPipeline;
}

/**
 * Updates the default search pipeline for this index.
 *
 * @param defaultSearchPipeline id of the pipeline to apply by default
 * @throws SettingsException if the search pipeline feature flag is disabled
 */
public void setDefaultSearchPipeline(String defaultSearchPipeline) {
    // Guard: this setting is only usable while the feature is behind its flag.
    if (FeatureFlags.isEnabled(SEARCH_PIPELINE) == false) {
        throw new SettingsException("Unsupported setting: " + DEFAULT_SEARCH_PIPELINE.getKey());
    }
    this.defaultSearchPipeline = defaultSearchPipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,21 @@
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.gateway.GatewayService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.node.ReportingService;
Expand Down Expand Up @@ -339,6 +342,7 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce
return new PipelinedRequest(pipeline, searchRequest);
}
if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) {
// Pipeline defined in search request (ad hoc pipeline).
if (searchRequest.pipeline() != null) {
throw new IllegalArgumentException(
"Both named and inline search pipeline were specified. Please only specify one or the other."
Expand All @@ -354,13 +358,28 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce
} catch (Exception e) {
throw new SearchPipelineProcessingException(e);
}
} else if (searchRequest.pipeline() != null) {
String pipelineId = searchRequest.pipeline();
PipelineHolder pipelineHolder = pipelines.get(pipelineId);
if (pipelineHolder == null) {
throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined");
} else {
String pipelineId = NOOP_PIPELINE_ID;
if (searchRequest.pipeline() != null) {
// Named pipeline specified for the request
pipelineId = searchRequest.pipeline();
} else if (searchRequest.indices() != null && searchRequest.indices().length == 1) {
// Check for index default pipeline
IndexMetadata indexMetadata = state.metadata().index(searchRequest.indices()[0]);
if (indexMetadata != null) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) {
pipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings);
}
}
}
if (NOOP_PIPELINE_ID.equals(pipelineId) == false) {
PipelineHolder pipelineHolder = pipelines.get(pipelineId);
if (pipelineHolder == null) {
throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined");
}
pipeline = pipelineHolder.pipeline;
}
pipeline = pipelineHolder.pipeline;
}
SearchRequest transformedRequest = pipeline.transformRequest(searchRequest);
return new PipelinedRequest(pipeline, transformedRequest);
Expand Down
38 changes: 38 additions & 0 deletions server/src/test/java/org/opensearch/index/IndexSettingsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
Expand Down Expand Up @@ -1083,4 +1084,41 @@ public void testExtendedCompatibilityVersionWithoutFeatureFlag() {
assertTrue(settings.isRemoteSnapshot());
assertEquals(Version.CURRENT.minimumIndexCompatibilityVersion(), settings.getExtendedCompatibilitySnapshotVersion());
}

@SuppressForbidden(reason = "sets the SEARCH_PIPELINE feature flag")
public void testDefaultSearchPipeline() throws Exception {
    // Enable the feature flag so the setting is accepted.
    FeatureFlagSetter.set(FeatureFlags.SEARCH_PIPELINE);
    // With no explicit setting, the getter reports the no-op pipeline id.
    IndexMetadata metadata = newIndexMeta(
        "index",
        Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build()
    );
    IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY);
    assertEquals(SearchPipelineService.NOOP_PIPELINE_ID, settings.getDefaultSearchPipeline());
    // Dynamically updating the index metadata with the setting must be reflected
    // by the getter (exercises the registered settings-update consumer).
    metadata = newIndexMeta(
        "index",
        Settings.builder()
            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
            .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "foo")
            .build()
    );
    settings.updateIndexMetadata(metadata);
    assertEquals("foo", settings.getDefaultSearchPipeline());
}

public void testDefaultSearchPipelineWithoutFeatureFlag() {
    // Feature flag not set: reading the default still yields the no-op pipeline id...
    IndexMetadata metadata = newIndexMeta(
        "index",
        Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build()
    );
    IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY);
    assertEquals(SearchPipelineService.NOOP_PIPELINE_ID, settings.getDefaultSearchPipeline());
    // ...but attempting to set it via a metadata update must be rejected.
    IndexMetadata updatedMetadata = newIndexMeta(
        "index",
        Settings.builder()
            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
            .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "foo")
            .build()
    );
    assertThrows(SettingsException.class, () -> settings.updateIndexMetadata(updatedMetadata));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -32,6 +33,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.SearchHit;
Expand Down Expand Up @@ -134,6 +136,47 @@ public void testResolveSearchPipelineDoesNotExist() {
assertTrue(e.getMessage(), e.getMessage().contains(" not defined"));
}

public void testResolveIndexDefaultPipeline() throws Exception {
    SearchPipelineService service = createWithProcessors();

    // Register pipeline "p1", which doubles the request size via scale_request_size.
    SearchPipelineMetadata metadata = new SearchPipelineMetadata(
        Map.of(
            "p1",
            new PipelineConfiguration(
                "p1",
                new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"),
                XContentType.JSON
            )
        )
    );
    // Create index "my_index" with p1 configured as its default search pipeline.
    Settings defaultPipelineSetting = Settings.builder()
        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
        .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
        .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1")
        .build();
    IndexMetadata indexMetadata = new IndexMetadata.Builder("my_index").settings(defaultPipelineSetting).build();
    ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
    ClusterState previousState = clusterState;
    clusterState = ClusterState.builder(clusterState)
        .metadata(Metadata.builder().put(indexMetadata, false).putCustom(SearchPipelineMetadata.TYPE, metadata))
        .build();

    // Apply the cluster state so the service picks up both the pipeline and the index setting.
    ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState);
    service.applyClusterState(cce);

    // A request naming no pipeline resolves to the index default; size 5 is scaled to 10.
    SearchRequest searchRequest = new SearchRequest("my_index").source(SearchSourceBuilder.searchSource().size(5));
    PipelinedRequest pipelinedRequest = service.resolvePipeline(searchRequest);
    assertEquals("p1", pipelinedRequest.getPipeline().getId());
    assertEquals(10, pipelinedRequest.transformedRequest().source().size());

    // Explicitly requesting "_none" bypasses the index default; size stays 5.
    searchRequest.pipeline("_none");
    pipelinedRequest = service.resolvePipeline(searchRequest);
    assertEquals("_none", pipelinedRequest.getPipeline().getId());
    assertEquals(5, pipelinedRequest.transformedRequest().source().size());
}

private static abstract class FakeProcessor implements Processor {
private final String type;
private final String tag;
Expand Down

0 comments on commit d610da2

Please sign in to comment.