From d14fe7733b2ce361e08c05624668fddbf2763a86 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 27 Aug 2024 17:03:01 +1000 Subject: [PATCH] Expand RecordingInstrucments to support collection of observers (#112195) The support is needed for RecordingInstruments to be used in tests for guages with a collection of observers. Relates: #110630 --- .../telemetry/RecordingInstruments.java | 29 ++++++++----- .../telemetry/RecordingMeterRegistry.java | 42 +++++++++++-------- 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingInstruments.java b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingInstruments.java index 35417c16e7e1c..49e667bb74e5b 100644 --- a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingInstruments.java +++ b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingInstruments.java @@ -24,6 +24,7 @@ import org.elasticsearch.telemetry.metric.LongUpDownCounter; import org.elasticsearch.telemetry.metric.LongWithAttributes; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -53,7 +54,7 @@ public String getName() { } } - protected interface NumberWithAttributesObserver extends Supplier>> { + protected interface NumberWithAttributesObserver extends Supplier>>> { } @@ -74,7 +75,7 @@ public void run() { return; } var observation = observer.get(); - call(observation.v1(), observation.v2()); + observation.forEach(o -> call(o.v1(), o.v2())); } } @@ -109,10 +110,10 @@ public void incrementBy(double inc, Map attributes) { } public static class RecordingDoubleGauge extends CallbackRecordingInstrument implements DoubleGauge { - public RecordingDoubleGauge(String name, Supplier observer, MetricRecorder recorder) { + public RecordingDoubleGauge(String name, Supplier> observer, MetricRecorder recorder) { super(name, () -> { var observation = observer.get(); - return new Tuple<>(observation.value(), observation.attributes()); + return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList(); }, recorder); } } @@ -172,10 +173,14 @@ public void incrementBy(long inc, Map attributes) { public static class RecordingAsyncLongCounter extends CallbackRecordingInstrument implements LongAsyncCounter { - public RecordingAsyncLongCounter(String name, Supplier observer, MetricRecorder recorder) { + public RecordingAsyncLongCounter( + String name, + Supplier> observer, + MetricRecorder recorder + ) { super(name, () -> { var observation = observer.get(); - return new Tuple<>(observation.value(), observation.attributes()); + return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList(); }, recorder); } @@ -183,10 +188,14 @@ public RecordingAsyncLongCounter(String name, Supplier obser public static class RecordingAsyncDoubleCounter extends CallbackRecordingInstrument implements DoubleAsyncCounter { - public RecordingAsyncDoubleCounter(String name, Supplier observer, MetricRecorder recorder) { + public RecordingAsyncDoubleCounter( + String name, + Supplier> observer, + MetricRecorder recorder + ) { super(name, () -> { var observation = observer.get(); - return new Tuple<>(observation.value(), observation.attributes()); + return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList(); }, recorder); } @@ -194,10 +203,10 @@ public RecordingAsyncDoubleCounter(String name, Supplier o public static class RecordingLongGauge extends CallbackRecordingInstrument implements LongGauge { - public RecordingLongGauge(String name, Supplier observer, MetricRecorder recorder) { + public RecordingLongGauge(String name, Supplier> observer, MetricRecorder recorder) { super(name, () -> { var observation = observer.get(); - return new Tuple<>(observation.value(), observation.attributes()); + return observation.stream().map(o -> new Tuple<>((Number) o.value(), o.attributes())).toList(); }, recorder); } } diff --git a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java index 97fe0ad1370ef..392445aa77a8f 100644 --- a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java +++ b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java @@ -24,6 +24,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; import java.util.Collection; +import java.util.Collections; import java.util.function.Supplier; /** @@ -72,9 +73,7 @@ protected DoubleUpDownCounter buildDoubleUpDownCounter(String name, String descr @Override public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier observer) { - DoubleGauge instrument = buildDoubleGauge(name, description, unit, observer); - recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit); - return instrument; + return registerDoublesGauge(name, description, unit, () -> Collections.singleton(observer.get())); } @Override @@ -84,7 +83,9 @@ public DoubleGauge registerDoublesGauge( String unit, Supplier> observer ) { - throw new UnsupportedOperationException("not implemented"); + DoubleGauge instrument = buildDoubleGauge(name, description, unit, observer); + recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit); + return instrument; } @Override @@ -92,7 +93,12 @@ public DoubleGauge getDoubleGauge(String name) { return (DoubleGauge) recorder.getInstrument(InstrumentType.DOUBLE_GAUGE, name); } - protected DoubleGauge buildDoubleGauge(String name, String description, String unit, Supplier observer) { + protected DoubleGauge buildDoubleGauge( + String name, + String description, + String unit, + Supplier> observer + ) { return new RecordingInstruments.RecordingDoubleGauge(name, observer, recorder); } @@ -121,9 +127,7 @@ public LongCounter registerLongCounter(String name, String description, String u @Override public LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier observer) { - LongAsyncCounter instrument = new RecordingInstruments.RecordingAsyncLongCounter(name, observer, recorder); - recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit); - return instrument; + return registerLongsAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get())); } @Override @@ -133,7 +137,9 @@ public LongAsyncCounter registerLongsAsyncCounter( String unit, Supplier> observer ) { - throw new UnsupportedOperationException("not implemented"); + LongAsyncCounter instrument = new RecordingInstruments.RecordingAsyncLongCounter(name, observer, recorder); + recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit); + return instrument; } @Override @@ -148,9 +154,7 @@ public DoubleAsyncCounter registerDoubleAsyncCounter( String unit, Supplier observer ) { - DoubleAsyncCounter instrument = new RecordingInstruments.RecordingAsyncDoubleCounter(name, observer, recorder); - recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit); - return instrument; + return registerDoublesAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get())); } @Override @@ -160,7 +164,9 @@ public DoubleAsyncCounter registerDoublesAsyncCounter( String unit, Supplier> observer ) { - throw new UnsupportedOperationException("not implemented"); + DoubleAsyncCounter instrument = new RecordingInstruments.RecordingAsyncDoubleCounter(name, observer, recorder); + recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit); + return instrument; } @Override @@ -196,14 +202,14 @@ protected LongUpDownCounter buildLongUpDownCounter(String name, String descripti @Override public LongGauge registerLongGauge(String name, String description, String unit, Supplier observer) { - LongGauge instrument = buildLongGauge(name, description, unit, observer); - recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit); - return instrument; + return registerLongsGauge(name, description, unit, () -> Collections.singleton(observer.get())); } @Override public LongGauge registerLongsGauge(String name, String description, String unit, Supplier> observer) { - throw new UnsupportedOperationException("not implemented"); + LongGauge instrument = buildLongGauge(name, description, unit, observer); + recorder.register(instrument, InstrumentType.fromInstrument(instrument), name, description, unit); + return instrument; } @Override @@ -211,7 +217,7 @@ public LongGauge getLongGauge(String name) { return (LongGauge) recorder.getInstrument(InstrumentType.LONG_GAUGE, name); } - protected LongGauge buildLongGauge(String name, String description, String unit, Supplier observer) { + protected LongGauge buildLongGauge(String name, String description, String unit, Supplier> observer) { return new RecordingInstruments.RecordingLongGauge(name, observer, recorder); }