Skip to content

Commit

Permalink
[KOGITO-6785] Serverless Workflow Correlation (#2160)
Browse files Browse the repository at this point in the history
* Inserting serverless workflow correlation
  • Loading branch information
tiagodolphine authored May 30, 2022
1 parent 183cd82 commit a939408
Show file tree
Hide file tree
Showing 46 changed files with 1,102 additions and 112 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.correlation;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;

public class CompositeCorrelation implements Correlation<Set<? extends Correlation<?>>> {

private String key;
private Set<? extends Correlation<?>> correlations;

public CompositeCorrelation(Set<? extends Correlation<?>> correlations) {
this.key = buildKey(correlations);
this.correlations = Collections.unmodifiableSet(correlations);
}

private static String buildKey(Set<? extends Correlation<?>> correlations) {
return correlations.stream().map(Correlation::getKey).collect(Collectors.joining("|"));
}

@Override
public String getKey() {
return key;
}

@Override
public Set<? extends Correlation<?>> getValue() {
return correlations;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CompositeCorrelation)) {
return false;
}
CompositeCorrelation that = (CompositeCorrelation) o;
return Objects.equals(getValue(), that.getValue()) && Objects.equals(getKey(), that.getKey());
}

@Override
public int hashCode() {
return Objects.hash(getValue(), getKey());
}

@Override
public String toString() {
return new StringJoiner(", ", CompositeCorrelation.class.getSimpleName() + "[", "]")
.add("correlations=" + correlations)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,13 @@

import java.util.Optional;

public class Correlation {
public interface Correlation<V> {

private String key;
private Object value;
String getKey();

public Correlation(String key, Object value) {
this.key = key;
this.value = value;
}

public String getKey() {
return key;
}

public Object getValue() {
return value;
}
V getValue();

public String asString() {
return Optional.ofNullable(value).map(Object::toString).orElse(null);
default String asString() {
return Optional.ofNullable(getValue()).map(Object::toString).orElse(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.correlation;

public interface CorrelationEncoder {

String encode(Correlation correlation);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.correlation;

public class CorrelationInstance {

private String correlationId;// encoded based on correlations
private String correlatedId;// == processInstanceId
private Correlation<?> correlation;

public CorrelationInstance(String correlationId, String correlatedId, Correlation<?> correlation) {
this.correlationId = correlationId;
this.correlatedId = correlatedId;
this.correlation = correlation;
}

public String getCorrelationId() {
return correlationId;
}

public String getCorrelatedId() {
return correlatedId;
}

public Correlation<?> getCorrelation() {
return correlation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.correlation;

public interface CorrelationResolver {

Correlation resolve(Object data);
Correlation<?> resolve(Object data);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.correlation;

import java.util.Optional;

public interface CorrelationService {

CorrelationInstance create(Correlation correlation, String correlatedId);

Optional<CorrelationInstance> find(Correlation correlation);

Optional<CorrelationInstance> findByCorrelatedId(String correlatedId);

void delete(Correlation correlation);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.correlation;

import java.util.Objects;
import java.util.StringJoiner;

public class SimpleCorrelation<T> implements Correlation<T> {

private String key;
private T value;

public SimpleCorrelation(String key) {
this(key, null);
}

public SimpleCorrelation(String key, T value) {
this.key = key;
this.value = value;
}

public String getKey() {
return key;
}

public T getValue() {
return value;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SimpleCorrelation)) {
return false;
}
SimpleCorrelation that = (SimpleCorrelation) o;
return Objects.equals(getKey(), that.getKey()) && Objects.equals(getValue(), that.getValue());
}

@Override
public int hashCode() {
return Objects.hash(getKey(), getValue());
}

@Override
public String toString() {
return new StringJoiner(", ", SimpleCorrelation.class.getSimpleName() + "[", "]")
.add("key='" + key + "'")
.add("value=" + value)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ public interface KogitoProcessInstance extends ProcessInstance, KogitoEventListe
* @return map with headers
*/
Map<String, List<String>> getHeaders();

void wrap(org.kie.kogito.process.ProcessInstance<?> kogitoProcessInstance);

org.kie.kogito.process.ProcessInstance<?> unwrap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.function.Predicate;

import org.kie.kogito.Model;
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.correlation.CorrelationService;
import org.kie.kogito.internal.process.runtime.KogitoNode;

public interface Process<T> {
Expand All @@ -27,10 +29,14 @@ public interface Process<T> {

ProcessInstance<T> createInstance(String businessKey, T workingMemory);

ProcessInstance<T> createInstance(String businessKey, CompositeCorrelation correlation, T workingMemory);

ProcessInstances<T> instances();

Collection<KogitoNode> findNodes(Predicate<KogitoNode> filter);

CorrelationService correlations();

<S> void send(Signal<S> sig);

T createModel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Function;
import java.util.function.Predicate;

import org.kie.kogito.correlation.Correlation;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkItem;
import org.kie.kogito.process.flexible.AdHocFragment;
Expand Down Expand Up @@ -257,4 +258,5 @@ default ProcessInstance<T> checkError() {

long version();

Optional<? extends Correlation<?>> correlation();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.kie.kogito.MappableToModel;
import org.kie.kogito.Model;
import org.kie.kogito.auth.SecurityPolicy;
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.process.workitem.Attachment;
import org.kie.kogito.process.workitem.AttachmentInfo;
import org.kie.kogito.process.workitem.Comment;
Expand All @@ -45,7 +46,8 @@ <T extends Model> ProcessInstance<T> createProcessInstance(Process<T> process, S
<T extends Model> ProcessInstance<T> createProcessInstance(Process<T> process, String businessKey, T model,
String startFromNodeId,
String trigger,
String kogitoReferenceId);
String kogitoReferenceId,
CompositeCorrelation correlation);

<T extends MappableToModel<R>, R> List<R> getProcessInstanceOutput(Process<T> process);

Expand Down
Loading

0 comments on commit a939408

Please sign in to comment.