Skip to content

Commit

Permalink
Flush data after index
Browse files Browse the repository at this point in the history
Signed-off-by: Vijayan Balasubramanian <[email protected]>
  • Loading branch information
VijayanB committed Oct 23, 2024
1 parent 481da4f commit e2dcdc6
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
package org.opensearch.knn.bwc;

import org.junit.Assert;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.engine.KNNEngine;

import java.util.List;
import java.util.Map;

import static org.opensearch.knn.TestUtils.KNN_ALGO_PARAM_EF_CONSTRUCTION_MIN_VALUE;
Expand Down Expand Up @@ -52,6 +55,8 @@ public void testKNNIndexDefaultLegacyFieldMapping() throws Exception {
createKnnIndex(testIndex, getKNNDefaultIndexSettings(), createKnnIndexMapping(TEST_FIELD, DIMENSIONS));
addKNNDocs(testIndex, TEST_FIELD, DIMENSIONS, DOC_ID, NUM_DOCS);
} else {
// update index setting to allow build graph always since we test graph count that are loaded into memory
updateIndexSettings(testIndex, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0));
validateKNNIndexingOnUpgrade(NUM_DOCS);
}
}
Expand Down Expand Up @@ -275,13 +280,14 @@ public void testNoParametersOnUpgrade() throws Exception {

// KNN indexing tests when the cluster is upgraded to latest version
public void validateKNNIndexingOnUpgrade(int numOfDocs) throws Exception {
updateIndexSettings(testIndex, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0));
forceMergeKnnIndex(testIndex);
QUERY_COUNT = numOfDocs;
validateKNNSearch(testIndex, TEST_FIELD, DIMENSIONS, QUERY_COUNT, K);
cleanUpCache();
clearCache(List.of(testIndex));
DOC_ID = numOfDocs;
addKNNDocs(testIndex, TEST_FIELD, DIMENSIONS, DOC_ID, NUM_DOCS);
QUERY_COUNT = QUERY_COUNT + NUM_DOCS;
validateKNNSearch(testIndex, TEST_FIELD, DIMENSIONS, QUERY_COUNT, K);
forceMergeKnnIndex(testIndex);
validateKNNSearch(testIndex, TEST_FIELD, DIMENSIONS, QUERY_COUNT, K);
deleteKNNIndex(testIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
package org.opensearch.knn.bwc;

import org.opensearch.common.settings.Settings;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.SpaceType;

import java.util.Collections;
import java.util.Locale;

import static org.opensearch.knn.TestUtils.KNN_ALGO_PARAM_M_MIN_VALUE;
import static org.opensearch.knn.TestUtils.KNN_ALGO_PARAM_EF_CONSTRUCTION_MIN_VALUE;
Expand All @@ -35,6 +35,8 @@ public void testKNNWarmupDefaultLegacyFieldMapping() throws Exception {
createKnnIndex(testIndex, getKNNDefaultIndexSettings(), createKnnIndexMapping(TEST_FIELD, DIMENSIONS));
addKNNDocs(testIndex, TEST_FIELD, DIMENSIONS, DOC_ID, NUM_DOCS);
} else {
// update index setting to allow build graph always since we test graph count that are loaded into memory
updateIndexSettings(testIndex, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0));
validateKNNWarmupOnUpgrade();
}
}
Expand Down Expand Up @@ -66,6 +68,8 @@ public void testKNNWarmupDefaultMethodFieldMapping() throws Exception {
createKnnIndex(testIndex, getKNNDefaultIndexSettings(), createKNNIndexMethodFieldMapping(TEST_FIELD, DIMENSIONS));
addKNNDocs(testIndex, TEST_FIELD, DIMENSIONS, DOC_ID, NUM_DOCS);
} else {
// update index setting to allow build graph always since we test graph count that are loaded into memory
updateIndexSettings(testIndex, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0));
validateKNNWarmupOnUpgrade();
}
}
Expand All @@ -86,22 +90,26 @@ public void testKNNWarmupCustomMethodFieldMapping() throws Exception {
}

public void validateKNNWarmupOnUpgrade() throws Exception {
// update index setting to allow build graph always since we test graph count that are loaded into memory
updateIndexSettings(testIndex, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0));
int graphCount = getTotalGraphsInCache();
knnWarmup(Collections.singletonList(testIndex));
int totalGraph = getTotalGraphsInCache();
assertTrue(String.format(Locale.ROOT, "[%d] is not greater than [%d]", totalGraph, graphCount), totalGraph > graphCount);
assertTrue(totalGraph > graphCount);

QUERY_COUNT = NUM_DOCS;
validateKNNSearch(testIndex, TEST_FIELD, DIMENSIONS, QUERY_COUNT, K);

DOC_ID = NUM_DOCS;
addKNNDocs(testIndex, TEST_FIELD, DIMENSIONS, DOC_ID, NUM_DOCS);
forceMergeKnnIndex(testIndex);

int updatedGraphCount = getTotalGraphsInCache();
knnWarmup(Collections.singletonList(testIndex));
assertTrue(getTotalGraphsInCache() > updatedGraphCount);

QUERY_COUNT = QUERY_COUNT + NUM_DOCS;

validateKNNSearch(testIndex, TEST_FIELD, DIMENSIONS, QUERY_COUNT, K);
deleteKNNIndex(testIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public void testClearCache() throws Exception {
createKnnIndex(testIndex, getKNNDefaultIndexSettings(), createKnnIndexMapping(TEST_FIELD, DIMENSIONS));
int docIdOld = 0;
addKNNDocs(testIndex, TEST_FIELD, DIMENSIONS, docIdOld, NUM_DOCS);
int graphCount = getTotalGraphsInCache();
knnWarmup(Collections.singletonList(testIndex));
assertTrue(getTotalGraphsInCache() > graphCount);
break;
case UPGRADED:
queryCnt = NUM_DOCS;
Expand All @@ -42,14 +45,8 @@ public void testClearCache() throws Exception {

// validation steps for Clear Cache API after upgrading all nodes from old version to new version
public void validateClearCacheOnUpgrade(int queryCount) throws Exception {
int graphCount = getTotalGraphsInCache();
knnWarmup(Collections.singletonList(testIndex));
assertTrue(getTotalGraphsInCache() > graphCount);
validateKNNSearch(testIndex, TEST_FIELD, DIMENSIONS, queryCount, K);

clearCache(Collections.singletonList(testIndex));
assertEquals(0, getTotalGraphsInCache());
validateKNNSearch(testIndex, TEST_FIELD, DIMENSIONS, queryCount, K);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.knn.bwc;

import org.opensearch.common.settings.Settings;
import org.opensearch.knn.index.KNNSettings;

import java.util.Collections;

import static org.opensearch.knn.TestUtils.NODES_BWC_CLUSTER;
Expand Down Expand Up @@ -33,6 +36,7 @@ public void testKNNWarmup() throws Exception {
docIdMixed = 2 * NUM_DOCS;
totalDocsCountMixed = 3 * NUM_DOCS;
}
updateIndexSettings(testIndex, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0));
validateKNNWarmupOnUpgrade(totalDocsCountMixed, docIdMixed);
break;
case UPGRADED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public NativeEngines990KnnVectorsFormat() {
this(new Lucene99FlatVectorsFormat(new DefaultFlatVectorScorer()));
}

public NativeEngines990KnnVectorsFormat(int approximateThreshold) {
this(new Lucene99FlatVectorsFormat(new DefaultFlatVectorScorer()), approximateThreshold);
}

public NativeEngines990KnnVectorsFormat(final FlatVectorsFormat flatVectorsFormat) {
this(flatVectorsFormat, KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD_DEFAULT_VALUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
* able to pick this class if its in test folder. Don't use this codec outside of testing
*/
public class UnitTestCodec extends FilterCodec {
private static final Integer BUILD_GRAPH_ALWAYS = 0;

public UnitTestCodec() {
super("UnitTestCodec", KNNCodecVersion.current().getDefaultKnnCodecSupplier().get());
}
Expand All @@ -30,7 +32,7 @@ public KnnVectorsFormat knnVectorsFormat() {
return new PerFieldKnnVectorsFormat() {
@Override
public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
return new NativeEngines990KnnVectorsFormat();
return new NativeEngines990KnnVectorsFormat(BUILD_GRAPH_ALWAYS);
}
};
}
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.knn;

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.cluster.ClusterName;
Expand All @@ -14,6 +15,7 @@
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.memory.NativeMemoryLoadStrategy;
Expand Down Expand Up @@ -104,6 +106,14 @@ protected void createKnnIndexMapping(String indexName, String fieldName, Integer
OpenSearchAssertions.assertAcked(client().admin().indices().putMapping(request).actionGet());
}

/**
* Create simple k-NN mapping with engine
*/
protected void updateIndexSetting(String indexName, Settings setting) {
UpdateSettingsRequest request = new UpdateSettingsRequest(setting, indexName);
OpenSearchAssertions.assertAcked(client().admin().indices().updateSettings(request).actionGet());
}

/**
* Create simple k-NN mapping which can be nested.
* e.g. fieldPath = "a.b.c" will create mapping for "c" as knn_vector
Expand Down Expand Up @@ -140,6 +150,18 @@ protected Settings getKNNDefaultIndexSettings() {
return Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).put("index.knn", true).build();
}

/**
* Get default k-NN settings for test cases with build graph always
*/
protected Settings getKNNDefaultIndexSettingsBuildsGraphAlways() {
return Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.put("index.knn", true)
.put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0)
.build();
}

/**
* Add a k-NN doc to an index
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import static org.hamcrest.Matchers.containsString;

public class KNNESSettingsTestIT extends KNNRestTestCase {

public static final int ALWAYS_BUILD_GRAPH = 0;

/**
* KNN Index writes should be blocked when the plugin disabled
* @throws Exception Exception from test
Expand Down Expand Up @@ -72,7 +75,7 @@ public void testQueriesPluginDisabled() throws Exception {
}

public void testItemRemovedFromCache_expiration() throws Exception {
createKnnIndex(INDEX_NAME, createKnnIndexMapping(FIELD_NAME, 2));
createKnnIndex(INDEX_NAME, buildKNNIndexSettings(ALWAYS_BUILD_GRAPH), createKnnIndexMapping(FIELD_NAME, 2));
updateClusterSettings(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED, true);
updateClusterSettings(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES, "1m");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;
import org.mockito.Mockito;
import org.opensearch.common.settings.Settings;
import org.opensearch.knn.KNNSingleNodeTestCase;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.Engine;
Expand Down Expand Up @@ -71,6 +72,7 @@ public void testWarmup_emptyIndex() throws IOException {
public void testWarmup_shardPresentInCache() throws InterruptedException, ExecutionException, IOException {
IndexService indexService = createKNNIndex(testIndexName);
createKnnIndexMapping(testIndexName, testFieldName, dimensions);
updateIndexSetting(testIndexName, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0).build());
addKnnDoc(testIndexName, "1", testFieldName, new Float[] { 2.5F, 3.5F });

searchKNNIndex(testIndexName, testFieldName, new float[] { 1.0f, 2.0f }, 1);
Expand All @@ -85,6 +87,7 @@ public void testWarmup_shardPresentInCache() throws InterruptedException, Execut
public void testWarmup_shardNotPresentInCache() throws InterruptedException, ExecutionException, IOException {
IndexService indexService = createKNNIndex(testIndexName);
createKnnIndexMapping(testIndexName, testFieldName, dimensions);
updateIndexSetting(testIndexName, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0).build());
IndexShard indexShard;
KNNIndexShard knnIndexShard;

Expand All @@ -106,6 +109,7 @@ public void testWarmup_shardNotPresentInCache() throws InterruptedException, Exe
public void testGetAllEngineFileContexts() throws IOException, ExecutionException, InterruptedException {
IndexService indexService = createKNNIndex(testIndexName);
createKnnIndexMapping(testIndexName, testFieldName, dimensions);
updateIndexSetting(testIndexName, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0).build());

IndexShard indexShard = indexService.iterator().next();
KNNIndexShard knnIndexShard = new KNNIndexShard(indexShard);
Expand Down Expand Up @@ -204,6 +208,7 @@ public void testClearCache_emptyIndex() {
public void testClearCache_shardPresentInCache() {
IndexService indexService = createKNNIndex(testIndexName);
createKnnIndexMapping(testIndexName, testFieldName, dimensions);
updateIndexSetting(testIndexName, Settings.builder().put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, 0).build());
addKnnDoc(testIndexName, String.valueOf(randomInt()), testFieldName, new Float[] { randomFloat(), randomFloat() });

IndexShard indexShard = indexService.iterator().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void testShardOperation() {
KNNWarmupTransportAction knnWarmupTransportAction = node().injector().getInstance(KNNWarmupTransportAction.class);
assertEquals(0, NativeMemoryCacheManager.getInstance().getIndicesCacheStats().size());

IndexService indexService = createKNNIndex(testIndex);
IndexService indexService = createIndex(testIndex, getKNNDefaultIndexSettingsBuildsGraphAlways());
createKnnIndexMapping(testIndex, TEST_FIELD, DIMENSIONS);
addKnnDoc(testIndex, String.valueOf(randomInt()), TEST_FIELD, new Float[] { randomFloat(), randomFloat() });
ShardRouting shardRouting = indexService.iterator().next().routingEntry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testShardOperation() throws IOException, ExecutionException, Interru
KNNWarmupTransportAction knnWarmupTransportAction = node().injector().getInstance(KNNWarmupTransportAction.class);
assertEquals(0, NativeMemoryCacheManager.getInstance().getIndicesCacheStats().size());

indexService = createKNNIndex(testIndexName);
indexService = createIndex(testIndexName, getKNNDefaultIndexSettingsBuildsGraphAlways());
createKnnIndexMapping(testIndexName, testFieldName, dimensions);
shardRouting = indexService.iterator().next().routingEntry();

Expand Down
9 changes: 9 additions & 0 deletions src/testFixtures/java/org/opensearch/knn/KNNRestTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,7 @@ public void addKNNDocs(String testIndex, String testField, int dimension, int fi
Arrays.fill(indexVector, (float) i);
addKnnDoc(testIndex, Integer.toString(i), testField, indexVector);
}
flushIndex(testIndex);
}

public void addKNNByteDocs(String testIndex, String testField, int dimension, int firstDocID, int numDocs) throws IOException {
Expand All @@ -1317,6 +1318,7 @@ public void addKNNByteDocs(String testIndex, String testField, int dimension, in
Arrays.fill(indexVector, (byte) i);
addKnnDoc(testIndex, Integer.toString(i), testField, indexVector);
}
flushIndex(testIndex);
}

public void validateKNNSearch(String testIndex, String testField, int dimension, int numDocs, int k) throws Exception {
Expand Down Expand Up @@ -1792,6 +1794,13 @@ protected void refreshIndex(final String index) throws IOException {
assertEquals(request.getEndpoint() + ": failed", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
}

protected void flushIndex(final String index) throws IOException {
Request request = new Request("POST", "/" + index + "/_flush");

Response response = client().performRequest(request);
assertEquals(request.getEndpoint() + ": failed", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
}

protected void addKnnDocWithAttributes(String docId, float[] vector, Map<String, String> fieldValues) throws IOException {
Request request = new Request("POST", "/" + INDEX_NAME + "/_doc/" + docId + "?refresh=true");

Expand Down

0 comments on commit e2dcdc6

Please sign in to comment.