-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Backport 2.x] Adds Create Index Building block (#57)
Adds Create Index Building block (#38) * Initial impelmentation of CreateIndex * Adds CreateIndex building block * Integrated WorkflowData and made the request async * Addressed PR comments * Added unit test and type field for fetching the payload * Addressed PR Comments --------- (cherry picked from commit a97b7d0) Signed-off-by: Owais Kazi <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
b15c866
commit 0a45e8f
Showing
5 changed files
with
269 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
115 changes: 115 additions & 0 deletions
115
src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.flowframework.workflow.CreateIndex; | ||
|
||
import com.google.common.base.Charsets; | ||
import com.google.common.io.Resources; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.action.admin.indices.create.CreateIndexRequest; | ||
import org.opensearch.action.admin.indices.create.CreateIndexResponse; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.flowframework.workflow.WorkflowData; | ||
import org.opensearch.flowframework.workflow.WorkflowStep; | ||
|
||
import java.io.IOException; | ||
import java.net.URL; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
/** | ||
* Step to create an index | ||
*/ | ||
public class CreateIndexStep implements WorkflowStep { | ||
|
||
private static final Logger logger = LogManager.getLogger(CreateIndexStep.class); | ||
private Client client; | ||
private final String NAME = "create_index_step"; | ||
|
||
/** | ||
* Instantiate this class | ||
* @param client Client to create an index | ||
*/ | ||
public CreateIndexStep(Client client) { | ||
this.client = client; | ||
} | ||
|
||
@Override | ||
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) { | ||
CompletableFuture<WorkflowData> future = new CompletableFuture<>(); | ||
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() { | ||
|
||
@Override | ||
public void onResponse(CreateIndexResponse createIndexResponse) { | ||
logger.info("created index: {}", createIndexResponse.index()); | ||
future.complete(new WorkflowData() { | ||
@Override | ||
public Map<String, Object> getContent() { | ||
return Map.of("index-name", createIndexResponse.index()); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
logger.error("Failed to create an index", e); | ||
future.completeExceptionally(e); | ||
} | ||
}; | ||
|
||
String index = null; | ||
String type = null; | ||
Settings settings = null; | ||
|
||
for (WorkflowData workflowData : data) { | ||
Map<String, Object> content = workflowData.getContent(); | ||
index = (String) content.get("index-name"); | ||
type = (String) content.get("type"); | ||
if (index != null && type != null && settings != null) { | ||
break; | ||
} | ||
} | ||
|
||
// TODO: | ||
// 1. Create settings based on the index settings received from content | ||
|
||
try { | ||
CreateIndexRequest request = new CreateIndexRequest(index).mapping( | ||
getIndexMappings("mappings/" + type + ".json"), | ||
XContentType.JSON | ||
); | ||
client.admin().indices().create(request, actionListener); | ||
} catch (Exception e) { | ||
logger.error("Failed to find the right mapping for the index", e); | ||
} | ||
|
||
return future; | ||
} | ||
|
||
@Override | ||
public String getName() { | ||
return NAME; | ||
} | ||
|
||
/** | ||
* Get index mapping json content. | ||
* | ||
* @param mapping type of the index to fetch the specific mapping file | ||
* @return index mapping | ||
* @throws IOException IOException if mapping file can't be read correctly | ||
*/ | ||
private static String getIndexMappings(String mapping) throws IOException { | ||
URL url = CreateIndexStep.class.getClassLoader().getResource(mapping); | ||
return Resources.toString(url, Charsets.UTF_8); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
{ | ||
"properties": { | ||
"desc_v": { | ||
"type": "keyword" | ||
}, | ||
"name_v": { | ||
"type": "keyword" | ||
}, | ||
"description": { | ||
"type": "keyword" | ||
}, | ||
"name": { | ||
"type": "keyword" | ||
} | ||
} | ||
} |
99 changes: 99 additions & 0 deletions
99
src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.flowframework.workflow.CreateIndex; | ||
|
||
import org.opensearch.action.admin.indices.create.CreateIndexRequest; | ||
import org.opensearch.action.admin.indices.create.CreateIndexResponse; | ||
import org.opensearch.client.AdminClient; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.client.IndicesAdminClient; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.flowframework.workflow.WorkflowData; | ||
import org.opensearch.test.OpenSearchTestCase; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutionException; | ||
|
||
import org.mockito.ArgumentCaptor; | ||
|
||
import static org.mockito.Mockito.any; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
public class CreateIndexStepTests extends OpenSearchTestCase { | ||
|
||
private WorkflowData inputData = WorkflowData.EMPTY; | ||
|
||
private Client client; | ||
|
||
private AdminClient adminClient; | ||
|
||
private IndicesAdminClient indicesAdminClient; | ||
|
||
@Override | ||
public void setUp() throws Exception { | ||
super.setUp(); | ||
|
||
inputData = new WorkflowData() { | ||
|
||
@Override | ||
public Map<String, Object> getContent() { | ||
return Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn")); | ||
} | ||
|
||
}; | ||
|
||
client = mock(Client.class); | ||
adminClient = mock(AdminClient.class); | ||
indicesAdminClient = mock(IndicesAdminClient.class); | ||
|
||
when(adminClient.indices()).thenReturn(indicesAdminClient); | ||
when(client.admin()).thenReturn(adminClient); | ||
|
||
} | ||
|
||
public void testCreateIndexStep() throws ExecutionException, InterruptedException, IOException { | ||
|
||
CreateIndexStep createIndexStep = new CreateIndexStep(client); | ||
|
||
ArgumentCaptor<ActionListener> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); | ||
CompletableFuture<WorkflowData> future = createIndexStep.execute(List.of(inputData)); | ||
assertFalse(future.isDone()); | ||
verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); | ||
actionListenerCaptor.getValue().onResponse(new CreateIndexResponse(true, true, "demo")); | ||
|
||
assertTrue(future.isDone() && !future.isCompletedExceptionally()); | ||
|
||
Map<String, Object> outputData = Map.of("index-name", "demo"); | ||
assertEquals(outputData, future.get().getContent()); | ||
|
||
} | ||
|
||
public void testCreateIndexStepFailure() throws ExecutionException, InterruptedException { | ||
|
||
CreateIndexStep createIndexStep = new CreateIndexStep(client); | ||
|
||
ArgumentCaptor<ActionListener> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); | ||
CompletableFuture<WorkflowData> future = createIndexStep.execute(List.of(inputData)); | ||
assertFalse(future.isDone()); | ||
verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); | ||
|
||
actionListenerCaptor.getValue().onFailure(new Exception("Failed to create an index")); | ||
|
||
assertTrue(future.isCompletedExceptionally()); | ||
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); | ||
assertTrue(ex.getCause() instanceof Exception); | ||
assertEquals("Failed to create an index", ex.getCause().getMessage()); | ||
} | ||
} |