diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 83a4d773d47546..dace74060fee34 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -2038,6 +2038,11 @@ quarkus-scheduler-api ${project.version} + + io.quarkus + quarkus-scheduler-spi + ${project.version} + io.quarkus quarkus-scheduler-common diff --git a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/scheduler/OpenTelemetrySchedulerProcessor.java b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/scheduler/OpenTelemetrySchedulerProcessor.java new file mode 100644 index 00000000000000..ecb8dbdd113f32 --- /dev/null +++ b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/scheduler/OpenTelemetrySchedulerProcessor.java @@ -0,0 +1,20 @@ +package io.quarkus.opentelemetry.deployment.scheduler; + +import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.deployment.Capabilities; +import io.quarkus.deployment.Capability; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.opentelemetry.deployment.OpenTelemetryEnabled; +import io.quarkus.opentelemetry.runtime.scheduler.OpenTelemetryJobInstrumenter; + +public class OpenTelemetrySchedulerProcessor { + + @BuildStep(onlyIf = OpenTelemetryEnabled.class) + void registerJobInstrumenter(Capabilities capabilities, BuildProducer beans) { + if (capabilities.isPresent(Capability.SCHEDULER)) { + beans.produce(new AdditionalBeanBuildItem(OpenTelemetryJobInstrumenter.class)); + } + } + +} diff --git a/extensions/opentelemetry/runtime/pom.xml b/extensions/opentelemetry/runtime/pom.xml index 7befd1be1b261e..ee9e53a80558c9 100644 --- a/extensions/opentelemetry/runtime/pom.xml +++ b/extensions/opentelemetry/runtime/pom.xml @@ -61,6 +61,11 @@ quarkus-smallrye-reactive-messaging true + + io.quarkus + quarkus-scheduler-spi + true + diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/WithSpanInvoker.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/scheduler/OpenTelemetryJobInstrumenter.java similarity index 50% rename from extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/WithSpanInvoker.java rename to extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/scheduler/OpenTelemetryJobInstrumenter.java index c03a436042e650..3a1a84c111bbb3 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/WithSpanInvoker.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/scheduler/OpenTelemetryJobInstrumenter.java @@ -1,8 +1,8 @@ -package io.quarkus.scheduler.common.runtime; +package io.quarkus.opentelemetry.runtime.scheduler; import java.util.concurrent.CompletionStage; -import jakarta.enterprise.inject.spi.CDI; +import jakarta.inject.Singleton; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.context.Context; @@ -11,19 +11,20 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; -import io.quarkus.scheduler.ScheduledExecution; +import io.quarkus.scheduler.spi.JobInstrumenter; -public class WithSpanInvoker extends DelegateInvoker { - private final Instrumenter instrumenter; +@Singleton +public class OpenTelemetryJobInstrumenter implements JobInstrumenter { - public WithSpanInvoker(ScheduledInvoker delegate) { - super(delegate); - InstrumenterBuilder instrumenterBuilder = Instrumenter.builder( - CDI.current().select(OpenTelemetry.class).get(), "io.quarkus.opentelemetry", - new SpanNameExtractor() { + private final Instrumenter instrumenter; + + public OpenTelemetryJobInstrumenter(OpenTelemetry openTelemetry) { + InstrumenterBuilder instrumenterBuilder = Instrumenter.builder( + openTelemetry, "io.quarkus.opentelemetry", + new SpanNameExtractor() { @Override - public String extract(ScheduledExecution scheduledExecution) { - return scheduledExecution.getTrigger().getId(); + public String extract(JobInstrumentationContext context) { + return context.getSpanName(); } }); instrumenterBuilder.setErrorCauseExtractor(new ErrorCauseExtractor() { @@ -32,19 +33,19 @@ public Throwable extract(Throwable throwable) { return throwable; } }); - this.instrumenter = instrumenterBuilder.buildInstrumenter(); } @Override - public CompletionStage invoke(ScheduledExecution execution) throws Exception { + public CompletionStage instrument(JobInstrumentationContext instrumentationContext) { Context parentCtx = Context.current(); - Context context = instrumenter.start(parentCtx, execution); + Context context = instrumenter.start(parentCtx, instrumentationContext); try (Scope scope = context.makeCurrent()) { - return delegate - .invoke(execution) + return instrumentationContext + .executeJob() .whenComplete( - (result, throwable) -> instrumenter.end(context, execution, null, throwable)); + (result, throwable) -> instrumenter.end(context, instrumentationContext, null, throwable)); } } + } diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/InstrumentedJob.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/InstrumentedJob.java new file mode 100644 index 00000000000000..f7744c282895ad --- /dev/null +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/InstrumentedJob.java @@ -0,0 +1,51 @@ +package io.quarkus.quartz.runtime; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.JobKey; + +import io.quarkus.scheduler.spi.JobInstrumenter; +import io.quarkus.scheduler.spi.JobInstrumenter.JobInstrumentationContext; + +/** + * + * @see JobInstrumenter + */ +class InstrumentedJob implements Job { + + private final Job delegate; + private final JobInstrumenter instrumenter; + + InstrumentedJob(Job delegate, JobInstrumenter instrumenter) { + this.delegate = delegate; + this.instrumenter = instrumenter; + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + instrumenter.instrument(new JobInstrumentationContext() { + + @Override + public CompletionStage executeJob() { + try { + delegate.execute(context); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + + } + + @Override + public String getSpanName() { + JobKey key = context.getJobDetail().getKey(); + return key.getGroup() + '.' + key.getName(); + } + }); + } + +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index aed62062d62013..cedfa97b9cfac9 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -31,13 +31,11 @@ import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Produces; import jakarta.enterprise.inject.Typed; -import jakarta.enterprise.inject.spi.CDI; import jakarta.inject.Singleton; import jakarta.interceptor.Interceptor; import jakarta.transaction.SystemException; import jakarta.transaction.UserTransaction; -import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.logging.Logger; import org.quartz.CronScheduleBuilder; import org.quartz.Job; @@ -67,7 +65,6 @@ import io.quarkus.arc.Subclass; import io.quarkus.quartz.QuartzScheduler; -import io.quarkus.quartz.runtime.tracing.WithSpanJob; import io.quarkus.runtime.StartupEvent; import io.quarkus.scheduler.FailedExecution; import io.quarkus.scheduler.Scheduled; @@ -87,12 +84,12 @@ import io.quarkus.scheduler.common.runtime.ScheduledMethod; import io.quarkus.scheduler.common.runtime.SchedulerContext; import io.quarkus.scheduler.common.runtime.SyntheticScheduled; -import io.quarkus.scheduler.common.runtime.WithSpanInvoker; import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; import io.quarkus.scheduler.runtime.SchedulerConfig; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; import io.quarkus.scheduler.runtime.SimpleScheduler; +import io.quarkus.scheduler.spi.JobInstrumenter; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.common.vertx.VertxContext; @@ -129,6 +126,7 @@ public class QuartzSchedulerImpl implements QuartzScheduler { private final Event scheduledJobResumedEvent; private final QuartzRuntimeConfig runtimeConfig; private final SchedulerConfig schedulerConfig; + private final Instance jobInstrumenter; public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport, SchedulerRuntimeConfig schedulerRuntimeConfig, @@ -138,7 +136,7 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport Event scheduledJobResumedEvent, Instance jobs, Instance userTransaction, Vertx vertx, - SchedulerConfig schedulerConfig) { + SchedulerConfig schedulerConfig, Instance jobInstrumenter) { this.shutdownWaitTime = quartzSupport.getRuntimeConfig().shutdownWaitTime; this.skippedExecutionEvent = skippedExecutionEvent; this.successExecutionEvent = successExecutionEvent; @@ -151,6 +149,7 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport this.enabled = schedulerRuntimeConfig.enabled; this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod; this.schedulerConfig = schedulerConfig; + this.jobInstrumenter = jobInstrumenter; StartMode startMode = initStartMode(schedulerRuntimeConfig, runtimeConfig); @@ -184,6 +183,11 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport CronDefinition def = CronDefinitionBuilder.instanceDefinitionFor(cronType); cronParser = new CronParser(def); + JobInstrumenter instrumenter = null; + if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { + instrumenter = jobInstrumenter.get(); + } + if (!enabled) { LOGGER.info("Quartz scheduler is disabled by config property and will not be started"); this.scheduler = null; @@ -205,7 +209,7 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport // Set custom job factory scheduler.setJobFactory( - new InvokerJobFactory(scheduledTasks, jobs, vertx, schedulerConfig)); + new InvokerJobFactory(scheduledTasks, jobs, vertx, instrumenter)); if (transaction != null) { transaction.begin(); @@ -218,11 +222,12 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport if (identity.isEmpty()) { identity = ++nameSequence + "_" + method.getInvokerClassName(); } + ScheduledInvoker invoker = SimpleScheduler.initInvoker( context.createInvoker(method.getInvokerClassName()), skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, scheduled.concurrentExecution(), - SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf())); + SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()), instrumenter); JobDetail jobDetail = createJobDetail(identity, method.getInvokerClassName()); Optional> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig, @@ -766,14 +771,6 @@ private Optional> createTrigger(String identity, Scheduled sch return Optional.of(triggerBuilder); } - private static boolean isOTelTracingEnabled() { - return ConfigProvider.getConfig().getValue("quarkus.otel.enabled", Boolean.class); - } - - private static boolean isOTelExtensionAvailable() { - return CDI.current().select(io.opentelemetry.api.OpenTelemetry.class).isResolvable(); - } - class QuartzJobDefinition extends AbstractJobDefinition { QuartzJobDefinition(String id) { @@ -833,11 +830,12 @@ public boolean isBlocking() { Optional> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig, jobDetail); if (triggerBuilder.isPresent()) { - invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent, - failedExecutionEvent, concurrentExecution, skipPredicate); - if (schedulerConfig.tracingEnabled && isOTelExtensionAvailable() && isOTelTracingEnabled()) { - invoker = new WithSpanInvoker(invoker); + JobInstrumenter instrumenter = null; + if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { + instrumenter = jobInstrumenter.get(); } + invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent, + failedExecutionEvent, concurrentExecution, skipPredicate, instrumenter); org.quartz.Trigger trigger = triggerBuilder.get().build(); QuartzTrigger existing = scheduledTasks.putIfAbsent(identity, new QuartzTrigger(trigger.getKey(), new Function<>() { @@ -1041,14 +1039,14 @@ static class InvokerJobFactory extends SimpleJobFactory { final Map scheduledTasks; final Instance jobs; final Vertx vertx; - final SchedulerConfig schedulerConfig; + final JobInstrumenter instrumenter; InvokerJobFactory(Map scheduledTasks, Instance jobs, Vertx vertx, - SchedulerConfig schedulerConfig) { + JobInstrumenter instrumenter) { this.scheduledTasks = scheduledTasks; this.jobs = jobs; this.vertx = vertx; - this.schedulerConfig = schedulerConfig; + this.instrumenter = instrumenter; } @@ -1072,14 +1070,11 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr } private Job jobWithSpanWrapper(Job job) { - if (schedulerConfig.tracingEnabled && isOTelExtensionAvailable() && isOTelTracingEnabled()) { - return new WithSpanJob(job, isCapturingExperimentalSpanAttributesDisabled()); + if (instrumenter != null) { + return new InstrumentedJob(job, instrumenter); } return job; } - private boolean isCapturingExperimentalSpanAttributesDisabled() { - return ConfigProvider.getConfig().getValue("quarkus.otel.experimental.attributes.disabled", Boolean.class); - } } } diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzCodeAttributesGetter.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzCodeAttributesGetter.java deleted file mode 100644 index 753545f8ae83d7..00000000000000 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzCodeAttributesGetter.java +++ /dev/null @@ -1,18 +0,0 @@ -package io.quarkus.quartz.runtime.tracing; - -import org.quartz.JobExecutionContext; - -import io.opentelemetry.instrumentation.api.instrumenter.code.CodeAttributesGetter; - -public final class QuartzCodeAttributesGetter implements CodeAttributesGetter { - - @Override - public Class getCodeClass(JobExecutionContext jobExecutionContext) { - return jobExecutionContext.getJobDetail().getJobClass(); - } - - @Override - public String getMethodName(JobExecutionContext jobExecutionContext) { - return "execute"; - } -} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzErrorCauseExtractor.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzErrorCauseExtractor.java deleted file mode 100644 index 1a32ad14513a1b..00000000000000 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzErrorCauseExtractor.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.quarkus.quartz.runtime.tracing; - -import org.quartz.SchedulerException; - -import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; - -public final class QuartzErrorCauseExtractor implements ErrorCauseExtractor { - @Override - public Throwable extract(Throwable error) { - Throwable userError = error; - while (userError instanceof SchedulerException) { - userError = ((SchedulerException) userError).getUnderlyingException(); - } - return userError; - } -} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzSpanNameExtractor.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzSpanNameExtractor.java deleted file mode 100644 index 1c3f89803f9879..00000000000000 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/QuartzSpanNameExtractor.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.quarkus.quartz.runtime.tracing; - -import org.quartz.JobExecutionContext; -import org.quartz.JobKey; - -import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; - -public final class QuartzSpanNameExtractor implements SpanNameExtractor { - @Override - public String extract(JobExecutionContext job) { - JobKey key = job.getJobDetail().getKey(); - return key.getGroup() + '.' + key.getName(); - } -} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/WithSpanJob.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/WithSpanJob.java deleted file mode 100644 index 6d887671c8dca8..00000000000000 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/tracing/WithSpanJob.java +++ /dev/null @@ -1,53 +0,0 @@ -package io.quarkus.quartz.runtime.tracing; - -import jakarta.enterprise.inject.spi.CDI; - -import org.quartz.Job; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; - -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.code.CodeAttributesExtractor; - -public class WithSpanJob implements Job { - private final Job delegate; - - private final Instrumenter instrumenter; - - public WithSpanJob(Job delegate, boolean captureExperimentalSpanAttributes) { - this.delegate = delegate; - InstrumenterBuilder jobInstrumenterBuilder = Instrumenter.builder( - CDI.current().select(OpenTelemetry.class).get(), "io.quarkus.opentelemetry", - new QuartzSpanNameExtractor()); - - if (!captureExperimentalSpanAttributes) { - jobInstrumenterBuilder.addAttributesExtractor( - AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "quartz")); - } - jobInstrumenterBuilder.setErrorCauseExtractor(new QuartzErrorCauseExtractor()); - jobInstrumenterBuilder.addAttributesExtractor( - CodeAttributesExtractor.create(new QuartzCodeAttributesGetter())); - this.instrumenter = jobInstrumenterBuilder.buildInstrumenter(); - } - - @Override - public void execute(JobExecutionContext job) throws JobExecutionException { - Context parentCtx = Context.current(); - Context context = instrumenter.start(parentCtx, job); - Throwable t = null; - try (Scope scope = context.makeCurrent()) { - delegate.execute(job); - } catch (Throwable throwable) { - t = throwable; - throw throwable; - } finally { - instrumenter.end(context, job, null, t); - } - } -} diff --git a/extensions/scheduler/common/pom.xml b/extensions/scheduler/common/pom.xml index 28f0926ecd0dcf..126368f70c885f 100644 --- a/extensions/scheduler/common/pom.xml +++ b/extensions/scheduler/common/pom.xml @@ -17,6 +17,10 @@ io.quarkus quarkus-scheduler-api + + io.quarkus + quarkus-scheduler-spi + io.opentelemetry.instrumentation opentelemetry-instrumentation-api diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java new file mode 100644 index 00000000000000..c0e330001f436a --- /dev/null +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java @@ -0,0 +1,43 @@ +package io.quarkus.scheduler.common.runtime; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import io.quarkus.scheduler.ScheduledExecution; +import io.quarkus.scheduler.spi.JobInstrumenter; +import io.quarkus.scheduler.spi.JobInstrumenter.JobInstrumentationContext; + +/** + * + * @see JobInstrumenter + */ +public class InstrumentedInvoker extends DelegateInvoker { + + private final JobInstrumenter instrumenter; + + public InstrumentedInvoker(ScheduledInvoker delegate, JobInstrumenter instrumenter) { + super(delegate); + this.instrumenter = instrumenter; + } + + @Override + public CompletionStage invoke(ScheduledExecution execution) throws Exception { + return instrumenter.instrument(new JobInstrumentationContext() { + + @Override + public CompletionStage executeJob() { + try { + return delegate.invoke(execution); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } + + @Override + public String getSpanName() { + return execution.getTrigger().getId(); + } + }); + } + +} diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java index 60196c55e4190e..82c9a9ba89d699 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java @@ -367,30 +367,6 @@ public void metrics(SchedulerConfig config, } } - @BuildStep - public void tracing(SchedulerConfig config, - Capabilities capabilities, BuildProducer annotationsTransformer) { - - if (config.tracingEnabled && capabilities.isPresent(Capability.OPENTELEMETRY_TRACER)) { - DotName withSpan = DotName.createSimple("io.opentelemetry.instrumentation.annotations.WithSpan"); - DotName legacyWithSpan = DotName.createSimple("io.opentelemetry.extension.annotations.WithSpan"); - - annotationsTransformer.produce(new AnnotationsTransformerBuildItem(AnnotationsTransformer.builder() - .appliesTo(METHOD) - .whenContainsAny(List.of(SchedulerDotNames.SCHEDULED_NAME, SchedulerDotNames.SCHEDULES_NAME)) - .whenContainsNone(List.of(withSpan, legacyWithSpan)) - .transform(context -> { - MethodInfo scheduledMethod = context.getTarget().asMethod(); - context.transform() - .add(withSpan) - .done(); - LOGGER.debugf("Added OpenTelemetry @WithSpan to a @Scheduled method %s#%s()", - scheduledMethod.declaringClass().name(), - scheduledMethod.name()); - }))); - } - } - private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, ClassOutput classOutput) { BeanInfo bean = scheduledMethod.getBean(); diff --git a/extensions/scheduler/pom.xml b/extensions/scheduler/pom.xml index a763e4e95e6d8b..babdb6acf34c24 100644 --- a/extensions/scheduler/pom.xml +++ b/extensions/scheduler/pom.xml @@ -17,6 +17,7 @@ deployment api + spi common kotlin runtime diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index ffda522e973b96..afc99c574ee8d3 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -24,12 +24,11 @@ import jakarta.enterprise.event.Event; import jakarta.enterprise.event.Observes; import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Typed; -import jakarta.enterprise.inject.spi.CDI; import jakarta.inject.Singleton; import jakarta.interceptor.Interceptor; -import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.logging.Logger; import org.jboss.threads.JBossScheduledThreadPoolExecutor; @@ -57,6 +56,7 @@ import io.quarkus.scheduler.common.runtime.AbstractJobDefinition; import io.quarkus.scheduler.common.runtime.DefaultInvoker; import io.quarkus.scheduler.common.runtime.Events; +import io.quarkus.scheduler.common.runtime.InstrumentedInvoker; import io.quarkus.scheduler.common.runtime.ScheduledInvoker; import io.quarkus.scheduler.common.runtime.ScheduledMethod; import io.quarkus.scheduler.common.runtime.SchedulerContext; @@ -64,9 +64,9 @@ import io.quarkus.scheduler.common.runtime.SkipPredicateInvoker; import io.quarkus.scheduler.common.runtime.StatusEmitterInvoker; import io.quarkus.scheduler.common.runtime.SyntheticScheduled; -import io.quarkus.scheduler.common.runtime.WithSpanInvoker; import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; +import io.quarkus.scheduler.spi.JobInstrumenter; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.common.vertx.VertxContext; @@ -98,12 +98,14 @@ public class SimpleScheduler implements Scheduler { private final Event scheduledJobPausedEvent; private final Event scheduledJobResumedEvent; private final SchedulerConfig schedulerConfig; + private final Instance jobInstrumenter; public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedulerRuntimeConfig, Event skippedExecutionEvent, Event successExecutionEvent, Event failedExecutionEvent, Event schedulerPausedEvent, Event schedulerResumedEvent, Event scheduledJobPausedEvent, - Event scheduledJobResumedEvent, Vertx vertx, SchedulerConfig schedulerConfig) { + Event scheduledJobResumedEvent, Vertx vertx, SchedulerConfig schedulerConfig, + Instance jobInstrumenter) { this.running = true; this.enabled = schedulerRuntimeConfig.enabled; this.scheduledTasks = new ConcurrentHashMap<>(); @@ -116,6 +118,7 @@ public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedule this.scheduledJobPausedEvent = scheduledJobPausedEvent; this.scheduledJobResumedEvent = scheduledJobResumedEvent; this.schedulerConfig = schedulerConfig; + this.jobInstrumenter = jobInstrumenter; CronDefinition definition = CronDefinitionBuilder.instanceDefinitionFor(context.getCronType()); this.cronParser = new CronParser(definition); @@ -158,9 +161,13 @@ public void run() { Optional trigger = createTrigger(id, method.getMethodDescription(), cronParser, scheduled, defaultOverdueGracePeriod); if (trigger.isPresent()) { + JobInstrumenter instrumenter = null; + if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { + instrumenter = jobInstrumenter.get(); + } ScheduledInvoker invoker = initInvoker(context.createInvoker(method.getInvokerClassName()), skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, - scheduled.concurrentExecution(), initSkipPredicate(scheduled.skipExecutionIf())); + scheduled.concurrentExecution(), initSkipPredicate(scheduled.skipExecutionIf()), instrumenter); scheduledTasks.put(trigger.get().id, new ScheduledTask(trigger.get(), invoker, false)); } } @@ -357,7 +364,7 @@ Optional createTrigger(String id, String methodDescription, CronP public static ScheduledInvoker initInvoker(ScheduledInvoker invoker, Event skippedExecutionEvent, Event successExecutionEvent, Event failedExecutionEvent, ConcurrentExecution concurrentExecution, - Scheduled.SkipPredicate skipPredicate) { + Scheduled.SkipPredicate skipPredicate, JobInstrumenter instrumenter) { invoker = new StatusEmitterInvoker(invoker, successExecutionEvent, failedExecutionEvent); if (concurrentExecution == ConcurrentExecution.SKIP) { invoker = new SkipConcurrentExecutionInvoker(invoker, skippedExecutionEvent); @@ -365,6 +372,9 @@ public static ScheduledInvoker initInvoker(ScheduledInvoker invoker, Event + + + io.quarkus + quarkus-scheduler-parent + 999-SNAPSHOT + + 4.0.0 + + quarkus-scheduler-spi + Quarkus - Scheduler - SPI + + + + io.quarkus + quarkus-scheduler-api + + + diff --git a/extensions/scheduler/spi/src/main/java/io/quarkus/scheduler/spi/JobInstrumenter.java b/extensions/scheduler/spi/src/main/java/io/quarkus/scheduler/spi/JobInstrumenter.java new file mode 100644 index 00000000000000..d6df78b5691c8f --- /dev/null +++ b/extensions/scheduler/spi/src/main/java/io/quarkus/scheduler/spi/JobInstrumenter.java @@ -0,0 +1,23 @@ +package io.quarkus.scheduler.spi; + +import java.util.concurrent.CompletionStage; + +/** + * Instruments a scheduled job. + *

+ * Telemetry extensions can provide exactly one CDI bean of this type. The scope must be either {@link jakarta.inject.Singleton} + * or {@link jakarta.enterprise.context.ApplicationScoped}. + */ +public interface JobInstrumenter { + + CompletionStage instrument(JobInstrumentationContext context); + + interface JobInstrumentationContext { + + String getSpanName(); + + CompletionStage executeJob(); + + } + +}