Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: OTel Instrumentation feature improvements and refactoring #35989

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2049,6 +2049,11 @@
<artifactId>quarkus-scheduler-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-spi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.opentelemetry.deployment.scheduler;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.opentelemetry.deployment.OpenTelemetryEnabled;
import io.quarkus.opentelemetry.runtime.scheduler.OpenTelemetryJobInstrumenter;

public class OpenTelemetrySchedulerProcessor {

@BuildStep(onlyIf = OpenTelemetryEnabled.class)
void registerJobInstrumenter(Capabilities capabilities, BuildProducer<AdditionalBeanBuildItem> beans) {
if (capabilities.isPresent(Capability.SCHEDULER)) {
beans.produce(new AdditionalBeanBuildItem(OpenTelemetryJobInstrumenter.class));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,4 @@ void resteasyReactiveIntegration(
preExceptionMapperHandlerBuildItemBuildProducer
.produce(new PreExceptionMapperHandlerBuildItem(new AttachExceptionHandler()));
}

}
5 changes: 5 additions & 0 deletions extensions/opentelemetry/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-spi</artifactId>
<optional>true</optional>
</dependency>

<!-- OpenTelemetry Dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.opentelemetry.runtime.scheduler;

import java.util.concurrent.CompletionStage;

import jakarta.inject.Singleton;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.quarkus.scheduler.spi.JobInstrumenter;

@Singleton
public class OpenTelemetryJobInstrumenter implements JobInstrumenter {

private final Instrumenter<JobInstrumentationContext, Void> instrumenter;

public OpenTelemetryJobInstrumenter(OpenTelemetry openTelemetry) {
InstrumenterBuilder<JobInstrumentationContext, Void> instrumenterBuilder = Instrumenter.builder(
openTelemetry, "io.quarkus.opentelemetry",
new SpanNameExtractor<JobInstrumentationContext>() {
@Override
public String extract(JobInstrumentationContext context) {
return context.getSpanName();
}
});
instrumenterBuilder.setErrorCauseExtractor(new ErrorCauseExtractor() {
@Override
public Throwable extract(Throwable throwable) {
return throwable;
}
});
this.instrumenter = instrumenterBuilder.buildInstrumenter();
}

@Override
public CompletionStage<Void> instrument(JobInstrumentationContext instrumentationContext) {
Context parentCtx = Context.current();
Context context = instrumenter.start(parentCtx, instrumentationContext);
try (Scope scope = context.makeCurrent()) {
return instrumentationContext
.executeJob()
.whenComplete(
(result, throwable) -> instrumenter.end(context, instrumentationContext, null, throwable));
}
}

}
11 changes: 11 additions & 0 deletions extensions/quartz/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
-->
</dependency>

<!-- OpenTelemetry tracing -->
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
mkouba marked this conversation as resolved.
Show resolved Hide resolved
<artifactId>opentelemetry-instrumentation-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.quartz.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;

import io.quarkus.scheduler.spi.JobInstrumenter;
import io.quarkus.scheduler.spi.JobInstrumenter.JobInstrumentationContext;

/**
*
* @see JobInstrumenter
*/
class InstrumentedJob implements Job {

private final Job delegate;
private final JobInstrumenter instrumenter;

InstrumentedJob(Job delegate, JobInstrumenter instrumenter) {
this.delegate = delegate;
this.instrumenter = instrumenter;
}

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
instrumenter.instrument(new JobInstrumentationContext() {

@Override
public CompletionStage<Void> executeJob() {
try {
delegate.execute(context);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}

}

@Override
public String getSpanName() {
JobKey key = context.getJobDetail().getKey();
return key.getGroup() + '.' + key.getName();
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@
import io.quarkus.scheduler.common.runtime.SchedulerContext;
import io.quarkus.scheduler.common.runtime.SyntheticScheduled;
import io.quarkus.scheduler.common.runtime.util.SchedulerUtils;
import io.quarkus.scheduler.runtime.SchedulerConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode;
import io.quarkus.scheduler.runtime.SimpleScheduler;
import io.quarkus.scheduler.spi.JobInstrumenter;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
Expand Down Expand Up @@ -123,6 +125,8 @@ public class QuartzSchedulerImpl implements QuartzScheduler {
private final Event<ScheduledJobPaused> scheduledJobPausedEvent;
private final Event<ScheduledJobResumed> scheduledJobResumedEvent;
private final QuartzRuntimeConfig runtimeConfig;
private final SchedulerConfig schedulerConfig;
private final Instance<JobInstrumenter> jobInstrumenter;

public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport,
SchedulerRuntimeConfig schedulerRuntimeConfig,
Expand All @@ -131,7 +135,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
Event<SchedulerResumed> schedulerResumedEvent, Event<ScheduledJobPaused> scheduledJobPausedEvent,
Event<ScheduledJobResumed> scheduledJobResumedEvent,
Instance<Job> jobs, Instance<UserTransaction> userTransaction,
Vertx vertx) {
Vertx vertx,
SchedulerConfig schedulerConfig, Instance<JobInstrumenter> jobInstrumenter) {
this.shutdownWaitTime = quartzSupport.getRuntimeConfig().shutdownWaitTime;
this.skippedExecutionEvent = skippedExecutionEvent;
this.successExecutionEvent = successExecutionEvent;
Expand All @@ -143,6 +148,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
this.runtimeConfig = quartzSupport.getRuntimeConfig();
this.enabled = schedulerRuntimeConfig.enabled;
this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod;
this.schedulerConfig = schedulerConfig;
this.jobInstrumenter = jobInstrumenter;

StartMode startMode = initStartMode(schedulerRuntimeConfig, runtimeConfig);

Expand Down Expand Up @@ -176,6 +183,11 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
CronDefinition def = CronDefinitionBuilder.instanceDefinitionFor(cronType);
cronParser = new CronParser(def);

JobInstrumenter instrumenter = null;
if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) {
instrumenter = jobInstrumenter.get();
}

if (!enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
this.scheduler = null;
Expand All @@ -196,7 +208,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
scheduler = schedulerFactory.getScheduler();

// Set custom job factory
scheduler.setJobFactory(new InvokerJobFactory(scheduledTasks, jobs, vertx));
scheduler.setJobFactory(
new InvokerJobFactory(scheduledTasks, jobs, vertx, instrumenter));

if (transaction != null) {
transaction.begin();
Expand All @@ -209,11 +222,12 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
if (identity.isEmpty()) {
identity = ++nameSequence + "_" + method.getInvokerClassName();
}

ScheduledInvoker invoker = SimpleScheduler.initInvoker(
context.createInvoker(method.getInvokerClassName()),
skippedExecutionEvent, successExecutionEvent, failedExecutionEvent,
scheduled.concurrentExecution(),
SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()));
SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()), instrumenter);

JobDetail jobDetail = createJobDetail(identity, method.getInvokerClassName());
Optional<TriggerBuilder<?>> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig,
Expand Down Expand Up @@ -791,6 +805,7 @@ public boolean isRunningOnVirtualThread() {
};
} else {
invoker = new DefaultInvoker() {

@Override
public CompletionStage<Void> invokeBean(ScheduledExecution execution) {
try {
Expand All @@ -807,15 +822,20 @@ public boolean isBlocking() {

};
}

Scheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed,
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone);

JobDetail jobDetail = createJobDetail(identity, QuartzSchedulerImpl.class.getName());
Optional<TriggerBuilder<?>> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig, jobDetail);

if (triggerBuilder.isPresent()) {
JobInstrumenter instrumenter = null;
if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) {
instrumenter = jobInstrumenter.get();
}
invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent,
failedExecutionEvent, concurrentExecution, skipPredicate);
failedExecutionEvent, concurrentExecution, skipPredicate, instrumenter);
org.quartz.Trigger trigger = triggerBuilder.get().build();
QuartzTrigger existing = scheduledTasks.putIfAbsent(identity, new QuartzTrigger(trigger.getKey(),
new Function<>() {
Expand Down Expand Up @@ -895,6 +915,7 @@ public void run() {
});
} else {
context.executeBlocking(new Callable<Object>() {

@Override
public Object call() throws Exception {
return trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext));
Expand All @@ -918,9 +939,9 @@ public void handle(Void event) {
}
} else {
String jobName = jobExecutionContext.getJobDetail().getKey().getName();
LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. " +
"Update your Quartz table by removing all phantom jobs or make sure that there is a " +
"Scheduled method with the identity matching the job's name", jobName);
LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. "
+ "Update your Quartz table by removing all phantom jobs or make sure that there is a "
+ "Scheduled method with the identity matching the job's name", jobName);
}
}
}
Expand Down Expand Up @@ -1018,11 +1039,15 @@ static class InvokerJobFactory extends SimpleJobFactory {
final Map<String, QuartzTrigger> scheduledTasks;
final Instance<Job> jobs;
final Vertx vertx;
final JobInstrumenter instrumenter;

InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs, Vertx vertx) {
InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs, Vertx vertx,
JobInstrumenter instrumenter) {
this.scheduledTasks = scheduledTasks;
this.jobs = jobs;
this.vertx = vertx;
this.instrumenter = instrumenter;

}

@SuppressWarnings("unchecked")
Expand All @@ -1039,9 +1064,16 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr
}
Instance<?> instance = jobs.select(jobClass);
if (instance.isResolvable()) {
return (Job) instance.get();
return jobWithSpanWrapper((Job) instance.get());
}
return jobWithSpanWrapper(super.newJob(bundle, Scheduler));
}

private Job jobWithSpanWrapper(Job job) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename the method to not use concept names from OTel?

if (instrumenter != null) {
return new InstrumentedJob(job, instrumenter);
}
return super.newJob(bundle, Scheduler);
return job;
}

}
Expand Down
8 changes: 8 additions & 0 deletions extensions/scheduler/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-api</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler-spi</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.scheduler.common.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.spi.JobInstrumenter;
import io.quarkus.scheduler.spi.JobInstrumenter.JobInstrumentationContext;

/**
*
* @see JobInstrumenter
*/
public class InstrumentedInvoker extends DelegateInvoker {

private final JobInstrumenter instrumenter;

public InstrumentedInvoker(ScheduledInvoker delegate, JobInstrumenter instrumenter) {
super(delegate);
this.instrumenter = instrumenter;
}

@Override
public CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception {
return instrumenter.instrument(new JobInstrumentationContext() {

@Override
public CompletionStage<Void> executeJob() {
try {
return delegate.invoke(execution);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

@Override
public String getSpanName() {
return execution.getTrigger().getId();
}
});
}

}
Loading
Loading