Skip to content

Commit

Permalink
FDP-94: Add Kafka listener concurrency configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Jasper Kamerling <[email protected]>
  • Loading branch information
jasperkamerling committed Nov 17, 2023
1 parent 2e8cffc commit b74935a
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 29 deletions.
18 changes: 12 additions & 6 deletions application/src/integrationTest/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,20 @@ security:
provider: SunRsaSign
signature: SHA256withRSA

topics:
# short circuit configuration: topics are linked back to the same proxy instance
kafka:
# short circuit configuration: topics are linked back to the same proxy instance
outgoing:
requests: requests
responses: responses
requests:
topic: requests
responses:
topic: responses
incoming:
requests: requests
responses: responses
requests:
topic: requests
concurrency: 1
responses:
topic: responses
concurrency: 1

soap:
call-endpoint:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import org.springframework.stereotype.Component
class ProxyRequestKafkaListener(private val platformCommunicationService: PlatformCommunicationService) {
private val logger = KotlinLogging.logger { }

@KafkaListener(topics = ["\${topics.incoming.requests}"], id = "gxf-request-consumer")
@KafkaListener(
id = "gxf-request-consumer",
topics = ["\${kafka.incoming.requests.topic}"],
concurrency = "\${kafka.incoming.requests.concurrency}"
)
fun consume(record: ConsumerRecord<String, String>) {
logger.debug { "Received request: ${record.key()}, ${record.value()}" }
val requestMessage = ProxyServerRequestMessage.createInstanceFromString(record.value())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import org.springframework.stereotype.Component
class ProxyResponseKafkaListener(private val clientCommunicationService: ClientCommunicationService) {
private val logger = KotlinLogging.logger { }

@KafkaListener(topics = ["\${topics.incoming.responses}"], id = "gxf-response-consumer")
@KafkaListener(
id = "gxf-response-consumer",
topics = ["\${kafka.incoming.responses.topic}"],
concurrency = "\${kafka.incoming.responses.concurrency}"
)
fun consume(record: ConsumerRecord<String, String>) {
logger.debug { "Received response: ${record.key()}, ${record.value()}" }
val responseMessage = ProxyServerResponseMessage.createInstanceFromString(record.value())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ package org.gxf.soapbridge.kafka.properties

import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties("topics")
@ConfigurationProperties("kafka")
class TopicsConfigurationProperties(
val outgoing: RequestResponseTopics,
val incoming: RequestResponseTopics
val outgoing: OutgoingTopicsConfiguration,
)

class RequestResponseTopics(
val requests: String,
val responses: String
class OutgoingTopicsConfiguration(
val requests: OutgoingTopic,
val responses: OutgoingTopic
)

class OutgoingTopic(
val topic: String
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ProxyRequestKafkaSender(
) {
private val logger = KotlinLogging.logger {}

private val topic = topicConfiguration.outgoing.requests
private val topic = topicConfiguration.outgoing.requests.topic

fun send(requestMessage: ProxyServerRequestMessage) {
logger.debug { "SOAP payload: ${requestMessage.soapPayload} to $topic" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ProxyResponseKafkaSender(
) {
private val logger = KotlinLogging.logger {}

private val topic = topicConfiguration.outgoing.responses
private val topic = topicConfiguration.outgoing.responses.topic

fun send(responseMessage: ProxyServerResponseMessage) {
logger.debug { "SOAP payload: ${responseMessage.soapResponse} to $topic" }
Expand Down
26 changes: 14 additions & 12 deletions application/src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,22 @@ security:
provider: SunRsaSign
signature: SHA256withRSA

topics:
calls:
requests: proxy-server-calls-requests
responses: proxy-server-calls-responses
notifications:
requests: proxy-server-notification-requests
responses: proxy-server-notification-responses
kafka:
outgoing:
requests:
topic: proxy-server-calls-requests
responses:
topic: proxy-server-calls-responses
incoming:
requests:
topic: proxy-server-notification-requests
concurrency: 1
responses:
topic: proxy-server-notification-responses
concurrency: 1

soap:
notification:
host: localhost
port: 443
protocol: https
platform:
call-endpoint:
host: localhost
port: 443
protocol: https
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import javax.net.ssl.HttpsURLConnection;
import org.gxf.soapbridge.application.factories.HttpsUrlConnectionFactory;
import org.gxf.soapbridge.application.services.SigningService;
Expand All @@ -35,7 +36,7 @@ class SoapClientTest {
new SoapConfigurationProperties(
HostnameVerificationStrategy.BROWSER_COMPATIBLE_HOSTNAMES,
45,
"",
new HashMap<>(),
new SoapEndpointConfiguration("localhost", 443, "https"));

@InjectMocks SoapClient soapClient;
Expand Down

0 comments on commit b74935a

Please sign in to comment.