diff --git a/core/deployment/src/main/java/io/quarkus/deployment/QuarkusAugmentor.java b/core/deployment/src/main/java/io/quarkus/deployment/QuarkusAugmentor.java index 2a71c00b6eb44..99df303269ebf 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/QuarkusAugmentor.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/QuarkusAugmentor.java @@ -32,6 +32,7 @@ import io.quarkus.deployment.builditem.LiveReloadBuildItem; import io.quarkus.deployment.builditem.QuarkusBuildCloseablesBuildItem; import io.quarkus.deployment.builditem.RawCommandLineArgumentsBuildItem; +import io.quarkus.deployment.builditem.RuntimeApplicationShutdownBuildItem; import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.deployment.pkg.builditem.BuildSystemTargetBuildItem; import io.quarkus.dev.spi.DevModeType; @@ -128,6 +129,9 @@ public BuildResult run() throws Exception { for (Consumer i : buildChainCustomizers) { i.accept(chainBuilder); } + if (launchMode.isDevOrTest()) { + chainBuilder.addFinal(RuntimeApplicationShutdownBuildItem.class); + } final ArchiveRootBuildItem.Builder rootBuilder = ArchiveRootBuildItem.builder(); if (root != null) { diff --git a/core/deployment/src/main/java/io/quarkus/deployment/builditem/RuntimeApplicationShutdownBuildItem.java b/core/deployment/src/main/java/io/quarkus/deployment/builditem/RuntimeApplicationShutdownBuildItem.java new file mode 100644 index 0000000000000..1145e33f05a2f --- /dev/null +++ b/core/deployment/src/main/java/io/quarkus/deployment/builditem/RuntimeApplicationShutdownBuildItem.java @@ -0,0 +1,29 @@ +package io.quarkus.deployment.builditem; + +import org.jboss.logging.Logger; + +import io.quarkus.builder.item.MultiBuildItem; + +/** + * Build Item that can be used to queue shutdown tasks that are run when the runtime application shuts down. + * + * This is similar to {@link ShutdownContextBuildItem} however it applies to tasks on the 'build' side, so if a processor + * wants to close something after the application has completed this item lets it do this. + * + * This has no effect for production applications, and is only useful in dev/test mode. The main use case for this is + * for shutting down deployment side test utilities at the end of a test run. + */ +public final class RuntimeApplicationShutdownBuildItem extends MultiBuildItem { + + private static final Logger log = Logger.getLogger(RuntimeApplicationShutdownBuildItem.class); + + private final Runnable closeTask; + + public RuntimeApplicationShutdownBuildItem(Runnable closeTask) { + this.closeTask = closeTask; + } + + public Runnable getCloseTask() { + return closeTask; + } +} diff --git a/core/deployment/src/main/java/io/quarkus/deployment/dev/testing/TestTracingProcessor.java b/core/deployment/src/main/java/io/quarkus/deployment/dev/testing/TestTracingProcessor.java index 7e2777caaa5e4..89daef853919d 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/dev/testing/TestTracingProcessor.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/dev/testing/TestTracingProcessor.java @@ -25,6 +25,7 @@ import io.quarkus.deployment.builditem.ServiceStartBuildItem; import io.quarkus.deployment.logging.LogCleanupFilterBuildItem; import io.quarkus.dev.spi.DevModeType; +import io.quarkus.dev.testing.ContinuousTestingSharedStateManager; import io.quarkus.dev.testing.TracingHandler; import io.quarkus.gizmo.Gizmo; @@ -82,6 +83,8 @@ void startTesting(TestConfig config, LiveReloadBuildItem liveReloadBuildItem, testSupport.stop(); } } + QuarkusClassLoader cl = (QuarkusClassLoader) Thread.currentThread().getContextClassLoader(); + ((QuarkusClassLoader) cl.parent()).addCloseTask(ContinuousTestingSharedStateManager::reset); } @BuildStep(onlyIf = IsTest.class) diff --git a/core/deployment/src/main/java/io/quarkus/runner/bootstrap/StartupActionImpl.java b/core/deployment/src/main/java/io/quarkus/runner/bootstrap/StartupActionImpl.java index b0710e1efbb24..9e41b7d060350 100644 --- a/core/deployment/src/main/java/io/quarkus/runner/bootstrap/StartupActionImpl.java +++ b/core/deployment/src/main/java/io/quarkus/runner/bootstrap/StartupActionImpl.java @@ -31,6 +31,7 @@ import io.quarkus.deployment.builditem.GeneratedClassBuildItem; import io.quarkus.deployment.builditem.GeneratedResourceBuildItem; import io.quarkus.deployment.builditem.MainClassBuildItem; +import io.quarkus.deployment.builditem.RuntimeApplicationShutdownBuildItem; import io.quarkus.deployment.builditem.TransformedClassesBuildItem; import io.quarkus.deployment.configuration.RunTimeConfigurationGenerator; import io.quarkus.dev.appstate.ApplicationStateNotification; @@ -106,6 +107,14 @@ public void run() { if (ApplicationStateNotification.getState() == ApplicationStateNotification.State.INITIAL) { ApplicationStateNotification.notifyStartupFailed(e); } + } finally { + for (var i : buildResult.consumeMulti(RuntimeApplicationShutdownBuildItem.class)) { + try { + i.getCloseTask().run(); + } catch (Throwable t) { + log.error("Failed to run close task", t); + } + } } } }, "Quarkus Main Thread"); @@ -173,6 +182,13 @@ public void accept(Integer integer) { } finally { runtimeClassLoader.close(); Thread.currentThread().setContextClassLoader(old); + for (var i : buildResult.consumeMulti(RuntimeApplicationShutdownBuildItem.class)) { + try { + i.getCloseTask().run(); + } catch (Throwable t) { + log.error("Failed to run close task", t); + } + } } } @@ -230,6 +246,14 @@ public void close() throws IOException { } } finally { ForkJoinClassLoading.setForkJoinClassLoader(ClassLoader.getSystemClassLoader()); + + for (var i : buildResult.consumeMulti(RuntimeApplicationShutdownBuildItem.class)) { + try { + i.getCloseTask().run(); + } catch (Throwable t) { + log.error("Failed to run close task", t); + } + } if (curatedApplication.getQuarkusBootstrap().getMode() == QuarkusBootstrap.Mode.TEST && !curatedApplication.getQuarkusBootstrap().isAuxiliaryApplication()) { //for tests we just always shut down the curated application, as it is only used once diff --git a/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java b/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java index c5f255c3ccebf..5130a50ad374a 100644 --- a/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java +++ b/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java @@ -80,6 +80,7 @@ public void handleHttpRequests(RoutingContext ctx) { try { byte[] mEvent = eventWriter.writeValueAsBytes(event); ctx.put(APIGatewayV2HTTPEvent.class.getName(), mEvent); + log.debugf("Putting message %s into the queue", requestId); queue.put(ctx); } catch (Exception e) { log.error("Publish failure", e); diff --git a/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java b/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java index 4a3665eaf2188..a89a964671d06 100644 --- a/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java +++ b/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java @@ -24,6 +24,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerResponse; import io.vertx.ext.web.RoutingContext; +import org.jboss.logging.Logger; public class MockRestEventServer extends MockEventServer { @@ -104,6 +105,7 @@ public void handleHttpRequests(RoutingContext ctx) { try { byte[] mEvent = eventWriter.writeValueAsBytes(event); ctx.put(AwsProxyRequest.class.getName(), mEvent); + log.debugf("Putting message %s into the queue", requestId); queue.put(ctx); } catch (Exception e) { log.error("Publish failure", e); diff --git a/extensions/amazon-lambda/common-deployment/src/main/java/io/quarkus/amazon/lambda/deployment/DevServicesLambdaProcessor.java b/extensions/amazon-lambda/common-deployment/src/main/java/io/quarkus/amazon/lambda/deployment/DevServicesLambdaProcessor.java index 6b41e5100b0c9..8a8eaea1e625a 100644 --- a/extensions/amazon-lambda/common-deployment/src/main/java/io/quarkus/amazon/lambda/deployment/DevServicesLambdaProcessor.java +++ b/extensions/amazon-lambda/common-deployment/src/main/java/io/quarkus/amazon/lambda/deployment/DevServicesLambdaProcessor.java @@ -14,9 +14,11 @@ import io.quarkus.deployment.IsNormal; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.Produce; import io.quarkus.deployment.annotations.Record; -import io.quarkus.deployment.builditem.DevServicesNativeConfigResultBuildItem; +import io.quarkus.deployment.builditem.DevServicesConfigResultBuildItem; import io.quarkus.deployment.builditem.LaunchModeBuildItem; +import io.quarkus.deployment.builditem.RuntimeApplicationShutdownBuildItem; import io.quarkus.deployment.builditem.ServiceStartBuildItem; import io.quarkus.runtime.LaunchMode; @@ -47,12 +49,14 @@ private boolean legacyTestingEnabled() { } } + @Produce(ServiceStartBuildItem.class) @BuildStep(onlyIfNot = IsNormal.class) public void startEventServer(LaunchModeBuildItem launchMode, LambdaConfig config, Optional override, - BuildProducer devServicePropertiesProducer, - BuildProducer serviceStartBuildItemBuildProducer) throws Exception { + BuildProducer devServicePropertiesProducer, + BuildProducer runtimeApplicationShutdownBuildItemBuildProducer) + throws Exception { if (!launchMode.getLaunchMode().isDevOrTest()) return; if (legacyTestingEnabled()) @@ -73,9 +77,8 @@ public void startEventServer(LaunchModeBuildItem launchMode, startMode = launchMode.getLaunchMode(); server.start(port); String baseUrl = "localhost:" + port + MockEventServer.BASE_PATH; - System.setProperty(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, baseUrl); devServicePropertiesProducer.produce( - new DevServicesNativeConfigResultBuildItem(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, baseUrl)); + new DevServicesConfigResultBuildItem(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, baseUrl)); Runnable closeTask = () -> { if (server != null) { try { @@ -91,5 +94,8 @@ public void startEventServer(LaunchModeBuildItem launchMode, }; QuarkusClassLoader cl = (QuarkusClassLoader) Thread.currentThread().getContextClassLoader(); ((QuarkusClassLoader) cl.parent()).addCloseTask(closeTask); + if (launchMode.isTest()) { + runtimeApplicationShutdownBuildItemBuildProducer.produce(new RuntimeApplicationShutdownBuildItem(closeTask)); + } } } diff --git a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java index 1d7052ccc766d..eb861016d78ea 100644 --- a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java +++ b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java @@ -16,19 +16,23 @@ import com.fasterxml.jackson.databind.ObjectReader; import io.quarkus.runtime.Application; +import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.ShutdownContext; public abstract class AbstractLambdaPollLoop { private static final Logger log = Logger.getLogger(AbstractLambdaPollLoop.class); - private ObjectMapper objectMapper; - private ObjectReader cognitoIdReader; - private ObjectReader clientCtxReader; + private final ObjectMapper objectMapper; + private final ObjectReader cognitoIdReader; + private final ObjectReader clientCtxReader; + private final LaunchMode launchMode; - public AbstractLambdaPollLoop(ObjectMapper objectMapper, ObjectReader cognitoIdReader, ObjectReader clientCtxReader) { + public AbstractLambdaPollLoop(ObjectMapper objectMapper, ObjectReader cognitoIdReader, ObjectReader clientCtxReader, + LaunchMode launchMode) { this.objectMapper = objectMapper; this.cognitoIdReader = cognitoIdReader; this.clientCtxReader = clientCtxReader; + this.launchMode = launchMode; } protected abstract boolean isStream(); @@ -45,7 +49,7 @@ public void startPollLoop(ShutdownContext context) { @Override public void run() { try { - if (!LambdaHotReplacementRecorder.enabled) { + if (!LambdaHotReplacementRecorder.enabled && launchMode == LaunchMode.DEVELOPMENT) { // when running with continuous testing, this method fails // because currentApplication is not set when running as an // auxiliary application. So, just skip it if hot replacement enabled. @@ -81,7 +85,7 @@ public void run() { continue; } try { - if (LambdaHotReplacementRecorder.enabled) { + if (LambdaHotReplacementRecorder.enabled && launchMode == LaunchMode.DEVELOPMENT) { try { // do not interrupt during a hot replacement // as shutdown will abort and do nasty things. @@ -125,7 +129,7 @@ public void run() { if (abortGracefully(e)) { return; } - log.error("Failed to run lambda", e); + log.error("Failed to run lambda (" + launchMode + ")", e); postError(AmazonLambdaApi.invocationError(baseUrl, requestId), new FunctionError(e.getClass().getName(), e.getMessage())); @@ -134,7 +138,7 @@ public void run() { } catch (Exception e) { if (!abortGracefully(e)) - log.error("Error running lambda", e); + log.error("Error running lambda (" + launchMode + ")", e); Application app = Application.currentApplication(); if (app != null) { app.stop(); @@ -150,24 +154,24 @@ public void run() { } } catch (Exception e) { try { - log.error("Lambda init error", e); + log.error("Lambda init error (" + launchMode + ")", e); postError(AmazonLambdaApi.initError(baseUrl), new FunctionError(e.getClass().getName(), e.getMessage())); } catch (Exception ex) { - log.error("Failed to report init error", ex); + log.error("Failed to report init error (" + launchMode + ")", ex); } finally { // our main loop is done, time to shutdown Application app = Application.currentApplication(); if (app != null) { - log.error("Shutting down Quarkus application because of error"); + log.error("Shutting down Quarkus application because of error (" + launchMode + ")"); app.stop(); } } } finally { - log.info("Lambda polling thread complete"); + log.info("Lambda polling thread complete (" + launchMode + ")"); } } - }, "Lambda Thread"); + }, "Lambda Thread (" + launchMode + ")"); pollingThread.setDaemon(true); context.addShutdownTask(() -> { running.set(false); @@ -261,7 +265,7 @@ boolean abortGracefully(Exception ex) { // if we are running in test mode, or native mode outside of the lambda container, then don't output stack trace for socket errors boolean lambdaEnv = System.getenv("AWS_LAMBDA_RUNTIME_API") != null; - boolean testEnv = System.getProperty(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API) != null; + boolean testEnv = LaunchMode.current() == LaunchMode.TEST; boolean graceful = ((ex instanceof SocketException || ex instanceof ConnectException) && testEnv) || (ex instanceof UnknownHostException && !lambdaEnv); diff --git a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaApi.java b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaApi.java index ec1e528ab3875..5b04cbf30bde5 100644 --- a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaApi.java +++ b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaApi.java @@ -3,6 +3,10 @@ import java.net.MalformedURLException; import java.net.URL; +import org.eclipse.microprofile.config.ConfigProvider; + +import io.quarkus.runtime.LaunchMode; + /** * Various constants and util methods used for communication with the AWS API. */ @@ -80,13 +84,15 @@ static String functionVersion() { } public static boolean isTestMode() { - return System.getProperty(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API) != null; + //need this config check for native tests + return LaunchMode.current() == LaunchMode.TEST + || ConfigProvider.getConfig().getOptionalValue(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, String.class).isPresent(); } private static String runtimeApi() { - String testApi = System.getProperty(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API); - if (testApi != null) { - return testApi; + var testApi = ConfigProvider.getConfig().getOptionalValue(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, String.class); + if (testApi.isPresent()) { + return testApi.get(); } return System.getenv("AWS_LAMBDA_RUNTIME_API"); } diff --git a/extensions/amazon-lambda/deployment/src/main/java/io/quarkus/amazon/lambda/deployment/AmazonLambdaProcessor.java b/extensions/amazon-lambda/deployment/src/main/java/io/quarkus/amazon/lambda/deployment/AmazonLambdaProcessor.java index b755f0aae7f7b..7b14804bf9834 100644 --- a/extensions/amazon-lambda/deployment/src/main/java/io/quarkus/amazon/lambda/deployment/AmazonLambdaProcessor.java +++ b/extensions/amazon-lambda/deployment/src/main/java/io/quarkus/amazon/lambda/deployment/AmazonLambdaProcessor.java @@ -240,9 +240,10 @@ public void recordHandlerClass(List lambdas, @Record(value = ExecutionTime.RUNTIME_INIT) void startPoolLoop(AmazonLambdaRecorder recorder, ShutdownContextBuildItem shutdownContextBuildItem, + LaunchModeBuildItem launchModeBuildItem, List orderServicesFirst // try to order this after service recorders ) { - recorder.startPollLoop(shutdownContextBuildItem); + recorder.startPollLoop(shutdownContextBuildItem, launchModeBuildItem.getLaunchMode()); } @BuildStep @@ -253,7 +254,7 @@ void startPoolLoopDevOrTest(AmazonLambdaRecorder recorder, LaunchModeBuildItem launchModeBuildItem) { LaunchMode mode = launchModeBuildItem.getLaunchMode(); if (mode.isDevOrTest()) { - recorder.startPollLoop(shutdownContextBuildItem); + recorder.startPollLoop(shutdownContextBuildItem, mode); } } diff --git a/extensions/amazon-lambda/deployment/src/test/java/io/quarkus/amazon/lambda/deployment/testing/LambdaDevServicesContinuousTestingTestCase.java b/extensions/amazon-lambda/deployment/src/test/java/io/quarkus/amazon/lambda/deployment/testing/LambdaDevServicesContinuousTestingTestCase.java index 1565016006b69..8bd2c2f3b4a2e 100644 --- a/extensions/amazon-lambda/deployment/src/test/java/io/quarkus/amazon/lambda/deployment/testing/LambdaDevServicesContinuousTestingTestCase.java +++ b/extensions/amazon-lambda/deployment/src/test/java/io/quarkus/amazon/lambda/deployment/testing/LambdaDevServicesContinuousTestingTestCase.java @@ -6,7 +6,7 @@ import org.jboss.shrinkwrap.api.asset.StringAsset; import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.test.ContinuousTestingTestUtils; @@ -20,7 +20,9 @@ public class LambdaDevServicesContinuousTestingTestCase { public JavaArchive get() { return ShrinkWrap.create(JavaArchive.class) .addClasses(GreetingLambda.class, Person.class) - .addAsResource(new StringAsset(ContinuousTestingTestUtils.appProperties("")), + .addAsResource( + new StringAsset(ContinuousTestingTestUtils.appProperties( + "quarkus.log.category.\"io.quarkus.amazon.lambda.runtime\".level=DEBUG")), "application.properties"); } }).setTestArchiveProducer(new Supplier() { @@ -30,7 +32,8 @@ public JavaArchive get() { } }); - @Test + //run this twice, to make sure everything is cleaned up properly + @RepeatedTest(2) public void testLambda() throws Exception { ContinuousTestingTestUtils utils = new ContinuousTestingTestUtils(); var result = utils.waitForNextCompletion(); diff --git a/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java index cea6dfd37e231..db4f4086c1eab 100644 --- a/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java +++ b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java @@ -5,6 +5,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -24,21 +25,6 @@ public class MockEventServer implements Closeable { protected static final Logger log = Logger.getLogger(MockEventServer.class); public static final int DEFAULT_PORT = 8081; - public void start() { - int port = DEFAULT_PORT; - start(port); - } - - public void start(int port) { - this.port = port; - vertx = Vertx.vertx(); - httpServer = vertx.createHttpServer(); - router = Router.router(vertx); - setupRoutes(); - httpServer.requestHandler(router).listen(port).result(); - log.info("Mock Lambda Event Server Started"); - } - private Vertx vertx; private int port; protected HttpServer httpServer; @@ -50,11 +36,31 @@ public void start(int port) { public static final String INVOCATION = BASE_PATH + AmazonLambdaApi.API_PATH_INVOCATION; public static final String NEXT_INVOCATION = BASE_PATH + AmazonLambdaApi.API_PATH_INVOCATION_NEXT; public static final String POST_EVENT = BASE_PATH; + final AtomicBoolean closed = new AtomicBoolean(); public MockEventServer() { queue = new LinkedBlockingQueue<>(); } + public void start() { + int port = DEFAULT_PORT; + start(port); + } + + public void start(int port) { + this.port = port; + vertx = Vertx.vertx(); + httpServer = vertx.createHttpServer(); + router = Router.router(vertx); + setupRoutes(); + try { + httpServer.requestHandler(router).listen(port).toCompletionStage().toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + log.info("Mock Lambda Event Server Started"); + } + public HttpServer getHttpServer() { return httpServer; } @@ -85,6 +91,7 @@ public void postEvent(RoutingContext ctx) { } ctx.put(AmazonLambdaApi.LAMBDA_TRACE_HEADER_KEY, traceId); try { + log.debugf("Putting message %s into the queue", requestId); queue.put(ctx); } catch (InterruptedException e) { log.error("Publish interrupted"); @@ -92,26 +99,22 @@ public void postEvent(RoutingContext ctx) { } } - private RoutingContext pollNextEvent() throws InterruptedException { - for (;;) { - RoutingContext request = queue.poll(10, TimeUnit.MILLISECONDS); - if (request != null) - return request; - - } - } - public void nextEvent(RoutingContext ctx) { // Vert.x barfs if you block too long so we have our own executor blockingPool.execute(() -> { final AtomicBoolean closed = new AtomicBoolean(false); ctx.response().closeHandler((v) -> closed.set(true)); + ctx.response().exceptionHandler((v) -> closed.set(true)); + ctx.request().connection().closeHandler((v) -> closed.set(true)); + ctx.request().connection().exceptionHandler((v) -> closed.set(true)); RoutingContext request = null; try { for (;;) { request = queue.poll(10, TimeUnit.MILLISECONDS); if (request != null) { if (closed.get()) { + log.debugf("Polled message %s but connection was closed, returning to queue", + request.get(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID)); queue.put(request); return; } else { @@ -135,6 +138,7 @@ public void nextEvent(RoutingContext ctx) { ctx.response().putHeader(AmazonLambdaApi.LAMBDA_TRACE_HEADER_KEY, traceId); } String requestId = request.get(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID); + log.debugf("Starting processing %s, added to pending request map", requestId); responsePending.put(requestId, request); ctx.response().putHeader(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID, requestId); Buffer body = processEventBody(request); @@ -162,6 +166,7 @@ public void handleResponse(RoutingContext ctx) { ctx.fail(404); return; } + log.debugf("Sending response %s", requestId); Buffer buffer = ctx.getBody(); processResponse(ctx, pending, buffer); ctx.response().setStatusCode(204); @@ -176,6 +181,7 @@ public void handleRequeue(RoutingContext ctx) { ctx.fail(404); return; } + log.debugf("Requeue %s", requestId); try { queue.put(pending); } catch (InterruptedException e) { @@ -209,6 +215,7 @@ public void handleError(RoutingContext ctx) { ctx.fail(404); return; } + log.debugf("Sending response %s", requestId); Buffer buffer = ctx.getBody(); processError(ctx, pending, buffer); ctx.response().setStatusCode(204); @@ -232,9 +239,28 @@ public void processError(RoutingContext ctx, RoutingContext pending, Buffer buff @Override public void close() throws IOException { + if (!closed.compareAndSet(false, true)) { + return; + } log.info("Stopping Mock Lambda Event Server"); - httpServer.close().result(); - vertx.close().result(); - blockingPool.shutdown(); + for (var i : responsePending.entrySet()) { + i.getValue().response().setStatusCode(503).end(); + } + for (var i : queue) { + i.response().setStatusCode(503).end(); + } + try { + httpServer.close().toCompletionStage().toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } finally { + try { + vertx.close().toCompletionStage().toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } finally { + blockingPool.shutdown(); + } + } } } diff --git a/extensions/amazon-lambda/runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaRecorder.java b/extensions/amazon-lambda/runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaRecorder.java index 025873490e433..6fdc74a0b6bab 100644 --- a/extensions/amazon-lambda/runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaRecorder.java +++ b/extensions/amazon-lambda/runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaRecorder.java @@ -17,6 +17,7 @@ import io.quarkus.amazon.lambda.runtime.handlers.S3EventInputReader; import io.quarkus.arc.runtime.BeanContainer; +import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; @@ -145,9 +146,9 @@ public void chooseHandlerClass(List>> uname } @SuppressWarnings("rawtypes") - public void startPollLoop(ShutdownContext context) { + public void startPollLoop(ShutdownContext context, LaunchMode launchMode) { AbstractLambdaPollLoop loop = new AbstractLambdaPollLoop(AmazonLambdaMapperRecorder.objectMapper, - AmazonLambdaMapperRecorder.cognitoIdReader, AmazonLambdaMapperRecorder.clientCtxReader) { + AmazonLambdaMapperRecorder.cognitoIdReader, AmazonLambdaMapperRecorder.clientCtxReader, launchMode) { @Override protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception { diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java index 82887800d171c..6a8bcbdc213d9 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java @@ -59,9 +59,10 @@ public RuntimeComplete choose(FunqyConfig config, FunqyLambdaBindingRecorder rec public void startPoolLoop(FunqyLambdaBindingRecorder recorder, RuntimeComplete ignored, ShutdownContextBuildItem shutdownContextBuildItem, + LaunchModeBuildItem launchModeBuildItem, List orderServicesFirst // try to order this after service recorders ) { - recorder.startPollLoop(shutdownContextBuildItem); + recorder.startPollLoop(shutdownContextBuildItem, launchModeBuildItem.getLaunchMode()); } @BuildStep @@ -73,7 +74,7 @@ public void startPoolLoopDevOrTest(RuntimeComplete ignored, LaunchModeBuildItem launchModeBuildItem) { LaunchMode mode = launchModeBuildItem.getLaunchMode(); if (mode.isDevOrTest()) { - recorder.startPollLoop(shutdownContextBuildItem); + recorder.startPollLoop(shutdownContextBuildItem, mode); } } } diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java index 03ac85133e80c..d2726940abd85 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java @@ -27,6 +27,7 @@ import io.quarkus.funqy.runtime.FunqyConfig; import io.quarkus.funqy.runtime.FunqyServerResponse; import io.quarkus.funqy.runtime.RequestContextImpl; +import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; @@ -107,9 +108,9 @@ public static void handle(InputStream inputStream, OutputStream outputStream, Co } @SuppressWarnings("rawtypes") - public void startPollLoop(ShutdownContext context) { + public void startPollLoop(ShutdownContext context, LaunchMode launchMode) { AbstractLambdaPollLoop loop = new AbstractLambdaPollLoop(AmazonLambdaMapperRecorder.objectMapper, - AmazonLambdaMapperRecorder.cognitoIdReader, AmazonLambdaMapperRecorder.clientCtxReader) { + AmazonLambdaMapperRecorder.cognitoIdReader, AmazonLambdaMapperRecorder.clientCtxReader, launchMode) { @Override protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception { diff --git a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/tests/TestsProcessor.java b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/tests/TestsProcessor.java index 8b58de97ce3ff..04d51e13b7aca 100644 --- a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/tests/TestsProcessor.java +++ b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/tests/TestsProcessor.java @@ -58,7 +58,7 @@ public void setupTestRoutes( // Add continuous testing routeBuildItemBuildProducer.produce(nonApplicationRootPathBuildItem.routeBuilder() .route("dev/test") - .handler(recorder.continousTestHandler(shutdownContextBuildItem)) + .handler(recorder.continuousTestHandler(shutdownContextBuildItem)) .build()); } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevConsoleRecorder.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevConsoleRecorder.java index 8b2e5cb4ee755..1ccfd5f276f7a 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevConsoleRecorder.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevConsoleRecorder.java @@ -45,7 +45,7 @@ public Handler devConsoleHandler(String devConsoleFinalDestinati return new DevConsoleStaticHandler(devConsoleFinalDestination); } - public Handler continousTestHandler(ShutdownContext context) { + public Handler continuousTestHandler(ShutdownContext context) { ContinuousTestWebSocketHandler handler = new ContinuousTestWebSocketHandler(); ContinuousTestingSharedStateManager.addStateListener(handler); @@ -53,7 +53,6 @@ public Handler continousTestHandler(ShutdownContext context) { @Override public void run() { ContinuousTestingSharedStateManager.removeStateListener(handler); - ContinuousTestingSharedStateManager.reset(); } });