Skip to content

Commit

Permalink
[Fix apache#2165] Adding custom mutations
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Dec 17, 2024
1 parent 846de1c commit 26a1600
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.index.api;

import com.fasterxml.jackson.databind.JsonNode;

public record ExecuteArgs(JsonNode input, String businessKey, String referenceId) {

public static ExecuteArgs of(JsonNode input) {
return new Builder().withInput(input).build();
}

public static Builder builder(JsonNode modelInput) {
return new Builder();
}

public static class Builder {

private JsonNode input;
private String businessKey;
private String referenceId;

private Builder() {
}

public Builder withInput(JsonNode input) {
this.input = input;
return this;
}

public Builder withBusinessKey(String businessKey) {
this.businessKey = businessKey;
return this;
}

public Builder withReferenceId(String referenceId) {
this.referenceId = referenceId;
return this;
}

public ExecuteArgs build() {
return new ExecuteArgs(input, businessKey, referenceId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@

import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.model.Node;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;

public interface KogitoRuntimeClient {

CompletableFuture<String> executeProcessIntance(String serviceURL, ProcessDefinition definition, ExecuteArgs args);

CompletableFuture<String> abortProcessInstance(String serviceURL, ProcessInstance processInstance);

CompletableFuture<String> retryProcessInstance(String serviceURL, ProcessInstance processInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public ProcessDefinition getProcessDefinition(DataFetchingEnvironment env) {
return cacheService.getProcessDefinitionStorage().get(new ProcessDefinitionKey(source.getProcessId(), source.getVersion()));
}

protected String getServiceUrl(String endpoint, String processId) {
public String getServiceUrl(String endpoint, String processId) {
LOGGER.debug("Process endpoint {}", endpoint);
if (endpoint == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.kie.kogito.index.api.ExecuteArgs;
import org.kie.kogito.index.api.KogitoRuntimeClient;
import org.kie.kogito.index.api.KogitoRuntimeCommonClient;
import org.kie.kogito.index.model.Node;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
import org.kie.kogito.index.service.DataIndexServiceException;
Expand Down Expand Up @@ -72,6 +74,17 @@ class KogitoRuntimeClientImpl extends KogitoRuntimeCommonClient implements Kogit

private static final Logger LOGGER = LoggerFactory.getLogger(KogitoRuntimeClientImpl.class);

@Override
public CompletableFuture<String> executeProcessIntance(String serviceUrl, ProcessDefinition definition, ExecuteArgs args) {
CompletableFuture<String> future = new CompletableFuture<>();
HttpRequest<Buffer> request = getWebClient(serviceUrl).post("/" + definition.getId());
if (args.businessKey() != null) {
request.addQueryParam("businessKey", args.businessKey());
}
request.sendJson(args.input(), res -> asyncHttpResponseTreatment(res, future, "START ProcessInstance of type " + definition.getId()));
return future;
}

@Override
public CompletableFuture<String> abortProcessInstance(String serviceURL, ProcessInstance processInstance) {
String requestURI = format(ABORT_PROCESS_INSTANCE_PATH, processInstance.getProcessId(), processInstance.getId());
Expand Down Expand Up @@ -282,5 +295,4 @@ protected void send(String logMessage, Class type, CompletableFuture future, Asy
future.completeExceptionally(new DataIndexServiceException(getErrorMessage(logMessage, res.result()), res.cause()));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.index.service.graphql;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager;

import graphql.schema.DataFetcher;

public interface GraphQLMutationsProvider {
Map<String, DataFetcher<CompletableFuture<?>>> mutations(AbstractGraphQLSchemaManager schemaManager);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.ServiceLoader.Provider;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager;
import org.kie.kogito.index.graphql.query.GraphQLQueryParserRegistry;
Expand Down Expand Up @@ -101,6 +105,8 @@ public GraphQLSchema createSchema() {
builder.dataFetcher("UserTaskInstanceCommentDelete", this::deleteUserTaskComment);
builder.dataFetcher("UserTaskInstanceAttachmentUpdate", this::updateUserTaskAttachment);
builder.dataFetcher("UserTaskInstanceAttachmentDelete", this::deleteUserTaskAttachment);
ServiceLoader.load(GraphQLMutationsProvider.class).stream().map(Provider::get).map(m -> m.mutations(this)).flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> v2)).forEach(builder::dataFetcher);
return builder;
})
.type("ProcessDefinition", builder -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.kie.kogito</groupId>
<artifactId>data-index-service</artifactId>
<version>999-SNAPSHOT</version>
</parent>
<artifactId>data-index-service-shared-output</artifactId>
<dependencies>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>data-index-service-common</artifactId>
</dependency>
</dependencies>
<properties>
<java.module.name>org.kie.kogito.index.service.mutations</java.module.name>
</properties>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.index.service.mutations;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.kie.kogito.index.api.ExecuteArgs;
import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessDefinitionKey;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.service.graphql.GraphQLMutationsProvider;
import org.kie.kogito.index.storage.DataIndexStorageService;
import org.kie.kogito.jackson.utils.MergeUtils;

import com.fasterxml.jackson.databind.JsonNode;

import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;

public class OutputGraphQLMutationProvider implements GraphQLMutationsProvider {

@Override
public Map<String, DataFetcher<CompletableFuture<?>>> mutations(AbstractGraphQLSchemaManager schemaManager) {
return Map.of("sharedOutput", env -> sharedOutput(schemaManager, env));
}

private CompletableFuture<String> sharedOutput(AbstractGraphQLSchemaManager schemaManager, DataFetchingEnvironment env) {
String assesmentId = env.getArgument("assesmentInstanceId");
String processId = env.getArgument("processId");
String processVersion = env.getArgument("processVersion");
DataIndexStorageService cacheService = schemaManager.getCacheService();
ProcessInstance processInstance = cacheService.getProcessInstanceStorage().get(assesmentId);
JsonNode input = env.getArgument("input");
if (processInstance != null) {
input = MergeUtils.merge(input, processInstance.getVariables());
}
ProcessDefinition processDefinition = cacheService.getProcessDefinitionStorage().get(new ProcessDefinitionKey(processId, processVersion));
if (processDefinition != null) {
return schemaManager.getDataIndexApiExecutor().executeProcessIntance(schemaManager.getServiceUrl(processDefinition.getEndpoint(),
processDefinition.getId()), processDefinition, ExecuteArgs.of(input));
}
return new CompletableFuture<>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
org.kie.kogito.index.service.mutations.OutputGraphQLMutationProvider
1 change: 1 addition & 0 deletions data-index/data-index-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<module>data-index-service-postgresql</module>
<module>data-index-service-infinispan</module>
<module>data-index-service-mongodb</module>
<module>data-index-service-shared-output</module>
</modules>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@

import org.eclipse.microprofile.context.ManagedExecutor;
import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.index.api.ExecuteArgs;
import org.kie.kogito.index.api.KogitoRuntimeClient;
import org.kie.kogito.index.api.KogitoRuntimeCommonClient;
import org.kie.kogito.index.model.Node;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
import org.kie.kogito.index.service.DataIndexServiceException;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess;
import org.kie.kogito.jackson.utils.JsonObjectUtils;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstanceExecutionException;
import org.kie.kogito.process.Processes;
Expand Down Expand Up @@ -273,4 +277,16 @@ private String executeOnProcessInstance(String processId, String processInstance
}
});
}

@Override
public CompletableFuture<String> executeProcessIntance(String serviceURL, ProcessDefinition definition,
ExecuteArgs args) {
Process<?> process = processes != null ? processes.processById(definition.getId()) : null;
if (process == null) {
throw new DataIndexServiceException(String.format("Unable to find Process with id %s to perform the operation requested", definition.getId()));
}
Model m = (Model) process.createModel();
m.update(JsonObjectUtils.convertValue(args.input(), Map.class));
return CompletableFuture.completedFuture(process.createInstance(m).id());
}
}

0 comments on commit 26a1600

Please sign in to comment.