diff --git a/src/OpenTelemetry.Exporter.Prometheus/.publicApi/net461/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Exporter.Prometheus/.publicApi/net461/PublicAPI.Unshipped.txt index 2327332329e..70bd44b3df7 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/.publicApi/net461/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry.Exporter.Prometheus/.publicApi/net461/PublicAPI.Unshipped.txt @@ -8,6 +8,8 @@ OpenTelemetry.Exporter.PrometheusExporterOptions.HttpListenerPrefixes.set -> voi OpenTelemetry.Exporter.PrometheusExporterOptions.PrometheusExporterOptions() -> void OpenTelemetry.Exporter.PrometheusExporterOptions.ScrapeEndpointPath.get -> string OpenTelemetry.Exporter.PrometheusExporterOptions.ScrapeEndpointPath.set -> void +OpenTelemetry.Exporter.PrometheusExporterOptions.ScrapeResponseCacheDurationMilliseconds.get -> int +OpenTelemetry.Exporter.PrometheusExporterOptions.ScrapeResponseCacheDurationMilliseconds.set -> void OpenTelemetry.Exporter.PrometheusExporterOptions.StartHttpListener.get -> bool OpenTelemetry.Exporter.PrometheusExporterOptions.StartHttpListener.set -> void OpenTelemetry.Metrics.PrometheusExporterMeterProviderBuilderExtensions diff --git a/src/OpenTelemetry.Exporter.Prometheus/.publicApi/netcoreapp3.1/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Exporter.Prometheus/.publicApi/netcoreapp3.1/PublicAPI.Unshipped.txt index ba9a12cdc47..fa2cedde662 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/.publicApi/netcoreapp3.1/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry.Exporter.Prometheus/.publicApi/netcoreapp3.1/PublicAPI.Unshipped.txt @@ -9,6 +9,8 @@ OpenTelemetry.Exporter.PrometheusExporterOptions.HttpListenerPrefixes.set -> voi OpenTelemetry.Exporter.PrometheusExporterOptions.PrometheusExporterOptions() -> void OpenTelemetry.Exporter.PrometheusExporterOptions.ScrapeEndpointPath.get -> string OpenTelemetry.Exporter.PrometheusExporterOptions.ScrapeEndpointPath.set -> void +OpenTelemetry.Exporter.PrometheusExporterOptions.ScrapeResponseCacheDurationMilliseconds.get -> int +OpenTelemetry.Exporter.PrometheusExporterOptions.ScrapeResponseCacheDurationMilliseconds.set -> void OpenTelemetry.Exporter.PrometheusExporterOptions.StartHttpListener.get -> bool OpenTelemetry.Exporter.PrometheusExporterOptions.StartHttpListener.set -> void OpenTelemetry.Metrics.PrometheusExporterMeterProviderBuilderExtensions diff --git a/src/OpenTelemetry.Exporter.Prometheus/CHANGELOG.md b/src/OpenTelemetry.Exporter.Prometheus/CHANGELOG.md index ff68b89701b..6e4c164b0c5 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/CHANGELOG.md +++ b/src/OpenTelemetry.Exporter.Prometheus/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +* Added scrape endpoint response caching feature & + `ScrapeResponseCacheDurationMilliseconds` option + ([#2610](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2610)) + ## 1.2.0-beta1 Released 2021-Oct-08 diff --git a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusCollectionManager.cs b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusCollectionManager.cs new file mode 100644 index 00000000000..6430143c779 --- /dev/null +++ b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusCollectionManager.cs @@ -0,0 +1,216 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Exporter.Prometheus +{ + internal sealed class PrometheusCollectionManager + { + private readonly PrometheusExporter exporter; + private readonly int scrapeResponseCacheDurationInMilliseconds; + private readonly Func, ExportResult> onCollectRef; + private byte[] buffer = new byte[85000]; // encourage the object to live in LOH (large object heap) + private int globalLockState; + private ArraySegment previousDataView; + private DateTime? previousDataViewExpirationAtUtc; + private int readerCount; + private bool collectionRunning; + private TaskCompletionSource> collectionTcs; + + public PrometheusCollectionManager(PrometheusExporter exporter) + { + this.exporter = exporter; + this.scrapeResponseCacheDurationInMilliseconds = this.exporter.Options.ScrapeResponseCacheDurationMilliseconds; + this.onCollectRef = this.OnCollect; + } + +#if NETCOREAPP3_1_OR_GREATER + public ValueTask> EnterCollect() +#else + public Task> EnterCollect() +#endif + { + this.EnterGlobalLock(); + + // If we are within {ScrapeResponseCacheDurationMilliseconds} of the + // last successful collect, return the previous view. + if (this.previousDataViewExpirationAtUtc.HasValue && this.previousDataViewExpirationAtUtc >= DateTime.UtcNow) + { + Interlocked.Increment(ref this.readerCount); + this.ExitGlobalLock(); +#if NETCOREAPP3_1_OR_GREATER + return new ValueTask>(this.previousDataView); +#else + return Task.FromResult(this.previousDataView); +#endif + } + + // If a collection is already running, return a task to wait on the result. + if (this.collectionRunning) + { + if (this.collectionTcs == null) + { + this.collectionTcs = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); + } + + Interlocked.Increment(ref this.readerCount); + this.ExitGlobalLock(); +#if NETCOREAPP3_1_OR_GREATER + return new ValueTask>(this.collectionTcs.Task); +#else + return this.collectionTcs.Task; +#endif + } + + this.WaitForReadersToComplete(); + + // Start a collection on the current thread. + this.collectionRunning = true; + this.previousDataViewExpirationAtUtc = null; + Interlocked.Increment(ref this.readerCount); + this.ExitGlobalLock(); + + bool result = this.ExecuteCollect(); + if (result && this.scrapeResponseCacheDurationInMilliseconds > 0) + { + this.previousDataViewExpirationAtUtc = DateTime.UtcNow.AddMilliseconds(this.scrapeResponseCacheDurationInMilliseconds); + } + + this.EnterGlobalLock(); + + this.collectionRunning = false; + + if (this.collectionTcs != null) + { + this.collectionTcs.SetResult(this.previousDataView); + this.collectionTcs = null; + } + + this.ExitGlobalLock(); + +#if NETCOREAPP3_1_OR_GREATER + return new ValueTask>(this.previousDataView); +#else + return Task.FromResult(this.previousDataView); +#endif + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ExitCollect() + { + Interlocked.Decrement(ref this.readerCount); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void EnterGlobalLock() + { + SpinWait lockWait = default; + while (true) + { + if (Interlocked.CompareExchange(ref this.globalLockState, 1, this.globalLockState) != 0) + { + lockWait.SpinOnce(); + continue; + } + + break; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ExitGlobalLock() + { + this.globalLockState = 0; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void WaitForReadersToComplete() + { + SpinWait readWait = default; + while (true) + { + if (Interlocked.CompareExchange(ref this.readerCount, 0, this.readerCount) != 0) + { + readWait.SpinOnce(); + continue; + } + + break; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool ExecuteCollect() + { + this.exporter.OnExport = this.onCollectRef; + bool result = this.exporter.Collect(Timeout.Infinite); + this.exporter.OnExport = null; + return result; + } + + private ExportResult OnCollect(Batch metrics) + { + int cursor = 0; + + try + { + foreach (var metric in metrics) + { + while (true) + { + try + { + cursor = PrometheusSerializer.WriteMetric(this.buffer, cursor, metric); + break; + } + catch (IndexOutOfRangeException) + { + int bufferSize = this.buffer.Length * 2; + + // there are two cases we might run into the following condition: + // 1. we have many metrics to be exported - in this case we probably want + // to put some upper limit and allow the user to configure it. + // 2. we got an IndexOutOfRangeException which was triggered by some other + // code instead of the buffer[cursor++] - in this case we should give up + // at certain point rather than allocating like crazy. + if (bufferSize > 100 * 1024 * 1024) + { + throw; + } + + var newBuffer = new byte[bufferSize]; + this.buffer.CopyTo(newBuffer, 0); + this.buffer = newBuffer; + } + } + } + + this.previousDataView = new ArraySegment(this.buffer, 0, cursor); + return ExportResult.Success; + } + catch (Exception) + { + this.previousDataView = new ArraySegment(Array.Empty(), 0, 0); + return ExportResult.Failure; + } + } + } +} diff --git a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterHttpServer.cs b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterHttpServer.cs index d60d8552a91..eb81bc3dd02 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterHttpServer.cs +++ b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterHttpServer.cs @@ -117,9 +117,6 @@ public void Dispose() private void WorkerProc() { - var bufferSize = 85000; // encourage the object to live in LOH (large object heap) - var buffer = new byte[bufferSize]; - this.httpListener.Start(); try @@ -131,74 +128,7 @@ private void WorkerProc() ctxTask.Wait(this.tokenSource.Token); var ctx = ctxTask.Result; - try - { - ctx.Response.StatusCode = 200; - ctx.Response.Headers.Add("Server", string.Empty); - ctx.Response.ContentType = "text/plain; charset=utf-8; version=0.0.4"; - - this.exporter.OnExport = (metrics) => - { - try - { - var cursor = 0; - foreach (var metric in metrics) - { - while (true) - { - try - { - cursor = PrometheusSerializer.WriteMetric(buffer, cursor, metric); - break; - } - catch (IndexOutOfRangeException) - { - bufferSize = bufferSize * 2; - - // there are two cases we might run into the following condition: - // 1. we have many metrics to be exported - in this case we probably want - // to put some upper limit and allow the user to configure it. - // 2. we got an IndexOutOfRangeException which was triggered by some other - // code instead of the buffer[cursor++] - in this case we should give up - // at certain point rather than allocating like crazy. - if (bufferSize > 100 * 1024 * 1024) - { - throw; - } - - var newBuffer = new byte[bufferSize]; - buffer.CopyTo(newBuffer, 0); - buffer = newBuffer; - } - } - } - - ctx.Response.OutputStream.Write(buffer, 0, cursor - 0); - return ExportResult.Success; - } - catch (Exception) - { - return ExportResult.Failure; - } - }; - - this.exporter.Collect(Timeout.Infinite); - this.exporter.OnExport = null; - } - catch (Exception ex) - { - PrometheusExporterEventSource.Log.FailedExport(ex); - - ctx.Response.StatusCode = 500; - } - - try - { - ctx.Response.Close(); - } - catch - { - } + Task.Run(() => this.ProcessRequestAsync(ctx)); } } catch (OperationCanceledException ex) @@ -218,5 +148,46 @@ private void WorkerProc() } } } + + private async Task ProcessRequestAsync(HttpListenerContext context) + { + try + { + var data = await this.exporter.CollectionManager.EnterCollect().ConfigureAwait(false); + try + { + if (data.Count > 0) + { + context.Response.StatusCode = 200; + context.Response.Headers.Add("Server", string.Empty); + context.Response.ContentType = "text/plain; charset=utf-8; version=0.0.4"; + + await context.Response.OutputStream.WriteAsync(data.Array, 0, data.Count).ConfigureAwait(false); + } + else + { + throw new InvalidOperationException("Collection failure."); + } + } + finally + { + this.exporter.CollectionManager.ExitCollect(); + } + } + catch (Exception ex) + { + PrometheusExporterEventSource.Log.FailedExport(ex); + + context.Response.StatusCode = 500; + } + + try + { + context.Response.Close(); + } + catch + { + } + } } } diff --git a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMiddleware.cs b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMiddleware.cs index ee870cf8f65..42b9fc4458d 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMiddleware.cs +++ b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMiddleware.cs @@ -17,8 +17,6 @@ #if NETCOREAPP3_1_OR_GREATER using System; using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Http; using OpenTelemetry.Internal; @@ -50,6 +48,11 @@ public PrometheusExporterMiddleware(MeterProvider meterProvider, RequestDelegate this.exporter = exporter; } + internal PrometheusExporterMiddleware(PrometheusExporter exporter) + { + this.exporter = exporter; + } + /// /// Invoke. /// @@ -61,38 +64,26 @@ public async Task InvokeAsync(HttpContext httpContext) var response = httpContext.Response; - if (!this.exporter.TryEnterSemaphore()) - { - response.StatusCode = 429; - return; - } - - var buffer = new byte[65536]; - var count = 0; - - this.exporter.OnExport = (metrics) => + try { + var data = await this.exporter.CollectionManager.EnterCollect().ConfigureAwait(false); try { - count = PrometheusSerializer.WriteMetrics(buffer, 0, metrics); - return ExportResult.Success; - } - catch (Exception ex) - { - PrometheusExporterEventSource.Log.FailedExport(ex); - return ExportResult.Failure; - } - }; + if (data.Count > 0) + { + response.StatusCode = 200; + response.ContentType = "text/plain; charset=utf-8; version=0.0.4"; - try - { - if (this.exporter.Collect(Timeout.Infinite)) - { - await WriteMetricsToResponse(buffer, count, response).ConfigureAwait(false); + await response.Body.WriteAsync(data.Array, 0, data.Count).ConfigureAwait(false); + } + else + { + throw new InvalidOperationException("Collection failure."); + } } - else + finally { - response.StatusCode = 500; + this.exporter.CollectionManager.ExitCollect(); } } catch (Exception ex) @@ -103,22 +94,9 @@ public async Task InvokeAsync(HttpContext httpContext) response.StatusCode = 500; } } - finally - { - this.exporter.ReleaseSemaphore(); - } this.exporter.OnExport = null; } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static async Task WriteMetricsToResponse(byte[] buffer, int count, HttpResponse response) - { - response.StatusCode = 200; - response.ContentType = "text/plain; charset=utf-8; version=0.0.4"; - - await response.Body.WriteAsync(buffer, 0, count).ConfigureAwait(false); - } } } #endif diff --git a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializerExt.cs b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializerExt.cs index 8ad32d8d058..1e638501dc7 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializerExt.cs +++ b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializerExt.cs @@ -66,19 +66,25 @@ public static int WriteMetric(byte[] buffer, int cursor, Metric metric) // Counter and Gauge cursor = WriteMetricName(buffer, cursor, metric.Name, metric.Unit); - buffer[cursor++] = unchecked((byte)'{'); - for (var i = 0; i < keys.Length; i++) + int numberOfKeys = keys?.Length ?? 0; + if (numberOfKeys > 0) { - if (i > 0) + buffer[cursor++] = unchecked((byte)'{'); + + for (var i = 0; i < keys.Length; i++) { - buffer[cursor++] = unchecked((byte)','); + if (i > 0) + { + buffer[cursor++] = unchecked((byte)','); + } + + cursor = WriteLabel(buffer, cursor, keys[i], values[i]); } - cursor = WriteLabel(buffer, cursor, keys[i], values[i]); + buffer[cursor++] = unchecked((byte)'}'); } - buffer[cursor++] = unchecked((byte)'}'); buffer[cursor++] = unchecked((byte)' '); if (((int)metric.MetricType & 0b_0000_1111) == 0x0a /* I8 */) diff --git a/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporter.cs b/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporter.cs index 30be8f412b9..9470b31e67a 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporter.cs +++ b/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporter.cs @@ -15,7 +15,6 @@ // using System; -using System.Threading; using OpenTelemetry.Exporter.Prometheus; using OpenTelemetry.Metrics; @@ -31,7 +30,6 @@ public class PrometheusExporter : BaseExporter, IPullMetricExporter internal const string HttpListenerStartFailureExceptionMessage = "PrometheusExporter http listener could not be started."; internal readonly PrometheusExporterOptions Options; internal Batch Metrics; // TODO: this is no longer needed, we can remove it later - private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1); private readonly PrometheusExporterHttpServer metricsHttpServer; private Func funcCollect; private Func, ExportResult> funcExport; @@ -57,6 +55,8 @@ public PrometheusExporter(PrometheusExporterOptions options) throw new InvalidOperationException(HttpListenerStartFailureExceptionMessage, ex); } } + + this.CollectionManager = new PrometheusCollectionManager(this); } public Func Collect @@ -71,21 +71,13 @@ internal Func, ExportResult> OnExport set => this.funcExport = value; } + internal PrometheusCollectionManager CollectionManager { get; } + public override ExportResult Export(in Batch metrics) { return this.OnExport(metrics); } - internal bool TryEnterSemaphore() - { - return this.semaphore.Wait(0); - } - - internal void ReleaseSemaphore() - { - this.semaphore.Release(); - } - protected override void Dispose(bool disposing) { if (!this.disposed) @@ -93,7 +85,6 @@ protected override void Dispose(bool disposing) if (disposing) { this.metricsHttpServer?.Dispose(); - this.semaphore.Dispose(); } this.disposed = true; diff --git a/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporterOptions.cs b/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporterOptions.cs index adc4780015f..cea95b37c04 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporterOptions.cs +++ b/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporterOptions.cs @@ -27,6 +27,8 @@ public class PrometheusExporterOptions internal const string DefaultScrapeEndpointPath = "/metrics"; internal Func GetUtcNowDateTimeOffset = () => DateTimeOffset.UtcNow; + private int scrapeResponseCacheDurationMilliseconds = 10 * 1000; + #if NETCOREAPP3_1_OR_GREATER /// /// Gets or sets a value indicating whether or not an http listener @@ -51,5 +53,25 @@ public class PrometheusExporterOptions /// Gets or sets the path to use for the scraping endpoint. Default value: /metrics. /// public string ScrapeEndpointPath { get; set; } = DefaultScrapeEndpointPath; + + /// + /// Gets or sets the cache duration in milliseconds for scrape responses. Default value: 10,000 (10 seconds). + /// + /// + /// Note: Specify 0 to disable response caching. + /// + public int ScrapeResponseCacheDurationMilliseconds + { + get => this.scrapeResponseCacheDurationMilliseconds; + set + { + if (value < 0) + { + throw new ArgumentOutOfRangeException(nameof(value), "Value should be greater than or equal to zero."); + } + + this.scrapeResponseCacheDurationMilliseconds = value; + } + } } } diff --git a/src/OpenTelemetry.Exporter.Prometheus/README.md b/src/OpenTelemetry.Exporter.Prometheus/README.md index 29f1468e709..bdc5f849f8e 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/README.md +++ b/src/OpenTelemetry.Exporter.Prometheus/README.md @@ -92,6 +92,11 @@ properties: either the http listener or the middleware registered by `UseOpenTelemetryPrometheusScrapingEndpoint`. Default value: `/metrics`. +* `ScrapeResponseCacheDurationMilliseconds`: Configures scrape endpoint response + caching. Multiple scrape requests within the cache duration time period will + receive the same previously generated response. The default value is 10 + seconds. Set to 0 to disable response caching. + See [`TestPrometheusExporter.cs`](../../examples/Console/TestPrometheusExporter.cs) for example use. diff --git a/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusCollectionManagerTests.cs b/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusCollectionManagerTests.cs new file mode 100644 index 00000000000..36d2aa087de --- /dev/null +++ b/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusCollectionManagerTests.cs @@ -0,0 +1,136 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Diagnostics.Metrics; +#if NET461 +using System.Linq; +#endif +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry.Metrics; +using OpenTelemetry.Tests; +using Xunit; + +namespace OpenTelemetry.Exporter.Prometheus.Tests +{ + public sealed class PrometheusCollectionManagerTests + { + [Fact] + public async Task EnterExitCollectTest() + { + using var meter = new Meter(Utils.GetCurrentMethodName()); + + using (var provider = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter.Name) + .AddPrometheusExporter() + .Build()) + { + if (!provider.TryFindExporter(out PrometheusExporter exporter)) + { + throw new InvalidOperationException("PrometheusExporter could not be found on MeterProvider."); + } + + int runningCollectCount = 0; + var collectFunc = exporter.Collect; + exporter.Collect = (timeout) => + { + bool result = collectFunc(timeout); + runningCollectCount++; + Thread.Sleep(5000); + return result; + }; + + var counter = meter.CreateCounter("counter_int", description: "Prometheus help text goes here \n escaping."); + counter.Add(100); + + Task[] collectTasks = new Task[10]; + for (int i = 0; i < collectTasks.Length; i++) + { + collectTasks[i] = Task.Run(async () => + { + ArraySegment data = await exporter.CollectionManager.EnterCollect().ConfigureAwait(false); + try + { + return data.ToArray(); + } + finally + { + exporter.CollectionManager.ExitCollect(); + } + }); + } + + await Task.WhenAll(collectTasks).ConfigureAwait(false); + + Assert.Equal(1, runningCollectCount); + + byte[] firstBatch = collectTasks[0].Result; + for (int i = 1; i < collectTasks.Length; i++) + { + Assert.Equal(firstBatch, collectTasks[i].Result); + } + + counter.Add(100); + + // This should use the cache and ignore the second counter update. + var task = exporter.CollectionManager.EnterCollect(); + Assert.True(task.IsCompleted); + ArraySegment data = await task.ConfigureAwait(false); + try + { + Assert.Equal(1, runningCollectCount); + Assert.Equal(firstBatch, data); + } + finally + { + exporter.CollectionManager.ExitCollect(); + } + + Thread.Sleep(exporter.Options.ScrapeResponseCacheDurationMilliseconds); + + counter.Add(100); + + for (int i = 0; i < collectTasks.Length; i++) + { + collectTasks[i] = Task.Run(async () => + { + ArraySegment data = await exporter.CollectionManager.EnterCollect().ConfigureAwait(false); + try + { + return data.ToArray(); + } + finally + { + exporter.CollectionManager.ExitCollect(); + } + }); + } + + await Task.WhenAll(collectTasks).ConfigureAwait(false); + + Assert.Equal(2, runningCollectCount); + Assert.NotEqual(firstBatch, collectTasks[0].Result); + + firstBatch = collectTasks[0].Result; + for (int i = 1; i < collectTasks.Length; i++) + { + Assert.Equal(firstBatch, collectTasks[i].Result); + } + } + } + } +}