Skip to content

Commit

Permalink
Integrates KNN plugin with ConcurrentSearchRequestDecider interface (o…
Browse files Browse the repository at this point in the history
…pensearch-project#2111)

This allows knn queries to enable concurrency when index.search.concurrent_segment_search.mode or
search.concurrent_segment_search.mode in auto mode. Without this the
default behavior of auto mode is non-concurrent search

Signed-off-by: Tejas Shah <[email protected]>
  • Loading branch information
shatejas authored Sep 18, 2024
1 parent ccc59b1 commit 0421cdc
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
### Enhancements
* Adds concurrent segment search support for mode auto [#2111](https://github.com/opensearch-project/k-NN/pull/2111)
### Bug Fixes
* Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946)
### Infrastructure
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/opensearch/knn/plugin/KNNPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.knn.index.KNNCircuitBreaker;
import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider;
import org.opensearch.knn.index.util.KNNClusterUtil;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.index.KNNSettings;
Expand Down Expand Up @@ -95,6 +96,7 @@
import org.opensearch.script.ScriptContext;
import org.opensearch.script.ScriptEngine;
import org.opensearch.script.ScriptService;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -349,4 +351,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return ImmutableList.of(new SystemIndexDescriptor(MODEL_INDEX_NAME, "Index for storing models used for k-NN indices"));
}

@Override
public Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.of(new KNNConcurrentSearchRequestDecider.Factory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import lombok.EqualsAndHashCode;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;

import java.util.Optional;

/**
* Decides if the knn query uses concurrent segment search
* As of 2.17, this is only used when
* - "index.search.concurrent_segment_search.mode": "auto" or
* - "search.concurrent_segment_search.mode": "auto"
*
* Note: the class is not thread-safe and a new instance needs to be created for each request
*/
@EqualsAndHashCode(callSuper = true)
public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider {

private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.NO_OP,
"Default decision"
);
private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.YES,
"Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index"
);

private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION;

@Override
public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) {
if (queryBuilder instanceof KNNQueryBuilder && indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
knnDecision = YES;
} else {
knnDecision = DEFAULT_KNN_DECISION;
}
}

@Override
public ConcurrentSearchDecision getConcurrentSearchDecision() {
return knnDecision;
}

/**
* Returns {@link KNNConcurrentSearchRequestDecider} when index.knn is true
*/
public static class Factory implements ConcurrentSearchRequestDecider.Factory {
public Optional<ConcurrentSearchRequestDecider> create(final IndexSettings indexSettings) {
if (indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
return Optional.of(new KNNConcurrentSearchRequestDecider());
}
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.integ.search;

import com.google.common.primitives.Floats;
import lombok.SneakyThrows;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.junit.BeforeClass;
import org.opensearch.client.Response;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.KNNJsonIndexMappingsBuilder;
import org.opensearch.knn.KNNRestTestCase;
import org.opensearch.knn.KNNResult;
import org.opensearch.knn.TestUtils;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.plugin.script.KNNScoringUtil;

import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW;

/**
* Note that this is simply a sanity test to make sure that concurrent search code path is hit E2E and scores are intact
* There is no latency verification as it can be better encapsulated in nightly runs.
*/
public class ConcurrentSegmentSearchIT extends KNNRestTestCase {

static TestUtils.TestData testData;

@BeforeClass
public static void setUpClass() throws IOException {
if (ConcurrentSegmentSearchIT.class.getClassLoader() == null) {
throw new IllegalStateException("ClassLoader of ConcurrentSegmentSearchIT Class is null");
}
URL testIndexVectors = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_vectors_1000x128.json");
URL testQueries = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_queries_100x128.csv");
assert testIndexVectors != null;
assert testQueries != null;
testData = new TestUtils.TestData(testIndexVectors.getPath(), testQueries.getPath());
}

@SneakyThrows
public void testConcurrentSegmentSearch_thenSucceed() {
String indexName = "test-concurrent-segment";
String fieldName = "test-field-1";
int dimension = testData.indexData.vectors[0].length;
final XContentBuilder indexBuilder = createFaissHnswIndexMapping(fieldName, dimension);
Map<String, Object> mappingMap = xContentBuilderToMap(indexBuilder);
String mapping = indexBuilder.toString();
createKnnIndex(indexName, mapping);
assertEquals(new TreeMap<>(mappingMap), new TreeMap<>(getIndexMappingAsMap(indexName)));

// Index the test data
for (int i = 0; i < testData.indexData.docs.length; i++) {
addKnnDoc(
indexName,
Integer.toString(testData.indexData.docs[i]),
fieldName,
Floats.asList(testData.indexData.vectors[i]).toArray()
);
}
refreshAllNonSystemIndices();
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto"));

// Test search queries
int k = 10;
verifySearch(indexName, fieldName, k);

updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "all"));
verifySearch(indexName, fieldName, k);

deleteKNNIndex(indexName);
}

/*
{
"properties": {
"<fieldName>": {
"type": "knn_vector",
"dimension": <dimension>,
"method": {
"name": "hnsw",
"space_type": "l2",
"engine": "faiss",
"parameters": {
"m": 16,
"ef_construction": 128,
"ef_search": 128
}
}
}
}
*/
@SneakyThrows
private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) {
return KNNJsonIndexMappingsBuilder.builder()
.fieldName(fieldName)
.dimension(dimension)
.method(
KNNJsonIndexMappingsBuilder.Method.builder()
.engine(KNNEngine.FAISS.getName())
.methodName(METHOD_HNSW)
.spaceType(SpaceType.L2.getValue())
.parameters(KNNJsonIndexMappingsBuilder.Method.Parameters.builder().efConstruction(128).efSearch(128).m(16).build())
.build()
)
.build()
.getIndexMappingBuilder();
}

@SneakyThrows
private void verifySearch(String indexName, String fieldName, int k) {
for (int i = 0; i < testData.queries.length; i++) {
final KNNQueryBuilder queryBuilder = KNNQueryBuilder.builder().fieldName(fieldName).vector(testData.queries[i]).k(k).build();
Response response = searchKNNIndex(indexName, queryBuilder, k);
String responseBody = EntityUtils.toString(response.getEntity());
List<KNNResult> knnResults = parseSearchResponse(responseBody, fieldName);
assertEquals(k, knnResults.size());

List<Float> actualScores = parseSearchResponseScore(responseBody, fieldName);
for (int j = 0; j < k; j++) {
float[] primitiveArray = knnResults.get(j).getVector();
assertEquals(
KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(testData.queries[i], primitiveArray), SpaceType.L2),
actualScores.get(j),
0.0001
);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.knn.KNNTestCase;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class KNNConcurrentSearchRequestDeciderTests extends KNNTestCase {

public void testDecider_thenSucceed() {
ConcurrentSearchDecision noop = new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "Default decision");

KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider();
assertDecision(noop, decider.getConcurrentSearchDecision());
IndexSettings indexSettingsMock = mock(IndexSettings.class);
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);

// Non KNNQueryBuilder
decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
assertDecision(noop, decider.getConcurrentSearchDecision());
decider.evaluateForQuery(
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
indexSettingsMock
);
assertDecision(noop, decider.getConcurrentSearchDecision());

when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
decider.evaluateForQuery(
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
indexSettingsMock
);
ConcurrentSearchDecision yes = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.YES,
"Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index"
);
assertDecision(yes, decider.getConcurrentSearchDecision());

decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
assertDecision(noop, decider.getConcurrentSearchDecision());
}

public void testDeciderFactory_thenSucceed() {
KNNConcurrentSearchRequestDecider.Factory factory = new KNNConcurrentSearchRequestDecider.Factory();
IndexSettings indexSettingsMock = mock(IndexSettings.class);
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
assertNotSame(factory.create(indexSettingsMock).get(), factory.create(indexSettingsMock).get());
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);
assertTrue(factory.create(indexSettingsMock).isEmpty());
}

private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) {
assertEquals(expected.getDecisionReason(), actual.getDecisionReason());
assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.NonNull;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.common.KNNConstants;

import java.io.IOException;

Expand All @@ -26,7 +27,7 @@ public class KNNJsonIndexMappingsBuilder {
private String vectorDataType;
private Method method;

public String getIndexMapping() throws IOException {
public XContentBuilder getIndexMappingBuilder() throws IOException {
if (nestedFieldName != null) {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -40,7 +41,7 @@ public String getIndexMapping() throws IOException {
addVectorDataType(xContentBuilder);
addMethod(xContentBuilder);
xContentBuilder.endObject().endObject().endObject().endObject().endObject();
return xContentBuilder.toString();
return xContentBuilder;
} else {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -51,10 +52,14 @@ public String getIndexMapping() throws IOException {
addVectorDataType(xContentBuilder);
addMethod(xContentBuilder);
xContentBuilder.endObject().endObject().endObject();
return xContentBuilder.toString();
return xContentBuilder;
}
}

public String getIndexMapping() throws IOException {
return getIndexMappingBuilder().toString();
}

private void addVectorDataType(final XContentBuilder xContentBuilder) throws IOException {
if (vectorDataType == null) {
return;
Expand Down Expand Up @@ -104,6 +109,7 @@ public static class Parameters {
private Encoder encoder;
private Integer efConstruction;
private Integer efSearch;
private Integer m;

private void addTo(final XContentBuilder xContentBuilder) throws IOException {
xContentBuilder.startObject("parameters");
Expand All @@ -113,6 +119,9 @@ private void addTo(final XContentBuilder xContentBuilder) throws IOException {
if (efSearch != null) {
xContentBuilder.field("ef_search", efSearch);
}
if (m != null) {
xContentBuilder.field(KNNConstants.METHOD_PARAMETER_M, m);
}
addEncoder(xContentBuilder);
xContentBuilder.endObject();
}
Expand Down

0 comments on commit 0421cdc

Please sign in to comment.