Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modifies use case template format and adds graph validation when provisioning #119

Merged
merged 17 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ private CommonValue() {}
/** The provision workflow thread pool name */
public static final String PROVISION_THREAD_POOL = "opensearch_workflow_provision";

/** Index name field */
public static final String INDEX_NAME = "index_name";
/** Type field */
public static final String TYPE = "type";
/** ID Field */
public static final String ID = "id";
/** Pipeline Id field */
public static final String PIPELINE_ID = "pipeline_id";
/** Processors field */
public static final String PROCESSORS = "processors";
/** Field map field */
public static final String FIELD_MAP = "field_map";
/** Input Field Name field */
public static final String INPUT_FIELD_NAME = "input_field_name";
/** Output Field Name field */
public static final String OUTPUT_FIELD_NAME = "output_field_name";
/** Model Id field */
public static final String MODEL_ID = "model_id";
/** Function Name field */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public enum FlowFrameworkIndex {
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getGlobalContextMappings),
GLOBAL_CONTEXT_INDEX_VERSION
),
/**
* Workflow State Index
*/
WORKFLOW_STATE(
WORKFLOW_STATE_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getWorkflowStateMappings),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
*/
// TODO: transfer this to more detailed array for each step
public enum ProvisioningProgress {
/** Not Started State */
NOT_STARTED,
/** In Progress State */
IN_PROGRESS,
/** Done State */
DONE
}
4 changes: 4 additions & 0 deletions src/main/java/org/opensearch/flowframework/model/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
* Enum relating to the state of a workflow
*/
public enum State {
/** Not Started state */
NOT_STARTED,
/** Provisioning state */
PROVISIONING,
/** Failed state */
FAILED,
/** Completed state */
COMPLETED
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ public class WorkflowNode implements ToXContentObject {
public static final String ID_FIELD = "id";
/** The template field name for node type */
public static final String TYPE_FIELD = "type";
/** The template field name for previous node inputs */
public static final String PREVIOUS_NODE_INPUTS_FIELD = "previous_node_inputs";
/** The template field name for node inputs */
public static final String INPUTS_FIELD = "inputs";
public static final String USER_INPUTS_FIELD = "user_inputs";
/** The field defining processors in the inputs for search and ingest pipelines */
public static final String PROCESSORS_FIELD = "processors";
/** The field defining the timeout value for this node */
Expand All @@ -50,19 +52,22 @@ public class WorkflowNode implements ToXContentObject {

private final String id; // unique id
private final String type; // maps to a WorkflowStep
private final Map<String, Object> inputs; // maps to WorkflowData
private final Map<String, String> previousNodeInputs;
private final Map<String, Object> userInputs; // maps to WorkflowData

/**
* Create this node with the id and type, and any user input.
*
* @param id A unique string identifying this node
* @param type The type of {@link WorkflowStep} to create for the corresponding {@link ProcessNode}
* @param inputs Optional input to populate params in {@link WorkflowData}
* @param previousNodeInputs Optional input to identify inputs coming from predecessor nodes
* @param userInputs Optional input to populate params in {@link WorkflowData}
*/
public WorkflowNode(String id, String type, Map<String, Object> inputs) {
public WorkflowNode(String id, String type, Map<String, String> previousNodeInputs, Map<String, Object> userInputs) {
this.id = id;
this.type = type;
this.inputs = Map.copyOf(inputs);
this.previousNodeInputs = Map.copyOf(previousNodeInputs);
this.userInputs = Map.copyOf(userInputs);
}

@Override
Expand All @@ -71,8 +76,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
xContentBuilder.field(ID_FIELD, this.id);
xContentBuilder.field(TYPE_FIELD, this.type);

xContentBuilder.startObject(INPUTS_FIELD);
for (Entry<String, Object> e : inputs.entrySet()) {
xContentBuilder.field(PREVIOUS_NODE_INPUTS_FIELD);
buildStringToStringMap(xContentBuilder, previousNodeInputs);

xContentBuilder.startObject(USER_INPUTS_FIELD);
for (Entry<String, Object> e : userInputs.entrySet()) {
xContentBuilder.field(e.getKey());
if (e.getValue() instanceof String) {
xContentBuilder.value(e.getValue());
Expand Down Expand Up @@ -107,7 +115,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public static WorkflowNode parse(XContentParser parser) throws IOException {
String id = null;
String type = null;
Map<String, Object> inputs = new HashMap<>();
Map<String, String> previousNodeInputs = new HashMap<>();
Map<String, Object> userInputs = new HashMap<>();

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -120,30 +129,33 @@ public static WorkflowNode parse(XContentParser parser) throws IOException {
case TYPE_FIELD:
type = parser.text();
break;
case INPUTS_FIELD:
case PREVIOUS_NODE_INPUTS_FIELD:
previousNodeInputs = parseStringToStringMap(parser);
break;
case USER_INPUTS_FIELD:
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String inputFieldName = parser.currentName();
switch (parser.nextToken()) {
case VALUE_STRING:
inputs.put(inputFieldName, parser.text());
userInputs.put(inputFieldName, parser.text());
break;
case START_OBJECT:
inputs.put(inputFieldName, parseStringToStringMap(parser));
userInputs.put(inputFieldName, parseStringToStringMap(parser));
break;
case START_ARRAY:
if (PROCESSORS_FIELD.equals(inputFieldName)) {
List<PipelineProcessor> processorList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
processorList.add(PipelineProcessor.parse(parser));
}
inputs.put(inputFieldName, processorList.toArray(new PipelineProcessor[0]));
userInputs.put(inputFieldName, processorList.toArray(new PipelineProcessor[0]));
} else {
List<Map<String, String>> mapList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
mapList.add(parseStringToStringMap(parser));
}
inputs.put(inputFieldName, mapList.toArray(new Map[0]));
userInputs.put(inputFieldName, mapList.toArray(new Map[0]));
}
break;
default:
Expand All @@ -159,7 +171,7 @@ public static WorkflowNode parse(XContentParser parser) throws IOException {
throw new IOException("An node object requires both an id and type field.");
}

return new WorkflowNode(id, type, inputs);
return new WorkflowNode(id, type, previousNodeInputs, userInputs);
}

/**
Expand All @@ -179,11 +191,19 @@ public String type() {
}

/**
* Return this node's input data
* Return this node's user input data
* @return the inputs
*/
public Map<String, Object> userInputs() {
return userInputs;
}

/**
* Return this node's predecessor inputs
* @return the inputs
*/
public Map<String, Object> inputs() {
return inputs;
public Map<String, String> previousNodeInputs() {
return previousNodeInputs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This represents the an object of workflow steps json which maps each step to expected inputs and outputs
*/
public class WorkflowStepValidator {

/** Inputs field name */
private static final String INPUTS_FIELD = "inputs";
/** Outputs field name */
private static final String OUTPUTS_FIELD = "outputs";

private List<String> inputs;
private List<String> outputs;

/**
* Intantiate the object representing a Workflow Step validator
* @param inputs the workflow step inputs
* @param outputs the workflow step outputs
*/
public WorkflowStepValidator(List<String> inputs, List<String> outputs) {
this.inputs = inputs;
this.outputs = outputs;
}

/**
* Parse raw json content into a WorkflowStepValidator instance
* @param parser json based content parser
* @return an instance of the WorkflowStepValidator
* @throws IOException if the content cannot be parsed correctly
*/
public static WorkflowStepValidator parse(XContentParser parser) throws IOException {
List<String> parsedInputs = new ArrayList<>();
List<String> parsedOutputs = new ArrayList<>();

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case INPUTS_FIELD:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
parsedInputs.add(parser.text());
}
break;
case OUTPUTS_FIELD:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
parsedOutputs.add(parser.text());
}
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a WorkflowStepValidator object.");
}
}
return new WorkflowStepValidator(parsedInputs, parsedOutputs);
}

/**
* Get the required inputs
* @return the inputs
*/
public List<String> getInputs() {
return List.copyOf(inputs);
}

/**
* Get the required outputs
* @return the outputs
*/
public List<String> getOutputs() {
return List.copyOf(outputs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.util.ParseUtils;

import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This represents the workflow steps json which maps each step to expected inputs and outputs
*/
public class WorkflowValidator {

private Map<String, WorkflowStepValidator> workflowStepValidators;

/**
* Intantiate the object representing a Workflow validator
* @param workflowStepValidators a map of {@link WorkflowStepValidator}
*/
public WorkflowValidator(Map<String, WorkflowStepValidator> workflowStepValidators) {
this.workflowStepValidators = workflowStepValidators;
}

/**
* Parse raw json content into a WorkflowValidator instance
* @param parser json based content parser
* @return an instance of the WorkflowValidator
* @throws IOException if the content cannot be parsed correctly
*/
public static WorkflowValidator parse(XContentParser parser) throws IOException {

Map<String, WorkflowStepValidator> workflowStepValidators = new HashMap<>();

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String type = parser.currentName();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
workflowStepValidators.put(type, WorkflowStepValidator.parse(parser));
}
return new WorkflowValidator(workflowStepValidators);
}

/**
* Parse a workflow step JSON file into a WorkflowValidator object
*
* @param file the file name of the workflow step json
* @return A {@link WorkflowValidator} represented by the JSON
* @throws IOException on failure to read and parse the json file
*/
public static WorkflowValidator parse(String file) throws IOException {
URL url = WorkflowValidator.class.getClassLoader().getResource(file);
String json = Resources.toString(url, Charsets.UTF_8);
return parse(ParseUtils.jsonToParser(json));
}

/**
* Get the map of WorkflowStepValidators
* @return the map of WorkflowStepValidators
*/
public Map<String, WorkflowStepValidator> getWorkflowStepValidators() {
return Map.copyOf(this.workflowStepValidators);
}

}
Loading