diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java index a1ca3adf4d2a2..b25d1d7d9271c 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java @@ -44,13 +44,14 @@ public List> getSettings() { OTelTelemetrySettings.TRACER_EXPORTER_BATCH_SIZE_SETTING, OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING, OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING, - OTelTelemetrySettings.OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING + OTelTelemetrySettings.OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING, + OTelTelemetrySettings.TRACER_HEAD_SAMPLER_SAMPLING_RATIO ); } @Override public Optional getTelemetry(TelemetrySettings settings) { - return Optional.of(telemetry()); + return Optional.of(telemetry(settings)); } @Override @@ -58,8 +59,8 @@ public String getName() { return OTEL_TRACER_NAME; } - private Telemetry telemetry() { - return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(settings)), new MetricsTelemetry() { + private Telemetry telemetry(TelemetrySettings telemetrySettings) { + return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(this.settings, telemetrySettings.getClusterSettings())), new MetricsTelemetry() { }); } diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java index 59c87cca22986..2ed11f3c6a351 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java @@ -9,7 +9,9 @@ package org.opensearch.telemetry; import org.opensearch.SpecialPermission; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory; @@ -29,7 +31,7 @@ public final class OTelTelemetrySettings { * Base Constructor. */ private OTelTelemetrySettings() {} - + private volatile double samplingProbability; /** * span exporter batch size */ @@ -60,6 +62,34 @@ private OTelTelemetrySettings() {} Setting.Property.Final ); + /** + * Probability of sampler + */ + public static final Setting TRACER_HEAD_SAMPLER_SAMPLING_RATIO = Setting.doubleSetting( + "telemetry.otel.tracer.head.sample.sampling.ratio", + 0.001d, + 0.000d, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Constructor + * @param settings setting + * @param clusterSettings cluster settings + */ + public OTelTelemetrySettings(Settings settings, ClusterSettings clusterSettings) { + this.samplingProbability = TRACER_HEAD_SAMPLER_SAMPLING_RATIO.get(settings); + clusterSettings.addSettingsUpdateConsumer(TRACER_HEAD_SAMPLER_SAMPLING_RATIO, this::setSamplingProbability); + } + + /** + * Get sampling ratio + */ + public double getTracerHeadSamplerSamplingRatio(){ + return samplingProbability; + } + /** * Span Exporter type setting. */ @@ -83,4 +113,12 @@ private OTelTelemetrySettings() {} Setting.Property.NodeScope, Setting.Property.Final ); + + /** + * Set sampling ratio + * @param samplingProbability double + */ + public void setSamplingProbability(double samplingProbability) { + this.samplingProbability = samplingProbability; + } } diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java index 1ec4818b8b73e..54dfaf6e57dbe 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java @@ -9,6 +9,8 @@ package org.opensearch.telemetry.tracing; import org.opensearch.common.settings.Settings; +import org.opensearch.telemetry.OTelTelemetrySettings; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory; import java.util.concurrent.TimeUnit; @@ -24,6 +26,8 @@ import io.opentelemetry.sdk.trace.export.SpanExporter; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import org.opensearch.telemetry.tracing.sampler.HeadSampler; +import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler; import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_BATCH_SIZE_SETTING; import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING; @@ -38,14 +42,15 @@ private OTelResourceProvider() {} /** * Creates OpenTelemetry instance with default configuration * @param settings cluster settings + * @param clusterSettings cluster settings * @return OpenTelemetry instance */ - public static OpenTelemetry get(Settings settings) { + public static OpenTelemetry get(Settings settings, ClusterSettings clusterSettings) { return get( settings, OTelSpanExporterFactory.create(settings), ContextPropagators.create(W3CTraceContextPropagator.getInstance()), - Sampler.alwaysOn() + Sampler.parentBased(new HeadSampler(new ProbabilisticSampler(new OTelTelemetrySettings(settings, clusterSettings)))) ); } diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/HeadSampler.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/HeadSampler.java new file mode 100644 index 0000000000000..2068e526a0b34 --- /dev/null +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/HeadSampler.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.sampler; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; + +import java.util.List; + +/** + * HeadBased sampler + */ +public class HeadSampler implements Sampler { + private final Sampler defaultSampler; + + /** + * Creates Head based sampler + * @param defaultSampler defaultSampler + */ + public HeadSampler(Sampler defaultSampler) { + this.defaultSampler = defaultSampler; + } + + @Override + public SamplingResult shouldSample( + Context parentContext, + String traceId, + String name, + SpanKind spanKind, + Attributes attributes, + List parentLinks) { + + if (attributes.get(AttributeKey.stringKey("trace")) != null && Boolean.parseBoolean(attributes.get(AttributeKey.stringKey("trace"))) ) { + return SamplingResult.recordAndSample(); + } else { + return defaultSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + } + } + + @Override + public String getDescription() { + return "HeadSampler"; + } + + @Override + public String toString() { + return getDescription(); + } +} diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java new file mode 100644 index 0000000000000..150e41985ca02 --- /dev/null +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java @@ -0,0 +1,75 @@ +package org.opensearch.telemetry.tracing.sampler; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import org.opensearch.telemetry.OTelTelemetrySettings; + +import java.util.List; + +/** + * ProbabilisticSampler implements a head-based sampling strategy based on provided settings. + */ +public class ProbabilisticSampler implements Sampler { + private Sampler defaultSampler; + private final OTelTelemetrySettings oTelTelemetrySettings; + + private double tracerHeadSamplerSamplingRatio; + + /** + * Constructor + * + * @param oTelTelemetrySettings Telemetry settings. + */ + public ProbabilisticSampler(OTelTelemetrySettings oTelTelemetrySettings) { + if (oTelTelemetrySettings == null) { + throw new IllegalArgumentException("oTelTelemetrySettings cannot be null"); + } + this.oTelTelemetrySettings = oTelTelemetrySettings; + this.tracerHeadSamplerSamplingRatio = oTelTelemetrySettings.getTracerHeadSamplerSamplingRatio(); + this.defaultSampler = createProbabilitySampler(tracerHeadSamplerSamplingRatio); + } + + private Sampler createProbabilitySampler(double samplingRatio) { + return Sampler.traceIdRatioBased(samplingRatio); + } + + Sampler getSampler() { + double newSamplingRatio = oTelTelemetrySettings.getTracerHeadSamplerSamplingRatio(); + if(isSamplingRatioChanged(newSamplingRatio)){ + this.tracerHeadSamplerSamplingRatio = newSamplingRatio; + defaultSampler = createProbabilitySampler(tracerHeadSamplerSamplingRatio); + } + return defaultSampler; + } + + private boolean isSamplingRatioChanged(double newSamplingRatio) { + return Double.compare(this.tracerHeadSamplerSamplingRatio, newSamplingRatio)!=0; + } + double getTracerHeadSamplerSamplingRatio() { + return tracerHeadSamplerSamplingRatio; + } + @Override + public SamplingResult shouldSample( + Context parentContext, + String traceId, + String name, + SpanKind spanKind, + Attributes attributes, + List parentLinks) { + return getSampler().shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + } + + @Override + public String getDescription() { + return "Probabilistic Sampler"; + } + + @Override + public String toString() { + return getDescription(); + } +} diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/package-info.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/package-info.java new file mode 100644 index 0000000000000..f8da186c5292d --- /dev/null +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains classes needed for HeadBased Sampler. + */ +package org.opensearch.telemetry.tracing.sampler; diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/OTelTelemetryPluginTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/OTelTelemetryPluginTests.java index 611656942860f..dd917093e6993 100644 --- a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/OTelTelemetryPluginTests.java +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/OTelTelemetryPluginTests.java @@ -29,6 +29,7 @@ import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_BATCH_SIZE_SETTING; import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING; import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING; +import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_HEAD_SAMPLER_SAMPLING_RATIO; public class OTelTelemetryPluginTests extends OpenSearchTestCase { @@ -42,10 +43,15 @@ public void setup() { // io.opentelemetry.sdk.OpenTelemetrySdk.close waits only for 10 seconds for shutdown to complete. Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build(); oTelTracerModulePlugin = new OTelTelemetryPlugin(settings); - telemetry = oTelTracerModulePlugin.getTelemetry(null); + telemetry = oTelTracerModulePlugin.getTelemetry(new TelemetrySettings(settings, new ClusterSettings(settings, getClusterSettings()))); tracingTelemetry = telemetry.get().getTracingTelemetry(); } - + private Set> getClusterSettings() { + Set> allTracerSettings = new HashSet<>(); + allTracerSettings.add(TRACER_HEAD_SAMPLER_SAMPLING_RATIO); + ClusterSettings.FEATURE_FLAGGED_CLUSTER_SETTINGS.get(List.of(FeatureFlags.TELEMETRY)).stream().forEach((allTracerSettings::add)); + return allTracerSettings; + } public void testGetTelemetry() { Set> allTracerSettings = new HashSet<>(); ClusterSettings.FEATURE_FLAGGED_CLUSTER_SETTINGS.get(List.of(FeatureFlags.TELEMETRY)).stream().forEach((allTracerSettings::add)); @@ -56,7 +62,8 @@ public void testGetTelemetry() { TRACER_EXPORTER_BATCH_SIZE_SETTING, TRACER_EXPORTER_DELAY_SETTING, TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING, - OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING + OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING, + TRACER_HEAD_SAMPLER_SAMPLING_RATIO ), oTelTracerModulePlugin.getSettings() ); diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/HeadSamplerTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/HeadSamplerTests.java new file mode 100644 index 0000000000000..d5fb2d7a6dc54 --- /dev/null +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/HeadSamplerTests.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.sampler; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import org.opensearch.test.OpenSearchTestCase; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; +import io.opentelemetry.api.common.Attributes; + +import java.util.Collections; + +public class HeadSamplerTests extends OpenSearchTestCase { + + public void testShouldSampleWithTraceAttributeAsTrue() { + + // Create a mock default sampler + Sampler defaultSampler = mock(Sampler.class); + when(defaultSampler.shouldSample(any(), anyString(), anyString(), any(), any(), any())) + .thenReturn(SamplingResult.drop()); + + // Create an instance of HeadSampler with the mock default sampler + HeadSampler headSampler = new HeadSampler(defaultSampler); + + // Create a mock Context and Attributes + Context parentContext = mock(Context.class); + Attributes attributes = Attributes.of(AttributeKey.stringKey("trace"), "true"); + + // Call shouldSample on HeadSampler + SamplingResult result = headSampler.shouldSample(parentContext, "traceId", "spanName", + SpanKind.INTERNAL, attributes, Collections.emptyList()); + + assertEquals(SamplingResult.recordAndSample(), result); + + // Verify that the default sampler's shouldSample method was not called + verify(defaultSampler, never()).shouldSample(any(), anyString(), anyString(), any(), any(), any()); + } + + + public void testShouldSampleWithoutTraceAttribute() { + + // Create a mock default sampler + Sampler defaultSampler = mock(Sampler.class); + when(defaultSampler.shouldSample(any(), anyString(), anyString(), any(), any(), any())) + .thenReturn(SamplingResult.recordAndSample()); + + // Create an instance of HeadSampler with the mock default sampler + HeadSampler headSampler = new HeadSampler(defaultSampler); + + // Create a mock Context and Attributes + Context parentContext = mock(Context.class); + Attributes attributes = Attributes.empty(); + + // Call shouldSample on HeadSampler + SamplingResult result = headSampler.shouldSample(parentContext, "traceId", "spanName", + SpanKind.INTERNAL, attributes, Collections.emptyList()); + + // Verify that HeadSampler returned SamplingResult.recordAndSample() + assertEquals(SamplingResult.recordAndSample(), result); + + // Verify that the default sampler's shouldSample method was called + verify(defaultSampler).shouldSample(any(), anyString(), anyString(), any(), any(), any()); + } + +} diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSamplerTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSamplerTests.java new file mode 100644 index 0000000000000..91668ef55590d --- /dev/null +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSamplerTests.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.sampler; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import junit.framework.TestCase; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.telemetry.OTelTelemetrySettings; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static io.opentelemetry.sdk.trace.samplers.SamplingDecision.RECORD_AND_SAMPLE; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; +import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING; +import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_HEAD_SAMPLER_SAMPLING_RATIO; + +public class ProbabilisticSamplerTests extends TestCase { + + // When ProbabilisticSampler is created with OTelTelemetrySettings as null + public void testProbabilisticSamplerWithNullSettings() { + // Verify that the constructor throws IllegalArgumentException when given null settings + assertThrows(IllegalArgumentException.class, () -> { + new ProbabilisticSampler(null); + }); + } + + public void testDefaultGetSampler() { + Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build(); + OTelTelemetrySettings oTelTelemetrySettings = new OTelTelemetrySettings(settings, new ClusterSettings(settings, getClusterSettings())); + + // Probabilistic Sampler + ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(oTelTelemetrySettings); + + assertNotNull(probabilisticSampler.getSampler()); + assertEquals(0.001, probabilisticSampler.getTracerHeadSamplerSamplingRatio()); + } + + + public void testGetSamplerWithUpdatedSamplingRatio() { + Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build(); + + OTelTelemetrySettings oTelTelemetrySettings = new OTelTelemetrySettings(settings, new ClusterSettings(settings, getClusterSettings())); + + // Probabilistic Sampler + ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(oTelTelemetrySettings); + assertEquals(0.001, probabilisticSampler.getTracerHeadSamplerSamplingRatio()); + + oTelTelemetrySettings.setSamplingProbability(0.002); + + // Need to call getSampler() to update the value of tracerHeadSamplerSamplingRatio + Sampler updatedProbabilisticSampler = probabilisticSampler.getSampler(); + assertEquals(0.002, probabilisticSampler.getTracerHeadSamplerSamplingRatio()); + } + + private Set> getClusterSettings() { + Set> allTracerSettings = new HashSet<>(); + allTracerSettings.add(TRACER_HEAD_SAMPLER_SAMPLING_RATIO); + return allTracerSettings; + } + +} diff --git a/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java b/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java index 7c9e0d5ac8097..7be0702d3e045 100644 --- a/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java +++ b/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java @@ -16,6 +16,9 @@ * Wrapper class to encapsulate tracing related settings */ public class TelemetrySettings { + + private final Settings settings; + private final ClusterSettings clusterSettings; public static final Setting TRACER_ENABLED_SETTING = Setting.boolSetting( "telemetry.tracer.enabled", false, @@ -26,6 +29,8 @@ public class TelemetrySettings { private volatile boolean tracingEnabled; public TelemetrySettings(Settings settings, ClusterSettings clusterSettings) { + this.clusterSettings = clusterSettings; + this.settings = settings; this.tracingEnabled = TRACER_ENABLED_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(TRACER_ENABLED_SETTING, this::setTracingEnabled); @@ -39,4 +44,11 @@ public boolean isTracingEnabled() { return tracingEnabled; } + public Settings getSettings() { + return settings; + } + + public ClusterSettings getClusterSettings() { + return clusterSettings; + } }