Skip to content

Commit

Permalink
Adding integ test for new processor
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Gaievski <[email protected]>
  • Loading branch information
martin-gaievski committed Sep 29, 2023
1 parent 2ae2df8 commit 88b65d9
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,18 @@ public abstract class BaseNeuralSearchIT extends OpenSearchSecureRestTestCase {
protected static final String DEFAULT_COMBINATION_METHOD = "arithmetic_mean";
protected static final String PARAM_NAME_WEIGHTS = "weights";

protected String PIPELINE_CONFIGURATION_NAME = "processor/PipelineConfiguration.json";
protected static final Map<ProcessorType, String> PIPELINE_CONFIGS_BY_TYPE = Map.of(
ProcessorType.TEXT_EMBEDDING,
"processor/PipelineConfiguration.json",
ProcessorType.SPARSE_ENCODING,
"processor/SparseEncodingPipelineConfiguration.json",
ProcessorType.TEXT_IMAGE_EMBEDDING,
"processor/PipelineForTextImageEmbeddingProcessorConfiguration.json"
);
// protected String PIPELINE_CONFIGURATION_NAME = "processor/PipelineConfiguration.json";

protected final ClassLoader classLoader = this.getClass().getClassLoader();

protected void setPipelineConfigurationName(String pipelineConfigurationName) {
this.PIPELINE_CONFIGURATION_NAME = pipelineConfigurationName;
}

@Before
public void setupSettings() {
if (isUpdateClusterSettings()) {
Expand Down Expand Up @@ -237,13 +241,21 @@ protected void createIndexWithConfiguration(String indexName, String indexConfig
}

protected void createPipelineProcessor(String modelId, String pipelineName) throws Exception {
createPipelineProcessor(modelId, pipelineName, ProcessorType.TEXT_EMBEDDING);
}

protected void createPipelineProcessor(String modelId, String pipelineName, ProcessorType processorType) throws Exception {
Response pipelineCreateResponse = makeRequest(
client(),
"PUT",
"/_ingest/pipeline/" + pipelineName,
null,
toHttpEntity(
String.format(LOCALE, Files.readString(Path.of(classLoader.getResource(PIPELINE_CONFIGURATION_NAME).toURI())), modelId)
String.format(
LOCALE,
Files.readString(Path.of(classLoader.getResource(PIPELINE_CONFIGS_BY_TYPE.get(processorType)).toURI())),
modelId
)
),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Expand Down Expand Up @@ -711,4 +723,14 @@ protected String getDeployedModelId() {
assertEquals(1, modelIds.size());
return modelIds.iterator().next();
}

/**
* Enumeration for types of pipeline processors, used to lookup resources like create
* processor request as those are type specific
*/
protected enum ProcessorType {
TEXT_EMBEDDING,
TEXT_IMAGE_EMBEDDING,
SPARSE_ENCODING
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.junit.After;
import org.junit.Before;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
Expand All @@ -40,14 +39,9 @@ public void tearDown() {
findDeployedModels().forEach(this::deleteModel);
}

@Before
public void setPipelineName() {
this.setPipelineConfigurationName("processor/SparseEncodingPipelineConfiguration.json");
}

public void testSparseEncodingProcessor() throws Exception {
String modelId = prepareModel();
createPipelineProcessor(modelId, PIPELINE_NAME);
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING);
createSparseEncodingIndex();
ingestDocument();
assertEquals(1, getDocCount(INDEX_NAME));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.neuralsearch.processor;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import lombok.SneakyThrows;

import org.apache.http.HttpHeaders;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.junit.After;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.neuralsearch.common.BaseNeuralSearchIT;

import com.google.common.collect.ImmutableList;

/**
* Testing text_and_image_embedding ingest processor. We can only test text in integ tests, none of pre-built models
* supports both text and image.
*/
public class TextImageEmbeddingProcessorIT extends BaseNeuralSearchIT {

private static final String INDEX_NAME = "text_image_embedding_index";
private static final String PIPELINE_NAME = "ingest-pipeline";

@After
@SneakyThrows
public void tearDown() {
super.tearDown();
findDeployedModels().forEach(this::deleteModel);
}

public void testEmbeddingProcessor_whenIngestingDocumentWithSourceMatchingTextMapping_thenSuccessful() throws Exception {
String modelId = uploadModel();
loadModel(modelId);
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_IMAGE_EMBEDDING);
createTextImageEmbeddingIndex();
ingestDocumentWithTextMappedToEmbeddingField();
assertEquals(1, getDocCount(INDEX_NAME));
}

public void testEmbeddingProcessor_whenIngestingDocumentWithSourceWithoutMatchingInMapping_thenSuccessful() throws Exception {
String modelId = uploadModel();
loadModel(modelId);
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_IMAGE_EMBEDDING);
createTextImageEmbeddingIndex();
ingestDocumentWithoutMappedFields();
assertEquals(1, getDocCount(INDEX_NAME));
}

private String uploadModel() throws Exception {
String requestBody = Files.readString(Path.of(classLoader.getResource("processor/UploadModelRequestBody.json").toURI()));
return uploadModel(requestBody);
}

private void createTextImageEmbeddingIndex() throws Exception {
createIndexWithConfiguration(
INDEX_NAME,
Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())),
PIPELINE_NAME
);
}

private void ingestDocumentWithTextMappedToEmbeddingField() throws Exception {
String ingestDocumentBody = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"passage_text\": \"A very nice day today\",\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";
ingestDocument(ingestDocumentBody);
}

private void ingestDocumentWithoutMappedFields() throws Exception {
String ingestDocumentBody = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"some_random_field\": \"Today is a sunny weather\"\n"
+ "}\n";
ingestDocument(ingestDocumentBody);
}

private void ingestDocument(final String ingestDocument) throws Exception {
Response response = makeRequest(
client(),
"POST",
INDEX_NAME + "/_doc?refresh",
null,
toHttpEntity(ingestDocument),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals("created", map.get("result"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"description": "text image embedding pipeline",
"processors": [
{
"text_image_embedding": {
"model_id": "%s",
"embedding": "passage_embedding",
"field_map": {
"text": "passage_text",
"image": "passage_image"
}
}
}
]
}

0 comments on commit 88b65d9

Please sign in to comment.