Skip to content

Commit

Permalink
Added OTel Instrumentation feature for both SimpleScheduler and Quart…
Browse files Browse the repository at this point in the history
…z Scheduler
  • Loading branch information
mskacelik committed Nov 23, 2023
1 parent 63ff7b0 commit 3f6c563
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,4 @@ void resteasyReactiveIntegration(
preExceptionMapperHandlerBuildItemBuildProducer
.produce(new PreExceptionMapperHandlerBuildItem(new AttachExceptionHandler()));
}

}
11 changes: 11 additions & 0 deletions extensions/quartz/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
-->
</dependency>

<!-- OpenTelemetry tracing -->
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.enterprise.inject.Typed;
import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Singleton;
import jakarta.interceptor.Interceptor;
import jakarta.transaction.SystemException;
import jakarta.transaction.UserTransaction;

import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
Expand Down Expand Up @@ -65,6 +67,7 @@

import io.quarkus.arc.Subclass;
import io.quarkus.quartz.QuartzScheduler;
import io.quarkus.quartz.runtime.tracing.WithSpanJob;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.FailedExecution;
import io.quarkus.scheduler.Scheduled;
Expand All @@ -84,7 +87,9 @@
import io.quarkus.scheduler.common.runtime.ScheduledMethod;
import io.quarkus.scheduler.common.runtime.SchedulerContext;
import io.quarkus.scheduler.common.runtime.SyntheticScheduled;
import io.quarkus.scheduler.common.runtime.WithSpanInvoker;
import io.quarkus.scheduler.common.runtime.util.SchedulerUtils;
import io.quarkus.scheduler.runtime.SchedulerConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode;
import io.quarkus.scheduler.runtime.SimpleScheduler;
Expand Down Expand Up @@ -123,6 +128,7 @@ public class QuartzSchedulerImpl implements QuartzScheduler {
private final Event<ScheduledJobPaused> scheduledJobPausedEvent;
private final Event<ScheduledJobResumed> scheduledJobResumedEvent;
private final QuartzRuntimeConfig runtimeConfig;
private final SchedulerConfig schedulerConfig;

public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport,
SchedulerRuntimeConfig schedulerRuntimeConfig,
Expand All @@ -131,7 +137,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
Event<SchedulerResumed> schedulerResumedEvent, Event<ScheduledJobPaused> scheduledJobPausedEvent,
Event<ScheduledJobResumed> scheduledJobResumedEvent,
Instance<Job> jobs, Instance<UserTransaction> userTransaction,
Vertx vertx) {
Vertx vertx,
SchedulerConfig schedulerConfig) {
this.shutdownWaitTime = quartzSupport.getRuntimeConfig().shutdownWaitTime;
this.skippedExecutionEvent = skippedExecutionEvent;
this.successExecutionEvent = successExecutionEvent;
Expand All @@ -143,6 +150,7 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
this.runtimeConfig = quartzSupport.getRuntimeConfig();
this.enabled = schedulerRuntimeConfig.enabled;
this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod;
this.schedulerConfig = schedulerConfig;

StartMode startMode = initStartMode(schedulerRuntimeConfig, runtimeConfig);

Expand Down Expand Up @@ -196,7 +204,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
scheduler = schedulerFactory.getScheduler();

// Set custom job factory
scheduler.setJobFactory(new InvokerJobFactory(scheduledTasks, jobs, vertx));
scheduler.setJobFactory(
new InvokerJobFactory(scheduledTasks, jobs, vertx, schedulerConfig));

if (transaction != null) {
transaction.begin();
Expand Down Expand Up @@ -757,6 +766,14 @@ private Optional<TriggerBuilder<?>> createTrigger(String identity, Scheduled sch
return Optional.of(triggerBuilder);
}

private static boolean isOTelTracingEnabled() {
return ConfigProvider.getConfig().getValue("quarkus.otel.enabled", Boolean.class);
}

private static boolean isOTelExtensionAvailable() {
return CDI.current().select(io.opentelemetry.api.OpenTelemetry.class).isResolvable();
}

class QuartzJobDefinition extends AbstractJobDefinition {

QuartzJobDefinition(String id) {
Expand Down Expand Up @@ -791,6 +808,7 @@ public boolean isRunningOnVirtualThread() {
};
} else {
invoker = new DefaultInvoker() {

@Override
public CompletionStage<Void> invokeBean(ScheduledExecution execution) {
try {
Expand All @@ -807,6 +825,7 @@ public boolean isBlocking() {

};
}

Scheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed,
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone);

Expand All @@ -816,6 +835,9 @@ public boolean isBlocking() {
if (triggerBuilder.isPresent()) {
invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent,
failedExecutionEvent, concurrentExecution, skipPredicate);
if (schedulerConfig.tracingEnabled && isOTelExtensionAvailable() && isOTelTracingEnabled()) {
invoker = new WithSpanInvoker(invoker);
}
org.quartz.Trigger trigger = triggerBuilder.get().build();
QuartzTrigger existing = scheduledTasks.putIfAbsent(identity, new QuartzTrigger(trigger.getKey(),
new Function<>() {
Expand Down Expand Up @@ -895,6 +917,7 @@ public void run() {
});
} else {
context.executeBlocking(new Callable<Object>() {

@Override
public Object call() throws Exception {
return trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext));
Expand All @@ -918,9 +941,9 @@ public void handle(Void event) {
}
} else {
String jobName = jobExecutionContext.getJobDetail().getKey().getName();
LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. " +
"Update your Quartz table by removing all phantom jobs or make sure that there is a " +
"Scheduled method with the identity matching the job's name", jobName);
LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. "
+ "Update your Quartz table by removing all phantom jobs or make sure that there is a "
+ "Scheduled method with the identity matching the job's name", jobName);
}
}
}
Expand Down Expand Up @@ -1018,11 +1041,15 @@ static class InvokerJobFactory extends SimpleJobFactory {
final Map<String, QuartzTrigger> scheduledTasks;
final Instance<Job> jobs;
final Vertx vertx;
final SchedulerConfig schedulerConfig;

InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs, Vertx vertx) {
InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs, Vertx vertx,
SchedulerConfig schedulerConfig) {
this.scheduledTasks = scheduledTasks;
this.jobs = jobs;
this.vertx = vertx;
this.schedulerConfig = schedulerConfig;

}

@SuppressWarnings("unchecked")
Expand All @@ -1039,10 +1066,20 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr
}
Instance<?> instance = jobs.select(jobClass);
if (instance.isResolvable()) {
return (Job) instance.get();
return jobWithSpanWrapper((Job) instance.get());
}
return jobWithSpanWrapper(super.newJob(bundle, Scheduler));
}

private Job jobWithSpanWrapper(Job job) {
if (schedulerConfig.tracingEnabled && isOTelExtensionAvailable() && isOTelTracingEnabled()) {
return new WithSpanJob(job, isCapturingExperimentalSpanAttributesDisabled());
}
return super.newJob(bundle, Scheduler);
return job;
}

private boolean isCapturingExperimentalSpanAttributesDisabled() {
return ConfigProvider.getConfig().getValue("quarkus.otel.experimental.attributes.disabled", Boolean.class);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.quartz.runtime.tracing;

import org.quartz.JobExecutionContext;

import io.opentelemetry.instrumentation.api.instrumenter.code.CodeAttributesGetter;

public final class QuartzCodeAttributesGetter implements CodeAttributesGetter<JobExecutionContext> {

@Override
public Class<?> getCodeClass(JobExecutionContext jobExecutionContext) {
return jobExecutionContext.getJobDetail().getJobClass();
}

@Override
public String getMethodName(JobExecutionContext jobExecutionContext) {
return "execute";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.quartz.runtime.tracing;

import org.quartz.SchedulerException;

import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;

public final class QuartzErrorCauseExtractor implements ErrorCauseExtractor {
@Override
public Throwable extract(Throwable error) {
Throwable userError = error;
while (userError instanceof SchedulerException) {
userError = ((SchedulerException) userError).getUnderlyingException();
}
return userError;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.quarkus.quartz.runtime.tracing;

import org.quartz.JobExecutionContext;
import org.quartz.JobKey;

import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;

public final class QuartzSpanNameExtractor implements SpanNameExtractor<JobExecutionContext> {
@Override
public String extract(JobExecutionContext job) {
JobKey key = job.getJobDetail().getKey();
return key.getGroup() + '.' + key.getName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.quartz.runtime.tracing;

import jakarta.enterprise.inject.spi.CDI;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.code.CodeAttributesExtractor;

public class WithSpanJob implements Job {
private final Job delegate;

private final Instrumenter<JobExecutionContext, Void> instrumenter;

public WithSpanJob(Job delegate, boolean captureExperimentalSpanAttributes) {
this.delegate = delegate;
InstrumenterBuilder<JobExecutionContext, Void> jobInstrumenterBuilder = Instrumenter.builder(
CDI.current().select(OpenTelemetry.class).get(), "io.quarkus.opentelemetry",
new QuartzSpanNameExtractor());

if (!captureExperimentalSpanAttributes) {
jobInstrumenterBuilder.addAttributesExtractor(
AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "quartz"));
}
jobInstrumenterBuilder.setErrorCauseExtractor(new QuartzErrorCauseExtractor());
jobInstrumenterBuilder.addAttributesExtractor(
CodeAttributesExtractor.create(new QuartzCodeAttributesGetter()));
this.instrumenter = jobInstrumenterBuilder.buildInstrumenter();
}

@Override
public void execute(JobExecutionContext job) throws JobExecutionException {
Context parentCtx = Context.current();
Context context = instrumenter.start(parentCtx, job);
Throwable t = null;
try (Scope scope = context.makeCurrent()) {
delegate.execute(job);
} catch (Throwable throwable) {
t = throwable;
throw throwable;
} finally {
instrumenter.end(context, job, null, t);
}
}
}
4 changes: 4 additions & 0 deletions extensions/scheduler/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.scheduler.common.runtime;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.inject.spi.CDI;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.quarkus.scheduler.ScheduledExecution;

public class WithSpanInvoker extends DelegateInvoker {
private final Instrumenter<ScheduledExecution, Void> instrumenter;

public WithSpanInvoker(ScheduledInvoker delegate) {
super(delegate);
InstrumenterBuilder<ScheduledExecution, Void> instrumenterBuilder = Instrumenter.builder(
CDI.current().select(OpenTelemetry.class).get(), "io.quarkus.opentelemetry",
new SpanNameExtractor<ScheduledExecution>() {
@Override
public String extract(ScheduledExecution scheduledExecution) {
return scheduledExecution.getTrigger().getId();
}
});
instrumenterBuilder.setErrorCauseExtractor(new ErrorCauseExtractor() {
@Override
public Throwable extract(Throwable throwable) {
return throwable;
}
});

this.instrumenter = instrumenterBuilder.buildInstrumenter();
}

@Override
public CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception {
Context parentCtx = Context.current();
Context context = instrumenter.start(parentCtx, execution);
try (Scope scope = context.makeCurrent()) {
return delegate
.invoke(execution)
.whenComplete(
(result, throwable) -> instrumenter.end(context, execution, null, throwable));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class SchedulerConfig {
public boolean metricsEnabled;

/**
* Tracing will be enabled if the OpenTelemetry extension is present and this value is true.
* Controls whether tracing is enabled. If set to true and the OpenTelemetry extension is present,
* tracing will be enabled, creating automatic Spans for each scheduled task.
*/
@ConfigItem(name = "tracing.enabled")
public boolean tracingEnabled;

}
Loading

0 comments on commit 3f6c563

Please sign in to comment.