diff --git a/src/main/java/org/gridsuite/modification/server/BuildException.java b/src/main/java/org/gridsuite/modification/server/BuildException.java new file mode 100644 index 000000000..29ad16c59 --- /dev/null +++ b/src/main/java/org/gridsuite/modification/server/BuildException.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package org.gridsuite.modification.server; + +/** + * @author Joris Mancini + */ +public class BuildException extends RuntimeException { + public BuildException(String message, Throwable e) { + super(message, e); + } +} diff --git a/src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java b/src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java deleted file mode 100644 index 06289dcdb..000000000 --- a/src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (c) 2022, RTE (http://www.rte-france.com) - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -package org.gridsuite.modification.server.service; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Service; - -@Service -public class BuildFailedPublisherService { - - private static final String CATEGORY_BROKER_OUTPUT = BuildFailedPublisherService.class.getName() + ".output-broker-messages"; - - private static final Logger LOGGER = LoggerFactory.getLogger(CATEGORY_BROKER_OUTPUT); - - @Autowired - private StreamBridge failedMessagePublisher; - - public void publishFail(String receiver, String failMessage) { - publish(receiver, failMessage); - } - - private void publish(String receiver, String failMessage) { - Message message = MessageBuilder - .withPayload("") - .setHeader("receiver", receiver) - .setHeader("message", failMessage) - .build(); - LOGGER.debug("Sending message : {}", message); - failedMessagePublisher.send("publishFailedBuild-out-0", message); - } -} diff --git a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java index 157e06616..9ac2fbde4 100644 --- a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java +++ b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import lombok.NonNull; +import org.gridsuite.modification.server.BuildException; import org.gridsuite.modification.server.dto.BuildInfos; import org.gridsuite.modification.server.dto.NetworkModificationResult; import org.slf4j.Logger; @@ -46,8 +47,6 @@ public class BuildWorkerService { private final BuildStoppedPublisherService stoppedPublisherService; - private final BuildFailedPublisherService failedPublisherService; - private final Map> futures = new ConcurrentHashMap<>(); private final Map cancelBuildRequests = new ConcurrentHashMap<>(); @@ -61,12 +60,10 @@ public class BuildWorkerService { public BuildWorkerService(@NonNull NetworkModificationService networkModificationService, @NonNull ObjectMapper objectMapper, - @NonNull BuildStoppedPublisherService stoppedPublisherService, - @NonNull BuildFailedPublisherService failedPublisherService) { + @NonNull BuildStoppedPublisherService stoppedPublisherService) { this.networkModificationService = networkModificationService; this.objectMapper = objectMapper; this.stoppedPublisherService = stoppedPublisherService; - this.failedPublisherService = failedPublisherService; } private CompletableFuture execBuildVariant(BuildExecContext execContext, BuildInfos buildInfos) { @@ -98,11 +95,11 @@ private CompletableFuture execBuildVariant(BuildExecC @Bean public Consumer> consumeBuild() { return message -> { - BuildExecContext execContext = null; + BuildExecContext execContext; try { execContext = BuildExecContext.fromMessage(message, objectMapper); } catch (Exception e) { - LOGGER.error("Error retrieving message in consumeBuild", e); + throw new BuildException("Failed to read build message", e); } startBuild(Objects.requireNonNull(execContext)); }; @@ -113,7 +110,7 @@ private void startBuild(BuildExecContext execContext) { BuildInfos buildInfos = execContext.getBuildInfos(); CompletableFuture future = execBuildVariant(execContext, buildInfos); NetworkModificationResult result; - if (future != null && (result = future.get()) != null) { // result available + if (future != null && (result = future.join()) != null) { // result available notificationService.emitBuildResultMessage(result, execContext.getReceiver()); LOGGER.info("Build complete on node '{}'", execContext.getReceiver()); } else { // result not available : stop build request @@ -123,13 +120,8 @@ private void startBuild(BuildExecContext execContext) { } } catch (CancellationException e) { stoppedPublisherService.publishCancel(execContext.getReceiver(), CANCEL_MESSAGE); - } catch (InterruptedException e) { - LOGGER.error(FAIL_MESSAGE, e); - failedPublisherService.publishFail(execContext.getReceiver(), FAIL_MESSAGE + " : " + e.getMessage()); - Thread.currentThread().interrupt(); } catch (Exception e) { - LOGGER.error(FAIL_MESSAGE, e); - failedPublisherService.publishFail(execContext.getReceiver(), FAIL_MESSAGE + " : " + e.getMessage()); + throw new BuildException("Node build failed", e); } finally { futures.remove(execContext.getReceiver()); cancelBuildRequests.remove(execContext.getReceiver()); diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml index 2e05b56e6..9fae09985 100644 --- a/src/main/resources/config/application.yaml +++ b/src/main/resources/config/application.yaml @@ -21,6 +21,7 @@ spring: group: buildGroup consumer: concurrency: 2 + max-attempts: 1 publishBuild-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.run publishResultBuild-out-0: @@ -31,9 +32,18 @@ spring: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.cancel publishStoppedBuild-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.stopped - publishFailedBuild-out-0: - destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.failed - output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0;publishFailedBuild-out-0 + output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0 + rabbit: + bindings: + consumeBuild-in-0: + consumer: + auto-bind-dlq: true + dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx + dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx.dlq + dead-letter-exchange-type: topic + quorum: + enabled: true + delivery-limit: 2 powsybl-ws: database: diff --git a/src/test/java/org/gridsuite/modification/server/service/BuildTest.java b/src/test/java/org/gridsuite/modification/server/service/BuildTest.java index bd6b8150c..646fb3474 100644 --- a/src/test/java/org/gridsuite/modification/server/service/BuildTest.java +++ b/src/test/java/org/gridsuite/modification/server/service/BuildTest.java @@ -72,9 +72,6 @@ import static com.powsybl.iidm.network.ReactiveLimitsKind.MIN_MAX; import static org.gridsuite.modification.server.impacts.TestImpactUtils.*; import static org.gridsuite.modification.server.service.BuildWorkerService.CANCEL_MESSAGE; -import static org.gridsuite.modification.server.service.BuildWorkerService.FAIL_MESSAGE; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -127,9 +124,6 @@ class BuildTest { @Value("${spring.cloud.stream.bindings.publishStoppedBuild-out-0.destination}") private String buildStoppedDestination; - @Value("${spring.cloud.stream.bindings.publishFailedBuild-out-0.destination}") - private String buildFailedDestination; - @Autowired private OutputDestination output; @@ -910,9 +904,6 @@ void runBuildWithReportErrorTest(final MockWebServer server) throws Exception { assertTrue(TestUtils.getRequestsDone(1, server).stream().anyMatch(r -> r.matches("/v1/reports/.*"))); assertNull(output.receive(TIMEOUT, buildResultDestination)); - Message message = output.receive(TIMEOUT * 3, buildFailedDestination); - assertEquals("me", message.getHeaders().get("receiver")); - assertThat((String) message.getHeaders().get("message"), startsWith(FAIL_MESSAGE)); Message buildMessage = output.receive(TIMEOUT, consumeBuildDestination); assertNotNull(buildMessage); assertEquals("me", buildMessage.getHeaders().get("receiver")); @@ -970,7 +961,7 @@ private void testNetworkModificationsCount(UUID groupUuid, int actualSize) { @AfterEach void tearDown(final MockWebServer server) { - List destinations = List.of(consumeBuildDestination, cancelBuildDestination, buildResultDestination, buildStoppedDestination, buildFailedDestination); + List destinations = List.of(consumeBuildDestination, cancelBuildDestination, buildResultDestination, buildStoppedDestination); TestUtils.assertQueuesEmptyThenClear(destinations, output); try { TestUtils.assertServerRequestsEmptyThenShutdown(server); diff --git a/src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java b/src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java new file mode 100644 index 000000000..024aa9c18 --- /dev/null +++ b/src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package org.gridsuite.modification.server.service; + +import org.gridsuite.modification.server.BuildException; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.support.MessageBuilder; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SpringBootTest +class BuildWorkerServiceTest { + + @Autowired + private BuildWorkerService buildWorkerService; + + @Test + void testConsumeBuildWithMalformedInput() { + assertThrows( + BuildException.class, + () -> buildWorkerService.consumeBuild().accept(MessageBuilder.withPayload("wrong message").build()), + "Failed to read build message"); + } +}