Skip to content

Commit

Permalink
Clean up task executor type hierarchy
Browse files Browse the repository at this point in the history
Motivation:

There are a few issues in our task executor type hierarchy.

- We don't provide the context-aware version of the following task executor types:
  - `Executor` -> `ContextAwareExecutor`
  - `ExecutorService` -> `ContextAwareExecutorService`
  - `BlockingTaskExecutor` -> `ContextAwareBlockingTaskExecutor`
- There are no `RequestContext.makeContextAware()` and `makeContextPropagating()`
  that accept the aforementioned executor types.
- `blockingTaskExecutor` properties often use `ScheduledExecutorService`
  rather than `BlockingTaskExecutor`.

Modifications:

- Added `ContextAwareExecutor`.
- Made `ContextAwareExecutorService` public.
- Added `ContxtAwareBlockingTaskExecutor`.
- Added the implementation of the aforementioned types.
- Added the context-propagating version of the aforementioned types.
- Merged `common.RequestContextUtil` to `internal.common.RequestContextUtil`
  for less confusion.
- Added `BlockingTaskExecutor.of(ScheduledExecutorService)`.
- Changed the type of `blockingTaskExecutor` from `ScheduledExecutorService`.
- Added an overloaded builder method `blockingTaskExecutor(BlockingTaskExecutor)`.
- Removed the problematic wrapping of `blockingTaskExecutor` in
  `DefaultServerConfig.monitorBlockingTaskExecutor()`.
  - The wrapped executor is only used when a user gets the executor via
    `ServerConfig.blockingTaskExecutor()`, which is often not the case.
    A user gets the executor via `ServiceRequestContext.blockingTaskExecutor()`
    and thus the wrapped executor is never used.

Result:

- API completeness
- (Breaking change) The type of `blockingTaskExecutor` property has been
  changed from `ScheduledExecutorService` to `BlockingTaskExecutor`.
  - Simply recompiling your code should be enough in most cases because
    `BlockingTaskExecutor` is a `ScheduledExecutorService`.
  • Loading branch information
trustin committed Mar 17, 2023
1 parent 72de6ff commit 64978ae
Show file tree
Hide file tree
Showing 42 changed files with 714 additions and 229 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 LINE Corporation
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -15,14 +15,12 @@
*/
package com.linecorp.armeria.common;

final class RequestContextUtil {
import com.linecorp.armeria.common.util.BlockingTaskExecutor;

static void ensureSameCtx(RequestContext ctx, ContextHolder contextHolder, Class<?> type) {
if (ctx != contextHolder.context()) {
throw new IllegalArgumentException(
"cannot create a " + type.getSimpleName() + " using another " + contextHolder);
}
abstract class AbstractContextAwareBlockingTaskExecutor
extends AbstractContextAwareScheduledExecutorService<BlockingTaskExecutor>
implements BlockingTaskExecutor {
AbstractContextAwareBlockingTaskExecutor(BlockingTaskExecutor executor) {
super(executor);
}

private RequestContextUtil() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.Executor;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.annotation.Nullable;

abstract class AbstractContextAwareExecutor<ES extends Executor> implements Executor {
enum LogRequestContextWarningOnce implements Supplier<RequestContext> {
INSTANCE;

@Override
@Nullable
public RequestContext get() {
ClassLoaderHack.loadMe();
return null;
}

/**
* This won't be referenced until {@link #get()} is called. If there's only one classloader, the
* initializer will only be called once.
*/
private static final class ClassLoaderHack {
static void loadMe() {}

static {
logger.warn(
"Attempted to propagate request context to an executor task, " +
"but no request context available. " +
"If this executor is used for non-request-related tasks then it's safe to ignore this",
new NoRequestContextException());
}
}

private static final class NoRequestContextException extends RuntimeException {
private static final long serialVersionUID = 2804189311774982052L;
}
}

private static final Logger logger = LoggerFactory.getLogger(AbstractContextAwareExecutor.class);
private final ES executor;

AbstractContextAwareExecutor(ES executor) {
this.executor = requireNonNull(executor, "executor");
}

@Nullable
abstract RequestContext contextOrNull();

public final ES withoutContext() {
return executor;
}

final Runnable makeContextAware(Runnable task) {
final RequestContext context = contextOrNull();
return context == null ? task : context.makeContextAware(task);
}

@Override
public final void execute(Runnable command) {
executor.execute(makeContextAware(command));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,139 +26,89 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.annotation.Nullable;

abstract class AbstractContextAwareExecutorService<ES extends ExecutorService> implements ExecutorService {
enum LogRequestContextWarningOnce implements Supplier<RequestContext> {
INSTANCE;

@Override
@Nullable
public RequestContext get() {
ClassLoaderHack.loadMe();
return null;
}

/**
* This won't be referenced until {@link #get()} is called. If there's only one classloader, the
* initializer will only be called once.
*/
private static final class ClassLoaderHack {
static void loadMe() {}

static {
logger.warn(
"Attempted to propagate request context to an executor task, " +
"but no request context available. " +
"If this executor is used for non-request-related tasks then it's safe to ignore this",
new NoRequestContextException());
}
}

private static final class NoRequestContextException extends RuntimeException {
private static final long serialVersionUID = 2804189311774982052L;
}
}

private static final Logger logger = LoggerFactory.getLogger(
AbstractContextAwareScheduledExecutorService.class);
final ES executor;
abstract class AbstractContextAwareExecutorService<ES extends ExecutorService>
extends AbstractContextAwareExecutor<ES> implements ExecutorService {

AbstractContextAwareExecutorService(ES executor) {
this.executor = requireNonNull(executor, "executor");
super(executor);
}

@Nullable
abstract RequestContext contextOrNull();

@Override
public final void shutdown() {
executor.shutdown();
withoutContext().shutdown();
}

@Override
public final List<Runnable> shutdownNow() {
return executor.shutdownNow();
}

final Runnable makeContextAware(Runnable task) {
final RequestContext context = contextOrNull();
return context == null ? task : context.makeContextAware(task);
}

final <T> Callable<T> makeContextAware(Callable<T> task) {
final RequestContext context = contextOrNull();
return context == null ? task : context.makeContextAware(task);
}

private <T> Collection<? extends Callable<T>> makeContextAware(
Collection<? extends Callable<T>> tasks) {
return requireNonNull(tasks, "tasks").stream().map(this::makeContextAware)
.collect(toImmutableList());
return withoutContext().shutdownNow();
}

@Override
public Future<?> submit(Runnable task) {
return executor.submit(makeContextAware(task));
return withoutContext().submit(makeContextAware(task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return executor.submit(makeContextAware(task), result);
return withoutContext().submit(makeContextAware(task), result);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(makeContextAware(task));
return withoutContext().submit(makeContextAware(task));
}

@Override
public final boolean isShutdown() {
return executor.isShutdown();
return withoutContext().isShutdown();
}

@Override
public final boolean isTerminated() {
return executor.isTerminated();
return withoutContext().isTerminated();
}

@Override
public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit);
return withoutContext().awaitTermination(timeout, unit);
}

@Override
public final <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks) throws InterruptedException {
return executor.invokeAll(makeContextAware(tasks));
return withoutContext().invokeAll(makeContextAware(tasks));
}

@Override
public final <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return executor.invokeAll(makeContextAware(tasks), timeout, unit);
return withoutContext().invokeAll(makeContextAware(tasks), timeout, unit);
}

@Override
public final <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return executor.invokeAny(makeContextAware(tasks));
return withoutContext().invokeAny(makeContextAware(tasks));
}

@Override
public final <T> T invokeAny(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return executor.invokeAny(makeContextAware(tasks), timeout, unit);
return withoutContext().invokeAny(makeContextAware(tasks), timeout, unit);
}

@Override
public final void execute(Runnable command) {
executor.execute(makeContextAware(command));
final <T> Callable<T> makeContextAware(Callable<T> task) {
final RequestContext context = contextOrNull();
return context == null ? task : context.makeContextAware(task);
}

private <T> Collection<? extends Callable<T>> makeContextAware(
Collection<? extends Callable<T>> tasks) {
return requireNonNull(tasks, "tasks")
.stream()
.map(this::makeContextAware)
.collect(toImmutableList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,33 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

abstract class AbstractContextAwareScheduledExecutorService
extends AbstractContextAwareExecutorService<ScheduledExecutorService>
abstract class AbstractContextAwareScheduledExecutorService<ES extends ScheduledExecutorService>
extends AbstractContextAwareExecutorService<ES>
implements ScheduledExecutorService {
AbstractContextAwareScheduledExecutorService(ScheduledExecutorService executor) {

AbstractContextAwareScheduledExecutorService(ES executor) {
super(executor);
}

@Override
public final ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return executor.schedule(makeContextAware(command), delay, unit);
return withoutContext().schedule(makeContextAware(command), delay, unit);
}

@Override
public final <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return executor.schedule(makeContextAware(callable), delay, unit);
return withoutContext().schedule(makeContextAware(callable), delay, unit);
}

@Override
public final ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
return executor.scheduleAtFixedRate(makeContextAware(command), initialDelay, period, unit);
return withoutContext().scheduleAtFixedRate(makeContextAware(command), initialDelay, period, unit);
}

@Override
public final ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
return executor.scheduleWithFixedDelay(makeContextAware(command), initialDelay, delay, unit);
return withoutContext().scheduleWithFixedDelay(makeContextAware(command), initialDelay, delay, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.linecorp.armeria.common;

import java.util.concurrent.ScheduledExecutorService;

import com.linecorp.armeria.client.ClientFactoryBuilder;
import com.linecorp.armeria.common.util.BlockingTaskExecutor;
import com.linecorp.armeria.common.util.EventLoopGroups;
Expand All @@ -37,10 +35,10 @@ public final class CommonPools {
EventLoopGroups.newEventLoopGroup(Flags.numCommonWorkers(), "armeria-common-worker", true);

/**
* Returns the default common blocking task {@link ScheduledExecutorService} which is used for
* Returns the default common blocking task {@link BlockingTaskExecutor} which is used for
* potentially long-running tasks which may block I/O threads.
*/
public static ScheduledExecutorService blockingTaskExecutor() {
public static BlockingTaskExecutor blockingTaskExecutor() {
return BLOCKING_TASK_EXECUTOR;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common;

import static com.linecorp.armeria.internal.common.RequestContextUtil.ensureSameCtx;
import static java.util.Objects.requireNonNull;

import com.linecorp.armeria.common.util.BlockingTaskExecutor;

/**
* A delegating {@link BlockingTaskExecutor} that sets the {@link RequestContext} before executing
* any submitted tasks.
*/
public interface ContextAwareBlockingTaskExecutor
extends BlockingTaskExecutor, ContextAwareScheduledExecutorService {

/**
* Returns a new {@link ContextAwareEventLoop} that sets the specified {@link RequestContext}
* before executing any submitted tasks.
*/
static ContextAwareBlockingTaskExecutor of(RequestContext context, BlockingTaskExecutor executor) {
requireNonNull(context, "context");
requireNonNull(executor, "executor");
if (executor instanceof ContextAwareBlockingTaskExecutor) {
ensureSameCtx(context, (ContextAwareBlockingTaskExecutor) executor,
ContextAwareBlockingTaskExecutor.class);
return (ContextAwareBlockingTaskExecutor) executor;
}
return new DefaultContextAwareBlockingTaskExecutor(context, executor);
}

/**
* Returns the {@link RequestContext} that was specified when creating
* this {@link ContextAwareBlockingTaskExecutor}.
*/
@Override
RequestContext context();

/**
* Returns the {@link BlockingTaskExecutor} that executes the submitted tasks without setting the
* {@link RequestContext}.
*/
@Override
BlockingTaskExecutor withoutContext();
}
Loading

0 comments on commit 64978ae

Please sign in to comment.