Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[API Proposal] Channel.CreateBoundedPrioritized #101292

Open
andreas-eriksson opened this issue Apr 19, 2024 · 7 comments
Open

[API Proposal] Channel.CreateBoundedPrioritized #101292

andreas-eriksson opened this issue Apr 19, 2024 · 7 comments
Labels
api-suggestion Early API idea and discussion, it is NOT ready for implementation area-System.Threading.Channels needs-further-triage Issue has been initially triaged, but needs deeper consideration or reconsideration
Milestone

Comments

@andreas-eriksson
Copy link

Background and motivation

This is a follow up to the UnboundedPrioritizedChannel #62761

It would be nice to have a prioritized bounded channel so that consumers could work on the highest priority item in a channel while still being able to limit the number of items held in the channel.

cc: @stephentoub

API Proposal

namespace System.Threading.Channels;

public class Channel
{
+   public static Channel<T> CreateBoundedPrioritized<T>();
+   public static Channel<T> CreateBoundedPrioritized<T>(BoundedPrioritizedChannelOptions<T> options);
}
+public sealed partial class BoundedPrioritizedChannelOptions<T> : BoundedChannelOptions
+{
+   public System.Collections.Generic.IComparer<T>? Comparer { get; set; }
+}

API Usage

var queue = Channel.CreateBoundedPrioritized<Foo>(100);

Alternative Designs

No response

Risks

I may be hard to understand what happens when items with higher priority are written to the channel when the channel has reached capacity, but I don't think there is ant other option that is valid.

@andreas-eriksson andreas-eriksson added the api-suggestion Early API idea and discussion, it is NOT ready for implementation label Apr 19, 2024
@dotnet-policy-service dotnet-policy-service bot added the untriaged New issue has not been triaged by the area owner label Apr 19, 2024
@stephentoub
Copy link
Member

I may be hard to understand what happens when items with higher priority are written to the channel when the channel has reached capacity, but I don't think there is ant other option that is valid.

What behavior are you proposing, and why aren't there any other valid behaviors?

@andreas-eriksson
Copy link
Author

On second thought I think it should be possible to implement all FullMode options. The DropNewest could probably drop the least prioritized and the DropOldest could drop the item next in turn.

It might however be worth implementing new options as it is not crystal clear what item to drop otherwise.

public enum BoundedPrioritizedChannelFullMode
{
    /// <summary>Wait for space to be available in order to complete the write operation.</summary>
    Wait,
    /// <summary>Remove and ignore the least prioritized item in the channel in order to make room for the item being written.</summary>
    DropLeastPrioritized,
    /// <summary>Remove and ignore the most prioritized item in the channel in order to make room for the item being written.</summary>
    DropMostPrioritized,
    /// <summary>Drop the item being written.</summary>
    DropWrite
    }

@ericstj ericstj added needs-further-triage Issue has been initially triaged, but needs deeper consideration or reconsideration and removed untriaged New issue has not been triaged by the area owner labels Jul 18, 2024
@ericstj ericstj added this to the Future milestone Jul 18, 2024
@Zetanova
Copy link

I wrote a priority ChannelTaskScheduler for the akka.net project view years ago.
It can be used as a regular TaskScheduler.

The assumption to get the highest priority item by using a simple Comparer does not solve all use cases.
In many queues the priority of an item can depend on waiting-time or better depending on how often it got passed by a higher priority item.

There should be a counter for each item that should be passed to the Comparer
or a State instance to include processing stats.

Example:

  1. An item D with priority-low got passed 5 times by a higher priority item.
  2. An item A with priority-high got queued up
  3. Both items are compared with the help of the supplied Comparer
  4. The comparers result is that D has higher priority then A,
    because of some internal decision like if count >= 5 then highest

The ChannelTaskScheduler uses 4 priorities and creates for each priority an dedicated Channel.
But dequeues depending on how many higher priority items got processed before.
see DoWork

@julealgon
Copy link

What would a valid workaround to this be today? I assume one would need 2 channels: one for "normal" priority items, and another for "high priority" ones, and then have the consumer check the high priority channel first?

@Zetanova
Copy link

Don't really know what's the best/performant solution is,
but the current one simply lacks the flexibility.

To use multiple channels for this feature is only a workaround.
The awaiting of multiple channels at the same time (ValueTask => Tasks => WhenAny)
it's complex/not optimal.

I would have two ideas, they could be used both together:

  • Include a counter to the channel and tag each item with a position.
    Passed count would be the difference of (current counter - item position)
    The comparer would take then a tuple of (Item,PassedCount) to make it's comparison
  • Add priority buckets into the mix.
    Instead of the comparer, the user need supply then a bucket-id function.
    I think a Hashed Wheel Timer concept would be optimal for it.

@Zetanova
Copy link

By the way the ordering need to be declared or optional possible to set.
It is currently not defined, if an absolute or best-effort ordering is used.

@Zetanova
Copy link

I had one more idea what could be simple to implement.

Beside to taking an instance of a comparer, the Channel could take a Comparer Factory
A new instance of the comparer could be created on the second item write and used until the first item read.
If necessary, a comparer instance could be created on the second read and used until the first item write.

With the combination of the item-position the user could implement a proper priority algorithms
and initialize some state like take the time only once in the constructor.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api-suggestion Early API idea and discussion, it is NOT ready for implementation area-System.Threading.Channels needs-further-triage Issue has been initially triaged, but needs deeper consideration or reconsideration
Projects
None yet
Development

No branches or pull requests

5 participants