Async parallel foreach #1946
@ichensky Compare this with the synchronous
But
Hmm. Probably wants to be based on dotnet/corefx#32640?
@Clockwork-Muse Possibly. I think
@svick Yes accepting
Usually when making
Kool..
This API is urgently needed. In particular, there is currently no way to process a set of "items" in parallel and in an async way. The source for the code in the opening post is https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/. I am very active on Stack Overflow and I see people needing this all the time. People then use very bad workarounds, such as starting all items in parallel at the same time. I needed this API multiple times myself, so I developed a solution based on the code in this article. I made the following enhancements which the framework should also have:
Should the DOP be automatic? In my opinion, we have no choice but to force the caller to specify an explicit DOP. Usually, user code will perform IO as part of the loop (that's likely the reason for using async). The system has no way of knowing the right DOP for IO. Different devices (disk, SSD, web service) have vastly different characteristics. Also, the DOP might be intentionally low in order to not overload the system being called. IMO, no auto-tuning is possible. We cannot even make the parameter optional and default to the CPU count, because the CPU count is unrelated to the ideal IO DOP. This is why IO on the thread pool can explode so radically: the auto-tuning heuristics can get into a state where they add unbounded amounts of threads (see these issues: https://github.com/dotnet/coreclr/issues/1754, https://github.com/dotnet/coreclr/issues/2184).
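The contrast between the unbounded fan-out workaround and an explicit-DOP version can be sketched as follows. This is a minimal illustration only; the helper names `ProcessAllAtOnceAsync` and `ProcessBoundedAsync` are hypothetical and not part of any proposal:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Anti-pattern seen in the wild: every item starts at once, so the effective
// DOP equals the item count -- easily overloading the system being called.
Task ProcessAllAtOnceAsync<T>(IEnumerable<T> source, Func<T, Task> body)
    => Task.WhenAll(source.Select(body));

// Bounded alternative: a SemaphoreSlim caps the number of in-flight bodies
// at an explicit, caller-chosen dop.
Task ProcessBoundedAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body)
{
    var gate = new SemaphoreSlim(dop);
    return Task.WhenAll(source.Select(async item =>
    {
        await gate.WaitAsync();
        try { await body(item); }
        finally { gate.Release(); }
    }));
}

await ProcessAllAtOnceAsync(Enumerable.Range(0, 3), i => Task.Delay(1));
await ProcessBoundedAsync(Enumerable.Range(0, 10), dop: 2, i => Task.Delay(1));
```

Note that the semaphore version still enumerates the whole source up front; it bounds concurrency, not materialization.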
@tomesendam @GSPP could you please add the full proposal according to the first step in the doc https://github.com/dotnet/corefx/blob/master/Documentation/project-docs/api-review-process.md. Thanks.
@tarekgh I will try when I find the time to work on this!
I agree this is urgently needed for the reasons @GSPP lists. I'd also add it's urgently needed because I'd go so far as to say that Parallel.ForEach should maybe throw an exception when passed a Task-returning method, or at the very least VS should ship with an analyzer that points out this is an anti-pattern. Even then, this only solves half the problem. Users need a proper async API to call as an alternative.
It's not Task-returning though. It's
This is unlikely to be in v3.0. Please apply the request I mentioned in my comment https://github.com/dotnet/corefx/issues/34233#issuecomment-455354106 to be able to look at it as a whole and move forward.
@tomesendam if you want I can open the API proposal issue. I think that a base parallel foreach should accept a Func<T, CancellationToken, Task> asyncBody (the same CT that is passed in). There could be other overloads, not accepting a CT for the body, that would call the "base" overload, discarding the CT. So in essence:

```csharp
Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, Task> asyncBody)
{
    return ForEachAsync(source, parallelOptions, (work, ct) => asyncBody(work));
}

Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, Task> asyncBody);
```
@MihaZupan Thanks! @tarekgh I'll look into everything next week as I have a week off then. But maybe I can find time this weekend! |
@tarekgh The reason I'm putting this in a comment is to check if this is what you expect! If this is something you can work with, I'll edit the original issue. There are situations where an asynchronous parallel foreach is needed instead of the already existing Parallel.ForEach(). There are examples on this issue already; here is one: https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync-part-2/

```csharp
/// <summary>
/// Executes a foreach asynchronously.
/// </summary>
/// <typeparam name="T">The element type.</typeparam>
/// <param name="source">The source.</param>
/// <param name="dop">The degree of parallelism.</param>
/// <param name="body">The body.</param>
/// <returns>A task that completes when all items have been processed.</returns>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in System.Collections.Concurrent.Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
            {
                while (partition.MoveNext())
                    await body(partition.Current);
            }
        }));
}
```

Proposed API

```csharp
Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, Task> asyncBody)
{
    return ForEachAsync(source, parallelOptions, (work, ct) => asyncBody(work));
}

Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, Task> asyncBody);

Task<ParallelLoopResult> ForEachAsync<T>(IEnumerable<T> source, ParallelOptions parallelOptions, Func<T, Task> asyncBody)
{
    return ForEachAsync(source, parallelOptions, (work, ct) => asyncBody(work));
}

Task<ParallelLoopResult> ForEachAsync<T>(IEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, Task> asyncBody)
{
    // possibly wrap the IEnumerable<T> in an IAsyncEnumerable<T> adapter
}
```

Open questions
As far as I understand it, conversion between the two source types is possible. From what I gathered from the source code, implementing this "properly" is a non-trivial change, requiring changes to the partitioning infrastructure. I think input from @stephentoub would be valuable here.
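A minimal sketch of one way to bridge the two source types: wrap the synchronous sequence so a single IAsyncEnumerable<T>-based implementation can serve both overload families. The adapter name `AsAsync` is hypothetical; the System.Linq.Async package ships a similar `ToAsyncEnumerable` extension:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Wraps a synchronous sequence as an IAsyncEnumerable<T>. Each MoveNextAsync
// completes synchronously, so the wrapper adds minimal overhead.
static async IAsyncEnumerable<T> AsAsync<T>(IEnumerable<T> source)
{
    foreach (var item in source)
        yield return item;
    await Task.CompletedTask; // keeps the compiler-generated iterator genuinely async-shaped
}

await foreach (var x in AsAsync(new[] { 1, 2, 3 })) { }
```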
I feel very strongly that it must be manual and a required parameter. See my comment (at the bottom). I'm speaking from experience and from theory here.
If you mean the non-generic version, no. But I think you meant the generic version. Definitely yes. Often, the input is a list of work items (not IO), e.g. URLs, file names, DTOs, ... Of course,
New open question: should the partitioner concept be supported at all? I personally have never needed anything but a single-items partitioner. For IO work, batch partitioning would likely not increase efficiency much. Also note that the default behavior must be no batching or chunking. I'm quoting my point 4:
The TL;DR is that IO needs exact DOP control. Anything in the way of that is a non-starter.
I agree on setting dop explicitly. I too have never used the Partitioner directly. I don't think that it would be needed for IAsyncEnumerable, was just pointing it out as a part that would need changes during implementation. |
We should think about the order that source items are processed. We cannot be fully deterministic here, but we have choices: (1) process items in whatever order is most efficient, or (2) process items in approximately sequential source order.
Often, it is desirable to process items in approximate sequential order. For example, if you have a list of database keys you want to retrieve, you likely want to send the queries to the database in order. That way data locality is increased and cache usage is better. Or, if you want to read data blocks from a file, you want to issue the IOs in ascending file position order so that the disk head sweeps sequentially over the disk (elevator algorithm). The performance gains from this can be large. Data locality is important. This is an argument for (2). An argument against (1) is that since this feature is mainly used for IO, any small CPU optimization coming from (1) would not matter much. I believe that (2) must be supported or else it can be a performance dealbreaker in some scenarios. We can independently decide what the default should be. My feeling is that only (2) should be supported.
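One way to get both exact DOP and approximately sequential dispatch is to have the workers share a single enumerator instead of partitioning the source into chunks. A sketch under that assumption; the name `ForEachOrderedAsync` is illustrative only:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// dop workers pull items from one shared enumerator under a lock. Items are
// dispatched in source order (no chunking), and the effective DOP is exactly
// dop because no worker ever holds more than one item.
async Task ForEachOrderedAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body)
{
    using var enumerator = source.GetEnumerator();
    var gate = new object();
    await Task.WhenAll(Enumerable.Range(0, dop).Select(async _ =>
    {
        while (true)
        {
            T item;
            lock (gate)                  // serialize access to the shared enumerator
            {
                if (!enumerator.MoveNext()) return;
                item = enumerator.Current;
            }
            await body(item);            // bodies themselves run concurrently
        }
    }));
}

await ForEachOrderedAsync(Enumerable.Range(0, 8), dop: 2, i => Task.Delay(1));
```

Only dispatch order is sequential here; completion order still depends on how long each body takes.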
I just wanted to create an issue for the exact same request. @tomesendam do you think that adding one overload that supports IProgress would be fine?
This single overload would be useful for progress notification (for example, we want to know how many files are downloaded and notify the user of progress). What do you guys think?
@Misiu you can send the progress from inside the body delegate.
@GSPP I forgot about that :)
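The point that progress reporting composes with the body delegate, making a dedicated IProgress overload arguably unnecessary, can be sketched like this (counter and delegate names are illustrative):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int completed = 0;
var progress = new Progress<int>(n => Console.WriteLine($"{n} items done"));

// Each body increments a shared counter and reports it; Progress<T> marshals
// the callback, so no extra API surface is needed on the loop itself.
await Task.WhenAll(Enumerable.Range(0, 5).Select(async item =>
{
    await Task.Delay(1); // stand-in for the real download
    ((IProgress<int>)progress).Report(Interlocked.Increment(ref completed));
}));
```

Note the cast: `Progress<T>` implements `IProgress<T>.Report` explicitly, so it must be invoked through the interface.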
@tomesendam I am not a team member, but I have contributed multiple times by writing up a comprehensive treatment of some issue that I think the framework should solve. This is how I would approach this: I think you should now write down the full API surface in C#. Also, all relevant design points must be discussed and decided. I contributed some in my posts in this issue. You'd need to read through everything that was said here and write in your proposal how you would decide. I'd create a numbered list of all design aspects. Say what the design question is, the possible decisions, their pros and cons, and your tentative decision. This enables the team to then take your comprehensive summary into an internal meeting and produce feedback. This can lead to a final decision about this API. After that it can be implemented with no surprises left. I think it's awesome that we are now making progress on this API! This is a very important feature that will come to be used a lot. We need to get the design exactly right. There is no second chance, because compatibility forces design mistakes to be carried forever.
@tarekgh any chance this might get added with .NET Core 3.0?
@Misiu we are very late on adding such a feature in 3.0. Meanwhile you can use the workaround mentioned in the issue description.
Any update on this issue, btw?
@PureKrome honestly I've forgotten about this issue. I'll create the complete list of design decisions I can come up with by the end of today. This most likely will not be enough, so others should feel free to comment more design decisions and I'll add them to the list.
@YohanSciubukgian I don't think this API is the correct place for such a feature. That's better suited for the devs using this API to do on their own.
Could we get an update on this please?
@tarekgh @stephentoub how can we move this forward? Can someone summarize the contentious points and proposed options?
I think the only open issue here is whether DOP should be forced or can be defaulted. I think we should allow the default, and we can clarify that in the docs with some different use case examples. But I'll let @tomesendam and @stephentoub continue their discussion to reach some conclusion here. I moved this to the 6.0 release.
This sounds good to me. If @stephentoub has other ideas I'll gladly hear them out!
Yup. So... what is the proposed set of APIs now?
If I understand correctly, the proposed APIs at the top still hold; the detailed behavior may need to change in the description. @tomesendam please correct me if I spelled anything wrong here.
Correct, I'll change the description to fit the default DOP.
One last question I have: as we are going to have a default anyway, does it make sense to make the ParallelOptions parameter nullable to avoid allocations if the caller is happy with whatever default we'll have?
Yes, I don't see a problem with that. I'll update it for now. Can always revert if others in this issue have problems with that.
Personally, rather than making it nullable, I'd prefer an additional overload that just didn't take one. Since the options is also how a CancellationToken is specified, that delegate also wouldn't take a CancellationToken, e.g.

```csharp
public static Task ForEachAsync<T>(IEnumerable<T> source, Func<T, ValueTask> asyncBody)
```

Then a user isn't forced into unnecessary ceremony to provide things for the default common case. There are other aspects of Parallel.ForEach that aren't represented in this proposal, e.g. support for ParallelLoopState, support for Partitioner, task state, etc. Is the plan to just leave those out for now and add overloads for them only if needed later?
I think this is a good idea. @tomesendam could you please update the proposal by adding the overloads?
I think we can wait to see if there is a demand on that and then add the overloads.
@stephentoub I agree with @tarekgh, I would wait and see if there's demand for those overloads. And I've changed the description to reflect your comment.
And now the issue is marked ready for design review :-)
I'll post my opinions on these: This API is meant to be used with async IO. Any async IO carries a lot more overhead than typical parallel algorithms or allocations bring. I do not think that CPU efficiency should be a significant concern here.
```csharp
namespace System.Threading.Tasks
{
    public static class Parallel
    {
        public static Task ForEachAsync<T>(IAsyncEnumerable<T> source, Func<T, CancellationToken, ValueTask> body);
        public static Task ForEachAsync<T>(IAsyncEnumerable<T> source, CancellationToken cancellationToken, Func<T, CancellationToken, ValueTask> body);
        public static Task ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, ValueTask> body);
        public static Task ForEachAsync<T>(IEnumerable<T> source, Func<T, CancellationToken, ValueTask> body);
        public static Task ForEachAsync<T>(IEnumerable<T> source, CancellationToken cancellationToken, Func<T, CancellationToken, ValueTask> body);
        public static Task ForEachAsync<T>(IEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, ValueTask> body);
    }
}
```
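For reference, a usage sketch of this approved surface, assuming a runtime that includes it (it shipped as Parallel.ForEachAsync in .NET 6):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int processed = 0;
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

// Processes 100 items with at most 4 concurrent bodies. The token handed to
// the body flows from options.CancellationToken.
await Parallel.ForEachAsync(Enumerable.Range(0, 100), options, async (item, ct) =>
{
    await Task.Delay(1, ct); // stand-in for real async IO
    Interlocked.Increment(ref processed);
});

Console.WriteLine(processed); // 100
```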
@stephentoub great work! Excited to see what will be done with this.
A small step for a developer, a big step for mankind. Awesome!
There are situations where an asynchronous parallel foreach is needed instead of the already existing Parallel.ForEach(), and while it's not rocket science to implement yourself, I think it would be a nice feature to be implemented in corefx. This should be seen as the asynchronous alternative to the existing Parallel.ForEach(), and it should take IEnumerable<T> as well as IAsyncEnumerable<T>. There are examples on this issue already; here is one: https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync-part-2/
Proposed API
Closed questions
While I consider these questions to be closed, feel free to continue the discussion on them.
DOP will have a default, just like Parallel LINQ. However, documentation should make it clear that this default can be wrong in various cases, such as detailed by @GSPP in various comments.
Yes, it should.
No, we should ensure the specified DOP is adhered to exactly. Creating batches can lead to a smaller than expected effective DOP.
By default we should try to process source items sequentially to improve data locality. Reason being: HDD read performance will increase drastically. Furthermore, cache usage will be improved. It should however be made very clear in the docs that the order cannot be guaranteed.
There should be one, but I'll leave that outside of this issue.
Yes. ValueTask is more accommodating than Task (it is cheaper to convert a Task to a ValueTask than the other way around). Furthermore, it should result in a performance increase.
Yes, this should be specified; see this comment for more info.
No, there shouldn't be. It would have little to no benefit, but would cause a lot of complications (ambiguity, analysis becomes harder).