Skip to content

Commit

Permalink
removed additional listener in provision action
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Nov 15, 2023
1 parent 3ee4d72 commit 2fa6fd1
Showing 1 changed file with 39 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work

// Respond to rest action then execute provisioning workflow async
listener.onResponse(new WorkflowResponse(workflowId));
executeWorkflowAsync(workflowId, provisionProcessSequence);
executeWorkflowAsync(workflowId, provisionProcessSequence, listener);

}, exception -> {
if (exception instanceof FlowFrameworkException) {
Expand All @@ -155,59 +155,22 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
* Retrieves a thread from the provision thread pool to execute a workflow
* @param workflowId The id of the workflow
* @param workflowSequence The sorted workflow to execute
* @param listener ActionListener for any failures that don't get caught earlier in below step
*/
private void executeWorkflowAsync(String workflowId, List<ProcessNode> workflowSequence) {
// TODO : Update Action listener type to State index Request
ActionListener<String> provisionWorkflowListener = ActionListener.wrap(response -> {
logger.info("Provisioning completed successfully for workflow {}", workflowId);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
ImmutableMap.of(
STATE_FIELD,
State.COMPLETED,
PROVISIONING_PROGRESS_FIELD,
ProvisioningProgress.DONE,
PROVISION_END_TIME_FIELD,
Instant.now().toEpochMilli()
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage()); })
);

}, exception -> {
logger.error("Provisioning failed for workflow {} : {}", workflowId, exception);

flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
ImmutableMap.of(
STATE_FIELD,
State.FAILED,
ERROR_FIELD,
exception.getMessage(), // TODO: potentially improve the error message here
PROVISIONING_PROGRESS_FIELD,
ProvisioningProgress.FAILED,
PROVISION_END_TIME_FIELD,
Instant.now().toEpochMilli()
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage()); })
);
});
private void executeWorkflowAsync(String workflowId, List<ProcessNode> workflowSequence, ActionListener<WorkflowResponse> listener) {
try {
threadPool.executor(PROVISION_THREAD_POOL).execute(() -> { executeWorkflow(workflowSequence, provisionWorkflowListener); });
threadPool.executor(PROVISION_THREAD_POOL).execute(() -> { executeWorkflow(workflowSequence, workflowId); });

Check warning on line 162 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L162

Added line #L162 was not covered by tests
} catch (Exception exception) {
provisionWorkflowListener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
}

/**
* Executes the given workflow sequence
* @param workflowSequence The topologically sorted workflow to execute
* @param workflowListener The listener that updates the status of a workflow execution
* @param workflowId The workflowId associated with the workflow that is executing
*/
private void executeWorkflow(List<ProcessNode> workflowSequence, ActionListener<String> workflowListener) {
private void executeWorkflow(List<ProcessNode> workflowSequence, String workflowId) {
try {

List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();
Expand All @@ -232,12 +195,39 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, ActionListener<
// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
workflowFutureList.forEach(CompletableFuture::join);

workflowListener.onResponse("READY");

} catch (IllegalArgumentException e) {
workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST));
logger.info("Provisioning completed successfully for workflow {}", workflowId);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(

Check warning on line 199 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L198-L199

Added lines #L198 - L199 were not covered by tests
workflowId,
ImmutableMap.of(

Check warning on line 201 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L201

Added line #L201 was not covered by tests
STATE_FIELD,
State.COMPLETED,
PROVISIONING_PROGRESS_FIELD,
ProvisioningProgress.DONE,
PROVISION_END_TIME_FIELD,
Instant.now().toEpochMilli()

Check warning on line 207 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L207

Added line #L207 was not covered by tests
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage()); })

Check warning on line 211 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L209-L211

Added lines #L209 - L211 were not covered by tests
);
} catch (Exception ex) {
workflowListener.onFailure(new FlowFrameworkException(ex.getMessage(), ExceptionsHelper.status(ex)));
logger.error("Provisioning failed for workflow {} : {}", workflowId, ex);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(

Check warning on line 215 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L214-L215

Added lines #L214 - L215 were not covered by tests
workflowId,
ImmutableMap.of(

Check warning on line 217 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L217

Added line #L217 was not covered by tests
STATE_FIELD,
State.FAILED,
ERROR_FIELD,
ex.getMessage(), // TODO: potentially improve the error message here

Check warning on line 221 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L221

Added line #L221 was not covered by tests
PROVISIONING_PROGRESS_FIELD,
ProvisioningProgress.FAILED,
PROVISION_END_TIME_FIELD,
Instant.now().toEpochMilli()

Check warning on line 225 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L225

Added line #L225 was not covered by tests
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage()); })

Check warning on line 229 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L227-L229

Added lines #L227 - L229 were not covered by tests
);
}
}

Expand Down

0 comments on commit 2fa6fd1

Please sign in to comment.