-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Revert "Disable MutinyTest#testSSE for now as it's unstable on CI" #11727
Conversation
@jaikiran Thanks! So, it's a bit more complicated :-) In this test, the items are emitted on 2 threads which lead to a kind of race condition in how the items are written in RestEasy. It passes if we removed the So you want me to open a PR against your PR, or reopen a fresh one? |
@cescoffier just push the fix here if possible |
and who forgot to run the formatter before committing? I give you and hint... He is currently writing this comment. |
Don't worry, I fixed it and pushed the change a few seconds back :) |
The JDK 14 build failed with an error in the very same test. |
Hum... so it's not the reason... Something else is going on. |
working on a faster reproducer (https://github.com/cescoffier/quarkus/runs/1052595409?check_suite_focus=true).... obviously it passes when you disable the other tests. |
I haven't been able to get too much out of my investigations last weekend. But when I did check, what I noticed was when this test fails, the HTTP request that's made to |
Given that resteasy seems to be involved, I checked out resteasy upgrades in Quarkus. We upgraded from 4.5.5.Final to 4.5.6.Final on July 22nd. Comparing the commit differences in these 2 releases of resteasy[1], I don't see anything significant. There's one commit which does touch the "HTTP layer", but I can't yet see how that might impact SSEs. |
I didn't count, but I confirm the behavior seen by @jaikiran 👍
So, the client is re-connecting to the stream which restarts from the beginning (it's a cold stream so it's the expected behavior). The items are emitted, but the client does not get the item, probably due to the reconnection. |
@FroMage by any chance do you know what could trigger a reconnection on the client-side of the SSE? The client does not receive any items and does not get a failure, it just reconnects. |
It only reconnects if it gets an IO exception on the connection, in theory |
But SSE mandates reconnections, so if you have a cold stream it's not clear that this is your issue. What precisely is sent by the server and what does the client receive and expect? |
The client sends a few items, but generally, only send the first one and immediately get a reconnect, restarting the stream from the beginning. From my understand:
|
OK, well in that case we have to know what is the "something bad happens". |
Looking at this code https://github.com/resteasy/Resteasy/blob/4.5.6.Final/resteasy-client/src/main/java/org/jboss/resteasy/plugins/providers/sse/client/SseEventSourceImpl.java#L333, it may not necessarily be an IOException which triggers this reconnect. It looks like, if for whatever reason it fails to deserialize the |
Wouldn't you get |
Trying to get a bit more details. I was able to use my own implementation of |
The only way I can see that happening is |
Trying to substitution the ServerSentEventImpl with my own implementation to log more. |
(slightly OT - long term, I think it's time to see how we can ease the integration of Byteman https://byteman.jboss.org/ into all these different modes of Quarkus launched apps/tests) |
Did anyone try running with the previous RESTEasy version, if only to eliminate the upgrade as a problem? |
Continuing to make slow progress: Something is wrong when this is happening:
It's closing the SSE because the read chunk is
So something is wrong in the content sent by the server. |
This goes via @Override
public SseEventInputImpl readFrom(Class<SseEventInputImpl> cls, Type type, Annotation[] annotations,
MediaType mediaType, MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException,
WebApplicationException
{
MediaType streamType = mediaType;
if (mediaType.getParameters() != null)
{
Map<String, String> map = mediaType.getParameters();
String elementType = map.get(SseConstants.SSE_ELEMENT_MEDIA_TYPE);
if (elementType != null)
{
mediaType = MediaType.valueOf(elementType);
}
}
return new SseEventInputImpl(annotations, streamType, mediaType, httpHeaders, entityStream);
} Which doesn't do anything: public SseEventInputImpl(final Annotation[] annotations, final MediaType streamType, final MediaType elementType,
final MultivaluedMap<String, String> httpHeaders, final InputStream inputStream)
{
this.annotations = annotations;
this.mediaType = elementType;
this.httpHeaders = httpHeaders;
this.inputStream = inputStream;
this.textLike = MediaTypeHelper.isTextLike(streamType);
this.escape = streamType != null && streamType.toString().startsWith("application/x-stream-general");
} But there are these methods, that I presume are called: public InboundSseEvent read() throws IOException
{
byte[] chunk = null;
try
{
lastFieldWasData = false;
chunk = readEvent(inputStream);
if (chunk == null)
{
close();
return null;
}
}
catch (IOException e1)
{
try
{
close();
}
catch (IOException e)
{
//TODO: add a log message
}
throw e1;
}
final ByteArrayInputStream entityStream = new ByteArrayInputStream(chunk);
final ByteArrayOutputStream temSave = new ByteArrayOutputStream();
Charset charset = StandardCharsets.UTF_8;
if (mediaType != null && mediaType.getParameters().get(MediaType.CHARSET_PARAMETER) != null)
{
charset = Charset.forName(mediaType.getParameters().get(MediaType.CHARSET_PARAMETER));
}
final InboundSseEventImpl.Builder eventBuilder = new InboundSseEventImpl.Builder(annotations, mediaType,
httpHeaders);
//TODO: Look at if this can be improved
int b = -1;
SseConstants.EVENT currentState = SseConstants.EVENT.START;
while ((b = entityStream.read()) != -1)
{
try
{
if (currentState == SseConstants.EVENT.START)
{
if (b == '\r' || b == '\n')
{
continue;
}
if (b == ':')
{
currentState = SseConstants.EVENT.COMMENT;
continue;
}
else
{
temSave.write(b);
currentState = SseConstants.EVENT.FIELD;
continue;
}
}
if (currentState == SseConstants.EVENT.COMMENT)
{
b = readLine(entityStream, '\n', temSave);
String commentLine = temSave.toString(charset.name());
eventBuilder.commentLine(commentLine);
temSave.reset();
currentState = SseConstants.EVENT.START;
continue;
}
if (currentState == SseConstants.EVENT.FIELD)
{
temSave.write(b);
b = readLine(entityStream, ':', temSave);
String fieldName = temSave.toString(StandardCharsets.UTF_8.name());
temSave.reset();
if (b == ':')
{
//spec says there is space after colon
do
{
b = entityStream.read();
}
while (b == ' ');
if (b != '\n' && b != '\r' && b != -1)
{
temSave.write(b);
b = readLine(entityStream, '\n', temSave);
}
}
processField(eventBuilder, fieldName, mediaType, temSave.toByteArray());
temSave.reset();
currentState = SseConstants.EVENT.START;
continue;
}
}
catch (IOException e)
{
throw new IOException(Messages.MESSAGES.readEventException(), e);
}
}
InboundSseEventImpl event = (InboundSseEventImpl) eventBuilder.build();
if (this.providers != null)
{
event.setProvider(this.providers);
}
return event;
} Which I'm not sure how to make sense of, since it tries to call public byte[] readEvent(final InputStream in) throws IOException
{
@SuppressWarnings("resource")
EventByteArrayOutputStream buffer = new EventByteArrayOutputStream();
int data;
int pos = 0;
boolean boundary = false;
byte[] eolBuffer = new byte[5];
while ((data = in.read()) != -1)
{
byte b = (byte) data;
if (!textLike && b == '\\')
{
buffer.write(b);
b = (byte) in.read();
}
else
{
if (b == '\r' || b == '\n')
{
eolBuffer[pos] = b;
//if it meets \r\r , \n\n , \r\n\r\n or \n\r\n\r\n
if ((pos > 0 && eolBuffer[pos] == eolBuffer[pos - 1])
|| (pos >= 3 && new String(eolBuffer, 0, pos, StandardCharsets.UTF_8).contains(DELIMITER)))
{
boundary = true;
}
//take it a boundary if there are 5 unexpected eols
if (pos++ > 4)
{
boundary = true;
}
}
else
{
pos = 0;
}
}
buffer.write(b);
if (boundary && buffer.size() > pos)
{
return buffer.getEventPayLoad();
}
//if it's emtpy
if (boundary && buffer.size() == pos)
{
pos = 0;
boundary = false;
buffer.reset();
continue;
}
}
return null;
} So yeah, that's where you'll have to look at the bytes being looked at. |
But this class doesn't appear to have changed at all recently. |
@FroMage exactly, it's where I'm looking at the moment. |
It's not in your call directly, but is there any chance it's running as part of the test for some reason? What if you disable these endpoints and the test that calls them? I'd like to rule out the accidental use of a client context. |
Let me try that right now. |
Also, print It looks like the only way to get a |
@FroMage same failure:
|
OK, we found the culprits then: f65ee2b added: ResteasyProviderFactory.setInstance(clientProviderFactory); Which just breaks all server calls that expect this to be a server client factory. |
That's NASTY. |
BTW I think it's pretty nuts to have two Good luck fixing that, BTW, it won't be trivial. |
Hello @FroMage, does that also explain why this test passes sometimes but fails on other occasions? Or are we still missing any other links? |
@radcortez @kenfinnigan How can we remove that call? |
How can it explain that it sometimes fails. I guess it could depend on when you end up executing that client recorder stuff. It's not clear to me, honestly. But it's still wrong. |
Looks like I was a visionary: #11448 (review) :) |
If it's a problem I have nothing against reverting this PR until we can get a proper fix. |
Haha, well spotted, you should have called their bluff though ;) |
@cescoffier, I now have something that might shed some light on why this is inconsistent. I just need your setup where this is happening consistently. I plan to push a certain change there. Is that OK? I will make sure I copy over your current branch locally before messing up that branch and pushign my commits. |
@jaikiran go ahead. I'm not going to work on that for the rest of the day. Feel free to remove my hundreds of traces. |
So, I managed to get Byteman integrated (in a hackish way for now) against this test to see what's going on. @FroMage had anyway already solved the mystery of the root cause, so it helped me come up with specific Byteman scripts[1] to figure why this fails only on certain occasions. Here's what it shows up. When this test succeeds, the
On the other hand, when the test fails, the order of these recorders is reversed. So the
So the transient nature of this failure is due to non-determinstic execution ordering of the recorders. [1] https://github.com/jaikiran/quarkus/commits/mutiny-sse-enable-byteman P.S: I'll see if I can make this Byteman integration a bit more seamless and easier to have to usable by some kind of simple switch for solving issues like these. |
Replaced by #11828 |
@jaikiran @FroMage @cescoffier @kenfinnigan , sorry for the late feedback here, I've been on PTO and it took time to process the backlog once back. The exception above from the NOOPServerHelper is indeed caused by a ResteasyProviderFactory instance meant for client side usage being actually used on server side. And f65ee2b is really problematic, you should really be careful with ResteasyProviderFactory.setInstance(). Actually, ResteasyProviderFactory.getInstance() is risky too as it allows incurring in problems like this and unfortunately it's often used in resteasy as it's convenient... Having said this, I don't personally like the design of ResteasyProviderFactory that much (I've seen other failures because of lack of proper isolation of instances), but improving it is a hard task. I tried something back in 2019, but didn't manage to have enough time to focus on it and finish it (resteasy/resteasy#2240). |
@asoldano asoldano/Resteasy@3b7b8c4 looks indeed like the right thing to do. |
@asoldano There is no other alternative to setting the If there's another way to achieve what is needed, explicitly defining the provider factory to control what providers are loaded, without using a static method/field, then please do so. But I'd appreciate not blocking this approach without an alternative |
IMO the two issues are orthogonal. @asoldano 's fix is the proper fix for RESTEasy, because it ensures the server's providers are used by its SSE system. With that fix, the order in which you set the The question of being able to configure |
To follow up on this, Ron should have merged Ken's PR. |
In #11693 we disabled the
MutinyTest#testSSE
due to CI issues. Now that CI has stabilized, re-enabling this to see if there are any genuine issues.Fixes #11687