Skip to content

Commit

Permalink
new exception type for workflow step failures
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Mar 14, 2024
1 parent 7198573 commit 7b249f6
Show file tree
Hide file tree
Showing 22 changed files with 165 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class FlowFrameworkException extends RuntimeException implements ToXConte
private static final long serialVersionUID = 1L;

/** The rest status code of this exception */
private final RestStatus restStatus;
protected final RestStatus restStatus;

/**
* Constructor with error message.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.exception;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Representation of an exception that is caused by a workflow step failing outside of our plugin
* This is caught by an external client (e.g. ml-client) returning the failure
*/
public class WorkflowStepException extends FlowFrameworkException implements ToXContentObject {

private static final long serialVersionUID = 1L;

/**
* Constructor with error message.
*
* @param message message of the exception
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(String message, RestStatus restStatus) {
super(message, restStatus);
}

/**
* Constructor with specified cause.
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(Throwable cause, RestStatus restStatus) {
super(cause, restStatus);
}

/**
* Constructor with specified error message adn cause.
* @param message error message
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(String message, Throwable cause, RestStatus restStatus) {
super(message, cause, restStatus);
}

/**
* Getter for restStatus.
*
* @return the HTTP status code associated with the exception
*/
public RestStatus getRestStatus() {
return restStatus;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("error", this.getMessage()).endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
Expand Down Expand Up @@ -123,9 +124,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
channel.sendResponse(new BytesRestResponse(RestStatus.CREATED, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;
} else {
ex = new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception));
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.DeleteWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -84,9 +85,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;
} else {
ex = new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception));
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -66,8 +67,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
}
// Validate content
if (request.hasContent()) {
// BaseRestHandler will give appropriate error message
return channel -> channel.sendResponse(null);
throw new FlowFrameworkException("deprovision request should have no payload", RestStatus.BAD_REQUEST);
}
// Validate params
if (workflowId == null) {
Expand All @@ -80,9 +80,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;
} else {
ex = new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception));
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -85,9 +86,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;
} else {
ex = new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -82,9 +83,14 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;
} else {
ex = new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception));
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.GetWorkflowStepAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -81,9 +82,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;
} else {
ex = new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception));
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -92,9 +93,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception));
FlowFrameworkException ex;
if (exception instanceof WorkflowStepException) {
ex = (WorkflowStepException) exception;
} else if (exception instanceof FlowFrameworkException) {
ex = (FlowFrameworkException) exception;
} else {
ex = new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception));
}
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,18 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
}, exception -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exception); })
);
} catch (Exception ex) {
RestStatus status;
if (ex instanceof FlowFrameworkException) {
status = ((FlowFrameworkException) ex).getRestStatus();
} else {
status = ExceptionsHelper.status(ex);
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
+ " during step "
+ currentStepId;
+ currentStepId
+ ", restStatus: "
+ status.toString();
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

Expand Down Expand Up @@ -135,7 +136,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed step " + pipelineToBeCreated;
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -214,7 +215,7 @@ public PlainActionFuture<WorkflowData> execute(
}, exception -> {
String errorMessage = "Failed to register local model in step " + currentNodeId;
logger.error(errorMessage, exception);
registerLocalModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception)));
}));
} catch (FlowFrameworkException e) {
registerLocalModelFuture.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTask;
Expand Down Expand Up @@ -127,7 +128,7 @@ protected void retryableGetMlTask(
}, exception -> {
String errorMessage = workflowStep + " failed";
logger.error(errorMessage, exception);
mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
mlTaskListener.onFailure(new WorkflowStepException(errorMessage, RestStatus.BAD_REQUEST));
}));
try {
Thread.sleep(this.retryDuration.getMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -123,7 +124,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to create connector";
logger.error(errorMessage, e);
createConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

Expand Down
Loading

0 comments on commit 7b249f6

Please sign in to comment.