Feature/quantization integration (#1976)
* Integrate KNNVectorValues with vector ANN Search flow (#1952)

Signed-off-by: Navneet Verma <[email protected]>

* Iterative index integration (#1956)

* Iterative Vector Insertion (#1840)

* Rebased with new version of k-NN

Signed-off-by: Andrew Klepchick <[email protected]>

* Optimized faiss insertion

Signed-off-by: Andrew Klepchick <[email protected]>

* Optimized threadCount logic

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed IDEA files

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed unnecessary cmake file

Signed-off-by: Andrew Klepchick <[email protected]>

* Added comments to new functions

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed createIndex and fixed test cases that use it

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed unused code

Signed-off-by: Andrew Klepchick <[email protected]>

* Explained zero initialization for vector transfer

Signed-off-by: Andrew Klepchick <[email protected]>

* Added locale

Signed-off-by: Andrew Klepchick <[email protected]>

* Spotless Apply

Signed-off-by: Andrew Klepchick <[email protected]>

* Account for zero documents in finished batch

Signed-off-by: Andrew Klepchick <[email protected]>

* Changed where we check for zero docs

Signed-off-by: Andrew Klepchick <[email protected]>

* Changed tip for return

Signed-off-by: Andrew Klepchick <[email protected]>

* Use unique pointers to make sure resources are released on exception

Signed-off-by: Andrew Klepchick <[email protected]>

* Moved createIndex to testUtils

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed memory management so that the underlying index is not deleted after initialized

Signed-off-by: Andrew Klepchick <[email protected]>

* Created new KNNIndexBuilder graph to make index building more modular

Signed-off-by: Andrew Klepchick <[email protected]>

* Streamlined logic in KNNIndexBuilder.

Signed-off-by: Andrew Klepchick <[email protected]>

* Cleaned up unnecessary code in KNN80DocValuesConsumer

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed memory management process

Signed-off-by: Andrew Klepchick <[email protected]>

* Added note about index initialization in faiss_index_service

Signed-off-by: Andrew Klepchick <[email protected]>

* Accounted for case where the exception happens after the indexWriter is released.

Signed-off-by: Andrew Klepchick <[email protected]>

* Delete jni/src/.idea/modules.xml

Signed-off-by: Andrew Klepchick <[email protected]>

* Delete jni/src/.idea/vcs.xml

Signed-off-by: Andrew Klepchick <[email protected]>

* Delete jni/src/.idea/workspace.xml

Signed-off-by: Andrew Klepchick <[email protected]>

* Spotless apply and free iterative index on exception

Signed-off-by: Andrew Klepchick <[email protected]>

* Undid hack for checking first document metrics

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed print statements

Signed-off-by: Andrew Klepchick <[email protected]>

* Free Vector Transfer on batch ingestion

Signed-off-by: Andrew Klepchick <[email protected]>

* Undid free

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed check for transfer ready

Signed-off-by: Andrew Klepchick <[email protected]>

* Don't crash when zero vectors inserted?

Signed-off-by: Andrew Klepchick <[email protected]>

* Reverted to old insertion process?

Signed-off-by: Andrew Klepchick <[email protected]>

* Spotless apply

Signed-off-by: Andrew Klepchick <[email protected]>

* Added back createOutput

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed prior createOutput

Signed-off-by: Andrew Klepchick <[email protected]>

* Test remaking vectorTransfer

Signed-off-by: Andrew Klepchick <[email protected]>

* Test restructuring of insertion

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed case where vector address is immediately discarded

Signed-off-by: Andrew Klepchick <[email protected]>

* Spotless apply

Signed-off-by: Andrew Klepchick <[email protected]>

* Split Index Builder into multiple classes

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed descriptions of functions in faiss_index_service

Signed-off-by: Andrew Klepchick <[email protected]>

* Added back copyright files

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed unused builder names

Signed-off-by: Andrew Klepchick <[email protected]>

* Modified tests to work with new insertion methods

Signed-off-by: Andrew Klepchick <[email protected]>

* Track index insertions

Signed-off-by: Andrew Klepchick <[email protected]>

* Tracked insertions for binary indices

Signed-off-by: Andrew Klepchick <[email protected]>

* Added back insertIds

Signed-off-by: Andrew Klepchick <[email protected]>

* Added check for freeVectorData to see if it works with an already deleted address

Signed-off-by: Andrew Klepchick <[email protected]>

* Cleaned up logs and comments in KNNIndexBuilder

Signed-off-by: Andrew Klepchick <[email protected]>

* Restructured the logic for KNNIndexBuilder

Signed-off-by: Andrew Klepchick <[email protected]>

* Changed package name of KNNIndexBuilder

Signed-off-by: Andrew Klepchick <[email protected]>

* Changed all package names and deleted unnecessary headers

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed for loop

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed createIndex methods for faiss index service

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed package to fit naming conventions

Signed-off-by: Andrew Klepchick <[email protected]>

* Changed name of index builder

Signed-off-by: Andrew Klepchick <[email protected]>

* Spotless apply

Signed-off-by: Andrew Klepchick <[email protected]>

* Added comments to NativeIndexBuilder and restructured

Signed-off-by: Andrew Klepchick <[email protected]>

* Added deletion for memoryAddress

Signed-off-by: Andrew Klepchick <[email protected]>

* Spotless apply

Signed-off-by: Andrew Klepchick <[email protected]>

* Changed naming of classes to Writer and changed package name to fit conventions

Signed-off-by: Andrew Klepchick <[email protected]>

* Changed NativeIndexInfo and NativeVectorInfo to follow builder pattern

Signed-off-by: Andrew Klepchick <[email protected]>

* Added feature to changelog

Signed-off-by: Andrew Klepchick <[email protected]>

* Added class descriptions to each NativeIndexWriter

Signed-off-by: Andrew Klepchick <[email protected]>

* Changed name to getBytesPerVector

Signed-off-by: Andrew Klepchick <[email protected]>

* Added == false instead of ! for readability

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed changelog

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed naming in docvaluesconsumer

Signed-off-by: Andrew Klepchick <[email protected]>

* SpotlessApply

Signed-off-by: Andrew Klepchick <[email protected]>

* Made it so that we don't reuse testValues and removed a foot gun

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed another foot gun in getIndexInfo

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed javadoc

Signed-off-by: Andrew Klepchick <[email protected]>

* Added deletion on exception cases

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed unnecessary delete (NativeIndexWriter will handle deletion of vectors on exception)

Signed-off-by: Andrew Klepchick <[email protected]>

* Added correct logger and getWriter method to NativeIndexWriter

Signed-off-by: Andrew Klepchick <[email protected]>

* Ensured memory safety on JNI layer so that Java doesn't have to wrap everything in a try catch loop.

Signed-off-by: Andrew Klepchick <[email protected]>

* Refactored NativeIndexWriter and added comments to FaissService

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed free in the JNIExport since index will always be freed in writeIndex.

Signed-off-by: Andrew Klepchick <[email protected]>

* Changed getVectorTransfer back to accept VectorDataType

Signed-off-by: Andrew Klepchick <[email protected]>

* Reverted free since not guaranteed to be IDMap.

Signed-off-by: Andrew Klepchick <[email protected]>

* Added all processes in addKNNBinaryField to NativeIndexWriter.createKNNIndex

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed javadoc

Signed-off-by: Andrew Klepchick <[email protected]>

* Applied spotless

Signed-off-by: Andrew Klepchick <[email protected]>

* Added back writeFooter

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed threadCount from writeIndex

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed redundancies in KNN80DocValuesConsumer

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed serializationMode

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed changelog

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed changelog

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed double free test as we don't have to worry about that anymore

Signed-off-by: Andrew Klepchick <[email protected]>

* Accounted for HNSWSQ in index service

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed delete in catch

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed faiss tests to work with writeIndex

Signed-off-by: Andrew Klepchick <[email protected]>

---------

Signed-off-by: Andrew Klepchick <[email protected]>

* Index Initialization Alloc Method (#1933)

* Added methods for allocating memory before inserting vectors to a faiss index

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed logic that gets type of index

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed print statement

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed unnecessary iostream

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed flat index

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed flat index case

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed naming

Signed-off-by: Andrew Klepchick <[email protected]>

* Properly allocate HNSWSQ storage

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed print statements

Signed-off-by: Andrew Klepchick <[email protected]>

* Fixed changelog

Signed-off-by: Andrew Klepchick <[email protected]>

* Removed unnecessary lib

Signed-off-by: Andrew Klepchick <[email protected]>

* Made alloc adaptive to different code sizes

Signed-off-by: Andrew Klepchick <[email protected]>

---------

Signed-off-by: Andrew Klepchick <[email protected]>

* Integrates FAISS iterative builds with NativeEngines990KnnVectorsFormat

Changes include reusing the same vector buffer in the JNI layer

Signed-off-by: Tejas Shah <[email protected]>

---------

Signed-off-by: Andrew Klepchick <[email protected]>
Signed-off-by: Tejas Shah <[email protected]>
Co-authored-by: Andrew Klepchick <[email protected]>

* Integrates FAISS iterative builds with NativeEngines990KnnVectorsFormat

Changes include reusing the same vector buffer in the JNI layer

Signed-off-by: Tejas Shah <[email protected]>

---------

Signed-off-by: Navneet Verma <[email protected]>
Signed-off-by: Andrew Klepchick <[email protected]>
Signed-off-by: Tejas Shah <[email protected]>
Co-authored-by: Navneet Verma <[email protected]>
Co-authored-by: Andrew Klepchick <[email protected]>
3 people authored Aug 15, 2024
1 parent 415bb17 commit 4313e89
Showing 20 changed files with 566 additions and 434 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -25,6 +25,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Integrate Lucene Vector field with native engines to use KNNVectorFormat during segment creation [#1945](https://github.com/opensearch-project/k-NN/pull/1945)
* Disallow a vector field to have an invalid character for a physical file name. [#1936] (https://github.com/opensearch-project/k-NN/pull/1936)
* Fixed and abstracted functionality for allocating index memory [#1933](https://github.com/opensearch-project/k-NN/pull/1933)
* Fix graph merge stats size calculation [#1844](https://github.com/opensearch-project/k-NN/pull/1844)
* Disallow a vector field to have an invalid character for a physical file name. [#1936](https://github.com/opensearch-project/k-NN/pull/1936)
### Infrastructure
### Documentation
### Maintenance
38 changes: 20 additions & 18 deletions jni/src/faiss_index_service.cpp
@@ -83,25 +83,30 @@ jlong IndexService::initIndex(
std::unordered_map<std::string, jobject> parameters
) {
// Create index using Faiss factory method
std::unique_ptr<faiss::Index> indexWriter(faissMethods->indexFactory(dim, indexDescription.c_str(), metric));
std::unique_ptr<faiss::Index> index(faissMethods->indexFactory(dim, indexDescription.c_str(), metric));

// Set thread count if it is passed in as a parameter. Setting this variable will only impact the current thread
if(threadCount != 0) {
omp_set_num_threads(threadCount);
}

// Add extra parameters that cant be configured with the index factory
SetExtraParameters<faiss::Index, faiss::IndexIVF, faiss::IndexHNSW>(jniUtil, env, parameters, indexWriter.get());
SetExtraParameters<faiss::Index, faiss::IndexIVF, faiss::IndexHNSW>(jniUtil, env, parameters, index.get());

// Check that the index does not need to be trained
if(!indexWriter->is_trained) {
if(!index->is_trained) {
throw std::runtime_error("Index is not trained");
}

std::unique_ptr<faiss::IndexIDMap> idMap (faissMethods->indexIdMap(indexWriter.get()));
std::unique_ptr<faiss::IndexIDMap> idMap (faissMethods->indexIdMap(index.get()));
//Makes sure the index is deleted when the destructor is called
idMap->own_fields = true;

allocIndex(dynamic_cast<faiss::Index *>(idMap->index), dim, numVectors);
indexWriter.release();

//Release the ownership so as to make sure not delete the underlying index that is created. The index is needed later
//in insert and write operations
index.release();
return reinterpret_cast<jlong>(idMap.release());
}

@@ -147,11 +152,8 @@ void IndexService::writeIndex(
// Write the index to disk
faissMethods->writeIndex(idMap.get(), indexPath.c_str());
} catch(std::exception &e) {
delete idMap->index;
throw std::runtime_error("Failed to write index to disk");
}
// Free the memory used by the index
delete idMap->index;
}

BinaryIndexService::BinaryIndexService(std::unique_ptr<FaissMethods> faissMethods) : IndexService(std::move(faissMethods)) {}
@@ -175,25 +177,29 @@ jlong BinaryIndexService::initIndex(
std::unordered_map<std::string, jobject> parameters
) {
// Create index using Faiss factory method
std::unique_ptr<faiss::IndexBinary> indexWriter(faissMethods->indexBinaryFactory(dim, indexDescription.c_str()));

std::unique_ptr<faiss::IndexBinary> index(faissMethods->indexBinaryFactory(dim, indexDescription.c_str()));
// Set thread count if it is passed in as a parameter. Setting this variable will only impact the current thread
if(threadCount != 0) {
omp_set_num_threads(threadCount);
}

// Add extra parameters that cant be configured with the index factory
SetExtraParameters<faiss::IndexBinary, faiss::IndexBinaryIVF, faiss::IndexBinaryHNSW>(jniUtil, env, parameters, indexWriter.get());
SetExtraParameters<faiss::IndexBinary, faiss::IndexBinaryIVF, faiss::IndexBinaryHNSW>(jniUtil, env, parameters, index.get());

// Check that the index does not need to be trained
if(!indexWriter->is_trained) {
if(!index->is_trained) {
throw std::runtime_error("Index is not trained");
}

std::unique_ptr<faiss::IndexBinaryIDMap> idMap(faissMethods->indexBinaryIdMap(indexWriter.get()));
std::unique_ptr<faiss::IndexBinaryIDMap> idMap(faissMethods->indexBinaryIdMap(index.get()));
//Makes sure the index is deleted when the destructor is called
idMap->own_fields = true;

allocIndex(dynamic_cast<faiss::Index *>(idMap->index), dim, numVectors);
indexWriter.release();

//Release the ownership so as to make sure not delete the underlying index that is created. The index is needed later
//in insert and write operations
index.release();
return reinterpret_cast<jlong>(idMap.release());
}

@@ -240,12 +246,8 @@ void BinaryIndexService::writeIndex(
// Write the index to disk
faissMethods->writeIndexBinary(idMap.get(), indexPath.c_str());
} catch(std::exception &e) {
delete idMap->index;
throw std::runtime_error("Failed to write index to disk");
}

// Free the memory used by the index
delete idMap->index;
}

} // namespace faiss_wrapper
24 changes: 13 additions & 11 deletions src/main/java/org/opensearch/knn/common/KNNVectorUtil.java
@@ -8,7 +8,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -45,17 +45,19 @@ public static boolean isZeroVector(float[] vector) {
return true;
}

/**
* Creates an int overflow safe arraylist. If there is an overflow it will create a list with default initial size
* @param batchSize size to allocate
* @return an arrayList
/*
* Converts an integer List to and array
* @param integerList
* @return null if list is null or empty, int[] otherwise
*/
public static <T> ArrayList<T> createArrayList(long batchSize) {
try {
return new ArrayList<>(Math.toIntExact(batchSize));
} catch (Exception exception) {
// No-op
public static int[] intListToArray(final List<Integer> integerList) {
if (integerList == null || integerList.isEmpty()) {
return null;
}
int[] intArray = new int[integerList.size()];
for (int i = 0; i < integerList.size(); i++) {
intArray[i] = integerList.get(i);
}
return new ArrayList<>();
return intArray;
}
}
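For reviewers who want to exercise the new helper in isolation, here is a minimal standalone sketch of the conversion described in the hunk above. The class name `IntListToArraySketch` is hypothetical and only wraps a copy of the logic; it is not part of the plugin.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical wrapper class for illustration; the real helper lives in KNNVectorUtil.
final class IntListToArraySketch {

    // Returns null for a null or empty list, otherwise a primitive int[] copy.
    static int[] intListToArray(final List<Integer> integerList) {
        if (integerList == null || integerList.isEmpty()) {
            return null;
        }
        final int[] intArray = new int[integerList.size()];
        for (int i = 0; i < integerList.size(); i++) {
            // Auto-unboxing: a null element in the list would throw NullPointerException here.
            intArray[i] = integerList.get(i);
        }
        return intArray;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(intListToArray(Arrays.asList(3, 7, 42))));
        System.out.println(intListToArray(Collections.<Integer>emptyList()) == null);
    }
}
```

Note that callers receive `null`, not an empty array, when no doc IDs were collected, so any consumer of the result has to tolerate `null`.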
@@ -6,50 +6,62 @@
package org.opensearch.knn.index.codec.nativeindex;

import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
import org.opensearch.knn.index.codec.transfer.OffHeapByteQuantizedVectorTransfer;
import org.opensearch.knn.index.codec.transfer.OffHeapByteVectorTransfer;
import org.opensearch.knn.index.codec.transfer.OffHeapFloatVectorTransfer;
import org.opensearch.knn.index.codec.transfer.VectorTransfer;
import org.opensearch.knn.index.vectorvalues.KNNFloatVectorValues;
import org.opensearch.knn.index.codec.transfer.OffHeapVectorTransfer;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.knn.jni.JNIService;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.common.KNNVectorUtil.intListToArray;

/**
* Transfers all vectors to offheap and then builds an index
* Transfers all vectors to off heap and then builds an index
*/
final class VectorTransferIndexBuildStrategy implements NativeIndexBuildStrategy {
final class DefaultIndexBuildStrategy implements NativeIndexBuildStrategy {

private static VectorTransferIndexBuildStrategy INSTANCE = new VectorTransferIndexBuildStrategy();
private static DefaultIndexBuildStrategy INSTANCE = new DefaultIndexBuildStrategy();

public static VectorTransferIndexBuildStrategy getInstance() {
public static DefaultIndexBuildStrategy getInstance() {
return INSTANCE;
}

private VectorTransferIndexBuildStrategy() {}
private DefaultIndexBuildStrategy() {}

public void buildAndWriteIndex(final BuildIndexParams indexInfo, final KNNVectorValues<?> knnVectorValues) throws IOException {
// iterating it once to be safe
knnVectorValues.init();
try (final VectorTransfer vectorTransfer = getVectorTransfer(indexInfo.getVectorDataType(), knnVectorValues)) {
vectorTransfer.transferBatch();
assert !vectorTransfer.hasNext();
knnVectorValues.init(); // to get bytesPerVector
int transferLimit = (int) Math.max(1, KNNSettings.getVectorStreamingMemoryLimit().getBytes() / knnVectorValues.bytesPerVector());
try (final OffHeapVectorTransfer vectorTransfer = getVectorTransfer(indexInfo.getVectorDataType(), transferLimit)) {

final List<Integer> tranferredDocIds = new ArrayList<>();
while (knnVectorValues.docId() != NO_MORE_DOCS) {
// append is true here so off heap memory buffer isn't overwritten
vectorTransfer.transfer(knnVectorValues.conditionalCloneVector(), true);
tranferredDocIds.add(knnVectorValues.docId());
knnVectorValues.nextDoc();
}
vectorTransfer.flush(true);

final Map<String, Object> params = indexInfo.getParameters();
long vectorAddress = vectorTransfer.getVectorAddress();
// Currently this is if else as there are only two cases, with more cases this will have to be made
// more maintainable
if (params.containsKey(MODEL_ID)) {
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
JNIService.createIndexFromTemplate(
vectorTransfer.getTransferredDocsIds(),
vectorTransfer.getVectorAddress(),
intListToArray(tranferredDocIds),
vectorAddress,
knnVectorValues.dimension(),
indexInfo.getIndexPath(),
(byte[]) params.get(KNNConstants.MODEL_BLOB_PARAMETER),
@@ -61,8 +73,8 @@ public void buildAndWriteIndex(final BuildIndexParams indexInfo, final KNNVector
} else {
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
JNIService.createIndex(
vectorTransfer.getTransferredDocsIds(),
vectorTransfer.getVectorAddress(),
intListToArray(tranferredDocIds),
vectorAddress,
knnVectorValues.dimension(),
indexInfo.getIndexPath(),
indexInfo.getParameters(),
@@ -77,13 +89,13 @@ public void buildAndWriteIndex(final BuildIndexParams indexInfo, final KNNVector
}
}

private VectorTransfer getVectorTransfer(VectorDataType vectorDataType, KNNVectorValues<?> knnVectorValues) throws IOException {
private <T> OffHeapVectorTransfer<T> getVectorTransfer(VectorDataType vectorDataType, int transferLimit) throws IOException {
switch (vectorDataType) {
case FLOAT:
return new OffHeapFloatVectorTransfer((KNNFloatVectorValues) knnVectorValues, knnVectorValues.totalLiveDocs());
return (OffHeapVectorTransfer<T>) new OffHeapFloatVectorTransfer(transferLimit);
case BINARY:
case BYTE:
return new OffHeapByteQuantizedVectorTransfer<>((KNNVectorValues<byte[]>) knnVectorValues, knnVectorValues.totalLiveDocs());
return (OffHeapVectorTransfer<T>) new OffHeapByteVectorTransfer(transferLimit);
default:
throw new IllegalArgumentException("Unsupported vector data type: " + vectorDataType);
}
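The `transferLimit` expression in this hunk turns a byte budget into a vector count. The sketch below reproduces only that arithmetic. The class and method names are hypothetical, and the budget is passed in as a plain `long` instead of being read from `KNNSettings`.

```java
// Hypothetical illustration of the transfer-limit arithmetic; not plugin code.
final class TransferLimitSketch {

    // Number of vectors that fit in the budget, but never fewer than one,
    // so that a vector larger than the whole budget can still be transferred.
    static int transferLimit(final long memoryLimitBytes, final int bytesPerVector) {
        // Integer division on longs; the cast assumes the quotient fits in an int.
        return (int) Math.max(1, memoryLimitBytes / bytesPerVector);
    }

    public static void main(String[] args) {
        // 10 MiB budget, 128-dimensional float vectors (128 * 4 = 512 bytes each).
        System.out.println(transferLimit(10L * 1024 * 1024, 512)); // 20480
        // Budget smaller than a single vector: the floor of 1 applies.
        System.out.println(transferLimit(100, 512)); // 1
    }
}
```

The `Math.max(1, ...)` floor is what keeps the build progressing when a single vector is larger than the configured streaming limit.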
@@ -5,21 +5,26 @@

package org.opensearch.knn.index.codec.nativeindex;

import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
import org.opensearch.knn.index.codec.transfer.OffHeapByteQuantizedVectorTransfer;
import org.opensearch.knn.index.codec.transfer.OffHeapByteVectorTransfer;
import org.opensearch.knn.index.codec.transfer.OffHeapFloatVectorTransfer;
import org.opensearch.knn.index.codec.transfer.VectorTransfer;
import org.opensearch.knn.index.codec.transfer.OffHeapVectorTransfer;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.vectorvalues.KNNFloatVectorValues;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.knn.jni.JNIService;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
import static org.opensearch.knn.common.KNNVectorUtil.intListToArray;

/**
* Iteratively builds the index.
*/
@@ -33,7 +38,8 @@ public static MemOptimizedNativeIndexBuildStrategy getInstance() {

private MemOptimizedNativeIndexBuildStrategy() {}

public void buildAndWriteIndex(BuildIndexParams indexInfo, final KNNVectorValues<?> knnVectorValues) throws IOException {
public void buildAndWriteIndex(final BuildIndexParams indexInfo, final KNNVectorValues<?> knnVectorValues) throws IOException {
// Needed to make sure we dont get 0 dimensions while initializing index
knnVectorValues.init();
KNNEngine engine = indexInfo.getKnnEngine();
Map<String, Object> indexParameters = indexInfo.getParameters();
@@ -48,18 +54,49 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo, final KNNVectorValues
)
);

try (final VectorTransfer vectorTransfer = getVectorTransfer(indexInfo.getVectorDataType(), knnVectorValues)) {
int transferLimit = (int) Math.max(1, KNNSettings.getVectorStreamingMemoryLimit().getBytes() / knnVectorValues.bytesPerVector());
try (final OffHeapVectorTransfer vectorTransfer = getVectorTransfer(indexInfo.getVectorDataType(), transferLimit)) {

while (vectorTransfer.hasNext()) {
vectorTransfer.transferBatch();
long vectorAddress = vectorTransfer.getVectorAddress();
int[] docs = vectorTransfer.getTransferredDocsIds();
final List<Integer> tranferredDocIds = new ArrayList<>();
while (knnVectorValues.docId() != NO_MORE_DOCS) {
// append is false to be able to reuse the memory location
boolean transferred = vectorTransfer.transfer(knnVectorValues.conditionalCloneVector(), false);
tranferredDocIds.add(knnVectorValues.docId());
if (transferred) {
// Insert vectors
long vectorAddress = vectorTransfer.getVectorAddress();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
JNIService.insertToIndex(
intListToArray(tranferredDocIds),
vectorAddress,
knnVectorValues.dimension(),
indexParameters,
indexMemoryAddress,
engine
);
return null;
});
tranferredDocIds.clear();
}
knnVectorValues.nextDoc();
}

// Insert vectors
boolean flush = vectorTransfer.flush(false);
// Need to make sure that the flushed vectors are indexed
if (flush) {
long vectorAddress = vectorTransfer.getVectorAddress();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
JNIService.insertToIndex(docs, vectorAddress, knnVectorValues.dimension(), indexParameters, indexMemoryAddress, engine);
JNIService.insertToIndex(
intListToArray(tranferredDocIds),
vectorAddress,
knnVectorValues.dimension(),
indexParameters,
indexMemoryAddress,
engine
);
return null;
});
tranferredDocIds.clear();
}

// Write vector
@@ -73,14 +110,13 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo, final KNNVectorValues
}
}

// TODO: Will probably need a factory once quantization is added
private VectorTransfer getVectorTransfer(VectorDataType vectorDataType, KNNVectorValues<?> knnVectorValues) throws IOException {
private <T> OffHeapVectorTransfer<T> getVectorTransfer(final VectorDataType vectorDataType, final int transferLimit) {
switch (vectorDataType) {
case FLOAT:
return new OffHeapFloatVectorTransfer((KNNFloatVectorValues) knnVectorValues);
return (OffHeapVectorTransfer<T>) new OffHeapFloatVectorTransfer(transferLimit);
case BINARY:
case BYTE:
return new OffHeapByteQuantizedVectorTransfer<>((KNNVectorValues<byte[]>) knnVectorValues);
return (OffHeapVectorTransfer<T>) new OffHeapByteVectorTransfer(transferLimit);
default:
throw new IllegalArgumentException("Unsupported vector data type: " + vectorDataType);
}
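The control flow of the iterative strategy (insert a batch whenever the buffer fills, then flush and insert the remainder) can be sketched without any JNI. In the sketch below, `BatchBuffer` is a hypothetical on-heap stand-in. It is not the plugin's `OffHeapVectorTransfer`, and its `transfer`/`flush` return values only mirror how the diff appears to use them, which is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of batch-then-flush insertion; not plugin code.
final class BatchedTransferSketch {

    /** On-heap stand-in for a bounded transfer buffer (assumed semantics). */
    static final class BatchBuffer {
        private final int limit;
        private final List<float[]> pending = new ArrayList<>();
        private List<float[]> lastBatch = new ArrayList<>();

        BatchBuffer(final int limit) {
            this.limit = limit;
        }

        /** Buffers one vector; returns true when the buffer filled up and a batch was published. */
        boolean transfer(final float[] vector) {
            pending.add(vector);
            return pending.size() == limit && flush();
        }

        /** Publishes whatever is pending; returns false when nothing was pending. */
        boolean flush() {
            if (pending.isEmpty()) {
                return false;
            }
            lastBatch = new ArrayList<>(pending);
            pending.clear();
            return true;
        }

        List<float[]> batch() {
            return lastBatch;
        }
    }

    /** Feeds all vectors through the buffer and returns the size of each inserted batch. */
    static List<Integer> insertInBatches(final List<float[]> vectors, final int limit) {
        final BatchBuffer buffer = new BatchBuffer(limit);
        final List<Integer> docIds = new ArrayList<>();
        final List<Integer> batchSizes = new ArrayList<>();
        for (int docId = 0; docId < vectors.size(); docId++) {
            final boolean transferred = buffer.transfer(vectors.get(docId));
            docIds.add(docId);
            if (transferred) {
                batchSizes.add(insert(docIds, buffer.batch()));
                docIds.clear(); // doc IDs are tracked per batch, as in the diff
            }
        }
        // The last partial batch is only indexed if flush reports pending vectors.
        if (buffer.flush()) {
            batchSizes.add(insert(docIds, buffer.batch()));
            docIds.clear();
        }
        return batchSizes;
    }

    // Stand-in for the native insert call: checks that IDs and vectors stay in sync.
    private static int insert(final List<Integer> docIds, final List<float[]> batch) {
        if (docIds.size() != batch.size()) {
            throw new IllegalStateException("doc ids and vectors out of sync");
        }
        return batch.size();
    }

    public static void main(String[] args) {
        final List<float[]> vectors = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            vectors.add(new float[] { i, i + 0.5f });
        }
        System.out.println(insertInBatches(vectors, 2)); // [2, 2, 1]
    }
}
```

The point of the sketch is the invariant that every insert call sees exactly as many doc IDs as vectors, which is why the ID list is cleared after each batch and again after the final flush.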
@@ -75,7 +75,7 @@ public static NativeIndexWriter getWriter(final FieldInfo fieldInfo, SegmentWrit
if (iterative) {
return new NativeIndexWriter(state, fieldInfo, MemOptimizedNativeIndexBuildStrategy.getInstance());
}
return new NativeIndexWriter(state, fieldInfo, VectorTransferIndexBuildStrategy.getInstance());
return new NativeIndexWriter(state, fieldInfo, DefaultIndexBuildStrategy.getInstance());
}

/**