Skip to content

Commit

Permalink
WIP: Added OTel Instrumentation feature for scheduling (quartz)
Browse files Browse the repository at this point in the history
  • Loading branch information
mskacelik committed Sep 22, 2023
1 parent f786f4a commit bb57965
Show file tree
Hide file tree
Showing 24 changed files with 493 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.eclipse.microprofile.config.ConfigProvider;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.BeanContainerBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
Expand Down Expand Up @@ -143,4 +144,18 @@ void resteasyReactiveIntegration(
.produce(new PreExceptionMapperHandlerBuildItem(new AttachExceptionHandler()));
}

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
void quartzTracingOptions(Capabilities capabilities,
InstrumentationRecorder recorder,
BeanContainerBuildItem beanContainerBuildItem) {
// if (capabilities.isPresent(Capability.QUARTZ) &&
// ConfigProvider.getConfig().getValue("quarkus.scheduler.tracing.enabled", Boolean.class)) {
// recorder.setupQuartzTracer(
// beanContainerBuildItem.getValue(),
// // ConfigProvider.getConfig().getValue("quarkus.otel.experimental.attributes.disabled", Boolean.class)
// false);
// }
}

}
11 changes: 10 additions & 1 deletion extensions/opentelemetry/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,16 @@
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-quartz</artifactId>
<optional>true</optional>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.quarkus</groupId>-->
<!-- <artifactId>quarkus-scheduler</artifactId>-->
<!-- <optional>true</optional>-->
<!-- </dependency>-->
<!-- OpenTelemetry Dependencies -->
<dependency>
<groupId>io.opentelemetry</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;

/**
* Tracing build time configuration
*/
@ConfigGroup
@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED)
public interface TracesBuildConfig {

/**
Expand Down Expand Up @@ -56,4 +60,13 @@ public interface TracesBuildConfig {
* EndUser SpanProcessor configurations.
*/
EndUserSpanProcessorConfig eusp();

/**
* something
*/

@WithName("experimental.attributes.disabled")
@WithDefault("false")
boolean attributesExperimentalDisabled();

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.opentelemetry.api.OpenTelemetry;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.quartz.QuartzTelemetry;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.EventBusInstrumenterVertxTracer;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.HttpInstrumenterVertxTracer;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.OpenTelemetryVertxMetricsFactory;
Expand Down Expand Up @@ -39,6 +40,13 @@ public void setupVertxTracer(BeanContainer beanContainer) {
FACTORY.getVertxTracerDelegator().setDelegate(openTelemetryVertxTracer);
}

/* RUNTIME INIT */
public void setupQuartzTracer(BeanContainer beanContainer, boolean captureExperimentalSpanAttributes) {
OpenTelemetry openTelemetry = beanContainer.beanInstance(OpenTelemetry.class);
io.quarkus.quartz.QuartzScheduler scheduler = beanContainer.beanInstance(io.quarkus.quartz.QuartzScheduler.class);
new QuartzTelemetry(openTelemetry, scheduler.getScheduler(), captureExperimentalSpanAttributes);
}

/* RUNTIME INIT */
public Consumer<VertxOptions> getVertxTracingMetricsOptions() {
MetricsOptions metricsOptions = new MetricsOptions()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.quartz;

import org.quartz.JobExecutionContext;

import io.opentelemetry.instrumentation.api.instrumenter.code.CodeAttributesGetter;

final class QuartzCodeAttributesGetter implements CodeAttributesGetter<JobExecutionContext> {

@Override
public Class<?> getCodeClass(JobExecutionContext jobExecutionContext) {
return jobExecutionContext.getJobDetail().getJobClass();
}

@Override
public String getMethodName(JobExecutionContext jobExecutionContext) {
return "execute";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.quartz;

import org.quartz.SchedulerException;

import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;

final class QuartzErrorCauseExtractor implements ErrorCauseExtractor {
@Override
public Throwable extract(Throwable error) {
Throwable userError = error;
while (userError instanceof SchedulerException) {
userError = ((SchedulerException) userError).getUnderlyingException();
}
return userError;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.quartz;

import org.quartz.JobExecutionContext;
import org.quartz.JobKey;

import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;

final class QuartzSpanNameExtractor implements SpanNameExtractor<JobExecutionContext> {
@Override
public String extract(JobExecutionContext job) {
JobKey key = job.getJobDetail().getKey();
return key.getGroup() + '.' + key.getName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.quartz;

import static io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig.INSTRUMENTATION_NAME;
import static org.quartz.impl.matchers.EverythingMatcher.allJobs;

import org.quartz.JobExecutionContext;
import org.quartz.JobListener;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.code.CodeAttributesExtractor;

public final class QuartzTelemetry {

public QuartzTelemetry(OpenTelemetry openTelemetry, Scheduler scheduler, boolean captureExperimentalSpanAttributes) {
InstrumenterBuilder<JobExecutionContext, Void> instrumenter = Instrumenter.builder(openTelemetry, INSTRUMENTATION_NAME,
new QuartzSpanNameExtractor());

// TODO: make property for experimental data, by default ON
if (captureExperimentalSpanAttributes) {
instrumenter.addAttributesExtractor(
AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "quartz"));
}
instrumenter.setErrorCauseExtractor(new QuartzErrorCauseExtractor());
instrumenter.addAttributesExtractor(
CodeAttributesExtractor.create(new QuartzCodeAttributesGetter()));

configure(scheduler, new TracingJobListener(instrumenter.buildInstrumenter()));
}

private void configure(Scheduler scheduler, JobListener jobListener) {
Trigger x = null;
try {
for (JobListener listener : scheduler.getListenerManager().getJobListeners()) {
if (listener instanceof TracingJobListener) {
return;
}
}
} catch (SchedulerException e) {
// Ignore
}
try {
// We must pass a matcher to work around a bug in Quartz 2.0.0. It's unlikely anyone uses
// a version before 2.0.2, but it makes muzzle simple.
scheduler.getListenerManager().addJobListener(jobListener, allJobs());
} catch (SchedulerException e) {
throw new IllegalStateException("Could not add JobListener to Scheduler", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.quartz;

import jakarta.enterprise.inject.spi.CDI;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.quarkus.quartz.QuartzScheduler;

public final class TracingJobListener implements JobListener {

private static final VirtualField<JobExecutionContext, ContextAndScope> contextVirtualField = VirtualField
.find(JobExecutionContext.class, ContextAndScope.class);

private final Instrumenter<JobExecutionContext, Void> instrumenter;

TracingJobListener(Instrumenter<JobExecutionContext, Void> instrumenter) {
this.instrumenter = instrumenter;
}

@Override
public String getName() {
return TracingJobListener.class.getName();
}

@Override
public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {
// TODO: Consider adding an attribute for vetoed jobs.
}

@Override
public void jobToBeExecuted(JobExecutionContext job) {
// if (!getScheduler().isTriggerProgrammatic(job.getTrigger().getKey())) {
// return;
// }
System.err.println("START: " + System.nanoTime());

Context parentCtx = Context.current();
if (!instrumenter.shouldStart(parentCtx, job)) {
return;
}

Context context = instrumenter.start(parentCtx, job);

// Listeners are executed synchronously on the same thread starting here.
// https://github.com/quartz-scheduler/quartz/blob/quartz-2.0.x/quartz/src/main/java/org/quartz/core/JobRunShell.java#L180
// However, if a listener before this one throws an exception in wasExecuted, we won't be
// executed. Library instrumentation users need to make sure other listeners don't throw
// exceptions.
Scope scope = context.makeCurrent();
contextVirtualField.set(job, new ContextAndScope(context, scope));
}

@Override
public void jobWasExecuted(JobExecutionContext job, JobExecutionException error) {
// if (!getScheduler().isTriggerProgrammatic(job.getTrigger().getKey())) {
// System.err.println("ANOTACE");
// return;
// }
System.err.println("END: " + System.nanoTime());
ContextAndScope contextAndScope = contextVirtualField.get(job);
if (contextAndScope == null) {
// Would only happen if we didn't start a span (maybe a previous joblistener threw an
// exception before ours could process the start event).
return;
}

contextAndScope.closeScope();
instrumenter.end(contextAndScope.getContext(), job, null, error);
}

private static final class ContextAndScope {
private final Context context;
private final Scope scope;

ContextAndScope(Context context, Scope scope) {
this.context = context;
this.scope = scope;
}

Context getContext() {
return context;
}

void closeScope() {
scope.close();
}
}

private QuartzScheduler getScheduler() {
return CDI.current().select(QuartzScheduler.class).get();
}
}
11 changes: 11 additions & 0 deletions extensions/quartz/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
-->
</dependency>

<!-- OpenTelemetry tracing -->
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ public interface QuartzScheduler extends Scheduler {
* @return the underlying {@link org.quartz.Scheduler} instance, or {@code null} if the scheduler was not started
*/
org.quartz.Scheduler getScheduler();

}
Loading

0 comments on commit bb57965

Please sign in to comment.