Skip to content

Commit

Permalink
Restore old behavior of NettyAdaptiveCumulator, but avoid using that …
Browse files Browse the repository at this point in the history
…class if Netty is on version 4.1.111 or later. (#11367) (#11369)
  • Loading branch information
larry-safran authored Jul 10, 2024
1 parent c2a3ed3 commit 97aa34f
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.Internal;
Expand All @@ -27,6 +28,7 @@
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.Version;
import javax.annotation.Nullable;

/**
Expand All @@ -41,6 +43,30 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler
@Nullable
protected final ChannelPromise channelUnused;
private final ChannelLogger negotiationLogger;
private static final boolean usingPre4_1_111_Netty;

static {
// Netty 4.1.111 introduced a change in the behavior of duplicate() method
// that breaks the assumption of the cumulator. We need to detect this version
// and adjust the behavior accordingly.

boolean identifiedOldVersion = false;
try {
Version version = Version.identify().get("netty-buffer");
if (version != null) {
String[] split = version.artifactVersion().split("\\.");
if (split.length >= 3
&& Integer.parseInt(split[0]) == 4
&& Integer.parseInt(split[1]) <= 1
&& Integer.parseInt(split[2]) < 111) {
identifiedOldVersion = true;
}
}
} catch (Exception e) {
// Ignore, we'll assume it's a new version.
}
usingPre4_1_111_Netty = identifiedOldVersion;
}

protected GrpcHttp2ConnectionHandler(
ChannelPromise channelUnused,
Expand All @@ -51,7 +77,16 @@ protected GrpcHttp2ConnectionHandler(
super(decoder, encoder, initialSettings);
this.channelUnused = channelUnused;
this.negotiationLogger = negotiationLogger;
setCumulator(ADAPTIVE_CUMULATOR);
if (usingPre4_1_111_Netty()) {
// We need to use the adaptive cumulator only if we're using a version of Netty that
// doesn't have the behavior that breaks it.
setCumulator(ADAPTIVE_CUMULATOR);
}
}

@VisibleForTesting
static boolean usingPre4_1_111_Netty() {
return usingPre4_1_111_Netty;
}

/**
Expand Down
50 changes: 26 additions & 24 deletions netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@
/**
* "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
* compose strategies.
* <br><br>
*
* <p><b><font color="red">Avoid using</font></b>
* {@link CompositeByteBuf#addFlattenedComponents(boolean, ByteBuf)} as it can lead
* to corruption, where the components' readable area are not equal to the Composite's capacity
* (see https://github.com/netty/netty/issues/12844).
*/

class NettyAdaptiveCumulator implements Cumulator {
Expand Down Expand Up @@ -95,7 +89,8 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
composite.capacity(composite.writerIndex());
}
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE).addComponent(true, cumulation);
composite = alloc.compositeBuffer(Integer.MAX_VALUE)
.addFlattenedComponents(true, cumulation);
}
addInput(alloc, composite, in);
in = null;
Expand All @@ -115,7 +110,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
@VisibleForTesting
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
if (shouldCompose(composite, in, composeMinSize)) {
composite.addComponent(true, in);
composite.addFlattenedComponents(true, in);
} else {
// The total size of the new data and the last component are below the threshold. Merge them.
mergeWithCompositeTail(alloc, composite, in);
Expand Down Expand Up @@ -161,13 +156,32 @@ static void mergeWithCompositeTail(
ByteBuf tail = composite.component(tailComponentIndex);
ByteBuf newTail = null;
try {
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()
&& !isCompositeOrWrappedComposite(tail)) {
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) {
// Ideal case: the tail isn't shared, and can be expanded to the required capacity.

// Take ownership of the tail.
newTail = tail.retain();

// TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with
// the issue fixed.
// In certain cases, removing the CompositeByteBuf component, and then adding it back
// isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844.
// This happens because the buffer returned by composite.component() has out-of-sync
// indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying
// buffer, but doesn't set the indexes.
//
// To get the right indexes we use the fact that composite.internalComponent() returns
// the slice() into the readable portion of the underlying buffer.
// We use this implementation detail (internalComponent() returning a *SlicedByteBuf),
// and combine it with the fact that SlicedByteBuf duplicates have their indexes
// adjusted so they correspond to the to the readable portion of the slice.
//
// Hence composite.internalComponent().duplicate() returns a buffer with the
// indexes that should've been on the composite.component() in the first place.
// Until the issue is fixed, we manually adjust the indexes of the removed component.
ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate();
newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex());

/*
* The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
*
Expand All @@ -183,11 +197,7 @@ static void mergeWithCompositeTail(
newTail.writeBytes(in);

} else {
// The tail satisfies one or more criteria:
// - Shared
// - Not expandable
// - Composite
// - Wrapped Composite
// The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
newTail.setBytes(0, composite, tailStart, tailSize)
.setBytes(tailSize, in, in.readerIndex(), inputSize)
Expand All @@ -200,7 +210,7 @@ static void mergeWithCompositeTail(
// Remove the old tail, reset writer index.
composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
// Add back the new tail.
composite.addComponent(true, newTail);
composite.addFlattenedComponents(true, newTail);
// New tail's ownership transferred to the composite buf.
newTail = null;
composite.readerIndex(prevReader);
Expand All @@ -215,12 +225,4 @@ static void mergeWithCompositeTail(
}
}
}

private static boolean isCompositeOrWrappedComposite(ByteBuf tail) {
ByteBuf cur = tail;
while (cur.unwrap() != null) {
cur = cur.unwrap();
}
return cur instanceof CompositeByteBuf;
}
}
65 changes: 30 additions & 35 deletions netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void setUp() {
@Override
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
// To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose
composite.addComponent(true, in);
composite.addFlattenedComponents(true, in);
}
};

Expand Down Expand Up @@ -122,7 +122,7 @@ public void cumulate_contiguousCumulation_newCompositeFromContiguousAndInput() {

@Test
public void cumulate_compositeCumulation_inputAppendedAsANewComponent() {
CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous);
CompositeByteBuf composite = alloc.compositeBuffer().addFlattenedComponents(true, contiguous);
assertSame(composite, cumulator.cumulate(alloc, composite, in));
assertEquals(DATA_INITIAL, composite.component(0).toString(US_ASCII));
assertEquals(DATA_INCOMING, composite.component(1).toString(US_ASCII));
Expand All @@ -136,7 +136,7 @@ public void cumulate_compositeCumulation_inputAppendedAsANewComponent() {

@Test
public void cumulate_compositeCumulation_inputReleasedOnError() {
CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous);
CompositeByteBuf composite = alloc.compositeBuffer().addFlattenedComponents(true, contiguous);
try {
throwingCumulator.cumulate(alloc, composite, in);
fail("Cumulator didn't throw");
Expand Down Expand Up @@ -208,8 +208,8 @@ public void setUp() {
in = ByteBufUtil.writeAscii(alloc, inData);
tail = ByteBufUtil.writeAscii(alloc, tailData);
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
// Note that addComponent() will not add a new component when tail is not readable.
composite.addComponent(true, tail);
// Note that addFlattenedComponents() will not add a new component when tail is not readable.
composite.addFlattenedComponents(true, tail);
}

@After
Expand Down Expand Up @@ -345,7 +345,7 @@ public void mergeWithCompositeTail_tailExpandable_write() {
assertThat(in.readableBytes()).isAtMost(tail.writableBytes());

// All fits, so tail capacity must stay the same.
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);
assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity);
}

Expand All @@ -362,7 +362,7 @@ public void mergeWithCompositeTail_tailExpandable_fastWrite() {
alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE);

// Tail capacity is extended to its fast capacity.
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);
assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity);
}

Expand All @@ -372,7 +372,7 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() {
@SuppressWarnings("InlineMeInliner") // Requires Java 11
String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1);
int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length();
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);

// Make input larger than tailFastCapacity
in.writeCharSequence(inSuffixOverFastBytes, US_ASCII);
Expand All @@ -386,6 +386,9 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() {
}

private void assertTailExpanded(String expectedTailReadableData, int expectedNewTailCapacity) {
if (!GrpcHttp2ConnectionHandler.usingPre4_1_111_Netty()) {
return; // Netty 4.1.111 doesn't work with NettyAdaptiveCumulator
}
int originalNumComponents = composite.numComponents();

// Handle the case when reader index is beyond all readable bytes of the cumulation.
Expand Down Expand Up @@ -435,21 +438,21 @@ public void mergeWithCompositeTail_tailNotExpandable_maxCapacityReached() {
@SuppressWarnings("InlineMeInliner") // Requires Java 11
String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes());
tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII);
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);
assertTailReplaced();
}

@Test
public void mergeWithCompositeTail_tailNotExpandable_shared() {
tail.retain();
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);
assertTailReplaced();
tail.release();
}

@Test
public void mergeWithCompositeTail_tailNotExpandable_readOnly() {
composite.addComponent(true, tail.asReadOnly());
composite.addFlattenedComponents(true, tail.asReadOnly());
assertTailReplaced();
}

Expand Down Expand Up @@ -527,7 +530,8 @@ public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() {
CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
tail) {
@Override
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
ByteBuf buffer) {
throw expectedError;
}
};
Expand Down Expand Up @@ -560,7 +564,8 @@ public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() {
CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
tail.asReadOnly()) {
@Override
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
ByteBuf buffer) {
throw expectedError;
}
};
Expand Down Expand Up @@ -614,16 +619,20 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII));

// Start with a regular cumulation and add the buf as the only component.
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addComponent(true, buf);
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf);
// Read composite1 buf to the beginning of the numbers.
assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---");

// Wrap composite1 into another cumulation. This is similar to
// what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1.
CompositeByteBuf composite2 =
alloc.compositeBuffer(8).addComponent(true, composite1);
alloc.compositeBuffer(8).addFlattenedComponents(true, composite1);
assertThat(composite2.toString(US_ASCII)).isEqualTo("01234");

if (!GrpcHttp2ConnectionHandler.usingPre4_1_111_Netty()) {
return; // Netty 4.1.111 doesn't work with NettyAdaptiveCumulator
}

// The previous operation does not adjust the read indexes of the underlying buffers,
// only the internal Component offsets. When the cumulator attempts to append the input to
// the tail buffer, it extracts it from the cumulation, writes to it, and then adds it back.
Expand All @@ -637,27 +646,13 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2,
ByteBufUtil.writeAscii(alloc, "56789"));
assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789");
}

@Test
public void mergeWithNonCompositeTail() {
NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024);
ByteBufAllocator alloc = new PooledByteBufAllocator();
ByteBuf buf = alloc.buffer().writeBytes("tail".getBytes(US_ASCII));
ByteBuf in = alloc.buffer().writeBytes("-012345".getBytes(US_ASCII));

CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, buf);

CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite, in);

assertEquals("tail-012345", cumulation.toString(US_ASCII));
assertEquals(0, in.refCnt());
assertEquals(1, cumulation.numComponents());

buf.setByte(2, '*').setByte(7, '$');
assertEquals("ta*l-01$345", cumulation.toString(US_ASCII));

composite.release();
// Correctness check: we still have a single component, and this component is still the
// original underlying buffer.
assertThat(cumulation.numComponents()).isEqualTo(1);
// Replace '2' with '*', and '8' with '$'.
buf.setByte(5, '*').setByte(11, '$');
assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9");
}
}
}

0 comments on commit 97aa34f

Please sign in to comment.