From f1b06a71c5b14ac689b4aa3424ad106bf6e55f65 Mon Sep 17 00:00:00 2001 From: Joris Mancini Date: Thu, 12 Dec 2024 17:50:24 +0100 Subject: [PATCH 1/3] feat: redirect build failures on rabbit dlq Signed-off-by: Joris Mancini --- .../modification/server/BuildException.java | 17 ++++++++ .../service/BuildFailedPublisherService.java | 41 ------------------- .../server/service/BuildWorkerService.java | 16 +++----- src/main/resources/config/application.yaml | 16 ++++++-- .../server/service/BuildTest.java | 11 +---- 5 files changed, 36 insertions(+), 65 deletions(-) create mode 100644 src/main/java/org/gridsuite/modification/server/BuildException.java delete mode 100644 src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java diff --git a/src/main/java/org/gridsuite/modification/server/BuildException.java b/src/main/java/org/gridsuite/modification/server/BuildException.java new file mode 100644 index 000000000..29ad16c59 --- /dev/null +++ b/src/main/java/org/gridsuite/modification/server/BuildException.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package org.gridsuite.modification.server; + +/** + * @author Joris Mancini + */ +public class BuildException extends RuntimeException { + public BuildException(String message, Throwable e) { + super(message, e); + } +} diff --git a/src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java b/src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java deleted file mode 100644 index 06289dcdb..000000000 --- a/src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (c) 2022, RTE (http://www.rte-france.com) - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -package org.gridsuite.modification.server.service; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Service; - -@Service -public class BuildFailedPublisherService { - - private static final String CATEGORY_BROKER_OUTPUT = BuildFailedPublisherService.class.getName() + ".output-broker-messages"; - - private static final Logger LOGGER = LoggerFactory.getLogger(CATEGORY_BROKER_OUTPUT); - - @Autowired - private StreamBridge failedMessagePublisher; - - public void publishFail(String receiver, String failMessage) { - publish(receiver, failMessage); - } - - private void publish(String receiver, String failMessage) { - Message message = MessageBuilder - .withPayload("") - .setHeader("receiver", receiver) - .setHeader("message", failMessage) - .build(); - LOGGER.debug("Sending message : {}", message); - failedMessagePublisher.send("publishFailedBuild-out-0", message); - } -} diff --git a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java index 157e06616..32da7bcd8 100644 --- a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java +++ b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import lombok.NonNull; +import org.gridsuite.modification.server.BuildException; import org.gridsuite.modification.server.dto.BuildInfos; import org.gridsuite.modification.server.dto.NetworkModificationResult; import org.slf4j.Logger; @@ -46,8 +47,6 @@ public class BuildWorkerService { private final BuildStoppedPublisherService stoppedPublisherService; - private final BuildFailedPublisherService failedPublisherService; - private final Map> futures = new ConcurrentHashMap<>(); private final Map cancelBuildRequests = new ConcurrentHashMap<>(); @@ -61,12 +60,10 @@ public class BuildWorkerService { public BuildWorkerService(@NonNull NetworkModificationService networkModificationService, @NonNull ObjectMapper objectMapper, - @NonNull BuildStoppedPublisherService stoppedPublisherService, - @NonNull BuildFailedPublisherService failedPublisherService) { + @NonNull BuildStoppedPublisherService stoppedPublisherService) { this.networkModificationService = networkModificationService; this.objectMapper = objectMapper; this.stoppedPublisherService = stoppedPublisherService; - this.failedPublisherService = failedPublisherService; } private CompletableFuture execBuildVariant(BuildExecContext execContext, BuildInfos buildInfos) { @@ -98,11 +95,11 @@ private CompletableFuture execBuildVariant(BuildExecC @Bean public Consumer> consumeBuild() { return message -> { - BuildExecContext execContext = null; + BuildExecContext execContext; try { execContext = BuildExecContext.fromMessage(message, objectMapper); } catch (Exception e) { - LOGGER.error("Error retrieving message in consumeBuild", e); + throw new BuildException("Failed to read build message", e); } startBuild(Objects.requireNonNull(execContext)); }; @@ -124,12 +121,9 @@ private void startBuild(BuildExecContext execContext) { } catch (CancellationException e) { stoppedPublisherService.publishCancel(execContext.getReceiver(), CANCEL_MESSAGE); } catch (InterruptedException e) { - LOGGER.error(FAIL_MESSAGE, e); - failedPublisherService.publishFail(execContext.getReceiver(), FAIL_MESSAGE + " : " + e.getMessage()); Thread.currentThread().interrupt(); } catch (Exception e) { - LOGGER.error(FAIL_MESSAGE, e); - failedPublisherService.publishFail(execContext.getReceiver(), FAIL_MESSAGE + " : " + e.getMessage()); + throw new BuildException("Node build failed", e); } finally { futures.remove(execContext.getReceiver()); cancelBuildRequests.remove(execContext.getReceiver()); diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml index 2e05b56e6..a62711220 100644 --- a/src/main/resources/config/application.yaml +++ b/src/main/resources/config/application.yaml @@ -21,6 +21,7 @@ spring: group: buildGroup consumer: concurrency: 2 + max-attempts: 1 publishBuild-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.run publishResultBuild-out-0: @@ -31,9 +32,18 @@ spring: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.cancel publishStoppedBuild-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.stopped - publishFailedBuild-out-0: - destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.failed - output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0;publishFailedBuild-out-0 + output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0 + rabbit: + bindings: + consumeBuild-in-0: + consumer: + auto-bind-dlq: true + dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx + dead-letter-routing-key: "#" + dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlq + quorum: + enabled: true + delivery-limit: 2 powsybl-ws: database: diff --git a/src/test/java/org/gridsuite/modification/server/service/BuildTest.java b/src/test/java/org/gridsuite/modification/server/service/BuildTest.java index bd6b8150c..646fb3474 100644 --- a/src/test/java/org/gridsuite/modification/server/service/BuildTest.java +++ b/src/test/java/org/gridsuite/modification/server/service/BuildTest.java @@ -72,9 +72,6 @@ import static com.powsybl.iidm.network.ReactiveLimitsKind.MIN_MAX; import static org.gridsuite.modification.server.impacts.TestImpactUtils.*; import static org.gridsuite.modification.server.service.BuildWorkerService.CANCEL_MESSAGE; -import static org.gridsuite.modification.server.service.BuildWorkerService.FAIL_MESSAGE; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -127,9 +124,6 @@ class BuildTest { @Value("${spring.cloud.stream.bindings.publishStoppedBuild-out-0.destination}") private String buildStoppedDestination; - @Value("${spring.cloud.stream.bindings.publishFailedBuild-out-0.destination}") - private String buildFailedDestination; - @Autowired private OutputDestination output; @@ -910,9 +904,6 @@ void runBuildWithReportErrorTest(final MockWebServer server) throws Exception { assertTrue(TestUtils.getRequestsDone(1, server).stream().anyMatch(r -> r.matches("/v1/reports/.*"))); assertNull(output.receive(TIMEOUT, buildResultDestination)); - Message message = output.receive(TIMEOUT * 3, buildFailedDestination); - assertEquals("me", message.getHeaders().get("receiver")); - assertThat((String) message.getHeaders().get("message"), startsWith(FAIL_MESSAGE)); Message buildMessage = output.receive(TIMEOUT, consumeBuildDestination); assertNotNull(buildMessage); assertEquals("me", buildMessage.getHeaders().get("receiver")); @@ -970,7 +961,7 @@ private void testNetworkModificationsCount(UUID groupUuid, int actualSize) { @AfterEach void tearDown(final MockWebServer server) { - List destinations = List.of(consumeBuildDestination, cancelBuildDestination, buildResultDestination, buildStoppedDestination, buildFailedDestination); + List destinations = List.of(consumeBuildDestination, cancelBuildDestination, buildResultDestination, buildStoppedDestination); TestUtils.assertQueuesEmptyThenClear(destinations, output); try { TestUtils.assertServerRequestsEmptyThenShutdown(server); From a3580b61444186e925cc98df5cd746968754ea5d Mon Sep 17 00:00:00 2001 From: Joris Mancini Date: Fri, 13 Dec 2024 16:09:32 +0100 Subject: [PATCH 2/3] feat: avoid InterruptedException and rename Signed-off-by: Joris Mancini --- .../modification/server/service/BuildWorkerService.java | 4 +--- src/main/resources/config/application.yaml | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java index 32da7bcd8..9ac2fbde4 100644 --- a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java +++ b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java @@ -110,7 +110,7 @@ private void startBuild(BuildExecContext execContext) { BuildInfos buildInfos = execContext.getBuildInfos(); CompletableFuture future = execBuildVariant(execContext, buildInfos); NetworkModificationResult result; - if (future != null && (result = future.get()) != null) { // result available + if (future != null && (result = future.join()) != null) { // result available notificationService.emitBuildResultMessage(result, execContext.getReceiver()); LOGGER.info("Build complete on node '{}'", execContext.getReceiver()); } else { // result not available : stop build request @@ -120,8 +120,6 @@ private void startBuild(BuildExecContext execContext) { } } catch (CancellationException e) { stoppedPublisherService.publishCancel(execContext.getReceiver(), CANCEL_MESSAGE); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } catch (Exception e) { throw new BuildException("Node build failed", e); } finally { diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml index a62711220..9fae09985 100644 --- a/src/main/resources/config/application.yaml +++ b/src/main/resources/config/application.yaml @@ -39,8 +39,8 @@ spring: consumer: auto-bind-dlq: true dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx - dead-letter-routing-key: "#" - dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlq + dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx.dlq + dead-letter-exchange-type: topic quorum: enabled: true delivery-limit: 2 From 334d8ba5b935d66c45347fc98688ee7cf421ec2c Mon Sep 17 00:00:00 2001 From: Joris Mancini Date: Fri, 20 Dec 2024 12:21:57 +0100 Subject: [PATCH 3/3] test: add test for incorrect input message Signed-off-by: Joris Mancini --- .../service/BuildWorkerServiceTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java diff --git a/src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java b/src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java new file mode 100644 index 000000000..024aa9c18 --- /dev/null +++ b/src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package org.gridsuite.modification.server.service; + +import org.gridsuite.modification.server.BuildException; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.support.MessageBuilder; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SpringBootTest +class BuildWorkerServiceTest { + + @Autowired + private BuildWorkerService buildWorkerService; + + @Test + void testConsumeBuildWithMalformedInput() { + assertThrows( + BuildException.class, + () -> buildWorkerService.consumeBuild().accept(MessageBuilder.withPayload("wrong message").build()), + "Failed to read build message"); + } +}