diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.TestCommon/Microsoft.Diagnostics.Monitoring.TestCommon.csproj b/src/Tests/Microsoft.Diagnostics.Monitoring.TestCommon/Microsoft.Diagnostics.Monitoring.TestCommon.csproj
index 6dc2f6f4363..23db369d110 100644
--- a/src/Tests/Microsoft.Diagnostics.Monitoring.TestCommon/Microsoft.Diagnostics.Monitoring.TestCommon.csproj
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.TestCommon/Microsoft.Diagnostics.Monitoring.TestCommon.csproj
@@ -20,6 +20,11 @@
     <ProjectReference Include="..\..\Microsoft.Diagnostics.Monitoring.Options\Microsoft.Diagnostics.Monitoring.Options.csproj" />
   </ItemGroup>
 
+  <ItemGroup>
+    <InternalsVisibleTo Include="Microsoft.Diagnostics.Monitoring.Tool.FunctionalTests"/>
+    <InternalsVisibleTo Include="Microsoft.Diagnostics.Monitoring.Tool.UnitTests"/>
+  </ItemGroup>
+
   <Import Project="GenerateDotNetHost.targets" />
 
 </Project>
\ No newline at end of file
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.TestCommon/TestAppScenarios.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.TestCommon/TestAppScenarios.cs
index d2b6aac76ac..96bb8fb42e9 100644
--- a/src/Tests/Microsoft.Diagnostics.Monitoring.TestCommon/TestAppScenarios.cs
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.TestCommon/TestAppScenarios.cs
@@ -46,5 +46,17 @@ public static class Commands
                 public const string StartLogging = nameof(StartLogging);
             }
         }
+
+        public static class SpinWait
+        {
+            public const string Name = nameof(SpinWait);
+
+            public static class Commands
+            {
+                public const string StartSpin = nameof(StartSpin);
+
+                public const string StopSpin = nameof(StopSpin);
+            }
+        }
     }
 }
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRulePipelineTests.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRulePipelineTests.cs
new file mode 100644
index 00000000000..4ba492b9b75
--- /dev/null
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRulePipelineTests.cs
@@ -0,0 +1,386 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Monitoring.TestCommon;
+using Microsoft.Diagnostics.Monitoring.TestCommon.Options;
+using Microsoft.Diagnostics.Monitoring.TestCommon.Runners;
+using Microsoft.Diagnostics.Monitoring.Tool.UnitTests.CollectionRules.Actions;
+using Microsoft.Diagnostics.Monitoring.Tool.UnitTests.CollectionRules.Triggers;
+using Microsoft.Diagnostics.Monitoring.WebApi;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Actions;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Options;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Options.Triggers;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Triggers;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Microsoft.Diagnostics.Monitoring.Tool.UnitTests
+{
+    public class CollectionRulePipelineTests
+    {
+        private readonly TimeSpan DefaultPipelineTimeout = TimeSpan.FromSeconds(30);
+        private const string TestRuleName = "TestPipelineRule";
+
+        private readonly ITestOutputHelper _outputHelper;
+
+        public CollectionRulePipelineTests(ITestOutputHelper outputHelper)
+        {
+            _outputHelper = outputHelper;
+        }
+
+        /// <summary>
+        /// Test that the pipeline works with the Startup trigger.
+        /// </summary>
+        [Theory]
+        [MemberData(nameof(GetTfmsSupportingPortListener))]
+        public Task CollectionRulePipeline_StartupTriggerTest(TargetFrameworkMoniker appTfm)
+        {
+            CallbackActionService callbackService = new(_outputHelper);
+
+            return ExecuteScenario(
+                appTfm,
+                TestAppScenarios.AsyncWait.Name,
+                TestRuleName,
+                options =>
+                {
+                    options.CreateCollectionRule(TestRuleName)
+                        .SetStartupTrigger()
+                        .AddAction(CallbackAction.ActionName);
+                },
+                async (runner, pipeline) =>
+                {
+                    using CancellationTokenSource cancellationSource = new(DefaultPipelineTimeout);
+
+                    // Register first callback before pipeline starts. This callback should be completed before
+                    // the pipeline finishes starting.
+                    Task callback1Task = callbackService.WaitWithCancellationAsync(cancellationSource.Token);
+
+                    // Startup trigger will cause the the pipeline to complete the start phase
+                    // after the action list has been completed.
+                    await pipeline.StartAsync(cancellationSource.Token);
+
+                    // Register second callback after pipeline starts. The second callback should not be
+                    // completed because it was registered after the pipeline had finished starting. Since
+                    // the action list is only ever executed once and is executed before the pipeline finishes
+                    // starting, thus subsequent invocations of the action list should not occur.
+                    Task callback2Task = callbackService.WaitWithCancellationAsync(cancellationSource.Token);
+
+                    // Since the action list was completed before the pipeline finished starting,
+                    // the action should have invoked it's callback.
+                    using CancellationTokenSource callbackCancellationSource = new(TimeSpan.FromMilliseconds(50));
+                    await callback1Task.WithCancellation(callbackCancellationSource.Token);
+
+                    // Regardless of the action list constraints, the pipeline should have only
+                    // executed the action list once due to the use of a startup trigger.
+                    VerifyExecutionCount(callbackService, 1);
+
+                    // Validate that the action list was not executed a second time.
+                    Assert.False(callback2Task.IsCompletedSuccessfully);
+
+                    // Pipeline should have completed shortly after finished starting. This should only
+                    // wait for a very short time, if at all.
+                    await pipeline.RunAsync(cancellationSource.Token);
+
+                    // Validate that the action list was not executed a second time.
+                    Assert.False(callback2Task.IsCompletedSuccessfully);
+
+                    await runner.SendCommandAsync(TestAppScenarios.AsyncWait.Commands.Continue);
+                },
+                services =>
+                {
+                    services.RegisterTestAction(callbackService);
+                });
+        }
+
+        /// <summary>
+        /// Test that the pipeline works with the EventCounter trigger.
+        /// </summary>
+        [Theory]
+        [MemberData(nameof(GetTfmsSupportingPortListener))]
+        public Task CollectionRulePipeline_EventCounterTriggerTest(TargetFrameworkMoniker appTfm)
+        {
+            CallbackActionService callbackService = new(_outputHelper);
+
+            return ExecuteScenario(
+                appTfm,
+                TestAppScenarios.SpinWait.Name,
+                TestRuleName,
+                options =>
+                {
+                    options.CreateCollectionRule(TestRuleName)
+                        .SetEventCounterTrigger(out EventCounterOptions eventCounterOptions)
+                        .AddAction(CallbackAction.ActionName);
+
+                    // cpu usage greater that 5% for 3 seconds
+                    eventCounterOptions.ProviderName = "System.Runtime";
+                    eventCounterOptions.CounterName = "cpu-usage";
+                    eventCounterOptions.GreaterThan = 5;
+                    eventCounterOptions.SlidingWindowDuration = TimeSpan.FromSeconds(2);
+                },
+                async (runner, pipeline) =>
+                {
+                    using CancellationTokenSource cancellationSource = new(DefaultPipelineTimeout);
+
+                    // Register first callback before pipeline starts. This callback should be completed after
+                    // the pipeline finishes starting.
+                    Task callbackTask = callbackService.WaitWithCancellationAsync(cancellationSource.Token);
+
+                    // Start pipeline with EventCounter trigger.
+                    await pipeline.StartAsync(cancellationSource.Token);
+
+                    await runner.SendCommandAsync(TestAppScenarios.SpinWait.Commands.StartSpin);
+
+                    // This should not complete until the trigger conditions are satisfied for the first time.
+                    await callbackTask;
+
+                    VerifyExecutionCount(callbackService, 1);
+
+                    await runner.SendCommandAsync(TestAppScenarios.SpinWait.Commands.StopSpin);
+
+                    // Validate that the pipeline is not in a completed state.
+                    // The pipeline should already be running since it was started.
+                    Task runTask = pipeline.RunAsync(cancellationSource.Token);
+                    Assert.False(runTask.IsCompleted);
+
+                    await pipeline.StopAsync(cancellationSource.Token);
+                },
+                services =>
+                {
+                    services.RegisterTestAction(callbackService);
+                });
+        }
+
+        /// <summary>
+        /// Test that the CollectionRulePipeline completes to due to rule duration limit.
+        /// </summary>
+        [Theory]
+        [MemberData(nameof(GetTfmsSupportingPortListener))]
+        public Task CollectionRulePipeline_DurationLimitTest(TargetFrameworkMoniker appTfm)
+        {
+            ManualTriggerService triggerService = new();
+            CallbackActionService callbackService = new(_outputHelper);
+
+            return ExecuteScenario(
+                appTfm,
+                TestAppScenarios.AsyncWait.Name,
+                TestRuleName,
+                options =>
+                {
+                    options.CreateCollectionRule(TestRuleName)
+                        .SetManualTrigger()
+                        .AddAction(CallbackAction.ActionName)
+                        .SetDurationLimit(TimeSpan.FromSeconds(3));
+                },
+                async (runner, pipeline) =>
+                {
+                    using CancellationTokenSource cancellationSource = new(DefaultPipelineTimeout);
+
+                    // Pipeline should run to completion due to rule duration limit.
+                    await pipeline.RunAsync(cancellationSource.Token);
+
+                    await runner.SendCommandAsync(TestAppScenarios.AsyncWait.Commands.Continue);
+
+                    // Action list should not have been executed.
+                    VerifyExecutionCount(callbackService, expectedCount: 0);
+                },
+                services =>
+                {
+                    services.RegisterManualTrigger(triggerService);
+                    services.RegisterTestAction(callbackService);
+                });
+        }
+
+        /// <summary>
+        /// Test that the CollectionRulePipeline completes to due to action count limit.
+        /// </summary>
+        [Theory]
+        [MemberData(nameof(GetTfmsSupportingPortListener))]
+        public Task CollectionRulePipeline_ActionCountLimitUnlimitedDurationTest(TargetFrameworkMoniker appTfm)
+        {
+            const int ExpectedActionExecutionCount = 3;
+
+            ManualTriggerService triggerService = new();
+            CallbackActionService callbackService = new(_outputHelper);
+
+            return ExecuteScenario(
+                appTfm,
+                TestAppScenarios.AsyncWait.Name,
+                TestRuleName,
+                options =>
+                {
+                    options.CreateCollectionRule(TestRuleName)
+                        .SetManualTrigger()
+                        .AddAction(CallbackAction.ActionName)
+                        .SetActionLimits(count: ExpectedActionExecutionCount);
+                },
+                async (runner, pipeline) =>
+                {
+                    using CancellationTokenSource cancellationSource = new(DefaultPipelineTimeout);
+
+                    await pipeline.StartAsync(cancellationSource.Token);
+
+                    await ManualTriggerBurstAsync(triggerService);
+
+                    // Pipeline should run to completion due to action count limit without sliding window.
+                    await pipeline.RunAsync(cancellationSource.Token);
+
+                    await runner.SendCommandAsync(TestAppScenarios.AsyncWait.Commands.Continue);
+
+                    // Action list should have been executed the expected number of times
+                    VerifyExecutionCount(callbackService, ExpectedActionExecutionCount);
+                },
+                services =>
+                {
+                    services.RegisterManualTrigger(triggerService);
+                    services.RegisterTestAction(callbackService);
+                });
+        }
+
+        /// <summary>
+        /// Test that the CollectionRulePipeline thottles actions when action count limit is reached within window.
+        /// </summary>
+        [Theory]
+        [MemberData(nameof(GetTfmsSupportingPortListener))]
+        public Task CollectionRulePipeline_ActionCountLimitSlidingDurationTest(TargetFrameworkMoniker appTfm)
+        {
+            const int ExpectedActionExecutionCount = 3;
+            TimeSpan SlidingWindowDuration = TimeSpan.FromSeconds(3);
+
+            ManualTriggerService triggerService = new();
+            CallbackActionService callbackService = new(_outputHelper);
+
+            return ExecuteScenario(
+                appTfm,
+                TestAppScenarios.AsyncWait.Name,
+                TestRuleName,
+                options =>
+                {
+                    options.CreateCollectionRule(TestRuleName)
+                        .SetManualTrigger()
+                        .AddAction(CallbackAction.ActionName)
+                        .SetActionLimits(
+                            count: ExpectedActionExecutionCount,
+                            slidingWindowDuration: SlidingWindowDuration);
+                },
+                async (runner, pipeline) =>
+                {
+                    using CancellationTokenSource cancellationSource = new(DefaultPipelineTimeout);
+
+                    // Pipeline should run to completion due to action count limit without sliding window.
+                    await pipeline.StartAsync(cancellationSource.Token);
+
+                    await ManualTriggerBurstAsync(triggerService);
+
+                    // Action list should have been executed the expected number of times
+                    VerifyExecutionCount(callbackService, ExpectedActionExecutionCount);
+
+                    // Wait for existing execution times to fall out of sliding window.
+                    await Task.Delay(SlidingWindowDuration * 1.2);
+
+                    await ManualTriggerBurstAsync(triggerService);
+
+                    // Expect total action invocation count to be twice the limit
+                    VerifyExecutionCount(callbackService, 2 * ExpectedActionExecutionCount);
+
+                    await runner.SendCommandAsync(TestAppScenarios.AsyncWait.Commands.Continue);
+
+                    await pipeline.StopAsync(cancellationSource.Token);
+                },
+                services =>
+                {
+                    services.RegisterManualTrigger(triggerService);
+                    services.RegisterTestAction(callbackService);
+                });
+        }
+
+        /// <summary>
+        /// Writes the list of action execution timestamps to the output log.
+        /// </summary>
+        private void VerifyExecutionCount(CallbackActionService service, int expectedCount)
+        {
+            _outputHelper.WriteLine("Action execution times:");
+            foreach (DateTime timestamp in service.ExecutionTimestamps)
+            {
+                _outputHelper.WriteLine("- {0}", timestamp.TimeOfDay);
+            }
+
+            Assert.Equal(expectedCount, service.ExecutionTimestamps.Count);
+        }
+
+        private async Task ManualTriggerBurstAsync(ManualTriggerService service)
+        {
+            for (int i = 0; i < 10; i++)
+            {
+                service.NotifySubscribers();
+                await Task.Delay(TimeSpan.FromMilliseconds(100));
+            }
+        }
+
+        public static IEnumerable<object[]> GetTfmsSupportingPortListener()
+        {
+            yield return new object[] { TargetFrameworkMoniker.Net50 };
+            yield return new object[] { TargetFrameworkMoniker.Net60 };
+        }
+
+        private async Task ExecuteScenario(
+            TargetFrameworkMoniker tfm,
+            string scenarioName,
+            string collectionRuleName,
+            Action<Tools.Monitor.RootOptions> setup,
+            Func<AppRunner, CollectionRulePipeline, Task> pipelineCallback,
+            Action<IServiceCollection> servicesCallback = null)
+        {
+            DiagnosticPortHelper.Generate(DiagnosticPortConnectionMode.Listen, out _, out string transportName);
+            _outputHelper.WriteLine("Starting server endpoint info source at '" + transportName + "'.");
+
+            AppRunner runner = new(_outputHelper, Assembly.GetExecutingAssembly(), tfm: tfm);
+            runner.ConnectionMode = DiagnosticPortConnectionMode.Connect;
+            runner.DiagnosticPortPath = transportName;
+            runner.ScenarioName = scenarioName;
+
+            EndpointInfoSourceCallback endpointInfoCallback = new(_outputHelper);
+            List<Tools.Monitor.IEndpointInfoSourceCallbacks> callbacks = new();
+            callbacks.Add(endpointInfoCallback);
+            Tools.Monitor.ServerEndpointInfoSource source = new(transportName, callbacks);
+            source.Start();
+
+            Task<IEndpointInfo> endpointInfoTask = endpointInfoCallback.WaitForNewEndpointInfoAsync(runner, CommonTestTimeouts.StartProcess);
+
+            await runner.ExecuteAsync(async () =>
+            {
+                IEndpointInfo endpointInfo = await endpointInfoTask;
+
+                await TestHostHelper.CreateCollectionRulesHost(
+                    _outputHelper,
+                    setup,
+                    async host =>
+                    {
+                        ActionListExecutor actionListExecutor =
+                            host.Services.GetRequiredService<ActionListExecutor>();
+                        ICollectionRuleTriggerOperations triggerOperations =
+                            host.Services.GetRequiredService<ICollectionRuleTriggerOperations>();
+                        IOptionsMonitor<CollectionRuleOptions> optionsMonitor =
+                            host.Services.GetRequiredService<IOptionsMonitor<CollectionRuleOptions>>();
+
+                        await using CollectionRulePipeline pipeline = new(
+                            actionListExecutor,
+                            triggerOperations,
+                            optionsMonitor.Get(collectionRuleName),
+                            endpointInfo);
+
+                        await pipelineCallback(runner, pipeline);
+                    },
+                    servicesCallback);
+            });
+        }
+    }
+}
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Actions/ActionsServiceCollectionExtensions.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Actions/ActionsServiceCollectionExtensions.cs
new file mode 100644
index 00000000000..658323c3f30
--- /dev/null
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Actions/ActionsServiceCollectionExtensions.cs
@@ -0,0 +1,19 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Tools.Monitor;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Microsoft.Diagnostics.Monitoring.Tool.UnitTests.CollectionRules.Actions
+{
+    internal static class ActionsServiceCollectionExtensions
+    {
+        public static IServiceCollection RegisterTestAction(this IServiceCollection services, CallbackActionService callback)
+        {
+            services.AddSingleton(callback);
+            services.RegisterCollectionRuleAction<CallbackAction, object>(CallbackAction.ActionName);
+            return services;
+        }
+    }
+}
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Actions/CallbackAction.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Actions/CallbackAction.cs
new file mode 100644
index 00000000000..6901c488497
--- /dev/null
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Actions/CallbackAction.cs
@@ -0,0 +1,137 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Monitoring.TestCommon;
+using Microsoft.Diagnostics.Monitoring.WebApi;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Actions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit.Abstractions;
+
+namespace Microsoft.Diagnostics.Monitoring.Tool.UnitTests.CollectionRules.Actions
+{
+    internal sealed class CallbackAction : ICollectionRuleAction<object>
+    {
+        public static readonly string ActionName = nameof(CallbackAction);
+
+        private readonly CallbackActionService _service;
+
+        public CallbackAction(CallbackActionService service)
+        {
+            _service = service;
+        }
+
+        public async Task<CollectionRuleActionResult> ExecuteAsync(object options, IEndpointInfo endpointInfo, CancellationToken token)
+        {
+            await _service.NotifyListeners(token);
+
+            return new CollectionRuleActionResult();
+        }
+    }
+
+    internal sealed class CallbackActionService
+    {
+        private readonly List<CompletionEntry> _entries = new();
+        private readonly SemaphoreSlim _entriesSemaphore = new(1);
+        private readonly List<DateTime> _executionTimestamps = new();
+        private readonly ITestOutputHelper _outputHelper;
+
+        private int _nextId = 1;
+
+        public CallbackActionService(ITestOutputHelper outputHelper)
+        {
+            _outputHelper = outputHelper ?? throw new ArgumentNullException(nameof(outputHelper));
+        }
+
+        public async Task NotifyListeners(CancellationToken token)
+        {
+            await _entriesSemaphore.WaitAsync(token);
+            try
+            {
+                lock (_executionTimestamps)
+                {
+                    _executionTimestamps.Add(DateTime.Now);
+                }
+                
+                _outputHelper.WriteLine("[Callback] Completing {0} source(s).", _entries.Count);
+
+                foreach (var entry in _entries)
+                {
+                    entry.Complete();
+                }
+                _entries.Clear();
+            }
+            finally
+            {
+                _entriesSemaphore.Release();
+            }
+        }
+
+        public async Task WaitWithCancellationAsync(CancellationToken token)
+        {
+            int id = _nextId++;
+            string name = $"Callback{id}";
+
+            CompletionEntry entry = new(_outputHelper, name);
+
+            await _entriesSemaphore.WaitAsync(token);
+            try
+            {
+                _outputHelper.WriteLine("[Test] Registering {0}.", name);
+
+                _entries.Add(entry);
+            }
+            finally
+            {
+                _entriesSemaphore.Release();
+            }
+
+            await entry.WithCancellation(token);
+        }
+
+        public IReadOnlyCollection<DateTime> ExecutionTimestamps
+        {
+            get
+            {
+                lock (_executionTimestamps)
+                {
+                    return _executionTimestamps.AsReadOnly();
+                }
+            }
+        }
+
+        private sealed class CompletionEntry
+        {
+            private readonly TaskCompletionSource<object> _completionSource =
+                new(TaskCreationOptions.RunContinuationsAsynchronously);
+            private readonly string _name;
+            private readonly ITestOutputHelper _outputHelper;
+
+            public CompletionEntry(ITestOutputHelper outputHelper, string name)
+            {
+                _name = name;
+                _outputHelper = outputHelper;
+            }
+
+            public void Complete()
+            {
+                _outputHelper.WriteLine("[Callback] Begin completing {0}.", _name);
+                if (!_completionSource.TrySetResult(null))
+                {
+                    _outputHelper.WriteLine("[Callback] Unable to complete {0}.", _name);
+                }
+                _outputHelper.WriteLine("[Callback] End completing {0}.", _name);
+            }
+
+            public async Task WithCancellation(CancellationToken token)
+            {
+                _outputHelper.WriteLine("[Test] Begin waiting for {0} completion.", _name);
+                await _completionSource.WithCancellation(token);
+                _outputHelper.WriteLine("[Test] End waiting for {0} completion.", _name);
+            }
+        }
+    }
+}
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Triggers/ManualTrigger.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Triggers/ManualTrigger.cs
new file mode 100644
index 00000000000..90355529a76
--- /dev/null
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Triggers/ManualTrigger.cs
@@ -0,0 +1,70 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Monitoring.WebApi;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Triggers;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.Tool.UnitTests.CollectionRules.Triggers
+{
+    internal sealed class ManualTriggerFactory : ICollectionRuleTriggerFactory
+    {
+        private readonly ManualTriggerService _service;
+
+        public ManualTriggerFactory(ManualTriggerService service)
+        {
+            _service = service ?? throw new ArgumentNullException(nameof(service));
+        }
+
+        public ICollectionRuleTrigger Create(IEndpointInfo endpointInfo, Action callback)
+        {
+            return new ManualTrigger(_service, callback);
+        }
+    }
+
+    internal sealed class ManualTrigger : ICollectionRuleTrigger
+    {
+        public const string TriggerName = nameof(ManualTrigger);
+
+        private readonly Action _callback;
+        private readonly ManualTriggerService _service;
+
+        public ManualTrigger(ManualTriggerService service, Action callback)
+        {
+            _callback = callback ?? throw new ArgumentNullException(nameof(callback));
+            _service = service ?? throw new ArgumentNullException(nameof(service));
+        }
+
+        public Task StartAsync(CancellationToken cancellationToken)
+        {
+            _service.Notify += NotifyHandler;
+
+            return Task.CompletedTask;
+        }
+
+        public Task StopAsync(CancellationToken cancellationToken)
+        {
+            _service.Notify -= NotifyHandler;
+
+            return Task.CompletedTask;
+        }
+
+        private void NotifyHandler(object sender, EventArgs args)
+        {
+            _callback();
+        }
+    }
+
+    internal sealed class ManualTriggerService
+    {
+        public event EventHandler Notify;
+
+        public void NotifySubscribers()
+        {
+            Notify?.Invoke(this, EventArgs.Empty);
+        }
+    }
+}
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Triggers/TriggersServiceCollectionExtensions.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Triggers/TriggersServiceCollectionExtensions.cs
new file mode 100644
index 00000000000..f25c3c76790
--- /dev/null
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/CollectionRules/Triggers/TriggersServiceCollectionExtensions.cs
@@ -0,0 +1,18 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Tools.Monitor;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Microsoft.Diagnostics.Monitoring.Tool.UnitTests.CollectionRules.Triggers
+{
+    internal static class TriggersServiceCollectionExtensions
+    {
+        public static IServiceCollection RegisterManualTrigger(this IServiceCollection services, ManualTriggerService service)
+        {
+            services.AddSingleton(service);
+            return services.RegisterCollectionRuleTrigger<ManualTriggerFactory>(ManualTrigger.TriggerName);
+        }
+    }
+}
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/Options/CollectionRuleOptionsExtensions.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/Options/CollectionRuleOptionsExtensions.cs
index 2fd255922ef..f708d394b03 100644
--- a/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/Options/CollectionRuleOptionsExtensions.cs
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/Options/CollectionRuleOptionsExtensions.cs
@@ -3,11 +3,13 @@
 // See the LICENSE file in the project root for more information.
 
 using Microsoft.Diagnostics.Monitoring.Tool.UnitTests;
+using Microsoft.Diagnostics.Monitoring.Tool.UnitTests.CollectionRules.Triggers;
 using Microsoft.Diagnostics.Monitoring.WebApi.Models;
 using Microsoft.Diagnostics.Tools.Monitor.CollectionRules;
 using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Options;
 using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Options.Actions;
 using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Options.Triggers;
+using System;
 using System.Collections.Generic;
 using System.Linq;
 using Xunit;
@@ -129,6 +131,31 @@ public static CollectionRuleOptions AddExecuteActionAppAction(this CollectionRul
             return options;
         }
 
+        public static CollectionRuleOptions SetActionLimits(this CollectionRuleOptions options, int? count = null, TimeSpan? slidingWindowDuration = null)
+        {
+            if (null == options.Limits)
+            {
+                options.Limits = new CollectionRuleLimitsOptions();
+            }
+
+            options.Limits.ActionCount = count;
+            options.Limits.ActionCountSlidingWindowDuration = slidingWindowDuration;
+
+            return options;
+        }
+
+        public static CollectionRuleOptions SetDurationLimit(this CollectionRuleOptions options, TimeSpan duration)
+        {
+            if (null == options.Limits)
+            {
+                options.Limits = new CollectionRuleLimitsOptions();
+            }
+            
+            options.Limits.RuleDuration = duration;
+
+            return options;
+        }
+
         public static CollectionRuleOptions SetEventCounterTrigger(this CollectionRuleOptions options, out EventCounterOptions settings)
         {
             SetTrigger(options, KnownCollectionRuleTriggers.EventCounter, out CollectionRuleTriggerOptions triggerOptions);
@@ -140,6 +167,13 @@ public static CollectionRuleOptions SetEventCounterTrigger(this CollectionRuleOp
             return options;
         }
 
+        public static CollectionRuleOptions SetManualTrigger(this CollectionRuleOptions options)
+        {
+            SetTrigger(options, ManualTrigger.TriggerName, out _);
+
+            return options;
+        }
+
         public static CollectionRuleOptions SetStartupTrigger(this CollectionRuleOptions options)
         {
             return SetTrigger(options, KnownCollectionRuleTriggers.Startup, out _);
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/TestHostHelper.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/TestHostHelper.cs
index f43c50c66e9..04eb714ff7a 100644
--- a/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/TestHostHelper.cs
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.Tool.UnitTests/TestHostHelper.cs
@@ -4,6 +4,7 @@
 
 using Microsoft.Diagnostics.Tools.Monitor;
 using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.Hosting;
 using System;
 using System.Collections.Generic;
@@ -17,13 +18,14 @@ internal static class TestHostHelper
         public static async Task CreateCollectionRulesHost(
             ITestOutputHelper outputHelper,
             Action<RootOptions> setup,
-            Func<IHost, Task> callback)
+            Func<IHost, Task> hostCallback,
+            Action<IServiceCollection> servicesCallback = null)
         {
-            IHost host = CreateHost(outputHelper, setup);
+            IHost host = CreateHost(outputHelper, setup, servicesCallback);
 
             try
             {
-                await callback(host);
+                await hostCallback(host);
             }
             finally
             {
@@ -34,13 +36,14 @@ public static async Task CreateCollectionRulesHost(
         public static async Task CreateCollectionRulesHost(
             ITestOutputHelper outputHelper,
             Action<RootOptions> setup,
-            Action<IHost> callback)
+            Action<IHost> hostCallback,
+            Action<IServiceCollection> servicesCallback = null)
         {
-            IHost host = CreateHost(outputHelper, setup);
+            IHost host = CreateHost(outputHelper, setup, servicesCallback);
 
             try
             {
-                callback(host);
+                hostCallback(host);
             }
             finally
             {
@@ -50,7 +53,8 @@ public static async Task CreateCollectionRulesHost(
 
         public static IHost CreateHost(
             ITestOutputHelper outputHelper,
-            Action<RootOptions> setup)
+            Action<RootOptions> setup,
+            Action<IServiceCollection> servicesCallback)
         {
             return new HostBuilder()
                 .ConfigureAppConfiguration(builder =>
@@ -72,6 +76,7 @@ public static IHost CreateHost(
                 {
                     services.ConfigureCollectionRules();
                     services.ConfigureEgress();
+                    servicesCallback?.Invoke(services);
                 })
                 .Build();
         }
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.UnitTestApp/Program.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.UnitTestApp/Program.cs
index a2d2e527b29..5a589bd0526 100644
--- a/src/Tests/Microsoft.Diagnostics.Monitoring.UnitTestApp/Program.cs
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.UnitTestApp/Program.cs
@@ -16,6 +16,7 @@ public static Task<int> Main(string[] args)
             return new CommandLineBuilder()
                 .AddCommand(AsyncWaitScenario.Command())
                 .AddCommand(LoggerScenario.Command())
+                .AddCommand(SpinWaitScenario.Command())
                 .UseDefaults()
                 .Build()
                 .InvokeAsync(args);
diff --git a/src/Tests/Microsoft.Diagnostics.Monitoring.UnitTestApp/Scenarios/SpinWaitScenario.cs b/src/Tests/Microsoft.Diagnostics.Monitoring.UnitTestApp/Scenarios/SpinWaitScenario.cs
new file mode 100644
index 00000000000..b53428832bf
--- /dev/null
+++ b/src/Tests/Microsoft.Diagnostics.Monitoring.UnitTestApp/Scenarios/SpinWaitScenario.cs
@@ -0,0 +1,44 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Monitoring.TestCommon;
+using System;
+using System.CommandLine;
+using System.CommandLine.Invocation;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.UnitTestApp.Scenarios
+{
+    /// <summary>
+    /// Synchronously spins until it receives the Continue command.
+    /// </summary>
+    internal class SpinWaitScenario
+    {
+        public static Command Command()
+        {
+            Command command = new(TestAppScenarios.SpinWait.Name);
+            command.Handler = CommandHandler.Create((Func<CancellationToken, Task<int>>)ExecuteAsync);
+            return command;
+        }
+
+        public static Task<int> ExecuteAsync(CancellationToken token)
+        {
+            return ScenarioHelpers.RunScenarioAsync(async logger =>
+            {
+                await ScenarioHelpers.WaitForCommandAsync(TestAppScenarios.SpinWait.Commands.StartSpin, logger);
+
+                Task continueTask = Task.Run(() => ScenarioHelpers.WaitForCommandAsync(TestAppScenarios.SpinWait.Commands.StopSpin, logger));
+
+                while (!continueTask.IsCompleted)
+                {
+                    Thread.SpinWait(1_000_000);
+                }
+
+                return 0;
+            }, token);
+        }
+    }
+}
diff --git a/src/Tools/dotnet-monitor/CollectionRules/Actions/ExecuteAction.cs b/src/Tools/dotnet-monitor/CollectionRules/Actions/ExecuteAction.cs
index 5092ed72b13..910b7f62f97 100644
--- a/src/Tools/dotnet-monitor/CollectionRules/Actions/ExecuteAction.cs
+++ b/src/Tools/dotnet-monitor/CollectionRules/Actions/ExecuteAction.cs
@@ -68,9 +68,7 @@ public async Task WaitForExitAsync(Process process, Task<int> exitedTask, Cancel
         {
             if (!process.HasExited)
             {
-                TaskCompletionSource<object> cancellationTaskSource = new();
-                using var _ = token.Register(() => cancellationTaskSource.TrySetCanceled(token));
-                await Task.WhenAny(exitedTask, cancellationTaskSource.Task).Unwrap().ConfigureAwait(false);
+                await exitedTask.WithCancellation(token).ConfigureAwait(false);
             }
         }
 
diff --git a/src/Tools/dotnet-monitor/CollectionRules/CollectionRulePipeline.cs b/src/Tools/dotnet-monitor/CollectionRules/CollectionRulePipeline.cs
new file mode 100644
index 00000000000..ce2893525f5
--- /dev/null
+++ b/src/Tools/dotnet-monitor/CollectionRules/CollectionRulePipeline.cs
@@ -0,0 +1,248 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Monitoring;
+using Microsoft.Diagnostics.Monitoring.WebApi;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Actions;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Options;
+using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Triggers;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Tools.Monitor.CollectionRules
+{
+    internal class CollectionRulePipeline : Pipeline
+    {
+        // The executor of the action list for the collection rule.
+        private readonly ActionListExecutor _actionListExecutor;
+
+        // The endpoint that represents the process on which the collection rule is executed.
+        private readonly IEndpointInfo _endpointInfo;
+
+        // The rule description that determines the behavior of the pipeline.
+        private readonly CollectionRuleOptions _ruleOptions;
+        
+        // Task completion source for signalling when the pipeline has finished starting.
+        private readonly TaskCompletionSource<object> _startedSource =
+            new(TaskCreationOptions.RunContinuationsAsynchronously);
+        
+        // Operations for getting trigger information.
+        private readonly ICollectionRuleTriggerOperations _triggerOperations;
+
+        public CollectionRulePipeline(
+            ActionListExecutor actionListExecutor,
+            ICollectionRuleTriggerOperations triggerOperations,
+            CollectionRuleOptions ruleOptions,
+            IEndpointInfo endpointInfo)
+        {
+            _actionListExecutor = actionListExecutor ?? throw new ArgumentNullException(nameof(actionListExecutor));
+            _endpointInfo = endpointInfo ?? throw new ArgumentNullException(nameof(endpointInfo));
+            _ruleOptions = ruleOptions ?? throw new ArgumentNullException(nameof(ruleOptions));
+            _triggerOperations = triggerOperations ?? throw new ArgumentNullException(nameof(triggerOperations));
+        }
+
+        /// <summary>
+        /// Starts the execution of the pipeline without waiting for it to run to completion.
+        /// </summary>
+        /// <remarks>
+        /// If the specified trigger is a startup trigger, this method will complete when the
+        /// action list has completed execution. If the specified trigger is not a startup
+        /// trigger, this method will complete after the trigger has been started.
+        /// </remarks>
+        public async Task StartAsync(CancellationToken token)
+        {
+            var runTask = RunAsync(token);
+
+            await _startedSource.WithCancellation(token).ConfigureAwait(false);
+        }
+
+        /// <summary>
+        /// Runs the pipeline to completion.
+        /// </summary>
+        /// <remarks>
+        /// The pipeline will only successfully complete in the following scenarios:
+        /// (1) the trigger is a startup trigger and the action list successfully executes once.
+        /// (2) without a specified action count window duration, the number of action list executions equals the action count limit.
+        /// </remarks>
+        protected override async Task OnRun(CancellationToken token)
+        {
+            if (!_triggerOperations.TryCreateFactory(_ruleOptions.Trigger.Type, out ICollectionRuleTriggerFactoryProxy factory))
+            {
+                throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, Strings.ErrorMessage_CouldNotMapToTrigger, _ruleOptions.Trigger.Type));
+            }
+
+            using CancellationTokenSource durationCancellationSource = new();
+            using CancellationTokenSource linkedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(
+                durationCancellationSource.Token,
+                token);
+
+            CancellationToken linkedToken = linkedCancellationSource.Token;
+
+            TimeSpan? actionCountWindowDuration = _ruleOptions.Limits?.ActionCountSlidingWindowDuration;
+            int actionCountLimit = (_ruleOptions.Limits?.ActionCount).GetValueOrDefault(CollectionRuleLimitsOptionsDefaults.ActionCount);
+            Queue<DateTime> executionTimestamps = new(actionCountLimit);
+
+            // Start cancellation timer for graceful stop of the collection rule
+            // when the rule duration has been specified. Conditionally enable this
+            // based on if the rule has a duration limit.
+            TimeSpan? ruleDuration = _ruleOptions.Limits?.RuleDuration;
+            if (ruleDuration.HasValue)
+            {
+                durationCancellationSource.CancelAfter(ruleDuration.Value);
+            }
+
+            try
+            {
+                bool completePipeline = false;
+                while (!completePipeline)
+                {
+                    TaskCompletionSource<object> triggerSatisfiedSource =
+                        new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+                    ICollectionRuleTrigger trigger = null;
+                    try
+                    {
+                        trigger = factory.Create(
+                            _endpointInfo,
+                            () => triggerSatisfiedSource.TrySetResult(null),
+                            _ruleOptions.Trigger.Settings);
+
+                        if (null == trigger)
+                        {
+                            throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, Strings.ErrorMessage_TriggerFactoryFailed, _ruleOptions.Trigger.Type));
+                        }
+
+                        // Start the trigger.
+                        await trigger.StartAsync(linkedToken).ConfigureAwait(false);
+
+                        // The pipeline signals that it has started just after starting a non-startup trigger.
+                        // Instances with startup triggers signal start after having finished executing the action list.
+                        if (trigger is not ICollectionRuleStartupTrigger)
+                        {
+                            // Signal that the pipeline trigger is initialized.
+                            _startedSource.TrySetResult(null);
+                        }
+
+                        // Wait for the trigger to be satisfied.
+                        await triggerSatisfiedSource.WithCancellation(linkedToken).ConfigureAwait(false);
+                    }
+                    finally
+                    {
+                        try
+                        {
+                            // Intentionally not using the linkedToken. If the linkedToken was signaled
+                            // due to pipeline duration expiring, try to stop the trigger gracefully
+                            // unless forced by a caller to the pipeline.
+                            await trigger.StopAsync(token).ConfigureAwait(false);
+                        }
+                        finally
+                        {
+                            if (trigger is IAsyncDisposable asyncDisposableTrigger)
+                            {
+                                await asyncDisposableTrigger.DisposeAsync().ConfigureAwait(false);
+                            }
+                            else if (trigger is IDisposable disposableTrigger)
+                            {
+                                disposableTrigger.Dispose();
+                            }
+                        }
+                    }
+
+                    DateTime currentTimestamp = DateTime.UtcNow;
+
+                    // If rule has an action count window, Remove all execution timestamps that fall outside the window.
+                    if (actionCountWindowDuration.HasValue)
+                    {
+                        DateTime windowStartTimestamp = currentTimestamp - actionCountWindowDuration.Value;
+                        while (executionTimestamps.Count > 0)
+                        {
+                            DateTime executionTimestamp = executionTimestamps.Peek();
+                            if (executionTimestamp < windowStartTimestamp)
+                            {
+                                executionTimestamps.Dequeue();
+                            }
+                            else
+                            {
+                                // Stop clearing out previous executions
+                                break;
+                            }
+                        }
+                    }
+
+                    // Check if executing actions has been throttled due to count limit
+                    if (actionCountLimit > executionTimestamps.Count)
+                    {
+                        executionTimestamps.Enqueue(currentTimestamp);
+
+                        try
+                        {
+                            // Intentionally not using the linkedToken. Allow the action list to execute gracefully
+                            // unless forced by a caller to cancel or stop the running of the pipeline.
+                            await _actionListExecutor.ExecuteActions(_ruleOptions.Actions, _endpointInfo, token);
+                        }
+                        catch (Exception ex) when (ex is not OperationCanceledException)
+                        {
+                            // Bad action execution shouldn't fail the pipeline.
+                            // TODO: log that the action execution has failed.
+                        }
+                        finally
+                        {
+                            // The collection rule has executed the action list the maximum
+                            // number of times as specified by the limits and the action count
+                            // window was not specified. Since the pipeline can no longer execute
+                            // actions, the pipeline can complete.
+                            completePipeline = actionCountLimit <= executionTimestamps.Count &&
+                                !actionCountWindowDuration.HasValue;
+                        }
+                    }
+                    else
+                    {
+                        // Throttled
+                    }
+
+                    linkedToken.ThrowIfCancellationRequested();
+
+                    // If the trigger is a startup trigger, only execute the action list once
+                    // and then complete the pipeline.
+                    if (trigger is ICollectionRuleStartupTrigger)
+                    {
+                        // Signal that the pipeline trigger is initialized.
+                        _startedSource.TrySetResult(null);
+
+                        // Complete the pipeline since the action list is only executed once
+                        // for collection rules with startup triggers.
+                        completePipeline = true;
+                    }
+                }
+            }
+            catch (OperationCanceledException) when (durationCancellationSource.IsCancellationRequested)
+            {
+                // This exception is caused by the pipeline duration expiring.
+                // Handle it to allow pipeline to be in completed state.
+            }
+        }
+
+        protected override Task OnCleanup()
+        {
+            _startedSource.TrySetCanceled();
+
+            return base.OnCleanup();
+        }
+
+        // Temporary until Pipeline APIs are public or get an InternalsVisibleTo for the tests
+        public new Task RunAsync(CancellationToken token)
+        {
+            return base.RunAsync(token);
+        }
+
+        // Temporary until Pipeline APIs are public or get an InternalsVisibleTo for the tests
+        public new Task StopAsync(CancellationToken token)
+        {
+            return base.StopAsync(token);
+        }
+    }
+}
diff --git a/src/Tools/dotnet-monitor/CollectionRules/Triggers/EventCounterTriggerFactory.cs b/src/Tools/dotnet-monitor/CollectionRules/Triggers/EventCounterTriggerFactory.cs
index 44029aea3dd..254e6185c3a 100644
--- a/src/Tools/dotnet-monitor/CollectionRules/Triggers/EventCounterTriggerFactory.cs
+++ b/src/Tools/dotnet-monitor/CollectionRules/Triggers/EventCounterTriggerFactory.cs
@@ -2,6 +2,8 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using Microsoft.Diagnostics.Monitoring.EventPipe.Triggers;
+using Microsoft.Diagnostics.Monitoring.EventPipe.Triggers.EventCounter;
 using Microsoft.Diagnostics.Monitoring.WebApi;
 using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Options.Triggers;
 using System;
@@ -14,10 +16,36 @@ namespace Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Triggers
     internal sealed class EventCounterTriggerFactory :
         ICollectionRuleTriggerFactory<EventCounterOptions>
     {
+        private readonly EventPipeTriggerFactory _eventPipeTriggerFactory;
+        private readonly ITraceEventTriggerFactory<EventCounterTriggerSettings> _traceEventTriggerFactory;
+
+        public EventCounterTriggerFactory(
+            EventPipeTriggerFactory eventPipeTriggerFactory,
+            ITraceEventTriggerFactory<EventCounterTriggerSettings> traceEventTriggerFactory)
+        {
+            _eventPipeTriggerFactory = eventPipeTriggerFactory;
+            _traceEventTriggerFactory = traceEventTriggerFactory;
+        }
+
         /// <inheritdoc/>
         public ICollectionRuleTrigger Create(IEndpointInfo endpointInfo, Action callback, EventCounterOptions options)
         {
-            throw new NotImplementedException("TODO: Implement EventCounterTrigger.");
+            EventCounterTriggerSettings settings = new()
+            {
+                ProviderName = options.ProviderName,
+                CounterIntervalSeconds = options.Frequency.GetValueOrDefault(EventCounterOptionsDefaults.Frequency),
+                CounterName = options.CounterName,
+                GreaterThan = options.GreaterThan,
+                LessThan = options.LessThan,
+                SlidingWindowDuration = options.SlidingWindowDuration.GetValueOrDefault(TimeSpan.Parse(EventCounterOptionsDefaults.SlidingWindowDuration))
+            };
+
+            return _eventPipeTriggerFactory.Create(
+                endpointInfo,
+                EventCounterTrigger.CreateConfiguration(settings),
+                _traceEventTriggerFactory,
+                settings,
+                callback);
         }
     }
 }
diff --git a/src/Tools/dotnet-monitor/CollectionRules/Triggers/EventPipeTriggerFactory.cs b/src/Tools/dotnet-monitor/CollectionRules/Triggers/EventPipeTriggerFactory.cs
new file mode 100644
index 00000000000..c2d17d2c257
--- /dev/null
+++ b/src/Tools/dotnet-monitor/CollectionRules/Triggers/EventPipeTriggerFactory.cs
@@ -0,0 +1,91 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Monitoring.EventPipe;
+using Microsoft.Diagnostics.Monitoring.EventPipe.Triggers;
+using Microsoft.Diagnostics.Monitoring.EventPipe.Triggers.Pipelines;
+using Microsoft.Diagnostics.Monitoring.WebApi;
+using Microsoft.Diagnostics.NETCore.Client;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Triggers
+{
+    internal sealed class EventPipeTriggerFactory
+    {
+        /// <summary>
+        /// Creates a collection rule trigger sourced from the event pipe of the target
+        /// process represented by the specified endpoint.
+        /// </summary>
+        public ICollectionRuleTrigger Create<TSettings>(
+            IEndpointInfo endpointInfo,
+            MonitoringSourceConfiguration configuration,
+            ITraceEventTriggerFactory<TSettings> factory,
+            TSettings settings,
+            Action callback)
+        {
+            return new EventPipeTrigger<TSettings>(
+                endpointInfo,
+                configuration,
+                factory,
+                settings,
+                callback);
+        }
+
+        private sealed class EventPipeTrigger<TSettings> :
+            ICollectionRuleTrigger,
+            IAsyncDisposable
+        {
+            private readonly EventPipeTriggerPipeline<TSettings> _pipeline;
+
+            public EventPipeTrigger(
+                IEndpointInfo endpointInfo,
+                MonitoringSourceConfiguration configuration,
+                ITraceEventTriggerFactory<TSettings> factory,
+                TSettings settings,
+                Action callback)
+            {
+                EventPipeTriggerPipelineSettings<TSettings> pipelineSettings = new()
+                {
+                    Configuration = configuration,
+                    Duration = Timeout.InfiniteTimeSpan,
+                    TriggerFactory = factory,
+                    TriggerSettings = settings
+                };
+
+                _pipeline = new EventPipeTriggerPipeline<TSettings>(
+                    new DiagnosticsClient(endpointInfo.Endpoint),
+                    pipelineSettings,
+                    _ => callback());
+            }
+
+            public async Task StartAsync(CancellationToken cancellationToken)
+            {
+                // Wrap the passed CancellationToken into a linked CancellationTokenSource so that the
+                // RunAsync method is only cancellable for the execution of the StartAsync method. Don't
+                // want the caller to be able to cancel the run of the pipeline after having finished
+                // executing the StartAsync method.
+                using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+
+                Task runTask = _pipeline.RunAsync(cts.Token);
+
+                IEventSourcePipelineInternal pipelineInternal = _pipeline;
+
+                // Wait for the event pipe session to have started before returning.
+                await pipelineInternal.SessionStarted;
+            }
+
+            public Task StopAsync(CancellationToken cancellationToken)
+            {
+                return _pipeline.StopAsync(cancellationToken);
+            }
+
+            public async ValueTask DisposeAsync()
+            {
+                await _pipeline.DisposeAsync();
+            }
+        }
+    }
+}
diff --git a/src/Tools/dotnet-monitor/CollectionRules/Triggers/StartupTrigger.cs b/src/Tools/dotnet-monitor/CollectionRules/Triggers/StartupTrigger.cs
index bea01f23152..26ef5553177 100644
--- a/src/Tools/dotnet-monitor/CollectionRules/Triggers/StartupTrigger.cs
+++ b/src/Tools/dotnet-monitor/CollectionRules/Triggers/StartupTrigger.cs
@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -14,8 +15,16 @@ namespace Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Triggers
     internal sealed class StartupTrigger :
         ICollectionRuleStartupTrigger
     {
+        private readonly Action _callback;
+
+        public StartupTrigger(Action callback)
+        {
+            _callback = callback;
+        }
+
         public Task StartAsync(CancellationToken cancellationToken)
         {
+            _callback();
             return Task.CompletedTask;
         }
 
diff --git a/src/Tools/dotnet-monitor/CollectionRules/Triggers/StartupTriggerFactory.cs b/src/Tools/dotnet-monitor/CollectionRules/Triggers/StartupTriggerFactory.cs
index e9b25f916b4..65082e9ab05 100644
--- a/src/Tools/dotnet-monitor/CollectionRules/Triggers/StartupTriggerFactory.cs
+++ b/src/Tools/dotnet-monitor/CollectionRules/Triggers/StartupTriggerFactory.cs
@@ -16,7 +16,7 @@ internal sealed class StartupTriggerFactory :
         /// <inheritdoc/>
         public ICollectionRuleTrigger Create(IEndpointInfo endpointInfo, Action callback)
         {
-            return new StartupTrigger();
+            return new StartupTrigger(callback);
         }
     }
 }
diff --git a/src/Tools/dotnet-monitor/ServiceCollectionExtensions.cs b/src/Tools/dotnet-monitor/ServiceCollectionExtensions.cs
index 30e9cdf1eb2..f25ee3f7279 100644
--- a/src/Tools/dotnet-monitor/ServiceCollectionExtensions.cs
+++ b/src/Tools/dotnet-monitor/ServiceCollectionExtensions.cs
@@ -4,6 +4,8 @@
 
 using Microsoft.AspNetCore.Authentication;
 using Microsoft.AspNetCore.Authentication.JwtBearer;
+using Microsoft.Diagnostics.Monitoring.EventPipe.Triggers;
+using Microsoft.Diagnostics.Monitoring.EventPipe.Triggers.EventCounter;
 using Microsoft.Diagnostics.Monitoring.WebApi;
 using Microsoft.Diagnostics.Tools.Monitor.CollectionRules;
 using Microsoft.Diagnostics.Tools.Monitor.CollectionRules.Actions;
@@ -72,9 +74,12 @@ public static IServiceCollection ConfigureCollectionRules(this IServiceCollectio
             services.RegisterCollectionRuleTrigger<AspNetRequestCountTriggerFactory, AspNetRequestCountOptions>(KnownCollectionRuleTriggers.AspNetRequestCount);
             services.RegisterCollectionRuleTrigger<AspNetRequestDurationTriggerFactory, AspNetRequestDurationOptions>(KnownCollectionRuleTriggers.AspNetRequestDuration);
             services.RegisterCollectionRuleTrigger<AspNetResponseStatusTriggerFactory, AspNetResponseStatusOptions>(KnownCollectionRuleTriggers.AspNetResponseStatus);
-            services.RegisterCollectionRuleTrigger<EventCounterTriggerFactory, EventCounterOptions>(KnownCollectionRuleTriggers.EventCounter);
+            services.RegisterCollectionRuleTrigger<CollectionRules.Triggers.EventCounterTriggerFactory, EventCounterOptions>(KnownCollectionRuleTriggers.EventCounter);
             services.RegisterCollectionRuleTrigger<StartupTriggerFactory>(KnownCollectionRuleTriggers.Startup);
 
+            services.AddSingleton<EventPipeTriggerFactory>();
+            services.AddSingleton<ITraceEventTriggerFactory<EventCounterTriggerSettings>, Monitoring.EventPipe.Triggers.EventCounter.EventCounterTriggerFactory>();
+
             services.AddSingleton<CollectionRulesConfigurationProvider>();
             services.AddSingleton<ICollectionRuleActionOperations, CollectionRuleActionOperations>();
             services.AddSingleton<ICollectionRuleTriggerOperations, CollectionRuleTriggerOperations>();
@@ -87,7 +92,7 @@ public static IServiceCollection ConfigureCollectionRules(this IServiceCollectio
             return services;
         }
 
-        private static IServiceCollection RegisterCollectionRuleAction<TAction, TOptions>(this IServiceCollection services, string actionName)
+        public static IServiceCollection RegisterCollectionRuleAction<TAction, TOptions>(this IServiceCollection services, string actionName)
             where TAction : class, ICollectionRuleAction<TOptions>
             where TOptions : class, new()
         {
@@ -100,7 +105,7 @@ private static IServiceCollection RegisterCollectionRuleAction<TAction, TOptions
             return services;
         }
 
-        private static IServiceCollection RegisterCollectionRuleTrigger<TFactory>(this IServiceCollection services, string triggerName)
+        public static IServiceCollection RegisterCollectionRuleTrigger<TFactory>(this IServiceCollection services, string triggerName)
             where TFactory : class, ICollectionRuleTriggerFactory
         {
             services.AddSingleton<TFactory>();
@@ -110,7 +115,7 @@ private static IServiceCollection RegisterCollectionRuleTrigger<TFactory>(this I
             return services;
         }
 
-        private static IServiceCollection RegisterCollectionRuleTrigger<TFactory, TOptions>(this IServiceCollection services, string triggerName)
+        public static IServiceCollection RegisterCollectionRuleTrigger<TFactory, TOptions>(this IServiceCollection services, string triggerName)
             where TFactory : class, ICollectionRuleTriggerFactory<TOptions>
             where TOptions : class, new()
         {
diff --git a/src/Tools/dotnet-monitor/Strings.Designer.cs b/src/Tools/dotnet-monitor/Strings.Designer.cs
index 8e5a11db4d0..df5d29a4f8a 100644
--- a/src/Tools/dotnet-monitor/Strings.Designer.cs
+++ b/src/Tools/dotnet-monitor/Strings.Designer.cs
@@ -78,6 +78,15 @@ internal static string ErrorMessage_CouldNotMapToAction {
             }
         }
         
+        /// <summary>
+        ///   Looks up a localized string similar to The &apos;{0}&apos; trigger was not registered correctly..
+        /// </summary>
+        internal static string ErrorMessage_CouldNotMapToTrigger {
+            get {
+                return ResourceManager.GetString("ErrorMessage_CouldNotMapToTrigger", resourceCulture);
+            }
+        }
+        
         /// <summary>
         ///   Looks up a localized string similar to In &apos;Listen&apos; mode, the diagnostic port endpoint name must be specified..
         /// </summary>
@@ -285,6 +294,15 @@ internal static string ErrorMessage_RejectedJwk {
             }
         }
         
+        /// <summary>
+        ///   Looks up a localized string similar to The &apos;{0}&apos; trigger factory failed to create a trigger instance..
+        /// </summary>
+        internal static string ErrorMessage_TriggerFactoryFailed {
+            get {
+                return ResourceManager.GetString("ErrorMessage_TriggerFactoryFailed", resourceCulture);
+            }
+        }
+        
         /// <summary>
         ///   Looks up a localized string similar to Both the {0} field and the {1} field cannot be specified..
         /// </summary>
diff --git a/src/Tools/dotnet-monitor/Strings.resx b/src/Tools/dotnet-monitor/Strings.resx
index f8eb8461bfe..dfa1787fb8d 100644
--- a/src/Tools/dotnet-monitor/Strings.resx
+++ b/src/Tools/dotnet-monitor/Strings.resx
@@ -125,6 +125,10 @@
     <value>The '{0}' action was not registered correctly.</value>
     <comment>Gets the format string for invalid mapping from a string to an ICollectionRuleActionProxy.</comment>
   </data>
+  <data name="ErrorMessage_CouldNotMapToTrigger" xml:space="preserve">
+    <value>The '{0}' trigger was not registered correctly.</value>
+    <comment>Gets the format string for invalid mapping from a string to an ICollectionRuleTriggerProxy.</comment>
+  </data>
   <data name="ErrorMessage_DiagnosticPortMissingInListenMode" xml:space="preserve">
     <value>In 'Listen' mode, the diagnostic port endpoint name must be specified.</value>
     <comment>Gets a string similar to "In 'Listen' mode, the diagnostic port endpoint name must be specified.".</comment>
@@ -252,6 +256,10 @@
 1 Format Parameter:
 0. configName: The variable name for the provided string.</comment>
   </data>
+  <data name="ErrorMessage_TriggerFactoryFailed" xml:space="preserve">
+    <value>The '{0}' trigger factory failed to create a trigger instance.</value>
+    <comment>Gets the format string describing the error condition when a trigger factory does not return a valid trigger instance.</comment>
+  </data>
   <data name="ErrorMessage_TwoFieldsCannotBeSpecified" xml:space="preserve">
     <value>Both the {0} field and the {1} field cannot be specified.</value>
     <comment>{0} = Name of first field that must not be specified if second field is specified.
diff --git a/src/Tools/dotnet-monitor/TaskExtensions.cs b/src/Tools/dotnet-monitor/TaskExtensions.cs
index 1a0a4ab75ca..aa092c7ec84 100644
--- a/src/Tools/dotnet-monitor/TaskExtensions.cs
+++ b/src/Tools/dotnet-monitor/TaskExtensions.cs
@@ -11,7 +11,7 @@ namespace Microsoft.Diagnostics.Monitoring.TestCommon
 namespace Microsoft.Diagnostics.Tools.Monitor
 #endif
 {
-    public static class TaskExtensions
+    internal static class TaskExtensions
     {
         public static async Task WithCancellation(this Task task, CancellationToken token)
         {