diff --git a/application/src/integrationTest/resources/application.yaml b/application/src/integrationTest/resources/application.yaml index 6865a9b..c46fea9 100644 --- a/application/src/integrationTest/resources/application.yaml +++ b/application/src/integrationTest/resources/application.yaml @@ -31,14 +31,20 @@ security: provider: SunRsaSign signature: SHA256withRSA -topics: -# short circuit configuration: topics are linked back to the same proxy instance +kafka: + # short circuit configuration: topics are linked back to the same proxy instance outgoing: - requests: requests - responses: responses + requests: + topic: requests + responses: + topic: responses incoming: - requests: requests - responses: responses + requests: + topic: requests + concurrency: 1 + responses: + topic: responses + concurrency: 1 soap: call-endpoint: diff --git a/application/src/main/kotlin/org/gxf/soapbridge/kafka/listeners/ProxyRequestKafkaListener.kt b/application/src/main/kotlin/org/gxf/soapbridge/kafka/listeners/ProxyRequestKafkaListener.kt index 2ad3a92..a5aafa7 100644 --- a/application/src/main/kotlin/org/gxf/soapbridge/kafka/listeners/ProxyRequestKafkaListener.kt +++ b/application/src/main/kotlin/org/gxf/soapbridge/kafka/listeners/ProxyRequestKafkaListener.kt @@ -15,7 +15,11 @@ import org.springframework.stereotype.Component class ProxyRequestKafkaListener(private val platformCommunicationService: PlatformCommunicationService) { private val logger = KotlinLogging.logger { } - @KafkaListener(topics = ["\${topics.incoming.requests}"], id = "gxf-request-consumer") + @KafkaListener( + id = "gxf-request-consumer", + topics = ["\${kafka.incoming.requests.topic}"], + concurrency = "\${kafka.incoming.requests.concurrency}" + ) fun consume(record: ConsumerRecord) { logger.debug { "Received request: ${record.key()}, ${record.value()}" } val requestMessage = ProxyServerRequestMessage.createInstanceFromString(record.value()) diff --git a/application/src/main/kotlin/org/gxf/soapbridge/kafka/listeners/ProxyResponseKafkaListener.kt b/application/src/main/kotlin/org/gxf/soapbridge/kafka/listeners/ProxyResponseKafkaListener.kt index 95191bb..fe2d3e7 100644 --- a/application/src/main/kotlin/org/gxf/soapbridge/kafka/listeners/ProxyResponseKafkaListener.kt +++ b/application/src/main/kotlin/org/gxf/soapbridge/kafka/listeners/ProxyResponseKafkaListener.kt @@ -15,7 +15,11 @@ import org.springframework.stereotype.Component class ProxyResponseKafkaListener(private val clientCommunicationService: ClientCommunicationService) { private val logger = KotlinLogging.logger { } - @KafkaListener(topics = ["\${topics.incoming.responses}"], id = "gxf-response-consumer") + @KafkaListener( + id = "gxf-response-consumer", + topics = ["\${kafka.incoming.responses.topic}"], + concurrency = "\${kafka.incoming.responses.concurrency}" + ) fun consume(record: ConsumerRecord) { logger.debug { "Received response: ${record.key()}, ${record.value()}" } val responseMessage = ProxyServerResponseMessage.createInstanceFromString(record.value()) diff --git a/application/src/main/kotlin/org/gxf/soapbridge/kafka/properties/TopicsConfigurationProperties.kt b/application/src/main/kotlin/org/gxf/soapbridge/kafka/properties/TopicsConfigurationProperties.kt index 2eb613f..1561a83 100644 --- a/application/src/main/kotlin/org/gxf/soapbridge/kafka/properties/TopicsConfigurationProperties.kt +++ b/application/src/main/kotlin/org/gxf/soapbridge/kafka/properties/TopicsConfigurationProperties.kt @@ -6,13 +6,16 @@ package org.gxf.soapbridge.kafka.properties import org.springframework.boot.context.properties.ConfigurationProperties -@ConfigurationProperties("topics") +@ConfigurationProperties("kafka") class TopicsConfigurationProperties( - val outgoing: RequestResponseTopics, - val incoming: RequestResponseTopics + val outgoing: OutgoingTopicsConfiguration, ) -class RequestResponseTopics( - val requests: String, - val responses: String +class OutgoingTopicsConfiguration( + val requests: OutgoingTopic, + val responses: OutgoingTopic ) + +class OutgoingTopic( + val topic: String +) \ No newline at end of file diff --git a/application/src/main/kotlin/org/gxf/soapbridge/kafka/senders/ProxyRequestKafkaSender.kt b/application/src/main/kotlin/org/gxf/soapbridge/kafka/senders/ProxyRequestKafkaSender.kt index 9c620a1..96fc475 100644 --- a/application/src/main/kotlin/org/gxf/soapbridge/kafka/senders/ProxyRequestKafkaSender.kt +++ b/application/src/main/kotlin/org/gxf/soapbridge/kafka/senders/ProxyRequestKafkaSender.kt @@ -17,7 +17,7 @@ class ProxyRequestKafkaSender( ) { private val logger = KotlinLogging.logger {} - private val topic = topicConfiguration.outgoing.requests + private val topic = topicConfiguration.outgoing.requests.topic fun send(requestMessage: ProxyServerRequestMessage) { logger.debug { "SOAP payload: ${requestMessage.soapPayload} to $topic" } diff --git a/application/src/main/kotlin/org/gxf/soapbridge/kafka/senders/ProxyResponseKafkaSender.kt b/application/src/main/kotlin/org/gxf/soapbridge/kafka/senders/ProxyResponseKafkaSender.kt index d97a38e..0841fd2 100644 --- a/application/src/main/kotlin/org/gxf/soapbridge/kafka/senders/ProxyResponseKafkaSender.kt +++ b/application/src/main/kotlin/org/gxf/soapbridge/kafka/senders/ProxyResponseKafkaSender.kt @@ -17,7 +17,7 @@ class ProxyResponseKafkaSender( ) { private val logger = KotlinLogging.logger {} - private val topic = topicConfiguration.outgoing.responses + private val topic = topicConfiguration.outgoing.responses.topic fun send(responseMessage: ProxyServerResponseMessage) { logger.debug { "SOAP payload: ${responseMessage.soapResponse} to $topic" } diff --git a/application/src/main/resources/application-dev.yml b/application/src/main/resources/application-dev.yml index 99ecf0c..427a305 100644 --- a/application/src/main/resources/application-dev.yml +++ b/application/src/main/resources/application-dev.yml @@ -26,20 +26,22 @@ security: provider: SunRsaSign signature: SHA256withRSA -topics: - calls: - requests: proxy-server-calls-requests - responses: proxy-server-calls-responses - notifications: - requests: proxy-server-notification-requests - responses: proxy-server-notification-responses +kafka: + outgoing: + requests: + topic: proxy-server-calls-requests + responses: + topic: proxy-server-calls-responses + incoming: + requests: + topic: proxy-server-notification-requests + concurrency: 1 + responses: + topic: proxy-server-notification-responses + concurrency: 1 soap: - notification: - host: localhost - port: 443 - protocol: https - platform: + call-endpoint: host: localhost port: 443 protocol: https diff --git a/application/src/test/java/org/gxf/soapbridge/soap/clients/SoapClientTest.java b/application/src/test/java/org/gxf/soapbridge/soap/clients/SoapClientTest.java index 268a00a..20d4b89 100644 --- a/application/src/test/java/org/gxf/soapbridge/soap/clients/SoapClientTest.java +++ b/application/src/test/java/org/gxf/soapbridge/soap/clients/SoapClientTest.java @@ -9,6 +9,7 @@ import java.net.ConnectException; import java.net.HttpURLConnection; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import javax.net.ssl.HttpsURLConnection; import org.gxf.soapbridge.application.factories.HttpsUrlConnectionFactory; import org.gxf.soapbridge.application.services.SigningService; @@ -35,7 +36,7 @@ class SoapClientTest { new SoapConfigurationProperties( HostnameVerificationStrategy.BROWSER_COMPATIBLE_HOSTNAMES, 45, - "", + new HashMap<>(), new SoapEndpointConfiguration("localhost", 443, "https")); @InjectMocks SoapClient soapClient;