From a97b7d0f1f1cc48cb12420c3de8412e50f0de584 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Mon, 25 Sep 2023 10:44:33 -0700 Subject: [PATCH] Adds Create Index Building block (#38) * Initial impelmentation of CreateIndex Signed-off-by: Owais Kazi * Adds CreateIndex building block Signed-off-by: Owais Kazi * Integrated WorkflowData and made the request async Signed-off-by: Owais Kazi * Addressed PR comments Signed-off-by: Owais Kazi * Added unit test and type field for fetching the payload Signed-off-by: Owais Kazi * Addressed PR Comments Signed-off-by: Owais Kazi --------- Signed-off-by: Owais Kazi --- build.gradle | 3 +- .../flowframework/FlowFrameworkPlugin.java | 38 +++++- .../workflow/CreateIndex/CreateIndexStep.java | 115 ++++++++++++++++++ src/main/resources/mappings/knn.json | 16 +++ .../CreateIndex/CreateIndexStepTests.java | 99 +++++++++++++++ 5 files changed, 269 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java create mode 100644 src/main/resources/mappings/knn.json create mode 100644 src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java diff --git a/build.gradle b/build.gradle index aa20423ee..c3b193b15 100644 --- a/build.gradle +++ b/build.gradle @@ -56,6 +56,7 @@ opensearchplugin { dependencyLicenses.enabled = false // This requires an additional Jar not published as part of build-tools loggerUsageCheck.enabled = false +thirdPartyAudit.enabled = false // No need to validate pom, as we do not upload to maven/sonatype validateNebulaPom.enabled = false @@ -106,7 +107,7 @@ dependencies { implementation "org.opensearch:opensearch:${opensearch_version}" implementation 'org.junit.jupiter:junit-jupiter:5.10.0' implementation "com.google.code.gson:gson:2.10.1" - compileOnly "com.google.guava:guava:32.1.2-jre" + implementation "com.google.guava:guava:32.1.2-jre" api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" configurations.all { diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index f810767eb..2f7a34f3f 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -8,11 +8,47 @@ */ package org.opensearch.flowframework; +import com.google.common.collect.ImmutableList; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.function.Supplier; /** * An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch. */ public class FlowFrameworkPlugin extends Plugin { - // Implement the relevant Plugin Interfaces here + + private Client client; + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier + ) { + this.client = client; + CreateIndexStep createIndexStep = new CreateIndexStep(client); + return ImmutableList.of(createIndexStep); + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java new file mode 100644 index 000000000..ebef2cae8 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow.CreateIndex; + +import com.google.common.base.Charsets; +import com.google.common.io.Resources; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowStep; + +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Step to create an index + */ +public class CreateIndexStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(CreateIndexStep.class); + private Client client; + private final String NAME = "create_index_step"; + + /** + * Instantiate this class + * @param client Client to create an index + */ + public CreateIndexStep(Client client) { + this.client = client; + } + + @Override + public CompletableFuture execute(List data) { + CompletableFuture future = new CompletableFuture<>(); + ActionListener actionListener = new ActionListener<>() { + + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + logger.info("created index: {}", createIndexResponse.index()); + future.complete(new WorkflowData() { + @Override + public Map getContent() { + return Map.of("index-name", createIndexResponse.index()); + } + }); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to create an index", e); + future.completeExceptionally(e); + } + }; + + String index = null; + String type = null; + Settings settings = null; + + for (WorkflowData workflowData : data) { + Map content = workflowData.getContent(); + index = (String) content.get("index-name"); + type = (String) content.get("type"); + if (index != null && type != null && settings != null) { + break; + } + } + + // TODO: + // 1. Create settings based on the index settings received from content + + try { + CreateIndexRequest request = new CreateIndexRequest(index).mapping( + getIndexMappings("mappings/" + type + ".json"), + XContentType.JSON + ); + client.admin().indices().create(request, actionListener); + } catch (Exception e) { + logger.error("Failed to find the right mapping for the index", e); + } + + return future; + } + + @Override + public String getName() { + return NAME; + } + + /** + * Get index mapping json content. + * + * @param mapping type of the index to fetch the specific mapping file + * @return index mapping + * @throws IOException IOException if mapping file can't be read correctly + */ + private static String getIndexMappings(String mapping) throws IOException { + URL url = CreateIndexStep.class.getClassLoader().getResource(mapping); + return Resources.toString(url, Charsets.UTF_8); + } +} diff --git a/src/main/resources/mappings/knn.json b/src/main/resources/mappings/knn.json new file mode 100644 index 000000000..c31946e62 --- /dev/null +++ b/src/main/resources/mappings/knn.json @@ -0,0 +1,16 @@ +{ + "properties": { + "desc_v": { + "type": "keyword" + }, + "name_v": { + "type": "keyword" + }, + "description": { + "type": "keyword" + }, + "name": { + "type": "keyword" + } + } +} diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java new file mode 100644 index 000000000..638dea251 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow.CreateIndex; + +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.mockito.ArgumentCaptor; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CreateIndexStepTests extends OpenSearchTestCase { + + private WorkflowData inputData = WorkflowData.EMPTY; + + private Client client; + + private AdminClient adminClient; + + private IndicesAdminClient indicesAdminClient; + + @Override + public void setUp() throws Exception { + super.setUp(); + + inputData = new WorkflowData() { + + @Override + public Map getContent() { + return Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn")); + } + + }; + + client = mock(Client.class); + adminClient = mock(AdminClient.class); + indicesAdminClient = mock(IndicesAdminClient.class); + + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(client.admin()).thenReturn(adminClient); + + } + + public void testCreateIndexStep() throws ExecutionException, InterruptedException, IOException { + + CreateIndexStep createIndexStep = new CreateIndexStep(client); + + ArgumentCaptor actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + CompletableFuture future = createIndexStep.execute(List.of(inputData)); + assertFalse(future.isDone()); + verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); + actionListenerCaptor.getValue().onResponse(new CreateIndexResponse(true, true, "demo")); + + assertTrue(future.isDone() && !future.isCompletedExceptionally()); + + Map outputData = Map.of("index-name", "demo"); + assertEquals(outputData, future.get().getContent()); + + } + + public void testCreateIndexStepFailure() throws ExecutionException, InterruptedException { + + CreateIndexStep createIndexStep = new CreateIndexStep(client); + + ArgumentCaptor actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + CompletableFuture future = createIndexStep.execute(List.of(inputData)); + assertFalse(future.isDone()); + verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); + + actionListenerCaptor.getValue().onFailure(new Exception("Failed to create an index")); + + assertTrue(future.isCompletedExceptionally()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof Exception); + assertEquals("Failed to create an index", ex.getCause().getMessage()); + } +}