From 3b8370416eee6b5148bfd91339e73ee168b2b8a4 Mon Sep 17 00:00:00 2001 From: Drew Noakes Date: Tue, 12 Dec 2023 13:59:23 +1100 Subject: [PATCH 1/4] Simplify ResourcePublisher We publish updates to resources via `IAsyncEnumerable`. The previous code defined a custom enumerable/enumerator for this, but it's possible to use a generator function instead and reduce the amount of code a bit. --- .../Dashboard/ResourcePublisher.cs | 69 ++++--------------- 1 file changed, 15 insertions(+), 54 deletions(-) diff --git a/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs b/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs index 60c096d0f0..939b0c5c75 100644 --- a/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs +++ b/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs @@ -28,12 +28,22 @@ public ResourceSubscription Subscribe() return new ResourceSubscription( Snapshot: _snapshot.Values.ToList(), - Subscription: new ResourceSubscriptionEnumerable(channel, disposeAction: RemoveChannel)); - } + Subscription: StreamUpdates()); - void RemoveChannel(Channel channel) - { - ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Remove(channel), channel); + async IAsyncEnumerable StreamUpdates() + { + try + { + while (!cancellationToken.IsCancellationRequested) + { + yield return await channel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + } + } + finally + { + ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Remove(channel), channel); + } + } } } @@ -64,53 +74,4 @@ public async ValueTask Integrate(ResourceViewModel resource, ResourceChangeType await channel.Writer.WriteAsync(new(changeType, resource), cancellationToken).ConfigureAwait(false); } } - - private sealed class ResourceSubscriptionEnumerable : IAsyncEnumerable - { - private readonly Channel _channel; - private readonly Action> _disposeAction; - - public ResourceSubscriptionEnumerable(Channel channel, Action> disposeAction) - { - _channel = channel; - _disposeAction = disposeAction; - } - - public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) - { - return new ResourceSubscriptionEnumerator(_channel, _disposeAction, cancellationToken); - } - } - - private sealed class ResourceSubscriptionEnumerator : IAsyncEnumerator - { - private readonly Channel _channel; - private readonly Action> _disposeAction; - private readonly CancellationToken _cancellationToken; - - public ResourceSubscriptionEnumerator( - Channel channel, Action> disposeAction, CancellationToken cancellationToken) - { - _channel = channel; - _disposeAction = disposeAction; - _cancellationToken = cancellationToken; - Current = default!; - } - - public ResourceChange Current { get; private set; } - - public ValueTask DisposeAsync() - { - _disposeAction(_channel); - - return ValueTask.CompletedTask; - } - - public async ValueTask MoveNextAsync() - { - Current = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); - - return true; - } - } } From 2da6e2b29866a57dca80fbb21e5658bfd66cb466 Mon Sep 17 00:00:00 2001 From: Drew Noakes Date: Tue, 12 Dec 2023 22:00:58 +1100 Subject: [PATCH 2/4] Rename ResourceChangeType.Deleted -> Delete It's more consistent with the other enum member, named Upsert (which should otherwise be Upserted). --- src/Aspire.Dashboard/Components/Pages/ConsoleLogs.razor.cs | 2 +- src/Aspire.Dashboard/Components/Pages/Resources.razor.cs | 2 +- src/Aspire.Dashboard/Model/ResourceChangeType.cs | 2 +- src/Aspire.Dashboard/Model/ResourceOutgoingPeerResolver.cs | 2 +- src/Aspire.Hosting/Dashboard/DcpDataSource.cs | 2 +- src/Aspire.Hosting/Dashboard/ResourcePublisher.cs | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Aspire.Dashboard/Components/Pages/ConsoleLogs.razor.cs b/src/Aspire.Dashboard/Components/Pages/ConsoleLogs.razor.cs index 6e21431d8d..b519e05053 100644 --- a/src/Aspire.Dashboard/Components/Pages/ConsoleLogs.razor.cs +++ b/src/Aspire.Dashboard/Components/Pages/ConsoleLogs.razor.cs @@ -208,7 +208,7 @@ private async Task OnResourceListChangedAsync(ResourceChangeType changeType, Res } } } - else if (changeType == ResourceChangeType.Deleted) + else if (changeType == ResourceChangeType.Delete) { _resourceNameMapping.Remove(resourceViewModel.Name); diff --git a/src/Aspire.Dashboard/Components/Pages/Resources.razor.cs b/src/Aspire.Dashboard/Components/Pages/Resources.razor.cs index bc91a48bc2..c8d5e2cab2 100644 --- a/src/Aspire.Dashboard/Components/Pages/Resources.razor.cs +++ b/src/Aspire.Dashboard/Components/Pages/Resources.razor.cs @@ -155,7 +155,7 @@ private async Task OnResourceListChanged(ResourceChangeType changeType, Resource _resourcesMap[resource.Name] = resource; break; - case ResourceChangeType.Deleted: + case ResourceChangeType.Delete: _resourcesMap.Remove(resource.Name); break; } diff --git a/src/Aspire.Dashboard/Model/ResourceChangeType.cs b/src/Aspire.Dashboard/Model/ResourceChangeType.cs index 2c74ad1e90..26e9a8dd0c 100644 --- a/src/Aspire.Dashboard/Model/ResourceChangeType.cs +++ b/src/Aspire.Dashboard/Model/ResourceChangeType.cs @@ -15,5 +15,5 @@ public enum ResourceChangeType /// /// The object was deleted. /// - Deleted + Delete } diff --git a/src/Aspire.Dashboard/Model/ResourceOutgoingPeerResolver.cs b/src/Aspire.Dashboard/Model/ResourceOutgoingPeerResolver.cs index 0ef203f788..e16d277ec0 100644 --- a/src/Aspire.Dashboard/Model/ResourceOutgoingPeerResolver.cs +++ b/src/Aspire.Dashboard/Model/ResourceOutgoingPeerResolver.cs @@ -39,7 +39,7 @@ private async Task OnResourceListChanged(ResourceChangeType changeType, Resource { _resourceNameMapping[resourceViewModel.Name] = resourceViewModel; } - else if (changeType == ResourceChangeType.Deleted) + else if (changeType == ResourceChangeType.Delete) { _resourceNameMapping.TryRemove(resourceViewModel.Name, out _); } diff --git a/src/Aspire.Hosting/Dashboard/DcpDataSource.cs b/src/Aspire.Hosting/Dashboard/DcpDataSource.cs index a9e1df02d0..3d39dfeb29 100644 --- a/src/Aspire.Hosting/Dashboard/DcpDataSource.cs +++ b/src/Aspire.Hosting/Dashboard/DcpDataSource.cs @@ -405,7 +405,7 @@ private static ResourceChangeType ToChangeType(WatchEventType watchEventType) return watchEventType switch { WatchEventType.Added or WatchEventType.Modified => ResourceChangeType.Upsert, - WatchEventType.Deleted => ResourceChangeType.Deleted, + WatchEventType.Deleted => ResourceChangeType.Delete, _ => ResourceChangeType.Other }; } diff --git a/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs b/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs index 939b0c5c75..54024247c4 100644 --- a/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs +++ b/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs @@ -63,7 +63,7 @@ public async ValueTask Integrate(ResourceViewModel resource, ResourceChangeType _snapshot[resource.Name] = resource; break; - case ResourceChangeType.Deleted: + case ResourceChangeType.Delete: _snapshot.Remove(resource.Name); break; } From 033cb859783d38344adeb59755587f79392e3cf2 Mon Sep 17 00:00:00 2001 From: Drew Noakes Date: Tue, 12 Dec 2023 22:01:17 +1100 Subject: [PATCH 3/4] Add Async suffix to async method --- src/Aspire.Hosting/Dashboard/ResourcePublisher.cs | 2 +- src/Aspire.Hosting/Dashboard/ResourceService.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs b/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs index 54024247c4..b860a3595c 100644 --- a/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs +++ b/src/Aspire.Hosting/Dashboard/ResourcePublisher.cs @@ -53,7 +53,7 @@ async IAsyncEnumerable StreamUpdates() /// The resource that was modified. /// The change type (Added, Modified, Deleted). /// A task that completes when the cache has been updated and all subscribers notified. - public async ValueTask Integrate(ResourceViewModel resource, ResourceChangeType changeType) + public async ValueTask IntegrateAsync(ResourceViewModel resource, ResourceChangeType changeType) { lock (_syncLock) { diff --git a/src/Aspire.Hosting/Dashboard/ResourceService.cs b/src/Aspire.Hosting/Dashboard/ResourceService.cs index badf808f6a..1a557b9c54 100644 --- a/src/Aspire.Hosting/Dashboard/ResourceService.cs +++ b/src/Aspire.Hosting/Dashboard/ResourceService.cs @@ -21,7 +21,7 @@ public ResourceService( _resourcePublisher = new ResourcePublisher(_cancellationTokenSource.Token); - _ = new DcpDataSource(kubernetesService, applicationModel, loggerFactory, _resourcePublisher.Integrate, _cancellationTokenSource.Token); + _ = new DcpDataSource(kubernetesService, applicationModel, loggerFactory, _resourcePublisher.IntegrateAsync, _cancellationTokenSource.Token); static string ComputeApplicationName(string applicationName) { From 01b2b2e496f6bfefbdd6b6cb9b3bf67a3b2555cd Mon Sep 17 00:00:00 2001 From: Drew Noakes Date: Tue, 12 Dec 2023 22:01:28 +1100 Subject: [PATCH 4/4] Add unit tests for ResourcePublisher --- .../Dashboard/ResourcePublisherTests.cs | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 tests/Aspire.Hosting.Tests/Dashboard/ResourcePublisherTests.cs diff --git a/tests/Aspire.Hosting.Tests/Dashboard/ResourcePublisherTests.cs b/tests/Aspire.Hosting.Tests/Dashboard/ResourcePublisherTests.cs new file mode 100644 index 0000000000..9c452e7425 --- /dev/null +++ b/tests/Aspire.Hosting.Tests/Dashboard/ResourcePublisherTests.cs @@ -0,0 +1,153 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Aspire.Dashboard.Model; +using Aspire.Hosting.Dashboard; +using Xunit; + +namespace Aspire.Hosting.Tests.Dashboard; + +public class ResourcePublisherTests +{ + [Fact] + public async Task ProducesExpectedSnapshotAndUpdates() + { + CancellationTokenSource cts = new(); + ResourcePublisher publisher = new(cts.Token); + + var a = CreateResource("A"); + var b = CreateResource("B"); + var c = CreateResource("C"); + + await publisher.IntegrateAsync(a, ResourceChangeType.Upsert).ConfigureAwait(false); + await publisher.IntegrateAsync(b, ResourceChangeType.Upsert).ConfigureAwait(false); + + var (snapshot, subscription) = publisher.Subscribe(); + + Assert.Equal(2, snapshot.Count); + Assert.Contains(a, snapshot); + Assert.Contains(b, snapshot); + + using AutoResetEvent sync = new(initialState: false); + List changes = []; + + var task = Task.Run(async () => + { + await foreach (var change in subscription) + { + changes.Add(change); + sync.Set(); + } + }); + + await publisher.IntegrateAsync(c, ResourceChangeType.Upsert).ConfigureAwait(false); + + sync.WaitOne(TimeSpan.FromSeconds(1)); + + var change = Assert.Single(changes); + Assert.Equal(ResourceChangeType.Upsert, change.ChangeType); + Assert.Same(c, change.Resource); + + await cts.CancelAsync(); + + await Assert.ThrowsAsync(() => task); + } + + [Fact] + public async Task SupportsMultipleSubscribers() + { + CancellationTokenSource cts = new(); + ResourcePublisher publisher = new(cts.Token); + + var a = CreateResource("A"); + var b = CreateResource("B"); + var c = CreateResource("C"); + + await publisher.IntegrateAsync(a, ResourceChangeType.Upsert).ConfigureAwait(false); + await publisher.IntegrateAsync(b, ResourceChangeType.Upsert).ConfigureAwait(false); + + var (snapshot1, subscription1) = publisher.Subscribe(); + var (snapshot2, subscription2) = publisher.Subscribe(); + + Assert.Equal(2, snapshot1.Count); + Assert.Equal(2, snapshot2.Count); + + await publisher.IntegrateAsync(c, ResourceChangeType.Upsert).ConfigureAwait(false); + + var enumerator1 = subscription1.GetAsyncEnumerator(cts.Token); + var enumerator2 = subscription2.GetAsyncEnumerator(cts.Token); + + await enumerator1.MoveNextAsync(); + await enumerator2.MoveNextAsync(); + + Assert.Equal(ResourceChangeType.Upsert, enumerator1.Current.ChangeType); + Assert.Equal(ResourceChangeType.Upsert, enumerator2.Current.ChangeType); + Assert.Same(c, enumerator1.Current.Resource); + Assert.Same(c, enumerator2.Current.Resource); + + await cts.CancelAsync(); + } + + [Fact] + public async Task MergesResourcesInSnapshot() + { + CancellationTokenSource cts = new(); + ResourcePublisher publisher = new(cts.Token); + + var a1 = CreateResource("A"); + var a2 = CreateResource("A"); + var a3 = CreateResource("A"); + + await publisher.IntegrateAsync(a1, ResourceChangeType.Upsert).ConfigureAwait(false); + await publisher.IntegrateAsync(a2, ResourceChangeType.Upsert).ConfigureAwait(false); + await publisher.IntegrateAsync(a3, ResourceChangeType.Upsert).ConfigureAwait(false); + + var (snapshot, _) = publisher.Subscribe(); + + Assert.Same(a3, Assert.Single(snapshot)); + + await cts.CancelAsync(); + } + + [Fact] + public async Task DeletesRemoveFromSnapshot() + { + CancellationTokenSource cts = new(); + ResourcePublisher publisher = new(cts.Token); + + var a = CreateResource("A"); + var b = CreateResource("B"); + + await publisher.IntegrateAsync(a, ResourceChangeType.Upsert).ConfigureAwait(false); + await publisher.IntegrateAsync(b, ResourceChangeType.Upsert).ConfigureAwait(false); + await publisher.IntegrateAsync(a, ResourceChangeType.Delete).ConfigureAwait(false); + + var (snapshot, _) = publisher.Subscribe(); + + Assert.Same(b, Assert.Single(snapshot)); + + await cts.CancelAsync(); + } + + private static ContainerViewModel CreateResource(string name) + { + return new ContainerViewModel() + { + Name = name, + Uid = "", + State = "", + CreationTimeStamp = null, + DisplayName = "", + Endpoints = [], + Environment = [], + ExpectedEndpointsCount = null, + LogSource = null!, + Services = [], + Args = [], + Command = "", + ContainerId = "", + Image = "", + Ports = [] + }; + } +}