diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 50f1e70dc99..1400a378611 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -17,9 +17,24 @@ namespace Akka.Streams } public class static ActorAttributes { + public static Akka.Streams.Attributes CreateDebugLogging(bool enabled) { } public static Akka.Streams.Attributes CreateDispatcher(string dispatcherName) { } + public static Akka.Streams.Attributes CreateFuzzingMode(bool enabled) { } + public static Akka.Streams.Attributes CreateMaxFixedBufferSize(int size) { } + public static Akka.Streams.Attributes CreateOutputBurstLimit(int limit) { } + public static Akka.Streams.Attributes CreateStreamSubscriptionTimeout(System.TimeSpan timeout, Akka.Streams.StreamSubscriptionTimeoutTerminationMode mode) { } public static Akka.Streams.Attributes CreateSupervisionStrategy(Akka.Streams.Supervision.Decider strategy) { } - public sealed class Dispatcher : Akka.Streams.Attributes.IAttribute, System.IEquatable + public static Akka.Streams.Attributes CreateSyncProcessingLimit(int limit) { } + public sealed class DebugLogging : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute, System.IEquatable + { + public DebugLogging(bool enabled) { } + public bool Enabled { get; } + public bool Equals(Akka.Streams.ActorAttributes.DebugLogging other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } + public sealed class Dispatcher : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute, System.IEquatable { public readonly string Name; public Dispatcher(string name) { } @@ -28,12 +43,58 @@ namespace Akka.Streams public override int GetHashCode() { } public override string ToString() { } } - public sealed class SupervisionStrategy : Akka.Streams.Attributes.IAttribute + public sealed class FuzzingMode : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute, System.IEquatable + { + public FuzzingMode(bool enabled) { } + public bool Enabled { get; } + public bool Equals(Akka.Streams.ActorAttributes.FuzzingMode other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } + public sealed class MaxFixedBufferSize : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute, System.IEquatable + { + public MaxFixedBufferSize(int size) { } + public int Size { get; } + public bool Equals(Akka.Streams.ActorAttributes.MaxFixedBufferSize other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } + public sealed class OutputBurstLimit : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute, System.IEquatable + { + public OutputBurstLimit(int limit) { } + public int Limit { get; } + public bool Equals(Akka.Streams.ActorAttributes.OutputBurstLimit other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } + public sealed class StreamSubscriptionTimeout : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute, System.IEquatable + { + public StreamSubscriptionTimeout(System.TimeSpan timeout, Akka.Streams.StreamSubscriptionTimeoutTerminationMode mode) { } + public Akka.Streams.StreamSubscriptionTimeoutTerminationMode Mode { get; } + public System.TimeSpan Timeout { get; } + public bool Equals(Akka.Streams.ActorAttributes.StreamSubscriptionTimeout other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } + public sealed class SupervisionStrategy : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute { public readonly Akka.Streams.Supervision.Decider Decider; public SupervisionStrategy(Akka.Streams.Supervision.Decider decider) { } public override string ToString() { } } + public sealed class SyncProcessingLimit : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute, System.IEquatable + { + public SyncProcessingLimit(int limit) { } + public int Limit { get; } + public bool Equals(Akka.Streams.ActorAttributes.SyncProcessingLimit other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } } public abstract class ActorMaterializer : Akka.Streams.IMaterializer, Akka.Streams.IMaterializerLoggingProvider, System.IDisposable { @@ -116,10 +177,14 @@ namespace Akka.Streams where TAttr : class, Akka.Streams.Attributes.IAttribute { } public System.Collections.Generic.IEnumerable GetAttributeList() where TAttr : Akka.Streams.Attributes.IAttribute { } + [System.ObsoleteAttribute("Attributes should always be most specific, use GetAttribute()")] public TAttr GetFirstAttribute(TAttr defaultIfNotFound) where TAttr : class, Akka.Streams.Attributes.IAttribute { } + [System.ObsoleteAttribute("Attributes should always be most specific, use GetAttribute()")] public TAttr GetFirstAttribute() where TAttr : class, Akka.Streams.Attributes.IAttribute { } + public TAttr GetMandatoryAttribute() + where TAttr : class, Akka.Streams.Attributes.IMandatoryAttribute { } public string GetNameLifted() { } public string GetNameOrDefault(string defaultIfNotFound = "unknown-operation") { } public override string ToString() { } @@ -130,8 +195,33 @@ namespace Akka.Streams public override bool Equals(object obj) { } public override string ToString() { } } + public sealed class CancellationStrategy : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute + { + public CancellationStrategy(Akka.Streams.Attributes.CancellationStrategy.IStrategy strategy) { } + public Akka.Streams.Attributes.CancellationStrategy.IStrategy Strategy { get; } + public class AfterDelay : Akka.Streams.Attributes.CancellationStrategy.IStrategy + { + public AfterDelay(System.TimeSpan delay, Akka.Streams.Attributes.CancellationStrategy.IStrategy strategy) { } + public System.TimeSpan Delay { get; } + public Akka.Streams.Attributes.CancellationStrategy.IStrategy Strategy { get; } + } + public class CompleteStage : Akka.Streams.Attributes.CancellationStrategy.IStrategy + { + public CompleteStage() { } + } + public class FailStage : Akka.Streams.Attributes.CancellationStrategy.IStrategy + { + public FailStage() { } + } + public interface IStrategy { } + public class PropagateFailure : Akka.Streams.Attributes.CancellationStrategy.IStrategy + { + public PropagateFailure() { } + } + } public interface IAttribute { } - public sealed class InputBuffer : Akka.Streams.Attributes.IAttribute, System.IEquatable + public interface IMandatoryAttribute : Akka.Streams.Attributes.IAttribute { } + public sealed class InputBuffer : Akka.Streams.Attributes.IAttribute, Akka.Streams.Attributes.IMandatoryAttribute, System.IEquatable { public readonly int Initial; public readonly int Max; @@ -800,12 +890,46 @@ namespace Akka.Streams } public class static StreamRefAttributes { + public static Akka.Streams.Attributes CreateBufferCapacity(int capacity) { } + public static Akka.Streams.Attributes CreateDemandRedeliveryInterval(System.TimeSpan timeout) { } + public static Akka.Streams.Attributes CreateFinalTerminationSignalDeadline(System.TimeSpan timeout) { } public static Akka.Streams.Attributes CreateSubscriptionTimeout(System.TimeSpan timeout) { } + public sealed class BufferCapacity : Akka.Streams.Attributes.IAttribute, Akka.Streams.StreamRefAttributes.IStreamRefAttribute, System.IEquatable + { + public BufferCapacity(int capacity) { } + public int Capacity { get; } + public bool Equals(Akka.Streams.StreamRefAttributes.BufferCapacity other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } + public sealed class DemandRedeliveryInterval : Akka.Streams.Attributes.IAttribute, Akka.Streams.StreamRefAttributes.IStreamRefAttribute, System.IEquatable + { + public DemandRedeliveryInterval(System.TimeSpan timeout) { } + public System.TimeSpan Timeout { get; } + public bool Equals(Akka.Streams.StreamRefAttributes.DemandRedeliveryInterval other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } + public sealed class FinalTerminationSignalDeadline : Akka.Streams.Attributes.IAttribute, Akka.Streams.StreamRefAttributes.IStreamRefAttribute, System.IEquatable + { + public FinalTerminationSignalDeadline(System.TimeSpan timeout) { } + public System.TimeSpan Timeout { get; } + public bool Equals(Akka.Streams.StreamRefAttributes.FinalTerminationSignalDeadline other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } public interface IStreamRefAttribute : Akka.Streams.Attributes.IAttribute { } - public sealed class SubscriptionTimeout : Akka.Streams.Attributes.IAttribute, Akka.Streams.StreamRefAttributes.IStreamRefAttribute + public sealed class SubscriptionTimeout : Akka.Streams.Attributes.IAttribute, Akka.Streams.StreamRefAttributes.IStreamRefAttribute, System.IEquatable { public SubscriptionTimeout(System.TimeSpan timeout) { } public System.TimeSpan Timeout { get; } + public bool Equals(Akka.Streams.StreamRefAttributes.SubscriptionTimeout other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } } } public sealed class StreamRefSubscriptionTimeoutException : Akka.Pattern.IllegalStateException diff --git a/src/core/Akka.Streams/Attributes.cs b/src/core/Akka.Streams/Attributes.cs index 98c741f7fb5..d4b79abf32d 100644 --- a/src/core/Akka.Streams/Attributes.cs +++ b/src/core/Akka.Streams/Attributes.cs @@ -10,6 +10,7 @@ using System.Linq; using System.Text; using Akka.Event; +using Akka.Pattern; using Akka.Streams.Dsl; using Akka.Streams.Implementation; using Akka.Streams.Supervision; @@ -31,7 +32,16 @@ public sealed class Attributes public interface IAttribute { } /// - /// TBD + /// Attributes that are always present (is defined with default values by the materializer) + /// + /// Not for user extension + /// + public interface IMandatoryAttribute : IAttribute { } + + /// + /// Specifies the name of the operation. + /// + /// Use factory method to create instances. /// public sealed class Name : IAttribute, IEquatable { @@ -67,9 +77,13 @@ public Name(string value) } /// - /// TBD + /// Each asynchronous piece of a materialized stream topology is executed by one Actor + /// that manages an input buffer for all inlets of its shape. This attribute configures + /// the initial and maximal input buffer in number of elements for each inlet. + /// + /// Use factory method to create instances. /// - public sealed class InputBuffer : IAttribute, IEquatable + public sealed class InputBuffer : IMandatoryAttribute, IEquatable { /// /// TBD @@ -193,7 +207,7 @@ public sealed class AsyncBoundary : IAttribute, IEquatable private AsyncBoundary() { } /// - public bool Equals(AsyncBoundary other) => true; + public bool Equals(AsyncBoundary other) => other is AsyncBoundary; /// public override bool Equals(object obj) => obj is AsyncBoundary; @@ -202,6 +216,87 @@ private AsyncBoundary() { } public override string ToString() => "AsyncBoundary"; } + /// + /// Cancellation strategies provide a way to configure the behavior of a stage + /// when `cancelStage` is called. + /// + /// It is only relevant for stream components that have more than one output + /// and do not define a custom cancellation behavior by overriding `onDownstreamFinish`. + /// In those cases, if the first output is cancelled, the default behavior + /// is to call `cancelStage` which shuts down the stage completely. + /// The given strategy will allow customization of how the shutdown procedure should be done precisely. + /// + public sealed class CancellationStrategy:IMandatoryAttribute + { + internal CancellationStrategy Default { get; } = new CancellationStrategy(new PropagateFailure()); + + public IStrategy Strategy { get; } + + public CancellationStrategy(IStrategy strategy) + { + Strategy = strategy; + } + + public interface IStrategy { } + + /// + /// Strategy that treats `cancelStage` the same as `completeStage`, i.e. all inlets are cancelled (propagating the + /// cancellation cause) and all outlets are regularly completed. + /// + /// This used to be the default behavior before Akka 2.6. + /// + /// This behavior can be problematic in stacks of BidiFlows where different layers of the stack are both connected + /// through inputs and outputs. In this case, an error in a doubly connected component triggers both a cancellation + /// going upstream and an error going downstream.Since the stack might be connected to those components with inlets and + /// outlets, a race starts whether the cancellation or the error arrives first.If the error arrives first, that's usually + /// good because then the error can be propagated both on inlets and outlets.However, if the cancellation arrives first, + /// the previous default behavior to complete the stage will lead other outputs to be completed regularly.The error + /// which arrive late at the other hand will just be ignored (that connection will have been cancelled already and also + /// the paths through which the error could propagates are already shut down). + /// + public class CompleteStage : IStrategy { } + + /// + /// Strategy that treats `cancelStage` the same as `failStage`, i.e. all inlets are cancelled (propagating the + /// cancellation cause) and all outlets are failed propagating the cause from cancellation. + /// + public class FailStage : IStrategy { } + + /// + /// Strategy that treats `cancelStage` in different ways depending on the cause that was given to the cancellation. + /// + /// If the cause was a regular, active cancellation (`SubscriptionWithCancelException.NoMoreElementsNeeded`), the stage + /// receiving this cancellation is completed regularly. + /// + /// If another cause was given, this is treated as an error and the behavior is the same as with `failStage`. + /// + /// This is a good default strategy. + /// + public class PropagateFailure : IStrategy { } + + /// + /// Strategy that allows to delay any action when `cancelStage` is invoked. + /// + /// The idea of this strategy is to delay any action on cancellation because it is expected that the stage is completed + /// through another path in the meantime. The downside is that a stage and a stream may live longer than expected if no + /// such signal is received and cancellation is invoked later on. In streams with many stages that all apply this strategy, + /// this strategy might significantly delay the propagation of a cancellation signal because each upstream stage might impose + /// such a delay. During this time, the stream will be mostly "silent", i.e. it cannot make progress because of backpressure, + /// but you might still be able observe a long delay at the ultimate source. + /// + public class AfterDelay : IStrategy + { + public TimeSpan Delay { get; } + public IStrategy Strategy { get; } + + public AfterDelay(TimeSpan delay, IStrategy strategy) + { + Delay = delay; + Strategy = strategy; + } + } + } + /// /// TBD /// @@ -219,12 +314,32 @@ public Attributes(params IAttribute[] attributes) } /// - /// TBD + /// The list is ordered with the most specific attribute first, least specific last. + /// + /// Note that operators in general should not inspect the whole hierarchy but instead use + /// to get the most specific attribute value. /// public IEnumerable AttributeList => _attributes; /// - /// Get all attributes of a given type or subtype thereof + /// Note that this must only be used during traversal building and not during materialization + /// as it will then always return true because of the defaults from the ActorMaterializerSettings + /// + /// INTERNAL API + /// + internal bool IsAsync + => _attributes.Count() > 0 && + _attributes.Any( + attr => attr is AsyncBoundary || + attr is ActorAttributes.Dispatcher); + + /// + /// Get all attributes of a given type (or subtypes thereof). + /// + /// Note that operators in general should not inspect the whole hierarchy but instead use + /// to get the most specific attribute value. + /// + /// The list is ordered with the most specific attribute first, least specific last. /// /// TBD /// TBD @@ -248,6 +363,7 @@ public TAttr GetAttribute(TAttr defaultIfNotFound) where TAttr : class, I /// TBD /// TBD /// TBD + [Obsolete("Attributes should always be most specific, use GetAttribute()")] public TAttr GetFirstAttribute(TAttr defaultIfNotFound) where TAttr : class, IAttribute => GetFirstAttribute() ?? defaultIfNotFound; @@ -264,9 +380,23 @@ public TAttr GetAttribute() where TAttr : class, IAttribute /// /// TBD /// TBD + [Obsolete("Attributes should always be most specific, use GetAttribute()")] public TAttr GetFirstAttribute() where TAttr : class, IAttribute => _attributes.FirstOrDefault(attr => attr is TAttr) as TAttr; + /// + /// Get the most specific of one of the mandatory attributes. Mandatory attributes are guaranteed + /// to always be among the attributes when the attributes are coming from a materialization. + /// + /// + /// + public TAttr GetMandatoryAttribute() where TAttr : class, IMandatoryAttribute + { + if (!(_attributes.First(attr => attr is TAttr) is TAttr result)) + throw new IllegalStateException($"Mandatory attribute [{typeof(TAttr)}] not found."); + return result; + } + /// /// Adds given attributes to the end of these attributes. /// @@ -321,6 +451,9 @@ public string GetNameOrDefault(string defaultIfNotFound = "unknown-operation") /// /// Test whether the given attribute is contained within this attributes list. + /// + /// Note that operators in general should not inspect the whole hierarchy but instead use + /// `get` to get the most specific attribute value. /// /// TBD /// TBD @@ -330,14 +463,22 @@ public string GetNameOrDefault(string defaultIfNotFound = "unknown-operation") /// /// Specifies the name of the operation. /// If the name is null or empty the name is ignored, i.e. is returned. + /// + /// When using this method the name is encoded with `Uri.EscapeUriString` because + /// the name is sometimes used as part of actor name. If that is not desired + /// the name can be added in it's raw format using `.And(new Attributes(new Name(name)))`. /// /// TBD /// TBD public static Attributes CreateName(string name) - => string.IsNullOrEmpty(name) ? None : new Attributes(new Name(name)); + => string.IsNullOrEmpty(name) ? + None : + new Attributes(new Name(Uri.EscapeUriString(name))); /// - /// Specifies the initial and maximum size of the input buffer. + /// Each asynchronous piece of a materialized stream topology is executed by one Actor + /// that manages an input buffer for all inlets of its shape. This attribute configures + /// the initial and maximal input buffer in number of elements for each inlet. /// /// TBD /// TBD @@ -365,6 +506,7 @@ public static Attributes CreateLogLevels(LogLevel onElement = LogLevel.DebugLeve LogLevel onFinish = LogLevel.DebugLevel, LogLevel onError = LogLevel.ErrorLevel) => new Attributes(new LogLevels(onElement, onFinish, onError)); + // TODO: different than scala code, investigate later. /// /// Compute a name by concatenating all Name attributes that the given module /// has, returning the given default value if none are found. @@ -386,14 +528,16 @@ public static string ExtractName(IModule module, string defaultIfNotFound) } /// - /// Attributes for the . Note that more attributes defined in . + /// Attributes for the . Note that more attributes defined in . /// public static class ActorAttributes { /// - /// TBD + /// Configures the dispatcher to be used by streams. + /// + /// Use factory method to create instances. /// - public sealed class Dispatcher : Attributes.IAttribute, IEquatable + public sealed class Dispatcher : Attributes.IMandatoryAttribute, IEquatable { /// /// TBD @@ -432,7 +576,7 @@ public bool Equals(Dispatcher other) /// /// TBD /// - public sealed class SupervisionStrategy : Attributes.IAttribute + public sealed class SupervisionStrategy : Attributes.IMandatoryAttribute { /// /// TBD @@ -453,7 +597,221 @@ public SupervisionStrategy(Decider decider) } /// - /// Specifies the name of the dispatcher. + /// Enables additional low level troubleshooting logging at DEBUG log level + /// + /// Use factory method `CreateDebugLogging` to create. + /// + public sealed class DebugLogging : + Attributes.IMandatoryAttribute, + IEquatable + { + public bool Enabled { get; } + + public DebugLogging(bool enabled) + { + Enabled = enabled; + } + + /// + public bool Equals(DebugLogging other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Enabled == other.Enabled; + } + + /// + public override bool Equals(object obj) => obj is DebugLogging attr && Equals(attr); + + /// + public override int GetHashCode() => Enabled.GetHashCode(); + + /// + public override string ToString() => $"DebugLogging(enabled={Enabled})"; + } + + /// + /// Defines a timeout for stream subscription and what action to take when that hits. + /// + /// Use factory method `CreateStreamSubscriptionTimeout` to create. + /// + public sealed class StreamSubscriptionTimeout : + Attributes.IMandatoryAttribute, + IEquatable + { + public TimeSpan Timeout { get; } + public StreamSubscriptionTimeoutTerminationMode Mode { get; } + + public StreamSubscriptionTimeout(TimeSpan timeout, StreamSubscriptionTimeoutTerminationMode mode) + { + if (timeout.Ticks < 0) + throw new ArgumentException("Timeout must be finite.", nameof(timeout)); + Timeout = timeout; + Mode = mode; + } + + public bool Equals(StreamSubscriptionTimeout other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Timeout.Equals(other.Timeout) && Mode.Equals(other.Mode); + } + + /// + public override bool Equals(object obj) => obj is StreamSubscriptionTimeout attr && Equals(attr); + + /// + public override int GetHashCode() + { + unchecked + { + var initial = Timeout.GetHashCode(); + return (initial * 397) ^ Mode.GetHashCode(); + } + } + + /// + public override string ToString() => $"StreamSubscriptionTimeout(timeout={Timeout.TotalMilliseconds}ms, mode={Mode})"; + } + + /// + /// Maximum number of elements emitted in batch if downstream signals large demand. + /// + /// Use factory method `CreateOutputBurstLimit` to create. + /// + public sealed class OutputBurstLimit : + Attributes.IMandatoryAttribute, + IEquatable + { + public int Limit { get; } + + public OutputBurstLimit(int limit) + { + Limit = limit; + } + + public bool Equals(OutputBurstLimit other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Limit == other.Limit; + } + + /// + public override bool Equals(object obj) => obj is OutputBurstLimit attr && Equals(attr); + + /// + public override int GetHashCode() => Limit.GetHashCode(); + + /// + public override string ToString() => $"OutputBurstLimit(limit={Limit})"; + } + + /// + /// Test utility: fuzzing mode means that GraphStage events are not processed + /// in FIFO order within a fused subgraph, but randomized. + /// + /// Use factory method `CreateFuzzingMode` to create. + /// + public sealed class FuzzingMode : + Attributes.IMandatoryAttribute, + IEquatable + { + public bool Enabled { get; } + + public FuzzingMode(bool enabled) + { + Enabled = enabled; + } + + public bool Equals(FuzzingMode other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Enabled == other.Enabled; + } + + /// + public override bool Equals(object obj) => obj is FuzzingMode attr && Equals(attr); + + /// + public override int GetHashCode() => Enabled.GetHashCode(); + + /// + public override string ToString() => $"FuzzingMode(enabled={Enabled})"; + } + + /// + /// Configure the maximum buffer size for which a FixedSizeBuffer will be preallocated. + /// This defaults to a large value because it is usually better to fail early when + /// system memory is not sufficient to hold the buffer. + /// + /// Use factory method `CreateMaxFixedBufferSize` to create. + /// + public sealed class MaxFixedBufferSize : + Attributes.IMandatoryAttribute, + IEquatable + { + public int Size { get; } + + public MaxFixedBufferSize(int size) + { + Size = size; + } + + public bool Equals(MaxFixedBufferSize other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Size == other.Size; + } + + /// + public override bool Equals(object obj) => obj is MaxFixedBufferSize attr && Equals(attr); + + /// + public override int GetHashCode() => Size.GetHashCode(); + + /// + public override string ToString() => $"MaxFixedBufferSize(size={Size})"; + } + + /// + /// Limit for number of messages that can be processed synchronously + /// in stream to substream communication. + /// + /// Use factory method `CreateSyncProcessingLimit` to create. + /// + public sealed class SyncProcessingLimit : + Attributes.IMandatoryAttribute, + IEquatable + { + public int Limit { get; } + + public SyncProcessingLimit(int limit) + { + Limit = limit; + } + + public bool Equals(SyncProcessingLimit other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Limit == other.Limit; + } + + /// + public override bool Equals(object obj) => obj is SyncProcessingLimit attr && Equals(attr); + + /// + public override int GetHashCode() => Limit.GetHashCode(); + + /// + public override string ToString() => $"SyncProcessingLimit(limit={Limit})"; + } + + /// + /// Specifies the name of the dispatcher. This also adds an async boundary. /// /// TBD /// TBD @@ -470,6 +828,60 @@ public SupervisionStrategy(Decider decider) /// TBD public static Attributes CreateSupervisionStrategy(Decider strategy) => new Attributes(new SupervisionStrategy(strategy)); + + /// + /// Enables additional low level troubleshooting logging at DEBUG log level + /// + /// + /// + public static Attributes CreateDebugLogging(bool enabled) + => new Attributes(new DebugLogging(enabled)); + + /// + /// Defines a timeout for stream subscription and what action to take when that hits. + /// + /// + /// + /// + public static Attributes CreateStreamSubscriptionTimeout( + TimeSpan timeout, + StreamSubscriptionTimeoutTerminationMode mode) + => new Attributes(new StreamSubscriptionTimeout(timeout, mode)); + + /// + /// Maximum number of elements emitted in batch if downstream signals large demand. + /// + /// + /// + public static Attributes CreateOutputBurstLimit(int limit) + => new Attributes(new OutputBurstLimit(limit)); + + /// + /// Test utility: fuzzing mode means that GraphStage events are not processed + /// in FIFO order within a fused subgraph, but randomized. + /// + /// + /// + public static Attributes CreateFuzzingMode(bool enabled) + => new Attributes(new FuzzingMode(enabled)); + + /// + /// Configure the maximum buffer size for which a FixedSizeBuffer will be preallocated. + /// This defaults to a large value because it is usually better to fail early when + /// system memory is not sufficient to hold the buffer. + /// + /// + /// + public static Attributes CreateMaxFixedBufferSize(int size) + => new Attributes(new MaxFixedBufferSize(size)); + + /// + /// Limit for number of messages that can be processed synchronously in stream to substream communication + /// + /// + /// + public static Attributes CreateSyncProcessingLimit(int limit) + => new Attributes(new SyncProcessingLimit(limit)); } /// @@ -480,22 +892,159 @@ public static class StreamRefAttributes { /// /// Attributes specific to stream refs. + /// + /// Not for user extension. /// public interface IStreamRefAttribute : Attributes.IAttribute { } - public sealed class SubscriptionTimeout : IStreamRefAttribute + /// + /// Specifies the subscription timeout within which the remote side MUST subscribe to the handed out stream reference. + /// + public sealed class SubscriptionTimeout : IStreamRefAttribute, IEquatable { public TimeSpan Timeout { get; } public SubscriptionTimeout(TimeSpan timeout) { + if (timeout.Ticks < 0) + throw new ArgumentException("Timeout must be finite.", nameof(timeout)); + Timeout = timeout; } + + public bool Equals(SubscriptionTimeout other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Timeout.Equals(other.Timeout); + } + + /// + public override bool Equals(object obj) => obj is SubscriptionTimeout attr && Equals(attr); + + /// + public override int GetHashCode() => Timeout.GetHashCode(); + + /// + public override string ToString() => $"SubscriptionTimeout(timeout={Timeout.TotalMilliseconds}ms)"; } - + + /// + /// Specifies the size of the buffer on the receiving side that is eagerly filled even without demand. + /// + public sealed class BufferCapacity : IStreamRefAttribute, IEquatable + { + public int Capacity { get; } + public BufferCapacity (int capacity) + { + if (capacity <= 0) + throw new ArgumentException("Capacity must be greater than zero", nameof(capacity)); + Capacity = capacity; + } + + public bool Equals(BufferCapacity other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Capacity == other.Capacity; + } + + /// + public override bool Equals(object obj) => obj is BufferCapacity attr && Equals(attr); + + /// + public override int GetHashCode() => Capacity.GetHashCode(); + + /// + public override string ToString() => $"BufferCapacity(capacity={Capacity})"; + } + + /// + /// If no new elements arrive within this timeout, demand is redelivered. + /// + public sealed class DemandRedeliveryInterval : IStreamRefAttribute, IEquatable + { + public TimeSpan Timeout { get; } + + public DemandRedeliveryInterval(TimeSpan timeout) + { + if (timeout.Ticks < 0) + throw new ArgumentException("Timeout must be finite.", nameof(timeout)); + + Timeout = timeout; + } + + public bool Equals(DemandRedeliveryInterval other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Timeout.Equals(other.Timeout); + } + + /// + public override bool Equals(object obj) => obj is DemandRedeliveryInterval attr && Equals(attr); + + /// + public override int GetHashCode() => Timeout.GetHashCode(); + + /// + public override string ToString() => $"DemandRedeliveryInterval(timeout={Timeout.TotalMilliseconds}ms)"; + } + + /// + /// The time between the Terminated signal being received and when the local SourceRef determines to fail itself + /// + public sealed class FinalTerminationSignalDeadline : IStreamRefAttribute, IEquatable + { + public TimeSpan Timeout { get; } + + public FinalTerminationSignalDeadline(TimeSpan timeout) + { + if (timeout.Ticks < 0) + throw new ArgumentException("Timeout must be finite.", nameof(timeout)); + + Timeout = timeout; + } + + public bool Equals(FinalTerminationSignalDeadline other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return Timeout.Equals(other.Timeout); + } + + /// + public override bool Equals(object obj) => obj is FinalTerminationSignalDeadline attr && Equals(attr); + + /// + public override int GetHashCode() => Timeout.GetHashCode(); + + /// + public override string ToString() => $"FinalTerminationSignalDeadline(timeout={Timeout.TotalMilliseconds}ms)"; + } + /// /// Specifies the subscription timeout within which the remote side MUST subscribe to the handed out stream reference. /// public static Attributes CreateSubscriptionTimeout(TimeSpan timeout) => new Attributes(new SubscriptionTimeout(timeout)); + + /// + /// Specifies the size of the buffer on the receiving side that is eagerly filled even without demand. + /// + public static Attributes CreateBufferCapacity(int capacity) + => new Attributes(new BufferCapacity(capacity)); + + + /// + /// If no new elements arrive within this timeout, demand is redelivered. + /// + public static Attributes CreateDemandRedeliveryInterval(TimeSpan timeout) + => new Attributes(new DemandRedeliveryInterval(timeout)); + + /// + /// The time between the Terminated signal being received and when the local SourceRef determines to fail itself + /// + public static Attributes CreateFinalTerminationSignalDeadline(TimeSpan timeout) + => new Attributes(new FinalTerminationSignalDeadline(timeout)); } } diff --git a/src/core/Akka/Util/Internal/Extensions.cs b/src/core/Akka/Util/Internal/Extensions.cs index 2406af4fcf2..b008966f756 100644 --- a/src/core/Akka/Util/Internal/Extensions.cs +++ b/src/core/Akka/Util/Internal/Extensions.cs @@ -61,7 +61,7 @@ public static IEnumerable SplitDottedPathHonouringQuotes(this string pat var j = 0; while (true) { - if (j > path.Length) yield break; + if (j >= path.Length) yield break; else if (path[j] == '\"') { i = path.IndexOf('\"', j + 1);