Skip to content

Commit

Permalink
Scheduler: introduce SPI module and JobInstrumenter
Browse files Browse the repository at this point in the history
- implement JobInstrumenter in OTel extension
- remove the annotation transformer that adds OTel @WithSpan to
@scheduled methods
  • Loading branch information
mkouba authored and mskacelik committed Nov 23, 2023
1 parent 3f6c563 commit 0b3bbfd
Show file tree
Hide file tree
Showing 17 changed files with 235 additions and 187 deletions.
5 changes: 5 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2038,6 +2038,11 @@
<artifactId>quarkus-scheduler-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-spi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.opentelemetry.deployment.scheduler;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.opentelemetry.deployment.OpenTelemetryEnabled;
import io.quarkus.opentelemetry.runtime.scheduler.OpenTelemetryJobInstrumenter;

public class OpenTelemetrySchedulerProcessor {

@BuildStep(onlyIf = OpenTelemetryEnabled.class)
void registerJobInstrumenter(Capabilities capabilities, BuildProducer<AdditionalBeanBuildItem> beans) {
if (capabilities.isPresent(Capability.SCHEDULER)) {
beans.produce(new AdditionalBeanBuildItem(OpenTelemetryJobInstrumenter.class));
}
}

}
5 changes: 5 additions & 0 deletions extensions/opentelemetry/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-spi</artifactId>
<optional>true</optional>
</dependency>

<!-- OpenTelemetry Dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.quarkus.scheduler.common.runtime;
package io.quarkus.opentelemetry.runtime.scheduler;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Singleton;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
Expand All @@ -11,19 +11,20 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.spi.JobInstrumenter;

public class WithSpanInvoker extends DelegateInvoker {
private final Instrumenter<ScheduledExecution, Void> instrumenter;
@Singleton
public class OpenTelemetryJobInstrumenter implements JobInstrumenter {

public WithSpanInvoker(ScheduledInvoker delegate) {
super(delegate);
InstrumenterBuilder<ScheduledExecution, Void> instrumenterBuilder = Instrumenter.builder(
CDI.current().select(OpenTelemetry.class).get(), "io.quarkus.opentelemetry",
new SpanNameExtractor<ScheduledExecution>() {
private final Instrumenter<JobInstrumentationContext, Void> instrumenter;

public OpenTelemetryJobInstrumenter(OpenTelemetry openTelemetry) {
InstrumenterBuilder<JobInstrumentationContext, Void> instrumenterBuilder = Instrumenter.builder(
openTelemetry, "io.quarkus.opentelemetry",
new SpanNameExtractor<JobInstrumentationContext>() {
@Override
public String extract(ScheduledExecution scheduledExecution) {
return scheduledExecution.getTrigger().getId();
public String extract(JobInstrumentationContext context) {
return context.getSpanName();
}
});
instrumenterBuilder.setErrorCauseExtractor(new ErrorCauseExtractor() {
Expand All @@ -32,19 +33,19 @@ public Throwable extract(Throwable throwable) {
return throwable;
}
});

this.instrumenter = instrumenterBuilder.buildInstrumenter();
}

@Override
public CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception {
public CompletionStage<Void> instrument(JobInstrumentationContext instrumentationContext) {
Context parentCtx = Context.current();
Context context = instrumenter.start(parentCtx, execution);
Context context = instrumenter.start(parentCtx, instrumentationContext);
try (Scope scope = context.makeCurrent()) {
return delegate
.invoke(execution)
return instrumentationContext
.executeJob()
.whenComplete(
(result, throwable) -> instrumenter.end(context, execution, null, throwable));
(result, throwable) -> instrumenter.end(context, instrumentationContext, null, throwable));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.quartz.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;

import io.quarkus.scheduler.spi.JobInstrumenter;
import io.quarkus.scheduler.spi.JobInstrumenter.JobInstrumentationContext;

/**
*
* @see JobInstrumenter
*/
class InstrumentedJob implements Job {

private final Job delegate;
private final JobInstrumenter instrumenter;

InstrumentedJob(Job delegate, JobInstrumenter instrumenter) {
this.delegate = delegate;
this.instrumenter = instrumenter;
}

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
instrumenter.instrument(new JobInstrumentationContext() {

@Override
public CompletionStage<Void> executeJob() {
try {
delegate.execute(context);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}

}

@Override
public String getSpanName() {
JobKey key = context.getJobDetail().getKey();
return key.getGroup() + '.' + key.getName();
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.enterprise.inject.Typed;
import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Singleton;
import jakarta.interceptor.Interceptor;
import jakarta.transaction.SystemException;
import jakarta.transaction.UserTransaction;

import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
Expand Down Expand Up @@ -67,7 +65,6 @@

import io.quarkus.arc.Subclass;
import io.quarkus.quartz.QuartzScheduler;
import io.quarkus.quartz.runtime.tracing.WithSpanJob;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.FailedExecution;
import io.quarkus.scheduler.Scheduled;
Expand All @@ -87,12 +84,12 @@
import io.quarkus.scheduler.common.runtime.ScheduledMethod;
import io.quarkus.scheduler.common.runtime.SchedulerContext;
import io.quarkus.scheduler.common.runtime.SyntheticScheduled;
import io.quarkus.scheduler.common.runtime.WithSpanInvoker;
import io.quarkus.scheduler.common.runtime.util.SchedulerUtils;
import io.quarkus.scheduler.runtime.SchedulerConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode;
import io.quarkus.scheduler.runtime.SimpleScheduler;
import io.quarkus.scheduler.spi.JobInstrumenter;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
Expand Down Expand Up @@ -129,6 +126,7 @@ public class QuartzSchedulerImpl implements QuartzScheduler {
private final Event<ScheduledJobResumed> scheduledJobResumedEvent;
private final QuartzRuntimeConfig runtimeConfig;
private final SchedulerConfig schedulerConfig;
private final Instance<JobInstrumenter> jobInstrumenter;

public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport,
SchedulerRuntimeConfig schedulerRuntimeConfig,
Expand All @@ -138,7 +136,7 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
Event<ScheduledJobResumed> scheduledJobResumedEvent,
Instance<Job> jobs, Instance<UserTransaction> userTransaction,
Vertx vertx,
SchedulerConfig schedulerConfig) {
SchedulerConfig schedulerConfig, Instance<JobInstrumenter> jobInstrumenter) {
this.shutdownWaitTime = quartzSupport.getRuntimeConfig().shutdownWaitTime;
this.skippedExecutionEvent = skippedExecutionEvent;
this.successExecutionEvent = successExecutionEvent;
Expand All @@ -151,6 +149,7 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
this.enabled = schedulerRuntimeConfig.enabled;
this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod;
this.schedulerConfig = schedulerConfig;
this.jobInstrumenter = jobInstrumenter;

StartMode startMode = initStartMode(schedulerRuntimeConfig, runtimeConfig);

Expand Down Expand Up @@ -184,6 +183,11 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
CronDefinition def = CronDefinitionBuilder.instanceDefinitionFor(cronType);
cronParser = new CronParser(def);

JobInstrumenter instrumenter = null;
if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) {
instrumenter = jobInstrumenter.get();
}

if (!enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
this.scheduler = null;
Expand All @@ -205,7 +209,7 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport

// Set custom job factory
scheduler.setJobFactory(
new InvokerJobFactory(scheduledTasks, jobs, vertx, schedulerConfig));
new InvokerJobFactory(scheduledTasks, jobs, vertx, instrumenter));

if (transaction != null) {
transaction.begin();
Expand All @@ -218,11 +222,12 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
if (identity.isEmpty()) {
identity = ++nameSequence + "_" + method.getInvokerClassName();
}

ScheduledInvoker invoker = SimpleScheduler.initInvoker(
context.createInvoker(method.getInvokerClassName()),
skippedExecutionEvent, successExecutionEvent, failedExecutionEvent,
scheduled.concurrentExecution(),
SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()));
SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()), instrumenter);

JobDetail jobDetail = createJobDetail(identity, method.getInvokerClassName());
Optional<TriggerBuilder<?>> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig,
Expand Down Expand Up @@ -766,14 +771,6 @@ private Optional<TriggerBuilder<?>> createTrigger(String identity, Scheduled sch
return Optional.of(triggerBuilder);
}

private static boolean isOTelTracingEnabled() {
return ConfigProvider.getConfig().getValue("quarkus.otel.enabled", Boolean.class);
}

private static boolean isOTelExtensionAvailable() {
return CDI.current().select(io.opentelemetry.api.OpenTelemetry.class).isResolvable();
}

class QuartzJobDefinition extends AbstractJobDefinition {

QuartzJobDefinition(String id) {
Expand Down Expand Up @@ -833,11 +830,12 @@ public boolean isBlocking() {
Optional<TriggerBuilder<?>> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig, jobDetail);

if (triggerBuilder.isPresent()) {
invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent,
failedExecutionEvent, concurrentExecution, skipPredicate);
if (schedulerConfig.tracingEnabled && isOTelExtensionAvailable() && isOTelTracingEnabled()) {
invoker = new WithSpanInvoker(invoker);
JobInstrumenter instrumenter = null;
if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) {
instrumenter = jobInstrumenter.get();
}
invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent,
failedExecutionEvent, concurrentExecution, skipPredicate, instrumenter);
org.quartz.Trigger trigger = triggerBuilder.get().build();
QuartzTrigger existing = scheduledTasks.putIfAbsent(identity, new QuartzTrigger(trigger.getKey(),
new Function<>() {
Expand Down Expand Up @@ -1041,14 +1039,14 @@ static class InvokerJobFactory extends SimpleJobFactory {
final Map<String, QuartzTrigger> scheduledTasks;
final Instance<Job> jobs;
final Vertx vertx;
final SchedulerConfig schedulerConfig;
final JobInstrumenter instrumenter;

InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs, Vertx vertx,
SchedulerConfig schedulerConfig) {
JobInstrumenter instrumenter) {
this.scheduledTasks = scheduledTasks;
this.jobs = jobs;
this.vertx = vertx;
this.schedulerConfig = schedulerConfig;
this.instrumenter = instrumenter;

}

Expand All @@ -1072,14 +1070,11 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr
}

private Job jobWithSpanWrapper(Job job) {
if (schedulerConfig.tracingEnabled && isOTelExtensionAvailable() && isOTelTracingEnabled()) {
return new WithSpanJob(job, isCapturingExperimentalSpanAttributesDisabled());
if (instrumenter != null) {
return new InstrumentedJob(job, instrumenter);
}
return job;
}

private boolean isCapturingExperimentalSpanAttributesDisabled() {
return ConfigProvider.getConfig().getValue("quarkus.otel.experimental.attributes.disabled", Boolean.class);
}
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 0b3bbfd

Please sign in to comment.