diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index e181395f95939..91e58228fd97a 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -200,4 +200,6 @@ request compression, you can set it on a per-remote cluster basis using the The compression settings do not configure compression for responses. {es} will compress a response if the inbound request was compressed--even when compression is not enabled. Similarly, {es} will not compress a response if the inbound -request was uncompressed--even when compression is enabled. +request was uncompressed--even when compression is enabled. The compression +scheme used to compress a response will be the same scheme the remote node used +to compress the request. diff --git a/server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java b/server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java index 04fe07d50880b..d4e64af6eb203 100644 --- a/server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java +++ b/server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java @@ -112,6 +112,11 @@ public ReleasableBytesReference pollDecompressedPage(boolean isEOS) { } } + @Override + public Compression.Scheme getScheme() { + return Compression.Scheme.DEFLATE; + } + @Override public void close() { inflater.end(); diff --git a/server/src/main/java/org/elasticsearch/transport/Header.java b/server/src/main/java/org/elasticsearch/transport/Header.java index aa7bcf198b039..0b455bc3370f8 100644 --- a/server/src/main/java/org/elasticsearch/transport/Header.java +++ b/server/src/main/java/org/elasticsearch/transport/Header.java @@ -32,6 +32,7 @@ public class Header { String actionName; Tuple, Map>> headers; Set features; + private Compression.Scheme compressionScheme = null; Header(int networkMessageSize, long requestId, byte status, Version version) { this.networkMessageSize = networkMessageSize; @@ -80,6 +81,10 @@ public String getActionName() { return actionName; } + public Compression.Scheme getCompressionScheme() { + return compressionScheme; + } + boolean needsToReadVariableHeader() { return headers == null; } @@ -112,6 +117,11 @@ void finishParsingHeader(StreamInput input) throws IOException { } } + void setCompressionScheme(Compression.Scheme compressionScheme) { + assert isCompressed(); + this.compressionScheme = compressionScheme; + } + @Override public String toString() { return "Header{" + networkMessageSize + "}{" + version + "}{" + requestId + "}{" + isRequest() + "}{" + isError() + "}{" diff --git a/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java b/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java index b7a1d35263116..2ff60c873a933 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundAggregator.java @@ -63,6 +63,13 @@ public void headerReceived(Header header) { } } + public void updateCompressionScheme(Compression.Scheme compressionScheme) { + ensureOpen(); + assert isAggregating(); + assert firstContent == null && contentAggregation == null; + currentHeader.setCompressionScheme(compressionScheme); + } + public void aggregate(ReleasableBytesReference content) { ensureOpen(); assert isAggregating(); @@ -112,6 +119,7 @@ public InboundMessage finishAggregation() throws IOException { success = true; return new InboundMessage(aggregated.getHeader(), aggregationException); } else { + assert uncompressedOrSchemeDefined(aggregated.getHeader()); success = true; return aggregated; } @@ -188,6 +196,10 @@ private void initializeRequestState() { } } + private static boolean uncompressedOrSchemeDefined(Header header) { + return header.isCompressed() == (header.getCompressionScheme() != null); + } + private void checkBreaker(final Header header, final int contentLength, final BreakerControl breakerControl) { if (header.isRequest() == false) { return; diff --git a/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java b/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java index 8136357151a7d..4693bdccda549 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java @@ -83,6 +83,7 @@ public int internalDecode(ReleasableBytesReference reference, Consumer f return 0; } else { this.decompressor = decompressor; + fragmentConsumer.accept(this.decompressor.getScheme()); } } int remainingToConsume = totalNetworkSize - bytesConsumed; diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index ac4eb44f415d7..86635eb5b0f37 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -160,7 +160,7 @@ private void handleRequest(TcpChannel channel, Head final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); assertRemoteVersion(stream, header.getVersion()); final TransportChannel transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version, - header.getFeatures(), header.isCompressed(), header.isHandshake(), message.takeBreakerReleaseControl()); + header.getFeatures(), header.getCompressionScheme(), header.isHandshake(), message.takeBreakerReleaseControl()); try { handshaker.handleHandshake(transportChannel, requestId, stream); } catch (Exception e) { @@ -175,7 +175,7 @@ private void handleRequest(TcpChannel channel, Head } } else { final TransportChannel transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version, - header.getFeatures(), header.isCompressed(), header.isHandshake(), message.takeBreakerReleaseControl()); + header.getFeatures(), header.getCompressionScheme(), header.isHandshake(), message.takeBreakerReleaseControl()); try { messageListener.onRequestReceived(requestId, action); if (message.isShortCircuit()) { diff --git a/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java b/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java index 8b063b3e53be9..b99e9cc788584 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundPipeline.java @@ -119,6 +119,9 @@ private void forwardFragments(TcpChannel channel, ArrayList fragments) t if (fragment instanceof Header) { assert aggregator.isAggregating() == false; aggregator.headerReceived((Header) fragment); + } else if (fragment instanceof Compression.Scheme) { + assert aggregator.isAggregating(); + aggregator.updateCompressionScheme((Compression.Scheme) fragment); } else if (fragment == InboundDecoder.PING) { assert aggregator.isAggregating() == false; messageHandler.accept(channel, PING_MESSAGE); diff --git a/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java b/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java index 9f545c07fd86e..8ce58618e5c05 100644 --- a/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java +++ b/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java @@ -156,6 +156,11 @@ public ReleasableBytesReference pollDecompressedPage(boolean isEOS) { } } + @Override + public Compression.Scheme getScheme() { + return Compression.Scheme.LZ4; + } + @Override public void close() { for (Recycler.V page : pages) { diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index b6a2f1e96fe5c..5d9b3085f5d6c 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -40,21 +40,19 @@ final class OutboundHandler { private final StatsTracker statsTracker; private final ThreadPool threadPool; private final BigArrays bigArrays; - private final Compression.Scheme configuredCompressionScheme; private volatile long slowLogThresholdMs = Long.MAX_VALUE; private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER; OutboundHandler(String nodeName, Version version, String[] features, StatsTracker statsTracker, ThreadPool threadPool, - BigArrays bigArrays, Compression.Scheme compressionScheme) { + BigArrays bigArrays) { this.nodeName = nodeName; this.version = version; this.features = features; this.statsTracker = statsTracker; this.threadPool = threadPool; this.bigArrays = bigArrays; - this.configuredCompressionScheme = compressionScheme; } void setSlowLogThreshold(TimeValue slowLogThreshold) { @@ -71,14 +69,8 @@ void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener li */ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action, final TransportRequest request, final TransportRequestOptions options, final Version channelVersion, - final boolean compressRequest, final boolean isHandshake) throws IOException, TransportException { + final Compression.Scheme compressionScheme, final boolean isHandshake) throws IOException, TransportException { Version version = Version.min(this.version, channelVersion); - final Compression.Scheme compressionScheme; - if (compressRequest) { - compressionScheme = configuredCompressionScheme; - } else { - compressionScheme = null; - } OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action, requestId, isHandshake, compressionScheme); @@ -103,15 +95,10 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long * @see #sendErrorResponse(Version, Set, TcpChannel, long, String, Exception) for sending error responses */ void sendResponse(final Version nodeVersion, final Set features, final TcpChannel channel, final long requestId, - final String action, final TransportResponse response, final boolean compressResponse, final boolean isHandshake) + final String action, final TransportResponse response, final Compression.Scheme compressionScheme, + final boolean isHandshake) throws IOException { Version version = Version.min(this.version, nodeVersion); - final Compression.Scheme compressionScheme; - if (compressResponse) { - compressionScheme = configuredCompressionScheme; - } else { - compressionScheme = null; - } OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), features, response, version, requestId, isHandshake, compressionScheme); ActionListener listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response)); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index d46cc9e6eacf9..f357544633b1d 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -135,7 +135,6 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P this.pageCacheRecycler = pageCacheRecycler; this.circuitBreakerService = circuitBreakerService; this.networkService = networkService; - Compression.Scheme compressionScheme = TransportSettings.TRANSPORT_COMPRESSION_SCHEME.get(settings); String nodeName = Node.NODE_NAME_SETTING.get(settings); final Settings defaultFeatures = TransportSettings.DEFAULT_FEATURES_SETTING.get(settings); String[] features; @@ -152,11 +151,11 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P } BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS); - this.outboundHandler = new OutboundHandler(nodeName, version, features, statsTracker, threadPool, bigArrays, compressionScheme); + this.outboundHandler = new OutboundHandler(nodeName, version, features, statsTracker, threadPool, bigArrays); this.handshaker = new TransportHandshaker(version, threadPool, (node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId, TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version), - TransportRequestOptions.EMPTY, v, false, true)); + TransportRequestOptions.EMPTY, v, null, true)); this.keepAlive = new TransportKeepAlive(threadPool, this.outboundHandler::sendBytes); this.inboundHandler = new InboundHandler(threadPool, outboundHandler, namedWriteableRegistry, handshaker, keepAlive, requestHandlers, responseHandlers); @@ -200,6 +199,7 @@ public final class NodeChannels extends CloseableConnection { private final DiscoveryNode node; private final Version version; private final Compression.Enabled compress; + private final Compression.Scheme compressionScheme; private final AtomicBoolean isClosing = new AtomicBoolean(false); NodeChannels(DiscoveryNode node, List channels, ConnectionProfile connectionProfile, Version handshakeVersion) { @@ -214,6 +214,7 @@ public final class NodeChannels extends CloseableConnection { } version = handshakeVersion; compress = connectionProfile.getCompressionEnabled(); + compressionScheme = connectionProfile.getCompressionScheme(); } @Override @@ -261,11 +262,12 @@ public void sendRequest(long requestId, String action, TransportRequest request, // We compress if total transport compression is enabled or if indexing_data transport compression // is enabled and the request is a RawIndexingDataTransportRequest which indicates it should be // compressed. - boolean shouldCompress = compress == Compression.Enabled.TRUE || + final boolean shouldCompress = compress == Compression.Enabled.TRUE || (compress == Compression.Enabled.INDEXING_DATA && request instanceof RawIndexingDataTransportRequest && ((RawIndexingDataTransportRequest) request).isRawIndexingData()); - outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), shouldCompress, false); + final Compression.Scheme schemeToUse = shouldCompress ? compressionScheme : null; + outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), schemeToUse, false); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java index fcbc3af0daf06..44fa549ac23fe 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java @@ -24,19 +24,19 @@ public final class TcpTransportChannel implements TransportChannel { private final long requestId; private final Version version; private final Set features; - private final boolean compressResponse; + private final Compression.Scheme compressionScheme; private final boolean isHandshake; private final Releasable breakerRelease; TcpTransportChannel(OutboundHandler outboundHandler, TcpChannel channel, String action, long requestId, Version version, - Set features, boolean compressResponse, boolean isHandshake, Releasable breakerRelease) { + Set features, Compression.Scheme compressionScheme, boolean isHandshake, Releasable breakerRelease) { this.version = version; this.features = features; this.channel = channel; this.outboundHandler = outboundHandler; this.action = action; this.requestId = requestId; - this.compressResponse = compressResponse; + this.compressionScheme = compressionScheme; this.isHandshake = isHandshake; this.breakerRelease = breakerRelease; } @@ -49,7 +49,7 @@ public String getProfileName() { @Override public void sendResponse(TransportResponse response) throws IOException { try { - outboundHandler.sendResponse(version, features, channel, requestId, action, response, compressResponse, isHandshake); + outboundHandler.sendResponse(version, features, channel, requestId, action, response, compressionScheme, isHandshake); } finally { release(false); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java b/server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java index 07e477be13037..cdc3509da6395 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java @@ -27,6 +27,8 @@ public interface TransportDecompressor extends Releasable { ReleasableBytesReference pollDecompressedPage(boolean isEOS); + Compression.Scheme getScheme(); + @Override void close(); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java index 4756ea771a3cd..92ad769ae1bb4 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java @@ -132,7 +132,11 @@ public void testDecodePreHeaderSizeVariableInt() throws IOException { final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed); final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2); int bytesConsumed2 = decoder.decode(releasable2, fragments::add); - assertEquals(2, fragments.size()); + if (compressionScheme == null) { + assertEquals(2, fragments.size()); + } else { + assertEquals(3, fragments.size()); + } assertEquals(InboundDecoder.END_CONTENT, fragments.get(fragments.size() - 1)); assertEquals(totalBytes.length() - bytesConsumed, bytesConsumed2); } @@ -195,7 +199,7 @@ public void testCompressedDecode() throws IOException { final BytesReference totalBytes = message.serialize(new BytesStreamOutput()); final BytesStreamOutput out = new BytesStreamOutput(); transportMessage.writeTo(out); - final BytesReference uncompressedBytes =out.bytes(); + final BytesReference uncompressedBytes = out.bytes(); int totalHeaderSize = TcpHeader.headerSize(Version.CURRENT) + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION); InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE); @@ -226,9 +230,11 @@ public void testCompressedDecode() throws IOException { int bytesConsumed2 = decoder.decode(releasable2, fragments::add); assertEquals(totalBytes.length() - totalHeaderSize, bytesConsumed2); - final Object content = fragments.get(0); - final Object endMarker = fragments.get(1); + final Object compressionScheme = fragments.get(0); + final Object content = fragments.get(1); + final Object endMarker = fragments.get(2); + assertEquals(scheme, compressionScheme); assertEquals(uncompressedBytes, content); // Ref count is not incremented since the bytes are immediately consumed on decompression assertEquals(1, releasable2.refCount()); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index 73906c245c4f1..ac43759eaabc0 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -66,7 +66,7 @@ public void setUp() throws Exception { TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {}); TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage); OutboundHandler outboundHandler = new OutboundHandler("node", version, new String[0], new StatsTracker(), threadPool, - BigArrays.NON_RECYCLING_INSTANCE, randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4)); + BigArrays.NON_RECYCLING_INSTANCE); requestHandlers = new Transport.RequestHandlers(); responseHandlers = new Transport.ResponseHandlers(); handler = new InboundHandler(threadPool, outboundHandler, namedWriteableRegistry, handshaker, keepAlive, requestHandlers, diff --git a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java index cf9a8b23ef1dd..ea70abbb62d40 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java @@ -55,15 +55,20 @@ public void testPipelineHandling() throws IOException { final Version version = header.getVersion(); final boolean isRequest = header.isRequest(); final long requestId = header.getRequestId(); - final boolean isCompressed = header.isCompressed(); + final Compression.Scheme compressionScheme = header.getCompressionScheme(); + if (header.isCompressed()) { + assertNotNull(compressionScheme); + } else { + assertNull(compressionScheme); + } if (m.isShortCircuit()) { - actualData = new MessageData(version, requestId, isRequest, isCompressed, header.getActionName(), null); + actualData = new MessageData(version, requestId, isRequest, compressionScheme, header.getActionName(), null); } else if (isRequest) { final TestRequest request = new TestRequest(m.openOrGetStreamInput()); - actualData = new MessageData(version, requestId, isRequest, isCompressed, header.getActionName(), request.value); + actualData = new MessageData(version, requestId, isRequest, compressionScheme, header.getActionName(), request.value); } else { final TestResponse response = new TestResponse(m.openOrGetStreamInput()); - actualData = new MessageData(version, requestId, isRequest, isCompressed, null, response.value); + actualData = new MessageData(version, requestId, isRequest, compressionScheme, null, response.value); } actual.add(new Tuple<>(actualData, m.getException())); } catch (IOException e) { @@ -96,14 +101,7 @@ public void testPipelineHandling() throws IOException { final Version version = randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()); final String value = randomRealisticUnicodeOfCodepointLength(randomIntBetween(200, 400)); final boolean isRequest = randomBoolean(); - - Compression.Scheme scheme; - if (randomBoolean()) { - scheme = null; - } else { - scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); - } - boolean isCompressed = isCompressed(version, scheme); + Compression.Scheme compressionScheme = getCompressionScheme(version); final long requestId = totalMessages++; final MessageData messageData; @@ -112,19 +110,19 @@ public void testPipelineHandling() throws IOException { OutboundMessage message; if (isRequest) { if (rarely()) { - messageData = new MessageData(version, requestId, true, isCompressed, breakThisAction, null); + messageData = new MessageData(version, requestId, true, compressionScheme, breakThisAction, null); message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(value), - version, breakThisAction, requestId, false, scheme); + version, breakThisAction, requestId, false, compressionScheme); expectedExceptionClass = new CircuitBreakingException("", CircuitBreaker.Durability.PERMANENT); } else { - messageData = new MessageData(version, requestId, true, isCompressed, actionName, value); + messageData = new MessageData(version, requestId, true, compressionScheme, actionName, value); message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(value), - version, actionName, requestId, false, scheme); + version, actionName, requestId, false, compressionScheme); } } else { - messageData = new MessageData(version, requestId, false, isCompressed, null, value); + messageData = new MessageData(version, requestId, false, compressionScheme, null, value); message = new OutboundMessage.Response(threadContext, Collections.emptySet(), new TestResponse(value), - version, requestId, false, scheme); + version, requestId, false, compressionScheme); } expected.add(new Tuple<>(messageData, expectedExceptionClass)); @@ -154,7 +152,7 @@ public void testPipelineHandling() throws IOException { final MessageData actualMessageData = actualTuple.v1(); assertEquals(expectedMessageData.requestId, actualMessageData.requestId); assertEquals(expectedMessageData.isRequest, actualMessageData.isRequest); - assertEquals(expectedMessageData.isCompressed, actualMessageData.isCompressed); + assertEquals(expectedMessageData.compressionScheme, actualMessageData.compressionScheme); assertEquals(expectedMessageData.actionName, actualMessageData.actionName); assertEquals(expectedMessageData.value, actualMessageData.value); if (expectedTuple.v2() != null) { @@ -173,11 +171,15 @@ public void testPipelineHandling() throws IOException { } } - private static boolean isCompressed(Version version, Compression.Scheme scheme) { - if (version.before(Compression.Scheme.LZ4_VERSION) && scheme == Compression.Scheme.LZ4) { - return false; + private static Compression.Scheme getCompressionScheme(Version version) { + if (randomBoolean()) { + return null; } else { - return scheme != null; + if (version.before(Compression.Scheme.LZ4_VERSION)) { + return Compression.Scheme.DEFLATE; + } else { + return randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); + } } } @@ -273,15 +275,16 @@ private static class MessageData { private final Version version; private final long requestId; private final boolean isRequest; - private final boolean isCompressed; + private final Compression.Scheme compressionScheme; private final String value; private final String actionName; - private MessageData(Version version, long requestId, boolean isRequest, boolean isCompressed, String actionName, String value) { + private MessageData(Version version, long requestId, boolean isRequest, Compression.Scheme compressionScheme, String actionName, + String value) { this.version = version; this.requestId = requestId; this.isRequest = isRequest; - this.isCompressed = isCompressed; + this.compressionScheme = compressionScheme; this.actionName = actionName; this.value = value; } @@ -294,7 +297,7 @@ public boolean equals(Object o) { MessageData that = (MessageData) o; return requestId == that.requestId && isRequest == that.isRequest && - isCompressed == that.isCompressed && + Objects.equals(compressionScheme, that.compressionScheme) && Objects.equals(version, that.version) && Objects.equals(value, that.value) && Objects.equals(actionName, that.actionName); @@ -302,7 +305,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(version, requestId, isRequest, isCompressed, value, actionName); + return Objects.hash(version, requestId, isRequest, compressionScheme, value, actionName); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java index 23b12a013a622..46ff946a99ffe 100644 --- a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java @@ -72,8 +72,7 @@ public void setUp() throws Exception { String[] features = {feature1, feature2}; StatsTracker statsTracker = new StatsTracker(); compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); - handler = new OutboundHandler("node", Version.CURRENT, features, statsTracker, threadPool, BigArrays.NON_RECYCLING_INSTANCE, - compressionScheme); + handler = new OutboundHandler("node", Version.CURRENT, features, statsTracker, threadPool, BigArrays.NON_RECYCLING_INSTANCE); final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime()); final InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE); @@ -147,7 +146,11 @@ public void onRequestSent(DiscoveryNode node, long requestId, String action, Tra requestRef.set(request); } }); - handler.sendRequest(node, channel, requestId, action, request, options, version, compress, isHandshake); + if (compress) { + handler.sendRequest(node, channel, requestId, action, request, options, version, compressionScheme, isHandshake); + } else { + handler.sendRequest(node, channel, requestId, action, request, options, version, null, isHandshake); + } BytesReference reference = channel.getMessageCaptor().get(); ActionListener sendListener = channel.getListenerCaptor().get(); @@ -210,7 +213,12 @@ public void onResponseSent(long requestId, String action, TransportResponse resp responseRef.set(response); } }); - handler.sendResponse(version, Collections.emptySet(), channel, requestId, action, response, compress, isHandshake); + + if (compress) { + handler.sendResponse(version, Collections.emptySet(), channel, requestId, action, response, compressionScheme, isHandshake); + } else { + handler.sendResponse(version, Collections.emptySet(), channel, requestId, action, response, null, isHandshake); + } BytesReference reference = channel.getMessageCaptor().get(); ActionListener sendListener = channel.getListenerCaptor().get(); diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 323bc78ca7959..58f7d5413f816 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -403,7 +403,7 @@ private void testExceptionHandling(boolean startTransport, Exception exception, TcpTransport.handleException(channel, exception, lifecycle, new OutboundHandler(randomAlphaOfLength(10), Version.CURRENT, new String[0], new StatsTracker(), testThreadPool, - BigArrays.NON_RECYCLING_INSTANCE, randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4))); + BigArrays.NON_RECYCLING_INSTANCE)); if (expectClosed) { assertTrue(listener.isDone()); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index b8951aa7c90ab..9d45c190a99da 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -674,10 +674,17 @@ public void testIndexingDataCompression() throws Exception { serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler); serviceC.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler); + final Compression.Scheme scheme; + if (serviceA.getLocalDiscoNode().getVersion().onOrAfter(Compression.Scheme.LZ4_VERSION) && + serviceC.getLocalDiscoNode().getVersion().onOrAfter(Compression.Scheme.LZ4_VERSION)) { + scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); + } else { + scheme = Compression.Scheme.DEFLATE; + } + Settings settingsWithCompress = Settings.builder() .put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.INDEXING_DATA) - .put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(), - randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4)) + .put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(), scheme) .build(); ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress); serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile); @@ -2418,7 +2425,7 @@ public void handleException(TransportException exp) { transportResponseHandler); receivedLatch.await(); assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here - TransportStats transportStats = serviceC.transport.getStats(); // request has ben send + TransportStats transportStats = serviceC.transport.getStats(); // request has been send assertEquals(1, transportStats.getRxCount()); assertEquals(2, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java b/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java index 73079c05e4cd8..6f913a3f242db 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java @@ -14,15 +14,12 @@ import java.util.Collections; -import static org.elasticsearch.test.ESTestCase.randomFrom; - public class TestTransportChannels { public static TcpTransportChannel newFakeTcpTransportChannel(String nodeName, TcpChannel channel, ThreadPool threadPool, String action, long requestId, Version version) { return new TcpTransportChannel( - new OutboundHandler(nodeName, version, new String[0], new StatsTracker(), threadPool, BigArrays.NON_RECYCLING_INSTANCE, - randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4)), - channel, action, requestId, version, Collections.emptySet(), false, false, () -> {}); + new OutboundHandler(nodeName, version, new String[0], new StatsTracker(), threadPool, BigArrays.NON_RECYCLING_INSTANCE), + channel, action, requestId, version, Collections.emptySet(), null, false, () -> {}); } }