Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand RecordingInstrucments to support collection of observers #112195

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.telemetry.metric.LongUpDownCounter;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -53,7 +54,7 @@ public String getName() {
}
}

protected interface NumberWithAttributesObserver extends Supplier<Tuple<Number, Map<String, Object>>> {
protected interface NumberWithAttributesObserver extends Supplier<Collection<Tuple<Number, Map<String, Object>>>> {

}

Expand All @@ -74,7 +75,7 @@ public void run() {
return;
}
var observation = observer.get();
call(observation.v1(), observation.v2());
observation.forEach(o -> call(o.v1(), o.v2()));
}
}

Expand Down Expand Up @@ -109,10 +110,10 @@ public void incrementBy(double inc, Map<String, Object> attributes) {
}

public static class RecordingDoubleGauge extends CallbackRecordingInstrument implements DoubleGauge {
public RecordingDoubleGauge(String name, Supplier<DoubleWithAttributes> observer, MetricRecorder<Instrument> recorder) {
public RecordingDoubleGauge(String name, Supplier<Collection<DoubleWithAttributes>> observer, MetricRecorder<Instrument> recorder) {
super(name, () -> {
var observation = observer.get();
return new Tuple<>(observation.value(), observation.attributes());
return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList();
}, recorder);
}
}
Expand Down Expand Up @@ -172,32 +173,40 @@ public void incrementBy(long inc, Map<String, Object> attributes) {

public static class RecordingAsyncLongCounter extends CallbackRecordingInstrument implements LongAsyncCounter {

public RecordingAsyncLongCounter(String name, Supplier<LongWithAttributes> observer, MetricRecorder<Instrument> recorder) {
public RecordingAsyncLongCounter(
String name,
Supplier<Collection<LongWithAttributes>> observer,
MetricRecorder<Instrument> recorder
) {
super(name, () -> {
var observation = observer.get();
return new Tuple<>(observation.value(), observation.attributes());
return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList();
}, recorder);
}

}

public static class RecordingAsyncDoubleCounter extends CallbackRecordingInstrument implements DoubleAsyncCounter {

public RecordingAsyncDoubleCounter(String name, Supplier<DoubleWithAttributes> observer, MetricRecorder<Instrument> recorder) {
public RecordingAsyncDoubleCounter(
String name,
Supplier<Collection<DoubleWithAttributes>> observer,
MetricRecorder<Instrument> recorder
) {
super(name, () -> {
var observation = observer.get();
return new Tuple<>(observation.value(), observation.attributes());
return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList();
}, recorder);
}

}

public static class RecordingLongGauge extends CallbackRecordingInstrument implements LongGauge {

public RecordingLongGauge(String name, Supplier<LongWithAttributes> observer, MetricRecorder<Instrument> recorder) {
public RecordingLongGauge(String name, Supplier<Collection<LongWithAttributes>> observer, MetricRecorder<Instrument> recorder) {
super(name, () -> {
var observation = observer.get();
return new Tuple<>(observation.value(), observation.attributes());
return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList();
}, recorder);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -72,9 +73,7 @@ protected DoubleUpDownCounter buildDoubleUpDownCounter(String name, String descr

@Override
public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
DoubleGauge instrument = buildDoubleGauge(name, description, unit, observer);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
return registerDoublesGauge(name, description, unit, () -> Collections.singleton(observer.get()));
}

@Override
Expand All @@ -84,15 +83,22 @@ public DoubleGauge registerDoublesGauge(
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
throw new UnsupportedOperationException("not implemented");
DoubleGauge instrument = buildDoubleGauge(name, description, unit, observer);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
}

@Override
public DoubleGauge getDoubleGauge(String name) {
return (DoubleGauge) recorder.getInstrument(InstrumentType.DOUBLE_GAUGE, name);
}

protected DoubleGauge buildDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
protected DoubleGauge buildDoubleGauge(
String name,
String description,
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
return new RecordingInstruments.RecordingDoubleGauge(name, observer, recorder);
}

Expand Down Expand Up @@ -121,9 +127,7 @@ public LongCounter registerLongCounter(String name, String description, String u

@Override
public LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
LongAsyncCounter instrument = new RecordingInstruments.RecordingAsyncLongCounter(name, observer, recorder);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
return registerLongsAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get()));
}

@Override
Expand All @@ -133,7 +137,9 @@ public LongAsyncCounter registerLongsAsyncCounter(
String unit,
Supplier<Collection<LongWithAttributes>> observer
) {
throw new UnsupportedOperationException("not implemented");
LongAsyncCounter instrument = new RecordingInstruments.RecordingAsyncLongCounter(name, observer, recorder);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
}

@Override
Expand All @@ -148,9 +154,7 @@ public DoubleAsyncCounter registerDoubleAsyncCounter(
String unit,
Supplier<DoubleWithAttributes> observer
) {
DoubleAsyncCounter instrument = new RecordingInstruments.RecordingAsyncDoubleCounter(name, observer, recorder);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
return registerDoublesAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get()));
}

@Override
Expand All @@ -160,7 +164,9 @@ public DoubleAsyncCounter registerDoublesAsyncCounter(
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
throw new UnsupportedOperationException("not implemented");
DoubleAsyncCounter instrument = new RecordingInstruments.RecordingAsyncDoubleCounter(name, observer, recorder);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
}

@Override
Expand Down Expand Up @@ -196,22 +202,22 @@ protected LongUpDownCounter buildLongUpDownCounter(String name, String descripti

@Override
public LongGauge registerLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
LongGauge instrument = buildLongGauge(name, description, unit, observer);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
return registerLongsGauge(name, description, unit, () -> Collections.singleton(observer.get()));
}

@Override
public LongGauge registerLongsGauge(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
throw new UnsupportedOperationException("not implemented");
LongGauge instrument = buildLongGauge(name, description, unit, observer);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
}

@Override
public LongGauge getLongGauge(String name) {
return (LongGauge) recorder.getInstrument(InstrumentType.LONG_GAUGE, name);
}

protected LongGauge buildLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
protected LongGauge buildLongGauge(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
return new RecordingInstruments.RecordingLongGauge(name, observer, recorder);
}

Expand Down