Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helper class to capture context using ScheduledExecutorService #6712

Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Gradle
build
.gradle
.kotlin
local.properties
out/

Expand Down
29 changes: 29 additions & 0 deletions context/src/main/java/io/opentelemetry/context/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -135,9 +136,37 @@ static Executor taskWrapping(Executor executor) {
* @since 1.1.0
*/
static ExecutorService taskWrapping(ExecutorService executorService) {
if (executorService instanceof CurrentContextExecutorService) {
return executorService;
}
return new CurrentContextExecutorService(executorService);
}

/**
* Returns an {@link ScheduledExecutorService} which delegates to the provided {@code
* executorService}, wrapping all invocations of {@link ExecutorService} methods such as {@link
* ExecutorService#execute(Runnable)} or {@link ExecutorService#submit(Runnable)} with the
* {@linkplain Context#current() current context} at the time of invocation.
ammachado marked this conversation as resolved.
Show resolved Hide resolved
*
* <p>This is generally used to create an {@link ScheduledExecutorService} which will forward the
* {@link Context} during an invocation to another thread. For example, you may use something like
* {@code ScheduledExecutorService dbExecutor = Context.wrapTasks(threadPool)} to ensure calls
* like {@code dbExecutor.execute(() -> database.query())} have {@link Context} available on the
* thread executing database queries.
*
* <p>Note: The context will not be propagated for {@link
* ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} and {@link
* ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} calls.
*
* @since 1.43.0
*/
static ScheduledExecutorService taskWrapping(ScheduledExecutorService executorService) {
ammachado marked this conversation as resolved.
Show resolved Hide resolved
if (executorService instanceof CurrentContextScheduledExecutorService) {
return executorService;
}
return new CurrentContextScheduledExecutorService(executorService);
}

/**
* Returns the value stored in this {@link Context} for the given {@link ContextKey}, or {@code
* null} if there is no value for the key in this context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CurrentContextExecutorService extends ForwardingExecutorService {
class CurrentContextExecutorService extends ForwardingExecutorService {

CurrentContextExecutorService(ExecutorService delegate) {
super(delegate);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.context;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class CurrentContextScheduledExecutorService extends CurrentContextExecutorService
implements ScheduledExecutorService {

private final ScheduledExecutorService delegate;

CurrentContextScheduledExecutorService(ScheduledExecutorService delegate) {
super(delegate);
this.delegate = delegate;
}
Comment on lines +16 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final ScheduledExecutorService delegate;
CurrentContextScheduledExecutorService(ScheduledExecutorService delegate) {
super(delegate);
this.delegate = delegate;
}
CurrentContextScheduledExecutorService(ScheduledExecutorService delegate) {
super(delegate);
}


@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate.schedule(Context.current().wrap(command), delay, unit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return delegate.schedule(Context.current().wrap(command), delay, unit);
return ((ScheduledExecutorService) delegate())
.schedule(Context.current().wrap(command), delay, unit);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I prefer just keeping a local copy of the delegate rather than having to make this cast everywhere.

}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate.schedule(Context.current().wrap(callable), delay, unit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return delegate.schedule(Context.current().wrap(callable), delay, unit);
return ((ScheduledExecutorService) delegate())
.schedule(Context.current().wrap(callable), delay, unit);

}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
return ((ScheduledExecutorService) delegate())
.scheduleAtFixedRate(command, initialDelay, period, unit);

}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
return ((ScheduledExecutorService) delegate())
.scheduleWithFixedDelay(command, initialDelay, delay, unit);

}
}
Loading
Loading