diff --git a/src/main/java/io/vertx/core/Context.java b/src/main/java/io/vertx/core/Context.java index 046cf4428a1..f8884827ba9 100644 --- a/src/main/java/io/vertx/core/Context.java +++ b/src/main/java/io/vertx/core/Context.java @@ -16,6 +16,7 @@ import io.vertx.codegen.annotations.Nullable; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.impl.VertxThread; +import io.vertx.core.impl.launcher.VertxCommandLauncher; import io.vertx.core.json.JsonObject; import java.util.List; @@ -135,7 +136,9 @@ static boolean isOnVertxThread() { * @param resultHandler handler that will be called when the blocking code is complete * @param the type of the result */ - void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler); + default void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { + executeBlocking(blockingCodeHandler, true, resultHandler); + } /** * Same as {@link #executeBlocking(Handler, boolean, Handler)} but with an {@code handler} called when the operation completes @@ -145,7 +148,9 @@ static boolean isOnVertxThread() { /** * Same as {@link #executeBlocking(Handler, Handler)} but with an {@code handler} called when the operation completes */ - Future executeBlocking(Handler> blockingCodeHandler); + default Future executeBlocking(Handler> blockingCodeHandler) { + return executeBlocking(blockingCodeHandler, true); + } /** * If the context is associated with a Verticle deployment, this returns the deployment ID of that deployment. @@ -165,7 +170,9 @@ static boolean isOnVertxThread() { /** * The process args */ - List processArgs(); + default List processArgs() { + return VertxCommandLauncher.getProcessArguments(); + } /** * Is the current context an event loop context? diff --git a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java index fe31d53733a..3b7d5b6c2ed 100644 --- a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java +++ b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java @@ -99,9 +99,9 @@ public class AsyncFileImpl implements AsyncFile { try { if (options.getPerms() != null) { FileAttribute attrs = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString(options.getPerms())); - ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool(), attrs); + ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool().executor(), attrs); } else { - ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool()); + ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool().executor()); } if (options.isAppend()) writePos = ch.size(); } catch (IOException e) { diff --git a/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java b/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java index 4be9d850616..992615af25e 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java @@ -123,7 +123,7 @@ void setRequest(HttpRequest request) { private InboundBuffer pendingQueue() { if (pending == null) { - pending = new InboundBuffer<>(conn.getContext(), 8); + pending = new InboundBuffer<>(context, 8); pending.drainHandler(v -> conn.doResume()); pending.handler(buffer -> { if (buffer == InboundBuffer.END_SENTINEL) { diff --git a/src/main/java/io/vertx/core/impl/AbstractContext.java b/src/main/java/io/vertx/core/impl/AbstractContext.java deleted file mode 100644 index 236b95a811e..00000000000 --- a/src/main/java/io/vertx/core/impl/AbstractContext.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ -package io.vertx.core.impl; - -import io.vertx.core.*; -import io.vertx.core.impl.future.FailedFuture; -import io.vertx.core.impl.future.PromiseImpl; -import io.vertx.core.impl.future.PromiseInternal; -import io.vertx.core.impl.future.SucceededFuture; -import io.vertx.core.impl.launcher.VertxCommandLauncher; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * A context implementation that does not hold any specific state. - * - * @author Julien Viet - * @author Tim Fox - */ -abstract class AbstractContext implements ContextInternal { - - final boolean disableTCCL; - - public AbstractContext(boolean disableTCCL) { - this.disableTCCL = disableTCCL; - } - - @Override - public abstract boolean isEventLoopContext(); - - @Override - public final boolean isRunningOnContext() { - return Vertx.currentContext() == this && inThread(); - } - - abstract boolean inThread(); - - @Override - public boolean isWorkerContext() { - return !isEventLoopContext(); - } - - @Override - public void emit(Handler task) { - emit(null, task); - } - - @Override - public final void execute(Handler task) { - execute(null, task); - } - - @Override - public final void dispatch(Handler handler) { - dispatch(null, handler); - } - - public final ContextInternal beginDispatch() { - ContextInternal prev; - VertxThread th = (VertxThread) Thread.currentThread(); - prev = th.beginEmission(this); - if (!disableTCCL) { - th.setContextClassLoader(classLoader()); - } - return prev; - } - - public final void endDispatch(ContextInternal previous) { - VertxThread th = (VertxThread) Thread.currentThread(); - if (!disableTCCL) { - th.setContextClassLoader(previous != null ? previous.classLoader() : null); - } - th.endEmission(previous); - } - - @Override - public long setPeriodic(long delay, Handler handler) { - VertxImpl owner = (VertxImpl) owner(); - return owner.scheduleTimeout(this, true, delay, TimeUnit.MILLISECONDS, false, handler); - } - - @Override - public long setTimer(long delay, Handler handler) { - VertxImpl owner = (VertxImpl) owner(); - return owner.scheduleTimeout(this, false, delay, TimeUnit.MILLISECONDS, false, handler); - } - - @Override - public final void dispatch(T event, Handler handler) { - ContextInternal prev = beginDispatch(); - try { - handler.handle(event); - } catch (Throwable t) { - reportException(t); - } finally { - endDispatch(prev); - } - } - - public final void dispatch(Runnable handler) { - ContextInternal prev = beginDispatch(); - try { - handler.run(); - } catch (Throwable t) { - reportException(t); - } finally { - endDispatch(prev); - } - } - - @Override - public final List processArgs() { - return VertxCommandLauncher.getProcessArguments(); - } - - @Override - public final void executeBlockingInternal(Handler> action, Handler> resultHandler) { - Future fut = executeBlockingInternal(action); - setResultHandler(this, fut, resultHandler); - } - - @Override - public void executeBlockingInternal(Handler> action, boolean ordered, Handler> resultHandler) { - Future fut = executeBlockingInternal(action, ordered); - setResultHandler(this, fut, resultHandler); - } - - @Override - public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { - Future fut = executeBlocking(blockingCodeHandler, ordered); - setResultHandler(this, fut, resultHandler); - } - - @Override - public void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { - Future fut = executeBlocking(blockingCodeHandler, queue); - setResultHandler(this, fut, resultHandler); - } - - @Override - public final void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler) { - executeBlocking(blockingCodeHandler, true, resultHandler); - } - - public Future executeBlocking(Handler> blockingCodeHandler) { - return executeBlocking(blockingCodeHandler, true); - } - - @Override - public PromiseInternal promise() { - return new PromiseImpl<>(this); - } - - @Override - public PromiseInternal promise(Handler> handler) { - if (handler instanceof PromiseInternal) { - PromiseInternal promise = (PromiseInternal) handler; - if (promise.context() != null) { - return promise; - } - } - PromiseInternal promise = promise(); - promise.future().onComplete(handler); - return promise; - } - - @Override - public Future succeededFuture() { - return new SucceededFuture<>(this, null); - } - - @Override - public Future succeededFuture(T result) { - return new SucceededFuture<>(this, result); - } - - @Override - public Future failedFuture(Throwable failure) { - return new FailedFuture<>(this, failure); - } - - @Override - public Future failedFuture(String message) { - return new FailedFuture<>(this, message); - } - - @SuppressWarnings("unchecked") - @Override - public final T get(Object key) { - return (T) contextData().get(key); - } - - @Override - public final void put(Object key, Object value) { - contextData().put(key, value); - } - - @Override - public final boolean remove(Object key) { - return contextData().remove(key) != null; - } - - @SuppressWarnings("unchecked") - @Override - public final T getLocal(Object key) { - return (T) localContextData().get(key); - } - - @Override - public final void putLocal(Object key, Object value) { - localContextData().put(key, value); - } - - @Override - public final boolean removeLocal(Object key) { - return localContextData().remove(key) != null; - } - - private static void setResultHandler(ContextInternal ctx, Future fut, Handler> resultHandler) { - if (resultHandler != null) { - fut.onComplete(resultHandler); - } else { - fut.onFailure(ctx::reportException); - } - } -} diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextBase.java similarity index 72% rename from src/main/java/io/vertx/core/impl/ContextImpl.java rename to src/main/java/io/vertx/core/impl/ContextBase.java index 9815bdf92f0..450e0828648 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextBase.java @@ -25,32 +25,22 @@ import java.util.concurrent.RejectedExecutionException; /** + * A base class for {@link Context} implementations. + * * @author Tim Fox + * @author Julien Viet */ -abstract class ContextImpl extends AbstractContext { - - /** - * Execute the {@code task} disabling the thread-local association for the duration - * of the execution. {@link Vertx#currentContext()} will return {@code null}, - * @param task the task to execute - * @throws IllegalStateException if the current thread is not a Vertx thread - */ - static void executeIsolated(Handler task) { - Thread currentThread = Thread.currentThread(); - if (currentThread instanceof VertxThread) { - VertxThread vertxThread = (VertxThread) currentThread; - ContextInternal prev = vertxThread.beginEmission(null); - try { - task.handle(null); - } finally { - vertxThread.endEmission(prev); - } +public abstract class ContextBase implements ContextInternal { + + static void setResultHandler(ContextInternal ctx, Future fut, Handler> resultHandler) { + if (resultHandler != null) { + fut.onComplete(resultHandler); } else { - task.handle(null); + fut.onFailure(ctx::reportException); } } - private static final Logger log = LoggerFactory.getLogger(ContextImpl.class); + private static final Logger log = LoggerFactory.getLogger(ContextBase.class); private static final String DISABLE_TIMINGS_PROP_NAME = "vertx.disableContextTimings"; static final boolean DISABLE_TIMINGS = Boolean.getBoolean(DISABLE_TIMINGS_PROP_NAME); @@ -65,19 +55,17 @@ static void executeIsolated(Handler task) { private ConcurrentMap localData; private volatile Handler exceptionHandler; final TaskQueue internalOrderedTasks; - final WorkerPool internalBlockingPool; + final WorkerPool internalWorkerPool; final WorkerPool workerPool; final TaskQueue orderedTasks; - ContextImpl(VertxInternal vertx, - EventLoop eventLoop, - WorkerPool internalBlockingPool, - WorkerPool workerPool, - Deployment deployment, - CloseFuture closeFuture, - ClassLoader tccl, - boolean disableTCCL) { - super(disableTCCL); + protected ContextBase(VertxInternal vertx, + EventLoop eventLoop, + WorkerPool internalWorkerPool, + WorkerPool workerPool, + Deployment deployment, + CloseFuture closeFuture, + ClassLoader tccl) { this.deployment = deployment; this.config = deployment != null ? deployment.config() : new JsonObject(); this.eventLoop = eventLoop; @@ -85,7 +73,7 @@ static void executeIsolated(Handler task) { this.owner = vertx; this.workerPool = workerPool; this.closeFuture = closeFuture; - this.internalBlockingPool = internalBlockingPool; + this.internalWorkerPool = internalWorkerPool; this.orderedTasks = new TaskQueue(); this.internalOrderedTasks = new TaskQueue(); } @@ -99,16 +87,6 @@ public CloseFuture closeFuture() { return closeFuture; } - @Override - public boolean isDeployment() { - return deployment != null; - } - - @Override - public String deploymentID() { - return deployment != null ? deployment.deploymentID() : null; - } - @Override public JsonObject config() { return config; @@ -124,12 +102,12 @@ public VertxInternal owner() { @Override public Future executeBlockingInternal(Handler> action) { - return executeBlocking(this, action, internalBlockingPool, internalOrderedTasks); + return executeBlocking(this, action, internalWorkerPool, internalOrderedTasks); } @Override public Future executeBlockingInternal(Handler> action, boolean ordered) { - return executeBlocking(this, action, internalBlockingPool, ordered ? internalOrderedTasks : null); + return executeBlocking(this, action, internalWorkerPool, ordered ? internalOrderedTasks : null); } @Override @@ -235,49 +213,36 @@ public Handler exceptionHandler() { return exceptionHandler; } - public int getInstanceCount() { - // the no verticle case - if (deployment == null) { - return 0; - } - - // the single verticle without an instance flag explicitly defined - if (deployment.deploymentOptions() == null) { - return 1; - } - return deployment.deploymentOptions().getInstances(); - } - @Override public final void runOnContext(Handler action) { runOnContext(this, action); } - abstract void runOnContext(AbstractContext ctx, Handler action); + protected abstract void runOnContext(ContextInternal ctx, Handler action); @Override public void execute(Runnable task) { execute(this, task); } - abstract void execute(AbstractContext ctx, Runnable task); + protected abstract void execute(ContextInternal ctx, Runnable task); @Override public final void execute(T argument, Handler task) { execute(this, argument, task); } - abstract void execute(AbstractContext ctx, T argument, Handler task); + protected abstract void execute(ContextInternal ctx, T argument, Handler task); @Override public void emit(T argument, Handler task) { emit(this, argument, task); } - abstract void emit(AbstractContext ctx, T argument, Handler task); + protected abstract void emit(ContextInternal ctx, T argument, Handler task); @Override - public final ContextInternal duplicate() { + public ContextInternal duplicate() { return new DuplicatedContext(this); } } diff --git a/src/main/java/io/vertx/core/impl/ContextInternal.java b/src/main/java/io/vertx/core/impl/ContextInternal.java index b4e70c2e0d9..b5a44898c35 100644 --- a/src/main/java/io/vertx/core/impl/ContextInternal.java +++ b/src/main/java/io/vertx/core/impl/ContextInternal.java @@ -14,11 +14,17 @@ import io.netty.channel.EventLoop; import io.netty.util.concurrent.FastThreadLocalThread; import io.vertx.core.*; +import io.vertx.core.impl.future.FailedFuture; +import io.vertx.core.impl.future.PromiseImpl; import io.vertx.core.impl.future.PromiseInternal; +import io.vertx.core.impl.future.SucceededFuture; import io.vertx.core.spi.tracing.VertxTracer; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import static io.vertx.core.impl.ContextBase.setResultHandler; /** * This interface provides an api for vert.x core internal use only @@ -33,9 +39,14 @@ public interface ContextInternal extends Context { * @return the current context */ static ContextInternal current() { - Thread current = Thread.currentThread(); - if (current instanceof VertxThread) { - return ((VertxThread) current).context(); + Thread thread = Thread.currentThread(); + if (thread instanceof VertxThread) { + return ((VertxThread) thread).context(); + } else { + VertxImpl.ContextDispatch current = VertxImpl.nonVertxContextDispatch.get(); + if (current != null) { + return current.context; + } } return null; } @@ -61,39 +72,62 @@ default void runOnContext(Handler action) { /** * @return a {@link Promise} associated with this context */ - PromiseInternal promise(); + default PromiseInternal promise() { + return new PromiseImpl<>(this); + } /** * @return a {@link Promise} associated with this context or the {@code handler} * if that handler is already an instance of {@code PromiseInternal} */ - PromiseInternal promise(Handler> handler); + default PromiseInternal promise(Handler> handler) { + if (handler instanceof PromiseInternal) { + PromiseInternal promise = (PromiseInternal) handler; + if (promise.context() != null) { + return promise; + } + } + PromiseInternal promise = promise(); + promise.future().onComplete(handler); + return promise; + } /** * @return an empty succeeded {@link Future} associated with this context */ - Future succeededFuture(); + default Future succeededFuture() { + return new SucceededFuture<>(this, null); + } /** * @return a succeeded {@link Future} of the {@code result} associated with this context */ - Future succeededFuture(T result); + default Future succeededFuture(T result) { + return new SucceededFuture<>(this, result); + } /** * @return a {@link Future} failed with the {@code failure} associated with this context */ - Future failedFuture(Throwable failure); + default Future failedFuture(Throwable failure) { + return new FailedFuture<>(this, failure); + } /** * @return a {@link Future} failed with the {@code message} associated with this context */ - Future failedFuture(String message); + default Future failedFuture(String message) { + return new FailedFuture<>(this, message); + } /** * Like {@link #executeBlocking(Handler, boolean, Handler)} but uses the {@code queue} to order the tasks instead * of the internal queue of this context. */ - void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler); + default void executeBlocking(Handler> blockingCodeHandler, TaskQueue queue, Handler> resultHandler) { + Future fut = executeBlocking(blockingCodeHandler, queue); + setResultHandler(this, fut, resultHandler); + } /** * Like {@link #executeBlocking(Handler, boolean)} but uses the {@code queue} to order the tasks instead @@ -104,9 +138,21 @@ default void runOnContext(Handler action) { /** * Execute an internal task on the internal blocking ordered executor. */ - void executeBlockingInternal(Handler> action, Handler> resultHandler); + default void executeBlockingInternal(Handler> action, Handler> resultHandler) { + Future fut = executeBlockingInternal(action); + setResultHandler(this, fut, resultHandler); + } - void executeBlockingInternal(Handler> action, boolean ordered, Handler> resultHandler); + default void executeBlockingInternal(Handler> action, boolean ordered, Handler> resultHandler) { + Future fut = executeBlockingInternal(action, ordered); + setResultHandler(this, fut, resultHandler); + } + + @Override + default void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> resultHandler) { + Future fut = executeBlocking(blockingCodeHandler, ordered); + setResultHandler(this, fut, resultHandler); + } /** * Like {@link #executeBlockingInternal(Handler, Handler)} but returns a {@code Future} of the asynchronous result @@ -126,6 +172,8 @@ default void runOnContext(Handler action) { @Override VertxInternal owner(); + boolean inThread(); + /** * Emit the given {@code argument} event to the {@code task} and switch on this context if necessary, this also associates the * current thread with the current context so {@link Vertx#currentContext()} returns this context. @@ -142,12 +190,16 @@ default void runOnContext(Handler action) { /** * @see #emit(Object, Handler) */ - void emit(Handler task); + default void emit(Handler task) { + emit(null, task); + } /** * @see #execute(Object, Handler) */ - void execute(Handler task); + default void execute(Handler task) { + execute(null, task); + } /** * Execute the {@code task} on this context, it will be executed according to the @@ -169,22 +221,35 @@ default void runOnContext(Handler action) { /** * @return whether the current thread is running on this context */ - boolean isRunningOnContext(); + default boolean isRunningOnContext() { + return current() == this && inThread(); + } /** * @see #dispatch(Handler) */ - void dispatch(Runnable handler); + default void dispatch(Runnable handler) { + ContextInternal prev = beginDispatch(); + try { + handler.run(); + } catch (Throwable t) { + reportException(t); + } finally { + endDispatch(prev); + } + } /** * @see #dispatch(Object, Handler) */ - void dispatch(Handler handler); + default void dispatch(Handler handler) { + dispatch(null, handler); + } /** * Dispatch an {@code event} to the {@code handler} on this context. *

- * The handler is executed directly by the caller thread which must be a {@link VertxThread} or a {@link FastThreadLocalThread}. + * The handler is executed directly by the caller thread which must be a context thread. *

* The handler execution is monitored by the blocked thread checker. *

@@ -193,7 +258,16 @@ default void runOnContext(Handler action) { * @param event the event for the {@code handler} * @param handler the handler to execute with the {@code event} */ - void dispatch(E event, Handler handler); + default void dispatch(E event, Handler handler) { + ContextInternal prev = beginDispatch(); + try { + handler.handle(event); + } catch (Throwable t) { + reportException(t); + } finally { + endDispatch(prev); + } + } /** * Begin the execution of a task on this context. @@ -207,7 +281,10 @@ default void runOnContext(Handler action) { * @return the previous context that shall be restored after or {@code null} if there is none * @throws IllegalStateException when the current thread of execution cannot execute this task */ - ContextInternal beginDispatch(); + default ContextInternal beginDispatch() { + VertxImpl vertx = (VertxImpl) owner(); + return vertx.beginDispatch(this); + } /** * End the execution of a task on this context, see {@link #beginDispatch()} @@ -217,7 +294,10 @@ default void runOnContext(Handler action) { * @param previous the previous context to restore or {@code null} if there is none * @throws IllegalStateException when the current thread of execution cannot execute this task */ - void endDispatch(ContextInternal previous); + default void endDispatch(ContextInternal previous) { + VertxImpl vertx = (VertxImpl) owner(); + vertx.endDispatch(previous); + } /** * Report an exception to this context synchronously. @@ -235,11 +315,43 @@ default void runOnContext(Handler action) { */ ConcurrentMap contextData(); + @SuppressWarnings("unchecked") + @Override + default T get(Object key) { + return (T) contextData().get(key); + } + + @Override + default void put(Object key, Object value) { + contextData().put(key, value); + } + + @Override + default boolean remove(Object key) { + return contextData().remove(key) != null; + } + /** * @return the {@link ConcurrentMap} used to store local context data */ ConcurrentMap localContextData(); + @SuppressWarnings("unchecked") + @Override + default T getLocal(Object key) { + return (T) localContextData().get(key); + } + + @Override + default void putLocal(Object key, Object value) { + localContextData().put(key, value); + } + + @Override + default boolean removeLocal(Object key) { + return localContextData().remove(key) != null; + } + /** * @return the classloader associated with this context */ @@ -280,18 +392,46 @@ default void runOnContext(Handler action) { * Like {@link Vertx#setPeriodic(long, Handler)} except the periodic timer will fire on this context and the * timer will not be associated with the context close hook. */ - long setPeriodic(long delay, Handler handler); + default long setPeriodic(long delay, Handler handler) { + VertxImpl owner = (VertxImpl) owner(); + return owner.scheduleTimeout(this, true, delay, TimeUnit.MILLISECONDS, false, handler); + } /** * Like {@link Vertx#setTimer(long, Handler)} except the timer will fire on this context and the timer * will not be associated with the context close hook. */ - long setTimer(long delay, Handler handler); + default long setTimer(long delay, Handler handler) { + VertxImpl owner = (VertxImpl) owner(); + return owner.scheduleTimeout(this, false, delay, TimeUnit.MILLISECONDS, false, handler); + } /** * @return {@code true} when the context is associated with a deployment */ - boolean isDeployment(); + default boolean isDeployment() { + return getDeployment() != null; + } + + default String deploymentID() { + Deployment deployment = getDeployment(); + return deployment != null ? deployment.deploymentID() : null; + } + + default int getInstanceCount() { + Deployment deployment = getDeployment(); + + // the no verticle case + if (deployment == null) { + return 0; + } + + // the single verticle without an instance flag explicitly defined + if (deployment.deploymentOptions() == null) { + return 1; + } + return deployment.deploymentOptions().getInstances(); + } CloseFuture closeFuture(); diff --git a/src/main/java/io/vertx/core/impl/DeploymentManager.java b/src/main/java/io/vertx/core/impl/DeploymentManager.java index adc8a015e16..50d97b91ba3 100644 --- a/src/main/java/io/vertx/core/impl/DeploymentManager.java +++ b/src/main/java/io/vertx/core/impl/DeploymentManager.java @@ -184,7 +184,7 @@ private Future doDeploy(String identifier, for (Verticle verticle: verticles) { CloseFuture closeFuture = new CloseFuture(log); WorkerPool workerPool = poolName != null ? vertx.createSharedWorkerPool(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()) : null; - ContextImpl context = (options.isWorker() ? vertx.createWorkerContext(deployment, closeFuture, workerPool, tccl) : + ContextBase context = (options.isWorker() ? vertx.createWorkerContext(deployment, closeFuture, workerPool, tccl) : vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl)); VerticleHolder holder = new VerticleHolder(verticle, context, workerPool, closeFuture); deployment.addVerticle(holder); @@ -226,11 +226,11 @@ private Future doDeploy(String identifier, static class VerticleHolder { final Verticle verticle; - final ContextImpl context; + final ContextBase context; final WorkerPool workerPool; final CloseFuture closeFuture; - VerticleHolder(Verticle verticle, ContextImpl context, WorkerPool workerPool, CloseFuture closeFuture) { + VerticleHolder(Verticle verticle, ContextBase context, WorkerPool workerPool, CloseFuture closeFuture) { this.verticle = verticle; this.context = context; this.workerPool = workerPool; @@ -274,7 +274,7 @@ public void addVerticle(VerticleHolder holder) { verticles.add(holder); } - private synchronized void rollback(ContextInternal callingContext, Handler> completionHandler, ContextImpl context, VerticleHolder closeFuture, Throwable cause) { + private synchronized void rollback(ContextInternal callingContext, Handler> completionHandler, ContextBase context, VerticleHolder closeFuture, Throwable cause) { if (status == ST_DEPLOYED) { status = ST_UNDEPLOYING; doUndeployChildren(callingContext).onComplete(childrenResult -> { @@ -331,7 +331,7 @@ public synchronized Future doUndeploy(ContextInternal undeployingContext) parent.removeChild(this); } for (VerticleHolder verticleHolder: verticles) { - ContextImpl context = verticleHolder.context; + ContextBase context = verticleHolder.context; Promise p = Promise.promise(); undeployFutures.add(p.future()); context.runOnContext(v -> { diff --git a/src/main/java/io/vertx/core/impl/DuplicatedContext.java b/src/main/java/io/vertx/core/impl/DuplicatedContext.java index 9307b584575..9484eb3d46f 100644 --- a/src/main/java/io/vertx/core/impl/DuplicatedContext.java +++ b/src/main/java/io/vertx/core/impl/DuplicatedContext.java @@ -33,18 +33,17 @@ * * @author Julien Viet */ -class DuplicatedContext extends AbstractContext { +class DuplicatedContext implements ContextInternal { - protected final ContextImpl delegate; + protected final ContextBase delegate; private ConcurrentMap localData; - DuplicatedContext(ContextImpl delegate) { - super(delegate.disableTCCL); + DuplicatedContext(ContextBase delegate) { this.delegate = delegate; } @Override - boolean inThread() { + public boolean inThread() { return delegate.inThread(); } @@ -53,31 +52,16 @@ public final CloseFuture closeFuture() { return delegate.closeFuture(); } - @Override - public final boolean isDeployment() { - return delegate.isDeployment(); - } - @Override public final VertxTracer tracer() { return delegate.tracer(); } - @Override - public final String deploymentID() { - return delegate.deploymentID(); - } - @Override public final JsonObject config() { return delegate.config(); } - @Override - public final int getInstanceCount() { - return delegate.getInstanceCount(); - } - @Override public final Context exceptionHandler(Handler handler) { delegate.exceptionHandler(handler); @@ -141,22 +125,22 @@ public final ConcurrentMap localContextData() { @Override public final Future executeBlockingInternal(Handler> action) { - return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, delegate.internalOrderedTasks); + return ContextBase.executeBlocking(this, action, delegate.internalWorkerPool, delegate.internalOrderedTasks); } @Override public final Future executeBlockingInternal(Handler> action, boolean ordered) { - return ContextImpl.executeBlocking(this, action, delegate.internalBlockingPool, ordered ? delegate.internalOrderedTasks : null); + return ContextBase.executeBlocking(this, action, delegate.internalWorkerPool, ordered ? delegate.internalOrderedTasks : null); } @Override public final Future executeBlocking(Handler> action, boolean ordered) { - return ContextImpl.executeBlocking(this, action, delegate.workerPool, ordered ? delegate.orderedTasks : null); + return ContextBase.executeBlocking(this, action, delegate.workerPool, ordered ? delegate.orderedTasks : null); } @Override public final Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { - return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); + return ContextBase.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue); } @Override @@ -184,6 +168,11 @@ public boolean isEventLoopContext() { return delegate.isEventLoopContext(); } + @Override + public boolean isWorkerContext() { + return delegate.isWorkerContext(); + } + @Override public ContextInternal duplicate() { return new DuplicatedContext(delegate); diff --git a/src/main/java/io/vertx/core/impl/EventLoopContext.java b/src/main/java/io/vertx/core/impl/EventLoopContext.java index fe2bfceda81..69d1d60714e 100644 --- a/src/main/java/io/vertx/core/impl/EventLoopContext.java +++ b/src/main/java/io/vertx/core/impl/EventLoopContext.java @@ -20,7 +20,7 @@ /** * @author Tim Fox */ -public class EventLoopContext extends ContextImpl { +public class EventLoopContext extends ContextBase { EventLoopContext(VertxInternal vertx, EventLoop eventLoop, @@ -28,9 +28,8 @@ public class EventLoopContext extends ContextImpl { WorkerPool workerPool, Deployment deployment, CloseFuture closeFuture, - ClassLoader tccl, - boolean disableTCCL) { - super(vertx, eventLoop, internalBlockingPool, workerPool, deployment, closeFuture, tccl, disableTCCL); + ClassLoader tccl) { + super(vertx, eventLoop, internalBlockingPool, workerPool, deployment, closeFuture, tccl); } @Override @@ -39,7 +38,7 @@ public Executor executor() { } @Override - void runOnContext(AbstractContext ctx, Handler action) { + protected void runOnContext(ContextInternal ctx, Handler action) { try { nettyEventLoop().execute(() -> ctx.dispatch(action)); } catch (RejectedExecutionException ignore) { @@ -48,7 +47,7 @@ void runOnContext(AbstractContext ctx, Handler action) { } @Override - void emit(AbstractContext ctx, T argument, Handler task) { + protected void emit(ContextInternal ctx, T argument, Handler task) { EventLoop eventLoop = nettyEventLoop(); if (eventLoop.inEventLoop()) { ContextInternal prev = ctx.beginDispatch(); @@ -71,7 +70,7 @@ void emit(AbstractContext ctx, T argument, Handler task) { * */ @Override - void execute(AbstractContext ctx, T argument, Handler task) { + protected void execute(ContextInternal ctx, T argument, Handler task) { EventLoop eventLoop = nettyEventLoop(); if (eventLoop.inEventLoop()) { task.handle(argument); @@ -81,7 +80,7 @@ void execute(AbstractContext ctx, T argument, Handler task) { } @Override - void execute(AbstractContext ctx, Runnable task) { + protected void execute(ContextInternal ctx, Runnable task) { EventLoop eventLoop = nettyEventLoop(); if (eventLoop.inEventLoop()) { task.run(); @@ -96,7 +95,12 @@ public boolean isEventLoopContext() { } @Override - boolean inThread() { + public boolean isWorkerContext() { + return false; + } + + @Override + public boolean inThread() { return nettyEventLoop().inEventLoop(); } diff --git a/src/main/java/io/vertx/core/impl/HAManager.java b/src/main/java/io/vertx/core/impl/HAManager.java index 8cee2def0b5..73a90600151 100644 --- a/src/main/java/io/vertx/core/impl/HAManager.java +++ b/src/main/java/io/vertx/core/impl/HAManager.java @@ -344,7 +344,7 @@ private synchronized void checkQuorumWhenAdded(final String nodeID, final long s log.warn("Timed out waiting for group information to appear"); } else { // Remove any context we have here (from the timer) otherwise will screw things up when verticles are deployed - ContextImpl.executeIsolated(v -> { + ((VertxImpl)vertx).executeIsolated(v -> { checkQuorumWhenAdded(nodeID, start); }); } @@ -404,7 +404,7 @@ private void addToHA(String deploymentID, String verticleName, DeploymentOptions private void addToHADeployList(final String verticleName, final DeploymentOptions deploymentOptions, final Handler> doneHandler) { toDeployOnQuorum.add(() -> { - ContextImpl.executeIsolated(v -> { + ((VertxImpl)vertx).executeIsolated(v -> { deployVerticle(verticleName, deploymentOptions, doneHandler); }); }); @@ -428,7 +428,7 @@ private void undeployHADeployments() { Deployment dep = deploymentManager.getDeployment(deploymentID); if (dep != null) { if (dep.deploymentOptions().isHa()) { - ContextImpl.executeIsolated(v -> { + ((VertxImpl)vertx).executeIsolated(v -> { deploymentManager.undeployVerticle(deploymentID).onComplete(result -> { if (result.succeeded()) { log.info("Successfully undeployed HA deployment " + deploymentID + "-" + dep.verticleIdentifier() + " as there is no quorum"); @@ -523,7 +523,7 @@ private void processFailover(JsonObject failedVerticle) { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference err = new AtomicReference<>(); // Now deploy this verticle on this node - ContextImpl.executeIsolated(v -> { + ((VertxImpl)vertx).executeIsolated(v -> { JsonObject options = failedVerticle.getJsonObject("options"); doDeployVerticle(verticleName, new DeploymentOptions(options), result -> { if (result.succeeded()) { diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index 3ef052c3240..2275b889ce8 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -137,7 +137,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider { private final CloseFuture closeFuture; private final Transport transport; private final VertxTracer tracer; - private final ThreadLocal> stickyContext = new ThreadLocal<>(); + private final ThreadLocal> stickyContext = new ThreadLocal<>(); private final boolean disableTCCL; VertxImpl(VertxOptions options, ClusterManager clusterManager, NodeSelector nodeSelector, VertxMetrics metrics, @@ -402,8 +402,13 @@ public void runOnContext(Handler task) { } // The background pool is used for making blocking calls to legacy synchronous APIs - public ExecutorService getWorkerPool() { - return workerPool.executor(); + public WorkerPool getWorkerPool() { + return workerPool; + } + + @Override + public WorkerPool getInternalWorkerPool() { + return internalWorkerPool; } public EventLoopGroup getEventLoopGroup() { @@ -415,7 +420,7 @@ public EventLoopGroup getAcceptorEventLoopGroup() { } public ContextInternal getOrCreateContext() { - AbstractContext ctx = getContext(); + ContextInternal ctx = getContext(); if (ctx == null) { // We are running embedded - Create a context ctx = createEventLoopContext(); @@ -464,12 +469,12 @@ public boolean cancelTimer(long id) { @Override public EventLoopContext createEventLoopContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) { - return new EventLoopContext(this, eventLoopGroup.next(), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, tccl, disableTCCL); + return new EventLoopContext(this, eventLoopGroup.next(), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, disableTCCL ? null : tccl); } @Override public EventLoopContext createEventLoopContext(EventLoop eventLoop, WorkerPool workerPool, ClassLoader tccl) { - return new EventLoopContext(this, eventLoop, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, null, closeFuture, tccl, disableTCCL); + return new EventLoopContext(this, eventLoop, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, null, closeFuture, disableTCCL ? tccl : null); } @Override @@ -479,7 +484,7 @@ public EventLoopContext createEventLoopContext() { @Override public WorkerContext createWorkerContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) { - return new WorkerContext(this, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, tccl, disableTCCL); + return new WorkerContext(this, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, disableTCCL ? null : tccl); } @Override @@ -536,12 +541,12 @@ public long scheduleTimeout(ContextInternal context, return task.id; } - public AbstractContext getContext() { - AbstractContext context = (AbstractContext) ContextInternal.current(); + public ContextInternal getContext() { + ContextInternal context = ContextInternal.current(); if (context != null && context.owner() == this) { return context; } else { - WeakReference ref = stickyContext.get(); + WeakReference ref = stickyContext.get(); return ref != null ? ref.get() : null; } } @@ -1145,7 +1150,135 @@ public void removeCloseHook(Closeable hook) { } private CloseFuture resolveCloseFuture() { - AbstractContext context = getContext(); + ContextInternal context = getContext(); return context != null ? context.closeFuture() : closeFuture; } + + /** + * Execute the {@code task} disabling the thread-local association for the duration + * of the execution. {@link Vertx#currentContext()} will return {@code null}, + * @param task the task to execute + * @throws IllegalStateException if the current thread is not a Vertx thread + */ + void executeIsolated(Handler task) { + if (Thread.currentThread() instanceof VertxThread) { + ContextInternal prev = beginDispatch(null); + try { + task.handle(null); + } finally { + endDispatch(prev); + } + } else { + task.handle(null); + } + } + + static class ContextDispatch { + ContextInternal context; + ClassLoader topLevelTCCL; + } + + /** + * Context dispatch info for context running with non vertx threads (Loom). + */ + static final ThreadLocal nonVertxContextDispatch = new ThreadLocal<>(); + + /** + * Begin the emission of a context event. + *

+ * This is a low level interface that should not be used, instead {@link ContextInternal#dispatch(Object, io.vertx.core.Handler)} + * shall be used. + * + * @param context the context on which the event is emitted on + * @return the current context that shall be restored + */ + ContextInternal beginDispatch(ContextInternal context) { + Thread thread = Thread.currentThread(); + ContextInternal prev; + if (thread instanceof VertxThread) { + VertxThread vertxThread = (VertxThread) thread; + prev = vertxThread.context; + if (!ContextBase.DISABLE_TIMINGS) { + vertxThread.executeStart(); + } + vertxThread.context = context; + if (!disableTCCL) { + if (prev == null) { + vertxThread.topLevelTCCL = Thread.currentThread().getContextClassLoader(); + } + if (context != null) { + thread.setContextClassLoader(context.classLoader()); + } + } + } else { + prev = beginDispatch2(thread, context); + } + return prev; + } + + private ContextInternal beginDispatch2(Thread thread, ContextInternal context) { + ContextDispatch current = nonVertxContextDispatch.get(); + ContextInternal prev; + if (current != null) { + prev = current.context; + } else { + current = new ContextDispatch(); + nonVertxContextDispatch.set(current); + prev = null; + } + current.context = context; + if (!disableTCCL) { + if (prev == null) { + current.topLevelTCCL = Thread.currentThread().getContextClassLoader(); + } + thread.setContextClassLoader(context.classLoader()); + } + return prev; + } + + /** + * End the emission of a context task. + *

+ * This is a low level interface that should not be used, instead {@link ContextInternal#dispatch(Object, io.vertx.core.Handler)} + * shall be used. + * + * @param prev the previous context thread to restore, might be {@code null} + */ + void endDispatch(ContextInternal prev) { + Thread thread = Thread.currentThread(); + if (thread instanceof VertxThread) { + VertxThread vertxThread = (VertxThread) thread; + vertxThread.context = prev; + if (!disableTCCL) { + ClassLoader tccl; + if (prev == null) { + tccl = vertxThread.topLevelTCCL; + vertxThread.topLevelTCCL = null; + } else { + tccl = prev.classLoader(); + } + Thread.currentThread().setContextClassLoader(tccl); + } + if (!ContextBase.DISABLE_TIMINGS) { + vertxThread.executeEnd(); + } + } else { + endDispatch2(prev); + } + } + + private void endDispatch2(ContextInternal prev) { + ClassLoader tccl; + ContextDispatch current = nonVertxContextDispatch.get(); + if (prev != null) { + current.context = prev; + tccl = prev.classLoader(); + } else { + nonVertxContextDispatch.remove(); + tccl = current.topLevelTCCL; + } + if (!disableTCCL) { + Thread.currentThread().setContextClassLoader(tccl); + } + } } diff --git a/src/main/java/io/vertx/core/impl/VertxInternal.java b/src/main/java/io/vertx/core/impl/VertxInternal.java index 5d431e8c6e2..36da3b9dffc 100644 --- a/src/main/java/io/vertx/core/impl/VertxInternal.java +++ b/src/main/java/io/vertx/core/impl/VertxInternal.java @@ -71,7 +71,9 @@ public interface VertxInternal extends Vertx { EventLoopGroup getAcceptorEventLoopGroup(); - ExecutorService getWorkerPool(); + WorkerPool getWorkerPool(); + + WorkerPool getInternalWorkerPool(); Map sharedHttpServers(); diff --git a/src/main/java/io/vertx/core/impl/VertxThread.java b/src/main/java/io/vertx/core/impl/VertxThread.java index 0c867e0521e..7322fcabe51 100644 --- a/src/main/java/io/vertx/core/impl/VertxThread.java +++ b/src/main/java/io/vertx/core/impl/VertxThread.java @@ -25,8 +25,8 @@ public class VertxThread extends FastThreadLocalThread implements BlockedThreadC private final long maxExecTime; private final TimeUnit maxExecTimeUnit; private long execStart; - private ContextInternal context; - private ClassLoader topLevelTCCL; + ContextInternal context; + ClassLoader topLevelTCCL; public VertxThread(Runnable target, String name, boolean worker, long maxExecTime, TimeUnit maxExecTimeUnit) { super(target, name); @@ -42,13 +42,13 @@ ContextInternal context() { return context; } - private void executeStart() { + void executeStart() { if (context == null) { execStart = System.nanoTime(); } } - private void executeEnd() { + void executeEnd() { if (context == null) { execStart = 0; } @@ -72,43 +72,4 @@ public TimeUnit maxExecTimeUnit() { return maxExecTimeUnit; } - /** - * Begin the emission of a context event. - *

- * This is a low level interface that should not be used, instead {@link ContextInternal#dispatch(Object, io.vertx.core.Handler)} - * shall be used. - * - * @param context the context on which the event is emitted on - * @return the current context that shall be restored - */ - ContextInternal beginEmission(ContextInternal context) { - if (!ContextImpl.DISABLE_TIMINGS) { - executeStart(); - } - ContextInternal prev = this.context; - if (prev == null) { - topLevelTCCL = Thread.currentThread().getContextClassLoader(); - } - this.context = context; - return prev; - } - - /** - * End the emission of a context task. - *

- * This is a low level interface that should not be used, instead {@link ContextInternal#dispatch(Object, io.vertx.core.Handler)} - * shall be used. - * - * @param prev the previous context thread to restore, might be {@code null} - */ - void endEmission(ContextInternal prev) { - context = prev; - if (prev == null) { - Thread.currentThread().setContextClassLoader(topLevelTCCL); - topLevelTCCL = null; - } - if (!ContextImpl.DISABLE_TIMINGS) { - executeEnd(); - } - } } diff --git a/src/main/java/io/vertx/core/impl/WorkerContext.java b/src/main/java/io/vertx/core/impl/WorkerContext.java index 79a1304d420..29fe6eea27c 100644 --- a/src/main/java/io/vertx/core/impl/WorkerContext.java +++ b/src/main/java/io/vertx/core/impl/WorkerContext.java @@ -22,20 +22,19 @@ /** * @author Tim Fox */ -public class WorkerContext extends ContextImpl { +public class WorkerContext extends ContextBase { WorkerContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, CloseFuture closeFuture, - ClassLoader tccl, - boolean disableTCCL) { - super(vertx, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, deployment, closeFuture, tccl, disableTCCL); + ClassLoader tccl) { + super(vertx, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, deployment, closeFuture, tccl); } @Override - void runOnContext(AbstractContext ctx, Handler action) { + protected void runOnContext(ContextInternal ctx, Handler action) { try { run(ctx, null, action); } catch (RejectedExecutionException ignore) { @@ -50,19 +49,19 @@ void runOnContext(AbstractContext ctx, Handler action) { * */ @Override - void execute(AbstractContext ctx, T argument, Handler task) { + protected void execute(ContextInternal ctx, T argument, Handler task) { execute(orderedTasks, argument, task); } @Override - void emit(AbstractContext ctx, T argument, Handler task) { + protected void emit(ContextInternal ctx, T argument, Handler task) { execute(orderedTasks, argument, arg -> { ctx.dispatch(arg, task); }); } @Override - void execute(AbstractContext ctx, Runnable task) { + protected void execute(ContextInternal ctx, Runnable task) { execute(this, task, Runnable::run); } @@ -71,6 +70,11 @@ public boolean isEventLoopContext() { return false; } + @Override + public boolean isWorkerContext() { + return true; + } + private Executor executor; @Override @@ -125,7 +129,7 @@ private void execute(TaskQueue queue, T argument, Handler task) { } @Override - boolean inThread() { + public boolean inThread() { return Context.isOnWorkerThread(); } } diff --git a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java index 1926fafc5bb..bf136ade848 100644 --- a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java +++ b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java @@ -63,8 +63,8 @@ public WorkerPool getPool() { } } ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); - ContextImpl impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextImpl) context; - return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.orderedTasks : null); + ContextBase impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextBase) context; + return ContextBase.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.orderedTasks : null); } public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> asyncResultHandler) { diff --git a/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java b/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java index 32b65a3f95f..5391b616ac0 100644 --- a/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java +++ b/src/main/java/io/vertx/core/streams/impl/InboundBuffer.java @@ -99,8 +99,7 @@ public InboundBuffer(Context context, long highWaterMark) { } private void checkThread() { - Thread thread = Thread.currentThread(); - if (!(thread instanceof FastThreadLocalThread)) { + if (!context.inThread()) { throw new IllegalStateException("This operation must be called from a Vert.x thread"); } } diff --git a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java index b376d8604f3..15a313ab464 100644 --- a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java +++ b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java @@ -19,7 +19,7 @@ /** * @author Julien Viet */ -public class BenchmarkContext extends ContextImpl { +public class BenchmarkContext extends ContextBase { public static BenchmarkContext create(Vertx vertx) { VertxImpl impl = (VertxImpl) vertx; @@ -32,7 +32,7 @@ public static BenchmarkContext create(Vertx vertx) { } public BenchmarkContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, ClassLoader tccl) { - super(vertx, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, null, null, tccl, false); + super(vertx, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, null, null, tccl); } @Override @@ -41,27 +41,27 @@ public Executor executor() { } @Override - boolean inThread() { + public boolean inThread() { throw new UnsupportedOperationException(); } @Override - void emit(AbstractContext ctx, T argument, Handler task) { + protected void emit(ContextInternal ctx, T argument, Handler task) { throw new UnsupportedOperationException(); } @Override - void runOnContext(AbstractContext ctx, Handler action) { + protected void runOnContext(ContextInternal ctx, Handler action) { ctx.dispatch(null, action); } @Override - void execute(AbstractContext ctx, T argument, Handler task) { + protected void execute(ContextInternal ctx, T argument, Handler task) { task.handle(argument); } @Override - void execute(AbstractContext ctx, Runnable task) { + protected void execute(ContextInternal ctx, Runnable task) { task.run(); } @@ -75,4 +75,8 @@ public boolean isEventLoopContext() { return false; } + @Override + public boolean isWorkerContext() { + return false; + } } diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index 4116517a064..9980d1c374c 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -983,4 +983,34 @@ public void testIsDuplicatedContextFromWorkerContext() { }); await(); } + + @Test + public void testDispatchContextOnAnyThread() { + ClassLoader tccl1 = new URLClassLoader(new URL[0]); + ClassLoader tccl2 = new URLClassLoader(new URL[0]); + VertxImpl impl = (VertxImpl) vertx; + ContextInternal ctx1 = new FakeContext(impl, tccl1); + ContextInternal ctx2 = new FakeContext(impl, tccl2); + AtomicInteger exec = new AtomicInteger(); + Thread thread = Thread.currentThread(); + ClassLoader current = thread.getContextClassLoader(); + ctx1.dispatch(() -> { + assertSame(thread, Thread.currentThread()); + assertSame(ctx1, Vertx.currentContext()); + assertSame(tccl1, thread.getContextClassLoader()); + assertEquals(1, exec.incrementAndGet()); + ctx2.dispatch(() -> { + assertSame(thread, Thread.currentThread()); + assertSame(ctx2, Vertx.currentContext()); + assertSame(tccl2, thread.getContextClassLoader()); + assertEquals(2, exec.incrementAndGet()); + }); + assertSame(ctx1, Vertx.currentContext()); + assertSame(tccl1, thread.getContextClassLoader()); + assertEquals(2, exec.get()); + }); + assertNull(Vertx.currentContext()); + assertSame(current, thread.getContextClassLoader()); + assertEquals(2, exec.get()); + } } diff --git a/src/test/java/io/vertx/core/FakeContext.java b/src/test/java/io/vertx/core/FakeContext.java new file mode 100644 index 00000000000..1d8845bc646 --- /dev/null +++ b/src/test/java/io/vertx/core/FakeContext.java @@ -0,0 +1,174 @@ +package io.vertx.core; + +import io.netty.channel.EventLoop; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.impl.CloseFuture; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.Deployment; +import io.vertx.core.impl.TaskQueue; +import io.vertx.core.impl.VertxImpl; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.WorkerPool; +import io.vertx.core.json.JsonObject; +import io.vertx.core.spi.tracing.VertxTracer; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; + +class FakeContext implements ContextInternal { + + private final VertxImpl impl; + private final ClassLoader tccl; + + public FakeContext(VertxImpl impl, ClassLoader classLoader) { + this.impl = impl; + this.tccl = classLoader; + } + + @Override + public Executor executor() { + return command -> { + + }; + } + + @Override + public void runOnContext(Handler action) { + + } + + @Override + public boolean inThread() { + return false; + } + + @Override + public Future<@Nullable T> executeBlocking(Handler> blockingCodeHandler, boolean ordered) { + return null; + } + + @Override + public String deploymentID() { + return null; + } + + @Override + public @Nullable JsonObject config() { + return null; + } + + @Override + public int getInstanceCount() { + return 0; + } + + @Override + public Context exceptionHandler(@Nullable Handler handler) { + return null; + } + + @Override + public @Nullable Handler exceptionHandler() { + return null; + } + + @Override + public boolean isEventLoopContext() { + return false; + } + + @Override + public boolean isWorkerContext() { + return false; + } + + @Override + public EventLoop nettyEventLoop() { + return null; + } + + @Override + public Future executeBlocking(Handler> blockingCodeHandler, TaskQueue queue) { + return null; + } + + @Override + public Future executeBlockingInternal(Handler> action) { + return null; + } + + @Override + public Future executeBlockingInternal(Handler> action, boolean ordered) { + return null; + } + + @Override + public Deployment getDeployment() { + return null; + } + + @Override + public VertxInternal owner() { + return impl; + } + + @Override + public void emit(T argument, Handler task) { + + } + + @Override + public void execute(Runnable task) { + + } + + @Override + public void execute(T argument, Handler task) { + + } + + @Override + public void reportException(Throwable t) { + + } + + @Override + public ConcurrentMap contextData() { + return null; + } + + @Override + public ConcurrentMap localContextData() { + return null; + } + + @Override + public ClassLoader classLoader() { + return tccl; + } + + @Override + public WorkerPool workerPool() { + return null; + } + + @Override + public VertxTracer tracer() { + return null; + } + + @Override + public ContextInternal duplicate() { + return null; + } + + @Override + public boolean isDeployment() { + return false; + } + + @Override + public CloseFuture closeFuture() { + return null; + } +}