From a3e6ae5a8e40c5627350f1c1fa1b5bd9cb231f19 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 4 Jan 2022 10:23:51 +0100 Subject: [PATCH] Allow Vert.x to use other Context implementations than EventLoopContext and WorkerContext. The Context and its related classes/intefaces have been refactored to pull up code as much as possible in Context and ContextInternal in order to ease new implementations, consequently the AbstractContext class has been removed. The ContextImpl class has been renamed ContextBase and provides a base class for implementing context. fixes #4425 --- src/main/java/io/vertx/core/Context.java | 13 +- .../vertx/core/file/impl/AsyncFileImpl.java | 4 +- .../core/http/impl/Http1xServerRequest.java | 2 +- .../io/vertx/core/impl/AbstractContext.java | 235 ------------------ .../{ContextImpl.java => ContextBase.java} | 87 ++----- .../io/vertx/core/impl/ContextInternal.java | 188 ++++++++++++-- .../io/vertx/core/impl/DeploymentManager.java | 10 +- .../io/vertx/core/impl/DuplicatedContext.java | 37 +-- .../io/vertx/core/impl/EventLoopContext.java | 22 +- .../java/io/vertx/core/impl/HAManager.java | 8 +- .../java/io/vertx/core/impl/VertxImpl.java | 155 +++++++++++- .../io/vertx/core/impl/VertxInternal.java | 4 +- .../java/io/vertx/core/impl/VertxThread.java | 47 +--- .../io/vertx/core/impl/WorkerContext.java | 22 +- .../vertx/core/impl/WorkerExecutorImpl.java | 4 +- .../core/streams/impl/InboundBuffer.java | 3 +- .../io/vertx/core/impl/BenchmarkContext.java | 18 +- src/test/java/io/vertx/core/ContextTest.java | 30 +++ src/test/java/io/vertx/core/FakeContext.java | 174 +++++++++++++ 19 files changed, 620 insertions(+), 443 deletions(-) delete mode 100644 src/main/java/io/vertx/core/impl/AbstractContext.java rename src/main/java/io/vertx/core/impl/{ContextImpl.java => ContextBase.java} (72%) create mode 100644 src/test/java/io/vertx/core/FakeContext.java diff --git a/src/main/java/io/vertx/core/Context.java b/src/main/java/io/vertx/core/Context.java index 046cf4428a1..f8884827ba9 100644 --- a/src/main/java/io/vertx/core/Context.java +++ b/src/main/java/io/vertx/core/Context.java @@ -16,6 +16,7 @@ import io.vertx.codegen.annotations.Nullable; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.impl.VertxThread; +import io.vertx.core.impl.launcher.VertxCommandLauncher; import io.vertx.core.json.JsonObject; import java.util.List; @@ -135,7 +136,9 @@ static boolean isOnVertxThread() { * @param resultHandler handler that will be called when the blocking code is complete * @param the type of the result */ - void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler); + default void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { + executeBlocking(blockingCodeHandler, true, resultHandler); + } /** * Same as {@link #executeBlocking(Handler, boolean, Handler)} but with an {@code handler} called when the operation completes @@ -145,7 +148,9 @@ static boolean isOnVertxThread() { /** * Same as {@link #executeBlocking(Handler, Handler)} but with an {@code handler} called when the operation completes */ - Future executeBlocking(Handler> blockingCodeHandler); + default Future executeBlocking(Handler> blockingCodeHandler) { + return executeBlocking(blockingCodeHandler, true); + } /** * If the context is associated with a Verticle deployment, this returns the deployment ID of that deployment. @@ -165,7 +170,9 @@ static boolean isOnVertxThread() { /** * The process args */ - List processArgs(); + default List processArgs() { + return VertxCommandLauncher.getProcessArguments(); + } /** * Is the current context an event loop context? diff --git a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java index fe31d53733a..3b7d5b6c2ed 100644 --- a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java +++ b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java @@ -99,9 +99,9 @@ public class AsyncFileImpl implements AsyncFile { try { if (options.getPerms() != null) { FileAttribute attrs = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString(options.getPerms())); - ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool(), attrs); + ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool().executor(), attrs); } else { - ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool()); + ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool().executor()); } if (options.isAppend()) writePos = ch.size(); } catch (IOException e) { diff --git a/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java b/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java index 4be9d850616..992615af25e 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java @@ -123,7 +123,7 @@ void setRequest(HttpRequest request) { private InboundBuffer pendingQueue() { if (pending == null) { - pending = new InboundBuffer<>(conn.getContext(), 8); + pending = new InboundBuffer<>(context, 8); pending.drainHandler(v -> conn.doResume()); pending.handler(buffer -> { if (buffer == InboundBuffer.END_SENTINEL) { diff --git a/src/main/java/io/vertx/core/impl/AbstractContext.java b/src/main/java/io/vertx/core/impl/AbstractContext.java deleted file mode 100644 index 236b95a811e..00000000000 --- a/src/main/java/io/vertx/core/impl/AbstractContext.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ -package io.vertx.core.impl; - -import io.vertx.core.*; -import io.vertx.core.impl.future.FailedFuture; -import io.vertx.core.impl.future.PromiseImpl; -import io.vertx.core.impl.future.PromiseInternal; -import io.vertx.core.impl.future.SucceededFuture; -import io.vertx.core.impl.launcher.VertxCommandLauncher; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * A context implementation that does not hold any specific state. - * - * @author Julien Viet - * @author Tim Fox - */ -abstract class AbstractContext implements ContextInternal { - - final boolean disableTCCL; - - public AbstractContext(boolean disableTCCL) { - this.disableTCCL = disableTCCL; - } - - @Override - public abstract boolean isEventLoopContext(); - - @Override - public final boolean isRunningOnContext() { - return Vertx.currentContext() == this && inThread(); - } - - abstract boolean inThread(); - - @Override - public boolean isWorkerContext() { - return !isEventLoopContext(); - } - - @Override - public void emit(Handler task) { - emit(null, task); - } - - @Override - public final void execute(Handler task) { - execute(null, task); - } - - @Override - public final void dispatch(Handler handler) { - dispatch(null, handler); - } - - public final ContextInternal beginDispatch() { - ContextInternal prev; - VertxThread th = (VertxThread) Thread.currentThread(); - prev = th.beginEmission(this); - if (!disableTCCL) { - th.setContextClassLoader(classLoader()); - } - return prev; - } - - public final void endDispatch(ContextInternal previous) { - VertxThread th = (VertxThread) Thread.currentThread(); - if (!disableTCCL) { - th.setContextClassLoader(previous != null ? previous.classLoader() : null); - } - th.endEmission(previous); - } - - @Override - public long setPeriodic(long delay, Handler handler) { - VertxImpl owner = (VertxImpl) owner(); - return owner.scheduleTimeout(this, true, delay, TimeUnit.MILLISECONDS, false, handler); - } - - @Override - public long setTimer(long delay, Handler handler) { - VertxImpl owner = (VertxImpl) owner(); - return owner.scheduleTimeout(this, false, delay, TimeUnit.MILLISECONDS, false, handler); - } - - @Override - public final void dispatch(T event, Handler handler) { - ContextInternal prev = beginDispatch(); - try { - handler.handle(event); - } catch (Throwable t) { - reportException(t); - } finally { - endDispatch(prev); - } - } - - public final void dispatch(Runnable handler) { - ContextInternal prev = beginDispatch(); - try { - handler.run(); - } catch (Throwable t) { - reportException(t); - } finally { - endDispatch(prev); - } - } - - @Override - public final List processArgs() { - return VertxCommandLauncher.getProcessArguments(); - } - - @Override - public final void executeBlockingInternal(Handler> action, Handler> resultHandler) { - Future fut = executeBlockingInternal(action); - setResultHandler(this, fut, resultHandler); - } - - @Override - public void executeBlockingInternal(Handler> action, boolean ordered, Handler> resultHandler) { - Future fut = executeBlockingInternal(action, ordered); - setResultHandler(this, fut, resultHandler); - } - - @Override - public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { - Future fut = executeBlocking(blockingCodeHandler, ordered); - setResultHandler(this, fut, resultHandler); - } - - @Override - public void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { - Future fut = executeBlocking(blockingCodeHandler, queue); - setResultHandler(this, fut, resultHandler); - } - - @Override - public final void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { - executeBlocking(blockingCodeHandler, true, resultHandler); - } - - public Future executeBlocking(Handler> blockingCodeHandler) { - return executeBlocking(blockingCodeHandler, true); - } - - @Override - public PromiseInternal promise() { - return new PromiseImpl<>(this); - } - - @Override - public PromiseInternal promise(Handler> handler) { - if (handler instanceof PromiseInternal) { - PromiseInternal promise = (PromiseInternal) handler; - if (promise.context() != null) { - return promise; - } - } - PromiseInternal promise = promise(); - promise.future().onComplete(handler); - return promise; - } - - @Override - public Future succeededFuture() { - return new SucceededFuture<>(this, null); - } - - @Override - public Future succeededFuture(T result) { - return new SucceededFuture<>(this, result); - } - - @Override - public Future failedFuture(Throwable failure) { - return new FailedFuture<>(this, failure); - } - - @Override - public Future failedFuture(String message) { - return new FailedFuture<>(this, message); - } - - @SuppressWarnings("unchecked") - @Override - public final T get(Object key) { - return (T) contextData().get(key); - } - - @Override - public final void put(Object key, Object value) { - contextData().put(key, value); - } - - @Override - public final boolean remove(Object key) { - return contextData().remove(key) != null; - } - - @SuppressWarnings("unchecked") - @Override - public final T getLocal(Object key) { - return (T) localContextData().get(key); - } - - @Override - public final void putLocal(Object key, Object value) { - localContextData().put(key, value); - } - - @Override - public final boolean removeLocal(Object key) { - return localContextData().remove(key) != null; - } - - private static void setResultHandler(ContextInternal ctx, Future fut, Handler> resultHandler) { - if (resultHandler != null) { - fut.onComplete(resultHandler); - } else { - fut.onFailure(ctx::reportException); - } - } -} diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextBase.java similarity index 72% rename from src/main/java/io/vertx/core/impl/ContextImpl.java rename to src/main/java/io/vertx/core/impl/ContextBase.java index 9815bdf92f0..450e0828648 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextBase.java @@ -25,32 +25,22 @@ import java.util.concurrent.RejectedExecutionException; /** + * A base class for {@link Context} implementations. + * * @author Tim Fox + * @author Julien Viet */ -abstract class ContextImpl extends AbstractContext { - - /** - * Execute the {@code task} disabling the thread-local association for the duration - * of the execution. {@link Vertx#currentContext()} will return {@code null}, - * @param task the task to execute - * @throws IllegalStateException if the current thread is not a Vertx thread - */ - static void executeIsolated(Handler task) { - Thread currentThread = Thread.currentThread(); - if (currentThread instanceof VertxThread) { - VertxThread vertxThread = (VertxThread) currentThread; - ContextInternal prev = vertxThread.beginEmission(null); - try { - task.handle(null); - } finally { - vertxThread.endEmission(prev); - } +public abstract class ContextBase implements ContextInternal { + + static void setResultHandler(ContextInternal ctx, Future fut, Handler> resultHandler) { + if (resultHandler != null) { + fut.onComplete(resultHandler); } else { - task.handle(null); + fut.onFailure(ctx::reportException); } } - private static final Logger log = LoggerFactory.getLogger(ContextImpl.class); + private static final Logger log = LoggerFactory.getLogger(ContextBase.class); private static final String DISABLE_TIMINGS_PROP_NAME = "vertx.disableContextTimings"; static final boolean DISABLE_TIMINGS = Boolean.getBoolean(DISABLE_TIMINGS_PROP_NAME); @@ -65,19 +55,17 @@ static void executeIsolated(Handler task) { private ConcurrentMap localData; private volatile Handler exceptionHandler; final TaskQueue internalOrderedTasks; - final WorkerPool internalBlockingPool; + final WorkerPool internalWorkerPool; final WorkerPool workerPool; final TaskQueue orderedTasks; - ContextImpl(VertxInternal vertx, - EventLoop eventLoop, - WorkerPool internalBlockingPool, - WorkerPool workerPool, - Deployment deployment, - CloseFuture closeFuture, - ClassLoader tccl, - boolean disableTCCL) { - super(disableTCCL); + protected ContextBase(VertxInternal vertx, + EventLoop eventLoop, + WorkerPool internalWorkerPool, + WorkerPool workerPool, + Deployment deployment, + CloseFuture closeFuture, + ClassLoader tccl) { this.deployment = deployment; this.config = deployment != null ? deployment.config() : new JsonObject(); this.eventLoop = eventLoop; @@ -85,7 +73,7 @@ static void executeIsolated(Handler task) { this.owner = vertx; this.workerPool = workerPool; this.closeFuture = closeFuture; - this.internalBlockingPool = internalBlockingPool; + this.internalWorkerPool = internalWorkerPool; this.orderedTasks = new TaskQueue(); this.internalOrderedTasks = new TaskQueue(); } @@ -99,16 +87,6 @@ public CloseFuture closeFuture() { return closeFuture; } - @Override - public boolean isDeployment() { - return deployment != null; - } - - @Override - public String deploymentID() { - return deployment != null ? deployment.deploymentID() : null; - } - @Override public JsonObject config() { return config; @@ -124,12 +102,12 @@ public VertxInternal owner() { @Override public Future executeBlockingInternal(Handler> action) { - return executeBlocking(this, action, internalBlockingPool, internalOrderedTasks); + return executeBlocking(this, action, internalWorkerPool, internalOrderedTasks); } @Override public Future executeBlockingInternal(Handler> action, boolean ordered) { - return executeBlocking(this, action, internalBlockingPool, ordered ? internalOrderedTasks : null); + return executeBlocking(this, action, internalWorkerPool, ordered ? internalOrderedTasks : null); } @Override @@ -235,49 +213,36 @@ public Handler exceptionHandler() { return exceptionHandler; } - public int getInstanceCount() { - // the no verticle case - if (deployment == null) { - return 0; - } - - // the single verticle without an instance flag explicitly defined - if (deployment.deploymentOptions() == null) { - return 1; - } - return deployment.deploymentOptions().getInstances(); - } - @Override public final void runOnContext(Handler action) { runOnContext(this, action); } - abstract void runOnContext(AbstractContext ctx, Handler action); + protected abstract void runOnContext(ContextInternal ctx, Handler action); @Override public void execute(Runnable task) { execute(this, task); } - abstract void execute(AbstractContext ctx, Runnable task); + protected abstract void execute(ContextInternal ctx, Runnable task); @Override public final void execute(T argument, Handler task) { execute(this, argument, task); } - abstract void execute(AbstractContext ctx, T argument, Handler task); + protected abstract void execute(ContextInternal ctx, T argument, Handler task); @Override public void emit(T argument, Handler task) { emit(this, argument, task); } - abstract void emit(AbstractContext ctx, T argument, Handler task); + protected abstract void emit(ContextInternal ctx, T argument, Handler task); @Override - public final ContextInternal duplicate() { + public ContextInternal duplicate() { return new DuplicatedContext(this); } } diff --git a/src/main/java/io/vertx/core/impl/ContextInternal.java b/src/main/java/io/vertx/core/impl/ContextInternal.java index b4e70c2e0d9..b5a44898c35 100644 --- a/src/main/java/io/vertx/core/impl/ContextInternal.java +++ b/src/main/java/io/vertx/core/impl/ContextInternal.java @@ -14,11 +14,17 @@ import io.netty.channel.EventLoop; import io.netty.util.concurrent.FastThreadLocalThread; import io.vertx.core.*; +import io.vertx.core.impl.future.FailedFuture; +import io.vertx.core.impl.future.PromiseImpl; import io.vertx.core.impl.future.PromiseInternal; +import io.vertx.core.impl.future.SucceededFuture; import io.vertx.core.spi.tracing.VertxTracer; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import static io.vertx.core.impl.ContextBase.setResultHandler; /** * This interface provides an api for vert.x core internal use only @@ -33,9 +39,14 @@ public interface ContextInternal extends Context { * @return the current context */ static ContextInternal current() { - Thread current = Thread.currentThread(); - if (current instanceof VertxThread) { - return ((VertxThread) current).context(); + Thread thread = Thread.currentThread(); + if (thread instanceof VertxThread) { + return ((VertxThread) thread).context(); + } else { + VertxImpl.ContextDispatch current = VertxImpl.nonVertxContextDispatch.get(); + if (current != null) { + return current.context; + } } return null; } @@ -61,39 +72,62 @@ default void runOnContext(Handler action) { /** * @return a {@link Promise} associated with this context */ - PromiseInternal promise(); + default PromiseInternal promise() { + return new PromiseImpl<>(this); + } /** * @return a {@link Promise} associated with this context or the {@code handler} * if that handler is already an instance of {@code PromiseInternal} */ - PromiseInternal promise(Handler> handler); + default PromiseInternal promise(Handler> handler) { + if (handler instanceof PromiseInternal) { + PromiseInternal promise = (PromiseInternal) handler; + if (promise.context() != null) { + return promise; + } + } + PromiseInternal promise = promise(); + promise.future().onComplete(handler); + return promise; + } /** * @return an empty succeeded {@link Future} associated with this context */ - Future succeededFuture(); + default Future succeededFuture() { + return new SucceededFuture<>(this, null); + } /** * @return a succeeded {@link Future} of the {@code result} associated with this context */ - Future succeededFuture(T result); + default Future succeededFuture(T result) { + return new SucceededFuture<>(this, result); + } /** * @return a {@link Future} failed with the {@code failure} associated with this context */ - Future failedFuture(Throwable failure); + default Future failedFuture(Throwable failure) { + return new FailedFuture<>(this, failure); + } /** * @return a {@link Future} failed with the {@code message} associated with this context */ - Future failedFuture(String message); + default Future failedFuture(String message) { + return new FailedFuture<>(this, message); + } /** * Like {@link #executeBlocking(Handler, boolean, Handler)} but uses the {@code queue} to order the tasks instead * of the internal queue of this context. */ - void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler); + default void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { + Future fut = executeBlocking(blockingCodeHandler, queue); + setResultHandler(this, fut, resultHandler); + } /** * Like {@link #executeBlocking(Handler, boolean)} but uses the {@code queue} to order the tasks instead @@ -104,9 +138,21 @@ default void runOnContext(Handler action) { /** * Execute an internal task on the internal blocking ordered executor. */ - void executeBlockingInternal(Handler> action, Handler> resultHandler); + default void executeBlockingInternal(Handler> action, Handler> resultHandler) { + Future fut = executeBlockingInternal(action); + setResultHandler(this, fut, resultHandler); + } - void executeBlockingInternal(Handler> action, boolean ordered, Handler> resultHandler); + default void executeBlockingInternal(Handler> action, boolean ordered, Handler> resultHandler) { + Future fut = executeBlockingInternal(action, ordered); + setResultHandler(this, fut, resultHandler); + } + + @Override + default void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { + Future fut = executeBlocking(blockingCodeHandler, ordered); + setResultHandler(this, fut, resultHandler); + } /** * Like {@link #executeBlockingInternal(Handler, Handler)} but returns a {@code Future} of the asynchronous result @@ -126,6 +172,8 @@ default void runOnContext(Handler action) { @Override VertxInternal owner(); + boolean inThread(); + /** * Emit the given {@code argument} event to the {@code task} and switch on this context if necessary, this also associates the * current thread with the current context so {@link Vertx#currentContext()} returns this context. @@ -142,12 +190,16 @@ default void runOnContext(Handler action) { /** * @see #emit(Object, Handler) */ - void emit(Handler task); + default void emit(Handler task) { + emit(null, task); + } /** * @see #execute(Object, Handler) */ - void execute(Handler task); + default void execute(Handler task) { + execute(null, task); + } /** * Execute the {@code task} on this context, it will be executed according to the @@ -169,22 +221,35 @@ default void runOnContext(Handler action) { /** * @return whether the current thread is running on this context */ - boolean isRunningOnContext(); + default boolean isRunningOnContext() { + return current() == this && inThread(); + } /** * @see #dispatch(Handler) */ - void dispatch(Runnable handler); + default void dispatch(Runnable handler) { + ContextInternal prev = beginDispatch(); + try { + handler.run(); + } catch (Throwable t) { + reportException(t); + } finally { + endDispatch(prev); + } + } /** * @see #dispatch(Object, Handler) */ - void dispatch(Handler handler); + default void dispatch(Handler handler) { + dispatch(null, handler); + } /** * Dispatch an {@code event} to the {@code handler} on this context. *

- * The handler is executed directly by the caller thread which must be a {@link VertxThread} or a {@link FastThreadLocalThread}. + * The handler is executed directly by the caller thread which must be a context thread. *

* The handler execution is monitored by the blocked thread checker. *

@@ -193,7 +258,16 @@ default void runOnContext(Handler action) { * @param event the event for the {@code handler} * @param handler the handler to execute with the {@code event} */ - void dispatch(E event, Handler handler); + default void dispatch(E event, Handler handler) { + ContextInternal prev = beginDispatch(); + try { + handler.handle(event); + } catch (Throwable t) { + reportException(t); + } finally { + endDispatch(prev); + } + } /** * Begin the execution of a task on this context. @@ -207,7 +281,10 @@ default void runOnContext(Handler action) { * @return the previous context that shall be restored after or {@code null} if there is none * @throws IllegalStateException when the current thread of execution cannot execute this task */ - ContextInternal beginDispatch(); + default ContextInternal beginDispatch() { + VertxImpl vertx = (VertxImpl) owner(); + return vertx.beginDispatch(this); + } /** * End the execution of a task on this context, see {@link #beginDispatch()} @@ -217,7 +294,10 @@ default void runOnContext(Handler action) { * @param previous the previous context to restore or {@code null} if there is none * @throws IllegalStateException when the current thread of execution cannot execute this task */ - void endDispatch(ContextInternal previous); + default void endDispatch(ContextInternal previous) { + VertxImpl vertx = (VertxImpl) owner(); + vertx.endDispatch(previous); + } /** * Report an exception to this context synchronously. @@ -235,11 +315,43 @@ default void runOnContext(Handler action) { */ ConcurrentMap contextData(); + @SuppressWarnings("unchecked") + @Override + default T get(Object key) { + return (T) contextData().get(key); + } + + @Override + default void put(Object key, Object value) { + contextData().put(key, value); + } + + @Override + default boolean remove(Object key) { + return contextData().remove(key) != null; + } + /** * @return the {@link ConcurrentMap} used to store local context data */ ConcurrentMap localContextData(); + @SuppressWarnings("unchecked") + @Override + default T getLocal(Object key) { + return (T) localContextData().get(key); + } + + @Override + default void putLocal(Object key, Object value) { + localContextData().put(key, value); + } + + @Override + default boolean removeLocal(Object key) { + return localContextData().remove(key) != null; + } + /** * @return the classloader associated with this context */ @@ -280,18 +392,46 @@ default void runOnContext(Handler action) { * Like {@link Vertx#setPeriodic(long, Handler)} except the periodic timer will fire on this context and the * timer will not be associated with the context close hook. */ - long setPeriodic(long delay, Handler handler); + default long setPeriodic(long delay, Handler handler) { + VertxImpl owner = (VertxImpl) owner(); + return owner.scheduleTimeout(this, true, delay, TimeUnit.MILLISECONDS, false, handler); + } /** * Like {@link Vertx#setTimer(long, Handler)} except the timer will fire on this context and the timer * will not be associated with the context close hook. */ - long setTimer(long delay, Handler handler); + default long setTimer(long delay, Handler handler) { + VertxImpl owner = (VertxImpl) owner(); + return owner.scheduleTimeout(this, false, delay, TimeUnit.MILLISECONDS, false, handler); + } /** * @return {@code true} when the context is associated with a deployment */ - boolean isDeployment(); + default boolean isDeployment() { + return getDeployment() != null; + } + + default String deploymentID() { + Deployment deployment = getDeployment(); + return deployment != null ? deployment.deploymentID() : null; + } + + default int getInstanceCount() { + Deployment deployment = getDeployment(); + + // the no verticle case + if (deployment == null) { + return 0; + } + + // the single verticle without an instance flag explicitly defined + if (deployment.deploymentOptions() == null) { + return 1; + } + return deployment.deploymentOptions().getInstances(); + } CloseFuture closeFuture(); diff --git a/src/main/java/io/vertx/core/impl/DeploymentManager.java b/src/main/java/io/vertx/core/impl/DeploymentManager.java index adc8a015e16..50d97b91ba3 100644 --- a/src/main/java/io/vertx/core/impl/DeploymentManager.java +++ b/src/main/java/io/vertx/core/impl/DeploymentManager.java @@ -184,7 +184,7 @@ private Future doDeploy(String identifier, for (Verticle verticle: verticles) { CloseFuture closeFuture = new CloseFuture(log); WorkerPool workerPool = poolName != null ? vertx.createSharedWorkerPool(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()) : null; - ContextImpl context = (options.isWorker() ? vertx.createWorkerContext(deployment, closeFuture, workerPool, tccl) : + ContextBase context = (options.isWorker() ? vertx.createWorkerContext(deployment, closeFuture, workerPool, tccl) : vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl)); VerticleHolder holder = new VerticleHolder(verticle, context, workerPool, closeFuture); deployment.addVerticle(holder); @@ -226,11 +226,11 @@ private Future doDeploy(String identifier, static class VerticleHolder { final Verticle verticle; - final ContextImpl context; + final ContextBase context; final WorkerPool workerPool; final CloseFuture closeFuture; - VerticleHolder(Verticle verticle, ContextImpl context, WorkerPool workerPool, CloseFuture closeFuture) { + VerticleHolder(Verticle verticle, ContextBase context, WorkerPool workerPool, CloseFuture closeFuture) { this.verticle = verticle; this.context = context; this.workerPool = workerPool; @@ -274,7 +274,7 @@ public void addVerticle(VerticleHolder holder) { verticles.add(holder); } - private synchronized void rollback(ContextInternal callingContext, Handler> completionHandler, ContextImpl context, VerticleHolder closeFuture, Throwable cause) { + private synchronized void rollback(ContextInternal callingContext, Handler> completionHandler, ContextBase context, VerticleHolder closeFuture, Throwable cause) { if (status == ST_DEPLOYED) { status = ST_UNDEPLOYING; doUndeployChildren(callingContext).onComplete(childrenResult -> { @@ -331,7 +331,7 @@ public synchronized Future doUndeploy(ContextInternal undeployingContext) parent.removeChild(this); } for (VerticleHolder verticleHolder: verticles) { - ContextImpl context = verticleHolder.context; + ContextBase context = verticleHolder.context; Promise p = Promise.promise(); undeployFutures.add(p.future()); context.runOnContext(v -> { diff --git a/src/main/java/io/vertx/core/impl/DuplicatedContext.java b/src/main/java/io/vertx/core/impl/DuplicatedContext.java index 9307b584575..9484eb3d46f 100644 --- a/src/main/java/io/vertx/core/impl/DuplicatedContext.java +++ b/src/main/java/io/vertx/core/impl/DuplicatedContext.java @@ -33,18 +33,17 @@ * * @author Julien Viet */ -class DuplicatedContext extends AbstractContext { +class DuplicatedContext implements ContextInternal { - protected final ContextImpl delegate; + protected final ContextBase delegate; private ConcurrentMap localData; - DuplicatedContext(ContextImpl delegate) { - super(delegate.disableTCCL); + DuplicatedContext(ContextBase delegate) { this.delegate = delegate; } @Override - boolean inThread() { + public boolean inThread() { return delegate.inThread(); } @@ -53,31 +52,16 @@ public final CloseFuture closeFuture() { return delegate.closeFuture(); } - @Override - public final boolean isDeployment() { - return delegate.isDeployment(); - } - @Override public final VertxTracer tracer() { return delegate.tracer(); } - @Override - public final String deploymentID() { - return delegate.deploymentID(); - } - @Override public final JsonObject config() { return delegate.config(); } - @Override - public final int getInstanceCount() { - return delegate.getInstanceCount(); - } - @Override public final Context exceptionHandler(Handler handler) { delegate.exceptionHandler(handler); @@ -141,22 +125,22 @@ public final ConcurrentMap localContextData() { @Override public final Future executeBlockingInternal(Handler> action) { - return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks); + return ContextBase.executeBlocking(this, action, delegate.internalWorkerPool, delegate.internalOrderedTasks); } @Override public final Future executeBlockingInternal(Handler> action, boolean ordered) { - return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, ordered ? delegate.internalOrderedTasks : null); + return ContextBase.executeBlocking(this, action, delegate.internalWorkerPool, ordered ? delegate.internalOrderedTasks : null); } @Override public final Future executeBlocking(Handler> action, boolean ordered) { - return ContextImpl.executeBlocking(this, action, delegate.workerPool, ordered ? delegate.orderedTasks : null); + return ContextBase.executeBlocking(this, action, delegate.workerPool, ordered ? delegate.orderedTasks : null); } @Override public final Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { - return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); + return ContextBase.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); } @Override @@ -184,6 +168,11 @@ public boolean isEventLoopContext() { return delegate.isEventLoopContext(); } + @Override + public boolean isWorkerContext() { + return delegate.isWorkerContext(); + } + @Override public ContextInternal duplicate() { return new DuplicatedContext(delegate); diff --git a/src/main/java/io/vertx/core/impl/EventLoopContext.java b/src/main/java/io/vertx/core/impl/EventLoopContext.java index fe2bfceda81..69d1d60714e 100644 --- a/src/main/java/io/vertx/core/impl/EventLoopContext.java +++ b/src/main/java/io/vertx/core/impl/EventLoopContext.java @@ -20,7 +20,7 @@ /** * @author Tim Fox */ -public class EventLoopContext extends ContextImpl { +public class EventLoopContext extends ContextBase { EventLoopContext(VertxInternal vertx, EventLoop eventLoop, @@ -28,9 +28,8 @@ public class EventLoopContext extends ContextImpl { WorkerPool workerPool, Deployment deployment, CloseFuture closeFuture, - ClassLoader tccl, - boolean disableTCCL) { - super(vertx, eventLoop, internalBlockingPool, workerPool, deployment, closeFuture, tccl, disableTCCL); + ClassLoader tccl) { + super(vertx, eventLoop, internalBlockingPool, workerPool, deployment, closeFuture, tccl); } @Override @@ -39,7 +38,7 @@ public Executor executor() { } @Override - void runOnContext(AbstractContext ctx, Handler action) { + protected void runOnContext(ContextInternal ctx, Handler action) { try { nettyEventLoop().execute(() -> ctx.dispatch(action)); } catch (RejectedExecutionException ignore) { @@ -48,7 +47,7 @@ void runOnContext(AbstractContext ctx, Handler action) { } @Override - void emit(AbstractContext ctx, T argument, Handler task) { + protected void emit(ContextInternal ctx, T argument, Handler task) { EventLoop eventLoop = nettyEventLoop(); if (eventLoop.inEventLoop()) { ContextInternal prev = ctx.beginDispatch(); @@ -71,7 +70,7 @@ void emit(AbstractContext ctx, T argument, Handler task) { * */ @Override - void execute(AbstractContext ctx, T argument, Handler task) { + protected void execute(ContextInternal ctx, T argument, Handler task) { EventLoop eventLoop = nettyEventLoop(); if (eventLoop.inEventLoop()) { task.handle(argument); @@ -81,7 +80,7 @@ void execute(AbstractContext ctx, T argument, Handler task) { } @Override - void execute(AbstractContext ctx, Runnable task) { + protected void execute(ContextInternal ctx, Runnable task) { EventLoop eventLoop = nettyEventLoop(); if (eventLoop.inEventLoop()) { task.run(); @@ -96,7 +95,12 @@ public boolean isEventLoopContext() { } @Override - boolean inThread() { + public boolean isWorkerContext() { + return false; + } + + @Override + public boolean inThread() { return nettyEventLoop().inEventLoop(); } diff --git a/src/main/java/io/vertx/core/impl/HAManager.java b/src/main/java/io/vertx/core/impl/HAManager.java index 8cee2def0b5..73a90600151 100644 --- a/src/main/java/io/vertx/core/impl/HAManager.java +++ b/src/main/java/io/vertx/core/impl/HAManager.java @@ -344,7 +344,7 @@ private synchronized void checkQuorumWhenAdded(final String nodeID, final long s log.warn("Timed out waiting for group information to appear"); } else { // Remove any context we have here (from the timer) otherwise will screw things up when verticles are deployed - ContextImpl.executeIsolated(v -> { + ((VertxImpl)vertx).executeIsolated(v -> { checkQuorumWhenAdded(nodeID, start); }); } @@ -404,7 +404,7 @@ private void addToHA(String deploymentID, String verticleName, DeploymentOptions private void addToHADeployList(final String verticleName, final DeploymentOptions deploymentOptions, final Handler> doneHandler) { toDeployOnQuorum.add(() -> { - ContextImpl.executeIsolated(v -> { + ((VertxImpl)vertx).executeIsolated(v -> { deployVerticle(verticleName, deploymentOptions, doneHandler); }); }); @@ -428,7 +428,7 @@ private void undeployHADeployments() { Deployment dep = deploymentManager.getDeployment(deploymentID); if (dep != null) { if (dep.deploymentOptions().isHa()) { - ContextImpl.executeIsolated(v -> { + ((VertxImpl)vertx).executeIsolated(v -> { deploymentManager.undeployVerticle(deploymentID).onComplete(result -> { if (result.succeeded()) { log.info("Successfully undeployed HA deployment " + deploymentID + "-" + dep.verticleIdentifier() + " as there is no quorum"); @@ -523,7 +523,7 @@ private void processFailover(JsonObject failedVerticle) { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference err = new AtomicReference<>(); // Now deploy this verticle on this node - ContextImpl.executeIsolated(v -> { + ((VertxImpl)vertx).executeIsolated(v -> { JsonObject options = failedVerticle.getJsonObject("options"); doDeployVerticle(verticleName, new DeploymentOptions(options), result -> { if (result.succeeded()) { diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index 3ef052c3240..2275b889ce8 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -137,7 +137,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { private final CloseFuture closeFuture; private final Transport transport; private final VertxTracer tracer; - private final ThreadLocal> stickyContext = new ThreadLocal<>(); + private final ThreadLocal> stickyContext = new ThreadLocal<>(); private final boolean disableTCCL; VertxImpl(VertxOptions options, ClusterManager clusterManager, NodeSelector nodeSelector, VertxMetrics metrics, @@ -402,8 +402,13 @@ public void runOnContext(Handler task) { } // The background pool is used for making blocking calls to legacy synchronous APIs - public ExecutorService getWorkerPool() { - return workerPool.executor(); + public WorkerPool getWorkerPool() { + return workerPool; + } + + @Override + public WorkerPool getInternalWorkerPool() { + return internalWorkerPool; } public EventLoopGroup getEventLoopGroup() { @@ -415,7 +420,7 @@ public EventLoopGroup getAcceptorEventLoopGroup() { } public ContextInternal getOrCreateContext() { - AbstractContext ctx = getContext(); + ContextInternal ctx = getContext(); if (ctx == null) { // We are running embedded - Create a context ctx = createEventLoopContext(); @@ -464,12 +469,12 @@ public boolean cancelTimer(long id) { @Override public EventLoopContext createEventLoopContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) { - return new EventLoopContext(this, eventLoopGroup.next(), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, tccl, disableTCCL); + return new EventLoopContext(this, eventLoopGroup.next(), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, disableTCCL ? null : tccl); } @Override public EventLoopContext createEventLoopContext(EventLoop eventLoop, WorkerPool workerPool, ClassLoader tccl) { - return new EventLoopContext(this, eventLoop, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, null, closeFuture, tccl, disableTCCL); + return new EventLoopContext(this, eventLoop, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, null, closeFuture, disableTCCL ? tccl : null); } @Override @@ -479,7 +484,7 @@ public EventLoopContext createEventLoopContext() { @Override public WorkerContext createWorkerContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) { - return new WorkerContext(this, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, tccl, disableTCCL); + return new WorkerContext(this, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, disableTCCL ? null : tccl); } @Override @@ -536,12 +541,12 @@ public long scheduleTimeout(ContextInternal context, return task.id; } - public AbstractContext getContext() { - AbstractContext context = (AbstractContext) ContextInternal.current(); + public ContextInternal getContext() { + ContextInternal context = ContextInternal.current(); if (context != null && context.owner() == this) { return context; } else { - WeakReference ref = stickyContext.get(); + WeakReference ref = stickyContext.get(); return ref != null ? ref.get() : null; } } @@ -1145,7 +1150,135 @@ public void removeCloseHook(Closeable hook) { } private CloseFuture resolveCloseFuture() { - AbstractContext context = getContext(); + ContextInternal context = getContext(); return context != null ? context.closeFuture() : closeFuture; } + + /** + * Execute the {@code task} disabling the thread-local association for the duration + * of the execution. {@link Vertx#currentContext()} will return {@code null}, + * @param task the task to execute + * @throws IllegalStateException if the current thread is not a Vertx thread + */ + void executeIsolated(Handler task) { + if (Thread.currentThread() instanceof VertxThread) { + ContextInternal prev = beginDispatch(null); + try { + task.handle(null); + } finally { + endDispatch(prev); + } + } else { + task.handle(null); + } + } + + static class ContextDispatch { + ContextInternal context; + ClassLoader topLevelTCCL; + } + + /** + * Context dispatch info for context running with non vertx threads (Loom). + */ + static final ThreadLocal nonVertxContextDispatch = new ThreadLocal<>(); + + /** + * Begin the emission of a context event. + *

+ * This is a low level interface that should not be used, instead {@link ContextInternal#dispatch(Object, io.vertx.core.Handler)} + * shall be used. + * + * @param context the context on which the event is emitted on + * @return the current context that shall be restored + */ + ContextInternal beginDispatch(ContextInternal context) { + Thread thread = Thread.currentThread(); + ContextInternal prev; + if (thread instanceof VertxThread) { + VertxThread vertxThread = (VertxThread) thread; + prev = vertxThread.context; + if (!ContextBase.DISABLE_TIMINGS) { + vertxThread.executeStart(); + } + vertxThread.context = context; + if (!disableTCCL) { + if (prev == null) { + vertxThread.topLevelTCCL = Thread.currentThread().getContextClassLoader(); + } + if (context != null) { + thread.setContextClassLoader(context.classLoader()); + } + } + } else { + prev = beginDispatch2(thread, context); + } + return prev; + } + + private ContextInternal beginDispatch2(Thread thread, ContextInternal context) { + ContextDispatch current = nonVertxContextDispatch.get(); + ContextInternal prev; + if (current != null) { + prev = current.context; + } else { + current = new ContextDispatch(); + nonVertxContextDispatch.set(current); + prev = null; + } + current.context = context; + if (!disableTCCL) { + if (prev == null) { + current.topLevelTCCL = Thread.currentThread().getContextClassLoader(); + } + thread.setContextClassLoader(context.classLoader()); + } + return prev; + } + + /** + * End the emission of a context task. + *

+ * This is a low level interface that should not be used, instead {@link ContextInternal#dispatch(Object, io.vertx.core.Handler)} + * shall be used. + * + * @param prev the previous context thread to restore, might be {@code null} + */ + void endDispatch(ContextInternal prev) { + Thread thread = Thread.currentThread(); + if (thread instanceof VertxThread) { + VertxThread vertxThread = (VertxThread) thread; + vertxThread.context = prev; + if (!disableTCCL) { + ClassLoader tccl; + if (prev == null) { + tccl = vertxThread.topLevelTCCL; + vertxThread.topLevelTCCL = null; + } else { + tccl = prev.classLoader(); + } + Thread.currentThread().setContextClassLoader(tccl); + } + if (!ContextBase.DISABLE_TIMINGS) { + vertxThread.executeEnd(); + } + } else { + endDispatch2(prev); + } + } + + private void endDispatch2(ContextInternal prev) { + ClassLoader tccl; + ContextDispatch current = nonVertxContextDispatch.get(); + if (prev != null) { + current.context = prev; + tccl = prev.classLoader(); + } else { + nonVertxContextDispatch.remove(); + tccl = current.topLevelTCCL; + } + if (!disableTCCL) { + Thread.currentThread().setContextClassLoader(tccl); + } + } } diff --git a/src/main/java/io/vertx/core/impl/VertxInternal.java b/src/main/java/io/vertx/core/impl/VertxInternal.java index 5d431e8c6e2..36da3b9dffc 100644 --- a/src/main/java/io/vertx/core/impl/VertxInternal.java +++ b/src/main/java/io/vertx/core/impl/VertxInternal.java @@ -71,7 +71,9 @@ public interface VertxInternal extends Vertx { EventLoopGroup getAcceptorEventLoopGroup(); - ExecutorService getWorkerPool(); + WorkerPool getWorkerPool(); + + WorkerPool getInternalWorkerPool(); Map sharedHttpServers(); diff --git a/src/main/java/io/vertx/core/impl/VertxThread.java b/src/main/java/io/vertx/core/impl/VertxThread.java index 0c867e0521e..7322fcabe51 100644 --- a/src/main/java/io/vertx/core/impl/VertxThread.java +++ b/src/main/java/io/vertx/core/impl/VertxThread.java @@ -25,8 +25,8 @@ public class VertxThread extends FastThreadLocalThread implements BlockedThreadC private final long maxExecTime; private final TimeUnit maxExecTimeUnit; private long execStart; - private ContextInternal context; - private ClassLoader topLevelTCCL; + ContextInternal context; + ClassLoader topLevelTCCL; public VertxThread(Runnable target, String name, boolean worker, long maxExecTime, TimeUnit maxExecTimeUnit) { super(target, name); @@ -42,13 +42,13 @@ ContextInternal context() { return context; } - private void executeStart() { + void executeStart() { if (context == null) { execStart = System.nanoTime(); } } - private void executeEnd() { + void executeEnd() { if (context == null) { execStart = 0; } @@ -72,43 +72,4 @@ public TimeUnit maxExecTimeUnit() { return maxExecTimeUnit; } - /** - * Begin the emission of a context event. - *

- * This is a low level interface that should not be used, instead {@link ContextInternal#dispatch(Object, io.vertx.core.Handler)} - * shall be used. - * - * @param context the context on which the event is emitted on - * @return the current context that shall be restored - */ - ContextInternal beginEmission(ContextInternal context) { - if (!ContextImpl.DISABLE_TIMINGS) { - executeStart(); - } - ContextInternal prev = this.context; - if (prev == null) { - topLevelTCCL = Thread.currentThread().getContextClassLoader(); - } - this.context = context; - return prev; - } - - /** - * End the emission of a context task. - *

- * This is a low level interface that should not be used, instead {@link ContextInternal#dispatch(Object, io.vertx.core.Handler)} - * shall be used. - * - * @param prev the previous context thread to restore, might be {@code null} - */ - void endEmission(ContextInternal prev) { - context = prev; - if (prev == null) { - Thread.currentThread().setContextClassLoader(topLevelTCCL); - topLevelTCCL = null; - } - if (!ContextImpl.DISABLE_TIMINGS) { - executeEnd(); - } - } } diff --git a/src/main/java/io/vertx/core/impl/WorkerContext.java b/src/main/java/io/vertx/core/impl/WorkerContext.java index 79a1304d420..29fe6eea27c 100644 --- a/src/main/java/io/vertx/core/impl/WorkerContext.java +++ b/src/main/java/io/vertx/core/impl/WorkerContext.java @@ -22,20 +22,19 @@ /** * @author Tim Fox */ -public class WorkerContext extends ContextImpl { +public class WorkerContext extends ContextBase { WorkerContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, CloseFuture closeFuture, - ClassLoader tccl, - boolean disableTCCL) { - super(vertx, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, deployment, closeFuture, tccl, disableTCCL); + ClassLoader tccl) { + super(vertx, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, deployment, closeFuture, tccl); } @Override - void runOnContext(AbstractContext ctx, Handler action) { + protected void runOnContext(ContextInternal ctx, Handler action) { try { run(ctx, null, action); } catch (RejectedExecutionException ignore) { @@ -50,19 +49,19 @@ void runOnContext(AbstractContext ctx, Handler action) { * */ @Override - void execute(AbstractContext ctx, T argument, Handler task) { + protected void execute(ContextInternal ctx, T argument, Handler task) { execute(orderedTasks, argument, task); } @Override - void emit(AbstractContext ctx, T argument, Handler task) { + protected void emit(ContextInternal ctx, T argument, Handler task) { execute(orderedTasks, argument, arg -> { ctx.dispatch(arg, task); }); } @Override - void execute(AbstractContext ctx, Runnable task) { + protected void execute(ContextInternal ctx, Runnable task) { execute(this, task, Runnable::run); } @@ -71,6 +70,11 @@ public boolean isEventLoopContext() { return false; } + @Override + public boolean isWorkerContext() { + return true; + } + private Executor executor; @Override @@ -125,7 +129,7 @@ private void execute(TaskQueue queue, T argument, Handler task) { } @Override - boolean inThread() { + public boolean inThread() { return Context.isOnWorkerThread(); } } diff --git a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java index 1926fafc5bb..bf136ade848 100644 --- a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java +++ b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java @@ -63,8 +63,8 @@ public WorkerPool getPool() { } } ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); - ContextImpl impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextImpl) context; - return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.orderedTasks : null); + ContextBase impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextBase) context; + return ContextBase.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.orderedTasks : null); } public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> asyncResultHandler) { diff --git a/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java b/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java index 32b65a3f95f..5391b616ac0 100644 --- a/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java +++ b/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java @@ -99,8 +99,7 @@ public InboundBuffer(Context context, long highWaterMark) { } private void checkThread() { - Thread thread = Thread.currentThread(); - if (!(thread instanceof FastThreadLocalThread)) { + if (!context.inThread()) { throw new IllegalStateException("This operation must be called from a Vert.x thread"); } } diff --git a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java index b376d8604f3..15a313ab464 100644 --- a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java +++ b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java @@ -19,7 +19,7 @@ /** * @author Julien Viet */ -public class BenchmarkContext extends ContextImpl { +public class BenchmarkContext extends ContextBase { public static BenchmarkContext create(Vertx vertx) { VertxImpl impl = (VertxImpl) vertx; @@ -32,7 +32,7 @@ public static BenchmarkContext create(Vertx vertx) { } public BenchmarkContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, ClassLoader tccl) { - super(vertx, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, null, null, tccl, false); + super(vertx, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, null, null, tccl); } @Override @@ -41,27 +41,27 @@ public Executor executor() { } @Override - boolean inThread() { + public boolean inThread() { throw new UnsupportedOperationException(); } @Override - void emit(AbstractContext ctx, T argument, Handler task) { + protected void emit(ContextInternal ctx, T argument, Handler task) { throw new UnsupportedOperationException(); } @Override - void runOnContext(AbstractContext ctx, Handler action) { + protected void runOnContext(ContextInternal ctx, Handler action) { ctx.dispatch(null, action); } @Override - void execute(AbstractContext ctx, T argument, Handler task) { + protected void execute(ContextInternal ctx, T argument, Handler task) { task.handle(argument); } @Override - void execute(AbstractContext ctx, Runnable task) { + protected void execute(ContextInternal ctx, Runnable task) { task.run(); } @@ -75,4 +75,8 @@ public boolean isEventLoopContext() { return false; } + @Override + public boolean isWorkerContext() { + return false; + } } diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index 4116517a064..9980d1c374c 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -983,4 +983,34 @@ public void testIsDuplicatedContextFromWorkerContext() { }); await(); } + + @Test + public void testDispatchContextOnAnyThread() { + ClassLoader tccl1 = new URLClassLoader(new URL[0]); + ClassLoader tccl2 = new URLClassLoader(new URL[0]); + VertxImpl impl = (VertxImpl) vertx; + ContextInternal ctx1 = new FakeContext(impl, tccl1); + ContextInternal ctx2 = new FakeContext(impl, tccl2); + AtomicInteger exec = new AtomicInteger(); + Thread thread = Thread.currentThread(); + ClassLoader current = thread.getContextClassLoader(); + ctx1.dispatch(() -> { + assertSame(thread, Thread.currentThread()); + assertSame(ctx1, Vertx.currentContext()); + assertSame(tccl1, thread.getContextClassLoader()); + assertEquals(1, exec.incrementAndGet()); + ctx2.dispatch(() -> { + assertSame(thread, Thread.currentThread()); + assertSame(ctx2, Vertx.currentContext()); + assertSame(tccl2, thread.getContextClassLoader()); + assertEquals(2, exec.incrementAndGet()); + }); + assertSame(ctx1, Vertx.currentContext()); + assertSame(tccl1, thread.getContextClassLoader()); + assertEquals(2, exec.get()); + }); + assertNull(Vertx.currentContext()); + assertSame(current, thread.getContextClassLoader()); + assertEquals(2, exec.get()); + } } diff --git a/src/test/java/io/vertx/core/FakeContext.java b/src/test/java/io/vertx/core/FakeContext.java new file mode 100644 index 00000000000..1d8845bc646 --- /dev/null +++ b/src/test/java/io/vertx/core/FakeContext.java @@ -0,0 +1,174 @@ +package io.vertx.core; + +import io.netty.channel.EventLoop; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.impl.CloseFuture; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.Deployment; +import io.vertx.core.impl.TaskQueue; +import io.vertx.core.impl.VertxImpl; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.WorkerPool; +import io.vertx.core.json.JsonObject; +import io.vertx.core.spi.tracing.VertxTracer; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; + +class FakeContext implements ContextInternal { + + private final VertxImpl impl; + private final ClassLoader tccl; + + public FakeContext(VertxImpl impl, ClassLoader classLoader) { + this.impl = impl; + this.tccl = classLoader; + } + + @Override + public Executor executor() { + return command -> { + + }; + } + + @Override + public void runOnContext(Handler action) { + + } + + @Override + public boolean inThread() { + return false; + } + + @Override + public Future<@Nullable T> executeBlocking(Handler> blockingCodeHandler, boolean ordered) { + return null; + } + + @Override + public String deploymentID() { + return null; + } + + @Override + public @Nullable JsonObject config() { + return null; + } + + @Override + public int getInstanceCount() { + return 0; + } + + @Override + public Context exceptionHandler(@Nullable Handler handler) { + return null; + } + + @Override + public @Nullable Handler exceptionHandler() { + return null; + } + + @Override + public boolean isEventLoopContext() { + return false; + } + + @Override + public boolean isWorkerContext() { + return false; + } + + @Override + public EventLoop nettyEventLoop() { + return null; + } + + @Override + public Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { + return null; + } + + @Override + public Future executeBlockingInternal(Handler> action) { + return null; + } + + @Override + public Future executeBlockingInternal(Handler> action, boolean ordered) { + return null; + } + + @Override + public Deployment getDeployment() { + return null; + } + + @Override + public VertxInternal owner() { + return impl; + } + + @Override + public void emit(T argument, Handler task) { + + } + + @Override + public void execute(Runnable task) { + + } + + @Override + public void execute(T argument, Handler task) { + + } + + @Override + public void reportException(Throwable t) { + + } + + @Override + public ConcurrentMap contextData() { + return null; + } + + @Override + public ConcurrentMap localContextData() { + return null; + } + + @Override + public ClassLoader classLoader() { + return tccl; + } + + @Override + public WorkerPool workerPool() { + return null; + } + + @Override + public VertxTracer tracer() { + return null; + } + + @Override + public ContextInternal duplicate() { + return null; + } + + @Override + public boolean isDeployment() { + return false; + } + + @Override + public CloseFuture closeFuture() { + return null; + } +}