diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 923580133a76d..ef9aee6edfcd4 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -288,11 +288,12 @@ separately. `cluster.remote.<cluster_alias>.transport.compress`:: - Per cluster boolean setting that enables you to configure compression for - requests to a specific remote cluster. This setting impacts only requests + Per cluster setting that enables you to configure compression for requests + to a specific remote cluster. This setting impacts only requests sent to the remote cluster. If the inbound request is compressed, - Elasticsearch compresses the response. If unset, the global - `transport.compress` is used as the fallback setting. + Elasticsearch compresses the response. The setting options are `true`, + `indexing_data`, and `false`. The option `indexing_data` is experimental. + If unset, the global `transport.compress` is used as the fallback setting. [discrete] [[remote-cluster-sniff-settings]] diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index 58727ca9f2a32..a86febd2b905e 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -49,8 +49,19 @@ time setting format). Defaults to `30s`. `transport.compress`:: (<<static-cluster-setting,Static>>) -Set to `true` to enable compression (`DEFLATE`) between -all nodes. Defaults to `false`. +Set to `true`, `indexing_data`, or `false` to configure transport compression +between nodes. The option `true` will compress all data. The option +`indexing_data` will compress only the raw index data sent between nodes during +ingest, CCR following (excluding bootstrap), and operations-based shard recovery +(excluding the transfer of Lucene files). The `indexing_data` option is experimental. +Defaults to `false`. + +`transport.compression_scheme`:: +(<<static-cluster-setting,Static>>) +Configures the compression scheme for `transport.compress`. The options are +`deflate` or `lz4`. The option `lz4` is experimental. If `lz4` is configured and +the remote node has not been upgraded to a version supporting `lz4`, the traffic +will be sent uncompressed. Defaults to `deflate`. `transport.ping_schedule`:: (<<static-cluster-setting,Static>>) @@ -172,6 +183,11 @@ normally makes sense for local cluster communication as compression has a noticeable CPU cost and local clusters tend to be set up with fast network connections between nodes. +The `transport.compress` configuration option `indexing_data` will only +compress requests that relate to the transport of raw indexing source data +between nodes. This option primarily compresses data sent during ingest, +CCR, and shard recovery. This option is experimental. + The `transport.compress` setting always configures local cluster request compression and is the fallback setting for remote cluster request compression.
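For illustration, the combination documented above can also be expressed programmatically with the standard `Settings` builder (a sketch with assumed example values; `indexing_data` and `lz4` are the experimental options described in this section):

import org.elasticsearch.common.settings.Settings;

public class CompressionSettingsExample {
    public static void main(String[] args) {
        // Experimental combination: compress only raw indexing data, using LZ4.
        Settings transportSettings = Settings.builder()
            .put("transport.compress", "indexing_data")
            .put("transport.compression_scheme", "lz4")
            .build();
        System.out.println(transportSettings);
    }
}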
If you want to configure remote request compression differently than local diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index ad2af61424a5a..ebe942996734d 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -15,6 +15,7 @@ import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -30,6 +31,7 @@ import java.util.ArrayList; import java.util.Base64; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -41,6 +43,7 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; @@ -48,6 +51,7 @@ import org.elasticsearch.client.WarningFailureException; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MetadataIndexStateService; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.common.Strings; @@ -66,6 +70,7 @@ import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.yaml.ObjectPath; +import org.elasticsearch.transport.Compression; import org.junit.Before; /** @@ -1664,6 +1669,41 @@ public void testForbidDisableSoftDeletesOnRestore() throws Exception { } } + /** + * In 7.14 the cluster.remote.*.transport.compress setting was changed from a boolean to an enum setting + * with true/false as options. This test ensures that the old boolean setting in cluster state is + * translated properly. This test can be removed in 9.0.
+ */ + public void testTransportCompressionSetting() throws IOException { + if (isRunningAgainstOldCluster()) { + final Request putSettingsRequest = new Request("PUT", "/_cluster/settings"); + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("persistent"); + { + builder.field("cluster.remote.foo.seeds", Collections.singletonList("localhost:9200")); + builder.field("cluster.remote.foo.transport.compress", "true"); + } + builder.endObject(); + } + builder.endObject(); + putSettingsRequest.setJsonEntity(Strings.toString(builder)); + } + client().performRequest(putSettingsRequest); + } else { + final Request getSettingsRequest = new Request("GET", "/_cluster/settings"); + final Response getSettingsResponse = client().performRequest(getSettingsRequest); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, getSettingsResponse.getEntity().getContent())) { + final ClusterGetSettingsResponse clusterGetSettingsResponse = ClusterGetSettingsResponse.fromXContent(parser); + final Settings settings = clusterGetSettingsResponse.getPersistentSettings(); + assertThat( + REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("foo").get(settings), + equalTo(Compression.Enabled.TRUE)); + } + } + } + public static void assertNumHits(String index, int numHits, int totalShards) throws IOException { Map resp = entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))); assertNoFailures(resp); diff --git a/server/build.gradle b/server/build.gradle index e3c8e40662de5..4e909cbecefd2 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -92,6 +92,9 @@ dependencies { api project(":libs:elasticsearch-cli") api 'com.carrotsearch:hppc:0.8.1' + // LZ4 + api 'org.lz4:lz4-java:1.8.0' + // time handling, remove with java 8 time api "joda-time:joda-time:${versions.joda}" @@ -306,6 +309,11 @@ tasks.named("thirdPartyAudit").configure { if (BuildParams.runtimeJavaVersion > JavaVersion.VERSION_1_8) { ignoreMissingClasses 'javax.xml.bind.DatatypeConverter' } + + ignoreViolations( + // from java-lz4 + 'net.jpountz.util.UnsafeUtils' + ) } tasks.named("dependencyLicenses").configure { diff --git a/server/licenses/lz4-java-1.8.0.jar.sha1 b/server/licenses/lz4-java-1.8.0.jar.sha1 new file mode 100644 index 0000000000000..5e3536d1b7d29 --- /dev/null +++ b/server/licenses/lz4-java-1.8.0.jar.sha1 @@ -0,0 +1 @@ +4b986a99445e49ea5fbf5d149c4b63f6ed6c6780 \ No newline at end of file diff --git a/server/licenses/lz4-java-LICENSE.txt b/server/licenses/lz4-java-LICENSE.txt new file mode 100644 index 0000000000000..d645695673349 --- /dev/null +++ b/server/licenses/lz4-java-LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
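The server-side changes that follow mark `BulkShardRequest` and `RecoveryTranslogOperationsRequest` with the new `RawIndexingDataTransportRequest` marker interface. A minimal sketch of how a sender could consult that marker under the `indexing_data` mode (assumed decision logic for illustration only; the PR's actual wiring in the transport layer is not shown in this diff):

import org.elasticsearch.transport.Compression;
import org.elasticsearch.transport.RawIndexingDataTransportRequest;
import org.elasticsearch.transport.TransportRequest;

final class CompressionDecision {
    static boolean shouldCompress(TransportRequest request, Compression.Enabled enabled) {
        switch (enabled) {
            case TRUE:
                return true; // compress every request
            case INDEXING_DATA:
                // compress only requests that carry raw indexing source data
                return request instanceof RawIndexingDataTransportRequest;
            case FALSE:
            default:
                return false;
        }
    }
}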
diff --git a/server/licenses/lz4-java-NOTICE.txt b/server/licenses/lz4-java-NOTICE.txt new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index c3a0856235a22..2f08c886aec68 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -16,13 +16,14 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.transport.RawIndexingDataTransportRequest; import java.io.IOException; import java.util.HashSet; import java.util.Set; import java.util.stream.Stream; -public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> implements Accountable { +public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> implements Accountable, RawIndexingDataTransportRequest { public static final Version COMPACT_SHARD_ID_VERSION = Version.V_7_9_0; private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class); diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index b9e9aa68fc831..4589ab0e32258 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -332,6 +332,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS, RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, + RemoteClusterService.REMOTE_CLUSTER_COMPRESSION_SCHEME, RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, ProxyConnectionStrategy.PROXY_ADDRESS, ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, @@ -362,6 +363,7 @@ public void apply(Settings value, Settings current, Settings previous) { TransportSettings.PUBLISH_PORT_PROFILE, TransportSettings.OLD_TRANSPORT_COMPRESS, TransportSettings.TRANSPORT_COMPRESS, + TransportSettings.TRANSPORT_COMPRESSION_SCHEME, TransportSettings.PING_SCHEDULE, TransportSettings.TCP_CONNECT_TIMEOUT, TransportSettings.CONNECT_TIMEOUT, diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 1cdb36a9021e0..6ee582687e203 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -1420,6 +1420,23 @@ public static <T extends Enum<T>> Setting<T> enumSetting(Class<T> clazz, String return new Setting<>(key, defaultValue.toString(), e -> Enum.valueOf(clazz, e.toUpperCase(Locale.ROOT)), properties); } + /** + * Creates a setting where the allowed values are defined as enum constants. All enum constants must be uppercase. + * + * @param clazz the enum class + * @param key the key for the setting + * @param fallbackSetting the fallback setting for this setting + * @param validator validator for this setting + * @param properties properties for this setting like scope, filtering...
+ * @param <T> the generics type parameter reflecting the actual type of the enum + * @return the setting object + */ + public static <T extends Enum<T>> Setting<T> enumSetting(Class<T> clazz, String key, Setting<T> fallbackSetting, + Validator<T> validator, Property... properties) { + return new Setting<>(new SimpleKey(key), fallbackSetting, fallbackSetting::getRaw, + e -> Enum.valueOf(clazz, e.toUpperCase(Locale.ROOT)), validator, properties); + } + /** * Creates a setting which specifies a memory size. This can either be * specified as an absolute bytes value or as a percentage of the heap diff --git a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java index 4fad08201d914..ce559f3afaafc 100644 --- a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java +++ b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java @@ -73,7 +73,7 @@ protected void doRun() { logger.trace("[{}] opening probe connection", thisConnectionAttempt); transportService.openConnection(targetNode, ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout, - TimeValue.MINUS_ONE, null), listener.delegateFailure((l, connection) -> { + TimeValue.MINUS_ONE, null, null), listener.delegateFailure((l, connection) -> { logger.trace("[{}] opened probe connection", thisConnectionAttempt); // use NotifyOnceListener to make sure the following line does not result in onFailure being called when diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 6863b7ab8091e..7e06e382bf755 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -198,7 +198,7 @@ protected void ping(final Consumer<PingCollection> resultsConsumer, final ConnectionProfile connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration, - TimeValue.MINUS_ONE, null); + TimeValue.MINUS_ONE, null, null); final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer, nodes.getLocalNode(), connectionProfile); activePingingRounds.put(pingingRound.id(), pingingRound); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index 6675b7b90d072..35c40a31e0643 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -16,11 +16,12 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.transport.RawIndexingDataTransportRequest; import java.io.IOException; import java.util.List; -public class RecoveryTranslogOperationsRequest extends RecoveryTransportRequest { +public class RecoveryTranslogOperationsRequest extends RecoveryTransportRequest implements RawIndexingDataTransportRequest { private final long recoveryId; private final ShardId shardId; diff --git
a/server/src/main/java/org/elasticsearch/transport/Compression.java b/server/src/main/java/org/elasticsearch/transport/Compression.java new file mode 100644 index 0000000000000..cb8fc010de366 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/Compression.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.transport; + +import net.jpountz.lz4.LZ4Factory; + +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesReference; + +import java.io.IOException; +import java.io.OutputStream; + +public class Compression { + + public enum Scheme { + LZ4, + DEFLATE; + + static final Version LZ4_VERSION = Version.V_7_14_0; + static final int HEADER_LENGTH = 4; + private static final byte[] DEFLATE_HEADER = new byte[]{'D', 'F', 'L', '\0'}; + private static final byte[] LZ4_HEADER = new byte[]{'L', 'Z', '4', '\0'}; + private static final int LZ4_BLOCK_SIZE; + + static { + String blockSizeString = System.getProperty("es.transport.compression.lz4_block_size"); + if (blockSizeString != null) { + int lz4BlockSize = Integer.parseInt(blockSizeString); + if (lz4BlockSize < 1024 || lz4BlockSize > (512 * 1024)) { + throw new IllegalArgumentException("lz4_block_size must be >= 1KB and <= 512KB"); + } + LZ4_BLOCK_SIZE = lz4BlockSize; + } else { + LZ4_BLOCK_SIZE = 64 * 1024; + } + } + + public static boolean isDeflate(BytesReference bytes) { + byte firstByte = bytes.get(0); + if (firstByte != Compression.Scheme.DEFLATE_HEADER[0]) { + return false; + } else { + return validateHeader(bytes, DEFLATE_HEADER); + } + } + + public static boolean isLZ4(BytesReference bytes) { + byte firstByte = bytes.get(0); + if (firstByte != Scheme.LZ4_HEADER[0]) { + return false; + } else { + return validateHeader(bytes, LZ4_HEADER); + } + } + + private static boolean validateHeader(BytesReference bytes, byte[] header) { + for (int i = 1; i < Compression.Scheme.HEADER_LENGTH; ++i) { + if (bytes.get(i) != header[i]) { + return false; + } + } + return true; + } + + public static OutputStream lz4OutputStream(OutputStream outputStream) throws IOException { + outputStream.write(LZ4_HEADER); + return new ReuseBuffersLZ4BlockOutputStream(outputStream, LZ4_BLOCK_SIZE, LZ4Factory.safeInstance().fastCompressor()); + } + } + + public enum Enabled { + TRUE, + INDEXING_DATA, + FALSE + } + +} diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index d6b5868fa5c03..56dc0bdbfbc3c 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -35,7 +35,8 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro if (profile == null) { return fallbackProfile; } else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null - && profile.getPingInterval() != null && profile.getCompressionEnabled() != null) { + && profile.getPingInterval() != null && profile.getCompressionEnabled() != null + && profile.getCompressionScheme() != null) { return profile; } else { ConnectionProfile.Builder builder = new 
ConnectionProfile.Builder(profile); @@ -51,6 +52,9 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro if (profile.getCompressionEnabled() == null) { builder.setCompressionEnabled(fallbackProfile.getCompressionEnabled()); } + if (profile.getCompressionScheme() == null) { + builder.setCompressionScheme(fallbackProfile.getCompressionScheme()); + } return builder.build(); } } @@ -72,6 +76,7 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) builder.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)); builder.setPingInterval(TransportSettings.PING_SCHEDULE.get(settings)); builder.setCompressionEnabled(TransportSettings.TRANSPORT_COMPRESS.get(settings)); + builder.setCompressionScheme(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.get(settings)); builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); // if we are not master eligible we don't need a dedicated channel to publish the state @@ -88,7 +93,7 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) * when opening single use connections */ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType) { - return buildSingleChannelProfile(channelType, null, null, null, null); + return buildSingleChannelProfile(channelType, null, null, null, null, null); } /** @@ -97,7 +102,8 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption */ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout, @Nullable TimeValue handshakeTimeout, @Nullable TimeValue pingInterval, - @Nullable Boolean compressionEnabled) { + @Nullable Compression.Enabled compressionEnabled, + @Nullable Compression.Scheme compressionScheme) { Builder builder = new Builder(); builder.addConnections(1, channelType); final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class); @@ -115,6 +121,9 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption if (compressionEnabled != null) { builder.setCompressionEnabled(compressionEnabled); } + if (compressionScheme != null) { + builder.setCompressionScheme(compressionScheme); + } return builder.build(); } @@ -123,16 +132,19 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption private final TimeValue connectTimeout; private final TimeValue handshakeTimeout; private final TimeValue pingInterval; - private final Boolean compressionEnabled; + private final Compression.Enabled compressionEnabled; + private final Compression.Scheme compressionScheme; private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout, - TimeValue handshakeTimeout, TimeValue pingInterval, Boolean compressionEnabled) { + TimeValue handshakeTimeout, TimeValue pingInterval, Compression.Enabled compressionEnabled, + Compression.Scheme compressionScheme) { this.handles = handles; this.numConnections = numConnections; this.connectTimeout = connectTimeout; this.handshakeTimeout = handshakeTimeout; this.pingInterval = pingInterval; this.compressionEnabled = compressionEnabled; + this.compressionScheme = compressionScheme; } /** @@ -144,7 +156,8 @@ public static class Builder { private int numConnections = 0; private TimeValue connectTimeout; private TimeValue handshakeTimeout; - private Boolean compressionEnabled; +
private Compression.Enabled compressionEnabled; + private Compression.Scheme compressionScheme; private TimeValue pingInterval; /** create an empty builder */ @@ -159,6 +172,7 @@ public Builder(ConnectionProfile source) { connectTimeout = source.getConnectTimeout(); handshakeTimeout = source.getHandshakeTimeout(); compressionEnabled = source.getCompressionEnabled(); + compressionScheme = source.getCompressionScheme(); pingInterval = source.getPingInterval(); } /** @@ -192,13 +206,21 @@ public Builder setPingInterval(TimeValue pingInterval) { } /** - * Sets compression enabled for this connection profile + * Sets compression enabled configuration for this connection profile */ - public Builder setCompressionEnabled(boolean compressionEnabled) { + public Builder setCompressionEnabled(Compression.Enabled compressionEnabled) { this.compressionEnabled = compressionEnabled; return this; } + /** + * Sets compression scheme for this connection profile + */ + public Builder setCompressionScheme(Compression.Scheme compressionScheme) { + this.compressionScheme = compressionScheme; + return this; + } + /** * Adds a number of connections for one or more types. Each type can only be added once. * @param numConnections the number of connections to use in the pool for the given connection types @@ -230,7 +252,7 @@ public ConnectionProfile build() { throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types); } return new ConnectionProfile(Collections.unmodifiableList(handles), numConnections, connectTimeout, handshakeTimeout, - pingInterval, compressionEnabled); + pingInterval, compressionEnabled, compressionScheme); } } @@ -257,13 +279,21 @@ public TimeValue getPingInterval() { } /** - * Returns boolean indicating if compression is enabled or null if no explicit compression + * Returns the compression enabled configuration or null if no explicit compression configuration * is set on this profile. */ - public Boolean getCompressionEnabled() { + public Compression.Enabled getCompressionEnabled() { return compressionEnabled; } + /** + * Returns the configured compression scheme or null if no explicit + * compression scheme is set on this profile. + */ + public Compression.Scheme getCompressionScheme() { + return compressionScheme; + } + /** * Returns the total number of connections for this profile */ diff --git a/server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java b/server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java new file mode 100644 index 0000000000000..04fe07d50880b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.transport; + +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.util.PageCacheRecycler; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +public class DeflateTransportDecompressor implements TransportDecompressor { + + private final Inflater inflater; + private final PageCacheRecycler recycler; + private final ArrayDeque<Recycler.V<byte[]>> pages; + private int pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE; + private boolean hasSkippedHeader = false; + + public DeflateTransportDecompressor(PageCacheRecycler recycler) { + this.recycler = recycler; + inflater = new Inflater(true); + pages = new ArrayDeque<>(4); + } + + @Override + public int decompress(BytesReference bytesReference) throws IOException { + int bytesConsumed = 0; + if (hasSkippedHeader == false) { + hasSkippedHeader = true; + int headerLength = Compression.Scheme.HEADER_LENGTH; + bytesReference = bytesReference.slice(headerLength, bytesReference.length() - headerLength); + bytesConsumed += headerLength; + } + + BytesRefIterator refIterator = bytesReference.iterator(); + BytesRef ref; + while ((ref = refIterator.next()) != null) { + inflater.setInput(ref.bytes, ref.offset, ref.length); + bytesConsumed += ref.length; + boolean continueInflating = true; + while (continueInflating) { + final boolean isNewPage = pageOffset == PageCacheRecycler.BYTE_PAGE_SIZE; + if (isNewPage) { + pageOffset = 0; + pages.add(recycler.bytePage(false)); + } + final Recycler.V<byte[]> page = pages.getLast(); + + byte[] output = page.v(); + try { + int bytesInflated = inflater.inflate(output, pageOffset, PageCacheRecycler.BYTE_PAGE_SIZE - pageOffset); + pageOffset += bytesInflated; + if (isNewPage) { + if (bytesInflated == 0) { + Recycler.V<byte[]> removed = pages.pollLast(); + assert removed == page; + removed.close(); + pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE; + } + } + } catch (DataFormatException e) { + throw new IOException("Exception while inflating bytes", e); + } + if (inflater.needsInput()) { + continueInflating = false; + } + if (inflater.finished()) { + bytesConsumed -= inflater.getRemaining(); + continueInflating = false; + } + assert inflater.needsDictionary() == false; + } + } + + return bytesConsumed; + } + + public boolean isEOS() { + return inflater.finished(); + } + + @Override + public ReleasableBytesReference pollDecompressedPage(boolean isEOS) { + if (pages.isEmpty()) { + return null; + } else if (pages.size() == 1) { + if (isEOS) { + assert isEOS(); + Recycler.V<byte[]> page = pages.pollFirst(); + ReleasableBytesReference reference = new ReleasableBytesReference(new BytesArray(page.v(), 0, pageOffset), page); + pageOffset = 0; + return reference; + } else { + return null; + } + } else { + Recycler.V<byte[]> page = pages.pollFirst(); + return new ReleasableBytesReference(new BytesArray(page.v()), page); + } + } + + @Override + public void close() { + inflater.end(); + for (Recycler.V<byte[]> page : pages) { + page.close(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java b/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java index cb49c957717ff..8136357151a7d 100644 ---
a/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundDecoder.java @@ -29,6 +29,7 @@ public class InboundDecoder implements Releasable { private TransportDecompressor decompressor; private int totalNetworkSize = -1; private int bytesConsumed = 0; + private boolean isCompressed = false; private boolean isClosed = false; public InboundDecoder(Version version, PageCacheRecycler recycler) { @@ -64,7 +65,7 @@ public int internalDecode(ReleasableBytesReference reference, Consumer f Header header = readHeader(version, messageLength, reference); bytesConsumed += headerBytesToRead; if (header.isCompressed()) { - decompressor = new TransportDecompressor(recycler); + isCompressed = true; } fragmentConsumer.accept(header); @@ -75,32 +76,42 @@ public int internalDecode(ReleasableBytesReference reference, Consumer f } } } else { - // There are a minimum number of bytes required to start decompression - if (decompressor != null && decompressor.canDecompress(reference.length()) == false) { - return 0; + if (isCompressed && decompressor == null) { + // Attempt to initialize decompressor + TransportDecompressor decompressor = TransportDecompressor.getDecompressor(recycler, reference); + if (decompressor == null) { + return 0; + } else { + this.decompressor = decompressor; + } } - int bytesToConsume = Math.min(reference.length(), totalNetworkSize - bytesConsumed); - bytesConsumed += bytesToConsume; + int remainingToConsume = totalNetworkSize - bytesConsumed; + int maxBytesToConsume = Math.min(reference.length(), remainingToConsume); ReleasableBytesReference retainedContent; - if (isDone()) { - retainedContent = reference.retainedSlice(0, bytesToConsume); + if (maxBytesToConsume == remainingToConsume) { + retainedContent = reference.retainedSlice(0, maxBytesToConsume); } else { retainedContent = reference.retain(); } + + int bytesConsumedThisDecode = 0; if (decompressor != null) { - decompress(retainedContent); + bytesConsumedThisDecode += decompress(retainedContent); + bytesConsumed += bytesConsumedThisDecode; ReleasableBytesReference decompressed; - while ((decompressed = decompressor.pollDecompressedPage()) != null) { + while ((decompressed = decompressor.pollDecompressedPage(isDone())) != null) { fragmentConsumer.accept(decompressed); } } else { + bytesConsumedThisDecode += maxBytesToConsume; + bytesConsumed += maxBytesToConsume; fragmentConsumer.accept(retainedContent); } if (isDone()) { finishMessage(fragmentConsumer); } - return bytesToConsume; + return bytesConsumedThisDecode; } } @@ -119,16 +130,16 @@ private void cleanDecodeState() { try { Releasables.closeExpectNoException(decompressor); } finally { + isCompressed = false; decompressor = null; totalNetworkSize = -1; bytesConsumed = 0; } } - private void decompress(ReleasableBytesReference content) throws IOException { + private int decompress(ReleasableBytesReference content) throws IOException { try (ReleasableBytesReference toRelease = content) { - int consumed = decompressor.decompress(content); - assert consumed == content.length(); + return decompressor.decompress(content); } } diff --git a/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java b/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java new file mode 100644 index 0000000000000..9f545c07fd86e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java @@ -0,0 +1,352 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.transport; + +import net.jpountz.lz4.LZ4Exception; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.util.PageCacheRecycler; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Locale; +import java.util.zip.Checksum; + +/** + * This file is forked from the https://netty.io project. In particular it forks the following file + * io.netty.handler.codec.compression.Lz4FrameDecoder. + * + * It modifies the original netty code to operate on byte arrays as opposed to ByteBufs. + * Additionally, it integrates the decompression code to work in the Elasticsearch transport + * pipeline. Finally, it replaces the custom Netty decoder exceptions. + * + * This class is necessary as Netty is not a dependency in the Elasticsearch server module. + */ +public class Lz4TransportDecompressor implements TransportDecompressor { + + private static final ThreadLocal<byte[]> DECOMPRESSED = ThreadLocal.withInitial(() -> BytesRef.EMPTY_BYTES); + private static final ThreadLocal<byte[]> COMPRESSED = ThreadLocal.withInitial(() -> BytesRef.EMPTY_BYTES); + + /** + * Magic number of LZ4 block. + */ + static final long MAGIC_NUMBER = (long) 'L' << 56 | + (long) 'Z' << 48 | + (long) '4' << 40 | + (long) 'B' << 32 | + 'l' << 24 | + 'o' << 16 | + 'c' << 8 | + 'k'; + + static final int HEADER_LENGTH = 8 + // magic number + 1 + // token + 4 + // compressed length + 4 + // decompressed length + 4; // checksum + + + /** + * Base value for compression level. + */ + static final int COMPRESSION_LEVEL_BASE = 10; + + static final int MIN_BLOCK_SIZE = 64; + static final int MAX_BLOCK_SIZE = 1 << COMPRESSION_LEVEL_BASE + 0x0F; // 32 M + static final int DEFAULT_BLOCK_SIZE = 1 << 16; // 64 KB + + static final int BLOCK_TYPE_NON_COMPRESSED = 0x10; + static final int BLOCK_TYPE_COMPRESSED = 0x20; + + private enum State { + INIT_BLOCK, + DECOMPRESS_DATA, + FINISHED, + CORRUPTED + } + + private State currentState = State.INIT_BLOCK; + + /** + * Underlying decompressor in use. + */ + private LZ4FastDecompressor decompressor; + + /** + * Underlying checksum calculator in use. + */ + private Checksum checksum; + + /** + * Type of current block.
+ */ + private int blockType; + + /** + * Compressed length of current incoming block. + */ + private int compressedLength; + + /** + * Decompressed length of current incoming block. + */ + private int decompressedLength; + + /** + * Checksum value of current incoming block. + */ + private int currentChecksum; + + private final PageCacheRecycler recycler; + private final ArrayDeque<Recycler.V<byte[]>> pages; + private int pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE; + private boolean hasSkippedESHeader = false; + + public Lz4TransportDecompressor(PageCacheRecycler recycler) { + this.decompressor = LZ4Factory.safeInstance().fastDecompressor(); + this.recycler = recycler; + this.pages = new ArrayDeque<>(4); + this.checksum = null; + } + + @Override + public ReleasableBytesReference pollDecompressedPage(boolean isEOS) { + if (pages.isEmpty()) { + return null; + } else if (pages.size() == 1) { + if (isEOS) { + Recycler.V<byte[]> page = pages.pollFirst(); + ReleasableBytesReference reference = new ReleasableBytesReference(new BytesArray(page.v(), 0, pageOffset), page); + pageOffset = 0; + return reference; + } else { + return null; + } + } else { + Recycler.V<byte[]> page = pages.pollFirst(); + return new ReleasableBytesReference(new BytesArray(page.v()), page); + } + } + + @Override + public void close() { + for (Recycler.V<byte[]> page : pages) { + page.close(); + } + } + + @Override + public int decompress(BytesReference bytesReference) throws IOException { + int bytesConsumed = 0; + if (hasSkippedESHeader == false) { + hasSkippedESHeader = true; + int esHeaderLength = Compression.Scheme.HEADER_LENGTH; + bytesReference = bytesReference.slice(esHeaderLength, bytesReference.length() - esHeaderLength); + bytesConsumed += esHeaderLength; + } + + while (true) { + int consumed = decodeBlock(bytesReference); + bytesConsumed += consumed; + int newLength = bytesReference.length() - consumed; + if (consumed > 0 && newLength > 0) { + bytesReference = bytesReference.slice(consumed, newLength); + } else { + break; + } + } + + return bytesConsumed; + } + + private int decodeBlock(BytesReference reference) throws IOException { + int bytesConsumed = 0; + try { + switch (currentState) { + case INIT_BLOCK: + if (reference.length() < HEADER_LENGTH) { + return bytesConsumed; + } + try (StreamInput in = reference.streamInput()) { + final long magic = in.readLong(); + if (magic != MAGIC_NUMBER) { + throw new IllegalStateException("unexpected block identifier"); + } + + final int token = in.readByte(); + final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE; + int blockType = token & 0xF0; + + int compressedLength = Integer.reverseBytes(in.readInt()); + if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) { + throw new IllegalStateException(String.format(Locale.ROOT, + "invalid compressedLength: %d (expected: 0-%d)", + compressedLength, MAX_BLOCK_SIZE)); + } + + int decompressedLength = Integer.reverseBytes(in.readInt()); + final int maxDecompressedLength = 1 << compressionLevel; + if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) { + throw new IllegalStateException(String.format(Locale.ROOT, + "invalid decompressedLength: %d (expected: 0-%d)", + decompressedLength, maxDecompressedLength)); + } + if (decompressedLength == 0 && compressedLength != 0 + || decompressedLength != 0 && compressedLength == 0 + || blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) { + throw new IllegalStateException(String.format(Locale.ROOT, + "stream corrupted: compressedLength(%d) and
decompressedLength(%d) mismatch", + compressedLength, decompressedLength)); + } + + int currentChecksum = Integer.reverseBytes(in.readInt()); + bytesConsumed += HEADER_LENGTH; + + if (decompressedLength == 0) { + if (currentChecksum != 0) { + throw new IllegalStateException("stream corrupted: checksum error"); + } + currentState = State.FINISHED; + decompressor = null; + checksum = null; + break; + } + + this.blockType = blockType; + this.compressedLength = compressedLength; + this.decompressedLength = decompressedLength; + this.currentChecksum = currentChecksum; + } + + currentState = State.DECOMPRESS_DATA; + break; + case DECOMPRESS_DATA: + if (reference.length() < compressedLength) { + break; + } + + final Checksum checksum = this.checksum; + byte[] decompressed = getThreadLocalBuffer(DECOMPRESSED, decompressedLength); + + try { + switch (blockType) { + case BLOCK_TYPE_NON_COMPRESSED: + try (StreamInput streamInput = reference.streamInput()) { + streamInput.readBytes(decompressed, 0, decompressedLength); + } + break; + case BLOCK_TYPE_COMPRESSED: + BytesRef ref = reference.iterator().next(); + final byte[] compressed; + final int compressedOffset; + if (ref.length >= compressedLength) { + compressed = ref.bytes; + compressedOffset = ref.offset; + } else { + compressed = getThreadLocalBuffer(COMPRESSED, compressedLength); + compressedOffset = 0; + try (StreamInput streamInput = reference.streamInput()) { + streamInput.readBytes(compressed, 0, compressedLength); + } + } + decompressor.decompress(compressed, compressedOffset, decompressed, 0, decompressedLength); + break; + default: + throw new IllegalStateException(String.format(Locale.ROOT, + "unexpected blockType: %d (expected: %d or %d)", + blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED)); + } + // Skip inbound bytes after we processed them. + bytesConsumed += compressedLength; + + if (checksum != null) { + checksum.reset(); + checksum.update(decompressed, 0, decompressedLength); + final int checksumResult = (int) checksum.getValue(); + if (checksumResult != currentChecksum) { + throw new IllegalStateException(String.format(Locale.ROOT, + "stream corrupted: mismatching checksum: %d (expected: %d)", + checksumResult, currentChecksum)); + } + } + + int bytesToCopy = decompressedLength; + int uncompressedOffset = 0; + while (bytesToCopy > 0) { + final boolean isNewPage = pageOffset == PageCacheRecycler.BYTE_PAGE_SIZE; + if (isNewPage) { + pageOffset = 0; + pages.add(recycler.bytePage(false)); + } + final Recycler.V<byte[]> page = pages.getLast(); + + int toCopy = Math.min(bytesToCopy, PageCacheRecycler.BYTE_PAGE_SIZE - pageOffset); + System.arraycopy(decompressed, uncompressedOffset, page.v(), pageOffset, toCopy); + pageOffset += toCopy; + bytesToCopy -= toCopy; + uncompressedOffset += toCopy; + } + currentState = State.INIT_BLOCK; + } catch (LZ4Exception e) { + throw new IllegalStateException(e); + } + break; + case FINISHED: + break; + case CORRUPTED: + throw new IllegalStateException("LZ4 stream corrupted."); + default: + throw new IllegalStateException(); + } + } catch (IOException e) { + currentState = State.CORRUPTED; + throw e; + } + return bytesConsumed; + } + + private byte[] getThreadLocalBuffer(ThreadLocal<byte[]> threadLocal, int requiredSize) { + byte[] buffer = threadLocal.get(); + if (requiredSize > buffer.length) { + buffer = new byte[requiredSize]; + threadLocal.set(buffer); + } + return buffer; + } + + /** + * Returns {@code true} if and only if the end of the compressed stream + * has been reached.
+ */ + public boolean isClosed() { + return currentState == State.FINISHED; + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/NetworkMessage.java b/server/src/main/java/org/elasticsearch/transport/NetworkMessage.java index fa4320aa42b9a..976da4268c3de 100644 --- a/server/src/main/java/org/elasticsearch/transport/NetworkMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/NetworkMessage.java @@ -21,12 +21,18 @@ public abstract class NetworkMessage { protected final Writeable threadContext; protected final long requestId; protected final byte status; + protected final Compression.Scheme compressionScheme; - NetworkMessage(ThreadContext threadContext, Version version, byte status, long requestId) { + NetworkMessage(ThreadContext threadContext, Version version, byte status, long requestId, Compression.Scheme compressionScheme) { this.threadContext = threadContext.captureAsWriteable(); this.version = version; this.requestId = requestId; - this.status = status; + this.compressionScheme = adjustedScheme(version, compressionScheme); + if (this.compressionScheme != null) { + this.status = TransportStatus.setCompress(status); + } else { + this.status = status; + } } public Version getVersion() { @@ -56,4 +62,8 @@ boolean isHandshake() { boolean isError() { return TransportStatus.isError(status); } + + private static Compression.Scheme adjustedScheme(Version version, Compression.Scheme compressionScheme) { + return compressionScheme == Compression.Scheme.LZ4 && version.before(Compression.Scheme.LZ4_VERSION) ? null : compressionScheme; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index d5d16bf5f5020..b6a2f1e96fe5c 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -40,19 +40,21 @@ final class OutboundHandler { private final StatsTracker statsTracker; private final ThreadPool threadPool; private final BigArrays bigArrays; + private final Compression.Scheme configuredCompressionScheme; private volatile long slowLogThresholdMs = Long.MAX_VALUE; private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER; OutboundHandler(String nodeName, Version version, String[] features, StatsTracker statsTracker, ThreadPool threadPool, - BigArrays bigArrays) { + BigArrays bigArrays, Compression.Scheme compressionScheme) { this.nodeName = nodeName; this.version = version; this.features = features; this.statsTracker = statsTracker; this.threadPool = threadPool; this.bigArrays = bigArrays; + this.configuredCompressionScheme = compressionScheme; } void setSlowLogThreshold(TimeValue slowLogThreshold) { @@ -71,8 +73,15 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long final TransportRequest request, final TransportRequestOptions options, final Version channelVersion, final boolean compressRequest, final boolean isHandshake) throws IOException, TransportException { Version version = Version.min(this.version, channelVersion); - OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action, - requestId, isHandshake, compressRequest); + final Compression.Scheme compressionScheme; + if (compressRequest) { + compressionScheme = configuredCompressionScheme; + } else { + compressionScheme = null; + } + OutboundMessage.Request message = + new 
OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action, requestId, isHandshake, + compressionScheme); if (request.tryIncRef() == false) { assert false : "request [" + request + "] has been released already"; throw new AlreadyClosedException("request [" + request + "] has been released already"); @@ -93,12 +102,18 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long * * @see #sendErrorResponse(Version, Set, TcpChannel, long, String, Exception) for sending error responses */ - void sendResponse(final Version nodeVersion, final Set features, final TcpChannel channel, - final long requestId, final String action, final TransportResponse response, - final boolean compress, final boolean isHandshake) throws IOException { + void sendResponse(final Version nodeVersion, final Set features, final TcpChannel channel, final long requestId, + final String action, final TransportResponse response, final boolean compressResponse, final boolean isHandshake) + throws IOException { Version version = Version.min(this.version, nodeVersion); + final Compression.Scheme compressionScheme; + if (compressResponse) { + compressionScheme = configuredCompressionScheme; + } else { + compressionScheme = null; + } OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), features, response, version, - requestId, isHandshake, compress); + requestId, isHandshake, compressionScheme); ActionListener listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response)); sendMessage(channel, message, listener); } @@ -112,7 +127,7 @@ void sendErrorResponse(final Version nodeVersion, final Set features, fi TransportAddress address = new TransportAddress(channel.getLocalAddress()); RemoteTransportException tx = new RemoteTransportException(nodeName, address, action, error); OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), features, tx, version, requestId, - false, false); + false, null); ActionListener listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, error)); sendMessage(channel, message, listener); } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java index 7c7af3e5b796e..49a5746bbbdd0 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java @@ -26,8 +26,9 @@ abstract class OutboundMessage extends NetworkMessage { protected final Writeable message; - OutboundMessage(ThreadContext threadContext, Version version, byte status, long requestId, Writeable message) { - super(threadContext, version, status, requestId); + OutboundMessage(ThreadContext threadContext, Version version, byte status, long requestId, Compression.Scheme compressionScheme, + Writeable message) { + super(threadContext, version, status, requestId, compressionScheme); this.message = message; } @@ -90,8 +91,14 @@ BytesReference serialize(BytesStreamOutput bytesStream) throws IOException { // compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release // resources and write EOS marker bytes but must not yet release the bytes themselves - private OutputStreamStreamOutput wrapCompressed(BytesStreamOutput bytesStream) throws IOException { - return new 
OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(bytesStream))); + private StreamOutput wrapCompressed(BytesStreamOutput bytesStream) throws IOException { + if (compressionScheme == Compression.Scheme.DEFLATE) { + return new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(bytesStream))); + } else if (compressionScheme == Compression.Scheme.LZ4) { + return new OutputStreamStreamOutput(Compression.Scheme.lz4OutputStream(Streams.noCloseStream(bytesStream))); + } else { + throw new IllegalArgumentException("Invalid compression scheme: " + compressionScheme); + } } protected void writeVariableHeader(StreamOutput stream) throws IOException { @@ -104,10 +111,10 @@ static class Request extends OutboundMessage { private final String action; Request(ThreadContext threadContext, String[] features, Writeable message, Version version, String action, long requestId, - boolean isHandshake, boolean compress) { - super(threadContext, version, setStatus(compress, isHandshake, message), requestId, message); - this.features = features; + boolean isHandshake, Compression.Scheme compressionScheme) { + super(threadContext, version, setStatus(isHandshake), requestId, adjustCompressionScheme(compressionScheme, message), message); this.action = action; + this.features = features; } @Override @@ -119,12 +126,18 @@ protected void writeVariableHeader(StreamOutput stream) throws IOException { stream.writeString(action); } - private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) { + // Do not compress instances of BytesTransportRequest + private static Compression.Scheme adjustCompressionScheme(Compression.Scheme compressionScheme, Writeable message) { + if (message instanceof BytesTransportRequest) { + return null; + } else { + return compressionScheme; + } + } + + private static byte setStatus(boolean isHandshake) { byte status = 0; status = TransportStatus.setRequest(status); - if (compress && OutboundMessage.canCompress(message)) { - status = TransportStatus.setCompress(status); - } if (isHandshake) { status = TransportStatus.setHandshake(status); } @@ -144,8 +157,8 @@ static class Response extends OutboundMessage { private final Set features; Response(ThreadContext threadContext, Set features, Writeable message, Version version, long requestId, - boolean isHandshake, boolean compress) { - super(threadContext, version, setStatus(compress, isHandshake, message), requestId, message); + boolean isHandshake, Compression.Scheme compressionScheme) { + super(threadContext, version, setStatus(isHandshake, message), requestId, compressionScheme, message); this.features = features; } @@ -155,15 +168,12 @@ protected void writeVariableHeader(StreamOutput stream) throws IOException { stream.setFeatures(features); } - private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) { + private static byte setStatus(boolean isHandshake, Writeable message) { byte status = 0; status = TransportStatus.setResponse(status); if (message instanceof RemoteTransportException) { status = TransportStatus.setError(status); } - if (compress) { - status = TransportStatus.setCompress(status); - } if (isHandshake) { status = TransportStatus.setHandshake(status); } @@ -177,8 +187,4 @@ public String toString() { + message.getClass() + "}"; } } - - private static boolean canCompress(Writeable message) { - return message instanceof BytesTransportRequest == false; - } } diff --git 
a/server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java b/server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java
new file mode 100644
index 0000000000000..8969bd6a43fad
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.transport;
+
+/**
+ * Requests that implement this interface will be compressed when {@link TransportSettings#TRANSPORT_COMPRESS}
+ * is configured to {@link Compression.Enabled#INDEXING_DATA}. This is primarily intended for
+ * requests/responses that consist largely of raw source data.
+ */
+public interface RawIndexingDataTransportRequest {
+}
diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
index 019dc2edd58c3..b87003887c82c 100644
--- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
+++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
@@ -48,6 +48,7 @@ import java.util.stream.Stream;
 import static org.elasticsearch.common.settings.Setting.boolSetting;
+import static org.elasticsearch.common.settings.Setting.enumSetting;
 import static org.elasticsearch.common.settings.Setting.timeSetting;
 /**
@@ -149,10 +150,16 @@ public String getKey(final String key) {
         (ns, key) -> timeSetting(key, TransportSettings.PING_SCHEDULE, new RemoteConnectionEnabled<>(ns, key),
             Setting.Property.Dynamic, Setting.Property.NodeScope));
-    public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
+    public static final Setting.AffixSetting<Compression.Enabled> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
         "cluster.remote.",
         "transport.compress",
-        (ns, key) -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS,
+        (ns, key) -> enumSetting(Compression.Enabled.class, key, TransportSettings.TRANSPORT_COMPRESS,
+            new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope));
+
+    public static final Setting.AffixSetting<Compression.Scheme> REMOTE_CLUSTER_COMPRESSION_SCHEME = Setting.affixKeySetting(
+        "cluster.remote.",
+        "transport.compression_scheme",
+        (ns, key) -> enumSetting(Compression.Scheme.class, key, TransportSettings.TRANSPORT_COMPRESSION_SCHEME,
+            new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope));
     private final boolean enabled;
diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java
index 528c8b3b08e03..96c3321588a52 100644
--- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java
+++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java
@@ -125,6 +125,8 @@ static ConnectionProfile buildConnectionProfile(String clusterAlias, Settings se
         .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
         .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
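Before the remaining ConnectionProfile wiring below, it is worth making the semantics of the new marker interface concrete: under the indexing_data mode the transport compresses a request only if it implements RawIndexingDataTransportRequest, while true compresses everything. The following is a minimal sketch of that decision, mirroring the check TcpTransport#sendRequest performs later in this diff; ShardIngestRequest and CompressionDecision are hypothetical names introduced only for illustration, and the Elasticsearch server classes are assumed to be on the classpath.

import org.elasticsearch.transport.Compression;
import org.elasticsearch.transport.RawIndexingDataTransportRequest;
import org.elasticsearch.transport.TransportRequest;

// Hypothetical request consisting mostly of raw source bytes; implementing the
// marker interface opts it into compression under the indexing_data mode.
class ShardIngestRequest extends TransportRequest implements RawIndexingDataTransportRequest {
}

class CompressionDecision {
    // TRUE compresses every request; INDEXING_DATA compresses only marked requests;
    // FALSE (or an unmarked request under INDEXING_DATA) sends plain bytes.
    static boolean shouldCompress(Compression.Enabled mode, TransportRequest request) {
        return mode == Compression.Enabled.TRUE
            || (mode == Compression.Enabled.INDEXING_DATA && request instanceof RawIndexingDataTransportRequest);
    }
}

Note that OutboundMessage.Request additionally declines to compress BytesTransportRequest instances (the adjustCompressionScheme hook shown earlier in this diff), since those payloads are typically already compressed.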
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings)) + .setCompressionScheme(RemoteClusterService.REMOTE_CLUSTER_COMPRESSION_SCHEME + .getConcreteSettingForNamespace(clusterAlias).get(settings)) .setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings)) .addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY, TransportRequestOptions.Type.PING) @@ -276,7 +278,10 @@ boolean shouldRebuildConnection(Settings newSettings) { if (newMode.equals(strategyType()) == false) { return true; } else { - Boolean compressionEnabled = RemoteClusterService.REMOTE_CLUSTER_COMPRESS + Compression.Enabled compressionEnabled = RemoteClusterService.REMOTE_CLUSTER_COMPRESS + .getConcreteSettingForNamespace(clusterAlias) + .get(newSettings); + Compression.Scheme compressionScheme = RemoteClusterService.REMOTE_CLUSTER_COMPRESSION_SCHEME .getConcreteSettingForNamespace(clusterAlias) .get(newSettings); TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE @@ -286,6 +291,7 @@ boolean shouldRebuildConnection(Settings newSettings) { ConnectionProfile oldProfile = connectionManager.getConnectionProfile(); ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); builder.setCompressionEnabled(compressionEnabled); + builder.setCompressionScheme(compressionScheme); builder.setPingInterval(pingSchedule); ConnectionProfile newProfile = builder.build(); return connectionProfileChanged(oldProfile, newProfile) || strategyMustBeRebuilt(newSettings); @@ -354,7 +360,8 @@ private List> getAndClearListeners() { private boolean connectionProfileChanged(ConnectionProfile oldProfile, ConnectionProfile newProfile) { return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false - || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; + || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false + || Objects.equals(oldProfile.getCompressionScheme(), newProfile.getCompressionScheme()) == false; } static class StrategyValidator implements Setting.Validator { diff --git a/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java b/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java new file mode 100644 index 0000000000000..ed0c02f4ed035 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java @@ -0,0 +1,334 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +/* + * Copyright 2020 Adrien Grand and the lz4-java contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.elasticsearch.transport;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Checksum;
+
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.util.SafeUtils;
+import net.jpountz.xxhash.StreamingXXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.apache.lucene.util.BytesRef;
+
+/**
+ * This file is forked from https://github.com/lz4/lz4-java. In particular, it forks the file
+ * net.jpountz.lz4.LZ4BlockOutputStream.
+ *
+ * It modifies the original lz4-java code to allow the reuse of thread-local byte arrays. This prevents
+ * the need to allocate two new byte arrays every time a new stream is created. For the Elasticsearch use case,
+ * a single thread should fully compress the stream in one go to avoid memory corruption.
+ *
+ * Streaming LZ4 (not compatible with the LZ4 Frame format).
+ * This class compresses data into fixed-size blocks of compressed data.
+ * This class uses its own format and is not compatible with the LZ4 Frame format.
+ * For interoperability with other LZ4 tools, use {@link LZ4FrameOutputStream},
+ * which is compatible with the LZ4 Frame format. This class remains for backward compatibility.
+ * @see LZ4BlockInputStream
+ * @see LZ4FrameOutputStream
+ */
+public class ReuseBuffersLZ4BlockOutputStream extends FilterOutputStream {
+
+    private static class ArrayBox {
+        private byte[] uncompressed = BytesRef.EMPTY_BYTES;
+        private byte[] compressed = BytesRef.EMPTY_BYTES;
+        private boolean owned = false;
+
+        private void markOwnership(int uncompressedBlockSize, int compressedMaxSize) {
+            assert owned == false;
+            owned = true;
+            if (uncompressedBlockSize > uncompressed.length) {
+                uncompressed = new byte[uncompressedBlockSize];
+            }
+            if (compressedMaxSize > compressed.length) {
+                compressed = new byte[compressedMaxSize];
+            }
+        }
+
+        private void release() {
+            owned = false;
+        }
+    }
+
+    private static final ThreadLocal<ArrayBox> ARRAY_BOX = ThreadLocal.withInitial(ArrayBox::new);
+
+    static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' };
+    static final int MAGIC_LENGTH = MAGIC.length;
+
+    static final int HEADER_LENGTH =
+        MAGIC_LENGTH // magic bytes
+        + 1          // token
+        + 4          // compressed length
+        + 4          // decompressed length
+        + 4;         // checksum
+
+    static final int COMPRESSION_LEVEL_BASE = 10;
+    static final int MIN_BLOCK_SIZE = 64;
+    static final int MAX_BLOCK_SIZE = 1 << (COMPRESSION_LEVEL_BASE + 0x0F);
+
+    static final int COMPRESSION_METHOD_RAW = 0x10;
+    static final int COMPRESSION_METHOD_LZ4 = 0x20;
+
+    static final int DEFAULT_SEED = 0x9747b28c;
+
+    private static int compressionLevel(int blockSize) {
+        if (blockSize < MIN_BLOCK_SIZE) {
+            throw new IllegalArgumentException("blockSize must be >= " + MIN_BLOCK_SIZE + ", got " + blockSize);
+        } else if (blockSize > MAX_BLOCK_SIZE) {
+            throw new IllegalArgumentException("blockSize must be <= " + MAX_BLOCK_SIZE + ", got " + blockSize);
+        }
+        int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
+        assert (1 << compressionLevel) >= blockSize;
+        assert blockSize * 2 > (1 << compressionLevel);
+        compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
+        assert compressionLevel >= 0 && compressionLevel <=
0x0F; + return compressionLevel; + } + + private final int blockSize; + private final int compressionLevel; + private final LZ4Compressor compressor; + private final Checksum checksum; + private final ArrayBox arrayBox; + private final byte[] buffer; + private final byte[] compressedBuffer; + private final boolean syncFlush; + private boolean finished; + private int o; + + /** + * Creates a new {@link OutputStream} with configurable block size. Large + * blocks require more memory at compression and decompression time but + * should improve the compression ratio. + * + * @param out the {@link OutputStream} to feed + * @param blockSize the maximum number of bytes to try to compress at once, + * must be >= 64 and <= 32 M + * @param compressor the {@link LZ4Compressor} instance to use to compress + * data + * @param checksum the {@link Checksum} instance to use to check data for + * integrity. + * @param syncFlush true if pending data should also be flushed on {@link #flush()} + */ + public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor, Checksum checksum, + boolean syncFlush) { + super(out); + this.blockSize = blockSize; + this.compressor = compressor; + this.checksum = checksum; + this.compressionLevel = compressionLevel(blockSize); + final int compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize); + this.arrayBox = ARRAY_BOX.get(); + arrayBox.markOwnership(blockSize, compressedBlockSize); + this.buffer = arrayBox.uncompressed; + this.compressedBuffer = arrayBox.compressed; + this.syncFlush = syncFlush; + o = 0; + finished = false; + System.arraycopy(MAGIC, 0, compressedBuffer, 0, MAGIC_LENGTH); + } + + /** + * Creates a new instance which checks stream integrity using + * {@link StreamingXXHash32} and doesn't sync flush. + * + * @param out the {@link OutputStream} to feed + * @param blockSize the maximum number of bytes to try to compress at once, + * must be >= 64 and <= 32 M + * @param compressor the {@link LZ4Compressor} instance to use to compress + * data + * + * @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int, LZ4Compressor, Checksum, boolean) + * @see StreamingXXHash32#asChecksum() + */ + public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor) { + this(out, blockSize, compressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), false); + } + + /** + * Creates a new instance which compresses with the standard LZ4 compression + * algorithm. + * + * @param out the {@link OutputStream} to feed + * @param blockSize the maximum number of bytes to try to compress at once, + * must be >= 64 and <= 32 M + * + * @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int, LZ4Compressor) + * @see LZ4Factory#fastCompressor() + */ + public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize) { + this(out, blockSize, LZ4Factory.fastestInstance().fastCompressor()); + } + + /** + * Creates a new instance which compresses into blocks of 64 KB. 
+ * + * @param out the {@link OutputStream} to feed + * + * @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int) + */ + public ReuseBuffersLZ4BlockOutputStream(OutputStream out) { + this(out, 1 << 16); + } + + private void ensureNotFinished() { + if (finished) { + throw new IllegalStateException("This stream is already closed"); + } + } + + @Override + public void write(int b) throws IOException { + ensureNotFinished(); + if (o == blockSize) { + flushBufferedData(); + } + buffer[o++] = (byte) b; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + SafeUtils.checkRange(b, off, len); + ensureNotFinished(); + + while (o + len > blockSize) { + final int l = blockSize - o; + System.arraycopy(b, off, buffer, o, blockSize - o); + o = blockSize; + flushBufferedData(); + off += l; + len -= l; + } + System.arraycopy(b, off, buffer, o, len); + o += len; + } + + @Override + public void write(byte[] b) throws IOException { + ensureNotFinished(); + write(b, 0, b.length); + } + + @Override + public void close() throws IOException { + try { + if (finished == false) { + finish(); + } + if (out != null) { + out.close(); + out = null; + } + } finally { + arrayBox.release(); + } + } + + private void flushBufferedData() throws IOException { + if (o == 0) { + return; + } + checksum.reset(); + checksum.update(buffer, 0, o); + final int check = (int) checksum.getValue(); + int compressedLength = compressor.compress(buffer, 0, o, compressedBuffer, HEADER_LENGTH); + final int compressMethod; + if (compressedLength >= o) { + compressMethod = COMPRESSION_METHOD_RAW; + compressedLength = o; + System.arraycopy(buffer, 0, compressedBuffer, HEADER_LENGTH, o); + } else { + compressMethod = COMPRESSION_METHOD_LZ4; + } + + compressedBuffer[MAGIC_LENGTH] = (byte) (compressMethod | compressionLevel); + writeIntLE(compressedLength, compressedBuffer, MAGIC_LENGTH + 1); + writeIntLE(o, compressedBuffer, MAGIC_LENGTH + 5); + writeIntLE(check, compressedBuffer, MAGIC_LENGTH + 9); + assert MAGIC_LENGTH + 13 == HEADER_LENGTH; + out.write(compressedBuffer, 0, HEADER_LENGTH + compressedLength); + o = 0; + } + + /** + * Flushes this compressed {@link OutputStream}. + * + * If the stream has been created with syncFlush=true, pending + * data will be compressed and appended to the underlying {@link OutputStream} + * before calling {@link OutputStream#flush()} on the underlying stream. + * Otherwise, this method just flushes the underlying stream, so pending + * data might not be available for reading until {@link #finish()} or + * {@link #close()} is called. + */ + @Override + public void flush() throws IOException { + if (out != null) { + if (syncFlush) { + flushBufferedData(); + } + out.flush(); + } + } + + /** + * Same as {@link #close()} except that it doesn't close the underlying stream. + * This can be useful if you want to keep on using the underlying stream. + * + * @throws IOException if an I/O error occurs. 
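A usage note before the stream's final methods: because the scratch arrays come from a shared per-thread ArrayBox, a stream must be fully written and closed by one thread before that thread opens another block stream. The following is a minimal sketch under that assumption; the sink and payload are arbitrary, and only the public constructors shown above are exercised.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.elasticsearch.transport.ReuseBuffersLZ4BlockOutputStream;

class Lz4BlockExample {
    public static void main(String[] args) throws IOException {
        byte[] payload = "raw indexing data raw indexing data".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Defaults: 64 KB blocks, fastest LZ4 compressor, xxhash32 checksum.
        try (ReuseBuffersLZ4BlockOutputStream lz4 = new ReuseBuffersLZ4BlockOutputStream(sink)) {
            lz4.write(payload); // buffered into the reused per-thread block
        } // close() runs finish(), which writes the empty terminator block, then releases the ArrayBox
        // Each block on the wire is: MAGIC, token, compressed length, decompressed length, checksum, data.
        System.out.println(sink.size() + " framed bytes");
    }
}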
+ */ + public void finish() throws IOException { + ensureNotFinished(); + flushBufferedData(); + compressedBuffer[MAGIC_LENGTH] = (byte) (COMPRESSION_METHOD_RAW | compressionLevel); + writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 1); + writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 5); + writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 9); + assert MAGIC_LENGTH + 13 == HEADER_LENGTH; + out.write(compressedBuffer, 0, HEADER_LENGTH); + finished = true; + out.flush(); + } + + private static void writeIntLE(int i, byte[] buf, int off) { + buf[off++] = (byte) i; + buf[off++] = (byte) (i >>> 8); + buf[off++] = (byte) (i >>> 16); + buf[off++] = (byte) (i >>> 24); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(out=" + out + ", blockSize=" + blockSize + + ", compressor=" + compressor + ", checksum=" + checksum + ")"; + } + +} + diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 73e94175e804c..139d804bf9bd9 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -135,6 +135,7 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P this.pageCacheRecycler = pageCacheRecycler; this.circuitBreakerService = circuitBreakerService; this.networkService = networkService; + Compression.Scheme compressionScheme = TransportSettings.TRANSPORT_COMPRESSION_SCHEME.get(settings); String nodeName = Node.NODE_NAME_SETTING.get(settings); final Settings defaultFeatures = TransportSettings.DEFAULT_FEATURES_SETTING.get(settings); String[] features; @@ -151,7 +152,7 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P } BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS); - this.outboundHandler = new OutboundHandler(nodeName, version, features, statsTracker, threadPool, bigArrays); + this.outboundHandler = new OutboundHandler(nodeName, version, features, statsTracker, threadPool, bigArrays, compressionScheme); this.handshaker = new TransportHandshaker(version, threadPool, (node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId, TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version), @@ -198,7 +199,7 @@ public final class NodeChannels extends CloseableConnection { private final List channels; private final DiscoveryNode node; private final Version version; - private final boolean compress; + private final Compression.Enabled compress; private final AtomicBoolean isClosing = new AtomicBoolean(false); NodeChannels(DiscoveryNode node, List channels, ConnectionProfile connectionProfile, Version handshakeVersion) { @@ -257,7 +258,9 @@ public void sendRequest(long requestId, String action, TransportRequest request, throw new NodeNotConnectedException(node, "connection already closed"); } TcpChannel channel = channel(options.type()); - outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), compress, false); + boolean shouldCompress = compress == Compression.Enabled.TRUE || + (compress == Compression.Enabled.INDEXING_DATA && request instanceof RawIndexingDataTransportRequest); + outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), shouldCompress, false); } @Override diff --git 
a/server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java b/server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java index 58df5c632731d..07e477be13037 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java @@ -8,130 +8,51 @@ package org.elasticsearch.transport; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.common.compress.CompressorFactory; -import org.elasticsearch.core.Releasable; -import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.core.Releasable; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; -public class TransportDecompressor implements Releasable { +public interface TransportDecompressor extends Releasable { - private final Inflater inflater; - private final PageCacheRecycler recycler; - private final ArrayDeque> pages; - private int pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE; - private boolean hasReadHeader = false; + /** + * Decompress the provided bytes + * + * @param bytesReference to decompress + * @return number of compressed bytes consumed + */ + int decompress(BytesReference bytesReference) throws IOException; - public TransportDecompressor(PageCacheRecycler recycler) { - this.recycler = recycler; - inflater = new Inflater(true); - pages = new ArrayDeque<>(4); - } + ReleasableBytesReference pollDecompressedPage(boolean isEOS); - public int decompress(BytesReference bytesReference) throws IOException { - int bytesConsumed = 0; - if (hasReadHeader == false) { - if (CompressorFactory.COMPRESSOR.isCompressed(bytesReference) == false) { - int maxToRead = Math.min(bytesReference.length(), 10); - StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [") - .append(maxToRead).append("] content bytes out of [").append(bytesReference.length()) - .append("] readable bytes with message size [").append(bytesReference.length()).append("] ").append("] are ["); - for (int i = 0; i < maxToRead; i++) { - sb.append(bytesReference.get(i)).append(","); - } - sb.append("]"); - throw new IllegalStateException(sb.toString()); - } - hasReadHeader = true; - int headerLength = CompressorFactory.COMPRESSOR.headerLength(); - bytesReference = bytesReference.slice(headerLength, bytesReference.length() - headerLength); - bytesConsumed += headerLength; - } + @Override + void close(); - BytesRefIterator refIterator = bytesReference.iterator(); - BytesRef ref; - while ((ref = refIterator.next()) != null) { - inflater.setInput(ref.bytes, ref.offset, ref.length); - bytesConsumed += ref.length; - boolean continueInflating = true; - while (continueInflating) { - final Recycler.V page; - final boolean isNewPage = pageOffset == PageCacheRecycler.BYTE_PAGE_SIZE; - if (isNewPage) { - pageOffset = 0; - page = recycler.bytePage(false); - } else { - page = pages.getLast(); - } - byte[] output = page.v(); - try { - int bytesInflated = inflater.inflate(output, pageOffset, PageCacheRecycler.BYTE_PAGE_SIZE - pageOffset); - pageOffset += bytesInflated; - if (isNewPage) { - if (bytesInflated == 0) { - page.close(); - pageOffset 
= PageCacheRecycler.BYTE_PAGE_SIZE; - } else { - pages.add(page); - } - } - } catch (DataFormatException e) { - throw new IOException("Exception while inflating bytes", e); - } - if (inflater.needsInput()) { - continueInflating = false; - } - if (inflater.finished()) { - bytesConsumed -= inflater.getRemaining(); - continueInflating = false; - } - assert inflater.needsDictionary() == false; - } + static TransportDecompressor getDecompressor(PageCacheRecycler recycler, BytesReference bytes) throws IOException { + if (bytes.length() < Compression.Scheme.HEADER_LENGTH) { + return null; } - return bytesConsumed; - } - - public boolean canDecompress(int bytesAvailable) { - return hasReadHeader || bytesAvailable >= CompressorFactory.COMPRESSOR.headerLength(); - } - - public boolean isEOS() { - return inflater.finished(); - } - - public ReleasableBytesReference pollDecompressedPage() { - if (pages.isEmpty()) { - return null; - } else if (pages.size() == 1) { - if (isEOS()) { - Recycler.V page = pages.pollFirst(); - ReleasableBytesReference reference = new ReleasableBytesReference(new BytesArray(page.v(), 0, pageOffset), page); - pageOffset = 0; - return reference; - } else { - return null; - } + if (Compression.Scheme.isDeflate(bytes)) { + return new DeflateTransportDecompressor(recycler); + } else if (Compression.Scheme.isLZ4(bytes)) { + return new Lz4TransportDecompressor(recycler); } else { - Recycler.V page = pages.pollFirst(); - return new ReleasableBytesReference(new BytesArray(page.v()), page); + throw createIllegalState(bytes); } } - @Override - public void close() { - inflater.end(); - for (Recycler.V page : pages) { - page.close(); + static IllegalStateException createIllegalState(BytesReference bytes) { + int maxToRead = Math.min(bytes.length(), 10); + StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [") + .append(maxToRead).append("] content bytes out of [").append(bytes.length()) + .append("] readable bytes with message size [").append(bytes.length()).append("] ").append("] are ["); + for (int i = 0; i < maxToRead; i++) { + sb.append(bytes.get(i)).append(","); } + sb.append("]"); + return new IllegalStateException(sb.toString()); } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportSettings.java b/server/src/main/java/org/elasticsearch/transport/TransportSettings.java index 15fb4f0404ec0..4762b8bb9b0f5 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportSettings.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportSettings.java @@ -21,6 +21,7 @@ import static java.util.Collections.emptyList; import static org.elasticsearch.common.settings.Setting.affixKeySetting; import static org.elasticsearch.common.settings.Setting.boolSetting; +import static org.elasticsearch.common.settings.Setting.enumSetting; import static org.elasticsearch.common.settings.Setting.intSetting; import static org.elasticsearch.common.settings.Setting.listSetting; import static org.elasticsearch.common.settings.Setting.timeSetting; @@ -51,10 +52,14 @@ public final class TransportSettings { intSetting("transport.publish_port", -1, -1, Setting.Property.NodeScope); public static final Setting.AffixSetting PUBLISH_PORT_PROFILE = affixKeySetting("transport.profiles.", "publish_port", key -> intSetting(key, -1, -1, Setting.Property.NodeScope)); - public static final Setting OLD_TRANSPORT_COMPRESS = - boolSetting("transport.tcp.compress", false, Setting.Property.NodeScope, Setting.Property.Deprecated); - public static 
final Setting TRANSPORT_COMPRESS = - boolSetting("transport.compress", OLD_TRANSPORT_COMPRESS, Setting.Property.NodeScope); + public static final Setting OLD_TRANSPORT_COMPRESS = + enumSetting(Compression.Enabled.class, "transport.tcp.compress", Compression.Enabled.FALSE, Setting.Property.NodeScope, + Setting.Property.Deprecated); + public static final Setting TRANSPORT_COMPRESS = + enumSetting(Compression.Enabled.class, "transport.compress", OLD_TRANSPORT_COMPRESS, (e) -> {}, Setting.Property.NodeScope); + public static final Setting TRANSPORT_COMPRESSION_SCHEME = + enumSetting(Compression.Scheme.class, "transport.compression_scheme", Compression.Scheme.DEFLATE, + Setting.Property.NodeScope); // the scheduled internal ping interval setting, defaults to disabled (-1) public static final Setting PING_SCHEDULE = timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope); diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java index 4976183220870..43d294489d4ab 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java @@ -56,7 +56,7 @@ public void createConnectionManager() { TimeValue oneSecond = new TimeValue(1000); TimeValue oneMinute = TimeValue.timeValueMinutes(1); connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond, - oneMinute, false); + oneMinute, Compression.Enabled.FALSE, Compression.Scheme.DEFLATE); } @After diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java index 009694a0149d2..1f99d29a8f2d3 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java @@ -31,7 +31,10 @@ public void testBuildConnectionProfile() { TimeValue connectTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10)); TimeValue handshakeTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10)); TimeValue pingInterval = TimeValue.timeValueMillis(randomIntBetween(1, 10)); - boolean compressionEnabled = randomBoolean(); + Compression.Enabled compressionEnabled = + randomFrom(Compression.Enabled.TRUE, Compression.Enabled.FALSE, Compression.Enabled.INDEXING_DATA); + Compression.Scheme compressionScheme = + randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); final boolean setConnectTimeout = randomBoolean(); if (setConnectTimeout) { builder.setConnectTimeout(connectTimeout); @@ -40,10 +43,16 @@ public void testBuildConnectionProfile() { if (setHandshakeTimeout) { builder.setHandshakeTimeout(handshakeTimeout); } + final boolean setCompress = randomBoolean(); if (setCompress) { builder.setCompressionEnabled(compressionEnabled); } + + final boolean setCompressionScheme = randomBoolean(); + if (setCompressionScheme) { + builder.setCompressionScheme(compressionScheme); + } final boolean setPingInterval = randomBoolean(); if (setPingInterval) { builder.setPingInterval(pingInterval); @@ -81,6 +90,12 @@ public void testBuildConnectionProfile() { assertNull(build.getCompressionEnabled()); } + if (setCompressionScheme) { + assertEquals(compressionScheme, build.getCompressionScheme()); + } else { + 
assertNull(build.getCompressionScheme()); + } + if (setPingInterval) { assertEquals(pingInterval, build.getPingInterval()); } else { @@ -171,7 +186,15 @@ public void testConnectionProfileResolve() { } final boolean connectionCompressSet = randomBoolean(); if (connectionCompressSet) { - builder.setCompressionEnabled(randomBoolean()); + Compression.Enabled compressionEnabled = + randomFrom(Compression.Enabled.TRUE, Compression.Enabled.FALSE, Compression.Enabled.INDEXING_DATA); + builder.setCompressionEnabled(compressionEnabled); + } + final boolean connectionCompressionScheme = randomBoolean(); + if (connectionCompressionScheme) { + Compression.Scheme compressionScheme = + randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); + builder.setCompressionScheme(compressionScheme); } final ConnectionProfile profile = builder.build(); @@ -188,6 +211,9 @@ public void testConnectionProfileResolve() { equalTo(pingIntervalSet ? profile.getPingInterval() : defaultProfile.getPingInterval())); assertThat(resolved.getCompressionEnabled(), equalTo(connectionCompressSet ? profile.getCompressionEnabled() : defaultProfile.getCompressionEnabled())); + assertThat(resolved.getCompressionScheme(), + equalTo(connectionCompressionScheme ? profile.getCompressionScheme() : + defaultProfile.getCompressionScheme())); } public void testDefaultConnectionProfile() { @@ -201,6 +227,7 @@ public void testDefaultConnectionProfile() { assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getConnectTimeout()); assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getHandshakeTimeout()); assertEquals(TransportSettings.TRANSPORT_COMPRESS.get(Settings.EMPTY), profile.getCompressionEnabled()); + assertEquals(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.get(Settings.EMPTY), profile.getCompressionScheme()); assertEquals(TransportSettings.PING_SCHEDULE.get(Settings.EMPTY), profile.getPingInterval()); profile = ConnectionProfile.buildDefaultConnectionProfile(nonMasterNode()); diff --git a/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java b/server/src/test/java/org/elasticsearch/transport/DeflateTransportDecompressorTests.java similarity index 86% rename from server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java rename to server/src/test/java/org/elasticsearch/transport/DeflateTransportDecompressorTests.java index 60e06ceef2945..f7eddbe185d8d 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/DeflateTransportDecompressorTests.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.io.OutputStream; -public class TransportDecompressorTests extends ESTestCase { +public class DeflateTransportDecompressorTests extends ESTestCase { public void testSimpleCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { @@ -35,11 +35,11 @@ public void testSimpleCompression() throws IOException { BytesReference bytes = output.bytes(); - TransportDecompressor decompressor = new TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE); + DeflateTransportDecompressor decompressor = new DeflateTransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE); int bytesConsumed = decompressor.decompress(bytes); assertEquals(bytes.length(), bytesConsumed); assertTrue(decompressor.isEOS()); - ReleasableBytesReference releasableBytesReference = decompressor.pollDecompressedPage(); + ReleasableBytesReference 
releasableBytesReference = decompressor.pollDecompressedPage(true); assertEquals(randomByte, releasableBytesReference.get(0)); releasableBytesReference.close(); @@ -57,14 +57,14 @@ public void testMultiPageCompression() throws IOException { BytesReference bytes = output.bytes(); - TransportDecompressor decompressor = new TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE); + DeflateTransportDecompressor decompressor = new DeflateTransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE); int bytesConsumed = decompressor.decompress(bytes); assertEquals(bytes.length(), bytesConsumed); assertTrue(decompressor.isEOS()); - ReleasableBytesReference reference1 = decompressor.pollDecompressedPage(); - ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(); - ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(); - assertNull(decompressor.pollDecompressedPage()); + ReleasableBytesReference reference1 = decompressor.pollDecompressedPage(false); + ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(false); + ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(true); + assertNull(decompressor.pollDecompressedPage(true)); BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3); assertEquals(4 * 10000, composite.length()); StreamInput streamInput = composite.streamInput(); @@ -86,7 +86,7 @@ public void testIncrementalMultiPageCompression() throws IOException { BytesReference bytes = output.bytes(); - TransportDecompressor decompressor = new TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE); + DeflateTransportDecompressor decompressor = new DeflateTransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE); int split1 = (int) (bytes.length() * 0.3); int split2 = (int) (bytes.length() * 0.65); @@ -103,10 +103,10 @@ public void testIncrementalMultiPageCompression() throws IOException { int bytesConsumed3 = decompressor.decompress(inbound3); assertEquals(inbound3.length(), bytesConsumed3); assertTrue(decompressor.isEOS()); - ReleasableBytesReference reference1 = decompressor.pollDecompressedPage(); - ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(); - ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(); - assertNull(decompressor.pollDecompressedPage()); + ReleasableBytesReference reference1 = decompressor.pollDecompressedPage(false); + ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(false); + ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(true); + assertNull(decompressor.pollDecompressedPage(false)); BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3); assertEquals(4 * 10000, composite.length()); StreamInput streamInput = composite.streamInput(); @@ -117,5 +117,4 @@ public void testIncrementalMultiPageCompression() throws IOException { } } - } diff --git a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java index 272a99478ae51..4756ea771a3cd 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java @@ -48,10 +48,10 @@ public void testDecode() throws IOException { OutboundMessage message; if (isRequest) { message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(randomAlphaOfLength(100)), - 
Version.CURRENT, action, requestId, false, false); + Version.CURRENT, action, requestId, false, null); } else { message = new OutboundMessage.Response(threadContext, Collections.emptySet(), new TestResponse(randomAlphaOfLength(100)), - Version.CURRENT, requestId, false, false); + Version.CURRENT, requestId, false, null); } final BytesReference totalBytes = message.serialize(new BytesStreamOutput()); @@ -97,13 +97,14 @@ public void testDecode() throws IOException { public void testDecodePreHeaderSizeVariableInt() throws IOException { // TODO: Can delete test on 9.0 - boolean isCompressed = randomBoolean(); + Compression.Scheme compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.DEFLATE, null); String action = "test-request"; long requestId = randomNonNegativeLong(); final Version preHeaderVariableInt = Version.V_7_5_0; final String contentValue = randomAlphaOfLength(100); - final OutboundMessage message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(contentValue), - preHeaderVariableInt, action, requestId, true, isCompressed); + // 8.0 is only compatible with handshakes on a pre-variable int version + final OutboundMessage message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(contentValue), + preHeaderVariableInt, action, requestId, true, compressionScheme); final BytesReference totalBytes = message.serialize(new BytesStreamOutput()); int partialHeaderSize = TcpHeader.headerSize(preHeaderVariableInt); @@ -118,7 +119,11 @@ public void testDecodePreHeaderSizeVariableInt() throws IOException { final Header header = (Header) fragments.get(0); assertEquals(requestId, header.getRequestId()); assertEquals(preHeaderVariableInt, header.getVersion()); - assertEquals(isCompressed, header.isCompressed()); + if (compressionScheme == null) { + assertFalse(header.isCompressed()); + } else { + assertTrue(header.isCompressed()); + } assertTrue(header.isHandshake()); assertTrue(header.isRequest()); assertTrue(header.needsToReadVariableHeader()); @@ -140,7 +145,7 @@ public void testDecodeHandshakeCompatibility() throws IOException { threadContext.putHeader(headerKey, headerValue); Version handshakeCompat = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); OutboundMessage message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(randomAlphaOfLength(100)), - handshakeCompat, action, requestId, true, false); + handshakeCompat, action, requestId, true, null); final BytesReference bytes = message.serialize(new BytesStreamOutput()); int totalHeaderSize = TcpHeader.headerSize(handshakeCompat); @@ -176,14 +181,15 @@ public void testCompressedDecode() throws IOException { } OutboundMessage message; TransportMessage transportMessage; + Compression.Scheme scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); if (isRequest) { transportMessage = new TestRequest(randomAlphaOfLength(100)); message = new OutboundMessage.Request(threadContext, new String[0], transportMessage, Version.CURRENT, action, requestId, - false, true); + false, scheme); } else { transportMessage = new TestResponse(randomAlphaOfLength(100)); message = new OutboundMessage.Response(threadContext, Collections.emptySet(), transportMessage, Version.CURRENT, requestId, - false, true); + false, scheme); } final BytesReference totalBytes = message.serialize(new BytesStreamOutput()); @@ -237,7 +243,7 @@ public void testCompressedDecodeHandshakeCompatibility() throws IOException { 
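These handshake-compatibility tests pin DEFLATE or no compression because LZ4 on the wire requires both endpoints to be on Compression.Scheme.LZ4_VERSION or later; NetworkMessage#adjustedScheme, shown earlier in this diff, silently downgrades LZ4 to uncompressed for older peers rather than failing the request. A condensed restatement of that fallback for reference (the standalone class name is illustrative only):

import org.elasticsearch.Version;
import org.elasticsearch.transport.Compression;

class SchemeFallback {
    // A null scheme means the message is sent uncompressed and the header's compress bit stays unset.
    static Compression.Scheme adjustedScheme(Version wireVersion, Compression.Scheme requested) {
        if (requested == Compression.Scheme.LZ4 && wireVersion.before(Compression.Scheme.LZ4_VERSION)) {
            return null;
        }
        return requested;
    }
}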
threadContext.putHeader(headerKey, headerValue); Version handshakeCompat = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); OutboundMessage message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(randomAlphaOfLength(100)), - handshakeCompat, action, requestId, true, true); + handshakeCompat, action, requestId, true, Compression.Scheme.DEFLATE); final BytesReference bytes = message.serialize(new BytesStreamOutput()); int totalHeaderSize = TcpHeader.headerSize(handshakeCompat); @@ -265,7 +271,7 @@ public void testVersionIncompatibilityDecodeException() throws IOException { long requestId = randomNonNegativeLong(); Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); OutboundMessage message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(randomAlphaOfLength(100)), - incompatibleVersion, action, requestId, false, true); + incompatibleVersion, action, requestId, false, Compression.Scheme.DEFLATE); final BytesReference bytes = message.serialize(new BytesStreamOutput()); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index 0037b342212f1..73906c245c4f1 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -64,9 +64,9 @@ public void setUp() throws Exception { channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address()); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {}); + TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage); OutboundHandler outboundHandler = new OutboundHandler("node", version, new String[0], new StatsTracker(), threadPool, - BigArrays.NON_RECYCLING_INSTANCE); - TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, outboundHandler::sendBytes); + BigArrays.NON_RECYCLING_INSTANCE, randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4)); requestHandlers = new Transport.RequestHandlers(); responseHandlers = new Transport.ResponseHandlers(); handler = new InboundHandler(threadPool, outboundHandler, namedWriteableRegistry, handshaker, keepAlive, requestHandlers, @@ -126,7 +126,7 @@ public TestResponse read(StreamInput in) throws IOException { requestHandlers.registerHandler(registry); String requestValue = randomAlphaOfLength(10); OutboundMessage.Request request = new OutboundMessage.Request(threadPool.getThreadContext(), new String[0], - new TestRequest(requestValue), version, action, requestId, false, false); + new TestRequest(requestValue), version, action, requestId, false, null); BytesReference fullRequestBytes = request.serialize(new BytesStreamOutput()); BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java index 51e3e1ef28195..cf9a8b23ef1dd 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java @@ -94,9 +94,16 @@ public 
void testPipelineHandling() throws IOException { try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { while (streamOutput.size() < BYTE_THRESHOLD) { final Version version = randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()); - final String value = randomAlphaOfLength(randomIntBetween(10, 200)); + final String value = randomRealisticUnicodeOfCodepointLength(randomIntBetween(200, 400)); final boolean isRequest = randomBoolean(); - final boolean isCompressed = randomBoolean(); + + Compression.Scheme scheme; + if (randomBoolean()) { + scheme = null; + } else { + scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); + } + boolean isCompressed = isCompressed(version, scheme); final long requestId = totalMessages++; final MessageData messageData; @@ -107,17 +114,17 @@ public void testPipelineHandling() throws IOException { if (rarely()) { messageData = new MessageData(version, requestId, true, isCompressed, breakThisAction, null); message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(value), - version, breakThisAction, requestId, false, isCompressed); + version, breakThisAction, requestId, false, scheme); expectedExceptionClass = new CircuitBreakingException("", CircuitBreaker.Durability.PERMANENT); } else { messageData = new MessageData(version, requestId, true, isCompressed, actionName, value); message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(value), - version, actionName, requestId, false, isCompressed); + version, actionName, requestId, false, scheme); } } else { messageData = new MessageData(version, requestId, false, isCompressed, null, value); message = new OutboundMessage.Response(threadContext, Collections.emptySet(), new TestResponse(value), - version, requestId, false, isCompressed); + version, requestId, false, scheme); } expected.add(new Tuple<>(messageData, expectedExceptionClass)); @@ -166,6 +173,14 @@ public void testPipelineHandling() throws IOException { } } + private static boolean isCompressed(Version version, Compression.Scheme scheme) { + if (version.before(Compression.Scheme.LZ4_VERSION) && scheme == Compression.Scheme.LZ4) { + return false; + } else { + return scheme != null; + } + } + public void testDecodeExceptionIsPropagated() throws IOException { BiConsumer messageHandler = (c, m) -> {}; final StatsTracker statsTracker = new StatsTracker(); @@ -185,10 +200,10 @@ public void testDecodeExceptionIsPropagated() throws IOException { OutboundMessage message; if (isRequest) { message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(value), - invalidVersion, actionName, requestId, false, false); + invalidVersion, actionName, requestId, false, null); } else { message = new OutboundMessage.Response(threadContext, Collections.emptySet(), new TestResponse(value), - invalidVersion, requestId, false, false); + invalidVersion, requestId, false, null); } final BytesReference reference = message.serialize(streamOutput); @@ -222,10 +237,10 @@ public void testEnsureBodyIsNotPrematurelyReleased() throws IOException { OutboundMessage message; if (isRequest) { message = new OutboundMessage.Request(threadContext, new String[0], new TestRequest(value), - version, actionName, requestId, false, false); + version, actionName, requestId, false, null); } else { message = new OutboundMessage.Response(threadContext, Collections.emptySet(), new TestResponse(value), - version, requestId, false, false); + version, requestId, false, null); } final 
BytesReference reference = message.serialize(streamOutput); diff --git a/server/src/test/java/org/elasticsearch/transport/Lz4TransportDecompressorTests.java b/server/src/test/java/org/elasticsearch/transport/Lz4TransportDecompressorTests.java new file mode 100644 index 0000000000000..a1da456a096bc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/Lz4TransportDecompressorTests.java @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.io.OutputStream; + +import static org.hamcrest.Matchers.lessThan; + +public class Lz4TransportDecompressorTests extends ESTestCase { + + public void testSimpleCompression() throws IOException { + try (BytesStreamOutput output = new BytesStreamOutput()) { + byte randomByte = randomByte(); + try (OutputStream lz4BlockStream = Compression.Scheme.lz4OutputStream(Streams.noCloseStream(output))) { + lz4BlockStream.write(randomByte); + } + + BytesReference bytes = output.bytes(); + + Lz4TransportDecompressor decompressor = new Lz4TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE); + int bytesConsumed = decompressor.decompress(bytes); + assertEquals(bytes.length(), bytesConsumed); + ReleasableBytesReference releasableBytesReference = decompressor.pollDecompressedPage(true); + assertEquals(randomByte, releasableBytesReference.get(0)); + releasableBytesReference.close(); + } + } + + public void testMultiPageCompression() throws IOException { + int intsToWrite = 50000; + int uncompressedLength = intsToWrite * 4; + + try (BytesStreamOutput output = new BytesStreamOutput()) { + try (StreamOutput lz4BlockStream = new OutputStreamStreamOutput(Compression.Scheme.lz4OutputStream( + Streams.flushOnCloseStream(output)))) { + for (int i = 0; i < intsToWrite; ++i) { + int lowByte = (i & 0xFF); + if (lowByte < 128) { + lz4BlockStream.writeInt(0); + } else if (lowByte < 200) { + lz4BlockStream.writeInt(1); + } else { + lz4BlockStream.writeInt(i); + } + } + } + + BytesReference bytes = output.bytes(); + // Since 200 / 255 data is repeated, we should get a compression ratio of at least 50% + assertThat(bytes.length(), lessThan(uncompressedLength / 2)); + + Lz4TransportDecompressor decompressor = new Lz4TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE); + int bytesConsumed = decompressor.decompress(bytes); + assertEquals(bytes.length(), bytesConsumed); + + int numOfUncompressedPages = uncompressedLength / PageCacheRecycler.BYTE_PAGE_SIZE; + if (bytes.length() % PageCacheRecycler.BYTE_PAGE_SIZE > 0) { + numOfUncompressedPages += 1; + 
+
+    public void testMultiPageCompression() throws IOException {
+        int intsToWrite = 50000;
+        int uncompressedLength = intsToWrite * 4;
+
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            try (StreamOutput lz4BlockStream = new OutputStreamStreamOutput(Compression.Scheme.lz4OutputStream(
+                Streams.flushOnCloseStream(output)))) {
+                for (int i = 0; i < intsToWrite; ++i) {
+                    int lowByte = (i & 0xFF);
+                    if (lowByte < 128) {
+                        lz4BlockStream.writeInt(0);
+                    } else if (lowByte < 200) {
+                        lz4BlockStream.writeInt(1);
+                    } else {
+                        lz4BlockStream.writeInt(i);
+                    }
+                }
+            }
+
+            BytesReference bytes = output.bytes();
+            // Roughly 200 of every 256 values written come from a two-value set, so we should
+            // get a compression ratio of at least 50%
+            assertThat(bytes.length(), lessThan(uncompressedLength / 2));
+
+            Lz4TransportDecompressor decompressor = new Lz4TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
+            int bytesConsumed = decompressor.decompress(bytes);
+            assertEquals(bytes.length(), bytesConsumed);
+
+            int numOfUncompressedPages = uncompressedLength / PageCacheRecycler.BYTE_PAGE_SIZE;
+            if (uncompressedLength % PageCacheRecycler.BYTE_PAGE_SIZE > 0) {
+                numOfUncompressedPages += 1;
+            }
+
+            ReleasableBytesReference[] polledReferences = new ReleasableBytesReference[numOfUncompressedPages];
+            for (int i = 0; i < numOfUncompressedPages - 1; ++i) {
+                polledReferences[i] = decompressor.pollDecompressedPage(false);
+            }
+
+            polledReferences[numOfUncompressedPages - 1] = decompressor.pollDecompressedPage(true);
+            assertNull(decompressor.pollDecompressedPage(true));
+
+            BytesReference composite = CompositeBytesReference.of(polledReferences);
+            assertEquals(uncompressedLength, composite.length());
+            StreamInput streamInput = composite.streamInput();
+            for (int i = 0; i < intsToWrite; ++i) {
+                int lowByte = (i & 0xFF);
+                if (lowByte < 128) {
+                    assertEquals(0, streamInput.readInt());
+                } else if (lowByte < 200) {
+                    assertEquals(1, streamInput.readInt());
+                } else {
+                    assertEquals(i, streamInput.readInt());
+                }
+            }
+            Releasables.close(polledReferences);
+        }
+    }
+
+    public void testIncrementalMultiPageCompression() throws IOException {
+        int intsToWrite = 50000;
+        int uncompressedLength = intsToWrite * 4;
+
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            try (StreamOutput lz4BlockStream = new OutputStreamStreamOutput(
+                Compression.Scheme.lz4OutputStream(Streams.flushOnCloseStream(output)))) {
+                for (int i = 0; i < intsToWrite; ++i) {
+                    int lowByte = (i & 0xFF);
+                    if (lowByte < 128) {
+                        lz4BlockStream.writeInt(0);
+                    } else if (lowByte < 200) {
+                        lz4BlockStream.writeInt(1);
+                    } else {
+                        lz4BlockStream.writeInt(i);
+                    }
+                }
+            }
+
+            BytesReference bytes = output.bytes();
+            // Roughly 200 of every 256 values written come from a two-value set, so we should
+            // get a compression ratio of at least 50%
+            assertThat(bytes.length(), lessThan(uncompressedLength / 2));
+
+            Lz4TransportDecompressor decompressor = new Lz4TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
+
+            int split1 = (int) (bytes.length() * 0.3);
+            int split2 = (int) (bytes.length() * 0.65);
+            BytesReference inbound1 = bytes.slice(0, split1);
+            BytesReference inbound2 = bytes.slice(split1, split2 - split1);
+            BytesReference inbound3 = bytes.slice(split2, bytes.length() - split2);
+
+            int bytesConsumed1 = decompressor.decompress(inbound1);
+            BytesReference next = CompositeBytesReference.of(inbound1.slice(bytesConsumed1, inbound1.length() - bytesConsumed1), inbound2);
+            int bytesConsumed2 = decompressor.decompress(next);
+            BytesReference next2 = CompositeBytesReference.of(next.slice(bytesConsumed2, next.length() - bytesConsumed2), inbound3);
+            int bytesConsumed3 = decompressor.decompress(next2);
+            assertEquals(bytes.length(), bytesConsumed1 + bytesConsumed2 + bytesConsumed3);
+
+            int numOfUncompressedPages = uncompressedLength / PageCacheRecycler.BYTE_PAGE_SIZE;
+            if (uncompressedLength % PageCacheRecycler.BYTE_PAGE_SIZE > 0) {
+                numOfUncompressedPages += 1;
+            }
+
+            ReleasableBytesReference[] polledReferences = new ReleasableBytesReference[numOfUncompressedPages];
+            for (int i = 0; i < numOfUncompressedPages - 1; ++i) {
+                polledReferences[i] = decompressor.pollDecompressedPage(false);
+            }
+
+            polledReferences[numOfUncompressedPages - 1] = decompressor.pollDecompressedPage(true);
+            assertNull(decompressor.pollDecompressedPage(true));
+
+            BytesReference composite = CompositeBytesReference.of(polledReferences);
+            assertEquals(uncompressedLength, composite.length());
+            StreamInput streamInput = composite.streamInput();
+            for (int i = 0; i < intsToWrite; ++i) {
+                int lowByte = (i & 0xFF);
+                if (lowByte < 128) {
+                    assertEquals(0, streamInput.readInt());
+                } else if (lowByte < 200) {
+                    assertEquals(1, streamInput.readInt());
+                } else {
+                    assertEquals(i, streamInput.readInt());
+                }
+            }
+            Releasables.close(polledReferences);
+
+        }
+    }
+}
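The incremental test above hands the decompressor three arbitrary slices and prepends the unconsumed remainder to the next slice, which is exactly how partial network reads arrive in practice. The same consume-what-you-can loop, sketched with the JDK's `Inflater` so it stays dependency-free (an analogy only, not the `Lz4TransportDecompressor` API):

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class IncrementalDecompressDemo {
    public static void main(String[] args) throws DataFormatException {
        byte[] input = new byte[10_000];
        Arrays.fill(input, (byte) 7); // highly compressible

        // Compress the whole payload up front.
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (deflater.finished() == false) {
            compressedOut.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        byte[] compressed = compressedOut.toByteArray();

        // Feed the compressed bytes in three arbitrary slices, draining the
        // decompressor into fixed-size "pages" as each slice arrives.
        Inflater inflater = new Inflater();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int[] splits = {0, compressed.length / 3, 2 * compressed.length / 3, compressed.length};
        byte[] page = new byte[1024];
        for (int s = 0; s < 3; s++) {
            inflater.setInput(compressed, splits[s], splits[s + 1] - splits[s]);
            int n;
            while ((n = inflater.inflate(page)) > 0) {
                out.write(page, 0, n);
            }
        }
        inflater.end();
        System.out.println(out.size() == input.length); // true
    }
}
```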
diff --git a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java
index ada83c58cff37..23b12a013a622 100644
--- a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java
@@ -61,6 +61,7 @@ public class OutboundHandlerTests extends ESTestCase {
     private OutboundHandler handler;
     private FakeTcpChannel channel;
     private DiscoveryNode node;
+    private Compression.Scheme compressionScheme;

     @Before
     public void setUp() throws Exception {
@@ -70,7 +71,9 @@ public void setUp() throws Exception {
         node = new DiscoveryNode("", transportAddress, Version.CURRENT);
         String[] features = {feature1, feature2};
         StatsTracker statsTracker = new StatsTracker();
-        handler = new OutboundHandler("node", Version.CURRENT, features, statsTracker, threadPool, BigArrays.NON_RECYCLING_INSTANCE);
+        compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
+        handler = new OutboundHandler("node", Version.CURRENT, features, statsTracker, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
+            compressionScheme);

         final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime());
         final InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE);
@@ -124,6 +127,8 @@ public void testSendRequest() throws IOException {
         long requestId = randomLongBetween(0, 300);
         boolean isHandshake = randomBoolean();
         boolean compress = randomBoolean();
+        boolean compressUnsupportedDueToVersion = compressionScheme == Compression.Scheme.LZ4
+            && version.before(Compression.Scheme.LZ4_VERSION);
         String value = "message";
         threadContext.putHeader("header", "header_value");
         TestRequest request = new TestRequest(value);
@@ -170,7 +175,7 @@ public void onRequestSent(DiscoveryNode node, long requestId, String action, Tra
             } else {
                 assertFalse(header.isHandshake());
             }
-            if (compress) {
+            if (compress && compressUnsupportedDueToVersion == false) {
                 assertTrue(header.isCompressed());
             } else {
                 assertFalse(header.isCompressed());
@@ -187,6 +192,9 @@ public void testSendResponse() throws IOException {
         long requestId = randomLongBetween(0, 300);
         boolean isHandshake = randomBoolean();
         boolean compress = randomBoolean();
+        boolean compressUnsupportedDueToVersion = compressionScheme == Compression.Scheme.LZ4
+            && version.before(Compression.Scheme.LZ4_VERSION);
+
         String value = "message";
         threadContext.putHeader("header", "header_value");
         TestResponse response = new TestResponse(value);
@@ -229,7 +237,7 @@ public void onResponseSent(long requestId, String action, TransportResponse resp
             } else {
                 assertFalse(header.isHandshake());
             }
-            if (compress) {
+            if (compress && compressUnsupportedDueToVersion == false) {
                 assertTrue(header.isCompressed());
             } else {
                 assertFalse(header.isCompressed());
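The new `compressUnsupportedDueToVersion` flag encodes the wire-compatibility rule from the docs: LZ4 frames are only sent to nodes that can read them, and older peers get uncompressed traffic instead. A hedged sketch of that guard (an illustrative helper, not the actual `OutboundHandler` code; it reuses the `Compression.Scheme.LZ4_VERSION` constant the tests reference):

```java
import org.elasticsearch.Version;
import org.elasticsearch.transport.Compression;

final class CompressionVersionGuard {
    // Illustrative only: mirrors the condition asserted in the tests above.
    static boolean canUseConfiguredScheme(Version remoteVersion, Compression.Scheme scheme) {
        // LZ4 frames would be unreadable by a pre-LZ4 node, so fall back to
        // uncompressed traffic instead of breaking the connection.
        return scheme != Compression.Scheme.LZ4 || remoteVersion.onOrAfter(Compression.Scheme.LZ4_VERSION);
    }
}
```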
diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java
index d08995c597be4..57be10116d245 100644
--- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java
@@ -360,8 +360,13 @@ public void testChangeSettings() throws Exception {
                 Settings.Builder settingsChange = Settings.builder();
                 TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8));
                 settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule);
-                boolean compressionEnabled = true;
-                settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled);
+                boolean changeCompressionScheme = randomBoolean();
+                Compression.Enabled enabled = randomFrom(Compression.Enabled.TRUE, Compression.Enabled.INDEXING_DATA);
+                if (changeCompressionScheme) {
+                    settingsChange.put("cluster.remote.cluster_1.transport.compression_scheme", Compression.Scheme.LZ4);
+                } else {
+                    settingsChange.put("cluster.remote.cluster_1.transport.compress", enabled);
+                }
                 settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
                 service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build());
                 assertBusy(remoteClusterConnection::isClosed);
@@ -369,7 +374,13 @@ public void testChangeSettings() throws Exception {
                 remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
                 ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile();
                 assertEquals(pingSchedule, connectionProfile.getPingInterval());
-                assertEquals(compressionEnabled, connectionProfile.getCompressionEnabled());
+                if (changeCompressionScheme) {
+                    assertEquals(Compression.Enabled.FALSE, connectionProfile.getCompressionEnabled());
+                    assertEquals(Compression.Scheme.LZ4, connectionProfile.getCompressionScheme());
+                } else {
+                    assertEquals(enabled, connectionProfile.getCompressionEnabled());
+                    assertEquals(Compression.Scheme.DEFLATE, connectionProfile.getCompressionScheme());
+                }
             }
         }
     }
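For reference, the two per-cluster settings this test toggles would be set from application code roughly as follows. The key names and the `indexing_data`/`lz4` values come from the documentation changes in this patch; the cluster alias `cluster_1` is just the test's example:

```java
import org.elasticsearch.common.settings.Settings;

public class RemoteCompressionSettings {
    public static Settings remoteCompressionSettings() {
        // Compress only raw indexing payloads to cluster_1, using the
        // experimental LZ4 scheme instead of the default DEFLATE.
        return Settings.builder()
            .put("cluster.remote.cluster_1.transport.compress", "indexing_data")
            .put("cluster.remote.cluster_1.transport.compression_scheme", "lz4")
            .build();
    }
}
```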
diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java
index 7b48c29eb97df..0ecfaf3676cb4 100644
--- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java
+++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java
@@ -45,7 +45,8 @@ public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() {
     public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() {
         ClusterConnectionManager connectionManager = new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class));
         assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval());
-        assertEquals(false, connectionManager.getConnectionProfile().getCompressionEnabled());
+        assertEquals(Compression.Enabled.FALSE, connectionManager.getConnectionProfile().getCompressionEnabled());
+        assertEquals(Compression.Scheme.DEFLATE, connectionManager.getConnectionProfile().getCompressionScheme());
         RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
         FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager,
             RemoteConnectionStrategy.ConnectionStrategy.PROXY);
@@ -53,11 +54,23 @@ public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() {
         Settings.Builder newBuilder = Settings.builder();
         newBuilder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "proxy");
         newBuilder.put(ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace("cluster-alias").getKey(), "127.0.0.1:9300");
-        if (randomBoolean()) {
+        String ping = "ping";
+        String compress = "compress";
+        String compressionScheme = "compression_scheme";
+        String change = randomFrom(ping, compress, compressionScheme);
+        if (change.equals(ping)) {
             newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace("cluster-alias").getKey(),
                 TimeValue.timeValueSeconds(5));
+        } else if (change.equals(compress)) {
+            newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("cluster-alias").getKey(),
+                randomFrom(Compression.Enabled.INDEXING_DATA, Compression.Enabled.TRUE));
+        } else if (change.equals(compressionScheme)) {
+            newBuilder.put(
+                RemoteClusterService.REMOTE_CLUSTER_COMPRESSION_SCHEME.getConcreteSettingForNamespace("cluster-alias").getKey(),
+                Compression.Scheme.LZ4
+            );
         } else {
-            newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("cluster-alias").getKey(), true);
+            throw new AssertionError("Unexpected option: " + change);
         }
         assertTrue(first.shouldRebuildConnection(newBuilder.build()));
     }
diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java
index 58f7d5413f816..323bc78ca7959 100644
--- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java
+++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java
@@ -403,7 +403,7 @@ private void testExceptionHandling(boolean startTransport, Exception exception,

             TcpTransport.handleException(channel, exception, lifecycle,
                 new OutboundHandler(randomAlphaOfLength(10), Version.CURRENT, new String[0], new StatsTracker(), testThreadPool,
-                    BigArrays.NON_RECYCLING_INSTANCE));
+                    BigArrays.NON_RECYCLING_INSTANCE, randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4)));

             if (expectClosed) {
                 assertTrue(listener.isDone());
diff --git a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java
index 84175e12f2aff..b2942cbcf9483 100644
--- a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java
+++ b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java
@@ -78,7 +78,7 @@ public void testLoggingHandler() throws IOException {
     }

     private BytesReference buildRequest() throws IOException {
-        boolean compress = randomBoolean();
+        Compression.Scheme compress = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null);
         try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
             OutboundMessage.Request request = new OutboundMessage.Request(new ThreadContext(Settings.EMPTY), new String[0],
                 new ClusterStatsRequest(), Version.CURRENT, ClusterStatsAction.NAME, randomInt(30), false, compress);
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index f7b167862a6e9..15c4da06cfe49 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -15,6 +15,7 @@
 import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.carrotsearch.randomizedtesting.generators.RandomStrings;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -46,7 +47,6 @@
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -62,12 +62,14 @@
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.discovery.zen.ElectMasterService;
 import org.elasticsearch.discovery.zen.ZenDiscovery;
@@ -106,6 +108,7 @@
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.MockTransportClient;
+import org.elasticsearch.transport.Compression;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportSettings;
@@ -470,7 +473,20 @@ public Collection<Class<? extends Plugin>> getPlugins() {
     private static Settings getRandomNodeSettings(long seed) {
         Random random = new Random(seed);
         Builder builder = Settings.builder();
-        builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), rarely(random));
+        if (rarely(random)) {
+            builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.TRUE);
+        } else {
+            if (random.nextBoolean()) {
+                builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.FALSE);
+            } else {
+                builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.INDEXING_DATA);
+            }
+        }
+        if (random.nextBoolean()) {
+            builder.put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(), Compression.Scheme.DEFLATE);
+        } else {
+            builder.put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(), Compression.Scheme.LZ4);
+        }
         if (random.nextBoolean()) {
             builder.put("cache.recycler.page.type", RandomPicks.randomFrom(random, PageCacheRecycler.Type.values()));
         }
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
index 7ce897510ab2c..660dd6d94fdcf 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
@@ -554,7 +554,11 @@ public void testVoidMessageCompressed() {
                 }
             });

-            Settings settingsWithCompress = Settings.builder().put(TransportSettings.TRANSPORT_COMPRESS.getKey(), true).build();
+            Settings settingsWithCompress = Settings.builder()
+                .put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.TRUE)
+                .put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(),
+                    randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4))
+                .build();
             ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
             serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile);
@@ -605,7 +609,11 @@ public void testHelloWorldCompressed() throws IOException {
                 }
             });

-            Settings settingsWithCompress = Settings.builder().put(TransportSettings.TRANSPORT_COMPRESS.getKey(), true).build();
+            Settings settingsWithCompress = Settings.builder()
+                .put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.TRUE)
+                .put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(),
+                    randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4))
+                .build();
             ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);

             serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile);
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/TestProfiles.java b/test/framework/src/main/java/org/elasticsearch/transport/TestProfiles.java
index 432578b3cef28..637f09943b185 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/TestProfiles.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/TestProfiles.java
@@ -26,6 +26,7 @@ private TestProfiles() {}
         builder.setConnectTimeout(source.getConnectTimeout());
         builder.setHandshakeTimeout(source.getHandshakeTimeout());
         builder.setCompressionEnabled(source.getCompressionEnabled());
+        builder.setCompressionScheme(source.getCompressionScheme());
         builder.setPingInterval(source.getPingInterval());
         builder.addConnections(1,
             TransportRequestOptions.Type.BULK,
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java b/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java
index 0c007a9ecd730..73079c05e4cd8 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java
@@ -14,12 +14,15 @@

 import java.util.Collections;

+import static org.elasticsearch.test.ESTestCase.randomFrom;
+
 public class TestTransportChannels {

     public static TcpTransportChannel newFakeTcpTransportChannel(String nodeName, TcpChannel channel, ThreadPool threadPool,
                                                                  String action, long requestId, Version version) {
         return new TcpTransportChannel(
-            new OutboundHandler(nodeName, version, new String[0], new StatsTracker(), threadPool, BigArrays.NON_RECYCLING_INSTANCE),
+            new OutboundHandler(nodeName, version, new String[0], new StatsTracker(), threadPool, BigArrays.NON_RECYCLING_INSTANCE,
+                randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4)),
             channel, action, requestId, version, Collections.emptySet(), false, false, () -> {});
     }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java
index 0c603bcc512f1..31ea88e8421f0 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java
@@ -171,6 +171,7 @@ protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile con
             builder.setConnectTimeout(connectionProfile.getConnectTimeout());
             builder.setPingInterval(connectionProfile.getPingInterval());
             builder.setCompressionEnabled(connectionProfile.getCompressionEnabled());
+            builder.setCompressionScheme(connectionProfile.getCompressionScheme());
             return builder.build();
         }
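`TestProfiles` and `MockNioTransport` now copy the scheme alongside the enabled flag so that a derived profile cannot silently drop it. A sketch of building such a profile directly (it assumes `ConnectionProfile.Builder` offers a copy constructor, as the framework usage above suggests; only the two compression setters are confirmed by this diff):

```java
import org.elasticsearch.transport.Compression;
import org.elasticsearch.transport.ConnectionProfile;

public class ProfileExample {
    // Derive a profile that compresses raw indexing data with LZ4 while keeping
    // every other knob from the source profile.
    public static ConnectionProfile profileWithLz4(ConnectionProfile source) {
        ConnectionProfile.Builder builder = new ConnectionProfile.Builder(source);
        builder.setCompressionEnabled(Compression.Enabled.INDEXING_DATA);
        builder.setCompressionScheme(Compression.Scheme.LZ4);
        return builder.build();
    }
}
```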
diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
index f68b06868ab45..118641636220c 100644
--- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
+++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
@@ -79,6 +79,7 @@
 import org.elasticsearch.snapshots.SnapshotRestoreException;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.BackgroundIndexer;
+import org.elasticsearch.transport.Compression;
 import org.elasticsearch.transport.NoSuchRemoteClusterException;
 import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.transport.SniffConnectionStrategy;
@@ -1325,7 +1326,8 @@ public void testUpdateRemoteConfigsDuringFollowing() throws Exception {
             ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
             String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
-            Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
+            Setting<Compression.Enabled> compress =
+                RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
             Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster");
             settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), true).put(seeds.getKey(), address));
             assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -1356,7 +1358,8 @@ public void testUpdateRemoteConfigsDuringFollowing() throws Exception {
         } finally {
             ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
             String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
-            Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
+            Setting<Compression.Enabled> compress =
+                RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
             Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster");
             settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), compress.getDefault(Settings.EMPTY))
                 .put(seeds.getKey(), address));
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
index 3d4b24169d4d2..d3c938a755a1d 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
@@ -41,6 +41,7 @@
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RawIndexingDataTransportRequest;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.ccr.Ccr;
@@ -66,7 +67,7 @@ private ShardChangesAction() {
         super(NAME, ShardChangesAction.Response::new);
     }

-    public static class Request extends SingleShardRequest<Request> {
+    public static class Request extends SingleShardRequest<Request> implements RawIndexingDataTransportRequest {
         private long fromSeqNo;
         private int maxOperationCount;
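Making `ShardChangesAction.Request` implement `RawIndexingDataTransportRequest` is what lets the `indexing_data` mode catch ccr-following traffic: per the docs changes above, that mode compresses only requests carrying raw indexing source data. A sketch of how an outbound path can branch on such a marker interface (illustrative decision logic, not the actual `OutboundHandler` implementation):

```java
import org.elasticsearch.transport.Compression;
import org.elasticsearch.transport.RawIndexingDataTransportRequest;
import org.elasticsearch.transport.TransportRequest;

final class IndexingDataCompressionSketch {
    // Illustrative only: maps the three transport.compress options onto a
    // per-request compression decision.
    static boolean shouldCompress(Compression.Enabled enabled, TransportRequest request) {
        switch (enabled) {
            case TRUE:
                return true;
            case INDEXING_DATA:
                // Only raw indexing payloads (bulk ingest, ccr shard changes,
                // recovery file chunks) opt in via the marker interface.
                return request instanceof RawIndexingDataTransportRequest;
            default:
                return false;
        }
    }
}
```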