Add BWC for batch ingestion #769

Merged (14 commits, Jul 8, 2024)
@@ -35,15 +35,15 @@ public void testBatchIngestionWithNeuralSparseProcessor_E2EFlow() throws Exception
);
List<Map<String, String>> docs = prepareDocsForBulk(0, 5);
addDocsThroughBulk(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize);
- validateDocCountAndEmbedding(indexName, 5, "4", EMBEDDING_FIELD_NAME);
+ validateDocCountAndDocInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class);
} else {
String modelId = null;
modelId = TestUtils.getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR);
loadModel(modelId);
try {
List<Map<String, String>> docs = prepareDocsForBulk(5, 5);
addDocsThroughBulk(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize);
- validateDocCountAndEmbedding(indexName, 10, "9", EMBEDDING_FIELD_NAME);
+ validateDocCountAndDocInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class);
} finally {
wipeOfTestResources(indexName, PIPELINE_NAME, modelId, null);
}
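The two calls above show the core change in this test: the old validateDocCountAndEmbedding helper is replaced by the more general validateDocCountAndDocInfo defined further down in this diff. A minimal sketch of the before/after call shapes, using the arguments from this hunk:

    // Before: the helper fetched the document by id itself and always assumed a Map-typed embedding field.
    validateDocCountAndEmbedding(indexName, 5, "4", EMBEDDING_FIELD_NAME);
    // After: the caller supplies how to fetch the document and which type the field value must have.
    validateDocCountAndDocInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class);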
@@ -56,20 +56,21 @@ private void createChunkingIndex(String indexName) throws Exception {
createIndexWithConfiguration(indexName, indexSetting, PIPELINE_NAME);
}

- private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) {
- int docCount = getDocCount(indexName);
- assertEquals(documentCount, docCount);
+ private Map<String, Object> getFirstDocumentInQuery(String indexName, int resultSize) {
MatchAllQueryBuilder query = new MatchAllQueryBuilder();
- Map<String, Object> searchResults = search(indexName, query, 10);
+ Map<String, Object> searchResults = search(indexName, query, resultSize);
assertNotNull(searchResults);
- Map<String, Object> document = getFirstInnerHit(searchResults);
- assertNotNull(document);
- Object documentSource = document.get("_source");
- assert (documentSource instanceof Map);
- @SuppressWarnings("unchecked")
- Map<String, Object> documentSourceMap = (Map<String, Object>) documentSource;
- assert (documentSourceMap).containsKey(fieldName);
- Object ingestOutputs = documentSourceMap.get(fieldName);
- assertEquals(expected, ingestOutputs);
+ return getFirstInnerHit(searchResults);
}

+ private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) {
+ Object outputs = validateDocCountAndDocInfo(
+ indexName,
+ documentCount,
+ () -> getFirstDocumentInQuery(indexName, 10),
+ fieldName,
+ List.class
+ );
+ assertEquals(expected, outputs);
+ }
}
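The refactored chunking test above and the BWC tests elsewhere in this diff differ only in how they obtain the document to validate. A small sketch of the two Supplier flavors passed to validateDocCountAndDocInfo (helper names come from this diff; the index name and doc id are illustrative, and the fragment assumes the java.util.function.Supplier import added below):

    // Lookup by document id, as in the batch-ingestion BWC tests:
    Supplier<Map<String, Object>> byId = () -> getDocById(indexName, "4");
    // Lookup via the first hit of a match-all search, as in the chunking test:
    Supplier<Map<String, Object>> bySearch = () -> getFirstDocumentInQuery(indexName, 10);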
@@ -36,22 +36,22 @@ public void testBatchIngestion_SparseEncodingProcessor_E2EFlow() throws Exception
);
List<Map<String, String>> docs = prepareDocsForBulk(0, 5);
addDocsThroughBulk(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docs, 2);
- validateDocCountAndEmbedding(indexName, 5, "4", EMBEDDING_FIELD_NAME);
+ validateDocCountAndDocInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class);
break;
case MIXED:
sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR);
loadModel(sparseModelId);
List<Map<String, String>> docsForMixed = prepareDocsForBulk(5, 5);
addDocsThroughBulk(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForMixed, 3);
- validateDocCountAndEmbedding(indexName, 10, "9", EMBEDDING_FIELD_NAME);
+ validateDocCountAndDocInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class);
break;
case UPGRADED:
try {
sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR);
loadModel(sparseModelId);
List<Map<String, String>> docsForUpgraded = prepareDocsForBulk(10, 5);
addDocsThroughBulk(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForUpgraded, 2);
- validateDocCountAndEmbedding(indexName, 15, "14", EMBEDDING_FIELD_NAME);
+ validateDocCountAndDocInfo(indexName, 15, () -> getDocById(indexName, "14"), EMBEDDING_FIELD_NAME, Map.class);
} finally {
wipeOfTestResources(indexName, SPARSE_PIPELINE, sparseModelId, null);
}
@@ -19,6 +19,7 @@
import java.util.Set;
import java.util.ArrayList;
import java.util.function.Predicate;
+ import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
@@ -1285,17 +1286,25 @@ protected void wipeOfTestResources(
}
}

- protected void validateDocCountAndEmbedding(String indexName, int expectedDocCount, String docId, final String embeddingField) {
+ protected Object validateDocCountAndDocInfo(
+ String indexName,
+ int expectedDocCount,
+ Supplier<Map<String, Object>> documentSupplier,
+ final String field,
+ final Class<?> valueType
+ ) {
int count = getDocCount(indexName);
assertEquals(expectedDocCount, count);
- Map<String, Object> document = getDocById(indexName, docId);
+ Map<String, Object> document = documentSupplier.get();
assertNotNull(document);
Object documentSource = document.get("_source");
assertTrue(documentSource instanceof Map);
@SuppressWarnings("unchecked")
Map<String, Object> documentSourceMap = (Map<String, Object>) documentSource;
- assertTrue(documentSourceMap.containsKey(embeddingField));
- Object ingestOutputs = documentSourceMap.get(embeddingField);
- assertTrue(ingestOutputs instanceof Map);
+ assertTrue(documentSourceMap.containsKey(field));
+ Object outputs = documentSourceMap.get(field);
+ assertTrue(valueType.isAssignableFrom(outputs.getClass()));
+ return outputs;
}

/**
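The new helper's final assertion uses Class.isAssignableFrom against a caller-supplied Class object instead of the old hard-coded instanceof Map check, which is what lets the sparse-encoding tests pass Map.class and the chunking test pass List.class. A self-contained sketch of that type check; the class name, token weights, and passages below are illustrative only, not taken from the test fixtures:

    import java.util.List;
    import java.util.Map;

    public class TypeCheckSketch {
        public static void main(String[] args) {
            // Sparse encoding output is validated as a token -> weight map.
            Object sparseEmbedding = Map.of("hello", 0.61, "world", 0.38);
            // Text chunking output is validated as a list of passages.
            Object chunks = List.of("first passage", "second passage");
            // Same check as validateDocCountAndDocInfo: valueType.isAssignableFrom(outputs.getClass())
            System.out.println(Map.class.isAssignableFrom(sparseEmbedding.getClass()));  // true
            System.out.println(List.class.isAssignableFrom(chunks.getClass()));          // true
        }
    }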