Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FDP-94: Add monitoring and small fixes #9

Merged
merged 12 commits into from
Dec 7, 2023
Merged
1 change: 1 addition & 0 deletions application/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter-engine")
testImplementation("org.junit.jupiter:junit-jupiter-params")
testImplementation("org.mockito:mockito-junit-jupiter")
testImplementation("org.assertj:assertj-core")

// Generate test and integration test reports
jacocoAggregation(project(":application"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.time.Duration


@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ComponentScan(basePackages = ["org.gxf.soapbridge"])
@EmbeddedKafka(topics = ["requests", "responses"])
class EndToEndTest(
@LocalServerPort private val soapPort: Int,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package org.gxf.soapbridge.application.services;

import org.gxf.soapbridge.soap.clients.Connection;
import org.gxf.soapbridge.soap.exceptions.ConnectionNotFoundInCacheException;
import org.gxf.soapbridge.valueobjects.ProxyServerResponseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,48 +16,50 @@
@Service
public class ClientCommunicationService {

private static final Logger LOGGER = LoggerFactory.getLogger(ClientCommunicationService.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ClientCommunicationService.class);

/** Service used to cache incoming connections from client applications. */
private final ConnectionCacheService connectionCacheService;
/**
* Service used to cache incoming connections from client applications.
*/
private final ConnectionCacheService connectionCacheService;

/** Service used to sign and/or verify the content of queue messages. */
private final SigningService signingService;
/**
* Service used to sign and/or verify the content of queue messages.
*/
private final SigningService signingService;

public ClientCommunicationService(
final ConnectionCacheService connectionCacheService, final SigningService signingService) {
this.connectionCacheService = connectionCacheService;
this.signingService = signingService;
}
public ClientCommunicationService(
final ConnectionCacheService connectionCacheService, final SigningService signingService) {
this.connectionCacheService = connectionCacheService;
this.signingService = signingService;
}

/**
* Process an incoming queue message. The content of the message has to be verified by the {@link
* SigningService}. Then a response from GXF will set for the pending connection from a client.
*
* @param proxyServerResponseMessage The incoming queue message to process.
*/
public void handleIncomingResponse(final ProxyServerResponseMessage proxyServerResponseMessage) {
final boolean isValid =
signingService.verifyContent(
proxyServerResponseMessage.constructString(),
proxyServerResponseMessage.getSignature());

/**
* Process an incoming queue message. The content of the message has to be verified by the {@link
* SigningService}. Then a response from GXF will set for the pending connection from a client.
*
* @param proxyServerResponseMessage The incoming queue message to process.
*/
public void handleIncomingResponse(final ProxyServerResponseMessage proxyServerResponseMessage) {
final boolean isValid =
signingService.verifyContent(
proxyServerResponseMessage.constructString(),
proxyServerResponseMessage.getSignature());
final Connection connection =
connectionCacheService.findConnection(proxyServerResponseMessage.getConnectionId());

if (connection == null) {
LOGGER.error("No connection found in cache for id: {}", proxyServerResponseMessage.getConnectionId());
return;
}

try {
final Connection connection =
connectionCacheService.findConnection(proxyServerResponseMessage.getConnectionId());
if (connection != null) {
if (isValid) {
LOGGER.debug("Connection valid, set SOAP response");
connection.setSoapResponse(proxyServerResponseMessage.getSoapResponse());
LOGGER.debug("Connection valid, set SOAP response");
connection.setSoapResponse(proxyServerResponseMessage.getSoapResponse());
} else {
LOGGER.error("ProxyServerResponseMessage failed to pass security check.");
connection.setSoapResponse("Security check has failed.");
LOGGER.error("ProxyServerResponseMessage failed to pass security check.");
connection.setSoapResponse("Security check has failed.");
}
} else {
LOGGER.error("No connection found in cache for id.");
}
} catch (final ConnectionNotFoundInCacheException e) {
LOGGER.error("ConnectionNotFoundInCacheException", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package org.gxf.soapbridge.application.services;

import java.util.concurrent.ConcurrentHashMap;

import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import org.gxf.soapbridge.monitoring.MonitoringService;
import org.gxf.soapbridge.soap.clients.Connection;
import org.gxf.soapbridge.soap.exceptions.ConnectionNotFoundInCacheException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
Expand All @@ -21,6 +24,17 @@ public class ConnectionCacheService {
*/
private static final ConcurrentHashMap<String, Connection> cache = new ConcurrentHashMap<>();

private final MonitoringService monitoringService;

public ConnectionCacheService(MonitoringService monitoringService) {
this.monitoringService = monitoringService;
}

@PostConstruct
public void postConstructor() {
monitoringService.monitorCacheSize(cache);
}

/**
* Creates a connection and puts it in the cache.
*
Expand All @@ -39,19 +53,12 @@ public Connection cacheConnection() {
*
* @param connectionId The key for the {@link Connection} instance obtained by calling {@link
* ConnectionCacheService#cacheConnection()}.
* @return A {@link Connection} instance.
* @throws ConnectionNotFoundInCacheException In case the connection is not present in the {@link
* ConnectionCacheService#cache}.
* @return A {@link Connection} instance. If no connection with the id is present return null.
*/
public Connection findConnection(final String connectionId)
throws ConnectionNotFoundInCacheException {
@Nullable
public Connection findConnection(final String connectionId) {
LOGGER.debug("Trying to find connection with connectionId: {}", connectionId);
final Connection connection = cache.get(connectionId);
if (connection == null) {
throw new ConnectionNotFoundInCacheException(
String.format("Unable to find connection for connectionId: %s", connectionId));
}
return connection;
return cache.get(connectionId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import org.gxf.soapbridge.application.services.ConnectionCacheService;
import org.gxf.soapbridge.application.services.SigningService;
import org.gxf.soapbridge.configuration.properties.SoapConfigurationProperties;
import org.gxf.soapbridge.kafka.senders.ProxyRequestKafkaSender;
import org.gxf.soapbridge.monitoring.MonitoringService;
import org.gxf.soapbridge.soap.clients.Connection;
import org.gxf.soapbridge.soap.exceptions.ConnectionNotFoundInCacheException;
import org.gxf.soapbridge.soap.exceptions.ProxyServerException;
import org.gxf.soapbridge.valueobjects.ProxyServerRequestMessage;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -57,16 +58,20 @@ public class SoapEndpoint implements HttpRequestHandler {
/** Map of time-outs for specific functions. */
private final Map<String, Integer> customTimeOutsMap;

private final MonitoringService monitoringService;

public SoapEndpoint(
final ConnectionCacheService connectionCacheService,
final SoapConfigurationProperties soapConfiguration,
final ProxyRequestKafkaSender proxyRequestsSender,
final SigningService signingService) {
final ConnectionCacheService connectionCacheService,
final SoapConfigurationProperties soapConfiguration,
final ProxyRequestKafkaSender proxyRequestsSender,
final SigningService signingService,
MonitoringService monitoringService) {
this.connectionCacheService = connectionCacheService;
this.soapConfiguration = soapConfiguration;
this.proxyRequestsSender = proxyRequestsSender;
this.signingService = signingService;
customTimeOutsMap = soapConfiguration.getCustomTimeouts();
this.monitoringService = monitoringService;
}

/** Handles incoming SOAP requests. */
Expand All @@ -75,6 +80,7 @@ public void handleRequest(
@NotNull final HttpServletRequest request, @NotNull final HttpServletResponse response)
throws ServletException, IOException {

Instant startTime = Instant.now();
// For debugging, print all headers and parameters.
LOGGER.debug("Start of SoapEndpoint.handleRequest()");
logHeaderValues(request);
Expand All @@ -89,6 +95,7 @@ public void handleRequest(
final String soapPayload = readSoapPayload(request);
if (soapPayload == null) {
LOGGER.error("Unable to read SOAP request, returning 500.");
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
return;
}
Expand All @@ -102,6 +109,7 @@ public void handleRequest(
}
if (organisationName == null) {
LOGGER.error("Unable to find client certificate, returning 500.");
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
return;
}
Expand All @@ -121,6 +129,7 @@ public void handleRequest(
requestMessage.setSignature(signature);
} catch (final ProxyServerException e) {
LOGGER.error("Unable to sign message or set security key", e);
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
connectionCacheService.removeConnection(connectionId);
return;
Expand All @@ -142,12 +151,14 @@ public void handleRequest(
final boolean responseReceived = newConnection.waitForResponseReceived(timeout);
if (!responseReceived) {
LOGGER.error("No response received within the specified timeout of {} seconds", timeout);
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
connectionCacheService.removeConnection(connectionId);
return;
}
} catch (final InterruptedException e) {
LOGGER.error("Error while waiting for response", e);
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mogelijk is het nuttiger om errors te tellen. Eventueel naast het meten van de duratie.

createErrorResponse(response);
connectionCacheService.removeConnection(connectionId);
Thread.currentThread().interrupt();
Expand All @@ -157,10 +168,12 @@ public void handleRequest(
final String soap = readResponse(connectionId);
if (soap == null) {
LOGGER.error("Unable to read SOAP response: null");
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
} else {
LOGGER.debug("Request handled, trying to send response...");
createSuccessFulResponse(response, soap);
monitoringService.recordConnectionTime(startTime, request.getContextPath(), true);
}

LOGGER.debug(
Expand Down Expand Up @@ -229,14 +242,16 @@ private Integer shouldUseCustomTimeOut(final String soapPayload) {

private String readResponse(final String connectionId) throws ServletException {
final String soap;
try {
final Connection connection = connectionCacheService.findConnection(connectionId);
soap = connection.getSoapResponse();
connectionCacheService.removeConnection(connectionId);
} catch (final ConnectionNotFoundInCacheException e) {
LOGGER.error("Unexpected error while trying to find a cached connection", e);

final Connection connection = connectionCacheService.findConnection(connectionId);

if (connection == null) {
LOGGER.error("Unexpected error while trying to find a cached connection for id: {}", connectionId);
throw new ServletException("Unable to obtain response");
}

soap = connection.getSoapResponse();
connectionCacheService.removeConnection(connectionId);
return soap;
}

Expand Down

This file was deleted.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is dit voldoende om ook bijv. fail-rates uit te halen? Of moeten we wat counters toeveoegen?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is het überhaupt nodig om fouten te timen? Kunnen we niet gerichter timeouts, technische exceptions etc. 'counten'?

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.gxf.soapbridge.monitoring

import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Timer
import org.springframework.stereotype.Service
import java.time.Duration
import java.time.Instant

@Service
class MonitoringService(
private val registry: MeterRegistry
) {

companion object {
private const val METRIC_PREFIX = "gxf.soap.bridge"
const val CACHE_SIZE_METRIC = "${METRIC_PREFIX}.cache.size"
const val CONNECTION_TIMER_METRIC = "${METRIC_PREFIX}.request.timer"

const val CONNECTION_TIMER_CONTEXT_TAG = "context"
const val CONNECTION_TIMER_SUCCESSFUL_TAG = "successful"

}

/**
* Creates a gauge to monitor the size of a cache.
*
* @param cache The cache to monitor, represented as a Map.
* @return A Gauge object that measures the size of the cache.
*/
fun monitorCacheSize(cache: Map<*, *>) =
Gauge
.builder(CACHE_SIZE_METRIC, cache) { it.size.toDouble() }
.register(registry)

/**
* Records the connection time for a request.
*
* The timer also counts the amount of requests handled.
*
* @param startTime The start time of the request.
* @param context The context of the request.
* @param successful Flag indicating if the request was successful.
*/
fun recordConnectionTime(startTime: Instant, context: String, successful: Boolean) {
sanderv marked this conversation as resolved.
Show resolved Hide resolved
val duration = Duration.between(startTime, Instant.now())

Timer
.builder(CONNECTION_TIMER_METRIC)
.description("The time it takes to handle an incoming soap request")
.tag(CONNECTION_TIMER_CONTEXT_TAG, context)
.tag(CONNECTION_TIMER_SUCCESSFUL_TAG, successful.toString())
.register(registry)
.record(duration)
}
}
Loading
Loading