[fix][broker] Fix multiple transfer corruption issues when TLS is enabled #22760
On the Netty side, PooledByteBuf.getBytes was made thread safe by making a copy of the internalByteBuffer with the netty/netty#9120 changes. However, this change is not reflected in the ReadOnlyByteBufferBuf implementation. (This doesn't have an impact on this PR; I'm just sharing the observation.)
LGTM
great work
// If the buffer is already read-only, .asReadOnly() will return the same buffer.
// That's why the additional .retainedDuplicate() is needed to ensure that the returned buffer
// has independent readIndex and writeIndex.
return buf.asReadOnly().retainedDuplicate();
private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) {
    if (buf == null || buf.readableBytes() <= 0) {
        return Unpooled.EMPTY_BUFFER;
    }
    if (buf.isReadOnly()) {
        return buf.retainedDuplicate().asReadOnly();
    }
    return buf.asReadOnly().retain();
}
Would this be better? I think we don't need to call duplicate() after calling asReadOnly.
"I think we don't need to call duplicate() after calling asReadOnly"
It is needed. The code comment explains it; please check the comment for the reason.
I mean that if the input buf is not read-only, asReadOnly will create a new instance, so we just need to call retain() after asReadOnly.
The new instance also has independent read/write indexes.
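The "independent read/write index" point can be illustrated with a small sketch. It uses java.nio.ByteBuffer as a stand-in for Netty's ByteBuf, which is an assumption made only to keep the example runnable with the JDK alone; the class and method names (IndependentViewSketch, readOnlyView, readAscii) are hypothetical. One difference to keep in mind: ByteBuffer.asReadOnlyBuffer() always returns a new view, whereas, as the code comment in this PR notes, Netty's asReadOnly() returns the same buffer when it is already read-only, which is why the PR adds retainedDuplicate().

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Analogy only: java.nio.ByteBuffer stands in for Netty's ByteBuf.
final class IndependentViewSketch {

    // Returns a read-only view with its own position and limit.
    // The underlying bytes are shared with the original, not copied.
    static ByteBuffer readOnlyView(ByteBuffer shared) {
        return shared.asReadOnlyBuffer();
    }

    // Reads n bytes through the given view; only that view's position advances.
    static String readAscii(ByteBuffer view, int n) {
        byte[] out = new byte[n];
        view.get(out);
        return new String(out, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer a = readOnlyView(shared);
        ByteBuffer b = readOnlyView(shared);
        System.out.println(readAscii(a, 5));
        // Reading through view a leaves view b and the shared buffer untouched.
        System.out.println(a.position() + " " + b.position() + " " + shared.position());
    }
}
```

Each consumer gets its own view, so advancing one reader never moves another reader's index. This says nothing about reference counting, which java.nio does not have and which the retain() calls in the discussion above address.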
if (buf == null || buf.readableBytes() <= 0) {
    return Unpooled.EMPTY_BUFFER;
}
I don't think that nulls should be tolerated when nulls shouldn't be passed as input. For the readableBytes() == 0 optimization, is that needed?
This is just defensive programming. If we are sure that the input buf cannot be null, this line can be removed.
Oh, I checked the source code. It looks a little strange, and I didn't understand it at first.
private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
    final int inReadableBytes = next.readableBytes();
    final int cumulationCapacity = cumulation.capacity();
    if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
            // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
            // Only copy if there is enough space available and the capacity is large enough, and attempt to
            // resize if the capacity is small.
            (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
                    cumulationCapacity < wrapDataSize &&
                            ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
        cumulation.writeBytes(next);
        next.release();
        return true;
    }
    return false;
}
We should return false immediately if cumulation is not writable.
This approach is currently blocked by another Netty bug where SslHandler doesn't support a read only buffer. That seems to be the reason why a deep copy is currently required. Fixing that issue in netty/netty#14071.
Yes, the fix looks good
We should return false immediately if cumulation is not writable
In this case, isWritable returns true for a wrapped read-only buffer. This is surprising, and it is the reason why read-only buffers aren't supported if there's another wrapper around them. The buffer does correctly return true for isReadOnly.
One possible workaround could be to use Unpooled.unmodifiableBuffer to add the read-only wrapper so that it's always the "top most" wrapper. That would also avoid the need for the extra duplicate() wrapper.
In this case, isWritable returns true for a buffer for a wrapped read only buffer.
It sounds very strange, can you please point me where?
It sounds very strange, can you please point me where?
Yes, it's very surprising behavior. You can try it out with a debugger by running, for example, TlsProducerConsumerTest and checking what happens in attemptCopyToCumulation if the .asReadOnly().retainedDuplicate() solution is used.
The io.netty.buffer.AbstractByteBuf#isWritable(int) method gets called: https://github.com/netty/netty/blob/70d6a3f40d7e8fd3f5ced7600ed209c58944f673/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java#L171-L174
It will evaluate to true since its implementation doesn't check for isReadOnly():
@Override
public boolean isWritable(int numBytes) {
    return capacity() - writerIndex >= numBytes;
}
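The gap between "has room" and "may be written to" can be shown with a rough JDK-only analogy. The sketch below uses java.nio.ByteBuffer rather than Netty's ByteBuf, and the names ReadOnlyGuardSketch, hasRoom, canAppend and tryAppend are hypothetical. It is not the fix that was made in Netty; it only illustrates why a capacity-only check is not sufficient before writing into a buffer that might be read-only.

```java
import java.nio.ByteBuffer;

// Analogy only: java.nio.ByteBuffer stands in for Netty's ByteBuf.
final class ReadOnlyGuardSketch {

    // Capacity-only check, comparable in spirit to the isWritable(int) quoted above:
    // it looks at the remaining space and ignores whether the buffer is read-only.
    static boolean hasRoom(ByteBuffer target, int numBytes) {
        return target.remaining() >= numBytes;
    }

    // Hypothetical stricter check that also consults the read-only flag.
    static boolean canAppend(ByteBuffer target, int numBytes) {
        return !target.isReadOnly() && hasRoom(target, numBytes);
    }

    // Appends data if allowed and returns false otherwise, instead of throwing.
    static boolean tryAppend(ByteBuffer target, byte[] data) {
        if (!canAppend(target, data.length)) {
            return false;
        }
        target.put(data);
        return true;
    }

    public static void main(String[] args) {
        ByteBuffer readOnly = ByteBuffer.allocate(16).asReadOnlyBuffer();
        // The capacity-only check passes even though any put() would fail.
        System.out.println(hasRoom(readOnly, 4));
        System.out.println(tryAppend(readOnly, new byte[4]));
    }
}
```

With a read-only target, hasRoom reports true while an actual put() throws ReadOnlyBufferException, which mirrors the situation described for attemptCopyToCumulation.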
@dao-jun Please share that as a comment instead. This is my PR and not yours. :) I'll update the description based on the feedback.
Just add additional context to help understand it quickly. CopyingEncoder is used to handle the case of However, The solution is pass a
private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
    final int inReadableBytes = next.readableBytes();
    final int cumulationCapacity = cumulation.capacity();
    if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
            // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
            // Only copy if there is enough space available and the capacity is large enough, and attempt to
            // resize if the capacity is small.
            (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
                    cumulationCapacity < wrapDataSize &&
                            ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
        cumulation.writeBytes(next);
        next.release();
        return true;
    }
    return false;
}
it should return false immediately if
Yes, of course. My bad, I didn't explain myself correctly.
Thanks for the useful summary @dao-jun. I'll revisit the description of this PR later once this comes to a conclusion.
There might be a workaround (explained in #22760 (comment)). I will test that.
I think I'll need to add a repro test to the Pulsar code base. While testing the recent changes, I can see that the problem occurs even when there's the read-only wrapper.
There are also other bugs in this area. When TLS is enabled between Broker and Bookies, the Bookkeeper V3 protocol is used.
Closing this PR since the transfer corruption issues will be fixed by changes in Netty 4.1.111.Final (netty/netty#14072, netty/netty#14071, netty/netty#14076 and netty/netty#14078) and Bookkeeper 4.16.6 (apache/bookkeeper#4289 and apache/bookkeeper#4293).
The fix in Bookkeeper is apache/bookkeeper#4404.
UPDATE: This PR has been replaced by #22810
Fixes #22601 #21892 #19460
Motivation
In Pulsar, there are multiple reported issues where the transferred output gets corrupted and fails with exceptions around invalid reader and writer index. One class of these issues occurs only when TLS is enabled.
I found these Netty issues that provide a lot of context:
It seems that this is a long-standing issue in Netty and it has been partially fixed. However, it's not fixed in many locations in the Netty code base and it's not safe to share ByteBuf instances in all cases.
In Pulsar, the sharing of ByteBuf instances happens in this case at least via the broker cache (RangeEntryCacheManagerImpl) and the pending reads manager (PendingReadsManager).
The SslHandler related issue was originally reported in Pulsar in 2018 with #2401. The fix at that time was #2464.
The ByteBuf .copy() method was used to copy the ByteBuf. The problem with this change is that .copy() itself isn't thread safe and accesses the internalNioBuffer instance directly. This happens at least when the ByteBuf instance contains a ReadOnlyByteBufferBuf wrapper. This can be seen in the code https://github.com/netty/netty/blob/243de91df2e9a9bf0ad938f54f76063c14ba6e3d/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBufferBuf.java#L412-L433 .
As a result of this, exceptions such as these ones occur:
It is likely that "Failed to peek sticky key from the message metadata java.lang.IllegalArgumentException: Invalid unknonwn tag type: 4" exceptions are also caused by the same root cause.
Exceptions of the type "java.lang.IndexOutOfBoundsException: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))" on the broker side are possibly caused by the same root cause as well.
The root cause of such exceptions could also be different. A shared Netty ByteBuf must at least have an independent view created with duplicate, slice or retainedDuplicate if the readerIndex is mutated.
The ByteBuf instance must also be properly shared in a thread safe way. Failing to do that could result in similar symptoms and this PR doesn't fix that.
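The requirement that every reader of a shared buffer works on its own view can be sketched as follows. This is a minimal JDK-only illustration with java.nio.ByteBuffer standing in for Netty's ByteBuf; SharedEntryReadSketch, sumViaDuplicate and readConcurrently are hypothetical names, and the shared buffer is assumed to be a cached entry whose content no one modifies while it is being read.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Analogy only: java.nio.ByteBuffer stands in for Netty's ByteBuf.
final class SharedEntryReadSketch {

    // Sums all bytes through a private duplicate, so the shared buffer's
    // position is never mutated by this reader.
    static long sumViaDuplicate(ByteBuffer shared) {
        ByteBuffer view = shared.duplicate();
        long sum = 0;
        while (view.hasRemaining()) {
            sum += view.get();
        }
        return sum;
    }

    // Runs several readers in parallel against the same shared buffer.
    static List<Long> readConcurrently(ByteBuffer shared, int readers) {
        ExecutorService pool = Executors.newFixedThreadPool(readers);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < readers; i++) {
                Callable<Long> task = () -> sumViaDuplicate(shared);
                futures.add(pool.submit(task));
            }
            List<Long> results = new ArrayList<>();
            for (Future<Long> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

If the readers called get() on the shared buffer directly, they would race on a single position field, which is the kind of condition that can surface as inconsistent reader and writer indexes. Independent views address only the index mutation; they do not replace the thread safe hand-off of the instance that the description also calls for.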
Modifications
Documentation
doc
doc-required
doc-not-needed
doc-complete