diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/CommonUtils.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/CommonUtils.java index 3bc4b724db..047080e35a 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/CommonUtils.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/CommonUtils.java @@ -20,13 +20,38 @@ import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class CommonUtils { public static final int ERROR_STATE = 5; private static final Set finalStates = Set.of("Completed", "Aborted"); + private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class); public static boolean isTaskCompleted(String status) { return finalStates.contains(status); } + public static String getServiceUrl(String endpoint, String processId) { + logger.debug("Process endpoint {}", endpoint); + if (endpoint == null) { + return null; + } + if (endpoint.startsWith("/")) { + logger.warn("Process '{}' endpoint '{}', does not contain full URL, please review the kogito.service.url system property to point the public URL for this runtime.", + processId, endpoint); + } + String context = getContext(processId); + logger.debug("Process context {}", context); + if (context.equals(endpoint) || endpoint.equals("/" + context)) { + return null; + } else { + return endpoint.contains("/" + context) ? endpoint.substring(0, endpoint.lastIndexOf("/" + context)) : null; + } + } + + private static String getContext(String processId) { + return processId != null && processId.contains(".") ? processId.substring(processId.lastIndexOf('.') + 1) : processId; + } } diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/ExecuteArgs.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/ExecuteArgs.java new file mode 100644 index 0000000000..6049104c2b --- /dev/null +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/ExecuteArgs.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.index.api; + +import com.fasterxml.jackson.databind.JsonNode; + +public record ExecuteArgs(JsonNode input, String businessKey, String referenceId) { + + public static ExecuteArgs of(JsonNode input) { + return new Builder().withInput(input).build(); + } + + public static Builder builder(JsonNode modelInput) { + return new Builder(); + } + + public static class Builder { + + private JsonNode input; + private String businessKey; + private String referenceId; + + private Builder() { + } + + public Builder withInput(JsonNode input) { + this.input = input; + return this; + } + + public Builder withBusinessKey(String businessKey) { + this.businessKey = businessKey; + return this; + } + + public Builder withReferenceId(String referenceId) { + this.referenceId = referenceId; + return this; + } + + public ExecuteArgs build() { + return new ExecuteArgs(input, businessKey, referenceId); + } + } +} diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeClient.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeClient.java index 50b94365ee..c6e42dd9d4 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeClient.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeClient.java @@ -24,11 +24,14 @@ import org.kie.kogito.index.model.Job; import org.kie.kogito.index.model.Node; +import org.kie.kogito.index.model.ProcessDefinition; import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.UserTaskInstance; public interface KogitoRuntimeClient { + CompletableFuture executeProcessIntance(ProcessDefinition definition, ExecuteArgs args); + CompletableFuture abortProcessInstance(String serviceURL, ProcessInstance processInstance); CompletableFuture retryProcessInstance(String serviceURL, ProcessInstance processInstance); diff --git a/data-index/data-index-graphql/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java b/data-index/data-index-graphql/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java index 6dd81262b1..960496d4da 100644 --- a/data-index/data-index-graphql/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java +++ b/data-index/data-index-graphql/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import org.kie.kogito.index.CommonUtils; import org.kie.kogito.index.api.KogitoRuntimeClient; import org.kie.kogito.index.graphql.query.GraphQLQueryOrderByParser; import org.kie.kogito.index.graphql.query.GraphQLQueryParserRegistry; @@ -142,25 +143,7 @@ public ProcessDefinition getProcessDefinition(DataFetchingEnvironment env) { } protected String getServiceUrl(String endpoint, String processId) { - LOGGER.debug("Process endpoint {}", endpoint); - if (endpoint == null) { - return null; - } - if (endpoint.startsWith("/")) { - LOGGER.warn("Process '{}' endpoint '{}', does not contain full URL, please review the kogito.service.url system property to point the public URL for this runtime.", - processId, endpoint); - } - String context = getContext(processId); - LOGGER.debug("Process context {}", context); - if (context.equals(endpoint) || endpoint.equals("/" + context)) { - return null; - } else { - return endpoint.contains("/" + context) ? endpoint.substring(0, endpoint.lastIndexOf("/" + context)) : null; - } - } - - private String getContext(String processId) { - return processId != null && processId.contains(".") ? processId.substring(processId.lastIndexOf('.') + 1) : processId; + return CommonUtils.getServiceUrl(endpoint, processId); } protected Collection getChildProcessInstancesValues(DataFetchingEnvironment env) { diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java index badbe77b24..500d7ad0d9 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java @@ -22,9 +22,12 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.kie.kogito.index.CommonUtils; +import org.kie.kogito.index.api.ExecuteArgs; import org.kie.kogito.index.api.KogitoRuntimeClient; import org.kie.kogito.index.api.KogitoRuntimeCommonClient; import org.kie.kogito.index.model.Node; +import org.kie.kogito.index.model.ProcessDefinition; import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.UserTaskInstance; import org.kie.kogito.index.service.DataIndexServiceException; @@ -72,6 +75,17 @@ class KogitoRuntimeClientImpl extends KogitoRuntimeCommonClient implements Kogit private static final Logger LOGGER = LoggerFactory.getLogger(KogitoRuntimeClientImpl.class); + @Override + public CompletableFuture executeProcessIntance(ProcessDefinition definition, ExecuteArgs args) { + CompletableFuture future = new CompletableFuture<>(); + HttpRequest request = getWebClient(CommonUtils.getServiceUrl(definition.getEndpoint(), definition.getId())).post("/" + definition.getId()); + if (args.businessKey() != null) { + request.addQueryParam("businessKey", args.businessKey()); + } + request.sendJson(args.input(), res -> asyncHttpResponseTreatment(res, future, "START ProcessInstance of type " + definition.getId())); + return future; + } + @Override public CompletableFuture abortProcessInstance(String serviceURL, ProcessInstance processInstance) { String requestURI = format(ABORT_PROCESS_INSTANCE_PATH, processInstance.getProcessId(), processInstance.getId()); @@ -282,5 +296,4 @@ protected void send(String logMessage, Class type, CompletableFuture future, Asy future.completeExceptionally(new DataIndexServiceException(getErrorMessage(logMessage, res.result()), res.cause())); } } - } diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLMutationsProvider.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLMutationsProvider.java new file mode 100644 index 0000000000..cda1b2ce7f --- /dev/null +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLMutationsProvider.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.index.service.graphql; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager; + +import graphql.schema.DataFetcher; + +public interface GraphQLMutationsProvider { + Map>> mutations(AbstractGraphQLSchemaManager schemaManager); +} diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java index 2acaf539ab..7d7de3de9b 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java @@ -21,8 +21,12 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.ServiceLoader; +import java.util.ServiceLoader.Provider; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager; import org.kie.kogito.index.graphql.query.GraphQLQueryParserRegistry; @@ -101,6 +105,8 @@ public GraphQLSchema createSchema() { builder.dataFetcher("UserTaskInstanceCommentDelete", this::deleteUserTaskComment); builder.dataFetcher("UserTaskInstanceAttachmentUpdate", this::updateUserTaskAttachment); builder.dataFetcher("UserTaskInstanceAttachmentDelete", this::deleteUserTaskAttachment); + ServiceLoader.load(GraphQLMutationsProvider.class).stream().map(Provider::get).map(m -> m.mutations(this)).flatMap(map -> map.entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> v2)).forEach(builder::dataFetcher); return builder; }) .type("ProcessDefinition", builder -> { diff --git a/data-index/data-index-service/data-index-service-shared-output/pom.xml b/data-index/data-index-service/data-index-service-shared-output/pom.xml new file mode 100644 index 0000000000..a97483374a --- /dev/null +++ b/data-index/data-index-service/data-index-service-shared-output/pom.xml @@ -0,0 +1,38 @@ + + + 4.0.0 + + org.kie.kogito + data-index-service + 999-SNAPSHOT + + data-index-service-shared-output + + + org.kie.kogito + data-index-service-common + + + + org.kie.kogito.index.service.mutations + + \ No newline at end of file diff --git a/data-index/data-index-service/data-index-service-shared-output/src/main/java/org/kie/kogito/index/service/mutations/OutputGraphQLMutationProvider.java b/data-index/data-index-service/data-index-service-shared-output/src/main/java/org/kie/kogito/index/service/mutations/OutputGraphQLMutationProvider.java new file mode 100644 index 0000000000..b865775cba --- /dev/null +++ b/data-index/data-index-service/data-index-service-shared-output/src/main/java/org/kie/kogito/index/service/mutations/OutputGraphQLMutationProvider.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.index.service.mutations; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import org.kie.kogito.index.api.ExecuteArgs; +import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager; +import org.kie.kogito.index.model.ProcessDefinition; +import org.kie.kogito.index.model.ProcessDefinitionKey; +import org.kie.kogito.index.model.ProcessInstance; +import org.kie.kogito.index.service.graphql.GraphQLMutationsProvider; +import org.kie.kogito.index.storage.DataIndexStorageService; +import org.kie.kogito.jackson.utils.MergeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; + +import graphql.schema.DataFetcher; +import graphql.schema.DataFetchingEnvironment; + +public class OutputGraphQLMutationProvider implements GraphQLMutationsProvider { + + private static Logger logger = LoggerFactory.getLogger(OutputGraphQLMutationProvider.class); + private static final String COMPLETED_INSTANCE_ID = "completedInstanceId"; + + @Override + public Map>> mutations(AbstractGraphQLSchemaManager schemaManager) { + return Map.of("sharedOutput", env -> sharedOutput(schemaManager, env)); + } + + private CompletableFuture sharedOutput(AbstractGraphQLSchemaManager schemaManager, DataFetchingEnvironment env) { + DataIndexStorageService cacheService = schemaManager.getCacheService(); + ProcessDefinitionKey key = new ProcessDefinitionKey(mandatoryArgument(env, "processId"), mandatoryArgument(env, "processVersion")); + ProcessDefinition processDefinition = cacheService.getProcessDefinitionStorage().get(key); + if (processDefinition == null) { + throw new IllegalArgumentException(key + "does not correspond to any existing process definition"); + } + JsonNode input = env.getArgument("input"); + String completedInstanceId = env.getArgument(COMPLETED_INSTANCE_ID); + if (completedInstanceId != null) { + ProcessInstance processInstance = cacheService.getProcessInstanceStorage().get(completedInstanceId); + if (processInstance != null) { + input = MergeUtils.merge(input, processInstance.getVariables()); + } else { + logger.warn("Completed Instance Id {} cannot be found, using user input as it is", completedInstanceId); + } + } else { + logger.warn("Missing " + COMPLETED_INSTANCE_ID + " parameter, using user input as it is"); + } + return schemaManager.getDataIndexApiExecutor().executeProcessIntance(processDefinition, ExecuteArgs.of(input)); + } + + private static T mandatoryArgument(DataFetchingEnvironment env, String name) { + T result = env.getArgument(name); + if (result == null) { + throw new IllegalArgumentException("Missing " + name + " mandatory parameter"); + } + return result; + } +} diff --git a/data-index/data-index-service/data-index-service-shared-output/src/main/resources/META-INF/services/org.kie.kogito.index.service.graphql.GraphQLMutationsProvider b/data-index/data-index-service/data-index-service-shared-output/src/main/resources/META-INF/services/org.kie.kogito.index.service.graphql.GraphQLMutationsProvider new file mode 100644 index 0000000000..a06562efb8 --- /dev/null +++ b/data-index/data-index-service/data-index-service-shared-output/src/main/resources/META-INF/services/org.kie.kogito.index.service.graphql.GraphQLMutationsProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.kie.kogito.index.service.mutations.OutputGraphQLMutationProvider \ No newline at end of file diff --git a/data-index/data-index-service/pom.xml b/data-index/data-index-service/pom.xml index cdc2c04d47..236e069267 100644 --- a/data-index/data-index-service/pom.xml +++ b/data-index/data-index-service/pom.xml @@ -37,6 +37,7 @@ data-index-service-postgresql data-index-service-infinispan data-index-service-mongodb + data-index-service-shared-output diff --git a/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java b/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java index 1f9b64d91d..34ad5a868a 100644 --- a/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java +++ b/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java @@ -29,13 +29,17 @@ import org.eclipse.microprofile.context.ManagedExecutor; import org.kie.kogito.Application; +import org.kie.kogito.Model; +import org.kie.kogito.index.api.ExecuteArgs; import org.kie.kogito.index.api.KogitoRuntimeClient; import org.kie.kogito.index.api.KogitoRuntimeCommonClient; import org.kie.kogito.index.model.Node; +import org.kie.kogito.index.model.ProcessDefinition; import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.UserTaskInstance; import org.kie.kogito.index.service.DataIndexServiceException; import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess; +import org.kie.kogito.jackson.utils.JsonObjectUtils; import org.kie.kogito.process.Process; import org.kie.kogito.process.ProcessInstanceExecutionException; import org.kie.kogito.process.Processes; @@ -273,4 +277,15 @@ private String executeOnProcessInstance(String processId, String processInstance } }); } + + @Override + public CompletableFuture executeProcessIntance(ProcessDefinition definition, ExecuteArgs args) { + Process process = processes != null ? processes.processById(definition.getId()) : null; + if (process == null) { + throw new DataIndexServiceException(String.format("Unable to find Process with id %s to perform the operation requested", definition.getId())); + } + Model m = (Model) process.createModel(); + m.update(JsonObjectUtils.convertValue(args.input(), Map.class)); + return CompletableFuture.completedFuture(process.createInstance(m).id()); + } } diff --git a/kogito-apps-bom/pom.xml b/kogito-apps-bom/pom.xml index 774cd46967..37e90676fa 100644 --- a/kogito-apps-bom/pom.xml +++ b/kogito-apps-bom/pom.xml @@ -250,6 +250,11 @@ data-index-service ${project.version} + + org.kie.kogito + data-index-service-shared-output + ${project.version} + org.kie.kogito data-index-storage-api