Skip to content

Commit

Permalink
Merge branch 'main' into tool-step-config
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Oct 21, 2024
2 parents b9970aa + 389d680 commit b8d2f0c
Show file tree
Hide file tree
Showing 15 changed files with 667 additions and 31 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.17...2.x)
### Features
- Add ApiSpecFetcher for Fetching and Comparing API Specifications ([#651](https://github.com/opensearch-project/flow-framework/issues/651))
- Add optional config field to tool step ([#899](https://github.com/opensearch-project/flow-framework/pull/899))

### Enhancements
- Incrementally remove resources from workflow state during deprovisioning ([#898](https://github.com/opensearch-project/flow-framework/pull/898))

### Bug Fixes
### Infrastructure
### Documentation
Expand Down
30 changes: 25 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ buildscript {
opensearch_no_snapshot = opensearch_build.replace("-SNAPSHOT","")
System.setProperty('tests.security.manager', 'false')
common_utils_version = System.getProperty("common_utils.version", opensearch_build)

swaggerCoreVersion = "2.2.23"
bwcVersionShort = "2.12.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchFFDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
Expand All @@ -34,6 +34,10 @@ buildscript {
bwcFlowFrameworkPath = bwcFilePath + "flowframework/"

isSameMajorVersion = opensearch_version.split("\\.")[0] == bwcVersionShort.split("\\.")[0]
swaggerVersion = "2.1.22"
jacksonVersion = "2.18.0"
swaggerCoreVersion = "2.2.25"

}

repositories {
Expand Down Expand Up @@ -164,21 +168,37 @@ configurations {

dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.11.1'
implementation 'org.junit.jupiter:junit-jupiter:5.11.2'
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.17.0'
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "com.amazonaws:aws-encryption-sdk-java:3.0.1"
implementation "software.amazon.cryptography:aws-cryptographic-material-providers:1.7.0"
implementation "org.dafny:DafnyRuntime:4.8.0"
implementation "software.amazon.smithy.dafny:conversion:0.1"
implementation "org.dafny:DafnyRuntime:4.8.1"
implementation "software.amazon.smithy.dafny:conversion:0.1.1"
implementation 'org.bouncycastle:bcprov-jdk18on:1.78.1'
api "org.apache.httpcomponents.core5:httpcore5:5.3"
implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
implementation "org.glassfish:jakarta.json:2.0.1"
implementation "org.eclipse:yasson:3.0.4"
implementation "com.google.code.gson:gson:2.11.0"
// Swagger-Parser dependencies for API consistency tests
implementation "io.swagger.core.v3:swagger-models:${swaggerCoreVersion}"
implementation "io.swagger.core.v3:swagger-core:${swaggerCoreVersion}"
implementation "io.swagger.parser.v3:swagger-parser-core:${swaggerVersion}"
implementation "io.swagger.parser.v3:swagger-parser:${swaggerVersion}"
implementation "io.swagger.parser.v3:swagger-parser-v3:${swaggerVersion}"
// Declare and force Jackson dependencies for tests
testImplementation("com.fasterxml.jackson.core:jackson-databind") {
version { strictly("${jacksonVersion}") }
}
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") {
version { strictly("${jacksonVersion}") }
}
testImplementation("com.fasterxml.jackson.core:jackson-annotations") {
version { strictly("${jacksonVersion}") }
}

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
Expand All @@ -189,7 +209,7 @@ dependencies {
configurations.all {
resolutionStrategy {
force("com.google.guava:guava:33.3.1-jre") // CVE for 31.1, keep to force transitive dependencies
force("com.fasterxml.jackson.core:jackson-core:2.18.0") // Dependency Jar Hell
force("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}") // Dependency Jar Hell
force("org.apache.httpcomponents.core5:httpcore5:5.3") // Dependency Jar Hell
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,9 @@ private CommonValue() {}
public static final String CREATE_INGEST_PIPELINE_MODEL_ID = "create_ingest_pipeline.model_id";
/** The field name for reindex source index substitution */
public static final String REINDEX_SOURCE_INDEX = "reindex.source_index";

/**URI for the YAML file of the ML Commons API specification.*/
public static final String ML_COMMONS_API_SPEC_YAML_URI =
"https://raw.githubusercontent.com/opensearch-project/opensearch-api-specification/refs/heads/main/spec/namespaces/ml.yaml";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.exception;

import org.opensearch.OpenSearchException;

import java.util.List;

/**
* Custom exception to be thrown when an error occurs during the parsing of an API specification.
*/
public class ApiSpecParseException extends OpenSearchException {

/**
* Constructor with message.
*
* @param message The detail message.
*/
public ApiSpecParseException(String message) {
super(message);
}

/**
* Constructor with message and cause.
*
* @param message The detail message.
* @param cause The cause of the exception.
*/
public ApiSpecParseException(String message, Throwable cause) {
super(message, cause);
}

/**
* Constructor with message and list of detailed errors.
*
* @param message The detail message.
* @param details The list of errors encountered during the parsing process.
*/
public ApiSpecParseException(String message, List<String> details) {
super(message + ": " + String.join(", ", details));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessageFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.DocWriteRequest.OpType;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -693,6 +695,7 @@ public void addResourceToStateIndex(
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
newResource,
OpType.INDEX,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
Expand All @@ -701,15 +704,41 @@ public void addResourceToStateIndex(
}

/**
* Performs a get and update of a State Index document adding a new resource with strong consistency and retries
* Removes a resource from the state index, including common exception handling
* @param workflowId The workflow document id in the state index
* @param resourceToDelete The resource to delete
* @param listener the ActionListener for this step to handle completing the future after update
*/
public void deleteResourceFromStateIndex(String workflowId, ResourceCreated resourceToDelete, ActionListener<WorkflowData> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to update state for " + workflowId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
resourceToDelete,
OpType.DELETE,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
}
}
}

/**
* Performs a get and update of a State Index document adding or removing a resource with strong consistency and retries
* @param workflowId The document id to update
* @param newResource The resource to add to the resources created list
* @param resource The resource to add or remove from the resources created list
* @param operation The operation to perform on the resource (INDEX to append to the list or DELETE to remove)
* @param retries The number of retries on update version conflicts
* @param listener The listener to complete on success or failure
*/
private void getAndUpdateResourceInStateDocumentWithRetries(
String workflowId,
ResourceCreated newResource,
ResourceCreated resource,
OpType operation,
int retries,
ActionListener<WorkflowData> listener
) {
Expand All @@ -721,7 +750,11 @@ private void getAndUpdateResourceInStateDocumentWithRetries(
}
WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString());
List<ResourceCreated> resourcesCreated = new ArrayList<>(currentState.resourcesCreated());
resourcesCreated.add(newResource);
if (operation == OpType.DELETE) {
resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap()));
} else {
resourcesCreated.add(resource);
}
XContentBuilder builder = XContentFactory.jsonBuilder();
WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build();
newState.toXContent(builder, null);
Expand All @@ -732,41 +765,54 @@ private void getAndUpdateResourceInStateDocumentWithRetries(
client.update(
updateRequest,
ActionListener.wrap(
r -> handleStateUpdateSuccess(workflowId, newResource, listener),
e -> handleStateUpdateException(workflowId, newResource, retries, listener, e)
r -> handleStateUpdateSuccess(workflowId, resource, operation, listener),
e -> handleStateUpdateException(workflowId, resource, operation, retries, listener, e)
)
);
}, ex -> handleStateUpdateException(workflowId, newResource, 0, listener, ex)));
}, ex -> handleStateUpdateException(workflowId, resource, operation, 0, listener, ex)));
}

private void handleStateUpdateSuccess(String workflowId, ResourceCreated newResource, ActionListener<WorkflowData> listener) {
private void handleStateUpdateSuccess(
String workflowId,
ResourceCreated newResource,
OpType operation,
ActionListener<WorkflowData> listener
) {
String resourceName = newResource.resourceType();
String resourceId = newResource.resourceId();
String nodeId = newResource.workflowStepId();
logger.info("Updated resources created for {} on step {} with {} {}", workflowId, nodeId, resourceName, resourceId);
logger.info(
"Updated resources created for {} on step {} to {} resource {} {}",
workflowId,
nodeId,
operation.equals(OpType.DELETE) ? "delete" : "add",
resourceName,
resourceId
);
listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), workflowId, nodeId));
}

private void handleStateUpdateException(
String workflowId,
ResourceCreated newResource,
OpType operation,
int retries,
ActionListener<WorkflowData> listener,
Exception e
) {
if (e instanceof VersionConflictEngineException && retries > 0) {
// Retry if we haven't exhausted retries
getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, retries - 1, listener);
getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, operation, retries - 1, listener);
return;
}
String errorMessage = "Failed to update workflow state for "
+ workflowId
+ " on step "
+ newResource.workflowStepId()
+ " with "
+ newResource.resourceType()
+ " "
+ newResource.resourceId();
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to update workflow state for {} on step {} to {} resource {} {}",
workflowId,
newResource.workflowStepId(),
operation.equals(OpType.DELETE) ? "delete" : "add",
newResource.resourceType(),
newResource.resourceId()
).getFormattedMessage();
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE;
Expand Down Expand Up @@ -214,19 +215,32 @@ private void executeDeprovisionSequence(
// Repeat attempting to delete resources as long as at least one is successful
int resourceCount = deprovisionProcessSequence.size();
while (resourceCount > 0) {
PlainActionFuture<WorkflowData> stateUpdateFuture;
Iterator<ProcessNode> iter = deprovisionProcessSequence.iterator();
while (iter.hasNext()) {
do {
ProcessNode deprovisionNode = iter.next();
ResourceCreated resource = getResourceFromDeprovisionNode(deprovisionNode, resourcesCreated);
String resourceNameAndId = getResourceNameAndId(resource);
PlainActionFuture<WorkflowData> deprovisionFuture = deprovisionNode.execute();
stateUpdateFuture = PlainActionFuture.newFuture();
try {
deprovisionFuture.get();
logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId);
// Remove from state index resource list
flowFrameworkIndicesHandler.deleteResourceFromStateIndex(workflowId, resource, stateUpdateFuture);
try {
// Wait at most 1 second for state index update.
stateUpdateFuture.actionGet(1, TimeUnit.SECONDS);
} catch (Exception e) {
// Ignore incremental resource removal failures (or timeouts) as we catch up at the end with remainingResources
}
// Remove from list so we don't try again
iter.remove();
// Pause briefly before next step
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Throwable t) {
// If any deprovision fails due to not found, it's a success
if (t.getCause() instanceof OpenSearchStatusException
Expand All @@ -238,7 +252,7 @@ private void executeDeprovisionSequence(
logger.info("Failed {} for {}", deprovisionNode.id(), resourceNameAndId);
}
}
}
} while (iter.hasNext());
if (deprovisionProcessSequence.size() < resourceCount) {
// If we've deleted something, decrement and try again if not zero
resourceCount = deprovisionProcessSequence.size();
Expand All @@ -259,6 +273,7 @@ private void executeDeprovisionSequence(
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
} else {
Expand All @@ -274,6 +289,7 @@ private void executeDeprovisionSequence(
if (!deleteNotAllowed.isEmpty()) {
logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed);
}
// This is a redundant best-effort backup to the incremental deletion done earlier
updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener);
}

Expand Down
Loading

0 comments on commit b8d2f0c

Please sign in to comment.