From b1d33ce40e7cee457b9cb3f60f3cad786459c82d Mon Sep 17 00:00:00 2001 From: Marek Skacelik Date: Fri, 15 Sep 2023 11:14:42 +0200 Subject: [PATCH] Added OTel Instrumentation feature for both SimpleScheduler and Quartz Scheduler --- .../InstrumentationProcessor.java | 3 +- extensions/opentelemetry/runtime/pom.xml | 11 +++- .../config/build/TracesBuildConfig.java | 1 + .../config/runtime/OTelRuntimeConfig.java | 8 +++ extensions/quartz/runtime/pom.xml | 11 ++++ .../io/quarkus/quartz/QuartzScheduler.java | 1 - .../quartz/runtime/QuartzSchedulerImpl.java | 56 ++++++++++++++++--- .../tracing/QuartzCodeAttributesGetter.java | 18 ++++++ .../tracing/QuartzErrorCauseExtractor.java | 16 ++++++ .../tracing/QuartzSpanNameExtractor.java | 14 +++++ .../quartz/runtime/tracing/WithSpanJob.java | 53 ++++++++++++++++++ extensions/scheduler/common/pom.xml | 4 ++ .../common/runtime/WithSpanInvoker.java | 50 +++++++++++++++++ .../scheduler/runtime/SchedulerConfig.java | 4 +- .../scheduler/runtime/SchedulerRecorder.java | 1 + .../scheduler/runtime/SimpleScheduler.java | 24 +++++++- 16 files changed, 257 insertions(+), 18 deletions(-) create mode 100644 extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzCodeAttributesGetter.java create mode 100644 extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzErrorCauseExtractor.java create mode 100644 extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzSpanNameExtractor.java create mode 100644 extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/WithSpanJob.java create mode 100644 extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/WithSpanInvoker.java diff --git a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java index 1bac28f2b0fe32..853a6682a92617 100644 --- a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java +++ b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java @@ -109,7 +109,7 @@ VertxOptionsConsumerBuildItem vertxTracingOptions( return new VertxOptionsConsumerBuildItem(vertxTracingOptions, LIBRARY_AFTER); } - // RESTEasy and Vert.x web + // REaSTEasy and Vert.x web @BuildStep void registerResteasyClassicAndOrResteasyReactiveProvider( Capabilities capabilities, @@ -142,5 +142,4 @@ void resteasyReactiveIntegration( preExceptionMapperHandlerBuildItemBuildProducer .produce(new PreExceptionMapperHandlerBuildItem(new AttachExceptionHandler())); } - } diff --git a/extensions/opentelemetry/runtime/pom.xml b/extensions/opentelemetry/runtime/pom.xml index 7befd1be1b261e..700b06eb1d29fc 100644 --- a/extensions/opentelemetry/runtime/pom.xml +++ b/extensions/opentelemetry/runtime/pom.xml @@ -61,7 +61,16 @@ quarkus-smallrye-reactive-messaging true - + + io.quarkus + quarkus-quartz + true + + + + + + io.opentelemetry diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/build/TracesBuildConfig.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/build/TracesBuildConfig.java index 2bbe78e10fbf13..54c6908fc9b64a 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/build/TracesBuildConfig.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/build/TracesBuildConfig.java @@ -12,6 +12,7 @@ * Tracing build time configuration */ @ConfigGroup +//@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED) public interface TracesBuildConfig { /** diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/OTelRuntimeConfig.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/OTelRuntimeConfig.java index b7eb750b2e36fc..2a40c85ac11d18 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/OTelRuntimeConfig.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/OTelRuntimeConfig.java @@ -63,4 +63,12 @@ public interface OTelRuntimeConfig { */ @WithName("experimental.resource.disabled-keys") Optional> experimentalResourceDisabledKeys(); + + /** + * Indicates whether experimental attributes are disabled. + * If enabled, experimental data will be excluded from spans. + */ + @WithName("experimental.attributes.disabled") + @WithDefault("false") + boolean attributesExperimentalDisabled(); } diff --git a/extensions/quartz/runtime/pom.xml b/extensions/quartz/runtime/pom.xml index 0540be355fbaeb..1b7818b00dfa17 100644 --- a/extensions/quartz/runtime/pom.xml +++ b/extensions/quartz/runtime/pom.xml @@ -55,6 +55,17 @@ --> + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + true + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api-semconv + true + diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java index 395a6de8369a4a..1c36f338ac2e7c 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java @@ -12,5 +12,4 @@ public interface QuartzScheduler extends Scheduler { * @return the underlying {@link org.quartz.Scheduler} instance, or {@code null} if the scheduler was not started */ org.quartz.Scheduler getScheduler(); - } diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index ce07d6ee5914c4..1bd7ca86347a2d 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -31,11 +31,13 @@ import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Produces; import jakarta.enterprise.inject.Typed; +import jakarta.enterprise.inject.spi.CDI; import jakarta.inject.Singleton; import jakarta.interceptor.Interceptor; import jakarta.transaction.SystemException; import jakarta.transaction.UserTransaction; +import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.logging.Logger; import org.quartz.CronScheduleBuilder; import org.quartz.Job; @@ -65,6 +67,7 @@ import io.quarkus.arc.Subclass; import io.quarkus.quartz.QuartzScheduler; +import io.quarkus.quartz.runtime.tracing.WithSpanJob; import io.quarkus.runtime.StartupEvent; import io.quarkus.scheduler.FailedExecution; import io.quarkus.scheduler.Scheduled; @@ -84,7 +87,9 @@ import io.quarkus.scheduler.common.runtime.ScheduledMethod; import io.quarkus.scheduler.common.runtime.SchedulerContext; import io.quarkus.scheduler.common.runtime.SyntheticScheduled; +import io.quarkus.scheduler.common.runtime.WithSpanInvoker; import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; +import io.quarkus.scheduler.runtime.SchedulerConfig; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; import io.quarkus.scheduler.runtime.SimpleScheduler; @@ -123,6 +128,7 @@ public class QuartzSchedulerImpl implements QuartzScheduler { private final Event scheduledJobPausedEvent; private final Event scheduledJobResumedEvent; private final QuartzRuntimeConfig runtimeConfig; + private final SchedulerConfig schedulerConfig; public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport, SchedulerRuntimeConfig schedulerRuntimeConfig, @@ -131,7 +137,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport Event schedulerResumedEvent, Event scheduledJobPausedEvent, Event scheduledJobResumedEvent, Instance jobs, Instance userTransaction, - Vertx vertx) { + Vertx vertx, + SchedulerConfig schedulerConfig) { this.shutdownWaitTime = quartzSupport.getRuntimeConfig().shutdownWaitTime; this.skippedExecutionEvent = skippedExecutionEvent; this.successExecutionEvent = successExecutionEvent; @@ -143,6 +150,7 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport this.runtimeConfig = quartzSupport.getRuntimeConfig(); this.enabled = schedulerRuntimeConfig.enabled; this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod; + this.schedulerConfig = schedulerConfig; StartMode startMode = initStartMode(schedulerRuntimeConfig, runtimeConfig); @@ -196,7 +204,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport scheduler = schedulerFactory.getScheduler(); // Set custom job factory - scheduler.setJobFactory(new InvokerJobFactory(scheduledTasks, jobs, vertx)); + scheduler.setJobFactory( + new InvokerJobFactory(scheduledTasks, jobs, vertx, schedulerConfig)); if (transaction != null) { transaction.begin(); @@ -757,6 +766,14 @@ private Optional> createTrigger(String identity, Scheduled sch return Optional.of(triggerBuilder); } + private static boolean isOTelTracingEnabled() { + return ConfigProvider.getConfig().getValue("quarkus.otel.enabled", Boolean.class); + } + + private static boolean isOTelExtensionAvailable() { + return CDI.current().select(io.opentelemetry.api.OpenTelemetry.class).isResolvable(); + } + class QuartzJobDefinition extends AbstractJobDefinition { QuartzJobDefinition(String id) { @@ -791,6 +808,7 @@ public boolean isRunningOnVirtualThread() { }; } else { invoker = new DefaultInvoker() { + @Override public CompletionStage invokeBean(ScheduledExecution execution) { try { @@ -807,6 +825,7 @@ public boolean isBlocking() { }; } + Scheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed, overdueGracePeriod, concurrentExecution, skipPredicate, timeZone); @@ -816,6 +835,9 @@ public boolean isBlocking() { if (triggerBuilder.isPresent()) { invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, concurrentExecution, skipPredicate); + if (schedulerConfig.tracingEnabled && isOTelExtensionAvailable() && isOTelTracingEnabled()) { + invoker = new WithSpanInvoker(invoker); + } org.quartz.Trigger trigger = triggerBuilder.get().build(); QuartzTrigger existing = scheduledTasks.putIfAbsent(identity, new QuartzTrigger(trigger.getKey(), new Function<>() { @@ -895,6 +917,7 @@ public void run() { }); } else { context.executeBlocking(new Callable() { + @Override public Object call() throws Exception { return trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext)); @@ -917,10 +940,11 @@ public void handle(Void event) { }); } } else { - String jobName = jobExecutionContext.getJobDetail().getKey().getName(); - LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. " + - "Update your Quartz table by removing all phantom jobs or make sure that there is a " + - "Scheduled method with the identity matching the job's name", jobName); + String jobName = jobExecutionContext.getJobDetail().getKey() + .getName(); + LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. " + + "Update your Quartz table by removing all phantom jobs or make sure that there is a " + + "Scheduled method with the identity matching the job's name", jobName); } } } @@ -1018,11 +1042,15 @@ static class InvokerJobFactory extends SimpleJobFactory { final Map scheduledTasks; final Instance jobs; final Vertx vertx; + final SchedulerConfig schedulerConfig; - InvokerJobFactory(Map scheduledTasks, Instance jobs, Vertx vertx) { + InvokerJobFactory(Map scheduledTasks, Instance jobs, Vertx vertx, + SchedulerConfig schedulerConfig) { this.scheduledTasks = scheduledTasks; this.jobs = jobs; this.vertx = vertx; + this.schedulerConfig = schedulerConfig; + } @SuppressWarnings("unchecked") @@ -1039,10 +1067,20 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr } Instance instance = jobs.select(jobClass); if (instance.isResolvable()) { - return (Job) instance.get(); + return jobWithSpanWrapper((Job) instance.get()); + } + return jobWithSpanWrapper(super.newJob(bundle, Scheduler)); + } + + private Job jobWithSpanWrapper(Job job) { + if (schedulerConfig.tracingEnabled && isOTelExtensionAvailable() && isOTelTracingEnabled()) { + return new WithSpanJob(job, isCapturingExperimentalSpanAttributesDisabled()); } - return super.newJob(bundle, Scheduler); + return job; } + private boolean isCapturingExperimentalSpanAttributesDisabled() { + return ConfigProvider.getConfig().getValue("quarkus.otel.experimental.attributes.disabled", Boolean.class); + } } } diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzCodeAttributesGetter.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzCodeAttributesGetter.java new file mode 100644 index 00000000000000..753545f8ae83d7 --- /dev/null +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzCodeAttributesGetter.java @@ -0,0 +1,18 @@ +package io.quarkus.quartz.runtime.tracing; + +import org.quartz.JobExecutionContext; + +import io.opentelemetry.instrumentation.api.instrumenter.code.CodeAttributesGetter; + +public final class QuartzCodeAttributesGetter implements CodeAttributesGetter { + + @Override + public Class getCodeClass(JobExecutionContext jobExecutionContext) { + return jobExecutionContext.getJobDetail().getJobClass(); + } + + @Override + public String getMethodName(JobExecutionContext jobExecutionContext) { + return "execute"; + } +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzErrorCauseExtractor.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzErrorCauseExtractor.java new file mode 100644 index 00000000000000..1a32ad14513a1b --- /dev/null +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzErrorCauseExtractor.java @@ -0,0 +1,16 @@ +package io.quarkus.quartz.runtime.tracing; + +import org.quartz.SchedulerException; + +import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; + +public final class QuartzErrorCauseExtractor implements ErrorCauseExtractor { + @Override + public Throwable extract(Throwable error) { + Throwable userError = error; + while (userError instanceof SchedulerException) { + userError = ((SchedulerException) userError).getUnderlyingException(); + } + return userError; + } +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzSpanNameExtractor.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzSpanNameExtractor.java new file mode 100644 index 00000000000000..1c3f89803f9879 --- /dev/null +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzSpanNameExtractor.java @@ -0,0 +1,14 @@ +package io.quarkus.quartz.runtime.tracing; + +import org.quartz.JobExecutionContext; +import org.quartz.JobKey; + +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; + +public final class QuartzSpanNameExtractor implements SpanNameExtractor { + @Override + public String extract(JobExecutionContext job) { + JobKey key = job.getJobDetail().getKey(); + return key.getGroup() + '.' + key.getName(); + } +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/WithSpanJob.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/WithSpanJob.java new file mode 100644 index 00000000000000..6d887671c8dca8 --- /dev/null +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/WithSpanJob.java @@ -0,0 +1,53 @@ +package io.quarkus.quartz.runtime.tracing; + +import jakarta.enterprise.inject.spi.CDI; + +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.code.CodeAttributesExtractor; + +public class WithSpanJob implements Job { + private final Job delegate; + + private final Instrumenter instrumenter; + + public WithSpanJob(Job delegate, boolean captureExperimentalSpanAttributes) { + this.delegate = delegate; + InstrumenterBuilder jobInstrumenterBuilder = Instrumenter.builder( + CDI.current().select(OpenTelemetry.class).get(), "io.quarkus.opentelemetry", + new QuartzSpanNameExtractor()); + + if (!captureExperimentalSpanAttributes) { + jobInstrumenterBuilder.addAttributesExtractor( + AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "quartz")); + } + jobInstrumenterBuilder.setErrorCauseExtractor(new QuartzErrorCauseExtractor()); + jobInstrumenterBuilder.addAttributesExtractor( + CodeAttributesExtractor.create(new QuartzCodeAttributesGetter())); + this.instrumenter = jobInstrumenterBuilder.buildInstrumenter(); + } + + @Override + public void execute(JobExecutionContext job) throws JobExecutionException { + Context parentCtx = Context.current(); + Context context = instrumenter.start(parentCtx, job); + Throwable t = null; + try (Scope scope = context.makeCurrent()) { + delegate.execute(job); + } catch (Throwable throwable) { + t = throwable; + throw throwable; + } finally { + instrumenter.end(context, job, null, t); + } + } +} diff --git a/extensions/scheduler/common/pom.xml b/extensions/scheduler/common/pom.xml index 7da88cfc94fe6c..28f0926ecd0dcf 100644 --- a/extensions/scheduler/common/pom.xml +++ b/extensions/scheduler/common/pom.xml @@ -17,6 +17,10 @@ io.quarkus quarkus-scheduler-api + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + com.cronutils cron-utils diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/WithSpanInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/WithSpanInvoker.java new file mode 100644 index 00000000000000..c03a436042e650 --- /dev/null +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/WithSpanInvoker.java @@ -0,0 +1,50 @@ +package io.quarkus.scheduler.common.runtime; + +import java.util.concurrent.CompletionStage; + +import jakarta.enterprise.inject.spi.CDI; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.quarkus.scheduler.ScheduledExecution; + +public class WithSpanInvoker extends DelegateInvoker { + private final Instrumenter instrumenter; + + public WithSpanInvoker(ScheduledInvoker delegate) { + super(delegate); + InstrumenterBuilder instrumenterBuilder = Instrumenter.builder( + CDI.current().select(OpenTelemetry.class).get(), "io.quarkus.opentelemetry", + new SpanNameExtractor() { + @Override + public String extract(ScheduledExecution scheduledExecution) { + return scheduledExecution.getTrigger().getId(); + } + }); + instrumenterBuilder.setErrorCauseExtractor(new ErrorCauseExtractor() { + @Override + public Throwable extract(Throwable throwable) { + return throwable; + } + }); + + this.instrumenter = instrumenterBuilder.buildInstrumenter(); + } + + @Override + public CompletionStage invoke(ScheduledExecution execution) throws Exception { + Context parentCtx = Context.current(); + Context context = instrumenter.start(parentCtx, execution); + try (Scope scope = context.makeCurrent()) { + return delegate + .invoke(execution) + .whenComplete( + (result, throwable) -> instrumenter.end(context, execution, null, throwable)); + } + } +} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java index 9e6274a255de51..23ce44a235145a 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java @@ -25,9 +25,9 @@ public class SchedulerConfig { public boolean metricsEnabled; /** - * Tracing will be enabled if the OpenTelemetry extension is present and this value is true. + * Controls whether tracing is enabled. If set to true and the OpenTelemetry extension is present, + * tracing will be enabled, creating automatic Spans for each scheduled task. */ @ConfigItem(name = "tracing.enabled") public boolean tracingEnabled; - } diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java index 37db2be1f486b3..b4d70db24e5e79 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java @@ -33,6 +33,7 @@ public CronType getCronType() { public List getScheduledMethods() { return metadata; } + }; } }; diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index d69c97d0d3ba1f..ffda522e973b96 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -25,9 +25,11 @@ import jakarta.enterprise.event.Observes; import jakarta.enterprise.inject.Any; import jakarta.enterprise.inject.Typed; +import jakarta.enterprise.inject.spi.CDI; import jakarta.inject.Singleton; import jakarta.interceptor.Interceptor; +import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.logging.Logger; import org.jboss.threads.JBossScheduledThreadPoolExecutor; @@ -62,6 +64,7 @@ import io.quarkus.scheduler.common.runtime.SkipPredicateInvoker; import io.quarkus.scheduler.common.runtime.StatusEmitterInvoker; import io.quarkus.scheduler.common.runtime.SyntheticScheduled; +import io.quarkus.scheduler.common.runtime.WithSpanInvoker; import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; @@ -94,12 +97,13 @@ public class SimpleScheduler implements Scheduler { private final Event schedulerResumedEvent; private final Event scheduledJobPausedEvent; private final Event scheduledJobResumedEvent; + private final SchedulerConfig schedulerConfig; public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedulerRuntimeConfig, Event skippedExecutionEvent, Event successExecutionEvent, Event failedExecutionEvent, Event schedulerPausedEvent, Event schedulerResumedEvent, Event scheduledJobPausedEvent, - Event scheduledJobResumedEvent, Vertx vertx) { + Event scheduledJobResumedEvent, Vertx vertx, SchedulerConfig schedulerConfig) { this.running = true; this.enabled = schedulerRuntimeConfig.enabled; this.scheduledTasks = new ConcurrentHashMap<>(); @@ -111,6 +115,7 @@ public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedule this.schedulerResumedEvent = schedulerResumedEvent; this.scheduledJobPausedEvent = scheduledJobPausedEvent; this.scheduledJobResumedEvent = scheduledJobResumedEvent; + this.schedulerConfig = schedulerConfig; CronDefinition definition = CronDefinitionBuilder.instanceDefinitionFor(context.getCronType()); this.cronParser = new CronParser(definition); @@ -168,7 +173,7 @@ public JobDefinition newJob(String identity) { if (scheduledTasks.containsKey(identity)) { throw new IllegalStateException("A job with this identity is already scheduled: " + identity); } - return new SimpleJobDefinition(identity); + return new SimpleJobDefinition(identity, schedulerConfig); } @Override @@ -629,8 +634,11 @@ public Instant getScheduledFireTime() { class SimpleJobDefinition extends AbstractJobDefinition { - SimpleJobDefinition(String id) { + private final SchedulerConfig schedulerConfig; + + SimpleJobDefinition(String id, SchedulerConfig schedulerConfig) { super(id); + this.schedulerConfig = schedulerConfig; } @Override @@ -685,6 +693,9 @@ public boolean isBlocking() { SimpleTrigger simpleTrigger = trigger.get(); invoker = initInvoker(invoker, skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, concurrentExecution, skipPredicate); + if (schedulerConfig.tracingEnabled && isOTelExtensionAvailable() && isOTelTracingEnabled()) { + invoker = new WithSpanInvoker(invoker); + } ScheduledTask scheduledTask = new ScheduledTask(trigger.get(), invoker, true); ScheduledTask existing = scheduledTasks.putIfAbsent(simpleTrigger.id, scheduledTask); if (existing != null) { @@ -695,6 +706,13 @@ public boolean isBlocking() { return null; } + private boolean isOTelTracingEnabled() { + return ConfigProvider.getConfig().getValue("quarkus.otel.enabled", Boolean.class); + } + + private boolean isOTelExtensionAvailable() { + return CDI.current().select(io.opentelemetry.api.OpenTelemetry.class).isResolvable(); + } } }