From 12ddea2c3235d7036e7491e7dae990e1a86f70d8 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Wed, 10 Jul 2024 12:59:28 -0700 Subject: [PATCH 01/16] Initial commit with new changes. Adds methods for iteratively creating an index. Signed-off-by: Andrew Klepchick --- jni/cmake/init-faiss.cmake-e | 69 ++++++ jni/include/faiss_index_service.h | 92 +++++++- jni/include/faiss_wrapper.h | 6 + .../org_opensearch_knn_jni_FaissService.h | 24 ++ jni/src/faiss_index_service.cpp | 210 ++++++++++++++++- jni/src/faiss_wrapper.cpp | 214 ++++++++++++++++++ .../org_opensearch_knn_jni_FaissService.cpp | 88 +++++++ .../KNN80Codec/KNN80DocValuesConsumer.java | 121 ++++++++-- .../knn/index/codec/util/KNNCodecUtil.java | 60 ++++- .../org/opensearch/knn/jni/FaissService.java | 12 + .../org/opensearch/knn/jni/JNIService.java | 40 ++++ 11 files changed, 904 insertions(+), 32 deletions(-) create mode 100644 jni/cmake/init-faiss.cmake-e diff --git a/jni/cmake/init-faiss.cmake-e b/jni/cmake/init-faiss.cmake-e new file mode 100644 index 0000000000..bef93eda00 --- /dev/null +++ b/jni/cmake/init-faiss.cmake-e @@ -0,0 +1,69 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +# Check if faiss exists +find_path(FAISS_REPO_DIR NAMES faiss PATHS ${CMAKE_CURRENT_SOURCE_DIR}/external/faiss NO_DEFAULT_PATH) + +# If not, pull the updated submodule +if (NOT EXISTS ${FAISS_REPO_DIR}) + message(STATUS "Could not find faiss. Pulling updated submodule.") + execute_process(COMMAND git submodule update --init -- external/faiss WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) +endif () + +# Check if patch exist, this is to skip git apply during CI build. See CI.yml with ubuntu. 
+find_path(PATCH_FILE NAMES 0001-Custom-patch-to-support-multi-vector.patch 0002-Enable-precomp-table-to-be-shared-ivfpq.patch 0003-Custom-patch-to-support-range-search-params.patch PATHS ${CMAKE_CURRENT_SOURCE_DIR}/patches/faiss NO_DEFAULT_PATH) + +# If it exists, apply patches +if (EXISTS ${PATCH_FILE}) + message(STATUS "Applying custom patches.") + execute_process(COMMAND git ${GIT_PATCH_COMMAND} --3way --ignore-space-change --ignore-whitespace ${CMAKE_CURRENT_SOURCE_DIR}/patches/faiss/0001-Custom-patch-to-support-multi-vector.patch WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/faiss ERROR_VARIABLE ERROR_MSG RESULT_VARIABLE RESULT_CODE) + execute_process(COMMAND git ${GIT_PATCH_COMMAND} --3way --ignore-space-change --ignore-whitespace ${CMAKE_CURRENT_SOURCE_DIR}/patches/faiss/0002-Enable-precomp-table-to-be-shared-ivfpq.patch WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/faiss ERROR_VARIABLE ERROR_MSG RESULT_VARIABLE RESULT_CODE) + execute_process(COMMAND git ${GIT_PATCH_COMMAND} --3way --ignore-space-change --ignore-whitespace ${CMAKE_CURRENT_SOURCE_DIR}/patches/faiss/0003-Custom-patch-to-support-range-search-params.patch WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/faiss ERROR_VARIABLE ERROR_MSG RESULT_VARIABLE RESULT_CODE) + if(RESULT_CODE) + message(FATAL_ERROR "Failed to apply patch:\n${ERROR_MSG}") + endif() +endif() + +if (${CMAKE_SYSTEM_NAME} STREQUAL Darwin) + if(CMAKE_C_COMPILER_ID MATCHES "Clang\$") + set(OpenMP_C_FLAGS "-Xpreprocessor -fopenmp") + set(OpenMP_C_LIB_NAMES "omp") + set(OpenMP_omp_LIBRARY /usr/local/opt/libomp/lib/libomp.dylib) + endif() + + if(CMAKE_CXX_COMPILER_ID MATCHES "Clang\$") + set(OpenMP_CXX_FLAGS "-Xpreprocessor -fopenmp -I/usr/local/opt/libomp/include") + set(OpenMP_CXX_LIB_NAMES "omp") + set(OpenMP_omp_LIBRARY /usr/local/opt/libomp/lib/libomp.dylib) + endif() +endif() + +find_package(ZLIB REQUIRED) + +# Statically link BLAS - ensure this is before we find the blas package so we dont dynamically 
link +set(BLA_STATIC ON) +find_package(BLAS REQUIRED) +enable_language(Fortran) +find_package(LAPACK REQUIRED) + +# Set relevant properties +set(BUILD_TESTING OFF) # Avoid building faiss tests +set(FAISS_ENABLE_GPU OFF) +set(FAISS_ENABLE_PYTHON OFF) + +if(NOT DEFINED SIMD_ENABLED) + set(SIMD_ENABLED true) # set default value as true if the argument is not set +endif() + +if(${CMAKE_SYSTEM_NAME} STREQUAL Windows OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR NOT ${SIMD_ENABLED}) + set(FAISS_OPT_LEVEL generic) # Keep optimization level as generic on Windows OS as it is not supported due to MINGW64 compiler issue. Also, on aarch64 avx2 is not supported. + set(TARGET_LINK_FAISS_LIB faiss) +else() + set(FAISS_OPT_LEVEL avx2) # Keep optimization level as avx2 to improve performance on Linux and Mac. + set(TARGET_LINK_FAISS_LIB faiss_avx2) + string(PREPEND LIB_EXT "_avx2") # Prepend "_avx2" to lib extension to create the library as "libopensearchknn_faiss_avx2.so" on linux and "libopensearchknn_faiss_avx2.jnilib" on mac +endif() + +add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/external/faiss EXCLUDE_FROM_ALL) diff --git a/jni/include/faiss_index_service.h b/jni/include/faiss_index_service.h index 59f15fda9c..05d1b9be35 100644 --- a/jni/include/faiss_index_service.h +++ b/jni/include/faiss_index_service.h @@ -31,8 +31,50 @@ namespace faiss_wrapper { class IndexService { public: IndexService(std::unique_ptr faissMethods); - //TODO Remove dependency on JNIUtilInterface and JNIEnv - //TODO Reduce the number of parameters + /** + * Initialize index + * + * @param jniUtil jni util + * @param env jni environment + * @param metric space type for distance calculation + * @param indexDescription index description to be used by faiss index factory + * @param dim dimension of vectors + * @param numVectors number of vectors + * @param threadCount number of thread count to be used while adding data + * @param parameters parameters to be 
applied to faiss index + * @return pointer to the native index object + */ + virtual jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map parameters); + /** + * Add vectors to index + * + * @param jniUtil jni util + * @param env jni environment + * @param metric space type for distance calculation + * @param indexDescription index description to be used by faiss index factory + * @param dim dimension of vectors + * @param numIds number of vectors + * @param threadCount number of thread count to be used while adding data + * @param vectorsAddress memory address which is holding vector data + * @param idMap a map of document id and vector id + * @param parameters parameters to be applied to faiss index + */ + virtual void insertToIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress, std::unordered_map parameters); + /** + * Write index to disk + * + * @param jniUtil jni util + * @param env jni environment + * @param metric space type for distance calculation + * @param indexDescription index description to be used by faiss index factory + * @param threadCount number of thread count to be used while adding data + * @param indexPath path to write index + * @param idMap a map of document id and vector id + * @param parameters parameters to be applied to faiss index + */ + virtual void writeIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int threadCount, std::string indexPath, jlong idMapAddress, std::unordered_map parameters); + // TODO Remove dependency on JNIUtilInterface and JNIEnv + // TODO Reduce the number of parameters /** * Create index @@ -58,7 +100,7 @@ class IndexService { int numIds, int threadCount, int64_t 
vectorsAddress, - std::vector ids, + std::vector & ids, std::string indexPath, std::unordered_map parameters); virtual ~IndexService() = default; @@ -75,6 +117,48 @@ class BinaryIndexService : public IndexService { //TODO Remove dependency on JNIUtilInterface and JNIEnv //TODO Reduce the number of parameters BinaryIndexService(std::unique_ptr faissMethods); + /** + * Initialize index + * + * @param jniUtil jni util + * @param env jni environment + * @param metric space type for distance calculation + * @param indexDescription index description to be used by faiss index factory + * @param dim dimension of vectors + * @param numVectors number of vectors + * @param threadCount number of thread count to be used while adding data + * @param parameters parameters to be applied to faiss index + * @return pointer to the native index object + */ + virtual jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map parameters) override; + /** + * Add vectors to index + * + * @param jniUtil jni util + * @param env jni environment + * @param metric space type for distance calculation + * @param indexDescription index description to be used by faiss index factory + * @param dim dimension of vectors + * @param numIds number of vectors + * @param threadCount number of thread count to be used while adding data + * @param vectorsAddress memory address which is holding vector data + * @param idMap a map of document id and vector id + * @param parameters parameters to be applied to faiss index + */ + virtual void insertToIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress, std::unordered_map parameters) override; + /** + * Write index to disk + * + * @param jniUtil jni util + * @param env jni environment + * 
@param metric space type for distance calculation + * @param indexDescription index description to be used by faiss index factory + * @param threadCount number of thread count to be used while adding data + * @param indexPath path to write index + * @param idMap a map of document id and vector id + * @param parameters parameters to be applied to faiss index + */ + virtual void writeIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int threadCount, std::string indexPath, jlong idMapAddress, std::unordered_map parameters) override; /** * Create binary index * @@ -99,7 +183,7 @@ class BinaryIndexService : public IndexService { int numIds, int threadCount, int64_t vectorsAddress, - std::vector ids, + std::vector & ids, std::string indexPath, std::unordered_map parameters ) override; diff --git a/jni/include/faiss_wrapper.h b/jni/include/faiss_wrapper.h index 5ac17cfd13..fa120f9dcc 100644 --- a/jni/include/faiss_wrapper.h +++ b/jni/include/faiss_wrapper.h @@ -18,6 +18,12 @@ namespace knn_jni { namespace faiss_wrapper { + jlong InitIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jlong numDocs, jint dimJ, jobject parametersJ, IndexService *indexService); + + void InsertToIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, jlong index_ptr, jobject parametersJ, IndexService *indexService); + + void WriteIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jstring indexPathJ, jlong index_ptr, jobject parametersJ, IndexService *indexService); + // Create an index with ids and vectors. The configuration is defined by values in the Java map, parametersJ. // The index is serialized to indexPathJ. 
void CreateIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, diff --git a/jni/include/org_opensearch_knn_jni_FaissService.h b/jni/include/org_opensearch_knn_jni_FaissService.h index 3d6aef45c4..801cfedde2 100644 --- a/jni/include/org_opensearch_knn_jni_FaissService.h +++ b/jni/include/org_opensearch_knn_jni_FaissService.h @@ -19,6 +19,30 @@ extern "C" { #endif +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_initIndex(JNIEnv * env, jclass cls, + jlong numDocs, jint dimJ, + jobject parametersJ); + +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_initBinaryIndex(JNIEnv * env, jclass cls, + jlong numDocs, jint dimJ, + jobject parametersJ); + +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_insertToIndex(JNIEnv * env, jclass cls, jintArray idsJ, + jlong vectorsAddressJ, jint dimJ, + jlong indexAddress, jobject parametersJ); + +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_insertToBinaryIndex(JNIEnv * env, jclass cls, jintArray idsJ, + jlong vectorsAddressJ, jint dimJ, + jlong indexAddress, jobject parametersJ); + +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeIndex(JNIEnv * env, jclass cls, + jlong indexAddress, + jstring indexPathJ, jobject parametersJ); + +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeBinaryIndex(JNIEnv * env, jclass cls, + jlong indexAddress, + jstring indexPathJ, jobject parametersJ); + /* * Class: org_opensearch_knn_jni_FaissService * Method: createIndex diff --git a/jni/src/faiss_index_service.cpp b/jni/src/faiss_index_service.cpp index 8c5ba36af2..80782f76ac 100644 --- a/jni/src/faiss_index_service.cpp +++ b/jni/src/faiss_index_service.cpp @@ -57,6 +57,109 @@ void SetExtraParameters(knn_jni::JNIUtilInterface * jniUtil, JNIEnv *env, IndexService::IndexService(std::unique_ptr faissMethods) : faissMethods(std::move(faissMethods)) {} +jlong IndexService::initIndex( + 
knn_jni::JNIUtilInterface * jniUtil, + JNIEnv * env, + faiss::MetricType metric, + std::string indexDescription, + int dim, + int numVectors, + int threadCount, + std::unordered_map parameters + ) { + // Removed the check for number of vectors + // Don't use unique_ptr here since we want to access the index in the future. + faiss::Index * indexWriter(faissMethods->indexFactory(dim, indexDescription.c_str(), metric)); + + // Set thread count if it is passed in as a parameter. Setting this variable will only impact the current thread + if(threadCount != 0) { + omp_set_num_threads(threadCount); + } + + // Add extra parameters that cant be configured with the index factory + SetExtraParameters(jniUtil, env, parameters, indexWriter); + + // Check that the index does not need to be trained + if(!indexWriter->is_trained) { + throw std::runtime_error("Index is not trained"); + } + + // Add vectors + faiss::IndexIDMap * idMap(faissMethods->indexIdMap(indexWriter)); + + faiss::IndexHNSW * hnsw = dynamic_cast(idMap->index); + + if(hnsw != NULL) { + faiss::IndexFlat * storage = dynamic_cast(hnsw->storage); + if(storage != NULL) { + storage->codes.reserve(dim * numVectors); + } + } + + return (jlong)idMap; +} + +void IndexService::insertToIndex( + knn_jni::JNIUtilInterface * jniUtil, + JNIEnv * env, + faiss::MetricType metric, + std::string indexDescription, + int dim, + int numIds, + int threadCount, + int64_t vectorsAddress, + std::vector & ids, + jlong idMapAddress, + std::unordered_map parameters + ) { + // Read vectors from memory address (unique ptr since we want to remove from memory after use) + std::unique_ptr> inputVectors (reinterpret_cast*>(vectorsAddress)); + + // The number of vectors can be int here because a lucene segment number of total docs never crosses INT_MAX value + int numVectors = (int) (inputVectors->size() / (uint64_t) dim); + if(numVectors == 0) { + throw std::runtime_error("Number of vectors cannot be 0"); + } + + if (numIds != numVectors) { + throw 
std::runtime_error("Number of IDs does not match number of vectors"); + } + + // Set thread count if it is passed in as a parameter. Setting this variable will only impact the current thread + if(threadCount != 0) { + omp_set_num_threads(threadCount); + } + + faiss::IndexIDMap * idMap = reinterpret_cast (idMapAddress); + + // Add vectors + idMap->add_with_ids(numVectors, inputVectors->data(), ids.data()); +} + +void IndexService::writeIndex( + knn_jni::JNIUtilInterface * jniUtil, + JNIEnv * env, + faiss::MetricType metric, + std::string indexDescription, + int threadCount, + std::string indexPath, + jlong idMapAddress, + std::unordered_map parameters + ) { + // Set thread count if it is passed in as a parameter. Setting this variable will only impact the current thread + if(threadCount != 0) { + omp_set_num_threads(threadCount); + } + + faiss::IndexIDMap * idMap = reinterpret_cast (idMapAddress); + + // Write the index to disk + faissMethods->writeIndex(idMap, indexPath.c_str()); + + // Free the memory used by the index + delete idMap; +} + void IndexService::createIndex( knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, @@ -66,7 +169,7 @@ void IndexService::createIndex( int numIds, int threadCount, int64_t vectorsAddress, - std::vector ids, + std::vector & ids, std::string indexPath, std::unordered_map parameters ) { @@ -108,6 +211,109 @@ void IndexService::createIndex( BinaryIndexService::BinaryIndexService(std::unique_ptr faissMethods) : IndexService(std::move(faissMethods)) {} +jlong BinaryIndexService::initIndex( + knn_jni::JNIUtilInterface * jniUtil, + JNIEnv * env, + faiss::MetricType metric, + std::string indexDescription, + int dim, + int numVectors, + int threadCount, + std::unordered_map parameters + ) { + // Removed the check for number of vectors + // Don't use unique_ptr here since we want to access the index in the future. 
+ faiss::IndexBinary * indexWriter(faissMethods->indexBinaryFactory(dim, indexDescription.c_str(), metric)); + + // Set thread count if it is passed in as a parameter. Setting this variable will only impact the current thread + if(threadCount != 0) { + omp_set_num_threads(threadCount); + } + + // Add extra parameters that cant be configured with the index factory + SetExtraParameters(jniUtil, env, parameters, indexWriter); + + // Check that the index does not need to be trained + if(!indexWriter->is_trained) { + throw std::runtime_error("Index is not trained"); + } + + // Add vectors + faiss::IndexBinaryIDMap * idMap(faissMethods->indexBinaryIdMap(indexWriter)); + + faiss::IndexBinaryHNSW * hnsw = dynamic_cast(idMap->index); + + if(hnsw != NULL) { + faiss::IndexBinaryFlat * storage = dynamic_cast(hnsw->storage); + if(storage != NULL) { + storage->xb.reserve(dim / 8 * numVectors); + } + } + + return (jlong)idMap; +} + +void BinaryIndexService::insertToIndex( + knn_jni::JNIUtilInterface * jniUtil, + JNIEnv * env, + faiss::MetricType metric, + std::string indexDescription, + int dim, + int numIds, + int threadCount, + int64_t vectorsAddress, + std::vector & ids, + jlong idMapAddress, + std::unordered_map parameters + ) { + // Read vectors from memory address (unique ptr since we want to remove from memory after use) + std::unique_ptr> inputVectors (reinterpret_cast*>(vectorsAddress)); + + // The number of vectors can be int here because a lucene segment number of total docs never crosses INT_MAX value + int numVectors = (int) (inputVectors->size() / (uint64_t) (dim / 8)); + if(numVectors == 0) { + throw std::runtime_error("Number of vectors cannot be 0"); + } + + if (numIds != numVectors) { + throw std::runtime_error("Number of IDs does not match number of vectors"); + } + + // Set thread count if it is passed in as a parameter. 
Setting this variable will only impact the current thread + if(threadCount != 0) { + omp_set_num_threads(threadCount); + } + + faiss::IndexBinaryIDMap * idMap = reinterpret_cast (idMapAddress); + + // Add vectors + idMap->add_with_ids(numVectors, inputVectors->data(), ids.data()); +} + +void BinaryIndexService::writeIndex( + knn_jni::JNIUtilInterface * jniUtil, + JNIEnv * env, + faiss::MetricType metric, + std::string indexDescription, + int threadCount, + std::string indexPath, + jlong idMapAddress, + std::unordered_map parameters + ) { + // Set thread count if it is passed in as a parameter. Setting this variable will only impact the current thread + if(threadCount != 0) { + omp_set_num_threads(threadCount); + } + + faiss::IndexBinaryIDMap * idMap = reinterpret_cast (idMapAddress); + + // Write the index to disk + faissMethods->writeIndexBinary(idMap, indexPath.c_str()); + + // Free the memory used by the index + delete idMap; +} + void BinaryIndexService::createIndex( knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, @@ -117,7 +323,7 @@ void BinaryIndexService::createIndex( int numIds, int threadCount, int64_t vectorsAddress, - std::vector ids, + std::vector & ids, std::string indexPath, std::unordered_map parameters ) { diff --git a/jni/src/faiss_wrapper.cpp b/jni/src/faiss_wrapper.cpp index c4c6e18eb5..b599a304bb 100644 --- a/jni/src/faiss_wrapper.cpp +++ b/jni/src/faiss_wrapper.cpp @@ -85,6 +85,220 @@ bool isIndexIVFPQL2(faiss::Index * index); // IndexIDMap which has member that will point to underlying index that stores the data faiss::IndexIVFPQ * extractIVFPQIndex(faiss::Index * index); +jlong knn_jni::faiss_wrapper::InitIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jlong numDocs, jint dimJ, + jobject parametersJ, IndexService* indexService) { + + if(dimJ <= 0) { + throw std::runtime_error("Vectors dimensions cannot be less than or equal to 0"); + } + + if (parametersJ == nullptr) { + throw std::runtime_error("Parameters cannot be null"); + } + 
+ // parametersJ is a Java Map. ConvertJavaMapToCppMap converts it to a c++ map + // so that it is easier to access. + auto parametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersJ); + + // Parameters to pass + // Metric type + jobject spaceTypeJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::SPACE_TYPE); + std::string spaceTypeCpp(jniUtil->ConvertJavaObjectToCppString(env, spaceTypeJ)); + faiss::MetricType metric = TranslateSpaceToMetric(spaceTypeCpp); + jniUtil->DeleteLocalRef(env, spaceTypeJ); + + // Dimension + int dim = (int)dimJ; + + // Number of docs + int docs = (int)numDocs; + + // Index description + jobject indexDescriptionJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::INDEX_DESCRIPTION); + std::string indexDescriptionCpp(jniUtil->ConvertJavaObjectToCppString(env, indexDescriptionJ)); + jniUtil->DeleteLocalRef(env, indexDescriptionJ); + + // Thread count + int threadCount = 0; + if(parametersCpp.find(knn_jni::INDEX_THREAD_QUANTITY) != parametersCpp.end()) { + threadCount = jniUtil->ConvertJavaObjectToCppInteger(env, parametersCpp[knn_jni::INDEX_THREAD_QUANTITY]); + } + + // Extra parameters + // TODO: parse the entire map and remove jni object + std::unordered_map subParametersCpp; + if(parametersCpp.find(knn_jni::PARAMETERS) != parametersCpp.end()) { + subParametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersCpp[knn_jni::PARAMETERS]); + } + // end parameters to pass + + // Create index + return indexService->initIndex(jniUtil, env, metric, indexDescriptionCpp, dim, numDocs, threadCount, subParametersCpp); +} + +jlong knn_jni::faiss_wrapper::InitIndexFromTemplate(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jlong numDocs, jint dimJ, + jobject parametersJ, jbyteArray templateIndexJ, IndexService* indexService) { + + if(dimJ <= 0) { + throw std::runtime_error("Vectors dimensions cannot be less than or equal to 0"); + } + + if (parametersJ == nullptr) { + throw std::runtime_error("Parameters cannot be 
null"); + } + + // parametersJ is a Java Map. ConvertJavaMapToCppMap converts it to a c++ map + // so that it is easier to access. + auto parametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersJ); + + // Parameters to pass + // Metric type + jobject spaceTypeJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::SPACE_TYPE); + std::string spaceTypeCpp(jniUtil->ConvertJavaObjectToCppString(env, spaceTypeJ)); + faiss::MetricType metric = TranslateSpaceToMetric(spaceTypeCpp); + jniUtil->DeleteLocalRef(env, spaceTypeJ); + + // Dimension + int dim = (int)dimJ; + + // Number of docs + int docs = (int)numDocs; + + // Index description + jobject indexDescriptionJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::INDEX_DESCRIPTION); + std::string indexDescriptionCpp(jniUtil->ConvertJavaObjectToCppString(env, indexDescriptionJ)); + jniUtil->DeleteLocalRef(env, indexDescriptionJ); + + // Thread count + int threadCount = 0; + if(parametersCpp.find(knn_jni::INDEX_THREAD_QUANTITY) != parametersCpp.end()) { + threadCount = jniUtil->ConvertJavaObjectToCppInteger(env, parametersCpp[knn_jni::INDEX_THREAD_QUANTITY]); + } + + // Extra parameters + // TODO: parse the entire map and remove jni object + std::unordered_map subParametersCpp; + if(parametersCpp.find(knn_jni::PARAMETERS) != parametersCpp.end()) { + subParametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersCpp[knn_jni::PARAMETERS]); + } + // end parameters to pass + + // Create index + return indexService->initIndex(jniUtil, env, metric, indexDescriptionCpp, dim, numDocs, threadCount, subParametersCpp); +} + +void knn_jni::faiss_wrapper::InsertToIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, + jlong index_ptr, jobject parametersJ, IndexService* indexService) { + if (idsJ == nullptr) { + throw std::runtime_error("IDs cannot be null"); + } + + if (vectorsAddressJ <= 0) { + throw std::runtime_error("VectorsAddress cannot be less than 
0"); + } + + if(dimJ <= 0) { + throw std::runtime_error("Vectors dimensions cannot be less than or equal to 0"); + } + + if (parametersJ == nullptr) { + throw std::runtime_error("Parameters cannot be null"); + } + + // parametersJ is a Java Map. ConvertJavaMapToCppMap converts it to a c++ map + // so that it is easier to access. + auto parametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersJ); + + // Parameters to pass + // Metric type + jobject spaceTypeJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::SPACE_TYPE); + std::string spaceTypeCpp(jniUtil->ConvertJavaObjectToCppString(env, spaceTypeJ)); + faiss::MetricType metric = TranslateSpaceToMetric(spaceTypeCpp); + jniUtil->DeleteLocalRef(env, spaceTypeJ); + + // Dimension + int dim = (int)dimJ; + + // Number of vectors + int numIds = jniUtil->GetJavaIntArrayLength(env, idsJ); + + // Index description + jobject indexDescriptionJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::INDEX_DESCRIPTION); + std::string indexDescriptionCpp(jniUtil->ConvertJavaObjectToCppString(env, indexDescriptionJ)); + jniUtil->DeleteLocalRef(env, indexDescriptionJ); + + // Thread count + int threadCount = 0; + if(parametersCpp.find(knn_jni::INDEX_THREAD_QUANTITY) != parametersCpp.end()) { + threadCount = jniUtil->ConvertJavaObjectToCppInteger(env, parametersCpp[knn_jni::INDEX_THREAD_QUANTITY]); + } + + // Vectors address + int64_t vectorsAddress = (int64_t)vectorsAddressJ; + + // Ids + auto ids = jniUtil->ConvertJavaIntArrayToCppIntVector(env, idsJ); + + // Extra parameters + // TODO: parse the entire map and remove jni object + std::unordered_map subParametersCpp; + if(parametersCpp.find(knn_jni::PARAMETERS) != parametersCpp.end()) { + subParametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersCpp[knn_jni::PARAMETERS]); + } + // end parameters to pass + + // Create index + indexService->insertToIndex(jniUtil, env, metric, indexDescriptionCpp, dim, numIds, threadCount, vectorsAddress, ids, index_ptr, 
subParametersCpp); +} + +void knn_jni::faiss_wrapper::WriteIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, + jstring indexPathJ, jlong index_ptr, jobject parametersJ, IndexService* indexService) { + + if (indexPathJ == nullptr) { + throw std::runtime_error("Index path cannot be null"); + } + + if (parametersJ == nullptr) { + throw std::runtime_error("Parameters cannot be null"); + } + + // parametersJ is a Java Map. ConvertJavaMapToCppMap converts it to a c++ map + // so that it is easier to access. + auto parametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersJ); + + // Parameters to pass + // Metric type + jobject spaceTypeJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::SPACE_TYPE); + std::string spaceTypeCpp(jniUtil->ConvertJavaObjectToCppString(env, spaceTypeJ)); + faiss::MetricType metric = TranslateSpaceToMetric(spaceTypeCpp); + jniUtil->DeleteLocalRef(env, spaceTypeJ); + + // Index description + jobject indexDescriptionJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::INDEX_DESCRIPTION); + std::string indexDescriptionCpp(jniUtil->ConvertJavaObjectToCppString(env, indexDescriptionJ)); + jniUtil->DeleteLocalRef(env, indexDescriptionJ); + + // Thread count + int threadCount = 0; + if(parametersCpp.find(knn_jni::INDEX_THREAD_QUANTITY) != parametersCpp.end()) { + threadCount = jniUtil->ConvertJavaObjectToCppInteger(env, parametersCpp[knn_jni::INDEX_THREAD_QUANTITY]); + } + + // Index path + std::string indexPathCpp(jniUtil->ConvertJavaStringToCppString(env, indexPathJ)); + + // Extra parameters + // TODO: parse the entire map and remove jni object + std::unordered_map subParametersCpp; + if(parametersCpp.find(knn_jni::PARAMETERS) != parametersCpp.end()) { + subParametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersCpp[knn_jni::PARAMETERS]); + } + // end parameters to pass + + // Create index + indexService->writeIndex(jniUtil, env, metric, indexDescriptionCpp, threadCount, indexPathCpp, index_ptr, 
subParametersCpp); +} + void knn_jni::faiss_wrapper::CreateIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, jstring indexPathJ, jobject parametersJ, IndexService* indexService) { if (idsJ == nullptr) { diff --git a/jni/src/org_opensearch_knn_jni_FaissService.cpp b/jni/src/org_opensearch_knn_jni_FaissService.cpp index 5f9c83ea86..d6693b7546 100644 --- a/jni/src/org_opensearch_knn_jni_FaissService.cpp +++ b/jni/src/org_opensearch_knn_jni_FaissService.cpp @@ -39,6 +39,94 @@ void JNI_OnUnload(JavaVM *vm, void *reserved) { jniUtil.Uninitialize(env); } +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_initIndex(JNIEnv * env, jclass cls, jintArray idsJ, + jlong numDocs, jint dimJ, + jobject parametersJ) +{ + try { + std::unique_ptr faissMethods(new knn_jni::faiss_wrapper::FaissMethods()); + knn_jni::faiss_wrapper::IndexService indexService(std::move(faissMethods)); + return knn_jni::faiss_wrapper::InitIndex(&jniUtil, env, numDocs, dimJ, parametersJ, &indexService); + } catch (...) { + jniUtil.CatchCppExceptionAndThrowJava(env); + } +} + +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_initBinaryIndex(JNIEnv * env, jclass cls, jintArray idsJ, + jlong numDocs, jint dimJ, + jobject parametersJ) +{ + try { + std::unique_ptr faissMethods(new knn_jni::faiss_wrapper::FaissMethods()); + knn_jni::faiss_wrapper::BinaryIndexService binaryIndexService(std::move(faissMethods)); + return knn_jni::faiss_wrapper::InitIndex(&jniUtil, env, numDocs, dimJ, parametersJ, &binaryIndexService); + } catch (...) 
{ + jniUtil.CatchCppExceptionAndThrowJava(env); + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_insertToIndex(JNIEnv * env, jclass cls, jintArray idsJ, + jlong vectorsAddressJ, jint dimJ, + jlong indexAddress, jobject parametersJ) +{ + try { + std::unique_ptr faissMethods(new knn_jni::faiss_wrapper::FaissMethods()); + knn_jni::faiss_wrapper::IndexService indexService(std::move(faissMethods)); + knn_jni::faiss_wrapper::InsertToIndex(&jniUtil, env, idsJ, vectorsAddressJ, dimJ, indexAddress, parametersJ, &indexService); + + // Releasing the vectorsAddressJ memory as that is not required once we have created the index. + // This is not the ideal approach, please refer this gh issue for long term solution: + // https://github.com/opensearch-project/k-NN/issues/1600 + delete reinterpret_cast*>(vectorsAddressJ); + } catch (...) { + jniUtil.CatchCppExceptionAndThrowJava(env); + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_insertToBinaryIndex(JNIEnv * env, jclass cls, jintArray idsJ, + jlong vectorsAddressJ, jint dimJ, + jlong indexAddress, jobject parametersJ) +{ + try { + std::unique_ptr faissMethods(new knn_jni::faiss_wrapper::FaissMethods()); + knn_jni::faiss_wrapper::BinaryIndexService binaryIndexService(std::move(faissMethods)); + knn_jni::faiss_wrapper::InsertToIndex(&jniUtil, env, idsJ, vectorsAddressJ, dimJ, indexAddress, parametersJ, &binaryIndexService); + + // Releasing the vectorsAddressJ memory as that is not required once we have created the index. + // This is not the ideal approach, please refer this gh issue for long term solution: + // https://github.com/opensearch-project/k-NN/issues/1600 + delete reinterpret_cast*>(vectorsAddressJ); + } catch (...) 
{ + jniUtil.CatchCppExceptionAndThrowJava(env); + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeIndex(JNIEnv * env, jclass cls, + jlong indexAddress, + jstring indexPathJ, jobject parametersJ) +{ + try { + std::unique_ptr faissMethods(new knn_jni::faiss_wrapper::FaissMethods()); + knn_jni::faiss_wrapper::IndexService indexService(std::move(faissMethods)); + knn_jni::faiss_wrapper::WriteIndex(&jniUtil, env, indexPathJ, indexAddress, parametersJ, &indexService); + } catch (...) { + jniUtil.CatchCppExceptionAndThrowJava(env); + } +} + +JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_writeBinaryIndex(JNIEnv * env, jclass cls, + jlong indexAddress, + jstring indexPathJ, jobject parametersJ) +{ + try { + std::unique_ptr faissMethods(new knn_jni::faiss_wrapper::FaissMethods()); + knn_jni::faiss_wrapper::BinaryIndexService binaryIndexService(std::move(faissMethods)); + knn_jni::faiss_wrapper::WriteIndex(&jniUtil, env, indexPathJ, indexAddress, parametersJ, &binaryIndexService); + } catch (...) 
{ + jniUtil.CatchCppExceptionAndThrowJava(env); + } +} + JNIEXPORT void JNICALL Java_org_opensearch_knn_jni_FaissService_createIndex(JNIEnv * env, jclass cls, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, jstring indexPathJ, jobject parametersJ) diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java index 2e5c780768..27b7e61693 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java @@ -109,17 +109,16 @@ private KNNEngine getKNNEngine(@NonNull FieldInfo field) { public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer, boolean isMerge, boolean isRefresh) throws IOException { // Get values to be indexed - BinaryDocValues values = valuesProducer.getBinary(field); - KNNCodecUtil.Pair pair = KNNCodecUtil.getFloats(values); - if (pair.getVectorAddress() == 0 || pair.docs.length == 0) { - logger.info("Skipping engine index creation as there are no vectors or docs in the segment"); + if (field == null) { + logger.info("Field is null!\n"); return; } - long arraySize = calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); + BinaryDocValues values = valuesProducer.getBinary(field); + long num_docs = KNNCodecUtil.getTotalLiveDocsCount(values); + long totalArraySize = 0; + long totalDocsIncrement = 0; if (isMerge) { KNNGraphValue.MERGE_CURRENT_OPERATIONS.increment(); - KNNGraphValue.MERGE_CURRENT_DOCS.incrementBy(pair.docs.length); - KNNGraphValue.MERGE_CURRENT_SIZE_IN_BYTES.incrementBy(arraySize); } // Increment counter for number of graph index requests KNNCounter.GRAPH_INDEX_REQUESTS.increment(); @@ -134,32 +133,54 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer, ((FSDirectory) 
(FilterDirectory.unwrap(state.directory))).getDirectory().toString(), engineFileName ).toString(); - NativeIndexCreator indexCreator; // Create library index either from model or from scratch + + // This is a bit of a hack. We have to create an output here and then immediately close it to ensure that + // engineFileName is added to the tracked files by Lucene's TrackingDirectoryWrapper. Otherwise, the file will + // not be marked as added to the directory. + state.directory.createOutput(engineFileName, state.context).close(); if (field.attributes().containsKey(MODEL_ID)) { String modelId = field.attributes().get(MODEL_ID); Model model = ModelCache.getInstance().get(modelId); if (model.getModelBlob() == null) { throw new RuntimeException(String.format("There is no trained model with id \"%s\"", modelId)); } - indexCreator = () -> createKNNIndexFromTemplate(model.getModelBlob(), pair, knnEngine, indexPath); + KNNCodecUtil.VectorBatch pair = KNNCodecUtil.getFloats(values); + createKNNIndexFromTemplate(model.getModelBlob(), pair, knnEngine, indexPath); + totalDocsIncrement += pair.docs.length; + totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); } else { - indexCreator = () -> createKNNIndexFromScratch(field, pair, knnEngine, indexPath); + if (KNNEngine.FAISS == knnEngine) { + KNNCodecUtil.VectorBatch pair = KNNCodecUtil.getFloatsBatch(values); + long indexAddress = initIndexFromScratch(field, num_docs, pair.getDimension(), knnEngine); + while (true) { + if (pair.docs.length != 0) insertToIndex(pair, knnEngine, indexAddress); + if (isMerge) { + totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); + totalDocsIncrement += pair.docs.length; + } + if (pair.finished) { + break; + } + pair = KNNCodecUtil.getFloatsBatch(values); + } + writeIndex(indexAddress, indexPath, knnEngine); + } else { + // Note that iterative graph construction has not yet been implemented for nmslib + 
KNNCodecUtil.VectorBatch pair = KNNCodecUtil.getFloats(values); + createKNNIndexFromScratch(field, pair, knnEngine, indexPath); + totalDocsIncrement += pair.docs.length; + totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); + } } - if (isMerge) { - recordMergeStats(pair.docs.length, arraySize); + KNNGraphValue.MERGE_CURRENT_DOCS.incrementBy(totalDocsIncrement); + KNNGraphValue.MERGE_CURRENT_SIZE_IN_BYTES.incrementBy(totalArraySize); + recordMergeStats((int) totalDocsIncrement, totalArraySize); } - if (isRefresh) { recordRefreshStats(); } - - // This is a bit of a hack. We have to create an output here and then immediately close it to ensure that - // engineFileName is added to the tracked files by Lucene's TrackingDirectoryWrapper. Otherwise, the file will - // not be marked as added to the directory. - state.directory.createOutput(engineFileName, state.context).close(); - indexCreator.createIndex(); writeFooter(indexPath, engineFileName); } @@ -176,7 +197,65 @@ private void recordRefreshStats() { KNNGraphValue.REFRESH_TOTAL_OPERATIONS.increment(); } - private void createKNNIndexFromTemplate(byte[] model, KNNCodecUtil.Pair pair, KNNEngine knnEngine, String indexPath) { + private long initIndexFromScratch(FieldInfo fieldInfo, long size, int dim, KNNEngine knnEngine) throws IOException { + Map parameters = new HashMap<>(); + Map fieldAttributes = fieldInfo.attributes(); + String parametersString = fieldAttributes.get(KNNConstants.PARAMETERS); + + // parametersString will be null when legacy mapper is used + if (parametersString == null) { + parameters.put(KNNConstants.SPACE_TYPE, fieldAttributes.getOrDefault(KNNConstants.SPACE_TYPE, SpaceType.DEFAULT.getValue())); + + String efConstruction = fieldAttributes.get(KNNConstants.HNSW_ALGO_EF_CONSTRUCTION); + Map algoParams = new HashMap<>(); + if (efConstruction != null) { + algoParams.put(KNNConstants.METHOD_PARAMETER_EF_CONSTRUCTION, Integer.parseInt(efConstruction)); + } 
+ + String m = fieldAttributes.get(KNNConstants.HNSW_ALGO_M); + if (m != null) { + algoParams.put(KNNConstants.METHOD_PARAMETER_M, Integer.parseInt(m)); + } + parameters.put(PARAMETERS, algoParams); + } else { + parameters.putAll( + XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + new BytesArray(parametersString), + MediaTypeRegistry.getDefaultMediaType() + ).map() + ); + } + + // Used to determine how many threads to use when indexing + parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY)); + + // Pass the path for the nms library to save the file + return AccessController.doPrivileged((PrivilegedAction) () -> { + return JNIService.initIndexFromScratch(size, dim, parameters, knnEngine); + }); + } + + private void insertToIndex(KNNCodecUtil.VectorBatch pair, KNNEngine knnEngine, long indexAddress) { + Map parameters = ImmutableMap.of( + KNNConstants.INDEX_THREAD_QTY, + KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY) + ); + AccessController.doPrivileged((PrivilegedAction) () -> { + JNIService.insertToIndex(pair.docs, pair.getVectorAddress(), pair.getDimension(), parameters, indexAddress, knnEngine); + return null; + }); + } + + private void writeIndex(long indexAddress, String indexPath, KNNEngine knnEngine) { + AccessController.doPrivileged((PrivilegedAction) () -> { + JNIService.writeIndex(indexPath, indexAddress, knnEngine); + return null; + }); + } + + private void createKNNIndexFromTemplate(byte[] model, KNNCodecUtil.VectorBatch pair, KNNEngine knnEngine, String indexPath) { Map parameters = ImmutableMap.of( KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY) @@ -195,7 +274,7 @@ private void createKNNIndexFromTemplate(byte[] model, KNNCodecUtil.Pair pair, KN }); } - private void createKNNIndexFromScratch(FieldInfo fieldInfo, 
KNNCodecUtil.Pair pair, KNNEngine knnEngine, String indexPath) + private void createKNNIndexFromScratch(FieldInfo fieldInfo, KNNCodecUtil.VectorBatch pair, KNNEngine knnEngine, String indexPath) throws IOException { Map parameters = new HashMap<>(); Map fieldAttributes = fieldInfo.attributes(); diff --git a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java index 0a000cadb1..f6cceab903 100644 --- a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java +++ b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java @@ -31,7 +31,7 @@ public class KNNCodecUtil { public static final int JAVA_ROUNDING_NUMBER = 8; @AllArgsConstructor - public static final class Pair { + public static final class VectorBatch { public int[] docs; @Getter @Setter @@ -40,10 +40,10 @@ public static final class Pair { @Setter private int dimension; public SerializationMode serializationMode; - + public boolean finished; } - public static KNNCodecUtil.Pair getFloats(BinaryDocValues values) throws IOException { + public static KNNCodecUtil.VectorBatch getFloats(BinaryDocValues values) throws IOException { List vectorList = new ArrayList<>(); List docIdList = new ArrayList<>(); long vectorAddress = 0; @@ -90,7 +90,57 @@ public static KNNCodecUtil.Pair getFloats(BinaryDocValues values) throws IOExcep if (vectorList.isEmpty() == false) { vectorAddress = JNICommons.storeVectorData(vectorAddress, vectorList.toArray(new float[][] {}), totalLiveDocs * dimension); } - return new KNNCodecUtil.Pair(docIdList.stream().mapToInt(Integer::intValue).toArray(), vectorAddress, dimension, serializationMode); + return new KNNCodecUtil.VectorBatch(docIdList.stream().mapToInt(Integer::intValue).toArray(), vectorAddress, dimension, serializationMode, true); + } + + public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) throws IOException { + List vectorList = new ArrayList<>(); + List 
docIdList = new ArrayList<>(); + long vectorAddress = 0; + int dimension = 0; + SerializationMode serializationMode = SerializationMode.COLLECTION_OF_FLOATS; + + long totalLiveDocs = getTotalLiveDocsCount(values); + long vectorsStreamingMemoryLimit = KNNSettings.getVectorStreamingMemoryLimit().getBytes(); + long vectorsPerTransfer = Integer.MIN_VALUE; + + for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { + BytesRef bytesref = values.binaryValue(); + try (ByteArrayInputStream byteStream = new ByteArrayInputStream(bytesref.bytes, bytesref.offset, bytesref.length)) { + serializationMode = KNNVectorSerializerFactory.serializerModeFromStream(byteStream); + final KNNVectorSerializer vectorSerializer = KNNVectorSerializerFactory.getSerializerByStreamContent(byteStream); + final float[] vector = vectorSerializer.byteToFloatArray(byteStream); + dimension = vector.length; + + if (vectorsPerTransfer == Integer.MIN_VALUE) { + // if vectorsStreamingMemoryLimit is 100 bytes and we have 50 vectors with 5 dimension, then per + // transfer we have to send 100/(5 * 4) => 20 vectors. + vectorsPerTransfer = vectorsStreamingMemoryLimit / ((long) dimension * Float.BYTES); + // If vectorsPerTransfer comes out to be 0, then we set number of vectors per transfer to 1, to ensure that + // we are sending minimum number of vectors. + if (vectorsPerTransfer == 0) { + vectorsPerTransfer = 1; + } + } + docIdList.add(doc); + vectorList.add(vector); + if (vectorList.size() == vectorsPerTransfer) { + vectorAddress = JNICommons.storeVectorData( + vectorAddress, + vectorList.toArray(new float[][] {}), + totalLiveDocs * dimension + ); + // We should probably come up with a better way to reuse the vectorList memory which we have + // created. Problem here is doing like this can lead to a lot of list memory which is of no use and + // will be garbage collected later on, but it creates pressure on JVM. We should revisit this. 
+ return new KNNCodecUtil.VectorBatch(docIdList.stream().mapToInt(Integer::intValue).toArray(), vectorAddress, dimension, serializationMode, false); + } + } + } + if (vectorList.isEmpty() == false) { + vectorAddress = JNICommons.storeVectorData(vectorAddress, vectorList.toArray(new float[][] {}), totalLiveDocs * dimension); + } + return new KNNCodecUtil.VectorBatch(docIdList.stream().mapToInt(Integer::intValue).toArray(), vectorAddress, dimension, serializationMode, true); } public static long calculateArraySize(int numVectors, int vectorLength, SerializationMode serializationMode) { @@ -129,7 +179,7 @@ public static String buildEngineFileSuffix(String fieldName, String extension) { return String.format("_%s%s", fieldName, extension); } - private static long getTotalLiveDocsCount(final BinaryDocValues binaryDocValues) { + public static long getTotalLiveDocsCount(final BinaryDocValues binaryDocValues) { long totalLiveDocs; if (binaryDocValues instanceof KNN80BinaryDocValues) { totalLiveDocs = ((KNN80BinaryDocValues) binaryDocValues).getTotalLiveDocs(); diff --git a/src/main/java/org/opensearch/knn/jni/FaissService.java b/src/main/java/org/opensearch/knn/jni/FaissService.java index f718ce6d5f..3b68d72bac 100644 --- a/src/main/java/org/opensearch/knn/jni/FaissService.java +++ b/src/main/java/org/opensearch/knn/jni/FaissService.java @@ -49,6 +49,18 @@ class FaissService { }); } + public static native long initIndex(long numDocs, int dim, Map parameters); + + public static native long initBinaryIndex(long numDocs, int dim, Map parameters); + + public static native void insertToIndex(int[] ids, long vectorsAddress, int dim, long indexAddress, Map parameters); + + public static native void insertToBinaryIndex(int[] ids, long vectorsAddress, int dim, long indexAddress, Map parameters); + + public static native void writeIndex(long indexAddress, String indexPath, Map parameters); + + public static native void writeBinaryIndex(long indexAddress, String indexPath, Map 
parameters); + /** * Create an index for the native library The memory occupied by the vectorsAddress will be freed up during the * function call. So Java layer doesn't need to free up the memory. This is not an ideal behavior because Java layer diff --git a/src/main/java/org/opensearch/knn/jni/JNIService.java b/src/main/java/org/opensearch/knn/jni/JNIService.java index ed6a169c10..08ee0cf049 100644 --- a/src/main/java/org/opensearch/knn/jni/JNIService.java +++ b/src/main/java/org/opensearch/knn/jni/JNIService.java @@ -25,6 +25,46 @@ public class JNIService { private static final String FAISS_BINARY_INDEX_PREFIX = "B"; + public static long initIndexFromScratch(long size, int dim, Map parameters, KNNEngine knnEngine) { + if (KNNEngine.FAISS == knnEngine) { + if (parameters.get(KNNConstants.INDEX_DESCRIPTION_PARAMETER) != null + && parameters.get(KNNConstants.INDEX_DESCRIPTION_PARAMETER).toString().startsWith(FAISS_BINARY_INDEX_PREFIX)) { + return FaissService.initBinaryIndex(size, dim, parameters); + } else { + return FaissService.initIndex(size, dim, parameters); + } + } + + throw new IllegalArgumentException(String.format("initIndexFromScratch not supported for provided engine : %s", knnEngine.getName())); + } + + public static void insertToIndex(int[] docs, long vectorAddress, int dimension, Map parameters, + long indexAddress, KNNEngine knnEngine) { + if (KNNEngine.FAISS == knnEngine) { + if (parameters.get(KNNConstants.INDEX_DESCRIPTION_PARAMETER) != null + && parameters.get(KNNConstants.INDEX_DESCRIPTION_PARAMETER).toString().startsWith(FAISS_BINARY_INDEX_PREFIX)) { + FaissService.insertToBinaryIndex(docs, vectorAddress, dimension, indexAddress, parameters); + } else { + FaissService.insertToIndex(docs, vectorAddress, dimension, indexAddress, parameters); + } + } + + throw new IllegalArgumentException(String.format("insertToIndex not supported for provided engine : %s", knnEngine.getName())); + } + + public static void writeIndex(String indexPath, long 
indexAddress, KNNEngine knnEngine, Map parameters) { + if (KNNEngine.FAISS == knnEngine) { + if (parameters.get(KNNConstants.INDEX_DESCRIPTION_PARAMETER) != null + && parameters.get(KNNConstants.INDEX_DESCRIPTION_PARAMETER).toString().startsWith(FAISS_BINARY_INDEX_PREFIX)) { + FaissService.writeBinaryIndex(indexAddress, indexPath, parameters); + } else { + FaissService.writeIndex(indexAddress, indexPath, parameters); + } + } + + throw new IllegalArgumentException(String.format("writeIndex not supported for provided engine : %s", knnEngine.getName())); + } + /** * Create an index for the native library. The memory occupied by the vectorsAddress will be freed up during the * function call. So Java layer doesn't need to free up the memory. This is not an ideal behavior because Java layer From d5a5d6eaf3cc845bb1a6a08194b973912b5fe713 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Wed, 10 Jul 2024 13:01:09 -0700 Subject: [PATCH 02/16] Added parameters to writeIndex Signed-off-by: Andrew Klepchick --- .../knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java index 27b7e61693..074c27f1f1 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java @@ -249,8 +249,12 @@ private void insertToIndex(KNNCodecUtil.VectorBatch pair, KNNEngine knnEngine, l } private void writeIndex(long indexAddress, String indexPath, KNNEngine knnEngine) { + Map parameters = ImmutableMap.of( + KNNConstants.INDEX_THREAD_QTY, + KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY) + ); AccessController.doPrivileged((PrivilegedAction) () -> { - JNIService.writeIndex(indexPath, indexAddress, knnEngine); + 
JNIService.writeIndex(indexPath, indexAddress, knnEngine, parameters); return null; }); } From 978e200e943a63b9f85e2af33f0be7a010f91d59 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Wed, 10 Jul 2024 16:12:35 -0700 Subject: [PATCH 03/16] Fixed commit, runs test cases Signed-off-by: Andrew Klepchick --- jni/include/faiss_index_service.h | 8 +- jni/src/faiss_index_service.cpp | 24 +----- jni/src/faiss_wrapper.cpp | 79 +------------------ .../org_opensearch_knn_jni_FaissService.cpp | 4 +- .../KNN80Codec/KNN80DocValuesConsumer.java | 1 - .../knn/index/codec/util/KNNCodecUtil.java | 25 +++++- .../org/opensearch/knn/jni/FaissService.java | 8 +- .../org/opensearch/knn/jni/JNIService.java | 16 +++- 8 files changed, 54 insertions(+), 111 deletions(-) diff --git a/jni/include/faiss_index_service.h b/jni/include/faiss_index_service.h index 05d1b9be35..9cd5f35339 100644 --- a/jni/include/faiss_index_service.h +++ b/jni/include/faiss_index_service.h @@ -59,7 +59,7 @@ class IndexService { * @param idMap a map of document id and vector id * @param parameters parameters to be applied to faiss index */ - virtual void insertToIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress, std::unordered_map parameters); + virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress, std::unordered_map parameters); /** * Write index to disk * @@ -72,7 +72,7 @@ class IndexService { * @param idMap a map of document id and vector id * @param parameters parameters to be applied to faiss index */ - virtual void writeIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int threadCount, std::string indexPath, jlong idMapAddress, std::unordered_map parameters); + virtual void writeIndex(int threadCount, std::string 
indexPath, jlong idMapAddress, std::unordered_map parameters); // TODO Remove dependency on JNIUtilInterface and JNIEnv // TODO Reduce the number of parameters @@ -145,7 +145,7 @@ class BinaryIndexService : public IndexService { * @param idMap a map of document id and vector id * @param parameters parameters to be applied to faiss index */ - virtual void insertToIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress, std::unordered_map parameters) override; + virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress, std::unordered_map parameters) override; /** * Write index to disk * @@ -158,7 +158,7 @@ class BinaryIndexService : public IndexService { * @param idMap a map of document id and vector id * @param parameters parameters to be applied to faiss index */ - virtual void writeIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int threadCount, std::string indexPath, jlong idMapAddress, std::unordered_map parameters) override; + virtual void writeIndex(int threadCount, std::string indexPath, jlong idMapAddress, std::unordered_map parameters) override; /** * Create binary index * diff --git a/jni/src/faiss_index_service.cpp b/jni/src/faiss_index_service.cpp index 80782f76ac..44e238f628 100644 --- a/jni/src/faiss_index_service.cpp +++ b/jni/src/faiss_index_service.cpp @@ -100,10 +100,6 @@ jlong IndexService::initIndex( } void IndexService::insertToIndex( - knn_jni::JNIUtilInterface * jniUtil, - JNIEnv * env, - faiss::MetricType metric, - std::string indexDescription, int dim, int numIds, int threadCount, @@ -112,8 +108,8 @@ void IndexService::insertToIndex( jlong idMapAddress, std::unordered_map parameters ) { - // Read vectors from memory address (unique ptr since we want to remove 
from memory after use) - std::unique_ptr> inputVectors (reinterpret_cast*>(vectorsAddress)); + // Read vectors from memory address + std::vector * inputVectors = reinterpret_cast*>(vectorsAddress); // The number of vectors can be int here because a lucene segment number of total docs never crosses INT_MAX value int numVectors = (int) (inputVectors->size() / (uint64_t) dim); @@ -137,10 +133,6 @@ void IndexService::insertToIndex( } void IndexService::writeIndex( - knn_jni::JNIUtilInterface * jniUtil, - JNIEnv * env, - faiss::MetricType metric, - std::string indexDescription, int threadCount, std::string indexPath, jlong idMapAddress, @@ -223,7 +215,7 @@ jlong BinaryIndexService::initIndex( ) { // Removed the check for number of vectors // Don't use unique_ptr here since we want to access the index in the future. - faiss::IndexBinary * indexWriter(faissMethods->indexBinaryFactory(dim, indexDescription.c_str(), metric)); + faiss::IndexBinary * indexWriter(faissMethods->indexBinaryFactory(dim, indexDescription.c_str())); // Set thread count if it is passed in as a parameter. 
Setting this variable will only impact the current thread if(threadCount != 0) { @@ -254,10 +246,6 @@ jlong BinaryIndexService::initIndex( } void BinaryIndexService::insertToIndex( - knn_jni::JNIUtilInterface * jniUtil, - JNIEnv * env, - faiss::MetricType metric, - std::string indexDescription, int dim, int numIds, int threadCount, @@ -267,7 +255,7 @@ void BinaryIndexService::insertToIndex( std::unordered_map parameters ) { // Read vectors from memory address (unique ptr since we want to remove from memory after use) - std::unique_ptr> inputVectors (reinterpret_cast*>(vectorsAddress)); + std::vector * inputVectors = reinterpret_cast*>(vectorsAddress); // The number of vectors can be int here because a lucene segment number of total docs never crosses INT_MAX value int numVectors = (int) (inputVectors->size() / (uint64_t) (dim / 8)); @@ -291,10 +279,6 @@ void BinaryIndexService::insertToIndex( } void BinaryIndexService::writeIndex( - knn_jni::JNIUtilInterface * jniUtil, - JNIEnv * env, - faiss::MetricType metric, - std::string indexDescription, int threadCount, std::string indexPath, jlong idMapAddress, diff --git a/jni/src/faiss_wrapper.cpp b/jni/src/faiss_wrapper.cpp index b599a304bb..96a74217f0 100644 --- a/jni/src/faiss_wrapper.cpp +++ b/jni/src/faiss_wrapper.cpp @@ -136,57 +136,6 @@ jlong knn_jni::faiss_wrapper::InitIndex(knn_jni::JNIUtilInterface * jniUtil, JNI return indexService->initIndex(jniUtil, env, metric, indexDescriptionCpp, dim, numDocs, threadCount, subParametersCpp); } -jlong knn_jni::faiss_wrapper::InitIndexFromTemplate(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jlong numDocs, jint dimJ, - jobject parametersJ, jbyteArray templateIndexJ, IndexService* indexService) { - - if(dimJ <= 0) { - throw std::runtime_error("Vectors dimensions cannot be less than or equal to 0"); - } - - if (parametersJ == nullptr) { - throw std::runtime_error("Parameters cannot be null"); - } - - // parametersJ is a Java Map. 
ConvertJavaMapToCppMap converts it to a c++ map - // so that it is easier to access. - auto parametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersJ); - - // Parameters to pass - // Metric type - jobject spaceTypeJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::SPACE_TYPE); - std::string spaceTypeCpp(jniUtil->ConvertJavaObjectToCppString(env, spaceTypeJ)); - faiss::MetricType metric = TranslateSpaceToMetric(spaceTypeCpp); - jniUtil->DeleteLocalRef(env, spaceTypeJ); - - // Dimension - int dim = (int)dimJ; - - // Number of docs - int docs = (int)numDocs; - - // Index description - jobject indexDescriptionJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::INDEX_DESCRIPTION); - std::string indexDescriptionCpp(jniUtil->ConvertJavaObjectToCppString(env, indexDescriptionJ)); - jniUtil->DeleteLocalRef(env, indexDescriptionJ); - - // Thread count - int threadCount = 0; - if(parametersCpp.find(knn_jni::INDEX_THREAD_QUANTITY) != parametersCpp.end()) { - threadCount = jniUtil->ConvertJavaObjectToCppInteger(env, parametersCpp[knn_jni::INDEX_THREAD_QUANTITY]); - } - - // Extra parameters - // TODO: parse the entire map and remove jni object - std::unordered_map subParametersCpp; - if(parametersCpp.find(knn_jni::PARAMETERS) != parametersCpp.end()) { - subParametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersCpp[knn_jni::PARAMETERS]); - } - // end parameters to pass - - // Create index - return indexService->initIndex(jniUtil, env, metric, indexDescriptionCpp, dim, numDocs, threadCount, subParametersCpp); -} - void knn_jni::faiss_wrapper::InsertToIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, jlong index_ptr, jobject parametersJ, IndexService* indexService) { if (idsJ == nullptr) { @@ -209,24 +158,12 @@ void knn_jni::faiss_wrapper::InsertToIndex(knn_jni::JNIUtilInterface * jniUtil, // so that it is easier to access. 
auto parametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersJ); - // Parameters to pass - // Metric type - jobject spaceTypeJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::SPACE_TYPE); - std::string spaceTypeCpp(jniUtil->ConvertJavaObjectToCppString(env, spaceTypeJ)); - faiss::MetricType metric = TranslateSpaceToMetric(spaceTypeCpp); - jniUtil->DeleteLocalRef(env, spaceTypeJ); - // Dimension int dim = (int)dimJ; // Number of vectors int numIds = jniUtil->GetJavaIntArrayLength(env, idsJ); - // Index description - jobject indexDescriptionJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::INDEX_DESCRIPTION); - std::string indexDescriptionCpp(jniUtil->ConvertJavaObjectToCppString(env, indexDescriptionJ)); - jniUtil->DeleteLocalRef(env, indexDescriptionJ); - // Thread count int threadCount = 0; if(parametersCpp.find(knn_jni::INDEX_THREAD_QUANTITY) != parametersCpp.end()) { @@ -248,7 +185,7 @@ void knn_jni::faiss_wrapper::InsertToIndex(knn_jni::JNIUtilInterface * jniUtil, // end parameters to pass // Create index - indexService->insertToIndex(jniUtil, env, metric, indexDescriptionCpp, dim, numIds, threadCount, vectorsAddress, ids, index_ptr, subParametersCpp); + indexService->insertToIndex(dim, numIds, threadCount, vectorsAddress, ids, index_ptr, subParametersCpp); } void knn_jni::faiss_wrapper::WriteIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, @@ -266,18 +203,6 @@ void knn_jni::faiss_wrapper::WriteIndex(knn_jni::JNIUtilInterface * jniUtil, JNI // so that it is easier to access. 
auto parametersCpp = jniUtil->ConvertJavaMapToCppMap(env, parametersJ); - // Parameters to pass - // Metric type - jobject spaceTypeJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::SPACE_TYPE); - std::string spaceTypeCpp(jniUtil->ConvertJavaObjectToCppString(env, spaceTypeJ)); - faiss::MetricType metric = TranslateSpaceToMetric(spaceTypeCpp); - jniUtil->DeleteLocalRef(env, spaceTypeJ); - - // Index description - jobject indexDescriptionJ = knn_jni::GetJObjectFromMapOrThrow(parametersCpp, knn_jni::INDEX_DESCRIPTION); - std::string indexDescriptionCpp(jniUtil->ConvertJavaObjectToCppString(env, indexDescriptionJ)); - jniUtil->DeleteLocalRef(env, indexDescriptionJ); - // Thread count int threadCount = 0; if(parametersCpp.find(knn_jni::INDEX_THREAD_QUANTITY) != parametersCpp.end()) { @@ -296,7 +221,7 @@ void knn_jni::faiss_wrapper::WriteIndex(knn_jni::JNIUtilInterface * jniUtil, JNI // end parameters to pass // Create index - indexService->writeIndex(jniUtil, env, metric, indexDescriptionCpp, threadCount, indexPathCpp, index_ptr, subParametersCpp); + indexService->writeIndex(threadCount, indexPathCpp, index_ptr, subParametersCpp); } void knn_jni::faiss_wrapper::CreateIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, diff --git a/jni/src/org_opensearch_knn_jni_FaissService.cpp b/jni/src/org_opensearch_knn_jni_FaissService.cpp index d6693b7546..f1505d965d 100644 --- a/jni/src/org_opensearch_knn_jni_FaissService.cpp +++ b/jni/src/org_opensearch_knn_jni_FaissService.cpp @@ -39,7 +39,7 @@ void JNI_OnUnload(JavaVM *vm, void *reserved) { jniUtil.Uninitialize(env); } -JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_initIndex(JNIEnv * env, jclass cls, jintArray idsJ, +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_initIndex(JNIEnv * env, jclass cls, jlong numDocs, jint dimJ, jobject parametersJ) { @@ -52,7 +52,7 @@ JNIEXPORT jlong JNICALL 
Java_org_opensearch_knn_jni_FaissService_initIndex(JNIEn } } -JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_initBinaryIndex(JNIEnv * env, jclass cls, jintArray idsJ, +JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_initBinaryIndex(JNIEnv * env, jclass cls, jlong numDocs, jint dimJ, jobject parametersJ) { diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java index 074c27f1f1..948ddb8022 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java @@ -204,7 +204,6 @@ private long initIndexFromScratch(FieldInfo fieldInfo, long size, int dim, KNNEn // parametersString will be null when legacy mapper is used if (parametersString == null) { - parameters.put(KNNConstants.SPACE_TYPE, fieldAttributes.getOrDefault(KNNConstants.SPACE_TYPE, SpaceType.DEFAULT.getValue())); String efConstruction = fieldAttributes.get(KNNConstants.HNSW_ALGO_EF_CONSTRUCTION); Map algoParams = new HashMap<>(); diff --git a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java index f6cceab903..cd8a3b3160 100644 --- a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java +++ b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java @@ -90,7 +90,13 @@ public static KNNCodecUtil.VectorBatch getFloats(BinaryDocValues values) throws if (vectorList.isEmpty() == false) { vectorAddress = JNICommons.storeVectorData(vectorAddress, vectorList.toArray(new float[][] {}), totalLiveDocs * dimension); } - return new KNNCodecUtil.VectorBatch(docIdList.stream().mapToInt(Integer::intValue).toArray(), vectorAddress, dimension, serializationMode, true); + return new KNNCodecUtil.VectorBatch( + 
docIdList.stream().mapToInt(Integer::intValue).toArray(), + vectorAddress, + dimension, + serializationMode, + true + ); } public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) throws IOException { @@ -103,6 +109,7 @@ public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) th long totalLiveDocs = getTotalLiveDocsCount(values); long vectorsStreamingMemoryLimit = KNNSettings.getVectorStreamingMemoryLimit().getBytes(); long vectorsPerTransfer = Integer.MIN_VALUE; + //long vectorsPerTransfer = 1; for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { BytesRef bytesref = values.binaryValue(); @@ -133,14 +140,26 @@ public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) th // We should probably come up with a better way to reuse the vectorList memory which we have // created. Problem here is doing like this can lead to a lot of list memory which is of no use and // will be garbage collected later on, but it creates pressure on JVM. We should revisit this. 
- return new KNNCodecUtil.VectorBatch(docIdList.stream().mapToInt(Integer::intValue).toArray(), vectorAddress, dimension, serializationMode, false); + return new KNNCodecUtil.VectorBatch( + docIdList.stream().mapToInt(Integer::intValue).toArray(), + vectorAddress, + dimension, + serializationMode, + false + ); } } } if (vectorList.isEmpty() == false) { vectorAddress = JNICommons.storeVectorData(vectorAddress, vectorList.toArray(new float[][] {}), totalLiveDocs * dimension); } - return new KNNCodecUtil.VectorBatch(docIdList.stream().mapToInt(Integer::intValue).toArray(), vectorAddress, dimension, serializationMode, true); + return new KNNCodecUtil.VectorBatch( + docIdList.stream().mapToInt(Integer::intValue).toArray(), + vectorAddress, + dimension, + serializationMode, + true + ); } public static long calculateArraySize(int numVectors, int vectorLength, SerializationMode serializationMode) { diff --git a/src/main/java/org/opensearch/knn/jni/FaissService.java b/src/main/java/org/opensearch/knn/jni/FaissService.java index 3b68d72bac..d2042cb0a7 100644 --- a/src/main/java/org/opensearch/knn/jni/FaissService.java +++ b/src/main/java/org/opensearch/knn/jni/FaissService.java @@ -55,7 +55,13 @@ class FaissService { public static native void insertToIndex(int[] ids, long vectorsAddress, int dim, long indexAddress, Map parameters); - public static native void insertToBinaryIndex(int[] ids, long vectorsAddress, int dim, long indexAddress, Map parameters); + public static native void insertToBinaryIndex( + int[] ids, + long vectorsAddress, + int dim, + long indexAddress, + Map parameters + ); public static native void writeIndex(long indexAddress, String indexPath, Map parameters); diff --git a/src/main/java/org/opensearch/knn/jni/JNIService.java b/src/main/java/org/opensearch/knn/jni/JNIService.java index 08ee0cf049..9ffd83055d 100644 --- a/src/main/java/org/opensearch/knn/jni/JNIService.java +++ b/src/main/java/org/opensearch/knn/jni/JNIService.java @@ -35,11 +35,19 @@ 
public static long initIndexFromScratch(long size, int dim, Map } } - throw new IllegalArgumentException(String.format("initIndexFromScratch not supported for provided engine : %s", knnEngine.getName())); + throw new IllegalArgumentException( + String.format("initIndexFromScratch not supported for provided engine : %s", knnEngine.getName()) + ); } - public static void insertToIndex(int[] docs, long vectorAddress, int dimension, Map parameters, - long indexAddress, KNNEngine knnEngine) { + public static void insertToIndex( + int[] docs, + long vectorAddress, + int dimension, + Map parameters, + long indexAddress, + KNNEngine knnEngine + ) { if (KNNEngine.FAISS == knnEngine) { if (parameters.get(KNNConstants.INDEX_DESCRIPTION_PARAMETER) != null && parameters.get(KNNConstants.INDEX_DESCRIPTION_PARAMETER).toString().startsWith(FAISS_BINARY_INDEX_PREFIX)) { @@ -47,6 +55,7 @@ public static void insertToIndex(int[] docs, long vectorAddress, int dimension, } else { FaissService.insertToIndex(docs, vectorAddress, dimension, indexAddress, parameters); } + return; } throw new IllegalArgumentException(String.format("insertToIndex not supported for provided engine : %s", knnEngine.getName())); @@ -60,6 +69,7 @@ public static void writeIndex(String indexPath, long indexAddress, KNNEngine knn } else { FaissService.writeIndex(indexAddress, indexPath, parameters); } + return; } throw new IllegalArgumentException(String.format("writeIndex not supported for provided engine : %s", knnEngine.getName())); From 23a80e1cd84db50151146861331a1c384b26d09d Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Wed, 10 Jul 2024 16:16:53 -0700 Subject: [PATCH 04/16] Spotless apply Signed-off-by: Andrew Klepchick --- .../java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java index 
cd8a3b3160..f7e078d91a 100644 --- a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java +++ b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java @@ -109,7 +109,7 @@ public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) th long totalLiveDocs = getTotalLiveDocsCount(values); long vectorsStreamingMemoryLimit = KNNSettings.getVectorStreamingMemoryLimit().getBytes(); long vectorsPerTransfer = Integer.MIN_VALUE; - //long vectorsPerTransfer = 1; + // long vectorsPerTransfer = 1; for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { BytesRef bytesref = values.binaryValue(); From e025791fac4a0dc123742348835dfa18f31e3842 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Thu, 11 Jul 2024 10:42:28 -0700 Subject: [PATCH 05/16] Attempt to fix memory leak Signed-off-by: Andrew Klepchick --- jni/src/faiss_index_service.cpp | 1 + .../org/opensearch/knn/index/codec/util/KNNCodecUtil.java | 7 +++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/jni/src/faiss_index_service.cpp b/jni/src/faiss_index_service.cpp index 44e238f628..aae9a57698 100644 --- a/jni/src/faiss_index_service.cpp +++ b/jni/src/faiss_index_service.cpp @@ -149,6 +149,7 @@ void IndexService::writeIndex( faissMethods->writeIndex(idMap, indexPath.c_str()); // Free the memory used by the index + idMap->reset(); delete idMap; } diff --git a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java index f7e078d91a..a12e7d4dde 100644 --- a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java +++ b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java @@ -105,8 +105,7 @@ public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) th long vectorAddress = 0; int dimension = 0; SerializationMode serializationMode = SerializationMode.COLLECTION_OF_FLOATS; - - long totalLiveDocs = 
getTotalLiveDocsCount(values); += long vectorsStreamingMemoryLimit = KNNSettings.getVectorStreamingMemoryLimit().getBytes(); long vectorsPerTransfer = Integer.MIN_VALUE; // long vectorsPerTransfer = 1; @@ -135,7 +134,7 @@ public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) th vectorAddress = JNICommons.storeVectorData( vectorAddress, vectorList.toArray(new float[][] {}), - totalLiveDocs * dimension + docIdList.size() * dimension ); // We should probably come up with a better way to reuse the vectorList memory which we have // created. Problem here is doing like this can lead to a lot of list memory which is of no use and @@ -151,7 +150,7 @@ public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) th } } if (vectorList.isEmpty() == false) { - vectorAddress = JNICommons.storeVectorData(vectorAddress, vectorList.toArray(new float[][] {}), totalLiveDocs * dimension); + vectorAddress = JNICommons.storeVectorData(vectorAddress, vectorList.toArray(new float[][] {}), docIdList.size() * dimension); } return new KNNCodecUtil.VectorBatch( docIdList.stream().mapToInt(Integer::intValue).toArray(), From 3143a3e138762e0e2597352bdb7457ad8b473e5a Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Thu, 11 Jul 2024 10:59:14 -0700 Subject: [PATCH 06/16] Fixed syntax error Signed-off-by: Andrew Klepchick --- .../java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java index a12e7d4dde..1be9c526e6 100644 --- a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java +++ b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java @@ -105,7 +105,7 @@ public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) th long vectorAddress = 0; int dimension = 0; SerializationMode serializationMode = 
SerializationMode.COLLECTION_OF_FLOATS; -= + long vectorsStreamingMemoryLimit = KNNSettings.getVectorStreamingMemoryLimit().getBytes(); long vectorsPerTransfer = Integer.MIN_VALUE; // long vectorsPerTransfer = 1; From c4943f7c8e171b792427f33f6fa3944316abaff8 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Thu, 11 Jul 2024 13:14:11 -0700 Subject: [PATCH 07/16] Possibly fix mem leak? Signed-off-by: Andrew Klepchick --- jni/src/faiss_wrapper.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/jni/src/faiss_wrapper.cpp b/jni/src/faiss_wrapper.cpp index 96a74217f0..3e5144f1e2 100644 --- a/jni/src/faiss_wrapper.cpp +++ b/jni/src/faiss_wrapper.cpp @@ -672,6 +672,7 @@ jobjectArray knn_jni::faiss_wrapper::QueryBinaryIndex_WithFilter(knn_jni::JNIUti void knn_jni::faiss_wrapper::Free(jlong indexPointer) { auto *indexWrapper = reinterpret_cast(indexPointer); + indexWrapper->reset(); delete indexWrapper; } From cf259e335715d04eb7e8a86ee690539cdfab37a0 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Thu, 11 Jul 2024 14:51:02 -0700 Subject: [PATCH 08/16] Fixed initial allocation Signed-off-by: Andrew Klepchick --- jni/src/faiss_index_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jni/src/faiss_index_service.cpp b/jni/src/faiss_index_service.cpp index aae9a57698..8bb82203f0 100644 --- a/jni/src/faiss_index_service.cpp +++ b/jni/src/faiss_index_service.cpp @@ -92,7 +92,7 @@ jlong IndexService::initIndex( if(hnsw != NULL) { faiss::IndexFlat * storage = dynamic_cast(hnsw->storage); if(storage != NULL) { - storage->codes.reserve(dim * numVectors); + storage->codes.reserve(dim * numVectors * sizeof(float)); } } From 43f8e3dd5b2a43f3952d06421f8bf933a978453c Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Thu, 11 Jul 2024 16:57:49 -0700 Subject: [PATCH 09/16] Changes to freeing indices Signed-off-by: Andrew Klepchick --- jni/src/faiss_index_service.cpp | 9 ++++++++- jni/src/faiss_wrapper.cpp | 1 - 2 files changed, 8 insertions(+), 2 
deletions(-) diff --git a/jni/src/faiss_index_service.cpp b/jni/src/faiss_index_service.cpp index 8bb82203f0..2fd5864c2d 100644 --- a/jni/src/faiss_index_service.cpp +++ b/jni/src/faiss_index_service.cpp @@ -94,6 +94,9 @@ jlong IndexService::initIndex( if(storage != NULL) { storage->codes.reserve(dim * numVectors * sizeof(float)); } + hnsw->hnsw.levels.reserve(numVectors); + hnsw->hnsw.neighbors.reserve(numVectors); + hnsw->hnsw.offsets.reserve(numVectors); } return (jlong)idMap; @@ -149,7 +152,7 @@ void IndexService::writeIndex( faissMethods->writeIndex(idMap, indexPath.c_str()); // Free the memory used by the index - idMap->reset(); + delete idMap->index; delete idMap; } @@ -241,6 +244,9 @@ jlong BinaryIndexService::initIndex( if(storage != NULL) { storage->xb.reserve(dim / 8 * numVectors); } + hnsw->hnsw.levels.reserve(numVectors); + hnsw->hnsw.neighbors.reserve(numVectors); + hnsw->hnsw.offsets.reserve(numVectors); } return (jlong)idMap; @@ -296,6 +302,7 @@ void BinaryIndexService::writeIndex( faissMethods->writeIndexBinary(idMap, indexPath.c_str()); // Free the memory used by the index + delete idMap->index; delete idMap; } diff --git a/jni/src/faiss_wrapper.cpp b/jni/src/faiss_wrapper.cpp index 3e5144f1e2..96a74217f0 100644 --- a/jni/src/faiss_wrapper.cpp +++ b/jni/src/faiss_wrapper.cpp @@ -672,7 +672,6 @@ jobjectArray knn_jni::faiss_wrapper::QueryBinaryIndex_WithFilter(knn_jni::JNIUti void knn_jni::faiss_wrapper::Free(jlong indexPointer) { auto *indexWrapper = reinterpret_cast(indexPointer); - indexWrapper->reset(); delete indexWrapper; } From cac43725161ae6eb20571cf6a29b6b3057d8f6b2 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Tue, 16 Jul 2024 15:31:11 -0700 Subject: [PATCH 10/16] Added memory reserve for the id map Signed-off-by: Andrew Klepchick --- jni/src/faiss_index_service.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/jni/src/faiss_index_service.cpp b/jni/src/faiss_index_service.cpp index 2fd5864c2d..fd8cb42027 100644 --- 
a/jni/src/faiss_index_service.cpp +++ b/jni/src/faiss_index_service.cpp @@ -87,6 +87,8 @@ jlong IndexService::initIndex( // Add vectors faiss::IndexIDMap * idMap(faissMethods->indexIdMap(indexWriter)); + idMap->id_map.reserve(numVectors); + faiss::IndexHNSW * hnsw = dynamic_cast(idMap->index); if(hnsw != NULL) { @@ -237,6 +239,8 @@ jlong BinaryIndexService::initIndex( // Add vectors faiss::IndexBinaryIDMap * idMap(faissMethods->indexBinaryIdMap(indexWriter)); + idMap->id_map.reserve(numVectors); + faiss::IndexBinaryHNSW * hnsw = dynamic_cast(idMap->index); if(hnsw != NULL) { From f91aa4f464b93ff8b0e79e417e4a695157387580 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Tue, 16 Jul 2024 15:41:45 -0700 Subject: [PATCH 11/16] Added some extra comments and code cleanup Signed-off-by: Andrew Klepchick --- jni/include/faiss_index_service.h | 24 ++++--------------- jni/src/faiss_index_service.cpp | 8 +++++++ .../KNN80Codec/KNN80DocValuesConsumer.java | 2 -- .../knn/index/codec/util/KNNCodecUtil.java | 3 --- 4 files changed, 12 insertions(+), 25 deletions(-) diff --git a/jni/include/faiss_index_service.h b/jni/include/faiss_index_service.h index 9cd5f35339..a1d319fe47 100644 --- a/jni/include/faiss_index_service.h +++ b/jni/include/faiss_index_service.h @@ -48,28 +48,20 @@ class IndexService { /** * Add vectors to index * - * @param jniUtil jni util - * @param env jni environment - * @param metric space type for distance calculation - * @param indexDescription index description to be used by faiss index factory * @param dim dimension of vectors * @param numIds number of vectors * @param threadCount number of thread count to be used while adding data * @param vectorsAddress memory address which is holding vector data - * @param idMap a map of document id and vector id + * @param idMapAddress index to insert to * @param parameters parameters to be applied to faiss index */ virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, 
std::vector &ids, jlong idMapAddress, std::unordered_map parameters); /** * Write index to disk * - * @param jniUtil jni util - * @param env jni environment - * @param metric space type for distance calculation - * @param indexDescription index description to be used by faiss index factory * @param threadCount number of thread count to be used while adding data * @param indexPath path to write index - * @param idMap a map of document id and vector id + * @param idMapAddress index to insert to * @param parameters parameters to be applied to faiss index */ virtual void writeIndex(int threadCount, std::string indexPath, jlong idMapAddress, std::unordered_map parameters); @@ -134,28 +126,20 @@ class BinaryIndexService : public IndexService { /** * Add vectors to index * - * @param jniUtil jni util - * @param env jni environment - * @param metric space type for distance calculation - * @param indexDescription index description to be used by faiss index factory * @param dim dimension of vectors * @param numIds number of vectors * @param threadCount number of thread count to be used while adding data * @param vectorsAddress memory address which is holding vector data - * @param idMap a map of document id and vector id + * @param idMapAddress index to insert to * @param parameters parameters to be applied to faiss index */ virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector &ids, jlong idMapAddress, std::unordered_map parameters) override; /** * Write index to disk * - * @param jniUtil jni util - * @param env jni environment - * @param metric space type for distance calculation - * @param indexDescription index description to be used by faiss index factory * @param threadCount number of thread count to be used while adding data * @param indexPath path to write index - * @param idMap a map of document id and vector id + * @param idMapAddress index to insert to * @param parameters parameters to be applied to faiss index */ 
virtual void writeIndex(int threadCount, std::string indexPath, jlong idMapAddress, std::unordered_map parameters) override; diff --git a/jni/src/faiss_index_service.cpp b/jni/src/faiss_index_service.cpp index fd8cb42027..8148c599cf 100644 --- a/jni/src/faiss_index_service.cpp +++ b/jni/src/faiss_index_service.cpp @@ -87,12 +87,16 @@ jlong IndexService::initIndex( // Add vectors faiss::IndexIDMap * idMap(faissMethods->indexIdMap(indexWriter)); + // Reserve memory for the vectors idMap->id_map.reserve(numVectors); + // Check if the underlying storage is IndexHNSW faiss::IndexHNSW * hnsw = dynamic_cast(idMap->index); if(hnsw != NULL) { + // Check that the hnsw storage is IndexFlat faiss::IndexFlat * storage = dynamic_cast(hnsw->storage); + // Reserve memory for all hnsw fields if(storage != NULL) { storage->codes.reserve(dim * numVectors * sizeof(float)); } @@ -239,12 +243,16 @@ jlong BinaryIndexService::initIndex( // Add vectors faiss::IndexBinaryIDMap * idMap(faissMethods->indexBinaryIdMap(indexWriter)); + // Reserve memory for the vectors idMap->id_map.reserve(numVectors); + // Check if the underlying storage is IndexBinaryHNSW faiss::IndexBinaryHNSW * hnsw = dynamic_cast(idMap->index); if(hnsw != NULL) { + // Check that the hnsw storage is IndexBinaryFlat faiss::IndexBinaryFlat * storage = dynamic_cast(hnsw->storage); + // Reserve memory for all hnsw fields if(storage != NULL) { storage->xb.reserve(dim / 8 * numVectors); } diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java index 948ddb8022..28448acdcb 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java @@ -133,8 +133,6 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer, ((FSDirectory) 
(FilterDirectory.unwrap(state.directory))).getDirectory().toString(), engineFileName ).toString(); - // Create library index either from model or from scratch - // This is a bit of a hack. We have to create an output here and then immediately close it to ensure that // engineFileName is added to the tracked files by Lucene's TrackingDirectoryWrapper. Otherwise, the file will // not be marked as added to the directory. diff --git a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java index 1be9c526e6..d3c498e0a9 100644 --- a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java +++ b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java @@ -136,9 +136,6 @@ public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) th vectorList.toArray(new float[][] {}), docIdList.size() * dimension ); - // We should probably come up with a better way to reuse the vectorList memory which we have - // created. Problem here is doing like this can lead to a lot of list memory which is of no use and - // will be garbage collected later on, but it creates pressure on JVM. We should revisit this. 
return new KNNCodecUtil.VectorBatch( docIdList.stream().mapToInt(Integer::intValue).toArray(), vectorAddress, From 6890747e593424ca424446c0617d8d42313cbf4d Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Tue, 16 Jul 2024 15:43:58 -0700 Subject: [PATCH 12/16] Changed the name of getFloats to make it easier to distinguish from getFloatsBatch Signed-off-by: Andrew Klepchick --- .../knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java | 4 ++-- .../org/opensearch/knn/index/codec/util/KNNCodecUtil.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java index 28448acdcb..9db9f36cb4 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java @@ -143,7 +143,7 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer, if (model.getModelBlob() == null) { throw new RuntimeException(String.format("There is no trained model with id \"%s\"", modelId)); } - KNNCodecUtil.VectorBatch pair = KNNCodecUtil.getFloats(values); + KNNCodecUtil.VectorBatch pair = KNNCodecUtil.getAllFloats(values); createKNNIndexFromTemplate(model.getModelBlob(), pair, knnEngine, indexPath); totalDocsIncrement += pair.docs.length; totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); @@ -165,7 +165,7 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer, writeIndex(indexAddress, indexPath, knnEngine); } else { // Note that iterative graph construction has not yet been implemented for nmslib - KNNCodecUtil.VectorBatch pair = KNNCodecUtil.getFloats(values); + KNNCodecUtil.VectorBatch pair = KNNCodecUtil.getAllFloats(values); createKNNIndexFromScratch(field, pair, knnEngine, indexPath); 
totalDocsIncrement += pair.docs.length; totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); diff --git a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java index d3c498e0a9..054408d9f3 100644 --- a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java +++ b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java @@ -43,7 +43,7 @@ public static final class VectorBatch { public boolean finished; } - public static KNNCodecUtil.VectorBatch getFloats(BinaryDocValues values) throws IOException { + public static KNNCodecUtil.VectorBatch getAllFloats(BinaryDocValues values) throws IOException { List vectorList = new ArrayList<>(); List docIdList = new ArrayList<>(); long vectorAddress = 0; From 24586b7338cf3ad0d64055c309635c60f9bc44a4 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Tue, 16 Jul 2024 15:48:11 -0700 Subject: [PATCH 13/16] Improved readability of vector retrieval functions. 
Signed-off-by: Andrew Klepchick --- .../index/codec/KNN80Codec/KNN80DocValuesConsumer.java | 8 ++++---- .../org/opensearch/knn/index/codec/util/KNNCodecUtil.java | 8 ++++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java index 9db9f36cb4..b9c9e733b9 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java @@ -143,13 +143,13 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer, if (model.getModelBlob() == null) { throw new RuntimeException(String.format("There is no trained model with id \"%s\"", modelId)); } - KNNCodecUtil.VectorBatch pair = KNNCodecUtil.getAllFloats(values); + KNNCodecUtil.VectorBatch pair = KNNCodecUtil.readAllVectors(values); createKNNIndexFromTemplate(model.getModelBlob(), pair, knnEngine, indexPath); totalDocsIncrement += pair.docs.length; totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); } else { if (KNNEngine.FAISS == knnEngine) { - KNNCodecUtil.VectorBatch pair = KNNCodecUtil.getFloatsBatch(values); + KNNCodecUtil.VectorBatch pair = KNNCodecUtil.readVectorBatch(values); long indexAddress = initIndexFromScratch(field, num_docs, pair.getDimension(), knnEngine); while (true) { if (pair.docs.length != 0) insertToIndex(pair, knnEngine, indexAddress); @@ -160,12 +160,12 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer, if (pair.finished) { break; } - pair = KNNCodecUtil.getFloatsBatch(values); + pair = KNNCodecUtil.readVectorBatch(values); } writeIndex(indexAddress, indexPath, knnEngine); } else { // Note that iterative graph construction has not yet been implemented for nmslib - KNNCodecUtil.VectorBatch pair = 
KNNCodecUtil.getAllFloats(values); + KNNCodecUtil.VectorBatch pair = KNNCodecUtil.readAllVectors(values); createKNNIndexFromScratch(field, pair, knnEngine, indexPath); totalDocsIncrement += pair.docs.length; totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); diff --git a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java index 054408d9f3..248c697e3b 100644 --- a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java +++ b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java @@ -30,6 +30,8 @@ public class KNNCodecUtil { // Java rounds each array size up to multiples of 8 bytes public static final int JAVA_ROUNDING_NUMBER = 8; + // Class that holds all of the data of a batch of vectors, and whether or not + // the vector retrieval method is finished. @AllArgsConstructor public static final class VectorBatch { public int[] docs; @@ -43,7 +45,8 @@ public static final class VectorBatch { public boolean finished; } - public static KNNCodecUtil.VectorBatch getAllFloats(BinaryDocValues values) throws IOException { + // Function to get all of the floats from BinaryDocValues. Will always be finished reading vectors. + public static KNNCodecUtil.VectorBatch readAllVectors(BinaryDocValues values) throws IOException { List vectorList = new ArrayList<>(); List docIdList = new ArrayList<>(); long vectorAddress = 0; @@ -99,7 +102,8 @@ public static KNNCodecUtil.VectorBatch getAllFloats(BinaryDocValues values) thro ); } - public static KNNCodecUtil.VectorBatch getFloatsBatch(BinaryDocValues values) throws IOException { + // Function to get one batch of floats from BinaryDocValues. May not always be finished reading vectors. 
+ public static KNNCodecUtil.VectorBatch readVectorBatch(BinaryDocValues values) throws IOException { List vectorList = new ArrayList<>(); List docIdList = new ArrayList<>(); long vectorAddress = 0; From eed29fae68b5c0033e236a9c0183794d60feb50b Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Tue, 16 Jul 2024 16:00:03 -0700 Subject: [PATCH 14/16] Removed extra cmake file Signed-off-by: Andrew Klepchick --- jni/cmake/init-faiss.cmake-e | 69 ------------------------------------ 1 file changed, 69 deletions(-) delete mode 100644 jni/cmake/init-faiss.cmake-e diff --git a/jni/cmake/init-faiss.cmake-e b/jni/cmake/init-faiss.cmake-e deleted file mode 100644 index bef93eda00..0000000000 --- a/jni/cmake/init-faiss.cmake-e +++ /dev/null @@ -1,69 +0,0 @@ -# -# Copyright OpenSearch Contributors -# SPDX-License-Identifier: Apache-2.0 -# - -# Check if faiss exists -find_path(FAISS_REPO_DIR NAMES faiss PATHS ${CMAKE_CURRENT_SOURCE_DIR}/external/faiss NO_DEFAULT_PATH) - -# If not, pull the updated submodule -if (NOT EXISTS ${FAISS_REPO_DIR}) - message(STATUS "Could not find faiss. Pulling updated submodule.") - execute_process(COMMAND git submodule update --init -- external/faiss WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) -endif () - -# Check if patch exist, this is to skip git apply during CI build. See CI.yml with ubuntu. 
-find_path(PATCH_FILE NAMES 0001-Custom-patch-to-support-multi-vector.patch 0002-Enable-precomp-table-to-be-shared-ivfpq.patch 0003-Custom-patch-to-support-range-search-params.patch PATHS ${CMAKE_CURRENT_SOURCE_DIR}/patches/faiss NO_DEFAULT_PATH) - -# If it exists, apply patches -if (EXISTS ${PATCH_FILE}) - message(STATUS "Applying custom patches.") - execute_process(COMMAND git ${GIT_PATCH_COMMAND} --3way --ignore-space-change --ignore-whitespace ${CMAKE_CURRENT_SOURCE_DIR}/patches/faiss/0001-Custom-patch-to-support-multi-vector.patch WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/faiss ERROR_VARIABLE ERROR_MSG RESULT_VARIABLE RESULT_CODE) - execute_process(COMMAND git ${GIT_PATCH_COMMAND} --3way --ignore-space-change --ignore-whitespace ${CMAKE_CURRENT_SOURCE_DIR}/patches/faiss/0002-Enable-precomp-table-to-be-shared-ivfpq.patch WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/faiss ERROR_VARIABLE ERROR_MSG RESULT_VARIABLE RESULT_CODE) - execute_process(COMMAND git ${GIT_PATCH_COMMAND} --3way --ignore-space-change --ignore-whitespace ${CMAKE_CURRENT_SOURCE_DIR}/patches/faiss/0003-Custom-patch-to-support-range-search-params.patch WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/faiss ERROR_VARIABLE ERROR_MSG RESULT_VARIABLE RESULT_CODE) - if(RESULT_CODE) - message(FATAL_ERROR "Failed to apply patch:\n${ERROR_MSG}") - endif() -endif() - -if (${CMAKE_SYSTEM_NAME} STREQUAL Darwin) - if(CMAKE_C_COMPILER_ID MATCHES "Clang\$") - set(OpenMP_C_FLAGS "-Xpreprocessor -fopenmp") - set(OpenMP_C_LIB_NAMES "omp") - set(OpenMP_omp_LIBRARY /usr/local/opt/libomp/lib/libomp.dylib) - endif() - - if(CMAKE_CXX_COMPILER_ID MATCHES "Clang\$") - set(OpenMP_CXX_FLAGS "-Xpreprocessor -fopenmp -I/usr/local/opt/libomp/include") - set(OpenMP_CXX_LIB_NAMES "omp") - set(OpenMP_omp_LIBRARY /usr/local/opt/libomp/lib/libomp.dylib) - endif() -endif() - -find_package(ZLIB REQUIRED) - -# Statically link BLAS - ensure this is before we find the blas package so we dont dynamically 
link -set(BLA_STATIC ON) -find_package(BLAS REQUIRED) -enable_language(Fortran) -find_package(LAPACK REQUIRED) - -# Set relevant properties -set(BUILD_TESTING OFF) # Avoid building faiss tests -set(FAISS_ENABLE_GPU OFF) -set(FAISS_ENABLE_PYTHON OFF) - -if(NOT DEFINED SIMD_ENABLED) - set(SIMD_ENABLED true) # set default value as true if the argument is not set -endif() - -if(${CMAKE_SYSTEM_NAME} STREQUAL Windows OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR NOT ${SIMD_ENABLED}) - set(FAISS_OPT_LEVEL generic) # Keep optimization level as generic on Windows OS as it is not supported due to MINGW64 compiler issue. Also, on aarch64 avx2 is not supported. - set(TARGET_LINK_FAISS_LIB faiss) -else() - set(FAISS_OPT_LEVEL avx2) # Keep optimization level as avx2 to improve performance on Linux and Mac. - set(TARGET_LINK_FAISS_LIB faiss_avx2) - string(PREPEND LIB_EXT "_avx2") # Prepend "_avx2" to lib extension to create the library as "libopensearchknn_faiss_avx2.so" on linux and "libopensearchknn_faiss_avx2.jnilib" on mac -endif() - -add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/external/faiss EXCLUDE_FROM_ALL) From 79e0fe3794cf8bd0ebbf2ae17a31bdd6c2eb41f7 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Tue, 16 Jul 2024 16:14:11 -0700 Subject: [PATCH 15/16] Added some more comments Signed-off-by: Andrew Klepchick --- jni/include/faiss_wrapper.h | 5 ++ .../org/opensearch/knn/jni/FaissService.java | 46 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/jni/include/faiss_wrapper.h b/jni/include/faiss_wrapper.h index fa120f9dcc..5d81201519 100644 --- a/jni/include/faiss_wrapper.h +++ b/jni/include/faiss_wrapper.h @@ -18,10 +18,15 @@ namespace knn_jni { namespace faiss_wrapper { + // Initialize an index. The configuration is defined by values in the Java map, parametersJ. + // Returns a pointer to the index.
jlong InitIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jlong numDocs, jint dimJ, jobject parametersJ, IndexService *indexService); + // Inserts to an initialized index. The configuration is defined by values in the Java map, parametersJ. void InsertToIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, jlong index_ptr, jobject parametersJ, IndexService *indexService); + // Writes an initialized index. The configuration is defined by values in the Java map, parametersJ. + // The index is serialized to indexPathJ. void WriteIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jstring indexPathJ, jlong index_ptr, jobject parametersJ, IndexService *indexService); // Create an index with ids and vectors. The configuration is defined by values in the Java map, parametersJ. diff --git a/src/main/java/org/opensearch/knn/jni/FaissService.java b/src/main/java/org/opensearch/knn/jni/FaissService.java index d2042cb0a7..798c53953e 100644 --- a/src/main/java/org/opensearch/knn/jni/FaissService.java +++ b/src/main/java/org/opensearch/knn/jni/FaissService.java @@ -49,12 +49,46 @@ class FaissService { }); } + /** + * Initialize the index for the native library. + * + * @param numDocs number of documents the index will have after all insertions + * @param dim dimension of the vector to be indexed + * @param parameters parameters to build index + * @return address of the index + */ public static native long initIndex(long numDocs, int dim, Map parameters); + /** + * Initialize the binary index for the native library. + * + * @param numDocs number of documents the index will have after all insertions + * @param dim dimension of the vector to be indexed + * @param parameters parameters to build index + * @return address of the index + */ public static native long initBinaryIndex(long numDocs, int dim, Map parameters); + /** + * Insert to the index. 
The memory occupied by the vectorsAddress will be freed during a function call + * + * @param ids the ids of the vectors + * @param vectorsAddress address of native memory where vectors are stored + * @param dim dimension of the vector to be indexed + * @param indexAddress address of the index + * @param parameters parameters to build index + */ public static native void insertToIndex(int[] ids, long vectorsAddress, int dim, long indexAddress, Map parameters); + /** + * Insert to the binary index. The memory occupied by the vectorsAddress will be freed during a function call + * + * @param ids the ids of the vectors + * @param vectorsAddress address of native memory where vectors are stored + * @param dim dimension of the vector to be indexed + * @param indexAddress address of the index + * @param parameters parameters to build index + */ public static native void insertToBinaryIndex( int[] ids, long vectorsAddress, @@ -63,8 +97,20 @@ public static native void insertToBinaryIndex( Map parameters ); + /** + * Write the index to disk. + * @param indexAddress address of the index + * @param indexPath path to save index file to + * @param parameters parameters to build index + */ public static native void writeIndex(long indexAddress, String indexPath, Map parameters); + /** + * Write the binary index to disk. + * @param indexAddress address of the index + * @param indexPath path to save index file to + * @param parameters parameters to build index + */ public static native void writeBinaryIndex(long indexAddress, String indexPath, Map parameters); /** From c13925df50ed95c980613b1ceaac4a1d3a4831f6 Mon Sep 17 00:00:00 2001 From: Andrew Klepchick Date: Wed, 17 Jul 2024 10:29:33 -0700 Subject: [PATCH 16/16] Cleaned up addKNNBinaryField method. Will be less complex once initIndexFromTemplate is added. 
Signed-off-by: Andrew Klepchick --- .../KNN80Codec/KNN80DocValuesConsumer.java | 58 ++++++++----------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java index b9c9e733b9..77e00ea3ee 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesConsumer.java @@ -114,12 +114,7 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer, return; } BinaryDocValues values = valuesProducer.getBinary(field); - long num_docs = KNNCodecUtil.getTotalLiveDocsCount(values); - long totalArraySize = 0; - long totalDocsIncrement = 0; - if (isMerge) { - KNNGraphValue.MERGE_CURRENT_OPERATIONS.increment(); - } + long numDocs = KNNCodecUtil.getTotalLiveDocsCount(values); // Increment counter for number of graph index requests KNNCounter.GRAPH_INDEX_REQUESTS.increment(); final KNNEngine knnEngine = getKNNEngine(field); @@ -137,44 +132,39 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer, // engineFileName is added to the tracked files by Lucene's TrackingDirectoryWrapper. Otherwise, the file will // not be marked as added to the directory. state.directory.createOutput(engineFileName, state.context).close(); + KNNCodecUtil.VectorBatch pair; + // Get first pair + if(field.attributes().containsKey(MODEL_ID) || KNNEngine.NMSLIB == knnEngine) { + pair = KNNCodecUtil.readAllVectors(values); + } else { + pair = KNNCodecUtil.readVectorBatch(values); + } + int dim = 0; + // This will be cleaner once support for initIndexFromTemplate is added. 
if (field.attributes().containsKey(MODEL_ID)) { String modelId = field.attributes().get(MODEL_ID); Model model = ModelCache.getInstance().get(modelId); if (model.getModelBlob() == null) { throw new RuntimeException(String.format("There is no trained model with id \"%s\"", modelId)); } - KNNCodecUtil.VectorBatch pair = KNNCodecUtil.readAllVectors(values); createKNNIndexFromTemplate(model.getModelBlob(), pair, knnEngine, indexPath); - totalDocsIncrement += pair.docs.length; - totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); - } else { - if (KNNEngine.FAISS == knnEngine) { - KNNCodecUtil.VectorBatch pair = KNNCodecUtil.readVectorBatch(values); - long indexAddress = initIndexFromScratch(field, num_docs, pair.getDimension(), knnEngine); - while (true) { - if (pair.docs.length != 0) insertToIndex(pair, knnEngine, indexAddress); - if (isMerge) { - totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); - totalDocsIncrement += pair.docs.length; - } - if (pair.finished) { - break; - } - pair = KNNCodecUtil.readVectorBatch(values); - } - writeIndex(indexAddress, indexPath, knnEngine); - } else { - // Note that iterative graph construction has not yet been implemented for nmslib - KNNCodecUtil.VectorBatch pair = KNNCodecUtil.readAllVectors(values); - createKNNIndexFromScratch(field, pair, knnEngine, indexPath); - totalDocsIncrement += pair.docs.length; - totalArraySize += calculateArraySize(pair.docs.length, pair.getDimension(), pair.serializationMode); + } else if (KNNEngine.FAISS == knnEngine) { + long indexAddress = initIndexFromScratch(field, numDocs, pair.getDimension(), knnEngine); + insertToIndex(pair, knnEngine, indexAddress); + while (!pair.finished) { + pair = KNNCodecUtil.readVectorBatch(values); + insertToIndex(pair, knnEngine, indexAddress); } + writeIndex(indexAddress, indexPath, knnEngine); + } else { + // Note that iterative graph construction has not yet been 
implemented for nmslib + createKNNIndexFromScratch(field, pair, knnEngine, indexPath); } if (isMerge) { - KNNGraphValue.MERGE_CURRENT_DOCS.incrementBy(totalDocsIncrement); - KNNGraphValue.MERGE_CURRENT_SIZE_IN_BYTES.incrementBy(totalArraySize); - recordMergeStats((int) totalDocsIncrement, totalArraySize); + KNNGraphValue.MERGE_CURRENT_OPERATIONS.increment(); + KNNGraphValue.MERGE_CURRENT_DOCS.incrementBy(numDocs); + KNNGraphValue.MERGE_CURRENT_SIZE_IN_BYTES.incrementBy(calculateArraySize((int)numDocs, dim, pair.serializationMode)); + recordMergeStats((int)numDocs, calculateArraySize((int)numDocs, dim, pair.serializationMode)); } if (isRefresh) { recordRefreshStats();