Skip to content

Commit

Permalink
[Backport 2.x] Updates exception handling with FlowFrameworkException…
Browse files Browse the repository at this point in the history
… AND adds dryrun param to Create Workflow (#144)

Updates exception handling with FlowFrameworkException AND adds dryrun param to Create Workflow (#137)

* Simplifying Template format, removing operations, resources created, user outputs



* Initial commit, modifies use case template to seperate workflow inputs into previous_node_inputs and user_inputs, adds graph validation after topologically sorting a workflow into a list of ProcessNode



* Adding tests



* Adding validate graph test



* Addressing PR comments, moving sorting/validating prior to executing async, adding success test case for graph validation



* Adding javadocs



* Moving validation prior to updating workflow state to provisioning



* Addressing PR comments Part 1



* Addressing PR comments Part 2 : Moving field names to common value class and using constants



* Adding definition for noop workflow step



* Addressing PR comments Part 3



* Modifies rest actions to throw flow framework exceptions, transport actions to create flow framework exceptions



* Fixing credentials field in workflow-step json



* Fixing test



* Using ExceptionsHelper.status() to determine the rest status code based on exceptions thrown by the transport client



* Adding dryrun param to create workflow API, allows for validation before saving



* concatenating log message with exception message on failure



* Adding dry run test



* Simplifying FlowFrameworkException::toXContent



---------


(cherry picked from commit c547658)

Signed-off-by: Joshua Palis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 1e3c399 commit 80efe24
Show file tree
Hide file tree
Showing 15 changed files with 339 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ private CommonValue() {}
public static final String WORKFLOW_URI = FLOW_FRAMEWORK_BASE_URI + "/workflow";
/** Field name for workflow Id, the document Id of the indexed use case template */
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for dry run, the flag to indicate if validation is necessary */
public static final String DRY_RUN = "dryrun";
/** The field name for provision workflow within a use case template*/
public static final String PROVISION_WORKFLOW = "provision";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
package org.opensearch.flowframework.exception;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Representation of Flow Framework Exceptions
*/
public class FlowFrameworkException extends RuntimeException {
public class FlowFrameworkException extends RuntimeException implements ToXContentObject {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -60,4 +64,9 @@ public FlowFrameworkException(String message, Throwable cause, RestStatus restSt
public RestStatus getRestStatus() {
return restStatus;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("error", this.getMessage()).endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand Down Expand Up @@ -148,7 +150,7 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
}
}, e -> {
logger.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
internalListener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
client.admin().indices().create(request, actionListener);
Expand Down Expand Up @@ -181,17 +183,29 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
);
}
}, exception -> {
logger.error("Failed to update index setting for: " + indexName, exception);
internalListener.onFailure(exception);
String errorMessage = "Failed to update index setting for: " + indexName;
logger.error(errorMessage, exception);
internalListener.onFailure(
new FlowFrameworkException(
errorMessage + " : " + exception.getMessage(),
ExceptionsHelper.status(exception)
)
);
}));
} else {
internalListener.onFailure(
new FlowFrameworkException("Failed to update index: " + indexName, INTERNAL_SERVER_ERROR)
);
}
}, exception -> {
logger.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
String errorMessage = "Failed to update index " + indexName;
logger.error(errorMessage, exception);
internalListener.onFailure(
new FlowFrameworkException(
errorMessage + " : " + exception.getMessage(),
ExceptionsHelper.status(exception)
)
);
})
);
} else {
Expand All @@ -200,17 +214,21 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
internalListener.onResponse(true);
}
}, e -> {
logger.error("Failed to update index mapping", e);
internalListener.onFailure(e);
String errorMessage = "Failed to update index mapping";
logger.error(errorMessage, e);
internalListener.onFailure(
new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e))
);
}));
} else {
// No need to update index if it's already updated.
internalListener.onResponse(true);
}
}
} catch (Exception e) {
logger.error("Failed to init index " + indexName, e);
listener.onFailure(e);
String errorMessage = "Failed to init index " + indexName;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));
}
}

Expand Down Expand Up @@ -272,8 +290,9 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to index global_context index");
listener.onFailure(e);
String errorMessage = "Failed to index global_context index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));
}
}, e -> {
logger.error("Failed to create global_context index", e);
Expand Down Expand Up @@ -310,13 +329,15 @@ public void putInitialStateToWorkflowState(String workflowId, User user, ActionL
request.id(workflowId);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to put state index document", e);
listener.onFailure(e);
String errorMessage = "Failed to put state index document";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));
}

}, e -> {
logger.error("Failed to create global_context index", e);
listener.onFailure(e);
String errorMessage = "Failed to create global_context index";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));
}));
}

Expand All @@ -332,7 +353,7 @@ public void updateTemplateInGlobalContext(String documentId, Template template,
+ documentId
+ ", global_context index does not exist.";
logger.error(exceptionMessage);
listener.onFailure(new Exception(exceptionMessage));
listener.onFailure(new FlowFrameworkException(exceptionMessage, RestStatus.BAD_REQUEST));
} else {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);
try (
Expand All @@ -343,8 +364,9 @@ public void updateTemplateInGlobalContext(String documentId, Template template,
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update global_context entry : {}. {}", documentId, e.getMessage());
listener.onFailure(e);
String errorMessage = "Failed to update global_context entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));
}
}
}
Expand All @@ -365,7 +387,7 @@ public void updateFlowFrameworkSystemIndexDoc(
if (!doesIndexExist(indexName)) {
String exceptionMessage = "Failed to update document for given workflow due to missing " + indexName + " index";
logger.error(exceptionMessage);
listener.onFailure(new Exception(exceptionMessage));
listener.onFailure(new FlowFrameworkException(exceptionMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
Expand All @@ -376,8 +398,9 @@ public void updateFlowFrameworkSystemIndexDoc(
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update {} entry : {}. {}", indexName, documentId, e.getMessage());
listener.onFailure(e);
String errorMessage = "Failed to update " + indexName + " entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public static Template parse(XContentParser parser) throws IOException {
}
}
if (name == null) {
throw new IOException("An template object requires a name.");
throw new IOException("A template object requires a name.");
}

return new Template(name, description, useCase, templateVersion, compatibilityVersion, workflows, user);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,26 @@
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.DRY_RUN;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;

Expand All @@ -29,6 +37,7 @@
*/
public class RestCreateWorkflowAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";

/**
Expand All @@ -53,11 +62,32 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
try {

String workflowId = request.param(WORKFLOW_ID);
Template template = Template.parse(request.content().utf8ToString());
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template);
return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel));
String workflowId = request.param(WORKFLOW_ID);
Template template = Template.parse(request.content().utf8ToString());
boolean dryRun = request.paramAsBoolean(DRY_RUN, false);

WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.CREATED, builder));
}, exception -> {
try {
FlowFrameworkException ex = (FlowFrameworkException) exception;
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
logger.error("Failed to send back create workflow exception", e);
}
}));
} catch (Exception e) {
FlowFrameworkException ex = new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST);
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;
Expand All @@ -30,6 +35,8 @@
*/
public class RestProvisionWorkflowAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestProvisionWorkflowAction.class);

private static final String PROVISION_WORKFLOW_ACTION = "provision_workflow_action";

/**
Expand All @@ -52,21 +59,35 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
}

// Validate params
String workflowId = request.param(WORKFLOW_ID);
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
try {
// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
}
// Validate params
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = (FlowFrameworkException) exception;
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
logger.error("Failed to send back provision workflow exception", e);
}
}));
} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}

// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel));
}

}
Loading

0 comments on commit 80efe24

Please sign in to comment.