From efea7db93994864436b6e3d278285e05bba918ae Mon Sep 17 00:00:00 2001 From: Marek Skacelik Date: Fri, 15 Sep 2023 11:14:42 +0200 Subject: [PATCH] Added OTel Instrumentation feature for both SimpleScheduler and Quartz Scheduler Scheduler: introduce SPI module and JobInstrumenter - implement JobInstrumenter in OTel extension - remove the annotation transformer that adds OTel @WithSpan to @Scheduled methods added tests for scheduler/quartz OTel integration. Co-authored-by: Martin Kouba --- bom/application/pom.xml | 5 + .../OpenTelemetrySchedulerProcessor.java | 20 +++ .../InstrumentationProcessor.java | 1 - extensions/opentelemetry/runtime/pom.xml | 5 + .../OpenTelemetryJobInstrumenter.java | 51 +++++++ extensions/quartz/runtime/pom.xml | 11 ++ .../quartz/runtime/InstrumentedJob.java | 51 +++++++ .../quartz/runtime/QuartzSchedulerImpl.java | 52 +++++-- extensions/scheduler/common/pom.xml | 8 + .../common/runtime/InstrumentedInvoker.java | 43 ++++++ .../deployment/SchedulerProcessor.java | 24 --- extensions/scheduler/pom.xml | 1 + .../scheduler/runtime/SchedulerConfig.java | 4 +- .../scheduler/runtime/SimpleScheduler.java | 34 ++++- extensions/scheduler/spi/pom.xml | 21 +++ .../scheduler/spi/JobInstrumenter.java | 23 +++ .../opentelemetry-quartz/pom.xml | 144 ++++++++++++++++++ .../opentelemetry/quartz/CountResource.java | 40 +++++ .../it/opentelemetry/quartz/Counter.java | 30 ++++ .../quartz/ExporterResource.java | 38 +++++ .../quartz/FailedBasicScheduler.java | 17 +++ .../quartz/FailedJobDefinitionScheduler.java | 29 ++++ .../quartz/FailedManualScheduler.java | 52 +++++++ .../quartz/JobDefinitionCounter.java | 37 +++++ .../quartz/ManualScheduledCounter.java | 59 +++++++ .../src/main/resources/application.properties | 5 + .../quartz/OpenTelemetryQuartzIT.java | 16 ++ .../quartz/OpenTelemetryQuartzTest.java | 104 +++++++++++++ .../opentelemetry-scheduler/pom.xml | 144 ++++++++++++++++++ .../scheduler/CountResource.java | 30 ++++ .../it/opentelemetry/scheduler/Counter.java | 30 ++++ .../scheduler/ExporterResource.java | 38 +++++ .../scheduler/FailedBasicScheduler.java | 17 +++ .../FailedJobDefinitionScheduler.java | 29 ++++ .../scheduler/JobDefinitionCounter.java | 37 +++++ .../src/main/resources/application.properties | 5 + .../scheduler/OpenTelemetrySchedulerIT.java | 16 ++ .../scheduler/OpenTelemetrySchedulerTest.java | 99 ++++++++++++ integration-tests/pom.xml | 2 + 39 files changed, 1329 insertions(+), 43 deletions(-) create mode 100644 extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/scheduler/OpenTelemetrySchedulerProcessor.java create mode 100644 extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/scheduler/OpenTelemetryJobInstrumenter.java create mode 100644 extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/InstrumentedJob.java create mode 100644 extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java create mode 100644 extensions/scheduler/spi/pom.xml create mode 100644 extensions/scheduler/spi/src/main/java/io/quarkus/scheduler/spi/JobInstrumenter.java create mode 100644 integration-tests/opentelemetry-quartz/pom.xml create mode 100644 integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/CountResource.java create mode 100644 integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/Counter.java create mode 100644 integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/ExporterResource.java create mode 100644 integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedBasicScheduler.java create mode 100644 integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedJobDefinitionScheduler.java create mode 100644 integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedManualScheduler.java create mode 100644 integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/JobDefinitionCounter.java create mode 100644 integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/ManualScheduledCounter.java create mode 100644 integration-tests/opentelemetry-quartz/src/main/resources/application.properties create mode 100644 integration-tests/opentelemetry-quartz/src/test/java/io/quarkus/it/opentelemetry/quartz/OpenTelemetryQuartzIT.java create mode 100644 integration-tests/opentelemetry-quartz/src/test/java/io/quarkus/it/opentelemetry/quartz/OpenTelemetryQuartzTest.java create mode 100644 integration-tests/opentelemetry-scheduler/pom.xml create mode 100644 integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/CountResource.java create mode 100644 integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/Counter.java create mode 100644 integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/ExporterResource.java create mode 100644 integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/FailedBasicScheduler.java create mode 100644 integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/FailedJobDefinitionScheduler.java create mode 100644 integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/JobDefinitionCounter.java create mode 100644 integration-tests/opentelemetry-scheduler/src/main/resources/application.properties create mode 100644 integration-tests/opentelemetry-scheduler/src/test/java/io/quarkus/it/opentelemetry/scheduler/OpenTelemetrySchedulerIT.java create mode 100644 integration-tests/opentelemetry-scheduler/src/test/java/io/quarkus/it/opentelemetry/scheduler/OpenTelemetrySchedulerTest.java diff --git a/bom/application/pom.xml b/bom/application/pom.xml index e8016d1353185..6cc3063807b1a 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -2049,6 +2049,11 @@ quarkus-scheduler-api ${project.version} + + io.quarkus + quarkus-scheduler-spi + ${project.version} + io.quarkus quarkus-scheduler-common diff --git a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/scheduler/OpenTelemetrySchedulerProcessor.java b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/scheduler/OpenTelemetrySchedulerProcessor.java new file mode 100644 index 0000000000000..ecb8dbdd113f3 --- /dev/null +++ b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/scheduler/OpenTelemetrySchedulerProcessor.java @@ -0,0 +1,20 @@ +package io.quarkus.opentelemetry.deployment.scheduler; + +import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.deployment.Capabilities; +import io.quarkus.deployment.Capability; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.opentelemetry.deployment.OpenTelemetryEnabled; +import io.quarkus.opentelemetry.runtime.scheduler.OpenTelemetryJobInstrumenter; + +public class OpenTelemetrySchedulerProcessor { + + @BuildStep(onlyIf = OpenTelemetryEnabled.class) + void registerJobInstrumenter(Capabilities capabilities, BuildProducer beans) { + if (capabilities.isPresent(Capability.SCHEDULER)) { + beans.produce(new AdditionalBeanBuildItem(OpenTelemetryJobInstrumenter.class)); + } + } + +} diff --git a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java index 83101515d740c..d8aa5e59bd0cb 100644 --- a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java +++ b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java @@ -146,5 +146,4 @@ void resteasyReactiveIntegration( preExceptionMapperHandlerBuildItemBuildProducer .produce(new PreExceptionMapperHandlerBuildItem(new AttachExceptionHandler())); } - } diff --git a/extensions/opentelemetry/runtime/pom.xml b/extensions/opentelemetry/runtime/pom.xml index dd7d2fda4acde..9d47d52ade00a 100644 --- a/extensions/opentelemetry/runtime/pom.xml +++ b/extensions/opentelemetry/runtime/pom.xml @@ -61,6 +61,11 @@ quarkus-smallrye-reactive-messaging true + + io.quarkus + quarkus-scheduler-spi + true + diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/scheduler/OpenTelemetryJobInstrumenter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/scheduler/OpenTelemetryJobInstrumenter.java new file mode 100644 index 0000000000000..3a1a84c111bbb --- /dev/null +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/scheduler/OpenTelemetryJobInstrumenter.java @@ -0,0 +1,51 @@ +package io.quarkus.opentelemetry.runtime.scheduler; + +import java.util.concurrent.CompletionStage; + +import jakarta.inject.Singleton; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.quarkus.scheduler.spi.JobInstrumenter; + +@Singleton +public class OpenTelemetryJobInstrumenter implements JobInstrumenter { + + private final Instrumenter instrumenter; + + public OpenTelemetryJobInstrumenter(OpenTelemetry openTelemetry) { + InstrumenterBuilder instrumenterBuilder = Instrumenter.builder( + openTelemetry, "io.quarkus.opentelemetry", + new SpanNameExtractor() { + @Override + public String extract(JobInstrumentationContext context) { + return context.getSpanName(); + } + }); + instrumenterBuilder.setErrorCauseExtractor(new ErrorCauseExtractor() { + @Override + public Throwable extract(Throwable throwable) { + return throwable; + } + }); + this.instrumenter = instrumenterBuilder.buildInstrumenter(); + } + + @Override + public CompletionStage instrument(JobInstrumentationContext instrumentationContext) { + Context parentCtx = Context.current(); + Context context = instrumenter.start(parentCtx, instrumentationContext); + try (Scope scope = context.makeCurrent()) { + return instrumentationContext + .executeJob() + .whenComplete( + (result, throwable) -> instrumenter.end(context, instrumentationContext, null, throwable)); + } + } + +} diff --git a/extensions/quartz/runtime/pom.xml b/extensions/quartz/runtime/pom.xml index 0540be355fbae..1b7818b00dfa1 100644 --- a/extensions/quartz/runtime/pom.xml +++ b/extensions/quartz/runtime/pom.xml @@ -55,6 +55,17 @@ --> + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + true + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api-semconv + true + diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/InstrumentedJob.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/InstrumentedJob.java new file mode 100644 index 0000000000000..f7744c282895a --- /dev/null +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/InstrumentedJob.java @@ -0,0 +1,51 @@ +package io.quarkus.quartz.runtime; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.JobKey; + +import io.quarkus.scheduler.spi.JobInstrumenter; +import io.quarkus.scheduler.spi.JobInstrumenter.JobInstrumentationContext; + +/** + * + * @see JobInstrumenter + */ +class InstrumentedJob implements Job { + + private final Job delegate; + private final JobInstrumenter instrumenter; + + InstrumentedJob(Job delegate, JobInstrumenter instrumenter) { + this.delegate = delegate; + this.instrumenter = instrumenter; + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + instrumenter.instrument(new JobInstrumentationContext() { + + @Override + public CompletionStage executeJob() { + try { + delegate.execute(context); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + + } + + @Override + public String getSpanName() { + JobKey key = context.getJobDetail().getKey(); + return key.getGroup() + '.' + key.getName(); + } + }); + } + +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index ce07d6ee5914c..cedfa97b9cfac 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -85,9 +85,11 @@ import io.quarkus.scheduler.common.runtime.SchedulerContext; import io.quarkus.scheduler.common.runtime.SyntheticScheduled; import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; +import io.quarkus.scheduler.runtime.SchedulerConfig; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; import io.quarkus.scheduler.runtime.SimpleScheduler; +import io.quarkus.scheduler.spi.JobInstrumenter; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.common.vertx.VertxContext; @@ -123,6 +125,8 @@ public class QuartzSchedulerImpl implements QuartzScheduler { private final Event scheduledJobPausedEvent; private final Event scheduledJobResumedEvent; private final QuartzRuntimeConfig runtimeConfig; + private final SchedulerConfig schedulerConfig; + private final Instance jobInstrumenter; public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport, SchedulerRuntimeConfig schedulerRuntimeConfig, @@ -131,7 +135,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport Event schedulerResumedEvent, Event scheduledJobPausedEvent, Event scheduledJobResumedEvent, Instance jobs, Instance userTransaction, - Vertx vertx) { + Vertx vertx, + SchedulerConfig schedulerConfig, Instance jobInstrumenter) { this.shutdownWaitTime = quartzSupport.getRuntimeConfig().shutdownWaitTime; this.skippedExecutionEvent = skippedExecutionEvent; this.successExecutionEvent = successExecutionEvent; @@ -143,6 +148,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport this.runtimeConfig = quartzSupport.getRuntimeConfig(); this.enabled = schedulerRuntimeConfig.enabled; this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod; + this.schedulerConfig = schedulerConfig; + this.jobInstrumenter = jobInstrumenter; StartMode startMode = initStartMode(schedulerRuntimeConfig, runtimeConfig); @@ -176,6 +183,11 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport CronDefinition def = CronDefinitionBuilder.instanceDefinitionFor(cronType); cronParser = new CronParser(def); + JobInstrumenter instrumenter = null; + if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { + instrumenter = jobInstrumenter.get(); + } + if (!enabled) { LOGGER.info("Quartz scheduler is disabled by config property and will not be started"); this.scheduler = null; @@ -196,7 +208,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport scheduler = schedulerFactory.getScheduler(); // Set custom job factory - scheduler.setJobFactory(new InvokerJobFactory(scheduledTasks, jobs, vertx)); + scheduler.setJobFactory( + new InvokerJobFactory(scheduledTasks, jobs, vertx, instrumenter)); if (transaction != null) { transaction.begin(); @@ -209,11 +222,12 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport if (identity.isEmpty()) { identity = ++nameSequence + "_" + method.getInvokerClassName(); } + ScheduledInvoker invoker = SimpleScheduler.initInvoker( context.createInvoker(method.getInvokerClassName()), skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, scheduled.concurrentExecution(), - SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf())); + SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()), instrumenter); JobDetail jobDetail = createJobDetail(identity, method.getInvokerClassName()); Optional> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig, @@ -791,6 +805,7 @@ public boolean isRunningOnVirtualThread() { }; } else { invoker = new DefaultInvoker() { + @Override public CompletionStage invokeBean(ScheduledExecution execution) { try { @@ -807,6 +822,7 @@ public boolean isBlocking() { }; } + Scheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed, overdueGracePeriod, concurrentExecution, skipPredicate, timeZone); @@ -814,8 +830,12 @@ public boolean isBlocking() { Optional> triggerBuilder = createTrigger(identity, scheduled, cronType, runtimeConfig, jobDetail); if (triggerBuilder.isPresent()) { + JobInstrumenter instrumenter = null; + if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { + instrumenter = jobInstrumenter.get(); + } invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent, - failedExecutionEvent, concurrentExecution, skipPredicate); + failedExecutionEvent, concurrentExecution, skipPredicate, instrumenter); org.quartz.Trigger trigger = triggerBuilder.get().build(); QuartzTrigger existing = scheduledTasks.putIfAbsent(identity, new QuartzTrigger(trigger.getKey(), new Function<>() { @@ -895,6 +915,7 @@ public void run() { }); } else { context.executeBlocking(new Callable() { + @Override public Object call() throws Exception { return trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext)); @@ -918,9 +939,9 @@ public void handle(Void event) { } } else { String jobName = jobExecutionContext.getJobDetail().getKey().getName(); - LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. " + - "Update your Quartz table by removing all phantom jobs or make sure that there is a " + - "Scheduled method with the identity matching the job's name", jobName); + LOGGER.warnf("Unable to find corresponding Quartz trigger for job %s. " + + "Update your Quartz table by removing all phantom jobs or make sure that there is a " + + "Scheduled method with the identity matching the job's name", jobName); } } } @@ -1018,11 +1039,15 @@ static class InvokerJobFactory extends SimpleJobFactory { final Map scheduledTasks; final Instance jobs; final Vertx vertx; + final JobInstrumenter instrumenter; - InvokerJobFactory(Map scheduledTasks, Instance jobs, Vertx vertx) { + InvokerJobFactory(Map scheduledTasks, Instance jobs, Vertx vertx, + JobInstrumenter instrumenter) { this.scheduledTasks = scheduledTasks; this.jobs = jobs; this.vertx = vertx; + this.instrumenter = instrumenter; + } @SuppressWarnings("unchecked") @@ -1039,9 +1064,16 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr } Instance instance = jobs.select(jobClass); if (instance.isResolvable()) { - return (Job) instance.get(); + return jobWithSpanWrapper((Job) instance.get()); + } + return jobWithSpanWrapper(super.newJob(bundle, Scheduler)); + } + + private Job jobWithSpanWrapper(Job job) { + if (instrumenter != null) { + return new InstrumentedJob(job, instrumenter); } - return super.newJob(bundle, Scheduler); + return job; } } diff --git a/extensions/scheduler/common/pom.xml b/extensions/scheduler/common/pom.xml index 7da88cfc94fe6..126368f70c885 100644 --- a/extensions/scheduler/common/pom.xml +++ b/extensions/scheduler/common/pom.xml @@ -17,6 +17,14 @@ io.quarkus quarkus-scheduler-api + + io.quarkus + quarkus-scheduler-spi + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + com.cronutils cron-utils diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java new file mode 100644 index 0000000000000..c0e330001f436 --- /dev/null +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java @@ -0,0 +1,43 @@ +package io.quarkus.scheduler.common.runtime; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import io.quarkus.scheduler.ScheduledExecution; +import io.quarkus.scheduler.spi.JobInstrumenter; +import io.quarkus.scheduler.spi.JobInstrumenter.JobInstrumentationContext; + +/** + * + * @see JobInstrumenter + */ +public class InstrumentedInvoker extends DelegateInvoker { + + private final JobInstrumenter instrumenter; + + public InstrumentedInvoker(ScheduledInvoker delegate, JobInstrumenter instrumenter) { + super(delegate); + this.instrumenter = instrumenter; + } + + @Override + public CompletionStage invoke(ScheduledExecution execution) throws Exception { + return instrumenter.instrument(new JobInstrumentationContext() { + + @Override + public CompletionStage executeJob() { + try { + return delegate.invoke(execution); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } + + @Override + public String getSpanName() { + return execution.getTrigger().getId(); + } + }); + } + +} diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java index 60196c55e4190..82c9a9ba89d69 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java @@ -367,30 +367,6 @@ public void metrics(SchedulerConfig config, } } - @BuildStep - public void tracing(SchedulerConfig config, - Capabilities capabilities, BuildProducer annotationsTransformer) { - - if (config.tracingEnabled && capabilities.isPresent(Capability.OPENTELEMETRY_TRACER)) { - DotName withSpan = DotName.createSimple("io.opentelemetry.instrumentation.annotations.WithSpan"); - DotName legacyWithSpan = DotName.createSimple("io.opentelemetry.extension.annotations.WithSpan"); - - annotationsTransformer.produce(new AnnotationsTransformerBuildItem(AnnotationsTransformer.builder() - .appliesTo(METHOD) - .whenContainsAny(List.of(SchedulerDotNames.SCHEDULED_NAME, SchedulerDotNames.SCHEDULES_NAME)) - .whenContainsNone(List.of(withSpan, legacyWithSpan)) - .transform(context -> { - MethodInfo scheduledMethod = context.getTarget().asMethod(); - context.transform() - .add(withSpan) - .done(); - LOGGER.debugf("Added OpenTelemetry @WithSpan to a @Scheduled method %s#%s()", - scheduledMethod.declaringClass().name(), - scheduledMethod.name()); - }))); - } - } - private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, ClassOutput classOutput) { BeanInfo bean = scheduledMethod.getBean(); diff --git a/extensions/scheduler/pom.xml b/extensions/scheduler/pom.xml index a763e4e95e6d8..babdb6acf34c2 100644 --- a/extensions/scheduler/pom.xml +++ b/extensions/scheduler/pom.xml @@ -17,6 +17,7 @@ deployment api + spi common kotlin runtime diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java index 9e6274a255de5..23ce44a235145 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java @@ -25,9 +25,9 @@ public class SchedulerConfig { public boolean metricsEnabled; /** - * Tracing will be enabled if the OpenTelemetry extension is present and this value is true. + * Controls whether tracing is enabled. If set to true and the OpenTelemetry extension is present, + * tracing will be enabled, creating automatic Spans for each scheduled task. */ @ConfigItem(name = "tracing.enabled") public boolean tracingEnabled; - } diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index d69c97d0d3ba1..afc99c574ee8d 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -24,6 +24,7 @@ import jakarta.enterprise.event.Event; import jakarta.enterprise.event.Observes; import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Typed; import jakarta.inject.Singleton; import jakarta.interceptor.Interceptor; @@ -55,6 +56,7 @@ import io.quarkus.scheduler.common.runtime.AbstractJobDefinition; import io.quarkus.scheduler.common.runtime.DefaultInvoker; import io.quarkus.scheduler.common.runtime.Events; +import io.quarkus.scheduler.common.runtime.InstrumentedInvoker; import io.quarkus.scheduler.common.runtime.ScheduledInvoker; import io.quarkus.scheduler.common.runtime.ScheduledMethod; import io.quarkus.scheduler.common.runtime.SchedulerContext; @@ -64,6 +66,7 @@ import io.quarkus.scheduler.common.runtime.SyntheticScheduled; import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; +import io.quarkus.scheduler.spi.JobInstrumenter; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.common.vertx.VertxContext; @@ -94,12 +97,15 @@ public class SimpleScheduler implements Scheduler { private final Event schedulerResumedEvent; private final Event scheduledJobPausedEvent; private final Event scheduledJobResumedEvent; + private final SchedulerConfig schedulerConfig; + private final Instance jobInstrumenter; public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedulerRuntimeConfig, Event skippedExecutionEvent, Event successExecutionEvent, Event failedExecutionEvent, Event schedulerPausedEvent, Event schedulerResumedEvent, Event scheduledJobPausedEvent, - Event scheduledJobResumedEvent, Vertx vertx) { + Event scheduledJobResumedEvent, Vertx vertx, SchedulerConfig schedulerConfig, + Instance jobInstrumenter) { this.running = true; this.enabled = schedulerRuntimeConfig.enabled; this.scheduledTasks = new ConcurrentHashMap<>(); @@ -111,6 +117,8 @@ public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedule this.schedulerResumedEvent = schedulerResumedEvent; this.scheduledJobPausedEvent = scheduledJobPausedEvent; this.scheduledJobResumedEvent = scheduledJobResumedEvent; + this.schedulerConfig = schedulerConfig; + this.jobInstrumenter = jobInstrumenter; CronDefinition definition = CronDefinitionBuilder.instanceDefinitionFor(context.getCronType()); this.cronParser = new CronParser(definition); @@ -153,9 +161,13 @@ public void run() { Optional trigger = createTrigger(id, method.getMethodDescription(), cronParser, scheduled, defaultOverdueGracePeriod); if (trigger.isPresent()) { + JobInstrumenter instrumenter = null; + if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { + instrumenter = jobInstrumenter.get(); + } ScheduledInvoker invoker = initInvoker(context.createInvoker(method.getInvokerClassName()), skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, - scheduled.concurrentExecution(), initSkipPredicate(scheduled.skipExecutionIf())); + scheduled.concurrentExecution(), initSkipPredicate(scheduled.skipExecutionIf()), instrumenter); scheduledTasks.put(trigger.get().id, new ScheduledTask(trigger.get(), invoker, false)); } } @@ -168,7 +180,7 @@ public JobDefinition newJob(String identity) { if (scheduledTasks.containsKey(identity)) { throw new IllegalStateException("A job with this identity is already scheduled: " + identity); } - return new SimpleJobDefinition(identity); + return new SimpleJobDefinition(identity, schedulerConfig); } @Override @@ -352,7 +364,7 @@ Optional createTrigger(String id, String methodDescription, CronP public static ScheduledInvoker initInvoker(ScheduledInvoker invoker, Event skippedExecutionEvent, Event successExecutionEvent, Event failedExecutionEvent, ConcurrentExecution concurrentExecution, - Scheduled.SkipPredicate skipPredicate) { + Scheduled.SkipPredicate skipPredicate, JobInstrumenter instrumenter) { invoker = new StatusEmitterInvoker(invoker, successExecutionEvent, failedExecutionEvent); if (concurrentExecution == ConcurrentExecution.SKIP) { invoker = new SkipConcurrentExecutionInvoker(invoker, skippedExecutionEvent); @@ -360,6 +372,9 @@ public static ScheduledInvoker initInvoker(ScheduledInvoker invoker, Event + + + io.quarkus + quarkus-scheduler-parent + 999-SNAPSHOT + + 4.0.0 + + quarkus-scheduler-spi + Quarkus - Scheduler - SPI + + + + io.quarkus + quarkus-scheduler-api + + + diff --git a/extensions/scheduler/spi/src/main/java/io/quarkus/scheduler/spi/JobInstrumenter.java b/extensions/scheduler/spi/src/main/java/io/quarkus/scheduler/spi/JobInstrumenter.java new file mode 100644 index 0000000000000..d6df78b5691c8 --- /dev/null +++ b/extensions/scheduler/spi/src/main/java/io/quarkus/scheduler/spi/JobInstrumenter.java @@ -0,0 +1,23 @@ +package io.quarkus.scheduler.spi; + +import java.util.concurrent.CompletionStage; + +/** + * Instruments a scheduled job. + *

+ * Telemetry extensions can provide exactly one CDI bean of this type. The scope must be either {@link jakarta.inject.Singleton} + * or {@link jakarta.enterprise.context.ApplicationScoped}. + */ +public interface JobInstrumenter { + + CompletionStage instrument(JobInstrumentationContext context); + + interface JobInstrumentationContext { + + String getSpanName(); + + CompletionStage executeJob(); + + } + +} diff --git a/integration-tests/opentelemetry-quartz/pom.xml b/integration-tests/opentelemetry-quartz/pom.xml new file mode 100644 index 0000000000000..006bac75a7f1f --- /dev/null +++ b/integration-tests/opentelemetry-quartz/pom.xml @@ -0,0 +1,144 @@ + + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-integration-test-opentelemetry-quartz + Quarkus - Integration Tests - OpenTelemetry Quartz + + + + io.quarkus + quarkus-arc + + + io.quarkus + quarkus-resteasy-reactive + + + io.quarkus + quarkus-quartz + + + io.quarkus + quarkus-opentelemetry + + + io.quarkus + quarkus-resteasy-reactive-jackson + + + + + io.opentelemetry + opentelemetry-sdk-testing + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + + io.quarkus + quarkus-arc-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-quartz-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-opentelemetry-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + build + + + + + + + + + diff --git a/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/CountResource.java b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/CountResource.java new file mode 100644 index 0000000000000..dd6f36e860655 --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/CountResource.java @@ -0,0 +1,40 @@ +package io.quarkus.it.opentelemetry.quartz; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +@Path("/scheduler/count") +public class CountResource { + + @Inject + Counter counter; + + @Inject + ManualScheduledCounter manualScheduledCounter; + + @Inject + JobDefinitionCounter jobDefinitionCounter; + + @GET + @Produces(MediaType.TEXT_PLAIN) + public Integer getCount() { + return counter.get(); + } + + @GET + @Path("manual") + @Produces(MediaType.TEXT_PLAIN) + public Integer getManualCount() { + return manualScheduledCounter.get(); + } + + @GET + @Path("job-definition") + @Produces(MediaType.TEXT_PLAIN) + public Integer getJobDefinitionCount() { + return jobDefinitionCounter.get(); + } +} diff --git a/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/Counter.java b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/Counter.java new file mode 100644 index 0000000000000..0199a8fb362a5 --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/Counter.java @@ -0,0 +1,30 @@ +package io.quarkus.it.opentelemetry.quartz; + +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; + +import io.quarkus.scheduler.Scheduled; + +@ApplicationScoped +public class Counter { + + AtomicInteger counter; + + @PostConstruct + void init() { + counter = new AtomicInteger(); + } + + public int get() { + return counter.get(); + } + + @Scheduled(cron = "*/1 * * * * ?", identity = "myCounter") + void increment() throws InterruptedException { + Thread.sleep(100l); + counter.incrementAndGet(); + } + +} diff --git a/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/ExporterResource.java b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/ExporterResource.java new file mode 100644 index 0000000000000..5f6407cc16167 --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/ExporterResource.java @@ -0,0 +1,38 @@ +package io.quarkus.it.opentelemetry.quartz; + +import java.util.List; +import java.util.stream.Collectors; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.data.SpanData; + +@Path("") +public class ExporterResource { + @Inject + InMemorySpanExporter inMemorySpanExporter; + + @GET + @Path("/export") + public List export() { // only export scheduled spans + return inMemorySpanExporter.getFinishedSpanItems() + .stream() + .filter(sd -> !sd.getName().contains("export") && !sd.getName().contains("GET")) + .collect(Collectors.toList()); + } + + @ApplicationScoped + static class InMemorySpanExporterProducer { + @Produces + @Singleton + InMemorySpanExporter inMemorySpanExporter() { + return InMemorySpanExporter.create(); + } + } +} diff --git a/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedBasicScheduler.java b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedBasicScheduler.java new file mode 100644 index 0000000000000..9450827e7e328 --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedBasicScheduler.java @@ -0,0 +1,17 @@ +package io.quarkus.it.opentelemetry.quartz; + +import jakarta.enterprise.context.ApplicationScoped; + +import io.quarkus.scheduler.Scheduled; + +@ApplicationScoped +public class FailedBasicScheduler { + + @Scheduled(cron = "*/1 * * * * ?", identity = "myFailedBasicScheduler") + void init() throws InterruptedException { + Thread.sleep(100l); + throw new RuntimeException("error occurred in myFailedBasicScheduler."); + + } + +} diff --git a/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedJobDefinitionScheduler.java b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedJobDefinitionScheduler.java new file mode 100644 index 0000000000000..705b5c357c9aa --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedJobDefinitionScheduler.java @@ -0,0 +1,29 @@ +package io.quarkus.it.opentelemetry.quartz; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import io.quarkus.quartz.QuartzScheduler; +import io.quarkus.runtime.Startup; + +@ApplicationScoped +@Startup +public class FailedJobDefinitionScheduler { + + @Inject + QuartzScheduler scheduler; + + @PostConstruct + void init() { + scheduler.newJob("myFailedJobDefinition").setCron("*/1 * * * * ?").setTask(ex -> { + try { + Thread.sleep(100l); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("error occurred in myFailedJobDefinition."); + }).schedule(); + } + +} diff --git a/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedManualScheduler.java b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedManualScheduler.java new file mode 100644 index 0000000000000..21dc72c5f5adf --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/FailedManualScheduler.java @@ -0,0 +1,52 @@ +package io.quarkus.it.opentelemetry.quartz; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +import io.quarkus.runtime.Startup; +import io.quarkus.runtime.annotations.RegisterForReflection; + +@Startup +@ApplicationScoped +public class FailedManualScheduler { + @Inject + org.quartz.Scheduler quartz; + + @PostConstruct + void init() throws SchedulerException { + JobDetail job = JobBuilder.newJob(CountingJob.class).withIdentity("myFailedManualJob", "myFailedGroup").build(); + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("myFailedTrigger", "myFailedGroup") + .startNow() + .withSchedule(SimpleScheduleBuilder + .simpleSchedule() + .repeatForever() + .withIntervalInSeconds(1)) + .build(); + quartz.scheduleJob(job, trigger); + } + + @RegisterForReflection + public static class CountingJob implements Job { + @Override + public void execute(JobExecutionContext jobExecutionContext) { + try { + Thread.sleep(100l); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("error occurred in myFailedManualJob."); + } + } +} diff --git a/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/JobDefinitionCounter.java b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/JobDefinitionCounter.java new file mode 100644 index 0000000000000..70d1038080df3 --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/JobDefinitionCounter.java @@ -0,0 +1,37 @@ +package io.quarkus.it.opentelemetry.quartz; + +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import io.quarkus.quartz.QuartzScheduler; +import io.quarkus.runtime.Startup; + +@ApplicationScoped +@Startup +public class JobDefinitionCounter { + + @Inject + QuartzScheduler scheduler; + + AtomicInteger counter; + + @PostConstruct + void init() { + counter = new AtomicInteger(); + scheduler.newJob("myJobDefinition").setCron("*/1 * * * * ?").setTask(ex -> { + try { + Thread.sleep(100l); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + counter.incrementAndGet(); + }).schedule(); + } + + public int get() { + return counter.get(); + } +} diff --git a/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/ManualScheduledCounter.java b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/ManualScheduledCounter.java new file mode 100644 index 0000000000000..00aca09f07c11 --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/main/java/io/quarkus/it/opentelemetry/quartz/ManualScheduledCounter.java @@ -0,0 +1,59 @@ +package io.quarkus.it.opentelemetry.quartz; + +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +import io.quarkus.runtime.Startup; +import io.quarkus.runtime.annotations.RegisterForReflection; + +@Startup +@ApplicationScoped +public class ManualScheduledCounter { + @Inject + org.quartz.Scheduler quartz; + private static AtomicInteger counter = new AtomicInteger(); + + public int get() { + return counter.get(); + } + + @PostConstruct + void init() throws SchedulerException { + JobDetail job = JobBuilder.newJob(CountingJob.class).withIdentity("myManualJob", "myGroup").build(); + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("myTrigger", "myGroup") + .startNow() + .withSchedule(SimpleScheduleBuilder + .simpleSchedule() + .repeatForever() + .withIntervalInSeconds(1)) + .build(); + quartz.scheduleJob(job, trigger); + } + + @RegisterForReflection + public static class CountingJob implements Job { + @Override + public void execute(JobExecutionContext jobExecutionContext) { + try { + Thread.sleep(100l); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + counter.incrementAndGet(); + } + } +} diff --git a/integration-tests/opentelemetry-quartz/src/main/resources/application.properties b/integration-tests/opentelemetry-quartz/src/main/resources/application.properties new file mode 100644 index 0000000000000..b32b9f635240d --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/main/resources/application.properties @@ -0,0 +1,5 @@ +# speed up build +quarkus.otel.bsp.schedule.delay=100 +quarkus.otel.bsp.export.timeout=5s + +quarkus.scheduler.tracing.enabled=true \ No newline at end of file diff --git a/integration-tests/opentelemetry-quartz/src/test/java/io/quarkus/it/opentelemetry/quartz/OpenTelemetryQuartzIT.java b/integration-tests/opentelemetry-quartz/src/test/java/io/quarkus/it/opentelemetry/quartz/OpenTelemetryQuartzIT.java new file mode 100644 index 0000000000000..e938a8d4f73c7 --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/test/java/io/quarkus/it/opentelemetry/quartz/OpenTelemetryQuartzIT.java @@ -0,0 +1,16 @@ +package io.quarkus.it.opentelemetry.quartz; + +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.DisabledOnIntegrationTest; +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class OpenTelemetryQuartzIT extends OpenTelemetryQuartzTest { + @Test + @DisabledOnIntegrationTest("native mode testing span does not have a field 'exception' (only in integration-test, not in quarkus app)") + @Override + public void quartzSpanTest() { + super.quartzSpanTest(); + } +} diff --git a/integration-tests/opentelemetry-quartz/src/test/java/io/quarkus/it/opentelemetry/quartz/OpenTelemetryQuartzTest.java b/integration-tests/opentelemetry-quartz/src/test/java/io/quarkus/it/opentelemetry/quartz/OpenTelemetryQuartzTest.java new file mode 100644 index 0000000000000..7f3897f6c3671 --- /dev/null +++ b/integration-tests/opentelemetry-quartz/src/test/java/io/quarkus/it/opentelemetry/quartz/OpenTelemetryQuartzTest.java @@ -0,0 +1,104 @@ +package io.quarkus.it.opentelemetry.quartz; + +import static io.restassured.RestAssured.get; +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.common.mapper.TypeRef; +import io.restassured.response.Response; + +@QuarkusTest +public class OpenTelemetryQuartzTest { + + private static long DURATION_IN_NANOSECONDS = 100_000_000; // Thread.sleep(100l) for each job + + @Test + public void quartzSpanTest() { + // ensure that scheduled job is called + assertCounter("/scheduler/count", 1, Duration.ofSeconds(1)); + // assert programmatically scheduled job is called + assertCounter("/scheduler/count/manual", 1, Duration.ofSeconds(1)); + // assert JobDefinition type scheduler + assertCounter("/scheduler/count/job-definition", 1, Duration.ofSeconds(1)); + + // ------- SPAN ASSERTS ------- + List> spans = getSpans(); + + assertJobSpan(spans, "myCounter", DURATION_IN_NANOSECONDS); // identity + assertJobSpan(spans, "myGroup.myManualJob", DURATION_IN_NANOSECONDS); // group + identity + assertJobSpan(spans, "myJobDefinition", DURATION_IN_NANOSECONDS); // identity + + // errors + assertErrorJobSpan(spans, "myFailedBasicScheduler", DURATION_IN_NANOSECONDS, + "error occurred in myFailedBasicScheduler."); + assertErrorJobSpan(spans, "myFailedGroup.myFailedManualJob", DURATION_IN_NANOSECONDS, + "error occurred in myFailedManualJob."); + assertErrorJobSpan(spans, "myFailedJobDefinition", DURATION_IN_NANOSECONDS, + "error occurred in myFailedJobDefinition."); + + } + + private void assertCounter(String counterPath, int expectedCount, Duration timeout) { + await().atMost(timeout) + .until(() -> { + Response response = given().when().get(counterPath); + int code = response.statusCode(); + if (code != 200) { + return false; + } + String body = response.asString(); + int count = Integer.valueOf(body); + return count >= expectedCount; + }); + + } + + private List> getSpans() { + return get("/export").body().as(new TypeRef<>() { + }); + } + + private void assertJobSpan(List> spans, String expectedName, long expectedDuration) { + assertNotNull(spans); + assertFalse(spans.isEmpty()); + Map span = spans.stream().filter(map -> map.get("name").equals(expectedName)).findFirst().orElse(null); + assertNotNull(span, "Span with name '" + expectedName + "' not found."); + assertEquals(SpanKind.INTERNAL.toString(), span.get("kind"), "Span with name '" + expectedName + "' is not internal."); + + long start = (long) span.get("startEpochNanos"); + long end = (long) span.get("endEpochNanos"); + long delta = (end - start); + assertTrue(delta >= expectedDuration, + "Duration of span with name '" + expectedName + + "' is not longer than 100ms, actual duration: " + delta + " (ns)"); + } + + private void assertErrorJobSpan(List> spans, String expectedName, long expectedDuration, + String expectedErrorMessage) { + assertJobSpan(spans, expectedName, expectedDuration); + Map span = spans.stream().filter(map -> map.get("name").equals(expectedName)).findFirst() + .orElseThrow(AssertionError::new); // this assert should never be thrown, since we already checked it in `assertJobSpan` + + Map statusAttributes = (Map) span.get("status"); + assertNotNull(statusAttributes, "Span with name '" + expectedName + "' is not an ERROR"); + assertEquals(StatusCode.ERROR.toString(), statusAttributes.get("statusCode"), + "Span with name '" + expectedName + "' is not an ERROR"); + Map exception = (Map) ((List>) span.get("events")).stream() + .map(map -> map.get("exception")).findFirst().orElseThrow(AssertionError::new); + assertTrue(((String) exception.get("message")).contains(expectedErrorMessage), + "Span with name '" + expectedName + "' has wrong error message"); + } +} diff --git a/integration-tests/opentelemetry-scheduler/pom.xml b/integration-tests/opentelemetry-scheduler/pom.xml new file mode 100644 index 0000000000000..f0fc716016933 --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/pom.xml @@ -0,0 +1,144 @@ + + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-integration-test-opentelemetry-scheduler + Quarkus - Integration Tests - OpenTelemetry Scheduler + + + + io.quarkus + quarkus-arc + + + io.quarkus + quarkus-resteasy-reactive + + + io.quarkus + quarkus-scheduler + + + io.quarkus + quarkus-opentelemetry + + + io.quarkus + quarkus-resteasy-reactive-jackson + + + + + io.opentelemetry + opentelemetry-sdk-testing + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + + io.quarkus + quarkus-arc-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-scheduler-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-opentelemetry-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + build + + + + + + + + + diff --git a/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/CountResource.java b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/CountResource.java new file mode 100644 index 0000000000000..90906d9bc9288 --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/CountResource.java @@ -0,0 +1,30 @@ +package io.quarkus.it.opentelemetry.scheduler; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +@Path("/scheduler/count") +public class CountResource { + + @Inject + Counter counter; + + @Inject + JobDefinitionCounter jobDefinitionCounter; + + @GET + @Produces(MediaType.TEXT_PLAIN) + public Integer getCount() { + return counter.get(); + } + + @GET + @Path("job-definition") + @Produces(MediaType.TEXT_PLAIN) + public Integer getJobDefinitionCount() { + return jobDefinitionCounter.get(); + } +} diff --git a/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/Counter.java b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/Counter.java new file mode 100644 index 0000000000000..c175e7716fe9f --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/Counter.java @@ -0,0 +1,30 @@ +package io.quarkus.it.opentelemetry.scheduler; + +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; + +import io.quarkus.scheduler.Scheduled; + +@ApplicationScoped +public class Counter { + + AtomicInteger counter; + + @PostConstruct + void init() { + counter = new AtomicInteger(); + } + + public int get() { + return counter.get(); + } + + @Scheduled(cron = "*/1 * * * * ?", identity = "myCounter") + void increment() throws InterruptedException { + Thread.sleep(100l); + counter.incrementAndGet(); + } + +} diff --git a/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/ExporterResource.java b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/ExporterResource.java new file mode 100644 index 0000000000000..b7d246eb46673 --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/ExporterResource.java @@ -0,0 +1,38 @@ +package io.quarkus.it.opentelemetry.scheduler; + +import java.util.List; +import java.util.stream.Collectors; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.data.SpanData; + +@Path("") +public class ExporterResource { + @Inject + InMemorySpanExporter inMemorySpanExporter; + + @GET + @Path("/export") + public List export() { // only export scheduled spans + return inMemorySpanExporter.getFinishedSpanItems() + .stream() + .filter(sd -> !sd.getName().contains("export") && !sd.getName().contains("GET")) + .collect(Collectors.toList()); + } + + @ApplicationScoped + static class InMemorySpanExporterProducer { + @Produces + @Singleton + InMemorySpanExporter inMemorySpanExporter() { + return InMemorySpanExporter.create(); + } + } +} diff --git a/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/FailedBasicScheduler.java b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/FailedBasicScheduler.java new file mode 100644 index 0000000000000..f03822402cd37 --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/FailedBasicScheduler.java @@ -0,0 +1,17 @@ +package io.quarkus.it.opentelemetry.scheduler; + +import jakarta.enterprise.context.ApplicationScoped; + +import io.quarkus.scheduler.Scheduled; + +@ApplicationScoped +public class FailedBasicScheduler { + + @Scheduled(cron = "*/1 * * * * ?", identity = "myFailedBasicScheduler") + void init() throws InterruptedException { + Thread.sleep(100l); + throw new RuntimeException("error occurred in myFailedBasicScheduler."); + + } + +} diff --git a/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/FailedJobDefinitionScheduler.java b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/FailedJobDefinitionScheduler.java new file mode 100644 index 0000000000000..41d45b6ceff4d --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/FailedJobDefinitionScheduler.java @@ -0,0 +1,29 @@ +package io.quarkus.it.opentelemetry.scheduler; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import io.quarkus.runtime.Startup; +import io.quarkus.scheduler.Scheduler; + +@ApplicationScoped +@Startup +public class FailedJobDefinitionScheduler { + + @Inject + Scheduler scheduler; + + @PostConstruct + void init() { + scheduler.newJob("myFailedJobDefinition").setCron("*/1 * * * * ?").setTask(ex -> { + try { + Thread.sleep(100l); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("error occurred in myFailedJobDefinition."); + }).schedule(); + } + +} diff --git a/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/JobDefinitionCounter.java b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/JobDefinitionCounter.java new file mode 100644 index 0000000000000..16ee5db3e1273 --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/src/main/java/io/quarkus/it/opentelemetry/scheduler/JobDefinitionCounter.java @@ -0,0 +1,37 @@ +package io.quarkus.it.opentelemetry.scheduler; + +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import io.quarkus.runtime.Startup; +import io.quarkus.scheduler.Scheduler; + +@ApplicationScoped +@Startup +public class JobDefinitionCounter { + + @Inject + Scheduler scheduler; + + AtomicInteger counter; + + @PostConstruct + void init() { + counter = new AtomicInteger(); + scheduler.newJob("myJobDefinition").setCron("*/1 * * * * ?").setTask(ex -> { + try { + Thread.sleep(100l); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + counter.incrementAndGet(); + }).schedule(); + } + + public int get() { + return counter.get(); + } +} diff --git a/integration-tests/opentelemetry-scheduler/src/main/resources/application.properties b/integration-tests/opentelemetry-scheduler/src/main/resources/application.properties new file mode 100644 index 0000000000000..b32b9f635240d --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/src/main/resources/application.properties @@ -0,0 +1,5 @@ +# speed up build +quarkus.otel.bsp.schedule.delay=100 +quarkus.otel.bsp.export.timeout=5s + +quarkus.scheduler.tracing.enabled=true \ No newline at end of file diff --git a/integration-tests/opentelemetry-scheduler/src/test/java/io/quarkus/it/opentelemetry/scheduler/OpenTelemetrySchedulerIT.java b/integration-tests/opentelemetry-scheduler/src/test/java/io/quarkus/it/opentelemetry/scheduler/OpenTelemetrySchedulerIT.java new file mode 100644 index 0000000000000..dd8c254377ca9 --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/src/test/java/io/quarkus/it/opentelemetry/scheduler/OpenTelemetrySchedulerIT.java @@ -0,0 +1,16 @@ +package io.quarkus.it.opentelemetry.scheduler; + +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.DisabledOnIntegrationTest; +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class OpenTelemetrySchedulerIT extends OpenTelemetrySchedulerTest { + @Test + @DisabledOnIntegrationTest("native mode testing span does not have a field 'exception' (only in integration-test, not in quarkus app)") + @Override + public void schedulerSpanTest() { + super.schedulerSpanTest(); + } +} diff --git a/integration-tests/opentelemetry-scheduler/src/test/java/io/quarkus/it/opentelemetry/scheduler/OpenTelemetrySchedulerTest.java b/integration-tests/opentelemetry-scheduler/src/test/java/io/quarkus/it/opentelemetry/scheduler/OpenTelemetrySchedulerTest.java new file mode 100644 index 0000000000000..c33b61ccff386 --- /dev/null +++ b/integration-tests/opentelemetry-scheduler/src/test/java/io/quarkus/it/opentelemetry/scheduler/OpenTelemetrySchedulerTest.java @@ -0,0 +1,99 @@ +package io.quarkus.it.opentelemetry.scheduler; + +import static io.restassured.RestAssured.get; +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.common.mapper.TypeRef; +import io.restassured.response.Response; + +@QuarkusTest +public class OpenTelemetrySchedulerTest { + + private static long DURATION_IN_NANOSECONDS = 100_000_000; // Thread.sleep(100l) for each job + + @Test + public void schedulerSpanTest() { + // ensure that scheduled job is called + assertCounter("/scheduler/count", 1, Duration.ofSeconds(1)); + // assert JobDefinition type scheduler + assertCounter("/scheduler/count/job-definition", 1, Duration.ofSeconds(1)); + + // ------- SPAN ASSERTS ------- + List> spans = getSpans(); + + assertJobSpan(spans, "myCounter", DURATION_IN_NANOSECONDS); // identity + assertJobSpan(spans, "myJobDefinition", DURATION_IN_NANOSECONDS); // identity + + // errors + assertErrorJobSpan(spans, "myFailedBasicScheduler", DURATION_IN_NANOSECONDS, + "error occurred in myFailedBasicScheduler."); + assertErrorJobSpan(spans, "myFailedJobDefinition", DURATION_IN_NANOSECONDS, + "error occurred in myFailedJobDefinition."); + + } + + private void assertCounter(String counterPath, int expectedCount, Duration timeout) { + await().atMost(timeout) + .until(() -> { + Response response = given().when().get(counterPath); + int code = response.statusCode(); + if (code != 200) { + return false; + } + String body = response.asString(); + int count = Integer.valueOf(body); + return count >= expectedCount; + }); + + } + + private List> getSpans() { + return get("/export").body().as(new TypeRef<>() { + }); + } + + private void assertJobSpan(List> spans, String expectedName, long expectedDuration) { + assertNotNull(spans); + assertFalse(spans.isEmpty()); + Map span = spans.stream().filter(map -> map.get("name").equals(expectedName)).findFirst().orElse(null); + assertNotNull(span, "Span with name '" + expectedName + "' not found."); + assertEquals(SpanKind.INTERNAL.toString(), span.get("kind"), "Span with name '" + expectedName + "' is not internal."); + + long start = (long) span.get("startEpochNanos"); + long end = (long) span.get("endEpochNanos"); + long delta = (end - start); + assertTrue(delta >= expectedDuration, + "Duration of span with name '" + expectedName + + "' is not longer than 100ms, actual duration: " + delta + " (ns)"); + } + + private void assertErrorJobSpan(List> spans, String expectedName, long expectedDuration, + String expectedErrorMessage) { + assertJobSpan(spans, expectedName, expectedDuration); + Map span = spans.stream().filter(map -> map.get("name").equals(expectedName)).findFirst() + .orElseThrow(AssertionError::new); // this assert should never be thrown, since we already checked it in `assertJobSpan` + + Map statusAttributes = (Map) span.get("status"); + assertNotNull(statusAttributes, "Span with name '" + expectedName + "' is not an ERROR"); + assertEquals(StatusCode.ERROR.toString(), statusAttributes.get("statusCode"), + "Span with name '" + expectedName + "' is not an ERROR"); + Map exception = (Map) ((List>) span.get("events")).stream() + .map(map -> map.get("exception")).findFirst().orElseThrow(AssertionError::new); + assertTrue(((String) exception.get("message")).contains(expectedErrorMessage), + "Span with name '" + expectedName + "' has wrong error message"); + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 7323819a93f1f..295815807544a 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -356,6 +356,8 @@ opentelemetry opentelemetry-spi opentelemetry-jdbc-instrumentation + opentelemetry-quartz + opentelemetry-scheduler opentelemetry-vertx opentelemetry-reactive opentelemetry-grpc