Skip to content

Commit

Permalink
Fixed LeafReaders casting errors to SegmentReaders when segment repli…
Browse files Browse the repository at this point in the history
…cation is enabled during search

Signed-off-by: Navneet Verma <[email protected]>
  • Loading branch information
navneet1v committed Jul 16, 2024
1 parent 48478b4 commit 1886dfa
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Enhancements
### Bug Fixes
* Fixing the arithmetic to find the number of vectors to stream from java to jni layer.[#1804](https://github.com/opensearch-project/k-NN/pull/1804)
* Fixed LeafReaders casting errors to SegmentReaders when segment replication is enabled during search.[#1808](https://github.com/opensearch-project/k-NN/pull/1808)
* Release memory properly for an array type [#1820](https://github.com/opensearch-project/k-NN/pull/1820)
* FIX Same Suffix Cause Recall Drop to zero [#1802](https://github.com/opensearch-project/k-NN/pull/1802)
### Infrastructure
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/knn/index/KNNIndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
Expand Down Expand Up @@ -160,7 +160,7 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine
List<EngineFileContext> engineFiles = new ArrayList<>();

for (LeafReaderContext leafReaderContext : indexReader.leaves()) {
SegmentReader reader = (SegmentReader) FilterLeafReader.unwrap(leafReaderContext.reader());
SegmentReader reader = Lucene.segmentReader(leafReaderContext.reader());
Path shardPath = ((FSDirectory) FilterDirectory.unwrap(reader.directory())).getDirectory();
String fileExtension = reader.getSegmentInfo().info.getUseCompoundFile()
? knnEngine.getCompoundExtension()
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/opensearch/knn/index/query/KNNWeight.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.DocIdSetIterator;
Expand All @@ -29,6 +28,7 @@
import org.apache.lucene.util.DocIdSetBuilder;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.SpaceType;
Expand Down Expand Up @@ -203,7 +203,7 @@ private int[] bitSetToIntArray(final BitSet bitSet) {

private Map<Integer, Float> doANNSearch(final LeafReaderContext context, final BitSet filterIdsBitSet, final int cardinality)
throws IOException {
SegmentReader reader = (SegmentReader) FilterLeafReader.unwrap(context.reader());
final SegmentReader reader = Lucene.segmentReader(context.reader());
String directory = ((FSDirectory) FilterDirectory.unwrap(reader.directory())).getDirectory().toString();

FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(knnQuery.getField());
Expand Down Expand Up @@ -394,7 +394,7 @@ private Map<Integer, Float> doExactSearch(final LeafReaderContext leafReaderCont
}

private KNNIterator getFilteredKNNIterator(final LeafReaderContext leafReaderContext, final BitSet filterIdsBitSet) throws IOException {
final SegmentReader reader = (SegmentReader) FilterLeafReader.unwrap(leafReaderContext.reader());
final SegmentReader reader = Lucene.segmentReader(leafReaderContext.reader());
final FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(knnQuery.getField());
final BinaryDocValues values = DocValues.getBinary(leafReaderContext.reader(), fieldInfo.getName());
final SpaceType spaceType = getSpaceType(fieldInfo);
Expand Down
94 changes: 94 additions & 0 deletions src/test/java/org/opensearch/knn/index/SegmentReplicationIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.knn.index;

import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.junit.Assert;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.KNNRestTestCase;
import org.opensearch.knn.KNNResult;

import java.util.List;

/**
* This IT class contains will contain special cases of IT for segment replication behavior.
* All the index created in this test will have replication type SEGMENT, number of replicas: 1 and should be run on
* at-least 2 node configuration.
*/
@Log4j2
public class SegmentReplicationIT extends KNNRestTestCase {
private static final String INDEX_NAME = "segment-replicated-knn-index";

@SneakyThrows
public void testSearchOnReplicas_whenIndexHasDeletedDocs_thenSuccess() {
createKnnIndex(INDEX_NAME, getKNNSegmentReplicatedIndexSettings(), createKNNIndexMethodFieldMapping(FIELD_NAME, 2));

Float[] vector = { 1.3f, 2.2f };
int docsInIndex = 10;

for (int i = 0; i < docsInIndex; i++) {
addKnnDoc(INDEX_NAME, Integer.toString(i), FIELD_NAME, vector);
}
refreshIndex(INDEX_NAME);
int deleteDocs = 5;
for (int i = 0; i < deleteDocs; i++) {
deleteKnnDoc(INDEX_NAME, Integer.toString(i));
}
refreshIndex(INDEX_NAME);
// sleep for 5sec to ensure data is replicated. I don't have a better way here to know if segments has been
// replicated.
Thread.sleep(5000);
// validate warmup is successful or not.
doKnnWarmup(List.of(INDEX_NAME));

XContentBuilder queryBuilder = XContentFactory.jsonBuilder().startObject().startObject("query");
queryBuilder.startObject("knn");
queryBuilder.startObject(FIELD_NAME);
queryBuilder.field("vector", vector);
queryBuilder.field("k", docsInIndex);
queryBuilder.endObject().endObject().endObject().endObject();

// validate primaries are working
Response searchResponse = performSearch(INDEX_NAME, queryBuilder.toString(), "preference=_primary");
String responseBody = EntityUtils.toString(searchResponse.getEntity());
List<KNNResult> knnResults = parseSearchResponse(responseBody, FIELD_NAME);
assertEquals(docsInIndex - deleteDocs, knnResults.size());

if (ensureMinDataNodesCountForTestingQueriesOnReplica()) {
// validate replicas are working
searchResponse = performSearch(INDEX_NAME, queryBuilder.toString(), "preference=_replica");
responseBody = EntityUtils.toString(searchResponse.getEntity());
knnResults = parseSearchResponse(responseBody, FIELD_NAME);
assertEquals(docsInIndex - deleteDocs, knnResults.size());
}
}

private boolean ensureMinDataNodesCountForTestingQueriesOnReplica() {
int dataNodeCount = getDataNodeCount();
if (dataNodeCount <= 1) {
log.warn(
"Not running segment replication tests named: "
+ "testSearchOnReplicas_whenIndexHasDeletedDocs_thenSuccess, as data nodes count is not atleast 2. "
+ "Actual datanode count : {}",
dataNodeCount
);
Assert.assertTrue(true);
// making the test successful because we don't want to break already running tests.
return false;
}
return true;
}
}
35 changes: 34 additions & 1 deletion src/testFixtures/java/org/opensearch/knn/KNNRestTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ protected Response searchExists(String index, ExistsQueryBuilder existsQueryBuil
}

protected Response performSearch(final String indexName, final String query) throws IOException {
Request request = new Request("POST", "/" + indexName + "/_search");
return performSearch(indexName, query, "");
}

protected Response performSearch(final String indexName, final String query, final String urlParameters) throws IOException {
Request request = new Request("POST", "/" + indexName + "/_search?" + urlParameters);
request.setJsonEntity(query);

Response response = client().performRequest(request);
Expand Down Expand Up @@ -667,6 +671,35 @@ protected Settings getKNNDefaultIndexSettings() {
return Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).put("index.knn", true).build();
}

protected Settings getKNNSegmentReplicatedIndexSettings() {
return Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 1)
.put("index.knn", true)
.put("index.replication.type", "SEGMENT")
.build();
}

@SneakyThrows
protected int getDataNodeCount() {
Request request = new Request("GET", "_nodes/stats?filter_path=nodes.*.roles");

Response response = client().performRequest(request);
assertEquals(RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
String responseBody = EntityUtils.toString(response.getEntity());

Map<String, Object> responseMap = createParser(MediaTypeRegistry.getDefaultMediaType().xContent(), responseBody).map();
Map<String, Object> nodesInfo = (Map<String, Object>) responseMap.get("nodes");
int dataNodeCount = 0;
for (String key : nodesInfo.keySet()) {
Map<String, List<String>> nodeRoles = (Map<String, List<String>>) nodesInfo.get(key);
if (nodeRoles.get("roles").contains("data")) {
dataNodeCount++;
}
}
return dataNodeCount;
}

/**
* Get Stats from KNN Plugin
*/
Expand Down

0 comments on commit 1886dfa

Please sign in to comment.