Fix netty buffer leak on short-circuit response #401
```diff
@@ -223,6 +223,7 @@ private void forwardShortCircuitResponse(ResponseHeaderData header, ApiMessage r
             "Attempt to respond with ApiMessage of type " + ApiKeys.forId(response.apiKey()) + " but request is of type " + decodedFrame.apiKey());
     }
     DecodedResponseFrame<?> responseFrame = new DecodedResponseFrame<>(decodedFrame.apiVersion(), decodedFrame.correlationId(), header, response);
+    decodedFrame.transferBuffers(responseFrame);
```
Maybe these two lines would be better as `decodedFrame.copy(header, response)`, but that takes us back to generics hell: `DecodedResponseFrame<B extends ApiMessage>` has a generic type `B` for the ApiMessage, whereas here `decodedFrame` is a `DecodedFrame<?, ?>`, so we don't know which ApiMessage type it's dealing with. It could be done as in the other PR: remove the generic message type and work in terms of `ApiMessage`, with additional checks to ensure we only copy in the same concrete `ApiMessage` type.
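A minimal sketch of that non-generic alternative, under stated assumptions: `Message`, `ProduceBody`, `FetchBody` and `Frame` are hypothetical stand-ins, not Kafka's `ApiMessage` or the proxy's `DecodedFrame`. The frame stores its body as the base interface, and `copy` trades the compile-time guarantee generics would give for a runtime check that the new body has exactly the same concrete class.

```java
import java.util.Objects;

public class NonGenericFrameSketch {

    /** Hypothetical stand-in for an ApiMessage. */
    interface Message {}

    record ProduceBody(String topic) implements Message {}

    record FetchBody(String topic) implements Message {}

    /** Hypothetical frame that is NOT generic in its body type. */
    static final class Frame {
        private final short apiVersion;
        private final int correlationId;
        private final Message body;

        Frame(short apiVersion, int correlationId, Message body) {
            this.apiVersion = apiVersion;
            this.correlationId = correlationId;
            this.body = Objects.requireNonNull(body);
        }

        Message body() {
            return body;
        }

        int correlationId() {
            return correlationId;
        }

        /** Copies this frame with a new body, checking the concrete type at runtime. */
        Frame copy(Message newBody) {
            if (newBody.getClass() != body.getClass()) {
                throw new IllegalArgumentException("Expected body of type "
                        + body.getClass().getSimpleName() + " but got "
                        + newBody.getClass().getSimpleName());
            }
            return new Frame(apiVersion, correlationId, newBody);
        }
    }

    public static void main(String[] args) {
        Frame frame = new Frame((short) 9, 42, new ProduceBody("orders"));
        // Same concrete type: accepted, version and correlation id are carried over.
        System.out.println(frame.copy(new ProduceBody("payments")).body());
        try {
            // Different concrete type: rejected at runtime rather than at compile time.
            frame.copy(new FetchBody("orders"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The cost of this design is that a mismatched message type surfaces as an `IllegalArgumentException` at runtime instead of a compile error.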
It may be best to get this fix in as-is and look at refactoring the generics in another PR.
I think we should get this in and refactor later.
I'm now wondering whether we could also get a leak, or unexpected behaviour, if we use any Netty buffers in the construction of a message that's used with I tried it experimentally and it seems to work okay, maybe because in the testing environment nothing else is trying to allocate buffers that could be handed that released memory. @franz1981, am I right that this could be a problem? In miniature it would look like:
I imagine this means that something else could be allocated the same memory after the release. I've confirmed this with a little playing around, like:
Which prints:
Thanks for looking into this, @robobario.
Let's fix this now and clean things up later.
```java
final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
final Configuration config = ctx.getConfiguration();
appender = (NettyLeakLogAppender) config.getAppenders().get("NettyLeakLogAppender");
```
Urgh! This is both ingenious and nasty.
I think it still suffers from the fact that the leak detector only works on GC cycles: it detects objects being garbage collected while they still have a positive reference count. Maybe the after-each hook should also request a GC cycle.
Edit: I see there is an attempt to force GC in the test itself.
Yep, it's a bit painful. It could cause a non-deterministic failure in an unrelated test in KrpcFilterIT, but the leak trace should point you in the right direction. Better than a silent leak :(
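The test captures Netty's leak reports through a Log4j 2 appender. As a rough illustration of the same idea using only the JDK, here is a `java.util.logging` handler; this is a swapped-in technique, not what the PR does. It assumes Netty has fallen back to JUL logging, so reports arrive on the logger named `io.netty.util.ResourceLeakDetector`, and that each report starts with `LEAK:`. `LeakReportCapture` and `drainLeaks` are hypothetical names. Because the detector only notices a leak once the leaked object has been garbage collected, draining after each test can still attribute a leak to a later, unrelated test, as noted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Hypothetical JUL handler that collects Netty leak reports. */
public class LeakReportCapture extends Handler {

    // Assumption: Netty logs leaks via JUL under the ResourceLeakDetector class name.
    static final String DETECTOR_LOGGER = "io.netty.util.ResourceLeakDetector";

    // Strong reference: the JUL LogManager only holds loggers weakly.
    private final Logger logger;
    private final List<String> leaks = new ArrayList<>();

    private LeakReportCapture(Logger logger) {
        this.logger = logger;
    }

    /** Creates a capture handler and attaches it to the leak detector's logger. */
    static LeakReportCapture install() {
        LeakReportCapture capture = new LeakReportCapture(Logger.getLogger(DETECTOR_LOGGER));
        capture.logger.addHandler(capture);
        return capture;
    }

    @Override
    public synchronized void publish(LogRecord record) {
        String message = record.getMessage();
        // Keep only leak reports; ignore any other log output.
        if (message != null && message.startsWith("LEAK:")) {
            leaks.add(message);
        }
    }

    @Override
    public void flush() {
        // Nothing is buffered.
    }

    @Override
    public void close() {
        logger.removeHandler(this);
    }

    /** Returns the reports captured so far and forgets them, e.g. from an after-each hook. */
    synchronized List<String> drainLeaks() {
        List<String> seen = List.copyOf(leaks);
        leaks.clear();
        return seen;
    }
}
```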
integrationtests/src/test/java/io/kroxylicious/proxy/KrpcFilterIT.java
I think you mentioned elsewhere why it's this way round, but it feels odd that we are transferring a response frame to a request frame. Naively, I would expect us to keep the request frame around until we had finished with the response, not the other way around.
It's the other way around: the buffers move from the request frame to the response frame. I've renamed the method so the call reads `decodedFrame.transferBuffersTo(responseFrame)`.
I guess part of what confused me was the pushing of buffers rather than pulling them, but I suppose it has to be that way round for access.
```java
import io.netty.util.ResourceLeakDetector;

@Plugin(name = "NettyLeakLogAppender", category = "Core", elementType = "appender", printObject = true)
public class NettyLeakLogAppender extends AbstractAppender {
```
Thinking out loud, I wonder if it's worth adding this to the JUnit extension so we can apply it to all the integration tests?
It's the sort of thing I think would be good for helping filter authors verify their filters as well.
Is the JUnit 5 extension the right place? The extension doesn't actually know anything about Netty.
Yeah, I did wonder about that. Maybe a separate extension in Kroxylicious test tools.
Related to the leak: yep, if the NIO buffer is stolen, it should only be used temporarily. I have to verify whether retaining the originating ByteBuf would help (probably not, because the NIO buffer is a view that assumes its owner won't be shared). See kroxylicious/kroxylicious/src/main/java/io/kroxylicious/proxy/internal/util/ByteBufOutputStream.java, line 33 in 4b7a824.
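To make the hazard concrete without depending on Netty, here is a toy model. `ToyPool` is hypothetical and far simpler than Netty's pooled allocator, and `ByteBuffer.wrap` merely plays the role of a stolen NIO view: the view outlives the release of its owner, and once the pool hands the same memory to someone else, the view observes their data.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

/** Toy model: a view over pooled memory outliving the release of its owner. */
public class UseAfterReleaseSketch {

    /** Hypothetical pool that recycles fixed-size chunks. */
    static final class ToyPool {
        private final ArrayDeque<byte[]> free = new ArrayDeque<>();
        private final int chunkSize;

        ToyPool(int chunkSize) {
            this.chunkSize = chunkSize;
        }

        byte[] allocate() {
            byte[] chunk = free.poll();
            return chunk != null ? chunk : new byte[chunkSize];
        }

        void release(byte[] chunk) {
            free.push(chunk); // back to the pool; the contents are not cleared
        }
    }

    /** Reads the remaining bytes of a view without moving its position. */
    static String read(ByteBuffer view) {
        ByteBuffer dup = view.duplicate();
        byte[] out = new byte[dup.remaining()];
        dup.get(out);
        return new String(out, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyPool(5);

        byte[] owned = pool.allocate();
        System.arraycopy("hello".getBytes(StandardCharsets.US_ASCII), 0, owned, 0, 5);
        ByteBuffer view = ByteBuffer.wrap(owned); // stands in for a stolen NIO view
        pool.release(owned);                      // the owner is released, the view is not

        byte[] reused = pool.allocate();          // another user is handed the same memory
        System.arraycopy("WORLD".getBytes(StandardCharsets.US_ASCII), 0, reused, 0, 5);

        System.out.println(read(view));           // prints WORLD, not hello
    }
}
```

In a quiet test environment nothing may reuse the memory before the view is read, which would explain why such code can appear to work.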
That said, such uses should be temporary; where it makes sense, rely on ownership control of Netty buffers instead. (retain is a costly operation, so call sites should make it evident whether ownership is fully transferred or shared, and avoid the retain in the former case.)
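The ownership rule above can be illustrated with a toy reference-counted buffer. `RcBuffer` and `writeAndRelease` are hypothetical and only loosely follow Netty's `ReferenceCounted` contract (one reference on allocation, `retain()` adds one, `release()` drops one). When ownership is fully transferred, the producer simply stops using the buffer and no `retain()` is needed; only a shared buffer needs the extra reference.

```java
/** Toy model of "retain only when sharing, not when transferring". */
public class OwnershipSketch {

    /** Hypothetical reference-counted buffer. */
    static final class RcBuffer {
        private int refCnt = 1; // the allocator hands out exactly one reference

        synchronized int refCnt() {
            return refCnt;
        }

        /** Adds a reference. Netty does this with an atomic update, part of why it is not free. */
        synchronized RcBuffer retain() {
            if (refCnt == 0) {
                throw new IllegalStateException("retain() after final release");
            }
            refCnt++;
            return this;
        }

        /** Drops a reference; returns true when the memory would go back to the pool. */
        synchronized boolean release() {
            if (refCnt == 0) {
                throw new IllegalStateException("release() after final release");
            }
            refCnt--;
            return refCnt == 0;
        }
    }

    /** Hypothetical consumer that owns the reference it is given and releases it when done. */
    static void writeAndRelease(RcBuffer buf) {
        // ... write buf somewhere ...
        buf.release();
    }

    public static void main(String[] args) {
        // Transfer: the producer never touches the buffer again, so no retain().
        RcBuffer transferred = new RcBuffer();
        writeAndRelease(transferred);
        System.out.println("transferred: refCnt = " + transferred.refCnt()); // 0

        // Share: the producer keeps using the buffer, so the consumer gets its own reference.
        RcBuffer shared = new RcBuffer();
        writeAndRelease(shared.retain());
        System.out.println("shared, after consumer: refCnt = " + shared.refCnt()); // 1
        shared.release();
        System.out.println("shared, after producer: refCnt = " + shared.refCnt()); // 0
    }
}
```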
Netty `ByteBuf`s use explicit reference counting to control when they are released back to a pool. Each buffer has to be released by someone before it is garbage collected, or we have a leak.

A leak was introduced when we added the ability to `forwardResponse` while handling a request. If the filter allocates a buffer using `KrpcFilterContext#createByteBufferOutputStream`, the buffer is added to the `DecodedFrame` associated with the context. It is then assumed that this frame will be read from or written to the Netty channel, so that Netty calls release on it, which in turn releases the buffers held by the frame. In the short-circuit response case, however, we create a new frame, and the buffers on the old frame are left to be garbage collected without ever being released.
With this solution we transfer the buffers to the new frame that is handed to netty.
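The leak and the fix can be modelled without Netty. In this sketch `Buf`, `Frame` and `handle` are hypothetical stand-ins, not the proxy's real `DecodedFrame` API; only the `transferBuffersTo` name comes from the review. It also shows why the transfer is a push: the request frame is the only object with access to its private buffer list.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the short-circuit leak and the transfer-based fix. */
public class ShortCircuitTransferSketch {

    /** Hypothetical stand-in for a pooled buffer; counts buffers not yet released. */
    static final class Buf {
        static int outstanding = 0;
        private boolean released;

        Buf() {
            outstanding++;
        }

        void release() {
            if (released) {
                throw new IllegalStateException("double release");
            }
            released = true;
            outstanding--;
        }
    }

    /** Hypothetical frame that owns the buffers allocated while it was being handled. */
    static final class Frame {
        private final List<Buf> buffers = new ArrayList<>();

        void add(Buf buf) {
            buffers.add(buf);
        }

        /** Push model: only this frame can reach its own list, so it hands the buffers over. */
        void transferBuffersTo(Frame target) {
            target.buffers.addAll(buffers);
            buffers.clear();
        }

        /** Roughly what happens once the channel has written the frame. */
        void release() {
            buffers.forEach(Buf::release);
            buffers.clear();
        }
    }

    /** Simulates a filter that allocates a buffer and then short-circuits with a response. */
    static void handle(boolean transfer) {
        Frame request = new Frame();
        request.add(new Buf());       // e.g. allocated via an output stream from the filter context
        Frame response = new Frame(); // the new short-circuit response frame
        if (transfer) {
            request.transferBuffersTo(response);
        }
        response.release();           // only the response frame is written and released
        // 'request' is now unreachable; any buffers it still holds have leaked
    }

    public static void main(String[] args) {
        handle(false);
        System.out.println("without transfer, outstanding = " + Buf.outstanding); // 1
        Buf.outstanding = 0;
        handle(true);
        System.out.println("with transfer, outstanding = " + Buf.outstanding);    // 0
    }
}
```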