Skip to content

Commit

Permalink
Only compress responses if request was compressed (#36867)
Browse files Browse the repository at this point in the history
This is a follow-up to some discussions around #36399. Currently we have
relatively confusing compression behavior where compression can be
configured for requests based on transport.compress or a specific
setting for a remote cluster. However, we can only compress responses
based on transport.compress as we do not know where a request is
coming from (currently).

This commit modifies the behavior to NEVER compress responses based on
settings. Instead, a response will only be compressed if the request was
compressed. This commit also updates the documentation to more clearly
described transport level compression.
  • Loading branch information
Tim-Brooks authored Dec 21, 2018
1 parent 3f5dd79 commit c8a8391
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 19 deletions.
28 changes: 28 additions & 0 deletions docs/reference/modules/transport.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,34 @@ and ensuring that the keepalive interval is shorter than any timeout that might
cause idle connections to be closed, or by setting `transport.ping_schedule` if
keepalives cannot be configured.

[float]
==== Transport Compression

[float]
===== Request Compresssion

By default, the `transport.compress` setting is `false` and network-level
request compression is disabled between nodes in the cluster. This default
normally makes sense for local cluster communication as compression has a
noticeable CPU cost and local clusters tend to be set up with fast network
connections between nodes.

The `transport.compress` setting always configures local cluster request
compression and is the fallback setting for remote cluster request compression.
If you want to configure remote request compression differently than local
request compression, you can set it on a per-remote cluster basis using the
<<remote-cluster-settings,`cluster.remote.${cluster_alias}.transport.compress` setting>>.


[float]
===== Response Compression

The compression settings do not configure compression for responses. {es} will
compress a response if the inbound request was compressed--even when compression
is not enabled. Similarly, {es} will not compress a response if the inbound
request was uncompressed--even when compression is enabled.


[float]
=== Transport Tracer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final boolean compressAllResponses;
private volatile BoundTransportAddress boundAddress;
private final String transportName;

Expand All @@ -166,7 +165,6 @@ public TcpTransport(String transportName, Settings settings, Version version, T
this.pageCacheRecycler = pageCacheRecycler;
this.circuitBreakerService = circuitBreakerService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.compressAllResponses = TransportSettings.TRANSPORT_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
this.transportLogger = new TransportLogger();
Expand Down Expand Up @@ -826,14 +824,13 @@ private void sendResponse(
final String action,
boolean compress,
byte status) throws IOException {
boolean compressMessage = compress || compressAllResponses;

status = TransportStatus.setResponse(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage);
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compress);
boolean addedReleaseListener = false;
try {
if (compressMessage) {
if (compress) {
status = TransportStatus.setCompress(status);
}
threadPool.getThreadContext().writeTo(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand All @@ -34,20 +35,22 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.StreamCorruptedException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Mockito.mock;

/** Unit tests for {@link TcpTransport} */
public class TcpTransportTests extends ESTestCase {
Expand Down Expand Up @@ -184,11 +187,12 @@ public void testEnsureVersionCompatibility() {
+ version.minimumCompatibilityVersion() + "]", ise.getMessage());
}

public void testCompressRequest() throws IOException {
@SuppressForbidden(reason = "Allow accessing localhost")
public void testCompressRequestAndResponse() throws IOException {
final boolean compressed = randomBoolean();
Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100));
ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName());
AtomicReference<BytesReference> messageCaptor = new AtomicReference<>();
AtomicReference<BytesReference> requestCaptor = new AtomicReference<>();
try {
TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool,
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) {
Expand All @@ -200,7 +204,7 @@ protected FakeServerChannel bind(String name, InetSocketAddress address) throws

@Override
protected FakeTcpChannel initiateChannel(DiscoveryNode node) throws IOException {
return new FakeTcpChannel(true, messageCaptor);
return new FakeTcpChannel(false, requestCaptor);
}

@Override
Expand All @@ -215,7 +219,7 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile,
int numConnections = profile.getNumConnections();
ArrayList<TcpChannel> fakeChannels = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; ++i) {
fakeChannels.add(new FakeTcpChannel(false, messageCaptor));
fakeChannels.add(new FakeTcpChannel(false, requestCaptor));
}
listener.onResponse(new NodeChannels(node, fakeChannels, profile, Version.CURRENT));
return () -> CloseableChannel.closeChannels(fakeChannels, false);
Expand All @@ -233,11 +237,20 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile,
transport.openConnection(node, profileBuilder.build(), future);
Transport.Connection connection = future.actionGet();
connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
transport.registerRequestHandler(new RequestHandlerRegistry<>("foobar", Req::new, mock(TaskManager.class),
(request1, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE), ThreadPool.Names.SAME,
true, true));

BytesReference reference = messageCaptor.get();
BytesReference reference = requestCaptor.get();
assertNotNull(reference);

StreamInput streamIn = reference.streamInput();
AtomicReference<BytesReference> responseCaptor = new AtomicReference<>();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 0);
FakeTcpChannel responseChannel = new FakeTcpChannel(true, address, address, responseCaptor);
transport.messageReceived(reference.slice(6, reference.length() - 6), responseChannel);


StreamInput streamIn = responseCaptor.get().streamInput();
streamIn.skip(TcpHeader.MARKER_BYTES_SIZE);
@SuppressWarnings("unused")
int len = streamIn.readInt();
Expand All @@ -247,17 +260,14 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile,
Version version = Version.fromId(streamIn.readInt());
assertEquals(Version.CURRENT, version);
assertEquals(compressed, TransportStatus.isCompress(status));
assertFalse(TransportStatus.isRequest(status));
if (compressed) {
final int bytesConsumed = TcpHeader.HEADER_SIZE;
streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed))
.streamInput(streamIn);
}
threadPool.getThreadContext().readHeaders(streamIn);
assertThat(streamIn.readStringArray(), equalTo(new String[0])); // features
assertEquals("foobar", streamIn.readString());
Req readReq = new Req("");
readReq.readFrom(streamIn);
assertEquals(request.value, readReq.value);
TransportResponse.Empty.INSTANCE.readFrom(streamIn);

} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
Expand Down Expand Up @@ -297,6 +307,10 @@ private Req(String value) {
this.value = value;
}

private Req(StreamInput in) throws IOException {
value = in.readString();
}

@Override
public void readFrom(StreamInput in) throws IOException {
value = in.readString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
public class FakeTcpChannel implements TcpChannel {

private final boolean isServer;
private final InetSocketAddress localAddress;
private final InetSocketAddress remoteAddress;
private final String profile;
private final AtomicReference<BytesReference> messageCaptor;
private final ChannelStats stats = new ChannelStats();
Expand All @@ -45,9 +47,21 @@ public FakeTcpChannel(boolean isServer, AtomicReference<BytesReference> messageC
this(isServer, "profile", messageCaptor);
}

public FakeTcpChannel(boolean isServer, InetSocketAddress localAddress, InetSocketAddress remoteAddress,
AtomicReference<BytesReference> messageCaptor) {
this(isServer, localAddress, remoteAddress,"profile", messageCaptor);
}


public FakeTcpChannel(boolean isServer, String profile, AtomicReference<BytesReference> messageCaptor) {
this(isServer, null, null, profile, messageCaptor);
}

public FakeTcpChannel(boolean isServer, InetSocketAddress localAddress, InetSocketAddress remoteAddress, String profile,
AtomicReference<BytesReference> messageCaptor) {
this.isServer = isServer;
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
this.profile = profile;
this.messageCaptor = messageCaptor;
}
Expand All @@ -64,12 +78,12 @@ public String getProfile() {

@Override
public InetSocketAddress getLocalAddress() {
return null;
return localAddress;
}

@Override
public InetSocketAddress getRemoteAddress() {
return null;
return remoteAddress;
}

@Override
Expand Down

0 comments on commit c8a8391

Please sign in to comment.