Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify ResourcePublisher #1346

Merged
merged 4 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Aspire.Dashboard/Components/Pages/ConsoleLogs.razor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private async Task OnResourceListChangedAsync(ResourceChangeType changeType, Res
}
}
}
else if (changeType == ResourceChangeType.Deleted)
else if (changeType == ResourceChangeType.Delete)
{
_resourceNameMapping.Remove(resourceViewModel.Name);

Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Dashboard/Components/Pages/Resources.razor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private async Task OnResourceListChanged(ResourceChangeType changeType, Resource
_resourcesMap[resource.Name] = resource;
break;

case ResourceChangeType.Deleted:
case ResourceChangeType.Delete:
_resourcesMap.Remove(resource.Name);
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Dashboard/Model/ResourceChangeType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ public enum ResourceChangeType
/// <summary>
/// The object was deleted.
/// </summary>
Deleted
Delete
}
2 changes: 1 addition & 1 deletion src/Aspire.Dashboard/Model/ResourceOutgoingPeerResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private async Task OnResourceListChanged(ResourceChangeType changeType, Resource
{
_resourceNameMapping[resourceViewModel.Name] = resourceViewModel;
}
else if (changeType == ResourceChangeType.Deleted)
else if (changeType == ResourceChangeType.Delete)
{
_resourceNameMapping.TryRemove(resourceViewModel.Name, out _);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Hosting/Dashboard/DcpDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ private static ResourceChangeType ToChangeType(WatchEventType watchEventType)
return watchEventType switch
{
WatchEventType.Added or WatchEventType.Modified => ResourceChangeType.Upsert,
WatchEventType.Deleted => ResourceChangeType.Deleted,
WatchEventType.Deleted => ResourceChangeType.Delete,
_ => ResourceChangeType.Other
};
}
Expand Down
73 changes: 17 additions & 56 deletions src/Aspire.Hosting/Dashboard/ResourcePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,22 @@ public ResourceSubscription Subscribe()

return new ResourceSubscription(
Snapshot: _snapshot.Values.ToList(),
Subscription: new ResourceSubscriptionEnumerable(channel, disposeAction: RemoveChannel));
}
Subscription: StreamUpdates());

void RemoveChannel(Channel<ResourceChange> channel)
{
ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Remove(channel), channel);
async IAsyncEnumerable<ResourceChange> StreamUpdates()
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
yield return await channel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
}
}
finally
{
ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Remove(channel), channel);
}
}
}
}

Expand All @@ -43,7 +53,7 @@ void RemoveChannel(Channel<ResourceChange> channel)
/// <param name="resource">The resource that was modified.</param>
/// <param name="changeType">The change type (Added, Modified, Deleted).</param>
/// <returns>A task that completes when the cache has been updated and all subscribers notified.</returns>
public async ValueTask Integrate(ResourceViewModel resource, ResourceChangeType changeType)
public async ValueTask IntegrateAsync(ResourceViewModel resource, ResourceChangeType changeType)
{
lock (_syncLock)
{
Expand All @@ -53,7 +63,7 @@ public async ValueTask Integrate(ResourceViewModel resource, ResourceChangeType
_snapshot[resource.Name] = resource;
break;

case ResourceChangeType.Deleted:
case ResourceChangeType.Delete:
_snapshot.Remove(resource.Name);
break;
}
Expand All @@ -64,53 +74,4 @@ public async ValueTask Integrate(ResourceViewModel resource, ResourceChangeType
await channel.Writer.WriteAsync(new(changeType, resource), cancellationToken).ConfigureAwait(false);
}
}

private sealed class ResourceSubscriptionEnumerable : IAsyncEnumerable<ResourceChange>
{
private readonly Channel<ResourceChange> _channel;
private readonly Action<Channel<ResourceChange>> _disposeAction;

public ResourceSubscriptionEnumerable(Channel<ResourceChange> channel, Action<Channel<ResourceChange>> disposeAction)
{
_channel = channel;
_disposeAction = disposeAction;
}

public IAsyncEnumerator<ResourceChange> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new ResourceSubscriptionEnumerator(_channel, _disposeAction, cancellationToken);
}
}

private sealed class ResourceSubscriptionEnumerator : IAsyncEnumerator<ResourceChange>
{
private readonly Channel<ResourceChange> _channel;
private readonly Action<Channel<ResourceChange>> _disposeAction;
private readonly CancellationToken _cancellationToken;

public ResourceSubscriptionEnumerator(
Channel<ResourceChange> channel, Action<Channel<ResourceChange>> disposeAction, CancellationToken cancellationToken)
{
_channel = channel;
_disposeAction = disposeAction;
_cancellationToken = cancellationToken;
Current = default!;
}

public ResourceChange Current { get; private set; }

public ValueTask DisposeAsync()
{
_disposeAction(_channel);

return ValueTask.CompletedTask;
}

public async ValueTask<bool> MoveNextAsync()
{
Current = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);

return true;
}
}
}
2 changes: 1 addition & 1 deletion src/Aspire.Hosting/Dashboard/ResourceService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public ResourceService(

_resourcePublisher = new ResourcePublisher(_cancellationTokenSource.Token);

_ = new DcpDataSource(kubernetesService, applicationModel, loggerFactory, _resourcePublisher.Integrate, _cancellationTokenSource.Token);
_ = new DcpDataSource(kubernetesService, applicationModel, loggerFactory, _resourcePublisher.IntegrateAsync, _cancellationTokenSource.Token);

static string ComputeApplicationName(string applicationName)
{
Expand Down
153 changes: 153 additions & 0 deletions tests/Aspire.Hosting.Tests/Dashboard/ResourcePublisherTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Dashboard.Model;
using Aspire.Hosting.Dashboard;
using Xunit;

namespace Aspire.Hosting.Tests.Dashboard;

public class ResourcePublisherTests
{
[Fact]
public async Task ProducesExpectedSnapshotAndUpdates()
{
CancellationTokenSource cts = new();
ResourcePublisher publisher = new(cts.Token);

var a = CreateResource("A");
var b = CreateResource("B");
var c = CreateResource("C");

await publisher.IntegrateAsync(a, ResourceChangeType.Upsert).ConfigureAwait(false);
await publisher.IntegrateAsync(b, ResourceChangeType.Upsert).ConfigureAwait(false);

var (snapshot, subscription) = publisher.Subscribe();

Assert.Equal(2, snapshot.Count);
Assert.Contains(a, snapshot);
Assert.Contains(b, snapshot);

using AutoResetEvent sync = new(initialState: false);
List<ResourceChange> changes = [];

var task = Task.Run(async () =>
{
await foreach (var change in subscription)
{
changes.Add(change);
sync.Set();
}
});

await publisher.IntegrateAsync(c, ResourceChangeType.Upsert).ConfigureAwait(false);

sync.WaitOne(TimeSpan.FromSeconds(1));

var change = Assert.Single(changes);
Assert.Equal(ResourceChangeType.Upsert, change.ChangeType);
Assert.Same(c, change.Resource);

await cts.CancelAsync();

await Assert.ThrowsAsync<OperationCanceledException>(() => task);
}

[Fact]
public async Task SupportsMultipleSubscribers()
{
CancellationTokenSource cts = new();
ResourcePublisher publisher = new(cts.Token);

var a = CreateResource("A");
var b = CreateResource("B");
var c = CreateResource("C");

await publisher.IntegrateAsync(a, ResourceChangeType.Upsert).ConfigureAwait(false);
await publisher.IntegrateAsync(b, ResourceChangeType.Upsert).ConfigureAwait(false);

var (snapshot1, subscription1) = publisher.Subscribe();
var (snapshot2, subscription2) = publisher.Subscribe();

Assert.Equal(2, snapshot1.Count);
Assert.Equal(2, snapshot2.Count);

await publisher.IntegrateAsync(c, ResourceChangeType.Upsert).ConfigureAwait(false);

var enumerator1 = subscription1.GetAsyncEnumerator(cts.Token);
var enumerator2 = subscription2.GetAsyncEnumerator(cts.Token);

await enumerator1.MoveNextAsync();
await enumerator2.MoveNextAsync();

Assert.Equal(ResourceChangeType.Upsert, enumerator1.Current.ChangeType);
Assert.Equal(ResourceChangeType.Upsert, enumerator2.Current.ChangeType);
Assert.Same(c, enumerator1.Current.Resource);
Assert.Same(c, enumerator2.Current.Resource);

await cts.CancelAsync();
}

[Fact]
public async Task MergesResourcesInSnapshot()
{
CancellationTokenSource cts = new();
ResourcePublisher publisher = new(cts.Token);

var a1 = CreateResource("A");
var a2 = CreateResource("A");
var a3 = CreateResource("A");

await publisher.IntegrateAsync(a1, ResourceChangeType.Upsert).ConfigureAwait(false);
await publisher.IntegrateAsync(a2, ResourceChangeType.Upsert).ConfigureAwait(false);
await publisher.IntegrateAsync(a3, ResourceChangeType.Upsert).ConfigureAwait(false);

var (snapshot, _) = publisher.Subscribe();

Assert.Same(a3, Assert.Single(snapshot));

await cts.CancelAsync();
}

[Fact]
public async Task DeletesRemoveFromSnapshot()
{
CancellationTokenSource cts = new();
ResourcePublisher publisher = new(cts.Token);

var a = CreateResource("A");
var b = CreateResource("B");

await publisher.IntegrateAsync(a, ResourceChangeType.Upsert).ConfigureAwait(false);
await publisher.IntegrateAsync(b, ResourceChangeType.Upsert).ConfigureAwait(false);
await publisher.IntegrateAsync(a, ResourceChangeType.Delete).ConfigureAwait(false);

var (snapshot, _) = publisher.Subscribe();

Assert.Same(b, Assert.Single(snapshot));

await cts.CancelAsync();
}

private static ContainerViewModel CreateResource(string name)
{
return new ContainerViewModel()
{
Name = name,
Uid = "",
State = "",
CreationTimeStamp = null,
DisplayName = "",
Endpoints = [],
Environment = [],
ExpectedEndpointsCount = null,
LogSource = null!,
Services = [],
Args = [],
Command = "",
ContainerId = "",
Image = "",
Ports = []
};
}
}