diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java index c8b250c2143..13a01efec0a 100644 --- a/core/src/main/java/io/grpc/internal/MessageDeframer.java +++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java @@ -406,7 +406,8 @@ private void processBody() { // There is no reliable way to get the uncompressed size per message when it's compressed, // because the uncompressed bytes are provided through an InputStream whose total size is // unknown until all bytes are read, and we don't know when it happens. - statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, -1); + statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, + (compressedFlag || fullStreamDecompressor != null) ? -1 : inboundBodyWireSize); inboundBodyWireSize = 0; InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody(); nextFrame.touch(); diff --git a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java index 1ec1ccb2082..8f1b908e999 100644 --- a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java +++ b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java @@ -133,7 +133,7 @@ public void simplePayload() { assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), fakeClock, 2, 2); + checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 2, 2); } @Test @@ -148,7 +148,7 @@ public void smallCombinedPayloads() { verify(listener, atLeastOnce()).bytesRead(anyInt()); assertEquals(Bytes.asList(new byte[]{14, 15}), bytes(streams.get(1).next())); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1, 2, 2); + checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1, 2, 2); } @Test @@ -162,7 +162,7 @@ public void endOfStreamWithPayloadShouldNotifyEndOfStream() { verify(listener).deframerClosed(false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1); + checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1); } @Test @@ -177,7 +177,7 @@ public void endOfStreamShouldNotifyEndOfStream() { } verify(listener).deframerClosed(false); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), fakeClock); + checkStats(tracer, transportTracer.getStats(), fakeClock, false); } @Test @@ -189,7 +189,7 @@ public void endOfStreamWithPartialMessageShouldNotifyDeframerClosedWithPartialMe verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener).deframerClosed(true); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), fakeClock); + checkStats(tracer, transportTracer.getStats(), fakeClock, false); } @Test @@ -206,7 +206,7 @@ public void endOfStreamWithInvalidGzipBlockShouldNotifyDeframerClosedWithPartial deframer.closeWhenComplete(); verify(listener).deframerClosed(true); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), fakeClock); + checkStats(tracer, transportTracer.getStats(), fakeClock, false); } @Test @@ -228,10 +228,11 @@ public void payloadSplitBetweenBuffers() { tracer, transportTracer.getStats(), fakeClock, + true, 7 /* msg size */ + 2 /* second buffer adds two bytes of overhead in deflate block */, 7); } else { - checkStats(tracer, transportTracer.getStats(), fakeClock, 7, 7); + checkStats(tracer, transportTracer.getStats(), fakeClock, false, 7, 7); } } @@ -248,7 +249,7 @@ public void frameHeaderSplitBetweenBuffers() { assertEquals(Bytes.asList(new byte[]{3}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1); + checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1); } @Test @@ -259,7 +260,7 @@ public void emptyPayload() { assertEquals(Bytes.asList(), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), fakeClock, 0, 0); + checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 0, 0); } @Test @@ -273,9 +274,10 @@ public void largerFrameSize() { verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); if (useGzipInflatingBuffer) { - checkStats(tracer, transportTracer.getStats(), fakeClock, 8 /* compressed size */, 1000); + checkStats(tracer, transportTracer.getStats(), fakeClock,true, + 8 /* compressed size */, 1000); } else { - checkStats(tracer, transportTracer.getStats(), fakeClock, 1000, 1000); + checkStats(tracer, transportTracer.getStats(), fakeClock, false, 1000, 1000); } } @@ -292,7 +294,7 @@ public void endOfStreamCallbackShouldWaitForMessageDelivery() { verify(listener).deframerClosed(false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1); + checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1); } @Test @@ -308,6 +310,7 @@ public void compressed() { verify(listener).messagesAvailable(producer.capture()); assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); + checkStats(tracer, transportTracer.getStats(), fakeClock, true, 29, 1000); verifyNoMoreInteractions(listener); } @@ -502,7 +505,8 @@ public void sizeEnforcingInputStream_markReset() throws IOException { * @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...} */ private static void checkStats( - TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock, long... sizes) { + TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock, + boolean compressed, long... sizes) { assertEquals(0, sizes.length % 2); int count = sizes.length / 2; long expectedWireSize = 0; @@ -510,7 +514,8 @@ private static void checkStats( for (int i = 0; i < count; i++) { assertEquals("inboundMessage(" + i + ")", tracer.nextInboundEvent()); assertEquals( - String.format(Locale.US, "inboundMessageRead(%d, %d, -1)", i, sizes[i * 2]), + String.format(Locale.US, "inboundMessageRead(%d, %d, %d)", i, sizes[i * 2], + compressed ? -1 : sizes[i * 2 + 1]), tracer.nextInboundEvent()); expectedWireSize += sizes[i * 2]; expectedUncompressedSize += sizes[i * 2 + 1]; diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index 6f2d3268ae0..838ee0797a7 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -217,7 +217,6 @@ public void outboundMessageSent( @Override public void inboundMessageRead( int seqNo, long optionalWireSize, long optionalUncompressedSize) { - //TODO(yifeizhuang): needs support from message deframer. if (optionalWireSize != optionalUncompressedSize) { recordInboundCompressedMessage(span, seqNo, optionalWireSize); }