IllegalArgumentException when trying to zip an iterable of several FluxJust-s #338

Closed
kirmerzlikin opened this issue Jan 4, 2017 · 10 comments
Labels
type/bug A general bug warn/api-change Breaking change with compilation errors
Milestone

Comments

@kirmerzlikin

@Test
public void zipFluxJustIterable() {
	Flux<Integer> flux1 = Flux.just(1);
	Flux<Integer> flux2 = Flux.just(2);
	Flux<Tuple2> zipped = Flux.zip(Arrays.asList(flux1, flux2));
	StepVerifier.create(zipped)
			.expectNextCount(1)
			.expectComplete()
			.verify();
}

This test fails with an IllegalArgumentException thrown by FluxZip::handleBoth.

The exception is caused by two factors:

  • the zipper function used in the handleBoth method is Tuples::fromArray, which only works with arrays of length 1-7
  • the array of scalars retrieved from the FluxJust-s is initially created with a length of 8
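
The interaction of these two factors can be reproduced without Reactor. The sketch below is a plain-Java model, not the library's code: `collectScalars` and `fromArray` are hypothetical stand-ins for the scalar collection in FluxZip and for Tuples::fromArray, and the 1-7 limit is taken from the description above.

```java
import java.util.Arrays;
import java.util.List;

public class ZipScalarBufferSketch {

    // Assumption: mirrors the reported limit of 1-7 supported elements.
    static final int MAX_SUPPORTED = 7;

    // Hypothetical stand-in for Tuples::fromArray: rejects unsupported lengths.
    static List<Object> fromArray(Object[] values) {
        if (values.length < 1 || values.length > MAX_SUPPORTED) {
            throw new IllegalArgumentException(
                    "unsupported array length: " + values.length);
        }
        return Arrays.asList(values.clone());
    }

    // Hypothetical stand-in for the scalar collection: the buffer starts at
    // length 8 and is handed over as-is, whatever the number of sources.
    static Object[] collectScalars(List<?> scalarValues) {
        Object[] buffer = new Object[8];
        int count = 0;
        for (Object value : scalarValues) {
            if (count == buffer.length) {
                buffer = Arrays.copyOf(buffer, count * 2);
            }
            buffer[count++] = value;
        }
        return buffer; // not trimmed to count
    }

    public static void main(String[] args) {
        // Two scalar sources still produce an array of length 8.
        Object[] scalars = collectScalars(Arrays.asList(1, 2));
        System.out.println("sources: 2, array length: " + scalars.length);
        try {
            fromArray(scalars);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the number of sources never reaches the factory: it only sees the buffer length, so even two sources are rejected.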
@kirmerzlikin
Author

Here's the stack trace:

java.lang.AssertionError: expected: count = 1; actual: produced = 0; signal: onError(java.lang.IllegalArgumentException: too many arguments (8), 8 maximum supported)

	at reactor.test.DefaultStepVerifierBuilder.fail(DefaultStepVerifierBuilder.java:1324)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.checkCountMismatch(DefaultStepVerifierBuilder.java:751)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignalCount(DefaultStepVerifierBuilder.java:916)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:804)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:638)
	at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:270)
	at reactor.core.publisher.FluxZip.handleIterableMode(FluxZip.java:199)
	at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:128)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:474)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:450)
	at io.pivotal.literx.Part08OtherOperations.zipFluxJustIterable(Part08OtherOperations.java:163)

@smaldini smaldini added the type/bug A general bug label Jan 4, 2017
@smaldini smaldini added this to the 3.0.5.RELEASE milestone Jan 4, 2017
kirmerzlikin pushed a commit to kirmerzlikin/reactor-core that referenced this issue Jan 4, 2017
… FluxJust-s

Make Tuples create Tuple8 from array of length 8
@simonbasle
Member

simonbasle commented Jan 4, 2017

There are a couple of issues here that render that signature effectively unusable :( The shortest fix is indeed to cover case 8 in Tuples#fromArray.

But keep in mind that Tuples are only meant to grow up to a size of 8, and they don't offer much value if they cannot be generified... So the recommended workaround would be to use zip(flux1, flux2).

Even if we fix the root cause in fromArray, zip(Iterable) would still be limited to a cardinality of 8. Past that limit, you're better off using your own combinator and type (because you probably want some semantics attached to these 9+ values).

Since that signature was never actually usable, it makes us wonder whether it should be removed altogether.

@kirmerzlikin
Author

kirmerzlikin commented Jan 4, 2017

@simonbasle,
I agree that for this specific case it is possible to use zip with a defined number of arguments. But sometimes it could be useful not to know how many fluxes you are dealing with, and in such cases the zip that receives an Iterable is the solution.
Also, for now it really isn't clear enough that zip(Iterable<? extends Publisher<?>> sources) is limited by the maximum Tuple size, but I think it should at least handle all the different tuples.

@simonbasle
Member

Actually, even if you don't know the number of fluxes, that signature isn't even that useful. It returns a Tuple2, which means that to access the values you'd have to cast to e.g. Tuple8 (and thus know the cardinality)... The only way it can be helpful is by using it as an Iterable<Object>...

@kirmerzlikin
Author

@simonbasle, you're right.
Looks like this method isn't easy to use at all.

Off-topic: is it possible that the FluxPublishOnTest::normalFilteredBackpressured test is unstable?
It fails in job 397 for #340, but I don't see how my changes affect this functionality, and I can't make it fail locally.

@simonbasle
Member

@kirmerzlikin: yeah, not very useful. We are considering 2 options at this point:

  1. remove it altogether
  2. replace it with a differently opinionated combinator, namely one returning a Flux<List<Object>> rather than a Flux<Tuple2>. That way it would be truly unbounded, and would be at least as usable as the Tuple2 version (since you can iterate over the list all the same).
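
The idea behind option 2 can be illustrated with plain iterables instead of publishers. This is only an analogy under stated assumptions, not Reactor code: `zipToLists` is a hypothetical helper that combines each "row" of values into a List<Object>, so the number of sources is not capped by any tuple size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ListZipSketch {

    // Hypothetical helper: zips any number of iterables into rows of values.
    // Zipping stops as soon as one source has no more elements.
    static List<List<Object>> zipToLists(List<? extends Iterable<?>> sources) {
        List<Iterator<?>> iterators = new ArrayList<>();
        for (Iterable<?> source : sources) {
            iterators.add(source.iterator());
        }
        List<List<Object>> rows = new ArrayList<>();
        if (iterators.isEmpty()) {
            return rows;
        }
        while (true) {
            List<Object> row = new ArrayList<>(iterators.size());
            for (Iterator<?> iterator : iterators) {
                if (!iterator.hasNext()) {
                    return rows; // shortest source is exhausted
                }
                row.add(iterator.next());
            }
            rows.add(row);
        }
    }

    public static void main(String[] args) {
        // Nine sources: more than a Tuple8 could hold.
        List<List<Integer>> sources = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            sources.add(Arrays.asList(i, i + 100));
        }
        for (List<Object> row : zipToLists(sources)) {
            System.out.println(row.size() + " values: " + row);
        }
    }
}
```

As with the Tuple2 version, a consumer can only iterate over each row generically, but the List form does not require knowing or casting to a cardinality.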

That alternative solution is still impacted by the current issue though: FluxZip#handleIterableMode initializes an array of size 8, which means that using the Arrays::asList combinator, for instance, would incorrectly return a List of size 8 when we have between 1 and 7 publishers -_-

On your side note: yes, some tests are unfortunately still a bit unstable :(

@smaldini
Contributor

smaldini commented Jan 4, 2017

The CI sometimes times out on Travis (unlike our Bamboo builds), but we have an ongoing story to separate consistency jobs that send millions of events, like this one, from operator testing.

@kirmerzlikin
Author

kirmerzlikin commented Jan 4, 2017

@simonbasle,
In my humble opinion, there are not many cases where you might need many (>6) "parallel" fluxes, and even fewer cases where you might need to pack their results into a generic tuple structure. So the first option looks more reasonable.

Also, considering the mentioned options, the existing test instability and this comment, I think that I should close my PR and just wait for 3.0.5.RELEASE (-_-) zzz

@smaldini
Contributor

smaldini commented Jan 4, 2017

I concur, @kirmerzlikin. We just want to keep it simple, symmetric and consistent: the other zip variants that take an undetermined number of sources operate with a combinator, because beyond 6 you just want to quickly materialize the result in your own POJO, or else risk weak semantics on the passed signal (e.g. finding out what getT1/getT2.../getT8 are when you are downstream). We might want to deprecate it and Tuple8, but not remove them altogether just now, before 3.1, to avoid any binary compatibility issue.

@smaldini
Contributor

smaldini commented Jan 4, 2017

Separately, I'd like to propose a Tuple2#flatten method that would recursively flatten tuples nested inside tuples into a final Object[]. This might be useful to a user doing:
flux.zipWith(flux2).zipWith(flux3).map(Tuple2::flatten).map(combinator).subscribe()
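
A minimal sketch of what such a recursive flatten could look like, in plain Java. The `Pair` class is a hypothetical stand-in for Tuple2, and nothing here is the proposed Reactor API: it only shows the depth-first traversal that turns nested pairs into one flat Object[].

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlattenSketch {

    // Hypothetical minimal pair type standing in for Tuple2.
    static final class Pair {
        final Object first;
        final Object second;

        Pair(Object first, Object second) {
            this.first = first;
            this.second = second;
        }

        // Flattens nested pairs, left to right, into a single array.
        Object[] flatten() {
            List<Object> out = new ArrayList<>();
            collect(this, out);
            return out.toArray();
        }

        // Depth-first walk: descend into pairs, append anything else.
        private static void collect(Object value, List<Object> out) {
            if (value instanceof Pair) {
                Pair pair = (Pair) value;
                collect(pair.first, out);
                collect(pair.second, out);
            } else {
                out.add(value);
            }
        }
    }

    public static void main(String[] args) {
        // Shape produced by chaining two pairwise zips: ((a, b), c).
        Pair nested = new Pair(new Pair("a", "b"), "c");
        System.out.println(Arrays.toString(nested.flatten()));
    }
}
```

The left-to-right order matters here: chained zipWith calls nest on the left, so a depth-first walk yields the values in the order the sources were zipped.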

simonbasle added a commit that referenced this issue Jan 4, 2017
Size 8 to Tuple8 should have been acceptable but wasn't covered.
Size 0 was rejected but with misleading message.

Added special message and shortcircuit for null or empty arrays.
Made oversize error message more explicit.

Added tests to TupleTests for these different cases.
simonbasle added a commit that referenced this issue Jan 4, 2017
Flux.zip(Iterable) is a clunky API: as it combines to Tuples, it limits
the number of acceptable sources to 1-8. Furthermore, the Tuple2 return
type means that you either need to know the number of sources in order
to cast to the relevant TupleN (in which case you should instead use
zip(source1, source2, ...)) OR you can only use the tuple as Iterable.

In all cases, combining to a List would be more relevant, and usage of
a custom combinator or of the fixed sources alternatives is to be
preferred from now on.

Using a custom combinator like Arrays::asList is susceptible to a bug
if the number of sources is below 8 though: the scalars Object array is
initialized to a length of 8 in any case, and grows beyond if needed,
but it is not trimmed in FluxZip#handleIterableMode if we have less than
8 sources. This is now fixed and tested.
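
The trimming described in this commit message can be sketched in plain Java. This is an illustration under assumptions, not the actual FluxZip change: `combine` is a hypothetical helper that collects values into a buffer starting at length 8 and trims it to the real count before calling the combinator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TrimBeforeCombineSketch {

    // Hypothetical helper: collects the values into a growable buffer, then
    // trims the buffer to the number of values before applying the combinator.
    static <R> R combine(List<?> values, Function<Object[], R> combinator) {
        Object[] buffer = new Object[8];
        int count = 0;
        for (Object value : values) {
            if (count == buffer.length) {
                buffer = Arrays.copyOf(buffer, count * 2);
            }
            buffer[count++] = value;
        }
        // Without this step a combinator such as Arrays::asList would see
        // trailing null slots whenever count is below the buffer length.
        Object[] trimmed =
                count == buffer.length ? buffer : Arrays.copyOf(buffer, count);
        return combinator.apply(trimmed);
    }

    public static void main(String[] args) {
        List<Object> combined =
                combine(Arrays.asList("a", "b"), array -> Arrays.asList(array));
        System.out.println(combined.size() + " elements: " + combined);
    }
}
```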
simonbasle added a commit that referenced this issue Jan 6, 2017
Size 8 to Tuple8 should have been acceptable but wasn't covered.
Size 0 was rejected but with misleading message.

Added special message and shortcircuit for null or empty arrays.
Made oversize error message more explicit.

Added tests to TupleTests for these different cases.