Skip to content

Commit

Permalink
Tidy up jfr-streaming (#127)
Browse files Browse the repository at this point in the history
* Tidy up jfr-streaming

* Network read and write use same instruments

* Restore init method and call with Meter
  • Loading branch information
jack-berg authored Dec 2, 2021
1 parent 08ecf83 commit cd2c935
Show file tree
Hide file tree
Showing 21 changed files with 401 additions and 468 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import io.opentelemetry.contrib.jfr.metrics.internal.network.NetworkReadHandler;
import io.opentelemetry.contrib.jfr.metrics.internal.network.NetworkWriteHandler;
import java.util.*;
import java.util.stream.Stream;

final class HandlerRegistry {
private static final String SCHEMA_URL = "https://opentelemetry.io/schemas/1.6.1";
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.jfr";
private static final String INSTRUMENTATION_VERSION = "1.7.0-SNAPSHOT";

Expand All @@ -33,28 +31,31 @@ private HandlerRegistry(List<? extends RecordedEventHandler> mappers) {
}

static HandlerRegistry createDefault(MeterProvider meterProvider) {
var otelMeter = meterProvider.get(INSTRUMENTATION_NAME, INSTRUMENTATION_VERSION, null);

var meter =
meterProvider
.meterBuilder(INSTRUMENTATION_NAME)
.setInstrumentationVersion(INSTRUMENTATION_VERSION)
.build();
var grouper = new ThreadGrouper();
var filtered =
var handlers =
List.of(
new ObjectAllocationInNewTLABHandler(otelMeter, grouper),
new ObjectAllocationOutsideTLABHandler(otelMeter, grouper),
new NetworkReadHandler(otelMeter, grouper),
new NetworkWriteHandler(otelMeter, grouper),
new G1GarbageCollectionHandler(otelMeter),
new GCHeapSummaryHandler(otelMeter),
new ContextSwitchRateHandler(otelMeter),
new OverallCPULoadHandler(otelMeter),
new ContainerConfigurationHandler(otelMeter),
new LongLockHandler(otelMeter, grouper));
filtered.forEach(RecordedEventHandler::init);

return new HandlerRegistry(filtered);
new ObjectAllocationInNewTLABHandler(grouper),
new ObjectAllocationOutsideTLABHandler(grouper),
new NetworkReadHandler(grouper),
new NetworkWriteHandler(grouper),
new G1GarbageCollectionHandler(),
new GCHeapSummaryHandler(),
new ContextSwitchRateHandler(),
new OverallCPULoadHandler(),
new ContainerConfigurationHandler(),
new LongLockHandler(grouper));
handlers.forEach(handler -> handler.initializeMeter(meter));

return new HandlerRegistry(handlers);
}

/** @return a stream of all entries in this registry. */
Stream<RecordedEventHandler> all() {
return mappers.stream();
/** @return all entries in this registry. */
List<RecordedEventHandler> all() {
return mappers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.contrib.jfr.metrics.internal.RecordedEventHandler;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import jdk.jfr.EventSettings;
import jdk.jfr.consumer.RecordingStream;

/** The entry point class for the JFR-over-OpenTelemetry support. */
Expand All @@ -34,21 +32,18 @@ public static void enable(MeterProvider meterProvider) {
jfrMonitorService.submit(
() -> {
try (var recordingStream = new RecordingStream()) {
var enableMappedEvent = eventEnablerFor(recordingStream);
toMetricRegistry.all().forEach(enableMappedEvent);
toMetricRegistry.all().forEach(handler -> enableHandler(recordingStream, handler));
recordingStream.setReuse(false);
logger.log(Level.FINE, "Starting recording stream...");
recordingStream.start(); // run forever
}
});
}

private static Consumer<RecordedEventHandler> eventEnablerFor(RecordingStream recordingStream) {
return handler -> {
EventSettings eventSettings = recordingStream.enable(handler.getEventName());
handler.getPollingDuration().ifPresent(eventSettings::withPeriod);
handler.getThreshold().ifPresent(eventSettings::withThreshold);
recordingStream.onEvent(handler.getEventName(), handler);
};
private static void enableHandler(RecordingStream recordingStream, RecordedEventHandler handler) {
var eventSettings = recordingStream.enable(handler.getEventName());
handler.getPollingDuration().ifPresent(eventSettings::withPeriod);
handler.getThreshold().ifPresent(eventSettings::withThreshold);
recordingStream.onEvent(handler.getEventName(), handler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,30 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import jdk.jfr.consumer.RecordedEvent;

public abstract class AbstractThreadDispatchingHandler implements RecordedEventHandler {
// Will need pruning code for fast-cycling thread frameworks to prevent memory leaks
protected final Map<String, RecordedEventHandler> perThread = new HashMap<>();
protected final ThreadGrouper grouper;
private final Map<String, Consumer<RecordedEvent>> perThread = new HashMap<>();
private final ThreadGrouper grouper;

public AbstractThreadDispatchingHandler(ThreadGrouper grouper) {
this.grouper = grouper;
}

public void reset() {
perThread.clear();
}

public abstract String getEventName();

public abstract RecordedEventHandler createPerThreadSummarizer(String threadName);
public abstract Consumer<RecordedEvent> createPerThreadSummarizer(String threadName);

@Override
public void accept(RecordedEvent ev) {
final Optional<String> possibleGroupedThreadName = grouper.groupedName(ev);
possibleGroupedThreadName.ifPresent(
groupedThreadName -> {
perThread.computeIfAbsent(groupedThreadName, name -> createPerThreadSummarizer(name));
perThread.get(groupedThreadName).accept(ev);
});
grouper
.groupedName(ev)
.ifPresent(
groupedThreadName ->
perThread
.computeIfAbsent(groupedThreadName, this::createPerThreadSummarizer)
.accept(ev));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@ private Constants() {}
public static final String BYTES = "B";
public static final String MILLISECONDS = "ms";
public static final String PERCENTAGE = "%age";
public static final String READ = "read";
public static final String WRITE = "write";
public static final String USER = "user";
public static final String SYSTEM = "system";
public static final String MACHINE = "machine.total";
public static final String G1 = "g1";
public static final String USED = "used";
public static final String COMMITTED = "committed";

public static final String NETWORK_BYTES_NAME = "runtime.jvm.network.io";
public static final String NETWORK_BYTES_DESCRIPTION = "Network read/write bytes";
public static final String NETWORK_DURATION_NAME = "runtime.jvm.network.duration";
public static final String NETWORK_DURATION_DESCRIPTION = "Network read/write duration";
public static final String NETWORK_MODE_READ = "read";
public static final String NETWORK_MODE_WRITE = "write";

public static final AttributeKey<String> ATTR_THREAD_NAME = AttributeKey.stringKey("thread.name");
public static final AttributeKey<String> ATTR_ARENA_NAME = AttributeKey.stringKey("arena");
public static final AttributeKey<String> ATTR_NETWORK_MODE = AttributeKey.stringKey("mode");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.contrib.jfr.metrics.internal;

import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.internal.NoopMeter;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
Expand All @@ -31,6 +33,14 @@ default boolean test(RecordedEvent event) {
return event.getEventType().getName().equalsIgnoreCase(getEventName());
}

/**
* Set the OpenTelemetry {@link Meter} after the SDK has been initialized. Until called,
* implementations should use instruments from {@link NoopMeter}.
*
* @param meter the meter
*/
void initializeMeter(Meter meter);

/**
* Optionally returns a polling duration for JFR events, if present
*
Expand All @@ -50,13 +60,4 @@ default Optional<Duration> getPollingDuration() {
default Optional<Duration> getThreshold() {
return Optional.empty();
}

/**
* Initialize the handler. Default implementation is a no-op
*
* @return
*/
default RecordedEventHandler init() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordedThread;

public class ThreadGrouper {
public final class ThreadGrouper {
// FIXME doesn't actually do any grouping, but should be safe for now
public Optional<String> groupedName(RecordedEvent ev) {
Object thisField = ev.getValue("eventThread");
if (thisField != null && thisField instanceof RecordedThread) {
if (thisField instanceof RecordedThread) {
return Optional.of(((RecordedThread) thisField).getJavaName());
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.contrib.jfr.metrics.internal.Constants.ONE;

import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.internal.NoopMeter;
import io.opentelemetry.contrib.jfr.metrics.internal.RecordedEventHandler;
import jdk.jfr.consumer.RecordedEvent;

Expand All @@ -17,21 +18,19 @@ public final class ContainerConfigurationHandler implements RecordedEventHandler

private static final String EFFECTIVE_CPU_COUNT = "effectiveCpuCount";

private final Meter otelMeter;
private volatile long value = 0L;

public ContainerConfigurationHandler(Meter otelMeter) {
this.otelMeter = otelMeter;
public ContainerConfigurationHandler() {
initializeMeter(NoopMeter.getInstance());
}

public ContainerConfigurationHandler init() {
otelMeter
@Override
public void initializeMeter(Meter meter) {
meter
.upDownCounterBuilder(METRIC_NAME)
.ofDoubles()
.setUnit(ONE)
.buildWithCallback(codm -> codm.observe(value));

return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.contrib.jfr.metrics.internal.Constants.HERTZ;

import io.opentelemetry.api.metrics.*;
import io.opentelemetry.api.metrics.internal.NoopMeter;
import io.opentelemetry.contrib.jfr.metrics.internal.RecordedEventHandler;
import java.time.Duration;
import java.util.Optional;
Expand All @@ -17,20 +18,10 @@ public final class ContextSwitchRateHandler implements RecordedEventHandler {
private static final String EVENT_NAME = "jdk.ThreadContextSwitchRate";
private static final String METRIC_NAME = "runtime.jvm.cpu.context_switch";

private final Meter otelMeter;
private volatile double value = 0;

public ContextSwitchRateHandler(Meter otelMeter) {
this.otelMeter = otelMeter;
}

public ContextSwitchRateHandler init() {
otelMeter
.upDownCounterBuilder(METRIC_NAME)
.ofDoubles()
.setUnit(HERTZ)
.buildWithCallback(codm -> codm.observe(value));
return this;
public ContextSwitchRateHandler() {
initializeMeter(NoopMeter.getInstance());
}

@Override
Expand All @@ -42,6 +33,15 @@ public String getEventName() {
return EVENT_NAME;
}

@Override
public void initializeMeter(Meter meter) {
meter
.upDownCounterBuilder(METRIC_NAME)
.ofDoubles()
.setUnit(HERTZ)
.buildWithCallback(codm -> codm.observe(value));
}

@Override
public Optional<Duration> getPollingDuration() {
return Optional.of(Duration.ofSeconds(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,38 @@
import static io.opentelemetry.contrib.jfr.metrics.internal.Constants.ATTR_THREAD_NAME;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BoundDoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.internal.NoopMeter;
import io.opentelemetry.contrib.jfr.metrics.internal.AbstractThreadDispatchingHandler;
import io.opentelemetry.contrib.jfr.metrics.internal.Constants;
import io.opentelemetry.contrib.jfr.metrics.internal.RecordedEventHandler;
import io.opentelemetry.contrib.jfr.metrics.internal.ThreadGrouper;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
import jdk.jfr.consumer.RecordedEvent;

public final class LongLockHandler extends AbstractThreadDispatchingHandler {
static final String EVENT_NAME = "jdk.JavaMonitorWait";
private final Meter otelMeter;
private static final String EVENT_NAME = "jdk.JavaMonitorWait";
private static final String METRIC_NAME = "runtime.jvm.cpu.longlock.time";
private static final String DESCRIPTION = "Long lock times";

public LongLockHandler(Meter otelMeter, ThreadGrouper grouper) {
private DoubleHistogram histogram;

public LongLockHandler(ThreadGrouper grouper) {
super(grouper);
this.otelMeter = otelMeter;
initializeMeter(NoopMeter.getInstance());
}

@Override
public void initializeMeter(Meter meter) {
histogram =
meter
.histogramBuilder(METRIC_NAME)
.setDescription(DESCRIPTION)
.setUnit(Constants.MILLISECONDS)
.build();
}

@Override
Expand All @@ -33,18 +48,32 @@ public String getEventName() {
}

@Override
public RecordedEventHandler createPerThreadSummarizer(String threadName) {
var attr = Attributes.of(ATTR_THREAD_NAME, threadName);
var builder = otelMeter.histogramBuilder(METRIC_NAME);
builder.setDescription(DESCRIPTION);
builder.setUnit(Constants.MILLISECONDS);
var histogram = builder.build().bind(attr);
var ret = new PerThreadLongLockHandler(histogram, threadName);
return ret.init();
public Consumer<RecordedEvent> createPerThreadSummarizer(String threadName) {
return new PerThreadLongLockHandler(histogram, threadName);
}

@Override
public Optional<Duration> getThreshold() {
return Optional.empty();
}

private static class PerThreadLongLockHandler implements Consumer<RecordedEvent> {
private static final String EVENT_THREAD = "eventThread";

private final BoundDoubleHistogram boundHistogram;

public PerThreadLongLockHandler(DoubleHistogram histogram, String threadName) {
this.boundHistogram = histogram.bind(Attributes.of(ATTR_THREAD_NAME, threadName));
}

@Override
public void accept(RecordedEvent recordedEvent) {
if (recordedEvent.hasField(EVENT_THREAD)) {
boundHistogram.record(recordedEvent.getDuration().toMillis());
}
// What about the class name in MONITOR_CLASS ?
// We can get a stack trace from the thread on the event
// var eventThread = recordedEvent.getThread(EVENT_THREAD);
}
}
}
Loading

0 comments on commit cd2c935

Please sign in to comment.