Skip to content

Commit

Permalink
Capture context across all scheduler methods
Browse files Browse the repository at this point in the history
Adapted from the draft code from @luneo7 in the discussions of
- #26242
- #25818
  • Loading branch information
jponge committed Jun 20, 2022
1 parent 66d8922 commit a06b438
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package io.quarkus.mutiny.deployment;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

import org.jboss.threads.ContextHandler;

import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ContextHandlerBuildItem;
import io.quarkus.deployment.builditem.ExecutorBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.mutiny.runtime.MutinyInfrastructure;
Expand All @@ -13,10 +17,13 @@ public class MutinyProcessor {

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
public void runtimeInit(ExecutorBuildItem executorBuildItem, MutinyInfrastructure recorder,
ShutdownContextBuildItem shutdownContext) {
public void runtimeInit(ExecutorBuildItem executorBuildItem,
MutinyInfrastructure recorder,
ShutdownContextBuildItem shutdownContext,
Optional<ContextHandlerBuildItem> contextHandler) {
ExecutorService executor = executorBuildItem.getExecutorProxy();
recorder.configureMutinyInfrastructure(executor, shutdownContext);
ContextHandler<Object> handler = contextHandler.map(ContextHandlerBuildItem::contextHandler).orElse(null);
recorder.configureMutinyInfrastructure(executor, shutdownContext, handler);
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.quarkus.mutiny.runtime;

import java.util.concurrent.*;

import org.jboss.threads.ContextHandler;

class ContextualRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
private final RunnableScheduledFuture<V> runnable;
private final Object context;
private final ContextHandler<Object> contextHandler;

public ContextualRunnableScheduledFuture(ContextHandler<Object> contextHandler, Object context,
RunnableScheduledFuture<V> runnable) {
this.contextHandler = contextHandler;
this.context = context;
this.runnable = runnable;
}

@Override
public boolean isPeriodic() {
return runnable.isPeriodic();
}

@Override
public long getDelay(TimeUnit unit) {
return runnable.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
return runnable.compareTo(o);
}

@Override
public void run() {
if (contextHandler != null) {
contextHandler.runWith(runnable, context);
} else {
runnable.run();
}
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return runnable.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return runnable.isCancelled();
}

@Override
public boolean isDone() {
return runnable.isDone();
}

@Override
public V get() throws InterruptedException, ExecutionException {
return runnable.get();
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return runnable.get(timeout, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.function.Consumer;

import org.jboss.logging.Logger;
import org.jboss.threads.ContextHandler;

import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
Expand All @@ -18,12 +19,25 @@ public class MutinyInfrastructure {

public static final String VERTX_EVENT_LOOP_THREAD_PREFIX = "vert.x-eventloop-thread-";

public void configureMutinyInfrastructure(ExecutorService executor, ShutdownContext shutdownContext) {
public void configureMutinyInfrastructure(ExecutorService executor, ShutdownContext shutdownContext,
ContextHandler<Object> contextHandler) {
// Mutiny leaks a ScheduledExecutorService if we don't do this
Infrastructure.getDefaultWorkerPool().shutdown();

// Since executor is not a ScheduledExecutorService and Mutiny needs one for scheduling we have to adapt one around the provided executor
MutinyScheduler mutinyScheduler = new MutinyScheduler(executor);
MutinyScheduler mutinyScheduler = new MutinyScheduler(executor) {
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return super.decorateTask(runnable,
new ContextualRunnableScheduledFuture<>(contextHandler, contextHandler.captureContext(), task));
}

@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return super.decorateTask(callable,
new ContextualRunnableScheduledFuture<>(contextHandler, contextHandler.captureContext(), task));
}
};
Infrastructure.setDefaultExecutor(new ScheduledExecutorService() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,14 +546,15 @@ public Object captureContext() {

@Override
public void runWith(Runnable task, Object context) {
if (context != null) {
ContextInternal currentContext = (ContextInternal) Vertx.currentContext();
if (context != null && context != currentContext) {
// Only do context handling if it's non null
final ContextInternal vertxContext = (ContextInternal) context;
vertxContext.beginDispatch();
try {
task.run();
} finally {
vertxContext.endDispatch(null);
vertxContext.endDispatch(currentContext);
}
} else {
task.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.it.resteasy.mutiny.regression.bug25818;

import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
Expand All @@ -10,6 +12,7 @@

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

@Path("/reproducer/25818")
Expand Down Expand Up @@ -45,4 +48,36 @@ public Uni<String> defaultExecutor() {
.item(service::getBlocking)
.runSubscriptionOn(Infrastructure.getDefaultExecutor());
}

@GET
@Path("/worker-pool-submit")
public Uni<String> workerPoolSubmit() {
Vertx.currentContext().putLocal("yolo", "yolo");
return Uni.createFrom().emitter(emitter -> {
Infrastructure.getDefaultWorkerPool().submit(() -> {
Context ctx = Vertx.currentContext();
if (ctx != null) {
emitter.complete("yolo -> " + ctx.getLocal("yolo"));
} else {
emitter.complete("Context was null");
}
});
});
}

@GET
@Path("/worker-pool-schedule")
public Uni<String> workerPoolSchedule() {
Vertx.currentContext().putLocal("yolo", "yolo");
return Uni.createFrom().emitter(emitter -> {
Infrastructure.getDefaultWorkerPool().schedule(() -> {
Context ctx = Vertx.currentContext();
if (ctx != null) {
emitter.complete("yolo -> " + ctx.getLocal("yolo"));
} else {
emitter.complete("Context was null");
}
}, 25, TimeUnit.MILLISECONDS);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,21 @@ public void testWorkerPool() {
.body(is("hello-you"))
.statusCode(200);
}

@Test
public void yolo1() {
get("/reproducer/25818/worker-pool-submit")
.then()
.body(is("yolo -> yolo"))
.statusCode(200);
}

@Test
public void yolo2() {
get("/reproducer/25818/worker-pool-schedule")
.then()
.body(is("yolo -> yolo"))
.statusCode(200);
}
}
}

0 comments on commit a06b438

Please sign in to comment.