Skip to content

Commit

Permalink
[Fix apache#2165] Adding custom mutations
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Dec 17, 2024
1 parent 846de1c commit 4e2682b
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,38 @@

import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommonUtils {

public static final int ERROR_STATE = 5;
private static final Set<String> finalStates = Set.of("Completed", "Aborted");
private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);

public static boolean isTaskCompleted(String status) {
return finalStates.contains(status);
}

public static String getServiceUrl(String endpoint, String processId) {
logger.debug("Process endpoint {}", endpoint);
if (endpoint == null) {
return null;
}
if (endpoint.startsWith("/")) {
logger.warn("Process '{}' endpoint '{}', does not contain full URL, please review the kogito.service.url system property to point the public URL for this runtime.",
processId, endpoint);
}
String context = getContext(processId);
logger.debug("Process context {}", context);
if (context.equals(endpoint) || endpoint.equals("/" + context)) {
return null;
} else {
return endpoint.contains("/" + context) ? endpoint.substring(0, endpoint.lastIndexOf("/" + context)) : null;
}
}

private static String getContext(String processId) {
return processId != null && processId.contains(".") ? processId.substring(processId.lastIndexOf('.') + 1) : processId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.index.api;

import com.fasterxml.jackson.databind.JsonNode;

public record ExecuteArgs(JsonNode input, String businessKey, String referenceId) {

public static ExecuteArgs of(JsonNode input) {
return new Builder().withInput(input).build();
}

public static Builder builder(JsonNode modelInput) {
return new Builder();
}

public static class Builder {

private JsonNode input;
private String businessKey;
private String referenceId;

private Builder() {
}

public Builder withInput(JsonNode input) {
this.input = input;
return this;
}

public Builder withBusinessKey(String businessKey) {
this.businessKey = businessKey;
return this;
}

public Builder withReferenceId(String referenceId) {
this.referenceId = referenceId;
return this;
}

public ExecuteArgs build() {
return new ExecuteArgs(input, businessKey, referenceId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@

import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.model.Node;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;

public interface KogitoRuntimeClient {

CompletableFuture<String> executeProcessIntance(ProcessDefinition definition, ExecuteArgs args);

CompletableFuture<String> abortProcessInstance(String serviceURL, ProcessInstance processInstance);

CompletableFuture<String> retryProcessInstance(String serviceURL, ProcessInstance processInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.kie.kogito.index.CommonUtils;
import org.kie.kogito.index.api.KogitoRuntimeClient;
import org.kie.kogito.index.graphql.query.GraphQLQueryOrderByParser;
import org.kie.kogito.index.graphql.query.GraphQLQueryParserRegistry;
Expand Down Expand Up @@ -142,25 +143,7 @@ public ProcessDefinition getProcessDefinition(DataFetchingEnvironment env) {
}

protected String getServiceUrl(String endpoint, String processId) {
LOGGER.debug("Process endpoint {}", endpoint);
if (endpoint == null) {
return null;
}
if (endpoint.startsWith("/")) {
LOGGER.warn("Process '{}' endpoint '{}', does not contain full URL, please review the kogito.service.url system property to point the public URL for this runtime.",
processId, endpoint);
}
String context = getContext(processId);
LOGGER.debug("Process context {}", context);
if (context.equals(endpoint) || endpoint.equals("/" + context)) {
return null;
} else {
return endpoint.contains("/" + context) ? endpoint.substring(0, endpoint.lastIndexOf("/" + context)) : null;
}
}

private String getContext(String processId) {
return processId != null && processId.contains(".") ? processId.substring(processId.lastIndexOf('.') + 1) : processId;
return CommonUtils.getServiceUrl(endpoint, processId);
}

protected Collection<ProcessInstance> getChildProcessInstancesValues(DataFetchingEnvironment env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.kie.kogito.index.CommonUtils;
import org.kie.kogito.index.api.ExecuteArgs;
import org.kie.kogito.index.api.KogitoRuntimeClient;
import org.kie.kogito.index.api.KogitoRuntimeCommonClient;
import org.kie.kogito.index.model.Node;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
import org.kie.kogito.index.service.DataIndexServiceException;
Expand Down Expand Up @@ -72,6 +75,17 @@ class KogitoRuntimeClientImpl extends KogitoRuntimeCommonClient implements Kogit

private static final Logger LOGGER = LoggerFactory.getLogger(KogitoRuntimeClientImpl.class);

@Override
public CompletableFuture<String> executeProcessIntance(ProcessDefinition definition, ExecuteArgs args) {
CompletableFuture<String> future = new CompletableFuture<>();
HttpRequest<Buffer> request = getWebClient(CommonUtils.getServiceUrl(definition.getEndpoint(), definition.getId())).post("/" + definition.getId());
if (args.businessKey() != null) {
request.addQueryParam("businessKey", args.businessKey());
}
request.sendJson(args.input(), res -> asyncHttpResponseTreatment(res, future, "START ProcessInstance of type " + definition.getId()));
return future;
}

@Override
public CompletableFuture<String> abortProcessInstance(String serviceURL, ProcessInstance processInstance) {
String requestURI = format(ABORT_PROCESS_INSTANCE_PATH, processInstance.getProcessId(), processInstance.getId());
Expand Down Expand Up @@ -282,5 +296,4 @@ protected void send(String logMessage, Class type, CompletableFuture future, Asy
future.completeExceptionally(new DataIndexServiceException(getErrorMessage(logMessage, res.result()), res.cause()));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.index.service.graphql;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager;

import graphql.schema.DataFetcher;

public interface GraphQLMutationsProvider {
Map<String, DataFetcher<CompletableFuture<?>>> mutations(AbstractGraphQLSchemaManager schemaManager);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.ServiceLoader.Provider;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager;
import org.kie.kogito.index.graphql.query.GraphQLQueryParserRegistry;
Expand Down Expand Up @@ -101,6 +105,8 @@ public GraphQLSchema createSchema() {
builder.dataFetcher("UserTaskInstanceCommentDelete", this::deleteUserTaskComment);
builder.dataFetcher("UserTaskInstanceAttachmentUpdate", this::updateUserTaskAttachment);
builder.dataFetcher("UserTaskInstanceAttachmentDelete", this::deleteUserTaskAttachment);
ServiceLoader.load(GraphQLMutationsProvider.class).stream().map(Provider::get).map(m -> m.mutations(this)).flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> v2)).forEach(builder::dataFetcher);
return builder;
})
.type("ProcessDefinition", builder -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.kie.kogito</groupId>
<artifactId>data-index-service</artifactId>
<version>999-SNAPSHOT</version>
</parent>
<artifactId>data-index-service-shared-output</artifactId>
<dependencies>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>data-index-service-common</artifactId>
</dependency>
</dependencies>
<properties>
<java.module.name>org.kie.kogito.index.service.mutations</java.module.name>
</properties>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.index.service.mutations;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.kie.kogito.index.api.ExecuteArgs;
import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessDefinitionKey;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.service.graphql.GraphQLMutationsProvider;
import org.kie.kogito.index.storage.DataIndexStorageService;
import org.kie.kogito.jackson.utils.MergeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;

import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;

public class OutputGraphQLMutationProvider implements GraphQLMutationsProvider {

private static Logger logger = LoggerFactory.getLogger(OutputGraphQLMutationProvider.class);
private static final String COMPLETED_INSTANCE_ID = "completedInstanceId";

@Override
public Map<String, DataFetcher<CompletableFuture<?>>> mutations(AbstractGraphQLSchemaManager schemaManager) {
return Map.of("sharedOutput", env -> sharedOutput(schemaManager, env));
}

private CompletableFuture<String> sharedOutput(AbstractGraphQLSchemaManager schemaManager, DataFetchingEnvironment env) {
DataIndexStorageService cacheService = schemaManager.getCacheService();
ProcessDefinitionKey key = new ProcessDefinitionKey(mandatoryArgument(env, "processId"), mandatoryArgument(env, "processVersion"));
ProcessDefinition processDefinition = cacheService.getProcessDefinitionStorage().get(key);
if (processDefinition == null) {
throw new IllegalArgumentException(key + "does not correspond to any existing process definition");
}
JsonNode input = env.getArgument("input");
String completedInstanceId = env.getArgument(COMPLETED_INSTANCE_ID);
if (completedInstanceId != null) {
ProcessInstance processInstance = cacheService.getProcessInstanceStorage().get(completedInstanceId);
if (processInstance != null) {
input = MergeUtils.merge(input, processInstance.getVariables());
} else {
logger.warn("Completed Instance Id {} cannot be found, using user input as it is", completedInstanceId);
}
} else {
logger.warn("Missing " + COMPLETED_INSTANCE_ID + " parameter, using user input as it is");
}
return schemaManager.getDataIndexApiExecutor().executeProcessIntance(processDefinition, ExecuteArgs.of(input));
}

private static <T> T mandatoryArgument(DataFetchingEnvironment env, String name) {
T result = env.getArgument(name);
if (result == null) {
throw new IllegalArgumentException("Missing " + name + " mandatory parameter");
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
org.kie.kogito.index.service.mutations.OutputGraphQLMutationProvider
1 change: 1 addition & 0 deletions data-index/data-index-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<module>data-index-service-postgresql</module>
<module>data-index-service-infinispan</module>
<module>data-index-service-mongodb</module>
<module>data-index-service-shared-output</module>
</modules>

<profiles>
Expand Down
Loading

0 comments on commit 4e2682b

Please sign in to comment.