-
Notifications
You must be signed in to change notification settings - Fork 416
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Await: asynchronous evaluation of sequence elements #205
Conversation
Interesting. Some thoughts, in no particular order
While this is very useful, I'm unsure this is a good fit for MoreLINQ and it should instead be put in a separate library. Processing of async sequences is likely to require a lot more functions and functionality, and I don't know if this will end up polluting the MoreLINQ library with either more dependencies, or reimplementation of functionality available elsewhere[1]. Perhaps putting this into a separate dll ( [1] For example, I made my own implementation of public static IEnumerable<TResult> SelectAsync<TSource, TResult>(this IEnumerable<TSource> src, Func<TSource, Task<TResult>> asyncSelector, int maxConcurrency) {
// Merge iterates eagerly over the source sequence
// However, Observable.FromAsync will only execute the function when it is subscribed to
// Therefore, we *must* call the async selector inside the function, otherwise the maxConcurrency is useless
return src.Select(s => Observable.FromAsync(() => asyncSelector(s))).Merge(maxConcurrency).ToEnumerable();
} |
@fsateler Thanks for all that feedback! The initial implementation was a very poor crack at the problem. The latter commits address all of your points & The updated usage now looks like this: var beatles = new[]
{
"John Lennon",
"Paul McCartney",
"George Harrison",
"Ringo Starr",
};
var http = new HttpClient();
var results =
from e in beatles.SelectAsync(async e => new
{
Topic = e,
Response = await http.SendAsync(new HttpRequestMessage(HttpMethod.Get, "https://en.wikipedia.org/wiki/" + e.Replace(' ', '_')))
})
select new
{
e.Topic,
e.Response.StatusCode,
e.Response.Content.Headers.ContentLength,
e.Response.Content.Headers.ContentType.MediaType,
};
foreach (var e in results)
Console.WriteLine(e.ToString());
Not sure. Take a look at the code now.
That's what I'm afraid of too thus why I consider this PR as an RFC (should have a label for that?) for now.
I don't want to create & maintain another whole library for just one method in the async category. Perhaps a
Thanks for sharing that and I had done something very similar in the past in projects where I already had a dependency on Rx. However, for projects that don't, I felt it may be possible to implement it simply using existing infrastructure from the framework. |
There is already a
This sounds like a good idea, but orthogonal to what I proposed.
It is certainly possible. But, then we get back to the most important issue:
.
What I mean is that mixing async code and sequences is usually more complex than just having a |
MoreLinq/SelectAsync.cs
Outdated
var queue = new BlockingCollection<object>(); | ||
using (var _ = source.GetEnumerator()) | ||
{ | ||
var item = _; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this indirection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To stop ReSharper from complaining about potentially accessing a disposed closure. I found this was good enough for now instead of polluting the code with suppression comments.
MoreLinq/SelectAsync.cs
Outdated
if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); | ||
if (selector == null) throw new ArgumentNullException("selector"); | ||
|
||
var queue = new BlockingCollection<object>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use Tuple<ExceptionDispatchInfo, Task<TResult>>
instead of object? Then, use an approach like the one in Go or Javascript: The first item is null if no error ocurred, the second item is null if an error occurred.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's exactly how it started out! Don't believe me? See my Throttle
implementation that was the basis for SelectAsync
. In the end, I felt that Tuple
represents a product and not a union/either/choice. With a tuple, you can get invalid combinations & while both approaches need if-ing on the various cases, using a super type like object
as a union of the run-time types just seemed better as a shortcut. Ideally I'd have Notification<T>
handy because that's exactly what's needed here but I didn't want to go out and add new types as I'm in two minds of adding this to MoreLINQ.
For avoidance of doubt, I'm not suggesting you should provide these methods for |
I think it's understood that there needs to be a balancing act here. If streaming is the greater benefit then order can't be guaranteed and if order is important then you need to follow-up with an I have the same concerns as you about the proliferation of other async-enabled operators, which is why I continue to be in two minds about adding this to MoreLINQ. I reckon though that |
Some open points:
|
The
So it is safe (or not incorrect) to assume so. |
There are now new overloads where the projection function receives an additional argument that's a Below is an example using an overload that sends a var beatles = new[]
{
"John Lennon",
"Paul McCartney",
"George Harrison",
"Ringo Starr",
};
var http = new HttpClient();
var results =
from e in beatles.SelectAsync(async (e, ct) =>
{
try
{
var url = "https://en.wikipedia.org/wiki/" + e.Replace(' ', '_');
return new
{
Topic = e,
Response = await http.SendAsync(new HttpRequestMessage(HttpMethod.Get, url), ct),
};
}
catch (OperationCanceledException)
{
Console.WriteLine("ABORT! " + e);
throw;
}
})
select new
{
e.Topic,
e.Response.StatusCode,
e.Response.Content.Headers.ContentLength,
e.Response.Content.Headers.ContentType.MediaType,
};
foreach (var e in results.Take(2))
Console.WriteLine(e.ToString()); Because we only care about 2 results (note the
Since |
# Conflicts: # MoreLinq.Test/project.json # MoreLinq/project.json
System.Collections.Concurrent is available in .NET Standard 1.1 and above, and we don't want to add a whole new target for that just now since it is covered by the .NET Standard 2.0 target.
Cool and I'd like to work towards getting moving it out of the experimental space. If we can settle on a name and release it as experimental then there'll be time to work out unit tests and iron out any kinks from battle-testing. |
I realised that it is a bit useless to have a projection function in the first overload. The method can be reduced to simply being an extension of a sequence of tasks ( |
With 09497dc, I'm proposing to rename public static IAwaitQuery<TResult> Await<T, TResult>(
this IEnumerable<T> source,
Func<T, CancellationToken, Task<TResult>> evaluator) It's purpose is not as much to project (T → TResult) as it is to simply supply or inject the @fsateler What do you think? Am I twisting things to force retrofitting a definition that favours |
@fsateler Did I miss anything from the changes you requested in your last review? Wondering what's keeping to ship this as an experiment? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the new names better.
I think this is ready to go into the Experimental namespace.
This PR is just an idea, an RFC to solicit feedback. I'm not even sure this belongs in MoreLINQ as it may expand the scope of the library to cover async scenarios.
Output is: