Skip to content

Commit

Permalink
Make sure all lambda requests are done
Browse files Browse the repository at this point in the history
It's possible that on shutdown the polling thread
could still have an active request. If this thread
gets a request from the queue it will break subsequent
test profile runs.

Fixes quarkusio#15195
  • Loading branch information
stuartwdouglas committed Mar 17, 2021
1 parent ce244e1 commit 7c6f45a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public AbstractLambdaPollLoop(ObjectMapper objectMapper, ObjectReader cognitoIdR
public void startPollLoop(ShutdownContext context) {

final AtomicBoolean running = new AtomicBoolean(true);

final Thread pollingThread = new Thread(new Runnable() {
@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -122,8 +121,11 @@ public void run() {
}
}
}, "Lambda Thread");
pollingThread.setDaemon(true);
context.addShutdownTask(() -> {
running.set(false);
//note that this does not seem to be 100% reliable in unblocking the thread
//which is why it is a daemon.
pollingThread.interrupt();
});
pollingThread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,54 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.amazon.lambda.runtime.AmazonLambdaApi;
import io.quarkus.amazon.lambda.runtime.FunctionError;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.undertow.Undertow;
import io.undertow.httpcore.StatusCodes;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.RoutingHandler;
import io.undertow.server.handlers.BlockingHandler;

public class LambdaResourceManager implements QuarkusTestResourceLifecycleManager {

private volatile boolean started = false;
private volatile Undertow undertow;
private final AtomicInteger currentPollCount = new AtomicInteger();

public static final int PORT = Integer.getInteger("quarkus-internal.aws-lambda.test-port", 5387);

@Override
public Map<String, String> start() {

started = true;
RoutingHandler routingHandler = new RoutingHandler(true);
routingHandler.add("GET", AmazonLambdaApi.API_PATH_INVOCATION_NEXT, new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
LambdaStartedNotifier.started = true;
Map.Entry<String, String> req = null;
while (req == null) {
req = LambdaClient.REQUEST_QUEUE.poll(100, TimeUnit.MILLISECONDS);
if (undertow == null || undertow.getWorker() == null || undertow.getWorker().isShutdown()) {
return;
currentPollCount.incrementAndGet();
try {
LambdaStartedNotifier.started = true;
Map.Entry<String, String> req = null;
while (req == null) {
req = LambdaClient.REQUEST_QUEUE.poll(100, TimeUnit.MILLISECONDS);
if (!started || undertow == null || undertow.getWorker() == null || undertow.getWorker().isShutdown()) {
exchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE);
exchange.setPersistent(false);
exchange.getOutputStream().close();
return;
}
}
exchange.addResponseHeader(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID, req.getKey());
exchange.getOutputStream().write(req.getValue().getBytes(StandardCharsets.UTF_8));
exchange.getOutputStream().close();
} finally {
currentPollCount.decrementAndGet();
}
exchange.addResponseHeader(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID, req.getKey());
exchange.writeAsync(req.getValue());
}
});
routingHandler.add("POST", AmazonLambdaApi.API_PATH_INVOCATION + "{req}" + AmazonLambdaApi.API_PATH_RESPONSE,
Expand Down Expand Up @@ -117,6 +130,16 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {

@Override
public void stop() {
started = false;
while (currentPollCount.get() > 0) {
try {
//wait for all the responses to be sent
//before we shutdown
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (undertow == null)
return;
undertow.stop();
Expand Down

0 comments on commit 7c6f45a

Please sign in to comment.