Skip to content

Commit

Permalink
feature(#958): default kafka topics for all integrations (#1168)
Browse files Browse the repository at this point in the history
* refactoring: rename application.properties to application.yml

* feature: introduce default destinations for integrations

* refactoring: remove digiwf-message-name from integration calls

* feature: adjust element templates, forms and bpmn process to default kafka topics

* chore: fix invalid element templates

* chore: remove app_message_name

* chore: remove app_message_name

* chore: remove app_message_name

* Revert "chore: remove app_message_name"

This reverts commit ecf9b68.

* Revert "chore: remove app_message_name"

This reverts commit c081f40.

* chore: replace app_message_name with app_integration_name

* chore: refactor configs

* fix: compiling error

* bugfix: fix incorrect function call

* chore: fix tests

* feature: use type for message correlation to avoid breaking changes

* feature: send digiwf.processdefinition header to integrations

* tmp: feature(#734): get processdefinition for integration call

* feature(#734): query root process instance id

* feature(#734): add feign error handling

* feature(#734): add tests

* feature(#734): add tests

* feature(#734): add tests

* feature(#734): add integration name to StreamingTemplateV01 and mark it as deprecated

* feature(#734): add integration name to StreamingTemplateV01 and mark it as deprecated

* feature(#734): add integration name to StreamingTemplateV01 and mark it as deprecated

* docs(#958): adjust documentation, element-templates and add connector documentation

* fix: address integration

* fix: processes

* Update EmailIntegration.json

* update element templates

---------

Co-authored-by: Lukas Mösle <[email protected]>
  • Loading branch information
lmoesle and Lukas Mösle authored Jan 29, 2024
1 parent b8a1ad5 commit 406a484
Show file tree
Hide file tree
Showing 185 changed files with 1,481 additions and 2,146 deletions.
9 changes: 0 additions & 9 deletions digiwf-connector/digiwf-camunda-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
<artifactId>digiwf-camunda-connector-service</artifactId>
<name>digiwf-camunda-connector-service</name>

<properties>
</properties>

<parent>
<groupId>de.muenchen.oss.digiwf</groupId>
Expand Down Expand Up @@ -51,13 +49,6 @@
<artifactId>jackson-module-kotlin</artifactId>
</dependency>

<!-- Security -->
<dependency>
<groupId>de.muenchen.oss.digiwf</groupId>
<artifactId>digiwf-spring-security-starter</artifactId>
<version>${project.version}</version>
</dependency>

<!-- database -->
<dependency>
<groupId>com.h2database</groupId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
digiwf:
streaming:
connector:
topics:
bpmnerror: dwf-connector-bpmnerror-local-01
incident: dwf-connector-incident-local-01
message: dwf-connector-local-01
dlq: dwf-connector-local-01-dlq
group: dwf-connector-local-01
security:
client-id: ${SSO_TASK_CLIENT_ID}
client-secret: ${SSO_TASK_CLIENT_SECRET}
spring:
cloud:
stream:
kafka:
binder:
consumerProperties:
auto:
offset:
reset: latest
brokers: ${KAFKA_BOOTSTRAP_SERVER:localhost}:${KAFKA_BOOTSTRAP_SERVER_PORT:29092}
logging:
level:
org:
springframework: info
io:
muenchendigital: info
server:
port: '8090'
de:
muenchen:
oss:
digiwf:
connector:
core:
integrations:
exampleIntegration: "dwf-example-integration-${DIGIWF_ENV}"

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
spring:
cloud:
stream:
kafka:
binder:
producerProperties:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
key:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumerProperties:
key:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
default:
consumer:
dlqName: ${digiwf.streaming.connector.topics.dlq}
enableDlq: 'true'
bindings:
dynamicProducer-out-0:
destination: ${digiwf.streaming.connector.topics.message}
producer:
auto-startup: 'true'
createBpmnError-in-0:
group: ${digiwf.streaming.connector.group}
destination: ${digiwf.streaming.connector.topics.bpmnerror}
createIncident-in-0:
destination: ${digiwf.streaming.connector.topics.incident}
group: ${digiwf.streaming.connector.group}
correlateMessage-in-0:
group: ${digiwf.streaming.connector.group}
destination: ${digiwf.streaming.connector.topics.message}
default-binder: kafka
default:
consumer:
maxAttempts: '1'
function:
definition: createIncident;createBpmnError;correlateMessage;dynamicProducer;
kafka:
consumer:
properties:
spring:
json:
trusted:
packages: '*'

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
spring:
application:
name: '@project.artifactId@'
server:
port: '8080'
feign:
client:
config:
default:
url: ${ENGINE_CAMUNDA_REST_ENDPOINT_URL}
digiwf-process-instance:
name: digiwf-process-instance
url: ${ENGINE_REST_ENDPOINT_URL}
camunda:
bpm:
client:
base-url: ${ENGINE_CAMUNDA_REST_ENDPOINT_URL}
date-format: yyyy-MM-dd'T'HH:mm:ss.SSSX
de:
muenchen:
oss:
digiwf:
connector:
core:
integrations:
addressIntegration: "dwf-address-${DIGIWF_ENV}"
alwIntegration: "dwf-alw-${DIGIWF_ENV}"
cosysIntegration: "dwf-cosys-${DIGIWF_ENV}"
dmsIntegration: "dwf-dms-${DIGIWF_ENV}"
emailIntegration: "dwf-email-${DIGIWF_ENV}"
s3Integration: "dwf-s3-${DIGIWF_ENV}"
# TODO: Remove this fallback after all processes are migrated to the new version. It's a legacy feature to avoid breaking changes.
deprecatedLegacyFeature: ${digiwf.streaming.connector.topics.dlq}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@
<artifactId>digiwf-connector-starter</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Security -->
<dependency>
<groupId>de.muenchen.oss.digiwf</groupId>
<artifactId>digiwf-spring-security-starter</artifactId>
<version>${project.version}</version>
</dependency>


<!-- Validation -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.messaging.Message;
Expand All @@ -23,6 +24,7 @@
@RequiredArgsConstructor
@ComponentScan(basePackages = "de.muenchen.oss.digiwf.connector.adapter.camunda.rest")
@EnableConfigurationProperties(DigiWFCamundaConnectorProperties.class)
@EnableFeignClients(basePackages = "de.muenchen.oss.digiwf.connector.adapter.camunda.rest.out")
public class DigiWFCamundaConnectorAutoConfiguration {


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import de.muenchen.oss.digiwf.connector.adapter.camunda.rest.mapper.EngineDataSerializer;
import de.muenchen.oss.digiwf.connector.core.application.port.in.ExecuteTaskInPort;
import de.muenchen.oss.digiwf.connector.core.application.port.in.ExecuteTaskInPort.ExecuteTaskCommand;
import de.muenchen.oss.digiwf.connector.core.domain.IntegrationNameConfigException;
import de.muenchen.oss.digiwf.connector.core.domain.ProcessDefinitionLoadingException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.camunda.bpm.client.task.ExternalTask;
import org.camunda.bpm.client.task.ExternalTaskHandler;
import org.camunda.bpm.client.task.ExternalTaskService;

import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;


Expand All @@ -25,21 +26,33 @@ public class CamundaClient implements ExternalTaskHandler {
@Override
public void execute(final ExternalTask externalTask, final ExternalTaskService externalTaskService) {
final Map<String, Object> data = this.getData(externalTask);
final String topic = (String) data.get(CamundaClientConfiguration.TOPIC_NAME);
String integrationName = (String) data.get(CamundaClientConfiguration.INTEGRATION_NAME);
final String customTopic = (String) data.get(CamundaClientConfiguration.TOPIC_NAME);
final String type = (String) data.get(CamundaClientConfiguration.TYPE_NAME);
log.info("External task received (topic {}, type {})", topic, type);
final Optional<String> message = Optional.ofNullable(data.get(CamundaClientConfiguration.MESSAGE_NAME)).map(Object::toString);
log.info("External task received (integration {}, type {})", integrationName, type);
final Map<String, Object> filteredData = this.filterVariables(data);

executeTaskInPort.executeTask(ExecuteTaskCommand.builder()
.messageName(message.orElse(null))
.destination(topic)
.type(type)
.instanceId(externalTask.getProcessInstanceId())
.data(filteredData)
.build());
// TODO: Remove this fallback after all processes are migrated to the new version. It's a legacy feature to avoid breaking changes.
if (integrationName == null) {
log.warn("Integration name is null. Falling back to deprecated legacy feature. Please update your process definition.");
integrationName = "deprecatedLegacyFeature";
}

externalTaskService.complete(externalTask);
try {
executeTaskInPort.executeTask(ExecuteTaskCommand.builder()
.customDestination(customTopic)
.integrationName(integrationName)
.type(type)
.instanceId(externalTask.getProcessInstanceId())
.data(filteredData)
.build());

externalTaskService.complete(externalTask);
} catch (final IntegrationNameConfigException e) {
externalTaskService.handleFailure(externalTask, e.getMessage(), e.getDetailedMessage(), 0, 0);
} catch (final ProcessDefinitionLoadingException e) {
externalTaskService.handleFailure(externalTask, e.getMessage(), e.getDetailedMessage(), 0, 0);
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@

@RequiredArgsConstructor
public class CamundaClientConfiguration {

public final static String MESSAGE_NAME = "app_message_name";
public final static String TYPE_NAME = "app_type_name";
public final static String TOPIC_NAME = "app_topic_name";

public final static String INTEGRATION_NAME = "app_integration_name";

private final List<String> filteredVariables;

public List<String> getFilters() {
final List<String> allFilters = new ArrayList<>();
allFilters.addAll(this.filteredVariables);
allFilters.addAll(List.of(MESSAGE_NAME, TOPIC_NAME, TYPE_NAME));
allFilters.addAll(List.of(TOPIC_NAME, TYPE_NAME, INTEGRATION_NAME));
return allFilters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ public class IncidentAdapter implements CreateIncidentOutPort {
private final EventSubscriptionApi eventSubscriptionApi;

@Override
public void createIncident(final String processInstanceId, final String messageName, final String messageContent) {
public void createIncident(final String processInstanceId, final String integrationName, final String messageContent) {
try {

//check parameters
Assert.notNull(processInstanceId, "process instance id cannot be empty");
Assert.notNull(messageName, "message name cannot be empty");
Assert.notNull(integrationName, "integrationName name cannot be empty");

//load corresponding event subscription
final List<EventSubscriptionDto> eventSubscriptions = this.eventSubscriptionApi.getEventSubscriptions(
null,
messageName,
integrationName,
EVENT_TYPE,
null,
processInstanceId,
Expand Down Expand Up @@ -69,7 +69,7 @@ public void createIncident(final String processInstanceId, final String messageN
// send create incident call
this.executionApi.createIncident(executionId, createIncidentDto);
} catch (final NoSuchElementException | IllegalArgumentException e) {
log.error("Cannot create incident for processinstance id {} and message name {}", processInstanceId, messageName);
log.error("Cannot create incident for processinstance id {} and integration name {}", processInstanceId, integrationName);
throw new RuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.muenchen.oss.digiwf.connector.adapter.camunda.rest.out.processdefinition;

import de.muenchen.oss.digiwf.connector.core.application.port.out.ProcessOutPort;
import de.muenchen.oss.digiwf.connector.core.domain.ProcessDefinitionLoadingException;
import feign.FeignException;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class ProcessDefinitionAdapter implements ProcessOutPort {

private final ProcessInstanceClient processInstanceClient;

@Override
public String loadProcessDefinition(final String processInstanceId) throws ProcessDefinitionLoadingException {
try {
return processInstanceClient.getRootProcessInstanceDetail(processInstanceId).getDefinitionName();
} catch (final FeignException e) {
throw new ProcessDefinitionLoadingException("Could not load process definition for process instance with id " + processInstanceId, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package de.muenchen.oss.digiwf.connector.adapter.camunda.rest.out.processdefinition;

import de.muenchen.oss.digiwf.spring.security.client.DigiwfFeignOauthClientConfig;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

@FeignClient(
name = "${feign.client.config.digiwf-process-instance.name:digiwf-process-instance}",
url = "${feign.client.config.digiwf-process-instance.url}rest/service/instance",
configuration = DigiwfFeignOauthClientConfig.class
)
public interface ProcessInstanceClient {

@RequestMapping(method = RequestMethod.GET, value = "/root/{id}")
ServiceInstanceTO getRootProcessInstanceDetail(@PathVariable("id") final String id);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package de.muenchen.oss.digiwf.connector.adapter.camunda.rest.out.processdefinition;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServiceInstanceTO {
private String id;
private String definitionName;
private Date startTime;
private Date endTime;
private String status;
private String description;

}
Loading

0 comments on commit 406a484

Please sign in to comment.