Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix(#1253): provide kafka configs with digiwf-message-starter #1256

Merged
merged 7 commits into from
Feb 6, 2024

Conversation

lmoesle
Copy link
Contributor

@lmoesle lmoesle commented Jan 31, 2024

Description

  • Move kafka configs to digiwf-message-starter
  • Configure DLQs
  • Adjust docs
  • Fix kafka topics in local development stack

Reference

Issues: closes #1253

Check-List

  • All Acceptance criteria of user story are met
  • JUnit tests are written (60% CodeCov)
  • Internal Review is maintained
  • Documentations external and internal are completed
  • Smoketest successful (Manual E2E-Test depending on Change)
  • No Branch waste left
  • Board is up-to-date
  • Openshift environments are prepared (Secrets, etc.) and release-issue is maintained

@lmoesle
Copy link
Contributor Author

lmoesle commented Jan 31, 2024

@simonhir welches Topic soll ich als DLQ in den Integrationen konfigurieren?

Copy link

codecov bot commented Jan 31, 2024

Codecov Report

Attention: 2 lines in your changes are missing coverage. Please review.

Comparison is base (824ee6f) 39.38% compared to head (541ab0b) 39.38%.
Report is 3 commits behind head on dev.

❗ Current head 541ab0b differs from pull request most recent head 66c2681. Consider uploading reports for the commit 66c2681 to get more accurate results

Files Patch % Lines
...ctor/adapter/camunda/rest/out/IncidentAdapter.java 50.00% 1 Missing ⚠️
...or/core/adapter/in/streaming/IncidentConsumer.java 66.66% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff            @@
##                dev    #1256   +/-   ##
=========================================
  Coverage     39.38%   39.38%           
  Complexity     1344     1344           
=========================================
  Files           600      600           
  Lines          8732     8732           
  Branches        430      430           
=========================================
  Hits           3439     3439           
  Misses         5155     5155           
  Partials        138      138           
Flag Coverage Δ
unittests 39.38% <66.66%> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@simonhir
Copy link
Member

simonhir commented Feb 1, 2024

@lmoesle so wie ich das sehe müsste das dwf-digiwf-engine-<ENV>-dlq sein. Wäre jetzt irgendwie eher von einem *connector*-Topic ausgegangen. Müsste man auf jeden Fall auch sicher stellen, dass die Engine selbst da nicht irgendwie drauf hört. Hab auf die schnelle aber jetzt nichts entsprechendes gefunden.
Das obige Topic, wäre so aktuell auch als DIGIWF_STREAMING_CONNECTOR_TOPICS_DLQ für den Connector im Ops-Repo.

@lmoesle
Copy link
Contributor Author

lmoesle commented Feb 1, 2024

Ich hab das DLQ Topic jetzt nochmal umbenannt, dass die Integrationen auch das Engine DLQ Topic verwenden

@simonhir
Copy link
Member

simonhir commented Feb 1, 2024

Habs gerade mal probiert indem ich throw new MailSendException("Test"); in die Mail-Integration eingebaut hatte.
Führt dann zu folgendem Log, wo es für mich erstmal so aussieht, wie wenn der die DLQ richtig setzt. Ein Incident wird da aber noch nicht draus.

Log
2024.02.01 11:05:59.837 | ERROR | [KafkaConsumerDestination{consumerDestinationName='dwf-email-local-01', partitions=1, dlqName='dwf-digiwf-engine-local-01-dlq'}.container-0-C-1] | LoggingHandler | [LogAccessor.java : 250] - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@3137a6cf], failedMessage=GenericMessage [payload=byte[697], headers={deliveryAttempt=3, kafka_timestampType=CREATE_TIME, type=sendMailFromEventBus, kafka_receivedTopic=dwf-email-local-01, digiwf.processdefinition=example-email-V02, digiwf.integrationname=emailIntegration, kafka_offset=9, spring.cloud.stream.sendto.destination=dwf-email-local-01, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@21289d0a, digiwf.processinstanceid=5af89f98-c0e9-11ee-9bb3-88899d5a525c, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1706781942747, kafka_groupId=dwf-email-service}]
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
	at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:299)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
	at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493)
	at io.micrometer.observation.Observation.observeWithContext(Observation.java:603)
	at io.micrometer.observation.Observation.observe(Observation.java:492)
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:464)
	at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:70)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:335)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:227)
	at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:66)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:461)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2873)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2854)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$57(KafkaMessageListenerContainer.java:2772)
	at io.micrometer.observation.Observation.lambda$observe$4(Observation.java:544)
	at io.micrometer.observation.Observation.observeWithContext(Observation.java:603)
	at io.micrometer.observation.Observation.observe(Observation.java:544)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2770)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2622)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2508)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2150)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1505)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1469)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1344)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.mail.MailSendException: Test
	at de.muenchen.oss.digiwf.email.integration.application.usecase.SendMailUseCase.sendMail(SendMailUseCase.java:61)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:751)
	at org.springframework.validation.beanvalidation.MethodValidationInterceptor.invoke(MethodValidationInterceptor.java:141)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:751)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:703)
	at de.muenchen.oss.digiwf.email.integration.application.usecase.SendMailUseCase$$SpringCGLIB$$0.sendMail(<generated>)
	at de.muenchen.oss.digiwf.email.integration.adapter.in.MessageProcessor.lambda$emailIntegration$0(MessageProcessor.java:27)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1029)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:731)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577)
	at org.springframework.cloud.function.context.config.RoutingFunction.route(RoutingFunction.java:196)
	at org.springframework.cloud.function.context.config.RoutingFunction.apply(RoutingFunction.java:107)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:725)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577)
	at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:92)
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:832)
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:661)
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
	... 41 more

@lmoesle lmoesle force-pushed the bugfix/#1253-configure-dlqs-for-integrations branch from 32cbbed to 44cd1a4 Compare February 1, 2024 16:46
@lmoesle
Copy link
Contributor Author

lmoesle commented Feb 1, 2024

Ich hab's mir nochmal angeschaut. Ich habe die deadLetterQueueDestination auf das Incident Topic des Connectors eingestellt. So werden Incidents erzeugt, wenn der Retry failed.
Dabei ist mir noch ein Bug im Connector aufgefallen, den ich auch direkt gefixed habe. Bei #958 ist beim Renaming von Integration Name zu Type etwas schiefgegangen und Incidents konnten nicht erzeugt werden.

@simonhir
Copy link
Member

simonhir commented Feb 6, 2024

Funktioniert 👍 cool wäre wenn man den Fehler auch zurück geben würde, aber das ist wenn was für ein extra Ticket.

…nchen/oss/digiwf/connector/core/adapter/in/streaming/IncidentConsumer.java

Co-authored-by: Simon Hirtreiter <[email protected]>
@lmoesle lmoesle enabled auto-merge (squash) February 6, 2024 14:41
@lmoesle lmoesle merged commit 4002d39 into dev Feb 6, 2024
5 checks passed
@lmoesle lmoesle deleted the bugfix/#1253-configure-dlqs-for-integrations branch February 6, 2024 14:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Dlq not correctly configured for integrations
2 participants