PartGenerator streaming mode mixing parts content, or part content not properly delivered when downstream part prefetch is 0 #27743

Closed
djouvin opened this issue Nov 28, 2021 · 9 comments
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) status: superseded An issue that has been superseded by another type: bug A general bug

Comments


djouvin commented Nov 28, 2021

Affects: 5.2.x
Affects: 5.3.x

The documentation states that when the PartGenerator is used in streaming mode (streaming = true), part content must be consumed in an ordered and serial fashion. We still end up with a Flux<Part>, but of course we cannot use the flatMap operator to trigger part content consumption, since that would break this requirement.

However, instead of raising an exception stating that parallel consumption of parts was attempted, the generator first silently mixes up the parts' content (especially when very small parts are encountered, or when part consumption is slower than part reception/production). An unhelpful exception is only thrown later, when trying to consume the last part, the second-to-last part, or an earlier one, depending on how many parts have been silently skipped.

For example, given a 20 KB part, then a 1 KB part, and finally a 250 KB part, you will probably end up with the content of the 3rd part being delivered as the 2nd part's content, without notice. When you then try to consume the 3rd part, an exception is thrown saying "Could not switch from STREAMING to STREAMING; current state: STREAMING".

If you try to use the concatMap operator, which is supposed to serialize mapper execution (and thus subscription to part content), the problem still appears because of the operator's default prefetch (usually 32): parts are still produced in advance, and the content of small ones is skipped. So to really serialize part content consumption, you have to use concatMap with no prefetch.

However, when doing so, you run into another problem (which is a bug): no part body content (i.e. the part content's DataBuffers) is produced at all, even after subscription and an unbounded request.

My analysis of the problem

This is because the current implementation (through the requestToken() method) does not account for the correct downstream request (given by sink.requestedFromDownstream()): only the downstream request from the parts' sink is accounted for, whereas when delivering part content, the content sink should be considered, not the part sink. Thus, two distinct requestToken() methods (or a parameterized requestToken(Sink sink)) should be defined to distinguish between part demand and content demand. For example, we could define requestContent() and requestPart() methods, with conditions pointing to different sinks. The requestContent() method should probably be defined at the StreamingState inner class level, so that it points to the correct sink.
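
To make the idea concrete, here is a minimal plain-Java sketch of keeping part demand and content demand apart. It is not the PartGenerator code: the class DemandGate and all of its methods are hypothetical, and the two counters merely stand in for what sink.requestedFromDownstream() would report on the part sink and on the content sink.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: tracks part demand and content demand separately,
// so that a token is requested upstream only for the sink that is active.
final class DemandGate {
    private final AtomicLong partDemand = new AtomicLong();
    private final AtomicLong contentDemand = new AtomicLong();
    private final AtomicBoolean requestOutstanding = new AtomicBoolean();

    // Downstream requested n more parts (n > 0).
    void onPartRequest(long n) { add(partDemand, n); }

    // Downstream requested n more buffers of the current part (n > 0).
    void onContentRequest(long n) { add(contentDemand, n); }

    // Used while waiting for the next part: only part demand counts.
    boolean requestPart() { return tryRequest(partDemand); }

    // Used while streaming a part body: only content demand counts.
    boolean requestContent() { return tryRequest(contentDemand); }

    // Called once the upstream token has arrived.
    void tokenReceived() { requestOutstanding.set(false); }

    // Returns true when the caller should ask upstream for one more token.
    private boolean tryRequest(AtomicLong demand) {
        return demand.get() > 0 && requestOutstanding.compareAndSet(false, true);
    }

    // Adds demand, capping at Long.MAX_VALUE (unbounded) instead of overflowing.
    private static void add(AtomicLong demand, long n) {
        demand.accumulateAndGet(n, (a, b) -> (a + b < 0) ? Long.MAX_VALUE : a + b);
    }

    public static void main(String[] args) {
        DemandGate gate = new DemandGate();
        gate.onPartRequest(1);
        System.out.println(gate.requestContent()); // false: no content demand yet
        System.out.println(gate.requestPart());    // true: part demand is pending
    }
}
```

With a single shared check, content delivery would be gated on part demand, which matches the symptom described above when the part prefetch is 0.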

Another problem is that a new intermediate state is needed: currently, there is only one state for both part chaining and content delivery, the StreamingState (which we could rename to ContentStreamingState). A transitional state, which we could call PartStreamingState, should be defined to represent the fact that we are waiting for a new part to be requested from downstream once all of the previous part's content has been exhausted. The former would use requestContent() and the latter requestPart().

Also, the part number, and/or a flag stating whether the part has already been passed, should be stored in the StreamingState inner class, so that a correct error message can be produced when the downstream subscriber does not honor the serial consumption of parts and part contents. Whenever a part's content is subscribed to or requested, the flag or part number should be checked to ensure the consumer is subscribing to or requesting the correct part (the current one), and not a previous one. If the consumer requests or subscribes too late to a part that has already been streamed, an explicit error message specifying the part numbers should be raised, like "Trying to subscribe a part (number #) that has already been completed (current part: #)". Also, the StreamingState should display the part number in its toString() method.
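
A minimal sketch of such a check, in plain Java, might look as follows. The PartOrderGuard class and its method names are hypothetical and not part of Spring; only the error message wording is taken from the suggestion above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: remembers the number of the part currently being
// streamed and rejects late subscriptions to earlier parts.
final class PartOrderGuard {
    private final AtomicInteger currentPart = new AtomicInteger();

    // Called when a new part starts; returns its 1-based number.
    int nextPart() {
        return currentPart.incrementAndGet();
    }

    // Called when a part's content is subscribed to or requested.
    void checkSubscribe(int partNumber) {
        int current = currentPart.get();
        if (partNumber < current) {
            throw new IllegalStateException(
                    "Trying to subscribe a part (number " + partNumber
                    + ") that has already been completed (current part: " + current + ")");
        }
    }

    @Override
    public String toString() {
        return "STREAMING(part " + currentPart.get() + ")";
    }

    public static void main(String[] args) {
        PartOrderGuard guard = new PartOrderGuard();
        guard.nextPart();
        guard.nextPart();
        guard.nextPart();
        try {
            guard.checkSubscribe(2); // part 2 was already passed
        }
        catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```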

Finally, the ContentStreamingState should also use an inner queue, so that body content is delivered only when it is requested. Currently, when a data buffer spans multiple parts, the parts' content is delivered unconditionally: it should be enqueued instead and delivered only when requested by the part content subscriber.
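
The queueing behavior could be sketched like this in plain Java. QueuedContent is a hypothetical name, the Consumer stands in for the content sink, and strings stand in for DataBuffers; this only illustrates the idea and is not taken from the prototype.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch: buffers are enqueued on arrival and handed to the
// downstream consumer only while there is outstanding content demand.
final class QueuedContent<T> {
    private final Queue<T> buffers = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private long demand;

    QueuedContent(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Called when the parser produces a buffer for this part.
    synchronized void offer(T buffer) {
        buffers.add(buffer);
        drain();
    }

    // Called when the content subscriber requests n more buffers (n > 0).
    synchronized void request(long n) {
        demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
        drain();
    }

    // Delivers queued buffers in order until demand or the queue runs out.
    private void drain() {
        while (demand > 0 && !buffers.isEmpty()) {
            if (demand != Long.MAX_VALUE) {
                demand--;
            }
            downstream.accept(buffers.poll());
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        QueuedContent<String> content = new QueuedContent<>(delivered::add);
        content.offer("a");
        content.offer("b");
        System.out.println(delivered); // [] (nothing requested yet)
        content.request(1);
        System.out.println(delivered); // [a]
        content.request(5);
        System.out.println(delivered); // [a, b]
    }
}
```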

Note: we could also consider defining a new StreamingPartGenerator class to isolate the streaming behavior from PartGenerator, since it is quite different in nature from the "normal" one (saving parts to disk or memory before serving them).

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Nov 28, 2021
@jhoeller jhoeller added this to the Triage Queue milestone Nov 29, 2021
@jhoeller jhoeller added the in: web Issues in web modules (web, webmvc, webflux, websocket) label Nov 29, 2021
@poutsma poutsma self-assigned this Nov 29, 2021
@poutsma poutsma modified the milestones: Triage Queue, 6.0 M3 Dec 13, 2021
Contributor

poutsma commented Dec 13, 2021

Resolving this complicated issue will take some time, with the possibility that we might need several iterations before getting it right. Scheduling for 6.0 M3 as a consequence.

@poutsma poutsma added type: bug A general bug and removed status: waiting-for-triage An issue we've not yet triaged or decided on labels Dec 13, 2021
Contributor

poutsma commented Dec 13, 2021

To be clear: depending on the resolution, we might back port the solution to this issue to the 5.3.x branch once resolved.

Author

djouvin commented Dec 13, 2021

I have a prototype implementation that works (in Groovy, but easily translatable to Java). However, this implementation only handles the streaming case (not the non-streaming case).

Contributor

poutsma commented Dec 13, 2021

Sounds good. Please file a PR if you have some code that could be useful for us, referring back to this issue in the PR.

Contributor

poutsma commented Feb 4, 2022

We are considering introducing a different way to handle streaming multipart support, because of inherent problems in the current model. See the description of #28006. It would be great if you could share your thoughts in that issue.


jomach commented Mar 14, 2022

Hey, I currently need to build an upload endpoint that does not store data on disk or hold the whole file in memory. This does not seem to be possible right now due to this issue. When I set streaming to true, my controller never gets called.
I'm not a Flux expert, but something is wrong in this part of the code:
PartGenerator.java:

private void newPart(State currentState, HttpHeaders headers) {
	if (isFormField(headers)) {
		changeStateInternal(new FormFieldState(headers));
		requestToken();
	}
	else if (!this.streaming) {
		changeStateInternal(new InMemoryState(headers));
		requestToken();
	}
	else {
		Flux<DataBuffer> streamingContent = Flux.create(contentSink -> {
			State newState = new StreamingState(contentSink);
			if (changeState(currentState, newState)) {
				contentSink.onRequest(l -> requestToken());
				requestToken();
			}
		});
		emitPart(DefaultParts.part(headers, streamingContent));
	}
}

Put better, I think the issue is in emitPart, which matches what @djouvin said.


keyzj commented Mar 24, 2022

> I have a prototype implementation that works (in Groovy, but easily translatable to Java). However, this implementation only handles the streaming case (not the non-streaming case).

Hello! Could you please provide your prototype? I'd like to have a look :)

Author

djouvin commented Mar 28, 2022

Here is the Groovy code I used.
Note that to consume the parts properly, you should use concatMap with a prefetch of 0.
StreamingPartGenerator.zip

@poutsma poutsma modified the milestones: 6.0.0-M4, 6.0.0-M5 May 9, 2022
@poutsma poutsma modified the milestones: 6.0.0-M5, 6.0.0-M6 Jul 11, 2022
@snicoll snicoll modified the milestones: 6.0.0-M6, 6.0.0-RC1 Sep 14, 2022
Contributor

poutsma commented Oct 10, 2022

Closing this issue, because we have deprecated DefaultPartHttpMessageReader's streaming mode in 6.0 RC1, in favor of PartEvent and PartEventHttpMessageReader introduced through #28006. In short, the reason is that streaming mode put a lot of restrictions on its consumers, in terms of prefetch, but also other areas. See #28006 for more details, and also #29293.

@poutsma poutsma closed this as not planned Oct 10, 2022
@poutsma poutsma added the status: superseded An issue that has been superseded by another label Oct 10, 2022
@poutsma poutsma removed this from the 6.0.0-RC1 milestone Oct 10, 2022