Skip to content

Commit

Permalink
Merge pull request #35989 from mskacelik/otel-quartz-implementation
Browse files Browse the repository at this point in the history
Scheduler: OTel Instrumentation feature improvements and refactoring
  • Loading branch information
mkouba authored Nov 29, 2023
2 parents 44876a3 + efea7db commit 947d481
Show file tree
Hide file tree
Showing 39 changed files with 1,329 additions and 43 deletions.
5 changes: 5 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2049,6 +2049,11 @@
<artifactId>quarkus-scheduler-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-spi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.opentelemetry.deployment.scheduler;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.opentelemetry.deployment.OpenTelemetryEnabled;
import io.quarkus.opentelemetry.runtime.scheduler.OpenTelemetryJobInstrumenter;

public class OpenTelemetrySchedulerProcessor {

@BuildStep(onlyIf = OpenTelemetryEnabled.class)
void registerJobInstrumenter(Capabilities capabilities, BuildProducer<AdditionalBeanBuildItem> beans) {
if (capabilities.isPresent(Capability.SCHEDULER)) {
beans.produce(new AdditionalBeanBuildItem(OpenTelemetryJobInstrumenter.class));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,4 @@ void resteasyReactiveIntegration(
preExceptionMapperHandlerBuildItemBuildProducer
.produce(new PreExceptionMapperHandlerBuildItem(new AttachExceptionHandler()));
}

}
5 changes: 5 additions & 0 deletions extensions/opentelemetry/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-spi</artifactId>
<optional>true</optional>
</dependency>

<!-- OpenTelemetry Dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.opentelemetry.runtime.scheduler;

import java.util.concurrent.CompletionStage;

import jakarta.inject.Singleton;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.quarkus.scheduler.spi.JobInstrumenter;

@Singleton
public class OpenTelemetryJobInstrumenter implements JobInstrumenter {

private final Instrumenter<JobInstrumentationContext, Void> instrumenter;

public OpenTelemetryJobInstrumenter(OpenTelemetry openTelemetry) {
InstrumenterBuilder<JobInstrumentationContext, Void> instrumenterBuilder = Instrumenter.builder(
openTelemetry, "io.quarkus.opentelemetry",
new SpanNameExtractor<JobInstrumentationContext>() {
@Override
public String extract(JobInstrumentationContext context) {
return context.getSpanName();
}
});
instrumenterBuilder.setErrorCauseExtractor(new ErrorCauseExtractor() {
@Override
public Throwable extract(Throwable throwable) {
return throwable;
}
});
this.instrumenter = instrumenterBuilder.buildInstrumenter();
}

@Override
public CompletionStage<Void> instrument(JobInstrumentationContext instrumentationContext) {
Context parentCtx = Context.current();
Context context = instrumenter.start(parentCtx, instrumentationContext);
try (Scope scope = context.makeCurrent()) {
return instrumentationContext
.executeJob()
.whenComplete(
(result, throwable) -> instrumenter.end(context, instrumentationContext, null, throwable));
}
}

}
11 changes: 11 additions & 0 deletions extensions/quartz/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
-->
</dependency>

<!-- OpenTelemetry tracing -->
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.quartz.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;

import io.quarkus.scheduler.spi.JobInstrumenter;
import io.quarkus.scheduler.spi.JobInstrumenter.JobInstrumentationContext;

/**
*
* @see JobInstrumenter
*/
class InstrumentedJob implements Job {

private final Job delegate;
private final JobInstrumenter instrumenter;

InstrumentedJob(Job delegate, JobInstrumenter instrumenter) {
this.delegate = delegate;
this.instrumenter = instrumenter;
}

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
instrumenter.instrument(new JobInstrumentationContext() {

@Override
public CompletionStage<Void> executeJob() {
try {
delegate.execute(context);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}

}

@Override
public String getSpanName() {
JobKey key = context.getJobDetail().getKey();
return key.getGroup() + '.' + key.getName();
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@
import io.quarkus.scheduler.common.runtime.SchedulerContext;
import io.quarkus.scheduler.common.runtime.SyntheticScheduled;
import io.quarkus.scheduler.common.runtime.util.SchedulerUtils;
import io.quarkus.scheduler.runtime.SchedulerConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode;
import io.quarkus.scheduler.runtime.SimpleScheduler;
import io.quarkus.scheduler.spi.JobInstrumenter;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
Expand Down Expand Up @@ -123,6 +125,8 @@ public class QuartzSchedulerImpl implements QuartzScheduler {
private final Event<ScheduledJobPaused> scheduledJobPausedEvent;
private final Event<ScheduledJobResumed> scheduledJobResumedEvent;
private final QuartzRuntimeConfig runtimeConfig;
private final SchedulerConfig schedulerConfig;
private final Instance<JobInstrumenter> jobInstrumenter;

public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport,
SchedulerRuntimeConfig schedulerRuntimeConfig,
Expand All @@ -131,7 +135,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
Event<SchedulerResumed> schedulerResumedEvent, Event<ScheduledJobPaused> scheduledJobPausedEvent,
Event<ScheduledJobResumed> scheduledJobResumedEvent,
Instance<Job> jobs, Instance<UserTransaction> userTransaction,
Vertx vertx) {
Vertx vertx,
SchedulerConfig schedulerConfig, Instance<JobInstrumenter> jobInstrumenter) {
this.shutdownWaitTime = quartzSupport.getRuntimeConfig().shutdownWaitTime;
this.skippedExecutionEvent = skippedExecutionEvent;
this.successExecutionEvent = successExecutionEvent;
Expand All @@ -143,6 +148,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
this.runtimeConfig = quartzSupport.getRuntimeConfig();
this.enabled = schedulerRuntimeConfig.enabled;
this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod;
this.schedulerConfig = schedulerConfig;
this.jobInstrumenter = jobInstrumenter;

StartMode startMode = initStartMode(schedulerRuntimeConfig, runtimeConfig);

Expand Down Expand Up @@ -176,6 +183,11 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
CronDefinition def = CronDefinitionBuilder.instanceDefinitionFor(cronType);
cronParser = new CronParser(def);

JobInstrumenter instrumenter = null;
if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) {
instrumenter = jobInstrumenter.get();
}

if (!enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
this.scheduler = null;
Expand All @@ -196,7 +208,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
scheduler = schedulerFactory.getScheduler();

// Set custom job factory
scheduler.setJobFactory(new InvokerJobFactory(scheduledTasks, jobs, vertx));
scheduler.setJobFactory(
new InvokerJobFactory(scheduledTasks, jobs, vertx, instrumenter));

if (transaction != null) {
transaction.begin();
Expand All @@ -209,11 +222,12 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
if (identity.isEmpty()) {
identity = ++nameSequence + "_" + method.getInvokerClassName();
}

ScheduledInvoker invoker = SimpleScheduler.initInvoker(
context.createInvoker(method.getInvokerClassName()),
skippedExecutionEvent, successExecutionEvent, failedExecutionEvent,
scheduled.concurrentExecution(),
SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()));
SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()), instrumenter);

JobDetail jobDetail = createJobDetail(identity, method.getInvokerClassName());
Optional<TriggerBuilder<?>> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig,
Expand Down Expand Up @@ -791,6 +805,7 @@ public boolean isRunningOnVirtualThread() {
};
} else {
invoker = new DefaultInvoker() {

@Override
public CompletionStage<Void> invokeBean(ScheduledExecution execution) {
try {
Expand All @@ -807,15 +822,20 @@ public boolean isBlocking() {

};
}

Scheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed,
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone);

JobDetail jobDetail = createJobDetail(identity, QuartzSchedulerImpl.class.getName());
Optional<TriggerBuilder<?>> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig, jobDetail);

if (triggerBuilder.isPresent()) {
JobInstrumenter instrumenter = null;
if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) {
instrumenter = jobInstrumenter.get();
}
invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent,
failedExecutionEvent, concurrentExecution, skipPredicate);
failedExecutionEvent, concurrentExecution, skipPredicate, instrumenter);
org.quartz.Trigger trigger = triggerBuilder.get().build();
QuartzTrigger existing = scheduledTasks.putIfAbsent(identity, new QuartzTrigger(trigger.getKey(),
new Function<>() {
Expand Down Expand Up @@ -895,6 +915,7 @@ public void run() {
});
} else {
context.executeBlocking(new Callable<Object>() {

@Override
public Object call() throws Exception {
return trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext));
Expand All @@ -918,9 +939,9 @@ public void handle(Void event) {
}
} else {
String jobName = jobExecutionContext.getJobDetail().getKey().getName();
LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. " +
"Update your Quartz table by removing all phantom jobs or make sure that there is a " +
"Scheduled method with the identity matching the job's name", jobName);
LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. "
+ "Update your Quartz table by removing all phantom jobs or make sure that there is a "
+ "Scheduled method with the identity matching the job's name", jobName);
}
}
}
Expand Down Expand Up @@ -1018,11 +1039,15 @@ static class InvokerJobFactory extends SimpleJobFactory {
final Map<String, QuartzTrigger> scheduledTasks;
final Instance<Job> jobs;
final Vertx vertx;
final JobInstrumenter instrumenter;

InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs, Vertx vertx) {
InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs, Vertx vertx,
JobInstrumenter instrumenter) {
this.scheduledTasks = scheduledTasks;
this.jobs = jobs;
this.vertx = vertx;
this.instrumenter = instrumenter;

}

@SuppressWarnings("unchecked")
Expand All @@ -1039,9 +1064,16 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr
}
Instance<?> instance = jobs.select(jobClass);
if (instance.isResolvable()) {
return (Job) instance.get();
return jobWithSpanWrapper((Job) instance.get());
}
return jobWithSpanWrapper(super.newJob(bundle, Scheduler));
}

private Job jobWithSpanWrapper(Job job) {
if (instrumenter != null) {
return new InstrumentedJob(job, instrumenter);
}
return super.newJob(bundle, Scheduler);
return job;
}

}
Expand Down
8 changes: 8 additions & 0 deletions extensions/scheduler/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-api</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-spi</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.scheduler.common.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.spi.JobInstrumenter;
import io.quarkus.scheduler.spi.JobInstrumenter.JobInstrumentationContext;

/**
*
* @see JobInstrumenter
*/
public class InstrumentedInvoker extends DelegateInvoker {

private final JobInstrumenter instrumenter;

public InstrumentedInvoker(ScheduledInvoker delegate, JobInstrumenter instrumenter) {
super(delegate);
this.instrumenter = instrumenter;
}

@Override
public CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception {
return instrumenter.instrument(new JobInstrumentationContext() {

@Override
public CompletionStage<Void> executeJob() {
try {
return delegate.invoke(execution);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

@Override
public String getSpanName() {
return execution.getTrigger().getId();
}
});
}

}
Loading

0 comments on commit 947d481

Please sign in to comment.