bugfix(#1253): provide kafka configs with digiwf-message-starter #1256

Merged: 7 commits (Feb 6, 2024)
digiwf-connector/digiwf-camunda-rest-connector-starter/src/main/java/de/muenchen/oss/digiwf/connector/adapter/camunda/rest/out/IncidentAdapter.java
@@ -26,17 +26,17 @@
private final EventSubscriptionApi eventSubscriptionApi;

@Override
public void createIncident(final String processInstanceId, final String integrationName, final String messageContent) {
public void createIncident(final String processInstanceId, final String typeHeader, final String messageContent) {
try {

//check parameters
Assert.notNull(processInstanceId, "process instance id cannot be empty");
Assert.notNull(integrationName, "integrationName name cannot be empty");
Assert.notNull(typeHeader, "typeHeader cannot be empty");

//load corresponding event subscription
final List<EventSubscriptionDto> eventSubscriptions = this.eventSubscriptionApi.getEventSubscriptions(
null,
integrationName,
typeHeader,
EVENT_TYPE,
null,
processInstanceId,
@@ -69,7 +69,7 @@
// send create incident call
this.executionApi.createIncident(executionId, createIncidentDto);
} catch (final NoSuchElementException | IllegalArgumentException e) {
log.error("Cannot create incident for processinstance id {} and integration name {}", processInstanceId, integrationName);
log.error("Cannot create incident for processinstance id {} and type header {}", processInstanceId, typeHeader);

(Codecov / codecov/patch: added line 72 in IncidentAdapter.java was not covered by tests.)
throw new RuntimeException(e);
}

IncidentConsumer.java
@@ -21,7 +21,7 @@
public class IncidentConsumer {

private static final String HEADER_PROCESS_INSTANCE_ID = "digiwf.processinstanceid";
public static final String HEADER_INTEGRATION_NAME = "digiwf.integrationname";
public static final String HEADER_TYPE = "type";


private final CreateIncidentInPort inPort;
@@ -30,20 +30,20 @@ public class IncidentConsumer {
public Consumer<Message<String>> createIncident() {
return correlation -> {
final Optional<String> processInstanceId = Optional.ofNullable(correlation.getHeaders().get(HEADER_PROCESS_INSTANCE_ID)).map(Object::toString);
final Optional<String> integrationName = Optional.ofNullable(correlation.getHeaders().get(HEADER_INTEGRATION_NAME)).map(Object::toString);
final Optional<String> typeHeader = Optional.ofNullable(correlation.getHeaders().get(HEADER_TYPE)).map(Object::toString);

if (processInstanceId.isEmpty()) {
log.error("No process instance id present. Cannot create an incident");
return;
}

if (integrationName.isEmpty()) {
if (typeHeader.isEmpty()) {
log.error("No integrationName is present. Cannot create an incident");
lmoesle marked this conversation as resolved.
Show resolved Hide resolved
return;
}

log.info("Received create incident for process instance with id: {}", processInstanceId.get());
this.inPort.createIncident(processInstanceId.get(), integrationName.get(), correlation.getPayload());
this.inPort.createIncident(processInstanceId.get(), typeHeader.get(), correlation.getPayload());
};
}
}
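For illustration, a message only needs the two headers read above to be processed. A minimal sketch (hypothetical, not code from this PR; the header values are invented, and the type value mirrors the typeMappings shown in the service configs below):

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

class IncidentMessageExample {

    // Sketch: builds a message the createIncident consumer would accept.
    // The header names are the constants from IncidentConsumer; the values are examples only.
    static Message<String> example() {
        return MessageBuilder
                .withPayload("address lookup failed")             // becomes the optional messageContent
                .setHeader("digiwf.processinstanceid", "4711")    // HEADER_PROCESS_INSTANCE_ID
                .setHeader("type", "searchAddressesGermany")      // HEADER_TYPE: type of the failing integration message
                .build();
    }
}

Reusing the generic "type" header, which the services' routing-expression headers['type'] already evaluates, is what replaces the bespoke digiwf.integrationname header.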
CreateIncidentInPort.java
@@ -1,5 +1,7 @@
package de.muenchen.oss.digiwf.connector.core.application.port.in;

import jakarta.validation.constraints.NotBlank;

public interface CreateIncidentInPort {

/**
@@ -9,6 +11,6 @@ public interface CreateIncidentInPort {
* @param integrationName name of the integration that should be answered
* @param messageContent optional message content providing detailed information
*/
void createIncident(String processInstanceId, String integrationName, String messageContent);
void createIncident(@NotBlank String processInstanceId, @NotBlank String integrationName, String messageContent);

}
CreateIncidentOutPort.java
@@ -8,9 +8,9 @@ public interface CreateIncidentOutPort {
* Create an incident
*
* @param processInstanceId id of the process instance
* @param integrationName name of the integration that should be answered
* @param typeHeader type header of the integration
* @param messageContent optional message content providing detailed information
*/
void createIncident(String processInstanceId, String integrationName, @Nullable String messageContent);
void createIncident(String processInstanceId, String typeHeader, @Nullable String messageContent);

}
CreateIncidentUseCase.java
@@ -17,8 +17,8 @@ class CreateIncidentUseCase implements CreateIncidentInPort {
private final CreateIncidentOutPort outPort;

@Override
public void createIncident(@NotBlank String processInstanceId, @NotBlank String integrationName, String messageContent) {
public void createIncident(@NotBlank String processInstanceId, @NotBlank String typeHeader, String messageContent) {
log.info("Received create incident for process instance with id: {}", processInstanceId);
outPort.createIncident(processInstanceId, integrationName, messageContent);
outPort.createIncident(processInstanceId, typeHeader, messageContent);
}
}
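A caveat on the @NotBlank parameters: jakarta.validation annotations on method arguments are only enforced when Spring's method validation is active, which typically means annotating the bean class with @Validated. A minimal sketch of that assumption (the diff does not show how the use case is registered):

import jakarta.validation.constraints.NotBlank;
import org.springframework.validation.annotation.Validated;

// Sketch: on a Spring bean, @Validated enables method validation, so a blank or null
// argument raises jakarta.validation.ConstraintViolationException instead of passing through.
@Validated
class ValidatedCreateIncidentUseCase {
    void createIncident(@NotBlank String processInstanceId, @NotBlank String typeHeader, String messageContent) {
        // business logic only ever sees non-blank identifiers here
    }
}

Without such wiring, the annotations are effectively documentation.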
IncidentConsumerTest.java
@@ -15,7 +15,7 @@
@DisplayName("Incident Consumer Test")
class IncidentConsumerTest {
private static final String HEADER_PROCESS_INSTANCE_ID = "digiwf.processinstanceid";
public static final String HEADER_INTEGRATION_NAME = "digiwf.integrationname";
public static final String HEADER_TYPE = "type";

private final CreateIncidentInPort inPort = mock(CreateIncidentInPort.class);

@@ -56,13 +56,13 @@ void shouldCreateIncidentIfHeadersAreGiven() {
public @NotNull MessageHeaders getHeaders() {
final HashMap<String, Object> map = new HashMap<>();
map.put(HEADER_PROCESS_INSTANCE_ID, "process-instance-id");
map.put(HEADER_INTEGRATION_NAME, "integration-name");
map.put(HEADER_TYPE, "type-header");

return new MessageHeaders(map);
}
});

verify(inPort).createIncident("process-instance-id", "integration-name", "payload");
verify(inPort).createIncident("process-instance-id", "type-header", "payload");
}

@Test
@@ -80,12 +80,12 @@ public String getPayload() {
public @NotNull MessageHeaders getHeaders() {
final HashMap<String, Object> map = new HashMap<>();
map.put(HEADER_PROCESS_INSTANCE_ID, "process-instance-id");
map.put(HEADER_INTEGRATION_NAME, "integration-name");
map.put(HEADER_TYPE, "type-header");

return new MessageHeaders(map);
}
});

verify(inPort).createIncident("process-instance-id", "integration-name", null);
verify(inPort).createIncident("process-instance-id", "type-header", null);
}
}
application.yml (dwf-address service)
@@ -29,52 +29,21 @@ spring:
lifecycle:
timeout-per-shutdown-phase: 1m
cloud:
function:
definition: functionRouter;sendMessage;
routing-expression: "headers['type']"
stream:
bindings:
functionRouter-in-0:
destination: "dwf-address-${DIGIWF_ENV}"
group: dwf-address-service
sendMessage-out-0:
destination: "dwf-connector-${DIGIWF_ENV}"
function:
routing:
enabled: true
kafka:
binder:
configuration:
security:
protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
producerProperties:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
key:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumerProperties:
auto:
offset:
reset: latest
key:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
brokers: ${KAFKA_BOOTSTRAP_SERVER:localhost}:${KAFKA_BOOTSTRAP_SERVER_PORT:29092}
kafka:
consumer:
properties:
spring:
json:
trusted:
packages: '*'
io:
muenchendigital:
digiwf:
message:
incidentDestination: "dwf-connector-incident-${DIGIWF_ENV}"
bpmnErrorDestination: "dwf-connector-bpmnerror-${DIGIWF_ENV}"
correlateMessageDestination: "dwf-connector-${DIGIWF_ENV}"
deadLetterQueueDestination: "dwf-connector-incident-${DIGIWF_ENV}"
typeMappings:
searchAddressesGermany: searchAddressesGermany
checkAddressMunich: checkAddressMunich
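The block deleted above (broker address, JSON (de)serializers, security protocol, trusted packages) was duplicated across every integration service; per the PR title it is now provided by digiwf-message-starter. For reference, the consolidated defaults presumably look like this, reconstructed from the removed lines rather than taken from the starter's source:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${KAFKA_BOOTSTRAP_SERVER:localhost}:${KAFKA_BOOTSTRAP_SERVER_PORT:29092}
          configuration:
            security:
              protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
          producerProperties:
            key:
              serializer: org.springframework.kafka.support.serializer.JsonSerializer
            value:
              serializer: org.springframework.kafka.support.serializer.JsonSerializer
          consumerProperties:
            auto:
              offset:
                reset: latest
            key:
              deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            value:
              deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  kafka:
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: "*"

Each service keeps only what is specific to it: its function definition, bindings, destinations, and type mappings.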
application.yml (dwf-alw service)
@@ -16,6 +16,7 @@ io:
incidentDestination: "dwf-connector-incident-${DIGIWF_ENV}"
bpmnErrorDestination: "dwf-connector-bpmnerror-${DIGIWF_ENV}"
correlateMessageDestination: "dwf-connector-${DIGIWF_ENV}"
deadLetterQueueDestination: "dwf-connector-incident-${DIGIWF_ENV}"
typeMappings:
getAlwResponsibility: "getAlwResponsibility"

@@ -27,52 +28,14 @@ info:
spring:
application.name: @project.artifactId@
banner.location: banner.txt

cloud:
stream:
kafka:
binder:
brokers: ${KAFKA_BOOTSTRAP_SERVER:localhost}:${KAFKA_BOOTSTRAP_SERVER_PORT:29092}
producerProperties:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
key:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumerProperties:
auto:
offset:
reset: latest
key:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
configuration:
security:
protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
bindings:
functionRouter-in-0:
group: "dwf-alw-service"
destination: dwf-alw-${DIGIWF_ENV}
sendMessage-out-0:
destination: dwf-connector-${DIGIWF_ENV}
default:
consumer:
maxAttempts: '1'
default-binder: kafka
function:
routing:
enabled: 'true'

function:
definition: functionRouter;sendMessage;
routing-expression: "headers['type']"
kafka:
consumer:
properties:
spring:
json:
trusted:
packages: '*'

server:
shutdown: "graceful"
application.yml (dwf-cosys service)
@@ -1,48 +1,12 @@
spring:
cloud:
function:
definition: functionRouter;sendMessage;
routing-expression: "headers['type']"
stream:
function:
routing:
enabled: true
bindings:
functionRouter-in-0:
group: "dwf-cosys-service"
destination: "dwf-cosys-${DIGIWF_ENV}"
sendMessage-out-0:
destination: "dwf-connector-${DIGIWF_ENV}"
kafka:
binder:
consumerProperties:
key:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
producerProperties:
key:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
brokers: ${KAFKA_BOOTSTRAP_SERVER:localhost}:${KAFKA_BOOTSTRAP_SERVER_PORT:29092}

# ##DLQ Config -> only possible if group for functionRouter is set
# default:
# consumer:
# dlqName:
# enableDlq: true
# default:
# consumer:
# #maxAttempts: 1

kafka:
consumer:
properties:
spring:
json:
trusted:
packages: "*"

io:
muenchendigital:
@@ -51,3 +15,4 @@ io:
incidentDestination: "dwf-connector-incident-${DIGIWF_ENV}"
bpmnErrorDestination: "dwf-connector-bpmnerror-${DIGIWF_ENV}"
correlateMessageDestination: "dwf-connector-${DIGIWF_ENV}"
deadLetterQueueDestination: "dwf-connector-incident-${DIGIWF_ENV}"
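The commented-out block above hints at the mechanism behind the new deadLetterQueueDestination property: Spring Cloud Stream's Kafka binder can forward messages that exhaust their retries to a dead-letter topic, but only for bindings that declare a consumer group. Assuming the starter maps deadLetterQueueDestination onto that binder feature, the effective configuration would resemble:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          functionRouter-in-0:
            consumer:
              enableDlq: true
              dlqName: dwf-connector-incident-${DIGIWF_ENV}

Pointing the DLQ at the incident topic means a failed message can surface as a Camunda incident instead of being silently dropped.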
application.yml (dwf-dms service)
@@ -5,38 +5,13 @@ spring:
application:
name: @project.artifactId@
cloud:
function:
definition: functionRouter;sendMessage;
routing-expression: "headers['type']"
stream:
function:
routing:
enabled: 'true'
bindings:
functionRouter-in-0:
group: "dwf-dms-service"
destination: "dwf-dms-${DIGIWF_ENV}"
sendMessage-out-0:
destination: "dwf-connector-${DIGIWF_ENV}"
kafka:
binder:
configuration:
security:
protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
producerProperties:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
key:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumerProperties:
auto:
offset:
reset: latest
key:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
brokers: ${KAFKA_BOOTSTRAP_SERVER:localhost}:${KAFKA_BOOTSTRAP_SERVER_PORT:29092}
security:
oauth2:
client:
@@ -53,13 +28,6 @@ spring:
client-id: ${SSO_DMS_CLIENT_ID}
client-secret: ${SSO_DMS_CLIENT_SECRET}
scope: email, profile, openid # needed for userInfo endpoint
kafka:
consumer:
properties:
spring:
json:
trusted:
packages: '*'
# Config for spring actuator endpoints
management:
server.port: ${server.port}
@@ -102,6 +70,7 @@ io:
incidentDestination: "dwf-connector-incident-${DIGIWF_ENV}"
bpmnErrorDestination: "dwf-connector-bpmnerror-${DIGIWF_ENV}"
correlateMessageDestination: "dwf-connector-${DIGIWF_ENV}"
deadLetterQueueDestination: "dwf-connector-incident-${DIGIWF_ENV}"

de:
muenchen: