
Prior art for this kind of concurrency in other languages #2

Open
bakkot opened this issue Feb 6, 2023 · 11 comments


bakkot commented Feb 6, 2023

I looked for prior art in other languages and libraries and mostly came up empty.

Python's async iterators aren't usable with its built-in global map and filter functions. There's imap, but it only works with sync iterables, and despite what you might infer from the (extremely sparse) docs, it's not lazy: it pumps the underlying iterator indefinitely and starts doing work as soon as you invoke it.

Java doesn't really have async iterators, but its Flow class is close, kind of. Unfortunately there's no built-in map or filter for it. I did find a library on top of Flow which does something like this, including an implementation for filter which looks very close to my WIP implementation (which you should not look too closely at yet).

Swift has async iterators with helpers including map and filter like those proposed here (i.e., they take async callbacks). But the next method on Swift's async iterators is mutating, which means you're statically prevented from calling it a second time before the first finishes, so you can't get concurrency. I did find a comment from the original design thread where someone asks about concurrency, but they got no response. Later in the thread there's a comment which mentions the possibility of a buffering type which attempts to fetch multiple items eagerly, though presumably that's just so that later reads of the resulting wrapper are faster, since those fetches can't happen concurrently.

Rust does not yet have async iterators, though the widely-used futures crate has streams and corresponding helpers like map/and_then (the sync/async-function-taking versions respectively) and filter. As with Swift, the language will statically prevent you from calling .next a second time before the first finishes (because it borrows the iterator), so you can't get concurrency in the helpers. EDIT: actually, that's not entirely true; see below.

C# has IAsyncEnumerable, and there's a project providing extension methods like Select (map) and Where (filter). But the design of IAsyncEnumerable does not lend itself to advancing more than one item at a time: instead of a next method which returns an optional value, it has a Current property and a MoveNextAsync method which returns a ValueTask&lt;bool&gt; (a future for a boolean). And indeed I found an article on MSDN which mentions "MoveNextAsync shouldn't be called again on a given enumerator until the previous call to it has completed", though I don't see this documented in the docs for that method.


As for JS libraries,

RxJS is a little hard to compare, since it's a different paradigm. There is no built-in async version of filter or map. But there are concatMap and mergeMap (formerly flatMap), which can return Observables or anything which can be turned into an Observable, including Promises, so you can treat them as an async map, and also use them to build an async filter. concatMap drains each resulting observable before invoking the mapper again, preserving ordering at the cost of being sequential, whereas mergeMap invokes the mapper as soon as possible and merges the results, giving you concurrency at the cost of losing ordering. I don't see an easy way to do async mapping or filtering concurrently without losing ordering.
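The ordering tradeoff described above also shows up with bare promises, outside any Observable machinery. The following sketch is my own illustration (the names `mapCompletionOrder` etc. are made up, not RxJS APIs); it contrasts collecting results in completion order, mergeMap-style, with collecting them in input order:

```javascript
// Sketch of the ordering tradeoff, using bare promises rather than Observables.
const sleep = ms => new Promise(res => setTimeout(res, ms));

// "mergeMap"-flavored: run everything concurrently, collect in completion order.
async function mapCompletionOrder(items, fn) {
  const out = [];
  await Promise.all(items.map(x => Promise.resolve(fn(x)).then(v => out.push(v))));
  return out;
}

(async () => {
  const work = async d => { await sleep(d); return d; };
  const delays = [30, 10, 20];

  console.log(await mapCompletionOrder(delays, work));
  // [ 10, 20, 30 ] -- concurrent, but input order is lost

  console.log(await Promise.all(delays.map(work)));
  // [ 30, 10, 20 ] -- still concurrent, and input order is preserved
})();
```

Promise.all preserves input order for a fixed batch while still running everything concurrently; the hard part, as noted above, is doing the same incrementally for an unbounded stream of values.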

IxJS has map and filter for async iterators, but they're based on async generators, so calling .next multiple times doesn't give you concurrency (but the implementation is trivially correct, which is nice).
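The serializing effect of async generators can be seen in a small sketch (illustrative only, not IxJS's actual code): even when the consumer calls .next() twice before the first call settles, the generator machinery queues the second call, so the mapper runs strictly one invocation at a time.

```javascript
// Illustrative only -- not IxJS's code. Async generator bodies cannot be
// re-entered, so a second .next() waits for the first to settle.
const sleep = ms => new Promise(res => setTimeout(res, ms));

async function* genMap(iterable, fn) {
  for await (const x of iterable) {
    yield await fn(x); // a concurrent second .next() queues behind this await
  }
}

(async () => {
  const events = [];
  const it = genMap([1, 2], async x => {
    events.push(`start ${x}`);
    await sleep(20);
    events.push(`end ${x}`);
    return x * 10;
  });

  // Two "concurrent" pulls still execute the mapper strictly back to back:
  const [a, b] = await Promise.all([it.next(), it.next()]);
  console.log(a.value, b.value); // 10 20
  console.log(events); // [ 'start 1', 'end 1', 'start 2', 'end 2' ]
})();
```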

iter-tools actually has exactly the bufferAhead method discussed in tc39/proposal-iterator-helpers#262, there spelled asyncBuffer, and for the same reason. Nice! It also has map and filter methods, but they're based on async generators, so you don't get concurrency even if you use bufferAhead, despite what the docs for bufferAhead suggest. There's an open issue about this.

iter-ops also has map and filter, but it takes a somewhat more radical approach - it allows the async iterator you get from map to produce promises-for-promises, which is arguably a contract violation, and provides wait and waitRace helpers to unwrap them (with the latter calling .next multiple times and potentially giving you results out of order). As you can guess from waitRace it does allow you to invoke .next multiple times and get concurrency. Unfortunately doing so can cause you to get results out of order even if you don't use waitRace, as in the following snippet:

iter-ops out of order results
import { pipeAsync, filter } from 'iter-ops';

let sleep = ms => new Promise(res => setTimeout(res, ms));

let iterable = pipeAsync(
  [1, 3, 2, 1.1],
  filter(async x => {
    await sleep(x * 10);
    return x > 1;
  }),
);

let it = iterable[Symbol.asyncIterator]();
console.log(await Promise.all([
  it.next().then(v => v.value),
  it.next().then(v => v.value),
  it.next().then(v => v.value),
]));

// prints `[ 1.1, 3, 2 ]`

The modern-async package provides some helpers like mapGenerator which take an explicit concurrency parameter and produce async iterators. That's not quite the same, because it will keep reading ahead and buffering indefinitely as long as at least one requested item is not ready yet - e.g. if the first call to the mapper takes 10 seconds and the subsequent ones take 1, it will buffer 9 items even with a concurrency parameter of 2. Still, the experience of using it is pretty close.

@bakkot bakkot changed the title Prior art for this kind of parallelism in other languages Prior art for this kind of concurrency in other languages Feb 6, 2023

trxcllnt commented Apr 3, 2023

IxJS has map and filter for async iterators, but they're based on async generators, so calling .next multiple times doesn't give you concurrency (but the implementation is trivially correct, which is nice).

Co-author/maintainer of IxJS here -- we do support concurrent flattening operators similar to Rx (flatMap() flattens an infinite set of inner sequences, switchMap() switches to flattening latest inner sequence, concatMap() flattens each inner sequence in order). It's a bit of a headache to maintain because we flatten the sequences via Promise.race(), but it works.

For example, this example won't deterministically yield results in a, b, c order, because each result's due time is random.


bakkot commented Apr 3, 2023

Ah, neat, thanks for the additional context.

The thing proposed here would be something a bit different - from what I can tell, pumping a flatMap iterator in IxJS a single time will buffer arbitrarily many values from the underlying iterator, up to the specified concurrency. And it will never attempt to pull from any individual iterator multiple times until the previous call to .next on that iterator has settled. So my description is still accurate - multiple calls to .next isn't the thing driving concurrency in IxJS.


trxcllnt commented Apr 4, 2023

That's correct. flatMap() pulls the inner sequences as fast as the outer source yields them, and there is only a single pending Promise<IteratorResult<T>> per inner sequence at a time.

That said, there's no technical limitation to doing what you're describing. I've just never seen a feature request for it, and I don't think it was part of the original LINQ implementations (IObservable, IAsyncEnumerable, etc.). @mattpodwysocki please correct me if I'm wrong 🙂.

Would you expect the concurrently running results to be delivered in order, or is out of order OK? If the former, it seems like that might require buffering per concurrent operator.


trxcllnt commented Apr 4, 2023

After thinking about it more, I'm not sure this can be implemented, at least not without introducing additional enumeration strategies.

If the consumer pulls a value from the flatMap iterator, from which sequence should the flatMap operator pull that value? Should it pull from its outer source? Load-balance across the inner sequences? Pull another value from all active sequences, and/or the source? etc.

Any choice (including the one Ix implicitly makes) seems arbitrary, but all seem like they could be useful under the right conditions. I could imagine a suite of concurrent flattening operators for each case, expanding on the set of existing concurrent = 0 | 1 | Infinity operators.


bakkot commented Apr 4, 2023

The plan is for this kind of concurrency to be order-preserving, which means that the obvious choice is to pull from the current inner sequence until it's finished - any other option means doing work which may not be relevant until arbitrarily far in the future, since you don't know how many more items will be vended by the inner sequence, all of which will need to be vended by flatMap before you use results from a different sequence.

If the first of X outstanding calls to .next on an inner sequence completes with { done: true }, flatMap will then immediately pull from the outer sequence again and then call .next() on the resulting new inner sequence X times.

That is, you only get concurrency in the current inner sequence.

I agree there's various other kinds of concurrency possible in flatMap, but I think they mostly don't fit this proposal - we can't really provide the full suite without making the API too complicated. In the specific case that you want concurrency in pulling from the outer sequence, I'm intending to add a bufferAhead(N) helper which eagerly pumps and buffers N items, and you could get concurrency on the outer sequence by replacing .flatMap(fn) with .map(fn).bufferAhead(N).flatMap(x => x). It's a bit annoying but still pretty clear, I think.


trxcllnt commented Apr 4, 2023

Monadic bind (aka, >>=, flatMap, SelectMany, etc.) is the operator in terms of which all other operators can be implemented, so getting this one operator right is the most important thing to me. You automatically get concurrent map/filter/etc. for free if flatMap is implemented with sufficient flexibility.

(Apologies and/or stop me if you know all this already. It's been a while since I've been involved in TC39, and I'm not familiar with your background. I don't mean to come off as patronizing/condescending, but I also don't want to reference a bunch of LINQ/FP concepts without context.)

I'm glad to know order matters. What you've described sounds like one way to preserve order, but not necessarily the only way, and I worry this definition of bind is unnecessarily restrictive. For example, what you're describing sounds like LINQ's concatMap. concatMap can be implemented in terms of flatMap, but not the inverse.

I'd have to think a bit harder before I felt confident to say LINQ's (aka IAsyncEnumerable) original definition of flatMap is still correct for async-pull sequences with concurrent tasks... assuming such behavior is sound in all other ways, my gut says it is (or is very close).


trxcllnt commented Apr 4, 2023

That is, you only get concurrency in the current inner sequence.

This sounds like a combination of sequential and concurrent flattening:

const listOfListsOfUrls = from([
  // first, fetch these concurrently
  from(["http://foo.com", "http://bar.com"]),
  // then fetch these concurrently after the previous batch returns
  from(["http://baz.com", "http://qux.com"]),
]);

// Load each list of URLs in order
const sequentialBatches = concatMap(listOfUrls => {
  // flatMap's definition ensures all inner sequences are enumerated
  // concurrently, so these fetch() requests run at the same time
  const concurrentFetches = flatMap(x => fetch(x))(listOfUrls);
  return concurrentFetches;
})(listOfListsOfUrls);

// Or a more concise version:
// concatMap(flatMap(fetch))(listOfListsOfUrls);

In the above, the concurrency is achieved by wrapping and unwrapping the AsyncIterables. This is a general design pattern/feature of monadic designs and solves a large number of problems elegantly. So to reiterate, this is an example of why I care about getting flatMap right.


bakkot commented Apr 4, 2023

You automatically get concurrent map/filter/etc. for free if flatMap is implemented with sufficient flexibility.

Well, kind of, but that's more relevant to producer-driven concurrency rather than consumer-driven concurrency. The idea here is to enable consumer-driven concurrency, where the consumer can pull multiple times and the underlying iterator, if it supports it, can do the work to satisfy all those requests concurrently. And of course if you don't pull concurrently then you get no concurrency.

That means that the helpers themselves don't need to specify a degree of concurrency. There are just going to be map/filter/flatMap methods, with the same signatures they always have - no concurrency parameters anywhere. The way you get concurrency is by the consumer pulling concurrently - e.g. let it = x.filter(p); await Promise.all([it.next(), it.next()]);.

But you can't get this sort of consumer-driven concurrency with a filter implemented in terms of flatMap. You'll get the same observable behavior, in a functional sense - that is, you get the same resulting sequence, just not concurrency. I don't think it's possible for this to be otherwise, at least not without adding a bunch of parameters to flatMap and choosing those parameters appropriately when implementing filter etc in terms of flatMap - so even then it wouldn't exactly be "for free".
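A rough sketch of that consumer-driven shape (illustrative only; the real proposal has to handle errors, return(), and backpressure that this ignores):

```javascript
// Illustrative consumer-driven map: each .next() call immediately pulls from
// the source and starts the mapper, so N concurrent .next() calls run N
// mapper invocations concurrently -- and the i-th call still resolves with
// the i-th result, preserving order.
const sleep = ms => new Promise(res => setTimeout(res, ms));

function concurrentMap(source, fn) {
  return {
    next() {
      return source.next().then(({ done, value }) =>
        done
          ? { done: true, value: undefined }
          : Promise.resolve(fn(value)).then(v => ({ done: false, value: v })));
    },
    [Symbol.asyncIterator]() { return this; },
  };
}

// A source whose items are claimed synchronously, so concurrent pulls stay ordered.
function arraySource(items) {
  let i = 0;
  return {
    next: async () =>
      i < items.length ? { done: false, value: items[i++] } : { done: true, value: undefined },
  };
}

(async () => {
  const events = [];
  const it = concurrentMap(arraySource([3, 1]), async x => {
    events.push(`start ${x}`);
    await sleep(x * 20);
    events.push(`end ${x}`);
    return x * 10;
  });

  const [a, b] = await Promise.all([it.next(), it.next()]);
  console.log(a.value, b.value); // 30 10 -- results stay in source order
  console.log(events); // [ 'start 3', 'start 1', 'end 1', 'end 3' ] -- mappers overlapped
})();
```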

For example, what you're describing sounds like LINQ's concatMap. concatMap can be implemented in terms of flatMap, but not the inverse.

Since iterators are definitionally ordered sequences, we specifically do not want to provide a flatMap which can produce results in nondeterministic order. That's a thing which is sometimes useful to do, but as you say flatMap is the name reserved for the monadic bind operation, which on ordered sequences is the equivalent of ReactiveX's concatMap. (I don't know what "LINQ's concatMap" refers to.)

Nondeterministic flatMap isn't even a well-defined function unless you treat async iterators as being unordered (since it's, well, nondeterministic, rather than being a function in the mathematical sense).

I'd have to think a bit harder before I felt confident to say LINQ's (aka IAsyncEnumerable) original definition of flatMap is still correct for async-pull sequences with concurrent tasks... assuming such behavior is sound in all other ways, my gut says it is (or is very close).

I'm not entirely sure which definition you're talking about here. (Is there a concrete definition somewhere that isn't just "whatever dotnet does"?) The underlying primitive operation on IAsyncEnumerable, MoveNextAsync, does not allow concurrent pulling, so I don't know if the question can come up.

This sounds like a combination of sequential and concurrent flattening:

Sorry, I'm not following the point of this example.

If the point is to illustrate "do these things concurrently, followed by those things concurrently", that's easy to do with the design in this proposal, though you do need to specify the degree of concurrency using the bufferAhead helper:

let listOfListsOfUrls = AsyncIterator.from([
  AsyncIterator.from(["http://foo.com", "http://bar.com"]),
  AsyncIterator.from(["http://baz.com", "http://qux.com"]),
]);

let results = await listOfListsOfUrls
  .flatMap(inner => inner.map(x => fetch(x)).bufferAhead(2)) // concurrency of 2 within each batch
  .toArray(); // [html for `foo`, html for `bar`, html for `baz`, html for `qux`]

// `toArray` pulls sequentially, not concurrently, so this is not concurrent across batches

And of course you get the results in precisely the order foo, bar, baz, qux.

I'm confused by your example, though - flatMap(x => fetch(x))(listOfUrls) seems like a type error unless fetch is itself returning a list. Shouldn't that just be map? Unless you're using flatMap specifically because map is not concurrent, but it seems like an abuse of flatMap to mean "map, but concurrent".


rektide commented Jun 7, 2023

p-map is probably one of the most well-used tools folks reach for when trying to concurrently map over iterables / async iterables, via the library's concurrency option. It does allow async mappers. It returns a promise for an array of results, though, which is different from what's happening here, but I think it shows a very successful concurrent map.

It's a map function & results are returned in order. Under the hood it's doing reference counting to keep track of how many mappings are outstanding.
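The approach can be sketched roughly like this (my own simplified version, not p-map's actual implementation):

```javascript
// Simplified p-map-style helper: at most `concurrency` mapper calls in
// flight at once, with results resolved in input order.
const sleep = ms => new Promise(res => setTimeout(res, ms));

async function mapConcurrent(items, mapper, concurrency = Infinity) {
  const results = new Array(items.length);
  let nextIndex = 0;
  async function worker() {
    while (nextIndex < items.length) {
      const i = nextIndex++; // claimed synchronously, so workers never collide
      results[i] = await mapper(items[i], i);
    }
  }
  const workerCount = Math.min(concurrency, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

(async () => {
  let inFlight = 0, maxInFlight = 0;
  const doubled = await mapConcurrent([1, 2, 3, 4], async x => {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    await sleep(10);
    inFlight--;
    return x * 2;
  }, 2);
  console.log(doubled);     // [ 2, 4, 6, 8 ] -- input order preserved
  console.log(maxInFlight); // 2 -- concurrency was capped
})();
```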


bakkot commented Aug 3, 2023

Correction to the OP: Rust's futures crate has a buffered helper on Streams of Futures (think a sync iterator of Promises), which does in fact work almost exactly like the bufferAhead helper proposed here. It's not quite the same thing, but it's very close.


getify commented Sep 8, 2024

Apologies for a late entry here, but just to list another "prior art" JS library to take a look at, with how I handle limiting concurrency in those iteration helpers:

https://github.com/getify/fasy?tab=readme-ov-file#limiting-concurrency

// run `mapperFn()` fully concurrently over entire `list`
FA.concurrent.map( mapperFn, list )

// run `mapperFn()` concurrently, but in a pool size
// limited to max of 5; whenever one finishes,
// immediately pull and run another from `list`
// (note: equivalent to `concurrent(5,5)`)
FA.concurrent(5).map( mapperFn, list )

// run `mapperFn()` concurrently, pool size max
// of 5, but don't pull in more operations from
// `list` until the active count is *below* the
// active threshold of 3
FA.concurrent(5,3).map( mapperFn, list )
