Skip to content

Commit

Permalink
Add BWC for batch ingestion (#769)
Browse files Browse the repository at this point in the history
* Add BWC for batch ingestion

Signed-off-by: Liyun Xiu <[email protected]>

* Update Changelog

Signed-off-by: Liyun Xiu <[email protected]>

* Fix spotlessLicenseCheck

Signed-off-by: Liyun Xiu <[email protected]>

* Fix comments

Signed-off-by: Liyun Xiu <[email protected]>

* Reuse the same code

Signed-off-by: Liyun Xiu <[email protected]>

* Rename some functions

Signed-off-by: Liyun Xiu <[email protected]>

* Rename a function

Signed-off-by: Liyun Xiu <[email protected]>

* Minor change to trigger rebuild

Signed-off-by: Liyun Xiu <[email protected]>

---------

Signed-off-by: Liyun Xiu <[email protected]>
  • Loading branch information
chishui authored Jul 8, 2024
1 parent 3ca4b6c commit a5abe3e
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Bug Fixes
- Fix for missing HybridQuery results when concurrent segment search is enabled ([#800](https://github.com/opensearch-project/neural-search/pull/800))
### Infrastructure
- Add BWC for batch ingestion ([#769](https://github.com/opensearch-project/neural-search/pull/769))
### Documentation
### Maintenance
### Refactoring
6 changes: 4 additions & 2 deletions qa/restart-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
}
}

// Excluding the k-NN radial search tests because we introduce this feature in 2.14
// Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14
if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
}
}

Expand Down Expand Up @@ -146,10 +147,11 @@ task testAgainstNewCluster(type: StandaloneRestIntegTestTask) {
}
}

// Excluding the k-NN radial search tests because we introduce this feature in 2.14
// Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14
if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.bwc;

import org.opensearch.neuralsearch.util.TestUtils;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion;
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR;

public class BatchIngestionIT extends AbstractRestartUpgradeRestTestCase {
private static final String PIPELINE_NAME = "pipeline-BatchIngestionIT";
private static final String TEXT_FIELD_NAME = "passage_text";
private static final String EMBEDDING_FIELD_NAME = "passage_embedding";
private static final int batchSize = 3;

public void testBatchIngestionWithNeuralSparseProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
String indexName = getIndexNameForTest();
if (isRunningAgainstOldCluster()) {
String modelId = uploadSparseEncodingModel();
loadModel(modelId);
createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME);
createIndexWithConfiguration(
indexName,
Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())),
PIPELINE_NAME
);
List<Map<String, String>> docs = prepareDataForBulkIngestion(0, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize);
validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class);
} else {
String modelId = null;
modelId = TestUtils.getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR);
loadModel(modelId);
try {
List<Map<String, String>> docs = prepareDataForBulkIngestion(5, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize);
validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class);
} finally {
wipeOfTestResources(indexName, PIPELINE_NAME, modelId, null);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,21 @@ private void createChunkingIndex(String indexName) throws Exception {
createIndexWithConfiguration(indexName, indexSetting, PIPELINE_NAME);
}

private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) {
int docCount = getDocCount(indexName);
assertEquals(documentCount, docCount);
private Map<String, Object> getFirstDocumentInQuery(String indexName, int resultSize) {
MatchAllQueryBuilder query = new MatchAllQueryBuilder();
Map<String, Object> searchResults = search(indexName, query, 10);
Map<String, Object> searchResults = search(indexName, query, resultSize);
assertNotNull(searchResults);
Map<String, Object> document = getFirstInnerHit(searchResults);
assertNotNull(document);
Object documentSource = document.get("_source");
assert (documentSource instanceof Map);
@SuppressWarnings("unchecked")
Map<String, Object> documentSourceMap = (Map<String, Object>) documentSource;
assert (documentSourceMap).containsKey(fieldName);
Object ingestOutputs = documentSourceMap.get(fieldName);
assertEquals(expected, ingestOutputs);
return getFirstInnerHit(searchResults);
}

private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) {
Object outputs = validateDocCountAndInfo(
indexName,
documentCount,
() -> getFirstDocumentInQuery(indexName, 10),
fieldName,
List.class
);
assertEquals(expected, outputs);
}
}
12 changes: 8 additions & 4 deletions qa/rolling-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
}
}

// Excluding the k-NN radial search tests because we introduce this feature in 2.14
// Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14
if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
}
}

Expand Down Expand Up @@ -147,10 +148,11 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) {
}
}

// Excluding the k-NN radial search tests because we introduce this feature in 2.14
// Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14
if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
}
}

Expand Down Expand Up @@ -203,10 +205,11 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) {
}
}

// Excluding the k-NN radial search tests because we introduce this feature in 2.14
// Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14
if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
}
}

Expand Down Expand Up @@ -259,10 +262,11 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) {
}
}

// Excluding the k-NN radial search tests because we introduce this feature in 2.14
// Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14
if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
filter {
excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.bwc;

import org.opensearch.neuralsearch.util.TestUtils;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion;
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR;

public class BatchIngestionIT extends AbstractRollingUpgradeTestCase {
private static final String SPARSE_PIPELINE = "BatchIngestionIT_sparse_pipeline_rolling";
private static final String TEXT_FIELD_NAME = "passage_text";
private static final String EMBEDDING_FIELD_NAME = "passage_embedding";

public void testBatchIngestion_SparseEncodingProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
String indexName = getIndexNameForTest();
String sparseModelId = null;
switch (getClusterType()) {
case OLD:
sparseModelId = uploadSparseEncodingModel();
loadModel(sparseModelId);
createPipelineForSparseEncodingProcessor(sparseModelId, SPARSE_PIPELINE);
createIndexWithConfiguration(
indexName,
Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())),
SPARSE_PIPELINE
);
List<Map<String, String>> docs = prepareDataForBulkIngestion(0, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docs, 2);
validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class);
break;
case MIXED:
sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR);
loadModel(sparseModelId);
List<Map<String, String>> docsForMixed = prepareDataForBulkIngestion(5, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForMixed, 3);
validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class);
break;
case UPGRADED:
try {
sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR);
loadModel(sparseModelId);
List<Map<String, String>> docsForUpgraded = prepareDataForBulkIngestion(10, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForUpgraded, 2);
validateDocCountAndInfo(indexName, 15, () -> getDocById(indexName, "14"), EMBEDDING_FIELD_NAME, Map.class);
} finally {
wipeOfTestResources(indexName, SPARSE_PIPELINE, sparseModelId, null);
}
break;
default:
throw new IllegalStateException("Unexpected value: " + getClusterType());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Set;
import java.util.ArrayList;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -722,6 +723,36 @@ protected void addSparseEncodingDoc(
assertEquals(request.getEndpoint() + ": failed", RestStatus.CREATED, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
}

protected void bulkAddDocuments(
final String index,
final String textField,
final String pipeline,
final List<Map<String, String>> docs,
final int batchSize
) throws IOException, ParseException {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < docs.size(); ++i) {
String doc = String.format(
Locale.ROOT,
"{ \"index\": { \"_index\": \"%s\", \"_id\": \"%s\" } },\n" + "{ \"%s\": \"%s\"}",
index,
docs.get(i).get("id"),
textField,
docs.get(i).get("text")
);
builder.append(doc);
builder.append("\n");
}
Request request = new Request(
"POST",
String.format(Locale.ROOT, "/_bulk?refresh=true&pipeline=%s&batch_size=%d", pipeline, batchSize)
);
request.setJsonEntity(builder.toString());

Response response = client().performRequest(request);
assertEquals(request.getEndpoint() + ": failed", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
}

/**
* Parse the first returned hit from a search response as a map
*
Expand Down Expand Up @@ -1286,6 +1317,27 @@ protected void wipeOfTestResources(
}
}

protected Object validateDocCountAndInfo(
String indexName,
int expectedDocCount,
Supplier<Map<String, Object>> documentSupplier,
final String field,
final Class<?> valueType
) {
int count = getDocCount(indexName);
assertEquals(expectedDocCount, count);
Map<String, Object> document = documentSupplier.get();
assertNotNull(document);
Object documentSource = document.get("_source");
assertTrue(documentSource instanceof Map);
@SuppressWarnings("unchecked")
Map<String, Object> documentSourceMap = (Map<String, Object>) documentSource;
assertTrue(documentSourceMap.containsKey(field));
Object outputs = documentSourceMap.get(field);
assertTrue(valueType.isAssignableFrom(outputs.getClass()));
return outputs;
}

/**
* Enumeration for types of pipeline processors, used to lookup resources like create
* processor request as those are type specific
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A helper class to build docs for bulk request which is used by batch ingestion tests.
*/
public class BatchIngestionUtils {
private static final List<String> TEXTS = Arrays.asList(
"hello",
"world",
"an apple",
"find me",
"birdy",
"flying piggy",
"newspaper",
"dynamic programming",
"random text",
"finally"
);

public static List<Map<String, String>> prepareDataForBulkIngestion(int startId, int count) {
List<Map<String, String>> docs = new ArrayList<>();
for (int i = startId; i < startId + count; ++i) {
Map<String, String> params = new HashMap<>();
params.put("id", Integer.toString(i));
params.put("text", TEXTS.get(i % TEXTS.size()));
docs.add(params);
}
return docs;
}
}

0 comments on commit a5abe3e

Please sign in to comment.