diff --git a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt index d93627a..9568cd2 100644 --- a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt +++ b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt @@ -5,6 +5,8 @@ namespace Akka.Hosting { public ActorRegistry() { } public Akka.Actor.IActorRef Get() { } + public System.Threading.Tasks.Task GetAsync(System.Type key, System.Threading.CancellationToken ct = default) { } + public System.Threading.Tasks.Task GetAsync(System.Threading.CancellationToken ct = default) { } public System.Collections.Generic.IEnumerator> GetEnumerator() { } public void Register(Akka.Actor.IActorRef actor, bool overwrite = false) { } public bool TryGet(System.Type key, out Akka.Actor.IActorRef actor) { } @@ -18,7 +20,7 @@ namespace Akka.Hosting public ActorRegistryException(string message) { } public ActorRegistryException(string message, System.Exception innerException) { } } - public class ActorRegistryExtension : Akka.Actor.ExtensionIdProvider + public sealed class ActorRegistryExtension : Akka.Actor.ExtensionIdProvider { public ActorRegistryExtension() { } public override Akka.Hosting.ActorRegistry CreateExtension(Akka.Actor.ExtendedActorSystem system) { } @@ -114,6 +116,8 @@ namespace Akka.Hosting public interface IReadOnlyActorRegistry : System.Collections.Generic.IEnumerable>, System.Collections.IEnumerable { Akka.Actor.IActorRef Get(); + System.Threading.Tasks.Task GetAsync(System.Type key, System.Threading.CancellationToken ct = default); + System.Threading.Tasks.Task GetAsync(System.Threading.CancellationToken ct = default); bool TryGet(System.Type key, out Akka.Actor.IActorRef actor); bool TryGet(out Akka.Actor.IActorRef actor); } diff --git a/src/Akka.Hosting.Tests/ActorRegistrySpecs.cs b/src/Akka.Hosting.Tests/ActorRegistrySpecs.cs index 82e3a00..2353ebe 100644 --- a/src/Akka.Hosting.Tests/ActorRegistrySpecs.cs +++ b/src/Akka.Hosting.Tests/ActorRegistrySpecs.cs @@ -1,4 +1,7 @@ using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using FluentAssertions; using Xunit; @@ -70,4 +73,77 @@ public void Should_not_throw_on_missing_entry_during_TryGet() // assert registry.Invoking(x => x.TryGet(out var actor)).Should().NotThrow(); } + + [Fact] + public async Task Should_complete_GetAsync_upon_KeyRegistered() + { + // arrange + var registry = new ActorRegistry(); + + // act + var task = registry.GetAsync(); + task.IsCompletedSuccessfully.Should().BeFalse(); + + registry.Register(Nobody.Instance); + var result = await task; + + // assert + result.Should().Be(Nobody.Instance); + } + + [Fact] + public async Task Should_complete_multiple_GetAsync_upon_KeyRegistered() + { + // arrange + var registry = new ActorRegistry(); + + // act + var task1 = registry.GetAsync(); + var task2 = registry.GetAsync(); + var task3 = registry.GetAsync(); + + // validate that all three tasks are distinct + task1.Should().NotBe(task2).And.NotBe(task3); + + var aggregate = Task.WhenAll(task1, task2, task3); + + registry.Register(Nobody.Instance); + var result = await aggregate; + + // assert + result.First().Should().Be(Nobody.Instance); + } + + [Fact] + public void GetAsync_should_return_CompletedTask_if_Key_AlreadyExists() + { + // arrange + var registry = new ActorRegistry(); + registry.Register(Nobody.Instance); + + // act + var task = registry.GetAsync(); + + // assert + task.IsCompletedSuccessfully.Should().BeTrue(); + } + + [Fact] + public void GetAsync_should_Cancel_after_Timeout() + { + // arrange + var registry = new ActorRegistry(); + var cancellationTokenSource = new CancellationTokenSource(); + + // act + var task = registry.GetAsync(cancellationTokenSource.Token); + Action cancel = () => + { + cancellationTokenSource.Cancel(); + task.Wait(TimeSpan.FromSeconds(3)); + }; + + // assert + cancel.Should().Throw(); + } } \ No newline at end of file diff --git a/src/Akka.Hosting/ActorRegistry.cs b/src/Akka.Hosting/ActorRegistry.cs index c75dd18..cf991e6 100644 --- a/src/Akka.Hosting/ActorRegistry.cs +++ b/src/Akka.Hosting/ActorRegistry.cs @@ -2,6 +2,9 @@ using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Collections.Immutable; +using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.Util; @@ -35,12 +38,12 @@ public RequiredActor(IReadOnlyActorRegistry registry) /// public IActorRef ActorRef { get; } - } + } /// /// INTERNAL API /// - public class ActorRegistryExtension : ExtensionIdProvider + public sealed class ActorRegistryExtension : ExtensionIdProvider { public override ActorRegistry CreateExtension(ExtendedActorSystem system) { @@ -55,12 +58,10 @@ public class ActorRegistryException : Exception { public ActorRegistryException(string message) : base(message) { - } - + public ActorRegistryException(string message, Exception innerException) : base(message, innerException) { - } } @@ -69,15 +70,12 @@ public ActorRegistryException(string message, Exception innerException) : base(m /// public sealed class DuplicateActorRegistryException : ActorRegistryException { - public DuplicateActorRegistryException(string message) : base(message) { - } - + public DuplicateActorRegistryException(string message, Exception innerException) : base(message, innerException) { - } } @@ -90,11 +88,60 @@ public MissingActorRegistryEntryException(string message) : base(message) { } - public MissingActorRegistryEntryException(string message, Exception innerException) : base(message, innerException) + public MissingActorRegistryEntryException(string message, Exception innerException) : base(message, + innerException) { } } + /// + /// Used to implement "wait for actor" mechanics + /// + internal sealed class WaitForActorRegistration : IEquatable + { + public WaitForActorRegistration(Type key, TaskCompletionSource waiter) + { + Key = key; + Waiter = waiter; + } + + public Type Key { get; } + + public TaskCompletionSource Waiter { get; } + + public CancellationTokenRegistration CancellationRegistration { get; set; } + + public bool Equals(WaitForActorRegistration other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return Key == other.Key; + } + + public override bool Equals(object obj) + { + return ReferenceEquals(this, obj) || obj is WaitForActorRegistration other && Equals(other); + } + + public override int GetHashCode() + { + unchecked + { + return (Key.GetHashCode() * 397) ^ Waiter.GetHashCode(); + } + } + + public static bool operator ==(WaitForActorRegistration? left, WaitForActorRegistration? right) + { + return Equals(left, right); + } + + public static bool operator !=(WaitForActorRegistration? left, WaitForActorRegistration? right) + { + return !Equals(left, right); + } + } + /// /// Mutable, but thread-safe . /// @@ -106,7 +153,7 @@ public MissingActorRegistryEntryException(string message, Exception innerExcepti public class ActorRegistry : IActorRegistry, IExtension { private readonly ConcurrentDictionary _actorRegistrations = new(); - + /// /// Thrown when the same value is inserted twice and overwriting is not allowed. /// Thrown when a null is registered. @@ -114,7 +161,7 @@ public void Register(IActorRef actor, bool overwrite = false) { if (actor == null) throw new ArgumentNullException(nameof(actor), "Cannot register null actors"); - + if (!TryRegister(actor, overwrite)) { throw new DuplicateActorRegistryException( @@ -122,6 +169,16 @@ public void Register(IActorRef actor, bool overwrite = false) } } + /// + /// In the event that an actor is not available yet, typically during the very beginning of ActorSystem startup, + /// we can wait on that actor becoming available. + /// + /// + /// Have to store a collection of s here so each waiter gets its own cancellation token. + /// + private readonly ConcurrentDictionary> _actorWaiters = + new ConcurrentDictionary>(); + /// /// Attempts to register an actor with the registry. /// @@ -144,11 +201,16 @@ public bool TryRegister(Type key, IActorRef actor, bool overwrite = false) { if (actor == null) return false; - + if (!overwrite) - return _actorRegistrations.TryAdd(key, actor); + { + if (!_actorRegistrations.TryAdd(key, actor)) + return false; + } else _actorRegistrations[key] = actor; + + NotifyWaiters(key, _actorRegistrations[key]); return true; } @@ -163,7 +225,7 @@ public bool TryGet(out IActorRef actor) } /// - /// Try to retrieve an with the given . + /// Try to retrieve an with the given type. /// /// The key for a particular actor. /// The bound , if any. Is set to if key is not found. @@ -180,6 +242,64 @@ public bool TryGet(Type key, out IActorRef actor) return false; } + /// + public async Task GetAsync(CancellationToken ct = default) + { + return await GetAsync(typeof(TKey), ct); + } + + /// + public async Task GetAsync(Type key, CancellationToken ct = default) + { + // try to get the populated actor first, if available + if (TryGet(key, out var storedActor)) + { + return storedActor; + } + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var waitingRegistration = new WaitForActorRegistration(key, tcs); + var registration = ct.Register(CancelWaiter(key, ct, waitingRegistration), _actorWaiters); + waitingRegistration.CancellationRegistration = registration; + + var r = _actorWaiters.AddOrUpdate(key, + type => { return ImmutableHashSet.Empty.Add(waitingRegistration); }, + (type, set) => { return set.Add(waitingRegistration); }); + + var b = r; + + + return await tcs.Task.ConfigureAwait(false); + } + + private void NotifyWaiters(Type key, IActorRef value) + { + // remove the registrations and then iterate over them + if (_actorWaiters.TryRemove(key, out var registrations)) + { + foreach (var r in registrations) + { + r.Waiter.TrySetResult(value); + r.CancellationRegistration.Dispose(); + } + } + } + + private static Action CancelWaiter(Type key, CancellationToken ct, + WaitForActorRegistration waitingRegistration) + { + return dict => + { + // first step during timeout is to remove our registration + var d = (ConcurrentDictionary>)dict; + d.AddOrUpdate(key, type => ImmutableHashSet.Empty, + (type, set) => set.Remove(waitingRegistration)); + + // next, cancel the task + waitingRegistration.Waiter.TrySetCanceled(ct); + }; + } + /// /// Fetches the by key. /// @@ -201,6 +321,7 @@ public IEnumerator> GetEnumerator() { return _actorRegistrations.GetEnumerator(); } + /// /// Allows enumerated access to the collection of all registered actors. /// @@ -214,8 +335,8 @@ public static ActorRegistry For(ActorSystem actorSystem) { return actorSystem.WithExtension(); } - } - + } + /// /// Represents a read-only collection of instances keyed by the actor name. /// @@ -227,7 +348,7 @@ public interface IReadOnlyActorRegistry : IEnumerableThe bound , if any. Is set to if key is not found. /// true if an actor with this key exists, false otherwise. bool TryGet(out IActorRef actor); - + /// /// Try to retrieve an with the given . /// @@ -235,7 +356,7 @@ public interface IReadOnlyActorRegistry : IEnumerableThe bound , if any. Is set to if key is not found. /// true if an actor with this key exists, false otherwise. bool TryGet(Type key, out IActorRef actor); - + /// /// Fetches the by key. /// @@ -243,6 +364,24 @@ public interface IReadOnlyActorRegistry : IEnumerableIf found, the underlying . /// If not found, returns . IActorRef Get(); + + /// + /// Asynchronously fetches the by key. Task will complete when the actor is registered. + /// + /// The CancellationToken that can be used to cancel the GetAsync operation. + /// The key type to retrieve this actor. + /// A that will complete when the actor is registered or will throw + /// a in the event that the is invoked. + public Task GetAsync(CancellationToken ct = default); + + /// + /// Asynchronously fetches the by key. Task will complete when the actor is registered. + /// + /// The CancellationToken that can be used to cancel the GetAsync operation. + /// The key type to retrieve this actor. + /// A that will complete when the actor is registered or will throw + /// a in the event that the is invoked. + public Task GetAsync(Type key, CancellationToken ct = default); } /// @@ -253,7 +392,7 @@ public interface IReadOnlyActorRegistry : IEnumerable to the registry you are definitely using it wrong. /// - public interface IActorRegistry: IReadOnlyActorRegistry + public interface IActorRegistry : IReadOnlyActorRegistry { /// /// Registers an actor into the registry. Throws an exception upon failure. @@ -261,7 +400,7 @@ public interface IActorRegistry: IReadOnlyActorRegistry /// The bound , if any. Is set to if key is not found. /// If true, allows overwriting of a previous actor with the same key. Defaults to false. void Register(IActorRef actor, bool overwrite = false); - + /// /// Attempts to register an actor with the registry. /// @@ -279,4 +418,4 @@ public interface IActorRegistry: IReadOnlyActorRegistry /// true if the actor was set to this key in the registry, false otherwise. bool TryRegister(Type key, IActorRef actor, bool overwrite = false); } -} +} \ No newline at end of file diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 0ce69f8..ab780d6 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -2,8 +2,12 @@ Copyright © 2013-2022 Akka.NET Team Akka.NET Team - 0.4.0 - • [Add Microsoft.Extensions.Logging.ILoggerFactory logging support](https://github.com/akkadotnet/Akka.Hosting/pull/72) + 1.0.1 + Version 1.0.1 fixes options bug used in the cluster sharding Akka.Hosting extension method. +* [Update Akka.NET from 1.4.45 to 1.4.48](https://github.com/akkadotnet/akka.net/releases/tag/1.4.48) +* [Fix SimpleDemo project failing on `Development` environment](https://github.com/akkadotnet/Akka.Hosting/pull/184) +* [Add F# CustomJournalIdDemo project](https://github.com/akkadotnet/Akka.Hosting/pull/183) +* [Fix cluster sharding hosting extension method options bug](https://github.com/akkadotnet/Akka.Hosting/pull/186) akkalogo.png https://github.com/akkadotnet/Akka.Hosting