Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy up jfr-streaming #127

Merged
merged 4 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import io.opentelemetry.contrib.jfr.metrics.internal.network.NetworkReadHandler;
import io.opentelemetry.contrib.jfr.metrics.internal.network.NetworkWriteHandler;
import java.util.*;
import java.util.stream.Stream;

final class HandlerRegistry {
private static final String SCHEMA_URL = "https://opentelemetry.io/schemas/1.6.1";
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.jfr";
private static final String INSTRUMENTATION_VERSION = "1.7.0-SNAPSHOT";

Expand All @@ -33,28 +31,31 @@ private HandlerRegistry(List<? extends RecordedEventHandler> mappers) {
}

static HandlerRegistry createDefault(MeterProvider meterProvider) {
var otelMeter = meterProvider.get(INSTRUMENTATION_NAME, INSTRUMENTATION_VERSION, null);

var meter =
meterProvider
.meterBuilder(INSTRUMENTATION_NAME)
.setInstrumentationVersion(INSTRUMENTATION_VERSION)
.build();
var grouper = new ThreadGrouper();
var filtered =
var handlers =
List.of(
new ObjectAllocationInNewTLABHandler(otelMeter, grouper),
new ObjectAllocationOutsideTLABHandler(otelMeter, grouper),
new NetworkReadHandler(otelMeter, grouper),
new NetworkWriteHandler(otelMeter, grouper),
new G1GarbageCollectionHandler(otelMeter),
new GCHeapSummaryHandler(otelMeter),
new ContextSwitchRateHandler(otelMeter),
new OverallCPULoadHandler(otelMeter),
new ContainerConfigurationHandler(otelMeter),
new LongLockHandler(otelMeter, grouper));
filtered.forEach(RecordedEventHandler::init);

return new HandlerRegistry(filtered);
new ObjectAllocationInNewTLABHandler(grouper),
new ObjectAllocationOutsideTLABHandler(grouper),
new NetworkReadHandler(grouper),
new NetworkWriteHandler(grouper),
new G1GarbageCollectionHandler(),
new GCHeapSummaryHandler(),
new ContextSwitchRateHandler(),
new OverallCPULoadHandler(),
new ContainerConfigurationHandler(),
new LongLockHandler(grouper));
handlers.forEach(handler -> handler.initializeMeter(meter));

return new HandlerRegistry(handlers);
}

/** @return a stream of all entries in this registry. */
Stream<RecordedEventHandler> all() {
return mappers.stream();
/** @return all entries in this registry. */
List<RecordedEventHandler> all() {
return mappers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.contrib.jfr.metrics.internal.RecordedEventHandler;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import jdk.jfr.EventSettings;
import jdk.jfr.consumer.RecordingStream;

/** The entry point class for the JFR-over-OpenTelemetry support. */
Expand All @@ -34,21 +32,18 @@ public static void enable(MeterProvider meterProvider) {
jfrMonitorService.submit(
() -> {
try (var recordingStream = new RecordingStream()) {
var enableMappedEvent = eventEnablerFor(recordingStream);
toMetricRegistry.all().forEach(enableMappedEvent);
toMetricRegistry.all().forEach(handler -> enableHandler(recordingStream, handler));
recordingStream.setReuse(false);
logger.log(Level.FINE, "Starting recording stream...");
recordingStream.start(); // run forever
}
});
}

private static Consumer<RecordedEventHandler> eventEnablerFor(RecordingStream recordingStream) {
return handler -> {
EventSettings eventSettings = recordingStream.enable(handler.getEventName());
handler.getPollingDuration().ifPresent(eventSettings::withPeriod);
handler.getThreshold().ifPresent(eventSettings::withThreshold);
recordingStream.onEvent(handler.getEventName(), handler);
};
private static void enableHandler(RecordingStream recordingStream, RecordedEventHandler handler) {
var eventSettings = recordingStream.enable(handler.getEventName());
handler.getPollingDuration().ifPresent(eventSettings::withPeriod);
handler.getThreshold().ifPresent(eventSettings::withThreshold);
recordingStream.onEvent(handler.getEventName(), handler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,30 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import jdk.jfr.consumer.RecordedEvent;

public abstract class AbstractThreadDispatchingHandler implements RecordedEventHandler {
// Will need pruning code for fast-cycling thread frameworks to prevent memory leaks
protected final Map<String, RecordedEventHandler> perThread = new HashMap<>();
protected final ThreadGrouper grouper;
private final Map<String, Consumer<RecordedEvent>> perThread = new HashMap<>();
private final ThreadGrouper grouper;

public AbstractThreadDispatchingHandler(ThreadGrouper grouper) {
this.grouper = grouper;
}

public void reset() {
perThread.clear();
}

public abstract String getEventName();

public abstract RecordedEventHandler createPerThreadSummarizer(String threadName);
public abstract Consumer<RecordedEvent> createPerThreadSummarizer(String threadName);

@Override
public void accept(RecordedEvent ev) {
final Optional<String> possibleGroupedThreadName = grouper.groupedName(ev);
possibleGroupedThreadName.ifPresent(
groupedThreadName -> {
perThread.computeIfAbsent(groupedThreadName, name -> createPerThreadSummarizer(name));
perThread.get(groupedThreadName).accept(ev);
});
grouper
.groupedName(ev)
.ifPresent(
groupedThreadName ->
perThread
.computeIfAbsent(groupedThreadName, this::createPerThreadSummarizer)
.accept(ev));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@ private Constants() {}
public static final String BYTES = "B";
public static final String MILLISECONDS = "ms";
public static final String PERCENTAGE = "%age";
public static final String READ = "read";
public static final String WRITE = "write";
public static final String USER = "user";
public static final String SYSTEM = "system";
public static final String MACHINE = "machine.total";
public static final String G1 = "g1";
public static final String USED = "used";
public static final String COMMITTED = "committed";

public static final String NETWORK_BYTES_NAME = "runtime.jvm.network.io";
public static final String NETWORK_BYTES_DESCRIPTION = "Network read/write bytes";
public static final String NETWORK_DURATION_NAME = "runtime.jvm.network.duration";
public static final String NETWORK_DURATION_DESCRIPTION = "Network read/write duration";
public static final String NETWORK_MODE_READ = "read";
public static final String NETWORK_MODE_WRITE = "write";

public static final AttributeKey<String> ATTR_THREAD_NAME = AttributeKey.stringKey("thread.name");
public static final AttributeKey<String> ATTR_ARENA_NAME = AttributeKey.stringKey("arena");
public static final AttributeKey<String> ATTR_NETWORK_MODE = AttributeKey.stringKey("mode");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.contrib.jfr.metrics.internal;

import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.internal.NoopMeter;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
Expand All @@ -31,6 +33,14 @@ default boolean test(RecordedEvent event) {
return event.getEventType().getName().equalsIgnoreCase(getEventName());
}

/**
* Set the OpenTelemetry {@link Meter} after the SDK has been initialized. Until called,
* implementations should use instruments from {@link NoopMeter}.
*
* @param meter the meter
*/
void initializeMeter(Meter meter);

/**
* Optionally returns a polling duration for JFR events, if present
*
Expand All @@ -50,13 +60,4 @@ default Optional<Duration> getPollingDuration() {
default Optional<Duration> getThreshold() {
return Optional.empty();
}

/**
* Initialize the handler. Default implementation is a no-op
*
* @return
*/
default RecordedEventHandler init() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordedThread;

public class ThreadGrouper {
public final class ThreadGrouper {
// FIXME doesn't actually do any grouping, but should be safe for now
public Optional<String> groupedName(RecordedEvent ev) {
Object thisField = ev.getValue("eventThread");
if (thisField != null && thisField instanceof RecordedThread) {
if (thisField instanceof RecordedThread) {
return Optional.of(((RecordedThread) thisField).getJavaName());
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.contrib.jfr.metrics.internal.Constants.ONE;

import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.internal.NoopMeter;
import io.opentelemetry.contrib.jfr.metrics.internal.RecordedEventHandler;
import jdk.jfr.consumer.RecordedEvent;

Expand All @@ -17,21 +18,19 @@ public final class ContainerConfigurationHandler implements RecordedEventHandler

private static final String EFFECTIVE_CPU_COUNT = "effectiveCpuCount";

private final Meter otelMeter;
private volatile long value = 0L;

public ContainerConfigurationHandler(Meter otelMeter) {
this.otelMeter = otelMeter;
public ContainerConfigurationHandler() {
initializeMeter(NoopMeter.getInstance());
}

public ContainerConfigurationHandler init() {
otelMeter
@Override
public void initializeMeter(Meter meter) {
meter
.upDownCounterBuilder(METRIC_NAME)
.ofDoubles()
.setUnit(ONE)
.buildWithCallback(codm -> codm.observe(value));

return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.contrib.jfr.metrics.internal.Constants.HERTZ;

import io.opentelemetry.api.metrics.*;
import io.opentelemetry.api.metrics.internal.NoopMeter;
import io.opentelemetry.contrib.jfr.metrics.internal.RecordedEventHandler;
import java.time.Duration;
import java.util.Optional;
Expand All @@ -17,20 +18,10 @@ public final class ContextSwitchRateHandler implements RecordedEventHandler {
private static final String EVENT_NAME = "jdk.ThreadContextSwitchRate";
private static final String METRIC_NAME = "runtime.jvm.cpu.context_switch";

private final Meter otelMeter;
private volatile double value = 0;

public ContextSwitchRateHandler(Meter otelMeter) {
this.otelMeter = otelMeter;
}

public ContextSwitchRateHandler init() {
otelMeter
.upDownCounterBuilder(METRIC_NAME)
.ofDoubles()
.setUnit(HERTZ)
.buildWithCallback(codm -> codm.observe(value));
return this;
public ContextSwitchRateHandler() {
initializeMeter(NoopMeter.getInstance());
}

@Override
Expand All @@ -42,6 +33,15 @@ public String getEventName() {
return EVENT_NAME;
}

@Override
public void initializeMeter(Meter meter) {
meter
.upDownCounterBuilder(METRIC_NAME)
.ofDoubles()
.setUnit(HERTZ)
.buildWithCallback(codm -> codm.observe(value));
}

@Override
public Optional<Duration> getPollingDuration() {
return Optional.of(Duration.ofSeconds(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,38 @@
import static io.opentelemetry.contrib.jfr.metrics.internal.Constants.ATTR_THREAD_NAME;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BoundDoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.internal.NoopMeter;
import io.opentelemetry.contrib.jfr.metrics.internal.AbstractThreadDispatchingHandler;
import io.opentelemetry.contrib.jfr.metrics.internal.Constants;
import io.opentelemetry.contrib.jfr.metrics.internal.RecordedEventHandler;
import io.opentelemetry.contrib.jfr.metrics.internal.ThreadGrouper;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
import jdk.jfr.consumer.RecordedEvent;

public final class LongLockHandler extends AbstractThreadDispatchingHandler {
static final String EVENT_NAME = "jdk.JavaMonitorWait";
private final Meter otelMeter;
private static final String EVENT_NAME = "jdk.JavaMonitorWait";
private static final String METRIC_NAME = "runtime.jvm.cpu.longlock.time";
private static final String DESCRIPTION = "Long lock times";

public LongLockHandler(Meter otelMeter, ThreadGrouper grouper) {
private DoubleHistogram histogram;

public LongLockHandler(ThreadGrouper grouper) {
super(grouper);
this.otelMeter = otelMeter;
initializeMeter(NoopMeter.getInstance());
}

@Override
public void initializeMeter(Meter meter) {
histogram =
meter
.histogramBuilder(METRIC_NAME)
.setDescription(DESCRIPTION)
.setUnit(Constants.MILLISECONDS)
.build();
}

@Override
Expand All @@ -33,18 +48,32 @@ public String getEventName() {
}

@Override
public RecordedEventHandler createPerThreadSummarizer(String threadName) {
var attr = Attributes.of(ATTR_THREAD_NAME, threadName);
var builder = otelMeter.histogramBuilder(METRIC_NAME);
builder.setDescription(DESCRIPTION);
builder.setUnit(Constants.MILLISECONDS);
var histogram = builder.build().bind(attr);
var ret = new PerThreadLongLockHandler(histogram, threadName);
return ret.init();
public Consumer<RecordedEvent> createPerThreadSummarizer(String threadName) {
return new PerThreadLongLockHandler(histogram, threadName);
}

@Override
public Optional<Duration> getThreshold() {
return Optional.empty();
}

private static class PerThreadLongLockHandler implements Consumer<RecordedEvent> {
private static final String EVENT_THREAD = "eventThread";

private final BoundDoubleHistogram boundHistogram;

public PerThreadLongLockHandler(DoubleHistogram histogram, String threadName) {
this.boundHistogram = histogram.bind(Attributes.of(ATTR_THREAD_NAME, threadName));
}

@Override
public void accept(RecordedEvent recordedEvent) {
if (recordedEvent.hasField(EVENT_THREAD)) {
boundHistogram.record(recordedEvent.getDuration().toMillis());
}
// What about the class name in MONITOR_CLASS ?
// We can get a stack trace from the thread on the event
// var eventThread = recordedEvent.getThread(EVENT_THREAD);
}
}
}
Loading