Skip to content

Commit

Permalink
+ kamon-executors: providers for Runnable and Callable wrappers (kamo…
Browse files Browse the repository at this point in the history
…n-io#1331);

 + kamon-jdbc: wrapper SlickContextAwareRunnable
  • Loading branch information
alexmihailov authored and Alex Mihailov committed Apr 2, 2024
1 parent 7e31bad commit afc6a03
Show file tree
Hide file tree
Showing 11 changed files with 547 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,45 @@

package kamon.instrumentation.executor;

import com.typesafe.config.Config;
import kamon.Kamon;
import kamon.context.Context;
import kamon.context.Storage.Scope;
import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.CallableCollectionWrapperAdvisor;
import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.CallableWrapperAdvisor;
import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.RunnableWrapperAdvisor;
import kamon.instrumentation.executor.ContextAware.ContextAwareCallableProvider;
import kamon.instrumentation.executor.ContextAware.ContextAwareRunnableProvider;
import kamon.instrumentation.executor.ContextAware.DefaultContextAwareCallable;
import kamon.instrumentation.executor.ContextAware.DefaultContextAwareRunnable;
import kanela.agent.api.instrumentation.InstrumentationBuilder;
import kanela.agent.bootstrap.context.ContextHandler;
import kanela.agent.bootstrap.context.ContextProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;

import static java.text.MessageFormat.format;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;

public final class CaptureContextOnSubmitInstrumentation extends InstrumentationBuilder {

private static final Logger LOG = LoggerFactory.getLogger(CaptureContextOnSubmitInstrumentation.class);

private volatile static Settings settings = readSettings(Kamon.config());

public CaptureContextOnSubmitInstrumentation() {

/**
* Set the ContextProvider
*/
ContextHandler.setContextProvider(new KamonContextProvider());

Kamon.onReconfigure(newConfig -> { settings = readSettings(newConfig); });

/**
* Instrument all implementations of:
*
Expand Down Expand Up @@ -74,65 +91,98 @@ public CaptureContextOnSubmitInstrumentation() {

}

/**
* Runs a Runnable within Kamon Context
*/
private static class ContextAwareRunnable implements Runnable {
private static final class Settings {
public final List<ContextAwareRunnableProvider> runnableAwareProviders;
public final List<ContextAwareCallableProvider> callableAwareProviders;

private final Runnable underlying;
private final Context context;

ContextAwareRunnable(Runnable r) {
this.context = Kamon.currentContext();
this.underlying = r;
private Settings(
List<ContextAwareRunnableProvider> runnableAwareProviders,
List<ContextAwareCallableProvider> callableAwareProviders
) {
this.runnableAwareProviders = runnableAwareProviders;
this.callableAwareProviders = callableAwareProviders;
}
}

@Override
public void run() {
final Scope scope = Kamon.storeContext(context);
try {
underlying.run();
} finally {
scope.close();
}
private static Settings readSettings(Config config) {
Config executorCaptureConfig = config.getConfig("kanela.modules.executor-service-capture-on-submit");
List<ContextAwareRunnableProvider> runnableAwareProviders ;
if (executorCaptureConfig.hasPath("context-aware-runnable-providers")) {
runnableAwareProviders = executorCaptureConfig.getStringList("context-aware-runnable-providers")
.stream()
.map(CaptureContextOnSubmitInstrumentation::loadRunnableProvider)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
} else {
runnableAwareProviders = emptyList();
}
}

/**
* Runs a Callable within Kamon Context
*/
private static class ContextAwareCallable<A> implements Callable<A> {
List<ContextAwareCallableProvider> callableAwareProviders;
if (executorCaptureConfig.hasPath("context-aware-callable-providers")) {
callableAwareProviders = executorCaptureConfig.getStringList("context-aware-callable-providers")
.stream()
.map(CaptureContextOnSubmitInstrumentation::loadCallableProvider)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
} else {
callableAwareProviders = emptyList();
}

private final Callable<A> underlying;
private final Context context;
return new Settings(runnableAwareProviders, callableAwareProviders);
}

ContextAwareCallable(Callable<A> c) {
this.context = Kamon.currentContext();
this.underlying = c;
private static Optional<ContextAwareRunnableProvider> loadRunnableProvider(String providerClassName) {
Optional<ContextAwareRunnableProvider> providerOpt;
try {
providerOpt = Optional.of(
(ContextAwareRunnableProvider) Class.forName(providerClassName).getConstructor().newInstance()
);
} catch (Exception e) {
LOG.warn(format("Error trying to load ContextAwareRunnableProvider: {0}.", providerClassName), e);
providerOpt = Optional.empty();
}
return providerOpt;
}

public A call() throws Exception {
final Scope scope = Kamon.storeContext(context);
try {
return underlying.call();
} finally {
scope.close();
}
private static Optional<ContextAwareCallableProvider> loadCallableProvider(String providerClassName) {
Optional<ContextAwareCallableProvider> providerOpt;
try {
providerOpt = Optional.of(
(ContextAwareCallableProvider) Class.forName(providerClassName).getConstructor().newInstance()
);
} catch (Exception e) {
LOG.warn(format("Error trying to load ContextAwareCallableProvider: {0}.", providerClassName), e);
providerOpt = Optional.empty();
}
return providerOpt;
}

/**
* implementation of kanela.agent.bootstrap.context.ContextProvider
*/
private static class KamonContextProvider implements ContextProvider {

@Override
public Runnable wrapInContextAware(Runnable runnable) {
return new ContextAwareRunnable(runnable);
public Runnable wrapInContextAware(Runnable r) {
return settings.runnableAwareProviders
.stream()
.filter(p -> p.test(r))
.findFirst()
.map(it -> it.provide(r))
.orElse(new DefaultContextAwareRunnable(r));
}

@SuppressWarnings("rawtypes")
@Override
public <A> Callable wrapInContextAware(Callable<A> callable) {
return new ContextAwareCallable<>(callable);
public <A> Callable wrapInContextAware(Callable<A> c) {
return settings.callableAwareProviders
.stream()
.filter(p -> p.test(c))
.findFirst()
.map(it -> it.provide(c))
.orElse(new DefaultContextAwareCallable<>(c));
}
}
}
}
Loading

0 comments on commit afc6a03

Please sign in to comment.