From 363a04299ac45f18718f4d41d514d0c630401258 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Thu, 9 Sep 2021 10:26:31 +1000 Subject: [PATCH] Fix lambda continuous testing failure If the tests start before the dev mode has initialized the system property would be seen by dev mode, and dev mode could open it's endpoint on 8081. This causes the tests to be run against the dev mode endpoint which breaks change tracking. Also fixes a few other minor things. --- .../quarkus/deployment/QuarkusAugmentor.java | 4 + .../RuntimeApplicationShutdownBuildItem.java | 29 +++++++ .../dev/testing/TestTracingProcessor.java | 3 + .../runner/bootstrap/StartupActionImpl.java | 24 ++++++ .../lambda/runtime/MockHttpEventServer.java | 1 + .../lambda/runtime/MockRestEventServer.java | 1 + .../DevServicesLambdaProcessor.java | 16 ++-- .../runtime/AbstractLambdaPollLoop.java | 32 ++++---- .../lambda/runtime/AmazonLambdaApi.java | 14 +++- .../deployment/AmazonLambdaProcessor.java | 5 +- ...aDevServicesContinuousTestingTestCase.java | 9 ++- .../lambda/runtime/MockEventServer.java | 80 ++++++++++++------- .../lambda/runtime/AmazonLambdaRecorder.java | 5 +- .../bindings/FunqyLambdaBuildStep.java | 5 +- .../lambda/FunqyLambdaBindingRecorder.java | 5 +- .../devmode/tests/TestsProcessor.java | 2 +- .../runtime/devmode/DevConsoleRecorder.java | 3 +- 17 files changed, 174 insertions(+), 64 deletions(-) create mode 100644 core/deployment/src/main/java/io/quarkus/deployment/builditem/RuntimeApplicationShutdownBuildItem.java diff --git a/core/deployment/src/main/java/io/quarkus/deployment/QuarkusAugmentor.java b/core/deployment/src/main/java/io/quarkus/deployment/QuarkusAugmentor.java index 2a71c00b6eb44..99df303269ebf 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/QuarkusAugmentor.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/QuarkusAugmentor.java @@ -32,6 +32,7 @@ import io.quarkus.deployment.builditem.LiveReloadBuildItem; import io.quarkus.deployment.builditem.QuarkusBuildCloseablesBuildItem; import io.quarkus.deployment.builditem.RawCommandLineArgumentsBuildItem; +import io.quarkus.deployment.builditem.RuntimeApplicationShutdownBuildItem; import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.deployment.pkg.builditem.BuildSystemTargetBuildItem; import io.quarkus.dev.spi.DevModeType; @@ -128,6 +129,9 @@ public BuildResult run() throws Exception { for (Consumer i : buildChainCustomizers) { i.accept(chainBuilder); } + if (launchMode.isDevOrTest()) { + chainBuilder.addFinal(RuntimeApplicationShutdownBuildItem.class); + } final ArchiveRootBuildItem.Builder rootBuilder = ArchiveRootBuildItem.builder(); if (root != null) { diff --git a/core/deployment/src/main/java/io/quarkus/deployment/builditem/RuntimeApplicationShutdownBuildItem.java b/core/deployment/src/main/java/io/quarkus/deployment/builditem/RuntimeApplicationShutdownBuildItem.java new file mode 100644 index 0000000000000..1145e33f05a2f --- /dev/null +++ b/core/deployment/src/main/java/io/quarkus/deployment/builditem/RuntimeApplicationShutdownBuildItem.java @@ -0,0 +1,29 @@ +package io.quarkus.deployment.builditem; + +import org.jboss.logging.Logger; + +import io.quarkus.builder.item.MultiBuildItem; + +/** + * Build Item that can be used to queue shutdown tasks that are run when the runtime application shuts down. + * + * This is similar to {@link ShutdownContextBuildItem} however it applies to tasks on the 'build' side, so if a processor + * wants to close something after the application has completed this item lets it do this. + * + * This has no effect for production applications, and is only useful in dev/test mode. The main use case for this is + * for shutting down deployment side test utilities at the end of a test run. + */ +public final class RuntimeApplicationShutdownBuildItem extends MultiBuildItem { + + private static final Logger log = Logger.getLogger(RuntimeApplicationShutdownBuildItem.class); + + private final Runnable closeTask; + + public RuntimeApplicationShutdownBuildItem(Runnable closeTask) { + this.closeTask = closeTask; + } + + public Runnable getCloseTask() { + return closeTask; + } +} diff --git a/core/deployment/src/main/java/io/quarkus/deployment/dev/testing/TestTracingProcessor.java b/core/deployment/src/main/java/io/quarkus/deployment/dev/testing/TestTracingProcessor.java index 7e2777caaa5e4..89daef853919d 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/dev/testing/TestTracingProcessor.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/dev/testing/TestTracingProcessor.java @@ -25,6 +25,7 @@ import io.quarkus.deployment.builditem.ServiceStartBuildItem; import io.quarkus.deployment.logging.LogCleanupFilterBuildItem; import io.quarkus.dev.spi.DevModeType; +import io.quarkus.dev.testing.ContinuousTestingSharedStateManager; import io.quarkus.dev.testing.TracingHandler; import io.quarkus.gizmo.Gizmo; @@ -82,6 +83,8 @@ void startTesting(TestConfig config, LiveReloadBuildItem liveReloadBuildItem, testSupport.stop(); } } + QuarkusClassLoader cl = (QuarkusClassLoader) Thread.currentThread().getContextClassLoader(); + ((QuarkusClassLoader) cl.parent()).addCloseTask(ContinuousTestingSharedStateManager::reset); } @BuildStep(onlyIf = IsTest.class) diff --git a/core/deployment/src/main/java/io/quarkus/runner/bootstrap/StartupActionImpl.java b/core/deployment/src/main/java/io/quarkus/runner/bootstrap/StartupActionImpl.java index b0710e1efbb24..9e41b7d060350 100644 --- a/core/deployment/src/main/java/io/quarkus/runner/bootstrap/StartupActionImpl.java +++ b/core/deployment/src/main/java/io/quarkus/runner/bootstrap/StartupActionImpl.java @@ -31,6 +31,7 @@ import io.quarkus.deployment.builditem.GeneratedClassBuildItem; import io.quarkus.deployment.builditem.GeneratedResourceBuildItem; import io.quarkus.deployment.builditem.MainClassBuildItem; +import io.quarkus.deployment.builditem.RuntimeApplicationShutdownBuildItem; import io.quarkus.deployment.builditem.TransformedClassesBuildItem; import io.quarkus.deployment.configuration.RunTimeConfigurationGenerator; import io.quarkus.dev.appstate.ApplicationStateNotification; @@ -106,6 +107,14 @@ public void run() { if (ApplicationStateNotification.getState() == ApplicationStateNotification.State.INITIAL) { ApplicationStateNotification.notifyStartupFailed(e); } + } finally { + for (var i : buildResult.consumeMulti(RuntimeApplicationShutdownBuildItem.class)) { + try { + i.getCloseTask().run(); + } catch (Throwable t) { + log.error("Failed to run close task", t); + } + } } } }, "Quarkus Main Thread"); @@ -173,6 +182,13 @@ public void accept(Integer integer) { } finally { runtimeClassLoader.close(); Thread.currentThread().setContextClassLoader(old); + for (var i : buildResult.consumeMulti(RuntimeApplicationShutdownBuildItem.class)) { + try { + i.getCloseTask().run(); + } catch (Throwable t) { + log.error("Failed to run close task", t); + } + } } } @@ -230,6 +246,14 @@ public void close() throws IOException { } } finally { ForkJoinClassLoading.setForkJoinClassLoader(ClassLoader.getSystemClassLoader()); + + for (var i : buildResult.consumeMulti(RuntimeApplicationShutdownBuildItem.class)) { + try { + i.getCloseTask().run(); + } catch (Throwable t) { + log.error("Failed to run close task", t); + } + } if (curatedApplication.getQuarkusBootstrap().getMode() == QuarkusBootstrap.Mode.TEST && !curatedApplication.getQuarkusBootstrap().isAuxiliaryApplication()) { //for tests we just always shut down the curated application, as it is only used once diff --git a/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java b/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java index c5f255c3ccebf..5130a50ad374a 100644 --- a/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java +++ b/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java @@ -80,6 +80,7 @@ public void handleHttpRequests(RoutingContext ctx) { try { byte[] mEvent = eventWriter.writeValueAsBytes(event); ctx.put(APIGatewayV2HTTPEvent.class.getName(), mEvent); + log.debugf("Putting message %s into the queue", requestId); queue.put(ctx); } catch (Exception e) { log.error("Publish failure", e); diff --git a/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java b/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java index 4a3665eaf2188..c8dfbf23c8db2 100644 --- a/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java +++ b/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java @@ -104,6 +104,7 @@ public void handleHttpRequests(RoutingContext ctx) { try { byte[] mEvent = eventWriter.writeValueAsBytes(event); ctx.put(AwsProxyRequest.class.getName(), mEvent); + log.debugf("Putting message %s into the queue", requestId); queue.put(ctx); } catch (Exception e) { log.error("Publish failure", e); diff --git a/extensions/amazon-lambda/common-deployment/src/main/java/io/quarkus/amazon/lambda/deployment/DevServicesLambdaProcessor.java b/extensions/amazon-lambda/common-deployment/src/main/java/io/quarkus/amazon/lambda/deployment/DevServicesLambdaProcessor.java index 6b41e5100b0c9..8a8eaea1e625a 100644 --- a/extensions/amazon-lambda/common-deployment/src/main/java/io/quarkus/amazon/lambda/deployment/DevServicesLambdaProcessor.java +++ b/extensions/amazon-lambda/common-deployment/src/main/java/io/quarkus/amazon/lambda/deployment/DevServicesLambdaProcessor.java @@ -14,9 +14,11 @@ import io.quarkus.deployment.IsNormal; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.Produce; import io.quarkus.deployment.annotations.Record; -import io.quarkus.deployment.builditem.DevServicesNativeConfigResultBuildItem; +import io.quarkus.deployment.builditem.DevServicesConfigResultBuildItem; import io.quarkus.deployment.builditem.LaunchModeBuildItem; +import io.quarkus.deployment.builditem.RuntimeApplicationShutdownBuildItem; import io.quarkus.deployment.builditem.ServiceStartBuildItem; import io.quarkus.runtime.LaunchMode; @@ -47,12 +49,14 @@ private boolean legacyTestingEnabled() { } } + @Produce(ServiceStartBuildItem.class) @BuildStep(onlyIfNot = IsNormal.class) public void startEventServer(LaunchModeBuildItem launchMode, LambdaConfig config, Optional override, - BuildProducer devServicePropertiesProducer, - BuildProducer serviceStartBuildItemBuildProducer) throws Exception { + BuildProducer devServicePropertiesProducer, + BuildProducer runtimeApplicationShutdownBuildItemBuildProducer) + throws Exception { if (!launchMode.getLaunchMode().isDevOrTest()) return; if (legacyTestingEnabled()) @@ -73,9 +77,8 @@ public void startEventServer(LaunchModeBuildItem launchMode, startMode = launchMode.getLaunchMode(); server.start(port); String baseUrl = "localhost:" + port + MockEventServer.BASE_PATH; - System.setProperty(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, baseUrl); devServicePropertiesProducer.produce( - new DevServicesNativeConfigResultBuildItem(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, baseUrl)); + new DevServicesConfigResultBuildItem(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, baseUrl)); Runnable closeTask = () -> { if (server != null) { try { @@ -91,5 +94,8 @@ public void startEventServer(LaunchModeBuildItem launchMode, }; QuarkusClassLoader cl = (QuarkusClassLoader) Thread.currentThread().getContextClassLoader(); ((QuarkusClassLoader) cl.parent()).addCloseTask(closeTask); + if (launchMode.isTest()) { + runtimeApplicationShutdownBuildItemBuildProducer.produce(new RuntimeApplicationShutdownBuildItem(closeTask)); + } } } diff --git a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java index 1d7052ccc766d..eb861016d78ea 100644 --- a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java +++ b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java @@ -16,19 +16,23 @@ import com.fasterxml.jackson.databind.ObjectReader; import io.quarkus.runtime.Application; +import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.ShutdownContext; public abstract class AbstractLambdaPollLoop { private static final Logger log = Logger.getLogger(AbstractLambdaPollLoop.class); - private ObjectMapper objectMapper; - private ObjectReader cognitoIdReader; - private ObjectReader clientCtxReader; + private final ObjectMapper objectMapper; + private final ObjectReader cognitoIdReader; + private final ObjectReader clientCtxReader; + private final LaunchMode launchMode; - public AbstractLambdaPollLoop(ObjectMapper objectMapper, ObjectReader cognitoIdReader, ObjectReader clientCtxReader) { + public AbstractLambdaPollLoop(ObjectMapper objectMapper, ObjectReader cognitoIdReader, ObjectReader clientCtxReader, + LaunchMode launchMode) { this.objectMapper = objectMapper; this.cognitoIdReader = cognitoIdReader; this.clientCtxReader = clientCtxReader; + this.launchMode = launchMode; } protected abstract boolean isStream(); @@ -45,7 +49,7 @@ public void startPollLoop(ShutdownContext context) { @Override public void run() { try { - if (!LambdaHotReplacementRecorder.enabled) { + if (!LambdaHotReplacementRecorder.enabled && launchMode == LaunchMode.DEVELOPMENT) { // when running with continuous testing, this method fails // because currentApplication is not set when running as an // auxiliary application. So, just skip it if hot replacement enabled. @@ -81,7 +85,7 @@ public void run() { continue; } try { - if (LambdaHotReplacementRecorder.enabled) { + if (LambdaHotReplacementRecorder.enabled && launchMode == LaunchMode.DEVELOPMENT) { try { // do not interrupt during a hot replacement // as shutdown will abort and do nasty things. @@ -125,7 +129,7 @@ public void run() { if (abortGracefully(e)) { return; } - log.error("Failed to run lambda", e); + log.error("Failed to run lambda (" + launchMode + ")", e); postError(AmazonLambdaApi.invocationError(baseUrl, requestId), new FunctionError(e.getClass().getName(), e.getMessage())); @@ -134,7 +138,7 @@ public void run() { } catch (Exception e) { if (!abortGracefully(e)) - log.error("Error running lambda", e); + log.error("Error running lambda (" + launchMode + ")", e); Application app = Application.currentApplication(); if (app != null) { app.stop(); @@ -150,24 +154,24 @@ public void run() { } } catch (Exception e) { try { - log.error("Lambda init error", e); + log.error("Lambda init error (" + launchMode + ")", e); postError(AmazonLambdaApi.initError(baseUrl), new FunctionError(e.getClass().getName(), e.getMessage())); } catch (Exception ex) { - log.error("Failed to report init error", ex); + log.error("Failed to report init error (" + launchMode + ")", ex); } finally { // our main loop is done, time to shutdown Application app = Application.currentApplication(); if (app != null) { - log.error("Shutting down Quarkus application because of error"); + log.error("Shutting down Quarkus application because of error (" + launchMode + ")"); app.stop(); } } } finally { - log.info("Lambda polling thread complete"); + log.info("Lambda polling thread complete (" + launchMode + ")"); } } - }, "Lambda Thread"); + }, "Lambda Thread (" + launchMode + ")"); pollingThread.setDaemon(true); context.addShutdownTask(() -> { running.set(false); @@ -261,7 +265,7 @@ boolean abortGracefully(Exception ex) { // if we are running in test mode, or native mode outside of the lambda container, then don't output stack trace for socket errors boolean lambdaEnv = System.getenv("AWS_LAMBDA_RUNTIME_API") != null; - boolean testEnv = System.getProperty(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API) != null; + boolean testEnv = LaunchMode.current() == LaunchMode.TEST; boolean graceful = ((ex instanceof SocketException || ex instanceof ConnectException) && testEnv) || (ex instanceof UnknownHostException && !lambdaEnv); diff --git a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaApi.java b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaApi.java index ec1e528ab3875..5b04cbf30bde5 100644 --- a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaApi.java +++ b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaApi.java @@ -3,6 +3,10 @@ import java.net.MalformedURLException; import java.net.URL; +import org.eclipse.microprofile.config.ConfigProvider; + +import io.quarkus.runtime.LaunchMode; + /** * Various constants and util methods used for communication with the AWS API. */ @@ -80,13 +84,15 @@ static String functionVersion() { } public static boolean isTestMode() { - return System.getProperty(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API) != null; + //need this config check for native tests + return LaunchMode.current() == LaunchMode.TEST + || ConfigProvider.getConfig().getOptionalValue(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, String.class).isPresent(); } private static String runtimeApi() { - String testApi = System.getProperty(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API); - if (testApi != null) { - return testApi; + var testApi = ConfigProvider.getConfig().getOptionalValue(QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API, String.class); + if (testApi.isPresent()) { + return testApi.get(); } return System.getenv("AWS_LAMBDA_RUNTIME_API"); } diff --git a/extensions/amazon-lambda/deployment/src/main/java/io/quarkus/amazon/lambda/deployment/AmazonLambdaProcessor.java b/extensions/amazon-lambda/deployment/src/main/java/io/quarkus/amazon/lambda/deployment/AmazonLambdaProcessor.java index b755f0aae7f7b..7b14804bf9834 100644 --- a/extensions/amazon-lambda/deployment/src/main/java/io/quarkus/amazon/lambda/deployment/AmazonLambdaProcessor.java +++ b/extensions/amazon-lambda/deployment/src/main/java/io/quarkus/amazon/lambda/deployment/AmazonLambdaProcessor.java @@ -240,9 +240,10 @@ public void recordHandlerClass(List lambdas, @Record(value = ExecutionTime.RUNTIME_INIT) void startPoolLoop(AmazonLambdaRecorder recorder, ShutdownContextBuildItem shutdownContextBuildItem, + LaunchModeBuildItem launchModeBuildItem, List orderServicesFirst // try to order this after service recorders ) { - recorder.startPollLoop(shutdownContextBuildItem); + recorder.startPollLoop(shutdownContextBuildItem, launchModeBuildItem.getLaunchMode()); } @BuildStep @@ -253,7 +254,7 @@ void startPoolLoopDevOrTest(AmazonLambdaRecorder recorder, LaunchModeBuildItem launchModeBuildItem) { LaunchMode mode = launchModeBuildItem.getLaunchMode(); if (mode.isDevOrTest()) { - recorder.startPollLoop(shutdownContextBuildItem); + recorder.startPollLoop(shutdownContextBuildItem, mode); } } diff --git a/extensions/amazon-lambda/deployment/src/test/java/io/quarkus/amazon/lambda/deployment/testing/LambdaDevServicesContinuousTestingTestCase.java b/extensions/amazon-lambda/deployment/src/test/java/io/quarkus/amazon/lambda/deployment/testing/LambdaDevServicesContinuousTestingTestCase.java index 1565016006b69..8bd2c2f3b4a2e 100644 --- a/extensions/amazon-lambda/deployment/src/test/java/io/quarkus/amazon/lambda/deployment/testing/LambdaDevServicesContinuousTestingTestCase.java +++ b/extensions/amazon-lambda/deployment/src/test/java/io/quarkus/amazon/lambda/deployment/testing/LambdaDevServicesContinuousTestingTestCase.java @@ -6,7 +6,7 @@ import org.jboss.shrinkwrap.api.asset.StringAsset; import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.test.ContinuousTestingTestUtils; @@ -20,7 +20,9 @@ public class LambdaDevServicesContinuousTestingTestCase { public JavaArchive get() { return ShrinkWrap.create(JavaArchive.class) .addClasses(GreetingLambda.class, Person.class) - .addAsResource(new StringAsset(ContinuousTestingTestUtils.appProperties("")), + .addAsResource( + new StringAsset(ContinuousTestingTestUtils.appProperties( + "quarkus.log.category.\"io.quarkus.amazon.lambda.runtime\".level=DEBUG")), "application.properties"); } }).setTestArchiveProducer(new Supplier() { @@ -30,7 +32,8 @@ public JavaArchive get() { } }); - @Test + //run this twice, to make sure everything is cleaned up properly + @RepeatedTest(2) public void testLambda() throws Exception { ContinuousTestingTestUtils utils = new ContinuousTestingTestUtils(); var result = utils.waitForNextCompletion(); diff --git a/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java index cea6dfd37e231..db4f4086c1eab 100644 --- a/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java +++ b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java @@ -5,6 +5,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -24,21 +25,6 @@ public class MockEventServer implements Closeable { protected static final Logger log = Logger.getLogger(MockEventServer.class); public static final int DEFAULT_PORT = 8081; - public void start() { - int port = DEFAULT_PORT; - start(port); - } - - public void start(int port) { - this.port = port; - vertx = Vertx.vertx(); - httpServer = vertx.createHttpServer(); - router = Router.router(vertx); - setupRoutes(); - httpServer.requestHandler(router).listen(port).result(); - log.info("Mock Lambda Event Server Started"); - } - private Vertx vertx; private int port; protected HttpServer httpServer; @@ -50,11 +36,31 @@ public void start(int port) { public static final String INVOCATION = BASE_PATH + AmazonLambdaApi.API_PATH_INVOCATION; public static final String NEXT_INVOCATION = BASE_PATH + AmazonLambdaApi.API_PATH_INVOCATION_NEXT; public static final String POST_EVENT = BASE_PATH; + final AtomicBoolean closed = new AtomicBoolean(); public MockEventServer() { queue = new LinkedBlockingQueue<>(); } + public void start() { + int port = DEFAULT_PORT; + start(port); + } + + public void start(int port) { + this.port = port; + vertx = Vertx.vertx(); + httpServer = vertx.createHttpServer(); + router = Router.router(vertx); + setupRoutes(); + try { + httpServer.requestHandler(router).listen(port).toCompletionStage().toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + log.info("Mock Lambda Event Server Started"); + } + public HttpServer getHttpServer() { return httpServer; } @@ -85,6 +91,7 @@ public void postEvent(RoutingContext ctx) { } ctx.put(AmazonLambdaApi.LAMBDA_TRACE_HEADER_KEY, traceId); try { + log.debugf("Putting message %s into the queue", requestId); queue.put(ctx); } catch (InterruptedException e) { log.error("Publish interrupted"); @@ -92,26 +99,22 @@ public void postEvent(RoutingContext ctx) { } } - private RoutingContext pollNextEvent() throws InterruptedException { - for (;;) { - RoutingContext request = queue.poll(10, TimeUnit.MILLISECONDS); - if (request != null) - return request; - - } - } - public void nextEvent(RoutingContext ctx) { // Vert.x barfs if you block too long so we have our own executor blockingPool.execute(() -> { final AtomicBoolean closed = new AtomicBoolean(false); ctx.response().closeHandler((v) -> closed.set(true)); + ctx.response().exceptionHandler((v) -> closed.set(true)); + ctx.request().connection().closeHandler((v) -> closed.set(true)); + ctx.request().connection().exceptionHandler((v) -> closed.set(true)); RoutingContext request = null; try { for (;;) { request = queue.poll(10, TimeUnit.MILLISECONDS); if (request != null) { if (closed.get()) { + log.debugf("Polled message %s but connection was closed, returning to queue", + request.get(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID)); queue.put(request); return; } else { @@ -135,6 +138,7 @@ public void nextEvent(RoutingContext ctx) { ctx.response().putHeader(AmazonLambdaApi.LAMBDA_TRACE_HEADER_KEY, traceId); } String requestId = request.get(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID); + log.debugf("Starting processing %s, added to pending request map", requestId); responsePending.put(requestId, request); ctx.response().putHeader(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID, requestId); Buffer body = processEventBody(request); @@ -162,6 +166,7 @@ public void handleResponse(RoutingContext ctx) { ctx.fail(404); return; } + log.debugf("Sending response %s", requestId); Buffer buffer = ctx.getBody(); processResponse(ctx, pending, buffer); ctx.response().setStatusCode(204); @@ -176,6 +181,7 @@ public void handleRequeue(RoutingContext ctx) { ctx.fail(404); return; } + log.debugf("Requeue %s", requestId); try { queue.put(pending); } catch (InterruptedException e) { @@ -209,6 +215,7 @@ public void handleError(RoutingContext ctx) { ctx.fail(404); return; } + log.debugf("Sending response %s", requestId); Buffer buffer = ctx.getBody(); processError(ctx, pending, buffer); ctx.response().setStatusCode(204); @@ -232,9 +239,28 @@ public void processError(RoutingContext ctx, RoutingContext pending, Buffer buff @Override public void close() throws IOException { + if (!closed.compareAndSet(false, true)) { + return; + } log.info("Stopping Mock Lambda Event Server"); - httpServer.close().result(); - vertx.close().result(); - blockingPool.shutdown(); + for (var i : responsePending.entrySet()) { + i.getValue().response().setStatusCode(503).end(); + } + for (var i : queue) { + i.response().setStatusCode(503).end(); + } + try { + httpServer.close().toCompletionStage().toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } finally { + try { + vertx.close().toCompletionStage().toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } finally { + blockingPool.shutdown(); + } + } } } diff --git a/extensions/amazon-lambda/runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaRecorder.java b/extensions/amazon-lambda/runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaRecorder.java index 025873490e433..6fdc74a0b6bab 100644 --- a/extensions/amazon-lambda/runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaRecorder.java +++ b/extensions/amazon-lambda/runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AmazonLambdaRecorder.java @@ -17,6 +17,7 @@ import io.quarkus.amazon.lambda.runtime.handlers.S3EventInputReader; import io.quarkus.arc.runtime.BeanContainer; +import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; @@ -145,9 +146,9 @@ public void chooseHandlerClass(List>> uname } @SuppressWarnings("rawtypes") - public void startPollLoop(ShutdownContext context) { + public void startPollLoop(ShutdownContext context, LaunchMode launchMode) { AbstractLambdaPollLoop loop = new AbstractLambdaPollLoop(AmazonLambdaMapperRecorder.objectMapper, - AmazonLambdaMapperRecorder.cognitoIdReader, AmazonLambdaMapperRecorder.clientCtxReader) { + AmazonLambdaMapperRecorder.cognitoIdReader, AmazonLambdaMapperRecorder.clientCtxReader, launchMode) { @Override protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception { diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java index 82887800d171c..6a8bcbdc213d9 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java @@ -59,9 +59,10 @@ public RuntimeComplete choose(FunqyConfig config, FunqyLambdaBindingRecorder rec public void startPoolLoop(FunqyLambdaBindingRecorder recorder, RuntimeComplete ignored, ShutdownContextBuildItem shutdownContextBuildItem, + LaunchModeBuildItem launchModeBuildItem, List orderServicesFirst // try to order this after service recorders ) { - recorder.startPollLoop(shutdownContextBuildItem); + recorder.startPollLoop(shutdownContextBuildItem, launchModeBuildItem.getLaunchMode()); } @BuildStep @@ -73,7 +74,7 @@ public void startPoolLoopDevOrTest(RuntimeComplete ignored, LaunchModeBuildItem launchModeBuildItem) { LaunchMode mode = launchModeBuildItem.getLaunchMode(); if (mode.isDevOrTest()) { - recorder.startPollLoop(shutdownContextBuildItem); + recorder.startPollLoop(shutdownContextBuildItem, mode); } } } diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java index 03ac85133e80c..d2726940abd85 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java @@ -27,6 +27,7 @@ import io.quarkus.funqy.runtime.FunqyConfig; import io.quarkus.funqy.runtime.FunqyServerResponse; import io.quarkus.funqy.runtime.RequestContextImpl; +import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; @@ -107,9 +108,9 @@ public static void handle(InputStream inputStream, OutputStream outputStream, Co } @SuppressWarnings("rawtypes") - public void startPollLoop(ShutdownContext context) { + public void startPollLoop(ShutdownContext context, LaunchMode launchMode) { AbstractLambdaPollLoop loop = new AbstractLambdaPollLoop(AmazonLambdaMapperRecorder.objectMapper, - AmazonLambdaMapperRecorder.cognitoIdReader, AmazonLambdaMapperRecorder.clientCtxReader) { + AmazonLambdaMapperRecorder.cognitoIdReader, AmazonLambdaMapperRecorder.clientCtxReader, launchMode) { @Override protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception { diff --git a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/tests/TestsProcessor.java b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/tests/TestsProcessor.java index 8b58de97ce3ff..04d51e13b7aca 100644 --- a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/tests/TestsProcessor.java +++ b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/tests/TestsProcessor.java @@ -58,7 +58,7 @@ public void setupTestRoutes( // Add continuous testing routeBuildItemBuildProducer.produce(nonApplicationRootPathBuildItem.routeBuilder() .route("dev/test") - .handler(recorder.continousTestHandler(shutdownContextBuildItem)) + .handler(recorder.continuousTestHandler(shutdownContextBuildItem)) .build()); } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevConsoleRecorder.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevConsoleRecorder.java index 8b2e5cb4ee755..1ccfd5f276f7a 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevConsoleRecorder.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevConsoleRecorder.java @@ -45,7 +45,7 @@ public Handler devConsoleHandler(String devConsoleFinalDestinati return new DevConsoleStaticHandler(devConsoleFinalDestination); } - public Handler continousTestHandler(ShutdownContext context) { + public Handler continuousTestHandler(ShutdownContext context) { ContinuousTestWebSocketHandler handler = new ContinuousTestWebSocketHandler(); ContinuousTestingSharedStateManager.addStateListener(handler); @@ -53,7 +53,6 @@ public Handler continousTestHandler(ShutdownContext context) { @Override public void run() { ContinuousTestingSharedStateManager.removeStateListener(handler); - ContinuousTestingSharedStateManager.reset(); } });