Skip to content

Commit

Permalink
Expand RecordingInstrucments to support collection of observers (elas…
Browse files Browse the repository at this point in the history
…tic#112195)

The support is needed for RecordingInstruments to be used in tests for
guages with a collection of observers.

Relates: elastic#110630
  • Loading branch information
ywangd authored and cbuescher committed Sep 4, 2024
1 parent e3f1830 commit 30c4bef
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.telemetry.metric.LongUpDownCounter;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -53,7 +54,7 @@ public String getName() {
}
}

protected interface NumberWithAttributesObserver extends Supplier<Tuple<Number, Map<String, Object>>> {
protected interface NumberWithAttributesObserver extends Supplier<Collection<Tuple<Number, Map<String, Object>>>> {

}

Expand All @@ -74,7 +75,7 @@ public void run() {
return;
}
var observation = observer.get();
call(observation.v1(), observation.v2());
observation.forEach(o -> call(o.v1(), o.v2()));
}
}

Expand Down Expand Up @@ -109,10 +110,10 @@ public void incrementBy(double inc, Map<String, Object> attributes) {
}

public static class RecordingDoubleGauge extends CallbackRecordingInstrument implements DoubleGauge {
public RecordingDoubleGauge(String name, Supplier<DoubleWithAttributes> observer, MetricRecorder<Instrument> recorder) {
public RecordingDoubleGauge(String name, Supplier<Collection<DoubleWithAttributes>> observer, MetricRecorder<Instrument> recorder) {
super(name, () -> {
var observation = observer.get();
return new Tuple<>(observation.value(), observation.attributes());
return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList();
}, recorder);
}
}
Expand Down Expand Up @@ -172,32 +173,40 @@ public void incrementBy(long inc, Map<String, Object> attributes) {

public static class RecordingAsyncLongCounter extends CallbackRecordingInstrument implements LongAsyncCounter {

public RecordingAsyncLongCounter(String name, Supplier<LongWithAttributes> observer, MetricRecorder<Instrument> recorder) {
public RecordingAsyncLongCounter(
String name,
Supplier<Collection<LongWithAttributes>> observer,
MetricRecorder<Instrument> recorder
) {
super(name, () -> {
var observation = observer.get();
return new Tuple<>(observation.value(), observation.attributes());
return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList();
}, recorder);
}

}

public static class RecordingAsyncDoubleCounter extends CallbackRecordingInstrument implements DoubleAsyncCounter {

public RecordingAsyncDoubleCounter(String name, Supplier<DoubleWithAttributes> observer, MetricRecorder<Instrument> recorder) {
public RecordingAsyncDoubleCounter(
String name,
Supplier<Collection<DoubleWithAttributes>> observer,
MetricRecorder<Instrument> recorder
) {
super(name, () -> {
var observation = observer.get();
return new Tuple<>(observation.value(), observation.attributes());
return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList();
}, recorder);
}

}

public static class RecordingLongGauge extends CallbackRecordingInstrument implements LongGauge {

public RecordingLongGauge(String name, Supplier<LongWithAttributes> observer, MetricRecorder<Instrument> recorder) {
public RecordingLongGauge(String name, Supplier<Collection<LongWithAttributes>> observer, MetricRecorder<Instrument> recorder) {
super(name, () -> {
var observation = observer.get();
return new Tuple<>(observation.value(), observation.attributes());
return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList();
}, recorder);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -72,9 +73,7 @@ protected DoubleUpDownCounter buildDoubleUpDownCounter(String name, String descr

@Override
public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
DoubleGauge instrument = buildDoubleGauge(name, description, unit, observer);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
return registerDoublesGauge(name, description, unit, () -> Collections.singleton(observer.get()));
}

@Override
Expand All @@ -84,15 +83,22 @@ public DoubleGauge registerDoublesGauge(
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
throw new UnsupportedOperationException("not implemented");
DoubleGauge instrument = buildDoubleGauge(name, description, unit, observer);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
}

@Override
public DoubleGauge getDoubleGauge(String name) {
return (DoubleGauge) recorder.getInstrument(InstrumentType.DOUBLE_GAUGE, name);
}

protected DoubleGauge buildDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
protected DoubleGauge buildDoubleGauge(
String name,
String description,
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
return new RecordingInstruments.RecordingDoubleGauge(name, observer, recorder);
}

Expand Down Expand Up @@ -121,9 +127,7 @@ public LongCounter registerLongCounter(String name, String description, String u

@Override
public LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
LongAsyncCounter instrument = new RecordingInstruments.RecordingAsyncLongCounter(name, observer, recorder);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
return registerLongsAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get()));
}

@Override
Expand All @@ -133,7 +137,9 @@ public LongAsyncCounter registerLongsAsyncCounter(
String unit,
Supplier<Collection<LongWithAttributes>> observer
) {
throw new UnsupportedOperationException("not implemented");
LongAsyncCounter instrument = new RecordingInstruments.RecordingAsyncLongCounter(name, observer, recorder);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
}

@Override
Expand All @@ -148,9 +154,7 @@ public DoubleAsyncCounter registerDoubleAsyncCounter(
String unit,
Supplier<DoubleWithAttributes> observer
) {
DoubleAsyncCounter instrument = new RecordingInstruments.RecordingAsyncDoubleCounter(name, observer, recorder);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
return registerDoublesAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get()));
}

@Override
Expand All @@ -160,7 +164,9 @@ public DoubleAsyncCounter registerDoublesAsyncCounter(
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
throw new UnsupportedOperationException("not implemented");
DoubleAsyncCounter instrument = new RecordingInstruments.RecordingAsyncDoubleCounter(name, observer, recorder);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
}

@Override
Expand Down Expand Up @@ -196,22 +202,22 @@ protected LongUpDownCounter buildLongUpDownCounter(String name, String descripti

@Override
public LongGauge registerLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
LongGauge instrument = buildLongGauge(name, description, unit, observer);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
return registerLongsGauge(name, description, unit, () -> Collections.singleton(observer.get()));
}

@Override
public LongGauge registerLongsGauge(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
throw new UnsupportedOperationException("not implemented");
LongGauge instrument = buildLongGauge(name, description, unit, observer);
recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit);
return instrument;
}

@Override
public LongGauge getLongGauge(String name) {
return (LongGauge) recorder.getInstrument(InstrumentType.LONG_GAUGE, name);
}

protected LongGauge buildLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
protected LongGauge buildLongGauge(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
return new RecordingInstruments.RecordingLongGauge(name, observer, recorder);
}

Expand Down

0 comments on commit 30c4bef

Please sign in to comment.