From 2a441191c285697036d1008f76cdc3795c22989f Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 4 Mar 2024 15:06:50 -0800 Subject: [PATCH 1/9] Add created, last updated, and last provisioned fields to Template Signed-off-by: Daniel Widdis --- .../flowframework/common/CommonValue.java | 10 +- .../flowframework/model/Template.java | 143 +++++++++++++++--- .../CreateWorkflowTransportAction.java | 13 +- .../resources/mappings/global-context.json | 14 +- .../FlowFrameworkIndicesHandlerTests.java | 5 +- .../flowframework/model/TemplateTests.java | 14 +- .../rest/RestCreateWorkflowActionTests.java | 5 +- .../CreateWorkflowTransportActionTests.java | 15 +- .../GetWorkflowTransportActionTests.java | 5 +- ...ProvisionWorkflowTransportActionTests.java | 5 +- .../WorkflowRequestResponseTests.java | 11 +- .../util/EncryptorUtilsTests.java | 5 +- 12 files changed, 199 insertions(+), 46 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index f4bec21a7..c70a70426 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -48,6 +48,12 @@ private CommonValue() {} public static final String CREATE_TIME = "create_time"; /** The template field name for the user who created the workflow **/ public static final String USER_FIELD = "user"; + /** The created time field */ + public static final String CREATED_TIME = "created_time"; + /** The last updated time field */ + public static final String LAST_UPDATED_TIME_FIELD = "last_updated_time"; + /** The last updated time field */ + public static final String LAST_PROVISIONED_TIME_FIELD = "last_provisioned_time"; /* * Constants associated with Rest or Transport actions @@ -156,10 +162,6 @@ private CommonValue() {} public static final String APP_TYPE_FIELD = "app_type"; /** To include field for an agent response */ public static final String INCLUDE_OUTPUT_IN_AGENT_RESPONSE = "include_output_in_agent_response"; - /** The created time field for an agent */ - public static final String CREATED_TIME = "created_time"; - /** The last updated time field for an agent */ - public static final String LAST_UPDATED_TIME_FIELD = "last_updated_time"; /** HttpHost */ public static final String HTTP_HOST_FIELD = "http_host"; /** Http scheme */ diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java index 8ef914d65..1a466b956 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -21,6 +21,7 @@ import org.opensearch.flowframework.exception.FlowFrameworkException; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -28,7 +29,10 @@ import java.util.Map.Entry; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.flowframework.common.CommonValue.CREATED_TIME; import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD; +import static org.opensearch.flowframework.common.CommonValue.LAST_PROVISIONED_TIME_FIELD; +import static org.opensearch.flowframework.common.CommonValue.LAST_UPDATED_TIME_FIELD; import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD; import static org.opensearch.flowframework.common.CommonValue.UI_METADATA_FIELD; import static org.opensearch.flowframework.common.CommonValue.USER_FIELD; @@ -48,14 +52,17 @@ public class Template implements ToXContentObject { /** The template field name for template use case */ public static final String USE_CASE_FIELD = "use_case"; - private String name; - private String description; - private String useCase; // probably an ENUM actually - private Version templateVersion; - private List compatibilityVersion; - private Map workflows; - private Map uiMetadata; - private User user; + private final String name; + private final String description; + private final String useCase; // probably an ENUM actually + private final Version templateVersion; + private final List compatibilityVersion; + private final Map workflows; + private final Map uiMetadata; + private final User user; + private final long createdTime; + private final long lastUpdatedTime; + private final long lastProvisionedTime; /** * Instantiate the object representing a use case template @@ -68,6 +75,9 @@ public class Template implements ToXContentObject { * @param workflows Workflow graph definitions corresponding to the defined operations. * @param uiMetadata The UI metadata related to the given workflow * @param user The user extracted from the thread context from the request + * @param createdTime Created time in milliseconds since the epoch + * @param lastUpdatedTime Last Updated time in milliseconds since the epoch + * @param lastProvisionedTime Last Provisioned time in milliseconds since the epoch */ public Template( String name, @@ -77,7 +87,10 @@ public Template( List compatibilityVersion, Map workflows, Map uiMetadata, - User user + User user, + long createdTime, + long lastUpdatedTime, + long lastProvisionedTime ) { this.name = name; this.description = description; @@ -87,10 +100,11 @@ public Template( this.workflows = Map.copyOf(workflows); this.uiMetadata = uiMetadata; this.user = user; + this.createdTime = createdTime < 0 ? Instant.now().toEpochMilli() : createdTime; + this.lastUpdatedTime = lastUpdatedTime < this.createdTime ? this.createdTime : lastUpdatedTime; + this.lastProvisionedTime = lastProvisionedTime; } - private Template() {} - /** * Class for constructing a Builder for Template */ @@ -103,6 +117,9 @@ public static class Builder { private Map workflows = new HashMap<>(); private Map uiMetadata = null; private User user = null; + private long createdTime = -1L; + private long lastUpdatedTime = -1L; + private long lastProvisionedTime = -1L; /** * Empty Constructor for the Builder object @@ -189,23 +206,55 @@ public Builder user(User user) { return this; } + /** + * Builder method for adding createdTime + * @param createdTime created time in milliseconds since the epoch + * @return the Builder object + */ + public Builder createdTime(long createdTime) { + this.createdTime = createdTime; + return this; + } + + /** + * Builder method for adding lastUpdatedTime + * @param lastUpdatedTime last updated time in milliseconds since the epoch + * @return the Builder object + */ + public Builder lastUpdatedTime(long lastUpdatedTime) { + this.lastUpdatedTime = lastUpdatedTime; + return this; + } + + /** + * Builder method for adding lastProvisionedTime + * @param lastProvisionedTime last provisioned time in milliseconds since the epoch + * @return the Builder object + */ + public Builder lastProvisionedTime(long lastProvisionedTime) { + this.lastProvisionedTime = lastProvisionedTime; + return this; + } + /** * Allows building a template * @return Template Object containing all needed fields */ public Template build() { - Template template = new Template(); - template.name = this.name; - template.description = this.description; - template.useCase = this.useCase; - template.templateVersion = this.templateVersion; - template.compatibilityVersion = this.compatibilityVersion; - template.workflows = this.workflows; - template.uiMetadata = this.uiMetadata; - template.user = this.user; - return template; + return new Template( + this.name, + this.description, + this.useCase, + this.templateVersion, + this.compatibilityVersion, + this.workflows, + this.uiMetadata, + this.user, + this.createdTime, + this.lastUpdatedTime, + this.lastProvisionedTime + ); } - } @Override @@ -244,6 +293,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws xContentBuilder.field(USER_FIELD, user); } + if (createdTime > 0) { + xContentBuilder.field(CREATED_TIME, createdTime); + } + + if (lastUpdatedTime > 0) { + xContentBuilder.field(LAST_UPDATED_TIME_FIELD, lastUpdatedTime); + } + + if (lastProvisionedTime > 0) { + xContentBuilder.field(LAST_PROVISIONED_TIME_FIELD, lastProvisionedTime); + } + return xContentBuilder.endObject(); } @@ -263,6 +324,9 @@ public static Template parse(XContentParser parser) throws IOException { Map workflows = new HashMap<>(); Map uiMetadata = null; User user = null; + long createdTime = -1; + long lastUpdatedTime = -1; + long lastProvisionedTime = -1; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { @@ -315,6 +379,15 @@ public static Template parse(XContentParser parser) throws IOException { case USER_FIELD: user = User.parse(parser); break; + case CREATED_TIME: + createdTime = parser.longValue(); + break; + case LAST_UPDATED_TIME_FIELD: + lastUpdatedTime = parser.longValue(); + break; + case LAST_PROVISIONED_TIME_FIELD: + lastProvisionedTime = parser.longValue(); + break; default: throw new FlowFrameworkException( "Unable to parse field [" + fieldName + "] in a template object.", @@ -334,6 +407,9 @@ public static Template parse(XContentParser parser) throws IOException { .workflows(workflows) .uiMetadata(uiMetadata) .user(user) + .createdTime(createdTime) + .lastUpdatedTime(lastUpdatedTime) + .lastProvisionedTime(lastProvisionedTime) .build(); } @@ -449,6 +525,27 @@ public User getUser() { return user; } + /** + * @return the createdTime + */ + public long createdTime() { + return createdTime; + } + + /** + * @return the lastUpdatedTime + */ + public long lastUpdatedTime() { + return lastUpdatedTime; + } + + /** + * @return the lastProvisionedTime + */ + public long lastProvisionedTime() { + return lastProvisionedTime; + } + @Override public String toString() { return "Template [name=" @@ -464,7 +561,7 @@ public String toString() { + ", workflows=" + workflows + ", uiMedata=" - + uiMetadata + + (uiMetadata == null ? "{}" : uiMetadata) + "]"; } } diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index cccbc26e2..d314a513f 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -93,7 +93,7 @@ public CreateWorkflowTransportAction( protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { User user = getUserContext(client); - Template templateWithUser = new Template( + Template templateWithUserAndTimestamps = new Template( request.getTemplate().name(), request.getTemplate().description(), request.getTemplate().useCase(), @@ -101,15 +101,18 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { flowFrameworkIndicesHandler.putInitialStateToWorkflowState( globalContextResponse.getId(), diff --git a/src/main/resources/mappings/global-context.json b/src/main/resources/mappings/global-context.json index dd282f40a..61b8a5487 100644 --- a/src/main/resources/mappings/global-context.json +++ b/src/main/resources/mappings/global-context.json @@ -1,7 +1,7 @@ { "dynamic": false, "_meta": { - "schema_version": 1 + "schema_version": 2 }, "properties": { "workflow_id": { @@ -73,6 +73,18 @@ } } } + }, + "created_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_updated_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_provisioned_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" } } } diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index eda561f90..aa7ab4710 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -120,7 +120,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + -1L, + -1L, + -1L ); } diff --git a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java index e0b88d725..95e9aaa9a 100644 --- a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java +++ b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java @@ -12,6 +12,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.test.OpenSearchTestCase; +import org.joda.time.Instant; import java.io.IOException; import java.util.Collections; @@ -48,7 +49,10 @@ public void testTemplate() throws IOException { compatibilityVersion, Map.of("workflow", workflow), uiMetadata, - null + null, + -1L, + -1L, + -1L ); assertEquals("test", template.name()); @@ -59,6 +63,10 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, template.getUiMetadata()); Workflow wf = template.workflows().get("workflow"); assertNotNull(wf); + assertTrue(template.createdTime() > Instant.now().minus(10_000).getMillis()); + assertTrue(template.createdTime() < Instant.now().getMillis()); + assertEquals(template.createdTime(), template.lastUpdatedTime()); + assertEquals(-1, template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wf.toString()); String json = TemplateTestJsonUtil.parseToJson(template); @@ -72,6 +80,10 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, templateX.getUiMetadata()); Workflow wfX = templateX.workflows().get("workflow"); assertNotNull(wfX); + assertTrue(template.createdTime() > Instant.now().minus(10_000).getMillis()); + assertTrue(template.createdTime() < Instant.now().getMillis()); + assertEquals(template.lastUpdatedTime(), template.createdTime()); + assertEquals(-1, template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wfX.toString()); } diff --git a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java index d95f10375..0af971cfd 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java @@ -73,7 +73,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + -1L, + -1L, + -1L ); // Invalid template configuration, wrong field name diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index 7dd8267f1..f1964091e 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -123,7 +123,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + -1L, + -1L, + -1L ); } @@ -185,7 +188,10 @@ public void testValidation_Failed() throws Exception { List.of(Version.fromString("2.0.0"), Version.fromString("3.0.0")), Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + -1L, + -1L, + -1L ); @SuppressWarnings("unchecked") @@ -501,7 +507,10 @@ private Template generateValidTemplate() { List.of(Version.fromString("2.0.0"), Version.fromString("3.0.0")), Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + -1L, + -1L, + -1L ); return validTemplate; diff --git a/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java index a3b259ad9..79cd0b2a2 100644 --- a/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java @@ -89,7 +89,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("provision", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + -1L, + -1L, + -1L ); ThreadPool clientThreadPool = mock(ThreadPool.class); diff --git a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java index 2b2f58532..d0974532e 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java @@ -99,7 +99,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("provision", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + -1L, + -1L, + -1L ); ThreadPool clientThreadPool = mock(ThreadPool.class); diff --git a/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java b/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java index 200312bec..0946f53ac 100644 --- a/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java @@ -52,7 +52,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + -1L, + -1L, + -1L ); } @@ -71,7 +74,7 @@ public void testNullIdWorkflowRequest() throws IOException { WorkflowRequest streamInputRequest = new WorkflowRequest(in); assertEquals(nullIdRequest.getWorkflowId(), streamInputRequest.getWorkflowId()); - assertEquals(nullIdRequest.getTemplate().toJson(), streamInputRequest.getTemplate().toJson()); + assertEquals(nullIdRequest.getTemplate().toString(), streamInputRequest.getTemplate().toString()); assertNull(nullIdRequest.validate()); assertFalse(nullIdRequest.isProvision()); assertTrue(nullIdRequest.getParams().isEmpty()); @@ -113,7 +116,7 @@ public void testWorkflowRequest() throws IOException { WorkflowRequest streamInputRequest = new WorkflowRequest(in); assertEquals(workflowRequest.getWorkflowId(), streamInputRequest.getWorkflowId()); - assertEquals(workflowRequest.getTemplate().toJson(), streamInputRequest.getTemplate().toJson()); + assertEquals(workflowRequest.getTemplate().toString(), streamInputRequest.getTemplate().toString()); assertNull(workflowRequest.validate()); assertFalse(workflowRequest.isProvision()); assertTrue(workflowRequest.getParams().isEmpty()); @@ -134,7 +137,7 @@ public void testWorkflowRequestWithParams() throws IOException { WorkflowRequest streamInputRequest = new WorkflowRequest(in); assertEquals(workflowRequest.getWorkflowId(), streamInputRequest.getWorkflowId()); - assertEquals(workflowRequest.getTemplate().toJson(), streamInputRequest.getTemplate().toJson()); + assertEquals(workflowRequest.getTemplate().toString(), streamInputRequest.getTemplate().toString()); assertNull(workflowRequest.validate()); assertTrue(workflowRequest.isProvision()); assertEquals("bar", workflowRequest.getParams().get("foo")); diff --git a/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java index a952b4253..5d404b907 100644 --- a/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java @@ -77,7 +77,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("provision", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + -1L, + -1L, + -1L ); ClusterState clusterState = mock(ClusterState.class); From ae365732023d436bf80757ef2814b12951732806 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 4 Mar 2024 16:48:24 -0800 Subject: [PATCH 2/9] Change last updated timestamp when updating workflow Signed-off-by: Daniel Widdis --- .../CreateWorkflowTransportAction.java | 106 ++++++++++++------ .../CreateWorkflowTransportActionTests.java | 45 ++++++++ 2 files changed, 117 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index d314a513f..e5995f739 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.get.GetRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; @@ -38,12 +39,14 @@ import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import static java.lang.Boolean.FALSE; +import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD; import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD; import static org.opensearch.flowframework.util.ParseUtils.getUserContext; @@ -120,8 +123,9 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( - request.getWorkflowId(), - Map.ofEntries( - Map.entry(STATE_FIELD, State.NOT_STARTED), - Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED) - ), - ActionListener.wrap(updateResponse -> { - logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); - listener.onResponse(new WorkflowResponse(request.getWorkflowId())); - }, exception -> { - String errorMessage = "Failed to update workflow " + request.getWorkflowId() + " in template index"; - logger.error(errorMessage, exception); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); - } else { - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); - } - }) - ); - }, exception -> { - String errorMessage = "Failed to update use case template " + request.getWorkflowId(); - logger.error(errorMessage, exception); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); + // This is an existing workflow (PUT) + // Fetch existing entry for time stamps + logger.info("Querying existing workflow from global context: {}", workflowId); + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + client.get(new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId), ActionListener.wrap(getResponse -> { + context.restore(); + if (getResponse.isExists()) { + Template existingTemplate = Template.parse(getResponse.getSourceAsString()); + Template newTemplate = request.getTemplate(); + // Update existing entry, full document replacement + Template template = new Template.Builder().name(newTemplate.name()) + .description(newTemplate.description()) + .useCase(newTemplate.useCase()) + .templateVersion(newTemplate.templateVersion()) + .compatibilityVersion(newTemplate.compatibilityVersion()) + .workflows(newTemplate.workflows()) + .uiMetadata(newTemplate.getUiMetadata()) + .user(newTemplate.getUser()) // Should we care about old user here? + .createdTime(existingTemplate.createdTime()) + .lastUpdatedTime(Instant.now().toEpochMilli()) + .lastProvisionedTime(existingTemplate.lastProvisionedTime()) + .build(); + flowFrameworkIndicesHandler.updateTemplateInGlobalContext( + request.getWorkflowId(), + template, + ActionListener.wrap(response -> { + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( + request.getWorkflowId(), + Map.ofEntries( + Map.entry(STATE_FIELD, State.NOT_STARTED), + Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED) + ), + ActionListener.wrap(updateResponse -> { + logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); + listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + }, exception -> { + String errorMessage = "Failed to update workflow " + request.getWorkflowId() + " in template index"; + logger.error(errorMessage, exception); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure( + new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) + ); + } + }) + ); + }, exception -> { + String errorMessage = "Failed to update use case template " + request.getWorkflowId(); + logger.error(errorMessage, exception); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + } + }) + ); } else { - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + String errorMessage = "Failed to retrieve template (" + workflowId + ") from global context."; + logger.error(errorMessage); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND)); } - }) - ); + }, exception -> { + String errorMessage = "Failed to retrieve template (" + workflowId + ") from global context."; + logger.error(errorMessage, exception); + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + })); + } } } diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index f1964091e..877e91ad1 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -10,6 +10,8 @@ import org.apache.lucene.search.TotalHits; import org.opensearch.Version; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -321,6 +323,15 @@ public void testFailedToUpdateWorkflow() { ActionListener listener = mock(ActionListener.class); WorkflowRequest updateWorkflow = new WorkflowRequest("1", template); + doAnswer(invocation -> { + ActionListener getListener = invocation.getArgument(1); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + when(getResponse.getSourceAsString()).thenReturn(template.toJson()); + getListener.onResponse(getResponse); + return null; + }).when(client).get(any(GetRequest.class), any()); + doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(2); responseListener.onFailure(new Exception("failed")); @@ -333,11 +344,45 @@ public void testFailedToUpdateWorkflow() { assertEquals("Failed to update use case template 1", exceptionCaptor.getValue().getMessage()); } + public void testFailedToUpdateNonExistingWorkflow() { + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowRequest updateWorkflow = new WorkflowRequest("2", template); + + doAnswer(invocation -> { + ActionListener getListener = invocation.getArgument(1); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(false); + getListener.onResponse(getResponse); + return null; + }).when(client).get(any(GetRequest.class), any()); + + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(2); + responseListener.onFailure(new Exception("failed")); + return null; + }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any()); + + createWorkflowTransportAction.doExecute(mock(Task.class), updateWorkflow, listener); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals("Failed to retrieve template (2) from global context.", exceptionCaptor.getValue().getMessage()); + } + public void testUpdateWorkflow() { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); WorkflowRequest updateWorkflow = new WorkflowRequest("1", template); + doAnswer(invocation -> { + ActionListener getListener = invocation.getArgument(1); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + when(getResponse.getSourceAsString()).thenReturn(new Template.Builder().name("test").build().toJson()); + getListener.onResponse(getResponse); + return null; + }).when(client).get(any(GetRequest.class), any()); + doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(2); responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true)); From 1fa040e91565ee372078f174d3745724714117ff Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 4 Mar 2024 17:04:40 -0800 Subject: [PATCH 3/9] Change last provisioned timestamp when provisioning workflow Signed-off-by: Daniel Widdis --- CHANGELOG.md | 2 ++ .../ProvisionWorkflowTransportAction.java | 36 +++++++++++++++++-- ...ProvisionWorkflowTransportActionTests.java | 8 +++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 725d6a338..2bbe6db32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) - Added an optional workflow_step param to the get workflow steps API ([#538](https://github.com/opensearch-project/flow-framework/pull/538)) ### Enhancements +- Add created, updated, and provisioned timestamps to saved template ([#551](https://github.com/opensearch-project/flow-framework/pull/551)) + ### Bug Fixes ### Infrastructure ### Documentation diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 4ee10d557..86da71575 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -118,10 +118,10 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { if (TRUE.equals(workflowIsNotStarted)) { + // update state index flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( workflowId, Map.ofEntries( @@ -145,7 +146,36 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.PROVISIONING); executeWorkflowAsync(workflowId, provisionProcessSequence, listener); - listener.onResponse(new WorkflowResponse(workflowId)); + // update last provisioned field in template + Template newTemplate = new Template.Builder().name(template.name()) + .description(template.description()) + .useCase(template.useCase()) + .templateVersion(template.templateVersion()) + .compatibilityVersion(template.compatibilityVersion()) + .workflows(template.workflows()) + .uiMetadata(template.getUiMetadata()) + .user(template.getUser()) // Should we care about old user here? + .createdTime(template.createdTime()) + .lastUpdatedTime(template.lastUpdatedTime()) + .lastProvisionedTime(Instant.now().toEpochMilli()) + .build(); + flowFrameworkIndicesHandler.updateTemplateInGlobalContext( + request.getWorkflowId(), + newTemplate, + ActionListener.wrap(templateResponse -> { + listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + }, exception -> { + String errorMessage = "Failed to update use case template " + request.getWorkflowId(); + logger.error(errorMessage, exception); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure( + new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) + ); + } + }) + ); }, exception -> { String errorMessage = "Failed to update workflow state: " + workflowId; logger.error(errorMessage, exception); diff --git a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java index d0974532e..3969ec4ee 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java @@ -11,6 +11,7 @@ import org.opensearch.Version; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Client; @@ -19,6 +20,7 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.TestHelpers; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; @@ -147,6 +149,12 @@ public void testProvisionWorkflow() { return null; }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), any(), any()); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(2); + responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true)); + return null; + }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any()); + provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); From 6216858491a79ee51c7b9da8867ef1fa0cce7007 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 4 Mar 2024 21:57:04 -0800 Subject: [PATCH 4/9] Allow overriding template not started check Signed-off-by: Daniel Widdis --- .../indices/FlowFrameworkIndicesHandler.java | 13 ++++++++++++- .../transport/ProvisionWorkflowTransportAction.java | 4 +++- .../flowframework/rest/FlowFrameworkRestApiIT.java | 2 +- .../ProvisionWorkflowTransportActionTests.java | 3 ++- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index b0a629216..741ce4d4f 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -395,6 +395,17 @@ public void putInitialStateToWorkflowState(String workflowId, User user, ActionL * @param listener action listener */ public void updateTemplateInGlobalContext(String documentId, Template template, ActionListener listener) { + updateTemplateInGlobalContext(documentId, template, listener, false); + } + + /** + * Replaces a document in the global context index + * @param documentId the document Id + * @param template the use-case template + * @param listener action listener + * @param force if set true, ignores the requirement that the provisioning is not started + */ + public void updateTemplateInGlobalContext(String documentId, Template template, ActionListener listener, boolean force) { if (!doesIndexExist(GLOBAL_CONTEXT_INDEX)) { String errorMessage = "Failed to update template for workflow_id : " + documentId + ", global_context index does not exist."; logger.error(errorMessage); @@ -404,7 +415,7 @@ public void updateTemplateInGlobalContext(String documentId, Template template, doesTemplateExist(documentId, templateExists -> { if (templateExists) { isWorkflowNotStarted(documentId, workflowIsNotStarted -> { - if (workflowIsNotStarted) { + if (workflowIsNotStarted || force) { IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId); try ( XContentBuilder builder = XContentFactory.jsonBuilder(); diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 86da71575..c5f52ec71 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -174,7 +174,9 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { String errorMessage = "Failed to update workflow state: " + workflowId; diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 5e65d8cb1..d6c3ffa65 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -80,7 +80,7 @@ public void testFailedUpdateWorkflow() throws Exception { Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json"); ResponseException exception = expectThrows(ResponseException.class, () -> updateWorkflow(client(), "123", template)); - assertTrue(exception.getMessage().contains("Failed to get template: 123")); + assertTrue(exception.getMessage().contains("Failed to retrieve template (123) from global context.")); Response response = createWorkflow(client(), template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); diff --git a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java index 3969ec4ee..336e25e23 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java @@ -46,6 +46,7 @@ import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -153,7 +154,7 @@ public void testProvisionWorkflow() { ActionListener responseListener = invocation.getArgument(2); responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true)); return null; - }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any()); + }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any(), anyBoolean()); provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); From 93fe0f591a58ca3e280470b1eaca9a456534a00f Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 4 Mar 2024 23:27:08 -0800 Subject: [PATCH 5/9] Use java.time and not joda time Signed-off-by: Daniel Widdis --- .../opensearch/flowframework/model/TemplateTests.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java index 95e9aaa9a..2b999464e 100644 --- a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java +++ b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java @@ -12,9 +12,9 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.test.OpenSearchTestCase; -import org.joda.time.Instant; import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.Map; @@ -63,8 +63,8 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, template.getUiMetadata()); Workflow wf = template.workflows().get("workflow"); assertNotNull(wf); - assertTrue(template.createdTime() > Instant.now().minus(10_000).getMillis()); - assertTrue(template.createdTime() < Instant.now().getMillis()); + assertTrue(template.createdTime() > Instant.now().minusSeconds(10).toEpochMilli()); + assertTrue(template.createdTime() <= Instant.now().toEpochMilli()); assertEquals(template.createdTime(), template.lastUpdatedTime()); assertEquals(-1, template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wf.toString()); @@ -80,8 +80,8 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, templateX.getUiMetadata()); Workflow wfX = templateX.workflows().get("workflow"); assertNotNull(wfX); - assertTrue(template.createdTime() > Instant.now().minus(10_000).getMillis()); - assertTrue(template.createdTime() < Instant.now().getMillis()); + assertTrue(template.createdTime() > Instant.now().minusSeconds(10).toEpochMilli()); + assertTrue(template.createdTime() <= Instant.now().toEpochMilli()); assertEquals(template.lastUpdatedTime(), template.createdTime()); assertEquals(-1, template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wfX.toString()); From 7f5cd1056a26bb1f4b60cf59ff56ce888bb950d6 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 5 Mar 2024 09:02:51 -0800 Subject: [PATCH 6/9] Preserve timestamps when encrypting and redacting template Signed-off-by: Daniel Widdis --- .../CreateWorkflowTransportAction.java | 31 +++++++++---------- .../flowframework/util/EncryptorUtils.java | 18 +++++------ 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index 71e47fb43..f987d7948 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -96,7 +96,7 @@ public CreateWorkflowTransportAction( protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { User user = getUserContext(client); - Template templateWithUserAndTimestamps = new Template( + Template templateWithUser = new Template( request.getTemplate().name(), request.getTemplate().description(), request.getTemplate().useCase(), @@ -105,17 +105,17 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { flowFrameworkIndicesHandler.putInitialStateToWorkflowState( globalContextResponse.getId(), @@ -238,16 +238,15 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener cipherFunction) { - Template.Builder processedTemplateBuilder = new Template.Builder(); - Map processedWorkflows = new HashMap<>(); for (Map.Entry entry : template.workflows().entrySet()) { @@ -161,7 +159,7 @@ private Template processTemplateCredentials(Template template, Function processedWorkflows = new HashMap<>(); for (Map.Entry entry : template.workflows().entrySet()) { @@ -241,7 +238,7 @@ public Template redactTemplateCredentials(Template template) { processedWorkflows.put(entry.getKey(), new Workflow(entry.getValue().userParams(), processedNodes, entry.getValue().edges())); } - Template processedTemplate = redactedTemplateBuilder.name(template.name()) + return new Template.Builder().name(template.name()) .description(template.description()) .useCase(template.useCase()) .templateVersion(template.templateVersion()) @@ -249,9 +246,10 @@ public Template redactTemplateCredentials(Template template) { .workflows(processedWorkflows) .uiMetadata(template.getUiMetadata()) .user(template.getUser()) + .createdTime(template.createdTime()) + .lastUpdatedTime(template.lastUpdatedTime()) + .lastProvisionedTime(template.lastProvisionedTime()) .build(); - - return processedTemplate; } /** From 765eae20449deede7ab707cbccdec963d62e3bd6 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 5 Mar 2024 19:27:45 -0800 Subject: [PATCH 7/9] Add bwc tests, more timestamp testing Signed-off-by: Daniel Widdis --- .github/workflows/test_bwc.yml | 45 ++++ build.gradle | 192 ++++++++++++++++++ .../flowframework/model/Template.java | 4 +- .../CreateWorkflowTransportAction.java | 7 +- ...FlowFrameworkBackwardsCompatibilityIT.java | 189 +++++++++++++++++ .../flowframework/model/TemplateTests.java | 15 +- .../rest/FlowFrameworkRestApiIT.java | 47 +++++ src/test/resources/template/noop.json | 13 ++ 8 files changed, 499 insertions(+), 13 deletions(-) create mode 100644 .github/workflows/test_bwc.yml create mode 100644 src/test/java/org/opensearch/flowframework/bwc/FlowFrameworkBackwardsCompatibilityIT.java create mode 100644 src/test/resources/template/noop.json diff --git a/.github/workflows/test_bwc.yml b/.github/workflows/test_bwc.yml new file mode 100644 index 000000000..b5f2e2d16 --- /dev/null +++ b/.github/workflows/test_bwc.yml @@ -0,0 +1,45 @@ +name: BWC +on: + push: + branches: + - "**" + pull_request: + branches: + - "**" + +jobs: + Build-ff-linux: + strategy: + matrix: + java: [11,17,21] + fail-fast: false + + name: Test Flow Framework BWC + runs-on: ubuntu-latest + + steps: + - name: Setup Java ${{ matrix.java }} + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: ${{ matrix.java }} + + - name: Checkout Flow Framework + uses: actions/checkout@v4 + + - name: Assemble Flow Framework + run: | + plugin_version=`./gradlew properties -q | grep "opensearch_build:" | awk '{print $2}'` + echo plugin_version $plugin_version + ./gradlew assemble + echo "Creating ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version ..." + mkdir -p ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version + echo "Copying ./build/distributions/*.zip to ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version ..." + ls ./build/distributions/ + cp ./build/distributions/*.zip ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version + echo "Copied ./build/distributions/*.zip to ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version ..." + ls ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version + - name: Run Flow Framework Backwards Compatibility Tests + run: | + echo "Running backwards compatibility tests ..." + ./gradlew bwcTestSuite -Dtests.security.manager=false diff --git a/build.gradle b/build.gradle index a652cca1f..2f008a1c6 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,6 @@ import java.nio.file.Files import org.opensearch.gradle.testclusters.OpenSearchCluster +import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask import org.opensearch.gradle.test.RestIntegTestTask import java.util.concurrent.Callable import java.nio.file.Paths @@ -23,6 +24,16 @@ buildscript { opensearch_no_snapshot = opensearch_build.replace("-SNAPSHOT","") System.setProperty('tests.security.manager', 'false') common_utils_version = System.getProperty("common_utils.version", opensearch_build) + + bwcVersionShort = "2.12.0" + bwcVersion = bwcVersionShort + ".0" + bwcOpenSearchFFDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' + + 'opensearch/plugins/opensearch-flow-framework-' + bwcVersion + '.zip' + baseName = "ffBwcCluster" + bwcFilePath = "src/test/resources/org/opensearch/flowframework/bwc/" + bwcFlowFrameworkPath = bwcFilePath + "flowframework/" + + isSameMajorVersion = opensearch_version.split("\\.")[0] == bwcVersionShort.split("\\.")[0] } repositories { @@ -78,6 +89,9 @@ dependencyLicenses.enabled = false // This requires an additional Jar not published as part of build-tools loggerUsageCheck.enabled = false thirdPartyAudit.enabled = false +// Allow test cases to be named Tests without having to be inherited from LuceneTestCase. +// see https://github.com/elastic/elasticsearch/blob/323f312bbc829a63056a79ebe45adced5099f6e6/buildSrc/src/main/java/org/elasticsearch/gradle/precommit/TestingConventionsTasks.java +testingConventions.enabled = false // No need to validate pom, as we do not upload to maven/sonatype validateNebulaPom.enabled = false @@ -192,6 +206,12 @@ jacocoTestReport { } tasks.named("check").configure { dependsOn(jacocoTestReport) } +tasks.named("yamlRestTest").configure { + filter { + excludeTestsMatching "org.opensearch.flowframework.rest.*IT" + excludeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } +} // Set up integration tests task integTest(type: RestIntegTestTask) { @@ -231,6 +251,13 @@ integTest { } } + // Exclude BWC tests, run separately + if (System.getProperty("tests.rest.bwcsuite") == null) { + filter { + excludeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + } + // Exclude integration tests that require security plugin if (System.getProperty("https") == null || System.getProperty("https") == "false") { filter { @@ -425,6 +452,166 @@ task integTestRemote(type: RestIntegTestTask) { } } +2.times {i -> + testClusters { + "${baseName}$i" { + testDistribution = "ARCHIVE" + versions = [bwcVersionShort, opensearch_version] + numberOfNodes = 3 + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + if (new File("$project.rootDir/$bwcFilePath/flow-framework/$bwcVersion").exists()) { + project.delete(files("$project.rootDir/$bwcFilePath/flow-framework/$bwcVersion")) + } + project.mkdir bwcFlowFrameworkPath + bwcVersion + ant.get(src: bwcOpenSearchFFDownload, + dest: bwcFlowFrameworkPath + bwcVersion, + httpusecaches: false) + return fileTree(bwcFlowFrameworkPath + bwcVersion).getSingleFile() + } + } + } + })) + setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" + setting 'http.content_type.required', 'true' + } + } +} + +List> plugins = [ + provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.getSingleFile() + } + } + } + }), + provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return fileTree(bwcFilePath + "flow-framework/" + project.version).getSingleFile() + } + } + } + }) + ] + +// Creates 2 test clusters with 3 nodes of the old version. +2.times {i -> + task "${baseName}#oldVersionClusterTask$i"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion || (i == 1) } + useCluster testClusters."${baseName}$i" + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'old_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'old' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}$i".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}$i".getName()}") + } +} + +// Upgrades one node of the old cluster to new OpenSearch version with upgraded plugin version +// This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node. +// This is also used as a one third upgraded cluster for a rolling upgrade. +task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion } + useCluster testClusters."${baseName}0" + dependsOn "${baseName}#oldVersionClusterTask0" + doFirst { + testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'mixed_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'first' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") +} + +// Upgrades the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded. +// This results in a mixed cluster with 1 node on the old version and 2 upgraded nodes. +// This is used for rolling upgrade. +task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion } + dependsOn "${baseName}#mixedClusterTask" + useCluster testClusters."${baseName}0" + doFirst { + testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'mixed_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'second' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") +} + +// Upgrades the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded. +// This results in a fully upgraded cluster. +// This is used for rolling upgrade. +task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion } + dependsOn "${baseName}#twoThirdsUpgradedClusterTask" + useCluster testClusters."${baseName}0" + doFirst { + testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + mustRunAfter "${baseName}#mixedClusterTask" + systemProperty 'tests.rest.bwcsuite', 'mixed_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'third' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") +} + +// Upgrades all the nodes of the old cluster to new OpenSearch version with upgraded plugin version +// at the same time resulting in a fully upgraded cluster. +task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) { + dependsOn "${baseName}#oldVersionClusterTask1" + useCluster testClusters."${baseName}1" + doFirst { + testClusters."${baseName}1".upgradeAllNodesAndPluginsToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'upgraded_cluster' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}1".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}1".getName()}") +} + +// A bwc test suite which runs all the bwc tasks combined. +task bwcTestSuite(type: StandaloneRestIntegTestTask) { + filter { + excludeTestsMatching '**.*Test*' + excludeTestsMatching '**.*IT*' + setFailOnNoMatchingTests(false) + } + dependsOn tasks.named("${baseName}#mixedClusterTask") + dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask") + dependsOn tasks.named("${baseName}#fullRestartClusterTask") +} // test retry configuration allprojects { @@ -438,6 +625,11 @@ allprojects { } } } + // Needed for Gradle 9.0 + tasks.withType(StandaloneRestIntegTestTask).configureEach { + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + } } // Automatically sets up the integration test cluster locally diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java index 8c1732a12..54a1eb65d 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -101,8 +101,8 @@ public Template( this.workflows = Map.copyOf(workflows); this.uiMetadata = uiMetadata; this.user = user; - this.createdTime = createdTime == null ? Instant.now() : createdTime; - this.lastUpdatedTime = (lastUpdatedTime == null || lastUpdatedTime.isBefore(this.createdTime)) ? this.createdTime : lastUpdatedTime; + this.createdTime = createdTime; + this.lastUpdatedTime = lastUpdatedTime; this.lastProvisionedTime = lastProvisionedTime; } diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index f987d7948..cd4b06422 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -96,6 +96,7 @@ public CreateWorkflowTransportAction( protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { User user = getUserContext(client); + Instant creationTime = Instant.now(); Template templateWithUser = new Template( request.getTemplate().name(), request.getTemplate().description(), @@ -105,9 +106,9 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener> responseMap = (Map>) getAsMap(uri).get("nodes"); + for (Map response : responseMap.values()) { + List> plugins = (List>) response.get("plugins"); + Set pluginNames = plugins.stream().map(map -> map.get("name")).collect(Collectors.toSet()); + String workflowId = createNoopTemplate(); + Template t = getTemplate(workflowId); + switch (CLUSTER_TYPE) { + case OLD: + assertTrue(pluginNames.contains("opensearch-flow-framework")); + // mapping for 2.12 does not include time stamps + assertNull(t.createdTime()); + assertNull(t.lastUpdatedTime()); + assertNull(t.lastProvisionedTime()); + break; + case MIXED: + assertTrue(pluginNames.contains("opensearch-flow-framework")); + // Time stamps may or may not be null depending on whether index has been accessed by new version node + // So just test that the template parses + assertNull(t.lastProvisionedTime()); + break; + case UPGRADED: + assertTrue(pluginNames.contains("opensearch-flow-framework")); + // mapping for 2.13+ includes time stamps + assertNotNull(t.createdTime()); + assertEquals(t.createdTime(), t.lastUpdatedTime()); + assertNull(t.lastProvisionedTime()); + break; + } + break; + } + } + + private String getUri() { + switch (CLUSTER_TYPE) { + case OLD: + return "_nodes/" + CLUSTER_NAME + "-0/plugins"; + case MIXED: + String round = System.getProperty("tests.rest.bwcsuite_round"); + if (round.equals("second")) { + return "_nodes/" + CLUSTER_NAME + "-1/plugins"; + } else if (round.equals("third")) { + return "_nodes/" + CLUSTER_NAME + "-2/plugins"; + } else { + return "_nodes/" + CLUSTER_NAME + "-0/plugins"; + } + case UPGRADED: + return "_nodes/plugins"; + default: + throw new AssertionError("unknown cluster type: " + CLUSTER_TYPE); + } + } + + private String createNoopTemplate() throws IOException, ParseException { + Response response = TestHelpers.makeRequest( + client(), + "POST", + "_plugins/_flow_framework/workflow", + null, + "{\"name\":\"test\", \"workflows\":{\"provision\": {\"nodes\": [{\"id\":\"test-step\", \"type\":\"noop\"}]}}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(RestStatus.CREATED.getStatus(), response.getStatusLine().getStatusCode()); + + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + assertNotNull(workflowId); + return workflowId; + } + + private Template getTemplate(String workflowId) throws IOException, ParseException { + Response response = TestHelpers.makeRequest( + client(), + "GET", + "_plugins/_flow_framework/workflow/" + workflowId, + null, + "", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + + String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + return Template.parse(body); + } +} diff --git a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java index d39455fd8..63dbf31f6 100644 --- a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java +++ b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java @@ -41,6 +41,7 @@ public void testTemplate() throws IOException { Workflow workflow = new Workflow(Map.of("key", "value"), nodes, edges); Map uiMetadata = null; + Instant now = Instant.now(); Template template = new Template( "test", "a test template", @@ -50,8 +51,8 @@ public void testTemplate() throws IOException { Map.of("workflow", workflow), uiMetadata, null, - null, - null, + now, + now, null ); @@ -63,9 +64,8 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, template.getUiMetadata()); Workflow wf = template.workflows().get("workflow"); assertNotNull(wf); - assertTrue(template.createdTime().isAfter(Instant.now().minusSeconds(10))); - assertFalse(template.createdTime().isAfter(Instant.now())); - assertEquals(template.createdTime(), template.lastUpdatedTime()); + assertEquals(now, template.createdTime()); + assertEquals(now, template.lastUpdatedTime()); assertNull(template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wf.toString()); @@ -80,9 +80,8 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, templateX.getUiMetadata()); Workflow wfX = templateX.workflows().get("workflow"); assertNotNull(wfX); - assertTrue(template.createdTime().isAfter(Instant.now().minusSeconds(10))); - assertFalse(template.createdTime().isAfter(Instant.now())); - assertEquals(template.createdTime(), template.lastUpdatedTime()); + assertEquals(now, template.createdTime()); + assertEquals(now, template.lastUpdatedTime()); assertNull(template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wfX.toString()); } diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index d6c3ffa65..67e2041d5 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -8,6 +8,7 @@ */ package org.opensearch.flowframework.rest; +import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.search.SearchResponse; @@ -27,6 +28,8 @@ import org.junit.Before; import org.junit.ComparisonFailure; +import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -313,4 +316,48 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); } + public void testTimestamps() throws Exception { + Template noopTemplate = TestHelpers.createTemplateFromFile("noop.json"); + // Create the template, should have created and updated matching + Response response = createWorkflow(client(), noopTemplate); + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + assertNotNull(workflowId); + + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + Template t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + Instant createdTime = t.createdTime(); + Instant lastUpdatedTime = t.lastUpdatedTime(); + assertNotNull(createdTime); + assertEquals(createdTime, lastUpdatedTime); + assertNull(t.lastProvisionedTime()); + + // Update the template, should have created same as before and updated newer + response = updateWorkflow(client(), workflowId, noopTemplate); + assertEquals(RestStatus.CREATED.getStatus(), response.getStatusLine().getStatusCode()); + + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + assertEquals(createdTime, t.createdTime()); + assertTrue(t.lastUpdatedTime().isAfter(lastUpdatedTime)); + lastUpdatedTime = t.lastUpdatedTime(); + assertNull(t.lastProvisionedTime()); + + // Provision the template, should have created and updated same as before and provisioned newer + response = provisionWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + assertEquals(createdTime, t.createdTime()); + assertEquals(lastUpdatedTime, t.lastUpdatedTime()); + assertTrue(t.lastProvisionedTime().isAfter(lastUpdatedTime)); + + // Clean up + response = deleteWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + } } diff --git a/src/test/resources/template/noop.json b/src/test/resources/template/noop.json new file mode 100644 index 000000000..c0675151c --- /dev/null +++ b/src/test/resources/template/noop.json @@ -0,0 +1,13 @@ +{ + "name": "noop", + "workflows": { + "provision": { + "nodes": [ + { + "id": "no-op", + "type": "noop" + } + ] + } + } +} From c9972819cbce8a20cb787ba5a6a9906a5bc62083 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 7 Mar 2024 18:54:59 -0800 Subject: [PATCH 8/9] Build a Template from an existing one Signed-off-by: Daniel Widdis --- .../flowframework/common/CommonValue.java | 2 +- .../flowframework/model/Template.java | 29 +++++++++++++++++-- .../CreateWorkflowTransportAction.java | 10 +------ .../ProvisionWorkflowTransportAction.java | 13 +-------- .../flowframework/util/EncryptorUtils.java | 26 ++--------------- .../rest/FlowFrameworkRestApiIT.java | 20 ++----------- 6 files changed, 34 insertions(+), 66 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index c70a70426..ec88a3778 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -52,7 +52,7 @@ private CommonValue() {} public static final String CREATED_TIME = "created_time"; /** The last updated time field */ public static final String LAST_UPDATED_TIME_FIELD = "last_updated_time"; - /** The last updated time field */ + /** The last provisioned time field */ public static final String LAST_PROVISIONED_TIME_FIELD = "last_provisioned_time"; /* diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java index 54a1eb65d..71632d003 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -114,8 +115,8 @@ public static class Builder { private String description = ""; private String useCase = ""; private Version templateVersion = null; - private List compatibilityVersion = new ArrayList<>(); - private Map workflows = new HashMap<>(); + private List compatibilityVersion = Collections.emptyList(); + private Map workflows = Collections.emptyMap(); private Map uiMetadata = null; private User user = null; private Instant createdTime = null; @@ -127,6 +128,30 @@ public static class Builder { */ public Builder() {} + /** + * Construct a Builder from an existing template + * @param t The existing template to copy + */ + public Builder(Template t) { + this.name = t.name(); + this.description = t.description(); + this.useCase = t.useCase(); + this.templateVersion = t.templateVersion; + if (t.compatibilityVersion() != null) { + this.compatibilityVersion = List.copyOf(t.compatibilityVersion()); + } + if (t.workflows() != null) { + this.workflows = Map.copyOf(t.workflows()); + } + if (t.getUiMetadata() != null) { + this.uiMetadata = Map.copyOf(t.getUiMetadata()); + } + this.user = t.getUser(); + this.createdTime = t.createdTime(); + this.lastUpdatedTime = t.lastUpdatedTime(); + this.lastProvisionedTime = t.lastProvisionedTime(); + } + /** * Builder method for adding template name * @param name template name diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index cd4b06422..677278a93 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -240,15 +240,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener createWorkflowValidation(client(), cyclicalTemplate)); From c600d9e4ae9b12dd68aac8107b4f6ee0100c4327 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 8 Mar 2024 14:31:26 -0800 Subject: [PATCH 9/9] Rename param, add comments Signed-off-by: Daniel Widdis --- .../indices/FlowFrameworkIndicesHandler.java | 11 ++++++++--- .../bwc/FlowFrameworkBackwardsCompatibilityIT.java | 8 ++++---- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 741ce4d4f..0547c875a 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -403,9 +403,14 @@ public void updateTemplateInGlobalContext(String documentId, Template template, * @param documentId the document Id * @param template the use-case template * @param listener action listener - * @param force if set true, ignores the requirement that the provisioning is not started + * @param ignoreNotStartedCheck if set true, ignores the requirement that the provisioning is not started */ - public void updateTemplateInGlobalContext(String documentId, Template template, ActionListener listener, boolean force) { + public void updateTemplateInGlobalContext( + String documentId, + Template template, + ActionListener listener, + boolean ignoreNotStartedCheck + ) { if (!doesIndexExist(GLOBAL_CONTEXT_INDEX)) { String errorMessage = "Failed to update template for workflow_id : " + documentId + ", global_context index does not exist."; logger.error(errorMessage); @@ -415,7 +420,7 @@ public void updateTemplateInGlobalContext(String documentId, Template template, doesTemplateExist(documentId, templateExists -> { if (templateExists) { isWorkflowNotStarted(documentId, workflowIsNotStarted -> { - if (workflowIsNotStarted || force) { + if (workflowIsNotStarted || ignoreNotStartedCheck) { IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId); try ( XContentBuilder builder = XContentFactory.jsonBuilder(); diff --git a/src/test/java/org/opensearch/flowframework/bwc/FlowFrameworkBackwardsCompatibilityIT.java b/src/test/java/org/opensearch/flowframework/bwc/FlowFrameworkBackwardsCompatibilityIT.java index 9e438714b..108252465 100644 --- a/src/test/java/org/opensearch/flowframework/bwc/FlowFrameworkBackwardsCompatibilityIT.java +++ b/src/test/java/org/opensearch/flowframework/bwc/FlowFrameworkBackwardsCompatibilityIT.java @@ -102,29 +102,29 @@ public static ClusterType parse(String value) { @SuppressWarnings("unchecked") public void testBackwardsCompatibility() throws Exception { + // This iteration of nodes is only to get plugins installed on each node. We don't currently use its functionality but in case we + // ever need version-based dependencies in future BWC tests it will be needed. It's directly copied from similar implementations + // in other plugins. String uri = getUri(); Map> responseMap = (Map>) getAsMap(uri).get("nodes"); for (Map response : responseMap.values()) { List> plugins = (List>) response.get("plugins"); Set pluginNames = plugins.stream().map(map -> map.get("name")).collect(Collectors.toSet()); + assertTrue(pluginNames.contains("opensearch-flow-framework")); String workflowId = createNoopTemplate(); Template t = getTemplate(workflowId); switch (CLUSTER_TYPE) { case OLD: - assertTrue(pluginNames.contains("opensearch-flow-framework")); // mapping for 2.12 does not include time stamps assertNull(t.createdTime()); assertNull(t.lastUpdatedTime()); assertNull(t.lastProvisionedTime()); break; case MIXED: - assertTrue(pluginNames.contains("opensearch-flow-framework")); // Time stamps may or may not be null depending on whether index has been accessed by new version node - // So just test that the template parses assertNull(t.lastProvisionedTime()); break; case UPGRADED: - assertTrue(pluginNames.contains("opensearch-flow-framework")); // mapping for 2.13+ includes time stamps assertNotNull(t.createdTime()); assertEquals(t.createdTime(), t.lastUpdatedTime());