From bf9bf6170f4fd17aa2ef9bae0d07fd6ceaa32186 Mon Sep 17 00:00:00 2001 From: Dooyong Kim Date: Wed, 11 Sep 2024 16:07:21 -0700 Subject: [PATCH] Introduced loading + writing layer in vector searching. --- NativeMemoryEntryContextTests.java | 294 ++++++++++++++++++ jni/include/faiss_index_service.h | 35 ++- jni/include/faiss_methods.h | 15 +- jni/include/faiss_wrapper.h | 6 + .../org_opensearch_knn_jni_FaissService.h | 50 +++ jni/include/stream_support.h | 243 +++++++++++++++ jni/src/faiss_index_service.cpp | 42 +++ jni/src/faiss_methods.cpp | 9 + jni/src/faiss_wrapper.cpp | 40 +++ .../org_opensearch_knn_jni_FaissService.cpp | 90 ++++++ .../opensearch/knn/index/KNNIndexShard.java | 3 + .../DefaultIndexBuildStrategy.java | 15 + .../MemOptimizedNativeIndexBuildStrategy.java | 89 ++++++ .../nativeindex/NativeIndexBuildStrategy.java | 2 + .../codec/nativeindex/NativeIndexWriter.java | 72 ++++- .../nativeindex/model/BuildIndexParams.java | 2 + .../memory/NativeMemoryEntryContext.java | 11 +- .../memory/NativeMemoryLoadStrategy.java | 55 +++- .../opensearch/knn/index/query/KNNWeight.java | 1 + .../knn/index/util/IndexInputWithBuffer.java | 31 ++ .../org/opensearch/knn/jni/FaissService.java | 36 +++ .../org/opensearch/knn/jni/JNIService.java | 50 +++ .../memory/NativeMemoryEntryContextTests.java | 4 + .../memory/NativeMemoryLoadStrategyTests.java | 8 +- 24 files changed, 1187 insertions(+), 16 deletions(-) create mode 100644 NativeMemoryEntryContextTests.java create mode 100644 jni/include/stream_support.h create mode 100644 src/main/java/org/opensearch/knn/index/util/IndexInputWithBuffer.java diff --git a/NativeMemoryEntryContextTests.java b/NativeMemoryEntryContextTests.java new file mode 100644 index 0000000000..223cb6b21a --- /dev/null +++ b/NativeMemoryEntryContextTests.java @@ -0,0 +1,294 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * 
compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.knn.index.memory; + +import com.google.common.collect.ImmutableMap; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.knn.KNNTestCase; +import org.opensearch.knn.index.util.IndexUtil; +import org.opensearch.knn.index.VectorDataType; +import org.opensearch.knn.index.engine.KNNEngine; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Map; + +import static java.nio.file.StandardOpenOption.APPEND; +import static java.nio.file.StandardOpenOption.CREATE; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class NativeMemoryEntryContextTests extends KNNTestCase { + + public void testAbstract_getKey() { + String key = "test-1"; + TestNativeMemoryEntryContext testNativeMemoryEntryContext = new TestNativeMemoryEntryContext(key, 10); + + assertEquals(key, testNativeMemoryEntryContext.getKey()); + } + + public void testIndexEntryContext_load() throws IOException { + NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy = mock(NativeMemoryLoadStrategy.IndexLoadStrategy.class); + NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + null, + "test", + indexLoadStrategy, + null, + "test" + ); + + NativeMemoryAllocation.IndexAllocation indexAllocation = new NativeMemoryAllocation.IndexAllocation( + null, + 0, + 10, + KNNEngine.DEFAULT, + "test-path", + "test-name", + null + ); + + when(indexLoadStrategy.load(indexEntryContext)).thenReturn(indexAllocation); + + assertEquals(indexAllocation, indexEntryContext.load()); + } + + public void testIndexEntryContext_calculateSize() throws IOException { + // Create a file and write random bytes to it + Path 
tmpFile = createTempFile(); + byte[] data = new byte[1024 * 3]; + Arrays.fill(data, (byte) 'c'); + + try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(tmpFile, CREATE, APPEND))) { + out.write(data, 0, data.length); + } catch (IOException x) { + fail("Failed to write to file"); + } + + // Get the expected size of this function + int expectedSize = IndexUtil.getFileSizeInKB(tmpFile.toAbsolutePath().toString()); + + // Check that the indexEntryContext will return the same thing + NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + null, + tmpFile.toAbsolutePath().toString(), + null, + null, + "test" + ); + + assertEquals(expectedSize, indexEntryContext.calculateSizeInKB().longValue()); + } + + public void testIndexEntryContext_getOpenSearchIndexName() { + String openSearchIndexName = "test-index"; + NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + null, + "test", + null, + null, + openSearchIndexName + ); + + assertEquals(openSearchIndexName, indexEntryContext.getOpenSearchIndexName()); + } + + public void testIndexEntryContext_getParameters() { + Map parameters = ImmutableMap.of("test-1", 10); + NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + null, + "test", + null, + parameters, + "test" + ); + + assertEquals(parameters, indexEntryContext.getParameters()); + } + + public void testTrainingDataEntryContext_load() { + NativeMemoryLoadStrategy.TrainingLoadStrategy trainingLoadStrategy = mock(NativeMemoryLoadStrategy.TrainingLoadStrategy.class); + NativeMemoryEntryContext.TrainingDataEntryContext trainingDataEntryContext = new NativeMemoryEntryContext.TrainingDataEntryContext( + 0, + "test", + "test", + trainingLoadStrategy, + null, + 0, + 0, + VectorDataType.DEFAULT + ); + + NativeMemoryAllocation.TrainingDataAllocation trainingDataAllocation = new 
NativeMemoryAllocation.TrainingDataAllocation( + null, + 0, + 0, + VectorDataType.DEFAULT + ); + + when(trainingLoadStrategy.load(trainingDataEntryContext)).thenReturn(trainingDataAllocation); + + assertEquals(trainingDataAllocation, trainingDataEntryContext.load()); + } + + public void testTrainingDataEntryContext_getTrainIndexName() { + String trainIndexName = "test-index"; + NativeMemoryEntryContext.TrainingDataEntryContext trainingDataEntryContext = new NativeMemoryEntryContext.TrainingDataEntryContext( + 0, + trainIndexName, + "test", + null, + null, + 0, + 0, + VectorDataType.DEFAULT + ); + + assertEquals(trainIndexName, trainingDataEntryContext.getTrainIndexName()); + } + + public void testTrainingDataEntryContext_getTrainFieldName() { + String trainFieldName = "test-field"; + NativeMemoryEntryContext.TrainingDataEntryContext trainingDataEntryContext = new NativeMemoryEntryContext.TrainingDataEntryContext( + 0, + "test", + trainFieldName, + null, + null, + 0, + 0, + VectorDataType.DEFAULT + ); + + assertEquals(trainFieldName, trainingDataEntryContext.getTrainFieldName()); + } + + public void testTrainingDataEntryContext_getMaxVectorCount() { + int maxVectorCount = 11; + NativeMemoryEntryContext.TrainingDataEntryContext trainingDataEntryContext = new NativeMemoryEntryContext.TrainingDataEntryContext( + 0, + "test", + "test", + null, + null, + maxVectorCount, + 0, + VectorDataType.DEFAULT + ); + + assertEquals(maxVectorCount, trainingDataEntryContext.getMaxVectorCount()); + } + + public void testTrainingDataEntryContext_getSearchSize() { + int searchSize = 11; + NativeMemoryEntryContext.TrainingDataEntryContext trainingDataEntryContext = new NativeMemoryEntryContext.TrainingDataEntryContext( + 0, + "test", + "test", + null, + null, + 0, + searchSize, + VectorDataType.DEFAULT + ); + + assertEquals(searchSize, trainingDataEntryContext.getSearchSize()); + } + + public void testTrainingDataEntryContext_getIndicesService() { + ClusterService clusterService = 
mock(ClusterService.class); + NativeMemoryEntryContext.TrainingDataEntryContext trainingDataEntryContext = new NativeMemoryEntryContext.TrainingDataEntryContext( + 0, + "test", + "test", + null, + clusterService, + 0, + 0, + VectorDataType.DEFAULT + ); + + assertEquals(clusterService, trainingDataEntryContext.getClusterService()); + } + + private static class TestNativeMemoryAllocation implements NativeMemoryAllocation { + + @Override + public void close() { + + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public long getMemoryAddress() { + return 0; + } + + @Override + public void readLock() { + + } + + @Override + public void writeLock() { + + } + + @Override + public void readUnlock() { + + } + + @Override + public void writeUnlock() { + + } + + @Override + public int getSizeInKB() { + return 0; + } + } + + private static class TestNativeMemoryEntryContext extends NativeMemoryEntryContext { + + int size; + + /** + * Constructor + * + * @param key String used to identify entry in the cache + * @param size size this allocation will take up in the cache + */ + public TestNativeMemoryEntryContext(String key, int size) { + super(key); + this.size = size; + } + + @Override + public Integer calculateSizeInKB() { + return size; + } + + @Override + public TestNativeMemoryAllocation load() throws IOException { + return null; + } + } +} diff --git a/jni/include/faiss_index_service.h b/jni/include/faiss_index_service.h index 29ec90e803..0bcbf05d0c 100644 --- a/jni/include/faiss_index_service.h +++ b/jni/include/faiss_index_service.h @@ -29,8 +29,9 @@ namespace faiss_wrapper { * This class should evolve to have only cpp object but not jni object */ class IndexService { -public: + public: IndexService(std::unique_ptr faissMethods); + /** * Initialize index * @@ -45,6 +46,7 @@ class IndexService { * @return memory address of the native index object */ virtual jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType 
metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map parameters); + /** * Add vectors to index * @@ -55,6 +57,7 @@ class IndexService { * @param idMapAddress memory address of the native index object */ virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress); + /** * Write index to disk * @@ -63,9 +66,14 @@ class IndexService { * @param idMap memory address of the native index object */ virtual void writeIndex(std::string indexPath, jlong idMapAddress); + + virtual void writeIndexWithStream(JNIEnv *env, jobject outputStream, jlong idMapAddress); + virtual ~IndexService() = default; -protected: + + protected: virtual void allocIndex(faiss::Index * index, size_t dim, size_t numVectors); + std::unique_ptr faissMethods; }; @@ -74,10 +82,11 @@ class IndexService { * This class should evolve to have only cpp object but not jni object */ class BinaryIndexService : public IndexService { -public: + public: //TODO Remove dependency on JNIUtilInterface and JNIEnv //TODO Reduce the number of parameters BinaryIndexService(std::unique_ptr faissMethods); + /** * Initialize index * @@ -92,6 +101,7 @@ class BinaryIndexService : public IndexService { * @return memory address of the native index object */ virtual jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map parameters) override; + /** * Add vectors to index * @@ -107,6 +117,7 @@ class BinaryIndexService : public IndexService { * @param parameters parameters to be applied to faiss index */ virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress) override; + /** * Write index to disk * @@ -120,8 +131,12 @@ class BinaryIndexService : public IndexService { * @param parameters parameters to be applied to faiss index */ 
virtual void writeIndex(std::string indexPath, jlong idMapAddress) override; + + void writeIndexWithStream(JNIEnv *env, jobject outputStream, jlong idMapAddress) override; + virtual ~BinaryIndexService() = default; -protected: + + protected: virtual void allocIndex(faiss::Index * index, size_t dim, size_t numVectors) override; }; @@ -130,12 +145,12 @@ class BinaryIndexService : public IndexService { * This class should evolve to have only cpp object but not jni object */ class ByteIndexService : public IndexService { -public: + public: //TODO Remove dependency on JNIUtilInterface and JNIEnv //TODO Reduce the number of parameters ByteIndexService(std::unique_ptr faissMethods); -/** + /** * Initialize index * * @param jniUtil jni util @@ -149,6 +164,7 @@ class ByteIndexService : public IndexService { * @return memory address of the native index object */ virtual jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map parameters) override; + /** * Add vectors to index * @@ -164,6 +180,7 @@ class ByteIndexService : public IndexService { * @param parameters parameters to be applied to faiss index */ virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress) override; + /** * Write index to disk * @@ -177,8 +194,12 @@ class ByteIndexService : public IndexService { * @param parameters parameters to be applied to faiss index */ virtual void writeIndex(std::string indexPath, jlong idMapAddress) override; + + void writeIndexWithStream(JNIEnv *env, jobject outputStream, jlong idMapAddress) override; + virtual ~ByteIndexService() = default; -protected: + + protected: virtual void allocIndex(faiss::Index * index, size_t dim, size_t numVectors) override; }; diff --git a/jni/include/faiss_methods.h b/jni/include/faiss_methods.h index 38d8d756a7..5698e2b710 100644 --- 
a/jni/include/faiss_methods.h +++ b/jni/include/faiss_methods.h @@ -24,16 +24,27 @@ namespace faiss_wrapper { * This class helps to mock faiss methods during unit test */ class FaissMethods { -public: + public: FaissMethods() = default; + virtual faiss::Index* indexFactory(int d, const char* description, faiss::MetricType metric); + virtual faiss::IndexBinary* indexBinaryFactory(int d, const char* description); + virtual faiss::IndexIDMapTemplate* indexIdMap(faiss::Index* index); + virtual faiss::IndexIDMapTemplate* indexBinaryIdMap(faiss::IndexBinary* index); + virtual void writeIndex(const faiss::Index* idx, const char* fname); + virtual void writeIndexBinary(const faiss::IndexBinary* idx, const char* fname); + + virtual void writeIndexWithWriter(const faiss::Index* idx, faiss::IOWriter* writer); + + virtual void writeIndexBinaryWithWriter(const faiss::IndexBinary* idx, faiss::IOWriter* writer); + virtual ~FaissMethods() = default; -}; +}; // class FaissMethods } //namespace faiss_wrapper } //namespace knn_jni diff --git a/jni/include/faiss_wrapper.h b/jni/include/faiss_wrapper.h index d6375653d8..e523000da0 100644 --- a/jni/include/faiss_wrapper.h +++ b/jni/include/faiss_wrapper.h @@ -24,6 +24,8 @@ namespace knn_jni { void WriteIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jstring indexPathJ, jlong indexAddr, IndexService *indexService); + void WriteIndexWithStream(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jobject outputStream, jlong indexAddr, IndexService *indexService); + // Create an index with ids and vectors. Instead of creating a new index, this function creates the index // based off of the template index passed in. The index is serialized to indexPathJ. 
void CreateIndexFromTemplate(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ, @@ -47,11 +49,15 @@ namespace knn_jni { // Return a pointer to the loaded index jlong LoadIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jstring indexPathJ); + jlong LoadIndexWithStream(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, faiss::IOReader* ioReader); + // Load a binary index from indexPathJ into memory. // // Return a pointer to the loaded index jlong LoadBinaryIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jstring indexPathJ); + jlong LoadBinaryIndexWithStream(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, faiss::IOReader* ioReader); + // Check if a loaded index requires shared state bool IsSharedIndexStateRequired(jlong indexPointerJ); diff --git a/jni/include/org_opensearch_knn_jni_FaissService.h b/jni/include/org_opensearch_knn_jni_FaissService.h index d42ce197c8..ac03d517c0 100644 --- a/jni/include/org_opensearch_knn_jni_FaissService.h +++ b/jni/include/org_opensearch_knn_jni_FaissService.h @@ -78,6 +78,18 @@ JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_insertToByteInde JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeIndex(JNIEnv * env, jclass cls, jlong indexAddress, jstring indexPathJ); + +/* + * Class: org_opensearch_knn_jni_FaissService + * Method: writeIndexWithStream + * Signature: (JLorg/opensearch/knn/index/util/IndexOutputWithBuffer;)V + */ +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeIndexWithStream(JNIEnv * env, + jclass cls, + jlong indexAddress, + jobject outputStream); + + /* * Class: org_opensearch_knn_jni_FaissService * Method: writeBinaryIndex @@ -87,6 +99,17 @@ JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeBinaryIndex jlong indexAddress, jstring indexPathJ); +/* + * Class: org_opensearch_knn_jni_FaissService + * Method: writeBinaryIndexWithStream + * Signature: (JLorg/opensearch/knn/index/util/IndexOutputWithBuffer;)V + */ 
+JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeBinaryIndexWithStream(JNIEnv * env, + jclass cls, + jlong indexAddress, + jobject outputStream); + + /* * Class: org_opensearch_knn_jni_FaissService * Method: writeByteIndex @@ -96,6 +119,17 @@ JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeByteIndex(J jlong indexAddress, jstring indexPathJ); +/* + * Class: org_opensearch_knn_jni_FaissService + * Method: writeByteIndexWithStream + * Signature: (JLorg/opensearch/knn/index/util/IndexOutputWithBuffer;)V + */ +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeByteIndexWithStream(JNIEnv * env, + jclass cls, + jlong indexAddress, + jobject outputStream); + + /* * Class: org_opensearch_knn_jni_FaissService * Method: createIndexFromTemplate @@ -128,6 +162,14 @@ JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_createIndexFromT JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadIndex (JNIEnv *, jclass, jstring); +/* + * Class: org_opensearch_knn_jni_FaissService + * Method: loadIndexWithStream + * Signature: (Lorg/opensearch/knn/index/util/IndexInputWithBuffer;)J + */ +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadIndexWithStream + (JNIEnv *, jclass, jobject); + /* * Class: org_opensearch_knn_jni_FaissService * Method: loadBinaryIndex @@ -136,6 +178,14 @@ JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadIndex JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadBinaryIndex (JNIEnv *, jclass, jstring); +/* + * Class: org_opensearch_knn_jni_FaissService + * Method: loadBinaryIndexWithStream + * Signature: (Lorg/opensearch/knn/index/util/IndexInputWithBuffer;)J + */ +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadBinaryIndexWithStream + (JNIEnv *, jclass, jobject); + /* * Class: org_opensearch_knn_jni_FaissService * Method: isSharedIndexStateRequired diff --git a/jni/include/stream_support.h 
b/jni/include/stream_support.h
new file mode 100644
index 0000000000..8c83c178a0
--- /dev/null
+++ b/jni/include/stream_support.h
@@ -0,0 +1,352 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+#ifndef OPENSEARCH_KNN_JNI_STREAM_SUPPORT_H
+#define OPENSEARCH_KNN_JNI_STREAM_SUPPORT_H
+
+#include "faiss/impl/io.h"
+
+#include <jni.h>
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <stdexcept>
+#include <string>
+
+namespace knn_jni { namespace stream {
+
+namespace detail {
+
+// Throws when a Java exception is pending. The Java exception itself stays pending.
+inline void throwIfJavaException(JNIEnv * env, const char * message) {
+  if (env->ExceptionCheck()) {
+    throw std::runtime_error(message);
+  }
+}
+
+// Finds a class. The result is a JNI local reference, so callers release it rather than cache it.
+inline jclass findClass(JNIEnv * env, const char * className) {
+  jclass clazz = env->FindClass(className);
+  if (clazz == nullptr) {
+    throw std::runtime_error(std::string("Java class not found: ") + className);
+  }
+  return clazz;
+}
+
+// Looks up the ID of an instance method of `className`.
+inline jmethodID findMethod(JNIEnv * env, const char * className, const char * name, const char * signature) {
+  jclass clazz = findClass(env, className);
+  jmethodID methodId = env->GetMethodID(clazz, name, signature);
+  env->DeleteLocalRef(clazz);
+  if (methodId == nullptr) {
+    throw std::runtime_error(std::string("Java method not found: ") + className + "." + name);
+  }
+  return methodId;
+}
+
+// Looks up the ID of an instance field of `className`.
+inline jfieldID findField(JNIEnv * env, const char * className, const char * name, const char * signature) {
+  jclass clazz = findClass(env, className);
+  jfieldID fieldId = env->GetFieldID(clazz, name, signature);
+  env->DeleteLocalRef(clazz);
+  if (fieldId == nullptr) {
+    throw std::runtime_error(std::string("Java field not found: ") + className + "." + name);
+  }
+  return fieldId;
+}
+
+// Reads a `byte[]` field of `owner`. Rejects a null owner and a null array.
+inline jbyteArray readByteArrayField(JNIEnv * env, jobject owner, jfieldID fieldId) {
+  if (owner == nullptr) {
+    throw std::runtime_error("Stream object cannot be null");
+  }
+  jbyteArray array = (jbyteArray) env->GetObjectField(owner, fieldId);
+  if (array == nullptr) {
+    throw std::runtime_error("Stream object has no buffer");
+  }
+  return array;
+}
+
+// Returns `value` when it is positive and throws otherwise.
+inline int32_t requirePositive(int32_t value, const char * message) {
+  if (value <= 0) {
+    throw std::runtime_error(message);
+  }
+  return value;
+}
+
+}  // namespace detail
+
+/**
+ * Pulls bytes from a Java `IndexInputWithBuffer` into native memory.
+ *
+ * Each round trip calls the Java method `copyBytes(long)`, takes its return value as the
+ * number of bytes now available in the object's `buffer` array, and copies that many bytes
+ * out of the array. The mediator holds a JNI local reference to `buffer`, so it must not
+ * outlive the native call that created it.
+ */
+class NativeEngineIndexInputMediator {
+ public:
+  /**
+   * @param _env JNI environment of the calling thread
+   * @param _indexInput `IndexInputWithBuffer` instance to read from
+   * @throws std::runtime_error if `_indexInput` or its `buffer` is null, or a JNI lookup fails
+   */
+  NativeEngineIndexInputMediator(JNIEnv * _env, jobject _indexInput)
+    : env(_env),
+      indexInput(_indexInput),
+      bufferArray(detail::readByteArrayField(_env, _indexInput, getBufferFieldId(_env))),
+      copyBytesMethod(getCopyBytesMethod(_env)) {
+  }
+
+  /**
+   * Reads exactly `nbytes` bytes into `destination`.
+   *
+   * @param nbytes number of bytes to read; 64-bit so that a request above 2 GiB is not truncated
+   * @param destination native address with room for `nbytes` bytes
+   * @throws std::runtime_error if Java throws or returns an unusable byte count
+   */
+  void copyBytes(int64_t nbytes, uint8_t* destination) {
+    while (nbytes > 0) {
+      // `copyBytes` is looked up as `(J)I`. Variadic JNI calls convert nothing,
+      // so the argument has to be passed as a jlong.
+      const jint readBytes =
+          env->CallIntMethod(indexInput, copyBytesMethod, static_cast<jlong>(nbytes));
+      detail::throwIfJavaException(env, "Reading bytes via IndexInput has failed.");
+
+      // Zero or less would loop forever; more than requested would overrun `destination`.
+      if (readBytes <= 0 || readBytes > nbytes) {
+        throw std::runtime_error("IndexInputWithBuffer.copyBytes returned an invalid byte count.");
+      }
+
+      // Copy Java bytes to the C++ destination address.
+      env->GetByteArrayRegion(bufferArray, 0, readBytes, reinterpret_cast<jbyte*>(destination));
+      detail::throwIfJavaException(env, "Copying bytes out of the Java buffer has failed.");
+
+      destination += readBytes;
+      nbytes -= readBytes;
+    }  // End while
+  }
+
+ private:
+  // IDs are not references, so they are cached. A lookup that throws leaves the static
+  // uninitialized, and the next call retries it.
+  static jmethodID getCopyBytesMethod(JNIEnv *env) {
+    static jmethodID COPY_METHOD_ID =
+        detail::findMethod(env, "org/opensearch/knn/index/util/IndexInputWithBuffer", "copyBytes", "(J)I");
+    return COPY_METHOD_ID;
+  }
+
+  static jfieldID getBufferFieldId(JNIEnv *env) {
+    static jfieldID BUFFER_FIELD_ID =
+        detail::findField(env, "org/opensearch/knn/index/util/IndexInputWithBuffer", "buffer", "[B");
+    return BUFFER_FIELD_ID;
+  }
+
+  JNIEnv * env;
+
+  // `IndexInputWithBuffer` instance to read from.
+  jobject indexInput;
+  // Local reference to the `buffer` array of `indexInput`.
+  jbyteArray bufferArray;
+  jmethodID copyBytesMethod;
+};  // class NativeEngineIndexInputMediator
+
+/**
+ * Adapts `NativeEngineIndexInputMediator` to the Faiss `IOReader` interface.
+ * The mediator is borrowed, not owned.
+ */
+class FaissOpenSearchIOReader final : public faiss::IOReader {
+ public:
+  explicit FaissOpenSearchIOReader(NativeEngineIndexInputMediator* _mediator)
+    : faiss::IOReader(),
+      mediator(_mediator) {
+    if (mediator == nullptr) {
+      throw std::runtime_error("Mediator cannot be null");
+    }
+    name = "FaissOpenSearchIOReader";
+  }
+
+  /**
+   * Reads `nitems` items of `size` bytes each into `ptr`.
+   *
+   * @return `nitems`; a failed read is reported by an exception instead of a short count
+   */
+  size_t operator()(void* ptr, size_t size, size_t nitems) final {
+    const auto readBytes = size * nitems;
+    if (readBytes > 0) {
+      // Mediator calls IndexInput, then copies the read bytes to `ptr`.
+      mediator->copyBytes(static_cast<int64_t>(readBytes), (uint8_t *) ptr);
+    }
+    return nitems;
+  }
+
+  int filedescriptor() final {
+    throw std::runtime_error("filedescriptor() is not supported in FaissOpenSearchIOReader.");
+  }
+
+ private:
+  NativeEngineIndexInputMediator* mediator;
+};  // class FaissOpenSearchIOReader
+
+/**
+ * Pushes native bytes to a Java `IndexOutputWithBuffer`.
+ *
+ * Bytes are staged in a native buffer. A flush copies them into the Java object's `buffer`
+ * array, one array-sized chunk at a time, and calls the Java method `writeBytes(int)` with
+ * the length of each chunk. Call `flush()` after the last write: the destructor flushes only
+ * on a best-effort basis and cannot report a failure.
+ */
+class NativeEngineIndexOutputMediator {
+ public:
+  /**
+   * @param _env JNI environment of the calling thread
+   * @param _indexOutput `IndexOutputWithBuffer` instance to write to
+   * @param _bufferSize size in bytes of the native staging buffer; must be positive
+   * @throws std::runtime_error on a null or empty Java buffer, a bad size, or a failed lookup
+   */
+  NativeEngineIndexOutputMediator(JNIEnv * _env, jobject _indexOutput, int32_t _bufferSize = 4 * 1024)
+    : env(_env),
+      indexOutput(_indexOutput),
+      bufferSize(detail::requirePositive(_bufferSize, "Buffer size must be positive")),
+      buffer(new uint8_t[bufferSize]),
+      offset(0),
+      javaBuffer(detail::readByteArrayField(_env, _indexOutput, getBufferFieldId(_env))),
+      javaBufferLength(detail::requirePositive((int32_t) _env->GetArrayLength(javaBuffer), "Java buffer is empty")) {
+  }
+
+  ~NativeEngineIndexOutputMediator() {
+    // A destructor must not throw, and most JNI calls are off limits while a Java exception
+    // is pending. So the flush is skipped in that case, and a failure of it is dropped.
+    if (offset > 0 && !env->ExceptionCheck()) {
+      try {
+        flush();
+      } catch (...) {
+        // Intentionally ignored; see above.
+      }
+    }
+  }
+
+  /**
+   * Appends `nbytes` bytes from `data`, flushing whenever the staging buffer fills up.
+   *
+   * @throws std::runtime_error if a flush fails
+   */
+  void writeBytes(const uint8_t* data, size_t nbytes) {
+    // Counted in size_t, so a single write above 2 GiB is not truncated.
+    while (nbytes > 0) {
+      const auto copyBytes = std::min(nbytes, static_cast<size_t>(bufferSize - offset));
+      std::memcpy(buffer.get() + offset, data, copyBytes);
+      offset += static_cast<int32_t>(copyBytes);
+      data += copyBytes;
+      nbytes -= copyBytes;
+
+      if (offset >= bufferSize) {
+        flush();
+      }
+    }
+  }
+
+  /**
+   * Hands every staged byte to Java and empties the staging buffer.
+   *
+   * @throws std::runtime_error if Java throws while receiving the bytes
+   */
+  void flush() {
+    jmethodID writeBytesMethod = getWriteBytesMethod(env);
+
+    int32_t copiedIndex = 0;
+    while (copiedIndex < offset) {
+      const jint copyBytes = std::min(offset - copiedIndex, javaBufferLength);
+
+      // Copy C++ bytes to the Java buffer.
+      env->SetByteArrayRegion(javaBuffer, 0, copyBytes, reinterpret_cast<const jbyte*>(buffer.get() + copiedIndex));
+      detail::throwIfJavaException(env, "Copying bytes into the Java buffer has failed.");
+
+      // Tell Java how many bytes of its buffer to pass on.
+      env->CallVoidMethod(indexOutput, writeBytesMethod, copyBytes);
+      detail::throwIfJavaException(env, "Writing bytes via IndexOutput has failed.");
+
+      copiedIndex += copyBytes;
+    }
+
+    offset = 0;
+  }
+
+ private:
+  // IDs are not references, so they are cached. A lookup that throws leaves the static
+  // uninitialized, and the next call retries it.
+  static jmethodID getWriteBytesMethod(JNIEnv *env) {
+    static jmethodID WRITE_METHOD_ID =
+        detail::findMethod(env, "org/opensearch/knn/index/util/IndexOutputWithBuffer", "writeBytes", "(I)V");
+    return WRITE_METHOD_ID;
+  }
+
+  static jfieldID getBufferFieldId(JNIEnv *env) {
+    static jfieldID BUFFER_FIELD_ID =
+        detail::findField(env, "org/opensearch/knn/index/util/IndexOutputWithBuffer", "buffer", "[B");
+    return BUFFER_FIELD_ID;
+  }
+
+  JNIEnv * env;
+  jobject indexOutput;
+  int32_t bufferSize;
+  // Native staging buffer of `bufferSize` bytes; `offset` bytes of it are in use.
+  std::unique_ptr<uint8_t[]> buffer;
+  int32_t offset;
+  // Local reference to the `buffer` array of `indexOutput`, and its length.
+  jbyteArray javaBuffer;
+  int32_t javaBufferLength;
+};  // class NativeEngineIndexOutputMediator
+
+/**
+ * Adapts `NativeEngineIndexOutputMediator` to the Faiss `IOWriter` interface.
+ * The mediator is borrowed, not owned.
+ */
+class FaissOpenSearchIOWriter final : public faiss::IOWriter {
+ public:
+  explicit FaissOpenSearchIOWriter(NativeEngineIndexOutputMediator* _mediator)
+    : faiss::IOWriter(),
+      mediator(_mediator) {
+    if (mediator == nullptr) {
+      throw std::runtime_error("Mediator cannot be null");
+    }
+    name = "FaissOpenSearchIOWriter";
+  }
+
+  /**
+   * Writes `nitems` items of `size` bytes each from `ptr`.
+   *
+   * @return `nitems`; a failed write is reported by an exception instead of a short count
+   */
+  size_t operator()(const void* ptr, size_t size, size_t nitems) final {
+    const auto writeBytes = size * nitems;
+    if (writeBytes > 0) {
+      // Mediator stages the bytes and flushes them to IndexOutput as its buffer fills.
+      mediator->writeBytes((const uint8_t*) ptr, writeBytes);
+    }
+    return nitems;
+  }
+
+  int filedescriptor() final {
+    throw std::runtime_error("filedescriptor() is not supported in FaissOpenSearchIOWriter.");
+  }
+
+ private:
+  NativeEngineIndexOutputMediator* mediator;
+};  // class FaissOpenSearchIOWriter
+
+}  // namespace stream
+}  // namespace knn_jni
+
+#endif //OPENSEARCH_KNN_JNI_STREAM_SUPPORT_H
diff --git a/jni/src/faiss_index_service.cpp b/jni/src/faiss_index_service.cpp
index 16ded4bcbf..410a1c3103 100644
--- a/jni/src/faiss_index_service.cpp
+++ b/jni/src/faiss_index_service.cpp
@@ -18,6 +18,7 @@
 #include "faiss/IndexBinaryIVF.h"
 #include "faiss/IndexIDMap.h"
 #include "faiss/index_io.h"
+#include "stream_support.h"
 #include
 #include
 #include
@@ -156,6 +157,20 @@ void IndexService::writeIndex(
     }
 }
 
+void IndexService::writeIndexWithStream(JNIEnv *env, jobject outputStream, jlong idMapAddress) {
+    std::unique_ptr<faiss::IndexIDMap> idMap (reinterpret_cast<faiss::IndexIDMap *> (idMapAddress));  // owns the index; freed on return or throw
+
+    stream::NativeEngineIndexOutputMediator mediator {env, outputStream};
+    stream::FaissOpenSearchIOWriter writer {&mediator};
+
+    try {
+        faissMethods->writeIndexWithWriter(idMap.get(), &writer);
+        mediator.flush();  // flushed here, not in the destructor, so that a failed final write is reported
+    } catch(const std::exception &e) {
+        throw std::runtime_error("Failed to write index to disk");
+    }
+}
+
 BinaryIndexService::BinaryIndexService(std::unique_ptr faissMethods) : IndexService(std::move(faissMethods)) {}
 
 void BinaryIndexService::allocIndex(faiss::Index * index, size_t dim, size_t numVectors) {
@@ -250,6 +265,19 @@ void BinaryIndexService::writeIndex(
     }
 }
 
+void BinaryIndexService::writeIndexWithStream(JNIEnv *env, jobject outputStream, jlong idMapAddress) {
+    std::unique_ptr<faiss::IndexBinaryIDMap> idMap (reinterpret_cast<faiss::IndexBinaryIDMap *> (idMapAddress));  // owns the index; freed on return or throw
+
+    stream::NativeEngineIndexOutputMediator mediator {env, outputStream};
+    stream::FaissOpenSearchIOWriter writer {&mediator};
+    try {
+        faissMethods->writeIndexBinaryWithWriter(idMap.get(), &writer);
+        mediator.flush();  // flushed here, not in the destructor, so that a failed final write is reported
+    } catch(const std::exception &e) {
+        throw std::runtime_error("Failed to write index to disk");
+    }
+}
+
ByteIndexService::ByteIndexService(std::unique_ptr faissMethods) : IndexService(std::move(faissMethods)) {} void ByteIndexService::allocIndex(faiss::Index * index, size_t dim, size_t numVectors) { @@ -363,5 +391,19 @@ void ByteIndexService::writeIndex( throw std::runtime_error("Failed to write index to disk"); } } + +void ByteIndexService::writeIndexWithStream(JNIEnv *env, jobject outputStream, jlong idMapAddress) { + std::unique_ptr idMap (reinterpret_cast (idMapAddress)); + + stream::NativeEngineIndexOutputMediator mediator {env, outputStream}; + stream::FaissOpenSearchIOWriter writer {&mediator}; + + try { + faissMethods->writeIndexWithWriter(idMap.get(), &writer); + } catch(const std::exception &e) { + throw std::runtime_error("Failed to write index to disk"); + } +} + } // namespace faiss_wrapper } // namesapce knn_jni \ No newline at end of file diff --git a/jni/src/faiss_methods.cpp b/jni/src/faiss_methods.cpp index 05c8f459ae..95ffa9af45 100644 --- a/jni/src/faiss_methods.cpp +++ b/jni/src/faiss_methods.cpp @@ -32,9 +32,18 @@ faiss::IndexIDMapTemplate* FaissMethods::indexBinaryIdMap(fa void FaissMethods::writeIndex(const faiss::Index* idx, const char* fname) { faiss::write_index(idx, fname); } + void FaissMethods::writeIndexBinary(const faiss::IndexBinary* idx, const char* fname) { faiss::write_index_binary(idx, fname); } +void FaissMethods::writeIndexWithWriter(const faiss::Index* idx, faiss::IOWriter* writer) { + faiss::write_index(idx, writer); +} + +void FaissMethods::writeIndexBinaryWithWriter(const faiss::IndexBinary* idx, faiss::IOWriter* writer) { + faiss::write_index_binary(idx, writer); +} + } // namespace faiss_wrapper } // namesapce knn_jni diff --git a/jni/src/faiss_wrapper.cpp b/jni/src/faiss_wrapper.cpp index ba15c3ce7f..4db6a62ee0 100644 --- a/jni/src/faiss_wrapper.cpp +++ b/jni/src/faiss_wrapper.cpp @@ -183,6 +183,18 @@ void knn_jni::faiss_wrapper::WriteIndex(knn_jni::JNIUtilInterface * jniUtil, JNI indexService->writeIndex(indexPathCpp, 
index_ptr); } +void knn_jni::faiss_wrapper::WriteIndexWithStream(knn_jni::JNIUtilInterface * jniUtil, + JNIEnv * env, + jobject outputStream, + jlong index_ptr, + IndexService* indexService) { + if (outputStream == nullptr) { + throw std::runtime_error("Output stream cannot be null"); + } + + indexService->writeIndexWithStream(env, outputStream, index_ptr); +} + void knn_jni::faiss_wrapper::CreateIndexFromTemplate(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, jstring indexPathJ, jbyteArray templateIndexJ, jobject parametersJ) { @@ -423,6 +435,20 @@ jlong knn_jni::faiss_wrapper::LoadIndex(knn_jni::JNIUtilInterface * jniUtil, JNI return (jlong) indexReader; } +jlong knn_jni::faiss_wrapper::LoadIndexWithStream(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, faiss::IOReader* ioReader) { + if (ioReader == nullptr) [[unlikely]] { + throw std::runtime_error("IOReader cannot be null"); + } + + faiss::Index* indexReader = + faiss::read_index(ioReader, + faiss::IO_FLAG_READ_ONLY + | faiss::IO_FLAG_PQ_SKIP_SDC_TABLE + | faiss::IO_FLAG_SKIP_PRECOMPUTE_TABLE); + + return (jlong) indexReader; +} + jlong knn_jni::faiss_wrapper::LoadBinaryIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jstring indexPathJ) { if (indexPathJ == nullptr) { throw std::runtime_error("Index path cannot be null"); @@ -436,6 +462,20 @@ jlong knn_jni::faiss_wrapper::LoadBinaryIndex(knn_jni::JNIUtilInterface * jniUti return (jlong) indexReader; } +jlong knn_jni::faiss_wrapper::LoadBinaryIndexWithStream(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, faiss::IOReader* ioReader) { + if (ioReader == nullptr) [[unlikely]] { + throw std::runtime_error("IOReader cannot be null"); + } + + faiss::IndexBinary* indexReader = + faiss::read_index_binary(ioReader, + faiss::IO_FLAG_READ_ONLY + | faiss::IO_FLAG_PQ_SKIP_SDC_TABLE + | faiss::IO_FLAG_SKIP_PRECOMPUTE_TABLE); + + return (jlong) indexReader; +} + bool 
knn_jni::faiss_wrapper::IsSharedIndexStateRequired(jlong indexPointerJ) { auto * index = reinterpret_cast(indexPointerJ); return isIndexIVFPQL2(index); diff --git a/jni/src/org_opensearch_knn_jni_FaissService.cpp b/jni/src/org_opensearch_knn_jni_FaissService.cpp index 70c986b7d0..d45d51ea18 100644 --- a/jni/src/org_opensearch_knn_jni_FaissService.cpp +++ b/jni/src/org_opensearch_knn_jni_FaissService.cpp @@ -17,6 +17,7 @@ #include "faiss_wrapper.h" #include "jni_util.h" +#include "stream_support.h" static knn_jni::JNIUtil jniUtil; static const jint KNN_FAISS_JNI_VERSION = JNI_VERSION_1_1; @@ -136,6 +137,20 @@ JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeIndex(JNIEn } } +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeIndexWithStream(JNIEnv * env, + jclass cls, + jlong indexAddress, + jobject outputStream) +{ + try { + std::unique_ptr faissMethods(new knn_jni::faiss_wrapper::FaissMethods()); + knn_jni::faiss_wrapper::IndexService indexService(std::move(faissMethods)); + knn_jni::faiss_wrapper::WriteIndexWithStream(&jniUtil, env, outputStream, indexAddress, &indexService); + } catch (...) { + jniUtil.CatchCppExceptionAndThrowJava(env); + } +} + JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeBinaryIndex(JNIEnv * env, jclass cls, jlong indexAddress, jstring indexPathJ) @@ -149,6 +164,20 @@ JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeBinaryIndex } } +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeBinaryIndexWithStream(JNIEnv * env, + jclass cls, + jlong indexAddress, + jobject outputStream) +{ + try { + std::unique_ptr faissMethods(new knn_jni::faiss_wrapper::FaissMethods()); + knn_jni::faiss_wrapper::BinaryIndexService binaryIndexService(std::move(faissMethods)); + knn_jni::faiss_wrapper::WriteIndexWithStream(&jniUtil, env, outputStream, indexAddress, &binaryIndexService); + } catch (...) 
{ + jniUtil.CatchCppExceptionAndThrowJava(env); + } +} + JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeByteIndex(JNIEnv * env, jclass cls, jlong indexAddress, jstring indexPathJ) @@ -162,6 +191,21 @@ JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeByteIndex(J } } +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeByteIndexWithStream(JNIEnv * env, + jclass cls, + jlong indexAddress, + jobject outputStream) +{ + try { + std::unique_ptr faissMethods(new knn_jni::faiss_wrapper::FaissMethods()); + knn_jni::faiss_wrapper::ByteIndexService byteIndexService(std::move(faissMethods)); + knn_jni::faiss_wrapper::WriteIndexWithStream(&jniUtil, env, outputStream, indexAddress, &byteIndexService); + } catch (...) { + jniUtil.CatchCppExceptionAndThrowJava(env); + } +} + + JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_createIndexFromTemplate(JNIEnv * env, jclass cls, jintArray idsJ, jlong vectorsAddressJ, @@ -217,6 +261,29 @@ JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadIndex(JNIEn return NULL; } +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadIndexWithStream + (JNIEnv * env, jclass cls, jobject readStream) +{ + try { + // Create a mediator locally. + // Note that `indexInput` is `BufferedIndexInput` type. + knn_jni::stream::NativeEngineIndexInputMediator mediator {env, readStream}; + + // Wrap the mediator with a glue code inheriting IOReader. + knn_jni::stream::FaissOpenSearchIOReader faissOpenSearchIOReader {&mediator}; + + // Pass IOReader to Faiss for loading vector index. + return knn_jni::faiss_wrapper::LoadIndexWithStream( + &jniUtil, + env, + &faissOpenSearchIOReader); + } catch (...) 
{ + jniUtil.CatchCppExceptionAndThrowJava(env); + } + + return NULL; +} + JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadBinaryIndex(JNIEnv * env, jclass cls, jstring indexPathJ) { try { @@ -227,6 +294,29 @@ JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadBinaryIndex return NULL; } +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadBinaryIndexWithStream + (JNIEnv * env, jclass cls, jobject readStream) +{ + try { + // Create a mediator locally. + // Note that `indexInput` is `BufferedIndexInput` type. + knn_jni::stream::NativeEngineIndexInputMediator mediator {env, readStream}; + + // Wrap the mediator with a glue code inheriting IOReader. + knn_jni::stream::FaissOpenSearchIOReader faissOpenSearchIOReader {&mediator}; + + // Pass IOReader to Faiss for loading vector index. + return knn_jni::faiss_wrapper::LoadBinaryIndexWithStream( + &jniUtil, + env, + &faissOpenSearchIOReader); + } catch (...) { + jniUtil.CatchCppExceptionAndThrowJava(env); + } + + return NULL; +} + JNIEXPORT jboolean JNICALL Java_org_opensearch_knn_jni_FaissService_isSharedIndexStateRequired (JNIEnv * env, jclass cls, jlong indexPointerJ) { diff --git a/src/main/java/org/opensearch/knn/index/KNNIndexShard.java b/src/main/java/org/opensearch/knn/index/KNNIndexShard.java index 5c096e4f7a..3607f14e85 100644 --- a/src/main/java/org/opensearch/knn/index/KNNIndexShard.java +++ b/src/main/java/org/opensearch/knn/index/KNNIndexShard.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FilterDirectory; import org.opensearch.common.lucene.Lucene; @@ -89,11 +90,13 @@ public String getIndexName() { */ public void warmup() throws IOException { log.info("[KNN] Warming up index: [{}]", getIndexName()); + Directory directory = 
indexShard.store().directory(); try (Engine.Searcher searcher = indexShard.acquireSearcher("knn-warmup")) { getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> { try { nativeMemoryCacheManager.get( new NativeMemoryEntryContext.IndexEntryContext( + directory, engineFileContext.getIndexPath(), NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(), getParametersAtLoading( diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/DefaultIndexBuildStrategy.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/DefaultIndexBuildStrategy.java index e68121a7db..0e44a4e9ee 100644 --- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/DefaultIndexBuildStrategy.java +++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/DefaultIndexBuildStrategy.java @@ -108,4 +108,19 @@ public void buildAndWriteIndex(final BuildIndexParams indexInfo) throws IOExcept ); } } + + /** + * Builds and writes a k-NN index using the provided vector values and index parameters. This method handles both + * quantized and non-quantized vectors, transferring them off-heap before building the index using native JNI services. + * + *

The method first iterates over the vector values to calculate the necessary bytes per vector. If quantization is + * enabled, the vectors are quantized before being transferred off-heap. Once all vectors are transferred, they are + * flushed and used to build the index. The index is then written to the specified path using JNI calls.

+ * + * @param indexInfo The {@link BuildIndexParams} containing the parameters and configuration for building the index. + * @throws IOException If an I/O error occurs during the process of building and writing the index. + */ + public void buildAndWriteIndexV2(final BuildIndexParams indexInfo) throws IOException { + buildAndWriteIndex(indexInfo); + } } diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/MemOptimizedNativeIndexBuildStrategy.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/MemOptimizedNativeIndexBuildStrategy.java index af3f4777f4..8c514a0ea4 100644 --- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/MemOptimizedNativeIndexBuildStrategy.java +++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/MemOptimizedNativeIndexBuildStrategy.java @@ -130,4 +130,93 @@ public void buildAndWriteIndex(final BuildIndexParams indexInfo) throws IOExcept ); } } + + /** + * Builds and writes a k-NN index using the provided vector values and index parameters. This method handles both + * quantized and non-quantized vectors, transferring them off-heap before building the index using native JNI services. + * + *

The method first iterates over the vector values to calculate the necessary bytes per vector. If quantization is + * enabled, the vectors are quantized before being transferred off-heap. Vectors are transferred in batches and inserted + * into the index as each batch fills; any remaining vectors are flushed and inserted at the end. The index is then written to the provided index output using JNI calls.

+ * + * @param indexInfo The {@link BuildIndexParams} containing the parameters and configuration for building the index. + * @throws IOException If an I/O error occurs during the process of building and writing the index. + */ + public void buildAndWriteIndexV2(final BuildIndexParams indexInfo) throws IOException { + final KNNVectorValues knnVectorValues = indexInfo.getVectorValues(); + // Needed to make sure we don't get 0 dimensions while initializing index + iterateVectorValuesOnce(knnVectorValues); + KNNEngine engine = indexInfo.getKnnEngine(); + Map indexParameters = indexInfo.getParameters(); + IndexBuildSetup indexBuildSetup = QuantizationIndexUtils.prepareIndexBuild(knnVectorValues, indexInfo); + + // Initialize the index + long indexMemoryAddress = AccessController.doPrivileged( + (PrivilegedAction) () -> JNIService.initIndex( + indexInfo.getTotalLiveDocs(), + indexBuildSetup.getDimensions(), + indexParameters, + engine + ) + ); + + int transferLimit = (int) Math.max(1, KNNSettings.getVectorStreamingMemoryLimit().getBytes() / indexBuildSetup.getBytesPerVector()); + try (final OffHeapVectorTransfer vectorTransfer = getVectorTransfer(indexInfo.getVectorDataType(), transferLimit)) { + + final List transferredDocIds = new ArrayList<>(transferLimit); + + while (knnVectorValues.docId() != NO_MORE_DOCS) { + Object vector = QuantizationIndexUtils.processAndReturnVector(knnVectorValues, indexBuildSetup); + // append is false to be able to reuse the memory location + boolean transferred = vectorTransfer.transfer(vector, false); + transferredDocIds.add(knnVectorValues.docId()); + if (transferred) { + // Insert vectors + long vectorAddress = vectorTransfer.getVectorAddress(); + AccessController.doPrivileged((PrivilegedAction) () -> { + JNIService.insertToIndex( + intListToArray(transferredDocIds), + vectorAddress, + indexBuildSetup.getDimensions(), + indexParameters, + indexMemoryAddress, + engine + ); + return null; + }); + transferredDocIds.clear(); + } + 
knnVectorValues.nextDoc(); + } + + boolean flush = vectorTransfer.flush(false); + // Need to make sure that the flushed vectors are indexed + if (flush) { + long vectorAddress = vectorTransfer.getVectorAddress(); + AccessController.doPrivileged((PrivilegedAction) () -> { + JNIService.insertToIndex( + intListToArray(transferredDocIds), + vectorAddress, + indexBuildSetup.getDimensions(), + indexParameters, + indexMemoryAddress, + engine + ); + return null; + }); + transferredDocIds.clear(); + } + + // Write vector + AccessController.doPrivileged((PrivilegedAction) () -> { + JNIService.writeIndex(indexInfo.getIndexOutputWithBuffer(), indexMemoryAddress, engine, indexParameters); + return null; + }); + } catch (Exception exception) { + throw new RuntimeException( + "Failed to build index, field name [" + indexInfo.getFieldName() + "], parameters " + indexInfo, + exception + ); + } + } } diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexBuildStrategy.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexBuildStrategy.java index 8c9f6de971..bb012461c4 100644 --- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexBuildStrategy.java +++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexBuildStrategy.java @@ -15,4 +15,6 @@ public interface NativeIndexBuildStrategy { void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException; + + void buildAndWriteIndexV2(BuildIndexParams indexInfo) throws IOException; } diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java index edc96c9e14..6ddcb4a653 100644 --- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java +++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java @@ -7,11 +7,13 @@ import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; +import 
org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IndexOutput; import org.opensearch.common.Nullable; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.common.bytes.BytesArray; @@ -27,6 +29,7 @@ import org.opensearch.knn.index.engine.KNNEngine; import org.opensearch.knn.index.engine.qframe.QuantizationConfig; import org.opensearch.knn.index.quantizationservice.QuantizationService; +import org.opensearch.knn.index.util.IndexOutputWithBuffer; import org.opensearch.knn.index.util.IndexUtil; import org.opensearch.knn.index.vectorvalues.KNNVectorValues; import org.opensearch.knn.indices.Model; @@ -133,11 +136,17 @@ public void mergeIndex(final KNNVectorValues knnVectorValues, int totalLiveDo private void buildAndWriteIndex(final KNNVectorValues knnVectorValues, int totalLiveDocs) throws IOException { if (totalLiveDocs == 0) { - log.debug("No live docs for field " + fieldInfo.name); + log.debug("No live docs for field {}", fieldInfo.name); + return; + } + + if (indexBuilder instanceof MemOptimizedNativeIndexBuildStrategy) { + buildAndWriteIndexV2(knnVectorValues, totalLiveDocs); return; } final KNNEngine knnEngine = extractKNNEngine(fieldInfo); + final String engineFileName = buildEngineFileName( state.segmentInfo.name, knnEngine.getVersion(), @@ -155,6 +164,67 @@ private void buildAndWriteIndex(final KNNVectorValues knnVectorValues, int to writeFooter(indexPath, engineFileName, state); } + private void buildAndWriteIndexV2(final KNNVectorValues knnVectorValues, int totalLiveDocs) throws IOException { + final KNNEngine knnEngine = extractKNNEngine(fieldInfo); + + final String engineFileName = buildEngineFileName( + state.segmentInfo.name, + knnEngine.getVersion(), + fieldInfo.name, + 
knnEngine.getExtension() + ); + + try (final IndexOutput vectorIndexOutput = state.directory.createOutput(engineFileName, state.context)) { + final IndexOutputWithBuffer vectorIndexOutputWithBuffer = new IndexOutputWithBuffer(vectorIndexOutput); + + final BuildIndexParams nativeIndexParams = indexParamsV2( + fieldInfo, + vectorIndexOutputWithBuffer, + knnEngine, + knnVectorValues, + totalLiveDocs + ); + indexBuilder.buildAndWriteIndexV2(nativeIndexParams); + CodecUtil.writeFooter(vectorIndexOutput); + } + } + + // The logic for building parameters need to be cleaned up. There are various cases handled here + // Currently it falls under two categories - with model and without model. Without model is further divided based on vector data type + // TODO: Refactor this so its scalable. Possibly move it out of this class + private BuildIndexParams indexParamsV2( + FieldInfo fieldInfo, + IndexOutputWithBuffer indexOutputWithBuffer, + KNNEngine knnEngine, + KNNVectorValues vectorValues, + int totalLiveDocs + ) throws IOException { + final Map parameters; + VectorDataType vectorDataType; + if (quantizationState != null) { + vectorDataType = QuantizationService.getInstance().getVectorDataTypeForTransfer(fieldInfo); + } else { + vectorDataType = extractVectorDataType(fieldInfo); + } + if (fieldInfo.attributes().containsKey(MODEL_ID)) { + Model model = getModel(fieldInfo); + parameters = getTemplateParameters(fieldInfo, model); + } else { + parameters = getParameters(fieldInfo, vectorDataType, knnEngine); + } + + return BuildIndexParams.builder() + .fieldName(fieldInfo.name) + .parameters(parameters) + .vectorDataType(vectorDataType) + .knnEngine(knnEngine) + .indexOutputWithBuffer(indexOutputWithBuffer) + .quantizationState(quantizationState) + .vectorValues(vectorValues) + .totalLiveDocs(totalLiveDocs) + .build(); + } + // The logic for building parameters need to be cleaned up. 
There are various cases handled here // Currently it falls under two categories - with model and without model. Without model is further divided based on vector data type // TODO: Refactor this so its scalable. Possibly move it out of this class diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/model/BuildIndexParams.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/model/BuildIndexParams.java index 88507b1fc3..1898a98158 100644 --- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/model/BuildIndexParams.java +++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/model/BuildIndexParams.java @@ -11,6 +11,7 @@ import org.opensearch.common.Nullable; import org.opensearch.knn.index.VectorDataType; import org.opensearch.knn.index.engine.KNNEngine; +import org.opensearch.knn.index.util.IndexOutputWithBuffer; import org.opensearch.knn.index.vectorvalues.KNNVectorValues; import org.opensearch.knn.quantization.models.quantizationState.QuantizationState; @@ -32,4 +33,5 @@ public class BuildIndexParams { QuantizationState quantizationState; KNNVectorValues vectorValues; int totalLiveDocs; + IndexOutputWithBuffer indexOutputWithBuffer; } diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java index 2dfc5fafba..40d23e610d 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java @@ -11,6 +11,7 @@ package org.opensearch.knn.index.memory; +import org.apache.lucene.store.Directory; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.knn.index.util.IndexUtil; @@ -62,6 +63,7 @@ public String getKey() { public static class IndexEntryContext extends NativeMemoryEntryContext { + private final Directory directory; private final 
NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy; private final String openSearchIndexName; private final Map parameters; @@ -77,12 +79,13 @@ public static class IndexEntryContext extends NativeMemoryEntryContext parameters, String openSearchIndexName ) { - this(indexPath, indexLoadStrategy, parameters, openSearchIndexName, null); + this(directory, indexPath, indexLoadStrategy, parameters, openSearchIndexName, null); } /** @@ -95,6 +98,7 @@ public IndexEntryContext( * @param modelId model to be loaded. If none available, pass null */ public IndexEntryContext( + Directory directory, String indexPath, NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy, Map parameters, @@ -102,12 +106,17 @@ public IndexEntryContext( String modelId ) { super(indexPath); + this.directory = directory; this.indexLoadStrategy = indexLoadStrategy; this.openSearchIndexName = openSearchIndexName; this.parameters = parameters; this.modelId = modelId; } + public Directory getDirectory() { + return directory; + } + @Override public Integer calculateSizeInKB() { return IndexSizeCalculator.INSTANCE.apply(this); diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java index 8324f23404..11a5d42911 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java @@ -12,7 +12,12 @@ package org.opensearch.knn.index.memory; import lombok.extern.log4j.Log4j2; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.opensearch.core.action.ActionListener; +import org.opensearch.knn.index.util.IndexInputWithBuffer; import org.opensearch.knn.index.util.IndexUtil; import org.opensearch.knn.jni.JNIService; import 
org.opensearch.knn.index.engine.KNNEngine; @@ -86,9 +91,9 @@ public void onFileDeleted(Path indexFilePath) { }; } - @Override - public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.IndexEntryContext indexEntryContext) - throws IOException { + private NativeMemoryAllocation.IndexAllocation loadFileBasedVectorIndex( + NativeMemoryEntryContext.IndexEntryContext indexEntryContext + ) throws IOException { Path indexPath = Paths.get(indexEntryContext.getKey()); FileWatcher fileWatcher = new FileWatcher(indexPath); fileWatcher.addListener(indexFileOnDeleteListener); @@ -118,6 +123,50 @@ public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.Inde ); } + @Override + public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.IndexEntryContext indexEntryContext) + throws IOException { + Directory directory = indexEntryContext.getDirectory(); + + // Get the vector index path (note that it's logical path) + final String indexPath = indexEntryContext.getKey(); + FileWatcher fileWatcher = new FileWatcher(Path.of(indexPath)); + fileWatcher.addListener(indexFileOnDeleteListener); + fileWatcher.init(); + + KNNEngine knnEngine = KNNEngine.getEngineNameFromPath(indexPath); + + if (knnEngine != KNNEngine.FAISS) { + return loadFileBasedVectorIndex(indexEntryContext); + } + + try (IndexInput readStream = directory.openInput(indexPath, IOContext.READONCE)) { + IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream); + long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine); + + SharedIndexState sharedIndexState = null; + String modelId = indexEntryContext.getModelId(); + if (IndexUtil.isSharedIndexStateRequired(knnEngine, modelId, indexAddress)) { + log.info("Index with model: \"{}\" requires shared state. 
Retrieving shared state.", modelId); + sharedIndexState = SharedIndexStateManager.getInstance().get(indexAddress, modelId, knnEngine); + JNIService.setSharedIndexState(indexAddress, sharedIndexState.getSharedIndexStateAddress(), knnEngine); + } + + final WatcherHandle watcherHandle = resourceWatcherService.add(fileWatcher); + return new NativeMemoryAllocation.IndexAllocation( + executor, + indexAddress, + indexEntryContext.calculateSizeInKB(), + knnEngine, + indexPath.toString(), + indexEntryContext.getOpenSearchIndexName(), + watcherHandle, + sharedIndexState, + IndexUtil.isBinaryIndex(knnEngine, indexEntryContext.getParameters()) + ); + } + } + @Override public void close() { executor.shutdown(); diff --git a/src/main/java/org/opensearch/knn/index/query/KNNWeight.java b/src/main/java/org/opensearch/knn/index/query/KNNWeight.java index 1769328fe6..45092bd140 100644 --- a/src/main/java/org/opensearch/knn/index/query/KNNWeight.java +++ b/src/main/java/org/opensearch/knn/index/query/KNNWeight.java @@ -278,6 +278,7 @@ private Map doANNSearch( try { indexAllocation = nativeMemoryCacheManager.get( new NativeMemoryEntryContext.IndexEntryContext( + reader.directory(), indexPath.toString(), NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(), getParametersAtLoading( diff --git a/src/main/java/org/opensearch/knn/index/util/IndexInputWithBuffer.java b/src/main/java/org/opensearch/knn/index/util/IndexInputWithBuffer.java new file mode 100644 index 0000000000..b0e8cfd610 --- /dev/null +++ b/src/main/java/org/opensearch/knn/index/util/IndexInputWithBuffer.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.index.util; + +import org.apache.lucene.store.IndexInput; + +import java.io.IOException; + +public class IndexInputWithBuffer { + public IndexInputWithBuffer(IndexInput indexInput) { + this.indexInput = indexInput; + } + + private int copyBytes(long nbytes) throws IOException { + final 
int readBytes = Math.min(Math.toIntExact(nbytes), buffer.length); + indexInput.readBytes(buffer, 0, readBytes); + return readBytes; + } + + private IndexInput indexInput; + // 4K buffer. + private byte[] buffer = new byte[4 * 1024]; + + @Override + public String toString() { + return "{indexInput=" + indexInput + ", len(buffer)=" + buffer.length + "}"; + } +} diff --git a/src/main/java/org/opensearch/knn/jni/FaissService.java b/src/main/java/org/opensearch/knn/jni/FaissService.java index 037171b980..0cb6c59df4 100644 --- a/src/main/java/org/opensearch/knn/jni/FaissService.java +++ b/src/main/java/org/opensearch/knn/jni/FaissService.java @@ -14,6 +14,8 @@ import org.opensearch.knn.common.KNNConstants; import org.opensearch.knn.index.query.KNNQueryResult; import org.opensearch.knn.index.engine.KNNEngine; +import org.opensearch.knn.index.util.IndexInputWithBuffer; +import org.opensearch.knn.index.util.IndexOutputWithBuffer; import java.security.AccessController; import java.security.PrivilegedAction; @@ -128,6 +130,16 @@ class FaissService { */ public static native void writeIndex(long indexAddress, String indexPath); + /** + * Writes a faiss index. + * + * NOTE: This will always free the index. Do not call free after this. + * + * @param indexAddress address of native memory where index is stored + * @param outputStream Stream instance wrapping Lucene IndexOutput with a buffer where native engine writes bytes into it. + */ + public static native void writeIndexWithStream(long indexAddress, IndexOutputWithBuffer outputStream); + /** * Writes a faiss index. * @@ -138,6 +150,16 @@ class FaissService { */ public static native void writeBinaryIndex(long indexAddress, String indexPath); + /** + * Writes a faiss index. + * + * NOTE: This will always free the index. Do not call free after this. 
+ * + * @param indexAddress address of native memory where index is stored + * @param outputStream Stream instance wrapping Lucene IndexOutput with a buffer where native engine writes bytes into it. + */ + public static native void writeBinaryIndexWithStream(long indexAddress, IndexOutputWithBuffer outputStream); + /** * Writes a faiss index. * @@ -148,6 +170,16 @@ class FaissService { */ public static native void writeByteIndex(long indexAddress, String indexPath); + /** + * Writes a faiss index. + * + * NOTE: This will always free the index. Do not call free after this. + * + * @param indexAddress address of native memory where index is stored + * @param outputStream Stream instance wrapping Lucene IndexOutput with a buffer where native engine writes bytes into it. + */ + public static native void writeByteIndexWithStream(long indexAddress, IndexOutputWithBuffer outputStream); + /** * Create an index for the native library with a provided template index * @@ -213,6 +245,8 @@ public static native void createByteIndexFromTemplate( */ public static native long loadIndex(String indexPath); + public static native long loadIndexWithStream(IndexInputWithBuffer readStream); + /** * Load a binary index into memory * @@ -221,6 +255,8 @@ public static native void createByteIndexFromTemplate( */ public static native long loadBinaryIndex(String indexPath); + public static native long loadBinaryIndexWithStream(IndexInputWithBuffer readStream); + /** * Determine if index contains shared state. 
* diff --git a/src/main/java/org/opensearch/knn/jni/JNIService.java b/src/main/java/org/opensearch/knn/jni/JNIService.java index 94c1ec48ed..6378a5ea2a 100644 --- a/src/main/java/org/opensearch/knn/jni/JNIService.java +++ b/src/main/java/org/opensearch/knn/jni/JNIService.java @@ -16,6 +16,8 @@ import org.opensearch.knn.common.KNNConstants; import org.opensearch.knn.index.engine.KNNEngine; import org.opensearch.knn.index.query.KNNQueryResult; +import org.opensearch.knn.index.util.IndexInputWithBuffer; +import org.opensearch.knn.index.util.IndexOutputWithBuffer; import org.opensearch.knn.index.util.IndexUtil; import java.util.Locale; @@ -113,6 +115,36 @@ public static void writeIndex(String indexPath, long indexAddress, KNNEngine knn ); } + /** + * Writes a faiss index to disk. + * + * @param indexOutputWithBuffer Stream instance wrapping Lucene IndexOutput with a buffer where native engine writes bytes into it. + * @param indexAddress address of native memory where index is stored + * @param knnEngine knn engine + * @param parameters parameters to build index + */ + public static void writeIndex( + IndexOutputWithBuffer indexOutputWithBuffer, + long indexAddress, + KNNEngine knnEngine, + Map parameters + ) { + if (KNNEngine.FAISS == knnEngine) { + if (IndexUtil.isBinaryIndex(knnEngine, parameters)) { + FaissService.writeBinaryIndexWithStream(indexAddress, indexOutputWithBuffer); + } else if (IndexUtil.isByteIndex(parameters)) { + FaissService.writeByteIndexWithStream(indexAddress, indexOutputWithBuffer); + } else { + FaissService.writeIndexWithStream(indexAddress, indexOutputWithBuffer); + } + return; + } + + throw new IllegalArgumentException( + String.format(Locale.ROOT, "writeIndex is not supported for provided engine : %s", knnEngine.getName()) + ); + } + /** * Create an index for the native library. The memory occupied by the vectorsAddress will be freed up during the * function call. So Java layer doesn't need to free up the memory. 
This is not an ideal behavior because Java layer @@ -211,6 +243,24 @@ public static long loadIndex(String indexPath, Map parameters, K ); } + public static long loadIndex(IndexInputWithBuffer readStream, Map parameters, KNNEngine knnEngine) { + if (KNNEngine.NMSLIB == knnEngine) { + throw new UnsupportedOperationException(); + } + + if (KNNEngine.FAISS == knnEngine) { + if (IndexUtil.isBinaryIndex(knnEngine, parameters)) { + return FaissService.loadBinaryIndexWithStream(readStream); + } else { + return FaissService.loadIndexWithStream(readStream); + } + } + + throw new IllegalArgumentException( + String.format(Locale.ROOT, "LoadIndex not supported for provided engine : %s", knnEngine.getName()) + ); + } + /** * Determine if index contains shared state. Currently, we cannot do this in the plugin because we do not store the * model definition anywhere. Only faiss supports indices that have shared state. So for all other engines it will diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java index 385572cb4a..223cb6b21a 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java @@ -43,6 +43,7 @@ public void testAbstract_getKey() { public void testIndexEntryContext_load() throws IOException { NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy = mock(NativeMemoryLoadStrategy.IndexLoadStrategy.class); NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + null, "test", indexLoadStrategy, null, @@ -81,6 +82,7 @@ public void testIndexEntryContext_calculateSize() throws IOException { // Check that the indexEntryContext will return the same thing NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + null, 
tmpFile.toAbsolutePath().toString(), null, null, @@ -93,6 +95,7 @@ public void testIndexEntryContext_calculateSize() throws IOException { public void testIndexEntryContext_getOpenSearchIndexName() { String openSearchIndexName = "test-index"; NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + null, "test", null, null, @@ -105,6 +108,7 @@ public void testIndexEntryContext_getOpenSearchIndexName() { public void testIndexEntryContext_getParameters() { Map parameters = ImmutableMap.of("test-1", 10); NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + null, "test", null, parameters, diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java index 29fbdb9785..3662931f55 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java @@ -12,6 +12,8 @@ package org.opensearch.knn.index.memory; import com.google.common.collect.ImmutableMap; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MMapDirectory; import org.opensearch.core.action.ActionListener; import org.opensearch.action.search.SearchResponse; import org.opensearch.knn.KNNTestCase; @@ -35,8 +37,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -46,6 +46,7 @@ public class NativeMemoryLoadStrategyTests extends KNNTestCase { public void testIndexLoadStrategy_load() throws IOException { // Create basic nmslib HNSW index Path dir = createTempDir(); + Directory luceneDirectory = new 
MMapDirectory(dir); KNNEngine knnEngine = KNNEngine.NMSLIB; String indexName = "test1" + knnEngine.getExtension(); String path = dir.resolve(indexName).toAbsolutePath().toString(); @@ -67,6 +68,7 @@ public void testIndexLoadStrategy_load() throws IOException { NativeMemoryLoadStrategy.IndexLoadStrategy.initialize(resourceWatcherService); NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + luceneDirectory, path, NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(), parameters, @@ -86,6 +88,7 @@ public void testIndexLoadStrategy_load() throws IOException { public void testLoad_whenFaissBinary_thenSuccess() throws IOException { Path dir = createTempDir(); + Directory luceneDirectory = new MMapDirectory(dir); KNNEngine knnEngine = KNNEngine.FAISS; String indexName = "test1" + knnEngine.getExtension(); String path = dir.resolve(indexName).toAbsolutePath().toString(); @@ -115,6 +118,7 @@ public void testLoad_whenFaissBinary_thenSuccess() throws IOException { NativeMemoryLoadStrategy.IndexLoadStrategy.initialize(resourceWatcherService); NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( + luceneDirectory, path, NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(), parameters,