Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced writing layer, getting rid of writing logic that uses an absolute path in the filesystem. #2241

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x](https://github.com/opensearch-project/k-NN/compare/2.18...2.x)
### Features
### Enhancements
- Introduced a writing layer in native engines where relies on the writing interface to process IO. (#2241)[https://github.com/opensearch-project/k-NN/pull/2241]
### Bug Fixes
### Infrastructure
### Documentation
Expand Down
1 change: 1 addition & 0 deletions jni/cmake/init-nmslib.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ if(NOT DEFINED APPLY_LIB_PATCHES OR "${APPLY_LIB_PATCHES}" STREQUAL true)
list(APPEND PATCH_FILE_LIST "${CMAKE_CURRENT_SOURCE_DIR}/patches/nmslib/0001-Initialize-maxlevel-during-add-from-enterpoint-level.patch")
list(APPEND PATCH_FILE_LIST "${CMAKE_CURRENT_SOURCE_DIR}/patches/nmslib/0002-Adds-ability-to-pass-ef-parameter-in-the-query-for-h.patch")
list(APPEND PATCH_FILE_LIST "${CMAKE_CURRENT_SOURCE_DIR}/patches/nmslib/0003-Added-streaming-apis-for-vector-index-loading-in-Hnsw.patch")
list(APPEND PATCH_FILE_LIST "${CMAKE_CURRENT_SOURCE_DIR}/patches/nmslib/0004-Added-a-new-save-apis-in-Hnsw-with-streaming-interfa.patch")

# Get patch id of the last commit
execute_process(COMMAND sh -c "git --no-pager show HEAD | git patch-id --stable" OUTPUT_VARIABLE PATCH_ID_OUTPUT_FROM_COMMIT WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/external/nmslib)
Expand Down
6 changes: 6 additions & 0 deletions jni/include/commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

#ifndef OPENSEARCH_KNN_COMMONS_H
#define OPENSEARCH_KNN_COMMONS_H

#include "jni_util.h"
#include <jni.h>
namespace knn_jni {
Expand Down Expand Up @@ -99,3 +103,5 @@ namespace knn_jni {
int getIntegerMethodParameter(JNIEnv *, knn_jni::JNIUtilInterface *, std::unordered_map<std::string, jobject>, std::string, int);
}
}

#endif
63 changes: 38 additions & 25 deletions jni/include/faiss_index_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

#include <jni.h>
#include "faiss/MetricType.h"
#include "faiss/impl/io.h"
#include "jni_util.h"
#include "faiss_methods.h"
#include "faiss_stream_support.h"
#include <memory>

namespace knn_jni {
Expand All @@ -30,7 +32,8 @@ namespace faiss_wrapper {
*/
class IndexService {
public:
IndexService(std::unique_ptr<FaissMethods> faissMethods);
explicit IndexService(std::unique_ptr<FaissMethods> faissMethods);

/**
* Initialize index
*
Expand All @@ -45,6 +48,7 @@ class IndexService {
* @return memory address of the native index object
*/
virtual jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map<std::string, jobject> parameters);

/**
* Add vectors to index
*
Expand All @@ -55,29 +59,34 @@ class IndexService {
* @param idMapAddress memory address of the native index object
*/
virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector<int64_t> &ids, jlong idMapAddress);

/**
* Write index to disk
*
* @param threadCount number of thread count to be used while adding data
* @param indexPath path to write index
* @param idMap memory address of the native index object
* @param writer IOWriter implementation doing IO processing.
* In most cases, it is expected to have underlying Lucene's IndexOuptut.
* @param idMapAddress memory address of the native index object
*/
virtual void writeIndex(std::string indexPath, jlong idMapAddress);
virtual void writeIndex(faiss::IOWriter* writer, jlong idMapAddress);

virtual ~IndexService() = default;

protected:
virtual void allocIndex(faiss::Index * index, size_t dim, size_t numVectors);

std::unique_ptr<FaissMethods> faissMethods;
};
}; // class IndexService

/**
* A class to provide operations on index
* This class should evolve to have only cpp object but not jni object
*/
class BinaryIndexService : public IndexService {
class BinaryIndexService final : public IndexService {
public:
//TODO Remove dependency on JNIUtilInterface and JNIEnv
//TODO Reduce the number of parameters
BinaryIndexService(std::unique_ptr<FaissMethods> faissMethods);
explicit BinaryIndexService(std::unique_ptr<FaissMethods> faissMethods);

/**
* Initialize index
*
Expand All @@ -91,7 +100,8 @@ class BinaryIndexService : public IndexService {
* @param parameters parameters to be applied to faiss index
* @return memory address of the native index object
*/
virtual jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map<std::string, jobject> parameters) override;
jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map<std::string, jobject> parameters) final;

/**
* Add vectors to index
*
Expand All @@ -106,7 +116,8 @@ class BinaryIndexService : public IndexService {
* @param idMap a map of document id and vector id
* @param parameters parameters to be applied to faiss index
*/
virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector<int64_t> &ids, jlong idMapAddress) override;
void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector<int64_t> &ids, jlong idMapAddress) final;

/**
* Write index to disk
*
Expand All @@ -119,23 +130,23 @@ class BinaryIndexService : public IndexService {
* @param idMap a map of document id and vector id
* @param parameters parameters to be applied to faiss index
*/
virtual void writeIndex(std::string indexPath, jlong idMapAddress) override;
virtual ~BinaryIndexService() = default;
void writeIndex(faiss::IOWriter* writer, jlong idMapAddress) final;

protected:
virtual void allocIndex(faiss::Index * index, size_t dim, size_t numVectors) override;
};
void allocIndex(faiss::Index * index, size_t dim, size_t numVectors) final;
}; // class BinaryIndexService

/**
* A class to provide operations on index
* This class should evolve to have only cpp object but not jni object
*/
class ByteIndexService : public IndexService {
class ByteIndexService final : public IndexService {
public:
//TODO Remove dependency on JNIUtilInterface and JNIEnv
//TODO Reduce the number of parameters
ByteIndexService(std::unique_ptr<FaissMethods> faissMethods);
explicit ByteIndexService(std::unique_ptr<FaissMethods> faissMethods);

/**
/**
* Initialize index
*
* @param jniUtil jni util
Expand All @@ -148,7 +159,8 @@ class ByteIndexService : public IndexService {
* @param parameters parameters to be applied to faiss index
* @return memory address of the native index object
*/
virtual jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map<std::string, jobject> parameters) override;
jlong initIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, faiss::MetricType metric, std::string indexDescription, int dim, int numVectors, int threadCount, std::unordered_map<std::string, jobject> parameters) final;

/**
* Add vectors to index
*
Expand All @@ -163,7 +175,8 @@ class ByteIndexService : public IndexService {
* @param idMap a map of document id and vector id
* @param parameters parameters to be applied to faiss index
*/
virtual void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector<int64_t> &ids, jlong idMapAddress) override;
void insertToIndex(int dim, int numIds, int threadCount, int64_t vectorsAddress, std::vector<int64_t> &ids, jlong idMapAddress) final;

/**
* Write index to disk
*
Expand All @@ -176,14 +189,14 @@ class ByteIndexService : public IndexService {
* @param idMap a map of document id and vector id
* @param parameters parameters to be applied to faiss index
*/
virtual void writeIndex(std::string indexPath, jlong idMapAddress) override;
virtual ~ByteIndexService() = default;
protected:
virtual void allocIndex(faiss::Index * index, size_t dim, size_t numVectors) override;
};
void writeIndex(faiss::IOWriter* writer, jlong idMapAddress) final;

protected:
void allocIndex(faiss::Index * index, size_t dim, size_t numVectors) final;
}; // class ByteIndexService

}
}


#endif //OPENSEARCH_KNN_FAISS_INDEX_SERVICE_H
#endif //OPENSEARCH_KNN_FAISS_INDEX_SERVICE_H
14 changes: 11 additions & 3 deletions jni/include/faiss_methods.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#ifndef OPENSEARCH_KNN_FAISS_METHODS_H
#define OPENSEARCH_KNN_FAISS_METHODS_H

#include "faiss/impl/io.h"
#include "faiss/Index.h"
#include "faiss/IndexBinary.h"
#include "faiss/IndexIDMap.h"
Expand All @@ -26,14 +27,21 @@ namespace faiss_wrapper {
class FaissMethods {
public:
FaissMethods() = default;

virtual faiss::Index* indexFactory(int d, const char* description, faiss::MetricType metric);

virtual faiss::IndexBinary* indexBinaryFactory(int d, const char* description);

virtual faiss::IndexIDMapTemplate<faiss::Index>* indexIdMap(faiss::Index* index);

virtual faiss::IndexIDMapTemplate<faiss::IndexBinary>* indexBinaryIdMap(faiss::IndexBinary* index);
virtual void writeIndex(const faiss::Index* idx, const char* fname);
virtual void writeIndexBinary(const faiss::IndexBinary* idx, const char* fname);

virtual void writeIndex(const faiss::Index* idx, faiss::IOWriter* writer);

virtual void writeIndexBinary(const faiss::IndexBinary* idx, faiss::IOWriter* writer);

virtual ~FaissMethods() = default;
};
}; // class FaissMethods

} //namespace faiss_wrapper
} //namespace knn_jni
Expand Down
37 changes: 36 additions & 1 deletion jni/include/faiss_stream_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "faiss/impl/io.h"
#include "jni_util.h"
#include "native_engines_stream_support.h"
#include "parameter_utils.h"

#include <jni.h>
#include <stdexcept>
Expand All @@ -34,7 +35,7 @@ class FaissOpenSearchIOReader final : public faiss::IOReader {
public:
explicit FaissOpenSearchIOReader(NativeEngineIndexInputMediator *_mediator)
: faiss::IOReader(),
mediator(_mediator) {
mediator(knn_jni::util::ParameterCheck::require_non_null(_mediator, "mediator")) {
name = "FaissOpenSearchIOReader";
}

Expand All @@ -56,6 +57,40 @@ class FaissOpenSearchIOReader final : public faiss::IOReader {
}; // class FaissOpenSearchIOReader


/**
* A glue component inheriting IOWriter to delegate IO processing down to the given
* mediator. The mediator is expected to do write bytes via the provided Lucene's IndexOutput.
*/
class FaissOpenSearchIOWriter final : public faiss::IOWriter {
0ctopus13prime marked this conversation as resolved.
Show resolved Hide resolved
public:
explicit FaissOpenSearchIOWriter(NativeEngineIndexOutputMediator *_mediator)
0ctopus13prime marked this conversation as resolved.
Show resolved Hide resolved
: faiss::IOWriter(),
mediator(knn_jni::util::ParameterCheck::require_non_null(_mediator, "mediator")) {
name = "FaissOpenSearchIOWriter";
}

size_t operator()(const void *ptr, size_t size, size_t nitems) final {
0ctopus13prime marked this conversation as resolved.
Show resolved Hide resolved
const auto writeBytes = size * nitems;
if (writeBytes > 0) {
mediator->writeBytes(reinterpret_cast<const uint8_t *>(ptr), writeBytes);
}
return nitems;
}

// return a file number that can be memory-mapped
int filedescriptor() final {
throw std::runtime_error("filedescriptor() is not supported in FaissOpenSearchIOWriter.");
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
}

void flush() {
mediator->flush();
}

private:
NativeEngineIndexOutputMediator *mediator;
}; // class FaissOpenSearchIOWriter



}
}
Expand Down
35 changes: 18 additions & 17 deletions jni/include/faiss_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "jni_util.h"
#include "faiss_index_service.h"
#include "faiss_stream_support.h"
#include <jni.h>

namespace knn_jni {
Expand All @@ -22,25 +23,25 @@ namespace knn_jni {

void InsertToIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jintArray idsJ, jlong vectorsAddressJ, jint dimJ, jlong indexAddr, jint threadCount, IndexService *indexService);

void WriteIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jstring indexPathJ, jlong indexAddr, IndexService *indexService);
void WriteIndex(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jobject output, jlong indexAddr, IndexService *indexService);

// Create an index with ids and vectors. Instead of creating a new index, this function creates the index
// based off of the template index passed in. The index is serialized to indexPathJ.
void CreateIndexFromTemplate(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ,
jlong vectorsAddressJ, jint dimJ, jstring indexPathJ, jbyteArray templateIndexJ,
jlong vectorsAddressJ, jint dimJ, jobject output, jbyteArray templateIndexJ,
jobject parametersJ);

// Create an index with ids and vectors. Instead of creating a new index, this function creates the index
// based off of the template index passed in. The index is serialized to indexPathJ.
void CreateBinaryIndexFromTemplate(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ,
jlong vectorsAddressJ, jint dimJ, jstring indexPathJ, jbyteArray templateIndexJ,
jobject parametersJ);
jlong vectorsAddressJ, jint dimJ, jobject output, jbyteArray templateIndexJ,
jobject parametersJ);

// Create a index with ids and byte vectors. Instead of creating a new index, this function creates the index
// based off of the template index passed in. The index is serialized to indexPathJ.
void CreateByteIndexFromTemplate(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jintArray idsJ,
jlong vectorsAddressJ, jint dimJ, jstring indexPathJ, jbyteArray templateIndexJ,
jobject parametersJ);
jlong vectorsAddressJ, jint dimJ, jobject output, jbyteArray templateIndexJ,
jobject parametersJ);

// Load an index from indexPathJ into memory.
//
Expand Down Expand Up @@ -74,28 +75,28 @@ namespace knn_jni {
// Sets the sharedIndexState for an index
void SetSharedIndexState(jlong indexPointerJ, jlong shareIndexStatePointerJ);

/**
/**
* Execute a query against the index located in memory at indexPointerJ
*
*
* Parameters:
* methodParamsJ: introduces a map to have additional method parameters
*
*
* Return an array of KNNQueryResults
*/
*/
jobjectArray QueryIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jlong indexPointerJ,
jfloatArray queryVectorJ, jint kJ, jobject methodParamsJ, jintArray parentIdsJ);

/**
* Execute a query against the index located in memory at indexPointerJ along with Filters
*
*
* Parameters:
* methodParamsJ: introduces a map to have additional method parameters
*
*
* Return an array of KNNQueryResults
*/
jobjectArray QueryIndex_WithFilter(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jlong indexPointerJ,
jfloatArray queryVectorJ, jint kJ, jobject methodParamsJ, jlongArray filterIdsJ,
jint filterIdsTypeJ, jintArray parentIdsJ);
jfloatArray queryVectorJ, jint kJ, jobject methodParamsJ, jlongArray filterIdsJ,
jint filterIdsTypeJ, jintArray parentIdsJ);

// Execute a query against the binary index located in memory at indexPointerJ along with Filters
//
Expand Down Expand Up @@ -124,14 +125,14 @@ namespace knn_jni {
//
// Return the serialized representation
jbyteArray TrainBinaryIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jobject parametersJ, jint dimension,
jlong trainVectorsPointerJ);
jlong trainVectorsPointerJ);

// Create an empty byte index defined by the values in the Java map, parametersJ. Train the index with
// the byte vectors located at trainVectorsPointerJ.
//
// Return the serialized representation
jbyteArray TrainByteIndex(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env, jobject parametersJ, jint dimension,
jlong trainVectorsPointerJ);
jlong trainVectorsPointerJ);

/*
* Perform a range search with filter against the index located in memory at indexPointerJ.
Expand Down Expand Up @@ -163,7 +164,7 @@ namespace knn_jni {
* @return an array of RangeQueryResults
*/
jobjectArray RangeSearch(knn_jni::JNIUtilInterface *jniUtil, JNIEnv *env, jlong indexPointerJ, jfloatArray queryVectorJ,
jfloat radiusJ, jobject methodParamsJ, jint maxResultWindowJ, jintArray parentIdsJ);
jfloat radiusJ, jobject methodParamsJ, jint maxResultWindowJ, jintArray parentIdsJ);
}
}

Expand Down
Loading
Loading