Add bufferUntil and bufferWhile with Supplier for the Collection to use #1925

Closed
rstoyanchev opened this issue Oct 21, 2019 · 4 comments
Labels
status/need-decision This needs a decision from the team type/enhancement A general enhancement
Comments

@rstoyanchev
Contributor

I want to use bufferUntil with a custom List<DataBuffer> that keeps track of, and enforces a limit on, the total number of bytes buffered. I could use something more custom for this, like handle, but then I lose the ability to release cached items via doOnDiscard. As far as I can see, there is currently no way to do "buffer until" with a custom Collection, as is possible with the collect operator.
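
To make the request concrete, here is a minimal sketch of the kind of collection described above, assuming plain byte[] chunks instead of DataBuffer so that it needs only the JDK. The class name LimitedByteList and its methods are hypothetical and are not part of Reactor or Spring.

```java
import java.util.ArrayList;

// Hypothetical helper (not a Reactor or Spring type): a list of byte chunks
// that refuses new elements once a total byte budget would be exceeded.
class LimitedByteList extends ArrayList<byte[]> {

    private final int maxBytes;   // total byte budget for this list
    private int byteCount;        // bytes accepted so far

    LimitedByteList(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Only add(E) is tracked in this sketch; addAll, add(index, E) and the
    // remove methods would need the same bookkeeping in real code.
    @Override
    public boolean add(byte[] chunk) {
        if (chunk.length > maxBytes - byteCount) {
            throw new IllegalStateException(
                    "Exceeded limit of " + maxBytes + " buffered bytes");
        }
        byteCount += chunk.length;
        return super.add(chunk);
    }

    int getByteCount() {
        return byteCount;
    }
}
```

With the collect operator, a container like this can be provided through the Supplier argument, for example `() -> new LimitedByteList(1024)`. The request here is for the same option on bufferUntil and bufferWhile.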

@simonbasle
Contributor

Coming back to this one: @rstoyanchev, would the combination of windowUntil + collect work? The collect operator should now discard correctly as of #1924.
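
For readers following along, the grouping that "buffer until, into a supplied collection" is expected to produce can be sketched in plain Java. This is only an analogue over an Iterable, not Reactor's implementation: BufferUntilSketch and its bufferUntil method are hypothetical names, and the sketch deliberately ignores backpressure, cancellation, and discard handling, which are the parts that matter most in this issue.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, non-reactive analogue of "bufferUntil with a Supplier".
class BufferUntilSketch {

    // Groups items into collections obtained from bufferSupplier. The item
    // that matches the boundary predicate closes the current buffer and is
    // included as its last element.
    static <T, C extends Collection<T>> List<C> bufferUntil(
            Iterable<T> source,
            Predicate<? super T> boundary,
            Supplier<C> bufferSupplier) {

        List<C> buffers = new ArrayList<>();
        C current = bufferSupplier.get();
        for (T item : source) {
            current.add(item);
            if (boundary.test(item)) {
                buffers.add(current);
                current = bufferSupplier.get();
            }
        }
        // Emit the trailing, unterminated buffer only if it holds something.
        if (!current.isEmpty()) {
            buffers.add(current);
        }
        return buffers;
    }
}
```

In Reactor terms, the suggested workaround would play the role of the loop with windowUntil and the role of the supplier with collect applied to each window.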

@rstoyanchev
Contributor Author

rstoyanchev commented Jan 13, 2020

It may be possible, but I can't verify it because the step before bufferUntil is concatMapIterable, which produces a collection of allocated items, and those don't seem to pass through doOnDiscard in case of a downstream error. Could there be an issue with concatMapIterable?

Here is a simplified test:

@Test
void concatMapIterableDoOnDiscardTest() {

	Foo foo1 = new Foo();
	Foo foo2 = new Foo();
	Foo foo3 = new Foo();

	Flux<Foo> source = Flux.just(1)
			.concatMapIterable(i -> Arrays.asList(foo1, foo2, foo3))
			.doOnDiscard(Foo.class, Foo::release);

	StepVerifier.create(source)
			.consumeNextWith(foo -> {
				foo.release();
			})
			.thenCancel()
			.verify();

	assertThat(foo1.getRefCount()).isEqualTo(0); // okay
	assertThat(foo2.getRefCount()).isEqualTo(0); // fails
	assertThat(foo3.getRefCount()).isEqualTo(0); // fails
}

static class Foo {

	int refCount = 1;

	public int getRefCount() {
		return this.refCount;
	}

	public void release() {
		this.refCount = 0;
	}
}

@simonbasle
Contributor

@rstoyanchev is that still needed, or have you found the suggested workaround satisfactory?

@rstoyanchev
Contributor Author

@simonbasle all good now with the related fix #2014.
