Skip to content

Commit

Permalink
mend
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Nov 6, 2023
1 parent 8b09716 commit 7374672
Show file tree
Hide file tree
Showing 17 changed files with 136 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
Expand Down Expand Up @@ -101,7 +103,8 @@ public List<RestHandler> getRestHandlers(
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class)
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,29 @@
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_NAME_FIELD;

/**
* This represents an object in the WorkflowState {@link WorkflowState}.
*/
public class ResourcesCreated implements ToXContentObject, Writeable {

public String resourceName;
public String resourceId;
private String resourceName;
private String resourceId;

/**
* Create this resources created object with given resource name and ID.
* @param resourceName The resource name associating to the step name where it was created
* @param resourceId The resources ID for relating to the created resource
*/
public ResourcesCreated(String resourceName, String resourceId) {
this.resourceName = resourceName;
this.resourceId = resourceId;
}

/**
* Create this resources created object with an StreamInput
* @param input the input stream to read from
* @throws IOException if failed to read input stream
*/
public ResourcesCreated(StreamInput input) throws IOException {
this.resourceName = input.readString();
this.resourceId = input.readString();
Expand All @@ -50,6 +63,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(resourceId);
}

/**
* Parse raw JSON content into a resourcesCreated instance.
*
* @param parser JSON based content parser
* @return the parsed ResourcesCreated instance
* @throws IOException if content can't be parsed correctly
*/
public static ResourcesCreated parse(XContentParser parser) throws IOException {
String resourceName = null;
String resourceId = null;
Expand Down
15 changes: 2 additions & 13 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,6 @@ public Template(
this.user = user;
}

// public Template(StreamInput input) throws IOException {
// this.name = input.readString();
// this.description = input.readOptionalString();
// this.useCase = input.readString();
// this.templateVersion = input.readVersion();
// this.compatibilityVersion = input.readList(Version::new); // Replace with actual method if different
// this.workflows = input.readMap(StreamInput::readString, WorkFlow::new); // Replace with the actual function to read WorkFlow objects
// if (input.readBoolean()) {
// this.user = new User(input); // Replace with the actual constructor or factory method for User
// }
// }

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
Expand Down Expand Up @@ -125,13 +113,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return xContentBuilder.endObject();
}

// TODO: fix writeable when implementing get workflow API
@Override
public void writeTo(StreamOutput output) throws IOException {
output.writeString(name);
output.writeOptionalString(description);
output.writeString(useCase);
output.writeVersion(templateVersion);
output.writeList((List<? extends Writeable>) compatibilityVersion);
// output.writeList((List<? extends Writeable>) compatibilityVersion);
output.writeMapWithConsistentOrder(workflows);
if (user != null) {
output.writeBoolean(true); // user exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,20 @@ public WorkflowState(

private WorkflowState() {}

/**
* Instatiates a new WorkflowState from an input stream
* @param input the input stream to read from
* @throws IOException if the workflowId cannot be read from the input stream
*/
public WorkflowState(StreamInput input) throws IOException {
this.workflowId = input.readString();
this.error = input.readOptionalString();
this.state = input.readOptionalString();
this.provisioningProgress = input.readOptionalString();
this.provisionStartTime = input.readOptionalInstant();
this.provisionEndTime = input.readOptionalInstant();
this.user = input.readBoolean() ? new User(input) : null;
// TODO: fix error: cannot access Response issue when integrating with access control
// this.user = input.readBoolean() ? new User(input) : null;
this.uiMetadata = input.readBoolean() ? input.readMap() : null;
this.userOutputs = input.readBoolean() ? input.readMap() : null;
this.resourcesCreated = input.readList(ResourcesCreated::new);
Expand Down Expand Up @@ -499,8 +505,4 @@ public List<ResourcesCreated> resourcesCreated() {
return resourcesCreated;
}

public static WorkflowState fromStream(StreamInput in) throws IOException {
WorkflowState workflowState = new WorkflowState(in);
return workflowState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;

/**
* Rest Action to facilitate requests to get a workflow status
*/
public class RestGetWorkflowAction extends BaseRestHandler {

private static final String GET_WORKFLOW_ACTION = "get_workflow";
Expand Down Expand Up @@ -59,22 +62,8 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
boolean all = request.paramAsBoolean("_all", false);

// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, all);
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel));

// GetAnomalyDetectorRequest getAnomalyDetectorRequest = new GetAnomalyDetectorRequest(
// detectorId,
// RestActions.parseVersion(request),
// returnJob,
// returnTask,
// typesStr,
// rawPath,
// all,
// buildEntity(request, detectorId)
// );
//
// return channel -> client
// .execute(GetAnomalyDetectorAction.INSTANCE, getAnomalyDetectorRequest, new RestToXContentListener<>(channel));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,94 +13,52 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;

import java.io.IOException;

import static org.opensearch.flowframework.common.CommonValue.TEMPLATE_FIELD;

/**
* Transport Response from getting a workflow status
*/
public class GetWorkflowResponse extends ActionResponse implements ToXContentObject {

public static final String WORKFLOW_STATUS = "workflowStatus";
private String id;
private WorkflowState workflowState;
private Template template;
private boolean workflowStatus;

public GetWorkflowResponse(String id, WorkflowState workflowState, Template template, boolean workflowStatus) {
this.id = id;
this.template = template;
this.workflowState = workflowState;
this.workflowStatus = workflowStatus;
}
public WorkflowState workflowState;
public boolean allStatus;

/*
if (workflowStatus) {
out.writeBoolean(true); // profileResponse is true
if (workflowState != null) {
out.writeString(WORKFLOW_STATUS);
WorkflowState.writeTo(out);
}
if (template != null) {
out.writeString(TEMPLATE_FIELD);
template.writeTo(out);
}
} else {
out.writeBoolean(false); // profileResponse is false
out.writeString(id);
template.writeTo(out);
}
/**
* Instantiates a new GetWorkflowResponse from an input stream
* @param in the input stream to read from
* @throws IOException if the workflowId cannot be read from the input stream
*/
public GetWorkflowResponse(StreamInput in) throws IOException {
super(in);
workflowStatus = in.readBoolean();
if (workflowStatus) {
if (workflowState != null) {
workflowState = new WorkflowState(in);
}
if (template != null) {
template = new Template(in);
}
} else {
workflowState = null;
id = in.readString();
template = new Template(in);
}
workflowState = new WorkflowState(in);
allStatus = false;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (workflowStatus) {
builder.startObject();
builder.field(WORKFLOW_STATUS, workflowState);
builder.field(TEMPLATE_FIELD, template);
builder.endObject();
/**
* Instatiates a new GetWorkflowResponse from an input stream
* @param workflowState the workflow state object
* @param allStatus whether to return all fields in state index
*/
public GetWorkflowResponse(WorkflowState workflowState, boolean allStatus) {
if (allStatus) {
this.workflowState = workflowState;
} else {
builder.startObject();
builder.field("_id", id);
builder.field(TEMPLATE_FIELD, template);
builder.endObject();
this.workflowState = new WorkflowState.Builder().workflowId(workflowState.getWorkflowId())
.error(workflowState.getWorkflowId())
.state(workflowState.getWorkflowId())
.build();
}
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (workflowStatus) {
out.writeBoolean(true); // profileResponse is true
// if (workflowState != null) {
// out.writeString(WORKFLOW_STATUS);
// WorkflowState.writeTo(out);
// }
if (template != null) {
out.writeString(TEMPLATE_FIELD);
template.writeTo(out);
}
} else {
out.writeBoolean(false); // profileResponse is false
out.writeString(id);
template.writeTo(out);
}
workflowState.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder xContentBuilder, Params params) throws IOException {
return workflowState.toXContent(xContentBuilder, params);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,25 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;

public class GetWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, GetWorkflowResponseTest> {
//TODO: Currently we only get the workflow status but we should change to be able to get the
// full template as well
/**
* Transport Action to get status of a current workflow
*/
public class GetWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, GetWorkflowResponse> {

private final Logger logger = LogManager.getLogger(GetWorkflowTransportAction.class);

private final Client client;
private final NamedXContentRegistry xContentRegistry;

/**
* Intantiates a new CreateWorkflowTransportAction
* @param transportService the TransportService
* @param actionFilters action filters
* @param client The client used to make the request to OS
* @param xContentRegistry contentRegister to parse get response
*/
@Inject
public GetWorkflowTransportAction(
TransportService transportService,
Expand All @@ -51,7 +63,7 @@ public GetWorkflowTransportAction(
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetWorkflowResponseTest> listener) {
protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetWorkflowResponse> listener) {
String workflowId = request.getWorkflowId();
User user = ParseUtils.getUserContext(client);
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX).id(workflowId);
Expand All @@ -63,7 +75,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetW
try (XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, r.getSourceAsBytesRef())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
listener.onResponse(new GetWorkflowResponseTest(workflowState));
listener.onResponse(new GetWorkflowResponse(workflowState, request.getAll()));
} catch (Exception e) {
logger.error("Failed to parse workflowState" + r.getId(), e);
listener.onFailure(e);
Expand Down
Loading

0 comments on commit 7374672

Please sign in to comment.