diff --git a/documentation/schema.json b/documentation/schema.json
index 8e825d0aa7a..0d3fabe6421 100644
--- a/documentation/schema.json
+++ b/documentation/schema.json
@@ -1722,6 +1722,17 @@
"type": "string",
"description": "The name of the egress provider to which the trace is egressed.",
"minLength": 1
+ },
+ "StoppingEvent": {
+ "description": "The event to watch for while collecting the trace, and once observed the trace will be stopped.",
+ "oneOf": [
+ {
+ "type": "null"
+ },
+ {
+ "$ref": "#/definitions/TraceEventFilter"
+ }
+ ]
}
}
},
@@ -1793,6 +1804,36 @@
"Verbose"
]
},
+ "TraceEventFilter": {
+ "type": "object",
+ "additionalProperties": false,
+ "required": [
+ "ProviderName",
+ "EventName"
+ ],
+ "properties": {
+ "ProviderName": {
+ "type": "string",
+ "description": "The event provider that will produce the specified event.",
+ "minLength": 1
+ },
+ "EventName": {
+ "type": "string",
+ "description": "The name of the event, which is a concatenation of the task name and opcode name, if any. The task and opcode names are separated by a '/'. If the event has no opcode, then the event name is just the task name.",
+ "minLength": 1
+ },
+ "PayloadFilter": {
+ "type": [
+ "null",
+ "object"
+ ],
+ "description": "A mapping of event payload field names to their expected value. A subset of the payload fields may be specified.",
+ "additionalProperties": {
+ "type": "string"
+ }
+ }
+ }
+ },
"ExecuteOptions": {
"type": "object",
"additionalProperties": false,
diff --git a/src/Microsoft.Diagnostics.Monitoring.Options/OptionsDisplayStrings.Designer.cs b/src/Microsoft.Diagnostics.Monitoring.Options/OptionsDisplayStrings.Designer.cs
index c8c368690c4..e4bffb6763a 100644
--- a/src/Microsoft.Diagnostics.Monitoring.Options/OptionsDisplayStrings.Designer.cs
+++ b/src/Microsoft.Diagnostics.Monitoring.Options/OptionsDisplayStrings.Designer.cs
@@ -715,6 +715,15 @@ public static string DisplayAttributeDescription_CollectTraceOptions_RequestRund
}
}
+ ///
+ /// Looks up a localized string similar to The event to watch for while collecting the trace, and once observed the trace will be stopped..
+ ///
+ public static string DisplayAttributeDescription_CollectTraceOptions_StoppingEvent {
+ get {
+ return ResourceManager.GetString("DisplayAttributeDescription_CollectTraceOptions_StoppingEvent", resourceCulture);
+ }
+ }
+
///
/// Looks up a localized string similar to Buffer size used when copying data from an egress callback returning a stream to the egress callback that is provided a stream to which data is written..
///
@@ -1391,6 +1400,33 @@ public static string DisplayAttributeDescription_ThreadpoolQueueLengthOptions_Le
}
}
+ ///
+ /// Looks up a localized string similar to The name of the event, which is a concatenation of the task name and opcode name, if any. The task and opcode names are separated by a '/'. If the event has no opcode, then the event name is just the task name..
+ ///
+ public static string DisplayAttributeDescription_TraceEventFilter_EventName {
+ get {
+ return ResourceManager.GetString("DisplayAttributeDescription_TraceEventFilter_EventName", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to A mapping of event payload field names to their expected value. A subset of the payload fields may be specified..
+ ///
+ public static string DisplayAttributeDescription_TraceEventFilter_PayloadFilter {
+ get {
+ return ResourceManager.GetString("DisplayAttributeDescription_TraceEventFilter_PayloadFilter", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to The event provider that will produce the specified event..
+ ///
+ public static string DisplayAttributeDescription_TraceEventFilter_ProviderName {
+ get {
+ return ResourceManager.GetString("DisplayAttributeDescription_TraceEventFilter_ProviderName", resourceCulture);
+ }
+ }
+
///
/// Looks up a localized string similar to The {0} field, {1} field, or {2} field is required..
///
diff --git a/src/Microsoft.Diagnostics.Monitoring.Options/OptionsDisplayStrings.resx b/src/Microsoft.Diagnostics.Monitoring.Options/OptionsDisplayStrings.resx
index 68cba82be5f..c5c6e2fcd41 100644
--- a/src/Microsoft.Diagnostics.Monitoring.Options/OptionsDisplayStrings.resx
+++ b/src/Microsoft.Diagnostics.Monitoring.Options/OptionsDisplayStrings.resx
@@ -729,4 +729,20 @@
The name of the egress provider to which the call stacks are egressed.
+
+ The event to watch for while collecting the trace, and once observed the trace will be stopped.
+ The description provided for the StoppingEvent parameter on CollectTraceOptions.
+
+
+ The name of the event, which is a concatenation of the task name and opcode name, if any. The task and opcode names are separated by a '/'. If the event has no opcode, then the event name is just the task name.
+ The description provided for the EventName parameter on TraceEventFilter.
+
+
+ The event provider that will produce the specified event.
+ The description provided for the ProviderName parameter on TraceEventFilter.
+
+
+ A mapping of event payload field names to their expected value. A subset of the payload fields may be specified.
+ The description provided for the PayloadFilter parameter on TraceEventFilter.
+
\ No newline at end of file
diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/EventMonitoringPassthroughStream.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/EventMonitoringPassthroughStream.cs
new file mode 100644
index 00000000000..7312162b2cb
--- /dev/null
+++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/EventMonitoringPassthroughStream.cs
@@ -0,0 +1,260 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Tracing;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.WebApi
+{
+ ///
+ /// A stream that can monitor an event stream which is compatible with
+ /// for a specific event while also passing along the event data to a destination stream.
+ ///
+ public sealed class EventMonitoringPassthroughStream : Stream
+ {
+ private readonly Action _onPayloadFilterMismatch;
+ private readonly Action _onEvent;
+ private readonly bool _callOnEventOnlyOnce;
+
+ private readonly Stream _sourceStream;
+ private readonly Stream _destinationStream;
+ private EventPipeEventSource _eventSource;
+
+ private readonly string _providerName;
+ private readonly string _eventName;
+
+ // The original payload filter of fieldName->fieldValue specified by the user. It will only be used to hydrate _payloadFilterIndexCache.
+ private readonly IDictionary _payloadFilter;
+
+ // This tracks the exact indices into the provided event's payload to check for the expected values instead
+ // of repeatedly searching the payload for the field names in _payloadFilter.
+ private Dictionary _payloadFilterIndexCache;
+
+
+ ///
+ /// A stream that can monitor an event stream which is compatible with
+ /// for a specific event while also passing along the event data to a destination stream.
+ ///
+ /// The stopping event provider name.
+ /// The stopping event name, which is the concatenation of the task name and opcode name, if set. for more information about the format.
+ /// A mapping of the stopping event payload field names to their expected values. A subset of the payload fields may be specified.
+ /// A callback that will be invoked each time the requested event has been observed.
+ /// A callback that will be invoked if the field names specified in do not match those in the event's manifest.
+ /// The source event stream which is compatible with .
+ /// The destination stream to write events. It must either be full duplex or be write-only.
+ /// The size of the buffer to use when writing to the .
+ /// If true, the provided will only be called for the first matching event.
+ /// If true, the provided will not be automatically closed when this class is.
+ public EventMonitoringPassthroughStream(
+ string providerName,
+ string eventName,
+ IDictionary payloadFilter,
+ Action onEvent,
+ Action onPayloadFilterMismatch,
+ Stream sourceStream,
+ Stream destinationStream,
+ int bufferSize,
+ bool callOnEventOnlyOnce,
+ bool leaveDestinationStreamOpen) : base()
+ {
+ _providerName = providerName;
+ _eventName = eventName;
+ _onEvent = onEvent;
+ _onPayloadFilterMismatch = onPayloadFilterMismatch;
+ _sourceStream = sourceStream;
+ _payloadFilter = payloadFilter;
+ _callOnEventOnlyOnce = callOnEventOnlyOnce;
+
+ // Wrap a buffered stream around the destination stream
+ // to avoid slowing down the event processing with the data
+ // passthrough unless there is significant pressure.
+ _destinationStream = new BufferedStream(
+ leaveDestinationStreamOpen
+ ? new StreamLeaveOpenWrapper(destinationStream)
+ : destinationStream,
+ bufferSize);
+ }
+
+ ///
+ /// Start processing the event stream, monitoring it for the requested event and transferring its data to the specified destination stream.
+ /// This will continue to run until the event stream is complete or a stop is requested, regardless of if the requested event has been observed.
+ ///
+ /// The cancellation token.
+ ///
+ public Task ProcessAsync(CancellationToken token)
+ {
+ return Task.Run(() =>
+ {
+ _eventSource = new EventPipeEventSource(this);
+ token.ThrowIfCancellationRequested();
+ using IDisposable registration = token.Register(() => _eventSource.Dispose());
+
+ _eventSource.Dynamic.AddCallbackForProviderEvent(_providerName, _eventName, TraceEventCallback);
+
+ // The EventPipeEventSource will drive the transferring of data to the destination stream as it processes events.
+ _eventSource.Process();
+ token.ThrowIfCancellationRequested();
+ }, token);
+ }
+
+ ///
+ /// Stops monitoring for the specified stopping event. Data will continue to be written to the provided destination stream.
+ ///
+ private void StopMonitoringForEvent()
+ {
+ _eventSource?.Dynamic.RemoveCallback(TraceEventCallback);
+ }
+
+ private void TraceEventCallback(TraceEvent obj)
+ {
+ if (_payloadFilterIndexCache == null && !HydratePayloadFilterCache(obj))
+ {
+ // The payload filter doesn't map onto the actual data,
+ // we'll never match the event so stop checking it
+ // and proceed with just transferring the data to the destination stream.
+ StopMonitoringForEvent();
+ _onPayloadFilterMismatch(obj);
+ return;
+ }
+
+ if (!DoesPayloadMatch(obj))
+ {
+ return;
+ }
+
+ if (_callOnEventOnlyOnce)
+ {
+ StopMonitoringForEvent();
+ }
+
+ _onEvent(obj);
+ }
+
+ ///
+ /// Hydrates the payload filter cache.
+ ///
+ /// An instance of the stopping event (matching provider, task name, and opcode), but without checking the payload yet.
+ ///
+ private bool HydratePayloadFilterCache(TraceEvent obj)
+ {
+ if (_payloadFilterIndexCache != null)
+ {
+ return true;
+ }
+
+ // If there's no payload filter, there's nothing to do.
+ if (_payloadFilter == null || _payloadFilter.Count == 0)
+ {
+ _payloadFilterIndexCache = new Dictionary(capacity: 0);
+ return true;
+ }
+
+ // If the payload has fewer fields than the requested filter, we can never match it.
+ // NOTE: this function will only ever be called with an instance of the stopping event
+ // (matching provider, task name, and opcode) but without checking the payload yet.
+ if (obj.PayloadNames.Length < _payloadFilter.Count)
+ {
+ return false;
+ }
+
+ Dictionary payloadFilterCache = new(capacity: _payloadFilter.Count);
+ for (int i = 0; (i < obj.PayloadNames.Length) && (payloadFilterCache.Count < _payloadFilter.Count); i++)
+ {
+ if (_payloadFilter.TryGetValue(obj.PayloadNames[i], out string payloadValue))
+ {
+ payloadFilterCache.Add(i, payloadValue);
+ }
+ }
+
+ // Check if one or more of the requested filter field names did not exist on the actual payload.
+ if (_payloadFilter.Count != payloadFilterCache.Count)
+ {
+ return false;
+ }
+
+ _payloadFilterIndexCache = payloadFilterCache;
+
+ return true;
+ }
+
+ private bool DoesPayloadMatch(TraceEvent obj)
+ {
+ foreach (var (fieldIndex, expectedValue) in _payloadFilterIndexCache)
+ {
+ string fieldValue = Convert.ToString(obj.PayloadValue(fieldIndex), CultureInfo.InvariantCulture) ?? string.Empty;
+ if (!string.Equals(fieldValue, expectedValue, StringComparison.Ordinal))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ return Read(buffer.AsSpan(offset, count));
+ }
+
+ public override int Read(Span buffer)
+ {
+ int bytesRead = _sourceStream.Read(buffer);
+ if (bytesRead != 0)
+ {
+ _destinationStream.Write(buffer[..bytesRead]);
+ }
+
+ return bytesRead;
+ }
+
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
+
+ public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default)
+ {
+ int bytesRead = await _sourceStream.ReadAsync(buffer, cancellationToken);
+ if (bytesRead != 0)
+ {
+ await _destinationStream.WriteAsync(buffer[..bytesRead], cancellationToken);
+ }
+
+ return bytesRead;
+ }
+
+ public override bool CanSeek => false;
+ public override bool CanWrite => false;
+
+ public override bool CanTimeout => _sourceStream.CanRead;
+ public override bool CanRead => _sourceStream.CanRead;
+ public override long Length => _sourceStream.Length;
+
+ public override long Position { get => _sourceStream.Position; set => _sourceStream.Position = value; }
+ public override int ReadTimeout { get => _sourceStream.ReadTimeout; set => _sourceStream.ReadTimeout = value; }
+
+ public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+ public override void SetLength(long value) => throw new NotSupportedException();
+ public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
+
+ public override void CopyTo(Stream destination, int bufferSize) => throw new NotSupportedException();
+ public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => throw new NotSupportedException();
+
+ public override void Flush() => _destinationStream.Flush();
+ public override Task FlushAsync(CancellationToken cancellationToken) => _destinationStream.FlushAsync(cancellationToken);
+
+ public override async ValueTask DisposeAsync()
+ {
+ _eventSource?.Dispose();
+ await _sourceStream.DisposeAsync();
+ await _destinationStream.DisposeAsync();
+ await base.DisposeAsync();
+ }
+ }
+}
diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/LoggingExtensions.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/LoggingExtensions.cs
index aeb48aca370..d4e936fe6fd 100644
--- a/src/Microsoft.Diagnostics.Monitoring.WebApi/LoggingExtensions.cs
+++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/LoggingExtensions.cs
@@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
+using Microsoft.Diagnostics.Tracing;
using Microsoft.Extensions.Logging;
using System;
@@ -51,6 +52,18 @@ internal static class LoggingExtensions
logLevel: LogLevel.Warning,
formatString: Strings.LogFormatString_DefaultProcessUnexpectedFailure);
+ private static readonly Action _stoppingTraceEventHit =
+ LoggerMessage.Define(
+ eventId: new EventId(8, "StoppingTraceEventHit"),
+ logLevel: LogLevel.Debug,
+ formatString: Strings.LogFormatString_StoppingTraceEventHit);
+
+ private static readonly Action _stoppingTraceEventPayloadFilterMismatch =
+ LoggerMessage.Define(
+ eventId: new EventId(9, "StoppingTraceEventPayloadFilterMismatch"),
+ logLevel: LogLevel.Warning,
+ formatString: Strings.LogFormatString_StoppingTraceEventPayloadFilterMismatch);
+
private static readonly Action _diagnosticRequestFailed =
LoggerMessage.Define(
eventId: new EventId(10, "DiagnosticRequestFailed"),
@@ -98,6 +111,16 @@ public static void DefaultProcessUnexpectedFailure(this ILogger logger, Exceptio
_defaultProcessUnexpectedFailure(logger, ex);
}
+ public static void StoppingTraceEventHit(this ILogger logger, TraceEvent traceEvent)
+ {
+ _stoppingTraceEventHit(logger, traceEvent.ProviderName, traceEvent.EventName, null);
+ }
+
+ public static void StoppingTraceEventPayloadFilterMismatch(this ILogger logger, TraceEvent traceEvent)
+ {
+ _stoppingTraceEventPayloadFilterMismatch(logger, traceEvent.ProviderName, traceEvent.EventName, string.Join(' ', traceEvent.PayloadNames), null);
+ }
+
public static void DiagnosticRequestFailed(this ILogger logger, int processId, Exception ex)
{
_diagnosticRequestFailed(logger, processId, ex);
diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/StreamLeaveOpenWrapper.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/StreamLeaveOpenWrapper.cs
new file mode 100644
index 00000000000..635d87bbf2d
--- /dev/null
+++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/StreamLeaveOpenWrapper.cs
@@ -0,0 +1,74 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.WebApi
+{
+ ///
+ /// Wraps a given stream but leaves it open on Dispose.
+ ///
+ public sealed class StreamLeaveOpenWrapper : Stream
+ {
+ private readonly Stream _baseStream;
+
+ public StreamLeaveOpenWrapper(Stream baseStream) : base()
+ {
+ _baseStream = baseStream;
+ }
+
+ public override bool CanSeek => _baseStream.CanSeek;
+
+ public override bool CanTimeout => _baseStream.CanTimeout;
+
+ public override bool CanRead => _baseStream.CanRead;
+
+ public override bool CanWrite => _baseStream.CanWrite;
+
+ public override long Length => _baseStream.Length;
+
+ public override long Position { get => _baseStream.Position; set => _baseStream.Position = value; }
+
+ public override int ReadTimeout { get => _baseStream.ReadTimeout; set => _baseStream.ReadTimeout = value; }
+
+ public override int WriteTimeout { get => _baseStream.WriteTimeout; set => _baseStream.WriteTimeout = value; }
+
+ public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin);
+
+ public override int Read(Span buffer) => _baseStream.Read(buffer);
+
+ public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count);
+
+ public override int ReadByte() => _baseStream.ReadByte();
+
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _baseStream.ReadAsync(buffer, offset, count, cancellationToken);
+
+ public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => _baseStream.ReadAsync(buffer, cancellationToken);
+
+ public override void Flush() => _baseStream.Flush();
+
+ public override void SetLength(long value) => _baseStream.SetLength(value);
+
+ public override void Write(byte[] buffer, int offset, int count) => _baseStream.Write(buffer, offset, count);
+
+ public override void Write(ReadOnlySpan buffer) => _baseStream.Write(buffer);
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _baseStream.WriteAsync(buffer, offset, count, cancellationToken);
+
+ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => _baseStream.WriteAsync(buffer, cancellationToken);
+
+ public override void WriteByte(byte value) => _baseStream.WriteByte(value);
+
+ public override Task FlushAsync(CancellationToken cancellationToken) => _baseStream.FlushAsync(cancellationToken);
+
+ public override void CopyTo(Stream destination, int bufferSize) => _baseStream.CopyTo(destination, bufferSize);
+
+ public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => _baseStream.CopyToAsync(destination, bufferSize, cancellationToken);
+
+ public override async ValueTask DisposeAsync() => await base.DisposeAsync();
+ }
+}
diff --git a/src/Microsoft.Diagnostics.Monitoring.WebApi/Utilities/TraceUtilities.cs b/src/Microsoft.Diagnostics.Monitoring.WebApi/Utilities/TraceUtilities.cs
index 69e7294d0c4..abee5032299 100644
--- a/src/Microsoft.Diagnostics.Monitoring.WebApi/Utilities/TraceUtilities.cs
+++ b/src/Microsoft.Diagnostics.Monitoring.WebApi/Utilities/TraceUtilities.cs
@@ -14,6 +14,14 @@ namespace Microsoft.Diagnostics.Monitoring.WebApi
{
internal static class TraceUtilities
{
+ // Buffer size matches FileStreamResult
+ private const int DefaultBufferSize = 0x10000;
+
+ public static string GenerateTraceFileName(IEndpointInfo endpointInfo)
+ {
+ return FormattableString.Invariant($"{Utilities.GetFileNameTimeStampUtcNow()}_{endpointInfo.ProcessId}.nettrace");
+ }
+
public static MonitoringSourceConfiguration GetTraceConfiguration(Models.TraceProfile profile, float metricsIntervalSeconds)
{
var configurations = new List();
@@ -65,5 +73,73 @@ public static MonitoringSourceConfiguration GetTraceConfiguration(Models.EventPi
requestRundown: requestRundown,
bufferSizeInMB: bufferSizeInMB);
}
+
+ public static async Task CaptureTraceAsync(TaskCompletionSource